Skip to content

Commit

Permalink
Convert header supplier to a mono
Browse files Browse the repository at this point in the history
  • Loading branch information
trickl committed Dec 22, 2019
1 parent 0760f1b commit 5b66096
Show file tree
Hide file tree
Showing 5 changed files with 12 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public class BinaryWebSocketFluxClient {

private final URI transportUrl;

private final Supplier<HttpHeaders> webSocketHeadersProvider;
private final Mono<HttpHeaders> webSocketHeadersProvider;

private final Runnable onConnect;

Expand Down Expand Up @@ -51,8 +51,8 @@ protected Mono<Void> connect(Publisher<byte[]> send, FluxSink<byte[]> receive) {
SessionHandler sessionHandler = new SessionHandler(dataHandler,
sessionId -> onConnect.run());

return webSocketClient
.execute(transportUrl, webSocketHeadersProvider.get(), sessionHandler).log("client")
return webSocketHeadersProvider.flatMap(headers ->
webSocketClient.execute(transportUrl, headers, sessionHandler).log("client"))
.doOnError(receive::error);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public class TextWebSocketFluxClient {

private final URI transportUrl;

private final Supplier<HttpHeaders> webSocketHeadersProvider;
private final Mono<HttpHeaders> webSocketHeadersProvider;

/**
* Get a flux of messages from the stream.
Expand All @@ -45,8 +45,8 @@ public Flux<String> get(Publisher<String> send) {
protected Mono<Void> connect(Publisher<String> send, FluxSink<String> receive) {
TextWebSocketHandler handler = new TextWebSocketHandler(receive, Flux.from(send));

return webSocketClient
.execute(transportUrl, webSocketHeadersProvider.get(), handler).log("client")
return webSocketHeadersProvider.flatMap(headers ->
webSocketClient.execute(transportUrl, headers, handler).log("client"))
.doOnError(receive::error);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,13 @@
import org.springframework.web.socket.sockjs.client.SockJsUrlInfo;
import org.springframework.web.socket.sockjs.transport.TransportType;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@RequiredArgsConstructor
public class SockJsFluxClient {
private final WebSocketClient webSocketClient;
private final SockJsUrlInfo sockJsUrlInfo;
private final Supplier<HttpHeaders> webSocketHeadersProvider;
private final Mono<HttpHeaders> webSocketHeadersProvider;
private final ObjectMapper objectMapper;
private final Supplier<String> openMessageSupplier;
private final Supplier<String> hearbeatMessageSupplier;
Expand All @@ -42,6 +43,7 @@ public Flux<String> get(Publisher<String> send) {
webSocketClient,
sockJsUrlInfo.getTransportUrl(TransportType.WEBSOCKET),
webSocketHeadersProvider);

return sockJsInputTransformer.apply(webSocketFluxClient.get(
sockJsOutputTransformer.apply(Flux.from(send))));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import com.trickl.flux.websocket.stomp.frames.StompDisconnectFrame;

import java.net.URI;
import java.util.function.Supplier;

import lombok.RequiredArgsConstructor;

Expand All @@ -17,12 +16,13 @@
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;

@RequiredArgsConstructor
public class RawStompFluxClient {
private final WebSocketClient webSocketClient;
private final URI transportUri;
private final Supplier<HttpHeaders> webSocketHeadersProvider;
private final Mono<HttpHeaders> webSocketHeadersProvider;

/**
* Connect to a stomp service.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.logging.Level;

import lombok.RequiredArgsConstructor;
Expand All @@ -40,7 +39,7 @@
public class StompFluxClient {
private final WebSocketClient webSocketClient;
private final URI transportUri;
private final Supplier<HttpHeaders> webSocketHeadersProvider;
private final Mono<HttpHeaders> webSocketHeadersProvider;
private final ObjectMapper objectMapper;

private final EmitterProcessor<StompFrame> responseProcessor = EmitterProcessor.create();
Expand Down

0 comments on commit 5b66096

Please sign in to comment.