Skip to content

Commit

Permalink
Reactor without Netty
Browse files Browse the repository at this point in the history
  • Loading branch information
meiao committed Nov 20, 2023
1 parent fe6fd79 commit 58ea287
Show file tree
Hide file tree
Showing 17 changed files with 817 additions and 0 deletions.
17 changes: 17 additions & 0 deletions instrumentation/reactor-3.3.0/NOTICE.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
This product contains a modified part of OpenTelemetry:

* License:

Copyright 2019 The OpenTelemetry Authors

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License
is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
or implied. See the License for the specific language governing permissions and limitations under
the License.

* Homepage: https://github.com/open-telemetry/opentelemetry-java-instrumentation/blob/main/LICENSE
7 changes: 7 additions & 0 deletions instrumentation/reactor-3.3.0/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Reactor Instrumentation

Instrumentation for Reactor Core library code.

This instrumentation module is a subset of the `netty-reactor-0.9.0` instrumentation. It does not contain anything related to HTTP nor starting transactions and has added Skips for when `reactor-netty` classes are present.

The contents of the `netty-reactor` module were not moved to this module because it would cause the `tokenLift` to register twice in the `Hooks` class.
12 changes: 12 additions & 0 deletions instrumentation/reactor-3.3.0/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
dependencies {
implementation(project(":agent-bridge"))
implementation("io.projectreactor:reactor-core:3.3.21.RELEASE")
}

jar {
manifest { attributes 'Implementation-Title': 'com.newrelic.instrumentation.reactor-3.3.0' }
}

verifyInstrumentation {
passesOnly 'io.projectreactor:reactor-core:[3.3.0.RELEASE,)'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
*
* * Copyright 2020 New Relic Corporation. All rights reserved.
* * SPDX-License-Identifier: Apache-2.0
*
*/

package com.nr.instrumentation;

import com.newrelic.api.agent.NewRelic;

public class ReactorConfig {
public static final boolean errorsEnabled = NewRelic.getAgent().getConfig()
.getValue("reactor.errors.enabled", false);

private ReactorConfig() {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package com.nr.instrumentation.reactor;

import com.newrelic.agent.bridge.AgentBridge;
import com.newrelic.api.agent.NewRelic;
import com.newrelic.api.agent.Token;
import com.newrelic.api.agent.Trace;
import com.nr.instrumentation.ReactorConfig;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;
import reactor.core.Scannable;
import reactor.core.publisher.Operators;
import reactor.util.context.Context;

import java.util.function.BiFunction;
import java.util.function.Function;

/**
* Implementation of a reactor.core.CoreSubscriber (a Context aware subscriber) that can be added as
* a lifecycle hook on Flux/Mono operators to propagate, retrieve, and link Tokens across async contexts.
*
* Based on OpenTelemetry code:
* https://github.com/open-telemetry/opentelemetry-java-instrumentation/blob/main/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/TracingSubscriber.java
* @param <T>
*/
public class TokenLinkingSubscriber<T> implements CoreSubscriber<T> {
private final Token token;
private final Subscriber<? super T> subscriber;
private Context context;

public TokenLinkingSubscriber(Subscriber<? super T> subscriber, Context ctx) {
this.subscriber = subscriber;
this.context = ctx;
// newrelic-token is added by spring-webflux instrumentation of ServerWebExchange
this.token = ctx.getOrDefault("newrelic-token", null);
}

@Override
public void onSubscribe(Subscription subscription) {
withNRToken(() -> subscriber.onSubscribe(subscription));
}

@Override
public void onNext(T o) {
withNRToken(() -> subscriber.onNext(o));
}

@Override
public void onError(Throwable throwable) {
withNRError(() -> subscriber.onError(throwable), throwable);
}

@Override
public void onComplete() {
withNRToken(subscriber::onComplete);
}

@Override
public Context currentContext() {
return context;
}

@Trace(async = true, excludeFromTransactionTrace = true)
private void withNRToken(Runnable runnable) {
if (token != null && AgentBridge.getAgent().getTransaction(false) == null) {
token.link();
}
runnable.run();
}

@Trace(async = true, excludeFromTransactionTrace = true)
private void withNRError(Runnable runnable, Throwable throwable) {
if (token != null && token.isActive()) {
token.link();
if (ReactorConfig.errorsEnabled) {
NewRelic.noticeError(throwable);
}
}
runnable.run();
}

public static <T> Function<? super Publisher<T>, ? extends Publisher<T>> tokenLift() {
return Operators.lift(new TokenLifter<>());
}

private static class TokenLifter<T>
implements BiFunction<Scannable, CoreSubscriber<? super T>, CoreSubscriber<? super T>> {

public TokenLifter() {
}

@Override
public CoreSubscriber<? super T> apply(Scannable publisher, CoreSubscriber<? super T> sub) {
// if Flux/Mono #just, #empty, #error
if (publisher instanceof Fuseable.ScalarCallable) {
return sub;
}
Token token = sub.currentContext().getOrDefault("newrelic-token", null);
if (token != null ) {
return new TokenLinkingSubscriber<>(sub, sub.currentContext());
}
return sub;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
*
* * Copyright 2020 New Relic Corporation. All rights reserved.
* * SPDX-License-Identifier: Apache-2.0
*
*/

package reactor.core.publisher;

import com.newrelic.api.agent.weaver.NewField;
import com.newrelic.api.agent.weaver.Weave;

import java.util.concurrent.atomic.AtomicBoolean;

@Weave(originalName = "reactor.core.publisher.Hooks")
public abstract class Hooks_Instrumentation {

/*
* Note that sub-hooks are cumulative. We want to avoid setting the same sub-hooks
* more than once, so we set this boolean to true the first time we set a sub-hook.
* if (!Hooks_Instrumentation.instrumented.getAndSet(true)) { Hooks.onEachOperator(...) }
*/
@NewField
public static AtomicBoolean instrumented = new AtomicBoolean(false);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
*
* * Copyright 2020 New Relic Corporation. All rights reserved.
* * SPDX-License-Identifier: Apache-2.0
*
*/

package reactor.core.publisher;

import com.newrelic.agent.bridge.AgentBridge;
import com.newrelic.api.agent.NewRelic;
import com.newrelic.api.agent.Token;
import com.newrelic.api.agent.weaver.NewField;
import com.newrelic.api.agent.weaver.Weave;
import com.newrelic.api.agent.weaver.WeaveAllConstructors;
import com.newrelic.api.agent.weaver.Weaver;
import reactor.util.context.Context;

@Weave(originalName = "reactor.core.publisher.LambdaMonoSubscriber")
abstract class LambdaMonoSubscriber_Instrumentation {
@NewField
private Context nrContext;
final Context initialContext = Weaver.callOriginal();

@WeaveAllConstructors
protected LambdaMonoSubscriber_Instrumentation() {
// LamdaMonoSubscriber creates a new Context, so we create a new token and put it on the Context
// to be linked by TokenLinkingSubscriber but expired on onComplete here
if (AgentBridge.getAgent().getTransaction(false) != null
&& initialContext.getOrDefault("newrelic-token", null) == null) {
nrContext = Context.of("newrelic-token", NewRelic.getAgent().getTransaction().getToken());
}
}

public final void onComplete() {
Token token = this.currentContext().getOrDefault("newrelic-token", null);
if (token != null) {
token.expire();
this.nrContext = null;
}
Weaver.callOriginal();
}

public final void onError(Throwable t) {
Token token = this.currentContext().getOrDefault("newrelic-token", null);
if (token != null) {
token.expire();
this.nrContext = null;
}
Weaver.callOriginal();
}

public Context currentContext() {
if (nrContext != null) {
return initialContext.putAll(nrContext);
}
return Weaver.callOriginal();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
*
* * Copyright 2020 New Relic Corporation. All rights reserved.
* * SPDX-License-Identifier: Apache-2.0
*
*/

package reactor.core.publisher;

import com.newrelic.agent.bridge.AgentBridge;
import com.newrelic.api.agent.NewRelic;
import com.newrelic.api.agent.Token;
import com.newrelic.api.agent.weaver.NewField;
import com.newrelic.api.agent.weaver.Weave;
import com.newrelic.api.agent.weaver.WeaveAllConstructors;
import com.newrelic.api.agent.weaver.Weaver;
import reactor.util.context.Context;

@Weave(originalName = "reactor.core.publisher.LambdaSubscriber")
abstract class LambdaSubscriber_Instrumentation {
final Context initialContext = Weaver.callOriginal();
@NewField
private Context nrContext;

@WeaveAllConstructors
protected LambdaSubscriber_Instrumentation() {
if (AgentBridge.getAgent().getTransaction(false) != null
&& initialContext.getOrDefault("newrelic-token", null) == null) {
nrContext = Context.of("newrelic-token", NewRelic.getAgent().getTransaction().getToken());
}
}

public final void onComplete() {
Token token = this.currentContext().getOrDefault("newrelic-token", null);
if (token != null) {
token.expire();
this.nrContext = null;
}
Weaver.callOriginal();
}

public final void onError(Throwable t) {
Token token = this.currentContext().getOrDefault("newrelic-token", null);
if (token != null) {
token.expire();
this.nrContext = null;
}
Weaver.callOriginal();
}

public Context currentContext() {
if (nrContext != null) {
//return nrContext;
return initialContext.putAll(nrContext);
}
return Weaver.callOriginal();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
*
* * Copyright 2020 New Relic Corporation. All rights reserved.
* * SPDX-License-Identifier: Apache-2.0
*
*/

package reactor.core.scheduler;

import com.newrelic.api.agent.Trace;
import com.newrelic.api.agent.weaver.Weave;
import com.newrelic.api.agent.weaver.Weaver;

@Weave(originalName = "reactor.core.scheduler.InstantPeriodicWorkerTask")
final class InstantPeriodicWorkerTask_Instrumentation {

// We need to be able to link the Token here when executing on a supplied Scheduler
// A Token should be available on the thread that this task executes on if tokenLift() was added to Hooks.onEachOperator
@Trace(async = true, excludeFromTransactionTrace = true)
public Void call() {
return Weaver.callOriginal();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
*
* * Copyright 2020 New Relic Corporation. All rights reserved.
* * SPDX-License-Identifier: Apache-2.0
*
*/

package reactor.core.scheduler;

import com.newrelic.api.agent.Trace;
import com.newrelic.api.agent.weaver.Weave;
import com.newrelic.api.agent.weaver.Weaver;

@Weave(originalName = "reactor.core.scheduler.PeriodicWorkerTask")
final class PeriodicWorkerTask_Instrumentation {

// We need to be able to link the Token here when executing on a supplied Scheduler
// A Token should be available on the thread that this task executes on if tokenLift() was added to Hooks.onEachOperator
@Trace(async = true, excludeFromTransactionTrace = true)
public Void call() {
return Weaver.callOriginal();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
*
* * Copyright 2020 New Relic Corporation. All rights reserved.
* * SPDX-License-Identifier: Apache-2.0
*
*/

package reactor.core.scheduler;

import com.newrelic.api.agent.Trace;
import com.newrelic.api.agent.weaver.Weave;
import com.newrelic.api.agent.weaver.Weaver;

@Weave(originalName = "reactor.core.scheduler.SchedulerTask")
final class SchedulerTask_Instrumentation {

// We need to be able to link the Token here when executing on a supplied Scheduler via Mono::publishOn
// A Token should be available on the thread that this task executes on if tokenLift() was added to Hooks.onEachOperator
@Trace(async = true, excludeFromTransactionTrace = true)
public Void call() {
return Weaver.callOriginal();
}
}
Loading

0 comments on commit 58ea287

Please sign in to comment.