Skip to content

Commit

Permalink
Test generic stomp reactive client
Browse files Browse the repository at this point in the history
  • Loading branch information
trickl committed Dec 20, 2019
1 parent 42a58e2 commit 0760f1b
Show file tree
Hide file tree
Showing 20 changed files with 306 additions and 119 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ public boolean removeSupplier(Supplier<S> supplier) {

/**
* Apply all suppliers.
*
* @return All suppliers
*/
public List<S> applyAll() {
return delegates.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public class FixedRatePublisher implements Supplier<Flux<Long>> {
/**
* Create a new flux that emitters an incrementing long at a fixed rate.
*
* @param period the time period between emitted elements
*/
public FixedRatePublisher(Duration period) {
this(Duration.ZERO, period, Schedulers.parallel());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public class BinaryWebSocketFluxClient {
/**
* Get a flux of messages from the stream.
*
* @param send flux of messages to send upstream
* @return A flux of (untyped) objects
*/
public Flux<byte[]> get(Publisher<byte[]> send) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ public class TextWebSocketFluxClient {

/**
* Get a flux of messages from the stream.
*
*
* @param send the flux of messages to send upstream
* @return A flux of (untyped) objects
*/
public Flux<String> get(Publisher<String> send) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package com.trickl.flux.websocket.stomp;

import com.trickl.flux.websocket.BinaryWebSocketFluxClient;
import com.trickl.flux.websocket.stomp.StompFrame;
import com.trickl.flux.websocket.stomp.frames.StompConnectFrame;
import com.trickl.flux.websocket.stomp.frames.StompDisconnectFrame;

import java.net.URI;
import java.util.function.Supplier;

import lombok.RequiredArgsConstructor;

import org.reactivestreams.Publisher;
import org.springframework.http.HttpHeaders;
import org.springframework.web.reactive.socket.client.WebSocketClient;

import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

@RequiredArgsConstructor
public class RawStompFluxClient {
private final WebSocketClient webSocketClient;
private final URI transportUri;
private final Supplier<HttpHeaders> webSocketHeadersProvider;

/**
* Connect to a stomp service.
*
* @param send The stream of messages to send.
* @return A stream of messages received.
*/
public Flux<StompFrame> get(Publisher<StompFrame> send) {
StompInputTransformer stompInputTransformer =
new StompInputTransformer();
StompOutputTransformer stompOutputTransformer =
new StompOutputTransformer();

EmitterProcessor<StompFrame> frameProcessor = EmitterProcessor.create();
FluxSink<StompFrame> frameSink = frameProcessor.sink();
Flux<StompFrame> output = Flux.merge(frameProcessor, send);

BinaryWebSocketFluxClient webSocketFluxClient =
new BinaryWebSocketFluxClient(
webSocketClient,
transportUri,
webSocketHeadersProvider,
() -> onConnect(frameSink),
() -> onDisconnect(frameSink));
return stompInputTransformer.apply(webSocketFluxClient.get(
stompOutputTransformer.apply(output)));
}

protected void onConnect(FluxSink<StompFrame> frameSink) {
StompConnectFrame connectFrame = StompConnectFrame.builder()
.acceptVersion("1.0,1.1,1.2")
.host(transportUri.getHost())
.build();
frameSink.next(connectFrame);
}

protected void onDisconnect(FluxSink<StompFrame> frameSink) {
StompDisconnectFrame disconnectFrame = StompDisconnectFrame.builder()
.build();
frameSink.next(disconnectFrame);
}
}
203 changes: 164 additions & 39 deletions src/main/java/com/trickl/flux/websocket/stomp/StompFluxClient.java
Original file line number Diff line number Diff line change
@@ -1,72 +1,197 @@
package com.trickl.flux.websocket.stomp;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.trickl.flux.websocket.BinaryWebSocketFluxClient;
import com.trickl.flux.transformers.ThrowableMapTransformer;
import com.trickl.flux.websocket.stomp.StompFrame;
import com.trickl.flux.websocket.stomp.frames.StompConnectFrame;
import com.trickl.flux.websocket.stomp.frames.StompDisconnectFrame;
import com.trickl.flux.websocket.stomp.frames.StompConnectedFrame;
import com.trickl.flux.websocket.stomp.frames.StompMessageFrame;
import com.trickl.flux.websocket.stomp.frames.StompSendFrame;
import com.trickl.flux.websocket.stomp.frames.StompSubscribeFrame;
import com.trickl.flux.websocket.stomp.frames.StompUnsubscribeFrame;

import java.io.IOException;
import java.net.URI;
import java.text.MessageFormat;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.logging.Level;

import lombok.RequiredArgsConstructor;
import lombok.extern.java.Log;

import org.reactivestreams.Publisher;
import org.springframework.http.HttpHeaders;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.web.reactive.socket.client.WebSocketClient;

import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;

@Log
@RequiredArgsConstructor
public class StompFluxClient<O, I> {
public class StompFluxClient {
private final WebSocketClient webSocketClient;
private final URI transportUri;
private final Supplier<HttpHeaders> webSocketHeadersProvider;
private final ObjectMapper objectMapper;

private final Class<O> requestMessageType;
private final Class<I> responseMessageType;
private final EmitterProcessor<StompFrame> responseProcessor = EmitterProcessor.create();

private final EmitterProcessor<StompFrame> streamRequestProcessor = EmitterProcessor.create();

private final FluxSink<StompFrame> streamRequestSink = streamRequestProcessor.sink();

private final AtomicInteger maxSubscriptionNumber = new AtomicInteger(0);

private final Map<String, String> subscriptionDestinationIdMap = new HashMap<>();

private final AtomicReference<Flux<StompMessageFrame>> sharedStream = new AtomicReference<>();

private final AtomicBoolean isConnected = new AtomicBoolean(false);

private final AtomicBoolean isConnecting = new AtomicBoolean(false);

/**
* Connect to a stomp service.
*
* @param send The stream of messages to send.
* @return A stream of messages received.
* Connect to the stomp transport.
*/
public Flux<StompFrame> get(Publisher<StompFrame> send) {
StompInputTransformer<I> stompInputTransformer =
new StompInputTransformer<>(
objectMapper, responseMessageType);
StompOutputTransformer<O> stompOutputTransformer =
new StompOutputTransformer<>(objectMapper, requestMessageType);

EmitterProcessor<StompFrame> frameProcessor = EmitterProcessor.create();
FluxSink<StompFrame> frameSink = frameProcessor.sink();
Flux<StompFrame> output = Flux.merge(frameProcessor, send);

BinaryWebSocketFluxClient webSocketFluxClient =
new BinaryWebSocketFluxClient(
webSocketClient,
transportUri,
webSocketHeadersProvider,
() -> onConnect(frameSink),
() -> onDisconnect(frameSink));
return stompInputTransformer.apply(webSocketFluxClient.get(
stompOutputTransformer.apply(output)));
public void connect() {
if (isConnected.get() && isConnecting.compareAndSet(false, true)) {
// Already connected
return;
}

try {
RawStompFluxClient stompFluxClient =
new RawStompFluxClient(
webSocketClient, transportUri, webSocketHeadersProvider);

Publisher<StompFrame> sendWithResponse =
Flux.merge(streamRequestProcessor, responseProcessor);

Flux<StompMessageFrame> stream = stompFluxClient.get(sendWithResponse)
.doOnNext(frame -> {
log.info("Got frame " + frame.getClass());
if (StompConnectedFrame.class.equals(frame.getClass())) {
handleConnectStream();
}
})
.filter(frame -> frame.getHeaderAccessor().getCommand().equals(StompCommand.MESSAGE))
.cast(StompMessageFrame.class)
.onErrorContinue(JsonProcessingException.class, this::warnAndDropError)
.doAfterTerminate(this::handleTerminateStream)
.repeatWhen(this::reconnect)
.publish()
.refCount();

sharedStream.set(stream);
} finally {
isConnecting.set(false);
}
}

protected void onConnect(FluxSink<StompFrame> frameSink) {
StompConnectFrame connectFrame = StompConnectFrame.builder()
.acceptVersion("1.0,1.1,1.2")
.host(transportUri.getHost())
.build();
frameSink.next(connectFrame);
protected Publisher<Long> reconnect(Flux<Long> emittedEachAttempt) {
return emittedEachAttempt.delayUntil(attempt -> {
if (attempt == 0) {
return Mono.just(attempt);
}
return Mono.delay(Duration.ofSeconds(5000));
});
}

protected void handleTerminateStream() {
sharedStream.set(null);
}


protected void handleConnectStream() {
isConnected.set(true);
resubscribeAll();
}

protected void handleDisconnectStream() {
isConnected.set(false);
}

protected void onDisconnect(FluxSink<StompFrame> frameSink) {
StompDisconnectFrame disconnectFrame = StompDisconnectFrame.builder()
protected void warnAndDropError(Throwable ex, Object value) {
log.log(
Level.WARNING,
MessageFormat.format(
"Json processing error.\n Message: {0}\nValue: {1}\n",
new Object[] {ex.getMessage(), value}));
}

protected String subscribeDestination(String destination) {
int subscriptionNumber = maxSubscriptionNumber.incrementAndGet();
String subscriptionId = MessageFormat.format("sub-{0}", subscriptionNumber);
StompFrame frame = StompSubscribeFrame.builder()
.destination(destination)
.subscriptionId(subscriptionId)
.build();
frameSink.next(disconnectFrame);
streamRequestSink.next(frame);
return subscriptionId;
}

protected void resubscribeAll() {
subscriptionDestinationIdMap.replaceAll(
(dest, id) -> subscribeDestination(dest));
}

/**
* Subscribe to a destination.
*
* @param destination The destination channel
* @return A flux of messages on that channel
*/
public <T> Flux<T> subscribe(String destination, Class<T> messageType) {
connect();
subscriptionDestinationIdMap.computeIfAbsent(destination, this::subscribeDestination);

ThrowableMapTransformer<StompMessageFrame, T> messageTransformer =
new ThrowableMapTransformer<>(frame -> readStompMessageFrame(frame, messageType));

Flux<StompMessageFrame> messageFrameFlux = sharedStream.get()
.filter(frame -> frame.getDestination().equals(destination))
.doOnTerminate(() -> unsubscribe(destination));

return messageTransformer.apply(messageFrameFlux);
}

protected void unsubscribe(String destination) {
subscriptionDestinationIdMap.computeIfPresent(destination, (dest, subscriptionId) -> {
StompFrame frame = StompUnsubscribeFrame.builder()
.subscriptionId(subscriptionId)
.build();
streamRequestSink.next(frame);
return null;
});
}

protected <T> T readStompMessageFrame(StompMessageFrame frame, Class<T> messageType)
throws IOException {
return objectMapper.readValue(frame.getBody(), messageType);
}

/**
* Send a message to a destination.
* @param <O> The type of object to send
* @param message The message
* @param destination The destination
* @throws JsonProcessingException If the message cannot be JSON encoded
*/
public <O> void sendMessage(O message, String destination) throws JsonProcessingException {
String body = objectMapper.writeValueAsString(message);
StompFrame frame = StompSendFrame.builder()
.destination(destination)
.body(body).build();
streamRequestSink.next(frame);
}
}

Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package com.trickl.flux.websocket.stomp;

import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;

import org.springframework.messaging.Message;
Expand All @@ -10,5 +8,5 @@
public interface StompFrame {
StompHeaderAccessor getHeaderAccessor();

Message<byte[]> toMessage(ObjectMapper objectMapper) throws IOException;
Message<byte[]> toMessage() throws IOException;
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.trickl.flux.websocket.stomp;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.trickl.flux.websocket.stomp.StompFrame;
import com.trickl.flux.websocket.stomp.frames.StompConnectedFrame;
import com.trickl.flux.websocket.stomp.frames.StompErrorFrame;
Expand All @@ -16,10 +15,7 @@
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;

@RequiredArgsConstructor
public class StompFrameBuilder<T> implements Function<Message<byte[]>, StompFrame> {

private final ObjectMapper objectMapper;
private final Class<T> messageType;
public class StompFrameBuilder implements Function<Message<byte[]>, StompFrame> {

/**
* Build a stomp frame from a websocket message.
Expand All @@ -33,7 +29,7 @@ public StompFrame apply(Message<byte[]> message) {
switch (headerAccessor.getCommand()) {
case MESSAGE:
return StompMessageFrame.create(
headerAccessor, message.getPayload(), objectMapper, messageType);
headerAccessor, message.getPayload());
case CONNECTED:
return StompConnectedFrame.create(headerAccessor);
case RECEIPT:
Expand Down
Loading

0 comments on commit 0760f1b

Please sign in to comment.