Description
Component(s)
receiver/azureeventhub
What happened?
Description
The azureeventhub receiver supports persisting the checkpoint state using a storage extension. This allows the receiver to pick up at the offset where it left off after an interruption in operation, which reduces the chance of data loss. Testing has shown that the checkpoint state is stored correctly. However, any time the receiver is restarted, the stored checkpoint seems to be disregarded and the original offset in the config is used instead.
Steps to Reproduce
- Create an Azure Event Hub namespace, hub, consumer group and policy
- To simplify things, when creating the hub, only use a single partition
- Configure the OTel Collector with the azureeventhub receiver and resources above
- When configuring the receiver, set the offset to "", which tells it to "starts with the latest offset."
- Configure the file storage extension and add it to the azureeventhub receiver
- Configure the collector with the file exporter
- Start the collector
- Send a single message to the event hub (this can be done using the Data Explorer > Send events option within the Azure Event Hub UI)
- Stop the collector
- Use either strings or the bbolt CLI to view the checkpoint file (bbolt get receiver_azureeventhub_ default otelnamespace/otelhub/otelcg/0)
- While the collector is not running, send another message to the event hub
- Start the collector
- Check the output file
- Stop the collector
- Use either strings or the bbolt CLI to view the checkpoint file
Expected Result
The message that was sent while the OTel Collector was not running should be immediately ingested once the collector is restarted and sent to the output file. The output file should contain both messages. Also, the checkpoint state file should show the offset of the most recent event.
Actual Result
The message that was sent while the OTel Collector was not running is never ingested. When you view the checkpoint state file at the end, the offset is set to @latest.
Collector version
Collector Contrib v0.115.0
Environment information
Environment
OS: Docker Desktop v4.36.0 on Apple M3
version: "2"
services:
  otel-collector:
    image: otel/opentelemetry-collector-contrib:latest
    restart: always
    command: ["--config=/etc/otel-collector-config.yaml", "${OTELCOL_ARGS}"]
    volumes:
      - ./otel-collector-config.yaml:/etc/otel-collector-config.yaml
      - ./output.log:/output.log
      - ./storage:/storage
OpenTelemetry Collector configuration
receivers:
  azureeventhub:
    connection: Endpoint=sb://otelnamespace.servicebus.windows.net/;SharedAccessKeyName=otel_policy;SharedAccessKey=<REDACTED>;EntityPath=otelhub
    format: raw
    group: otelcg
    offset: ""
    partition: "0"
    storage: file_storage/eventhub_checkpoint
processors:
  transform/DecodeEventHub:
    error_mode: ignore
    log_statements:
      - context: log
        statements:
          - set(body, Decode(body, "utf-8"))
exporters:
  file:
    path: /output.log
extensions:
  file_storage/eventhub_checkpoint:
    directory: /storage
service:
  extensions:
    - file_storage/eventhub_checkpoint
  pipelines:
    logs/pipeline1:
      receivers:
        - azureeventhub
      processors:
        - transform/DecodeEventHub
      exporters:
        - file
Log output
No response
Additional context
Checkpoint after first message and shutdown: ✅
sudo ~/go/bin/bbolt get receiver_azureeventhub_ default otelnamespace/otelhub/otelcg/0
{"offset":"0","sequenceNumber":0,"enqueueTime":"2025-01-12T21:36:56.487Z"}
Checkpoint after second message and restart: ❌
sudo ~/go/bin/bbolt get receiver_azureeventhub_ default otelnamespace/otelhub/otelcg/0
{"offset":"@latest","sequenceNumber":0,"enqueueTime":"0001-01-01T00:00:00Z"}
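The two checkpoint records above can also be compared programmatically. The following is a minimal sketch, assuming only the three JSON fields visible in the bbolt output. The Checkpoint type and the looksReset helper are hypothetical names introduced for illustration and are not part of the receiver or the Event Hubs SDK.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Checkpoint mirrors the JSON fields visible in the bbolt output above.
// It is a hypothetical type for this sketch, not the SDK's own struct.
type Checkpoint struct {
	Offset         string    `json:"offset"`
	SequenceNumber int64     `json:"sequenceNumber"`
	EnqueueTime    time.Time `json:"enqueueTime"`
}

// looksReset reports whether a stored checkpoint carries the "@latest"
// placeholder together with a zero enqueue time, which is the state
// observed after the restart in this issue.
func looksReset(raw []byte) (bool, error) {
	var cp Checkpoint
	if err := json.Unmarshal(raw, &cp); err != nil {
		return false, err
	}
	return cp.Offset == "@latest" && cp.EnqueueTime.IsZero(), nil
}

func main() {
	// The two records copied from the bbolt output in this issue.
	samples := []string{
		`{"offset":"0","sequenceNumber":0,"enqueueTime":"2025-01-12T21:36:56.487Z"}`,
		`{"offset":"@latest","sequenceNumber":0,"enqueueTime":"0001-01-01T00:00:00Z"}`,
	}
	for _, s := range samples {
		reset, err := looksReset([]byte(s))
		if err != nil {
			fmt.Println("decode error:", err)
			continue
		}
		fmt.Printf("reset=%v record=%s\n", reset, s)
	}
}
```

A check like this could be run against the value returned by bbolt get after each restart to confirm whether the checkpoint survived.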
Looking at the code, it looks like eventhub.NewHubFromConnectionString() does try to take the existing checkpoint into consideration (link).
However, it looks like the first thing that setUpOnePartition() does is append additional receiverOptions that I believe likely "override" the checkpoint that it may have previously loaded (link).
if applyOffset && h.config.Offset != "" {
	receiverOptions = append(receiverOptions, eventhub.ReceiveWithStartingOffset(h.config.Offset))
} else {
	receiverOptions = append(receiverOptions, eventhub.ReceiveWithLatestOffset())
}
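To make the expected precedence concrete, here is a minimal, self-contained sketch of the decision this issue is asking for: a persisted checkpoint wins over the configured offset, and "latest" is only the fallback. It deliberately does not call the Event Hubs SDK. The storedCheckpoint type and the startPosition helper are hypothetical names, and this is an assumption about the desired behavior, not the receiver's actual implementation or a proposed patch.

```go
package main

import "fmt"

// storedCheckpoint is a hypothetical stand-in for whatever was loaded
// from the storage extension. A nil pointer means nothing was persisted.
type storedCheckpoint struct {
	Offset string
}

// startPosition returns a label describing where reading should begin.
// Hypothetical helper for illustration only.
func startPosition(configOffset string, cp *storedCheckpoint) string {
	// A persisted checkpoint takes priority over any configured offset.
	if cp != nil && cp.Offset != "" {
		return "checkpoint:" + cp.Offset
	}
	// No checkpoint: fall back to the explicit offset from the config.
	if configOffset != "" {
		return "config:" + configOffset
	}
	// Neither is available: start from the latest event.
	return "latest"
}

func main() {
	fmt.Println(startPosition("", &storedCheckpoint{Offset: "0"}))
	fmt.Println(startPosition("42", nil))
	fmt.Println(startPosition("", nil))
}
```

With the configuration in this issue (offset: "" and a stored checkpoint at offset "0"), this ordering would resume from the checkpoint instead of jumping to the latest event.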