Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow for multiple services per endoint #22

Merged
merged 8 commits into from
Jan 18, 2024
40 changes: 33 additions & 7 deletions src/main/java/com/meta/cp4m/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@
import com.meta.cp4m.llm.LLMPlugin;
import com.meta.cp4m.message.Message;
import com.meta.cp4m.message.MessageHandler;
import com.meta.cp4m.message.RequestProcessor;
import com.meta.cp4m.message.ThreadState;
import com.meta.cp4m.routing.Route;
import com.meta.cp4m.store.ChatStore;
import io.javalin.Javalin;
import io.javalin.http.Context;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.*;
Expand All @@ -40,19 +42,25 @@ public Service(
this.path = path;
}

void handle(Context ctx) {
List<T> messages = handler.processRequest(ctx);
<IN> void handler(Context ctx, IN in, RequestProcessor<IN, T> processor) {
List<T> messages = null;
try {
messages = processor.process(ctx, in);
} catch (Exception e) {
LOGGER
.atError()
.addKeyValue("body", ctx.body())
.addKeyValue("headers", ctx.headerMap())
.setMessage("unable to process request")
.log();
}
// TODO: once we have a non-volatile store, on startup send stored but not replied to messages
for (T m : messages) {
ThreadState<T> thread = store.add(m);
executorService.submit(() -> execute(thread));
}
}

public void register(Javalin app) {
handler.handlers().forEach(m -> app.addHandler(m, path, this::handle));
}

public String path() {
return path;
}
Expand All @@ -79,4 +87,22 @@ private void execute(ThreadState<T> thread) {
LOGGER.error("an error occurred while attempting to respond", e);
}
}

private <E> Route<E> toRoute(MessageHandler.RouteDetails<E, T> routeDetails) {
return new Route<>(
path,
routeDetails.handlerType(),
routeDetails.acceptor(),
(ctx, in) -> handler(ctx, in, routeDetails.requestProcessor()));
}

List<Route<?>> routes() {
List<MessageHandler.RouteDetails<?, T>> routeDetails = handler.routeDetails();
List<Route<?>> routes = new ArrayList<>(routeDetails.size());
for (MessageHandler.RouteDetails<?, T> routeDetail : routeDetails) {
Route<?> route = toRoute(routeDetail);
routes.add(route);
}
return routes;
}
}
72 changes: 64 additions & 8 deletions src/main/java/com/meta/cp4m/ServicesRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,21 @@
package com.meta.cp4m;

import com.google.common.base.Preconditions;
import com.meta.cp4m.routing.Route;
import io.javalin.Javalin;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import io.javalin.http.BadRequestResponse;
import io.javalin.http.Context;
import io.javalin.http.HandlerType;
import java.util.*;
import org.checkerframework.common.returnsreceiver.qual.This;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ServicesRunner implements AutoCloseable {

private static final Logger LOGGER = LoggerFactory.getLogger(ServicesRunner.class);
private final Javalin app = Javalin.create();
private final Set<Service<?>> services = new HashSet<>();
private final Set<Service<?>> services = new LinkedHashSet<>();
private boolean started = false;
private int port = 8080;

Expand All @@ -28,7 +33,59 @@ public static ServicesRunner newInstance() {
return new ServicesRunner();
}

private <T> boolean didAcceptAndHandle(Context ctx, Route<T> route) {
Optional<T> acceptorOutput = route.acceptor().accept(ctx);
if (acceptorOutput.isPresent()) {
try {
route.handler().handle(ctx, acceptorOutput.get());
} catch (Exception e) {
throw new BadRequestResponse("Unable to process request");
}
return true;
}
return false;
}

/**
* Find the first route that will accept this payload and then handle the payload
*
* @param ctx context from Javalin
* @param routes the routes to check for acceptability and process if accepted
*/
private void routeSelectorAndHandler(Context ctx, List<Route<?>> routes) {
for (Route<?> route : routes) {
if (didAcceptAndHandle(ctx, route)) {
return;
}
}
LOGGER
.atError()
.setMessage("Unable to handle incoming webhook")
.addKeyValue("body", ctx.body())
.addKeyValue("headers", ctx.headerMap())
.log();
throw new BadRequestResponse("unable to handle webhook");
}

public @This ServicesRunner start() {
record RouteGroup(String path, HandlerType handlerType) {}
Map<RouteGroup, List<Route<?>>> routeGroups = new HashMap<>();
for (Service<?> s : services) { // this is not a stream because order matters here
s.routes()
.forEach(
r ->
routeGroups
.computeIfAbsent(
new RouteGroup(r.path(), r.handlerType()), k -> new ArrayList<>())
.add(r));
}
routeGroups.forEach(
(routeGroup, routes) ->
app.addHandler(
routeGroup.handlerType(),
routeGroup.path(),
ctx -> this.routeSelectorAndHandler(ctx, routes)));

if (!started) {
started = true;
app.start(port);
Expand All @@ -38,9 +95,8 @@ public static ServicesRunner newInstance() {

public @This ServicesRunner service(Service<?> service) {
Preconditions.checkState(!started, "cannot add service, server already started");
if (services.add(service)) {
service.register(app);
}

services.add(service);
return this;
}

Expand Down
89 changes: 31 additions & 58 deletions src/main/java/com/meta/cp4m/message/FBMessageHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -91,66 +91,13 @@ public FBMessageHandler(String verifyToken, String pageAccessToken, String appSe
: null;
}

@Override
public List<FBMessage> processRequest(Context ctx) {
try {
switch (ctx.handlerType()) {
case GET -> {
return getHandler(ctx);
}
case POST -> {
return postHandler(ctx);
}
}
} catch (JsonProcessingException | NullPointerException e) {
LOGGER
.atWarn()
.setMessage("Unable to parse message from Meta webhook")
.setCause(e)
.addKeyValue("body", ctx.body())
.addKeyValue("headers", ctx.headerMap())
.log();
throw new BadRequestResponse("Invalid body");
} catch (RuntimeException e) {
LOGGER.error(e.getMessage(), e);
throw e;
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
throw new RuntimeException(e);
}
throw new UnsupportedOperationException("Only accepting get and post methods");
}

private List<FBMessage> getHandler(Context ctx) {
MetaHandlerUtils.subscriptionVerification(ctx, verifyToken);
LOGGER.debug("Meta verified callback url successfully");
return Collections.emptyList();
}

@TestOnly
String hmac(String body) {
// TODO: refactor test so we don't need this
return MetaHandlerUtils.hmac(body, appSecret);
}

private List<FBMessage> postHandler(Context ctx) throws JsonProcessingException {
MetaHandlerUtils.postHeaderValidator(ctx, appSecret);

String bodyString = ctx.body();
JsonNode body = MAPPER.readTree(bodyString);
String object = body.get("object").textValue();
if (!object.equals("page") && !object.equals("instagram")) {
LOGGER
.atWarn()
.setMessage(
"received body with value of "
+ object
+ " for 'object', expected 'page' or 'instagram'")
.addKeyValue("body", bodyString)
.log();
return Collections.emptyList();
}
// TODO: need better validation
private List<FBMessage> postHandler(Context ctx, JsonNode body) {
JsonNode entries = body.get("entry");
ArrayList<FBMessage> output = new ArrayList<>();
for (JsonNode entry : entries) {
Expand Down Expand Up @@ -191,15 +138,15 @@ private List<FBMessage> postHandler(Context ctx) throws JsonProcessingException
LOGGER
.atWarn()
.setMessage("received message without text, unable to handle this")
.addKeyValue("body", bodyString)
.addKeyValue("body", body)
.log();
}
} else {
LOGGER
.atWarn()
.setMessage(
"received a message without a 'message' key, unable to handle this message type")
.addKeyValue("body", bodyString)
.addKeyValue("body", body)
.log();
}
}
Expand Down Expand Up @@ -263,7 +210,33 @@ private void send(String message, Identifier recipient, Identifier sender) throw
}

@Override
public Collection<HandlerType> handlers() {
return List.of(HandlerType.GET, HandlerType.POST);
public List<RouteDetails<?, FBMessage>> routeDetails() {
RouteDetails<JsonNode, FBMessage> postDetails =
new RouteDetails<>(
HandlerType.POST,
ctx -> {
@Nullable String contentType = ctx.contentType();
if (contentType != null
&& ContentType.parse(contentType).isSameMimeType(ContentType.APPLICATION_JSON)
&& MetaHandlerUtils.postHeaderValid(ctx, appSecret)) {
JsonNode body;
try {
body = MAPPER.readTree(ctx.body());
} catch (JsonProcessingException e) {
throw new BadRequestResponse("unable to parse body");
}
// TODO: need better validation
String expectedObjectValue =
connectedFacebookPageForInstagram == null ? "page" : "instagram";
@Nullable JsonNode objectNode = body.get("object");
if (objectNode != null && objectNode.textValue().equals(expectedObjectValue)) {
return Optional.of(body);
}
}
return Optional.empty();
},
this::postHandler);

return List.of(MetaHandlerUtils.subscriptionVerificationRouteDetails(verifyToken), postDetails);
}
}
18 changes: 4 additions & 14 deletions src/main/java/com/meta/cp4m/message/MessageHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,14 @@

package com.meta.cp4m.message;

import io.javalin.http.Context;
import com.meta.cp4m.routing.Acceptor;
import io.javalin.http.HandlerType;
import java.io.IOException;
import java.util.Collection;
import java.util.List;

public interface MessageHandler<T extends Message> {

/**
* Process incoming requests from the messaging service, including messages from the user.
*
* @param ctx the context corresponding to an incoming request
* @return return a {@link Message} object if appropriate
*/
List<T> processRequest(Context ctx);
record RouteDetails<IN, OUT extends Message>(
HandlerType handlerType, Acceptor<IN> acceptor, RequestProcessor<IN, OUT> requestProcessor) {}

/**
* The method needed to respond to a message from a user
Expand All @@ -31,8 +24,5 @@ public interface MessageHandler<T extends Message> {
*/
void respond(T message) throws IOException;

/**
* @return The different {@link HandlerType}s that this handler expects to receive
*/
Collection<HandlerType> handlers();
List<RouteDetails<?, T>> routeDetails();
}
40 changes: 40 additions & 0 deletions src/main/java/com/meta/cp4m/message/MetaHandlerUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,17 @@

import io.javalin.http.Context;
import io.javalin.http.ForbiddenResponse;
import io.javalin.http.HandlerType;
import java.nio.charset.StandardCharsets;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import org.apache.hc.client5.http.utils.Hex;
import org.checkerframework.checker.nullness.qual.Nullable;

class MetaHandlerUtils {
static void subscriptionVerification(Context ctx, String verifyToken) {
Expand All @@ -27,6 +32,26 @@ static void subscriptionVerification(Context ctx, String verifyToken) {
ctx.result(String.valueOf(challenge));
}

static <T extends Message>
MessageHandler.RouteDetails<Integer, T> subscriptionVerificationRouteDetails(
String verifyToken) {
return new MessageHandler.RouteDetails<>(
HandlerType.GET,
ctx ->
// validateSubscription handles putting challenge into context response if it succeeds
{
if (Objects.equals(ctx.queryParam("hub.mode"), "subscribe")
&& Objects.equals(ctx.queryParam("hub.verify_token"), verifyToken)) {
return Optional.of(ctx.queryParamAsClass("hub.challenge", Integer.class).get());
}
return Optional.empty();
},
(ctx, challenge) -> {
ctx.result(String.valueOf(challenge));
return List.of();
});
}

static String hmac(String body, String appSecret) {
Mac sha256HMAC;
SecretKeySpec secretKey;
Expand Down Expand Up @@ -65,4 +90,19 @@ static void postHeaderValidator(Context ctx, String appSecret) {
"X-Hub-Signature-256 could not be validated")
.getOrThrow(ignored -> new ForbiddenResponse("X-Hub-Signature-256 could not be validated"));
}

static boolean postHeaderValid(Context ctx, String appSecret) {
@Nullable String sig = ctx.headerMap().get("X-Hub-Signature-256");
if (sig == null) {
return false;
}

String[] hashParts = sig.strip().split("=");
if (hashParts.length != 2) {
return false;
}

String calculatedHmac = hmac(ctx.body(), appSecret);
return hashParts[1].equals(calculatedHmac);
}
}
Loading