Skip to content

Commit

Permalink
#193: Handle messages as Sets instead of Lists
Browse files Browse the repository at this point in the history
Signed-off-by: Jonas Arnhold <[email protected]>
  • Loading branch information
jnsrnhld committed Jul 8, 2023
1 parent 67b17ac commit 71d27b9
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.springframework.web.bind.annotation.RestController;

import java.util.List;
import java.util.Set;

@RestController
@RequestMapping("/messages")
Expand All @@ -30,8 +31,10 @@ public class MessageController {
public ResponseEntity<MessagesDto> getMessages(@RequestParam String topic,
@RequestParam(required = false, defaultValue = "10") Integer numMessages,
@RequestParam(required = false, defaultValue = "") List<String> producers,
@RequestParam(required = false, defaultValue = "") List<String> subscriptions) {
List<MessageDto> messageDtos = messageService.getLatestMessagesFiltered(topic, numMessages, producers, subscriptions);
@RequestParam(required = false, defaultValue = "") List<String> subscriptions)
{
Set<MessageDto> messageDtos = messageService.getLatestMessagesFiltered(topic, numMessages, producers, subscriptions);
return new ResponseEntity<>(new MessagesDto(messageDtos), HttpStatus.OK);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@
import lombok.AllArgsConstructor;
import lombok.Data;

import java.util.List;
import java.util.Set;


@Data
@AllArgsConstructor
public class MessagesDto {

private List<MessageDto> messages;
private Set<MessageDto> messages;

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,17 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

@Service
@Slf4j
@RequiredArgsConstructor
public class MessageService {
private final PulsarAdmin pulsarAdmin;

public List<MessageDto> getLatestMessagesFiltered(String topic, Integer numMessages, List<String> producers, List<String> subscriptions) {
List<MessageDto> messageDtos = getLatestMessagesOfTopic(topic, numMessages);
public Set<MessageDto> getLatestMessagesFiltered(String topic, Integer numMessages, List<String> producers, List<String> subscriptions) {
Set<MessageDto> messageDtos = getLatestMessagesOfTopic(topic, numMessages);
if (!producers.isEmpty()) {
messageDtos = filterByProducers(messageDtos, producers);
}
Expand All @@ -39,12 +41,14 @@ public List<MessageDto> getLatestMessagesFiltered(String topic, Integer numMessa
return messageDtos;
}

private List<MessageDto> filterBySubscription(List<MessageDto> messageDtos, Integer numMessages, String topic, List<String> subscriptions) {
private Set<MessageDto> filterBySubscription(Set<MessageDto> messageDtos, Integer numMessages, String topic, List<String> subscriptions) {
List<String> messageIds = subscriptions.stream()
.flatMap(s -> peekMessageIds(topic, s, numMessages).stream())
.toList();

return messageDtos.stream().filter(m -> messageIds.contains(m.getMessageId())).toList();
return messageDtos.stream()
.filter(m -> messageIds.contains(m.getMessageId()))
.collect(Collectors.toSet());
}

private List<String> peekMessageIds(String topic, String subscription, Integer numMessages) {
Expand All @@ -60,14 +64,13 @@ private List<String> peekMessageIds(String topic, String subscription, Integer n
}


private List<MessageDto> filterByProducers(List<MessageDto> messageDtos, List<String> producers) {
private Set<MessageDto> filterByProducers(Set<MessageDto> messageDtos, List<String> producers) {
return messageDtos.stream()
.filter(m -> producers.contains(m.getProducer()))
.toList();

.collect(Collectors.toSet());
}

private List<MessageDto> getLatestMessagesOfTopic(String topic, Integer numMessages) {
private Set<MessageDto> getLatestMessagesOfTopic(String topic, Integer numMessages) {
var schema = getSchemaIfExists(topic);
try {
var messages = new ArrayList<Message<byte[]>>();
Expand All @@ -83,7 +86,7 @@ private List<MessageDto> getLatestMessagesOfTopic(String topic, Integer numMessa
}
return messages.stream()
.map(message -> MessageDto.fromExistingMessage(message, schema))
.toList();
.collect(Collectors.toSet());
} catch (PulsarAdminException e) {
throw new PulsarApiException(
"Could not examine the amount of '%d' messages for topic '%s'".formatted(numMessages, topic),
Expand Down

0 comments on commit 71d27b9

Please sign in to comment.