@@ -82,6 +82,9 @@ public class BazelOutputService implements OutputService {
8282 private final String remoteCache ;
8383 private final String remoteInstanceName ;
8484 private final String remoteOutputServiceOutputPathPrefix ;
85+ private final int maxOutboundMessageSize ;
86+ private final int stageArtifactsRequestArtifactWithoutPathSize ;
87+ private final int finalizeArtifactsRequestArtifactWithoutPathSize ;
8588 private final boolean verboseFailures ;
8689 private final RemoteRetrier retrier ;
8790 private final ReferenceCountedChannel channel ;
@@ -94,21 +97,27 @@ public BazelOutputService(
9497 Path outputBase ,
9598 Supplier <Path > execRootSupplier ,
9699 Supplier <Path > outputPathSupplier ,
97- DigestFunction . Value digestFunction ,
100+ DigestUtil digestUtil ,
98101 String remoteCache ,
99102 String remoteInstanceName ,
100103 String remoteOutputServiceOutputPathPrefix ,
104+ int maxOutboundMessageSize ,
101105 boolean verboseFailures ,
102106 RemoteRetrier retrier ,
103107 ReferenceCountedChannel channel ,
104108 @ Nullable String lastBuildId ) {
105109 this .outputBaseId = DigestUtil .hashCodeToString (md5 ().hashString (outputBase .toString (), UTF_8 ));
106110 this .execRootSupplier = execRootSupplier ;
107111 this .outputPathSupplier = outputPathSupplier ;
108- this .digestFunction = digestFunction ;
112+ this .digestFunction = digestUtil . getDigestFunction () ;
109113 this .remoteCache = remoteCache ;
110114 this .remoteInstanceName = remoteInstanceName ;
111115 this .remoteOutputServiceOutputPathPrefix = remoteOutputServiceOutputPathPrefix ;
116+ this .maxOutboundMessageSize = maxOutboundMessageSize ;
117+ this .stageArtifactsRequestArtifactWithoutPathSize =
118+ computeStageArtifactsRequestArtifactWithoutPathSize (digestUtil );
119+ this .finalizeArtifactsRequestArtifactWithoutPathSize =
120+ computeFinalizeArtifactsRequestArtifactWithoutPathSize (digestUtil );
112121 this .verboseFailures = verboseFailures ;
113122 this .retrier = retrier ;
114123 this .channel = channel ;
@@ -277,33 +286,63 @@ protected void stageArtifacts(List<FileMetadata> files) throws IOException, Inte
277286 var outputPath = outputPathSupplier .get ();
278287 var request = StageArtifactsRequest .newBuilder ();
279288 request .setBuildId (buildId );
280- for (var file : files ) {
281- request .addArtifacts (
282- StageArtifactsRequest .Artifact .newBuilder ()
283- .setPath (file .path ().relativeTo (outputPath ).toString ())
284- .setLocator (
285- Any .pack (FileArtifactLocator .newBuilder ().setDigest (file .digest ()).build ()))
286- .build ());
287- }
288- var response = stageArtifacts (request .build ());
289- if (response .getResponsesCount () != files .size ()) {
290- throw new IOException (
291- String .format (
292- "StageArtifacts failed: expect %s responses from StageArtifactsResponse, got %s" ,
293- files .size (), response .getResponsesCount ()));
294- }
295-
296- for (var i = 0 ; i < files .size (); ++i ) {
297- var fileResponse = response .getResponses (i );
298- if (fileResponse .getStatus ().getCode () != Status .Code .OK .value ()) {
289+ final int initialRequestSize = request .build ().getSerializedSize ();
290+
291+ // Split into batches to avoid exceeding gRPC message size limit.
292+ while (!files .isEmpty ()) {
293+ request .clearArtifacts ();
294+ int requestSize = initialRequestSize ;
295+ int endIdx ;
296+ for (endIdx = 0 ; endIdx < files .size (); ++endIdx ) {
297+ var file = files .get (endIdx );
298+ var path = file .path ().relativeTo (outputPath ).toString ();
299+ requestSize += stageArtifactsRequestArtifactWithoutPathSize + path .length ();
300+ if (endIdx > 0 && requestSize > maxOutboundMessageSize ) {
301+ break ;
302+ }
303+ request .addArtifacts (
304+ StageArtifactsRequest .Artifact .newBuilder ()
305+ .setPath (path )
306+ .setLocator (
307+ Any .pack (FileArtifactLocator .newBuilder ().setDigest (file .digest ()).build ()))
308+ .build ());
309+ }
310+ // Send this part of the list to avoid too big gRPC messages.
311+ var filesInRequest = files .subList (0 , endIdx );
312+ files = files .subList (endIdx , files .size ());
313+
314+ var response = stageArtifacts (request .build ());
315+ if (response .getResponsesCount () != filesInRequest .size ()) {
299316 throw new IOException (
300317 String .format (
301- "Failed to stage %s, code: %s" ,
302- files .get (i ).path ().relativeTo (outputPath ), fileResponse .getStatus ()));
318+ "StageArtifacts failed: expect %s responses from StageArtifactsResponse, got %s" ,
319+ filesInRequest .size (), response .getResponsesCount ()));
320+ }
321+
322+ for (var i = 0 ; i < filesInRequest .size (); ++i ) {
323+ var fileResponse = response .getResponses (i );
324+ if (fileResponse .getStatus ().getCode () != Status .Code .OK .value ()) {
325+ throw new IOException (
326+ String .format (
327+ "Failed to stage %s, code: %s" ,
328+ filesInRequest .get (i ).path ().relativeTo (outputPath ), fileResponse .getStatus ()));
329+ }
303330 }
304331 }
305332 }
306333
334+ private static int computeStageArtifactsRequestArtifactWithoutPathSize (DigestUtil digestUtil ) {
335+ // We assume all non-empty digests have the same size. This is true for fixed-length hashes.
336+ // To not underestimate, add a small overhead.
337+ final int overhead = 7 ;
338+ final int stageArtifactsRequestArtifactWithoutPathSize = StageArtifactsRequest .Artifact .newBuilder ()
339+ .setPath ("p" )
340+ .setLocator (Any .pack (FileArtifactLocator .newBuilder ().setDigest (digestUtil .compute (new byte [] {1 })).build ()))
341+ .build ()
342+ .getSerializedSize ();
343+ return overhead + stageArtifactsRequestArtifactWithoutPathSize ;
344+ }
345+
307346 private StageArtifactsResponse stageArtifacts (StageArtifactsRequest request )
308347 throws IOException , InterruptedException {
309348 return retrier .execute (
@@ -358,32 +397,81 @@ private FinalizeBuildResponse finalizeBuild(FinalizeBuildRequest request)
358397 }));
359398 }
360399
400+ private static int computeFinalizeArtifactsRequestArtifactWithoutPathSize (DigestUtil digestUtil ) {
401+ // We assume all non-empty digests have the same size. This is true for fixed-length hashes.
402+ // To not underestimate, add a small overhead.
403+ final int overhead = 7 ;
404+ final int finalizeArtifactsRequestArtifactWithoutPathSize = FinalizeArtifactsRequest .Artifact .newBuilder ()
405+ .setPath ("p" )
406+ .setLocator (Any .pack (FileArtifactLocator .newBuilder ().setDigest (digestUtil .compute (new byte [] {1 })).build ()))
407+ .build ()
408+ .getSerializedSize ();
409+ return overhead + finalizeArtifactsRequestArtifactWithoutPathSize ;
410+ }
411+
361412 @ Override
362413 public void finalizeAction (Action action , OutputMetadataStore outputMetadataStore )
363414 throws IOException , InterruptedException {
364415 var execRoot = execRootSupplier .get ();
365416 var outputPath = outputPathSupplier .get ();
366417
367- var request = FinalizeArtifactsRequest .newBuilder ();
368- request .setBuildId (buildId );
418+ // Send a partial lists to avoid too large messages.
419+ class RequestSizeLimitingActionFinalizer {
420+ FinalizeArtifactsRequest .Builder builder ;
421+ int requestSize ;
422+ final int initialRequestSize ;
423+
424+ public RequestSizeLimitingActionFinalizer () {
425+ builder = FinalizeArtifactsRequest .newBuilder ();
426+ builder .setBuildId (buildId );
427+ initialRequestSize = builder .build ().getSerializedSize ();
428+ requestSize = initialRequestSize ;
429+ }
430+
431+ public void addArtifact (Artifact output ) throws IOException , InterruptedException {
432+ checkState (!output .isTreeArtifact ());
433+ var metadata = outputMetadataStore .getOutputMetadata (output );
434+ if (metadata .getType ().isFile ()) {
435+ var digest = DigestUtil .buildDigest (metadata .getDigest (), metadata .getSize ());
436+ var path = execRoot .getRelative (output .getExecPath ()).relativeTo (outputPath ).toString ();
437+ final int entrySize = finalizeArtifactsRequestArtifactWithoutPathSize + path .length ();
438+ if (requestSize + entrySize > maxOutboundMessageSize ) {
439+ // Send a partial list to avoid too large messages.
440+ sendPendingRequest ();
441+ }
442+ requestSize += entrySize ;
443+ builder .addArtifacts (
444+ FinalizeArtifactsRequest .Artifact .newBuilder ()
445+ .setPath (path )
446+ .setLocator (Any .pack (FileArtifactLocator .newBuilder ().setDigest (digest ).build ()))
447+ .build ());
448+ }
449+ }
450+
451+ public void sendPendingRequest () throws IOException , InterruptedException {
452+ var unused = finalizeArtifacts (builder .build ());
453+ builder .clearArtifacts ();
454+ requestSize = initialRequestSize ;
455+ }
456+ }
457+
458+ var request = new RequestSizeLimitingActionFinalizer ();
369459 for (var output : action .getOutputs ()) {
370460 if (outputMetadataStore .artifactOmitted (output )) {
371461 continue ;
372462 }
373-
374463 if (output .isTreeArtifact ()) {
375464 // TODO(chiwang): Use TreeArtifactLocator
376465 var children =
377466 outputMetadataStore .getTreeArtifactValue ((SpecialArtifact ) output ).getChildren ();
378467 for (var child : children ) {
379- addArtifact (outputMetadataStore , execRoot , outputPath , request , child );
468+ request . addArtifact (child );
380469 }
381470 } else {
382- addArtifact (outputMetadataStore , execRoot , outputPath , request , output );
471+ request . addArtifact (output );
383472 }
384473 }
385-
386- var unused = finalizeArtifacts (request .build ());
474+ request .sendPendingRequest ();
387475 }
388476
389477 private FinalizeArtifactsResponse finalizeArtifacts (FinalizeArtifactsRequest request )
@@ -402,26 +490,6 @@ private FinalizeArtifactsResponse finalizeArtifacts(FinalizeArtifactsRequest req
402490 }));
403491 }
404492
405- private static void addArtifact (
406- OutputMetadataStore outputMetadataStore ,
407- Path execRoot ,
408- Path outputPath ,
409- FinalizeArtifactsRequest .Builder builder ,
410- Artifact output )
411- throws IOException , InterruptedException {
412- checkState (!output .isTreeArtifact ());
413- var metadata = outputMetadataStore .getOutputMetadata (output );
414- if (metadata .getType ().isFile ()) {
415- var digest = DigestUtil .buildDigest (metadata .getDigest (), metadata .getSize ());
416- var path = execRoot .getRelative (output .getExecPath ()).relativeTo (outputPath ).toString ();
417- builder .addArtifacts (
418- FinalizeArtifactsRequest .Artifact .newBuilder ()
419- .setPath (path )
420- .setLocator (Any .pack (FileArtifactLocator .newBuilder ().setDigest (digest ).build ()))
421- .build ());
422- }
423- }
424-
425493 private record BazelOutputServiceFile (Digest digest ) implements FileStatusWithDigest {
426494 @ Override
427495 public boolean isFile () {
0 commit comments