[pull] trunk from spiceai:trunk #853
Merged
…ESTAMPTZ columns (#10947)

* test: Add failing tests for monotonic cast ordering propagation in SchemaCastScanExec

Add tests that verify SchemaCastScanExec should propagate ordering through monotonic casts (temporal→temporal, numeric widening) and should return maintains_input_order=false for non-monotonic casts. These tests currently fail, demonstrating the RowConverter schema mismatch bug when using ORDER BY on partitioned DuckDB-accelerated tables with TIMESTAMPTZ columns.

* fix: Propagate ordering through monotonic casts in SchemaCastScanExec

Add is_order_preserving_cast() helper that identifies monotonic type casts (temporal→temporal, numeric→numeric) following DataFusion's CastExpr convention. Update equivalence_properties to propagate input ordering when the sort-key column cast is monotonic, and update maintains_input_order() to return false only when a non-monotonic cast exists.

This fixes the 'RowConverter column schema mismatch' and 'does not satisfy order requirements' errors when using ORDER BY on partitioned DuckDB-accelerated tables with TIMESTAMPTZ columns (Timestamp µs→ns cast).

* fix formatting

* update datafusion-table-providers, to include datafusion schema fix

* fix: Tighten is_order_preserving_cast to whitelist safe numeric widenings

Address review comments:
- Restrict numeric casts to a known-safe monotonic whitelist instead of allowing all numeric→numeric (signed↔unsigned can reorder).
- Trim comments for clarity.
- Update stale comment above ordering propagation logic.
- Add comprehensive unit tests for is_order_preserving_cast and is_numeric_widening covering all positive and negative cases.
- Add test for sort-key-unchanged-but-other-column-cast edge case.

* test: Update retention test to expect Timestamp(µs) from DuckDB accelerator

DuckDB stores timestamps in microsecond precision. With the table-providers fix, DuckSqlExec now correctly reports its actual µs schema instead of claiming ns. Update the test assertions accordingly.
* Move is_order_preserving_cast and is_numeric_widening to arrow_tools::schema

* Fix clippy::unnested_or_patterns in is_numeric_widening

* remove unused import

* Skip DuckDB nullability assertion in test_schema_preservation

DuckDB does not preserve NOT NULL field metadata when returning Arrow results (all scanned columns are reported as nullable). See: duckdb/duckdb#4629

* Fix test

* fix lint

* Advertise engine schema from partitions in PartitionTableProvider

Add test to verify schema reflects engine type downgrades (e.g. Timestamp ns→µs)

* linting

* Revert partitioned table schema override (handled by DTP schema cast at read time)

* Update datafusion-table-providers to 846d4de245e919bf3c3c1729c85f50a3564d7949

Include datafusion-contrib/datafusion-table-providers#652

* cleanup

* cleanup

---------

Co-authored-by: Sergei Grebnov <sergei.grebnov@gmail.com>
Co-authored-by: Jeadie <jeadie@users.noreply.github.com>
Co-authored-by: jeadie <jack@spice.ai>
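The whitelist idea behind is_order_preserving_cast and is_numeric_widening can be sketched roughly as below. This is a minimal illustration and not the code from the change: the `Ty` enum is a hypothetical stand-in for a small subset of Arrow's `DataType`, and the exact set of whitelisted widenings is an assumption.

```rust
/// Hypothetical stand-in for a small subset of Arrow's `DataType`;
/// the real helpers operate on Arrow types, not on this enum.
#[derive(Clone, Copy, Debug, PartialEq)]
enum Ty {
    Int8, Int16, Int32, Int64,
    UInt8, UInt16, UInt32, UInt64,
    Float32, Float64,
    TimestampMicros, TimestampNanos,
    Utf8,
}

/// Illustrative whitelist (an assumption): same-signedness integer
/// widenings and f32 -> f64. The whitelist in the actual change may differ.
fn is_numeric_widening(from: Ty, to: Ty) -> bool {
    use Ty::*;
    matches!(
        (from, to),
        (Int8, Int16 | Int32 | Int64)
            | (Int16, Int32 | Int64)
            | (Int32, Int64)
            | (UInt8, UInt16 | UInt32 | UInt64)
            | (UInt16, UInt32 | UInt64)
            | (UInt32, UInt64)
            | (Float32, Float64)
    )
}

fn is_temporal(ty: Ty) -> bool {
    matches!(ty, Ty::TimestampMicros | Ty::TimestampNanos)
}

/// A cast keeps the input ordering if it is a no-op, temporal -> temporal,
/// or one of the whitelisted numeric widenings.
fn is_order_preserving_cast(from: Ty, to: Ty) -> bool {
    from == to || (is_temporal(from) && is_temporal(to)) || is_numeric_widening(from, to)
}

/// Shows why signed -> unsigned is excluded: the cast can reorder values.
fn signed_to_unsigned_reorders() -> bool {
    let (a, b) = (-1i32, 1i32);
    a < b && (a as u32) > (b as u32)
}
```

With this sketch, a Timestamp µs→ns cast and an Int32→Int64 cast both count as order-preserving, while Int32→UInt32 does not, because -1 sorts before 1 as a signed value but after it once reinterpreted as unsigned.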
* Add CorrelatedResponses helper for control-stream request/response correlation

Introduces a DashMap-backed pending-request registry keyed by request_id, with register / remove / deliver operations and a send_correlated free function that drives the full request lifecycle (register, send, await with timeout, cleanup on every failure path).

This is a pure addition. Subsequent commits migrate the existing metrics path to use it and introduce a new Ack message type for partition-update reliability.

Refs #10963

* Migrate request_metrics to CorrelatedResponses helper

Replaces the inline lifecycle code in ExecutorConnection::request_metrics with a call to send_correlated. Replaces the tokio RwLock<HashMap> pending-request map with a DashMap-backed CorrelatedResponses<MetricsResponse>. Reduces the control-stream dispatch site to a single deliver() call.

Behavior is preserved: no timeout on metrics requests (None passed to send_correlated). Existing 57 tests in runtime-cluster pass unchanged.

Refs #10963

* Add Ack proto message and route on the control stream

Adds:
- A generic Ack message (request_id + optional error) so scheduler→executor commands can require delivery confirmation. Reused by UpdatePartitions in the next commit; eligible for CancelTasks / RefreshDataset in follow-ups.
- request_id field on UpdatePartitions. Empty == fire-and-forget (legacy); non-empty triggers ack-correlated handling.
- pending_acks: CorrelatedResponses<Ack> on ExecutorConnection alongside the existing pending_metrics map.
- RegisteredHandles struct returned by ExecutorRegistry::register so the control-stream dispatcher can route both response types.
- Match arm in the inbound dispatcher to deliver acks to their waiters.

The notify_executor_of_assignments call site still sends empty request_id (unchanged behavior); the next commit switches it to await ack + retry.
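The register / deliver / remove lifecycle described for CorrelatedResponses can be approximated with the standard library alone. The sketch below is an assumption-laden analogue, not the helper from the change: it swaps DashMap and async channels for a `Mutex<HashMap>` and blocking `std::sync::mpsc` channels, and the `SendError` type and the closure-based `send` parameter are hypothetical.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{self, Receiver, RecvTimeoutError, Sender};
use std::sync::Mutex;
use std::time::Duration;

/// Hypothetical std-only analogue of the pending-request registry.
struct CorrelatedResponses<T> {
    pending: Mutex<HashMap<String, Sender<T>>>,
}

impl<T> CorrelatedResponses<T> {
    fn new() -> Self {
        Self { pending: Mutex::new(HashMap::new()) }
    }

    /// Registers a waiter for `request_id` and returns its receiving end.
    fn register(&self, request_id: &str) -> Receiver<T> {
        let (tx, rx) = mpsc::channel();
        self.pending.lock().unwrap().insert(request_id.to_string(), tx);
        rx
    }

    /// Drops the pending entry; returns whether one existed.
    fn remove(&self, request_id: &str) -> bool {
        self.pending.lock().unwrap().remove(request_id).is_some()
    }

    /// Hands a response to its waiter; false if nobody is waiting.
    fn deliver(&self, request_id: &str, response: T) -> bool {
        let tx = self.pending.lock().unwrap().remove(request_id);
        tx.map_or(false, |tx| tx.send(response).is_ok())
    }

    fn len(&self) -> usize {
        self.pending.lock().unwrap().len()
    }
}

/// Hypothetical error type for this sketch.
#[derive(Debug, PartialEq)]
enum SendError {
    SendFailed(String),
    Timeout,
    Dropped,
}

/// Register, send, await (optionally with a timeout), and clean up the
/// pending entry on every failure path.
fn send_correlated<T>(
    responses: &CorrelatedResponses<T>,
    request_id: &str,
    send: impl FnOnce() -> Result<(), String>,
    timeout: Option<Duration>,
) -> Result<T, SendError> {
    let rx = responses.register(request_id);
    if let Err(e) = send() {
        responses.remove(request_id);
        return Err(SendError::SendFailed(e));
    }
    let result = match timeout {
        Some(t) => rx.recv_timeout(t).map_err(|e| match e {
            RecvTimeoutError::Timeout => SendError::Timeout,
            RecvTimeoutError::Disconnected => SendError::Dropped,
        }),
        None => rx.recv().map_err(|_| SendError::Dropped),
    };
    if result.is_err() {
        responses.remove(request_id);
    }
    result
}
```

In this shape the inbound dispatcher only needs to call `deliver(request_id, response)`, and passing `None` as the timeout mirrors the metrics path, which waits without a deadline.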
Refs #10963

* Gate allocate_initial_partitions on accelerated-table readiness

The scheduler used to serve allocate_initial_partitions RPCs before its own load_datasets() had registered the accelerated tables. The downstream partition_value_to_bytes call then went through DataFusion::try_parse_expr, which needs the table's DFSchema from the catalog. If the table was not yet registered, the call returned "Table not found when parsing expression" and the loop logged-and-skipped the partition, returning Ok to the executor with incomplete bytes: split-brain between partition_store assignments and executor_registry.set_executor_partitions.

This commit:
- Adds a readiness check at the top of allocate_initial_partitions that iterates accelerated_tables(app) and returns Status::unavailable if any are not yet in the SessionContext. Executors retry on Unavailable (wired in a follow-up commit).
- Stops swallowing per-partition serialization errors. With the readiness gate, the dataset-not-ready case no longer reaches the inner loop; anything that does is a real bug and is now returned as Status::internal so it surfaces instead of silently dropping the partition.

The notify path through PartitionAssignmentTask::run_assignment_cycle already retries on the next tick when serialization fails, and the follow-up ack-with-retry commit replaces its fire-and-forget shape with an explicit retry loop driven by the executor's ack.

Refs #10963

* Executor: retry allocate_initial_partitions on Unavailable

Wraps the one-shot executor_request_initial_partitions call in a fibonacci backoff loop, retrying when the scheduler returns tonic::Code::Unavailable (emitted by the scheduler-side readiness gate while its load_datasets is still in flight).

Any other error still surfaces immediately as before, so real misconfiguration / connectivity failures continue to fail the executor startup with the original message.
Reuses the SCHEDULER_BACKOFF_MAX constant and FibonacciBackoffBuilder already used elsewhere in the executor poll loop.

Refs #10963

* Executor sends Ack from UpdatePartitions handler

Changes the executor's control-stream message handling so that the result of applying an UpdatePartitions message is reported back to the scheduler via the new Ack message.

- PartitionUpdateHandler now returns Result<(), String>. update_partition_assignments and update_partition_refresh_sql in lib.rs are reworked to propagate errors instead of log-and-swallow at multiple layers.
- New Error::UnableToUpdateClusterPartitionFilters variant gives the inner datafusion error a typed home.
- The UpdatePartitions handler in control_stream_client captures the handler's Result and, if the scheduler asked for an ack (non-empty request_id), sends ExecutorMessage::Ack back via outbound_tx. Empty request_id keeps the legacy fire-and-forget shape, no ack sent.

The next commit wires the scheduler to generate request_ids and await the ack with a retry loop.

Refs #10963

* Scheduler notify_executor_of_assignments: ack-based send with retry

Replaces the fire-and-forget send_command call in the per-executor assignment notification path with send_command_with_ack, so the scheduler now waits for the executor to confirm it applied the UpdatePartitions message and retries on transient failure.

- New executor_registry methods: send_command_with_ack drives the send_correlated lifecycle against the executor's pending_acks. Returns classified errors (AckTimeout, AckFailed, ExecutorNotRegistered) with an is_retryable() helper for callers.
- notify_executor_of_assignments wraps the per-table send in a fibonacci backoff loop. Retries while the error is retryable (AckTimeout, AckFailed -- e.g. executor reports dataset still loading) up to NOTIFY_MAX_RETRIES. After exhaustion the partition assignment stays committed in the store; the next PartitionAssignmentTask reconcile cycle re-attempts.
- New constants: NOTIFY_ACK_TIMEOUT (10s per attempt), NOTIFY_BACKOFF_MAX (5s upper backoff bound), NOTIFY_MAX_RETRIES (8 attempts).
- Adds four unit tests on send_command_with_ack covering: success, application error (AckFailed + retryable), timeout (AckTimeout + retryable), unknown executor (ExecutorNotRegistered, not retryable).

The unload path (notify_executor_to_unload at service.rs:749) still uses fire-and-forget send_command; converting it to ack-based is a follow-up since it's not in the startup-race critical path.

Refs #10963

* Address review feedback: extract readiness fn, gate reconcile, fail deserialize, doc/payload cleanup

- Extract the accelerated-table readiness check into `partition::first_unready_accelerated_table(app, df)` so both call sites share the same logic. The new helper takes a snapshotted `Arc<App>` so no async lock guard is held across the get_table awaits.
- Gate `PartitionAssignmentTask::run_assignment_cycle` on the same readiness check: if any accelerated table is still loading during a scheduler startup tick, defer the cycle and let the next tick retry. The notify path inside reconcile_all calls partition_value_to_bytes, so it has the same race window as allocate_initial_partitions.
- Make partition-expression deserialization in update_partition_assignments fatal (new Error::UnableToDeserializeClusterPartitionExpression variant). Previously it logged-and-skipped; with ack-based delivery the scheduler would mark the update delivered while the executor silently dropped part of it.
- correlated.rs: collapse the timeout match with let-else for clippy::single_match_else; also fix the send_correlated doc to accurately describe pending-entry cleanup (the entry lives in CorrelatedResponses and is independent of request_tx state, contra what the original comment implied).
- service.rs notify path: wrap the partition payload in an Arc so each retry attempt only does the unavoidable BytesArray clone (proto requires owned data) once at message-build time instead of twice per loop iteration.
- Hoist ALLOCATE_INITIAL_PARTITIONS_MAX_RETRIES to module scope (clippy::items_after_statements).

Refs #10963
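The ack-with-retry loop described for notify_executor_of_assignments might look roughly like the sketch below. It is a hedged illustration only: `FibonacciBackoff` is a hand-rolled stand-in for the FibonacciBackoffBuilder mentioned above, the 100 ms base delay is an assumption, NOTIFY_MAX_RETRIES is read here as the total number of attempts, and the `send_with_ack` and `sleep` closures are hypothetical seams that keep the example synchronous and testable.

```rust
use std::time::Duration;

const NOTIFY_MAX_RETRIES: usize = 8; // read as total attempts (assumption)
const NOTIFY_BACKOFF_MAX: Duration = Duration::from_secs(5);

/// Classified errors, mirroring the names used in the commit message.
#[derive(Debug, Clone, PartialEq)]
enum AckError {
    AckTimeout,
    AckFailed(String),
    ExecutorNotRegistered,
}

impl AckError {
    /// Timeouts and executor-reported failures are transient; an unknown
    /// executor is not.
    fn is_retryable(&self) -> bool {
        matches!(self, AckError::AckTimeout | AckError::AckFailed(_))
    }
}

/// Hand-rolled Fibonacci delay sequence, capped at `max`.
struct FibonacciBackoff {
    prev: Duration,
    cur: Duration,
    max: Duration,
}

impl FibonacciBackoff {
    fn new(base: Duration, max: Duration) -> Self {
        Self { prev: Duration::ZERO, cur: base, max }
    }
}

impl Iterator for FibonacciBackoff {
    type Item = Duration;

    fn next(&mut self) -> Option<Duration> {
        let delay = self.cur.min(self.max);
        let following = self.prev.saturating_add(self.cur);
        self.prev = self.cur;
        self.cur = following;
        Some(delay)
    }
}

/// Retries `send_with_ack` while the error is retryable, sleeping between
/// attempts. Returns the attempt number that succeeded, or the last error.
fn notify_with_retry(
    mut send_with_ack: impl FnMut() -> Result<(), AckError>,
    mut sleep: impl FnMut(Duration),
) -> Result<usize, AckError> {
    let mut backoff = FibonacciBackoff::new(Duration::from_millis(100), NOTIFY_BACKOFF_MAX);
    let mut attempt = 1;
    loop {
        match send_with_ack() {
            Ok(()) => return Ok(attempt),
            Err(e) if e.is_retryable() && attempt < NOTIFY_MAX_RETRIES => {
                sleep(backoff.next().unwrap_or(NOTIFY_BACKOFF_MAX));
                attempt += 1;
            }
            // Non-retryable, or retries exhausted: surface the error and
            // leave recovery to the next reconcile cycle.
            Err(e) => return Err(e),
        }
    }
}
```

Injecting the sleep function keeps the sketch free of any async runtime; in the real scheduler the equivalent wait would presumably be an async sleep, and each attempt would be bounded by NOTIFY_ACK_TIMEOUT.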
See Commits and Changes for more details.
Created by
pull[bot] (v2.0.0-alpha.4)