diff --git a/src/main/java/com/trickl/flux/websocket/StreamDetail.java b/src/main/java/com/trickl/flux/websocket/StreamDetail.java new file mode 100644 index 0000000..ba5a098 --- /dev/null +++ b/src/main/java/com/trickl/flux/websocket/StreamDetail.java @@ -0,0 +1,23 @@ +package com.trickl.flux.websocket; + +import java.time.Instant; + +import lombok.Builder; +import lombok.Data; +import lombok.EqualsAndHashCode; + +@Builder +@Data +@EqualsAndHashCode +public class StreamDetail { + protected StreamId id; + protected int subscriberCount; + protected int messageCount; + protected Instant subscriptionTime; + protected Instant lastMessageTime; + protected Instant cancelTime; + protected Instant completeTime; + protected Instant errorTime; + protected String errorMessage; + protected boolean isTerminated; +} \ No newline at end of file diff --git a/src/main/java/com/trickl/flux/websocket/WebSocketRequest.java b/src/main/java/com/trickl/flux/websocket/StreamId.java similarity index 92% rename from src/main/java/com/trickl/flux/websocket/WebSocketRequest.java rename to src/main/java/com/trickl/flux/websocket/StreamId.java index 363e5f4..77cb44a 100644 --- a/src/main/java/com/trickl/flux/websocket/WebSocketRequest.java +++ b/src/main/java/com/trickl/flux/websocket/StreamId.java @@ -10,7 +10,7 @@ @Builder @Data @EqualsAndHashCode -public class WebSocketRequest { +public class StreamId { protected final WebSocketDestinationType destinationType; protected final String channelType; protected final String userName; diff --git a/src/main/java/com/trickl/flux/websocket/WebSocketRequestBuilder.java b/src/main/java/com/trickl/flux/websocket/StreamIdParser.java similarity index 86% rename from src/main/java/com/trickl/flux/websocket/WebSocketRequestBuilder.java rename to src/main/java/com/trickl/flux/websocket/StreamIdParser.java index 8b8dac9..38aa0a3 100644 --- a/src/main/java/com/trickl/flux/websocket/WebSocketRequestBuilder.java +++ b/src/main/java/com/trickl/flux/websocket/StreamIdParser.java @@ -7,7 +7,7 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; -public class WebSocketRequestBuilder implements Function> { +public class StreamIdParser implements Function> { private static final String TOPIC_PREFIX_PATTERN = "\\/(?topic)"; private static final String USER_QUEUE_PREFIX_PATTERN = @@ -22,11 +22,11 @@ public class WebSocketRequestBuilder implements Function apply(String destination) { + public Optional apply(String destination) { Matcher matcher = topicPattern.matcher(destination); - WebSocketRequest.WebSocketRequestBuilder builder - = WebSocketRequest.builder(); + StreamId.StreamIdBuilder builder + = StreamId.builder(); if (!matcher.matches()) { matcher = userQueuePattern.matcher(destination); diff --git a/src/main/java/com/trickl/flux/websocket/SubscriptionDetail.java b/src/main/java/com/trickl/flux/websocket/SubscriptionDetail.java new file mode 100644 index 0000000..7eb50e5 --- /dev/null +++ b/src/main/java/com/trickl/flux/websocket/SubscriptionDetail.java @@ -0,0 +1,23 @@ +package com.trickl.flux.websocket; + +import java.time.Instant; + +import lombok.Builder; +import lombok.Data; +import lombok.EqualsAndHashCode; + +@Builder +@Data +@EqualsAndHashCode +public class SubscriptionDetail { + protected final String subscriptionId; + protected final String userName; + protected final String destination; + protected final String sessionId; + protected Instant subscriptionTime; + protected Instant cancelTime; + protected Instant completeTime; + protected Instant errorTime; + protected String errorMessage; + protected boolean isCancelled; +} \ No newline at end of file diff --git a/src/main/java/com/trickl/flux/websocket/WebSocketRequestRouter.java b/src/main/java/com/trickl/flux/websocket/WebSocketRequestRouter.java index a88a4a6..16139be 100644 --- a/src/main/java/com/trickl/flux/websocket/WebSocketRequestRouter.java +++ b/src/main/java/com/trickl/flux/websocket/WebSocketRequestRouter.java @@ -3,17 +3,20 @@ import com.trickl.exceptions.SubscriptionFailedException; import com.trickl.flux.consumers.SimpMessageSender; -import java.security.Principal; import java.text.MessageFormat; +import java.time.Instant; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; import java.util.logging.Level; +import java.util.stream.Collectors; import lombok.RequiredArgsConstructor; import lombok.extern.java.Log; +import org.reactivestreams.Subscription; import org.springframework.context.ApplicationEvent; import org.springframework.context.event.SmartApplicationListener; import org.springframework.core.Ordered; @@ -29,26 +32,28 @@ import org.springframework.web.socket.messaging.SessionDisconnectEvent; import org.springframework.web.socket.messaging.SessionSubscribeEvent; import org.springframework.web.socket.messaging.SessionUnsubscribeEvent; -import reactor.core.Disposable; + import reactor.core.publisher.Flux; @Log @RequiredArgsConstructor public class WebSocketRequestRouter implements SmartApplicationListener { - private final Function>> fluxFactory; + private final Function>> fluxFactory; private final SimpUserRegistry simpUserRegistry; private final SimpMessagingTemplate messagingTemplate; - // Destination -> Flux - private final Map>> fluxes = new ConcurrentHashMap<>(); + private final Map>> fluxes = new ConcurrentHashMap<>(); + + private final Map streams = new ConcurrentHashMap<>(); - // SubscriptionId -> Subscription - private final Map subscriptions = new ConcurrentHashMap<>(); + private final Map subscriptions = new ConcurrentHashMap<>(); - private final WebSocketRequestBuilder webSocketRequestBuilder = new WebSocketRequestBuilder(); + private final Map subscriptionDetails = new ConcurrentHashMap<>(); + + private final StreamIdParser streamIdParser = new StreamIdParser(); @Override public boolean supportsEventType(Class eventType) { @@ -65,32 +70,108 @@ public int getOrder() { return Ordered.LOWEST_PRECEDENCE; } - protected void subscribe(Principal user, String destination, String subscriptionId) + protected void subscribe(SubscriptionDetail subscription) throws SubscriptionFailedException { - WebSocketRequest webSocketRequest = webSocketRequestBuilder.apply(destination) + String destination = subscription.getDestination(); + StreamId streamId = streamIdParser.apply(destination) .orElseThrow(() -> new SubscriptionFailedException( MessageFormat.format("Destination: {0} not found.", destination))); + StreamDetail stream = StreamDetail.builder() + .id(streamId) + .build(); + SimpMessageSender messageSender = new SimpMessageSender<>(messagingTemplate, destination); Flux connectableFlux = this.fluxes.computeIfAbsent( - webSocketRequest, fluxFactory::apply) + streamId, fluxFactory::apply) .orElseThrow(() -> new SubscriptionFailedException( MessageFormat.format("Destination: {0} not found.", destination))) - .doOnNext(new SimpMessageSender<>(messagingTemplate, destination)) + .doOnNext(value -> handleStreamValue(stream, messageSender, value)) + .doOnSubscribe(sub -> handleStreamSubscription(stream, sub)) + .doOnCancel(() -> handleStreamCancel(stream)) + .doOnError(error -> handleStreamError(stream, error)) + .doOnComplete(() -> handleStreamComplete(stream)) .publish() .refCount(); - Disposable subscription = connectableFlux - .doOnCancel(() -> this.fluxes.remove(webSocketRequest)) - .doOnTerminate(() -> this.fluxes.remove(webSocketRequest)) - .subscribe(); + connectableFlux + .doOnSubscribe(sub -> handleSubscription(subscription, stream, sub)) + .doOnCancel(() -> handleSubscriberCancel(subscription, stream)) + .doOnError(error -> handleSubscriberError(subscription, stream, error)) + .doOnComplete(() -> handleSubscriberComplete(subscription, stream)) + .subscribe(); + } + + protected void handleStreamSubscription(StreamDetail stream, Subscription subscription) { + stream.setSubscriptionTime(Instant.now()); + streams.put(stream.getId(), stream); + } + + protected void handleStreamValue( + StreamDetail stream, SimpMessageSender messageSender, T value) { + messageSender.accept(value); + stream.setMessageCount(stream.getMessageCount() + 1); + stream.setLastMessageTime(Instant.now()); + } + + protected void handleStreamCancel(StreamDetail stream) { + stream.setCancelTime(Instant.now()); + setStreamTerminated(stream.getId()); + } + + protected void handleStreamError(StreamDetail stream, Throwable error) { + stream.setErrorTime(Instant.now()); + stream.setErrorMessage(error.getLocalizedMessage()); + setStreamTerminated(stream.getId()); + } + + protected void handleStreamComplete(StreamDetail stream) { + stream.setCompleteTime(Instant.now()); + setStreamTerminated(stream.getId()); + } + + protected void setStreamTerminated(StreamId id) { + fluxes.computeIfPresent(id, (streamId, flux) -> { + streams.computeIfPresent(streamId, + (i, detail) -> { + detail.setTerminated(true); + return detail; + }); + return null; + }); + } + + protected void handleSubscription( + SubscriptionDetail detail, StreamDetail stream, Subscription subscription) { + detail.setSubscriptionTime(Instant.now()); + stream.setSubscriberCount(stream.getSubscriberCount() + 1); + subscriptions.put(detail.getSubscriptionId(), subscription); + subscriptionDetails.put(detail.getSubscriptionId(), detail); + } + + protected void handleSubscriberCancel(SubscriptionDetail subscription, StreamDetail stream) { + stream.setSubscriberCount(stream.getSubscriberCount() - 1); + subscription.setCancelTime(Instant.now()); + } - this.subscriptions.put(subscriptionId, subscription); + protected void handleSubscriberError( + SubscriptionDetail subscription, StreamDetail stream, Throwable error) { + subscription.setErrorTime(Instant.now()); + subscription.setErrorMessage(error.getLocalizedMessage()); + } + + protected void handleSubscriberComplete(SubscriptionDetail subscription, StreamDetail stream) { + subscription.setCompleteTime(Instant.now()); } protected void unsubscribe(String subscriptionId) { - this.subscriptions.computeIfPresent(subscriptionId, + subscriptions.computeIfPresent(subscriptionId, (id, subscription) -> { - subscription.dispose(); + subscription.cancel(); + subscriptionDetails.computeIfPresent(subscriptionId, + (subId, detail) -> { + detail.setCancelled(true); + return detail; + }); return null; }); } @@ -110,7 +191,13 @@ public void onApplicationEvent(ApplicationEvent event) { Assert.state(destination != null, "No destination"); try { - this.subscribe(accessor.getUser(), destination, accessor.getSubscriptionId()); + SubscriptionDetail subscriptionInfo = SubscriptionDetail.builder() + .userName(accessor.getUser() != null ? accessor.getUser().getName() : null) + .destination(destination) + .subscriptionId(accessor.getSubscriptionId()) + .sessionId(accessor.getSessionId()) + .build(); + subscribe(subscriptionInfo); } catch (SubscriptionFailedException ex) { log.log(Level.WARNING, "Subscription failed", ex); } @@ -128,4 +215,12 @@ public void onApplicationEvent(ApplicationEvent event) { unsubscribe(accessor.getSubscriptionId()); } } + + public List getSubscriptionDetails() { + return subscriptionDetails.values().stream().collect(Collectors.toList()); + } + + public List getStreams() { + return streams.values().stream().collect(Collectors.toList()); + } } diff --git a/src/test/java/com/trickl/flux/consumers/SimpMessageSenderTest.java b/src/test/java/com/trickl/flux/consumers/SimpMessageSenderTest.java index 8ba6a73..fd6aa4f 100644 --- a/src/test/java/com/trickl/flux/consumers/SimpMessageSenderTest.java +++ b/src/test/java/com/trickl/flux/consumers/SimpMessageSenderTest.java @@ -36,7 +36,7 @@ public void init() { public void testDoesNothingWithoutSubscribers() { TestPublisher input = TestPublisher.create(); - Publisher output = Flux.from(input) + Flux.from(input) .doOnNext(new SimpMessageSender<>(messagingTemplate, DESTINATION)) .publish() .refCount();