Skip to content

feat: expose Redpanda's listener in the docker network #1994

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions docs/modules/redpanda.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,26 @@ for Redpanda. E.g. `testcontainers.WithImage("docker.redpanda.com/redpandadata/r

If you need to enable TLS use `WithTLS` with a valid PEM encoded certificate and key.

#### Additional Listener

There are scenarios where additional listeners are needed, for example if you
want to consume/from another container in the same network

You can use the `WithListener` option to add a listener to the Redpanda container.
<!--codeinclude-->
[Register additional listener](../../modules/redpanda/redpanda_test.go) inside_block:withListenerRP
<!--/codeinclude-->

Container defined in the same network
<!--codeinclude-->
[Start Kcat container](../../modules/redpanda/redpanda_test.go) inside_block:withListenerKcat
<!--/codeinclude-->

Produce messages using the new registered listener
<!--codeinclude-->
[Produce/consume via registered listener](../../modules/redpanda/redpanda_test.go) inside_block:withListenerExec
<!--/codeinclude-->

### Container Methods

The Redpanda container exposes the following methods:
Expand Down
13 changes: 13 additions & 0 deletions modules/redpanda/mounts/redpanda.yaml.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,26 @@ redpanda:
port: 9093
authentication_method: {{ if .KafkaAPI.EnableAuthorization }}sasl{{ else }}none{{ end }}

{{ range .KafkaAPI.Listeners }}
- address: 0.0.0.0
name: {{ .Address }}
port: {{ .Port }}
authentication_method: {{ .AuthenticationMethod }}
{{ end }}

advertised_kafka_api:
- address: {{ .KafkaAPI.AdvertisedHost }}
name: external
port: {{ .KafkaAPI.AdvertisedPort }}
- address: 127.0.0.1
name: internal
port: 9093
{{ range .KafkaAPI.Listeners }}
- address: {{ .Address }}
name: {{ .Address }}
port: {{ .Port }}
{{ end }}


{{ if .EnableTLS }}
admin_api_tls:
Expand Down
39 changes: 38 additions & 1 deletion modules/redpanda/options.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
package redpanda

import "github.com/testcontainers/testcontainers-go"
import (
"net"
"strconv"

"github.com/testcontainers/testcontainers-go"
)

type options struct {
// Superusers is a list of service account names.
Expand Down Expand Up @@ -29,7 +34,12 @@ type options struct {

// EnableTLS is a flag to enable TLS.
EnableTLS bool

cert, key []byte

// Listeners is a list of custom listeners that can be provided to access the
// containers form within docker networks
Listeners []listener
}

func defaultOptions() options {
Expand All @@ -41,6 +51,7 @@ func defaultOptions() options {
ServiceAccounts: make(map[string]string, 0),
AutoCreateTopics: false,
EnableTLS: false,
Listeners: make([]listener, 0),
}
}

Expand Down Expand Up @@ -86,6 +97,8 @@ func WithEnableKafkaAuthorization() Option {
}
}

// WithEnableSchemaRegistryHTTPBasicAuth enables HTTP basic authentication for
// Schema Registry.
func WithEnableSchemaRegistryHTTPBasicAuth() Option {
return func(o *options) {
o.SchemaRegistryAuthenticationMethod = "http_basic"
Expand All @@ -106,3 +119,27 @@ func WithTLS(cert, key []byte) Option {
o.key = key
}
}

// WithListener adds a custom listener to the Redpanda containers. Listener
// will be aliases to all networks, so they can be accessed from within docker
// networks. At leas one network must be attached to the container, if not an
// error will be thrown when starting the container.
func WithListener(lis string) Option {
host, port, err := net.SplitHostPort(lis)
if err != nil {
return func(o *options) {}
}

portInt, err := strconv.Atoi(port)
if err != nil {
return func(o *options) {}
}

return func(o *options) {
o.Listeners = append(o.Listeners, listener{
Address: host,
Port: portInt,
AuthenticationMethod: o.KafkaAuthenticationMethod,
})
}
}
49 changes: 44 additions & 5 deletions modules/redpanda/redpanda.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
_ "embed"
"fmt"
"math"
"os"
"path/filepath"
"text/template"
Expand All @@ -31,6 +32,7 @@ const (
defaultKafkaAPIPort = "9092/tcp"
defaultAdminAPIPort = "9644/tcp"
defaultSchemaRegistryPort = "8081/tcp"
defaultDockerKafkaApiPort = "29092"

redpandaDir = "/etc/redpanda"
entrypointFile = "/entrypoint-tc.sh"
Expand Down Expand Up @@ -98,6 +100,12 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize
return nil, fmt.Errorf("failed to create entrypoint file: %w", err)
}

// 4. Register extra kafka listeners if provided, network aliases will be
// set
if err := registerListeners(ctx, settings, req); err != nil {
return nil, fmt.Errorf("failed to register listeners: %w", err)
}

// Bootstrap config file contains cluster configurations which will only be considered
// the very first time you start a cluster.
bootstrapConfigPath := filepath.Join(tmpDir, bootstrapConfigFile)
Expand All @@ -122,7 +130,7 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize
},
)

// 4. Create certificate and key for TLS connections.
// 5. Create certificate and key for TLS connections.
if settings.EnableTLS {
certPath := filepath.Join(tmpDir, certFile)
if err := os.WriteFile(certPath, settings.cert, 0o600); err != nil {
Expand Down Expand Up @@ -152,7 +160,7 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize
return nil, err
}

// 5. Get mapped port for the Kafka API, so that we can render and then mount
// 6. Get mapped port for the Kafka API, so that we can render and then mount
// the Redpanda config with the advertised Kafka address.
hostIP, err := container.Host(ctx)
if err != nil {
Expand All @@ -164,7 +172,7 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize
return nil, fmt.Errorf("failed to get mapped Kafka port: %w", err)
}

// 6. Render redpanda.yaml config and mount it.
// 7. Render redpanda.yaml config and mount it.
nodeConfig, err := renderNodeConfig(settings, hostIP, kafkaPort.Int())
if err != nil {
return nil, fmt.Errorf("failed to render node config: %w", err)
Expand All @@ -175,7 +183,7 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize
return nil, fmt.Errorf("failed to copy redpanda.yaml into container: %w", err)
}

// 6. Wait until Redpanda is ready to serve requests
// 8. Wait until Redpanda is ready to serve requests
err = wait.ForAll(
wait.ForListeningPort(defaultKafkaAPIPort),
wait.ForLog("Successfully started Redpanda!").WithPollInterval(100*time.Millisecond)).
Expand All @@ -185,7 +193,7 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize
return nil, fmt.Errorf("failed to wait for Redpanda readiness: %w", err)
}

// 7. Create Redpanda Service Accounts if configured to do so.
// 9. Create Redpanda Service Accounts if configured to do so.
if len(settings.ServiceAccounts) > 0 {
adminAPIPort, err := container.MappedPort(ctx, nat.Port(defaultAdminAPIPort))
if err != nil {
Expand Down Expand Up @@ -252,6 +260,29 @@ func renderBootstrapConfig(settings options) ([]byte, error) {
return bootstrapConfig.Bytes(), nil
}

// registerListeners validates that the provided listeners are valid and set network aliases for the provided addresses.
// The container must be attached to at least one network.
func registerListeners(ctx context.Context, settings options, req testcontainers.GenericContainerRequest) error {
if len(settings.Listeners) == 0 {
return nil
}

if len(req.Networks) == 0 {
return fmt.Errorf("container must be attached to at least one network")
}

for _, listener := range settings.Listeners {
if listener.Port < 0 || listener.Port > math.MaxUint16 {
return fmt.Errorf("invalid port on listener %s:%d (must be between 0 and 65535)", listener.Address, listener.Port)
}

for _, network := range req.Networks {
req.NetworkAliases[network] = append(req.NetworkAliases[network], listener.Address)
}
Comment on lines +279 to +281
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

one issue with this approach that Java implementation still have is that when adding Toxiproxy upfront Kafka container and using 0.0.0.0:<random-toxiproxy-port> as a listener the registration will fail because will try to add 0.0.0.0 as an alias. I forgot to mention that when suggesting the approach initially.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sago2k8 @eddumelendez would you like to collaborate in a follow-up PR for this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey, thanks for your comments, yes I could work on it, do you happen to have a suggestion of a expected behaviour for that specific case

}
return nil
}

// renderNodeConfig renders the redpanda.yaml node config and returns it as
// byte array.
func renderNodeConfig(settings options, hostIP string, advertisedKafkaPort int) ([]byte, error) {
Expand All @@ -262,6 +293,7 @@ func renderNodeConfig(settings options, hostIP string, advertisedKafkaPort int)
AdvertisedPort: advertisedKafkaPort,
AuthenticationMethod: settings.KafkaAuthenticationMethod,
EnableAuthorization: settings.KafkaEnableAuthorization,
Listeners: settings.Listeners,
},
SchemaRegistry: redpandaConfigTplParamsSchemaRegistry{
AuthenticationMethod: settings.SchemaRegistryAuthenticationMethod,
Expand Down Expand Up @@ -300,8 +332,15 @@ type redpandaConfigTplParamsKafkaAPI struct {
AdvertisedPort int
AuthenticationMethod string
EnableAuthorization bool
Listeners []listener
}

type redpandaConfigTplParamsSchemaRegistry struct {
AuthenticationMethod string
}

type listener struct {
Address string
Port int
AuthenticationMethod string
}
117 changes: 117 additions & 0 deletions modules/redpanda/redpanda_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"crypto/tls"
"crypto/x509"
"fmt"
"io"
"net/http"
"strings"
"testing"
Expand All @@ -16,6 +17,9 @@ import (
"github.com/twmb/franz-go/pkg/kerr"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/sasl/scram"

"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/network"
)

func TestRedpanda(t *testing.T) {
Expand Down Expand Up @@ -278,6 +282,119 @@ func TestRedpandaWithTLS(t *testing.T) {
require.Error(t, results.FirstErr(), kerr.UnknownTopicOrPartition)
}

func TestRedpandaListener_Simple(t *testing.T) {
ctx := context.Background()

// 1. Create network
rpNetwork, err := network.New(ctx, network.WithCheckDuplicate())
require.NoError(t, err)

// 2. Start Redpanda container
// withListenerRP {
container, err := RunContainer(ctx,
testcontainers.WithImage("redpandadata/redpanda:v23.2.18"),
network.WithNetwork([]string{"redpanda-host"}, rpNetwork),
WithListener("redpanda:29092"), WithAutoCreateTopics(),
)
// }
require.NoError(t, err)

// 3. Start KCat container
// withListenerKcat {
kcat, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
ContainerRequest: testcontainers.ContainerRequest{
Image: "confluentinc/cp-kcat:7.4.1",
Networks: []string{
rpNetwork.Name,
},
Entrypoint: []string{
"sh",
},
Cmd: []string{
"-c",
"tail -f /dev/null",
},
},
Started: true,
})
// }

require.NoError(t, err)

// 4. Copy message to kcat
err = kcat.CopyToContainer(ctx, []byte("Message produced by kcat"), "/tmp/msgs.txt", 700)
require.NoError(t, err)

// 5. Produce message to Redpanda
// withListenerExec {
_, _, err = kcat.Exec(ctx, []string{"kcat", "-b", "redpanda:29092", "-t", "msgs", "-P", "-l", "/tmp/msgs.txt"})
// }

require.NoError(t, err)

// 6. Consume message from Redpanda
_, stdout, err := kcat.Exec(ctx, []string{"kcat", "-b", "redpanda:29092", "-C", "-t", "msgs", "-c", "1"})
require.NoError(t, err)

// 7. Read Message from stdout
out, err := io.ReadAll(stdout)
require.NoError(t, err)

require.Contains(t, string(out), "Message produced by kcat")

t.Cleanup(func() {
if err := kcat.Terminate(ctx); err != nil {
t.Fatalf("failed to terminate kcat container: %s", err)
}
if err := container.Terminate(ctx); err != nil {
t.Fatalf("failed to terminate redpanda container: %s", err)
}

if err := rpNetwork.Remove(ctx); err != nil {
t.Fatalf("failed to remove network: %s", err)
}
})
}

func TestRedpandaListener_InvalidPort(t *testing.T) {
ctx := context.Background()

// 1. Create network
RPNetwork, err := network.New(ctx, network.WithCheckDuplicate())
require.NoError(t, err)

// 2. Attempt Start Redpanda container
_, err = RunContainer(ctx,
testcontainers.WithImage("redpandadata/redpanda:v23.2.18"),
WithListener("redpanda:99092"),
network.WithNetwork([]string{"redpanda-host"}, RPNetwork),
)

require.Error(t, err)

require.Contains(t, err.Error(), "invalid port on listener redpanda:99092")

t.Cleanup(func() {
if err := RPNetwork.Remove(ctx); err != nil {
t.Fatalf("failed to remove network: %s", err)
}
})
}

func TestRedpandaListener_NoNetwork(t *testing.T) {
ctx := context.Background()

// 1. Attempt Start Redpanda container
_, err := RunContainer(ctx,
testcontainers.WithImage("redpandadata/redpanda:v23.2.18"),
WithListener("redpanda:99092"),
)

require.Error(t, err)

require.Contains(t, err.Error(), "container must be attached to at least one network")
}

// localhostCert is a PEM-encoded TLS cert with SAN IPs
// generated from src/crypto/tls:
// go run generate_cert.go --rsa-bits 2048 --host 127.0.0.1,::1,localhost --ca --start-date "Jan 1 00:00:00 1970" --duration=1000000h
Expand Down