From c65503ce0ae59f5624c5266dee58b20a32e7c1dd Mon Sep 17 00:00:00 2001 From: AlexStocks Date: Thu, 24 Aug 2017 19:55:20 +0800 Subject: [PATCH 1/2] bug fix: rmt can not exit when sync redis data to rdb files --- src/rmt_core.c | 27 +++++++++++++++++++++++++-- src/rmt_redis.c | 5 +++++ src/rmt_redis.h | 1 + 3 files changed, 31 insertions(+), 2 deletions(-) diff --git a/src/rmt_core.c b/src/rmt_core.c index 940d178..d22f945 100644 --- a/src/rmt_core.c +++ b/src/rmt_core.c @@ -1182,6 +1182,16 @@ static void *read_thread_run(void *args) redis_node *srnode; listNode *lnode; listIter *it; + static int read_num; + + read_num = listLength(nodes); + log_notice("init read event num:%u", read_num); + it = listGetIterator(nodes, AL_START_HEAD); + while((lnode = listNext(it)) != NULL){ + srnode = listNodeValue(lnode); + srnode->event_num = &read_num; + } + listReleaseIterator(it); it = listGetIterator(nodes, AL_START_HEAD); while((lnode = listNext(it)) != NULL){ @@ -1217,6 +1227,7 @@ static void *read_thread_run(void *args) listReleaseIterator(it); aeMain(rdata->loop); + log_notice("read thread over"); return 0; } @@ -1275,6 +1286,7 @@ static void *write_thread_run(void *args) rmt_write(srnode->sockpairfds[1], " ", 1); aeMain(wdata->loop); + log_notice("write thread over"); return 0; } @@ -2550,19 +2562,30 @@ void redis_migrate(rmtContext *ctx, int type) log_notice("migrate job is running..."); - aeMain(ctx->loop); + // aeMain(ctx->loop); //wait for the read job finish for(i = 0; i < read_threads_count; i ++){ rdata = array_get(read_datas, (uint32_t)i); pthread_join(rdata->thread_id, NULL); } + log_notice("migrate read job over"); //wait for the write job finish for(i = 0; i < write_threads_count; i ++){ - wdata = array_get(write_datas, (uint32_t)i); + wdata = array_get(write_datas, (uint32_t)i); + list *nodes = wdata->nodes; //type : source redis_node + redis_group *srgroup; + redis_node *srnode = listFirstValue(nodes); + if(srnode != NULL){ + srgroup = srnode->owner; + if(srgroup->kind != GROUP_TYPE_RDBFILE){ + aeStop(wdata->loop); + } + } pthread_join(wdata->thread_id, NULL); } + log_notice("migrate write job over"); done: diff --git a/src/rmt_redis.c b/src/rmt_redis.c index dfc58e6..9b83c6d 100644 --- a/src/rmt_redis.c +++ b/src/rmt_redis.c @@ -1651,6 +1651,11 @@ static void rmtReceiveRdb(aeEventLoop *el, int fd, void *privdata, int mask) log_notice("Rdb file received, disconnect from the node[%s]", srnode->addr); notice_write_thread(srnode); /* Let the next node begin replication */ + int event_num = __sync_fetch_and_sub(srnode->event_num, 1); + log_notice("current read event number:%u", event_num - 1); + if (event_num == 1) { + aeStop(el); + } return; } diff --git a/src/rmt_redis.h b/src/rmt_redis.h index a69bc43..0faa657 100644 --- a/src/rmt_redis.h +++ b/src/rmt_redis.h @@ -174,6 +174,7 @@ typedef struct redis_node{ long long timestamp; int sk_event; /* used to run some task */ + unsigned int *event_num; struct redis_node *next; /* next redis_node to begin replication */ }redis_node; From c9bbb8752272ffd71a54070140549d982992cec2 Mon Sep 17 00:00:00 2001 From: AlexStocks Date: Sat, 26 Aug 2017 21:14:29 +0800 Subject: [PATCH 2/2] add rdb file name prefix --- src/rmt.c | 9 ++++++++- src/rmt_conf.c | 12 +++++++++++- src/rmt_conf.h | 1 + src/rmt_core.h | 1 + src/rmt_redis.c | 7 ++++++- 5 files changed, 27 insertions(+), 3 deletions(-) diff --git a/src/rmt.c b/src/rmt.c index a67e284..f5b56aa 100644 --- a/src/rmt.c +++ b/src/rmt.c @@ -195,9 +195,12 @@ init_context(struct instance *rmti) destroy_context(rmt_ctx); return NULL; } - + rmt_ctx->dir = sdsdup(cf->dir); } + if (cf->rdb_prefix != CONF_UNSET_PTR && sdslen(cf->rdb_prefix) > 0) { + rmt_ctx->rdb_prefix = sdsdup(cf->rdb_prefix); + } rmt_ctx->loop = aeCreateEventLoop(1000); if (rmt_ctx->loop == NULL) { @@ -271,6 +274,10 @@ void destroy_context(rmtContext *rmt_ctx) sdsfree(rmt_ctx->dir); } + if (rmt_ctx->rdb_prefix != NULL) { + sdsfree(rmt_ctx->rdb_prefix); + } + while(listLength(&rmt_ctx->clients) > 0) { lnode = listFirst(&rmt_ctx->clients); c = listNodeValue(lnode); diff --git a/src/rmt_conf.c b/src/rmt_conf.c index 819b661..ebfd6ab 100644 --- a/src/rmt_conf.c +++ b/src/rmt_conf.c @@ -108,6 +108,9 @@ static conf_option conf_common_options[] = { { (char*)"dir", conf_set_string, offsetof(rmt_conf, dir) }, + { (char*)"rdb_prefix", + conf_set_string, + offsetof(rmt_conf, rdb_prefix) }, { (char*)"max_clients", conf_set_num, offsetof(rmt_conf, max_clients) }, @@ -694,6 +697,7 @@ static int conf_init(rmt_conf *cf) cf->rdb_diskless = CONF_UNSET_NUM; cf->source_safe = CONF_UNSET_NUM; cf->dir = CONF_UNSET_PTR; + cf->rdb_prefix = CONF_UNSET_PTR; cf->max_clients = CONF_UNSET_NUM; @@ -735,7 +739,12 @@ static void conf_deinit(rmt_conf *cf) sdsfree(cf->dir); cf->dir = CONF_UNSET_PTR; } - + + if(cf->rdb_prefix != NULL){ + sdsfree(cf->rdb_prefix); + cf->rdb_prefix = CONF_UNSET_PTR; + } + cf->maxmemory = CONF_UNSET_NUM; cf->threads = CONF_UNSET_NUM; cf->step = CONF_UNSET_NUM; @@ -792,6 +801,7 @@ conf_dump(rmt_conf *cf) log_debug(log_level, " rdb_diskless: %d", cf->rdb_diskless); log_debug(log_level, " source_safe: %d", cf->source_safe); log_debug(log_level, " dir: %s", cf->dir); + log_debug(log_level, " rdb_prefix: %s", cf->rdb_prefix); log_debug(log_level, " max_clients: %d", cf->max_clients); log_debug(log_level, " filter: %s", cf->filter); log_debug(log_level, ""); diff --git a/src/rmt_conf.h b/src/rmt_conf.h index 35b4762..31d2671 100644 --- a/src/rmt_conf.h +++ b/src/rmt_conf.h @@ -61,6 +61,7 @@ typedef struct rmt_conf { int rdb_diskless; int source_safe; sds dir; + sds rdb_prefix; int max_clients; sds filter; diff --git a/src/rmt_core.h b/src/rmt_core.h index 4f516b8..a4af804 100644 --- a/src/rmt_core.h +++ b/src/rmt_core.h @@ -199,6 +199,7 @@ typedef struct rmtContext { int source_safe; sds dir; + sds rdb_prefix; struct redis_group *srgroup; diff --git a/src/rmt_redis.c b/src/rmt_redis.c index 9b83c6d..2439501 100644 --- a/src/rmt_redis.c +++ b/src/rmt_redis.c @@ -1907,9 +1907,14 @@ static void rmtSyncRedisMaster(aeEventLoop *el, int fd, void *privdata, int mask if (ctx->dir != NULL) { rdb->fname = sdscatsds(rdb->fname, ctx->dir); rdb->fname = sdscat(rdb->fname, "/"); + if (ctx->rdb_prefix != NULL && sdslen(ctx->rdb_prefix)) { + rdb->fname = sdscatsds(rdb->fname, ctx->rdb_prefix); + } else { + rdb->fname = sdscat(rdb->fname, "node"); + } } rdb->fname = sdscatfmt(rdb->fname, - "node%s-%I-%i.rdb", + "%s-%I-%i.rdb", srnode->addr==NULL?"unknow":srnode->addr, rmt_usec_now(), (long int)getpid());