Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[azservicebus] Enable distributed tracing #23860

Draft
wants to merge 22 commits into
base: main
Choose a base branch
from

Conversation

karenychen
Copy link
Contributor

@karenychen karenychen commented Dec 11, 2024

  • The purpose of this PR is explained in this or a referenced issue.
  • The PR does not update generated files.
  • Tests are included and/or updated for code changes.
  • Updates to module CHANGELOG.md are included.
  • MIT license headers are included in each file.

@github-actions github-actions bot added Community Contribution Community members are working on the issue customer-reported Issues that are reported by GitHub users external to the Azure organization. Service Bus labels Dec 11, 2024
Copy link

Thank you for your contribution @karenychen! We will review the pull request and get back to you soon.

@karenychen
Copy link
Contributor Author

Hi @lmolkova ! I had a small question regarding diagnostic-id in https://learn.microsoft.com/en-us/azure/service-bus-messaging/service-bus-end-to-end-tracing?tabs=net-standard-sdk-2

The .NET SDK seems to be hooking them up to a ReceiveMessages trace when the users set the diagnostic-id and linking the list of diagnostic ids from the messages to the Receive trace (code here). I might be misunderstanding how .NET is doing it, but I am wondering what we enable with the diagnostic-ids?

Copy link
Member

@richardpark-msft richardpark-msft left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks great so far, got some questions for the other experts in the crowd, but from an SB perspective it looks great.

}

func getSpanAttributesForMessage(message *Message) []tracing.Attribute {
attrs := []tracing.Attribute{}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
attrs := []tracing.Attribute{}
var attrs []tracing.Attribute

I swear there's some linter that complains if you pre-init the slice and it's not technically needed anyways.

ctx, endSpan := s.startSpan(ctx, "ScheduleAMQPAnnotatedMessages", tracing.ScheduleOperationName,
tracing.Attribute{Key: tracing.BatchMessageCount, Value: int64(len(messages))},
)
defer func() { endSpan(err) }()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jhendrixMSFT, would it be worth building this pattern (via a callback, probably) into the tracing library? It can be internal, but it seems like everyone's going to do the "last error gets passed to endSpan before block ends" pattern.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If not, @karenychen, we can build a helper function - maybe we'd stick it right in the retry function to make things easier since we're passing very similar information to both.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it goes in the sdk/internal module?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Synced with Richard offline -- we are moving this to the Retry() layer :)

@@ -186,10 +187,11 @@ func deleteSubscription(t *testing.T, ac *admin.Client, topicName string, subscr
// and fails tests otherwise.
func peekSingleMessageForTest(t *testing.T, receiver *Receiver) *ReceivedMessage {
var msg *ReceivedMessage
// TODO
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

General question: is it possible for me to test the traces in outside of the local unit tests too? Are there instructions on how I can run the live tests (and potentially the stress tests)?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can run the same suite of live tests. I need to update the sample.env to list them properly, but for now you can see the ones we use here: https://github.com/karenychen/azure-sdk-for-go/blob/bd50b2a1e4c72b1d22ba11314d315d939796c201/sdk/messaging/azservicebus/internal/test/test_helpers.go#L81

Just create a .env file and place it in the azservicebus folder, and run go test.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, you can add /azp run go - azservicebus as a comment on your PR to run it as part of your CI.

ctx, endSpan := s.startSpan(ctx, "ScheduleAMQPAnnotatedMessages", tracing.ScheduleOperationName,
tracing.Attribute{Key: tracing.BatchMessageCount, Value: int64(len(messages))},
)
defer func() { endSpan(err) }()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Synced with Richard offline -- we are moving this to the Retry() layer :)

Tracer() tracing.Tracer

// SetTracer sets the tracer for the AMQPLinks instance.
SetTracer(tracing.Tracer)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does AMQPLinks needs to own a Tracer object or should it just be passed in as an argument to each function call?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For this I am open to your preference :) Currently the tracer starts a span at the Retry() function level. So we can either have it in the amqpLink layer, or keep it in the Sender/Receiver layer and passing it as an argument to each function call all the way down to the Retry() function level. Which option do you think is more appropriate here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

passing it as an argument to each function call all the way down to the Retry() function level

I'd prefer this, just to eliminate any potential race conditions with state.

I know the argument list is getting pretty gnarly with Retry(), and we can work on that (separately).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me know if it gets too gnarly though.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated to move the tracers 1 level up. Now they live in Sender, Receiver and Namespace

@@ -433,7 +434,7 @@ func (ns *Namespace) startNegotiateClaimRenewer(ctx context.Context,
return
case <-time.After(nextClaimAt):
for {
err := utils.Retry(refreshCtx, exported.EventAuth, "NegotiateClaimRefresh", func(ctx context.Context, args *utils.RetryFnArgs) error {
err := utils.Retry(refreshCtx, tracing.NewNoOpTracer(), exported.EventAuth, "NegotiateClaimRefresh", func(ctx context.Context, args *utils.RetryFnArgs) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the NoOpTracer here temporary or is there a reason we shouldn't trace this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The claim negotiation is not outlined in the Otel conventions for SB, so I left it to a no-op tracer https://opentelemetry.io/docs/specs/semconv/messaging/azure-messaging/

Definitely open to adding it if we want to support it though

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm. I think we could consider it a request, just not user initiated. We can figure it out later, but maybe we could file a separate issue so we pick it up later.

It's definitely useful info.

Copy link
Contributor Author

@karenychen karenychen Jan 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah! Just noticed we can use tracing.SpanKind to denote that it is an internal span.

https://github.com/Azure/azure-sdk-for-go/blob/af2aacb0bf5b03231cba3fdc08e330469f297cd4/sdk/azcore/tracing/constants.go#L7C1-L27C2

Updated the code to include this everywhere: Sender spans will have Producer span kind, and Receiver spans will have Consumer span kind. The NegotiateClaim function has the Internal span kind.


type SetAttributesFn func([]Attribute) []Attribute

func NewSpanConfig(name SpanName, options ...SetAttributesFn) *SpanConfig {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this indirection (for SetAttributesFn) is a bit odd - it looks like attributes ...Attribute would cover what we use it for, and helps a bit in showing that's the only thing we use it for.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

switched this one up a bit to directly return list of attributes, let me know what you think!

@@ -361,6 +369,12 @@ func (r *Receiver) DeadLetterMessage(ctx context.Context, message *ReceivedMessa
}

func (r *Receiver) receiveMessagesImpl(ctx context.Context, maxMessages int, options *ReceiveMessagesOptions) ([]*ReceivedMessage, error) {
var err error
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you move this span code into ReceiveMessages it'll be less tricky since you'll only have a single spot where you need to log the error, and we don't have to be wary of assigning err in all the code here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated!

Copy link
Member

@richardpark-msft richardpark-msft left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's coming along really well. I left some comments, but I'm really liking it.

Copy link
Member

@lmolkova lmolkova left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank! Left some comments on naming and following conventions more closely

spanKind = SpanKindConsumer
}

return runtime.StartSpan(ctx, string(options.SpanName), options.Tracer,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in messaging (and other area where conventions are defined) we follow OTel semantic conventions and should use span name documented there - it would be {messaging.operation.name} {queue | topic name}, e.g. send orders

spanKind = SpanKindProducer
} else if spanCaller == "Receiver" || spanCaller == "SessionReceiver" {
spanKind = SpanKindConsumer
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@@ -93,9 +95,18 @@ type SendMessageBatchOptions struct {
// Message batches can be created using [Sender.NewMessageBatch].
// If the operation fails it can return an [*azservicebus.Error] type if the failure is actionable.
func (s *Sender) SendMessageBatch(ctx context.Context, batch *MessageBatch, options *SendMessageBatchOptions) error {
to := &tracing.TracerOptions{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so for batch we actually need to

  • create a producer span per message - this is where we'd inject context to each message
  • create a client send span and add links to individual message contexts

Copy link
Member

@serbrech serbrech Jan 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

err := s.links.Retry(ctx, EventSender, "SendMessageBatch", func(ctx context.Context, lwid *internal.LinksWithID, args *utils.RetryFnArgs) error {
return lwid.Sender.Send(ctx, batch.toAMQPMessage(), nil)
}, RetryOptions(s.retryOptions))
}, RetryOptions(s.retryOptions), to)

return internal.TransformError(err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

@karenychen karenychen Jan 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: we should set the errors.type attribute in the azcore/runtime package

I will make a PR for that alongside the Span Linking but cc @jhendrixMSFT for viz

func scheduleMessages[T amqpCompatibleMessage](ctx context.Context, tracer tracing.Tracer, links internal.AMQPLinks, retryOptions RetryOptions, messages []T, scheduledEnqueueTime time.Time) ([]int64, error) {
to := &tracing.TracerOptions{
Tracer: tracer,
SpanName: tracing.ScheduleSpanName,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SB-specific operation names (language-agnostic) are documented here https://github.com/open-telemetry/semantic-conventions/blob/main/docs/messaging/azure-messaging.md

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a span validator used to validate spans in unit tests. I will make a separate PR on it but putting it here for reference

if s == nil {
return internal.NewErrNonRetriable("messages that are received in `ReceiveModeReceiveAndDelete` mode are not settleable")
}

if to != nil {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are setting the tracer here so we don't hit a nil-pointer dereference in cases where messageSettler is set to nil in a receive-and-delete sender

@azure-sdk
Copy link
Collaborator

API change check

API changes are not detected in this pull request.

1 similar comment
@azure-sdk
Copy link
Collaborator

API change check

API changes are not detected in this pull request.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Community Contribution Community members are working on the issue customer-reported Issues that are reported by GitHub users external to the Azure organization. Service Bus
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants