#data-replication #cdc #pipeline #change-data-capture #streaming

rigatoni-stores

State store implementations for Rigatoni CDC/data replication: Memory, File, and Redis (for distributed state management)

6 releases

0.2.0 Dec 12, 2025
0.1.4 Dec 1, 2025
0.1.3 Nov 23, 2025

#1586 in Database interfaces

Apache-2.0

225KB
3K SLoC

State store implementations for the Rigatoni ETL framework: persist resume tokens for fault tolerance.

Overview

State store implementations for persisting MongoDB change stream resume tokens, enabling fault-tolerant ETL pipelines with at-least-once semantics (effectively exactly-once when writes to the destination are idempotent).
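Why resume tokens give at-least-once (not exactly-once) delivery on their own: if a pipeline crashes after delivering an event but before saving that event's token, the event is replayed on restart. The std-only simulation below sketches this. The `run` helper, the string tokens (real change stream resume tokens are BSON documents), and the fixed checkpoint interval are all hypothetical and are not Rigatoni's API.

```rust
use std::collections::HashMap;

/// Replays `events` (pairs of resume token and payload) for `collection`,
/// starting just after the token saved in `store`, and saves a token every
/// `checkpoint_every` events. `crash_after` simulates a crash after that many
/// events were delivered in this run. Hypothetical helper, not Rigatoni's API.
fn run(
    events: &[(&'static str, &'static str)],
    store: &mut HashMap<String, String>,
    collection: &str,
    checkpoint_every: usize,
    crash_after: Option<usize>,
    delivered: &mut Vec<&'static str>,
) {
    // Resume just after the saved token, or from the beginning if none is
    // saved (or the saved token is unknown).
    let start = store
        .get(collection)
        .and_then(|tok| events.iter().position(|&(t, _)| t == tok.as_str()))
        .map_or(0, |i| i + 1);
    let mut since_checkpoint = 0;
    for (n, &(token, payload)) in events[start..].iter().enumerate() {
        // 1. Deliver the event to the destination first...
        delivered.push(payload);
        since_checkpoint += 1;
        if crash_after == Some(n + 1) {
            return; // crash before this event's token is persisted
        }
        // 2. ...then persist its resume token.
        if since_checkpoint == checkpoint_every {
            store.insert(collection.to_string(), token.to_string());
            since_checkpoint = 0;
        }
    }
}
```

With a checkpoint every two events and a crash after the third, the restart resumes after `t2`, so `e3` is delivered twice. Saving the token before delivery would avoid the duplicate but could lose `e3` instead, which is why the destination, not the store, has to absorb replays.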

Supported Stores

Memory Store (Available)

  • Fast - In-memory HashMap for development/testing
  • Thread-safe - Uses Arc<RwLock<HashMap>>
  • No persistence - Data lost on restart
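A minimal sketch of the `Arc<RwLock<HashMap>>` pattern described above. It is synchronous and stores `String` tokens, whereas `MemoryStore` is async and stores BSON documents; the type name `SketchMemoryStore` is hypothetical. Cloning the handle clones the `Arc`, so all clones share one map.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

/// Hypothetical, synchronous sketch of an in-memory token store.
/// Cloning shares the same underlying map (the Arc is cloned, not the data).
#[derive(Clone, Default)]
pub struct SketchMemoryStore {
    tokens: Arc<RwLock<HashMap<String, String>>>,
}

impl SketchMemoryStore {
    pub fn save(&self, collection: &str, token: &str) {
        // Writers take the exclusive write lock.
        self.tokens
            .write()
            .unwrap()
            .insert(collection.to_string(), token.to_string());
    }

    pub fn load(&self, collection: &str) -> Option<String> {
        // Readers share the read lock, so concurrent loads do not block each other.
        self.tokens.read().unwrap().get(collection).cloned()
    }

    pub fn clear(&self, collection: &str) {
        self.tokens.write().unwrap().remove(collection);
    }
}
```

Because nothing is written to disk, every token disappears when the process exits, which is exactly the "No persistence" trade-off listed above.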

File Store (Available)

  • Persistent - JSON files on disk
  • Human-readable - Easy to inspect and debug
  • One file per collection - Organized storage
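A sketch of how a one-file-per-collection store can avoid torn writes: write to a temporary file, then rename it over the target. The `<collection>.json` naming, the character sanitizing, and the helper names are assumptions rather than `FileStore`'s actual layout, and the token is assumed to already be serialized to JSON text.

```rust
use std::fs;
use std::io;
use std::path::{Path, PathBuf};

/// Hypothetical path scheme: one `<collection>.json` file per collection.
/// Characters other than ASCII alphanumerics, '-' and '_' become '_' so a
/// collection name cannot escape the state directory.
fn token_path(dir: &Path, collection: &str) -> PathBuf {
    let safe: String = collection
        .chars()
        .map(|c| if c.is_ascii_alphanumeric() || c == '-' || c == '_' { c } else { '_' })
        .collect();
    dir.join(format!("{safe}.json"))
}

/// Writes the token to `<name>.json.tmp`, then renames it over `<name>.json`.
/// On POSIX filesystems the rename is atomic, so readers see either the old
/// or the new file, never a half-written one. (No fsync is done here.)
fn save_token(dir: &Path, collection: &str, token_json: &str) -> io::Result<()> {
    fs::create_dir_all(dir)?;
    let path = token_path(dir, collection);
    let tmp = path.with_extension("json.tmp");
    fs::write(&tmp, token_json)?;
    fs::rename(&tmp, &path)
}

/// Returns `Ok(None)` when no token file exists yet for the collection.
fn load_token(dir: &Path, collection: &str) -> io::Result<Option<String>> {
    match fs::read_to_string(token_path(dir, collection)) {
        Ok(s) => Ok(Some(s)),
        Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(None),
        Err(e) => Err(e),
    }
}
```

Keeping one small file per collection means each save rewrites only that collection's token, and the files stay easy to inspect with any text editor.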

Redis Store (Available)

  • Distributed - Share state across multiple pipeline instances
  • Distributed Locking - Redis-based locking for horizontal scaling without duplicates
  • Connection pooling - Efficient connection management with deadpool
  • Production-ready - For multi-instance deployments
  • TTL support - Optional token expiration
  • Retry logic - Automatic retries with exponential backoff
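The retry behaviour listed above can be sketched as follows. The exact delays `RedisStore` uses are not documented here, so the base delay, the cap, and the function names are assumptions; this version also omits jitter, which production retry loops often add.

```rust
use std::thread;
use std::time::Duration;

/// Delay before retry number `attempt` (0-based): base * 2^attempt, capped at `max`.
fn backoff_delay(attempt: u32, base: Duration, max: Duration) -> Duration {
    base.checked_mul(2u32.saturating_pow(attempt))
        .map_or(max, |d| d.min(max))
}

/// Runs `op`, retrying up to `max_retries` times after the first failure,
/// sleeping with exponential backoff between attempts.
/// Returns the first success, or the error from the final attempt.
fn retry_with_backoff<T, E>(
    max_retries: u32,
    base: Duration,
    max: Duration,
    mut op: impl FnMut() -> Result<T, E>,
) -> Result<T, E> {
    let mut attempt = 0;
    loop {
        match op() {
            Ok(v) => return Ok(v),
            Err(e) if attempt >= max_retries => return Err(e),
            Err(_) => {
                thread::sleep(backoff_delay(attempt, base, max));
                attempt += 1;
            }
        }
    }
}
```

With a 100 ms base and a 1 s cap, the waits are 100, 200, 400, 800, then 1000 ms. With `max_retries = 3`, a persistently failing operation is called four times in total (one initial attempt plus three retries).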

Installation

[dependencies]
rigatoni-stores = { version = "0.2.0", features = ["memory", "file", "redis-store"] }

Available Features

  • memory - In-memory store (enabled by default)
  • file - File-based store (enabled by default)
  • redis-store - Redis store with connection pooling and retry logic
  • all-stores - All store implementations

Quick Start

Memory Store (Development/Testing)

use rigatoni_stores::memory::MemoryStore;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let store = MemoryStore::new();

    // Use with Rigatoni pipeline
    // let pipeline = Pipeline::with_store(config, destination, store).await?;

    Ok(())
}

File Store (Persistent)

use rigatoni_stores::file::FileStore;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Stores resume tokens in ./state/ directory
    let store = FileStore::new("./state").await?;

    // Use with Rigatoni pipeline
    // let pipeline = Pipeline::with_store(config, destination, store).await?;

    Ok(())
}

Redis Store (Distributed)

use rigatoni_stores::redis::{RedisStore, RedisConfig};
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Configure Redis with connection pooling and TTL
    let redis_config = RedisConfig::builder()
        .url("redis://localhost:6379")
        .pool_size(10)
        .ttl(Duration::from_secs(7 * 24 * 60 * 60)) // 7 days
        .max_retries(3)
        .build()?;

    let store = RedisStore::new(redis_config).await?;

    // Use with Rigatoni pipeline for distributed state
    // (`config` below is the pipeline configuration, not the Redis config)
    // let pipeline = Pipeline::with_store(config, destination, store).await?;

    Ok(())
}

Redis Configuration Options

  • url - Redis connection URL (supports redis:// and rediss:// schemes)
  • pool_size - Connection pool size (default: 10)
  • ttl - Optional expiration time for resume tokens (recommended: 7-30 days)
  • max_retries - Maximum retry attempts for transient errors (default: 3)
  • connection_timeout - Connection timeout duration (default: 5 seconds)
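To make the documented defaults concrete, here is a hypothetical mirror of these options. It is not the real `RedisConfig` type; the struct name, the constructor, and the choice of "no TTL" as the default are assumptions. Only the default values (pool size 10, 3 retries, 5-second timeout) and the two accepted URL schemes come from the list above.

```rust
use std::time::Duration;

/// Hypothetical mirror of the documented Redis options and defaults.
#[derive(Debug, Clone)]
struct RedisSettings {
    url: String,
    pool_size: usize,
    ttl: Option<Duration>, // assumed unset unless configured
    max_retries: u32,
    connection_timeout: Duration,
}

impl RedisSettings {
    fn new(url: &str) -> Result<Self, String> {
        // Only redis:// (plain TCP) and rediss:// (TLS) URLs are accepted.
        if !(url.starts_with("redis://") || url.starts_with("rediss://")) {
            return Err(format!("unsupported URL scheme: {url}"));
        }
        Ok(Self {
            url: url.to_string(),
            pool_size: 10,
            ttl: None,
            max_retries: 3,
            connection_timeout: Duration::from_secs(5),
        })
    }
}
```

A TTL in the recommended 7 to 30 day range is long enough to survive a weekend outage yet still lets tokens for retired collections expire on their own.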

Note: Redis Cluster mode is not currently implemented. Use Redis Sentinel for high availability.

State Store Trait

All stores implement the StateStore trait from rigatoni-core:

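// Defined in rigatoni_core::store; shown here for reference.
use async_trait::async_trait;
use bson::Document;
use rigatoni_core::store::StateError;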

#[async_trait]
pub trait StateStore: Send + Sync {
    /// Save a resume token for a collection
    async fn save_resume_token(&self, collection: &str, token: Document)
        -> Result<(), StateError>;

    /// Load a resume token for a collection
    async fn load_resume_token(&self, collection: &str)
        -> Result<Option<Document>, StateError>;

    /// Clear a resume token for a collection
    async fn clear_resume_token(&self, collection: &str)
        -> Result<(), StateError>;
}
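How a pipeline can combine the three operations at startup: load a saved token to resume, or clear it first to force a fresh start. The sketch below uses a synchronous, `String`-token mirror of the trait so it has no dependencies; the trait name `TokenStore`, the `start_position` helper, and the `reset` flag are hypothetical.

```rust
use std::collections::HashMap;

/// Synchronous, String-token mirror of the three StateStore operations.
/// (The real trait is async and uses bson::Document.)
trait TokenStore {
    fn save_token(&mut self, collection: &str, token: String);
    fn load_token(&self, collection: &str) -> Option<String>;
    fn clear_token(&mut self, collection: &str);
}

impl TokenStore for HashMap<String, String> {
    fn save_token(&mut self, collection: &str, token: String) {
        self.insert(collection.to_owned(), token);
    }
    fn load_token(&self, collection: &str) -> Option<String> {
        self.get(collection).cloned()
    }
    fn clear_token(&mut self, collection: &str) {
        self.remove(collection);
    }
}

/// Decides where a change stream should start for `collection`:
/// `Some(token)` means "resume after token", `None` means "start fresh".
/// `reset` discards any saved position first.
fn start_position<S: TokenStore>(store: &mut S, collection: &str, reset: bool) -> Option<String> {
    if reset {
        store.clear_token(collection);
    }
    store.load_token(collection)
}
```

Because `start_position` is generic over the trait, the same startup logic works unchanged whether the backing store is in memory, on disk, or in Redis.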

Custom Store Implementation

Implement your own store for custom backends:

use rigatoni_core::store::{StateStore, StateError};
use async_trait::async_trait;
use bson::Document;

pub struct CustomStore {
    // Your storage backend
}

#[async_trait]
impl StateStore for CustomStore {
    async fn save_resume_token(&self, collection: &str, token: Document)
        -> Result<(), StateError>
    {
        // Your implementation
        Ok(())
    }

    async fn load_resume_token(&self, collection: &str)
        -> Result<Option<Document>, StateError>
    {
        // Your implementation
        Ok(None)
    }

    async fn clear_resume_token(&self, collection: &str)
        -> Result<(), StateError>
    {
        // Your implementation
        Ok(())
    }
}

Use Cases

Development/Testing

Use Memory Store for fast iteration without persistence

Single-Instance Production

Use File Store for simple, reliable persistence

Multi-Instance Production

Use Redis Store for distributed state across pipeline instances with:

  • Shared resume tokens across multiple workers
  • Distributed locking for horizontal scaling without duplicate processing
  • Connection pooling for efficient Redis usage
  • Automatic retry logic for transient failures
  • Optional TTL to prevent unbounded growth

See the Multi-Instance Deployment Guide for Kubernetes examples and configuration.
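The usual single-instance Redis locking pattern is to acquire with `SET key owner NX PX ttl` and release only when the stored owner matches (a compare-and-delete, typically done in a Lua script). This README does not describe how Rigatoni's lock is implemented (key names, TTL, renewal), so the in-memory `FakeRedis` below is a hypothetical stand-in that models only those two operations.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// In-memory stand-in for Redis, modelling only the two lock operations.
/// Hypothetical; not Rigatoni's code.
#[derive(Default)]
struct FakeRedis {
    // key -> (owner id, expiry time)
    locks: HashMap<String, (String, Instant)>,
}

impl FakeRedis {
    /// Models `SET key owner NX PX ttl`: succeeds only if the key is absent
    /// or the previous holder's TTL has already expired.
    fn set_nx_px(&mut self, key: &str, owner: &str, ttl: Duration, now: Instant) -> bool {
        let held = self.locks.get(key).map_or(false, |(_, expires)| *expires > now);
        if held {
            return false;
        }
        self.locks.insert(key.to_string(), (owner.to_string(), now + ttl));
        true
    }

    /// Models the compare-and-delete release: only the current owner may
    /// delete the lock, so a slow worker cannot release a successor's lock.
    fn release(&mut self, key: &str, owner: &str) -> bool {
        if self.locks.get(key).map_or(false, |(o, _)| o == owner) {
            self.locks.remove(key);
            true
        } else {
            false
        }
    }
}
```

The TTL is what keeps a crashed worker from blocking a collection forever: once its lock expires, another instance can acquire it and resume from the shared token.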

License

Licensed under the Apache License, Version 2.0 (LICENSE or http://www.apache.org/licenses/LICENSE-2.0).