Skip to content

[chore] Revert dc8e2dd #12917

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 0 additions & 28 deletions .chloggen/lock-attributes-gate.yaml

This file was deleted.

10 changes: 8 additions & 2 deletions internal/telemetry/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ import (

var NewPipelineTelemetryGate = featuregate.GlobalRegistry().MustRegister(
"telemetry.newPipelineTelemetry",
featuregate.StageStable,
featuregate.StageAlpha,
featuregate.WithRegisterFromVersion("v0.123.0"),
featuregate.WithRegisterToVersion("v0.127.0"),
featuregate.WithRegisterDescription("Instruments Collector pipelines and injects component-identifying attributes"),
featuregate.WithRegisterReferenceURL("https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/rfcs/component-universal-telemetry.md"),
)

Expand Down Expand Up @@ -46,10 +46,16 @@ type TelemetrySettings struct {
// The publicization of this API is tracked in https://github.com/open-telemetry/opentelemetry-collector/issues/12405

func WithoutAttributes(ts TelemetrySettings, fields ...string) TelemetrySettings {
if !NewPipelineTelemetryGate.IsEnabled() {
return ts
}
return WithAttributeSet(ts, componentattribute.RemoveAttributes(ts.extraAttributes, fields...))
}

func WithAttributeSet(ts TelemetrySettings, attrs attribute.Set) TelemetrySettings {
if !NewPipelineTelemetryGate.IsEnabled() {
return ts
}
ts.extraAttributes = attrs
ts.Logger = componentattribute.ZapLoggerWithAttributes(ts.Logger, ts.extraAttributes)
ts.TracerProvider = componentattribute.TracerProviderWithAttributes(ts.TracerProvider, ts.extraAttributes)
Expand Down
11 changes: 11 additions & 0 deletions processor/memorylimiterprocessor/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/collector/internal/memorylimiter"
"go.opentelemetry.io/collector/internal/telemetry"
"go.opentelemetry.io/collector/internal/telemetry/componentattribute"
Expand All @@ -33,7 +34,17 @@ func TestCreateDefaultConfig(t *testing.T) {
assert.NoError(t, componenttest.CheckConfigStruct(cfg))
}

func setGate(t *testing.T, gate *featuregate.Gate, value bool) {
initialValue := gate.IsEnabled()
require.NoError(t, featuregate.GlobalRegistry().Set(gate.ID(), value))
t.Cleanup(func() {
_ = featuregate.GlobalRegistry().Set(gate.ID(), initialValue)
})
}

func TestCreateProcessor(t *testing.T) {
setGate(t, telemetry.NewPipelineTelemetryGate, true)

factory := NewFactory()
require.NotNil(t, factory)

Expand Down
2 changes: 1 addition & 1 deletion processor/memorylimiterprocessor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
go.opentelemetry.io/collector/consumer/consumererror v0.124.0
go.opentelemetry.io/collector/consumer/consumertest v0.124.0
go.opentelemetry.io/collector/consumer/xconsumer v0.124.0
go.opentelemetry.io/collector/featuregate v1.30.0
go.opentelemetry.io/collector/internal/memorylimiter v0.124.0
go.opentelemetry.io/collector/internal/telemetry v0.124.0
go.opentelemetry.io/collector/pdata v1.30.0
Expand Down Expand Up @@ -57,7 +58,6 @@ require (
github.com/yusufpapurcu/wmi v1.2.4 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/collector/component/componentstatus v0.124.0 // indirect
go.opentelemetry.io/collector/featuregate v1.30.0 // indirect
go.opentelemetry.io/collector/pdata/testdata v0.124.0 // indirect
go.opentelemetry.io/contrib/bridges/otelzap v0.10.0 // indirect
go.opentelemetry.io/otel/log v0.11.0 // indirect
Expand Down
11 changes: 11 additions & 0 deletions receiver/otlpreceiver/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/consumer/xconsumer"
"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/collector/internal/telemetry"
"go.opentelemetry.io/collector/internal/telemetry/componentattribute"
"go.opentelemetry.io/collector/internal/testutil"
Expand All @@ -36,7 +37,17 @@ func TestCreateDefaultConfig(t *testing.T) {
assert.NoError(t, componenttest.CheckConfigStruct(cfg))
}

func setGate(t *testing.T, gate *featuregate.Gate, value bool) {
initialValue := gate.IsEnabled()
require.NoError(t, featuregate.GlobalRegistry().Set(gate.ID(), value))
t.Cleanup(func() {
_ = featuregate.GlobalRegistry().Set(gate.ID(), initialValue)
})
}

func TestCreateSameReceiver(t *testing.T) {
setGate(t, telemetry.NewPipelineTelemetryGate, true)

factory := NewFactory()
cfg := factory.CreateDefaultConfig().(*Config)
cfg.GRPC.NetAddr.Endpoint = testutil.GetAvailableLocalAddress(t)
Expand Down
2 changes: 1 addition & 1 deletion receiver/otlpreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ require (
go.opentelemetry.io/collector/consumer/consumererror v0.124.0
go.opentelemetry.io/collector/consumer/consumertest v0.124.0
go.opentelemetry.io/collector/consumer/xconsumer v0.124.0
go.opentelemetry.io/collector/featuregate v1.30.0
go.opentelemetry.io/collector/internal/sharedcomponent v0.124.0
go.opentelemetry.io/collector/internal/telemetry v0.124.0
go.opentelemetry.io/collector/pdata v1.30.0
Expand Down Expand Up @@ -68,7 +69,6 @@ require (
go.opentelemetry.io/collector/config/configmiddleware v0.0.0-20250422165940-c47951a8bf71 // indirect
go.opentelemetry.io/collector/extension/extensionauth v1.30.0 // indirect
go.opentelemetry.io/collector/extension/extensionmiddleware v0.0.0-20250422165940-c47951a8bf71 // indirect
go.opentelemetry.io/collector/featuregate v1.30.0 // indirect
go.opentelemetry.io/collector/pipeline v0.124.0 // indirect
go.opentelemetry.io/contrib/bridges/otelzap v0.10.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.60.0 // indirect
Expand Down
2 changes: 1 addition & 1 deletion service/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ require (
go.opentelemetry.io/collector/receiver/xreceiver v0.124.0
go.opentelemetry.io/collector/semconv v0.124.0
go.opentelemetry.io/collector/service/hostcapabilities v0.124.0
go.opentelemetry.io/contrib/bridges/otelzap v0.10.0
go.opentelemetry.io/contrib/otelconf v0.15.0
go.opentelemetry.io/contrib/propagators/b3 v1.35.0
go.opentelemetry.io/otel v1.35.0
Expand Down Expand Up @@ -109,7 +110,6 @@ require (
go.opentelemetry.io/collector/consumer/consumererror v0.124.0 // indirect
go.opentelemetry.io/collector/extension/extensionauth v1.30.0 // indirect
go.opentelemetry.io/collector/extension/extensionmiddleware v0.0.0-20250422165940-c47951a8bf71 // indirect
go.opentelemetry.io/contrib/bridges/otelzap v0.10.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.60.0 // indirect
go.opentelemetry.io/contrib/zpages v0.60.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc v0.11.0 // indirect
Expand Down
8 changes: 3 additions & 5 deletions service/internal/graph/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,10 @@
builder *builders.ConnectorBuilder,
nexts []baseConsumer,
) error {
set := connector.Settings{
ID: n.componentID,
TelemetrySettings: telemetry.WithAttributeSet(tel, *n.Set()),
BuildInfo: info,
set := connector.Settings{ID: n.componentID, TelemetrySettings: tel, BuildInfo: info}
if telemetry.NewPipelineTelemetryGate.IsEnabled() {
set.TelemetrySettings = telemetry.WithAttributeSet(set.TelemetrySettings, *n.Set())

Check warning on line 54 in service/internal/graph/connector.go

View check run for this annotation

Codecov / codecov/patch

service/internal/graph/connector.go#L54

Added line #L54 was not covered by tests
}

switch n.rcvrPipelineType {
case pipeline.SignalTraces:
return n.buildTraces(ctx, set, builder, nexts)
Expand Down
8 changes: 3 additions & 5 deletions service/internal/graph/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,10 @@
info component.BuildInfo,
builder *builders.ExporterBuilder,
) error {
set := exporter.Settings{
ID: n.componentID,
TelemetrySettings: telemetry.WithAttributeSet(tel, *n.Set()),
BuildInfo: info,
set := exporter.Settings{ID: n.componentID, TelemetrySettings: tel, BuildInfo: info}
if telemetry.NewPipelineTelemetryGate.IsEnabled() {
set.TelemetrySettings = telemetry.WithAttributeSet(set.TelemetrySettings, *n.Set())

Check warning on line 50 in service/internal/graph/exporter.go

View check run for this annotation

Codecov / codecov/patch

service/internal/graph/exporter.go#L50

Added line #L50 was not covered by tests
}

var err error
switch n.pipelineType {
case pipeline.SignalTraces:
Expand Down
8 changes: 3 additions & 5 deletions service/internal/graph/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,10 @@
builder *builders.ProcessorBuilder,
next baseConsumer,
) error {
set := processor.Settings{
ID: n.componentID,
TelemetrySettings: telemetry.WithAttributeSet(tel, *n.Set()),
BuildInfo: info,
set := processor.Settings{ID: n.componentID, TelemetrySettings: tel, BuildInfo: info}
if telemetry.NewPipelineTelemetryGate.IsEnabled() {
set.TelemetrySettings = telemetry.WithAttributeSet(set.TelemetrySettings, *n.Set())

Check warning on line 52 in service/internal/graph/processor.go

View check run for this annotation

Codecov / codecov/patch

service/internal/graph/processor.go#L52

Added line #L52 was not covered by tests
}

var err error
switch n.pipelineID.Signal() {
case pipeline.SignalTraces:
Expand Down
8 changes: 3 additions & 5 deletions service/internal/graph/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,10 @@
builder *builders.ReceiverBuilder,
nexts []baseConsumer,
) error {
set := receiver.Settings{
ID: n.componentID,
TelemetrySettings: telemetry.WithAttributeSet(tel, *n.Set()),
BuildInfo: info,
set := receiver.Settings{ID: n.componentID, TelemetrySettings: tel, BuildInfo: info}
if telemetry.NewPipelineTelemetryGate.IsEnabled() {
set.TelemetrySettings = telemetry.WithAttributeSet(set.TelemetrySettings, *n.Set())

Check warning on line 47 in service/internal/graph/receiver.go

View check run for this annotation

Codecov / codecov/patch

service/internal/graph/receiver.go#L47

Added line #L47 was not covered by tests
}

var err error
switch n.pipelineType {
case pipeline.SignalTraces:
Expand Down
55 changes: 41 additions & 14 deletions service/telemetry/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@
package telemetry // import "go.opentelemetry.io/collector/service/telemetry"

import (
"go.opentelemetry.io/contrib/bridges/otelzap"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/log"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"go.opentelemetry.io/collector/internal/telemetry"
"go.opentelemetry.io/collector/internal/telemetry/componentattribute"
)

Expand Down Expand Up @@ -41,29 +43,54 @@

var lp log.LoggerProvider

logger = logger.WithOptions(zap.WrapCore(func(core zapcore.Core) zapcore.Core {
core = componentattribute.NewConsoleCoreWithAttributes(core, attribute.NewSet())
if telemetry.NewPipelineTelemetryGate.IsEnabled() {
logger = logger.WithOptions(zap.WrapCore(func(core zapcore.Core) zapcore.Core {
core = componentattribute.NewConsoleCoreWithAttributes(core, attribute.NewSet())

if len(cfg.Logs.Processors) > 0 && set.SDK != nil {
lp = set.SDK.LoggerProvider()

core = componentattribute.NewOTelTeeCoreWithAttributes(
core,
lp,
"go.opentelemetry.io/collector/service/telemetry",
cfg.Logs.Level,
attribute.NewSet(),
)
}

if cfg.Logs.Sampling != nil && cfg.Logs.Sampling.Enabled {
core = componentattribute.NewWrapperCoreWithAttributes(core, func(c zapcore.Core) zapcore.Core {
return newSampledCore(c, cfg.Logs.Sampling)
})
}

return core
}))
} else {
if len(cfg.Logs.Processors) > 0 && set.SDK != nil {
lp = set.SDK.LoggerProvider()

core = componentattribute.NewOTelTeeCoreWithAttributes(
core,
lp,
"go.opentelemetry.io/collector/service/telemetry",
cfg.Logs.Level,
attribute.NewSet(),
)
logger = logger.WithOptions(zap.WrapCore(func(c zapcore.Core) zapcore.Core {
core, err := zapcore.NewIncreaseLevelCore(zapcore.NewTee(
c,
otelzap.NewCore("go.opentelemetry.io/collector/service/telemetry",
otelzap.WithLoggerProvider(lp),
),
), zap.NewAtomicLevelAt(cfg.Logs.Level))
if err != nil {
panic(err)

Check warning on line 82 in service/telemetry/logger.go

View check run for this annotation

Codecov / codecov/patch

service/telemetry/logger.go#L82

Added line #L82 was not covered by tests
}
return core
}))
}

if cfg.Logs.Sampling != nil && cfg.Logs.Sampling.Enabled {
core = componentattribute.NewWrapperCoreWithAttributes(core, func(c zapcore.Core) zapcore.Core {
logger = logger.WithOptions(zap.WrapCore(func(c zapcore.Core) zapcore.Core {
return newSampledCore(c, cfg.Logs.Sampling)
})
}))
}

return core
}))
}

return logger, lp, nil
}
Expand Down
39 changes: 30 additions & 9 deletions service/telemetry/logger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,26 @@ import (
"github.com/stretchr/testify/require"
config "go.opentelemetry.io/contrib/otelconf/v0.3.0"
"go.uber.org/zap/zapcore"

"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/collector/internal/telemetry"
)

func setGate(t *testing.T, gate *featuregate.Gate, value bool) {
initialValue := gate.IsEnabled()
require.NoError(t, featuregate.GlobalRegistry().Set(gate.ID(), value))
t.Cleanup(func() {
_ = featuregate.GlobalRegistry().Set(gate.ID(), initialValue)
})
}

func TestNewLogger(t *testing.T) {
tests := []struct {
name string
wantCoreType any
wantErr error
cfg Config
name string
wantCoreType any
wantCoreTypeRfc any
wantErr error
cfg Config
}{
{
name: "no log config",
Expand All @@ -40,7 +52,8 @@ func TestNewLogger(t *testing.T) {
InitialFields: map[string]any{"fieldKey": "filed-value"},
},
},
wantCoreType: "*componentattribute.consoleCoreWithAttributes",
wantCoreType: "*zapcore.ioCore",
wantCoreTypeRfc: "*componentattribute.consoleCoreWithAttributes",
},
{
name: "log config with processors",
Expand All @@ -63,7 +76,8 @@ func TestNewLogger(t *testing.T) {
},
},
},
wantCoreType: "*componentattribute.otelTeeCoreWithAttributes",
wantCoreType: "*zapcore.levelFilterCore",
wantCoreTypeRfc: "*componentattribute.otelTeeCoreWithAttributes",
},
{
name: "log config with sampling",
Expand All @@ -85,7 +99,8 @@ func TestNewLogger(t *testing.T) {
InitialFields: map[string]any(nil),
},
},
wantCoreType: "*componentattribute.wrapperCoreWithAttributes",
wantCoreType: "*zapcore.sampler",
wantCoreTypeRfc: "*componentattribute.wrapperCoreWithAttributes",
},
}
for _, tt := range tests {
Expand All @@ -110,7 +125,13 @@ func TestNewLogger(t *testing.T) {
}
}
}

testCoreType(t, tt.wantCoreType)
t.Run(tt.name, func(t *testing.T) {
setGate(t, telemetry.NewPipelineTelemetryGate, false)
testCoreType(t, tt.wantCoreType)
})
t.Run(tt.name+" (pipeline telemetry on)", func(t *testing.T) {
setGate(t, telemetry.NewPipelineTelemetryGate, true)
testCoreType(t, tt.wantCoreTypeRfc)
})
}
}
Loading