workflow

package module
v0.0.5 Latest
Published: Apr 13, 2026 License: Apache-2.0 Imports: 18 Imported by: 0

README

Workflow

A Go library for defining and running multi-step processes as directed graphs. You get conditional branching, parallel execution, expression-driven templates, durable checkpointing, and the ability to suspend and resume on signals or wall-clock waits.

Think of it as a lightweight hybrid of Temporal and AWS Step Functions. The library contains only the execution engine; everything that doesn't belong inside an engine — storage, queues, leasing — is an interface you implement however you like.

Edge conditions and ${...} parameter templates are evaluated by github.com/deepnoodle-ai/expr, a small zero-dependency expression evaluator with a Go-like syntax. It's the only external dependency of the root module.

Main concepts

  • Workflow: A repeatable process defined as a directed graph of steps
  • Step: A node in the graph — runs an activity, joins branches, sleeps, waits, or pauses
  • Activity: A function that performs the actual work
  • Edge: Defines flow between steps, optionally guarded by a condition
  • Execution: A single run of a workflow, with its own state
  • Branch: An independent execution thread with its own copy of state
  • State: Branch-local mutable variables that activities read and write
  • Runner: Convenience entry point that composes heartbeat, timeout, resume, and hooks

Quick example

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/deepnoodle-ai/workflow"
	"github.com/deepnoodle-ai/workflow/activities"
)

func main() {
	attempt := 0

	myOperation := func(ctx workflow.Context, _ map[string]any) (string, error) {
		attempt++
		if attempt < 3 {
			return "", fmt.Errorf("service is temporarily unavailable")
		}
		return "SUCCESS", nil
	}

	wf, err := workflow.New(workflow.Options{
		Name: "demo",
		Steps: []*workflow.Step{
			{
				Name:     "Call My Operation",
				Activity: "my_operation",
				Store:    "result",
				Retry:    []*workflow.RetryConfig{{MaxRetries: 2}},
				Next:     []*workflow.Edge{{Step: "Finish"}},
			},
			{
				Name:     "Finish",
				Activity: "print",
				Parameters: map[string]any{
					"message": "Workflow completed. Result: ${state.result}",
				},
			},
		},
		Outputs: []*workflow.Output{
			{Name: "result", Variable: "result"},
		},
	})
	if err != nil {
		log.Fatal(err)
	}

	reg := workflow.NewActivityRegistry()
	reg.MustRegister(
		workflow.TypedActivityFunc("my_operation", myOperation),
		activities.NewPrintActivity(),
	)

	exec, err := workflow.NewExecution(wf, reg)
	if err != nil {
		log.Fatal(err)
	}

	runner := workflow.NewRunner()
	result, err := runner.Run(context.Background(), exec)
	if err != nil {
		log.Fatal(err)
	}
	if !result.Completed() {
		log.Fatalf("execution did not complete: %v", result.Error)
	}

	if got, ok := result.OutputString("result"); ok {
		fmt.Println("final result:", got)
	}
}

For one-shot scripts and tests, calling exec.Execute(ctx) directly is perfectly fine. The Runner is the convenient wrapper when you also want heartbeating, default timeouts, resume-from-checkpoint, and completion hooks.

Bring your own storage

The library is a pure execution engine — it doesn't ship a database, a queue, or a UI, and it never will. Instead it defines small interfaces (Checkpointer, StepProgressStore, ActivityLogger, SignalStore, WorkflowRegistry) and you wire up whatever backends fit your stack. The bundled MemoryCheckpointer and FileCheckpointer are great for development and tests.

If you'd rather not start from scratch, the experimental/ submodules give you a head start:

  • experimental/worker/ — a queue-backed durable worker with claim loop, heartbeat, reaper, and credit reconciliation. Handlers receive a *HandlerContext carrying the current Claim plus pre-fenced stores. The runquery subpackage exposes a backend-neutral read API (GetRun, ListRuns, CountRuns, DeleteRun) so dashboards can stay storage-agnostic.
  • experimental/store/postgres/ — pgx-backed persistence that implements every store interface plus runquery.Store. Schema namespace is configurable via WithSchema(...) so it can live alongside other tables.
  • experimental/store/sqlite/ — the same surface backed by database/sql. Single-writer, perfect for dev and single-process deployments.

These submodules have their own go.mod, so the root module keeps github.com/deepnoodle-ai/expr as its only external dependency. Their APIs are still being shaped — expect some churn.

Where to look next

  • documentation/ — friendly user guides for activities, branching, checkpointing, expressions, signals, sleep, pause, state management, testing, and more.
  • llms.txt — the full API reference, including the JSON workflow format and the script-compiler interface.
  • docs/worker.md and docs/postgres.md — guides for the experimental worker and Postgres store.
  • docs/suspension.md — the suspend / resume / replay-safety contract.
  • MIGRATION.md — every breaking change between pre-v1 and v1, with before/after snippets.
  • examples/ — runnable example programs covering branching, joins, retries, child workflows, and the suspend/resume primitives (signal_wait, durable_sleep, pause_unpause).

Documentation

Index

Constants

View Source
const (
	// ErrorTypeAll acts as a wildcard that matches any error except
	// fatal errors. A retry/catch pattern of ErrorTypeAll will NOT
	// match an error classified as ErrorTypeFatal — fatal errors are
	// matchable only by an explicit ErrorTypeFatal pattern. This is
	// the documented escape valve for "this error must not be
	// retried, even by callers using the default catch-all pattern."
	ErrorTypeAll = "all"

	// ErrorTypeActivityFailed matches any error except timeouts and fatal errors
	ErrorTypeActivityFailed = "activity_failed"

	// ErrorTypeTimeout matches an error that wraps
	// context.DeadlineExceeded or workflow.ErrWaitTimeout. Substring
	// matching of the literal string "timeout" is intentionally NOT
	// done — too many error messages contain the word incidentally.
	// Surface a real timeout via context.DeadlineExceeded or
	// ErrWaitTimeout for it to be classified here.
	ErrorTypeTimeout = "timeout"

	// ErrorTypeFatal indicates an execution failed due to a fatal error.
	// The approach we're taking is that by default, unknown errors are
	// classified as activity failed errors. This is because we want to
	// allow retries on unknown errors by default. If we know a specific
	// error should NOT be retried, it should have type=ErrorTypeFatal set.
	ErrorTypeFatal = "fatal_error"
)

Error type constants for classification and matching

View Source
const CheckpointSchemaVersion = 1

CheckpointSchemaVersion is the current wire-format version for checkpoints written by this library. It is embedded in every saved Checkpoint so readers can detect and reject incompatible shapes.

Version contract:

  • v1: Initial stable schema. Locked at v1 release.

Consumers that persist checkpoints to their own storage MUST treat SchemaVersion as a compatibility signal: if a loaded checkpoint has a SchemaVersion higher than the library version understands, or lower than the minimum supported version (1), the load should fail rather than proceed with a potentially wrong interpretation.
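That consumer-side check can be sketched as follows. The constants here are local stand-ins for the library's CheckpointSchemaVersion and the documented minimum supported version (1):

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins mirroring the documented version contract.
const (
	checkpointSchemaVersion = 1
	minSupportedVersion     = 1
)

var errIncompatibleSchema = errors.New("checkpoint schema version not supported")

// validateSchemaVersion applies the documented rule: reject any
// checkpoint whose version is above what this code understands or
// below the minimum supported version, rather than guessing.
func validateSchemaVersion(v int) error {
	if v > checkpointSchemaVersion || v < minSupportedVersion {
		return fmt.Errorf("%w: got %d, supported range [%d, %d]",
			errIncompatibleSchema, v, minSupportedVersion, checkpointSchemaVersion)
	}
	return nil
}

func main() {
	fmt.Println(validateSchemaVersion(1)) // accepted: prints <nil>
	fmt.Println(validateSchemaVersion(2)) // rejected with an error
}
```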

Variables

View Source
var (
	// ErrDuplicateStepName is reported when two steps share a name.
	ErrDuplicateStepName = errors.New("workflow: duplicate step name")
	// ErrEmptyStepName is reported when a step has no name.
	ErrEmptyStepName = errors.New("workflow: empty step name")
	// ErrUnknownStartStep is reported when Options.StartAt names a step
	// that does not exist in the workflow.
	ErrUnknownStartStep = errors.New("workflow: start step not found")
	// ErrUnknownEdgeTarget is reported when an edge points at a step
	// that does not exist in the workflow.
	ErrUnknownEdgeTarget = errors.New("workflow: edge destination not found")
	// ErrUnknownCatchTarget is reported when a catch handler points at
	// a step that does not exist in the workflow.
	ErrUnknownCatchTarget = errors.New("workflow: catch destination not found")
	// ErrUnknownJoinBranch is reported when JoinConfig.Branches names
	// a branch that no upstream edge declares.
	ErrUnknownJoinBranch = errors.New("workflow: join branch not found")
	// ErrInvalidStepKind is reported when a step mixes multiple step
	// kinds (activity/join/wait_signal/sleep/pause).
	ErrInvalidStepKind = errors.New("workflow: conflicting step kinds")
	// ErrInvalidModifier is reported when a modifier field (Retry,
	// Catch) is attached to a step kind that cannot use it.
	ErrInvalidModifier = errors.New("workflow: modifier not allowed on step kind")
	// ErrInvalidRetryConfig is reported when a RetryConfig has
	// nonsensical bounds (negative retries, MaxDelay < BaseDelay, etc.).
	ErrInvalidRetryConfig = errors.New("workflow: invalid retry config")
	// ErrInvalidSleepConfig is reported when a SleepConfig has a
	// non-positive Duration.
	ErrInvalidSleepConfig = errors.New("workflow: invalid sleep config")
	// ErrInvalidWaitConfig is reported when a WaitSignalConfig has a
	// missing topic, non-positive timeout, or dangling OnTimeout.
	ErrInvalidWaitConfig = errors.New("workflow: invalid wait_signal config")
	// ErrReservedBranchName is reported when a named branch uses the
	// reserved name "main".
	ErrReservedBranchName = errors.New("workflow: branch name 'main' is reserved")
	// ErrDuplicateBranchName is reported when two edges declare the
	// same branch name.
	ErrDuplicateBranchName = errors.New("workflow: duplicate branch name")
	// ErrUnknownActivity is reported when a step references an activity
	// name that is not registered on the ActivityRegistry passed to
	// NewExecution. Surfaced as a ValidationProblem on *ValidationError.
	ErrUnknownActivity = errors.New("workflow: activity not registered")
	// ErrInvalidTemplate is reported when a parameter template or
	// WaitSignalConfig.Topic template fails to parse or compile.
	ErrInvalidTemplate = errors.New("workflow: invalid template")
	// ErrInvalidExpression is reported when an edge condition or
	// parameter script expression fails to compile.
	ErrInvalidExpression = errors.New("workflow: invalid expression")
	// ErrInvalidStorePath is reported when a Store field
	// (Step.Store, WaitSignalConfig.Store, CatchConfig.Store,
	// Output.Variable) is given with a leading "state." prefix. Store
	// fields must be bare variable names.
	ErrInvalidStorePath = errors.New("workflow: store field must be a bare variable name")
)

Structural validation sentinels. All are reported as ValidationProblem fields on *ValidationError when workflow.New runs.

View Source
var ErrAlreadyStarted = errors.New("workflow: execution already started")

ErrAlreadyStarted is returned when Run/Execute is called on an Execution that has already been started.

View Source
var ErrBranchNotFound = errors.New("workflow: branch not found")

ErrBranchNotFound is returned by PauseBranch / UnpauseBranch when the given branch ID is not present in the execution's branch states.

View Source
var ErrDuplicateActivity = errors.New("workflow: duplicate activity registration")

ErrDuplicateActivity is returned when Register is called with an activity name that has already been registered.

View Source
var ErrFenceViolation = errors.New("fence violation: lease lost")

ErrFenceViolation is returned when a fence check fails, indicating the worker has lost its lease and should stop processing. ErrFenceViolation is always wrapped with the original fence check error for context.

ErrFenceViolation bypasses retry and catch handlers. The engine treats it as non-retryable and non-catchable, similar to ErrorTypeFatal. A lost lease is not a recoverable activity error — retrying on the same worker is pointless and catching it would mask the real problem.

View Source
var ErrInvalidHeartbeatInterval = errors.New("workflow: heartbeat interval must be positive")

ErrInvalidHeartbeatInterval is returned when a HeartbeatConfig has a non-positive Interval.

View Source
var ErrNilExecution = errors.New("workflow: execution must not be nil")

ErrNilExecution is returned when Runner.Run receives a nil *Execution.

View Source
var ErrNilHeartbeatFunc = errors.New("workflow: heartbeat func must not be nil")

ErrNilHeartbeatFunc is returned when a HeartbeatConfig has a nil Func.

View Source
var ErrNoCheckpoint = errors.New("workflow: no checkpoint found")

ErrNoCheckpoint is returned when Resume or RunOrResume cannot find a checkpoint for the given execution ID. Use errors.Is to check for it.

View Source
var ErrWaitTimeout = errors.New("workflow: wait timed out")

ErrWaitTimeout is returned from Context.Wait when the wait's deadline has passed and no signal was delivered. Catch handlers can match on ErrorTypeTimeout to route timeouts to a recovery step.

Functions

func DefaultScriptCompiler added in v0.0.2

func DefaultScriptCompiler() script.Compiler

DefaultScriptCompiler returns a script.Compiler backed by github.com/deepnoodle-ai/expr with the standard builtin function set enabled. This is the compiler used by NewExecution when ExecutionOptions.ScriptCompiler is nil — it handles edge conditions and ${...} parameter templates out of the box.

expr is expression-only: it cannot mutate state, so workflows that need state-mutating scripts must provide their own script.Compiler (for example, by wrapping a language like Risor behind the script.Compiler interface).

func MatchesErrorType

func MatchesErrorType(err error, errorType string) bool

MatchesErrorType reports whether err matches the given error type pattern.

func NewContext

func NewContext(ctx context.Context, opts ExecutionContextOptions) *executionContext

NewContext creates a new workflow context with direct state access.

func NewExecutionID

func NewExecutionID() string

NewExecutionID returns a new opaque ID suitable for identifying an execution. Format: "exec_" followed by 16 bytes of base32-encoded entropy (26 chars, lowercased, no padding).

func NewJSONLogger

func NewJSONLogger() *slog.Logger

NewJSONLogger returns a logger that writes to stdout in JSON format. It is provided as a convenience for simple consumers; any *slog.Logger passed via ExecutionOptions.Logger will be respected by the engine.

func OutputAs added in v0.0.2

func OutputAs[T any](r *ExecutionResult, key string) (T, bool)

OutputAs returns the output at key coerced to T and whether the type assertion succeeded. Generic counterpart to OutputString / OutputBool for arbitrary types — useful when consumers store custom structs in workflow outputs.

Returns (zero T, false) if the key is missing or the value cannot be type-asserted to T. No JSON-style conversion is performed; the value must already be of type T (or assignable to it).
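The "no JSON-style conversion" point is easy to trip over. A local generic helper mirroring the documented semantics (a stand-in, not the library function) makes it concrete:

```go
package main

import "fmt"

// outputAs mirrors the documented OutputAs semantics: a plain type
// assertion against the stored value, with no numeric or JSON-style
// conversion.
func outputAs[T any](outputs map[string]any, key string) (T, bool) {
	var zero T
	v, ok := outputs[key]
	if !ok {
		return zero, false
	}
	t, ok := v.(T)
	if !ok {
		return zero, false
	}
	return t, true
}

type Report struct{ Total int }

func main() {
	outputs := map[string]any{"report": Report{Total: 3}, "count": 7}
	if r, ok := outputAs[Report](outputs, "report"); ok {
		fmt.Println(r.Total) // 3
	}
	// An int stored as int does NOT coerce to int64: the assertion fails.
	_, ok := outputAs[int64](outputs, "count")
	fmt.Println(ok) // false
}
```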

func PauseBranchInCheckpoint added in v0.0.2

func PauseBranchInCheckpoint(ctx context.Context, cp Checkpointer, executionID, branchID, reason string) error

PauseBranchInCheckpoint loads the latest checkpoint for the given execution, flips the target branch's PauseRequested flag, and saves the checkpoint back. It is the operator-facing entry point for pausing an execution that is not currently loaded in any host process.

If the branch is parked on a durable wait (signal-wait or sleep), the wait's absolute WakeAt is captured into Remaining and cleared, so the pause duration does not consume the wait's timeout budget. See freezeWaitOnPause.

The operation is a non-atomic load-modify-write against the Checkpointer. If a host process is concurrently running the same execution, the save here may race with the host's own checkpoint writes. Consumers requiring strict atomicity should use a Checkpointer implementation that serializes writes (e.g. a Postgres backend with row-level locking or optimistic concurrency on a version column).

PauseBranchInCheckpoint is idempotent: pausing an already-paused branch is a no-op save. Returns ErrNoCheckpoint if no checkpoint exists for the execution ID, or ErrBranchNotFound if the branch is not in the checkpoint.

func UnpauseBranchInCheckpoint added in v0.0.2

func UnpauseBranchInCheckpoint(ctx context.Context, cp Checkpointer, executionID, branchID string) error

UnpauseBranchInCheckpoint is the operator-facing entry point for clearing a branch's pause flag in a checkpoint without loading the execution. If the branch's wait was frozen by a prior pause, its WakeAt is rebased to now + Remaining so the wait resumes with the time it had left at pause time. See PauseBranchInCheckpoint for the concurrency contract.

Types

type Activity

type Activity interface {

	// Name returns the name of the Activity
	Name() string

	// Execute the Activity with the given parameters.
	Execute(ctx Context, parameters map[string]any) (any, error)
}

Activity represents an action that can be executed as part of a workflow.

func ActivityFunc added in v0.0.2

func ActivityFunc(name string, fn ExecuteActivityFunc) Activity

ActivityFunc returns an Activity backed by fn. The returned value implements the Activity interface, mirroring http.HandlerFunc.

func NewTypedActivity

func NewTypedActivity[TParams, TResult any](activity TypedActivity[TParams, TResult]) Activity

NewTypedActivity creates a new typed activity that implements the Activity interface.

func TypedActivityFunc added in v0.0.2

func TypedActivityFunc[TParams, TResult any](name string, fn func(ctx Context, params TParams) (TResult, error)) Activity

TypedActivityFunc returns an Activity backed by a strongly-typed function. Parameters are JSON-marshalled into TParams by the adapter layer.
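The adapter layer's decoding step can be sketched as a JSON round-trip from the untyped parameter map into TParams. decodeParams below is a local illustration of that behavior, not the library's code:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// GreetParams is a hypothetical typed-parameter struct.
type GreetParams struct {
	Name string `json:"name"`
}

// decodeParams sketches the documented adapter behavior: the untyped
// parameter map is JSON-marshalled, then unmarshalled into TParams.
func decodeParams[T any](params map[string]any) (T, error) {
	var out T
	b, err := json.Marshal(params)
	if err != nil {
		return out, err
	}
	err = json.Unmarshal(b, &out)
	return out, err
}

func main() {
	p, err := decodeParams[GreetParams](map[string]any{"name": "Ada"})
	if err != nil {
		panic(err)
	}
	fmt.Println(p.Name) // Ada
}
```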

type ActivityExecutionEvent

type ActivityExecutionEvent struct {
	ExecutionID  string
	WorkflowName string
	BranchID     string
	StepName     string
	ActivityName string
	Parameters   map[string]any
	Result       any
	StartTime    time.Time
	EndTime      time.Time
	Duration     time.Duration
	Error        error
}

ActivityExecutionEvent provides context for activity execution events

type ActivityLogEntry

type ActivityLogEntry struct {
	ID          string                 `json:"id"`
	ExecutionID string                 `json:"execution_id"`
	Activity    string                 `json:"activity"`
	StepName    string                 `json:"step_name"`
	BranchID    string                 `json:"branch_id"`
	Parameters  map[string]interface{} `json:"parameters"`
	Result      interface{}            `json:"result,omitempty"`
	Error       string                 `json:"error,omitempty"`
	StartTime   time.Time              `json:"start_time"`
	Duration    float64                `json:"duration"`
}

ActivityLogEntry represents a single operation log entry

type ActivityLogger

type ActivityLogger interface {
	// LogActivity logs a completed activity
	LogActivity(ctx context.Context, entry *ActivityLogEntry) error

	// GetActivityHistory retrieves activity log for an execution
	GetActivityHistory(ctx context.Context, executionID string) ([]*ActivityLogEntry, error)
}

ActivityLogger defines simple operation logging interface

type ActivityRegistry

type ActivityRegistry struct {
	// contains filtered or unexported fields
}

ActivityRegistry owns the set of activities an Execution can call by name. It is opaque — consumers construct one via NewActivityRegistry and add activities through Register or MustRegister. The registry is read-only once passed to NewExecution.

func NewActivityRegistry added in v0.0.2

func NewActivityRegistry() *ActivityRegistry

NewActivityRegistry returns an empty registry.

func (*ActivityRegistry) Get added in v0.0.2

func (r *ActivityRegistry) Get(name string) (Activity, bool)

Get returns the activity registered under name, if any.

func (*ActivityRegistry) MustRegister added in v0.0.2

func (r *ActivityRegistry) MustRegister(a Activity) *ActivityRegistry

MustRegister panics on registration failure. Returns the registry so calls can chain in init() or a builder expression.

func (*ActivityRegistry) Names added in v0.0.2

func (r *ActivityRegistry) Names() []string

Names returns the registered activity names in sorted order.

func (*ActivityRegistry) Register added in v0.0.2

func (r *ActivityRegistry) Register(a Activity) error

Register adds an activity to the registry. Returns ErrDuplicateActivity if an activity with the same name is already registered.

type AtomicCheckpointer added in v0.0.2

type AtomicCheckpointer interface {
	// AtomicUpdate loads the checkpoint for the given execution,
	// runs fn against the loaded copy, and saves the result. The
	// entire read-modify-write cycle must be atomic with respect to
	// other writers of the same execution.
	//
	// If fn returns an error the checkpoint MUST NOT be saved; the
	// error is returned verbatim to the caller.
	//
	// If no checkpoint exists for the execution ID, AtomicUpdate
	// returns ErrNoCheckpoint without invoking fn.
	AtomicUpdate(ctx context.Context, executionID string, fn func(*Checkpoint) error) error
}

AtomicCheckpointer is an optional side interface a Checkpointer may implement to expose a compare-and-swap style update path. When the engine needs to mutate a checkpoint in-place (e.g. PauseBranchInCheckpoint), it prefers AtomicUpdate over a naive load-modify-write sequence.

A Checkpointer that does not implement AtomicCheckpointer still works — the engine falls back to load-modify-write and accepts the race window that implies. Backends with real transactional primitives (Postgres SELECT ... FOR UPDATE, Redis WATCH/MULTI, etcd CAS) should implement this interface to close that window.

type BaseExecutionCallbacks

type BaseExecutionCallbacks struct{}

BaseExecutionCallbacks provides a default implementation that does nothing

func (*BaseExecutionCallbacks) AfterActivityExecution

func (n *BaseExecutionCallbacks) AfterActivityExecution(ctx context.Context, event *ActivityExecutionEvent)

func (*BaseExecutionCallbacks) AfterBranchExecution added in v0.0.2

func (n *BaseExecutionCallbacks) AfterBranchExecution(ctx context.Context, event *BranchExecutionEvent)

func (*BaseExecutionCallbacks) AfterWorkflowExecution

func (n *BaseExecutionCallbacks) AfterWorkflowExecution(ctx context.Context, event *WorkflowExecutionEvent)

func (*BaseExecutionCallbacks) BeforeActivityExecution

func (n *BaseExecutionCallbacks) BeforeActivityExecution(ctx context.Context, event *ActivityExecutionEvent)

func (*BaseExecutionCallbacks) BeforeBranchExecution added in v0.0.2

func (n *BaseExecutionCallbacks) BeforeBranchExecution(ctx context.Context, event *BranchExecutionEvent)

func (*BaseExecutionCallbacks) BeforeWorkflowExecution

func (n *BaseExecutionCallbacks) BeforeWorkflowExecution(ctx context.Context, event *WorkflowExecutionEvent)

type BranchExecutionEvent added in v0.0.2

type BranchExecutionEvent struct {
	ExecutionID  string
	WorkflowName string
	BranchID     string
	Status       ExecutionStatus
	StartTime    time.Time
	EndTime      time.Time
	Duration     time.Duration
	CurrentStep  string
	StepOutputs  map[string]any
	Error        error
}

BranchExecutionEvent provides context for branch-level execution events

type BranchLocalState added in v0.0.2

type BranchLocalState struct {
	// contains filtered or unexported fields
}

BranchLocalState provides activities with access to workflow input variables and to the execution branch's copy of state variables. It is safe for concurrent use.

BranchLocalState is embedded in executionContext so its methods bubble up as the Context.Get/Set/Delete/Keys methods activity code uses directly.

func NewBranchLocalState added in v0.0.2

func NewBranchLocalState(inputs, variables map[string]any) *BranchLocalState

func (*BranchLocalState) Delete added in v0.0.2

func (s *BranchLocalState) Delete(key string)

Delete removes a branch-local variable.

func (*BranchLocalState) Get added in v0.0.2

func (s *BranchLocalState) Get(key string) (any, bool)

Get returns a branch-local variable and whether it was present.

func (*BranchLocalState) Keys added in v0.0.2

func (s *BranchLocalState) Keys() []string

Keys returns the names of all branch-local variables in sorted order.

func (*BranchLocalState) Set added in v0.0.2

func (s *BranchLocalState) Set(key string, value any)

Set writes a branch-local variable.

type BranchState added in v0.0.2

type BranchState struct {
	ID           string          `json:"id"`
	Status       ExecutionStatus `json:"status"`
	CurrentStep  string          `json:"current_step"`
	StartTime    time.Time       `json:"start_time,omitzero"`
	EndTime      time.Time       `json:"end_time,omitzero"`
	ErrorMessage string          `json:"error_message,omitempty"`
	StepOutputs  map[string]any  `json:"step_outputs"`
	Variables    map[string]any  `json:"variables"`
	// Wait is populated when the branch is hard-suspended on a durable
	// wait (signal-wait or durable sleep). nil otherwise.
	Wait *WaitState `json:"wait,omitempty"`
	// PauseRequested marks a branch as paused by an explicit pause
	// trigger — either an external PauseBranch call or a declarative
	// Pause step. A branch with PauseRequested=true will re-park at its
	// next step boundary after construction; UnpauseBranch must clear
	// the flag before the branch can advance.
	PauseRequested bool `json:"pause_requested,omitempty"`
	// PauseReason is an optional human-readable note describing why
	// the branch was paused. Set by the PauseBranch caller or by a
	// PauseConfig.Reason on a declarative Pause step.
	PauseReason string `json:"pause_reason,omitempty"`
	// ActivityHistory is the persisted cache for the currently
	// executing activity. It survives wait-unwind replays so
	// activities can cache expensive work across suspensions via
	// [Context.History] + [History.RecordOrReplay]. Cleared
	// when the step advances past the activity so there is no
	// cross-step leakage.
	ActivityHistory map[string]any `json:"activity_history,omitempty"`
	// ActivityHistoryStep records which step's activity owns the
	// current ActivityHistory map. executeActivity uses it to scope
	// history access to a single step: if the branch has raced ahead to
	// a new step before the orchestrator cleared the prior step's
	// history, the mismatch discards the stale entries so they do not
	// leak into the next activity.
	ActivityHistoryStep string `json:"activity_history_step,omitempty"`
}

BranchState tracks the state of an execution branch. This struct is designed to be fully JSON serializable.

func (*BranchState) Copy added in v0.0.2

func (p *BranchState) Copy() *BranchState

Copy returns a shallow copy of the branch state.

type CallbackChain

type CallbackChain struct {
	// contains filtered or unexported fields
}

CallbackChain allows chaining multiple callback implementations

func NewCallbackChain

func NewCallbackChain(callbacks ...ExecutionCallbacks) *CallbackChain

NewCallbackChain creates a new callback chain

func (*CallbackChain) Add

func (c *CallbackChain) Add(callback ExecutionCallbacks)

Add adds a callback to the chain

func (*CallbackChain) AfterActivityExecution

func (c *CallbackChain) AfterActivityExecution(ctx context.Context, event *ActivityExecutionEvent)

func (*CallbackChain) AfterBranchExecution added in v0.0.2

func (c *CallbackChain) AfterBranchExecution(ctx context.Context, event *BranchExecutionEvent)

func (*CallbackChain) AfterWorkflowExecution

func (c *CallbackChain) AfterWorkflowExecution(ctx context.Context, event *WorkflowExecutionEvent)

func (*CallbackChain) BeforeActivityExecution

func (c *CallbackChain) BeforeActivityExecution(ctx context.Context, event *ActivityExecutionEvent)

func (*CallbackChain) BeforeBranchExecution added in v0.0.2

func (c *CallbackChain) BeforeBranchExecution(ctx context.Context, event *BranchExecutionEvent)

func (*CallbackChain) BeforeWorkflowExecution

func (c *CallbackChain) BeforeWorkflowExecution(ctx context.Context, event *WorkflowExecutionEvent)

type CatchConfig

type CatchConfig struct {
	ErrorEquals []string `json:"error_equals"`
	Next        string   `json:"next"`
	Store       string   `json:"store,omitempty"`
}

CatchConfig configures fallback behavior when errors occur
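The step-level JSON shape is not shown on this page, so the fragment below is a hypothetical illustration only: the catch array mirrors CatchConfig's JSON tags, while the surrounding step field names are assumptions (llms.txt documents the actual JSON workflow format):

```json
{
  "name": "Call Service",
  "activity": "call_service",
  "catch": [
    {
      "error_equals": ["timeout"],
      "next": "Handle Timeout",
      "store": "last_error"
    }
  ]
}
```

On a matching timeout, the error would be stored under the last_error variable and control would route to the Handle Timeout step.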

type Checkpoint

type Checkpoint struct {
	// SchemaVersion is the wire-format version. Set to
	// CheckpointSchemaVersion by the engine on every save.
	SchemaVersion int `json:"schema_version"`

	// ID is the unique identifier for this checkpoint snapshot.
	// Distinct from ExecutionID: a single execution writes many
	// checkpoints over its lifetime.
	ID string `json:"id"`

	// ExecutionID is the ID of the execution this checkpoint belongs
	// to. Stable across all snapshots of the same execution.
	ExecutionID string `json:"execution_id"`

	// WorkflowName is the name of the workflow definition used to
	// produce the execution. Informational — the engine does not
	// consult it on resume.
	WorkflowName string `json:"workflow_name"`

	// Status is the execution status at the time the checkpoint was
	// written.
	Status ExecutionStatus `json:"status"`

	// Inputs is the workflow's input values, as supplied on
	// NewExecution. Immutable for the lifetime of the execution.
	Inputs map[string]any `json:"inputs"`

	// Outputs is the declared workflow outputs extracted from the
	// final branch states when the execution completes. Empty until
	// then.
	Outputs map[string]any `json:"outputs"`

	// Variables is the top-level workflow state, distinct from
	// per-branch state. Used by the orchestrator for shared values
	// that are not branch-local.
	Variables map[string]any `json:"variables"`

	// BranchStates holds the per-branch persisted state, keyed by
	// branch ID. Each entry captures the branch's variables, current
	// step, retry counters, and any active WaitState.
	BranchStates map[string]*BranchState `json:"branch_states"`

	// JoinStates holds the persisted state for active join steps,
	// keyed by join step name.
	JoinStates map[string]*JoinState `json:"join_states"`

	// BranchCounter is the monotonic counter used to allocate
	// branch IDs. Persisted so resumed executions continue to
	// allocate unique IDs.
	BranchCounter int `json:"branch_counter"`

	// Error is the terminal error message when Status is
	// ExecutionStatusFailed. Empty otherwise.
	Error string `json:"error,omitempty"`

	// StartTime is when the execution first began running.
	StartTime time.Time `json:"start_time,omitzero"`

	// EndTime is when the execution reached a terminal status
	// (completed, failed, or canceled). Zero for in-flight or
	// suspended executions.
	EndTime time.Time `json:"end_time,omitzero"`

	// CheckpointAt is when this snapshot was written.
	CheckpointAt time.Time `json:"checkpoint_at"`
}

Checkpoint is the serialized snapshot of an execution. It is the single unit of persistence the engine writes and reads when checkpointing, resuming, and inspecting running or dormant executions.

Round-trip contract

  • The engine marshals Checkpoint to JSON via encoding/json.
  • Consumers may swap a different encoder for storage, but MUST produce a byte stream that round-trips back through the same struct with every field preserved.
  • SchemaVersion is set on every save by the engine. Readers must reject any checkpoint whose SchemaVersion is higher than the library's CheckpointSchemaVersion.

The JSON tag on every field is part of the stable format. Adding a field is always safe (zero value on load from an older writer); renaming or removing a field is a schema break.

Load-bearing fields

The orchestrator's resume logic depends on these fields surviving the round-trip; a custom encoder that drops them will silently corrupt resumed executions:

  • BranchState.Variables — branch-local state that activities read and write. Without it, resumed branches restart from a blank state.
  • BranchState.Wait — the active wait/sleep state. Without it, a branch that was hard-suspended on a signal-wait or sleep can't be re-parked on resume; the engine treats it as a fresh advance.
  • BranchState.PauseRequested — whether the branch was paused by an operator or a Pause step. Without it, a paused branch resumes as if it had never been paused.
  • BranchState.ActivityHistory / ActivityHistoryStep — the per-step replay cache used by Context.History. Without it, activities re-execute side effects on every wait-unwind replay.

All other fields are advisory or recoverable from the workflow definition.

type Checkpointer

type Checkpointer interface {
	// SaveCheckpoint persists the given checkpoint snapshot. The
	// engine sets SchemaVersion = CheckpointSchemaVersion on every
	// call; implementations should record it verbatim.
	SaveCheckpoint(ctx context.Context, checkpoint *Checkpoint) error

	// LoadCheckpoint returns the most recent checkpoint for the
	// given execution ID. Returns ErrNoCheckpoint when no
	// checkpoint exists for the execution. Implementations MUST
	// reject any loaded checkpoint whose SchemaVersion is greater
	// than CheckpointSchemaVersion — the library did not write it
	// and cannot safely interpret it.
	LoadCheckpoint(ctx context.Context, executionID string) (*Checkpoint, error)

	// DeleteCheckpoint removes checkpoint data for an execution.
	DeleteCheckpoint(ctx context.Context, executionID string) error
}

Checkpointer is the small interface the engine uses to persist and load execution snapshots. Consumers plug in their own storage (Postgres, Redis, S3, etc.) by implementing these three methods.

The built-in FileCheckpointer and MemoryCheckpointer exist for development and testing only; production deployments should provide their own implementation.

func WithFencing added in v0.0.2

func WithFencing(inner Checkpointer, fenceCheck FenceFunc) Checkpointer

WithFencing wraps a Checkpointer with a pre-save fence validation. Before each SaveCheckpoint call, fenceCheck is called. If it returns an error, the save is aborted and the error is returned wrapped with ErrFenceViolation.

LoadCheckpoint and DeleteCheckpoint pass through to the inner checkpointer without fence checks.

Use this with distributed workers to prevent stale workers from overwriting checkpoint state after losing their lease.

type ChildWorkflowExecutor

type ChildWorkflowExecutor interface {
	// ExecuteSync runs a child workflow synchronously and waits for completion
	ExecuteSync(ctx context.Context, spec *ChildWorkflowSpec) (*ChildWorkflowResult, error)

	// ExecuteAsync starts a child workflow asynchronously and returns immediately
	ExecuteAsync(ctx context.Context, spec *ChildWorkflowSpec) (*ChildWorkflowHandle, error)

	// GetResult retrieves the result of an asynchronous execution
	GetResult(ctx context.Context, handle *ChildWorkflowHandle) (*ChildWorkflowResult, error)
}

ChildWorkflowExecutor manages child workflow executions.

type ChildWorkflowExecutorOptions

type ChildWorkflowExecutorOptions struct {
	WorkflowRegistry WorkflowRegistry
	Activities       []Activity
	Logger           *slog.Logger
	ActivityLogger   ActivityLogger
	Checkpointer     Checkpointer
	// ScriptCompiler is the scripting engine used by child executions.
	// When nil, child executions fall back to DefaultScriptCompiler
	// (github.com/deepnoodle-ai/expr). Set this to override with a
	// different engine.
	ScriptCompiler script.Compiler
	// CleanupTimeout is how long to retain an async child workflow's
	// result in memory after completion before evicting it from the
	// in-flight map. GetResult on an evicted handle returns an error.
	// Zero (the default) means one hour. Negative values disable
	// cleanup entirely (results are retained for the lifetime of the
	// process — useful in tests, dangerous in long-running services).
	CleanupTimeout time.Duration
}

ChildWorkflowExecutorOptions configures a DefaultChildWorkflowExecutor.

type ChildWorkflowHandle

type ChildWorkflowHandle struct {
	ExecutionID  string `json:"execution_id"`
	WorkflowName string `json:"workflow_name"`
}

ChildWorkflowHandle represents an asynchronous child workflow execution.

type ChildWorkflowResult

type ChildWorkflowResult struct {
	Outputs     map[string]interface{} `json:"outputs"`
	Status      ExecutionStatus        `json:"status"`
	ExecutionID string                 `json:"execution_id"`
	Duration    time.Duration          `json:"duration"`
}

ChildWorkflowResult represents the result of a child workflow execution.

The execution error is the second return value from ExecuteSync/GetResult — it is not duplicated on this struct.

type ChildWorkflowSpec

type ChildWorkflowSpec struct {
	WorkflowName string                 `json:"workflow_name"`
	Inputs       map[string]interface{} `json:"inputs,omitempty"`
	Timeout      time.Duration          `json:"timeout,omitempty"`
	ParentID     string                 `json:"parent_id,omitempty"` // for tracing
}

ChildWorkflowSpec specifies how to execute a child workflow.

Whether the child runs synchronously or asynchronously is selected at the call site by invoking ChildWorkflowExecutor.ExecuteSync or ExecuteAsync — there is no flag on the spec.

type CompletionHook added in v0.0.2

type CompletionHook func(ctx context.Context, result *ExecutionResult) ([]FollowUpSpec, error)

CompletionHook is called after a workflow completes successfully. It returns follow-up specs describing workflows that should be triggered as a result of this execution.

The hook runs synchronously after the execution completes. Keep it fast — it should build descriptors, not execute workflows. The consumer persists the FollowUpSpecs to their own durable outbox for async processing.

Returning an error does not change the execution result — the workflow is still completed. The error is logged and the consumer can inspect result.FollowUps to see what was produced before the error.

type Context

type Context interface {
	context.Context

	// Inputs returns a read-only view over the workflow inputs for
	// this branch.
	Inputs() Inputs

	// Set writes a branch-local variable.
	Set(key string, value any)
	// Get returns a branch-local variable and whether it was present.
	Get(key string) (value any, exists bool)
	// Delete removes a branch-local variable.
	Delete(key string)
	// Keys returns the names of all branch-local variables in sorted
	// order.
	Keys() []string

	// Logger returns the slog.Logger configured on the execution,
	// scoped to this execution and branch.
	Logger() *slog.Logger
	// Compiler returns the script.Compiler configured on the
	// execution.
	Compiler() script.Compiler
	// BranchID returns the ID of the running branch.
	BranchID() string
	// StepName returns the name of the currently executing step.
	StepName() string

	// Wait durably parks the current branch until a signal is
	// delivered to topic, or until timeout elapses.
	//
	// # Behavior
	//
	// On the first invocation in a step, Wait registers a wait
	// state against the branch's checkpoint and returns a sentinel
	// error (waitUnwind) that the engine catches at the activity
	// boundary. The activity goroutine exits, the orchestrator
	// persists a checkpoint with BranchState.Wait set, and the
	// execution ends with Status=ExecutionStatusSuspended.
	//
	// When the consumer delivers a signal to the topic via the
	// execution's SignalStore and calls Resume, the engine re-enters
	// the same activity. On the second invocation, Wait sees the
	// delivered payload and returns it to the caller as the first
	// return value.
	//
	// # Replay safety
	//
	// Activities that call Wait MUST be replay-safe: any side
	// effect that runs before the Wait call will run again on the
	// resumed invocation. Use Context.History to memoize expensive
	// or non-idempotent work. The shape is:
	//
	//	func(ctx Context, p Params) (any, error) {
	//	    result, _ := ctx.History().RecordOrReplay("key", func() (any, error) {
	//	        // side-effecting work runs once, replays from cache
	//	    })
	//	    payload, err := ctx.Wait("topic", timeout)
	//	    // post-wait code runs once after the signal arrives
	//	}
	//
	// # Deadline behavior
	//
	// timeout is wall-clock and starts ticking from when Wait is
	// first called; the engine records an absolute deadline at
	// register time. On resume, the engine recomputes the remaining
	// time against the original deadline; a wait that has already
	// expired wakes immediately with ErrWaitTimeout.
	//
	// Routing a timeout to a step (instead of failing the activity)
	// requires a declarative WaitSignalConfig with OnTimeout set;
	// the imperative Context.Wait always surfaces ErrWaitTimeout to
	// the caller.
	//
	// # Custom Context implementations
	//
	// Test doubles or alternative Context implementations MUST
	// forward Wait to the engine's underlying implementation. Wait
	// depends on engine-internal branch state — the checkpoint, the
	// wait registry, the resume path — and a custom Context that
	// returns nil/error from Wait without touching that state will
	// silently break replay semantics. The workflowtest package is
	// the supported pattern for unit tests.
	Wait(topic string, timeout time.Duration) (any, error)
	// History returns the per-activity-invocation persisted cache.
	// Returns a process-local, non-persistent cache if called outside
	// of an activity invocation.
	History() *History
	// ReportProgress forwards a progress update to the configured
	// StepProgressStore, if any. No-op otherwise.
	ReportProgress(detail ProgressDetail)
}

Context is the activity-facing extension of context.Context. It exposes the workflow inputs, the branch-local state map, progress and signal plumbing, and the small identity values an activity may want to log.

The interface is idiomatic Go: property-style accessors (Logger, BranchID, StepName) rather than GetLogger/GetBranchID, and the variable store methods are the ones you'd expect on a map.

All methods are safe for concurrent use.

func WithCancel

func WithCancel(parent Context) (Context, context.CancelFunc)

WithCancel creates a new workflow context with cancellation.

func WithTimeout

func WithTimeout(parent Context, timeout time.Duration) (Context, context.CancelFunc)

WithTimeout creates a new workflow context with a timeout.

type DefaultChildWorkflowExecutor

type DefaultChildWorkflowExecutor struct {
	// contains filtered or unexported fields
}

DefaultChildWorkflowExecutor provides a basic implementation of ChildWorkflowExecutor.

func NewDefaultChildWorkflowExecutor

func NewDefaultChildWorkflowExecutor(opts ChildWorkflowExecutorOptions) (*DefaultChildWorkflowExecutor, error)

NewDefaultChildWorkflowExecutor creates a new DefaultChildWorkflowExecutor.

func (*DefaultChildWorkflowExecutor) ExecuteAsync

ExecuteAsync starts a child workflow asynchronously and returns a handle immediately. The child runs in a detached goroutine that uses context.Background(), so the caller's context cancellation does not propagate.

Async vs. checkpoint semantics

The async execution map is in-process state. It is NOT part of the parent execution's checkpoint:

  • If the parent process restarts while an async child is running, the child goroutine dies with the process. The parent's resumed execution will hold a ChildWorkflowHandle that no longer resolves: GetResult returns "not found".
  • If the parent execution checkpoints and resumes in the same process, the handle still resolves until CleanupTimeout elapses after the child completes.

In short: async children are best-effort, single-process. Workflows that need durable child orchestration across restarts should use ExecuteSync (the parent execution waits, the checkpoint captures the child's outputs in state) or model the child as a separate top-level execution coordinated via signals.

TODO(v1.1): persist async-child handles to the checkpointer so that resumed parents can re-attach.

func (*DefaultChildWorkflowExecutor) ExecuteSync

ExecuteSync runs a child workflow synchronously and waits for it to complete.

func (*DefaultChildWorkflowExecutor) GetResult

GetResult retrieves the result of an asynchronous execution.

type Each

type Each struct {
	Items any    `json:"items"`
	As    string `json:"as,omitempty"`
}

Each is used to configure a step to loop over a list of items.

type Edge

type Edge struct {
	Step      string `json:"step"`
	Condition string `json:"condition,omitempty"`
	// BranchName optionally names the branch created when this edge
	// is followed. Empty means "continue on the current branch".
	BranchName string `json:"branch,omitempty"`
}

Edge is used to configure a next step in a workflow.

type EdgeMatchingStrategy

type EdgeMatchingStrategy string

EdgeMatchingStrategy defines how a step's outgoing edges are evaluated.

const (
	// EdgeMatchingAll evaluates all edges and follows all matches (default behavior)
	EdgeMatchingAll EdgeMatchingStrategy = "all"

	// EdgeMatchingFirst evaluates edges in order and follows only the first matching one
	EdgeMatchingFirst EdgeMatchingStrategy = "first"
)

type ErrorOutput

type ErrorOutput struct {
	Error   string      `json:"Error"`
	Cause   string      `json:"Cause"`
	Details interface{} `json:"Details,omitempty"`
}

ErrorOutput represents the structured error information passed to catch handlers.

type ExecuteActivityFunc

type ExecuteActivityFunc func(ctx Context, parameters map[string]any) (any, error)

ExecuteActivityFunc is the signature for an Activity execution function.

type ExecuteOption added in v0.0.2

type ExecuteOption func(*executeConfig)

ExecuteOption configures a single call to Execution.Execute.

func ResumeFrom added in v0.0.2

func ResumeFrom(priorID string) ExecuteOption

ResumeFrom tells Execute to load the checkpoint for priorID and resume. If no checkpoint is found, Execute proceeds with a fresh run — the semantics of the deleted RunOrResume.

type Execution

type Execution struct {
	// contains filtered or unexported fields
}

Execution represents a single run of a workflow, with checkpointing.

func NewExecution

func NewExecution(wf *Workflow, reg *ActivityRegistry, opts ...ExecutionOption) (*Execution, error)

NewExecution creates a new execution for the given workflow and activity registry. Every configurable knob is a functional option.

func (*Execution) Execute added in v0.0.2

func (e *Execution) Execute(ctx context.Context, opts ...ExecuteOption) (*ExecutionResult, error)

Execute runs the workflow and returns a structured ExecutionResult.

By default, Execute starts a fresh run. Pass ResumeFrom(priorID) to resume from a previous execution's checkpoint. If ResumeFrom is set and no checkpoint exists for priorID, Execute proceeds with a fresh run.

An error return means the execution could not be attempted (infrastructure failure). When error is nil, result is non-nil and contains the execution outcome — including workflow-level failures, which are represented in result.Error rather than the error return.

func (*Execution) GetOutputs

func (e *Execution) GetOutputs() map[string]any

GetOutputs returns the current execution outputs.

func (*Execution) ID

func (e *Execution) ID() string

ID returns the execution ID.

func (*Execution) PauseBranch added in v0.0.2

func (e *Execution) PauseBranch(branchID, reason string) error

PauseBranch requests that the named branch pause at its next step boundary. If the branch is currently running, it observes the request at the top of its next loop iteration and exits cleanly; if the branch has already exited (e.g., it's suspended on a wait), the persistent PauseRequested flag ensures the pause takes effect the next time the branch is reconstructed from checkpoint.

PauseBranch is idempotent: pausing an already-paused branch is a no-op. The reason, if provided, overwrites any previously-recorded reason.

PauseBranch returns ErrBranchNotFound if no branch with the given ID exists in this execution's state.

func (*Execution) Status

func (e *Execution) Status() ExecutionStatus

Status returns the current execution status.

func (*Execution) UnpauseBranch added in v0.0.2

func (e *Execution) UnpauseBranch(branchID string) error

UnpauseBranch clears the pause request on the named branch. If the execution loop is still running and the branch is still in activeBranches (i.e., PauseBranch was called but the branch hasn't yet hit a step boundary), the branch will continue normally. If the branch has already parked on Paused status, clearing the flag is not enough to restart it — the caller must invoke Run/Resume/ExecuteOrResume to spawn a fresh goroutine for the branch.

UnpauseBranch is idempotent: unpausing a branch that is not paused is a no-op. Returns ErrBranchNotFound if no such branch exists.

type ExecutionCallbacks

type ExecutionCallbacks interface {
	// Workflow-level callbacks
	BeforeWorkflowExecution(ctx context.Context, event *WorkflowExecutionEvent)
	AfterWorkflowExecution(ctx context.Context, event *WorkflowExecutionEvent)

	// Path-level callbacks
	BeforeBranchExecution(ctx context.Context, event *BranchExecutionEvent)
	AfterBranchExecution(ctx context.Context, event *BranchExecutionEvent)

	// Activity-level callbacks
	BeforeActivityExecution(ctx context.Context, event *ActivityExecutionEvent)
	AfterActivityExecution(ctx context.Context, event *ActivityExecutionEvent)
}

ExecutionCallbacks defines the callback interface for workflow execution events.

func NewBaseExecutionCallbacks

func NewBaseExecutionCallbacks() ExecutionCallbacks

NewBaseExecutionCallbacks creates a new no-op callbacks implementation. Embed this in your own callbacks to get a default implementation that does nothing.

type ExecutionContextOptions

type ExecutionContextOptions struct {
	BranchLocalState *BranchLocalState
	Logger           *slog.Logger
	Compiler         script.Compiler
	BranchID         string
	StepName         string
	ExecutionID      string
	SignalStore      SignalStore
	// PendingWait is the wait state the branch was parked on before the
	// current activity invocation, if any. Set by the engine when a
	// checkpoint is being replayed so workflow.Wait can reuse the
	// original deadline.
	PendingWait *WaitState
	// ActivityHistory is the per-activity-invocation persisted cache
	// for this step. Non-nil only when the engine is running an
	// activity; nil for handler contexts that don't execute activity
	// code.
	ActivityHistory *History
}

type ExecutionOption added in v0.0.2

type ExecutionOption func(*executionConfig)

ExecutionOption is a functional option for NewExecution.

func WithActivityLogger added in v0.0.2

func WithActivityLogger(al ActivityLogger) ExecutionOption

WithActivityLogger configures where per-activity invocation logs are written. Defaults to a null logger.

func WithCheckpointer added in v0.0.2

func WithCheckpointer(cp Checkpointer) ExecutionOption

WithCheckpointer configures where checkpoint snapshots are saved. Defaults to a null checkpointer that discards everything.

func WithExecutionCallbacks added in v0.0.2

func WithExecutionCallbacks(cb ExecutionCallbacks) ExecutionOption

WithExecutionCallbacks installs lifecycle callbacks. Defaults to no-op.

func WithExecutionID added in v0.0.2

func WithExecutionID(id string) ExecutionOption

WithExecutionID sets a fixed execution ID. When omitted, a new ID is generated via NewExecutionID. Use this when your orchestration layer (queue, DB) needs to know the ID before NewExecution is called.

func WithInputs added in v0.0.2

func WithInputs(m map[string]any) ExecutionOption

WithInputs sets the workflow input values for this execution. Values not present here fall back to the Input.Default declared on the workflow. Extra keys not declared on the workflow are rejected.

func WithLogger added in v0.0.2

func WithLogger(l *slog.Logger) ExecutionOption

WithLogger sets the structured logger. Defaults to a discard logger.

func WithScriptCompiler added in v0.0.2

func WithScriptCompiler(sc script.Compiler) ExecutionOption

WithScriptCompiler overrides the default script compiler used to evaluate parameter templates and edge conditions. Defaults to the built-in expr compiler.

func WithSignalStore added in v0.0.2

func WithSignalStore(ss SignalStore) ExecutionOption

WithSignalStore configures the signal delivery rendezvous used by workflow.Wait and declarative WaitSignal steps. Required for any workflow that uses signals.

func WithStepProgressStore added in v0.0.2

func WithStepProgressStore(s StepProgressStore) ExecutionOption

WithStepProgressStore configures a store that receives progress updates as steps transition between states. Calls are async and store latency does not affect execution speed.

type ExecutionResult added in v0.0.2

type ExecutionResult struct {
	// WorkflowName identifies which workflow was executed.
	WorkflowName string

	// Status is the final execution status.
	Status ExecutionStatus

	// Outputs contains the workflow's output values, keyed by output name.
	// Empty if the workflow failed before producing outputs.
	Outputs map[string]any

	// Error is the classified workflow error, if the execution failed.
	// nil when Status is ExecutionStatusCompleted.
	Error *WorkflowError

	// Timing contains execution duration measurements.
	Timing ExecutionTiming

	// FollowUps contains follow-up workflow specs produced by completion hooks.
	// Empty when no hooks are configured or the execution did not complete
	// successfully.
	FollowUps []FollowUpSpec

	// Suspension describes the durable wait(s) that caused the execution
	// to end without completing. Populated when Status is
	// ExecutionStatusSuspended (and in future phases, Paused). nil
	// otherwise.
	Suspension *SuspensionInfo
}

ExecutionResult contains the outcome of a workflow execution. When returned from Execute/ExecuteOrResume, it is always non-nil if error is nil.

func (*ExecutionResult) Completed added in v0.0.2

func (r *ExecutionResult) Completed() bool

Completed returns true if the execution finished successfully.

func (*ExecutionResult) Failed added in v0.0.2

func (r *ExecutionResult) Failed() bool

Failed returns true if the execution finished with an error.

func (*ExecutionResult) NeedsResume added in v0.0.2

func (r *ExecutionResult) NeedsResume() bool

NeedsResume returns true if the execution ended in a dormant state that requires an external trigger (signal delivery, wall-clock wake, or operator unpause) before it can continue. Equivalent to r.Suspended() || r.Paused().

func (*ExecutionResult) NextWakeAt added in v0.0.2

func (r *ExecutionResult) NextWakeAt() (time.Time, bool)

NextWakeAt returns the earliest wall-clock deadline across all suspended branches and whether one is set. Returns (zero, false) if the execution is not suspended or no branch has a deadline. Consumers use this to schedule a wall-clock resume — typical use is `time.AfterFunc(time.Until(t), resumeFn)`.

func (*ExecutionResult) Output added in v0.0.2

func (r *ExecutionResult) Output(key string) (any, bool)

Output returns the raw output value at key and whether it was present. Returns (nil, false) when the result has no outputs map or the key is missing.

func (*ExecutionResult) OutputBool added in v0.0.2

func (r *ExecutionResult) OutputBool(key string) (bool, bool)

OutputBool returns the output at key as a bool and whether the type assertion succeeded. Returns (false, false) if the key is missing or the value is not a bool.

func (*ExecutionResult) OutputInt added in v0.0.2

func (r *ExecutionResult) OutputInt(key string) (int, bool)

OutputInt returns the output at key as an int and whether the conversion succeeded. Recognizes Go's numeric types (int, int32, int64, float32, float64) so values that round-tripped through JSON (where numbers come back as float64) work as expected. Returns (0, false) if the key is missing or the value is not numeric.

Float values are truncated to int; precision loss is the caller's responsibility — use OutputAs[float64] when fractional precision matters.

func (*ExecutionResult) OutputString added in v0.0.2

func (r *ExecutionResult) OutputString(key string) (string, bool)

OutputString returns the output at key as a string and whether the type assertion succeeded. Returns ("", false) if the key is missing or the value is not a string.

func (*ExecutionResult) Paused added in v0.0.2

func (r *ExecutionResult) Paused() bool

Paused returns true if the execution ended dormant on an explicit pause trigger (PauseBranch call or declarative Pause step). The caller must clear the pause via UnpauseBranch / UnpauseBranchInCheckpoint before calling Resume.

func (*ExecutionResult) Suspended added in v0.0.2

func (r *ExecutionResult) Suspended() bool

Suspended returns true if the execution ended hard-suspended on a durable wait (signal-wait or durable sleep). The caller is responsible for scheduling resume when the external trigger arrives (use Suspension.Topics to subscribe to signals, Suspension.WakeAt to schedule a wall-clock resume).

func (*ExecutionResult) Topics added in v0.0.2

func (r *ExecutionResult) Topics() []string

Topics returns the union of signal topics that suspended branches are waiting on, or nil if the execution is not suspended on a signal-wait. Consumers use this to register signal listeners before scheduling a resume.

func (*ExecutionResult) WaitReason added in v0.0.2

func (r *ExecutionResult) WaitReason() SuspensionReason

WaitReason returns the dominant suspension reason if the execution is suspended, or the empty string otherwise. Convenience for the common "what kind of resume do I need to schedule?" question.

type ExecutionStatus

type ExecutionStatus string

ExecutionStatus represents the status of an execution or branch.

const (
	ExecutionStatusPending ExecutionStatus = "pending"
	ExecutionStatusRunning ExecutionStatus = "running"
	// ExecutionStatusWaiting is for branches that are blocked mid-run on a
	// join — their goroutine is parked on an in-process channel and the
	// execution is still live. Waiting is not a terminal state.
	ExecutionStatusWaiting ExecutionStatus = "waiting"
	// ExecutionStatusSuspended is for branches hard-suspended on a durable
	// wait (signal-wait, durable sleep). Their goroutine has exited and
	// they only live in the checkpoint. The execution cannot make
	// progress without external input (signal, wall-clock). When all
	// active branches are suspended, the execution loop exits and the
	// execution's final status is Suspended.
	ExecutionStatusSuspended ExecutionStatus = "suspended"
	// ExecutionStatusPaused is for branches parked by an explicit pause —
	// either an external PauseBranch call or a declarative Pause step.
	// Unlike Suspended, a paused branch has no declared resumption
	// condition; an external actor must clear the flag via UnpauseBranch
	// before the branch can continue. Paused is reported independently
	// from Suspended in SuspensionInfo so operators can distinguish the
	// two when deciding what action to take.
	ExecutionStatusPaused    ExecutionStatus = "paused"
	ExecutionStatusCompleted ExecutionStatus = "completed"
	ExecutionStatusFailed    ExecutionStatus = "failed"
)

type ExecutionSummary

type ExecutionSummary struct {
	ExecutionID  string        `json:"execution_id"`
	WorkflowName string        `json:"workflow_name"`
	Status       string        `json:"status"`
	StartTime    time.Time     `json:"start_time"`
	EndTime      time.Time     `json:"end_time,omitempty"`
	Duration     time.Duration `json:"duration"`
	Error        string        `json:"error,omitempty"`
}

ExecutionSummary provides a summary view of an execution.

type ExecutionTiming added in v0.0.2

type ExecutionTiming struct {
	StartedAt  time.Time
	FinishedAt time.Time
	Duration   time.Duration
}

ExecutionTiming captures wall-clock timing for the execution.

type FenceFunc added in v0.0.2

type FenceFunc func(ctx context.Context) error

FenceFunc validates that the current worker still holds its lease or lock. Return nil if the fence is still valid. Return an error to abort the checkpoint save — the execution will receive the error and should terminate.

type FileActivityLogger

type FileActivityLogger struct {
	// contains filtered or unexported fields
}

FileActivityLogger is an implementation of ActivityLogger that logs to a file. A file is created per execution. The file is formatted as newline-delimited JSON.

func NewFileActivityLogger

func NewFileActivityLogger(directory string) *FileActivityLogger

func (*FileActivityLogger) GetActivityHistory

func (l *FileActivityLogger) GetActivityHistory(ctx context.Context, executionID string) ([]*ActivityLogEntry, error)

func (*FileActivityLogger) LogActivity

func (l *FileActivityLogger) LogActivity(ctx context.Context, entry *ActivityLogEntry) error

type FileCheckpointer

type FileCheckpointer struct {
	// contains filtered or unexported fields
}

FileCheckpointer is a file-based implementation that persists checkpoints to disk.

func NewFileCheckpointer

func NewFileCheckpointer(dataDir string) (*FileCheckpointer, error)

NewFileCheckpointer creates a new file-based checkpointer.

func (*FileCheckpointer) DeleteCheckpoint

func (c *FileCheckpointer) DeleteCheckpoint(ctx context.Context, executionID string) error

DeleteCheckpoint removes all checkpoint data for an execution.

func (*FileCheckpointer) ListExecutions

func (c *FileCheckpointer) ListExecutions(ctx context.Context) ([]*ExecutionSummary, error)

ListExecutions returns a list of all executions with their latest checkpoint info.

func (*FileCheckpointer) LoadCheckpoint

func (c *FileCheckpointer) LoadCheckpoint(ctx context.Context, executionID string) (*Checkpoint, error)

LoadCheckpoint loads the latest checkpoint for an execution.

func (*FileCheckpointer) SaveCheckpoint

func (c *FileCheckpointer) SaveCheckpoint(ctx context.Context, checkpoint *Checkpoint) error

SaveCheckpoint saves the execution checkpoint to disk.

type FollowUpSpec added in v0.0.2

type FollowUpSpec struct {
	// WorkflowName identifies which workflow to trigger.
	WorkflowName string

	// Inputs are the input values for the follow-up workflow.
	Inputs map[string]any

	// Metadata is arbitrary data the consumer can use for routing,
	// deduplication, or prioritization. The library does not inspect it.
	Metadata map[string]any
}

FollowUpSpec describes a workflow that should be triggered after a successful execution. It is a descriptor, not an execution request — the consumer is responsible for persisting and processing these.

type HeartbeatConfig added in v0.0.2

type HeartbeatConfig struct {
	// Interval is how often the heartbeat function is called.
	Interval time.Duration

	// Func is called on each heartbeat tick. Return nil to indicate liveness.
	// Return an error to signal lease loss — the Runner cancels the execution
	// context, causing the workflow to abort.
	Func HeartbeatFunc
}

HeartbeatConfig configures periodic liveness reporting.

type HeartbeatFunc added in v0.0.2

type HeartbeatFunc func(ctx context.Context) error

HeartbeatFunc is called periodically to prove worker liveness. Return nil to continue. Return an error to abort the execution.

type History added in v0.0.2

type History struct {
	// contains filtered or unexported fields
}

History is the per-activity-invocation persisted cache returned by Context.History. Activities use it to cache the result of expensive or non-idempotent operations across wait-unwind replays: any value recorded once survives the suspension + resume cycle, so a replayed activity can pick up cached work without re-executing it.

Entries are scoped to a single activity invocation. Once the step completes and the branch advances to its successor, the history is cleared — no cross-step leakage.

Use cases:

  • Caching LLM calls before a Context.Wait so replays don't re-bill.
  • Caching non-idempotent HTTP writes that can't be keyed off natural idempotency tokens.
  • Memoizing expensive computation inside an agent loop.

Thread-safety: safe for concurrent use within a single activity invocation. Different activity invocations own different History instances.

func NewHistoryForTest added in v0.0.2

func NewHistoryForTest() *History

NewHistoryForTest returns a fresh, non-persistent History for use by test helpers (e.g. workflowtest.FakeContext).

func (*History) Get added in v0.0.2

func (h *History) Get(key string) (any, bool)

Get returns the cached value for key, if any, and whether it was present. Exposed for testing and introspection; activities should prefer History.RecordOrReplay for the normal cache-or-compute flow.

func (*History) Len added in v0.0.2

func (h *History) Len() int

Len returns the number of entries currently cached.

func (*History) RecordOrReplay added in v0.0.2

func (h *History) RecordOrReplay(key string, fn func() (any, error)) (any, error)

RecordOrReplay runs fn on the first call for the given key and caches its result. Subsequent calls for the same key — including calls from activity replays after a wait-unwind — return the cached value without invoking fn. If fn returns an error, no cache entry is written and the error is returned unchanged.

Concurrency: fn runs outside the history's lock so independent RecordOrReplay calls can proceed in parallel. If two goroutines race on the same key, both run fn, but only the first result is cached; the second caller sees the cached first result.
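The cache-or-compute contract can be sketched as a minimal standalone version. This is an illustration of the documented semantics (run-once per key, errors not cached, fn outside the lock), not the library's implementation:

```go
package main

import (
	"fmt"
	"sync"
)

// miniHistory is a toy re-implementation of the RecordOrReplay contract:
// fn runs on the first call for a key; later calls replay the cached value.
type miniHistory struct {
	mu      sync.Mutex
	entries map[string]any
}

func newMiniHistory() *miniHistory {
	return &miniHistory{entries: map[string]any{}}
}

func (h *miniHistory) RecordOrReplay(key string, fn func() (any, error)) (any, error) {
	h.mu.Lock()
	if v, ok := h.entries[key]; ok {
		h.mu.Unlock()
		return v, nil // replay: fn is not invoked again
	}
	h.mu.Unlock()

	// fn runs outside the lock so independent keys can proceed in parallel.
	v, err := fn()
	if err != nil {
		return nil, err // errors are not cached
	}

	h.mu.Lock()
	defer h.mu.Unlock()
	if prior, ok := h.entries[key]; ok {
		return prior, nil // lost a race on this key: first writer wins
	}
	h.entries[key] = v
	return v, nil
}

func main() {
	h := newMiniHistory()
	calls := 0
	expensive := func() (any, error) {
		calls++
		return "llm-response", nil
	}
	v1, _ := h.RecordOrReplay("summarize", expensive)
	v2, _ := h.RecordOrReplay("summarize", expensive) // replayed, no second call
	fmt.Println(v1, v2, calls)
}
```

In the real API the equivalent call site would be an activity invoking RecordOrReplay on the History obtained from Context.History before a Context.Wait, so the replay after resume picks up the cached value.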

type Input

type Input struct {
	Name        string      `json:"name" yaml:"name"`
	Type        string      `json:"type" yaml:"type"`
	Description string      `json:"description,omitempty" yaml:"description,omitempty"`
	Default     interface{} `json:"default,omitempty" yaml:"default,omitempty"`
}

Input defines a workflow input parameter

func (*Input) IsRequired

func (i *Input) IsRequired() bool

type Inputs added in v0.0.2

type Inputs struct {
	// contains filtered or unexported fields
}

Inputs is a read-only view over workflow input values. It exists as a named type so we can grow it with typed accessors (GetString, GetInt) in the future without breaking the Context interface.

func NewInputsForTest added in v0.0.2

func NewInputsForTest(m map[string]any) Inputs

NewInputsForTest builds an Inputs from a snapshot map for use by test helpers (e.g. workflowtest.FakeContext). The map is taken by reference; callers that need mutation safety must pass a copy.

func (Inputs) Get added in v0.0.2

func (i Inputs) Get(key string) (any, bool)

Get returns the value of an input and whether it was present.

func (Inputs) Keys added in v0.0.2

func (i Inputs) Keys() []string

Keys returns the input names in sorted order.

func (Inputs) Len added in v0.0.2

func (i Inputs) Len() int

Len returns the number of inputs.

func (Inputs) ToMap added in v0.0.2

func (i Inputs) ToMap() map[string]any

ToMap returns a copy of the inputs as a plain map. Mutating the returned map does not affect the underlying execution state.
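The read-only-view contract above (lookup, sorted keys, defensive copy) can be sketched in isolation. This is an illustration of the documented behavior, not the library's code:

```go
package main

import (
	"fmt"
	"sort"
)

// miniInputs sketches the Inputs contract: Get and Keys read the snapshot,
// ToMap hands back a copy so callers cannot mutate the underlying state.
type miniInputs struct{ m map[string]any }

func (i miniInputs) Get(key string) (any, bool) {
	v, ok := i.m[key]
	return v, ok
}

// Keys returns the input names in sorted order.
func (i miniInputs) Keys() []string {
	keys := make([]string, 0, len(i.m))
	for k := range i.m {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

// ToMap copies entry by entry; mutating the result leaves the view intact.
func (i miniInputs) ToMap() map[string]any {
	out := make(map[string]any, len(i.m))
	for k, v := range i.m {
		out[k] = v
	}
	return out
}

func main() {
	in := miniInputs{m: map[string]any{"b": 2, "a": 1}}
	cp := in.ToMap()
	cp["a"] = 99 // does not affect the view
	v, _ := in.Get("a")
	fmt.Println(in.Keys(), v)
}
```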

type JitterStrategy

type JitterStrategy string

JitterStrategy defines the jitter strategy for retry delays

const (
	JitterNone JitterStrategy = "NONE"
	JitterFull JitterStrategy = "FULL"
)

type JoinConfig

type JoinConfig struct {
	// Branches specifies which named branches to wait for. If empty,
	// waits for all active branches.
	Branches []string `json:"branches,omitempty"`

	// Count specifies the number of branches to wait for. If 0, waits
	// for all specified branches.
	Count int `json:"count,omitempty"`

	// BranchMappings specifies where to store branch data. Supports two
	// syntaxes:
	//  1. Store entire branch state: "branchID": "destination"
	//     Example: "branchA": "results.branchA" stores all branchA
	//     variables under results.branchA.
	//  2. Extract specific variables: "branchID.variable": "destination"
	//     Example: "branchA.result": "extracted.value" stores only
	//     branchA.result under extracted.value.
	// Supports nested field extraction using dot notation for both
	// variable names and destinations.
	BranchMappings map[string]string `json:"branch_mappings,omitempty"`
}

JoinConfig configures a step to wait for multiple branches to converge.
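Using the struct tags above, a join step waiting on two named branches and mixing both mapping syntaxes might serialize as follows (branch and destination names are illustrative):

```json
{
  "branches": ["branchA", "branchB"],
  "branch_mappings": {
    "branchA": "results.branchA",
    "branchB.result": "extracted.value"
  }
}
```

The first mapping stores all of branchA's state under results.branchA; the second extracts only branchB.result into extracted.value.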

type JoinState

type JoinState struct {
	StepName        string      `json:"step_name"`
	WaitingBranchID string      `json:"waiting_branch_id"` // The single branch that's waiting
	Config          *JoinConfig `json:"config"`
	CreatedAt       time.Time   `json:"created_at"`
}

JoinState tracks a branch waiting at a join step

type MemorySignalStore added in v0.0.2

type MemorySignalStore struct {
	// contains filtered or unexported fields
}

MemorySignalStore is an in-memory implementation of SignalStore suitable for tests and development.

func NewMemorySignalStore added in v0.0.2

func NewMemorySignalStore() *MemorySignalStore

NewMemorySignalStore returns a new empty in-memory store.

func (*MemorySignalStore) Receive added in v0.0.2

func (m *MemorySignalStore) Receive(ctx context.Context, executionID, topic string) (*Signal, error)

Receive implements SignalStore. Returns (nil, nil) when no signal is pending.

func (*MemorySignalStore) Send added in v0.0.2

func (m *MemorySignalStore) Send(ctx context.Context, executionID, topic string, payload any) error

Send implements SignalStore.

type MemoryWorkflowRegistry

type MemoryWorkflowRegistry struct {
	// contains filtered or unexported fields
}

MemoryWorkflowRegistry implements WorkflowRegistry using in-memory storage

func NewMemoryWorkflowRegistry

func NewMemoryWorkflowRegistry() *MemoryWorkflowRegistry

NewMemoryWorkflowRegistry creates a new in-memory workflow registry

func (*MemoryWorkflowRegistry) Get

func (r *MemoryWorkflowRegistry) Get(name string) (*Workflow, bool)

Get retrieves a workflow by name

func (*MemoryWorkflowRegistry) List

func (r *MemoryWorkflowRegistry) List() []string

List returns all registered workflow names

func (*MemoryWorkflowRegistry) Register

func (r *MemoryWorkflowRegistry) Register(workflow *Workflow) error

Register adds a workflow to the registry

type NullActivityLogger

type NullActivityLogger struct{}

NullActivityLogger is a no-op implementation of ActivityLogger.

func NewNullActivityLogger

func NewNullActivityLogger() *NullActivityLogger

func (*NullActivityLogger) GetActivityHistory

func (l *NullActivityLogger) GetActivityHistory(ctx context.Context, executionID string) ([]*ActivityLogEntry, error)

func (*NullActivityLogger) LogActivity

func (l *NullActivityLogger) LogActivity(ctx context.Context, entry *ActivityLogEntry) error

type NullCheckpointer

type NullCheckpointer struct{}

NullCheckpointer is a no-op implementation

func NewNullCheckpointer

func NewNullCheckpointer() *NullCheckpointer

func (*NullCheckpointer) DeleteCheckpoint

func (c *NullCheckpointer) DeleteCheckpoint(ctx context.Context, executionID string) error

func (*NullCheckpointer) LoadCheckpoint

func (c *NullCheckpointer) LoadCheckpoint(ctx context.Context, executionID string) (*Checkpoint, error)

func (*NullCheckpointer) SaveCheckpoint

func (c *NullCheckpointer) SaveCheckpoint(ctx context.Context, checkpoint *Checkpoint) error

type Options

type Options struct {
	Name        string         `json:"name" yaml:"name"`
	Steps       []*Step        `json:"steps" yaml:"steps"`
	Description string         `json:"description,omitempty" yaml:"description,omitempty"`
	Inputs      []*Input       `json:"inputs,omitempty" yaml:"inputs,omitempty"`
	Outputs     []*Output      `json:"outputs,omitempty" yaml:"outputs,omitempty"`
	State       map[string]any `json:"state,omitempty" yaml:"state,omitempty"`
	// StartAt names the step that the first execution branch begins on.
	// When empty, the first step in Steps is the start step. Validated
	// at New() time to reference an existing step.
	StartAt string `json:"start_at,omitempty" yaml:"start_at,omitempty"`
}

Options are used to configure a workflow.

type Output

type Output struct {
	Name     string `json:"name" yaml:"name"`
	Variable string `json:"variable" yaml:"variable"`
	// Branch names the execution branch to extract the output value from.
	// Defaults to "main" when empty.
	Branch      string `json:"branch,omitempty" yaml:"branch,omitempty"`
	Description string `json:"description,omitempty" yaml:"description,omitempty"`
}

Output defines a workflow output parameter

type PauseConfig added in v0.0.2

type PauseConfig struct {
	// Reason is an optional human-readable note describing why the
	// step pauses. Stored on BranchState.PauseReason when the pause
	// triggers. The engine does not interpret it.
	Reason string `json:"reason,omitempty"`
}

PauseConfig configures a declarative pause step. When a branch reaches a step with a non-nil Pause config the branch parks with ExecutionStatusPaused; the step graph advances past the pause step (using its Next edges), so on unpause + resume the branch continues at the successor step without re-triggering the pause.

Pause is a manual hold-point: unlike a WaitSignal or a durable Sleep there is no declared resumption condition — an external caller (operator, parent workflow, automated check) must call UnpauseBranch to release the branch. Use it for approval gates, production-deploy holds, or any point where human judgment is required before continuing.

type ProgressDetail added in v0.0.2

type ProgressDetail struct {
	// Message is a human-readable description of the current progress.
	Message string

	// Data is arbitrary structured data that consumers can use to
	// drive UIs, metrics, or logging. The library does not inspect
	// it.
	Data map[string]any
}

ProgressDetail carries intra-activity progress information. Message is human-readable; Data is machine-readable.

type RetryConfig

type RetryConfig struct {
	ErrorEquals    []string       `json:"error_equals,omitempty"`
	MaxRetries     int            `json:"max_retries,omitempty"`
	BaseDelay      time.Duration  `json:"base_delay,omitempty"`
	MaxDelay       time.Duration  `json:"max_delay,omitempty"`
	BackoffRate    float64        `json:"backoff_rate,omitempty"`
	JitterStrategy JitterStrategy `json:"jitter_strategy,omitempty"`
	Timeout        time.Duration  `json:"timeout,omitempty"`
}

RetryConfig configures retry behavior for a step.

type RunOption added in v0.0.2

type RunOption func(*runConfig)

RunOption is a functional option for a single Runner.Run call.

func WithCompletionHook added in v0.0.2

func WithCompletionHook(h CompletionHook) RunOption

WithCompletionHook installs a hook called after successful execution to produce follow-up workflow specs. Optional.

func WithHeartbeat added in v0.0.2

func WithHeartbeat(h *HeartbeatConfig) RunOption

WithHeartbeat installs a heartbeat config for this run. Optional.

func WithResumeFrom added in v0.0.2

func WithResumeFrom(priorExecutionID string) RunOption

WithResumeFrom triggers resume-or-run behavior. When set, the Runner attempts to resume from this execution's checkpoint before falling back to a fresh run. When empty, always starts fresh.

func WithRunTimeout added in v0.0.2

func WithRunTimeout(d time.Duration) RunOption

WithRunTimeout overrides the Runner's default timeout for this run. A negative duration means no timeout.

type Runner added in v0.0.2

type Runner struct {
	// contains filtered or unexported fields
}

Runner manages the full lifecycle of workflow executions. It composes heartbeating, crash recovery, structured results, and completion hooks.

Create a Runner once at startup and call Run for each workflow execution.

func NewRunner added in v0.0.2

func NewRunner(opts ...RunnerOption) *Runner

NewRunner creates a Runner with the given options.

func (*Runner) Run added in v0.0.2

func (r *Runner) Run(ctx context.Context, exec *Execution, opts ...RunOption) (*ExecutionResult, error)

Run executes a workflow with full lifecycle management. It:

  1. Starts a heartbeat goroutine (if configured)
  2. Applies a timeout (if configured)
  3. Calls exec.Execute (with ResumeFrom if WithResumeFrom is set)
  4. Stops the heartbeat and collects the result
  5. Calls the completion hook (if configured and execution succeeded)
  6. Returns the structured ExecutionResult

The caller creates the Execution with its own options. The Runner manages lifecycle concerns on top of that.

The error return indicates infrastructure failures (execution couldn't run). Workflow-level failures are in result.Error.

type RunnerOption added in v0.0.2

type RunnerOption func(*runnerConfig)

RunnerOption is a functional option for NewRunner.

func WithDefaultTimeout added in v0.0.2

func WithDefaultTimeout(d time.Duration) RunnerOption

WithDefaultTimeout sets the default per-execution timeout applied unless Run overrides it via WithRunTimeout. Zero means no timeout.

func WithRunnerLogger added in v0.0.2

func WithRunnerLogger(l *slog.Logger) RunnerOption

WithRunnerLogger sets the Runner's structured logger. Defaults to a discard logger.

type Signal added in v0.0.2

type Signal struct {
	ExecutionID string
	Topic       string
	Payload     any
}

Signal is a single delivered event for a (executionID, topic) pair.

type SignalStore added in v0.0.2

type SignalStore interface {
	// Send delivers a signal for the given execution + topic. Signals queue
	// in the store even if no branch is currently waiting.
	Send(ctx context.Context, executionID, topic string, payload any) error

	// Receive pops the oldest pending signal for the given execution + topic.
	// Returns (nil, nil) if no signal is pending — callers treat that as
	// "not yet, unwind and wait".
	Receive(ctx context.Context, executionID, topic string) (*Signal, error)
}

SignalStore is the rendezvous point for delivering external events to paused workflow executions. Implementations must provide FIFO ordering per (executionID, topic) and exactly-once consumption on Receive.

Note: a Subscribe method is intentionally omitted from this interface.
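The contract — FIFO per (executionID, topic), exactly-once consumption, nil when nothing is pending — can be sketched with a toy in-memory store (illustrative; the library ships MemorySignalStore for real use):

```go
package main

import (
	"fmt"
	"sync"
)

type signal struct {
	ExecutionID, Topic string
	Payload            any
}

// fifoStore sketches the SignalStore contract: one FIFO queue per
// (executionID, topic) pair; Receive pops exactly once or returns nil.
type fifoStore struct {
	mu     sync.Mutex
	queues map[string][]*signal
}

func newFifoStore() *fifoStore { return &fifoStore{queues: map[string][]*signal{}} }

// Send queues a signal even if no branch is currently waiting.
func (s *fifoStore) Send(executionID, topic string, payload any) {
	s.mu.Lock()
	defer s.mu.Unlock()
	key := executionID + "\x00" + topic
	s.queues[key] = append(s.queues[key], &signal{executionID, topic, payload})
}

// Receive pops the oldest pending signal; nil means "not yet, unwind and wait".
func (s *fifoStore) Receive(executionID, topic string) *signal {
	s.mu.Lock()
	defer s.mu.Unlock()
	key := executionID + "\x00" + topic
	q := s.queues[key]
	if len(q) == 0 {
		return nil
	}
	s.queues[key] = q[1:]
	return q[0]
}

func main() {
	st := newFifoStore()
	st.Send("exec-1", "approval", "first")
	st.Send("exec-1", "approval", "second")
	fmt.Println(st.Receive("exec-1", "approval").Payload)
	fmt.Println(st.Receive("exec-1", "approval").Payload)
	fmt.Println(st.Receive("exec-1", "approval"))
}
```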

type SleepConfig added in v0.0.2

type SleepConfig struct {
	// Duration is the wall-clock duration the path should sleep.
	// Must be positive.
	Duration time.Duration `json:"duration"`
}

SleepConfig configures a step that durably sleeps for a fixed wall-clock duration. The path hard-suspends — its goroutine exits, the checkpoint records an absolute WakeAt, and the execution ends dormant until a consumer resumes it at or after WakeAt.

Sleep survives process restarts: on resume before WakeAt the path re-suspends; on resume at or after WakeAt the path wakes and advances to the successor step.

When a sleeping path is paused via PauseBranch, the sleep clock freezes: the remaining duration is recorded on WaitState and the absolute WakeAt is cleared. On unpause, WakeAt is recomputed as now + remaining, so the pause period does not consume sleep time.

type Step

type Step struct {
	Name                 string               `json:"name"`
	Description          string               `json:"description,omitempty"`
	Store                string               `json:"store,omitempty"`
	Activity             string               `json:"activity,omitempty"`
	Parameters           map[string]any       `json:"parameters,omitempty"`
	Each                 *Each                `json:"each,omitempty"`
	Join                 *JoinConfig          `json:"join,omitempty"`
	WaitSignal           *WaitSignalConfig    `json:"wait_signal,omitempty"`
	Sleep                *SleepConfig         `json:"sleep,omitempty"`
	Pause                *PauseConfig         `json:"pause,omitempty"`
	Next                 []*Edge              `json:"next,omitempty"`
	EdgeMatchingStrategy EdgeMatchingStrategy `json:"edge_matching_strategy,omitempty"`
	Retry                []*RetryConfig       `json:"retry,omitempty"`
	Catch                []*CatchConfig       `json:"catch,omitempty"`
}

Step represents a single node in a workflow's step graph.

Step kinds

A step has exactly one kind, selected by which of these mutually exclusive fields is set:

  • Activity — invokes a registered activity by name. The default kind; the only kind that produces a value via Store.
  • Join — waits for one or more named branches to converge, then merges their state per JoinConfig.BranchMappings.
  • WaitSignal — parks the branch until an external signal is delivered to a topic.
  • Sleep — durably suspends the branch for a wall-clock duration. Survives process restarts.
  • Pause — declarative counterpart to PauseBranch; parks the branch until an operator unpauses it.

workflow.New rejects any step that sets more than one kind field with ErrInvalidStepKind. A step that sets no kind field implicitly defaults to the Activity kind, so Activity may be empty only if a Sleep, Pause, Join, or WaitSignal is set.

Modifier fields

  • Store — name of the variable to write the step result into. Activity-kind only.
  • Parameters — typed input passed to the activity (Activity-kind only). Values may use ${...} templates.
  • Each — fan-out loop over a list. The step is executed once per item in a fresh sub-branch.
  • Next — outgoing edges, evaluated against EdgeMatchingStrategy.
  • EdgeMatchingStrategy — "all" (default; follow every matching edge, branching the path) or "first" (follow only the first match, single branch continues).
  • Retry — per-error-class retry policy with backoff. Activity-kind only; rejected on Sleep/Pause/Join/WaitSignal at workflow.New.
  • Catch — per-error-class fallback routing. Activity-kind only; same restriction as Retry.

Mixing a modifier with an incompatible kind is rejected at validation time with ErrInvalidModifier.

func (*Step) GetEdgeMatchingStrategy

func (s *Step) GetEdgeMatchingStrategy() EdgeMatchingStrategy

GetEdgeMatchingStrategy returns the edge matching strategy for this step, defaulting to "all" if not specified

type StepProgress added in v0.0.2

type StepProgress struct {
	// StepName identifies the step.
	StepName string

	// BranchID identifies the execution path this step is running on.
	BranchID string

	// Status is the current step status.
	Status StepStatus

	// ActivityName is the activity bound to this step.
	ActivityName string

	// Attempt is the current attempt number (1-based). Increments on retries.
	Attempt int

	// Detail is optional progress information set by activities via
	// Context.ReportProgress. Nil unless the activity reports intra-step
	// progress.
	Detail *ProgressDetail

	// StartedAt is when the step began executing. Zero for pending steps.
	StartedAt time.Time

	// FinishedAt is when the step completed or failed. Zero for running steps.
	FinishedAt time.Time

	// Error is the error message if the step failed. Empty otherwise.
	Error string
}

StepProgress describes the current state of a step within an execution.

type StepProgressStore added in v0.0.2

type StepProgressStore interface {
	UpdateStepProgress(ctx context.Context, executionID string, progress StepProgress) error
}

StepProgressStore persists step progress updates. Implement this interface to write step progress to your backend (database, cache, API, etc.).

UpdateStepProgress is called asynchronously on every step state transition and on intra-activity progress reports. Errors are logged but do not fail the workflow — step progress is observability, not correctness.

type StepStatus added in v0.0.2

type StepStatus string

StepStatus represents the execution state of a step.

const (
	StepStatusPending   StepStatus = "pending"
	StepStatusRunning   StepStatus = "running"
	StepStatusCompleted StepStatus = "completed"
	StepStatusFailed    StepStatus = "failed"
	StepStatusSkipped   StepStatus = "skipped"
)

type SuspendedBranch added in v0.0.2

type SuspendedBranch struct {
	BranchID    string
	StepName    string
	Reason      SuspensionReason
	Topic       string    // set for waiting_signal
	WakeAt      time.Time // zero if no deadline
	PauseReason string    // set for paused
}

SuspendedBranch describes a single branch's suspension state.

type SuspensionInfo added in v0.0.2

type SuspensionInfo struct {
	// Reason is the dominant reason for the suspension. When multiple
	// branches are suspended for different reasons, the dominant one is
	// reported; SuspendedBranches has the full breakdown.
	Reason SuspensionReason

	// SuspendedBranches is one entry per hard-suspended branch.
	SuspendedBranches []SuspendedBranch

	// Topics is the union of signal topics any suspended branch is
	// waiting on. Convenience for consumers that just want to know
	// "which channels should deliver into me?".
	Topics []string

	// WakeAt is the earliest absolute deadline across all suspended
	// branches (signal timeouts or sleep wake-ups). Zero if no branch has a
	// deadline.
	WakeAt time.Time
}

SuspensionInfo describes why an execution ended dormant and what external input would move it forward. Consumers use this to decide how to schedule a resume — e.g., enqueue a signal listener, schedule a wake-up at WakeAt, or wait for an operator unpause.

type SuspensionReason added in v0.0.2

type SuspensionReason string

SuspensionReason classifies why an execution ended in a dormant state.

const (
	// SuspensionReasonWaitingSignal means one or more branches are parked
	// on a workflow.Wait or a declarative WaitSignal step.
	SuspensionReasonWaitingSignal SuspensionReason = "waiting_signal"
	// SuspensionReasonSleeping means one or more branches are parked on a
	// durable Sleep.
	SuspensionReasonSleeping SuspensionReason = "sleeping"
	// SuspensionReasonPaused means one or more branches were paused by an
	// operator or a Pause step.
	SuspensionReasonPaused SuspensionReason = "paused"
)

type TypedActivity

type TypedActivity[TParams, TResult any] interface {

	// Name returns the name of the Activity
	Name() string

	// Execute the Activity with the given parameters.
	Execute(ctx Context, parameters TParams) (TResult, error)
}

TypedActivity is a parameterized interface for activities that assists with marshalling parameters and results.

type TypedActivityAdapter

type TypedActivityAdapter[TParams, TResult any] struct {
	// contains filtered or unexported fields
}

TypedActivityAdapter wraps a TypedActivity to implement the Activity interface.

func (*TypedActivityAdapter[TParams, TResult]) Activity

func (a *TypedActivityAdapter[TParams, TResult]) Activity() TypedActivity[TParams, TResult]

Activity returns the underlying TypedActivity

func (*TypedActivityAdapter[TParams, TResult]) Execute

func (a *TypedActivityAdapter[TParams, TResult]) Execute(ctx Context, parameters map[string]any) (any, error)

Execute the Activity.

func (*TypedActivityAdapter[TParams, TResult]) Name

func (a *TypedActivityAdapter[TParams, TResult]) Name() string

Name of the Activity.

type ValidationError added in v0.0.2

type ValidationError struct {
	Problems []ValidationProblem
}

ValidationError contains all problems found during validation.

func (*ValidationError) Error added in v0.0.2

func (e *ValidationError) Error() string

func (*ValidationError) Is added in v0.0.2

func (e *ValidationError) Is(target error) bool

Is reports whether err matches any sentinel attached to one of the contained problems. This makes errors.Is(err, ErrDuplicateStepName) work against a ValidationError containing a duplicate-name problem.

type ValidationProblem added in v0.0.2

type ValidationProblem struct {
	// Step is the name of the step where the problem was found.
	// Empty for workflow-level problems.
	Step string

	// Message describes the problem.
	Message string

	// Err is the sentinel error associated with this problem, if any.
	// Callers can use errors.Is against the enclosing *ValidationError
	// to test for specific problem classes (ErrDuplicateStepName, etc.).
	Err error
}

ValidationProblem describes a single structural issue in a workflow.

func (ValidationProblem) String added in v0.0.2

func (p ValidationProblem) String() string

type WaitKind added in v0.0.2

type WaitKind string

WaitKind identifies the kind of durable wait a branch is parked on.

Only the constants defined in this file are valid. JSON unmarshaling rejects any other value. A new kind may not be added without bumping checkpoint compatibility and teaching every resume/replay site how to handle it.

const (
	// WaitKindSignal is a wait for an external signal delivered via a
	// SignalStore — the Phase 3 primitive.
	WaitKindSignal WaitKind = "signal"
	// WaitKindSleep is a wait for a wall-clock deadline, used by durable
	// Sleep steps.
	WaitKindSleep WaitKind = "sleep"
)

func (*WaitKind) UnmarshalJSON added in v0.0.2

func (k *WaitKind) UnmarshalJSON(data []byte) error

UnmarshalJSON enforces that only defined WaitKind values round-trip.

type WaitSignalConfig added in v0.0.2

type WaitSignalConfig struct {
	// Topic is a ${...}-templated rendezvous key. Required.
	Topic string `json:"topic"`
	// Timeout is the maximum time to wait for the signal. Required.
	Timeout time.Duration `json:"timeout"`
	// Store is the path variable that receives the signal payload when
	// the signal is delivered. Optional.
	Store string `json:"store,omitempty"`
	// OnTimeout is the name of the step to route to when the wait
	// times out. When empty, a timeout fails the step.
	OnTimeout string `json:"on_timeout,omitempty"`
}

WaitSignalConfig configures a step to park a path until an external signal is delivered via the execution's SignalStore.

The declarative counterpart of workflow.Wait. Use it when the step graph, not imperative activity code, is the right place to express "stop here until X arrives" — e.g., a gate before a production deploy, a human-in-the-loop approval, a callback from an async external system.

Topic is a template evaluated at step-entry time against the current branch state; the resolved value is what the engine registers as the rendezvous key. Typical patterns:

  • Static: "approval-requested"
  • Dynamic: "callback-${state.request_id}"
  • Expression: "${state.meta.correlation_id}"

Store is the variable name that receives the signal payload when it arrives. Like Step.Store, a "state." prefix is stripped.

Timeout is required and must be positive. A timeout with no OnTimeout routing fails the step with a WorkflowError of type ErrorTypeTimeout. A timeout with OnTimeout set routes the path to the named next step without failing.
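Putting the fields together, a wait-signal config might serialize as follows (illustrative values; this assumes default encoding/json handling of time.Duration, which marshals as integer nanoseconds, so one hour is 3600000000000):

```json
{
  "topic": "callback-${state.request_id}",
  "timeout": 3600000000000,
  "store": "state.approval",
  "on_timeout": "Escalate"
}
```

Here the branch parks until a signal arrives on the resolved topic, stores the payload in approval, and routes to a hypothetical "Escalate" step if nothing arrives within an hour.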

type WaitState added in v0.0.2

type WaitState struct {
	// Kind identifies the wait variant. Required; must be one of the
	// defined WaitKind constants.
	Kind WaitKind `json:"kind"`
	// Topic is the resolved rendezvous topic. Set when Kind == Signal.
	Topic string `json:"topic,omitempty"`
	// WakeAt is the absolute wall-clock deadline at which the wait times
	// out (for signal waits) or wakes (for sleeps). Zero means no
	// deadline; a zero WakeAt with a positive (or explicitly zero)
	// Remaining means the wait clock is frozen by a pause.
	WakeAt time.Time `json:"wake_at,omitzero"`
	// Timeout is the original timeout duration as specified by the caller.
	// Recorded for observability; WakeAt is the authoritative deadline.
	Timeout time.Duration `json:"timeout,omitzero"`
	// Remaining is the amount of time left on the wait when the owning
	// branch was paused mid-wait. Populated for both signal waits and
	// sleeps while the branch is paused; cleared on unpause, at which
	// point WakeAt is recomputed as now + Remaining. The pause clock
	// must not consume the wait's timeout budget (see FR-19 and the
	// freezeWaitOnPause / thawWaitOnUnpause helpers).
	Remaining time.Duration `json:"remaining,omitzero"`
}

WaitState carries the information needed to resume a hard-suspended branch without re-running templates: the resolved topic (for signal waits), the absolute deadline, the original timeout, and the kind.

WaitState is inlined on BranchState (nullable). Consumers should treat an absent or nil Wait as "no pending wait".

type Workflow

type Workflow struct {
	// contains filtered or unexported fields
}

Workflow defines a repeatable process as a graph of steps to be executed.

func New

func New(opts Options) (*Workflow, error)

New returns a new Workflow configured with the given options.

New runs structural validation and fails fast on any problem it finds, returning a *ValidationError with the full list of problems. Structural validation does not consult the activity registry or the script compiler — that binding-layer validation runs during NewExecution.

func (*Workflow) Description

func (w *Workflow) Description() string

Description returns the workflow description

func (*Workflow) GetStep

func (w *Workflow) GetStep(name string) (*Step, bool)

GetStep returns a step by name

func (*Workflow) InitialState

func (w *Workflow) InitialState() map[string]any

InitialState returns the workflow initial state

func (*Workflow) Inputs

func (w *Workflow) Inputs() []*Input

Inputs returns the workflow inputs

func (*Workflow) Name

func (w *Workflow) Name() string

Name returns the workflow name

func (*Workflow) Outputs

func (w *Workflow) Outputs() []*Output

Outputs returns the workflow outputs

func (*Workflow) Start

func (w *Workflow) Start() *Step

Start returns the workflow start step

func (*Workflow) StepNames

func (w *Workflow) StepNames() []string

StepNames returns the names of all steps in the workflow

func (*Workflow) Steps

func (w *Workflow) Steps() []*Step

Steps returns the workflow steps

func (*Workflow) Validate added in v0.0.2

func (w *Workflow) Validate() error

Validate checks the workflow for structural problems.

Structural validation does not consult the activity registry or the script compiler — those binding-level checks run at NewExecution time. Validate collects every problem it finds into a *ValidationError rather than failing on the first one.

This runs automatically as part of workflow.New. It is also exposed for tools (editors, linters) that want to validate a workflow without constructing one.

type WorkflowError

type WorkflowError struct {
	Type    string      `json:"type"`
	Cause   string      `json:"cause"`
	Details interface{} `json:"details,omitempty"`
	Wrapped error       `json:"-"` // Original error being wrapped
}

WorkflowError represents a structured error with classification. It supports Go's error wrapping patterns via Unwrap.

Details

Details is intentionally typed as any so consumers can attach arbitrary structured context. It is NOT guaranteed to round-trip through Checkpoint persistence: Checkpoint.Error is a flat string, so on resume the Details field will be lost. If a consumer needs structured details to survive a checkpoint/resume cycle, wrap a custom error type and surface the structure from the wrapped error instead of relying on Details.

func ClassifyError

func ClassifyError(err error) *WorkflowError

ClassifyError attempts to classify a regular error into a WorkflowError

func NewWorkflowError

func NewWorkflowError(errorType, cause string) *WorkflowError

NewWorkflowError creates a new WorkflowError with the specified type and cause. The type can be any user-defined string, e.g. "network-error"; its main purpose is to be matched against the error types listed in retry and catch configs.

func (*WorkflowError) Error

func (e *WorkflowError) Error() string

Error implements the error interface

func (*WorkflowError) ToErrorOutput

func (e *WorkflowError) ToErrorOutput() ErrorOutput

ToErrorOutput converts a WorkflowError to ErrorOutput for catch handlers

func (*WorkflowError) Unwrap

func (e *WorkflowError) Unwrap() error

Unwrap implements the error unwrapping interface for Go's errors.Is and errors.As

type WorkflowExecutionEvent

type WorkflowExecutionEvent struct {
	ExecutionID  string
	WorkflowName string
	Status       ExecutionStatus
	StartTime    time.Time
	EndTime      time.Time
	Duration     time.Duration
	Inputs       map[string]any
	Outputs      map[string]any
	PathCount    int
	Error        error
}

WorkflowExecutionEvent provides context for workflow-level execution events

type WorkflowRegistry

type WorkflowRegistry interface {
	// Register adds a workflow to the registry
	Register(workflow *Workflow) error

	// Get retrieves a workflow by name
	Get(name string) (*Workflow, bool)

	// List returns all registered workflow names
	List() []string
}

WorkflowRegistry manages a collection of workflow definitions

Directories

Path Synopsis
cmd
workflow command
examples
branching command
callbacks command
child_workflows command
durable_sleep command
Example: durable_sleep
edge_matching command
error_handling command
fenced_checkpointer command
Example: fenced_checkpointer
join_branches command
pause_unpause command
Example: pause_unpause
retry command
retry_simple command
runner command
Example: runner
signal_wait command
Example: signal_wait
simple command
step_progress command
Example: step_progress
structured_result command
Example: structured_result
validation command
Example: validation
experimental
worker module
internal
require
Package require provides a minimal subset of the testify/require API so that tests in this module can run without a third-party dependency.
Package workflowtest provides test utilities for the workflow library.