@@ -4,41 +4,72 @@ module Nanoc::Int::Compiler::Phases
4
4
class Write < Abstract
5
5
include Nanoc ::Int ::ContractsSupport
6
6
7
+ class Worker
8
+ def initialize ( queue :, snapshot_repo :)
9
+ @queue = queue
10
+ @snapshot_repo = snapshot_repo
11
+ end
12
+
13
+ def start
14
+ @thread = Thread . new do
15
+ Thread . current . abort_on_exception = true
16
+ Thread . current . priority = -1 # schedule I/O work ASAP
17
+
18
+ writer = Nanoc ::Int ::ItemRepWriter . new
19
+
20
+ while rep = @queue . pop # rubocop:disable Lint/AssignmentInCondition
21
+ writer . write_all ( rep , @snapshot_repo )
22
+ end
23
+ end
24
+ end
25
+
26
+ def join
27
+ @thread . join
28
+ end
29
+ end
30
+
31
+ class WorkerPool
32
+ def initialize ( queue :, size :, snapshot_repo :)
33
+ @workers = Array . new ( size ) { Worker . new ( queue : queue , snapshot_repo : snapshot_repo ) }
34
+ end
35
+
36
+ def start
37
+ @workers . each ( &:start )
38
+ end
39
+
40
+ def join
41
+ @workers . each ( &:join )
42
+ end
43
+ end
44
+
45
+ QUEUE_SIZE = 1000
46
+ WORKER_POOL_SIZE = 5
47
+
7
48
def initialize ( snapshot_repo :, wrapped :)
8
49
super ( wrapped : wrapped )
9
50
10
51
@snapshot_repo = snapshot_repo
11
52
12
- @queue_to_write = SizedQueue . new ( 1000 )
53
+ @queue = SizedQueue . new ( QUEUE_SIZE )
54
+ @worker_pool = WorkerPool . new ( queue : @queue , size : WORKER_POOL_SIZE , snapshot_repo : @snapshot_repo )
13
55
end
14
56
15
57
def start
16
58
super
17
-
18
- @thread = Thread . new do
19
- Thread . current . abort_on_exception = true
20
- Thread . current . priority = -1 # schedule I/O work ASAP
21
-
22
- writer = Nanoc ::Int ::ItemRepWriter . new
23
-
24
- while rep = @queue_to_write . pop # rubocop:disable Lint/AssignmentInCondition
25
- writer . write_all ( rep , @snapshot_repo )
26
- end
27
- end
59
+ @worker_pool . start
28
60
end
29
61
30
62
def stop
31
63
super
32
-
33
- @queue_to_write . close
34
- @thread . join
64
+ @queue . close
65
+ @worker_pool . join
35
66
end
36
67
37
68
contract Nanoc ::Int ::ItemRep , C ::KeywordArgs [ is_outdated : C ::Bool ] , C ::Func [ C ::None => C ::Any ] => C ::Any
38
69
def run ( rep , is_outdated :) # rubocop:disable Lint/UnusedMethodArgument
39
70
yield
40
71
41
- @queue_to_write << rep
72
+ @queue << rep
42
73
43
74
Nanoc ::Int ::NotificationCenter . post ( :rep_write_enqueued , rep )
44
75
end
0 commit comments