Skip to content

Commit af0ebc8

Browse files
committed
Block resuming a partition merge schema change. Resuming would require sharing the destination newdb,
which involves more complicated code changes and it will be properly addressed in a future PR. Signed-off-by: Dorin Hogea <[email protected]>
1 parent 2dcd065 commit af0ebc8

File tree

5 files changed

+78
-33
lines changed

5 files changed

+78
-33
lines changed

Diff for: db/osqlblockproc.c

+3-2
Original file line numberDiff line numberDiff line change
@@ -1471,8 +1471,9 @@ void *resume_sc_multiddl_txn_finalize(void *p)
14711471
} else {
14721472
logmsg(LOGMSG_ERROR, "%s: shard '%s', rc %d\n", __func__,
14731473
sc->tablename, sc->sc_rc);
1474-
sc_set_running(iq, sc, sc->tablename, 0, NULL, 0, __func__,
1475-
__LINE__);
1474+
if (sc->set_running)
1475+
sc_set_running(iq, sc, sc->tablename, 0, NULL, 0, __func__,
1476+
__LINE__);
14761477
free_schema_change_type(sc);
14771478
error = 1;
14781479
}

Diff for: db/osqlcomm.c

-4
Original file line numberDiff line numberDiff line change
@@ -6481,14 +6481,10 @@ static int _process_partitioned_table_merge(struct ireq *iq)
64816481
*/
64826482
sc->nothrevent = 1; /* we need do_alter_table to run first */
64836483
sc->finalize = 0;
6484-
enum comdb2_partition_type tt = sc->partition.type;
6485-
sc->partition.type = PARTITION_NONE;
64866484

64876485
strncpy(sc->tablename, first_shard->tablename, sizeof(sc->tablename));
64886486

64896487
rc = start_schema_change_tran(iq, NULL);
6490-
6491-
sc->partition.type = tt;
64926488
iq->sc->sc_next = iq->sc_pending;
64936489
iq->sc_pending = iq->sc;
64946490
if (rc) {

Diff for: schemachange/sc_alter_table.c

+11-7
Original file line numberDiff line numberDiff line change
@@ -403,7 +403,17 @@ int do_alter_table(struct ireq *iq, struct schema_change_type *s,
403403
struct scinfo scinfo;
404404
struct errstat err = {0};
405405

406-
if (s->partition.type == PARTITION_MERGE)
406+
db = get_dbtable_by_name(s->tablename);
407+
if (db == NULL) {
408+
sc_errf(s, "Table not found:%s\n", s->tablename);
409+
return SC_TABLE_DOESNOT_EXIST;
410+
}
411+
412+
413+
/* note for a partition merge, if we reuse the first shard (i.e. it is aliased),
414+
* we need to alter it here instead of running do_merge_table
415+
*/
416+
if (s->partition.type == PARTITION_MERGE && !db->sqlaliasname)
407417
return do_merge_table(iq, s, tran);
408418

409419
#ifdef DEBUG_SC
@@ -413,12 +423,6 @@ int do_alter_table(struct ireq *iq, struct schema_change_type *s,
413423
gbl_use_plan = 1;
414424
gbl_sc_last_writer_time = 0;
415425

416-
db = get_dbtable_by_name(s->tablename);
417-
if (db == NULL) {
418-
sc_errf(s, "Table not found:%s\n", s->tablename);
419-
return SC_TABLE_DOESNOT_EXIST;
420-
}
421-
422426
if (s->resume == SC_PREEMPT_RESUME) {
423427
newdb = db->sc_to;
424428
changed = s->schema_change;

Diff for: schemachange/sc_logic.c

+64-19
Original file line numberDiff line numberDiff line change
@@ -978,15 +978,58 @@ static int verify_sc_resumed_for_shard(const char *shardname,
978978
static int verify_sc_resumed_for_all_shards(void *obj, void *arg)
979979
{
980980
struct timepart_sc_resuming *tpt_sc = (struct timepart_sc_resuming *)obj;
981+
int rc = 0;
982+
983+
/* corner case, shard merging: all shard schema changes need
984+
* to share the same destination table newdb;
985+
* at this point we will block resuming a shard merging, give
986+
* the complexity of the changes involved
987+
*/
988+
struct schema_change_type *sc = tpt_sc->s;
989+
while (sc) {
990+
if (sc->partition.type == PARTITION_MERGE) {
991+
break;
992+
}
993+
sc = sc->sc_next;
994+
}
995+
if (sc) {
996+
/* merge operation, resume not supported */
997+
logmsg(LOGMSG_ERROR, "%s partition merging detected %s, aborting\n", __func__,
998+
tpt_sc->s->tablename);
999+
sc = tpt_sc->s;
1000+
while (sc) {
1001+
mark_schemachange_over(sc->tablename);
1002+
struct dbtable *tbl = get_dbtable_by_name(sc->tablename);
1003+
if (tbl) {
1004+
sc_del_unused_files(tbl);
1005+
}
1006+
sc->sc_rc = SC_ABORTED;
1007+
sc = sc->sc_next;
1008+
}
1009+
return 0;
1010+
}
1011+
1012+
/* we need to start all the shards already in progress */
1013+
sc = tpt_sc->s;
1014+
while (sc) {
1015+
rc = start_schema_change(sc);
1016+
if (rc != SC_OK && rc != SC_ASYNC) {
1017+
logmsg(LOGMSG_ERROR,
1018+
"%s: failed to resume schema change for table '%s' rc %d\n",
1019+
__func__, sc->tablename, rc);
1020+
sc->sc_rc = SC_ABORTED;
1021+
return -1;
1022+
}
1023+
sc = sc->sc_next;
1024+
}
9811025

982-
/* all shards resumed, including the next shard if any */
1026+
/* are all shards resumed, including the next shard if any */
9831027
if (tpt_sc->nshards > timepart_get_num_shards(tpt_sc->viewname))
9841028
return 0;
9851029

1030+
/* start new sc for shards that were not resumed */
9861031
timepart_sc_arg_t sc_arg = {0};
9871032
sc_arg.s = tpt_sc->s;
988-
sc_arg.part_name = tpt_sc->viewname;
989-
/* start new sc for shards that were not resumed */
9901033
sc_arg.check_extra_shard = 1;
9911034
sc_arg.lockless = 1;
9921035
timepart_foreach_shard(verify_sc_resumed_for_shard, &sc_arg);
@@ -1254,19 +1297,7 @@ int resume_schema_change(void)
12541297

12551298
MEMORY_SYNC;
12561299

1257-
/* start the schema change back up */
1258-
rc = start_schema_change(s);
1259-
if (rc != SC_OK) {
1260-
logmsg(
1261-
LOGMSG_ERROR,
1262-
"%s: failed to resume schema change for table '%s' rc %d\n",
1263-
__func__, s->tablename, rc);
1264-
/* start_schema_change will free if this fails */
1265-
/*
1266-
free_schema_change_type(s);
1267-
*/
1268-
continue;
1269-
} else if (s->timepartition_name) {
1300+
if (s->timepartition_name) {
12701301
struct timepart_sc_resuming *tpt_sc = NULL;
12711302
tpt_sc = hash_find(tpt_sc_hash, &s->timepartition_name);
12721303
if (tpt_sc == NULL) {
@@ -1287,9 +1318,23 @@ int resume_schema_change(void)
12871318
tpt_sc->s = s;
12881319
tpt_sc->nshards++;
12891320
}
1290-
} else if (s->finalize == 0 && s->kind != SC_ALTERTABLE_PENDING) {
1291-
s->sc_next = sc_resuming;
1292-
sc_resuming = s;
1321+
} else {
1322+
/* start the schema change back up */
1323+
rc = start_schema_change(s);
1324+
if (rc != SC_OK) {
1325+
logmsg(
1326+
LOGMSG_ERROR,
1327+
"%s: failed to resume schema change for table '%s' rc %d\n",
1328+
__func__, s->tablename, rc);
1329+
/* start_schema_change will free if this fails */
1330+
/*
1331+
free_schema_change_type(s);
1332+
*/
1333+
continue;
1334+
} else if (s->finalize == 0 && s->kind != SC_ALTERTABLE_PENDING) {
1335+
s->sc_next = sc_resuming;
1336+
sc_resuming = s;
1337+
}
12931338
}
12941339
}
12951340
}

Diff for: tests/TODO

-1
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,5 @@ rowlocks_blkseq.test
8383
sc_async_constraints.test
8484
sigstopcluster.test
8585
weighted_standing_queue.test -- failing in rhel8 + podman
86-
sc_resume_partition.test -- Pending PR #5073
8786

8887
# vim: set sw=4 ts=4 et:

0 commit comments

Comments
 (0)