3 releases
| 0.1.2 | Nov 18, 2020 |
|---|---|
| 0.1.1 | Sep 14, 2020 |
| 0.1.0 | Sep 14, 2020 |
#805 in Concurrency
12KB
186 lines
mp2c
Multi producer multi polling consumer
What is mp2c?
MP2C is a data structure that enables multiple producers/publishers to send messages to multiple consumers/subscribers.
The async mp2c data structure is called a Carousel.
What do you mean by messages?
A message in mp2c context is a vector of u8. It's up to the producers/ consumers to marshall and unmarshall these messages as they best see fit.
Is mp2c thread safe?
Yes.
How can multiple producers(threads) send messages to a mp2c Carousel?
Cloning a mp2c::asynch::Carousel creates a clone of the underlying std::sync::Sender and every invocation of Carousel::put will send messages to the consumers.
Does mp2c support async message pub?
mp2c::asynch::Carousel supports full async behavior. All messages put on the Carousel are asynchronously sent to the consumers.
Is there a memory overhead?
Yes. In the spirit of don't communicate by sharing memory, share memory by communicating, all messages are cloned as many times as the count of mp2c::asynch::Consumers.
Multi producer multi consumer example
use mp2c::asynch::{Carousel, Consumer};
struct TestConsumer1;
impl Consumer for TestConsumer1 {
fn consume(&mut self, data: Vec<u8>) {
let msg = String::from_utf8(data).unwrap();
// do something with msg
}
}
struct TestConsumer2;
impl Consumer for TestConsumer2 {
fn consume(&mut self, data: Vec<u8>) {
let msg = String::from_utf8(data).unwrap();
// do something with msg
}
}
let mut v: Vec<Box<dyn Consumer + Send + 'static>> = Vec::new();
v.push(Box::new(TestConsumer1));
v.push(Box::new(TestConsumer2));
let c = Carousel::new(v);
for _ in 1..10 {
let cloned_c = c.clone();
let t = std::thread::spawn(move || {
cloned_c.put(String::from("test").into_bytes());
});
t.join().unwrap();
}
What's next?
Message id
Add a message id to each message being put on the data carousel.
Release history
v0.1.2
Make consumer mutable
v0.1.1
Updated README with example.
v0.1.0
Initial release.