From a53537e3fabf27b3785b07442bb87d4cc12d3f9e Mon Sep 17 00:00:00 2001 From: "ryota.suzuki" Date: Sat, 11 Jan 2025 17:13:33 +0900 Subject: [PATCH] fix extract Extension --- .../otel/tracing/context_decorator.go | 18 +++++++++--------- actor/middleware/otel/tracing/extension.go | 9 +++++++++ .../otel/tracing/sender_middleware.go | 8 +++++++- actor/middleware/otel/tracing/span.go | 8 ++++++-- .../otel/tracing/spawn_middleware.go | 9 ++++++++- examples/opentelemetry-trace/go.mod | 3 ++- examples/opentelemetry-trace/go.sum | 11 ----------- examples/opentelemetry-trace/main.go | 7 ++++--- 8 files changed, 45 insertions(+), 28 deletions(-) diff --git a/actor/middleware/otel/tracing/context_decorator.go b/actor/middleware/otel/tracing/context_decorator.go index 885baa8b..1c9afcf9 100644 --- a/actor/middleware/otel/tracing/context_decorator.go +++ b/actor/middleware/otel/tracing/context_decorator.go @@ -21,7 +21,7 @@ type ActorContext struct { var _ actor.Context = (*ActorContext)(nil) func (ac *ActorContext) Receive(envelope *actor.MessageEnvelope) { - traceExt, ok := ac.ActorSystem().Extensions.Get(extensionID).(*TraceExtension) + traceExt, ok := ExtensionFromActorSystem(ac.ActorSystem()) if !ok { ac.Logger().Debug("TraceExtension not registered") ac.Context.Receive(envelope) @@ -69,7 +69,7 @@ func (ac *ActorContext) Receive(envelope *actor.MessageEnvelope) { } func (ac *ActorContext) Send(pid *actor.PID, message interface{}) { - traceExt, ok := ac.ActorSystem().Extensions.Get(extensionID).(*TraceExtension) + traceExt, ok := ExtensionFromActorSystem(ac.ActorSystem()) if !ok { ac.Logger().Debug("TraceExtension not registered") ac.Context.Send(pid, message) @@ -84,7 +84,7 @@ func (ac *ActorContext) Send(pid *actor.PID, message interface{}) { } func (ac *ActorContext) Request(pid *actor.PID, message interface{}) { - traceExt, ok := ac.ActorSystem().Extensions.Get(extensionID).(*TraceExtension) + traceExt, ok := ExtensionFromActorSystem(ac.ActorSystem()) if !ok { ac.Logger().Debug("TraceExtension not registered") ac.Context.Request(pid, message) @@ -99,7 +99,7 @@ func (ac *ActorContext) Request(pid *actor.PID, message interface{}) { } func (ac *ActorContext) RequestWithCustomSender(pid *actor.PID, message interface{}, sender *actor.PID) { - traceExt, ok := ac.ActorSystem().Extensions.Get(extensionID).(*TraceExtension) + traceExt, ok := ExtensionFromActorSystem(ac.ActorSystem()) if !ok { ac.Logger().Debug("TraceExtension not registered") ac.Context.RequestWithCustomSender(pid, message, sender) @@ -126,7 +126,7 @@ func messageToEnvelop(message interface{}, t *ActorContext, sender *actor.PID) * } func (ac *ActorContext) RequestFuture(pid *actor.PID, message interface{}, timeout time.Duration) *actor.Future { - traceExt, ok := ac.ActorSystem().Extensions.Get(extensionID).(*TraceExtension) + traceExt, ok := ExtensionFromActorSystem(ac.ActorSystem()) if !ok { ac.Logger().Debug("TraceExtension not registered") return ac.Context.RequestFuture(pid, message, timeout) @@ -142,7 +142,7 @@ func (ac *ActorContext) RequestFuture(pid *actor.PID, message interface{}, timeo } func (ac *ActorContext) Respond(response interface{}) { - traceExt, ok := ac.ActorSystem().Extensions.Get(extensionID).(*TraceExtension) + traceExt, ok := ExtensionFromActorSystem(ac.ActorSystem()) if !ok { ac.Logger().Debug("TraceExtension not registered") ac.Context.Respond(response) @@ -165,7 +165,7 @@ func ContextDecorator() func(next actor.ContextDecoratorFunc) actor.ContextDecor } func (ac *ActorContext) Spawn(props *actor.Props) *actor.PID { - traceExt, ok := ac.ActorSystem().Extensions.Get(extensionID).(*TraceExtension) + traceExt, ok := ExtensionFromActorSystem(ac.ActorSystem()) if !ok { ac.Logger().Debug("TraceExtension not registered") return ac.Context.Spawn(props) @@ -180,7 +180,7 @@ func (ac *ActorContext) Spawn(props *actor.Props) *actor.PID { } func (ac *ActorContext) SpawnPrefix(props *actor.Props, prefix string) *actor.PID { - traceExt, ok := ac.ActorSystem().Extensions.Get(extensionID).(*TraceExtension) + traceExt, ok := ExtensionFromActorSystem(ac.ActorSystem()) if !ok { ac.Logger().Debug("TraceExtension not registered") return ac.Context.SpawnPrefix(props, prefix) @@ -197,7 +197,7 @@ func (ac *ActorContext) SpawnPrefix(props *actor.Props, prefix string) *actor.PI } func (ac *ActorContext) SpawnNamed(props *actor.Props, id string) (*actor.PID, error) { - traceExt, ok := ac.ActorSystem().Extensions.Get(extensionID).(*TraceExtension) + traceExt, ok := ExtensionFromActorSystem(ac.ActorSystem()) if !ok { ac.Logger().Debug("TraceExtension not registered") return ac.Context.SpawnNamed(props, id) diff --git a/actor/middleware/otel/tracing/extension.go b/actor/middleware/otel/tracing/extension.go index 0ab3d38d..5b5a82f1 100644 --- a/actor/middleware/otel/tracing/extension.go +++ b/actor/middleware/otel/tracing/extension.go @@ -1,6 +1,7 @@ package tracing import ( + "github.com/asynkron/protoactor-go/actor" "github.com/asynkron/protoactor-go/extensions" "go.opentelemetry.io/otel/trace" ) @@ -15,6 +16,14 @@ func (ext *TraceExtension) Tracer() trace.Tracer { return ext.TracerProvider.Tracer("protoactor") } +func ExtensionFromActorSystem(system *actor.ActorSystem) (*TraceExtension, bool) { + t, ok := system.Extensions.Get(extensionID).(*TraceExtension) + if !ok { + return nil, false + } + return t, true +} + func NewTraceExtension( provider trace.TracerProvider, ) *TraceExtension { diff --git a/actor/middleware/otel/tracing/sender_middleware.go b/actor/middleware/otel/tracing/sender_middleware.go index 2f8bd1a3..a75975a9 100644 --- a/actor/middleware/otel/tracing/sender_middleware.go +++ b/actor/middleware/otel/tracing/sender_middleware.go @@ -36,6 +36,13 @@ func RootContextSenderMiddleware() actor.SenderMiddleware { return } + traceExt, ok := ExtensionFromActorSystem(c.ActorSystem()) + if !ok { + c.Logger().Debug("TraceExtension not registered") + next(c, target, envelope) + return + } + ctxWithParentSpan := context2.Background() spanContext, err := extractSpanContextFromSenderFuncArgs(c, envelope) if errors.Is(err, ErrSpanContextNotFound) { @@ -46,7 +53,6 @@ func RootContextSenderMiddleware() actor.SenderMiddleware { ctxWithParentSpan = trace.ContextWithSpanContext(ctxWithParentSpan, spanContext) } - traceExt := c.ActorSystem().Extensions.Get(extensionID).(*TraceExtension) ctxWithCurrentSpan, span := traceExt.Tracer().Start(ctxWithParentSpan, fmt.Sprintf("message_send/%T", envelope.Message)) defer span.End() span.SetAttributes(attribute.String("SenderActorPID", c.Self().String())) diff --git a/actor/middleware/otel/tracing/span.go b/actor/middleware/otel/tracing/span.go index 13b706e8..2bed7c2e 100644 --- a/actor/middleware/otel/tracing/span.go +++ b/actor/middleware/otel/tracing/span.go @@ -69,6 +69,10 @@ func setSpanContextToEnvelope(spanCtx trace.SpanContext, envelope *actor.Message } // TraceableRootContext creates a RootContext with tracing capabilities -func TraceableRootContext(rootContext actor.RootContext, spanContext trace.SpanContext) *actor.RootContext { - return rootContext.Copy().WithSenderMiddleware(RootContextSenderMiddleware()).WithSpawnMiddleware(RootContextSpawnMiddleware()).WithHeaders(MapFromSpanContext(spanContext)) +func TraceableRootContext(rootContext *actor.RootContext) *actor.RootContext { + return rootContext.Copy().WithSenderMiddleware(RootContextSenderMiddleware()).WithSpawnMiddleware(RootContextSpawnMiddleware()) +} + +func WithSpanRootContext(rootContext *actor.RootContext, span trace.Span) *actor.RootContext { + return rootContext.Copy().WithHeaders(MapFromSpanContext(span.SpanContext())) } diff --git a/actor/middleware/otel/tracing/spawn_middleware.go b/actor/middleware/otel/tracing/spawn_middleware.go index c1debef3..144465bf 100644 --- a/actor/middleware/otel/tracing/spawn_middleware.go +++ b/actor/middleware/otel/tracing/spawn_middleware.go @@ -14,6 +14,13 @@ import ( func rootContextSpawnMiddleware() actor.SpawnMiddleware { return func(next actor.SpawnFunc) actor.SpawnFunc { return func(actorSystem *actor.ActorSystem, id string, props *actor.Props, parentContext actor.SpawnerContext) (pid *actor.PID, e error) { + traceExt, ok := ExtensionFromActorSystem(actorSystem) + if !ok { + actorSystem.Logger().Debug("TraceExtension not registered") + pid, err := next(actorSystem, id, props, parentContext) + return pid, err + } + rootContext, ok := parentContext.(*actor.RootContext) if !ok { parentContext.Logger().Debug("Context is not rootContext", slog.Any("self", parentContext.Self())) @@ -34,7 +41,7 @@ func rootContextSpawnMiddleware() actor.SpawnMiddleware { ctxWithParentSpan = trace.ContextWithSpanContext(ctxWithParentSpan, spanCtx) } } - traceExt := actorSystem.Extensions.Get(extensionID).(*TraceExtension) + _, span := traceExt.Tracer().Start(ctxWithParentSpan, fmt.Sprintf("spawn/%s", id)) defer span.End() span.SetAttributes(attribute.String("SpawnActorPID", pid.String())) diff --git a/examples/opentelemetry-trace/go.mod b/examples/opentelemetry-trace/go.mod index 5662cef7..a3cca71b 100644 --- a/examples/opentelemetry-trace/go.mod +++ b/examples/opentelemetry-trace/go.mod @@ -11,6 +11,7 @@ require ( github.com/asynkron/protoactor-go v0.0.0-20240822202345-3c0e61ca19c9 go.opentelemetry.io/otel v1.33.0 go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.33.0 + go.opentelemetry.io/otel/sdk v1.33.0 ) @@ -33,8 +34,8 @@ require ( github.com/prometheus/procfs v0.11.1 // indirect github.com/twmb/murmur3 v1.1.8 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect + go.opentelemetry.io/otel/exporters/prometheus v0.44.0 // indirect go.opentelemetry.io/otel/metric v1.33.0 // indirect - go.opentelemetry.io/otel/sdk v1.33.0 // indirect go.opentelemetry.io/otel/sdk/metric v1.33.0 // indirect go.opentelemetry.io/otel/trace v1.33.0 // indirect golang.org/x/sys v0.28.0 // indirect diff --git a/examples/opentelemetry-trace/go.sum b/examples/opentelemetry-trace/go.sum index d7b41af7..a7a7e82a 100644 --- a/examples/opentelemetry-trace/go.sum +++ b/examples/opentelemetry-trace/go.sum @@ -1,5 +1,3 @@ -github.com/Workiva/go-datastructures v1.1.3 h1:LRdRrug9tEuKk7TGfz/sct5gjVj44G9pfqDt4qm7ghw= -github.com/Workiva/go-datastructures v1.1.3/go.mod h1:1yZL+zfsztete+ePzZz/Zb1/t5BnDuE2Ya2MMGhzP6A= github.com/Workiva/go-datastructures v1.1.5 h1:5YfhQ4ry7bZc2Mc7R0YZyYwpf5c6t1cEFvdAhd6Mkf4= github.com/Workiva/go-datastructures v1.1.5/go.mod h1:1yZL+zfsztete+ePzZz/Zb1/t5BnDuE2Ya2MMGhzP6A= github.com/asynkron/goconsole v0.0.0-20160504192649-bfa12eebf716 h1:SgyG4sXkrlalMoCfp20LiNPNhfJS7ez3opNdtihIxPc= @@ -19,12 +17,8 @@ github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ4 github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= -github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= -github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= -github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= -github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= @@ -73,8 +67,6 @@ go.opentelemetry.io/otel/metric v1.33.0 h1:r+JOocAyeRVXD8lZpjdQjzMadVZp2M4WmQ+5W go.opentelemetry.io/otel/metric v1.33.0/go.mod h1:L9+Fyctbp6HFTddIxClbQkjtubW6O9QS3Ann/M82u6M= go.opentelemetry.io/otel/sdk v1.33.0 h1:iax7M131HuAm9QkZotNHEfstof92xM+N8sr3uHXc2IM= go.opentelemetry.io/otel/sdk v1.33.0/go.mod h1:A1Q5oi7/9XaMlIWzPSxLRWOI8nG3FnzHJNbiENQuihM= -go.opentelemetry.io/otel/sdk/metric v1.21.0 h1:smhI5oD714d6jHE6Tie36fPx4WDFIg+Y6RfAY4ICcR0= -go.opentelemetry.io/otel/sdk/metric v1.21.0/go.mod h1:FJ8RAsoPGv/wYMgBdUJXOm+6pzFY3YdljnXtv1SBE8Q= go.opentelemetry.io/otel/sdk/metric v1.33.0 h1:Gs5VK9/WUJhNXZgn8MR6ITatvAmKeIuCtNbsP3JkNqU= go.opentelemetry.io/otel/sdk/metric v1.33.0/go.mod h1:dL5ykHZmm1B1nVRk9dDjChwDmt81MjVp3gLkQRwKf/Q= go.opentelemetry.io/otel/trace v1.33.0 h1:cCJuF7LRjUFso9LPnEAHJDB2pqzp+hbO8eu1qqW2d/s= @@ -101,10 +93,7 @@ golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/tools v0.0.0-20201022035929-9cf592e881e9/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= -google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/examples/opentelemetry-trace/main.go b/examples/opentelemetry-trace/main.go index ebd1ce7c..e76a03f3 100644 --- a/examples/opentelemetry-trace/main.go +++ b/examples/opentelemetry-trace/main.go @@ -34,6 +34,7 @@ func main() { traceProvider := sdktrace.NewTracerProvider( sdktrace.WithBatcher(traceConsoleExporter), ) + defer func() { err := traceProvider.Shutdown(context2.Background()) if err != nil { @@ -47,11 +48,11 @@ func main() { props := actor.PropsFromProducer(func() actor.Actor { return &helloActor{} }) - + root := tracing.TraceableRootContext(system.Root) otel.SetTracerProvider(traceProvider) - pid := system.Root.Spawn(props) - system.Root.Request(pid, &hello{Who: "with tracing"}) + pid := root.Spawn(props) + root.Request(pid, &hello{Who: "with tracing"}) time.Sleep(100 * time.Millisecond) _, _ = console.ReadLine() }