Skip to content

Commit 0a2ea1b

Browse files
authored
Add logs support on Kafka receiver (#2944)
* Add logs support on Kafka receiver * Update README * feedback - public logsUnmarshallers
1 parent 5db1af0 commit 0a2ea1b

File tree

10 files changed

+494
-10
lines changed

10 files changed

+494
-10
lines changed

receiver/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ Available metric receivers (sorted alphabetically):
2424
Available log receivers (sorted alphabetically):
2525

2626
- [OTLP Receiver](otlpreceiver/README.md)
27+
- [Kafka Receiver](kafkareceiver/README.md)
2728

2829
The [contrib repository](https://github.com/open-telemetry/opentelemetry-collector-contrib)
2930
has more receivers that can be added to custom builds of the collector.

receiver/kafkareceiver/README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
Kafka receiver receives traces and logs from Kafka. Message payload encoding is configurable.
44

5-
Supported pipeline types: traces
5+
Supported pipeline types: traces, logs
66

77
## Getting Started
88

@@ -13,7 +13,7 @@ The following settings are required:
1313
The following settings can be optionally configured:
1414

1515
- `brokers` (default = localhost:9092): The list of kafka brokers
16-
- `topic` (default = otlp_spans): The name of the kafka topic to export to
16+
- `topic` (default = otlp_spans): The name of the kafka topic to read from
1717
- `encoding` (default = otlp_proto): The encoding of the payload read from kafka. Available encodings:
1818
- `otlp_proto`: the payload is deserialized to `ExportTraceServiceRequest`.
1919
- `jaeger_proto`: the payload is deserialized to a single Jaeger proto `Span`.

receiver/kafkareceiver/factory.go

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,18 +53,30 @@ func WithAddUnmarshallers(encodingMarshaller map[string]Unmarshaller) FactoryOpt
5353
}
5454
}
5555

56+
// WithAddLogsUnmarshallers adds LogsUnmarshallers.
57+
func WithAddLogsUnmarshallers(encodingMarshaller map[string]LogsUnmarshaller) FactoryOption {
58+
return func(factory *kafkaReceiverFactory) {
59+
for encoding, unmarshaller := range encodingMarshaller {
60+
factory.logsUnmarshaller[encoding] = unmarshaller
61+
}
62+
}
63+
}
64+
5665
// NewFactory creates Kafka receiver factory.
5766
func NewFactory(options ...FactoryOption) component.ReceiverFactory {
5867
f := &kafkaReceiverFactory{
59-
unmarshalers: defaultUnmarshallers(),
68+
unmarshalers: defaultUnmarshallers(),
69+
logsUnmarshaller: defaultLogsUnmarshallers(),
6070
}
6171
for _, o := range options {
6272
o(f)
6373
}
6474
return receiverhelper.NewFactory(
6575
typeStr,
6676
createDefaultConfig,
67-
receiverhelper.WithTraces(f.createTracesReceiver))
77+
receiverhelper.WithTraces(f.createTracesReceiver),
78+
receiverhelper.WithLogs(f.createLogsReceiver),
79+
)
6880
}
6981

7082
func createDefaultConfig() config.Receiver {
@@ -89,7 +101,8 @@ func createDefaultConfig() config.Receiver {
89101
}
90102

91103
// kafkaReceiverFactory holds the registered payload unmarshallers used to
// build receivers: one registry per signal, keyed by encoding name.
type kafkaReceiverFactory struct {
	unmarshalers     map[string]Unmarshaller     // encoding -> traces unmarshaller
	logsUnmarshaller map[string]LogsUnmarshaller // encoding -> logs unmarshaller
}
94107

95108
func (f *kafkaReceiverFactory) createTracesReceiver(
@@ -105,3 +118,17 @@ func (f *kafkaReceiverFactory) createTracesReceiver(
105118
}
106119
return r, nil
107120
}
// createLogsReceiver builds a Kafka logs receiver from cfg, wiring in the
// factory's registered logs unmarshallers and the next consumer in the
// pipeline. It fails if the configured encoding has no unmarshaller or the
// consumer group cannot be created.
func (f *kafkaReceiverFactory) createLogsReceiver(
	_ context.Context,
	params component.ReceiverCreateParams,
	cfg config.Receiver,
	nextConsumer consumer.Logs,
) (component.LogsReceiver, error) {
	c := cfg.(*Config)
	r, err := newLogsReceiver(*c, params, f.logsUnmarshaller, nextConsumer)
	if err != nil {
		// Return literal nil so the caller never sees a non-nil interface
		// wrapping a nil *kafkaLogsConsumer.
		return nil, err
	}
	return r, nil
}

receiver/kafkareceiver/factory_test.go

Lines changed: 60 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ func TestCreateTracesReceiver_error(t *testing.T) {
5959
}
6060

6161
func TestWithUnmarshallers(t *testing.T) {
62-
unmarshaller := &customUnamarshaller{}
62+
unmarshaller := &customUnmarshaller{}
6363
f := NewFactory(WithAddUnmarshallers(map[string]Unmarshaller{unmarshaller.Encoding(): unmarshaller}))
6464
cfg := createDefaultConfig().(*Config)
6565
// disable contacting broker
@@ -80,15 +80,70 @@ func TestWithUnmarshallers(t *testing.T) {
8080
})
8181
}
8282

83-
type customUnamarshaller struct {
// TestCreateLogsReceiver verifies that creating a logs receiver fails when
// the configured broker is unreachable (metadata fetch at startup errors).
func TestCreateLogsReceiver(t *testing.T) {
	cfg := createDefaultConfig().(*Config)
	// Unresolvable broker so consumer-group creation fails.
	cfg.Brokers = []string{"invalid:9092"}
	cfg.ProtocolVersion = "2.0.0"
	f := kafkaReceiverFactory{logsUnmarshaller: defaultLogsUnmarshallers()}
	r, err := f.createLogsReceiver(context.Background(), component.ReceiverCreateParams{}, cfg, nil)
	// no available broker
	require.Error(t, err)
	assert.Nil(t, r)
}
93+
// TestCreateLogsReceiver_error verifies the receiver is created successfully
// when broker contact at startup is disabled (Metadata.Full = false).
// NOTE(review): the name suggests an error case but the test asserts success;
// consider renaming — confirm against the traces test naming convention.
func TestCreateLogsReceiver_error(t *testing.T) {
	cfg := createDefaultConfig().(*Config)
	cfg.ProtocolVersion = "2.0.0"
	// disable contacting broker at startup
	cfg.Metadata.Full = false
	f := kafkaReceiverFactory{logsUnmarshaller: defaultLogsUnmarshallers()}
	r, err := f.createLogsReceiver(context.Background(), component.ReceiverCreateParams{}, cfg, nil)
	require.NoError(t, err)
	assert.NotNil(t, r)
}
104+
105+
func TestWithLogsUnmarshallers(t *testing.T) {
106+
unmarshaller := &customLogsUnmarshaller{}
107+
f := NewFactory(WithAddLogsUnmarshallers(map[string]LogsUnmarshaller{unmarshaller.Encoding(): unmarshaller}))
108+
cfg := createDefaultConfig().(*Config)
109+
// disable contacting broker
110+
cfg.Metadata.Full = false
111+
cfg.ProtocolVersion = "2.0.0"
112+
113+
t.Run("custom_encoding", func(t *testing.T) {
114+
cfg.Encoding = unmarshaller.Encoding()
115+
exporter, err := f.CreateLogsReceiver(context.Background(), component.ReceiverCreateParams{}, cfg, nil)
116+
require.NoError(t, err)
117+
require.NotNil(t, exporter)
118+
})
119+
t.Run("default_encoding", func(t *testing.T) {
120+
cfg.Encoding = new(otlpLogsPbUnmarshaller).Encoding()
121+
exporter, err := f.CreateLogsReceiver(context.Background(), component.ReceiverCreateParams{}, cfg, nil)
122+
require.NoError(t, err)
123+
assert.NotNil(t, exporter)
124+
})
84125
}
85126

86-
var _ Unmarshaller = (*customUnamarshaller)(nil)
127+
type customUnmarshaller struct {
128+
}
129+
130+
type customLogsUnmarshaller struct {
131+
}
132+
133+
var _ Unmarshaller = (*customUnmarshaller)(nil)
134+
135+
func (c customUnmarshaller) Unmarshal([]byte) (pdata.Traces, error) {
136+
panic("implement me")
137+
}
138+
139+
func (c customUnmarshaller) Encoding() string {
140+
return "custom"
141+
}
87142

88-
func (c customUnamarshaller) Unmarshal([]byte) (pdata.Traces, error) {
143+
func (c customLogsUnmarshaller) Unmarshal([]byte) (pdata.Logs, error) {
89144
panic("implement me")
90145
}
91146

92-
func (c customUnamarshaller) Encoding() string {
147+
func (c customLogsUnmarshaller) Encoding() string {
93148
return "custom"
94149
}

receiver/kafkareceiver/kafka_receiver.go

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,20 @@ type kafkaConsumer struct {
4848
logger *zap.Logger
4949
}
5050

// kafkaLogsConsumer uses sarama to consume and handle messages from kafka.
type kafkaLogsConsumer struct {
	name              string
	consumerGroup     sarama.ConsumerGroup
	nextConsumer      consumer.Logs // next logs consumer in the pipeline
	topics            []string
	cancelConsumeLoop context.CancelFunc // stops the loop started in Start

	unmarshaller LogsUnmarshaller // decodes message payloads into pdata.Logs

	logger *zap.Logger
}

// Compile-time checks that both consumers implement component.Receiver.
var _ component.Receiver = (*kafkaConsumer)(nil)
var _ component.Receiver = (*kafkaLogsConsumer)(nil)
5265

5366
func newReceiver(config Config, params component.ReceiverCreateParams, unmarshalers map[string]Unmarshaller, nextConsumer consumer.Traces) (*kafkaConsumer, error) {
5467
unmarshaller := unmarshalers[config.Encoding]
@@ -121,6 +134,77 @@ func (c *kafkaConsumer) Shutdown(context.Context) error {
121134
return c.consumerGroup.Close()
122135
}
123136

// newLogsReceiver builds a kafkaLogsConsumer from config: it selects the
// unmarshaller registered for config.Encoding, configures sarama, and joins
// the configured consumer group. Returns errUnrecognizedEncoding when no
// unmarshaller matches the encoding.
func newLogsReceiver(config Config, params component.ReceiverCreateParams, unmarshalers map[string]LogsUnmarshaller, nextConsumer consumer.Logs) (*kafkaLogsConsumer, error) {
	unmarshaller := unmarshalers[config.Encoding]
	if unmarshaller == nil {
		return nil, errUnrecognizedEncoding
	}

	// NOTE(review): this sarama configuration mirrors newReceiver for traces;
	// consider extracting a shared helper to keep the two in sync.
	c := sarama.NewConfig()
	c.ClientID = config.ClientID
	c.Metadata.Full = config.Metadata.Full
	c.Metadata.Retry.Max = config.Metadata.Retry.Max
	c.Metadata.Retry.Backoff = config.Metadata.Retry.Backoff
	if config.ProtocolVersion != "" {
		version, err := sarama.ParseKafkaVersion(config.ProtocolVersion)
		if err != nil {
			return nil, err
		}
		c.Version = version
	}
	if err := kafkaexporter.ConfigureAuthentication(config.Authentication, c); err != nil {
		return nil, err
	}
	client, err := sarama.NewConsumerGroup(config.Brokers, config.GroupID, c)
	if err != nil {
		return nil, err
	}
	return &kafkaLogsConsumer{
		name:          config.Name(),
		consumerGroup: client,
		topics:        []string{config.Topic},
		nextConsumer:  nextConsumer,
		unmarshaller:  unmarshaller,
		logger:        params.Logger,
	}, nil
}
171+
// Start launches the consume loop in a goroutine and blocks until the
// consumer group session is ready (the handler's Setup has run).
func (c *kafkaLogsConsumer) Start(context.Context, component.Host) error {
	// The Start context is ignored: the consume loop outlives startup, so it
	// gets its own cancellable context, cancelled from Shutdown.
	ctx, cancel := context.WithCancel(context.Background())
	c.cancelConsumeLoop = cancel
	logsConsumerGroup := &logsConsumerGroupHandler{
		name:         c.name,
		logger:       c.logger,
		unmarshaller: c.unmarshaller,
		nextConsumer: c.nextConsumer,
		ready:        make(chan bool),
	}
	go c.consumeLoop(ctx, logsConsumerGroup)
	// Wait for Setup to close the ready channel before reporting started.
	<-logsConsumerGroup.ready
	return nil
}
186+
// consumeLoop repeatedly joins the consumer group until ctx is cancelled,
// returning the context's error on shutdown. Consume errors are logged and
// the loop retries.
func (c *kafkaLogsConsumer) consumeLoop(ctx context.Context, handler sarama.ConsumerGroupHandler) error {
	for {
		// `Consume` should be called inside an infinite loop, when a
		// server-side rebalance happens, the consumer session will need to be
		// recreated to get the new claims
		if err := c.consumerGroup.Consume(ctx, c.topics, handler); err != nil {
			c.logger.Error("Error from consumer", zap.Error(err))
		}
		// check if context was cancelled, signaling that the consumer should stop
		if ctx.Err() != nil {
			c.logger.Info("Consumer stopped", zap.Error(ctx.Err()))
			return ctx.Err()
		}
	}
}
202+
// Shutdown cancels the consume loop and closes the consumer group,
// returning any error from the close.
func (c *kafkaLogsConsumer) Shutdown(context.Context) error {
	c.cancelConsumeLoop()
	return c.consumerGroup.Close()
}
207+
124208
type consumerGroupHandler struct {
125209
name string
126210
unmarshaller Unmarshaller
@@ -131,7 +215,18 @@ type consumerGroupHandler struct {
131215
logger *zap.Logger
132216
}
133217

// logsConsumerGroupHandler implements sarama.ConsumerGroupHandler for logs:
// it decodes claimed messages and forwards them to the next logs consumer.
type logsConsumerGroupHandler struct {
	name         string
	unmarshaller LogsUnmarshaller
	nextConsumer consumer.Logs
	ready        chan bool // closed once by Setup to signal the session started
	readyCloser  sync.Once // guards the close across rebalances

	logger *zap.Logger
}

// Compile-time checks that both handlers implement sarama.ConsumerGroupHandler.
var _ sarama.ConsumerGroupHandler = (*consumerGroupHandler)(nil)
var _ sarama.ConsumerGroupHandler = (*logsConsumerGroupHandler)(nil)
135230

136231
func (c *consumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error {
137232
c.readyCloser.Do(func() {
@@ -180,3 +275,51 @@ func (c *consumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession,
180275
}
181276
return nil
182277
}
278+
// Setup signals readiness (once, across rebalances) and records a
// partition-start stat for this receiver instance.
func (c *logsConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error {
	c.readyCloser.Do(func() {
		close(c.ready)
	})
	statsTags := []tag.Mutator{tag.Insert(tagInstanceName, c.name)}
	_ = stats.RecordWithTags(session.Context(), statsTags, statPartitionStart.M(1))
	return nil
}
287+
// Cleanup records a partition-close stat when the session ends.
func (c *logsConsumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error {
	statsTags := []tag.Mutator{tag.Insert(tagInstanceName, c.name)}
	_ = stats.RecordWithTags(session.Context(), statsTags, statPartitionClose.M(1))
	return nil
}
293+
294+
func (c *logsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
295+
c.logger.Info("Starting consumer group", zap.Int32("partition", claim.Partition()))
296+
for message := range claim.Messages() {
297+
c.logger.Debug("Kafka message claimed",
298+
zap.String("value", string(message.Value)),
299+
zap.Time("timestamp", message.Timestamp),
300+
zap.String("topic", message.Topic))
301+
session.MarkMessage(message, "")
302+
303+
ctx := obsreport.ReceiverContext(session.Context(), c.name, transport)
304+
ctx = obsreport.StartTraceDataReceiveOp(ctx, c.name, transport)
305+
statsTags := []tag.Mutator{tag.Insert(tagInstanceName, c.name)}
306+
_ = stats.RecordWithTags(ctx, statsTags,
307+
statMessageCount.M(1),
308+
statMessageOffset.M(message.Offset),
309+
statMessageOffsetLag.M(claim.HighWaterMarkOffset()-message.Offset-1))
310+
311+
logs, err := c.unmarshaller.Unmarshal(message.Value)
312+
if err != nil {
313+
c.logger.Error("failed to unmarshall message", zap.Error(err))
314+
return err
315+
}
316+
317+
err = c.nextConsumer.ConsumeLogs(session.Context(), logs)
318+
// TODO
319+
obsreport.EndTraceDataReceiveOp(ctx, c.unmarshaller.Encoding(), logs.LogRecordCount(), err)
320+
if err != nil {
321+
return err
322+
}
323+
}
324+
return nil
325+
}

0 commit comments

Comments
 (0)