Skip to content

Commit 7f13eb6

Browse files
authored
Make Kafka payload encoding configurable (#1584)
* Make Kafka payload encoding configurable Signed-off-by: Pavol Loffay <[email protected]> * Expose marshalers in factory Signed-off-by: Pavol Loffay <[email protected]> * Remove zipkin from exporter readme Signed-off-by: Pavol Loffay <[email protected]> * Make custom encodings optional in the factory Signed-off-by: Pavol Loffay <[email protected]>
1 parent b13392b commit 7f13eb6

29 files changed

+1003
-127
lines changed

exporter/kafkaexporter/README.md

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,18 @@
22

33
Kafka exporter exports traces to Kafka. This exporter uses a synchronous producer
44
that blocks and does not batch messages, therefore it should be used with batch and queued retry
5-
processors for higher throughput and resiliency.
5+
processors for higher throughput and resiliency. Message payload encoding is configurable.
66

7-
Message payloads are serialized OTLP `ExportTraceServiceRequest`.
8-
97
The following settings are required:
108
- `protocol_version` (no default): Kafka protocol version e.g. 2.0.0
119

1210
The following settings can be optionally configured:
1311
- `brokers` (default = localhost:9092): The list of kafka brokers
1412
- `topic` (default = otlp_spans): The name of the kafka topic to export to
13+
- `encoding` (default = otlp_proto): The encoding of the payload sent to kafka. Available encodings:
14+
- `otlp_proto`: the payload is serialized to `ExportTraceServiceRequest`.
15+
- `jaeger_proto`: the payload is serialized to a single Jaeger proto `Span`.
16+
- `jaeger_json`: the payload is serialized to a single Jaeger JSON Span using `jsonpb`.
1517
- `metadata`
1618
- `full` (default = true): Whether to maintain a full set of metadata.
1719
When disabled the client does not make the initial request to broker at the startup.

exporter/kafkaexporter/config.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ type Config struct {
3434
ProtocolVersion string `mapstructure:"protocol_version"`
3535
// The name of the kafka topic to export to (default "otlp_spans")
3636
Topic string `mapstructure:"topic"`
37+
// Encoding of the messages (default "otlp_proto")
38+
Encoding string `mapstructure:"encoding"`
3739

3840
// Metadata is the namespace for metadata management properties used by the
3941
// Client, and shared by the Producer/Consumer.

exporter/kafkaexporter/config_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,9 @@ func TestLoadConfig(t *testing.T) {
5858
NumConsumers: 2,
5959
QueueSize: 10,
6060
},
61-
Topic: "spans",
62-
Brokers: []string{"foo:123", "bar:456"},
61+
Topic: "spans",
62+
Encoding: "otlp_proto",
63+
Brokers: []string{"foo:123", "bar:456"},
6364
Metadata: Metadata{
6465
Full: false,
6566
Retry: MetadataRetry{

exporter/kafkaexporter/factory.go

Lines changed: 31 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,10 @@ import (
2424
)
2525

2626
const (
27-
typeStr = "kafka"
28-
defaultTopic = "otlp_spans"
29-
defaultBroker = "localhost:9092"
27+
typeStr = "kafka"
28+
defaultTopic = "otlp_spans"
29+
defaultEncoding = "otlp_proto"
30+
defaultBroker = "localhost:9092"
3031
// default from sarama.NewConfig()
3132
defaultMetadataRetryMax = 3
3233
// default from sarama.NewConfig()
@@ -35,12 +36,30 @@ const (
3536
defaultMetadataFull = true
3637
)
3738

39+
// FactoryOption applies changes to kafkaExporterFactory.
40+
type FactoryOption func(factory *kafkaExporterFactory)
41+
42+
// WithAddMarshallers adds marshallers.
43+
func WithAddMarshallers(encodingMarshaller map[string]Marshaller) FactoryOption {
44+
return func(factory *kafkaExporterFactory) {
45+
for encoding, marshaller := range encodingMarshaller {
46+
factory.marshallers[encoding] = marshaller
47+
}
48+
}
49+
}
50+
3851
// NewFactory creates Kafka exporter factory.
39-
func NewFactory() component.ExporterFactory {
52+
func NewFactory(options ...FactoryOption) component.ExporterFactory {
53+
f := &kafkaExporterFactory{
54+
marshallers: defaultMarshallers(),
55+
}
56+
for _, o := range options {
57+
o(f)
58+
}
4059
return exporterhelper.NewFactory(
4160
typeStr,
4261
createDefaultConfig,
43-
exporterhelper.WithTraces(createTraceExporter))
62+
exporterhelper.WithTraces(f.createTraceExporter))
4463
}
4564

4665
func createDefaultConfig() configmodels.Exporter {
@@ -57,6 +76,7 @@ func createDefaultConfig() configmodels.Exporter {
5776
QueueSettings: qs,
5877
Brokers: []string{defaultBroker},
5978
Topic: defaultTopic,
79+
Encoding: defaultEncoding,
6080
Metadata: Metadata{
6181
Full: defaultMetadataFull,
6282
Retry: MetadataRetry{
@@ -67,13 +87,17 @@ func createDefaultConfig() configmodels.Exporter {
6787
}
6888
}
6989

70-
func createTraceExporter(
90+
type kafkaExporterFactory struct {
91+
marshallers map[string]Marshaller
92+
}
93+
94+
func (f *kafkaExporterFactory) createTraceExporter(
7195
_ context.Context,
7296
params component.ExporterCreateParams,
7397
cfg configmodels.Exporter,
7498
) (component.TraceExporter, error) {
7599
oCfg := cfg.(*Config)
76-
exp, err := newExporter(*oCfg, params)
100+
exp, err := newExporter(*oCfg, params, f.marshallers)
77101
if err != nil {
78102
return nil, err
79103
}

exporter/kafkaexporter/factory_test.go

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323

2424
"go.opentelemetry.io/collector/component"
2525
"go.opentelemetry.io/collector/config/configcheck"
26+
"go.opentelemetry.io/collector/consumer/pdata"
2627
)
2728

2829
func TestCreateDefaultConfig(t *testing.T) {
@@ -39,7 +40,8 @@ func TestCreateTracesExporter(t *testing.T) {
3940
cfg.ProtocolVersion = "2.0.0"
4041
// this disables contacting the broker so we can successfully create the exporter
4142
cfg.Metadata.Full = false
42-
r, err := createTraceExporter(context.Background(), component.ExporterCreateParams{}, cfg)
43+
f := kafkaExporterFactory{marshallers: defaultMarshallers()}
44+
r, err := f.createTraceExporter(context.Background(), component.ExporterCreateParams{}, cfg)
4345
require.NoError(t, err)
4446
assert.NotNil(t, r)
4547
}
@@ -48,8 +50,43 @@ func TestCreateTracesExporter_err(t *testing.T) {
4850
cfg := createDefaultConfig().(*Config)
4951
cfg.Brokers = []string{"invalid:9092"}
5052
cfg.ProtocolVersion = "2.0.0"
51-
// we get the error because the exporter
52-
r, err := createTraceExporter(context.Background(), component.ExporterCreateParams{}, cfg)
53+
f := kafkaExporterFactory{marshallers: defaultMarshallers()}
54+
r, err := f.createTraceExporter(context.Background(), component.ExporterCreateParams{}, cfg)
55+
// no available broker
5356
require.Error(t, err)
5457
assert.Nil(t, r)
5558
}
59+
60+
func TestWithMarshallers(t *testing.T) {
61+
cm := &customMarshaller{}
62+
f := NewFactory(WithAddMarshallers(map[string]Marshaller{cm.Encoding(): cm}))
63+
cfg := createDefaultConfig().(*Config)
64+
// disable contacting broker
65+
cfg.Metadata.Full = false
66+
67+
t.Run("custom_encoding", func(t *testing.T) {
68+
cfg.Encoding = cm.Encoding()
69+
exporter, err := f.CreateTraceExporter(context.Background(), component.ExporterCreateParams{}, cfg)
70+
require.NoError(t, err)
71+
require.NotNil(t, exporter)
72+
})
73+
t.Run("default_encoding", func(t *testing.T) {
74+
cfg.Encoding = new(otlpProtoMarshaller).Encoding()
75+
exporter, err := f.CreateTraceExporter(context.Background(), component.ExporterCreateParams{}, cfg)
76+
require.NoError(t, err)
77+
assert.NotNil(t, exporter)
78+
})
79+
}
80+
81+
type customMarshaller struct {
82+
}
83+
84+
var _ Marshaller = (*customMarshaller)(nil)
85+
86+
func (c customMarshaller) Marshal(traces pdata.Traces) ([]Message, error) {
87+
panic("implement me")
88+
}
89+
90+
func (c customMarshaller) Encoding() string {
91+
return "custom"
92+
}
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
// Copyright 2020 The OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package kafkaexporter
16+
17+
import (
18+
"bytes"
19+
20+
"github.com/gogo/protobuf/jsonpb"
21+
jaegerproto "github.com/jaegertracing/jaeger/model"
22+
23+
"go.opentelemetry.io/collector/component/componenterror"
24+
"go.opentelemetry.io/collector/consumer/pdata"
25+
jaegertranslator "go.opentelemetry.io/collector/translator/trace/jaeger"
26+
)
27+
28+
type jaegerMarshaller struct {
29+
marshaller jaegerSpanMarshaller
30+
}
31+
32+
var _ Marshaller = (*jaegerMarshaller)(nil)
33+
34+
func (j jaegerMarshaller) Marshal(traces pdata.Traces) ([]Message, error) {
35+
batches, err := jaegertranslator.InternalTracesToJaegerProto(traces)
36+
if err != nil {
37+
return nil, err
38+
}
39+
var messages []Message
40+
var errs []error
41+
for _, batch := range batches {
42+
for _, span := range batch.Spans {
43+
span.Process = batch.Process
44+
bts, err := j.marshaller.marshall(span)
45+
// continue to process spans that can be serialized
46+
if err != nil {
47+
errs = append(errs, err)
48+
continue
49+
}
50+
messages = append(messages, Message{Value: bts})
51+
}
52+
}
53+
return messages, componenterror.CombineErrors(errs)
54+
}
55+
56+
func (j jaegerMarshaller) Encoding() string {
57+
return j.marshaller.encoding()
58+
}
59+
60+
type jaegerSpanMarshaller interface {
61+
marshall(span *jaegerproto.Span) ([]byte, error)
62+
encoding() string
63+
}
64+
65+
type jaegerProtoSpanMarshaller struct {
66+
}
67+
68+
var _ jaegerSpanMarshaller = (*jaegerProtoSpanMarshaller)(nil)
69+
70+
func (p jaegerProtoSpanMarshaller) marshall(span *jaegerproto.Span) ([]byte, error) {
71+
return span.Marshal()
72+
}
73+
74+
func (p jaegerProtoSpanMarshaller) encoding() string {
75+
return "jaeger_proto"
76+
}
77+
78+
type jaegerJSONSpanMarshaller struct {
79+
pbMarshaller *jsonpb.Marshaler
80+
}
81+
82+
var _ jaegerSpanMarshaller = (*jaegerJSONSpanMarshaller)(nil)
83+
84+
func newJaegerJSONMarshaller() *jaegerJSONSpanMarshaller {
85+
return &jaegerJSONSpanMarshaller{
86+
pbMarshaller: &jsonpb.Marshaler{},
87+
}
88+
}
89+
90+
func (p jaegerJSONSpanMarshaller) marshall(span *jaegerproto.Span) ([]byte, error) {
91+
out := new(bytes.Buffer)
92+
err := p.pbMarshaller.Marshal(out, span)
93+
return out.Bytes(), err
94+
}
95+
96+
func (p jaegerJSONSpanMarshaller) encoding() string {
97+
return "jaeger_json"
98+
}
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
// Copyright 2020 The OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package kafkaexporter
16+
17+
import (
18+
"bytes"
19+
"testing"
20+
21+
"github.com/gogo/protobuf/jsonpb"
22+
"github.com/stretchr/testify/assert"
23+
"github.com/stretchr/testify/require"
24+
25+
"go.opentelemetry.io/collector/consumer/pdata"
26+
jaegertranslator "go.opentelemetry.io/collector/translator/trace/jaeger"
27+
)
28+
29+
func TestJaegerMarshaller(t *testing.T) {
30+
td := pdata.NewTraces()
31+
td.ResourceSpans().Resize(1)
32+
td.ResourceSpans().At(0).InstrumentationLibrarySpans().Resize(1)
33+
td.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().Resize(1)
34+
td.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().At(0).SetName("foo")
35+
td.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().At(0).SetStartTime(pdata.TimestampUnixNano(10))
36+
td.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().At(0).SetEndTime(pdata.TimestampUnixNano(20))
37+
td.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().At(0).SetTraceID(pdata.NewTraceID([]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}))
38+
td.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().At(0).SetSpanID(pdata.NewSpanID([]byte{1, 2, 3, 4, 5, 6, 7, 8}))
39+
batches, err := jaegertranslator.InternalTracesToJaegerProto(td)
40+
require.NoError(t, err)
41+
42+
batches[0].Spans[0].Process = batches[0].Process
43+
jaegerProtoBytes, err := batches[0].Spans[0].Marshal()
44+
require.NoError(t, err)
45+
require.NotNil(t, jaegerProtoBytes)
46+
47+
jsonMarshaller := &jsonpb.Marshaler{}
48+
jsonByteBuffer := new(bytes.Buffer)
49+
require.NoError(t, jsonMarshaller.Marshal(jsonByteBuffer, batches[0].Spans[0]))
50+
51+
tests := []struct {
52+
unmarshaller Marshaller
53+
encoding string
54+
messages []Message
55+
}{
56+
{
57+
unmarshaller: jaegerMarshaller{
58+
marshaller: jaegerProtoSpanMarshaller{},
59+
},
60+
encoding: "jaeger_proto",
61+
messages: []Message{{Value: jaegerProtoBytes}},
62+
},
63+
{
64+
unmarshaller: jaegerMarshaller{
65+
marshaller: jaegerJSONSpanMarshaller{
66+
pbMarshaller: &jsonpb.Marshaler{},
67+
},
68+
},
69+
encoding: "jaeger_json",
70+
messages: []Message{{Value: jsonByteBuffer.Bytes()}},
71+
},
72+
}
73+
for _, test := range tests {
74+
t.Run(test.encoding, func(t *testing.T) {
75+
messages, err := test.unmarshaller.Marshal(td)
76+
require.NoError(t, err)
77+
assert.Equal(t, test.messages, messages)
78+
assert.Equal(t, test.encoding, test.unmarshaller.Encoding())
79+
})
80+
}
81+
}
82+
83+
func TestJaegerMarshaller_error_covert_traceID(t *testing.T) {
84+
marshaller := jaegerMarshaller{
85+
marshaller: jaegerProtoSpanMarshaller{},
86+
}
87+
td := pdata.NewTraces()
88+
td.ResourceSpans().Resize(1)
89+
td.ResourceSpans().At(0).InstrumentationLibrarySpans().Resize(1)
90+
td.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().Resize(1)
91+
// fails in zero traceID
92+
messages, err := marshaller.Marshal(td)
93+
require.Error(t, err)
94+
assert.Nil(t, messages)
95+
}

0 commit comments

Comments
 (0)