Skip to content

Commit

Permalink
Simplify writer generics
Browse files Browse the repository at this point in the history
  • Loading branch information
bitzl committed Mar 9, 2018
1 parent 8a7d613 commit 7a53dbe
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 15 deletions.
25 changes: 24 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,29 @@ dependencies {

Required libraries are Jackson and RabbitMQ Java API, the minimal Java version is 8 (will move to 9 in a few months). The Flusswerk engine itself *never requires Spring*, but there are examples how to integrate the engine in a Spring Boot Application.

## Migration from version 1.x to 2.x

Starting with version 2.0.0, the interface for flows sending many messages has been simplified. The writer now can use the message class generics directly:

```java
class Writer implements java.util.function.Function<T, Collection<? extends Message>> {
@Override
public Collection<? extends Message> apply(T value) {
// ...
}
}
```

gets now

```java
class Writer implements java.util.function.Function<T, Collection<Message>> {
@Override
public Collection<Message> apply(T value) {
// ...
}
}
```

## Basic setup

Expand Down Expand Up @@ -102,7 +125,7 @@ Depending if you want to want to send one message, multiple messages or no messa

- `flowBuilder.write(Consumer<T>)` processes values of type `T`, but does not send messages returned by the writer.
- `flowBuilder.writeAndSend(Function<T, Message>)` processes values of type `T`, and sends the message returned by the writer to the default output queue.
- `flowBuilder.writeAndSendMany(Function<T, List<? extends Message>>)` processes values of type `T`, and sends all messages in the list returned by the writer to the default output queue.
- `flowBuilder.writeAndSendMany(Function<T, List<Message>>)` processes values of type `T`, and sends all messages in the list returned by the writer to the default output queue.

It is always possible to use `MessageBroker.send("some.queue", Message)` anywhere to manually [send messages to arbitrary queues](#sending-messages-to-arbitrary-queues).

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,18 @@ public class Flow<M extends Message, R, W> {

private Supplier<Function<R, W>> transformerFactory;

private Supplier<Function<W, Collection<? extends Message>>> writerFactory;
private Supplier<Function<W, Collection<Message>>> writerFactory;

private Supplier<Consumer<W>> consumingWriterFactory;

public Flow(Supplier<Function<M, R>> readerFactory, Supplier<Function<R, W>> transformerFactory, Supplier<Function<W, Collection<? extends Message>>> writerFactory, Supplier<Consumer<W>> consumingWriterFactory) {
public Flow(Supplier<Function<M, R>> readerFactory, Supplier<Function<R, W>> transformerFactory, Supplier<Function<W, Collection<Message>>> writerFactory, Supplier<Consumer<W>> consumingWriterFactory) {
this.readerFactory = readerFactory;
this.transformerFactory = transformerFactory;
this.writerFactory = writerFactory;
this.consumingWriterFactory = consumingWriterFactory;
}

public Collection<? extends Message> process(M message) {
public Collection<Message> process(M message) {
Job<M, R, W> job = new Job<>(message);
if (readerFactory != null) {
job.read(readerFactory.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public class FlowBuilder<M extends Message, R, W> {

private Supplier<Function<R, W>> transformerFactory;

private Supplier<Function<W, Collection<? extends Message>>> writerFactory;
private Supplier<Function<W, Collection<Message>>> writerFactory;

private Supplier<Consumer<W>> consumingWriterFactory;

Expand Down Expand Up @@ -122,7 +122,7 @@ public FlowBuilder<M, R, W> writeAndSend(Supplier<Function<W, Message>> writerFa
* @param writer The writer to produce outgoing messages.
* @return This {@link FlowBuilder} instance for further configuration or creation of the {@link Flow}.
*/
public FlowBuilder<M, R, W> writeAndSendMany(Function<W, Collection<? extends Message>> writer) {
public FlowBuilder<M, R, W> writeAndSendMany(Function<W, Collection<Message>> writer) {
requireNonNull(writer, "The writer factory cannot be null");
createDefaultTransformer();
this.writerFactory = () -> writer;
Expand All @@ -135,7 +135,7 @@ public FlowBuilder<M, R, W> writeAndSendMany(Function<W, Collection<? extends Me
* @param writerFactory The writer factory to provide a writer for every message.
* @return This {@link FlowBuilder} instance for further configuration or creation of the {@link Flow}.
*/
public FlowBuilder<M, R, W> writeAndSendMany(Supplier<Function<W, Collection<? extends Message>>> writerFactory) {
public FlowBuilder<M, R, W> writeAndSendMany(Supplier<Function<W, Collection<Message>>> writerFactory) {
createDefaultTransformer();
this.writerFactory = requireNonNull(writerFactory, "The writer factory cannot be null");
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import java.util.Collections;
import java.util.function.Function;

public class WriterAdapter<T> implements Function<T, Collection<? extends Message>> {
public class WriterAdapter<T> implements Function<T, Collection<Message>> {

private final Function<T, Message> writer;

Expand All @@ -14,7 +14,7 @@ public WriterAdapter(Function<T, Message> writer) {
}

@Override
public Collection<? extends Message> apply(T t) {
public Collection<Message> apply(T t) {
Message result = writer.apply(t);
if (result == null) {
return Collections.emptyList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public class Job<M, R, W> {

private M message;

private Collection<? extends Message> result;
private Collection<Message> result;

public Job(M message) {
this.message = message;
Expand All @@ -28,7 +28,7 @@ public void transform(Function<R, W> transformer) {
dataTransformed = transformer.apply(dataRead);
}

public void write(Function<W, Collection<? extends Message>> writer) {
public void write(Function<W, Collection<Message>> writer) {
result = writer.apply(dataTransformed);
}

Expand All @@ -40,7 +40,7 @@ public M getMessage() {
return message;
}

public Collection<? extends Message> getResult() {
public Collection<Message> getResult() {
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ public W apply(R r) {
}
}

class CheckIfWritten<R> implements Function<R, Collection<? extends Message>> {
class CheckIfWritten<R> implements Function<R, Collection<Message>> {

boolean called = false;

@Override
public Collection<? extends Message> apply(R r) {
public Collection<Message> apply(R r) {
called = true;
return null;
}
Expand Down Expand Up @@ -78,7 +78,7 @@ void readTransformWriteShouldPassValues() {
Job<DefaultMessage, String, String> job = new Job<>(new DefaultMessage().put("message", message));
job.read(m -> m.get("message"));
job.transform(String::toUpperCase);
job.write((Function<String, Collection<? extends Message>>) s -> Collections.singleton(new DefaultMessage().put("message", s)));
job.write((Function<String, Collection<Message>>) s -> Collections.singleton(new DefaultMessage().put("message", s)));
assertThat(job.getResult()).allSatisfy(
result -> assertThat(assertThat(((DefaultMessage) result).get("message")).isEqualTo(message.toUpperCase()))
);
Expand Down

0 comments on commit 7a53dbe

Please sign in to comment.