Skip to content

Commit be01f76

Browse files
committed
Use TaskDeduplicator in MerkleTreeComputer
1 parent b28ea5f commit be01f76

File tree

5 files changed

+500
-97
lines changed

5 files changed

+500
-97
lines changed

src/main/java/com/google/devtools/build/lib/remote/merkletree/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ java_library(
2121
"//src/main/java/com/google/devtools/build/lib/actions:file_metadata",
2222
"//src/main/java/com/google/devtools/build/lib/actions:runfiles_metadata",
2323
"//src/main/java/com/google/devtools/build/lib/collect/nestedset",
24+
"//src/main/java/com/google/devtools/build/lib/concurrent:task_deduplicator",
2425
"//src/main/java/com/google/devtools/build/lib/exec:spawn_runner",
2526
"//src/main/java/com/google/devtools/build/lib/profiler",
2627
"//src/main/java/com/google/devtools/build/lib/remote:scrubber",

src/main/java/com/google/devtools/build/lib/remote/merkletree/MerkleTreeComputer.java

Lines changed: 86 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import static com.google.common.collect.ImmutableList.toImmutableList;
2121
import static com.google.common.util.concurrent.Futures.allAsList;
2222
import static com.google.common.util.concurrent.Futures.immediateFuture;
23-
import static com.google.common.util.concurrent.Futures.submitAsync;
2423
import static com.google.common.util.concurrent.Futures.transform;
2524
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
2625
import static com.google.devtools.build.lib.util.StringEncoding.internalToUnicode;
@@ -43,6 +42,8 @@
4342
import com.google.common.collect.Iterables;
4443
import com.google.common.collect.Iterators;
4544
import com.google.common.collect.Lists;
45+
import com.google.common.util.concurrent.AsyncCallable;
46+
import com.google.common.util.concurrent.Futures;
4647
import com.google.common.util.concurrent.ListenableFuture;
4748
import com.google.devtools.build.lib.actions.ActionInput;
4849
import com.google.devtools.build.lib.actions.ActionInputHelper;
@@ -58,6 +59,7 @@
5859
import com.google.devtools.build.lib.actions.StaticInputMetadataProvider;
5960
import com.google.devtools.build.lib.actions.cache.VirtualActionInput;
6061
import com.google.devtools.build.lib.collect.nestedset.NestedSet;
62+
import com.google.devtools.build.lib.concurrent.TaskDeduplicator;
6163
import com.google.devtools.build.lib.exec.SpawnRunner.SpawnExecutionContext;
6264
import com.google.devtools.build.lib.profiler.Profiler;
6365
import com.google.devtools.build.lib.profiler.SilentCloseable;
@@ -87,13 +89,12 @@
8789
import java.util.Objects;
8890
import java.util.Set;
8991
import java.util.concurrent.CancellationException;
90-
import java.util.concurrent.ConcurrentHashMap;
9192
import java.util.concurrent.ExecutionException;
9293
import java.util.concurrent.ExecutorService;
9394
import java.util.concurrent.Executors;
9495
import java.util.concurrent.Future;
95-
import java.util.concurrent.atomic.AtomicBoolean;
9696
import java.util.function.Predicate;
97+
import java.util.function.Supplier;
9798
import javax.annotation.Nullable;
9899

99100
/**
@@ -174,8 +175,8 @@ public final class MerkleTreeComputer {
174175
private final String workspaceName;
175176
private final Digest emptyDigest;
176177
private final MerkleTree.Uploadable emptyTree;
177-
private final ConcurrentHashMap<InFlightCacheKey, ListenableFuture<MerkleTree.RootOnly>>
178-
inFlightSubTreeCache = new ConcurrentHashMap<>();
178+
private final TaskDeduplicator<InFlightCacheKey, MerkleTree.RootOnly> inFlightComputations =
179+
new TaskDeduplicator<>();
179180

180181
public MerkleTreeComputer(
181182
DigestUtil digestUtil,
@@ -282,7 +283,6 @@ private MerkleTree doBuildForSpawn(
282283
if (!Objects.equals(scrubber, lastScrubber)) {
283284
persistentToolSubTreeCache.invalidateAll();
284285
persistentNonToolSubTreeCache.invalidateAll();
285-
inFlightSubTreeCache.clear();
286286
lastScrubber = scrubber;
287287
}
288288
}
@@ -848,99 +848,88 @@ private ListenableFuture<MerkleTree.RootOnly> computeIfAbsent(
848848
return immediateFuture(cachedRoot);
849849
}
850850
}
851-
var inFlightCacheKey = new InFlightCacheKey(metadata, isTool, blobPolicy != BlobPolicy.DISCARD);
852-
if (blobPolicy == BlobPolicy.KEEP_AND_REUPLOAD) {
853-
inFlightSubTreeCache.remove(inFlightCacheKey);
854-
}
855-
var newlyComputed = new AtomicBoolean();
856-
var future =
857-
inFlightSubTreeCache.computeIfAbsent(
858-
inFlightCacheKey,
859-
unusedKey -> {
860-
newlyComputed.set(true);
861-
return submitAsync(
862-
() -> {
863-
// There is a window in which a concurrent call may have removed the in-flight
864-
// cache entry while this one had already passed the check above. Recheck the
865-
// persistent cache to avoid unnecessary work.
866-
var cachedRoot = persistentCache.getIfPresent(metadata);
867-
if (cachedRoot != null
868-
&& (blobPolicy == BlobPolicy.DISCARD
869-
|| cachedRoot instanceof MerkleTree.RootOnly.BlobsUploaded)) {
870-
return immediateFuture(cachedRoot);
871-
}
872-
// An ongoing computation with blobs can be reused for one that doesn't require
873-
// them.
874-
if (blobPolicy == BlobPolicy.DISCARD) {
875-
var inFlightComputation =
876-
inFlightSubTreeCache.get(
877-
new InFlightCacheKey(metadata, isTool, /* uploadBlobs= */ true));
878-
if (inFlightComputation != null) {
879-
return inFlightComputation;
880-
}
881-
}
882-
ListenableFuture<MerkleTree> merkleTreeFuture;
883-
try {
884-
// Subtrees either consist entirely of tool inputs or don't contain any.
885-
// The same applies to scrubbed inputs.
886-
merkleTreeFuture =
887-
build(
888-
sortedInputsSupplier.compute(),
889-
isTool ? alwaysTrue() : alwaysFalse(),
890-
/* spawnScrubber= */ null,
891-
metadataProvider,
892-
artifactPathResolver,
893-
remoteActionExecutionContext,
894-
remotePathResolver,
895-
blobPolicy);
896-
} catch (IOException e) {
897-
throw new WrappedException(e);
898-
} catch (InterruptedException e) {
899-
throw new WrappedException(e);
851+
var key = new InFlightCacheKey(metadata, isTool, blobPolicy != BlobPolicy.DISCARD);
852+
AsyncCallable<MerkleTree.RootOnly> buildMerkleTreeTask =
853+
() -> {
854+
// There is a window in which a concurrent call may have removed the in-flight cache entry
855+
// while this one had already passed the check above. Recheck the persistent cache to
856+
// avoid unnecessary work.
857+
var cachedRoot = persistentCache.getIfPresent(metadata);
858+
if (cachedRoot != null
859+
&& (blobPolicy == BlobPolicy.DISCARD
860+
|| cachedRoot instanceof MerkleTree.RootOnly.BlobsUploaded)) {
861+
return immediateFuture(cachedRoot);
862+
}
863+
// An ongoing computation with blobs can be reused for one that doesn't require them.
864+
if (blobPolicy == BlobPolicy.DISCARD) {
865+
var inFlightComputation =
866+
inFlightComputations.maybeJoinExecution(
867+
new InFlightCacheKey(metadata, isTool, /* uploadBlobs= */ true));
868+
if (inFlightComputation != null) {
869+
return inFlightComputation;
870+
}
871+
}
872+
ListenableFuture<MerkleTree> merkleTreeFuture;
873+
try {
874+
// Subtrees either consist entirely of tool inputs or don't contain any. The same
875+
// applies to scrubbed inputs.
876+
merkleTreeFuture =
877+
build(
878+
sortedInputsSupplier.compute(),
879+
isTool ? alwaysTrue() : alwaysFalse(),
880+
/* spawnScrubber= */ null,
881+
metadataProvider,
882+
artifactPathResolver,
883+
remoteActionExecutionContext,
884+
remotePathResolver,
885+
blobPolicy);
886+
} catch (IOException e) {
887+
throw new WrappedException(e);
888+
} catch (InterruptedException e) {
889+
throw new WrappedException(e);
890+
}
891+
return transform(
892+
merkleTreeFuture,
893+
merkleTree -> {
894+
if (merkleTree instanceof MerkleTree.Uploadable uploadable) {
895+
try {
896+
if (merkleTreeUploader != null) {
897+
merkleTreeUploader.ensureInputsPresent(
898+
remoteActionExecutionContext,
899+
uploadable,
900+
blobPolicy == BlobPolicy.KEEP_AND_REUPLOAD,
901+
remotePathResolver);
900902
}
901-
return transform(
902-
merkleTreeFuture,
903-
merkleTree -> {
904-
if (merkleTree instanceof MerkleTree.Uploadable uploadable) {
905-
try {
906-
if (merkleTreeUploader != null) {
907-
merkleTreeUploader.ensureInputsPresent(
908-
remoteActionExecutionContext,
909-
uploadable,
910-
blobPolicy == BlobPolicy.KEEP_AND_REUPLOAD,
911-
remotePathResolver);
912-
}
913-
} catch (IOException e) {
914-
throw new WrappedException(e);
915-
} catch (InterruptedException e) {
916-
throw new WrappedException(e);
917-
}
918-
}
919-
// Move the computed root to the persistent cache so that it can be reused
920-
// by later builds.
921-
persistentCache
922-
.asMap()
923-
.compute(
924-
metadata,
925-
(unused, oldRoot) -> {
926-
// Don't downgrade the cached root from one indicating that its
927-
// blobs have been uploaded.
928-
return oldRoot instanceof MerkleTree.RootOnly.BlobsUploaded
929-
? oldRoot
930-
: merkleTree.root();
931-
});
932-
return merkleTree.root();
933-
},
934-
MERKLE_TREE_UPLOAD_POOL);
935-
},
936-
MERKLE_TREE_BUILD_POOL);
937-
});
938-
if (newlyComputed.get()) {
939-
// Clean up the in-flight cache so that it doesn't grow unboundedly.
940-
future.addListener(
941-
() -> inFlightSubTreeCache.remove(inFlightCacheKey, future), directExecutor());
903+
} catch (IOException e) {
904+
throw new WrappedException(e);
905+
} catch (InterruptedException e) {
906+
throw new WrappedException(e);
907+
}
908+
}
909+
// Move the computed root to the persistent cache so that it can be reused by later
910+
// builds.
911+
persistentCache
912+
.asMap()
913+
.compute(
914+
metadata,
915+
(unused, oldRoot) -> {
916+
// Don't downgrade the cached root from one indicating that its blobs have
917+
// been uploaded.
918+
return oldRoot instanceof MerkleTree.RootOnly.BlobsUploaded
919+
? oldRoot
920+
: merkleTree.root();
921+
});
922+
return merkleTree.root();
923+
},
924+
MERKLE_TREE_UPLOAD_POOL);
925+
};
926+
Supplier<ListenableFuture<MerkleTree.RootOnly>> buildMerkleTreeTaskSupplier =
927+
() -> Futures.submitAsync(buildMerkleTreeTask, MERKLE_TREE_BUILD_POOL);
928+
if (blobPolicy == BlobPolicy.KEEP_AND_REUPLOAD) {
929+
return inFlightComputations.executeUnconditionally(key, buildMerkleTreeTaskSupplier);
930+
} else {
931+
return inFlightComputations.executeIfNew(key, buildMerkleTreeTaskSupplier);
942932
}
943-
return future;
944933
}
945934

946935
private static <T> T getFromFuture(Future<T> future) throws IOException, InterruptedException {

src/test/java/com/google/devtools/build/lib/remote/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ filegroup(
1717
"//src/test/java/com/google/devtools/build/lib/remote/grpc:srcs",
1818
"//src/test/java/com/google/devtools/build/lib/remote/http:srcs",
1919
"//src/test/java/com/google/devtools/build/lib/remote/logging:srcs",
20+
"//src/test/java/com/google/devtools/build/lib/remote/merkletree:srcs",
2021
"//src/test/java/com/google/devtools/build/lib/remote/options:srcs",
2122
"//src/test/java/com/google/devtools/build/lib/remote/util:srcs",
2223
"//src/test/java/com/google/devtools/build/lib/remote/zstd:srcs",
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
load("@rules_java//java:defs.bzl", "java_test")
2+
3+
package(
4+
default_applicable_licenses = ["//:license"],
5+
default_testonly = 1,
6+
default_visibility = ["//src:__subpackages__"],
7+
)
8+
9+
filegroup(
10+
name = "srcs",
11+
testonly = 0,
12+
srcs = glob(["**"]),
13+
visibility = ["//src:__subpackages__"],
14+
)
15+
16+
java_test(
17+
name = "merkletree",
18+
srcs = glob(["*.java"]),
19+
test_class = "com.google.devtools.build.lib.AllTests",
20+
deps = [
21+
"//src/main/java/com/google/devtools/build/lib/actions",
22+
"//src/main/java/com/google/devtools/build/lib/actions:artifacts",
23+
"//src/main/java/com/google/devtools/build/lib/actions:file_metadata",
24+
"//src/main/java/com/google/devtools/build/lib/actions:fileset_output_tree",
25+
"//src/main/java/com/google/devtools/build/lib/actions:runfiles_metadata",
26+
"//src/main/java/com/google/devtools/build/lib/actions:runfiles_tree",
27+
"//src/main/java/com/google/devtools/build/lib/clock",
28+
"//src/main/java/com/google/devtools/build/lib/remote/common",
29+
"//src/main/java/com/google/devtools/build/lib/remote/merkletree",
30+
"//src/main/java/com/google/devtools/build/lib/remote/util:digest_utils",
31+
"//src/main/java/com/google/devtools/build/lib/skyframe:tree_artifact_value",
32+
"//src/main/java/com/google/devtools/build/lib/util/io",
33+
"//src/main/java/com/google/devtools/build/lib/vfs",
34+
"//src/main/java/com/google/devtools/build/lib/vfs:pathfragment",
35+
"//src/main/java/com/google/devtools/build/lib/vfs/inmemoryfs",
36+
"//src/test/java/com/google/devtools/build/lib:test_runner",
37+
"//src/test/java/com/google/devtools/build/lib/actions/util",
38+
"//src/test/java/com/google/devtools/build/lib/exec/util",
39+
"//src/test/java/com/google/devtools/build/lib/remote/util",
40+
"//third_party:guava",
41+
"//third_party:junit4",
42+
"//third_party:truth",
43+
"@remoteapis//:build_bazel_remote_execution_v2_remote_execution_java_proto",
44+
],
45+
)

0 commit comments

Comments
 (0)