Skip to content

Commit 9891ea1

Browse files
committed
Add a configurable max_export_batch_size to the gRPC metrics exporter
1 parent c9222bf commit 9891ea1

File tree

3 files changed

+391
-4
lines changed

3 files changed

+391
-4
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## [Unreleased]
99

10+
- Add a configurable max_export_batch_size to the gRPC metrics exporter
11+
([#2809](https://github.com/open-telemetry/opentelemetry-python/pull/2809))
1012
- Change tracing to use `Resource.to_json()`
1113
([#2784](https://github.com/open-telemetry/opentelemetry-python/pull/2784))
1214
- Fix get_log_emitter instrumenting_module_version args typo

exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/metric_exporter/__init__.py

Lines changed: 136 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414
from logging import getLogger
1515
from os import environ
16-
from typing import Optional, Sequence
16+
from typing import Iterable, List, Optional, Sequence
1717
from grpc import ChannelCredentials, Compression
1818
from opentelemetry.exporter.otlp.proto.grpc.exporter import (
1919
OTLPExporterMixin,
@@ -31,6 +31,7 @@
3131
OTEL_EXPORTER_OTLP_METRICS_INSECURE,
3232
)
3333
from opentelemetry.sdk.metrics.export import (
34+
DataPointT,
3435
Gauge,
3536
Histogram,
3637
Metric,
@@ -41,6 +42,8 @@
4142
MetricExporter,
4243
MetricExportResult,
4344
MetricsData,
45+
ResourceMetrics,
46+
ScopeMetrics,
4447
)
4548

4649
_logger = getLogger(__name__)
@@ -61,6 +64,7 @@ def __init__(
6164
headers: Optional[Sequence] = None,
6265
timeout: Optional[int] = None,
6366
compression: Optional[Compression] = None,
67+
max_export_batch_size: Optional[int] = None,
6468
):
6569

6670
if insecure is None:
@@ -79,6 +83,8 @@ def __init__(
7983
}
8084
)
8185

86+
self._max_export_batch_size: Optional[int] = max_export_batch_size
87+
8288
def _translate_data(
8389
self, data: MetricsData
8490
) -> ExportMetricsServiceRequest:
@@ -180,8 +186,9 @@ def _translate_data(
180186
)
181187
pb2_metric.sum.data_points.append(pt)
182188
else:
183-
_logger.warn(
184-
"unsupported datapoint type %s", metric.point
189+
_logger.warning(
190+
"unsupported data type %s",
191+
metric.data.__class__.__name__,
185192
)
186193
continue
187194

@@ -202,7 +209,132 @@ def export(
202209
**kwargs,
203210
) -> MetricExportResult:
204211
# TODO(#2663): OTLPExporterMixin should pass timeout to gRPC
205-
return self._export(metrics_data)
212+
if self._max_export_batch_size is None:
213+
return self._export(data=metrics_data)
214+
215+
export_result = MetricExportResult.SUCCESS
216+
217+
for split_metrics_data in self._split_metrics_data(metrics_data):
218+
split_export_result = self._export(data=split_metrics_data)
219+
220+
if split_export_result is MetricExportResult.FAILURE:
221+
export_result = MetricExportResult.FAILURE
222+
223+
return export_result
224+
225+
def _split_metrics_data(
226+
self,
227+
metrics_data: MetricsData,
228+
) -> Iterable[MetricsData]:
229+
batch_size: int = 0
230+
split_resource_metrics: List[ResourceMetrics] = []
231+
232+
for resource_metrics in metrics_data.resource_metrics:
233+
split_scope_metrics: List[ScopeMetrics] = []
234+
split_resource_metrics.append(
235+
ResourceMetrics(
236+
resource=resource_metrics.resource,
237+
schema_url=resource_metrics.schema_url,
238+
scope_metrics=split_scope_metrics,
239+
)
240+
)
241+
for scope_metrics in resource_metrics.scope_metrics:
242+
split_metrics: List[Metric] = []
243+
split_scope_metrics.append(
244+
ScopeMetrics(
245+
scope=scope_metrics.scope,
246+
schema_url=scope_metrics.schema_url,
247+
metrics=split_metrics,
248+
)
249+
)
250+
for metric in scope_metrics.metrics:
251+
split_data_points: List[DataPointT] = []
252+
split_metrics.append(
253+
self._create_metric_copy(
254+
metric=metric,
255+
data_points=split_data_points,
256+
)
257+
)
258+
259+
for data_point in metric.data.data_points:
260+
split_data_points.append(data_point)
261+
batch_size += 1
262+
263+
if batch_size >= self._max_export_batch_size:
264+
yield MetricsData(
265+
resource_metrics=split_resource_metrics
266+
)
267+
# Reset all the variables
268+
batch_size = 0
269+
split_data_points = []
270+
split_metrics = [
271+
self._create_metric_copy(
272+
metric=metric,
273+
data_points=split_data_points,
274+
),
275+
]
276+
split_scope_metrics = [
277+
ScopeMetrics(
278+
scope=scope_metrics.scope,
279+
schema_url=scope_metrics.schema_url,
280+
metrics=split_metrics,
281+
)
282+
]
283+
split_resource_metrics = [
284+
ResourceMetrics(
285+
resource=resource_metrics.resource,
286+
schema_url=resource_metrics.schema_url,
287+
scope_metrics=split_scope_metrics,
288+
)
289+
]
290+
291+
if not split_data_points:
292+
# If data_points is empty remove the whole metric
293+
split_metrics.pop()
294+
295+
if not split_metrics:
296+
# If metrics is empty remove the whole scope_metrics
297+
split_scope_metrics.pop()
298+
299+
if not split_scope_metrics:
300+
# If scope_metrics is empty remove the whole resource_metrics
301+
split_resource_metrics.pop()
302+
303+
if batch_size > 0:
304+
yield MetricsData(resource_metrics=split_resource_metrics)
305+
306+
@staticmethod
307+
def _create_metric_copy(
308+
metric: Metric,
309+
data_points: List[DataPointT],
310+
) -> Metric:
311+
if isinstance(metric.data, Sum):
312+
empty_data = Sum(
313+
aggregation_temporality=metric.data.aggregation_temporality,
314+
is_monotonic=metric.data.is_monotonic,
315+
data_points=data_points,
316+
)
317+
elif isinstance(metric.data, Gauge):
318+
empty_data = Gauge(
319+
data_points=data_points,
320+
)
321+
elif isinstance(metric.data, Histogram):
322+
empty_data = Histogram(
323+
aggregation_temporality=metric.data.aggregation_temporality,
324+
data_points=data_points,
325+
)
326+
else:
327+
_logger.warning(
328+
"unsupported data type %s", metric.data.__class__.__name__
329+
)
330+
empty_data = None
331+
332+
return Metric(
333+
name=metric.name,
334+
description=metric.description,
335+
unit=metric.unit,
336+
data=empty_data,
337+
)
206338

207339
def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
208340
pass

0 commit comments

Comments
 (0)