19 releases (7 breaking)
Uses new Rust 2024
| 0.13.0 | Sep 22, 2025 |
|---|---|
| 0.11.3 | Sep 3, 2025 |
| 0.10.0 | Jul 28, 2025 |
#69 in WebSocket
379 downloads per month
250KB
3.5K
SLoC
atproto-jetstream
WebSocket consumer for AT Protocol Jetstream events.
Overview
Real-time event streaming with Zstandard compression, automatic reconnection, and configurable event filtering for AT Protocol repository changes.
Features
- WebSocket streaming: High-performance event consumption with automatic reconnection handling
- Event filtering: Configurable filtering by collections and DIDs for targeted event processing
- Zstandard compression: Built-in support for compressed event streams with custom dictionaries
- Event handlers: Flexible handler system supporting multiple custom event processors
- Background processing: Asynchronous event processing with graceful shutdown via cancellation tokens
- Structured errors: Comprehensive error handling with detailed error codes
CLI Tools
The following command-line tool is available when built with the clap feature:
atproto-jetstream-consumer: Real-time event stream consumer with filtering, compression, and background processing support
Usage
Basic Event Consumer
use atproto_jetstream::{Consumer, ConsumerTaskConfig, EventHandler, JetstreamEvent, CancellationToken};
use async_trait::async_trait;
struct MyEventHandler;
#[async_trait]
impl EventHandler for MyEventHandler {
async fn handle_event(&self, event: JetstreamEvent) -> anyhow::Result<()> {
println!("Received event: {:?}", event);
Ok(())
}
fn handler_id(&self) -> String {
"my-handler".to_string()
}
}
let config = ConsumerTaskConfig {
user_agent: "my-app/1.0".to_string(),
compression: false,
zstd_dictionary_location: String::new(),
jetstream_hostname: "jetstream1.us-east.bsky.network".to_string(),
collections: vec!["app.bsky.feed.post".to_string()],
};
let consumer = Consumer::new(config);
consumer.register_handler(std::sync::Arc::new(MyEventHandler)).await?;
let cancellation_token = CancellationToken::new();
consumer.run_background(cancellation_token).await?;
With Compression
let config = ConsumerTaskConfig {
compression: true,
zstd_dictionary_location: "./data/zstd_dictionary".to_string(),
// ... other config
};
// Download dictionary first:
// curl -o data/zstd_dictionary https://github.com/bluesky-social/jetstream/raw/refs/heads/main/pkg/models/zstd_dictionary
Command Line Examples
# Basic streaming
cargo run --bin atproto-jetstream-consumer \
--hostname jetstream1.us-east.bsky.network \
--collections app.bsky.feed.post \
--user-agent "my-consumer/1.0"
# With compression
cargo run --bin atproto-jetstream-consumer \
--hostname jetstream1.us-east.bsky.network \
--collections app.bsky.feed.post \
--compression \
--zstd-dictionary ./data/zstd_dictionary
# With filtering
cargo run --bin atproto-jetstream-consumer \
--hostname jetstream1.us-east.bsky.network \
--collections "app.bsky.feed.post,app.bsky.actor.profile" \
--dids "did:plc:user123,did:plc:user456"
License
MIT License
Dependencies
~28–45MB
~778K SLoC