Skip to content

Commit 10bb718

Browse files
Jeadielukekimkrinart
authored
Ensure we query one and only one partition per request (spiceai#9416)
* Ensure we query one and only one partition per request * fix merge * clippy * clip * clippy again?? * fix * fix get_partitions_legacy * clippy * clippy Entire-Checkpoint: 236c651af457 * clippy * fix comments * fix merge * PR comments * clippy * docs * Use in_memory partition manager by default + timeout update * Fix table creation * Fix cayenne_catalog::tests::test_create_table_detects --------- Co-authored-by: Luke Kim <80174+lukekim@users.noreply.github.com> Co-authored-by: Viktor Yershov <viktor@spice.ai>
1 parent ce5d54d commit 10bb718

15 files changed

Lines changed: 614 additions & 222 deletions

File tree

crates/cayenne/src/cayenne_catalog.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2023,7 +2023,7 @@ mod tests {
20232023
assert!(metadata.primary_key.is_empty());
20242024
assert_eq!(metadata.table_id, table_id_1);
20252025

2026-
// Try to recreate with a primary key change — should return ChangedConfiguration error
2026+
// Now try to create with a primary key change — should get ChangedConfiguration error
20272027
let options_changed = CreateTableOptions {
20282028
table_name: "test_table".to_string(),
20292029
schema: Arc::clone(&schema),

crates/object_store_occ/src/state.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,16 @@ pub struct ObjectState<T> {
7777
_marker: PhantomData<T>,
7878
}
7979

80+
impl<T> std::fmt::Debug for ObjectState<T> {
81+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
82+
f.debug_struct("ObjectState")
83+
.field("prefix", &self.prefix)
84+
.field("store", &"Arc<dyn ObjectStore>")
85+
.field("cache_size", &self.cache.read().len())
86+
.finish_non_exhaustive()
87+
}
88+
}
89+
8090
impl<T> ObjectState<T>
8191
where
8292
T: Serialize + DeserializeOwned + Clone + Send + Sync,

crates/runtime-datafusion/src/analyzer_rule/partitioned_table_scan_rewrite.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ pub trait TablePartitionProvider: Send + Sync + Debug {
3939
fn get_partitions(
4040
&self,
4141
table: &TableReference,
42-
schema: SchemaRef,
42+
schema: &SchemaRef,
4343
) -> Vec<(Arc<dyn TableProvider>, Vec<Expr>)>;
4444

4545
/// Whether partitioning should be applied to the given table.
@@ -104,7 +104,7 @@ impl AnalyzerRule for PartitionedTableScanRewrite {
104104

105105
let providers = self
106106
.partition_provider
107-
.get_partitions(&scan.table_name, scan.source.schema());
107+
.get_partitions(&scan.table_name, &scan.source.schema());
108108

109109
tracing::debug!(
110110
"PartitionedTableScanRewrite: {} partitions for '{}' table.",

crates/runtime/src/builder.rs

Lines changed: 55 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@ limitations under the License.
1616

1717
use crate::cluster::DistributedNode;
1818
use crate::cluster::ExecutorRegistry;
19+
use crate::cluster::PartitionManager;
1920
use crate::cluster::ResolvedClusterConfig;
21+
use crate::cluster::partition;
2022
use crate::config::ClusterRole;
2123
use crate::config::Config;
2224
use crate::datafusion::udf::register_udfs;
@@ -225,19 +227,63 @@ impl RuntimeBuilder {
225227

226228
// Create resource monitor early so it can be passed to DataFusion
227229
let resource_monitor = crate::resource_monitor::ResourceMonitor::new();
230+
let secrets = Arc::new(RwLock::new(Self::load_secrets(self.app.as_ref()).await));
228231

229-
let distributed = match self
232+
let distributed: Option<DistributedNode> = match self
230233
.resolved_cluster_config
231234
.as_ref()
232235
.and_then(ResolvedClusterConfig::effective_role)
233236
{
234-
Some(ClusterRole::Scheduler) => Some(DistributedNode::Scheduler {
235-
peers: Arc::new(RwLock::new(HashMap::new())),
236-
// Initialized later when scheduler registry starts
237-
job_executor: Arc::new(RwLock::new(None)),
238-
executor_registry: Arc::new(ExecutorRegistry::new()),
239-
partition_manager: Arc::new(RwLock::new(None)),
240-
}),
237+
Some(ClusterRole::Scheduler) => {
238+
if let Some(scheduler_config) = self
239+
.app
240+
.as_ref()
241+
.and_then(|app| app.runtime.scheduler.clone())
242+
{
243+
match partition::build_partition_metadata_store(
244+
io_runtime.clone(),
245+
Arc::clone(&secrets),
246+
&scheduler_config,
247+
)
248+
.await
249+
{
250+
Ok(store) => {
251+
let partition_manager = Arc::new(PartitionManager::new(store));
252+
253+
Some(DistributedNode::Scheduler {
254+
peers: Arc::new(RwLock::new(HashMap::new())),
255+
// Initialized later when scheduler registry starts
256+
job_executor: Arc::new(RwLock::new(None)),
257+
executor_registry: Arc::new(ExecutorRegistry::new(Arc::clone(
258+
&partition_manager,
259+
))),
260+
partition_manager,
261+
})
262+
}
263+
Err(e) => {
264+
tracing::error!(
265+
"Failed to initialize partition metadata store for scheduler: {e}"
266+
);
267+
None
268+
}
269+
}
270+
} else {
271+
tracing::warn!(
272+
"'--role scheduler' was specified but no `runtime.scheduler` field was found in spicepod.yaml. Using in-memory partition store."
273+
);
274+
let partition_manager = Arc::new(PartitionManager::new(Arc::new(
275+
object_store::memory::InMemory::new(),
276+
)));
277+
Some(DistributedNode::Scheduler {
278+
peers: Arc::new(RwLock::new(HashMap::new())),
279+
job_executor: Arc::new(RwLock::new(None)),
280+
executor_registry: Arc::new(ExecutorRegistry::new(Arc::clone(
281+
&partition_manager,
282+
))),
283+
partition_manager,
284+
})
285+
}
286+
}
241287
Some(ClusterRole::Executor) => Some(DistributedNode::Executor {
242288
partition_assignments: Arc::new(RwLock::new(HashMap::new())),
243289
}),
@@ -297,8 +343,6 @@ impl RuntimeBuilder {
297343
None
298344
};
299345

300-
let secrets = Self::load_secrets(self.app.as_ref()).await;
301-
302346
let evals = self
303347
.app
304348
.as_ref()
@@ -318,7 +362,7 @@ impl RuntimeBuilder {
318362
tools: Arc::new(RwLock::new(HashMap::new())),
319363
tool_factories: Arc::new(Mutex::new(HashMap::new())),
320364
pods_watcher: Arc::new(RwLock::new(self.pods_watcher)),
321-
secrets: Arc::new(RwLock::new(secrets)),
365+
secrets,
322366
spaced_tracer: Arc::new(tracers::SpacedTracer::new(Duration::from_secs(15))),
323367
autoload_extensions: Arc::new(self.autoload_extensions),
324368
extensions: Arc::new(RwLock::new(HashMap::new())),

0 commit comments

Comments
 (0)