Skip to content

Commit 4549dc4

Browse files
committed
Create threading directory and THREADING.md
1 parent 58d5a16 commit 4549dc4

File tree

6 files changed

+243
-79
lines changed

6 files changed

+243
-79
lines changed

v3/docs/EVENTS.md

Lines changed: 0 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -534,84 +534,6 @@ except IndexError:
534534
pass
535535
```
536536

537-
# 8. Threadsafe Queue
538-
539-
This queue is designed to interface between one `uasyncio` task and a single
540-
thread running in a different context. This can be an interrupt service routine
541-
(ISR), code running in a different thread or code on a different core.
542-
543-
Any Python object may be placed on a `ThreadSafeQueue`. If bi-directional
544-
communication is required between the two contexts, two `ThreadSafeQueue`
545-
instances are required.
546-
547-
Attributes of `ThreadSafeQueue`:
548-
1. It is of fixed size defined on instantiation.
549-
2. It uses pre-allocated buffers of various types (`Queue` uses a `list`).
550-
3. It is an asynchronous iterator allowing retrieval with `async for`.
551-
4. It provides synchronous "put" and "get" methods. If the queue becomes full
552-
(put) or empty (get), behaviour is user definable. The method either blocks or
553-
raises an `IndexError`.
554-
555-
Constructor mandatory arg:
556-
* `buf` Buffer for the queue, e.g. list `[0 for _ in range(20)]` or array. A
557-
buffer of size `N` can hold a maximum of `N-1` items.
558-
559-
Synchronous methods.
560-
* `qsize` No arg. Returns the number of items in the queue.
561-
* `empty` No arg. Returns `True` if the queue is empty.
562-
* `full` No arg. Returns `True` if the queue is full.
563-
* `get_sync` Arg `block=False`. Returns an object from the queue. Raises
564-
`IndexError` if the queue is empty, unless `block==True` in which case the
565-
method blocks until the `uasyncio` tasks put an item on the queue.
566-
* `put_sync` Args: the object to put on the queue, `block=False`. Raises
567-
`IndexError` if the queue is full unless `block==True` in which case the
568-
method blocks until the `uasyncio` tasks remove an item from the queue.
569-
570-
The blocking methods should not be used in the `uasyncio` context, because by
571-
blocking they will lock up the scheduler.
572-
573-
Asynchronous methods:
574-
* `put` Arg: the object to put on the queue. If the queue is full, it will
575-
block until space is available.
576-
577-
In use as a data consumer the `uasyncio` code will use `async for` to retrieve
578-
items from the queue. If it is a data provider it will use `put` to place
579-
objects on the queue.
580-
581-
Data consumer:
582-
```python
583-
async def handle_queued_data(q):
584-
async for obj in q:
585-
# Process obj
586-
```
587-
Data provider:
588-
```python
589-
async def feed_queue(q):
590-
while True:
591-
data = await data_source()
592-
await q.put(data)
593-
```
594-
The alternate thread will use synchronous methods.
595-
596-
Data provider (throw if full):
597-
```python
598-
while True:
599-
data = data_source()
600-
try:
601-
q.put_sync(data)
602-
except IndexError:
603-
# Queue is full
604-
```
605-
Data consumer (block while empty):
606-
```python
607-
while True:
608-
data = q.get(block=True) # May take a while if the uasyncio side is slow
609-
process(data) # Do something with it
610-
```
611-
Note that where the alternate thread is an ISR it is very bad practice to allow
612-
blocking. The application should be designed in such a way that the full/empty
613-
case does not occur.
614-
615537
###### [Contents](./EVENTS.md#0-contents)
616538

617539
# 100 Appendix 1 Polling

v3/docs/THREADING.md

Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
# Thread safe classes
2+
3+
These provide an interface between `uasyncio` tasks and code running in a
4+
different context. Supported contexts are:
5+
1. An interrupt service routine (ISR).
6+
2. Another thread running on the same core.
7+
3. Code running on a different core (currently only supported on RP2).
8+
9+
The first two cases are relatively straightforward because both contexts share
10+
a common bytecode interpreter and GIL. There is a guarantee that even a hard
11+
MicroPython (MP) ISR will not interrupt execution of a line of Python code.
12+
13+
This is not the case where the threads run on different cores, where there is
14+
no synchronisation between the streams of machine code. If the two threads
15+
concurrently modify a shared Python object, there is no guarantee that
16+
corruption will not occur.
17+
18+
# 2. Threadsafe Event
19+
20+
The `ThreadsafeFlag` has a limitation in that only a single task can wait on
21+
it. The `ThreadSafeEvent` overcomes this. It is subclassed from `Event` and
22+
presents the same interface. The `set` method may be called from an ISR or from
23+
code running on another core. Any number of tasks may wait on it.
24+
25+
The following Pyboard-specific code demos its use in a hard ISR:
26+
```python
27+
import uasyncio as asyncio
28+
from threadsafe import ThreadSafeEvent
29+
from pyb import Timer
30+
31+
async def waiter(n, evt):
32+
try:
33+
await evt.wait()
34+
print(f"Waiter {n} got event")
35+
except asyncio.CancelledError:
36+
print(f"Waiter {n} cancelled")
37+
38+
async def can(task):
39+
await asyncio.sleep_ms(100)
40+
task.cancel()
41+
42+
async def main():
43+
evt = ThreadSafeEvent()
44+
tim = Timer(4, freq=1, callback=lambda t: evt.set())
45+
nt = 0
46+
while True:
47+
tasks = [asyncio.create_task(waiter(n + 1, evt)) for n in range(4)]
48+
asyncio.create_task(can(tasks[nt]))
49+
await asyncio.gather(*tasks, return_exceptions=True)
50+
evt.clear()
51+
print("Cleared event")
52+
nt = (nt + 1) % 4
53+
54+
asyncio.run(main())
55+
```
56+
57+
# 3. Threadsafe Queue
58+
59+
This queue is designed to interface between one `uasyncio` task and a single
60+
thread running in a different context. This can be an interrupt service routine
61+
(ISR), code running in a different thread or code on a different core.
62+
63+
Any Python object may be placed on a `ThreadSafeQueue`. If bi-directional
64+
communication is required between the two contexts, two `ThreadSafeQueue`
65+
instances are required.
66+
67+
Attributes of `ThreadSafeQueue`:
68+
1. It is of fixed size defined on instantiation.
69+
2. It uses pre-allocated buffers of various types (`Queue` uses a `list`).
70+
3. It is an asynchronous iterator allowing retrieval with `async for`.
71+
4. It provides synchronous "put" and "get" methods. If the queue becomes full
72+
(put) or empty (get), behaviour is user definable. The method either blocks or
73+
raises an `IndexError`.
74+
75+
Constructor mandatory arg:
76+
* `buf` Buffer for the queue, e.g. list `[0 for _ in range(20)]` or array. A
77+
buffer of size `N` can hold a maximum of `N-1` items.
78+
79+
Synchronous methods.
80+
* `qsize` No arg. Returns the number of items in the queue.
81+
* `empty` No arg. Returns `True` if the queue is empty.
82+
* `full` No arg. Returns `True` if the queue is full.
83+
* `get_sync` Arg `block=False`. Returns an object from the queue. Raises
84+
`IndexError` if the queue is empty, unless `block==True` in which case the
85+
method blocks until the `uasyncio` tasks put an item on the queue.
86+
* `put_sync` Args: the object to put on the queue, `block=False`. Raises
87+
`IndexError` if the queue is full unless `block==True` in which case the
88+
method blocks until the `uasyncio` tasks remove an item from the queue.
89+
90+
See the note below re blocking methods.
91+
92+
Asynchronous methods:
93+
* `put` Arg: the object to put on the queue. If the queue is full, it will
94+
block until space is available.
95+
96+
In use as a data consumer the `uasyncio` code will use `async for` to retrieve
97+
items from the queue. If it is a data provider it will use `put` to place
98+
objects on the queue.
99+
100+
Data consumer:
101+
```python
102+
async def handle_queued_data(q):
103+
async for obj in q:
104+
# Process obj
105+
```
106+
Data provider:
107+
```python
108+
async def feed_queue(q):
109+
while True:
110+
data = await data_source()
111+
await q.put(data)
112+
```
113+
The alternate thread will use synchronous methods.
114+
115+
Data provider (throw if full):
116+
```python
117+
while True:
118+
data = data_source()
119+
try:
120+
q.put_sync(data)
121+
except IndexError:
122+
# Queue is full
123+
```
124+
Data consumer (block while empty):
125+
```python
126+
while True:
127+
data = q.get(block=True) # May take a while if the uasyncio side is slow
128+
process(data) # Do something with it
129+
```
130+
131+
## 3.1 Blocking
132+
133+
The synchronous `get_sync` and `put_sync` methods have blocking modes invoked
134+
by passing `block=True`. Blocking modes are intended to be used in a multi
135+
threaded context. They should not be invoked in a `uasyncio` task, because
136+
blocking locks up the scheduler. Nor should they be used in an ISR where
137+
blocking code can have unpredictable consequences.
138+
139+
These methods, called with `blocking=False`, produce an immediate return. To
140+
avoid an `IndexError` the user should check for full or empty status before
141+
calling.
142+
143+
## 3.2 A complete example
144+
145+
This demonstrates an echo server running on core 2. The `sender` task sends
146+
consecutive integers to the server, which echoes them back on a second queue.
147+
```python
148+
import uasyncio as asyncio
149+
from threadsafe import ThreadSafeQueue
150+
import _thread
151+
from time import sleep_ms
152+
153+
def core_2(getq, putq): # Run on core 2
154+
buf = []
155+
while True:
156+
while getq.qsize(): # Ensure no exception when queue is empty
157+
buf.append(getq.get_sync())
158+
for x in buf:
159+
putq.put_sync(x, block=True) # Wait if queue fills.
160+
buf.clear()
161+
sleep_ms(30)
162+
163+
async def sender(to_core2):
164+
x = 0
165+
while True:
166+
await to_core2.put(x := x + 1)
167+
168+
async def main():
169+
to_core2 = ThreadSafeQueue([0 for _ in range(10)])
170+
from_core2 = ThreadSafeQueue([0 for _ in range(10)])
171+
_thread.start_new_thread(core_2, (to_core2, from_core2))
172+
asyncio.create_task(sender(to_core2))
173+
n = 0
174+
async for x in from_core2:
175+
if not x % 1000:
176+
print(f"Received {x} queue items.")
177+
n += 1
178+
assert x == n
179+
180+
asyncio.run(main())
181+
```

v3/primitives/__init__.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@ def _handle_exception(loop, context):
4848
"ESwitch": "events",
4949
"EButton": "events",
5050
"RingbufQueue": "ringbuf_queue",
51-
"ThreadSafeQueue": "threadsafe_queue",
5251
}
5352

5453
# Copied from uasyncio.__init__.py

v3/threadsafe/__init__.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
# __init__.py Common functions for uasyncio threadsafe primitives
2+
3+
# Copyright (c) 2022 Peter Hinch
4+
# Released under the MIT License (MIT) - see LICENSE file
5+
6+
try:
7+
import uasyncio as asyncio
8+
except ImportError:
9+
import asyncio
10+
11+
_attrs = {
12+
"ThreadSafeEvent": "threadsafe_event",
13+
"ThreadSafeQueue": "threadsafe_queue",
14+
}
15+
16+
# Copied from uasyncio.__init__.py
17+
# Lazy loader, effectively does:
18+
# global attr
19+
# from .mod import attr
20+
def __getattr__(attr):
21+
mod = _attrs.get(attr, None)
22+
if mod is None:
23+
raise AttributeError(attr)
24+
value = getattr(__import__(mod, None, None, True, 1), attr)
25+
globals()[attr] = value
26+
return value

v3/threadsafe/threadsafe_event.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
# threadsafe_queue.py Provides ThreadsafeQueue class
2+
3+
# Copyright (c) 2022 Peter Hinch
4+
# Released under the MIT License (MIT) - see LICENSE file
5+
6+
import uasyncio as asyncio
7+
8+
9+
class ThreadSafeEvent(asyncio.Event):
10+
def __init__(self):
11+
super().__init__()
12+
self._waiting_on_tsf = False
13+
self._tsf = asyncio.ThreadSafeFlag()
14+
15+
def set(self):
16+
self._tsf.set()
17+
18+
async def _waiter(self):
19+
await self._tsf.wait()
20+
super().set()
21+
self._waiting_on_tsf = False
22+
23+
async def wait(self):
24+
if self._waiting_on_tsf == False:
25+
self._waiting_on_tsf = True
26+
await asyncio.sleep_ms(0)
27+
try:
28+
await self._tsf.wait()
29+
super().set()
30+
self._waiting_on_tsf = False
31+
except asyncio.CancelledError:
32+
asyncio.create_task(self._waiter())
33+
raise
34+
else:
35+
await super().wait()
36+
File renamed without changes.

0 commit comments

Comments
 (0)