diff --git a/src/Vecc.K8s.MultiCluster.Api.Tests/Services/Default/DefaultHostnameSynchronizerTests.cs b/src/Vecc.K8s.MultiCluster.Api.Tests/Services/Default/DefaultHostnameSynchronizerTests.cs index 11a3baa..e86ce4e 100644 --- a/src/Vecc.K8s.MultiCluster.Api.Tests/Services/Default/DefaultHostnameSynchronizerTests.cs +++ b/src/Vecc.K8s.MultiCluster.Api.Tests/Services/Default/DefaultHostnameSynchronizerTests.cs @@ -17,7 +17,6 @@ public class DefaultHostnameSynchronizerTests { private readonly Mock> _loggerMock; private readonly Mock _ingressManagerMock; - private readonly Mock _namespaceManagerMock; private readonly Mock _serviceManagerMock; private readonly Mock _cacheMock; private readonly Mock _lifetimeMock; @@ -31,7 +30,6 @@ public DefaultHostnameSynchronizerTests() { _loggerMock = new Mock>(); _ingressManagerMock = new Mock(); - _namespaceManagerMock = new Mock(); _serviceManagerMock = new Mock(); _cacheMock = new Mock(); _lifetimeMock = new Mock(); @@ -56,7 +54,6 @@ private DefaultHostnameSynchronizer CreateSynchronizer() return new DefaultHostnameSynchronizer( _loggerMock.Object, _ingressManagerMock.Object, - _namespaceManagerMock.Object, _serviceManagerMock.Object, _cacheMock.Object, Options.Create(_options), diff --git a/src/Vecc.K8s.MultiCluster.Api.Tests/Services/Default/DefaultIngressManagerTests.cs b/src/Vecc.K8s.MultiCluster.Api.Tests/Services/Default/DefaultIngressManagerTests.cs index 59e72b3..851024d 100644 --- a/src/Vecc.K8s.MultiCluster.Api.Tests/Services/Default/DefaultIngressManagerTests.cs +++ b/src/Vecc.K8s.MultiCluster.Api.Tests/Services/Default/DefaultIngressManagerTests.cs @@ -10,7 +10,6 @@ public class DefaultIngressManagerTests { private readonly Mock> _loggerMock; private readonly Mock _kubernetesClientMock; - private readonly Mock _namespaceManagerMock; private readonly Mock _serviceManagerMock; private readonly DefaultIngressManager _ingressManager; @@ -18,13 +17,11 @@ public DefaultIngressManagerTests() { _loggerMock = new Mock>(); _kubernetesClientMock = new Mock(); - _namespaceManagerMock = new Mock(); _serviceManagerMock = new Mock(); _ingressManager = new DefaultIngressManager( _loggerMock.Object, _kubernetesClientMock.Object, - _namespaceManagerMock.Object, _serviceManagerMock.Object); } diff --git a/src/Vecc.K8s.MultiCluster.Api.Tests/Services/Default/DefaultNamespaceManagerTests.cs b/src/Vecc.K8s.MultiCluster.Api.Tests/Services/Default/DefaultNamespaceManagerTests.cs deleted file mode 100644 index 88e1810..0000000 --- a/src/Vecc.K8s.MultiCluster.Api.Tests/Services/Default/DefaultNamespaceManagerTests.cs +++ /dev/null @@ -1,56 +0,0 @@ -using k8s.Models; -using KubeOps.KubernetesClient; -using Microsoft.Extensions.Logging; -using Moq; -using Vecc.K8s.MultiCluster.Api.Services.Default; - -namespace Vecc.K8s.MultiCluster.Api.Tests.Services.Default -{ - public class DefaultNamespaceManagerTests - { - private readonly Mock> _loggerMock; - private readonly Mock _clientMock; - - public DefaultNamespaceManagerTests() - { - _loggerMock = new Mock>(); - _clientMock = new Mock(); - } - - [Fact] - public async Task GetNamsepacesAsync_ReturnsList() - { - // Arrange - var ns1 = new V1Namespace { Metadata = new V1ObjectMeta { Name = "default" } }; - var ns2 = new V1Namespace { Metadata = new V1ObjectMeta { Name = "kube-system" } }; - _clientMock.Setup(x => x.ListAsync(null)) - .ReturnsAsync(new List { ns1, ns2 }); - - var manager = new DefaultNamespaceManager(_loggerMock.Object, _clientMock.Object); - - // Act - var result = await manager.GetNamsepacesAsync(); - - // Assert - Assert.Equal(2, result.Count); - Assert.Equal("default", result[0].Metadata.Name); - Assert.Equal("kube-system", result[1].Metadata.Name); - } - - [Fact] - public async Task GetNamsepacesAsync_EmptyList_ReturnsEmpty() - { - // Arrange - _clientMock.Setup(x => x.ListAsync(null)) - .ReturnsAsync(new List()); - - var manager = new DefaultNamespaceManager(_loggerMock.Object, _clientMock.Object); - - // Act - var result = await manager.GetNamsepacesAsync(); - - // Assert - Assert.Empty(result); - } - } -} diff --git a/src/Vecc.K8s.MultiCluster.Api.Tests/Services/Default/DefaultServiceManagerTests.cs b/src/Vecc.K8s.MultiCluster.Api.Tests/Services/Default/DefaultServiceManagerTests.cs index 356b593..e55e629 100644 --- a/src/Vecc.K8s.MultiCluster.Api.Tests/Services/Default/DefaultServiceManagerTests.cs +++ b/src/Vecc.K8s.MultiCluster.Api.Tests/Services/Default/DefaultServiceManagerTests.cs @@ -168,8 +168,7 @@ public async Task GetLoadBalancerServicesAsync_WithEmptyServices_ReturnsEmpty() private DefaultServiceManager CreateServiceManager() { var kubernetesClientMock = new Mock(); - var namespaceManagerMock = new Mock(); - return new DefaultServiceManager(_loggerMock.Object, kubernetesClientMock.Object, namespaceManagerMock.Object); + return new DefaultServiceManager(_loggerMock.Object, kubernetesClientMock.Object); } private V1EndpointSlice CreateEndpointSlice(int readyCount, int notReadyCount) diff --git a/src/Vecc.K8s.MultiCluster.Api/Program.cs b/src/Vecc.K8s.MultiCluster.Api/Program.cs index ad0db14..9508264 100644 --- a/src/Vecc.K8s.MultiCluster.Api/Program.cs +++ b/src/Vecc.K8s.MultiCluster.Api/Program.cs @@ -107,14 +107,16 @@ throw new Exception($"Expected one of {OperatorFlag}, {OrchestratorFlag}, {DnsServerFlag} or {FrontEndFlag}"); } +builder.Services.AddMemoryCache(); +builder.Services.AddSingleton(); +builder.Services.AddSingleton(); builder.Services.AddSingleton(); builder.Services.AddSingleton(); builder.Services.AddSingleton(); builder.Services.AddSingleton(); -builder.Services.AddSingleton(); builder.Services.AddSingleton(); builder.Services.AddSingleton(); -builder.Services.AddSingleton(); +builder.Services.AddSingleton(); builder.Services.AddSingleton(); builder.Services.AddSingleton(); builder.Services.AddSingleton(); diff --git a/src/Vecc.K8s.MultiCluster.Api/Services/Default/DefaultBasicCache.cs b/src/Vecc.K8s.MultiCluster.Api/Services/Default/DefaultBasicCache.cs new file mode 100644 index 0000000..08c5219 --- /dev/null +++ b/src/Vecc.K8s.MultiCluster.Api/Services/Default/DefaultBasicCache.cs @@ -0,0 +1,32 @@ +using Microsoft.Extensions.Caching.Memory; +using M = Microsoft.Extensions.Caching.Memory; + +namespace Vecc.K8s.MultiCluster.Api.Services.Default +{ + public class BasicCache : IBasicCache + { + private readonly M.MemoryCache _cache; + + public BasicCache(M.IMemoryCache cache) + { + _cache = cache as M.MemoryCache ?? throw new ArgumentException("Expected MemoryCache", nameof(cache)); + } + + public IEnumerable Keys => _cache.Keys; + + public T? Get(string key) + => _cache.Get(key); + + public T? GetOrCreate(string key, Func createFunc) + => _cache.GetOrCreate(key, (_) => createFunc()); + + public bool TryGetValue(string key, out T? value) + => _cache.TryGetValue(key, out value); + + public void Set(string key, T value) + => _cache.Set(key, value); + + public void Remove(string key) + => _cache.Remove(key); + } +} \ No newline at end of file diff --git a/src/Vecc.K8s.MultiCluster.Api/Services/Default/DefaultCache.cs b/src/Vecc.K8s.MultiCluster.Api/Services/Default/DefaultCache.cs new file mode 100644 index 0000000..1308e54 --- /dev/null +++ b/src/Vecc.K8s.MultiCluster.Api/Services/Default/DefaultCache.cs @@ -0,0 +1,136 @@ +using Microsoft.Extensions.Options; +using Vecc.K8s.MultiCluster.Api.Models.K8sEntities; +using k8s.Models; + +namespace Vecc.K8s.MultiCluster.Api.Services.Default; + +public class MemoryCache( + IBasicCache _cache, + IKubernetesCache _kubernetesCache, + IOptions _options) : ICache +{ + public Task GetClusterHeartbeatTimeAsync(string clusterIdentifier) + => _kubernetesCache.GetClusterHeartbeatTimeAsync(clusterIdentifier); + + public Task GetClusterIdentifiersAsync() + => _kubernetesCache.GetClusterIdentifiersAsync(); + + public async Task GetEndpointsCountAsync(string ns, string name) + { + var service = await GetOrCreateServiceCache(ns, name, false); + var result = service?.EndpointCount ?? 0; + return result; + } + + public Task GetHostInformationAsync(string hostname) + => _kubernetesCache.GetHostInformationAsync(hostname); + + public Task GetHostnamesAsync() + => _kubernetesCache.GetHostnamesAsync(); + + public Task GetHostsAsync(string clusterIdentifier) + => _kubernetesCache.GetHostsAsync(clusterIdentifier); + + public Task GetLastResourceVersionAsync(string uniqueIdentifier) + { + lock (_cache) + { + _cache.TryGetValue(GetResourceVersionKey(uniqueIdentifier), out string? cacheValue); + var result = cacheValue ?? string.Empty; + return Task.FromResult(result); + } + } + + public Task IsServiceMonitoredAsync(string ns, string name) + => Task.FromResult(_cache.TryGetValue(GetServiceCacheKey(ns, name), out _)); + + public Task RemoveClusterCacheAsync(string clusterIdentifier) + => _kubernetesCache.RemoveClusterCacheAsync(clusterIdentifier); + + public Task SetClusterCacheAsync(string identifier, Models.Core.Host[] hosts) + => _kubernetesCache.SetClusterCacheAsync(identifier, hosts); + + public Task SetClusterHeartbeatAsync(string clusterIdentifier, DateTime heartbeat) + => _kubernetesCache.SetClusterHeartbeatAsync(clusterIdentifier, heartbeat); + + public async Task SetEndpointsCountAsync(string ns, string name, int count) + { + var serviceCache = await GetOrCreateServiceCache(ns, name); + serviceCache!.EndpointCount = count; + } + + public Task SetResourceVersionAsync(string uniqueIdentifier, string version) + { + lock (_cache) + { + _cache.Set(GetResourceVersionKey(uniqueIdentifier), version); + } + return Task.CompletedTask; + + } + + public Task SynchronizeCachesAsync() + => _kubernetesCache.SynchronizeCachesAsync(); + + public Task TrackServiceAsync(string ns, string name) + => GetOrCreateServiceCache(ns, name); + + public Task UntrackAllServicesAsync() + { + var rawKeys = Array.Empty(); + + lock (_cache) + { + rawKeys = _cache.Keys.ToArray(); + } + + var keys = rawKeys.OfType() + .Where(key => key.StartsWith("service-")); + + foreach (var key in keys) + { + _cache.Remove(key); + } + + return Task.CompletedTask; + } + + private Task GetOrCreateServiceCache(string namespaceName, string name, bool createMissing = true) + { + if (createMissing) + { + var cacheKey = GetServiceCacheKey(namespaceName, name); + var result = _cache.GetOrCreate(cacheKey, () => + { + var serviceCache = new V1ServiceCache(); + var metadata = serviceCache.EnsureMetadata(); + var labels = metadata.EnsureLabels(); + + labels["namespace"] = namespaceName; + labels["name"] = name; + + metadata.Name = namespaceName + "." + name; + metadata.SetNamespace(_options.Value.Namespace); + serviceCache.Service = new V1ObjectReference + { + Name = name, + NamespaceProperty = namespaceName + }; + return serviceCache; + } + ); + return Task.FromResult(result); + } + + return Task.FromResult(_cache.Get(GetServiceCacheKey(namespaceName, name))); + } + + private string GetIngressKey(string ns, string name) + => $"ingress-{ns}-{name}"; + + private string GetResourceVersionKey(string uniqueIdentifier) + => $"resourceVersion-{uniqueIdentifier}"; + + private string GetServiceCacheKey(string ns, string name) + => $"service-{ns}-{name}"; +} diff --git a/src/Vecc.K8s.MultiCluster.Api/Services/Default/DefaultHostnameSynchronizer.cs b/src/Vecc.K8s.MultiCluster.Api/Services/Default/DefaultHostnameSynchronizer.cs index ed01f9d..64229f8 100644 --- a/src/Vecc.K8s.MultiCluster.Api/Services/Default/DefaultHostnameSynchronizer.cs +++ b/src/Vecc.K8s.MultiCluster.Api/Services/Default/DefaultHostnameSynchronizer.cs @@ -12,7 +12,6 @@ public class DefaultHostnameSynchronizer : IHostnameSynchronizer { private readonly ILogger _logger; private readonly IIngressManager _ingressManager; - private readonly INamespaceManager _namespaceManager; private readonly IServiceManager _serviceManager; private readonly ICache _cache; private readonly IOptions _multiClusterOptions; @@ -24,12 +23,11 @@ public class DefaultHostnameSynchronizer : IHostnameSynchronizer private readonly CancellationTokenSource _shutdownCancellationTokenSource; private readonly CancellationToken _shutdownCancellationToken; private readonly ManualResetEvent _shutdownEvent; - private readonly AutoResetEvent _synchronizeLocalClusterHolder; + private readonly ManualResetEventSlim _synchronizingLocalClustersEvent = new ManualResetEventSlim(true); public DefaultHostnameSynchronizer( ILogger logger, IIngressManager ingressManager, - INamespaceManager namespaceManager, IServiceManager serviceManager, ICache cache, IOptions multiClusterOptions, @@ -41,7 +39,6 @@ public DefaultHostnameSynchronizer( { _logger = logger; _ingressManager = ingressManager; - _namespaceManager = namespaceManager; _serviceManager = serviceManager; _cache = cache; _multiClusterOptions = multiClusterOptions; @@ -54,13 +51,17 @@ public DefaultHostnameSynchronizer( _shutdownCancellationTokenSource = new CancellationTokenSource(); _shutdownEvent = new ManualResetEvent(true); _shutdownCancellationToken = _shutdownCancellationTokenSource.Token; - _synchronizeLocalClusterHolder = new AutoResetEvent(true); } [Trace] public async Task SynchronizeLocalClusterAsync() { - _synchronizeLocalClusterHolder.WaitOne(); + if (!_synchronizingLocalClustersEvent.IsSet) + { + return; + } + _synchronizingLocalClustersEvent.Wait(_lifetime.ApplicationStopping); + using var scope = _logger.BeginScope(new { SyncId = Guid.NewGuid() }); try { @@ -328,7 +329,7 @@ await Task.WhenAll( } finally { - _synchronizeLocalClusterHolder.Set(); + _synchronizingLocalClustersEvent.Set(); } } diff --git a/src/Vecc.K8s.MultiCluster.Api/Services/Default/DefaultIngressManager.cs b/src/Vecc.K8s.MultiCluster.Api/Services/Default/DefaultIngressManager.cs index 9bc58f9..560baa4 100644 --- a/src/Vecc.K8s.MultiCluster.Api/Services/Default/DefaultIngressManager.cs +++ b/src/Vecc.K8s.MultiCluster.Api/Services/Default/DefaultIngressManager.cs @@ -10,14 +10,12 @@ public class DefaultIngressManager : IIngressManager private const string _serviceNameLabel = "kubernetes.io/service-name"; private readonly ILogger _logger; private readonly IKubernetesClient _kubernetesClient; - private readonly INamespaceManager _namespaceManager; private readonly IServiceManager _serviceManager; - public DefaultIngressManager(ILogger logger, IKubernetesClient kubernetesClient, INamespaceManager namespaceManager, IServiceManager serviceManager) + public DefaultIngressManager(ILogger logger, IKubernetesClient kubernetesClient, IServiceManager serviceManager) { _logger = logger; _kubernetesClient = kubernetesClient; - _namespaceManager = namespaceManager; _serviceManager = serviceManager; } diff --git a/src/Vecc.K8s.MultiCluster.Api/Services/Default/DefaultNamespaceManager.cs b/src/Vecc.K8s.MultiCluster.Api/Services/Default/DefaultNamespaceManager.cs deleted file mode 100644 index 7639ae9..0000000 --- a/src/Vecc.K8s.MultiCluster.Api/Services/Default/DefaultNamespaceManager.cs +++ /dev/null @@ -1,32 +0,0 @@ -using k8s.Models; -using KubeOps.KubernetesClient; -using NewRelic.Api.Agent; - -namespace Vecc.K8s.MultiCluster.Api.Services.Default -{ - public class DefaultNamespaceManager : INamespaceManager - { - private readonly ILogger _logger; - private readonly IKubernetesClient _kubernetesClient; - - public DefaultNamespaceManager(ILogger logger, IKubernetesClient kubernetesClient) - { - _logger = logger; - _kubernetesClient = kubernetesClient; - } - - [Trace] - public async Task> GetNamsepacesAsync() - { - var result = new List(); - - _logger.LogDebug("Getting all namespaces"); - var namespaces = await _kubernetesClient.ListAsync(); - _logger.LogDebug("Done getting all namespaces"); - - result.AddRange(namespaces); - - return result; - } - } -} diff --git a/src/Vecc.K8s.MultiCluster.Api/Services/Default/DefaultServiceManager.cs b/src/Vecc.K8s.MultiCluster.Api/Services/Default/DefaultServiceManager.cs index 3f6912d..2e9c3b6 100644 --- a/src/Vecc.K8s.MultiCluster.Api/Services/Default/DefaultServiceManager.cs +++ b/src/Vecc.K8s.MultiCluster.Api/Services/Default/DefaultServiceManager.cs @@ -11,13 +11,11 @@ public class DefaultServiceManager : IServiceManager private const string _serviceNameLabel = "kubernetes.io/service-name"; private readonly ILogger _logger; private readonly IKubernetesClient _kubernetesClient; - private readonly INamespaceManager _namespaceManager; - public DefaultServiceManager(ILogger logger, IKubernetesClient kubernetesClient, INamespaceManager namespaceManager) + public DefaultServiceManager(ILogger logger, IKubernetesClient kubernetesClient) { _logger = logger; _kubernetesClient = kubernetesClient; - _namespaceManager = namespaceManager; } [Trace] diff --git a/src/Vecc.K8s.MultiCluster.Api/Services/Default/KubernetesApiCache.cs b/src/Vecc.K8s.MultiCluster.Api/Services/Default/KubernetesApiCache.cs index 86a1f65..1d1de3a 100644 --- a/src/Vecc.K8s.MultiCluster.Api/Services/Default/KubernetesApiCache.cs +++ b/src/Vecc.K8s.MultiCluster.Api/Services/Default/KubernetesApiCache.cs @@ -7,7 +7,7 @@ namespace Vecc.K8s.MultiCluster.Api.Services.Default { - public class KubernetesApiCache : ICache + public class KubernetesApiCache : IKubernetesCache { private readonly ILogger _logger; private readonly IKubernetesClient _kubernetesClient; @@ -60,17 +60,6 @@ public async Task GetClusterIdentifiersAsync() return result; } - public async Task GetEndpointsCountAsync(string ns, string name) - { - _logger.LogTrace("Getting endpoint count for {ns}/{name}", ns, name); - - var service = await GetOrCreateServiceCache(ns, name, false); - var result = service?.EndpointCount ?? 0; - - _logger.LogTrace("Got {result} for {ns}/{name}", result, ns, name); - return result; - } - public async Task GetHostInformationAsync(string hostname) { using var _scope = _logger.BeginScope(new { hostname }); @@ -122,29 +111,6 @@ public async Task GetHostnamesAsync() return result; } - public async Task GetLastResourceVersionAsync(string uniqueIdentifier) - { - using var _scope = _logger.BeginScope(new { uniqueIdentifier }); - _logger.LogTrace("Getting last resource version"); - var resource = await _kubernetesClient.GetAsync(uniqueIdentifier, _options.Value.Namespace); - var result = resource?.CurrentResourceVersion ?? string.Empty; - - _logger.LogTrace("Last resource version is {result}", result); - return result; - } - - public async Task IsServiceMonitoredAsync(string ns, string name) - { - using var _scope = _logger.BeginScope(new { @namespace = ns, name }); - _logger.LogTrace("Checking if service"); - - var service = await GetOrCreateServiceCache(ns, name, false); - var result = service != null; - - _logger.LogTrace("Service monitor state: {result}", result); - return result; - } - public async Task RemoveClusterCacheAsync(string clusterIdentifier) { using var _scope = _logger.BeginScope(new { clusterIdentifier }); @@ -268,79 +234,6 @@ public async Task SetClusterHeartbeatAsync(string clusterIdentifier, DateTime he _logger.LogDebug("Done"); } - public async Task SetEndpointsCountAsync(string ns, string name, int count) - { - using var _scope = _logger.BeginScope(new { @namespace = ns, name, count }); - _logger.LogTrace("Setting endpoint count"); - - var set = false; - for (var iterator = 0; iterator < 5; iterator++) - { - try - { - var service = await GetOrCreateServiceCache(ns, name); - - service!.EndpointCount = count; - - await _kubernetesClient.SaveAsync(service); - set = true; - break; - } - catch (Exception exception) - { - var jitter = _random.Next(500); - _logger.LogWarning(exception, "Error setting endpoint. Attempt {attempt}. Waiting {jitter} ms", iterator + 1, jitter); - await Task.Delay(jitter); - } - } - - if (!set) - { - throw new Exception("Unable to set cluster endpoint count after multiple attempts"); - } - - _logger.LogTrace("Done"); - } - - public async Task SetResourceVersionAsync(string uniqueIdentifier, string version) - { - using var _scope = _logger.BeginScope(new { uniqueIdentifier, version }); - _logger.LogDebug("Setting resource version"); - - var set = false; - for (var iterator = 0; iterator < 5; iterator++) - { - try - { - var resource = await _kubernetesClient.GetAsync(uniqueIdentifier, _options.Value.Namespace); - if (resource == null) - { - resource = new V1ResourceCache(); - var metadata = resource.EnsureMetadata(); - metadata.Name = uniqueIdentifier; - metadata.SetNamespace(_options.Value.Namespace); - } - resource.CurrentResourceVersion = version; - await _kubernetesClient.SaveAsync(resource); - set = true; - break; - } - catch (Exception exception) - { - var jitter = _random.Next(500); - _logger.LogWarning(exception, "Unable to set resource version. Attempt {attempt}. Waiting {jitter} ms", iterator + 1, jitter); - await Task.Delay(jitter); - } - } - - if (!set) - { - throw new Exception("Unable to set resource version after multiple attempts"); - } - - _logger.LogTrace("Done"); - } - public async Task SynchronizeCachesAsync() { using var scope = _logger.BeginScope(new { CacheSynchronizeId = Guid.NewGuid() }); @@ -373,14 +266,21 @@ public async Task SynchronizeCachesAsync() hosts[hostname.Hostname] = new List(); } hosts[hostname.Hostname].AddRange(hostname.HostIPs.Select(x => x.ToCore())); + hosts[hostname.Hostname] = + [.. hosts[hostname.Hostname].DistinctBy(x => $"{x.ClusterIdentifier}:{x.IPAddress}:{x.Priority}:{x.Weight}")]; } } _logger.LogDebug("Got {count} host entries", hosts.Count); _logger.LogInformation("Getting current hostnames"); var hostcaches = await _kubernetesClient.ListAsync(_options.Value.Namespace); + var existing = hostcaches.ToDictionary(x => x.Hostname ?? x.GetLabel("hostname"), x => x); + _logger.LogDebug("Hostnames found: {count}", hostcaches.Count); - _logger.LogTrace("Hostnames: {@hostnames}", hostcaches.Select(x => new { Hostname = x.Hostname ?? x.GetLabel("hostname"), IPs = x.Addresses })); + if (_logger.IsEnabled(LogLevel.Trace)) + { + _logger.LogTrace("Hostnames: {@hostnames}", hostcaches.Select(x => new { Hostname = x.Hostname ?? x.GetLabel("hostname"), IPs = x.Addresses })); + } _logger.LogInformation("Setting hostnames ip addresses"); foreach (var host in hosts) @@ -388,7 +288,17 @@ public async Task SynchronizeCachesAsync() var _hostScope = _logger.BeginScope(new { Hostname = host.Key }); try { - var hostcache = (await GetOrCreateHostnameCache(host.Key))!; + V1HostnameCache hostcache; + if (existing.ContainsKey(host.Key)) + { + _logger.LogDebug("Hostname cache entry already exists for {@hostname}, using it", host.Key); + hostcache = existing[host.Key]; + } + else + { + _logger.LogDebug("Hostname cache entry doesn't exist for {@hostname}, creating it", host.Key); + hostcache = (await GetOrCreateHostnameCache(host.Key))!; + } var outOfSync = false; if (hostcache.Hostname == null) @@ -444,7 +354,7 @@ public async Task SynchronizeCachesAsync() catch (Exception ex) { var jitter = _random.Next(500); - _logger.LogWarning(ex, "Unable to synchronize caches. Attempt {attempt}. Waiting {jitter} ms", iterator+1, jitter); + _logger.LogWarning(ex, "Unable to synchronize caches. Attempt {attempt}. Waiting {jitter} ms", iterator + 1, jitter); await Task.Delay(jitter); } } @@ -461,64 +371,6 @@ public async Task SynchronizeCachesAsync() _logger.LogInformation("Done synchronizing caches"); } - public async Task TrackServiceAsync(string ns, string name) - { - using var _scope = _logger.BeginScope(new { @namespace = ns, name }); - var tracked = false; - for (var iterator = 0; iterator < 5; iterator++) - { - try - { - _logger.LogDebug("Tracking service"); - await GetOrCreateServiceCache(ns, name); - tracked = true; - break; - } - catch (Exception exception) - { - var jitter = _random.Next(500); - _logger.LogWarning(exception, "Unable to track service. Attempt {attempt}. Waiting {jitter} ms", iterator + 1, jitter); - await Task.Delay(jitter); - } - } - - if (!tracked) - { - throw new Exception("Unable to track service after multiple attempts"); - } - - _logger.LogDebug("Done"); - } - - public async Task UntrackAllServicesAsync() - { - _logger.LogInformation("Untracking all services"); - var deleted = false; - for (var iterator = 0; iterator < 5; iterator++) - { - try - { - var serviceCaches = await _kubernetesClient.ListAsync(_options.Value.Namespace); - await _kubernetesClient.DeleteAsync(serviceCaches); - deleted = true; - break; - } - catch (Exception exception) - { - var jitter = _random.Next(500); - _logger.LogWarning(exception, "Unable to delete all services. Attempt {attempt}. Waiting {jitter} ms", iterator + 1, jitter); - await Task.Delay(jitter); - } - } - - if (!deleted) - { - throw new Exception("Unable to untrack all services after multiple attempts."); - } - - _logger.LogDebug("Done"); - } - private string GenerateName(string name) { name = string.Concat(name.Where(x => char.IsLetterOrDigit(x))).ToLower(); @@ -630,7 +482,7 @@ private string GenerateName(string name) _logger.LogDebug("Hostname cache entry not found for {hostname}", hostname); return null; } - catch(Exception exception) + catch (Exception exception) { var jitter = _random.Next(500); _logger.LogWarning(exception, "Unable to get or create the hostname cache entry. Attempt {attempt}. Waiting {jitter} ms", @@ -641,59 +493,5 @@ private string GenerateName(string name) throw new Exception("Unable to get or create the hostname cache entry after multiple attempts."); } - - private async Task GetOrCreateServiceCache(string namespaceName, string name, bool createMissing = true) - { - for (var iterator = 0; iterator < 5; iterator++) - { - try - { - var items = await _kubernetesClient.ListAsync(_options.Value.Namespace, new EqualsSelector("namespace", namespaceName), new EqualsSelector("name", name)); - if (items.Count == 1) - { - _logger.LogDebug("Service cache entry found for {namespace}/{name}", namespaceName, name); - return items[0]; - } - else if (items.Count > 1) - { - _logger.LogInformation("Too many service cache objects matching {namespace}/{name}, returning the oldest", namespaceName, name); - return items.OrderBy(x => x.Metadata.CreationTimestamp ?? DateTime.MinValue).First(); - } - - if (createMissing) - { - _logger.LogDebug("Service cache entry didn't exist, creating it. {namespace}/{name}", namespaceName, name); - var serviceCache = new V1ServiceCache(); - var metadata = serviceCache.EnsureMetadata(); - var labels = metadata.EnsureLabels(); - - labels["namespace"] = namespaceName; - labels["name"] = name; - - metadata.Name = GenerateName(namespaceName + "." + name); - metadata.SetNamespace(_options.Value.Namespace); - serviceCache.Service = new V1ObjectReference - { - Name = name, - NamespaceProperty = namespaceName - }; - await _kubernetesClient.CreateAsync(serviceCache); - - return serviceCache; - } - - _logger.LogDebug("Service cache entry not found for {namespace}/{name}", namespaceName, name); - return null; - } - catch (Exception exception) - { - var jitter = _random.Next(500); - _logger.LogWarning(exception, "Unable to get or create service cache. Attempt {attempt}. Waiting {jitter} ms", iterator + 1, jitter); - await Task.Delay(jitter); - } - } - - throw new Exception("Unable to get or create service cache after multiple attempts."); - } } } diff --git a/src/Vecc.K8s.MultiCluster.Api/Services/IBasicCache.cs b/src/Vecc.K8s.MultiCluster.Api/Services/IBasicCache.cs new file mode 100644 index 0000000..1a74503 --- /dev/null +++ b/src/Vecc.K8s.MultiCluster.Api/Services/IBasicCache.cs @@ -0,0 +1,11 @@ +namespace Vecc.K8s.MultiCluster.Api.Services; + +public interface IBasicCache +{ + bool TryGetValue(string key, out T? value); + void Set(string key, T value); + IEnumerable Keys { get; } + void Remove(string key); + T? GetOrCreate(string key, Func createFunc); + T? Get(string key); +} \ No newline at end of file diff --git a/src/Vecc.K8s.MultiCluster.Api/Services/IKubernetesCache.cs b/src/Vecc.K8s.MultiCluster.Api/Services/IKubernetesCache.cs new file mode 100644 index 0000000..e969728 --- /dev/null +++ b/src/Vecc.K8s.MultiCluster.Api/Services/IKubernetesCache.cs @@ -0,0 +1,17 @@ +using Vecc.K8s.MultiCluster.Api.Models.Core; + +namespace Vecc.K8s.MultiCluster.Api.Services +{ + public interface IKubernetesCache + { + Task GetClusterIdentifiersAsync(); + Task GetHostInformationAsync(string hostname); + Task GetHostnamesAsync(); + Task GetHostsAsync(string clusterIdentifier); + Task GetClusterHeartbeatTimeAsync(string clusterIdentifier); + Task SetClusterCacheAsync(string identifier, Models.Core.Host[] hosts); + Task SetClusterHeartbeatAsync(string clusterIdentifier, DateTime heartbeat); + Task SynchronizeCachesAsync(); + Task RemoveClusterCacheAsync(string clusterIdentifier); + } +} diff --git a/src/Vecc.K8s.MultiCluster.Api/Services/INamespaceManager.cs b/src/Vecc.K8s.MultiCluster.Api/Services/INamespaceManager.cs deleted file mode 100644 index a636d32..0000000 --- a/src/Vecc.K8s.MultiCluster.Api/Services/INamespaceManager.cs +++ /dev/null @@ -1,9 +0,0 @@ -using k8s.Models; - -namespace Vecc.K8s.MultiCluster.Api.Services -{ - public interface INamespaceManager - { - Task> GetNamsepacesAsync(); - } -} diff --git a/src/Vecc.K8s.MultiCluster.Api/Vecc.K8s.MultiCluster.Api.csproj b/src/Vecc.K8s.MultiCluster.Api/Vecc.K8s.MultiCluster.Api.csproj index 645046b..4f65ffc 100644 --- a/src/Vecc.K8s.MultiCluster.Api/Vecc.K8s.MultiCluster.Api.csproj +++ b/src/Vecc.K8s.MultiCluster.Api/Vecc.K8s.MultiCluster.Api.csproj @@ -32,13 +32,13 @@ all runtime; build; native; contentfiles; analyzers; buildtransitive - + - + diff --git a/test/test.sh b/test/test.sh index bf114e8..8266a3d 100755 --- a/test/test.sh +++ b/test/test.sh @@ -52,7 +52,7 @@ echo_color "${G}Downloading kind" mkdir -p .test if [ ! -f ./.test/kind ] then - curl -Lo ./.test/kind https://kind.sigs.k8s.io/dl/v0.24.0/kind-linux-amd64 + curl -Lo ./.test/kind https://kind.sigs.k8s.io/dl/v0.31.0/kind-linux-amd64 chmod +x ./.test/kind fi