1 unstable release
| 0.1.0 | Nov 23, 2025 |
|---|
#11 in #event-bus
Used in avila
68KB
867 lines
avx-events
Event-driven architecture for Avila Experience Fabric
Pub/sub event bus, event sourcing, and message-driven patterns for building distributed AVX (Avila Experience) platform applications.
Features
- Event Bus: In-memory and distributed pub/sub
- Event Sourcing: Append-only event store with replay
- CQRS Support: Command/Query separation patterns
- Message Patterns: Request/reply, fire-and-forget, broadcast
- Dead Letter Queue: Failed event handling
- Event Replay: Time-travel debugging and audit trails
- Async/Await: Built on Tokio for high concurrency
- Type-Safe: Strongly typed events with serde
Installation
[dependencies]
avx-events = "0.1"
tokio = { version = "1", features = ["full"] }
Quick Start
Define Events
use avx_events::{Event, EventMetadata};
use serde::{Deserialize, Serialize};
use chrono::Utc;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UserCreated {
pub user_id: String,
pub email: String,
pub name: String,
}
impl Event for UserCreated {
fn event_type(&self) -> &'static str {
"user.created"
}
fn aggregate_id(&self) -> String {
self.user_id.clone()
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OrderPlaced {
pub order_id: String,
pub user_id: String,
pub total: f64,
}
impl Event for OrderPlaced {
fn event_type(&self) -> &'static str {
"order.placed"
}
fn aggregate_id(&self) -> String {
self.order_id.clone()
}
}
Create Event Bus
use avx_events::EventBus;
#[tokio::main]
async fn main() {
let bus = EventBus::new();
// Subscribe to events
let mut subscriber = bus.subscribe::<UserCreated>().await;
// Publish event
bus.publish(UserCreated {
user_id: "123".into(),
email: "user@example.com".into(),
name: "John Doe".into(),
}).await.unwrap();
// Receive event
if let Some(event) = subscriber.recv().await {
println!("Received: {:?}", event);
}
}
Multiple Subscribers
use avx_events::EventBus;
#[tokio::main]
async fn main() {
let bus = EventBus::new();
// Service 1: Send email
let mut email_sub = bus.subscribe::<UserCreated>().await;
tokio::spawn(async move {
while let Some(event) = email_sub.recv().await {
send_welcome_email(&event.email).await;
}
});
// Service 2: Create profile
let mut profile_sub = bus.subscribe::<UserCreated>().await;
tokio::spawn(async move {
while let Some(event) = profile_sub.recv().await {
create_user_profile(&event.user_id).await;
}
});
// Service 3: Analytics
let mut analytics_sub = bus.subscribe::<UserCreated>().await;
tokio::spawn(async move {
while let Some(event) = analytics_sub.recv().await {
track_signup_event(&event).await;
}
});
// Publish event - all subscribers receive it
bus.publish(UserCreated {
user_id: "456".into(),
email: "jane@example.com".into(),
name: "Jane Smith".into(),
}).await.unwrap();
}
Event Sourcing
use avx_events::{EventStore, AggregateRoot};
// Define aggregate
pub struct UserAggregate {
pub id: String,
pub email: String,
pub name: String,
pub version: u64,
}
impl AggregateRoot for UserAggregate {
type Event = UserEvent;
fn apply(&mut self, event: Self::Event) {
match event {
UserEvent::Created(e) => {
self.id = e.user_id;
self.email = e.email;
self.name = e.name;
},
UserEvent::EmailChanged(e) => {
self.email = e.new_email;
},
}
self.version += 1;
}
}
// Use event store
#[tokio::main]
async fn main() {
let store = EventStore::new();
// Save events
store.append("user-123", vec![
UserEvent::Created(UserCreated { /* ... */ }),
UserEvent::EmailChanged(EmailChanged { /* ... */ }),
]).await.unwrap();
// Replay events to rebuild state
let events = store.get_events("user-123", 0).await.unwrap();
let mut user = UserAggregate::default();
for event in events {
user.apply(event);
}
println!("User state: {:?}", user);
}
CQRS Pattern
use avx_events::{CommandHandler, QueryHandler};
// Commands (write side)
pub struct CreateUserCommand {
pub email: String,
pub name: String,
}
pub struct CreateUserHandler {
event_bus: EventBus,
}
impl CommandHandler<CreateUserCommand> for CreateUserHandler {
type Result = String; // user_id
async fn handle(&self, cmd: CreateUserCommand) -> Result<Self::Result, Error> {
let user_id = uuid::Uuid::new_v4().to_string();
// Validate
if cmd.email.is_empty() {
return Err(Error::validation("Email required"));
}
// Publish event
self.event_bus.publish(UserCreated {
user_id: user_id.clone(),
email: cmd.email,
name: cmd.name,
}).await?;
Ok(user_id)
}
}
// Queries (read side)
pub struct GetUserQuery {
pub user_id: String,
}
pub struct GetUserHandler {
read_model: UserReadModel,
}
impl QueryHandler<GetUserQuery> for GetUserHandler {
type Result = UserView;
async fn handle(&self, query: GetUserQuery) -> Result<Self::Result, Error> {
self.read_model.find_by_id(&query.user_id).await
}
}
Topic-based Routing
use avx_events::TopicBus;
#[tokio::main]
async fn main() {
let bus = TopicBus::new();
// Subscribe to specific topics
let mut user_sub = bus.subscribe("users.*").await;
let mut order_sub = bus.subscribe("orders.*").await;
let mut all_sub = bus.subscribe("*").await; // All events
// Publish to topics
bus.publish_to("users.created", UserCreated { /* ... */ }).await;
bus.publish_to("orders.placed", OrderPlaced { /* ... */ }).await;
// user_sub receives UserCreated only
// order_sub receives OrderPlaced only
// all_sub receives both
}
Dead Letter Queue
use avx_events::{EventBus, DeadLetterQueue};
#[tokio::main]
async fn main() {
let bus = EventBus::with_dlq(DeadLetterQueue::new());
let mut subscriber = bus.subscribe::<UserCreated>().await;
tokio::spawn(async move {
while let Some(event) = subscriber.recv().await {
if let Err(e) = process_event(event).await {
// Event automatically goes to DLQ after retries
eprintln!("Failed to process: {}", e);
}
}
});
// View DLQ
let dlq_events = bus.dead_letter_queue().list().await;
println!("Failed events: {}", dlq_events.len());
// Retry from DLQ
for event in dlq_events {
bus.republish(event).await;
}
}
Request/Reply Pattern
use avx_events::RequestReplyBus;
#[tokio::main]
async fn main() {
let bus = RequestReplyBus::new();
// Responder
tokio::spawn(async move {
let mut requests = bus.listen::<GetUserRequest>().await;
while let Some((req, reply)) = requests.recv().await {
let user = fetch_user(&req.user_id).await;
reply.send(user).await;
}
});
// Requester
let response = bus.request(GetUserRequest {
user_id: "123".into(),
}).await.unwrap();
println!("User: {:?}", response);
}
Distributed Event Bus
Use with Redis, NATS, or Kafka:
use avx_events::distributed::RedisEventBus;
#[tokio::main]
async fn main() {
let bus = RedisEventBus::connect("redis://localhost:6379")
.await
.unwrap();
// Now events are distributed across services
bus.publish(UserCreated { /* ... */ }).await;
}
Event Metadata
All events carry metadata:
pub struct EventEnvelope<T> {
pub event: T,
pub metadata: EventMetadata,
}
pub struct EventMetadata {
pub event_id: String,
pub event_type: String,
pub aggregate_id: String,
pub timestamp: i64,
pub correlation_id: Option<String>,
pub causation_id: Option<String>,
pub user_id: Option<String>,
}
Integration with AVX Ecosystem
use avx_events::EventBus;
use avx_telemetry::init_tracing;
use tracing::info;
#[tokio::main]
async fn main() {
init_tracing();
let bus = EventBus::new();
let mut subscriber = bus.subscribe::<UserCreated>().await;
tokio::spawn(async move {
while let Some(event) = subscriber.recv().await {
info!(
user_id = %event.user_id,
email = %event.email,
"User created event received"
);
}
});
}
Testing
use avx_events::testing::MockEventBus;
#[tokio::test]
async fn test_user_service() {
let bus = MockEventBus::new();
let service = UserService::new(bus.clone());
service.create_user("user@example.com", "User").await.unwrap();
// Assert event was published
let events = bus.published_events::<UserCreated>().await;
assert_eq!(events.len(), 1);
assert_eq!(events[0].email, "user@example.com");
}
Part of AVX Ecosystem
avx-events enables event-driven microservices:
- avx-gateway: Event-driven request processing
- avx-telemetry: Event logging and tracing
- avx-api-core: Domain events from business logic
Examples
cargo run --example basic_pubsub
cargo run --example event_sourcing
cargo run --example cqrs
cargo run --example request_reply
cargo run --example distributed
Performance
- In-memory: 100,000+ events/sec
- Redis: 10,000+ events/sec
- Overhead: < 1ms per event
License
MIT OR Apache-2.0
See LICENSE-MIT and LICENSE-APACHE for details.
Links
- Repository: https://github.com/avilaops/arxis
- Documentation: https://docs.rs/avx-events
- Crates.io: https://crates.io/crates/avx-events
- AVX Platform: https://avilaops.com
Dependencies
~18–26MB
~397K SLoC