kafkareceiver

package module v0.137.0
Published: Oct 6, 2025 License: Apache-2.0 Imports: 40 Imported by: 20

README

Kafka Receiver

Status
Stability: development (profiles); beta (metrics, logs, traces)
Distributions: core, contrib
Code Owners @pavolloffay, @MovieStoreGuy, @axw

The Kafka receiver receives telemetry data from Kafka, with configurable topics and encodings.

If used in conjunction with the kafkaexporter configured with include_metadata_keys, the Kafka receiver will also propagate Kafka headers to the downstream pipeline, giving the rest of the pipeline access to arbitrary metadata keys and values.

Getting Started

[!NOTE] You can opt out of using the franz-go client by disabling the receiver.kafkareceiver.UseFranzGo feature gate when running the OpenTelemetry Collector (e.g. --feature-gates=-receiver.kafkareceiver.UseFranzGo). See the following page for more details: Feature Gates

The franz-go client supports consuming directly from multiple topics by specifying a regular expression. To enable this feature, prefix your topic with the ^ character. This is identical to how the librdkafka client works.

The same applies to the deprecated topic setting: if any of the configured topics has the ^ prefix, regex consuming will be enabled.
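
For example, a sketch that consumes from every logs topic whose name starts with otlp_logs (the pattern is illustrative):

receivers:
  kafka:
    logs:
      topic: "^otlp_logs.*"  # the ^ prefix enables regex consuming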

There are no required settings.

The following settings can be optionally configured (example configurations combining several of these options follow the list):

  • brokers (default = localhost:9092): The list of Kafka brokers.
  • protocol_version (default = 2.1.0): Kafka protocol version.
  • resolve_canonical_bootstrap_servers_only (default = false): Whether to resolve broker addresses and then reverse-lookup the resulting IPs during startup.
  • logs
    • topic (default = otlp_logs): The name of the Kafka topic from which to consume logs.
    • encoding (default = otlp_proto): The encoding for the Kafka topic. See Supported encodings.
  • metrics
    • topic (default = otlp_metrics): The name of the Kafka topic from which to consume metrics.
    • encoding (default = otlp_proto): The encoding for the Kafka topic. See Supported encodings.
  • traces
    • topic (default = otlp_spans): The name of the Kafka topic from which to consume traces.
    • encoding (default = otlp_proto): The encoding for the Kafka topic. See Supported encodings.
  • profiles
    • topic (default = otlp_profiles): The name of the Kafka topic from which to consume profiles.
    • encoding (default = otlp_proto): The encoding for the Kafka topic. See Supported encodings.
  • topic (Deprecated [v0.124.0]: use logs::topic, traces::topic, or metrics::topic). If this is set, it will take precedence over the default value for those fields.
  • encoding (Deprecated [v0.124.0]: use logs::encoding, traces::encoding, or metrics::encoding). If this is set, it will take precedence over the default value for those fields.
  • group_id (default = otel-collector): The consumer group ID that the receiver uses when consuming messages.
  • client_id (default = otel-collector): The consumer client ID that the receiver uses.
  • rack_id (default = ""): The rack identifier for this client. When set and brokers are configured with a rack-aware replica selector, the client will prefer fetching from the closest replica.
  • use_leader_epoch (default = true): (Experimental) When enabled, the consumer uses the leader epoch returned by brokers (KIP-320) to detect log truncation. Setting this to false clears the leader epoch from fetch offsets, disabling KIP-320. Disabling can improve compatibility with brokers that don’t fully support leader epochs (e.g., Azure Event Hubs), at the cost of losing automatic log-truncation safety.
  • initial_offset (default = latest): The initial offset to use if no offset was previously committed. Must be latest or earliest.
  • session_timeout (default = 10s): The request timeout for detecting client failures when using Kafka’s group management facilities.
  • heartbeat_interval (default = 3s): The expected time between heartbeats to the consumer coordinator when using Kafka’s group management facilities.
  • group_rebalance_strategy (default = range): This strategy is used to assign partitions to consumers within a consumer group. This setting determines how Kafka distributes topic partitions among the consumers in the group during rebalances. Supported strategies are:
    • range: This strategy assigns partitions to consumers based on contiguous ranges. It aims to distribute partitions evenly across consumers, but it can lead to uneven distribution if the number of partitions is not a multiple of the number of consumers. For more information, see the Kafka RangeAssignor documentation.
    • roundrobin: This strategy assigns partitions to consumers in a round-robin fashion. It ensures a more even distribution of partitions across consumers, especially when the number of partitions is not a multiple of the number of consumers. For more information, see the Kafka RoundRobinAssignor documentation.
    • sticky: This strategy aims to keep partition assignments stable across rebalances as much as possible. It minimizes the number of partition movements, which can be beneficial for stateful consumers. For more information, see the Kafka StickyAssignor documentation.
  • group_instance_id: A unique identifier for the consumer instance within a consumer group.
    • If set to a non-empty string, the consumer is treated as a static member of the group. This means that the consumer will maintain its partition assignments across restarts and rebalances, as long as it rejoins the group with the same group_instance_id.
    • If set to an empty string (or not set), the consumer is treated as a dynamic member. In this case, the consumer's partition assignments may change during rebalances.
    • Using a group_instance_id is useful for stateful consumers or when you need to ensure that a specific consumer instance is always assigned the same set of partitions.
  • min_fetch_size (default = 1): The minimum number of message bytes to fetch in a request.
  • default_fetch_size (default = 1048576): The default number of message bytes to fetch in a request (1MB).
  • max_fetch_size (default = 0): The maximum number of message bytes to fetch in a request; 0 means unlimited.
  • max_fetch_wait (default = 250ms): The maximum amount of time the broker waits for min_fetch_size bytes to be available before returning anyway.
  • max_partition_fetch_size (default = 1048576): The maximum number of message bytes to fetch per partition in a request (1MB). If a single record batch is larger than this value, the broker will still return it to ensure the consumer can make progress. This setting only applies when using franz-go.
  • tls: see TLS Configuration Settings for the full set of available options.
  • auth
    • plain_text (Deprecated in v0.123.0: use sasl with mechanism set to PLAIN instead.)
      • username: The username to use.
      • password: The password to use.
    • sasl
      • username: The username to use.
      • password: The password to use.
      • mechanism: The SASL mechanism to use (SCRAM-SHA-256, SCRAM-SHA-512, AWS_MSK_IAM_OAUTHBEARER, or PLAIN).
      • aws_msk
        • region: The AWS region, used with the AWS_MSK_IAM_OAUTHBEARER mechanism.
    • tls (Deprecated in v0.124.0: configure tls at the top level): this is an alias for tls at the top level.
    • kerberos
      • service_name: Kerberos service name
      • realm: Kerberos realm
      • use_keytab: Whether to use a keytab instead of a password; if true, the keytab file is used for authentication.
      • username: The Kerberos username used to authenticate with the KDC.
      • password: The Kerberos password used to authenticate with the KDC.
      • config_file: Path to the Kerberos configuration file, e.g. /etc/krb5.conf.
      • keytab_file: Path to the keytab file, e.g. /etc/security/kafka.keytab.
      • disable_fast_negotiation: Disable PA-FX-FAST negotiation (Pre-Authentication Framework - Fast). Some common Kerberos implementations do not support PA-FX-FAST negotiation. Defaults to false.
  • metadata
    • full (default = true): Whether to maintain a full set of metadata. When disabled, the client does not make the initial metadata request to the broker at startup.
    • refresh_interval (default = 10m): How frequently cluster metadata is refreshed in the background.
    • retry
      • max (default = 3): The number of retries to get metadata
      • backoff (default = 250ms): How long to wait between metadata retries
  • autocommit
    • enable: (default = true) Whether or not to auto-commit updated offsets back to the broker
    • interval: (default = 1s) How frequently to commit updated offsets. Ineffective unless auto-commit is enabled
  • message_marking:
    • after: (default = false) If true, the messages are marked after the pipeline execution
    • on_error: (default = false) If false, only the successfully processed messages are marked. This applies to non-permanent errors. Note: this can block the entire partition if processing of a message returns a non-permanent error.
    • on_permanent_error: (default = value of on_error) If false, messages that generate permanent errors are not marked; if true, they are. Note: this can block the entire partition if processing of a message returns a permanent error.
  • header_extraction:
    • extract_headers (default = false): Whether to attach Kafka header fields as resource attributes in the OTel pipeline.
    • headers (default = []): The list of headers to extract from the Kafka record. Note: matching is exact; regexes are not currently supported.
  • error_backoff: BackOff configuration in case of errors
    • enabled: (default = false) Whether to enable backoff when the next consumer returns errors
    • initial_interval: The time to wait after the first error before retrying
    • max_interval: The upper bound on backoff interval between consecutive retries
    • multiplier: The value multiplied by the backoff interval bounds
    • randomization_factor: A random factor used to calculate next backoff. Randomized interval = RetryInterval * (1 ± RandomizationFactor)
    • max_elapsed_time: The maximum amount of time trying to backoff before giving up. If set to 0, the retries are never stopped.
  • telemetry
    • metrics
      • kafka_receiver_records_delay:
        • enabled (default = false): Whether the kafka_receiver_records_delay metric is reported.
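
The per-signal topic and encoding settings can be combined as in the following sketch; the broker addresses and topic names are illustrative:

receivers:
  kafka:
    brokers: ["broker-1:9092", "broker-2:9092"]
    logs:
      topic: my_logs        # illustrative topic name
      encoding: otlp_proto
    metrics:
      topic: my_metrics     # illustrative topic name
    traces:
      topic: my_spans       # illustrative topic name
      encoding: otlp_json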
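
A sketch of consumer-group and fetch tuning using the settings described above (the group and instance IDs, timeouts, and fetch values are illustrative):

receivers:
  kafka:
    group_id: otel-collector-logs   # illustrative consumer group
    group_instance_id: collector-0  # illustrative static member ID
    group_rebalance_strategy: sticky
    initial_offset: earliest
    session_timeout: 30s
    heartbeat_interval: 10s
    max_fetch_wait: 500ms
    default_fetch_size: 2097152     # 2MB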
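
And a sketch combining message marking, error backoff, and the optional records-delay metric (interval values are illustrative):

receivers:
  kafka:
    message_marking:
      after: true
      on_error: true
    error_backoff:
      enabled: true
      initial_interval: 1s
      max_interval: 10s
      multiplier: 2
      max_elapsed_time: 5m
    telemetry:
      metrics:
        kafka_receiver_records_delay:
          enabled: true
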
Supported encodings

The Kafka receiver supports encoding extensions, as well as the following built-in encodings.

Available for all signals:

  • otlp_proto: the payload is decoded as OTLP Protobuf
  • otlp_json: the payload is decoded as OTLP JSON

Available only for traces:

  • jaeger_proto: the payload is deserialized to a single Jaeger proto Span.
  • jaeger_json: the payload is deserialized to a single Jaeger JSON Span using jsonpb.
  • zipkin_proto: the payload is deserialized into a list of Zipkin proto spans.
  • zipkin_json: the payload is deserialized into a list of Zipkin V2 JSON spans.
  • zipkin_thrift: the payload is deserialized into a list of Zipkin Thrift spans.

Available only for logs:

  • raw: the payload's bytes are inserted as the body of a log record.
  • text: the payload is decoded as text and inserted as the body of a log record. By default, UTF-8 is used to decode. You can use text_<ENCODING>, like text_utf-8, text_shift_jis, etc., to customize this behavior (see the sketch after this list).
  • json: the payload is decoded as JSON and inserted as the body of a log record.
  • azure_resource_logs: the payload is converted from Azure Resource Logs format to OTel format.
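
For example, a minimal sketch that consumes Shift JIS-encoded text logs (the topic name is illustrative):

receivers:
  kafka:
    logs:
      topic: app_logs          # illustrative topic name
      encoding: text_shift_jis
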
Message header propagation

The Kafka receiver will extract Kafka message headers and include them as request metadata (context). This metadata can then be used throughout the pipeline, for example to set attributes using the attributes processor.
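
As a sketch, assuming the producer sets a tenant header on each message, the attributes processor can copy that propagated metadata onto telemetry using its from_context option (the key names are illustrative):

processors:
  attributes:
    actions:
      - key: tenant            # attribute to set on telemetry
        from_context: tenant   # illustrative Kafka header key, available as request metadata
        action: insert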

Example configurations
Minimal configuration

By default, the receiver does not require any configuration. With the following configuration, the receiver will consume messages from the default topics from localhost:9092 using the otlp_proto encoding:

receivers:
  kafka:
TLS and authentication

In this example the receiver is configured to connect to Kafka using TLS for encryption, and SASL/SCRAM for authentication:

receivers:
  kafka:
    tls: # empty = enable TLS with default settings
    auth:
      sasl:
        username: "user"
        password: "secret"
        mechanism: "SCRAM-SHA-512"
Header extraction

In addition to propagating Kafka message headers as metadata as described above in Message header propagation, the Kafka receiver can also be configured to extract and attach specific headers as resource attributes, for example:

receivers:
  kafka:
    header_extraction:
      extract_headers: true
      headers: ["header1", "header2"]

If we produce a Kafka message with headers "header1: value1" and "header2: value2" with the above configuration, the receiver will attach these headers as resource attributes with the prefix "kafka.header.", i.e.

"resource": {
  "attributes": {
    "kafka.header.header1": "value1",
    "kafka.header.header2": "value2",
  }
}
...

Documentation

Overview

Package kafkareceiver receives telemetry data from Kafka.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewFactory

func NewFactory() receiver.Factory

NewFactory creates a Kafka receiver factory.

Types

type Config

type Config struct {
	configkafka.ClientConfig   `mapstructure:",squash"`
	configkafka.ConsumerConfig `mapstructure:",squash"`

	// Logs holds configuration about how logs should be consumed.
	Logs TopicEncodingConfig `mapstructure:"logs"`

	// Metrics holds configuration about how metrics should be consumed.
	Metrics TopicEncodingConfig `mapstructure:"metrics"`

	// Traces holds configuration about how traces should be consumed.
	Traces TopicEncodingConfig `mapstructure:"traces"`

	// Profiles holds configuration about how profiles should be consumed.
	Profiles TopicEncodingConfig `mapstructure:"profiles"`

	// Topic holds the name of the Kafka topic from which to consume data.
	//
	// Topic has no default. If explicitly specified, it will take precedence
	// over the default values of Logs.Topic, Traces.Topic, and Metrics.Topic.
	//
	// Deprecated [v0.124.0]: Use Logs.Topic, Traces.Topic, and Metrics.Topic.
	Topic string `mapstructure:"topic"`

	// Encoding holds the expected encoding of messages.
	//
	// Encoding has no default. If explicitly specified, it will take precedence
	// over the default values of Logs.Encoding, Traces.Encoding, and
	// Metrics.Encoding.
	//
	// Deprecated [v0.124.0]: Use Logs.Encoding, Traces.Encoding, and
	// Metrics.Encoding.
	Encoding string `mapstructure:"encoding"`

	// MessageMarking controls the way the messages are marked as consumed.
	MessageMarking MessageMarking `mapstructure:"message_marking"`

	// HeaderExtraction controls extraction of headers from Kafka records.
	HeaderExtraction HeaderExtraction `mapstructure:"header_extraction"`

	// ErrorBackoff controls backoff/retry behavior when the next consumer
	// returns an error.
	ErrorBackOff configretry.BackOffConfig `mapstructure:"error_backoff"`

	// Telemetry controls optional telemetry configuration.
	Telemetry TelemetryConfig `mapstructure:"telemetry"`
}

Config defines configuration for Kafka receiver.

func (*Config) Unmarshal added in v0.124.0

func (c *Config) Unmarshal(conf *confmap.Conf) error

type HeaderExtraction added in v0.87.0

type HeaderExtraction struct {
	ExtractHeaders bool     `mapstructure:"extract_headers"`
	Headers        []string `mapstructure:"headers"`
}

type MessageMarking added in v0.38.0

type MessageMarking struct {
	// If true, the messages are marked after the pipeline execution
	After bool `mapstructure:"after"`

	// If false, only the successfully processed messages are marked. This only applies
	// to non-permanent errors. It has no impact if After is set to false.
	// Note: this can block the entire partition in case a message processing returns
	// a non-permanent error.
	OnError bool `mapstructure:"on_error"`

	// If false, only the successfully processed messages are marked. This only applies
	// to permanent errors. It has no impact if After is set to false.
	// Default value inherits from OnError for backward compatibility.
	// Note: this can block the entire partition in case a message processing returns
	// a permanent error.
	OnPermanentError bool `mapstructure:"on_permanent_error"`
}

type MetricConfig added in v0.132.0

type MetricConfig struct {
	Enabled bool `mapstructure:"enabled"`
	// contains filtered or unexported fields
}

MetricConfig provides common config for a particular metric.

type MetricsConfig added in v0.132.0

type MetricsConfig struct {
	// KafkaReceiverRecordsDelay controls whether the metric kafka_receiver_records_delay
	// that measures the time in seconds between producing and receiving a batch of records
	// will be reported or not. This metric is not reported by default because
	// it may slow down high-volume consuming.
	KafkaReceiverRecordsDelay MetricConfig `mapstructure:"kafka_receiver_records_delay"`
	// contains filtered or unexported fields
}

MetricsConfig provides config for optional receiver metrics.

type TelemetryConfig added in v0.132.0

type TelemetryConfig struct {
	Metrics MetricsConfig `mapstructure:"metrics"`
	// contains filtered or unexported fields
}

type TopicEncodingConfig added in v0.124.0

type TopicEncodingConfig struct {
	// Topic holds the name of the Kafka topic from which messages of the
	// signal type should be consumed.
	//
	// The default depends on the signal type:
	//  - "otlp_spans" for traces
	//  - "otlp_metrics" for metrics
	//  - "otlp_logs" for logs
	//  - "otlp_profiles" for profiles
	Topic string `mapstructure:"topic"`

	// Encoding holds the expected encoding of messages for the signal type
	//
	// Defaults to "otlp_proto".
	Encoding string `mapstructure:"encoding"`
}

TopicEncodingConfig holds signal-specific topic and encoding configuration.

Directories

Path Synopsis
internal