From ea01456a734de8d664d222563a66c375948da907 Mon Sep 17 00:00:00 2001 From: Varshi Bachu Date: Fri, 13 Dec 2024 16:05:10 -0800 Subject: [PATCH 1/4] initial commit --- .../LocalGrpcListener.cs | 15 +++++++++++++++ .../FunctionsDurableTaskClient.cs | 7 +++++++ 2 files changed, 22 insertions(+) diff --git a/src/WebJobs.Extensions.DurableTask/LocalGrpcListener.cs b/src/WebJobs.Extensions.DurableTask/LocalGrpcListener.cs index fea9a0ffd..dfc5a1fe8 100644 --- a/src/WebJobs.Extensions.DurableTask/LocalGrpcListener.cs +++ b/src/WebJobs.Extensions.DurableTask/LocalGrpcListener.cs @@ -3,6 +3,7 @@ #nullable enable using System; using System.Collections.Generic; +using System.Diagnostics; using System.IO; using System.Reflection.Metadata.Ecma335; using System.Threading; @@ -154,8 +155,22 @@ public override Task Hello(Empty request, ServerCallContext context) { try { + ActivitySource activityTraceSource = new ActivitySource("DurableTask.WebJobs"); + + Activity? newActivity = activityTraceSource.CreateActivity("gRPC start orchestration", kind: ActivityKind.Producer); + + if (newActivity != null) + { + newActivity.SetParentId(request.ParentTraceContext.TraceParent); + } + + newActivity?.Start(); + string instanceId = await this.GetClient(context).StartNewAsync( request.Name, request.InstanceId, Raw(request.Input)); + + newActivity?.Stop(); + return new P.CreateInstanceResponse { InstanceId = instanceId, diff --git a/src/Worker.Extensions.DurableTask/FunctionsDurableTaskClient.cs b/src/Worker.Extensions.DurableTask/FunctionsDurableTaskClient.cs index 3c919362d..b5b5736a1 100644 --- a/src/Worker.Extensions.DurableTask/FunctionsDurableTaskClient.cs +++ b/src/Worker.Extensions.DurableTask/FunctionsDurableTaskClient.cs @@ -1,6 +1,7 @@ // Copyright (c) .NET Foundation. All rights reserved. // Licensed under the MIT License. See License.txt in the project root for license information. +using System.Diagnostics; using System.Threading; using System.Threading.Tasks; using Microsoft.DurableTask; @@ -76,6 +77,12 @@ public override Task ScheduleNewOrchestrationInstanceAsync( StartOrchestrationOptions? options = null, CancellationToken cancellation = default) { + Activity? currActivity = Activity.Current; + if (currActivity != null) + { + options = new StartOrchestrationOptions(ParentTraceId: currActivity?.Id?.ToString()); + } + return this.inner.ScheduleNewOrchestrationInstanceAsync(orchestratorName, input, options, cancellation); } From 312f31b006c6eef9b93febb358cff591b62fcfed Mon Sep 17 00:00:00 2001 From: Varshi Bachu Date: Mon, 16 Dec 2024 18:10:36 -0800 Subject: [PATCH 2/4] update activitykind --- src/WebJobs.Extensions.DurableTask/LocalGrpcListener.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/WebJobs.Extensions.DurableTask/LocalGrpcListener.cs b/src/WebJobs.Extensions.DurableTask/LocalGrpcListener.cs index dfc5a1fe8..be84e6791 100644 --- a/src/WebJobs.Extensions.DurableTask/LocalGrpcListener.cs +++ b/src/WebJobs.Extensions.DurableTask/LocalGrpcListener.cs @@ -157,7 +157,7 @@ public override Task Hello(Empty request, ServerCallContext context) { ActivitySource activityTraceSource = new ActivitySource("DurableTask.WebJobs"); - Activity? newActivity = activityTraceSource.CreateActivity("gRPC start orchestration", kind: ActivityKind.Producer); + Activity? newActivity = activityTraceSource.CreateActivity("gRPC start orchestration", kind: ActivityKind.Server); if (newActivity != null) { From 15999ad03544b4edaba19d16856995e53d04c726 Mon Sep 17 00:00:00 2001 From: Varshi Bachu Date: Mon, 16 Dec 2024 23:45:04 -0800 Subject: [PATCH 3/4] updated condition to create StartOrchestrationOptions and set parent trace id --- src/Worker.Extensions.DurableTask/FunctionsDurableTaskClient.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Worker.Extensions.DurableTask/FunctionsDurableTaskClient.cs b/src/Worker.Extensions.DurableTask/FunctionsDurableTaskClient.cs index b5b5736a1..ebc4eb1a1 100644 --- a/src/Worker.Extensions.DurableTask/FunctionsDurableTaskClient.cs +++ b/src/Worker.Extensions.DurableTask/FunctionsDurableTaskClient.cs @@ -78,7 +78,7 @@ public override Task ScheduleNewOrchestrationInstanceAsync( CancellationToken cancellation = default) { Activity? currActivity = Activity.Current; - if (currActivity != null) + if (options == null && options?.ParentTraceId == null && currActivity != null) { options = new StartOrchestrationOptions(ParentTraceId: currActivity?.Id?.ToString()); } From e805effea865de7fe497cb20d53d88932fd38479 Mon Sep 17 00:00:00 2001 From: Varshitha Bachu Date: Wed, 15 Jan 2025 11:51:46 -0800 Subject: [PATCH 4/4] used reflection to rehydrate a shim Activity in LocalGrpcListener StartInstance() --- .../LocalGrpcListener.cs | 25 ++++++++++++++----- .../FunctionsDurableTaskClient.cs | 5 ---- 2 files changed, 19 insertions(+), 11 deletions(-) diff --git a/src/WebJobs.Extensions.DurableTask/LocalGrpcListener.cs b/src/WebJobs.Extensions.DurableTask/LocalGrpcListener.cs index be84e6791..d2aa294f3 100644 --- a/src/WebJobs.Extensions.DurableTask/LocalGrpcListener.cs +++ b/src/WebJobs.Extensions.DurableTask/LocalGrpcListener.cs @@ -5,6 +5,7 @@ using System.Collections.Generic; using System.Diagnostics; using System.IO; +using System.Reflection; using System.Reflection.Metadata.Ecma335; using System.Threading; using System.Threading.Tasks; @@ -155,21 +156,33 @@ public override Task Hello(Empty request, ServerCallContext context) { try { - ActivitySource activityTraceSource = new ActivitySource("DurableTask.WebJobs"); + Activity? newActivity = new Activity("gRPC start orchestration"); - Activity? newActivity = activityTraceSource.CreateActivity("gRPC start orchestration", kind: ActivityKind.Server); + string traceParentContext = request.ParentTraceContext.TraceParent.ToString(); - if (newActivity != null) + string[] traceParentContextSplit = traceParentContext.Split('-'); + + if (traceParentContextSplit.Length == 4) { - newActivity.SetParentId(request.ParentTraceContext.TraceParent); + string traceId = traceParentContextSplit[1]; + string spanId = traceParentContextSplit[2]; + + // reflection to set trace id and span id of newActivity + if (newActivity != null) + { + typeof(Activity).GetField("_traceId", BindingFlags.Instance | BindingFlags.NonPublic)?.SetValue(newActivity, traceId); + typeof(Activity).GetField("_spanId", BindingFlags.Instance | BindingFlags.NonPublic)?.SetValue(newActivity, spanId); + } } - newActivity?.Start(); + // set Activity.Current as the new Activity, start the orchestration, and then reset Activity.Current to the previous Activity.Current + Activity? currActivity = Activity.Current; + Activity.Current = newActivity; string instanceId = await this.GetClient(context).StartNewAsync( request.Name, request.InstanceId, Raw(request.Input)); - newActivity?.Stop(); + Activity.Current = currActivity; return new P.CreateInstanceResponse { diff --git a/src/Worker.Extensions.DurableTask/FunctionsDurableTaskClient.cs b/src/Worker.Extensions.DurableTask/FunctionsDurableTaskClient.cs index ebc4eb1a1..ec79aba96 100644 --- a/src/Worker.Extensions.DurableTask/FunctionsDurableTaskClient.cs +++ b/src/Worker.Extensions.DurableTask/FunctionsDurableTaskClient.cs @@ -77,11 +77,6 @@ public override Task ScheduleNewOrchestrationInstanceAsync( StartOrchestrationOptions? options = null, CancellationToken cancellation = default) { - Activity? currActivity = Activity.Current; - if (options == null && options?.ParentTraceId == null && currActivity != null) - { - options = new StartOrchestrationOptions(ParentTraceId: currActivity?.Id?.ToString()); - } return this.inner.ScheduleNewOrchestrationInstanceAsync(orchestratorName, input, options, cancellation); }