Skip to content

Commit e904848

Browse files
authored
Ensure context propagated, logger enhancement (#829)
Signed-off-by: Bogdan Drutu <[email protected]>
1 parent 995b153 commit e904848

9 files changed

+139
-119
lines changed

service/builder/builder.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,13 @@ const (
2323
// flags
2424
configCfg = "config"
2525
memBallastFlag = "mem-ballast-size-mib"
26+
27+
kindLogKey = "component_kind"
28+
kindLogReceiver = "receiver"
29+
kindLogProcessor = "processor"
30+
kindLogExporter = "exporter"
31+
typeLogKey = "component_type"
32+
nameLogKey = "component_name"
2633
)
2734

2835
var (

service/builder/exporters_builder.go

Lines changed: 25 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -29,22 +29,23 @@ import (
2929
// builtExporter is an exporter that is built based on a config. It can have
3030
// a trace and/or a metrics consumer and have a shutdown function.
3131
type builtExporter struct {
32-
te component.TraceExporterBase
33-
me component.MetricsExporterBase
32+
logger *zap.Logger
33+
te component.TraceExporterBase
34+
me component.MetricsExporterBase
3435
}
3536

3637
// Start the exporter.
3738
func (exp *builtExporter) Start(ctx context.Context, host component.Host) error {
3839
var errors []error
3940
if exp.te != nil {
40-
err := exp.te.Start(context.Background(), host)
41+
err := exp.te.Start(ctx, host)
4142
if err != nil {
4243
errors = append(errors, err)
4344
}
4445
}
4546

4647
if exp.me != nil {
47-
err := exp.me.Start(context.Background(), host)
48+
err := exp.me.Start(ctx, host)
4849
if err != nil {
4950
errors = append(errors, err)
5051
}
@@ -54,16 +55,16 @@ func (exp *builtExporter) Start(ctx context.Context, host component.Host) error
5455
}
5556

5657
// Shutdown the trace component and the metrics component of an exporter.
57-
func (exp *builtExporter) Shutdown(context.Context) error {
58+
func (exp *builtExporter) Shutdown(ctx context.Context) error {
5859
var errors []error
5960
if exp.te != nil {
60-
if err := exp.te.Shutdown(context.Background()); err != nil {
61+
if err := exp.te.Shutdown(ctx); err != nil {
6162
errors = append(errors, err)
6263
}
6364
}
6465

6566
if exp.me != nil {
66-
if err := exp.me.Shutdown(context.Background()); err != nil {
67+
if err := exp.me.Shutdown(ctx); err != nil {
6768
errors = append(errors, err)
6869
}
6970
}
@@ -75,23 +76,23 @@ func (exp *builtExporter) Shutdown(context.Context) error {
7576
type Exporters map[configmodels.Exporter]*builtExporter
7677

7778
// StartAll starts all exporters.
78-
func (exps Exporters) StartAll(logger *zap.Logger, host component.Host) error {
79-
for cfg, exp := range exps {
80-
logger.Info("Exporter is starting...", zap.String("exporter", cfg.Name()))
79+
func (exps Exporters) StartAll(ctx context.Context, host component.Host) error {
80+
for _, exp := range exps {
81+
exp.logger.Info("Exporter is starting...")
8182

82-
if err := exp.Start(context.Background(), host); err != nil {
83+
if err := exp.Start(ctx, host); err != nil {
8384
return err
8485
}
85-
logger.Info("Exporter started.", zap.String("exporter", cfg.Name()))
86+
exp.logger.Info("Exporter started.")
8687
}
8788
return nil
8889
}
8990

9091
// ShutdownAll stops all exporters.
91-
func (exps Exporters) ShutdownAll() error {
92+
func (exps Exporters) ShutdownAll(ctx context.Context) error {
9293
var errs []error
9394
for _, exp := range exps {
94-
err := exp.Shutdown(context.Background())
95+
err := exp.Shutdown(ctx)
9596
if err != nil {
9697
errs = append(errs, err)
9798
}
@@ -127,7 +128,7 @@ func NewExportersBuilder(
127128
config *configmodels.Config,
128129
factories map[string]component.ExporterFactoryBase,
129130
) *ExportersBuilder {
130-
return &ExportersBuilder{logger, config, factories}
131+
return &ExportersBuilder{logger.With(zap.String(kindLogKey, kindLogExporter)), config, factories}
131132
}
132133

133134
// BuildExporters exporters from config.
@@ -140,7 +141,8 @@ func (eb *ExportersBuilder) Build() (Exporters, error) {
140141

141142
// BuildExporters exporters based on configuration and required input data types.
142143
for _, cfg := range eb.config.Exporters {
143-
exp, err := eb.buildExporter(cfg, exporterInputDataTypes)
144+
componentLogger := eb.logger.With(zap.String(typeLogKey, cfg.Type()), zap.String(nameLogKey, cfg.Name()))
145+
exp, err := eb.buildExporter(componentLogger, cfg, exporterInputDataTypes)
144146
if err != nil {
145147
return nil, err
146148
}
@@ -185,6 +187,7 @@ func (eb *ExportersBuilder) calcExportersRequiredDataTypes() exportersRequiredDa
185187
}
186188

187189
func (eb *ExportersBuilder) buildExporter(
190+
logger *zap.Logger,
188191
config configmodels.Exporter,
189192
exportersInputDataTypes exportersRequiredDataTypes,
190193
) (*builtExporter, error) {
@@ -193,22 +196,23 @@ func (eb *ExportersBuilder) buildExporter(
193196
return nil, fmt.Errorf("exporter factory not found for type: %s", config.Type())
194197
}
195198

196-
exporter := &builtExporter{}
199+
exporter := &builtExporter{
200+
logger: logger,
201+
}
197202

198203
inputDataTypes := exportersInputDataTypes[config]
199204
if inputDataTypes == nil {
200205
// TODO https://github.com/open-telemetry/opentelemetry-collector/issues/294
201206
// Move this validation to config/config.go:validateConfig
202207
// No data types where requested for this exporter. This can only happen
203208
// if there are no pipelines associated with the exporter.
204-
eb.logger.Warn("Exporter " + config.Name() +
205-
" is not associated with any pipeline and will not export data.")
209+
logger.Warn("Exportee is not associated with any pipeline and will not export data.")
206210
return exporter, nil
207211
}
208212

209213
if requirement, ok := inputDataTypes[configmodels.TracesDataType]; ok {
210214
// Traces data type is required. Create a trace exporter based on config.
211-
te, err := createTraceExporter(factory, eb.logger, config)
215+
te, err := createTraceExporter(factory, logger, config)
212216
if err != nil {
213217
if err == configerror.ErrDataTypeIsNotSupported {
214218
// Could not create because this exporter does not support this data type.
@@ -227,7 +231,7 @@ func (eb *ExportersBuilder) buildExporter(
227231

228232
if requirement, ok := inputDataTypes[configmodels.MetricsDataType]; ok {
229233
// Metrics data type is required. Create a trace exporter based on config.
230-
me, err := createMetricsExporter(factory, eb.logger, config)
234+
me, err := createMetricsExporter(factory, logger, config)
231235
if err != nil {
232236
if err == configerror.ErrDataTypeIsNotSupported {
233237
// Could not create because this exporter does not support this data type.

service/builder/exporters_builder_test.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,7 @@ func TestExportersBuilder_Build(t *testing.T) {
7373
assert.Nil(t, e1.me)
7474

7575
// Ensure it can be started.
76-
err = exporters.StartAll(zap.NewNop(), componenttest.NewNopHost())
77-
assert.NoError(t, err)
76+
assert.NoError(t, exporters.StartAll(context.Background(), componenttest.NewNopHost()))
7877

7978
// Ensure it can be stopped.
8079
if err = e1.Shutdown(context.Background()); err != nil {
@@ -110,14 +109,14 @@ func TestExportersBuilder_StartAll(t *testing.T) {
110109
traceExporter := &config.ExampleExporterConsumer{}
111110
metricExporter := &config.ExampleExporterConsumer{}
112111
exporters[expCfg] = &builtExporter{
113-
te: traceExporter,
114-
me: metricExporter,
112+
logger: zap.NewNop(),
113+
te: traceExporter,
114+
me: metricExporter,
115115
}
116116
assert.False(t, traceExporter.ExporterStarted)
117117
assert.False(t, metricExporter.ExporterStarted)
118118

119-
err := exporters.StartAll(zap.NewNop(), componenttest.NewNopHost())
120-
assert.NoError(t, err)
119+
assert.NoError(t, exporters.StartAll(context.Background(), componenttest.NewNopHost()))
121120

122121
assert.True(t, traceExporter.ExporterStarted)
123122
assert.True(t, metricExporter.ExporterStarted)
@@ -129,12 +128,13 @@ func TestExportersBuilder_StopAll(t *testing.T) {
129128
traceExporter := &config.ExampleExporterConsumer{}
130129
metricExporter := &config.ExampleExporterConsumer{}
131130
exporters[expCfg] = &builtExporter{
132-
te: traceExporter,
133-
me: metricExporter,
131+
logger: zap.NewNop(),
132+
te: traceExporter,
133+
me: metricExporter,
134134
}
135135
assert.False(t, traceExporter.ExporterShutdown)
136136
assert.False(t, metricExporter.ExporterShutdown)
137-
exporters.ShutdownAll()
137+
assert.NoError(t, exporters.ShutdownAll(context.Background()))
138138

139139
assert.True(t, traceExporter.ExporterShutdown)
140140
assert.True(t, metricExporter.ExporterShutdown)
@@ -194,10 +194,10 @@ func (b *badExporterFactory) CreateDefaultConfig() configmodels.Exporter {
194194
return &configmodels.ExporterSettings{}
195195
}
196196

197-
func (b *badExporterFactory) CreateTraceExporter(logger *zap.Logger, cfg configmodels.Exporter) (component.TraceExporterOld, error) {
197+
func (b *badExporterFactory) CreateTraceExporter(_ *zap.Logger, _ configmodels.Exporter) (component.TraceExporterOld, error) {
198198
return nil, nil
199199
}
200200

201-
func (b *badExporterFactory) CreateMetricsExporter(logger *zap.Logger, cfg configmodels.Exporter) (component.MetricsExporterOld, error) {
201+
func (b *badExporterFactory) CreateMetricsExporter(_ *zap.Logger, _ configmodels.Exporter) (component.MetricsExporterOld, error) {
202202
return nil, nil
203203
}

service/builder/pipelines_builder.go

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
// It can have a trace and/or a metrics consumer (the consumer is either the first
3333
// processor in the pipeline or the exporter if pipeline has no processors).
3434
type builtPipeline struct {
35+
logger *zap.Logger
3536
firstTC consumer.TraceConsumerBase
3637
firstMC consumer.MetricsConsumerBase
3738

@@ -45,33 +46,33 @@ type builtPipeline struct {
4546
// BuiltPipelines is a map of build pipelines created from pipeline configs.
4647
type BuiltPipelines map[*configmodels.Pipeline]*builtPipeline
4748

48-
func (bps BuiltPipelines) StartProcessors(logger *zap.Logger, host component.Host) error {
49-
for cfg, bp := range bps {
50-
logger.Info("Pipeline is starting...", zap.String("pipeline", cfg.Name))
49+
func (bps BuiltPipelines) StartProcessors(ctx context.Context, host component.Host) error {
50+
for _, bp := range bps {
51+
bp.logger.Info("Pipeline is starting...")
5152
// Start in reverse order, starting from the back of processors pipeline.
5253
// This is important so that processors that are earlier in the pipeline and
5354
// reference processors that are later in the pipeline do not start sending
5455
// data to later pipelines which are not yet started.
5556
for i := len(bp.processors) - 1; i >= 0; i-- {
56-
if err := bp.processors[i].Start(context.Background(), host); err != nil {
57+
if err := bp.processors[i].Start(ctx, host); err != nil {
5758
return err
5859
}
5960
}
60-
logger.Info("Pipeline is started.", zap.String("pipeline", cfg.Name))
61+
bp.logger.Info("Pipeline is started.")
6162
}
6263
return nil
6364
}
6465

65-
func (bps BuiltPipelines) ShutdownProcessors(logger *zap.Logger) error {
66+
func (bps BuiltPipelines) ShutdownProcessors(ctx context.Context) error {
6667
var errs []error
67-
for cfg, bp := range bps {
68-
logger.Info("Pipeline is shutting down...", zap.String("pipeline", cfg.Name))
68+
for _, bp := range bps {
69+
bp.logger.Info("Pipeline is shutting down...")
6970
for _, p := range bp.processors {
70-
if err := p.Shutdown(context.Background()); err != nil {
71+
if err := p.Shutdown(ctx); err != nil {
7172
errs = append(errs, err)
7273
}
7374
}
74-
logger.Info("Pipeline is shutdown.", zap.String("pipeline", cfg.Name))
75+
bp.logger.Info("Pipeline is shutdown.")
7576
}
7677

7778
if len(errs) != 0 {
@@ -117,8 +118,7 @@ func (pb *PipelinesBuilder) Build() (BuiltPipelines, error) {
117118
// Builds a pipeline of processors. Returns the first processor in the pipeline.
118119
// The last processor in the pipeline will be plugged to fan out the data into exporters
119120
// that are configured for this pipeline.
120-
func (pb *PipelinesBuilder) buildPipeline(
121-
pipelineCfg *configmodels.Pipeline,
121+
func (pb *PipelinesBuilder) buildPipeline(pipelineCfg *configmodels.Pipeline,
122122
) (*builtPipeline, error) {
123123

124124
// BuildProcessors the pipeline backwards.
@@ -152,18 +152,19 @@ func (pb *PipelinesBuilder) buildPipeline(
152152
// it becomes the next for the previous one (previous in the pipeline,
153153
// which we will build in the next loop iteration).
154154
var err error
155+
componentLogger := pb.logger.With(zap.String(kindLogKey, kindLogProcessor), zap.String(typeLogKey, procCfg.Type()), zap.String(nameLogKey, procCfg.Name()))
155156
switch pipelineCfg.InputType {
156157
case configmodels.TracesDataType:
157158
var proc component.TraceProcessorBase
158-
proc, err = createTraceProcessor(factory, pb.logger, procCfg, tc)
159+
proc, err = createTraceProcessor(factory, componentLogger, procCfg, tc)
159160
if proc != nil {
160161
mutatesConsumedData = mutatesConsumedData || proc.GetCapabilities().MutatesConsumedData
161162
}
162163
processors[i] = proc
163164
tc = proc
164165
case configmodels.MetricsDataType:
165166
var proc component.MetricsProcessorBase
166-
proc, err = createMetricsProcessor(factory, pb.logger, procCfg, mc)
167+
proc, err = createMetricsProcessor(factory, componentLogger, procCfg, mc)
167168
if proc != nil {
168169
mutatesConsumedData = mutatesConsumedData || proc.GetCapabilities().MutatesConsumedData
169170
}
@@ -182,9 +183,12 @@ func (pb *PipelinesBuilder) buildPipeline(
182183
}
183184
}
184185

185-
pb.logger.Info("Pipeline is enabled.", zap.String("pipelines", pipelineCfg.Name))
186+
pipelineLogger := pb.logger.With(zap.String("pipeline_name", pipelineCfg.Name),
187+
zap.String("pipeline_datatype", pipelineCfg.InputType.GetString()))
188+
pipelineLogger.Info("Pipeline is enabled.")
186189

187190
bp := &builtPipeline{
191+
pipelineLogger,
188192
tc,
189193
mc,
190194
mutatesConsumedData,

service/builder/pipelines_builder_test.go

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -161,8 +161,7 @@ func testPipeline(t *testing.T, pipelineName string, exporterNames []string) {
161161
assert.NoError(t, err)
162162
require.NotNil(t, pipelineProcessors)
163163

164-
err = pipelineProcessors.StartProcessors(zap.NewNop(), componenttest.NewNopHost())
165-
assert.NoError(t, err)
164+
assert.NoError(t, pipelineProcessors.StartProcessors(context.Background(), componenttest.NewNopHost()))
166165

167166
processor := pipelineProcessors[cfg.Service.Pipelines[pipelineName]]
168167

@@ -201,7 +200,7 @@ func testPipeline(t *testing.T, pipelineName string, exporterNames []string) {
201200
assertEqualTraceData(t, generateTestTraceDataWithAttributes(), consumer.Traces[0])
202201
}
203202

204-
err = pipelineProcessors.ShutdownProcessors(zap.NewNop())
203+
err = pipelineProcessors.ShutdownProcessors(context.Background())
205204
assert.NoError(t, err)
206205
}
207206

@@ -273,17 +272,17 @@ func (b *badProcessorFactory) CreateDefaultConfig() configmodels.Processor {
273272
}
274273

275274
func (b *badProcessorFactory) CreateTraceProcessor(
276-
logger *zap.Logger,
277-
nextConsumer consumer.TraceConsumerOld,
278-
cfg configmodels.Processor,
275+
_ *zap.Logger,
276+
_ consumer.TraceConsumerOld,
277+
_ configmodels.Processor,
279278
) (component.TraceProcessorOld, error) {
280279
return nil, nil
281280
}
282281

283282
func (b *badProcessorFactory) CreateMetricsProcessor(
284-
logger *zap.Logger,
285-
consumer consumer.MetricsConsumerOld,
286-
cfg configmodels.Processor,
283+
_ *zap.Logger,
284+
_ consumer.MetricsConsumerOld,
285+
_ configmodels.Processor,
287286
) (component.MetricsProcessorOld, error) {
288287
return nil, nil
289288
}

0 commit comments

Comments
 (0)