Skip to content

Commit 29ec878

Browse files
committed
Use data payloads for inputs and outputs in Executor gRPC protocol definitions
Server side report task outcome RPC is updated to work with previous and new Executor behaviors when calling this RPC.
1 parent d574652 commit 29ec878

File tree

2 files changed

+79
-31
lines changed

2 files changed

+79
-31
lines changed

server/proto/executor_api.proto

+34-11
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,28 @@ syntax = "proto3";
44
// Existing clients won't find the service if the package name changes.
55
package executor_api_pb;
66

7+
// ===== DataPayload =====
8+
enum DataPayloadEncoding {
9+
DATA_PAYLOAD_ENCODING_UNKNOWN = 0;
10+
// These encodings are currently mapping 1:1 to mime types.
11+
// TODO: use SDK specific encodings becase 1:1 mapping might not work in the future.
12+
DATA_PAYLOAD_ENCODING_UTF8_JSON = 1;
13+
DATA_PAYLOAD_ENCODING_UTF8_TEXT = 2;
14+
DATA_PAYLOAD_ENCODING_BINARY_PICKLE = 3;
15+
}
16+
17+
message DataPayload {
18+
optional string path = 1; // deprecated, TODO: remove when URI us used everywhere
19+
optional uint64 size = 2;
20+
optional string sha256_hash = 3;
21+
// URI of the data.
22+
// S3 URI if the data is stored in S3.
23+
// Starts with "file://"" prefix if the data is stored on a local file system.
24+
optional string uri = 4;
25+
optional DataPayloadEncoding encoding = 5;
26+
optional uint64 encoding_version = 6;
27+
}
28+
729
// ===== report_executor_state RPC =====
830

931
enum GPUModel {
@@ -72,6 +94,7 @@ message FunctionExecutorDescription {
7294
optional HostResources resource_limits = 8;
7395
// Timeout for customer code duration during FE creation.
7496
optional uint32 customer_code_timeout_ms = 9;
97+
optional DataPayload graph = 10;
7598
}
7699

77100
message FunctionExecutorState {
@@ -134,9 +157,15 @@ message Task {
134157
optional string graph_version = 4;
135158
optional string function_name = 5;
136159
optional string graph_invocation_id = 6;
137-
optional string input_key = 8;
138-
optional string reducer_output_key = 9;
160+
optional string input_key = 8; // deprecated. TODO: remove when input is used everywhere
161+
optional string reducer_output_key = 9; // deprecated. TODO: remove when reducer_input is used everywhere
139162
optional uint32 timeout_ms = 10;
163+
optional DataPayload input = 11;
164+
optional DataPayload reducer_input = 12;
165+
// URI prefix for the output payloads.
166+
// S3 URI if the data is stored in S3.
167+
// Starts with "file://"" prefix followed by an absolute directory path if the data is stored on a local file system.
168+
optional string output_payload_uri_prefix = 13;
140169
}
141170

142171
message TaskAllocation {
@@ -166,12 +195,6 @@ enum TaskOutcome {
166195
TASK_OUTCOME_FAILURE = 2;
167196
}
168197

169-
message DataPayload {
170-
optional string path = 1;
171-
optional uint64 size = 2;
172-
optional string sha256_hash = 3;
173-
}
174-
175198
enum OutputEncoding {
176199
OUTPUT_ENCODING_UNKNOWN = 0;
177200
OUTPUT_ENCODING_JSON = 1;
@@ -186,7 +209,7 @@ message ReportTaskOutcomeRequest {
186209
optional string function_name = 4;
187210
optional string graph_invocation_id = 6;
188211
optional TaskOutcome outcome = 7;
189-
optional string invocation_id = 8;
212+
optional string invocation_id = 8; // deprecated. TODO: remove when graph_invocation_id is used everywhere
190213
optional string executor_id = 9;
191214
optional bool reducer = 10;
192215

@@ -199,10 +222,10 @@ message ReportTaskOutcomeRequest {
199222
optional DataPayload stdout = 14;
200223
optional DataPayload stderr = 15;
201224
// Output encoding of all the outputs of a function have to be same.
202-
optional OutputEncoding output_encoding = 13;
225+
optional OutputEncoding output_encoding = 13; // deprecated. TODO: remove when DataPayload.encoding is used everywhere
203226
// This allows us to change how we encode the output from functions
204227
// and serialize them into storage.
205-
optional uint64 output_encoding_version = 5;
228+
optional uint64 output_encoding_version = 5; // deprecated. TODO: remove when DataPayload.encoding_version is used everywhere
206229
}
207230

208231
message ReportTaskOutcomeResponse {

server/src/executor_api.rs

+45-20
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use data_model::{
2727
use executor_api_pb::{
2828
executor_api_server::ExecutorApi,
2929
AllowedFunction,
30+
DataPayloadEncoding,
3031
DesiredExecutorState,
3132
ExecutorState,
3233
ExecutorStatus,
@@ -361,9 +362,9 @@ impl ExecutorApi for ExecutorAPIService {
361362
.ok_or(Status::invalid_argument("compute_fn is required"))?;
362363
let invocation_id = request
363364
.get_ref()
364-
.invocation_id
365+
.graph_invocation_id
365366
.clone()
366-
.ok_or(Status::invalid_argument("invocation_id is required"))?;
367+
.ok_or(Status::invalid_argument("graph_invocation_id is required"))?;
367368
let task = self
368369
.indexify_state
369370
.reader()
@@ -375,22 +376,10 @@ impl ExecutorApi for ExecutorAPIService {
375376
&task_id,
376377
)
377378
.map_err(|e| Status::internal(e.to_string()))?;
378-
let output_encoding = request
379-
.get_ref()
380-
.output_encoding
381-
.ok_or(Status::invalid_argument("output_encoding is required"))?;
382379
if task.is_none() {
383380
warn!("Task not found for task_id: {}", task_id);
384381
return Ok(Response::new(ReportTaskOutcomeResponse {}));
385382
}
386-
let encoding = OutputEncoding::try_from(output_encoding)
387-
.map_err(|e| Status::invalid_argument(e.to_string()))?;
388-
let encoding_str = match encoding {
389-
OutputEncoding::Json => "application/json",
390-
OutputEncoding::Pickle => "application/octet-stream",
391-
OutputEncoding::Binary => "application/octet-stream",
392-
OutputEncoding::Unknown => "unknown",
393-
};
394383
let mut task = task.unwrap();
395384
match task_outcome {
396385
executor_api_pb::TaskOutcome::Success => {
@@ -410,7 +399,8 @@ impl ExecutorApi for ExecutorAPIService {
410399
for output in request.get_ref().fn_outputs.clone() {
411400
let path = output
412401
.path
413-
.ok_or(Status::invalid_argument("path is required"))?;
402+
.or(output.uri)
403+
.ok_or(Status::invalid_argument("path or uri is required"))?;
414404
let size = output
415405
.size
416406
.ok_or(Status::invalid_argument("size is required"))?;
@@ -422,6 +412,39 @@ impl ExecutorApi for ExecutorAPIService {
422412
size,
423413
sha256_hash,
424414
};
415+
let encoding_str = match output.encoding {
416+
Some(value) => {
417+
let output_encoding = DataPayloadEncoding::try_from(value)
418+
.map_err(|e| Status::invalid_argument(e.to_string()))?;
419+
match output_encoding {
420+
DataPayloadEncoding::Utf8Json => Ok("application/json"),
421+
DataPayloadEncoding::BinaryPickle => Ok("application/octet-stream"),
422+
DataPayloadEncoding::Utf8Text => Ok("text/plain"),
423+
DataPayloadEncoding::Unknown => {
424+
Err(Status::invalid_argument("unknown data payload encoding"))
425+
}
426+
}
427+
}
428+
// Fallback to the deprecated request encoding if not set
429+
None => match request.get_ref().output_encoding {
430+
Some(value) => {
431+
let output_encoding = OutputEncoding::try_from(value)
432+
.map_err(|e| Status::invalid_argument(e.to_string()))?;
433+
match output_encoding {
434+
OutputEncoding::Json => Ok("application/json"),
435+
OutputEncoding::Pickle => Ok("application/octet-stream"),
436+
OutputEncoding::Binary => Ok("application/octet-stream"),
437+
OutputEncoding::Unknown => {
438+
Err(Status::invalid_argument("unknown request output encoding"))
439+
}
440+
}
441+
}
442+
None => Err(Status::invalid_argument(
443+
"data payload encoding or request output encoding is required",
444+
)),
445+
},
446+
}?;
447+
425448
let node_output = NodeOutputBuilder::default()
426449
.namespace(namespace.to_string())
427450
.compute_graph_name(compute_graph.to_string())
@@ -481,18 +504,20 @@ fn prepare_data_payload(
481504
if msg.is_none() {
482505
return None;
483506
}
484-
if msg.as_ref().unwrap().path.as_ref().is_none() {
507+
let msg = msg.unwrap();
508+
if msg.uri.is_none() && msg.path.is_none() {
485509
return None;
486510
}
487-
if msg.as_ref().unwrap().size.as_ref().is_none() {
511+
if msg.size.as_ref().is_none() {
488512
return None;
489513
}
490-
if msg.as_ref().unwrap().sha256_hash.as_ref().is_none() {
514+
if msg.sha256_hash.as_ref().is_none() {
491515
return None;
492516
}
493-
let msg = msg.unwrap();
517+
494518
Some(data_model::DataPayload {
495-
path: msg.path.unwrap(),
519+
// Fallback to deprecated path if uri is not set.
520+
path: msg.uri.or(msg.path).unwrap(),
496521
size: msg.size.unwrap(),
497522
sha256_hash: msg.sha256_hash.unwrap(),
498523
})

0 commit comments

Comments
 (0)