Skip to content

Commit 003cf8b

Browse files
authored
fix: Backward cursor handling single message (#7826)
1 parent 5acd5c4 commit 003cf8b

1 file changed

Lines changed: 248 additions & 23 deletions

File tree

  • rust/main/hyperlane-base/src/contract_sync/cursors/sequence_aware

rust/main/hyperlane-base/src/contract_sync/cursors/sequence_aware/backward.rs

Lines changed: 248 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -170,16 +170,15 @@ impl<T: Debug + Clone + Sync + Send + Indexable + 'static> BackwardSequenceAware
170170
let lowest_block_height_or_sequence = self.get_lowest_block_height_or_sequence().await?;
171171

172172
// Query the block height range ending at the current_indexing_snapshot's at_block.
173-
// We assume that chunk size is at least 1 so that the block 0 is indexed
174-
// together with block 1. That's why we can compare the current block height
175-
// with the lowest block height with <=.
176-
if current_indexing_snapshot.at_block <= lowest_block_height_or_sequence {
177-
// If the current indexing snapshot's block height is less than or equal to the
173+
// When current block equals lowest block, we still need to index it
174+
// (e.g., when there's only one block to index at the boundary).
175+
if current_indexing_snapshot.at_block < lowest_block_height_or_sequence {
176+
// If the current indexing snapshot's block height is less than the
178177
// lowest block height, we don't want to index anything below the lowest block height.
179178
info!(
180179
current_indexing_snapshot=?current_indexing_snapshot,
181180
lowest_block_height=lowest_block_height_or_sequence,
182-
"Current indexing snapshot's block height is less than or equal to the lowest block height, \
181+
"Current indexing snapshot's block height is less than the lowest block height, \
183182
not indexing anything below the lowest block height"
184183
);
185184
return None;
@@ -203,16 +202,15 @@ impl<T: Debug + Clone + Sync + Send + Indexable + 'static> BackwardSequenceAware
203202
let lowest_block_height_or_sequence = self.get_lowest_block_height_or_sequence().await?;
204203

205204
// Query the sequence range ending at the current_indexing_snapshot's sequence.
206-
// We assume that chunk size is at least 1 so that the sequence 0 is indexed
207-
// together with sequence 1. That's why we can compare the current sequence
208-
// with the lowest sequence with <=.
209-
if current_indexing_snapshot.sequence <= lowest_block_height_or_sequence {
210-
// If the current indexing snapshot's sequence is less than or equal to the lowest sequence,
205+
// When current sequence equals lowest sequence, we still need to index it
206+
// (e.g., when there's only one message at sequence 0).
207+
if current_indexing_snapshot.sequence < lowest_block_height_or_sequence {
208+
// If the current indexing snapshot's sequence is less than the lowest sequence,
211209
// we don't want to index anything below the lowest sequence.
212210
info!(
213211
current_indexing_snapshot=?current_indexing_snapshot,
214212
lowest_sequence=lowest_block_height_or_sequence,
215-
"Current indexing snapshot's sequence is less than or equal to the lowest sequence, \
213+
"Current indexing snapshot's sequence is less than the lowest sequence, \
216214
not indexing anything below the lowest sequence"
217215
);
218216
return None;
@@ -360,12 +358,15 @@ impl<T: Debug + Clone + Sync + Send + Indexable + 'static> BackwardSequenceAware
360358
// If the number of logs, which start at the current sequence and go backwards,
361359
// exceeds the current indexing snapshot sequence, we've synced everything including
362360
// sequence 0. Otherwise, we're not fully synced yet.
361+
// Note: at_block is set to range.start() - 1 to avoid re-indexing the block we just processed.
362+
// This works with the `<` check in get_next_block_range to allow indexing when
363+
// current_block == lowest_block initially, while preventing re-indexing after updates.
363364
self.current_indexing_snapshot = current_indexing_snapshot
364365
.sequence
365366
.checked_sub(logs_len)
366367
.map(|new_current_sequence| TargetSnapshot {
367368
sequence: new_current_sequence,
368-
at_block: *range.start(),
369+
at_block: range.start().saturating_sub(1),
369370
});
370371

371372
// This means we indexed at least one log that builds on the last snapshot.
@@ -689,7 +690,7 @@ mod test {
689690
cursor.current_indexing_snapshot,
690691
Some(TargetSnapshot {
691692
sequence: 96,
692-
at_block: 900,
693+
at_block: 899,
693694
})
694695
);
695696
assert_eq!(
@@ -721,7 +722,7 @@ mod test {
721722
cursor.current_indexing_snapshot,
722723
Some(TargetSnapshot {
723724
sequence: 99,
724-
at_block: 900,
725+
at_block: 899,
725726
})
726727
);
727728
assert_eq!(
@@ -735,7 +736,7 @@ mod test {
735736
// Expect the range to be:
736737
// (current - chunk_size, current)
737738
let range = cursor.get_next_range().await.unwrap().unwrap();
738-
let expected_range = 800..=900;
739+
let expected_range = 799..=899;
739740
assert_eq!(range, expected_range);
740741

741742
// Update the cursor with some found logs now.
@@ -757,7 +758,7 @@ mod test {
757758
cursor.current_indexing_snapshot,
758759
Some(TargetSnapshot {
759760
sequence: 95,
760-
at_block: 800,
761+
at_block: 798,
761762
})
762763
);
763764
assert_eq!(
@@ -796,7 +797,7 @@ mod test {
796797
cur.current_indexing_snapshot,
797798
Some(TargetSnapshot {
798799
sequence: 99,
799-
at_block: 900,
800+
at_block: 899,
800801
})
801802
);
802803
assert_eq!(
@@ -810,7 +811,7 @@ mod test {
810811
// Expect the range to be:
811812
// (start, tip)
812813
let range = cur.get_next_range().await.unwrap().unwrap();
813-
let expected_range = 800..=900;
814+
let expected_range = 799..=899;
814815
assert_eq!(range, expected_range);
815816

816817
// Update the cursor, expecting a rewind now
@@ -891,7 +892,7 @@ mod test {
891892
cursor.current_indexing_snapshot,
892893
Some(TargetSnapshot {
893894
sequence: 98,
894-
at_block: 900,
895+
at_block: 899,
895896
})
896897
);
897898
assert_eq!(
@@ -1024,7 +1025,7 @@ mod test {
10241025
cursor.current_indexing_snapshot,
10251026
Some(TargetSnapshot {
10261027
sequence: 41,
1027-
at_block: 942,
1028+
at_block: 941,
10281029
})
10291030
);
10301031
assert_eq!(
@@ -1081,7 +1082,7 @@ mod test {
10811082
cursor.current_indexing_snapshot,
10821083
Some(TargetSnapshot {
10831084
sequence: 49,
1084-
at_block: 950,
1085+
at_block: 949,
10851086
})
10861087
);
10871088
assert_eq!(
@@ -1095,7 +1096,119 @@ mod test {
10951096
// Expect the range to stop at the lowest block height even if the range
10961097
// is tighter than the chunk size.
10971098
let range = cursor.get_next_range().await.unwrap();
1098-
assert_eq!(range, Some(942..=950));
1099+
assert_eq!(range, Some(942..=949));
1100+
}
1101+
1102+
/// When current block equals lowest block, we should still index it.
1103+
/// This covers the edge case where start_block == lowest_block.
1104+
#[tracing_test::traced_test]
1105+
#[tokio::test]
1106+
async fn test_indexes_single_block_at_lowest() {
1107+
let lowest_block_height = 1000i64;
1108+
1109+
let latest_sequence_querier = Arc::new(MockLatestSequenceQuerier {
1110+
latest_sequence_count: Some(0),
1111+
tip: 1000,
1112+
});
1113+
1114+
let db = Arc::new(
1115+
MockHyperlaneSequenceAwareIndexerStore::<MockSequencedData> { logs: vec![] },
1116+
);
1117+
1118+
let metrics_data = MetricsData {
1119+
domain: HyperlaneDomain::new_test_domain("test"),
1120+
metrics: Arc::new(mock_cursor_metrics()),
1121+
};
1122+
let params = BackwardSequenceAwareSyncCursorParams {
1123+
chunk_size: CHUNK_SIZE,
1124+
latest_sequence_querier,
1125+
lowest_block_height_or_sequence: lowest_block_height,
1126+
store: db,
1127+
current_sequence_count: 1,
1128+
start_block: 1000,
1129+
index_mode: INDEX_MODE,
1130+
metrics_data,
1131+
};
1132+
let mut cursor = BackwardSequenceAwareSyncCursor::new(params);
1133+
cursor.skip_indexed().await.unwrap();
1134+
1135+
// current block (1000) == lowest block (1000), should still return a range
1136+
let range = cursor.get_next_range().await.unwrap();
1137+
assert_eq!(range, Some(1000..=1000));
1138+
}
1139+
1140+
/// When current block is below lowest block (misconfiguration), return None.
1141+
#[tracing_test::traced_test]
1142+
#[tokio::test]
1143+
async fn test_returns_none_when_current_below_lowest() {
1144+
let lowest_block_height = 2000i64; // Higher than start_block
1145+
1146+
let latest_sequence_querier = Arc::new(MockLatestSequenceQuerier {
1147+
latest_sequence_count: Some(0),
1148+
tip: 1000,
1149+
});
1150+
1151+
let db = Arc::new(
1152+
MockHyperlaneSequenceAwareIndexerStore::<MockSequencedData> { logs: vec![] },
1153+
);
1154+
1155+
let metrics_data = MetricsData {
1156+
domain: HyperlaneDomain::new_test_domain("test"),
1157+
metrics: Arc::new(mock_cursor_metrics()),
1158+
};
1159+
let params = BackwardSequenceAwareSyncCursorParams {
1160+
chunk_size: CHUNK_SIZE,
1161+
latest_sequence_querier,
1162+
lowest_block_height_or_sequence: lowest_block_height,
1163+
store: db,
1164+
current_sequence_count: 1,
1165+
start_block: 1000, // Below lowest_block_height
1166+
index_mode: INDEX_MODE,
1167+
metrics_data,
1168+
};
1169+
let mut cursor = BackwardSequenceAwareSyncCursor::new(params);
1170+
cursor.skip_indexed().await.unwrap();
1171+
1172+
// current block (1000) < lowest block (2000), should return None
1173+
let range = cursor.get_next_range().await.unwrap();
1174+
assert_eq!(range, None);
1175+
}
1176+
1177+
/// When current block is one above lowest, should return 2-element range.
1178+
#[tracing_test::traced_test]
1179+
#[tokio::test]
1180+
async fn test_indexes_two_blocks_at_boundary() {
1181+
let lowest_block_height = 1000i64;
1182+
1183+
let latest_sequence_querier = Arc::new(MockLatestSequenceQuerier {
1184+
latest_sequence_count: Some(1),
1185+
tip: 1001,
1186+
});
1187+
1188+
let db = Arc::new(
1189+
MockHyperlaneSequenceAwareIndexerStore::<MockSequencedData> { logs: vec![] },
1190+
);
1191+
1192+
let metrics_data = MetricsData {
1193+
domain: HyperlaneDomain::new_test_domain("test"),
1194+
metrics: Arc::new(mock_cursor_metrics()),
1195+
};
1196+
let params = BackwardSequenceAwareSyncCursorParams {
1197+
chunk_size: CHUNK_SIZE,
1198+
latest_sequence_querier,
1199+
lowest_block_height_or_sequence: lowest_block_height,
1200+
store: db,
1201+
current_sequence_count: 2,
1202+
start_block: 1001,
1203+
index_mode: INDEX_MODE,
1204+
metrics_data,
1205+
};
1206+
let mut cursor = BackwardSequenceAwareSyncCursor::new(params);
1207+
cursor.skip_indexed().await.unwrap();
1208+
1209+
// current block (1001) == lowest block + 1, should return 2-element range
1210+
let range = cursor.get_next_range().await.unwrap();
1211+
assert_eq!(range, Some(1000..=1001));
10991212
}
11001213
}
11011214

@@ -1459,6 +1572,118 @@ mod test {
14591572
assert_eq!(range, Some(42..=48));
14601573
}
14611574

1575+
/// When current sequence equals lowest sequence, we should still index it.
1576+
/// This covers the edge case of a single message at sequence 0.
1577+
#[tracing_test::traced_test]
1578+
#[tokio::test]
1579+
async fn test_indexes_single_sequence_at_lowest() {
1580+
let lowest_sequence = 0i64;
1581+
1582+
let latest_sequence_querier = Arc::new(MockLatestSequenceQuerier {
1583+
latest_sequence_count: Some(0),
1584+
tip: 1000,
1585+
});
1586+
1587+
let db = Arc::new(
1588+
MockHyperlaneSequenceAwareIndexerStore::<MockSequencedData> { logs: vec![] },
1589+
);
1590+
1591+
let metrics_data = MetricsData {
1592+
domain: HyperlaneDomain::new_test_domain("test"),
1593+
metrics: Arc::new(mock_cursor_metrics()),
1594+
};
1595+
let params = BackwardSequenceAwareSyncCursorParams {
1596+
chunk_size: CHUNK_SIZE,
1597+
latest_sequence_querier,
1598+
lowest_block_height_or_sequence: lowest_sequence,
1599+
store: db,
1600+
current_sequence_count: 1,
1601+
start_block: 1000,
1602+
index_mode: INDEX_MODE,
1603+
metrics_data,
1604+
};
1605+
let mut cursor = BackwardSequenceAwareSyncCursor::new(params);
1606+
cursor.skip_indexed().await.unwrap();
1607+
1608+
// current sequence (0) == lowest sequence (0), should still return a range
1609+
let range = cursor.get_next_range().await.unwrap();
1610+
assert_eq!(range, Some(0..=0));
1611+
}
1612+
1613+
/// When current sequence is below lowest sequence (misconfiguration), return None.
1614+
#[tracing_test::traced_test]
1615+
#[tokio::test]
1616+
async fn test_returns_none_when_current_below_lowest() {
1617+
let lowest_sequence = 100i64; // Higher than current
1618+
1619+
let latest_sequence_querier = Arc::new(MockLatestSequenceQuerier {
1620+
latest_sequence_count: Some(0),
1621+
tip: 1000,
1622+
});
1623+
1624+
let db = Arc::new(
1625+
MockHyperlaneSequenceAwareIndexerStore::<MockSequencedData> { logs: vec![] },
1626+
);
1627+
1628+
let metrics_data = MetricsData {
1629+
domain: HyperlaneDomain::new_test_domain("test"),
1630+
metrics: Arc::new(mock_cursor_metrics()),
1631+
};
1632+
let params = BackwardSequenceAwareSyncCursorParams {
1633+
chunk_size: CHUNK_SIZE,
1634+
latest_sequence_querier,
1635+
lowest_block_height_or_sequence: lowest_sequence,
1636+
store: db,
1637+
current_sequence_count: 1, // sequence 0, below lowest
1638+
start_block: 1000,
1639+
index_mode: INDEX_MODE,
1640+
metrics_data,
1641+
};
1642+
let mut cursor = BackwardSequenceAwareSyncCursor::new(params);
1643+
cursor.skip_indexed().await.unwrap();
1644+
1645+
// current sequence (0) < lowest sequence (100), should return None
1646+
let range = cursor.get_next_range().await.unwrap();
1647+
assert_eq!(range, None);
1648+
}
1649+
1650+
/// When current sequence is one above lowest, should return 2-element range.
1651+
#[tracing_test::traced_test]
1652+
#[tokio::test]
1653+
async fn test_indexes_two_sequences_at_boundary() {
1654+
let lowest_sequence = 0i64;
1655+
1656+
let latest_sequence_querier = Arc::new(MockLatestSequenceQuerier {
1657+
latest_sequence_count: Some(1),
1658+
tip: 1000,
1659+
});
1660+
1661+
let db = Arc::new(
1662+
MockHyperlaneSequenceAwareIndexerStore::<MockSequencedData> { logs: vec![] },
1663+
);
1664+
1665+
let metrics_data = MetricsData {
1666+
domain: HyperlaneDomain::new_test_domain("test"),
1667+
metrics: Arc::new(mock_cursor_metrics()),
1668+
};
1669+
let params = BackwardSequenceAwareSyncCursorParams {
1670+
chunk_size: CHUNK_SIZE,
1671+
latest_sequence_querier,
1672+
lowest_block_height_or_sequence: lowest_sequence,
1673+
store: db,
1674+
current_sequence_count: 2, // sequences 0 and 1
1675+
start_block: 1000,
1676+
index_mode: INDEX_MODE,
1677+
metrics_data,
1678+
};
1679+
let mut cursor = BackwardSequenceAwareSyncCursor::new(params);
1680+
cursor.skip_indexed().await.unwrap();
1681+
1682+
// current sequence (1) == lowest sequence + 1, should return 2-element range
1683+
let range = cursor.get_next_range().await.unwrap();
1684+
assert_eq!(range, Some(0..=1));
1685+
}
1686+
14621687
#[tracing_test::traced_test]
14631688
#[tokio::test]
14641689
async fn test_get_next_range_negative_block_height() {

0 commit comments

Comments
 (0)