lucidpubsub

package module
v0.0.0-...-91f8765 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 22, 2018 License: MIT Imports: 10 Imported by: 0

README

Lucid PubSub

Google PubSub Ordering Queue in Go.

Usage

  1. Add an attribute in pubsub for the sequential index of the published event
  2. Receive in a loop using a context.WithTimeout. A timeout of 150-350ms seems to work best
  3. Iterate over the queue to dequeue the payloads in order
handler := NewMemoryQueue(0)
cctx, cancel := context.WithTimeout(ctx, time.Duration(200*time.Millisecond))
err = sub.Receive(cctx, func(ctx context.Context, m *pubsub.Message) {
  seq, err := strconv.ParseInt(m.Attributes["sequence"], 10, 64)
  if err != nil {
    m.Nack()
    cancel()
  }

  if err := handler.Enqueue(seq, m.Data); err != nil {
    m.Nack()
    cancel()
  }

  m.Ack()
})
  
it := handler.NewIterator()
for it.HasNext() {
  bytes := it.Next()
  //take a bite out of the ordered []bytes!
}

Documentation

Index

Constants

View Source
const (
	Data_Prefix byte = iota
	Sequence_Prefix
	Other_Prefix
)

Variables

View Source
var (
	CurrSeqKey = []byte{Data_Prefix, 0}
)

Functions

This section is empty.

Types

type BadgerPayloadHandler

type BadgerPayloadHandler func(txn *badger.Txn, data []byte) error

type BadgerQueue

type BadgerQueue struct {
	// contains filtered or unexported fields
}

func NewBadgerQueue

func NewBadgerQueue(projectID, subscriptionID, storageDir string, listenWindow time.Duration) (*BadgerQueue, error)

func (*BadgerQueue) Close

func (e *BadgerQueue) Close() error

func (*BadgerQueue) Process

func (e *BadgerQueue) Process(handler BadgerPayloadHandler) error

func (*BadgerQueue) Receive

func (e *BadgerQueue) Receive(ctx context.Context, handler BadgerPayloadHandler) error

type MemoryQueue

type MemoryQueue struct {
	// contains filtered or unexported fields
}

func NewMemoryQueue

func NewMemoryQueue(seedSequence int64) *MemoryQueue

NewMemoryQueue creates a new MemoryQueue.

func (*MemoryQueue) Dump

func (m *MemoryQueue) Dump()

Dump dumps debug info.

func (*MemoryQueue) Enqueue

func (m *MemoryQueue) Enqueue(seq int64, data []byte) error

Enqueue enqueues a message for ordering.

func (*MemoryQueue) LatestAck

func (m *MemoryQueue) LatestAck() int64

LatestAck gets the latest item acked.

func (*MemoryQueue) NewIterator

func (m *MemoryQueue) NewIterator() *iterator

NewIterator creates a new iterator over the items in the queue.

Directories

Path Synopsis