Skip to content

Commit

Permalink
Merge pull request #23 from mvallim/feature/interprocess-semaphore
Browse files Browse the repository at this point in the history
feat: comunication interprocess using semaphore
  • Loading branch information
mvallim authored Feb 21, 2024
2 parents 728ef01 + 180ffbd commit 1d8e5d2
Show file tree
Hide file tree
Showing 32 changed files with 1,150 additions and 975 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/maven.yml
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,10 @@ jobs:
needs: [build, test]
steps:
- uses: actions/checkout@v3
- name: Set up JDK 11
- name: Set up JDK 17
uses: actions/setup-java@v3
with:
java-version: 11
java-version: 17
distribution: "corretto"
cache: "maven"
- name: Cache SonarQube packages
Expand Down
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ You can pull it from the central Maven repositories:
<dependency>
<groupId>com.github.mvallim</groupId>
<artifactId>amazon-sns-java-messaging-lib-v1</artifactId>
<version>1.0.3</version>
<version>1.0.4</version>
</dependency>
```

Expand All @@ -47,7 +47,7 @@ You can pull it from the central Maven repositories:
<dependency>
<groupId>com.github.mvallim</groupId>
<artifactId>amazon-sns-java-messaging-lib-v2</artifactId>
<version>1.0.3</version>
<version>1.0.4</version>
</dependency>
```

Expand All @@ -68,12 +68,12 @@ If you want to try a snapshot version, add the following repository:

### For AWS SDK v1
```groovy
implementation 'com.github.mvallim:amazon-sns-java-messaging-lib-v1:1.0.3'
implementation 'com.github.mvallim:amazon-sns-java-messaging-lib-v1:1.0.4'
```

### For AWS SDK v2
```groovy
implementation 'com.github.mvallim:amazon-sns-java-messaging-lib-v2:1.0.3'
implementation 'com.github.mvallim:amazon-sns-java-messaging-lib-v2:1.0.4'
```

If you want to try a snapshot version, add the following repository:
Expand Down
2 changes: 1 addition & 1 deletion amazon-sns-java-messaging-lib-template/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>com.github.mvallim</groupId>
<artifactId>amazon-sns-java-messaging-lib</artifactId>
<version>1.0.4-SNAPSHOT</version>
<version>1.0.5-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.amazon.sns.messaging.lib.core;

import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.amazon.sns.messaging.lib.model.PublishRequestBuilder;
import com.amazon.sns.messaging.lib.model.RequestEntry;
import com.amazon.sns.messaging.lib.model.TopicProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

import lombok.SneakyThrows;

// @formatter:off
//@RequiredArgsConstructor(access = AccessLevel.PROTECTED)
abstract class AbstractAmazonSnsConsumer<C, R, O, E> implements Runnable {

private static final Logger LOGGER = LoggerFactory.getLogger(AbstractAmazonSnsConsumer.class);

private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();

protected final C amazonSnsClient;

private final TopicProperty topicProperty;

private final ObjectMapper objectMapper;

protected final ConcurrentMap<String, ListenableFutureRegistry> pendingRequests;

private final BlockingQueue<RequestEntry<E>> topicRequests;

private final ExecutorService executorService;

protected AbstractAmazonSnsConsumer(
final C amazonSnsClient,
final TopicProperty topicProperty,
final ObjectMapper objectMapper,
final ConcurrentMap<String, ListenableFutureRegistry> pendingRequests,
final BlockingQueue<RequestEntry<E>> topicRequests,
final ExecutorService executorService) {

this.amazonSnsClient = amazonSnsClient;
this.topicProperty = topicProperty;
this.objectMapper = objectMapper;
this.pendingRequests = pendingRequests;
this.topicRequests = topicRequests;
this.executorService = executorService;

scheduledExecutorService.scheduleAtFixedRate(this, 0, topicProperty.getLinger(), TimeUnit.MILLISECONDS);
}

protected abstract O publish(final R publishBatchRequest);

protected abstract void handleError(final R publishBatchRequest, final Throwable throwable);

protected abstract void handleResponse(final O publishBatchResult);

protected abstract BiFunction<String, List<RequestEntry<E>>, R> supplierPublishRequest();

private void doPublish(final R publishBatchRequest) {
try {
handleResponse(publish(publishBatchRequest));
} catch (final Exception ex) {
handleError(publishBatchRequest, ex);
}
}

private void publishBatch(final R publishBatchRequest) {
if (topicProperty.isFifo()) {
doPublish(publishBatchRequest);
} else {
try {
CompletableFuture.runAsync(() -> doPublish(publishBatchRequest), executorService);
} catch (final Exception ex) {
handleError(publishBatchRequest, ex);
}
}
}

@Override
@SneakyThrows
public void run() {
try {
while (requestsWaitedFor(topicRequests, topicProperty.getLinger()) || maxBatchSizeReached(topicRequests)) {
createBatch(topicRequests).ifPresent(this::publishBatch);
}
} catch (final Exception ex) {
LOGGER.error(ex.getMessage(), ex);
}
}

@SneakyThrows
public void shutdown() {
LOGGER.warn("Shutdown consumer {}", getClass().getSimpleName());

scheduledExecutorService.shutdown();
if (!scheduledExecutorService.awaitTermination(60, TimeUnit.SECONDS)) {
LOGGER.warn("Scheduled executor service did not terminate in the specified time.");
final List<Runnable> droppedTasks = executorService.shutdownNow();
LOGGER.warn("Scheduled executor service was abruptly shut down. {} tasks will not be executed.", droppedTasks.size());
}

executorService.shutdown();
if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
LOGGER.warn("Executor service did not terminate in the specified time.");
final List<Runnable> droppedTasks = executorService.shutdownNow();
LOGGER.warn("Executor service was abruptly shut down. {} tasks will not be executed.", droppedTasks.size());
}
}

private boolean requestsWaitedFor(final BlockingQueue<RequestEntry<E>> requests, final long batchingWindowInMs) {
return Optional.ofNullable(requests.peek()).map(oldestPendingRequest -> {
final long oldestEntryWaitTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - oldestPendingRequest.getCreateTime());
return oldestEntryWaitTime > batchingWindowInMs;
}).orElse(false);
}

private boolean maxBatchSizeReached(final BlockingQueue<RequestEntry<E>> requests) {
return requests.size() > topicProperty.getMaxBatchSize();
}

@SneakyThrows
private Optional<R> createBatch(final BlockingQueue<RequestEntry<E>> requests) {
final List<RequestEntry<E>> requestEntries = new LinkedList<>();

while (requestEntries.size() < topicProperty.getMaxBatchSize() && Objects.nonNull(requests.peek())) {
final RequestEntry<E> requestEntry = requests.take();
requestEntries.add(requestEntry);
}

if (requestEntries.isEmpty()) {
return Optional.empty();
}

LOGGER.debug("{}", requestEntries);

return Optional.of(PublishRequestBuilder.<R, RequestEntry<E>>builder()
.supplier(supplierPublishRequest())
.entries(requestEntries)
.topicArn(topicProperty.getTopicArn())
.build());
}

@SneakyThrows
public CompletableFuture<Void> await() {
return CompletableFuture.runAsync(() -> {
while (
MapUtils.isNotEmpty(this.pendingRequests) ||
CollectionUtils.isNotEmpty(this.topicRequests)) {
sleep(topicProperty.getLinger());
}
});
}

@SneakyThrows
protected String convertPayload(final E payload) {
return payload instanceof String ? payload.toString() : objectMapper.writeValueAsString(payload);
}

@SneakyThrows
private static void sleep(final long millis) {
Thread.sleep(millis);
}

}
// @formatter:on
Loading

0 comments on commit 1d8e5d2

Please sign in to comment.