-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathnonblocking_consumer.java
36 lines (32 loc) · 1.62 KB
/
nonblocking_consumer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#### A single app interested in consuming a known topic from beginning (asynchronously)
Method `consumeForeverNonBlocking` is the entry point.
```java
private CompletableFuture<StreamSet> setupConsumptionNonBlocking(String topicName, String namespace) {
ClientFactory clientFactory = new ClientFactoryImpl();
TopicConsumer topicConsumer = clientFactory.createTopicConsumer(ConsumerConfigsBuilder().foo().bar().build());
Topic topic = new Topic(topicName, namespace);
topicConsumer.describeRangesFor(Collections.singleton(topic))
.thenCompose((RangeDescription rangeDescription) -> {
List<Range> oldestRanges = rangeDescription.get(topic).get(0);
Set<RangePosition> positions = new Set<>();
for(Range range: oldestRanges) {
positions.add(RangePosition.beginning(range));
}
StreamSet streamSet = topicConsumer.createStreamSet();
streamSet.subscribe(positions);
return streamSet;
});
}
private CompletableFuture<StreamSet> continueConsumeNonBlocking(StreamSet streamSet) {
return streamSet.poll(42).thenComposeAsync((ConsumerRecords records) -> {
log.info("Received records");
return continueConsumeNonBlocking(streamSet);
});
}
public CompletableFuture<Void> consumeForeverNonBlocking(String topicName, String namespace) {
return setupConsumptionNonBlocking(topicName, namespace)
.thenAccept((StreamSet streamSet) -> {
continueConsumeNonBlocking(streamSet);
});
}
```