Skip to content

Commit

Permalink
cache BlobServiceClient to mitigate Azure Storage Management API thro…
Browse files Browse the repository at this point in the history
…ttling
  • Loading branch information
pohhsu committed Feb 22, 2024
1 parent 55d80e9 commit 239a7b5
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 28 deletions.
8 changes: 4 additions & 4 deletions migrationTool/ams/AssetAnalyzer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,8 @@ public override async Task MigrateAsync(CancellationToken cancellationToken)
var assetTypes = new ConcurrentDictionary<string, int>();
if (!isAMSAcc)
{
var (storageClient, accountId) = await _resourceProvider.GetStorageAccount(_analysisOptions.AccountName, cancellationToken);
await _resourceProvider.SetStorageAccountResourcesAsync(_analysisOptions.AccountName, cancellationToken);
var (storageClient, accountId) = _resourceProvider.GetBlobServiceClient(_analysisOptions.AccountName);
if (storageClient == null)
{
_logger.LogError("No valid storage account was found.");
Expand All @@ -167,7 +168,6 @@ public override async Task MigrateAsync(CancellationToken cancellationToken)
var writer = channel.Writer;
await MigrateInParallel(containers, filteredList, async (container, cancellationToken) =>
{
// var storage = await _resourceProvider.GetStorageAccountAsync(account, asset, cancellationToken);
var result = await AnalyzeAsync(container, storageClient, cancellationToken);
var assetType = result.AssetType ?? "unknown";
assetTypes.AddOrUpdate(assetType, 1, (key, value) => Interlocked.Increment(ref value));
Expand Down Expand Up @@ -197,7 +197,7 @@ await MigrateInParallel(containers, filteredList, async (container, cancellation
double totalAssets = await QueryMetricAsync(account.Id.ToString(), "AssetCount", cancellationToken);
_logger.LogInformation("The total asset count of the media account is {count}.", totalAssets);

await _resourceProvider.SetStorageResourceGroupsAsync(account, cancellationToken);
await _resourceProvider.SetStorageAccountResourcesAsync(account, cancellationToken);
var assets = account.GetMediaAssets()
.GetAllAsync(resourceFilter, cancellationToken: cancellationToken);
statistics = new AssetStats();
Expand All @@ -221,7 +221,7 @@ await MigrateInParallel(containers, filteredList, async (container, cancellation
var writer = channel.Writer;
await MigrateInParallel(assets, filteredList, async (asset, cancellationToken) =>
{
var storage = await _resourceProvider.GetStorageAccountAsync(account, asset, cancellationToken);
var storage = _resourceProvider.GetBlobServiceClient(asset);
var result = await AnalyzeAsync(asset, storage, cancellationToken);
var assetType = result.AssetType ?? "unknown";
assetTypes.AddOrUpdate(assetType, 1, (key, value) => Interlocked.Increment(ref value));
Expand Down
4 changes: 2 additions & 2 deletions migrationTool/ams/AssetMigrator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public override async Task MigrateAsync(CancellationToken cancellationToken)

var orderBy = "properties/created";

await _resourceProvider.SetStorageResourceGroupsAsync(account, cancellationToken);
await _resourceProvider.SetStorageAccountResourcesAsync(account, cancellationToken);
var assets = account.GetMediaAssets().GetAllAsync(resourceFilter, orderby: orderBy, cancellationToken: cancellationToken);

List<MediaAssetResource>? filteredList = null;
Expand Down Expand Up @@ -85,7 +85,7 @@ private async Task<AssetStats> MigrateAsync(MediaServicesAccountResource account
var stats = new AssetStats();
await MigrateInParallel(assets, filteredList, async (asset, cancellationToken) =>
{
var storage = await _resourceProvider.GetStorageAccountAsync(account, asset, cancellationToken);
var storage = _resourceProvider.GetBlobServiceClient(asset);

var result = await MigrateAsync(account, storage, asset, cancellationToken);
stats.Update(result);
Expand Down
41 changes: 26 additions & 15 deletions migrationTool/ams/AzureResourceProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class AzureResourceProvider
protected readonly TokenCredential _credentials;
protected readonly ArmClient _armClient;

private Dictionary<string, ResourceGroupResource> _storageResourceGroups;
private Dictionary<string, StorageAccountResource> _storageAccountResources;

public AzureResourceProvider(TokenCredential credential, GlobalOptions options)
{
Expand All @@ -29,10 +29,10 @@ public AzureResourceProvider(TokenCredential credential, GlobalOptions options)
options.SubscriptionId,
options.ResourceGroup);
_resourceGroup = _armClient.GetResourceGroupResource(resourceGroupId);
_storageResourceGroups = new Dictionary<string, ResourceGroupResource>();
_storageAccountResources = new Dictionary<string, StorageAccountResource>();
}

public async Task SetStorageResourceGroupsAsync(MediaServicesAccountResource account, CancellationToken cancellationToken)
public async Task SetStorageAccountResourcesAsync(MediaServicesAccountResource account, CancellationToken cancellationToken)
{
IList<MediaServicesStorageAccount> storageAccounts;
var mediaServiceResource = await account.GetAsync(cancellationToken: cancellationToken);
Expand All @@ -47,11 +47,20 @@ public async Task SetStorageResourceGroupsAsync(MediaServicesAccountResource acc
storageAccountId.SubscriptionId!,
storageAccountId.ResourceGroupName!);
var resourceGroup = _armClient.GetResourceGroupResource(resourceGroupId);
_storageResourceGroups.Add(storageAccountId.Name, resourceGroup);
StorageAccountResource storage = await resourceGroup.GetStorageAccountAsync(storageAccountId.Name,
cancellationToken: cancellationToken);
_storageAccountResources.Add(storageAccountId.Name, storage);
}
}
}

public async Task SetStorageAccountResourcesAsync(string storageAccountName, CancellationToken cancellationToken)
{
StorageAccountResource storage = await _resourceGroup.GetStorageAccountAsync(storageAccountName,
cancellationToken: cancellationToken);
_storageAccountResources.Add(storageAccountName, storage);
}

public async Task<MediaServicesAccountResource> GetMediaAccountAsync(
string mediaAccountName,
CancellationToken cancellationToken)
Expand All @@ -60,22 +69,24 @@ public async Task<MediaServicesAccountResource> GetMediaAccountAsync(
mediaAccountName, cancellationToken);
}

public async Task<BlobServiceClient> GetStorageAccountAsync(
MediaServicesAccountResource account,
MediaAssetResource asset,
CancellationToken cancellationToken)
public BlobServiceClient GetBlobServiceClient(MediaAssetResource asset)
{
string assetStorageAccountName = asset.Data.StorageAccountName;
_storageResourceGroups.TryGetValue(asset.Data.StorageAccountName, out var rg);
var resource = await rg.GetStorageAccountAsync(asset.Data.StorageAccountName, cancellationToken: cancellationToken);
return GetStorageAccount(resource);
if (_storageAccountResources.TryGetValue(assetStorageAccountName, out var resource))
{
return GetStorageAccount(resource);
}
throw new Exception($"Failed to get BlobServiceClient for storage account {assetStorageAccountName}.");
}

public async Task<(BlobServiceClient, ResourceIdentifier)> GetStorageAccount(string storageAccountName, CancellationToken cancellationToken)
public (BlobServiceClient, ResourceIdentifier) GetBlobServiceClient(string storageAccountName)
{
StorageAccountResource storage =
await _resourceGroup.GetStorageAccountAsync(storageAccountName, cancellationToken: cancellationToken);
return (GetStorageAccount(storage), storage.Id);
string assetStorageAccountName = storageAccountName;
if (_storageAccountResources.TryGetValue(assetStorageAccountName, out var resource))
{
return (GetStorageAccount(resource), resource.Id);
}
throw new Exception($"Failed to get BlobServiceClient for storage account {assetStorageAccountName}.");
}

private BlobServiceClient GetStorageAccount(StorageAccountResource storage)
Expand Down
5 changes: 2 additions & 3 deletions migrationTool/ams/CleanupCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public override async Task MigrateAsync(CancellationToken cancellationToken)
var resourceFilter = _options.IsCleanUpAccount ? null : GetAssetResourceFilter(_options.ResourceFilter, null, null);

var orderBy = "properties/created";
await _resourceProvider.SetStorageResourceGroupsAsync(account, cancellationToken);
await _resourceProvider.SetStorageAccountResourcesAsync(account, cancellationToken);
assets = account.GetMediaAssets()
.GetAllAsync(resourceFilter, orderby: orderBy, cancellationToken: cancellationToken);
List<MediaAssetResource>? assetList = await assets.ToListAsync(cancellationToken);
Expand Down Expand Up @@ -160,8 +160,7 @@ private async Task<bool> CleanUpAssetAsync(bool isForcedelete, MediaServicesAcco
{
try
{

var storage = await _resourceProvider.GetStorageAccountAsync(account, asset, cancellationToken);
var storage = _resourceProvider.GetBlobServiceClient(asset);
var container = storage.GetContainer(asset);
if (!await container.ExistsAsync(cancellationToken))
{
Expand Down
7 changes: 4 additions & 3 deletions migrationTool/ams/ResetCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ public override async Task MigrateAsync(CancellationToken cancellationToken)
var (isAMSAcc, account) = await IsAMSAccountAsync(_options.AccountName, cancellationToken);
if (!isAMSAcc)
{
var (storageClient, accountId) = await _resourceProvider.GetStorageAccount(_options.AccountName, cancellationToken);
await _resourceProvider.SetStorageAccountResourcesAsync(_options.AccountName, cancellationToken);
var (storageClient, accountId) = _resourceProvider.GetBlobServiceClient(_options.AccountName);
if (storageClient == null)
{
_logger.LogError("No valid storage account was found.");
Expand Down Expand Up @@ -64,14 +65,14 @@ public override async Task MigrateAsync(CancellationToken cancellationToken)
throw new Exception("No valid media account was found.");
}
_logger.LogInformation("Begin reset assets on account: {name}", account.Data.Name);
await _resourceProvider.SetStorageResourceGroupsAsync(account, cancellationToken);
await _resourceProvider.SetStorageAccountResourcesAsync(account, cancellationToken);
AsyncPageable<MediaAssetResource> assets = account.GetMediaAssets()
.GetAllAsync(cancellationToken: cancellationToken);
List<MediaAssetResource>? assetList = await assets.ToListAsync(cancellationToken);
int resetAssetCount = 0;
foreach (var asset in assetList)
{
var (storage, _) = await _resourceProvider.GetStorageAccount(asset.Data.StorageAccountName, cancellationToken);
var storage = _resourceProvider.GetBlobServiceClient(asset);
var container = storage.GetContainer(asset);
if (!await container.ExistsAsync(cancellationToken))
{
Expand Down
3 changes: 2 additions & 1 deletion migrationTool/ams/StorageMigrator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ public StorageMigrator(
public override async Task MigrateAsync(CancellationToken cancellationToken)
{
var watch = Stopwatch.StartNew();
var (storageClient, accountId) = await _resourceProvider.GetStorageAccount(_storageOptions.AccountName, cancellationToken);
await _resourceProvider.SetStorageAccountResourcesAsync(_storageOptions.AccountName, cancellationToken);
var (storageClient, accountId) = _resourceProvider.GetBlobServiceClient(_storageOptions.AccountName);
_logger.LogInformation("Begin migration of containers from account: {name}", storageClient.AccountName);
double totalContainers = await GetStorageBlobMetricAsync(accountId, cancellationToken);
_logger.LogInformation("The total count of containers of the storage account is {count}.", totalContainers);
Expand Down

0 comments on commit 239a7b5

Please sign in to comment.