Skip to content

Commit

Permalink
#453: replace brave by micrometer tracing. update some 3rd party depe…
Browse files Browse the repository at this point in the history
…ndencies
  • Loading branch information
Christian Ohr committed Jul 15, 2024
1 parent a9e5ecb commit 70d4cd7
Show file tree
Hide file tree
Showing 16 changed files with 222 additions and 161 deletions.
9 changes: 7 additions & 2 deletions commons/ihe/hl7v2/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,15 @@
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.zipkin.brave</groupId>
<artifactId>brave</artifactId>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-tracing</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-tracing-bridge-brave</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@

package org.openehealth.ipf.commons.ihe.hl7v2.tracing

import brave.SpanCustomizer
import ca.uhn.hl7v2.model.Message
import io.micrometer.tracing.SpanCustomizer

/**
* @author Christian Ohr
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@

package org.openehealth.ipf.commons.ihe.hl7v2.tracing

import brave.propagation.Propagation

import ca.uhn.hl7v2.model.AbstractMessage
import io.micrometer.tracing.propagation.Propagator
import org.slf4j.Logger
import org.slf4j.LoggerFactory

Expand All @@ -26,7 +27,7 @@ import org.slf4j.LoggerFactory
*
* @author Christian Ohr
*/
class Hl7MessageGetter implements Propagation.Getter<AbstractMessage, String> {
class Hl7MessageGetter implements Propagator.Getter<AbstractMessage> {

private static final Logger LOG = LoggerFactory.getLogger(Hl7MessageGetter)
private final String segmentName
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@

package org.openehealth.ipf.commons.ihe.hl7v2.tracing

import brave.propagation.Propagation

import ca.uhn.hl7v2.model.Composite
import ca.uhn.hl7v2.model.Message
import io.micrometer.tracing.propagation.Propagator
import org.openehealth.ipf.modules.hl7.dsl.Repeatable
import org.slf4j.Logger
import org.slf4j.LoggerFactory
Expand All @@ -28,7 +29,7 @@ import org.slf4j.LoggerFactory
*
* @author Christian Ohr
*/
class Hl7MessageSetter implements Propagation.Setter<Message, String> {
class Hl7MessageSetter implements Propagator.Setter<Message> {

private static final Logger LOG = LoggerFactory.getLogger(Hl7MessageSetter)

Expand All @@ -43,12 +44,13 @@ class Hl7MessageSetter implements Propagation.Setter<Message, String> {
}

@Override
void put(Message msg, String key, String value) {
void set(Message msg, String key, String value) {
if (msg && key) {
def qip = Composite.QIP(msg) //, [segmentFieldName: key, values: value ?: ''])
qip[1] = key
qip[2] = value ?: ''
def varies = nextRepetition(msg.get(segmentName)[1])
def seg = msg.get(segmentName)
def varies = nextRepetition(seg[1])
varies.data = qip

if (LOG.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,12 @@

package org.openehealth.ipf.commons.ihe.hl7v2.tracing

import brave.Span
import brave.Tracer
import brave.Tracing
import brave.propagation.Propagation
import ca.uhn.hl7v2.HL7Exception
import ca.uhn.hl7v2.model.AbstractMessage
import ca.uhn.hl7v2.model.Message
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import io.micrometer.tracing.Span
import io.micrometer.tracing.Tracer
import io.micrometer.tracing.propagation.Propagator

/**
* Helper class that injects and extracts tracing information from HL7 messages
Expand All @@ -34,88 +31,76 @@ import org.slf4j.LoggerFactory
*/
class MessageTracer {

private static final Logger LOG = LoggerFactory.getLogger(MessageTracer.class)

private static final String HL7_SENDING_APPLICATION = "MSH-3"
private static final String HL7_SENDING_FACILITY = "MSH-4"
private static final String HL7_MESSAGE_TYPE = "MSH-9-1"
private static final String HL7_TRIGGER_EVENT = "MSH-9-2"
private static final String HL7_PROCESSING_ID = "MSH-11"

private final Tracing tracing
private final Tracer tracer
private final boolean removeSegment
private final String segmentName
private final Propagation.Setter<Message, String> setter
private final Propagation.Getter<Message, String> getter
private final Propagator propagator;
private final Propagator.Setter<Message> setter
private final Propagator.Getter<Message> getter

/**
* @param tracing Tracing instance
* @param tracer Tracer instance, e.g. BraveTracer
* @param propagator Propagator instance, e.g. BravePropagator
* @param segmentName name of the segment with the propagated tracing information (default: ZTR)
* @param removeSegment whether the segment with the propagated tracing information is removed (default: true)
*/
MessageTracer(Tracing tracing, String segmentName = 'ZTR', boolean removeSegment = true) {
this.tracing = tracing
MessageTracer(Tracer tracer, Propagator propagator, String segmentName = 'ZTR', boolean removeSegment = true) {
this.tracer = tracer
this.removeSegment = removeSegment
this.segmentName = segmentName
this.setter = new Hl7MessageSetter(segmentName)
this.getter = new Hl7MessageGetter(segmentName)
this.propagator = propagator;
}

void sendMessage(Message msg, String name, Handler sender) {
Tracer tracer = tracing.tracer()
Span span = startSpan(tracer.nextSpan(), Span.Kind.CLIENT, name, msg)
def span = startSpan(tracer.spanBuilder(), Span.Kind.CLIENT, name, msg)
msg.addNonstandardSegment(segmentName)
tracing.propagation()
.injector(setter)
.inject(span.context(), msg)
Tracer.SpanInScope ws = tracer.withSpanInScope(span)
try {
try (def ws = this.tracer.withSpan(span)) {
propagator.inject(span.context(), msg, setter)
sender.accept(msg, span)
} catch (Throwable t) {
span.error(t)
throw t
} finally {
ws?.close()
span?.finish()
span.end()
}
}

void receiveMessage(Message msg, String name, Handler receiver) {
Tracer tracer = tracing.tracer()
Span span = startSpan(tracer.nextSpan(
tracing.propagation()
.extractor(getter)
.extract(msg)),
Span.Kind.SERVER, name, msg)
Tracer.SpanInScope ws = tracer.withSpanInScope(span)
try {
def span = startSpan(propagator.extract(msg, getter), Span.Kind.SERVER, name, msg)
try (def ws = tracer.withSpan(span)) {
if (removeSegment && msg instanceof AbstractMessage) {
try {
msg.removeRepetition(segmentName, 0)
} catch (HL7Exception ignored) {
// TODO LOG something?
}
}
receiver.accept(msg, span)
} catch (Throwable t) {
span.error(t)
throw t
} finally {
ws?.close()
span?.finish()
span.end()
}
}

private static Span startSpan(Span span, Span.Kind kind, String name, Message msg) {
span.kind(kind)
.name(name)
.tag(HL7_SENDING_APPLICATION, msg.MSH[3]?.value ?: '')
.tag(HL7_SENDING_FACILITY, msg.MSH[4]?.value ?: '')
.tag(HL7_MESSAGE_TYPE, msg.MSH[9][1]?.value ?: '')
.tag(HL7_TRIGGER_EVENT, msg.MSH[9][2]?.value ?: '')
.tag(HL7_PROCESSING_ID, msg.MSH[11]?.value ?: '')
// ExtraFieldPropagation.set(span.context(), 'messageId', msg.MSH[11]?.value ?: '')
span.start()
private static Span startSpan(Span.Builder spanBuilder, Span.Kind kind, String name, Message msg) {
spanBuilder
.name(name)
.kind(kind)
.tag(HL7_SENDING_APPLICATION, msg.MSH[3]?.value ?: '')
.tag(HL7_SENDING_FACILITY, msg.MSH[4]?.value ?: '')
.tag(HL7_MESSAGE_TYPE, msg.MSH[9][1]?.value ?: '')
.tag(HL7_TRIGGER_EVENT, msg.MSH[9][2]?.value ?: '')
.tag(HL7_PROCESSING_ID, msg.MSH[11]?.value ?: '')
.start()
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ class Hl7MessageSetterGetterTest {
Message r01 = MessageUtils.makeMessage(CONTEXT, 'ORU', 'R01','2.5')
r01.addNonstandardSegment('ZTR')
Hl7MessageSetter setter = new Hl7MessageSetter()
setter.put(r01, 'key1', 'value1')
setter.put(r01, 'key2', 'value2')
setter.put(r01, 'key3', 'value~with^reserved|characters')
setter.set(r01, 'key1', 'value1')
setter.set(r01, 'key2', 'value2')
setter.set(r01, 'key3', 'value~with^reserved|characters')

assertNotNull(r01.get('ZTR'))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,24 @@

package org.openehealth.ipf.commons.ihe.hl7v2.tracing

import brave.SpanCustomizer
import brave.Span
import brave.Tracing
import brave.context.slf4j.MDCScopeDecorator
import brave.handler.MutableSpan
import brave.handler.SpanHandler
import brave.propagation.ThreadLocalCurrentTraceContext
import brave.propagation.TraceContext
import ca.uhn.hl7v2.HapiContext
import ca.uhn.hl7v2.model.Message
import io.micrometer.tracing.SpanCustomizer
import io.micrometer.tracing.brave.bridge.BraveCurrentTraceContext
import io.micrometer.tracing.brave.bridge.BravePropagator
import io.micrometer.tracing.brave.bridge.BraveTracer
import org.junit.jupiter.api.Test
import org.openehealth.ipf.commons.ihe.hl7v2.definitions.HapiContextFactory
import org.openehealth.ipf.modules.hl7.message.MessageUtils
import zipkin2.Span
import zipkin2.reporter.Reporter

import static org.junit.jupiter.api.Assertions.assertEquals
import static org.junit.jupiter.api.Assertions.assertNotEquals
import static org.junit.jupiter.api.Assertions.assertFalse
import static org.junit.jupiter.api.Assertions.assertTrue
import static org.junit.jupiter.api.Assertions.*

/**
* @author Christian Ohr
Expand All @@ -41,12 +45,24 @@ class MessageTracerTest {
@Test
void traceMessage() {
MockReporter reporter = new MockReporter()
Tracing tracing = Tracing.newBuilder()

// Brave setup
def braveCurrentTraceContext = ThreadLocalCurrentTraceContext.newBuilder()
.addScopeDecorator(MDCScopeDecorator.get()) // Example of Brave's automatic MDC setup
.build();
def tracing = Tracing.newBuilder()
.localServiceName('MessageTracerTest')
.spanReporter(reporter)
.addSpanHandler(reporter)
.build()
MessageTracer messageTracer = new MessageTracer(tracing)
Message sending = MessageUtils.makeMessage(CONTEXT, 'ORU', 'R01', '2.5')
def braveTracer = tracing.tracer();

// Micrometer Brave Bridge
def propagator = new BravePropagator(tracing)
def bridgeContext = new BraveCurrentTraceContext(braveCurrentTraceContext);
def tracer = new BraveTracer(braveTracer, bridgeContext);

def messageTracer = new MessageTracer(tracer, propagator)
def sending = MessageUtils.makeMessage(CONTEXT, 'ORU', 'R01', '2.5')

messageTracer.sendMessage(sending, "producer", new Handler() {
@Override
Expand All @@ -62,29 +78,30 @@ class MessageTracerTest {
})

// Check a few things
List<zipkin2.Span> spans = reporter.getSpans()
def spans = reporter.getSpans()
assertEquals(2, spans.size())

Span clientSpan = reporter.spans.find { span -> span.kind() == Span.Kind.CLIENT}
Span serverSpan = reporter.spans.find { span -> span.kind() == Span.Kind.SERVER}
def clientSpan = reporter.spans.find { span -> span.kind() == Span.Kind.CLIENT}
def serverSpan = reporter.spans.find { span -> span.kind() == Span.Kind.SERVER}
assertFalse(clientSpan.tags().isEmpty())
assertEquals(clientSpan.tags(), serverSpan.tags())
assertEquals(new HashMap<>(clientSpan.tags()), new HashMap<>(serverSpan.tags()))
assertNotEquals(clientSpan.id(), serverSpan.id())
assertEquals(clientSpan.id(), serverSpan.parentId())
assertTrue(clientSpan.durationAsLong() > serverSpan.durationAsLong())
}

private static final class MockReporter implements Reporter<zipkin2.Span> {
class MockReporter extends SpanHandler {

private List<zipkin2.Span> spans = new ArrayList<>()
private List<MutableSpan> spans = new ArrayList<>();

@Override
void report(zipkin2.Span span) {
spans.add(span)
boolean end(TraceContext context, MutableSpan span, Cause cause) {
spans.add(span);
return super.end(context, span, cause)
}

List<zipkin2.Span> getSpans() {
List<MutableSpan> getSpans() {
return spans
}
}

}
Loading

0 comments on commit 70d4cd7

Please sign in to comment.