Skip to content

Commit e9ac53e

Browse files
committed
feat(sources): add gRPC max connection age
1 parent 5d41252 commit e9ac53e

8 files changed

Lines changed: 326 additions & 15 deletions

File tree

src/components/validation/runner/io.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use crate::{
2222
Client as VectorClient, HealthCheckRequest, HealthCheckResponse, PushEventsRequest,
2323
PushEventsResponse, Server as VectorServer, Service as VectorService, ServingStatus,
2424
},
25-
sources::util::grpc::run_grpc_server,
25+
sources::util::grpc::{GrpcKeepaliveConfig, run_grpc_server},
2626
};
2727

2828
#[derive(Clone)]
@@ -166,6 +166,7 @@ pub fn spawn_grpc_server<S>(
166166
listen_addr.as_socket_addr(),
167167
tls_settings,
168168
service,
169+
GrpcKeepaliveConfig::default(),
169170
shutdown_signal,
170171
);
171172
pin!(server);

src/sources/opentelemetry/config.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use crate::{
1414
grpc::Service,
1515
http::{build_warp_filter, run_http_server},
1616
},
17-
util::grpc::run_grpc_server_with_routes,
17+
util::grpc::{GrpcKeepaliveConfig, run_grpc_server_with_routes},
1818
},
1919
};
2020
use futures::FutureExt;
@@ -173,12 +173,17 @@ pub struct GrpcConfig {
173173
#[configurable(derived)]
174174
#[serde(default, skip_serializing_if = "Option::is_none")]
175175
pub tls: Option<TlsEnableableConfig>,
176+
177+
#[configurable(derived)]
178+
#[serde(default)]
179+
pub keepalive: GrpcKeepaliveConfig,
176180
}
177181

178182
fn example_grpc_config() -> GrpcConfig {
179183
GrpcConfig {
180184
address: "0.0.0.0:4317".parse().unwrap(),
181185
tls: None,
186+
keepalive: GrpcKeepaliveConfig::default(),
182187
}
183188
}
184189

@@ -325,6 +330,7 @@ impl SourceConfig for OpentelemetryConfig {
325330
self.grpc.address,
326331
grpc_tls_settings,
327332
builder.routes(),
333+
self.grpc.keepalive.clone(),
328334
cx.shutdown.clone(),
329335
)
330336
.map_err(|error| {

src/sources/opentelemetry/integration_tests.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ async fn receive_logs_legacy_namespace() {
5353
grpc: GrpcConfig {
5454
address: source_grpc_address().parse().unwrap(),
5555
tls: Default::default(),
56+
keepalive: Default::default(),
5657
},
5758
http: HttpConfig {
5859
address: source_http_address().parse().unwrap(),
@@ -152,6 +153,7 @@ async fn receive_trace() {
152153
grpc: GrpcConfig {
153154
address: source_grpc_address().parse().unwrap(),
154155
tls: Default::default(),
156+
keepalive: Default::default(),
155157
},
156158
http: HttpConfig {
157159
address: source_http_address().parse().unwrap(),
@@ -257,6 +259,7 @@ async fn receive_metric() {
257259
grpc: GrpcConfig {
258260
address: source_grpc_address().parse().unwrap(),
259261
tls: Default::default(),
262+
keepalive: Default::default(),
260263
},
261264
http: HttpConfig {
262265
address: source_http_address().parse().unwrap(),

src/sources/opentelemetry/tests.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,30 @@ fn generate_config() {
206206
test_util::test_generate_config::<OpentelemetryConfig>();
207207
}
208208

209+
#[test]
210+
fn config_grpc_keepalive() {
211+
let config: OpentelemetryConfig = toml::from_str(
212+
r#"
213+
[grpc]
214+
address = "0.0.0.0:4317"
215+
216+
[grpc.keepalive]
217+
max_connection_age_secs = 300
218+
max_connection_age_grace_secs = 30
219+
220+
[http]
221+
address = "0.0.0.0:4318"
222+
"#,
223+
)
224+
.unwrap();
225+
226+
assert_eq!(config.grpc.keepalive.max_connection_age_secs, Some(300));
227+
assert_eq!(
228+
config.grpc.keepalive.max_connection_age_grace_secs,
229+
Some(30)
230+
);
231+
}
232+
209233
#[tokio::test]
210234
async fn receive_grpc_logs_vector_namespace() {
211235
assert_source_compliance(&SOURCE_TAGS, async {
@@ -1175,6 +1199,7 @@ fn get_source_config_with_headers(
11751199
grpc: GrpcConfig {
11761200
address: grpc_addr,
11771201
tls: Default::default(),
1202+
keepalive: Default::default(),
11781203
},
11791204
http: HttpConfig {
11801205
address: http_addr,
@@ -1510,6 +1535,7 @@ pub async fn build_otlp_test_env(
15101535
grpc: GrpcConfig {
15111536
address: grpc_addr,
15121537
tls: Default::default(),
1538+
keepalive: Default::default(),
15131539
},
15141540
http: HttpConfig {
15151541
address: http_addr,
@@ -1589,6 +1615,7 @@ async fn http_logs_use_otlp_decoding_emits_metric() {
15891615
grpc: GrpcConfig {
15901616
address: grpc_addr,
15911617
tls: Default::default(),
1618+
keepalive: Default::default(),
15921619
},
15931620
http: HttpConfig {
15941621
address: http_addr,
@@ -1823,6 +1850,7 @@ mod otlp_decoding_config_tests {
18231850
grpc: GrpcConfig {
18241851
address: "0.0.0.0:4317".parse().unwrap(),
18251852
tls: None,
1853+
keepalive: Default::default(),
18261854
},
18271855
http: HttpConfig {
18281856
address: "0.0.0.0:4318".parse().unwrap(),
@@ -1863,6 +1891,7 @@ mod otlp_decoding_config_tests {
18631891
grpc: GrpcConfig {
18641892
address: "0.0.0.0:4317".parse().unwrap(),
18651893
tls: None,
1894+
keepalive: Default::default(),
18661895
},
18671896
http: HttpConfig {
18681897
address: "0.0.0.0:4318".parse().unwrap(),
@@ -1906,6 +1935,7 @@ mod otlp_decoding_config_tests {
19061935
grpc: GrpcConfig {
19071936
address: "0.0.0.0:4317".parse().unwrap(),
19081937
tls: None,
1938+
keepalive: Default::default(),
19091939
},
19101940
http: HttpConfig {
19111941
address: "0.0.0.0:4318".parse().unwrap(),

src/sources/util/grpc/mod.rs

Lines changed: 135 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,23 @@
1-
use std::{convert::Infallible, net::SocketAddr, time::Duration};
1+
use std::{
2+
convert::Infallible,
3+
net::SocketAddr,
4+
pin::Pin,
5+
task::{Context, Poll},
6+
time::Duration,
7+
};
28

3-
use futures::FutureExt;
9+
use futures::{FutureExt, StreamExt};
410
use http::{Request, Response};
511
use hyper::Body;
12+
use tokio::{
13+
io::{AsyncRead, AsyncWrite, ReadBuf},
14+
net::TcpStream,
15+
time::{Sleep, sleep},
16+
};
617
use tonic::{
718
body::BoxBody,
819
server::NamedService,
9-
transport::server::{Routes, Server},
20+
transport::server::{Connected, Routes, Server},
1021
};
1122
use tower::Service;
1223
use tower_http::{
@@ -18,16 +29,127 @@ use tracing::Span;
1829
use crate::{
1930
internal_events::{GrpcServerRequestReceived, GrpcServerResponseSent},
2031
shutdown::{ShutdownSignal, ShutdownSignalToken},
21-
tls::MaybeTlsSettings,
32+
tls::{MaybeTlsIncomingStream, MaybeTlsSettings},
2233
};
34+
use vector_lib::configurable::configurable_component;
2335

2436
mod decompression;
2537
pub use self::decompression::{DecompressionAndMetrics, DecompressionAndMetricsLayer};
2638

39+
/// Configuration of gRPC server keepalive parameters.
40+
#[configurable_component]
41+
#[derive(Clone, Debug, Default, PartialEq, Eq)]
42+
#[serde(deny_unknown_fields)]
43+
pub struct GrpcKeepaliveConfig {
44+
/// The maximum amount of time a connection may exist before the server closes it.
45+
///
46+
/// When unset, connections are not closed based on age.
47+
#[serde(default)]
48+
#[configurable(metadata(docs::examples = 300))]
49+
#[configurable(metadata(docs::type_unit = "seconds"))]
50+
#[configurable(metadata(docs::human_name = "Maximum Connection Age"))]
51+
pub max_connection_age_secs: Option<u64>,
52+
53+
/// The grace period added to `max_connection_age_secs` before the server closes the connection.
54+
///
55+
/// This setting only applies when `max_connection_age_secs` is set.
56+
#[serde(default)]
57+
#[configurable(metadata(docs::examples = 30))]
58+
#[configurable(metadata(docs::type_unit = "seconds"))]
59+
#[configurable(metadata(docs::human_name = "Maximum Connection Age Grace"))]
60+
pub max_connection_age_grace_secs: Option<u64>,
61+
}
62+
63+
impl GrpcKeepaliveConfig {
64+
fn max_connection_lifetime(&self) -> Option<Duration> {
65+
self.max_connection_age_secs.map(|max_connection_age_secs| {
66+
let age = Duration::from_secs(max_connection_age_secs);
67+
let grace = self
68+
.max_connection_age_grace_secs
69+
.map(Duration::from_secs)
70+
.unwrap_or_default();
71+
72+
age.checked_add(grace).unwrap_or(Duration::MAX)
73+
})
74+
}
75+
}
76+
77+
struct MaxConnectionAgeIo {
78+
inner: MaybeTlsIncomingStream<TcpStream>,
79+
deadline: Option<Pin<Box<Sleep>>>,
80+
}
81+
82+
impl MaxConnectionAgeIo {
83+
fn new(inner: MaybeTlsIncomingStream<TcpStream>, lifetime: Option<Duration>) -> Self {
84+
Self {
85+
inner,
86+
deadline: lifetime.map(|lifetime| Box::pin(sleep(lifetime))),
87+
}
88+
}
89+
90+
fn is_expired(&mut self, cx: &mut Context<'_>) -> bool {
91+
self.deadline
92+
.as_mut()
93+
.is_some_and(|deadline| deadline.as_mut().poll(cx).is_ready())
94+
}
95+
}
96+
97+
impl AsyncRead for MaxConnectionAgeIo {
98+
fn poll_read(
99+
self: Pin<&mut Self>,
100+
cx: &mut Context<'_>,
101+
buf: &mut ReadBuf<'_>,
102+
) -> Poll<std::io::Result<()>> {
103+
let this = self.get_mut();
104+
if this.is_expired(cx) {
105+
Poll::Ready(Ok(()))
106+
} else {
107+
Pin::new(&mut this.inner).poll_read(cx, buf)
108+
}
109+
}
110+
}
111+
112+
impl AsyncWrite for MaxConnectionAgeIo {
113+
fn poll_write(
114+
self: Pin<&mut Self>,
115+
cx: &mut Context<'_>,
116+
buf: &[u8],
117+
) -> Poll<std::io::Result<usize>> {
118+
let this = self.get_mut();
119+
if this.is_expired(cx) {
120+
Poll::Ready(Err(std::io::ErrorKind::BrokenPipe.into()))
121+
} else {
122+
Pin::new(&mut this.inner).poll_write(cx, buf)
123+
}
124+
}
125+
126+
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
127+
let this = self.get_mut();
128+
if this.is_expired(cx) {
129+
Poll::Ready(Err(std::io::ErrorKind::BrokenPipe.into()))
130+
} else {
131+
Pin::new(&mut this.inner).poll_flush(cx)
132+
}
133+
}
134+
135+
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
136+
Pin::new(&mut self.get_mut().inner).poll_shutdown(cx)
137+
}
138+
}
139+
140+
impl Connected for MaxConnectionAgeIo {
141+
type ConnectInfo = <MaybeTlsIncomingStream<TcpStream> as Connected>::ConnectInfo;
142+
143+
fn connect_info(&self) -> Self::ConnectInfo {
144+
self.inner.connect_info()
145+
}
146+
}
147+
27148
pub async fn run_grpc_server<S>(
28149
address: SocketAddr,
29150
tls_settings: MaybeTlsSettings,
30151
service: S,
152+
keepalive: GrpcKeepaliveConfig,
31153
shutdown: ShutdownSignal,
32154
) -> crate::Result<()>
33155
where
@@ -41,7 +163,10 @@ where
41163
let span = Span::current();
42164
let (tx, rx) = tokio::sync::oneshot::channel::<ShutdownSignalToken>();
43165
let listener = tls_settings.bind(&address).await?;
44-
let stream = listener.accept_stream();
166+
let max_connection_lifetime = keepalive.max_connection_lifetime();
167+
let stream = listener
168+
.accept_stream()
169+
.map(move |stream| stream.map(|io| MaxConnectionAgeIo::new(io, max_connection_lifetime)));
45170

46171
info!(%address, "Building gRPC server.");
47172

@@ -72,12 +197,16 @@ pub async fn run_grpc_server_with_routes(
72197
address: SocketAddr,
73198
tls_settings: MaybeTlsSettings,
74199
routes: Routes,
200+
keepalive: GrpcKeepaliveConfig,
75201
shutdown: ShutdownSignal,
76202
) -> crate::Result<()> {
77203
let span = Span::current();
78204
let (tx, rx) = tokio::sync::oneshot::channel::<ShutdownSignalToken>();
79205
let listener = tls_settings.bind(&address).await?;
80-
let stream = listener.accept_stream();
206+
let max_connection_lifetime = keepalive.max_connection_lifetime();
207+
let stream = listener
208+
.accept_stream()
209+
.map(move |stream| stream.map(|io| MaxConnectionAgeIo::new(io, max_connection_lifetime)));
81210

82211
info!(%address, "Building gRPC server.");
83212

0 commit comments

Comments
 (0)