Implement multithreading for snapshot generation #7987

Merged
1 change: 1 addition & 0 deletions src/base.zig
@@ -12,6 +12,7 @@ pub const ModuleImport = @import("base/ModuleImport.zig");
pub const StringLiteral = @import("base/StringLiteral.zig");
pub const RegionInfo = @import("base/RegionInfo.zig");
pub const Scratch = @import("base/Scratch.zig").Scratch;
pub const parallel = @import("base/parallel.zig");

/// Whether a function calls itself.
pub const Recursive = enum {
186 changes: 186 additions & 0 deletions src/base/parallel.zig
Member

I'm not a fan of putting this in base necessarily. Different concurrent work would want to be mapped differently. Though maybe this is the default type for large compiler work and fine.

Member

I guess a chunk size setting could also be added later
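For what it's worth, a minimal sketch of how a chunk size setting might look (the `chunk_size` parameter and `chunkedWorkerLoop` helper are hypothetical, not part of this PR): each `fetchAdd` would claim a batch of indices instead of a single one.

```zig
const std = @import("std");
const AtomicUsize = std.atomic.Value(usize);

/// Hypothetical chunked variant of the worker loop: each atomic increment
/// claims `chunk_size` item indices at once, reducing contention on the
/// shared counter when individual work items are very cheap.
fn chunkedWorkerLoop(index: *AtomicUsize, work_item_count: usize, chunk_size: usize) void {
    std.debug.assert(chunk_size > 0);
    while (true) {
        const start = index.fetchAdd(chunk_size, .monotonic);
        if (start >= work_item_count) break;
        const end = @min(start + chunk_size, work_item_count);
        for (start..end) |i| {
            // The real implementation would call ctx.worker_fn(allocator, ctx.context, i) here.
            _ = i;
        }
    }
}
```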

Collaborator Author

> Different concurrent work would want to be mapped differently.

100% agree. I'm not intending this to be a be-all-end-all, just to serve this particular simple need at the moment.

@@ -0,0 +1,186 @@
const std = @import("std");
const Allocator = std.mem.Allocator;
const Thread = std.Thread;

/// Atomic type for thread-safe usize operations
pub const AtomicUsize = std.atomic.Value(usize);

/// Processing options for parallel execution
pub const ProcessOptions = struct {
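/// Maximum number of worker threads; 0 means use the detected CPU count,
/// and 1 processes all work items on the calling thread.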
max_threads: usize,
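/// When true, each worker thread gets an arena allocator that is reset between work items.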
use_per_thread_arenas: bool,
};

/// Worker thread function signature
/// Takes: allocator, context, item_id -> void
pub fn WorkerFn(comptime T: type) type {
return *const fn (allocator: Allocator, context: *T, item_id: usize) void;
}

/// Internal worker thread context
fn WorkerContext(comptime T: type) type {
return struct {
work_item_count: usize,
index: *AtomicUsize,
worker_fn: WorkerFn(T),
context: *T,
base_allocator: Allocator,
options: ProcessOptions,
};
}

/// Worker thread loop: each thread repeatedly claims the next work item
/// index from a shared atomic counter until all items have been processed
fn workerThread(comptime T: type, ctx: WorkerContext(T)) void {
if (ctx.options.use_per_thread_arenas) {
// Use a per-thread arena allocator, backed by the base allocator,
// that is reset between work items
var arena = std.heap.ArenaAllocator.init(ctx.base_allocator);
defer arena.deinit();

while (true) {
const i = ctx.index.fetchAdd(1, .monotonic);
if (i >= ctx.work_item_count) break;

// Clear arena between work items
_ = arena.reset(.retain_capacity);

ctx.worker_fn(arena.allocator(), ctx.context, i);
}
} else {
// Use the base allocator directly
while (true) {
const i = ctx.index.fetchAdd(1, .monotonic);
if (i >= ctx.work_item_count) break;
ctx.worker_fn(ctx.base_allocator, ctx.context, i);
}
}
}

/// Process work items in parallel across multiple threads
///
/// Generic function that:
/// 1. Takes a count of work items
/// 2. Spawns worker threads to process items
///
/// Example usage:
/// ```
/// const MyContext = struct { paths: []const []const u8 };
///
/// fn processItem(allocator: Allocator, context: *MyContext, item_id: usize) void {
///     _ = allocator; // may be a per-thread arena; unused here
///     std.log.info("processing {s}", .{context.paths[item_id]});
/// }
///
/// var context = MyContext{ .paths = paths };
/// try process(MyContext, &context, processItem, allocator, paths.len, .{ .max_threads = 0, .use_per_thread_arenas = false });
/// ```
pub fn process(
comptime T: type,
context: *T,
worker_fn: WorkerFn(T),
allocator: Allocator,
work_item_count: usize,
options: ProcessOptions,
) !void {
if (work_item_count == 0) {
return;
}

if (options.max_threads == 1) {
// Process everything in main thread
var index = AtomicUsize.init(0);
const ctx = WorkerContext(T){
.work_item_count = work_item_count,
.index = &index,
.worker_fn = worker_fn,
.context = context,
.base_allocator = allocator,
.options = options,
};
workerThread(T, ctx);
} else {
const thread_count = @min(
if (options.max_threads == 0) std.Thread.getCpuCount() catch 1 else options.max_threads,
work_item_count,
);

var index = AtomicUsize.init(0);
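// Track up to 16 thread handles in a stack buffer; any threads beyond
// that go into a heap-allocated list.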
const fixed_stack_thread_count: usize = 16;
var threads: [fixed_stack_thread_count]Thread = undefined;
var extra_threads: std.ArrayList(Thread) = undefined;

if (thread_count > fixed_stack_thread_count) {
extra_threads = std.ArrayList(Thread).init(allocator);
}

// Start worker threads
for (0..thread_count) |i| {
const ctx = WorkerContext(T){
.work_item_count = work_item_count,
.index = &index,
.worker_fn = worker_fn,
.context = context,
.base_allocator = allocator,
.options = options,
};
if (i < threads.len) {
threads[i] = try Thread.spawn(.{}, workerThread, .{ T, ctx });
} else {
try extra_threads.append(try Thread.spawn(.{}, workerThread, .{ T, ctx }));
}
}

// Wait for all threads to complete
for (threads[0..@min(thread_count, fixed_stack_thread_count)]) |thread| {
thread.join();
}
if (thread_count > fixed_stack_thread_count) {
for (extra_threads.items) |thread| {
thread.join();
}
extra_threads.deinit();
}
}
}

test "process basic functionality" {
const testing = std.testing;
const allocator = testing.allocator;

const MyContext = struct {
items: []const i32,
outputs: []i32,
};

const TestWorker = struct {
fn worker(worker_allocator: Allocator, item: *MyContext, item_id: usize) void {
_ = worker_allocator; // unused in this test
const value = item.items[item_id];
if (value < 0) {
item.outputs[item_id] = -1;
} else {
item.outputs[item_id] = value * value;
}
}
};

var outputs: [5]i32 = undefined; // Preallocate output array

var context = MyContext{
.items = &[_]i32{ 1, 2, -3, 4, 5 },
.outputs = &outputs,
};

try process(
MyContext,
&context,
TestWorker.worker,
allocator,
outputs.len,
.{ .max_threads = 1, .use_per_thread_arenas = false },
);
try testing.expectEqual(
outputs,
[_]i32{ 1, 4, -1, 16, 25 },
);
}
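
As a usage note rather than part of the diff: a caller such as the snapshot generator might drive `base.parallel.process` with per-thread arenas roughly as sketched below. The `SnapshotJob` context, the paths, and the import path are illustrative assumptions, not taken from this PR.

```zig
const std = @import("std");
const base = @import("base.zig"); // assumes the caller lives next to src/base.zig

const SnapshotJob = struct {
    paths: []const []const u8,
};

fn generateSnapshot(arena: std.mem.Allocator, job: *SnapshotJob, item_id: usize) void {
    // Scratch allocations for one snapshot would come from `arena`,
    // which process() resets between work items.
    _ = arena;
    std.log.info("generating snapshot for {s}", .{job.paths[item_id]});
}

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();

    var job = SnapshotJob{
        .paths = &[_][]const u8{ "snapshot_a.roc", "snapshot_b.roc" },
    };

    try base.parallel.process(
        SnapshotJob,
        &job,
        generateSnapshot,
        gpa.allocator(),
        job.paths.len,
        // max_threads = 0 picks the detected CPU count.
        .{ .max_threads = 0, .use_per_thread_arenas = true },
    );
}
```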