From 14d6d05f058ef841541fda5605afa445e7b4f30f Mon Sep 17 00:00:00 2001 From: Santiago Jimenez Giraldo Date: Fri, 15 Dec 2023 01:43:13 +0100 Subject: [PATCH 1/4] feat: implement WithListener for Redpanda module New function WithListener lets you add a new listener to the redpanda container that can be used within any of the docker networks of the container Signed-off-by: Santiago Jimenez Giraldo --- modules/redpanda/mounts/redpanda.yaml.tpl | 13 ++++++ modules/redpanda/options.go | 39 +++++++++++++++++- modules/redpanda/redpanda.go | 49 ++++++++++++++++++++--- 3 files changed, 95 insertions(+), 6 deletions(-) diff --git a/modules/redpanda/mounts/redpanda.yaml.tpl b/modules/redpanda/mounts/redpanda.yaml.tpl index a19d21ce19..935e1923ef 100644 --- a/modules/redpanda/mounts/redpanda.yaml.tpl +++ b/modules/redpanda/mounts/redpanda.yaml.tpl @@ -19,6 +19,13 @@ redpanda: port: 9093 authentication_method: {{ if .KafkaAPI.EnableAuthorization }}sasl{{ else }}none{{ end }} + {{ range .KafkaAPI.Listeners }} + - address: 0.0.0.0 + name: {{ .Address }} + port: {{ .Port }} + authentication_method: {{ .AuthenticationMethod }} + {{ end }} + advertised_kafka_api: - address: {{ .KafkaAPI.AdvertisedHost }} name: external @@ -26,6 +33,12 @@ redpanda: - address: 127.0.0.1 name: internal port: 9093 + {{ range .KafkaAPI.Listeners }} + - address: {{ .Address }} + name: {{ .Address }} + port: {{ .Port }} + {{ end }} + {{ if .EnableTLS }} admin_api_tls: diff --git a/modules/redpanda/options.go b/modules/redpanda/options.go index 379492b95d..1d4afcf8af 100644 --- a/modules/redpanda/options.go +++ b/modules/redpanda/options.go @@ -1,6 +1,11 @@ package redpanda -import "github.com/testcontainers/testcontainers-go" +import ( + "net" + "strconv" + + "github.com/testcontainers/testcontainers-go" +) type options struct { // Superusers is a list of service account names. @@ -29,7 +34,12 @@ type options struct { // EnableTLS is a flag to enable TLS. EnableTLS bool + cert, key []byte + + // Listeners is a list of custom listeners that can be provided to access the + // containers form within docker networks + Listeners []listener } func defaultOptions() options { @@ -41,6 +51,7 @@ func defaultOptions() options { ServiceAccounts: make(map[string]string, 0), AutoCreateTopics: false, EnableTLS: false, + Listeners: make([]listener, 0), } } @@ -86,6 +97,8 @@ func WithEnableKafkaAuthorization() Option { } } +// WithEnableSchemaRegistryHTTPBasicAuth enables HTTP basic authentication for +// Schema Registry. func WithEnableSchemaRegistryHTTPBasicAuth() Option { return func(o *options) { o.SchemaRegistryAuthenticationMethod = "http_basic" @@ -106,3 +119,27 @@ func WithTLS(cert, key []byte) Option { o.key = key } } + +// WithListener adds a custom listener to the Redpanda containers. Listener +// will be aliases to all networks, so they can be accessed from within docker +// networks. At leas one network must be attached to the container, if not an +// error will be thrown when starting the container. +func WithListener(lis string) Option { + host, port, err := net.SplitHostPort(lis) + if err != nil { + return func(o *options) {} + } + + portInt, err := strconv.Atoi(port) + if err != nil { + return func(o *options) {} + } + + return func(o *options) { + o.Listeners = append(o.Listeners, listener{ + Address: host, + Port: portInt, + AuthenticationMethod: o.KafkaAuthenticationMethod, + }) + } +} diff --git a/modules/redpanda/redpanda.go b/modules/redpanda/redpanda.go index fd885fbc07..3a30a24e75 100644 --- a/modules/redpanda/redpanda.go +++ b/modules/redpanda/redpanda.go @@ -5,6 +5,7 @@ import ( "context" _ "embed" "fmt" + "math" "os" "path/filepath" "text/template" @@ -31,6 +32,7 @@ const ( defaultKafkaAPIPort = "9092/tcp" defaultAdminAPIPort = "9644/tcp" defaultSchemaRegistryPort = "8081/tcp" + defaultDockerKafkaApiPort = "29092" redpandaDir = "/etc/redpanda" entrypointFile = "/entrypoint-tc.sh" @@ -98,6 +100,12 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize return nil, fmt.Errorf("failed to create entrypoint file: %w", err) } + // 4. Register extra kafka listeners if provided, network aliases will be + // set + if err := registerListeners(ctx, settings, req); err != nil { + return nil, fmt.Errorf("failed to register listeners: %w", err) + } + // Bootstrap config file contains cluster configurations which will only be considered // the very first time you start a cluster. bootstrapConfigPath := filepath.Join(tmpDir, bootstrapConfigFile) @@ -122,7 +130,7 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize }, ) - // 4. Create certificate and key for TLS connections. + // 5. Create certificate and key for TLS connections. if settings.EnableTLS { certPath := filepath.Join(tmpDir, certFile) if err := os.WriteFile(certPath, settings.cert, 0o600); err != nil { @@ -152,7 +160,7 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize return nil, err } - // 5. Get mapped port for the Kafka API, so that we can render and then mount + // 6. Get mapped port for the Kafka API, so that we can render and then mount // the Redpanda config with the advertised Kafka address. hostIP, err := container.Host(ctx) if err != nil { @@ -164,7 +172,7 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize return nil, fmt.Errorf("failed to get mapped Kafka port: %w", err) } - // 6. Render redpanda.yaml config and mount it. + // 7. Render redpanda.yaml config and mount it. nodeConfig, err := renderNodeConfig(settings, hostIP, kafkaPort.Int()) if err != nil { return nil, fmt.Errorf("failed to render node config: %w", err) @@ -175,7 +183,7 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize return nil, fmt.Errorf("failed to copy redpanda.yaml into container: %w", err) } - // 6. Wait until Redpanda is ready to serve requests + // 8. Wait until Redpanda is ready to serve requests err = wait.ForAll( wait.ForListeningPort(defaultKafkaAPIPort), wait.ForLog("Successfully started Redpanda!").WithPollInterval(100*time.Millisecond)). @@ -185,7 +193,7 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize return nil, fmt.Errorf("failed to wait for Redpanda readiness: %w", err) } - // 7. Create Redpanda Service Accounts if configured to do so. + // 9. Create Redpanda Service Accounts if configured to do so. if len(settings.ServiceAccounts) > 0 { adminAPIPort, err := container.MappedPort(ctx, nat.Port(defaultAdminAPIPort)) if err != nil { @@ -252,6 +260,29 @@ func renderBootstrapConfig(settings options) ([]byte, error) { return bootstrapConfig.Bytes(), nil } +// registerListeners validates that the provided listeners are valid and set network aliases for the provided addresses. +// The container must be attached to at least one network. +func registerListeners(ctx context.Context, settings options, req testcontainers.GenericContainerRequest) error { + if len(settings.Listeners) == 0 { + return nil + } + + if len(req.Networks) == 0 { + return fmt.Errorf("container must be attached to at least one network") + } + + for _, listener := range settings.Listeners { + if listener.Port < 0 || listener.Port > math.MaxUint16 { + return fmt.Errorf("invalid port on listener %s:%d (must be between 0 and 65535)", listener.Address, listener.Port) + } + + for _, network := range req.Networks { + req.NetworkAliases[network] = append(req.NetworkAliases[network], listener.Address) + } + } + return nil +} + // renderNodeConfig renders the redpanda.yaml node config and returns it as // byte array. func renderNodeConfig(settings options, hostIP string, advertisedKafkaPort int) ([]byte, error) { @@ -262,6 +293,7 @@ func renderNodeConfig(settings options, hostIP string, advertisedKafkaPort int) AdvertisedPort: advertisedKafkaPort, AuthenticationMethod: settings.KafkaAuthenticationMethod, EnableAuthorization: settings.KafkaEnableAuthorization, + Listeners: settings.Listeners, }, SchemaRegistry: redpandaConfigTplParamsSchemaRegistry{ AuthenticationMethod: settings.SchemaRegistryAuthenticationMethod, @@ -300,8 +332,15 @@ type redpandaConfigTplParamsKafkaAPI struct { AdvertisedPort int AuthenticationMethod string EnableAuthorization bool + Listeners []listener } type redpandaConfigTplParamsSchemaRegistry struct { AuthenticationMethod string } + +type listener struct { + Address string + Port int + AuthenticationMethod string +} From ae85b931ade15fb83040c4661ee12c357588c8d0 Mon Sep 17 00:00:00 2001 From: Santiago Jimenez Giraldo Date: Tue, 19 Dec 2023 11:12:10 +0100 Subject: [PATCH 2/4] test: test listeners for Redpanda Container Add test for new WithListener function, validate connectivity and couple of asserts in the construction of the listener Signed-off-by: Santiago Jimenez Giraldo --- modules/redpanda/redpanda_test.go | 109 ++++++++++++++++++++++++++++++ 1 file changed, 109 insertions(+) diff --git a/modules/redpanda/redpanda_test.go b/modules/redpanda/redpanda_test.go index 7ad424b3bd..1debb02df4 100644 --- a/modules/redpanda/redpanda_test.go +++ b/modules/redpanda/redpanda_test.go @@ -5,6 +5,7 @@ import ( "crypto/tls" "crypto/x509" "fmt" + "io" "net/http" "strings" "testing" @@ -12,6 +13,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/testcontainers/testcontainers-go" + "github.com/testcontainers/testcontainers-go/network" "github.com/twmb/franz-go/pkg/kadm" "github.com/twmb/franz-go/pkg/kerr" "github.com/twmb/franz-go/pkg/kgo" @@ -278,6 +281,112 @@ func TestRedpandaWithTLS(t *testing.T) { require.Error(t, results.FirstErr(), kerr.UnknownTopicOrPartition) } +func TestRedpandaListener_Simple(t *testing.T) { + ctx := context.Background() + // 1. Create network + rpNetwork, err := network.New(ctx, network.WithCheckDuplicate()) + require.NoError(t, err) + + // 2. Start Redpanda container + container, err := RunContainer(ctx, + testcontainers.WithImage("redpandadata/redpanda:v23.2.18"), + network.WithNetwork([]string{"redpanda-host"}, rpNetwork), + WithListener("redpanda:29092"), WithAutoCreateTopics(), + ) + require.NoError(t, err) + + // 3. Start KCat container + kcat, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ + ContainerRequest: testcontainers.ContainerRequest{ + Image: "confluentinc/cp-kcat:7.4.1", + Networks: []string{ + rpNetwork.Name, + }, + Entrypoint: []string{ + "sh", + }, + Cmd: []string{ + "-c", + "tail -f /dev/null", + }, + }, + Started: true, + }) + + require.NoError(t, err) + + // 4. Copy message to kcat + err = kcat.CopyToContainer(ctx, []byte("Message produced by kcat"), "/tmp/msgs.txt", 700) + require.NoError(t, err) + + // 5. Produce mesaage to Redpanda + _, _, err = kcat.Exec(ctx, []string{"kcat", "-b", "redpanda:29092", "-t", "msgs", "-P", "-l", "/tmp/msgs.txt"}) + + require.NoError(t, err) + + // 6. Consume message from Redpanda + _, stdout, err := kcat.Exec(ctx, []string{"kcat", "-b", "redpanda:29092", "-C", "-t", "msgs", "-c", "1"}) + require.NoError(t, err) + + // 7. Read Message from stdout + out, err := io.ReadAll(stdout) + require.NoError(t, err) + + require.Contains(t, string(out), "Message produced by kcat") + + t.Cleanup(func() { + if err := kcat.Terminate(ctx); err != nil { + t.Fatalf("failed to terminate kcat container: %s", err) + } + if err := container.Terminate(ctx); err != nil { + t.Fatalf("failed to terminate redpanda container: %s", err) + } + + if err := rpNetwork.Remove(ctx); err != nil { + t.Fatalf("failed to remove network: %s", err) + } + }) +} + +func TestRedpandaListener_InvalidPort(t *testing.T) { + ctx := context.Background() + + // 1. Create network + RPNetwork, err := network.New(ctx, network.WithCheckDuplicate()) + require.NoError(t, err) + + // 2. Attemp Start Redpanda container + _, err = RunContainer(ctx, + testcontainers.WithImage("redpandadata/redpanda:v23.2.18"), + WithListener("redpanda:99092"), + network.WithNetwork([]string{"redpanda-host"}, RPNetwork), + ) + + require.Error(t, err) + + require.Contains(t, err.Error(), "invalid port on listener redpanda:99092") + + t.Cleanup(func() { + if err := RPNetwork.Remove(ctx); err != nil { + t.Fatalf("failed to remove network: %s", err) + } + }) +} + +func TestRedpandaListener_NoNetwork(t *testing.T) { + ctx := context.Background() + + // 1. Attemp Start Redpanda container + _, err := RunContainer(ctx, + testcontainers.WithImage("redpandadata/redpanda:v23.2.18"), + WithListener("redpanda:99092"), + ) + + require.Error(t, err) + + require.Contains(t, err.Error(), "container must be attached to at least one network") +} + // localhostCert is a PEM-encoded TLS cert with SAN IPs // generated from src/crypto/tls: // go run generate_cert.go --rsa-bits 2048 --host 127.0.0.1,::1,localhost --ca --start-date "Jan 1 00:00:00 1970" --duration=1000000h From 9b9fd50806791f5b94d365e205ab83650ca08019 Mon Sep 17 00:00:00 2001 From: Santiago Jimenez Giraldo Date: Wed, 10 Jan 2024 21:18:23 +0100 Subject: [PATCH 3/4] docs: add documentation for WithListener option Redpanda Add documentation for the additional listener option (WithListener) for the Redpanda module Signed-off-by: Santiago Jimenez Giraldo --- docs/modules/redpanda.md | 20 ++++++++++++++++++++ modules/redpanda/redpanda_test.go | 9 ++++++++- 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/docs/modules/redpanda.md b/docs/modules/redpanda.md index c7d8cd36d9..ad2223ce83 100644 --- a/docs/modules/redpanda.md +++ b/docs/modules/redpanda.md @@ -51,6 +51,26 @@ for Redpanda. E.g. `testcontainers.WithImage("docker.redpanda.com/redpandadata/r If you need to enable TLS use `WithTLS` with a valid PEM encoded certificate and key. +#### Additional Listener + +There are scenarios where additional listeners are needed, for example if you +want to consume/from another container in the same network + +You can use the `WithListener` option to add a listener to the Redpanda container. + +[Register additional listener](../../modules/redpanda/redpanda_test.go) inside_block:withListenerRP + + +Container defined in the same network + +[Start Kcat container](../../modules/redpanda/redpanda_test.go) inside_block:withListenerKcat + + +Produce messages using the new registered listener + +[Produce/consume via registered listener](../../modules/redpanda/redpanda_test.go) inside_block:withListenerExec + + ### Container Methods The Redpanda container exposes the following methods: diff --git a/modules/redpanda/redpanda_test.go b/modules/redpanda/redpanda_test.go index 1debb02df4..a569bd2d08 100644 --- a/modules/redpanda/redpanda_test.go +++ b/modules/redpanda/redpanda_test.go @@ -283,19 +283,23 @@ func TestRedpandaWithTLS(t *testing.T) { func TestRedpandaListener_Simple(t *testing.T) { ctx := context.Background() + // 1. Create network rpNetwork, err := network.New(ctx, network.WithCheckDuplicate()) require.NoError(t, err) // 2. Start Redpanda container + // withListenerRP { container, err := RunContainer(ctx, testcontainers.WithImage("redpandadata/redpanda:v23.2.18"), network.WithNetwork([]string{"redpanda-host"}, rpNetwork), WithListener("redpanda:29092"), WithAutoCreateTopics(), ) + // } require.NoError(t, err) // 3. Start KCat container + // withListenerKcat { kcat, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ ContainerRequest: testcontainers.ContainerRequest{ Image: "confluentinc/cp-kcat:7.4.1", @@ -312,6 +316,7 @@ func TestRedpandaListener_Simple(t *testing.T) { }, Started: true, }) + // } require.NoError(t, err) @@ -319,8 +324,10 @@ func TestRedpandaListener_Simple(t *testing.T) { err = kcat.CopyToContainer(ctx, []byte("Message produced by kcat"), "/tmp/msgs.txt", 700) require.NoError(t, err) - // 5. Produce mesaage to Redpanda + // 5. Produce message to Redpanda + // withListenerExec { _, _, err = kcat.Exec(ctx, []string{"kcat", "-b", "redpanda:29092", "-t", "msgs", "-P", "-l", "/tmp/msgs.txt"}) + // } require.NoError(t, err) From 9525546b004945143ed900c50c9b638e51970de0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Manuel=20de=20la=20Pe=C3=B1a?= Date: Thu, 11 Jan 2024 17:26:08 +0100 Subject: [PATCH 4/4] fix: run make lint --- modules/redpanda/redpanda_test.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/modules/redpanda/redpanda_test.go b/modules/redpanda/redpanda_test.go index a569bd2d08..ced487cadd 100644 --- a/modules/redpanda/redpanda_test.go +++ b/modules/redpanda/redpanda_test.go @@ -13,12 +13,13 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/testcontainers/testcontainers-go" - "github.com/testcontainers/testcontainers-go/network" "github.com/twmb/franz-go/pkg/kadm" "github.com/twmb/franz-go/pkg/kerr" "github.com/twmb/franz-go/pkg/kgo" "github.com/twmb/franz-go/pkg/sasl/scram" + + "github.com/testcontainers/testcontainers-go" + "github.com/testcontainers/testcontainers-go/network" ) func TestRedpanda(t *testing.T) { @@ -362,7 +363,7 @@ func TestRedpandaListener_InvalidPort(t *testing.T) { RPNetwork, err := network.New(ctx, network.WithCheckDuplicate()) require.NoError(t, err) - // 2. Attemp Start Redpanda container + // 2. Attempt Start Redpanda container _, err = RunContainer(ctx, testcontainers.WithImage("redpandadata/redpanda:v23.2.18"), WithListener("redpanda:99092"), @@ -383,7 +384,7 @@ func TestRedpandaListener_InvalidPort(t *testing.T) { func TestRedpandaListener_NoNetwork(t *testing.T) { ctx := context.Background() - // 1. Attemp Start Redpanda container + // 1. Attempt Start Redpanda container _, err := RunContainer(ctx, testcontainers.WithImage("redpandadata/redpanda:v23.2.18"), WithListener("redpanda:99092"),