Description
Component(s)
receiver/kafka
What happened?
Description
If topic contains messages that do not match the receivers desired encoding an unmarshal error is returned which causes the kafka consumer groups to restart. The result is that the collector appears to be blocked. While it is moving through offsets...it is very slow due to consumer group restart churn.
Steps to Reproduce
Create a topic and add messages in otlp_proto and otlp_json encoding. Start the collector and look at the log.
Expected Result
Unmarshalling errors should be logged and the consume loop should continue to next message with out restarting the kafka consumer groups.
Actual Result
Unmarshalling errors cause the kafka consumer groups to be restarted. The result is that the collector appears to be blocked. While it is moving through offsets it can take hours to move through messages encoded incorrectly.
Collector version
v0.120.1
Environment information
Environment
OS: MacOS
go version go1.24.2 darwin/arm64
OpenTelemetry Collector configuration
receivers:
kafka:
brokers: "localhost:9093"
topic: "otel-mixed-traces"
encoding: "otlp_proto"
group_id: "otel-mixed-traces-cg-foo"
initial_offset: earliest
message_marking:
after: true
on_error: true
auth:
tls:
ca_file: ./certs/ca_cert
cert_file: ./certs/cert.crt
key_file: ./certs/cert.key
processors:
batch:
timeout: 5s
send_batch_size: 5000
memory_limiter:
check_interval: 1s
limit_percentage: 95
limit_mib: 24000
spike_limit_percentage: 20
service:
extensions: [ health_check ]
telemetry:
metrics:
address: 0.0.0.0:8888
pipelines:
traces:
receivers: [ kafka ]
processors: [ memory_limiter, batch ]
exporters: [ debug ]
exporters:
debug:
verbosity: basic
extensions:
health_check:
pprof:
zpages:
Log output
2025-05-05T20:50:22.880-0600 error kafkareceiver/kafka_receiver.go:580 failed to unmarshal message {"otelcol.component.id": "kafka", "otelcol.component.kind": "Receiver", "otelcol.signal": "traces", "error": "proto: illegal wireType 6"}
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver.(*tracesConsumerGroupHandler).ConsumeClaim
/Users/[email protected]/projects/maple/opentelemetry-collector-contrib/receiver/kafkareceiver/kafka_receiver.go:580
github.com/IBM/sarama.(*consumerGroupSession).consume
/Users/[email protected]/go/pkg/mod/github.com/!i!b!m/[email protected]/consumer_group.go:952
github.com/IBM/sarama.newConsumerGroupSession.func2
/Users/[email protected]/go/pkg/mod/github.com/!i!b!m/[email protected]/consumer_group.go:877
2025-05-05T20:50:30.341-0600 info kafkareceiver/kafka_receiver.go:551 Starting consumer group {"otelcol.component.id": "kafka", "otelcol.component.kind": "Receiver", "otelcol.signal": "traces", "partition": 0}
2025-05-05T20:50:30.341-0600 info kafkareceiver/kafka_receiver.go:551 Starting consumer group {"otelcol.component.id": "kafka", "otelcol.component.kind": "Receiver", "otelcol.signal": "traces", "partition": 2}
2025-05-05T20:50:30.343-0600 info kafkareceiver/kafka_receiver.go:551 Starting consumer group {"otelcol.component.id": "kafka", "otelcol.component.kind": "Receiver", "otelcol.signal": "traces", "partition": 1}
2025-05-05T20:50:30.462-0600 info kafkareceiver/kafka_receiver.go:551 Starting consumer group {"otelcol.component.id": "kafka", "otelcol.component.kind": "Receiver", "otelcol.signal": "traces", "partition": 3}
2025-05-05T20:50:34.224-0600 error kafkareceiver/kafka_receiver.go:580 failed to unmarshal message {"otelcol.component.id": "kafka", "otelcol.component.kind": "Receiver", "otelcol.signal": "traces", "error": "proto: illegal wireType 6"}
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver.(*tracesConsumerGroupHandler).ConsumeClaim
/Users/[email protected]/projects/maple/opentelemetry-collector-contrib/receiver/kafkareceiver/kafka_receiver.go:580
github.com/IBM/sarama.(*consumerGroupSession).consume
/Users/[email protected]/go/pkg/mod/github.com/!i!b!m/[email protected]/consumer_group.go:952
github.com/IBM/sarama.newConsumerGroupSession.func2
/Users/[email protected]/go/pkg/mod/github.com/!i!b!m/[email protected]/consumer_group.go:877
2025-05-05T20:50:42.801-0600 info kafkareceiver/kafka_receiver.go:551 Starting consumer group {"otelcol.component.id": "kafka", "otelcol.component.kind": "Receiver", "otelcol.signal": "traces", "partition": 1}
2025-05-05T20:50:42.802-0600 info kafkareceiver/kafka_receiver.go:551 Starting consumer group {"otelcol.component.id": "kafka", "otelcol.component.kind": "Receiver", "otelcol.signal": "traces", "partition": 2}
2025-05-05T20:50:42.803-0600 info kafkareceiver/kafka_receiver.go:551 Starting consumer group {"otelcol.component.id": "kafka", "otelcol.component.kind": "Receiver", "otelcol.signal": "traces", "partition": 0}
2025-05-05T20:50:42.926-0600 info kafkareceiver/kafka_receiver.go:551 Starting consumer group {"otelcol.component.id": "kafka", "otelcol.component.kind": "Receiver", "otelcol.signal": "traces", "partition": 3}
Additional context
No response