Skip to content

Commit 04865ba

Browse files
committed
feat(sources): add gRPC max connection age
1 parent 5d41252 commit 04865ba

8 files changed

Lines changed: 567 additions & 18 deletions

File tree

src/components/validation/runner/io.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use crate::{
2222
Client as VectorClient, HealthCheckRequest, HealthCheckResponse, PushEventsRequest,
2323
PushEventsResponse, Server as VectorServer, Service as VectorService, ServingStatus,
2424
},
25-
sources::util::grpc::run_grpc_server,
25+
sources::util::grpc::{GrpcKeepaliveConfig, run_grpc_server},
2626
};
2727

2828
#[derive(Clone)]
@@ -166,6 +166,7 @@ pub fn spawn_grpc_server<S>(
166166
listen_addr.as_socket_addr(),
167167
tls_settings,
168168
service,
169+
GrpcKeepaliveConfig::default(),
169170
shutdown_signal,
170171
);
171172
pin!(server);

src/sources/opentelemetry/config.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use crate::{
1414
grpc::Service,
1515
http::{build_warp_filter, run_http_server},
1616
},
17-
util::grpc::run_grpc_server_with_routes,
17+
util::grpc::{GrpcKeepaliveConfig, run_grpc_server_with_routes},
1818
},
1919
};
2020
use futures::FutureExt;
@@ -173,12 +173,17 @@ pub struct GrpcConfig {
173173
#[configurable(derived)]
174174
#[serde(default, skip_serializing_if = "Option::is_none")]
175175
pub tls: Option<TlsEnableableConfig>,
176+
177+
#[configurable(derived)]
178+
#[serde(default)]
179+
pub keepalive: GrpcKeepaliveConfig,
176180
}
177181

178182
fn example_grpc_config() -> GrpcConfig {
179183
GrpcConfig {
180184
address: "0.0.0.0:4317".parse().unwrap(),
181185
tls: None,
186+
keepalive: GrpcKeepaliveConfig::default(),
182187
}
183188
}
184189

@@ -325,6 +330,7 @@ impl SourceConfig for OpentelemetryConfig {
325330
self.grpc.address,
326331
grpc_tls_settings,
327332
builder.routes(),
333+
self.grpc.keepalive.clone(),
328334
cx.shutdown.clone(),
329335
)
330336
.map_err(|error| {

src/sources/opentelemetry/integration_tests.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ async fn receive_logs_legacy_namespace() {
5353
grpc: GrpcConfig {
5454
address: source_grpc_address().parse().unwrap(),
5555
tls: Default::default(),
56+
keepalive: Default::default(),
5657
},
5758
http: HttpConfig {
5859
address: source_http_address().parse().unwrap(),
@@ -152,6 +153,7 @@ async fn receive_trace() {
152153
grpc: GrpcConfig {
153154
address: source_grpc_address().parse().unwrap(),
154155
tls: Default::default(),
156+
keepalive: Default::default(),
155157
},
156158
http: HttpConfig {
157159
address: source_http_address().parse().unwrap(),
@@ -257,6 +259,7 @@ async fn receive_metric() {
257259
grpc: GrpcConfig {
258260
address: source_grpc_address().parse().unwrap(),
259261
tls: Default::default(),
262+
keepalive: Default::default(),
260263
},
261264
http: HttpConfig {
262265
address: source_http_address().parse().unwrap(),

src/sources/opentelemetry/tests.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,30 @@ fn generate_config() {
206206
test_util::test_generate_config::<OpentelemetryConfig>();
207207
}
208208

209+
#[test]
210+
fn config_grpc_keepalive() {
211+
let config: OpentelemetryConfig = toml::from_str(
212+
r#"
213+
[grpc]
214+
address = "0.0.0.0:4317"
215+
216+
[grpc.keepalive]
217+
max_connection_age_secs = 300
218+
max_connection_age_grace_secs = 30
219+
220+
[http]
221+
address = "0.0.0.0:4318"
222+
"#,
223+
)
224+
.unwrap();
225+
226+
assert_eq!(config.grpc.keepalive.max_connection_age_secs, Some(300));
227+
assert_eq!(
228+
config.grpc.keepalive.max_connection_age_grace_secs,
229+
Some(30)
230+
);
231+
}
232+
209233
#[tokio::test]
210234
async fn receive_grpc_logs_vector_namespace() {
211235
assert_source_compliance(&SOURCE_TAGS, async {
@@ -1175,6 +1199,7 @@ fn get_source_config_with_headers(
11751199
grpc: GrpcConfig {
11761200
address: grpc_addr,
11771201
tls: Default::default(),
1202+
keepalive: Default::default(),
11781203
},
11791204
http: HttpConfig {
11801205
address: http_addr,
@@ -1510,6 +1535,7 @@ pub async fn build_otlp_test_env(
15101535
grpc: GrpcConfig {
15111536
address: grpc_addr,
15121537
tls: Default::default(),
1538+
keepalive: Default::default(),
15131539
},
15141540
http: HttpConfig {
15151541
address: http_addr,
@@ -1589,6 +1615,7 @@ async fn http_logs_use_otlp_decoding_emits_metric() {
15891615
grpc: GrpcConfig {
15901616
address: grpc_addr,
15911617
tls: Default::default(),
1618+
keepalive: Default::default(),
15921619
},
15931620
http: HttpConfig {
15941621
address: http_addr,
@@ -1823,6 +1850,7 @@ mod otlp_decoding_config_tests {
18231850
grpc: GrpcConfig {
18241851
address: "0.0.0.0:4317".parse().unwrap(),
18251852
tls: None,
1853+
keepalive: Default::default(),
18261854
},
18271855
http: HttpConfig {
18281856
address: "0.0.0.0:4318".parse().unwrap(),
@@ -1863,6 +1891,7 @@ mod otlp_decoding_config_tests {
18631891
grpc: GrpcConfig {
18641892
address: "0.0.0.0:4317".parse().unwrap(),
18651893
tls: None,
1894+
keepalive: Default::default(),
18661895
},
18671896
http: HttpConfig {
18681897
address: "0.0.0.0:4318".parse().unwrap(),
@@ -1906,6 +1935,7 @@ mod otlp_decoding_config_tests {
19061935
grpc: GrpcConfig {
19071936
address: "0.0.0.0:4317".parse().unwrap(),
19081937
tls: None,
1938+
keepalive: Default::default(),
19091939
},
19101940
http: HttpConfig {
19111941
address: "0.0.0.0:4318".parse().unwrap(),

0 commit comments

Comments
 (0)