Skip to content

SFx exporter: add delta translator #839

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Sep 2, 2020
1 change: 1 addition & 0 deletions exporter/signalfxexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/Azure/go-autorest/autorest/adal v0.9.0 // indirect
github.com/census-instrumentation/opencensus-proto v0.3.0
github.com/gogo/protobuf v1.3.1
github.com/golang/protobuf v1.4.2
github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.0.0-00010101000000-000000000000
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver v0.0.0-00010101000000-000000000000
github.com/signalfx/com_signalfx_metrics_protobuf v0.0.1
Expand Down
86 changes: 86 additions & 0 deletions exporter/signalfxexporter/translation/delta_translator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright 2020, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package translation

import (
"github.com/gogo/protobuf/proto"
sfxpb "github.com/signalfx/com_signalfx_metrics_protobuf/model"
)

// deltaTranslator implements the "delta_metric" translation rule: it emits a
// new datapoint holding the difference between a series' current value and
// the previously seen value for the same metric + dimensions key.
// NOTE(review): prevPts is never pruned, so it grows with series cardinality,
// and a counter reset yields a negative delta — confirm both are acceptable.
type deltaTranslator struct {
	// prevPts caches the last datapoint per "metric:dimensions" key.
	prevPts map[string]*sfxpb.DataPoint
}

// newDeltaTranslator returns a deltaTranslator with an empty previous-point
// cache, ready to start tracking series.
func newDeltaTranslator() *deltaTranslator {
	t := &deltaTranslator{
		prevPts: make(map[string]*sfxpb.DataPoint),
	}
	return t
}

// translate appends a delta datapoint for every incoming point whose metric
// name appears in tr.Mapping and for which a previous value is cached. The
// input points are returned unchanged, with any new delta points appended at
// the end.
func (t *deltaTranslator) translate(processedDataPoints []*sfxpb.DataPoint, tr Rule) []*sfxpb.DataPoint {
	var deltas []*sfxpb.DataPoint
	for _, pt := range processedDataPoints {
		deltaMetricName, mapped := tr.Mapping[pt.Metric]
		if !mapped {
			// Only metrics listed in Rule.Mapping get a delta companion.
			continue
		}
		if dp := t.deltaPt(deltaMetricName, pt); dp != nil {
			deltas = append(deltas, dp)
		}
	}
	return append(processedDataPoints, deltas...)
}

// deltaPt returns a new datapoint named deltaMetricName whose value is
// currPt's value minus the cached previous value for the same series, or nil
// when there is no previous point or the two points' value types (int vs.
// double) don't match. The cache is always updated to currPt as a side
// effect.
func (t *deltaTranslator) deltaPt(deltaMetricName string, currPt *sfxpb.DataPoint) *sfxpb.DataPoint {
	// Series identity is the metric name plus its stringified dimensions.
	fullKey := currPt.Metric + ":" + stringifyDimensions(currPt.Dimensions, nil)
	prevPt, found := t.prevPts[fullKey]
	t.prevPts[fullKey] = currPt
	if !found {
		// First sighting of this series: nothing to diff against yet.
		return nil
	}
	switch {
	case currPt.Value.DoubleValue != nil && prevPt.Value.DoubleValue != nil:
		return doublePt(currPt, prevPt, deltaMetricName)
	case currPt.Value.IntValue != nil && prevPt.Value.IntValue != nil:
		return intPt(currPt, prevPt, deltaMetricName)
	default:
		// Mixed or missing value types: skip rather than guess.
		return nil
	}
}

// doublePt builds a delta datapoint for a double-valued series: a clone of
// currPt renamed to deltaMetricName whose value is currPt minus prevPt.
func doublePt(currPt, prevPt *sfxpb.DataPoint, deltaMetricName string) *sfxpb.DataPoint {
	pt := basePt(currPt, deltaMetricName)
	diff := *currPt.Value.DoubleValue - *prevPt.Value.DoubleValue
	*pt.Value.DoubleValue = diff
	return pt
}

// intPt builds a delta datapoint for an int-valued series: a clone of currPt
// renamed to deltaMetricName whose value is currPt minus prevPt.
func intPt(currPt, prevPt *sfxpb.DataPoint, deltaMetricName string) *sfxpb.DataPoint {
	pt := basePt(currPt, deltaMetricName)
	diff := *currPt.Value.IntValue - *prevPt.Value.IntValue
	*pt.Value.IntValue = diff
	return pt
}

// cumulativeCounterType lives at package level so every delta point's
// MetricType pointer can reference a single shared value.
var cumulativeCounterType = sfxpb.MetricType_CUMULATIVE_COUNTER

// basePt deep-clones currPt and retargets the clone at the delta metric:
// the clone keeps the original timestamp, dimensions, and value fields, but
// takes the mapped name and the CUMULATIVE_COUNTER type.
func basePt(currPt *sfxpb.DataPoint, deltaMetricName string) *sfxpb.DataPoint {
	clone := proto.Clone(currPt).(*sfxpb.DataPoint)
	clone.Metric = deltaMetricName
	clone.MetricType = &cumulativeCounterType
	return clone
}
32 changes: 24 additions & 8 deletions exporter/signalfxexporter/translation/translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ const (

// ActionDropMetrics drops datapoints with metric name defined in "metric_names".
ActionDropMetrics Action = "drop_metrics"

// ActionDeltaMetric creates a new delta (cumulative) metric from an existing non-cumulative int or double
// metric. It takes mappings of names of the existing metrics to the names of the new, delta metrics to be
// created. All dimensions will be preserved.
ActionDeltaMetric Action = "delta_metric"
)

type MetricOperator string
Expand Down Expand Up @@ -193,6 +198,8 @@ type MetricTranslator struct {

// Additional map to be used only for dimension renaming in metadata
dimensionsMap map[string]string

deltaTranslator *deltaTranslator
}

func NewMetricTranslator(rules []Rule) (*MetricTranslator, error) {
Expand All @@ -202,8 +209,9 @@ func NewMetricTranslator(rules []Rule) (*MetricTranslator, error) {
}

return &MetricTranslator{
rules: rules,
dimensionsMap: createDimensionsMap(rules),
rules: rules,
dimensionsMap: createDimensionsMap(rules),
deltaTranslator: newDeltaTranslator(),
}, nil
}

Expand Down Expand Up @@ -287,7 +295,10 @@ func validateTranslationRules(rules []Rule) error {
if len(tr.MetricNames) == 0 {
return fmt.Errorf(`field "metric_names" is required for %q translation rule`, tr.Action)
}

case ActionDeltaMetric:
if len(tr.Mapping) == 0 {
return fmt.Errorf(`field "mapping" is required for %q translation rule`, tr.Action)
}
default:
return fmt.Errorf("unknown \"action\" value: %q", tr.Action)
}
Expand Down Expand Up @@ -423,6 +434,9 @@ func (mp *MetricTranslator) TranslateDataPoints(logger *zap.Logger, sfxDataPoint
}
}
processedDataPoints = processedDataPoints[:resultSliceLen]

case ActionDeltaMetric:
processedDataPoints = mp.deltaTranslator.translate(processedDataPoints, tr)
}
}

Expand Down Expand Up @@ -543,7 +557,7 @@ func aggregateDatapoints(
// group datapoints by dimension values
dimValuesToDps := make(map[string][]*sfxpb.DataPoint, len(dps))
for i, dp := range dps {
aggregationKey := getAggregationKey(dp.Dimensions, withoutDimensions)
aggregationKey := stringifyDimensions(dp.Dimensions, withoutDimensions)
if _, ok := dimValuesToDps[aggregationKey]; !ok {
// set slice capacity to the possible maximum = len(dps)-i to avoid reallocations
dimValuesToDps[aggregationKey] = make([]*sfxpb.DataPoint, 0, len(dps)-i)
Expand Down Expand Up @@ -586,13 +600,15 @@ func aggregateDatapoints(
return result
}

// getAggregationKey composes an aggregation key based on dimensions that left after excluding withoutDimensions.
// The key composed from dimensions has the following form: dim1:val1//dim2:val2
func getAggregationKey(dimensions []*sfxpb.Dimension, withoutDimensions []string) string {
// stringifyDimensions turns the passed-in `dimensions` into a string while
// ignoring the passed-in `exclusions`. The result has the following form:
// dim1:val1//dim2:val2. Order is deterministic so this function can be used to
// generate map keys.
func stringifyDimensions(dimensions []*sfxpb.Dimension, exclusions []string) string {
const aggregationKeyDelimiter = "//"
var aggregationKeyParts = make([]string, 0, len(dimensions))
for _, d := range dimensions {
if !dimensionIn(d, withoutDimensions) {
if !dimensionIn(d, exclusions) {
aggregationKeyParts = append(aggregationKeyParts, fmt.Sprintf("%s:%s", d.Key, d.Value))
}
}
Expand Down
196 changes: 196 additions & 0 deletions exporter/signalfxexporter/translation/translator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@ import (
"testing"
"time"

metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
"github.com/gogo/protobuf/proto"
"github.com/golang/protobuf/ptypes/timestamp"
sfxpb "github.com/signalfx/com_signalfx_metrics_protobuf/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/consumer/consumerdata"
"go.uber.org/zap"
"go.uber.org/zap/zaptest/observer"
)
Expand Down Expand Up @@ -460,6 +463,15 @@ func TestNewMetricTranslator(t *testing.T) {
wantDimensionsMap: nil,
wantError: `field "metric_names" is required for "drop_metrics" translation rule`,
},
{
name: "delta_metric_invalid",
trs: []Rule{
{
Action: ActionDeltaMetric,
},
},
wantError: `field "mapping" is required for "delta_metric" translation rule`,
},
}

for _, tt := range tests {
Expand Down Expand Up @@ -2073,3 +2085,187 @@ func TestDimensionsEqual(t *testing.T) {
})
}
}

// TestDeltaMetricDouble feeds three successive scrapes of a double-valued
// cumulative metric through the converter and checks that each delta point
// carries exactly the increase since the previous scrape.
func TestDeltaMetricDouble(t *testing.T) {
	const delta1 = 3
	const delta2 = 5
	first := doubleMD(10, 0)
	second := doubleMD(20, delta1)
	third := doubleMD(30, delta1+delta2)

	deltaPts1, deltaPts2 := requireDeltaMetricOk(t, first, second, third)
	for _, pt := range deltaPts1 {
		require.EqualValues(t, delta1, *pt.Value.DoubleValue)
	}
	for _, pt := range deltaPts2 {
		require.EqualValues(t, delta2, *pt.Value.DoubleValue)
	}
}

// TestDeltaMetricInt mirrors TestDeltaMetricDouble for an int-valued metric:
// three scrapes in, and each delta point must equal the per-scrape increase.
func TestDeltaMetricInt(t *testing.T) {
	const delta1 = 7
	const delta2 = 11
	deltaPts1, deltaPts2 := requireDeltaMetricOk(
		t, intMD(10, 0), intMD(20, delta1), intMD(30, delta1+delta2))
	for _, pt := range deltaPts1 {
		require.EqualValues(t, delta1, *pt.Value.IntValue)
	}
	for _, pt := range deltaPts2 {
		require.EqualValues(t, delta2, *pt.Value.IntValue)
	}
}

// TestDeltaTranslatorNoMatchingMapping verifies that a rule mapping an
// unrelated metric name ("foo") leaves the generated "system.cpu.time"
// points untouched: only the one original metric comes out.
func TestDeltaTranslatorNoMatchingMapping(t *testing.T) {
	c := converter(t, map[string]string{"foo": "bar"})
	pts, _ := c.MetricDataToSignalFxV2([]consumerdata.MetricsData{intMD(1, 1)})
	require.Equal(t, 1, len(indexPts(pts)))
}

// TestDeltaTranslatorMismatchedValueTypes sends an int point followed by a
// double point for the same series; the translator must not compute a delta
// across value types, so the second batch contains only the original metric.
func TestDeltaTranslatorMismatchedValueTypes(t *testing.T) {
	c := converter(t, map[string]string{"system.cpu.time": "system.cpu.delta"})

	first := baseMD()
	first.Metrics[0].Timeseries = []*metricspb.TimeSeries{
		intTS("cpu0", "user", 1, 1, 1),
	}
	_, _ = c.MetricDataToSignalFxV2([]consumerdata.MetricsData{first})

	second := baseMD()
	second.Metrics[0].Timeseries = []*metricspb.TimeSeries{
		dblTS("cpu0", "user", 1, 1, 1),
	}
	pts, _ := c.MetricDataToSignalFxV2([]consumerdata.MetricsData{second})
	require.Equal(t, 1, len(indexPts(pts)))
}

// requireDeltaMetricOk pushes three successive MetricsData batches through a
// converter configured to map "system.cpu.time" -> "system.cpu.delta" and
// asserts the expected shape of each batch: the first produces only the
// original metric (no previous point to diff against), while the second and
// third each produce both the original and the delta metric, with delta
// points marked CUMULATIVE_COUNTER. It returns the delta points from the
// second and third batches for value checks by the caller.
func requireDeltaMetricOk(t *testing.T, md1, md2, md3 consumerdata.MetricsData) (
	[]*sfxpb.DataPoint, []*sfxpb.DataPoint,
) {
	c := converter(t, map[string]string{"system.cpu.time": "system.cpu.delta"})

	// First batch: only the original metric should come out.
	dp1, dropped1 := c.MetricDataToSignalFxV2([]consumerdata.MetricsData{md1})
	require.Equal(t, 0, dropped1)
	m1 := indexPts(dp1)
	require.Equal(t, 1, len(m1))

	// Second batch: original + delta metric.
	dp2, dropped2 := c.MetricDataToSignalFxV2([]consumerdata.MetricsData{md2})
	require.Equal(t, 0, dropped2)
	m2 := indexPts(dp2)
	require.Equal(t, 2, len(m2))

	origPts, ok := m2["system.cpu.time"]
	require.True(t, ok)

	// One delta point per original series, all typed CUMULATIVE_COUNTER.
	deltaPts1, ok := m2["system.cpu.delta"]
	require.True(t, ok)
	require.Equal(t, len(origPts), len(deltaPts1))
	counterType := sfxpb.MetricType_CUMULATIVE_COUNTER
	for _, pt := range deltaPts1 {
		require.Equal(t, &counterType, pt.MetricType)
	}

	// Third batch: same shape as the second.
	dp3, dropped3 := c.MetricDataToSignalFxV2([]consumerdata.MetricsData{md3})
	require.Equal(t, 0, dropped3)
	m3 := indexPts(dp3)
	require.Equal(t, 2, len(m3))

	deltaPts2, ok := m3["system.cpu.delta"]
	require.True(t, ok)
	require.Equal(t, len(origPts), len(deltaPts2))
	for _, pt := range deltaPts2 {
		require.Equal(t, &counterType, pt.MetricType)
	}
	return deltaPts1, deltaPts2
}

// converter builds a MetricsConverter configured with a single delta_metric
// rule using the supplied metric-name mapping.
func converter(t *testing.T, mapping map[string]string) *MetricsConverter {
	tr, err := NewMetricTranslator([]Rule{{
		Action:  ActionDeltaMetric,
		Mapping: mapping,
	}})
	require.NoError(t, err)
	return NewMetricsConverter(zap.NewNop(), tr)
}

// indexPts groups datapoints by metric name.
func indexPts(pts []*sfxpb.DataPoint) map[string][]*sfxpb.DataPoint {
	byMetric := make(map[string][]*sfxpb.DataPoint)
	for _, pt := range pts {
		byMetric[pt.Metric] = append(byMetric[pt.Metric], pt)
	}
	return byMetric
}

// doubleMD builds a MetricsData batch with six double-valued cpu series
// (cpu0/cpu1 x user/system/idle). secondsDelta offsets each point's
// timestamp from the fixed start time; valueDelta is added to each series'
// distinct base value, so successive calls with growing deltas simulate a
// monotonically increasing counter.
func doubleMD(secondsDelta int64, valueDelta float64) consumerdata.MetricsData {
	md := baseMD()
	md.Metrics[0].Timeseries = []*metricspb.TimeSeries{
		dblTS("cpu0", "user", secondsDelta, 100, valueDelta),
		dblTS("cpu0", "system", secondsDelta, 200, valueDelta),
		dblTS("cpu0", "idle", secondsDelta, 300, valueDelta),
		dblTS("cpu1", "user", secondsDelta, 111, valueDelta),
		dblTS("cpu1", "system", secondsDelta, 222, valueDelta),
		dblTS("cpu1", "idle", secondsDelta, 333, valueDelta),
	}
	return md
}

// intMD is the int-valued twin of doubleMD: six cpu series whose values are
// the base per-series constants plus valueDelta, with timestamps offset by
// secondsDelta from the fixed start time.
func intMD(secondsDelta int64, valueDelta int64) consumerdata.MetricsData {
	md := baseMD()
	md.Metrics[0].Timeseries = []*metricspb.TimeSeries{
		intTS("cpu0", "user", secondsDelta, 100, valueDelta),
		intTS("cpu0", "system", secondsDelta, 200, valueDelta),
		intTS("cpu0", "idle", secondsDelta, 300, valueDelta),
		intTS("cpu1", "user", secondsDelta, 111, valueDelta),
		intTS("cpu1", "system", secondsDelta, 222, valueDelta),
		intTS("cpu1", "idle", secondsDelta, 333, valueDelta),
	}
	return md
}

// baseMD returns a MetricsData skeleton describing the "system.cpu.time"
// metric with "cpu" and "state" label keys and no timeseries; callers attach
// the timeseries they need.
func baseMD() consumerdata.MetricsData {
	return consumerdata.MetricsData{
		Metrics: []*metricspb.Metric{{
			MetricDescriptor: &metricspb.MetricDescriptor{
				Name: "system.cpu.time",
				Unit: "s",
				// Named enum constant instead of the bare literal 5.
				Type: metricspb.MetricDescriptor_CUMULATIVE_DOUBLE,
				LabelKeys: []*metricspb.LabelKey{
					{Key: "cpu"},
					{Key: "state"},
				},
			},
		}},
	}
}

// dblTS builds a single-point double timeseries labeled (lbl0, lbl1) whose
// value is v + valueDelta and whose timestamp is secondsDelta past the fixed
// start time.
func dblTS(lbl0 string, lbl1 string, secondsDelta int64, v float64, valueDelta float64) *metricspb.TimeSeries {
	series := baseTS(lbl0, lbl1, secondsDelta)
	series.Points[0].Value = &metricspb.Point_DoubleValue{DoubleValue: v + valueDelta}
	return series
}

// intTS builds a single-point int64 timeseries labeled (lbl0, lbl1) whose
// value is v + valueDelta and whose timestamp is secondsDelta past the fixed
// start time.
func intTS(lbl0 string, lbl1 string, secondsDelta int64, v int64, valueDelta int64) *metricspb.TimeSeries {
	series := baseTS(lbl0, lbl1, secondsDelta)
	series.Points[0].Value = &metricspb.Point_Int64Value{Int64Value: v + valueDelta}
	return series
}

// baseTS builds a timeseries with two label values, a fixed start timestamp,
// and one point whose timestamp is secondsDelta after the start; the point's
// Value is left nil for the caller (dblTS/intTS) to fill in.
func baseTS(lbl0 string, lbl1 string, secondsDelta int64) *metricspb.TimeSeries {
	const startTime = 1600000000
	return &metricspb.TimeSeries{
		StartTimestamp: &timestamp.Timestamp{Seconds: startTime},
		LabelValues: []*metricspb.LabelValue{
			{Value: lbl0, HasValue: true},
			{Value: lbl1, HasValue: true},
		},
		Points: []*metricspb.Point{{
			Timestamp: &timestamp.Timestamp{Seconds: startTime + secondsDelta},
		}},
	}
}