Skip to content

Commit

Permalink
Add method to EventService that can accept a custom ExecutorService f…
Browse files Browse the repository at this point in the history
…or the EventPoller

Fixes #375
  • Loading branch information
jamesnetherton committed Sep 21, 2024
1 parent 273c26d commit bed9ca4
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ public synchronized void stop() {
executor.shutdown();
}

if (userManagedEventListenerExecutorService) {
if (!userManagedEventListenerExecutorService) {
eventListenerExecutorService.shutdown();
eventListenerExecutorService = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.github.jamesnetherton.zulip.client.api.core.ZulipService;
import com.github.jamesnetherton.zulip.client.api.narrow.Narrow;
import com.github.jamesnetherton.zulip.client.http.ZulipHttpClient;
import java.util.concurrent.ExecutorService;

/**
* Zulip event APIs.
Expand Down Expand Up @@ -33,4 +34,18 @@ public EventService(ZulipHttpClient client) {
public EventPoller captureMessageEvents(MessageEventListener listener, Narrow... narrows) {
return new EventPoller(this.client, listener, narrows);
}

/**
* Capture message events.
*
* @param listener The {@link MessageEventListener} to be invoked on each message event
* @param executorService Custom {@link ExecutorService} to use for processing message events
* @param narrows optional {@link Narrow} expressions to filter which message events are captured. E.g. messages
* from a
* specific stream
* @return {@link EventPoller} to initiate event polling
*/
public EventPoller captureMessageEvents(MessageEventListener listener, ExecutorService executorService, Narrow... narrows) {
return new EventPoller(this.client, listener, narrows, executorService);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.github.jamesnetherton.zulip.client.api.integration.event;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.github.jamesnetherton.zulip.client.api.event.EventPoller;
Expand All @@ -16,6 +17,8 @@
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.junit.jupiter.api.Test;
Expand All @@ -24,20 +27,38 @@ public class ZulipEventIT extends ZulipIntegrationTestBase {

@Test
public void messageEvents() throws Exception {
CountDownLatch latch = new CountDownLatch(3);
assertMessageEvents(null);
}

@Test
public void messageEventsWithCustomExecutor() throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(2);
try {
assertMessageEvents(executorService);
} finally {
assertFalse(executorService.isShutdown());
executorService.shutdown();
}
}

@Test
public void messageEventsWithNarrow() throws Exception {
CountDownLatch latch = new CountDownLatch(5);
List<String> messages = new ArrayList<>();

String streamName = UUID.randomUUID().toString().split("-")[0];
StreamSubscriptionRequest subscriptionRequest = StreamSubscriptionRequest.of(streamName, streamName);
StreamService streamService = zulip.streams();
streamService.subscribe(subscriptionRequest).execute();
String streamA = UUID.randomUUID().toString().split("-")[0];
String streamB = UUID.randomUUID().toString().split("-")[0];

zulip.streams().subscribe(
StreamSubscriptionRequest.of(streamA, streamA),
StreamSubscriptionRequest.of(streamB, streamB)).execute();

for (int i = 0; i < 10; i++) {
List<Stream> streams = streamService.getAll().execute();
List<Stream> streams = zulip.streams().getAll().execute();
List<Stream> matches = streams.stream()
.filter(stream -> stream.getName().equals(streamName))
.filter(stream -> stream.getName().equals(streamA) || stream.getName().equals(streamB))
.collect(Collectors.toList());
if (matches.size() == 1) {
if (matches.size() == 2) {
break;
}
Thread.sleep(500);
Expand All @@ -49,80 +70,84 @@ public void onEvent(Message event) {
messages.add(event.getContent());
latch.countDown();
}
});
}, Narrow.of("stream", streamA), Narrow.of("is", "stream"));

try {
eventPoller.start();

MessageService messageService = zulip.messages();
for (int i = 0; i < 3; i++) {
messageService.sendStreamMessage("Test Content " + i, streamName, "testtopic").execute();
for (int i = 0; i < 10; i++) {
String streamName = i % 2 == 0 ? streamA : streamB;
messageService.sendStreamMessage("Stream " + streamName + " Content " + i, streamName, "testtopic").execute();
}

assertTrue(latch.await(30, TimeUnit.SECONDS));

for (int i = 0; i < 3; i++) {
int finalI = i;
assertTrue(messages.stream().anyMatch(message -> message.equals("Test Content " + finalI)));
int count = 0;
for (int i = 0; i < 5; i++) {
int finalCount = count;
assertTrue(
messages.stream().anyMatch(message -> message.equals("Stream " + streamA + " Content " + finalCount)));

count += 2;
}
} catch (ZulipClientException e) {
e.printStackTrace();
throw e;
} finally {
eventPoller.stop();
}
}

@Test
public void messageEventsWithNarrow() throws Exception {
CountDownLatch latch = new CountDownLatch(5);
private void assertMessageEvents(ExecutorService executorService) throws Exception {
CountDownLatch latch = new CountDownLatch(3);
List<String> messages = new ArrayList<>();

String streamA = UUID.randomUUID().toString().split("-")[0];
String streamB = UUID.randomUUID().toString().split("-")[0];

zulip.streams().subscribe(
StreamSubscriptionRequest.of(streamA, streamA),
StreamSubscriptionRequest.of(streamB, streamB)).execute();
String streamName = UUID.randomUUID().toString().split("-")[0];
StreamSubscriptionRequest subscriptionRequest = StreamSubscriptionRequest.of(streamName, streamName);
StreamService streamService = zulip.streams();
streamService.subscribe(subscriptionRequest).execute();

for (int i = 0; i < 10; i++) {
List<Stream> streams = zulip.streams().getAll().execute();
List<Stream> streams = streamService.getAll().execute();
List<Stream> matches = streams.stream()
.filter(stream -> stream.getName().equals(streamA) || stream.getName().equals(streamB))
.filter(stream -> stream.getName().equals(streamName))
.collect(Collectors.toList());
if (matches.size() == 2) {
if (matches.size() == 1) {
break;
}
Thread.sleep(500);
}

EventPoller eventPoller = zulip.events().captureMessageEvents(new MessageEventListener() {
MessageEventListener listener = new MessageEventListener() {
@Override
public void onEvent(Message event) {
messages.add(event.getContent());
latch.countDown();
}
}, Narrow.of("stream", streamA), Narrow.of("is", "stream"));
};

EventPoller eventPoller;
if (executorService != null) {
eventPoller = zulip.events().captureMessageEvents(listener, executorService);
} else {
eventPoller = zulip.events().captureMessageEvents(listener);
}

try {
eventPoller.start();

MessageService messageService = zulip.messages();
for (int i = 0; i < 10; i++) {
String streamName = i % 2 == 0 ? streamA : streamB;
messageService.sendStreamMessage("Stream " + streamName + " Content " + i, streamName, "testtopic").execute();
for (int i = 0; i < 3; i++) {
messageService.sendStreamMessage("Test Content " + i, streamName, "testtopic").execute();
}

assertTrue(latch.await(30, TimeUnit.SECONDS));

int count = 0;
for (int i = 0; i < 5; i++) {
int finalCount = count;
assertTrue(
messages.stream().anyMatch(message -> message.equals("Stream " + streamA + " Content " + finalCount)));

count += 2;
for (int i = 0; i < 3; i++) {
int finalI = i;
assertTrue(messages.stream().anyMatch(message -> message.equals("Test Content " + finalI)));
}
} catch (ZulipClientException e) {
e.printStackTrace();
throw e;
} finally {
eventPoller.stop();
}
Expand Down

0 comments on commit bed9ca4

Please sign in to comment.