Skip to content

Commit

Permalink
Add utility for conditional timeouts
Browse files Browse the repository at this point in the history
  • Loading branch information
trickl committed Jan 17, 2020
1 parent 9cb4586 commit c116063
Show file tree
Hide file tree
Showing 9 changed files with 292 additions and 24 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package com.trickl.flux.publishers;

import java.time.Duration;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.logging.Level;

import lombok.RequiredArgsConstructor;
import lombok.extern.java.Log;

import org.reactivestreams.Publisher;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;

@Log
@RequiredArgsConstructor
public class ConditionalTimeoutPublisher<T> implements Supplier<Mono<T>> {

private final Publisher<T> source;
private final Duration timeout;
private final Predicate<? super T> condition;
private final Supplier<Throwable> onTimeoutThrow;
private final Runnable onTimeoutDo;
private final Scheduler scheduler;

@Override
public Mono<T> get() {

return Flux.from(source)
.filter(condition)
.timeout(timeout, scheduler)
.doOnError(error -> {
if (error instanceof TimeoutException
&& onTimeoutDo != null) {
onTimeoutDo.run();
}
})
.onErrorContinue(error -> {
if (error instanceof TimeoutException) {
return onTimeoutThrow == null;
}
return false;
}, (error, value) ->
log.log(Level.FINE, "Timeout captured.")
)
.onErrorMap(error -> {
if (error instanceof TimeoutException
&& onTimeoutThrow != null) {
return onTimeoutThrow.get();
}
return error;
})
.ignoreElements();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package com.trickl.flux.websocket.sockjs;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.trickl.flux.websocket.TextWebSocketFluxClient;

import java.util.function.Function;
import java.util.function.Supplier;
import lombok.RequiredArgsConstructor;
import org.reactivestreams.Publisher;
import org.springframework.http.HttpHeaders;
import org.springframework.web.reactive.socket.CloseStatus;
import org.springframework.web.reactive.socket.client.WebSocketClient;
import org.springframework.web.socket.sockjs.client.SockJsUrlInfo;
import org.springframework.web.socket.sockjs.transport.TransportType;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@RequiredArgsConstructor
public class RawSockJsFluxClient {
private final WebSocketClient webSocketClient;
private final SockJsUrlInfo sockJsUrlInfo;
private final Mono<HttpHeaders> webSocketHeadersProvider;
private final ObjectMapper objectMapper;
private final Supplier<String> openMessageSupplier;
private final Supplier<String> hearbeatMessageSupplier;
private final Function<CloseStatus, String> closeMessageFunction;

/**
* Connect to a sockjs service.
*
* @param send The stream of messages to send.
* @return A stream of messages received.
*/
public Flux<String> get(Publisher<String> send) {
SockJsInputTransformer sockJsInputTransformer =
new SockJsInputTransformer(
objectMapper, openMessageSupplier, hearbeatMessageSupplier, closeMessageFunction);
SockJsOutputTransformer sockJsOutputTransformer =
new SockJsOutputTransformer(objectMapper);

TextWebSocketFluxClient webSocketFluxClient =
new TextWebSocketFluxClient(
webSocketClient,
sockJsUrlInfo.getTransportUrl(TransportType.WEBSOCKET),
webSocketHeadersProvider);

return sockJsInputTransformer.apply(webSocketFluxClient.get(
sockJsOutputTransformer.apply(Flux.from(send))));
}
}
Original file line number Diff line number Diff line change
@@ -1,50 +1,96 @@
package com.trickl.flux.websocket.sockjs;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.trickl.flux.websocket.TextWebSocketFluxClient;
import com.trickl.flux.mappers.ThrowableMapper;
import com.trickl.flux.websocket.sockjs.RawSockJsFluxClient;
import com.trickl.flux.websocket.sockjs.frames.SockJsClose;
import com.trickl.flux.websocket.sockjs.frames.SockJsFrame;
import com.trickl.flux.websocket.sockjs.frames.SockJsHeartbeat;
import com.trickl.flux.websocket.sockjs.frames.SockJsMessage;
import com.trickl.flux.websocket.sockjs.frames.SockJsOpen;

import java.io.IOException;
import java.text.MessageFormat;
import java.util.logging.Level;

import java.util.function.Function;
import java.util.function.Supplier;
import lombok.RequiredArgsConstructor;
import lombok.extern.java.Log;

import org.reactivestreams.Publisher;
import org.springframework.http.HttpHeaders;
import org.springframework.web.reactive.socket.CloseStatus;
import org.springframework.web.reactive.socket.client.WebSocketClient;
import org.springframework.web.socket.sockjs.client.SockJsUrlInfo;
import org.springframework.web.socket.sockjs.transport.TransportType;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;

@Log
@RequiredArgsConstructor
public class SockJsFluxClient {
private final WebSocketClient webSocketClient;
private final SockJsUrlInfo sockJsUrlInfo;
private final Mono<HttpHeaders> webSocketHeadersProvider;
private final ObjectMapper objectMapper;
private final Supplier<String> openMessageSupplier;
private final Supplier<String> hearbeatMessageSupplier;
private final Function<CloseStatus, String> closeMessageFunction;

private final EmitterProcessor<SockJsFrame> responseProcessor = EmitterProcessor.create();

private static final String SOCK_JS_OPEN = "o";
private static final String SOCK_JS_CLOSE = "c";
private static final String SOCK_JS_HEARTBEAT = "h";

/**
* Connect to a sockjs service.
* Get messages from the stream.
*
* @param send The stream of messages to send.
* @return A stream of messages received.
* @return A reactive stream of messages.
*/
public Flux<String> get(Publisher<String> send) {
SockJsInputTransformer sockJsInputTransformer =
new SockJsInputTransformer(
objectMapper, openMessageSupplier, hearbeatMessageSupplier, closeMessageFunction);
SockJsOutputTransformer sockJsOutputTransformer =
new SockJsOutputTransformer(objectMapper);

TextWebSocketFluxClient webSocketFluxClient =
new TextWebSocketFluxClient(
public Flux<SockJsFrame> get(Publisher<SockJsFrame> send) {
RawSockJsFluxClient sockJsClient =
new RawSockJsFluxClient(
webSocketClient,
sockJsUrlInfo.getTransportUrl(TransportType.WEBSOCKET),
webSocketHeadersProvider);

return sockJsInputTransformer.apply(webSocketFluxClient.get(
sockJsOutputTransformer.apply(Flux.from(send))));
sockJsUrlInfo,
webSocketHeadersProvider,
objectMapper,
() -> SOCK_JS_OPEN,
() -> SOCK_JS_HEARTBEAT,
(CloseStatus status) -> SOCK_JS_CLOSE + status.getCode());

Publisher<SockJsFrame> sendWithResponse = Flux.merge(send, responseProcessor);

FluxSink<SockJsFrame> responseSink = responseProcessor.sink();
return sockJsClient.get(Flux.from(sendWithResponse).flatMap(new ThrowableMapper<>(this::write)))
.flatMap(
new ThrowableMapper<String, SockJsFrame>(message -> read(message, responseSink)))
.onErrorContinue(JsonProcessingException.class, this::warnAndDropError)
.publish()
.refCount();
}

protected void warnAndDropError(Throwable ex, Object value) {
log.log(Level.WARNING, MessageFormat.format(
"Json processing error.\n Message: {0}\nValue: {1}\n",
new Object[] {ex.getMessage(), value}));
}

protected SockJsFrame read(String message, FluxSink<SockJsFrame> respond)
throws IOException {

// Handle sockJs messages
if (message.startsWith(SOCK_JS_OPEN)) {
return new SockJsOpen();
} else if (message.startsWith(SOCK_JS_HEARTBEAT)) {
return new SockJsHeartbeat();
} else if (message.startsWith(SOCK_JS_CLOSE)) {
int closeStatus = Integer.parseInt(message.substring(SOCK_JS_CLOSE.length()));
return new SockJsClose(closeStatus);
}

return new SockJsMessage(message);
}

protected String write(SockJsFrame request) throws JsonProcessingException {
return objectMapper.writeValueAsString(request);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.trickl.flux.websocket.sockjs.frames;

import lombok.Value;

@Value
public class SockJsClose implements SockJsFrame {
protected int status;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package com.trickl.flux.websocket.sockjs.frames;

/** Any response received on the stream. */
public interface SockJsFrame {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package com.trickl.flux.websocket.sockjs.frames;

public class SockJsHeartbeat implements SockJsFrame {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.trickl.flux.websocket.sockjs.frames;

import lombok.Value;

@Value
public class SockJsMessage implements SockJsFrame {
protected String payload;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package com.trickl.flux.websocket.sockjs.frames;

public class SockJsOpen implements SockJsFrame {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package com.trickl.flux.publishers;

import java.time.Duration;
import java.util.concurrent.TimeoutException;

import org.junit.Test;
import org.reactivestreams.Publisher;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;
import reactor.test.publisher.TestPublisher;

public class ConditionalTimeoutPublisherTest {


@Test
public void expectContinueWithElements() {
TestPublisher<String> input = TestPublisher.<String>create();

ConditionalTimeoutPublisher<String> timeoutPublisher =
new ConditionalTimeoutPublisher<>(
input,
Duration.ofSeconds(15),
value -> true,
TimeoutException::new,
null,
Schedulers.parallel());

Publisher<String> output = Flux.from(input).mergeWith(timeoutPublisher.get());

StepVerifier.withVirtualTime(() -> output)
.then(() -> input.emit("first"))
.expectNoEvent(Duration.ofSeconds(10))
.then(() -> input.emit("second"))
.thenAwait(Duration.ofSeconds(10))
.then(input::complete)
.expectComplete();
}


@Test
public void expectExceptionWithoutElements() {
TestPublisher<String> input = TestPublisher.<String>create();

ConditionalTimeoutPublisher<String> timeoutPublisher =
new ConditionalTimeoutPublisher<>(
input,
Duration.ofSeconds(15),
value -> true,
TimeoutException::new,
null,
Schedulers.parallel());

Publisher<String> output = Flux.from(input).mergeWith(timeoutPublisher.get());

StepVerifier.withVirtualTime(() -> output)
.then(() -> input.emit("first"))
.expectNoEvent(Duration.ofSeconds(10))
.thenAwait(Duration.ofSeconds(10))
.expectError(TimeoutException.class);
}

@Test
public void expectEarlyCompleteIfRequired() {
TestPublisher<String> input = TestPublisher.<String>create();

ConditionalTimeoutPublisher<String> timeoutPublisher =
new ConditionalTimeoutPublisher<>(
input,
Duration.ofSeconds(15),
value -> true,
null,
input::complete,
Schedulers.parallel());

Publisher<String> output = Flux.from(input).mergeWith(timeoutPublisher.get());

StepVerifier.withVirtualTime(() -> output)
.then(() -> input.emit("something"))
.expectNoEvent(Duration.ofSeconds(10))
.thenAwait(Duration.ofSeconds(10))
.expectComplete();
}
}

0 comments on commit c116063

Please sign in to comment.