Skip to content

Commit 7eebe66

Browse files
committed
Use TaskDeduplicator in MerkleTreeComputer
1 parent aae849c commit 7eebe66

File tree

5 files changed

+497
-97
lines changed

5 files changed

+497
-97
lines changed

src/main/java/com/google/devtools/build/lib/remote/merkletree/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ java_library(
2121
"//src/main/java/com/google/devtools/build/lib/actions:file_metadata",
2222
"//src/main/java/com/google/devtools/build/lib/actions:runfiles_metadata",
2323
"//src/main/java/com/google/devtools/build/lib/collect/nestedset",
24+
"//src/main/java/com/google/devtools/build/lib/concurrent:task_deduplicator",
2425
"//src/main/java/com/google/devtools/build/lib/exec:spawn_runner",
2526
"//src/main/java/com/google/devtools/build/lib/profiler",
2627
"//src/main/java/com/google/devtools/build/lib/remote:scrubber",

src/main/java/com/google/devtools/build/lib/remote/merkletree/MerkleTreeComputer.java

Lines changed: 83 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import static com.google.common.collect.ImmutableList.toImmutableList;
2121
import static com.google.common.util.concurrent.Futures.allAsList;
2222
import static com.google.common.util.concurrent.Futures.immediateFuture;
23-
import static com.google.common.util.concurrent.Futures.submitAsync;
2423
import static com.google.common.util.concurrent.Futures.transform;
2524
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
2625
import static com.google.devtools.build.lib.util.StringEncoding.internalToUnicode;
@@ -43,6 +42,7 @@
4342
import com.google.common.collect.Iterables;
4443
import com.google.common.collect.Iterators;
4544
import com.google.common.collect.Lists;
45+
import com.google.common.util.concurrent.AsyncCallable;
4646
import com.google.common.util.concurrent.ListenableFuture;
4747
import com.google.devtools.build.lib.actions.ActionInput;
4848
import com.google.devtools.build.lib.actions.ActionInputHelper;
@@ -58,6 +58,7 @@
5858
import com.google.devtools.build.lib.actions.StaticInputMetadataProvider;
5959
import com.google.devtools.build.lib.actions.cache.VirtualActionInput;
6060
import com.google.devtools.build.lib.collect.nestedset.NestedSet;
61+
import com.google.devtools.build.lib.concurrent.TaskDeduplicator;
6162
import com.google.devtools.build.lib.exec.SpawnRunner.SpawnExecutionContext;
6263
import com.google.devtools.build.lib.profiler.Profiler;
6364
import com.google.devtools.build.lib.profiler.SilentCloseable;
@@ -87,12 +88,10 @@
8788
import java.util.Objects;
8889
import java.util.Set;
8990
import java.util.concurrent.CancellationException;
90-
import java.util.concurrent.ConcurrentHashMap;
9191
import java.util.concurrent.ExecutionException;
9292
import java.util.concurrent.ExecutorService;
9393
import java.util.concurrent.Executors;
9494
import java.util.concurrent.Future;
95-
import java.util.concurrent.atomic.AtomicBoolean;
9695
import java.util.function.Predicate;
9796
import javax.annotation.Nullable;
9897

@@ -174,8 +173,8 @@ public final class MerkleTreeComputer {
174173
private final String workspaceName;
175174
private final Digest emptyDigest;
176175
private final MerkleTree.Uploadable emptyTree;
177-
private final ConcurrentHashMap<InFlightCacheKey, ListenableFuture<MerkleTree.RootOnly>>
178-
inFlightSubTreeCache = new ConcurrentHashMap<>();
176+
private final TaskDeduplicator<InFlightCacheKey, MerkleTree.RootOnly> inFlightComputations =
177+
new TaskDeduplicator<>();
179178

180179
public MerkleTreeComputer(
181180
DigestUtil digestUtil,
@@ -282,7 +281,6 @@ private MerkleTree doBuildForSpawn(
282281
if (!Objects.equals(scrubber, lastScrubber)) {
283282
persistentToolSubTreeCache.invalidateAll();
284283
persistentNonToolSubTreeCache.invalidateAll();
285-
inFlightSubTreeCache.clear();
286284
lastScrubber = scrubber;
287285
}
288286
}
@@ -848,99 +846,87 @@ private ListenableFuture<MerkleTree.RootOnly> computeIfAbsent(
848846
return immediateFuture(cachedRoot);
849847
}
850848
}
851-
var inFlightCacheKey = new InFlightCacheKey(metadata, isTool, blobPolicy != BlobPolicy.DISCARD);
852-
if (blobPolicy == BlobPolicy.KEEP_AND_REUPLOAD) {
853-
inFlightSubTreeCache.remove(inFlightCacheKey);
854-
}
855-
var newlyComputed = new AtomicBoolean();
856-
var future =
857-
inFlightSubTreeCache.computeIfAbsent(
858-
inFlightCacheKey,
859-
unusedKey -> {
860-
newlyComputed.set(true);
861-
return submitAsync(
862-
() -> {
863-
// There is a window in which a concurrent call may have removed the in-flight
864-
// cache entry while this one had already passed the check above. Recheck the
865-
// persistent cache to avoid unnecessary work.
866-
var cachedRoot = persistentCache.getIfPresent(metadata);
867-
if (cachedRoot != null
868-
&& (blobPolicy == BlobPolicy.DISCARD
869-
|| cachedRoot instanceof MerkleTree.RootOnly.BlobsUploaded)) {
870-
return immediateFuture(cachedRoot);
871-
}
872-
// An ongoing computation with blobs can be reused for one that doesn't require
873-
// them.
874-
if (blobPolicy == BlobPolicy.DISCARD) {
875-
var inFlightComputation =
876-
inFlightSubTreeCache.get(
877-
new InFlightCacheKey(metadata, isTool, /* uploadBlobs= */ true));
878-
if (inFlightComputation != null) {
879-
return inFlightComputation;
880-
}
881-
}
882-
ListenableFuture<MerkleTree> merkleTreeFuture;
883-
try {
884-
// Subtrees either consist entirely of tool inputs or don't contain any.
885-
// The same applies to scrubbed inputs.
886-
merkleTreeFuture =
887-
build(
888-
sortedInputsSupplier.compute(),
889-
isTool ? alwaysTrue() : alwaysFalse(),
890-
/* spawnScrubber= */ null,
891-
metadataProvider,
892-
artifactPathResolver,
893-
remoteActionExecutionContext,
894-
remotePathResolver,
895-
blobPolicy);
896-
} catch (IOException e) {
897-
throw new WrappedException(e);
898-
} catch (InterruptedException e) {
899-
throw new WrappedException(e);
849+
var key = new InFlightCacheKey(metadata, isTool, blobPolicy != BlobPolicy.DISCARD);
850+
AsyncCallable<MerkleTree.RootOnly> doCompute =
851+
() -> {
852+
// There is a window in which a concurrent call may have removed the in-flight
853+
// cache entry while this one had already passed the check above. Recheck the
854+
// persistent cache to avoid unnecessary work.
855+
var cachedRoot = persistentCache.getIfPresent(metadata);
856+
if (cachedRoot != null
857+
&& (blobPolicy == BlobPolicy.DISCARD
858+
|| cachedRoot instanceof MerkleTree.RootOnly.BlobsUploaded)) {
859+
return immediateFuture(cachedRoot);
860+
}
861+
// An ongoing computation with blobs can be reused for one that doesn't require
862+
// them.
863+
if (blobPolicy == BlobPolicy.DISCARD) {
864+
var inFlightComputation =
865+
inFlightComputations.maybeJoinExecution(
866+
new InFlightCacheKey(metadata, isTool, /* uploadBlobs= */ true));
867+
if (inFlightComputation != null) {
868+
return inFlightComputation;
869+
}
870+
}
871+
ListenableFuture<MerkleTree> merkleTreeFuture;
872+
try {
873+
// Subtrees either consist entirely of tool inputs or don't contain any.
874+
// The same applies to scrubbed inputs.
875+
merkleTreeFuture =
876+
build(
877+
sortedInputsSupplier.compute(),
878+
isTool ? alwaysTrue() : alwaysFalse(),
879+
/* spawnScrubber= */ null,
880+
metadataProvider,
881+
artifactPathResolver,
882+
remoteActionExecutionContext,
883+
remotePathResolver,
884+
blobPolicy);
885+
} catch (IOException e) {
886+
throw new WrappedException(e);
887+
} catch (InterruptedException e) {
888+
throw new WrappedException(e);
889+
}
890+
return transform(
891+
merkleTreeFuture,
892+
merkleTree -> {
893+
if (merkleTree instanceof MerkleTree.Uploadable uploadable) {
894+
try {
895+
if (merkleTreeUploader != null) {
896+
merkleTreeUploader.ensureInputsPresent(
897+
remoteActionExecutionContext,
898+
uploadable,
899+
blobPolicy == BlobPolicy.KEEP_AND_REUPLOAD,
900+
remotePathResolver);
900901
}
901-
return transform(
902-
merkleTreeFuture,
903-
merkleTree -> {
904-
if (merkleTree instanceof MerkleTree.Uploadable uploadable) {
905-
try {
906-
if (merkleTreeUploader != null) {
907-
merkleTreeUploader.ensureInputsPresent(
908-
remoteActionExecutionContext,
909-
uploadable,
910-
blobPolicy == BlobPolicy.KEEP_AND_REUPLOAD,
911-
remotePathResolver);
912-
}
913-
} catch (IOException e) {
914-
throw new WrappedException(e);
915-
} catch (InterruptedException e) {
916-
throw new WrappedException(e);
917-
}
918-
}
919-
// Move the computed root to the persistent cache so that it can be reused
920-
// by later builds.
921-
persistentCache
922-
.asMap()
923-
.compute(
924-
metadata,
925-
(unused, oldRoot) -> {
926-
// Don't downgrade the cached root from one indicating that its
927-
// blobs have been uploaded.
928-
return oldRoot instanceof MerkleTree.RootOnly.BlobsUploaded
929-
? oldRoot
930-
: merkleTree.root();
931-
});
932-
return merkleTree.root();
933-
},
934-
MERKLE_TREE_UPLOAD_POOL);
935-
},
936-
MERKLE_TREE_BUILD_POOL);
937-
});
938-
if (newlyComputed.get()) {
939-
// Clean up the in-flight cache so that it doesn't grow unboundedly.
940-
future.addListener(
941-
() -> inFlightSubTreeCache.remove(inFlightCacheKey, future), directExecutor());
902+
} catch (IOException e) {
903+
throw new WrappedException(e);
904+
} catch (InterruptedException e) {
905+
throw new WrappedException(e);
906+
}
907+
}
908+
// Move the computed root to the persistent cache so that it can be reused
909+
// by later builds.
910+
persistentCache
911+
.asMap()
912+
.compute(
913+
metadata,
914+
(unused, oldRoot) -> {
915+
// Don't downgrade the cached root from one indicating that its
916+
// blobs have been uploaded.
917+
return oldRoot instanceof MerkleTree.RootOnly.BlobsUploaded
918+
? oldRoot
919+
: merkleTree.root();
920+
});
921+
return merkleTree.root();
922+
},
923+
MERKLE_TREE_UPLOAD_POOL);
924+
};
925+
if (blobPolicy == BlobPolicy.KEEP_AND_REUPLOAD) {
926+
return inFlightComputations.executeUnconditionally(key, doCompute, MERKLE_TREE_BUILD_POOL);
927+
} else {
928+
return inFlightComputations.executeIfNeeded(key, doCompute, MERKLE_TREE_BUILD_POOL);
942929
}
943-
return future;
944930
}
945931

946932
private static <T> T getFromFuture(Future<T> future) throws IOException, InterruptedException {

src/test/java/com/google/devtools/build/lib/remote/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ filegroup(
1717
"//src/test/java/com/google/devtools/build/lib/remote/grpc:srcs",
1818
"//src/test/java/com/google/devtools/build/lib/remote/http:srcs",
1919
"//src/test/java/com/google/devtools/build/lib/remote/logging:srcs",
20+
"//src/test/java/com/google/devtools/build/lib/remote/merkletree:srcs",
2021
"//src/test/java/com/google/devtools/build/lib/remote/options:srcs",
2122
"//src/test/java/com/google/devtools/build/lib/remote/util:srcs",
2223
"//src/test/java/com/google/devtools/build/lib/remote/zstd:srcs",
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
load("@rules_java//java:defs.bzl", "java_test")
2+
3+
package(
4+
default_applicable_licenses = ["//:license"],
5+
default_testonly = 1,
6+
default_visibility = ["//src:__subpackages__"],
7+
)
8+
9+
filegroup(
10+
name = "srcs",
11+
testonly = 0,
12+
srcs = glob(["**"]),
13+
visibility = ["//src:__subpackages__"],
14+
)
15+
16+
java_test(
17+
name = "merkletree",
18+
srcs = glob(["*.java"]),
19+
test_class = "com.google.devtools.build.lib.AllTests",
20+
deps = [
21+
"//src/main/java/com/google/devtools/build/lib/actions",
22+
"//src/main/java/com/google/devtools/build/lib/actions:artifacts",
23+
"//src/main/java/com/google/devtools/build/lib/actions:file_metadata",
24+
"//src/main/java/com/google/devtools/build/lib/actions:fileset_output_tree",
25+
"//src/main/java/com/google/devtools/build/lib/actions:runfiles_metadata",
26+
"//src/main/java/com/google/devtools/build/lib/actions:runfiles_tree",
27+
"//src/main/java/com/google/devtools/build/lib/clock",
28+
"//src/main/java/com/google/devtools/build/lib/remote/common",
29+
"//src/main/java/com/google/devtools/build/lib/remote/merkletree",
30+
"//src/main/java/com/google/devtools/build/lib/remote/util:digest_utils",
31+
"//src/main/java/com/google/devtools/build/lib/skyframe:tree_artifact_value",
32+
"//src/main/java/com/google/devtools/build/lib/util/io",
33+
"//src/main/java/com/google/devtools/build/lib/vfs",
34+
"//src/main/java/com/google/devtools/build/lib/vfs:pathfragment",
35+
"//src/main/java/com/google/devtools/build/lib/vfs/inmemoryfs",
36+
"//src/test/java/com/google/devtools/build/lib:test_runner",
37+
"//src/test/java/com/google/devtools/build/lib/actions/util",
38+
"//src/test/java/com/google/devtools/build/lib/exec/util",
39+
"//src/test/java/com/google/devtools/build/lib/remote/util",
40+
"//third_party:guava",
41+
"//third_party:junit4",
42+
"//third_party:truth",
43+
"@remoteapis//:build_bazel_remote_execution_v2_remote_execution_java_proto",
44+
],
45+
)

0 commit comments

Comments
 (0)