diff --git a/springwolf-asyncapi/src/main/java/io/github/springwolf/asyncapi/v3/bindings/amqp/AMQPChannelBinding.java b/springwolf-asyncapi/src/main/java/io/github/springwolf/asyncapi/v3/bindings/amqp/AMQPChannelBinding.java index 0c17d3e2f..87431cb7a 100644 --- a/springwolf-asyncapi/src/main/java/io/github/springwolf/asyncapi/v3/bindings/amqp/AMQPChannelBinding.java +++ b/springwolf-asyncapi/src/main/java/io/github/springwolf/asyncapi/v3/bindings/amqp/AMQPChannelBinding.java @@ -3,6 +3,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import io.github.springwolf.asyncapi.v3.bindings.ChannelBinding; +import io.github.springwolf.asyncapi.v3.model.channel.ChannelReference; import jakarta.validation.constraints.NotNull; import lombok.AllArgsConstructor; import lombok.Builder; @@ -29,6 +30,17 @@ public class AMQPChannelBinding extends ChannelBinding { @JsonProperty("exchange") private AMQPChannelExchangeProperties exchange; + /** + * When is=routingKey, this defines the actual routing pattern to route the message from the exchange to the queue. + */ + @JsonProperty("name") + private String name; + + /** + * When is=routingKey, this defines the target queue after routing the message (essentially the binding). + */ + private ChannelReference channel; + /** * When is=queue, this object defines the queue properties. */ @@ -37,5 +49,5 @@ public class AMQPChannelBinding extends ChannelBinding { @Builder.Default @JsonProperty(value = "bindingVersion") - private final String bindingVersion = "0.3.0"; + private final String bindingVersion = "0.4.0"; } diff --git a/springwolf-asyncapi/src/main/java/io/github/springwolf/asyncapi/v3/bindings/amqp/AMQPMessageBinding.java b/springwolf-asyncapi/src/main/java/io/github/springwolf/asyncapi/v3/bindings/amqp/AMQPMessageBinding.java index 4229e2390..149e4f61e 100644 --- a/springwolf-asyncapi/src/main/java/io/github/springwolf/asyncapi/v3/bindings/amqp/AMQPMessageBinding.java +++ b/springwolf-asyncapi/src/main/java/io/github/springwolf/asyncapi/v3/bindings/amqp/AMQPMessageBinding.java @@ -35,5 +35,5 @@ public class AMQPMessageBinding extends MessageBinding { */ @Builder.Default @JsonProperty(value = "bindingVersion") - private final String bindingVersion = "0.3.0"; + private final String bindingVersion = "0.4.0"; } diff --git a/springwolf-asyncapi/src/main/java/io/github/springwolf/asyncapi/v3/bindings/amqp/AMQPOperationBinding.java b/springwolf-asyncapi/src/main/java/io/github/springwolf/asyncapi/v3/bindings/amqp/AMQPOperationBinding.java index 416e388ac..4e1c3dbcf 100644 --- a/springwolf-asyncapi/src/main/java/io/github/springwolf/asyncapi/v3/bindings/amqp/AMQPOperationBinding.java +++ b/springwolf-asyncapi/src/main/java/io/github/springwolf/asyncapi/v3/bindings/amqp/AMQPOperationBinding.java @@ -109,5 +109,5 @@ public class AMQPOperationBinding extends OperationBinding { */ @Builder.Default @JsonProperty("bindingVersion") - private String bindingVersion = "0.3.0"; + private String bindingVersion = "0.4.0"; } diff --git a/springwolf-asyncapi/src/test/resources/v3/bindings/amqp/amqp-channel-queue.yaml b/springwolf-asyncapi/src/test/resources/v3/bindings/amqp/amqp-channel-queue.yaml index 922074113..3c5ff08d4 100644 --- a/springwolf-asyncapi/src/test/resources/v3/bindings/amqp/amqp-channel-queue.yaml +++ b/springwolf-asyncapi/src/test/resources/v3/bindings/amqp/amqp-channel-queue.yaml @@ -10,4 +10,4 @@ channels: exclusive: true autoDelete: false vhost: / - bindingVersion: 0.3.0 \ No newline at end of file + bindingVersion: 0.4.0 \ No newline at end of file diff --git a/springwolf-asyncapi/src/test/resources/v3/bindings/amqp/amqp-channel-routing.yaml b/springwolf-asyncapi/src/test/resources/v3/bindings/amqp/amqp-channel-routing.yaml index 0bcaed1fb..154429b94 100644 --- a/springwolf-asyncapi/src/test/resources/v3/bindings/amqp/amqp-channel-routing.yaml +++ b/springwolf-asyncapi/src/test/resources/v3/bindings/amqp/amqp-channel-routing.yaml @@ -10,4 +10,4 @@ channels: durable: true autoDelete: false vhost: / - bindingVersion: 0.3.0 \ No newline at end of file + bindingVersion: 0.4.0 \ No newline at end of file diff --git a/springwolf-asyncapi/src/test/resources/v3/bindings/amqp/amqp-message.yaml b/springwolf-asyncapi/src/test/resources/v3/bindings/amqp/amqp-message.yaml index 67b76119c..aa0cbc732 100644 --- a/springwolf-asyncapi/src/test/resources/v3/bindings/amqp/amqp-message.yaml +++ b/springwolf-asyncapi/src/test/resources/v3/bindings/amqp/amqp-message.yaml @@ -7,4 +7,4 @@ channels: amqp: contentEncoding: gzip messageType: 'user.signup' - bindingVersion: 0.3.0 \ No newline at end of file + bindingVersion: 0.4.0 \ No newline at end of file diff --git a/springwolf-asyncapi/src/test/resources/v3/bindings/amqp/amqp-operation.yaml b/springwolf-asyncapi/src/test/resources/v3/bindings/amqp/amqp-operation.yaml index 001049f1c..aee0d69d2 100644 --- a/springwolf-asyncapi/src/test/resources/v3/bindings/amqp/amqp-operation.yaml +++ b/springwolf-asyncapi/src/test/resources/v3/bindings/amqp/amqp-operation.yaml @@ -13,4 +13,4 @@ operations: bcc: ['external.audit'] timestamp: true ack: false - bindingVersion: 0.3.0 \ No newline at end of file + bindingVersion: 0.4.0 \ No newline at end of file diff --git a/springwolf-examples/springwolf-amqp-example/src/test/resources/asyncapi.json b/springwolf-examples/springwolf-amqp-example/src/test/resources/asyncapi.json index 8e3a1a802..d972a2737 100644 --- a/springwolf-examples/springwolf-amqp-example/src/test/resources/asyncapi.json +++ b/springwolf-examples/springwolf-amqp-example/src/test/resources/asyncapi.json @@ -34,6 +34,9 @@ }, "bindings": { "amqp": { + "channel": { + "$ref": "#/channels/queue-update" + }, "is": "routingKey", "exchange": { "name": "CRUD-topic-exchange-1", @@ -42,7 +45,8 @@ "autoDelete": false, "vhost": "/" }, - "bindingVersion": "0.3.0" + "name": "#", + "bindingVersion": "0.4.0" } } }, @@ -55,6 +59,9 @@ }, "bindings": { "amqp": { + "channel": { + "$ref": "#/channels/queue-read" + }, "is": "routingKey", "exchange": { "name": "CRUD-topic-exchange-2", @@ -63,7 +70,8 @@ "autoDelete": false, "vhost": "/" }, - "bindingVersion": "0.3.0" + "name": "#", + "bindingVersion": "0.4.0" } } }, @@ -84,7 +92,7 @@ "autoDelete": false, "vhost": "/" }, - "bindingVersion": "0.3.0" + "bindingVersion": "0.4.0" } } }, @@ -100,7 +108,7 @@ "autoDelete": true, "vhost": "/" }, - "bindingVersion": "0.3.0" + "bindingVersion": "0.4.0" } } }, @@ -121,7 +129,7 @@ "autoDelete": false, "vhost": "/" }, - "bindingVersion": "0.3.0" + "bindingVersion": "0.4.0" } } }, @@ -142,6 +150,9 @@ }, "bindings": { "amqp": { + "channel": { + "$ref": "#/channels/example-bindings-queue" + }, "is": "routingKey", "exchange": { "name": "example-topic-exchange", @@ -150,7 +161,8 @@ "autoDelete": false, "vhost": "/" }, - "bindingVersion": "0.3.0" + "name": "example-topic-routing-key", + "bindingVersion": "0.4.0" } } }, @@ -174,7 +186,7 @@ "autoDelete": false, "vhost": "/" }, - "bindingVersion": "0.3.0" + "bindingVersion": "0.4.0" } } }, @@ -195,7 +207,7 @@ "autoDelete": false, "vhost": "/" }, - "bindingVersion": "0.3.0" + "bindingVersion": "0.4.0" } } }, @@ -216,7 +228,7 @@ "autoDelete": false, "vhost": "/" }, - "bindingVersion": "0.3.0" + "bindingVersion": "0.4.0" } } }, @@ -232,7 +244,7 @@ "autoDelete": false, "vhost": "/" }, - "bindingVersion": "0.3.0" + "bindingVersion": "0.4.0" } } }, @@ -248,7 +260,7 @@ "autoDelete": false, "vhost": "/" }, - "bindingVersion": "0.3.0" + "bindingVersion": "0.4.0" } } } @@ -426,7 +438,7 @@ "title": "AnotherPayloadDto", "bindings": { "amqp": { - "bindingVersion": "0.3.0" + "bindingVersion": "0.4.0" } } }, @@ -444,7 +456,7 @@ "title": "ExamplePayloadDto", "bindings": { "amqp": { - "bindingVersion": "0.3.0" + "bindingVersion": "0.4.0" } } }, @@ -462,7 +474,7 @@ "title": "GenericPayloadDtoExamplePayloadDto", "bindings": { "amqp": { - "bindingVersion": "0.3.0" + "bindingVersion": "0.4.0" } } }, @@ -480,7 +492,7 @@ "title": "GenericPayloadDtoLong", "bindings": { "amqp": { - "bindingVersion": "0.3.0" + "bindingVersion": "0.4.0" } } }, @@ -498,7 +510,7 @@ "title": "GenericPayloadDtoString", "bindings": { "amqp": { - "bindingVersion": "0.3.0" + "bindingVersion": "0.4.0" } } } @@ -513,7 +525,7 @@ "bindings": { "amqp": { "expiration": 0, - "bindingVersion": "0.3.0" + "bindingVersion": "0.4.0" } }, "messages": [ @@ -530,7 +542,7 @@ "bindings": { "amqp": { "expiration": 0, - "bindingVersion": "0.3.0" + "bindingVersion": "0.4.0" } }, "messages": [ @@ -547,7 +559,7 @@ "bindings": { "amqp": { "expiration": 0, - "bindingVersion": "0.3.0" + "bindingVersion": "0.4.0" } }, "messages": [ @@ -564,7 +576,7 @@ "bindings": { "amqp": { "expiration": 0, - "bindingVersion": "0.3.0" + "bindingVersion": "0.4.0" } }, "messages": [ @@ -581,7 +593,7 @@ "bindings": { "amqp": { "expiration": 0, - "bindingVersion": "0.3.0" + "bindingVersion": "0.4.0" } }, "messages": [ @@ -607,7 +619,7 @@ "bcc": [ ], "timestamp": false, "ack": false, - "bindingVersion": "0.3.0" + "bindingVersion": "0.4.0" } }, "messages": [ @@ -624,7 +636,7 @@ "bindings": { "amqp": { "expiration": 0, - "bindingVersion": "0.3.0" + "bindingVersion": "0.4.0" } }, "messages": [ @@ -644,7 +656,7 @@ "bindings": { "amqp": { "expiration": 0, - "bindingVersion": "0.3.0" + "bindingVersion": "0.4.0" } }, "messages": [ @@ -661,7 +673,7 @@ "bindings": { "amqp": { "expiration": 0, - "bindingVersion": "0.3.0" + "bindingVersion": "0.4.0" } }, "messages": [ diff --git a/springwolf-examples/springwolf-amqp-example/src/test/resources/asyncapi.yaml b/springwolf-examples/springwolf-amqp-example/src/test/resources/asyncapi.yaml index ef6152692..0c6e657a3 100644 --- a/springwolf-examples/springwolf-amqp-example/src/test/resources/asyncapi.yaml +++ b/springwolf-examples/springwolf-amqp-example/src/test/resources/asyncapi.yaml @@ -26,6 +26,8 @@ channels: $ref: "#/components/messages/io.github.springwolf.examples.amqp.dtos.GenericPayloadDtoIo.github.springwolf.examples.amqp.dtos.ExamplePayloadDto" bindings: amqp: + channel: + $ref: "#/channels/queue-update" is: routingKey exchange: name: CRUD-topic-exchange-1 @@ -33,7 +35,8 @@ channels: durable: true autoDelete: false vhost: / - bindingVersion: 0.3.0 + name: "#" + bindingVersion: 0.4.0 CRUD-topic-exchange-2: address: CRUD-topic-exchange-2 messages: @@ -41,6 +44,8 @@ channels: $ref: "#/components/messages/io.github.springwolf.examples.amqp.dtos.ExamplePayloadDto" bindings: amqp: + channel: + $ref: "#/channels/queue-read" is: routingKey exchange: name: CRUD-topic-exchange-2 @@ -48,7 +53,8 @@ channels: durable: true autoDelete: false vhost: / - bindingVersion: 0.3.0 + name: "#" + bindingVersion: 0.4.0 another-queue: address: another-queue messages: @@ -63,7 +69,7 @@ channels: exclusive: false autoDelete: false vhost: / - bindingVersion: 0.3.0 + bindingVersion: 0.4.0 example-bindings-queue: address: example-bindings-queue bindings: @@ -75,7 +81,7 @@ channels: exclusive: false autoDelete: true vhost: / - bindingVersion: 0.3.0 + bindingVersion: 0.4.0 example-queue: address: example-queue messages: @@ -90,7 +96,7 @@ channels: exclusive: false autoDelete: false vhost: / - bindingVersion: 0.3.0 + bindingVersion: 0.4.0 example-topic-exchange: address: example-topic-exchange messages: @@ -103,6 +109,8 @@ channels: $ref: "#/components/messages/io.github.springwolf.examples.amqp.dtos.ExamplePayloadDto" bindings: amqp: + channel: + $ref: "#/channels/example-bindings-queue" is: routingKey exchange: name: example-topic-exchange @@ -110,7 +118,8 @@ channels: durable: true autoDelete: false vhost: / - bindingVersion: 0.3.0 + name: example-topic-routing-key + bindingVersion: 0.4.0 multi-payload-queue: address: multi-payload-queue messages: @@ -127,7 +136,7 @@ channels: exclusive: false autoDelete: false vhost: / - bindingVersion: 0.3.0 + bindingVersion: 0.4.0 queue-create: address: queue-create messages: @@ -142,7 +151,7 @@ channels: exclusive: false autoDelete: false vhost: / - bindingVersion: 0.3.0 + bindingVersion: 0.4.0 queue-delete: address: queue-delete messages: @@ -157,7 +166,7 @@ channels: exclusive: false autoDelete: false vhost: / - bindingVersion: 0.3.0 + bindingVersion: 0.4.0 queue-read: address: queue-read bindings: @@ -169,7 +178,7 @@ channels: exclusive: false autoDelete: false vhost: / - bindingVersion: 0.3.0 + bindingVersion: 0.4.0 queue-update: address: queue-update bindings: @@ -181,7 +190,7 @@ channels: exclusive: false autoDelete: false vhost: / - bindingVersion: 0.3.0 + bindingVersion: 0.4.0 components: schemas: HeadersNotDocumented: @@ -302,7 +311,7 @@ components: title: AnotherPayloadDto bindings: amqp: - bindingVersion: 0.3.0 + bindingVersion: 0.4.0 io.github.springwolf.examples.amqp.dtos.ExamplePayloadDto: headers: $ref: "#/components/schemas/SpringRabbitListenerDefaultHeaders" @@ -314,7 +323,7 @@ components: title: ExamplePayloadDto bindings: amqp: - bindingVersion: 0.3.0 + bindingVersion: 0.4.0 io.github.springwolf.examples.amqp.dtos.GenericPayloadDtoIo.github.springwolf.examples.amqp.dtos.ExamplePayloadDto: headers: $ref: "#/components/schemas/SpringRabbitListenerDefaultHeaders" @@ -326,7 +335,7 @@ components: title: GenericPayloadDtoExamplePayloadDto bindings: amqp: - bindingVersion: 0.3.0 + bindingVersion: 0.4.0 io.github.springwolf.examples.amqp.dtos.GenericPayloadDtoJava.lang.Long: headers: $ref: "#/components/schemas/SpringRabbitListenerDefaultHeaders" @@ -338,7 +347,7 @@ components: title: GenericPayloadDtoLong bindings: amqp: - bindingVersion: 0.3.0 + bindingVersion: 0.4.0 io.github.springwolf.examples.amqp.dtos.GenericPayloadDtoJava.lang.String: headers: $ref: "#/components/schemas/SpringRabbitListenerDefaultHeaders" @@ -350,7 +359,7 @@ components: title: GenericPayloadDtoString bindings: amqp: - bindingVersion: 0.3.0 + bindingVersion: 0.4.0 operations: CRUD-topic-exchange-1_receive_bindingsUpdate: action: receive @@ -359,7 +368,7 @@ operations: bindings: amqp: expiration: 0 - bindingVersion: 0.3.0 + bindingVersion: 0.4.0 messages: - $ref: "#/channels/CRUD-topic-exchange-1/messages/io.github.springwolf.examples.amqp.dtos.GenericPayloadDtoIo.github.springwolf.examples.amqp.dtos.ExamplePayloadDto" CRUD-topic-exchange-2_receive_bindingsRead: @@ -369,7 +378,7 @@ operations: bindings: amqp: expiration: 0 - bindingVersion: 0.3.0 + bindingVersion: 0.4.0 messages: - $ref: "#/channels/CRUD-topic-exchange-2/messages/io.github.springwolf.examples.amqp.dtos.ExamplePayloadDto" another-queue_receive_receiveAnotherPayload: @@ -379,7 +388,7 @@ operations: bindings: amqp: expiration: 0 - bindingVersion: 0.3.0 + bindingVersion: 0.4.0 messages: - $ref: "#/channels/another-queue/messages/io.github.springwolf.examples.amqp.dtos.AnotherPayloadDto" example-queue_receive_receiveExamplePayload: @@ -389,7 +398,7 @@ operations: bindings: amqp: expiration: 0 - bindingVersion: 0.3.0 + bindingVersion: 0.4.0 messages: - $ref: "#/channels/example-queue/messages/io.github.springwolf.examples.amqp.dtos.ExamplePayloadDto" example-topic-exchange_example-topic-routing-key_receive_bindingsExample: @@ -399,7 +408,7 @@ operations: bindings: amqp: expiration: 0 - bindingVersion: 0.3.0 + bindingVersion: 0.4.0 messages: - $ref: "#/channels/example-topic-exchange_example-topic-routing-key/messages/io.github.springwolf.examples.amqp.dtos.ExamplePayloadDto" example-topic-exchange_send_sendMessage: @@ -418,7 +427,7 @@ operations: bcc: [] timestamp: false ack: false - bindingVersion: 0.3.0 + bindingVersion: 0.4.0 messages: - $ref: "#/channels/example-topic-exchange/messages/io.github.springwolf.examples.amqp.dtos.AnotherPayloadDto" multi-payload-queue_receive_bindingsBeanExample: @@ -428,7 +437,7 @@ operations: bindings: amqp: expiration: 0 - bindingVersion: 0.3.0 + bindingVersion: 0.4.0 messages: - $ref: "#/channels/multi-payload-queue/messages/io.github.springwolf.examples.amqp.dtos.AnotherPayloadDto" - $ref: "#/channels/multi-payload-queue/messages/io.github.springwolf.examples.amqp.dtos.ExamplePayloadDto" @@ -439,7 +448,7 @@ operations: bindings: amqp: expiration: 0 - bindingVersion: 0.3.0 + bindingVersion: 0.4.0 messages: - $ref: "#/channels/queue-create/messages/io.github.springwolf.examples.amqp.dtos.GenericPayloadDtoJava.lang.String" queue-delete_receive_queuesToDeclareDelete: @@ -449,6 +458,6 @@ operations: bindings: amqp: expiration: 0 - bindingVersion: 0.3.0 + bindingVersion: 0.4.0 messages: - $ref: "#/channels/queue-delete/messages/io.github.springwolf.examples.amqp.dtos.GenericPayloadDtoJava.lang.Long" diff --git a/springwolf-plugins/springwolf-amqp-plugin/src/main/java/io/github/springwolf/plugins/amqp/asyncapi/scanners/bindings/RabbitListenerUtil.java b/springwolf-plugins/springwolf-amqp-plugin/src/main/java/io/github/springwolf/plugins/amqp/asyncapi/scanners/bindings/RabbitListenerUtil.java index 581d53f9e..4a995d5e0 100644 --- a/springwolf-plugins/springwolf-amqp-plugin/src/main/java/io/github/springwolf/plugins/amqp/asyncapi/scanners/bindings/RabbitListenerUtil.java +++ b/springwolf-plugins/springwolf-amqp-plugin/src/main/java/io/github/springwolf/plugins/amqp/asyncapi/scanners/bindings/RabbitListenerUtil.java @@ -13,6 +13,7 @@ import io.github.springwolf.asyncapi.v3.bindings.amqp.AMQPOperationBinding; import io.github.springwolf.asyncapi.v3.model.ReferenceUtil; import io.github.springwolf.asyncapi.v3.model.channel.ChannelObject; +import io.github.springwolf.asyncapi.v3.model.channel.ChannelReference; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.Exchange; @@ -28,6 +29,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.stream.Collectors; import java.util.stream.Stream; /** @@ -125,6 +127,18 @@ public static Map buildChannelBinding( } else { channelBinding.is(AMQPChannelType.ROUTING_KEY); channelBinding.exchange(buildExchangeProperties(annotation, exchangeName, context)); + + // Alternative: lookup the bindings from the context + QueueBinding[] queueBindings = annotation.bindings(); + if (queueBindings.length > 0) { + QueueBinding binding = queueBindings[0]; + String queueChannelId = + stringValueResolver.resolveStringValue(binding.value().name()); + String routingKey = stringValueResolver.resolveStringValue( + Arrays.stream(binding.key()).findFirst().orElse(DEFAULT_EXCHANGE_ROUTING_KEY)); + channelBinding.name(routingKey); + channelBinding.channel(ChannelReference.fromChannel(queueChannelId)); + } } return Map.of(BINDING_NAME, channelBinding.build()); @@ -192,7 +206,41 @@ private static AMQPChannelQueueProperties buildQueueProperties( .build(); } - public static ChannelObject buildChannelObject(org.springframework.amqp.core.Queue queue) { + private static String getExchangeName( + RabbitListener annotation, StringValueResolver stringValueResolver, RabbitListenerUtilContext context) { + String exchangeName = Stream.of(annotation.bindings()) + .map(binding -> binding.exchange().name()) + .map(stringValueResolver::resolveStringValue) + .filter(StringUtils::hasText) + .findFirst() + .orElse(null); + + Binding binding = context.bindingMap().get(getChannelName(annotation, stringValueResolver)); + if (exchangeName == null && binding != null) { + exchangeName = binding.getExchange(); + } + + if (exchangeName == null) { + // The amqp default exchange is represented with an empty string + exchangeName = ""; + } + + return exchangeName; + } + + public static Stream buildChannelObjectFromBeans( + List queues, List bindings) { + Map queueMap = queues.stream() + .map(RabbitListenerUtil::buildChannelObjectForQueue) + .collect(Collectors.toMap(ChannelObject::getChannelId, c -> c, (a, b) -> a)); + + Stream bindingStream = + bindings.stream().flatMap((binding) -> buildChannelObjectFromBeans(binding, queueMap)); + + return Stream.concat(bindingStream, queueMap.values().stream()); + } + + private static ChannelObject buildChannelObjectForQueue(org.springframework.amqp.core.Queue queue) { return ChannelObject.builder() .channelId(ReferenceUtil.toValidId(queue.getName())) .address(queue.getName()) @@ -210,15 +258,35 @@ public static ChannelObject buildChannelObject(org.springframework.amqp.core.Que .build(); } - public static List buildChannelObject(Binding binding) { - String exchangeId = channelIdFromAnnotationBindings( + private static ChannelObject buildChannelObjectForQueue(Binding binding) { + return ChannelObject.builder() + .channelId(ReferenceUtil.toValidId(binding.getDestination())) + .address(binding.getDestination()) + .bindings(Map.of( + BINDING_NAME, + AMQPChannelBinding.builder() + .is(AMQPChannelType.QUEUE) + .queue(AMQPChannelQueueProperties.builder() + .name(binding.getDestination()) + .build()) + .build())) + .build(); + } + + private static Stream buildChannelObjectFromBeans( + Binding binding, Map queueMap) { + String exchangeChannelId = channelIdFromAnnotationBindings( binding.getExchange(), List.of(binding.getRoutingKey()).toArray(String[]::new)) .findFirst() .get(); - return List.of( + + String queueChannelId = ReferenceUtil.toValidId(binding.getDestination()); + ChannelObject queue = queueMap.getOrDefault(queueChannelId, buildChannelObjectForQueue(binding)); + + return Stream.of( // exchange ChannelObject.builder() - .channelId(exchangeId) + .channelId(exchangeChannelId) .address(binding.getRoutingKey()) .bindings(Map.of( BINDING_NAME, @@ -227,21 +295,12 @@ public static List buildChannelObject(Binding binding) { .exchange(AMQPChannelExchangeProperties.builder() .name(binding.getExchange()) .build()) + .name(binding.getRoutingKey()) + .channel(ChannelReference.fromChannel(queueChannelId)) .build())) .build(), - // queue (where the exchange forwards the message to) - ChannelObject.builder() - .channelId(ReferenceUtil.toValidId(binding.getDestination())) - .address(binding.getDestination()) - .bindings(Map.of( - BINDING_NAME, - AMQPChannelBinding.builder() - .is(AMQPChannelType.QUEUE) - .queue(AMQPChannelQueueProperties.builder() - .name(binding.getDestination()) - .build()) - .build())) - .build()); + // queue (exchange forwards message to this queue) + queue); } private static Boolean parse(String value, Boolean defaultIfEmpty) { @@ -251,28 +310,6 @@ private static Boolean parse(String value, Boolean defaultIfEmpty) { return Boolean.valueOf(value); } - private static String getExchangeName( - RabbitListener annotation, StringValueResolver stringValueResolver, RabbitListenerUtilContext context) { - String exchangeName = Stream.of(annotation.bindings()) - .map(binding -> binding.exchange().name()) - .map(stringValueResolver::resolveStringValue) - .filter(StringUtils::hasText) - .findFirst() - .orElse(null); - - Binding binding = context.bindingMap().get(getChannelName(annotation, stringValueResolver)); - if (exchangeName == null && binding != null) { - exchangeName = binding.getExchange(); - } - - if (exchangeName == null) { - // The amqp default exchange is represented with an empty string - exchangeName = ""; - } - - return exchangeName; - } - public static Map buildOperationBinding( RabbitListener annotation, StringValueResolver resolver, RabbitListenerUtilContext context) { return Map.of(BINDING_NAME, AMQPOperationBinding.builder().build()); diff --git a/springwolf-plugins/springwolf-amqp-plugin/src/main/java/io/github/springwolf/plugins/amqp/asyncapi/scanners/channels/RabbitQueueBeanScanner.java b/springwolf-plugins/springwolf-amqp-plugin/src/main/java/io/github/springwolf/plugins/amqp/asyncapi/scanners/channels/RabbitQueueBeanScanner.java index 5406174eb..4a4257c4e 100644 --- a/springwolf-plugins/springwolf-amqp-plugin/src/main/java/io/github/springwolf/plugins/amqp/asyncapi/scanners/channels/RabbitQueueBeanScanner.java +++ b/springwolf-plugins/springwolf-amqp-plugin/src/main/java/io/github/springwolf/plugins/amqp/asyncapi/scanners/channels/RabbitQueueBeanScanner.java @@ -11,7 +11,6 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; -import java.util.stream.Stream; @RequiredArgsConstructor public class RabbitQueueBeanScanner implements ChannelsScanner { @@ -20,11 +19,7 @@ public class RabbitQueueBeanScanner implements ChannelsScanner { @Override public Map scan() { - return Stream.concat( - queues.stream().map(RabbitListenerUtil::buildChannelObject), - bindings.stream() - .map(RabbitListenerUtil::buildChannelObject) - .flatMap(List::stream)) + return RabbitListenerUtil.buildChannelObjectFromBeans(queues, bindings) .collect(Collectors.toMap(ChannelObject::getChannelId, c -> c, (a, b) -> a)); } } diff --git a/springwolf-plugins/springwolf-amqp-plugin/src/test/java/io/github/springwolf/plugins/amqp/asyncapi/scanners/bindings/RabbitListenerUtilTest.java b/springwolf-plugins/springwolf-amqp-plugin/src/test/java/io/github/springwolf/plugins/amqp/asyncapi/scanners/bindings/RabbitListenerUtilTest.java index d7d08954c..1fc3e53e4 100644 --- a/springwolf-plugins/springwolf-amqp-plugin/src/test/java/io/github/springwolf/plugins/amqp/asyncapi/scanners/bindings/RabbitListenerUtilTest.java +++ b/springwolf-plugins/springwolf-amqp-plugin/src/test/java/io/github/springwolf/plugins/amqp/asyncapi/scanners/bindings/RabbitListenerUtilTest.java @@ -11,10 +11,14 @@ import io.github.springwolf.asyncapi.v3.bindings.amqp.AMQPChannelType; import io.github.springwolf.asyncapi.v3.bindings.amqp.AMQPMessageBinding; import io.github.springwolf.asyncapi.v3.bindings.amqp.AMQPOperationBinding; +import io.github.springwolf.asyncapi.v3.model.channel.ChannelObject; +import io.github.springwolf.asyncapi.v3.model.channel.ChannelReference; import org.assertj.core.util.Sets; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; +import org.springframework.amqp.core.Binding; +import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.TopicExchange; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; @@ -24,8 +28,10 @@ import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import static java.util.Collections.emptyMap; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; @@ -354,6 +360,8 @@ void buildChannelBinding() { .durable(true) .autoDelete(false) .build()) + .channel(ChannelReference.fromChannel("queue-1")) + .name("#") .build(), channelBinding.get("amqp")); } @@ -379,6 +387,8 @@ void buildChannelBindingWithExchangeContext() { .durable(false) .autoDelete(true) .build()) + .channel(ChannelReference.fromChannel("queue-1")) + .name("#") .build(), channelBinding.get("amqp")); } @@ -407,6 +417,8 @@ void buildChannelBindingWithEmptyContext() { .durable(true) .autoDelete(false) .build()) + .channel(ChannelReference.fromChannel("queue-1")) + .name("#") .build(), channelBinding.get("amqp")); } @@ -528,6 +540,8 @@ void buildChannelBinding() { .durable(true) .autoDelete(false) .build()) + .channel(ChannelReference.fromChannel("queue-1")) + .name("routing-key") .build(), channelBinding.get("amqp")); } @@ -553,6 +567,8 @@ void buildChannelBindingWithEmptyContext() { .durable(true) .autoDelete(false) .build()) + .channel(ChannelReference.fromChannel("queue-1")) + .name("routing-key") .build(), channelBinding.get("amqp")); } @@ -611,6 +627,138 @@ private void methodWithAnnotation(String payload) {} } } + @Nested + class BuildChannelObjectFromBeans { + org.springframework.amqp.core.Queue queue = + new org.springframework.amqp.core.Queue("queue-1", false, true, true); + TopicExchange exchange = new TopicExchange("exchange-name"); + + @Test + void simpleBinding() { + // given + List bindings = + List.of(BindingBuilder.bind(queue).to(exchange).with("routing-key")); + + // when + Map channelObject = RabbitListenerUtil.buildChannelObjectFromBeans( + List.of(queue), bindings) + .collect(Collectors.toMap(ChannelObject::getChannelId, c -> c, (a, b) -> a)); + + // then + assertThat(channelObject) + .containsOnly( + Map.entry( + "queue-1", + ChannelObject.builder() + .channelId("queue-1") + .address(queue.getName()) + .bindings(Map.of( + "amqp", + AMQPChannelBinding.builder() + .is(AMQPChannelType.QUEUE) + .queue(AMQPChannelQueueProperties.builder() + .name(queue.getName()) + .durable(false) + .autoDelete(true) + .exclusive(true) + .build()) + .build())) + .build()), + Map.entry( + "exchange-name_routing-key", + ChannelObject.builder() + .channelId("exchange-name_routing-key") + .address("routing-key") + .bindings(Map.of( + "amqp", + AMQPChannelBinding.builder() + .is(AMQPChannelType.ROUTING_KEY) + .exchange(AMQPChannelExchangeProperties.builder() + .name("exchange-name") + .type(null) + .build()) + .channel(ChannelReference.fromChannel("queue-1")) + .name("routing-key") + .build())) + .build())); + } + + @Test + void onlyQueueBeanWithoutExchange() { + // when + Map channelObject = RabbitListenerUtil.buildChannelObjectFromBeans( + List.of(queue), List.of()) + .collect(Collectors.toMap(ChannelObject::getChannelId, c -> c, (a, b) -> a)); + + // then + assertThat(channelObject) + .containsOnly(Map.entry( + "queue-1", + ChannelObject.builder() + .channelId("queue-1") + .address(queue.getName()) + .bindings(Map.of( + "amqp", + AMQPChannelBinding.builder() + .is(AMQPChannelType.QUEUE) + .queue(AMQPChannelQueueProperties.builder() + .name(queue.getName()) + .durable(false) + .autoDelete(true) + .exclusive(true) + .build()) + .build())) + .build())); + } + + @Test + void oneBindingWithoutQueueBean() { + // given + List bindings = + List.of(BindingBuilder.bind(queue).to(exchange).with("routing-key")); + + // when + Map channelObject = RabbitListenerUtil.buildChannelObjectFromBeans( + List.of(), bindings) + .collect(Collectors.toMap(ChannelObject::getChannelId, c -> c, (a, b) -> a)); + + // then + assertThat(channelObject) + .containsOnly( + Map.entry( + "queue-1", + ChannelObject.builder() + .channelId("queue-1") + .address(queue.getName()) + .bindings(Map.of( + "amqp", + AMQPChannelBinding.builder() + .is(AMQPChannelType.QUEUE) + .queue(AMQPChannelQueueProperties.builder() + .name(queue.getName()) + .build()) + .build())) + .build()), + Map.entry( + "exchange-name_routing-key", + ChannelObject.builder() + .channelId("exchange-name_routing-key") + .address("routing-key") + .bindings(Map.of( + "amqp", + AMQPChannelBinding.builder() + .is(AMQPChannelType.ROUTING_KEY) + .exchange(AMQPChannelExchangeProperties.builder() + .name("exchange-name") + .type(null) + .build()) + .channel(ChannelReference.fromChannel("queue-1")) + .name("routing-key") + .build())) + .build())); + } + } + private static RabbitListener getAnnotation(Class clazz) { return clazz.getDeclaredMethods()[0].getAnnotation(RabbitListener.class); } diff --git a/springwolf-plugins/springwolf-amqp-plugin/src/test/java/io/github/springwolf/plugins/amqp/asyncapi/scanners/channels/RabbitQueueBeanScannerTest.java b/springwolf-plugins/springwolf-amqp-plugin/src/test/java/io/github/springwolf/plugins/amqp/asyncapi/scanners/channels/RabbitQueueBeanScannerTest.java index e7e94e0c2..9f95b4f1d 100644 --- a/springwolf-plugins/springwolf-amqp-plugin/src/test/java/io/github/springwolf/plugins/amqp/asyncapi/scanners/channels/RabbitQueueBeanScannerTest.java +++ b/springwolf-plugins/springwolf-amqp-plugin/src/test/java/io/github/springwolf/plugins/amqp/asyncapi/scanners/channels/RabbitQueueBeanScannerTest.java @@ -6,6 +6,7 @@ import io.github.springwolf.asyncapi.v3.bindings.amqp.AMQPChannelQueueProperties; import io.github.springwolf.asyncapi.v3.bindings.amqp.AMQPChannelType; import io.github.springwolf.asyncapi.v3.model.channel.ChannelObject; +import io.github.springwolf.asyncapi.v3.model.channel.ChannelReference; import org.junit.jupiter.api.Test; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.Queue; @@ -57,6 +58,8 @@ void scan() { .exchange(AMQPChannelExchangeProperties.builder() .name("exchange") .build()) + .channel(ChannelReference.fromChannel("destination")) + .name("routingKey") .build())) .build(), "destination",