Skip to content

Commit

Permalink
Use standard design patterns for fluxes
Browse files Browse the repository at this point in the history
  • Loading branch information
trickl committed Jan 5, 2020
1 parent 5b66096 commit 378eaa7
Show file tree
Hide file tree
Showing 15 changed files with 137 additions and 211 deletions.
30 changes: 30 additions & 0 deletions src/main/java/com/trickl/flux/consumers/SimpMessageSender.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.trickl.flux.consumers;

import java.util.function.Consumer;
import java.util.logging.Level;

import lombok.RequiredArgsConstructor;
import lombok.extern.java.Log;

import org.springframework.messaging.simp.SimpMessagingTemplate;

@Log
@RequiredArgsConstructor
public class SimpMessageSender<T> implements
Consumer<T> {

private final SimpMessagingTemplate messagingTemplate;
private final String destination;

@Override
public void accept(T t) {
sendMessage(t);
}

void sendMessage(T value) {
log.log(
Level.FINE, "\u001B[32mSENDING ↑ {0}\u001B[0m on {1}",
new Object[] {value, destination});
messagingTemplate.convertAndSend(destination, value);
}
}
31 changes: 31 additions & 0 deletions src/main/java/com/trickl/flux/mappers/DifferentialMapper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.trickl.flux.mappers;

import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Function;
import lombok.AllArgsConstructor;
import lombok.RequiredArgsConstructor;
import org.reactivestreams.Publisher;

@RequiredArgsConstructor
@AllArgsConstructor
public class DifferentialMapper<T, S>
implements Function<T, Publisher<? extends S>> {

private final BiFunction<T, T, Publisher<S>> transform;

private final T emptyValue;

private AtomicReference<Optional<T>> lastValue =
new AtomicReference<>(Optional.empty());

@Override
public Publisher<S> apply(T t) {
return transform.apply(
lastValue.getAndSet(Optional.of(t))
.orElse(emptyValue),
t
);
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.trickl.flux.transformers;
package com.trickl.flux.mappers;

import java.util.function.Function;
import lombok.RequiredArgsConstructor;
Expand All @@ -7,7 +7,8 @@
import reactor.core.publisher.Mono;

@RequiredArgsConstructor
public class ThrowableMapTransformer<T, S> implements Function<Publisher<T>, Flux<S>> {
public class ThrowableMapper<T, S> implements
Function<T, Publisher<? extends S>> {

@FunctionalInterface
public interface ThrowingFunction<T, S, E extends Exception> {
Expand All @@ -16,16 +17,8 @@ public interface ThrowingFunction<T, S, E extends Exception> {

private final ThrowingFunction<T, S, ?> mapper;

/**
* Get a flux of messages from the stream.
*
* @return A flux of (untyped) objects
*/
public Flux<S> apply(Publisher<T> source) {
return Flux.from(source).flatMap(this::map);
}

protected Publisher<S> map(T t) {
@Override
public Publisher<S> apply(T t) {
try {
return Mono.just(mapper.apply(t));
} catch (Exception throwable) {
Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.trickl.flux.websocket;

import java.net.URI;
import java.util.function.Supplier;

import lombok.RequiredArgsConstructor;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.trickl.flux.websocket;

import com.trickl.flux.transformers.ThrowableMapTransformer;
import com.trickl.flux.mappers.ThrowableMapper;

import java.io.IOException;
import java.io.InputStream;
Expand All @@ -26,9 +26,8 @@ public class BinaryWebSocketHandler implements WebSocketHandler {

@Override
public Mono<Void> handle(WebSocketSession session) {
ThrowableMapTransformer<WebSocketMessage, WebSocketMessage> messageHandler =
new ThrowableMapTransformer<>(this::handleMessage);
Mono<Void> input = messageHandler.apply(session.receive()).then();
Mono<Void> input = session.receive().flatMap(
new ThrowableMapper<>(this::handleMessage)).then();

Mono<Void> output =
session.send(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.trickl.flux.websocket;

import java.net.URI;
import java.util.function.Supplier;
import lombok.RequiredArgsConstructor;
import lombok.extern.java.Log;
import org.reactivestreams.Publisher;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.trickl.flux.websocket;

import com.trickl.exceptions.SubscriptionFailedException;
import com.trickl.flux.publishers.SimpMessagingPublisher;
import com.trickl.flux.consumers.SimpMessageSender;

import java.security.Principal;
import java.text.MessageFormat;
Expand Down Expand Up @@ -71,16 +71,17 @@ protected void subscribe(Principal user, String destination, String subscription
.orElseThrow(() -> new SubscriptionFailedException(
MessageFormat.format("Destination: {0} not found.", destination)));

Flux<?> flux = this.fluxes.computeIfAbsent(webSocketRequest, fluxFactory::apply)
Flux<?> connectableFlux = this.fluxes.computeIfAbsent(
webSocketRequest, fluxFactory::apply)
.orElseThrow(() -> new SubscriptionFailedException(
MessageFormat.format("Destination: {0} not found.", destination)));

SimpMessagingPublisher<?> broadcaster = new SimpMessagingPublisher<>(
flux, messagingTemplate, destination);
MessageFormat.format("Destination: {0} not found.", destination)))
.doOnNext(new SimpMessageSender<>(messagingTemplate, destination))
.publish()
.refCount();

Disposable subscription = Flux.from(broadcaster.get())
Disposable subscription = connectableFlux
.doOnCancel(() -> this.fluxes.remove(webSocketRequest))
.doOnComplete(() -> this.fluxes.remove(webSocketRequest))
.doOnTerminate(() -> this.fluxes.remove(webSocketRequest))
.subscribe();

this.subscriptions.put(subscriptionId, subscription);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.trickl.flux.transformers.ThrowableMapTransformer;
import com.trickl.flux.mappers.ThrowableMapper;
import com.trickl.flux.websocket.stomp.StompFrame;
import com.trickl.flux.websocket.stomp.frames.StompConnectedFrame;
import com.trickl.flux.websocket.stomp.frames.StompMessageFrame;
Expand Down Expand Up @@ -62,7 +62,7 @@ public class StompFluxClient {
* Connect to the stomp transport.
*/
public void connect() {
if (isConnected.get() && isConnecting.compareAndSet(false, true)) {
if (isConnected.get() || !isConnecting.compareAndSet(false, true)) {
// Already connected
return;
}
Expand Down Expand Up @@ -153,14 +153,12 @@ public <T> Flux<T> subscribe(String destination, Class<T> messageType) {
connect();
subscriptionDestinationIdMap.computeIfAbsent(destination, this::subscribeDestination);

ThrowableMapTransformer<StompMessageFrame, T> messageTransformer =
new ThrowableMapTransformer<>(frame -> readStompMessageFrame(frame, messageType));

Flux<StompMessageFrame> messageFrameFlux = sharedStream.get()
.filter(frame -> frame.getDestination().equals(destination))
.doOnTerminate(() -> unsubscribe(destination));

return messageTransformer.apply(messageFrameFlux);
return messageFrameFlux.flatMap(
new ThrowableMapper<>(frame -> readStompMessageFrame(frame, messageType)));
}

protected void unsubscribe(String destination) {
Expand Down
Loading

0 comments on commit 378eaa7

Please sign in to comment.