Skip to content
This repository has been archived by the owner on Feb 1, 2024. It is now read-only.

Commit

Permalink
feat: add support for producing (#32)
Browse files Browse the repository at this point in the history
Allows you to send messages to SQS directly, without having to go across
an SNS fanout.

Closes #31
  • Loading branch information
Heiko Rothe authored Oct 29, 2021
1 parent 4b5c297 commit 78d2fce
Show file tree
Hide file tree
Showing 6 changed files with 168 additions and 15 deletions.
50 changes: 42 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Spring Cloud Stream Binder for AWS SQS

spring-cloud-stream-binder-sqs lets you use [Spring Cloud Stream](https://spring.io/projects/spring-cloud-stream) with the AWS Simple Queue Service (SQS). Currently it only supports consuming from SQS queues to your service, producing will be added later.
spring-cloud-stream-binder-sqs lets you use [Spring Cloud Stream](https://spring.io/projects/spring-cloud-stream) with the AWS Simple Queue Service (SQS).

## Installation

Expand All @@ -9,7 +9,7 @@ spring-cloud-stream-binder-sqs lets you use [Spring Cloud Stream](https://spring
<dependency>
<groupId>de.idealo.spring</groupId>
<artifactId>spring-cloud-stream-binder-sqs</artifactId>
<version>1.0.0</version>
<version>1.5.0</version>
</dependency>
</dependencies>
```
Expand All @@ -20,11 +20,12 @@ With the library in your dependencies you can configure your Spring Cloud Stream

You may also provide additional configuration options:

- **maxNumberOfMessages** - Maximum number of messages to retrieve with one poll to SQS. Must be a number between 1 and 10.
- **visibilityTimeout** - The duration in seconds that polled messages are hidden from subsequent poll requests after having been retrieved.
- **waitTimeout** - The duration in seconds that the system will wait for new messages to arrive when polling. Uses the Amazon SQS long polling feature. The value should be between 1 and 20.
- **messageDeletionPolicy** - The deletion policy for messages that are retrieved from SQS. Defaults to NO_REDRIVE.
- **snsFanout** - Whether the incoming message has the SNS format and should be deserialized automatically. Defaults to true.
- **Consumers**
- **maxNumberOfMessages** - Maximum number of messages to retrieve with one poll to SQS. Must be a number between 1 and 10.
- **visibilityTimeout** - The duration in seconds that polled messages are hidden from subsequent poll requests after having been retrieved.
- **waitTimeout** - The duration in seconds that the system will wait for new messages to arrive when polling. Uses the Amazon SQS long polling feature. The value should be between 1 and 20.
- **messageDeletionPolicy** - The deletion policy for messages that are retrieved from SQS. Defaults to NO_REDRIVE.
- **snsFanout** - Whether the incoming message has the SNS format and should be deserialized automatically. Defaults to true.

**Example Configuration:**

Expand All @@ -39,7 +40,40 @@ spring:
snsFanout: false
bindings:
someFunction-in-0:
destination: queue-name
destination: input-queue-name
someFunction-out-0:
destination: output-queue-name
```
You may also provide your own beans of `AmazonSQSAsync` to override those that are created by [spring-cloud-aws-autoconfigure](https://github.com/spring-cloud/spring-cloud-aws/tree/master/spring-cloud-aws-autoconfigure).

### FIFO queues

To use [FIFO SQS queues](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues.html) you will need to provide a deduplication id and a group id.
With this binder you may set these using the message headers `SqsHeaders.GROUP_ID` and `SqsHeaders.DEDUPLICATION_ID`.
The example below shows how you could use a FIFO queue in real life.

**Example Configuration:**

```yaml
spring:
cloud:
stream:
bindings:
someFunction-in-0:
destination: input-queue-name
someFunction-out-0:
destination: output-queue-name.fifo
```

```java
class Application {
@Bean
public Message<String> someFunction(String input) {
return MessageBuilder.withPayload(input)
.setHeader(SqsHeaders.GROUP_ID, "my-application")
.setHeader(SqsHeaders.DEDUPLICATION_ID, UUID.randomUUID())
.build();
}
}
```
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,11 @@
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>localstack</artifactId>
Expand Down
13 changes: 13 additions & 0 deletions src/main/java/de/idealo/spring/stream/binder/sqs/SqsHeaders.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package de.idealo.spring.stream.binder.sqs;

public final class SqsHeaders {

public static final String PREFIX = "sqs_";

public static final String GROUP_ID = PREFIX + "groupId";

public static final String DEDUPLICATION_ID = PREFIX + "deduplicationId";

private SqsHeaders() {}

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.integration.aws.inbound.SqsMessageDrivenChannelAdapter;
import org.springframework.integration.aws.outbound.SqsMessageHandler;
import org.springframework.integration.channel.AbstractMessageChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
Expand Down Expand Up @@ -46,7 +48,15 @@ public List<SqsMessageDrivenChannelAdapter> getAdapters() {

@Override
protected MessageHandler createProducerMessageHandler(ProducerDestination destination, ExtendedProducerProperties<SqsProducerProperties> producerProperties, MessageChannel errorChannel) throws Exception {
throw new UnsupportedOperationException("Producing to SQS is not supported yet");
SqsMessageHandler sqsMessageHandler = new SqsMessageHandler(amazonSQS);
sqsMessageHandler.setQueue(destination.getName());
sqsMessageHandler.setFailureChannel(errorChannel);
sqsMessageHandler.setBeanFactory(getBeanFactory());

sqsMessageHandler.setMessageGroupIdExpressionString(String.format("headers.get('%s')", SqsHeaders.GROUP_ID));
sqsMessageHandler.setMessageDeduplicationIdExpressionString(String.format("headers.get('%s')", SqsHeaders.DEDUPLICATION_ID));

return sqsMessageHandler;
}

@Override
Expand Down Expand Up @@ -88,4 +98,9 @@ public String getDefaultsPrefix() {
public Class<? extends BinderSpecificPropertiesProvider> getExtendedPropertiesEntryClass() {
return this.extendedBindingProperties.getExtendedPropertiesEntryClass();
}

@Override
protected void postProcessOutputChannel(MessageChannel outputChannel, ExtendedProducerProperties<SqsProducerProperties> producerProperties) {
((AbstractMessageChannel) outputChannel).addInterceptor(new SqsPayloadConvertingChannelInterceptor());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package de.idealo.spring.stream.binder.sqs;

import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.MessageBuilder;

public class SqsPayloadConvertingChannelInterceptor implements ChannelInterceptor {

@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
return MessageBuilder.createMessage(new String((byte[]) message.getPayload()), message.getHeaders());
}

}
Original file line number Diff line number Diff line change
@@ -1,28 +1,41 @@
package de.idealo.spring.stream.binder.sqs;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.testcontainers.containers.localstack.LocalStackContainer.Service.SQS;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.actuate.health.HealthEndpoint;
import org.springframework.boot.actuate.health.Status;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.boot.test.mock.mockito.SpyBean;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.support.MessageBuilder;
import org.testcontainers.containers.localstack.LocalStackContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

import com.amazonaws.services.sqs.AmazonSQSAsync;
import com.amazonaws.services.sqs.AmazonSQSAsyncClientBuilder;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.SendMessageRequest;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import reactor.test.StepVerifier;

Expand All @@ -31,8 +44,10 @@
"cloud.aws.stack.auto=false",
"cloud.aws.region.static=eu-central-1",
"spring.cloud.stream.bindings.input-in-0.destination=queue1",
"spring.cloud.stream.bindings.function.definition=input",
"spring.cloud.stream.sqs.bindings.input-in-0.consumer.snsFanout=false"
"spring.cloud.stream.sqs.bindings.input-in-0.consumer.snsFanout=false",
"spring.cloud.stream.bindings.output-out-0.destination=queue2",
"spring.cloud.stream.bindings.fifoOutput-out-0.destination=queue3.fifo",
"spring.cloud.function.definition=input;output;fifoOutput"
})
class SqsBinderTest {

Expand All @@ -41,9 +56,11 @@ class SqsBinderTest {
.withServices(SQS)
.withEnv("DEFAULT_REGION", "eu-central-1");

private static final Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();
private static final Sinks.Many<String> inputSink = Sinks.many().multicast().onBackpressureBuffer();
private static final Sinks.Many<String> outputSink = Sinks.many().multicast().onBackpressureBuffer();
private static final Sinks.Many<org.springframework.messaging.Message<String>> fifoOutputSink = Sinks.many().multicast().onBackpressureBuffer();

@Autowired
@SpyBean
private AmazonSQSAsync amazonSQS;

@Autowired
Expand All @@ -52,6 +69,8 @@ class SqsBinderTest {
@BeforeAll
static void beforeAll() throws Exception {
localStack.execInContainer("awslocal", "sqs", "create-queue", "--queue-name", "queue1");
localStack.execInContainer("awslocal", "sqs", "create-queue", "--queue-name", "queue2");
localStack.execInContainer("awslocal", "sqs", "create-queue", "--queue-name", "queue3.fifo", "--attributes", "FifoQueue=true");
}

@Test
Expand All @@ -61,13 +80,55 @@ void shouldPassMessageToConsumer() {
String queueUrl = amazonSQS.getQueueUrl("queue1").getQueueUrl();
amazonSQS.sendMessage(queueUrl, testMessage);

StepVerifier.create(sink.asFlux())
StepVerifier.create(inputSink.asFlux())
.assertNext(message -> {
assertThat(message).isEqualTo(testMessage);
})
.verifyTimeout(Duration.ofSeconds(1));
}

@Test
void shouldPublishMessageFromProducer() {
String testMessage = "test message";

outputSink.tryEmitNext(testMessage);

String queueUrl = amazonSQS.getQueueUrl("queue2").getQueueUrl();

await().atMost(1, TimeUnit.SECONDS).untilAsserted(() -> {
List<Message> messages = amazonSQS.receiveMessage(queueUrl).getMessages();
assertThat(messages).hasSize(1)
.extracting("body")
.containsExactly(testMessage);
});
}

@Test
void shouldPublishMessageToFifoQueue() {
org.springframework.messaging.Message<String> message = MessageBuilder
.withPayload("fifo body")
.setHeader(SqsHeaders.GROUP_ID, "my-group")
.setHeader(SqsHeaders.DEDUPLICATION_ID, "unique1")
.build();

fifoOutputSink.tryEmitNext(message);

ArgumentCaptor<SendMessageRequest> captor = ArgumentCaptor.forClass(SendMessageRequest.class);
verify(amazonSQS, timeout(500)).sendMessageAsync(captor.capture(), any());

SendMessageRequest actualRequest = captor.getValue();
assertThat(actualRequest.getMessageGroupId()).isEqualTo("my-group");
assertThat(actualRequest.getMessageDeduplicationId()).isEqualTo("unique1");

String queueUrl = amazonSQS.getQueueUrl("queue3.fifo").getQueueUrl();
await().atMost(1, TimeUnit.SECONDS).untilAsserted(() -> {
List<Message> messages = amazonSQS.receiveMessage(queueUrl).getMessages();
assertThat(messages).hasSize(1)
.extracting("body")
.containsExactly(message.getPayload());
});
}

@Test
void canTestHealth() {
assertThat(healthEndpoint.health().getStatus()).isEqualTo(Status.UP);
Expand All @@ -87,7 +148,17 @@ AmazonSQSAsync amazonSQS() {

@Bean
Consumer<String> input() {
return sink::tryEmitNext;
return inputSink::tryEmitNext;
}

@Bean
Supplier<Flux<String>> output() {
return outputSink::asFlux;
}

@Bean
Supplier<Flux<org.springframework.messaging.Message<String>>> fifoOutput() {
return fifoOutputSink::asFlux;
}
}

Expand Down

0 comments on commit 78d2fce

Please sign in to comment.