diff --git a/README.md b/README.md index 701660f7..7e5c3d67 100644 --- a/README.md +++ b/README.md @@ -47,7 +47,7 @@ class Application { Flow flow = new FlowBuilder() .read(message -> message.get("value")) .transform(String::toUpperCase) - .write(value -> DefaultMessage.withType("your.type").put("value", value)) + .write(value -> DefaultMessage.withId("your.id").put("value", value)) .build(); Engine engine = new Engine(messageBroker, flow); @@ -73,7 +73,7 @@ class Transformer implements Function { class Writer implements Function { Message apply(String output) { - return DefaultMessage.withType("your.type").put("value", output); + return DefaultMessage.withId("your.id").put("value", output); } } diff --git a/bdd/src/test/java/de/digitalcollections/workflow/bdd/StepDefinitions.java b/bdd/src/test/java/de/digitalcollections/workflow/bdd/StepDefinitions.java index 05d44da8..42799186 100644 --- a/bdd/src/test/java/de/digitalcollections/workflow/bdd/StepDefinitions.java +++ b/bdd/src/test/java/de/digitalcollections/workflow/bdd/StepDefinitions.java @@ -38,17 +38,17 @@ public StepDefinitions() { .readFrom("bdd.in") .writeTo("bdd.out") ); - messagesToSend = Collections.singletonList(DefaultMessage.withType("happy message")); + messagesToSend = Collections.singletonList(DefaultMessage.withId("happy message")); queueToSendTo = queue; }); When("the processing always fails", () -> { - Flow flow = new FlowBuilder() - .read(Message::getType) + Flow flow = new FlowBuilder<>() + .read(Message::toString) .transform(s -> { throw new RuntimeException("Fail!"); }) - .write(DefaultMessage::withType) + .write(s -> DefaultMessage.withId(s.toString())) .build(); // Preparation finished, start everything @@ -56,10 +56,10 @@ public StepDefinitions() { }); When("^the processing always works$", () -> { - Flow flow = new FlowBuilder() - .read(Message::getType) + Flow flow = new FlowBuilder() + .read(DefaultMessage::getId) .transform(s -> s) - .write(s -> DefaultMessage.withType(s).put("blah", "blubb")) + .write(s -> DefaultMessage.withId(s).put("blah", "blubb")) .build(); // Preparation finished, start everything diff --git a/engine/src/main/java/de/digitalcollections/workflow/engine/model/DefaultMessage.java b/engine/src/main/java/de/digitalcollections/workflow/engine/model/DefaultMessage.java index ed165dae..ed7b6ad7 100644 --- a/engine/src/main/java/de/digitalcollections/workflow/engine/model/DefaultMessage.java +++ b/engine/src/main/java/de/digitalcollections/workflow/engine/model/DefaultMessage.java @@ -31,11 +31,6 @@ public Envelope getEnvelope() { return envelope; } - @Override - public String getType() { - return data.get("type"); - } - public Map getData() { return data; } @@ -58,23 +53,12 @@ public String getId() { return data.get("id"); } - public static DefaultMessage withType(String type) { - DefaultMessage message = new DefaultMessage(); - message.put("type", type); - return message; - } - public static DefaultMessage withId(String id) { DefaultMessage message = new DefaultMessage(); message.put("id", id); return message; } - public DefaultMessage andId(String id) { - this.put("id", id); - return this; - } - @Override public String toString() { return "Message{envelope=" + envelope + ", data=" + data + "}"; diff --git a/engine/src/main/java/de/digitalcollections/workflow/engine/model/Message.java b/engine/src/main/java/de/digitalcollections/workflow/engine/model/Message.java index 946286af..810263ff 100644 --- a/engine/src/main/java/de/digitalcollections/workflow/engine/model/Message.java +++ b/engine/src/main/java/de/digitalcollections/workflow/engine/model/Message.java @@ -14,13 +14,6 @@ public interface Message { */ Envelope getEnvelope(); - /** - * The optional message type is represented by an arbitrary string value to distinguish different events on the same queue. - * - * @return The message type. - */ - String getType(); - /** * The optional ID can hold the identifier of an corresponding object (e.g. a book to be indexed). * diff --git a/engine/src/test/java/de/digitalcollections/workflow/engine/CustomMessage.java b/engine/src/test/java/de/digitalcollections/workflow/engine/CustomMessage.java index a4250e5a..b6d787f8 100644 --- a/engine/src/test/java/de/digitalcollections/workflow/engine/CustomMessage.java +++ b/engine/src/test/java/de/digitalcollections/workflow/engine/CustomMessage.java @@ -22,10 +22,10 @@ public CustomMessage() { } - public CustomMessage(String type) { + public CustomMessage(Long id) { this.envelope = new Envelope(); this.data = new HashMap<>(); - this.put("type", type); + this.id = id; } @@ -34,11 +34,6 @@ public Envelope getEnvelope() { return envelope; } - @Override - public String getType() { - return data.get("type"); - } - public Map getData() { return data; } diff --git a/engine/src/test/java/de/digitalcollections/workflow/engine/EngineTest.java b/engine/src/test/java/de/digitalcollections/workflow/engine/EngineTest.java index b48ff848..efcb432c 100644 --- a/engine/src/test/java/de/digitalcollections/workflow/engine/EngineTest.java +++ b/engine/src/test/java/de/digitalcollections/workflow/engine/EngineTest.java @@ -36,12 +36,12 @@ class EngineTest { private static final String DLX = "exchange.retry"; private MessageBroker messageBroker; - private Flow flowWithoutProblems; + private Flow flowWithoutProblems; private Message[] moreMessages(int number) { Message[] messages = new Message[number]; for (int i = 0; i < messages.length; i++) { - messages[i] = DefaultMessage.withType("White Room"); + messages[i] = DefaultMessage.withId("White Room"); } return messages; } @@ -49,7 +49,7 @@ private Message[] moreMessages(int number) { @BeforeEach void setUp() { messageBroker = mock(MessageBroker.class); - flowWithoutProblems = new FlowBuilder() + flowWithoutProblems = new FlowBuilder() .read(READ_SOME_STRING) .transform(Function.identity()) .write(WRITE_SOME_STRING) @@ -58,13 +58,13 @@ void setUp() { @Test public void engineShouldUseMaxNumberOfWorkers() throws IOException, InterruptedException { - when(messageBroker.receive()).thenReturn(DefaultMessage.withType("White Room")); + when(messageBroker.receive()).thenReturn(DefaultMessage.withId("White Room")); Semaphore semaphore = new Semaphore(1); semaphore.drainPermits(); - Flow flow = new FlowBuilder() - .read(Message::getType) + Flow flow = new FlowBuilder() + .read(DefaultMessage::getId) .transform(s -> { try { LOGGER.debug("Trying to acquire semaphore, should block (Thread id {})", Thread.currentThread().getId()); @@ -75,7 +75,7 @@ public void engineShouldUseMaxNumberOfWorkers() throws IOException, InterruptedE } return s; }) - .write(DefaultMessage::withType) + .write(DefaultMessage::withId) .build(); Engine engine = new Engine(messageBroker, flow); @@ -134,14 +134,14 @@ public void engineShouldUseMaxNumberOfWorkers() throws IOException, InterruptedE // System.out.println(messagesSent.get()); // } - private final Function READ_SOME_STRING = Message::getType; + private final Function READ_SOME_STRING = DefaultMessage::getId; - private final Function WRITE_SOME_STRING = DefaultMessage::withType; + private final Function WRITE_SOME_STRING = DefaultMessage::withId; @Test @DisplayName("Engine should reject a message failing processing") void processShouldRejectMessageOnFailure() throws IOException { - Flow flow = new FlowBuilder() + Flow flow = new FlowBuilder() .read(READ_SOME_STRING) .transform(s -> { throw new RuntimeException("Aaaaaaah!"); diff --git a/engine/src/test/java/de/digitalcollections/workflow/engine/flow/FlowBuilderTest.java b/engine/src/test/java/de/digitalcollections/workflow/engine/flow/FlowBuilderTest.java index cb5055e3..f825769f 100644 --- a/engine/src/test/java/de/digitalcollections/workflow/engine/flow/FlowBuilderTest.java +++ b/engine/src/test/java/de/digitalcollections/workflow/engine/flow/FlowBuilderTest.java @@ -1,8 +1,6 @@ package de.digitalcollections.workflow.engine.flow; -import de.digitalcollections.workflow.engine.flow.Flow; -import de.digitalcollections.workflow.engine.flow.FlowBuilder; import de.digitalcollections.workflow.engine.model.DefaultMessage; import de.digitalcollections.workflow.engine.model.Message; import java.util.function.Function; @@ -19,9 +17,7 @@ class FlowBuilderTest { @DisplayName("read should throw an exception if the reader is null") public void readShouldThrowExceptionIfReaderIsNull() { FlowBuilder builder = new FlowBuilder<>(); - Throwable exception = assertThrows(NullPointerException.class, () -> { - builder.read((Function) null); - }); + Throwable exception = assertThrows(NullPointerException.class, () -> builder.read((Function) null)); assertThat(exception.getMessage()).contains("reader"); } @@ -29,9 +25,7 @@ public void readShouldThrowExceptionIfReaderIsNull() { @DisplayName("read should throw an exception if the reader factory is null") public void readShouldThrowExceptionIfFactoryIsNull() { FlowBuilder builder = new FlowBuilder<>(); - Throwable exception = assertThrows(NullPointerException.class, () -> { - builder.read((Supplier>) null); - }); + Throwable exception = assertThrows(NullPointerException.class, () -> builder.read((Supplier>) null)); assertThat(exception.getMessage()).contains("reader factory"); } @@ -39,9 +33,7 @@ public void readShouldThrowExceptionIfFactoryIsNull() { @DisplayName("transform should throw an exception if there is no reader") public void transformerShouldThrowExceptionIfReaderIsNull() { FlowBuilder builder = new FlowBuilder<>(); - Throwable exception = assertThrows(IllegalStateException.class, () -> { - builder.transform(s -> null); - }); + Throwable exception = assertThrows(IllegalStateException.class, () -> builder.transform(s -> null)); assertThat(exception.getMessage()).contains("reader"); } @@ -92,13 +84,13 @@ public void writeShouldThrowExceptionIfWriterFactoryIsNull() { @Test @DisplayName("build should create an working Flow if only reader and writer are set") public void buildWithOnlyReadAndWrite() { - Flow flow = new FlowBuilder() - .read(Message::getType) - .write(DefaultMessage::withType) + Flow flow = new FlowBuilder() + .read(DefaultMessage::getId) + .write(DefaultMessage::withId) .build(); String message = "Whiskey in the Jar"; - Message result = flow.process(DefaultMessage.withType(message)); - assertThat(result.getType()).isEqualTo(message); + Message result = flow.process(DefaultMessage.withId(message)); + assertThat(result.getId()).isEqualTo(message); } } diff --git a/engine/src/test/java/de/digitalcollections/workflow/engine/jackson/DefaultMessageMixinTest.java b/engine/src/test/java/de/digitalcollections/workflow/engine/jackson/DefaultMessageMixinTest.java index 1d1a21bf..fb684e20 100644 --- a/engine/src/test/java/de/digitalcollections/workflow/engine/jackson/DefaultMessageMixinTest.java +++ b/engine/src/test/java/de/digitalcollections/workflow/engine/jackson/DefaultMessageMixinTest.java @@ -16,23 +16,6 @@ class DefaultMessageMixinTest { - private class SpecialMessage extends DefaultMessage { - String specialField; - public SpecialMessage() { - super(); - } - public SpecialMessage(String specialField) { - this.specialField = specialField; - } - - public String getSpecialField() { - return specialField; - } - public void setSpecialField(String specialField) { - this.specialField = specialField; - } - } - private ObjectMapper objectMapper; @BeforeEach @@ -48,32 +31,32 @@ void setUp() { @Test @DisplayName("Serialization should exclude field deliveryTag") void shouldExcludeDeliveryTag() throws JsonProcessingException { - String json = objectMapper.writeValueAsString(DefaultMessage.withType("Tadaaaaa!")); + String json = objectMapper.writeValueAsString(DefaultMessage.withId("Tadaaaaa!")); assertThat(json).doesNotContain("deliveryTag"); } @Test @DisplayName("Serialization should exclude field body") void shouldExcludeBody() throws JsonProcessingException { - String json = objectMapper.writeValueAsString(DefaultMessage.withType("Tadaaaaa!")); + String json = objectMapper.writeValueAsString(DefaultMessage.withId("Tadaaaaa!")); assertThat(json).doesNotContain("body"); } @Test @DisplayName("Deserialization should ignore unknown fields") void shouldIgnoreUnknownFields() throws IOException { - DefaultMessage message = DefaultMessage.withType("Tadaaaaa!").andId("X0000012-3"); + DefaultMessage message = DefaultMessage.withId("Tadaaaaa!"); String json = objectMapper.writeValueAsString(message); json = json.substring(0, json.length() - 1) + ", \"stupidField\": 0}"; System.out.println(json); DefaultMessage restored = objectMapper.readValue(json, DefaultMessage.class); - assertThat(message.getType()).isEqualTo(restored.getType()); + assertThat(message.getId()).isEqualTo(restored.getId()); } @Test @DisplayName("Should serialize Envelope.retries") void shouldSerializeRetries() throws JsonProcessingException { - DefaultMessage message = DefaultMessage.withType("something happened"); + DefaultMessage message = DefaultMessage.withId("something happened"); message.getEnvelope().setRetries(42); assertThat(objectMapper.writeValueAsString(message)).contains("42"); } @@ -81,30 +64,11 @@ void shouldSerializeRetries() throws JsonProcessingException { @Test @DisplayName("Should deserialize Envelope.retries") void shouldDeserializeRetries() throws IOException { - DefaultMessage message = DefaultMessage.withType("something happened"); + DefaultMessage message = DefaultMessage.withId("something happened"); message.getEnvelope().setRetries(42); Message deserialized = objectMapper.readValue(objectMapper.writeValueAsString(message), DefaultMessage.class); assertThat(deserialized.getEnvelope().getRetries()).isEqualTo(42); } - -// @Test -// @DisplayName("Serialization should preserve parameters") -// void shouldPreserveParams() throws IOException { -// Message message = new Message(); -// message.put("floob", "gooobl"); -// message.put("bingle", "bongle"); -// Message restored = objectMapper.readValue(objectMapper.writeValueAsString(message), Message.class); -// assertThat(restored.getData()).isEqualTo(message.getData()); -// } - -// @Test -// @DisplayName("Serialization should work for subtypes") -// void shouldSaveSpecialMessage() throws IOException { -// SpecialMessage message = new SpecialMessage("by the hammer of Thor"); -// SpecialMessage restored = objectMapper.readValue(objectMapper.writeValueAsString(message), SpecialMessage.class); -// assertThat(restored.getSpecialField()).isEqualTo(message.getSpecialField()); -// } - } diff --git a/engine/src/test/java/de/digitalcollections/workflow/engine/messagebroker/MessageBrokerTest.java b/engine/src/test/java/de/digitalcollections/workflow/engine/messagebroker/MessageBrokerTest.java index 7498bf85..a28a3796 100644 --- a/engine/src/test/java/de/digitalcollections/workflow/engine/messagebroker/MessageBrokerTest.java +++ b/engine/src/test/java/de/digitalcollections/workflow/engine/messagebroker/MessageBrokerTest.java @@ -38,7 +38,7 @@ void setUp() throws IOException, TimeoutException { routingConfig.complete(); rabbitClient = mock(RabbitClient.class); messageBroker = new MessageBroker(config, routingConfig, rabbitClient); - message = DefaultMessage.withType("Hey"); + message = DefaultMessage.withId("Hey"); } @Test @@ -78,7 +78,7 @@ void rejectShouldRouteToFailedQueueIfMessageIsRejectedTooOften() throws IOExcept @Test @DisplayName("Should send a message to the output queue") void sendShouldRouteMessageToOutputQueue() throws IOException { - messageBroker.send(DefaultMessage.withType("test")); + messageBroker.send(DefaultMessage.withId("test")); verify(rabbitClient).send(any(), eq(routingConfig.getWriteTo()), any()); } @@ -88,9 +88,9 @@ void sendShouldRouteMessageToOutputQueue() throws IOException { void sendMultipleMessagesShouldRouteMessagesToSpecifiedQueue() throws IOException { String queue = "specified.queue"; List messages = Arrays.asList( - DefaultMessage.withType("test"), - DefaultMessage.withType("test"), - DefaultMessage.withType("test")); + DefaultMessage.withId("test"), + DefaultMessage.withId("test"), + DefaultMessage.withId("test")); messageBroker.send(queue, messages); verify(rabbitClient, times(messages.size())).send(any(), eq(queue), any()); } diff --git a/engine/src/test/java/de/digitalcollections/workflow/engine/messagebroker/RabbitClientTest.java b/engine/src/test/java/de/digitalcollections/workflow/engine/messagebroker/RabbitClientTest.java index a4a89c87..6dca793d 100644 --- a/engine/src/test/java/de/digitalcollections/workflow/engine/messagebroker/RabbitClientTest.java +++ b/engine/src/test/java/de/digitalcollections/workflow/engine/messagebroker/RabbitClientTest.java @@ -41,7 +41,7 @@ void setUp() throws IOException, TimeoutException { connection = mock(MessageBrokerConnection.class); channel = mock(Channel.class); when(connection.getChannel()).thenReturn(channel); - message = DefaultMessage.withType("Hey"); + message = DefaultMessage.withId("Hey"); } @@ -97,12 +97,11 @@ void receiveShouldPullTheInputQueue() throws IOException { .returns(retries, from(Envelope::getRetries)) .returns(timestamp, from(Envelope::getTimestamp)); assertThat(message) - .returns(messageType, from(Message::getType)) .returns(messageId, from(Message::getId)); } private Message createMessage(String messageType, String messageId, int retries, LocalDateTime timestamp) throws IOException { - Message messageToReceive = DefaultMessage.withType(messageType).andId(messageId); + Message messageToReceive = DefaultMessage.withId(messageId); messageToReceive.getEnvelope().setRetries(retries); messageToReceive.getEnvelope().setTimestamp(timestamp); return messageToReceive; diff --git a/engine/src/test/java/de/digitalcollections/workflow/engine/model/DefaultMessageTest.java b/engine/src/test/java/de/digitalcollections/workflow/engine/model/DefaultMessageTest.java index 98cea6ce..1d395d79 100644 --- a/engine/src/test/java/de/digitalcollections/workflow/engine/model/DefaultMessageTest.java +++ b/engine/src/test/java/de/digitalcollections/workflow/engine/model/DefaultMessageTest.java @@ -8,7 +8,7 @@ class DefaultMessageTest { @Test void putCanBeChained() { - DefaultMessage message = DefaultMessage.withType("sundry").put("first", "is the first").put("second", "is the second"); + DefaultMessage message = DefaultMessage.withId("sundry").put("first", "is the first").put("second", "is the second"); assertThat(message.get("first")).isEqualTo("is the first"); assertThat(message.get("second")).isEqualTo("is the second"); } diff --git a/engine/src/test/java/de/digitalcollections/workflow/engine/model/JobTest.java b/engine/src/test/java/de/digitalcollections/workflow/engine/model/JobTest.java index a0cc1223..80b706bc 100644 --- a/engine/src/test/java/de/digitalcollections/workflow/engine/model/JobTest.java +++ b/engine/src/test/java/de/digitalcollections/workflow/engine/model/JobTest.java @@ -6,13 +6,12 @@ import org.junit.jupiter.api.Test; import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.from; class JobTest { - private static final Message SOME_MESSAGE = DefaultMessage.withType("Hey!"); + private static final Message SOME_MESSAGE = DefaultMessage.withId("Hey!"); - private static final Function DUMMY_READER = Message::getType; + private static final Function DUMMY_READER = Message::getId; private static final Function DUMMY_TRANSFORMER = Function.identity(); @@ -63,16 +62,16 @@ void write() { @DisplayName("Read, Transform, Write should pass values along") void readTransformWriteShouldPassValues() { String message = "Jolene, Jolene, Jolene, Jolene"; - Job job = new Job<>(DefaultMessage.withType(message)); - job.read(Message::getType); + Job job = new Job<>(new DefaultMessage().put("message", message)); + job.read(m -> m.get("message")); job.transform(String::toUpperCase); - job.write(DefaultMessage::new); - assertThat(job.getResult()).returns(message.toUpperCase(), from(Message::getType)); + job.write(s -> new DefaultMessage().put("message", s)); + assertThat(((DefaultMessage) job.getResult()).get("message")).isEqualTo(message.toUpperCase()); } @Test void getMessage() { - Message message = DefaultMessage.withType("Wuthering Heights"); + Message message = DefaultMessage.withId("Wuthering Heights"); Job job = new Job<>(message); assertThat(job.getMessage()).isEqualTo(message); } diff --git a/examples/src/main/java/de/digitalcollections/workflow/examples/Application.java b/examples/src/main/java/de/digitalcollections/workflow/examples/Application.java index 2de26325..159396f3 100644 --- a/examples/src/main/java/de/digitalcollections/workflow/examples/Application.java +++ b/examples/src/main/java/de/digitalcollections/workflow/examples/Application.java @@ -7,7 +7,6 @@ import de.digitalcollections.workflow.engine.messagebroker.MessageBroker; import de.digitalcollections.workflow.engine.messagebroker.MessageBrokerBuilder; import de.digitalcollections.workflow.engine.model.DefaultMessage; -import de.digitalcollections.workflow.engine.model.Message; import java.io.IOException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,13 +27,13 @@ private void run() throws IOException { .build(); Flow flow = new FlowBuilder() - .read(Message::getType) + .read(DefaultMessage::getId) .transform(new UppercaseTransformer(true)) - .write(DefaultMessage::withType) + .write(DefaultMessage::withId) .build(); Engine engine = new Engine(messageBroker, flow); - messageBroker.send("someInputQueue", DefaultMessage.withType("lowercase-text").put("text", "Shibuyara")); + messageBroker.send("someInputQueue", DefaultMessage.withId("lowercase-text").put("text", "Shibuyara")); engine.start(); } diff --git a/examples/src/main/java/de/digitalcollections/workflow/examples/spring/Application.java b/examples/src/main/java/de/digitalcollections/workflow/examples/spring/Application.java index 943550e9..a7ecc192 100644 --- a/examples/src/main/java/de/digitalcollections/workflow/examples/spring/Application.java +++ b/examples/src/main/java/de/digitalcollections/workflow/examples/spring/Application.java @@ -24,7 +24,7 @@ public Application(MessageBroker messageBroker, Engine engine) { @Override public void run(String... strings) throws WorkflowSetupException, java.io.IOException { - messageBroker.send("someInputQueue", DefaultMessage.withType("lowercase-text").put("text", "Shibuyara")); + messageBroker.send("someInputQueue", DefaultMessage.withId("lowercase-text").put("text", "Shibuyara")); engine.start(); } diff --git a/examples/src/main/java/de/digitalcollections/workflow/examples/spring/StringWriter.java b/examples/src/main/java/de/digitalcollections/workflow/examples/spring/StringWriter.java index 1bf1c948..ed3c34aa 100644 --- a/examples/src/main/java/de/digitalcollections/workflow/examples/spring/StringWriter.java +++ b/examples/src/main/java/de/digitalcollections/workflow/examples/spring/StringWriter.java @@ -10,7 +10,7 @@ public class StringWriter implements Function { @Override public DefaultMessage apply(String s) { - return DefaultMessage.withType("uppercase-strings").put("text", s); + return DefaultMessage.withId("uppercase-strings").put("text", s); } }