From a3a2b0aca333adff3008320f5106acc06cceb997 Mon Sep 17 00:00:00 2001 From: VajiraPrabuddhaka Date: Thu, 9 Jan 2025 22:15:06 +0530 Subject: [PATCH] [choreo] Fix: propely get AmqpOverWebsocketsEnabled flag --- adapter/internal/messaging/azure_listener.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/adapter/internal/messaging/azure_listener.go b/adapter/internal/messaging/azure_listener.go index f90a63f981..ebde0bbbca 100644 --- a/adapter/internal/messaging/azure_listener.go +++ b/adapter/internal/messaging/azure_listener.go @@ -65,7 +65,7 @@ func InitiateAndProcessEvents(config *config.Config) { subscription, err := msg.InitiateBrokerConnectionAndValidate( topic.ConnectionString, topic.TopicName, - getAmqpClientOptions(config), + getAmqpClientOptions(topic.AmqpOverWebsocketsEnabled), componentName, topic.ReconnectRetryCount, topic.ReconnectInterval*time.Millisecond, @@ -89,7 +89,7 @@ func InitiateAndProcessEvents(config *config.Config) { subscription, err := msg.InitiateBrokerConnectionAndValidate( connectionString, topic, - getAmqpClientOptions(config), + getAmqpClientOptions(config.ControlPlane.BrokerConnectionParameters.AmqpOverWebsocketsEnabled), componentName, reconnectRetryCount, reconnectInterval*time.Millisecond, @@ -118,8 +118,8 @@ func startChannelConsumer(consumerType string) { } } -func getAmqpClientOptions(config *config.Config) *azservicebus.ClientOptions { - if config.ControlPlane.BrokerConnectionParameters.AmqpOverWebsocketsEnabled { +func getAmqpClientOptions(isAmqpOverWebsocketsEnabled bool) *azservicebus.ClientOptions { + if isAmqpOverWebsocketsEnabled { logger.LoggerMgw.Info("AMQP over Websockets is enabled. Initiating brokers with AMQP over Websockets.") newWebSocketConnFn := func(ctx context.Context, args azservicebus.NewWebSocketConnArgs) (net.Conn, error) { opts := &websocket.DialOptions{Subprotocols: []string{"amqp"}}