Skip to content

balancergroup: Make closing terminal #8095

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Feb 25, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion balancer/rls/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,6 @@ func (rlsBB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.
Logger: lb.logger,
SubBalancerCloseTimeout: time.Duration(0), // Disable caching of removed child policies
})
lb.bg.Start()
go lb.run()
return lb
}
Expand Down
1 change: 0 additions & 1 deletion balancer/weightedtarget/weightedtarget.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Ba
Logger: b.logger,
SubBalancerCloseTimeout: time.Duration(0), // Disable caching of removed child policies
})
b.bg.Start()
b.logger.Infof("Created")
return b
}
Expand Down
121 changes: 58 additions & 63 deletions internal/balancergroup/balancergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@
// The corresponding boolean outgoingClosed is used to stop further updates
// to sub-balancers after the balancer group is closed.
outgoingMu sync.Mutex
outgoingStarted bool
outgoingClosed bool
idToBalancerConfig map[string]*subBalancerWrapper
// Cache for sub-balancers when they are removed. This is `nil` if caching
// is disabled by passing `0` for Options.SubBalancerCloseTimeout`.
Expand Down Expand Up @@ -224,7 +224,7 @@
// The corresponding boolean incomingClosed is used to stop further updates
// from sub-balancers after the balancer group is closed.
incomingMu sync.Mutex
incomingStarted bool // This boolean only guards calls back to ClientConn.
incomingClosed bool // This boolean only guards calls back to ClientConn.
scToSubBalancer map[balancer.SubConn]*subBalancerWrapper
}

Expand Down Expand Up @@ -265,30 +265,6 @@
}
}

// Start starts the balancer group, including building all the sub-balancers,
// and send the existing addresses to them.
//
// A BalancerGroup can be closed and started later. When a BalancerGroup is
// closed, it can still receive address updates, which will be applied when
// restarted.
func (bg *BalancerGroup) Start() {
bg.incomingMu.Lock()
bg.incomingStarted = true
bg.incomingMu.Unlock()

bg.outgoingMu.Lock()
if bg.outgoingStarted {
bg.outgoingMu.Unlock()
return
}

for _, config := range bg.idToBalancerConfig {
config.startBalancer()
}
bg.outgoingStarted = true
bg.outgoingMu.Unlock()
}

// AddWithClientConn adds a balancer with the given id to the group. The
// balancer is built with a balancer builder registered with balancerName. The
// given ClientConn is passed to the newly built balancer instead of the
Expand All @@ -299,17 +275,18 @@
bg.logger.Infof("Adding child policy of type %q for child %q", balancerName, id)
builder := balancer.Get(balancerName)
if builder == nil {
return fmt.Errorf("unregistered balancer name %q", balancerName)
return fmt.Errorf("balancergroup: unregistered balancer name %q", balancerName)

Check warning on line 278 in internal/balancergroup/balancergroup.go

View check run for this annotation

Codecov / codecov/patch

internal/balancergroup/balancergroup.go#L278

Added line #L278 was not covered by tests
}

// Store data in static map, and then check to see if bg is started.
bg.outgoingMu.Lock()
defer bg.outgoingMu.Unlock()
if bg.outgoingClosed {
return fmt.Errorf("balancergroup: already closed")
}

Check warning on line 286 in internal/balancergroup/balancergroup.go

View check run for this annotation

Codecov / codecov/patch

internal/balancergroup/balancergroup.go#L285-L286

Added lines #L285 - L286 were not covered by tests
var sbc *subBalancerWrapper
// If outgoingStarted is true, search in the cache. Otherwise, cache is
// guaranteed to be empty, searching is unnecessary. Also, skip the cache if
// caching is disabled.
if bg.outgoingStarted && bg.deletedBalancerCache != nil {
// Skip searching the cache if disabled.
if bg.deletedBalancerCache != nil {
if old, ok := bg.deletedBalancerCache.Remove(id); ok {
if bg.logger.V(2) {
bg.logger.Infof("Removing and reusing child policy of type %q for child %q from the balancer cache", balancerName, id)
Expand Down Expand Up @@ -341,11 +318,7 @@
builder: builder,
buildOpts: bg.buildOpts,
}
if bg.outgoingStarted {
// Only start the balancer if bg is started. Otherwise, we only keep the
// static data.
sbc.startBalancer()
}
sbc.startBalancer()
} else {
// When bringing back a sub-balancer from the cache, re-send the cached
// picker and state.
Expand All @@ -369,6 +342,10 @@
bg.logger.Infof("Removing child policy for child %q", id)

bg.outgoingMu.Lock()
if bg.outgoingClosed {
bg.outgoingMu.Unlock()
return
}

sbToRemove, ok := bg.idToBalancerConfig[id]
if !ok {
Expand All @@ -379,12 +356,6 @@

// Unconditionally remove the sub-balancer config from the map.
delete(bg.idToBalancerConfig, id)
if !bg.outgoingStarted {
// Nothing needs to be done here, since we wouldn't have created the
// sub-balancer.
bg.outgoingMu.Unlock()
return
}

if bg.deletedBalancerCache != nil {
if bg.logger.V(2) {
Expand Down Expand Up @@ -424,6 +395,7 @@
// cleanup after the timeout.
func (bg *BalancerGroup) cleanupSubConns(config *subBalancerWrapper) {
bg.incomingMu.Lock()
defer bg.incomingMu.Unlock()
// Remove SubConns. This is only done after the balancer is
// actually closed.
//
Expand All @@ -437,18 +409,20 @@
delete(bg.scToSubBalancer, sc)
}
}
bg.incomingMu.Unlock()
}

// connect attempts to connect to all subConns belonging to sb.
func (bg *BalancerGroup) connect(sb *subBalancerWrapper) {
bg.incomingMu.Lock()
defer bg.incomingMu.Unlock()
if bg.incomingClosed {
return
}

Check warning on line 420 in internal/balancergroup/balancergroup.go

View check run for this annotation

Codecov / codecov/patch

internal/balancergroup/balancergroup.go#L417-L420

Added lines #L417 - L420 were not covered by tests
for sc, b := range bg.scToSubBalancer {
if b == sb {
sc.Connect()
}
}
bg.incomingMu.Unlock()
}

// Following are actions from the parent grpc.ClientConn, forward to sub-balancers.
Expand All @@ -457,6 +431,10 @@
// needed.
func (bg *BalancerGroup) updateSubConnState(sc balancer.SubConn, state balancer.SubConnState, cb func(balancer.SubConnState)) {
bg.incomingMu.Lock()
if bg.incomingClosed {
bg.incomingMu.Unlock()
return
}
if _, ok := bg.scToSubBalancer[sc]; !ok {
bg.incomingMu.Unlock()
return
Expand All @@ -468,10 +446,13 @@
bg.incomingMu.Unlock()

bg.outgoingMu.Lock()
defer bg.outgoingMu.Unlock()
if bg.outgoingClosed {
return
}

Check warning on line 452 in internal/balancergroup/balancergroup.go

View check run for this annotation

Codecov / codecov/patch

internal/balancergroup/balancergroup.go#L451-L452

Added lines #L451 - L452 were not covered by tests
if cb != nil {
cb(state)
}
bg.outgoingMu.Unlock()
}

// UpdateSubConnState handles the state for the subconn. It finds the
Expand All @@ -485,6 +466,9 @@
func (bg *BalancerGroup) UpdateClientConnState(id string, s balancer.ClientConnState) error {
bg.outgoingMu.Lock()
defer bg.outgoingMu.Unlock()
if bg.outgoingClosed {
return nil
}

Check warning on line 471 in internal/balancergroup/balancergroup.go

View check run for this annotation

Codecov / codecov/patch

internal/balancergroup/balancergroup.go#L470-L471

Added lines #L470 - L471 were not covered by tests
if config, ok := bg.idToBalancerConfig[id]; ok {
return config.updateClientConnState(s)
}
Expand All @@ -494,10 +478,13 @@
// ResolverError forwards resolver errors to all sub-balancers.
func (bg *BalancerGroup) ResolverError(err error) {
bg.outgoingMu.Lock()
defer bg.outgoingMu.Unlock()
if bg.outgoingClosed {
return
}

Check warning on line 484 in internal/balancergroup/balancergroup.go

View check run for this annotation

Codecov / codecov/patch

internal/balancergroup/balancergroup.go#L483-L484

Added lines #L483 - L484 were not covered by tests
for _, config := range bg.idToBalancerConfig {
config.resolverError(err)
}
bg.outgoingMu.Unlock()
}

// Following are actions from sub-balancers, forward to ClientConn.
Expand All @@ -514,9 +501,9 @@
// error. But since we call balancer.stopBalancer when removing the balancer, this
// shouldn't happen.
bg.incomingMu.Lock()
if !bg.incomingStarted {
if bg.incomingClosed {
bg.incomingMu.Unlock()
return nil, fmt.Errorf("NewSubConn is called after balancer group is closed")
return nil, fmt.Errorf("balancergroup: NewSubConn is called after balancer group is closed")

Check warning on line 506 in internal/balancergroup/balancergroup.go

View check run for this annotation

Codecov / codecov/patch

internal/balancergroup/balancergroup.go#L506

Added line #L506 was not covered by tests
}
var sc balancer.SubConn
oldListener := opts.StateListener
Expand Down Expand Up @@ -547,31 +534,33 @@
}

// Close closes the balancer. It stops sub-balancers, and removes the subconns.
// The BalancerGroup can be restarted later.
// Once a BalancerGroup is closed, it cannot receive further address updates.
func (bg *BalancerGroup) Close() {
bg.incomingMu.Lock()
if bg.incomingStarted {
bg.incomingStarted = false
// Also remove all SubConns.
for sc := range bg.scToSubBalancer {
sc.Shutdown()
delete(bg.scToSubBalancer, sc)
}
bg.incomingClosed = true
// Also remove all SubConns.
for sc := range bg.scToSubBalancer {
sc.Shutdown()
delete(bg.scToSubBalancer, sc)
}
bg.incomingMu.Unlock()

bg.outgoingMu.Lock()
// Setting `outgoingClosed` ensures that no entries are added to
// `deletedBalancerCache` after this point.
bg.outgoingClosed = true
bg.outgoingMu.Unlock()

// Clear(true) runs the clear function to close the sub-balancers held in
// the cache. It must be called outside the outgoing mutex.
if bg.deletedBalancerCache != nil {
bg.deletedBalancerCache.Clear(true)
}

bg.outgoingMu.Lock()
if bg.outgoingStarted {
bg.outgoingStarted = false
for _, config := range bg.idToBalancerConfig {
config.stopBalancer()
}
for id, config := range bg.idToBalancerConfig {
config.stopBalancer()
delete(bg.idToBalancerConfig, id)
}
bg.outgoingMu.Unlock()
}
Expand All @@ -581,24 +570,30 @@
// not supported.
func (bg *BalancerGroup) ExitIdle() {
bg.outgoingMu.Lock()
defer bg.outgoingMu.Unlock()
if bg.outgoingClosed {
return
}

Check warning on line 576 in internal/balancergroup/balancergroup.go

View check run for this annotation

Codecov / codecov/patch

internal/balancergroup/balancergroup.go#L575-L576

Added lines #L575 - L576 were not covered by tests
for _, config := range bg.idToBalancerConfig {
if !config.exitIdle() {
bg.connect(config)
}
}
bg.outgoingMu.Unlock()
}

// ExitIdleOne instructs the sub-balancer `id` to exit IDLE state, if
// appropriate and possible.
func (bg *BalancerGroup) ExitIdleOne(id string) {
bg.outgoingMu.Lock()
defer bg.outgoingMu.Unlock()
if bg.outgoingClosed {
return
}

Check warning on line 591 in internal/balancergroup/balancergroup.go

View check run for this annotation

Codecov / codecov/patch

internal/balancergroup/balancergroup.go#L590-L591

Added lines #L590 - L591 were not covered by tests
if config := bg.idToBalancerConfig[id]; config != nil {
if !config.exitIdle() {
bg.connect(config)
}
}
bg.outgoingMu.Unlock()
}

// ParseConfig parses a child config list and returns a LB config for the
Expand Down
Loading