Skip to content

Commit 032a69f

Browse files
stevenhmdelapenya
andauthored
fix!: data races (#2843)
* ci: enable test race checks Enable race checks for all tests in CI. * fix!: data various data races Fix data race when determining default network, this required making DockerProviderOptions.DefaultNetwork field private which is a breaking change. Fix data race in test bufLogger. Fix data races on log production context cancellation and context timeout not being cancelled in read loop. BREAKING_CHANGE! --------- Co-authored-by: Manuel de la Peña <[email protected]>
1 parent 11eb809 commit 032a69f

File tree

6 files changed

+115
-91
lines changed

6 files changed

+115
-91
lines changed

commons-test.mk

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,8 @@ test-%: $(GOBIN)/gotestsum
4747
-- \
4848
-v \
4949
-coverprofile=coverage.out \
50-
-timeout=30m
50+
-timeout=30m \
51+
-race
5152

5253
.PHONY: tools
5354
tools:

docker.go

Lines changed: 97 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"path/filepath"
1717
"regexp"
1818
"strings"
19+
"sync"
1920
"time"
2021

2122
"github.com/cenkalti/backoff/v4"
@@ -762,11 +763,15 @@ func (c *DockerContainer) startLogProduction(ctx context.Context, opts ...LogPro
762763
// Setup the log production context which will be used to stop the log production.
763764
c.logProductionCtx, c.logProductionCancel = context.WithCancelCause(ctx)
764765

765-
go func() {
766-
err := c.logProducer(stdout, stderr)
767-
// Set context cancel cause, if not already set.
768-
c.logProductionCancel(err)
769-
}()
766+
// We capture context cancel function to avoid data race with multiple
767+
// calls to startLogProduction.
768+
go func(cancel context.CancelCauseFunc) {
769+
// Ensure the context is cancelled when log productions completes
770+
// so that GetLogProductionErrorChannel functions correctly.
771+
defer cancel(nil)
772+
773+
c.logProducer(stdout, stderr)
774+
}(c.logProductionCancel)
770775

771776
return nil
772777
}
@@ -775,40 +780,49 @@ func (c *DockerContainer) startLogProduction(ctx context.Context, opts ...LogPro
775780
// - logProductionCtx is done
776781
// - A fatal error occurs
777782
// - No more logs are available
778-
func (c *DockerContainer) logProducer(stdout, stderr io.Writer) error {
783+
func (c *DockerContainer) logProducer(stdout, stderr io.Writer) {
779784
// Clean up idle client connections.
780785
defer c.provider.Close()
781786

782787
// Setup the log options, start from the beginning.
783-
options := container.LogsOptions{
788+
options := &container.LogsOptions{
784789
ShowStdout: true,
785790
ShowStderr: true,
786791
Follow: true,
787792
}
788793

789-
for {
790-
timeoutCtx, cancel := context.WithTimeout(c.logProductionCtx, *c.logProductionTimeout)
791-
defer cancel()
794+
// Use a separate method so that timeout cancel function is
795+
// called correctly.
796+
for c.copyLogsTimeout(stdout, stderr, options) {
797+
}
798+
}
792799

793-
err := c.copyLogs(timeoutCtx, stdout, stderr, options)
794-
switch {
795-
case err == nil:
796-
// No more logs available.
797-
return nil
798-
case c.logProductionCtx.Err() != nil:
799-
// Log production was stopped or caller context is done.
800-
return nil
801-
case timeoutCtx.Err() != nil, errors.Is(err, net.ErrClosed):
802-
// Timeout or client connection closed, retry.
803-
default:
804-
// Unexpected error, retry.
805-
Logger.Printf("Unexpected error reading logs: %v", err)
806-
}
800+
// copyLogsTimeout copies logs from the container to stdout and stderr with a timeout.
801+
// It returns true if the log production should be retried, false otherwise.
802+
func (c *DockerContainer) copyLogsTimeout(stdout, stderr io.Writer, options *container.LogsOptions) bool {
803+
timeoutCtx, cancel := context.WithTimeout(c.logProductionCtx, *c.logProductionTimeout)
804+
defer cancel()
807805

808-
// Retry from the last log received.
809-
now := time.Now()
810-
options.Since = fmt.Sprintf("%d.%09d", now.Unix(), int64(now.Nanosecond()))
806+
err := c.copyLogs(timeoutCtx, stdout, stderr, *options)
807+
switch {
808+
case err == nil:
809+
// No more logs available.
810+
return false
811+
case c.logProductionCtx.Err() != nil:
812+
// Log production was stopped or caller context is done.
813+
return false
814+
case timeoutCtx.Err() != nil, errors.Is(err, net.ErrClosed):
815+
// Timeout or client connection closed, retry.
816+
default:
817+
// Unexpected error, retry.
818+
Logger.Printf("Unexpected error reading logs: %v", err)
811819
}
820+
821+
// Retry from the last log received.
822+
now := time.Now()
823+
options.Since = fmt.Sprintf("%d.%09d", now.Unix(), int64(now.Nanosecond()))
824+
825+
return true
812826
}
813827

814828
// copyLogs copies logs from the container to stdout and stderr.
@@ -866,10 +880,12 @@ func (c *DockerContainer) GetLogProductionErrorChannel() <-chan error {
866880
}
867881

868882
errCh := make(chan error, 1)
869-
go func() {
870-
<-c.logProductionCtx.Done()
871-
errCh <- context.Cause(c.logProductionCtx)
872-
}()
883+
go func(ctx context.Context) {
884+
<-ctx.Done()
885+
errCh <- context.Cause(ctx)
886+
close(errCh)
887+
}(c.logProductionCtx)
888+
873889
return errCh
874890
}
875891

@@ -906,6 +922,7 @@ type DockerProvider struct {
906922
host string
907923
hostCache string
908924
config config.Config
925+
mtx sync.Mutex
909926
}
910927

911928
// Client gets the docker client used by the provider
@@ -984,29 +1001,26 @@ func (p *DockerProvider) CreateContainer(ctx context.Context, req ContainerReque
9841001
// defer the close of the Docker client connection the soonest
9851002
defer p.Close()
9861003

987-
// Make sure that bridge network exists
988-
// In case it is disabled we will create reaper_default network
989-
if p.DefaultNetwork == "" {
990-
p.DefaultNetwork, err = p.getDefaultNetwork(ctx, p.client)
991-
if err != nil {
992-
return nil, err
993-
}
1004+
var defaultNetwork string
1005+
defaultNetwork, err = p.ensureDefaultNetwork(ctx)
1006+
if err != nil {
1007+
return nil, fmt.Errorf("ensure default network: %w", err)
9941008
}
9951009

9961010
// If default network is not bridge make sure it is attached to the request
9971011
// as container won't be attached to it automatically
9981012
// in case of Podman the bridge network is called 'podman' as 'bridge' would conflict
999-
if p.DefaultNetwork != p.defaultBridgeNetworkName {
1013+
if defaultNetwork != p.defaultBridgeNetworkName {
10001014
isAttached := false
10011015
for _, net := range req.Networks {
1002-
if net == p.DefaultNetwork {
1016+
if net == defaultNetwork {
10031017
isAttached = true
10041018
break
10051019
}
10061020
}
10071021

10081022
if !isAttached {
1009-
req.Networks = append(req.Networks, p.DefaultNetwork)
1023+
req.Networks = append(req.Networks, defaultNetwork)
10101024
}
10111025
}
10121026

@@ -1461,12 +1475,8 @@ func (p *DockerProvider) CreateNetwork(ctx context.Context, req NetworkRequest)
14611475
// defer the close of the Docker client connection the soonest
14621476
defer p.Close()
14631477

1464-
// Make sure that bridge network exists
1465-
// In case it is disabled we will create reaper_default network
1466-
if p.DefaultNetwork == "" {
1467-
if p.DefaultNetwork, err = p.getDefaultNetwork(ctx, p.client); err != nil {
1468-
return nil, err
1469-
}
1478+
if _, err = p.ensureDefaultNetwork(ctx); err != nil {
1479+
return nil, fmt.Errorf("ensure default network: %w", err)
14701480
}
14711481

14721482
if req.Labels == nil {
@@ -1537,14 +1547,12 @@ func (p *DockerProvider) GetNetwork(ctx context.Context, req NetworkRequest) (ne
15371547

15381548
func (p *DockerProvider) GetGatewayIP(ctx context.Context) (string, error) {
15391549
// Use a default network as defined in the DockerProvider
1540-
if p.DefaultNetwork == "" {
1541-
var err error
1542-
p.DefaultNetwork, err = p.getDefaultNetwork(ctx, p.client)
1543-
if err != nil {
1544-
return "", err
1545-
}
1550+
defaultNetwork, err := p.ensureDefaultNetwork(ctx)
1551+
if err != nil {
1552+
return "", fmt.Errorf("ensure default network: %w", err)
15461553
}
1547-
nw, err := p.GetNetwork(ctx, NetworkRequest{Name: p.DefaultNetwork})
1554+
1555+
nw, err := p.GetNetwork(ctx, NetworkRequest{Name: defaultNetwork})
15481556
if err != nil {
15491557
return "", err
15501558
}
@@ -1563,43 +1571,50 @@ func (p *DockerProvider) GetGatewayIP(ctx context.Context) (string, error) {
15631571
return ip, nil
15641572
}
15651573

1566-
func (p *DockerProvider) getDefaultNetwork(ctx context.Context, cli client.APIClient) (string, error) {
1567-
// Get list of available networks
1568-
networkResources, err := cli.NetworkList(ctx, network.ListOptions{})
1569-
if err != nil {
1570-
return "", err
1571-
}
1574+
// ensureDefaultNetwork ensures that defaultNetwork is set and creates
1575+
// it if it does not exist, returning its value.
1576+
// It is safe to call this method concurrently.
1577+
func (p *DockerProvider) ensureDefaultNetwork(ctx context.Context) (string, error) {
1578+
p.mtx.Lock()
1579+
defer p.mtx.Unlock()
15721580

1573-
reaperNetwork := ReaperDefault
1581+
if p.defaultNetwork != "" {
1582+
// Already set.
1583+
return p.defaultNetwork, nil
1584+
}
15741585

1575-
reaperNetworkExists := false
1586+
networkResources, err := p.client.NetworkList(ctx, network.ListOptions{})
1587+
if err != nil {
1588+
return "", fmt.Errorf("network list: %w", err)
1589+
}
15761590

15771591
for _, net := range networkResources {
1578-
if net.Name == p.defaultBridgeNetworkName {
1579-
return p.defaultBridgeNetworkName, nil
1580-
}
1581-
1582-
if net.Name == reaperNetwork {
1583-
reaperNetworkExists = true
1592+
switch net.Name {
1593+
case p.defaultBridgeNetworkName:
1594+
p.defaultNetwork = p.defaultBridgeNetworkName
1595+
return p.defaultNetwork, nil
1596+
case ReaperDefault:
1597+
p.defaultNetwork = ReaperDefault
1598+
return p.defaultNetwork, nil
15841599
}
15851600
}
15861601

1587-
// Create a bridge network for the container communications
1588-
if !reaperNetworkExists {
1589-
_, err = cli.NetworkCreate(ctx, reaperNetwork, network.CreateOptions{
1590-
Driver: Bridge,
1591-
Attachable: true,
1592-
Labels: GenericLabels(),
1593-
})
1594-
// If the network already exists, we can ignore the error as that can
1595-
// happen if we are running multiple tests in parallel and we only
1596-
// need to ensure that the network exists.
1597-
if err != nil && !errdefs.IsConflict(err) {
1598-
return "", err
1599-
}
1602+
// Create a bridge network for the container communications.
1603+
_, err = p.client.NetworkCreate(ctx, ReaperDefault, network.CreateOptions{
1604+
Driver: Bridge,
1605+
Attachable: true,
1606+
Labels: GenericLabels(),
1607+
})
1608+
// If the network already exists, we can ignore the error as that can
1609+
// happen if we are running multiple tests in parallel and we only
1610+
// need to ensure that the network exists.
1611+
if err != nil && !errdefs.IsConflict(err) {
1612+
return "", fmt.Errorf("network create: %w", err)
16001613
}
16011614

1602-
return reaperNetwork, nil
1615+
p.defaultNetwork = ReaperDefault
1616+
1617+
return p.defaultNetwork, nil
16031618
}
16041619

16051620
// containerFromDockerResponse builds a Docker container struct from the response of the Docker API

docker_test.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1790,11 +1790,14 @@ func TestGetGatewayIP(t *testing.T) {
17901790
require.NoError(t, err)
17911791
defer provider.Close()
17921792

1793-
ip, err := provider.(*DockerProvider).GetGatewayIP(context.Background())
1794-
require.NoError(t, err)
1795-
if ip == "" {
1796-
t.Fatal("could not get gateway ip")
1793+
dockerProvider, ok := provider.(*DockerProvider)
1794+
if !ok {
1795+
t.Skip("provider is not a DockerProvider")
17971796
}
1797+
1798+
ip, err := dockerProvider.GetGatewayIP(context.Background())
1799+
require.NoError(t, err)
1800+
require.NotEmpty(t, ip)
17981801
}
17991802

18001803
func TestNetworkModeWithContainerReference(t *testing.T) {

network.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,12 @@ type DefaultNetwork string
2323

2424
// Deprecated: will be removed in the future.
2525
func (n DefaultNetwork) ApplyGenericTo(opts *GenericProviderOptions) {
26-
opts.DefaultNetwork = string(n)
26+
opts.defaultNetwork = string(n)
2727
}
2828

2929
// Deprecated: will be removed in the future.
3030
func (n DefaultNetwork) ApplyDockerTo(opts *DockerProviderOptions) {
31-
opts.DefaultNetwork = string(n)
31+
opts.defaultNetwork = string(n)
3232
}
3333

3434
// Deprecated: will be removed in the future

provider.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ type (
2525
// GenericProviderOptions defines options applicable to all providers
2626
GenericProviderOptions struct {
2727
Logger Logging
28-
DefaultNetwork string
28+
defaultNetwork string
2929
}
3030

3131
// GenericProviderOption defines a common interface to modify GenericProviderOptions

reaper.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -402,7 +402,12 @@ func (r *reaperSpawner) newReaper(ctx context.Context, sessionID string, provide
402402

403403
// Attach reaper container to a requested network if it is specified
404404
if p, ok := provider.(*DockerProvider); ok {
405-
req.Networks = append(req.Networks, p.DefaultNetwork)
405+
defaultNetwork, err := p.ensureDefaultNetwork(ctx)
406+
if err != nil {
407+
return nil, fmt.Errorf("ensure default network: %w", err)
408+
}
409+
410+
req.Networks = append(req.Networks, defaultNetwork)
406411
}
407412

408413
c, err := provider.RunContainer(ctx, req)

0 commit comments

Comments
 (0)