Skip to content

Commit 4e9f012

Browse files
evan-bradleyjriguera
authored andcommitted
[cmd/opampsupervisor] Fix ServerToAgent message field handling (open-telemetry#34349)
**Description:** Follow-up to open-telemetry#33576. Boolean short-circuiting was causing the `onMessage` handler to not process the whole `ServerToAgent` message if it included multiple fields. I noticed this was causing the Collector to fail to start when using the opamp-go demo server; the own metrics section is required for the Collector to start since it provides the only pipelines by default. I included a new unit test that verifies everything looks as we would expect when getting a message like what would be sent by the example server. --------- Co-authored-by: Evan Bradley <[email protected]>
1 parent 87ae10a commit 4e9f012

File tree

3 files changed

+102
-7
lines changed

3 files changed

+102
-7
lines changed

.chloggen/fix-onmessage.yaml

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: bug_fix
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: cmd/opampsupervisor
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Ensure the Supervisor processes all fields in a ServerToAgent message.
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [34349]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: []

cmd/opampsupervisor/supervisor/supervisor.go

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1190,16 +1190,17 @@ func (s *Supervisor) saveLastReceivedOwnTelemetrySettings(set *protobufs.Telemet
11901190

11911191
func (s *Supervisor) onMessage(ctx context.Context, msg *types.MessageData) {
11921192
configChanged := false
1193-
if msg.RemoteConfig != nil {
1194-
configChanged = configChanged || s.processRemoteConfigMessage(msg.RemoteConfig)
1193+
1194+
if msg.AgentIdentification != nil {
1195+
configChanged = s.processAgentIdentificationMessage(msg.AgentIdentification) || configChanged
11951196
}
11961197

1197-
if msg.OwnMetricsConnSettings != nil {
1198-
configChanged = configChanged || s.processOwnMetricsConnSettingsMessage(ctx, msg.OwnMetricsConnSettings)
1198+
if msg.RemoteConfig != nil {
1199+
configChanged = s.processRemoteConfigMessage(msg.RemoteConfig) || configChanged
11991200
}
12001201

1201-
if msg.AgentIdentification != nil {
1202-
configChanged = configChanged || s.processAgentIdentificationMessage(msg.AgentIdentification)
1202+
if msg.OwnMetricsConnSettings != nil {
1203+
configChanged = s.processOwnMetricsConnSettingsMessage(ctx, msg.OwnMetricsConnSettings) || configChanged
12031204
}
12041205

12051206
// Update the agent config if any messages have touched the config
@@ -1309,7 +1310,14 @@ func (s *Supervisor) processAgentIdentificationMessage(msg *protobufs.AgentIdent
13091310
s.logger.Error("Failed to send agent description to OpAMP server")
13101311
}
13111312

1312-
return true
1313+
// Need to recalculate the Agent config so that the new agent identification is included in it.
1314+
configChanged, err := s.composeMergedConfig(s.remoteConfig)
1315+
if err != nil {
1316+
s.logger.Error("Error composing merged config with new instance ID", zap.Error(err))
1317+
return false
1318+
}
1319+
1320+
return configChanged
13131321
}
13141322

13151323
func (s *Supervisor) persistentStateFilePath() string {

cmd/opampsupervisor/supervisor/supervisor_test.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,10 +104,12 @@ func Test_onMessage(t *testing.T) {
104104
persistentState: &persistentState{InstanceID: initialID},
105105
agentDescription: agentDesc,
106106
agentConfigOwnMetricsSection: &atomic.Value{},
107+
mergedConfig: &atomic.Value{},
107108
effectiveConfig: &atomic.Value{},
108109
agentHealthCheckEndpoint: "localhost:8000",
109110
opampClient: client.NewHTTP(newLoggerFromZap(zap.NewNop())),
110111
}
112+
require.NoError(t, s.createTemplates())
111113

112114
s.onMessage(context.Background(), &types.MessageData{
113115
AgentIdentification: &protobufs.AgentIdentification{
@@ -131,9 +133,11 @@ func Test_onMessage(t *testing.T) {
131133
persistentState: &persistentState{InstanceID: testUUID},
132134
agentDescription: agentDesc,
133135
agentConfigOwnMetricsSection: &atomic.Value{},
136+
mergedConfig: &atomic.Value{},
134137
effectiveConfig: &atomic.Value{},
135138
agentHealthCheckEndpoint: "localhost:8000",
136139
}
140+
require.NoError(t, s.createTemplates())
137141

138142
s.onMessage(context.Background(), &types.MessageData{
139143
AgentIdentification: &protobufs.AgentIdentification{
@@ -175,6 +179,7 @@ func Test_onMessage(t *testing.T) {
175179
hasNewConfig: make(chan struct{}, 1),
176180
persistentState: &persistentState{InstanceID: testUUID},
177181
agentConfigOwnMetricsSection: &atomic.Value{},
182+
mergedConfig: &atomic.Value{},
178183
effectiveConfig: &atomic.Value{},
179184
agentConn: agentConnAtomic,
180185
agentHealthCheckEndpoint: "localhost:8000",
@@ -231,6 +236,61 @@ func Test_onMessage(t *testing.T) {
231236
require.True(t, gotMessage, "Message was not sent to agent")
232237
})
233238

239+
t.Run("Processes all ServerToAgent fields", func(t *testing.T) {
240+
agentDesc := &atomic.Value{}
241+
agentDesc.Store(&protobufs.AgentDescription{
242+
NonIdentifyingAttributes: []*protobufs.KeyValue{
243+
{
244+
Key: "runtime.type",
245+
Value: &protobufs.AnyValue{
246+
Value: &protobufs.AnyValue_StringValue{
247+
StringValue: "test",
248+
},
249+
},
250+
},
251+
},
252+
})
253+
initialID := uuid.MustParse("018fee23-4a51-7303-a441-73faed7d9deb")
254+
newID := uuid.MustParse("018fef3f-14a8-73ef-b63e-3b96b146ea38")
255+
s := Supervisor{
256+
logger: zap.NewNop(),
257+
pidProvider: defaultPIDProvider{},
258+
config: config.Supervisor{},
259+
hasNewConfig: make(chan struct{}, 1),
260+
persistentState: &persistentState{InstanceID: initialID},
261+
agentDescription: agentDesc,
262+
agentConfigOwnMetricsSection: &atomic.Value{},
263+
mergedConfig: &atomic.Value{},
264+
effectiveConfig: &atomic.Value{},
265+
agentHealthCheckEndpoint: "localhost:8000",
266+
opampClient: client.NewHTTP(newLoggerFromZap(zap.NewNop())),
267+
}
268+
require.NoError(t, s.createTemplates())
269+
270+
s.onMessage(context.Background(), &types.MessageData{
271+
AgentIdentification: &protobufs.AgentIdentification{
272+
NewInstanceUid: newID[:],
273+
},
274+
RemoteConfig: &protobufs.AgentRemoteConfig{
275+
Config: &protobufs.AgentConfigMap{
276+
ConfigMap: map[string]*protobufs.AgentConfigFile{
277+
"": {
278+
Body: []byte(""),
279+
},
280+
},
281+
},
282+
},
283+
OwnMetricsConnSettings: &protobufs.TelemetryConnectionSettings{
284+
DestinationEndpoint: "http://localhost:4318",
285+
},
286+
})
287+
288+
require.Equal(t, newID, s.persistentState.InstanceID)
289+
t.Log(s.mergedConfig.Load())
290+
require.Contains(t, s.mergedConfig.Load(), "prometheus/own_metrics")
291+
require.Contains(t, s.mergedConfig.Load(), newID.String())
292+
require.Contains(t, s.mergedConfig.Load(), "runtime.type: test")
293+
})
234294
}
235295

236296
func Test_handleAgentOpAMPMessage(t *testing.T) {

0 commit comments

Comments
 (0)