7
7
"context"
8
8
"fmt"
9
9
"hash/fnv"
10
+ "strings"
10
11
"time"
11
12
12
13
"github.com/aws/aws-msk-iam-sasl-signer-go/signer"
@@ -34,7 +35,7 @@ const (
34
35
)
35
36
36
37
// NewFranzSyncProducer creates a new Kafka client using the franz-go library.
37
- func NewFranzSyncProducer (clientCfg configkafka.ClientConfig ,
38
+ func NewFranzSyncProducer (ctx context. Context , clientCfg configkafka.ClientConfig ,
38
39
cfg configkafka.ProducerConfig ,
39
40
timeout time.Duration ,
40
41
logger * zap.Logger ,
@@ -45,15 +46,16 @@ func NewFranzSyncProducer(clientCfg configkafka.ClientConfig,
45
46
default :
46
47
codec = codec .WithLevel (int (cfg .CompressionParams .Level ))
47
48
}
48
- opts := []kgo.Opt {
49
- kgo .SeedBrokers (clientCfg .Brokers ... ),
50
- kgo .WithLogger (kzap .New (logger .Named ("kafka" ))),
49
+ opts , err := commonOpts (ctx , clientCfg , logger ,
51
50
kgo .ProduceRequestTimeout (timeout ),
52
51
kgo .ProducerBatchCompression (codec ),
53
52
// Use the UniformBytesPartitioner that is the default in franz-go with
54
53
// the legacy compatibility sarama hashing to avoid hashing to different
55
54
// partitions in case partitioning is enabled.
56
55
kgo .RecordPartitioner (newSaramaCompatPartitioner ()),
56
+ )
57
+ if err != nil {
58
+ return nil , err
57
59
}
58
60
// Configure required acks
59
61
switch cfg .RequiredAcks {
@@ -68,17 +70,128 @@ func NewFranzSyncProducer(clientCfg configkafka.ClientConfig,
68
70
opts = append (opts , kgo .DisableIdempotentWrite ())
69
71
opts = append (opts , kgo .RequiredAcks (kgo .LeaderAck ()))
70
72
}
73
+
74
+ // Configure max message size
75
+ if cfg .MaxMessageBytes > 0 {
76
+ opts = append (opts , kgo .ProducerBatchMaxBytes (
77
+ int32 (cfg .MaxMessageBytes ),
78
+ ))
79
+ }
80
+ // Configure batch size
81
+ if cfg .FlushMaxMessages > 0 {
82
+ opts = append (opts , kgo .MaxBufferedRecords (cfg .FlushMaxMessages ))
83
+ }
84
+
85
+ return kgo .NewClient (opts ... )
86
+ }
87
+
88
+ // NewFranzConsumerGroup creates a new Kafka consumer client using the franz-go library.
89
+ func NewFranzConsumerGroup (ctx context.Context , clientCfg configkafka.ClientConfig ,
90
+ consumerCfg configkafka.ConsumerConfig ,
91
+ topics []string ,
92
+ logger * zap.Logger ,
93
+ opts ... kgo.Opt ,
94
+ ) (* kgo.Client , error ) {
95
+ opts , err := commonOpts (ctx , clientCfg , logger , append ([]kgo.Opt {
96
+ kgo .ConsumeTopics (topics ... ),
97
+ kgo .ConsumerGroup (consumerCfg .GroupID ),
98
+ }, opts ... )... )
99
+ if err != nil {
100
+ return nil , err
101
+ }
102
+
103
+ for _ , t := range topics {
104
+ // Similar to librdkafka, if the topic starts with `^`, it is a regex topic:
105
+ // https://github.com/confluentinc/librdkafka/blob/b871fdabab84b2ea1be3866a2ded4def7e31b006/src/rdkafka.h#L3899-L3938
106
+ if strings .HasPrefix (t , "^" ) {
107
+ opts = append (opts , kgo .ConsumeRegex ())
108
+ break
109
+ }
110
+ }
111
+
112
+ // Configure session timeout
113
+ if consumerCfg .SessionTimeout > 0 {
114
+ opts = append (opts , kgo .SessionTimeout (consumerCfg .SessionTimeout ))
115
+ }
116
+
117
+ // Configure heartbeat interval
118
+ if consumerCfg .HeartbeatInterval > 0 {
119
+ opts = append (opts , kgo .HeartbeatInterval (consumerCfg .HeartbeatInterval ))
120
+ }
121
+
122
+ // Configure fetch sizes
123
+ if consumerCfg .MinFetchSize > 0 {
124
+ opts = append (opts , kgo .FetchMinBytes (consumerCfg .MinFetchSize ))
125
+ }
126
+ if consumerCfg .DefaultFetchSize > 0 {
127
+ opts = append (opts , kgo .FetchMaxBytes (consumerCfg .DefaultFetchSize ))
128
+ }
129
+
130
+ // Configure max fetch wait
131
+ if consumerCfg .MaxFetchWait > 0 {
132
+ opts = append (opts , kgo .FetchMaxWait (consumerCfg .MaxFetchWait ))
133
+ }
134
+
135
+ interval := consumerCfg .AutoCommit .Interval
136
+ if ! consumerCfg .AutoCommit .Enable {
137
+ // Set auto-commit interval to a very high value to "disable" it, but
138
+ // still allow using marks.
139
+ interval = time .Hour
140
+ }
141
+ // Configure auto-commit to use marks, this simplifies the committing
142
+ // logic and makes it more consistent with the Sarama client.
143
+ opts = append (opts , kgo .AutoCommitMarks (),
144
+ kgo .AutoCommitInterval (interval ),
145
+ )
146
+
147
+ // Configure the offset to reset to if an exception is found (or no current
148
+ // partition offset is found.
149
+ switch consumerCfg .InitialOffset {
150
+ case configkafka .EarliestOffset :
151
+ opts = append (opts , kgo .ConsumeResetOffset (kgo .NewOffset ().AtStart ()))
152
+ case configkafka .LatestOffset :
153
+ opts = append (opts , kgo .ConsumeResetOffset (kgo .NewOffset ().AtEnd ()))
154
+ }
155
+
156
+ // Configure group instance ID if provided
157
+ if consumerCfg .GroupInstanceID != "" {
158
+ opts = append (opts , kgo .InstanceID (consumerCfg .GroupInstanceID ))
159
+ }
160
+
161
+ // Configure rebalance strategy
162
+ switch consumerCfg .GroupRebalanceStrategy {
163
+ case "range" : // Sarama default.
164
+ opts = append (opts , kgo .Balancers (kgo .RangeBalancer ()))
165
+ case "roundrobin" :
166
+ opts = append (opts , kgo .Balancers (kgo .RoundRobinBalancer ()))
167
+ case "sticky" :
168
+ opts = append (opts , kgo .Balancers (kgo .StickyBalancer ()))
169
+ // NOTE(marclop): This is a new type of balancer, document accordingly.
170
+ case "cooperative-sticky" :
171
+ opts = append (opts , kgo .Balancers (kgo .CooperativeStickyBalancer ()))
172
+ }
173
+ return kgo .NewClient (opts ... )
174
+ }
175
+
176
+ func commonOpts (ctx context.Context , clientCfg configkafka.ClientConfig ,
177
+ logger * zap.Logger ,
178
+ opts ... kgo.Opt ,
179
+ ) ([]kgo.Opt , error ) {
180
+ opts = append (opts ,
181
+ kgo .WithLogger (kzap .New (logger .Named ("franz" ))),
182
+ kgo .SeedBrokers (clientCfg .Brokers ... ),
183
+ )
71
184
// Configure TLS if needed
72
185
if clientCfg .TLS != nil {
73
- tlsCfg , err := clientCfg .TLS .LoadTLSConfig (context . Background () )
186
+ tlsCfg , err := clientCfg .TLS .LoadTLSConfig (ctx )
74
187
if err != nil {
75
188
return nil , fmt .Errorf ("failed to load TLS config: %w" , err )
76
189
}
77
190
if tlsCfg != nil {
78
191
opts = append (opts , kgo .DialTLSConfig (tlsCfg ))
79
192
}
80
193
}
81
- // Configure Auth
194
+ // Configure authentication
82
195
if clientCfg .Authentication .PlainText != nil {
83
196
auth := plain.Auth {
84
197
User : clientCfg .Authentication .PlainText .Username ,
@@ -104,18 +217,7 @@ func NewFranzSyncProducer(clientCfg configkafka.ClientConfig,
104
217
if clientCfg .ClientID != "" {
105
218
opts = append (opts , kgo .ClientID (clientCfg .ClientID ))
106
219
}
107
- // Configure max message size
108
- if cfg .MaxMessageBytes > 0 {
109
- opts = append (opts , kgo .ProducerBatchMaxBytes (
110
- int32 (cfg .MaxMessageBytes ),
111
- ))
112
- }
113
- // Configure batch size
114
- if cfg .FlushMaxMessages > 0 {
115
- opts = append (opts , kgo .MaxBufferedRecords (cfg .FlushMaxMessages ))
116
- }
117
-
118
- return kgo .NewClient (opts ... )
220
+ return opts , nil
119
221
}
120
222
121
223
func configureKgoSASL (cfg * configkafka.SASLConfig ) (kgo.Opt , error ) {
0 commit comments