Skip to content

Commit

Permalink
Distributed Tracing: Fixes traceid null exception issue (#4111)
Browse files Browse the repository at this point in the history
* Fix traceid null exception issue

* Fixing merge conflicts

* Fixing merge conflicts

* Update script

* Code cleanup

* Updated change description

* updated comment description

* updated comment description

---------

Co-authored-by: Matias Quaranta <[email protected]>
  • Loading branch information
aavasthy and ealsur authored Oct 10, 2023
1 parent e2fb347 commit 4e2beda
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ public static OpenTelemetryCoreRecorder CreateRecorder(string operationName,
DiagnosticScope scope = LazyOperationScopeFactory.Value.CreateScope(name: operationName,
kind: clientContext.ClientOptions.ConnectionMode == ConnectionMode.Gateway ? DiagnosticScope.ActivityKind.Internal : DiagnosticScope.ActivityKind.Client);

// Record values only when we have a valid Diagnostic Scope
// Need a parent activity id associated with the operation which is logged in diagnostics and used for tracing purpose.
// If there are listeners at operation level then scope is enabled and it tries to create activity.
// However, if available listeners are not subscribed to operation level event then it will lead to scope being enabled but no activity is created.
if (scope.IsEnabled)
{
scope.SetDisplayName($"{operationName} {containerName}");
Expand All @@ -63,14 +65,28 @@ public static OpenTelemetryCoreRecorder CreateRecorder(string operationName,
config: requestOptions?.CosmosThresholdOptions ?? clientContext.ClientOptions?.CosmosClientTelemetryOptions.CosmosThresholdOptions);
}
#if !INTERNAL
else if (Activity.Current is null)
// Need a parent activity which groups all network activities under it and is logged in diagnostics and used for tracing purpose.
// If there are listeners at network level then scope is enabled and it tries to create activity.
// However, if available listeners are not subscribed to network event then it will lead to scope being enabled but no activity is created.
else
{
DiagnosticScope requestScope = LazyNetworkScopeFactory.Value.CreateScope(name: operationName);
openTelemetryRecorder = requestScope.IsEnabled ? OpenTelemetryCoreRecorder.CreateNetworkLevelParentActivity(networkScope: requestScope) : openTelemetryRecorder;
}

openTelemetryRecorder = requestScope.IsEnabled ? OpenTelemetryCoreRecorder.CreateNetworkLevelParentActivity(networkScope: requestScope) : OpenTelemetryCoreRecorder.CreateParentActivity(operationName);
// If there are no listeners at operation level and network level and no parent activity created.
// Then create a dummy activity as there should be a parent level activity always to send a traceid to the backend services through context propagation.
// The parent activity id is logged in diagnostics and used for tracing purpose.
if (Activity.Current is null)
{
openTelemetryRecorder = OpenTelemetryCoreRecorder.CreateParentActivity(operationName);
}
#endif
trace.AddDatum("DistributedTraceId", Activity.Current?.TraceId);
// Safety check as diagnostic logs should not break the code.
if (Activity.Current?.TraceId != null)
{
trace.AddDatum("DistributedTraceId", Activity.Current.TraceId);
}
}
return openTelemetryRecorder;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ namespace Microsoft.Azure.Cosmos
using System.Diagnostics;
using Microsoft.Azure.Cosmos.Tracing;
using System.Net.Http;
using Microsoft.Azure.Cosmos.Tests;

[VisualStudio.TestTools.UnitTesting.TestClass]
public sealed class DistributedTracingOTelTests : BaseCosmosClientHelper
Expand All @@ -33,7 +34,8 @@ public void TestInitialize()
[DataTestMethod]
[DataRow($"{OpenTelemetryAttributeKeys.DiagnosticNamespace}.Operation", $"{OpenTelemetryAttributeKeys.DiagnosticNamespace}.Request", DisplayName = "DirectMode and DistributedFlag On: Asserts activity creation at operation and network level with Diagnostic TraceId being added to logs")]
[DataRow($"{OpenTelemetryAttributeKeys.DiagnosticNamespace}.Operation", null, DisplayName = "DirectMode and DistributedFlag On: Asserts activity creation at operation level with Diagnostic TraceId being added to logs")]
public async Task SourceEnabled_FlagOn_DirectMode_RecordsActivity_AssertLogTraceId_AssertTraceparent(string operationLevelSource, string networkLevelSource)
[DataRow(null, $"{OpenTelemetryAttributeKeys.DiagnosticNamespace}.Request", DisplayName = "DirectMode and DistributedFlag On: Asserts activity creation at network level with Diagnostic TraceId being added to logs")]
public async Task SourceEnabled_FlagOn_DirectMode_RecordsOperationNetworkActivity_AssertLogTraceId_AssertTraceparent(string operationLevelSource, string networkLevelSource)
{
string[] sources = new string[] { operationLevelSource, networkLevelSource };
sources = sources.Where(x => x != null).ToArray();
Expand All @@ -43,12 +45,12 @@ public async Task SourceEnabled_FlagOn_DirectMode_RecordsActivity_AssertLogTrace
.AddSource(sources)
.Build();

await base.TestInit(validateSinglePartitionKeyRangeCacheCall: false,
await base.TestInit(validateSinglePartitionKeyRangeCacheCall: false,
customizeClientBuilder: (builder) => builder
.WithClientTelemetryOptions(new CosmosClientTelemetryOptions()
{
{
DisableDistributedTracing = false
})
})
.WithConnectionModeDirect());

Container containerResponse = await this.database.CreateContainerAsync(
Expand Down Expand Up @@ -106,7 +108,8 @@ await base.TestInit(validateSinglePartitionKeyRangeCacheCall: false,
[DataTestMethod]
[DataRow($"{OpenTelemetryAttributeKeys.DiagnosticNamespace}.Operation", $"{OpenTelemetryAttributeKeys.DiagnosticNamespace}.Request", DisplayName = "GatewayMode and DistributedFlag On: Asserts activity creation at operation and network level with Diagnostic TraceId being added to logs")]
[DataRow($"{OpenTelemetryAttributeKeys.DiagnosticNamespace}.Operation", null, DisplayName = "GatewayMode and DistributedFlag On: Asserts activity creation at operation level with Diagnostic TraceId being added to logs")]
public async Task SourceEnabled_FlagOn_GatewayMode_RecordsActivity_AssertLogTraceId_AssertTraceparent(string operationLevelSource, string networkLevelSource)
[DataRow(null, $"{OpenTelemetryAttributeKeys.DiagnosticNamespace}.Request", DisplayName = "GatewayMode and DistributedFlag On: Asserts activity creation at network level with Diagnostic TraceId being added to logs")]
public async Task SourceEnabled_FlagOn_GatewayMode_RecordsOperationNetworkActivity_AssertLogTraceId_AssertTraceparent(string operationLevelSource, string networkLevelSource)
{
string[] sources = new string[] { operationLevelSource, networkLevelSource };
sources = sources.Where(x => x != null).ToArray();
Expand All @@ -128,12 +131,12 @@ public async Task SourceEnabled_FlagOn_GatewayMode_RecordsActivity_AssertLogTrac
.AddSource(sources)
.Build();

await base.TestInit(validateSinglePartitionKeyRangeCacheCall: false,
await base.TestInit(validateSinglePartitionKeyRangeCacheCall: false,
customizeClientBuilder: (builder) => builder
.WithClientTelemetryOptions(new CosmosClientTelemetryOptions()
{
{
DisableDistributedTracing = false
})
})
.WithHttpClientFactory(() => new HttpClient(httpClientHandlerHelper))
.WithConnectionModeGateway());

Expand All @@ -142,7 +145,6 @@ await base.TestInit(validateSinglePartitionKeyRangeCacheCall: false,
partitionKeyPath: "/id",
throughput: 20000);

List<Activity> b = CustomOtelExporter.CollectedActivities.ToList();
//Assert traceId in Diagnostics logs
string diagnosticsCreateContainer = containerResponse.Diagnostics.ToString();
JObject objDiagnosticsCreate = JObject.Parse(diagnosticsCreateContainer);
Expand All @@ -169,7 +171,7 @@ await base.TestInit(validateSinglePartitionKeyRangeCacheCall: false,
[DataRow(true, true, "random.source.name", DisplayName = "GatewayMode, DistributedFlag Off, Random/No Source:Asserts no activity creation")]
[DataRow(false, true, $"{OpenTelemetryAttributeKeys.DiagnosticNamespace}.Operation", DisplayName = "DirectMode, DistributedFlag Off, OperationLevel Source:Asserts no activity creation")]
[DataRow(true, true, $"{OpenTelemetryAttributeKeys.DiagnosticNamespace}.Operation", DisplayName = "GatewayMode, DistributedFlag Off, OperationLevel Source:Asserts no activity creation")]
public async Task NoSourceEnabled_ResultsInNoSourceParentActivityCreation_AssertLogTraceId(bool useGateway, bool disableDistributingTracing, string source)
public async Task NoSourceNoFlagEnabled_ResultsInNoOperationNetworkActivityCreation_AssertLogTraceId(bool useGateway, bool disableDistributingTracing, string source)
{
using TracerProvider provider = Sdk.CreateTracerProviderBuilder()
.AddCustomOtelExporter()
Expand All @@ -178,20 +180,20 @@ public async Task NoSourceEnabled_ResultsInNoSourceParentActivityCreation_Assert

if (useGateway)
{
await base.TestInit(validateSinglePartitionKeyRangeCacheCall: false,
await base.TestInit(validateSinglePartitionKeyRangeCacheCall: false,
customizeClientBuilder: (builder) => builder
.WithClientTelemetryOptions(new CosmosClientTelemetryOptions()
{
{
DisableDistributedTracing = disableDistributingTracing
})
.WithConnectionModeGateway());
}
else
{
await base.TestInit(validateSinglePartitionKeyRangeCacheCall: false,
await base.TestInit(validateSinglePartitionKeyRangeCacheCall: false,
customizeClientBuilder: (builder) => builder
.WithClientTelemetryOptions(new CosmosClientTelemetryOptions()
{
{
DisableDistributedTracing = disableDistributingTracing
}));
}
Expand Down Expand Up @@ -221,6 +223,49 @@ await base.TestInit(validateSinglePartitionKeyRangeCacheCall: false,
Assert.AreEqual(0, CustomOtelExporter.CollectedActivities.Count());
}


[DataTestMethod]
[DataRow(false)]
[DataRow(true)]
public async Task SuppressListenerEvents_ResultsInNoScopeActivityCreation_AssertTraceIdNotNull(bool useGateway)
{
// Initialize CustomListener with suppression
CustomListener customListener = new CustomListener($"{OpenTelemetryAttributeKeys.DiagnosticNamespace}.*", "Azure-Cosmos-Operation-Request-Diagnostics", true);

if (useGateway)
{
await base.TestInit(validateSinglePartitionKeyRangeCacheCall: false,
customizeClientBuilder: (builder) => builder
.WithClientTelemetryOptions(new CosmosClientTelemetryOptions()
{
DisableDistributedTracing = false
})
.WithConnectionModeGateway());
}
else
{
await base.TestInit(validateSinglePartitionKeyRangeCacheCall: false,
customizeClientBuilder: (builder) => builder
.WithClientTelemetryOptions(new CosmosClientTelemetryOptions()
{
DisableDistributedTracing = false
}));
}

ContainerResponse containerResponse = await this.database.CreateContainerAsync(
id: Guid.NewGuid().ToString(),
partitionKeyPath: "/id",
throughput: 20000);

// Assert traceId in Diagnostics logs
string diagnosticsCreateContainer = containerResponse.Diagnostics.ToString();
JObject objDiagnosticsCreate = JObject.Parse(diagnosticsCreateContainer);
Assert.IsNotNull(objDiagnosticsCreate["data"]["DistributedTraceId"], "Distributed Trace Id has value in diagnostics i.e. " + (string)objDiagnosticsCreate["data"]["DistributedTraceId"]);

// Cleanup
customListener.Dispose();
}

[TestCleanup]
public async Task CleanUp()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,9 @@ public static void AreEqualAcrossListeners()
{
Assert.AreEqual(
JsonConvert.SerializeObject(CustomListener.CollectedOperationActivities.OrderBy(x => x.Id)),
JsonConvert.SerializeObject(CustomOtelExporter.CollectedActivities.OrderBy(x => x.Id)));
JsonConvert.SerializeObject(CustomOtelExporter.CollectedActivities
.Where(activity => activity.Source.Name == $"{OpenTelemetryAttributeKeys.DiagnosticNamespace}.Operation")
.OrderBy(x => x.Id)));
}

private static void AssertDatabaseAndContainerName(string name, KeyValuePair<string, string> tag)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ internal class CustomListener :
{
private readonly Func<string, bool> sourceNameFilter;
private readonly string eventName;

private readonly bool suppressAllEvents;

private ConcurrentBag<IDisposable> subscriptions = new();
private ConcurrentBag<ProducedDiagnosticScope> Scopes { get; } = new();

Expand All @@ -36,21 +37,22 @@ internal class CustomListener :

private static List<EventSource> EventSources { set; get; } = new();

public CustomListener(string name, string eventName)
: this(n => Regex.Match(n, name).Success, eventName)
public CustomListener(string name, string eventName, bool suppressAllEvents = false)
: this(n => Regex.Match(n, name).Success, eventName, suppressAllEvents)
{
}

public CustomListener(Func<string, bool> filter, string eventName)
public CustomListener(Func<string, bool> filter, string eventName, bool suppressAllEvents = false)
{
this.sourceNameFilter = filter;
this.eventName = eventName;
this.suppressAllEvents = suppressAllEvents;

foreach (EventSource eventSource in EventSources)
{
this.OnEventSourceCreated(eventSource);
}

DiagnosticListener.AllListeners.Subscribe(this);
}

Expand Down Expand Up @@ -149,7 +151,15 @@ public void OnNext(DiagnosticListener value)
{
lock (this.Scopes)
{
this.subscriptions?.Add(value.Subscribe(this));
IDisposable subscriber = value.Subscribe(this, isEnabled: (name) =>
{
if (this.suppressAllEvents)
{
return false;
}
return true;
});
this.subscriptions?.Add(subscriber);
}
}
}
Expand All @@ -159,7 +169,12 @@ public void OnNext(DiagnosticListener value)
/// </summary>
protected override void OnEventSourceCreated(EventSource eventSource)
{
if(this.eventName == null)
if (this.eventName == null)
{
EventSources.Add(eventSource);
}

if (this.eventName == null)
{
EventSources.Add(eventSource);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ public override ExportResult Export(in Batch<Activity> batch)

foreach (Activity activity in batch)
{
if (string.Equals(activity.Source.Name, $"{OpenTelemetryAttributeKeys.DiagnosticNamespace}.Operation", StringComparison.OrdinalIgnoreCase))
if (string.Equals(activity.Source.Name, $"{OpenTelemetryAttributeKeys.DiagnosticNamespace}.Operation", StringComparison.OrdinalIgnoreCase)
|| string.Equals(activity.Source.Name, $"{OpenTelemetryAttributeKeys.DiagnosticNamespace}.Request", StringComparison.OrdinalIgnoreCase))
{
AssertActivity.IsValidOperationActivity(activity);

Expand Down

0 comments on commit 4e2beda

Please sign in to comment.