Skip to content

Commit

Permalink
Add a general purpose collecting method to ReadStream to facilitate t…
Browse files Browse the repository at this point in the history
…he reduction of streams. The collecting method is a default method.
  • Loading branch information
vietj committed May 22, 2024
1 parent 79c06b4 commit c0299ad
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 2 deletions.
12 changes: 12 additions & 0 deletions src/main/asciidoc/streams.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -170,3 +170,15 @@ returns `true` if the write queue is considered full.
Will be called if an exception occurs on the `WriteStream`.
- {@link io.vertx.core.streams.WriteStream#drainHandler}:
The handler will be called if the `WriteStream` is considered no longer full.

=== Reducing streams

Java collectors can reduce a `ReadStream` to a result in a similar fashion `java.util.Stream` does, yet in an asynchronous
fashion.

[source,$lang]
----
{@link examples.StreamsExamples#reduce1}
----

Note that `collect` overrides any previously handler set on the stream.
12 changes: 11 additions & 1 deletion src/main/java/examples/StreamsExamples.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

package examples;

import io.vertx.core.Handler;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.file.AsyncFile;
Expand All @@ -20,6 +20,9 @@
import io.vertx.core.net.NetServer;
import io.vertx.core.net.NetServerOptions;
import io.vertx.core.streams.Pipe;
import io.vertx.core.streams.ReadStream;

import java.util.stream.Collectors;

/**
* @author <a href="mailto:[email protected]">Julien Viet</a>
Expand Down Expand Up @@ -164,4 +167,11 @@ public void pipe9(AsyncFile src, AsyncFile dst) {
dst.end(Buffer.buffer("done"));
});
}

public <T> void reduce1(ReadStream<T> stream) {
// Count the number of elements
Future<Long> result = stream.collect(Collectors.counting());

result.onSuccess(count -> System.out.println("Stream emitted " + count + " elements"));
}
}
26 changes: 25 additions & 1 deletion src/main/java/io/vertx/core/streams/ReadStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,17 @@
package io.vertx.core.streams;

import io.vertx.codegen.annotations.Fluent;
import io.vertx.codegen.annotations.GenIgnore;
import io.vertx.codegen.annotations.Nullable;
import io.vertx.codegen.annotations.VertxGen;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.impl.future.PromiseInternal;
import io.vertx.core.streams.impl.PipeImpl;

import java.util.function.BiConsumer;

/**
* Represents a stream of items that can be read from.
* <p>
Expand Down Expand Up @@ -111,6 +114,27 @@ default Pipe<T> pipe() {
return new PipeImpl<>(this);
}

/**
* Apply a {@code collector} to this stream, the obtained result is returned as a future.
* <p/>
* Handlers of this stream are affected by this operation.
*
* @return a future notified with result produced by the {@code collector} applied to this stream
*/
@GenIgnore(GenIgnore.PERMITTED_TYPE)
default <R, A> Future<R> collect(java.util.stream.Collector<T , A , R> collector) {
PromiseInternal<R> promise = (PromiseInternal<R>) Promise.promise();
A cumulation = collector.supplier().get();
BiConsumer<A, T> accumulator = collector.accumulator();
handler(elt -> accumulator.accept(cumulation, elt));
endHandler(v -> {
R result = collector.finisher().apply(cumulation);
promise.tryComplete(result);
});
exceptionHandler(promise::tryFail);
return promise.future();
}

/**
* Pipe this {@code ReadStream} to the {@code WriteStream}.
* <p>
Expand Down
62 changes: 62 additions & 0 deletions src/test/java/io/vertx/core/streams/ReadStreamReduceTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright (c) 2011-2019 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.core.streams;

import io.vertx.core.Future;
import io.vertx.test.core.AsyncTestBase;
import io.vertx.test.fakestream.FakeStream;
import org.junit.Test;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class ReadStreamReduceTest extends AsyncTestBase {

private FakeStream<Object> dst;
private Object o1 = new Object();
private Object o2 = new Object();
private Object o3 = new Object();

@Override
protected void setUp() throws Exception {
super.setUp();
dst = new FakeStream<>();
}

@Test
public void testCollect() {
Future<List<Object>> list = dst.collect(Collectors.toList());
assertFalse(list.isComplete());
dst.write(o1);
assertFalse(list.isComplete());
dst.write(o2);
assertFalse(list.isComplete());
dst.write(o3);
dst.end();
assertTrue(list.succeeded());
assertEquals(Arrays.asList(o1, o2, o3), list.result());
}

@Test
public void testFailure() {
Future<List<Object>> list = dst.collect(Collectors.toList());
assertFalse(list.isComplete());
dst.write(o1);
assertFalse(list.isComplete());
dst.write(o2);
assertFalse(list.isComplete());
Throwable err = new Throwable();
dst.fail(err);
assertTrue(list.failed());
assertSame(err, list.cause());
}
}

0 comments on commit c0299ad

Please sign in to comment.