#event-streaming #event-handling #events #atprotocol

bin+lib atproto-jetstream

AT Protocol Jetstream event consumer library with WebSocket streaming and compression support

19 releases (7 breaking)

Uses new Rust 2024

0.13.0 Sep 22, 2025
0.11.3 Sep 3, 2025
0.10.0 Jul 28, 2025

#69 in WebSocket

Download history 329/week @ 2025-06-25 206/week @ 2025-07-02 597/week @ 2025-07-09 29/week @ 2025-07-16 118/week @ 2025-07-23 27/week @ 2025-07-30 9/week @ 2025-08-06 93/week @ 2025-08-13 368/week @ 2025-08-20 42/week @ 2025-08-27 159/week @ 2025-09-03 24/week @ 2025-09-10 259/week @ 2025-09-17 47/week @ 2025-09-24 69/week @ 2025-10-01 1/week @ 2025-10-08

379 downloads per month

MIT license

250KB
3.5K SLoC

atproto-jetstream

WebSocket consumer for AT Protocol Jetstream events.

Overview

Real-time event streaming with Zstandard compression, automatic reconnection, and configurable event filtering for AT Protocol repository changes.

Features

  • WebSocket streaming: High-performance event consumption with automatic reconnection handling
  • Event filtering: Configurable filtering by collections and DIDs for targeted event processing
  • Zstandard compression: Built-in support for compressed event streams with custom dictionaries
  • Event handlers: Flexible handler system supporting multiple custom event processors
  • Background processing: Asynchronous event processing with graceful shutdown via cancellation tokens
  • Structured errors: Comprehensive error handling with detailed error codes

CLI Tools

The following command-line tool is available when built with the clap feature:

  • atproto-jetstream-consumer: Real-time event stream consumer with filtering, compression, and background processing support

Usage

Basic Event Consumer

use atproto_jetstream::{Consumer, ConsumerTaskConfig, EventHandler, JetstreamEvent, CancellationToken};
use async_trait::async_trait;

struct MyEventHandler;

#[async_trait]
impl EventHandler for MyEventHandler {
    async fn handle_event(&self, event: JetstreamEvent) -> anyhow::Result<()> {
        println!("Received event: {:?}", event);
        Ok(())
    }

    fn handler_id(&self) -> String {
        "my-handler".to_string()
    }
}

let config = ConsumerTaskConfig {
    user_agent: "my-app/1.0".to_string(),
    compression: false,
    zstd_dictionary_location: String::new(),
    jetstream_hostname: "jetstream1.us-east.bsky.network".to_string(),
    collections: vec!["app.bsky.feed.post".to_string()],
};

let consumer = Consumer::new(config);
consumer.register_handler(std::sync::Arc::new(MyEventHandler)).await?;

let cancellation_token = CancellationToken::new();
consumer.run_background(cancellation_token).await?;

With Compression

let config = ConsumerTaskConfig {
    compression: true,
    zstd_dictionary_location: "./data/zstd_dictionary".to_string(),
    // ... other config
};

// Download dictionary first:
// curl -o data/zstd_dictionary https://github.com/bluesky-social/jetstream/raw/refs/heads/main/pkg/models/zstd_dictionary

Command Line Examples

# Basic streaming
cargo run --bin atproto-jetstream-consumer \
    --hostname jetstream1.us-east.bsky.network \
    --collections app.bsky.feed.post \
    --user-agent "my-consumer/1.0"

# With compression
cargo run --bin atproto-jetstream-consumer \
    --hostname jetstream1.us-east.bsky.network \
    --collections app.bsky.feed.post \
    --compression \
    --zstd-dictionary ./data/zstd_dictionary

# With filtering
cargo run --bin atproto-jetstream-consumer \
    --hostname jetstream1.us-east.bsky.network \
    --collections "app.bsky.feed.post,app.bsky.actor.profile" \
    --dids "did:plc:user123,did:plc:user456"

License

MIT License

Dependencies

~28–45MB
~778K SLoC