Skip to content

Commit 840e862

Browse files
committed
add http post build event uploader #nonprod
this is an alternative to using the manifold client to upload post-build events the current path is, if fbcode is enabled buck2 uses a ManifoldClient (manfold is Meta's internal s3 blob storage) with a hardcoded fbinternal address. this adds a config to .buckconfig that enables a http client to put the event blob to a configurable URL, instead of the hardcoded facebook url using their blob storages url storage schema this http uploader does not support chunked uploads, while the manifold client does support that. these event blobs seem to be a few mb for large builds so the oss path might not have as much of a need for chunked uploads as meta, keeping a simple upload is likely a feature we want for easy out-of-the-box integration with metrics collection cleanup use http client in addition to manifold client keep backwards support for 'should_upload_logs' update comment
1 parent 82e2cfa commit 840e862

9 files changed

Lines changed: 122 additions & 8 deletions

File tree

app/buck2_client_ctx/src/streaming.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,7 @@ fn get_event_log_subscriber<T: StreamingCommand>(
300300
let user_event_log = cmd.user_event_log();
301301

302302
let logdir = paths.log_dir();
303+
let daemon_startup_config = ctx.immediate_config.daemon_startup_config().ok();
303304
let log = EventLog::new(
304305
logdir,
305306
ctx.working_dir.clone(),
@@ -312,10 +313,10 @@ fn get_event_log_subscriber<T: StreamingCommand>(
312313
T::COMMAND_NAME.to_owned(),
313314
ctx.start_time,
314315
log_size_counter_bytes,
315-
ctx.immediate_config
316-
.daemon_startup_config()
317-
.map(|daemon_startup_config| daemon_startup_config.retained_event_logs)
316+
daemon_startup_config
317+
.map(|c| c.retained_event_logs)
318318
.unwrap_or(DEFAULT_RETAINED_EVENT_LOGS),
319+
daemon_startup_config.and_then(|c| c.log_upload_url.clone()),
319320
);
320321
Box::new(log)
321322
}

app/buck2_client_ctx/src/subscribers/event_log.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ impl EventLog {
4040
start_time: SystemTime,
4141
log_size_counter_bytes: Option<Arc<AtomicU64>>,
4242
retained_event_logs: usize,
43+
log_upload_url: Option<String>,
4344
) -> EventLog {
4445
Self {
4546
writer: WriteEventLog::new(
@@ -52,6 +53,7 @@ impl EventLog {
5253
start_time,
5354
log_size_counter_bytes,
5455
retained_event_logs,
56+
log_upload_url,
5557
),
5658
}
5759
}

app/buck2_client_ctx/src/subscribers/re_log.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ async fn log_upload_impl(
8282
session_id: String,
8383
isolation_dir: FileNameBuf,
8484
) -> buck2_error::Result<()> {
85-
if !should_upload_log()? {
85+
if buck2_core::is_open_source() || !should_upload_log()? {
8686
return Ok(());
8787
}
8888

app/buck2_cmd_debug_client/src/persist_event_logs.rs

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use buck2_common::chunk_reader::ChunkReader;
1818
use buck2_common::manifold;
1919
use buck2_common::manifold::ManifoldChunkedUploader;
2020
use buck2_common::manifold::ManifoldClient;
21+
use buck2_common::upload_client::HttpUploadClient;
2122
use buck2_core::soft_error;
2223
use buck2_data::InstantEvent;
2324
use buck2_data::PersistEventLogSubprocess;
@@ -63,6 +64,11 @@ pub struct PersistEventLogsCommand {
6364
local_path: String,
6465
#[clap(long, help = "If present, only write to disk and don't upload")]
6566
no_upload: bool,
67+
#[clap(
68+
long,
69+
help = "If present, also upload the log to this HTTP endpoint"
70+
)]
71+
log_upload_url: Option<String>,
6672
#[clap(
6773
long,
6874
help = "UUID of invocation that called this subcommand for logging purposes"
@@ -122,7 +128,7 @@ impl PersistEventLogsCommand {
122128
}
123129
};
124130
let write = write_task(&file, tx, stdin);
125-
let upload = upload_task(&file, rx, self.manifold_name, self.no_upload);
131+
let upload = upload_task(&file, rx, self.manifold_name, self.log_upload_url, self.no_upload);
126132

127133
// Wait for both tasks to finish. If the upload fails we want to keep writing to disk
128134
let (write_result, upload_result) = tokio::join!(write, upload);
@@ -175,6 +181,7 @@ async fn upload_task(
175181
file_mutex: &Mutex<File>,
176182
mut rx: tokio::sync::mpsc::UnboundedReceiver<u64>,
177183
manifold_name: String,
184+
log_upload_url: Option<String>,
178185
no_upload: bool,
179186
) -> buck2_error::Result<()> {
180187
if no_upload {
@@ -218,6 +225,21 @@ async fn upload_task(
218225
// Last chunk to upload is smaller than &reader
219226
uploader.upload_chunk().await?;
220227

228+
if let Some(url) = log_upload_url {
229+
let mut file = file_mutex.lock().await;
230+
file.seek(io::SeekFrom::Start(0))
231+
.await
232+
.buck_error_context("Failed to seek log file")?;
233+
let mut buf = Vec::new();
234+
file.read_to_end(&mut buf)
235+
.await
236+
.buck_error_context("Failed to read log file")?;
237+
drop(file);
238+
239+
let client = HttpUploadClient::new(url).await?;
240+
client.write(&manifold_name, buf.into()).await?;
241+
}
242+
221243
Ok(())
222244
}
223245

app/buck2_common/src/init.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -525,6 +525,7 @@ pub struct DaemonStartupConfig {
525525
pub log_download_method: LogDownloadMethod,
526526
pub health_check_config: HealthCheckConfig,
527527
pub retained_event_logs: usize,
528+
pub log_upload_url: Option<String>,
528529
pub macos_qos_class: Option<String>,
529530
}
530531

@@ -608,6 +609,12 @@ impl DaemonStartupConfig {
608609
})
609610
.and_then(|s| s.parse::<usize>().ok())
610611
.unwrap_or(DEFAULT_RETAINED_EVENT_LOGS),
612+
log_upload_url: config
613+
.get(BuckconfigKeyRef {
614+
section: "buck2",
615+
property: "log_upload_url",
616+
})
617+
.map(ToOwned::to_owned),
611618
macos_qos_class: {
612619
let from_config = config
613620
.get(BuckconfigKeyRef {
@@ -661,6 +668,7 @@ impl DaemonStartupConfig {
661668
},
662669
health_check_config: HealthCheckConfig::default(),
663670
retained_event_logs: DEFAULT_RETAINED_EVENT_LOGS,
671+
log_upload_url: None,
664672
macos_qos_class: None,
665673
}
666674
}

app/buck2_common/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,3 +59,4 @@ pub mod sqlite;
5959
pub mod starlark_profiler;
6060
pub mod target_aliases;
6161
pub mod temp_path;
62+
pub mod upload_client;
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Copyright (c) Meta Platforms, Inc. and affiliates.
3+
*
4+
* This source code is dual-licensed under either the MIT license found in the
5+
* LICENSE-MIT file in the root directory of this source tree or the Apache
6+
* License, Version 2.0 found in the LICENSE-APACHE file in the root directory
7+
* of this source tree. You may select, at your option, one of the
8+
* above-listed licenses.
9+
*/
10+
11+
use std::time::Duration;
12+
13+
use buck2_http::HttpClient;
14+
use buck2_http::HttpClientBuilder;
15+
use buck2_http::retries::HttpError;
16+
use buck2_http::retries::HttpErrorForRetry;
17+
use buck2_http::retries::IntoBuck2Error;
18+
use buck2_http::retries::http_retry;
19+
use bytes::Bytes;
20+
use futures::stream::BoxStream;
21+
use futures::stream::StreamExt;
22+
use hyper::Response;
23+
24+
#[derive(Debug, buck2_error::Error)]
25+
#[buck2(tag = Http)]
26+
enum HttpWriteError {
27+
#[error(transparent)]
28+
Client(HttpError),
29+
}
30+
31+
impl HttpErrorForRetry for HttpWriteError {
32+
fn is_retryable(&self) -> bool {
33+
match self {
34+
Self::Client(e) => e.is_retryable(),
35+
}
36+
}
37+
}
38+
39+
impl IntoBuck2Error for HttpWriteError {
40+
fn into_buck2_error(self) -> buck2_error::Error {
41+
buck2_error::Error::from(self)
42+
}
43+
}
44+
45+
pub struct HttpUploadClient {
46+
client: HttpClient,
47+
base_url: String,
48+
}
49+
50+
impl HttpUploadClient {
51+
pub async fn new(base_url: String) -> buck2_error::Result<Self> {
52+
let client = HttpClientBuilder::oss().await?.build();
53+
Ok(Self { client, base_url })
54+
}
55+
56+
pub async fn write(&self, path: &str, buf: Bytes) -> buck2_error::Result<()> {
57+
let url = format!("{}/{}", self.base_url, path);
58+
let res = http_retry(
59+
|| async {
60+
self.client
61+
.put(&url, buf.clone(), vec![])
62+
.await
63+
.map_err(|e| HttpWriteError::Client(HttpError::Client(e)))
64+
},
65+
vec![Duration::from_secs(1), Duration::from_secs(2)],
66+
)
67+
.await?;
68+
consume_response(res).await;
69+
Ok(())
70+
}
71+
}
72+
73+
async fn consume_response<'a>(mut res: Response<BoxStream<'a, hyper::Result<Bytes>>>) {
74+
while let Some(_chunk) = res.body_mut().next().await {}
75+
}

app/buck2_event_log/src/lib.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,6 @@ pub mod write;
3131
pub mod writer;
3232

3333
pub fn should_upload_log() -> buck2_error::Result<bool> {
34-
if buck2_core::is_open_source() {
35-
return Ok(false);
36-
}
3734
Ok(!buck2_env!(
3835
"BUCK2_TEST_DISABLE_LOG_UPLOAD",
3936
bool,

app/buck2_event_log/src/write.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ pub struct WriteEventLog {
6464
buf: Vec<u8>,
6565
log_size_counter_bytes: Option<Arc<AtomicU64>>,
6666
retained_event_logs: usize,
67+
log_upload_url: Option<String>,
6768
}
6869

6970
impl WriteEventLog {
@@ -77,6 +78,7 @@ impl WriteEventLog {
7778
start_time: SystemTime,
7879
log_size_counter_bytes: Option<Arc<AtomicU64>>,
7980
retained_event_logs: usize,
81+
log_upload_url: Option<String>,
8082
) -> Self {
8183
Self {
8284
state: LogWriterState::Unopened {
@@ -91,6 +93,7 @@ impl WriteEventLog {
9193
buf: Vec::new(),
9294
log_size_counter_bytes,
9395
retained_event_logs,
96+
log_upload_url,
9497
}
9598
}
9699

@@ -180,6 +183,7 @@ impl WriteEventLog {
180183
path,
181184
event.trace_id()?.clone(),
182185
self.log_size_counter_bytes.clone(),
186+
self.log_upload_url.clone(),
183187
)
184188
.await?;
185189
let mut writers = vec![writer];
@@ -255,6 +259,7 @@ async fn start_persist_event_log_subprocess(
255259
path: EventLogPathBuf,
256260
trace_id: TraceId,
257261
bytes_written: Option<Arc<AtomicU64>>,
262+
log_upload_url: Option<String>,
258263
) -> buck2_error::Result<NamedEventLogWriter> {
259264
let current_exe = std::env::current_exe().buck_error_context("No current_exe")?;
260265
let mut command = buck2_util::process::async_background_command(current_exe);
@@ -273,6 +278,8 @@ async fn start_persist_event_log_subprocess(
273278
.args(["--trace-id", &trace_id.to_string()]);
274279
if !should_upload_log()? {
275280
command.arg("--no-upload");
281+
} else if let Some(url) = log_upload_url {
282+
command.args(["--log-upload-url", &url]);
276283
};
277284
command.stdout(Stdio::null()).stdin(Stdio::piped());
278285

@@ -483,6 +490,7 @@ mod tests {
483490
log_size_counter_bytes: None,
484491
start_time: SystemTime::UNIX_EPOCH,
485492
retained_event_logs: 5,
493+
log_upload_url: None,
486494
})
487495
}
488496
}

0 commit comments

Comments
 (0)