Skip to content

Add partition merge resume test #5022

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cdb2api/cdb2api.c
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ static int CDB2_PORTMUXPORT = CDB2_PORTMUXPORT_DEFAULT;
#define MAX_RETRIES_DEFAULT 21
static int MAX_RETRIES = MAX_RETRIES_DEFAULT; /* We are looping each node twice. */

#define MIN_RETRIES_DEFAULT 16
#define MIN_RETRIES_DEFAULT 3
static int MIN_RETRIES = MIN_RETRIES_DEFAULT;

#define CDB2_CONNECT_TIMEOUT_DEFAULT 100
Expand Down
2 changes: 2 additions & 0 deletions db/osqlblockproc.c
Original file line number Diff line number Diff line change
Expand Up @@ -1602,6 +1602,8 @@ int resume_sc_multiddl_txn(sc_list_t *scl)
* NOTE: schema changes will be queued in iq->sc_pending
*
*/

printf("%s: About to run scs\n", __func__);
rc = bplog_schemachange_run(iq, scl->uuid, &scs);
if (rc) {
free(iq);
Expand Down
118 changes: 102 additions & 16 deletions db/osqlcomm.c
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ int gbl_partition_sc_reorder = 1;

extern int db_is_exiting();

extern int prepare_schema_change(struct ireq *iq, tran_type *trans);
extern int launch_schema_change(struct ireq *iq, tran_type *trans);

static int osql_net_type_to_net_uuid_type(int type);
static void osql_extract_snap_info(osql_sess_t *sess, void *rpl, int rpllen);

Expand Down Expand Up @@ -6335,7 +6338,42 @@ static int _process_single_table_sc(struct ireq *iq)
return rc;
}

static int start_schema_change_tran_wrapper_merge(const char *tblname,
/*
 * Per-shard callback: launch the schema change sitting at the head of
 * iq->sc_pending, where iq is taken from arg->s->iq.
 *
 * tblname  unused here; kept because this function is passed as a callback
 *          to timepart_foreach_shard.
 * pview    in/out view pointer; only touched when arg->lockless is set, in
 *          which case it is overwritten with the result of
 *          timepart_reaquire_view(arg->part_name).
 * arg      shard iteration argument; supplies the ireq (arg->s->iq), the
 *          lockless flag and the partition name.
 *
 * After the launch the head of the pending list is moved to the tail, so
 * consecutive calls launch consecutive entries and a full pass over the list
 * leaves it in its original order.
 *
 * Returns the rc of launch_schema_change(), or VIEW_ERR_SC if the view could
 * not be reacquired in lockless mode.
 */
static int launch_schema_change_wrapper_merge(const char *tblname,
                                              timepart_view_t **pview,
                                              timepart_sc_arg_t *arg)
{
    struct ireq *iq = arg->s->iq;
    iq->sc = iq->sc_pending;

    if (arg->lockless)
        views_unlock();

    const int rc = launch_schema_change(iq, NULL);

    /* Rotate the entry we just launched to the tail of the pending list.
     * With a single entry the rotation is a no-op: iq->sc_pending already
     * points at iq->sc and its sc_next is NULL. Rotating unconditionally in
     * that case would set sc_pending to NULL and link the entry to itself.
     */
    if (iq->sc->sc_next) {
        struct schema_change_type *sc_last = iq->sc->sc_next;
        iq->sc_pending = sc_last;
        while (sc_last->sc_next) {
            sc_last = sc_last->sc_next;
        }
        iq->sc->sc_next = NULL;
        sc_last->sc_next = iq->sc;
    }

    iq->sc->newdb = NULL; /* lose ownership, otherwise double free */

    if (arg->lockless) {
        *pview = timepart_reaquire_view(arg->part_name);
        /* Check the reacquired view itself; pview (the out-parameter) was
         * just dereferenced and so can never be NULL here.
         */
        if (!*pview) {
            logmsg(LOGMSG_ERROR, "%s view %s dropped while processing\n",
                   __func__, arg->part_name);
            return VIEW_ERR_SC;
        }
    }

    return rc;
}

static int prepare_schema_change_wrapper_merge(const char *tblname,
timepart_view_t **pview,
timepart_sc_arg_t *arg)
{
Expand All @@ -6353,13 +6391,15 @@ static int start_schema_change_tran_wrapper_merge(const char *tblname,

/* new target */
strncpy0(alter_sc->tablename, tblname, sizeof(sc->tablename));
printf("%s for %s\n", __func__, alter_sc->tablename);
/*alter_sc->usedbtablevers = sc->partition.u.mergetable.version;*/
alter_sc->kind = SC_ALTERTABLE;
/* use the created file as target */
alter_sc->newdb = sc->newdb;
alter_sc->force_rebuild = 1; /* we are moving rows here */
/* alter only in parallel mode for live */
alter_sc->scanmode = SCAN_PARALLEL;
// alter_sc->resume = arg->s->resume;
/* link the sc */
iq->sc = alter_sc;

Expand All @@ -6371,13 +6411,12 @@ static int start_schema_change_tran_wrapper_merge(const char *tblname,
if (arg->lockless)
views_unlock();

rc = start_schema_change_tran(iq, NULL);
rc = prepare_schema_change(iq, NULL);

/* link the alter */
iq->sc->sc_next = iq->sc_pending;
iq->sc_pending = iq->sc;
iq->sc->newdb = NULL; /* lose ownership, otherwise double free */

/* link the alter */
if (arg->lockless) {
*pview = timepart_reaquire_view(arg->part_name);
if (!pview) {
Expand Down Expand Up @@ -6430,18 +6469,42 @@ static int _process_single_table_sc_merge(struct ireq *iq)
sc->usedbtablevers = sc->partition.u.mergetable.version;
enum comdb2_partition_type old_part_type = sc->partition.type;
sc->partition.type = PARTITION_MERGE;
rc = start_schema_change_tran_wrapper_merge(
rc = prepare_schema_change_wrapper_merge(
sc->partition.u.mergetable.tablename, NULL, &arg);
rc = launch_schema_change_wrapper_merge(
sc->partition.u.mergetable.tablename, NULL, &arg);
sc->partition.type = old_part_type;

return rc;
}

/*
 * Runs launch_schema_change_wrapper_merge over every shard via
 * timepart_foreach_shard, bracketed by the bdb read-write thread events.
 *
 * The calling thread is named and marked as a schema change thread first
 * (registering with thrman if it has no handle yet).
 *
 * Takes ownership of arg: both arg->part_name and arg are freed before
 * returning. Returns the rc of timepart_foreach_shard.
 *
 * NOTE(review): this function is handed to pthread_create through a
 * function-pointer cast elsewhere in this file, although it returns int
 * rather than void * -- confirm that is intended.
 */
static int launch_merge_schema_change_for_shards(timepart_sc_arg_t * const arg)
{
    comdb2_name_thread(__func__);
    thread_started("launch merge thread");

    struct thr_handle *self = thrman_self();
    if (!self) {
        self = thrman_register(THRTYPE_SCHEMACHANGE);
    } else {
        thrman_change_type(self, THRTYPE_SCHEMACHANGE);
    }

    int foreach_rc;

    bdb_thread_event(thedb->bdb_env, BDBTHR_EVENT_START_RDWR);
    foreach_rc = timepart_foreach_shard(launch_schema_change_wrapper_merge, arg);
    bdb_thread_event(thedb->bdb_env, BDBTHR_EVENT_DONE_RDWR);

    /* arg was heap-allocated for this run; release it and its name copy */
    free(arg->part_name);
    free(arg);

    return foreach_rc;
}

static int _process_partitioned_table_merge(struct ireq *iq)
{
struct schema_change_type *sc = iq->sc;
int rc;
timepart_sc_arg_t arg = {0};
timepart_sc_arg_t * arg = (timepart_sc_arg_t *) calloc(1, sizeof(timepart_sc_arg_t));

assert(sc->kind == SC_ALTERTABLE);

Expand Down Expand Up @@ -6496,24 +6559,47 @@ static int _process_partitioned_table_merge(struct ireq *iq)
iq->osql_flags |= OSQL_FLAGS_SCDONE;
return ERR_SC;
}
arg.check_extra_shard = 1;
arg->check_extra_shard = 1;
strncpy(sc->newtable, sc->tablename, sizeof(sc->newtable)); /* piggyback a rename with alter */
arg.start = 1;
arg->start = 1;
/* since this is a partition drop, we do not need to set/reset arg.pos here */
}

/* at this point we have created the future btree, launch an alter
* for each of the shards of the partition
*/
arg.s = sc;
arg.s->iq = iq;
arg.part_name = strdup(sc->tablename); /*sc->tablename gets rewritten*/
if (!arg.part_name)
sc->iq = iq;

// We need to pass this sc into `timepart_foreach_shard` (rather than a clone of it)
// because `timepart_foreach_shard` sets `timepartition_name`, which needs
// to be set when the sc is getting finalized.
arg->s = sc;

arg->part_name = strdup(sc->tablename); /*sc->tablename gets rewritten*/
if (!arg->part_name)
return VIEW_ERR_MALLOC;
arg.lockless = 1;
/* note: we have already set nothrevent depending on the number of shards */
rc = timepart_foreach_shard(start_schema_change_tran_wrapper_merge, &arg);
free(arg.part_name);
arg->lockless = 1;

rc = timepart_foreach_shard(prepare_schema_change_wrapper_merge, arg);
if (rc) {
logmsg(LOGMSG_ERROR, "%s: Failed to prepare shard schema changes\n", __func__);
return ERR_SC;
}

if (sc->resume) {
// After we spawn the thread to do the schema changes, the first shard's sc may be freed
// -- so clone it here.
struct schema_change_type *sc_clone = clone_schemachange_type(sc);
sc_clone->resume = sc->resume;
sc_clone->newdb = sc->newdb;
sc_clone->iq = iq;
arg->s = sc_clone;
pthread_t tid;
pthread_create(&tid, &gbl_pthread_attr_detached, (void *(*)(void *)) launch_merge_schema_change_for_shards, arg);
} else {
rc = timepart_foreach_shard(launch_schema_change_wrapper_merge, arg);
iq->sc = iq->sc_pending;
}

if (first_shard->sqlaliasname) {
sc->partition.type = PARTITION_REMOVE; /* first shard is the collapsed table */
Expand Down
8 changes: 8 additions & 0 deletions schemachange/sc_alter_table.c
Original file line number Diff line number Diff line change
Expand Up @@ -790,6 +790,14 @@ static int do_merge_table(struct ireq *iq, struct schema_change_type *s,
assert(db->sc_from == db && s->db == db);
assert(db->sc_to == newdb && s->newdb == newdb);
assert(db->doing_conversion == 1);
if (s->resume && IS_ALTERTABLE(s)) {
if (gbl_test_sc_resume_race && !get_stopsc(__func__, __LINE__)) {
logmsg(LOGMSG_INFO, "%s:%d sleeping 5s for sc_resume test\n",
__func__, __LINE__);
sleep(5);
}
decrement_sc_yet_to_resume_counter();
}
MEMORY_SYNC;

if (get_stopsc(__func__, __LINE__)) {
Expand Down
8 changes: 4 additions & 4 deletions schemachange/sc_logic.c
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,12 @@ static enum thrtype prepare_sc_thread(struct schema_change_type *s)
thread_started("schema change");
oldtype = thrman_get_type(thr_self);
thrman_change_type(thr_self, THRTYPE_SCHEMACHANGE);
} else
} else {
thr_self = thrman_register(THRTYPE_SCHEMACHANGE);
if (!s->nothrevent) {
backend_thread_event(thedb, COMDB2_THR_EVENT_START_RDWR);
logmsg(LOGMSG_INFO, "Preparing schema change read write thread\n");
}

backend_thread_event(thedb, COMDB2_THR_EVENT_START_RDWR);
logmsg(LOGMSG_INFO, "Preparing schema change read write thread\n");
}
return oldtype;
}
Expand Down
4 changes: 3 additions & 1 deletion schemachange/sc_records.c
Original file line number Diff line number Diff line change
Expand Up @@ -630,8 +630,10 @@ static int convert_record(struct convert_record_data *data)
int no_wait_rowlock = 0;
int64_t estimate = 0;

printf("hello\n");

if (debug_switch_convert_record_sleep())
sleep(5);
sleep(1000);

if (data->s->sc_thd_failed) {
if (!data->s->retry_bad_genids)
Expand Down
Loading