Skip to content

Commit 1a6b219

Browse files
committed
Add ringbuf_queue primiive.
1 parent 524ccae commit 1a6b219

File tree

5 files changed

+214
-13
lines changed

5 files changed

+214
-13
lines changed

v3/docs/EVENTS.md

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ This document assumes familiarity with `uasyncio`. See [official docs](http://do
2525
6.2 [EButton](./EVENTS.md#62-ebutton) Debounced pushbutton with double and long press events
2626
     6.2.1 [The suppress constructor argument](./EVENTS.md#621-the-suppress-constructor-argument)
2727
     6.2.2 [The sense constructor argument](./EVENTS.md#622-the-sense-constructor-argument)
28+
7. [Ringbuf queue](./EVENTS.md#7-ringbuf-queue) A MicroPython optimised queue primitive.
2829
[Appendix 1 Polling](./EVENTS.md#100-appendix-1-polling)
2930

3031
# 1. An alternative to callbacks in uasyncio code
@@ -478,6 +479,52 @@ determine whether the button is closed or open.
478479

479480
###### [Contents](./EVENTS.md#0-contents)
480481

482+
# 7. Ringbuf Queue
483+
484+
The API of the `Queue` aims for CPython compatibility. This is at some cost to
485+
efficiency. As the name suggests, the `RingbufQueue` class uses a pre-allocated
486+
circular buffer which may be of any mutable type supporting the buffer protocol
487+
e.g. `list`, `array` or `bytearray`.
488+
489+
Attributes of `RingbufQueue`:
490+
1. It is of fixed size, `Queue` can grow to arbitrary size.
491+
2. It uses pre-allocated buffers of various types (`Queue` uses a `list`).
492+
3. It is an asynchronous iterator allowing retrieval with `async for`.
493+
4. It has an "overwrite oldest data" synchronous write mode.
494+
495+
Constructor mandatory arg:
496+
* `buf` Buffer for the queue, e.g. list `[0 for _ in range(20)]` or array. A
497+
buffer of size `N` can hold a maximum of `N-1` items.
498+
499+
Synchronous methods (immediate return):
500+
* `qsize` No arg. Returns the number of items in the queue.
501+
* `empty` No arg. Returns `True` if the queue is empty.
502+
* `full` No arg. Returns `True` if the queue is full.
503+
* `get_nowait` No arg. Returns an object from the queue. Raises an exception
504+
if the queue is empty.
505+
* `put_nowait` Arg: the object to put on the queue. Raises an exception if the
506+
queue is full. If the calling code ignores the exception the oldest item in
507+
the queue will be overwritten. In some applications this can be of use.
508+
509+
Asynchronous methods:
510+
* `put` Arg: the object to put on the queue. If the queue is full, it will
511+
block until space is available.
512+
513+
Retrieving items from the queue:
514+
515+
The `RingbufQueue` is an asynchronous iterator. Results are retrieved using
516+
`async for`:
517+
```python
518+
async def handle_queued_data(q):
519+
async for obj in q:
520+
await asyncio.sleep(0) # See below
521+
# Process obj
522+
```
523+
The `sleep` is necessary if you have multiple tasks waiting on the queue,
524+
otherwise one task hogs all the data.
525+
526+
###### [Contents](./EVENTS.md#0-contents)
527+
481528
# 100 Appendix 1 Polling
482529

483530
The primitives or drivers referenced here do not use polling with the following

v3/docs/TUTORIAL.md

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -947,8 +947,10 @@ is raised.
947947

948948
## 3.5 Queue
949949

950-
This is currently an unofficial implementation. Its API is as per CPython
951-
asyncio.
950+
This is currently an unofficial implementation. Its API is a subset of that of
951+
CPython's `asyncio.Queue`. Like `asyncio.Queue` this class is not thread safe.
952+
A queue class optimised for MicroPython is presented in
953+
[Ringbuf queue](./EVENTS.md#7-ringbuf-queue).
952954

953955
The `Queue` class provides a means of synchronising producer and consumer
954956
tasks: the producer puts data items onto the queue with the consumer removing
@@ -1001,14 +1003,15 @@ async def queue_go(delay):
10011003

10021004
asyncio.run(queue_go(4))
10031005
```
1004-
In common with CPython's `asyncio.Queue` this class is not thread safe.
10051006

10061007
###### [Contents](./TUTORIAL.md#contents)
10071008

10081009
## 3.6 ThreadSafeFlag
10091010

10101011
This requires firmware V1.15 or later.
1011-
See also [Interfacing uasyncio to interrupts](./INTERRUPTS.md).
1012+
See also [Interfacing uasyncio to interrupts](./INTERRUPTS.md). Because of
1013+
[this issue](https://github.com/micropython/micropython/issues/7965) the
1014+
`ThreadSafeFlag` class does not work under the Unix build.
10121015

10131016
This official class provides an efficient means of synchronising a task with a
10141017
truly asynchronous event such as a hardware interrupt service routine or code
@@ -1313,7 +1316,9 @@ finally:
13131316

13141317
## 3.9 Message
13151318

1316-
This requires firmware V1.15 or later.
1319+
This requires firmware V1.15 or later. Note that because of
1320+
[this issue](https://github.com/micropython/micropython/issues/7965) the
1321+
`Message` class does not work under the Unix build.
13171322

13181323
This is an unofficial primitive with no counterpart in CPython asyncio. It uses
13191324
[ThreadSafeFlag](./TUTORIAL.md#36-threadsafeflag) to provide an object similar

v3/primitives/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ def _handle_exception(loop, context):
4747
"WaitAny": "events",
4848
"ESwitch": "events",
4949
"EButton": "events",
50+
"RingbufQueue": "ringbuf_queue",
5051
}
5152

5253
# Copied from uasyncio.__init__.py

v3/primitives/ringbuf_queue.py

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
# ringbuf_queue.py Provides RingbufQueue class
2+
# API differs from CPython
3+
# Uses pre-allocated ring buffer: can use list or array
4+
# Asynchronous iterator allowing consumer to use async for
5+
# put_nowait QueueFull exception can be ignored allowing oldest data to be discarded.
6+
7+
import uasyncio as asyncio
8+
9+
# Exception raised by get_nowait().
10+
class QueueEmpty(Exception):
11+
pass
12+
13+
# Exception raised by put_nowait().
14+
class QueueFull(Exception):
15+
pass
16+
17+
class RingbufQueue: # MicroPython optimised
18+
def __init__(self, buf):
19+
self._q = buf
20+
self._size = len(buf)
21+
self._wi = 0
22+
self._ri = 0
23+
self._evput = asyncio.Event() # Triggered by put, tested by get
24+
self._evget = asyncio.Event() # Triggered by get, tested by put
25+
26+
def full(self):
27+
return ((self._wi + 1) % self._size) == self._ri
28+
29+
def empty(self):
30+
return self._ri == self._wi
31+
32+
def qsize(self):
33+
return (self._wi - self._ri) % self._size
34+
35+
def get_nowait(self): # Remove and return an item from the queue.
36+
# Return an item if one is immediately available, else raise QueueEmpty.
37+
if self.empty():
38+
raise QueueEmpty()
39+
r = self._q[self._ri]
40+
self._ri = (self._ri + 1) % self._size
41+
return r
42+
43+
def put_nowait(self, v):
44+
self._q[self._wi] = v
45+
self._evput.set() # Schedule any tasks waiting on get
46+
self._evput.clear()
47+
self._wi = (self._wi + 1) % self._size
48+
if self._wi == self._ri: # Would indicate empty
49+
self._ri = (self._ri + 1) % self._size # Discard a message
50+
raise QueueFull # Caller can ignore if overwrites are OK
51+
52+
async def put(self, val): # Usage: await queue.put(item)
53+
while self.full(): # Queue full
54+
await self._evget.wait() # May be >1 task waiting on ._evget
55+
# Task(s) waiting to get from queue, schedule first Task
56+
self.put_nowait(val)
57+
58+
def __aiter__(self):
59+
return self
60+
61+
async def __anext__(self):
62+
while self.empty(): # Empty. May be more than one task waiting on ._evput
63+
await self._evput.wait()
64+
r = self._q[self._ri]
65+
self._ri = (self._ri + 1) % self._size
66+
self._evget.set() # Schedule all tasks waiting on ._evget
67+
self._evget.clear()
68+
return r

v3/primitives/tests/asyntest.py

Lines changed: 88 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,10 @@
1313
import uasyncio as asyncio
1414
except ImportError:
1515
import asyncio
16+
import sys
17+
unix = "linux" in sys.implementation._machine
1618

17-
from primitives.message import Message
18-
from primitives.barrier import Barrier
19-
from primitives.semaphore import Semaphore, BoundedSemaphore
20-
from primitives.condition import Condition
19+
from primitives import Message, Barrier, Semaphore, BoundedSemaphore, Condition, Queue, RingbufQueue
2120

2221
def print_tests():
2322
st = '''Available functions:
@@ -30,6 +29,7 @@ def print_tests():
3029
test(6) Test BoundedSemaphore.
3130
test(7) Test the Condition class.
3231
test(8) Test the Queue class.
32+
test(9) Test the RingbufQueue class.
3333
'''
3434
print('\x1b[32m')
3535
print(st)
@@ -83,6 +83,9 @@ async def ack_coro(delay):
8383
print("Time to die...")
8484

8585
def ack_test():
86+
if unix:
87+
print("Message class is incompatible with Unix build.")
88+
return
8689
printexp('''message was set
8790
message_wait 1 got message with value 0
8891
message_wait 2 got message with value 0
@@ -142,6 +145,9 @@ async def run_message_test():
142145
print('Tasks complete')
143146

144147
def msg_test():
148+
if unix:
149+
print("Message class is incompatible with Unix build.")
150+
return
145151
printexp('''Test Lock class
146152
Test Message class
147153
waiting for message
@@ -389,8 +395,6 @@ def condition_test():
389395

390396
# ************ Queue test ************
391397

392-
from primitives.queue import Queue
393-
394398
async def slow_process():
395399
await asyncio.sleep(2)
396400
return 42
@@ -462,7 +466,7 @@ async def queue_go():
462466
getter(q)
463467
)
464468
print('Queue tests complete')
465-
print("I've seen starships burn off the shoulder of Orion...")
469+
print("I've seen attack ships burn off the shoulder of Orion...")
466470
print("Time to die...")
467471

468472
def queue_test():
@@ -476,12 +480,86 @@ def queue_test():
476480
Queue tests complete
477481
478482
479-
I've seen starships burn off the shoulder of Orion...
483+
I've seen attack ships burn off the shoulder of Orion...
480484
Time to die...
481485
482486
''', 20)
483487
asyncio.run(queue_go())
484488

489+
# ************ RingbufQueue test ************
490+
491+
async def qread(q, lst, twr):
492+
async for item in q:
493+
lst.append(item)
494+
await asyncio.sleep_ms(twr)
495+
496+
async def read(q, t, twr=0):
497+
lst = []
498+
try:
499+
await asyncio.wait_for(qread(q, lst, twr), t)
500+
except asyncio.TimeoutError:
501+
pass
502+
return lst
503+
504+
async def put_list(q, lst, twp=0):
505+
for item in lst:
506+
await q.put(item)
507+
await asyncio.sleep_ms(twp)
508+
509+
async def rbq_go():
510+
q = RingbufQueue([0 for _ in range(10)]) # 10 elements
511+
pl = [n for n in range(15)]
512+
print("Read waits on slow write.")
513+
asyncio.create_task(put_list(q, pl, 100))
514+
rl = await read(q, 2)
515+
assert pl == rl
516+
print('done')
517+
print("Write waits on slow read.")
518+
asyncio.create_task(put_list(q, pl))
519+
rl = await read(q, 2, 100)
520+
assert pl == rl
521+
print('done')
522+
print("Testing full, empty and qsize methods.")
523+
assert q.empty()
524+
assert q.qsize() == 0
525+
assert not q.full()
526+
await put_list(q, (1,2,3))
527+
assert not q.empty()
528+
assert q.qsize() == 3
529+
assert not q.full()
530+
print("Done")
531+
print("Testing put_nowait and overruns.")
532+
nfail = 0
533+
for x in range(4, 15):
534+
try:
535+
q.put_nowait(x)
536+
except:
537+
nfail += 1
538+
assert nfail == 5
539+
assert q.full()
540+
rl = await read(q, 2)
541+
assert rl == [6, 7, 8, 9, 10, 11, 12, 13, 14]
542+
print("Tests complete.")
543+
print("I've seen attack ships burn off the shoulder of Orion...")
544+
print("Time to die...")
545+
546+
def rbq_test():
547+
printexp('''Running (runtime = 6s):
548+
Read waits on slow write.
549+
done
550+
Write waits on slow read.
551+
done
552+
Testing full, empty and qsize methods.
553+
Done
554+
Testing put_nowait and overruns.
555+
Tests complete.
556+
I've seen attack ships burn off the shoulder of Orion...
557+
Time to die...
558+
559+
''', 20)
560+
asyncio.run(rbq_go())
561+
562+
# ************ ************
485563
def test(n):
486564
try:
487565
if n == 1:
@@ -500,6 +578,8 @@ def test(n):
500578
condition_test() # Test the Condition class.
501579
elif n == 8:
502580
queue_test() # Test the Queue class.
581+
elif n == 9:
582+
rbq_test() # Test the RingbufQueue class.
503583
except KeyboardInterrupt:
504584
print('Interrupted')
505585
finally:

0 commit comments

Comments
 (0)