
Unmarshal errors cause consumer group restarts and slow consumption #39909

Closed
@don-zur

Description


Component(s)

receiver/kafka

What happened?

Description

If a topic contains messages that do not match the receiver's configured encoding, an unmarshal error is returned, which causes the Kafka consumer group to restart. As a result the collector appears to be blocked: it does move through offsets, but very slowly, because of the consumer-group restart churn.

Steps to Reproduce

Create a topic and add messages in both otlp_proto and otlp_json encodings. Start the collector and watch the log.

Expected Result

Unmarshalling errors should be logged, and the consume loop should continue to the next message without restarting the Kafka consumer group.

Actual Result

Unmarshalling errors cause the Kafka consumer group to be restarted. As a result the collector appears to be blocked: while it does move through offsets, it can take hours to get past incorrectly encoded messages.
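The expected behavior above amounts to a consume loop that logs the unmarshal failure, optionally marks the offset, and moves on, rather than returning the error (which, with sarama, ends the session and triggers the restart churn visible in the log output below). A minimal self-contained sketch of that pattern, with hypothetical `message`, `unmarshal`, and `consumeClaim` stand-ins for the sarama/receiver types:

```go
package main

import (
	"errors"
	"fmt"
)

// message is a hypothetical stand-in for *sarama.ConsumerMessage.
type message struct {
	offset int64
	value  []byte
}

var errBadEncoding = errors.New("proto: illegal wireType 6")

// unmarshal simulates the receiver's decoder: it fails on payloads
// that do not match the configured encoding (here, JSON bytes fed to
// a receiver configured for otlp_proto).
func unmarshal(v []byte) error {
	if len(v) > 0 && v[0] == '{' {
		return errBadEncoding
	}
	return nil
}

// consumeClaim logs and skips messages that fail to unmarshal instead
// of returning the error, so the consumer-group session stays alive.
// It reports how many messages decoded and how many were skipped.
func consumeClaim(msgs []message, markOnError bool) (ok, skipped int) {
	var lastMarked int64 = -1
	for _, m := range msgs {
		if err := unmarshal(m.value); err != nil {
			fmt.Printf("failed to unmarshal message at offset %d: %v\n", m.offset, err)
			skipped++
			if markOnError { // mirrors message_marking.on_error: true
				lastMarked = m.offset
			}
			continue // keep consuming; do NOT return the error
		}
		ok++
		lastMarked = m.offset
	}
	_ = lastMarked // in the real handler this is where the offset is marked
	return ok, skipped
}

func main() {
	msgs := []message{
		{0, []byte{0x0a, 0x02}},             // otlp_proto-like payload
		{1, []byte(`{"resourceSpans":[]}`)}, // otlp_json payload
		{2, []byte{0x0a, 0x02}},
	}
	ok, skipped := consumeClaim(msgs, true)
	fmt.Println(ok, skipped) // 2 1
}
```

This is only a sketch of the requested behavior, not the receiver's actual code path; the real handler would log through the component logger and mark offsets via the consumer-group session.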

Collector version

v0.120.1

Environment information

Environment

OS: macOS
go version go1.24.2 darwin/arm64

OpenTelemetry Collector configuration

receivers:
  kafka:
    brokers: "localhost:9093"
    topic: "otel-mixed-traces"
    encoding: "otlp_proto"
    group_id: "otel-mixed-traces-cg-foo"
    initial_offset: earliest

    message_marking:
      after: true
      on_error: true

    auth:
      tls:
        ca_file: ./certs/ca_cert
        cert_file: ./certs/cert.crt
        key_file: ./certs/cert.key

processors:
  batch:
    timeout: 5s
    send_batch_size: 5000
  memory_limiter:
    check_interval: 1s
    limit_percentage: 95
    limit_mib: 24000
    spike_limit_percentage: 20


service:
  extensions: [ health_check ]
  telemetry:
    metrics:
      address: 0.0.0.0:8888
  pipelines:
    traces:
      receivers: [ kafka ]
      processors: [ memory_limiter, batch ]
      exporters: [ debug ]

exporters:
  debug:
    verbosity: basic

extensions:
  health_check:
  pprof:
  zpages:

Log output

2025-05-05T20:50:22.880-0600	error	kafkareceiver/kafka_receiver.go:580	failed to unmarshal message	{"otelcol.component.id": "kafka", "otelcol.component.kind": "Receiver", "otelcol.signal": "traces", "error": "proto: illegal wireType 6"}
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver.(*tracesConsumerGroupHandler).ConsumeClaim
	/Users/[email protected]/projects/maple/opentelemetry-collector-contrib/receiver/kafkareceiver/kafka_receiver.go:580
github.com/IBM/sarama.(*consumerGroupSession).consume
	/Users/[email protected]/go/pkg/mod/github.com/!i!b!m/[email protected]/consumer_group.go:952
github.com/IBM/sarama.newConsumerGroupSession.func2
	/Users/[email protected]/go/pkg/mod/github.com/!i!b!m/[email protected]/consumer_group.go:877
2025-05-05T20:50:30.341-0600	info	kafkareceiver/kafka_receiver.go:551	Starting consumer group	{"otelcol.component.id": "kafka", "otelcol.component.kind": "Receiver", "otelcol.signal": "traces", "partition": 0}
2025-05-05T20:50:30.341-0600	info	kafkareceiver/kafka_receiver.go:551	Starting consumer group	{"otelcol.component.id": "kafka", "otelcol.component.kind": "Receiver", "otelcol.signal": "traces", "partition": 2}
2025-05-05T20:50:30.343-0600	info	kafkareceiver/kafka_receiver.go:551	Starting consumer group	{"otelcol.component.id": "kafka", "otelcol.component.kind": "Receiver", "otelcol.signal": "traces", "partition": 1}
2025-05-05T20:50:30.462-0600	info	kafkareceiver/kafka_receiver.go:551	Starting consumer group	{"otelcol.component.id": "kafka", "otelcol.component.kind": "Receiver", "otelcol.signal": "traces", "partition": 3}
2025-05-05T20:50:34.224-0600	error	kafkareceiver/kafka_receiver.go:580	failed to unmarshal message	{"otelcol.component.id": "kafka", "otelcol.component.kind": "Receiver", "otelcol.signal": "traces", "error": "proto: illegal wireType 6"}
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver.(*tracesConsumerGroupHandler).ConsumeClaim
	/Users/[email protected]/projects/maple/opentelemetry-collector-contrib/receiver/kafkareceiver/kafka_receiver.go:580
github.com/IBM/sarama.(*consumerGroupSession).consume
	/Users/[email protected]/go/pkg/mod/github.com/!i!b!m/[email protected]/consumer_group.go:952
github.com/IBM/sarama.newConsumerGroupSession.func2
	/Users/[email protected]/go/pkg/mod/github.com/!i!b!m/[email protected]/consumer_group.go:877
2025-05-05T20:50:42.801-0600	info	kafkareceiver/kafka_receiver.go:551	Starting consumer group	{"otelcol.component.id": "kafka", "otelcol.component.kind": "Receiver", "otelcol.signal": "traces", "partition": 1}
2025-05-05T20:50:42.802-0600	info	kafkareceiver/kafka_receiver.go:551	Starting consumer group	{"otelcol.component.id": "kafka", "otelcol.component.kind": "Receiver", "otelcol.signal": "traces", "partition": 2}
2025-05-05T20:50:42.803-0600	info	kafkareceiver/kafka_receiver.go:551	Starting consumer group	{"otelcol.component.id": "kafka", "otelcol.component.kind": "Receiver", "otelcol.signal": "traces", "partition": 0}
2025-05-05T20:50:42.926-0600	info	kafkareceiver/kafka_receiver.go:551	Starting consumer group	{"otelcol.component.id": "kafka", "otelcol.component.kind": "Receiver", "otelcol.signal": "traces", "partition": 3}

Additional context

No response
