Skip to content

Commit

Permalink
Support querying inbound stream metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
trickl committed Jan 5, 2020
1 parent 378eaa7 commit e042769
Show file tree
Hide file tree
Showing 6 changed files with 167 additions and 26 deletions.
23 changes: 23 additions & 0 deletions src/main/java/com/trickl/flux/websocket/StreamDetail.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.trickl.flux.websocket;

import java.time.Instant;

import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;

@Builder
@Data
@EqualsAndHashCode
public class StreamDetail {
protected StreamId id;
protected int subscriberCount;
protected int messageCount;
protected Instant subscriptionTime;
protected Instant lastMessageTime;
protected Instant cancelTime;
protected Instant completeTime;
protected Instant errorTime;
protected String errorMessage;
protected boolean isTerminated;
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
@Builder
@Data
@EqualsAndHashCode
public class WebSocketRequest {
public class StreamId {
protected final WebSocketDestinationType destinationType;
protected final String channelType;
protected final String userName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class WebSocketRequestBuilder implements Function<String, Optional<WebSocketRequest>> {
public class StreamIdParser implements Function<String, Optional<StreamId>> {

private static final String TOPIC_PREFIX_PATTERN = "\\/(?<destinationType>topic)";
private static final String USER_QUEUE_PREFIX_PATTERN =
Expand All @@ -22,11 +22,11 @@ public class WebSocketRequestBuilder implements Function<String, Optional<WebSoc


@Override
public Optional<WebSocketRequest> apply(String destination) {
public Optional<StreamId> apply(String destination) {
Matcher matcher = topicPattern.matcher(destination);

WebSocketRequest.WebSocketRequestBuilder builder
= WebSocketRequest.builder();
StreamId.StreamIdBuilder builder
= StreamId.builder();

if (!matcher.matches()) {
matcher = userQueuePattern.matcher(destination);
Expand Down
23 changes: 23 additions & 0 deletions src/main/java/com/trickl/flux/websocket/SubscriptionDetail.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.trickl.flux.websocket;

import java.time.Instant;

import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;

@Builder
@Data
@EqualsAndHashCode
public class SubscriptionDetail {
protected final String subscriptionId;
protected final String userName;
protected final String destination;
protected final String sessionId;
protected Instant subscriptionTime;
protected Instant cancelTime;
protected Instant completeTime;
protected Instant errorTime;
protected String errorMessage;
protected boolean isCancelled;
}
135 changes: 115 additions & 20 deletions src/main/java/com/trickl/flux/websocket/WebSocketRequestRouter.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,20 @@
import com.trickl.exceptions.SubscriptionFailedException;
import com.trickl.flux.consumers.SimpMessageSender;

import java.security.Principal;
import java.text.MessageFormat;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.stream.Collectors;

import lombok.RequiredArgsConstructor;
import lombok.extern.java.Log;

import org.reactivestreams.Subscription;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.event.SmartApplicationListener;
import org.springframework.core.Ordered;
Expand All @@ -29,26 +32,28 @@
import org.springframework.web.socket.messaging.SessionDisconnectEvent;
import org.springframework.web.socket.messaging.SessionSubscribeEvent;
import org.springframework.web.socket.messaging.SessionUnsubscribeEvent;
import reactor.core.Disposable;

import reactor.core.publisher.Flux;

@Log
@RequiredArgsConstructor
public class WebSocketRequestRouter<T> implements SmartApplicationListener {

private final Function<WebSocketRequest, Optional<Flux<T>>> fluxFactory;
private final Function<StreamId, Optional<Flux<T>>> fluxFactory;

private final SimpUserRegistry simpUserRegistry;

private final SimpMessagingTemplate messagingTemplate;

// Destination -> Flux
private final Map<WebSocketRequest, Optional<Flux<T>>> fluxes = new ConcurrentHashMap<>();
private final Map<StreamId, Optional<Flux<T>>> fluxes = new ConcurrentHashMap<>();

private final Map<StreamId, StreamDetail> streams = new ConcurrentHashMap<>();

// SubscriptionId -> Subscription
private final Map<String, Disposable> subscriptions = new ConcurrentHashMap<>();
private final Map<String, Subscription> subscriptions = new ConcurrentHashMap<>();

private final WebSocketRequestBuilder webSocketRequestBuilder = new WebSocketRequestBuilder();
private final Map<String, SubscriptionDetail> subscriptionDetails = new ConcurrentHashMap<>();

private final StreamIdParser streamIdParser = new StreamIdParser();

@Override
public boolean supportsEventType(Class<? extends ApplicationEvent> eventType) {
Expand All @@ -65,32 +70,108 @@ public int getOrder() {
return Ordered.LOWEST_PRECEDENCE;
}

protected void subscribe(Principal user, String destination, String subscriptionId)
protected void subscribe(SubscriptionDetail subscription)
throws SubscriptionFailedException {
WebSocketRequest webSocketRequest = webSocketRequestBuilder.apply(destination)
String destination = subscription.getDestination();
StreamId streamId = streamIdParser.apply(destination)
.orElseThrow(() -> new SubscriptionFailedException(
MessageFormat.format("Destination: {0} not found.", destination)));
StreamDetail stream = StreamDetail.builder()
.id(streamId)
.build();

SimpMessageSender<T> messageSender = new SimpMessageSender<>(messagingTemplate, destination);
Flux<?> connectableFlux = this.fluxes.computeIfAbsent(
webSocketRequest, fluxFactory::apply)
streamId, fluxFactory::apply)
.orElseThrow(() -> new SubscriptionFailedException(
MessageFormat.format("Destination: {0} not found.", destination)))
.doOnNext(new SimpMessageSender<>(messagingTemplate, destination))
.doOnNext(value -> handleStreamValue(stream, messageSender, value))
.doOnSubscribe(sub -> handleStreamSubscription(stream, sub))
.doOnCancel(() -> handleStreamCancel(stream))
.doOnError(error -> handleStreamError(stream, error))
.doOnComplete(() -> handleStreamComplete(stream))
.publish()
.refCount();

Disposable subscription = connectableFlux
.doOnCancel(() -> this.fluxes.remove(webSocketRequest))
.doOnTerminate(() -> this.fluxes.remove(webSocketRequest))
.subscribe();
connectableFlux
.doOnSubscribe(sub -> handleSubscription(subscription, stream, sub))
.doOnCancel(() -> handleSubscriberCancel(subscription, stream))
.doOnError(error -> handleSubscriberError(subscription, stream, error))
.doOnComplete(() -> handleSubscriberComplete(subscription, stream))
.subscribe();
}

protected void handleStreamSubscription(StreamDetail stream, Subscription subscription) {
stream.setSubscriptionTime(Instant.now());
streams.put(stream.getId(), stream);
}

protected void handleStreamValue(
StreamDetail stream, SimpMessageSender<T> messageSender, T value) {
messageSender.accept(value);
stream.setMessageCount(stream.getMessageCount() + 1);
stream.setLastMessageTime(Instant.now());
}

protected void handleStreamCancel(StreamDetail stream) {
stream.setCancelTime(Instant.now());
setStreamTerminated(stream.getId());
}

protected void handleStreamError(StreamDetail stream, Throwable error) {
stream.setErrorTime(Instant.now());
stream.setErrorMessage(error.getLocalizedMessage());
setStreamTerminated(stream.getId());
}

protected void handleStreamComplete(StreamDetail stream) {
stream.setCompleteTime(Instant.now());
setStreamTerminated(stream.getId());
}

protected void setStreamTerminated(StreamId id) {
fluxes.computeIfPresent(id, (streamId, flux) -> {
streams.computeIfPresent(streamId,
(i, detail) -> {
detail.setTerminated(true);
return detail;
});
return null;
});
}

protected void handleSubscription(
SubscriptionDetail detail, StreamDetail stream, Subscription subscription) {
detail.setSubscriptionTime(Instant.now());
stream.setSubscriberCount(stream.getSubscriberCount() + 1);
subscriptions.put(detail.getSubscriptionId(), subscription);
subscriptionDetails.put(detail.getSubscriptionId(), detail);
}

protected void handleSubscriberCancel(SubscriptionDetail subscription, StreamDetail stream) {
stream.setSubscriberCount(stream.getSubscriberCount() - 1);
subscription.setCancelTime(Instant.now());
}

this.subscriptions.put(subscriptionId, subscription);
protected void handleSubscriberError(
SubscriptionDetail subscription, StreamDetail stream, Throwable error) {
subscription.setErrorTime(Instant.now());
subscription.setErrorMessage(error.getLocalizedMessage());
}

protected void handleSubscriberComplete(SubscriptionDetail subscription, StreamDetail stream) {
subscription.setCompleteTime(Instant.now());
}

protected void unsubscribe(String subscriptionId) {
this.subscriptions.computeIfPresent(subscriptionId,
subscriptions.computeIfPresent(subscriptionId,
(id, subscription) -> {
subscription.dispose();
subscription.cancel();
subscriptionDetails.computeIfPresent(subscriptionId,
(subId, detail) -> {
detail.setCancelled(true);
return detail;
});
return null;
});
}
Expand All @@ -110,7 +191,13 @@ public void onApplicationEvent(ApplicationEvent event) {
Assert.state(destination != null, "No destination");

try {
this.subscribe(accessor.getUser(), destination, accessor.getSubscriptionId());
SubscriptionDetail subscriptionInfo = SubscriptionDetail.builder()
.userName(accessor.getUser() != null ? accessor.getUser().getName() : null)
.destination(destination)
.subscriptionId(accessor.getSubscriptionId())
.sessionId(accessor.getSessionId())
.build();
subscribe(subscriptionInfo);
} catch (SubscriptionFailedException ex) {
log.log(Level.WARNING, "Subscription failed", ex);
}
Expand All @@ -128,4 +215,12 @@ public void onApplicationEvent(ApplicationEvent event) {
unsubscribe(accessor.getSubscriptionId());
}
}

public List<SubscriptionDetail> getSubscriptionDetails() {
return subscriptionDetails.values().stream().collect(Collectors.toList());
}

public List<StreamDetail> getStreams() {
return streams.values().stream().collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public void init() {
public void testDoesNothingWithoutSubscribers() {
TestPublisher<String> input = TestPublisher.<String>create();

Publisher<String> output = Flux.from(input)
Flux.from(input)
.doOnNext(new SimpMessageSender<>(messagingTemplate, DESTINATION))
.publish()
.refCount();
Expand Down

0 comments on commit e042769

Please sign in to comment.