requeue

package module
v0.0.0-...-c0583b9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 17, 2020 License: Apache-2.0 Imports: 22 Imported by: 0

README

nats-requeue

GoDoc CircleCI License

This is a service that persists NATS messages to disk and retries sending them at a deferred time.

THIS IS STILL BEING IMPLEMENTED. IT IS NOT READY FOR USE.

Getting Started

Installing
go get github.com/nickpoorman/nats-requeue

This will retrieve the library and install the retry-queue command line utility into your $GOBIN path.

Configuration

Badger recommends running with GOMAXPROCS=128.

I would recommend setting your soft ulimit to 65535. Google how to do it on your OS.

AWS ECS

Uses

DLQ (dead letter queue)

TODO

Disk-backed Buffer

TODO

How Requeue Works

All queue meta information is kept in memory and synced to disk.

Checkpointing

In memory meta for a queue consists of a Checkpoint. The checkpoint is used for seeking to the a key in Badger. This allows us to iterate directly between the events we are looking for in a time range.

Contributing

This project uses Flatbuffers to serialize messages. To build the protocol/*.fbs files you must have flatc installed and then run the following.

make protocol

Thanks

  • NATS for an awesome distributed messaging system.
  • This project uses dgraph-io/badger for storing the messages on disk.

Documentation

Overview

Package requeue implements a durable service that persists [NATS](https://github.com/nats-io) messages to disk and retries sending them at a deferred time.

Index

Constants

View Source
const (
	// DefaultNatsServers is the default nats server URLs (separated by comma).
	DefaultNatsServers = nats.DefaultURL

	// DefaultNatsClientName is the default name to assign to the NATS client
	// connection.
	DefaultNatsClientName = "requeue-nats"

	// DefaultNatsRetryOnFailure is true by default so that requeue will attempt
	// to automatically reconnect to nats on a failure.
	DefaultNatsRetryOnFailure = true

	// DefaultNatsSubject is the deafult subject requeue will subscribe to for
	// messages. By default `requeue.>` will match
	// `requeue.foo`, `requeue.foo.bar`, and `requeue.foo.bar.baz`.
	// ">" matches any length of the tail of a subject, and can only be the last token
	// E.g. 'foo.>' will match 'foo.bar', 'foo.bar.baz', 'foo.foo.bar.bax.22'.
	DefaultNatsSubject = "requeue.>"

	// DefaultNatsQueueName is the default queue to subscribe to. Messages from
	// the queue will be distributed amongst the the subscribers of the queue.
	DefaultNatsQueueName = "requeue-workers"

	DefaultNumConcurrentBatchTransactions = 4
)

Variables

This section is empty.

Functions

func RetryRequest

func RetryRequest(nc *nats.Conn, subject string, payload []byte, timeout time.Duration, times int) (*nats.Msg, error)

Types

type Conn

type Conn struct {
	Opts Options
	// contains filtered or unexported fields
}

func Connect

func Connect(options ...Option) (*Conn, error)

func NewConn

func NewConn(o Options) *Conn

func (*Conn) Close

func (c *Conn) Close()

func (*Conn) HasBeenClosed

func (c *Conn) HasBeenClosed() <-chan struct{}

func (*Conn) NATSClosedHandler

func (c *Conn) NATSClosedHandler(nc *nats.Conn)

func (*Conn) NATSDisconnectErrHandler

func (c *Conn) NATSDisconnectErrHandler(nc *nats.Conn, err error)

func (*Conn) NATSErrorHandler

func (c *Conn) NATSErrorHandler(con *nats.Conn, sub *nats.Subscription, natsErr error)

func (*Conn) NATSReconnectHandler

func (c *Conn) NATSReconnectHandler(nc *nats.Conn)

type Option

type Option func(*Options) error

Option is a function on the options to connect a Service.

func BadgerWriteMsgErr

func BadgerWriteMsgErr(cb func(*nats.Msg, error)) Option

BadgerWriteMsgErr sets the callback to be triggered when there is an error writing a message to Badger.

func ConnectContext

func ConnectContext(ctx context.Context) Option

ConnectContext sets the context to be used for connect.

func DataDir

func DataDir(path string) Option

DataDir is the directory where data will be stored. An instance of the data store will be created in this directory. This directory is also looped over by the reaper looking for zombie instances that need to be merged into the main data instance created at initialization.

func NATSConnectionError

func NATSConnectionError(connErrCb func(*Conn, error)) Option

NATSConnectionError is a callback when the connection is unable to be established.

func NATSOptions

func NATSOptions(natsOptions []nats.Option) Option

NATSOptions are options that will be provided to NATS upon establishing a connection.

func NATSQueueName

func NATSQueueName(natsQueueName string) Option

NatsQueueName is the queue to subscribe to. Messages from the queue will be distributed amongst the the subscribers of the queue.

func NATSServers

func NATSServers(natsServers string) Option

NATSServers is the nats server URLs (separated by comma).

func NATSSubject

func NATSSubject(natsSubject string) Option

NATSSubject is the subject requeue will subscribe to for messages. By default `requeue.>` will match `requeue.foo`, `requeue.foo.bar`, and `requeue.foo.bar.baz`. ">" matches any length of the tail of a subject, and can only be the last token E.g. 'foo.>' will match 'foo.bar', 'foo.bar.baz', 'foo.foo.bar.bax.22'.

func ReaperOptions

func ReaperOptions(options ...reaper.Option) Option

ReaperOpts sets the options for the instance reaper.

func RepublisherOptions

func RepublisherOptions(options ...republisher.Option) Option

RepublisherOpts sets the options for the republisher.

type Options

type Options struct {
	// contains filtered or unexported fields
}

TODO: These options should probably be lower case so they are private. Options can be used to create a customized Service connections.

func GetDefaultOptions

func GetDefaultOptions() Options

func (Options) Connect

func (o Options) Connect() (*Conn, error)

Connect will attempt to connect to a NATS server with multiple options and setup connections to the disk database.

Directories

Path Synopsis
cmd
requeue command
internal
key