Skip to content

Commit

Permalink
RavenDB-21956 azure queue storage wip
Browse files Browse the repository at this point in the history
RavenDB-21956 azure queue storage etl - wip
  • Loading branch information
Djordje Djukic authored and djordjedjukic committed Mar 26, 2024
1 parent df870d3 commit 2ed871d
Show file tree
Hide file tree
Showing 13 changed files with 535 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
using Sparrow.Json.Parsing;

namespace Raven.Client.Documents.Operations.ETL.Queue;

public sealed class AzureQueueStorageConnectionSettings
{
// TODO djordje: what should be input, minutes?
public int TimeToLive { get; set; }

public int VisibilityTimeout { get; set; }

public Authentication Authentication;

public DynamicJsonValue ToJson()
{
var json = new DynamicJsonValue
{
[nameof(TimeToLive)] = TimeToLive,
[nameof(VisibilityTimeout)] = VisibilityTimeout,
[nameof(Authentication)] = Authentication == null
? null
: new DynamicJsonValue
{
[nameof(Authentication.EntraId)] =
Authentication.EntraId == null
? null
: new DynamicJsonValue
{
[nameof(Authentication.EntraId.StorageAccountName)] =
Authentication?.EntraId?.StorageAccountName,
[nameof(Authentication.EntraId.TenantId)] =
Authentication?.EntraId?.TenantId,
[nameof(Authentication.EntraId.ClientId)] =
Authentication?.EntraId?.ClientId,
[nameof(Authentication.EntraId.ClientSecret)] =
Authentication?.EntraId?.ClientSecret
},
[nameof(Authentication.ConnectionString)] = Authentication.ConnectionString == null
? null
: new DynamicJsonValue
{
[nameof(Authentication.ConnectionString)] = Authentication?.ConnectionString
}
}
};

return json;
}
}

public sealed class Authentication
{
public EntraId EntraId { get; set; }
public string ConnectionString { get; set; }
}

public sealed class EntraId
{
public string StorageAccountName { get; set; }
public string TenantId { get; set; }
public string ClientId { get; set; }
public string ClientSecret { get; set; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@ public enum QueueBrokerType
{
None,
Kafka,
RabbitMq
RabbitMq,
AzureQueueStorage
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ public sealed class QueueConnectionString : ConnectionString
public KafkaConnectionSettings KafkaConnectionSettings { get; set; }

public RabbitMqConnectionSettings RabbitMqConnectionSettings { get; set; }

public AzureQueueStorageConnectionSettings AzureQueueStorageConnectionSettings { get; set; }

public override ConnectionStringType Type => ConnectionStringType.Queue;

Expand All @@ -31,13 +33,20 @@ protected override void ValidateImpl(ref List<string> errors)
errors.Add($"{nameof(RabbitMqConnectionSettings)} has no valid setting.");
}
break;
case QueueBrokerType.AzureQueueStorage:
if (AzureQueueStorageConnectionSettings?.Authentication == null)
{
errors.Add($"{nameof(AzureQueueStorageConnectionSettings)} has no valid setting.");
}
break;
default:
throw new NotSupportedException($"'{BrokerType}' broker is not supported");
}
}

internal string GetUrl()
{
// TODO djordje: what to do with this, no connection string defined in this moment?
string url;

switch (BrokerType)
Expand All @@ -52,6 +61,9 @@ internal string GetUrl()

url = indexOfStartServerUri != -1 ? connectionString.Substring(indexOfStartServerUri + 1) : null;
break;
case QueueBrokerType.AzureQueueStorage:
url = "azure-queue-storage";
break;
default:
throw new NotSupportedException($"'{BrokerType}' broker is not supported");
}
Expand All @@ -66,6 +78,7 @@ public override DynamicJsonValue ToJson()
json[nameof(BrokerType)] = BrokerType;
json[nameof(KafkaConnectionSettings)] = KafkaConnectionSettings?.ToJson();
json[nameof(RabbitMqConnectionSettings)] = RabbitMqConnectionSettings?.ToJson();
json[nameof(AzureQueueStorageConnectionSettings)] = AzureQueueStorageConnectionSettings?.ToJson();

return json;
}
Expand Down
4 changes: 3 additions & 1 deletion src/Raven.Server/Commercial/LicenseStatus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,9 @@ public bool CanAutoRenewLetsEncryptCertificate

public bool HasElasticSearchEtl => Enabled(LicenseAttribute.ElasticSearchEtl);

public bool HasQueueEtl => Enabled(LicenseAttribute.QueueEtl);
//public bool HasQueueEtl => Enabled(LicenseAttribute.QueueEtl);
public bool HasQueueEtl => true;


public bool HasPowerBI => Enabled(LicenseAttribute.PowerBI);

Expand Down
31 changes: 31 additions & 0 deletions src/Raven.Server/Documents/ETL/EtlLoader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
using Raven.Server.Documents.ETL.Providers.ElasticSearch;
using Raven.Server.Documents.ETL.Providers.OLAP;
using Raven.Server.Documents.ETL.Providers.Queue;
using Raven.Server.Documents.ETL.Providers.Queue.AzureQueueStorage;
using Raven.Server.Documents.ETL.Providers.Queue.Kafka;
using Raven.Server.Documents.ETL.Providers.Queue.RabbitMq;
using Raven.Server.Documents.ETL.Providers.Raven;
Expand Down Expand Up @@ -646,6 +647,29 @@ public void HandleDatabaseRecordChange(DatabaseRecord record)

break;
}
case AzureQueueStorageEtl azureQueueStorageEtl:
{
QueueEtlConfiguration existing = null;

foreach (var config in myQueueEtl)
{
var diff = azureQueueStorageEtl.Configuration.Compare(config);

if (diff == EtlConfigurationCompareDifferences.None)
{
existing = config;
break;
}
}

if (existing != null)
{
toRemove.Remove(processesPerConfig.Key);
myQueueEtl.Remove(existing);
}

break;
}
case ElasticSearchEtl elasticSearchEtl:
{
ElasticSearchEtlConfiguration existing = null;
Expand Down Expand Up @@ -794,6 +818,13 @@ private static string GetStopReason(
if (existing != null)
differences = rabbitMqEtl.Configuration.Compare(existing, transformationDiffs);
}
else if (process is AzureQueueStorageEtl azureQueueStorageEtl)
{
var existing = myQueueEtl.FirstOrDefault(x => x.Name.Equals(azureQueueStorageEtl.ConfigurationName, StringComparison.OrdinalIgnoreCase));

if (existing != null)
differences = azureQueueStorageEtl.Configuration.Compare(existing, transformationDiffs);
}
else
{
throw new InvalidOperationException($"Unknown ETL process type: " + process.GetType().FullName);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
using Jint;
using Jint.Native;
using Jint.Native.Object;
using Jint.Runtime.Interop;
using Raven.Client.Documents.Operations.ETL;
using Raven.Client.Documents.Operations.ETL.Queue;
using Raven.Server.Documents.Patch;
using Raven.Server.ServerWide.Context;

namespace Raven.Server.Documents.ETL.Providers.Queue.AzureQueueStorage;

public sealed class AzureQueueStorageDocumentTransformer<T> : QueueDocumentTransformer<T, AzureQueueStorageItem>
where T : QueueItem
{
public AzureQueueStorageDocumentTransformer(Transformation transformation, DocumentDatabase database,
DocumentsOperationContext context, QueueEtlConfiguration config) : base(transformation, database, context,
config)
{
}

protected override void LoadToFunction(string queueName, ScriptRunnerResult document)
{
LoadToFunction(queueName, document, null);
}

private void LoadToFunction(string queueName, ScriptRunnerResult document, CloudEventAttributes attributes)
{
if (queueName == null)
ThrowLoadParameterIsMandatory(nameof(queueName));

var result = document.TranslateToObject(Context);

var topic = GetOrAdd(queueName);

topic.Items.Add(new AzureQueueStorageItem(Current) { TransformationResult = result, Attributes = attributes });
}

public override void Initialize(bool debugMode)
{
base.Initialize(debugMode);

DocumentScript.ScriptEngine.SetValue(Transformation.LoadTo,
new ClrFunction(DocumentScript.ScriptEngine, Transformation.LoadTo,
LoadToFunctionTranslatorWithAttributes));

foreach (var queueName in LoadToDestinations)
{
var name = Transformation.LoadTo + queueName;

DocumentScript.ScriptEngine.SetValue(name, new ClrFunction(DocumentScript.ScriptEngine, name,
(self, args) => LoadToFunctionTranslatorWithAttributes(queueName, args)));
}
}

private JsValue LoadToFunctionTranslatorWithAttributes(JsValue self, JsValue[] args)
{
var methodSignature = "loadTo(name, obj, attributes)";

if (args.Length != 2 && args.Length != 3)
ThrowInvalidScriptMethodCall($"{methodSignature} must be called with 2 or 3 parameters");

if (args[0].IsString() == false)
ThrowInvalidScriptMethodCall($"{methodSignature} first argument must be a string");

if (args[1].IsObject() == false)
ThrowInvalidScriptMethodCall($"{methodSignature} second argument must be an object");

if (args.Length == 3 && args[2].IsObject() == false)
ThrowInvalidScriptMethodCall($"{methodSignature} third argument must be an object");

return LoadToFunctionTranslatorWithAttributesInternal(args[0].AsString(), args[1].AsObject(),
args.Length == 3 ? args[2].AsObject() : null);
}

private JsValue LoadToFunctionTranslatorWithAttributes(string name, JsValue[] args)
{
var methodSignature = $"loadTo{name}(obj, attributes)";

if (args.Length != 1 && args.Length != 2)
ThrowInvalidScriptMethodCall($"{methodSignature} must be called with with 1 or 2 parameters");

if (args[0].IsObject() == false)
ThrowInvalidScriptMethodCall($"{methodSignature} argument 'obj' must be an object");

if (args.Length == 2 && args[1].IsObject() == false)
ThrowInvalidScriptMethodCall($"{methodSignature} argument 'attributes' must be an object");

return LoadToFunctionTranslatorWithAttributesInternal(name, args[0].AsObject(),
args.Length == 2 ? args[1].AsObject() : null);
}

private JsValue LoadToFunctionTranslatorWithAttributesInternal(string name, ObjectInstance obj,
ObjectInstance attributes)
{
var result = new ScriptRunnerResult(DocumentScript, obj);

CloudEventAttributes cloudEventAttributes = null;

if (attributes != null)
cloudEventAttributes = GetCloudEventAttributes(attributes);

LoadToFunction(name, result, cloudEventAttributes);

return result.Instance;
}
}
Loading

0 comments on commit 2ed871d

Please sign in to comment.