Skip to content

Commit b15154c

Browse files
Implement Operations server (TraceMachina#2233)
1 parent 373e1b5 commit b15154c

3 files changed

Lines changed: 424 additions & 26 deletions

File tree

nativelink-service/src/execution_server.rs

Lines changed: 112 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -30,7 +30,11 @@ use nativelink_proto::build::bazel::remote::execution::v2::execution_server::{
3030
use nativelink_proto::build::bazel::remote::execution::v2::{
3131
Action, Command, ExecuteRequest, WaitExecutionRequest,
3232
};
33-
use nativelink_proto::google::longrunning::Operation;
33+
use nativelink_proto::google::longrunning::operations_server::{Operations, OperationsServer};
34+
use nativelink_proto::google::longrunning::{
35+
CancelOperationRequest, DeleteOperationRequest, GetOperationRequest, ListOperationsRequest,
36+
ListOperationsResponse, Operation, WaitOperationRequest,
37+
};
3438
use nativelink_store::ac_utils::get_and_decode_digest;
3539
use nativelink_store::store_manager::StoreManager;
3640
use nativelink_util::action_messages::{
@@ -78,6 +82,7 @@ impl fmt::Display for NativelinkOperationId {
7882
}
7983
}
8084

85+
#[derive(Clone)]
8186
struct InstanceInfo {
8287
scheduler: Arc<dyn ClientStateManager>,
8388
cas_store: Store,
@@ -161,7 +166,7 @@ impl InstanceInfo {
161166
}
162167
}
163168

164-
#[derive(Debug)]
169+
/// Server state shared by the gRPC service implementations in this file.
///
/// Holds one `InstanceInfo` per `InstanceName`. `Operations` is implemented
/// directly on this type.
#[derive(Debug, Clone)]
165170
pub struct ExecutionServer {
166171
instance_infos: HashMap<InstanceName, InstanceInfo>,
167172
}
@@ -204,6 +209,10 @@ impl ExecutionServer {
204209
Server::new(self)
205210
}
206211

212+
/// Consumes this server and wraps it in an `OperationsServer`, so that the
/// `Operations` implementation on `ExecutionServer` can be served.
pub fn into_operations_service(self) -> OperationsServer<Self> {
213+
OperationsServer::new(self)
214+
}
215+
207216
fn to_execute_stream(
208217
nl_client_operation_id: &NativelinkOperationId,
209218
action_listener: Box<dyn ActionStateResult>,
@@ -375,6 +384,107 @@ impl Execution for ExecutionServer {
375384
}
376385
}
377386

387+
#[tonic::async_trait]
388+
impl Operations for ExecutionServer {
389+
async fn list_operations(
390+
&self,
391+
_request: Request<ListOperationsRequest>,
392+
) -> Result<Response<ListOperationsResponse>, Status> {
393+
Err(Status::unimplemented("list_operations not implemented"))
394+
}
395+
396+
async fn delete_operation(
397+
&self,
398+
_request: Request<DeleteOperationRequest>,
399+
) -> Result<Response<()>, Status> {
400+
Err(Status::unimplemented("delete_operation not implemented"))
401+
}
402+
403+
async fn cancel_operation(
404+
&self,
405+
_request: Request<CancelOperationRequest>,
406+
) -> Result<Response<()>, Status> {
407+
Err(Status::unimplemented("cancel_operation not implemented"))
408+
}
409+
410+
async fn get_operation(
411+
&self,
412+
request: Request<GetOperationRequest>,
413+
) -> Result<Response<Operation>, Status> {
414+
let inner_request = request.into_inner();
415+
416+
let mut stream = Box::pin(
417+
self.inner_wait_execution(WaitExecutionRequest {
418+
name: inner_request.name,
419+
})
420+
.await?,
421+
);
422+
423+
let operation = stream
424+
.next()
425+
.await
426+
.ok_or_else(|| Status::not_found("Operation not found"))??;
427+
428+
Ok(Response::new(operation))
429+
}
430+
431+
async fn wait_operation(
432+
&self,
433+
request: Request<WaitOperationRequest>,
434+
) -> Result<Response<Operation>, Status> {
435+
let inner_request = request.into_inner();
436+
let timeout_opt = inner_request.timeout.map(|d| {
437+
let secs = u64::try_from(d.seconds).unwrap_or(0);
438+
let nanos = u32::try_from(d.nanos).unwrap_or(0);
439+
Duration::new(secs, nanos)
440+
});
441+
442+
let mut stream = Box::pin(
443+
self.inner_wait_execution(WaitExecutionRequest {
444+
name: inner_request.name,
445+
})
446+
.await?,
447+
);
448+
449+
let mut last_operation = stream
450+
.next()
451+
.await
452+
.ok_or_else(|| Status::not_found("Operation not found"))??;
453+
454+
if last_operation.done {
455+
return Ok(Response::new(last_operation));
456+
}
457+
458+
let end_time = timeout_opt.map(|t| tokio::time::Instant::now() + t);
459+
460+
loop {
461+
let next_fut = stream.next();
462+
let next_res = if let Some(end) = end_time {
463+
match tokio::time::timeout_at(end, next_fut).await {
464+
Ok(res) => res,
465+
Err(_) => break,
466+
}
467+
} else {
468+
next_fut.await
469+
};
470+
471+
match next_res {
472+
Some(Ok(operation)) => {
473+
let is_done = operation.done;
474+
last_operation = operation;
475+
if is_done {
476+
break;
477+
}
478+
}
479+
Some(Err(e)) => return Err(e),
480+
None => break,
481+
}
482+
}
483+
484+
Ok(Response::new(last_operation))
485+
}
486+
}
487+
378488
#[cfg(test)]
379489
#[test]
380490
fn test_nl_op_id_from_name() -> Result<(), Box<dyn core::error::Error>> {

0 commit comments

Comments
 (0)