Skip to content

Commit

Permalink
Bugfix for PostProcess Stats job (#10282)
Browse files Browse the repository at this point in the history
* Fixing gallery build error

* Testfix

* Bugfix

* Added TopLevel

* Has right .net sdk version now

* Changed toplevel error

* Changed toplevel error pt 2

* Changed toplevel error pt 3

* Bugfix

* Bugfix2

* Added a comment

* Clarified comment

* Changed comments

* resolved a comment

---------

Co-authored-by: Lana Parezanin <[email protected]>
  • Loading branch information
Lanaparezanin and Lana Parezanin authored Jan 24, 2025
1 parent 53b4272 commit 03b38b8
Show file tree
Hide file tree
Showing 9 changed files with 75 additions and 22 deletions.
2 changes: 1 addition & 1 deletion global.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@
"rollForward": "latestFeature",
"allowPrerelease": false
}
}
}
7 changes: 5 additions & 2 deletions src/NuGet.Services.Storage/AggregateStorage.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
Expand Down Expand Up @@ -111,9 +111,12 @@ public override async Task<IEnumerable<StorageListItem>> ListAsync(bool getMetad
return await _primaryStorage.ListAsync(getMetadata, cancellationToken);
}

/// <summary>
/// Lists top-level items by forwarding the request to the primary storage.
/// </summary>
/// <param name="getMetadata">Passed through unchanged to the primary storage.</param>
/// <param name="cancellationToken">Passed through unchanged to the primary storage.</param>
/// <returns>The items returned by the primary storage's <c>ListTopLevelAsync</c>.</returns>
public override async Task<IEnumerable<StorageListItem>> ListTopLevelAsync(bool getMetadata, CancellationToken cancellationToken)
{
    var topLevelItems = await _primaryStorage.ListTopLevelAsync(getMetadata, cancellationToken);
    return topLevelItems;
}

/// <summary>
/// Not implemented for this storage: every call throws.
/// </summary>
/// <param name="resourceUri">Unused.</param>
/// <param name="metadata">Unused.</param>
/// <exception cref="NotImplementedException">Always thrown, synchronously, before any task is returned.</exception>
public override Task SetMetadataAsync(Uri resourceUri, IDictionary<string, string> metadata)
    => throw new NotImplementedException();
}
}
}
24 changes: 23 additions & 1 deletion src/NuGet.Services.Storage/AzureStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,29 @@ public override async Task<IEnumerable<StorageListItem>> ListAsync(bool getMetad

await foreach (BlobHierarchyItem blob in _directory.GetBlobsByHierarchyAsync(traits: blobTraits, prefix: _path))
{
blobList.Add(await GetStorageListItemAsync(_directory.GetBlockBlobClient(blob.Blob.Name)));
blobList.Add(await GetStorageListItemAsync(_directory.GetBlockBlobClient(blob.Blob.Name)));
}

return blobList;
}

public override async Task<IEnumerable<StorageListItem>> ListTopLevelAsync(bool getMetadata, CancellationToken cancellationToken)
{
var prefix = _path.Trim('/') + '/';
var blobTraits = new BlobTraits();
if (getMetadata)
{
blobTraits |= BlobTraits.Metadata;
}

var blobList = new List<StorageListItem>();

await foreach (BlobHierarchyItem blob in _directory.GetBlobsByHierarchyAsync(traits: blobTraits, prefix: prefix, delimiter: "/"))
{
if (!blob.IsPrefix)
{
blobList.Add(await GetStorageListItemAsync(_directory.GetBlockBlobClient(blob.Blob.Name)));
}
}

return blobList;
Expand Down
7 changes: 6 additions & 1 deletion src/NuGet.Services.Storage/FileStorage.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
Expand Down Expand Up @@ -57,6 +57,11 @@ public override Task<IEnumerable<StorageListItem>> ListAsync(bool getMetadata, C
return Task.FromResult<IEnumerable<StorageListItem>>(List(getMetadata));
}

/// <summary>
/// Not implemented for this storage: every call throws.
/// </summary>
/// <param name="getMetadata">Unused.</param>
/// <param name="cancellationToken">Unused.</param>
/// <exception cref="NotImplementedException">Always thrown, synchronously, before any task is returned.</exception>
public override Task<IEnumerable<StorageListItem>> ListTopLevelAsync(bool getMetadata, CancellationToken cancellationToken)
    => throw new NotImplementedException();

public string Path
{
get;
Expand Down
13 changes: 12 additions & 1 deletion src/NuGet.Services.Storage/IStorage.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.Collections;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -19,7 +20,17 @@ public interface IStorage
Uri BaseAddress { get; }
Uri ResolveUri(string relativeUri);
IEnumerable<StorageListItem> List(bool getMetadata);

/// <summary>
/// Lists all children of the storage (including the ones contained in subdirectories).
/// </summary>
Task<IEnumerable<StorageListItem>> ListAsync(bool getMetadata, CancellationToken cancellationToken);

/// <summary>
/// Lists immediate children of the storage, assuming a directory-like structure.
/// </summary>
Task<IEnumerable<StorageListItem>> ListTopLevelAsync(bool getMetadata, CancellationToken cancellationToken);

Task CopyAsync(
Uri sourceUri,
IStorage destinationStorage,
Expand Down
8 changes: 7 additions & 1 deletion src/NuGet.Services.Storage/Storage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ public async Task<string> LoadString(Uri resourceUri, CancellationToken cancella
public abstract Task<bool> ExistsAsync(string fileName, CancellationToken cancellationToken);
public abstract IEnumerable<StorageListItem> List(bool getMetadata);
public abstract Task<IEnumerable<StorageListItem>> ListAsync(bool getMetadata, CancellationToken cancellationToken);
public abstract Task<IEnumerable<StorageListItem>> ListTopLevelAsync(bool getMetadata, CancellationToken cancellationToken);

public bool Verbose
{
Expand Down Expand Up @@ -205,10 +206,15 @@ protected string GetName(Uri uri)
int baseAddressLength = address.Length;

var name = uriString.Substring(baseAddressLength);
if (name.Contains("#"))
if (name.Contains("?"))
{
name = name.Substring(0, name.IndexOf("?"));
}
else if (name.Contains("#"))
{
name = name.Substring(0, name.IndexOf("#"));
}

return name;
}

Expand Down
21 changes: 11 additions & 10 deletions src/Stats.PostProcessReports/DetailedReportPostProcessor.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
Expand All @@ -10,6 +10,7 @@
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Azure.Core;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
Expand Down Expand Up @@ -120,7 +121,7 @@ private async Task ProcessBlobs(List<StorageListItem> jsonBlobs, CancellationTok
foreach (var sourceBlob in jsonBlobs)
{
var blobName = GetBlobName(sourceBlob);
var workBlobUri = _workStorage.ResolveUri(blobName);
var workBlobUri = _workStorage.ResolveUri(_configuration.WorkPath + blobName);
var sourceBlobStats = new BlobStatistics();
var individualReports = await ProcessSourceBlobAsync(sourceBlob, sourceBlobStats, totals);
using (_logger.BeginScope("Processing {BlobName}", blobName))
Expand Down Expand Up @@ -154,7 +155,7 @@ private async Task ProcessBlobs(List<StorageListItem> jsonBlobs, CancellationTok
}
}
}
var jobSucceededUrl = _workStorage.ResolveUri(JobSucceededFilename);
var jobSucceededUrl = _workStorage.ResolveUri(_configuration.WorkPath + JobSucceededFilename);
var jobSucceededContent = new StringStorageContent("", TextContentType);
await _workStorage.Save(jobSucceededUrl, jobSucceededContent, overwrite: true, cancellationToken: cancellationToken);
_telemetryService.ReportTotals(totals.SourceFilesProcessed, totals.TotalLinesProcessed, totals.TotalFilesCreated, totals.TotalLinesFailed);
Expand Down Expand Up @@ -202,26 +203,26 @@ private async Task CopySourceBlobsAsync(List<StorageListItem> jsonBlobs, Cancell
foreach (var sourceBlob in jsonBlobs)
{
var blobName = GetBlobName(sourceBlob);
var targetUrl = _workStorage.ResolveUri(blobName);
var targetUrl = _workStorage.ResolveUri(_configuration.WorkPath + blobName);
_logger.LogInformation("{SourceBlobUri} ({BlobName})", sourceBlob.Uri.AbsoluteUri, blobName);
_logger.LogInformation("{WorkBlobUrl}", targetUrl);
await _sourceStorage.CopyAsync(sourceBlob.Uri, _workStorage, targetUrl, destinationProperties: null, cancellationToken);
}
var copySucceededContent = new StringStorageContent("", TextContentType);
var copySucceededUrl = _workStorage.ResolveUri(CopySucceededFilename);
var copySucceededUrl = _workStorage.ResolveUri(_configuration.WorkPath + CopySucceededFilename);
await _workStorage.Save(copySucceededUrl, copySucceededContent, overwrite: true, cancellationToken: cancellationToken);
}

private async Task<List<StorageListItem>> EnumerateSourceBlobsAsync()
{
var blobs = await _sourceStorage.ListAsync(getMetadata: true, cancellationToken: CancellationToken.None);
var blobs = await _sourceStorage.ListTopLevelAsync(getMetadata: true, cancellationToken: CancellationToken.None);

return blobs.ToList();
}

private async Task<List<StorageListItem>> EnumerateWorkStorageBlobsAsync()
{
var blobs = await _workStorage.ListAsync(getMetadata: true, cancellationToken: CancellationToken.None);
var blobs = await _workStorage.ListTopLevelAsync(getMetadata: true, cancellationToken: CancellationToken.None);

return blobs.ToList();
}
Expand All @@ -245,7 +246,7 @@ private async Task<ConcurrentBag<LineProcessingContext>> ProcessSourceBlobAsync(
var sw = Stopwatch.StartNew();
var numLines = 0;
var individualReports = new ConcurrentBag<LineProcessingContext>();
var workStorageUrl = _workStorage.ResolveUri(GetBlobName(sourceBlob));
var workStorageUrl = _workStorage.ResolveUri(_configuration.WorkPath + GetBlobName(sourceBlob));
var storageContent = await _workStorage.Load(workStorageUrl, CancellationToken.None);
using (var sourceStream = storageContent.GetContentStream())
using (var streamReader = new StreamReader(sourceStream))
Expand Down Expand Up @@ -311,7 +312,7 @@ private static bool BlobMetadataExists(StorageListItem sourceBlob, TotalStats to

private static string GetBlobName(StorageListItem blob)
{
var path = blob.Uri.AbsoluteUri;
var path = blob.Uri.GetComponents(UriComponents.Path, UriFormat.UriEscaped);
var lastSlash = path.LastIndexOf('/');
if (lastSlash < 0)
{
Expand Down Expand Up @@ -351,7 +352,7 @@ private async Task WriteReports(
continue;
}
var outFilename = $"recentpopularitydetail_{data.PackageId.ToLowerInvariant()}.json";
var destinationUri = _destinationStorage.ResolveUri(outFilename);
var destinationUri = _destinationStorage.ResolveUri(_configuration.DestinationPath + outFilename);
var storageContent = new StringStorageContent(details.Data, JsonContentType);

await _destinationStorage.Save(destinationUri, storageContent, overwrite: true, cancellationToken: cancellationToken);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,11 @@ public override Task<IEnumerable<StorageListItem>> ListAsync(bool getMetadata, C
return _storage.ListAsync(getMetadata, cancellationToken);
}

/// <summary>
/// Forwards the top-level listing request to the wrapped storage.
/// </summary>
/// <param name="getMetadata">Passed through unchanged to the wrapped storage.</param>
/// <param name="cancellationToken">Passed through unchanged to the wrapped storage.</param>
/// <returns>The task returned by the wrapped storage's <c>ListTopLevelAsync</c>.</returns>
public override Task<IEnumerable<StorageListItem>> ListTopLevelAsync(bool getMetadata, CancellationToken cancellationToken)
    => _storage.ListTopLevelAsync(getMetadata, cancellationToken);

public override Task SetMetadataAsync(Uri resourceUri, IDictionary<string, string> metadata)
{
return _storage.SetMetadataAsync(resourceUri, metadata);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
Expand Down Expand Up @@ -33,13 +33,13 @@ public class DetailedReportPostProcessorFacts
public async Task DoesntStartIfNoSuccessFile()
{
_sourceStorageMock
.Setup(ss => ss.ListAsync(It.IsAny<bool>(), It.IsAny<CancellationToken>()))
.Setup(ss => ss.ListTopLevelAsync(It.IsAny<bool>(), It.IsAny<CancellationToken>()))
.ReturnsAsync(new List<StorageListItem>());

await _target.CopyReportsAsync();

_sourceStorageMock
.Verify(ss => ss.ListAsync(It.IsAny<bool>(), It.IsAny<CancellationToken>()), Times.Once);
.Verify(ss => ss.ListTopLevelAsync(It.IsAny<bool>(), It.IsAny<CancellationToken>()), Times.Once);
_sourceStorageMock.VerifyNoOtherCalls();
_workStorageMock.VerifyNoOtherCalls();
_destinationStorageMock.VerifyNoOtherCalls();
Expand Down Expand Up @@ -186,7 +186,7 @@ public async Task SkipsProcessedFiles()
{ "FilesCreated", "123" }
};
_workStorageMock
.Setup(ss => ss.ListAsync(It.IsAny<bool>(), It.IsAny<CancellationToken>()))
.Setup(ss => ss.ListTopLevelAsync(It.IsAny<bool>(), It.IsAny<CancellationToken>()))
.ReturnsAsync(() => new List<StorageListItem>(_workFiles.Select(f => Blob(
_workStorageMock,
f,
Expand Down Expand Up @@ -286,7 +286,7 @@ private static void SetupStorageMock(Mock<IStorage> mock, string baseUrl, Func<L
return files().Contains(filename);
});
mock
.Setup(s => s.ListAsync(It.IsAny<bool>(), It.IsAny<CancellationToken>()))
.Setup(s => s.ListTopLevelAsync(It.IsAny<bool>(), It.IsAny<CancellationToken>()))
.ReturnsAsync(() => new List<StorageListItem>(files().Select(f => Blob(mock, f))));
mock
.Setup(s => s.ResolveUri(It.IsAny<string>()))
Expand Down

0 comments on commit 03b38b8

Please sign in to comment.