Skip to content

[receiver/discovery] Send delete entity events for discovered services #6260

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ require (
go.opentelemetry.io/collector/extension v1.32.0
go.opentelemetry.io/collector/extension/zpagesextension v0.126.0
go.opentelemetry.io/collector/otelcol v0.126.0
go.opentelemetry.io/collector/pdata v1.32.0
go.opentelemetry.io/collector/pdata v1.32.1-0.20250523042642-a867641d12bd
go.opentelemetry.io/collector/pipeline v0.126.0
go.opentelemetry.io/collector/processor v1.32.0
go.opentelemetry.io/collector/processor/batchprocessor v0.126.0
Expand Down Expand Up @@ -653,7 +653,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent v0.126.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk v0.126.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchperresourceattr v0.126.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata v0.126.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata v0.126.1-0.20250526171903-169686dbc75d
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/signalfx v0.126.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin v0.126.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/winperfcounters v0.126.0 // indirect
Expand Down Expand Up @@ -723,7 +723,7 @@ require (
golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect
gonum.org/v1/gonum v0.16.0 // indirect
google.golang.org/api v0.232.0 // indirect
google.golang.org/grpc v1.72.0 // indirect
google.golang.org/grpc v1.72.1 // indirect
google.golang.org/protobuf v1.36.6 // indirect
gopkg.in/fsnotify.v1 v1.4.7 // indirect
gopkg.in/go-playground/validator.v9 v9.31.0 // indirect
Expand Down
12 changes: 6 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1439,8 +1439,8 @@ github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal v0.
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal v0.126.0/go.mod h1:dfRudXeUgf148puPQxJ91IXABpfqsl9rarrwqFuH2Ro=
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/core/xidutils v0.126.0 h1:A0ngaz213hmNCCX6AoKs3tqM0WAAsURHdIv5h0UXLeM=
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/core/xidutils v0.126.0/go.mod h1:NHImkI6CD3ijcfLe4h6XP1MXQG0PSWH9TpLenZf6Wys=
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata v0.126.0 h1:e5aLwj2GldsWGMuk9e2Is+TN/EJHPh25RWmUzRWA7Ps=
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata v0.126.0/go.mod h1:o2lvKV9aHwoYUGRsND18Xriker2hi6A6zxbyr6UkEFA=
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata v0.126.1-0.20250526171903-169686dbc75d h1:tqbhR/PYfU4foM/zfEvH9Wr2V+5Mm+iPIS5QlXk7rcA=
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata v0.126.1-0.20250526171903-169686dbc75d/go.mod h1:3rl7mtPncU+cQEXs0lnYh5rmf1b0f4D2B12oIX1h0U4=
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.126.0 h1:AnOgi0AF5kALP4hEILsQEnRzT/yNXfua598210Dn9ko=
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.126.0/go.mod h1:jjyo4lLRH9WOUJ9djpEql6xqVAaReNDY7ciWRt23FZk=
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/kafka/topic v0.126.0 h1:VObAgxn0ONZEAXcANe+dq9uHR7KDMqBxnQegEJu/dpM=
Expand Down Expand Up @@ -2112,8 +2112,8 @@ go.opentelemetry.io/collector/otelcol v0.126.0 h1:TQaIMKzDgWPR79QmuZYeC1LaiUG624
go.opentelemetry.io/collector/otelcol v0.126.0/go.mod h1:Ve1nB64BniDT1v6IT85zF9P/qmD3bDWfL0Mry+m6cFA=
go.opentelemetry.io/collector/otelcol/otelcoltest v0.126.0 h1:PJ0c2wT9y7e6T+Ag4S/f3Di2DYic07lrvj+z9Ep200I=
go.opentelemetry.io/collector/otelcol/otelcoltest v0.126.0/go.mod h1:tbJaLJbeXRNEGH+5+w1YXLLrg4Xt640Qzsw5JLL3VZ8=
go.opentelemetry.io/collector/pdata v1.32.0 h1:hBzlJV1rujr1UdD2CBy2gmaIKtC15ysg/z+x8F3McQA=
go.opentelemetry.io/collector/pdata v1.32.0/go.mod h1:m41io9nWpy7aCm/uD1L9QcKiZwOP0ldj83JEA34dmlk=
go.opentelemetry.io/collector/pdata v1.32.1-0.20250523042642-a867641d12bd h1:H136IWSouulaIdZFOzgXo/KCcQD1QlXhRCPoXYE7pdk=
go.opentelemetry.io/collector/pdata v1.32.1-0.20250523042642-a867641d12bd/go.mod h1:TDvbHuvIK+g6xqu1gxtz8ti4pB2x1WcBpjFob5KfleU=
go.opentelemetry.io/collector/pdata/pprofile v0.126.0 h1:ArYQxg5KdTb98r1X6KSZY7W6/4DPv/q6z7jSbSZ1mBc=
go.opentelemetry.io/collector/pdata/pprofile v0.126.0/go.mod h1:2fBTFDcXjVfseBQKnt/DTM0EYTmFoPKtRpjg8ql38Ek=
go.opentelemetry.io/collector/pdata/testdata v0.126.0 h1:CMJEYwg12tMI60GOiBIKyrZQp839bD0eJ4rmD4ttlUs=
Expand Down Expand Up @@ -2654,8 +2654,8 @@ google.golang.org/grpc v1.31.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM
google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0=
google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc=
google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM=
google.golang.org/grpc v1.72.0 h1:S7UkcVa60b5AAQTaO6ZKamFp1zMZSU0fGDK2WZLbBnM=
google.golang.org/grpc v1.72.0/go.mod h1:wH5Aktxcg25y1I3w7H69nHfXdOG3UiadoBtjh3izSDM=
google.golang.org/grpc v1.72.1 h1:HR03wO6eyZ7lknl75XlxABNVLLFc2PAb6mHlYh756mA=
google.golang.org/grpc v1.72.1/go.mod h1:wH5Aktxcg25y1I3w7H69nHfXdOG3UiadoBtjh3izSDM=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
Expand Down
65 changes: 26 additions & 39 deletions internal/receiver/discoveryreceiver/endpoint_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ const (

// identifyingAttrKeys are the keys of attributes that are used to identify an entity.
var identifyingAttrKeys = []string{
serviceTypeAttr,
semconv.AttributeServiceName,
semconv.AttributeK8SPodUID,
semconv.AttributeContainerID,
semconv.AttributeK8SNodeUID,
Expand Down Expand Up @@ -135,7 +137,7 @@ func (et *endpointTracker) stop() {

func (et *endpointTracker) emitEntityStateEvents(observerCID component.ID, endpoints []observer.Endpoint) {
if et.pLogs != nil {
entityEvents, numFailed, err := entityStateEvents(observerCID, endpoints, et.correlations, time.Now())
entityEvents, numFailed, err := entityEvents(observerCID, endpoints, et.correlations, time.Now(), experimentalmetricmetadata.EventTypeState)
if err != nil {
et.logger.Warn(fmt.Sprintf("failed converting %v endpoints to entity state events", numFailed), zap.Error(err))
}
Expand All @@ -145,9 +147,10 @@ func (et *endpointTracker) emitEntityStateEvents(observerCID component.ID, endpo
}
}

func (et *endpointTracker) emitEntityDeleteEvents(endpoints []observer.Endpoint) {
func (et *endpointTracker) emitEntityDeleteEvents(observerCID component.ID, endpoints []observer.Endpoint) {
if et.pLogs != nil {
entityEvents, numFailed, err := entityDeleteEvents(endpoints, time.Now())
entityEvents, numFailed, err := entityEvents(observerCID, endpoints, et.correlations, time.Now(),
experimentalmetricmetadata.EventTypeDelete)
if err != nil {
et.logger.Warn(fmt.Sprintf("failed converting %v endpoints to entity delete events", numFailed), zap.Error(err))
}
Expand Down Expand Up @@ -226,18 +229,18 @@ func (n *notify) OnRemove(removed []observer.Endpoint) {
n.endpointTracker.correlations.MarkStale(endpoint.ID)
}
}
n.endpointTracker.emitEntityDeleteEvents(matchingEndpoints)
n.endpointTracker.emitEntityDeleteEvents(n.observerID, matchingEndpoints)
}

func (n *notify) OnChange(changed []observer.Endpoint) {
n.endpointTracker.updateEndpoints(changed, n.observerID)
}

// entityStateEvents converts observer endpoints to entity state events excluding those
// entityEvents converts observer endpoints to entity state events excluding those
// that don't have a discovery status attribute yet.
func entityStateEvents(observerID component.ID, endpoints []observer.Endpoint, correlations *correlationStore,
ts time.Time) (ees experimentalmetricmetadata.EntityEventsSlice, failed int, err error) {
entityEvents := experimentalmetricmetadata.NewEntityEventsSlice()
func entityEvents(observerID component.ID, endpoints []observer.Endpoint, correlations *correlationStore,
ts time.Time, eventType experimentalmetricmetadata.EventType) (ees experimentalmetricmetadata.EntityEventsSlice, failed int, err error) {
events := experimentalmetricmetadata.NewEntityEventsSlice()
for _, endpoint := range endpoints {
if endpoint.Details == nil {
failed++
Expand All @@ -251,17 +254,11 @@ func entityStateEvents(observerID component.ID, endpoints []observer.Endpoint, c
continue
}

entityEvent := entityEvents.AppendEmpty()
entityEvent.SetTimestamp(pcommon.NewTimestampFromTime(ts))
entityState := entityEvent.SetEntityState()
entityState.SetEntityType(entityType)
attrs := entityState.Attributes()
if envAttrs, e := endpointEnvToAttrs(endpoint.Details.Type(), endpoint.Details.Env()); e != nil {
attrs, e := endpointEnvToAttrs(endpoint.Details.Type(), endpoint.Details.Env())
if e != nil {
err = multierr.Combine(err, fmt.Errorf("failed determining attributes for %q: %w", endpoint.ID, e))
failed++
} else {
// this must be the first mutation of attrs since it's destructive
envAttrs.CopyTo(attrs)
continue
}
attrs.PutStr("type", string(endpoint.Details.Type()))
attrs.PutStr(discovery.EndpointIDAttr, string(endpoint.ID))
Expand All @@ -273,31 +270,21 @@ func entityStateEvents(observerID component.ID, endpoints []observer.Endpoint, c
}
attrs.PutStr(serviceTypeAttr, deduceServiceType(attrs))
attrs.PutStr(semconv.AttributeServiceName, deduceServiceName(attrs))
extractIdentifyingAttrs(attrs, entityEvent.ID())
}
return entityEvents, failed, err
}

func entityDeleteEvents(endpoints []observer.Endpoint, ts time.Time) (ees experimentalmetricmetadata.EntityEventsSlice, failed int, err error) {
entityEvents := experimentalmetricmetadata.NewEntityEventsSlice()
for _, endpoint := range endpoints {
if endpoint.Details == nil {
failed++
err = multierr.Combine(err, fmt.Errorf("endpoint %q has no details", endpoint.ID))
continue
}

entityEvent := entityEvents.AppendEmpty()
entityEvent.SetTimestamp(pcommon.NewTimestampFromTime(ts))
entityEvent.SetEntityDelete()
if envAttrs, e := endpointEnvToAttrs(endpoint.Details.Type(), endpoint.Details.Env()); e != nil {
err = multierr.Combine(err, fmt.Errorf("failed determining attributes for %q: %w", endpoint.ID, e))
failed++
} else {
extractIdentifyingAttrs(envAttrs, entityEvent.ID())
event := events.AppendEmpty()
event.SetTimestamp(pcommon.NewTimestampFromTime(ts))
extractIdentifyingAttrs(attrs, event.ID())
switch eventType {
case experimentalmetricmetadata.EventTypeState:
entityState := event.SetEntityState()
entityState.SetEntityType(entityType)
attrs.MoveTo(entityState.Attributes())
case experimentalmetricmetadata.EventTypeDelete:
deleteEvent := event.SetEntityDelete()
deleteEvent.SetEntityType(entityType)
}
}
return entityEvents, failed, err
return events, failed, err
}

func endpointEnvToAttrs(endpointType observer.EndpointType, endpointEnv observer.EndpointEnv) (pcommon.Map, error) {
Expand Down
Loading
Loading