Skip to content

Commit 0a07926

Browse files
author
VIMPELCOM_MAIN\NKuksov
committed
Added 'WatchKube' discovery provider
1 parent 414f634 commit 0a07926

File tree

6 files changed

+133
-16
lines changed

6 files changed

+133
-16
lines changed

src/Ocelot.Provider.Kubernetes/EndPointClientV1.cs

+25-4
Original file line numberDiff line numberDiff line change
@@ -7,21 +7,24 @@ namespace Ocelot.Provider.Kubernetes
77
{
88
public class EndPointClientV1 : KubeResourceClient, IEndPointClient
99
{
10-
private readonly HttpRequest _collection;
10+
private readonly HttpRequest _byName;
11+
private readonly HttpRequest _watchByName;
1112

1213
public EndPointClientV1(IKubeApiClient client) : base(client)
1314
{
14-
_collection = KubeRequest.Create("api/v1/namespaces/{Namespace}/endpoints/{ServiceName}");
15+
_byName = KubeRequest.Create("api/v1/namespaces/{Namespace}/endpoints/{ServiceName}");
16+
_watchByName = KubeRequest.Create("api/v1/watch/namespaces/{Namespace}/endpoints/{ServiceName}");
1517
}
1618

17-
public async Task<EndpointsV1> GetAsync(string serviceName, string kubeNamespace = null, CancellationToken cancellationToken = default)
19+
public async Task<EndpointsV1> GetAsync(string serviceName, string kubeNamespace = null,
20+
CancellationToken cancellationToken = default)
1821
{
1922
if (string.IsNullOrEmpty(serviceName))
2023
{
2124
throw new ArgumentNullException(nameof(serviceName));
2225
}
2326

24-
var request = _collection
27+
var request = _byName
2528
.WithTemplateParameters(new
2629
{
2730
Namespace = kubeNamespace ?? KubeClient.DefaultNamespace,
@@ -34,5 +37,23 @@ public async Task<EndpointsV1> GetAsync(string serviceName, string kubeNamespace
3437
? await response.ReadContentAsAsync<EndpointsV1>()
3538
: null;
3639
}
40+
41+
public IObservable<IResourceEventV1<EndpointsV1>> Watch(string serviceName, string kubeNamespace,
42+
CancellationToken cancellationToken = default)
43+
{
44+
if (string.IsNullOrEmpty(serviceName))
45+
{
46+
throw new ArgumentNullException(nameof(serviceName));
47+
}
48+
49+
return ObserveEvents<EndpointsV1>(
50+
_watchByName.WithTemplateParameters(new
51+
{
52+
ServiceName = serviceName,
53+
Namespace = kubeNamespace ?? KubeClient.DefaultNamespace,
54+
}),
55+
"watch v1/Endpoints '" + serviceName + "' in namespace " +
56+
(kubeNamespace ?? KubeClient.DefaultNamespace));
57+
}
3758
}
3859
}

src/Ocelot.Provider.Kubernetes/Interfaces/IEndPointClient.cs

+2
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,6 @@ namespace Ocelot.Provider.Kubernetes.Interfaces;
66
public interface IEndPointClient : IKubeResourceClient
77
{
88
Task<EndpointsV1> GetAsync(string serviceName, string kubeNamespace = null, CancellationToken cancellationToken = default);
9+
10+
IObservable<IResourceEventV1<EndpointsV1>> Watch(string serviceName, string kubeNamespace = null, CancellationToken cancellationToken = default);
911
}

src/Ocelot.Provider.Kubernetes/Kube.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public virtual async Task<List<Service>> GetAsync()
4747
}
4848

4949
private Task<EndpointsV1> GetEndpoint() => _kubeApi
50-
.ResourceClient(client => new EndPointClientV1(client))
50+
.EndpointsV1()
5151
.GetAsync(_configuration.KeyOfServiceInK8s, _configuration.KubeNamespace);
5252

5353
private bool CheckErroneousState(EndpointsV1 endpoint)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
using Ocelot.Provider.Kubernetes.Interfaces;
2+
3+
namespace Ocelot.Provider.Kubernetes;
4+
5+
public static class KubeApiClientExtensions
6+
{
7+
public static IEndPointClient EndpointsV1(this IKubeApiClient client)
8+
=> client.ResourceClient(x => new EndPointClientV1(client));
9+
}
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,20 @@
1-
using Microsoft.Extensions.DependencyInjection;
1+
using Microsoft.Extensions.DependencyInjection;
22
using Ocelot.Configuration;
3-
using Ocelot.Logging;
3+
using Ocelot.Logging;
44
using Ocelot.Provider.Kubernetes.Interfaces;
55

66
namespace Ocelot.Provider.Kubernetes
77
{
88
public static class KubernetesProviderFactory
9-
{
10-
/// <summary>
11-
/// String constant used for provider type definition.
9+
{
10+
/// <summary>
11+
/// String constant used for provider type definition.
1212
/// </summary>
1313
public const string PollKube = nameof(Kubernetes.PollKube);
14-
15-
public static ServiceDiscoveryFinderDelegate Get { get; } = CreateProvider;
14+
15+
public const string WatchKube = nameof(Kubernetes.WatchKube);
16+
17+
public static ServiceDiscoveryFinderDelegate Get { get; } = CreateProvider;
1618

1719
private static IServiceDiscoveryProvider CreateProvider(IServiceProvider provider, ServiceProviderConfiguration config, DownstreamRoute route)
1820
{
@@ -27,11 +29,16 @@ private static IServiceDiscoveryProvider CreateProvider(IServiceProvider provide
2729
Scheme = route.DownstreamScheme,
2830
};
2931

32+
if (WatchKube.Equals(config.Type, StringComparison.OrdinalIgnoreCase))
33+
{
34+
return new WatchKube(configuration, factory, kubeClient, serviceBuilder);
35+
}
36+
3037
var defaultK8sProvider = new Kube(configuration, factory, kubeClient, serviceBuilder);
31-
32-
return PollKube.Equals(config.Type, StringComparison.OrdinalIgnoreCase)
33-
? new PollKube(config.PollingInterval, factory, defaultK8sProvider)
34-
: defaultK8sProvider;
38+
39+
return PollKube.Equals(config.Type, StringComparison.OrdinalIgnoreCase)
40+
? new PollKube(config.PollingInterval, factory, defaultK8sProvider)
41+
: defaultK8sProvider;
3542
}
3643
}
3744
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
using KubeClient.Models;
2+
using Ocelot.Logging;
3+
using Ocelot.Provider.Kubernetes.Interfaces;
4+
using Ocelot.Values;
5+
6+
namespace Ocelot.Provider.Kubernetes;
7+
8+
// Dispose() won't be called because provider wasn't resolved from DI
9+
public class WatchKube : IServiceDiscoveryProvider, IDisposable
10+
{
11+
private readonly KubeRegistryConfiguration _configuration;
12+
private readonly IOcelotLogger _logger;
13+
private readonly IKubeApiClient _kubeApi;
14+
private readonly IKubeServiceBuilder _serviceBuilder;
15+
16+
private List<Service> _services = null;
17+
private readonly IDisposable _subscription;
18+
19+
public WatchKube(
20+
KubeRegistryConfiguration configuration,
21+
IOcelotLoggerFactory factory,
22+
IKubeApiClient kubeApi,
23+
IKubeServiceBuilder serviceBuilder)
24+
{
25+
_configuration = configuration;
26+
_logger = factory.CreateLogger<Kube>();
27+
_kubeApi = kubeApi;
28+
_serviceBuilder = serviceBuilder;
29+
30+
_subscription = CreateSubscription();
31+
}
32+
33+
public virtual async Task<List<Service>> GetAsync()
34+
{
35+
// need to wait for first result fetching somehow
36+
if (_services is null)
37+
{
38+
await Task.Delay(1000);
39+
}
40+
41+
if (_services is not { Count: > 0 })
42+
{
43+
_logger.LogWarning(() => GetMessage("Subscription to service endpoints gave no results!"));
44+
}
45+
46+
return _services;
47+
}
48+
49+
private IDisposable CreateSubscription() =>
50+
_kubeApi
51+
.EndpointsV1()
52+
.Watch(_configuration.KeyOfServiceInK8s, _configuration.KubeNamespace)
53+
.Subscribe(
54+
onNext: endpointEvent =>
55+
{
56+
_services = endpointEvent.EventType switch
57+
{
58+
ResourceEventType.Deleted or ResourceEventType.Error => new(),
59+
_ when (endpointEvent.Resource?.Subsets?.Count ?? 0) == 0 => new(),
60+
_ => _serviceBuilder.BuildServices(_configuration, endpointEvent.Resource).ToList(),
61+
};
62+
},
63+
onError: ex =>
64+
{
65+
// recreate subscription in case of exceptions?
66+
_logger.LogError(() => GetMessage("Endpoints subscription error occured"), ex);
67+
},
68+
onCompleted: () =>
69+
{
70+
// called only when subscription is cancelled
71+
_logger.LogWarning(() => GetMessage("Subscription to service endpoints completed"));
72+
});
73+
74+
private string GetMessage(string message)
75+
=> $"{nameof(WatchKube)} provider. Namespace:{_configuration.KubeNamespace}, Service:{_configuration.KeyOfServiceInK8s}; {message}";
76+
77+
public void Dispose() => _subscription.Dispose();
78+
}

0 commit comments

Comments
 (0)