Skip to content

Commit ad7aa68

Browse files
lucaspimentelandrewlockJohan Gustafsson
authored
[Tracer] Fix Rabbitmq7 header injection overwrites user supplied basic properties [from external PR #6730] (#6753)
Based on #6730 by @johang88. ## Summary of changes Fixes the header injection so that basic properties are preserved from the supplied argument list in case `PopulateBasicPropertiesHeaders` returns null which happens if the basic properties argument is writable or if no changes had to be made to it. ## Reason for change Fixes a bug that would remove all user supplied basic properties. ## Implementation details Stores the basicproperties argument in the active scope and retrieves it a null return value is encountered. Changed `CachedBasicPropertiesHelper` to call the copy constructor that takes a `IReadOnlyBasicProperties` as argument. ## Test coverage Added basic properties to one of the rabbitmq7 test methods as well as assertions. ## Other details Fixes #6723 --------- Co-authored-by: Andrew Lock <[email protected]> Co-authored-by: Johan Gustafsson <[email protected]>
1 parent 07867ee commit ad7aa68

File tree

5 files changed

+202
-14
lines changed

5 files changed

+202
-14
lines changed
Lines changed: 35 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,51 @@
1-
// <copyright file="CachedBasicPropertiesHelper.cs" company="Datadog">
1+
// <copyright file="CachedBasicPropertiesHelper.cs" company="Datadog">
22
// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License.
33
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc.
44
// </copyright>
55

66
#nullable enable
7-
using Datadog.Trace.Util;
7+
using System;
8+
using System.Reflection.Emit;
9+
using Datadog.Trace.DuckTyping;
10+
using Datadog.Trace.Logging;
811

912
namespace Datadog.Trace.ClrProfiler.AutoInstrumentation.RabbitMQ;
1013

1114
internal static class CachedBasicPropertiesHelper<TBasicProperties>
1215
{
13-
// ReSharper disable once StaticMemberInGenericType
14-
private static readonly ActivatorHelper Activator;
16+
private static readonly Func<object, TBasicProperties>? Activator;
1517

1618
static CachedBasicPropertiesHelper()
1719
{
18-
Activator = new ActivatorHelper(typeof(TBasicProperties).Assembly.GetType("RabbitMQ.Client.BasicProperties")!);
20+
try
21+
{
22+
var targetType = typeof(TBasicProperties).Assembly.GetType("RabbitMQ.Client.BasicProperties")!;
23+
var parameterType = typeof(TBasicProperties).Assembly.GetType("RabbitMQ.Client.IReadOnlyBasicProperties")!;
24+
25+
var constructor = targetType.GetConstructor([parameterType])!;
26+
27+
var createBasicPropertiesMethod = new DynamicMethod(
28+
$"TypeActivator_{targetType.Name}_{parameterType.Name}",
29+
targetType,
30+
[typeof(object)],
31+
typeof(DuckType).Module,
32+
skipVisibility: true);
33+
34+
var il = createBasicPropertiesMethod.GetILGenerator();
35+
il.Emit(OpCodes.Ldarg_0); // Load first argument (as object).
36+
il.Emit(OpCodes.Castclass, parameterType); // Cast to IReadOnlyBasicProperties
37+
il.Emit(OpCodes.Newobj, constructor); // Call constructor
38+
il.Emit(OpCodes.Ret); // Return new instance
39+
40+
Activator = (Func<object, TBasicProperties>)createBasicPropertiesMethod.CreateDelegate(typeof(Func<object, TBasicProperties>));
41+
}
42+
catch (Exception ex)
43+
{
44+
var log = DatadogLogging.GetLoggerFor(typeof(CachedBasicPropertiesHelper<TBasicProperties>));
45+
log.Error(ex, "Failed to create activator for {TBasicProperties}", typeof(TBasicProperties).FullName);
46+
}
1947
}
2048

21-
public static TBasicProperties CreateHeaders()
22-
=> (TBasicProperties)Activator.CreateInstance();
49+
public static TBasicProperties? CreateHeaders(object readonlyBasicProperties)
50+
=> Activator is null ? default : Activator(readonlyBasicProperties);
2351
}

tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/RabbitMQ/PopulateBasicPropertiesHeadersIntegration.cs

Lines changed: 40 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// <copyright file="PopulateBasicPropertiesHeadersIntegration.cs" company="Datadog">
1+
// <copyright file="PopulateBasicPropertiesHeadersIntegration.cs" company="Datadog">
22
// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License.
33
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc.
44
// </copyright>
@@ -10,6 +10,7 @@
1010
using System.ComponentModel;
1111
using Datadog.Trace.ClrProfiler.CallTarget;
1212
using Datadog.Trace.DuckTyping;
13+
using Datadog.Trace.Logging;
1314
using Datadog.Trace.Propagators;
1415
using Datadog.Trace.Tagging;
1516

@@ -31,6 +32,13 @@ namespace Datadog.Trace.ClrProfiler.AutoInstrumentation.RabbitMQ;
3132
[EditorBrowsable(EditorBrowsableState.Never)]
3233
public class PopulateBasicPropertiesHeadersIntegration
3334
{
35+
private static readonly IDatadogLogger Log = DatadogLogging.GetLoggerFor<PopulateBasicPropertiesHeadersIntegration>();
36+
37+
internal static CallTargetState OnMethodBegin<TTarget, TBasicProperties, TActivity>(TTarget instance, TBasicProperties basicProperties, TActivity sendActivity, ulong publishSequenceNumber)
38+
{
39+
return new CallTargetState(null, basicProperties);
40+
}
41+
3442
internal static CallTargetReturn<TReturn?> OnMethodEnd<TTarget, TReturn>(TTarget instance, TReturn returnValue, Exception? exception, in CallTargetState state)
3543
{
3644
var tracer = Tracer.Instance;
@@ -45,8 +53,36 @@ public class PopulateBasicPropertiesHeadersIntegration
4553
return new CallTargetReturn<TReturn?>(returnValue);
4654
}
4755

48-
returnValue ??= CachedBasicPropertiesHelper<TReturn>.CreateHeaders();
49-
var duckType = returnValue.DuckCast<IBasicProperties>()!;
56+
// TReturn is type RabbitMQ.Client.BasicProperties
57+
TReturn? basicProperties = default;
58+
59+
// PopulateBasicPropertiesHeaders returns null if the supplied IReadOnlyBasicProperties
60+
// does not have to be modified or if it's a writable instance.
61+
// If that is the case then we have to fetch IReadOnlyBasicProperties from the argument
62+
// list instead of creating a new instance that overwrites the supplied properties.
63+
if (returnValue is null)
64+
{
65+
// state.State is of type RabbitMQ.Client.IReadOnlyBasicProperties
66+
if (state.State is TReturn writable)
67+
{
68+
// Use the existing BasicProperties if it's already of type RabbitMQ.Client.BasicProperties
69+
basicProperties = writable;
70+
}
71+
else if (state.State is null)
72+
{
73+
// This case cannot happen as argument is not nullable.
74+
Log.Warning("Invalid state: PopulateBasicPropertiesHeaders() is returning null and CallTargetState.State has type {Type}", state.State?.GetType().FullName ?? "null");
75+
return new CallTargetReturn<TReturn?>(returnValue);
76+
}
77+
else
78+
{
79+
// create new BasicProperties using the BasicProperties(IReadOnlyBasicProperties) copy constructor
80+
basicProperties = CachedBasicPropertiesHelper<TReturn>.CreateHeaders(state.State!);
81+
}
82+
}
83+
84+
// duck cast so we can access the Headers property
85+
var duckType = basicProperties.DuckCast<IBasicProperties>()!;
5086

5187
// add distributed tracing headers to the message
5288
duckType.Headers ??= new Dictionary<string, object>();
@@ -61,6 +97,6 @@ public class PopulateBasicPropertiesHeadersIntegration
6197
duckType.Headers,
6298
int.TryParse(tags.MessageSize, out var bodyLength) ? bodyLength : 0);
6399

64-
return new CallTargetReturn<TReturn?>(returnValue);
100+
return new CallTargetReturn<TReturn?>(basicProperties);
65101
}
66102
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
// <copyright file="CachedBasicPropertiesHelperTests.cs" company="Datadog">
2+
// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License.
3+
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc.
4+
// </copyright>
5+
6+
using System;
7+
using System.Collections.Generic;
8+
using Datadog.Trace.ClrProfiler.AutoInstrumentation.RabbitMQ;
9+
using FluentAssertions;
10+
using RabbitMQ.Client;
11+
using Xunit;
12+
13+
namespace Datadog.Trace.ClrProfiler.Managed.Tests.AutoInstrumentation.RabbitMQ;
14+
15+
public class CachedBasicPropertiesHelperTests
16+
{
17+
[Fact]
18+
public void CreateHeaders_With_Null_BasicProperties_Throws()
19+
{
20+
// the BasicProperties(IReadOnlyBasicProperties) constructor throws NullReferenceException if parameter is null
21+
var func = () => CachedBasicPropertiesHelper<BasicProperties>.CreateHeaders(null!);
22+
func.Should().ThrowExactly<NullReferenceException>();
23+
}
24+
25+
[Fact]
26+
public void CreateHeaders_BasicProperties_With_Null_Headers()
27+
{
28+
var originalProperties = new BasicProperties();
29+
30+
var newProperties = CachedBasicPropertiesHelper<BasicProperties>.CreateHeaders(originalProperties);
31+
32+
newProperties.Should().NotBeNull();
33+
newProperties.Headers.Should().BeNull();
34+
}
35+
36+
[Fact]
37+
public void CreateHeaders_BasicProperties_With_Empty_Headers()
38+
{
39+
var headers = new Dictionary<string, object>();
40+
41+
var originalProperties = new BasicProperties
42+
{
43+
Headers = headers
44+
};
45+
46+
var newProperties = CachedBasicPropertiesHelper<BasicProperties>.CreateHeaders(originalProperties);
47+
48+
newProperties.Should().NotBeNull();
49+
newProperties.Headers.Should().BeSameAs(originalProperties.Headers)
50+
.And.Subject.Should().BeEmpty();
51+
}
52+
53+
[Fact]
54+
public void CreateHeaders_BasicProperties_With_Headers()
55+
{
56+
var originalProperties = new BasicProperties();
57+
originalProperties.Headers ??= new Dictionary<string, object>();
58+
originalProperties.Headers["key1"] = "value1";
59+
60+
var newProperties = CachedBasicPropertiesHelper<BasicProperties>.CreateHeaders(originalProperties);
61+
62+
newProperties.Should().NotBeNull();
63+
newProperties.Headers.Should().BeSameAs(originalProperties.Headers)
64+
.And.Subject.Should().Contain("key1", "value1");
65+
}
66+
}

tracer/test/Datadog.Trace.ClrProfiler.Managed.Tests/Datadog.Trace.ClrProfiler.Managed.Tests.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
<PackageReference Include="log4net" Version="$(Log4NetVersion)" />
2727
<PackageReference Include="Newtonsoft.Json" Version="13.0.2" />
2828
<PackageReference Include="NLog" Version="$(NLogVersion)" Condition=" $(TargetFramework.StartsWith('net4'))" />
29+
<PackageReference Include="RabbitMQ.Client" Version="7.1.1" />
2930
</ItemGroup>
3031

3132
<ItemGroup>

tracer/test/test-applications/integrations/Samples.RabbitMQ/Program.cs

Lines changed: 60 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,11 @@
1111
#if RABBITMQ_7_0
1212
using IRabbitChannel = RabbitMQ.Client.IChannel;
1313
using IRabbitConsumer = RabbitMQ.Client.IAsyncBasicConsumer;
14+
using RabbitProperties = RabbitMQ.Client.BasicProperties;
1415
#else
1516
using IRabbitChannel = RabbitMQ.Client.IModel;
1617
using IRabbitConsumer = RabbitMQ.Client.IBasicConsumer;
18+
using RabbitProperties = RabbitMQ.Client.IBasicProperties;
1719
#endif
1820

1921
namespace Samples.RabbitMQ
@@ -26,6 +28,7 @@ public static class Program
2628
private static readonly string exchangeName = "test-exchange-name";
2729
private static readonly string routingKey = "test-routing-key";
2830
private static readonly string queueName = "test-queue-name";
31+
private static readonly string customHeaderName = "x-custom-header";
2932

3033
private static string Host()
3134
{
@@ -77,6 +80,8 @@ private static async Task PublishAndGet(string consumerType, bool useDefaultQueu
7780
string publishExchangeName;
7881
string publishQueueName;
7982
string publishRoutingKey;
83+
string messageId = Guid.NewGuid().ToString();
84+
string headerValue = Guid.NewGuid().ToString();
8085

8186
using (SampleHelpers.CreateScope(messagePrefix))
8287
{
@@ -103,13 +108,20 @@ private static async Task PublishAndGet(string consumerType, bool useDefaultQueu
103108
// Test an empty BasicGetResult
104109
await Helper.BasicGetAsync(channel, publishQueueName);
105110

111+
// Setup basic properties to verify instrumentation preserves properties and headers.
112+
var properties = Helper.CreateBasicProperties(channel);
113+
properties.MessageId = messageId;
114+
properties.Headers = new Dictionary<string, object>
115+
{
116+
{ customHeaderName, headerValue }
117+
};
118+
106119
// Send message to the default exchange and use new queue as the routingKey
107120
string message = $"{messagePrefix} - Message";
108121
var body = Encoding.UTF8.GetBytes(message);
109122

110-
await Helper.BasicPublishAsync(channel, exchange: publishExchangeName,
111-
routingKey: publishRoutingKey,
112-
body: body);
123+
await Helper.BasicPublishAsync(channel, publishExchangeName, publishRoutingKey, body, properties);
124+
113125
Console.WriteLine($"BasicPublish - Sent message: {message}");
114126
}
115127

@@ -123,6 +135,19 @@ await Helper.BasicPublishAsync(channel, exchange: publishExchangeName,
123135
var resultMessage = Encoding.UTF8.GetString(result.Body);
124136
#endif
125137
Console.WriteLine($"[Program.PublishAndGetDefault] BasicGet - Received message: {resultMessage}");
138+
139+
if (result.BasicProperties.MessageId != messageId)
140+
{
141+
throw new Exception("MessageId was not preserved in BasicProperties");
142+
}
143+
144+
if (result.BasicProperties.Headers is null ||
145+
!result.BasicProperties.Headers.TryGetValue(customHeaderName, out var receivedHeaderValue) ||
146+
receivedHeaderValue is not byte[] receivedHeaderValueString ||
147+
Encoding.UTF8.GetString(receivedHeaderValueString) != headerValue)
148+
{
149+
throw new Exception("Custom header was not preserved in BasicProperties");
150+
}
126151
}
127152
}
128153

@@ -466,13 +491,45 @@ public static Task BasicPublishAsync(IRabbitChannel channel, string exchange, st
466491
#endif
467492
}
468493

494+
#if RABBITMQ_6_0 || RABBITMQ_7_0
495+
public static Task BasicPublishAsync(IRabbitChannel channel, string exchange, string routingKey, ReadOnlyMemory<byte> body, RabbitProperties basicProperties = null)
496+
#else
497+
public static Task BasicPublishAsync(IRabbitChannel channel, string exchange, string routingKey, byte[] body, RabbitProperties basicProperties = null)
498+
#endif
499+
{
500+
#if RABBITMQ_7_0
501+
return channel.BasicPublishAsync(
502+
exchange: exchange,
503+
routingKey: routingKey,
504+
mandatory: false,
505+
basicProperties: basicProperties,
506+
body: body)
507+
.AsTask();
508+
#else
509+
channel.BasicPublish(exchange: exchange,
510+
routingKey: routingKey,
511+
basicProperties: basicProperties,
512+
body: body);
513+
return Task.CompletedTask;
514+
#endif
515+
}
516+
469517
public static Task BasicConsumeAsync(IRabbitChannel channel, string queue, IRabbitConsumer consumer)
470518
{
471519
#if RABBITMQ_7_0
472520
return channel.BasicConsumeAsync(queue, autoAck: true, consumer);
473521
#else
474522
channel.BasicConsume(queue, true, consumer);
475523
return Task.CompletedTask;
524+
#endif
525+
}
526+
527+
public static RabbitProperties CreateBasicProperties(IRabbitChannel channel)
528+
{
529+
#if RABBITMQ_7_0
530+
return new BasicProperties();
531+
#else
532+
return channel.CreateBasicProperties();
476533
#endif
477534
}
478535
}

0 commit comments

Comments
 (0)