Skip to content

Commit

Permalink
Revert "Fix possible auth caching issue (#453)" (#455)
Browse files Browse the repository at this point in the history
This reverts commit f47e319.
  • Loading branch information
Aaronontheweb authored Dec 4, 2024
1 parent 375306a commit 0e9ac0b
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 58 deletions.
53 changes: 26 additions & 27 deletions src/Akka.Persistence.Azure/Journal/AzureTableStorageJournal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
using System.Threading;
using System.Threading.Tasks;
using Akka.Configuration;
using Akka.Util;
using Azure;
using Azure.Data.Tables;
using Debug = System.Diagnostics.Debug;
Expand Down Expand Up @@ -52,9 +51,10 @@ public class AzureTableStorageJournal : AsyncWriteJournal
private readonly Dictionary<string, ISet<IActorRef>> _persistenceIdSubscribers = new Dictionary<string, ISet<IActorRef>>();
private readonly SerializationHelper _serialization;
private readonly AzureTableStorageJournalSettings _settings;
private readonly TableServiceClient _tableServiceClient;
private TableClient _tableStorage_DoNotUseDirectly;
private readonly Dictionary<string, ISet<IActorRef>> _tagSubscribers = new Dictionary<string, ISet<IActorRef>>();
private readonly CancellationTokenSource _shutdownCts;
private AtomicBoolean _initialized = new();

public AzureTableStorageJournal(Config config = null)
{
Expand All @@ -77,16 +77,31 @@ public AzureTableStorageJournal(Config config = null)

_serialization = new SerializationHelper(Context.System);

if (_settings.Development)
{
_tableServiceClient = new TableServiceClient(connectionString: "UseDevelopmentStorage=true");
}
else
{
// Use TokenCredential if both ServiceUri and TokenCredential are populated in the settings
_tableServiceClient = _settings.ServiceUri != null && _settings.AzureCredential != null
? new TableServiceClient(
endpoint: _settings.ServiceUri,
tokenCredential: _settings.AzureCredential,
options: _settings.TableClientOptions)
: new TableServiceClient(connectionString: _settings.ConnectionString);
}

_shutdownCts = new CancellationTokenSource();
}

public TableClient Table
{
get
{
if (!_initialized.Value)
if (_tableStorage_DoNotUseDirectly == null)
throw new Exception("Table storage has not been initialized yet. PreStart() has not been invoked");
return TableServiceClient.GetTableClient(_settings.TableName);
return _tableStorage_DoNotUseDirectly;
}
}

Expand All @@ -96,23 +111,6 @@ public TableClient Table

protected bool HasTagSubscribers => _tagSubscribers.Count != 0;

private TableServiceClient TableServiceClient
{
get
{
if (_settings.Development)
return new TableServiceClient(connectionString: "UseDevelopmentStorage=true");

// Use TokenCredential if both ServiceUri and TokenCredential are populated in the settings
return _settings.ServiceUri != null && _settings.AzureCredential != null
? new TableServiceClient(
endpoint: _settings.ServiceUri,
tokenCredential: _settings.AzureCredential,
options: _settings.TableClientOptions)
: new TableServiceClient(connectionString: _settings.ConnectionString);
}
}

public override async Task<long> ReadHighestSequenceNrAsync(
string persistenceId,
long fromSequenceNr)
Expand Down Expand Up @@ -265,7 +263,8 @@ protected override void PreStart()
{
_log.Debug("Initializing Azure Table Storage...");

InitCloudStorage(5, _shutdownCts.Token).GetAwaiter().GetResult();
InitCloudStorage(5, _shutdownCts.Token)
.ConfigureAwait(false).GetAwaiter().GetResult();

_log.Debug("Successfully started Azure Table Storage!");

Expand Down Expand Up @@ -621,7 +620,7 @@ private async Task InitCloudStorage(int remainingTries, CancellationToken cancel
{
try
{
var tableClient = TableServiceClient.GetTableClient(_settings.TableName);
var tableClient = _tableServiceClient.GetTableClient(_settings.TableName);

var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
cts.CancelAfter(_settings.ConnectTimeout);
Expand All @@ -640,16 +639,16 @@ private async Task InitCloudStorage(int remainingTries, CancellationToken cancel
}

_log.Info("Successfully connected to existing table", _settings.TableName);
_initialized.CompareAndSet(false, true);

_tableStorage_DoNotUseDirectly = tableClient;
return;
}

if (await tableClient.CreateIfNotExistsAsync(cts.Token) != null)
_log.Info("Created Azure Cloud Table", _settings.TableName);
else
_log.Info("Successfully connected to existing table", _settings.TableName);

_initialized.CompareAndSet(false, true);
_tableStorage_DoNotUseDirectly = tableClient;
}
}
catch (Exception ex)
Expand All @@ -668,7 +667,7 @@ private async Task InitCloudStorage(int remainingTries, CancellationToken cancel

private async Task<bool> IsTableExist(string name, CancellationToken cancellationToken)
{
var tables = await TableServiceClient.QueryAsync(t => t.Name == name, cancellationToken: cancellationToken)
var tables = await _tableServiceClient.QueryAsync(t => t.Name == name, cancellationToken: cancellationToken)
.ToListAsync(cancellationToken)
.ConfigureAwait(false);
return tables.Count > 0;
Expand Down
53 changes: 22 additions & 31 deletions src/Akka.Persistence.Azure/Snapshot/AzureBlobSnapshotStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
using Akka.Event;
using Akka.Persistence.Azure.Util;
using Akka.Persistence.Snapshot;
using Akka.Util;
using Akka.Util.Internal;
using Azure;
using Azure.Storage.Blobs;
Expand Down Expand Up @@ -43,12 +42,13 @@ public class AzureBlobSnapshotStore : SnapshotStore
private const string TimeStampMetaDataKey = "Timestamp";
private const string SeqNoMetaDataKey = "SeqNo";

private readonly Lazy<BlobContainerClient> _containerClient;
private readonly ILoggingAdapter _log = Context.GetLogger();
private readonly SerializationHelper _serialization;
private readonly AzureBlobSnapshotStoreSettings _settings;
private readonly BlobServiceClient _serviceClient;

private readonly CancellationTokenSource _shutdownCts;
private AtomicBoolean _initialized = new();

public AzureBlobSnapshotStore(Config config = null)
{
Expand All @@ -70,40 +70,32 @@ public AzureBlobSnapshotStore(Config config = null)
_settings = setup.Value.Apply(_settings);
}

_shutdownCts = new CancellationTokenSource();
}

public BlobContainerClient Container
{
get
if (_settings.Development)
{
if (!_initialized.Value)
throw new Exception("Blob storage has not been initialized yet. PreStart() has not been invoked");
return BlobServiceClient.GetBlobContainerClient(_settings.ContainerName);
_serviceClient = new BlobServiceClient(connectionString: "UseDevelopmentStorage=true");
}
}

private BlobServiceClient BlobServiceClient
{
get
else
{
if (_settings.Development)
return new BlobServiceClient(connectionString: "UseDevelopmentStorage=true");

return _settings.ServiceUri != null && _settings.AzureCredential != null
? new BlobServiceClient(
_serviceClient = _settings.ServiceUri != null && _settings.AzureCredential != null
? _serviceClient = new BlobServiceClient(
serviceUri: _settings.ServiceUri,
credential: _settings.AzureCredential,
options: _settings.BlobClientOptions)
: new BlobServiceClient(connectionString: _settings.ConnectionString);
: _serviceClient = new BlobServiceClient(connectionString: _settings.ConnectionString);
}

_shutdownCts = new CancellationTokenSource();
_containerClient = new Lazy<BlobContainerClient>(() =>
InitCloudStorage(5, _shutdownCts.Token).GetAwaiter().GetResult());
}

private async Task InitCloudStorage(int remainingTries, CancellationToken cancellationToken)
public BlobContainerClient Container => _containerClient.Value;

private async Task<BlobContainerClient> InitCloudStorage(int remainingTries, CancellationToken cancellationToken)
{
try
{
var blobClient = BlobServiceClient.GetBlobContainerClient(_settings.ContainerName);
var blobClient = _serviceClient.GetBlobContainerClient(_settings.ContainerName);

var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
cts.CancelAfter(_settings.ConnectTimeout);
Expand All @@ -123,8 +115,7 @@ private async Task InitCloudStorage(int remainingTries, CancellationToken cancel

_log.Info("Successfully connected to existing container {0}", _settings.ContainerName);

_initialized.CompareAndSet(false, true);
return;
return blobClient;
}

if (await blobClient.ExistsAsync(cts.Token))
Expand All @@ -145,7 +136,7 @@ await blobClient.CreateAsync(_settings.ContainerPublicAccessType,
}
}

_initialized.CompareAndSet(false, true);
return blobClient;
}
}
catch (Exception ex)
Expand All @@ -157,15 +148,16 @@ await blobClient.CreateAsync(_settings.ContainerPublicAccessType,
if (cancellationToken.IsCancellationRequested)
throw;

await InitCloudStorage(remainingTries - 1, cancellationToken);
return await InitCloudStorage(remainingTries - 1, cancellationToken);
}
}

protected override void PreStart()
{
_log.Debug("Initializing Azure Container Storage...");

InitCloudStorage(5, _shutdownCts.Token).GetAwaiter().GetResult();
// forces loading of the value
var name = Container.Name;

_log.Debug("Successfully started Azure Container Storage!");

Expand Down Expand Up @@ -302,10 +294,9 @@ protected override async Task DeleteAsync(string persistenceId, SnapshotSelectio
.Where(x => FilterBlobTimestamp(criteria, x));

var deleteTasks = new List<Task>();
var container = Container;
await foreach (var blob in filtered.WithCancellation(cts.Token))
{
var blobClient = container.GetBlobClient(blob.Name);
var blobClient = Container.GetBlobClient(blob.Name);
deleteTasks.Add(blobClient.DeleteIfExistsAsync(cancellationToken: cts.Token));
}

Expand Down

0 comments on commit 0e9ac0b

Please sign in to comment.