Skip to content

Commit 9041eae

Browse files
authored
fix(cayenne): Skip catalog refresh state reload for existing providers (spiceai#10396)
1 parent a874314 commit 9041eae

1 file changed

Lines changed: 6 additions & 110 deletions

File tree

crates/cayenne/src/catalog_provider.rs

Lines changed: 6 additions & 110 deletions
Original file line numberDiff line numberDiff line change
@@ -30,17 +30,14 @@ use std::sync::{Arc, RwLock};
3030
use async_trait::async_trait;
3131

3232
use data_components::RefreshableCatalogProvider;
33-
use data_components::poly::PolyTableProvider;
3433
use datafusion::catalog::{CatalogProvider, SchemaProvider, TableProvider};
3534
use datafusion::error::Result as DFResult;
3635
use datafusion::execution::runtime_env::RuntimeEnv;
37-
#[cfg(feature = "partition-table-provider")]
38-
use runtime_table_partition::provider::PartitionTableProvider;
3936
use snafu::prelude::*;
4037

4138
use crate::catalog::CatalogError;
4239
use crate::metadata::{CompressionStrategy, VortexConfig};
43-
use crate::{CayenneCatalog, CayenneTableProvider, CayenneTableProviderBuilder, MetadataCatalog};
40+
use crate::{CayenneCatalog, CayenneTableProviderBuilder, MetadataCatalog};
4441

4542
/// Configuration for constructing a [`CayenneCatalogProvider`].
4643
#[derive(Debug, Clone)]
@@ -337,9 +334,7 @@ impl RefreshableCatalogProvider for CayenneCatalogProvider {
337334
.as_any()
338335
.downcast_ref::<CayenneSchemaProvider>()
339336
{
340-
existing_cayenne_schema
341-
.refresh_from(&refreshed_schema)
342-
.await;
337+
existing_cayenne_schema.refresh_from(&refreshed_schema);
343338
new_schemas.insert(ns.clone(), Arc::clone(existing_schema));
344339
} else {
345340
new_schemas.insert(ns.clone(), Arc::new(refreshed_schema));
@@ -471,21 +466,17 @@ impl CayenneSchemaProvider {
471466
}
472467
}
473468

474-
async fn refresh_from(&self, source: &Self) {
469+
fn refresh_from(&self, source: &Self) {
475470
let existing_tables = self.tables_snapshot();
476471
let refreshed_tables = source.tables_snapshot();
477472
let mut merged_tables = HashMap::with_capacity(refreshed_tables.len());
478473

479474
for (table_name, refreshed_provider) in refreshed_tables {
480475
let provider_to_use = if let Some(existing_provider) = existing_tables.get(&table_name)
481476
{
482-
if Self::refresh_table_provider_in_place(existing_provider, &refreshed_provider)
483-
.await
484-
{
485-
Arc::clone(existing_provider)
486-
} else {
487-
refreshed_provider
488-
}
477+
// Existing providers are authoritative — their in-memory state is kept up-to-date by writes (insert, delete, etc).
478+
// Reloading from the catalog is redundant and leads to unnecessary work and side effects including cache invalidations
479+
Arc::clone(existing_provider)
489480
} else {
490481
refreshed_provider
491482
};
@@ -496,101 +487,6 @@ impl CayenneSchemaProvider {
496487
self.replace_tables(merged_tables);
497488
}
498489

499-
async fn refresh_table_provider_in_place(
500-
existing_provider: &Arc<dyn TableProvider>,
501-
refreshed_provider: &Arc<dyn TableProvider>,
502-
) -> bool {
503-
// Case 1: Non-partitioned CayenneTableProvider — refresh in place from the
504-
// freshly-loaded provider.
505-
if let Some(existing_cayenne) = Self::cayenne_table_from_provider(existing_provider) {
506-
let Some(refreshed_cayenne) = Self::cayenne_table_from_provider(refreshed_provider)
507-
else {
508-
return false;
509-
};
510-
511-
if let Err(err) = existing_cayenne.refresh(refreshed_cayenne).await {
512-
tracing::warn!("Failed to refresh Cayenne table provider in place: {err}");
513-
return false;
514-
}
515-
516-
return true;
517-
}
518-
519-
// Case 2: PolyTableProvider — extract the writer and check its inner type.
520-
let Some(poly) = existing_provider
521-
.as_any()
522-
.downcast_ref::<PolyTableProvider>()
523-
else {
524-
return false;
525-
};
526-
527-
let existing_writer = poly.writer();
528-
529-
// Case 2a: Non-partitioned CayenneTableProvider inside PolyTableProvider.
530-
if let Some(existing_cayenne) = existing_writer
531-
.as_any()
532-
.downcast_ref::<CayenneTableProvider>()
533-
{
534-
let refreshed_writer = refreshed_provider
535-
.as_any()
536-
.downcast_ref::<PolyTableProvider>()
537-
.map(PolyTableProvider::writer);
538-
539-
let refreshed_cayenne = refreshed_writer
540-
.as_ref()
541-
.and_then(|w| w.as_any().downcast_ref::<CayenneTableProvider>());
542-
543-
if let Some(refreshed) = refreshed_cayenne {
544-
if let Err(err) = existing_cayenne.refresh(refreshed).await {
545-
tracing::warn!("Failed to refresh Cayenne table provider in place: {err}");
546-
return false;
547-
}
548-
return true;
549-
}
550-
551-
return false;
552-
}
553-
554-
// Case 2b: PartitionTableProvider inside PolyTableProvider — refresh each
555-
// partition's inner CayenneTableProvider from itself (reloads deletion
556-
// vectors, protected snapshots, snapshot ID, and listing table from the
557-
// catalog).
558-
#[cfg(feature = "partition-table-provider")]
559-
if let Some(partition_provider) = existing_writer
560-
.as_any()
561-
.downcast_ref::<PartitionTableProvider>()
562-
{
563-
let providers = partition_provider.partition_table_providers().await;
564-
for provider in &providers {
565-
// Each partition's inner provider is a CayenneTableProvider.
566-
567-
use crate::CayenneTableProvider;
568-
let Some(cayenne) = provider.as_any().downcast_ref::<CayenneTableProvider>() else {
569-
tracing::warn!(
570-
"Partition sub-provider is not a CayenneTableProvider, skipping refresh"
571-
);
572-
continue;
573-
};
574-
if let Err(err) = cayenne.refresh(cayenne).await {
575-
tracing::warn!("Failed to refresh partitioned Cayenne table in place: {err}");
576-
// Fail safely: keep the existing partitioned provider rather than
577-
// signaling failure that would cause it to be replaced by a
578-
// non-partitioned provider.
579-
return true;
580-
}
581-
}
582-
return true;
583-
}
584-
585-
false
586-
}
587-
588-
fn cayenne_table_from_provider(
589-
provider: &Arc<dyn TableProvider>,
590-
) -> Option<&CayenneTableProvider> {
591-
provider.as_any().downcast_ref::<CayenneTableProvider>()
592-
}
593-
594490
fn clear_tables(&self) {
595491
self.replace_tables(HashMap::new());
596492
}

0 commit comments

Comments
 (0)