diff --git a/cdb2api/cdb2api.c b/cdb2api/cdb2api.c
index 211fdc70d5..f6ad75dc90 100644
--- a/cdb2api/cdb2api.c
+++ b/cdb2api/cdb2api.c
@@ -94,7 +94,7 @@ static int CDB2_PORTMUXPORT = CDB2_PORTMUXPORT_DEFAULT;
 #define MAX_RETRIES_DEFAULT 21
 static int MAX_RETRIES = MAX_RETRIES_DEFAULT; /* We are looping each node twice. */
 
-#define MIN_RETRIES_DEFAULT 16
+#define MIN_RETRIES_DEFAULT 3
 static int MIN_RETRIES = MIN_RETRIES_DEFAULT;
 
 #define CDB2_CONNECT_TIMEOUT_DEFAULT 100
diff --git a/db/osqlblockproc.c b/db/osqlblockproc.c
index 41dfa99581..4af4c49ccc 100644
--- a/db/osqlblockproc.c
+++ b/db/osqlblockproc.c
@@ -1602,6 +1602,8 @@ int resume_sc_multiddl_txn(sc_list_t *scl)
      * NOTE: schema changes will be queued in iq->sc_pending
      *
      */
+
+    printf("%s: About to run scs\n", __func__);
     rc = bplog_schemachange_run(iq, scl->uuid, &scs);
     if (rc) {
         free(iq);
diff --git a/db/osqlcomm.c b/db/osqlcomm.c
index 3b3d3b7d2a..61c6f77585 100644
--- a/db/osqlcomm.c
+++ b/db/osqlcomm.c
@@ -82,6 +82,9 @@ int gbl_partition_sc_reorder = 1;
 
 extern int db_is_exiting();
 
+extern int prepare_schema_change(struct ireq *iq, tran_type *trans);
+extern int launch_schema_change(struct ireq *iq, tran_type *trans);
+
 static int osql_net_type_to_net_uuid_type(int type);
 static void osql_extract_snap_info(osql_sess_t *sess, void *rpl, int rpllen);
 
@@ -6335,7 +6338,42 @@ static int _process_single_table_sc(struct ireq *iq)
     return rc;
 }
 
-static int start_schema_change_tran_wrapper_merge(const char *tblname,
+static int launch_schema_change_wrapper_merge(const char *tblname,
+                                              timepart_view_t **pview,
+                                              timepart_sc_arg_t *arg)
+{
+    struct ireq *iq = arg->s->iq;
+    iq->sc = iq->sc_pending;
+
+    if (arg->lockless)
+        views_unlock();
+
+    const int rc = launch_schema_change(iq, NULL);
+
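+    /* Rotate the sc we just launched from the head of iq->sc_pending to the
+     * tail of the list, so that the next call picks up the next shard's sc. */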
+    iq->sc_pending = iq->sc->sc_next;
+
+    struct schema_change_type *sc_last = iq->sc;
+    while (sc_last->sc_next) {
+        sc_last = sc_last->sc_next;
+    }
+    iq->sc->sc_next = NULL;
+    sc_last->sc_next = iq->sc;
+
+    iq->sc->newdb = NULL; /* lose ownership, otherwise double free */
+
+    if (arg->lockless) {
+        *pview = timepart_reaquire_view(arg->part_name);
+        if (!pview) {
+            logmsg(LOGMSG_ERROR, "%s view %s dropped while processing\n",
+                   __func__, arg->part_name);
+            return VIEW_ERR_SC;
+        }
+    }
+
+    return rc;
+}
+
+static int prepare_schema_change_wrapper_merge(const char *tblname,
                                                timepart_view_t **pview,
                                                timepart_sc_arg_t *arg)
 {
@@ -6353,6 +6391,7 @@ static int start_schema_change_tran_wrapper_merge(const char *tblname,
 
     /* new target */
     strncpy0(alter_sc->tablename, tblname, sizeof(sc->tablename));
+    printf("%s for %s\n", __func__, alter_sc->tablename);
     /*alter_sc->usedbtablevers = sc->partition.u.mergetable.version;*/
     alter_sc->kind = SC_ALTERTABLE;
     /* use the created file as target */
@@ -6360,6 +6399,7 @@ static int start_schema_change_tran_wrapper_merge(const char *tblname,
     alter_sc->force_rebuild = 1; /* we are moving rows here */
     /* alter only in parallel mode for live */
     alter_sc->scanmode = SCAN_PARALLEL;
+    // alter_sc->resume = arg->s->resume;
 
     /* link the sc */
     iq->sc = alter_sc;
@@ -6371,13 +6411,12 @@ static int start_schema_change_tran_wrapper_merge(const char *tblname,
     if (arg->lockless)
         views_unlock();
 
-    rc = start_schema_change_tran(iq, NULL);
+    rc = prepare_schema_change(iq, NULL);
 
-    /* link the alter */
     iq->sc->sc_next = iq->sc_pending;
     iq->sc_pending = iq->sc;
-    iq->sc->newdb = NULL; /* lose ownership, otherwise double free */
+    /* link the alter */
 
     if (arg->lockless) {
         *pview = timepart_reaquire_view(arg->part_name);
         if (!pview) {
@@ -6430,18 +6469,42 @@ static int _process_single_table_sc_merge(struct ireq *iq)
     sc->usedbtablevers = sc->partition.u.mergetable.version;
     enum comdb2_partition_type old_part_type = sc->partition.type;
     sc->partition.type = PARTITION_MERGE;
-    rc = start_schema_change_tran_wrapper_merge(
+    rc = prepare_schema_change_wrapper_merge(
+        sc->partition.u.mergetable.tablename, NULL, &arg);
+    rc = launch_schema_change_wrapper_merge(
         sc->partition.u.mergetable.tablename, NULL, &arg);
     sc->partition.type = old_part_type;
 
     return rc;
 }
 
+static int launch_merge_schema_change_for_shards(timepart_sc_arg_t * const arg)
+{
+    comdb2_name_thread(__func__);
+    thread_started("launch merge thread");
+    struct thr_handle *thr_self = thrman_self();
+
+    if (thr_self) {
+        thrman_change_type(thr_self, THRTYPE_SCHEMACHANGE);
+    } else {
+        thr_self = thrman_register(THRTYPE_SCHEMACHANGE);
+    }
+
+    bdb_thread_event(thedb->bdb_env, BDBTHR_EVENT_START_RDWR);
+    const int rc = timepart_foreach_shard(launch_schema_change_wrapper_merge, arg);
+    bdb_thread_event(thedb->bdb_env, BDBTHR_EVENT_DONE_RDWR);
+
+    free(arg->part_name);
+    free(arg);
+
+    return rc;
+}
+
 static int _process_partitioned_table_merge(struct ireq *iq)
 {
     struct schema_change_type *sc = iq->sc;
     int rc;
-    timepart_sc_arg_t arg = {0};
+    timepart_sc_arg_t *arg = (timepart_sc_arg_t *)calloc(1, sizeof(timepart_sc_arg_t));
 
     assert(sc->kind == SC_ALTERTABLE);
 
@@ -6496,24 +6559,47 @@ static int _process_partitioned_table_merge(struct ireq *iq)
         iq->osql_flags |= OSQL_FLAGS_SCDONE;
         return ERR_SC;
     }
-    arg.check_extra_shard = 1;
+    arg->check_extra_shard = 1;
     strncpy(sc->newtable, sc->tablename, sizeof(sc->newtable)); /* piggyback a rename with alter */
-    arg.start = 1;
+    arg->start = 1;
     /* since this is a partition drop, we do not need to set/reset arg.pos here */
     }
 
     /* at this point we have created the future btree, launch an alter
      * for each of the shards of the partition */
-    arg.s = sc;
-    arg.s->iq = iq;
-    arg.part_name = strdup(sc->tablename); /*sc->tablename gets rewritten*/
-    if (!arg.part_name)
+    sc->iq = iq;
+
+    // We need to pass this sc into `timepart_foreach_shard` (rather than a clone of it)
+    // because `timepart_foreach_shard` sets `timepartition_name`, which needs
+    // to be set when the sc is getting finalized.
+    arg->s = sc;
+
+    arg->part_name = strdup(sc->tablename); /*sc->tablename gets rewritten*/
+    if (!arg->part_name)
         return VIEW_ERR_MALLOC;
-    arg.lockless = 1;
-    /* note: we have already set nothrevent depending on the number of shards */
-    rc = timepart_foreach_shard(start_schema_change_tran_wrapper_merge, &arg);
-    free(arg.part_name);
+    arg->lockless = 1;
+
+    rc = timepart_foreach_shard(prepare_schema_change_wrapper_merge, arg);
+    if (rc) {
+        logmsg(LOGMSG_ERROR, "%s: Failed to prepare shard schema changes\n", __func__);
+        return ERR_SC;
+    }
+
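+    /* On resume, launch the per-shard alters from a detached thread; otherwise
+     * run them inline on this thread. */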
+    if (sc->resume) {
+        // After we spawn the thread to do the schema changes, the first shard's sc may be freed
+        // -- so clone it here.
+        struct schema_change_type *sc_clone = clone_schemachange_type(sc);
+        sc_clone->resume = sc->resume;
+        sc_clone->newdb = sc->newdb;
+        sc_clone->iq = iq;
+        arg->s = sc_clone;
+        pthread_t tid;
+        pthread_create(&tid, &gbl_pthread_attr_detached,
+                       (void *(*)(void *))launch_merge_schema_change_for_shards, arg);
+    } else {
+        rc = timepart_foreach_shard(launch_schema_change_wrapper_merge, arg);
+        iq->sc = iq->sc_pending;
+    }
 
     if (first_shard->sqlaliasname) {
         sc->partition.type =
             PARTITION_REMOVE; /* first shard is the collapsed table */
diff --git a/schemachange/sc_alter_table.c b/schemachange/sc_alter_table.c
index bbd35ddb3f..82fd6881c0 100644
--- a/schemachange/sc_alter_table.c
+++ b/schemachange/sc_alter_table.c
@@ -790,6 +790,14 @@ static int do_merge_table(struct ireq *iq, struct schema_change_type *s,
     assert(db->sc_from == db && s->db == db);
     assert(db->sc_to == newdb && s->newdb == newdb);
     assert(db->doing_conversion == 1);
+    if (s->resume && IS_ALTERTABLE(s)) {
+        if (gbl_test_sc_resume_race && !get_stopsc(__func__, __LINE__)) {
+            logmsg(LOGMSG_INFO, "%s:%d sleeping 5s for sc_resume test\n",
+                   __func__, __LINE__);
+            sleep(5);
+        }
+        decrement_sc_yet_to_resume_counter();
+    }
     MEMORY_SYNC;
 
     if (get_stopsc(__func__, __LINE__)) {
diff --git a/schemachange/sc_logic.c b/schemachange/sc_logic.c
index 1d0c39ccb4..a5f672f799 100644
--- a/schemachange/sc_logic.c
+++ b/schemachange/sc_logic.c
@@ -63,12 +63,12 @@ static enum thrtype prepare_sc_thread(struct schema_change_type *s)
         thread_started("schema change");
         oldtype = thrman_get_type(thr_self);
         thrman_change_type(thr_self, THRTYPE_SCHEMACHANGE);
-    } else
+    } else {
         thr_self = thrman_register(THRTYPE_SCHEMACHANGE);
-
-    if (!s->nothrevent) {
-        backend_thread_event(thedb, COMDB2_THR_EVENT_START_RDWR);
-        logmsg(LOGMSG_INFO, "Preparing schema change read write thread\n");
     }
+
+    backend_thread_event(thedb, COMDB2_THR_EVENT_START_RDWR);
+    logmsg(LOGMSG_INFO, "Preparing schema change read write thread\n");
+
     return oldtype;
 }
diff --git a/schemachange/sc_records.c b/schemachange/sc_records.c
index 2b1d9ff683..946dc1cbef 100644
--- a/schemachange/sc_records.c
+++ b/schemachange/sc_records.c
@@ -630,8 +630,10 @@ static int convert_record(struct convert_record_data *data)
     int no_wait_rowlock = 0;
     int64_t estimate = 0;
 
+    printf("hello\n");
+
     if (debug_switch_convert_record_sleep())
-        sleep(5);
+        sleep(1000);
 
     if (data->s->sc_thd_failed) {
         if (!data->s->retry_bad_genids)
diff --git a/schemachange/schemachange.c b/schemachange/schemachange.c
index 9757cc81e1..ccb27cab91 100644
--- a/schemachange/schemachange.c
+++ b/schemachange/schemachange.c
@@ -42,8 +42,87 @@ const char *get_hostname_with_crc32(bdb_state_type *bdb_state,
 
 extern int gbl_test_sc_resume_race;
 
-/* If this is successful, it increments */
-int start_schema_change_tran(struct ireq *iq, tran_type *trans)
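+/* Second half of the former start_schema_change_tran(): run an already
+ * prepared sc, either synchronously (s->nothrevent) or on a detached
+ * schema-change thread. */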
+int launch_schema_change(struct ireq *iq, tran_type *trans)
+{
+    struct schema_change_type *s = iq->sc;
+    int rc = 0;
+
+    sc_arg_t *arg = malloc(sizeof(sc_arg_t));
+    arg->trans = trans;
+    arg->iq = iq;
+    arg->sc = iq->sc;
+
+    /*
+    ** if s->kind == SC_PARTIALUPRECS, we're going radio silent from this point
+    *forward
+    ** in order to produce minimal spew
+    */
+    if (s->nothrevent) {
+        if (s->kind != SC_PARTIALUPRECS)
+            logmsg(LOGMSG_INFO, "Executing SYNCHRONOUSLY\n");
+        rc = do_schema_change_tran(arg);
+    } else {
+        int max_threads =
+            bdb_attr_get(thedb->bdb_attr, BDB_ATTR_SC_ASYNC_MAXTHREADS);
+        Pthread_mutex_lock(&sc_async_mtx);
+        while (!s->must_resume && !s->resume && max_threads > 0 &&
+               sc_async_threads >= max_threads) {
+            logmsg(LOGMSG_INFO, "Waiting for available schema change threads\n");
+            Pthread_cond_wait(&sc_async_cond, &sc_async_mtx);
+        }
+        sc_async_threads++;
+        Pthread_mutex_unlock(&sc_async_mtx);
+
+        if (s->kind != SC_PARTIALUPRECS)
+            logmsg(LOGMSG_INFO, "Executing ASYNCHRONOUSLY\n");
+        pthread_t tid;
+
+        if (s->kind == SC_ALTERTABLE_PENDING ||
+            s->preempted == SC_ACTION_RESUME) {
+            free(arg);
+            arg = NULL;
+            rc = pthread_create(&tid, &gbl_pthread_attr_detached,
+                                (void *(*)(void *))do_schema_change_locked, s);
+        } else {
+            Pthread_mutex_lock(&s->mtxStart);
+            rc = pthread_create(&tid, &gbl_pthread_attr_detached,
+                                (void *(*)(void *))do_schema_change_tran_thd,
+                                arg);
+            if (rc == 0) {
+                while (!s->started) {
+                    Pthread_cond_wait(&s->condStart, &s->mtxStart);
+                }
+            }
+            Pthread_mutex_unlock(&s->mtxStart);
+        }
+        if (rc) {
+            logmsg(LOGMSG_ERROR,
+                   "start_schema_change:pthread_create rc %d %s\n", rc,
+                   strerror(errno));
+
+            Pthread_mutex_lock(&sc_async_mtx);
+            sc_async_threads--;
+            Pthread_mutex_unlock(&sc_async_mtx);
+
+            if (arg)
+                free(arg);
+            if (!s->is_osql) {
+                sc_set_running(iq, s, s->tablename, 0, gbl_myhostname,
+                               time(NULL), __func__, __LINE__);
+                free_schema_change_type(s);
+            }
+            rc = SC_ASYNC_FAILED;
+        }
+    }
+    /* SC_COMMIT_PENDING is SC_OK for the upper layers */
+    if (rc == SC_COMMIT_PENDING) {
+        rc = s->sc_rc = SC_OK;
+    }
+
+    return rc;
+}
+
+int prepare_schema_change(struct ireq *iq, tran_type *trans)
 {
     struct schema_change_type *s = iq->sc;
     int maxcancelretry = 10;
@@ -305,10 +384,6 @@ int start_schema_change_tran(struct ireq *iq, tran_type *trans)
     }
     iq->sc_seed = seed;
 
-    sc_arg_t *arg = malloc(sizeof(sc_arg_t));
-    arg->trans = trans;
-    arg->iq = iq;
-    arg->sc = iq->sc;
     s->started = 0;
 
     if (s->resume && s->resume != SC_OSQL_RESUME && IS_ALTERTABLE(s)) {
@@ -319,73 +394,28 @@ int start_schema_change_tran(struct ireq *iq, tran_type *trans)
         }
         ATOMIC_ADD32(gbl_sc_resume_start, 1);
     }
-    /*
-    ** if s->kind == SC_PARTIALUPRECS, we're going radio silent from this point
-    *forward
-    ** in order to produce minimal spew
-    */
-    if (s->nothrevent) {
-        if (s->kind != SC_PARTIALUPRECS)
-            logmsg(LOGMSG_INFO, "Executing SYNCHRONOUSLY\n");
-        rc = do_schema_change_tran(arg);
-    } else {
-        int max_threads =
-            bdb_attr_get(thedb->bdb_attr, BDB_ATTR_SC_ASYNC_MAXTHREADS);
-        Pthread_mutex_lock(&sc_async_mtx);
-        while (!s->must_resume && !s->resume && max_threads > 0 &&
-               sc_async_threads >= max_threads) {
-            logmsg(LOGMSG_INFO, "Waiting for avaiable schema change threads\n");
-            Pthread_cond_wait(&sc_async_cond, &sc_async_mtx);
-        }
-        sc_async_threads++;
-        Pthread_mutex_unlock(&sc_async_mtx);
-        if (s->kind != SC_PARTIALUPRECS)
-            logmsg(LOGMSG_INFO, "Executing ASYNCHRONOUSLY\n");
-        pthread_t tid;
-
-        if (s->kind == SC_ALTERTABLE_PENDING ||
-            s->preempted == SC_ACTION_RESUME) {
-            free(arg);
-            arg = NULL;
-            rc = pthread_create(&tid, &gbl_pthread_attr_detached,
-                                (void *(*)(void *))do_schema_change_locked, s);
-        } else {
-            Pthread_mutex_lock(&s->mtxStart);
-            rc = pthread_create(&tid, &gbl_pthread_attr_detached,
-                                (void *(*)(void *))do_schema_change_tran_thd,
-                                arg);
-            if (rc == 0) {
-                while (!s->started) {
-                    Pthread_cond_wait(&s->condStart, &s->mtxStart);
-                }
-            }
-            Pthread_mutex_unlock(&s->mtxStart);
-        }
-        if (rc) {
-            logmsg(LOGMSG_ERROR,
-                   "start_schema_change:pthread_create rc %d %s\n", rc,
-                   strerror(errno));
-
-            Pthread_mutex_lock(&sc_async_mtx);
-            sc_async_threads--;
-            Pthread_mutex_unlock(&sc_async_mtx);
+    return rc;
+}
 
-            if (arg)
-                free(arg);
-            if (!s->is_osql) {
-                sc_set_running(iq, s, s->tablename, 0, gbl_myhostname,
-                               time(NULL), __func__, __LINE__);
-                free_schema_change_type(s);
-            }
-            rc = SC_ASYNC_FAILED;
-        }
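+/* start_schema_change_tran() is kept as the single-call entry point: it
+ * prepares the sc and then launches it. Callers that must prepare several
+ * scs before launching any (e.g. the partition merge path) call
+ * prepare_schema_change() and launch_schema_change() directly. */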
+/* If this is successful, it increments */
+int start_schema_change_tran(struct ireq *iq, tran_type *trans)
+{
+    int rc = prepare_schema_change(iq, trans);
+    if (rc) {
+        logmsg(LOGMSG_ERROR, "%s:%d Failed to prepare schema change. rc(%d)\n",
+               __func__, __LINE__, rc);
+        goto err;
     }
-    /* SC_COMMIT_PENDING is SC_OK for the upper layers */
-    if (rc == SC_COMMIT_PENDING) {
-        rc = s->sc_rc = SC_OK;
+
+    rc = launch_schema_change(iq, trans);
+    if (rc) {
+        logmsg(LOGMSG_INFO, "%s:%d Failed to complete schema change. rc(%d)\n",
+               __func__, __LINE__, rc);
+        goto err;
     }
+err:
     return rc;
 }
diff --git a/tests/sc_resume_partition.test/Makefile b/tests/sc_resume_partition.test/Makefile
new file mode 100644
index 0000000000..053ca0182e
--- /dev/null
+++ b/tests/sc_resume_partition.test/Makefile
@@ -0,0 +1,8 @@
+ifeq ($(TESTSROOTDIR),)
+  include ../testcase.mk
+else
+  include $(TESTSROOTDIR)/testcase.mk
+endif
+ifeq ($(TEST_TIMEOUT),)
+  export TEST_TIMEOUT=2m
+endif
diff --git a/tests/sc_resume_partition.test/lrl.options b/tests/sc_resume_partition.test/lrl.options
new file mode 100644
index 0000000000..1ff97f5a5d
--- /dev/null
+++ b/tests/sc_resume_partition.test/lrl.options
@@ -0,0 +1 @@
+multitable_ddl 1
diff --git a/tests/sc_resume_partition.test/runit b/tests/sc_resume_partition.test/runit
new file mode 100755
index 0000000000..afdb6b0cbb
--- /dev/null
+++ b/tests/sc_resume_partition.test/runit
@@ -0,0 +1,123 @@
+#!/usr/bin/env bash
+
+source ${TESTSROOTDIR}/tools/runit_common.sh
+source ${TESTSROOTDIR}/tools/cluster_utils.sh
+
+set -ex
+
+[ -z "${CLUSTER}" ] && { echo "Test requires a cluster"; exit 0; }
+
+dbnm=$1
+
+restart_cluster() {
+    set +e
+    for node in ${CLUSTER} ; do
+        kill_restart_node ${node} &
+    done
+    set -e
+
+    sleep 2
+
+    wait_for_cluster
+}
+
+test_alter_resume() {
+    # Given
+    local master
+    master=`cdb2sql --tabs ${CDB2_OPTIONS} $dbnm default 'SELECT host FROM comdb2_cluster WHERE is_master="Y"'`
+
+    cdb2sql $dbnm --host $master "EXEC PROCEDURE sys.cmd.send('convert_record_sleep 1')"
+    cdb2sql ${CDB2_OPTIONS} ${dbnm} default "create table t(a int, b int)"
+    cdb2sql ${CDB2_OPTIONS} ${dbnm} default "insert into t values(1, 1)"
+
+    # When
+    cdb2sql ${CDB2_OPTIONS} ${dbnm} default "alter table t drop column a" &
+    local -r waitpid=$!
+    sleep 1
+    restart_cluster &> /dev/null
+
+    # Then
+    if wait ${waitpid};
+    then
+        echo "FAIL: Merge succeeded before cluster bounced. Test is buggy."
+        return 1
+    fi
+
+    # Cleanup
+    cdb2sql ${CDB2_OPTIONS} ${dbnm} default "drop table t"
+}
+
+test_partition_merge_resume() {
+    # Given
+    local -r num_shards=$1
+    local starttime
+    starttime=$(get_timestamp 120)
+    local master
+    master=`cdb2sql --tabs ${CDB2_OPTIONS} $dbnm default 'SELECT host FROM comdb2_cluster WHERE is_master="Y"'`
+
+    cdb2sql $dbnm --host $master "EXEC PROCEDURE sys.cmd.send('convert_record_sleep 1')"
+    cdb2sql ${CDB2_OPTIONS} ${dbnm} default "create table t(a int) partitioned by time period 'daily' retention ${num_shards} start '${starttime}'"
+
+    for i in $(seq 0 1 $((num_shards-1)));
+    do
+        local shard
+        shard=$(cdb2sql --tabs ${CDB2_OPTIONS} ${dbnm} default "select shardname from comdb2_timepartshards limit 1 offset ${i}")
+        cdb2sql ${CDB2_OPTIONS} ${dbnm} default "insert into '${shard}' values(1)"
+    done
+
+    # When
+    cdb2sql ${CDB2_OPTIONS} ${dbnm} default "alter table t partitioned by none" &
+    local -r waitpid=$!
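+    # Give the alter a moment to start converting records, then bounce the
+    # cluster so that the in-flight merge has to be resumed after restart.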
+    sleep 1
+
+    local -r elapsed_time_before_restart=${SECONDS}
+    restart_cluster &> /dev/null
+    local -r elapsed_time_after_restart=${SECONDS}
+
+    local -r expected_max_restart_duration=30
+    local -r restart_duration=$((elapsed_time_after_restart - elapsed_time_before_restart))
+    if (( restart_duration > expected_max_restart_duration ));
+    then
+        echo "FAIL: Restart took ${restart_duration} seconds. This is larger than expected upper bound ${expected_max_restart_duration}"
+        return 1
+    fi
+
+    # Then
+    if wait ${waitpid};
+    then
+        echo "FAIL: Merge succeeded before cluster bounced. Test is buggy."
+        return 1
+    fi
+
+    local timepart
+    timepart=$(cdb2sql --tabs ${CDB2_OPTIONS} ${dbnm} default "select * from comdb2_timepartitions where name='t'")
+    if [[ -n ${timepart} ]];
+    then
+        echo "FAIL: Found time partition that should not exist"
+        return 1
+    fi
+
+    local num_records
+    num_records=$(cdb2sql --tabs ${CDB2_OPTIONS} ${dbnm} default "select count(*) from t")
+    if (( num_records != num_shards ));
+    then
+        echo "FAIL: Unexpected number of records after partition merge"
+        return 1
+    fi
+
+    # Cleanup
+    cdb2sql ${CDB2_OPTIONS} ${dbnm} default "drop table t"
+}
+
+main() {
+    # test_alter_resume
+
+    local num_shards=10 records_per_shard=1
+    test_partition_merge_resume ${num_shards} ${records_per_shard}
+
+    # Stress test
+    num_shards=100 records_per_shard=100000
+    # test_partition_merge_resume ${num_shards} ${records_per_shard}
+}
+
+main
diff --git a/tests/tools/cluster_utils.sh b/tests/tools/cluster_utils.sh
index 2903cd840d..927b55d059 100644
--- a/tests/tools/cluster_utils.sh
+++ b/tests/tools/cluster_utils.sh
@@ -247,3 +247,12 @@ function kill_restart_tertiary_node
 }
 
+function wait_for_cluster
+{
+    for node in ${CLUSTER} ; do
+        until cdb2sql --host ${node} ${CDB2_OPTIONS} ${DBNAME} default "select 1";
+        do
+            :
+        done
+    done
+}