Skip to content

Commit

Permalink
Merge pull request #38 from mvallim/feature/add-ring-buffer-bocking-q…
Browse files Browse the repository at this point in the history
…ueue

Feature/add ring buffer bocking queue
  • Loading branch information
mvallim authored Feb 21, 2024
2 parents 1c79679 + 05a6b57 commit 0d2dc64
Show file tree
Hide file tree
Showing 11 changed files with 306 additions and 17 deletions.
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ You can pull it from the central Maven repositories:
<dependency>
<groupId>com.github.mvallim</groupId>
<artifactId>amazon-sns-java-messaging-lib-v1</artifactId>
<version>1.0.5</version>
<version>1.0.6</version>
</dependency>
```

Expand All @@ -47,7 +47,7 @@ You can pull it from the central Maven repositories:
<dependency>
<groupId>com.github.mvallim</groupId>
<artifactId>amazon-sns-java-messaging-lib-v2</artifactId>
<version>1.0.5</version>
<version>1.0.6</version>
</dependency>
```

Expand All @@ -68,12 +68,12 @@ If you want to try a snapshot version, add the following repository:

### For AWS SDK v1
```groovy
implementation 'com.github.mvallim:amazon-sns-java-messaging-lib-v1:1.0.5'
implementation 'com.github.mvallim:amazon-sns-java-messaging-lib-v1:1.0.6'
```

### For AWS SDK v2
```groovy
implementation 'com.github.mvallim:amazon-sns-java-messaging-lib-v2:1.0.5'
implementation 'com.github.mvallim:amazon-sns-java-messaging-lib-v2:1.0.6'
```

If you want to try a snapshot version, add the following repository:
Expand Down
2 changes: 1 addition & 1 deletion amazon-sns-java-messaging-lib-template/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>com.github.mvallim</groupId>
<artifactId>amazon-sns-java-messaging-lib</artifactId>
<version>1.0.6-SNAPSHOT</version>
<version>1.0.7-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
/*
* Copyright 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.amazon.sns.messaging.lib.concurrent;

import java.util.AbstractQueue;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

import lombok.Getter;
import lombok.Setter;
import lombok.SneakyThrows;

@SuppressWarnings({ "java:S2274", "unchecked" })
public class RingBufferBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E> {

private static final int DEFAULT_CAPACITY = 2048;

private final Entry<E>[] buffer;

private final int capacity;

private final AtomicInteger writeSequence = new AtomicInteger(-1);

private final AtomicInteger readSequence = new AtomicInteger(0);

private final ReentrantLock reentrantLock;

private final Condition notEmpty;

private final Condition notFull;

public RingBufferBlockingQueue(final int capacity) {
this.capacity = capacity;
this.buffer = new Entry[capacity];
Arrays.setAll(buffer, p -> new Entry<>());
reentrantLock = new ReentrantLock(true);
notEmpty = reentrantLock.newCondition();
notFull = reentrantLock.newCondition();
}

public RingBufferBlockingQueue() {
this(DEFAULT_CAPACITY);
}

@SneakyThrows
private void enqueue(final E element) {
while (isFull()) {
notFull.await();
}

final int nextWriteSeq = writeSequence.get() + 1;
buffer[wrap(nextWriteSeq)].setValue(element);
writeSequence.incrementAndGet();
notEmpty.signal();
}

@SneakyThrows
private E dequeue() {
while (isEmpty()) {
notEmpty.await();
}

final E nextValue = buffer[wrap(readSequence.get())].getValue();
readSequence.incrementAndGet();
notFull.signal();
return nextValue;
}

private int wrap(final int sequence) {
return sequence % capacity;
}

@Override
public int size() {
return (writeSequence.get() - readSequence.get()) + 1;
}

@Override
public boolean isEmpty() {
return writeSequence.get() < readSequence.get();
}

public boolean isFull() {
return size() >= capacity;
}

public int writeSequence() {
return writeSequence.get();
}

public int readSequence() {
return readSequence.get();
}

@Override
@SneakyThrows
public E peek() {
if (isEmpty()) {
return null;
}

return buffer[wrap(readSequence.get())].getValue();
}

@Override
@SneakyThrows
public void put(final E element) {
try {
reentrantLock.lock();
enqueue(element);
} finally {
reentrantLock.unlock();
}
}

@Override
@SneakyThrows
public E take() {
try {
reentrantLock.lock();
return dequeue();
} finally {
reentrantLock.unlock();
}
}

@Getter
@Setter
static class Entry<E> {

private E value;

}

@Override
public boolean offer(final E e) {
throw new UnsupportedOperationException();
}

@Override
public E poll() {
throw new UnsupportedOperationException();
}

@Override
public Iterator<E> iterator() {
throw new UnsupportedOperationException();
}

@Override
public boolean add(final E e) {
throw new UnsupportedOperationException();
}

@Override
public boolean offer(final E e, final long timeout, final TimeUnit unit) throws InterruptedException {
throw new UnsupportedOperationException();
}

@Override
public E poll(final long timeout, final TimeUnit unit) throws InterruptedException {
throw new UnsupportedOperationException();
}

@Override
public int remainingCapacity() {
throw new UnsupportedOperationException();
}

@Override
public int drainTo(final Collection<? super E> c) {
throw new UnsupportedOperationException();
}

@Override
public int drainTo(final Collection<? super E> c, final int maxElements) {
throw new UnsupportedOperationException();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public void shutdown() {
scheduledExecutorService.shutdown();
if (!scheduledExecutorService.awaitTermination(60, TimeUnit.SECONDS)) {
LOGGER.warn("Scheduled executor service did not terminate in the specified time.");
final List<Runnable> droppedTasks = executorService.shutdownNow();
final List<Runnable> droppedTasks = scheduledExecutorService.shutdownNow();
LOGGER.warn("Scheduled executor service was abruptly shut down. {} tasks will not be executed.", droppedTasks.size());
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Copyright 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.amazon.sns.messaging.lib.concurrent;

import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;

import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

import org.junit.jupiter.api.Test;

import com.amazon.sns.messaging.lib.model.RequestEntry;

class RingBufferBlockingQueueTest {

private final ExecutorService producer = Executors.newSingleThreadExecutor();

private final ScheduledExecutorService consumer = Executors.newSingleThreadScheduledExecutor();

@Test
void testSuccess() throws InterruptedException {
final List<RequestEntry<Integer>> requestEntriesOut = new LinkedList<>();

final RingBufferBlockingQueue<RequestEntry<Integer>> ringBlockingQueue = new RingBufferBlockingQueue<>(5120);

producer.submit(() -> {
IntStream.range(0, 100_000).forEach(value -> {
ringBlockingQueue.put(RequestEntry.<Integer>builder().withValue(value).build());
});
});

consumer.scheduleAtFixedRate(() -> {
while (!ringBlockingQueue.isEmpty()) {
final List<RequestEntry<Integer>> requestEntries = new LinkedList<>();

while ((requestEntries.size() < 10) && Objects.nonNull(ringBlockingQueue.peek())) {
requestEntries.add(ringBlockingQueue.take());
}

requestEntriesOut.addAll(requestEntries);
}
}, 0, 100L, TimeUnit.MILLISECONDS);

await().atMost(1, TimeUnit.MINUTES).until(() -> ringBlockingQueue.writeSequence() == 99_999);
producer.shutdownNow();

await().atMost(1, TimeUnit.MINUTES).until(() -> ringBlockingQueue.readSequence() == 100_000);
consumer.shutdownNow();

assertThat(ringBlockingQueue.isEmpty(), is(true));

assertThat(requestEntriesOut, hasSize(100_000));
requestEntriesOut.sort((a, b) -> a.getValue() - b.getValue());

for (int i = 0; i < 100_000; i++) {
assertThat(requestEntriesOut.get(i).getValue(), is(i));
}
}

}
2 changes: 1 addition & 1 deletion amazon-sns-java-messaging-lib-v1/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>com.github.mvallim</groupId>
<artifactId>amazon-sns-java-messaging-lib</artifactId>
<version>1.0.6-SNAPSHOT</version>
<version>1.0.7-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

import com.amazon.sns.messaging.lib.concurrent.RingBufferBlockingQueue;
import com.amazon.sns.messaging.lib.model.RequestEntry;
import com.amazon.sns.messaging.lib.model.TopicProperty;
import com.amazonaws.services.sns.AmazonSNS;
Expand Down Expand Up @@ -49,7 +49,7 @@ public AmazonSnsTemplate(final AmazonSNS amazonSNS, final TopicProperty topicPro
}

public AmazonSnsTemplate(final AmazonSNS amazonSNS, final TopicProperty topicProperty, final ObjectMapper objectMapper) {
this(amazonSNS, topicProperty, new ConcurrentHashMap<>(), new LinkedBlockingQueue<>(topicProperty.getMaximumPoolSize() * topicProperty.getMaxBatchSize()), objectMapper);
this(amazonSNS, topicProperty, new ConcurrentHashMap<>(), new RingBufferBlockingQueue<>(topicProperty.getMaximumPoolSize() * topicProperty.getMaxBatchSize()), objectMapper);
}

public AmazonSnsTemplate(final AmazonSNS amazonSNS, final TopicProperty topicProperty) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import java.util.LinkedList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;

import org.apache.commons.lang3.RandomStringUtils;
Expand All @@ -39,6 +38,7 @@
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import com.amazon.sns.messaging.lib.concurrent.RingBufferBlockingQueue;
import com.amazon.sns.messaging.lib.core.helper.ConsumerHelper;
import com.amazon.sns.messaging.lib.model.RequestEntry;
import com.amazon.sns.messaging.lib.model.ResponseFailEntry;
Expand Down Expand Up @@ -69,7 +69,7 @@ public void before() throws Exception {
.maximumPoolSize(10)
.topicArn("arn:aws:sns:us-east-2:000000000000:topic")
.build();
snsTemplate = new AmazonSnsTemplate<>(amazonSNS, topicProperty, new LinkedBlockingQueue<>(1024));
snsTemplate = new AmazonSnsTemplate<>(amazonSNS, topicProperty, new RingBufferBlockingQueue<>(1024));
}

@Test
Expand Down
2 changes: 1 addition & 1 deletion amazon-sns-java-messaging-lib-v2/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>com.github.mvallim</groupId>
<artifactId>amazon-sns-java-messaging-lib</artifactId>
<version>1.0.6-SNAPSHOT</version>
<version>1.0.7-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
Loading

0 comments on commit 0d2dc64

Please sign in to comment.