Skip to content

Commit 1eb962c

Browse files
authored
Move extension creation into a builder class (#830)
Signed-off-by: Bogdan Drutu <[email protected]>
1 parent 0be3aa7 commit 1eb962c

File tree

4 files changed

+218
-110
lines changed

4 files changed

+218
-110
lines changed

service/builder/builder.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ const (
2828
kindLogReceiver = "receiver"
2929
kindLogProcessor = "processor"
3030
kindLogExporter = "exporter"
31+
kindLogExtension = "extension"
3132
typeLogKey = "component_type"
3233
nameLogKey = "component_name"
3334
)

service/builder/extensions_builder.go

Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
// Copyright 2020, OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package builder
16+
17+
import (
18+
"context"
19+
20+
"github.com/pkg/errors"
21+
"go.uber.org/zap"
22+
23+
"github.com/open-telemetry/opentelemetry-collector/component"
24+
"github.com/open-telemetry/opentelemetry-collector/component/componenterror"
25+
"github.com/open-telemetry/opentelemetry-collector/config/configmodels"
26+
)
27+
28+
// builtExporter is an exporter that is built based on a config. It can have
29+
// a trace and/or a metrics consumer and have a shutdown function.
30+
type builtExtension struct {
31+
logger *zap.Logger
32+
extension component.ServiceExtension
33+
}
34+
35+
// Start the receiver.
36+
func (ext *builtExtension) Start(ctx context.Context, host component.Host) error {
37+
return ext.extension.Start(ctx, host)
38+
}
39+
40+
// Stop the receiver.
41+
func (ext *builtExtension) Shutdown(ctx context.Context) error {
42+
return ext.extension.Shutdown(ctx)
43+
}
44+
45+
var _ component.ServiceExtension = (*builtExtension)(nil)
46+
47+
// Exporters is a map of exporters created from exporter configs.
48+
type Extensions map[configmodels.Extension]*builtExtension
49+
50+
// StartAll starts all exporters.
51+
func (exts Extensions) StartAll(ctx context.Context, host component.Host) error {
52+
for _, ext := range exts {
53+
ext.logger.Info("Extension is starting...")
54+
55+
if err := ext.Start(ctx, host); err != nil {
56+
return err
57+
}
58+
59+
ext.logger.Info("Extension started.")
60+
}
61+
return nil
62+
}
63+
64+
// ShutdownAll stops all exporters.
65+
func (exts Extensions) ShutdownAll(ctx context.Context) error {
66+
var errs []error
67+
for _, ext := range exts {
68+
err := ext.Shutdown(ctx)
69+
if err != nil {
70+
errs = append(errs, err)
71+
}
72+
}
73+
74+
if len(errs) != 0 {
75+
return componenterror.CombineErrors(errs)
76+
}
77+
return nil
78+
}
79+
80+
func (exts Extensions) NotifyPipelineReady() error {
81+
for _, ext := range exts {
82+
if pw, ok := ext.extension.(component.PipelineWatcher); ok {
83+
if err := pw.Ready(); err != nil {
84+
ext.logger.Error("Error notifying extension that the pipeline was started.")
85+
return err
86+
}
87+
}
88+
}
89+
90+
return nil
91+
}
92+
93+
func (exts Extensions) NotifyPipelineNotReady() error {
94+
// Notify extensions in reverse order.
95+
var errs []error
96+
for _, ext := range exts {
97+
if pw, ok := ext.extension.(component.PipelineWatcher); ok {
98+
if err := pw.NotReady(); err != nil {
99+
ext.logger.Error("Error notifying extension that the pipeline was shutdown.")
100+
errs = append(errs, err)
101+
}
102+
}
103+
}
104+
105+
if len(errs) != 0 {
106+
return componenterror.CombineErrors(errs)
107+
}
108+
109+
return nil
110+
}
111+
112+
func (exts Extensions) GetServiceExtensions() map[configmodels.Extension]component.ServiceExtension {
113+
result := make(map[configmodels.Extension]component.ServiceExtension, len(exts))
114+
for k, v := range exts {
115+
result[k] = v.extension
116+
}
117+
return result
118+
}
119+
120+
// ExportersBuilder builds exporters from config.
121+
type ExtensionsBuilder struct {
122+
logger *zap.Logger
123+
config *configmodels.Config
124+
factories map[string]component.ExtensionFactory
125+
}
126+
127+
// NewExportersBuilder creates a new ExportersBuilder. Call BuildExporters() on the returned value.
128+
func NewExtensionsBuilder(
129+
logger *zap.Logger,
130+
config *configmodels.Config,
131+
factories map[string]component.ExtensionFactory,
132+
) *ExtensionsBuilder {
133+
return &ExtensionsBuilder{logger.With(zap.String(kindLogKey, kindLogExtension)), config, factories}
134+
}
135+
136+
// Build extensions from config.
137+
func (eb *ExtensionsBuilder) Build() (Extensions, error) {
138+
extensions := make(Extensions)
139+
140+
for _, extName := range eb.config.Service.Extensions {
141+
extCfg, exists := eb.config.Extensions[extName]
142+
if !exists {
143+
return nil, errors.Errorf("extension %q is not configured", extName)
144+
}
145+
146+
componentLogger := eb.logger.With(zap.String(typeLogKey, extCfg.Type()), zap.String(nameLogKey, extCfg.Name()))
147+
ext, err := eb.buildExtension(componentLogger, extCfg)
148+
if err != nil {
149+
return nil, err
150+
}
151+
152+
extensions[extCfg] = ext
153+
}
154+
155+
return extensions, nil
156+
}
157+
158+
func (eb *ExtensionsBuilder) buildExtension(logger *zap.Logger, cfg configmodels.Extension) (*builtExtension, error) {
159+
factory := eb.factories[cfg.Type()]
160+
if factory == nil {
161+
return nil, errors.Errorf("extension factory for type %q is not configured", cfg.Type())
162+
}
163+
164+
ext := &builtExtension{
165+
logger: logger,
166+
}
167+
168+
ex, err := factory.CreateExtension(context.Background(), component.ExtensionCreateParams{Logger: eb.logger}, cfg)
169+
if err != nil {
170+
return nil, errors.Wrapf(err, "failed to create extension %q", cfg.Name())
171+
}
172+
173+
// Check if the factory really created the extension.
174+
if ex == nil {
175+
return nil, errors.Errorf("factory for %q produced a nil extension", cfg.Name())
176+
}
177+
178+
ext.extension = ex
179+
180+
return ext, nil
181+
}

service/service.go

Lines changed: 29 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -40,20 +40,18 @@ import (
4040

4141
// Application represents a collector application
4242
type Application struct {
43-
info ApplicationStartInfo
44-
rootCmd *cobra.Command
45-
v *viper.Viper
46-
logger *zap.Logger
47-
exporters builder.Exporters
48-
builtReceivers builder.Receivers
49-
builtPipelines builder.BuiltPipelines
43+
info ApplicationStartInfo
44+
rootCmd *cobra.Command
45+
v *viper.Viper
46+
logger *zap.Logger
47+
builtExporters builder.Exporters
48+
builtReceivers builder.Receivers
49+
builtPipelines builder.BuiltPipelines
50+
builtExtensions builder.Extensions
5051

5152
factories config.Factories
5253
config *configmodels.Config
5354

54-
extensionsList []component.ServiceExtension
55-
extensionsMap map[configmodels.Extension]component.ServiceExtension
56-
5755
// stopTestChan is used to terminate the application in end to end tests.
5856
stopTestChan chan struct{}
5957
// readyChan is used in tests to indicate that the application is ready.
@@ -184,7 +182,7 @@ func (app *Application) GetFactory(kind component.Kind, componentType string) co
184182
}
185183

186184
func (app *Application) GetExtensions() map[configmodels.Extension]component.ServiceExtension {
187-
return app.extensionsMap
185+
return app.builtExtensions.GetServiceExtensions()
188186
}
189187

190188
func (app *Application) init() error {
@@ -262,37 +260,13 @@ func (app *Application) setupConfigurationComponents(ctx context.Context, factor
262260
}
263261

264262
func (app *Application) setupExtensions(ctx context.Context) error {
265-
app.extensionsMap = make(map[configmodels.Extension]component.ServiceExtension)
266-
for _, extName := range app.config.Service.Extensions {
267-
extCfg, exists := app.config.Extensions[extName]
268-
if !exists {
269-
return errors.Errorf("extension %q is not configured", extName)
270-
}
271-
272-
factory, exists := app.factories.Extensions[extCfg.Type()]
273-
if !exists {
274-
return errors.Errorf("extension factory for type %q is not configured", extCfg.Type())
275-
}
276-
277-
ext, err := factory.CreateExtension(ctx, component.ExtensionCreateParams{Logger: app.logger}, extCfg)
278-
if err != nil {
279-
return errors.Wrapf(err, "failed to create extension %q", extName)
280-
}
281-
282-
// Check if the factory really created the extension.
283-
if ext == nil {
284-
return errors.Errorf("factory for %q produced a nil extension", extName)
285-
}
286-
287-
if err := ext.Start(ctx, app); err != nil {
288-
return errors.Wrapf(err, "error starting extension %q", extName)
289-
}
290-
291-
app.extensionsList = append(app.extensionsList, ext)
292-
app.extensionsMap[extCfg] = ext
263+
var err error
264+
app.builtExtensions, err = builder.NewExtensionsBuilder(app.logger, app.config, app.factories.Extensions).Build()
265+
if err != nil {
266+
return errors.Wrap(err, "cannot build builtExtensions")
293267
}
294-
295-
return nil
268+
app.logger.Info("Starting extensions...")
269+
return app.builtExtensions.StartAll(ctx, app)
296270
}
297271

298272
func (app *Application) setupPipelines(ctx context.Context) error {
@@ -301,19 +275,20 @@ func (app *Application) setupPipelines(ctx context.Context) error {
301275

302276
// First create exporters.
303277
var err error
304-
app.exporters, err = builder.NewExportersBuilder(app.logger, app.config, app.factories.Exporters).Build()
278+
app.builtExporters, err = builder.NewExportersBuilder(app.logger, app.config, app.factories.Exporters).Build()
305279
if err != nil {
306-
return errors.Wrap(err, "cannot build exporters")
280+
return errors.Wrap(err, "cannot build builtExporters")
307281
}
308282
app.logger.Info("Starting exporters...")
309-
err = app.exporters.StartAll(ctx, app)
283+
284+
err = app.builtExporters.StartAll(ctx, app)
310285
if err != nil {
311-
return errors.Wrap(err, "cannot start exporters")
286+
return errors.Wrap(err, "cannot start builtExporters")
312287
}
313288

314289
// Create pipelines and their processors and plug exporters to the
315290
// end of the pipelines.
316-
app.builtPipelines, err = builder.NewPipelinesBuilder(app.logger, app.config, app.exporters, app.factories.Processors).Build()
291+
app.builtPipelines, err = builder.NewPipelinesBuilder(app.logger, app.config, app.builtExporters, app.factories.Processors).Build()
317292
if err != nil {
318293
return errors.Wrap(err, "cannot build pipelines")
319294
}
@@ -339,43 +314,6 @@ func (app *Application) setupPipelines(ctx context.Context) error {
339314
return nil
340315
}
341316

342-
func (app *Application) notifyPipelineReady() error {
343-
for i, ext := range app.extensionsList {
344-
if pw, ok := ext.(component.PipelineWatcher); ok {
345-
if err := pw.Ready(); err != nil {
346-
return errors.Wrapf(
347-
err,
348-
"error notifying extension %q that the pipeline was started",
349-
app.config.Service.Extensions[i],
350-
)
351-
}
352-
}
353-
}
354-
355-
return nil
356-
}
357-
358-
func (app *Application) notifyPipelineNotReady() error {
359-
// Notify extensions in reverse order.
360-
var errs []error
361-
for i := len(app.extensionsList) - 1; i >= 0; i-- {
362-
ext := app.extensionsList[i]
363-
if pw, ok := ext.(component.PipelineWatcher); ok {
364-
if err := pw.NotReady(); err != nil {
365-
errs = append(errs, errors.Wrapf(err,
366-
"error notifying extension %q that the pipeline was shutdown",
367-
app.config.Service.Extensions[i]))
368-
}
369-
}
370-
}
371-
372-
if len(errs) != 0 {
373-
return componenterror.CombineErrors(errs)
374-
}
375-
376-
return nil
377-
}
378-
379317
func (app *Application) shutdownPipelines(ctx context.Context) error {
380318
// Shutdown order is the reverse of building: first receivers, then flushing pipelines
381319
// giving senders a chance to send all their data. This may take time, the allowed
@@ -395,8 +333,8 @@ func (app *Application) shutdownPipelines(ctx context.Context) error {
395333
errs = append(errs, errors.Wrap(err, "failed to shutdown processors"))
396334
}
397335

398-
app.logger.Info("Shutting down exporters...")
399-
err = app.exporters.ShutdownAll(ctx)
336+
app.logger.Info("Stopping exporters...")
337+
err = app.builtExporters.ShutdownAll(ctx)
400338
if err != nil {
401339
errs = append(errs, errors.Wrap(err, "failed to shutdown exporters"))
402340
}
@@ -409,21 +347,11 @@ func (app *Application) shutdownPipelines(ctx context.Context) error {
409347
}
410348

411349
func (app *Application) shutdownExtensions(ctx context.Context) error {
412-
// Shutdown extensions in reverse order.
413-
var errs []error
414-
for i := len(app.extensionsList) - 1; i >= 0; i-- {
415-
ext := app.extensionsList[i]
416-
if err := ext.Shutdown(ctx); err != nil {
417-
errs = append(errs, errors.Wrapf(err,
418-
"error shutting down extension %q",
419-
app.config.Service.Extensions[i]))
420-
}
421-
}
422-
423-
if len(errs) != 0 {
424-
return componenterror.CombineErrors(errs)
350+
app.logger.Info("Stopping extensions...")
351+
err := app.builtExtensions.ShutdownAll(ctx)
352+
if err != nil {
353+
return errors.Wrap(err, "failed to shutdown extensions")
425354
}
426-
427355
return nil
428356
}
429357

@@ -450,7 +378,7 @@ func (app *Application) execute(ctx context.Context, factory ConfigFactory) erro
450378
return err
451379
}
452380

453-
err = app.notifyPipelineReady()
381+
err = app.builtExtensions.NotifyPipelineReady()
454382
if err != nil {
455383
return err
456384
}
@@ -465,7 +393,7 @@ func (app *Application) execute(ctx context.Context, factory ConfigFactory) erro
465393
runtime.KeepAlive(ballast)
466394
app.logger.Info("Starting shutdown...")
467395

468-
err = app.notifyPipelineNotReady()
396+
err = app.builtExtensions.NotifyPipelineNotReady()
469397
if err != nil {
470398
errs = append(errs, errors.Wrap(err, "failed to notify that pipeline is not ready"))
471399
}

0 commit comments

Comments
 (0)