Documentation
¶
Overview ¶
A data pipeline processing engine.
See the README for more complete examples and guides.
Code Organization:
The pipeline package provides an API for how nodes can be connected to form a pipeline. The individual implementations of each node exist in this kapacitor package. The reason for the separation is to keep the exported API from the pipeline package clean as it is consumed via the TICKscripts (a DSL for Kapacitor).
Other Concepts:
Stream vs Batch -- Use of the word 'stream' indicates data arrives a single data point at a time. Use of the word 'batch' indicates data arrives in sets or batches or data points.
Task -- A task represents a concrete workload to perform. It consists of a pipeline and an identifying name. Basic CRUD operations can be performed on tasks.
Task Master -- Responsible for executing a task in a specific environment.
Replay -- Replays static datasets against tasks.
Index ¶
- Constants
- Variables
- func ConvertResultTimes(r *Result)
- func CreateDBRPMap(dbrps []DBRP) map[DBRP]bool
- func EvalPredicate(se stateful.Expression, scopePool stateful.ScopePool, ...) (bool, error)
- func ReplayBatchFromChan(clck clock.Clock, batches []<-chan edge.BufferedBatchMessage, ...) <-chan error
- func ReplayBatchFromIO(clck clock.Clock, data []io.ReadCloser, collectors []BatchCollector, ...) <-chan error
- func ReplayStreamFromChan(clck clock.Clock, points <-chan edge.PointMessage, collector StreamCollector, ...) <-chan error
- func ReplayStreamFromIO(clck clock.Clock, data io.ReadCloser, collector StreamCollector, recTime bool, ...) <-chan error
- func WriteBatchForRecording(w io.Writer, b edge.BufferedBatchMessage) error
- func WritePointForRecording(w io.Writer, p edge.PointMessage, precision string) error
- type AlertNode
- type AutoscaleNode
- type BarrierNode
- type BatchCollector
- type BatchNode
- type BatchQueries
- type ChangeDetectNode
- type CombineNode
- type DBRP
- type DefaultNode
- func (n *DefaultNode) Barrier(b edge.BarrierMessage) (edge.Message, error)
- func (n *DefaultNode) BatchPoint(bp edge.BatchPointMessage) (edge.Message, error)
- func (n *DefaultNode) BeginBatch(begin edge.BeginBatchMessage) (edge.Message, error)
- func (n *DefaultNode) DeleteGroup(d edge.DeleteGroupMessage) (edge.Message, error)
- func (n *DefaultNode) Done()
- func (n *DefaultNode) EndBatch(end edge.EndBatchMessage) (edge.Message, error)
- func (n *DefaultNode) Point(p edge.PointMessage) (edge.Message, error)
- func (n *DefaultNode) Wait() error
- type DeleteNode
- func (n *DeleteNode) Barrier(b edge.BarrierMessage) (edge.Message, error)
- func (n *DeleteNode) BatchPoint(bp edge.BatchPointMessage) (edge.Message, error)
- func (n *DeleteNode) BeginBatch(begin edge.BeginBatchMessage) (edge.Message, error)
- func (n *DeleteNode) DeleteGroup(d edge.DeleteGroupMessage) (edge.Message, error)
- func (n *DeleteNode) Done()
- func (n *DeleteNode) EndBatch(end edge.EndBatchMessage) (edge.Message, error)
- func (n *DeleteNode) Point(p edge.PointMessage) (edge.Message, error)
- func (n *DeleteNode) Wait() error
- type DerivativeNode
- type Diagnostic
- type Edge
- type EdgeDiagnostic
- type EvalNode
- type ExecutingTask
- func (et *ExecutingTask) BatchCount() (int, error)
- func (et *ExecutingTask) BatchQueries(start, stop time.Time) ([]BatchQueries, error)
- func (et *ExecutingTask) EDot(labels bool) []byte
- func (et *ExecutingTask) ExecutionStats() (ExecutionStats, error)
- func (et *ExecutingTask) GetOutput(name string) (Output, error)
- func (et *ExecutingTask) Snapshot() (*TaskSnapshot, error)
- func (et *ExecutingTask) StartBatching() error
- func (et *ExecutingTask) StopStats()
- func (et *ExecutingTask) Wait() error
- type ExecutionStats
- type FlattenNode
- type FromNode
- func (n *FromNode) Barrier(b edge.BarrierMessage) (edge.Message, error)
- func (n *FromNode) BatchPoint(edge.BatchPointMessage) (edge.Message, error)
- func (n *FromNode) BeginBatch(edge.BeginBatchMessage) (edge.Message, error)
- func (n *FromNode) DeleteGroup(d edge.DeleteGroupMessage) (edge.Message, error)
- func (n *FromNode) Done()
- func (n *FromNode) EndBatch(edge.EndBatchMessage) (edge.Message, error)
- func (n *FromNode) Point(p edge.PointMessage) (edge.Message, error)
- func (n *FromNode) Wait() error
- type GroupByNode
- func (n *GroupByNode) Barrier(b edge.BarrierMessage) error
- func (n *GroupByNode) BatchPoint(bp edge.BatchPointMessage) error
- func (n *GroupByNode) BeginBatch(begin edge.BeginBatchMessage) error
- func (n *GroupByNode) DeleteGroup(d edge.DeleteGroupMessage) error
- func (n *GroupByNode) Done()
- func (n *GroupByNode) EndBatch(end edge.EndBatchMessage) error
- func (n *GroupByNode) Point(p edge.PointMessage) error
- func (n *GroupByNode) Wait() error
- type HTTPOutNode
- type HTTPPostNode
- type InfluxDBOutNode
- func (n *InfluxDBOutNode) Barrier(b edge.BarrierMessage) (edge.Message, error)
- func (n *InfluxDBOutNode) BatchPoint(bp edge.BatchPointMessage) (edge.Message, error)
- func (n *InfluxDBOutNode) BeginBatch(begin edge.BeginBatchMessage) (edge.Message, error)
- func (n *InfluxDBOutNode) BufferedBatch(batch edge.BufferedBatchMessage) (edge.Message, error)
- func (n *InfluxDBOutNode) DeleteGroup(d edge.DeleteGroupMessage) (edge.Message, error)
- func (n *InfluxDBOutNode) Done()
- func (n *InfluxDBOutNode) EndBatch(end edge.EndBatchMessage) (edge.Message, error)
- func (n *InfluxDBOutNode) Point(p edge.PointMessage) (edge.Message, error)
- func (n *InfluxDBOutNode) Wait() error
- type InfluxQLNode
- type JoinNode
- type KapacitorLoopbackNode
- func (n *KapacitorLoopbackNode) Barrier(edge.BarrierMessage) error
- func (n *KapacitorLoopbackNode) BatchPoint(bp edge.BatchPointMessage) error
- func (n *KapacitorLoopbackNode) BeginBatch(begin edge.BeginBatchMessage) error
- func (n *KapacitorLoopbackNode) DeleteGroup(edge.DeleteGroupMessage) error
- func (n *KapacitorLoopbackNode) Done()
- func (n *KapacitorLoopbackNode) EndBatch(edge.EndBatchMessage) error
- func (n *KapacitorLoopbackNode) Point(p edge.PointMessage) error
- func (n *KapacitorLoopbackNode) Wait() error
- type LogNode
- func (n *LogNode) Barrier(b edge.BarrierMessage) (edge.Message, error)
- func (n *LogNode) BatchPoint(bp edge.BatchPointMessage) (edge.Message, error)
- func (n *LogNode) BeginBatch(begin edge.BeginBatchMessage) (edge.Message, error)
- func (n *LogNode) BufferedBatch(batch edge.BufferedBatchMessage) (edge.Message, error)
- func (n *LogNode) DeleteGroup(d edge.DeleteGroupMessage) (edge.Message, error)
- func (n *LogNode) Done()
- func (n *LogNode) EndBatch(end edge.EndBatchMessage) (edge.Message, error)
- func (n *LogNode) Point(p edge.PointMessage) (edge.Message, error)
- func (n *LogNode) Wait() error
- type LogService
- type MaxDuration
- type NoOpNode
- type Node
- type NodeDiagnostic
- type NoopMetaClient
- func (m *NoopMetaClient) Authenticate(username, password string) (ui *meta.UserInfo, err error)
- func (m *NoopMetaClient) CreateDatabase(name string) (*meta.DatabaseInfo, error)
- func (m *NoopMetaClient) CreateDatabaseWithRetentionPolicy(name string, rpi *meta.RetentionPolicySpec) (*meta.DatabaseInfo, error)
- func (m *NoopMetaClient) CreateRetentionPolicy(database string, rpi *meta.RetentionPolicySpec) (*meta.RetentionPolicyInfo, error)
- func (m *NoopMetaClient) Database(name string) *meta.DatabaseInfo
- func (m *NoopMetaClient) RetentionPolicy(database, name string) (*meta.RetentionPolicyInfo, error)
- func (m *NoopMetaClient) Users() ([]meta.UserInfo, error)
- func (m *NoopMetaClient) WaitForLeader(d time.Duration) error
- type Output
- type Query
- func (q *Query) AlignGroup()
- func (q *Query) Clone() (*Query, error)
- func (q *Query) DBRPs() ([]DBRP, error)
- func (q *Query) Dimensions(dims []interface{}) error
- func (q *Query) Fill(option influxql.FillOption, value interface{})
- func (q *Query) IsGroupedByTime() bool
- func (q *Query) SetStartTime(s time.Time)
- func (q *Query) SetStopTime(s time.Time)
- func (q *Query) StartTime() time.Time
- func (q *Query) StopTime() time.Time
- func (q *Query) String() string
- type QueryNode
- type Result
- type SampleNode
- type ShiftNode
- func (n *ShiftNode) Barrier(b edge.BarrierMessage) (edge.Message, error)
- func (n *ShiftNode) BatchPoint(bp edge.BatchPointMessage) (edge.Message, error)
- func (n *ShiftNode) BeginBatch(begin edge.BeginBatchMessage) (edge.Message, error)
- func (n *ShiftNode) DeleteGroup(d edge.DeleteGroupMessage) (edge.Message, error)
- func (n *ShiftNode) Done()
- func (n *ShiftNode) EndBatch(end edge.EndBatchMessage) (edge.Message, error)
- func (n *ShiftNode) Point(p edge.PointMessage) (edge.Message, error)
- func (n *ShiftNode) Wait() error
- type SideloadNode
- func (n *SideloadNode) Barrier(b edge.BarrierMessage) (edge.Message, error)
- func (n *SideloadNode) BatchPoint(bp edge.BatchPointMessage) (edge.Message, error)
- func (n *SideloadNode) BeginBatch(begin edge.BeginBatchMessage) (edge.Message, error)
- func (n *SideloadNode) DeleteGroup(d edge.DeleteGroupMessage) (edge.Message, error)
- func (n *SideloadNode) Done()
- func (n *SideloadNode) EndBatch(end edge.EndBatchMessage) (edge.Message, error)
- func (n *SideloadNode) Point(p edge.PointMessage) (edge.Message, error)
- func (n *SideloadNode) Wait() error
- type Socket
- type StateTrackingNode
- type StatsNode
- type StreamCollector
- type StreamEdge
- type StreamNode
- type Task
- type TaskDiagnostic
- type TaskMaster
- func (tm *TaskMaster) BatchCollectors(id string) []BatchCollector
- func (tm *TaskMaster) Close() error
- func (tm *TaskMaster) CreateTICKScope() *stateful.Scope
- func (tm *TaskMaster) DelFork(id string)
- func (tm *TaskMaster) DeleteTask(id string) error
- func (tm *TaskMaster) Drain()
- func (tm *TaskMaster) ExecutingDot(id string, labels bool) string
- func (tm *TaskMaster) ExecutionStats(id string) (ExecutionStats, error)
- func (tm *TaskMaster) ID() string
- func (tm *TaskMaster) IsExecuting(id string) bool
- func (tm *TaskMaster) New(id string) *TaskMaster
- func (tm *TaskMaster) NewFork(taskName string, dbrps []DBRP, measurements []string) (edge.StatsEdge, error)
- func (tm *TaskMaster) NewTask(id, script string, tt TaskType, dbrps []DBRP, snapshotInterval time.Duration, ...) (*Task, error)
- func (tm *TaskMaster) NewTemplate(id, script string, tt TaskType) (*Template, error)
- func (tm *TaskMaster) Open() (err error)
- func (tm *TaskMaster) SnapshotTask(id string) (*TaskSnapshot, error)
- func (tm *TaskMaster) StartTask(t *Task) (*ExecutingTask, error)
- func (tm *TaskMaster) StopTask(id string) error
- func (tm *TaskMaster) StopTasks()
- func (tm *TaskMaster) Stream(name string) (StreamCollector, error)
- func (tm *TaskMaster) WriteKapacitorPoint(p edge.PointMessage) error
- func (tm *TaskMaster) WritePoints(database, retentionPolicy string, consistencyLevel imodels.ConsistencyLevel, ...) error
- type TaskMasterLookup
- type TaskSnapshot
- type TaskType
- type Template
- type TimeDimension
- type UDFNode
- type UDFProcess
- func (p *UDFProcess) Abort(err error)
- func (p *UDFProcess) Close() error
- func (p *UDFProcess) In() chan<- edge.Message
- func (p *UDFProcess) Info() (udf.Info, error)
- func (p *UDFProcess) Init(options []*agent.Option) error
- func (p *UDFProcess) Open() error
- func (p *UDFProcess) Out() <-chan edge.Message
- func (p *UDFProcess) Restore(snapshot []byte) error
- func (p *UDFProcess) Snapshot() ([]byte, error)
- type UDFService
- type UDFSocket
- func (s *UDFSocket) Abort(err error)
- func (s *UDFSocket) Close() error
- func (s *UDFSocket) In() chan<- edge.Message
- func (s *UDFSocket) Info() (udf.Info, error)
- func (s *UDFSocket) Init(options []*agent.Option) error
- func (s *UDFSocket) Open() error
- func (s *UDFSocket) Out() <-chan edge.Message
- func (s *UDFSocket) Restore(snapshot []byte) error
- func (s *UDFSocket) Snapshot() ([]byte, error)
- type UnionNode
- type WhereNode
- type WindowNode
Constants ¶
const (
MainTaskMaster = "main"
)
Variables ¶
var ErrAborted = errors.New("edged aborted")
var ErrTaskMasterClosed = errors.New("TaskMaster is closed")
var ErrTaskMasterOpen = errors.New("TaskMaster is open")
var ErrWrongTaskType = errors.New("wrong task type")
Functions ¶
func ConvertResultTimes ¶ added in v0.10.1
func ConvertResultTimes(r *Result)
func CreateDBRPMap ¶
func EvalPredicate ¶
func EvalPredicate(se stateful.Expression, scopePool stateful.ScopePool, p edge.FieldsTagsTimeGetter) (bool, error)
EvalPredicate - Evaluate a given expression as a boolean predicate against a set of fields and tags
func ReplayBatchFromChan ¶ added in v1.0.0
func ReplayBatchFromChan(clck clock.Clock, batches []<-chan edge.BufferedBatchMessage, collectors []BatchCollector, recTime bool) <-chan error
Replay batch data from a channel source.
func ReplayBatchFromIO ¶ added in v1.0.0
func ReplayBatchFromIO(clck clock.Clock, data []io.ReadCloser, collectors []BatchCollector, recTime bool) <-chan error
Replay batch data from an IO source.
func ReplayStreamFromChan ¶ added in v1.0.0
func ReplayStreamFromChan(clck clock.Clock, points <-chan edge.PointMessage, collector StreamCollector, recTime bool) <-chan error
Replay stream data from a channel source.
func ReplayStreamFromIO ¶ added in v1.0.0
func ReplayStreamFromIO(clck clock.Clock, data io.ReadCloser, collector StreamCollector, recTime bool, precision string) <-chan error
Replay stream data from an IO source.
func WriteBatchForRecording ¶
func WriteBatchForRecording(w io.