diff --git a/src/Raven.Client/Documents/Operations/ETL/Queue/AzureQueueStorageConnectionSettings.cs b/src/Raven.Client/Documents/Operations/ETL/Queue/AzureQueueStorageConnectionSettings.cs new file mode 100644 index 000000000000..64be25aa693f --- /dev/null +++ b/src/Raven.Client/Documents/Operations/ETL/Queue/AzureQueueStorageConnectionSettings.cs @@ -0,0 +1,86 @@ +using System; +using System.Linq; +using Sparrow.Json.Parsing; + +namespace Raven.Client.Documents.Operations.ETL.Queue; + +public sealed class AzureQueueStorageConnectionSettings +{ + public Authentication Authentication; + + public string GetStorageUrl() + { + string storageAccountName = GetStorageAccountName(); + return $"https://{storageAccountName}.queue.core.windows.net/"; + } + + public string GetStorageAccountName() + { + string storageAccountName = ""; + + if (Authentication.ConnectionString != null) + { + var accountNamePart = Authentication.ConnectionString.Split(';') + .FirstOrDefault(part => part.StartsWith("AccountName=", StringComparison.OrdinalIgnoreCase)); + + if (accountNamePart == null) + { + throw new ArgumentException("Storage account name not found in the connection string.", + nameof(Authentication.ConnectionString)); + } + + storageAccountName = accountNamePart.Substring("AccountName=".Length); + } + else if (Authentication.EntraId != null) + { + storageAccountName = Authentication.EntraId.StorageAccountName; + } + + return storageAccountName; + } + + public DynamicJsonValue ToJson() + { + var json = new DynamicJsonValue + { + [nameof(Authentication)] = Authentication == null + ? null + : new DynamicJsonValue + { + [nameof(Authentication.ConnectionString)] = Authentication.ConnectionString, + [nameof(Authentication.Passwordless)] = Authentication.Passwordless, + [nameof(Authentication.EntraId)] = + Authentication.EntraId == null + ? null + : new DynamicJsonValue + { + [nameof(Authentication.EntraId.StorageAccountName)] = + Authentication?.EntraId?.StorageAccountName, + [nameof(Authentication.EntraId.TenantId)] = + Authentication?.EntraId?.TenantId, + [nameof(Authentication.EntraId.ClientId)] = + Authentication?.EntraId?.ClientId, + [nameof(Authentication.EntraId.ClientSecret)] = + Authentication?.EntraId?.ClientSecret + } + } + }; + + return json; + } +} + +public sealed class Authentication +{ + public EntraId EntraId { get; set; } + public string ConnectionString { get; set; } + public bool Passwordless { get; set; } +} + +public sealed class EntraId +{ + public string StorageAccountName { get; set; } + public string TenantId { get; set; } + public string ClientId { get; set; } + public string ClientSecret { get; set; } +} diff --git a/src/Raven.Client/Documents/Operations/ETL/Queue/QueueBrokerType.cs b/src/Raven.Client/Documents/Operations/ETL/Queue/QueueBrokerType.cs index e3abe64df8df..ec72741e8287 100644 --- a/src/Raven.Client/Documents/Operations/ETL/Queue/QueueBrokerType.cs +++ b/src/Raven.Client/Documents/Operations/ETL/Queue/QueueBrokerType.cs @@ -4,5 +4,6 @@ public enum QueueBrokerType { None, Kafka, - RabbitMq + RabbitMq, + AzureQueueStorage } diff --git a/src/Raven.Client/Documents/Operations/ETL/Queue/QueueConnectionString.cs b/src/Raven.Client/Documents/Operations/ETL/Queue/QueueConnectionString.cs index 0df950a6df71..93f4a33ac51e 100644 --- a/src/Raven.Client/Documents/Operations/ETL/Queue/QueueConnectionString.cs +++ b/src/Raven.Client/Documents/Operations/ETL/Queue/QueueConnectionString.cs @@ -12,6 +12,8 @@ public sealed class QueueConnectionString : ConnectionString public KafkaConnectionSettings KafkaConnectionSettings { get; set; } public RabbitMqConnectionSettings RabbitMqConnectionSettings { get; set; } + + public AzureQueueStorageConnectionSettings AzureQueueStorageConnectionSettings { get; set; } public override ConnectionStringType Type => ConnectionStringType.Queue; @@ -31,6 +33,12 @@ protected override void ValidateImpl(ref List errors) errors.Add($"{nameof(RabbitMqConnectionSettings)} has no valid setting."); } break; + case QueueBrokerType.AzureQueueStorage: + if (AzureQueueStorageConnectionSettings?.Authentication == null) + { + errors.Add($"{nameof(AzureQueueStorageConnectionSettings)} has no valid setting."); + } + break; default: throw new NotSupportedException($"'{BrokerType}' broker is not supported"); } @@ -52,6 +60,9 @@ internal string GetUrl() url = indexOfStartServerUri != -1 ? connectionString.Substring(indexOfStartServerUri + 1) : null; break; + case QueueBrokerType.AzureQueueStorage: + url = AzureQueueStorageConnectionSettings.GetStorageUrl(); + break; default: throw new NotSupportedException($"'{BrokerType}' broker is not supported"); } @@ -66,6 +77,7 @@ public override DynamicJsonValue ToJson() json[nameof(BrokerType)] = BrokerType; json[nameof(KafkaConnectionSettings)] = KafkaConnectionSettings?.ToJson(); json[nameof(RabbitMqConnectionSettings)] = RabbitMqConnectionSettings?.ToJson(); + json[nameof(AzureQueueStorageConnectionSettings)] = AzureQueueStorageConnectionSettings?.ToJson(); return json; } diff --git a/src/Raven.Client/Documents/Operations/ETL/Queue/QueueEtlConfiguration.cs b/src/Raven.Client/Documents/Operations/ETL/Queue/QueueEtlConfiguration.cs index 57f1cce49deb..1dbcc42b1415 100644 --- a/src/Raven.Client/Documents/Operations/ETL/Queue/QueueEtlConfiguration.cs +++ b/src/Raven.Client/Documents/Operations/ETL/Queue/QueueEtlConfiguration.cs @@ -47,7 +47,11 @@ public override bool UsingEncryptedCommunicationChannel() } break; case QueueBrokerType.RabbitMq: - return Connection.RabbitMqConnectionSettings.ConnectionString.StartsWith("amqps", StringComparison.OrdinalIgnoreCase); + return Connection.RabbitMqConnectionSettings.ConnectionString.StartsWith("amqps", + StringComparison.OrdinalIgnoreCase); + case QueueBrokerType.AzureQueueStorage: + return Connection.AzureQueueStorageConnectionSettings.GetStorageUrl() + .StartsWith("https", StringComparison.OrdinalIgnoreCase); default: throw new NotSupportedException($"Unknown broker type: {BrokerType}"); } diff --git a/src/Raven.Server/Commercial/LicenseStatus.cs b/src/Raven.Server/Commercial/LicenseStatus.cs index 876ad8c533a5..653181381f89 100644 --- a/src/Raven.Server/Commercial/LicenseStatus.cs +++ b/src/Raven.Server/Commercial/LicenseStatus.cs @@ -216,7 +216,9 @@ public bool CanAutoRenewLetsEncryptCertificate public bool HasElasticSearchEtl => Enabled(LicenseAttribute.ElasticSearchEtl); - public bool HasQueueEtl => Enabled(LicenseAttribute.QueueEtl); + //public bool HasQueueEtl => Enabled(LicenseAttribute.QueueEtl); + public bool HasQueueEtl => true; + public bool HasPowerBI => Enabled(LicenseAttribute.PowerBI); diff --git a/src/Raven.Server/Config/Categories/EtlConfiguration.cs b/src/Raven.Server/Config/Categories/EtlConfiguration.cs index 122b5497e710..bec32b3946de 100644 --- a/src/Raven.Server/Config/Categories/EtlConfiguration.cs +++ b/src/Raven.Server/Config/Categories/EtlConfiguration.cs @@ -52,5 +52,17 @@ public sealed class EtlConfiguration : ConfigurationCategory [TimeUnit(TimeUnit.Seconds)] [ConfigurationEntry("ETL.Queue.Kafka.InitTransactionsTimeoutInSec", ConfigurationEntryScope.ServerWideOrPerDatabase)] public TimeSetting KafkaInitTransactionsTimeout { get; set; } + + [Description("Lifespan of a message in the queue")] + [DefaultValue(604800)] + [TimeUnit(TimeUnit.Seconds)] + [ConfigurationEntry("ETL.Queue.AzureQueueStorage.TimeToLiveInSec", ConfigurationEntryScope.ServerWideOrPerDatabase)] + public TimeSetting AzureQueueStorageTimeToLive{ get; set; } + + [Description("How long a message is hidden after being retrieved but not deleted")] + [DefaultValue(0)] + [TimeUnit(TimeUnit.Seconds)] + [ConfigurationEntry("ETL.Queue.AzureQueueStorage.VisibilityTimeoutInSec", ConfigurationEntryScope.ServerWideOrPerDatabase)] + public TimeSetting AzureQueueStorageVisibilityTimeout{ get; set; } } } diff --git a/src/Raven.Server/Dashboard/DatabasesInfoRetriever.cs b/src/Raven.Server/Dashboard/DatabasesInfoRetriever.cs index 8ee48c5822f4..6d8b95baab9b 100644 --- a/src/Raven.Server/Dashboard/DatabasesInfoRetriever.cs +++ b/src/Raven.Server/Dashboard/DatabasesInfoRetriever.cs @@ -349,6 +349,10 @@ private static DatabaseOngoingTasksInfoItem GetOngoingTasksInfoItem(DocumentData long rabbitMqEtlCountOnNode = GetTaskCountOnNode(database, dbRecord, serverStore, database.EtlLoader.QueueDestinations, task => EtlLoader.GetProcessState(task.Transforms, database, task.Name), task => task.BrokerType == QueueBrokerType.RabbitMq); + var azureQueueStorageEtlCount = database.EtlLoader.GetQueueDestinationCountByBroker(QueueBrokerType.AzureQueueStorage); + long azureQueueStorageEtlCountOnNode = GetTaskCountOnNode(database, dbRecord, serverStore, database.EtlLoader.QueueDestinations, + task => EtlLoader.GetProcessState(task.Transforms, database, task.Name), task => task.BrokerType == QueueBrokerType.AzureQueueStorage); + var periodicBackupCount = database.PeriodicBackupRunner.PeriodicBackups.Count; long periodicBackupCountOnNode = BackupUtils.GetTasksCountOnNode(serverStore, database.Name, context); @@ -364,8 +368,8 @@ private static DatabaseOngoingTasksInfoItem GetOngoingTasksInfoItem(DocumentData task => QueueSinkLoader.GetProcessState(task.Scripts, database, task.Name), task => task.BrokerType == QueueBrokerType.RabbitMq); ongoingTasksCount = extRepCount + replicationHubCount + replicationSinkCount + - ravenEtlCount + sqlEtlCount + elasticSearchEtlCount + olapEtlCount + kafkaEtlCount + rabbitMqEtlCount + - periodicBackupCount + subscriptionCount + + ravenEtlCount + sqlEtlCount + elasticSearchEtlCount + olapEtlCount + kafkaEtlCount + + rabbitMqEtlCount + azureQueueStorageEtlCount + periodicBackupCount + subscriptionCount + kafkaSinkCount + rabbitMqSinkCount; return new DatabaseOngoingTasksInfoItem @@ -380,6 +384,7 @@ private static DatabaseOngoingTasksInfoItem GetOngoingTasksInfoItem(DocumentData OlapEtlCount = olapEtlCountOnNode, KafkaEtlCount = kafkaEtlCountOnNode, RabbitMqEtlCount = rabbitMqEtlCountOnNode, + AzureQueueStorageEtlCount = azureQueueStorageEtlCountOnNode, PeriodicBackupCount = periodicBackupCountOnNode, SubscriptionCount = subscriptionCountOnNode, KafkaSinkCount = kafkaSinkCountOnNode, diff --git a/src/Raven.Server/Dashboard/DatabasesOngoingTasksInfo.cs b/src/Raven.Server/Dashboard/DatabasesOngoingTasksInfo.cs index 1e797fc118cb..ada479948f07 100644 --- a/src/Raven.Server/Dashboard/DatabasesOngoingTasksInfo.cs +++ b/src/Raven.Server/Dashboard/DatabasesOngoingTasksInfo.cs @@ -34,6 +34,8 @@ public sealed class DatabaseOngoingTasksInfoItem : IDynamicJson public long KafkaEtlCount { get; set; } public long RabbitMqEtlCount { get; set; } + + public long AzureQueueStorageEtlCount { get; set; } public long PeriodicBackupCount { get; set; } @@ -57,6 +59,7 @@ public DynamicJsonValue ToJson() [nameof(OlapEtlCount)] = OlapEtlCount, [nameof(KafkaEtlCount)] = KafkaEtlCount, [nameof(RabbitMqEtlCount)] = RabbitMqEtlCount, + [nameof(AzureQueueStorageEtlCount)] = AzureQueueStorageEtlCount, [nameof(PeriodicBackupCount)] = PeriodicBackupCount, [nameof(SubscriptionCount)] = SubscriptionCount, [nameof(KafkaSinkCount)] = KafkaSinkCount, diff --git a/src/Raven.Server/Documents/ETL/EtlLoader.cs b/src/Raven.Server/Documents/ETL/EtlLoader.cs index bf578541be3d..2dc1ddf252fe 100644 --- a/src/Raven.Server/Documents/ETL/EtlLoader.cs +++ b/src/Raven.Server/Documents/ETL/EtlLoader.cs @@ -18,6 +18,7 @@ using Raven.Server.Documents.ETL.Providers.ElasticSearch; using Raven.Server.Documents.ETL.Providers.OLAP; using Raven.Server.Documents.ETL.Providers.Queue; +using Raven.Server.Documents.ETL.Providers.Queue.AzureQueueStorage; using Raven.Server.Documents.ETL.Providers.Queue.Kafka; using Raven.Server.Documents.ETL.Providers.Queue.RabbitMq; using Raven.Server.Documents.ETL.Providers.Raven; @@ -646,6 +647,29 @@ public void HandleDatabaseRecordChange(DatabaseRecord record) break; } + case AzureQueueStorageEtl azureQueueStorageEtl: + { + QueueEtlConfiguration existing = null; + + foreach (var config in myQueueEtl) + { + var diff = azureQueueStorageEtl.Configuration.Compare(config); + + if (diff == EtlConfigurationCompareDifferences.None) + { + existing = config; + break; + } + } + + if (existing != null) + { + toRemove.Remove(processesPerConfig.Key); + myQueueEtl.Remove(existing); + } + + break; + } case ElasticSearchEtl elasticSearchEtl: { ElasticSearchEtlConfiguration existing = null; @@ -794,6 +818,13 @@ private static string GetStopReason( if (existing != null) differences = rabbitMqEtl.Configuration.Compare(existing, transformationDiffs); } + else if (process is AzureQueueStorageEtl azureQueueStorageEtl) + { + var existing = myQueueEtl.FirstOrDefault(x => x.Name.Equals(azureQueueStorageEtl.ConfigurationName, StringComparison.OrdinalIgnoreCase)); + + if (existing != null) + differences = azureQueueStorageEtl.Configuration.Compare(existing, transformationDiffs); + } else { throw new InvalidOperationException($"Unknown ETL process type: " + process.GetType().FullName); diff --git a/src/Raven.Server/Documents/ETL/EtlProcess.cs b/src/Raven.Server/Documents/ETL/EtlProcess.cs index 8b2185a92510..7acaa8a40c48 100644 --- a/src/Raven.Server/Documents/ETL/EtlProcess.cs +++ b/src/Raven.Server/Documents/ETL/EtlProcess.cs @@ -23,6 +23,7 @@ using Raven.Server.Documents.ETL.Providers.OLAP; using Raven.Server.Documents.ETL.Providers.OLAP.Test; using Raven.Server.Documents.ETL.Providers.Queue; +using Raven.Server.Documents.ETL.Providers.Queue.AzureQueueStorage; using Raven.Server.Documents.ETL.Providers.Queue.Kafka; using Raven.Server.Documents.ETL.Providers.Queue.RabbitMq; using Raven.Server.Documents.ETL.Providers.Raven; @@ -1328,6 +1329,21 @@ public static IDisposable TestScript( result = rabbitMqEtl.RunTest(results, context); result.DebugOutput = debugOutput; + return tx; + } + case AzureQueueStorageEtl azureQueueStorageEtl: + using (azureQueueStorageEtl.EnterTestMode(out debugOutput)) + { + azureQueueStorageEtl.EnsureThreadAllocationStats(); + + var queueItem = new QueueItem(document, docCollection); + + var results = azureQueueStorageEtl.Transform(new[] { queueItem }, context, new EtlStatsScope(new EtlRunStats()), + new EtlProcessState()); + + result = azureQueueStorageEtl.RunTest(results, context); + result.DebugOutput = debugOutput; + return tx; } default: diff --git a/src/Raven.Server/Documents/ETL/Handlers/Processors/EtlHandlerProcessorForProgress.cs b/src/Raven.Server/Documents/ETL/Handlers/Processors/EtlHandlerProcessorForProgress.cs index da9859c8c574..b48e15636a29 100644 --- a/src/Raven.Server/Documents/ETL/Handlers/Processors/EtlHandlerProcessorForProgress.cs +++ b/src/Raven.Server/Documents/ETL/Handlers/Processors/EtlHandlerProcessorForProgress.cs @@ -2,6 +2,7 @@ using System.Threading.Tasks; using JetBrains.Annotations; using Raven.Client.Documents.Operations.ETL.Queue; +using Raven.Server.Documents.ETL.Providers.Queue.AzureQueueStorage; using Raven.Server.Documents.ETL.Providers.Queue.Kafka; using Raven.Server.Documents.ETL.Providers.Queue.RabbitMq; using Raven.Server.Documents.ETL.Stats; @@ -37,6 +38,7 @@ protected override async ValueTask HandleCurrentNodeAsync() { RabbitMqEtl => QueueBrokerType.RabbitMq, KafkaEtl => QueueBrokerType.Kafka, + AzureQueueStorageEtl => QueueBrokerType.AzureQueueStorage, _ => null } }).ToArray(); diff --git a/src/Raven.Server/Documents/ETL/Providers/Queue/AzureQueueStorage/AzureQueueStorageDocumentTransformer.cs b/src/Raven.Server/Documents/ETL/Providers/Queue/AzureQueueStorage/AzureQueueStorageDocumentTransformer.cs new file mode 100644 index 000000000000..2ab6b2380fbb --- /dev/null +++ b/src/Raven.Server/Documents/ETL/Providers/Queue/AzureQueueStorage/AzureQueueStorageDocumentTransformer.cs @@ -0,0 +1,106 @@ +using Jint; +using Jint.Native; +using Jint.Native.Object; +using Jint.Runtime.Interop; +using Raven.Client.Documents.Operations.ETL; +using Raven.Client.Documents.Operations.ETL.Queue; +using Raven.Server.Documents.Patch; +using Raven.Server.ServerWide.Context; + +namespace Raven.Server.Documents.ETL.Providers.Queue.AzureQueueStorage; + +public sealed class AzureQueueStorageDocumentTransformer : QueueDocumentTransformer + where T : QueueItem +{ + public AzureQueueStorageDocumentTransformer(Transformation transformation, DocumentDatabase database, + DocumentsOperationContext context, QueueEtlConfiguration config) : base(transformation, database, context, + config) + { + } + + protected override void LoadToFunction(string queueName, ScriptRunnerResult document) + { + LoadToFunction(queueName, document, null); + } + + private void LoadToFunction(string queueName, ScriptRunnerResult document, CloudEventAttributes attributes) + { + if (queueName == null) + ThrowLoadParameterIsMandatory(nameof(queueName)); + + var result = document.TranslateToObject(Context); + + var topic = GetOrAdd(queueName); + + topic.Items.Add(new AzureQueueStorageItem(Current) { TransformationResult = result, Attributes = attributes }); + } + + public override void Initialize(bool debugMode) + { + base.Initialize(debugMode); + + DocumentScript.ScriptEngine.SetValue(Transformation.LoadTo, + new ClrFunction(DocumentScript.ScriptEngine, Transformation.LoadTo, + LoadToFunctionTranslatorWithAttributes)); + + foreach (var queueName in LoadToDestinations) + { + var name = Transformation.LoadTo + queueName; + + DocumentScript.ScriptEngine.SetValue(name, new ClrFunction(DocumentScript.ScriptEngine, name, + (self, args) => LoadToFunctionTranslatorWithAttributes(queueName, args))); + } + } + + private JsValue LoadToFunctionTranslatorWithAttributes(JsValue self, JsValue[] args) + { + var methodSignature = "loadTo(name, obj, attributes)"; + + if (args.Length != 2 && args.Length != 3) + ThrowInvalidScriptMethodCall($"{methodSignature} must be called with 2 or 3 parameters"); + + if (args[0].IsString() == false) + ThrowInvalidScriptMethodCall($"{methodSignature} first argument must be a string"); + + if (args[1].IsObject() == false) + ThrowInvalidScriptMethodCall($"{methodSignature} second argument must be an object"); + + if (args.Length == 3 && args[2].IsObject() == false) + ThrowInvalidScriptMethodCall($"{methodSignature} third argument must be an object"); + + return LoadToFunctionTranslatorWithAttributesInternal(args[0].AsString(), args[1].AsObject(), + args.Length == 3 ? args[2].AsObject() : null); + } + + private JsValue LoadToFunctionTranslatorWithAttributes(string name, JsValue[] args) + { + var methodSignature = $"loadTo{name}(obj, attributes)"; + + if (args.Length != 1 && args.Length != 2) + ThrowInvalidScriptMethodCall($"{methodSignature} must be called with with 1 or 2 parameters"); + + if (args[0].IsObject() == false) + ThrowInvalidScriptMethodCall($"{methodSignature} argument 'obj' must be an object"); + + if (args.Length == 2 && args[1].IsObject() == false) + ThrowInvalidScriptMethodCall($"{methodSignature} argument 'attributes' must be an object"); + + return LoadToFunctionTranslatorWithAttributesInternal(name, args[0].AsObject(), + args.Length == 2 ? args[1].AsObject() : null); + } + + private JsValue LoadToFunctionTranslatorWithAttributesInternal(string name, ObjectInstance obj, + ObjectInstance attributes) + { + var result = new ScriptRunnerResult(DocumentScript, obj); + + CloudEventAttributes cloudEventAttributes = null; + + if (attributes != null) + cloudEventAttributes = GetCloudEventAttributes(attributes); + + LoadToFunction(name, result, cloudEventAttributes); + + return result.Instance; + } +} diff --git a/src/Raven.Server/Documents/ETL/Providers/Queue/AzureQueueStorage/AzureQueueStorageEtl.cs b/src/Raven.Server/Documents/ETL/Providers/Queue/AzureQueueStorage/AzureQueueStorageEtl.cs new file mode 100644 index 000000000000..3ee3daba3bf9 --- /dev/null +++ b/src/Raven.Server/Documents/ETL/Providers/Queue/AzureQueueStorage/AzureQueueStorageEtl.cs @@ -0,0 +1,165 @@ +using System; +using System.Collections.Generic; +using System.Text.Json; +using System.Text.Json.Serialization; +using Azure.Storage.Queues; +using CloudNative.CloudEvents; +using CloudNative.CloudEvents.Extensions; +using Raven.Client.Documents.Operations.ETL; +using Raven.Client.Documents.Operations.ETL.Queue; +using Raven.Server.Documents.ETL.Stats; +using Raven.Server.Exceptions.ETL.QueueEtl; +using Raven.Server.ServerWide; +using Raven.Server.ServerWide.Context; +using Sparrow.Json; + +namespace Raven.Server.Documents.ETL.Providers.Queue.AzureQueueStorage; + +public sealed class AzureQueueStorageEtl : QueueEtl +{ + private readonly HashSet _alreadyCreatedQueues = new(); + private readonly Dictionary _queueClients = new(); + + public AzureQueueStorageEtl(Transformation transformation, QueueEtlConfiguration configuration, + DocumentDatabase database, ServerStore serverStore) : base(transformation, configuration, database, serverStore) + { + } + + protected override + EtlTransformer, EtlStatsScope, EtlPerformanceOperation> + GetTransformer(DocumentsOperationContext context) + { + return new AzureQueueStorageDocumentTransformer(Transformation, Database, context, + Configuration); + } + + protected override int PublishMessages(List> itemsPerQueue, + BlittableJsonEventBinaryFormatter formatter, out List idsToDelete) + { + if (itemsPerQueue.Count == 0) + { + idsToDelete = null; + return 0; + } + + idsToDelete = []; + int count = 0; + + foreach (QueueWithItems queue in itemsPerQueue) + { + string queueName = queue.Name.ToLower(); + + if (_queueClients.TryGetValue(queueName, out QueueClient queueClient) == false) + { + queueClient = + QueueBrokerConnectionHelper.CreateAzureQueueStorageClient( + Configuration.Connection.AzureQueueStorageConnectionSettings, queueName); + _queueClients.Add(queueName, queueClient); + } + + if (Configuration.SkipAutomaticQueueDeclaration == false) + CreateQueueIfNotExists(queueClient); + + foreach (AzureQueueStorageItem queueItem in queue.Items) + { + string base64CloudEvent = CreateBase64CloudEvent(queueItem); + + TimeSpan? timeToLive = Database.Configuration.Etl.AzureQueueStorageTimeToLive.AsTimeSpan; + TimeSpan? visibilityTimeout = Database.Configuration.Etl.AzureQueueStorageVisibilityTimeout.AsTimeSpan; + + try + { + queueClient.SendMessage(base64CloudEvent, visibilityTimeout, timeToLive); + count++; + + if (queue.DeleteProcessedDocuments) + idsToDelete.Add(queueItem.DocumentId); + } + catch (Azure.RequestFailedException ex) + { + throw new QueueLoadException( + $"Failed to deliver message, Azure error code: '{ex.ErrorCode}', error reason: '{ex.Message}' for document with id: '{queueItem.DocumentId}'", + ex); + } + catch (Exception ex) + { + throw new QueueLoadException($"Failed to deliver message, error reason: '{ex.Message}'", ex); + } + } + } + + return count; + } + + private string CreateBase64CloudEvent(AzureQueueStorageItem queueItem) + { + var cloudEvent = CreateCloudEvent(queueItem); + var options = new JsonSerializerOptions { Converters = { CloudEventConverter.Instance } }; + byte[] cloudEventBytes = JsonSerializer.SerializeToUtf8Bytes(cloudEvent, options); + return Convert.ToBase64String(cloudEventBytes); + } + + protected override void OnProcessStopped() + { + _queueClients.Clear(); + } + + private void CreateQueueIfNotExists(QueueClient queueClient) + { + if (_alreadyCreatedQueues.Contains(queueClient.Name)) return; + try + { + queueClient.CreateIfNotExists(); + _alreadyCreatedQueues.Add(queueClient.Name); + } + catch (Azure.RequestFailedException ex) + { + throw new QueueLoadException( + $"Failed to deliver message, Azure error code: '{ex.ErrorCode}', error reason: '{ex.Message}'", ex); + } + } + + private sealed class CloudEventConverter : JsonConverter + { + public static readonly CloudEventConverter Instance = new CloudEventConverter(); + + const string SpecVersionAttributeName = "specversion"; + + private CloudEventConverter() + { + } + + public override void Write(Utf8JsonWriter writer, CloudEvent cloudEvent, JsonSerializerOptions options) + { + writer.WriteStartObject(); + + writer.WritePropertyName(SpecVersionAttributeName); + writer.WriteStringValue(cloudEvent.SpecVersion.VersionId); + + foreach (var pair in cloudEvent.GetPopulatedAttributes()) + { + var attribute = pair.Key; + if (attribute == cloudEvent.SpecVersion.DataContentTypeAttribute || + attribute.Name == Partitioning.PartitionKeyAttribute.Name) + { + continue; + } + + var value = attribute.Format(pair.Value); + + writer.WritePropertyName(attribute.Name); + writer.WriteStringValue(value); + } + + writer.WritePropertyName("data"); + writer.WriteRawValue(((BlittableJsonReaderObject)cloudEvent.Data).ToString()); + + writer.WriteEndObject(); + } + + public override CloudEvent Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) + { + throw new NotImplementedException(); + } + } +} diff --git a/src/Raven.Server/Documents/ETL/Providers/Queue/AzureQueueStorage/AzureQueueStorageItem.cs b/src/Raven.Server/Documents/ETL/Providers/Queue/AzureQueueStorage/AzureQueueStorageItem.cs new file mode 100644 index 000000000000..eb424aef2160 --- /dev/null +++ b/src/Raven.Server/Documents/ETL/Providers/Queue/AzureQueueStorage/AzureQueueStorageItem.cs @@ -0,0 +1,8 @@ +namespace Raven.Server.Documents.ETL.Providers.Queue.AzureQueueStorage; + +public sealed class AzureQueueStorageItem : QueueItem +{ + public AzureQueueStorageItem(QueueItem item) : base(item) + { + } +} diff --git a/src/Raven.Server/Documents/ETL/Providers/Queue/Handlers/Processors/QueueEtlHandlerProcessorForTestAzureQueueStorageConnection.cs b/src/Raven.Server/Documents/ETL/Providers/Queue/Handlers/Processors/QueueEtlHandlerProcessorForTestAzureQueueStorageConnection.cs new file mode 100644 index 000000000000..5cabcd10a873 --- /dev/null +++ b/src/Raven.Server/Documents/ETL/Providers/Queue/Handlers/Processors/QueueEtlHandlerProcessorForTestAzureQueueStorageConnection.cs @@ -0,0 +1,64 @@ +using System; +using System.IO; +using System.Threading.Tasks; +using Azure.Storage.Queues; +using JetBrains.Annotations; +using Newtonsoft.Json; +using Raven.Client.Documents.Operations.ETL.Queue; +using Raven.Server.Documents.Handlers.Processors; +using Raven.Server.Web.System; +using Sparrow.Json; +using Sparrow.Json.Parsing; + +namespace Raven.Server.Documents.ETL.Providers.Queue.Handlers.Processors; + +internal sealed class + QueueEtlHandlerProcessorForTestAzureQueueStorageConnection : + AbstractDatabaseHandlerProcessor + where TOperationContext : JsonOperationContext + where TRequestHandler : AbstractDatabaseRequestHandler +{ + public QueueEtlHandlerProcessorForTestAzureQueueStorageConnection([NotNull] TRequestHandler requestHandler) : + base(requestHandler) + { + } + + public override async ValueTask ExecuteAsync() + { + try + { + string authenticationJson = await new StreamReader(HttpContext.Request.Body).ReadToEndAsync(); + Authentication authentication = JsonConvert.DeserializeObject(authenticationJson); + + var connectionSettings = new AzureQueueStorageConnectionSettings() { Authentication = authentication }; + + QueueServiceClient client = + QueueBrokerConnectionHelper.CreateAzureQueueStorageServiceClient(connectionSettings); + + await client.GetStatisticsAsync(); + + DynamicJsonValue result = new() { [nameof(NodeConnectionTestResult.Success)] = true, }; + using (ServerStore.ContextPool.AllocateOperationContext(out JsonOperationContext context)) + await using (AsyncBlittableJsonTextWriter writer = new(context, RequestHandler.ResponseBodyStream())) + { + context.Write(writer, result); + } + } + catch (Exception ex) + { + using (ServerStore.ContextPool.AllocateOperationContext(out JsonOperationContext context)) + { + await using (var writer = + new AsyncBlittableJsonTextWriter(context, RequestHandler.ResponseBodyStream())) + { + context.Write(writer, + new DynamicJsonValue + { + [nameof(NodeConnectionTestResult.Success)] = false, + [nameof(NodeConnectionTestResult.Error)] = ex.ToString() + }); + } + } + } + } +} diff --git a/src/Raven.Server/Documents/ETL/Providers/Queue/Handlers/QueueEtlConnectionHandler.cs b/src/Raven.Server/Documents/ETL/Providers/Queue/Handlers/QueueEtlConnectionHandler.cs index 5b86230320f5..994ce31c1245 100644 --- a/src/Raven.Server/Documents/ETL/Providers/Queue/Handlers/QueueEtlConnectionHandler.cs +++ b/src/Raven.Server/Documents/ETL/Providers/Queue/Handlers/QueueEtlConnectionHandler.cs @@ -20,5 +20,12 @@ public async Task GetTestRabbitMqConnectionResult() using (var processor = new QueueEtlHandlerProcessorForTestRabbitMqConnection(this)) await processor.ExecuteAsync(); } + + [RavenAction("/databases/*/admin/etl/queue/azurequeuestorage/test-connection", "POST", AuthorizationStatus.DatabaseAdmin)] + public async Task GetTestAzureQueueStorageConnectionResult() + { + using (var processor = new QueueEtlHandlerProcessorForTestAzureQueueStorageConnection(this)) + await processor.ExecuteAsync(); + } } } diff --git a/src/Raven.Server/Documents/ETL/Providers/Queue/QueueBrokerConnectionHelper.cs b/src/Raven.Server/Documents/ETL/Providers/Queue/QueueBrokerConnectionHelper.cs index bf8ef7c841b2..55a1e1eef95c 100644 --- a/src/Raven.Server/Documents/ETL/Providers/Queue/QueueBrokerConnectionHelper.cs +++ b/src/Raven.Server/Documents/ETL/Providers/Queue/QueueBrokerConnectionHelper.cs @@ -1,7 +1,12 @@ using System; using System.Collections.Generic; using System.IO; +using Azure.Core.Pipeline; +using Azure.Identity; +using Azure.Storage.Queues; +using Azure.Storage.Queues.Models; using Confluent.Kafka; +using NetTopologySuite.IO; using Org.BouncyCastle.Utilities.IO.Pem; using RabbitMQ.Client; using Raven.Client.Documents.Operations.ETL.Queue; @@ -13,7 +18,8 @@ namespace Raven.Server.Documents.ETL.Providers.Queue; public static class QueueBrokerConnectionHelper { - public static IProducer CreateKafkaProducer(KafkaConnectionSettings settings, string transactionalId, Logger logger, string etlProcessName, + public static IProducer CreateKafkaProducer(KafkaConnectionSettings settings, + string transactionalId, Logger logger, string etlProcessName, CertificateUtils.CertificateHolder certificateHolder = null) { ProducerConfig config = new() @@ -25,7 +31,7 @@ public static IProducer CreateKafkaProducer(KafkaConnectionSetti }; SetupKafkaClientConfig(config, settings, certificateHolder); - + IProducer producer = new ProducerBuilder(config) .SetErrorHandler((producer, error) => { @@ -38,14 +44,16 @@ public static IProducer CreateKafkaProducer(KafkaConnectionSetti .SetLogHandler((producer, logMessage) => { if (logger.IsOperationsEnabled) - logger.Operations($"ETL process: {etlProcessName}. {logMessage.Message} (level: {logMessage.Level}, facility: {logMessage.Facility}"); + logger.Operations( + $"ETL process: {etlProcessName}. {logMessage.Message} (level: {logMessage.Level}, facility: {logMessage.Facility}"); }) .Build(); return producer; } - public static void SetupKafkaClientConfig(ClientConfig config, KafkaConnectionSettings settings, CertificateUtils.CertificateHolder certificateHolder = null) + public static void SetupKafkaClientConfig(ClientConfig config, KafkaConnectionSettings settings, + CertificateUtils.CertificateHolder certificateHolder = null) { if (settings.UseRavenCertificate && certificateHolder?.Certificate != null) { @@ -68,7 +76,7 @@ private static string ExportAsPem(object @object) using (var sw = new StringWriter()) { var pemWriter = new PemWriter(sw); - + pemWriter.WriteObject(@object); return sw.ToString(); @@ -80,4 +88,65 @@ public static IConnection CreateRabbitMqConnection(RabbitMqConnectionSettings se var connectionFactory = new ConnectionFactory { Uri = new Uri(settings.ConnectionString) }; return connectionFactory.CreateConnection(); } + + public static QueueClient CreateAzureQueueStorageClient( + AzureQueueStorageConnectionSettings azureQueueStorageConnectionSettings, string queueName) + { + QueueClient queueClient; + + if (azureQueueStorageConnectionSettings.Authentication.ConnectionString != null) + { + queueClient = new QueueClient(azureQueueStorageConnectionSettings.Authentication.ConnectionString, + queueName); + } + + else if (azureQueueStorageConnectionSettings.Authentication.EntraId != null) + { + var queueUri = new Uri($"{azureQueueStorageConnectionSettings.GetStorageUrl()}{queueName}"); + + queueClient = new QueueClient( + queueUri, + new ClientSecretCredential( + azureQueueStorageConnectionSettings.Authentication.EntraId.TenantId, + azureQueueStorageConnectionSettings.Authentication.EntraId.ClientId, + azureQueueStorageConnectionSettings.Authentication.EntraId.ClientSecret)); + } + else if(azureQueueStorageConnectionSettings.Authentication.Passwordless) + { + var queueUri = new Uri($"{azureQueueStorageConnectionSettings.GetStorageUrl()}{queueName}"); + queueClient = new QueueClient(queueUri, new DefaultAzureCredential()); + } + else + { + throw new NotSupportedException("Provided authentication method is not supported"); + } + + return queueClient; + } + + public static QueueServiceClient CreateAzureQueueStorageServiceClient( + AzureQueueStorageConnectionSettings azureQueueStorageConnectionSettings) + { + QueueServiceClient queueServiceClient = null; + + if (azureQueueStorageConnectionSettings.Authentication.ConnectionString != null) + { + queueServiceClient = + new QueueServiceClient(azureQueueStorageConnectionSettings.Authentication.ConnectionString); + } + + else if (azureQueueStorageConnectionSettings.Authentication.EntraId != null) + { + var queueUri = new Uri(azureQueueStorageConnectionSettings.GetStorageUrl()); + + queueServiceClient = new QueueServiceClient( + queueUri, + new ClientSecretCredential( + azureQueueStorageConnectionSettings.Authentication.EntraId.TenantId, + azureQueueStorageConnectionSettings.Authentication.EntraId.ClientId, + azureQueueStorageConnectionSettings.Authentication.EntraId.ClientSecret)); + } + + return queueServiceClient; + } } diff --git a/src/Raven.Server/Documents/ETL/Providers/Queue/QueueEtl.cs b/src/Raven.Server/Documents/ETL/Providers/Queue/QueueEtl.cs index 2fd1163f8fa9..a3f4b590f8eb 100644 --- a/src/Raven.Server/Documents/ETL/Providers/Queue/QueueEtl.cs +++ b/src/Raven.Server/Documents/ETL/Providers/Queue/QueueEtl.cs @@ -7,6 +7,7 @@ using Raven.Client.Documents.Operations.ETL; using Raven.Client.Documents.Operations.ETL.Queue; using Raven.Server.Documents.ETL.Metrics; +using Raven.Server.Documents.ETL.Providers.Queue.AzureQueueStorage; using Raven.Server.Documents.ETL.Providers.Queue.Enumerators; using Raven.Server.Documents.ETL.Providers.Queue.Kafka; using Raven.Server.Documents.ETL.Providers.Queue.RabbitMq; @@ -45,6 +46,8 @@ public static EtlProcess CreateInstance(Transformation transformation, QueueEtlC return new KafkaEtl(transformation, configuration, database, serverStore); case QueueBrokerType.RabbitMq: return new RabbitMqEtl(transformation, configuration, database, serverStore); + case QueueBrokerType.AzureQueueStorage: + return new AzureQueueStorageEtl(transformation, configuration, database, serverStore); default: throw new NotSupportedException($"Unknown broker type: {configuration.BrokerType}"); } diff --git a/src/Raven.Server/Documents/Sharding/Handlers/ShardedQueueEtlConnectionHandler.cs b/src/Raven.Server/Documents/Sharding/Handlers/ShardedQueueEtlConnectionHandler.cs index 27e6f5fb9bc2..a4ee96311e3e 100644 --- a/src/Raven.Server/Documents/Sharding/Handlers/ShardedQueueEtlConnectionHandler.cs +++ b/src/Raven.Server/Documents/Sharding/Handlers/ShardedQueueEtlConnectionHandler.cs @@ -20,5 +20,12 @@ public async Task GetTestRabbitMqConnectionResult() using (var processor = new QueueEtlHandlerProcessorForTestRabbitMqConnection(this)) await processor.ExecuteAsync(); } + + [RavenShardedAction("/databases/*/admin/etl/queue/azurequeuestorage/test-connection", "POST")] + public async Task GetTestAzureQueueStorageConnectionResult() + { + using (var processor = new QueueEtlHandlerProcessorForTestAzureQueueStorageConnection(this)) + await processor.ExecuteAsync(); + } } } diff --git a/src/Raven.Server/Raven.Server.csproj b/src/Raven.Server/Raven.Server.csproj index 117b94111259..a4557c9c403a 100644 --- a/src/Raven.Server/Raven.Server.csproj +++ b/src/Raven.Server/Raven.Server.csproj @@ -155,7 +155,12 @@ + + + + + diff --git a/test/SlowTests/Server/Documents/ETL/Queue/AzureQueueStorageEtlTestBase.cs b/test/SlowTests/Server/Documents/ETL/Queue/AzureQueueStorageEtlTestBase.cs new file mode 100644 index 000000000000..7527c46a7c3a --- /dev/null +++ b/test/SlowTests/Server/Documents/ETL/Queue/AzureQueueStorageEtlTestBase.cs @@ -0,0 +1,95 @@ +using System.Collections.Generic; +using System.Linq; +using Azure.Storage.Queues; +using Azure.Storage.Queues.Models; +using Raven.Client.Documents; +using Raven.Client.Documents.Operations.ETL; +using Raven.Client.Documents.Operations.ETL.Queue; +using Xunit.Abstractions; + +namespace SlowTests.Server.Documents.ETL.Queue; + +public class AzureQueueStorageEtlTestBase : QueueEtlTestBase +{ + public AzureQueueStorageEtlTestBase(ITestOutputHelper output) : base(output) + { + } + + protected string OrdersQueueName => "orders"; + + protected readonly string[] DefaultCollections = { "orders" }; + + protected readonly string ConnectionString = + "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;QueueEndpoint=http://127.0.0.1:10001/devstoreaccount1;"; + + protected List DefaultExchanges => new() { new EtlQueue { Name = OrdersQueueName } }; + + protected string DefaultScript => @" +var orderData = { + Id: id(this), + OrderLinesCount: this.OrderLines.length, + TotalCost: 0 +}; + +for (var i = 0; i < this.OrderLines.length; i++) { + var line = this.OrderLines[i]; + orderData.TotalCost += line.Cost*line.Quantity; +} +loadToOrders" + @"(orderData); +"; + + protected QueueEtlConfiguration SetupQueueEtlToAzureQueueStorageOnline(DocumentStore store, string script, + IEnumerable collections, IEnumerable queues = null, bool applyToAllDocuments = false, + string configurationName = null, + string transformationName = null, + Dictionary configuration = null, string connectionString = null, + bool skipAutomaticQueueDeclaration = false) + { + var connectionStringName = $"{store.Database}@{store.Urls.First()} to AzureQueueStorage"; + + Transformation transformation = new Transformation + { + Name = transformationName ?? $"ETL : {connectionStringName}", + Collections = new List(collections), + Script = script, + ApplyToAllDocuments = applyToAllDocuments + }; + var config = new QueueEtlConfiguration + { + Name = configurationName ?? connectionStringName, + ConnectionStringName = connectionStringName, + Transforms = { transformation }, + Queues = queues?.ToList(), + BrokerType = QueueBrokerType.AzureQueueStorage, + SkipAutomaticQueueDeclaration = skipAutomaticQueueDeclaration + }; + + Etl.AddEtl(store, config, + new QueueConnectionString + { + Name = connectionStringName, + BrokerType = QueueBrokerType.AzureQueueStorage, + AzureQueueStorageConnectionSettings = new AzureQueueStorageConnectionSettings + { + Authentication = new global::Raven.Client.Documents.Operations.ETL.Queue.Authentication + { + ConnectionString = ConnectionString + } + } + }); + return config; + } + + protected static QueueClient CreateAzureQueueStorageClient(string connectionString, string queueName) + { + return new QueueClient(connectionString, queueName, + new QueueClientOptions() { MessageEncoding = QueueMessageEncoding.Base64 }); + } + + protected static QueueMessage[] ReceiveAndDeleteMessages(QueueClient queueClient, int numberOfMessages = 1) + { + var messages = queueClient.ReceiveMessages(numberOfMessages); + messages.Value.ToList().ForEach(message => queueClient.DeleteMessage(message.MessageId, message.PopReceipt)); + return messages.Value.ToArray(); + } +} diff --git a/test/SlowTests/Server/Documents/ETL/Queue/AzureQueueStorageEtlTests.cs b/test/SlowTests/Server/Documents/ETL/Queue/AzureQueueStorageEtlTests.cs new file mode 100644 index 000000000000..dde64bcfe311 --- /dev/null +++ b/test/SlowTests/Server/Documents/ETL/Queue/AzureQueueStorageEtlTests.cs @@ -0,0 +1,465 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Azure.Storage.Queues; +using Azure.Storage.Queues.Models; +using Newtonsoft.Json; +using Raven.Client.Documents.Operations.ConnectionStrings; +using Raven.Client.Documents.Operations.ETL; +using Raven.Client.Documents.Operations.ETL.Queue; +using Raven.Client.Documents.Smuggler; +using Raven.Client.Exceptions.Sharding; +using Raven.Client.ServerWide.Operations; +using Raven.Server.Documents.ETL.Providers.Queue; +using Raven.Server.Documents.ETL.Providers.Queue.Test; +using Raven.Server.ServerWide.Context; +using Tests.Infrastructure; +using Xunit; +using Xunit.Abstractions; +using QueueItem = Raven.Server.Documents.ETL.Providers.Queue.QueueItem; + +namespace SlowTests.Server.Documents.ETL.Queue; + +public class AzureQueueStorageEtlTests : AzureQueueStorageEtlTestBase +{ + public AzureQueueStorageEtlTests(ITestOutputHelper output) : base(output) + { + } + + [Fact] + public void SimpleScript() + { + using (var store = GetDocumentStore()) + { + var config = SetupQueueEtlToAzureQueueStorageOnline(store, DefaultScript, DefaultCollections); + var etlDone = Etl.WaitForEtlToComplete(store); + + using (var session = store.OpenSession()) + { + session.Store(new Order + { + Id = "orders/1-A", + OrderLines = new List + { + new OrderLine { Cost = 3, Product = "Milk", Quantity = 2 }, + new OrderLine { Cost = 4, Product = "Bear", Quantity = 1 }, + } + }); + session.SaveChanges(); + } + + AssertEtlDone(etlDone, TimeSpan.FromMinutes(1), store.Database, config); + + QueueClient queueClient = CreateAzureQueueStorageClient(ConnectionString, OrdersQueueName); + var message = ReceiveAndDeleteMessages(queueClient).Single(); + var order = JsonConvert.DeserializeObject(message.MessageText).Data; + + Assert.NotNull(order); + Assert.Equal(order.Id, "orders/1-A"); + Assert.Equal(order.OrderLinesCount, 2); + Assert.Equal(order.TotalCost, 10); + } + } + + [Fact] + public void Error_if_script_does_not_contain_any_loadTo_method() + { + var config = new QueueEtlConfiguration + { + Name = "test", + ConnectionStringName = "test", + BrokerType = QueueBrokerType.RabbitMq, + Transforms = + { + new Transformation + { + Name = "test", Collections = { "Orders" }, Script = @"this.TotalCost = 10;" + } + } + }; + + config.Initialize(new QueueConnectionString + { + Name = "Foo", + BrokerType = QueueBrokerType.RabbitMq, + RabbitMqConnectionSettings = + new RabbitMqConnectionSettings() { ConnectionString = "amqp://guest:guest@localhost:5672/" } + }); + + List errors; + config.Validate(out errors); + + Assert.Equal(1, errors.Count); + + Assert.Equal("No `loadTo()` method call found in 'test' script", errors[0]); + } + + [Fact] + public void ShardedAzureQueueStorageEtlNotSupported() + { + using (var store = Sharding.GetDocumentStore()) + { + var error = Assert.ThrowsAny(() => + { + SetupQueueEtlToAzureQueueStorageOnline(store, DefaultScript, DefaultCollections); + }); + Assert.Contains("Queue ETLs are currently not supported in sharding", error.Message); + } + } + + [Fact] + public void TestAreHeadersPresent() + { + using (var store = GetDocumentStore()) + { + var config = SetupQueueEtlToAzureQueueStorageOnline(store, DefaultScript, DefaultCollections); + var etlDone = Etl.WaitForEtlToComplete(store); + + using (var session = store.OpenSession()) + { + session.Store(new Order + { + Id = "orders/1-A", + OrderLines = new List + { + new OrderLine { Cost = 3, Product = "Milk", Quantity = 2 }, + new OrderLine { Cost = 4, Product = "Bear", Quantity = 1 }, + } + }); + session.SaveChanges(); + } + + AssertEtlDone(etlDone, TimeSpan.FromMinutes(1), store.Database, config); + + QueueClient queueClient = CreateAzureQueueStorageClient(ConnectionString, OrdersQueueName); + var message = ReceiveAndDeleteMessages(queueClient).Single(); + var orderCloudEvent = JsonConvert.DeserializeObject(message.MessageText); + + Assert.True(string.IsNullOrWhiteSpace(orderCloudEvent.Id) == false); + Assert.True(string.IsNullOrWhiteSpace(orderCloudEvent.Specversion) == false); + Assert.True(string.IsNullOrWhiteSpace(orderCloudEvent.Type) == false); + Assert.True(string.IsNullOrWhiteSpace(orderCloudEvent.Source) == false); + } + } + + [Fact] + public void SimpleScriptWithManyDocuments() + { + using var store = GetDocumentStore(); + + var numberOfOrders = 10; + var numberOfLinesPerOrder = 2; + + var config = SetupQueueEtlToAzureQueueStorageOnline(store, DefaultScript, DefaultCollections); + var etlDone = + Etl.WaitForEtlToComplete(store, (n, statistics) => statistics.LastProcessedEtag >= numberOfOrders); + + for (int i = 0; i < numberOfOrders; i++) + { + using (var session = store.OpenSession()) + { + Order order = new Order { OrderLines = new List() }; + + for (int j = 0; j < numberOfLinesPerOrder; j++) + { + order.OrderLines.Add(new OrderLine + { + Cost = j + 1, Product = "foos/" + j, Quantity = (i * j) % 10 + }); + } + + session.Store(order, "orders/" + i); + + session.SaveChanges(); + } + } + + AssertEtlDone(etlDone, TimeSpan.FromMinutes(1), store.Database, config); + + QueueClient queueClient = CreateAzureQueueStorageClient(ConnectionString, OrdersQueueName); + QueueMessage[] messages = ReceiveAndDeleteMessages(queueClient, numberOfOrders); + + List orders = messages + .Select(message => JsonConvert.DeserializeObject(message.MessageText)).ToList(); + + Assert.Equal(numberOfOrders, orders.Count); + + for (int counter = 0; counter < numberOfOrders; counter++) + { + var order = orders.Single(x => x.Data.Id == $"orders/{counter}").Data; + Assert.Equal(order.OrderLinesCount, numberOfLinesPerOrder); + Assert.Equal(order.TotalCost, counter * 2); + } + } + + [Fact] + public void Error_if_script_is_empty() + { + var config = new QueueEtlConfiguration + { + Name = "test", + ConnectionStringName = "test", + BrokerType = QueueBrokerType.AzureQueueStorage, + Transforms = { new Transformation { Name = "test", Collections = { "Orders" }, Script = @"" } } + }; + + config.Initialize(new QueueConnectionString + { + Name = "Foo", + BrokerType = QueueBrokerType.AzureQueueStorage, + AzureQueueStorageConnectionSettings = + new AzureQueueStorageConnectionSettings() + { + Authentication = + new global::Raven.Client.Documents.Operations.ETL.Queue.Authentication() + { + ConnectionString = ConnectionString + } + } + }); + + List errors; + config.Validate(out errors); + + Assert.Equal(1, errors.Count); + + Assert.Equal("Script 'test' must not be empty", errors[0]); + } + + [Fact] + public async Task CanTestScript() + { + using (var store = GetDocumentStore()) + { + using (var session = store.OpenAsyncSession()) + { + await session.StoreAsync(new Order + { + OrderLines = new List + { + new OrderLine { Cost = 3, Product = "Milk", Quantity = 3 }, + new OrderLine { Cost = 4, Product = "Bear", Quantity = 2 }, + } + }); + await session.SaveChangesAsync(); + } + + var result1 = store.Maintenance.Send(new PutConnectionStringOperation( + new QueueConnectionString + { + Name = "simulate", + BrokerType = QueueBrokerType.AzureQueueStorage, + AzureQueueStorageConnectionSettings = + new AzureQueueStorageConnectionSettings() + { + Authentication = + new global::Raven.Client.Documents.Operations.ETL.Queue.Authentication() + { + ConnectionString = ConnectionString + } + } + })); + Assert.NotNull(result1.RaftCommandIndex); + + var database = await GetDatabase(store.Database); + + using (database.DocumentsStorage.ContextPool.AllocateOperationContext( + out DocumentsOperationContext context)) + { + using (QueueEtl.TestScript( + new TestQueueEtlScript + { + DocumentId = "orders/1-A", + Configuration = new QueueEtlConfiguration + { + Name = "simulate", + ConnectionStringName = "simulate", + Queues = { new EtlQueue() { Name = "Orders" } }, + BrokerType = QueueBrokerType.AzureQueueStorage, + Transforms = + { + new Transformation + { + Collections = { "Orders" }, + Name = "Orders", + Script = DefaultScript + } + } + } + }, database, database.ServerStore, context, out var testResult)) + { + var result = (QueueEtlTestScriptResult)testResult; + + Assert.Equal(0, result.TransformationErrors.Count); + + Assert.Equal(1, result.Summary.Count); + + Assert.Equal("Orders", result.Summary[0].QueueName); + Assert.Equal("myRoutingKey", result.Summary[0].Messages[0].RoutingKey); + Assert.Equal("orders/1-A", result.Summary[0].Messages[0].Attributes.Id); + Assert.Equal("com.github.users", result.Summary[0].Messages[0].Attributes.Type); + Assert.Equal("/registrations/direct-signup", + result.Summary[0].Messages[0].Attributes.Source.ToString()); + + Assert.Equal("test output", result.DebugOutput[0]); + } + } + } + } + + [Fact] + public void ShouldDeleteDocumentsAfterProcessing() + { + using (var store = GetDocumentStore()) + { + var config = SetupQueueEtlToAzureQueueStorageOnline(store, + @$"loadToUsers(this)", new[] { "Users" }, + new[] { new EtlQueue { Name = $"Users", DeleteProcessedDocuments = true } }); + + var etlDone = Etl.WaitForEtlToComplete(store); + + using (var session = store.OpenSession()) + { + session.Store(new User { Id = "users/1", Name = "Arek" }); + session.SaveChanges(); + } + + AssertEtlDone(etlDone, TimeSpan.FromMinutes(1), store.Database, config); + + QueueClient queueClient = CreateAzureQueueStorageClient(ConnectionString, "users"); + var message = queueClient.ReceiveMessage(); + var user = JsonConvert.DeserializeObject(message.Value.MessageText).Data; + + Assert.NotNull(user); + Assert.Equal(user.Name, "Arek"); + + using (var session = store.OpenSession()) + { + var entity = session.Load("users/1"); + Assert.Null(entity); + } + } + } + + [Fact] + public async Task ShouldImportTask() + { + using (var srcStore = GetDocumentStore()) + using (var dstStore = GetDocumentStore()) + { + var config = SetupQueueEtlToAzureQueueStorageOnline(srcStore, + DefaultScript, DefaultCollections, + new List() { new() { Name = "Orders", DeleteProcessedDocuments = true } }, + connectionString: ConnectionString); + + var exportFile = GetTempFileName(); + + var exportOperation = await srcStore.Smuggler.ExportAsync(new DatabaseSmugglerExportOptions(), exportFile); + await exportOperation.WaitForCompletionAsync(TimeSpan.FromMinutes(1)); + + var operation = await dstStore.Smuggler.ImportAsync(new DatabaseSmugglerImportOptions(), exportFile); + await operation.WaitForCompletionAsync(TimeSpan.FromMinutes(1)); + + var destinationRecord = + await dstStore.Maintenance.Server.SendAsync(new GetDatabaseRecordOperation(dstStore.Database)); + Assert.Equal(1, destinationRecord.QueueConnectionStrings.Count); + Assert.Equal(1, destinationRecord.QueueEtls.Count); + + Assert.Equal(QueueBrokerType.AzureQueueStorage, destinationRecord.QueueEtls[0].BrokerType); + Assert.Equal(DefaultScript, destinationRecord.QueueEtls[0].Transforms[0].Script); + Assert.Equal(DefaultCollections, destinationRecord.QueueEtls[0].Transforms[0].Collections); + + Assert.Equal(1, destinationRecord.QueueEtls[0].Queues.Count); + Assert.Equal("Orders", destinationRecord.QueueEtls[0].Queues[0].Name); + Assert.True(destinationRecord.QueueEtls[0].Queues[0].DeleteProcessedDocuments); + } + } + + [RavenFact(RavenTestCategory.BackupExportImport | RavenTestCategory.Sharding | RavenTestCategory.Etl)] + public async Task ShouldSkipUnsupportedFeaturesInShardingOnImport_RabbitMqEtl() + { + using (var srcStore = GetDocumentStore()) + using (var dstStore = Sharding.GetDocumentStore()) + { + var config = SetupQueueEtlToAzureQueueStorageOnline(srcStore, + DefaultScript, DefaultCollections, new List() + { + new() + { + Name = "Orders", + DeleteProcessedDocuments = true + } + }, connectionString: ConnectionString); + + var record = await srcStore.Maintenance.Server.SendAsync(new GetDatabaseRecordOperation(srcStore.Database)); + + Assert.NotNull(record.QueueEtls); + Assert.Equal(1, record.QueueEtls.Count); + + var exportFile = GetTempFileName(); + + var exportOperation = await srcStore.Smuggler.ExportAsync(new DatabaseSmugglerExportOptions(), exportFile); + await exportOperation.WaitForCompletionAsync(TimeSpan.FromMinutes(1)); + + var operation = await dstStore.Smuggler.ImportAsync(new DatabaseSmugglerImportOptions(), exportFile); + await operation.WaitForCompletionAsync(TimeSpan.FromMinutes(1)); + + record = await dstStore.Maintenance.Server.SendAsync(new GetDatabaseRecordOperation(dstStore.Database)); + + Assert.Empty(record.QueueEtls); + } + } + + private class Order + { + public string Id { get; set; } + public List OrderLines { get; set; } + } + + private class OrderData + { + public string Id { get; set; } + public int OrderLinesCount { get; set; } + public int TotalCost { get; set; } + } + + private class OrderLine + { + public string Product { get; set; } + public int Quantity { get; set; } + public int Cost { get; set; } + } + + private class User + { + public string Id { get; set; } + public string Name { get; set; } + } + + private class Person + { + public string Id { get; set; } + public string Name { get; set; } + } + + private class UserData + { + public string UserId { get; set; } + public string Name { get; set; } + } + + private class CloudEventOrderData + { + public string Id { get; set; } + public string Specversion { get; set; } + public string Type { get; set; } + public string Source { get; set; } + public OrderData Data { get; set; } + } + + private class CloudEventUserData + { + public UserData Data { get; set; } + } +}