Skip to content

🌱 Followups to default low priority in mappers #3160

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions pkg/handler/enqueue_mapped.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package handler

import (
"context"
"reflect"

"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -65,15 +64,17 @@ func EnqueueRequestsFromMapFunc(fn MapFunc) EventHandler {
// TypedEnqueueRequestsFromMapFunc is experimental and subject to future change.
func TypedEnqueueRequestsFromMapFunc[object any, request comparable](fn TypedMapFunc[object, request]) TypedEventHandler[object, request] {
return &enqueueRequestsFromMapFunc[object, request]{
toRequests: fn,
toRequests: fn,
objectImplementsClientObject: implementsClientObject[object](),
}
}

var _ EventHandler = &enqueueRequestsFromMapFunc[client.Object, reconcile.Request]{}

type enqueueRequestsFromMapFunc[object any, request comparable] struct {
// Mapper transforms the argument into a slice of keys to be reconciled
toRequests TypedMapFunc[object, request]
toRequests TypedMapFunc[object, request]
objectImplementsClientObject bool
}

// Create implements EventHandler.
Expand All @@ -85,7 +86,7 @@ func (e *enqueueRequestsFromMapFunc[object, request]) Create(
reqs := map[request]empty{}

var lowPriority bool
if reflect.TypeFor[object]().Implements(reflect.TypeFor[client.Object]()) && isPriorityQueue(q) && !isNil(evt.Object) {
if e.objectImplementsClientObject && isPriorityQueue(q) && !isNil(evt.Object) {
clientObjectEvent := event.CreateEvent{Object: any(evt.Object).(client.Object)}
if isObjectUnchanged(clientObjectEvent) {
lowPriority = true
Expand All @@ -101,7 +102,7 @@ func (e *enqueueRequestsFromMapFunc[object, request]) Update(
q workqueue.TypedRateLimitingInterface[request],
) {
var lowPriority bool
if reflect.TypeFor[object]().Implements(reflect.TypeFor[client.Object]()) && isPriorityQueue(q) && !isNil(evt.ObjectOld) && !isNil(evt.ObjectNew) {
if e.objectImplementsClientObject && isPriorityQueue(q) && !isNil(evt.ObjectOld) && !isNil(evt.ObjectNew) {
lowPriority = any(evt.ObjectOld).(client.Object).GetResourceVersion() == any(evt.ObjectNew).(client.Object).GetResourceVersion()
}
reqs := map[request]empty{}
Expand Down Expand Up @@ -134,12 +135,12 @@ func (e *enqueueRequestsFromMapFunc[object, request]) mapAndEnqueue(
q workqueue.TypedRateLimitingInterface[request],
o object,
reqs map[request]empty,
unchanged bool,
lowPriority bool,
) {
for _, req := range e.toRequests(ctx, o) {
_, ok := reqs[req]
if !ok {
if unchanged {
if lowPriority {
q.(priorityqueue.PriorityQueue[request]).AddWithOpts(priorityqueue.AddOpts{
Priority: LowPriority,
}, req)
Expand Down
10 changes: 8 additions & 2 deletions pkg/handler/eventhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,12 @@ type TypedFuncs[object any, request comparable] struct {
GenericFunc func(context.Context, event.TypedGenericEvent[object], workqueue.TypedRateLimitingInterface[request])
}

var typeForClientObject = reflect.TypeFor[client.Object]()

func implementsClientObject[object any]() bool {
return reflect.TypeFor[object]().Implements(typeForClientObject)
}

func isPriorityQueue[request comparable](q workqueue.TypedRateLimitingInterface[request]) bool {
_, ok := q.(priorityqueue.PriorityQueue[request])
return ok
Expand All @@ -117,7 +123,7 @@ func isPriorityQueue[request comparable](q workqueue.TypedRateLimitingInterface[
// Create implements EventHandler.
func (h TypedFuncs[object, request]) Create(ctx context.Context, e event.TypedCreateEvent[object], q workqueue.TypedRateLimitingInterface[request]) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doing or caching this directly in the Funcs is problematic:

  • It doesn't have a constructor, so can't do it there (We have to remember to never ever export types and only constructors for this kind of scenario)
  • It has value receivers and we need some kind of locking if we want to cache this, but using a lock with value receivers means we copy the lock which the linter forbids

if h.CreateFunc != nil {
if !reflect.TypeFor[object]().Implements(reflect.TypeFor[client.Object]()) || !isPriorityQueue(q) || isNil(e.Object) {
if !implementsClientObject[object]() || !isPriorityQueue(q) || isNil(e.Object) {
h.CreateFunc(ctx, e, q)
return
}
Expand Down Expand Up @@ -156,7 +162,7 @@ func (h TypedFuncs[object, request]) Delete(ctx context.Context, e event.TypedDe
// Update implements EventHandler.
func (h TypedFuncs[object, request]) Update(ctx context.Context, e event.TypedUpdateEvent[object], q workqueue.TypedRateLimitingInterface[request]) {
if h.UpdateFunc != nil {
if !reflect.TypeFor[object]().Implements(reflect.TypeFor[client.Object]()) || !isPriorityQueue(q) || isNil(e.ObjectOld) || isNil(e.ObjectNew) {
if !implementsClientObject[object]() || !isPriorityQueue(q) || isNil(e.ObjectOld) || isNil(e.ObjectNew) {
h.UpdateFunc(ctx, e, q)
return
}
Expand Down
244 changes: 0 additions & 244 deletions pkg/handler/eventhandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -984,250 +984,6 @@ var _ = Describe("Eventhandler", func() {
})
}
})

Describe("WithLowPriorityWhenUnchanged", func() {
It("should lower the priority of a create request for an object that was created more than one minute in the past", func() {
actualOpts := priorityqueue.AddOpts{}
var actualRequests []reconcile.Request
wq := &fakePriorityQueue{
addWithOpts: func(o priorityqueue.AddOpts, items ...reconcile.Request) {
actualOpts = o
actualRequests = items
},
}

h := handler.WithLowPriorityWhenUnchanged(&handler.EnqueueRequestForObject{})
h.Create(ctx, event.CreateEvent{
Object: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
Name: "my-pod",
}},
}, wq)

Expect(actualOpts).To(Equal(priorityqueue.AddOpts{Priority: handler.LowPriority}))
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
})

It("should lower the priority of a create request for an object that was created more than one minute in the past without the WithLowPriorityWrapper", func() {
actualOpts := priorityqueue.AddOpts{}
var actualRequests []reconcile.Request
wq := &fakePriorityQueue{
addWithOpts: func(o priorityqueue.AddOpts, items ...reconcile.Request) {
actualOpts = o
actualRequests = items
},
}

h := &handler.EnqueueRequestForObject{}
h.Create(ctx, event.CreateEvent{
Object: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
Name: "my-pod",
}},
}, wq)

Expect(actualOpts).To(Equal(priorityqueue.AddOpts{Priority: handler.LowPriority}))
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
})

It("should not lower the priority of a create request for an object that was created less than one minute in the past", func() {
actualOpts := priorityqueue.AddOpts{}
var actualRequests []reconcile.Request
wq := &fakePriorityQueue{
addWithOpts: func(o priorityqueue.AddOpts, items ...reconcile.Request) {
actualOpts = o
actualRequests = items
},
}

h := handler.WithLowPriorityWhenUnchanged(&handler.EnqueueRequestForObject{})
h.Create(ctx, event.CreateEvent{
Object: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
Name: "my-pod",
CreationTimestamp: metav1.Now(),
}},
}, wq)

Expect(actualOpts).To(Equal(priorityqueue.AddOpts{}))
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
})

It("should not lower the priority of a create request for an object that was created less than one minute in the past without the WithLowPriority wrapperr", func() {
actualOpts := priorityqueue.AddOpts{}
var actualRequests []reconcile.Request
wq := &fakePriorityQueue{
addWithOpts: func(o priorityqueue.AddOpts, items ...reconcile.Request) {
actualOpts = o
actualRequests = items
},
}

h := &handler.EnqueueRequestForObject{}
h.Create(ctx, event.CreateEvent{
Object: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
Name: "my-pod",
CreationTimestamp: metav1.Now(),
}},
}, wq)

Expect(actualOpts).To(Equal(priorityqueue.AddOpts{}))
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
})

It("should lower the priority of an update request with unchanged RV", func() {
actualOpts := priorityqueue.AddOpts{}
var actualRequests []reconcile.Request
wq := &fakePriorityQueue{
addWithOpts: func(o priorityqueue.AddOpts, items ...reconcile.Request) {
actualOpts = o
actualRequests = items
},
}

h := handler.WithLowPriorityWhenUnchanged(&handler.EnqueueRequestForObject{})
h.Update(ctx, event.UpdateEvent{
ObjectOld: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
Name: "my-pod",
}},
ObjectNew: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
Name: "my-pod",
}},
}, wq)

Expect(actualOpts).To(Equal(priorityqueue.AddOpts{Priority: handler.LowPriority}))
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
})

It("should lower the priority of an update request with unchanged RV without the WithLowPriority wrapper", func() {
actualOpts := priorityqueue.AddOpts{}
var actualRequests []reconcile.Request
wq := &fakePriorityQueue{
addWithOpts: func(o priorityqueue.AddOpts, items ...reconcile.Request) {
actualOpts = o
actualRequests = items
},
}

h := &handler.EnqueueRequestForObject{}
h.Update(ctx, event.UpdateEvent{
ObjectOld: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
Name: "my-pod",
}},
ObjectNew: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
Name: "my-pod",
}},
}, wq)

Expect(actualOpts).To(Equal(priorityqueue.AddOpts{Priority: handler.LowPriority}))
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
})

It("should not lower the priority of an update request with changed RV", func() {
actualOpts := priorityqueue.AddOpts{}
var actualRequests []reconcile.Request
wq := &fakePriorityQueue{
addWithOpts: func(o priorityqueue.AddOpts, items ...reconcile.Request) {
actualOpts = o
actualRequests = items
},
}

h := handler.WithLowPriorityWhenUnchanged(&handler.EnqueueRequestForObject{})
h.Update(ctx, event.UpdateEvent{
ObjectOld: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
Name: "my-pod",
}},
ObjectNew: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
Name: "my-pod",
ResourceVersion: "1",
}},
}, wq)

Expect(actualOpts).To(Equal(priorityqueue.AddOpts{}))
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
})

It("should not lower the priority of an update request with changed RV without the WithLowPriority wrapper", func() {
actualOpts := priorityqueue.AddOpts{}
var actualRequests []reconcile.Request
wq := &fakePriorityQueue{
addWithOpts: func(o priorityqueue.AddOpts, items ...reconcile.Request) {
actualOpts = o
actualRequests = items
},
}

h := &handler.EnqueueRequestForObject{}
h.Update(ctx, event.UpdateEvent{
ObjectOld: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
Name: "my-pod",
}},
ObjectNew: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
Name: "my-pod",
ResourceVersion: "1",
}},
}, wq)

Expect(actualOpts).To(Equal(priorityqueue.AddOpts{}))
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
})

It("should have no effect on create if the workqueue is not a priorityqueue", func() {
h := handler.WithLowPriorityWhenUnchanged(&handler.EnqueueRequestForObject{})
h.Create(ctx, event.CreateEvent{
Object: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
Name: "my-pod",
}},
}, q)

Expect(q.Len()).To(Equal(1))
item, _ := q.Get()
Expect(item).To(Equal(reconcile.Request{NamespacedName: types.NamespacedName{Name: "my-pod"}}))
})

It("should have no effect on create if the workqueue is not a priorityqueue without the WithLowPriority wrapper", func() {
h := &handler.EnqueueRequestForObject{}
h.Create(ctx, event.CreateEvent{
Object: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
Name: "my-pod",
}},
}, q)

Expect(q.Len()).To(Equal(1))
item, _ := q.Get()
Expect(item).To(Equal(reconcile.Request{NamespacedName: types.NamespacedName{Name: "my-pod"}}))
})

It("should have no effect on Update if the workqueue is not a priorityqueue", func() {
h := handler.WithLowPriorityWhenUnchanged(&handler.EnqueueRequestForObject{})
h.Update(ctx, event.UpdateEvent{
ObjectOld: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
Name: "my-pod",
}},
ObjectNew: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
Name: "my-pod",
}},
}, q)

Expect(q.Len()).To(Equal(1))
item, _ := q.Get()
Expect(item).To(Equal(reconcile.Request{NamespacedName: types.NamespacedName{Name: "my-pod"}}))
})

It("should have no effect on Update if the workqueue is not a priorityqueue without the WithLowPriority wrapper", func() {
h := &handler.EnqueueRequestForObject{}
h.Update(ctx, event.UpdateEvent{
ObjectOld: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
Name: "my-pod",
}},
ObjectNew: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
Name: "my-pod",
}},
}, q)

Expect(q.Len()).To(Equal(1))
item, _ := q.Get()
Expect(item).To(Equal(reconcile.Request{NamespacedName: types.NamespacedName{Name: "my-pod"}}))
})
})
})

type fakePriorityQueue struct {
Expand Down
Loading