# XERV: Workflow Orchestration Engine
XERV is a workflow orchestration platform for building data pipelines and automations. It combines the accessibility of low-code tools (n8n, Zapier) with the reliability of systems engineering (Temporal, Rust).
Key characteristics:

- Memory-mapped arena for efficient data passing between pipeline stages
- Async pipeline execution with topological DAG scheduling
- YAML-based flow definitions with no visual editor needed
- Standard library nodes for merge, split, switch, and loop operations
- Testable workflows with mock providers for time, HTTP, and filesystems
- Fault-tolerant execution via write-ahead logs (WAL) and crash recovery
## Quick Start

### Installation

Add to your `Cargo.toml`:

```toml
[dependencies]
xerv-core = "0.1"
xerv-executor = "0.1"
xerv-nodes = "0.1"
```
### Write a Flow (YAML)

Create `flows/order_processing.yaml`:
```yaml
name: order_processing
version: "1.0"

triggers:
  - id: api_webhook
    type: webhook
    params:
      port: 8080
      path: /orders

nodes:
  fraud_check:
    type: std::switch
    config:
      condition:
        type: greater_than
        field: risk_score
        value: 0.8

  process_safe:
    type: std::log
    config:
      message: "Processing safe order"

  process_risky:
    type: std::log
    config:
      message: "Flagging risky order"

  merge_results:
    type: std::merge
    config:
      strategy: wait_all

edges:
  - from: api_webhook
    to: fraud_check
  - from: fraud_check.false
    to: process_safe
  - from: fraud_check.true
    to: process_risky
  - from: process_safe
    to: merge_results
  - from: process_risky
    to: merge_results
```
### Load and Execute

```rust
use xerv_executor::prelude::*;
use xerv_nodes::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    // Load the flow from YAML
    let config = LoaderConfig::default();
    let flow = FlowLoader::load_file("flows/order_processing.yaml", &config)?;

    // Create a pipeline controller
    let pipeline = Pipeline::new(flow.loaded)?;

    // Start listeners (webhooks, cron, etc.)
    let listeners = pipeline.start_listeners().await?;

    // Run traces as they come in
    let executor = pipeline.executor();
    while let Some(trace) = executor.next_trace().await {
        let result = executor.run(trace).await?;
        println!("Trace completed: {:?}", result);
    }

    Ok(())
}
```
### Test Your Flow

```rust
#[tokio::test]
async fn test_fraud_detection() {
    let input = serde_json::json!({
        "order_id": "ORD-123",
        "risk_score": 0.95,
        "amount": 5000
    });

    let result = FlowRunner::new()
        .with_fixed_time("2024-01-15T10:00:00Z")
        .set_entry_point(NodeId::new(0))
        .run(serde_json::to_vec(&input).unwrap())
        .await
        .unwrap();

    assert!(result.is_success());
    assert_eq!(result.completed_nodes.len(), 4);
}
```
## How XERV Works

### Data Flow

```mermaid
graph TD
    A["Trigger Event"] --> B["Arena: Write input<br/>(Rkyv serialization)"]
    B --> C["Scheduler: Execute DAG<br/>in topological order"]
    C --> D["Node 0: Read from Arena<br/>via RelPtr<T>"]
    D --> E["Node 0: Write output<br/>to Arena"]
    E --> F["Node 1: Read from Arena<br/>via RelPtr<T>"]
    F --> G["WAL: Record<br/>completed nodes<br/>(durability)"]
    G --> H["Trace Complete /<br/>Error"]
```
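The scheduler step above orders the DAG before running it. Kahn's algorithm is the classic way to produce such an order; the sketch below is a self-contained, std-only illustration of the idea, not XERV's actual scheduler (which also handles concurrency and arena wiring):

```rust
use std::collections::{HashMap, VecDeque};

/// Kahn's algorithm: returns the nodes in an order where every edge's
/// source appears before its target, or None if the graph has a cycle.
fn topo_sort(nodes: &[&str], edges: &[(&str, &str)]) -> Option<Vec<String>> {
    // Count incoming edges per node and record successors.
    let mut indegree: HashMap<&str, usize> = nodes.iter().map(|n| (*n, 0)).collect();
    let mut successors: HashMap<&str, Vec<&str>> = HashMap::new();
    for &(from, to) in edges {
        successors.entry(from).or_default().push(to);
        *indegree.get_mut(to)? += 1;
    }
    // Start with nodes that have no unmet dependencies.
    let mut ready: VecDeque<&str> =
        nodes.iter().copied().filter(|n| indegree[n] == 0).collect();
    let mut order = Vec::new();
    while let Some(node) = ready.pop_front() {
        order.push(node.to_string());
        for &next in successors.get(node).into_iter().flatten() {
            let d = indegree.get_mut(next)?;
            *d -= 1;
            if *d == 0 {
                ready.push_back(next);
            }
        }
    }
    // Scheduling fewer nodes than the total means a cycle was present.
    (order.len() == nodes.len()).then_some(order)
}
```

Run over the quick-start flow's five nodes, this yields an order starting at `api_webhook` and ending at `merge_results`; a cycle makes it return `None`, which corresponds to the executor's DAG-validation step rejecting the flow.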
### Architecture

```mermaid
graph TB
    subgraph CP ["Control Plane (Pipeline lifecycle)"]
        CP1["Start/pause/stop/drain"]
        CP2["Listener management<br/>(webhooks, cron, fs watches)"]
        CP3["Trace routing to executor"]
    end
    subgraph EP ["Execution Plane (Executor + Scheduler)"]
        EP1["DAG validation"]
        EP2["Topological sort"]
        EP3["Concurrent node execution"]
        EP4["Selector resolution<br/>(${node.field} → arena offsets)"]
    end
    subgraph DP ["Data Plane (Arena + WAL)"]
        DP1["Memory-mapped append-only arena per trace"]
        DP2["Write-ahead log for crash recovery"]
        DP3["Memory-efficient RelPtr<T><br/>access between nodes"]
    end
    CP <--> EP
    EP <--> DP
```
## Core Concepts

### Nodes

A node is a discrete unit of work. Nodes:

- Have typed input/output ports
- Execute async functions
- Write results to the arena
- Can be written in Rust or WebAssembly
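To make that contract concrete, here is a deliberately simplified, synchronous stand-in for a node. The trait name and shape below are illustrative only; XERV's real node trait lives in `xerv-core`, is async, and reads and writes the arena rather than plain values:

```rust
/// Simplified stand-in for a node contract: typed input in, typed output out.
/// (Illustrative only; not the trait defined in xerv-core.)
trait Node {
    type Input;
    type Output;
    fn run(&self, input: Self::Input) -> Self::Output;
}

/// Example: a node that flags inputs whose risk score exceeds a threshold,
/// mirroring what the std::switch node does in the quick-start flow.
struct RiskSwitch {
    threshold: f64,
}

impl Node for RiskSwitch {
    type Input = f64;   // risk_score
    type Output = bool; // true = route down the "risky" branch
    fn run(&self, risk_score: f64) -> bool {
        risk_score > self.threshold
    }
}
```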
Standard library nodes:

- `std::merge` - N→1 barrier (wait for all inputs)
- `std::split` - 1→N fan-out (iterate over a collection)
- `std::switch` - Conditional routing based on an expression
- `std::loop` - Controlled iteration with an exit condition
- `std::map` - Field renaming and transformation
- `std::concat` - String concatenation
- `std::aggregate` - Numeric aggregation (sum, avg, min, max)
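As one concrete example from the list above, the behavior of an aggregation node can be sketched in plain Rust. The strategy names mirror the ones listed for `std::aggregate`; everything else here is an assumption for illustration, not XERV's implementation:

```rust
/// Conceptual sketch of a numeric aggregation over a node's collected inputs.
/// Strategy names ("sum", "avg", "min", "max") follow the std::aggregate list;
/// an unknown strategy or an empty input yields None.
fn aggregate(strategy: &str, values: &[f64]) -> Option<f64> {
    if values.is_empty() {
        return None;
    }
    match strategy {
        "sum" => Some(values.iter().sum()),
        "avg" => Some(values.iter().sum::<f64>() / values.len() as f64),
        "min" => Some(values.iter().copied().fold(f64::INFINITY, f64::min)),
        "max" => Some(values.iter().copied().fold(f64::NEG_INFINITY, f64::max)),
        _ => None,
    }
}
```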
### Edges

An edge connects the output port of one node to the input port of another. Format:

```
from: node_name.port_name
to: other_node.input_port_name
```

For conditional nodes like `std::switch`, use the special ports:

```yaml
- from: fraud_check.true    # when the condition is true
  to: process_risky
- from: fraud_check.false   # when the condition is false
  to: process_safe
```
### Selectors

A selector is a template string for referencing data:

```
${node_name.field.path}
${pipeline.config.max_value}
```

Selectors are resolved at runtime by the linker, which:

- Parses selector expressions from config
- Maps node fields to arena offsets (RelPtr)
- Provides type-safe access during execution
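The parsing step (the first bullet) can be sketched in a few lines of plain Rust. This is a toy version: the real linker additionally resolves the parsed segments to arena offsets, which this sketch does not attempt:

```rust
/// Minimal selector parser: "${node.field.path}" -> ["node", "field", "path"].
/// Returns None for strings that are not wrapped in ${...} or are empty inside.
fn parse_selector(s: &str) -> Option<Vec<String>> {
    let inner = s.strip_prefix("${")?.strip_suffix('}')?;
    if inner.is_empty() {
        return None;
    }
    Some(inner.split('.').map(str::to_string).collect())
}
```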
Example in a node config:

```yaml
nodes:
  check_limit:
    type: std::switch
    config:
      condition:
        type: greater_than
        field: amount
        value: ${pipeline.config.limit}  # Resolved at link time
```
### Arena

The arena is a memory-mapped file where all trace data lives. Each trace gets its own arena at `/tmp/xerv/trace_{uuid}.bin`.
Layout:

```mermaid
graph TD
    A["Header<br/>(metadata + config offset)"]
    B["Pipeline Config<br/>(rkyv-serialized)"]
    C["Data Region<br/>(append-only entries)"]
    D["Entry 0: [size: u32][rkyv bytes]"]
    E["Entry 1: [size: u32][rkyv bytes]"]
    F["..."]
    A --> B
    B --> C
    C --> D
    D --> E
    E --> F
    style A fill:#e1f5ff
    style B fill:#e1f5ff
    style C fill:#fff3e0
    style D fill:#fff3e0
    style E fill:#fff3e0
    style F fill:#fff3e0
```
Nodes access data via relative pointers (`RelPtr<T>`), which are stable across process restarts since they are offsets from the arena base.
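The entry layout and offset stability can be modeled in a few lines of std-only Rust. This toy arena uses a `Vec<u8>` where XERV uses a memory-mapped file, but the `[size: u32][bytes]` framing and the role of offsets are the same idea:

```rust
/// Toy append-only arena mirroring the entry layout above: each entry is
/// [size: u32 little-endian][payload bytes]. Offsets returned by `append`
/// stay valid for the life of the buffer, which is what makes offset-based
/// pointers stable across restarts when the buffer is a memory-mapped file.
struct ToyArena {
    buf: Vec<u8>,
}

impl ToyArena {
    fn new() -> Self {
        ToyArena { buf: Vec::new() }
    }

    /// Append a payload, returning the offset of its entry header.
    fn append(&mut self, payload: &[u8]) -> usize {
        let offset = self.buf.len();
        self.buf.extend_from_slice(&(payload.len() as u32).to_le_bytes());
        self.buf.extend_from_slice(payload);
        offset
    }

    /// Read back the payload stored at a previously returned offset.
    fn read(&self, offset: usize) -> &[u8] {
        let size_bytes: [u8; 4] = self.buf[offset..offset + 4].try_into().unwrap();
        let size = u32::from_le_bytes(size_bytes) as usize;
        &self.buf[offset + 4..offset + 4 + size]
    }
}
```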
### Write-Ahead Log (WAL)

The WAL records node completions before execution continues. On crash recovery:

- Incomplete nodes replay from their inputs
- Completed nodes skip execution
- Their results are read back from the arena

This enables fault-tolerant trace execution.
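The skip-completed decision can be sketched as a set difference over node IDs recorded in the WAL. Illustrative only: the real WAL presumably also stores the arena offsets needed to reload completed results, which this sketch ignores:

```rust
use std::collections::HashSet;

/// Given the WAL's record of completed node IDs, return the nodes of a
/// trace that still need to be (re-)executed after a crash.
fn nodes_to_replay(all_nodes: &[u32], completed_in_wal: &HashSet<u32>) -> Vec<u32> {
    all_nodes
        .iter()
        .copied()
        .filter(|id| !completed_in_wal.contains(id))
        .collect()
}
```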
## Next Steps

### Project Structure

```
xerv/
├── xerv-core/      # Arena, WAL, core traits (Node, Schema, Context)
├── xerv-nodes/     # Standard library (merge, split, switch, loop)
├── xerv-executor/  # Scheduler, linker, pipeline controller, REST API
├── xerv-cli/      # CLI binary (deploy, dev, inspect, bench)
└── xerv-macros/    # Procedural macros (#[xerv::node], #[xerv::schema])
```
### Development

```shell
# Build all crates
cargo build --release

# Run tests
cargo test --all

# Check code quality
cargo clippy --all-targets -- -D warnings
cargo fmt --all --check
```
## License
Apache License 2.0 - See LICENSE for details.