
Update OTel-Arrow exporter to use QueueBatch settings #40211


Merged: 8 commits, Jun 6, 2025
27 changes: 27 additions & 0 deletions .chloggen/otelarrow-queuebatch-default.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: otelarrowexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Configure the combined queue-batch facility, modifying the default `sending_queue` settings.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [40211]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: Removes experimental batcher config, which was never recommended, and documents how to replace the formerly recommended concurrentbatchprocessor configuration.

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
97 changes: 62 additions & 35 deletions exporter/otelarrowexporter/README.md
@@ -129,12 +129,12 @@ limits the decision to a random subset of `N` streams.

### Matching Metadata Per Stream

The following configuration values allow for separate streams per unique
metadata combination, as shown in the example below:
- `metadata_keys` (default = empty): When set, this exporter will create one
  arrow exporter instance per distinct combination of values in the
  client.Metadata.
- `metadata_cardinality_limit` (default = 1000): When metadata_keys is not empty,
  this setting limits the number of unique combinations of metadata key values
  that will be processed over the lifetime of the exporter.

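For example, to keep a separate set of Arrow streams per tenant, a configuration along the following lines could be used; this is a sketch, and `tenant_id` is an illustrative metadata key, not a default:

```
exporters:
  otelarrow:
    metadata_keys:
      - tenant_id
    metadata_cardinality_limit: 1000
```
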
@@ -267,43 +267,70 @@ exporters:

### Batching Configuration

This exporter includes a new, experimental `batcher` configuration for
batching in the `exporterhelper` module, but this mode is disabled by
default. This batching support works when combined with
`queue_sender` functionality.
This exporter uses the built-in `exporterhelper` support for combined
queue and batch behavior via the `sending_queue` settings. Note that
bytes-based batching is supported, but the exporterhelper estimates
batch sizes using the OTLP representation, without accounting for
Arrow compression.
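
For example, a bytes-based configuration could look like the following sketch; the size values are illustrative, and they are measured against the uncompressed OTLP encoding as described above, not the Arrow stream:

```
exporters:
  otelarrow:
    sending_queue:
      sizer: bytes
      queue_size: 50_000_000
      batch:
        flush_timeout: 1s
        min_size: 1_000_000
        max_size: 2_000_000
```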

In the default configuration, without a persistent storage extension,
the exporter uses an in-memory queue and will respond to the caller
before the export completes.

In the `sending_queue` structure, the default settings set by this
component are:

- `block_on_overflow: true`
- `wait_for_result: false`
- `sizer: items`
- `queue_size: 100_000`
- `batch::flush_timeout: 1s`
- `batch::min_size: 1_000`
- `batch::max_size: 1_500`
- `num_consumers: 100`

These defaults use the in-memory queue, return success to the client
on acceptance while blocking when the queue is full, and allow up to
100 thousand items to be queued, exporting in up to 100 concurrent
batches of 1,000 to 1,500 items each.
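
Spelled out explicitly, these defaults correspond to the following `sending_queue` configuration, shown for reference only; there is no need to set these values unless you want to change them:

```
exporters:
  otelarrow:
    sending_queue:
      block_on_overflow: true
      wait_for_result: false
      sizer: items
      queue_size: 100_000
      num_consumers: 100
      batch:
        flush_timeout: 1s
        min_size: 1_000
        max_size: 1_500
```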

For additional safety in the event of a Collector crash, set
`wait_for_result: true`. The example below demonstrates how to use the
in-memory queue for batching with this additional level of safety:

```
exporters:
  otelarrow:
    batcher:
      enabled: true
      # ...
    sending_queue:
      enabled: true
      storage: file_storage/otc
extensions:
  file_storage/otc:
    directory: /var/lib/storage/otc
```

The built-in batcher is only recommended with a persistent queue;
otherwise it cannot provide back-pressure to the caller. If building
a custom build of the OpenTelemetry Collector, we recommend using the
[Concurrent Batch
Processor](https://github.com/open-telemetry/otel-arrow/blob/main/collector/processor/concurrentbatchprocessor/README.md)
to provide simultaneous back-pressure, concurrency, and batching
functionality. See [more discussion on this
issue](https://github.com/open-telemetry/opentelemetry-collector/issues/10368).

```
exporters:
  otelarrow:
    batcher:
      enabled: false
    sending_queue:
      enabled: false
processors:
  concurrentbatch:
    send_batch_max_size: 1500
    send_batch_size: 1000
    timeout: 1s
```

```
exporters:
  otelarrow:
    sending_queue:
      # Use wait_for_result: true for additional safety; otherwise
      # a collector crash will cause loss of data.
      wait_for_result: true

      # This is the default setting; it ensures the exporter will
      # block the pipeline (subject to deadline) instead of failing
      # fast when overflow data arrives.
      block_on_overflow: true

      # The queue will admit 1 million items and return
      # success before blocking new requests.
      sizer: items
      queue_size: 1_000_000

      # Use relatively large batches; this improves compression.
      batch:
        flush_timeout: 1s
        min_size: 4_000
        max_size: 5_000

      # With max-size batches, we need 200 consumers to keep the
      # OTel-Arrow streams busy. There will be (num_consumers /
      # num_streams) pending requests per stream on average.
      num_consumers: 200

      # Optional persistent storage. If this is set, you
      # can safely use wait_for_result: false above.
      # storage: name_of_extension
```
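
For reference, a persistent queue can be wired in roughly as follows; this is a sketch assuming the `file_storage` storage extension, and the extension name and directory are illustrative:

```
extensions:
  file_storage/otc:
    directory: /var/lib/storage/otc

exporters:
  otelarrow:
    sending_queue:
      # With a storage extension configured, wait_for_result: false
      # (the default) can be used safely, as noted above.
      storage: file_storage/otc

service:
  extensions: [file_storage/otc]
```
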
4 changes: 0 additions & 4 deletions exporter/otelarrowexporter/config.go
@@ -34,10 +34,6 @@ type Config struct {

configgrpc.ClientConfig `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.

// Experimental: This configuration is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved
BatcherConfig exporterhelper.BatcherConfig `mapstructure:"batcher"` //nolint:staticcheck

// Arrow includes settings specific to OTel Arrow.
Arrow ArrowConfig `mapstructure:"arrow"`

23 changes: 10 additions & 13 deletions exporter/otelarrowexporter/config_test.go
@@ -58,10 +58,16 @@ func TestUnmarshalConfig(t *testing.T) {
MaxElapsedTime: 10 * time.Minute,
},
QueueSettings: exporterhelper.QueueBatchConfig{
Enabled: true,
NumConsumers: 2,
QueueSize: 10,
Sizer: exporterhelper.RequestSizerTypeRequests,
Enabled: true,
NumConsumers: 2,
QueueSize: 10,
Sizer: exporterhelper.RequestSizerTypeItems,
BlockOnOverflow: true,
Batch: &exporterhelper.BatchConfig{
FlushTimeout: 200 * time.Millisecond,
MinSize: 1000,
MaxSize: 10000,
},
},
ClientConfig: configgrpc.ClientConfig{
Headers: map[string]configopaque.String{
@@ -86,15 +92,6 @@
BalancerName: "experimental",
Auth: &configauth.Config{AuthenticatorID: component.NewID(component.MustNewType("nop"))},
},
BatcherConfig: exporterhelper.BatcherConfig{ //nolint:staticcheck
Enabled: true,
FlushTimeout: 200 * time.Millisecond,
SizeConfig: exporterhelper.SizeConfig{ //nolint:staticcheck
Sizer: exporterhelper.RequestSizerTypeItems,
MinSize: 1000,
MaxSize: 10000,
},
},
Arrow: ArrowConfig{
NumStreams: 2,
MaxStreamLifetime: 2 * time.Hour,
37 changes: 27 additions & 10 deletions exporter/otelarrowexporter/factory.go
@@ -5,6 +5,7 @@ package otelarrowexporter // import "github.com/open-telemetry/opentelemetry-col

import (
"context"
"time"

arrowpb "github.com/open-telemetry/otel-arrow/api/experimental/arrow/v1"
"go.opentelemetry.io/collector/component"
@@ -35,14 +36,31 @@ func NewFactory() exporter.Factory {
}

func createDefaultConfig() component.Config {
batcherCfg := exporterhelper.NewDefaultBatcherConfig() //nolint:staticcheck
batcherCfg.Enabled = false
// These defaults are taken from the experimental setup used
// in the blog post covering Phase 1 performance results. These
// were the defaults used in the concurrentbatchprocessor, too.
queueCfg := exporterhelper.NewDefaultQueueConfig()
queueCfg.BlockOnOverflow = true
queueCfg.Sizer = exporterhelper.RequestSizerTypeItems
queueCfg.Batch = &exporterhelper.BatchConfig{
FlushTimeout: time.Second,
MinSize: 1000,
MaxSize: 1500,
}
// The default is configured in items; this value represents
// 60-100 concurrent batches.
queueCfg.QueueSize = 100000
// This enables an appropriate default number of consumers.
// Note: for this exporter, a consumer's role is to take from
// the queue and call into an Arrow stream. When the exporter
// falls back to OTLP, this is the number of concurrent OTLP
// exports.
queueCfg.NumConsumers = int(queueCfg.QueueSize / queueCfg.Batch.MinSize)

return &Config{
TimeoutSettings: exporterhelper.NewDefaultTimeoutConfig(),
RetryConfig: configretry.NewDefaultBackOffConfig(),
QueueSettings: exporterhelper.NewDefaultQueueConfig(),
BatcherConfig: batcherCfg,
QueueSettings: queueCfg,
ClientConfig: configgrpc.ClientConfig{
Headers: map[string]configopaque.String{},
// Default to zstd compression
@@ -68,15 +86,14 @@
}
}

func helperOptions(e exp) []exporterhelper.Option {
func helperOptions(e exp, qbs exporterhelper.QueueBatchSettings) []exporterhelper.Option {
cfg := e.getConfig().(*Config)
return []exporterhelper.Option{
exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
exporterhelper.WithTimeout(cfg.TimeoutSettings),
exporterhelper.WithRetry(cfg.RetryConfig),
exporterhelper.WithQueue(cfg.QueueSettings),
exporterhelper.WithQueueBatch(cfg.QueueSettings, qbs),
exporterhelper.WithStart(e.start),
exporterhelper.WithBatcher(cfg.BatcherConfig), //nolint:staticcheck
exporterhelper.WithShutdown(e.shutdown),
}
}
@@ -106,7 +123,7 @@ func createTracesExporter(
}
return exporterhelper.NewTraces(ctx, e.getSettings(), e.getConfig(),
e.pushTraces,
helperOptions(e)...,
helperOptions(e, exporterhelper.NewTracesQueueBatchSettings())...,
)
}

@@ -125,7 +142,7 @@ func createMetricsExporter(
}
return exporterhelper.NewMetrics(ctx, e.getSettings(), e.getConfig(),
e.pushMetrics,
helperOptions(e)...,
helperOptions(e, exporterhelper.NewMetricsQueueBatchSettings())...,
)
}

@@ -144,6 +161,6 @@ func createLogsExporter(
}
return exporterhelper.NewLogs(ctx, e.getSettings(), e.getConfig(),
e.pushLogs,
helperOptions(e)...,
helperOptions(e, exporterhelper.NewLogsQueueBatchSettings())...,
)
}
5 changes: 4 additions & 1 deletion exporter/otelarrowexporter/factory_test.go
@@ -34,7 +34,10 @@ func TestCreateDefaultConfig(t *testing.T) {
assert.NoError(t, componenttest.CheckConfigStruct(cfg))
ocfg := factory.CreateDefaultConfig().(*Config)
assert.Equal(t, ocfg.RetryConfig, configretry.NewDefaultBackOffConfig())
assert.Equal(t, ocfg.QueueSettings, exporterhelper.NewDefaultQueueConfig())

// We customize the queue/batch settings.
assert.NotEqual(t, ocfg.QueueSettings, exporterhelper.NewDefaultQueueConfig())

assert.Equal(t, ocfg.TimeoutSettings, exporterhelper.NewDefaultTimeoutConfig())
assert.Equal(t, configcompression.TypeZstd, ocfg.Compression)
assert.Equal(t, ArrowConfig{
6 changes: 6 additions & 0 deletions exporter/otelarrowexporter/internal/arrow/exporter.go
@@ -40,6 +40,12 @@
)

const (
// DefaultProducersPerStream is the factor used to configure
// the combined exporterhelper batch/queue function, which has
// a num_consumers parameter. That field is set to this factor
// times the number of streams by default.
DefaultProducersPerStream = 10

// DefaultMaxStreamLifetime is 30 seconds, because the
// marginal compression benefit of a longer OTel-Arrow stream
// is limited after 100s of batches.
1 change: 1 addition & 0 deletions exporter/otelarrowexporter/metadata.yaml
@@ -11,3 +11,4 @@ status:
tests:
config:
endpoint: http://127.0.0.1:4317

13 changes: 6 additions & 7 deletions exporter/otelarrowexporter/otelarrow_test.go
@@ -483,17 +483,17 @@ func TestSendTracesWhenEndpointHasHttpScheme(t *testing.T) {
// Ensure that initially there is no data in the receiver.
assert.EqualValues(t, 0, rcv.requestCount.Load())

// Send empty trace.
td := ptrace.NewTraces()
// Send 2 spans.
td := testdata.GenerateTraces(2)
assert.NoError(t, exp.ConsumeTraces(context.Background(), td))

// Wait until it is received.
// Wait until received.
assert.Eventually(t, func() bool {
return rcv.requestCount.Load() > 0
}, 10*time.Second, 5*time.Millisecond)

// Ensure it was received empty.
assert.EqualValues(t, 0, rcv.totalItems.Load())
// Ensure all were received.
assert.EqualValues(t, 2, rcv.totalItems.Load())
})
}
}
@@ -748,7 +748,7 @@ func TestSendTracesOnResourceExhaustion(t *testing.T) {

assert.EqualValues(t, 0, rcv.requestCount.Load())

td := ptrace.NewTraces()
td := testdata.GenerateTraces(2)
assert.NoError(t, exp.ConsumeTraces(context.Background(), td))

assert.Never(t, func() bool {
@@ -934,7 +934,6 @@ func testSendArrowTraces(t *testing.T, clientWaitForReady, streamServiceAvailabl
cfg.QueueSettings.Enabled = false

set := exportertest.NewNopSettings(factory.Type())
set.Logger = zaptest.NewLogger(t)
exp, err := factory.CreateTraces(context.Background(), set, cfg)
require.NoError(t, err)
require.NotNil(t, exp)
9 changes: 4 additions & 5 deletions exporter/otelarrowexporter/testdata/config.yaml
@@ -7,6 +7,10 @@ sending_queue:
  enabled: true
  num_consumers: 2
  queue_size: 10
  batch:
    flush_timeout: 200ms
    min_size: 1000
    max_size: 10000
retry_on_failure:
  enabled: true
  initial_interval: 10s
@@ -25,11 +29,6 @@
  timeout: 30s
  permit_without_stream: true
balancer_name: "experimental"
batcher:
  enabled: true
  flush_timeout: 200ms
  min_size: 1000
  max_size: 10000
arrow:
  num_streams: 2
  disabled: false