Skip to content

Commit d25590d

Browse files
committed
Add processor duration metric.
1 parent 8cc2954 commit d25590d

15 files changed

+206
-8
lines changed

.chloggen/processor-duration.yaml

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
7+
component: processorhelper
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Add processor duration metric.
11+
12+
# One or more tracking issues or pull requests related to the change
13+
issues: [13231]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# Optional: The change log or logs in which this entry should be included.
21+
# e.g. '[user]' or '[user, api]'
22+
# Include 'user' if the change is relevant to end users.
23+
# Include 'api' if there is a change to a library API.
24+
# Default: '[user]'
25+
change_logs: [user]

docs/rfcs/component-universal-telemetry.md

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,11 +80,12 @@ package is internal, then the scope name should be that of the module which cont
8080

8181
### Auto-Instrumented Metrics
8282

83-
There are two straightforward measurements that can be made on any pdata:
83+
There are a few measurements that can be made on any pdata:
8484

85-
1. A count of "items" (spans, data points, or log records). These are low cost but broadly useful, so they should be enabled by default.
86-
2. A measure of size, based on [ProtoMarshaler.Sizer()](https://github.com/open-telemetry/opentelemetry-collector/blob/9907ba50df0d5853c34d2962cf21da42e15a560d/pdata/ptrace/pb.go#l11).
85+
1. Count of "items" (spans, data points, or log records). These are low cost but broadly useful, so they should be enabled by default.
86+
1. Measure of size, based on [ProtoMarshaler.Sizer()](https://github.com/open-telemetry/opentelemetry-collector/blob/9907ba50df0d5853c34d2962cf21da42e15a560d/pdata/ptrace/pb.go#L11).
8787
These may be high cost to compute, so by default they should be disabled (and not calculated). This default setting may change in the future if it is demonstrated that the cost is generally acceptable.
88+
1. Duration: the time a component takes to process items. This measurement is especially useful for tuning processors with high computational overhead.
8889

8990
The location of these measurements can be described in terms of whether the data is "consumed" or "produced", from the perspective of the
9091
component to which the telemetry is attributed. Metrics which contain the term "produced" describe data which is emitted from the component,

processor/processorhelper/documentation.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,14 @@
66

77
The following telemetry is emitted by this component.
88

9+
### otelcol_processor_duration
10+
11+
Time taken by the processor to complete. [alpha]
12+
13+
| Unit | Metric Type | Value Type |
14+
| ---- | ----------- | ---------- |
15+
| s | Histogram | Double |
16+
917
### otelcol_processor_incoming_items
1018

1119
Number of items passed to the processor. [alpha]

processor/processorhelper/generated_package_test.go

Lines changed: 1 addition & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

processor/processorhelper/internal/metadata/generated_telemetry.go

Lines changed: 7 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

processor/processorhelper/internal/metadatatest/generated_telemetrytest.go

Lines changed: 16 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

processor/processorhelper/internal/metadatatest/generated_telemetrytest_test.go

Lines changed: 4 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

processor/processorhelper/logs.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,10 @@ func NewLogs(
4949
logsConsumer, err := consumer.NewLogs(func(ctx context.Context, ld plog.Logs) error {
5050
span := trace.SpanFromContext(ctx)
5151
span.AddEvent("Start processing.", eventOptions)
52+
53+
ps := newProcessorStart()
54+
defer obs.recordDuration(ctx, ps)
55+
5256
recordsIn := ld.LogRecordCount()
5357

5458
var errFunc error

processor/processorhelper/logs_test.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,40 @@ func TestLogs_RecordIn_ErrorOut(t *testing.T) {
183183
}, metricdatatest.IgnoreTimestamp())
184184
}
185185

186+
func TestLogs_ProcessDuration(t *testing.T) {
187+
// Regardless of how many logs are ingested, emit just one
188+
mockAggregate := func(_ context.Context, _ plog.Logs) (plog.Logs, error) {
189+
ld := plog.NewLogs()
190+
ld.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()
191+
return ld, nil
192+
}
193+
194+
incomingLogs := plog.NewLogs()
195+
incomingLogRecords := incomingLogs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords()
196+
197+
// Add 3 records to the incoming
198+
incomingLogRecords.AppendEmpty()
199+
incomingLogRecords.AppendEmpty()
200+
incomingLogRecords.AppendEmpty()
201+
202+
tel := componenttest.NewTelemetry()
203+
lp, err := NewLogs(context.Background(), newSettings(tel), &testLogsCfg, consumertest.NewNop(), mockAggregate)
204+
require.NoError(t, err)
205+
206+
assert.NoError(t, lp.Start(context.Background(), componenttest.NewNopHost()))
207+
assert.NoError(t, lp.ConsumeLogs(context.Background(), incomingLogs))
208+
assert.NoError(t, lp.Shutdown(context.Background()))
209+
210+
metadatatest.AssertEqualProcessorDuration(t, tel,
211+
[]metricdata.HistogramDataPoint[float64]{
212+
{
213+
Count: 1,
214+
BucketCounts: []uint64{1},
215+
Attributes: attribute.NewSet(attribute.String("processor", "processorhelper"), attribute.String("otel.signal", "logs")),
216+
},
217+
}, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
218+
}
219+
186220
func newSettings(tel *componenttest.Telemetry) processor.Settings {
187221
set := processortest.NewNopSettings(component.MustNewType("processorhelper"))
188222
set.TelemetrySettings = tel.NewTelemetrySettings()

processor/processorhelper/metadata.yaml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,3 +28,13 @@ telemetry:
2828
sum:
2929
value_type: int
3030
monotonic: true
31+
32+
processor_duration:
33+
enabled: true
34+
stability:
35+
level: alpha
36+
description: Time taken by the processor to complete.
37+
unit: s
38+
histogram:
39+
async: false
40+
value_type: double

processor/processorhelper/metrics.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,10 @@ func NewMetrics(
4949
metricsConsumer, err := consumer.NewMetrics(func(ctx context.Context, md pmetric.Metrics) error {
5050
span := trace.SpanFromContext(ctx)
5151
span.AddEvent("Start processing.", eventOptions)
52+
53+
ps := newProcessorStart()
54+
defer obs.recordDuration(ctx, ps)
55+
5256
pointsIn := md.DataPointCount()
5357

5458
var errFunc error

processor/processorhelper/metrics_test.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,3 +180,38 @@ func TestMetrics_RecordIn_ErrorOut(t *testing.T) {
180180
},
181181
}, metricdatatest.IgnoreTimestamp())
182182
}
183+
184+
func TestMetrics_ProcesDuration(t *testing.T) {
185+
// Regardless of how many data points are ingested, emit 3
186+
mockAggregate := func(_ context.Context, _ pmetric.Metrics) (pmetric.Metrics, error) {
187+
md := pmetric.NewMetrics()
188+
md.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetEmptySum().DataPoints().AppendEmpty()
189+
md.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetEmptySum().DataPoints().AppendEmpty()
190+
md.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetEmptySum().DataPoints().AppendEmpty()
191+
return md, nil
192+
}
193+
194+
incomingMetrics := pmetric.NewMetrics()
195+
dps := incomingMetrics.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetEmptySum().DataPoints()
196+
197+
// Add 2 data points to the incoming
198+
dps.AppendEmpty()
199+
dps.AppendEmpty()
200+
201+
tel := componenttest.NewTelemetry()
202+
mp, err := NewMetrics(context.Background(), newSettings(tel), &testMetricsCfg, consumertest.NewNop(), mockAggregate)
203+
require.NoError(t, err)
204+
205+
assert.NoError(t, mp.Start(context.Background(), componenttest.NewNopHost()))
206+
assert.NoError(t, mp.ConsumeMetrics(context.Background(), incomingMetrics))
207+
assert.NoError(t, mp.Shutdown(context.Background()))
208+
209+
metadatatest.AssertEqualProcessorDuration(t, tel,
210+
[]metricdata.HistogramDataPoint[float64]{
211+
{
212+
Count: 1,
213+
BucketCounts: []uint64{1},
214+
Attributes: attribute.NewSet(attribute.String("processor", "processorhelper"), attribute.String("otel.signal", "metrics")),
215+
},
216+
}, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
217+
}

processor/processorhelper/obsreport.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package processorhelper // import "go.opentelemetry.io/collector/processor/proce
55

66
import (
77
"context"
8+
"time"
89

910
"go.opentelemetry.io/otel/attribute"
1011
"go.opentelemetry.io/otel/metric"
@@ -40,3 +41,17 @@ func (or *obsReport) recordInOut(ctx context.Context, incoming, outgoing int) {
4041
or.telemetryBuilder.ProcessorIncomingItems.Add(ctx, int64(incoming), or.otelAttrs)
4142
or.telemetryBuilder.ProcessorOutgoingItems.Add(ctx, int64(outgoing), or.otelAttrs)
4243
}
44+
45+
func (or *obsReport) recordDuration(ctx context.Context, ps processorStart) {
46+
duration := time.Since(ps.Time)
47+
durationSecs := float64(duration.Microseconds()) / 1e6 // Convert to seconds
48+
or.telemetryBuilder.ProcessorDuration.Record(ctx, float64(durationSecs), or.otelAttrs)
49+
}
50+
51+
// processorStart holds the instant at which a processing call began. It embeds
// time.Time, so the timestamp and its methods are available directly on the value.
type processorStart struct {
	time.Time
}

// newProcessorStart returns a processorStart stamped with the current time.
func newProcessorStart() processorStart {
	now := time.Now()
	return processorStart{Time: now}
}

processor/processorhelper/traces.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,10 @@ func NewTraces(
4949
traceConsumer, err := consumer.NewTraces(func(ctx context.Context, td ptrace.Traces) error {
5050
span := trace.SpanFromContext(ctx)
5151
span.AddEvent("Start processing.", eventOptions)
52+
53+
ps := newProcessorStart()
54+
defer obs.recordDuration(ctx, ps)
55+
5256
spansIn := td.SpanCount()
5357

5458
var errFunc error

processor/processorhelper/traces_test.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,3 +184,38 @@ func TestTraces_RecordIn_ErrorOut(t *testing.T) {
184184
},
185185
}, metricdatatest.IgnoreTimestamp())
186186
}
187+
188+
func TestTraces_ProcessDuration(t *testing.T) {
189+
// Regardless of how many spans are ingested, emit just one
190+
mockAggregate := func(_ context.Context, _ ptrace.Traces) (ptrace.Traces, error) {
191+
td := ptrace.NewTraces()
192+
td.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty()
193+
return td, nil
194+
}
195+
196+
incomingTraces := ptrace.NewTraces()
197+
incomingSpans := incomingTraces.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans()
198+
199+
// Add 4 records to the incoming
200+
incomingSpans.AppendEmpty()
201+
incomingSpans.AppendEmpty()
202+
incomingSpans.AppendEmpty()
203+
incomingSpans.AppendEmpty()
204+
205+
tel := componenttest.NewTelemetry()
206+
tp, err := NewTraces(context.Background(), newSettings(tel), &testLogsCfg, consumertest.NewNop(), mockAggregate)
207+
require.NoError(t, err)
208+
209+
assert.NoError(t, tp.Start(context.Background(), componenttest.NewNopHost()))
210+
assert.NoError(t, tp.ConsumeTraces(context.Background(), incomingTraces))
211+
assert.NoError(t, tp.Shutdown(context.Background()))
212+
213+
metadatatest.AssertEqualProcessorDuration(t, tel,
214+
[]metricdata.HistogramDataPoint[float64]{
215+
{
216+
Count: 1,
217+
BucketCounts: []uint64{1},
218+
Attributes: attribute.NewSet(attribute.String("processor", "processorhelper"), attribute.String("otel.signal", "traces")),
219+
},
220+
}, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
221+
}

0 commit comments

Comments
 (0)