@@ -5,19 +5,34 @@ import (
5
5
"crypto/tls"
6
6
"crypto/x509"
7
7
"fmt"
8
+ "io"
8
9
"net/http"
9
10
"strings"
10
11
"testing"
11
12
"time"
12
13
13
14
"github.com/stretchr/testify/assert"
14
15
"github.com/stretchr/testify/require"
16
+ "github.com/testcontainers/testcontainers-go"
17
+ "github.com/testcontainers/testcontainers-go/network"
15
18
"github.com/twmb/franz-go/pkg/kadm"
16
19
"github.com/twmb/franz-go/pkg/kerr"
17
20
"github.com/twmb/franz-go/pkg/kgo"
18
21
"github.com/twmb/franz-go/pkg/sasl/scram"
19
22
)
20
23
24
+ // TestContainersLogger is a logger that implements the testcontainers
25
+ // logger interface.
26
+ type TestContainersLogger struct {
27
+ // LogPrefix is a string that is prefixed for every individual log line.
28
+ LogPrefix string
29
+ }
30
+
31
+ // Accept prints the log to stdout
32
+ func (lc TestContainersLogger ) Accept (l testcontainers.Log ) {
33
+ fmt .Print (lc .LogPrefix + string (l .Content ))
34
+ }
35
+
21
36
func TestRedpanda (t * testing.T ) {
22
37
ctx := context .Background ()
23
38
@@ -278,6 +293,111 @@ func TestRedpandaWithTLS(t *testing.T) {
278
293
require .Error (t , results .FirstErr (), kerr .UnknownTopicOrPartition )
279
294
}
280
295
296
+ func TestRedpandaListener_Simple (t * testing.T ) {
297
+ ctx := context .Background ()
298
+
299
+ // 1. Create network
300
+ RPNetwork , err := network .New (ctx , network .WithCheckDuplicate ())
301
+ require .NoError (t , err )
302
+
303
+ // 2. Start Redpanda container
304
+ container , err := RunContainer (ctx ,
305
+ testcontainers .WithImage ("redpandadata/redpanda:v23.2.18" ),
306
+ network .WithNetwork ([]string {"redpanda-host" }, RPNetwork ),
307
+ WithListener ("redpanda:29092" ), WithAutoCreateTopics (),
308
+ )
309
+ require .NoError (t , err )
310
+
311
+ // 3. Start KCat container
312
+ kcat , err := testcontainers .GenericContainer (ctx , testcontainers.GenericContainerRequest {
313
+ ContainerRequest : testcontainers.ContainerRequest {
314
+ Image : "confluentinc/cp-kcat:7.4.1" ,
315
+ Networks : []string {
316
+ RPNetwork .Name ,
317
+ },
318
+ Entrypoint : []string {
319
+ "sh" ,
320
+ },
321
+ Cmd : []string {
322
+ "-c" ,
323
+ "tail -f /dev/null" ,
324
+ },
325
+ },
326
+ Started : true ,
327
+ })
328
+
329
+ require .NoError (t , err )
330
+
331
+ // 4. Copy message to kcat
332
+ err = kcat .CopyToContainer (ctx , []byte ("Message produced by kcat" ), "/tmp/msgs.txt" , 700 )
333
+ require .NoError (t , err )
334
+
335
+ // 5. Produce mesaage to Redpanda
336
+ _ , _ , err = kcat .Exec (ctx , []string {"kcat" , "-b" , "redpanda:29092" , "-t" , "msgs" , "-P" , "-l" , "/tmp/msgs.txt" })
337
+
338
+ require .NoError (t , err )
339
+
340
+ // 6. Consume message from Redpanda
341
+ _ , stdout , err := kcat .Exec (ctx , []string {"kcat" , "-b" , "redpanda:29092" , "-C" , "-t" , "msgs" , "-c" , "1" })
342
+ require .NoError (t , err )
343
+ out , err := io .ReadAll (stdout )
344
+ require .NoError (t , err )
345
+
346
+ require .Contains (t , string (out ), "Message produced by kcat" )
347
+
348
+ t .Cleanup (func () {
349
+ if err := kcat .Terminate (ctx ); err != nil {
350
+ t .Fatalf ("failed to terminate kcat container: %s" , err )
351
+ }
352
+ if err := container .Terminate (ctx ); err != nil {
353
+ t .Fatalf ("failed to terminate redpanda container: %s" , err )
354
+ }
355
+
356
+ if err := RPNetwork .Remove (ctx ); err != nil {
357
+ t .Fatalf ("failed to remove network: %s" , err )
358
+ }
359
+ })
360
+ }
361
+
362
+ func TestRedpandaListener_InvalidPort (t * testing.T ) {
363
+ ctx := context .Background ()
364
+
365
+ // 1. Create network
366
+ RPNetwork , err := network .New (ctx , network .WithCheckDuplicate ())
367
+ require .NoError (t , err )
368
+
369
+ // 2. Attemp Start Redpanda container
370
+ _ , err = RunContainer (ctx ,
371
+ testcontainers .WithImage ("redpandadata/redpanda:v23.2.18" ),
372
+ WithListener ("redpanda:99092" ),
373
+ network .WithNetwork ([]string {"redpanda-host" }, RPNetwork ),
374
+ )
375
+
376
+ require .Error (t , err )
377
+
378
+ require .Contains (t , err .Error (), "invalid port on listener redpanda:99092" )
379
+
380
+ t .Cleanup (func () {
381
+ if err := RPNetwork .Remove (ctx ); err != nil {
382
+ t .Fatalf ("failed to remove network: %s" , err )
383
+ }
384
+ })
385
+ }
386
+
387
+ func TestRedpandaListener_NoNetwork (t * testing.T ) {
388
+ ctx := context .Background ()
389
+
390
+ // 1. Attemp Start Redpanda container
391
+ _ , err := RunContainer (ctx ,
392
+ testcontainers .WithImage ("redpandadata/redpanda:v23.2.18" ),
393
+ WithListener ("redpanda:99092" ),
394
+ )
395
+
396
+ require .Error (t , err )
397
+
398
+ require .Contains (t , err .Error (), "container must be attached to at least one network" )
399
+ }
400
+
281
401
// localhostCert is a PEM-encoded TLS cert with SAN IPs
282
402
// generated from src/crypto/tls:
283
403
// go run generate_cert.go --rsa-bits 2048 --host 127.0.0.1,::1,localhost --ca --start-date "Jan 1 00:00:00 1970" --duration=1000000h
0 commit comments