diff --git a/Proto.Cluster.AzureContainerApps/ArmClientUtils.cs b/Proto.Cluster.AzureContainerApps/ArmClientUtils.cs
new file mode 100644
index 0000000000..b42cd70aaa
--- /dev/null
+++ b/Proto.Cluster.AzureContainerApps/ArmClientUtils.cs
@@ -0,0 +1,158 @@
+using System;
+using System.Collections.Generic;
+using System.Globalization;
+using System.Linq;
+using System.Threading.Tasks;
+using Azure;
+using Azure.ResourceManager;
+using Azure.ResourceManager.AppContainers;
+using Azure.ResourceManager.Resources;
+using Azure.ResourceManager.Resources.Models;
+using Microsoft.Extensions.Logging;
+
+namespace Proto.Cluster.AzureContainerApps;
+
+/// <summary>
+/// <see cref="ArmClient"/> extensions that keep cluster membership in the resource tags of a Container App.
+/// </summary>
+public static class ArmClientUtils
+{
+    private static readonly ILogger Logger = Log.CreateLogger(nameof(ArmClientUtils));
+
+    /// <summary>
+    /// Builds the member list from the Container App tags, looking only at replicas of active revisions
+    /// that receive traffic.
+    /// </summary>
+    /// <returns>The members found; empty when the app or a matching revision is missing.</returns>
+    public static async Task<Member[]> GetClusterMembers(this ArmClient client, string resourceGroupName, string containerAppName)
+    {
+        var members = new List<Member>();
+
+        var containerApp = await (await client.GetResourceGroupByName(resourceGroupName)).Value.GetContainerAppAsync(containerAppName);
+
+        if (containerApp is null || !containerApp.HasValue)
+        {
+            // Log the requested name: the response object carries nothing useful when the app is missing.
+            Logger.LogError("Container App: {ContainerApp} in resource group: {ResourceGroup} is not found", containerAppName, resourceGroupName);
+            return members.ToArray();
+        }
+
+        var containerAppRevisions = GetActiveRevisionsWithTraffic(containerApp).ToList();
+        if (containerAppRevisions.Count == 0)
+        {
+            Logger.LogError("Container App: {ContainerApp} in resource group: {ResourceGroup} does not contain any active revisions with traffic", containerAppName, resourceGroupName);
+            return members.ToArray();
+        }
+
+        var replicasWithTraffic = containerAppRevisions.SelectMany(r => r.GetContainerAppReplicas());
+
+        var allTags = (await containerApp.Value.GetTagResource().GetAsync()).Value.Data.TagValues;
+
+        foreach (var replica in replicasWithTraffic)
+        {
+            var replicaName = replica.Data.Name;
+
+            // Match on the tag name as well as the value so an unrelated tag holding the same value is not picked.
+            var replicaNameTag = allTags.FirstOrDefault(kvp =>
+                kvp.Value == replicaName &&
+                kvp.Key.EndsWith(ResourceTagLabels.LabelReplicaNameWithoutPrefix, StringComparison.Ordinal));
+            if (replicaNameTag.Key == null)
+            {
+                Logger.LogWarning("Skipping Replica with name: {Name}, no Proto Tags found", replicaName);
+                continue;
+            }
+
+            // Cut only the trailing label so a member id containing the same text stays intact.
+            var replicaNameTagPrefix = replicaNameTag.Key[..^ResourceTagLabels.LabelReplicaNameWithoutPrefix.Length];
+            var currentReplicaTags = allTags
+                .Where(kvp => kvp.Key.StartsWith(replicaNameTagPrefix, StringComparison.Ordinal))
+                .ToDictionary(x => x.Key, x => x.Value);
+
+            // A half-written or hand-edited tag set must not abort discovery of the remaining replicas.
+            if (!currentReplicaTags.TryGetValue(replicaNameTagPrefix + ResourceTagLabels.LabelMemberIdWithoutPrefix, out var memberId) ||
+                !currentReplicaTags.TryGetValue(ResourceTagLabels.LabelHost(memberId), out var host) ||
+                !currentReplicaTags.TryGetValue(ResourceTagLabels.LabelPort(memberId), out var portText) ||
+                !int.TryParse(portText, NumberStyles.Integer, CultureInfo.InvariantCulture, out var port))
+            {
+                Logger.LogWarning("Skipping Replica with name: {Name}, Proto Tags are incomplete", replicaName);
+                continue;
+            }
+
+            // Kind tags are named "<kind label>-<kind>"; the kind is whatever follows the dash.
+            var kindTagPrefix = ResourceTagLabels.LabelKind(memberId) + "-";
+            var kinds = currentReplicaTags.Keys
+                .Where(key => key.StartsWith(kindTagPrefix, StringComparison.Ordinal))
+                .Select(key => key[kindTagPrefix.Length..])
+                .ToArray();
+
+            members.Add(new Member
+            {
+                Id = memberId,
+                Port = port,
+                Host = host,
+                Kinds = { kinds }
+            });
+        }
+
+        return members.ToArray();
+    }
+
+    /// <summary>
+    /// Merges <paramref name="newTags"/> into the tags of the Container App; on a name clash the new value wins.
+    /// </summary>
+    public static async Task AddMemberTags(this ArmClient client, string resourceGroupName, string containerAppName, Dictionary<string, string> newTags)
+    {
+        var resourceGroup = await client.GetResourceGroupByName(resourceGroupName);
+        var containerApp = await resourceGroup.Value.GetContainerAppAsync(containerAppName);
+        var tagResource = containerApp.Value.GetTagResource();
+
+        var resourceTag = new Tag();
+        var existingTags = (await tagResource.GetAsync()).Value.Data.TagValues;
+        foreach (var tag in existingTags)
+        {
+            resourceTag.TagValues[tag.Key] = tag.Value;
+        }
+
+        // Use the indexer so a tag left by an earlier (retried) registration is overwritten, not rejected as a duplicate.
+        foreach (var tag in newTags)
+        {
+            resourceTag.TagValues[tag.Key] = tag.Value;
+        }
+
+        await tagResource.CreateOrUpdateAsync(WaitUntil.Completed, new TagResourceData(resourceTag));
+    }
+
+    /// <summary>
+    /// Rewrites the tags of the Container App without the tags that belong to <paramref name="memberId"/>.
+    /// </summary>
+    public static async Task ClearMemberTags(this ArmClient client, string resourceGroupName, string containerAppName, string memberId)
+    {
+        var resourceGroup = await client.GetResourceGroupByName(resourceGroupName);
+        var containerApp = await resourceGroup.Value.GetContainerAppAsync(containerAppName);
+        var tagResource = containerApp.Value.GetTagResource();
+
+        var memberPrefix = ResourceTagLabels.LabelPrefix(memberId);
+        var resourceTag = new Tag();
+        var existingTags = (await tagResource.GetAsync()).Value.Data.TagValues;
+
+        foreach (var tag in existingTags)
+        {
+            if (!tag.Key.StartsWith(memberPrefix, StringComparison.Ordinal))
+            {
+                resourceTag.TagValues.Add(tag);
+            }
+        }
+
+        await tagResource.CreateOrUpdateAsync(WaitUntil.Completed, new TagResourceData(resourceTag));
+    }
+
+    /// <summary>
+    /// Looks up a resource group by name in the default subscription of <paramref name="client"/>.
+    /// </summary>
+    public static async Task<Response<ResourceGroupResource>> GetResourceGroupByName(this ArmClient client, string resourceGroupName) =>
+        await (await client.GetDefaultSubscriptionAsync()).GetResourceGroups().GetAsync(resourceGroupName);
+
+    // Revisions that carry data, are flagged active and have a traffic weight above zero.
+    private static IEnumerable<ContainerAppRevisionResource> GetActiveRevisionsWithTraffic(ContainerAppResource containerApp) =>
+        containerApp.GetContainerAppRevisions().Where(r => r.HasData && r.Data.Active.GetValueOrDefault(false) && r.Data.TrafficWeight > 0);
+}
\ No newline at end of file
diff --git a/Proto.Cluster.AzureContainerApps/AzureContainerAppsProvider.cs b/Proto.Cluster.AzureContainerApps/AzureContainerAppsProvider.cs
new file mode 100644
index 0000000000..38b3ebc130
--- /dev/null
+++ b/Proto.Cluster.AzureContainerApps/AzureContainerAppsProvider.cs
@@ -0,0 +1,203 @@
+using System;
+using System.Collections.Generic;
+using System.Globalization;
+using System.Linq;
+using System.Threading.Tasks;
+using Azure.ResourceManager;
+using Azure.ResourceManager.AppContainers;
+using Microsoft.Extensions.Configuration;
+using Microsoft.Extensions.Logging;
+using Proto.Utils;
+
+namespace Proto.Cluster.AzureContainerApps;
+
+/// <summary>
+/// Cluster provider that publishes members as resource tags on an Azure Container App and polls those tags
+/// to build the cluster topology.
+/// </summary>
+public class AzureContainerAppsProvider : IClusterProvider
+{
+    public readonly string AdvertisedHost;
+
+    private readonly ArmClient _client;
+    private readonly string _resourceGroup;
+    private readonly string _containerAppName;
+    private readonly string _revisionName;
+    private readonly string _replicaName;
+
+    private string _memberId = null!;
+    private string _address = null!;
+    private Cluster _cluster = null!;
+    private string _clusterName = null!;
+    private string[] _kinds = null!;
+    private string _host = null!;
+    private int _port;
+
+    // Set by StartMemberAsync; a client never writes tags, so it has nothing to clear on shutdown.
+    private bool _isMember;
+
+    private readonly IConfiguration _configuration;
+    private static readonly ILogger Logger = Log.CreateLogger<AzureContainerAppsProvider>();
+    private static readonly TimeSpan PollIntervalInSeconds = TimeSpan.FromSeconds(5);
+
+    /// <summary>
+    /// Creates the provider. When <paramref name="advertisedHost"/> is null or empty, the address returned by
+    /// <see cref="ConfigUtils.FindIpAddress"/> is advertised instead.
+    /// </summary>
+    public AzureContainerAppsProvider(
+        IConfiguration configuration,
+        ArmClient client,
+        string resourceGroup,
+        string containerAppName,
+        string revisionName,
+        string replicaName,
+        string advertisedHost = default)
+    {
+        _configuration = configuration;
+        _client = client;
+        _resourceGroup = resourceGroup;
+        _containerAppName = containerAppName;
+        _revisionName = revisionName;
+        _replicaName = replicaName;
+        AdvertisedHost = string.IsNullOrEmpty(advertisedHost)
+            ? ConfigUtils.FindIpAddress().ToString()
+            : advertisedHost;
+    }
+
+    /// <summary>Registers this node as a member and starts polling for topology changes.</summary>
+    public async Task StartMemberAsync(Cluster cluster)
+    {
+        var (host, port) = cluster.System.GetAddress();
+        _cluster = cluster;
+        _clusterName = cluster.Config.ClusterName;
+        _memberId = cluster.System.Id;
+        _port = port;
+        _host = host;
+        _kinds = cluster.GetClusterKinds();
+        _address = $"{host}:{port}";
+        _isMember = true;
+
+        await RegisterMemberAsync();
+        StartClusterMonitor();
+    }
+
+    /// <summary>Starts polling for topology changes without registering this node.</summary>
+    public Task StartClientAsync(Cluster cluster)
+    {
+        var (host, port) = cluster.System.GetAddress();
+        _cluster = cluster;
+        _clusterName = cluster.Config.ClusterName;
+        _memberId = cluster.System.Id;
+        _port = port;
+        _host = host;
+        _kinds = Array.Empty<string>();
+
+        StartClusterMonitor();
+        return Task.CompletedTask;
+    }
+
+    /// <summary>Removes this member's tags; a no-op for a node that was started as a client.</summary>
+    public async Task ShutdownAsync(bool graceful)
+    {
+        if (_isMember)
+        {
+            await DeregisterMemberAsync();
+        }
+    }
+
+    private async Task RegisterMemberAsync()
+    {
+        await Retry.Try(RegisterMemberInner, onError: OnError, onFailed: OnFailed, retryCount: Retry.Forever);
+
+        static void OnError(int attempt, Exception exception) =>
+            Logger.LogWarning(exception, "Failed to register service");
+
+        static void OnFailed(Exception exception) => Logger.LogError(exception, "Failed to register service");
+    }
+
+    // Writes this member's tags, unless the configured revision receives no traffic.
+    private async Task RegisterMemberInner()
+    {
+        var resourceGroup = await _client.GetResourceGroupByName(_resourceGroup);
+        var containerApp = await resourceGroup.Value.GetContainerAppAsync(_containerAppName);
+        var revision = await containerApp.Value.GetContainerAppRevisionAsync(_revisionName);
+
+        if (revision.Value.Data.TrafficWeight.GetValueOrDefault(0) == 0)
+        {
+            return;
+        }
+
+        Logger.LogInformation(
+            "[Cluster][AzureContainerAppsProvider] Registering service {ReplicaName} on {IpAddress}",
+            _replicaName,
+            _address);
+
+        var tags = new Dictionary<string, string>
+        {
+            [ResourceTagLabels.LabelCluster(_memberId)] = _clusterName,
+            [ResourceTagLabels.LabelHost(_memberId)] = AdvertisedHost,
+            [ResourceTagLabels.LabelPort(_memberId)] = _port.ToString(CultureInfo.InvariantCulture),
+            [ResourceTagLabels.LabelMemberId(_memberId)] = _memberId,
+            [ResourceTagLabels.LabelReplicaName(_memberId)] = _replicaName
+        };
+
+        foreach (var kind in _kinds)
+        {
+            tags[$"{ResourceTagLabels.LabelKind(_memberId)}-{kind}"] = "true";
+        }
+
+        // Let failures surface so that Retry.Try in RegisterMemberAsync can retry the tag update.
+        await _client.AddMemberTags(_resourceGroup, _containerAppName, tags);
+    }
+
+    private void StartClusterMonitor() =>
+        _ = SafeTask.Run(async () =>
+            {
+                while (!_cluster.System.Shutdown.IsCancellationRequested)
+                {
+                    Logger.LogInformation("Calling Azure Container Apps API");
+
+                    try
+                    {
+                        var members = await _client.GetClusterMembers(_resourceGroup, _containerAppName);
+
+                        if (members.Length > 0)
+                        {
+                            Logger.LogInformation("Got members {Members}", members.Length);
+                            _cluster.MemberList.UpdateClusterTopology(members);
+                        }
+                        else
+                        {
+                            Logger.LogWarning("Failed to get members from Azure Container Apps");
+                        }
+                    }
+                    catch (Exception x)
+                    {
+                        Logger.LogError(x, "Failed to get members from Azure Container Apps");
+                    }
+
+                    await Task.Delay(PollIntervalInSeconds);
+                }
+            }
+        );
+
+    private async Task DeregisterMemberAsync()
+    {
+        await Retry.Try(DeregisterMemberInner, onError: OnError, onFailed: OnFailed);
+
+        static void OnError(int attempt, Exception exception) =>
+            Logger.LogWarning(exception, "Failed to deregister service");
+
+        static void OnFailed(Exception exception) => Logger.LogError(exception, "Failed to deregister service");
+    }
+
+    private async Task DeregisterMemberInner()
+    {
+        Logger.LogInformation(
+            "[Cluster][AzureContainerAppsProvider] Unregistering member {ReplicaName} on {IpAddress}",
+            _replicaName,
+            _address);
+
+        await _client.ClearMemberTags(_resourceGroup, _containerAppName, _memberId);
+    }
+}
\ No newline at end of file
diff --git a/Proto.Cluster.AzureContainerApps/ConfigUtils.cs b/Proto.Cluster.AzureContainerApps/ConfigUtils.cs
new file mode 100644
index 0000000000..9a9f22eb07
--- /dev/null
+++ b/Proto.Cluster.AzureContainerApps/ConfigUtils.cs
@@ -0,0 +1,76 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Net;
+using System.Net.NetworkInformation;
+using System.Net.Sockets;
+
+namespace Proto.Cluster.AzureContainerApps;
+
+/// <summary>Network helpers used to pick the host address to advertise.</summary>
+public static class ConfigUtils
+{
+    /// <summary>
+    /// Picks the numerically smallest non-loopback address of the requested family among the network
+    /// interfaces that are up.
+    /// </summary>
+    /// <exception cref="InvalidOperationException">No interface offers such an address.</exception>
+    internal static IPAddress FindIpAddress(AddressFamily family = AddressFamily.InterNetwork)
+    {
+        var addressCandidates = NetworkInterface.GetAllNetworkInterfaces()
+            .Where(nif => nif.OperationalStatus == OperationalStatus.Up)
+            .SelectMany(nif => nif.GetIPProperties().UnicastAddresses.Select(a => a.Address))
+            .Where(addr => addr.AddressFamily == family && !IPAddress.IsLoopback(addr))
+            .ToList();
+
+        // Fail with a clear message here instead of handing null to callers that dereference the result.
+        if (addressCandidates.Count == 0)
+        {
+            throw new InvalidOperationException($"No non-loopback {family} address found on an active network interface");
+        }
+
+        return PickSmallestIpAddress(addressCandidates);
+    }
+
+    // Returns the candidate whose address bytes sort first; null when there are no candidates.
+    private static IPAddress PickSmallestIpAddress(IEnumerable<IPAddress> candidates)
+    {
+        IPAddress result = null!;
+        foreach (var addr in candidates)
+        {
+            if (CompareIpAddresses(addr, result))
+            {
+                result = addr;
+            }
+        }
+
+        return result;
+
+        // True when lhs sorts before rhs: fewer bytes first, then byte-wise comparison.
+        static bool CompareIpAddresses(IPAddress lhs, IPAddress rhs)
+        {
+            if (rhs == null)
+            {
+                return true;
+            }
+
+            var lbytes = lhs.GetAddressBytes();
+            var rbytes = rhs.GetAddressBytes();
+
+            if (lbytes.Length != rbytes.Length)
+            {
+                return lbytes.Length < rbytes.Length;
+            }
+
+            for (var i = 0; i < lbytes.Length; i++)
+            {
+                if (lbytes[i] != rbytes[i])
+                {
+                    return lbytes[i] < rbytes[i];
+                }
+            }
+
+            return false;
+        }
+    }
+}
\ No newline at end of file
diff --git a/Proto.Cluster.AzureContainerApps/Proto.Cluster.AzureContainerApps.csproj b/Proto.Cluster.AzureContainerApps/Proto.Cluster.AzureContainerApps.csproj
new file mode 100644
index 0000000000..b004cb1395
--- /dev/null
+++ b/Proto.Cluster.AzureContainerApps/Proto.Cluster.AzureContainerApps.csproj
@@ -0,0 +1,17 @@
+
+
+
+ 10
+ netcoreapp3.1;net6.0;net7.0
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/Proto.Cluster.AzureContainerApps/ResourceTagLabels.cs b/Proto.Cluster.AzureContainerApps/ResourceTagLabels.cs
new file mode 100644
index 0000000000..0ea31f77bd
--- /dev/null
+++ b/Proto.Cluster.AzureContainerApps/ResourceTagLabels.cs
@@ -0,0 +1,19 @@
+namespace Proto.Cluster.AzureContainerApps;
+
+/// <summary>
+/// Names of the resource tags that describe one cluster member. Every name starts with
+/// <c>proto.cluster-{memberId}|</c>.
+/// </summary>
+public static class ResourceTagLabels
+{
+    public const string LabelMemberIdWithoutPrefix = "memberId";
+    public const string LabelReplicaNameWithoutPrefix = "replicaName";
+
+    public static string LabelPrefix(string memberId) => $"proto.cluster-{memberId}|";
+    public static string LabelHost(string memberId) => LabelPrefix(memberId) + "host";
+    public static string LabelPort(string memberId) => LabelPrefix(memberId) + "port";
+    public static string LabelKind(string memberId) => LabelPrefix(memberId) + "kind";
+    public static string LabelCluster(string memberId) => LabelPrefix(memberId) + "cluster";
+    public static string LabelMemberId(string memberId) => LabelPrefix(memberId) + LabelMemberIdWithoutPrefix;
+    public static string LabelReplicaName(string memberId) => LabelPrefix(memberId) + LabelReplicaNameWithoutPrefix;
+}
\ No newline at end of file
diff --git a/ProtoActor.sln b/ProtoActor.sln
index 2d0634307f..fc0922554d 100644
--- a/ProtoActor.sln
+++ b/ProtoActor.sln
@@ -268,6 +268,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Proto.Analyzers", "src\Prot
 EndProject
 Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Proto.Analyzers.Tests", "tests\Proto.Analyzers.Tests\Proto.Analyzers.Tests.csproj", "{E56413ED-8205-4AC1-A7CE-24A2C1711F54}"
 EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Proto.Cluster.AzureContainerApps", "Proto.Cluster.AzureContainerApps\Proto.Cluster.AzureContainerApps.csproj", "{4A8305DB-758B-4CAD-B8A8-146279A0729A}"
+EndProject
 Global
 	GlobalSection(SolutionConfigurationPlatforms) = preSolution
 		Debug|Any CPU = Debug|Any CPU
@@ -1346,6 +1348,18 @@ Global
 		{E56413ED-8205-4AC1-A7CE-24A2C1711F54}.Release|x64.Build.0 = Release|Any CPU
 		{E56413ED-8205-4AC1-A7CE-24A2C1711F54}.Release|x86.ActiveCfg = Release|Any CPU
 		{E56413ED-8205-4AC1-A7CE-24A2C1711F54}.Release|x86.Build.0 = Release|Any CPU
+		{4A8305DB-758B-4CAD-B8A8-146279A0729A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+		{4A8305DB-758B-4CAD-B8A8-146279A0729A}.Debug|Any CPU.Build.0 = Debug|Any CPU
+		{4A8305DB-758B-4CAD-B8A8-146279A0729A}.Debug|x64.ActiveCfg = Debug|Any CPU
+		{4A8305DB-758B-4CAD-B8A8-146279A0729A}.Debug|x64.Build.0 = Debug|Any CPU
+		{4A8305DB-758B-4CAD-B8A8-146279A0729A}.Debug|x86.ActiveCfg = Debug|Any CPU
+		{4A8305DB-758B-4CAD-B8A8-146279A0729A}.Debug|x86.Build.0 = Debug|Any CPU
+		{4A8305DB-758B-4CAD-B8A8-146279A0729A}.Release|Any CPU.ActiveCfg = Release|Any CPU
+		{4A8305DB-758B-4CAD-B8A8-146279A0729A}.Release|Any CPU.Build.0 = Release|Any CPU
+		{4A8305DB-758B-4CAD-B8A8-146279A0729A}.Release|x64.ActiveCfg = Release|Any CPU
+		{4A8305DB-758B-4CAD-B8A8-146279A0729A}.Release|x64.Build.0 = Release|Any CPU
+		{4A8305DB-758B-4CAD-B8A8-146279A0729A}.Release|x86.ActiveCfg = Release|Any CPU
+		{4A8305DB-758B-4CAD-B8A8-146279A0729A}.Release|x86.Build.0 = Release|Any CPU
 	EndGlobalSection
 	GlobalSection(SolutionProperties) = preSolution
 		HideSolutionNode = FALSE
@@ -1470,6 +1484,7 @@ Global
 		{B0F9003C-BA53-4948-8FB7-1F7F230F9310} = {3D12F5E5-9774-4D7E-8A5B-B1F64544925B}
 		{9D139291-31D9-4502-B5BD-EA522F54B008} = {3D12F5E5-9774-4D7E-8A5B-B1F64544925B}
 		{E56413ED-8205-4AC1-A7CE-24A2C1711F54} = {9AA2BCF0-19AB-4DD9-8D91-7D188E463806}
+		{4A8305DB-758B-4CAD-B8A8-146279A0729A} = {3D12F5E5-9774-4D7E-8A5B-B1F64544925B}
 	EndGlobalSection
 	GlobalSection(ExtensibilityGlobals) = postSolution
 		SolutionGuid = {CD0D1E44-8118-4682-8793-6B20ABFA824C}