diff --git a/bdb/bdb_api.h b/bdb/bdb_api.h index ea4b7e76e9..8553edc6fa 100644 --- a/bdb/bdb_api.h +++ b/bdb/bdb_api.h @@ -2496,3 +2496,6 @@ int bdb_keylen(bdb_state_type *bdb_state, int ixnum); void llmeta_collect_tablename_alias(void); #endif +int bdb_set_genshard(tran_type *, const char *, char *, int *); +int bdb_get_genshard(tran_type *, const char *, char **, int *, int *); +int bdb_get_genshard_names(tran_type *t, char **names, int *num); diff --git a/bdb/bdb_schemachange.c b/bdb/bdb_schemachange.c index 29acb6f0ce..e5c693534d 100644 --- a/bdb/bdb_schemachange.c +++ b/bdb/bdb_schemachange.c @@ -190,7 +190,7 @@ int handle_scdone(DB_ENV *dbenv, u_int32_t rectype, llog_scdone_args *scdoneop, } if (sctype == rename_table || sctype == alias_table || - (sctype == add && (strlen(table) + 1 < scdoneop->table.size))) { + ((sctype == add || sctype == genshard_add || sctype == genshard_drop) && (strlen(table) + 1 < scdoneop->table.size))) { strarg = &table[strlen(table) + 1]; } diff --git a/bdb/bdb_schemachange.h b/bdb/bdb_schemachange.h index 31ae3e9bee..423a02c44e 100644 --- a/bdb/bdb_schemachange.h +++ b/bdb/bdb_schemachange.h @@ -51,7 +51,9 @@ typedef enum scdone { add_queue_file, // 22 del_queue_file, // 23 alias_table, // 24 - alias // 25 + alias, // 25 + genshard_add, // 26 + genshard_drop // 27 } scdone_t; #define BDB_BUMP_DBOPEN_GEN(type, msg) \ diff --git a/bdb/llmeta.c b/bdb/llmeta.c index cb39d58809..6ce6d239e5 100644 --- a/bdb/llmeta.c +++ b/bdb/llmeta.c @@ -179,6 +179,7 @@ typedef enum { LLMETA_SCHEMACHANGE_STATUS_V2 = 56, LLMETA_SCHEMACHANGE_LIST = 57, /* list of all sc-s in a uuid txh */ LLMETA_SCHEMACHANGE_STATUS_PROTOBUF = 58, /* Indicate protobuf sc */ + LLMETA_GEN_SHARD = 59, } llmetakey_t; struct llmeta_file_type_key { @@ -7354,6 +7355,15 @@ int bdb_llmeta_print_record(bdb_state_type *bdb_state, void *key, int keylen, if (osql_scl_print((uint8_t*)p_buf_key, p_buf_end_key, (uint8_t*)p_buf_data, p_buf_end_data)) logmsg(LOGMSG_USER, " failed to deserialize object\n"); } break; + case LLMETA_GEN_SHARD: { + char tblname[LLMETA_TBLLEN + 1]; + buf_no_net_get(&(tblname), sizeof(tblname), p_buf_key + sizeof(int), + p_buf_end_key); + + logmsg(LOGMSG_USER, + "LLMETA_GEN_SHARD table=\"%s\" value=\"%s\"\n", + tblname, (char *)data); + } break; default: logmsg(LOGMSG_USER, "Todo (type=%d)\n", type); break; @@ -11372,3 +11382,230 @@ void *buf_get_schemachange(struct schema_change_type *s, void *p_buf, void *p_bu } return NULL; } + +struct llmeta_gen_shard_key { + int file_type; + char tablename[LLMETA_TBLLEN]; +}; + + +static int make_gen_shard_key(char *key, const char *table) +{ + + const uint8_t *p_gen_shard_key = NULL; + const uint8_t *p_gen_shard_key_end = NULL; + uint32_t key_type = 0; + assert(sizeof(int) + strlen(table) <= LLMETA_IXLEN); + key_type = LLMETA_GEN_SHARD; + p_gen_shard_key = (uint8_t *)key; + p_gen_shard_key_end = p_gen_shard_key + sizeof(int) + LLMETA_TBLLEN; + + if (!llmeta_bdb_state) { + logmsg(LOGMSG_ERROR, "%s: low level meta table not yet " + "open, you must run bdb_llmeta_open\n", + __func__); + return -1; + } + + /* put key type */ + if (!(p_gen_shard_key = + buf_put(&key_type, sizeof(key_type), (uint8_t *)p_gen_shard_key, + p_gen_shard_key_end))) { + logmsg(LOGMSG_ERROR, "%s:%d error converting to correct " + "endianess\n", + __func__, __LINE__); + return -1; + } + + /* put tablename of generic shard */ + if (!(p_gen_shard_key = + buf_put(table, strlen(table) , (uint8_t *)p_gen_shard_key, p_gen_shard_key_end))) { + logmsg(LOGMSG_ERROR, "%s:%d error converting to correct " + "endianess\n", + __func__, __LINE__); + return -1; + } + return 0; +} + +int bdb_set_genshard(tran_type *input_tran, const char *table, char *str, int *bdberr) { + uint32_t retries = 0; + int rc=0; + char key[LLMETA_IXLEN] = {0}; + tran_type *tran; + if (make_gen_shard_key(key, table)) { + *bdberr = BDBERR_BADARGS; + return -1; + } + /*const uint8_t *p_gen_shard_key = NULL; + const uint8_t *p_gen_shard_key_end = NULL; + + assert(sizeof(int) + strlen(table) <= LLMETA_IXLEN); + key_type = LLMETA_GEN_SHARD; + p_gen_shard_key = (uint8_t *)key; + p_gen_shard_key_end = p_gen_shard_key + sizeof(int) + LLMETA_TBLLEN; + + if (!llmeta_bdb_state) { + logmsg(LOGMSG_ERROR, "%s: low level meta table not yet " + "open, you must run bdb_llmeta_open\n", + __func__); + *bdberr = BDBERR_MISC; + return -1; + } + + if (!(p_gen_shard_key = + buf_put(&key_type, sizeof(key_type), (uint8_t *)p_gen_shard_key, + p_gen_shard_key_end))) { + logmsg(LOGMSG_ERROR, "%s:%d error converting to correct " + "endianess\n", + __func__, __LINE__); + *bdberr = BDBERR_MISC; + return -1; + } + + if (!(p_gen_shard_key = + buf_put(table, strlen(table) , (uint8_t *)p_gen_shard_key, p_gen_shard_key_end))) { + logmsg(LOGMSG_ERROR, "%s:%d error converting to correct " + "endianess\n", + __func__, __LINE__); + *bdberr = BDBERR_MISC; + return -1; + }*/ +retry: + if (++retries >= gbl_maxretries) { + logmsg(LOGMSG_ERROR, "%s: giving up after %d retries\n", __func__, retries); + return -1; + } + /*if the user didn't give us a transaction, create our own*/ + if (!input_tran) { + tran = bdb_tran_begin(llmeta_bdb_state, NULL, bdberr); + if (!tran) { + if (*bdberr == BDBERR_DEADLOCK) { + int dp = gbl_llmeta_deadlock_poll; + if (dp > 1) + poll(NULL, 0, rand() % dp); + goto retry; + } + + logmsg(LOGMSG_ERROR, "%s: failed to get " + "transaction\n", + __func__); + goto cleanup; + } + } else { + tran = input_tran; + } + + /* write the shard string */ + if (str) { + rc = bdb_lite_add(llmeta_bdb_state, tran, str, + strlen(str) + 1, key, bdberr); + } else { + rc = bdb_lite_exact_del(llmeta_bdb_state, tran, key, bdberr); + } + if (rc && *bdberr != BDBERR_NOERROR) + goto backout; + + /*commit if we created our own transaction*/ + if (!input_tran) { + rc = bdb_tran_commit(llmeta_bdb_state, tran, bdberr); + if (rc && *bdberr != BDBERR_NOERROR) + goto cleanup; + } + + *bdberr = BDBERR_NOERROR; + return 0; + +backout: + /*if we created the transaction*/ + if (!input_tran) { + /*kill the transaction*/ + rc = bdb_tran_abort(llmeta_bdb_state, tran, bdberr); + if (rc && !BDBERR_NOERROR) { + logmsg(LOGMSG_ERROR, "%s: trans abort failed with " + "bdberr %d\n", + __func__, *bdberr); + return -1; + } + if (*bdberr == BDBERR_DEADLOCK) { + int dp = gbl_llmeta_deadlock_poll; + if (dp > 1) + poll(NULL, 0, rand() % dp); + goto retry; + } + + logmsg(LOGMSG_ERROR, "%s:%d failed with bdberr %d\n", __func__,__LINE__, *bdberr); + } +cleanup: + return -1; +} + + +int bdb_get_genshard(tran_type *tran, const char *table, char **str, int *size, int *bdberr) { + char key[LLMETA_IXLEN] = {0}; + if (make_gen_shard_key(key, table)) { + *bdberr = BDBERR_BADARGS; + return -1; + } + /*const uint8_t *p_gen_shard_key = NULL; + const uint8_t *p_gen_shard_key_end = NULL; + + assert(sizeof(int) + strlen(table) <= LLMETA_IXLEN); + key_type = LLMETA_GEN_SHARD; + p_gen_shard_key = (uint8_t *)key; + p_gen_shard_key_end = p_gen_shard_key + sizeof(int) + LLMETA_TBLLEN; + + if (!llmeta_bdb_state) { + logmsg(LOGMSG_ERROR, "%s: low level meta table not yet " + "open, you must run bdb_llmeta_open\n", + __func__); + *bdberr = BDBERR_MISC; + return -1; + } + + if (!(p_gen_shard_key = + buf_put(&key_type, sizeof(key_type), (uint8_t *)p_gen_shard_key, + p_gen_shard_key_end))) { + logmsg(LOGMSG_ERROR, "%s:%d error converting to correct " + "endianess\n", + __func__, __LINE__); + *bdberr = BDBERR_MISC; + return -1; + } + + if (!(p_gen_shard_key = + buf_put(table, strlen(table) , (uint8_t *)p_gen_shard_key, p_gen_shard_key_end))) { + logmsg(LOGMSG_ERROR, "%s:%d error converting to correct " + "endianess\n", + __func__, __LINE__); + *bdberr = BDBERR_MISC; + return -1; + }*/ + return bdb_lite_exact_var_fetch_tran(llmeta_bdb_state, tran, key, (void **)str, size, bdberr); +} + +/* Fetch all generic shard names */ +int bdb_get_genshard_names(tran_type *t, char **names, int *num) +{ + union { + struct llmeta_gen_shard_key key; + uint8_t buf[LLMETA_IXLEN]; + } * *shards; + int rc, n, bdberr; + llmetakey_t k; + + k = htonl(LLMETA_GEN_SHARD); + rc = kv_get_keys(t, &k, sizeof(k), (void ***)&shards, &n, &bdberr); + if (rc || (n == 0)) { + *num = 0; + return rc; + } + + for (int i = 0; i < n; ++i) { + names[i] = strdup(shards[i]->key.tablename); + free(shards[i]); + } + free(shards); + *num = n; + return rc; +} diff --git a/db/comdb2.c b/db/comdb2.c index 40945e3dba..c82f50280e 100644 --- a/db/comdb2.c +++ b/db/comdb2.c @@ -208,7 +208,7 @@ static void *purge_old_files_thread(void *arg); static int lrllinecmp(char *lrlline, char *cmpto); static void ttrap(struct timer_parm *parm); int clear_temp_tables(void); - +int gen_shard_deserialize_shard(char **genshard_name, uint32_t *numdbs, char ***dbnames, uint32_t *numcols, char ***columns, char ***shardnames, char *serializedStr); pthread_key_t comdb2_open_key; /*---GLOBAL SETTINGS---*/ @@ -250,7 +250,7 @@ int gbl_watchdog_watch_threshold = 60; int gbl_watchdog_disable_at_start = 0; /* don't enable watchdog on start */ int gbl_nonames = 1; int gbl_reject_osql_mismatch = 1; -int gbl_abort_on_clear_inuse_rqid = 1; +int gbl_abort_on_clear_inuse_rqid = 0; int gbl_archive_on_init = 1; pthread_t gbl_invalid_tid; /* set this to our main threads tid */ @@ -2295,6 +2295,81 @@ int llmeta_load_views(struct dbenv *dbenv, void *tran) return rc; } +int llmeta_load_genshards(struct dbenv *dbenv, void *tran) { + int rc = 0; + int bdberr = 0; + /* allow as many generic sharded tables as regular tables (?) */ + char **tablenames = (char **)alloca(sizeof(char*) * thedb->num_dbs); + char *shard_info = NULL, *genshard_name = NULL; + char **dbnames = NULL, **columns = NULL, **shardnames = NULL; + int table_count = 0, size = 0; + uint32_t numdbs=0, numcols=0; + struct dbtable *db = NULL; + /* load the tables from the low level metatable */ + if (bdb_get_genshard_names(tran, (char **)tablenames, &table_count)) { + logmsg( + LOGMSG_ERROR, + "couldn't load generic shard names from low level meta table (bdberr: %d)\n", + bdberr); + return 1; + } + + for (int i = 0; i < table_count; i++) { + if (bdb_get_genshard(tran, tablenames[i], &shard_info, &size, &bdberr)) { + logmsg(LOGMSG_ERROR, + "couldn't load view definition from low level meta table " + "(bdberr: %d)\n", + bdberr); + goto err; + } + + if (gen_shard_deserialize_shard(&genshard_name, &numdbs, &dbnames, &numcols, &columns, &shardnames, shard_info)) { + logmsg(LOGMSG_ERROR, "Failed to deserialize llmeta str for generic shard %s\n", tablenames[i]); + } + + for(int i=0;igenshard_name = genshard_name; + db->numdbs = numdbs; + db->dbnames = dbnames; + db->numcols = numcols; + db->columns = columns; + db->shardnames = shardnames; + hash_sqlalias_db(db, genshard_name); + } else { + logmsg(LOGMSG_ERROR, "FAILED TO UPDATE GENERIC SHARDING METADATA FOR TABLE %s\n", tablenames[i]); + } + } + } + return rc; +err: + for (int i = 0; i < table_count; i++) { + free(tablenames[i]); + } + + if (dbnames) { + for(int i=0;isqlaliasname) { - rc = gen_shard_update_inmem_db(tran, tbl, tbl->sqlaliasname); - if (rc) { - logmsg(LOGMSG_USER, "NOT UPDATING SHARD METADATA FOR TABLE %s\n", tbl->tablename); - } - } /* We only want to load older schema versions for ODH databases. ODH * information is stored in the meta table (not the llmeta table), so * it's not loaded yet. @@ -4228,6 +4294,7 @@ static int init(int argc, char **argv) return -1; } + if (llmeta_load_timepart(thedb)) { logmsg(LOGMSG_ERROR, "could not load time partitions\n"); unlock_schema_lk(); @@ -4369,6 +4436,13 @@ static int init(int argc, char **argv) logmsg(LOGMSG_FATAL, "invalid client function for dbstore\n"); return -1; } + + if (llmeta_load_genshards(thedb, NULL)) { + logmsg(LOGMSG_FATAL, "could not load generic shards from the low level meta " + "table\n"); + unlock_schema_lk(); + return -1; + } unlock_schema_lk(); /* There could have been an in-process schema change. Add those tables now diff --git a/db/comdb2.h b/db/comdb2.h index 6c25f76f9f..cc0e770923 100644 --- a/db/comdb2.h +++ b/db/comdb2.h @@ -791,6 +791,7 @@ typedef struct dbtable { const char *timepartition_name; /* generic sharding metadata */ + char *genshard_name; uint32_t numdbs; char **dbnames; uint32_t numcols; diff --git a/db/fdb_fend.c b/db/fdb_fend.c index 32e20b076c..7233ffc2a3 100644 --- a/db/fdb_fend.c +++ b/db/fdb_fend.c @@ -451,9 +451,8 @@ static void __fdb_add_user(fdb_t *fdb, int noTrace) Pthread_mutex_lock(&fdb->users_mtx); fdb->users++; - if (!noTrace && gbl_fdb_track) - logmsg(LOGMSG_USER, "%p %s %s users %d\n", (void *)pthread_self(), __func__, fdb->dbname, fdb->users); - + //if (!noTrace && gbl_fdb_track) + logmsg(LOGMSG_USER, "%s %p %s %s users %d\n",__func__, (void *)pthread_self(), __func__, fdb->dbname, fdb->users); assert(fdb->users > 0); Pthread_mutex_unlock(&fdb->users_mtx); } @@ -467,8 +466,8 @@ static void __fdb_rem_user(fdb_t *fdb, int noTrace) Pthread_mutex_lock(&fdb->users_mtx); fdb->users--; - if (!noTrace && gbl_fdb_track) - logmsg(LOGMSG_USER, "%p %s %s users %d\n", (void *)pthread_self(), __func__, fdb->dbname, fdb->users); + //if (!noTrace && gbl_fdb_track) + logmsg(LOGMSG_USER, "%s %p %s %s users %d\n",__func__, (void *)pthread_self(), __func__, fdb->dbname, fdb->users); assert(fdb->users >= 0); Pthread_mutex_unlock(&fdb->users_mtx); @@ -1586,6 +1585,7 @@ static int __lock_wrlock_exclusive(char *dbname) /* we got the lock, are there any lockless users ? */ if (fdb->users > 1) { + logmsg(LOGMSG_USER, "FOUND LOCKLESS USERS!!!!\n"); Pthread_rwlock_unlock(&fdb->h_rwlock); Pthread_rwlock_unlock(&fdbs.arr_lock); @@ -1594,8 +1594,10 @@ static int __lock_wrlock_exclusive(char *dbname) for a bdb write lock to be processed */ struct sql_thread *thd = pthread_getspecific(query_info_key); - if (!thd) + if (!thd) { + logmsg(LOGMSG_USER, "%s : couldn't find thd ... continuing\n", __func__); continue; + } rc = clnt_check_bdb_lock_desired(thd->clnt); if (rc) { @@ -1604,8 +1606,10 @@ static int __lock_wrlock_exclusive(char *dbname) return FDB_ERR_GENERIC; } + logmsg(LOGMSG_USER, "clnt_check_bdb_lock_desired returned 0 .... continuing\n"); continue; } else { + logmsg(LOGMSG_USER, "NO LOCKLESS USERS!!!!\n"); rc = FDB_NOERR; break; /* own fdb */ } @@ -4357,6 +4361,10 @@ int fdb_trans_commit(sqlclntstate *clnt, enum trans_clntcomm sideeffects) if (tran->errstr) // TODO: this can be non-null even when no error errstat_set_str(&clnt->osql.xerr, tran->errstr); clnt->osql.error_is_remote = 1; + + if (tran->is_cdb2api) { + logmsg(LOGMSG_ERROR, "cdb2api rc: %d, cdb2api errstr: %s\n", rc, cdb2_errstr(tran->fcon.hndl)); + } } if (clnt->use_2pc) { @@ -6262,6 +6270,12 @@ static fdb_push_connector_t *fdb_push_connector_create(const char *dbname, return NULL; int rc = fdb_get_remote_version(fdb->dbname, tblname, fdb->class, local, &remote_version, &err); + + /*if (sqlite3UnlockTable(dbname, tblname)) { + logmsg(LOGMSG_ERROR, "%s:%d Failed to unlock table %s on db %s\n!!", + __func__, __LINE__, tblname, dbname); + }*/ + destroy_fdb(fdb); switch (rc) { case FDB_NOERR: logmsg(LOGMSG_ERROR, "Table %s already exists, ver %llu\n", tblname, remote_version); @@ -6275,7 +6289,6 @@ static fdb_push_connector_t *fdb_push_connector_create(const char *dbname, tblname, rc, err.errval, err.errstr); return NULL; } - fdb_push_connector_t * push = fdb_push_create(dbname, class, override, local, type); return push; } @@ -6334,7 +6347,7 @@ static int _running_dist_ddl(struct schema_change_type *sc, char **errmsg, uint3 snprintf(numcols_str, 3, "%d", numcols); str[0] = "SET PARTITION NAME "; - strl[0] = strlen(str[0]) + strlen(sc->tablename) + 1; + strl[0] = strlen(str[0]) + strlen(sc->partition.u.genshard.tablename) + 1; str[1] = "SET PARTITION NUMDBS "; strl[1] = strlen(str[1]) + strlen(numdbs_str) + 1; @@ -6365,7 +6378,8 @@ static int _running_dist_ddl(struct schema_change_type *sc, char **errmsg, uint3 extra_set[5] = alloca(strl[5]); /* SET PARTITION NAME */ - snprintf(extra_set[0], strl[0], "%s%s", str[0], sc->tablename); + snprintf(extra_set[0], strl[0], "%s%s", str[0], sc->partition.u.genshard.tablename); + logmsg(LOGMSG_USER, "PARTITION NAME SET : %s\n", extra_set[0]); /* SET PARTITION NUMDBS */ snprintf(extra_set[1], strl[1], "%s%s", str[1], numdbs_str); @@ -6422,6 +6436,7 @@ static int _running_dist_ddl(struct schema_change_type *sc, char **errmsg, uint3 /* commit the transaction */ rc = osql_sock_commit(clnt, OSQL_SOCK_REQ, TRANS_CLNTCOMM_NORMAL); + if (rc) { logmsg(LOGMSG_ERROR, "%s Failed to commit ddl transaction rc %d\n", __func__, rc); *errmsg = "failed to commit"; diff --git a/db/gen_shard.c b/db/gen_shard.c index 561a1388fe..fa32cb314e 100644 --- a/db/gen_shard.c +++ b/db/gen_shard.c @@ -15,6 +15,13 @@ int cson_extract_int(cson_object *cson_obj, const char *param, struct errstat *err); cson_array *cson_extract_array(cson_object *cson_obj, const char *param, struct errstat *err); +int views_sqlite_del_view(const char *view_name, sqlite3 *db, + struct errstat *err); +char *describe_row(const char *tblname, const char *prefix, + enum views_trigger_op op_type, struct errstat *err); + + + int gen_shard_serialize_shard(const char *tablename, uint32_t numdbs, char **dbnames, uint32_t numcols, char **columns, char **shardnames, uint32_t *outLen, char **out) { @@ -92,14 +99,9 @@ int gen_shard_serialize_shard(const char *tablename, uint32_t numdbs, char **dbn return -1; } -int gen_shard_llmeta_write_serialized_str(tran_type *tran, const char *tablename, const char *str) { - int rc; - if (str){ - rc = bdb_set_table_parameter(tran, LLMETA_GENERIC_SHARD, tablename, str); - } else { - /* this is a partition drop */ - rc = bdb_clear_table_parameter(tran, LLMETA_GENERIC_SHARD, tablename); - } +int gen_shard_llmeta_write_serialized_str(tran_type *tran, const char *tablename, char *str) { + int rc = 0, bdberr = 0; + rc = bdb_set_genshard(tran, tablename, str, &bdberr); if (rc) { logmsg(LOGMSG_ERROR, "FAILED TO WRITE SHARD STRING TO LLMETA. RC: %d\n", rc); @@ -143,22 +145,27 @@ int gen_shard_llmeta_add(tran_type *tran, char *tablename, uint32_t numdbs, char int gen_shard_llmeta_read(void *tran, const char *name, char **pstr) { - int rc; + int rc, bdberr = 0, size = 0; *pstr = NULL; - rc = bdb_get_table_parameter_tran(LLMETA_GENERIC_SHARD, name, pstr, tran); + /*rc = bdb_get_table_parameter_tran(LLMETA_GENERIC_SHARD, name, pstr, tran); if (rc) { logmsg(LOGMSG_ERROR, "bdb_get_table_parameter_tran failed with err: %d\n", rc); + }*/ + + rc = bdb_get_genshard(tran, name, pstr, &size, &bdberr); + if (rc) { + logmsg(LOGMSG_ERROR, "bdb_get_genshard failed with err: %d\n", rc); } return rc; } -int gen_shard_deserialize_shard(uint32_t *numdbs, char ***dbnames, uint32_t *numcols, char ***columns, char ***shardnames, char *serializedStr) { +int gen_shard_deserialize_shard(char **genshard_name, uint32_t *numdbs, char ***dbnames, uint32_t *numcols, char ***columns, char ***shardnames, char *serializedStr) { cson_object *rootObj = NULL; cson_value *rootVal = NULL, *arrVal = NULL; cson_array *dbs_arr = NULL, *cols_arr = NULL, *shards_arr = NULL; - const char *tablename = NULL; + char *tablename = NULL; char **dbs = NULL, **cols = NULL, **shards = NULL; char *err_str = NULL; int num_dbs = 0, num_cols = 0; @@ -175,7 +182,7 @@ int gen_shard_deserialize_shard(uint32_t *numdbs, char ***dbnames, uint32_t *num rc = cson_value_is_object(rootVal); rootObj = cson_value_get_object(rootVal); - tablename = cson_extract_str(rootObj, "TABLENAME", &err); + tablename = (char *)cson_extract_str(rootObj, "TABLENAME", &err); if (!tablename) { err_str = "INVALID CSON. Couldn't find 'TABLENAME' key"; goto error; @@ -251,7 +258,7 @@ int gen_shard_deserialize_shard(uint32_t *numdbs, char ***dbnames, uint32_t *num } shards[i] = strdup(cson_value_get_cstr(arrVal)); } - + *genshard_name = strdup(tablename); *numdbs = num_dbs; *dbnames = dbs; *numcols = num_cols; @@ -294,10 +301,45 @@ int gen_shard_deserialize_shard(uint32_t *numdbs, char ***dbnames, uint32_t *num } +int gen_shard_clear_inmem_db(void *tran, struct dbtable *db) { + if (!db) { + logmsg(LOGMSG_ERROR, "dbtable object can't be NULL here!\n"); + return -1; + } + + if (db->genshard_name) { + free(db->genshard_name); + db->genshard_name = NULL; + } + + for(int i=0;inumdbs;i++) { + if (db->dbnames[i]) { + free(db->dbnames[i]); + } + if (db->shardnames[i]) { + free(db->shardnames[i]); + } + } + + db->numdbs = 0; + db->dbnames = NULL; + db->shardnames = NULL; + + for(int i=0;inumcols;i++){ + if (db->columns[i]) { + free(db->columns[i]); + } + } + + db->numcols = 0; + db->columns = NULL; + return 0; +} + int gen_shard_update_inmem_db(void *tran, struct dbtable *db, const char *name) { char *serializedStr = NULL; uint32_t numdbs = 0, numcols = 0; - char **dbnames = NULL, **columns = NULL, **shardnames = NULL; + char **dbnames = NULL, **columns = NULL, **shardnames = NULL, *genshard_name = NULL; int rc = 0; rc = gen_shard_llmeta_read(tran, name, &serializedStr); if (rc) { @@ -305,13 +347,14 @@ int gen_shard_update_inmem_db(void *tran, struct dbtable *db, const char *name) goto done; } - rc = gen_shard_deserialize_shard(&numdbs, &dbnames, &numcols, &columns, &shardnames, serializedStr); + rc = gen_shard_deserialize_shard(&genshard_name,&numdbs, &dbnames, &numcols, &columns, &shardnames, serializedStr); if (rc) { logmsg(LOGMSG_ERROR, "Failed to deserialized llmeta str for table %s\n", name); goto done; } /*update the table object*/ + db->genshard_name = genshard_name; db->numdbs = numdbs; db->dbnames = dbnames; db->numcols = numcols; @@ -324,4 +367,156 @@ int gen_shard_update_inmem_db(void *tran, struct dbtable *db, const char *name) return rc; } +char *gen_shard_create_view_query(struct dbtable *tbl, sqlite3 *db, struct errstat *err) +{ + char *select_str = NULL; + char *cols_str = NULL; + char *tmp_str = NULL; + char *ret_str = NULL; + int numshards = tbl->numdbs; + const char *viewname = tbl->genshard_name; + char **dbnames = tbl->dbnames; + char **shardnames = tbl->shardnames; + int i; + cols_str = sqlite3_mprintf("rowid as __hidden__rowid, "); + if (!cols_str) { + goto malloc; + } + + cols_str = describe_row(tbl->tablename, cols_str, VIEWS_TRIGGER_QUERY, err); + if (!cols_str) { + /* preserve error, if any */ + if (err->errval != VIEW_NOERR) + return NULL; + goto malloc; + } else { + if (gbl_gen_shard_verbose) { + logmsg(LOGMSG_USER, "GOT cols_str as %s\n", cols_str); + } + } + + select_str = sqlite3_mprintf(""); + i = 0; + logmsg(LOGMSG_USER, "num shards is : %d\n", numshards); + for(;i 0) ? " UNION ALL " : "", cols_str, + dbnames[i], shardnames[i]); + sqlite3_free(select_str); + if (!tmp_str) { + sqlite3_free(cols_str); + goto malloc; + } + select_str = tmp_str; + } + + ret_str = sqlite3_mprintf("CREATE VIEW %w AS %s", viewname, select_str); + if (!ret_str) { + sqlite3_free(select_str); + sqlite3_free(cols_str); + goto malloc; + } + + sqlite3_free(select_str); + sqlite3_free(cols_str); + + if (gbl_gen_shard_verbose) { + logmsg(LOGMSG_USER, "THE GENERATED VIEW QUERY IS %s\n", ret_str); + } + + return ret_str; + +malloc: + err->errval = VIEW_ERR_MALLOC; + snprintf(err->errstr, sizeof(err->errstr), "View %s out of memory\n", viewname); + return NULL; +} + +int gen_shard_run_sql(sqlite3 *db, char *stmt, struct errstat *err) +{ + char *errstr = NULL; + int rc; + + /* create the view */ + rc = sqlite3_exec(db, stmt, NULL, NULL, &errstr); + if (rc != SQLITE_OK) { + err->errval = VIEW_ERR_BUG; + snprintf(err->errstr, sizeof(err->errstr), "Sqlite error \"%s\"", errstr); + /* can't control sqlite errors */ + err->errstr[sizeof(err->errstr) - 1] = '\0'; + + logmsg(LOGMSG_USER, "%s: sqlite error \"%s\" sql \"%s\"\n", __func__, errstr, stmt); + + if (errstr) + sqlite3_free(errstr); + return err->errval; + } + + return VIEW_NOERR; +} + +int gen_shard_add_view(struct dbtable *tbl, sqlite3 *db, struct errstat *err) +{ + char *stmt_str; + int rc; + + /* create the statement */ + stmt_str = gen_shard_create_view_query(tbl, db, err); + if (!stmt_str) { + return err->errval; + } + + rc = gen_shard_run_sql(db, stmt_str, err); + + logmsg(LOGMSG_USER, "+++++++++++sql: %s, rc: %d\n", stmt_str, rc); + /* free the statement */ + sqlite3_free(stmt_str); + + if (rc != VIEW_NOERR) { + return err->errval; + } + return rc; +} + +int gen_shard_update_sqlite(sqlite3 *db, struct errstat *err) +{ + Table *tab; + int rc; + for (int tbl_idx = 0; tbl_idx < thedb->num_dbs; ++tbl_idx) { + struct dbtable *tbl = thedb->dbs[tbl_idx]; + if (tbl->genshard_name) { + logmsg(LOGMSG_USER, "TRYING ADD VIEW FOR TABLE %s PART OF GENSHARD %s\n", tbl->tablename, tbl->genshard_name); + /* this table is a component shard of a genshard table*/ + tab = sqlite3FindTableCheckOnly(db, tbl->genshard_name, NULL); + if (tab) { + /* found view, is it the same version ? */ + if (tbl->tableversion != tab->version) { + /* older version, destroy current view */ + rc = views_sqlite_del_view(tbl->genshard_name, db, err); + if (rc != VIEW_NOERR) { + logmsg(LOGMSG_ERROR, "%s: failed to remove old view\n", __func__); + goto done; + } + } else { + /* up to date, nothing to do */ + continue; + } + } + rc = gen_shard_add_view(tbl, db, err); + if (rc != VIEW_NOERR) { + goto done; + } + } + } + rc = VIEW_NOERR; +done: + return rc; +} + +int is_gen_shard(const char *tablename) { + struct dbtable *db = get_dbtable_by_name(tablename); + if (db && strcmp(db->genshard_name, tablename)==0) { + return 1; + } + return 0; +} diff --git a/db/gen_shard.h b/db/gen_shard.h index 16cfebcbcb..3d4f18714f 100644 --- a/db/gen_shard.h +++ b/db/gen_shard.h @@ -4,4 +4,8 @@ int gen_shard_llmeta_add(tran_type *tran, char *tablename, uint32_t numdbs, char uint32_t numcols, char **columns, char **shardnames, struct errstat *); int gen_shard_update_inmem_db(void *tran, struct dbtable *db, const char *name); int gen_shard_llmeta_remove(tran_type *tran, char *tablename, struct errstat *err); +int gen_shard_update_sqlite(sqlite3 *db, struct errstat *err); +int gen_shard_add_view(struct dbtable *tbl, sqlite3 *db, struct errstat *err); +int is_gen_shard(const char *tablename); +int gen_shard_clear_inmem_db(void *tran, struct dbtable *db); #endif diff --git a/db/osqlsqlthr.c b/db/osqlsqlthr.c index 9cbf4fd067..6bbdff4b24 100644 --- a/db/osqlsqlthr.c +++ b/db/osqlsqlthr.c @@ -1838,6 +1838,7 @@ int osql_schemachange_logic(struct schema_change_type *sc, int usedb) } snprintf(sc->partition.u.genshard.tablename, sizeof(sc->partition.u.genshard.tablename), "%s", clnt->remsql_set.tablename); + logmsg(LOGMSG_USER, "%s setting genshard.tablename to %s\n", __func__ , clnt->remsql_set.tablename); sc->partition.u.genshard.numdbs = clnt->remsql_set.numdbs; sc->partition.u.genshard.dbnames = malloc(sizeof(char*) * clnt->remsql_set.numdbs); for (int i = 0; i < clnt->remsql_set.numdbs; i++) { diff --git a/db/sqlglue.c b/db/sqlglue.c index 4f1aef0c57..3c59302315 100644 --- a/db/sqlglue.c +++ b/db/sqlglue.c @@ -1927,7 +1927,10 @@ static int create_sqlmaster_record(struct dbtable *tbl, void *tran) { int field; char namebuf[128]; - char *tablename = tbl->sqlaliasname ? tbl->sqlaliasname : tbl->tablename; + /* Don't expose tbls with genshardnames (which are stored as sqlaliasnames in comdb2 land) + * We want to create views in sqlite with the genshardname -> adding it to sqlite_master here + * will create a conflict */ + char *tablename = tbl->sqlaliasname ? (tbl->genshard_name ? tbl->tablename : tbl->sqlaliasname) : tbl->tablename; struct schema *schema = tbl->schema; if (schema == NULL) { diff --git a/db/sqlinterfaces.c b/db/sqlinterfaces.c index 5bbcd3122f..13c9c49967 100644 --- a/db/sqlinterfaces.c +++ b/db/sqlinterfaces.c @@ -113,7 +113,7 @@ #include #include #include - +#include "gen_shard.h" /* ** WARNING: These enumeration values are not arbitrary. They represent ** indexes into the array of meta-command names contained in @@ -4305,6 +4305,12 @@ static int prepare_engine(struct sqlthdstate *thd, struct sqlclntstate *clnt, } } + rc = gen_shard_update_sqlite(thd->sqldb, &xerr); + if (rc) { + logmsg(LOGMSG_FATAL, "Failed to create views for generic shards." + "rc=%d errstr=%s\n", xerr.errval, xerr.errstr); + abort(); + } /* save the views generation number */ thd->views_gen = gbl_views_gen; } diff --git a/db/tag.c b/db/tag.c index 59f6c53326..a8c96aaaa3 100644 --- a/db/tag.c +++ b/db/tag.c @@ -6154,6 +6154,7 @@ void freedb_int(dbtable *db, dbtable *replace) int i; int dbs_idx; char *sqlaliasname = db->sqlaliasname; + char *genshard_name = db->genshard_name; const char *timepartition_name = db->timepartition_name; char **dbnames = alloca(sizeof(char*) * db->numdbs); for (i = 0; i < db->numdbs; i++) @@ -6167,6 +6168,7 @@ void freedb_int(dbtable *db, dbtable *replace) if (!replace) { free(sqlaliasname); + free(genshard_name); for (i = 0; i < db->numdbs; i++) { free(dbnames[i]); } @@ -6242,6 +6244,7 @@ void freedb_int(dbtable *db, dbtable *replace) db->dbs_idx = dbs_idx; db->sqlaliasname = sqlaliasname; db->timepartition_name = timepartition_name; + db->genshard_name = genshard_name; for (i = 0; i < db->numdbs; i++) db->dbnames[i] = dbnames[i]; for (i = 0; i < db->numcols; i++) diff --git a/db/views.c b/db/views.c index d19193bb19..658fc4932b 100644 --- a/db/views.c +++ b/db/views.c @@ -97,7 +97,7 @@ static int _views_do_op(timepart_views_t *views, const char *name, int (*op)(timepart_views_t *, timepart_view_t *, struct errstat *), struct errstat *err); -static char *_describe_row(const char *tblname, const char *prefix, +char *describe_row(const char *tblname, const char *prefix, enum views_trigger_op op_type, struct errstat *err); static void *timepart_cron_kickoff(struct cron_event *_, struct errstat *err); static int _next_shard_exists(timepart_view_t *view, char *newShardName, @@ -1439,7 +1439,7 @@ int timepart_dump_timepartitions(FILE *dest) return has_tp; } -static char *_describe_row(const char *tblname, const char *prefix, +char *describe_row(const char *tblname, const char *prefix, enum views_trigger_op op_type, struct errstat *err) { struct dbtable *gdb; diff --git a/db/views.h b/db/views.h index 6b5384cab6..f5464e0612 100644 --- a/db/views.h +++ b/db/views.h @@ -217,7 +217,7 @@ int views_sqlite_add_view(timepart_view_t *view, sqlite3 *db, * * */ -int views_sqlite_del_view(timepart_view_t *view, sqlite3 *db, +int views_sqlite_del_timepart_view(timepart_view_t *view, sqlite3 *db, struct errstat *err); /** diff --git a/db/views_sqlite.c b/db/views_sqlite.c index 5c8b62a4fc..6bd68b1d24 100644 --- a/db/views_sqlite.c +++ b/db/views_sqlite.c @@ -55,7 +55,7 @@ int views_sqlite_update(timepart_views_t *views, sqlite3 *db, /* found view, is it the same version ? */ if (view->version != tab->version) { /* older version, destroy current view */ - rc = views_sqlite_del_view(view, db, err); + rc = views_sqlite_del_timepart_view(view, db, err); if (rc != VIEW_NOERR) { logmsg(LOGMSG_ERROR, "%s: failed to remove old view\n", __func__); @@ -126,7 +126,7 @@ int views_sqlite_add_view(timepart_view_t *view, sqlite3 *db, } /* internal view delete function, callable from sqlite callback */ -int _views_sqlite_del_view(const char *view_name, sqlite3 *db, +int views_sqlite_del_view(const char *view_name, sqlite3 *db, struct errstat *err) { int rc; @@ -167,10 +167,10 @@ int _views_sqlite_del_view(const char *view_name, sqlite3 *db, * * */ -int views_sqlite_del_view(timepart_view_t *view, sqlite3 *db, +int views_sqlite_del_timepart_view(timepart_view_t *view, sqlite3 *db, struct errstat *err) { - return _views_sqlite_del_view(view->name, db, err); + return views_sqlite_del_view(view->name, db, err); } static char *_views_create_view_query(timepart_view_t *view, sqlite3 *db, @@ -195,7 +195,7 @@ static char *_views_create_view_query(timepart_view_t *view, sqlite3 *db, if (!cols_str) { goto malloc; } - cols_str = _describe_row(table0name, cols_str, VIEWS_TRIGGER_QUERY, err); + cols_str = describe_row(table0name, cols_str, VIEWS_TRIGGER_QUERY, err); if (!cols_str) { /* preserve error, if any */ @@ -314,7 +314,7 @@ static int _view_delete_if_missing(const char *name, sqlite3 *db, void *arg) /* if the view doesn't exist anymore, delete it */ if (i >= views->nviews) { - rc = _views_sqlite_del_view(name, db, &err); + rc = views_sqlite_del_view(name, db, &err); if (rc != VIEW_NOERR) { logmsg(LOGMSG_ERROR, "%s: failed to clear old view %s rc=%d str=%s\n", __func__, name, rc, err.errstr); diff --git a/db/views_updates.c b/db/views_updates.c index 0f3299b69f..bb02b01049 100644 --- a/db/views_updates.c +++ b/db/views_updates.c @@ -148,7 +148,7 @@ char *_views_create_update_trigger_query(timepart_view_t *view, assert(view->nshards >= 1); - cols_str = _describe_row(view->shards[view->current_shard].tblname, NULL, + cols_str = describe_row(view->shards[view->current_shard].tblname, NULL, VIEWS_TRIGGER_UPDATE, err); if (!cols_str) { sqlite3_free(ret_str); @@ -194,7 +194,7 @@ char *_views_create_insert_trigger_query(timepart_view_t *view, assert(view->nshards >= 1); - cols_str = _describe_row(view->shards[view->current_shard].tblname, NULL, + cols_str = describe_row(view->shards[view->current_shard].tblname, NULL, VIEWS_TRIGGER_INSERT, err); if (!cols_str) { goto oom; diff --git a/schemachange/sc_add_table.c b/schemachange/sc_add_table.c index ee24f8eb17..28395eb710 100644 --- a/schemachange/sc_add_table.c +++ b/schemachange/sc_add_table.c @@ -368,16 +368,8 @@ int finalize_add_table(struct ireq *iq, struct schema_change_type *s, return SC_INTERNAL_ERROR; } } else if (s->partition.type == PARTITION_ADD_GENSHARD) { - /* NOTE: for the purpose of this test stub, we create an sql alias; the actual - * sharding will have a view created here - */ - hash_sqlalias_db(db, s->partition.u.genshard.tablename); - rc = bdb_set_table_sqlalias(db->tablename, tran, db->sqlaliasname); - if (rc) { - hash_sqlalias_db(db, NULL); - logmsg(LOGMSG_ERROR, "Failed to alias genshard table"); - return -1; - } + struct errstat err = {0}; + db->genshard_name = strdup(s->partition.u.genshard.tablename); db->numdbs = s->partition.u.genshard.numdbs; db->dbnames = (char **)malloc(sizeof(char*) * db->numdbs); for (int i = 0; i < db->numdbs; i++) { @@ -395,12 +387,14 @@ int finalize_add_table(struct ireq *iq, struct schema_change_type *s, db->shardnames[i] = strdup(s->partition.u.genshard.shardnames[i]); } /*write to llmeta*/ - struct errstat err = {0}; if (gen_shard_llmeta_add(tran, s->partition.u.genshard.tablename, s->partition.u.genshard.numdbs, s->partition.u.genshard.dbnames, s->partition.u.genshard.numcols, s->partition.u.genshard.columns, s->partition.u.genshard.shardnames, &err)) { sc_errf(s, "failed to write shard info to llmeta for shard %s. rc: %d, err: %s\n", s->partition.u.genshard.tablename, rc, err.errstr); - hash_sqlalias_db(db, NULL); + if (db->genshard_name) { + free(db->genshard_name); + db->genshard_name = NULL; + } return -1; } } diff --git a/schemachange/sc_callbacks.c b/schemachange/sc_callbacks.c index 0aa89927b4..045f09bf99 100644 --- a/schemachange/sc_callbacks.c +++ b/schemachange/sc_callbacks.c @@ -747,26 +747,12 @@ static int scdone_alter(const char tablename[], void *arg, scdone_t type) return rc; } -static int scdone_add(const char tablename[], void *arg, scdone_t type) -{ - tran_type *tran; - uint32_t lid; +static int scdone_add_int(const char tablename[], void *arg, scdone_t type, tran_type *tran) { + char *table_copy = strdup(tablename); + char *csc2text = NULL; int bdberr; int rc; struct dbtable *db = NULL; - - if (gbl_assert_systable_locks) - assert(bdb_has_tablename_locked(thedb->bdb_env, "comdb2_tables", - gbl_rep_lockid, - TABLENAME_LOCKED_WRITE)); - - tran = _tran(&lid, &bdberr, __func__, __LINE__); - if (!tran) - return bdberr; - - char *table_copy = strdup(tablename); - char *csc2text = NULL; - if (get_csc2_file_tran(tablename, -1, &csc2text, NULL, tran)) { logmsg(LOGMSG_ERROR, "%s: error getting schema for %s.\n", __func__, tablename); @@ -785,21 +771,6 @@ static int scdone_add(const char tablename[], void *arg, scdone_t type) add_dbtable_to_thedb_dbs(db); - /* this is used by testgenshard, creating a partition name alias for the actual shard */ - char *sqlalias = NULL; - rc = bdb_get_table_sqlalias_tran(db->tablename, tran, &sqlalias); - if (sqlalias){ - hash_sqlalias_db(db, sqlalias); - } - - /* 'dbnames' is passed as a string arg while finalizing an add of a generic sharded table */ - if (arg && strcmp(arg, "dbnames") == 0 && db->sqlaliasname) { - rc = gen_shard_update_inmem_db(tran, db, db->sqlaliasname); - if (rc != 0) { - logmsg(LOGMSG_ERROR, "REPLICANT FAILED TO UPDATE GENERIC SHARD INFO\n"); - } - } - _master_recs(tran, tablename, type); @@ -841,12 +812,27 @@ static int scdone_add(const char tablename[], void *arg, scdone_t type) } rc = _db_dbnum(tran, db, &bdberr); - if (rc) - goto done; + return rc; +} -done: - _untran(tran, lid); +static int scdone_add(const char tablename[], void *arg, scdone_t type) +{ + tran_type *tran; + uint32_t lid; + int bdberr; + int rc; + if (gbl_assert_systable_locks) + assert(bdb_has_tablename_locked(thedb->bdb_env, "comdb2_tables", + gbl_rep_lockid, + TABLENAME_LOCKED_WRITE)); + + tran = _tran(&lid, &bdberr, __func__, __LINE__); + if (!tran) + return bdberr; + rc = scdone_add_int(tablename, arg, type, tran); + + _untran(tran, lid); return rc; } @@ -926,6 +912,22 @@ static int scdone_addandfastinit(const char tablename[], void *arg, return scdone_fastinit(tablename, arg, type); } +static int scdone_drop_int(const char tablename[], void *arg, scdone_t type, tran_type *tran) { + int rc; + rc = _anti_deadlock(tran, tablename); + if (rc) + return rc; + + logmsg(LOGMSG_INFO, "Replicant dropping table:%s\n", tablename); + if (delete_table_rep((char *)tablename, tran)) { + logmsg(LOGMSG_FATAL, "%s: error deleting table %s.\n", __func__, + tablename); + exit(1); + } + _master_recs(tran, tablename, type); + return rc; +} + static int scdone_drop(const char tablename[], void *arg, scdone_t type) { tran_type *tran; @@ -942,20 +944,8 @@ static int scdone_drop(const char tablename[], void *arg, scdone_t type) if (!tran) return bdberr; - rc = _anti_deadlock(tran, tablename); - if (rc) - goto done; + rc = scdone_drop_int(tablename, arg, type, tran); - logmsg(LOGMSG_INFO, "Replicant dropping table:%s\n", tablename); - if (delete_table_rep((char *)tablename, tran)) { - logmsg(LOGMSG_FATAL, "%s: error deleting table %s.\n", __func__, - tablename); - exit(1); - } - - _master_recs(tran, tablename, type); - -done: _untran(tran, lid); return rc; } @@ -1270,6 +1260,77 @@ static int scdone_alias(const char tablename[], void *arg, scdone_t type) _untran(tran, lid); return 0; } + +static int scdone_genshard_add(const char tablename[] , void *arg, scdone_t type) +{ + tran_type *tran = NULL; + uint32_t lid = 0; + int rc = 0; + int bdberr = 0; + struct dbtable *db = NULL; + assert(arg!=NULL); + const char *genshard_name = (char *)arg; + logmsg(LOGMSG_USER, "%s: SHARDNAME IS %s\n", __func__, genshard_name); + tran = _tran(&lid, &bdberr, __func__, __LINE__); + if (!tran) + return bdberr; + rc = scdone_add_int(tablename, arg, type, tran); + if (rc) { + logmsg(LOGMSG_ERROR, "%s scdone_add_int failed. rc %d\n", __func__, rc); + goto done; + } + db = get_dbtable_by_name(tablename); + if (!db) { + logmsg(LOGMSG_ERROR, "scdone_add_int failed to add table %s\n", tablename); + } + logmsg(LOGMSG_USER, "++++++ %s : calling update gen_shard_update_inmem_db\n", __func__); + rc = gen_shard_update_inmem_db(tran, db, genshard_name); + if (rc) { + logmsg(LOGMSG_ERROR, "REPLICANT FAILED TO UPDATE GENERIC SHARD INFO\n"); + goto done; + } + hash_sqlalias_db(db, db->genshard_name); +done: + _untran(tran, lid); + return rc; +} + + +static int scdone_genshard_drop(const char tablename[], void *arg, scdone_t type) +{ + tran_type *tran = NULL; + uint32_t lid = 0; + int rc = 0; + int bdberr = 0; + struct dbtable *db = get_dbtable_by_name(tablename); + if (db==NULL) { + logmsg(LOGMSG_USER, "%s GOT DBTABLE AS NULL. ABORTING.. \n", __func__); + abort(); + } + /*assert(arg!=NULL); + const char *genshard_name = (char *)arg; + logmsg(LOGMSG_USER, "%s: SHARDNAME IS %s\n", __func__, genshard_name); + logmsg(LOGMSG_USER, "%s: TABLENAME IS %s\n", __func__, tablename); + logmsg(LOGMSG_USER, "%s: db->tablename IS %s\n", __func__, db->tablename); + logmsg(LOGMSG_USER, "%s: db->genshard_name IS %s\n", __func__, db->genshard_name);*/ + tran = _tran(&lid, &bdberr, __func__, __LINE__); + if (!tran) + return bdberr; + + logmsg(LOGMSG_USER, "++++++ %s : calling update gen_shard_update_inmem_db\n", __func__); + rc = gen_shard_clear_inmem_db(tran, db); + if (rc != 0) { + logmsg(LOGMSG_ERROR, "REPLICANT FAILED TO UPDATE GENERIC SHARD INFO\n"); + goto done; + } + hash_sqlalias_db(db, db->tablename); + rc = scdone_drop_int(tablename, arg, type, tran); + /* clear the alias */ +done: + _untran(tran, lid); + return rc; +} + /* keep this in sync with enum scdone */ int (*SCDONE_CALLBACKS[])(const char *, void *, scdone_t) = { &scdone_alter, &scdone_addandfastinit, /* fastinit AND add (doh) */ @@ -1280,7 +1341,8 @@ int (*SCDONE_CALLBACKS[])(const char *, void *, scdone_t) = { &scdone_llmeta_queue, &scdone_genid48, &scdone_genid48, &scdone_lua_sfunc, &scdone_lua_afunc, &scdone_rename_table, &scdone_change_stripe, &scdone_user_view, &scdone_queue_file, - &scdone_queue_file, &scdone_rename_table, &scdone_alias}; + &scdone_queue_file, &scdone_rename_table, &scdone_alias, + &scdone_genshard_add, &scdone_genshard_drop}; /* TODO fail gracefully now that inline? */ /* called by bdb layer through a callback as a detached thread, diff --git a/schemachange/sc_drop_table.c b/schemachange/sc_drop_table.c index 7faa6eba01..cc562917d1 100644 --- a/schemachange/sc_drop_table.c +++ b/schemachange/sc_drop_table.c @@ -143,11 +143,13 @@ int finalize_drop_table(struct ireq *iq, struct schema_change_type *s, } } else if (s->partition.type == PARTITION_REM_GENSHARD) { struct errstat err = {0}; - rc = gen_shard_llmeta_remove(tran, db->sqlaliasname, &err); + rc = gen_shard_llmeta_remove(tran, db->genshard_name, &err); if (rc) { logmsg(LOGMSG_ERROR, "Failed to remove llmeta entry for sharded table %s\n", s->tablename); return SC_INTERNAL_ERROR; } + /* remove the alias name */ + hash_sqlalias_db(db, db->tablename); } live_sc_off(db); diff --git a/schemachange/sc_logic.c b/schemachange/sc_logic.c index b6576f87f9..f9bd9d5707 100644 --- a/schemachange/sc_logic.c +++ b/schemachange/sc_logic.c @@ -290,6 +290,35 @@ static inline int replication_only_error_code(int rc) return 0; } +int llog_scdone_genshard(bdb_state_type *bdb_state, int genshard_scdone_type, + char *genshard_name, struct schema_change_type *s, + tran_type *tran, int *bdberr) +{ + int rc = 0; + char *mashup = NULL; + int oldlen = 0; + int newlen = 0; + mashup = s->tablename; + oldlen = strlen(s->tablename) + 1; + logmsg(LOGMSG_USER, "CALLING LLOG_SCDONE_GENSHARD WITH GENSHARD_NAME %s\n, and genshard_scdone_type %d",genshard_name, genshard_scdone_type); + if (s->partition.type == PARTITION_ADD_GENSHARD) { + newlen = strlen(genshard_name) + 1; + mashup = alloca(oldlen + newlen); + memcpy(mashup, s->tablename, oldlen); /* old */ + memcpy(mashup + oldlen, genshard_name, newlen); /* new */ + } + if (tran) { + rc = bdb_llog_scdone_tran(bdb_state, genshard_scdone_type, tran, mashup, oldlen+newlen, bdberr); + } else { + rc = bdb_llog_scdone(bdb_state, genshard_scdone_type, mashup, oldlen + newlen, 1, bdberr); + } + if (rc) { + logmsg(LOGMSG_ERROR, "bdb_llog_scdone failed for genshard %s. rc: %d, bdberr: %d\n", s->partition.u.genshard.tablename, rc, *bdberr); + } + + return rc; +} + int llog_scdone_rename_wrapper(bdb_state_type *bdb_state, struct schema_change_type *s, tran_type *tran, int *bdberr) @@ -298,6 +327,7 @@ int llog_scdone_rename_wrapper(bdb_state_type *bdb_state, char *mashup = NULL; int oldlen = 0; int newlen = 0; + char *genshard_name = NULL; if (s->db) { mashup = s->tablename; @@ -315,14 +345,20 @@ int llog_scdone_rename_wrapper(bdb_state_type *bdb_state, mashup = alloca(oldlen + newlen); memcpy(mashup, s->tablename, oldlen); /* old */ memcpy(mashup + oldlen, dst, newlen); /* new */ - } else if (s->done_type == add && s->partition.type == PARTITION_ADD_GENSHARD) { - char *key = "dbnames"; - newlen = strlen(key) + 1; - mashup = alloca(oldlen + newlen); - memcpy(mashup, s->tablename, oldlen); - memcpy(mashup + oldlen, key, newlen); } } + + if (s->done_type == add && s->partition.type == PARTITION_ADD_GENSHARD) { + s->done_type = genshard_add; + genshard_name = s->partition.u.genshard.tablename; + newlen = strlen(genshard_name) + 1; + mashup = alloca(oldlen + newlen); + memcpy(mashup, s->tablename, oldlen); /* old */ + memcpy(mashup + oldlen, genshard_name, newlen); /* new */ + } else if (s->done_type == drop && s->partition.type == PARTITION_REM_GENSHARD) { + s->done_type = genshard_drop; + } + if (!tran) rc = bdb_llog_scdone(bdb_state, s->done_type, mashup, oldlen + newlen, 1, bdberr); @@ -382,7 +418,6 @@ static int do_finalize(ddl_t func, struct ireq *iq, "Forcing replication error table %s '%s' for tran %p\n", bdb_get_scdone_str(s->done_type), s->tablename, tran); } - rc = llog_scdone_rename_wrapper(thedb->bdb_env, s, tran, &bdberr); if (rc || bdberr != BDBERR_NOERROR) { sc_errf(s, "Failed to send scdone rc=%d bdberr=%d\n", rc, bdberr); @@ -454,7 +489,7 @@ static int do_ddl(ddl_t pre, ddl_t post, struct ireq *iq, struct schema_change_type *s, tran_type *tran) { int rc, bdberr = 0; - + struct dbtable *db = NULL; if (s->resume == SC_OSQL_RESUME) { return s->sc_rc; } @@ -531,6 +566,12 @@ static int do_ddl(ddl_t pre, ddl_t post, struct ireq *iq, if (!rc && !s->is_osql) { create_sqlmaster_records(tran); create_sqlite_master(); + db = get_dbtable_by_name(s->tablename); + if (s->partition.type == PARTITION_ADD_GENSHARD) { + hash_sqlalias_db(db, db->genshard_name); + } else if (s->partition.type == PARTITION_REM_GENSHARD) { + hash_sqlalias_db(db, db->tablename); + } } if (local_lock) unlock_schema_lk(); diff --git a/sqlite/src/build.c b/sqlite/src/build.c index c27dfe2a92..66bd4b4a92 100644 --- a/sqlite/src/build.c +++ b/sqlite/src/build.c @@ -30,6 +30,7 @@ #include "comdb2Int.h" #include "pragma.h" #include "logmsg.h" +#include "gen_shard.h" int has_comdb2_index_for_sqlite(Table *pTab); int is_comdb2_index_unique(const char *dbname, char *idx); @@ -765,6 +766,7 @@ Table *sqlite3LocateTableItem( }else{ zDb = p->zDatabase; } + logmsg(LOGMSG_USER, "FOUND zDb as %s\n", zDb); #if defined(SQLITE_BUILDING_FOR_COMDB2) if( gbl_allow_user_schema ){ char tblName[MAXTABLELEN]; @@ -2792,7 +2794,16 @@ void sqlite3CreateView( sqlite3TwoPartName(pParse, pName1, pName2, &pName); iDb = sqlite3SchemaToIndex(db, p->pSchema); sqlite3FixInit(&sFix, pParse, iDb, "view", pName); - if( sqlite3FixSelect(&sFix, pSelect) ) goto create_view_fail; +#ifdef SQLITE_BUILDING_FOR_COMDB2 + char *tablename = (char *)alloca(pName->n+1); + strncpy(tablename, pName->z, pName->n); + tablename[pName->n]='\0'; + if( !is_gen_shard(tablename) && sqlite3FixSelect(&sFix, pSelect) ) { +#else + if ( sqlite3FixSelect(&sFix, pSelect) ) { +#endif + goto create_view_fail; + } /* Make a copy of the entire SELECT statement that defines the view. ** This will force all the Expr.token.z values to be dynamically @@ -3313,7 +3324,7 @@ void sqlite3DropTable(Parse *pParse, SrcList *pName, int isView, int noErr){ } if( !isView && pTab->pSelect ){ #if defined(SQLITE_BUILDING_FOR_COMDB2) - if (timepart_allow_drop(pTab->zName)) { + if ( !is_gen_shard(pTab->zName) && timepart_allow_drop(pTab->zName)) { #endif /* defined(SQLITE_BUILDING_FOR_COMDB2) */ sqlite3ErrorMsg(pParse, "use DROP VIEW to delete view %s", pTab->zName); goto exit_drop_table; diff --git a/sqlite/src/comdb2build.c b/sqlite/src/comdb2build.c index e075c83aad..2e7b26b06e 100644 --- a/sqlite/src/comdb2build.c +++ b/sqlite/src/comdb2build.c @@ -942,7 +942,7 @@ void comdb2DropTable(Parse *pParse, SrcList *pName) /* Check if this is a distributed drop */ struct dbtable *tbl = get_dbtable_by_name(sc->tablename); - if (tbl && tbl->sqlaliasname && tbl->dbnames[0]) { + if (tbl && tbl->genshard_name) { /* dropping a generic partition */ /* NOTE: there are two was to get here: * - initial drop table that is actually a partition (coordinator) @@ -955,7 +955,8 @@ void comdb2DropTable(Parse *pParse, SrcList *pName) sc->partition.type = thd->clnt->remsql_set.is_remsql == IS_REMCREATE ? PARTITION_REM_GENSHARD : PARTITION_REM_GENSHARD_COORD ; snprintf(sc->partition.u.genshard.tablename, - sizeof(sc->partition.u.genshard.tablename), "%s", tbl->sqlaliasname); + sizeof(sc->partition.u.genshard.tablename), "%s", tbl->genshard_name); + logmsg(LOGMSG_USER, "%s genshard table name is %s\n", __func__, sc->partition.u.genshard.tablename); } } @@ -7864,7 +7865,7 @@ void comdb2CreateGenShard(Parse* pParse, IdList *cols, IdList *dbs) } partition->type = PARTITION_ADD_GENSHARD_COORD; - + strncpy0(partition->u.genshard.tablename, pParse->comdb2_ddl_ctx->tablename, MAXTABLELEN); partition->u.genshard.numcols = cols->nId; partition->u.genshard.columns = calloc(cols->nId, sizeof(char*)); if (!partition->u.genshard.columns) { diff --git a/tests/remotecreate.test/runit b/tests/remotecreate.test/runit index 7d6a9d4b79..c54dec9806 100755 --- a/tests/remotecreate.test/runit +++ b/tests/remotecreate.test/runit @@ -67,22 +67,22 @@ function verify_table { } function verify_llmeta { expected=$1 - result=$($S0_SQLT "EXEC PROCEDURE sys.cmd.send('llmeta list')" | grep "gen_shard") + result=$($S0_SQLT "EXEC PROCEDURE sys.cmd.send('llmeta list')" | grep "LLMETA_GEN_SHARD") entry=`echo $result | cut -d ' ' -f 3 | cut -d '=' -f 2` if [ "$entry" != "$expected" ]; then fail_exit "verify_llmeta failed on $DBNAME. Expected $expected but got $entry" fi - result=$($S1_SQLT "EXEC PROCEDURE sys.cmd.send('llmeta list')" | grep "gen_shard") + result=$($S1_SQLT "EXEC PROCEDURE sys.cmd.send('llmeta list')" | grep "LLMETA_GEN_SHARD") entry=`echo $result | cut -d ' ' -f 3 | cut -d '=' -f 2` if [ "$entry" != "$expected" ]; then fail_exit "verify_llmeta failed on ${SECONDARY_DBNAME}. Expected $expected but got $entry" fi - result=$($S2_SQLT "EXEC PROCEDURE sys.cmd.send('llmeta list')" | grep "gen_shard") + result=$($S2_SQLT "EXEC PROCEDURE sys.cmd.send('llmeta list')" | grep "LLMETA_GEN_SHARD") entry=`echo $result | cut -d ' ' -f 3 | cut -d '=' -f 2` if [ "$entry" != "$expected" ]; then fail_exit "verify_llmeta failed on ${TERNARY_DBNAME}. Expected $expected but got $entry" fi - result=$($S3_SQLT "EXEC PROCEDURE sys.cmd.send('llmeta list')" | grep "gen_shard") + result=$($S3_SQLT "EXEC PROCEDURE sys.cmd.send('llmeta list')" | grep "LLMETA_GEN_SHARD") entry=`echo $result | cut -d ' ' -f 3 | cut -d '=' -f 2` if [ "$entry" != "$expected" ]; then fail_exit "verify_llmeta failed on ${QUATERNARY_DBNAME}. Expected $expected but got $entry"