Skip to content

Commit 197c2b5

Browse files
committed
Broker: add wildcard subscriptions.
1 parent cd56ee0 commit 197c2b5

File tree

4 files changed

+53
-17
lines changed

4 files changed

+53
-17
lines changed

v3/docs/DRIVERS.md

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,11 @@ MicroPython's `asyncio` when used in a microcontroller context.
2626
6.1 [Encoder class](./DRIVERS.md#61-encoder-class)
2727
7. [Ringbuf Queue](./DRIVERS.md#7-ringbuf-queue) A MicroPython optimised queue primitive.
2828
8. [Delay_ms class](./DRIVERS.md#8-delay_ms-class) A flexible retriggerable delay with callback or Event interface.
29-
9. [Message Broker](./DRIVERS.md#9-message-broker) A flexible means of messaging between
30-
tasks.
29+
9. [Message Broker](./DRIVERS.md#9-message-broker) A flexible means of messaging between tasks.
3130
9.1 [Further examples](./DRIVERS.md#91-further-examples)
3231
9.2 [User agents](./DRIVERS.md#92-user-agents) User defined Agent classes.
33-
9.3 [Notes](./DRIVERS.md#93-notes)
32+
9.3 [Wildcard subscriptions](./DRIVERS.md#93-wildcard-subscriptions)
33+
9.4 [Notes](./DRIVERS.md#9-notes)
3434
10. [Additional functions](./DRIVERS.md#10-additional-functions)
3535
10.1 [launch](./DRIVERS.md#101-launch) Run a coro or callback interchangeably.
3636
10.2 [set_global_exception](./DRIVERS.md#102-set_global_exception) Simplify debugging with a global exception handler.
@@ -1332,7 +1332,21 @@ async def main():
13321332

13331333
asyncio.run(main())
13341334
```
1335-
## 9.3 Notes
1335+
## 9.3 Wildcard subscriptions
1336+
1337+
In the case of publications whose topics are strings, a single call to
1338+
`.subscribe` can subscribe an `agent` to multiple topics. This is by wildcard
1339+
matching. By default exact matching is used, however this can be changed to use
1340+
regular expressions as in this code fragment:
1341+
```py
1342+
from primitives import Broker, RegExp
1343+
broker.subscribe(RegExp(".*_topic"), some_agent)
1344+
```
1345+
In this case `some_agent` would be triggered by publications to `foo_topic` or
1346+
`bar_topic` because the string `".*_topic"` matches these by the rules of
1347+
regular expressions.
1348+
1349+
## 9.4 Notes
13361350

13371351
#### The publish/subscribe model
13381352

v3/primitives/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ def _handle_exception(loop, context):
5555
"SwArray": "sw_array",
5656
"Broker": "broker",
5757
"Agent": "broker",
58+
"RegExp": "broker",
5859
}
5960

6061
# Copied from asyncio.__init__.py

v3/primitives/broker.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,21 @@
88

99
import asyncio
1010
from primitives import Queue, RingbufQueue, type_coro
11+
import re
1112

1213

1314
class Agent:
1415
pass
1516

1617

18+
class RegExp:
19+
def __init__(self, re_str):
20+
self.re = re.compile(re_str)
21+
22+
def matching(self, topic):
23+
return re.match(self.re, topic) is not None
24+
25+
1726
def _validate(a):
1827
return (
1928
isinstance(a, asyncio.Event)
@@ -50,7 +59,13 @@ def unsubscribe(self, topic, agent, *args):
5059
print(f"Unsubscribe topic {topic} fail: topic not subscribed.")
5160

5261
def publish(self, topic, message):
53-
agents = self.get(topic, [])
62+
agents = set() # Agents which are triggered by this topic
63+
if isinstance(topic, str): # Check regexps
64+
# Are any keys RegExp instances?
65+
for regexp in [k for k in self.keys() if isinstance(k, RegExp)]:
66+
if regexp.matching(topic):
67+
agents.update(self[regexp]) # Append matching agents
68+
agents.update(self.get(topic, [])) # Exact match
5469
for agent, args in agents:
5570
if isinstance(agent, asyncio.Event):
5671
agent.set()

v3/primitives/tests/broker_test.py

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
# import primitives.tests.broker_test
44

55
import asyncio
6-
from primitives import Broker, Queue, RingbufQueue
6+
from primitives import Broker, Queue, RingbufQueue, RegExp
77

88
broker = Broker()
99

@@ -49,12 +49,13 @@ def get_data(self, topic, message):
4949

5050
async def print_queue(q):
5151
while True:
52-
topic, message = await q.get()
52+
topic, message = await asyncio.wait_for(q.get(), 2)
5353
print(topic, message)
5454

5555

5656
async def print_ringbuf_q(q):
57-
async for topic, message, args in q:
57+
while True:
58+
topic, message, args = await asyncio.wait_for(q.get(), 2)
5859
print(topic, message, args)
5960

6061

@@ -98,20 +99,19 @@ async def main():
9899
print("Unsubscribing method")
99100
broker.unsubscribe("foo_topic", tc.get_data) # Async method
100101
print("Retrieving foo_topic messages from Queue")
101-
try:
102-
await asyncio.wait_for(print_queue(q), 5)
103-
except asyncio.TimeoutError:
104-
print("Timeout")
105102
print("Retrieving bar_topic messages from RingbufQueue")
106-
try:
107-
await asyncio.wait_for(print_ringbuf_q(rq), 5)
108-
except asyncio.TimeoutError:
109-
print("Timeout")
103+
await asyncio.gather(print_queue(q), print_ringbuf_q(rq), return_exceptions=True)
104+
# Queues are now empty
110105
print()
106+
print("*** Unsubscribing queues ***")
107+
broker.unsubscribe("foo_topic", q)
108+
broker.unsubscribe("bar_topic", rq, "args", "added")
109+
print()
110+
111111
print("*** Testing error reports and exception ***")
112112
print()
113113
Broker.Verbose = True
114-
print("*** Check error on invalid unsubscribe ***")
114+
print("*** Produce warning messages on invalid unsubscribe ***")
115115
broker.unsubscribe("rats", "more rats") # Invalid topic
116116
broker.unsubscribe("foo_topic", "rats") # Invalid agent
117117
print("*** Check exception on invalid subscribe ***")
@@ -120,6 +120,12 @@ async def main():
120120
print("Test FAIL")
121121
except ValueError:
122122
print("Test PASS")
123+
print()
124+
print("*** Test wildcard subscribe ***")
125+
broker.subscribe(RegExp(".*_topic"), func)
126+
broker.publish("FAIL", func) # No match
127+
asyncio.create_task(test(5))
128+
await asyncio.sleep(10)
123129

124130

125131
asyncio.run(main())

0 commit comments

Comments
 (0)