Skip to content

Try to integrate fork of Chili parallel runtime #140206

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 5 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
185 changes: 72 additions & 113 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion compiler/rustc_data_structures/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ indexmap = "2.4.0"
jobserver_crate = { version = "0.1.28", package = "jobserver" }
measureme = "12.0.1"
rustc-hash = "2.0.0"
rustc-rayon-core = { version = "0.5.0" }
chili = { git = "https://github.com/zetanumbers/chili.git", branch = "rustc" }
rustc-stable-hash = { version = "0.1.0", features = ["nightly"] }
rustc_arena = { path = "../rustc_arena" }
rustc_graphviz = { path = "../rustc_graphviz" }
Expand Down
2 changes: 1 addition & 1 deletion compiler/rustc_data_structures/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub use self::freeze::{FreezeLock, FreezeReadGuard, FreezeWriteGuard};
pub use self::lock::{Lock, LockGuard, Mode};
pub use self::mode::{is_dyn_thread_safe, set_dyn_thread_safe_mode};
pub use self::parallel::{
join, par_for_each_in, par_map, parallel_guard, scope, try_par_for_each_in,
join, join4, par_for_each_in, par_map, parallel_guard, try_par_for_each_in,
};
pub use self::vec::{AppendOnlyIndexVec, AppendOnlyVec};
pub use self::worker_local::{Registry, WorkerLocal};
Expand Down
113 changes: 65 additions & 48 deletions compiler/rustc_data_structures/src/sync/parallel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,49 +58,58 @@ where
(a.unwrap(), b.unwrap())
}

/// Runs a list of blocks in parallel. The first block is executed immediately on
/// the current thread. Use that for the longest running block.
#[macro_export]
macro_rules! parallel {
(impl $fblock:block [$($c:expr,)*] [$block:expr $(, $rest:expr)*]) => {
parallel!(impl $fblock [$block, $($c,)*] [$($rest),*])
};
(impl $fblock:block [$($blocks:expr,)*] []) => {
$crate::sync::parallel_guard(|guard| {
$crate::sync::scope(|s| {
$(
let block = $crate::sync::FromDyn::from(|| $blocks);
s.spawn(move |_| {
guard.run(move || block.into_inner()());
});
)*
guard.run(|| $fblock);
});
});
};
($fblock:block, $($blocks:block),*) => {
if $crate::sync::is_dyn_thread_safe() {
// Reverse the order of the later blocks since Rayon executes them in reverse order
// when using a single thread. This ensures the execution order matches that
// of a single threaded rustc.
parallel!(impl $fblock [] [$($blocks),*]);
} else {
$crate::sync::parallel_guard(|guard| {
guard.run(|| $fblock);
$(guard.run(|| $blocks);)*
});
}
};
}

// This function only works when `mode::is_dyn_thread_safe()`.
pub fn scope<'scope, OP, R>(op: OP) -> R
pub fn join4<F0, F1, F2, F3, R0, R1, R2, R3>(
oper0: F0,
oper1: F1,
oper2: F2,
oper3: F3,
) -> (R0, R1, R2, R3)
where
OP: FnOnce(&rayon_core::Scope<'scope>) -> R + DynSend,
R: DynSend,
F0: FnOnce() -> R0 + DynSend,
F1: FnOnce() -> R1 + DynSend,
F2: FnOnce() -> R2 + DynSend,
F3: FnOnce() -> R3 + DynSend,
R0: DynSend,
R1: DynSend,
R2: DynSend,
R3: DynSend,
{
let op = FromDyn::from(op);
rayon_core::scope(|s| FromDyn::from(op.into_inner()(s))).into_inner()
if mode::is_dyn_thread_safe() {
let oper0 = FromDyn::from(oper0);
let oper1 = FromDyn::from(oper1);
let oper2 = FromDyn::from(oper2);
let oper3 = FromDyn::from(oper3);
        // Swap closures around because Chili executes the second one on the current thread
let (r1, (r2, (r3, r0))) = parallel_guard(|guard| {
chili::Scope::with_current(|scope| {
scope.unwrap().join_with_heartbeat_every::<1, _, _, _, _>(
move |_| guard.run(move || FromDyn::from(oper1.into_inner()())),
move |scope| {
scope.join_with_heartbeat_every::<1, _, _, _, _>(
move |_| guard.run(move || FromDyn::from(oper2.into_inner()())),
move |scope| {
scope.join_with_heartbeat_every::<1, _, _, _, _>(
move |_| guard.run(move || FromDyn::from(oper3.into_inner()())),
move |_| guard.run(move || FromDyn::from(oper0.into_inner()())),
)
},
)
},
)
})
});
(
r0.unwrap().into_inner(),
r1.unwrap().into_inner(),
r2.unwrap().into_inner(),
r3.unwrap().into_inner(),
)
} else {
let (r0, r1, r2, r3) = parallel_guard(|guard| {
(guard.run(oper0), guard.run(oper1), guard.run(oper2), guard.run(oper3))
});
(r0.unwrap(), r1.unwrap(), r2.unwrap(), r3.unwrap())
}
}

#[inline]
Expand All @@ -112,11 +121,14 @@ where
if mode::is_dyn_thread_safe() {
let oper_a = FromDyn::from(oper_a);
let oper_b = FromDyn::from(oper_b);
let (a, b) = parallel_guard(|guard| {
rayon_core::join(
move || guard.run(move || FromDyn::from(oper_a.into_inner()())),
move || guard.run(move || FromDyn::from(oper_b.into_inner()())),
)
let (b, a) = parallel_guard(|guard| {
chili::Scope::with_current(|scope| {
scope.unwrap().join_with_heartbeat_every::<1, _, _, _, _>(
// Swap arguments around because Chili executes second one on the current thread
move |_| guard.run(move || FromDyn::from(oper_b.into_inner()())),
move |_| guard.run(move || FromDyn::from(oper_a.into_inner()())),
)
})
});
(a.unwrap().into_inner(), b.unwrap().into_inner())
} else {
Expand All @@ -136,6 +148,7 @@ fn par_slice<I: DynSend>(
}

fn par_rec<I: DynSend, F: Fn(&mut I) + DynSync + DynSend>(
scope: &mut chili::Scope<'_>,
items: &mut [I],
state: &State<'_, F>,
) {
Expand All @@ -147,7 +160,11 @@ fn par_slice<I: DynSend>(
let (left, right) = items.split_at_mut(items.len() / 2);
let mut left = state.for_each.derive(left);
let mut right = state.for_each.derive(right);
rayon_core::join(move || par_rec(*left, state), move || par_rec(*right, state));
scope.join(
// Swap arguments around because Chili executes second one on the current thread
move |scope| par_rec(scope, *right, state),
move |scope| par_rec(scope, *left, state),
);
}
}

Expand All @@ -156,7 +173,7 @@ fn par_slice<I: DynSend>(
guard,
group: std::cmp::max(items.len() / 128, 1),
};
par_rec(items, &state)
chili::Scope::with_current(|scope| par_rec(&mut scope.unwrap(), items, &state));
}

pub fn par_for_each_in<I: DynSend, T: IntoIterator<Item = I>>(
Expand Down
2 changes: 1 addition & 1 deletion compiler/rustc_interface/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ edition = "2024"

[dependencies]
# tidy-alphabetical-start
rustc-rayon-core = { version = "0.5.0" }
chili = { git = "https://github.com/zetanumbers/chili.git", branch = "rustc" }
rustc_ast = { path = "../rustc_ast" }
rustc_ast_lowering = { path = "../rustc_ast_lowering" }
rustc_ast_passes = { path = "../rustc_ast_passes" }
Expand Down
35 changes: 17 additions & 18 deletions compiler/rustc_interface/src/passes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@ use std::{env, fs, iter};

use rustc_ast as ast;
use rustc_codegen_ssa::traits::CodegenBackend;
use rustc_data_structures::parallel;
use rustc_data_structures::steal::Steal;
use rustc_data_structures::sync::{AppendOnlyIndexVec, FreezeLock, WorkerLocal};
use rustc_data_structures::sync::{AppendOnlyIndexVec, FreezeLock, WorkerLocal, join, join4};
use rustc_expand::base::{ExtCtxt, LintStoreExpand};
use rustc_feature::Features;
use rustc_fs_util::try_canonicalize;
Expand Down Expand Up @@ -902,8 +901,8 @@ fn run_required_analyses(tcx: TyCtxt<'_>) {
rustc_passes::hir_id_validator::check_crate(tcx);
let sess = tcx.sess;
sess.time("misc_checking_1", || {
parallel!(
{
join4(
|| {
sess.time("looking_for_entry_point", || tcx.ensure_ok().entry_fn(()));

sess.time("looking_for_derive_registrar", || {
Expand All @@ -912,27 +911,27 @@ fn run_required_analyses(tcx: TyCtxt<'_>) {

CStore::from_tcx(tcx).report_unused_deps(tcx);
},
{
|| {
tcx.par_hir_for_each_module(|module| {
tcx.ensure_ok().check_mod_loops(module);
tcx.ensure_ok().check_mod_attrs(module);
tcx.ensure_ok().check_mod_naked_functions(module);
tcx.ensure_ok().check_mod_unstable_api_usage(module);
});
},
{
|| {
sess.time("unused_lib_feature_checking", || {
rustc_passes::stability::check_unused_or_stable_features(tcx)
});
},
{
|| {
// We force these queries to run,
// since they might not otherwise get called.
// This marks the corresponding crate-level attributes
// as used, and ensures that their values are valid.
tcx.ensure_ok().limits(());
tcx.ensure_ok().stability_index(());
}
},
);
});

Expand Down Expand Up @@ -1027,36 +1026,36 @@ fn analysis(tcx: TyCtxt<'_>, (): ()) {
}

sess.time("misc_checking_3", || {
parallel!(
{
join(
|| {
tcx.ensure_ok().effective_visibilities(());

parallel!(
{
join4(
|| {
tcx.ensure_ok().check_private_in_public(());
},
{
|| {
tcx.par_hir_for_each_module(|module| {
tcx.ensure_ok().check_mod_deathness(module)
});
},
{
|| {
sess.time("lint_checking", || {
rustc_lint::check_crate(tcx);
});
},
{
|| {
tcx.ensure_ok().clashing_extern_declarations(());
}
},
);
},
{
|| {
sess.time("privacy_checking_modules", || {
tcx.par_hir_for_each_module(|module| {
tcx.ensure_ok().check_mod_privacy(module);
});
});
}
},
);

// This check has to be run after all lints are done processing. We don't
Expand Down
60 changes: 33 additions & 27 deletions compiler/rustc_interface/src/util.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::env::consts::{DLL_PREFIX, DLL_SUFFIX};
use std::num::NonZero;
use std::path::{Path, PathBuf};
use std::sync::OnceLock;
use std::sync::atomic::{AtomicBool, Ordering};
Expand Down Expand Up @@ -190,17 +191,17 @@ pub(crate) fn run_in_thread_pool_with_globals<F: FnOnce(CurrentGcx) -> R + Send,
let current_gcx = FromDyn::from(CurrentGcx::new());
let current_gcx2 = current_gcx.clone();

let builder = rayon_core::ThreadPoolBuilder::new()
.thread_name(|_| "rustc".to_string())
.acquire_thread_handler(jobserver::acquire_thread)
.release_thread_handler(jobserver::release_thread)
.num_threads(threads)
.deadlock_handler(move || {
let config = chili::Config {
// .thread_name(|_| "rustc".to_string())
acquire_thread_handler: Some(Box::new(jobserver::acquire_thread)),
release_thread_handler: Some(Box::new(jobserver::release_thread)),
thread_count: NonZero::new(threads),
stack_size: NonZero::new(thread_stack_size),
deadlock_handler: Some(Box::new(move || {
// On deadlock, creates a new thread and forwards information in thread
// locals to it. The new thread runs the deadlock handler.

let current_gcx2 = current_gcx2.clone();
let registry = rayon_core::Registry::current();
let session_globals = rustc_span::with_session_globals(|session_globals| {
session_globals as *const SessionGlobals as usize
});
Expand All @@ -226,16 +227,19 @@ pub(crate) fn run_in_thread_pool_with_globals<F: FnOnce(CurrentGcx) -> R + Send,
// We need the complete map to ensure we find a cycle to break.
QueryCtxt::new(tcx).collect_active_jobs().ok().expect("failed to collect active queries in deadlock handler")
});
break_query_cycles(query_map, &registry);
break_query_cycles(query_map);
})
})
});

on_panic.disable();
})
.unwrap();
})
.stack_size(thread_stack_size);
})),
// TODO: Tune heartbeat_interval
// heartbeat_interval: Duration::from_micros(100),
..Default::default()
};

// We create the session globals on the main thread, then create the thread
// pool. Upon creation, each worker thread created gets a copy of the
Expand All @@ -244,23 +248,25 @@ pub(crate) fn run_in_thread_pool_with_globals<F: FnOnce(CurrentGcx) -> R + Send,
rustc_span::create_session_globals_then(edition, extra_symbols, Some(sm_inputs), || {
rustc_span::with_session_globals(|session_globals| {
let session_globals = FromDyn::from(session_globals);
builder
.build_scoped(
// Initialize each new worker thread when created.
move |thread: rayon_core::ThreadBuilder| {
// Register the thread for use with the `WorkerLocal` type.
registry.register();

rustc_span::set_session_globals_then(session_globals.into_inner(), || {
thread.run()
})
},
// Run `f` on the first thread in the thread pool.
move |pool: &rayon_core::ThreadPool| {
pool.install(|| f(current_gcx.into_inner()))
},
)
.unwrap()
chili::ThreadPool::scoped_global(
config,
// Initialize each new worker thread when created.
move |thread: chili::ThreadBuilder| {
// Register the thread for use with the `WorkerLocal` type.
registry.register();

rustc_span::set_session_globals_then(session_globals.into_inner(), || {
thread.run()
})
},
// Run `f` on the first thread in the thread pool.
move || {
chili::Scope::with_current(|scope| {
scope.unwrap().install(|_| f(current_gcx.into_inner()))
})
},
)
.unwrap()
})
})
}
Expand Down
2 changes: 1 addition & 1 deletion compiler/rustc_middle/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ bitflags = "2.4.1"
either = "1.5.0"
gsgdt = "0.1.2"
polonius-engine = "0.13.0"
rustc-rayon-core = { version = "0.5.0" }
chili = { git = "https://github.com/zetanumbers/chili.git", branch = "rustc" }
rustc_abi = { path = "../rustc_abi" }
rustc_apfloat = "0.2.0"
rustc_arena = { path = "../rustc_arena" }
Expand Down
2 changes: 1 addition & 1 deletion compiler/rustc_middle/src/ty/context/tls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ impl<'a, 'tcx> ImplicitCtxt<'a, 'tcx> {
}

// Import the thread-local variable from Chili (formerly Rayon), which is preserved for Chili jobs.
use rayon_core::tlv::TLV;
use chili::tlv::TLV;

#[inline]
fn erase(context: &ImplicitCtxt<'_, '_>) -> *const () {
Expand Down
Loading