Skip to content

Commit c669847

Browse files
authored
parallel_connections for Unix Stream generator (#1421)
1 parent 7e4a342 commit c669847

File tree

3 files changed

+95
-34
lines changed

3 files changed

+95
-34
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1111
'capacity' exists in the throttle struct itself, without breaking use-cases
1212
where bytes-per-second are specified directly. bytes-per-second implies a
1313
stable throttle.
14+
- `unix_stream` generator now supports `parallel_connections` in a manner similar to
15+
`unix_datagram`.
1416

1517
## [0.26.0]
1618
## Added

lading/src/generator.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ impl Server {
197197

198198
Self::UnixStream(unix_stream::UnixStream::new(
199199
config.general,
200-
conf,
200+
&conf,
201201
shutdown,
202202
)?)
203203
}

lading/src/generator/unix_stream.rs

Lines changed: 92 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,19 @@ use metrics::counter;
1818
use rand::{SeedableRng, rngs::StdRng};
1919
use serde::{Deserialize, Serialize};
2020
use std::{num::NonZeroU32, path::PathBuf, thread};
21-
use tokio::{net, sync::mpsc, task::JoinError};
21+
use tokio::{
22+
net,
23+
sync::{broadcast::Receiver, mpsc},
24+
task::{JoinError, JoinSet},
25+
};
2226
use tracing::{debug, error, info, warn};
2327

2428
use super::General;
2529

30+
fn default_parallel_connections() -> u16 {
31+
1
32+
}
33+
2634
#[derive(Debug, Deserialize, Serialize, PartialEq)]
2735
#[serde(deny_unknown_fields)]
2836
/// Configuration of this generator.
@@ -33,7 +41,7 @@ pub struct Config {
3341
pub path: PathBuf,
3442
/// The payload variant
3543
pub variant: lading_payload::Config,
36-
/// The bytes per second to send or receive from the target
44+
/// The bytes per second to send or receive from the target, per connection.
3745
pub bytes_per_second: Option<byte_unit::Byte>,
3846
/// The maximum size in bytes of the cache of prebuilt messages
3947
pub maximum_prebuild_cache_size_bytes: byte_unit::Byte,
@@ -43,6 +51,10 @@ pub struct Config {
4351
/// Whether to use a fixed or streaming block cache
4452
#[serde(default = "lading_payload::block::default_cache_method")]
4553
pub block_cache_method: block::CacheMethod,
54+
/// The total number of parallel connections to maintain, see documentation
55+
/// on `bytes_per_second`.
56+
#[serde(default = "default_parallel_connections")]
57+
pub parallel_connections: u16,
4658
/// The load throttle configuration
4759
pub throttle: Option<crate::generator::common::BytesThrottleConfig>,
4860
}
@@ -59,6 +71,15 @@ pub enum Error {
5971
/// Subtask error
6072
#[error("Subtask failure: {0}")]
6173
Subtask(#[from] JoinError),
74+
/// Child sub-task error.
75+
#[error("Child join error: {0}")]
76+
Child(JoinError),
77+
/// Startup send error.
78+
#[error("Startup send error: {0}")]
79+
StartupSend(#[from] tokio::sync::broadcast::error::SendError<()>),
80+
/// Child startup wait error.
81+
#[error("Child startup wait error: {0}")]
82+
StartupWait(#[from] tokio::sync::broadcast::error::RecvError),
6283
/// Byte error
6384
#[error("Bytes must not be negative: {0}")]
6485
Byte(#[from] byte_unit::ParseError),
@@ -82,11 +103,9 @@ pub enum Error {
82103
/// This generator is responsible for sending data to the target via UDS
83104
/// streams.
84105
pub struct UnixStream {
85-
path: PathBuf,
86-
throttle: Throttle,
87-
block_cache: block::Cache,
88-
metric_labels: Vec<(String, String)>,
106+
handles: JoinSet<Result<(), Error>>,
89107
shutdown: lading_signal::Watcher,
108+
startup: tokio::sync::broadcast::Sender<()>,
90109
}
91110

92111
impl UnixStream {
@@ -103,7 +122,7 @@ impl UnixStream {
103122
#[allow(clippy::cast_possible_truncation)]
104123
pub fn new(
105124
general: General,
106-
config: Config,
125+
config: &Config,
107126
shutdown: lading_signal::Watcher,
108127
) -> Result<Self, Error> {
109128
let mut rng = StdRng::from_seed(config.seed);
@@ -115,37 +134,51 @@ impl UnixStream {
115134
labels.push(("id".to_string(), id));
116135
}
117136

118-
let throttle_config = match (config.bytes_per_second, &config.throttle) {
119-
(Some(bytes_per_second), None) => {
120-
let bytes_per_second =
121-
NonZeroU32::new(bytes_per_second.as_u128() as u32).ok_or(Error::Zero)?;
122-
lading_throttle::Config::Stable {
123-
maximum_capacity: bytes_per_second,
137+
let (startup, _startup_rx) = tokio::sync::broadcast::channel(1);
138+
139+
let mut handles = JoinSet::new();
140+
for _ in 0..config.parallel_connections {
141+
let throttle_config = match (config.bytes_per_second, &config.throttle) {
142+
(Some(bytes_per_second), None) => {
143+
let bytes_per_second =
144+
NonZeroU32::new(bytes_per_second.as_u128() as u32).ok_or(Error::Zero)?;
145+
lading_throttle::Config::Stable {
146+
maximum_capacity: bytes_per_second,
147+
}
124148
}
125-
}
126-
(None, Some(throttle)) => (*throttle).try_into()?,
127-
(Some(_), Some(_)) => return Err(Error::ConflictingThrottleConfig),
128-
(None, None) => return Err(Error::NoThrottleConfig),
129-
};
149+
(None, Some(throttle)) => (*throttle).try_into()?,
150+
(Some(_), Some(_)) => return Err(Error::ConflictingThrottleConfig),
151+
(None, None) => return Err(Error::NoThrottleConfig),
152+
};
153+
let throttle = Throttle::new_with_config(throttle_config);
154+
155+
let total_bytes =
156+
NonZeroU32::new(config.maximum_prebuild_cache_size_bytes.as_u128() as u32)
157+
.ok_or(Error::Zero)?;
158+
let block_cache = match config.block_cache_method {
159+
block::CacheMethod::Fixed => block::Cache::fixed(
160+
&mut rng,
161+
total_bytes,
162+
config.maximum_block_size.as_u128(),
163+
&config.variant,
164+
)?,
165+
};
166+
167+
let child = Child {
168+
path: config.path.clone(),
169+
block_cache,
170+
throttle,
171+
metric_labels: labels.clone(),
172+
shutdown: shutdown.clone(),
173+
};
130174

131-
let total_bytes =
132-
NonZeroU32::new(config.maximum_prebuild_cache_size_bytes.as_u128() as u32)
133-
.ok_or(Error::Zero)?;
134-
let block_cache = match config.block_cache_method {
135-
block::CacheMethod::Fixed => block::Cache::fixed(
136-
&mut rng,
137-
total_bytes,
138-
config.maximum_block_size.as_u128(),
139-
&config.variant,
140-
)?,
141-
};
175+
handles.spawn(child.spin(startup.subscribe()));
176+
}
142177

143178
Ok(Self {
144-
path: config.path,
145-
block_cache,
146-
throttle: Throttle::new_with_config(throttle_config),
147-
metric_labels: labels,
179+
handles,
148180
shutdown,
181+
startup,
149182
})
150183
}
151184

@@ -159,6 +192,32 @@ impl UnixStream {
159192
///
160193
/// Function will panic if underlying byte capacity is not available.
161194
pub async fn spin(mut self) -> Result<(), Error> {
195+
self.startup.send(())?;
196+
self.shutdown.recv().await;
197+
info!("shutdown signal received");
198+
while let Some(res) = self.handles.join_next().await {
199+
match res {
200+
Ok(Ok(())) => continue,
201+
Ok(Err(err)) => return Err(err),
202+
Err(err) => return Err(Error::Child(err)),
203+
}
204+
}
205+
Ok(())
206+
}
207+
}
208+
209+
#[derive(Debug)]
210+
struct Child {
211+
path: PathBuf,
212+
throttle: Throttle,
213+
block_cache: block::Cache,
214+
metric_labels: Vec<(String, String)>,
215+
shutdown: lading_signal::Watcher,
216+
}
217+
218+
impl Child {
219+
async fn spin(mut self, mut startup_receiver: Receiver<()>) -> Result<(), Error> {
220+
startup_receiver.recv().await?;
162221
debug!("UnixStream generator running");
163222

164223
// Move the block_cache into an OS thread, exposing a channel between it

0 commit comments

Comments
 (0)