pubsub

package module
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 3, 2025 License: MIT Imports: 15 Imported by: 2

README ยถ

Generic Pub/Sub Library

Go Version License Go Report Card

A comprehensive, production-ready Go library that provides a unified interface for publish-subscribe messaging across multiple cloud providers and messaging services. This library abstracts the complexity of different pub/sub implementations while providing advanced features like batch publishing, health monitoring, and graceful shutdown.

๐Ÿš€ Features

Core Functionality
  • Universal Interface: Single API that works with multiple pub/sub providers
  • Single & Batch Publishing: Publish individual messages or batches for improved performance
  • Asynchronous Consumption: Non-blocking message consumption with multiple listeners
  • Message Acknowledgment: Reliable message processing with acknowledgment support
  • Health Monitoring: Built-in health checks and connectivity monitoring
  • Graceful Shutdown: Proper resource cleanup and coordinated shutdown
Supported Providers
  • โœ… PubNub: Real-time messaging with global distribution
  • โœ… Google Cloud Pub/Sub: Scalable, fully-managed messaging service
  • ๐Ÿ”„ More providers coming soon (AWS SNS/SQS, Azure Service Bus, Apache Kafka)
Advanced Features
  • Connection Management: Automatic reconnection with configurable backoff policies
  • Error Handling: Comprehensive error categorization and monitoring
  • Structured Logging: Integration with structured logging libraries
  • Metrics Collection: Detailed publishing and consumption metrics
  • Concurrent Processing: Optimized for high-throughput scenarios
  • Production Ready: Battle-tested patterns for enterprise environments

๐Ÿ“ฆ Installation

go get github.com/piyushkumar96/generic-pubsub

๐Ÿ Quick Start

PubNub Example
package main

import (
    "context"
    "log"
    
    ae "github.com/piyushkumar96/app-error"
    l "github.com/piyushkumar96/generic-logger"
    pubsub "github.com/piyushkumar96/generic-pubsub"
)

func main() {
    // Create error channel
    errorChan := make(chan *ae.AppError, 5)
    
    // Create PubNub client
    client := pubsub.NewPubNubClient(&pubsub.PubNubOptions{
        UUID:             "your-unique-client-id",
        PublishKey:       "your-publish-key",
        SubscribeKey:     "your-subscribe-key",
        PublishChannel:   "my-channel",
        SubscriptionList: []string{"my-channel"},
        IsSSLSecure:      true,
        MaxRetry:         3,
    }, l.Logger, errorChan)
    
    ctx := context.Background()
    
    // Publish a message
    message := []byte(`{"hello": "world"}`)
    txnData, err := client.Publish(ctx, message)
    if err != nil {
        log.Fatalf("Failed to publish: %v", err)
    }
    
    log.Printf("Published message ID: %s", txnData.EventID)
}
GCP Pub/Sub Example
package main

import (
    "context"
    "log"
    
    ae "github.com/piyushkumar96/app-error"
    l "github.com/piyushkumar96/generic-logger"
    pubsub "github.com/piyushkumar96/generic-pubsub"
)

func main() {
    // Create error channel
    errorChan := make(chan *ae.AppError, 5)
    
    // Create GCP client
    client, err := pubsub.NewGCPPubSubClient(context.Background(), &pubsub.GCPPubSubOptions{
        ProjectID:      "your-gcp-project",
        TopicID:        "your-topic",
        SubscriptionID: "your-subscription",
        CredsFilePath:  "/path/to/service-account.json",
    }, l.Logger, errorChan)
    if err != nil {
        log.Fatalf("Failed to create client: %v", err)
    }
    defer client.Teardown(context.Background())
    
    // Publish a message
    message := []byte(`{"hello": "world"}`)
    txnData, err := client.Publish(context.Background(), message)
    if err != nil {
        log.Fatalf("Failed to publish: %v", err)
    }
    
    log.Printf("Published message ID: %s", txnData.EventID)
}

๐Ÿ“š Comprehensive Examples

We provide detailed examples for different use cases and complexity levels:

๐Ÿ”ฐ Getting Started
๐Ÿ“– Example Features
Feature PubNub Example GCP Pub/Sub Example
Single Message Publishing โœ… โœ…
Batch Message Publishing โœ… (5 messages) โœ… (8 messages)
Multiple Listeners โœ… (2 listeners) โœ… (2 listeners)
Message Acknowledgment โœ… Message Actions โœ… Native Ack
Health Monitoring โœ… Heartbeat โœ… Topic Existence
Graceful Shutdown โœ… Signal Handling โœ… Coordinated Cleanup
Error Monitoring โœ… Categorized โœ… Production Patterns
Metrics Collection โœ… Detailed Stats โœ… Batch Analytics

๐Ÿ”ง API Reference

Core Interface
type IPubSub interface {
    // Publish sends a single message
    Publish(ctx context.Context, msg []byte) (EventTxnData, *ae.AppError)
    
    // PublishBatch sends multiple messages
    PublishBatch(ctx context.Context, msgs [][]byte) ([]EventTxnData, *ae.AppError)
    
    // Listen starts consuming messages
    Listen(ctx context.Context) *ae.AppError
    
    // ListenWithWait integrates with WaitGroup
    ListenWithWait(ctx context.Context, wg *sync.WaitGroup) *ae.AppError
    
    // AcknowledgeMessage confirms message processing
    AcknowledgeMessage(ctx context.Context, msgID string) *ae.AppError
    
    // CheckHealth verifies client connectivity
    CheckHealth(ctx context.Context) (bool, *ae.AppError)
    
    // Teardown cleans up resources
    Teardown(ctx context.Context)
}
Data Structures
EventTxnData

Contains comprehensive metadata about published messages:

type EventTxnData struct {
    EventID            string        // Unique message identifier
    SequenceNo         int           // Position in batch (for batch operations)
    IsPublished        bool          // Success status
    MessageSizeInBytes int           // Message payload size
    TimeTakenToPublish time.Duration // Publishing latency
    Timestamp          int64         // Unix timestamp
    Error              error         // Error if publishing failed
}
ConsumedMessage

Represents received messages with metadata:

type ConsumedMessage struct {
    Data []byte                    // Raw message payload
    Meta map[string]interface{}    // Provider-specific metadata
}

โš™๏ธ Configuration

PubNub Options
type PubNubOptions struct {
    UUID                    string                    // Client identifier
    PublishKey              string                    // PubNub publish key
    SubscribeKey            string                    // PubNub subscribe key
    SecretKey               string                    // Optional secret key
    IsSSLSecure             bool                      // Enable SSL/TLS
    PublishChannel          string                    // Default publish channel
    SubscriptionList        []string                  // Channels to subscribe to
    BackoffPolicy           PubnubReconnectBackoff   // Reconnection strategy
    ConnectTimeoutInSec     int                      // Connection timeout
    MaxRetry                int32                    // Max retry attempts
    CloseListenersOnExit    bool                     // Auto-close on exit
    EnableDebugMode         bool                     // Debug logging
}
GCP Pub/Sub Options
type GCPPubSubOptions struct {
    ProjectID       string // GCP project identifier
    TopicID         string // Pub/Sub topic name
    SubscriptionID  string // Subscription name
    CredsFilePath   string // Service account key path
    EnableDebugMode bool   // Debug logging
}

๐Ÿ”’ Authentication & Security

PubNub
  • API Keys: Use publish/subscribe keys from PubNub console
  • SSL/TLS: Enable IsSSLSecure for encrypted connections
  • Secret Keys: Optional additional security layer
GCP Pub/Sub
  • Service Account: Use JSON key file for authentication
  • IAM Permissions: Ensure account has pubsub.editor or appropriate roles
  • Network Security: Configure VPC and firewall rules as needed

๐Ÿšฆ Error Handling

The library provides comprehensive error categorization:

General Errors (ERR_PS_1xxx)
  • PublishEvent: Single message publishing failure
  • PublishEventBatch: Batch publishing failure
  • ContextCancelled: Operation cancelled
  • HealthCheck: Health monitoring failure
PubNub Errors (ERR_PS_2xxx)
  • EmptySubscriptionList: No channels configured
  • AlreadyListening: Client already in listening state
  • PNCliBadRequest: Invalid request parameters
  • PNCliAccessDenied: Authentication failure
GCP Pub/Sub Errors (ERR_PS_3xxx)
  • GCPPSClientInit: Client initialization failure
  • GCPPSReceiveEvent: Message receiving failure
  • GCPPSClientTeardown: Cleanup failure

๐Ÿ“Š Monitoring & Observability

Built-in Metrics
  • Publishing Metrics: Message size, latency, success rates
  • Consumption Metrics: Processing time, acknowledgment rates
  • Connection Metrics: Health status, reconnection events
  • Error Metrics: Categorized error counts and rates
Logging Integration

The library integrates with structured logging libraries:

import l "github.com/piyushkumar96/generic-logger"

// All operations include structured logging
client := pubsub.NewPubNubClient(options, l.Logger, errorChan)

๐Ÿ—๏ธ Production Best Practices

Resource Management
// Always use context with timeout
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

// Proper cleanup
defer client.Teardown(ctx)
Error Monitoring
// Monitor errors in separate goroutine
go func() {
    for err := range errorChan {
        log.Printf("Pub/Sub Error: %s (Code: %s)", err.GetMsg(), err.GetErrCode())
        // Send to monitoring system, implement retry logic, etc.
    }
}()
Graceful Shutdown
// Handle shutdown signals
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

select {
case <-sigChan:
    log.Println("Shutdown signal received")
    client.Teardown(ctx)
}

๐Ÿงช Testing

The library includes comprehensive mocks for testing:

import "github.com/piyushkumar96/generic-pubsub/mocks"

func TestMyService(t *testing.T) {
    mockPubSub := mocks.NewMockIPubSub(t)
    mockPubSub.EXPECT().Publish(mock.Anything, mock.Anything).Return(
        pubsub.EventTxnData{EventID: "test-id", IsPublished: true}, nil)
    
    // Test your service with the mock
}

๐Ÿค Contributing

We welcome contributions! Here's how you can help:

  1. Fork the Repository
  2. Create a Feature Branch: git checkout -b feature/amazing-feature
  3. Commit Changes: git commit -m 'Add amazing feature'
  4. Push to Branch: git push origin feature/amazing-feature
  5. Open a Pull Request
Development Guidelines
  • Write Tests: Ensure good test coverage for new features
  • Document Code: Add comprehensive comments and documentation
  • Follow Conventions: Use existing code style and patterns
  • Update Examples: Add examples for new features

๐Ÿ“‹ Requirements

  • Go: Version 1.21 or higher
  • Dependencies: See go.mod for complete list
Key Dependencies
  • cloud.google.com/go/pubsub - GCP Pub/Sub client
  • github.com/pubnub/go/v7 - PubNub Go SDK
  • github.com/piyushkumar96/app-error - Structured error handling
  • github.com/piyushkumar96/generic-logger - Logging interface

๐Ÿ“Š Performance Characteristics

Throughput Benchmarks
  • PubNub: Up to 1000 messages/second per client
  • GCP Pub/Sub: Up to 10,000 messages/second per client (batch mode)
Latency
  • PubNub: ~50-100ms global latency
  • GCP Pub/Sub: ~10-50ms within region
Resource Usage
  • Memory: ~10MB base + ~1KB per queued message
  • CPU: Low overhead with goroutine-based concurrency
  • Network: Efficient batching reduces connection overhead

๐Ÿ—บ๏ธ Roadmap

Upcoming Features
  • AWS SNS/SQS Support: Amazon messaging services
  • Azure Service Bus: Microsoft cloud messaging
  • Apache Kafka: Distributed streaming platform
  • Redis Streams: Redis-based messaging
  • NATS: Cloud-native messaging system
Enhancements
  • Circuit Breaker: Fault tolerance patterns
  • Rate Limiting: Built-in rate limiting capabilities
  • Message Encryption: End-to-end encryption support
  • Schema Registry: Message schema validation
  • Dead Letter Queues: Failed message handling

๐Ÿ“„ License

This project is licensed under the MIT License - see the LICENSE file for details.

๐Ÿ™ Acknowledgments

  • PubNub Team: For providing excellent real-time messaging infrastructure
  • Google Cloud: For the robust and scalable Pub/Sub service
  • Go Community: For the amazing ecosystem and tools
  • Contributors: Everyone who has contributed to making this library better

๐Ÿ“ž Support

  • Documentation: Check our comprehensive examples and API reference
  • Issues: Report bugs or request features via GitHub Issues
  • Discussions: Join community discussions for help and best practices

Made with โค๏ธ for the Go community

Documentation ยถ

Overview ยถ

Package pubsub defines error constants used throughout the pub/sub implementations. These errors are categorized by functionality and provide structured error codes for better error handling and debugging.

Package pubsub provides a generic publish-subscribe interface that can be implemented by different messaging systems like GCP PubSub, PubNub, etc. This package defines the core interfaces and data structures for pub/sub operations.

Package pubsub provides utility functions for structured logging throughout the pub/sub library. These functions standardize log formatting and ensure consistent logging patterns across different pub/sub implementations.

Index ยถ

Constants ยถ

This section is empty.

Variables ยถ

View Source
var (

	// PublishEvent indicates a failure to publish a single message
	PublishEvent = ae.GetCustomErr("ERR_PS_1001",
		"failed to publish event", false)

	// PublishEventBatch indicates a failure to publish multiple messages in batch
	PublishEventBatch = ae.GetCustomErr("ERR_PS_1002",
		"failed to publish batch events",
		false)

	// ContextCancelled indicates the operation was cancelled due to context cancellation
	ContextCancelled = ae.GetCustomErr("ERR_PS_1003",
		"context already cancelled",
		true)

	// CliConnectionTimeout indicates a client connection timeout occurred
	CliConnectionTimeout = ae.GetCustomErr("ERR_PS_1004",
		"client connection timed out",
		true)

	// CliDisconnected indicates the client has been disconnected
	CliDisconnected = ae.GetCustomErr("ERR_PS_1005",
		"client disconnected",
		false)

	// UnhandledMessageDataType indicates an unsupported message data type was received
	UnhandledMessageDataType = ae.GetCustomErr("ERR_PS_1006",
		"unhandled data type for message, skipping further processing",
		false)

	// HealthCheck indicates a health check operation failed
	HealthCheck = ae.GetCustomErr("ERR_PS_1007",
		"failed to check health",
		true)

	// ClientIsUnHealthy indicates the pub/sub client is in an unhealthy state
	ClientIsUnHealthy = ae.GetCustomErr("ERR_PS_1008",
		"pubsub client is un healthy",
		false)

	// AcknowledgeMessage indicates a message acknowledgment operation failed
	AcknowledgeMessage = ae.GetCustomErr("ERR_PS_1009",
		"failed to acknowledge message",
		false)

	// PNCliConnectionDestroyed indicates PubNub connection is being destroyed
	PNCliConnectionDestroyed = ae.GetCustomErr("ERR_PS_2001",
		"destroying pubnub connection",
		false)

	// PNCliClosingListenerChannels indicates PubNub listener channels are being closed
	PNCliClosingListenerChannels = ae.GetCustomErr("ERR_PS_2002",
		"closing pubnub listener channels",
		false)

	// EmptySubscriptionList indicates no channels were provided for subscription
	EmptySubscriptionList = ae.GetCustomErr("ERR_PS_2003",
		"empty subscription list",
		false)

	// AlreadyListening indicates the client is already in listening mode
	AlreadyListening = ae.GetCustomErr("ERR_PS_2004",
		"already listening for messages",
		false)

	// NoChannelsListeningForEvents indicates no listener channels are configured
	NoChannelsListeningForEvents = ae.GetCustomErr("ERR_PS_2005",
		"no channels listening for events",
		false)

	// PNCliBadRequest indicates PubNub returned a bad request error
	PNCliBadRequest = ae.GetCustomErr("ERR_PS_2006",
		"pubnub client bad request",
		false)

	// PNCliAccessDenied indicates PubNub access was denied (authentication/authorization error)
	PNCliAccessDenied = ae.GetCustomErr("ERR_PS_2007",
		"pubnub client access denied",
		false)

	// PNCliSubsLoopStopped indicates PubNub subscription loop has stopped unexpectedly
	PNCliSubsLoopStopped = ae.GetCustomErr("ERR_PS_2008",
		"pubnub client subscription loop stopped",
		false)

	// PNCliReqMessageCountExceeded indicates PubNub request message count limit was exceeded
	PNCliReqMessageCountExceeded = ae.GetCustomErr("ERR_PS_2009",
		"pubnub client request message count exceeded the limit",
		false)

	// PNUnknownCategory indicates PubNub sent an unknown status category
	PNUnknownCategory = ae.GetCustomErr("ERR_PS_2010",
		"pubnub client received unknown category message",
		false)

	// PNCancelledCategory indicates PubNub request was cancelled
	PNCancelledCategory = ae.GetCustomErr("ERR_PS_2011",
		"pubnub client request cancelled",
		false)

	// GCPPSClientInit indicates an error occurred while initializing the GCP Pub/Sub client
	GCPPSClientInit = ae.GetCustomErr("ERR_PS_3000",
		"error while initiating gcp pubsub client",
		false)

	// GCPPSReceiveEvent indicates an error occurred while receiving messages from GCP Pub/Sub
	GCPPSReceiveEvent = ae.GetCustomErr("ERR_PS_3001",
		"error while receiving event",
		false)

	// GCPPSClientTeardown indicates an error occurred while tearing down the GCP Pub/Sub client
	GCPPSClientTeardown = ae.GetCustomErr("ERR_PS_3002",
		"error while tearing down gcp pubsub client",
		false)
)

Functions ยถ

This section is empty.

Types ยถ

type ConsumedMessage ยถ

type ConsumedMessage struct {
	// Data is the raw message payload as received from the pub/sub system
	Data []byte

	// Meta contains additional metadata about the message
	// This may include message ID, publish time, topic info, etc.
	// The exact contents depend on the pub/sub implementation
	Meta map[string]interface{}
}

ConsumedMessage represents a message received from the pub/sub system. It contains both the message payload and associated metadata.

type EventTxnData ยถ

type EventTxnData struct {
	// EventID is the unique identifier of the published message
	// This is typically provided by the pub/sub system
	EventID string

	// SequenceNo is the index number within a batch of messages
	// Used to correlate messages in batch operations
	SequenceNo int

	// IsPublished indicates whether the message was successfully published
	IsPublished bool

	// MessageSizeInBytes is the size of the message payload in bytes
	MessageSizeInBytes int

	// TimeTakenToPublish is the duration it took to publish the message
	TimeTakenToPublish time.Duration

	// Timestamp is the Unix timestamp when the message was published
	Timestamp int64

	// Error contains any error that occurred during publishing
	// This field is set when IsPublished is false
	Error error
}

EventTxnData contains metadata about a published message. This structure provides comprehensive information about the publication transaction.

type GCPPubSubClient ยถ

type GCPPubSubClient struct {
	// contains filtered or unexported fields
}

GCPPubSubClient implements the IPubSub interface for Google Cloud Pub/Sub. It provides methods to publish and consume messages using GCP Pub/Sub service. The client handles connection management, message acknowledgment, and error handling.

func NewGCPPubSubClient ยถ

func NewGCPPubSubClient(ctx context.Context, opts *GCPPubSubOptions, logger l.ILogger, listenError chan<- *ae.AppError) (*GCPPubSubClient, *ae.AppError)

NewGCPPubSubClient creates a new GCP Pub/Sub client instance. It initializes the client with the provided options and establishes connection to GCP.

Parameters:

  • ctx: Context for the operation (used for timeout and cancellation)
  • opts: Configuration options including project ID, topic, subscription, and credentials
  • logger: Logger instance for structured logging
  • listenError: Channel to receive errors during message consumption

Returns:

  • *GCPPubSubClient: Initialized client ready for pub/sub operations
  • *ae.AppError: Error if client initialization fails

func (*GCPPubSubClient) AcknowledgeMessage ยถ

func (p *GCPPubSubClient) AcknowledgeMessage(ctx context.Context, msgID string) *ae.AppError

AcknowledgeMessage acknowledges a processed message, removing it from the subscription. The message must have been previously received and cached by this client.

Parameters:

  • ctx: Context for request cancellation and timeout
  • msgID: Unique identifier of the message to acknowledge

Returns:

  • *ae.AppError: Error if acknowledgment fails or message not found

func (*GCPPubSubClient) AddListener ยถ

func (p *GCPPubSubClient) AddListener(listener ...chan<- *ConsumedMessage)

AddListener adds one or more listener channels to receive consumed messages. Messages received from the subscription will be sent to all registered listeners.

Parameters:

  • listener: One or more channels that will receive ConsumedMessage instances

func (*GCPPubSubClient) CheckHealth ยถ

func (p *GCPPubSubClient) CheckHealth(ctx context.Context) (bool, *ae.AppError)

CheckHealth verifies that the GCP Pub/Sub client is healthy by checking topic existence. This is a simple health check that validates connectivity to GCP Pub/Sub.

Parameters:

  • ctx: Context for request cancellation and timeout

Returns:

  • bool: True if healthy, false otherwise
  • *ae.AppError: Error if health check fails

func (*GCPPubSubClient) Listen ยถ

func (p *GCPPubSubClient) Listen(ctx context.Context) *ae.AppError

Listen starts listening for messages from the GCP Pub/Sub subscription. This is a non-blocking call that starts the listening process in a goroutine.

Parameters:

  • ctx: Context for request cancellation and timeout

Returns:

  • *ae.AppError: Error if listening setup fails

func (*GCPPubSubClient) ListenWithWait ยถ

func (p *GCPPubSubClient) ListenWithWait(ctx context.Context, wg *sync.WaitGroup) *ae.AppError

ListenWithWait starts listening for messages and integrates with a WaitGroup. The WaitGroup's Done() method is called when the function exits, allowing coordinated shutdown with other goroutines.

Parameters:

  • ctx: Context for request cancellation and timeout
  • wg: WaitGroup to coordinate with (Done() called on exit)

Returns:

  • *ae.AppError: Error if listening setup fails

func (*GCPPubSubClient) Publish ยถ

func (p *GCPPubSubClient) Publish(ctx context.Context, msg []byte) (EventTxnData, *ae.AppError)

Publish sends a single message to the configured GCP Pub/Sub topic. It captures timing and size metrics for the publish operation.

Parameters:

  • ctx: Context for request cancellation and timeout
  • msg: Message payload as byte slice

Returns:

  • EventTxnData: Contains publish metadata including message ID and timing
  • *ae.AppError: Error if publishing fails

func (*GCPPubSubClient) PublishBatch ยถ

func (p *GCPPubSubClient) PublishBatch(ctx context.Context, msgs [][]byte) ([]EventTxnData, *ae.AppError)

PublishBatch publishes multiple messages concurrently to the GCP Pub/Sub topic. Each message is published in its own goroutine for optimal performance.

Parameters:

  • ctx: Context for request cancellation and timeout
  • msgs: Slice of message payloads, each as byte slice

Returns:

  • []EventTxnData: Slice containing publish metadata for each message
  • *ae.AppError: Error if any messages fail to publish

func (*GCPPubSubClient) SetListener ยถ

func (p *GCPPubSubClient) SetListener(listener []chan<- *ConsumedMessage)

SetListener replaces all existing listener channels with the provided ones. This completely overwrites the current listener list.

Parameters:

  • listener: Slice of channels that will receive ConsumedMessage instances

func (*GCPPubSubClient) Teardown ยถ

func (p *GCPPubSubClient) Teardown(ctx context.Context)

Teardown cleanly shuts down the GCP Pub/Sub client and releases all resources. This includes closing listener channels and shutting down the underlying client. Should be called when the client is no longer needed.

Parameters:

  • ctx: Context for request cancellation and timeout

type GCPPubSubOptions ยถ

type GCPPubSubOptions struct {
	// ProjectID is the GCP project identifier where the Pub/Sub resources exist
	ProjectID string

	// TopicID is the name of the Pub/Sub topic for publishing messages
	TopicID string

	// SubscriptionID is the name of the subscription for consuming messages
	SubscriptionID string

	// EnableDebugMode enables verbose logging for debugging purposes
	EnableDebugMode bool

	// CredsFilePath is the path to the GCP service account credentials JSON file
	CredsFilePath string
}

GCPPubSubOptions contains configuration options for the GCP Pub/Sub client. These options are used to establish connection and configure the client behavior.

type IPubSub ยถ

type IPubSub interface {
	// Publish sends a single message to the pub/sub system.
	// ctx: Context for request cancellation and timeout
	// msg: Message data as byte slice
	// Returns: EventTxnData containing publish metadata, and error if any
	Publish(ctx context.Context, msg []byte) (EventTxnData, *ae.AppError)

	// PublishBatch sends multiple messages to the pub/sub system.
	// ctx: Context for request cancellation and timeout
	// msgs: Slice of message data (each as byte slice)
	// Returns: Slice of EventTxnData for each message, and error if any
	PublishBatch(ctx context.Context, msgs [][]byte) ([]EventTxnData, *ae.AppError)

	// Listen starts listening for messages from the pub/sub system.
	// This is a non-blocking call that returns immediately.
	// ctx: Context for request cancellation and timeout
	// Returns: Error if failed to start listening
	Listen(ctx context.Context) *ae.AppError

	// ListenWithWait starts listening for messages and integrates with WaitGroup.
	// This allows coordinated shutdown with other goroutines.
	// ctx: Context for request cancellation and timeout
	// wg: WaitGroup to coordinate with (will call Done() when finished)
	// Returns: Error if failed to start listening
	ListenWithWait(ctx context.Context, wg *sync.WaitGroup) *ae.AppError

	// AcknowledgeMessage acknowledges that a message has been processed.
	// This removes the message from the subscription queue.
	// ctx: Context for request cancellation and timeout
	// msgID: Unique identifier of the message to acknowledge
	// Returns: Error if acknowledgment failed
	AcknowledgeMessage(ctx context.Context, msgID string) *ae.AppError

	// CheckHealth verifies that the pub/sub client is healthy and connected.
	// ctx: Context for request cancellation and timeout
	// Returns: True if healthy, false otherwise, plus any error
	CheckHealth(ctx context.Context) (bool, *ae.AppError)

	// Teardown cleans up resources and closes connections.
	// Should be called when the pub/sub client is no longer needed.
	// ctx: Context for request cancellation and timeout
	Teardown(ctx context.Context)
}

IPubSub is an interface that defines the methods for a publish-subscribe system. It provides a unified API for different messaging systems to implement.

Implementations should handle: - Message publishing (single and batch) - Message consumption and listening - Message acknowledgment - Health monitoring - Resource cleanup

type PubNubClient ยถ

type PubNubClient struct {
	// contains filtered or unexported fields
}

PubNubClient implements the IPubSub interface for PubNub messaging service. It provides methods to publish and consume messages using PubNub's real-time messaging platform. The client handles connection state, reconnection logic, and event processing.

func NewPubNubClient ยถ

func NewPubNubClient(pnOpts *PubNubOptions, logger l.ILogger, listenError chan<- *ae.AppError) *PubNubClient

NewPubNubClient creates a new PubNub client instance with the provided options. It configures the PubNub client with retry policies, timeouts, and authentication.

Parameters:

  • pnOpts: Configuration options including keys, channels, and connection settings
  • logger: Logger instance for structured logging
  • listenError: Channel to receive errors during message consumption

Returns:

  • *PubNubClient: Initialized PubNub client ready for pub/sub operations

func (*PubNubClient) AcknowledgeMessage ยถ

func (p *PubNubClient) AcknowledgeMessage(ctx context.Context, msgID string) *ae.AppError

AcknowledgeMessage acknowledges a message by adding a message action. This creates a receipt action to indicate the message has been processed.

Parameters:

  • ctx: Context for request cancellation and timeout
  • msgID: Message timetoken to acknowledge

Returns:

  • *ae.AppError: Error if acknowledgment fails

func (*PubNubClient) AddListener ยถ

func (p *PubNubClient) AddListener(listener ...chan<- *ConsumedMessage)

AddListener adds one or more listener channels to receive consumed messages. Messages received from subscriptions will be sent to all registered listeners.

Parameters:

  • listener: One or more channels that will receive ConsumedMessage instances

func (*PubNubClient) CheckHealth ยถ

func (p *PubNubClient) CheckHealth(ctx context.Context) (bool, *ae.AppError)

CheckHealth verifies that the PubNub client is healthy by sending a heartbeat. This method tests connectivity and responsiveness of the PubNub service.

Parameters:

  • ctx: Context for request cancellation and timeout

Returns:

  • bool: True if healthy, false otherwise
  • *ae.AppError: Error if health check fails

func (*PubNubClient) IsConnected ยถ

func (p *PubNubClient) IsConnected() bool

IsConnected returns whether the client is currently connected to PubNub. This method is thread-safe and provides read-only access to the connection state.

Returns:

  • bool: True if the client is connected, false otherwise

func (*PubNubClient) IsListening ยถ

func (p *PubNubClient) IsListening() bool

IsListening returns whether the client is currently listening for messages. This method is thread-safe and provides read-only access to the listening state.

Returns:

  • bool: True if the client is actively listening, false otherwise

func (*PubNubClient) Listen ยถ

func (p *PubNubClient) Listen(ctx context.Context) *ae.AppError

Listen starts listening for messages from the configured PubNub channels. This method delegates to the internal listen implementation.

Parameters:

  • ctx: Context for request cancellation and timeout

Returns:

  • *ae.AppError: Error if listening setup fails

func (*PubNubClient) ListenWithWait ยถ

func (p *PubNubClient) ListenWithWait(ctx context.Context, wg *sync.WaitGroup) *ae.AppError

ListenWithWait starts listening for messages and integrates with a WaitGroup. The WaitGroup's Done() method is called when the function exits.

Parameters:

  • ctx: Context for request cancellation and timeout
  • wg: WaitGroup to coordinate with (Done() called on exit)

Returns:

  • *ae.AppError: Error if listening setup fails

func (*PubNubClient) Publish ยถ

func (p *PubNubClient) Publish(ctx context.Context, msg []byte) (EventTxnData, *ae.AppError)

Publish sends a single message to the configured PubNub channel. It captures timing and size metrics for the publish operation.

Parameters:

  • ctx: Context for request cancellation and timeout
  • msg: Message payload as byte slice

Returns:

  • EventTxnData: Contains publish metadata including message ID and timing
  • *ae.AppError: Error if publishing fails

func (*PubNubClient) PublishBatch ยถ

func (p *PubNubClient) PublishBatch(ctx context.Context, msgs [][]byte) ([]EventTxnData, *ae.AppError)

PublishBatch publishes multiple messages sequentially to the PubNub channel. Each message is published one after another to maintain order.

Parameters:

  • ctx: Context for request cancellation and timeout
  • msgs: Slice of message payloads, each as byte slice

Returns:

  • []EventTxnData: Slice containing publish metadata for each message
  • *ae.AppError: Error if any messages fail to publish

func (*PubNubClient) SetListener ยถ

func (p *PubNubClient) SetListener(listener []chan<- *ConsumedMessage)

SetListener replaces all existing listener channels with the provided ones. This completely overwrites the current listener list.

Parameters:

  • listener: Slice of channels that will receive ConsumedMessage instances

func (*PubNubClient) Subscribe ยถ

func (p *PubNubClient) Subscribe(channel ...string)

Subscribe subscribes to one or more PubNub channels. This method adds the specified channels to the active subscription list.

Parameters:

  • channel: One or more channel names to subscribe to

func (*PubNubClient) Teardown ยถ

func (p *PubNubClient) Teardown(ctx context.Context)

Teardown cleanly shuts down the PubNub client and releases all resources. It optionally closes listener channels, destroys the connection, and resets client state. Once this method completes, the client instance should not be used.

Parameters:

  • ctx: Context for request cancellation and timeout

type PubNubOptions ยถ

type PubNubOptions struct {
	// UUID is a UTF-8 encoded, unique string of up to 64 characters used to identify
	// a single client (end user, device, or server) that connects to PubNub.
	UUID string

	// PublishKey is the PubNub publish key for your application
	PublishKey string

	// SubscribeKey is the PubNub subscribe key for your application
	SubscribeKey string

	// SecretKey is the PubNub secret key for additional security (optional)
	SecretKey string

	// IsSSLSecure enables SSL/TLS encryption for connections (default: false)
	IsSSLSecure bool

	// PublishChannel is the channel name where events will be published
	PublishChannel string

	// SubscriptionList contains the list of channels to listen on for messages
	SubscriptionList []string

	// BackoffPolicy defines the backoff logic to apply when reconnecting to the PubNub server
	// in case of a connection loss. Options are Linear or Exponential backoff.
	BackoffPolicy PubnubReconnectBackoff

	// ConnectTimeoutInSec is the maximum duration until we give up on trying to connect.
	// This must be specified in multiples of seconds with a minimum of 1 second.
	ConnectTimeoutInSec int

	// MaxRetry is the maximum number of times to attempt to retry connecting to the
	// PubNub server. This applies only to the initial connection; it won't make
	// PubNub reconnect if the connection gets interrupted after a successful connection.
	//
	// Use -1 for infinite retry, 0 for no retry, and any positive value for that
	// many count of retries.
	MaxRetry int32

	// CloseListenersOnExit determines whether to close the channels listening for
	// PubNub messages when the monitor thread dies.
	CloseListenersOnExit bool

	// EnableDebugMode specifies whether to enable the internal logger of the PubNub
	// library for debugging purposes.
	EnableDebugMode bool
}

PubNubOptions contains configuration options for the PubNub client. These options control connection behavior, retry logic, and channel configuration.

type PubnubReconnectBackoff ยถ

type PubnubReconnectBackoff int32

PubnubReconnectBackoff defines the reconnection backoff strategy for PubNub client.

const (
	// PubnubReconnectBackoffLinear applies linear backoff when reconnecting
	PubnubReconnectBackoffLinear PubnubReconnectBackoff = iota

	// PubnubReconnectBackoffExponential applies exponential backoff when reconnecting
	PubnubReconnectBackoffExponential
)

Directories ยถ

Path Synopsis
Package pubsub defines constants used throughout the pub/sub implementations.
Package pubsub defines constants used throughout the pub/sub implementations.
examples
gcp-pubsub command
Package main demonstrates comprehensive usage of the generic-pubsub library with GCP Pub/Sub.
Package main demonstrates comprehensive usage of the generic-pubsub library with GCP Pub/Sub.
pubnub command
Package main demonstrates comprehensive usage of the generic-pubsub library with PubNub.
Package main demonstrates comprehensive usage of the generic-pubsub library with PubNub.