Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

82 remove dsa from disks monitoring #88

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 26 additions & 20 deletions src/BobApi/BobEntities/ClusterConfiguration.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
Expand All @@ -16,6 +16,27 @@ public class ClusterConfiguration
[YamlMember(Alias = "vdisks")]
public List<VDisk> VDisks { get; set; }

public string FindDiskName(string nodeName, long vdiskId)
{
return VDisks
.Find(vd => vd.Id == vdiskId)
?.Replicas
.Find(r => r.Node == nodeName)
?.Disk;
}

public Node FindNodeByName(string name) => Nodes.Find(n => n.Name == name);

public VDisk FindVDiskByNodeNameDiskName(string nodeName, string diskName) =>
VDisks.Find(vd => vd.Replicas.Any(r => r.Node == nodeName && r.Disk == diskName));

public static async Task<ClusterConfiguration> FromYamlFile(string filename)
{
var deserializer = new DeserializerBuilder().IgnoreUnmatchedProperties().Build();
var content = await File.ReadAllTextAsync(filename);
return deserializer.Deserialize<ClusterConfiguration>(content);
}

public class Node
{
[YamlMember(Alias = "name")]
Expand All @@ -35,9 +56,12 @@ public async ValueTask<IPAddress> FindIPAddress()
if (!IPAddress.TryParse(host, out var addr))
addr = (await Dns.GetHostAddressesAsync(host)).FirstOrDefault();
return addr;

}

public Disk FindDiskByPath(string path) => Disks.Find(d => d.Path == path);

public override string ToString() => Name;

public class Disk
{
[YamlMember(Alias = "name")]
Expand All @@ -46,11 +70,6 @@ public class Disk
[YamlMember(Alias = "path")]
public string Path { get; set; }
}

public override string ToString()
{
return Name;
}
}

public class VDisk
Expand All @@ -72,18 +91,5 @@ public class Replica
public override string ToString() => $"{Disk}@{Node}";
}
}

public string FindDiskName(string nodeName, long vdiskId)
{
return VDisks.Find(vd => vd.Id == vdiskId)?.Replicas.Find(r => r.Node == nodeName)?.Disk;
}

public static async Task<ClusterConfiguration> FromYamlFile(string filename)
{
var deserializer = new DeserializerBuilder().IgnoreUnmatchedProperties().Build();
var content = await File.ReadAllTextAsync(filename);
return deserializer.Deserialize<ClusterConfiguration>(content);
}
}
}

10 changes: 9 additions & 1 deletion src/BobToolsCli/CommonArguments.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
using System;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using BobApi.BobEntities;
using BobToolsCli.ConfigurationFinding;
using BobToolsCli.ConfigurationReading;
using BobToolsCli.Exceptions;
using BobToolsCli.Helpers;
using CommandLine;
using Microsoft.Extensions.Logging;
Expand Down Expand Up @@ -56,6 +57,13 @@ public async Task<ConfigurationReadingResult<ClusterConfiguration>> FindClusterC
return await GetClusterConfigurationFromFile(ClusterConfigPath, cancellationToken);
}

public async Task<ClusterConfiguration> GetClusterConfiguration(bool skipUnavailableNodes =false, CancellationToken cancellationToken = default)
{
if ((await FindClusterConfiguration(skipUnavailableNodes, cancellationToken)).IsOk(out var conf, out var err))
return conf;
throw new ConfigurationException($"Failed to get cluster configuration: {err}");
}

public async Task<ConfigurationReadingResult<ClusterConfiguration>> GetClusterConfigurationFromFile(string path,
CancellationToken cancellationToken = default)
{
Expand Down
126 changes: 47 additions & 79 deletions src/DisksMonitoring/Bob/DisksCopier.cs
Original file line number Diff line number Diff line change
@@ -1,104 +1,72 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Linq;
using System.Threading.Tasks;
using BobApi;
using BobApi.Entities;
using BobApi.BobEntities;
using DisksMonitoring.Config;
using DisksMonitoring.Entities;
using DisksMonitoring.OS.Helpers;
using Microsoft.Extensions.Logging;
using RemoteFileCopy;
using RemoteFileCopy.Entities;

namespace DisksMonitoring.Bob
{
class DisksCopier
{
private readonly Configuration configuration;
private readonly ProcessInvoker processInvoker;
private readonly ILogger<DisksCopier> logger;
private readonly Configuration _configuration;
private readonly ILogger<DisksCopier> _logger;
private readonly IRemoteFileCopier _remoteFileCopier;

public DisksCopier(Configuration configuration, ProcessInvoker processInvoker, ILogger<DisksCopier> logger)
public DisksCopier(
Configuration configuration,
ILogger<DisksCopier> logger,
IRemoteFileCopier remoteFileCopier
)
{
this.configuration = configuration;
this.processInvoker = processInvoker;
this.logger = logger;
_configuration = configuration;
_logger = logger;
_remoteFileCopier = remoteFileCopier;
}

public async Task CopyDataFromReplica(BobApiClient bobApiClient, BobDisk bobDisk)
public async Task CopyDataFromReplica(
ClusterConfiguration clusterConfiguration,
BobApiClient bobApiClient,
string nodeName,
BobDisk bobDisk
)
{
if (configuration.PathToDiskStatusAnalyzer == null || !File.Exists(configuration.PathToDiskStatusAnalyzer))
var localNode = clusterConfiguration.FindNodeByName(nodeName);
var disk = localNode.FindDiskByPath(bobDisk.BobPath.Path);
var targetVDisk = clusterConfiguration.FindVDiskByNodeNameDiskName(nodeName, disk.Name);
if (targetVDisk == null)
{
logger.LogInformation($"DiskStatusAnalyzer path ({configuration.PathToDiskStatusAnalyzer}) is invalid, skipping copy");
return;
}
var statusResult = await bobApiClient.GetStatus();
if (!statusResult.IsOk(out var status, out var err))
var restReplicas = targetVDisk
.Replicas
.Where(r => r.Node != nodeName || r.Disk != disk.Name);
var currentDir = new RemoteDir(await localNode.FindIPAddress(), bobDisk.BobPath.Path);
foreach (var replica in restReplicas)
{
logger.LogError($"Failed to get status from {bobApiClient}, {err}");
return;
}
var destName = status.Name;
var diskName = bobDisk.DiskNameInBob;
bool IsCurrent(Replica replica) => replica.Node == destName && replica.Disk == diskName;
var vdisks = status.VDisks.Where(vd => vd.Replicas.Any(IsCurrent));
if (!vdisks.Any())
{
logger.LogError($"VDisks with replica ({diskName}, {destName}) not found");
return;
}
foreach (var vdisk in vdisks)
{
var bobPath = Path.Combine(bobDisk.BobPath.Path, "bob");
await TryCreateDir(bobPath);
await TryCreateDir(Path.Combine(bobPath, vdisk.Id.ToString()));
foreach (var replica in vdisk.Replicas)
var replicaNode = clusterConfiguration.Nodes.Single(n => n.Name == replica.Node);
var replicaDisk = replicaNode.Disks.Single(d => d.Name == replica.Disk);
var replicaRemoteDir = new RemoteDir(
await replicaNode.FindIPAddress(),
replicaDisk.Path
);

var copyResult = await _remoteFileCopier.Copy(replicaRemoteDir, currentDir);
if (copyResult.IsError == false)
{
if (replica.Node == destName)
continue;
logger.LogInformation($"Trying to copy {vdisk} from {replica.Node} to {destName}");
try
{
await PerformCopy(replica.Node, destName, vdisk.Id);
logger.LogInformation($"Successfully copied {vdisk} from {replica.Node} to {destName}");
break;
}
catch (Exception e)
{
logger.LogError($"Failed to copy {vdisk} from {replica.Node} to {destName}: {e.Message}");
}
_logger.LogInformation(
"Succesfully copied data for disk {Disk} from {From} to {To}",
bobDisk,
replicaRemoteDir,
currentDir
);
return;
}
}
}

private async Task TryCreateDir(string path)
{
try
{
if (!System.IO.Directory.Exists(path))
await CreateDir(path);
else
logger.LogDebug($"Directory {path} already exists");
}
catch (Exception e)
{
logger.LogError($"Failed to create dir {path}: {e.Message}");
}
}

private async Task CreateDir(string path)
{
logger.LogInformation($"Creating dir {path}");
System.IO.Directory.CreateDirectory(path);
await processInvoker.SetDirPermissionsAndOwner(path, configuration.BobDirPermissions, configuration.BobDirOwner);
}

private async Task PerformCopy(string sourceName, string destName, int vdiskId)
{
string fullPath = Path.GetFullPath(configuration.PathToDiskStatusAnalyzer);
await processInvoker.InvokeSudoProcessWithWD(fullPath, Path.GetDirectoryName(fullPath),
"copy-vdisk", $"-s {sourceName}", $"-d {destName}", $"-v {vdiskId}");
_logger.LogError("Failed to copy data to disk {Dir}", bobDisk);
}
}
}
Loading
Loading