Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ jobs:
fail-fast: false
matrix:
os: [ubuntu-latest, macos-latest]
tool: [check, fmt, clippy]
include:
- tool: check
protobuf: true
Expand All @@ -24,7 +25,7 @@ jobs:
protobuf: true
fuse: true
components: "rustfmt"
command: cargo fmt --all -- --check --all-features
command: cargo fmt --all -- --check
- tool: clippy
protobuf: true
fuse: true
Expand Down
13 changes: 6 additions & 7 deletions integration/ducks/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,29 +17,28 @@ use anyhow::Context;
use bytes::Bytes;
use bytes::BytesMut;
use http_body_util::Full;
use http_body_util::{combinators::BoxBody, BodyExt};
use hyper::{service::service_fn, Method, Request, StatusCode};
use http_body_util::{BodyExt, combinators::BoxBody};
use hyper::{Method, Request, StatusCode, service::service_fn};
use hyper_util::rt::TokioExecutor;
use hyper_util::rt::TokioIo;
use hyper_util::server::conn::auto;
use once_cell::sync::OnceCell;
use shared::{
DucksConfig,
integration_api::{
self,
self, Empty, HttpMetrics, ListenInfo, LogMessage, Metrics, SocketMetrics, TestConfig,
integration_target_server::{IntegrationTarget, IntegrationTargetServer},
Empty, HttpMetrics, ListenInfo, LogMessage, Metrics, SocketMetrics, TestConfig,
},
DucksConfig,
};
use sketches_ddsketch::DDSketch;
use std::{collections::HashMap, net::SocketAddr, pin::Pin, sync::Arc, time::Duration};
use tokio::task::JoinSet;
use tokio::{
io::AsyncReadExt,
net::{TcpListener, TcpStream, UdpSocket, UnixListener},
sync::{mpsc, Mutex},
sync::{Mutex, mpsc},
};
use tokio_stream::{wrappers::UnixListenerStream, Stream};
use tokio_stream::{Stream, wrappers::UnixListenerStream};
use tonic::Status;
use tracing::error;
use tracing::{debug, trace, warn};
Expand Down
2 changes: 1 addition & 1 deletion integration/sheepdog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ use std::{
use anyhow::Context;
use hyper_util::rt::TokioIo;
use shared::{
integration_api::{self, integration_target_client::IntegrationTargetClient},
DucksConfig,
integration_api::{self, integration_target_client::IntegrationTargetClient},
};
use tempfile::TempDir;
use tokio::{net::UnixStream, process::Command};
Expand Down
6 changes: 3 additions & 3 deletions lading/src/bin/captool.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
use std::collections::{hash_map::RandomState, BTreeSet, HashMap};
use std::collections::{BTreeSet, HashMap, hash_map::RandomState};
use std::ffi::OsStr;
use std::hash::BuildHasher;
use std::hash::Hasher;

use async_compression::tokio::bufread::ZstdDecoder;
use average::{concatenate, Estimate, Max, Min, Variance};
use average::{Estimate, Max, Min, Variance, concatenate};
use clap::Parser;
use futures::io;
use lading_capture::json::{Line, MetricKind};
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio_stream::wrappers::LinesStream;
use tokio_stream::StreamExt;
use tokio_stream::wrappers::LinesStream;
use tracing::{error, info};
use tracing_subscriber::{fmt::format::FmtSpan, util::SubscriberInitExt};

Expand Down
8 changes: 4 additions & 4 deletions lading/src/bin/lading.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,17 @@ use lading::{
use metrics::gauge;
use metrics_exporter_prometheus::PrometheusBuilder;
use once_cell::sync::Lazy;
use rand::{rngs::StdRng, SeedableRng};
use rand::{SeedableRng, rngs::StdRng};
use regex::Regex;
use rustc_hash::FxHashMap;
use tokio::{
runtime::Builder,
signal,
sync::broadcast,
time::{self, sleep, Duration},
time::{self, Duration, sleep},
};
use tracing::{debug, error, info, info_span, warn, Instrument};
use tracing_subscriber::{util::SubscriberInitExt, EnvFilter};
use tracing::{Instrument, debug, error, info, info_span, warn};
use tracing_subscriber::{EnvFilter, util::SubscriberInitExt};

#[derive(thiserror::Error, Debug)]
enum Error {
Expand Down
6 changes: 4 additions & 2 deletions lading/src/bin/payloadtool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{io, num::NonZeroU32};
use clap::Parser;
use lading::generator::http::Method;
use lading_payload::block;
use rand::{rngs::StdRng, SeedableRng};
use rand::{SeedableRng, rngs::StdRng};
use tracing::{debug, error, info, warn};
use tracing_subscriber::{fmt::format::FmtSpan, util::SubscriberInitExt};

Expand Down Expand Up @@ -64,7 +64,9 @@ fn generate_and_check(
let total_generated_bytes_str = total_generated_bytes
.get_appropriate_unit(false)
.to_string();
warn!("Generator failed to generate {total_requested_bytes_str}, instead only found {total_generated_bytes_str} of data")
warn!(
"Generator failed to generate {total_requested_bytes_str}, instead only found {total_generated_bytes_str} of data"
)
}

Ok(())
Expand Down
1 change: 0 additions & 1 deletion lading/src/blackhole/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ where
Error = hyper::Error,
> + Send
+ 'static,

S::Future: Send + 'static,
{
let listener = TcpListener::bind(addr).await.map_err(Error::Io)?;
Expand Down
6 changes: 3 additions & 3 deletions lading/src/blackhole/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
//!

use bytes::Bytes;
use http::{header::InvalidHeaderValue, status::InvalidStatusCode, HeaderMap};
use http_body_util::{combinators::BoxBody, BodyExt};
use hyper::{header, Request, Response, StatusCode};
use http::{HeaderMap, header::InvalidHeaderValue, status::InvalidStatusCode};
use http_body_util::{BodyExt, combinators::BoxBody};
use hyper::{Request, Response, StatusCode, header};
use metrics::counter;
use serde::{Deserialize, Serialize};
use std::{net::SocketAddr, time::Duration};
Expand Down
4 changes: 2 additions & 2 deletions lading/src/blackhole/splunk_hec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ use std::{
};

use bytes::Bytes;
use http_body_util::{combinators::BoxBody, BodyExt};
use hyper::{header, Method, Request, Response, StatusCode};
use http_body_util::{BodyExt, combinators::BoxBody};
use hyper::{Method, Request, Response, StatusCode, header};
use metrics::counter;
use rustc_hash::FxHashMap;
use serde::{Deserialize, Serialize};
Expand Down
2 changes: 1 addition & 1 deletion lading/src/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ pub(crate) fn decode(
.body(crate::full(format!(
"Unsupported encoding type: {encoding}"
)))
.expect("failed to build response"))
.expect("failed to build response"));
}
}
}
Expand Down
4 changes: 3 additions & 1 deletion lading/src/generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,9 @@ impl Server {
Inner::UnixStream(conf) => {
if let lading_payload::Config::DogStatsD(variant) = conf.variant {
if !variant.length_prefix_framed {
warn!("Dogstatsd stream requires length prefix framing. You likely want to add `length_prefix_framed: true` to your payload config.");
warn!(
"Dogstatsd stream requires length prefix framing. You likely want to add `length_prefix_framed: true` to your payload config."
);
}
}

Expand Down
6 changes: 3 additions & 3 deletions lading/src/generator/file_gen/logrotate_fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@

use crate::generator;
use fuser::{
spawn_mount2, BackgroundSession, FileAttr, Filesystem, MountOption, ReplyAttr, ReplyData,
ReplyDirectory, ReplyEntry, Request,
BackgroundSession, FileAttr, Filesystem, MountOption, ReplyAttr, ReplyData, ReplyDirectory,
ReplyEntry, Request, spawn_mount2,
};
use lading_payload::block;
use metrics::counter;
use nix::libc::{self, ENOENT};
use rand::{rngs::SmallRng, SeedableRng};
use rand::{SeedableRng, rngs::SmallRng};
use serde::{Deserialize, Serialize};
use std::{
collections::HashMap,
Expand Down
4 changes: 2 additions & 2 deletions lading/src/generator/file_gen/traditional.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ use std::{
num::NonZeroU32,
path::PathBuf,
sync::{
atomic::{AtomicU32, Ordering},
Arc,
atomic::{AtomicU32, Ordering},
},
thread,
};
Expand All @@ -27,7 +27,7 @@ use byte_unit::{Byte, ByteError};
use futures::future::join_all;
use lading_throttle::Throttle;
use metrics::{counter, gauge};
use rand::{prelude::StdRng, SeedableRng};
use rand::{SeedableRng, prelude::StdRng};
use serde::{Deserialize, Serialize};
use tokio::{
fs,
Expand Down
6 changes: 3 additions & 3 deletions lading/src/generator/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,16 @@ use std::{convert::TryFrom, num::NonZeroU32, thread, time::Duration};

use byte_unit::ByteError;
use bytes::{Buf, BufMut, Bytes};
use http::{uri::PathAndQuery, Uri};
use http::{Uri, uri::PathAndQuery};
use lading_throttle::Throttle;
use metrics::{counter, gauge};
use rand::rngs::StdRng;
use rand::SeedableRng;
use rand::rngs::StdRng;
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;
use tonic::{
codec::{DecodeBuf, Decoder, EncodeBuf, Encoder},
Request, Response, Status,
codec::{DecodeBuf, Decoder, EncodeBuf, Encoder},
};
use tracing::{debug, info};

Expand Down
6 changes: 3 additions & 3 deletions lading/src/generator/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@
use std::{num::NonZeroU32, thread};

use byte_unit::ByteError;
use hyper::{header::CONTENT_LENGTH, HeaderMap, Request, Uri};
use hyper::{HeaderMap, Request, Uri, header::CONTENT_LENGTH};
use hyper_util::{client::legacy::Client, rt::TokioExecutor};
use lading_throttle::Throttle;
use metrics::{counter, gauge};
use once_cell::sync::OnceCell;
use rand::{prelude::StdRng, SeedableRng};
use rand::{SeedableRng, prelude::StdRng};
use serde::{Deserialize, Serialize};
use tokio::sync::{mpsc, Semaphore};
use tokio::sync::{Semaphore, mpsc};
use tracing::info;

use crate::common::PeekableReceiver;
Expand Down
2 changes: 1 addition & 1 deletion lading/src/generator/passthru_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use tokio::io::AsyncWriteExt;
use byte_unit::ByteError;
use lading_throttle::Throttle;
use metrics::{counter, gauge};
use rand::{rngs::StdRng, SeedableRng};
use rand::{SeedableRng, rngs::StdRng};
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;
use tracing::{info, warn};
Expand Down
3 changes: 2 additions & 1 deletion lading/src/generator/process_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,8 @@ fn try_wait_pid(pids: &mut FxHashSet<Pid>) {
let mut exited: Option<Pid> = None;

for pid in pids.iter() {
if let Ok(WaitStatus::StillAlive) = waitpid(*pid, Some(WaitPidFlag::WNOHANG)) {} else {
if let Ok(WaitStatus::StillAlive) = waitpid(*pid, Some(WaitPidFlag::WNOHANG)) {
} else {
exited = Some(*pid);
break;
}
Expand Down
2 changes: 1 addition & 1 deletion lading/src/generator/procfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ use std::str::FromStr;
use std::{fs::File, io::Write, num::NonZeroU32, path::PathBuf};

use lading_payload::procfs;
use rand::rngs::StdRng;
use rand::SeedableRng;
use rand::rngs::StdRng;
use serde::{Deserialize, Serialize};

#[derive(::thiserror::Error, Debug)]
Expand Down
8 changes: 4 additions & 4 deletions lading/src/generator/splunk_hec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,22 @@ use std::{future::ready, num::NonZeroU32, thread, time::Duration};
use acknowledgements::Channels;
use byte_unit::ByteError;
use http::{
header::{AUTHORIZATION, CONTENT_LENGTH},
Method, Request, Uri,
header::{AUTHORIZATION, CONTENT_LENGTH},
};
use http_body_util::BodyExt;
use hyper::body::Body;
use hyper_util::{
client::legacy::{connect::HttpConnector, Client},
client::legacy::{Client, connect::HttpConnector},
rt::TokioExecutor,
};
use lading_throttle::Throttle;
use metrics::{counter, gauge};
use once_cell::sync::OnceCell;
use rand::{prelude::StdRng, SeedableRng};
use rand::{SeedableRng, prelude::StdRng};
use serde::{Deserialize, Serialize};
use tokio::{
sync::{mpsc, Semaphore, SemaphorePermit},
sync::{Semaphore, SemaphorePermit, mpsc},
time::timeout,
};
use tracing::info;
Expand Down
2 changes: 1 addition & 1 deletion lading/src/generator/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::{
use byte_unit::ByteError;
use lading_throttle::Throttle;
use metrics::{counter, gauge};
use rand::{rngs::StdRng, SeedableRng};
use rand::{SeedableRng, rngs::StdRng};
use serde::{Deserialize, Serialize};
use tokio::{io::AsyncWriteExt, net::TcpStream, sync::mpsc};
use tracing::{info, trace};
Expand Down
2 changes: 1 addition & 1 deletion lading/src/generator/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::{
use byte_unit::{Byte, ByteError, ByteUnit};
use lading_throttle::Throttle;
use metrics::{counter, gauge};
use rand::{rngs::StdRng, SeedableRng};
use rand::{SeedableRng, rngs::StdRng};
use serde::{Deserialize, Serialize};
use tokio::{net::UdpSocket, sync::mpsc};
use tracing::{debug, info, trace};
Expand Down
2 changes: 1 addition & 1 deletion lading/src/generator/unix_datagram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use futures::future::join_all;
use lading_payload::block::{self, Block};
use lading_throttle::Throttle;
use metrics::{counter, gauge};
use rand::{rngs::StdRng, SeedableRng};
use rand::{SeedableRng, rngs::StdRng};
use serde::{Deserialize, Serialize};
use std::{num::NonZeroU32, path::PathBuf, thread};
use tokio::{
Expand Down
2 changes: 1 addition & 1 deletion lading/src/generator/unix_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use byte_unit::ByteError;
use lading_payload::block::{self, Block};
use lading_throttle::Throttle;
use metrics::{counter, gauge};
use rand::{rngs::StdRng, SeedableRng};
use rand::{SeedableRng, rngs::StdRng};
use serde::{Deserialize, Serialize};
use std::{num::NonZeroU32, path::PathBuf, thread};
use tokio::{net, sync::mpsc, task::JoinError};
Expand Down
4 changes: 2 additions & 2 deletions lading/src/inspector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::{

use nix::{
errno::Errno,
sys::signal::{kill, SIGTERM},
sys::signal::{SIGTERM, kill},
unistd::Pid,
};
use rustc_hash::FxHashMap;
Expand All @@ -25,7 +25,7 @@ use tokio::process::Command;
use tracing::{error, info};

use crate::{
common::{stdio, Output},
common::{Output, stdio},
target::TargetPidReceiver,
};

Expand Down
Loading
Loading