Skip to content

Commit

Permalink
chore(ws): simplify messaging server
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewazores committed Apr 30, 2024
1 parent c3d1541 commit 1d06e0e
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 66 deletions.
91 changes: 27 additions & 64 deletions src/main/java/io/cryostat/ws/MessagingServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,19 @@
import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
import io.quarkus.vertx.ConsumeEvent;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import jakarta.websocket.OnClose;
import jakarta.websocket.OnError;
import jakarta.websocket.OnMessage;
import jakarta.websocket.OnOpen;
import jakarta.websocket.Session;
import jakarta.websocket.server.ServerEndpoint;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.jboss.logging.Logger;

@ApplicationScoped
Expand All @@ -50,14 +41,7 @@ public class MessagingServer {

@Inject ObjectMapper mapper;
@Inject Logger logger;
private final ExecutorService executor = Executors.newSingleThreadExecutor();
private final BlockingQueue<Notification> msgQ;
private final Set<Session> sessions = new CopyOnWriteArraySet<>();
private volatile Future<?> task;

MessagingServer(@ConfigProperty(name = "cryostat.messaging.queue.size") int capacity) {
this.msgQ = new ArrayBlockingQueue<>(capacity);
}

@OnOpen
public void onOpen(Session session) throws InterruptedException {
Expand All @@ -84,63 +68,42 @@ public void onError(Session session, Throwable throwable) throws InterruptedExce
} catch (IOException ioe) {
logger.error("Unable to close session", ioe);
}
sessions.remove(session);
broadcast(
new Notification(
CLIENT_ACTIVITY_CATEGORY, Map.of(session.getId(), "disconnected")));
}

void start(@Observes StartupEvent evt) {
logger.infov("Starting {0} executor", getClass().getName());
cancelTask();
this.task =
executor.submit(
() -> {
loop:
while (!executor.isShutdown() && !Thread.interrupted()) {
try {
var notification = msgQ.take();
var map =
Map.of(
"meta",
Map.of("category", notification.category()),
"message",
notification.message());
try {
var json = mapper.writeValueAsString(map);
logger.debugv("Broadcasting: {0}", json);
sessions.forEach(s -> s.getAsyncRemote().sendText(json));
} catch (JsonProcessingException e) {
logger.error("Unable to serialize message to JSON", e);
}
} catch (InterruptedException ie) {
logger.warn(ie);
break loop;
}
}
});
}

void shutdown(@Observes ShutdownEvent evt) {
logger.infov("Shutting down {0} executor", getClass().getName());
executor.shutdownNow();
cancelTask();
msgQ.clear();
}

private void cancelTask() {
if (this.task != null) {
this.task.cancel(true);
this.task = null;
}
}

@OnMessage
public void onMessage(Session session, String message) {
logger.debugv("{0} message: \"{1}\"", session.getId(), message);
}

@ConsumeEvent
void broadcast(Notification notification) throws InterruptedException {
msgQ.put(notification);
@ConsumeEvent(blocking = true, ordered = true)
void broadcast(Notification notification) {
var map =
Map.of(
"meta",
Map.of("category", notification.category()),
"message",
notification.message());
String json;
try {
json = mapper.writeValueAsString(map);
} catch (JsonProcessingException e) {
logger.errorv(e, "Unable to serialize message to JSON: {0}", notification);
return;
}
logger.infov("Broadcasting: {0}", json);
sessions.forEach(
s ->
s.getAsyncRemote()
.sendText(
json,
h -> {
if (!h.isOK()) {
logger.warn(h.getException());
}
}));
}
}
2 changes: 0 additions & 2 deletions src/main/resources/application.properties
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
quarkus.naming.enable-jndi=true
cryostat.discovery.jdp.enabled=false
cryostat.discovery.containers.poll-period=10s
cryostat.discovery.containers.request-timeout=2s
Expand All @@ -24,7 +23,6 @@ cryostat.connections.max-open=0
cryostat.connections.ttl=10s
cryostat.connections.failed-backoff=2s
cryostat.connections.failed-timeout=10s
cryostat.messaging.queue.size=1024
quarkus.rest-client.reports.url=http://localhost/
quarkus.cache.enabled=true
cryostat.services.reports.memory-cache.enabled=true
Expand Down

0 comments on commit 1d06e0e

Please sign in to comment.