Skip to content

Commit de95257

Browse files
committed
store: Revert entity versions during copy instead of after
Reverting entities after copying can be very slow; the step is also unnecessary since we already know during copying which entity versions need to be unclamped and we never copy versions that would have to be deleted by the revert. We move the revert logic into CopyEntityBatchQuery so that entity versions are reverted as they are copied rather than in a separate revert_block pass after copying completes. The post-copy revert_block call in start_subgraph is kept as a no-op safety net for copies that were started with older code and resumed after upgrading. It can be removed once a release with this logic has been out for long enough.
1 parent 709003c commit de95257

3 files changed

Lines changed: 63 additions & 15 deletions

File tree

store/postgres/src/copy.rs

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,14 @@ impl CopyState {
178178
dst: Arc<Layout>,
179179
target_block: BlockPtr,
180180
) -> Result<CopyState, StoreError> {
181-
let tables = TableState::load(conn, primary, src.as_ref(), dst.as_ref()).await?;
181+
let tables = TableState::load(
182+
conn,
183+
primary,
184+
src.as_ref(),
185+
dst.as_ref(),
186+
target_block.number,
187+
)
188+
.await?;
182189
let (finished, mut unfinished): (Vec<_>, Vec<_>) =
183190
tables.into_iter().partition(|table| table.finished());
184191
unfinished.sort_by_key(|table| table.dst.object.to_string());
@@ -329,6 +336,7 @@ struct TableState {
329336
dst_site: Arc<Site>,
330337
batcher: VidBatcher,
331338
duration_ms: i64,
339+
target_block: BlockNumber,
332340
}
333341

334342
impl TableState {
@@ -351,6 +359,7 @@ impl TableState {
351359
dst_site,
352360
batcher,
353361
duration_ms: 0,
362+
target_block: target_block.number,
354363
})
355364
}
356365

@@ -363,6 +372,7 @@ impl TableState {
363372
primary: Primary,
364373
src_layout: &Layout,
365374
dst_layout: &Layout,
375+
target_block: BlockNumber,
366376
) -> Result<Vec<TableState>, StoreError> {
367377
use copy_table_state as cts;
368378

@@ -429,6 +439,7 @@ impl TableState {
429439
dst_site: dst_layout.site.clone(),
430440
batcher,
431441
duration_ms,
442+
target_block,
432443
};
433444
states.push(state);
434445
}
@@ -503,15 +514,20 @@ impl TableState {
503514
}
504515

505516
async fn copy_batch(&mut self, conn: &mut AsyncPgConnection) -> Result<Status, StoreError> {
506-
let (duration, count) = self
517+
let (duration, count): (_, Option<i32>) = self
507518
.batcher
508-
.step(async |start, end| {
509-
let count =
510-
rq::CopyEntityBatchQuery::new(self.dst.as_ref(), &self.src, start, end)?
511-
.count_current()
512-
.get_result::<i64>(conn)
513-
.await
514-
.optional()?;
519+
.step(async |start: i64, end: i64| {
520+
let count = rq::CopyEntityBatchQuery::new(
521+
self.dst.as_ref(),
522+
&self.src,
523+
start,
524+
end,
525+
self.target_block,
526+
)?
527+
.count_current()
528+
.get_result::<i64>(conn)
529+
.await
530+
.optional()?;
515531
Ok(count.unwrap_or(0) as i32)
516532
})
517533
.await?;

store/postgres/src/deployment_store.rs

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1651,11 +1651,21 @@ impl DeploymentStore {
16511651
.await?;
16521652
}
16531653

1654-
// Rewind the subgraph so that entity versions that are
1655-
// clamped in the future (beyond `block`) become valid for
1656-
// all blocks after `block`. `revert_block` gets rid of
1657-
// everything including the block passed to it. We want to
1658-
// preserve `block` and therefore revert `block+1`
1654+
// CopyEntityBatchQuery now reverts entity versions
1655+
// during copying, making this rewind redundant for new
1656+
// copies. We keep it for backward compatibility: a copy
1657+
// that was started before this change and is resumed
1658+
// after upgrading will have already-copied rows that
1659+
// weren't reverted during copy. For data that was
1660+
// already reverted during copy, this is a no-op. This
1661+
// code can be removed once a release with this change
1662+
// has been out for a while and we are sure that there
1663+
// are no more copies in progress that started before
1664+
// the change
1665+
//
1666+
// `revert_block` gets rid of everything including the
1667+
// block passed to it. We want to preserve `block` and
1668+
// therefore revert `block+1`
16591669
let start = Instant::now();
16601670
let block_to_revert: BlockNumber = block
16611671
.number

store/postgres/src/relational_queries.rs

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5091,6 +5091,7 @@ pub struct CopyEntityBatchQuery<'a> {
50915091
columns: Vec<&'a Column>,
50925092
first_vid: i64,
50935093
last_vid: i64,
5094+
target_block: BlockNumber,
50945095
}
50955096

50965097
impl<'a> CopyEntityBatchQuery<'a> {
@@ -5099,6 +5100,7 @@ impl<'a> CopyEntityBatchQuery<'a> {
50995100
src: &'a Table,
51005101
first_vid: i64,
51015102
last_vid: i64,
5103+
target_block: BlockNumber,
51025104
) -> Result<Self, StoreError> {
51035105
let mut columns = Vec::new();
51045106
for dcol in &dst.columns {
@@ -5125,6 +5127,7 @@ impl<'a> CopyEntityBatchQuery<'a> {
51255127
columns,
51265128
first_vid,
51275129
last_vid,
5130+
target_block,
51285131
})
51295132
}
51305133

@@ -5209,7 +5212,16 @@ impl<'a> QueryFragment<Pg> for CopyEntityBatchQuery<'a> {
52095212
);
52105213
out.push_sql(&checked_conversion);
52115214
}
5212-
(false, false) => out.push_sql(BLOCK_RANGE_COLUMN),
5215+
(false, false) => {
5216+
let range_conv = format!(
5217+
r#"
5218+
case when upper({BLOCK_RANGE_COLUMN}) > {}
5219+
then int4range(lower({BLOCK_RANGE_COLUMN}), null)
5220+
else {BLOCK_RANGE_COLUMN} end"#,
5221+
self.target_block
5222+
);
5223+
out.push_sql(&range_conv)
5224+
}
52135225
}
52145226

52155227
match (self.src.has_causality_region, self.dst.has_causality_region) {
@@ -5239,6 +5251,16 @@ impl<'a> QueryFragment<Pg> for CopyEntityBatchQuery<'a> {
52395251
out.push_bind_param::<BigInt, _>(&self.first_vid)?;
52405252
out.push_sql(" and vid <= ");
52415253
out.push_bind_param::<BigInt, _>(&self.last_vid)?;
5254+
out.push_sql(" and ");
5255+
if self.src.immutable {
5256+
out.push_sql(BLOCK_COLUMN);
5257+
} else {
5258+
out.push_sql("lower(");
5259+
out.push_sql(BLOCK_RANGE_COLUMN);
5260+
out.push_sql(")");
5261+
}
5262+
out.push_sql(" <= ");
5263+
out.push_bind_param::<Integer, _>(&self.target_block)?;
52425264
out.push_sql("\n returning ");
52435265
if self.dst.immutable {
52445266
out.push_sql("true");

0 commit comments

Comments
 (0)