Skip to content

Commit 31ade89

Browse files
authored
Generators obey throttle wait_for errors (#1317)
### What does this PR do? This commit fixes a long-standing mystery: why when throughputs are low does lading _sometimes_ emit faster than it should. It turns out the answer is we failed to handle the capacity error that the throttle kicks out in such conditions systematically. This also implies that unless the user explicitly sets a max block size then we should configure it for them, so the default of a flat 1MiB does not serve well. I'll fix that in a follow-on PR.
1 parent 68b7642 commit 31ade89

File tree

9 files changed

+234
-173
lines changed

9 files changed

+234
-173
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,11 @@ All notable changes to this project will be documented in this file.
44
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
55
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
66

7+
## [Unreleased]
8+
## Fixed
9+
- Fixed throttle capacity validation to prevent requests larger than maximum
10+
capacity from being emitted.
11+
712
## [0.25.8]
813
## Fixed
914
- Lading will now ignore child processes when polling /proc if the children are

lading/src/generator/file_gen/logrotate.rs

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -368,17 +368,24 @@ impl Child {
368368
let blk = rcv.peek().await.expect("block cache should never be empty");
369369

370370
tokio::select! {
371-
_ = self.throttle.wait_for(blk.total_bytes) => {
372-
let blk = rcv.next().await.expect("failed to advance through the blocks");
373-
write_bytes(&blk,
374-
&mut fp,
375-
&mut total_bytes_written,
376-
bytes_per_second,
377-
total_names,
378-
maximum_bytes_per_log,
379-
&self.names,
380-
last_name,
381-
&self.labels).await?;
371+
result = self.throttle.wait_for(blk.total_bytes) => {
372+
match result {
373+
Ok(()) => {
374+
let blk = rcv.next().await.expect("failed to advance through the blocks");
375+
write_bytes(&blk,
376+
&mut fp,
377+
&mut total_bytes_written,
378+
bytes_per_second,
379+
total_names,
380+
maximum_bytes_per_log,
381+
&self.names,
382+
last_name,
383+
&self.labels).await?;
384+
}
385+
Err(err) => {
386+
error!("Throttle request of {} is larger than throttle capacity. Block will be discarded. Error: {}", blk.total_bytes, err);
387+
}
388+
}
382389
}
383390
() = &mut shutdown_wait => {
384391
fp.flush().await.map_err(|err| Error::IoFlush { err })?;
@@ -387,7 +394,6 @@ impl Child {
387394
info!("shutdown signal received");
388395
return Ok(());
389396
},
390-
391397
}
392398
}
393399
}

lading/src/generator/file_gen/traditional.rs

Lines changed: 37 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ use tokio::{
3535
sync::mpsc,
3636
task::{JoinError, JoinHandle},
3737
};
38-
use tracing::info;
38+
use tracing::{error, info};
3939

4040
use crate::common::PeekableReceiver;
4141
use lading_payload::{
@@ -263,38 +263,45 @@ impl Child {
263263
let total_bytes = blk.total_bytes;
264264

265265
tokio::select! {
266-
_ = self.throttle.wait_for(total_bytes) => {
267-
let blk = rcv.next().await.expect("failed to advance through blocks"); // actually advance through the blocks
268-
let total_bytes = u64::from(total_bytes.get());
266+
result = self.throttle.wait_for(total_bytes) => {
267+
match result {
268+
Ok(()) => {
269+
let blk = rcv.next().await.expect("failed to advance through blocks"); // actually advance through the blocks
270+
let total_bytes = u64::from(total_bytes.get());
269271

270-
{
271-
fp.write_all(&blk.bytes).await?;
272-
counter!("bytes_written").increment(total_bytes);
273-
total_bytes_written += total_bytes;
274-
}
272+
{
273+
fp.write_all(&blk.bytes).await?;
274+
counter!("bytes_written").increment(total_bytes);
275+
total_bytes_written += total_bytes;
276+
}
275277

276-
if total_bytes_written > maximum_bytes_per_file {
277-
fp.flush().await?;
278-
if self.rotate {
279-
// Delete file, leaving any open file handlers intact. This
280-
// includes our own `fp` for the time being.
281-
fs::remove_file(&path).await?;
278+
if total_bytes_written > maximum_bytes_per_file {
279+
fp.flush().await?;
280+
if self.rotate {
281+
// Delete file, leaving any open file handlers intact. This
282+
// includes our own `fp` for the time being.
283+
fs::remove_file(&path).await?;
284+
}
285+
// Update `path` to point to the next indexed file.
286+
file_index = self.file_index.fetch_add(1, Ordering::Relaxed);
287+
path = path_from_template(&self.path_template, file_index);
288+
// Open a new fp to `path`, replacing `fp`. Any holders of the
289+
// file pointer still have it but the file no longer has a name.
290+
fp = BufWriter::with_capacity(
291+
bytes_per_second,
292+
fs::OpenOptions::new()
293+
.create(true)
294+
.truncate(false)
295+
.write(true)
296+
.open(&path)
297+
.await?,
298+
);
299+
total_bytes_written = 0;
300+
}
301+
}
302+
Err(err) => {
303+
error!("Throttle request of {total_bytes} is larger than throttle capacity. Block will be discarded. Error: {err}");
282304
}
283-
// Update `path` to point to the next indexed file.
284-
file_index = self.file_index.fetch_add(1, Ordering::Relaxed);
285-
path = path_from_template(&self.path_template, file_index);
286-
// Open a new fp to `path`, replacing `fp`. Any holders of the
287-
// file pointer still have it but the file no longer has a name.
288-
fp = BufWriter::with_capacity(
289-
bytes_per_second,
290-
fs::OpenOptions::new()
291-
.create(true)
292-
.truncate(false)
293-
.write(true)
294-
.open(&path)
295-
.await?,
296-
);
297-
total_bytes_written = 0;
298305
}
299306
}
300307
() = &mut shutdown_wait => {

lading/src/generator/http.rs

Lines changed: 31 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use once_cell::sync::OnceCell;
2222
use rand::{SeedableRng, prelude::StdRng};
2323
use serde::{Deserialize, Serialize};
2424
use tokio::sync::{Semaphore, mpsc};
25-
use tracing::info;
25+
use tracing::{error, info};
2626

2727
use crate::common::PeekableReceiver;
2828
use lading_payload::block::{self, Block};
@@ -227,30 +227,38 @@ impl Http {
227227
}
228228

229229
tokio::select! {
230-
_ = self.throttle.wait_for(total_bytes) => {
231-
let client = client.clone();
232-
let labels = labels.clone();
230+
result = self.throttle.wait_for(total_bytes) => {
231+
match result {
232+
Ok(()) => {
233+
let client = client.clone();
234+
let labels = labels.clone();
233235

234-
let permit = CONNECTION_SEMAPHORE.get().expect("Connection Semaphore is being initialized or cell is empty").acquire().await.expect("Connection Semaphore has already closed");
235-
tokio::spawn(async move {
236-
counter!("requests_sent", &labels).increment(1);
237-
match client.request(request).await {
238-
Ok(response) => {
239-
counter!("bytes_written", &labels).increment(block_length as u64);
240-
let status = response.status();
241-
let mut status_labels = labels.clone();
242-
status_labels
243-
.push(("status_code".to_string(), status.as_u16().to_string()));
244-
counter!("request_ok", &status_labels).increment(1);
245-
}
246-
Err(err) => {
247-
let mut error_labels = labels.clone();
248-
error_labels.push(("error".to_string(), err.to_string()));
249-
counter!("request_failure", &error_labels).increment(1);
250-
}
236+
let permit = CONNECTION_SEMAPHORE.get().expect("Connection Semaphore is being initialized or cell is empty").acquire().await.expect("Connection Semaphore has already closed");
237+
tokio::spawn(async move {
238+
counter!("requests_sent", &labels).increment(1);
239+
match client.request(request).await {
240+
Ok(response) => {
241+
counter!("bytes_written", &labels).increment(block_length as u64);
242+
let status = response.status();
243+
let mut status_labels = labels.clone();
244+
status_labels
245+
.push(("status_code".to_string(), status.as_u16().to_string()));
246+
counter!("request_ok", &status_labels).increment(1);
247+
}
248+
Err(err) => {
249+
let mut error_labels = labels.clone();
250+
error_labels.push(("error".to_string(), err.to_string()));
251+
counter!("request_failure", &error_labels).increment(1);
252+
}
253+
}
254+
drop(permit);
255+
});
256+
257+
}
258+
Err(err) => {
259+
error!("Throttle request of {total_bytes} is larger than throttle capacity. Block will be discarded. Error: {err}");
251260
}
252-
drop(permit);
253-
});
261+
}
254262
},
255263
() = &mut shutdown_wait => {
256264
info!("shutdown signal received");

lading/src/generator/splunk_hec.rs

Lines changed: 29 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ use tokio::{
3939
sync::{Semaphore, SemaphorePermit, mpsc},
4040
time::timeout,
4141
};
42-
use tracing::info;
42+
use tracing::{error, info};
4343

4444
use crate::{common::PeekableReceiver, generator::splunk_hec::acknowledgements::Channel};
4545
use lading_payload::block::{self, Block};
@@ -282,30 +282,37 @@ impl SplunkHec {
282282
let total_bytes = blk.total_bytes;
283283

284284
tokio::select! {
285-
_ = self.throttle.wait_for(total_bytes) => {
286-
let client = client.clone();
287-
let labels = labels.clone();
288-
let uri = uri.clone();
285+
result = self.throttle.wait_for(total_bytes) => {
286+
match result {
287+
Ok(()) => {
288+
let client = client.clone();
289+
let labels = labels.clone();
290+
let uri = uri.clone();
289291

290-
let blk = rcv.next().await.expect("failed to advance through blocks"); // actually advance through the blocks
291-
let body = crate::full(blk.bytes.clone());
292-
let block_length = blk.bytes.len();
292+
let blk = rcv.next().await.expect("failed to advance through blocks"); // actually advance through the blocks
293+
let body = crate::full(blk.bytes.clone());
294+
let block_length = blk.bytes.len();
293295

294-
let request = Request::builder()
295-
.method(Method::POST)
296-
.uri(uri)
297-
.header(AUTHORIZATION, format!("Splunk {}", self.token))
298-
.header(CONTENT_LENGTH, block_length)
299-
.header(SPLUNK_HEC_CHANNEL_HEADER, channel.id())
300-
.body(body)?;
296+
let request = Request::builder()
297+
.method(Method::POST)
298+
.uri(uri)
299+
.header(AUTHORIZATION, format!("Splunk {}", self.token))
300+
.header(CONTENT_LENGTH, block_length)
301+
.header(SPLUNK_HEC_CHANNEL_HEADER, channel.id())
302+
.body(body)?;
301303

302-
// NOTE once JoinSet is in tokio stable we can make this
303-
// much, much tidier by spawning requests in the JoinSet. I
304-
// think we could also possibly have the send request return
305-
// the AckID, meaning we could just keep the channel logic
306-
// in this main loop here and avoid the AckService entirely.
307-
let permit = CONNECTION_SEMAPHORE.get().expect("Connecton Semaphore is empty or being initialized").acquire().await.expect("Semaphore has already been closed");
308-
tokio::spawn(send_hec_request(permit, block_length, labels, channel, client, request, request_shutdown.clone()));
304+
// NOTE once JoinSet is in tokio stable we can make this
305+
// much, much tidier by spawning requests in the JoinSet. I
306+
// think we could also possibly have the send request return
307+
// the AckID, meaning we could just keep the channel logic
308+
// in this main loop here and avoid the AckService entirely.
309+
let permit = CONNECTION_SEMAPHORE.get().expect("Connecton Semaphore is empty or being initialized").acquire().await.expect("Semaphore has already been closed");
310+
tokio::spawn(send_hec_request(permit, block_length, labels, channel, client, request, request_shutdown.clone()));
311+
}
312+
Err(err) => {
313+
error!("Throttle request of {total_bytes} is larger than throttle capacity. Block will be discarded. Error: {err}");
314+
}
315+
}
309316
}
310317
() = &mut shutdown_wait => {
311318
info!("shutdown signal received");

lading/src/generator/tcp.rs

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use metrics::{counter, gauge};
2323
use rand::{SeedableRng, rngs::StdRng};
2424
use serde::{Deserialize, Serialize};
2525
use tokio::{io::AsyncWriteExt, net::TcpStream, sync::mpsc};
26-
use tracing::{info, trace};
26+
use tracing::{error, info, trace};
2727

2828
use crate::common::PeekableReceiver;
2929
use lading_payload::block::{self, Block};
@@ -177,20 +177,27 @@ impl Tcp {
177177
let total_bytes = blk.total_bytes;
178178

179179
tokio::select! {
180-
_ = self.throttle.wait_for(total_bytes) => {
181-
let blk = rcv.next().await.expect("failed to advance through the blocks"); // actually advance through the blocks
182-
match connection.write_all(&blk.bytes).await {
180+
result = self.throttle.wait_for(total_bytes) => {
181+
match result {
183182
Ok(()) => {
184-
counter!("bytes_written", &self.metric_labels).increment(u64::from(blk.total_bytes.get()));
185-
counter!("packets_sent", &self.metric_labels).increment(1);
183+
let blk = rcv.next().await.expect("failed to advance through the blocks"); // actually advance through the blocks
184+
match connection.write_all(&blk.bytes).await {
185+
Ok(()) => {
186+
counter!("bytes_written", &self.metric_labels).increment(u64::from(blk.total_bytes.get()));
187+
counter!("packets_sent", &self.metric_labels).increment(1);
188+
}
189+
Err(err) => {
190+
trace!("write failed: {}", err);
191+
192+
let mut error_labels = self.metric_labels.clone();
193+
error_labels.push(("error".to_string(), err.to_string()));
194+
counter!("request_failure", &error_labels).increment(1);
195+
current_connection = None;
196+
}
197+
}
186198
}
187199
Err(err) => {
188-
trace!("write failed: {}", err);
189-
190-
let mut error_labels = self.metric_labels.clone();
191-
error_labels.push(("error".to_string(), err.to_string()));
192-
counter!("request_failure", &error_labels).increment(1);
193-
current_connection = None;
200+
error!("Throttle request of {total_bytes} is larger than throttle capacity. Block will be discarded. Error: {err}");
194201
}
195202
}
196203
}

0 commit comments

Comments
 (0)