Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow for multiple services per endoint #22

Merged
merged 8 commits into from
Jan 18, 2024
Merged
2 changes: 1 addition & 1 deletion CODE_OF_CONDUCT.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ Examples of unacceptable behavior by participants include:

## Our Responsibilities

Project maintainers are responsible for clarifying the standards of acceptable
Project maintainers are responsible for clarifying the standards of acceptor
behavior and are expected to take appropriate and fair corrective action in
response to any instances of unacceptable behavior.

Expand Down
40 changes: 33 additions & 7 deletions src/main/java/com/meta/cp4m/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@
import com.meta.cp4m.llm.LLMPlugin;
import com.meta.cp4m.message.Message;
import com.meta.cp4m.message.MessageHandler;
import com.meta.cp4m.message.RequestProcessor;
import com.meta.cp4m.message.ThreadState;
import com.meta.cp4m.routing.Route;
import com.meta.cp4m.store.ChatStore;
import io.javalin.Javalin;
import io.javalin.http.Context;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.*;
Expand All @@ -40,19 +42,25 @@ public Service(
this.path = path;
}

void handle(Context ctx) {
List<T> messages = handler.processRequest(ctx);
<IN> void handler(Context ctx, IN in, RequestProcessor<IN, T> processor) {
List<T> messages = null;
try {
messages = processor.process(ctx, in);
} catch (Exception e) {
LOGGER
.atError()
.addKeyValue("body", ctx.body())
.addKeyValue("headers", ctx.headerMap())
.setMessage("unable to process request")
.log();
}
// TODO: once we have a non-volatile store, on startup send stored but not replied to messages
for (T m : messages) {
ThreadState<T> thread = store.add(m);
executorService.submit(() -> execute(thread));
}
}

public void register(Javalin app) {
handler.handlers().forEach(m -> app.addHandler(m, path, this::handle));
}

public String path() {
return path;
}
Expand All @@ -79,4 +87,22 @@ private void execute(ThreadState<T> thread) {
LOGGER.error("an error occurred while attempting to respond", e);
}
}

private <E> Route<E> toRoute(MessageHandler.RouteDetails<E, T> routeDetails) {
return new Route<>(
path,
routeDetails.handlerType(),
routeDetails.acceptor(),
(ctx, in) -> handler(ctx, in, routeDetails.requestProcessor()));
}

List<Route<?>> routes() {
List<MessageHandler.RouteDetails<?, T>> routeDetails = handler.routeDetails();
List<Route<?>> routes = new ArrayList<>(routeDetails.size());
for (MessageHandler.RouteDetails<?, T> routeDetail : routeDetails) {
Route<?> route = toRoute(routeDetail);
routes.add(route);
}
return routes;
}
}
74 changes: 66 additions & 8 deletions src/main/java/com/meta/cp4m/ServicesRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,23 @@
package com.meta.cp4m;

import com.google.common.base.Preconditions;
import com.meta.cp4m.routing.Route;
import io.javalin.Javalin;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

import java.util.*;

import io.javalin.http.BadRequestResponse;
import io.javalin.http.Context;
import io.javalin.http.HandlerType;
import org.checkerframework.common.returnsreceiver.qual.This;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ServicesRunner implements AutoCloseable {

private static final Logger LOGGER = LoggerFactory.getLogger(ServicesRunner.class);
private final Javalin app = Javalin.create();
private final Set<Service<?>> services = new HashSet<>();
private final Set<Service<?>> services = new LinkedHashSet<>();
private boolean started = false;
private int port = 8080;

Expand All @@ -28,7 +35,59 @@ public static ServicesRunner newInstance() {
return new ServicesRunner();
}

private <T> boolean didAcceptAndHandle(Context ctx, Route<T> route) {
Optional<T> acceptorOutput = route.acceptor().accept(ctx);
if (acceptorOutput.isPresent()) {
try {
route.handler().handle(ctx, acceptorOutput.get());
} catch (Exception e) {
throw new BadRequestResponse("Unable to process request");
}
return true;
}
return false;
}

/**
* Find the first route that will accept this payload and then handle the payload
*
* @param ctx context from Javalin
* @param routes the routes to check for acceptability and process if accepted
*/
private void routeSelectorAndHandler(Context ctx, List<Route<?>> routes) {
for (Route<?> route : routes) {
if (didAcceptAndHandle(ctx, route)) {
return;
}
}
LOGGER
.atError()
.setMessage("Unable to handle incoming webhook")
.addKeyValue("body", ctx.body())
.addKeyValue("headers", ctx.headerMap())
.log();
throw new BadRequestResponse("unable to handle webhook");
}

public @This ServicesRunner start() {
record RouteGroup(String path, HandlerType handlerType) {}
Map<RouteGroup, List<Route<?>>> routeGroups = new HashMap<>();
for (Service<?> s : services) { // this is not a stream because order matters here
s.routes()
.forEach(
r ->
routeGroups
.computeIfAbsent(
new RouteGroup(r.path(), r.handlerType()), k -> new ArrayList<>())
.add(r));
}
routeGroups.forEach(
(routeGroup, routes) ->
app.addHandler(
routeGroup.handlerType(),
routeGroup.path(),
ctx -> this.routeSelectorAndHandler(ctx, routes)));

if (!started) {
started = true;
app.start(port);
Expand All @@ -38,9 +97,8 @@ public static ServicesRunner newInstance() {

public @This ServicesRunner service(Service<?> service) {
Preconditions.checkState(!started, "cannot add service, server already started");
if (services.add(service)) {
service.register(app);
}

services.add(service);
return this;
}

Expand Down
Loading