Documentation ¶
Index ¶
- func GenClientID() string
- func RegisterPublisher(scheme string, factory PublisherFactory)
- func RegisterSubscriber(scheme string, factory SubscriberFactory)
- type Handler
- type HandlerFunc
- type InMemPubTopic
- type InMemPublisher
- type InMemSubTopic
- type InMemSubscriber
- type PubMessage
- type PubTopic
- type Publisher
- type PublisherFactory
- type RawSubMessage
- type StartPosition
- type SubMessage
- type SubOption
- type SubOptions
- type SubTopic
- type Subscriber
- type SubscriberFactory
- type Subscription
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func GenClientID ¶
func GenClientID() string
GenClientID generates a random client ID. The result is guaranteed to start with "bps-" and to contain only characters from [0-9A-Za-z-].
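The two guarantees can be checked mechanically. The following is a minimal sketch, assuming a hypothetical helper isValidClientID that is not part of the package and that encodes only the documented prefix and character set:

```go
package main

import (
	"fmt"
	"regexp"
)

// clientIDPattern encodes the two documented guarantees: a "bps-" prefix
// followed only by characters from [0-9A-Za-z-].
var clientIDPattern = regexp.MustCompile(`^bps-[0-9A-Za-z-]*$`)

// isValidClientID is a hypothetical helper, not part of the package.
func isValidClientID(id string) bool {
	return clientIDPattern.MatchString(id)
}

func main() {
	fmt.Println(isValidClientID("bps-Ab3-xyz")) // true
	fmt.Println(isValidClientID("client-1"))    // false
}
```

A check like this may be useful in tests for code that forwards generated IDs to a broker with stricter naming rules.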
func RegisterPublisher ¶
func RegisterPublisher(scheme string, factory PublisherFactory)
RegisterPublisher registers a new protocol with a scheme and a corresponding PublisherFactory.
func RegisterSubscriber ¶
func RegisterSubscriber(scheme string, factory SubscriberFactory)
RegisterSubscriber registers a new protocol with a scheme and a corresponding SubscriberFactory.
Types ¶
type Handler ¶
type Handler interface {
	Handle(SubMessage)
}
Handler defines a message handler. Consuming can be stopped by returning bps.Done.
func SafeHandler ¶ added in v0.1.0
SafeHandler wraps a handler with a mutex to synchronize access. It is intended only for subscriber implementations that need it and should not be used by consumers of the library.
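Wrapping a handler with a mutex can be pictured as follows. This is a sketch under stated assumptions, not the package's implementation: message, handler, raw, counter, lockedHandler and deliver are all local, hypothetical stand-ins.

```go
package main

import (
	"fmt"
	"sync"
)

// message and handler are local stand-ins for SubMessage and Handler.
type message interface{ Data() []byte }

type handler interface{ Handle(message) }

type raw []byte

func (m raw) Data() []byte { return m }

// counter is a handler that is not safe for concurrent use on its own.
type counter struct{ n int }

func (c *counter) Handle(message) { c.n++ }

// lockedHandler serialises calls to the wrapped handler with a mutex.
type lockedHandler struct {
	mu   sync.Mutex
	next handler
}

func (h *lockedHandler) Handle(m message) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.next.Handle(m)
}

// deliver calls h from n goroutines at once and waits for all of them.
func deliver(h handler, n int) {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			h.Handle(raw("x"))
		}()
	}
	wg.Wait()
}

func main() {
	c := &counter{}
	deliver(&lockedHandler{next: c}, 100)
	fmt.Println(c.n) // 100
}
```

Without the wrapper, the concurrent increments in counter would race; with it, every call is counted.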
type HandlerFunc ¶
type HandlerFunc func(SubMessage)
HandlerFunc is a func-based handler adapter.
func (HandlerFunc) Handle ¶
func (f HandlerFunc) Handle(msg SubMessage)
Handle handles a single message.
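The func-based adapter follows the same shape as http.HandlerFunc in the standard library: a function type whose method calls the function itself. A minimal sketch, using local stand-in types (message, handler, handlerFunc, raw) and a hypothetical dispatch helper rather than the package's own:

```go
package main

import "fmt"

// message and handler are local stand-ins for SubMessage and Handler.
type message interface{ Data() []byte }

type handler interface{ Handle(message) }

// handlerFunc mirrors the adapter idea: a function type with a Handle
// method that simply calls the function.
type handlerFunc func(message)

func (f handlerFunc) Handle(m message) { f(m) }

type raw []byte

func (m raw) Data() []byte { return m }

// dispatch is a hypothetical helper that passes each message to h in order.
func dispatch(h handler, msgs ...message) {
	for _, m := range msgs {
		h.Handle(m)
	}
}

func main() {
	var seen []string
	h := handlerFunc(func(m message) {
		seen = append(seen, string(m.Data()))
	})
	dispatch(h, raw("a"), raw("b"))
	fmt.Println(seen) // [a b]
}
```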
type InMemPubTopic ¶
type InMemPubTopic struct {
	// contains filtered or unexported fields
}
InMemPubTopic is an in-memory implementation of PubTopic. It is useful for tests.
func (*InMemPubTopic) Messages ¶
func (t *InMemPubTopic) Messages() []*PubMessage
Messages returns published messages.
func (*InMemPubTopic) Publish ¶
func (t *InMemPubTopic) Publish(_ context.Context, msg *PubMessage) error
Publish implements PubTopic.
type InMemPublisher ¶
type InMemPublisher struct {
	// contains filtered or unexported fields
}
InMemPublisher is an in-memory publisher implementation which can be used for tests.
func NewInMemPublisher ¶
func NewInMemPublisher() *InMemPublisher
NewInMemPublisher returns an initialised publisher.
func (*InMemPublisher) Topic ¶
func (p *InMemPublisher) Topic(name string) PubTopic
Topic implements the Publisher interface. It auto-provisions the topic if it does not exist yet.
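Auto-provisioning amounts to a create-on-first-use lookup guarded by a lock. The sketch below illustrates the idea with simplified, hypothetical stand-ins (memTopic, memPublisher); it is not the package's code.

```go
package main

import (
	"fmt"
	"sync"
)

// memTopic is a simplified stand-in for an in-memory publisher topic.
type memTopic struct {
	mu   sync.Mutex
	msgs [][]byte
}

func (t *memTopic) publish(data []byte) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.msgs = append(t.msgs, data)
}

func (t *memTopic) count() int {
	t.mu.Lock()
	defer t.mu.Unlock()
	return len(t.msgs)
}

// memPublisher is a simplified stand-in for an in-memory publisher.
type memPublisher struct {
	mu     sync.Mutex
	topics map[string]*memTopic
}

// topic returns the handle for name, creating it on first use.
func (p *memPublisher) topic(name string) *memTopic {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.topics == nil {
		p.topics = make(map[string]*memTopic)
	}
	t, ok := p.topics[name]
	if !ok {
		t = &memTopic{}
		p.topics[name] = t
	}
	return t
}

func main() {
	p := &memPublisher{}
	p.topic("a").publish([]byte("1"))
	p.topic("a").publish([]byte("2"))
	fmt.Println(p.topic("a").count(), p.topic("b").count()) // 2 0
}
```

Because the same handle is returned for the same name, messages published through separate topic("a") calls accumulate in one place.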
type InMemSubTopic ¶
type InMemSubTopic struct {
	// contains filtered or unexported fields
}
InMemSubTopic is a subscriber topic handle that consumes messages from seeded data. It is mainly useful for testing.
func NewInMemSubTopic ¶
func NewInMemSubTopic(msgs []SubMessage) *InMemSubTopic
NewInMemSubTopic returns a new in-memory subscriber topic handle seeded with msgs.
func (*InMemSubTopic) Subscribe ¶
func (s *InMemSubTopic) Subscribe(handler Handler, _ ...SubOption) (Subscription, error)
Subscribe subscribes to in-memory messages by topic. It starts handling from the first (oldest) available message.
type InMemSubscriber ¶
type InMemSubscriber struct {
	// contains filtered or unexported fields
}
InMemSubscriber is a subscriber that consumes messages from seeded data. It is mainly useful for testing.
func NewInMemSubscriber ¶
func NewInMemSubscriber(messagesByTopic map[string][]SubMessage) *InMemSubscriber
NewInMemSubscriber returns a new subscriber that consumes messages from seeded data.
func (*InMemSubscriber) Close ¶
func (s *InMemSubscriber) Close() error
Close forgets seeded messages.
func (*InMemSubscriber) Replace ¶ added in v0.2.0
func (s *InMemSubscriber) Replace(messagesByTopic map[string][]SubMessage)
Replace replaces the seeded messages. It does not affect topic handles that were already obtained: they keep returning the messages that were available before Replace.
func (*InMemSubscriber) Topic ¶
func (s *InMemSubscriber) Topic(topic string) SubTopic
Topic returns a named topic handle. Seeded messages are copied for each topic handle.
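The copy-per-handle behaviour explains why Replace leaves existing handles untouched. A minimal sketch of those semantics, using a hypothetical memSubscriber stand-in with string payloads and no locking (so it is not safe for concurrent use):

```go
package main

import "fmt"

// memSubscriber is a simplified stand-in for an in-memory subscriber.
type memSubscriber struct {
	byTopic map[string][]string
}

// topic returns a private copy of the messages seeded for name.
func (s *memSubscriber) topic(name string) []string {
	return append([]string(nil), s.byTopic[name]...)
}

// replace swaps the seeded data; copies handed out earlier are untouched.
func (s *memSubscriber) replace(byTopic map[string][]string) {
	s.byTopic = byTopic
}

func main() {
	s := &memSubscriber{byTopic: map[string][]string{"foo": {"foo1", "foo2"}}}
	before := s.topic("foo")
	s.replace(map[string][]string{"foo": {"foo3"}})
	after := s.topic("foo")
	fmt.Println(before, after) // [foo1 foo2] [foo3]
}
```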
type PubMessage ¶
type PubMessage struct {
	// ID is an optional message identifier.
	// Some implementations do not support it and ignore it;
	// others use it only to compute the message's partition.
	ID string `json:"id,omitempty"`
	// Data is the message payload.
	Data []byte `json:"data,omitempty"`
	// Attributes contains optional key-value labels.
	// Some implementations do not support them and ignore them.
	Attributes map[string]string `json:"attributes,omitempty"`
}
PubMessage represents a single message for publishing.
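The JSON tags determine how a message looks when serialised with encoding/json: empty fields are omitted, and the []byte payload is base64-encoded. The sketch below uses a local mirror of the struct (pubMessage) and a hypothetical encode helper so that it runs without the package:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// pubMessage mirrors the fields and JSON tags of PubMessage.
type pubMessage struct {
	ID         string            `json:"id,omitempty"`
	Data       []byte            `json:"data,omitempty"`
	Attributes map[string]string `json:"attributes,omitempty"`
}

// encode is a hypothetical helper that returns the JSON form of m.
func encode(m pubMessage) (string, error) {
	b, err := json.Marshal(m)
	return string(b), err
}

func main() {
	s, err := encode(pubMessage{Data: []byte("message-1")})
	if err != nil {
		panic(err)
	}
	// ID and Attributes are empty and therefore omitted;
	// Data is emitted as a base64 string.
	fmt.Println(s) // {"data":"bWVzc2FnZS0x"}
}
```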
type PubTopic ¶
type PubTopic interface {
	// Publish publishes a message to the topic.
	Publish(context.Context, *PubMessage) error
}
PubTopic is a publisher handle to a topic.
type Publisher ¶
type Publisher interface {
	// Topic returns a topic handle by name.
	Topic(name string) PubTopic
	// Close closes the producer connection.
	Close() error
}
Publisher defines the main publisher interface.
Example ¶
package main

import (
	"context"
	"fmt"

	"github.com/bsm/bps"
)

func main() {
	ctx := context.Background()
	pub := bps.NewInMemPublisher()
	defer pub.Close()

	topicA := pub.Topic("topic-a")
	topicB := pub.Topic("topic-b")

	topicA.Publish(ctx, &bps.PubMessage{
		Data: []byte("message-1"),
	})
	topicB.Publish(ctx, &bps.PubMessage{
		Data: []byte("message-2"),
	})
	topicA.Publish(ctx, &bps.PubMessage{
		Data: []byte("message-2"),
	})

	fmt.Println(len(topicA.(*bps.InMemPubTopic).Messages()))
	fmt.Println(len(topicB.(*bps.InMemPubTopic).Messages()))
}

Output:
2
1
type PublisherFactory ¶
PublisherFactory constructs a publisher from a URL.
type RawSubMessage ¶
type RawSubMessage []byte
RawSubMessage is an adapter that lets a raw byte slice behave as a SubMessage.
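An adapter of this kind needs only a Data method that returns the underlying bytes. The sketch below shows the idea with local stand-ins (subMessage, rawSubMessage) and a hypothetical seed helper for building test fixtures; none of these names belong to the package.

```go
package main

import "fmt"

// subMessage is a local stand-in for SubMessage.
type subMessage interface{ Data() []byte }

// rawSubMessage is a local stand-in for a byte-slice message adapter.
type rawSubMessage []byte

// Data returns the bytes unchanged.
func (m rawSubMessage) Data() []byte { return m }

// seed is a hypothetical helper that converts string payloads into messages.
func seed(payloads ...string) []subMessage {
	msgs := make([]subMessage, 0, len(payloads))
	for _, p := range payloads {
		msgs = append(msgs, rawSubMessage(p))
	}
	return msgs
}

func main() {
	for _, m := range seed("foo1", "foo2") {
		fmt.Printf("%s\n", m.Data())
	}
}
```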
type StartPosition ¶
type StartPosition string
StartPosition defines the position from which to start consuming messages.
const (
	// PositionNewest starts consuming from the newest available messages
	// (those published AFTER subscribing).
	PositionNewest StartPosition = "newest"
	// PositionOldest starts consuming from the oldest available messages
	// (including those published BEFORE subscribing).
	PositionOldest StartPosition = "oldest"
)
StartPosition options.
type SubMessage ¶
type SubMessage interface {
	// Data returns raw (serialized) message data.
	Data() []byte
}
SubMessage defines the details of a subscription message.
type SubOption ¶
type SubOption func(*SubOptions)
SubOption defines a single subscription option.
func IgnoreSubscriptionErrors ¶
func IgnoreSubscriptionErrors() SubOption
IgnoreSubscriptionErrors configures the subscription to silently ignore errors.
func StartAt ¶
func StartAt(pos StartPosition) SubOption
StartAt configures subscription start position.
func WithErrorHandler ¶
WithErrorHandler configures subscription error handler.
type SubOptions ¶
type SubOptions struct {
	// StartAt defines starting position to consume messages.
	// May not be supported by some implementations.
	// Default: implementation-specific (PositionNewest is recommended).
	StartAt StartPosition
	// ErrorHandler is a subscription error handler (system/implementation-specific errors).
	// Default: log errors to STDERR.
	ErrorHandler func(error)
}
SubOptions holds subscription options.
func (*SubOptions) Apply ¶
func (o *SubOptions) Apply(options []SubOption) *SubOptions
Apply configures the SubOptions struct by applying each SubOption in order.
It is meant to be used by pubsub implementations like this:
func (s *SubImpl) Subscribe(..., options ...bps.SubOption) error {
	opts := (&bps.SubOptions{
		// implementation-specific defaults
	}).Apply(options)
	...
}
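The defaults-then-options flow above is the functional options pattern. Below is a self-contained sketch of it with local, lower-case stand-ins for the package types; resolve is a hypothetical helper, and the withErrorHandler parameter type is an assumption inferred from the ErrorHandler field.

```go
package main

import "fmt"

type startPosition string

const (
	positionNewest startPosition = "newest"
	positionOldest startPosition = "oldest"
)

// subOptions and subOption are local stand-ins for SubOptions and SubOption.
type subOptions struct {
	StartAt      startPosition
	ErrorHandler func(error)
}

type subOption func(*subOptions)

func startAt(pos startPosition) subOption {
	return func(o *subOptions) { o.StartAt = pos }
}

// withErrorHandler takes func(error) by assumption (matches the field type).
func withErrorHandler(h func(error)) subOption {
	return func(o *subOptions) { o.ErrorHandler = h }
}

// apply runs each option against o in order and returns o for chaining.
func (o *subOptions) apply(options []subOption) *subOptions {
	for _, opt := range options {
		opt(o)
	}
	return o
}

// resolve starts from implementation defaults, then applies caller options.
func resolve(options ...subOption) *subOptions {
	return (&subOptions{StartAt: positionNewest}).apply(options)
}

func main() {
	fmt.Println(resolve().StartAt)                        // newest
	fmt.Println(resolve(startAt(positionOldest)).StartAt) // oldest
}
```

Because options are applied in order, a later option overrides an earlier one that sets the same field.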
type SubTopic ¶
type SubTopic interface {
	// Subscribe subscribes to topic messages and handles them in the background
	// until an error occurs or bps.Done is returned.
	// The handler is guaranteed to be called synchronously (messages are handled one by one).
	Subscribe(handler Handler, opts ...SubOption) (Subscription, error)
}
SubTopic defines a subscriber topic handle.
type Subscriber ¶
type Subscriber interface {
	// Topic returns a subscriber topic handle.
	Topic(name string) SubTopic
	// Close closes the subscriber connection.
	Close() error
}
Subscriber defines the main subscriber interface.
Example ¶
package main

import (
	"fmt"
	"time"

	"github.com/bsm/bps"
)

func main() {
	subscriber := bps.NewInMemSubscriber(
		map[string][]bps.SubMessage{
			"foo": []bps.SubMessage{
				bps.RawSubMessage("foo1"),
				bps.RawSubMessage("foo2"),
			},
		},
	)
	defer subscriber.Close()

	subscription, err := subscriber.Topic("foo").Subscribe(
		bps.HandlerFunc(func(msg bps.SubMessage) {
			fmt.Printf("%s\n", msg.Data())
		}),
	)
	if err != nil {
		panic(err.Error())
	}
	defer subscription.Close()

	time.Sleep(time.Second) // wait to receive some messages
}

Output:
foo1
foo2
func NewSubscriber ¶
func NewSubscriber(ctx context.Context, urlStr string) (Subscriber, error)
NewSubscriber initialises a subscriber from a URL.
sub, err := bps.NewSubscriber(context.TODO(), "kafka://10.0.0.1:9092,10.0.0.2:9092,10.0.0.3:9092/namespace")
type SubscriberFactory ¶
SubscriberFactory constructs a subscriber from a URL.
type Subscription ¶
type Subscription interface {
	// Close stops message handling and frees resources.
	// It is safe to be called multiple times.
	Close() error
}
Subscription defines a subscription-manager interface.
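One common way to make Close safe to call repeatedly, while messages are handled one by one in the background, is a stop channel guarded by sync.Once. The following is a sketch under those assumptions; subscription, subscribe and wait are hypothetical local names, and payloads are plain strings for brevity.

```go
package main

import (
	"fmt"
	"sync"
)

// subscription is a local stand-in for a Subscription implementation.
type subscription struct {
	stop chan struct{}
	done chan struct{}
	once sync.Once
}

// subscribe delivers msgs to handle one by one on a background goroutine.
func subscribe(msgs []string, handle func(string)) *subscription {
	s := &subscription{stop: make(chan struct{}), done: make(chan struct{})}
	go func() {
		defer close(s.done)
		for _, m := range msgs {
			select {
			case <-s.stop:
				return
			default:
				handle(m)
			}
		}
	}()
	return s
}

// Close signals the loop to stop and waits for it to exit.
// sync.Once ensures the stop channel is closed only once.
func (s *subscription) Close() error {
	s.once.Do(func() { close(s.stop) })
	<-s.done
	return nil
}

// wait blocks until all messages were handled or Close was called.
func (s *subscription) wait() { <-s.done }

func main() {
	var got []string
	sub := subscribe([]string{"foo1", "foo2"}, func(m string) {
		got = append(got, m)
	})
	sub.wait()
	fmt.Println(got) // [foo1 foo2]
	fmt.Println(sub.Close(), sub.Close())
}
```

Waiting on a done channel, as wait does here, is a deterministic alternative to sleeping when a test needs all seeded messages to be handled first.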