Skip to content

[awsemfexporter] Add check for unhandled metric data types #1493

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Nov 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions exporter/awsemfexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package awsemfexporter

import (
"go.opentelemetry.io/collector/config/configmodels"
"go.uber.org/zap"
)

// Config defines configuration for AWS EMF exporter.
Expand Down Expand Up @@ -53,4 +54,7 @@ type Config struct {
// "SingleDimensionRollupOnly" - Enable single dimension rollup
// "NoDimensionRollup" - No dimension rollup (only keep original metrics which contain all dimensions)
DimensionRollupOption string `mapstructure:"dimension_rollup_option"`

// logger is the Logger used for writing error/warning logs
logger *zap.Logger
}
13 changes: 8 additions & 5 deletions exporter/awsemfexporter/emf_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,11 @@ func New(
}

logger := params.Logger
expConfig := config.(*Config)
expConfig.logger = logger

// create AWS session
awsConfig, session, err := GetAWSConfigSession(logger, &Conn{}, config.(*Config))
awsConfig, session, err := GetAWSConfigSession(logger, &Conn{}, expConfig)
if err != nil {
return nil, err
}
Expand All @@ -77,11 +80,10 @@ func New(

func (emf *emfExporter) pushMetricsData(_ context.Context, md pdata.Metrics) (droppedTimeSeries int, err error) {
expConfig := emf.config.(*Config)
dimensionRollupOption := expConfig.DimensionRollupOption
logGroup := "/metrics/default"
logStream := fmt.Sprintf("otel-stream-%s", emf.collectorID)
// override log group if customer has specified Resource Attributes service.name or service.namespace
putLogEvents, totalDroppedMetrics, namespace := generateLogEventFromMetric(md, dimensionRollupOption, expConfig.Namespace)
putLogEvents, totalDroppedMetrics, namespace := generateLogEventFromMetric(md, expConfig)
if namespace != "" {
logGroup = fmt.Sprintf("/metrics/%s", namespace)
}
Expand Down Expand Up @@ -169,7 +171,8 @@ func (emf *emfExporter) Start(ctx context.Context, host component.Host) error {
return nil
}

func generateLogEventFromMetric(metric pdata.Metrics, dimensionRollupOption string, namespace string) ([]*LogEvent, int, string) {
func generateLogEventFromMetric(metric pdata.Metrics, config *Config) ([]*LogEvent, int, string) {
namespace := config.Namespace
rms := metric.ResourceMetrics()
cwMetricLists := []*CWMetrics{}
var cwm []*CWMetrics
Expand All @@ -180,7 +183,7 @@ func generateLogEventFromMetric(metric pdata.Metrics, dimensionRollupOption stri
if rm.IsNil() {
continue
}
cwm, totalDroppedMetrics = TranslateOtToCWMetric(&rm, dimensionRollupOption, namespace)
cwm, totalDroppedMetrics = TranslateOtToCWMetric(&rm, config)
if len(cwm) > 0 && len(cwm[0].Measurements) > 0 {
namespace = cwm[0].Measurements[0].Namespace
}
Expand Down
2 changes: 2 additions & 0 deletions exporter/awsemfexporter/emf_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,9 +229,11 @@ func TestNewExporterWithoutConfig(t *testing.T) {
defer popEnv(env)
os.Setenv("AWS_STS_REGIONAL_ENDPOINTS", "fake")

assert.Nil(t, expCfg.logger)
exp, err := New(expCfg, component.ExporterCreateParams{Logger: zap.NewNop()})
assert.NotNil(t, err)
assert.Nil(t, exp)
assert.NotNil(t, expCfg.logger)
}

func TestNewExporterWithoutSession(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions exporter/awsemfexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func createDefaultConfig() configmodels.Exporter {
Region: "",
RoleARN: "",
DimensionRollupOption: "ZeroAndSingleDimensionRollup",
logger: nil,
}
}

Expand Down
29 changes: 21 additions & 8 deletions exporter/awsemfexporter/metric_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/translator/conventions"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awsemfexporter/mapwithexpiry"
)
Expand Down Expand Up @@ -118,9 +119,10 @@ func (dps DoubleHistogramDataPointSlice) At(i int) DataPoint {
}

// TranslateOtToCWMetric converts OT metrics to CloudWatch Metric format
func TranslateOtToCWMetric(rm *pdata.ResourceMetrics, dimensionRollupOption string, namespace string) ([]*CWMetrics, int) {
func TranslateOtToCWMetric(rm *pdata.ResourceMetrics, config *Config) ([]*CWMetrics, int) {
var cwMetricList []*CWMetrics
totalDroppedMetrics := 0
namespace := config.Namespace
var instrumentationLibName string

if len(namespace) == 0 && !rm.Resource().IsNil() {
Expand Down Expand Up @@ -158,7 +160,7 @@ func TranslateOtToCWMetric(rm *pdata.ResourceMetrics, dimensionRollupOption stri
totalDroppedMetrics++
continue
}
cwMetrics := getCWMetrics(&metric, namespace, instrumentationLibName, dimensionRollupOption)
cwMetrics := getCWMetrics(&metric, namespace, instrumentationLibName, config)
cwMetricList = append(cwMetricList, cwMetrics...)
}
}
Expand Down Expand Up @@ -192,10 +194,13 @@ func TranslateCWMetricToEMF(cwMetricLists []*CWMetrics) []*LogEvent {
}

// Translates OTLP Metric to list of CW Metrics
func getCWMetrics(metric *pdata.Metric, namespace string, instrumentationLibName string, dimensionRollupOption string) []*CWMetrics {
var result []*CWMetrics
func getCWMetrics(metric *pdata.Metric, namespace string, instrumentationLibName string, config *Config) (cwMetrics []*CWMetrics) {
var dps DataPoints

if metric == nil {
return
}

// metric measure data from OT
metricMeasure := make(map[string]string)
metricMeasure["Name"] = metric.Name()
Expand All @@ -215,22 +220,30 @@ func getCWMetrics(metric *pdata.Metric, namespace string, instrumentationLibName
dps = DoubleDataPointSlice{metric.DoubleSum().DataPoints()}
case pdata.MetricDataTypeDoubleHistogram:
dps = DoubleHistogramDataPointSlice{metric.DoubleHistogram().DataPoints()}
default:
config.logger.Warn(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.debug level I think, this will be too spammy right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @anuraaga, thanks for the feedback! Since this is an edge case, it shouldn't happen often, and we do want to alert the customer that this is happening vs just having it in the debug level. I'm happy to turn this down to debug if down the road this edge case is occurring more frequently than we think. :)

cc: @mxiamxia

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please fix this in a followup PR if needed

"Unhandled metric data type.",
zap.String("DataType", metric.DataType().String()),
zap.String("Name", metric.Name()),
zap.String("Unit", metric.Unit()),
)
return
}

if dps.Len() == 0 {
return result
return
}
for m := 0; m < dps.Len(); m++ {
dp := dps.At(m)
if dp.IsNil() {
continue
}
cwMetric := buildCWMetric(dp, metric, namespace, metricSlice, instrumentationLibName, dimensionRollupOption)
cwMetric := buildCWMetric(dp, metric, namespace, metricSlice, instrumentationLibName, config.DimensionRollupOption)
if cwMetric != nil {
result = append(result, cwMetric)
cwMetrics = append(cwMetrics, cwMetric)
}
}
return result
return
}

// Build CWMetric from DataPoint
Expand Down
84 changes: 74 additions & 10 deletions exporter/awsemfexporter/metric_translator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ import (
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/translator/conventions"
"go.opentelemetry.io/collector/translator/internaldata"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zaptest/observer"
)

// Asserts whether dimension sets are equal (i.e. has same sets of dimensions)
Expand All @@ -53,14 +56,17 @@ func assertDimsEqual(t *testing.T, expected, actual [][]string) {
}

func TestTranslateOtToCWMetricWithInstrLibrary(t *testing.T) {

config := &Config{
Namespace: "",
DimensionRollupOption: ZeroAndSingleDimensionRollup,
}
md := createMetricTestData()
rm := internaldata.OCToMetrics(md).ResourceMetrics().At(0)
ilms := rm.InstrumentationLibraryMetrics()
ilm := ilms.At(0)
ilm.InstrumentationLibrary().InitEmpty()
ilm.InstrumentationLibrary().SetName("cloudwatch-lib")
cwm, totalDroppedMetrics := TranslateOtToCWMetric(&rm, ZeroAndSingleDimensionRollup, "")
cwm, totalDroppedMetrics := TranslateOtToCWMetric(&rm, config)
assert.Equal(t, 1, totalDroppedMetrics)
assert.NotNil(t, cwm)
assert.Equal(t, 5, len(cwm))
Expand Down Expand Up @@ -91,10 +97,13 @@ func TestTranslateOtToCWMetricWithInstrLibrary(t *testing.T) {
}

func TestTranslateOtToCWMetricWithoutInstrLibrary(t *testing.T) {

config := &Config{
Namespace: "",
DimensionRollupOption: ZeroAndSingleDimensionRollup,
}
md := createMetricTestData()
rm := internaldata.OCToMetrics(md).ResourceMetrics().At(0)
cwm, totalDroppedMetrics := TranslateOtToCWMetric(&rm, ZeroAndSingleDimensionRollup, "")
cwm, totalDroppedMetrics := TranslateOtToCWMetric(&rm, config)
assert.Equal(t, 1, totalDroppedMetrics)
assert.NotNil(t, cwm)
assert.Equal(t, 5, len(cwm))
Expand Down Expand Up @@ -127,6 +136,10 @@ func TestTranslateOtToCWMetricWithoutInstrLibrary(t *testing.T) {
}

func TestTranslateOtToCWMetricWithNameSpace(t *testing.T) {
config := &Config{
Namespace: "",
DimensionRollupOption: ZeroAndSingleDimensionRollup,
}
md := consumerdata.MetricsData{
Node: &commonpb.Node{
LibraryInfo: &commonpb.LibraryInfo{ExporterVersion: "SomeVersion"},
Expand All @@ -139,7 +152,7 @@ func TestTranslateOtToCWMetricWithNameSpace(t *testing.T) {
Metrics: []*metricspb.Metric{},
}
rm := internaldata.OCToMetrics(md).ResourceMetrics().At(0)
cwm, totalDroppedMetrics := TranslateOtToCWMetric(&rm, ZeroAndSingleDimensionRollup, "")
cwm, totalDroppedMetrics := TranslateOtToCWMetric(&rm, config)
assert.Equal(t, 0, totalDroppedMetrics)
assert.Nil(t, cwm)
assert.Equal(t, 0, len(cwm))
Expand Down Expand Up @@ -231,7 +244,7 @@ func TestTranslateOtToCWMetricWithNameSpace(t *testing.T) {
},
}
rm = internaldata.OCToMetrics(md).ResourceMetrics().At(0)
cwm, totalDroppedMetrics = TranslateOtToCWMetric(&rm, ZeroAndSingleDimensionRollup, "")
cwm, totalDroppedMetrics = TranslateOtToCWMetric(&rm, config)
assert.Equal(t, 0, totalDroppedMetrics)
assert.NotNil(t, cwm)
assert.Equal(t, 1, len(cwm))
Expand Down Expand Up @@ -269,6 +282,9 @@ func TestGetCWMetrics(t *testing.T) {
namespace := "Namespace"
OTelLib := "OTelLib"
instrumentationLibName := "InstrLibName"
config := &Config{
DimensionRollupOption: "",
}

testCases := []struct {
testName string
Expand Down Expand Up @@ -756,7 +772,7 @@ func TestGetCWMetrics(t *testing.T) {
assert.Equal(t, 1, metrics.Len())
metric := metrics.At(0)

cwMetrics := getCWMetrics(&metric, namespace, instrumentationLibName, "")
cwMetrics := getCWMetrics(&metric, namespace, instrumentationLibName, config)
assert.Equal(t, len(tc.expected), len(cwMetrics))

for i, expected := range tc.expected {
Expand All @@ -768,6 +784,42 @@ func TestGetCWMetrics(t *testing.T) {
}
})
}

t.Run("Unhandled metric type", func(t *testing.T) {
metric := pdata.NewMetric()
metric.InitEmpty()
metric.SetName("foo")
metric.SetUnit("Count")
metric.SetDataType(pdata.MetricDataTypeIntHistogram)

obs, logs := observer.New(zap.WarnLevel)
obsConfig := &Config{
DimensionRollupOption: "",
logger: zap.New(obs),
}

cwMetrics := getCWMetrics(&metric, namespace, instrumentationLibName, obsConfig)
assert.Nil(t, cwMetrics)

// Test output warning logs
expectedLogs := []observer.LoggedEntry{
{
Entry: zapcore.Entry{Level: zap.WarnLevel, Message: "Unhandled metric data type."},
Context: []zapcore.Field{
zap.String("DataType", "IntHistogram"),
zap.String("Name", "foo"),
zap.String("Unit", "Count"),
},
},
}
assert.Equal(t, 1, logs.Len())
assert.Equal(t, expectedLogs, logs.AllUntimed())
})

t.Run("Nil metric", func(t *testing.T) {
cwMetrics := getCWMetrics(nil, namespace, instrumentationLibName, config)
assert.Nil(t, cwMetrics)
})
}

func TestBuildCWMetric(t *testing.T) {
Expand Down Expand Up @@ -1400,20 +1452,28 @@ func BenchmarkTranslateOtToCWMetricWithInstrLibrary(b *testing.B) {
ilm := ilms.At(0)
ilm.InstrumentationLibrary().InitEmpty()
ilm.InstrumentationLibrary().SetName("cloudwatch-lib")
config := &Config{
Namespace: "",
DimensionRollupOption: ZeroAndSingleDimensionRollup,
}

b.ResetTimer()
for n := 0; n < b.N; n++ {
TranslateOtToCWMetric(&rm, ZeroAndSingleDimensionRollup, "")
TranslateOtToCWMetric(&rm, config)
}
}

func BenchmarkTranslateOtToCWMetricWithoutInstrLibrary(b *testing.B) {
md := createMetricTestData()
rm := internaldata.OCToMetrics(md).ResourceMetrics().At(0)
config := &Config{
Namespace: "",
DimensionRollupOption: ZeroAndSingleDimensionRollup,
}

b.ResetTimer()
for n := 0; n < b.N; n++ {
TranslateOtToCWMetric(&rm, ZeroAndSingleDimensionRollup, "")
TranslateOtToCWMetric(&rm, config)
}
}

Expand All @@ -1430,10 +1490,14 @@ func BenchmarkTranslateOtToCWMetricWithNamespace(b *testing.B) {
Metrics: []*metricspb.Metric{},
}
rm := internaldata.OCToMetrics(md).ResourceMetrics().At(0)
config := &Config{
Namespace: "",
DimensionRollupOption: ZeroAndSingleDimensionRollup,
}

b.ResetTimer()
for n := 0; n < b.N; n++ {
TranslateOtToCWMetric(&rm, ZeroAndSingleDimensionRollup, "")
TranslateOtToCWMetric(&rm, config)
}
}

Expand Down