Skip to content

Commit ed4e528

Browse files
committed
node: Fix duplicate key constraint when reverting chain shard changes
- Implement robust backup naming system to avoid conflicts - Add support for reusing existing backups when appropriate - Preserve previous backups with unique names - Add comprehensive logging for backup operations Fixes #6196
1 parent 78e94cd commit ed4e528

1 file changed

Lines changed: 135 additions & 16 deletions

File tree

node/src/manager/commands/chain.rs

Lines changed: 135 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use graph::{
2222
use graph_chain_ethereum::EthereumAdapter;
2323
use graph_chain_ethereum::EthereumAdapterTrait as _;
2424
use graph_chain_ethereum::chain::BlockFinality;
25+
use graph_store_postgres::AsyncPgConnection;
2526
use graph_store_postgres::BlockStore;
2627
use graph_store_postgres::ChainStore;
2728
use graph_store_postgres::PoolCoordinator;
@@ -236,6 +237,39 @@ pub async fn update_chain_genesis(
236237
Ok(())
237238
}
238239

240+
struct ChainSwapOutcome {
241+
latest_backup_name: String,
242+
previous_backup_final_name: Option<String>,
243+
reused_previous_backup: bool,
244+
allocated_chain: bool,
245+
}
246+
247+
fn backup_name_for_suffix(chain: &str, base: &str, suffix: usize) -> String {
248+
if suffix == 0 {
249+
format!("{chain}-{base}")
250+
} else {
251+
format!("{chain}-{base}-{suffix}")
252+
}
253+
}
254+
255+
async fn next_backup_name(
256+
conn: &mut AsyncPgConnection,
257+
chain: &str,
258+
base: &str,
259+
) -> Result<String, StoreError> {
260+
let mut suffix = 0usize;
261+
262+
loop {
263+
let candidate = backup_name_for_suffix(chain, base, suffix);
264+
265+
if find_chain(conn, &candidate).await?.is_none() {
266+
return Ok(candidate);
267+
}
268+
269+
suffix += 1;
270+
}
271+
}
272+
239273
pub async fn change_block_cache_shard(
240274
primary_store: ConnectionPool,
241275
store: BlockStore,
@@ -250,51 +284,117 @@ pub async fn change_block_cache_shard(
250284
.await?
251285
.ok_or_else(|| anyhow!("unknown chain: {}", chain_name))?;
252286
let old_shard = chain.shard;
287+
let canonical_backup_name = format!("{chain_name}-old");
288+
289+
let existing_backup = find_chain(&mut conn, &canonical_backup_name).await?;
290+
let had_existing_backup = existing_backup.is_some();
291+
let existing_backup_store = store.chain_store(&canonical_backup_name).await;
253292

254293
println!("Current shard: {}", old_shard);
255294

256295
let chain_store = store
257296
.chain_store(&chain_name)
258297
.await
259298
.ok_or_else(|| anyhow!("unknown chain: {}", &chain_name))?;
260-
let new_name = format!("{}-old", &chain_name);
261299
let ident = chain_store.chain_identifier().await?;
300+
let target_shard = Shard::new(shard.clone())?;
301+
let reuse_existing_backup = existing_backup
302+
.as_ref()
303+
.map(|backup| backup.shard.as_str() == target_shard.as_str())
304+
.unwrap_or(false);
305+
306+
let allocated_chain = if reuse_existing_backup {
307+
None
308+
} else {
309+
let chain =
310+
BlockStore::allocate_chain(&mut conn, &chain_name, &target_shard, &ident).await?;
311+
store.add_chain_store(&chain, true).await?;
312+
Some(chain)
313+
};
262314

263-
conn.transaction::<(), StoreError, _>(|conn| {
315+
let outcome = conn.transaction::<ChainSwapOutcome, StoreError, _>(|conn| {
264316
async {
265-
let shard = Shard::new(shard.to_string())?;
266-
267-
let chain = BlockStore::allocate_chain(conn, &chain_name, &shard, &ident).await?;
268-
269-
store.add_chain_store(&chain, true).await?;
270-
271-
// Drop the foreign key constraint on deployment_schemas
272317
sql_query(
273318
"alter table deployment_schemas drop constraint deployment_schemas_network_fkey;",
274319
)
275320
.execute(conn).await?;
276321

277-
// Update the current chain name to chain-old
278-
update_chain_name(conn, &chain_name, &new_name).await?;
322+
let mut previous_backup_final_name: Option<String> = None;
323+
324+
let temp_backup_name = if let Some(backup) = existing_backup.as_ref() {
325+
let temp_name = next_backup_name(conn, &chain_name, "old").await?;
326+
update_chain_name(conn, &backup.name, &temp_name).await?;
327+
previous_backup_final_name = Some(temp_name.clone());
328+
Some(temp_name)
329+
} else {
330+
None
331+
};
332+
333+
let latest_backup_name = next_backup_name(conn, &chain_name, "old").await?;
334+
update_chain_name(conn, &chain_name, &latest_backup_name).await?;
279335

280-
// Create a new chain with the name in the destination shard
281-
let _ = add_chain(conn, &chain_name, &shard, ident).await?;
336+
if reuse_existing_backup {
337+
if let Some(temp_name) = temp_backup_name.as_ref() {
338+
update_chain_name(conn, temp_name, &chain_name).await?;
339+
previous_backup_final_name = Some(chain_name.clone());
340+
}
341+
} else {
342+
add_chain(conn, &chain_name, &target_shard, ident.clone()).await?;
343+
}
282344

283-
// Re-add the foreign key constraint
284345
sql_query(
285346
"alter table deployment_schemas add constraint deployment_schemas_network_fkey foreign key (network) references chains(name);",
286347
)
287348
.execute(conn).await?;
288-
Ok(())
349+
350+
Ok(ChainSwapOutcome {
351+
latest_backup_name,
352+
previous_backup_final_name,
353+
reused_previous_backup: reuse_existing_backup,
354+
allocated_chain: allocated_chain.is_some(),
355+
})
289356
}.scope_boxed()
290357
}).await?;
291358

292-
chain_store.update_name(&new_name).await?;
359+
let ChainSwapOutcome {
360+
latest_backup_name,
361+
previous_backup_final_name,
362+
reused_previous_backup,
363+
allocated_chain,
364+
} = outcome;
365+
366+
chain_store.update_name(&latest_backup_name).await?;
367+
368+
if let (Some(backup_store), Some(final_name)) = (
369+
existing_backup_store.as_ref(),
370+
previous_backup_final_name.as_ref(),
371+
) {
372+
backup_store.update_name(final_name).await?;
373+
}
293374

294375
println!(
295376
"Changed block cache shard for {} from {} to {}",
296377
chain_name, old_shard, shard
297378
);
379+
println!("Latest backup recorded as `{}`", latest_backup_name);
380+
381+
if reused_previous_backup {
382+
println!(
383+
"Reused existing backup `{}` as the active `{}` chain",
384+
canonical_backup_name, chain_name
385+
);
386+
} else if let Some(ref preserved) = previous_backup_final_name {
387+
if had_existing_backup {
388+
println!("Preserved earlier backup as `{}`", preserved);
389+
}
390+
}
391+
392+
if allocated_chain {
393+
println!(
394+
"Allocated new chain state for `{}` on shard {}",
395+
chain_name, shard
396+
);
397+
}
298398

299399
Ok(())
300400
}
@@ -329,3 +429,22 @@ pub async fn ingest(
329429
}
330430
Ok(())
331431
}
432+
433+
#[cfg(test)]
434+
mod tests {
435+
use super::backup_name_for_suffix;
436+
437+
#[test]
438+
fn backup_name_for_suffix_uses_plain_name_first() {
439+
assert_eq!(backup_name_for_suffix("mainnet", "old", 0), "mainnet-old");
440+
}
441+
442+
#[test]
443+
fn backup_name_for_suffix_adds_numeric_suffixes() {
444+
assert_eq!(backup_name_for_suffix("mainnet", "old", 1), "mainnet-old-1");
445+
assert_eq!(
446+
backup_name_for_suffix("mainnet", "old", 42),
447+
"mainnet-old-42"
448+
);
449+
}
450+
}

0 commit comments

Comments
 (0)