Skip to content

Commit f67b9b5

Browse files
yezizp2012lmatz
authored andcommitted
fix: fix cancel of streaming job when create command is stashed (#10049)
1 parent 7440536 commit f67b9b5

File tree

2 files changed

+31
-4
lines changed

2 files changed

+31
-4
lines changed

src/meta/src/barrier/command.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -477,6 +477,24 @@ where
477477
}
478478
}
479479

480+
/// For `CancelStreamingJob`, returns the table id of the target table.
481+
pub fn table_to_cancel(&self) -> Option<TableId> {
482+
match &self.command {
483+
Command::CancelStreamingJob(table_fragments) => Some(table_fragments.table_id()),
484+
_ => None,
485+
}
486+
}
487+
488+
/// For `CreateStreamingJob`, returns the table id of the target table.
489+
pub fn table_to_create(&self) -> Option<TableId> {
490+
match &self.command {
491+
Command::CreateStreamingJob {
492+
table_fragments, ..
493+
} => Some(table_fragments.table_id()),
494+
_ => None,
495+
}
496+
}
497+
480498
/// Clean up actors in CNs if needed, used by drop, cancel and reschedule commands.
481499
async fn clean_up(
482500
&self,

src/meta/src/barrier/mod.rs

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,11 @@ where
221221
}
222222
}
223223

224+
fn cancel_stashed_command(&mut self, id: TableId) {
225+
self.finished_commands
226+
.retain(|x| x.context.table_to_create() != Some(id));
227+
}
228+
224229
/// Before resolving the actors to be sent or collected, we should first record the newly
225230
/// created table and added actors into checkpoint control, so that `can_actor_send_or_collect`
226231
/// will return `true`.
@@ -964,6 +969,14 @@ where
964969
checkpoint_control.stash_command_to_finish(command);
965970
}
966971

972+
if let Some(command) = cancelled_command {
973+
checkpoint_control.cancel_command(command);
974+
} else if let Some(table_id) = node.command_ctx.table_to_cancel() {
975+
// the cancelled command is possibly stashed in `finished_commands` and waiting
976+
// for checkpoint, we should also clear it.
977+
checkpoint_control.cancel_stashed_command(table_id);
978+
}
979+
967980
let remaining = checkpoint_control.finish_commands(checkpoint).await?;
968981
// If there are remaining commands (that requires checkpoint to finish), we force
969982
// the next barrier to be a checkpoint.
@@ -972,10 +985,6 @@ where
972985
self.scheduled_barriers.force_checkpoint_in_next_barrier();
973986
}
974987

975-
if let Some(command) = cancelled_command {
976-
checkpoint_control.cancel_command(command);
977-
}
978-
979988
node.timer.take().unwrap().observe_duration();
980989
node.wait_commit_timer.take().unwrap().observe_duration();
981990

0 commit comments

Comments
 (0)