Skip to content

Commit 01eb19f

Browse files
committed
add support for dynamic index in OpenSearch exporter
Signed-off-by: Shenoy Pratik <[email protected]>
1 parent 4fec8ff commit 01eb19f

File tree

3 files changed

+135
-13
lines changed

3 files changed

+135
-13
lines changed

exporter/opensearchexporter/README.md

Lines changed: 51 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,52 +1,92 @@
11
# OpenSearch Exporter
22

33
<!-- status autogenerated section -->
4-
| Status | |
5-
| ------------- |-----------|
6-
| Stability | [unmaintained]: traces, logs |
7-
| Distributions | [contrib] |
8-
| Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Aexporter%2Fopensearch%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Aexporter%2Fopensearch) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Aexporter%2Fopensearch%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Aexporter%2Fopensearch) |
9-
| Code coverage | [![codecov](https://codecov.io/github/open-telemetry/opentelemetry-collector-contrib/graph/main/badge.svg?component=exporter_opensearch)](https://app.codecov.io/gh/open-telemetry/opentelemetry-collector-contrib/tree/main/?components%5B0%5D=exporter_opensearch&displayType=list) |
10-
| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | \| Seeking more code owners! |
11-
| Emeritus | [@Aneurysm9](https://www.github.com/Aneurysm9), [@MitchellGale](https://www.github.com/MitchellGale), [@MaxKsyunz](https://www.github.com/MaxKsyunz), [@YANG-DB](https://www.github.com/YANG-DB) |
4+
5+
| Status | |
6+
| -------------------------------------------------------------------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
7+
| Stability | [unmaintained]: traces, logs |
8+
| Distributions | [contrib] |
9+
| Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Aexporter%2Fopensearch%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Aexporter%2Fopensearch) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Aexporter%2Fopensearch%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Aexporter%2Fopensearch) |
10+
| Code coverage | [![codecov](https://codecov.io/github/open-telemetry/opentelemetry-collector-contrib/graph/main/badge.svg?component=exporter_opensearch)](https://app.codecov.io/gh/open-telemetry/opentelemetry-collector-contrib/tree/main/?components%5B0%5D=exporter_opensearch&displayType=list) |
11+
| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | \| Seeking more code owners! |
12+
| Emeritus | [@Aneurysm9](https://www.github.com/Aneurysm9), [@MitchellGale](https://www.github.com/MitchellGale), [@MaxKsyunz](https://www.github.com/MaxKsyunz), [@YANG-DB](https://www.github.com/YANG-DB) |
1213

1314
[unmaintained]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/component-stability.md#unmaintained
1415
[contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
16+
1517
<!-- end autogenerated section -->
1618

1719
OpenSearch exporter supports sending OpenTelemetry signals as documents to [OpenSearch](https://www.opensearch.org).
1820

1921
The documents are sent using [observability catalog](https://github.com/opensearch-project/opensearch-catalog/tree/main/schema/observability) schema.
2022

2123
## Configuration options
24+
2225
### Indexing Options
26+
2327
The Observability indices would follow the recommended pattern for immutable data stream ingestion using
2428
the [data_stream](https://opensearch.org/docs/latest/dashboards/im-dashboards/datastream) concepts.
2529
Index pattern will follow the next naming template `ss4o_{type}-{dataset}-{namespace}`
30+
2631
- `dataset` (default=`default`) a user-provided label to classify source of telemetry. It is used to construct the name of the destination index or data stream.
2732
- `namespace` (default=`namespace`) a user-provided label to group telemetry. It is used to construct the name of the destination index or data stream.
2833

2934
LogsIndex configures the index, index alias, or data stream name logs should be indexed in.
35+
3036
- `logs_index` a user-provided label to specify name of the destination index or data stream.
3137

38+
## Dynamic Log Indexing
39+
40+
The OpenSearch exporter supports dynamic log index names using placeholders in the `logs_index` config. You can use any attribute or context key as a placeholder to construct index names dynamically per log record.
41+
42+
- Placeholder: `%{key}`
43+
44+
- Example: `otel-logs-%{service.name}` or `otel-logs-%{custom.label}`
45+
- The value is looked up from a context map (resource attributes, log attributes, etc.).
46+
- If the key is missing, the value from `logs_index_fallback` is used (or `unknown` if not set).
47+
48+
- Time Suffix: You can append a time-formatted suffix to the index name using the `logs_index_time_format` option.
49+
- `logs_index_time_format`: If set, appends a time suffix to the resolved index name using the specified format (default is no suffix).
50+
51+
### Example Configuration
52+
53+
````yaml
54+
exporters:
55+
opensearch:
56+
http:
57+
endpoint: https://opensearch.example.com:9200
58+
logs_index: "otel-logs-%{service.name}"
59+
logs_index_fallback: "default-service" # optional, if not set default is ss4o_logs-default-namespace
60+
logs_index_time_format: "yyyy.MM.dd" # optional, if set appends time suffix
61+
62+
This will create log indexes like `otel-logs-myservice-2024.06.07`. If `service.name` is missing, `otel-logs-default-service-2024.06.07` will be used.
63+
3264
### HTTP Connection Options
65+
3366
OpenSearch export supports standard [HTTP client settings](https://github.com/open-telemetry/opentelemetry-collector/tree/main/config/confighttp#client-configuration).
67+
3468
- `http.endpoint` (required) `<url>:<port>` of OpenSearch node to send data to.
3569

3670
### TLS settings
71+
3772
Supports standard TLS settings as part of HTTP settings. See [TLS Configuration/Client Settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configtls/README.md#client-configuration).
3873

3974
### Retry Options
75+
4076
- `retry_on_failure`: See [retry_on_failure](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md)
4177

4278
### Sending Queue Options
79+
4380
- `sending_queue`: See [sending_queue](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md)
4481

4582
### Timeout Options
83+
4684
- `timeout` : See [timeout](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md)
4785

4886
### Bulk Indexer Options
49-
- `bulk_action` (optional): the [action](https://opensearch.org/docs/2.9/api-reference/document-apis/bulk/) for ingesting data. Only `create` and `index` are allowed here.
87+
88+
- `bulk_action` (optional): the [action](https://opensearch.org/docs/2.9/api-reference/document-apis/bulk/) for ingesting data. Only `create` and `index` are allowed here.
89+
5090
## Example
5191

5292
```yaml
@@ -55,7 +95,7 @@ extensions:
5595
client_auth:
5696
username: username
5797
password: password
58-
98+
5999
exporters:
60100
opensearch/trace:
61101
http:
@@ -69,4 +109,4 @@ service:
69109
receivers: [otlp]
70110
exporters: [opensearch/trace]
71111
processors: [batch]
72-
```
112+
````

exporter/opensearchexporter/config.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,9 @@ type Config struct {
4343
// LogsIndex configures the index, index alias, or data stream name logs should be indexed in.
4444
// https://opensearch.org/docs/latest/im-plugin/index/
4545
// https://opensearch.org/docs/latest/dashboards/im-dashboards/datastream/
46-
LogsIndex string `mapstructure:"logs_index"`
46+
LogsIndex string `mapstructure:"logs_index"`
47+
LogsIndexFallback string `mapstructure:"logs_index_fallback"`
48+
LogsIndexTimeFormat string `mapstructure:"logs_index_time_format"`
4749

4850
// BulkAction configures the action for ingesting data. Only `create` and `index` are allowed here.
4951
// If not specified, the default value `create` will be used.

exporter/opensearchexporter/sso_log_exporter.go

Lines changed: 81 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,16 @@ package opensearchexporter // import "github.com/open-telemetry/opentelemetry-co
55

66
import (
77
"context"
8+
"regexp"
89
"strings"
10+
"time"
911

1012
"github.com/opensearch-project/opensearch-go/v2"
1113
"go.opentelemetry.io/collector/component"
1214
"go.opentelemetry.io/collector/config/confighttp"
1315
"go.opentelemetry.io/collector/exporter"
1416
"go.opentelemetry.io/collector/pdata/plog"
17+
"go.opentelemetry.io/collector/pdata/pcommon"
1518
)
1619

1720
type logExporter struct {
@@ -21,6 +24,7 @@ type logExporter struct {
2124
model mappingModel
2225
httpSettings confighttp.ClientConfig
2326
telemetry component.TelemetrySettings
27+
config *Config // add config reference
2428
}
2529

2630
func newLogExporter(cfg *Config, set exporter.Settings) *logExporter {
@@ -41,6 +45,7 @@ func newLogExporter(cfg *Config, set exporter.Settings) *logExporter {
4145
bulkAction: cfg.BulkAction,
4246
httpSettings: cfg.ClientConfig,
4347
model: model,
48+
config: cfg, // set config
4449
}
4550
}
4651

@@ -60,16 +65,91 @@ func (l *logExporter) Start(ctx context.Context, host component.Host) error {
6065
}
6166

6267
func (l *logExporter) pushLogData(ctx context.Context, ld plog.Logs) error {
63-
indexer := newLogBulkIndexer(l.Index, l.bulkAction, l.model)
68+
indexer := newLogBulkIndexer("", l.bulkAction, l.model)
6469
startErr := indexer.start(l.client)
6570
if startErr != nil {
6671
return startErr
6772
}
73+
74+
// Collect attributes from resource/log record
75+
attrs := collectAttributes(ld)
76+
logTimestamp := time.Now() // Replace with actual log timestamp extraction
77+
indexName := resolveLogIndexName(l.config, attrs, logTimestamp)
78+
indexer.index = indexName
6879
indexer.submit(ctx, ld)
6980
indexer.close(ctx)
7081
return indexer.joinedError()
7182
}
7283

84+
// collectAttributes extracts resource and log record attributes into a flat map for placeholder resolution.
85+
func collectAttributes(ld plog.Logs) map[string]string {
86+
attrs := make(map[string]string)
87+
resLogsSlice := ld.ResourceLogs()
88+
for i := 0; i < resLogsSlice.Len(); i++ {
89+
resLogs := resLogsSlice.At(i)
90+
// Resource attributes
91+
resAttrs := resLogs.Resource().Attributes()
92+
resAttrs.Range(func(k string, v pcommon.Value) bool {
93+
attrs[k] = v.AsString()
94+
return true
95+
})
96+
logSlice := resLogs.ScopeLogs()
97+
for j := 0; j < logSlice.Len(); j++ {
98+
scopeLogs := logSlice.At(j)
99+
// Instrumentation scope attributes
100+
if scope := scopeLogs.Scope(); scope.Name() != "" {
101+
attrs["scope.name"] = scope.Name()
102+
}
103+
if scope := scopeLogs.Scope(); scope.Version() != "" {
104+
attrs["scope.version"] = scope.Version()
105+
}
106+
logs := scopeLogs.LogRecords()
107+
for k := 0; k < logs.Len(); k++ {
108+
logAttrs := logs.At(k).Attributes()
109+
logAttrs.Range(func(k string, v pcommon.Value) bool {
110+
attrs[k] = v.AsString()
111+
return true
112+
})
113+
}
114+
}
115+
}
116+
return attrs
117+
}
118+
119+
// resolveLogIndexName resolves the logs index name using placeholders, fallback, and time format.
120+
func resolveLogIndexName(cfg *Config, attrs map[string]string, t time.Time) string {
121+
index := cfg.LogsIndex
122+
placeholderPattern := regexp.MustCompile(`%\{([^}]+)\}`)
123+
index = placeholderPattern.ReplaceAllStringFunc(index, func(match string) string {
124+
key := placeholderPattern.FindStringSubmatch(match)[1]
125+
if val, ok := attrs[key]; ok && val != "" {
126+
return val
127+
}
128+
if cfg.LogsIndexFallback != "" {
129+
return cfg.LogsIndexFallback
130+
}
131+
return "unknown" // default if placeholder not found
132+
})
133+
if cfg.LogsIndexTimeFormat != "" {
134+
index = index + "-" + t.Format(convertGoTimeFormat(cfg.LogsIndexTimeFormat))
135+
}
136+
return index
137+
}
138+
139+
// convertGoTimeFormat converts a Java-style date format to Go's time format.
140+
func convertGoTimeFormat(format string) string {
141+
// Support yyyy, yy, MM, dd, HH, mm, ss -> 2006, 06, 01, 02, 15, 04, 05
142+
f := format
143+
f = strings.ReplaceAll(f, "yyyy", "2006")
144+
f = strings.ReplaceAll(f, "yy", "06")
145+
f = strings.ReplaceAll(f, "MM", "01")
146+
f = strings.ReplaceAll(f, "dd", "02")
147+
f = strings.ReplaceAll(f, "HH", "15")
148+
f = strings.ReplaceAll(f, "mm", "04")
149+
f = strings.ReplaceAll(f, "ss", "05")
150+
return f
151+
}
152+
73153
func getIndexName(dataset, namespace, index string) string {
74154
if len(index) != 0 {
75155
return index

0 commit comments

Comments
 (0)