Skip to content

Commit 4cfc84e

Browse files
thmshmmMovieStoreGuy
authored andcommitted
[exporter/kafkaexporter] Add encoding extensions support (open-telemetry#34384)
**Description:** Add support for encoding extensions in the kafkaexporter To be able to use encoding extensions this PR adds extension support and proposes to rename the existing `encoding` configuration property to `format` and reusing the `encoding` property for configuring encoding extensions. Reason is to be consistent with other receivers/exporters. Related to open-telemetry#33888 which adds encoding extension support in the `kafkareceiver`. **Link to tracking Issue:** n/a **Testing:** Tested via the following configuration. ``` receivers: kafka: brokers: - localhost:29092 encoding: json group_id: test1 topic: logs_in extensions: json_log_encoding: exporters: debug: verbosity: detailed kafka: brokers: - localhost:29092 encoding: json_log_encoding topic: json_out processors: batch: service: extensions: [json_log_encoding] pipelines: logs: receivers: [kafka] processors: [batch] exporters: [debug, kafka] telemetry: logs: level: "info" ``` Any json can be written to the `logs_in` topic and results be viewed in the `json_out` topic. When removing `encoding: json_log_encoding` the default format type is used and the output in `json_out` topic changes accordingly. **Documentation:** Updated README.md within the receiver describing the use of encoding extensions. Co-authored-by: Sean Marciniak <[email protected]>
1 parent ef01c49 commit 4cfc84e

File tree

6 files changed

+408
-80
lines changed

6 files changed

+408
-80
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: kafkaexporter
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Add support for encoding extensions in the Kafka exporter.
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [34384]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: |
19+
This change adds support for encoding extensions in the Kafka exporter. Loading extensions takes precedence over the internally supported encodings.
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

exporter/kafkaexporter/factory.go

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -105,10 +105,7 @@ func (f *kafkaExporterFactory) createTracesExporter(
105105
if oCfg.Encoding == "otlp_json" {
106106
set.Logger.Info("otlp_json is considered experimental and should not be used in a production environment")
107107
}
108-
exp, err := newTracesExporter(oCfg, set)
109-
if err != nil {
110-
return nil, err
111-
}
108+
exp := newTracesExporter(oCfg, set)
112109
return exporterhelper.NewTracesExporter(
113110
ctx,
114111
set,
@@ -136,10 +133,7 @@ func (f *kafkaExporterFactory) createMetricsExporter(
136133
if oCfg.Encoding == "otlp_json" {
137134
set.Logger.Info("otlp_json is considered experimental and should not be used in a production environment")
138135
}
139-
exp, err := newMetricsExporter(oCfg, set)
140-
if err != nil {
141-
return nil, err
142-
}
136+
exp := newMetricsExporter(oCfg, set)
143137
return exporterhelper.NewMetricsExporter(
144138
ctx,
145139
set,
@@ -167,10 +161,7 @@ func (f *kafkaExporterFactory) createLogsExporter(
167161
if oCfg.Encoding == "otlp_json" {
168162
set.Logger.Info("otlp_json is considered experimental and should not be used in a production environment")
169163
}
170-
exp, err := newLogsExporter(oCfg, set)
171-
if err != nil {
172-
return nil, err
173-
}
164+
exp := newLogsExporter(oCfg, set)
174165
return exporterhelper.NewLogsExporter(
175166
ctx,
176167
set,

exporter/kafkaexporter/kafka_exporter.go

Lines changed: 90 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,23 @@ func (e *kafkaTracesProducer) Close(context.Context) error {
6565
return e.producer.Close()
6666
}
6767

68-
func (e *kafkaTracesProducer) start(_ context.Context, _ component.Host) error {
68+
func (e *kafkaTracesProducer) start(_ context.Context, host component.Host) error {
69+
// extensions take precedence over internal encodings
70+
if marshaler, errExt := loadEncodingExtension[ptrace.Marshaler](
71+
host,
72+
e.cfg.Encoding,
73+
); errExt == nil {
74+
e.marshaler = &tracesEncodingMarshaler{
75+
marshaler: *marshaler,
76+
encoding: e.cfg.Encoding,
77+
}
78+
}
79+
if marshaler, errInt := createTracesMarshaler(e.cfg); e.marshaler == nil && errInt == nil {
80+
e.marshaler = marshaler
81+
}
82+
if e.marshaler == nil {
83+
return errUnrecognizedEncoding
84+
}
6985
producer, err := newSaramaProducer(e.cfg)
7086
if err != nil {
7187
return err
@@ -107,7 +123,23 @@ func (e *kafkaMetricsProducer) Close(context.Context) error {
107123
return e.producer.Close()
108124
}
109125

110-
func (e *kafkaMetricsProducer) start(_ context.Context, _ component.Host) error {
126+
func (e *kafkaMetricsProducer) start(_ context.Context, host component.Host) error {
127+
// extensions take precedence over internal encodings
128+
if marshaler, errExt := loadEncodingExtension[pmetric.Marshaler](
129+
host,
130+
e.cfg.Encoding,
131+
); errExt == nil {
132+
e.marshaler = &metricsEncodingMarshaler{
133+
marshaler: *marshaler,
134+
encoding: e.cfg.Encoding,
135+
}
136+
}
137+
if marshaler, errInt := createMetricMarshaler(e.cfg); e.marshaler == nil && errInt == nil {
138+
e.marshaler = marshaler
139+
}
140+
if e.marshaler == nil {
141+
return errUnrecognizedEncoding
142+
}
111143
producer, err := newSaramaProducer(e.cfg)
112144
if err != nil {
113145
return err
@@ -149,7 +181,23 @@ func (e *kafkaLogsProducer) Close(context.Context) error {
149181
return e.producer.Close()
150182
}
151183

152-
func (e *kafkaLogsProducer) start(_ context.Context, _ component.Host) error {
184+
func (e *kafkaLogsProducer) start(_ context.Context, host component.Host) error {
185+
// extensions take precedence over internal encodings
186+
if marshaler, errExt := loadEncodingExtension[plog.Marshaler](
187+
host,
188+
e.cfg.Encoding,
189+
); errExt == nil {
190+
e.marshaler = &logsEncodingMarshaler{
191+
marshaler: *marshaler,
192+
encoding: e.cfg.Encoding,
193+
}
194+
}
195+
if marshaler, errInt := createLogMarshaler(e.cfg); e.marshaler == nil && errInt == nil {
196+
e.marshaler = marshaler
197+
}
198+
if e.marshaler == nil {
199+
return errUnrecognizedEncoding
200+
}
153201
producer, err := newSaramaProducer(e.cfg)
154202
if err != nil {
155203
return err
@@ -204,50 +252,26 @@ func newSaramaProducer(config Config) (sarama.SyncProducer, error) {
204252
return producer, nil
205253
}
206254

207-
func newMetricsExporter(config Config, set exporter.Settings) (*kafkaMetricsProducer, error) {
208-
marshaler, err := createMetricMarshaler(config)
209-
if err != nil {
210-
return nil, err
211-
}
212-
213-
if marshaler == nil {
214-
return nil, errUnrecognizedEncoding
215-
}
216-
255+
func newMetricsExporter(config Config, set exporter.Settings) *kafkaMetricsProducer {
217256
return &kafkaMetricsProducer{
218-
cfg: config,
219-
marshaler: marshaler,
220-
logger: set.Logger,
221-
}, nil
222-
257+
cfg: config,
258+
logger: set.Logger,
259+
}
223260
}
224261

225262
// newTracesExporter creates Kafka exporter.
226-
func newTracesExporter(config Config, set exporter.Settings) (*kafkaTracesProducer, error) {
227-
marshaler, err := createTracesMarshaler(config)
228-
if err != nil {
229-
return nil, err
230-
}
231-
263+
func newTracesExporter(config Config, set exporter.Settings) *kafkaTracesProducer {
232264
return &kafkaTracesProducer{
233-
cfg: config,
234-
marshaler: marshaler,
235-
logger: set.Logger,
236-
}, nil
237-
}
238-
239-
func newLogsExporter(config Config, set exporter.Settings) (*kafkaLogsProducer, error) {
240-
marshaler, err := createLogMarshaler(config)
241-
if err != nil {
242-
return nil, err
265+
cfg: config,
266+
logger: set.Logger,
243267
}
268+
}
244269

270+
func newLogsExporter(config Config, set exporter.Settings) *kafkaLogsProducer {
245271
return &kafkaLogsProducer{
246-
cfg: config,
247-
marshaler: marshaler,
248-
logger: set.Logger,
249-
}, nil
250-
272+
cfg: config,
273+
logger: set.Logger,
274+
}
251275
}
252276

253277
type resourceSlice[T any] interface {
@@ -271,3 +295,30 @@ func getTopic[T resource](cfg *Config, resources resourceSlice[T]) string {
271295
}
272296
return cfg.Topic
273297
}
298+
299+
// loadEncodingExtension tries to load an available extension for the given encoding.
300+
func loadEncodingExtension[T any](host component.Host, encoding string) (*T, error) {
301+
extensionID, err := encodingToComponentID(encoding)
302+
if err != nil {
303+
return nil, err
304+
}
305+
encodingExtension, ok := host.GetExtensions()[*extensionID]
306+
if !ok {
307+
return nil, fmt.Errorf("unknown encoding extension %q", encoding)
308+
}
309+
unmarshaler, ok := encodingExtension.(T)
310+
if !ok {
311+
return nil, fmt.Errorf("extension %q is not an unmarshaler", encoding)
312+
}
313+
return &unmarshaler, nil
314+
}
315+
316+
// encodingToComponentID converts an encoding string to a component ID using the given encoding as type.
317+
func encodingToComponentID(encoding string) (*component.ID, error) {
318+
componentType, err := component.NewType(encoding)
319+
if err != nil {
320+
return nil, fmt.Errorf("invalid component type: %w", err)
321+
}
322+
id := component.NewID(componentType)
323+
return &id, nil
324+
}

0 commit comments

Comments
 (0)