Skip to content

Commit 220e498

Browse files
authored
HIP-1193 Support two-stage cutover (#13331)
- Support the first-stage cutover to stream WRBs from BN with recordstream as fallback - Run blockstream streaming / recordstream downloading in a synchronized block due to race condition - Support fast fallback to recordstream when blocks were not advanced in a blockstream attempt - Support fallback to recordstream when under high latency blockstream streaming Signed-off-by: Xin Li <xin@hashgraph.com>
1 parent 7f93620 commit 220e498

26 files changed

+907
-490
lines changed

docs/configuration.md

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,12 @@ value, it is recommended to only populate overridden properties in the custom `a
4646
| ------------------------------------------------------------------------------- | ---------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
4747
| `hiero.mirror.importer.block.autoDiscoveryEnabled` | true | When enabled, auto-discovered block node properties (read from cache or database) are merged with block node config file properties. |
4848
| `hiero.mirror.importer.block.bucketName` | | The cloud storage bucket name to download blockstream files. This value takes priority over network hardcoded bucket names. |
49-
| `hiero.mirror.importer.block.cutover` | | Whether to auto switch from record stream to block stream. This overrides the default set for the network. |
50-
| `hiero.mirror.importer.block.cutoverThreshold` | 8s | The amount of time to wait to switch between block stream and record stream during cutover. |
49+
| `hiero.mirror.importer.block.cutover.enabled` | false | Whether to auto switch from record stream to block stream. This overrides the default set for the network. Note it defaults to false to dsiable cutover until a later release when the feature is ready |
50+
| `hiero.mirror.importer.block.cutover.firstStage.enabled` | false | Whether to enable the first-stage cutover |
51+
| `hiero.mirror.importer.block.cutover.firstStage.hapiVersion` | 0.75.0 | The minimum HAPI version to enable the first-stage cutover |
52+
| `hiero.mirror.importer.block.cutover.firstStage.latencyCheckThreshold` | 10s | The elapsed time threshold in block stream streaming to check latency |
53+
| `hiero.mirror.importer.block.cutover.firstStage.maxLatency` | 4s | The maximum latency, if crossed, to fall back to record stream |
54+
| `hiero.mirror.importer.block.cutover.threshold` | 16s | The amount of time to wait to switch between block stream and record stream during cutover. |
5155
| `hiero.mirror.importer.block.enabled` | false | Whether to enable block stream source |
5256
| `hiero.mirror.importer.block.frequency` | 100ms | The fixed period between invocations. Can accept duration units like `10s`, `2m`, etc. If not specified, millisecond is implied as the unit. |
5357
| `hiero.mirror.importer.block.ledger.historyProofVerificationKey` | | The WRAPS verification key bytes as a base64 string. Note the ledger configuration is a fallback with the lowest priority. |

importer/src/main/java/org/hiero/mirror/importer/downloader/Downloader.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -358,7 +358,7 @@ private boolean verifySignatures(Collection<StreamFileSignature> signatures, Str
358358
return false;
359359
}
360360

361-
onVerified(streamFileData, streamFile, node);
361+
onVerified(streamFileData, streamFile);
362362
return true;
363363
} catch (FileOperationException | HashMismatchException | TransientProviderException e) {
364364
final var previous =
@@ -382,8 +382,7 @@ private boolean verifySignatures(Collection<StreamFileSignature> signatures, Str
382382
return false;
383383
}
384384

385-
@SuppressWarnings("java:S1172") // Unused Parameter (node) required by subclass implementations
386-
protected void onVerified(StreamFileData streamFileData, T streamFile, ConsensusNode node) {
385+
protected void onVerified(StreamFileData streamFileData, T streamFile) {
387386
setStreamFileIndex(streamFile);
388387
streamFileNotifier.verified(streamFile);
389388

importer/src/main/java/org/hiero/mirror/importer/downloader/balance/AccountBalancesDownloader.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
import org.hiero.mirror.common.domain.balance.AccountBalance;
99
import org.hiero.mirror.common.domain.balance.AccountBalanceFile;
1010
import org.hiero.mirror.importer.ImporterProperties;
11-
import org.hiero.mirror.importer.addressbook.ConsensusNode;
1211
import org.hiero.mirror.importer.addressbook.ConsensusNodeService;
1312
import org.hiero.mirror.importer.config.DateRangeCalculator;
1413
import org.hiero.mirror.importer.domain.StreamFileData;
@@ -61,8 +60,8 @@ public void download() {
6160
}
6261

6362
@Override
64-
protected void onVerified(StreamFileData streamFileData, AccountBalanceFile streamFile, ConsensusNode node) {
65-
super.onVerified(streamFileData, streamFile, node);
63+
protected void onVerified(StreamFileData streamFileData, AccountBalanceFile streamFile) {
64+
super.onVerified(streamFileData, streamFile);
6665
accountBalanceFileExists.set(true);
6766
}
6867

importer/src/main/java/org/hiero/mirror/importer/downloader/block/AbstractBlockSource.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import org.hiero.mirror.common.domain.transaction.BlockFile;
88
import org.hiero.mirror.common.domain.transaction.RecordFile;
99
import org.hiero.mirror.importer.downloader.CommonDownloaderProperties;
10+
import org.hiero.mirror.importer.downloader.block.cutover.CutoverService;
1011
import org.hiero.mirror.importer.reader.block.BlockStream;
1112
import org.hiero.mirror.importer.reader.block.BlockStreamReader;
1213
import org.jspecify.annotations.NullMarked;

importer/src/main/java/org/hiero/mirror/importer/downloader/block/BlockDownloaderProperties.java

Lines changed: 0 additions & 16 deletions
This file was deleted.

importer/src/main/java/org/hiero/mirror/importer/downloader/block/BlockFileSource.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.hiero.mirror.importer.domain.StreamFileData;
1616
import org.hiero.mirror.importer.domain.StreamFilename;
1717
import org.hiero.mirror.importer.downloader.CommonDownloaderProperties;
18+
import org.hiero.mirror.importer.downloader.block.cutover.CutoverService;
1819
import org.hiero.mirror.importer.downloader.provider.StreamFileProvider;
1920
import org.hiero.mirror.importer.exception.BlockStreamException;
2021
import org.hiero.mirror.importer.reader.block.BlockStream;

importer/src/main/java/org/hiero/mirror/importer/downloader/block/BlockNodeSubscriber.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import java.util.concurrent.atomic.AtomicLong;
1515
import java.util.concurrent.atomic.AtomicReference;
1616
import org.hiero.mirror.importer.downloader.CommonDownloaderProperties;
17+
import org.hiero.mirror.importer.downloader.block.cutover.CutoverService;
1718
import org.hiero.mirror.importer.exception.BlockStreamException;
1819
import org.hiero.mirror.importer.reader.block.BlockStreamReader;
1920
import org.jspecify.annotations.NullMarked;

importer/src/main/java/org/hiero/mirror/importer/downloader/block/BlockProperties.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
import java.util.Collections;
1010
import lombok.Data;
1111
import org.apache.commons.lang3.StringUtils;
12-
import org.hibernate.validator.constraints.time.DurationMin;
1312
import org.hiero.mirror.common.domain.transaction.BlockSourceType;
1413
import org.hiero.mirror.importer.ImporterProperties;
1514
import org.hiero.mirror.importer.downloader.block.tss.LedgerProperties;
@@ -29,12 +28,6 @@ public class BlockProperties {
2928

3029
private String bucketName;
3130

32-
private Boolean cutover;
33-
34-
@DurationMin(seconds = 8)
35-
@NotNull
36-
private Duration cutoverThreshold = Duration.ofSeconds(8);
37-
3831
private boolean enabled = false;
3932

4033
@NotNull

importer/src/main/java/org/hiero/mirror/importer/downloader/block/BlockStreamVerifier.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.hiero.mirror.importer.domain.StreamFilename;
2727
import org.hiero.mirror.importer.downloader.NodeSignatureVerifier;
2828
import org.hiero.mirror.importer.downloader.StreamFileNotifier;
29+
import org.hiero.mirror.importer.downloader.block.cutover.CutoverService;
2930
import org.hiero.mirror.importer.downloader.block.tss.LedgerIdPublicationTransactionParser;
3031
import org.hiero.mirror.importer.downloader.block.tss.TssVerifier;
3132
import org.hiero.mirror.importer.exception.HashMismatchException;

importer/src/main/java/org/hiero/mirror/importer/downloader/block/CompositeBlockSource.java

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import lombok.RequiredArgsConstructor;
1111
import org.hiero.mirror.common.domain.StreamType;
1212
import org.hiero.mirror.common.domain.transaction.BlockSourceType;
13+
import org.hiero.mirror.importer.downloader.block.cutover.CutoverService;
1314
import org.springframework.context.annotation.Primary;
1415
import org.springframework.scheduling.annotation.Scheduled;
1516

@@ -42,18 +43,16 @@ final class CompositeBlockSource implements BlockSource {
4243
@Override
4344
@Scheduled(fixedDelayString = "#{@blockProperties.getFrequency().toMillis()}")
4445
public void get() {
45-
if (!cutoverService.isActive(StreamType.BLOCK)) {
46-
return;
47-
}
48-
49-
var sourceHealth = getSourceHealth();
50-
try {
51-
sourceHealth.getSource().get();
52-
sourceHealth.reset();
53-
} catch (Throwable t) {
54-
log.error("Failed to get block from {} source", sourceHealth.getType(), t);
55-
sourceHealth.onError();
56-
}
46+
cutoverService.get(StreamType.BLOCK, () -> {
47+
final var sourceHealth = getSourceHealth();
48+
try {
49+
sourceHealth.getSource().get();
50+
sourceHealth.reset();
51+
} catch (Throwable t) {
52+
log.error("Failed to get block from {} source", sourceHealth.getType(), t);
53+
sourceHealth.onError();
54+
}
55+
});
5756
}
5857

5958
private SourceHealth getSourceHealth() {

0 commit comments

Comments
 (0)