Skip to content

Commit 7c48a78

Browse files
committed
Stop ingesting initial state
1 parent 94f1cfd commit 7c48a78

File tree

2 files changed

+37
-46
lines changed

2 files changed

+37
-46
lines changed

receiver/k8sobjectsreceiver/receiver.go

Lines changed: 28 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -147,25 +147,11 @@ func (kr *k8sobjectsreceiver) startWatch(ctx context.Context, config *K8sObjects
147147
kr.stopperChanList = append(kr.stopperChanList, stopperChan)
148148
kr.mu.Unlock()
149149

150-
resourceVersion := config.ResourceVersion
151-
var err error
152-
if resourceVersion == "" {
153-
// Proper use of the Kubernetes API Watch capability when no resourceVersion is supplied is to do a list first
154-
// to get the initial state and a useable resourceVersion.
155-
// See https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes for details.
156-
resourceVersion, err = kr.doInitialList(ctx, config, resource)
157-
if err != nil {
158-
kr.setting.Logger.Error("could not perform initial list for watch", zap.String("resource", config.gvr.String()), zap.Error(err))
159-
return
160-
}
161-
// If we still don't have a resourceVersion we can try 1 as a last ditch effort.
162-
// This also helps our unit tests since the fake client can't handle returning resource versions
163-
// as part of a list of objects.
164-
if resourceVersion == "" {
165-
resourceVersion = defaultResourceVersion
166-
}
150+
resourceVersion, err := getResourceVersion(ctx, config, resource)
151+
if err != nil {
152+
kr.setting.Logger.Error("could not retrieve an initial resourceVersion", zap.String("resource", config.gvr.String()), zap.Error(err))
153+
return
167154
}
168-
169155
watchFunc := func(options metav1.ListOptions) (apiWatch.Interface, error) {
170156
options.FieldSelector = config.FieldSelector
171157
options.LabelSelector = config.LabelSelector
@@ -202,28 +188,33 @@ func (kr *k8sobjectsreceiver) startWatch(ctx context.Context, config *K8sObjects
202188

203189
}
204190

205-
func (kr *k8sobjectsreceiver) doInitialList(ctx context.Context, config *K8sObjectsConfig, resource dynamic.ResourceInterface) (string, error) {
206-
objects, err := resource.List(ctx, metav1.ListOptions{
207-
FieldSelector: config.FieldSelector,
208-
LabelSelector: config.LabelSelector,
209-
})
210-
if err != nil {
211-
return "", err
212-
}
191+
func getResourceVersion(ctx context.Context, config *K8sObjectsConfig, resource dynamic.ResourceInterface) (string, error) {
192+
resourceVersion := config.ResourceVersion
193+
if resourceVersion == "" || resourceVersion == "0" {
194+
// Proper use of the Kubernetes API Watch capability when no resourceVersion is supplied is to do a list first
195+
// to get the initial state and a useable resourceVersion.
196+
// See https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes for details.
197+
objects, err := resource.List(ctx, metav1.ListOptions{
198+
FieldSelector: config.FieldSelector,
199+
LabelSelector: config.LabelSelector,
200+
})
201+
if err != nil {
202+
return "", fmt.Errorf("could not perform initial list for watch on %v, %w", config.gvr.String(), err)
203+
}
204+
if objects == nil {
205+
return "", fmt.Errorf("nil objects returned, this is an error in the k8sobjectsreceiver")
206+
}
213207

214-
if objects == nil {
215-
return "", fmt.Errorf("nil objects returned, this is an error in the k8sobjectsreceiver")
216-
}
208+
resourceVersion = objects.GetResourceVersion()
217209

218-
if len(objects.Items) > 0 {
219-
logs := pullObjectsToLogData(objects, time.Now(), config)
220-
obsCtx := kr.obsrecv.StartLogsOp(ctx)
221-
err = kr.consumer.ConsumeLogs(obsCtx, logs)
222-
kr.obsrecv.EndLogsOp(obsCtx, metadata.Type, logs.LogRecordCount(), err)
210+
// If we still don't have a resourceVersion we can try 1 as a last ditch effort.
211+
// This also helps our unit tests since the fake client can't handle returning resource versions
212+
// as part of a list of objects.
213+
if resourceVersion == "" || resourceVersion == "0" {
214+
resourceVersion = defaultResourceVersion
215+
}
223216
}
224-
225-
return objects.GetResourceVersion(), nil
226-
217+
return resourceVersion, nil
227218
}
228219

229220
// Start ticking immediately.

receiver/k8sobjectsreceiver/receiver_test.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -88,12 +88,6 @@ func TestWatchObject(t *testing.T) {
8888
generatePod("pod1", "default", map[string]interface{}{
8989
"environment": "production",
9090
}, "1"),
91-
generatePod("pod2", "default", map[string]interface{}{
92-
"environment": "test",
93-
}, "2"),
94-
generatePod("pod3", "default_ignore", map[string]interface{}{
95-
"environment": "production",
96-
}, "3"),
9791
)
9892

9993
rCfg := createDefaultConfig().(*Config)
@@ -124,17 +118,23 @@ func TestWatchObject(t *testing.T) {
124118
require.NoError(t, r.Start(ctx, componenttest.NewNopHost()))
125119

126120
time.Sleep(time.Millisecond * 100)
127-
assert.Len(t, consumer.Logs(), 1)
128-
assert.Equal(t, 2, consumer.Count())
121+
assert.Len(t, consumer.Logs(), 0)
122+
assert.Equal(t, 0, consumer.Count())
129123

130124
mockClient.createPods(
125+
generatePod("pod2", "default", map[string]interface{}{
126+
"environment": "test",
127+
}, "2"),
128+
generatePod("pod3", "default_ignore", map[string]interface{}{
129+
"environment": "production",
130+
}, "3"),
131131
generatePod("pod4", "default", map[string]interface{}{
132132
"environment": "production",
133133
}, "4"),
134134
)
135135
time.Sleep(time.Millisecond * 100)
136136
assert.Len(t, consumer.Logs(), 2)
137-
assert.Equal(t, 3, consumer.Count())
137+
assert.Equal(t, 2, consumer.Count())
138138

139139
assert.NoError(t, r.Shutdown(ctx))
140140
}

0 commit comments

Comments
 (0)