Skip to content

Commit

Permalink
RavenDB-22429 Aws sqs wip
Browse files Browse the repository at this point in the history
  • Loading branch information
djordjedjukic committed Aug 7, 2024
1 parent 7adfcf9 commit c4f9395
Show file tree
Hide file tree
Showing 18 changed files with 456 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
namespace Raven.Client.Documents.Operations.ETL.Queue;

public sealed class AwsSqsConnectionSettings
{
public Basic Basic { get; set; }

//public Passwordless Passwordless { get; set; }

public bool IsValidConnection() { return true; } // TODO djordje: implement
}

public class Basic
{
public string AccessKey { get; set; }

public string SecretKey { get; set; }

public bool IsValid()
{
return string.IsNullOrWhiteSpace(AccessKey) == false &&
string.IsNullOrWhiteSpace(AccessKey) == false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@ public enum QueueBrokerType
None,
Kafka,
RabbitMq,
AzureQueueStorage
AzureQueueStorage,
AwsSqs
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ public sealed class QueueConnectionString : ConnectionString

public AzureQueueStorageConnectionSettings AzureQueueStorageConnectionSettings { get; set; }

public AwsSqsConnectionSettings AwsSqsConnectionSettings { get; set; }

public override ConnectionStringType Type => ConnectionStringType.Queue;

protected override void ValidateImpl(ref List<string> errors)
Expand All @@ -39,6 +41,12 @@ protected override void ValidateImpl(ref List<string> errors)
errors.Add($"{nameof(AzureQueueStorageConnectionSettings)} has no valid setting.");
}
break;
case QueueBrokerType.AwsSqs:
if (AwsSqsConnectionSettings.IsValidConnection() == false)
{
errors.Add($"{nameof(AwsSqsConnectionSettings)} has no valid setting.");
}
break;
default:
throw new NotSupportedException($"'{BrokerType}' broker is not supported");
}
Expand All @@ -63,6 +71,9 @@ internal string GetUrl()
case QueueBrokerType.AzureQueueStorage:
url = AzureQueueStorageConnectionSettings.GetStorageUrl();
break;
case QueueBrokerType.AwsSqs:
url = ""; // TODO djordje: implement
break;
default:
throw new NotSupportedException($"'{BrokerType}' broker is not supported");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ public override bool UsingEncryptedCommunicationChannel()
case QueueBrokerType.AzureQueueStorage:
return Connection.AzureQueueStorageConnectionSettings.GetStorageUrl()
.StartsWith("https", StringComparison.OrdinalIgnoreCase);
case QueueBrokerType.AwsSqs:
return true; // TODO djordje: implement
default:
throw new NotSupportedException($"Unknown broker type: {BrokerType}");
}
Expand Down
9 changes: 7 additions & 2 deletions src/Raven.Server/Dashboard/DatabasesInfoRetriever.cs
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,10 @@ private static DatabaseOngoingTasksInfoItem GetOngoingTasksInfoItem(DocumentData
long azureQueueStorageEtlCountOnNode = GetTaskCountOnNode<QueueEtlConfiguration>(database, dbRecord, serverStore, database.EtlLoader.QueueDestinations,
task => EtlLoader.GetProcessState(task.Transforms, database, task.Name), task => task.BrokerType == QueueBrokerType.AzureQueueStorage);

var awsSqsEtlCount = database.EtlLoader.GetQueueDestinationCountByBroker(QueueBrokerType.AwsSqs);
long awsSqsEtlCountOnNode = GetTaskCountOnNode<QueueEtlConfiguration>(database, dbRecord, serverStore, database.EtlLoader.QueueDestinations,
task => EtlLoader.GetProcessState(task.Transforms, database, task.Name), task => task.BrokerType == QueueBrokerType.AwsSqs);

var periodicBackupCount = database.PeriodicBackupRunner.PeriodicBackups.Count;
long periodicBackupCountOnNode = BackupUtils.GetTasksCountOnNode(serverStore, database.Name, context);

Expand All @@ -369,8 +373,8 @@ private static DatabaseOngoingTasksInfoItem GetOngoingTasksInfoItem(DocumentData

ongoingTasksCount = extRepCount + replicationHubCount + replicationSinkCount +
ravenEtlCount + sqlEtlCount + elasticSearchEtlCount + olapEtlCount + kafkaEtlCount +
rabbitMqEtlCount + azureQueueStorageEtlCount + periodicBackupCount + subscriptionCount +
kafkaSinkCount + rabbitMqSinkCount;
rabbitMqEtlCount + azureQueueStorageEtlCount + awsSqsEtlCount + periodicBackupCount +
subscriptionCount + kafkaSinkCount + rabbitMqSinkCount;

return new DatabaseOngoingTasksInfoItem
{
Expand All @@ -385,6 +389,7 @@ private static DatabaseOngoingTasksInfoItem GetOngoingTasksInfoItem(DocumentData
KafkaEtlCount = kafkaEtlCountOnNode,
RabbitMqEtlCount = rabbitMqEtlCountOnNode,
AzureQueueStorageEtlCount = azureQueueStorageEtlCountOnNode,
AwsSqsEtlCount = awsSqsEtlCountOnNode,
PeriodicBackupCount = periodicBackupCountOnNode,
SubscriptionCount = subscriptionCountOnNode,
KafkaSinkCount = kafkaSinkCountOnNode,
Expand Down
5 changes: 4 additions & 1 deletion src/Raven.Server/Dashboard/DatabasesOngoingTasksInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ public sealed class DatabaseOngoingTasksInfoItem : IDynamicJson
public long RabbitMqEtlCount { get; set; }

public long AzureQueueStorageEtlCount { get; set; }

public long AwsSqsEtlCount { get; set; }

public long PeriodicBackupCount { get; set; }

Expand All @@ -60,10 +62,11 @@ public DynamicJsonValue ToJson()
[nameof(KafkaEtlCount)] = KafkaEtlCount,
[nameof(RabbitMqEtlCount)] = RabbitMqEtlCount,
[nameof(AzureQueueStorageEtlCount)] = AzureQueueStorageEtlCount,
[nameof(AwsSqsEtlCount)] = AwsSqsEtlCount,
[nameof(PeriodicBackupCount)] = PeriodicBackupCount,
[nameof(SubscriptionCount)] = SubscriptionCount,
[nameof(KafkaSinkCount)] = KafkaSinkCount,
[nameof(RabbitMqSinkCount)] = RabbitMqSinkCount,
[nameof(RabbitMqSinkCount)] = RabbitMqSinkCount
};
}
}
Expand Down
31 changes: 31 additions & 0 deletions src/Raven.Server/Documents/ETL/EtlLoader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
using Raven.Server.Documents.ETL.Providers.ElasticSearch;
using Raven.Server.Documents.ETL.Providers.OLAP;
using Raven.Server.Documents.ETL.Providers.Queue;
using Raven.Server.Documents.ETL.Providers.Queue.AwsSqs;
using Raven.Server.Documents.ETL.Providers.Queue.AzureQueueStorage;
using Raven.Server.Documents.ETL.Providers.Queue.Kafka;
using Raven.Server.Documents.ETL.Providers.Queue.RabbitMq;
Expand Down Expand Up @@ -691,6 +692,29 @@ public void HandleDatabaseRecordChange(DatabaseRecord record)

break;
}
case AwsSqsEtl awsSqsEtl:
{
QueueEtlConfiguration existing = null;

foreach (var config in myQueueEtl)
{
var diff = awsSqsEtl.Configuration.Compare(config);

if (diff == EtlConfigurationCompareDifferences.None)
{
existing = config;
break;
}
}

if (existing != null)
{
toRemove.Remove(processesPerConfig.Key);
myQueueEtl.Remove(existing);
}

break;
}
default:
throw new InvalidOperationException($"Unknown ETL process type: {process.GetType()}");
}
Expand Down Expand Up @@ -825,6 +849,13 @@ private static string GetStopReason(
if (existing != null)
differences = azureQueueStorageEtl.Configuration.Compare(existing, transformationDiffs);
}
else if (process is AwsSqsEtl awsSqsEtl)
{
var existing = myQueueEtl.FirstOrDefault(x => x.Name.Equals(awsSqsEtl.ConfigurationName, StringComparison.OrdinalIgnoreCase));

if (existing != null)
differences = awsSqsEtl.Configuration.Compare(existing, transformationDiffs);
}
else
{
throw new InvalidOperationException($"Unknown ETL process type: " + process.GetType().FullName);
Expand Down
16 changes: 16 additions & 0 deletions src/Raven.Server/Documents/ETL/EtlProcess.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
using Raven.Server.Documents.ETL.Providers.OLAP;
using Raven.Server.Documents.ETL.Providers.OLAP.Test;
using Raven.Server.Documents.ETL.Providers.Queue;
using Raven.Server.Documents.ETL.Providers.Queue.AwsSqs;
using Raven.Server.Documents.ETL.Providers.Queue.AzureQueueStorage;
using Raven.Server.Documents.ETL.Providers.Queue.Kafka;
using Raven.Server.Documents.ETL.Providers.Queue.RabbitMq;
Expand Down Expand Up @@ -1336,6 +1337,21 @@ public static TestEtlScriptResult TestScript<TC, TCS>(
var result = azureQueueStorageEtl.RunTest(results, context);
result.DebugOutput = debugOutput;

return result;
}
case AwsSqsEtl awsSqsEtl:
using (awsSqsEtl.EnterTestMode(out debugOutput))
{
awsSqsEtl.EnsureThreadAllocationStats();

var queueItem = new QueueItem(document, docCollection);

var results = awsSqsEtl.Transform(new[] { queueItem }, context, new EtlStatsScope(new EtlRunStats()),
new EtlProcessState());

var result = awsSqsEtl.RunTest(results, context);
result.DebugOutput = debugOutput;

return result;
}
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Threading.Tasks;
using JetBrains.Annotations;
using Raven.Client.Documents.Operations.ETL.Queue;
using Raven.Server.Documents.ETL.Providers.Queue.AwsSqs;
using Raven.Server.Documents.ETL.Providers.Queue.AzureQueueStorage;
using Raven.Server.Documents.ETL.Providers.Queue.Kafka;
using Raven.Server.Documents.ETL.Providers.Queue.RabbitMq;
Expand Down Expand Up @@ -39,6 +40,7 @@ protected override async ValueTask HandleCurrentNodeAsync()
RabbitMqEtl => QueueBrokerType.RabbitMq,
KafkaEtl => QueueBrokerType.Kafka,
AzureQueueStorageEtl => QueueBrokerType.AzureQueueStorage,
AwsSqsEtl => QueueBrokerType.AwsSqs,
_ => null
}
}).ToArray();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
using Jint.Runtime.Interop;
using Raven.Client.Documents.Operations.ETL;
using Raven.Client.Documents.Operations.ETL.Queue;
using Raven.Server.Documents.ETL.Providers.Queue.AzureQueueStorage;
using Raven.Server.Documents.Patch;
using Raven.Server.ServerWide.Context;

namespace Raven.Server.Documents.ETL.Providers.Queue.AwsSqs;

public sealed class AwsSqsDocumentTransformer<T> : QueueDocumentTransformer<T, AzureQueueStorageItem>
where T : QueueItem
{
public AwsSqsDocumentTransformer(Transformation transformation, DocumentDatabase database,
DocumentsOperationContext context, QueueEtlConfiguration config) : base(transformation, database, context,
config)
{
}

protected override void LoadToFunction(string queueName, ScriptRunnerResult document)
{
LoadToFunction(queueName, document, null);
}

protected override void LoadToFunction(string queueName, ScriptRunnerResult document, CloudEventAttributes attributes)
{
if (queueName == null)
ThrowLoadParameterIsMandatory(nameof(queueName));

var result = document.TranslateToObject(Context);

var queue = GetOrAdd(queueName);

queue.Items.Add(new AzureQueueStorageItem(Current) { TransformationResult = result, Attributes = attributes });
}

public override void Initialize(bool debugMode)
{
base.Initialize(debugMode);

DocumentScript.ScriptEngine.SetValue(Transformation.LoadTo,
new ClrFunction(DocumentScript.ScriptEngine, Transformation.LoadTo,
LoadToFunctionTranslatorWithAttributes));

foreach (var queueName in LoadToDestinations)
{
var name = Transformation.LoadTo + queueName;

DocumentScript.ScriptEngine.SetValue(name, new ClrFunction(DocumentScript.ScriptEngine, name,
(self, args) => LoadToFunctionTranslatorWithAttributes(queueName, args)));
}
}
}
Loading

0 comments on commit c4f9395

Please sign in to comment.