Skip to content

Commit 81b6cfc

Browse files
committed
add otlp upload to runner
1 parent dabe035 commit 81b6cfc

3 files changed

Lines changed: 135 additions & 28 deletions

File tree

runners/s3-benchrunner-rust/Cargo.lock

Lines changed: 27 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

runners/s3-benchrunner-rust/Cargo.toml

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@ edition = "2021"
66
[dependencies]
77

88
# Swap which line is commented-out to use GitHub or local aws-s3-transfer-manager
9-
#aws-s3-transfer-manager = { git = "https://github.com/awslabs/aws-s3-transfer-manager-rs.git", rev = "06c087a5d53676bb048f6c512b8eb1fda63f03d5" }
10-
aws-s3-transfer-manager = { path = "../../../aws-s3-transfer-manager-rs/aws-s3-transfer-manager" }
9+
aws-s3-transfer-manager = { git = "https://github.com/awslabs/aws-s3-transfer-manager-rs.git", rev = "06c087a5d53676bb048f6c512b8eb1fda63f03d5" }
10+
#aws-s3-transfer-manager = { path = "../../../aws-s3-transfer-manager-rs/aws-s3-transfer-manager" }
1111

1212
tracing-opentelemetry = "0.27"
1313
opentelemetry = { version = "0.26", features = ["trace"] }
@@ -16,15 +16,17 @@ opentelemetry_sdk = { version = "0.26", default-features = false, features = [
1616
"rt-tokio",
1717
] }
1818
opentelemetry-proto = "0.26"
19+
opentelemetry-otlp = "0.26"
1920
opentelemetry-stdout = { version = "0.26", features = ["trace"] }
2021
opentelemetry-semantic-conventions = "0.26"
22+
tonic = "0.12.3"
2123

2224
anyhow = "1.0.86"
2325
async-trait = "0.1.81"
2426
aws-config = "1.5.11"
2527
bytes = "1"
2628
chrono = "0.4.38"
27-
clap = { version = "4.5.9", features = ["derive"] }
29+
clap = { version = "4.5.23", features = ["derive"] }
2830
fastrand = "=2.1.0"
2931
futures-util = "0.3"
3032
ordered-float = "4.3.0"

runners/s3-benchrunner-rust/src/main.rs

Lines changed: 103 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,44 @@
1-
use clap::{Parser, ValueEnum};
2-
use std::process::exit;
1+
use clap::{Parser, Subcommand, ValueEnum};
2+
use std::fs::File;
33
use std::time::Instant;
4+
use std::{path::Path, process::exit};
45
use tracing::{info_span, Instrument};
56

7+
use opentelemetry_proto::tonic::{
8+
collector::trace::v1::{trace_service_client::TraceServiceClient, ExportTraceServiceRequest},
9+
trace::v1::TracesData,
10+
};
11+
use tonic::transport::Channel;
12+
613
use s3_benchrunner_rust::{
714
bytes_to_gigabits, prepare_run, telemetry, BenchmarkConfig, Result, RunBenchmark,
815
SkipBenchmarkError, TransferManagerRunner,
916
};
17+
18+
#[derive(Parser, Debug)]
19+
struct SimpleCli {
20+
#[command(flatten)]
21+
run_args: RunArgs,
22+
}
23+
1024
#[derive(Parser, Debug)]
11-
#[command()]
12-
struct Args {
25+
struct ExtendedCli {
26+
#[command(subcommand)]
27+
command: Command,
28+
#[command(flatten)]
29+
run_args: Option<RunArgs>,
30+
}
31+
32+
#[derive(Subcommand, Debug)]
33+
enum Command {
34+
RunBenchmark(RunArgs),
35+
UploadOtlp(UploadOtlpArgs),
36+
}
37+
38+
#[derive(Debug, clap::Args)]
39+
#[command(args_conflicts_with_subcommands = true)]
40+
#[command(flatten_help = true)]
41+
struct RunArgs {
1342
#[arg(value_enum, help = "ID of S3 library to use")]
1443
s3_client: S3ClientId,
1544
#[arg(help = "Path to workload file (e.g. download-1GiB.run.json)")]
@@ -29,6 +58,17 @@ struct Args {
2958
disable_directory: bool,
3059
}
3160

61+
#[derive(Debug, clap::Args)]
62+
#[command(flatten_help = true)]
63+
struct UploadOtlpArgs {
64+
/// OLTP endpoint to export data to
65+
#[arg(long, default_value = "http://localhost:4317")]
66+
oltp_endpoint: String,
67+
68+
/// Path to the trace file (in opentelemetry-proto JSON format) to upload
69+
json_file: String,
70+
}
71+
3272
#[derive(ValueEnum, Clone, Debug)]
3373
enum S3ClientId {
3474
#[clap(name = "sdk-rust-tm", help = "use aws-s3-transfer-manager crate")]
@@ -39,24 +79,36 @@ enum S3ClientId {
3979
}
4080

4181
#[tokio::main]
42-
async fn main() {
43-
let args = Args::parse();
44-
45-
let result = execute(&args).await;
46-
if let Err(e) = result {
47-
match e.downcast_ref::<SkipBenchmarkError>() {
48-
None => {
49-
panic!("{e:?}");
50-
}
51-
Some(msg) => {
52-
eprintln!("Skipping benchmark - {msg}");
53-
exit(123);
82+
async fn main() -> Result<()> {
83+
let command = SimpleCli::try_parse()
84+
.map(|cli| Command::RunBenchmark(cli.run_args))
85+
.unwrap_or_else(|_| ExtendedCli::parse().command);
86+
87+
// let cli = Cli::parse();
88+
// let command = cli.command.unwrap_or(Command::RunBenchmark(cli.run_args));
89+
90+
match command {
91+
Command::RunBenchmark(args) => {
92+
let result = execute(&args).await;
93+
if let Err(e) = result {
94+
match e.downcast_ref::<SkipBenchmarkError>() {
95+
None => {
96+
panic!("{e:?}");
97+
}
98+
Some(msg) => {
99+
eprintln!("Skipping benchmark - {msg}");
100+
exit(123);
101+
}
102+
}
54103
}
55104
}
105+
Command::UploadOtlp(args) => upload_otlp(args).await?,
56106
}
107+
108+
Ok(())
57109
}
58110

59-
async fn execute(args: &Args) -> Result<()> {
111+
async fn execute(args: &RunArgs) -> Result<()> {
60112
let mut telemetry = if args.telemetry {
61113
// If emitting telemetry, set that up as tracing_subscriber.
62114
Some(telemetry::init_tracing_subscriber().unwrap())
@@ -119,7 +171,7 @@ async fn execute(args: &Args) -> Result<()> {
119171
Ok(())
120172
}
121173

122-
async fn new_runner(args: &Args) -> Result<Box<dyn RunBenchmark>> {
174+
async fn new_runner(args: &RunArgs) -> Result<Box<dyn RunBenchmark>> {
123175
let config = BenchmarkConfig::new(
124176
&args.workload,
125177
&args.bucket,
@@ -150,3 +202,36 @@ fn trace_file_name(
150202
let run_start = run_start.format("%Y%m%dT%H%M%SZ").to_string();
151203
format!("trace_{run_start}_{workload}_run{run_num:02}.json")
152204
}
205+
206+
async fn upload_otlp(args: UploadOtlpArgs) -> Result<()> {
207+
let path = Path::new(&args.json_file);
208+
let f = File::open(path)?;
209+
let trace_data = read_spans_from_json(f)?;
210+
println!("loaded {} spans", trace_data.resource_spans.len());
211+
212+
let endpoint = Channel::from_shared(args.oltp_endpoint)?;
213+
let channel = endpoint.connect_lazy();
214+
let mut client = TraceServiceClient::new(channel);
215+
216+
let requests: Vec<_> = trace_data
217+
.resource_spans
218+
.chunks(4_096)
219+
.map(|batch| ExportTraceServiceRequest {
220+
resource_spans: batch.to_vec(),
221+
})
222+
.collect();
223+
224+
for request in requests {
225+
let resp = client.export(request).await?;
226+
println!("export response: {:?}", resp);
227+
}
228+
229+
Ok(())
230+
}
231+
232+
// read a file contains ResourceSpans in json format
233+
pub fn read_spans_from_json(file: File) -> Result<TracesData> {
234+
let reader = std::io::BufReader::new(file);
235+
let trace_data: TracesData = serde_json::from_reader(reader)?;
236+
Ok(trace_data)
237+
}

0 commit comments

Comments
 (0)