Skip to content

Commit 48f9ea8

Browse files
Julian Priestleymeta-codesync[bot]
authored andcommitted
Exclude backfill request types from the general worker
Summary: **Context:** We want backfills to be handled by a separate set of async request workers, to better allow controlling scaling, execution concurrency etc. We'll use the same framework, the same request queue etc, but have one set of workers handling the backfill requests, and one handling the remaining requests. **This Diff:** Exclude backfill types from the 'general' async requests workers. Differential Revision: D100604598 fbshipit-source-id: c7978009bf45c8016d0fead04d998930a163cd65
1 parent 85f8e82 commit 48f9ea8

3 files changed

Lines changed: 19 additions & 2 deletions

File tree

eden/mononoke/features/async_requests/worker/BUCK

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ rust_binary(
2222
"//eden/mononoke/features/async_requests:async_requests",
2323
"//eden/mononoke/features/async_requests:async_requests_client",
2424
"//eden/mononoke/features/async_requests:requests_table",
25+
"//eden/mononoke/features/async_requests/types:async_requests_types",
2526
"//eden/mononoke/features/async_requests/worker_lib:worker_lib",
2627
"//eden/mononoke/megarepo_api:megarepo_api",
2728
"//eden/mononoke/metaconfig:metaconfig_types",

eden/mononoke/features/async_requests/worker/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ anyhow = "1.0.102"
1212
async-trait = "0.1.86"
1313
async_requests = { version = "0.1.0", path = ".." }
1414
async_requests_client = { version = "0.1.0", path = "../client" }
15+
async_requests_types = { version = "0.1.0", path = "../types" }
1516
blobstore = { version = "0.1.0", path = "../../../blobstore" }
1617
clap = { version = "4.6.0", features = ["derive", "env", "string", "unicode", "wrap_help"] }
1718
cmdlib_logging = { version = "0.1.0", path = "../../../cmdlib/log" }

eden/mononoke/features/async_requests/worker/src/main.rs

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,10 @@ use anyhow::Context;
1313
use anyhow::Result;
1414
use async_requests::AsyncMethodRequestQueue;
1515
use async_requests::QueueRepoFilter;
16+
use async_requests::QueueRequestTypeFilter;
1617
use async_requests_client::open_blobstore;
1718
use async_requests_client::open_sql_connection;
19+
use async_requests_types::BACKFILL_REQUEST_TYPES;
1820
use async_trait::async_trait;
1921
use blobstore::Blobstore;
2022
use clap::Parser;
@@ -49,6 +51,17 @@ use worker_lib::worker::AsyncMethodRequestWorker;
4951

5052
const SERVICE_NAME: &str = "async_requests_worker";
5153

54+
/// Build a QueueRequestTypeFilter that excludes backfill request types.
55+
/// Backfill requests are handled by the dedicated backfill_worker.
56+
fn backfill_exclude_filter() -> QueueRequestTypeFilter {
57+
QueueRequestTypeFilter::Except(
58+
BACKFILL_REQUEST_TYPES
59+
.iter()
60+
.map(|s| requests_table::RequestType(s.to_string()))
61+
.collect(),
62+
)
63+
}
64+
5265
const SM_CLEANUP_TIMEOUT_SECS: u64 = 60;
5366

5467
/// Processes the megarepo async requests
@@ -122,10 +135,11 @@ impl RepoShardedProcess for WorkerProcess {
122135
let repos = vec![repo.repo_identity.id()];
123136
info!("Completed setup for repo {} ({:?})", repo_name, repos);
124137

125-
let queue = Arc::new(AsyncMethodRequestQueue::new(
138+
let queue = Arc::new(AsyncMethodRequestQueue::new_with_request_type_filter(
126139
self.sql_connection.clone(),
127140
self.blobstore.clone(),
128141
QueueRepoFilter::Only(repos),
142+
backfill_exclude_filter(),
129143
));
130144

131145
let executor = AsyncMethodRequestWorker::new(
@@ -296,10 +310,11 @@ fn run_worker_queue(
296310
will_exit: Arc<AtomicBool>,
297311
) -> Result<()> {
298312
let executor = {
299-
let queue = Arc::new(AsyncMethodRequestQueue::new(
313+
let queue = Arc::new(AsyncMethodRequestQueue::new_with_request_type_filter(
300314
sql_connection,
301315
blobstore,
302316
repo_filter,
317+
backfill_exclude_filter(),
303318
));
304319

305320
runtime.block_on(AsyncMethodRequestWorker::new(

0 commit comments

Comments
 (0)