diff --git a/server/src/main/java/io/littlehorse/common/util/LHProducer.java b/server/src/main/java/io/littlehorse/common/util/LHProducer.java index aa6618d91..2172505fa 100644 --- a/server/src/main/java/io/littlehorse/common/util/LHProducer.java +++ b/server/src/main/java/io/littlehorse/common/util/LHProducer.java @@ -21,22 +21,13 @@ public LHProducer(LHServerConfig config) { } public Future send(String key, AbstractCommand t, String topic, Callback cb, Header... headers) { - return sendRecord(new ProducerRecord<>(topic, null, key, new Bytes(t.toBytes()), List.of(headers)), cb); + return doSend(new ProducerRecord<>(topic, null, key, new Bytes(t.toBytes()), List.of(headers)), cb); } - public Future send(String key, AbstractCommand t, String topic) { - return this.send(key, t, topic, null); - } - - public Future sendRecord(ProducerRecord record, Callback cb) { + private Future doSend(ProducerRecord record, Callback cb) { return (cb != null) ? prod.send(record, cb) : prod.send(record); } - public Future sendToPartition(String key, AbstractCommand val, String topic, int partition) { - Bytes valBytes = val == null ? null : new Bytes(val.toBytes()); - return sendRecord(new ProducerRecord<>(topic, partition, key, valBytes), null); - } - public void close() { this.prod.close(); } diff --git a/server/src/main/java/io/littlehorse/server/LHServer.java b/server/src/main/java/io/littlehorse/server/LHServer.java index 13aaae957..cc76c8403 100644 --- a/server/src/main/java/io/littlehorse/server/LHServer.java +++ b/server/src/main/java/io/littlehorse/server/LHServer.java @@ -46,6 +46,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.common.header.Headers; @@ -67,7 +68,10 @@ public class LHServer { private Context.Key contextKey = Context.key("executionContextKey"); private final MetadataCache metadataCache; private final CoreStoreProvider coreStoreProvider; + + @Getter private final ScheduledExecutorService networkThreadpool; + private final List listeners; private RequestExecutionContext requestContext() { @@ -338,6 +342,6 @@ public LHHostInfo getAdvertisedHost( } public void onEventThrown(WorkflowEventModel event) { - internalComms.onWorkflowEventThrown(event); + networkThreadpool.submit(() -> internalComms.onWorkflowEventThrown(event)); } } diff --git a/server/src/main/java/io/littlehorse/server/streams/topology/core/LHTaskManager.java b/server/src/main/java/io/littlehorse/server/streams/topology/core/LHTaskManager.java index c52babc23..eda8ee351 100644 --- a/server/src/main/java/io/littlehorse/server/streams/topology/core/LHTaskManager.java +++ b/server/src/main/java/io/littlehorse/server/streams/topology/core/LHTaskManager.java @@ -12,6 +12,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutorService; import org.apache.kafka.common.header.Headers; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.api.ProcessorContext; @@ -29,6 +30,7 @@ public class LHTaskManager { private final String timerTopicName; private final String commandTopicName; private final AuthorizationContext authContext; + private final ExecutorService networkThreadPool; private final ProcessorContext processorContext; private final TaskQueueManager taskQueueManager; @@ -40,13 +42,15 @@ public LHTaskManager( AuthorizationContext authContext, ProcessorContext processorContext, TaskQueueManager taskQueueManager, - TenantScopedStore coreStore) { + TenantScopedStore coreStore, + ExecutorService networkThreadPool) { this.timerTopicName = timerTopicName; this.commandTopicName = commandTopicName; this.authContext = authContext; this.processorContext = processorContext; this.taskQueueManager = taskQueueManager; this.coreStore = coreStore; + this.networkThreadPool = networkThreadPool; } /** @@ -89,8 +93,8 @@ void forwardPendingTasks() { ScheduledTaskModel scheduledTask = entry.getValue(); if (scheduledTask != null) { this.coreStore.put(scheduledTask); - taskQueueManager.onTaskScheduled( - taskId, scheduledTask.getTaskDefId(), scheduledTask, authContext.tenantId()); + networkThreadPool.submit(() -> taskQueueManager.onTaskScheduled( + taskId, scheduledTask.getTaskDefId(), scheduledTask, authContext.tenantId())); } else { this.coreStore.delete(scheduledTaskId, StoreableType.SCHEDULED_TASK); } diff --git a/server/src/main/java/io/littlehorse/server/streams/topology/core/ProcessorExecutionContext.java b/server/src/main/java/io/littlehorse/server/streams/topology/core/ProcessorExecutionContext.java index 07a540b80..7a568cc3c 100644 --- a/server/src/main/java/io/littlehorse/server/streams/topology/core/ProcessorExecutionContext.java +++ b/server/src/main/java/io/littlehorse/server/streams/topology/core/ProcessorExecutionContext.java @@ -102,7 +102,8 @@ public LHTaskManager getTaskManager() { authContext, processorContext, globalTaskQueueManager, - coreStore); + coreStore, + server.getNetworkThreadpool()); return currentTaskManager; } diff --git a/server/src/main/java/io/littlehorse/server/streams/topology/core/processors/CommandProcessor.java b/server/src/main/java/io/littlehorse/server/streams/topology/core/processors/CommandProcessor.java index 8e37396c6..5a8bfdbde 100644 --- a/server/src/main/java/io/littlehorse/server/streams/topology/core/processors/CommandProcessor.java +++ b/server/src/main/java/io/littlehorse/server/streams/topology/core/processors/CommandProcessor.java @@ -35,6 +35,7 @@ import io.littlehorse.server.streams.util.MetadataCache; import java.time.Duration; import java.util.Date; +import java.util.concurrent.ExecutorService; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.header.Headers; @@ -53,6 +54,7 @@ public class CommandProcessor implements Processor nativeStore; private KeyValueStore globalStore; @@ -70,6 +72,7 @@ public CommandProcessor( this.metadataCache = metadataCache; this.globalTaskQueueManager = globalTaskQueueManager; this.exceptionHandler = new LHProcessingExceptionHandler(server); + this.networkThreadPool = server.getNetworkThreadpool(); } @Override @@ -105,7 +108,19 @@ private void processHelper(final Record commandRecord) { .setResult(response.toByteString()) .build(); - server.onResponseReceived(command.getCommandId(), cmdReply); + // The 'onResponseReceived' method can involve waiting on a lock in the AsyncWaiters class; + // we don't want to do that here so submit to an executor for async processing. + // + // LHServer#onResponseReceived() + // BackendInternalComms#onResponseReceived() + // AsyncWaiters#registerCommandProcessed() + // CommandWaiter#setResponseAndMaybeComplete() + // + // The CommandWaiter method involves blocking on a lock. No need to hold up the stream thread + // for this. + networkThreadPool.submit(() -> { + server.onResponseReceived(command.getCommandId(), cmdReply); + }); } } catch (KafkaException ke) { throw ke;