Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 28 additions & 3 deletions src/catalog/src/table_source/dummy_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use async_trait::async_trait;
use common_catalog::format_full_table_name;
use datafusion::catalog::{CatalogProvider, CatalogProviderList, SchemaProvider};
use datafusion::datasource::TableProvider;
use session::context::QueryContextRef;
use snafu::OptionExt;
use table::table::adapter::DfTableProviderAdapter;

Expand All @@ -32,12 +33,27 @@ use crate::error::TableNotExistSnafu;
#[derive(Clone)]
pub struct DummyCatalogList {
catalog_manager: CatalogManagerRef,
query_ctx: Option<QueryContextRef>,
}

impl DummyCatalogList {
/// Creates a new catalog list with the given catalog manager.
/// Creates a new catalog list with the given catalog manager (no query context).
pub fn new(catalog_manager: CatalogManagerRef) -> Self {
Self { catalog_manager }
Self {
catalog_manager,
query_ctx: None,
}
}

/// Creates a new catalog list with the given catalog manager and query context.
pub fn new_with_query_ctx(
catalog_manager: CatalogManagerRef,
query_ctx: QueryContextRef,
) -> Self {
Self {
catalog_manager,
query_ctx: Some(query_ctx),
}
}
}

Expand Down Expand Up @@ -68,6 +84,7 @@ impl CatalogProviderList for DummyCatalogList {
Some(Arc::new(DummyCatalogProvider {
catalog_name: catalog_name.to_string(),
catalog_manager: self.catalog_manager.clone(),
query_ctx: self.query_ctx.clone(),
}))
}
}
Expand All @@ -77,6 +94,7 @@ impl CatalogProviderList for DummyCatalogList {
struct DummyCatalogProvider {
catalog_name: String,
catalog_manager: CatalogManagerRef,
query_ctx: Option<QueryContextRef>,
}

impl CatalogProvider for DummyCatalogProvider {
Expand All @@ -93,6 +111,7 @@ impl CatalogProvider for DummyCatalogProvider {
catalog_name: self.catalog_name.clone(),
schema_name: schema_name.to_string(),
catalog_manager: self.catalog_manager.clone(),
query_ctx: self.query_ctx.clone(),
}))
}
}
Expand All @@ -111,6 +130,7 @@ struct DummySchemaProvider {
catalog_name: String,
schema_name: String,
catalog_manager: CatalogManagerRef,
query_ctx: Option<QueryContextRef>,
}

#[async_trait]
Expand All @@ -126,7 +146,12 @@ impl SchemaProvider for DummySchemaProvider {
async fn table(&self, name: &str) -> datafusion::error::Result<Option<Arc<dyn TableProvider>>> {
let table = self
.catalog_manager
.table(&self.catalog_name, &self.schema_name, name, None)
.table(
&self.catalog_name,
&self.schema_name,
name,
self.query_ctx.as_deref(),
)
.await?
.with_context(|| TableNotExistSnafu {
table: format_full_table_name(&self.catalog_name, &self.schema_name, name),
Expand Down
17 changes: 8 additions & 9 deletions src/datanode/src/region_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ impl RegionServer {
let ctx = request.header.as_ref().map(|h| h.into());
let query_ctx = Arc::new(ctx.unwrap_or_else(|| QueryContextBuilder::default().build()));

let region_id = request.region_id;
let injector_builder = NameAwareDataSourceInjectorBuilder::from_plan(&request.plan)
.context(DataFusionSnafu)?;
let mut injector = injector_builder
Expand All @@ -323,7 +324,6 @@ impl RegionServer {
.context(DataFusionSnafu)?
.data;

let region_id = request.region_id;
let stream = self
.inner
.handle_read(QueryRequest { plan, ..request }, query_ctx.clone())
Expand Down Expand Up @@ -770,14 +770,13 @@ fn wrap_flow_region_watermark_stream(
region_id: RegionId,
query_ctx: &QueryContextRef,
) -> SendableRecordBatchStream {
let Some(seq) = should_collect_region_watermark_from_extensions(&query_ctx.extensions())
.then(|| query_ctx.get_snapshot(region_id.as_u64()))
.flatten()
else {
return stream;
};

Box::pin(RegionWatermarkStream::new(stream, region_id, seq))
if should_collect_region_watermark_from_extensions(&query_ctx.extensions())
&& let Some(seq) = query_ctx.get_snapshot(region_id.as_u64())
{
Box::pin(RegionWatermarkStream::new(stream, region_id, seq)) as SendableRecordBatchStream
} else {
stream
}
}

/// Wraps a region read stream so terminal metrics can carry the scan-open watermark.
Expand Down
Loading
Loading