Skip to content

Commit 5cafba7

Browse files
dmathieuFiery-Fenix
authored andcommitted
[chore] [exporter/elasticsearch] Allow storing state in the OTel encoder (open-telemetry#38732)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description For open-telemetry#38606, we want to be able to store state within the otel encoder. We can't do that, as we encoder is root methods, not within a struct. So this PR changes that behavior to keep encoders in memory, so they can introduce shared state such as cache.
1 parent 53bf07a commit 5cafba7

File tree

11 files changed

+77
-29
lines changed

11 files changed

+77
-29
lines changed

exporter/elasticsearchexporter/exporter.go

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,22 @@ type elasticsearchExporter struct {
3535
allowedMappingModes map[string]MappingMode
3636
bulkIndexers bulkIndexers
3737
bufferPool *pool.BufferPool
38+
encoders map[MappingMode]documentEncoder
3839
}
3940

40-
func newExporter(cfg *Config, set exporter.Settings, index string) *elasticsearchExporter {
41+
func newExporter(cfg *Config, set exporter.Settings, index string) (*elasticsearchExporter, error) {
4142
allowedMappingModes := cfg.allowedMappingModes()
4243
defaultMappingMode := allowedMappingModes[canonicalMappingModeName(cfg.Mapping.Mode)]
44+
45+
encoders := map[MappingMode]documentEncoder{}
46+
for i := range NumMappingModes {
47+
enc, err := newEncoder(i)
48+
if err != nil {
49+
return nil, err
50+
}
51+
encoders[i] = enc
52+
}
53+
4354
return &elasticsearchExporter{
4455
set: set,
4556
config: cfg,
@@ -48,7 +59,8 @@ func newExporter(cfg *Config, set exporter.Settings, index string) *elasticsearc
4859
allowedMappingModes: allowedMappingModes,
4960
defaultMappingMode: defaultMappingMode,
5061
bufferPool: pool.NewBufferPool(),
51-
}
62+
encoders: encoders,
63+
}, nil
5264
}
5365

5466
func (e *elasticsearchExporter) Start(ctx context.Context, host component.Host) error {
@@ -65,13 +77,21 @@ func (e *elasticsearchExporter) Shutdown(ctx context.Context) error {
6577
return nil
6678
}
6779

80+
func (e *elasticsearchExporter) getEncoder(m MappingMode) (documentEncoder, error) {
81+
if enc, ok := e.encoders[m]; ok {
82+
return enc, nil
83+
}
84+
85+
return nil, fmt.Errorf("no encoder setup for mapping mode %s", m)
86+
}
87+
6888
func (e *elasticsearchExporter) pushLogsData(ctx context.Context, ld plog.Logs) error {
6989
mappingMode, err := e.getMappingMode(ctx)
7090
if err != nil {
7191
return err
7292
}
7393
router := newDocumentRouter(mappingMode, e.index, e.config)
74-
encoder, err := newEncoder(mappingMode)
94+
encoder, err := e.getEncoder(mappingMode)
7595
if err != nil {
7696
return err
7797
}
@@ -172,7 +192,7 @@ func (e *elasticsearchExporter) pushMetricsData(
172192
}
173193
router := newDocumentRouter(mappingMode, e.index, e.config)
174194
hasher := newDataPointHasher(mappingMode)
175-
encoder, err := newEncoder(mappingMode)
195+
encoder, err := e.getEncoder(mappingMode)
176196
if err != nil {
177197
return err
178198
}
@@ -335,7 +355,7 @@ func (e *elasticsearchExporter) pushTraceData(
335355
}
336356
router := newDocumentRouter(mappingMode, e.index, e.config)
337357
spanEventRouter := newDocumentRouter(mappingMode, e.config.LogsIndex, e.config)
338-
encoder, err := newEncoder(mappingMode)
358+
encoder, err := e.getEncoder(mappingMode)
339359
if err != nil {
340360
return err
341361
}
@@ -465,7 +485,7 @@ func (e *elasticsearchExporter) pushProfilesData(ctx context.Context, pd pprofil
465485
if err != nil {
466486
return err
467487
}
468-
encoder, err := newEncoder(mappingMode)
488+
encoder, err := e.getEncoder(mappingMode)
469489
if err != nil {
470490
return err
471491
}

exporter/elasticsearchexporter/factory.go

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,10 @@ func createLogsExporter(
108108

109109
handleDeprecatedConfig(cf, set.Logger)
110110

111-
exporter := newExporter(cf, set, cf.LogsIndex)
111+
exporter, err := newExporter(cf, set, cf.LogsIndex)
112+
if err != nil {
113+
return nil, err
114+
}
112115

113116
return exporterhelper.NewLogs(
114117
ctx,
@@ -127,7 +130,10 @@ func createMetricsExporter(
127130
cf := cfg.(*Config)
128131
handleDeprecatedConfig(cf, set.Logger)
129132

130-
exporter := newExporter(cf, set, cf.MetricsIndex)
133+
exporter, err := newExporter(cf, set, cf.MetricsIndex)
134+
if err != nil {
135+
return nil, err
136+
}
131137

132138
return exporterhelper.NewMetrics(
133139
ctx,
@@ -145,7 +151,10 @@ func createTracesExporter(ctx context.Context,
145151
cf := cfg.(*Config)
146152
handleDeprecatedConfig(cf, set.Logger)
147153

148-
exporter := newExporter(cf, set, cf.TracesIndex)
154+
exporter, err := newExporter(cf, set, cf.TracesIndex)
155+
if err != nil {
156+
return nil, err
157+
}
149158

150159
return exporterhelper.NewTraces(
151160
ctx,
@@ -168,7 +177,10 @@ func createProfilesExporter(
168177

169178
handleDeprecatedConfig(cf, set.Logger)
170179

171-
exporter := newExporter(cf, set, "")
180+
exporter, err := newExporter(cf, set, "")
181+
if err != nil {
182+
return nil, err
183+
}
172184

173185
return xexporterhelper.NewProfilesExporter(
174186
ctx,

exporter/elasticsearchexporter/internal/serializer/otelserializer/logs.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import (
1515
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/serializer"
1616
)
1717

18-
func SerializeLog(resource pcommon.Resource, resourceSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaURL string, record plog.LogRecord, idx elasticsearch.Index, buf *bytes.Buffer) error {
18+
func (*Serializer) SerializeLog(resource pcommon.Resource, resourceSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaURL string, record plog.LogRecord, idx elasticsearch.Index, buf *bytes.Buffer) error {
1919
v := json.NewVisitor(buf)
2020
// Enable ExplicitRadixPoint such that 1.0 is encoded as 1.0 instead of 1.
2121
// This is required to generate the correct dynamic mapping in ES.

exporter/elasticsearchexporter/internal/serializer/otelserializer/logs_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,8 @@ func TestSerializeLog(t *testing.T) {
185185
logs.MarkReadOnly()
186186

187187
var buf bytes.Buffer
188-
err := SerializeLog(resourceLogs.Resource(), "", scopeLogs.Scope(), "", record, elasticsearch.Index{}, &buf)
188+
ser := New()
189+
err := ser.SerializeLog(resourceLogs.Resource(), "", scopeLogs.Scope(), "", record, elasticsearch.Index{}, &buf)
189190
if (err != nil) != tt.wantErr {
190191
t.Errorf("Log() error = %v, wantErr %v", err, tt.wantErr)
191192
}

exporter/elasticsearchexporter/internal/serializer/otelserializer/metrics.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ import (
1919
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/serializer"
2020
)
2121

22-
func SerializeMetrics(resource pcommon.Resource, resourceSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaURL string, dataPoints []datapoints.DataPoint, validationErrors *[]error, idx elasticsearch.Index, buf *bytes.Buffer) (map[string]string, error) {
22+
func (*Serializer) SerializeMetrics(resource pcommon.Resource, resourceSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaURL string, dataPoints []datapoints.DataPoint, validationErrors *[]error, idx elasticsearch.Index, buf *bytes.Buffer) (map[string]string, error) {
2323
if len(dataPoints) == 0 {
2424
return nil, nil
2525
}

exporter/elasticsearchexporter/internal/serializer/otelserializer/metrics_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@ func TestSerializeMetricsConflict(t *testing.T) {
3333

3434
var validationErrors []error
3535
var buf bytes.Buffer
36-
_, err := SerializeMetrics(resourceMetrics.Resource(), "", scopeMetrics.Scope(), "", dataPoints, &validationErrors, elasticsearch.Index{}, &buf)
36+
ser := New()
37+
_, err := ser.SerializeMetrics(resourceMetrics.Resource(), "", scopeMetrics.Scope(), "", dataPoints, &validationErrors, elasticsearch.Index{}, &buf)
3738
if err != nil {
3839
t.Errorf("Metrics() error = %v", err)
3940
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package otelserializer // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/serializer/otelserializer"
5+
6+
type Serializer struct{}
7+
8+
// New builds a new Serializer
9+
func New() *Serializer {
10+
return &Serializer{}
11+
}

exporter/elasticsearchexporter/internal/serializer/otelserializer/profile.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ const (
2424
)
2525

2626
// SerializeProfile serializes a profile and calls the `pushData` callback for each generated document.
27-
func SerializeProfile(resource pcommon.Resource, scope pcommon.InstrumentationScope, profile pprofile.Profile, pushData func(*bytes.Buffer, string, string) error) error {
27+
func (*Serializer) SerializeProfile(resource pcommon.Resource, scope pcommon.InstrumentationScope, profile pprofile.Profile, pushData func(*bytes.Buffer, string, string) error) error {
2828
pushDataAsJSON := func(data any, id, index string) error {
2929
c, err := toJSON(data)
3030
if err != nil {

exporter/elasticsearchexporter/internal/serializer/otelserializer/profile_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,8 @@ func TestSerializeProfile(t *testing.T) {
115115
profiles.MarkReadOnly()
116116

117117
buf := []*bytes.Buffer{}
118-
err := SerializeProfile(resource.Resource(), scope.Scope(), profile, func(b *bytes.Buffer, _ string, _ string) error {
118+
ser := New()
119+
err := ser.SerializeProfile(resource.Resource(), scope.Scope(), profile, func(b *bytes.Buffer, _ string, _ string) error {
119120
buf = append(buf, b)
120121
return nil
121122
})

exporter/elasticsearchexporter/internal/serializer/otelserializer/traces.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import (
1414
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/elasticsearch"
1515
)
1616

17-
func SerializeSpanEvent(resource pcommon.Resource, resourceSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaURL string, span ptrace.Span, spanEvent ptrace.SpanEvent, idx elasticsearch.Index, buf *bytes.Buffer) {
17+
func (*Serializer) SerializeSpanEvent(resource pcommon.Resource, resourceSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaURL string, span ptrace.Span, spanEvent ptrace.SpanEvent, idx elasticsearch.Index, buf *bytes.Buffer) {
1818
v := json.NewVisitor(buf)
1919
// Enable ExplicitRadixPoint such that 1.0 is encoded as 1.0 instead of 1.
2020
// This is required to generate the correct dynamic mapping in ES.
@@ -41,7 +41,7 @@ func SerializeSpanEvent(resource pcommon.Resource, resourceSchemaURL string, sco
4141
_ = v.OnObjectFinished()
4242
}
4343

44-
func SerializeSpan(resource pcommon.Resource, resourceSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaURL string, span ptrace.Span, idx elasticsearch.Index, buf *bytes.Buffer) error {
44+
func (*Serializer) SerializeSpan(resource pcommon.Resource, resourceSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaURL string, span ptrace.Span, idx elasticsearch.Index, buf *bytes.Buffer) error {
4545
v := json.NewVisitor(buf)
4646
// Enable ExplicitRadixPoint such that 1.0 is encoded as 1.0 instead of 1.
4747
// This is required to generate the correct dynamic mapping in ES.

exporter/elasticsearchexporter/model.go

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ func newEncoder(mode MappingMode) (documentEncoder, error) {
125125
profilesUnsupportedEncoder: profilesUnsupportedEncoder{mode: mode},
126126
}, nil
127127
case MappingOTel:
128-
return otelModeEncoder{}, nil
128+
return otelModeEncoder{serializer: otelserializer.New()}, nil
129129
}
130130
return nil, fmt.Errorf("unknown mapping mode %q (%d)", mode, int(mode))
131131
}
@@ -150,7 +150,9 @@ type bodymapModeEncoder struct {
150150
profilesUnsupportedEncoder
151151
}
152152

153-
type otelModeEncoder struct{}
153+
type otelModeEncoder struct {
154+
serializer *otelserializer.Serializer
155+
}
154156

155157
const (
156158
traceIDField = "traceID"
@@ -234,61 +236,61 @@ func (e otelModeEncoder) encodeLog(
234236
idx elasticsearch.Index,
235237
buf *bytes.Buffer,
236238
) error {
237-
return otelserializer.SerializeLog(
239+
return e.serializer.SerializeLog(
238240
ec.resource, ec.resourceSchemaURL,
239241
ec.scope, ec.scopeSchemaURL,
240242
record, idx, buf,
241243
)
242244
}
243245

244-
func (otelModeEncoder) encodeSpan(
246+
func (e otelModeEncoder) encodeSpan(
245247
ec encodingContext,
246248
span ptrace.Span,
247249
idx elasticsearch.Index,
248250
buf *bytes.Buffer,
249251
) error {
250-
return otelserializer.SerializeSpan(
252+
return e.serializer.SerializeSpan(
251253
ec.resource, ec.resourceSchemaURL,
252254
ec.scope, ec.scopeSchemaURL,
253255
span, idx, buf,
254256
)
255257
}
256258

257-
func (otelModeEncoder) encodeSpanEvent(
259+
func (e otelModeEncoder) encodeSpanEvent(
258260
ec encodingContext,
259261
span ptrace.Span,
260262
spanEvent ptrace.SpanEvent,
261263
idx elasticsearch.Index,
262264
buf *bytes.Buffer,
263265
) error {
264-
otelserializer.SerializeSpanEvent(
266+
e.serializer.SerializeSpanEvent(
265267
ec.resource, ec.resourceSchemaURL,
266268
ec.scope, ec.scopeSchemaURL,
267269
span, spanEvent, idx, buf,
268270
)
269271
return nil
270272
}
271273

272-
func (otelModeEncoder) encodeMetrics(
274+
func (e otelModeEncoder) encodeMetrics(
273275
ec encodingContext,
274276
dataPoints []datapoints.DataPoint,
275277
validationErrors *[]error,
276278
idx elasticsearch.Index,
277279
buf *bytes.Buffer,
278280
) (map[string]string, error) {
279-
return otelserializer.SerializeMetrics(
281+
return e.serializer.SerializeMetrics(
280282
ec.resource, ec.resourceSchemaURL,
281283
ec.scope, ec.scopeSchemaURL,
282284
dataPoints, validationErrors, idx, buf,
283285
)
284286
}
285287

286-
func (otelModeEncoder) encodeProfile(
288+
func (e otelModeEncoder) encodeProfile(
287289
ec encodingContext,
288290
profile pprofile.Profile,
289291
pushData func(*bytes.Buffer, string, string) error,
290292
) error {
291-
return otelserializer.SerializeProfile(ec.resource, ec.scope, profile, pushData)
293+
return e.serializer.SerializeProfile(ec.resource, ec.scope, profile, pushData)
292294
}
293295

294296
func (e bodymapModeEncoder) encodeLog(

0 commit comments

Comments
 (0)