Skip to content

Commit

Permalink
Remove Message.type
Browse files Browse the repository at this point in the history
  • Loading branch information
bitzl committed Jan 10, 2018
1 parent be3113a commit 78cf3fd
Show file tree
Hide file tree
Showing 15 changed files with 55 additions and 130 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class Application {
Flow flow = new FlowBuilder<DefaultMessage, String, String>()
.read(message -> message.get("value"))
.transform(String::toUpperCase)
.write(value -> DefaultMessage.withType("your.type").put("value", value))
.write(value -> DefaultMessage.withId("your.id").put("value", value))
.build();

Engine engine = new Engine(messageBroker, flow);
Expand All @@ -73,7 +73,7 @@ class Transformer implements Function<String, String> {

class Writer implements Function<String, Message> {
Message apply(String output) {
return DefaultMessage.withType("your.type").put("value", output);
return DefaultMessage.withId("your.id").put("value", output);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,28 +38,28 @@ public StepDefinitions() {
.readFrom("bdd.in")
.writeTo("bdd.out")
);
messagesToSend = Collections.singletonList(DefaultMessage.withType("happy message"));
messagesToSend = Collections.singletonList(DefaultMessage.withId("happy message"));
queueToSendTo = queue;
});

When("the processing always fails", () -> {
Flow<Message, String, String> flow = new FlowBuilder<Message, String, String>()
.read(Message::getType)
Flow flow = new FlowBuilder<>()
.read(Message::toString)
.transform(s -> {
throw new RuntimeException("Fail!");
})
.write(DefaultMessage::withType)
.write(s -> DefaultMessage.withId(s.toString()))
.build();

// Preparation finished, start everything
start(flow);
});

When("^the processing always works$", () -> {
Flow<Message, String, String> flow = new FlowBuilder<Message, String, String>()
.read(Message::getType)
Flow flow = new FlowBuilder<DefaultMessage, String, String>()
.read(DefaultMessage::getId)
.transform(s -> s)
.write(s -> DefaultMessage.withType(s).put("blah", "blubb"))
.write(s -> DefaultMessage.withId(s).put("blah", "blubb"))
.build();

// Preparation finished, start everything
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,6 @@ public Envelope getEnvelope() {
return envelope;
}

@Override
public String getType() {
return data.get("type");
}

public Map<String, String> getData() {
return data;
}
Expand All @@ -58,23 +53,12 @@ public String getId() {
return data.get("id");
}

public static DefaultMessage withType(String type) {
DefaultMessage message = new DefaultMessage();
message.put("type", type);
return message;
}

public static DefaultMessage withId(String id) {
DefaultMessage message = new DefaultMessage();
message.put("id", id);
return message;
}

public DefaultMessage andId(String id) {
this.put("id", id);
return this;
}

@Override
public String toString() {
return "Message{envelope=" + envelope + ", data=" + data + "}";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,6 @@ public interface Message<ID> {
*/
Envelope getEnvelope();

/**
* The optional message type is represented by an arbitrary string value to distinguish different events on the same queue.
*
* @return The message type.
*/
String getType();

/**
* The optional ID can hold the identifier of an corresponding object (e.g. a book to be indexed).
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ public CustomMessage() {
}


public CustomMessage(String type) {
public CustomMessage(Long id) {
this.envelope = new Envelope();
this.data = new HashMap<>();
this.put("type", type);
this.id = id;
}


Expand All @@ -34,11 +34,6 @@ public Envelope getEnvelope() {
return envelope;
}

@Override
public String getType() {
return data.get("type");
}

public Map<String, String> getData() {
return data;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,20 +36,20 @@ class EngineTest {

private static final String DLX = "exchange.retry";
private MessageBroker messageBroker;
private Flow<Message, String, String> flowWithoutProblems;
private Flow<DefaultMessage, String, String> flowWithoutProblems;

private Message[] moreMessages(int number) {
Message[] messages = new Message[number];
for (int i = 0; i < messages.length; i++) {
messages[i] = DefaultMessage.withType("White Room");
messages[i] = DefaultMessage.withId("White Room");
}
return messages;
}

@BeforeEach
void setUp() {
messageBroker = mock(MessageBroker.class);
flowWithoutProblems = new FlowBuilder<Message, String, String>()
flowWithoutProblems = new FlowBuilder<DefaultMessage, String, String>()
.read(READ_SOME_STRING)
.transform(Function.identity())
.write(WRITE_SOME_STRING)
Expand All @@ -58,13 +58,13 @@ void setUp() {

@Test
public void engineShouldUseMaxNumberOfWorkers() throws IOException, InterruptedException {
when(messageBroker.receive()).thenReturn(DefaultMessage.withType("White Room"));
when(messageBroker.receive()).thenReturn(DefaultMessage.withId("White Room"));

Semaphore semaphore = new Semaphore(1);
semaphore.drainPermits();

Flow<Message, String, String> flow = new FlowBuilder<Message, String, String>()
.read(Message::getType)
Flow flow = new FlowBuilder<DefaultMessage, String, String>()
.read(DefaultMessage::getId)
.transform(s -> {
try {
LOGGER.debug("Trying to acquire semaphore, should block (Thread id {})", Thread.currentThread().getId());
Expand All @@ -75,7 +75,7 @@ public void engineShouldUseMaxNumberOfWorkers() throws IOException, InterruptedE
}
return s;
})
.write(DefaultMessage::withType)
.write(DefaultMessage::withId)
.build();

Engine engine = new Engine(messageBroker, flow);
Expand Down Expand Up @@ -134,14 +134,14 @@ public void engineShouldUseMaxNumberOfWorkers() throws IOException, InterruptedE
// System.out.println(messagesSent.get());
// }

private final Function<Message, String> READ_SOME_STRING = Message::getType;
private final Function<DefaultMessage, String> READ_SOME_STRING = DefaultMessage::getId;

private final Function<String, Message> WRITE_SOME_STRING = DefaultMessage::withType;
private final Function<String, Message> WRITE_SOME_STRING = DefaultMessage::withId;

@Test
@DisplayName("Engine should reject a message failing processing")
void processShouldRejectMessageOnFailure() throws IOException {
Flow<Message, String, String> flow = new FlowBuilder<Message, String, String>()
Flow flow = new FlowBuilder<DefaultMessage, String, String>()
.read(READ_SOME_STRING)
.transform(s -> {
throw new RuntimeException("Aaaaaaah!");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package de.digitalcollections.workflow.engine.flow;


import de.digitalcollections.workflow.engine.flow.Flow;
import de.digitalcollections.workflow.engine.flow.FlowBuilder;
import de.digitalcollections.workflow.engine.model.DefaultMessage;
import de.digitalcollections.workflow.engine.model.Message;
import java.util.function.Function;
Expand All @@ -19,29 +17,23 @@ class FlowBuilderTest {
@DisplayName("read should throw an exception if the reader is null")
public void readShouldThrowExceptionIfReaderIsNull() {
FlowBuilder<Message, String, String> builder = new FlowBuilder<>();
Throwable exception = assertThrows(NullPointerException.class, () -> {
builder.read((Function<Message, String>) null);
});
Throwable exception = assertThrows(NullPointerException.class, () -> builder.read((Function<Message, String>) null));
assertThat(exception.getMessage()).contains("reader");
}

@Test
@DisplayName("read should throw an exception if the reader factory is null")
public void readShouldThrowExceptionIfFactoryIsNull() {
FlowBuilder<Message, String, String> builder = new FlowBuilder<>();
Throwable exception = assertThrows(NullPointerException.class, () -> {
builder.read((Supplier<Function<Message, String>>) null);
});
Throwable exception = assertThrows(NullPointerException.class, () -> builder.read((Supplier<Function<Message, String>>) null));
assertThat(exception.getMessage()).contains("reader factory");
}

@Test
@DisplayName("transform should throw an exception if there is no reader")
public void transformerShouldThrowExceptionIfReaderIsNull() {
FlowBuilder<Message, String, String> builder = new FlowBuilder<>();
Throwable exception = assertThrows(IllegalStateException.class, () -> {
builder.transform(s -> null);
});
Throwable exception = assertThrows(IllegalStateException.class, () -> builder.transform(s -> null));
assertThat(exception.getMessage()).contains("reader");
}

Expand Down Expand Up @@ -92,13 +84,13 @@ public void writeShouldThrowExceptionIfWriterFactoryIsNull() {
@Test
@DisplayName("build should create an working Flow if only reader and writer are set")
public void buildWithOnlyReadAndWrite() {
Flow<Message, String, String> flow = new FlowBuilder<Message, String, String>()
.read(Message::getType)
.write(DefaultMessage::withType)
Flow flow = new FlowBuilder<DefaultMessage, String, String>()
.read(DefaultMessage::getId)
.write(DefaultMessage::withId)
.build();
String message = "Whiskey in the Jar";
Message result = flow.process(DefaultMessage.withType(message));
assertThat(result.getType()).isEqualTo(message);
Message result = flow.process(DefaultMessage.withId(message));
assertThat(result.getId()).isEqualTo(message);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,6 @@

class DefaultMessageMixinTest {

private class SpecialMessage extends DefaultMessage {
String specialField;
public SpecialMessage() {
super();
}
public SpecialMessage(String specialField) {
this.specialField = specialField;
}

public String getSpecialField() {
return specialField;
}
public void setSpecialField(String specialField) {
this.specialField = specialField;
}
}

private ObjectMapper objectMapper;

@BeforeEach
Expand All @@ -48,63 +31,44 @@ void setUp() {
@Test
@DisplayName("Serialization should exclude field deliveryTag")
void shouldExcludeDeliveryTag() throws JsonProcessingException {
String json = objectMapper.writeValueAsString(DefaultMessage.withType("Tadaaaaa!"));
String json = objectMapper.writeValueAsString(DefaultMessage.withId("Tadaaaaa!"));
assertThat(json).doesNotContain("deliveryTag");
}

@Test
@DisplayName("Serialization should exclude field body")
void shouldExcludeBody() throws JsonProcessingException {
String json = objectMapper.writeValueAsString(DefaultMessage.withType("Tadaaaaa!"));
String json = objectMapper.writeValueAsString(DefaultMessage.withId("Tadaaaaa!"));
assertThat(json).doesNotContain("body");
}

@Test
@DisplayName("Deserialization should ignore unknown fields")
void shouldIgnoreUnknownFields() throws IOException {
DefaultMessage message = DefaultMessage.withType("Tadaaaaa!").andId("X0000012-3");
DefaultMessage message = DefaultMessage.withId("Tadaaaaa!");
String json = objectMapper.writeValueAsString(message);
json = json.substring(0, json.length() - 1) + ", \"stupidField\": 0}";
System.out.println(json);
DefaultMessage restored = objectMapper.readValue(json, DefaultMessage.class);
assertThat(message.getType()).isEqualTo(restored.getType());
assertThat(message.getId()).isEqualTo(restored.getId());
}

@Test
@DisplayName("Should serialize Envelope.retries")
void shouldSerializeRetries() throws JsonProcessingException {
DefaultMessage message = DefaultMessage.withType("something happened");
DefaultMessage message = DefaultMessage.withId("something happened");
message.getEnvelope().setRetries(42);
assertThat(objectMapper.writeValueAsString(message)).contains("42");
}

@Test
@DisplayName("Should deserialize Envelope.retries")
void shouldDeserializeRetries() throws IOException {
DefaultMessage message = DefaultMessage.withType("something happened");
DefaultMessage message = DefaultMessage.withId("something happened");
message.getEnvelope().setRetries(42);
Message deserialized = objectMapper.readValue(objectMapper.writeValueAsString(message), DefaultMessage.class);
assertThat(deserialized.getEnvelope().getRetries()).isEqualTo(42);

}


// @Test
// @DisplayName("Serialization should preserve parameters")
// void shouldPreserveParams() throws IOException {
// Message message = new Message();
// message.put("floob", "gooobl");
// message.put("bingle", "bongle");
// Message restored = objectMapper.readValue(objectMapper.writeValueAsString(message), Message.class);
// assertThat(restored.getData()).isEqualTo(message.getData());
// }

// @Test
// @DisplayName("Serialization should work for subtypes")
// void shouldSaveSpecialMessage() throws IOException {
// SpecialMessage message = new SpecialMessage("by the hammer of Thor");
// SpecialMessage restored = objectMapper.readValue(objectMapper.writeValueAsString(message), SpecialMessage.class);
// assertThat(restored.getSpecialField()).isEqualTo(message.getSpecialField());
// }

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ void setUp() throws IOException, TimeoutException {
routingConfig.complete();
rabbitClient = mock(RabbitClient.class);
messageBroker = new MessageBroker(config, routingConfig, rabbitClient);
message = DefaultMessage.withType("Hey");
message = DefaultMessage.withId("Hey");
}

@Test
Expand Down Expand Up @@ -78,7 +78,7 @@ void rejectShouldRouteToFailedQueueIfMessageIsRejectedTooOften() throws IOExcept
@Test
@DisplayName("Should send a message to the output queue")
void sendShouldRouteMessageToOutputQueue() throws IOException {
messageBroker.send(DefaultMessage.withType("test"));
messageBroker.send(DefaultMessage.withId("test"));
verify(rabbitClient).send(any(), eq(routingConfig.getWriteTo()), any());
}

Expand All @@ -88,9 +88,9 @@ void sendShouldRouteMessageToOutputQueue() throws IOException {
void sendMultipleMessagesShouldRouteMessagesToSpecifiedQueue() throws IOException {
String queue = "specified.queue";
List<Message> messages = Arrays.asList(
DefaultMessage.withType("test"),
DefaultMessage.withType("test"),
DefaultMessage.withType("test"));
DefaultMessage.withId("test"),
DefaultMessage.withId("test"),
DefaultMessage.withId("test"));
messageBroker.send(queue, messages);
verify(rabbitClient, times(messages.size())).send(any(), eq(queue), any());
}
Expand Down
Loading

0 comments on commit 78cf3fd

Please sign in to comment.