Skip to content

Commit

Permalink
Merge pull request #192 from jd-opensource/191-fix-the-issue-where-ex…
Browse files Browse the repository at this point in the history
…cluded-executors-were-not-filtered-out-in-transmission

Fix issue #191
  • Loading branch information
hexiaofeng authored Jan 2, 2025
2 parents 7a37415 + 554c5ea commit 050b7c4
Show file tree
Hide file tree
Showing 5 changed files with 173 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,25 +57,61 @@ public static class ThreadConfig {

private Set<String> excludeTaskPrefixes = new HashSet<>();

/**
* Checks if the given executor type is excluded.
*
* @param type The class type of the executor to check.
* @return {@code true} if the executor type is excluded, {@code false} otherwise.
*/
public boolean isExcludedExecutor(Class<?> type) {
return type != null && isExcludedExecutor(type.getName());
}

/**
* Checks if the given executor name is excluded.
*
* @param name The name of the executor to check.
* @return {@code true} if the executor name is excluded, {@code false} otherwise.
*/
public boolean isExcludedExecutor(String name) {
return name == null || excludeExecutors.contains(name);
return name != null && excludeExecutors.contains(name);
}

/**
* Checks if the given task type is excluded.
*
* @param type The class type of the task to check.
* @return {@code true} if the task type is excluded, {@code false} otherwise.
*/
public boolean isExcludedTask(Class<?> type) {
return type != null && isExcludedTask(type.getName());
}

/**
* Checks if the given task name is excluded.
*
* @param name The name of the task to check.
* @return {@code true} if the task name is excluded or matches any excluded prefix, {@code false} otherwise.
*/
public boolean isExcludedTask(String name) {
return name == null || excludeTasks.contains(name) || isExcludedTaskPrefix(name);
return name != null && (excludeTasks.contains(name) || isExcludedTaskPrefix(name));
}

protected boolean isExcludedTaskPrefix(String name) {
if (name == null) {
return false;
}
/**
* Checks if the given task name matches any excluded prefix.
*
* @param name The name of the task to check.
* @return {@code true} if the task name starts with any excluded prefix, {@code false} otherwise.
*/
private boolean isExcludedTaskPrefix(String name) {
for (String prefix : excludeTaskPrefixes) {
if (name.startsWith(prefix)) {
return true;
}
}
return false;
}

}

}
Expand Down
34 changes: 13 additions & 21 deletions joylive-package/src/main/assembly/config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -218,29 +218,15 @@ agent:
- x-service-
thread:
excludeExecutors:
- org.apache.dubbo.common.threadpool.ThreadlessExecutor
- org.apache.tomcat.util.threads.ThreadPoolExecutor
- org.apache.tomcat.util.threads.ScheduledThreadPoolExecutor
- org.apache.tomcat.util.threads.InlineExecutorService
- javax.management.NotificationBroadcasterSupport$1
- io.grpc.stub.ClientCalls.ThreadlessExecutor
- io.grpc.SynchronizationContext
- io.netty.channel.MultithreadEventLoopGroup
- io.netty.channel.nio.NioEventLoop
- io.netty.channel.SingleThreadEventLoop
- io.netty.channel.kqueue.KQueueEventLoopGroup
- io.netty.channel.kqueue.KQueueEventLoop
- io.netty.util.concurrent.MultithreadEventExecutorGroup
- io.netty.util.concurrent.AbstractEventExecutorGroup
- io.netty.util.concurrent.ThreadPerTaskExecutor
- io.netty.util.concurrent.GlobalEventExecutor
- io.netty.util.concurrent.AbstractScheduledEventExecutor
- io.netty.util.concurrent.AbstractEventExecutor
- io.netty.util.concurrent.DefaultEventExecutor
- io.netty.util.concurrent.SingleThreadEventExecutor
- io.netty.util.internal.ThreadExecutorMap$1
- reactor.core.scheduler.BoundedElasticScheduler$BoundedScheduledExecutorService
- reactor.netty.resources.ColocatedEventLoopGroup
- org.apache.tomcat.util.threads.ThreadPoolExecutor
- org.apache.tomcat.util.threads.ScheduledThreadPoolExecutor
- org.apache.tomcat.util.threads.InlineExecutorService
- javax.management.NotificationBroadcasterSupport$1
- com.alibaba.nacos.shaded.io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoopGroup
- com.alibaba.nacos.shaded.io.grpc.netty.shaded.io.netty.channel.MultithreadEventLoopGroup
- com.alibaba.nacos.shaded.io.grpc.netty.shaded.io.netty.util.concurrent.MultithreadEventExecutorGroup
Expand All @@ -261,6 +247,7 @@ agent:
- com.alibaba.nacos.shaded.io.grpc.SynchronizationContext
- com.alibaba.nacos.shaded.com.google.common.util.concurrent.DirectExecutor
excludeTasks:
- com.alibaba.nacos.common.executor.NameThreadFactory
- com.alibaba.nacos.shaded.io.grpc.internal.DnsNameResolver.Resolve
- com.alibaba.nacos.client.naming.backups.FailoverReactor$DiskFileWriter
- com.alibaba.nacos.client.naming.backups.FailoverReactor.SwitchRefresher
Expand All @@ -269,6 +256,7 @@ agent:
- com.alibaba.nacos.shaded.io.grpc.internal.DelayedClientTransport$5
- com.alibaba.nacos.shaded.io.grpc.internal.SerializingExecutor
- com.alibaba.nacos.shaded.io.grpc.internal.LogExceptionRunnable
- com.alibaba.nacos.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder$1
- com.jd.live.agent.core.thread.NamedThreadFactory
- com.jd.jr.sgm.client.disruptor.LogEventFactory
- com.jd.jr.sgm.client.util.AgentThreadFactory
Expand All @@ -282,6 +270,11 @@ agent:
- io.sermant.implement.service.xds.handler.XdsHandler.NamedThreadFactory
- io.sermant.discovery.factory.RealmServiceThreadFactory
- org.apache.skywalking.apm.agent.core.boot.DefaultNamedThreadFactory
- sun.rmi.runtime.RuntimeUtil$1
- sun.rmi.transport.tcp.TCPTransport$1
- sun.rmi.transport.DGCImpl$1
- sun.rmi.transport.DGCAckHandler$1
- org.apache.tomcat.util.threads.TaskThreadFactory
excludeTaskPrefixes:
- reactor.core.scheduler.BoundedElasticScheduler$$Lambda
- org.springframework.cloud.commons.util.InetUtils$$Lambda$
Expand All @@ -290,9 +283,8 @@ agent:
- com.alibaba.nacos.client.naming.remote.NamingClientProxyDelegate$$Lambda$
- com.alibaba.nacos.common.remote.client.RpcClient$$Lambda$
- com.alibaba.nacos.common.utils.ThreadFactoryBuilder$$Lambda$
- sun.rmi.transport.tcp.TCPTransport$
- sun.rmi.transport.DGCImpl$
- sun.rmi.transport.DGCAckHandler$
- org.apache.catalina.core.ContainerBase$
- org.apache.catalina.core.StandardServer$$Lambda$
counter:
gateway: true
service: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package com.jd.live.agent.plugin.registry.dubbo.v2_7.condition;

import com.jd.live.agent.core.extension.annotation.ConditionalComposite;
import com.jd.live.agent.core.extension.annotation.ConditionalOnMissingClass;
import com.jd.live.agent.core.extension.annotation.ConditionalOnClass;
import com.jd.live.agent.governance.annotation.ConditionalOnDubboEnabled;

import java.lang.annotation.*;
Expand All @@ -25,7 +25,7 @@
@Retention(RetentionPolicy.RUNTIME)
@Documented
@ConditionalOnDubboEnabled
@ConditionalOnMissingClass(ConditionalOnDubbo27Enabled.TYPE_PROTOCOL_FILTER_WRAPPER)
@ConditionalOnClass(ConditionalOnDubbo27Enabled.TYPE_PROTOCOL_FILTER_WRAPPER)
@ConditionalComposite
public @interface ConditionalOnDubbo27Enabled {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package com.jd.live.agent.plugin.registry.dubbo.v3.condition;

import com.jd.live.agent.core.extension.annotation.ConditionalComposite;
import com.jd.live.agent.core.extension.annotation.ConditionalOnMissingClass;
import com.jd.live.agent.core.extension.annotation.ConditionalOnClass;
import com.jd.live.agent.governance.annotation.ConditionalOnDubboEnabled;

import java.lang.annotation.*;
Expand All @@ -25,7 +25,7 @@
@Retention(RetentionPolicy.RUNTIME)
@Documented
@ConditionalOnDubboEnabled
@ConditionalOnMissingClass(ConditionalOnDubbo3Enabled.TYPE_CONSUMER_CONTEXT_FILTER)
@ConditionalOnClass(ConditionalOnDubbo3Enabled.TYPE_CONSUMER_CONTEXT_FILTER)
@ConditionalComposite
public @interface ConditionalOnDubbo3Enabled {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
package com.jd.live.agent.plugin.transmission.thread.interceptor;

import com.jd.live.agent.bootstrap.bytekit.context.ExecutableContext;
import com.jd.live.agent.bootstrap.logger.Logger;
import com.jd.live.agent.bootstrap.logger.LoggerFactory;
import com.jd.live.agent.core.plugin.definition.InterceptorAdaptor;
import com.jd.live.agent.core.thread.Camera;
import com.jd.live.agent.core.thread.Snapshot;
import com.jd.live.agent.governance.config.TransmitConfig;
import com.jd.live.agent.governance.config.TransmitConfig.ThreadConfig;
import com.jd.live.agent.plugin.transmission.thread.adapter.AbstractThreadAdapter;
import com.jd.live.agent.plugin.transmission.thread.adapter.CallableAdapter;
import com.jd.live.agent.plugin.transmission.thread.adapter.RunnableAdapter;
Expand All @@ -28,56 +30,42 @@
import java.lang.reflect.Field;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.*;

/**
* ExecutorInterceptor
*/
public class ExecutorInterceptor extends InterceptorAdaptor {

private static final Logger logger = LoggerFactory.getLogger(ExecutorInterceptor.class);

private static final String FIELD_CALLABLE = "callable";

private final Field callableField;

private final Camera[] cameras;

private final TransmitConfig.ThreadConfig threadConfig;
private final ThreadConfig threadConfig;

private final Map<Class<?>, Boolean> excludes = new ConcurrentHashMap<>();
private final Map<Class<?>, Boolean> excludes = new ConcurrentHashMap<>(128);

public ExecutorInterceptor(List<Camera> cameras, TransmitConfig.ThreadConfig threadConfig) {
public ExecutorInterceptor(List<Camera> cameras, ThreadConfig threadConfig) {
this.cameras = cameras == null ? new Camera[0] : cameras.toArray(new Camera[0]);
this.threadConfig = threadConfig;
this.callableField = getCallableField();
}

private Field getCallableField() {
Field result = null;
try {
result = FutureTask.class.getDeclaredField(FIELD_CALLABLE);
result.setAccessible(true);
} catch (NoSuchFieldException ignore) {
}
return result;
}

private boolean isExcluded(Object task) {
return task != null && excludes.computeIfAbsent(task.getClass(), c -> threadConfig.isExcludedTask(c.getName()));
}

@Override
public void onEnter(ExecutableContext ctx) {
Object target = ctx.getTarget();
String name = target.getClass().getSimpleName();
Object[] arguments = ctx.getArguments();
if (arguments == null || arguments.length == 0 || cameras.length == 0) {
return;
} else if (target instanceof ThreadPoolExecutor
&& isExcluded(((ThreadPoolExecutor) target).getThreadFactory())) {
// filter sgm thread pool before unwrap
if (arguments == null
|| arguments.length == 0
|| cameras.length == 0
|| isExcludedExecutor(target)
|| target instanceof ThreadPoolExecutor
&& isExcludedThreadFactory(((ThreadPoolExecutor) target).getThreadFactory())) {
return;
}
Object argument = arguments[0];
Expand All @@ -86,7 +74,7 @@ && isExcluded(((ThreadPoolExecutor) target).getThreadFactory())) {
return;
} else if (unwrapped instanceof AbstractThreadAdapter) {
return;
} else if (isExcluded(unwrapped)) {
} else if (isExcludedTask(unwrapped)) {
return;
}

Expand All @@ -103,6 +91,88 @@ && isExcluded(((ThreadPoolExecutor) target).getThreadFactory())) {
}
}

/**
* Checks if the given thread factory is excluded by its class type.
* The result is cached in the {@code excludes} map to avoid repeated computations.
*
* @param factory The thread factory object to check.
* @return {@code true} if the thread factory is excluded, {@code false} otherwise.
*/
private boolean isExcludedThreadFactory(ThreadFactory factory) {
return factory != null && excludes.computeIfAbsent(factory.getClass(), this::isExcludedThreadFactoryType);
}

/**
* Checks if the given thread factory type is excluded.
* If the thread factory is excluded, logs an informational message.
*
* @param type The class type of the thread factory to check.
* @return {@code true} if the thread factory type is excluded, {@code false} otherwise.
*/
private boolean isExcludedThreadFactoryType(Class<?> type) {
if (threadConfig.isExcludedTask(type)) {
logger.info("Disable transmission in threads of factory " + type.getName());
return true;
}
logger.info("Enable transmission in threads of factory " + type.getName());
return false;
}


/**
* Checks if the given task is excluded by its class type.
* The result is cached in the {@code excludes} map to avoid repeated computations.
*
* @param task The task object to check.
* @return {@code true} if the task is excluded, {@code false} otherwise.
*/
private boolean isExcludedTask(Object task) {
return task != null && excludes.computeIfAbsent(task.getClass(), this::isExcludeTaskType);
}

/**
* Checks if the given task type is excluded.
* If the task is excluded, logs an informational message.
*
* @param type The class type of the task to check.
* @return {@code true} if the task type is excluded, {@code false} otherwise.
*/
private boolean isExcludeTaskType(Class<?> type) {
if (threadConfig.isExcludedTask(type)) {
logger.info("Disable transmission in task " + type.getName());
return true;
}
logger.info("Enable transmission in task " + type.getName());
return false;
}

/**
* Checks if the given executor is excluded by its class type.
* The result is cached in the {@code excludes} map to avoid repeated computations.
*
* @param executor The executor object to check.
* @return {@code true} if the executor is excluded, {@code false} otherwise.
*/
private boolean isExcludedExecutor(Object executor) {
return executor != null && excludes.computeIfAbsent(executor.getClass(), this::isExcludeExecutorType);
}

/**
* Checks if the given executor type is excluded.
* If the executor is excluded, logs an informational message.
*
* @param type The class type of the executor to check.
* @return {@code true} if the executor type is excluded, {@code false} otherwise.
*/
private boolean isExcludeExecutorType(Class<?> type) {
if (threadConfig.isExcludedExecutor(type)) {
logger.info("Disable transmission in executor " + type.getName());
return true;
}
logger.info("Enable transmission in executor " + type.getName());
return false;
}

/**
* Unwraps the provided argument object to retrieve its underlying value. If the argument is an instance
* of {@link AbstractThreadAdapter}, it is returned directly. If the argument is an instance of {@link FutureTask},
Expand All @@ -127,4 +197,20 @@ private Object unwrap(Object argument) {
return argument;
}

/**
* Retrieves the {@link Field} object representing the {@code callable} field in the {@link FutureTask} class.
* This method uses reflection to access the private field and makes it accessible.
*
* @return The {@link Field} object representing the {@code callable} field, or {@code null} if the field is not found.
*/
private static Field getCallableField() {
Field result = null;
try {
result = FutureTask.class.getDeclaredField(FIELD_CALLABLE);
result.setAccessible(true);
} catch (NoSuchFieldException ignore) {
}
return result;
}

}

0 comments on commit 050b7c4

Please sign in to comment.