Skip to content

Commit 9e11d1f

Browse files
committed
V3 Barrier primitive: access callback return value, improve docs.
1 parent 3a6b718 commit 9e11d1f

File tree

3 files changed

+73
-18
lines changed

3 files changed

+73
-18
lines changed

v3/docs/TUTORIAL.md

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -927,18 +927,25 @@ Constructor.
927927
Mandatory arg:
928928
* `participants` The number of coros which will use the barrier.
929929
Optional args:
930-
* `func` Callback to run. Default `None`.
930+
* `func` Callback or coroutine to run. Default `None`.
931931
* `args` Tuple of args for the callback. Default `()`.
932932

933933
Public synchronous methods:
934934
* `busy` No args. Returns `True` if at least one coro is waiting on the
935935
barrier, or if at least one non-waiting coro has not triggered it.
936936
* `trigger` No args. The barrier records that the coro has passed the critical
937937
point. Returns "immediately".
938+
* `result` No args. If a callback was provided, returns the return value from
939+
the callback. If a coro, returns the `Task` instance. See below.
938940

939941
The callback can be a function or a coro. Typically a function will be used; it
940942
must run to completion beore the barrier is released. A coro will be promoted
941-
to a `Task` and run asynchronously.
943+
to a `Task` and run asynchronously. The `Task` may be retrieved (e.g. for
944+
cancellation) using the `result` method.
945+
946+
If a coro waits on a barrier, it should issue an `await` prior to accessing the
947+
`result` method. To guarantee that the callback has run it is necessary to wait
948+
until all participant coros have passed the barrier.
942949

943950
Participant coros issue `await my_barrier` whereupon execution pauses until all
944951
other participants are also waiting on it. At this point any callback will run

v3/primitives/barrier.py

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,29 +27,31 @@ def __init__(self, participants, func=None, args=()):
2727
self._func = func
2828
self._args = args
2929
self._reset(True)
30+
self._res = None
3031

3132
def __await__(self):
32-
self._update()
33-
if self._at_limit(): # All other threads are also at limit
34-
if self._func is not None:
35-
launch(self._func, self._args)
36-
self._reset(not self._down) # Toggle direction to release others
33+
if self.trigger():
3734
return
3835

3936
direction = self._down
40-
while True: # Wait until last waiting thread changes the direction
37+
while True: # Wait until last waiting task changes the direction
4138
if direction != self._down:
4239
return
4340
await asyncio.sleep_ms(0)
4441

4542
__iter__ = __await__
4643

44+
def result(self):
45+
return self._res
46+
4747
def trigger(self):
4848
self._update()
49-
if self._at_limit(): # All other threads are also at limit
49+
if self._at_limit(): # All other tasks are also at limit
5050
if self._func is not None:
51-
launch(self._func, self._args)
51+
self._res = launch(self._func, self._args)
5252
self._reset(not self._down) # Toggle direction to release others
53+
return True
54+
return False
5355

5456
def _reset(self, down):
5557
self._down = down

v3/primitives/tests/asyntest.py

Lines changed: 54 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,12 @@ def print_tests():
2424
test(0) Print this list.
2525
test(1) Test message acknowledge.
2626
test(2) Test Messge and Lock objects.
27-
test(3) Test the Barrier class.
28-
test(4) Test Semaphore
29-
test(5) Test BoundedSemaphore.
30-
test(6) Test the Condition class.
31-
test(7) Test the Queue class.
27+
test(3) Test the Barrier class with callback.
28+
test(4) Test the Barrier class with coroutine.
29+
test(5) Test Semaphore
30+
test(6) Test BoundedSemaphore.
31+
test(7) Test the Condition class.
32+
test(8) Test the Queue class.
3233
'''
3334
print('\x1b[32m')
3435
print(st)
@@ -186,6 +187,49 @@ def barrier_test():
186187
asyncio.create_task(report(barrier))
187188
asyncio.run(killer(2))
188189

190+
# ************ Barrier test 1 ************
191+
192+
async def my_coro(text):
193+
try:
194+
await asyncio.sleep_ms(0)
195+
while True:
196+
await asyncio.sleep(1)
197+
print(text)
198+
except asyncio.CancelledError:
199+
print('my_coro was cancelled.')
200+
201+
async def report1(barrier, x):
202+
await asyncio.sleep(x)
203+
print('report instance', x, 'waiting')
204+
await barrier
205+
print('report instance', x, 'done')
206+
207+
async def bart():
208+
barrier = Barrier(4, my_coro, ('my_coro running',))
209+
for x in range(3):
210+
asyncio.create_task(report1(barrier, x))
211+
await barrier
212+
# Must yield before reading result(). Here we wait long enough for
213+
await asyncio.sleep_ms(1500) # coro to print
214+
barrier.result().cancel()
215+
await asyncio.sleep(2)
216+
217+
def barrier_test1():
218+
printexp('''Running (runtime = 5s):
219+
report instance 0 waiting
220+
report instance 1 waiting
221+
report instance 2 waiting
222+
report instance 2 done
223+
report instance 1 done
224+
report instance 0 done
225+
my_coro running
226+
my_coro was cancelled.
227+
228+
Exact report instance done sequence may vary, but 3 instances should report
229+
done before my_coro runs.
230+
''', 5)
231+
asyncio.run(bart())
232+
189233
# ************ Semaphore test ************
190234

191235
async def run_sema(n, sema, barrier):
@@ -373,12 +417,14 @@ def test(n):
373417
elif n == 3:
374418
barrier_test() # Test the Barrier class.
375419
elif n == 4:
376-
semaphore_test(False) # Test Semaphore
420+
barrier_test1() # Test the Barrier class.
377421
elif n == 5:
378-
semaphore_test(True) # Test BoundedSemaphore.
422+
semaphore_test(False) # Test Semaphore
379423
elif n == 6:
380-
condition_test() # Test the Condition class.
424+
semaphore_test(True) # Test BoundedSemaphore.
381425
elif n == 7:
426+
condition_test() # Test the Condition class.
427+
elif n == 8:
382428
queue_test() # Test the Queue class.
383429
except KeyboardInterrupt:
384430
print('Interrupted')

0 commit comments

Comments
 (0)