Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add helper methods for manual spans in mutiny pipelines #45478

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions docs/src/main/asciidoc/opentelemetry-tracing.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,60 @@ public void tracedWork() {
}
----

brunobat marked this conversation as resolved.
Show resolved Hide resolved
=== Mutiny
Methods returning reactive types can also be annotated with `@WithSpan` and `@AddingSpanAttributes` to create a new span or add attributes to the current span.

If you need to create spans manually within a mutiny pipeline, use `wrapWithSpan` method from `io.quarkus.opentelemetry.runtime.tracing.mutiny.MutinyTracingHelper`.

Example. Assuming you have the following pipeline:
[source,java]
----
Uni<String> uni = Uni.createFrom().item("hello")
//start trace here
.onItem().transform(item -> item + " world")
.onItem().transform(item -> item + "!")
//end trace here
.subscribe().with(
item -> System.out.println("Received: " + item),
failure -> System.out.println("Failed with " + failure)
);
----
wrap it like this:
[source,java]
----
import static io.quarkus.opentelemetry.runtime.tracing.mutiny.MutinyTracingHelper.wrapWithSpan;
...
@Inject
Tracer tracer;
...
Context context = Context.current();
Uni<String> uni = Uni.createFrom().item("hello")
.transformToUni(m -> wrapWithSpan(tracer, Optional.of(context), "my-span-name",
Uni.createFrom().item(m)
.onItem().transform(item -> item + " world")
.onItem().transform(item -> item + "!")
))
.subscribe().with(
item -> System.out.println("Received: " + item),
failure -> System.out.println("Failed with " + failure)
);

----
for multi-pipelines it works similarly:
[source,java]
----
Multi.createFrom().items("Alice", "Bob", "Charlie")
.transformToMultiAndConcatenate(m -> TracingHelper.withTrace("my-span-name",
Multi.createFrom().item(m)
.onItem().transform(name -> "Hello " + name)
))
.subscribe().with(
item -> System.out.println("Received: " + item),
failure -> System.out.println("Failed with " + failure)
);
----
Instead of `transformToMultiAndConcatenate` you can use `transformToMultiAndMerge` if you don't care about the order of the items.

=== Quarkus Messaging - Kafka

When using the Quarkus Messaging extension for Kafka,
Expand Down
11 changes: 11 additions & 0 deletions extensions/opentelemetry/runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,11 @@
<artifactId>quarkus-junit5-internal</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-mutiny-vertx-junit5</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
Expand All @@ -211,6 +216,12 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-testing</artifactId>
<version>1.42.1</version>
brunobat marked this conversation as resolved.
Show resolved Hide resolved
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package io.quarkus.opentelemetry.runtime.tracing.mutiny;

import java.util.Deque;
import java.util.LinkedList;
import java.util.Optional;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Scope;
import io.quarkus.opentelemetry.runtime.QuarkusContextStorage;
import io.smallrye.mutiny.Uni;
import io.vertx.core.Context;

public class MutinyTracingHelper {

private static final String SPAN_STACK = "SPAN_STACK";

/**
* Wraps the given pipeline with a span with the given name. Ensures that subspans find the current span as context,
* by running on a duplicated context. The span will be closed when the pipeline completes.
* If there is already a span in the current context, it will be used as parent for the new span.
* <p>
* Use as follows:
* Given this existing pipeline:
* ```java
* Uni.createFrom().item("Hello")
* .onItem().transform(s -> s + " World")
* .subscribe().with(System.out::println);
* ```
* wrap like this:
* ```java
* Uni.createFrom().item("Hello")
* .onItem().transformToUni(s -> wrapWithSpan(tracer, "mySpan", Uni.createFrom().item(s + " World")))
* .subscribe().with(System.out::println);
* ```
* <p>
* it also works with multi:
* ```java
* Multi.createFrom().items("Alice", "Bob", "Charlie")
* .onItem().transform(name -> "Hello " + name)
* .subscribe().with(System.out::println);
* ```
* wrap like this:
* ```java
* Multi.createFrom().items("Alice", "Bob", "Charlie")
* .onItem().transformToUni(s -> wrapWithSpan(tracer, "mySpan", Uni.createFrom().item("Hello " + s)
* .onItem().transform(name -> "Hello " + name)
* ))
* .subscribe().with(System.out::println);
* ```
*
* @param <T> the type of the result of the pipeline
* @param spanName
* the name of the span that should be created
* @param pipeline
* the pipeline to run within the span
*
* @return the result of the pipeline
*/
public static <T> Uni<T> wrapWithSpan(final Tracer tracer, final String spanName, final Uni<T> pipeline) {

return wrapWithSpan(tracer, Optional.of(io.opentelemetry.context.Context.current()), spanName, pipeline);
}

/**
* see {@link #wrapWithSpan(Tracer, String, Uni)}
* use this method if you manually want to specify the parent context of the new span
* or if you want to ensure the new span is a root span.
*
* @param <T>
* @param parentContext
* the parent context to use for the new span. If empty, a new root span will be created.
* @param spanName
* the name of the span that should be created
* @param pipeline
* the pipeline to run within the span
*
* @return the result of the pipeline
*/
public static <T> Uni<T> wrapWithSpan(final Tracer tracer,
final Optional<io.opentelemetry.context.Context> parentContext,
final String spanName, final Uni<T> pipeline) {

//creates duplicate context, if the current context is not a duplicated one and not null
//Otherwise returns the current context or null
final Context context = QuarkusContextStorage.getVertxContext();

return Uni.createFrom().voidItem()
.emitOn(runnable -> {
if (context != null) {
context.runOnContext(v -> runnable.run());
} else {
runnable.run();
}
})
.withContext((uni, ctx) -> {
return uni
.invoke(m -> startSpan(tracer, parentContext, ctx, spanName))
.replaceWith(pipeline)
.eventually(() -> endSpanCloseScope(ctx));
});
}

private static void startSpan(final Tracer tracer, final Optional<io.opentelemetry.context.Context> tracingData,
final io.smallrye.mutiny.Context ctx, final String spanName) {
final SpanBuilder spanBuilder = tracer.spanBuilder(spanName);
if (tracingData.isPresent()) {
spanBuilder.setParent(tracingData.get());
} else {
spanBuilder.setNoParent();
}

final Span span = spanBuilder.startSpan();
final Scope scope = QuarkusContextStorage.INSTANCE.attach(io.opentelemetry.context.Context.current().with(span));

storeSpanAndScope(ctx, span, scope);
}

private static void storeSpanAndScope(final io.smallrye.mutiny.Context ctx, final Span span, final Scope scope) {
final SpanAndScope spanAndScope = new SpanAndScope(span, scope);

final Deque<SpanAndScope> stack = ctx.getOrElse(SPAN_STACK, LinkedList::new);
stack.push(spanAndScope);
ctx.put(SPAN_STACK, stack);
ctx.put("CURRENT_SPAN", span);
ctx.put("CURRENT_SCOPE", scope);
}

private static void endSpanCloseScope(final io.smallrye.mutiny.Context ctx) {
final Deque<SpanAndScope> stack = ctx.getOrElse(SPAN_STACK, () -> null);
if (stack != null && !stack.isEmpty()) {
final SpanAndScope spanAndScope = stack.pop();
spanAndScope.span().end();
spanAndScope.scope().close();
}
}

private record SpanAndScope(Span span, Scope scope) {
}
}
Loading
Loading