Skip to content

Commit

Permalink
support server.address span attributes for amqp
Browse files Browse the repository at this point in the history
  • Loading branch information
mirackara committed Aug 27, 2024
1 parent 4ae7fb9 commit 139cd89
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 10 deletions.
10 changes: 7 additions & 3 deletions v3/integrations/nramqp/examples/publisher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,15 @@ type amqpServer struct {
ch *amqp.Channel
exchange string
routingKey string
url string
}

func NewServer(channel *amqp.Channel, exchangeName, routingKeyName string) *amqpServer {
func NewServer(channel *amqp.Channel, exchangeName, routingKeyName string, url string) *amqpServer {
return &amqpServer{
channel,
exchangeName,
routingKeyName,
url,
}
}

Expand All @@ -65,6 +67,7 @@ func (serv *amqpServer) publishPlainTxtMessage(w http.ResponseWriter, r *http.Re
ctx,
serv.exchange, // exchange
serv.routingKey, // routing key
serv.url, // url
false, // mandatory
false, // immediate
amqp.Publishing{
Expand Down Expand Up @@ -94,7 +97,8 @@ func main() {

nrApp.WaitForConnection(time.Second * 5)

conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
amqpURL := "amqp://guest:guest@localhost:5672/"
conn, err := amqp.Dial(amqpURL)
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()

Expand All @@ -112,7 +116,7 @@ func main() {
)
failOnError(err, "Failed to declare a queue")

server := NewServer(ch, "", q.Name)
server := NewServer(ch, "", q.Name, amqpURL)

http.HandleFunc(newrelic.WrapHandleFunc(nrApp, "/", server.index))
http.HandleFunc(newrelic.WrapHandleFunc(nrApp, "/message", server.publishPlainTxtMessage))
Expand Down
15 changes: 8 additions & 7 deletions v3/integrations/nramqp/nramqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ const (

func init() { internal.TrackUsage("integration", "messagebroker", "nramqp") }

func creatProducerSegment(exchange, key string) *newrelic.MessageProducerSegment {
func createProducerSegment(exchange, key string) *newrelic.MessageProducerSegment {
s := newrelic.MessageProducerSegment{
Library: RabbitMQLibrary,
DestinationName: "Default",
Expand All @@ -35,11 +35,11 @@ func creatProducerSegment(exchange, key string) *newrelic.MessageProducerSegment

// PublishedWithContext looks for a newrelic transaction in the context object, and if found, creates a message producer segment.
// It will also inject distributed tracing headers into the message.
func PublishWithContext(ch *amqp.Channel, ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error {
func PublishWithContext(ch *amqp.Channel, ctx context.Context, exchange, key, url string, mandatory, immediate bool, msg amqp.Publishing) error {
txn := newrelic.FromContext(ctx)
if txn != nil {
// generate message broker segment
s := creatProducerSegment(exchange, key)
s := createProducerSegment(exchange, key)

// capture telemetry for AMQP producer
if msg.Headers != nil && len(msg.Headers) > 0 {
Expand All @@ -49,15 +49,16 @@ func PublishWithContext(ch *amqp.Channel, ctx context.Context, exchange, key str
}
integrationsupport.AddAgentSpanAttribute(txn, newrelic.AttributeMessageHeaders, hdrStr)
}
s.StartTime = txn.StartSegmentNow()

// inject DT headers into headers object
msg.Headers = injectDtHeaders(txn, msg.Headers)

integrationsupport.AddAgentSpanAttribute(txn, newrelic.SpanAttributeServerAddress, url)
integrationsupport.AddAgentSpanAttribute(txn, newrelic.AttributeMessageRoutingKey, key)
integrationsupport.AddAgentSpanAttribute(txn, newrelic.AttributeMessageCorrelationID, msg.CorrelationId)
integrationsupport.AddAgentSpanAttribute(txn, newrelic.AttributeMessageReplyTo, msg.ReplyTo)

// inject DT headers into headers object
msg.Headers = injectDtHeaders(txn, msg.Headers)

s.StartTime = txn.StartSegmentNow()
err := ch.PublishWithContext(ctx, exchange, key, mandatory, immediate, msg)
s.End()
return err
Expand Down
1 change: 1 addition & 0 deletions v3/newrelic/attributes.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ const (
// cfg.SpanEvents.Attributes.Exclude = append(cfg.SpanEvents.Attributes.Exclude,
// newrelic.SpanAttributeDBStatement)
const (
SpanAttributeServerAddress = "server.address"
SpanAttributeDBStatement = "db.statement"
SpanAttributeDBInstance = "db.instance"
SpanAttributeDBCollection = "db.collection"
Expand Down
1 change: 1 addition & 0 deletions v3/newrelic/attributes_from_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ var (
AttributeLLM: usualDests,

// Span specific attributes
SpanAttributeServerAddress: usualDests,
SpanAttributeDBStatement: usualDests,
SpanAttributeDBInstance: usualDests,
SpanAttributeDBCollection: usualDests,
Expand Down

0 comments on commit 139cd89

Please sign in to comment.