Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

81 remove alien partitions #99

Open
wants to merge 19 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 43 additions & 1 deletion src/BobApi/BobApiClient.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System;
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Net.Http.Headers;
Expand Down Expand Up @@ -136,6 +136,48 @@ await GetJson<Partition>(
cancellationToken: cancellationToken
);

public async Task<BobApiResult<List<PartitionSlim>>> GetPartitionSlims(
string diskName,
long vDiskId,
CancellationToken cancellationToken = default
) =>
await GetJson<List<PartitionSlim>>(
$"disks/{diskName}/vdisks/{vDiskId}/partitions",
cancellationToken
);

public async Task<BobApiResult<List<PartitionSlim>>> GetAlienPartitionSlims(
string nodeName,
long vDiskId,
CancellationToken cancellationToken = default
) =>
await GetJson<List<PartitionSlim>>(
$"alien/nodes/{nodeName}/vdisks/{vDiskId}/partitions",
cancellationToken
);

public async Task<BobApiResult<bool>> DeletePartitionById(
string diskName,
long vDiskId,
string partitionId,
CancellationToken cancellationToken
) =>
await DeleteIsOk(
$"disks/{diskName}/vdisks/{vDiskId}/partitions/{partitionId}",
cancellationToken
);

public async Task<BobApiResult<bool>> DeleteAlienPartitionById(
string nodeName,
long vDiskId,
string partitionId,
CancellationToken cancellationToken
) =>
await DeleteIsOk(
$"alien/nodes/{nodeName}/vdisks/{vDiskId}/partitions/{partitionId}",
cancellationToken
);

public async Task<BobApiResult<long>> CountRecordsOnVDisk(
long id,
CancellationToken cancellationToken = default
Expand Down
4 changes: 3 additions & 1 deletion src/BobApi/Entities/BobApiResult.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System;
using System;
using System.Net;
using System.Net.Http;
using System.Threading.Tasks;
Expand Down Expand Up @@ -76,5 +76,7 @@ string content
content
)
);

public static implicit operator BobApiResult<T>(T data) => Ok(data);
}
}
13 changes: 13 additions & 0 deletions src/BobApi/Entities/PartitionSlim.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
using Newtonsoft.Json;

namespace BobApi.Entities
{
public class PartitionSlim
{
[JsonProperty("id")]
public string Id { get; set; }

[JsonProperty("timestamp")]
public long Timestamp { get; set; }
}
}
29 changes: 24 additions & 5 deletions src/BobApi/IPartitionsBobApiClient.cs
Original file line number Diff line number Diff line change
@@ -1,15 +1,34 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using BobApi.BobEntities;
using BobApi.Entities;

namespace BobApi
{
public interface IPartitionsBobApiClient
public interface IPartitionsBobApiClient : IDisposable
{
Task<BobApiResult<bool>> DeletePartitionsByTimestamp(long vDiskId, long timestamp, CancellationToken cancellationToken = default);
Task<BobApiResult<Partition>> GetPartition(long vdiskId, string partition, CancellationToken cancellationToken = default);
Task<BobApiResult<List<string>>> GetPartitions(ClusterConfiguration.VDisk vDisk, CancellationToken cancellationToken = default);
Task<BobApiResult<List<PartitionSlim>>> GetPartitionSlims(
string diskName,
long vDiskId,
CancellationToken cancellationToken = default
);
Task<BobApiResult<List<PartitionSlim>>> GetAlienPartitionSlims(
string nodeName,
long vDiskId,
CancellationToken cancellationToken = default
);
Task<BobApiResult<bool>> DeletePartitionById(
string diskName,
long vDiskId,
string partitionId,
CancellationToken cancellationToken
);
Task<BobApiResult<bool>> DeleteAlienPartitionById(
string nodeName,
long vDiskId,
string partitionId,
CancellationToken cancellationToken
);
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System;
using System;

namespace BobToolsCli.ConfigurationReading
{
Expand Down Expand Up @@ -28,6 +28,9 @@ public ConfigurationReadingResult<Y> Map<Y>(Func<T, Y> f)
}

public static ConfigurationReadingResult<T> Ok(T data) => new(data, null);

public static ConfigurationReadingResult<T> Error(string error) => new(default, error);

public static implicit operator ConfigurationReadingResult<T>(T data) => Ok(data);
}
}
}
9 changes: 4 additions & 5 deletions src/OldPartitionsRemover/ByDateRemoving/Arguments.cs
Original file line number Diff line number Diff line change
@@ -1,20 +1,19 @@
using System;
using System.Globalization;
using System;
using System.Text.RegularExpressions;
using BobToolsCli;
using CommandLine;
using OldPartitionsRemover.Entities;

namespace OldPartitionsRemover.ByDateRemoving
{
[Verb("by-date")]
public class Arguments : CommonArguments
public class Arguments : RemoverArguments
{
private static readonly Regex s_timeSpanRegex = new(@"^\-(?<span>\d+)(?<unit>[dhmy])");

[Option('t', "threshold", HelpText = "Removal threshold. Can be either date, timestamp or in relative days count format, e.g. \"-3d\"", Required = true)]
public string ThresholdString { get; set; }


public Result<DateTime> GetThreshold()
{
if (string.IsNullOrWhiteSpace(ThresholdString))
Expand Down Expand Up @@ -50,4 +49,4 @@ private static bool TryParseThreshold(string s, DateTime now, out DateTime thres
return false;
}
}
}
}
21 changes: 0 additions & 21 deletions src/OldPartitionsRemover/ByDateRemoving/Entities/NodeApi.cs

This file was deleted.

This file was deleted.

This file was deleted.

87 changes: 13 additions & 74 deletions src/OldPartitionsRemover/ByDateRemoving/Remover.cs
Original file line number Diff line number Diff line change
@@ -1,17 +1,11 @@
using System;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using BobApi;
using BobApi.BobEntities;
using BobApi.Entities;
using BobToolsCli;
using BobToolsCli.BobApliClientFactories;
using BobToolsCli.ConfigurationFinding;
using BobToolsCli.Helpers;
using Microsoft.Extensions.Logging;
using OldPartitionsRemover.ByDateRemoving.Entities;
using OldPartitionsRemover.Entities;
using OldPartitionsRemover.Infrastructure;

Expand All @@ -24,16 +18,18 @@ public partial class Remover
private readonly IBobApiClientFactory _bobApiClientFactory;
private readonly IConfigurationFinder _configurationFinder;
private readonly ResultsCombiner _resultsCombiner;
private readonly RemovablePartitionsFinder _removablePartitionsFinder;

public Remover(Arguments arguments, ILogger<Remover> logger,
IBobApiClientFactory bobApiClientFactory, IConfigurationFinder configurationFinder,
ResultsCombiner resultsCombiner)
ResultsCombiner resultsCombiner, RemovablePartitionsFinder removablePartitionsFinder)
{
_arguments = arguments;
_logger = logger;
_bobApiClientFactory = bobApiClientFactory;
_configurationFinder = configurationFinder;
_resultsCombiner = resultsCombiner;
_removablePartitionsFinder = removablePartitionsFinder;
}

public async Task<Result<int>> RemoveOldPartitions(CancellationToken cancellationToken)
Expand All @@ -46,80 +42,23 @@ public async Task<Result<int>> RemoveOldPartitions(CancellationToken cancellatio
_logger.LogInformation("Removing blobs older than {Threshold}", t);
return configResult.Bind(c => FindInCluster(c, t, cancellationToken));
});
return await removeOperations.Bind(InvokeOperations);
return await removeOperations.Bind(async ops => await InvokeOperations(ops, cancellationToken));
}

private async Task<Result<List<RemoveOperation>>> FindInCluster(ClusterConfiguration clusterConfig,
private async Task<Result<List<RemovablePartition>>> FindInCluster(ClusterConfiguration clusterConfig,
DateTime threshold, CancellationToken cancellationToken)
=> await _resultsCombiner.CollectResults(clusterConfig.Nodes,
async node => await FindOnNode(clusterConfig, node, threshold, cancellationToken));

private async Task<Result<List<RemoveOperation>>> FindOnNode(ClusterConfiguration clusterConfig,
ClusterConfiguration.Node node, DateTime threshold, CancellationToken cancellationToken)
{
_logger.LogInformation("Preparing partitions to remove from node {Node}", node.Name);

var vdisksOnNode = clusterConfig.VDisks.Where(vd => vd.Replicas.Any(r => r.Node == node.Name));
var nodeApi = new NodeApi(_bobApiClientFactory.GetPartitionsBobApiClient(node), cancellationToken);

return await _resultsCombiner.CollectResults(vdisksOnNode, async vdisk => await FindOnVDisk(vdisk, nodeApi, threshold));
}

private async Task<Result<List<RemoveOperation>>> FindOnVDisk(ClusterConfiguration.VDisk vdisk, NodeApi nodeApi,
DateTime threshold)
{
_logger.LogDebug("Preparing partitions to remove from vdisk {VDisk}", vdisk.Id);

var partitionFunctions = new PartitionFunctions(vdisk, nodeApi);

var partitionIdsResult = await partitionFunctions.FindPartitionIds();
return await partitionIdsResult.Bind(partitionIds => FindWithinPatitionIds(partitionIds, partitionFunctions, threshold));
}

private async Task<Result<List<RemoveOperation>>> FindWithinPatitionIds(List<string> partitionIds,
PartitionFunctions partitionFunctions, DateTime threshold)
{
var partitionsResult = await GetPartitions(partitionIds, partitionFunctions);
return partitionsResult.Bind(partitions => FindWithinPartitions(partitions, partitionFunctions, threshold));
}

private async Task<Result<List<Partition>>> GetPartitions(List<string> partitionIds,
PartitionFunctions partitionFunctions)
{
return await _resultsCombiner.CollectResults(partitionIds, async p => await partitionFunctions.FindPartitionById(p));
}

private Result<List<RemoveOperation>> FindWithinPartitions(List<Partition> partitionInfos,
PartitionFunctions partitionFunctions, DateTime threshold)
{
return FindByTimestamp(partitionInfos, partitionFunctions, threshold);
}

private Result<List<RemoveOperation>> FindByTimestamp(List<Partition> partitionInfos,
PartitionFunctions partitionFunctions, DateTimeOffset threshold)
{
var oldTimestamps = partitionInfos.Select(p => p.Timestamp)
.Where(p => DateTimeOffset.FromUnixTimeSeconds(p) < threshold)
.Distinct()
.ToArray();
var countByOldTimestamp = partitionInfos.Select(p => p.Timestamp)
.Where(p => DateTimeOffset.FromUnixTimeSeconds(p) < threshold)
.GroupBy(p => p)
.ToDictionary(g => g.Key, g => g.Count());
if (oldTimestamps.Length > 0)
_logger.LogInformation("Preparing partitions from {TimestampsCount} timestamps to remove", oldTimestamps.Length);
else
_logger.LogInformation("No partitions to be removed");

var removeOperations = oldTimestamps.Select<long, RemoveOperation>(ts =>
async () => (await partitionFunctions.RemovePartitionsByTimestamp(ts)).Map(t => t ? countByOldTimestamp[ts] : 0)).ToList();
return Result<List<RemoveOperation>>.Ok(removeOperations);
return (await _removablePartitionsFinder.Find(clusterConfig, cancellationToken))
.Map(rms => {
rms.RemoveAll(rm => rm.Timestamp >= threshold);
return rms;
});
}

private async Task<Result<int>> InvokeOperations(List<RemoveOperation> ops)
private async Task<Result<int>> InvokeOperations(List<RemovablePartition> ops, CancellationToken cancellationToken)
{
_logger.LogInformation("Invoking {RemoveOperationsCount} remove operations", ops.Count);
return await _resultsCombiner.CombineResults(ops, 0, async (c, n) => (await n()).Map(r => c + r));
return await _resultsCombiner.CombineResults(ops, 0, async (c, n) => (await n.Remove(cancellationToken)).Map(r => c + (r ? 1 : 0)));
}
}
}
6 changes: 2 additions & 4 deletions src/OldPartitionsRemover/BySpaceRemoving/Arguments.cs
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
using System;
using System.Text.RegularExpressions;
using BobToolsCli;
using System;
using ByteSizeLib;
using CommandLine;
using OldPartitionsRemover.Entities;

namespace OldPartitionsRemover.BySpaceRemoving
{
[Verb("by-space")]
public class Arguments : CommonArguments
public class Arguments : RemoverArguments
{
[Option('t', "threshold", HelpText = "Removal threshold", Required = true)]
public string ThresholdString { get; set; }
Expand Down
Loading
Loading