From 0760f1beb2163901511c0eb61543862e8a8def5f Mon Sep 17 00:00:00 2001 From: Tim Gee Date: Fri, 20 Dec 2019 14:16:15 +0000 Subject: [PATCH] Test generic stomp reactive client --- .../ConditionalDelegatingSupplier.java | 2 + .../flux/publishers/FixedRatePublisher.java | 1 + .../websocket/BinaryWebSocketFluxClient.java | 1 + .../websocket/TextWebSocketFluxClient.java | 3 +- .../websocket/stomp/RawStompFluxClient.java | 67 ++++++ .../flux/websocket/stomp/StompFluxClient.java | 203 ++++++++++++++---- .../flux/websocket/stomp/StompFrame.java | 4 +- .../websocket/stomp/StompFrameBuilder.java | 8 +- .../stomp/StompInputTransformer.java | 10 +- .../websocket/stomp/StompMessageCodec.java | 13 +- .../stomp/StompOutputTransformer.java | 11 +- .../stomp/frames/StompConnectFrame.java | 3 +- .../stomp/frames/StompConnectedFrame.java | 3 +- .../stomp/frames/StompDisconnectFrame.java | 3 +- .../stomp/frames/StompErrorFrame.java | 4 +- .../stomp/frames/StompMessageFrame.java | 36 ++-- .../stomp/frames/StompReceiptFrame.java | 4 +- .../stomp/frames/StompSendFrame.java | 13 +- .../stomp/frames/StompSubscribeFrame.java | 3 +- .../stomp/frames/StompUnsubscribeFrame.java | 33 +++ 20 files changed, 306 insertions(+), 119 deletions(-) create mode 100644 src/main/java/com/trickl/flux/websocket/stomp/RawStompFluxClient.java create mode 100644 src/main/java/com/trickl/flux/websocket/stomp/frames/StompUnsubscribeFrame.java diff --git a/src/main/java/com/trickl/collections/ConditionalDelegatingSupplier.java b/src/main/java/com/trickl/collections/ConditionalDelegatingSupplier.java index 3ff4ab1..df6c396 100644 --- a/src/main/java/com/trickl/collections/ConditionalDelegatingSupplier.java +++ b/src/main/java/com/trickl/collections/ConditionalDelegatingSupplier.java @@ -41,6 +41,8 @@ public boolean removeSupplier(Supplier supplier) { /** * Apply all suppliers. + * + * @return All suppliers */ public List applyAll() { return delegates.stream() diff --git a/src/main/java/com/trickl/flux/publishers/FixedRatePublisher.java b/src/main/java/com/trickl/flux/publishers/FixedRatePublisher.java index cea11e4..c7dc2a6 100644 --- a/src/main/java/com/trickl/flux/publishers/FixedRatePublisher.java +++ b/src/main/java/com/trickl/flux/publishers/FixedRatePublisher.java @@ -23,6 +23,7 @@ public class FixedRatePublisher implements Supplier> { /** * Create a new flux that emitters an incrementing long at a fixed rate. * + * @param period the time period between emitted elements */ public FixedRatePublisher(Duration period) { this(Duration.ZERO, period, Schedulers.parallel()); diff --git a/src/main/java/com/trickl/flux/websocket/BinaryWebSocketFluxClient.java b/src/main/java/com/trickl/flux/websocket/BinaryWebSocketFluxClient.java index 22b11ab..9b3cac6 100644 --- a/src/main/java/com/trickl/flux/websocket/BinaryWebSocketFluxClient.java +++ b/src/main/java/com/trickl/flux/websocket/BinaryWebSocketFluxClient.java @@ -31,6 +31,7 @@ public class BinaryWebSocketFluxClient { /** * Get a flux of messages from the stream. * + * @param send flux of messages to send upstream * @return A flux of (untyped) objects */ public Flux get(Publisher send) { diff --git a/src/main/java/com/trickl/flux/websocket/TextWebSocketFluxClient.java b/src/main/java/com/trickl/flux/websocket/TextWebSocketFluxClient.java index 290b695..51f4b29 100644 --- a/src/main/java/com/trickl/flux/websocket/TextWebSocketFluxClient.java +++ b/src/main/java/com/trickl/flux/websocket/TextWebSocketFluxClient.java @@ -26,7 +26,8 @@ public class TextWebSocketFluxClient { /** * Get a flux of messages from the stream. - * + * + * @param send the flux of messages to send upstream * @return A flux of (untyped) objects */ public Flux get(Publisher send) { diff --git a/src/main/java/com/trickl/flux/websocket/stomp/RawStompFluxClient.java b/src/main/java/com/trickl/flux/websocket/stomp/RawStompFluxClient.java new file mode 100644 index 0000000..c2c20c3 --- /dev/null +++ b/src/main/java/com/trickl/flux/websocket/stomp/RawStompFluxClient.java @@ -0,0 +1,67 @@ +package com.trickl.flux.websocket.stomp; + +import com.trickl.flux.websocket.BinaryWebSocketFluxClient; +import com.trickl.flux.websocket.stomp.StompFrame; +import com.trickl.flux.websocket.stomp.frames.StompConnectFrame; +import com.trickl.flux.websocket.stomp.frames.StompDisconnectFrame; + +import java.net.URI; +import java.util.function.Supplier; + +import lombok.RequiredArgsConstructor; + +import org.reactivestreams.Publisher; +import org.springframework.http.HttpHeaders; +import org.springframework.web.reactive.socket.client.WebSocketClient; + +import reactor.core.publisher.EmitterProcessor; +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; + +@RequiredArgsConstructor +public class RawStompFluxClient { + private final WebSocketClient webSocketClient; + private final URI transportUri; + private final Supplier webSocketHeadersProvider; + + /** + * Connect to a stomp service. + * + * @param send The stream of messages to send. + * @return A stream of messages received. + */ + public Flux get(Publisher send) { + StompInputTransformer stompInputTransformer = + new StompInputTransformer(); + StompOutputTransformer stompOutputTransformer = + new StompOutputTransformer(); + + EmitterProcessor frameProcessor = EmitterProcessor.create(); + FluxSink frameSink = frameProcessor.sink(); + Flux output = Flux.merge(frameProcessor, send); + + BinaryWebSocketFluxClient webSocketFluxClient = + new BinaryWebSocketFluxClient( + webSocketClient, + transportUri, + webSocketHeadersProvider, + () -> onConnect(frameSink), + () -> onDisconnect(frameSink)); + return stompInputTransformer.apply(webSocketFluxClient.get( + stompOutputTransformer.apply(output))); + } + + protected void onConnect(FluxSink frameSink) { + StompConnectFrame connectFrame = StompConnectFrame.builder() + .acceptVersion("1.0,1.1,1.2") + .host(transportUri.getHost()) + .build(); + frameSink.next(connectFrame); + } + + protected void onDisconnect(FluxSink frameSink) { + StompDisconnectFrame disconnectFrame = StompDisconnectFrame.builder() + .build(); + frameSink.next(disconnectFrame); + } +} diff --git a/src/main/java/com/trickl/flux/websocket/stomp/StompFluxClient.java b/src/main/java/com/trickl/flux/websocket/stomp/StompFluxClient.java index 08138b2..0840605 100644 --- a/src/main/java/com/trickl/flux/websocket/stomp/StompFluxClient.java +++ b/src/main/java/com/trickl/flux/websocket/stomp/StompFluxClient.java @@ -1,72 +1,197 @@ package com.trickl.flux.websocket.stomp; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import com.trickl.flux.websocket.BinaryWebSocketFluxClient; +import com.trickl.flux.transformers.ThrowableMapTransformer; import com.trickl.flux.websocket.stomp.StompFrame; -import com.trickl.flux.websocket.stomp.frames.StompConnectFrame; -import com.trickl.flux.websocket.stomp.frames.StompDisconnectFrame; +import com.trickl.flux.websocket.stomp.frames.StompConnectedFrame; +import com.trickl.flux.websocket.stomp.frames.StompMessageFrame; +import com.trickl.flux.websocket.stomp.frames.StompSendFrame; +import com.trickl.flux.websocket.stomp.frames.StompSubscribeFrame; +import com.trickl.flux.websocket.stomp.frames.StompUnsubscribeFrame; +import java.io.IOException; import java.net.URI; +import java.text.MessageFormat; +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; +import java.util.logging.Level; + import lombok.RequiredArgsConstructor; +import lombok.extern.java.Log; import org.reactivestreams.Publisher; import org.springframework.http.HttpHeaders; +import org.springframework.messaging.simp.stomp.StompCommand; import org.springframework.web.reactive.socket.client.WebSocketClient; import reactor.core.publisher.EmitterProcessor; import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink; +import reactor.core.publisher.Mono; +@Log @RequiredArgsConstructor -public class StompFluxClient { +public class StompFluxClient { private final WebSocketClient webSocketClient; private final URI transportUri; private final Supplier webSocketHeadersProvider; private final ObjectMapper objectMapper; - private final Class requestMessageType; - private final Class responseMessageType; + private final EmitterProcessor responseProcessor = EmitterProcessor.create(); + + private final EmitterProcessor streamRequestProcessor = EmitterProcessor.create(); + + private final FluxSink streamRequestSink = streamRequestProcessor.sink(); + + private final AtomicInteger maxSubscriptionNumber = new AtomicInteger(0); + + private final Map subscriptionDestinationIdMap = new HashMap<>(); + + private final AtomicReference> sharedStream = new AtomicReference<>(); + + private final AtomicBoolean isConnected = new AtomicBoolean(false); + + private final AtomicBoolean isConnecting = new AtomicBoolean(false); /** - * Connect to a stomp service. - * - * @param send The stream of messages to send. - * @return A stream of messages received. + * Connect to the stomp transport. */ - public Flux get(Publisher send) { - StompInputTransformer stompInputTransformer = - new StompInputTransformer<>( - objectMapper, responseMessageType); - StompOutputTransformer stompOutputTransformer = - new StompOutputTransformer<>(objectMapper, requestMessageType); - - EmitterProcessor frameProcessor = EmitterProcessor.create(); - FluxSink frameSink = frameProcessor.sink(); - Flux output = Flux.merge(frameProcessor, send); - - BinaryWebSocketFluxClient webSocketFluxClient = - new BinaryWebSocketFluxClient( - webSocketClient, - transportUri, - webSocketHeadersProvider, - () -> onConnect(frameSink), - () -> onDisconnect(frameSink)); - return stompInputTransformer.apply(webSocketFluxClient.get( - stompOutputTransformer.apply(output))); + public void connect() { + if (isConnected.get() && isConnecting.compareAndSet(false, true)) { + // Already connected + return; + } + + try { + RawStompFluxClient stompFluxClient = + new RawStompFluxClient( + webSocketClient, transportUri, webSocketHeadersProvider); + + Publisher sendWithResponse = + Flux.merge(streamRequestProcessor, responseProcessor); + + Flux stream = stompFluxClient.get(sendWithResponse) + .doOnNext(frame -> { + log.info("Got frame " + frame.getClass()); + if (StompConnectedFrame.class.equals(frame.getClass())) { + handleConnectStream(); + } + }) + .filter(frame -> frame.getHeaderAccessor().getCommand().equals(StompCommand.MESSAGE)) + .cast(StompMessageFrame.class) + .onErrorContinue(JsonProcessingException.class, this::warnAndDropError) + .doAfterTerminate(this::handleTerminateStream) + .repeatWhen(this::reconnect) + .publish() + .refCount(); + + sharedStream.set(stream); + } finally { + isConnecting.set(false); + } } - protected void onConnect(FluxSink frameSink) { - StompConnectFrame connectFrame = StompConnectFrame.builder() - .acceptVersion("1.0,1.1,1.2") - .host(transportUri.getHost()) - .build(); - frameSink.next(connectFrame); + protected Publisher reconnect(Flux emittedEachAttempt) { + return emittedEachAttempt.delayUntil(attempt -> { + if (attempt == 0) { + return Mono.just(attempt); + } + return Mono.delay(Duration.ofSeconds(5000)); + }); + } + + protected void handleTerminateStream() { + sharedStream.set(null); + } + + + protected void handleConnectStream() { + isConnected.set(true); + resubscribeAll(); + } + + protected void handleDisconnectStream() { + isConnected.set(false); } - protected void onDisconnect(FluxSink frameSink) { - StompDisconnectFrame disconnectFrame = StompDisconnectFrame.builder() + protected void warnAndDropError(Throwable ex, Object value) { + log.log( + Level.WARNING, + MessageFormat.format( + "Json processing error.\n Message: {0}\nValue: {1}\n", + new Object[] {ex.getMessage(), value})); + } + + protected String subscribeDestination(String destination) { + int subscriptionNumber = maxSubscriptionNumber.incrementAndGet(); + String subscriptionId = MessageFormat.format("sub-{0}", subscriptionNumber); + StompFrame frame = StompSubscribeFrame.builder() + .destination(destination) + .subscriptionId(subscriptionId) .build(); - frameSink.next(disconnectFrame); + streamRequestSink.next(frame); + return subscriptionId; + } + + protected void resubscribeAll() { + subscriptionDestinationIdMap.replaceAll( + (dest, id) -> subscribeDestination(dest)); + } + + /** + * Subscribe to a destination. + * + * @param destination The destination channel + * @return A flux of messages on that channel + */ + public Flux subscribe(String destination, Class messageType) { + connect(); + subscriptionDestinationIdMap.computeIfAbsent(destination, this::subscribeDestination); + + ThrowableMapTransformer messageTransformer = + new ThrowableMapTransformer<>(frame -> readStompMessageFrame(frame, messageType)); + + Flux messageFrameFlux = sharedStream.get() + .filter(frame -> frame.getDestination().equals(destination)) + .doOnTerminate(() -> unsubscribe(destination)); + + return messageTransformer.apply(messageFrameFlux); + } + + protected void unsubscribe(String destination) { + subscriptionDestinationIdMap.computeIfPresent(destination, (dest, subscriptionId) -> { + StompFrame frame = StompUnsubscribeFrame.builder() + .subscriptionId(subscriptionId) + .build(); + streamRequestSink.next(frame); + return null; + }); + } + + protected T readStompMessageFrame(StompMessageFrame frame, Class messageType) + throws IOException { + return objectMapper.readValue(frame.getBody(), messageType); + } + + /** + * Send a message to a destination. + * @param The type of object to send + * @param message The message + * @param destination The destination + * @throws JsonProcessingException If the message cannot be JSON encoded + */ + public void sendMessage(O message, String destination) throws JsonProcessingException { + String body = objectMapper.writeValueAsString(message); + StompFrame frame = StompSendFrame.builder() + .destination(destination) + .body(body).build(); + streamRequestSink.next(frame); } } + diff --git a/src/main/java/com/trickl/flux/websocket/stomp/StompFrame.java b/src/main/java/com/trickl/flux/websocket/stomp/StompFrame.java index fad1ff1..3de6aba 100644 --- a/src/main/java/com/trickl/flux/websocket/stomp/StompFrame.java +++ b/src/main/java/com/trickl/flux/websocket/stomp/StompFrame.java @@ -1,7 +1,5 @@ package com.trickl.flux.websocket.stomp; -import com.fasterxml.jackson.databind.ObjectMapper; - import java.io.IOException; import org.springframework.messaging.Message; @@ -10,5 +8,5 @@ public interface StompFrame { StompHeaderAccessor getHeaderAccessor(); - Message toMessage(ObjectMapper objectMapper) throws IOException; + Message toMessage() throws IOException; } \ No newline at end of file diff --git a/src/main/java/com/trickl/flux/websocket/stomp/StompFrameBuilder.java b/src/main/java/com/trickl/flux/websocket/stomp/StompFrameBuilder.java index c237581..bd0e55b 100644 --- a/src/main/java/com/trickl/flux/websocket/stomp/StompFrameBuilder.java +++ b/src/main/java/com/trickl/flux/websocket/stomp/StompFrameBuilder.java @@ -1,6 +1,5 @@ package com.trickl.flux.websocket.stomp; -import com.fasterxml.jackson.databind.ObjectMapper; import com.trickl.flux.websocket.stomp.StompFrame; import com.trickl.flux.websocket.stomp.frames.StompConnectedFrame; import com.trickl.flux.websocket.stomp.frames.StompErrorFrame; @@ -16,10 +15,7 @@ import org.springframework.messaging.simp.stomp.StompHeaderAccessor; @RequiredArgsConstructor -public class StompFrameBuilder implements Function, StompFrame> { - - private final ObjectMapper objectMapper; - private final Class messageType; +public class StompFrameBuilder implements Function, StompFrame> { /** * Build a stomp frame from a websocket message. @@ -33,7 +29,7 @@ public StompFrame apply(Message message) { switch (headerAccessor.getCommand()) { case MESSAGE: return StompMessageFrame.create( - headerAccessor, message.getPayload(), objectMapper, messageType); + headerAccessor, message.getPayload()); case CONNECTED: return StompConnectedFrame.create(headerAccessor); case RECEIPT: diff --git a/src/main/java/com/trickl/flux/websocket/stomp/StompInputTransformer.java b/src/main/java/com/trickl/flux/websocket/stomp/StompInputTransformer.java index a0ee89e..cfd3f13 100644 --- a/src/main/java/com/trickl/flux/websocket/stomp/StompInputTransformer.java +++ b/src/main/java/com/trickl/flux/websocket/stomp/StompInputTransformer.java @@ -1,6 +1,5 @@ package com.trickl.flux.websocket.stomp; -import com.fasterxml.jackson.databind.ObjectMapper; import com.trickl.flux.websocket.stomp.StompFrame; import java.io.IOException; @@ -13,19 +12,16 @@ import reactor.core.publisher.Flux; @RequiredArgsConstructor -public class StompInputTransformer implements Function, Flux> { - - private final ObjectMapper objectMapper; - private final Class messageType; +public class StompInputTransformer implements Function, Flux> { @Override public Flux apply(Publisher source) { - StompMessageCodec codec = new StompMessageCodec<>(objectMapper, messageType); + StompMessageCodec codec = new StompMessageCodec(); return Flux.from(source) .flatMap(payload -> handleFrame(payload, codec)); } - protected Publisher handleFrame(byte[] payload, StompMessageCodec codec) { + protected Publisher handleFrame(byte[] payload, StompMessageCodec codec) { List frames; try { frames = codec.decode(payload); diff --git a/src/main/java/com/trickl/flux/websocket/stomp/StompMessageCodec.java b/src/main/java/com/trickl/flux/websocket/stomp/StompMessageCodec.java index 20e03ef..ab4997d 100644 --- a/src/main/java/com/trickl/flux/websocket/stomp/StompMessageCodec.java +++ b/src/main/java/com/trickl/flux/websocket/stomp/StompMessageCodec.java @@ -1,7 +1,5 @@ package com.trickl.flux.websocket.stomp; -import com.fasterxml.jackson.databind.ObjectMapper; - import java.io.IOException; import java.nio.ByteBuffer; import java.util.List; @@ -17,10 +15,7 @@ @Log @RequiredArgsConstructor -public class StompMessageCodec { - - private final ObjectMapper objectMapper; - private final Class messageType; +public class StompMessageCodec { private final StompEncoder encoder = new StompEncoder(); private final StompDecoder decoder = new StompDecoder(); @@ -34,7 +29,7 @@ public class StompMessageCodec { public byte[] encode(StompFrame stompMessage) throws IOException { log.log( Level.FINE, "\u001B[34mSENDING {0}\u001B[0m", new Object[] {stompMessage}); - Message message = stompMessage.toMessage(objectMapper); + Message message = stompMessage.toMessage(); return encoder.encode(message); } @@ -45,8 +40,8 @@ public byte[] encode(StompFrame stompMessage) throws IOException { * @throws IOException If the message cannot be decoded */ public List decode(byte[] payload) throws IOException { - StompFrameBuilder frameBuilder - = new StompFrameBuilder<>(objectMapper, messageType); + StompFrameBuilder frameBuilder + = new StompFrameBuilder(); ByteBuffer byteBuffer = ByteBuffer.wrap(payload); List> messages = decoder.decode(byteBuffer); return messages.stream() diff --git a/src/main/java/com/trickl/flux/websocket/stomp/StompOutputTransformer.java b/src/main/java/com/trickl/flux/websocket/stomp/StompOutputTransformer.java index 6097ae1..2228a96 100644 --- a/src/main/java/com/trickl/flux/websocket/stomp/StompOutputTransformer.java +++ b/src/main/java/com/trickl/flux/websocket/stomp/StompOutputTransformer.java @@ -1,7 +1,5 @@ package com.trickl.flux.websocket.stomp; -import com.fasterxml.jackson.databind.ObjectMapper; - import java.io.IOException; import java.util.function.Function; @@ -12,19 +10,16 @@ import reactor.core.publisher.Mono; @RequiredArgsConstructor -public class StompOutputTransformer implements Function, Flux> { - - private final ObjectMapper objectMapper; - private final Class messageType; +public class StompOutputTransformer implements Function, Flux> { @Override public Flux apply(Publisher source) { - StompMessageCodec codec = new StompMessageCodec<>(objectMapper, messageType); + StompMessageCodec codec = new StompMessageCodec(); return Flux.from(source) .flatMap(payload -> sendFrame(payload, codec)); } - protected Publisher sendFrame(StompFrame payload, StompMessageCodec codec) { + protected Publisher sendFrame(StompFrame payload, StompMessageCodec codec) { if (payload == null) { return Mono.empty(); diff --git a/src/main/java/com/trickl/flux/websocket/stomp/frames/StompConnectFrame.java b/src/main/java/com/trickl/flux/websocket/stomp/frames/StompConnectFrame.java index 22f0144..cb1e3d3 100644 --- a/src/main/java/com/trickl/flux/websocket/stomp/frames/StompConnectFrame.java +++ b/src/main/java/com/trickl/flux/websocket/stomp/frames/StompConnectFrame.java @@ -1,6 +1,5 @@ package com.trickl.flux.websocket.stomp.frames; -import com.fasterxml.jackson.databind.ObjectMapper; import com.trickl.flux.websocket.stomp.StompFrame; import lombok.Builder; @@ -30,7 +29,7 @@ public StompHeaderAccessor getHeaderAccessor() { /** * Convert to the websocket message. */ - public Message toMessage(ObjectMapper objectMapper) { + public Message toMessage() { return MessageBuilder.createMessage(new byte[0], getHeaderAccessor().toMessageHeaders()); } diff --git a/src/main/java/com/trickl/flux/websocket/stomp/frames/StompConnectedFrame.java b/src/main/java/com/trickl/flux/websocket/stomp/frames/StompConnectedFrame.java index 85c04db..f858668 100644 --- a/src/main/java/com/trickl/flux/websocket/stomp/frames/StompConnectedFrame.java +++ b/src/main/java/com/trickl/flux/websocket/stomp/frames/StompConnectedFrame.java @@ -1,6 +1,5 @@ package com.trickl.flux.websocket.stomp.frames; -import com.fasterxml.jackson.databind.ObjectMapper; import com.trickl.flux.websocket.stomp.StompFrame; import lombok.Builder; @@ -39,7 +38,7 @@ public static StompFrame create(StompHeaderAccessor headerAccessor) { /** * Convert to the websocket message. */ - public Message toMessage(ObjectMapper objectMapper) { + public Message toMessage() { return MessageBuilder.createMessage(new byte[0], getHeaderAccessor().toMessageHeaders()); } diff --git a/src/main/java/com/trickl/flux/websocket/stomp/frames/StompDisconnectFrame.java b/src/main/java/com/trickl/flux/websocket/stomp/frames/StompDisconnectFrame.java index 9e37076..5961875 100644 --- a/src/main/java/com/trickl/flux/websocket/stomp/frames/StompDisconnectFrame.java +++ b/src/main/java/com/trickl/flux/websocket/stomp/frames/StompDisconnectFrame.java @@ -1,6 +1,5 @@ package com.trickl.flux.websocket.stomp.frames; -import com.fasterxml.jackson.databind.ObjectMapper; import com.trickl.flux.websocket.stomp.StompFrame; import lombok.Builder; @@ -25,7 +24,7 @@ public StompHeaderAccessor getHeaderAccessor() { /** * Convert to the websocket message. */ - public Message toMessage(ObjectMapper objectMapper) { + public Message toMessage() { return MessageBuilder.createMessage(new byte[0], getHeaderAccessor().toMessageHeaders()); } } \ No newline at end of file diff --git a/src/main/java/com/trickl/flux/websocket/stomp/frames/StompErrorFrame.java b/src/main/java/com/trickl/flux/websocket/stomp/frames/StompErrorFrame.java index 3b7bd1d..3eb9451 100644 --- a/src/main/java/com/trickl/flux/websocket/stomp/frames/StompErrorFrame.java +++ b/src/main/java/com/trickl/flux/websocket/stomp/frames/StompErrorFrame.java @@ -1,6 +1,5 @@ package com.trickl.flux.websocket.stomp.frames; -import com.fasterxml.jackson.databind.ObjectMapper; import com.trickl.flux.websocket.stomp.StompFrame; import lombok.Builder; @@ -39,8 +38,7 @@ public static StompFrame create(StompHeaderAccessor headerAccessor) { /** * Convert to the websocket message. */ - public Message toMessage(ObjectMapper objectMapper) { - + public Message toMessage() { return MessageBuilder.createMessage(new byte[0], getHeaderAccessor().toMessageHeaders()); } } \ No newline at end of file diff --git a/src/main/java/com/trickl/flux/websocket/stomp/frames/StompMessageFrame.java b/src/main/java/com/trickl/flux/websocket/stomp/frames/StompMessageFrame.java index 467ef36..c7b3bdd 100644 --- a/src/main/java/com/trickl/flux/websocket/stomp/frames/StompMessageFrame.java +++ b/src/main/java/com/trickl/flux/websocket/stomp/frames/StompMessageFrame.java @@ -1,10 +1,8 @@ package com.trickl.flux.websocket.stomp.frames; import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; import com.trickl.flux.websocket.stomp.StompFrame; -import java.io.IOException; import java.nio.charset.StandardCharsets; import lombok.Builder; @@ -18,8 +16,8 @@ @Data @Builder -public class StompMessageFrame implements StompFrame { - protected T value; +public class StompMessageFrame implements StompFrame { + protected String body; protected String destination; protected String messageId; protected String subscriptionId; @@ -42,23 +40,16 @@ public StompHeaderAccessor getHeaderAccessor() { * @param payload The message payload * @return A typed message */ - public static StompFrame create( + public static StompFrame create( StompHeaderAccessor headerAccessor, - byte[] payload, - ObjectMapper objectMapper, - Class valueType) throws StompConversionException { - try { - String content = new String(payload, StandardCharsets.UTF_8); - T value = objectMapper.readValue(content, valueType); - return StompMessageFrame.builder() - .destination(headerAccessor.getDestination()) - .messageId(headerAccessor.getMessageId()) - .subscriptionId(headerAccessor.getSubscriptionId()) - .value(value) - .build(); - } catch (IOException ex) { - throw new StompConversionException("Unable to read STOMP message", ex); - } + byte[] payload) throws StompConversionException { + String body = new String(payload, StandardCharsets.UTF_8); + return StompMessageFrame.builder() + .destination(headerAccessor.getDestination()) + .messageId(headerAccessor.getMessageId()) + .subscriptionId(headerAccessor.getSubscriptionId()) + .body(body) + .build(); } /** @@ -66,9 +57,8 @@ public static StompFrame create( * * @throws JsonProcessingException if the message cannot be converted */ - public Message toMessage(ObjectMapper objectMapper) throws JsonProcessingException { - String json = objectMapper.writeValueAsString(value); - return MessageBuilder.createMessage(json.getBytes(StandardCharsets.UTF_8), + public Message toMessage() throws JsonProcessingException { + return MessageBuilder.createMessage(body.getBytes(StandardCharsets.UTF_8), getHeaderAccessor().toMessageHeaders()); } } \ No newline at end of file diff --git a/src/main/java/com/trickl/flux/websocket/stomp/frames/StompReceiptFrame.java b/src/main/java/com/trickl/flux/websocket/stomp/frames/StompReceiptFrame.java index afe11b2..8b8ed76 100644 --- a/src/main/java/com/trickl/flux/websocket/stomp/frames/StompReceiptFrame.java +++ b/src/main/java/com/trickl/flux/websocket/stomp/frames/StompReceiptFrame.java @@ -1,6 +1,5 @@ package com.trickl.flux.websocket.stomp.frames; -import com.fasterxml.jackson.databind.ObjectMapper; import com.trickl.flux.websocket.stomp.StompFrame; import lombok.Builder; @@ -39,8 +38,7 @@ public static StompFrame create(StompHeaderAccessor headerAccessor) { /** * Convert to the websocket message. */ - public Message toMessage(ObjectMapper objectMapper) { - + public Message toMessage() { return MessageBuilder.createMessage(new byte[0], getHeaderAccessor().toMessageHeaders()); } } \ No newline at end of file diff --git a/src/main/java/com/trickl/flux/websocket/stomp/frames/StompSendFrame.java b/src/main/java/com/trickl/flux/websocket/stomp/frames/StompSendFrame.java index 8b126f0..4525f7c 100644 --- a/src/main/java/com/trickl/flux/websocket/stomp/frames/StompSendFrame.java +++ b/src/main/java/com/trickl/flux/websocket/stomp/frames/StompSendFrame.java @@ -1,7 +1,5 @@ package com.trickl.flux.websocket.stomp.frames; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; import com.trickl.flux.websocket.stomp.StompFrame; import java.nio.charset.StandardCharsets; @@ -16,8 +14,8 @@ @Data @Builder -public class StompSendFrame implements StompFrame { - protected T value; +public class StompSendFrame implements StompFrame { + protected String body; protected String destination; /** @@ -31,12 +29,9 @@ public StompHeaderAccessor getHeaderAccessor() { /** * Convert to the websocket message. - * - * @throws JsonProcessingException if the message cannot be converted */ - public Message toMessage(ObjectMapper objectMapper) throws JsonProcessingException { - String json = objectMapper.writeValueAsString(value); - return MessageBuilder.createMessage(json.getBytes(StandardCharsets.UTF_8), + public Message toMessage() { + return MessageBuilder.createMessage(body.getBytes(StandardCharsets.UTF_8), getHeaderAccessor().toMessageHeaders()); } } \ No newline at end of file diff --git a/src/main/java/com/trickl/flux/websocket/stomp/frames/StompSubscribeFrame.java b/src/main/java/com/trickl/flux/websocket/stomp/frames/StompSubscribeFrame.java index 6e60620..447ef68 100644 --- a/src/main/java/com/trickl/flux/websocket/stomp/frames/StompSubscribeFrame.java +++ b/src/main/java/com/trickl/flux/websocket/stomp/frames/StompSubscribeFrame.java @@ -1,6 +1,5 @@ package com.trickl.flux.websocket.stomp.frames; -import com.fasterxml.jackson.databind.ObjectMapper; import com.trickl.flux.websocket.stomp.StompFrame; import lombok.Builder; @@ -30,7 +29,7 @@ public StompHeaderAccessor getHeaderAccessor() { /** * Convert to the websocket message. */ - public Message toMessage(ObjectMapper objectMapper) { + public Message toMessage() { return MessageBuilder.createMessage(new byte[0], getHeaderAccessor().toMessageHeaders()); } } \ No newline at end of file diff --git a/src/main/java/com/trickl/flux/websocket/stomp/frames/StompUnsubscribeFrame.java b/src/main/java/com/trickl/flux/websocket/stomp/frames/StompUnsubscribeFrame.java new file mode 100644 index 0000000..aa557ad --- /dev/null +++ b/src/main/java/com/trickl/flux/websocket/stomp/frames/StompUnsubscribeFrame.java @@ -0,0 +1,33 @@ +package com.trickl.flux.websocket.stomp.frames; + +import com.trickl.flux.websocket.stomp.StompFrame; + +import lombok.Builder; +import lombok.Data; + +import org.springframework.messaging.Message; +import org.springframework.messaging.simp.stomp.StompCommand; +import org.springframework.messaging.simp.stomp.StompHeaderAccessor; +import org.springframework.messaging.support.MessageBuilder; + +@Data +@Builder +public class StompUnsubscribeFrame implements StompFrame { + protected String subscriptionId; + + /** + * Get the stomp headers for this message. + */ + public StompHeaderAccessor getHeaderAccessor() { + StompHeaderAccessor stompHeaderAccessor = StompHeaderAccessor.create(StompCommand.UNSUBSCRIBE); + stompHeaderAccessor.setSubscriptionId(subscriptionId); + return stompHeaderAccessor; + } + + /** + * Convert to the websocket message. + */ + public Message toMessage() { + return MessageBuilder.createMessage(new byte[0], getHeaderAccessor().toMessageHeaders()); + } +} \ No newline at end of file