From e034cc87713caf9113984eeda33756fb6aca1826 Mon Sep 17 00:00:00 2001 From: Djordje Djukic Date: Tue, 26 Mar 2024 13:57:32 +0100 Subject: [PATCH] RavenDB-21956 Add azure queue storage etl RavenDB-21956 wip RavenDB-21956 RavenDB-21956 wip RavenDB-21956 tests wip RavenDB-21956 wip RavenDB-21956 AzureStorageQueue -> AzureQueueStorage RavenDB-21956 Add passwordless option --- .../AzureQueueStorageConnectionSettings.cs | 51 ++- .../ETL/Queue/QueueConnectionString.cs | 3 +- .../ETL/Queue/QueueEtlConfiguration.cs | 6 +- .../Config/Categories/EtlConfiguration.cs | 12 + .../Dashboard/DatabasesInfoRetriever.cs | 9 +- .../Dashboard/DatabasesOngoingTasksInfo.cs | 3 + src/Raven.Server/Documents/ETL/EtlProcess.cs | 16 + .../EtlHandlerProcessorForProgress.cs | 2 + .../AzureQueueStorage/AzureQueueStorageEtl.cs | 133 ++++-- ...essorForTestAzureQueueStorageConnection.cs | 64 +++ .../Handlers/QueueEtlConnectionHandler.cs | 7 + .../Queue/QueueBrokerConnectionHelper.cs | 45 +- .../ShardedQueueEtlConnectionHandler.cs | 7 + .../ETL/Queue/AzureQueueStorageEtlTestBase.cs | 42 +- .../ETL/Queue/AzureQueueStorageEtlTests.cs | 408 +++++++++++++++++- 15 files changed, 724 insertions(+), 84 deletions(-) create mode 100644 src/Raven.Server/Documents/ETL/Providers/Queue/Handlers/Processors/QueueEtlHandlerProcessorForTestAzureQueueStorageConnection.cs diff --git a/src/Raven.Client/Documents/Operations/ETL/Queue/AzureQueueStorageConnectionSettings.cs b/src/Raven.Client/Documents/Operations/ETL/Queue/AzureQueueStorageConnectionSettings.cs index 1d3beb031315..64be25aa693f 100644 --- a/src/Raven.Client/Documents/Operations/ETL/Queue/AzureQueueStorageConnectionSettings.cs +++ b/src/Raven.Client/Documents/Operations/ETL/Queue/AzureQueueStorageConnectionSettings.cs @@ -1,26 +1,54 @@ +using System; +using System.Linq; using Sparrow.Json.Parsing; namespace Raven.Client.Documents.Operations.ETL.Queue; public sealed class AzureQueueStorageConnectionSettings { - // TODO djordje: what should be input, minutes? 
- public int TimeToLive { get; set; } - - public int VisibilityTimeout { get; set; } - public Authentication Authentication; + public string GetStorageUrl() + { + string storageAccountName = GetStorageAccountName(); + return $"https://{storageAccountName}.queue.core.windows.net/"; + } + + public string GetStorageAccountName() + { + string storageAccountName = ""; + + if (Authentication.ConnectionString != null) + { + var accountNamePart = Authentication.ConnectionString.Split(';') + .FirstOrDefault(part => part.StartsWith("AccountName=", StringComparison.OrdinalIgnoreCase)); + + if (accountNamePart == null) + { + throw new ArgumentException("Storage account name not found in the connection string.", + nameof(Authentication.ConnectionString)); + } + + storageAccountName = accountNamePart.Substring("AccountName=".Length); + } + else if (Authentication.EntraId != null) + { + storageAccountName = Authentication.EntraId.StorageAccountName; + } + + return storageAccountName; + } + public DynamicJsonValue ToJson() { var json = new DynamicJsonValue { - [nameof(TimeToLive)] = TimeToLive, - [nameof(VisibilityTimeout)] = VisibilityTimeout, [nameof(Authentication)] = Authentication == null ? null : new DynamicJsonValue { + [nameof(Authentication.ConnectionString)] = Authentication.ConnectionString, + [nameof(Authentication.Passwordless)] = Authentication.Passwordless, [nameof(Authentication.EntraId)] = Authentication.EntraId == null ? null @@ -34,13 +62,7 @@ public DynamicJsonValue ToJson() Authentication?.EntraId?.ClientId, [nameof(Authentication.EntraId.ClientSecret)] = Authentication?.EntraId?.ClientSecret - }, - [nameof(Authentication.ConnectionString)] = Authentication.ConnectionString == null - ? 
null - : new DynamicJsonValue - { - [nameof(Authentication.ConnectionString)] = Authentication?.ConnectionString - } + } } }; @@ -52,6 +74,7 @@ public sealed class Authentication { public EntraId EntraId { get; set; } public string ConnectionString { get; set; } + public bool Passwordless { get; set; } } public sealed class EntraId diff --git a/src/Raven.Client/Documents/Operations/ETL/Queue/QueueConnectionString.cs b/src/Raven.Client/Documents/Operations/ETL/Queue/QueueConnectionString.cs index 13103f9acf12..93f4a33ac51e 100644 --- a/src/Raven.Client/Documents/Operations/ETL/Queue/QueueConnectionString.cs +++ b/src/Raven.Client/Documents/Operations/ETL/Queue/QueueConnectionString.cs @@ -46,7 +46,6 @@ protected override void ValidateImpl(ref List errors) internal string GetUrl() { - // TODO djordje: what to do with this, no connection string defined in this moment? string url; switch (BrokerType) @@ -62,7 +61,7 @@ internal string GetUrl() url = indexOfStartServerUri != -1 ? connectionString.Substring(indexOfStartServerUri + 1) : null; break; case QueueBrokerType.AzureQueueStorage: - url = "azure-queue-storage"; + url = AzureQueueStorageConnectionSettings.GetStorageUrl(); break; default: throw new NotSupportedException($"'{BrokerType}' broker is not supported"); diff --git a/src/Raven.Client/Documents/Operations/ETL/Queue/QueueEtlConfiguration.cs b/src/Raven.Client/Documents/Operations/ETL/Queue/QueueEtlConfiguration.cs index 57f1cce49deb..1dbcc42b1415 100644 --- a/src/Raven.Client/Documents/Operations/ETL/Queue/QueueEtlConfiguration.cs +++ b/src/Raven.Client/Documents/Operations/ETL/Queue/QueueEtlConfiguration.cs @@ -47,7 +47,11 @@ public override bool UsingEncryptedCommunicationChannel() } break; case QueueBrokerType.RabbitMq: - return Connection.RabbitMqConnectionSettings.ConnectionString.StartsWith("amqps", StringComparison.OrdinalIgnoreCase); + return Connection.RabbitMqConnectionSettings.ConnectionString.StartsWith("amqps", + 
StringComparison.OrdinalIgnoreCase); + case QueueBrokerType.AzureQueueStorage: + return Connection.AzureQueueStorageConnectionSettings.GetStorageUrl() + .StartsWith("https", StringComparison.OrdinalIgnoreCase); default: throw new NotSupportedException($"Unknown broker type: {BrokerType}"); } diff --git a/src/Raven.Server/Config/Categories/EtlConfiguration.cs b/src/Raven.Server/Config/Categories/EtlConfiguration.cs index 122b5497e710..bec32b3946de 100644 --- a/src/Raven.Server/Config/Categories/EtlConfiguration.cs +++ b/src/Raven.Server/Config/Categories/EtlConfiguration.cs @@ -52,5 +52,17 @@ public sealed class EtlConfiguration : ConfigurationCategory [TimeUnit(TimeUnit.Seconds)] [ConfigurationEntry("ETL.Queue.Kafka.InitTransactionsTimeoutInSec", ConfigurationEntryScope.ServerWideOrPerDatabase)] public TimeSetting KafkaInitTransactionsTimeout { get; set; } + + [Description("Lifespan of a message in the queue")] + [DefaultValue(604800)] + [TimeUnit(TimeUnit.Seconds)] + [ConfigurationEntry("ETL.Queue.AzureQueueStorage.TimeToLiveInSec", ConfigurationEntryScope.ServerWideOrPerDatabase)] + public TimeSetting AzureQueueStorageTimeToLive{ get; set; } + + [Description("How long a message is hidden after being retrieved but not deleted")] + [DefaultValue(0)] + [TimeUnit(TimeUnit.Seconds)] + [ConfigurationEntry("ETL.Queue.AzureQueueStorage.VisibilityTimeoutInSec", ConfigurationEntryScope.ServerWideOrPerDatabase)] + public TimeSetting AzureQueueStorageVisibilityTimeout{ get; set; } } } diff --git a/src/Raven.Server/Dashboard/DatabasesInfoRetriever.cs b/src/Raven.Server/Dashboard/DatabasesInfoRetriever.cs index 8ee48c5822f4..6d8b95baab9b 100644 --- a/src/Raven.Server/Dashboard/DatabasesInfoRetriever.cs +++ b/src/Raven.Server/Dashboard/DatabasesInfoRetriever.cs @@ -349,6 +349,10 @@ private static DatabaseOngoingTasksInfoItem GetOngoingTasksInfoItem(DocumentData long rabbitMqEtlCountOnNode = GetTaskCountOnNode(database, dbRecord, serverStore, 
database.EtlLoader.QueueDestinations, task => EtlLoader.GetProcessState(task.Transforms, database, task.Name), task => task.BrokerType == QueueBrokerType.RabbitMq); + var azureQueueStorageEtlCount = database.EtlLoader.GetQueueDestinationCountByBroker(QueueBrokerType.AzureQueueStorage); + long azureQueueStorageEtlCountOnNode = GetTaskCountOnNode(database, dbRecord, serverStore, database.EtlLoader.QueueDestinations, + task => EtlLoader.GetProcessState(task.Transforms, database, task.Name), task => task.BrokerType == QueueBrokerType.AzureQueueStorage); + var periodicBackupCount = database.PeriodicBackupRunner.PeriodicBackups.Count; long periodicBackupCountOnNode = BackupUtils.GetTasksCountOnNode(serverStore, database.Name, context); @@ -364,8 +368,8 @@ private static DatabaseOngoingTasksInfoItem GetOngoingTasksInfoItem(DocumentData task => QueueSinkLoader.GetProcessState(task.Scripts, database, task.Name), task => task.BrokerType == QueueBrokerType.RabbitMq); ongoingTasksCount = extRepCount + replicationHubCount + replicationSinkCount + - ravenEtlCount + sqlEtlCount + elasticSearchEtlCount + olapEtlCount + kafkaEtlCount + rabbitMqEtlCount + - periodicBackupCount + subscriptionCount + + ravenEtlCount + sqlEtlCount + elasticSearchEtlCount + olapEtlCount + kafkaEtlCount + + rabbitMqEtlCount + azureQueueStorageEtlCount + periodicBackupCount + subscriptionCount + kafkaSinkCount + rabbitMqSinkCount; return new DatabaseOngoingTasksInfoItem @@ -380,6 +384,7 @@ private static DatabaseOngoingTasksInfoItem GetOngoingTasksInfoItem(DocumentData OlapEtlCount = olapEtlCountOnNode, KafkaEtlCount = kafkaEtlCountOnNode, RabbitMqEtlCount = rabbitMqEtlCountOnNode, + AzureQueueStorageEtlCount = azureQueueStorageEtlCountOnNode, PeriodicBackupCount = periodicBackupCountOnNode, SubscriptionCount = subscriptionCountOnNode, KafkaSinkCount = kafkaSinkCountOnNode, diff --git a/src/Raven.Server/Dashboard/DatabasesOngoingTasksInfo.cs b/src/Raven.Server/Dashboard/DatabasesOngoingTasksInfo.cs index 
1e797fc118cb..ada479948f07 100644 --- a/src/Raven.Server/Dashboard/DatabasesOngoingTasksInfo.cs +++ b/src/Raven.Server/Dashboard/DatabasesOngoingTasksInfo.cs @@ -34,6 +34,8 @@ public sealed class DatabaseOngoingTasksInfoItem : IDynamicJson public long KafkaEtlCount { get; set; } public long RabbitMqEtlCount { get; set; } + + public long AzureQueueStorageEtlCount { get; set; } public long PeriodicBackupCount { get; set; } @@ -57,6 +59,7 @@ public DynamicJsonValue ToJson() [nameof(OlapEtlCount)] = OlapEtlCount, [nameof(KafkaEtlCount)] = KafkaEtlCount, [nameof(RabbitMqEtlCount)] = RabbitMqEtlCount, + [nameof(AzureQueueStorageEtlCount)] = AzureQueueStorageEtlCount, [nameof(PeriodicBackupCount)] = PeriodicBackupCount, [nameof(SubscriptionCount)] = SubscriptionCount, [nameof(KafkaSinkCount)] = KafkaSinkCount, diff --git a/src/Raven.Server/Documents/ETL/EtlProcess.cs b/src/Raven.Server/Documents/ETL/EtlProcess.cs index 8b2185a92510..7acaa8a40c48 100644 --- a/src/Raven.Server/Documents/ETL/EtlProcess.cs +++ b/src/Raven.Server/Documents/ETL/EtlProcess.cs @@ -23,6 +23,7 @@ using Raven.Server.Documents.ETL.Providers.OLAP; using Raven.Server.Documents.ETL.Providers.OLAP.Test; using Raven.Server.Documents.ETL.Providers.Queue; +using Raven.Server.Documents.ETL.Providers.Queue.AzureQueueStorage; using Raven.Server.Documents.ETL.Providers.Queue.Kafka; using Raven.Server.Documents.ETL.Providers.Queue.RabbitMq; using Raven.Server.Documents.ETL.Providers.Raven; @@ -1328,6 +1329,21 @@ public static IDisposable TestScript( result = rabbitMqEtl.RunTest(results, context); result.DebugOutput = debugOutput; + return tx; + } + case AzureQueueStorageEtl azureQueueStorageEtl: + using (azureQueueStorageEtl.EnterTestMode(out debugOutput)) + { + azureQueueStorageEtl.EnsureThreadAllocationStats(); + + var queueItem = new QueueItem(document, docCollection); + + var results = azureQueueStorageEtl.Transform(new[] { queueItem }, context, new EtlStatsScope(new EtlRunStats()), + new EtlProcessState()); 
+ + result = azureQueueStorageEtl.RunTest(results, context); + result.DebugOutput = debugOutput; + return tx; } default: diff --git a/src/Raven.Server/Documents/ETL/Handlers/Processors/EtlHandlerProcessorForProgress.cs b/src/Raven.Server/Documents/ETL/Handlers/Processors/EtlHandlerProcessorForProgress.cs index da9859c8c574..b48e15636a29 100644 --- a/src/Raven.Server/Documents/ETL/Handlers/Processors/EtlHandlerProcessorForProgress.cs +++ b/src/Raven.Server/Documents/ETL/Handlers/Processors/EtlHandlerProcessorForProgress.cs @@ -2,6 +2,7 @@ using System.Threading.Tasks; using JetBrains.Annotations; using Raven.Client.Documents.Operations.ETL.Queue; +using Raven.Server.Documents.ETL.Providers.Queue.AzureQueueStorage; using Raven.Server.Documents.ETL.Providers.Queue.Kafka; using Raven.Server.Documents.ETL.Providers.Queue.RabbitMq; using Raven.Server.Documents.ETL.Stats; @@ -37,6 +38,7 @@ protected override async ValueTask HandleCurrentNodeAsync() { RabbitMqEtl => QueueBrokerType.RabbitMq, KafkaEtl => QueueBrokerType.Kafka, + AzureQueueStorageEtl => QueueBrokerType.AzureQueueStorage, _ => null } }).ToArray(); diff --git a/src/Raven.Server/Documents/ETL/Providers/Queue/AzureQueueStorage/AzureQueueStorageEtl.cs b/src/Raven.Server/Documents/ETL/Providers/Queue/AzureQueueStorage/AzureQueueStorageEtl.cs index 901ebcd21917..3ee3daba3bf9 100644 --- a/src/Raven.Server/Documents/ETL/Providers/Queue/AzureQueueStorage/AzureQueueStorageEtl.cs +++ b/src/Raven.Server/Documents/ETL/Providers/Queue/AzureQueueStorage/AzureQueueStorageEtl.cs @@ -1,16 +1,17 @@ using System; using System.Collections.Generic; -using System.Text; +using System.Text.Json; +using System.Text.Json.Serialization; using Azure.Storage.Queues; -using Newtonsoft.Json; -using Raven.Client.Documents.Conventions; +using CloudNative.CloudEvents; +using CloudNative.CloudEvents.Extensions; using Raven.Client.Documents.Operations.ETL; using Raven.Client.Documents.Operations.ETL.Queue; -using 
Raven.Client.Json.Serialization.NewtonsoftJson.Internal; using Raven.Server.Documents.ETL.Stats; -using Raven.Server.Json.Converters; +using Raven.Server.Exceptions.ETL.QueueEtl; using Raven.Server.ServerWide; using Raven.Server.ServerWide.Context; +using Sparrow.Json; namespace Raven.Server.Documents.ETL.Providers.Queue.AzureQueueStorage; @@ -41,18 +42,17 @@ protected override int PublishMessages(List(); - + idsToDelete = []; int count = 0; foreach (QueueWithItems queue in itemsPerQueue) { string queueName = queue.Name.ToLower(); - + if (_queueClients.TryGetValue(queueName, out QueueClient queueClient) == false) { queueClient = - QueueBrokerConnectionHelper.CreateAzureStorageQueueClient( + QueueBrokerConnectionHelper.CreateAzureQueueStorageClient( Configuration.Connection.AzureQueueStorageConnectionSettings, queueName); _queueClients.Add(queueName, queueClient); } @@ -62,32 +62,43 @@ protected override int PublishMessages(List { - var jsonSerializer = (NewtonsoftJsonJsonSerializer)DocumentConventions.DefaultForServer.Serialization.CreateSerializer(); - jsonSerializer.Converters.Add(SliceJsonConverter.Instance); - jsonSerializer.Converters.Add(Json.Converters.BlittableJsonConverter.Instance); - jsonSerializer.Converters.Add(LazyStringValueJsonConverter.Instance); - jsonSerializer.Converters.Add(StreamConverter.Instance); - jsonSerializer.Converters.Add(BlittableJsonReaderArrayConverter.Instance); - jsonSerializer.Converters.Add(CounterOperationConverter.Instance); - return jsonSerializer; + public static readonly CloudEventConverter Instance = new CloudEventConverter(); + + const string SpecVersionAttributeName = "specversion"; + + private CloudEventConverter() + { + } + + public override void Write(Utf8JsonWriter writer, CloudEvent cloudEvent, JsonSerializerOptions options) + { + writer.WriteStartObject(); + + writer.WritePropertyName(SpecVersionAttributeName); + writer.WriteStringValue(cloudEvent.SpecVersion.VersionId); + + foreach (var pair in 
cloudEvent.GetPopulatedAttributes()) + { + var attribute = pair.Key; + if (attribute == cloudEvent.SpecVersion.DataContentTypeAttribute || + attribute.Name == Partitioning.PartitionKeyAttribute.Name) + { + continue; + } + + var value = attribute.Format(pair.Value); + + writer.WritePropertyName(attribute.Name); + writer.WriteStringValue(value); + } + + writer.WritePropertyName("data"); + writer.WriteRawValue(((BlittableJsonReaderObject)cloudEvent.Data).ToString()); + + writer.WriteEndObject(); + } + + public override CloudEvent Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) + { + throw new NotImplementedException(); + } } } diff --git a/src/Raven.Server/Documents/ETL/Providers/Queue/Handlers/Processors/QueueEtlHandlerProcessorForTestAzureQueueStorageConnection.cs b/src/Raven.Server/Documents/ETL/Providers/Queue/Handlers/Processors/QueueEtlHandlerProcessorForTestAzureQueueStorageConnection.cs new file mode 100644 index 000000000000..5cabcd10a873 --- /dev/null +++ b/src/Raven.Server/Documents/ETL/Providers/Queue/Handlers/Processors/QueueEtlHandlerProcessorForTestAzureQueueStorageConnection.cs @@ -0,0 +1,64 @@ +using System; +using System.IO; +using System.Threading.Tasks; +using Azure.Storage.Queues; +using JetBrains.Annotations; +using Newtonsoft.Json; +using Raven.Client.Documents.Operations.ETL.Queue; +using Raven.Server.Documents.Handlers.Processors; +using Raven.Server.Web.System; +using Sparrow.Json; +using Sparrow.Json.Parsing; + +namespace Raven.Server.Documents.ETL.Providers.Queue.Handlers.Processors; + +internal sealed class + QueueEtlHandlerProcessorForTestAzureQueueStorageConnection : + AbstractDatabaseHandlerProcessor + where TOperationContext : JsonOperationContext + where TRequestHandler : AbstractDatabaseRequestHandler +{ + public QueueEtlHandlerProcessorForTestAzureQueueStorageConnection([NotNull] TRequestHandler requestHandler) : + base(requestHandler) + { + } + + public override async ValueTask ExecuteAsync() + { 
+ try + { + string authenticationJson = await new StreamReader(HttpContext.Request.Body).ReadToEndAsync(); + Authentication authentication = JsonConvert.DeserializeObject(authenticationJson); + + var connectionSettings = new AzureQueueStorageConnectionSettings() { Authentication = authentication }; + + QueueServiceClient client = + QueueBrokerConnectionHelper.CreateAzureQueueStorageServiceClient(connectionSettings); + + await client.GetStatisticsAsync(); + + DynamicJsonValue result = new() { [nameof(NodeConnectionTestResult.Success)] = true, }; + using (ServerStore.ContextPool.AllocateOperationContext(out JsonOperationContext context)) + await using (AsyncBlittableJsonTextWriter writer = new(context, RequestHandler.ResponseBodyStream())) + { + context.Write(writer, result); + } + } + catch (Exception ex) + { + using (ServerStore.ContextPool.AllocateOperationContext(out JsonOperationContext context)) + { + await using (var writer = + new AsyncBlittableJsonTextWriter(context, RequestHandler.ResponseBodyStream())) + { + context.Write(writer, + new DynamicJsonValue + { + [nameof(NodeConnectionTestResult.Success)] = false, + [nameof(NodeConnectionTestResult.Error)] = ex.ToString() + }); + } + } + } + } +} diff --git a/src/Raven.Server/Documents/ETL/Providers/Queue/Handlers/QueueEtlConnectionHandler.cs b/src/Raven.Server/Documents/ETL/Providers/Queue/Handlers/QueueEtlConnectionHandler.cs index 5b86230320f5..994ce31c1245 100644 --- a/src/Raven.Server/Documents/ETL/Providers/Queue/Handlers/QueueEtlConnectionHandler.cs +++ b/src/Raven.Server/Documents/ETL/Providers/Queue/Handlers/QueueEtlConnectionHandler.cs @@ -20,5 +20,12 @@ public async Task GetTestRabbitMqConnectionResult() using (var processor = new QueueEtlHandlerProcessorForTestRabbitMqConnection(this)) await processor.ExecuteAsync(); } + + [RavenAction("/databases/*/admin/etl/queue/azurequeuestorage/test-connection", "POST", AuthorizationStatus.DatabaseAdmin)] + public async Task 
GetTestAzureQueueStorageConnectionResult() + { + using (var processor = new QueueEtlHandlerProcessorForTestAzureQueueStorageConnection(this)) + await processor.ExecuteAsync(); + } } } diff --git a/src/Raven.Server/Documents/ETL/Providers/Queue/QueueBrokerConnectionHelper.cs b/src/Raven.Server/Documents/ETL/Providers/Queue/QueueBrokerConnectionHelper.cs index 0e330db83725..55a1e1eef95c 100644 --- a/src/Raven.Server/Documents/ETL/Providers/Queue/QueueBrokerConnectionHelper.cs +++ b/src/Raven.Server/Documents/ETL/Providers/Queue/QueueBrokerConnectionHelper.cs @@ -1,8 +1,10 @@ using System; using System.Collections.Generic; using System.IO; +using Azure.Core.Pipeline; using Azure.Identity; using Azure.Storage.Queues; +using Azure.Storage.Queues.Models; using Confluent.Kafka; using NetTopologySuite.IO; using Org.BouncyCastle.Utilities.IO.Pem; @@ -87,10 +89,10 @@ public static IConnection CreateRabbitMqConnection(RabbitMqConnectionSettings se return connectionFactory.CreateConnection(); } - public static QueueClient CreateAzureStorageQueueClient( + public static QueueClient CreateAzureQueueStorageClient( AzureQueueStorageConnectionSettings azureQueueStorageConnectionSettings, string queueName) { - QueueClient queueClient = null; + QueueClient queueClient; if (azureQueueStorageConnectionSettings.Authentication.ConnectionString != null) { @@ -100,9 +102,7 @@ public static QueueClient CreateAzureStorageQueueClient( else if (azureQueueStorageConnectionSettings.Authentication.EntraId != null) { - var queueUri = - new Uri( - $"https://{azureQueueStorageConnectionSettings.Authentication.EntraId.StorageAccountName}.queue.core.windows.net/{queueName}"); + var queueUri = new Uri($"{azureQueueStorageConnectionSettings.GetStorageUrl()}{queueName}"); queueClient = new QueueClient( queueUri, @@ -111,7 +111,42 @@ public static QueueClient CreateAzureStorageQueueClient( azureQueueStorageConnectionSettings.Authentication.EntraId.ClientId, 
azureQueueStorageConnectionSettings.Authentication.EntraId.ClientSecret)); } + else if(azureQueueStorageConnectionSettings.Authentication.Passwordless) + { + var queueUri = new Uri($"{azureQueueStorageConnectionSettings.GetStorageUrl()}{queueName}"); + queueClient = new QueueClient(queueUri, new DefaultAzureCredential()); + } + else + { + throw new NotSupportedException("Provided authentication method is not supported"); + } return queueClient; } + + public static QueueServiceClient CreateAzureQueueStorageServiceClient( + AzureQueueStorageConnectionSettings azureQueueStorageConnectionSettings) + { + QueueServiceClient queueServiceClient = null; + + if (azureQueueStorageConnectionSettings.Authentication.ConnectionString != null) + { + queueServiceClient = + new QueueServiceClient(azureQueueStorageConnectionSettings.Authentication.ConnectionString); + } + + else if (azureQueueStorageConnectionSettings.Authentication.EntraId != null) + { + var queueUri = new Uri(azureQueueStorageConnectionSettings.GetStorageUrl()); + + queueServiceClient = new QueueServiceClient( + queueUri, + new ClientSecretCredential( + azureQueueStorageConnectionSettings.Authentication.EntraId.TenantId, + azureQueueStorageConnectionSettings.Authentication.EntraId.ClientId, + azureQueueStorageConnectionSettings.Authentication.EntraId.ClientSecret)); + } + + return queueServiceClient; + } } diff --git a/src/Raven.Server/Documents/Sharding/Handlers/ShardedQueueEtlConnectionHandler.cs b/src/Raven.Server/Documents/Sharding/Handlers/ShardedQueueEtlConnectionHandler.cs index 27e6f5fb9bc2..a4ee96311e3e 100644 --- a/src/Raven.Server/Documents/Sharding/Handlers/ShardedQueueEtlConnectionHandler.cs +++ b/src/Raven.Server/Documents/Sharding/Handlers/ShardedQueueEtlConnectionHandler.cs @@ -20,5 +20,12 @@ public async Task GetTestRabbitMqConnectionResult() using (var processor = new QueueEtlHandlerProcessorForTestRabbitMqConnection(this)) await processor.ExecuteAsync(); } + + 
[RavenShardedAction("/databases/*/admin/etl/queue/azurequeuestorage/test-connection", "POST")] + public async Task GetTestAzureQueueStorageConnectionResult() + { + using (var processor = new QueueEtlHandlerProcessorForTestAzureQueueStorageConnection(this)) + await processor.ExecuteAsync(); + } } } diff --git a/test/SlowTests/Server/Documents/ETL/Queue/AzureQueueStorageEtlTestBase.cs b/test/SlowTests/Server/Documents/ETL/Queue/AzureQueueStorageEtlTestBase.cs index 1231eecb098a..7527c46a7c3a 100644 --- a/test/SlowTests/Server/Documents/ETL/Queue/AzureQueueStorageEtlTestBase.cs +++ b/test/SlowTests/Server/Documents/ETL/Queue/AzureQueueStorageEtlTestBase.cs @@ -1,5 +1,7 @@ using System.Collections.Generic; using System.Linq; +using Azure.Storage.Queues; +using Azure.Storage.Queues.Models; using Raven.Client.Documents; using Raven.Client.Documents.Operations.ETL; using Raven.Client.Documents.Operations.ETL.Queue; @@ -12,13 +14,16 @@ public class AzureQueueStorageEtlTestBase : QueueEtlTestBase public AzureQueueStorageEtlTestBase(ITestOutputHelper output) : base(output) { } - - protected string OrdersQueueName => $"orders"; + + protected string OrdersQueueName => "orders"; protected readonly string[] DefaultCollections = { "orders" }; + protected readonly string ConnectionString = + "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;QueueEndpoint=http://127.0.0.1:10001/devstoreaccount1;"; + protected List DefaultExchanges => new() { new EtlQueue { Name = OrdersQueueName } }; - + protected string DefaultScript => @" var orderData = { Id: id(this), @@ -40,8 +45,7 @@ protected QueueEtlConfiguration SetupQueueEtlToAzureQueueStorageOnline(DocumentS Dictionary configuration = null, string connectionString = null, bool skipAutomaticQueueDeclaration = false) { - // todo djordje: url? 
- var connectionStringName = $"{store.Database}@{"store.Urls.First()"} to AzureQueueStorage"; + var connectionStringName = $"{store.Database}@{store.Urls.First()} to AzureQueueStorage"; Transformation transformation = new Transformation { @@ -60,11 +64,6 @@ protected QueueEtlConfiguration SetupQueueEtlToAzureQueueStorageOnline(DocumentS SkipAutomaticQueueDeclaration = skipAutomaticQueueDeclaration }; - /*foreach (var queue in queues?.Select(x => x.Name).ToArray() ?? transformation.GetCollectionsFromScript()) - { - _definedExchangesAndQueues.Add(queue); - }*/ - Etl.AddEtl(store, config, new QueueConnectionString { @@ -72,18 +71,25 @@ protected QueueEtlConfiguration SetupQueueEtlToAzureQueueStorageOnline(DocumentS BrokerType = QueueBrokerType.AzureQueueStorage, AzureQueueStorageConnectionSettings = new AzureQueueStorageConnectionSettings { - Authentication = new global::Raven.Client.Documents.Operations.ETL.Queue.Authentication() + Authentication = new global::Raven.Client.Documents.Operations.ETL.Queue.Authentication { - EntraId = new EntraId - { - StorageAccountName = "", - ClientId = "", - TenantId = "", - ClientSecret = "" - } + ConnectionString = ConnectionString } } }); return config; } + + protected static QueueClient CreateAzureQueueStorageClient(string connectionString, string queueName) + { + return new QueueClient(connectionString, queueName, + new QueueClientOptions() { MessageEncoding = QueueMessageEncoding.Base64 }); + } + + protected static QueueMessage[] ReceiveAndDeleteMessages(QueueClient queueClient, int numberOfMessages = 1) + { + var messages = queueClient.ReceiveMessages(numberOfMessages); + messages.Value.ToList().ForEach(message => queueClient.DeleteMessage(message.MessageId, message.PopReceipt)); + return messages.Value.ToArray(); + } } diff --git a/test/SlowTests/Server/Documents/ETL/Queue/AzureQueueStorageEtlTests.cs b/test/SlowTests/Server/Documents/ETL/Queue/AzureQueueStorageEtlTests.cs index 9a93b29a9837..dde64bcfe311 100644 --- 
a/test/SlowTests/Server/Documents/ETL/Queue/AzureQueueStorageEtlTests.cs +++ b/test/SlowTests/Server/Documents/ETL/Queue/AzureQueueStorageEtlTests.cs @@ -1,7 +1,23 @@ using System; using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Azure.Storage.Queues; +using Azure.Storage.Queues.Models; +using Newtonsoft.Json; +using Raven.Client.Documents.Operations.ConnectionStrings; +using Raven.Client.Documents.Operations.ETL; +using Raven.Client.Documents.Operations.ETL.Queue; +using Raven.Client.Documents.Smuggler; +using Raven.Client.Exceptions.Sharding; +using Raven.Client.ServerWide.Operations; +using Raven.Server.Documents.ETL.Providers.Queue; +using Raven.Server.Documents.ETL.Providers.Queue.Test; +using Raven.Server.ServerWide.Context; +using Tests.Infrastructure; using Xunit; using Xunit.Abstractions; +using QueueItem = Raven.Server.Documents.ETL.Providers.Queue.QueueItem; namespace SlowTests.Server.Documents.ETL.Queue; @@ -10,7 +26,7 @@ public class AzureQueueStorageEtlTests : AzureQueueStorageEtlTestBase public AzureQueueStorageEtlTests(ITestOutputHelper output) : base(output) { } - + [Fact] public void SimpleScript() { @@ -34,9 +50,367 @@ public void SimpleScript() } AssertEtlDone(etlDone, TimeSpan.FromMinutes(1), store.Database, config); + + QueueClient queueClient = CreateAzureQueueStorageClient(ConnectionString, OrdersQueueName); + var message = ReceiveAndDeleteMessages(queueClient).Single(); + var order = JsonConvert.DeserializeObject(message.MessageText).Data; + + Assert.NotNull(order); + Assert.Equal(order.Id, "orders/1-A"); + Assert.Equal(order.OrderLinesCount, 2); + Assert.Equal(order.TotalCost, 10); + } + } + + [Fact] + public void Error_if_script_does_not_contain_any_loadTo_method() + { + var config = new QueueEtlConfiguration + { + Name = "test", + ConnectionStringName = "test", + BrokerType = QueueBrokerType.RabbitMq, + Transforms = + { + new Transformation + { + Name = "test", Collections = { "Orders" }, Script 
= @"this.TotalCost = 10;" + } + } + }; + + config.Initialize(new QueueConnectionString + { + Name = "Foo", + BrokerType = QueueBrokerType.RabbitMq, + RabbitMqConnectionSettings = + new RabbitMqConnectionSettings() { ConnectionString = "amqp://guest:guest@localhost:5672/" } + }); + + List errors; + config.Validate(out errors); + + Assert.Equal(1, errors.Count); + + Assert.Equal("No `loadTo()` method call found in 'test' script", errors[0]); + } + + /* Setting up the Azure Queue Storage ETL against a sharded store must throw, and the message must state that queue ETLs are not supported in sharding. */ [Fact] + public void ShardedAzureQueueStorageEtlNotSupported() + { + using (var store = Sharding.GetDocumentStore()) + { + var error = Assert.ThrowsAny(() => + { + SetupQueueEtlToAzureQueueStorageOnline(store, DefaultScript, DefaultCollections); + }); + Assert.Contains("Queue ETLs are currently not supported in sharding", error.Message); + } + } + + /* Stores one order, waits for the ETL to complete, then reads the single message from the OrdersQueueName queue and checks that Id, Specversion, Type and Source are all non-blank. */ [Fact] + public void TestAreHeadersPresent() + { + using (var store = GetDocumentStore()) + { + var config = SetupQueueEtlToAzureQueueStorageOnline(store, DefaultScript, DefaultCollections); + var etlDone = Etl.WaitForEtlToComplete(store); + + using (var session = store.OpenSession()) + { + session.Store(new Order + { + Id = "orders/1-A", + OrderLines = new List + { + new OrderLine { Cost = 3, Product = "Milk", Quantity = 2 }, + new OrderLine { Cost = 4, Product = "Bear", Quantity = 1 }, + } + }); + session.SaveChanges(); + } + + AssertEtlDone(etlDone, TimeSpan.FromMinutes(1), store.Database, config); + + QueueClient queueClient = CreateAzureQueueStorageClient(ConnectionString, OrdersQueueName); + var message = ReceiveAndDeleteMessages(queueClient).Single(); + var orderCloudEvent = JsonConvert.DeserializeObject(message.MessageText); + + Assert.True(string.IsNullOrWhiteSpace(orderCloudEvent.Id) == false); + Assert.True(string.IsNullOrWhiteSpace(orderCloudEvent.Specversion) == false); + Assert.True(string.IsNullOrWhiteSpace(orderCloudEvent.Type) == false); + Assert.True(string.IsNullOrWhiteSpace(orderCloudEvent.Source) == false); + } + } + + /* Stores numberOfOrders orders with numberOfLinesPerOrder lines each, waits until LastProcessedEtag reaches numberOfOrders, then checks OrderLinesCount and TotalCost of every received message. */ [Fact] + public void 
SimpleScriptWithManyDocuments() + { + using var store = GetDocumentStore(); + + var numberOfOrders = 10; + var numberOfLinesPerOrder = 2; + + var config = SetupQueueEtlToAzureQueueStorageOnline(store, DefaultScript, DefaultCollections); + var etlDone = + Etl.WaitForEtlToComplete(store, (n, statistics) => statistics.LastProcessedEtag >= numberOfOrders); + + for (int i = 0; i < numberOfOrders; i++) + { + using (var session = store.OpenSession()) + { + Order order = new Order { OrderLines = new List() }; + + for (int j = 0; j < numberOfLinesPerOrder; j++) + { + order.OrderLines.Add(new OrderLine + { + Cost = j + 1, Product = "foos/" + j, Quantity = (i * j) % 10 + }); + } + + session.Store(order, "orders/" + i); + + session.SaveChanges(); + } + } + + AssertEtlDone(etlDone, TimeSpan.FromMinutes(1), store.Database, config); + + QueueClient queueClient = CreateAzureQueueStorageClient(ConnectionString, OrdersQueueName); + QueueMessage[] messages = ReceiveAndDeleteMessages(queueClient, numberOfOrders); + + List orders = messages + .Select(message => JsonConvert.DeserializeObject(message.MessageText)).ToList(); + + Assert.Equal(numberOfOrders, orders.Count); + + for (int counter = 0; counter < numberOfOrders; counter++) + { + var order = orders.Single(x => x.Data.Id == $"orders/{counter}").Data; + /* NOTE(review): xUnit Assert.Equal is (expected, actual); the two checks below pass the actual value first, which only affects the failure message. */ Assert.Equal(order.OrderLinesCount, numberOfLinesPerOrder); + Assert.Equal(order.TotalCost, counter * 2); + } + } + + /* An empty transformation script must make Validate report exactly one error, "Script 'test' must not be empty". */ [Fact] + public void Error_if_script_is_empty() + { + var config = new QueueEtlConfiguration + { + Name = "test", + ConnectionStringName = "test", + BrokerType = QueueBrokerType.AzureQueueStorage, + Transforms = { new Transformation { Name = "test", Collections = { "Orders" }, Script = @"" } } + }; + + config.Initialize(new QueueConnectionString + { + Name = "Foo", + BrokerType = QueueBrokerType.AzureQueueStorage, + AzureQueueStorageConnectionSettings = + new AzureQueueStorageConnectionSettings() + { + Authentication = + new 
global::Raven.Client.Documents.Operations.ETL.Queue.Authentication() + { + ConnectionString = ConnectionString + } + } + }); + + List errors; + config.Validate(out errors); + + Assert.Equal(1, errors.Count); + + Assert.Equal("Script 'test' must not be empty", errors[0]); + } + + /* Stores an order and a connection string named "simulate", runs QueueEtl.TestScript for document orders/1-A, and checks the reported queue name, routing key, message attributes and debug output. */ [Fact] + public async Task CanTestScript() + { + using (var store = GetDocumentStore()) + { + using (var session = store.OpenAsyncSession()) + { + await session.StoreAsync(new Order + { + OrderLines = new List + { + new OrderLine { Cost = 3, Product = "Milk", Quantity = 3 }, + new OrderLine { Cost = 4, Product = "Bear", Quantity = 2 }, + } + }); + await session.SaveChangesAsync(); + } + + var result1 = store.Maintenance.Send(new PutConnectionStringOperation( + new QueueConnectionString + { + Name = "simulate", + BrokerType = QueueBrokerType.AzureQueueStorage, + AzureQueueStorageConnectionSettings = + new AzureQueueStorageConnectionSettings() + { + Authentication = + new global::Raven.Client.Documents.Operations.ETL.Queue.Authentication() + { + ConnectionString = ConnectionString + } + } + })); + Assert.NotNull(result1.RaftCommandIndex); + + var database = await GetDatabase(store.Database); + + using (database.DocumentsStorage.ContextPool.AllocateOperationContext( + out DocumentsOperationContext context)) + { + using (QueueEtl.TestScript( + new TestQueueEtlScript + { + DocumentId = "orders/1-A", + Configuration = new QueueEtlConfiguration + { + Name = "simulate", + ConnectionStringName = "simulate", + Queues = { new EtlQueue() { Name = "Orders" } }, + BrokerType = QueueBrokerType.AzureQueueStorage, + Transforms = + { + new Transformation + { + Collections = { "Orders" }, + Name = "Orders", + Script = DefaultScript + } + } + } + }, database, database.ServerStore, context, out var testResult)) + { + var result = (QueueEtlTestScriptResult)testResult; + + Assert.Equal(0, result.TransformationErrors.Count); + + Assert.Equal(1, result.Summary.Count); + + Assert.Equal("Orders", 
result.Summary[0].QueueName); + Assert.Equal("myRoutingKey", result.Summary[0].Messages[0].RoutingKey); + Assert.Equal("orders/1-A", result.Summary[0].Messages[0].Attributes.Id); + Assert.Equal("com.github.users", result.Summary[0].Messages[0].Attributes.Type); + Assert.Equal("/registrations/direct-signup", + result.Summary[0].Messages[0].Attributes.Source.ToString()); + + Assert.Equal("test output", result.DebugOutput[0]); + } + } + } + } + + /* With DeleteProcessedDocuments = true on the Users queue, the user must arrive in the "users" queue and loading users/1 from the store afterwards must return null. */ [Fact] + public void ShouldDeleteDocumentsAfterProcessing() + { + using (var store = GetDocumentStore()) + { + var config = SetupQueueEtlToAzureQueueStorageOnline(store, + @$"loadToUsers(this)", new[] { "Users" }, + new[] { new EtlQueue { Name = $"Users", DeleteProcessedDocuments = true } }); + + var etlDone = Etl.WaitForEtlToComplete(store); + + using (var session = store.OpenSession()) + { + session.Store(new User { Id = "users/1", Name = "Arek" }); + session.SaveChanges(); + } + + AssertEtlDone(etlDone, TimeSpan.FromMinutes(1), store.Database, config); + + QueueClient queueClient = CreateAzureQueueStorageClient(ConnectionString, "users"); + var message = queueClient.ReceiveMessage(); + var user = JsonConvert.DeserializeObject(message.Value.MessageText).Data; + + Assert.NotNull(user); + /* NOTE(review): actual value is passed first here; xUnit Assert.Equal is (expected, actual). */ Assert.Equal(user.Name, "Arek"); + + using (var session = store.OpenSession()) + { + var entity = session.Load("users/1"); + Assert.Null(entity); + } + } + } + + /* Exports the source database after the ETL task is set up, imports the file into a second database, and checks the destination record: one queue connection string, one queue ETL, and the same broker type, script, collections and queue settings. */ [Fact] + public async Task ShouldImportTask() + { + using (var srcStore = GetDocumentStore()) + using (var dstStore = GetDocumentStore()) + { + var config = SetupQueueEtlToAzureQueueStorageOnline(srcStore, + DefaultScript, DefaultCollections, + new List() { new() { Name = "Orders", DeleteProcessedDocuments = true } }, + connectionString: ConnectionString); + + var exportFile = GetTempFileName(); + + var exportOperation = await srcStore.Smuggler.ExportAsync(new DatabaseSmugglerExportOptions(), exportFile); + await exportOperation.WaitForCompletionAsync(TimeSpan.FromMinutes(1)); + + 
var operation = await dstStore.Smuggler.ImportAsync(new DatabaseSmugglerImportOptions(), exportFile); + await operation.WaitForCompletionAsync(TimeSpan.FromMinutes(1)); + + var destinationRecord = + await dstStore.Maintenance.Server.SendAsync(new GetDatabaseRecordOperation(dstStore.Database)); + Assert.Equal(1, destinationRecord.QueueConnectionStrings.Count); + Assert.Equal(1, destinationRecord.QueueEtls.Count); + + Assert.Equal(QueueBrokerType.AzureQueueStorage, destinationRecord.QueueEtls[0].BrokerType); + Assert.Equal(DefaultScript, destinationRecord.QueueEtls[0].Transforms[0].Script); + Assert.Equal(DefaultCollections, destinationRecord.QueueEtls[0].Transforms[0].Collections); + + Assert.Equal(1, destinationRecord.QueueEtls[0].Queues.Count); + Assert.Equal("Orders", destinationRecord.QueueEtls[0].Queues[0].Name); + Assert.True(destinationRecord.QueueEtls[0].Queues[0].DeleteProcessedDocuments); } } + /* Exports a database that has one queue ETL and imports it into a sharded database; the destination record must end up with no queue ETLs. NOTE(review): the RabbitMqEtl suffix does not match the setup, which calls SetupQueueEtlToAzureQueueStorageOnline - likely a copy-paste leftover; confirm before renaming. */ [RavenFact(RavenTestCategory.BackupExportImport | RavenTestCategory.Sharding | RavenTestCategory.Etl)] + public async Task ShouldSkipUnsupportedFeaturesInShardingOnImport_RabbitMqEtl() + { + using (var srcStore = GetDocumentStore()) + using (var dstStore = Sharding.GetDocumentStore()) + { + var config = SetupQueueEtlToAzureQueueStorageOnline(srcStore, + DefaultScript, DefaultCollections, new List() + { + new() + { + Name = "Orders", + DeleteProcessedDocuments = true + } + }, connectionString: ConnectionString); + + var record = await srcStore.Maintenance.Server.SendAsync(new GetDatabaseRecordOperation(srcStore.Database)); + + Assert.NotNull(record.QueueEtls); + Assert.Equal(1, record.QueueEtls.Count); + + var exportFile = GetTempFileName(); + + var exportOperation = await srcStore.Smuggler.ExportAsync(new DatabaseSmugglerExportOptions(), exportFile); + await exportOperation.WaitForCompletionAsync(TimeSpan.FromMinutes(1)); + + var operation = await dstStore.Smuggler.ImportAsync(new DatabaseSmugglerImportOptions(), exportFile); + await 
operation.WaitForCompletionAsync(TimeSpan.FromMinutes(1)); + + record = await dstStore.Maintenance.Server.SendAsync(new GetDatabaseRecordOperation(dstStore.Database)); + + Assert.Empty(record.QueueEtls); + } + } + private class Order { public string Id { get; set; } @@ -56,4 +430,36 @@ private class OrderLine public int Quantity { get; set; } public int Cost { get; set; } } + + /* Document type stored by ShouldDeleteDocumentsAfterProcessing. */ private class User + { + public string Id { get; set; } + public string Name { get; set; } + } + + /* NOTE(review): not referenced anywhere in the visible part of this file - confirm it is needed. */ private class Person + { + public string Id { get; set; } + public string Name { get; set; } + } + + /* Shape of the Data payload carried by CloudEventUserData. */ private class UserData + { + public string UserId { get; set; } + public string Name { get; set; } + } + + /* Envelope fields read by the order-message assertions: Id, Specversion, Type, Source and the Data payload. */ private class CloudEventOrderData + { + public string Id { get; set; } + public string Specversion { get; set; } + public string Type { get; set; } + public string Source { get; set; } + public OrderData Data { get; set; } + } + + /* Envelope exposing only the Data payload, typed as UserData. */ private class CloudEventUserData + { + public UserData Data { get; set; } + } }