From 1c08b6a896995d5e02a3599529cac2f5d0c2a303 Mon Sep 17 00:00:00 2001 From: Daniel Kec Date: Sun, 20 Jun 2021 20:40:49 +0200 Subject: [PATCH] Revert BEP refactor (#3130) * Revert "BEP refactor (#2285)" - This reverts commit 4348c9422e32cc7100313f4dbc00fe9dc19c1800. * Test BEP in flatmap * Revert copyright year Signed-off-by: Daniel Kec --- .../common/reactive/BiConsumerChain.java | 21 +- .../reactive/BufferedEmittingPublisher.java | 488 +++++++----------- .../BufferedEmittingPublisherTest.java | 32 +- .../helidon/common/reactive/EmitterTest.java | 22 +- .../helidon/webclient/NettyClientHandler.java | 8 +- .../webserver/HttpRequestScopedPublisher.java | 6 +- 6 files changed, 243 insertions(+), 334 deletions(-) diff --git a/common/reactive/src/main/java/io/helidon/common/reactive/BiConsumerChain.java b/common/reactive/src/main/java/io/helidon/common/reactive/BiConsumerChain.java index 2af71b538bb..90140e3f5f2 100644 --- a/common/reactive/src/main/java/io/helidon/common/reactive/BiConsumerChain.java +++ b/common/reactive/src/main/java/io/helidon/common/reactive/BiConsumerChain.java @@ -30,6 +30,13 @@ public void accept(T t, S s) { } } + BiConsumerChain combineWith(BiConsumer another) { + BiConsumerChain newChain = new BiConsumerChain<>(); + newChain.addAll(this); + newChain.add(another); + return newChain; + } + static BiConsumer combine( BiConsumer current, BiConsumer another) { @@ -39,18 +46,12 @@ static BiConsumer combine( if (another == null) { return current; } - BiConsumerChain newChain = new BiConsumerChain<>(); if (current instanceof BiConsumerChain) { - newChain.addAll((BiConsumerChain) current); - } else { - newChain.add(current); - } - - if (another instanceof BiConsumerChain) { - newChain.addAll((BiConsumerChain) another); - } else { - newChain.add(another); + return ((BiConsumerChain) current).combineWith(another); } + BiConsumerChain newChain = new BiConsumerChain<>(); + newChain.add(current); + newChain.add(another); return newChain; } } diff --git a/common/reactive/src/main/java/io/helidon/common/reactive/BufferedEmittingPublisher.java b/common/reactive/src/main/java/io/helidon/common/reactive/BufferedEmittingPublisher.java index e2fbb29b58f..1f215a7e7a1 100644 --- a/common/reactive/src/main/java/io/helidon/common/reactive/BufferedEmittingPublisher.java +++ b/common/reactive/src/main/java/io/helidon/common/reactive/BufferedEmittingPublisher.java @@ -16,10 +16,12 @@ package io.helidon.common.reactive; +import java.util.Objects; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Flow; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -34,31 +36,16 @@ */ public class BufferedEmittingPublisher implements Flow.Publisher { + private final AtomicReference state = new AtomicReference<>(State.READY_TO_EMIT); private final ConcurrentLinkedQueue buffer = new ConcurrentLinkedQueue<>(); - private volatile Throwable error; + private final EmittingPublisher emitter = new EmittingPublisher<>(); + private final AtomicLong deferredDrains = new AtomicLong(0); + private final AtomicBoolean draining = new AtomicBoolean(false); + private final AtomicBoolean subscribed = new AtomicBoolean(false); + private final AtomicReference error = new AtomicReference<>(); private BiConsumer requestCallback = null; private Consumer onEmitCallback = null; - private Consumer onCleanup = null; - private Consumer onAbort = null; - private volatile Flow.Subscriber subscriber; - // state: two bits, b1 b0, tell: - // b0: 0/1 is not started/started (a subscriber arrived) - // b1: 0/1 is not stopped/stopped (a publisher completed) - // You can start and stop asynchronously and in any order - private final AtomicInteger state = new AtomicInteger(); - // assert: contenders is initially non-zero, so nothing can be done until onSubscribe has - // been signalled; observe drain() after onSubscribe - private final AtomicInteger contenders = new AtomicInteger(1); - private final AtomicLong requested = new AtomicLong(); - // assert: ignorePending is set to enter terminal state as soon as possible: behave like - // the buffer is empty - private volatile boolean ignorePending; - - // assert: emitted is accessed single-threadedly - private long emitted; - // assert: observing cancelled, but not ignorePending, is possible only if cancel() races - // against a completion (isCancelled() and isComplete() are both true) - private boolean cancelled; + private boolean safeToSkipBuffer = false; protected BufferedEmittingPublisher() { } @@ -74,39 +61,26 @@ public static BufferedEmittingPublisher create() { } @Override - public void subscribe(final Flow.Subscriber sub) { - if (stateChange(1)) { - MultiError.create(new IllegalStateException("Only single subscriber is allowed!")) - .subscribe(sub); + public void subscribe(final Flow.Subscriber subscriber) { + Objects.requireNonNull(subscriber, "subscriber is null"); + + if (!subscribed.compareAndSet(false, true)) { + subscriber.onSubscribe(SubscriptionHelper.CANCELED); + subscriber.onError(new IllegalStateException("Only single subscriber is allowed!")); return; } - sub.onSubscribe(new Flow.Subscription() { - public void request(long n) { - if (n < 1) { - abort(new IllegalArgumentException("Expected request() with a positive increment")); - return; - } - long curr; - do { - curr = requested.get(); - } while (curr != Long.MAX_VALUE - && !requested.compareAndSet(curr, Long.MAX_VALUE - curr > n ? curr + n : Long.MAX_VALUE)); - if (requestCallback != null) { - requestCallback.accept(n, curr); - } - maybeDrain(); - } - - public void cancel() { - cancelled = true; - ignorePending = true; - maybeDrain(); - abort(null); + emitter.onSubscribe(() -> state.get().drain(this)); + emitter.onRequest((n, cnt) -> { + if (requestCallback != null) { + requestCallback.accept(n, cnt); } + state.get().drain(this); }); - subscriber = sub; - drain(); // assert: contenders lock is already acquired + emitter.onCancel(() -> state.compareAndSet(State.READY_TO_EMIT, State.CANCELLED)); + + // subscriber is already validated + emitter.unsafeSubscribe(subscriber); } /** @@ -120,7 +94,11 @@ public void cancel() { * @param requestCallback to be executed */ public void onRequest(BiConsumer requestCallback) { - this.requestCallback = BiConsumerChain.combine(this.requestCallback, requestCallback); + if (this.requestCallback == null) { + this.requestCallback = requestCallback; + } else { + this.requestCallback = BiConsumerChain.combine(this.requestCallback, requestCallback); + } } /** @@ -132,188 +110,71 @@ public void onRequest(BiConsumer requestCallback) { * @param onEmitCallback to be executed */ public void onEmit(Consumer onEmitCallback) { - this.onEmitCallback = ConsumerChain.combine(this.onEmitCallback, onEmitCallback); - } - - /** - * Callback executed to clean up the buffer, when the Publisher terminates without passing - * ownership of buffered items to anyone (fail, completeNow, or the Subscription is cancelled). - *

- * Use case: items buffered require handling their lifecycle, like releasing resources, or - * returning to a pool. - *

- * Calling onCleanup multiple times will ensure that each of the provided Consumers gets a - * chance to look at the items in the buffer. Usually you do not want to release the same - * resource to a pool more than once, so you should usually want to ensure you pass one and - * only one callback to onCleanup. For this reason, do not use together with clearBuffer, - * unless you know how to have idempotent resource lifecycle management. - * - * @param onCleanup callback executed to clean up the buffer - */ - public void onCleanup(Consumer onCleanup) { - this.onCleanup = ConsumerChain.combine(this.onCleanup, onCleanup); - } - - /** - * Callback executed when this Publisher fails or is cancelled in a way that the entity performing - * emit() may be unaware of. - *

- * Use case: emit() is issued only if onRequest is received; these will cease upon a failed request - * or when downstream requests cancellation. onAbort is going to let the entity issuing emit() - * know that no more onRequest are forthcoming (albeit they may still happen, the items emitted - * after onAbort will likely be discarded, and not emitted items will not be missed). - *

- * In essence the pair of onRequest and onAbort make up the interface like that of a Processor's - * Subscription's request and cancel. The difference is only the API and the promise: we allow - * emit() to not heed backpressure (for example, when upstream is really unable to heed - * backpressure without introducing a buffer of its own, like is the case with many transformations - * of the form Publisher<T>->Publisher<Publisher<T>>). - *

- * In the same vein there really is no restriction as to when onAbort callback can be called - there - * is no requirement for this Publisher to establish exactly whether the entity performing emit() - * is aware of the abort (say, a fail), or not. It is only required to ensure that the failures it - * generates (and not merely forwards to downstream) and cancellations it received, get propagated - * to the callback. - * - * @param onAbort callback executed when this Publisher fails or is cancelled - */ - public void onAbort(Consumer onAbort) { - this.onAbort = ConsumerChain.combine(this.onAbort, onAbort); - } - - private void abort(Throwable th) { - if (th != null) { - fail(th); - } - if (onAbort != null) { - onAbort.accept(th); + if (this.onEmitCallback == null) { + this.onEmitCallback = onEmitCallback; + } else { + this.onEmitCallback = ConsumerChain.combine(this.onEmitCallback, onEmitCallback); } } /** * Emit item to the stream, if there is no immediate demand from downstream, * buffer item for sending when demand is signaled. - * No-op after downstream enters terminal state. (Cancelled subscription or received onError/onComplete) * * @param item to be emitted + * @return actual size of the buffer, value should be used as informative and can change asynchronously + * @throws IllegalStateException if cancelled, completed of failed */ - public void emit(final T item) { - boolean locked = false; - int s = state.get(); - if (s == 1) { - // assert: attempt fast path only if started, and not stopped - locked = contenders.get() == 0 && contenders.compareAndSet(0, 1); - } - - // assert: this condition is the same as the loop on slow path in drain(), except the buffer - // isEmpty - the condition when we can skip adding, and immediately removing the item - // from the buffer, without loss of FIFO order. - if (locked && !ignorePending && requested.get() > emitted && buffer.isEmpty()) { - try { - subscriber.onNext(item); - if (onEmitCallback != null) { - onEmitCallback.accept(item); - } - emitted++; - } catch (RuntimeException re) { - // assert: fail is re-entrant (will succeed even while the contenders lock has been acquired) - abort(re); - } finally { - drain(); - } - return; - } - - // assert: if ignorePending, buffer cleanup will happen in the future - buffer.add(item); - if (locked) { - drain(); - } else { - maybeDrain(); - } + public int emit(final T item) { + return state.get().emit(this, item); } /** * Send {@code onError} signal downstream, regardless of the buffer content. * Nothing else can be sent downstream after calling fail. - * No-op after downstream enters terminal state. (Cancelled subscription or received onError/onComplete) - *

- * If several fail are invoked in quick succession or concurrently, no guarantee - * which of them ends up sent to downstream. + * {@link BufferedEmittingPublisher#emit(Object)} throws {@link IllegalStateException} after calling fail. * * @param throwable Throwable to be sent downstream as onError signal. */ public void fail(Throwable throwable) { - // assert: delivering a completion signal discarding the whole buffer takes precedence over normal - // completion - that is, if complete() has been called, but onComplete has not been delivered - // yet, onError will be signalled instead, discarding the entire buffer. - // Otherwise the downstream may not be able to establish orderly processing: fail() can be - // forced as part of a borken request(), failed onNext, onRequest or onEmit callbacks. These - // indicate the conditions where downstream may not reach a successful request() or cancel, - // thus blocking the progress of the Publisher. - error = throwable; - completeNow(); + error.set(throwable); + if (state.compareAndSet(State.READY_TO_EMIT, State.FAILED)) { + emitter.fail(throwable); + } } /** - * Send onComplete to downstream after it consumes the entire buffer. Intervening fail invocations - * can end up sending onError instead of onComplete. - * No-op after downstream enters terminal state. (Cancelled subscription or received onError/onComplete) + * Drain the buffer, in case of not sufficient demands wait for more requests, + * then send {@code onComplete} signal to downstream. + * {@link BufferedEmittingPublisher#emit(Object)} throws {@link IllegalStateException} after calling complete. */ public void complete() { - // assert: transition the state to stopped, and see if it is started; if not started, maybeDrain is futile - // assert: if cancelled can be observed, let's not race against it to change the state - let the state - // remain cancelled; this does not preclude the possibility of isCancelled switching to false, just makes - // it a little more predictable in single-threaded cases - // assert: even if cancelled, enter maybeDrain to ensure the cleanup occurs (complete is entrant from - // completeNow and fail) - if (cancelled || stateChange(2)) { - maybeDrain(); + if (state.compareAndSet(State.READY_TO_EMIT, State.COMPLETING)) { + //drain buffer then complete + State.READY_TO_EMIT.drain(this); } } - private boolean stateChange(int s) { - int curr; - do { - curr = state.get(); - } while ((curr & s) != s && !state.compareAndSet(curr, curr + s)); - return (curr & 1) > 0; - } - /** * Send {@code onComplete} signal downstream immediately, regardless of the buffer content. * Nothing else can be sent downstream after calling {@link BufferedEmittingPublisher#completeNow()}. - * No-op after downstream enters terminal state. (Cancelled subscription or received onError/onComplete) + * {@link BufferedEmittingPublisher#emit(Object)} throws {@link IllegalStateException} after calling completeNow. */ public void completeNow() { - ignorePending = true; - complete(); + if (state.compareAndSet(State.READY_TO_EMIT, State.COMPLETED)) { + emitter.complete(); + } } /** * Clear whole buffer, invoke consumer for each item before discarding it. - * Use case: items in the buffer require discarding properly, freeing up some resources, or returning them - * to a pool. - *

- * It is the caller's responsibility to ensure there are no concurrent invocations of clearBuffer, and - * that there will be no emit calls in the future, as the items processed by those invocations may not be - * consumed properly. - *

- * It is recommended that onCleanup is set up instead of using clearBuffer. Do not use together with onCleanup. * * @param consumer to be invoked for each item */ public void clearBuffer(Consumer consumer) { - // I recommend deprecating this method altogether - - // Accessing buffer concurrently with drain() is inherently broken: everyone assumes that if buffer - // is not empty, then buffer.poll() returns non-null value (this promise is broken), and everyone - // assumes that polling buffer returns items in FIFO order (this promise is broken). - //while (!buffer.isEmpty()) { - // consumer.accept(buffer.poll()); - //} - onCleanup(consumer); - completeNow(); // this is the current behaviour + while (!buffer.isEmpty()) { + consumer.accept(buffer.poll()); + } } /** @@ -322,7 +183,7 @@ public void clearBuffer(Consumer consumer) { * @return true if so */ public boolean isUnbounded() { - return requested.get() == Long.MAX_VALUE; + return this.emitter.isUnbounded(); } /** @@ -332,54 +193,28 @@ public boolean isUnbounded() { * @return true if demand is higher than 0 */ public boolean hasRequests() { - return requested.get() > emitted; + return this.emitter.hasRequests(); } /** * Check if publisher sent {@code onComplete} signal downstream. * Returns {@code true} right after calling {@link BufferedEmittingPublisher#completeNow()} - * (with a caveat) * but after calling {@link BufferedEmittingPublisher#complete()} returns * {@code false} until whole buffer has been drained. - *

- * The caveat is that completeNow() does not guarantee that the onComplete signal is sent - * before returning from completeNow() - it is only guaranteed to be sent as soon as it can be done. * * @return true if so */ public boolean isCompleted() { - // The caveat above means only that the current implementation guarantees onComplete is sent - // before completeNow returns in single-threaded cases. When concurrent emit() or request() - // race against completeNow, completeNow may return without entering drain() - but the concurrent - // calls guarantee onComplete will be called as soon as they observe the buffer is empty. - // - // We don't want to say this in the public documentation as this is implementation detail. - // - // A subtle logical statement: if onError or onComplete has been signalled to downstream, - // isCompleted is true. But it is also true if cancel() precluded sending the signal - // to downstream. - // - // The current implementation is: isCompleted is true if and only if no more onNext signals - // will be sent to downstream and no cancellation was requested. - // - // assert: once isCompleted becomes true, it stays true - // question: what should it be, if complete() was called, but not onSubscribe()? - return buffer.isEmpty() && state.get() > 1; + return this.state.get() == State.COMPLETED; } /** * Check if publisher is in terminal state CANCELLED. - *

- * It is for information only. It is not guaranteed to tell what happened to downstream, if there - * were a concurrent cancellation and a completion. * * @return true if so */ public boolean isCancelled() { - // a stricter logic can be implemented, but is the complication warranted? - - // assert: once isCancelled becomes true, isCancelled || isCompleted stays true - return ignorePending && cancelled && !isCompleted(); + return this.state.get() == State.CANCELLED; } /** @@ -392,95 +227,154 @@ public int bufferSize() { return buffer.size(); } - /** - * Override, if you prefer to do cleanup in a uniform way, instead of requiring everyone - * to register a onCleanup. - *

- * Use case: a subclass that offers an implementation of BufferedEmittingPublisher<T> for - * a certain type of resource T. - */ - protected void cleanup() { - if (onCleanup == null) { - buffer.clear(); - } else { - while (!buffer.isEmpty()) { - onCleanup.accept(buffer.poll()); + private void drainBuffer() { + deferredDrains.incrementAndGet(); + + long drains; + do { + if (draining.getAndSet(true)) { + //other thread already draining + return; } - } + drains = deferredDrains.getAndUpdate(d -> d == 0 ? 0 : d - 1); + if (drains > 0) { + // in case of parallel drains invoked by request + // increasing demand during draining + actualDrain(); + drains--; + } + draining.set(false); + // changed while draining, try again + } while (drains < deferredDrains.get()); } - private void maybeDrain() { - // assert: if not started, will not post too many emit() and complete() to overflow the - // counter - if (contenders.getAndIncrement() == 0) { - drain(); + private void actualDrain() { + while (!buffer.isEmpty()) { + if (emitter.emit(buffer.peek())) { + if (onEmitCallback != null) { + onEmitCallback.accept(buffer.poll()); + } else { + buffer.poll(); + } + } else { + break; + } + } + if (buffer.isEmpty() + && state.compareAndSet(State.COMPLETING, State.COMPLETED)) { + // Buffer drained, time for complete + emitter.complete(); } } - // Key design principles: - // - all operations on downstream are executed whilst "holding the lock". - // The lock acquisition is the ability to transition the value of contenders from zero to 1. - // - any changes to state are followed by maybeDrain, so the thread inside drain() can notice - // that some state change has occurred: - // - ignorePending - // - error - // - cancelled - // - requested - // - buffer contents - private void drain() { - IllegalStateException ise = null; - for (int cont = 1; cont > 0; cont = contenders.addAndGet(-cont)) { - boolean terminateNow = ignorePending; + private int emitOrBuffer(T item) { + synchronized (this) { try { - while (!terminateNow && requested.get() > emitted && !buffer.isEmpty()) { - T item = buffer.poll(); - subscriber.onNext(item); + if (buffer.isEmpty() && emitter.emit(item)) { + // Buffer drained, emit successful + // saved time by skipping buffer if (onEmitCallback != null) { onEmitCallback.accept(item); } - emitted++; - terminateNow = ignorePending; + return 0; + } else { + // safe slower path thru buffer + buffer.add(item); + state.get().drain(this); + return buffer.size(); + } + } finally { + // If unbounded, check only once if buffer is empty + if (!safeToSkipBuffer && isUnbounded() && buffer.isEmpty()) { + safeToSkipBuffer = true; } - } catch (RuntimeException re) { - abort(re); } + } + } - if (terminateNow) { - cleanup(); + private int unboundedEmitOrBuffer(T item) { + // Not reachable unless unbounded req was made + // and buffer is empty + if (emitter.emit(item)) { + // Emit successful + if (onEmitCallback != null) { + onEmitCallback.accept(item); } + return 0; + } else { + // Emitter can be only in terminal state + // buffer for later retrieval by clearBuffer() + buffer.add(item); + return buffer.size(); + } + } - if (terminateNow || isCompleted()) { - try { - // assert: cleanup in finally - if (!cancelled) { - cancelled = true; - if (error != null) { - subscriber.onError(error); - } else { - subscriber.onComplete(); - } - } - } catch (Throwable th) { - // assert: catch all throwables, to ensure the lock is released properly - // and buffer cleanup remains reachable - // assert: this line is reachable only once: all subsequent iterations - // will observe cancelled == true - ise = new IllegalStateException(th); - } finally { - error = null; - subscriber = null; - requestCallback = null; - onEmitCallback = null; + + private enum State { + READY_TO_EMIT { + @Override + int emit(BufferedEmittingPublisher publisher, T item) { + if (publisher.safeToSkipBuffer) { + return publisher.unboundedEmitOrBuffer(item); } + return publisher.emitOrBuffer(item); } - } - if (ise != null) { - // assert: this violates the reactive spec, but this is what the tests expect. - // Observe that there is no guarantee where the exception will be thrown - - // it may happen during request(), which is expected to finish without - // throwing - throw ise; - } + @Override + void drain(final BufferedEmittingPublisher publisher) { + publisher.drainBuffer(); + } + }, + CANCELLED { + @Override + int emit(BufferedEmittingPublisher publisher, T item) { + throw new IllegalStateException("Emitter is cancelled!"); + } + + @Override + void drain(final BufferedEmittingPublisher publisher) { + //noop + } + }, + FAILED { + @Override + int emit(BufferedEmittingPublisher publisher, T item) { + throw new IllegalStateException("Emitter is in failed state!"); + } + + @Override + void drain(final BufferedEmittingPublisher publisher) { + //Can't happen twice, internal emitter keeps the state too + publisher.emitter.fail(publisher.error.get()); + } + }, + COMPLETING { + @Override + int emit(BufferedEmittingPublisher publisher, T item) { + throw new IllegalStateException("Emitter is completing!"); + } + + @Override + void drain(final BufferedEmittingPublisher publisher) { + State.READY_TO_EMIT.drain(publisher); + } + }, + COMPLETED { + @Override + int emit(BufferedEmittingPublisher publisher, T item) { + throw new IllegalStateException("Emitter is completed!"); + } + + @Override + void drain(final BufferedEmittingPublisher publisher) { + //Can't happen twice, internal emitter keeps the state too + publisher.emitter.complete(); + } + }; + + abstract int emit(BufferedEmittingPublisher publisher, T item); + + abstract void drain(BufferedEmittingPublisher publisher); + } } diff --git a/common/reactive/src/test/java/io/helidon/common/reactive/BufferedEmittingPublisherTest.java b/common/reactive/src/test/java/io/helidon/common/reactive/BufferedEmittingPublisherTest.java index d03e4382dad..70913e8ccf8 100644 --- a/common/reactive/src/test/java/io/helidon/common/reactive/BufferedEmittingPublisherTest.java +++ b/common/reactive/src/test/java/io/helidon/common/reactive/BufferedEmittingPublisherTest.java @@ -24,6 +24,7 @@ import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import static org.hamcrest.CoreMatchers.equalTo; @@ -198,7 +199,8 @@ public void onNext(Long item) { publisher.emit(15L); assertThat(subscriber.isComplete(), is(equalTo(false))); assertThat(subscriber.getLastError(), is(not(nullValue()))); - assertThat(subscriber.getLastError(), is(instanceOf(UnsupportedOperationException.class))); // not sure why the rewrapping was required + assertThat(subscriber.getLastError(), is(instanceOf(IllegalStateException.class))); + assertThat(subscriber.getLastError().getCause(), is(instanceOf(UnsupportedOperationException.class))); } @Test @@ -236,8 +238,30 @@ public void onSubscribe(Subscription subscription) { } }; publisher.subscribe(subscriber); - publisher.emit(0L); - assertThat(publisher.bufferSize(), is(equalTo(0))); // not sure why throwing anything was done - it is an unsafe practice in a concurrent setting - assertThat(publisher.isCancelled(), is(equalTo(true))); + assertThrows(IllegalStateException.class, () -> publisher.emit(0L)); + } + + @Test + void flatMapping() { + final int STREAM_SIZE = 1_000_000; + AtomicInteger cnt = new AtomicInteger(); + ExecutorService exec = Executors.newFixedThreadPool(32); + Single promise = Multi.range(0, STREAM_SIZE) + .flatMap(it -> { + BufferedEmittingPublisher flatMapped = new BufferedEmittingPublisher<>(); + exec.submit(() -> { + flatMapped.emit(it); + flatMapped.complete(); + }); + return flatMapped; + }) + .forEach(unused -> cnt.incrementAndGet()); + + try { + promise.await(10, TimeUnit.SECONDS); + assertThat(cnt.get(), is(equalTo(STREAM_SIZE))); + } finally { + exec.shutdown(); + } } } diff --git a/common/reactive/src/test/java/io/helidon/common/reactive/EmitterTest.java b/common/reactive/src/test/java/io/helidon/common/reactive/EmitterTest.java index 2b29a53f642..bbe75035479 100644 --- a/common/reactive/src/test/java/io/helidon/common/reactive/EmitterTest.java +++ b/common/reactive/src/test/java/io/helidon/common/reactive/EmitterTest.java @@ -39,14 +39,10 @@ void testBackPressureWithCompleteNow() { TestSubscriber subscriber = new TestSubscriber<>(); emitter.subscribe(subscriber); - emitter.emit(0); - assertBufferSize(emitter.bufferSize(), 1); - emitter.emit(1); - assertBufferSize(emitter.bufferSize(), 2); - emitter.emit(2); - assertBufferSize(emitter.bufferSize(), 3); - emitter.emit(3); - assertBufferSize(emitter.bufferSize(), 4); + assertBufferSize(emitter.emit(0), 1); + assertBufferSize(emitter.emit(1), 2); + assertBufferSize(emitter.emit(2), 3); + assertBufferSize(emitter.emit(3), 4); subscriber .assertEmpty() @@ -56,12 +52,10 @@ void testBackPressureWithCompleteNow() { .assertItemCount(3) .assertNotTerminated(); - emitter.emit(4); - assertBufferSize(emitter.bufferSize(), 2); + assertBufferSize(emitter.emit(4), 2); emitter.completeNow(); - assertBufferSize(emitter.bufferSize(), 0); subscriber.requestMax() .assertValues(0, 1, 2) .assertComplete(); @@ -172,15 +166,13 @@ void testBackPressureWithLazyComplete() { .assertItemCount(3) .assertNotTerminated(); - emitter.emit(10); - assertThat(emitter.bufferSize(), is(equalTo(8))); + assertThat(emitter.emit(10), is(equalTo(8))); subscriber .request(3) .assertItemCount(6); - emitter.emit(11); - assertThat(emitter.bufferSize(), is(equalTo(6))); + assertThat(emitter.emit(11), is(equalTo(6))); subscriber.requestMax() .assertNotTerminated(); diff --git a/webclient/webclient/src/main/java/io/helidon/webclient/NettyClientHandler.java b/webclient/webclient/src/main/java/io/helidon/webclient/NettyClientHandler.java index d5325842a0a..dd3eb477d0b 100644 --- a/webclient/webclient/src/main/java/io/helidon/webclient/NettyClientHandler.java +++ b/webclient/webclient/src/main/java/io/helidon/webclient/NettyClientHandler.java @@ -286,12 +286,10 @@ private static final class HttpResponsePublisher extends BufferedEmittingPublish }); } - - - public void emit(final ByteBuf buf) { + public int emit(final ByteBuf buf) { buf.retain(); - super.emit(DataChunk.create(false, true, buf::release, - buf.nioBuffer().asReadOnlyBuffer())); + return super.emit(DataChunk.create(false, true, buf::release, + buf.nioBuffer().asReadOnlyBuffer())); } } diff --git a/webserver/webserver/src/main/java/io/helidon/webserver/HttpRequestScopedPublisher.java b/webserver/webserver/src/main/java/io/helidon/webserver/HttpRequestScopedPublisher.java index 70604143ce5..b100db92d4c 100644 --- a/webserver/webserver/src/main/java/io/helidon/webserver/HttpRequestScopedPublisher.java +++ b/webserver/webserver/src/main/java/io/helidon/webserver/HttpRequestScopedPublisher.java @@ -36,9 +36,9 @@ class HttpRequestScopedPublisher extends BufferedEmittingPublisher { this.holdingQueue = holdingQueue; } - public void emit(ByteBuf data) { + public int emit(ByteBuf data) { try { - super.emit(new ByteBufRequestChunk(data, holdingQueue)); + return super.emit(new ByteBufRequestChunk(data, holdingQueue)); } finally { holdingQueue.release(); } @@ -46,7 +46,7 @@ public void emit(ByteBuf data) { /** * Clear and release any {@link io.helidon.common.http.DataChunk DataChunk} hanging in - * the buffer. Try self-subscribe in case no one subscribed and unreleased {@link io.netty.buffer.ByteBuf ByteBufs} + * the buffer. Try self subscribe in case no one subscribed and unreleased {@link io.netty.buffer.ByteBuf ByteBufs} * are hanging in the netty pool. */ public void clearAndRelease() {