6 releases
| Version | Released |
|---|---|
| 0.2.0 | Dec 12, 2025 |
| 0.1.4 | Dec 1, 2025 |
| 0.1.3 | Nov 23, 2025 |
rigatoni-core
Core traits, pipeline orchestration, and MongoDB integration for the Rigatoni CDC/Data Replication framework.
Overview
rigatoni-core provides the foundational components for building data replication pipelines with Rigatoni:
- Pipeline Orchestration - Multi-worker architecture with retry logic and graceful shutdown
- MongoDB Source - Real-time change stream integration with resume token support
- Destination Trait - Generic interface for pluggable output destinations
- Event Model - Type-safe change event representation
- State Management - Resume token persistence for fault tolerance
- Distributed Locking - Redis-based locking so multiple instances can scale horizontally without processing the same events twice
- Metrics - Prometheus-compatible metrics for observability
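To illustrate how resume token persistence gives fault tolerance, here is a minimal, std-only sketch. The `ResumeTokenStore` trait, `InMemoryTokenStore`, and `process_from_checkpoint` are hypothetical names invented for this example and are not rigatoni-core's actual API; events are modelled as plain `(token, payload)` pairs rather than real change stream documents.

```rust
use std::collections::HashMap;

/// Hypothetical state-store interface (NOT the crate's real trait).
trait ResumeTokenStore {
    fn save(&mut self, collection: &str, token: &str);
    fn load(&self, collection: &str) -> Option<String>;
}

/// In-memory store: tokens are lost when the process exits, so this
/// only demonstrates the checkpointing logic, not durable storage.
#[derive(Default)]
struct InMemoryTokenStore {
    tokens: HashMap<String, String>,
}

impl ResumeTokenStore for InMemoryTokenStore {
    fn save(&mut self, collection: &str, token: &str) {
        self.tokens.insert(collection.to_string(), token.to_string());
    }

    fn load(&self, collection: &str) -> Option<String> {
        self.tokens.get(collection).cloned()
    }
}

/// Delivers every event that follows the stored token and checkpoints
/// after each one. If no token is stored (or it is not found in
/// `events`), processing starts from the beginning.
fn process_from_checkpoint<S: ResumeTokenStore>(
    store: &mut S,
    collection: &str,
    events: &[(&str, &str)],
) -> Vec<String> {
    let start = match store.load(collection) {
        Some(token) => events
            .iter()
            .position(|(t, _)| *t == token)
            .map_or(0, |i| i + 1),
        None => 0,
    };

    let mut delivered = Vec::new();
    for (token, payload) in &events[start..] {
        delivered.push(payload.to_string());
        // Checkpoint only after the event has been handled.
        store.save(collection, token);
    }
    delivered
}
```

Calling `process_from_checkpoint` a second time with the same store skips everything up to the last saved token, which is the behaviour a restarted pipeline relies on.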
Installation
Add this to your Cargo.toml:
[dependencies]
rigatoni-core = "0.2.0"
Quick Start
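use rigatoni_core::pipeline::{Pipeline, PipelineConfig};
// MemoryStore comes from the separate rigatoni-stores crate.
use rigatoni_stores::memory::MemoryStore;

// #[tokio::main] requires tokio as a dependency.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Describe the MongoDB source and batching behaviour.
    let config = PipelineConfig::builder()
        .mongodb_uri("mongodb://localhost:27017")
        .database("mydb")
        .collections(vec!["users".to_string()])
        .batch_size(1000)
        .build()?;

    // In-memory state store for resume tokens.
    let store = MemoryStore::new();
    // Placeholder: substitute a value that implements the destination trait.
    let destination = /* your destination */;

    let mut pipeline = Pipeline::new(config, store, destination).await?;
    pipeline.start().await?;
    Ok(())
}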
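The `/* your destination */` placeholder above stands for some type that accepts batches of change events. As a rough picture of what such a type does, the sketch below defines its own simplified, synchronous `Destination` trait and `ChangeEvent` struct. These are assumptions made for illustration only: the real trait in rigatoni-core may differ in method names, signatures, and error types, and is likely async.

```rust
/// Simplified stand-in for a change event (hypothetical fields).
#[derive(Debug, Clone, PartialEq)]
struct ChangeEvent {
    collection: String,
    operation: String,
    document: String,
}

/// Hypothetical, synchronous destination interface for this sketch.
trait Destination {
    fn write_batch(&mut self, events: &[ChangeEvent]) -> Result<(), String>;
}

/// Destination that keeps everything in memory, useful for tests.
#[derive(Default)]
struct VecDestination {
    written: Vec<ChangeEvent>,
    batches: usize,
}

impl Destination for VecDestination {
    fn write_batch(&mut self, events: &[ChangeEvent]) -> Result<(), String> {
        self.written.extend_from_slice(events);
        self.batches += 1;
        Ok(())
    }
}

/// Splits `events` into chunks of at most `batch_size` and writes each
/// chunk to the destination. Returns the number of batches written.
fn deliver_in_batches<D: Destination>(
    dest: &mut D,
    events: &[ChangeEvent],
    batch_size: usize,
) -> Result<usize, String> {
    if batch_size == 0 {
        return Err("batch_size must be greater than zero".to_string());
    }
    let mut batches = 0;
    for chunk in events.chunks(batch_size) {
        dest.write_batch(chunk)?;
        batches += 1;
    }
    Ok(batches)
}
```

With five events and a batch size of two, `deliver_in_batches` makes three `write_batch` calls (2 + 2 + 1 events).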
Features
This crate includes:
- MongoDB change stream source (default)
- Pipeline orchestration with batching and retry
- Distributed locking for multi-instance deployments
- Event model and destination trait
- State store trait for resume tokens
- Prometheus-compatible metrics
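The retry behaviour listed above can be pictured as a retry loop with exponential backoff. The following is a generic std-only sketch, assuming a blocking operation and a doubling delay; `retry_with_backoff` is a hypothetical helper, and the crate's actual retry policy, limits, and configuration options are not shown here.

```rust
use std::thread::sleep;
use std::time::Duration;

/// Runs `op` until it succeeds or `max_attempts` is reached, sleeping
/// `base_delay * 2^(attempt - 1)` between failed attempts.
/// `op` receives the 1-based attempt number and always runs at least once.
fn retry_with_backoff<T, E>(
    max_attempts: u32,
    base_delay: Duration,
    mut op: impl FnMut(u32) -> Result<T, E>,
) -> Result<T, E> {
    let mut attempt: u32 = 1;
    loop {
        match op(attempt) {
            Ok(value) => return Ok(value),
            // Out of attempts: surface the last error to the caller.
            Err(err) if attempt >= max_attempts => return Err(err),
            Err(_) => {
                // Saturating arithmetic avoids overflow on long retry runs.
                let factor = 2u32.saturating_pow(attempt - 1);
                sleep(base_delay.saturating_mul(factor));
                attempt += 1;
            }
        }
    }
}
```

A caller would wrap a single batch write in the closure, so that a transient failure delays only that batch rather than aborting the whole pipeline.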
Documentation
License
Licensed under the Apache License, Version 2.0 (LICENSE or http://www.apache.org/licenses/LICENSE-2.0).