Skip to content

Commit 514539b

Browse files
authored
Added a timeout to state regeneration (Consensys#8097)
Added a development flag that sets the state regeneration timeout (120 seconds by default), plus a small sanity check so the value can never be set below 1. Signed-off-by: Paul Harris <[email protected]>
1 parent 91b4e3d commit 514539b

File tree

13 files changed

+274
-54
lines changed

13 files changed

+274
-54
lines changed

Diff for: services/chainstorage/src/main/java/tech/pegasys/teku/services/chainstorage/StorageService.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,11 @@ protected SafeFuture<?> doStart() {
130130
}
131131
final EventChannels eventChannels = serviceConfig.getEventChannels();
132132
chainStorage =
133-
ChainStorage.create(database, config.getSpec(), config.getDataStorageMode());
133+
ChainStorage.create(
134+
database,
135+
config.getSpec(),
136+
config.getDataStorageMode(),
137+
config.getStateRebuildTimeoutSeconds());
134138
final DepositStorage depositStorage =
135139
DepositStorage.create(
136140
eventChannels.getPublisher(Eth1EventsChannel.class),

Diff for: storage/build.gradle

+2
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ dependencies {
3838
testImplementation testFixtures(project(':infrastructure:metrics'))
3939
testImplementation project(':ethereum:networks')
4040
testImplementation testFixtures(project(':ethereum:spec'))
41+
testImplementation testFixtures(project(':infrastructure:logging'))
4142
testImplementation testFixtures(project(':infrastructure:async'))
4243
testImplementation testFixtures(project(':infrastructure:time'))
4344
testImplementation testFixtures(project(':storage'))
@@ -67,6 +68,7 @@ dependencies {
6768
testFixturesImplementation 'org.hyperledger.besu.internal:metrics-core'
6869
testFixturesImplementation 'org.hyperledger.besu:plugin-api'
6970

71+
7072
jmhImplementation testFixtures(project(':storage'))
7173
jmhImplementation testFixtures(project(':ethereum:spec'))
7274
}

Diff for: storage/src/main/java/tech/pegasys/teku/storage/server/ChainStorage.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -70,11 +70,15 @@ private ChainStorage(
7070
}
7171

7272
public static ChainStorage create(
73-
final Database database, final Spec spec, final StateStorageMode dataStorageMode) {
73+
final Database database,
74+
final Spec spec,
75+
final StateStorageMode dataStorageMode,
76+
int stateRebuildTimeoutSeconds) {
7477
final int finalizedStateCacheSize = spec.getSlotsPerEpoch(SpecConfig.GENESIS_EPOCH) * 3;
7578
return new ChainStorage(
7679
database,
77-
new FinalizedStateCache(spec, database, finalizedStateCacheSize, true),
80+
new FinalizedStateCache(
81+
spec, database, finalizedStateCacheSize, true, stateRebuildTimeoutSeconds),
7882
dataStorageMode);
7983
}
8084

Diff for: storage/src/main/java/tech/pegasys/teku/storage/server/StorageConfiguration.java

+22-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
public class StorageConfiguration {
3636

3737
public static final boolean DEFAULT_STORE_NON_CANONICAL_BLOCKS_ENABLED = false;
38-
38+
public static final int DEFAULT_STATE_REBUILD_TIMEOUT_SECONDS = 120;
3939
public static final long DEFAULT_STORAGE_FREQUENCY = 2048L;
4040
public static final int DEFAULT_MAX_KNOWN_NODE_CACHE_SIZE = 100_000;
4141
public static final Duration DEFAULT_BLOCK_PRUNING_INTERVAL = Duration.ofMinutes(15);
@@ -59,6 +59,8 @@ public class StorageConfiguration {
5959
private final Duration blobsPruningInterval;
6060
private final int blobsPruningLimit;
6161

62+
private final int stateRebuildTimeoutSeconds;
63+
6264
private StorageConfiguration(
6365
final Eth1Address eth1DepositContract,
6466
final StateStorageMode dataStorageMode,
@@ -70,6 +72,7 @@ private StorageConfiguration(
7072
final int blockPruningLimit,
7173
final Duration blobsPruningInterval,
7274
final int blobsPruningLimit,
75+
int stateRebuildTimeoutSeconds,
7376
final Spec spec) {
7477
this.eth1DepositContract = eth1DepositContract;
7578
this.dataStorageMode = dataStorageMode;
@@ -81,6 +84,7 @@ private StorageConfiguration(
8184
this.blockPruningLimit = blockPruningLimit;
8285
this.blobsPruningInterval = blobsPruningInterval;
8386
this.blobsPruningLimit = blobsPruningLimit;
87+
this.stateRebuildTimeoutSeconds = stateRebuildTimeoutSeconds;
8488
this.spec = spec;
8589
}
8690

@@ -96,6 +100,10 @@ public StateStorageMode getDataStorageMode() {
96100
return dataStorageMode;
97101
}
98102

103+
public int getStateRebuildTimeoutSeconds() {
104+
return stateRebuildTimeoutSeconds;
105+
}
106+
99107
public long getDataStorageFrequency() {
100108
return dataStorageFrequency;
101109
}
@@ -146,6 +154,7 @@ public static final class Builder {
146154
private int blockPruningLimit = DEFAULT_BLOCK_PRUNING_LIMIT;
147155
private Duration blobsPruningInterval = DEFAULT_BLOBS_PRUNING_INTERVAL;
148156
private int blobsPruningLimit = DEFAULT_BLOBS_PRUNING_LIMIT;
157+
private int stateRebuildTimeoutSeconds = DEFAULT_STATE_REBUILD_TIMEOUT_SECONDS;
149158

150159
private Builder() {}
151160

@@ -251,6 +260,7 @@ public StorageConfiguration build() {
251260
blockPruningLimit,
252261
blobsPruningInterval,
253262
blobsPruningLimit,
263+
stateRebuildTimeoutSeconds,
254264
spec);
255265
}
256266

@@ -285,6 +295,17 @@ private Optional<StateStorageMode> getStorageModeFromPersistedDatabase(
285295
throw new UncheckedIOException("Failed to read storage mode from file", ex);
286296
}
287297
}
298+
299+
public Builder stateRebuildTimeoutSeconds(int stateRebuildTimeoutSeconds) {
300+
if (stateRebuildTimeoutSeconds < 10 || stateRebuildTimeoutSeconds > 300) {
301+
LOG.warn(
302+
"State rebuild timeout is set outside of sensible defaults of 10 -> 300, {} was defined. Cannot be below 1, will allow the value to exceed 300.",
303+
stateRebuildTimeoutSeconds);
304+
}
305+
this.stateRebuildTimeoutSeconds = Math.max(stateRebuildTimeoutSeconds, 1);
306+
LOG.debug("stateRebuildTimeoutSeconds = {}", stateRebuildTimeoutSeconds);
307+
return this;
308+
}
288309
}
289310

290311
static StateStorageMode determineStorageDefault(

Diff for: storage/src/main/java/tech/pegasys/teku/storage/server/state/FinalizedStateCache.java

+29-45
Original file line numberDiff line numberDiff line change
@@ -13,27 +13,24 @@
1313

1414
package tech.pegasys.teku.storage.server.state;
1515

16-
import static tech.pegasys.teku.infrastructure.unsigned.UInt64.ONE;
17-
1816
import com.google.common.base.Throwables;
1917
import com.google.common.cache.CacheBuilder;
20-
import com.google.common.cache.CacheLoader;
2118
import com.google.common.cache.LoadingCache;
2219
import com.google.common.cache.RemovalCause;
2320
import com.google.common.cache.RemovalNotification;
2421
import com.google.common.util.concurrent.UncheckedExecutionException;
2522
import java.util.NavigableSet;
2623
import java.util.Optional;
2724
import java.util.concurrent.ConcurrentSkipListSet;
28-
import java.util.stream.Stream;
29-
import tech.pegasys.teku.dataproviders.generators.StreamingStateRegenerator;
3025
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
3126
import tech.pegasys.teku.spec.Spec;
32-
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
3327
import tech.pegasys.teku.spec.datastructures.state.beaconstate.BeaconState;
3428
import tech.pegasys.teku.storage.server.Database;
3529

3630
public class FinalizedStateCache {
31+
32+
private static final long MAX_REGENERATE_LOTS = 10_000L;
33+
3734
/**
3835
* Note this is a best effort basis to track what states are cached. Slots are added here slightly
3936
* before the stateCache is actually updated and removed slightly after they are evicted from the
@@ -42,24 +39,40 @@ public class FinalizedStateCache {
4239
private final NavigableSet<UInt64> availableSlots = new ConcurrentSkipListSet<>();
4340

4441
private final LoadingCache<UInt64, BeaconState> stateCache;
45-
private final Spec spec;
46-
private final Database database;
4742

4843
public FinalizedStateCache(
4944
final Spec spec,
5045
final Database database,
5146
final int maximumCacheSize,
52-
final boolean useSoftReferences) {
53-
this.spec = spec;
54-
this.database = database;
47+
final boolean useSoftReferences,
48+
final int stateRebuildTimeoutSeconds) {
49+
this(
50+
spec,
51+
database,
52+
maximumCacheSize,
53+
useSoftReferences,
54+
stateRebuildTimeoutSeconds,
55+
MAX_REGENERATE_LOTS);
56+
}
57+
58+
FinalizedStateCache(
59+
final Spec spec,
60+
final Database database,
61+
final int maximumCacheSize,
62+
final boolean useSoftReferences,
63+
int stateRebuildTimeoutSeconds,
64+
final long maxRegenerateSlots) {
5565
final CacheBuilder<UInt64, BeaconState> cacheBuilder =
5666
CacheBuilder.newBuilder()
5767
.maximumSize(maximumCacheSize)
5868
.removalListener(this::onRemovedFromCache);
5969
if (useSoftReferences) {
6070
cacheBuilder.softValues();
6171
}
62-
this.stateCache = cacheBuilder.build(new StateCacheLoader());
72+
this.stateCache =
73+
cacheBuilder.build(
74+
new StateCacheLoader(
75+
spec, database, stateRebuildTimeoutSeconds, maxRegenerateSlots, this));
6376
}
6477

6578
private void onRemovedFromCache(
@@ -80,46 +93,17 @@ public Optional<BeaconState> getFinalizedState(final UInt64 slot) {
8093
}
8194
}
8295

83-
private Optional<BeaconState> getLatestStateFromCache(final UInt64 slot) {
96+
Optional<BeaconState> getLatestStateFromCache(final UInt64 slot) {
8497
return Optional.ofNullable(availableSlots.floor(slot)).map(stateCache::getIfPresent);
8598
}
8699

87-
private class StateCacheLoader extends CacheLoader<UInt64, BeaconState> {
88-
89-
@Override
90-
public BeaconState load(final UInt64 key) {
91-
return regenerateState(key).orElseThrow(StateUnavailableException::new);
92-
}
93-
94-
private Optional<BeaconState> regenerateState(final UInt64 slot) {
95-
return database
96-
.getLatestAvailableFinalizedState(slot)
97-
.map(state -> regenerateState(slot, state));
98-
}
99-
100-
private BeaconState regenerateState(final UInt64 slot, final BeaconState stateFromDisk) {
101-
final Optional<BeaconState> latestStateFromCache = getLatestStateFromCache(slot);
102-
final BeaconState preState =
103-
latestStateFromCache
104-
.filter(
105-
stateFromCache ->
106-
stateFromCache.getSlot().compareTo(stateFromDisk.getSlot()) >= 0)
107-
.orElse(stateFromDisk);
108-
if (preState.getSlot().equals(slot)) {
109-
return preState;
110-
}
111-
try (final Stream<SignedBeaconBlock> blocks =
112-
database.streamFinalizedBlocks(preState.getSlot().plus(ONE), slot)) {
113-
final BeaconState state = StreamingStateRegenerator.regenerate(spec, preState, blocks);
114-
availableSlots.add(state.getSlot());
115-
return state;
116-
}
117-
}
100+
NavigableSet<UInt64> getAvailableSlots() {
101+
return availableSlots;
118102
}
119103

120104
/**
121105
* Cache doesn't allow returning null but we may not be able to regenerate a state so throw this
122106
* exception and catch it in {@link #getFinalizedState(UInt64)}
123107
*/
124-
private static class StateUnavailableException extends RuntimeException {}
108+
static class StateUnavailableException extends RuntimeException {}
125109
}
Diff for: storage/src/main/java/tech/pegasys/teku/storage/server/state/StateCacheLoader.java

+110
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/*
2+
* Copyright Consensys Software Inc., 2024
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
5+
* the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
10+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
11+
* specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package tech.pegasys.teku.storage.server.state;
15+
16+
import static tech.pegasys.teku.infrastructure.unsigned.UInt64.ONE;
17+
18+
import com.google.common.cache.CacheLoader;
19+
import java.util.Optional;
20+
import java.util.concurrent.ExecutionException;
21+
import java.util.concurrent.TimeUnit;
22+
import java.util.concurrent.TimeoutException;
23+
import java.util.stream.Stream;
24+
import org.apache.logging.log4j.LogManager;
25+
import org.apache.logging.log4j.Logger;
26+
import tech.pegasys.teku.dataproviders.generators.StreamingStateRegenerator;
27+
import tech.pegasys.teku.infrastructure.async.SafeFuture;
28+
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
29+
import tech.pegasys.teku.spec.Spec;
30+
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
31+
import tech.pegasys.teku.spec.datastructures.state.beaconstate.BeaconState;
32+
import tech.pegasys.teku.storage.server.Database;
33+
34+
class StateCacheLoader extends CacheLoader<UInt64, BeaconState> {
35+
private static final Logger LOG = LogManager.getLogger();
36+
private final int stateRebuildTimeoutSeconds;
37+
private final Database database;
38+
private final long maxRegenerateSlots;
39+
private final FinalizedStateCache finalizedStateCache;
40+
private final Spec spec;
41+
42+
StateCacheLoader(
43+
final Spec spec,
44+
final Database database,
45+
final int stateRebuildTimeoutSeconds,
46+
final long maxRegenerateSlots,
47+
final FinalizedStateCache finalizedStateCache) {
48+
this.database = database;
49+
this.stateRebuildTimeoutSeconds = stateRebuildTimeoutSeconds;
50+
this.maxRegenerateSlots = maxRegenerateSlots;
51+
this.finalizedStateCache = finalizedStateCache;
52+
this.spec = spec;
53+
}
54+
55+
@Override
56+
public BeaconState load(final UInt64 key) {
57+
return regenerateState(key).orElseThrow(FinalizedStateCache.StateUnavailableException::new);
58+
}
59+
60+
private Optional<BeaconState> regenerateState(final UInt64 slot) {
61+
final Optional<BeaconState> maybeState = database.getLatestAvailableFinalizedState(slot);
62+
if (maybeState.isEmpty()) {
63+
return Optional.empty();
64+
}
65+
final BeaconState state = maybeState.get();
66+
try {
67+
return Optional.of(
68+
regenerateStateWithinReasonableTime(slot, state)
69+
.get(stateRebuildTimeoutSeconds, TimeUnit.SECONDS));
70+
} catch (ExecutionException | InterruptedException e) {
71+
LOG.warn("Failed to regenerate state for slot {}", slot, e);
72+
return Optional.empty();
73+
} catch (TimeoutException e) {
74+
LOG.error(
75+
"Timed out trying to regenerate state at slot {} starting from slot {} within {} seconds",
76+
slot,
77+
state.getSlot(),
78+
stateRebuildTimeoutSeconds);
79+
return Optional.empty();
80+
}
81+
}
82+
83+
private SafeFuture<BeaconState> regenerateStateWithinReasonableTime(
84+
final UInt64 slot, final BeaconState stateFromDisk) {
85+
final Optional<BeaconState> latestStateFromCache =
86+
finalizedStateCache.getLatestStateFromCache(slot);
87+
final BeaconState preState =
88+
latestStateFromCache
89+
.filter(
90+
stateFromCache -> stateFromCache.getSlot().compareTo(stateFromDisk.getSlot()) >= 0)
91+
.orElse(stateFromDisk);
92+
if (preState.getSlot().equals(slot)) {
93+
return SafeFuture.completedFuture(preState);
94+
}
95+
final long regenerateSlotCount = slot.minusMinZero(stateFromDisk.getSlot()).longValue();
96+
LOG.trace("Slots to regenerate state from: {}", regenerateSlotCount);
97+
if (regenerateSlotCount > maxRegenerateSlots) {
98+
LOG.error(
99+
"Refusing to regenerate a state that is {} slots from what we have stored",
100+
regenerateSlotCount);
101+
return SafeFuture.failedFuture(new FinalizedStateCache.StateUnavailableException());
102+
}
103+
try (final Stream<SignedBeaconBlock> blocks =
104+
database.streamFinalizedBlocks(preState.getSlot().plus(ONE), slot)) {
105+
final BeaconState state = StreamingStateRegenerator.regenerate(spec, preState, blocks);
106+
finalizedStateCache.getAvailableSlots().add(state.getSlot());
107+
return SafeFuture.completedFuture(state);
108+
}
109+
}
110+
}

Diff for: storage/src/test/java/tech/pegasys/teku/storage/server/MultiThreadedStoreTest.java

+1
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ private StorageSystem createStorageSystem(
117117
.dataDir(tempDir.toPath())
118118
.version(DatabaseVersion.LEVELDB2)
119119
.storageMode(storageMode)
120+
.stateRebuildTimeoutSeconds(12)
120121
.stateStorageFrequency(1L)
121122
.storeConfig(storeConfig)
122123
.storeNonCanonicalBlocks(storeNonCanonicalBlocks)

Diff for: storage/src/test/java/tech/pegasys/teku/storage/server/state/FinalizedStateCacheTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ class FinalizedStateCacheTest {
4444
private final Database database = mock(Database.class);
4545
// We don't use soft references in unit tests to avoid intermittency
4646
private final FinalizedStateCache cache =
47-
new FinalizedStateCache(spec, database, MAXIMUM_CACHE_SIZE, false);
47+
new FinalizedStateCache(spec, database, MAXIMUM_CACHE_SIZE, false, 120);
4848

4949
@BeforeEach
5050
public void setUp() {

0 commit comments

Comments
 (0)