From bed9ca4cdd7315776301a0c739ac18cc1d2af569 Mon Sep 17 00:00:00 2001 From: James Netherton Date: Sat, 21 Sep 2024 10:13:28 +0100 Subject: [PATCH] Add method to EventService that can accept a custom ExecutorService for the EventPoller Fixes #375 --- .../zulip/client/api/event/EventPoller.java | 2 +- .../zulip/client/api/event/EventService.java | 15 +++ .../api/integration/event/ZulipEventIT.java | 107 +++++++++++------- 3 files changed, 82 insertions(+), 42 deletions(-) diff --git a/src/main/java/com/github/jamesnetherton/zulip/client/api/event/EventPoller.java b/src/main/java/com/github/jamesnetherton/zulip/client/api/event/EventPoller.java index bacbe874..a1b9702d 100644 --- a/src/main/java/com/github/jamesnetherton/zulip/client/api/event/EventPoller.java +++ b/src/main/java/com/github/jamesnetherton/zulip/client/api/event/EventPoller.java @@ -148,7 +148,7 @@ public synchronized void stop() { executor.shutdown(); } - if (userManagedEventListenerExecutorService) { + if (!userManagedEventListenerExecutorService) { eventListenerExecutorService.shutdown(); eventListenerExecutorService = null; } diff --git a/src/main/java/com/github/jamesnetherton/zulip/client/api/event/EventService.java b/src/main/java/com/github/jamesnetherton/zulip/client/api/event/EventService.java index 6616e77e..c02096bc 100644 --- a/src/main/java/com/github/jamesnetherton/zulip/client/api/event/EventService.java +++ b/src/main/java/com/github/jamesnetherton/zulip/client/api/event/EventService.java @@ -3,6 +3,7 @@ import com.github.jamesnetherton.zulip.client.api.core.ZulipService; import com.github.jamesnetherton.zulip.client.api.narrow.Narrow; import com.github.jamesnetherton.zulip.client.http.ZulipHttpClient; +import java.util.concurrent.ExecutorService; /** * Zulip event APIs. @@ -33,4 +34,18 @@ public EventService(ZulipHttpClient client) { public EventPoller captureMessageEvents(MessageEventListener listener, Narrow... narrows) { return new EventPoller(this.client, listener, narrows); } + + /** + * Capture message events. + * + * @param listener The {@link MessageEventListener} to be invoked on each message event + * @param executorService Custom {@link ExecutorService} to use for processing message events + * @param narrows optional {@link Narrow} expressions to filter which message events are captured. E.g. messages + * from a + * specific stream + * @return {@link EventPoller} to initiate event polling + */ + public EventPoller captureMessageEvents(MessageEventListener listener, ExecutorService executorService, Narrow... narrows) { + return new EventPoller(this.client, listener, narrows, executorService); + } } diff --git a/src/test/java/com/github/jamesnetherton/zulip/client/api/integration/event/ZulipEventIT.java b/src/test/java/com/github/jamesnetherton/zulip/client/api/integration/event/ZulipEventIT.java index 572496ae..37b66149 100644 --- a/src/test/java/com/github/jamesnetherton/zulip/client/api/integration/event/ZulipEventIT.java +++ b/src/test/java/com/github/jamesnetherton/zulip/client/api/integration/event/ZulipEventIT.java @@ -1,5 +1,6 @@ package com.github.jamesnetherton.zulip.client.api.integration.event; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import com.github.jamesnetherton.zulip.client.api.event.EventPoller; @@ -16,6 +17,8 @@ import java.util.List; import java.util.UUID; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.junit.jupiter.api.Test; @@ -24,20 +27,38 @@ public class ZulipEventIT extends ZulipIntegrationTestBase { @Test public void messageEvents() throws Exception { - CountDownLatch latch = new CountDownLatch(3); + assertMessageEvents(null); + } + + @Test + public void messageEventsWithCustomExecutor() throws Exception { + ExecutorService executorService = Executors.newFixedThreadPool(2); + try { + assertMessageEvents(executorService); + } finally { + assertFalse(executorService.isShutdown()); + executorService.shutdown(); + } + } + + @Test + public void messageEventsWithNarrow() throws Exception { + CountDownLatch latch = new CountDownLatch(5); List messages = new ArrayList<>(); - String streamName = UUID.randomUUID().toString().split("-")[0]; - StreamSubscriptionRequest subscriptionRequest = StreamSubscriptionRequest.of(streamName, streamName); - StreamService streamService = zulip.streams(); - streamService.subscribe(subscriptionRequest).execute(); + String streamA = UUID.randomUUID().toString().split("-")[0]; + String streamB = UUID.randomUUID().toString().split("-")[0]; + + zulip.streams().subscribe( + StreamSubscriptionRequest.of(streamA, streamA), + StreamSubscriptionRequest.of(streamB, streamB)).execute(); for (int i = 0; i < 10; i++) { - List streams = streamService.getAll().execute(); + List streams = zulip.streams().getAll().execute(); List matches = streams.stream() - .filter(stream -> stream.getName().equals(streamName)) + .filter(stream -> stream.getName().equals(streamA) || stream.getName().equals(streamB)) .collect(Collectors.toList()); - if (matches.size() == 1) { + if (matches.size() == 2) { break; } Thread.sleep(500); @@ -49,80 +70,84 @@ public void onEvent(Message event) { messages.add(event.getContent()); latch.countDown(); } - }); + }, Narrow.of("stream", streamA), Narrow.of("is", "stream")); try { eventPoller.start(); MessageService messageService = zulip.messages(); - for (int i = 0; i < 3; i++) { - messageService.sendStreamMessage("Test Content " + i, streamName, "testtopic").execute(); + for (int i = 0; i < 10; i++) { + String streamName = i % 2 == 0 ? streamA : streamB; + messageService.sendStreamMessage("Stream " + streamName + " Content " + i, streamName, "testtopic").execute(); } assertTrue(latch.await(30, TimeUnit.SECONDS)); - for (int i = 0; i < 3; i++) { - int finalI = i; - assertTrue(messages.stream().anyMatch(message -> message.equals("Test Content " + finalI))); + int count = 0; + for (int i = 0; i < 5; i++) { + int finalCount = count; + assertTrue( + messages.stream().anyMatch(message -> message.equals("Stream " + streamA + " Content " + finalCount))); + + count += 2; } - } catch (ZulipClientException e) { - e.printStackTrace(); - throw e; } finally { eventPoller.stop(); } } - @Test - public void messageEventsWithNarrow() throws Exception { - CountDownLatch latch = new CountDownLatch(5); + private void assertMessageEvents(ExecutorService executorService) throws Exception { + CountDownLatch latch = new CountDownLatch(3); List messages = new ArrayList<>(); - String streamA = UUID.randomUUID().toString().split("-")[0]; - String streamB = UUID.randomUUID().toString().split("-")[0]; - - zulip.streams().subscribe( - StreamSubscriptionRequest.of(streamA, streamA), - StreamSubscriptionRequest.of(streamB, streamB)).execute(); + String streamName = UUID.randomUUID().toString().split("-")[0]; + StreamSubscriptionRequest subscriptionRequest = StreamSubscriptionRequest.of(streamName, streamName); + StreamService streamService = zulip.streams(); + streamService.subscribe(subscriptionRequest).execute(); for (int i = 0; i < 10; i++) { - List streams = zulip.streams().getAll().execute(); + List streams = streamService.getAll().execute(); List matches = streams.stream() - .filter(stream -> stream.getName().equals(streamA) || stream.getName().equals(streamB)) + .filter(stream -> stream.getName().equals(streamName)) .collect(Collectors.toList()); - if (matches.size() == 2) { + if (matches.size() == 1) { break; } Thread.sleep(500); } - EventPoller eventPoller = zulip.events().captureMessageEvents(new MessageEventListener() { + MessageEventListener listener = new MessageEventListener() { @Override public void onEvent(Message event) { messages.add(event.getContent()); latch.countDown(); } - }, Narrow.of("stream", streamA), Narrow.of("is", "stream")); + }; + + EventPoller eventPoller; + if (executorService != null) { + eventPoller = zulip.events().captureMessageEvents(listener, executorService); + } else { + eventPoller = zulip.events().captureMessageEvents(listener); + } try { eventPoller.start(); MessageService messageService = zulip.messages(); - for (int i = 0; i < 10; i++) { - String streamName = i % 2 == 0 ? streamA : streamB; - messageService.sendStreamMessage("Stream " + streamName + " Content " + i, streamName, "testtopic").execute(); + for (int i = 0; i < 3; i++) { + messageService.sendStreamMessage("Test Content " + i, streamName, "testtopic").execute(); } assertTrue(latch.await(30, TimeUnit.SECONDS)); - int count = 0; - for (int i = 0; i < 5; i++) { - int finalCount = count; - assertTrue( - messages.stream().anyMatch(message -> message.equals("Stream " + streamA + " Content " + finalCount))); - - count += 2; + for (int i = 0; i < 3; i++) { + int finalI = i; + assertTrue(messages.stream().anyMatch(message -> message.equals("Test Content " + finalI))); } + } catch (ZulipClientException e) { + e.printStackTrace(); + throw e; } finally { eventPoller.stop(); }