6 releases
| Version | Released |
|---|---|
| 0.2.0 (new) | Dec 12, 2025 |
| 0.1.4 | Dec 1, 2025 |
| 0.1.3 | Nov 23, 2025 |
rigatoni-stores
State store implementations for the Rigatoni ETL framework: persist resume tokens for fault tolerance.
Overview
State store implementations for persisting MongoDB change stream resume tokens, enabling fault-tolerant ETL pipelines with exactly-once or at-least-once semantics.
Supported Stores
Memory Store (Available)
- Fast - In-memory HashMap for development/testing
- Thread-safe - Uses Arc<RwLock<HashMap>>
- No persistence - Data lost on restart
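To make the Arc<RwLock<HashMap>> pattern concrete, here is a minimal std-only sketch. It is not the crate's MemoryStore: the type name SketchMemoryStore, the String token type, and the use of std::sync::RwLock (rather than an async lock) are all assumptions made for illustration.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

/// Hypothetical stand-in for a resume token; the real store holds a BSON `Document`.
type Token = String;

/// Illustrative in-memory store. Cloning it shares the same underlying map.
#[derive(Clone, Default)]
pub struct SketchMemoryStore {
    tokens: Arc<RwLock<HashMap<String, Token>>>,
}

impl SketchMemoryStore {
    pub fn new() -> Self {
        Self::default()
    }

    /// Writers take the exclusive lock.
    pub fn save(&self, collection: &str, token: Token) {
        self.tokens
            .write()
            .unwrap()
            .insert(collection.to_string(), token);
    }

    /// Readers take the shared lock, so concurrent loads do not block each other.
    pub fn load(&self, collection: &str) -> Option<Token> {
        self.tokens.read().unwrap().get(collection).cloned()
    }

    /// Remove the token for one collection, if present.
    pub fn clear(&self, collection: &str) {
        self.tokens.write().unwrap().remove(collection);
    }
}
```

Because the map lives behind an Arc, a clone handed to another thread sees the same tokens, and everything disappears when the process exits.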
File Store (Available)
- Persistent - JSON files on disk
- Human-readable - Easy to inspect and debug
- One file per collection - Organized storage
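The one-file-per-collection layout can be sketched with the standard library alone. The file naming scheme (`<dir>/<collection>.json`), the function names, and the write-then-rename step are assumptions for illustration; the crate's FileStore may organize and write its files differently.

```rust
use std::fs;
use std::io;
use std::path::{Path, PathBuf};

/// Hypothetical layout: `<dir>/<collection>.json`, one file per collection.
fn token_path(dir: &Path, collection: &str) -> PathBuf {
    dir.join(format!("{}.json", collection))
}

/// Write to a temporary file first, then rename it into place, so a crash
/// mid-write does not leave a truncated token file behind.
pub fn save_token(dir: &Path, collection: &str, json: &str) -> io::Result<()> {
    fs::create_dir_all(dir)?;
    let final_path = token_path(dir, collection);
    let tmp_path = final_path.with_extension("json.tmp");
    fs::write(&tmp_path, json)?;
    fs::rename(&tmp_path, &final_path)
}

/// A missing file means "no token yet", which maps to `None`.
pub fn load_token(dir: &Path, collection: &str) -> io::Result<Option<String>> {
    match fs::read_to_string(token_path(dir, collection)) {
        Ok(json) => Ok(Some(json)),
        Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(None),
        Err(e) => Err(e),
    }
}
```

Since each token is an ordinary JSON file, it can be inspected with any text editor, which is what makes this kind of store easy to debug.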
Redis Store (Available)
- Distributed - Share state across multiple pipeline instances
- Distributed Locking - Redis-based locking for horizontal scaling without duplicates
- Connection pooling - Efficient connection management with deadpool
- Production-ready - For multi-instance deployments
- TTL support - Optional token expiration
- Retry logic - Automatic retries with exponential backoff
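As an illustration of retries with exponential backoff, the sketch below doubles the delay after each failed attempt and caps it. The function names, the 5-second cap, and the blocking thread::sleep are assumptions; the Redis store's actual retry policy is not documented here and presumably runs asynchronously.

```rust
use std::thread;
use std::time::Duration;

/// Delay before retry number `attempt` (0-based): base * 2^attempt, capped at `max`.
pub fn backoff_delay(base: Duration, attempt: u32, max: Duration) -> Duration {
    let factor = 2u32.saturating_pow(attempt);
    base.saturating_mul(factor).min(max)
}

/// Run `op`, retrying up to `max_retries` times after the first failure.
/// The last error is returned once the retry budget is exhausted.
pub fn retry_with_backoff<T, E, F>(max_retries: u32, base: Duration, mut op: F) -> Result<T, E>
where
    F: FnMut() -> Result<T, E>,
{
    let max_delay = Duration::from_secs(5); // assumed cap, for illustration only
    let mut attempt = 0;
    loop {
        match op() {
            Ok(value) => return Ok(value),
            Err(err) if attempt >= max_retries => return Err(err),
            Err(_) => {
                thread::sleep(backoff_delay(base, attempt, max_delay));
                attempt += 1;
            }
        }
    }
}
```

With a 100 ms base, the delays are 100 ms, 200 ms, 400 ms, and so on until the cap is reached, which gives a briefly unavailable server time to recover without being flooded with requests.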
Installation
[dependencies]
rigatoni-stores = { version = "0.2.0", features = ["memory", "file", "redis-store"] }
Available Features
- memory - In-memory store (enabled by default)
- file - File-based store (enabled by default)
- redis-store - Redis store with connection pooling and retry logic
- all-stores - All store implementations
Quick Start
Memory Store (Development/Testing)
use rigatoni_stores::memory::MemoryStore;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let store = MemoryStore::new();
    // Use with Rigatoni pipeline
    // let pipeline = Pipeline::with_store(config, destination, store).await?;
    Ok(())
}
File Store (Persistent)
use rigatoni_stores::file::FileStore;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Stores resume tokens in ./state/ directory
    let store = FileStore::new("./state").await?;
    // Use with Rigatoni pipeline
    // let pipeline = Pipeline::with_store(config, destination, store).await?;
    Ok(())
}
Redis Store (Distributed)
use rigatoni_stores::redis::{RedisStore, RedisConfig};
use std::time::Duration;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Configure Redis with connection pooling and TTL
    let redis_config = RedisConfig::builder()
        .url("redis://localhost:6379")
        .pool_size(10)
        .ttl(Duration::from_secs(7 * 24 * 60 * 60)) // 7 days
        .max_retries(3)
        .build()?;
    let store = RedisStore::new(redis_config).await?;
    // Use with Rigatoni pipeline for distributed state
    // let pipeline = Pipeline::with_store(config, destination, store).await?;
    Ok(())
}
Redis Configuration Options
- url - Redis connection URL (supports redis:// and rediss:// schemes)
- pool_size - Connection pool size (default: 10)
- ttl - Optional expiration time for resume tokens (recommended: 7-30 days)
- max_retries - Maximum retry attempts for transient errors (default: 3)
- connection_timeout - Connection timeout duration (default: 5 seconds)
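The documented defaults can be summarized in a small struct. This is a hypothetical mirror of the options listed above, not the crate's RedisConfig: the struct and helper names are invented for illustration, and the default URL is borrowed from the Quick Start example rather than taken from the crate.

```rust
use std::time::Duration;

/// Hypothetical mirror of the documented options; not the crate's real `RedisConfig`.
#[derive(Debug, Clone, PartialEq)]
pub struct SketchRedisConfig {
    pub url: String,
    pub pool_size: usize,
    pub ttl: Option<Duration>,
    pub max_retries: u32,
    pub connection_timeout: Duration,
}

impl Default for SketchRedisConfig {
    fn default() -> Self {
        Self {
            url: "redis://localhost:6379".to_string(), // assumed, from the Quick Start
            pool_size: 10,                              // documented default
            ttl: None,                                  // TTL is optional
            max_retries: 3,                             // documented default
            connection_timeout: Duration::from_secs(5), // documented default
        }
    }
}

/// Accept only the two documented URL schemes.
pub fn has_supported_scheme(url: &str) -> bool {
    url.starts_with("redis://") || url.starts_with("rediss://")
}

/// Convert a TTL expressed in days to a `Duration`.
pub fn ttl_days(days: u64) -> Duration {
    Duration::from_secs(days * 24 * 60 * 60)
}
```

For the recommended range, ttl_days(7) is 604,800 seconds and ttl_days(30) is 2,592,000 seconds.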
Note: Redis Cluster mode is not currently implemented. Use Redis Sentinel for high availability.
State Store Trait
All stores implement the StateStore trait from rigatoni-core:
use rigatoni_core::store::StateStore;
#[async_trait]
pub trait StateStore: Send + Sync {
    /// Save a resume token for a collection
    async fn save_resume_token(&self, collection: &str, token: Document)
        -> Result<(), StateError>;
    /// Load a resume token for a collection
    async fn load_resume_token(&self, collection: &str)
        -> Result<Option<Document>, StateError>;
    /// Clear a resume token for a collection
    async fn clear_resume_token(&self, collection: &str)
        -> Result<(), StateError>;
}
Custom Store Implementation
Implement your own store for custom backends:
use rigatoni_core::store::{StateStore, StateError};
use async_trait::async_trait;
use bson::Document;
pub struct CustomStore {
    // Your storage backend
}
#[async_trait]
impl StateStore for CustomStore {
    async fn save_resume_token(&self, collection: &str, token: Document)
        -> Result<(), StateError>
    {
        // Your implementation
        Ok(())
    }
    async fn load_resume_token(&self, collection: &str)
        -> Result<Option<Document>, StateError>
    {
        // Your implementation
        Ok(None)
    }
    async fn clear_resume_token(&self, collection: &str)
        -> Result<(), StateError>
    {
        // Your implementation
        Ok(())
    }
}
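To show how a pipeline might use the three trait methods to resume after a restart, here is a synchronous, std-only analogue. Everything in it is an assumption made for illustration: SketchStateStore, MapStore, and run_once are hypothetical names, tokens are plain u64 offsets instead of BSON documents, and the real trait is async and returns Result.

```rust
use std::collections::HashMap;

/// Synchronous stand-in for the async `StateStore` trait;
/// tokens are plain `u64` offsets instead of BSON documents.
pub trait SketchStateStore {
    fn save_resume_token(&mut self, collection: &str, token: u64);
    fn load_resume_token(&self, collection: &str) -> Option<u64>;
    fn clear_resume_token(&mut self, collection: &str);
}

/// A trivial custom backend: one map entry per collection.
#[derive(Default)]
pub struct MapStore {
    tokens: HashMap<String, u64>,
}

impl SketchStateStore for MapStore {
    fn save_resume_token(&mut self, collection: &str, token: u64) {
        self.tokens.insert(collection.to_string(), token);
    }
    fn load_resume_token(&self, collection: &str) -> Option<u64> {
        self.tokens.get(collection).copied()
    }
    fn clear_resume_token(&mut self, collection: &str) {
        self.tokens.remove(collection);
    }
}

/// Process events newer than the saved token, saving the token only after
/// each event has been handled. A crash between the two steps replays that
/// event on restart, which is at-least-once delivery.
pub fn run_once<S: SketchStateStore>(
    store: &mut S,
    collection: &str,
    events: &[(u64, &str)],
    sink: &mut Vec<String>,
) {
    let resume_from = store.load_resume_token(collection);
    for &(token, payload) in events {
        if resume_from.map_or(false, |saved| token <= saved) {
            continue; // already processed before the restart
        }
        sink.push(payload.to_string());
        store.save_resume_token(collection, token);
    }
}
```

Calling run_once a second time with the same events skips everything at or below the saved token, while clearing the token causes the next run to start from the beginning.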
Use Cases
Development/Testing
Use Memory Store for fast iteration without persistence.
Single-Instance Production
Use File Store for simple, reliable persistence.
Multi-Instance Production
Use Redis Store for distributed state across pipeline instances with:
- Shared resume tokens across multiple workers
- Distributed locking for horizontal scaling without duplicate processing
- Connection pooling for efficient Redis usage
- Automatic retry logic for transient failures
- Optional TTL to prevent unbounded growth
See the Multi-Instance Deployment Guide for Kubernetes examples and configuration.
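The idea behind distributed locking, where only one worker processes a collection at a time and a crashed worker's lock eventually expires, can be modeled in-process. The sketch below is only a model of lease semantics: LeaseTable and its methods are hypothetical, and it says nothing about how the Redis store actually implements its locks.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// In-process model of a lock with expiry. A Redis-backed lock would keep
/// this state on the server so that all workers see it.
#[derive(Default)]
pub struct LeaseTable {
    leases: HashMap<String, (String, Instant)>, // key -> (owner, expires_at)
}

impl LeaseTable {
    /// Grant the lease if it is free, expired, or already held by `owner`.
    pub fn try_acquire(&mut self, key: &str, owner: &str, ttl: Duration, now: Instant) -> bool {
        let held_by_other = self
            .leases
            .get(key)
            .map_or(false, |(holder, expires_at)| holder != owner && *expires_at > now);
        if held_by_other {
            return false;
        }
        self.leases
            .insert(key.to_string(), (owner.to_string(), now + ttl));
        true
    }

    /// Release only if `owner` still holds the lease.
    pub fn release(&mut self, key: &str, owner: &str) {
        let owned = self
            .leases
            .get(key)
            .map_or(false, |(holder, _)| holder == owner);
        if owned {
            self.leases.remove(key);
        }
    }
}
```

The expiry is what keeps a crashed worker from blocking a collection forever: once its lease times out, another worker can take over and resume from the shared token.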
License
Licensed under the Apache License, Version 2.0 (LICENSE or http://www.apache.org/licenses/LICENSE-2.0).