Documentation
¶
Index ¶
- Constants
- Variables
- func BootstrapCluster(conf *Config, logs LogStore, stable StableStore, snaps SnapshotStore, ...) error
- func HasExistingState(logs LogStore, stable StableStore, snaps SnapshotStore) (bool, error)
- func NewInmemTransport(addr ServerAddress) (ServerAddress, *InmemTransport)
- func RecoverCluster(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps SnapshotStore, ...) error
- func ValidateConfig(config *Config) error
- type AppendEntriesRequest
- type AppendEntriesResponse
- type AppendFuture
- type AppendPipeline
- type ApplyFuture
- type Config
- type Configuration
- type ConfigurationChangeCommand
- type ConfigurationFuture
- type DiscardSnapshotSink
- type DiscardSnapshotStore
- type FSM
- type FSMSnapshot
- type FileSnapshotSink
- type FileSnapshotStore
- func (f *FileSnapshotStore) Create(version SnapshotVersion, index, term uint64, configuration Configuration, ...) (SnapshotSink, error)
- func (f *FileSnapshotStore) List() ([]*SnapshotMeta, error)
- func (f *FileSnapshotStore) Open(id string) (*SnapshotMeta, io.ReadCloser, error)
- func (f *FileSnapshotStore) ReapSnapshots() error
- type FilterFn
- type Future
- type IndexFuture
- type InmemSnapshotSink
- type InmemSnapshotStore
- type InmemStore
- func (i *InmemStore) DeleteRange(min, max uint64) error
- func (i *InmemStore) FirstIndex() (uint64, error)
- func (i *InmemStore) Get(key []byte) ([]byte, error)
- func (i *InmemStore) GetLog(index uint64, log *Log) error
- func (i *InmemStore) GetUint64(key []byte) (uint64, error)
- func (i *InmemStore) LastIndex() (uint64, error)
- func (i *InmemStore) Set(key []byte, val []byte) error
- func (i *InmemStore) SetUint64(key []byte, val uint64) error
- func (i *InmemStore) StoreLog(log *Log) error
- func (i *InmemStore) StoreLogs(logs []*Log) error
- type InmemTransport
- func (i *InmemTransport) AppendEntries(id ServerID, target ServerAddress, args *AppendEntriesRequest, ...) error
- func (i *InmemTransport) AppendEntriesPipeline(id ServerID, target ServerAddress) (AppendPipeline, error)
- func (i *InmemTransport) Close() error
- func (i *InmemTransport) Connect(peer ServerAddress, t Transport)
- func (i *InmemTransport) Consumer() <-chan RPC
- func (i *InmemTransport) DecodePeer(buf []byte) ServerAddress
- func (i *InmemTransport) Disconnect(peer ServerAddress)
- func (i *InmemTransport) DisconnectAll()
- func (i *InmemTransport) EncodePeer(id ServerID, p ServerAddress) []byte
- func (i *InmemTransport) InstallSnapshot(id ServerID, target ServerAddress, args *InstallSnapshotRequest, ...) error
- func (i *InmemTransport) LocalAddr() ServerAddress
- func (i *InmemTransport) RequestVote(id ServerID, target ServerAddress, args *RequestVoteRequest, ...) error
- func (i *InmemTransport) SetHeartbeatHandler(cb func(RPC))
- type InstallSnapshotRequest
- type InstallSnapshotResponse
- type Log
- type LogCache
- type LogStore
- type LogType
- type LoopbackTransport
- type NetworkTransport
- func NewNetworkTransport(stream StreamLayer, maxPool int, timeout time.Duration, logOutput io.Writer) *NetworkTransport
- func NewNetworkTransportWithConfig(config *NetworkTransportConfig) *NetworkTransport
- func NewNetworkTransportWithLogger(stream StreamLayer, maxPool int, timeout time.Duration, logger *log.Logger) *NetworkTransport
- func NewTCPTransport(bindAddr string, advertise net.Addr, maxPool int, timeout time.Duration, ...) (*NetworkTransport, error)
- func NewTCPTransportWithConfig(bindAddr string, advertise net.Addr, config *NetworkTransportConfig) (*NetworkTransport, error)
- func NewTCPTransportWithLogger(bindAddr string, advertise net.Addr, maxPool int, timeout time.Duration, ...) (*NetworkTransport, error)
- func (n *NetworkTransport) AppendEntries(id ServerID, target ServerAddress, args *AppendEntriesRequest, ...) error
- func (n *NetworkTransport) AppendEntriesPipeline(id ServerID, target ServerAddress) (AppendPipeline, error)
- func (n *NetworkTransport) Close() error
- func (n *NetworkTransport) Consumer() <-chan RPC
- func (n *NetworkTransport) DecodePeer(buf []byte) ServerAddress
- func (n *NetworkTransport) EncodePeer(id ServerID, p ServerAddress) []byte
- func (n *NetworkTransport) InstallSnapshot(id ServerID, target ServerAddress, args *InstallSnapshotRequest, ...) error
- func (n *NetworkTransport) IsShutdown() bool
- func (n *NetworkTransport) LocalAddr() ServerAddress
- func (n *NetworkTransport) RequestVote(id ServerID, target ServerAddress, args *RequestVoteRequest, ...) error
- func (n *NetworkTransport) SetHeartbeatHandler(cb func(rpc RPC))
- type NetworkTransportConfig
- type Observation
- type Observer
- type ProtocolVersion
- type RPC
- type RPCHeader
- type RPCResponse
- type Raft
- func (r *Raft) AddNonvoter(id ServerID, address ServerAddress, prevIndex uint64, timeout time.Duration) IndexFuture
- func (r *Raft) AddPeer(peer ServerAddress) Future
- func (r *Raft) AddVoter(id ServerID, address ServerAddress, prevIndex uint64, timeout time.Duration) IndexFuture
- func (r *Raft) AppliedIndex() uint64
- func (r *Raft) Apply(cmd []byte, timeout time.Duration) ApplyFuture
- func (r *Raft) Barrier(timeout time.Duration) Future
- func (r *Raft) BootstrapCluster(configuration Configuration) Future
- func (r *Raft) DemoteVoter(id ServerID, prevIndex uint64, timeout time.Duration) IndexFuture
- func (r *Raft) DeregisterObserver(or *Observer)
- func (r *Raft) GetConfiguration() ConfigurationFuture
- func (r *Raft) LastContact() time.Time
- func (r *Raft) LastIndex() uint64
- func (r *Raft) Leader() ServerAddress
- func (r *Raft) LeaderCh() <-chan bool
- func (r *Raft) RegisterObserver(or *Observer)
- func (r *Raft) RemovePeer(peer ServerAddress) Future
- func (r *Raft) RemoveServer(id ServerID, prevIndex uint64, timeout time.Duration) IndexFuture
- func (r *Raft) Restore(meta *SnapshotMeta, reader io.Reader, timeout time.Duration) error
- func (r *Raft) Shutdown() Future
- func (r *Raft) Snapshot() SnapshotFuture
- func (r *Raft) State() RaftState
- func (r *Raft) Stats() map[string]string
- func (r *Raft) String() string
- func (r *Raft) VerifyLeader() Future
- type RaftState
- type RequestVoteRequest
- type RequestVoteResponse
- type Server
- type ServerAddress
- type ServerAddressProvider
- type ServerID
- type ServerSuffrage
- type SnapshotFuture
- type SnapshotMeta
- type SnapshotSink
- type SnapshotStore
- type SnapshotVersion
- type StableStore
- type StreamLayer
- type TCPStreamLayer
- type Transport
- type WithClose
- type WithPeers
- type WithRPCHeader
Constants ¶
const ( // DefaultTimeoutScale is the default TimeoutScale in a NetworkTransport. DefaultTimeoutScale = 256 * 1024 // 256KB )
Variables ¶
var ( // ErrLeader is returned when an operation can't be completed on a // leader node. ErrLeader = errors.New("node is the leader") // ErrNotLeader is returned when an operation can't be completed on a // follower or candidate node. ErrNotLeader = errors.New("node is not the leader") // ErrLeadershipLost is returned when a leader fails to commit a log entry // because it's been deposed in the process. ErrLeadershipLost = errors.New("leadership lost while committing log") // ErrAbortedByRestore is returned when a leader fails to commit a log // entry because it's been superseded by a user snapshot restore. ErrAbortedByRestore = errors.New("snapshot restored while committing log") // ErrRaftShutdown is returned when operations are requested against an // inactive Raft. ErrRaftShutdown = errors.New("raft is already shutdown") // ErrEnqueueTimeout is returned when a command fails due to a timeout. ErrEnqueueTimeout = errors.New("timed out enqueuing operation") // ErrNothingNewToSnapshot is returned when trying to create a snapshot // but there's nothing new committed to the FSM since we started. ErrNothingNewToSnapshot = errors.New("nothing new to snapshot") // ErrUnsupportedProtocol is returned when an operation is attempted // that's not supported by the current protocol version. ErrUnsupportedProtocol = errors.New("operation not supported with current protocol version") // ErrCantBootstrap is returned when attempt is made to bootstrap a // cluster that already has state present. ErrCantBootstrap = errors.New("bootstrap only works on new clusters") )
var ( // ErrTransportShutdown is returned when operations on a transport are // invoked after it's been terminated. ErrTransportShutdown = errors.New("transport shutdown") // ErrPipelineShutdown is returned when the pipeline is closed. ErrPipelineShutdown = errors.New("append pipeline closed") )
var ( // ErrLogNotFound indicates a given log entry is not available. ErrLogNotFound = errors.New("log not found") // ErrPipelineReplicationNotSupported can be returned by the transport to // signal that pipeline replication is not supported in general, and that // no error message should be produced. ErrPipelineReplicationNotSupported = errors.New("pipeline replication not supported") )
Functions ¶
func BootstrapCluster ¶ added in v1.0.0
func BootstrapCluster(conf *Config, logs LogStore, stable StableStore, snaps SnapshotStore, trans Transport, configuration Configuration) error
BootstrapCluster initializes a server's storage with the given cluster configuration. This should only be called at the beginning of time for the cluster, and you absolutely must make sure that you call it with the same configuration on all the Voter servers. There is no need to bootstrap Nonvoter and Staging servers.
One sane approach is to bootstrap a single server with a configuration listing just itself as a Voter, then invoke AddVoter() on it to add other servers to the cluster.
func HasExistingState ¶ added in v1.0.0
func HasExistingState(logs LogStore, stable StableStore, snaps SnapshotStore) (bool, error)
HasExistingState returns true if the server has any existing state (logs, knowledge of a current term, or any snapshots).
func NewInmemTransport ¶
func NewInmemTransport(addr ServerAddress) (ServerAddress, *InmemTransport)
NewInmemTransport is used to initialize a new transport; it generates a random local address if none is specified.
func RecoverCluster ¶ added in v1.0.0
func RecoverCluster(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps SnapshotStore, trans Transport, configuration Configuration) error
RecoverCluster is used to manually force a new configuration in order to recover from a loss of quorum where the current configuration cannot be restored, such as when several servers die at the same time. This works by reading all the current state for this server, creating a snapshot with the supplied configuration, and then truncating the Raft log. This is the only safe way to force a given configuration without actually altering the log to insert any new entries, which could cause conflicts with other servers with different state.
WARNING! This operation implicitly commits all entries in the Raft log, so in general this is an extremely unsafe operation. If you've lost your other servers and are performing a manual recovery, then you've also lost the commit information, so this is likely the best you can do, but you should be aware that calling this can cause Raft log entries that were in the process of being replicated but not yet committed to be committed.
Note the FSM passed here is used for the snapshot operations and will be left in a state that should not be used by the application. Be sure to discard this FSM and any associated state and provide a fresh one when calling NewRaft later.
A typical way to recover the cluster is to shut down all servers and then run RecoverCluster on every server using an identical configuration. When the cluster is then restarted, an election should occur and then Raft will resume normal operation. If it's desired to make a particular server the leader, this can be used to inject a new configuration with that server as the sole voter, and then join up other new clean-state peer servers using the usual APIs in order to bring the cluster back into a known state.
func ValidateConfig ¶
ValidateConfig is used to validate a sane configuration
Types ¶
type AppendEntriesRequest ¶
type AppendEntriesRequest struct {
RPCHeader
// Provide the current term and leader
Term uint64
Leader []byte
// Provide the previous entries for integrity checking
PrevLogEntry uint64
PrevLogTerm uint64
// New entries to commit
Entries []*Log
// Commit index on the leader
LeaderCommitIndex uint64
}
AppendEntriesRequest is the command used to append entries to the replicated log.
func (*AppendEntriesRequest) GetRPCHeader ¶ added in v1.0.0
func (r *AppendEntriesRequest) GetRPCHeader() RPCHeader
See WithRPCHeader.
type AppendEntriesResponse ¶
type AppendEntriesResponse struct {
RPCHeader
// Newer term if leader is out of date
Term uint64
// Last Log is a hint to help accelerate rebuilding slow nodes
LastLog uint64
// We may not succeed if we have a conflicting entry
Success bool
// There are scenarios where this request didn't succeed
// but there's no need to wait/back-off the next attempt.
NoRetryBackoff bool
}
AppendEntriesResponse is the response returned from an AppendEntriesRequest.
func (*AppendEntriesResponse) GetRPCHeader ¶ added in v1.0.0
func (r *AppendEntriesResponse) GetRPCHeader() RPCHeader
See WithRPCHeader.
type AppendFuture ¶
type AppendFuture interface {
Future
// Start returns the time that the append request was started.
// It is always OK to call this method.
Start() time.Time
// Request holds the parameters of the AppendEntries call.
// It is always OK to call this method.
Request() *AppendEntriesRequest
// Response holds the results of the AppendEntries call.
// This method must only be called after the Error
// method returns, and will only be valid on success.
Response() *AppendEntriesResponse
}
AppendFuture is used to return information about a pipelined AppendEntries request.
type AppendPipeline ¶
type AppendPipeline interface {
// AppendEntries is used to add another request to the pipeline.
// The send may block which is an effective form of back-pressure.
AppendEntries(args *AppendEntriesRequest, resp *AppendEntriesResponse) (AppendFuture, error)
// Consumer returns a channel that can be used to consume
// response futures when they are ready.
Consumer() <-chan AppendFuture
// Close closes the pipeline and cancels all inflight RPCs
Close() error
}
AppendPipeline is used for pipelining AppendEntries requests. It is used to increase the replication throughput by masking latency and better utilizing bandwidth.
type ApplyFuture ¶
type ApplyFuture interface {
IndexFuture
// Response returns the FSM response as returned
// by the FSM.Apply method. This must not be called
// until after the Error method has returned.
Response() interface{}
}
ApplyFuture is used for Apply and can return the FSM response.
type Config ¶
type Config struct {
// ProtocolVersion allows a Raft server to inter-operate with older
// Raft servers running an older version of the code. This is used to
// version the wire protocol as well as Raft-specific log entries that
// the server uses when _speaking_ to other servers. There is currently
// no auto-negotiation of versions so all servers must be manually
// configured with compatible versions. See ProtocolVersionMin and
// ProtocolVersionMax for the versions of the protocol that this server
// can _understand_.
ProtocolVersion ProtocolVersion
// HeartbeatTimeout specifies the time in follower state without
// a leader before we attempt an election.
HeartbeatTimeout time.Duration
// ElectionTimeout specifies the time in candidate state without
// a leader before we attempt an election.
ElectionTimeout time.Duration
// CommitTimeout controls the time without an Apply() operation
// before we heartbeat to ensure a timely commit. Due to random
// staggering, may be delayed as much as 2x this value.
CommitTimeout time.Duration
// MaxAppendEntries controls the maximum number of append entries
// to send at once. We want to strike a balance between efficiency
// and avoiding waste if the follower is going to reject because of
// an inconsistent log.
MaxAppendEntries int
// If we are a member of a cluster, and RemovePeer is invoked for the
// local node, then we forget all peers and transition into the follower state.
// If ShutdownOnRemove is set, we additionally shut down Raft. Otherwise,
// we can become a leader of a cluster containing only this node.
ShutdownOnRemove bool
// TrailingLogs controls how many logs we leave after a snapshot. This is
// used so that we can quickly replay logs on a follower instead of being
// forced to send an entire snapshot.
TrailingLogs uint64
// SnapshotInterval controls how often we check if we should perform a snapshot.
// We randomly stagger between this value and 2x this value to prevent the entire
// cluster from performing a snapshot at once.
SnapshotInterval time.Duration
// SnapshotThreshold controls how many outstanding logs there must be before
// we perform a snapshot. This is to prevent excessive snapshots when we can
// just replay a small set of logs.
SnapshotThreshold uint64
// LeaderLeaseTimeout is used to control how long the "lease" lasts
// for being the leader without being able to contact a quorum
// of nodes. If we reach this interval without contact, we will
// step down as leader.
LeaderLeaseTimeout time.Duration
// StartAsLeader forces Raft to start in the leader state. This should
// never be used except for testing purposes, as it can cause a split-brain.
StartAsLeader bool
// The unique ID for this server across all time. When running with
// ProtocolVersion < 3, you must set this to be the same as the network
// address of your transport.
LocalID ServerID
// NotifyCh is used to provide a channel that will be notified of leadership
// changes. Raft will block writing to this channel, so it should either be
// buffered or aggressively consumed.
NotifyCh chan<- bool
// LogOutput is used as a sink for logs, unless Logger is specified.
// Defaults to os.Stderr.
LogOutput io.Writer
// Logger is a user-provided logger. If nil, a logger writing to LogOutput
// is used.
Logger *log.Logger
}
Config provides any necessary configuration for the Raft server.
func DefaultConfig ¶
func DefaultConfig() *Config
DefaultConfig returns a Config with usable defaults.
type Configuration ¶ added in v1.0.0
type Configuration struct {
Servers []Server
}
Configuration tracks which servers are in the cluster, and whether they have votes. This should include the local server, if it's a member of the cluster. The servers are listed in no particular order, but each should only appear once. These entries are appended to the log during membership changes.
func ReadConfigJSON ¶ added in v1.0.0
func ReadConfigJSON(path string) (Configuration, error)
ReadConfigJSON reads a new-style peers.json and returns a configuration structure. This can be used to perform manual recovery when running protocol versions that use server IDs.
func ReadPeersJSON ¶ added in v1.0.0
func ReadPeersJSON(path string) (Configuration, error)
ReadPeersJSON consumes a legacy peers.json file in the format of the old JSON peer store and creates a new-style configuration structure. This can be used to migrate this data or perform manual recovery when running protocol versions that can interoperate with older, unversioned Raft servers. This should not be used once server IDs are in use, because the old peers.json file didn't have support for these, nor non-voter suffrage types.
func (*Configuration) Clone ¶ added in v1.0.0
func (c *Configuration) Clone() (copy Configuration)
Clone makes a deep copy of a Configuration.
type ConfigurationChangeCommand ¶ added in v1.0.0
type ConfigurationChangeCommand uint8
ConfigurationChangeCommand is the different ways to change the cluster configuration.
const ( // AddStaging makes a server Staging unless it's a Voter. AddStaging ConfigurationChangeCommand = iota // AddNonvoter makes a server Nonvoter unless it's Staging or a Voter. AddNonvoter // DemoteVoter makes a server Nonvoter unless it's absent. DemoteVoter // RemoveServer removes a server entirely from the cluster membership. RemoveServer // Promote is created automatically by a leader; it turns a Staging server // into a Voter. Promote )
func (ConfigurationChangeCommand) String ¶ added in v1.0.0
func (c ConfigurationChangeCommand) String() string
type ConfigurationFuture ¶ added in v1.0.0
type ConfigurationFuture interface {
IndexFuture
// Configuration contains the latest configuration. This must
// not be called until after the Error method has returned.
Configuration() Configuration
}
ConfigurationFuture is used for GetConfiguration and can return the latest configuration in use by Raft.
type DiscardSnapshotSink ¶
type DiscardSnapshotSink struct{}
func (*DiscardSnapshotSink) Cancel ¶
func (d *DiscardSnapshotSink) Cancel() error
func (*DiscardSnapshotSink) Close ¶
func (d *DiscardSnapshotSink) Close() error
func (*DiscardSnapshotSink) ID ¶
func (d *DiscardSnapshotSink) ID() string
type DiscardSnapshotStore ¶
type DiscardSnapshotStore struct{}
DiscardSnapshotStore is used to successfully snapshot while always discarding the snapshot. This is useful for when the log should be truncated but no snapshot should be retained. This should never be used for production use, and is only suitable for testing.
func NewDiscardSnapshotStore ¶
func NewDiscardSnapshotStore() *DiscardSnapshotStore
NewDiscardSnapshotStore is used to create a new DiscardSnapshotStore.
func (*DiscardSnapshotStore) Create ¶
func (d *DiscardSnapshotStore) Create(version SnapshotVersion, index, term uint64, configuration Configuration, configurationIndex uint64, trans Transport) (SnapshotSink, error)
func (*DiscardSnapshotStore) List ¶
func (d *DiscardSnapshotStore) List() ([]*SnapshotMeta, error)
func (*DiscardSnapshotStore) Open ¶
func (d *DiscardSnapshotStore) Open(id string) (*SnapshotMeta, io.ReadCloser, error)
type FSM ¶
type FSM interface {
// Apply log is invoked once a log entry is committed.
// It returns a value which will be made available in the
// ApplyFuture returned by Raft.Apply method if that
// method was called on the same Raft node as the FSM.
Apply(*Log) interface{}
// Snapshot is used to support log compaction. This call should
// return an FSMSnapshot which can be used to save a point-in-time
// snapshot of the FSM. Apply and Snapshot are not called in multiple
// threads, but Apply will be called concurrently with Persist. This means
// the FSM should be implemented in a fashion that allows for concurrent
// updates while a snapshot is happening.
Snapshot() (FSMSnapshot, error)
// Restore is used to restore an FSM from a snapshot. It is not called
// concurrently with any other command. The FSM must discard all previous
// state.
Restore(io.ReadCloser) error
}
FSM provides an interface that can be implemented by clients to make use of the replicated log.
type FSMSnapshot ¶
type FSMSnapshot interface {
// Persist should dump all necessary state to the WriteCloser 'sink',
// and call sink.Close() when finished or call sink.Cancel() on error.
Persist(sink SnapshotSink) error
// Release is invoked when we are finished with the snapshot.
Release()
}
FSMSnapshot is returned by an FSM in response to a Snapshot. It must be safe to invoke FSMSnapshot methods with concurrent calls to Apply.
type FileSnapshotSink ¶
type FileSnapshotSink struct {
// contains filtered or unexported fields
}
FileSnapshotSink implements SnapshotSink with a file.
