diff --git a/src/Cyclops.MultiCluster.Tests/Controllers/HostControllerTests.cs b/src/Cyclops.MultiCluster.Tests/Controllers/HostControllerTests.cs index cac7f3a..87aa6d1 100644 --- a/src/Cyclops.MultiCluster.Tests/Controllers/HostControllerTests.cs +++ b/src/Cyclops.MultiCluster.Tests/Controllers/HostControllerTests.cs @@ -137,7 +137,7 @@ public async Task UpdateHost_ReplacesExistingHostWithSameHostname() } [Fact] - public async Task UpdateHost_WithNullHostIPs_SetsEmptyHostIPs() + public async Task UpdateHost_WithNullHostIPs_RemovesHostFromCache() { SetupAuthenticatedUser("cluster-1"); _cacheMock.Setup(x => x.GetHostsAsync("cluster-1")) @@ -154,13 +154,12 @@ public async Task UpdateHost_WithNullHostIPs_SetsEmptyHostIPs() _cacheMock.Verify(x => x.SetClusterCacheAsync( "cluster-1", It.Is(hosts => - hosts.Length == 1 && - hosts[0].HostIPs.Length == 0)), + hosts.Length == 0)), Times.Once); } [Fact] - public async Task UpdateHost_WithEmptyHostIPs_SetsEmptyHostIPs() + public async Task UpdateHost_WithEmptyHostIPs_RemovesHostFromCache() { SetupAuthenticatedUser("cluster-1"); _cacheMock.Setup(x => x.GetHostsAsync("cluster-1")) @@ -177,8 +176,7 @@ public async Task UpdateHost_WithEmptyHostIPs_SetsEmptyHostIPs() _cacheMock.Verify(x => x.SetClusterCacheAsync( "cluster-1", It.Is(hosts => - hosts.Length == 1 && - hosts[0].HostIPs.Length == 0)), + hosts.Length == 0)), Times.Once); } diff --git a/src/Cyclops.MultiCluster/Controllers/HostController.cs b/src/Cyclops.MultiCluster/Controllers/HostController.cs index f3cb46e..b99f052 100644 --- a/src/Cyclops.MultiCluster/Controllers/HostController.cs +++ b/src/Cyclops.MultiCluster/Controllers/HostController.cs @@ -1,132 +1,141 @@ -using Microsoft.AspNetCore.Authorization; -using Microsoft.AspNetCore.Mvc; -using Microsoft.Extensions.Options; -using Cyclops.MultiCluster.Models.Api; -using Cyclops.MultiCluster.Services; - -namespace Cyclops.MultiCluster.Controllers -{ - /// - /// Host based operations happen here - /// - [ApiController] - [Authorize] - [Route("[controller]")] - public class HostController : ControllerBase - { - private readonly ILogger _logger; - private readonly ICache _cache; - private readonly IOptions _options; - - /// - /// Host based operations constructor - /// - /// - /// - /// - public HostController(ILogger logger, ICache cache, IOptions options) - { - _logger = logger; - _cache = cache; - _options = options; - } - - /// - /// Updates the host with the new ip's - /// - /// - /// - [HttpPost] - [ProducesResponseType(204)] - [ProducesResponseType(400)] - [ProducesResponseType(401)] - [ProducesResponseType(500)] - public async Task UpdateHost([FromBody] HostModel model) - { - using var scope = _logger.BeginScope(new { hostname = model.Hostname }); - - _logger.LogInformation("Received host update"); - _logger.LogDebug("Model {@model}", model); - - try - { - if (!ModelState.IsValid) - { - _logger.LogWarning("Invalid request state {@model}", model); - return BadRequest(ModelState); - } - - var clusterIdentifier = User.Identity!.Name!; - var hostIPs = Array.Empty(); - - if (model.HostIPs != null && model.HostIPs.Any()) - { - hostIPs = model.HostIPs.Select(ip => new Models.Core.HostIP - { - IPAddress = ip.IPAddress, - Priority = ip.Priority, - Weight = ip.Weight, - ClusterIdentifier = clusterIdentifier - }).ToArray(); - } - - var cluster = await _cache.GetHostsAsync(clusterIdentifier); - var hosts = cluster?.Where(x => x.Hostname != model.Hostname).ToList() ?? new List(); - hosts.Add(new Models.Core.Host - { - Hostname = model.Hostname, - HostIPs = hostIPs - }); - await _cache.SetClusterCacheAsync(clusterIdentifier, hosts.ToArray()); - - return NoContent(); - } - catch (Exception exception) - { - _logger.LogError(exception, "Error updating host with {@model}", model); - return base.Problem(exception.Message); - } - } - - /// - /// Get all hosts in this cluster - /// - /// - [HttpGet] - [ProducesResponseType(typeof(HostModel[]), 200)] - [ProducesResponseType(404)] - public async Task> Get() - { - _logger.LogInformation("Getting a list of local hosts"); - try - { - var clusterIdentifier = _options.Value.ClusterIdentifier; - var hosts = await _cache.GetHostsAsync(clusterIdentifier); - - if (hosts == null) - { - _logger.LogWarning("Hosts for the local cluster is not found."); - return NotFound(); - } - - var result = hosts.Select(host => new HostModel - { - HostIPs = host.HostIPs.Select(ip => new HostIP - { - IPAddress = ip.IPAddress, - Priority = ip.Priority, - Weight = ip.Weight - }).ToArray(), - Hostname = host.Hostname - }); - - return Ok(result); - } - catch (Exception exception) - { - _logger.LogError(exception, "Unable to get local cluster hosts"); - throw; - } - } - } -} +using Microsoft.AspNetCore.Authorization; +using Microsoft.AspNetCore.Mvc; +using Microsoft.Extensions.Options; +using Cyclops.MultiCluster.Models.Api; +using Cyclops.MultiCluster.Services; + +namespace Cyclops.MultiCluster.Controllers +{ + /// + /// Host based operations happen here + /// + [ApiController] + [Authorize] + [Route("[controller]")] + public class HostController : ControllerBase + { + private readonly ILogger _logger; + private readonly ICache _cache; + private readonly IOptions _options; + + /// + /// Host based operations constructor + /// + /// + /// + /// + public HostController(ILogger logger, ICache cache, IOptions options) + { + _logger = logger; + _cache = cache; + _options = options; + } + + /// + /// Updates the host with the new ip's + /// + /// + /// + [HttpPost] + [ProducesResponseType(204)] + [ProducesResponseType(400)] + [ProducesResponseType(401)] + [ProducesResponseType(500)] + public async Task UpdateHost([FromBody] HostModel model) + { + using var scope = _logger.BeginScope(new { hostname = model.Hostname }); + + _logger.LogInformation("Received host update"); + _logger.LogDebug("Model {@model}", model); + + try + { + if (!ModelState.IsValid) + { + _logger.LogWarning("Invalid request state {@model}", model); + return BadRequest(ModelState); + } + + var clusterIdentifier = User.Identity!.Name!; + var hostIPs = Array.Empty(); + + if (model.HostIPs != null && model.HostIPs.Any()) + { + hostIPs = model.HostIPs.Select(ip => new Models.Core.HostIP + { + IPAddress = ip.IPAddress, + Priority = ip.Priority, + Weight = ip.Weight, + ClusterIdentifier = clusterIdentifier + }).ToArray(); + } + + var cluster = await _cache.GetHostsAsync(clusterIdentifier); + var hosts = cluster?.Where(x => x.Hostname != model.Hostname).ToList() ?? new List(); + + if (hostIPs.Length > 0) + { + hosts.Add(new Models.Core.Host + { + Hostname = model.Hostname, + HostIPs = hostIPs + }); + } + else + { + _logger.LogInformation("Removing host {hostname} from cluster cache for {clusterIdentifier}", model.Hostname, clusterIdentifier); + } + + await _cache.SetClusterCacheAsync(clusterIdentifier, hosts.ToArray()); + + return NoContent(); + } + catch (Exception exception) + { + _logger.LogError(exception, "Error updating host with {@model}", model); + return base.Problem(exception.Message); + } + } + + /// + /// Get all hosts in this cluster + /// + /// + [HttpGet] + [ProducesResponseType(typeof(HostModel[]), 200)] + [ProducesResponseType(404)] + public async Task> Get() + { + _logger.LogInformation("Getting a list of local hosts"); + try + { + var clusterIdentifier = _options.Value.ClusterIdentifier; + var hosts = await _cache.GetHostsAsync(clusterIdentifier); + + if (hosts == null) + { + _logger.LogWarning("Hosts for the local cluster is not found."); + return NotFound(); + } + + var result = hosts.Select(host => new HostModel + { + HostIPs = host.HostIPs.Select(ip => new HostIP + { + IPAddress = ip.IPAddress, + Priority = ip.Priority, + Weight = ip.Weight + }).ToArray(), + Hostname = host.Hostname + }); + + return Ok(result); + } + catch (Exception exception) + { + _logger.LogError(exception, "Unable to get local cluster hosts"); + throw; + } + } + } +} diff --git a/src/Cyclops.MultiCluster/Program.cs b/src/Cyclops.MultiCluster/Program.cs index d6bb698..0bf4b68 100644 --- a/src/Cyclops.MultiCluster/Program.cs +++ b/src/Cyclops.MultiCluster/Program.cs @@ -1,355 +1,356 @@ -using Destructurama; -using k8s.Models; -using KubeOps.Abstractions.Builder; -using KubeOps.KubernetesClient; -using KubeOps.Operator; -using Microsoft.OpenApi; -using Scalar.AspNetCore; -using Serilog; -using Cyclops.MultiCluster.Controllers; -using Cyclops.MultiCluster.Models.K8sEntities; -using Cyclops.MultiCluster.Services; -using Cyclops.MultiCluster.Services.Authentication; -using Cyclops.MultiCluster.Services.Default; - -const string OperatorFlag = "--operator"; -const string OrchestratorFlag = "--orchestrator"; -const string DnsServerFlag = "--dns-server"; -const string FrontEndFlag = "--front-end"; - -var builder = WebApplication.CreateBuilder(args); -builder.Configuration.AddJsonFile("appsettings.logging.json") - .AddEnvironmentVariables() - .AddCommandLine(args); - -builder.Services.Configure(builder.Configuration.GetSection("Authentication")); - -builder.Services.Configure(builder.Configuration); -var options = new MultiClusterOptions(); -builder.Configuration.Bind(options); - -//Enable http/2 only -// .net core only allows one http protocol on http ports. GRPC requires http/2. So we force it. -builder.WebHost.ConfigureKestrel(o => -{ - o.ListenAnyIP(options.ListenPort, (lo) => - { - lo.Protocols = Microsoft.AspNetCore.Server.Kestrel.Core.HttpProtocols.Http1; - }); - - if (args.Contains(DnsServerFlag)) - { - o.ListenAnyIP(options.ListenGrpcPort, (lo) => - { - lo.Protocols = Microsoft.AspNetCore.Server.Kestrel.Core.HttpProtocols.Http2; - }); - } -}); - -builder.Host.UseSerilog((context, configuration) => -{ - configuration.ReadFrom.Configuration(context.Configuration) - .Destructure.UsingAttributes(); -}); - -if (args.Contains(OperatorFlag)) -{ - builder.Services.AddSingleton(); - builder.Services.AddKubernetesOperator((operatorSettings) => - { - operatorSettings.AutoAttachFinalizers = false; - operatorSettings.AutoDetachFinalizers = false; - operatorSettings.Name = "operator"; - operatorSettings.LeaderElectionType = LeaderElectionType.Single; - }) - .AddController() - .AddController() - .AddController() - .AddController(); -} -else if (args.Contains(OrchestratorFlag)) -{ - builder.Services.AddSingleton(); - builder.Services.AddKubernetesOperator((operatorSettings) => - { - operatorSettings.AutoAttachFinalizers = false; - operatorSettings.AutoDetachFinalizers = false; - operatorSettings.Name = "orchestrator"; - operatorSettings.Namespace = options.Namespace; - operatorSettings.LeaderElectionType = LeaderElectionType.Single; - }) - .AddController(); -} -else if (args.Contains(DnsServerFlag)) -{ - builder.Services.AddKubernetesOperator((operatorSettings) => - { - operatorSettings.AutoAttachFinalizers = false; - operatorSettings.AutoDetachFinalizers = false; - operatorSettings.Name = "dnsserver"; - operatorSettings.Namespace = options.Namespace; - operatorSettings.LeaderElectionType = LeaderElectionType.None; - }) - .AddController(); - - builder.Services.AddSingleton(); - builder.Services.AddGrpc((o) => - { - o.EnableDetailedErrors = true; - }); -} -else if (args.Contains(FrontEndFlag)) -{ - builder.Services.AddSingleton(); -} -else -{ - throw new Exception($"Expected one of {OperatorFlag}, {OrchestratorFlag}, {DnsServerFlag} or {FrontEndFlag}"); -} - -builder.Services.AddMemoryCache(); -builder.Services.AddSingleton(); -builder.Services.AddSingleton(); -builder.Services.AddSingleton(); -builder.Services.AddSingleton(); -builder.Services.AddSingleton(); -builder.Services.AddSingleton(); -builder.Services.AddSingleton(); -builder.Services.AddSingleton(); -builder.Services.AddSingleton(); -builder.Services.AddSingleton(); -builder.Services.AddSingleton(); -builder.Services.AddSingleton(); -builder.Services.AddSingleton(); -builder.Services.AddSingleton((s) => s.GetRequiredService()); -builder.Services.AddScoped(); -builder.Services.AddSingleton(); -builder.Services.AddHttpClient(); -builder.Services.AddAuthentication(ApiAuthenticationHandlerOptions.DefaultScheme) - .AddScheme(ApiAuthenticationHandlerOptions.DefaultScheme, null); -builder.Services.AddControllers(); -builder.Services.AddEndpointsApiExplorer(); -builder.Services.AddSwaggerGen(options => -{ - options.AddSecurityDefinition("X-Api-Key", new OpenApiSecurityScheme{ - In = ParameterLocation.Header, - Name = "X-Api-Key", - Type = SecuritySchemeType.ApiKey - }); - options.IncludeXmlComments(typeof(Program).Assembly); - options.OperationFilter(); -}); - - -foreach (var peer in options.Peers) -{ - builder.Services.AddHttpClient(peer.Url, client => - { - client.BaseAddress = new Uri(peer.Url); - client.DefaultRequestHeaders.Add("X-Api-Key", peer.Key); - }); -} - -var app = builder.Build(); - -app.UseWhen(context => !context.Request.Path.StartsWithSegments("/Healthz"), appBuilder => appBuilder.UseSerilogRequestLogging()); -app.UseSwagger(); -app.UseSwagger(options => -{ - options.RouteTemplate = "/openapi/{documentName}.json"; -}); -app.MapScalarApiReference(); - -app.UseRouting(); -app.UseAuthentication(); -app.UseAuthorization(); -app.MapControllers(); - -var logger = app.Services.GetRequiredService>(); - -logger.LogInformation("Starting"); -logger.LogInformation("Configured Options {@options}", options); - -var processTasks = new List(); - -// watches the cluster caches and updates the host cache, also expires old cluster caches -if (args.Contains(OperatorFlag)) -{ - logger.LogInformation("Running the operator"); - - var hostnameSynchronizer = app.Services.GetRequiredService(); - - processTasks.Add(Task.Run(async () => - { - logger.LogInformation("Starting the operator leader watcher"); - var leaderStateChanged = app.Services.GetRequiredService(); - var lifecycle = app.Lifetime; - - while (!lifecycle.ApplicationStopping.IsCancellationRequested) - { - await Task.Yield(); - await Task.Delay(1000); - }; - }).ContinueWith(_ => logger.LogInformation("Operator leader watcher stopped"))); - - processTasks.Add(Task.Run(() => - { - logger.LogInformation("Starting cluster heartbeat"); - return hostnameSynchronizer.ClusterHeartbeatAsync().ContinueWith(_ => - { - logger.LogInformation("Cluster heartbeat stopped"); - }); - })); - - if (options.PeriodicRefreshInterval <= 0) - { - logger.LogInformation("Perioid refresh interval is {interval} which is <= 0, disabling periodic refresher.", options.PeriodicRefreshInterval); - } - else - { - processTasks.Add(Task.Run(async () => - { - logger.LogInformation("Starting the periodic refresher"); - var lifecycle = app.Lifetime; - var leaderStatus = app.Services.GetRequiredService(); - - while (!lifecycle.ApplicationStopping.IsCancellationRequested) - { - await Task.Yield(); - await Task.Delay(options.PeriodicRefreshInterval * 1000); - using var scope = logger.BeginScope(new { PeriodicRefreshId = Guid.NewGuid() }); - - if (leaderStatus.IsLeader) - { - logger.LogInformation("Initiating periodic refresh"); - try - { - await hostnameSynchronizer.SynchronizeLocalClusterAsync(); - } - catch (Exception ex) - { - logger.LogError(ex, "Error during periodic refresh"); - } - } - else - { - logger.LogTrace("Not the leader, skipping periodic refresh"); - } - }; - }).ContinueWith(_ => logger.LogInformation("Periodic refresher stopped"))); - } - - processTasks.Add(Task.Run(() => - { - logger.LogInformation("Running API Server for health checks"); - return app.RunAsync().ContinueWith(_ => logger.LogInformation("API Server stopped")); - })); -} - -//watches cluster events and keeps the local cluster config in sync and sends updates to other nodes -else if (args.Contains(OrchestratorFlag)) -{ - logger.LogInformation("Running the orchestrator"); - - processTasks.Add(Task.Run(() => - { - var hostnameSynchronizer = app.Services.GetRequiredService(); - logger.LogInformation("Starting cluster heartbeat watcher"); - return hostnameSynchronizer.WatchClusterHeartbeatsAsync().ContinueWith(_ => logger.LogInformation("Cluster heartbeat watcher stopped")); - })); - - processTasks.Add(Task.Run(async () => - { - logger.LogInformation("Starting the orchestrator leader watcher"); - var leaderStateChanged = app.Services.GetRequiredService(); - while (true) - { - await Task.Yield(); - await Task.Delay(1000); - } - }).ContinueWith(_ => logger.LogInformation("Orchestrator leader watcher stopped"))); - - - if (options.PeriodicRefreshInterval <= 0) - { - logger.LogInformation("Perioid refresh interval is {interval} which is <= 0, disabling periodic refresher.", options.PeriodicRefreshInterval); - } - else - { - processTasks.Add(Task.Run(async () => - { - logger.LogInformation("Starting the periodic refresher"); - var lifecycle = app.Lifetime; - var leaderStatus = app.Services.GetRequiredService(); - var cache = app.Services.GetRequiredService(); - - while (!lifecycle.ApplicationStopping.IsCancellationRequested) - { - await Task.Yield(); - await Task.Delay(options.PeriodicRefreshInterval * 1000); - using var scope = logger.BeginScope(new { PeriodicRefreshId = Guid.NewGuid() }); - - if (leaderStatus.IsLeader) - { - logger.LogInformation("Initiating periodic refresh"); - try - { - await cache.SynchronizeCachesAsync(); - } - catch (Exception ex) - { - logger.LogError(ex, "Error during periodic refresh"); - } - } - else - { - logger.LogTrace("Not the leader, skipping periodic refresh"); - } - }; - }).ContinueWith(_ => logger.LogInformation("Periodic refresher stopped"))); - } - - processTasks.Add(Task.Run(() => - { - logger.LogInformation("Running API Server for health checks"); - return app.RunAsync().ContinueWith(_ => logger.LogInformation("API Server stopped")); - })); -} - -//starts the dns server to respond to dns queries for the respective hosts -else if (args.Contains(DnsServerFlag)) -{ - logger.LogInformation("Running the dns server"); - - var dnsResolver = app.Services.GetRequiredService(); - var queue = app.Services.GetRequiredService(); - - await dnsResolver.InitializeAsync(); - queue.OnHostChangedAsync = dnsResolver.OnHostChangedAsync; - - app.MapGrpcService() - .WithHttpLogging(Microsoft.AspNetCore.HttpLogging.HttpLoggingFields.All) - .AllowAnonymous(); - - processTasks.Add(Task.Run(() => - { - logger.LogInformation("Running API Server for health checks"); - return app.RunAsync().ContinueWith(_ => logger.LogInformation("API Server stopped")); - })); -} - -//starts the api server -else if (args.Contains(FrontEndFlag)) -{ - processTasks.Add(Task.Run(() => - { - logger.LogInformation("Running API Server"); - return app.RunAsync().ContinueWith(_ => logger.LogInformation("API Server stopped")); - })); -} - -logger.LogInformation("Waiting on process tasks"); - -await Task.WhenAny(processTasks); - -logger.LogInformation("Terminated"); +using Destructurama; +using k8s.Models; +using KubeOps.Abstractions.Builder; +using KubeOps.KubernetesClient; +using KubeOps.Operator; +using Microsoft.OpenApi; +using Scalar.AspNetCore; +using Serilog; +using Cyclops.MultiCluster.Controllers; +using Cyclops.MultiCluster.Models.K8sEntities; +using Cyclops.MultiCluster.Services; +using Cyclops.MultiCluster.Services.Authentication; +using Cyclops.MultiCluster.Services.Default; + +const string OperatorFlag = "--operator"; +const string OrchestratorFlag = "--orchestrator"; +const string DnsServerFlag = "--dns-server"; +const string FrontEndFlag = "--front-end"; + +var builder = WebApplication.CreateBuilder(args); +builder.Configuration.AddJsonFile("appsettings.logging.json") + .AddEnvironmentVariables() + .AddCommandLine(args); + +builder.Services.Configure(builder.Configuration.GetSection("Authentication")); + +builder.Services.Configure(builder.Configuration); +var options = new MultiClusterOptions(); +builder.Configuration.Bind(options); + +//Enable http/2 only +// .net core only allows one http protocol on http ports. GRPC requires http/2. So we force it. +builder.WebHost.ConfigureKestrel(o => +{ + o.ListenAnyIP(options.ListenPort, (lo) => + { + lo.Protocols = Microsoft.AspNetCore.Server.Kestrel.Core.HttpProtocols.Http1; + }); + + if (args.Contains(DnsServerFlag)) + { + o.ListenAnyIP(options.ListenGrpcPort, (lo) => + { + lo.Protocols = Microsoft.AspNetCore.Server.Kestrel.Core.HttpProtocols.Http2; + }); + } +}); + +builder.Host.UseSerilog((context, configuration) => +{ + configuration.ReadFrom.Configuration(context.Configuration) + .Destructure.UsingAttributes(); +}); + +if (args.Contains(OperatorFlag)) +{ + builder.Services.AddSingleton(); + builder.Services.AddKubernetesOperator((operatorSettings) => + { + operatorSettings.AutoAttachFinalizers = false; + operatorSettings.AutoDetachFinalizers = false; + operatorSettings.Name = "operator"; + operatorSettings.LeaderElectionType = LeaderElectionType.Single; + }) + .AddController() + .AddController() + .AddController() + .AddController() + .AddController(); +} +else if (args.Contains(OrchestratorFlag)) +{ + builder.Services.AddSingleton(); + builder.Services.AddKubernetesOperator((operatorSettings) => + { + operatorSettings.AutoAttachFinalizers = false; + operatorSettings.AutoDetachFinalizers = false; + operatorSettings.Name = "orchestrator"; + operatorSettings.Namespace = options.Namespace; + operatorSettings.LeaderElectionType = LeaderElectionType.Single; + }) + .AddController(); +} +else if (args.Contains(DnsServerFlag)) +{ + builder.Services.AddKubernetesOperator((operatorSettings) => + { + operatorSettings.AutoAttachFinalizers = false; + operatorSettings.AutoDetachFinalizers = false; + operatorSettings.Name = "dnsserver"; + operatorSettings.Namespace = options.Namespace; + operatorSettings.LeaderElectionType = LeaderElectionType.None; + }) + .AddController(); + + builder.Services.AddSingleton(); + builder.Services.AddGrpc((o) => + { + o.EnableDetailedErrors = true; + }); +} +else if (args.Contains(FrontEndFlag)) +{ + builder.Services.AddSingleton(); +} +else +{ + throw new Exception($"Expected one of {OperatorFlag}, {OrchestratorFlag}, {DnsServerFlag} or {FrontEndFlag}"); +} + +builder.Services.AddMemoryCache(); +builder.Services.AddSingleton(); +builder.Services.AddSingleton(); +builder.Services.AddSingleton(); +builder.Services.AddSingleton(); +builder.Services.AddSingleton(); +builder.Services.AddSingleton(); +builder.Services.AddSingleton(); +builder.Services.AddSingleton(); +builder.Services.AddSingleton(); +builder.Services.AddSingleton(); +builder.Services.AddSingleton(); +builder.Services.AddSingleton(); +builder.Services.AddSingleton(); +builder.Services.AddSingleton((s) => s.GetRequiredService()); +builder.Services.AddScoped(); +builder.Services.AddSingleton(); +builder.Services.AddHttpClient(); +builder.Services.AddAuthentication(ApiAuthenticationHandlerOptions.DefaultScheme) + .AddScheme(ApiAuthenticationHandlerOptions.DefaultScheme, null); +builder.Services.AddControllers(); +builder.Services.AddEndpointsApiExplorer(); +builder.Services.AddSwaggerGen(options => +{ + options.AddSecurityDefinition("X-Api-Key", new OpenApiSecurityScheme{ + In = ParameterLocation.Header, + Name = "X-Api-Key", + Type = SecuritySchemeType.ApiKey + }); + options.IncludeXmlComments(typeof(Program).Assembly); + options.OperationFilter(); +}); + + +foreach (var peer in options.Peers) +{ + builder.Services.AddHttpClient(peer.Url, client => + { + client.BaseAddress = new Uri(peer.Url); + client.DefaultRequestHeaders.Add("X-Api-Key", peer.Key); + }); +} + +var app = builder.Build(); + +app.UseWhen(context => !context.Request.Path.StartsWithSegments("/Healthz"), appBuilder => appBuilder.UseSerilogRequestLogging()); +app.UseSwagger(); +app.UseSwagger(options => +{ + options.RouteTemplate = "/openapi/{documentName}.json"; +}); +app.MapScalarApiReference(); + +app.UseRouting(); +app.UseAuthentication(); +app.UseAuthorization(); +app.MapControllers(); + +var logger = app.Services.GetRequiredService>(); + +logger.LogInformation("Starting"); +logger.LogInformation("Configured Options {@options}", options); + +var processTasks = new List(); + +// watches the cluster caches and updates the host cache, also expires old cluster caches +if (args.Contains(OperatorFlag)) +{ + logger.LogInformation("Running the operator"); + + var hostnameSynchronizer = app.Services.GetRequiredService(); + + processTasks.Add(Task.Run(async () => + { + logger.LogInformation("Starting the operator leader watcher"); + var leaderStateChanged = app.Services.GetRequiredService(); + var lifecycle = app.Lifetime; + + while (!lifecycle.ApplicationStopping.IsCancellationRequested) + { + await Task.Yield(); + await Task.Delay(1000); + }; + }).ContinueWith(_ => logger.LogInformation("Operator leader watcher stopped"))); + + processTasks.Add(Task.Run(() => + { + logger.LogInformation("Starting cluster heartbeat"); + return hostnameSynchronizer.ClusterHeartbeatAsync().ContinueWith(_ => + { + logger.LogInformation("Cluster heartbeat stopped"); + }); + })); + + if (options.PeriodicRefreshInterval <= 0) + { + logger.LogInformation("Perioid refresh interval is {interval} which is <= 0, disabling periodic refresher.", options.PeriodicRefreshInterval); + } + else + { + processTasks.Add(Task.Run(async () => + { + logger.LogInformation("Starting the periodic refresher"); + var lifecycle = app.Lifetime; + var leaderStatus = app.Services.GetRequiredService(); + + while (!lifecycle.ApplicationStopping.IsCancellationRequested) + { + await Task.Yield(); + await Task.Delay(options.PeriodicRefreshInterval * 1000); + using var scope = logger.BeginScope(new { PeriodicRefreshId = Guid.NewGuid() }); + + if (leaderStatus.IsLeader) + { + logger.LogInformation("Initiating periodic refresh"); + try + { + await hostnameSynchronizer.SynchronizeLocalClusterAsync(); + } + catch (Exception ex) + { + logger.LogError(ex, "Error during periodic refresh"); + } + } + else + { + logger.LogTrace("Not the leader, skipping periodic refresh"); + } + }; + }).ContinueWith(_ => logger.LogInformation("Periodic refresher stopped"))); + } + + processTasks.Add(Task.Run(() => + { + logger.LogInformation("Running API Server for health checks"); + return app.RunAsync().ContinueWith(_ => logger.LogInformation("API Server stopped")); + })); +} + +//watches cluster events and keeps the local cluster config in sync and sends updates to other nodes +else if (args.Contains(OrchestratorFlag)) +{ + logger.LogInformation("Running the orchestrator"); + + processTasks.Add(Task.Run(() => + { + var hostnameSynchronizer = app.Services.GetRequiredService(); + logger.LogInformation("Starting cluster heartbeat watcher"); + return hostnameSynchronizer.WatchClusterHeartbeatsAsync().ContinueWith(_ => logger.LogInformation("Cluster heartbeat watcher stopped")); + })); + + processTasks.Add(Task.Run(async () => + { + logger.LogInformation("Starting the orchestrator leader watcher"); + var leaderStateChanged = app.Services.GetRequiredService(); + while (true) + { + await Task.Yield(); + await Task.Delay(1000); + } + }).ContinueWith(_ => logger.LogInformation("Orchestrator leader watcher stopped"))); + + + if (options.PeriodicRefreshInterval <= 0) + { + logger.LogInformation("Perioid refresh interval is {interval} which is <= 0, disabling periodic refresher.", options.PeriodicRefreshInterval); + } + else + { + processTasks.Add(Task.Run(async () => + { + logger.LogInformation("Starting the periodic refresher"); + var lifecycle = app.Lifetime; + var leaderStatus = app.Services.GetRequiredService(); + var cache = app.Services.GetRequiredService(); + + while (!lifecycle.ApplicationStopping.IsCancellationRequested) + { + await Task.Yield(); + await Task.Delay(options.PeriodicRefreshInterval * 1000); + using var scope = logger.BeginScope(new { PeriodicRefreshId = Guid.NewGuid() }); + + if (leaderStatus.IsLeader) + { + logger.LogInformation("Initiating periodic refresh"); + try + { + await cache.SynchronizeCachesAsync(); + } + catch (Exception ex) + { + logger.LogError(ex, "Error during periodic refresh"); + } + } + else + { + logger.LogTrace("Not the leader, skipping periodic refresh"); + } + }; + }).ContinueWith(_ => logger.LogInformation("Periodic refresher stopped"))); + } + + processTasks.Add(Task.Run(() => + { + logger.LogInformation("Running API Server for health checks"); + return app.RunAsync().ContinueWith(_ => logger.LogInformation("API Server stopped")); + })); +} + +//starts the dns server to respond to dns queries for the respective hosts +else if (args.Contains(DnsServerFlag)) +{ + logger.LogInformation("Running the dns server"); + + var dnsResolver = app.Services.GetRequiredService(); + var queue = app.Services.GetRequiredService(); + + await dnsResolver.InitializeAsync(); + queue.OnHostChangedAsync = dnsResolver.OnHostChangedAsync; + + app.MapGrpcService() + .WithHttpLogging(Microsoft.AspNetCore.HttpLogging.HttpLoggingFields.All) + .AllowAnonymous(); + + processTasks.Add(Task.Run(() => + { + logger.LogInformation("Running API Server for health checks"); + return app.RunAsync().ContinueWith(_ => logger.LogInformation("API Server stopped")); + })); +} + +//starts the api server +else if (args.Contains(FrontEndFlag)) +{ + processTasks.Add(Task.Run(() => + { + logger.LogInformation("Running API Server"); + return app.RunAsync().ContinueWith(_ => logger.LogInformation("API Server stopped")); + })); +} + +logger.LogInformation("Waiting on process tasks"); + +await Task.WhenAny(processTasks); + +logger.LogInformation("Terminated");