Skip to content

Commit 462e269

Browse files
csweichelroboquat
authored andcommitted
[ws-daemon] Integrate new CPU limiter
1 parent f35229a commit 462e269

File tree

15 files changed

+385
-1064
lines changed

15 files changed

+385
-1064
lines changed
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
// Copyright (c) 2022 Gitpod GmbH. All rights reserved.
2+
// Licensed under the GNU Affero General Public License (AGPL).
3+
// See License-AGPL.txt in the project root for license information.
4+
5+
package cpulimit
6+
7+
import (
8+
"bufio"
9+
"math"
10+
"os"
11+
"path/filepath"
12+
"strconv"
13+
"strings"
14+
"time"
15+
16+
"golang.org/x/xerrors"
17+
)
18+
19+
// CgroupCFSController controls a cgroup's CFS settings
20+
type CgroupCFSController string
21+
22+
// Usage returns the cpuacct.usage value of the cgroup
23+
func (basePath CgroupCFSController) Usage() (usage CPUTime, err error) {
24+
25+
cpuTimeInNS, err := basePath.readUint64("cpuacct.usage")
26+
if err != nil {
27+
return 0, xerrors.Errorf("cannot read cpuacct.usage: %w", err)
28+
}
29+
30+
return CPUTime(time.Duration(cpuTimeInNS) * time.Nanosecond), nil
31+
}
32+
33+
// SetQuota sets a new CFS quota on the cgroup
34+
func (basePath CgroupCFSController) SetLimit(limit Bandwidth) (changed bool, err error) {
35+
p, err := basePath.readUint64("cpu.cfs_period_us")
36+
if err != nil {
37+
err = xerrors.Errorf("cannot parse CFS period: %w", err)
38+
return
39+
}
40+
period := time.Duration(p) * time.Microsecond
41+
42+
q, err := basePath.readUint64("cpu.cfs_quota_us")
43+
if err != nil {
44+
err = xerrors.Errorf("cannot parse CFS quota: %w", err)
45+
return
46+
}
47+
quota := time.Duration(q) * time.Microsecond
48+
target := limit.Quota(period)
49+
if quota == target {
50+
return false, nil
51+
}
52+
53+
err = os.WriteFile(filepath.Join(string(basePath), "cpu.cfs_quota_us"), []byte(strconv.FormatInt(target.Microseconds(), 10)), 0644)
54+
if err != nil {
55+
return false, xerrors.Errorf("cannot set CFS quota: %w", err)
56+
}
57+
return true, nil
58+
}
59+
60+
func (basePath CgroupCFSController) readUint64(path string) (uint64, error) {
61+
fn := filepath.Join(string(basePath), path)
62+
fc, err := os.ReadFile(fn)
63+
if err != nil {
64+
return 0, err
65+
}
66+
67+
s := strings.TrimSpace(string(fc))
68+
if s == "max" {
69+
return math.MaxUint64, nil
70+
}
71+
72+
p, err := strconv.ParseInt(s, 10, 64)
73+
if err != nil {
74+
return 0, err
75+
}
76+
return uint64(p), nil
77+
}
78+
79+
// NrThrottled returns the number of CFS periods the cgroup was throttled in
80+
func (basePath CgroupCFSController) NrThrottled() (uint64, error) {
81+
f, err := os.Open(filepath.Join(string(basePath), "cpu.stat"))
82+
if err != nil {
83+
return 0, xerrors.Errorf("cannot read cpu.stat: %w", err)
84+
}
85+
defer f.Close()
86+
87+
const prefixNrThrottled = "nr_throttled "
88+
89+
scanner := bufio.NewScanner(f)
90+
for scanner.Scan() {
91+
l := scanner.Text()
92+
if !strings.HasPrefix(l, prefixNrThrottled) {
93+
continue
94+
}
95+
96+
r, err := strconv.ParseInt(strings.TrimSpace(strings.TrimPrefix(l, prefixNrThrottled)), 10, 64)
97+
if err != nil {
98+
return 0, xerrors.Errorf("cannot parse cpu.stat: %s: %w", l, err)
99+
}
100+
return uint64(r), nil
101+
}
102+
return 0, xerrors.Errorf("cpu.stat did not contain nr_throttled")
103+
}

components/ws-daemon/pkg/cpulimit/cpulimit.go

Lines changed: 21 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,6 @@
44

55
package cpulimit
66

7-
// TODO(cw):
8-
// - introduce metrics that report
9-
// - total throttled period count (per QoS)
10-
// - total bandwidth granted (per QoS)
11-
// - something that corresponds to the user experience
12-
137
import (
148
"context"
159
"sort"
@@ -37,7 +31,7 @@ type WorkspaceHistory struct {
3731

3832
func (h *WorkspaceHistory) Usage() CPUTime {
3933
if h == nil || h.LastUpdate == nil {
40-
panic("usage called before update")
34+
return 0
4135
}
4236
return h.LastUpdate.Usage - h.UsageT0
4337
}
@@ -60,26 +54,26 @@ func (h *WorkspaceHistory) Throttled() bool {
6054
}
6155

6256
type DistributorSource func(context.Context) ([]Workspace, error)
63-
type DistributorSink func(workspaceID string, limit Bandwidth)
57+
type DistributorSink func(id string, limit Bandwidth, burst bool)
6458

65-
func NewDistributor(source DistributorSource, sink DistributorSink, limiter ResourceLimiter, breakoutLimiter ResourceLimiter, totalBandwidth Bandwidth) *Distributor {
59+
func NewDistributor(source DistributorSource, sink DistributorSink, limiter ResourceLimiter, burstLimiter ResourceLimiter, totalBandwidth Bandwidth) *Distributor {
6660
return &Distributor{
67-
Source: source,
68-
Sink: sink,
69-
Limiter: limiter,
70-
BreakoutLimiter: breakoutLimiter,
71-
TotalBandwidth: totalBandwidth,
72-
History: make(map[string]*WorkspaceHistory),
61+
Source: source,
62+
Sink: sink,
63+
Limiter: limiter,
64+
BurstLimiter: burstLimiter,
65+
TotalBandwidth: totalBandwidth,
66+
History: make(map[string]*WorkspaceHistory),
7367
}
7468
}
7569

7670
type Distributor struct {
7771
Source DistributorSource
7872
Sink DistributorSink
7973

80-
History map[string]*WorkspaceHistory
81-
Limiter ResourceLimiter
82-
BreakoutLimiter ResourceLimiter
74+
History map[string]*WorkspaceHistory
75+
Limiter ResourceLimiter
76+
BurstLimiter ResourceLimiter
8377

8478
// TotalBandwidth is the total CPU time available in nanoseconds per second
8579
TotalBandwidth Bandwidth
@@ -90,7 +84,7 @@ type Distributor struct {
9084
}
9185

9286
type DistributorDebug struct {
93-
BandwidthAvail, BandwidthUsed, BandwidthBreakout Bandwidth
87+
BandwidthAvail, BandwidthUsed, BandwidthBurst Bandwidth
9488
}
9589

9690
// Run starts a ticker which repeatedly calls Tick until the context is canceled.
@@ -175,7 +169,7 @@ func (d *Distributor) Tick(dt time.Duration) (DistributorDebug, error) {
175169
d.LastTickUsage = totalUsage
176170

177171
// enforce limits
178-
var breakoutBandwidth Bandwidth
172+
var burstBandwidth Bandwidth
179173
for _, id := range wsOrder {
180174
ws := d.History[id]
181175
limit := d.Limiter.Limit(ws.Usage())
@@ -184,24 +178,25 @@ func (d *Distributor) Tick(dt time.Duration) (DistributorDebug, error) {
184178
// and there's still some bandwidth left to give, let's act as if had
185179
// never spent any CPU time and assume the workspace will spend their
186180
// entire bandwidth at once.
181+
var burst bool
187182
if totalBandwidth < d.TotalBandwidth && ws.Throttled() {
188-
limit = d.BreakoutLimiter.Limit(ws.Usage())
183+
limit = d.BurstLimiter.Limit(ws.Usage())
189184

190185
// We assume the workspace is going to use as much as their limit allows.
191186
// This might not be true, because their process which consumed so much CPU
192187
// may have ended by now.
193188
totalBandwidth += limit
194189

195-
breakoutBandwidth += limit
190+
burstBandwidth += limit
196191
}
197192

198-
d.Sink(id, limit)
193+
d.Sink(id, limit, burst)
199194
}
200195

201196
return DistributorDebug{
202-
BandwidthAvail: d.TotalBandwidth,
203-
BandwidthUsed: totalBandwidth,
204-
BandwidthBreakout: breakoutBandwidth,
197+
BandwidthAvail: d.TotalBandwidth,
198+
BandwidthUsed: totalBandwidth,
199+
BandwidthBurst: burstBandwidth,
205200
}, nil
206201
}
207202

components/ws-daemon/pkg/cpulimit/cpulimit_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,7 @@ func (n *Node) Source(context.Context) ([]cpulimit.Workspace, error) {
221221
}
222222

223223
// Sink acts as sink for a distributor
224-
func (n *Node) Sink(id string, limit cpulimit.Bandwidth) {
224+
func (n *Node) Sink(id string, limit cpulimit.Bandwidth, burst bool) {
225225
n.State[id].Limit = limit
226226
}
227227

@@ -238,7 +238,7 @@ func (n *Node) Dump(out io.Writer, t time.Duration, dbg cpulimit.DistributorDebu
238238
if actualRate > limit {
239239
actualRate = limit
240240
}
241-
fmt.Fprintf(out, "%d,%s,%d,%d,%d,%d,%d,%d,%d,%d,%d\n", t, c.ID(), c.Rate(t), state.Throttled, time.Duration(state.Usage).Milliseconds(), state.Limit, actualRate, totalCapacity, n.bandwidthUsed, n.bandwidthReq, dbg.BandwidthBreakout)
241+
fmt.Fprintf(out, "%d,%s,%d,%d,%d,%d,%d,%d,%d,%d,%d\n", t, c.ID(), c.Rate(t), state.Throttled, time.Duration(state.Usage).Milliseconds(), state.Limit, actualRate, totalCapacity, n.bandwidthUsed, n.bandwidthReq, dbg.BandwidthBurst)
242242
}
243243
}
244244

0 commit comments

Comments
 (0)