Skip to content

Commit e979e41

Browse files
xavirgMovieStoreGuyatoulme
authored
[exporter/kafkaexporter] Skip Username and Password config validation for IAM (#37417)
#### Description In `kafka.SASLConfig`, Username and Password fields [are always required](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/exporter/kafkaexporter/config.go#L138-L144): ``` func validateSASLConfig(c *kafka.SASLConfig) error { if c == nil { return nil } if c.Username == "" { return fmt.Errorf("auth.sasl.username is required") } if c.Password == "" { return fmt.Errorf("auth.sasl.password is required") } ``` However, when the authentication mechanism is set to either `AWS_MSK_IAM` or `AWS_MSK_IAM_OAUTHBEARER`, those properties are never used: ``` exporters: kafka/example: auth: sasl: username: "foo" # Not used password: "bar" # Not used mechanism: "AWS_MSK_IAM_OAUTHBEARER" aws_msk: region: us-east-1 ``` Implemented logic to bypass User and Password validation when the mechanism is set to AWS IAM. #### Testing Added a test to make sure kafka.SASLConfig.Username and kafka.SASLConfig.Password are not required --------- Co-authored-by: Sean Marciniak <[email protected]> Co-authored-by: Antoine Toulme <[email protected]> Co-authored-by: Antoine Toulme <[email protected]>
1 parent 7f43cc5 commit e979e41

File tree

3 files changed

+50
-6
lines changed

3 files changed

+50
-6
lines changed

.chloggen/main.yaml

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: exporter/kafka
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: do not ask for user and password if auth mechanism is set to AWS IAM
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [37417]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: []

exporter/kafkaexporter/config.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -135,12 +135,13 @@ func validateSASLConfig(c *kafka.SASLConfig) error {
135135
return nil
136136
}
137137

138-
if c.Username == "" {
139-
return fmt.Errorf("auth.sasl.username is required")
140-
}
141-
142-
if c.Password == "" {
143-
return fmt.Errorf("auth.sasl.password is required")
138+
if c.Mechanism != "AWS_MSK_IAM" && c.Mechanism != "AWS_MSK_IAM_OAUTHBEARER" {
139+
if c.Username == "" {
140+
return fmt.Errorf("auth.sasl.username is required")
141+
}
142+
if c.Password == "" {
143+
return fmt.Errorf("auth.sasl.password is required")
144+
}
144145
}
145146

146147
switch c.Mechanism {

exporter/kafkaexporter/config_test.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -295,6 +295,22 @@ func TestValidate_sasl_version(t *testing.T) {
295295
assert.EqualError(t, err, "auth.sasl.version has to be either 0 or 1. configured value 42")
296296
}
297297

298+
func TestValidate_sasl_iam(t *testing.T) {
299+
config := &Config{
300+
Producer: Producer{
301+
Compression: "none",
302+
},
303+
Authentication: kafka.Authentication{
304+
SASL: &kafka.SASLConfig{
305+
Mechanism: "AWS_MSK_IAM",
306+
},
307+
},
308+
}
309+
310+
err := config.Validate()
311+
assert.NoError(t, err)
312+
}
313+
298314
func Test_saramaProducerCompressionCodec(t *testing.T) {
299315
tests := map[string]struct {
300316
compression string

0 commit comments

Comments
 (0)