diff --git a/.chloggen/tailsamplingprocessor-performance-rework.yaml b/.chloggen/tailsamplingprocessor-performance-rework.yaml
new file mode 100644
index 0000000000000..492374b581043
--- /dev/null
+++ b/.chloggen/tailsamplingprocessor-performance-rework.yaml
@@ -0,0 +1,27 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: tailsamplingprocessor
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: "Reworked the consume traces, sampling decision, and policy loading paths to improve performance and readability"
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [37560]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: [user]
diff --git a/processor/tailsamplingprocessor/processor.go b/processor/tailsamplingprocessor/processor.go
index c6aa08e96d235..e96a55483ce2b 100644
--- a/processor/tailsamplingprocessor/processor.go
+++ b/processor/tailsamplingprocessor/processor.go
@@ -249,34 +249,40 @@ func (tsp *tailSamplingSpanProcessor) loadSamplingPolicy(cfgs []PolicyCfg) error
 	telemetrySettings := tsp.set.TelemetrySettings
 	componentID := tsp.set.ID.Name()
 
-	policyNames := map[string]bool{}
-	tsp.policies = make([]*policy, len(cfgs))
+	cLen := len(cfgs)
+	policies := make([]*policy, 0, cLen)
+	policyNames := make(map[string]struct{}, cLen)
 
-	for i := range cfgs {
-		policyCfg := &cfgs[i]
+	for _, cfg := range cfgs {
+		if cfg.Name == "" {
+			return fmt.Errorf("policy name cannot be empty")
+		}
 
-		if policyNames[policyCfg.Name] {
-			return fmt.Errorf("duplicate policy name %q", policyCfg.Name)
+		if _, exists := policyNames[cfg.Name]; exists {
+			return fmt.Errorf("duplicate policy name %q", cfg.Name)
 		}
-		policyNames[policyCfg.Name] = true
+		policyNames[cfg.Name] = struct{}{}
 
-		eval, err := getPolicyEvaluator(telemetrySettings, policyCfg)
+		eval, err := getPolicyEvaluator(telemetrySettings, &cfg)
 		if err != nil {
-			return err
+			return fmt.Errorf("failed to create policy evaluator for %q: %w", cfg.Name, err)
 		}
-		uniquePolicyName := policyCfg.Name
+
+		uniquePolicyName := cfg.Name
 		if componentID != "" {
-			uniquePolicyName = fmt.Sprintf("%s.%s", componentID, policyCfg.Name)
+			uniquePolicyName = fmt.Sprintf("%s.%s", componentID, cfg.Name)
 		}
-		p := &policy{
-			name:      policyCfg.Name,
+
+		policies = append(policies, &policy{
+			name:      cfg.Name,
 			evaluator: eval,
 			attribute: metric.WithAttributes(attribute.String("policy", uniquePolicyName)),
-		}
-		tsp.policies[i] = p
+		})
 	}
 
-	tsp.logger.Debug("Loaded sampling policy", zap.Int("policies.len", len(tsp.policies)))
+	tsp.policies = policies
+
+	tsp.logger.Debug("Loaded sampling policy", zap.Int("policies.len", len(policies)))
 
 	return nil
 }
@@ -302,9 +308,6 @@ func (tsp *tailSamplingSpanProcessor) loadPendingSamplingPolicy() {
 
 	tsp.logger.Debug("Loading pending sampling policy", zap.Int("pending.len", pLen))
 
-	// In case something goes wrong.
-	prev := tsp.policies
-
 	err := tsp.loadSamplingPolicy(tsp.pendingPolicy)
 
 	// Empty pending regardless of error. If policy is invalid, it will fail on
@@ -313,20 +316,22 @@ func (tsp *tailSamplingSpanProcessor) samplingPolicyOnTick() {
 
 	if err != nil {
 		tsp.logger.Error("Failed to load pending sampling policy", zap.Error(err))
-		tsp.logger.Debug("Falling back to previous sampling policy")
-		tsp.policies = prev
+		tsp.logger.Debug("Continuing to use the previously loaded sampling policy")
 	}
 }
 
 func (tsp *tailSamplingSpanProcessor) samplingPolicyOnTick() {
+	tsp.logger.Debug("Sampling Policy Evaluation ticked")
+
 	tsp.loadPendingSamplingPolicy()
 
+	ctx := context.Background()
 	metrics := policyMetrics{}
-
 	startTime := time.Now()
+
 	batch, _ := tsp.decisionBatcher.CloseCurrentAndTakeFirstBatch()
 	batchLen := len(batch)
-	tsp.logger.Debug("Sampling Policy Evaluation ticked")
+
 	for _, id := range batch {
 		d, ok := tsp.idToTrace.Load(id)
 		if !ok {
@@ -337,9 +342,8 @@ func (tsp *tailSamplingSpanProcessor) samplingPolicyOnTick() {
 		trace.DecisionTime = time.Now()
 
 		decision := tsp.makeDecision(id, trace, &metrics)
+
 		tsp.telemetry.ProcessorTailSamplingSamplingDecisionTimerLatency.Record(tsp.ctx, int64(time.Since(startTime)/time.Microsecond))
-		tsp.telemetry.ProcessorTailSamplingSamplingTraceDroppedTooEarly.Add(tsp.ctx, metrics.idNotFoundOnMapCount)
-		tsp.telemetry.ProcessorTailSamplingSamplingPolicyEvaluationError.Add(tsp.ctx, metrics.evaluateErrorCount)
 		tsp.telemetry.ProcessorTailSamplingSamplingTracesOnMemory.Record(tsp.ctx, int64(tsp.numTracesOnMap.Load()))
 		tsp.telemetry.ProcessorTailSamplingGlobalCountTracesSampled.Add(tsp.ctx, 1, decisionToAttribute[decision])
 
@@ -352,12 +356,15 @@ func (tsp *tailSamplingSpanProcessor) samplingPolicyOnTick() {
 
 		switch decision {
 		case sampling.Sampled:
-			tsp.releaseSampledTrace(context.Background(), id, allSpans)
+			tsp.releaseSampledTrace(ctx, id, allSpans)
 		case sampling.NotSampled:
 			tsp.releaseNotSampledTrace(id)
 		}
 	}
 
+	tsp.telemetry.ProcessorTailSamplingSamplingTraceDroppedTooEarly.Add(tsp.ctx, metrics.idNotFoundOnMapCount)
+	tsp.telemetry.ProcessorTailSamplingSamplingPolicyEvaluationError.Add(tsp.ctx, metrics.evaluateErrorCount)
+
 	tsp.logger.Debug("Sampling policy evaluation completed",
 		zap.Int("batch.len", batchLen),
 		zap.Int64("sampled", metrics.decisionSampled),
@@ -368,49 +375,47 @@ func (tsp *tailSamplingSpanProcessor) samplingPolicyOnTick() {
 }
 
 func (tsp *tailSamplingSpanProcessor) makeDecision(id pcommon.TraceID, trace *sampling.TraceData, metrics *policyMetrics) sampling.Decision {
-	finalDecision := sampling.NotSampled
-	samplingDecision := map[sampling.Decision]bool{
-		sampling.Error:            false,
-		sampling.Sampled:          false,
-		sampling.NotSampled:       false,
-		sampling.InvertSampled:    false,
-		sampling.InvertNotSampled: false,
-	}
-
+	var decisions [8]bool
 	ctx := context.Background()
-	// Check all policies before making a final decision
+	startTime := time.Now()
+
+	// Check all policies before making a final decision.
 	for _, p := range tsp.policies {
-		policyEvaluateStartTime := time.Now()
 		decision, err := p.evaluator.Evaluate(ctx, id, trace)
-		tsp.telemetry.ProcessorTailSamplingSamplingDecisionLatency.Record(ctx, int64(time.Since(policyEvaluateStartTime)/time.Microsecond), p.attribute)
+		latency := time.Since(startTime)
+		tsp.telemetry.ProcessorTailSamplingSamplingDecisionLatency.Record(ctx, int64(latency/time.Microsecond), p.attribute)
+
 		if err != nil {
-			samplingDecision[sampling.Error] = true
+			decisions[sampling.Error] = true
 			metrics.evaluateErrorCount++
 			tsp.logger.Debug("Sampling policy error", zap.Error(err))
-		} else {
-			tsp.telemetry.ProcessorTailSamplingCountTracesSampled.Add(ctx, 1, p.attribute, decisionToAttribute[decision])
-			if telemetry.IsMetricStatCountSpansSampledEnabled() {
-				tsp.telemetry.ProcessorTailSamplingCountSpansSampled.Add(ctx, trace.SpanCount.Load(), p.attribute, decisionToAttribute[decision])
-			}
+			continue
+		}
 
-			samplingDecision[decision] = true
+		tsp.telemetry.ProcessorTailSamplingCountTracesSampled.Add(ctx, 1, p.attribute, decisionToAttribute[decision])
+
+		if telemetry.IsMetricStatCountSpansSampledEnabled() {
+			tsp.telemetry.ProcessorTailSamplingCountSpansSampled.Add(ctx, trace.SpanCount.Load(), p.attribute, decisionToAttribute[decision])
 		}
+
+		decisions[decision] = true
 	}
 
-	// InvertNotSampled takes precedence over any other decision
+	var finalDecision sampling.Decision
 	switch {
-	case samplingDecision[sampling.InvertNotSampled]:
+	case decisions[sampling.InvertNotSampled]: // InvertNotSampled takes precedence
 		finalDecision = sampling.NotSampled
-	case samplingDecision[sampling.Sampled]:
+	case decisions[sampling.Sampled]:
 		finalDecision = sampling.Sampled
-	case samplingDecision[sampling.InvertSampled] && !samplingDecision[sampling.NotSampled]:
+	case decisions[sampling.InvertSampled] && !decisions[sampling.NotSampled]:
 		finalDecision = sampling.Sampled
+	default:
+		finalDecision = sampling.NotSampled
 	}
 
-	switch finalDecision {
-	case sampling.Sampled:
+	if finalDecision == sampling.Sampled {
 		metrics.decisionSampled++
-	case sampling.NotSampled:
+	} else {
 		metrics.decisionNotSampled++
 	}
 
@@ -447,6 +452,8 @@ func (tsp *tailSamplingSpanProcessor) groupSpansByTraceKey(resourceSpans ptrace.
 }
 
 func (tsp *tailSamplingSpanProcessor) processTraces(resourceSpans ptrace.ResourceSpans) {
+	currTime := time.Now()
+
 	// Group spans per their traceId to minimize contention on idToTrace
 	idToSpansAndScope := tsp.groupSpansByTraceKey(resourceSpans)
 	var newTraceIDs int64
@@ -476,33 +483,35 @@ func (tsp *tailSamplingSpanProcessor) processTraces(resourceSpans ptrace.ResourceSpans) {
 		if !loaded {
 			spanCount := &atomic.Int64{}
 			spanCount.Store(lenSpans)
-			d, loaded = tsp.idToTrace.LoadOrStore(id, &sampling.TraceData{
-				ArrivalTime:     time.Now(),
+
+			td := &sampling.TraceData{
+				ArrivalTime:     currTime,
 				SpanCount:       spanCount,
 				ReceivedBatches: ptrace.NewTraces(),
-			})
+			}
+
+			if d, loaded = tsp.idToTrace.LoadOrStore(id, td); !loaded {
+				newTraceIDs++
+				tsp.decisionBatcher.AddToCurrentBatch(id)
+				tsp.numTracesOnMap.Add(1)
+				postDeletion := false
+				for !postDeletion {
+					select {
+					case tsp.deleteChan <- id:
+						postDeletion = true
+					default:
+						traceKeyToDrop := <-tsp.deleteChan
+						tsp.dropTrace(traceKeyToDrop, currTime)
+					}
+				}
+			}
 		}
+
 		actualData := d.(*sampling.TraceData)
 		if loaded {
 			actualData.SpanCount.Add(lenSpans)
-		} else {
-			newTraceIDs++
-			tsp.decisionBatcher.AddToCurrentBatch(id)
-			tsp.numTracesOnMap.Add(1)
-			postDeletion := false
-			currTime := time.Now()
-			for !postDeletion {
-				select {
-				case tsp.deleteChan <- id:
-					postDeletion = true
-				default:
-					traceKeyToDrop := <-tsp.deleteChan
-					tsp.dropTrace(traceKeyToDrop, currTime)
-				}
-			}
 		}
 
-		// The only thing we really care about here is the final decision.
 		actualData.Lock()
 		finalDecision := actualData.FinalDecision
 
@@ -510,25 +519,24 @@ func (tsp *tailSamplingSpanProcessor) processTraces(resourceSpans ptrace.ResourceSpans) {
 			// If the final decision hasn't been made, add the new spans under the lock.
 			appendToTraces(actualData.ReceivedBatches, resourceSpans, spans)
 			actualData.Unlock()
-		} else {
-			actualData.Unlock()
+			continue
+		}
 
-			switch finalDecision {
-			case sampling.Sampled:
-				// Forward the spans to the policy destinations
-				traceTd := ptrace.NewTraces()
-				appendToTraces(traceTd, resourceSpans, spans)
-				tsp.releaseSampledTrace(tsp.ctx, id, traceTd)
-			case sampling.NotSampled:
-				tsp.releaseNotSampledTrace(id)
-			default:
-				tsp.logger.Warn("Encountered unexpected sampling decision",
-					zap.Int("decision", int(finalDecision)))
-			}
+		actualData.Unlock()
 
-			if !actualData.DecisionTime.IsZero() {
-				tsp.telemetry.ProcessorTailSamplingSamplingLateSpanAge.Record(tsp.ctx, int64(time.Since(actualData.DecisionTime)/time.Second))
-			}
+		switch finalDecision {
+		case sampling.Sampled:
+			traceTd := ptrace.NewTraces()
+			appendToTraces(traceTd, resourceSpans, spans)
+			tsp.releaseSampledTrace(tsp.ctx, id, traceTd)
+		case sampling.NotSampled:
+			tsp.releaseNotSampledTrace(id)
+		default:
+			tsp.logger.Warn("Unexpected sampling decision", zap.Int("decision", int(finalDecision)))
+		}
+
+		if !actualData.DecisionTime.IsZero() {
+			tsp.telemetry.ProcessorTailSamplingSamplingLateSpanAge.Record(tsp.ctx, int64(time.Since(actualData.DecisionTime)/time.Second))
 		}