Skip to content

Commit c235d6b

Browse files
authored
[SpiceDQ] Improve error messages; Avoid race condition on allocate_initial_partitions. (spiceai#9579)
* make errors better * Avoid issues when executor requests allocate_initial_partitions earlier than scheduler can handle * fix lint * linting
1 parent ac6c4e5 commit c235d6b

3 files changed

Lines changed: 66 additions & 23 deletions

File tree

crates/runtime/src/cluster/partition/manager.rs

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@ pub enum Error {
4040
#[snafu(display("Partition {partition} not found in table {table}"))]
4141
PartitionNotFound { table: String, partition: String },
4242

43+
#[snafu(display("No partition metadata found for table {table}"))]
44+
TableMetadataNotFound { table: String },
45+
4346
#[snafu(display("Concurrent modification detected for table {table}"))]
4447
ConcurrentModification { table: String },
4548
}
@@ -152,13 +155,10 @@ impl PartitionManager {
152155

153156
loop {
154157
let now_ms = now_ms()?;
155-
let mut metadata =
156-
self.get_table_metadata(table)
157-
.await?
158-
.ok_or_else(|| Error::PartitionNotFound {
159-
table: key.clone(),
160-
partition: "any".to_string(),
161-
})?;
158+
let mut metadata = self
159+
.get_table_metadata(table)
160+
.await?
161+
.ok_or_else(|| Error::TableMetadataNotFound { table: key.clone() })?;
162162

163163
let mut allocated: Vec<_> = metadata
164164
.partitions
@@ -219,13 +219,10 @@ impl PartitionManager {
219219

220220
loop {
221221
let now_ms = now_ms()?;
222-
let mut metadata =
223-
self.get_table_metadata(table)
224-
.await?
225-
.ok_or_else(|| Error::PartitionNotFound {
226-
table: key.clone(),
227-
partition: "any".to_string(),
228-
})?;
222+
let mut metadata = self
223+
.get_table_metadata(table)
224+
.await?
225+
.ok_or_else(|| Error::TableMetadataNotFound { table: key.clone() })?;
229226

230227
let mut updated = false;
231228
for partition in &mut metadata.partitions {

crates/runtime/src/cluster/service.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -603,8 +603,16 @@ impl ClusterService for ClusterServiceImpl {
603603
let app_guard = self.app.read().await;
604604
if let Some(app) = app_guard.as_ref() {
605605
// Find accelerated datasets with partitioning
606-
607606
for table_ref in super::partition::accelerated_tables(app).keys() {
607+
if partition_manager
608+
.get_cached_table_metadata(table_ref)
609+
.is_none()
610+
{
611+
tracing::info!(
612+
"No cached partition metadata for table {table_ref}. Scheduler likely has not finished discovering partitions for the table. Will not assign in initial allocation, but will get assigned on future assignments"
613+
);
614+
continue;
615+
}
608616
match partition_manager
609617
.allocate_partitions(table_ref, executor_id, 10)
610618
.await

crates/runtime/src/datafusion/refresh_sql.rs

Lines changed: 46 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,13 @@ pub enum Error {
4747
))]
4848
ExpectedSingleSqlStatement { num_statements: usize },
4949

50-
#[snafu(display("Expected a SQL query starting with SELECT <columns> FROM {expected_table}"))]
51-
InvalidSqlStatement { expected_table: TableReference },
50+
#[snafu(display(
51+
"Expected a SQL query starting with SELECT <columns> FROM {expected_table}. {issue}"
52+
))]
53+
InvalidSqlStatement {
54+
expected_table: TableReference,
55+
issue: String,
56+
},
5257

5358
#[snafu(display(
5459
"Unexpected '{expr}' in the Refresh SQL statement. Rewrite the SQL to only perform WHERE filters, i.e. SELECT col1, col2, col3 FROM {expected_table} WHERE col1 = 'foo'"
@@ -127,7 +132,13 @@ pub fn parse_refresh_sql(
127132
)?;
128133
ensure!(
129134
select.from.len() == 1,
130-
InvalidSqlStatementSnafu { expected_table }
135+
InvalidSqlStatementSnafu {
136+
expected_table,
137+
issue: format!(
138+
"A single FROM clause with table reference is expected, found {}",
139+
select.from.len()
140+
)
141+
}
131142
);
132143

133144
ensure_no_expr!(select.cluster_by.is_empty(), "CLUSTER BY", expected_table);
@@ -178,11 +189,23 @@ pub fn parse_refresh_sql(
178189
if TableReference::parse_str(&table_name_with_schema)
179190
!= expected_table
180191
{
181-
return InvalidSqlStatementSnafu { expected_table }.fail();
192+
return InvalidSqlStatementSnafu {
193+
expected_table: expected_table.clone(),
194+
issue: format!(
195+
"Table name in refresh_sql should be {expected_table}, is {name}",
196+
),
197+
}
198+
.fail();
182199
}
183200
}
184201
_ => {
185-
return InvalidSqlStatementSnafu { expected_table }.fail();
202+
return InvalidSqlStatementSnafu {
203+
expected_table,
204+
issue:
205+
"No FROM clause with table reference found in SQL statement"
206+
.to_string(),
207+
}
208+
.fail();
186209
}
187210
}
188211

@@ -198,12 +221,27 @@ pub fn parse_refresh_sql(
198221

199222
Ok((refresh_sql, refresh_schema))
200223
}
201-
_ => InvalidSqlStatementSnafu { expected_table }.fail()?,
224+
_ => InvalidSqlStatementSnafu {
225+
expected_table,
226+
issue: format!(
227+
"Expected a basic Select SQL query statement, found {}",
228+
query.body
229+
),
230+
}
231+
.fail()?,
202232
}
203233
}
204-
_ => InvalidSqlStatementSnafu { expected_table }.fail()?,
234+
_ => InvalidSqlStatementSnafu {
235+
expected_table,
236+
issue: format!("Expected a Select SQL query statement, found {statement}"),
237+
}
238+
.fail()?,
205239
},
206-
_ => InvalidSqlStatementSnafu { expected_table }.fail()?,
240+
_ => InvalidSqlStatementSnafu {
241+
expected_table,
242+
issue: format!("Expected a SQL Statement, found {statement}"),
243+
}
244+
.fail()?,
207245
}
208246
}
209247

0 commit comments

Comments
 (0)