Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(server): use executor for returning tasks to client #1055

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 2 additions & 11 deletions server/src/main/java/io/littlehorse/common/util/LHProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,13 @@ public LHProducer(LHServerConfig config) {
}

public Future<RecordMetadata> send(String key, AbstractCommand<?> t, String topic, Callback cb, Header... headers) {
return sendRecord(new ProducerRecord<>(topic, null, key, new Bytes(t.toBytes()), List.of(headers)), cb);
return doSend(new ProducerRecord<>(topic, null, key, new Bytes(t.toBytes()), List.of(headers)), cb);
}

public Future<RecordMetadata> send(String key, AbstractCommand<?> t, String topic) {
return this.send(key, t, topic, null);
}

public Future<RecordMetadata> sendRecord(ProducerRecord<String, Bytes> record, Callback cb) {
private Future<RecordMetadata> doSend(ProducerRecord<String, Bytes> record, Callback cb) {
return (cb != null) ? prod.send(record, cb) : prod.send(record);
}

public Future<RecordMetadata> sendToPartition(String key, AbstractCommand<?> val, String topic, int partition) {
Bytes valBytes = val == null ? null : new Bytes(val.toBytes());
return sendRecord(new ProducerRecord<>(topic, partition, key, valBytes), null);
}

public void close() {
this.prod.close();
}
Expand Down
6 changes: 5 additions & 1 deletion server/src/main/java/io/littlehorse/server/LHServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.common.header.Headers;
Expand All @@ -67,7 +68,10 @@ public class LHServer {
private Context.Key<RequestExecutionContext> contextKey = Context.key("executionContextKey");
private final MetadataCache metadataCache;
private final CoreStoreProvider coreStoreProvider;

@Getter
private final ScheduledExecutorService networkThreadpool;

private final List<LHServerListener> listeners;

private RequestExecutionContext requestContext() {
Expand Down Expand Up @@ -338,6 +342,6 @@ public LHHostInfo getAdvertisedHost(
}

public void onEventThrown(WorkflowEventModel event) {
internalComms.onWorkflowEventThrown(event);
networkThreadpool.submit(() -> internalComms.onWorkflowEventThrown(event));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.api.ProcessorContext;
Expand All @@ -29,6 +30,7 @@ public class LHTaskManager {
private final String timerTopicName;
private final String commandTopicName;
private final AuthorizationContext authContext;
private final ExecutorService networkThreadPool;

private final ProcessorContext<String, CommandProcessorOutput> processorContext;
private final TaskQueueManager taskQueueManager;
Expand All @@ -40,13 +42,15 @@ public LHTaskManager(
AuthorizationContext authContext,
ProcessorContext<String, CommandProcessorOutput> processorContext,
TaskQueueManager taskQueueManager,
TenantScopedStore coreStore) {
TenantScopedStore coreStore,
ExecutorService networkThreadPool) {
this.timerTopicName = timerTopicName;
this.commandTopicName = commandTopicName;
this.authContext = authContext;
this.processorContext = processorContext;
this.taskQueueManager = taskQueueManager;
this.coreStore = coreStore;
this.networkThreadPool = networkThreadPool;
}

/**
Expand Down Expand Up @@ -89,8 +93,8 @@ void forwardPendingTasks() {
ScheduledTaskModel scheduledTask = entry.getValue();
if (scheduledTask != null) {
this.coreStore.put(scheduledTask);
taskQueueManager.onTaskScheduled(
taskId, scheduledTask.getTaskDefId(), scheduledTask, authContext.tenantId());
networkThreadPool.submit(() -> taskQueueManager.onTaskScheduled(
taskId, scheduledTask.getTaskDefId(), scheduledTask, authContext.tenantId()));
} else {
this.coreStore.delete(scheduledTaskId, StoreableType.SCHEDULED_TASK);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ public LHTaskManager getTaskManager() {
authContext,
processorContext,
globalTaskQueueManager,
coreStore);
coreStore,
server.getNetworkThreadpool());
return currentTaskManager;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import io.littlehorse.server.streams.util.MetadataCache;
import java.time.Duration;
import java.util.Date;
import java.util.concurrent.ExecutorService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.header.Headers;
Expand All @@ -53,6 +54,7 @@ public class CommandProcessor implements Processor<String, Command, String, Comm
private final LHServer server;
private final MetadataCache metadataCache;
private final TaskQueueManager globalTaskQueueManager;
private final ExecutorService networkThreadPool;

private KeyValueStore<String, Bytes> nativeStore;
private KeyValueStore<String, Bytes> globalStore;
Expand All @@ -70,6 +72,7 @@ public CommandProcessor(
this.metadataCache = metadataCache;
this.globalTaskQueueManager = globalTaskQueueManager;
this.exceptionHandler = new LHProcessingExceptionHandler(server);
this.networkThreadPool = server.getNetworkThreadpool();
}

@Override
Expand Down Expand Up @@ -105,7 +108,19 @@ private void processHelper(final Record<String, Command> commandRecord) {
.setResult(response.toByteString())
.build();

server.onResponseReceived(command.getCommandId(), cmdReply);
// The 'onResponseReceived' method can involve waiting on a lock in the AsyncWaiters class;
// we don't want to do that here so submit to an executor for async processing.
//
// LHServer#onResponseReceived()
// BackendInternalComms#onResponseReceived()
// AsyncWaiters#registerCommandProcessed()
// CommandWaiter#setResponseAndMaybeComplete()
//
// The CommandWaiter method involves blocking on a lock. No need to hold up the stream thread
// for this.
networkThreadPool.submit(() -> {
server.onResponseReceived(command.getCommandId(), cmdReply);
});
}
} catch (KafkaException ke) {
throw ke;
Expand Down
Loading