Skip to content

Commit cc72277

Browse files
authored
[connector/routing] Add support for OTTL conditions (#35731)
1 parent e9a428b commit cc72277

File tree

8 files changed

+113
-30
lines changed

8 files changed

+113
-30
lines changed
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: routingconnector
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Allow routing based on OTTL Conditions
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [35731]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: |
19+
Each route must contain either a statement or a condition.
20+
21+
# If your change doesn't affect end users or the exported elements of any package,
22+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
23+
# Optional: The change log or logs in which this entry should be included.
24+
# e.g. '[user]' or '[user, api]'
25+
# Include 'user' if the change is relevant to end users.
26+
# Include 'api' if there is a change to a library API.
27+
# Default: '[user]'
28+
change_logs: []

connector/routingconnector/README.md

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@ If you are not already familiar with connectors, you may find it helpful to firs
3232
The following settings are available:
3333

3434
- `table (required)`: the routing table for this connector.
35-
- `table.statement (required)`: the routing condition provided as the [OTTL] statement.
35+
- `table.statement`: the routing condition provided as the [OTTL] statement. Required if `table.condition` is not provided.
36+
- `table.condition`: the routing condition provided as the [OTTL] condition. Required if `table.statement` is not provided.
3637
- `table.pipelines (required)`: the list of pipelines to use when the routing condition is met.
3738
- `default_pipelines (optional)`: contains the list of pipelines to use when a record does not meet any of specified conditions.
3839
- `error_mode (optional)`: determines how errors returned from OTTL statements are handled. Valid values are `propagate`, `ignore` and `silent`. If `ignore` or `silent` is used and a statement's condition has an error then the payload will be routed to the default pipelines. When `silent` is used the error is not logged. If not supplied, `propagate` is used.
@@ -97,23 +98,20 @@ Respectively, if none of the routing conditions met, then a signal is routed to
9798
- The connector will only route using [OTTL] statements which can only be applied to resource attributes. It does not support matching on context values at this time.
9899
- The connector routes to pipelines, not exporters as the processor does.
99100
100-
### OTTL Limitations
101-
- Currently, it is not possible to specify boolean statements without function invocation as the routing condition. It is required to provide the NOOP `route()` or any other supported function as part of the routing statement, see [#13545](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/13545) for more information.
102-
- Supported [OTTL] functions:
103-
- [IsMatch](../../pkg/ottl/ottlfuncs/README.md#IsMatch)
104-
- [delete_key](../../pkg/ottl/ottlfuncs/README.md#delete_key)
105-
- [delete_matching_keys](../../pkg/ottl/ottlfuncs/README.md#delete_matching_keys)
101+
### Supported [OTTL] functions
102+
103+
- [IsMatch](../../pkg/ottl/ottlfuncs/README.md#IsMatch)
104+
- [delete_key](../../pkg/ottl/ottlfuncs/README.md#delete_key)
105+
- [delete_matching_keys](../../pkg/ottl/ottlfuncs/README.md#delete_matching_keys)
106106
107107
## Additional Settings
108+
108109
The full list of settings exposed for this connector are documented [here](./config.go) with detailed sample configuration files:
109110
110111
- [logs](./testdata/config_logs.yaml)
111112
- [metrics](./testdata/config_metrics.yaml)
112113
- [traces](./testdata/config_traces.yaml)
113114
114-
[in development]:https://github.com/open-telemetry/opentelemetry-collector#in-development
115115
[Connectors README]:https://github.com/open-telemetry/opentelemetry-collector/blob/main/connector/README.md
116-
[Exporter Pipeline Type]:https://github.com/open-telemetry/opentelemetry-collector/blob/main/connector/README.md#exporter-pipeline-type
117-
[Receiver Pipeline Type]:https://github.com/open-telemetry/opentelemetry-collector/blob/main/connector/README.md#receiver-pipeline-type
118-
[contrib]:https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
119-
[OTTL]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/processing.md#telemetry-query-language
116+
117+
[OTTL]: https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/pkg/ottl/README.md

connector/routingconnector/config.go

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,11 @@ import (
1212
)
1313

1414
var (
15-
errEmptyRoute = errors.New("invalid route: no statement provided")
16-
errNoPipelines = errors.New("invalid route: no pipelines defined")
17-
errUnexpectedConsumer = errors.New("expected consumer to be a connector router")
18-
errNoTableItems = errors.New("invalid routing table: the routing table is empty")
15+
errNoConditionOrStatement = errors.New("invalid route: no condition or statement provided")
16+
errConditionAndStatement = errors.New("invalid route: both condition and statement provided")
17+
errNoPipelines = errors.New("invalid route: no pipelines defined")
18+
errUnexpectedConsumer = errors.New("expected consumer to be a connector router")
19+
errNoTableItems = errors.New("invalid routing table: the routing table is empty")
1920
)
2021

2122
// Config defines configuration for the Routing processor.
@@ -55,8 +56,12 @@ func (c *Config) Validate() error {
5556
// validate that every route has a value for the routing attribute and has
5657
// at least one pipeline
5758
for _, item := range c.Table {
58-
if len(item.Statement) == 0 {
59-
return errEmptyRoute
59+
if item.Statement == "" && item.Condition == "" {
60+
return errNoConditionOrStatement
61+
}
62+
63+
if item.Statement != "" && item.Condition != "" {
64+
return errConditionAndStatement
6065
}
6166

6267
if len(item.Pipelines) == 0 {
@@ -70,9 +75,13 @@ func (c *Config) Validate() error {
7075
// RoutingTableItem specifies how data should be routed to the different pipelines
7176
type RoutingTableItem struct {
7277
// Statement is a OTTL statement used for making a routing decision.
73-
// Required when 'Value' isn't provided.
78+
// One of 'Statement' or 'Condition' must be provided.
7479
Statement string `mapstructure:"statement"`
7580

81+
// Condition is an OTTL condition used for making a routing decision.
82+
// One of 'Statement' or 'Condition' must be provided.
83+
Condition string `mapstructure:"condition"`
84+
7685
// Pipelines contains the list of pipelines to use when the value from the FromAttribute field
7786
// matches this table item. When no pipelines are specified, the ones specified under
7887
// DefaultPipelines are used, if any.

connector/routingconnector/config_test.go

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ func TestValidateConfig(t *testing.T) {
135135
},
136136
},
137137
},
138-
error: "invalid route: no statement provided",
138+
error: "invalid route: no condition or statement provided",
139139
},
140140
{
141141
name: "no pipeline provided",
@@ -162,11 +162,56 @@ func TestValidateConfig(t *testing.T) {
162162
config: &Config{},
163163
error: "invalid routing table: the routing table is empty",
164164
},
165+
{
166+
name: "condition provided",
167+
config: &Config{
168+
Table: []RoutingTableItem{
169+
{
170+
Condition: `attributes["attr"] == "acme"`,
171+
Pipelines: []pipeline.ID{
172+
pipeline.NewIDWithName(pipeline.SignalTraces, "otlp"),
173+
},
174+
},
175+
},
176+
},
177+
},
178+
{
179+
name: "statement provided",
180+
config: &Config{
181+
Table: []RoutingTableItem{
182+
{
183+
Statement: `route() where attributes["attr"] == "acme"`,
184+
Pipelines: []pipeline.ID{
185+
pipeline.NewIDWithName(pipeline.SignalTraces, "otlp"),
186+
},
187+
},
188+
},
189+
},
190+
},
191+
{
192+
name: "both condition and statement provided",
193+
config: &Config{
194+
Table: []RoutingTableItem{
195+
{
196+
Condition: `attributes["attr"] == "acme"`,
197+
Statement: `route() where attributes["attr"] == "acme"`,
198+
Pipelines: []pipeline.ID{
199+
pipeline.NewIDWithName(pipeline.SignalTraces, "otlp"),
200+
},
201+
},
202+
},
203+
},
204+
error: "invalid route: both condition and statement provided",
205+
},
165206
}
166207

167208
for _, tt := range tests {
168209
t.Run(tt.name, func(t *testing.T) {
169-
assert.EqualError(t, component.ValidateConfig(tt.config), tt.error)
210+
if tt.error == "" {
211+
assert.NoError(t, component.ValidateConfig(tt.config))
212+
} else {
213+
assert.EqualError(t, component.ValidateConfig(tt.config), tt.error)
214+
}
170215
})
171216
}
172217
}

connector/routingconnector/logs_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ func TestLogsRegisterConsumersForValidRoute(t *testing.T) {
3131
Pipelines: []pipeline.ID{logs0},
3232
},
3333
{
34-
Statement: `route() where attributes["X-Tenant"] == "*"`,
34+
Condition: `attributes["X-Tenant"] == "*"`,
3535
Pipelines: []pipeline.ID{logs0, logs1},
3636
},
3737
},
@@ -84,7 +84,7 @@ func TestLogsAreCorrectlySplitPerResourceAttributeWithOTTL(t *testing.T) {
8484
DefaultPipelines: []pipeline.ID{logsDefault},
8585
Table: []RoutingTableItem{
8686
{
87-
Statement: `route() where IsMatch(attributes["X-Tenant"], ".*acme") == true`,
87+
Condition: `IsMatch(attributes["X-Tenant"], ".*acme") == true`,
8888
Pipelines: []pipeline.ID{logs0},
8989
},
9090
{
@@ -247,7 +247,7 @@ func TestLogsAreCorrectlyMatchOnceWithOTTL(t *testing.T) {
247247
Pipelines: []pipeline.ID{logs1},
248248
},
249249
{
250-
Statement: `route() where attributes["X-Tenant"] == "ecorp"`,
250+
Condition: `attributes["X-Tenant"] == "ecorp"`,
251251
Pipelines: []pipeline.ID{logsDefault, logs0},
252252
},
253253
},

connector/routingconnector/metrics_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ func TestMetricsRegisterConsumersForValidRoute(t *testing.T) {
3131
Pipelines: []pipeline.ID{metrics0},
3232
},
3333
{
34-
Statement: `route() where attributes["X-Tenant"] == "*"`,
34+
Condition: `attributes["X-Tenant"] == "*"`,
3535
Pipelines: []pipeline.ID{metrics0, metrics1},
3636
},
3737
},
@@ -84,7 +84,7 @@ func TestMetricsAreCorrectlySplitPerResourceAttributeWithOTTL(t *testing.T) {
8484
DefaultPipelines: []pipeline.ID{metricsDefault},
8585
Table: []RoutingTableItem{
8686
{
87-
Statement: `route() where attributes["value"] > 2.5`,
87+
Condition: `attributes["value"] > 2.5`,
8888
Pipelines: []pipeline.ID{metrics0},
8989
},
9090
{
@@ -262,7 +262,7 @@ func TestMetricsAreCorrectlyMatchOnceWithOTTL(t *testing.T) {
262262
Pipelines: []pipeline.ID{metrics1},
263263
},
264264
{
265-
Statement: `route() where attributes["value"] == 1.0`,
265+
Condition: `attributes["value"] == 1.0`,
266266
Pipelines: []pipeline.ID{metricsDefault, metrics0},
267267
},
268268
},

connector/routingconnector/router.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,9 @@ func (r *router[C]) registerRouteConsumers() error {
149149
// does not contain a valid OTTL statement then nil is returned.
150150
func (r *router[C]) getStatementFrom(item RoutingTableItem) (*ottl.Statement[ottlresource.TransformContext], error) {
151151
var statement *ottl.Statement[ottlresource.TransformContext]
152+
if item.Condition != "" {
153+
item.Statement = fmt.Sprintf("route() where %s", item.Condition)
154+
}
152155
if item.Statement != "" {
153156
var err error
154157
statement, err = r.parser.ParseStatement(item.Statement)

connector/routingconnector/traces_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ func TestTracesRegisterConsumersForValidRoute(t *testing.T) {
3131
Pipelines: []pipeline.ID{traces0},
3232
},
3333
{
34-
Statement: `route() where attributes["X-Tenant"] == "*"`,
34+
Condition: `attributes["X-Tenant"] == "*"`,
3535
Pipelines: []pipeline.ID{traces0, traces1},
3636
},
3737
},
@@ -84,7 +84,7 @@ func TestTracesCorrectlySplitPerResourceAttributeWithOTTL(t *testing.T) {
8484
DefaultPipelines: []pipeline.ID{tracesDefault},
8585
Table: []RoutingTableItem{
8686
{
87-
Statement: `route() where attributes["value"] > 0 and attributes["value"] < 4`,
87+
Condition: `attributes["value"] > 0 and attributes["value"] < 4`,
8888
Pipelines: []pipeline.ID{traces0},
8989
},
9090
{
@@ -223,7 +223,7 @@ func TestTracesCorrectlyMatchOnceWithOTTL(t *testing.T) {
223223
Pipelines: []pipeline.ID{traces1},
224224
},
225225
{
226-
Statement: `route() where attributes["value"] == 5`,
226+
Condition: `attributes["value"] == 5`,
227227
Pipelines: []pipeline.ID{tracesDefault, traces0},
228228
},
229229
},

0 commit comments

Comments
 (0)