Skip to content

feat: add AsReconciler wrapper to rate limit and replace controller-runtime's requeue: true behavior #160

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions reconciler/reconciler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package reconciler

import (
"context"

"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// Result is a wrapper around reconcile.Result that adds RequeueWithBackoff functionality.
type Result struct {
reconcile.Result
RequeueWithBackoff bool
}

// Reconciler defines the interface for standard reconcilers
type Reconciler interface {
Reconcile(ctx context.Context) (Result, error)
}

// ReconcilerFunc is a function type that implements the Reconciler interface.
type ReconcilerFunc func(ctx context.Context) (Result, error)

// Reconcile implements the Reconciler interface.
func (f ReconcilerFunc) Reconcile(ctx context.Context) (Result, error) {
return f(ctx)
}

// AsReconciler creates a reconciler from a standard reconciler
func AsReconciler(reconciler Reconciler) reconcile.Reconciler {
return AsReconcilerWithRateLimiter(
reconciler,
workqueue.DefaultTypedControllerRateLimiter[reconcile.Request](),
)
}

// AsReconcilerWithRateLimiter creates a reconciler with a specific key extractor
func AsReconcilerWithRateLimiter(
reconciler Reconciler,
rateLimiter workqueue.TypedRateLimiter[reconcile.Request],
) reconcile.Reconciler {
return reconcile.Func(func(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
result, err := reconciler.Reconcile(ctx)
if err != nil {
return reconcile.Result{}, err
}
if result.RequeueWithBackoff {
return reconcile.Result{RequeueAfter: rateLimiter.When(req)}, nil
}
if result.RequeueAfter > 0 {
return reconcile.Result{RequeueAfter: result.RequeueAfter}, nil
}
return result.Result, nil
})
}
168 changes: 168 additions & 0 deletions reconciler/suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
package reconciler_test

import (
"context"
"errors"
"testing"
"time"

"github.com/awslabs/operatorpkg/reconciler"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

func Test(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Reconciler")
}

// MockRateLimiter is a mock implementation of workqueue.TypedRateLimiter for testing
type MockRateLimiter[K comparable] struct {
whenFunc func(K) time.Duration
numRequeues int
backoffDuration time.Duration
}

func (m *MockRateLimiter[K]) When(key K) time.Duration {
if m.whenFunc != nil {
return m.whenFunc(key)
}
m.numRequeues++
return m.backoffDuration
}

func (m *MockRateLimiter[K]) NumRequeues(key K) int {
return m.numRequeues
}

func (m *MockRateLimiter[K]) Forget(key K) {
m.numRequeues = 0
}

// MockReconciler is a mock implementation of Reconciler for testing
type MockReconciler struct {
reconcileFunc func(context.Context) (reconciler.Result, error)
result reconciler.Result
err error
}

func (m *MockReconciler) Reconcile(ctx context.Context) (reconciler.Result, error) {
if m.reconcileFunc != nil {
return m.reconcileFunc(ctx)
}
return m.result, m.err
}

var _ = Describe("Reconciler", func() {
It("should return the original result without backoff", func() {
backoff := 5 * time.Second
// Create a mock reconciler
mockReconciler := &MockReconciler{
result: reconciler.Result{
Result: reconcile.Result{
RequeueAfter: backoff,
},
RequeueWithBackoff: false,
},
}

// Create the reconciler adapter
adapter := reconciler.AsReconciler(mockReconciler)

// Call the adapter
ctx := context.Background()
req := reconcile.Request{}
result, err := adapter.Reconcile(ctx, req)

// Verify the result
Expect(err).NotTo(HaveOccurred())
Expect(result.RequeueAfter).To(Equal(backoff))
})
It("should return the original result without backoff when RequeueWithBackoff is not set", func() {
mockReconciler := &MockReconciler{
result: reconciler.Result{},
}

// Create the reconciler adapter
adapter := reconciler.AsReconciler(mockReconciler)

// Call the adapter
ctx := context.Background()
req := reconcile.Request{}
result, err := adapter.Reconcile(ctx, req)

// Verify the result
Expect(err).NotTo(HaveOccurred())
Expect(result.RequeueAfter).To(BeZero())
})
It("should return a result with RequeueAfter set", func() {
// Create a mock reconciler that returns RequeueWithBackoff = true
mockReconciler := &MockReconciler{
result: reconciler.Result{
Result: reconcile.Result{},
RequeueWithBackoff: true,
},
}

// Create the reconciler adapter
adapter := reconciler.AsReconciler(mockReconciler)

// Call the adapter
ctx := context.Background()
req := reconcile.Request{}
result, err := adapter.Reconcile(ctx, req)

// Verify the result - should have some backoff duration
Expect(err).NotTo(HaveOccurred())
Expect(result.RequeueAfter).To(BeNumerically(">", 0))
})
It("should return the error without processing backoff", func() {
// Create a mock reconciler that returns an error
expectedErr := errors.New("test error")
mockReconciler := &MockReconciler{
result: reconciler.Result{RequeueWithBackoff: true},
err: expectedErr,
}

// Create the reconciler adapter
adapter := reconciler.AsReconciler(mockReconciler)

// Call the adapter
ctx := context.Background()
req := reconcile.Request{}
result, err := adapter.Reconcile(ctx, req)

// Verify that the error is propagated
Expect(err).To(HaveOccurred())
Expect(err).To(Equal(expectedErr))
Expect(result.RequeueAfter).To(BeZero())
})
It("should use custom rate limiter for backoff", func() {
backoffDuration := 10 * time.Second
mockRateLimiter := &MockRateLimiter[reconcile.Request]{
backoffDuration: backoffDuration,
}

// Create a mock reconciler that returns RequeueWithBackoff = true
mockReconciler := &MockReconciler{
result: reconciler.Result{
Result: reconcile.Result{},
RequeueWithBackoff: true,
},
}

// Create the reconciler adapter with custom rate limiter
adapter := reconciler.AsReconcilerWithRateLimiter(mockReconciler, mockRateLimiter)

// Call the adapter
ctx := context.Background()
req := reconcile.Request{}
result, err := adapter.Reconcile(ctx, req)

// Verify the result
Expect(err).NotTo(HaveOccurred())
Expect(result.RequeueAfter).To(Equal(backoffDuration))
Expect(mockRateLimiter.NumRequeues(req)).To(Equal(1))
})
})
35 changes: 31 additions & 4 deletions singleton/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"time"

"github.com/awslabs/operatorpkg/reconciler"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
Expand All @@ -17,16 +18,42 @@ const (
RequeueImmediately = 1 * time.Nanosecond
)

// Reconciler defines the interface for singleton reconcilers
type Reconciler interface {
Reconcile(ctx context.Context) (reconcile.Result, error)
Name() string
Reconcile(ctx context.Context) (reconciler.Result, error)
}

func AsReconciler(reconciler Reconciler) reconcile.Reconciler {
return reconcile.Func(func(ctx context.Context, r reconcile.Request) (reconcile.Result, error) {
return reconciler.Reconcile(ctx)
// In response to Requeue: True being deprecated via: https://github.com/kubernetes-sigs/controller-runtime/pull/3107/files
// This uses a bucket and per item delay but the item will be the same because the key is the controller name.
// This implements the same behavior as Requeue: True.

// AsReconciler creates a controller-runtime reconciler from a singleton reconciler
func AsReconciler(rec Reconciler) reconcile.Reconciler {
return AsReconcilerWithRateLimiter(rec, workqueue.DefaultTypedControllerRateLimiter[string]())
}

// AsReconcilerWithRateLimiter creates a controller-runtime reconciler with a custom rate limiter
func AsReconcilerWithRateLimiter(
rec Reconciler,
rateLimiter workqueue.TypedRateLimiter[string],
) reconcile.Reconciler {
return reconcile.Func(func(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
result, err := rec.Reconcile(ctx)
if err != nil {
return reconcile.Result{}, err
}
if result.RequeueWithBackoff {
return reconcile.Result{RequeueAfter: rateLimiter.When(rec.Name())}, nil
}
if result.RequeueAfter > 0 {
return reconcile.Result{RequeueAfter: result.RequeueAfter}, nil
}
return result.Result, nil
})
}

// Source creates a source for singleton controllers
func Source() source.Source {
eventSource := make(chan event.GenericEvent, 1)
eventSource <- event.GenericEvent{}
Expand Down
Loading