leafMQ

package module
v0.0.0-...-0829b1e Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 23, 2022 | License: MIT | Imports: 5 | Imported by: 6

README

Message Queue

Message Queue Interface

MessageQueue interface{
    Publish(ctx context.Context, topic string, msg Message) error	
    Use(middlewareFunc ...MiddlewareFunc)
    Listen()
    Subscribe(topic string, dispatcher Dispatcher) error
    Ping(ctx context.Context) error
    Close() error  
    Publisher() Publisher
    Consumer() Consumer
}

Documentation

Index

Constants

This section is empty.

Variables

View Source
// MissingHandler is a package-level error value whose message is
// "error missing handler".
// NOTE(review): the code that returns it is not visible here; presumably it
// is reported when a message is dispatched and no handler is registered —
// confirm against the dispatcher implementations.
// NOTE(review): Go convention names sentinel errors ErrXxx; the name is kept
// because renaming an exported variable would break importers.
var MissingHandler = fmt.Errorf("error missing handler")

Functions

This section is empty.

Types

type Consumer

// Consumer groups the methods Ping, Use, Listen, Subscribe and Close.
// It is embedded in MessageQueue and returned by MessageQueue.Consumer.
// NOTE(review): presumably this is the subscribing half of a queue client —
// confirm against an implementation.
type Consumer interface {
	// Ping takes a context and returns an error.
	// NOTE(review): presumably a connectivity check against the backend — confirm.
	Ping(ctx context.Context) error
	// Use accepts zero or more MiddlewareFunc values.
	// NOTE(review): presumably registers them to run around message handling — confirm.
	Use(middlewareFunc ...MiddlewareFunc)
	// Listen takes no arguments and returns nothing, so any failure is not
	// reported through its signature.
	// NOTE(review): whether it blocks is not visible here — confirm.
	Listen()
	// Subscribe takes a topic name and a Dispatcher and returns an error.
	// NOTE(review): presumably messages arriving on topic are handed to
	// dispatcher — confirm.
	Subscribe(topic string, dispatcher Dispatcher) error
	// Close returns an error.
	// NOTE(review): presumably releases the consumer's resources — confirm.
	Close() error
}

type DispatchDTO

// DispatchDTO is the value passed to Dispatcher.Dispatch and to
// MiddlewareHandlerFunc. It bundles a message with the metadata listed below.
type DispatchDTO struct {
	// Type is a DispatchType; the declared constants are Handle and Error.
	// NOTE(review): presumably selects the handler vs. error-handler path — confirm.
	Type DispatchType
	// Source is a free-form string.
	// NOTE(review): meaning not visible here; presumably the topic or origin — confirm.
	Source string
	// RequestID is a string identifier.
	// NOTE(review): presumably used for request correlation/logging — confirm.
	RequestID string
	// MsgType is a string.
	// NOTE(review): presumably matched against the msgType values given to
	// Dispatcher.AddHandler — confirm.
	MsgType string
	// Msg is the Message being dispatched.
	Msg Message
	// Log is a leafLogger.Logger carried alongside the message.
	Log leafLogger.Logger
	// Err is an error value.
	// NOTE(review): presumably only meaningful when Type is Error — confirm.
	Err error
}

type DispatchType

// DispatchType is a string-based type used for the Type field of DispatchDTO.
type DispatchType string
const (
	// Handle is the DispatchType with the string value "handle".
	// NOTE(review): presumably marks a normal message-handling dispatch — confirm.
	Handle DispatchType = "handle"
	// Error is the DispatchType with the string value "error".
	// NOTE(review): presumably marks a dispatch to an error handler — confirm.
	Error  DispatchType = "error"
)

type Dispatcher

// Dispatcher is the interface passed to Subscribe and returned by
// NewMultiEventDispatcher and NewSingleEventDispatcher.
type Dispatcher interface {
	// AddHandler takes a HandlerFunc, an ErrorHandlerFunc and zero or more
	// message-type strings.
	// NOTE(review): presumably registers the pair for the given msgType
	// values; behavior when msgType is empty is not visible here — confirm.
	AddHandler(handler HandlerFunc, errorHandler ErrorHandlerFunc, msgType ...string)
	// Use accepts zero or more MiddlewareFunc values.
	// NOTE(review): presumably registers dispatcher-level middleware — confirm.
	Use(middlewareFunc ...MiddlewareFunc)
	// Dispatch takes a DispatchDTO plus zero or more MiddlewareFunc values
	// and returns an error.
	// NOTE(review): how the per-call middleware combines with middleware
	// added through Use is not visible here — confirm.
	Dispatch(dto DispatchDTO, middlewareFunc ...MiddlewareFunc) error
}

func NewMultiEventDispatcher

func NewMultiEventDispatcher() Dispatcher

func NewSingleEventDispatcher

func NewSingleEventDispatcher() Dispatcher

type ErrorHandlerFunc

// ErrorHandlerFunc is the function type accepted as the errorHandler argument
// of Dispatcher.AddHandler. It receives a context, a Message and an error, and
// returns nothing. Its parameters match those of Job.OnError.
type ErrorHandlerFunc func(ctx context.Context, msg Message, err error)

type HandlerFunc

// HandlerFunc is the function type accepted as the handler argument of
// Dispatcher.AddHandler. It receives a context and a Message and returns an
// error. Its signature matches that of Job.Process.
type HandlerFunc func(ctx context.Context, msg Message) error

type Job

// Job pairs a processing method with an error callback. Process has the same
// parameters and result as HandlerFunc, and OnError has the same parameters as
// ErrorHandlerFunc.
// NOTE(review): presumably a Job's methods are meant to be passed to
// Dispatcher.AddHandler — confirm.
type Job interface {
	// Process takes a context and a Message and returns an error.
	Process(ctx context.Context, msg Message) error
	// OnError takes a context, a Message and an error, and returns nothing.
	// NOTE(review): presumably called with the error returned by Process — confirm.
	OnError(ctx context.Context, msg Message, err error)
}

type Message

// Message is the payload type accepted by Publish and passed to handlers.
// Besides the exported fields it has unexported state. The type declares
// GetID/SetID and custom MarshalJSON/UnmarshalJSON methods, so its JSON form
// may not be determined by the struct tags alone.
// NOTE(review): presumably the ID handled by GetID/SetID lives in an
// unexported field — confirm.
type Message struct {
	// Ordering is a string tagged "ordering" for JSON.
	// NOTE(review): presumably an ordering/partition key — confirm.
	Ordering   string            `json:"ordering"`
	// Data is the raw byte payload, tagged "data" for JSON.
	Data       []byte            `json:"data"`
	// Attributes is a string-to-string map tagged "attributes" for JSON.
	Attributes map[string]string `json:"attributes"`
	// contains filtered or unexported fields
}

func (Message) GetID

func (m Message) GetID() string

func (Message) MarshalJSON

func (m Message) MarshalJSON() ([]byte, error)

func (*Message) SetID

func (m *Message) SetID(ID string)

func (*Message) UnmarshalJSON

func (m *Message) UnmarshalJSON(data []byte) error

type MessageQueue

// MessageQueue combines the Publisher and Consumer interfaces by embedding
// both, and adds two accessor methods. NoopQueue returns a value of this type.
//
// Publisher and Consumer both declare Ping and Close with identical
// signatures; embedding interfaces with overlapping method sets is permitted
// since Go 1.14.
type MessageQueue interface {
	Publisher
	Consumer
	// Publisher returns a Publisher.
	// NOTE(review): presumably the publishing view of this same queue — confirm.
	Publisher() Publisher
	// Consumer returns a Consumer.
	// NOTE(review): presumably the consuming view of this same queue — confirm.
	Consumer() Consumer
}

func NoopQueue

func NoopQueue() MessageQueue

type MiddlewareHandlerFunc

// MiddlewareHandlerFunc is a function type that receives a context and a
// DispatchDTO and returns an error.
// NOTE(review): MiddlewareFunc, the type taken by Use and Dispatch, is not
// declared in this view; presumably it wraps a MiddlewareHandlerFunc — confirm.
type MiddlewareHandlerFunc func(ctx context.Context, dto DispatchDTO) error

type Publisher

// Publisher groups the methods Ping, Publish and Close. It is embedded in
// MessageQueue and returned by MessageQueue.Publisher.
type Publisher interface {
	// Ping takes a context and returns an error.
	// NOTE(review): presumably a connectivity check against the backend — confirm.
	Ping(ctx context.Context) error
	// Publish takes a context, a topic name and a Message, and returns an error.
	// NOTE(review): delivery guarantees are not visible here — confirm.
	Publish(ctx context.Context, topic string, msg Message) error
	// Close returns an error.
	// NOTE(review): presumably releases the publisher's resources — confirm.
	Close() error
}