Skip to content

Commit 0590de8

Browse files
Implement multithreading for snapshot generation
1 parent d2d6d15 commit 0590de8

File tree

3 files changed

+346
-36
lines changed

3 files changed

+346
-36
lines changed

src/base.zig

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ pub const ModuleImport = @import("base/ModuleImport.zig");
1313
pub const StringLiteral = @import("base/StringLiteral.zig");
1414
pub const RegionInfo = @import("base/RegionInfo.zig");
1515
pub const Scratch = @import("base/Scratch.zig").Scratch;
16+
pub const parallel = @import("base/parallel.zig");
1617

1718
pub const ModuleWork = module_work.ModuleWork;
1819
pub const ModuleWorkIdx = module_work.ModuleWorkIdx;

src/base/parallel.zig

Lines changed: 242 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,242 @@
1+
const std = @import("std");
2+
const Allocator = std.mem.Allocator;
3+
const Thread = std.Thread;
4+
5+
/// Hard upper bound on the number of worker threads a single
/// `processParallel` call will start. It is also the length of the
/// fixed-size array that holds the spawned thread handles.
const MAX_THREADS = 128;
7+
8+
/// Options controlling how `processParallel` / `parallelMap` distribute work.
pub const ParallelConfig = struct {
    /// Upper limit on the number of worker threads.
    /// The default of 0 means "use the detected CPU count".
    max_threads: usize = 0,

    /// When true, all work runs on the calling thread and no threads are
    /// spawned.
    single_threaded: bool = false,
};
16+
17+
/// Outcome tally returned by `processParallel`.
pub const ParallelResult = struct {
    /// Number of work items whose worker call returned `true`.
    success: usize,
    /// Number of work items whose worker call returned `false`.
    failed: usize,
};
22+
23+
/// Builds the function-pointer type used for per-item callbacks.
///
/// A callback receives the allocator handed to the parallel driver and one
/// work item of type `T`, and returns `true` on success or `false` on
/// failure.
pub fn WorkerFn(comptime T: type) type {
    const Signature = fn (allocator: Allocator, work_item: T) bool;
    return *const Signature;
}
28+
29+
/// Builds the per-worker state type passed to `workerThread`.
///
/// Every worker gets its own copy of this struct. The `index`,
/// `success_count` and `failed_count` fields are pointers, so the workers
/// of one run share the same three atomic counters.
fn WorkerContext(comptime T: type) type {
    const Counter = std.atomic.Value(usize);

    return struct {
        /// Allocator forwarded to every `worker_fn` call.
        allocator: Allocator,
        /// The full list of items being processed.
        work_items: []const T,
        /// Shared cursor: position of the next item to hand out.
        index: *Counter,
        /// Shared count of items whose callback returned `true`.
        success_count: *Counter,
        /// Shared count of items whose callback returned `false`.
        failed_count: *Counter,
        /// Callback invoked once per item.
        worker_fn: WorkerFn(T),
        /// Ordinal of this worker within the run.
        thread_id: usize,
    };
}
41+
42+
/// Worker loop shared by the single-threaded and multi-threaded paths.
///
/// Repeatedly claims the next unprocessed position from the shared atomic
/// cursor `ctx.index` and runs `ctx.worker_fn` on that item, stopping once
/// the claimed position is at or past the end of `ctx.work_items`.
/// A `true` result increments `ctx.success_count`; a `false` result
/// increments `ctx.failed_count`. `ctx.thread_id` is not read here.
fn workerThread(comptime T: type, ctx: WorkerContext(T)) void {
    const total = ctx.work_items.len;

    var claimed = ctx.index.fetchAdd(1, .seq_cst);
    while (claimed < total) : (claimed = ctx.index.fetchAdd(1, .seq_cst)) {
        const ok = ctx.worker_fn(ctx.allocator, ctx.work_items[claimed]);

        // Pick the counter matching the outcome, then bump it once.
        const tally = if (ok) ctx.success_count else ctx.failed_count;
        _ = tally.fetchAdd(1, .seq_cst);
    }
}
58+
59+
/// Run `worker_fn` once for every element of `work_items`, spreading the
/// calls across threads, and report how many calls returned `true` and how
/// many returned `false`.
///
/// Items are handed out through one shared atomic counter, so each item is
/// processed exactly once but in no guaranteed order. The same `allocator`
/// value is passed to every `worker_fn` call, including calls that run
/// concurrently on different threads.
///
/// Thread count: `config.max_threads` (or the detected CPU count when it is
/// 0, falling back to 1 if detection fails), capped at `MAX_THREADS` and at
/// `work_items.len`. With `config.single_threaded` set, everything runs on
/// the calling thread and no threads are spawned. An empty `work_items`
/// returns `{ .success = 0, .failed = 0 }` immediately.
///
/// Errors: returns the `std.Thread.spawn` error if a worker thread cannot be
/// started. In that case no further items are handed out and every thread
/// that was already started is joined before the error is returned; some
/// items may already have been processed by then.
///
/// Example usage:
/// ```
/// const MyWorkItem = struct { path: []const u8 };
///
/// fn processItem(allocator: Allocator, item: MyWorkItem) bool {
///     // Process the work item
///     std.log.info("processing {s}", .{item.path});
///     return true; // or false on failure
/// }
///
/// const result = try processParallel(MyWorkItem, allocator, work_items, processItem, .{});
/// ```
pub fn processParallel(
    comptime T: type,
    allocator: Allocator,
    work_items: []const T,
    worker_fn: WorkerFn(T),
    config: ParallelConfig,
) !ParallelResult {
    if (work_items.len == 0) {
        return ParallelResult{ .success = 0, .failed = 0 };
    }

    var success_count = std.atomic.Value(usize).init(0);
    var failed_count = std.atomic.Value(usize).init(0);

    if (config.single_threaded) {
        // Process everything on the calling thread.
        var index = std.atomic.Value(usize).init(0);
        const ctx = WorkerContext(T){
            .allocator = allocator,
            .work_items = work_items,
            .index = &index,
            .success_count = &success_count,
            .failed_count = &failed_count,
            .worker_fn = worker_fn,
            .thread_id = 0,
        };
        workerThread(T, ctx);
    } else {
        // Process in parallel; workers pull items from a shared counter.
        const cpu_count = std.Thread.getCpuCount() catch 1;
        const max_threads = if (config.max_threads == 0) cpu_count else config.max_threads;
        const thread_count = @min(@min(max_threads, MAX_THREADS), work_items.len);

        var index = std.atomic.Value(usize).init(0);
        var threads: [MAX_THREADS]Thread = undefined;

        // Start worker threads.
        for (0..thread_count) |i| {
            const ctx = WorkerContext(T){
                .allocator = allocator,
                .work_items = work_items,
                .index = &index,
                .success_count = &success_count,
                .failed_count = &failed_count,
                .worker_fn = worker_fn,
                .thread_id = i,
            };
            threads[i] = Thread.spawn(.{}, workerThread, .{ T, ctx }) catch |err| {
                // Threads started so far hold pointers to `index`,
                // `success_count` and `failed_count`, which live in this
                // stack frame. Returning without joining them would leave
                // them running on dangling pointers. Move the cursor past
                // the end so they stop after their current item, wait for
                // them, and only then propagate the error.
                index.store(work_items.len, .seq_cst);
                for (threads[0..i]) |started| {
                    started.join();
                }
                return err;
            };
        }

        // Wait for all threads to complete.
        for (threads[0..thread_count]) |thread| {
            thread.join();
        }
    }

    return ParallelResult{
        .success = success_count.load(.seq_cst),
        .failed = failed_count.load(.seq_cst),
    };
}
139+
140+
/// Apply `map_fn` to every element of `inputs`, storing the result for
/// `inputs[i]` in `outputs[i]`. The calls are distributed by
/// `processParallel` according to `config`.
///
/// Returns `error.LengthMismatch` when `inputs` and `outputs` differ in
/// length, `error.SomeWorkItemsFailed` when `processParallel` reports any
/// failed item, and otherwise propagates allocation errors and errors from
/// `processParallel`. A temporary job list is allocated from `allocator`
/// and freed before returning.
pub fn parallelMap(
    comptime T: type,
    comptime R: type,
    allocator: Allocator,
    inputs: []const T,
    outputs: []R,
    map_fn: fn (T) R,
    config: ParallelConfig,
) !void {
    if (inputs.len != outputs.len) return error.LengthMismatch;

    // One job per element: the input value plus where to write its result.
    const Job = struct { input: T, output_ptr: *R };

    // Adapts `map_fn` to the bool-returning callback shape that
    // `processParallel` expects; the mapping itself never reports failure.
    const Adapter = struct {
        fn run(_: Allocator, job: Job) bool {
            job.output_ptr.* = map_fn(job.input);
            return true;
        }
    };

    const jobs = try allocator.alloc(Job, inputs.len);
    defer allocator.free(jobs);

    for (jobs, inputs, outputs) |*job, input, *output| {
        job.* = .{ .input = input, .output_ptr = output };
    }

    const result = try processParallel(Job, allocator, jobs, Adapter.run, config);
    if (result.failed != 0) return error.SomeWorkItemsFailed;
}
184+
185+
test "processParallel basic functionality" {
    const Item = struct { value: i32 };

    const Checker = struct {
        // Reports success for non-negative values and failure otherwise.
        fn isNonNegative(_: Allocator, item: Item) bool {
            return item.value >= 0;
        }
    };

    // Three non-negative values and one negative value (the expected failure).
    const items = [_]Item{
        .{ .value = 1 },
        .{ .value = 2 },
        .{ .value = -1 },
        .{ .value = 4 },
    };

    const counts = try processParallel(
        Item,
        std.testing.allocator,
        &items,
        Checker.isNonNegative,
        .{ .single_threaded = true },
    );

    try std.testing.expectEqual(@as(usize, 3), counts.success);
    try std.testing.expectEqual(@as(usize, 1), counts.failed);
}
216+
217+
test "parallelMap basic functionality" {
    const Math = struct {
        fn square(x: i32) i32 {
            return x * x;
        }
    };

    const inputs = [_]i32{ 1, 2, 3, 4, 5 };
    const expected = [_]i32{ 1, 4, 9, 16, 25 };

    // Filled in by parallelMap; every slot is written before it is read.
    var outputs: [inputs.len]i32 = undefined;

    try parallelMap(
        i32,
        i32,
        std.testing.allocator,
        &inputs,
        &outputs,
        Math.square,
        .{ .single_threaded = true },
    );

    try std.testing.expectEqualSlices(i32, &expected, &outputs);
}

0 commit comments

Comments
 (0)