diff --git a/src/main/java/com/trickl/flux/consumers/SimpMessageSender.java b/src/main/java/com/trickl/flux/consumers/SimpMessageSender.java new file mode 100644 index 0000000..0bc7302 --- /dev/null +++ b/src/main/java/com/trickl/flux/consumers/SimpMessageSender.java @@ -0,0 +1,30 @@ +package com.trickl.flux.consumers; + +import java.util.function.Consumer; +import java.util.logging.Level; + +import lombok.RequiredArgsConstructor; +import lombok.extern.java.Log; + +import org.springframework.messaging.simp.SimpMessagingTemplate; + +@Log +@RequiredArgsConstructor +public class SimpMessageSender implements + Consumer { + + private final SimpMessagingTemplate messagingTemplate; + private final String destination; + + @Override + public void accept(T t) { + sendMessage(t); + } + + void sendMessage(T value) { + log.log( + Level.FINE, "\u001B[32mSENDING ↑ {0}\u001B[0m on {1}", + new Object[] {value, destination}); + messagingTemplate.convertAndSend(destination, value); + } +} \ No newline at end of file diff --git a/src/main/java/com/trickl/flux/mappers/DifferentialMapper.java b/src/main/java/com/trickl/flux/mappers/DifferentialMapper.java new file mode 100644 index 0000000..1902c6c --- /dev/null +++ b/src/main/java/com/trickl/flux/mappers/DifferentialMapper.java @@ -0,0 +1,31 @@ +package com.trickl.flux.mappers; + +import java.util.Optional; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiFunction; +import java.util.function.Function; +import lombok.AllArgsConstructor; +import lombok.RequiredArgsConstructor; +import org.reactivestreams.Publisher; + +@RequiredArgsConstructor +@AllArgsConstructor +public class DifferentialMapper + implements Function> { + + private final BiFunction> transform; + + private final T emptyValue; + + private AtomicReference> lastValue = + new AtomicReference<>(Optional.empty()); + + @Override + public Publisher apply(T t) { + return transform.apply( + lastValue.getAndSet(Optional.of(t)) + .orElse(emptyValue), + t + ); + } +} diff --git a/src/main/java/com/trickl/flux/transformers/ThrowableMapTransformer.java b/src/main/java/com/trickl/flux/mappers/ThrowableMapper.java similarity index 58% rename from src/main/java/com/trickl/flux/transformers/ThrowableMapTransformer.java rename to src/main/java/com/trickl/flux/mappers/ThrowableMapper.java index 8651596..6ddaf9b 100644 --- a/src/main/java/com/trickl/flux/transformers/ThrowableMapTransformer.java +++ b/src/main/java/com/trickl/flux/mappers/ThrowableMapper.java @@ -1,4 +1,4 @@ -package com.trickl.flux.transformers; +package com.trickl.flux.mappers; import java.util.function.Function; import lombok.RequiredArgsConstructor; @@ -7,7 +7,8 @@ import reactor.core.publisher.Mono; @RequiredArgsConstructor -public class ThrowableMapTransformer implements Function, Flux> { +public class ThrowableMapper implements + Function> { @FunctionalInterface public interface ThrowingFunction { @@ -16,16 +17,8 @@ public interface ThrowingFunction { private final ThrowingFunction mapper; - /** - * Get a flux of messages from the stream. - * - * @return A flux of (untyped) objects - */ - public Flux apply(Publisher source) { - return Flux.from(source).flatMap(this::map); - } - - protected Publisher map(T t) { + @Override + public Publisher apply(T t) { try { return Mono.just(mapper.apply(t)); } catch (Exception throwable) { diff --git a/src/main/java/com/trickl/flux/publishers/DifferentialPollPublisher.java b/src/main/java/com/trickl/flux/publishers/DifferentialPollPublisher.java deleted file mode 100644 index 0bf46a1..0000000 --- a/src/main/java/com/trickl/flux/publishers/DifferentialPollPublisher.java +++ /dev/null @@ -1,28 +0,0 @@ -package com.trickl.flux.publishers; - -import com.trickl.flux.transformers.DifferentialTransformer; - -import java.time.Instant; -import java.util.function.BiFunction; -import java.util.function.Supplier; - -import lombok.RequiredArgsConstructor; -import org.reactivestreams.Publisher; - -import reactor.core.publisher.Flux; - -@RequiredArgsConstructor -public class DifferentialPollPublisher implements Supplier> { - - private final Publisher timePublisher; - private final BiFunction> pollBetween; - - @Override - public Flux get() { - DifferentialTransformer> pollTransform = - new DifferentialTransformer<>(pollBetween); - - return pollTransform.apply(timePublisher) - .flatMap(pub -> pub); - } -} \ No newline at end of file diff --git a/src/main/java/com/trickl/flux/publishers/SimpMessagingPublisher.java b/src/main/java/com/trickl/flux/publishers/SimpMessagingPublisher.java deleted file mode 100644 index 6fb7977..0000000 --- a/src/main/java/com/trickl/flux/publishers/SimpMessagingPublisher.java +++ /dev/null @@ -1,49 +0,0 @@ -package com.trickl.flux.publishers; - -import java.util.function.Supplier; -import java.util.logging.Level; -import lombok.extern.java.Log; -import org.reactivestreams.Publisher; -import org.springframework.messaging.simp.SimpMessagingTemplate; - -import reactor.core.publisher.Flux; - -@Log -public class SimpMessagingPublisher implements Supplier> { - - private final Publisher source; - private final SimpMessagingTemplate messagingTemplate; - private final String destination; - private final Publisher publisher; - - /** - * Create a message topic broadcaster. - * - * @param source The underlying data source - * @param messagingTemplate Messaging template for broadcast - * @param destination The destination of messages - */ - public SimpMessagingPublisher( - Publisher source, SimpMessagingTemplate messagingTemplate, String destination) { - this.source = source; - this.messagingTemplate = messagingTemplate; - this.destination = destination; - - publisher = Flux.from(this.source) - .doOnNext(this::sendMessage) - .publish() - .refCount(); - } - - @Override - public Publisher get() { - return publisher; - } - - void sendMessage(T value) { - log.log( - Level.FINE, "\u001B[32mSENDING ↑ {0}\u001B[0m on {1}", - new Object[] {value, destination}); - messagingTemplate.convertAndSend(destination, value); - } -} \ No newline at end of file diff --git a/src/main/java/com/trickl/flux/transformers/DifferentialTransformer.java b/src/main/java/com/trickl/flux/transformers/DifferentialTransformer.java deleted file mode 100644 index 4cbeb87..0000000 --- a/src/main/java/com/trickl/flux/transformers/DifferentialTransformer.java +++ /dev/null @@ -1,31 +0,0 @@ -package com.trickl.flux.transformers; - -import java.util.Optional; -import java.util.function.BiFunction; -import java.util.function.Function; -import lombok.AllArgsConstructor; -import lombok.RequiredArgsConstructor; -import org.reactivestreams.Publisher; -import reactor.core.publisher.Flux; - -@RequiredArgsConstructor -@AllArgsConstructor -public class DifferentialTransformer implements Function, Flux> { - - private final BiFunction transform; - - private T emptyValue = null; - - @Override - public Flux apply(Publisher source) { - return Flux.from(source) - .map(Optional::of) - .startWith(Optional.empty()) - .buffer(2, 1) - .map( - list -> - transform.apply( - list.get(0).isPresent() ? list.get(0).get() : emptyValue, - list.size() > 1 && list.get(1).isPresent() ? list.get(1).get() : emptyValue)); - } -} diff --git a/src/main/java/com/trickl/flux/transformers/SingletonFluxTransfomer.java b/src/main/java/com/trickl/flux/transformers/SingletonFluxTransfomer.java deleted file mode 100644 index 8551871..0000000 --- a/src/main/java/com/trickl/flux/transformers/SingletonFluxTransfomer.java +++ /dev/null @@ -1,26 +0,0 @@ -package com.trickl.flux.transformers; - -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Function; - -import org.reactivestreams.Publisher; -import reactor.core.publisher.Flux; - -public class SingletonFluxTransfomer implements Function, Flux> { - - private final AtomicReference> sharedFlux = new AtomicReference<>(); - - /** - * Get a flux of messages from the stream. - * - * @return A flux of (untyped) objects - */ - public Flux apply(Publisher source) { - return sharedFlux.updateAndGet(flux -> { - if (flux == null) { - flux = Flux.from(source).publish().refCount(); - } - return flux; - }); - } -} diff --git a/src/main/java/com/trickl/flux/websocket/BinaryWebSocketFluxClient.java b/src/main/java/com/trickl/flux/websocket/BinaryWebSocketFluxClient.java index daccb38..a181bfe 100644 --- a/src/main/java/com/trickl/flux/websocket/BinaryWebSocketFluxClient.java +++ b/src/main/java/com/trickl/flux/websocket/BinaryWebSocketFluxClient.java @@ -1,7 +1,6 @@ package com.trickl.flux.websocket; import java.net.URI; -import java.util.function.Supplier; import lombok.RequiredArgsConstructor; diff --git a/src/main/java/com/trickl/flux/websocket/BinaryWebSocketHandler.java b/src/main/java/com/trickl/flux/websocket/BinaryWebSocketHandler.java index e51008c..cac39e2 100644 --- a/src/main/java/com/trickl/flux/websocket/BinaryWebSocketHandler.java +++ b/src/main/java/com/trickl/flux/websocket/BinaryWebSocketHandler.java @@ -1,6 +1,6 @@ package com.trickl.flux.websocket; -import com.trickl.flux.transformers.ThrowableMapTransformer; +import com.trickl.flux.mappers.ThrowableMapper; import java.io.IOException; import java.io.InputStream; @@ -26,9 +26,8 @@ public class BinaryWebSocketHandler implements WebSocketHandler { @Override public Mono handle(WebSocketSession session) { - ThrowableMapTransformer messageHandler = - new ThrowableMapTransformer<>(this::handleMessage); - Mono input = messageHandler.apply(session.receive()).then(); + Mono input = session.receive().flatMap( + new ThrowableMapper<>(this::handleMessage)).then(); Mono output = session.send( diff --git a/src/main/java/com/trickl/flux/websocket/TextWebSocketFluxClient.java b/src/main/java/com/trickl/flux/websocket/TextWebSocketFluxClient.java index 6b7b3a3..c269961 100644 --- a/src/main/java/com/trickl/flux/websocket/TextWebSocketFluxClient.java +++ b/src/main/java/com/trickl/flux/websocket/TextWebSocketFluxClient.java @@ -1,7 +1,6 @@ package com.trickl.flux.websocket; import java.net.URI; -import java.util.function.Supplier; import lombok.RequiredArgsConstructor; import lombok.extern.java.Log; import org.reactivestreams.Publisher; diff --git a/src/main/java/com/trickl/flux/websocket/WebSocketRequestRouter.java b/src/main/java/com/trickl/flux/websocket/WebSocketRequestRouter.java index ad08476..a88a4a6 100644 --- a/src/main/java/com/trickl/flux/websocket/WebSocketRequestRouter.java +++ b/src/main/java/com/trickl/flux/websocket/WebSocketRequestRouter.java @@ -1,7 +1,7 @@ package com.trickl.flux.websocket; import com.trickl.exceptions.SubscriptionFailedException; -import com.trickl.flux.publishers.SimpMessagingPublisher; +import com.trickl.flux.consumers.SimpMessageSender; import java.security.Principal; import java.text.MessageFormat; @@ -71,16 +71,17 @@ protected void subscribe(Principal user, String destination, String subscription .orElseThrow(() -> new SubscriptionFailedException( MessageFormat.format("Destination: {0} not found.", destination))); - Flux flux = this.fluxes.computeIfAbsent(webSocketRequest, fluxFactory::apply) + Flux connectableFlux = this.fluxes.computeIfAbsent( + webSocketRequest, fluxFactory::apply) .orElseThrow(() -> new SubscriptionFailedException( - MessageFormat.format("Destination: {0} not found.", destination))); - - SimpMessagingPublisher broadcaster = new SimpMessagingPublisher<>( - flux, messagingTemplate, destination); + MessageFormat.format("Destination: {0} not found.", destination))) + .doOnNext(new SimpMessageSender<>(messagingTemplate, destination)) + .publish() + .refCount(); - Disposable subscription = Flux.from(broadcaster.get()) + Disposable subscription = connectableFlux .doOnCancel(() -> this.fluxes.remove(webSocketRequest)) - .doOnComplete(() -> this.fluxes.remove(webSocketRequest)) + .doOnTerminate(() -> this.fluxes.remove(webSocketRequest)) .subscribe(); this.subscriptions.put(subscriptionId, subscription); diff --git a/src/main/java/com/trickl/flux/websocket/stomp/StompFluxClient.java b/src/main/java/com/trickl/flux/websocket/stomp/StompFluxClient.java index e5d6471..d396fb6 100644 --- a/src/main/java/com/trickl/flux/websocket/stomp/StompFluxClient.java +++ b/src/main/java/com/trickl/flux/websocket/stomp/StompFluxClient.java @@ -2,7 +2,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import com.trickl.flux.transformers.ThrowableMapTransformer; +import com.trickl.flux.mappers.ThrowableMapper; import com.trickl.flux.websocket.stomp.StompFrame; import com.trickl.flux.websocket.stomp.frames.StompConnectedFrame; import com.trickl.flux.websocket.stomp.frames.StompMessageFrame; @@ -62,7 +62,7 @@ public class StompFluxClient { * Connect to the stomp transport. */ public void connect() { - if (isConnected.get() && isConnecting.compareAndSet(false, true)) { + if (isConnected.get() || !isConnecting.compareAndSet(false, true)) { // Already connected return; } @@ -153,14 +153,12 @@ public Flux subscribe(String destination, Class messageType) { connect(); subscriptionDestinationIdMap.computeIfAbsent(destination, this::subscribeDestination); - ThrowableMapTransformer messageTransformer = - new ThrowableMapTransformer<>(frame -> readStompMessageFrame(frame, messageType)); - Flux messageFrameFlux = sharedStream.get() .filter(frame -> frame.getDestination().equals(destination)) .doOnTerminate(() -> unsubscribe(destination)); - return messageTransformer.apply(messageFrameFlux); + return messageFrameFlux.flatMap( + new ThrowableMapper<>(frame -> readStompMessageFrame(frame, messageType))); } protected void unsubscribe(String destination) { diff --git a/src/test/java/com/trickl/flux/publishers/SimpMessagingPublisherTest.java b/src/test/java/com/trickl/flux/consumers/SimpMessageSenderTest.java similarity index 76% rename from src/test/java/com/trickl/flux/publishers/SimpMessagingPublisherTest.java rename to src/test/java/com/trickl/flux/consumers/SimpMessageSenderTest.java index 2f510c4..8ba6a73 100644 --- a/src/test/java/com/trickl/flux/publishers/SimpMessagingPublisherTest.java +++ b/src/test/java/com/trickl/flux/consumers/SimpMessageSenderTest.java @@ -1,4 +1,4 @@ -package com.trickl.flux.publishers; +package com.trickl.flux.consumers; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doNothing; @@ -16,10 +16,11 @@ import org.springframework.messaging.simp.SimpMessagingTemplate; import reactor.core.publisher.BaseSubscriber; +import reactor.core.publisher.Flux; import reactor.test.publisher.TestPublisher; @RunWith(MockitoJUnitRunner.class) -public class SimpMessagingPublisherTest { +public class SimpMessageSenderTest { @Mock private SimpMessagingTemplate messagingTemplate; @@ -35,7 +36,10 @@ public void init() { public void testDoesNothingWithoutSubscribers() { TestPublisher input = TestPublisher.create(); - new SimpMessagingPublisher<>(input, messagingTemplate, DESTINATION).get(); + Publisher output = Flux.from(input) + .doOnNext(new SimpMessageSender<>(messagingTemplate, DESTINATION)) + .publish() + .refCount(); input.next("a", "a", "a"); @@ -45,11 +49,13 @@ public void testDoesNothingWithoutSubscribers() { @Test public void testBroadcastsOnceWithOneSubscriber() { TestPublisher input = TestPublisher.create(); - Publisher broadcaster = - new SimpMessagingPublisher<>(input, messagingTemplate, DESTINATION).get(); + Publisher output = Flux.from(input) + .doOnNext(new SimpMessageSender<>(messagingTemplate, DESTINATION)) + .publish() + .refCount(); input.next("a", "a", "a"); - broadcaster.subscribe(new BaseSubscriber() {}); + output.subscribe(new BaseSubscriber() {}); input.next("b", "b"); verify(messagingTemplate, never()).convertAndSend(DESTINATION, "a"); @@ -59,18 +65,20 @@ public void testBroadcastsOnceWithOneSubscriber() { @Test public void testBroadcastsOnceWithMultipleSubscriber() { TestPublisher input = TestPublisher.create(); - Publisher broadcaster = - new SimpMessagingPublisher<>(input, messagingTemplate, DESTINATION).get(); + Publisher output = Flux.from(input) + .doOnNext(new SimpMessageSender<>(messagingTemplate, DESTINATION)) + .publish() + .refCount(); input.next("a", "a", "a"); Subscriber firstSubscriber = new BaseSubscriber() {}; - broadcaster.subscribe(firstSubscriber); + output.subscribe(firstSubscriber); input.next("b", "b"); Subscriber secondSubscriber = new BaseSubscriber() {}; - broadcaster.subscribe(secondSubscriber); + output.subscribe(secondSubscriber); input.next("c", "c", "c", "c"); @@ -87,13 +95,15 @@ public void testBroadcastsOnceWithMultipleSubscriber() { @Test public void testStopsBroadcastingWithoutSubscribers() { TestPublisher input = TestPublisher.create(); - Publisher broadcaster = - new SimpMessagingPublisher<>(input, messagingTemplate, DESTINATION).get(); + Publisher output = Flux.from(input) + .doOnNext(new SimpMessageSender<>(messagingTemplate, DESTINATION)) + .publish() + .refCount(); input.next("a", "a", "a"); BaseSubscriber subscriber = new BaseSubscriber() {}; - broadcaster.subscribe(subscriber); + output.subscribe(subscriber); input.next("b", "b"); @@ -110,18 +120,20 @@ public void testStopsBroadcastingWithoutSubscribers() { @Test public void testStopsBroadcastingWithoutSubscribersScenario2() { TestPublisher input = TestPublisher.create(); - Publisher broadcaster = - new SimpMessagingPublisher<>(input, messagingTemplate, DESTINATION).get(); + Publisher output = Flux.from(input) + .doOnNext(new SimpMessageSender<>(messagingTemplate, DESTINATION)) + .publish() + .refCount(); input.next("a", "a", "a"); BaseSubscriber firstSubscriber = new BaseSubscriber() {}; - broadcaster.subscribe(firstSubscriber); + output.subscribe(firstSubscriber); input.next("b", "b"); BaseSubscriber secondSubscriber = new BaseSubscriber() {}; - broadcaster.subscribe(secondSubscriber); + output.subscribe(secondSubscriber); input.next("c", "c", "c", "c"); @@ -145,13 +157,15 @@ public void testStopsBroadcastingWithoutSubscribersScenario2() { @Test public void testRebroadcastingOnResubscribers() { TestPublisher input = TestPublisher.create(); - Publisher broadcaster = - new SimpMessagingPublisher<>(input, messagingTemplate, DESTINATION).get(); + Publisher output = Flux.from(input) + .doOnNext(new SimpMessageSender<>(messagingTemplate, DESTINATION)) + .publish() + .refCount(); input.next("a", "a", "a"); BaseSubscriber firstSubscriber = new BaseSubscriber() {}; - broadcaster.subscribe(firstSubscriber); + output.subscribe(firstSubscriber); input.next("b", "b"); @@ -161,7 +175,7 @@ public void testRebroadcastingOnResubscribers() { input.next("c", "c", "c", "c"); BaseSubscriber secondSubscriber = new BaseSubscriber() {}; - broadcaster.subscribe(secondSubscriber); + output.subscribe(secondSubscriber); input.next("d", "d", "d"); diff --git a/src/test/java/com/trickl/flux/transformers/DifferentialTransformerTest.java b/src/test/java/com/trickl/flux/mappers/DifferentialMapperTest.java similarity index 67% rename from src/test/java/com/trickl/flux/transformers/DifferentialTransformerTest.java rename to src/test/java/com/trickl/flux/mappers/DifferentialMapperTest.java index 8a270ee..c5483f8 100644 --- a/src/test/java/com/trickl/flux/transformers/DifferentialTransformerTest.java +++ b/src/test/java/com/trickl/flux/mappers/DifferentialMapperTest.java @@ -1,20 +1,22 @@ -package com.trickl.flux.transformers; +package com.trickl.flux.mappers; import org.junit.Test; import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import reactor.test.StepVerifier; import reactor.test.publisher.TestPublisher; -public class DifferentialTransformerTest { - DifferentialTransformer dashSeparateString = - new DifferentialTransformer<>((a, b) -> a + "-" + b); +public class DifferentialMapperTest { + DifferentialMapper dashSeparateString = + new DifferentialMapper<>((a, b) -> Mono.just(a + "-" + b), "null"); @Test public void correctOutputWithZeroInputs() { Publisher input = TestPublisher.create(); - Publisher output = dashSeparateString.apply(input); + Publisher output = Flux.from(input).flatMap(dashSeparateString); StepVerifier.create(output) .expectComplete(); @@ -26,7 +28,7 @@ public void correctOutputWithOneInputs() { .next("a"); - Publisher output = dashSeparateString.apply(input); + Publisher output = Flux.from(input).flatMap(dashSeparateString); StepVerifier.create(output) .expectNext("null-a") @@ -44,7 +46,7 @@ public void correctOutputWithMultipleInputs() { .next("e"); - Publisher output = dashSeparateString.apply(input); + Publisher output = Flux.from(input).flatMap(dashSeparateString); StepVerifier.create(output) .expectNext("null-a") diff --git a/src/test/java/com/trickl/flux/publishers/DifferentialPollPublisherTest.java b/src/test/java/com/trickl/flux/mappers/PollerTest.java similarity index 77% rename from src/test/java/com/trickl/flux/publishers/DifferentialPollPublisherTest.java rename to src/test/java/com/trickl/flux/mappers/PollerTest.java index 58118e3..43fef4d 100644 --- a/src/test/java/com/trickl/flux/publishers/DifferentialPollPublisherTest.java +++ b/src/test/java/com/trickl/flux/mappers/PollerTest.java @@ -1,4 +1,4 @@ -package com.trickl.flux.publishers; +package com.trickl.flux.mappers; import java.text.MessageFormat; import java.time.Duration; @@ -15,18 +15,18 @@ import reactor.test.publisher.TestPublisher; @RunWith(MockitoJUnitRunner.class) -public class DifferentialPollPublisherTest { +public class PollerTest { private static final Instant START_TIME = Instant.parse("2019-01-01T00:00:00Z"); private class TestPoller { public Publisher getDaysSinceLast(Instant from, Instant to) { - if (from != null && to != null) { + if (from != Instant.MIN && to != Instant.MIN) { long days = ChronoUnit.DAYS.between(from, to); return Mono.just(MessageFormat.format("{0} days", new Object[] {days})); - } else if (from != null) { + } else if (from != Instant.MIN) { return Mono.just(MessageFormat.format("from {0}", from)); - } else if (to != null) { + } else if (to != Instant.MIN) { return Mono.just(MessageFormat.format("to {0}", to)); } return Flux.empty(); @@ -47,10 +47,8 @@ public void init() { public void testSubscribesCorrectly() { TestPublisher instantPublisher = TestPublisher.create(); - DifferentialPollPublisher publisher = - new DifferentialPollPublisher<>( - instantPublisher, poller::getDaysSinceLast); - Flux output = publisher.get(); + Flux output = Flux.concat(instantPublisher, Mono.just(Instant.MIN)) + .flatMap(new DifferentialMapper<>(poller::getDaysSinceLast, Instant.MIN)); StepVerifier.create(output) .then(instantPublisher::complete) @@ -62,10 +60,8 @@ public void testSubscribesCorrectly() { public void testGeneratesFirstCorrectly() { TestPublisher instantPublisher = TestPublisher.create(); - DifferentialPollPublisher publisher = - new DifferentialPollPublisher<>( - instantPublisher, poller::getDaysSinceLast); - Flux output = publisher.get(); + Flux output = Flux.concat(instantPublisher, Mono.just(Instant.MIN)) + .flatMap(new DifferentialMapper<>(poller::getDaysSinceLast, Instant.MIN)); StepVerifier.create(output) .then(() -> instantPublisher.next(START_TIME)) @@ -80,10 +76,8 @@ public void testGeneratesFirstCorrectly() { public void testGeneratesMultipleCorrectly() { TestPublisher instantPublisher = TestPublisher.create(); - DifferentialPollPublisher publisher = - new DifferentialPollPublisher<>( - instantPublisher, poller::getDaysSinceLast); - Flux output = publisher.get(); + Flux output = Flux.concat(instantPublisher, Mono.just(Instant.MIN)) + .flatMap(new DifferentialMapper<>(poller::getDaysSinceLast, Instant.MIN)); StepVerifier.create(output) .then(() -> instantPublisher.next(START_TIME))