Azure Event Hub checkpoint state is disregarded after collector restart #37157

@dentonk

Component(s)

receiver/azureeventhub

What happened?

Description

The azureeventhub receiver supports persisting checkpoint state using a storage extension. This allows the receiver to resume from the offset where it left off after an interruption, reducing the chance of data loss. Testing shows that the checkpoint state is stored correctly. However, whenever the receiver is restarted, the stored checkpoint appears to be disregarded and the offset from the config is used instead.

Steps to Reproduce

  • Create an Azure Event Hub namespace, hub, consumer group and policy
    • To simplify things, when creating the hub, only use a single partition
  • Configure the OTel Collector with the azureeventhub receiver and resources above
    • When configuring the receiver, set the offset to "", which per the documentation means the receiver "starts with the latest offset"
  • Configure the file storage extension and add it to the azureeventhub receiver
  • Configure the collector with the file exporter
  • Start the collector
  • Send a single message to the event hub (this can be done using the Data Explorer > Send events option within the Azure Event Hub UI)
  • Stop the collector
  • Use either strings or the bbolt CLI to view the checkpoint file (bbolt get receiver_azureeventhub_ default otelnamespace/otelhub/otelcg/0)
  • While the collector is not running, send another message to the event hub
  • Start the collector
  • Check the output file
  • Stop the collector
  • Use either strings or the bbolt CLI to view the checkpoint file

Expected Result

The message that was sent while the OTel Collector was not running should be ingested and written to the output file as soon as the collector is restarted. The output file should contain both messages, and the checkpoint state file should show the offset of the most recent event.

Actual Result

The message that was sent while the OTel Collector was not running is never ingested. Viewing the checkpoint state file at the end shows the offset set to @latest.

Collector version

Collector Contrib v0.115.0

Environment information

Environment

OS: Docker Desktop v4.36.0 on Apple M3

version: "2"
services:
  otel-collector:
    image: otel/opentelemetry-collector-contrib:latest
    restart: always
    command: ["--config=/etc/otel-collector-config.yaml", "${OTELCOL_ARGS}"]
    volumes:
      - ./otel-collector-config.yaml:/etc/otel-collector-config.yaml
      - ./output.log:/output.log
      - ./storage:/storage

OpenTelemetry Collector configuration

receivers:
  azureeventhub:
    connection: Endpoint=sb://otelnamespace.servicebus.windows.net/;SharedAccessKeyName=otel_policy;SharedAccessKey=<REDACTED>;EntityPath=otelhub
    format: raw
    group: otelcg
    offset: ""
    partition: "0"
    storage: file_storage/eventhub_checkpoint
processors:
  transform/DecodeEventHub:
      error_mode: ignore
      log_statements:
          - context: log
            statements:
              - set(body, Decode(body, "utf-8"))
exporters:
  file:
    path: /output.log
extensions:
  file_storage/eventhub_checkpoint:
      directory: /storage
service:
  extensions:
    - file_storage/eventhub_checkpoint
  pipelines:
    logs/pipeline1:
      receivers:
        - azureeventhub
      processors:
        - transform/DecodeEventHub
      exporters:
        - file

Log output

No response

Additional context

Checkpoint after first message and shutdown: ✅

sudo ~/go/bin/bbolt get receiver_azureeventhub_ default otelnamespace/otelhub/otelcg/0
{"offset":"0","sequenceNumber":0,"enqueueTime":"2025-01-12T21:36:56.487Z"}

Checkpoint after second message and restart: ❌

sudo ~/go/bin/bbolt get receiver_azureeventhub_ default otelnamespace/otelhub/otelcg/0
{"offset":"@latest","sequenceNumber":0,"enqueueTime":"0001-01-01T00:00:00Z"}
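To make the comparison of the two checkpoints above repeatable, here is a minimal Go sketch that parses the JSON value returned by bbolt and flags a checkpoint that carries no real position. The `checkpoint` struct and the `looksReset` helper are hypothetical names of my own, not part of the receiver. The field layout is assumed from the two outputs shown here.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// checkpoint mirrors the JSON shape seen in the bbolt output above.
// Hypothetical type for illustration; not the receiver's own struct.
type checkpoint struct {
	Offset         string    `json:"offset"`
	SequenceNumber int64     `json:"sequenceNumber"`
	EnqueueTime    time.Time `json:"enqueueTime"`
}

// looksReset reports whether a stored checkpoint holds no real position:
// the "@latest" sentinel offset together with a zero enqueue time.
func looksReset(raw []byte) (bool, error) {
	var cp checkpoint
	if err := json.Unmarshal(raw, &cp); err != nil {
		return false, err
	}
	return cp.Offset == "@latest" && cp.EnqueueTime.IsZero(), nil
}

func main() {
	// The two values copied from the bbolt output in this issue.
	samples := []string{
		`{"offset":"0","sequenceNumber":0,"enqueueTime":"2025-01-12T21:36:56.487Z"}`,
		`{"offset":"@latest","sequenceNumber":0,"enqueueTime":"0001-01-01T00:00:00Z"}`,
	}
	for _, s := range samples {
		reset, err := looksReset([]byte(s))
		if err != nil {
			fmt.Println("parse error:", err)
			continue
		}
		fmt.Printf("reset=%v checkpoint=%s\n", reset, s)
	}
}
```

The check only relies on the sentinel offset and the zero timestamp, since the sequence number is 0 in both outputs and cannot distinguish them.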

Looking at the code, it looks like eventhub.NewHubFromConnectionString() does try to take the existing checkpoint into consideration (link).
However, the first thing setUpOnePartition() does is append additional receiverOptions, which I believe likely "override" the checkpoint that may have been loaded previously (link):

	if applyOffset && h.config.Offset != "" {
		receiverOptions = append(receiverOptions, eventhub.ReceiveWithStartingOffset(h.config.Offset))
	} else {
		receiverOptions = append(receiverOptions, eventhub.ReceiveWithLatestOffset())
	}
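For discussion, here is a minimal sketch of the precedence I would expect: a persisted checkpoint wins over the configured offset, and "latest" is only the fallback. This is a standalone illustration, not the receiver's code and not a patch tested against the eventhub library. `startPosition` and `resolveStart` are hypothetical names, and treating "@latest" as "no real checkpoint" is my assumption based on the output above.

```go
package main

import "fmt"

// startPosition is a hypothetical, simplified stand-in for the receive
// options that the receiver hands to the Event Hubs client.
type startPosition struct {
	Kind   string // "checkpoint", "offset", or "latest"
	Offset string
}

// resolveStart picks where to begin reading a partition.
// storedOffset is the offset from the persisted checkpoint ("" if none);
// configOffset is the receiver's `offset` setting.
func resolveStart(storedOffset, configOffset string) startPosition {
	// A real persisted position wins, so a restart resumes where it left off.
	// Assumption: "@latest" is a sentinel, not a real position.
	if storedOffset != "" && storedOffset != "@latest" {
		return startPosition{Kind: "checkpoint", Offset: storedOffset}
	}
	// No usable checkpoint: honor an explicitly configured offset.
	if configOffset != "" {
		return startPosition{Kind: "offset", Offset: configOffset}
	}
	// Nothing stored and nothing configured: start at the latest event.
	return startPosition{Kind: "latest"}
}

func main() {
	// Restart with a stored checkpoint and offset: "" (the case in this issue).
	fmt.Printf("%+v\n", resolveStart("0", ""))
	// First start, nothing stored, offset: "".
	fmt.Printf("%+v\n", resolveStart("", ""))
	// First start with an explicitly configured offset.
	fmt.Printf("%+v\n", resolveStart("", "42"))
}
```

With this ordering, the reproduction above would resume from the stored offset "0" on restart instead of jumping to the latest offset.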
