Skip to content

Commit ce4042a

Browse files
authored
Add ability to process logs in schema processor (#38615)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description Add ability to process logs in schema processor based on the target version <!-- Issue number (e.g. #1234) or full URL to issue, if applicable. --> #### Link to tracking issue Fixes <!--Describe what testing was performed and which tests were added.--> #### Testing <!--Describe the documentation added.--> #### Documentation <!--Please delete paragraphs that you did not use before submitting.-->
1 parent 854101c commit ce4042a

File tree

4 files changed

+185
-3
lines changed

4 files changed

+185
-3
lines changed
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: schemaprocessor
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Adds functionality to transform logs
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [38615]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: |
19+
- Adds functionality to transform logs using the target schema version.
20+
21+
# If your change doesn't affect end users or the exported elements of any package,
22+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
23+
# Optional: The change log or logs in which this entry should be included.
24+
# e.g. '[user]' or '[user, api]'
25+
# Include 'user' if the change is relevant to end users.
26+
# Include 'api' if there is a change to a library API.
27+
# Default: '[user]'
28+
change_logs: []

processor/schemaprocessor/processor.go

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,45 @@ func newSchemaProcessor(_ context.Context, conf component.Config, set processor.
4747
}, nil
4848
}
4949

50-
func (t schemaProcessor) processLogs(_ context.Context, ld plog.Logs) (plog.Logs, error) {
50+
func (t schemaProcessor) processLogs(ctx context.Context, ld plog.Logs) (plog.Logs, error) {
51+
for rt := 0; rt < ld.ResourceLogs().Len(); rt++ {
52+
rLogs := ld.ResourceLogs().At(rt)
53+
resourceSchemaURL := rLogs.SchemaUrl()
54+
if resourceSchemaURL != "" {
55+
t.log.Debug("requesting translation for resourceSchemaURL", zap.String("resourceSchemaURL", resourceSchemaURL))
56+
tr, err := t.manager.
57+
RequestTranslation(ctx, resourceSchemaURL)
58+
if err != nil {
59+
t.log.Error("failed to request translation", zap.Error(err))
60+
return ld, err
61+
}
62+
err = tr.ApplyAllResourceChanges(rLogs, resourceSchemaURL)
63+
if err != nil {
64+
t.log.Error("failed to apply resource changes", zap.Error(err))
65+
return ld, err
66+
}
67+
}
68+
for ss := 0; ss < rLogs.ScopeLogs().Len(); ss++ {
69+
logs := rLogs.ScopeLogs().At(ss)
70+
logsSchemaURL := logs.SchemaUrl()
71+
if logsSchemaURL == "" {
72+
logsSchemaURL = resourceSchemaURL
73+
}
74+
if logsSchemaURL == "" {
75+
continue
76+
}
77+
tr, err := t.manager.
78+
RequestTranslation(ctx, logsSchemaURL)
79+
if err != nil {
80+
t.log.Error("failed to request translation", zap.Error(err))
81+
continue
82+
}
83+
err = tr.ApplyScopeLogChanges(logs, logsSchemaURL)
84+
if err != nil {
85+
t.log.Error("failed to apply scope log changes", zap.Error(err))
86+
}
87+
}
88+
}
5189
return ld, nil
5290
}
5391

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package schemaprocessor
5+
6+
import (
7+
"context"
8+
"testing"
9+
10+
"github.com/stretchr/testify/assert"
11+
"go.opentelemetry.io/collector/pdata/plog"
12+
)
13+
14+
func TestLogs_RenameAttributes(t *testing.T) {
15+
t.Parallel()
16+
17+
tests := []struct {
18+
name string
19+
in plog.Logs
20+
out plog.Logs
21+
transformations string
22+
targetVersion string
23+
}{
24+
{
25+
name: "one_version_downgrade",
26+
in: func() plog.Logs {
27+
in := plog.NewLogs()
28+
in.ResourceLogs().AppendEmpty()
29+
in.ResourceLogs().At(0).SetSchemaUrl("http://opentelemetry.io/schemas/1.9.0")
30+
in.ResourceLogs().At(0).Resource().Attributes().PutStr("new.resource.name", "test-cluster")
31+
in.ResourceLogs().At(0).ScopeLogs().AppendEmpty()
32+
in.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().AppendEmpty()
33+
in.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().PutStr("new.attr.name", "my-att-cluster")
34+
return in
35+
}(),
36+
out: func() plog.Logs {
37+
out := plog.NewLogs()
38+
out.ResourceLogs().AppendEmpty()
39+
out.ResourceLogs().At(0).SetSchemaUrl("http://opentelemetry.io/schemas/1.8.0")
40+
out.ResourceLogs().At(0).Resource().Attributes().PutStr("old.resource.name", "test-cluster")
41+
out.ResourceLogs().At(0).ScopeLogs().AppendEmpty()
42+
out.ResourceLogs().At(0).ScopeLogs().At(0).SetSchemaUrl("http://opentelemetry.io/schemas/1.8.0")
43+
out.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().AppendEmpty()
44+
out.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().PutStr("old.attr.name", "my-att-cluster")
45+
46+
return out
47+
}(),
48+
transformations: `
49+
1.9.0:
50+
all:
51+
changes:
52+
- rename_attributes:
53+
attribute_map:
54+
old.resource.name: new.resource.name
55+
logs:
56+
changes:
57+
- rename_attributes:
58+
attribute_map:
59+
old.attr.name: new.attr.name
60+
1.8.0:`,
61+
targetVersion: "1.8.0",
62+
},
63+
{
64+
name: "one_version_upgrade",
65+
in: func() plog.Logs {
66+
in := plog.NewLogs()
67+
in.ResourceLogs().AppendEmpty()
68+
in.ResourceLogs().At(0).SetSchemaUrl("http://opentelemetry.io/schemas/1.8.0")
69+
in.ResourceLogs().At(0).Resource().Attributes().PutStr("old.resource.name", "test-cluster")
70+
in.ResourceLogs().At(0).ScopeLogs().AppendEmpty()
71+
in.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().AppendEmpty()
72+
in.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().PutStr("old.attr.name", "my-att-cluster")
73+
74+
return in
75+
}(),
76+
out: func() plog.Logs {
77+
out := plog.NewLogs()
78+
out.ResourceLogs().AppendEmpty()
79+
out.ResourceLogs().At(0).SetSchemaUrl("http://opentelemetry.io/schemas/1.9.0")
80+
out.ResourceLogs().At(0).Resource().Attributes().PutStr("new.resource.name", "test-cluster")
81+
out.ResourceLogs().At(0).ScopeLogs().AppendEmpty()
82+
out.ResourceLogs().At(0).ScopeLogs().At(0).SetSchemaUrl("http://opentelemetry.io/schemas/1.9.0")
83+
out.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().AppendEmpty()
84+
out.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().PutStr("new.attr.name", "my-att-cluster")
85+
return out
86+
}(),
87+
transformations: `
88+
1.9.0:
89+
all:
90+
changes:
91+
- rename_attributes:
92+
attribute_map:
93+
old.resource.name: new.resource.name
94+
logs:
95+
changes:
96+
- rename_attributes:
97+
attribute_map:
98+
old.attr.name: new.attr.name
99+
1.8.0:`,
100+
targetVersion: "1.9.0",
101+
},
102+
}
103+
104+
for _, tt := range tests {
105+
t.Run(tt.name, func(t *testing.T) {
106+
pr := newTestSchemaProcessor(t, tt.transformations, tt.targetVersion)
107+
ctx := context.Background()
108+
out, err := pr.processLogs(ctx, tt.in)
109+
if err != nil {
110+
t.Errorf("Error while processing logs: %v", err)
111+
}
112+
assert.Equal(t, tt.out, out, "Logs transformation failed")
113+
})
114+
}
115+
}

processor/schemaprocessor/processor_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,12 @@ type dummySchemaProvider struct {
2525
}
2626

2727
func (m *dummySchemaProvider) Retrieve(_ context.Context, _ string) (string, error) {
28+
transformations := strings.ReplaceAll(m.transformations, "\t", " ")
2829
data := fmt.Sprintf(`
2930
file_format: 1.1.0
3031
schema_url: http://opentelemetry.io/schemas/1.9.0
31-
versions:
32-
%s`, m.transformations)
32+
versions:%s`, transformations)
33+
3334
data = strings.TrimSpace(data)
3435
return data, nil
3536
}

0 commit comments

Comments
 (0)