Skip to content

Commit

Permalink
#193: Order messages from latest to earliest
Browse files Browse the repository at this point in the history
Signed-off-by: Jonas Arnhold <[email protected]>
  • Loading branch information
jnsrnhld committed Jul 8, 2023
1 parent 76c9718 commit 9549cf7
Showing 1 changed file with 8 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -48,7 +50,7 @@ private Set<MessageDto> filterBySubscription(Set<MessageDto> messageDtos, Intege

return messageDtos.stream()
.filter(m -> messageIds.contains(m.getMessageId()))
.collect(Collectors.toSet());
.collect(Collectors.toCollection(LinkedHashSet::new));
}

private List<String> peekMessageIds(String topic, String subscription, Integer numMessages) {
Expand All @@ -67,7 +69,7 @@ private List<String> peekMessageIds(String topic, String subscription, Integer n
private Set<MessageDto> filterByProducers(Set<MessageDto> messageDtos, List<String> producers) {
return messageDtos.stream()
.filter(m -> producers.contains(m.getProducer()))
.collect(Collectors.toSet());
.collect(Collectors.toCollection(LinkedHashSet::new));
}

private Set<MessageDto> getLatestMessagesOfTopic(String topic, Integer numMessages) {
Expand All @@ -86,7 +88,10 @@ private Set<MessageDto> getLatestMessagesOfTopic(String topic, Integer numMessag
}
return messages.stream()
.map(message -> MessageDto.fromExistingMessage(message, schema))
.collect(Collectors.toSet());
// latest message first in set
.sorted(Comparator.comparing(MessageDto::getPublishTime, Comparator.reverseOrder()))
// linked to keep the order!
.collect(Collectors.toCollection(LinkedHashSet::new));
} catch (PulsarAdminException e) {
throw new PulsarApiException(
"Could not examine the amount of '%d' messages for topic '%s'".formatted(numMessages, topic),
Expand Down

0 comments on commit 9549cf7

Please sign in to comment.