diff --git a/src/main/java/software/amazon/cloudwatchlogs/emf/Constants.java b/src/main/java/software/amazon/cloudwatchlogs/emf/Constants.java index 81e62c39..64faecf7 100644 --- a/src/main/java/software/amazon/cloudwatchlogs/emf/Constants.java +++ b/src/main/java/software/amazon/cloudwatchlogs/emf/Constants.java @@ -22,4 +22,6 @@ public class Constants { public static final String UNKNOWN = "Unknown"; public static final int MAX_METRICS_PER_EVENT = 100; + + public static final int MAX_DATAPOINTS_PER_METRIC = 100; } diff --git a/src/main/java/software/amazon/cloudwatchlogs/emf/model/MetricsContext.java b/src/main/java/software/amazon/cloudwatchlogs/emf/model/MetricsContext.java index b745e7b1..3d56bdf5 100644 --- a/src/main/java/software/amazon/cloudwatchlogs/emf/model/MetricsContext.java +++ b/src/main/java/software/amazon/cloudwatchlogs/emf/model/MetricsContext.java @@ -203,31 +203,52 @@ public MetricsContext createCopyWithContext() { * metrics in one log event. If there're more than 100 metrics, we split the metrics into * multiple log events. * + *

If a metric has more than 100 data points, we also split the metric. + * * @return the serialized strings. * @throws JsonProcessingException if there's any object that cannot be serialized */ public List serialize() throws JsonProcessingException { - if (rootNode.metrics().size() <= Constants.MAX_METRICS_PER_EVENT) { + if (rootNode.metrics().size() <= Constants.MAX_METRICS_PER_EVENT + && !anyMetricWithTooManyDataPoints(rootNode)) { return Arrays.asList(this.rootNode.serialize()); } else { List nodes = new ArrayList<>(); Map metrics = new HashMap<>(); - int count = 0; - for (MetricDefinition metric : rootNode.metrics().values()) { - metrics.put(metric.getName(), metric); - count++; + Queue metricDefinitions = + new LinkedList<>(rootNode.metrics().values()); + while (metricDefinitions.size() > 0) { + MetricDefinition metric = metricDefinitions.poll(); + if (metrics.size() == Constants.MAX_METRICS_PER_EVENT - || count == rootNode.metrics().size()) { - Metadata metadata = rootNode.getAws(); - MetricDirective metricDirective = metadata.getCloudWatchMetrics().get(0); - Metadata clonedMetadata = - metadata.withCloudWatchMetrics( - Arrays.asList(metricDirective.withMetrics(metrics))); - nodes.add(rootNode.withAws(clonedMetadata)); + || metrics.containsKey(metric.getName())) { + nodes.add(buildRootNode(metrics)); metrics = new HashMap<>(); } - } + if (metric.getValues().size() <= Constants.MAX_DATAPOINTS_PER_METRIC) { + metrics.put(metric.getName(), metric); + } else { + metrics.put( + metric.getName(), + new MetricDefinition( + metric.getName(), + metric.getUnit(), + metric.getValues() + .subList(0, Constants.MAX_DATAPOINTS_PER_METRIC))); + metricDefinitions.offer( + new MetricDefinition( + metric.getName(), + metric.getUnit(), + metric.getValues() + .subList( + Constants.MAX_DATAPOINTS_PER_METRIC, + metric.getValues().size()))); + } + } + if (!metrics.isEmpty()) { + nodes.add(buildRootNode(metrics)); + } List strings = new ArrayList<>(); for (RootNode node : nodes) { strings.add(node.serialize()); @@ -235,4 +256,18 @@ public List serialize() throws JsonProcessingException { return strings; } } + + private RootNode buildRootNode(Map metrics) { + Metadata metadata = rootNode.getAws(); + MetricDirective metricDirective = metadata.getCloudWatchMetrics().get(0); + Metadata clonedMetadata = + metadata.withCloudWatchMetrics(Arrays.asList(metricDirective.withMetrics(metrics))); + return rootNode.withAws(clonedMetadata); + } + + private boolean anyMetricWithTooManyDataPoints(RootNode node) { + return node.metrics().values().stream() + .anyMatch( + metric -> metric.getValues().size() > Constants.MAX_DATAPOINTS_PER_METRIC); + } } diff --git a/src/test/java/software/amazon/cloudwatchlogs/emf/model/MetricsContextTest.java b/src/test/java/software/amazon/cloudwatchlogs/emf/model/MetricsContextTest.java index 367de33d..feda1211 100644 --- a/src/test/java/software/amazon/cloudwatchlogs/emf/model/MetricsContextTest.java +++ b/src/test/java/software/amazon/cloudwatchlogs/emf/model/MetricsContextTest.java @@ -18,14 +18,17 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.json.JsonMapper; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Map; import org.junit.Test; +import software.amazon.cloudwatchlogs.emf.Constants; public class MetricsContextTest { @@ -75,6 +78,59 @@ public void testSerializeMoreThen100Metrics() throws JsonProcessingException { } } + @Test + public void testSerializeAMetricWith101DataPoints() throws JsonProcessingException { + MetricsContext mc = new MetricsContext(); + int dataPointCount = 101; + int expectedEventCount = 2; + String metricName = "metric"; + for (int i = 0; i < dataPointCount; i++) { + mc.putMetric(metricName, i); + } + + List events = mc.serialize(); + assertEquals(expectedEventCount, events.size()); + List allMetrics = new ArrayList<>(); + for (String event : events) { + allMetrics.addAll(parseMetrics(event)); + } + List expectedValues = new ArrayList<>(); + for (int i = 0; i < Constants.MAX_DATAPOINTS_PER_METRIC; i++) { + expectedValues.add((double) i); + } + assertEquals(expectedValues, allMetrics.get(0).getValues()); + assertTrue(allMetrics.get(1).getValues().equals(Arrays.asList(100.0))); + } + + @Test + public void testSerializeMetricsWith101DataPoints() throws JsonProcessingException { + MetricsContext mc = new MetricsContext(); + int dataPointCount = 101; + int expectedEventCount = 2; + String metricName = "metric1"; + for (int i = 0; i < dataPointCount; i++) { + mc.putMetric(metricName, i); + } + mc.putMetric("metric2", 2); + + List events = mc.serialize(); + assertEquals(expectedEventCount, events.size()); + + List metricsFromEvent1 = parseMetrics(events.get(0)); + List metricsFromEvent2 = parseMetrics(events.get(1)); + + assertEquals(2, metricsFromEvent1.size()); + List expectedValues = new ArrayList<>(); + for (int i = 0; i < Constants.MAX_DATAPOINTS_PER_METRIC; i++) { + expectedValues.add((double) i); + } + assertEquals(expectedValues, metricsFromEvent1.get(0).getValues()); + assertEquals(Arrays.asList(2.0), metricsFromEvent1.get(1).getValues()); + + assertEquals(1, metricsFromEvent2.size()); + assertEquals(Arrays.asList(100.0), metricsFromEvent2.get(0).getValues()); + } + @Test public void testSerializeZeroMetric() throws JsonProcessingException { MetricsContext mc = new MetricsContext(); @@ -106,8 +162,12 @@ private ArrayList parseMetrics(String event) throws JsonProces for (Map metric : metrics) { String name = metric.get("Name"); Unit unit = Unit.fromValue(metric.get("Unit")); - double value = (double) metadata_map.get(name); - metricDefinitions.add(new MetricDefinition(name, unit, value)); + Object value = metadata_map.get(name); + if (value instanceof ArrayList) { + metricDefinitions.add(new MetricDefinition(name, unit, (ArrayList) value)); + } else { + metricDefinitions.add(new MetricDefinition(name, unit, (double) value)); + } } return metricDefinitions; }