Skip to content

Commit 2ac972d

Browse files
committed
V3 primitives replace polling with Event.
1 parent 53c368d commit 2ac972d

File tree

4 files changed

+35
-29
lines changed

4 files changed

+35
-29
lines changed

v3/docs/TUTORIAL.md

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -500,8 +500,8 @@ target. A primitive is loaded by issuing (for example):
500500
from primitives.semaphore import Semaphore, BoundedSemaphore
501501
from primitives.queue import Queue
502502
```
503-
When `uasyncio` acquires an official version (which will be more efficient) the
504-
invocation lines alone should be changed:
503+
When `uasyncio` acquires official versions of the CPython primitives the
504+
invocation lines alone should be changed. e.g. :
505505
```python
506506
from uasyncio import Semaphore, BoundedSemaphore
507507
from uasyncio import Queue
@@ -848,7 +848,7 @@ asyncio.run(queue_go(4))
848848

849849
## 3.6 Message
850850

851-
This is an unofficial primitive and has no analog in CPython asyncio.
851+
This is an unofficial primitive and has no counterpart in CPython asyncio.
852852

853853
This is a minor adaptation of the `Event` class. It provides the following:
854854
* `.set()` has an optional data payload.
@@ -910,8 +910,8 @@ Secondly it can allow a task to pause until one or more other tasks have
910910
terminated or passed a particular point. For example an application might want
911911
to shut down various peripherals before starting a sleep period. The task
912912
wanting to sleep initiates several shut down tasks and waits until they have
913-
triggered the barrier to indicate completion. This use case may be better
914-
served by `gather`.
913+
triggered the barrier to indicate completion. This use case may also be served
914+
by `gather`.
915915

916916
The key difference between `Barrier` and `gather` is symmetry: `gather` is
917917
asymmetrical. One task owns the `gather` and awaits completion of a set of
@@ -921,9 +921,8 @@ the `while True:` constructs common in firmware applications. Use of `gather`
921921
would imply instantiating a set of tasks on every pass of the loop.
922922

923923
`gather` provides access to return values; irrelevant to `Barrier` because
924-
passing a barrier does not imply return.
925-
926-
Currently `gather` is more efficient.
924+
passing a barrier does not imply return. `Barrier` now has an efficient
925+
implementation using `Event` to suspend waiting tasks.
927926

928927
Constructor.
929928
Mandatory arg:

v3/primitives/barrier.py

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
# Copyright (c) 2018-2020 Peter Hinch
33
# Released under the MIT License (MIT) - see LICENSE file
44

5+
# Now uses Event rather than polling.
6+
57
try:
68
import uasyncio as asyncio
79
except ImportError:
@@ -14,20 +16,14 @@
1416
# At that point the callback is executed. Then the barrier is 'opened' and
1517
# execution of all participants resumes.
1618

17-
# The nowait arg is to support task cancellation. It enables usage where one or
18-
# more coros can register that they have reached the barrier without waiting
19-
# for it. Any coros waiting normally on the barrier will pause until all
20-
# non-waiting coros have passed the barrier and all waiting ones have reached
21-
# it. The use of nowait promotes efficiency by enabling tasks which have been
22-
# cancelled to leave the task queue as soon as possible.
23-
2419
class Barrier():
2520
def __init__(self, participants, func=None, args=()):
2621
self._participants = participants
2722
self._func = func
2823
self._args = args
2924
self._reset(True)
3025
self._res = None
26+
self._evt = asyncio.Event()
3127

3228
def __await__(self):
3329
if self.trigger():
@@ -37,15 +33,19 @@ def __await__(self):
3733
while True: # Wait until last waiting task changes the direction
3834
if direction != self._down:
3935
return
40-
await asyncio.sleep_ms(0)
36+
await self._evt.wait()
37+
self._evt.clear()
4138

4239
__iter__ = __await__
4340

4441
def result(self):
4542
return self._res
4643

4744
def trigger(self):
48-
self._update()
45+
self._count += -1 if self._down else 1
46+
if self._count < 0 or self._count > self._participants:
47+
raise ValueError('Too many tasks accessing Barrier')
48+
self._evt.set()
4949
if self._at_limit(): # All other tasks are also at limit
5050
if self._func is not None:
5151
self._res = launch(self._func, self._args)
@@ -67,8 +67,3 @@ def busy(self):
6767
def _at_limit(self): # Has count reached up or down limit?
6868
limit = 0 if self._down else self._participants
6969
return self._count == limit
70-
71-
def _update(self):
72-
self._count += -1 if self._down else 1
73-
if self._count < 0 or self._count > self._participants:
74-
raise ValueError('Too many tasks accessing Barrier')

v3/primitives/queue.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,18 @@ class Queue:
2323
def __init__(self, maxsize=0):
2424
self.maxsize = maxsize
2525
self._queue = []
26+
self._evput = asyncio.Event() # Triggered by put, tested by get
27+
self._evget = asyncio.Event() # Triggered by get, tested by put
2628

2729
def _get(self):
30+
self._evget.set()
2831
return self._queue.pop(0)
2932

3033
async def get(self): # Usage: item = await queue.get()
31-
while self.empty():
34+
if self.empty():
3235
# Queue is empty, put the calling Task on the waiting queue
33-
await asyncio.sleep_ms(0)
36+
await self._evput.wait()
37+
self._evput.clear()
3438
return self._get()
3539

3640
def get_nowait(self): # Remove and return an item from the queue.
@@ -40,12 +44,14 @@ def get_nowait(self): # Remove and return an item from the queue.
4044
return self._get()
4145

4246
def _put(self, val):
47+
self._evput.set()
4348
self._queue.append(val)
4449

4550
async def put(self, val): # Usage: await queue.put(item)
46-
while self.qsize() >= self.maxsize and self.maxsize:
51+
if self.qsize() >= self.maxsize and self.maxsize:
4752
# Queue full
48-
await asyncio.sleep_ms(0)
53+
await self._evget.wait()
54+
self._evget.clear()
4955
# Task(s) waiting to get from queue, schedule first Task
5056
self._put(val)
5157

v3/primitives/semaphore.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
class Semaphore():
1414
def __init__(self, value=1):
1515
self._count = value
16+
self._event = asyncio.Event()
1617

1718
async def __aenter__(self):
1819
await self.acquire()
@@ -23,11 +24,16 @@ async def __aexit__(self, *args):
2324
await asyncio.sleep(0)
2425

2526
async def acquire(self):
26-
while self._count == 0:
27-
await asyncio.sleep_ms(0)
27+
self._event.clear()
28+
while self._count == 0: # Multiple tasks may be waiting for
29+
await self._event.wait() # a release
30+
self._event.clear()
31+
# When we yield, another task may succeed. In this case
32+
await asyncio.sleep(0) # the loop repeats
2833
self._count -= 1
2934

3035
def release(self):
36+
self._event.set()
3137
self._count += 1
3238

3339
class BoundedSemaphore(Semaphore):
@@ -37,6 +43,6 @@ def __init__(self, value=1):
3743

3844
def release(self):
3945
if self._count < self._initial_value:
40-
self._count += 1
46+
super().release()
4147
else:
4248
raise ValueError('Semaphore released more than acquired')

0 commit comments

Comments
 (0)