Skip to content

Commit fef62f3

Browse files
svenefftingeroboquat
authored andcommitted
[supervisor] shutdown processes gracefully
1 parent ccaff6f commit fef62f3

File tree

10 files changed

+309
-221
lines changed

10 files changed

+309
-221
lines changed

components/common-go/process/process.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,14 @@
44

55
package process
66

7+
import (
8+
"context"
9+
"errors"
10+
"os"
11+
12+
"golang.org/x/sys/unix"
13+
)
14+
715
// IsNotChildProcess checks if an error returned by a command
816
// execution is an error related to no child processes running
917
// This can be seen, for instance, in short lived commands.
@@ -14,3 +22,36 @@ func IsNotChildProcess(err error) bool {
1422

1523
return (err.Error() == "wait: no child processes" || err.Error() == "waitid: no child processes")
1624
}
25+
26+
var ErrForceKilled = errors.New("Process didn't terminate, so we sent SIGKILL")
27+
28+
// TerminateSync sends a SIGTERM to the given process and returns when the process has terminated or when the context was cancelled.
29+
// When the context is cancelled this function sends a SIGKILL to the process and return immediately with ErrForceKilled.
30+
func TerminateSync(ctx context.Context, pid int) error {
31+
process, err := os.FindProcess(pid)
32+
if err != nil { // never happens on UNIX
33+
return err
34+
}
35+
err = process.Signal(unix.SIGTERM)
36+
if err != nil {
37+
if err == os.ErrProcessDone {
38+
return nil
39+
}
40+
return err
41+
}
42+
terminated := make(chan error, 1)
43+
go func() {
44+
_, err := process.Wait()
45+
terminated <- err
46+
}()
47+
select {
48+
case err := <-terminated:
49+
return err
50+
case <-ctx.Done():
51+
err = process.Kill()
52+
if err != nil {
53+
return err
54+
}
55+
return ErrForceKilled
56+
}
57+
}
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
// Copyright (c) 2022 Gitpod GmbH. All rights reserved.
2+
// Licensed under the GNU Affero General Public License (AGPL).
3+
// See License-AGPL.txt in the project root for license information.
4+
5+
package process
6+
7+
import (
8+
"context"
9+
"fmt"
10+
"io/ioutil"
11+
"os"
12+
"os/exec"
13+
"syscall"
14+
"testing"
15+
"time"
16+
17+
"github.com/stretchr/testify/require"
18+
"golang.org/x/sys/unix"
19+
)
20+
21+
func TestTerminateSync(t *testing.T) {
22+
cmd := exec.Command("/bin/sleep", "20")
23+
require.NoError(t, cmd.Start())
24+
require.NotNil(t, cmd.Process)
25+
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
26+
defer cancel()
27+
err := TerminateSync(ctx, cmd.Process.Pid)
28+
require.NoError(t, err)
29+
require.Equal(t, os.ErrProcessDone, cmd.Process.Signal(unix.SIGHUP))
30+
}
31+
32+
func TestTerminateSync_ignoring_process(t *testing.T) {
33+
34+
tests := []struct {
35+
processTimeSeconds int
36+
gracePeriod time.Duration
37+
fileExists bool
38+
}{
39+
{
40+
processTimeSeconds: 1,
41+
gracePeriod: 7 * time.Second,
42+
fileExists: true,
43+
},
44+
{
45+
processTimeSeconds: 7,
46+
gracePeriod: time.Second,
47+
fileExists: false,
48+
},
49+
{
50+
processTimeSeconds: 0,
51+
gracePeriod: 5 * time.Second,
52+
fileExists: true,
53+
},
54+
}
55+
for _, test := range tests {
56+
dir, err := ioutil.TempDir("", "terminal_test_close")
57+
require.NoError(t, err)
58+
expectedFile := dir + "/done.txt"
59+
script := dir + "/script.sh"
60+
err = ioutil.WriteFile(script, []byte(fmt.Sprintf(`#!/bin/bash
61+
trap 'echo \"Be patient\"' SIGTERM SIGINT SIGHUP
62+
for ((n= %d ; n; n--))
63+
do
64+
sleep 1
65+
done
66+
echo touching
67+
touch %s
68+
echo touched
69+
`, test.processTimeSeconds, expectedFile)), 0644)
70+
require.NoError(t, err)
71+
72+
cmd := exec.Command("/bin/bash", script)
73+
74+
cmd.Stdout = os.Stdout
75+
cmd.Stderr = os.Stderr
76+
require.NoError(t, cmd.Start())
77+
require.NotNil(t, cmd.Process)
78+
time.Sleep(100 * time.Millisecond)
79+
require.NotEqual(t, os.ErrProcessDone, cmd.Process.Signal(syscall.Signal(0)))
80+
ctx, cancel := context.WithTimeout(context.Background(), test.gracePeriod)
81+
defer cancel()
82+
err = TerminateSync(ctx, cmd.Process.Pid)
83+
if test.fileExists {
84+
require.NoError(t, err)
85+
require.FileExists(t, expectedFile)
86+
} else {
87+
require.Equal(t, ErrForceKilled, err)
88+
require.NoFileExists(t, expectedFile)
89+
}
90+
}
91+
}

components/supervisor/cmd/init.go

Lines changed: 125 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -5,50 +5,38 @@
55
package cmd
66

77
import (
8+
"context"
9+
"fmt"
10+
"io"
811
"os"
912
"os/exec"
1013
"os/signal"
1114
"strings"
15+
"sync"
1216
"syscall"
17+
"time"
1318

1419
"github.com/gitpod-io/gitpod/common-go/log"
20+
"github.com/gitpod-io/gitpod/common-go/process"
21+
"github.com/gitpod-io/gitpod/supervisor/pkg/supervisor"
22+
"github.com/prometheus/procfs"
23+
reaper "github.com/ramr/go-reaper"
1524
"github.com/spf13/cobra"
16-
"golang.org/x/sys/unix"
1725
)
1826

1927
var initCmd = &cobra.Command{
2028
Use: "init",
2129
Short: "init the supervisor",
2230
Run: func(cmd *cobra.Command, args []string) {
2331
log.Init(ServiceName, Version, true, false)
24-
// Because we're reaping with PID -1, we'll catch the child process for
25-
// which we've missed the notification anyways.
32+
cfg, err := supervisor.GetConfig()
33+
if err != nil {
34+
log.WithError(err).Info("cannnot load config")
35+
}
2636
var (
27-
sigInput = make(chan os.Signal, 1)
28-
sigReaper = make(chan os.Signal, 1)
29-
sigSupervisor = make(chan os.Signal, 1)
37+
sigInput = make(chan os.Signal, 1)
3038
)
31-
signal.Notify(sigInput, syscall.SIGCHLD, os.Interrupt, syscall.SIGTERM)
32-
go func() {
33-
for s := range sigInput {
34-
switch s {
35-
default:
36-
sigSupervisor <- s
37-
// the reaper needs all signals so that it can turn into
38-
// a terminating reaper if need be.
39-
fallthrough
40-
case syscall.SIGCHLD:
41-
// we don't want to blob the SIGINT/SIGTERM behaviour because
42-
// the reaper is still busy.
43-
select {
44-
case sigReaper <- s:
45-
default:
46-
}
47-
}
48-
}
49-
}()
50-
51-
go reaper(sigReaper)
39+
signal.Notify(sigInput, os.Interrupt, syscall.SIGTERM)
5240

5341
supervisorPath, err := os.Executable()
5442
if err != nil {
@@ -76,66 +64,132 @@ var initCmd = &cobra.Command{
7664
return
7765
}
7866
}()
67+
// start the reaper to clean up zombie processes
68+
reaper.Reap()
7969

8070
select {
8171
case <-supervisorDone:
8272
// supervisor has ended - we're all done here
8373
return
84-
case s := <-sigSupervisor:
74+
case <-sigInput:
8575
// we received a terminating signal - pass on to supervisor and wait for it to finish
86-
_ = runCommand.Process.Signal(s)
87-
<-supervisorDone
76+
ctx, cancel := context.WithTimeout(context.Background(), cfg.GetTerminationGracePeriod())
77+
defer cancel()
78+
slog := newShutdownLogger()
79+
defer slog.Close()
80+
slog.write("Shutting down all processes")
81+
82+
terminationDone := make(chan struct{})
83+
go func() {
84+
defer close(terminationDone)
85+
slog.TerminateSync(ctx, runCommand.Process.Pid)
86+
terminateAllProcesses(ctx, slog)
87+
close(supervisorDone)
88+
}()
89+
// wait for either successful termination or the timeout
90+
select {
91+
case <-ctx.Done():
92+
// Time is up, but we give all the goroutines a bit more time to react to this.
93+
time.Sleep(time.Millisecond * 500)
94+
case <-terminationDone:
95+
}
96+
slog.write("Finished shutting down all processes.")
8897
}
8998
},
9099
}
91100

101+
// terminateAllProcesses terminates all processes but ours until there are none anymore or the context is cancelled
102+
// on context cancellation any still running processes receive a SIGKILL
103+
func terminateAllProcesses(ctx context.Context, slog shutdownLogger) {
104+
for {
105+
processes, err := procfs.AllProcs()
106+
if err != nil {
107+
log.WithError(err).Error("Cannot list processes")
108+
slog.write(fmt.Sprintf("Cannot list processes: %s", err))
109+
return
110+
}
111+
// only one process (must be us)
112+
if len(processes) == 1 {
113+
return
114+
}
115+
// terminate all processes but ourself
116+
var wg sync.WaitGroup
117+
for _, proc := range processes {
118+
if proc.PID == os.Getpid() {
119+
continue
120+
}
121+
p := proc
122+
wg.Add(1)
123+
go func() {
124+
defer wg.Done()
125+
slog.TerminateSync(ctx, p.PID)
126+
}()
127+
}
128+
wg.Wait()
129+
}
130+
}
131+
92132
func init() {
93133
rootCmd.AddCommand(initCmd)
94134
}
95135

96-
func reaper(sigs <-chan os.Signal) {
97-
// The reaper can be turned into a terminating reaper by writing true to this channel.
98-
// When in terminating mode, the reaper will send SIGTERM to each child that gets reparented
99-
// to us and is still running. We use this mechanism to send SIGTERM to a shell child processes
100-
// that get reparented once their parent shell terminates during shutdown.
101-
var terminating bool
136+
type shutdownLogger interface {
137+
write(s string)
138+
TerminateSync(ctx context.Context, pid int)
139+
io.Closer
140+
}
102141

103-
for s := range sigs {
104-
if s != syscall.SIGCHLD {
105-
terminating = true
106-
continue
107-
}
142+
func newShutdownLogger() shutdownLogger {
143+
file := "/workspace/.gitpod/supervisor-termination.log"
144+
f, err := os.Create(file)
145+
if err != nil {
146+
log.WithError(err).WithField("file", file).Error("Couldn't create shutdown log file")
147+
}
148+
result := shutdownLoggerImpl{
149+
file: f,
150+
startTime: time.Now(),
151+
}
152+
return &result
153+
}
108154

109-
for {
110-
// wait on the process, hence remove it from the process table
111-
pid, err := unix.Wait4(-1, nil, 0, nil)
112-
// if we've been interrupted, try again until we're done
113-
for err == syscall.EINTR {
114-
pid, err = unix.Wait4(-1, nil, 0, nil)
115-
}
116-
// The calling process does not have any unwaited-for children. Let's wait for a SIGCHLD notification.
117-
if err == unix.ECHILD {
118-
break
119-
}
120-
if err != nil {
121-
log.WithField("pid", pid).WithError(err).Debug("cannot call waitpid() for re-parented child")
122-
}
123-
if !terminating {
124-
continue
125-
}
126-
proc, err := os.FindProcess(pid)
127-
if err != nil {
128-
log.WithField("pid", pid).WithError(err).Debug("cannot find re-parented process")
129-
continue
130-
}
131-
err = proc.Signal(syscall.SIGTERM)
132-
if err != nil {
133-
if !strings.Contains(err.Error(), "os: process already finished") {
134-
log.WithField("pid", pid).WithError(err).Debug("cannot send SIGTERM to re-parented process")
135-
}
136-
continue
137-
}
138-
log.WithField("pid", pid).Debug("SIGTERM'ed reparented child process")
155+
type shutdownLoggerImpl struct {
156+
file *os.File
157+
startTime time.Time
158+
}
159+
160+
func (l *shutdownLoggerImpl) write(s string) {
161+
if l.file != nil {
162+
_, err := l.file.WriteString(fmt.Sprintf("[%s] %s \n", time.Since(l.startTime), s))
163+
if err != nil {
164+
log.WithError(err).Error("couldn't write to log file")
165+
}
166+
} else {
167+
log.Debug(s)
168+
}
169+
}
170+
func (l *shutdownLoggerImpl) Close() error {
171+
return l.file.Close()
172+
}
173+
func (l *shutdownLoggerImpl) TerminateSync(ctx context.Context, pid int) {
174+
proc, err := procfs.NewProc(pid)
175+
if err != nil {
176+
l.write(fmt.Sprintf("Couldn't obtain process information for PID %d.", pid))
177+
return
178+
}
179+
stat, err := proc.Stat()
180+
if err != nil {
181+
l.write(fmt.Sprintf("Couldn't obtain process information for PID %d.", pid))
182+
} else if stat.State == "Z" {
183+
return
184+
} else {
185+
l.write(fmt.Sprintf("Terminating process %s with PID %d (state: %s, cmdlind: %s).", stat.Comm, pid, stat.State, fmt.Sprint(proc.CmdLine())))
186+
}
187+
err = process.TerminateSync(ctx, pid)
188+
if err != nil {
189+
if err == process.ErrForceKilled {
190+
l.write("Terminating process didn't finish, but had to be force killed")
191+
} else {
192+
l.write(fmt.Sprintf("Terminating main process errored: %s", err))
139193
}
140194
}
141195
}

components/supervisor/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ require (
8282
github.com/opentracing/opentracing-go v1.2.0 // indirect
8383
github.com/pkg/errors v0.9.1 // indirect
8484
github.com/pmezard/go-difflib v1.0.0 // indirect
85+
github.com/ramr/go-reaper v0.2.1 // indirect
8586
github.com/rs/cors v1.8.2 // indirect
8687
github.com/rs/xid v1.2.1 // indirect
8788
github.com/segmentio/backo-go v0.0.0-20200129164019-23eae7c10bd3 // indirect

0 commit comments

Comments
 (0)