Skip to content

Commit 6a6ba83

Browse files
committed
feat: adds random balancer
Signed-off-by: Dennis Zhuang <killme2008@gmail.com>
1 parent 8649360 commit 6a6ba83

6 files changed

Lines changed: 219 additions & 12 deletions

File tree

CHANGELOG.md

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1010
### Added
1111

1212
- Multi-endpoint support via `GreptimeClientOptions.Endpoints` (`IList<string>`).
13-
Supplying more than one endpoint enables client-side round-robin load
14-
balancing with automatic failover across endpoints. Single-element lists
15-
behave as the previous single-node case. Backed by
16-
`Grpc.Net.Client.Balancer`.
13+
Supplying more than one endpoint enables client-side load balancing with
14+
automatic failover across endpoints. Single-element lists behave as the
15+
previous single-node case. Backed by `Grpc.Net.Client.Balancer`.
16+
- `GreptimeClientOptions.LoadBalancing` (`LoadBalancingStrategy`) selects the
17+
multi-endpoint balancing policy. Supported: `Random` (default — picks a
18+
ready endpoint uniformly at random per call, avoiding the herding pattern
19+
that round-robin can produce when many short-lived clients start at the
20+
same time) and `RoundRobin`.
1721

1822
### Changed
1923

@@ -22,6 +26,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
2226
end-of-support, and `Grpc.Net.Client.Balancer` (required for the new
2327
multi-endpoint client-side load balancer) is only shipped in the package's
2428
`net8.0+` build, not its `netstandard2.1` build.
29+
Users on `net6.0` / `net7.0` should pin to the `0.1.x` line, which keeps
30+
those TFMs supported.
2531

2632
### Deprecated
2733

README.md

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
[![NuGet Downloads](https://img.shields.io/nuget/dt/GreptimeDB.Ingester.svg)](https://www.nuget.org/packages/GreptimeDB.Ingester)
66
[![NuGet Grpc](https://img.shields.io/nuget/v/GreptimeDB.Ingester.Grpc.svg)](https://www.nuget.org/packages/GreptimeDB.Ingester.Grpc)
77
[![License](https://img.shields.io/badge/license-Apache%202.0-blue.svg)](LICENSE)
8-
![.NET](https://img.shields.io/badge/.NET-6.0%20%7C%207.0%20%7C%208.0%20%7C%209.0%20%7C%2010.0-purple)
8+
![.NET](https://img.shields.io/badge/.NET-8.0%20%7C%209.0%20%7C%2010.0-purple)
99

1010
.NET SDK for writing data to [GreptimeDB](https://github.com/GreptimeTeam/greptimedb).
1111

@@ -19,6 +19,13 @@
1919
dotnet add package GreptimeDB.Ingester
2020
```
2121

22+
> **.NET 6.0 / 7.0 users:** the latest version requires .NET 8.0 or newer.
23+
> Stay on the `0.1.x` line for net6.0 / net7.0 support:
24+
>
25+
> ```bash
26+
> dotnet add package GreptimeDB.Ingester --version 0.1.*
27+
> ```
28+
2229
## Quick Start
2330
2431
```csharp
@@ -76,11 +83,53 @@ var client = new GreptimeClient(new GreptimeClientOptions
7683
});
7784
```
7885
86+
## Multiple Endpoints
87+
88+
Pass more than one endpoint to enable client-side load balancing and failover
89+
across a GreptimeDB cluster:
90+
91+
```csharp
92+
var client = new GreptimeClient(new GreptimeClientOptions
93+
{
94+
Endpoints = new List<string>
95+
{
96+
"http://node-a:4001",
97+
"http://node-b:4001",
98+
"http://node-c:4001",
99+
},
100+
Database = "public",
101+
});
102+
```
103+
104+
A single-element list takes the direct-channel fast path (no balancer); two
105+
or more endpoints route through `Grpc.Net.Client.Balancer` with the configured
106+
strategy. All endpoints must share the same scheme (all `http` or all `https`)
107+
and must be plain `host:port` URIs without a path/query/fragment.
108+
109+
### Load-balancing strategy
110+
111+
`LoadBalancing` selects the policy used in the multi-endpoint case:
112+
113+
```csharp
114+
new GreptimeClientOptions
115+
{
116+
Endpoints = new List<string> { "http://node-a:4001", "http://node-b:4001" },
117+
LoadBalancing = LoadBalancingStrategy.Random, // default
118+
// LoadBalancing = LoadBalancingStrategy.RoundRobin,
119+
};
120+
```
121+
122+
- `Random` (default) — pick a ready endpoint uniformly at random per call.
123+
Avoids the herding pattern that round-robin can produce when many
124+
short-lived clients start simultaneously.
125+
- `RoundRobin` — cycle through ready endpoints in order.
126+
79127
## Features
80128
81129
- **Unary Write** - Simple single-request writes via gRPC
82130
- **Streaming Write** - High-throughput streaming via gRPC for multiple tables
83131
- **Bulk Write** - Maximum throughput via Apache Arrow Flight
132+
- Multi-endpoint client-side load balancing (random / round-robin) with failover
84133
- Type coercion between .NET and GreptimeDB types
85134
- Health check
86135
- DI integration

src/GreptimeDB.Ingester/Client/GreptimeClient.cs

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public GreptimeClient(GreptimeClientOptions options, ILoggerFactory? loggerFacto
4040
_logger = _loggerFactory.CreateLogger<GreptimeClient>();
4141

4242
var endpoints = options.ResolveEndpoints();
43-
(_channel, _channelServices) = BuildChannel(endpoints);
43+
(_channel, _channelServices) = BuildChannel(endpoints, options.LoadBalancing);
4444
_client = new GreptimeDatabase.GreptimeDatabaseClient(_channel);
4545
_healthClient = new HealthCheck.HealthCheckClient(_channel);
4646
_flightClient = new Lazy<FlightClient>(() => new FlightClient(_channel));
@@ -358,7 +358,9 @@ private CallOptions CreateCallOptions(CancellationToken cancellationToken)
358358
cancellationToken: cancellationToken);
359359
}
360360

361-
private static (GrpcChannel Channel, ServiceProvider? Services) BuildChannel(IReadOnlyList<string> endpoints)
361+
private static (GrpcChannel Channel, ServiceProvider? Services) BuildChannel(
362+
IReadOnlyList<string> endpoints,
363+
LoadBalancingStrategy strategy)
362364
{
363365
// Single endpoint: skip the balancer entirely. This preserves the original
364366
// direct-channel behavior, including default TLS authority/SNI handling for
@@ -380,14 +382,25 @@ private static (GrpcChannel Channel, ServiceProvider? Services) BuildChannel(IRe
380382

381383
var services = new ServiceCollection();
382384
services.AddSingleton<ResolverFactory>(new StaticResolverFactory(addresses));
385+
if (strategy == LoadBalancingStrategy.Random)
386+
{
387+
services.AddSingleton<LoadBalancerFactory>(RandomBalancerFactory.Instance);
388+
}
383389
var serviceProvider = services.BuildServiceProvider();
384390

391+
LoadBalancingConfig lbConfig = strategy switch
392+
{
393+
LoadBalancingStrategy.Random => new RandomConfig(),
394+
LoadBalancingStrategy.RoundRobin => new RoundRobinConfig(),
395+
_ => throw new ArgumentOutOfRangeException(nameof(strategy), strategy, "Unsupported load-balancing strategy."),
396+
};
397+
385398
var channelOptions = new GrpcChannelOptions
386399
{
387400
ServiceProvider = serviceProvider,
388401
ServiceConfig = new ServiceConfig
389402
{
390-
LoadBalancingConfigs = { new RoundRobinConfig() }
403+
LoadBalancingConfigs = { lbConfig }
391404
},
392405
Credentials = scheme == Uri.UriSchemeHttps
393406
? ChannelCredentials.SecureSsl

src/GreptimeDB.Ingester/Client/GreptimeClientOptions.cs

Lines changed: 44 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,14 @@ public sealed class GreptimeClientOptions
4343
/// </summary>
4444
public TimeSpan WriteTimeout { get; set; } = TimeSpan.FromSeconds(30);
4545

46+
/// <summary>
47+
/// Gets or sets the load-balancing strategy used when multiple
48+
/// <see cref="Endpoints"/> are configured. Ignored for the single-endpoint
49+
/// case, which always uses a direct channel. Defaults to
50+
/// <see cref="LoadBalancingStrategy.Random"/>.
51+
/// </summary>
52+
public LoadBalancingStrategy LoadBalancing { get; set; } = LoadBalancingStrategy.Random;
53+
4654
/// <summary>
4755
/// Returns the effective endpoint list: trimmed, non-whitespace entries of
4856
/// <see cref="Endpoints"/> when the caller populated that list. Falls back
@@ -77,11 +85,15 @@ public void Validate()
7785
var endpoints = ResolveEndpoints();
7886
if (endpoints.Count == 0)
7987
{
80-
var allWhitespace = Endpoints != null && Endpoints.Count > 0;
88+
if (Endpoints != null && Endpoints.Count > 0)
89+
{
90+
throw new ArgumentException(
91+
"Endpoints was set but contained no non-whitespace entries.",
92+
nameof(Endpoints));
93+
}
94+
8195
throw new ArgumentException(
82-
allWhitespace
83-
? "Endpoints was set but contained no non-whitespace entries."
84-
: "At least one endpoint is required (set Endpoints).",
96+
"At least one endpoint is required. Set Endpoints; the deprecated Endpoint property is empty or whitespace.",
8597
nameof(Endpoints));
8698
}
8799

@@ -102,6 +114,15 @@ public void Validate()
102114
nameof(Endpoints));
103115
}
104116

117+
// Reject path/query/fragment: the multi-endpoint balancer uses host:port only
118+
// (BalancerAddress), so a path would silently diverge from the single-endpoint case.
119+
if (uri.AbsolutePath is not ("" or "/") || !string.IsNullOrEmpty(uri.Query) || !string.IsNullOrEmpty(uri.Fragment))
120+
{
121+
throw new ArgumentException(
122+
$"Endpoint '{endpoint}' must be a host:port URI without a path, query, or fragment.",
123+
nameof(Endpoints));
124+
}
125+
105126
firstScheme ??= uri.Scheme;
106127
if (!string.Equals(uri.Scheme, firstScheme, StringComparison.Ordinal))
107128
{
@@ -130,6 +151,25 @@ public void Validate()
130151
}
131152
}
132153

154+
/// <summary>
155+
/// Client-side load-balancing strategy used when multiple endpoints are
156+
/// configured.
157+
/// </summary>
158+
public enum LoadBalancingStrategy
159+
{
160+
/// <summary>
161+
/// Pick a ready endpoint uniformly at random for each call. Default.
162+
/// Avoids the lock-step herding pattern that round-robin can produce when
163+
/// many short-lived clients start at the same time.
164+
/// </summary>
165+
Random = 0,
166+
167+
/// <summary>
168+
/// Cycle through ready endpoints in order.
169+
/// </summary>
170+
RoundRobin = 1,
171+
}
172+
133173
/// <summary>
134174
/// Authentication options for GreptimeDB.
135175
/// </summary>
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
using Grpc.Net.Client.Balancer;
2+
using Grpc.Net.Client.Configuration;
3+
using Microsoft.Extensions.Logging;
4+
5+
namespace GreptimeDB.Ingester.Client;
6+
7+
/// <summary>
8+
/// <see cref="LoadBalancerFactory"/> for the <c>random</c> policy: each pick selects
9+
/// uniformly at random from the ready subchannels. Spreads load with no shared state
10+
/// or coordination, which avoids the lock-step herding pattern that round-robin can
11+
/// produce when many short-lived clients start at the same time.
12+
/// </summary>
13+
internal sealed class RandomBalancerFactory : LoadBalancerFactory
14+
{
15+
public static readonly RandomBalancerFactory Instance = new();
16+
17+
public override string Name => RandomConfig.Name;
18+
19+
public override LoadBalancer Create(LoadBalancerOptions options)
20+
{
21+
return new RandomBalancer(options.Controller, options.LoggerFactory);
22+
}
23+
}
24+
25+
internal sealed class RandomConfig : LoadBalancingConfig
26+
{
27+
public const string Name = "random";
28+
29+
public RandomConfig() : base(Name) { }
30+
}
31+
32+
internal sealed class RandomBalancer : SubchannelsLoadBalancer
33+
{
34+
public RandomBalancer(IChannelControlHelper controller, ILoggerFactory loggerFactory)
35+
: base(controller, loggerFactory) { }
36+
37+
protected override SubchannelPicker CreatePicker(IReadOnlyList<Subchannel> readySubchannels)
38+
{
39+
return new RandomPicker(readySubchannels);
40+
}
41+
}
42+
43+
internal sealed class RandomPicker : SubchannelPicker
44+
{
45+
private readonly IReadOnlyList<Subchannel> _subchannels;
46+
47+
public RandomPicker(IReadOnlyList<Subchannel> subchannels)
48+
{
49+
_subchannels = subchannels;
50+
}
51+
52+
public override PickResult Pick(PickContext context)
53+
{
54+
// Random.Shared is thread-safe; pickers are invoked concurrently from many RPCs.
55+
var index = Random.Shared.Next(_subchannels.Count);
56+
return PickResult.ForSubchannel(_subchannels[index]);
57+
}
58+
}

tests/GreptimeDB.Ingester.Tests/GreptimeClientOptionsTests.cs

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,12 @@ public void Validate_DefaultOptions_Passes()
1616
act.Should().NotThrow();
1717
}
1818

19+
[Fact]
20+
public void LoadBalancing_DefaultsToRandom()
21+
{
22+
new GreptimeClientOptions().LoadBalancing.Should().Be(LoadBalancingStrategy.Random);
23+
}
24+
1925
[Fact]
2026
public void Validate_SingleEndpoint_Passes()
2127
{
@@ -141,6 +147,41 @@ public void Validate_EndpointsAllWhitespace_ThrowsAndDoesNotFallBack()
141147
act.Should().Throw<ArgumentException>().WithMessage("*no non-whitespace*");
142148
}
143149

150+
[Theory]
151+
[InlineData(LoadBalancingStrategy.Random)]
152+
[InlineData(LoadBalancingStrategy.RoundRobin)]
153+
public void Constructor_MultiEndpoint_AcceptsAllStrategies(LoadBalancingStrategy strategy)
154+
{
155+
var options = new GreptimeClientOptions
156+
{
157+
Endpoints = new List<string> { "http://host-a:4001", "http://host-b:4001" },
158+
LoadBalancing = strategy,
159+
};
160+
161+
var act = () =>
162+
{
163+
using var client = new GreptimeClient(options);
164+
};
165+
166+
act.Should().NotThrow();
167+
}
168+
169+
[Theory]
170+
[InlineData("http://host:4001/v1")]
171+
[InlineData("http://host:4001/?foo=1")]
172+
[InlineData("http://host:4001/#frag")]
173+
public void Validate_EndpointWithPathQueryOrFragment_Throws(string endpoint)
174+
{
175+
var options = new GreptimeClientOptions
176+
{
177+
Endpoints = new List<string> { endpoint }
178+
};
179+
180+
var act = () => options.Validate();
181+
182+
act.Should().Throw<ArgumentException>().WithMessage("*without a path, query, or fragment*");
183+
}
184+
144185
[Fact]
145186
public void ResolveEndpoints_FiltersWhitespaceAndTrims()
146187
{

0 commit comments

Comments
 (0)