Skip to content

Commit

Permalink
RavenDB-21956 Fix PR issues
Browse files Browse the repository at this point in the history
  • Loading branch information
djordjedjukic committed Apr 2, 2024
1 parent 62dccca commit 051ba22
Show file tree
Hide file tree
Showing 9 changed files with 46 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@ namespace Raven.Client.Documents.Operations.ETL.Queue;

public sealed class AzureQueueStorageConnectionSettings
{
public Authentication Authentication;
public EntraId EntraId { get; set; }

public string ConnectionString { get; set; }

public bool Passwordless { get; set; }

public string GetStorageUrl()
{
Expand All @@ -18,22 +22,22 @@ public string GetStorageAccountName()
{
string storageAccountName = "";

if (Authentication.ConnectionString != null)
if (ConnectionString != null)
{
var accountNamePart = Authentication.ConnectionString.Split(';')
var accountNamePart = ConnectionString.Split(';')
.FirstOrDefault(part => part.StartsWith("AccountName=", StringComparison.OrdinalIgnoreCase));

if (accountNamePart == null)
{
throw new ArgumentException("Storage account name not found in the connection string.",
nameof(Authentication.ConnectionString));
nameof(ConnectionString));
}

storageAccountName = accountNamePart.Substring("AccountName=".Length);
}
else if (Authentication.EntraId != null)
else if (EntraId != null)
{
storageAccountName = Authentication.EntraId.StorageAccountName;
storageAccountName = EntraId.StorageAccountName;
}

return storageAccountName;
Expand All @@ -43,40 +47,24 @@ public DynamicJsonValue ToJson()
{
var json = new DynamicJsonValue
{
[nameof(Authentication)] = Authentication == null
[nameof(ConnectionString)] = ConnectionString,
[nameof(Passwordless)] = Passwordless,
[nameof(EntraId)] = EntraId == null
? null
: new DynamicJsonValue
{
[nameof(Authentication.ConnectionString)] = Authentication.ConnectionString,
[nameof(Authentication.Passwordless)] = Authentication.Passwordless,
[nameof(Authentication.EntraId)] =
Authentication.EntraId == null
? null
: new DynamicJsonValue
{
[nameof(Authentication.EntraId.StorageAccountName)] =
Authentication?.EntraId?.StorageAccountName,
[nameof(Authentication.EntraId.TenantId)] =
Authentication?.EntraId?.TenantId,
[nameof(Authentication.EntraId.ClientId)] =
Authentication?.EntraId?.ClientId,
[nameof(Authentication.EntraId.ClientSecret)] =
Authentication?.EntraId?.ClientSecret
}
[nameof(EntraId.StorageAccountName)] = EntraId?.StorageAccountName,
[nameof(EntraId.TenantId)] = EntraId?.TenantId,
[nameof(EntraId.ClientId)] = EntraId?.ClientId,
[nameof(EntraId.ClientSecret)] = EntraId?.ClientSecret
}
};


return json;
}
}

public sealed class Authentication
{
public EntraId EntraId { get; set; }
public string ConnectionString { get; set; }
public bool Passwordless { get; set; }
}

public sealed class EntraId
{
public string StorageAccountName { get; set; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ protected override void ValidateImpl(ref List<string> errors)
}
break;
case QueueBrokerType.AzureQueueStorage:
if (AzureQueueStorageConnectionSettings?.Authentication == null)
//todo djordje: better validation, at least one auth method
if (AzureQueueStorageConnectionSettings == null)
{
errors.Add($"{nameof(AzureQueueStorageConnectionSettings)} has no valid setting.");
}
Expand Down
4 changes: 1 addition & 3 deletions src/Raven.Server/Commercial/LicenseStatus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -216,9 +216,7 @@ public bool CanAutoRenewLetsEncryptCertificate

public bool HasElasticSearchEtl => Enabled(LicenseAttribute.ElasticSearchEtl);

//public bool HasQueueEtl => Enabled(LicenseAttribute.QueueEtl);
public bool HasQueueEtl => true;

public bool HasQueueEtl => Enabled(LicenseAttribute.QueueEtl);

public bool HasPowerBI => Enabled(LicenseAttribute.PowerBI);

Expand Down
2 changes: 1 addition & 1 deletion src/Raven.Server/Config/Categories/EtlConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public sealed class EtlConfiguration : ConfigurationCategory
public TimeSetting KafkaInitTransactionsTimeout { get; set; }

[Description("Lifespan of a message in the queue")]
[DefaultValue(604800)]
[DefaultValue(604800)] // 7 days (Azure default)
[TimeUnit(TimeUnit.Seconds)]
[ConfigurationEntry("ETL.Queue.AzureQueueStorage.TimeToLiveInSec", ConfigurationEntryScope.ServerWideOrPerDatabase)]
public TimeSetting AzureQueueStorageTimeToLive{ get; set; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,8 @@ public override async ValueTask ExecuteAsync()
try
{
string authenticationJson = await new StreamReader(HttpContext.Request.Body).ReadToEndAsync();
Authentication authentication = JsonConvert.DeserializeObject<Authentication>(authenticationJson);

var connectionSettings = new AzureQueueStorageConnectionSettings() { Authentication = authentication };
AzureQueueStorageConnectionSettings connectionSettings =
JsonConvert.DeserializeObject<AzureQueueStorageConnectionSettings>(authenticationJson);

QueueServiceClient client =
QueueBrokerConnectionHelper.CreateAzureQueueStorageServiceClient(connectionSettings);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,24 +94,24 @@ public static QueueClient CreateAzureQueueStorageClient(
{
QueueClient queueClient;

if (azureQueueStorageConnectionSettings.Authentication.ConnectionString != null)
if (azureQueueStorageConnectionSettings.ConnectionString != null)
{
queueClient = new QueueClient(azureQueueStorageConnectionSettings.Authentication.ConnectionString,
queueClient = new QueueClient(azureQueueStorageConnectionSettings.ConnectionString,
queueName);
}

else if (azureQueueStorageConnectionSettings.Authentication.EntraId != null)
else if (azureQueueStorageConnectionSettings.EntraId != null)
{
var queueUri = new Uri($"{azureQueueStorageConnectionSettings.GetStorageUrl()}{queueName}");

queueClient = new QueueClient(
queueUri,
new ClientSecretCredential(
azureQueueStorageConnectionSettings.Authentication.EntraId.TenantId,
azureQueueStorageConnectionSettings.Authentication.EntraId.ClientId,
azureQueueStorageConnectionSettings.Authentication.EntraId.ClientSecret));
azureQueueStorageConnectionSettings.EntraId.TenantId,
azureQueueStorageConnectionSettings.EntraId.ClientId,
azureQueueStorageConnectionSettings.EntraId.ClientSecret));
}
else if(azureQueueStorageConnectionSettings.Authentication.Passwordless)
else if(azureQueueStorageConnectionSettings.Passwordless)
{
var queueUri = new Uri($"{azureQueueStorageConnectionSettings.GetStorageUrl()}{queueName}");
queueClient = new QueueClient(queueUri, new DefaultAzureCredential());
Expand All @@ -129,22 +129,22 @@ public static QueueServiceClient CreateAzureQueueStorageServiceClient(
{
QueueServiceClient queueServiceClient = null;

if (azureQueueStorageConnectionSettings.Authentication.ConnectionString != null)
if (azureQueueStorageConnectionSettings.ConnectionString != null)
{
queueServiceClient =
new QueueServiceClient(azureQueueStorageConnectionSettings.Authentication.ConnectionString);
new QueueServiceClient(azureQueueStorageConnectionSettings.ConnectionString);
}

else if (azureQueueStorageConnectionSettings.Authentication.EntraId != null)
else if (azureQueueStorageConnectionSettings.EntraId != null)
{
var queueUri = new Uri(azureQueueStorageConnectionSettings.GetStorageUrl());

queueServiceClient = new QueueServiceClient(
queueUri,
new ClientSecretCredential(
azureQueueStorageConnectionSettings.Authentication.EntraId.TenantId,
azureQueueStorageConnectionSettings.Authentication.EntraId.ClientId,
azureQueueStorageConnectionSettings.Authentication.EntraId.ClientSecret));
azureQueueStorageConnectionSettings.EntraId.TenantId,
azureQueueStorageConnectionSettings.EntraId.ClientId,
azureQueueStorageConnectionSettings.EntraId.ClientSecret));
}

return queueServiceClient;
Expand Down
5 changes: 1 addition & 4 deletions src/Raven.Server/Raven.Server.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -154,10 +154,7 @@
<FrameworkReference Include="Microsoft.AspNetCore.App" />
<PackageReference Include="AWSSDK.Core" Version="3.7.302.16" />
<PackageReference Include="AWSSDK.Glacier" Version="3.7.300.55" />
<PackageReference Include="AWSSDK.S3" Version="3.7.305.31" />
<PackageReference Include="AWSSDK.Core" Version="3.7.302.9" />
<PackageReference Include="AWSSDK.Glacier" Version="3.7.300.49" />
<PackageReference Include="AWSSDK.S3" Version="3.7.305.25" />
<PackageReference Include="AWSSDK.S3" Version="3.7.305.31" />
<PackageReference Include="Azure.Identity" Version="1.10.4" />
<PackageReference Include="Azure.Storage.Blobs" Version="12.19.1" />
<PackageReference Include="Azure.Storage.Queues" Version="12.17.1" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,7 @@ protected QueueEtlConfiguration SetupQueueEtlToAzureQueueStorageOnline(DocumentS
BrokerType = QueueBrokerType.AzureQueueStorage,
AzureQueueStorageConnectionSettings = new AzureQueueStorageConnectionSettings
{
Authentication = new global::Raven.Client.Documents.Operations.ETL.Queue.Authentication
{
ConnectionString = ConnectionString
}
ConnectionString = ConnectionString
}
});
return config;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,14 +209,7 @@ public void Error_if_script_is_empty()
Name = "Foo",
BrokerType = QueueBrokerType.AzureQueueStorage,
AzureQueueStorageConnectionSettings =
new AzureQueueStorageConnectionSettings()
{
Authentication =
new global::Raven.Client.Documents.Operations.ETL.Queue.Authentication()
{
ConnectionString = ConnectionString
}
}
new AzureQueueStorageConnectionSettings { ConnectionString = ConnectionString }
});

List<string> errors;
Expand Down Expand Up @@ -251,14 +244,7 @@ await session.StoreAsync(new Order
Name = "simulate",
BrokerType = QueueBrokerType.AzureQueueStorage,
AzureQueueStorageConnectionSettings =
new AzureQueueStorageConnectionSettings()
{
Authentication =
new global::Raven.Client.Documents.Operations.ETL.Queue.Authentication()
{
ConnectionString = ConnectionString
}
}
new AzureQueueStorageConnectionSettings { ConnectionString = ConnectionString }
}));
Assert.NotNull(result1.RaftCommandIndex);

Expand Down Expand Up @@ -374,28 +360,23 @@ public async Task ShouldImportTask()
Assert.True(destinationRecord.QueueEtls[0].Queues[0].DeleteProcessedDocuments);
}
}

[RavenFact(RavenTestCategory.BackupExportImport | RavenTestCategory.Sharding | RavenTestCategory.Etl)]
public async Task ShouldSkipUnsupportedFeaturesInShardingOnImport_RabbitMqEtl()
{
using (var srcStore = GetDocumentStore())
using (var dstStore = Sharding.GetDocumentStore())
{
var config = SetupQueueEtlToAzureQueueStorageOnline(srcStore,
DefaultScript, DefaultCollections, new List<EtlQueue>()
{
new()
{
Name = "Orders",
DeleteProcessedDocuments = true
}
}, connectionString: ConnectionString);
DefaultScript, DefaultCollections,
new List<EtlQueue>() { new() { Name = "Orders", DeleteProcessedDocuments = true } },
connectionString: ConnectionString);

var record = await srcStore.Maintenance.Server.SendAsync(new GetDatabaseRecordOperation(srcStore.Database));

Assert.NotNull(record.QueueEtls);
Assert.Equal(1, record.QueueEtls.Count);

var exportFile = GetTempFileName();

var exportOperation = await srcStore.Smuggler.ExportAsync(new DatabaseSmugglerExportOptions(), exportFile);
Expand Down

0 comments on commit 051ba22

Please sign in to comment.