Skip to content

INTERNAL: Remove lock_arcus that are not in necessary #236 #242

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 121 additions & 0 deletions libmemcached/arcus.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ static pthread_mutex_t azk_mtx = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t azk_cond = PTHREAD_COND_INITIALIZER;
static int azk_count;

#ifdef REMOVE_LOCK_ARCUS
pthread_mutex_t lock_update_serverlist = PTHREAD_MUTEX_INITIALIZER;
#endif
pthread_mutex_t lock_arcus = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond_arcus = PTHREAD_COND_INITIALIZER;

Expand Down Expand Up @@ -104,7 +107,10 @@ static inline arcus_return_t do_arcus_init(memcached_st *mc,
memcached_behavior_set(mc, MEMCACHED_BEHAVIOR_DISTRIBUTION, MEMCACHED_DISTRIBUTION_CONSISTENT_KETAMA_SPY);
memcached_behavior_set_key_hash(mc, MEMCACHED_HASH_MD5);

#ifdef REMOVE_LOCK_ARCUS
#else
pthread_mutex_lock(&lock_arcus);
#endif
do {
arcus= static_cast<arcus_st *>(memcached_get_server_manager(mc));
if (arcus) {
Expand Down Expand Up @@ -155,7 +161,10 @@ static inline arcus_return_t do_arcus_init(memcached_st *mc,
/* Set the Arcus to memcached as a server manager. */
memcached_set_server_manager(mc, (void *)arcus);
} while(0);
#ifdef REMOVE_LOCK_ARCUS
#else
pthread_mutex_unlock(&lock_arcus);
#endif

return rc;
}
Expand Down Expand Up @@ -310,15 +319,21 @@ arcus_return_t arcus_close(memcached_st *mc)
return rc;
}

#ifdef REMOVE_LOCK_ARCUS
#else
pthread_mutex_lock(&lock_arcus);
#endif
arcus= static_cast<arcus_st *>(memcached_get_server_manager(mc));
if (arcus) {
arcus->pool= NULL;
free(arcus);
arcus= NULL;
}
memcached_set_server_manager(mc, NULL);
#ifdef REMOVE_LOCK_ARCUS
#else
pthread_mutex_unlock(&lock_arcus);
#endif

return ARCUS_SUCCESS;
}
Expand Down Expand Up @@ -613,7 +628,10 @@ static inline arcus_return_t do_arcus_zk_connect(memcached_st *mc)

inc_count(1);

#ifdef REMOVE_LOCK_ARCUS
#else
pthread_mutex_lock(&lock_arcus);
#endif
do {
arcus= static_cast<arcus_st *>(memcached_get_server_manager(mc));
if (not arcus) {
Expand All @@ -626,20 +644,29 @@ static inline arcus_return_t do_arcus_zk_connect(memcached_st *mc)

ZOO_LOG_WARN(("Initiating zookeeper client"));

#ifdef REMOVE_LOCK_ARCUS
pthread_mutex_lock(&lock_arcus);
#endif
/* Connect to ZooKeeper ensemble. */
arcus->zk.handle= zookeeper_init(arcus->zk.ensemble_list,
do_arcus_zk_watcher_global,
arcus->zk.session_timeout,
&(arcus->zk.myid),
(void *)mc,
ZOO_NO_FLAGS);
#ifdef REMOVE_LOCK_ARCUS
pthread_mutex_unlock(&lock_arcus);
#endif
if (not arcus->zk.handle) {
ZOO_LOG_ERROR(("zookeeper_init() failed, reason=%s, zookeeper=%s",
strerror(errno), arcus->zk.ensemble_list));
rc= ARCUS_ERROR; break;
}
} while(0);
#ifdef REMOVE_LOCK_ARCUS
#else
pthread_mutex_unlock(&lock_arcus);
#endif

if (rc != ARCUS_SUCCESS) {
return ARCUS_ERROR;
Expand All @@ -655,28 +682,46 @@ static inline arcus_return_t do_arcus_zk_connect(memcached_st *mc)
rc= ARCUS_ERROR; break;
}

#ifdef REMOVE_LOCK_ARCUS
#else
pthread_mutex_lock(&lock_arcus);
#endif
if (arcus->zk.conn_result != ARCUS_SUCCESS) {
ZOO_LOG_ERROR(("ZooKeeper connection failed, result=%d", arcus->zk.conn_result));
#ifdef REMOVE_LOCK_ARCUS
#else
pthread_mutex_unlock(&lock_arcus);
#endif
rc= ARCUS_ERROR; break;
}
if (do_arcus_cluster_validation_check(mc, arcus) < 0) {
#ifdef REMOVE_LOCK_ARCUS
#else
pthread_mutex_unlock(&lock_arcus);
#endif
rc= ARCUS_ERROR; break;
}
#ifdef REMOVE_LOCK_ARCUS
#else
pthread_mutex_unlock(&lock_arcus);
#endif

if (do_add_client_info(arcus) < 0) {
rc= ARCUS_ERROR; break;
}
} while(0);

if (rc != ARCUS_SUCCESS) {
#ifdef REMOVE_LOCK_ARCUS
#else
pthread_mutex_lock(&lock_arcus);
#endif
zookeeper_close(arcus->zk.handle);
arcus->zk.handle= NULL;
#ifdef REMOVE_LOCK_ARCUS
#else
pthread_mutex_unlock(&lock_arcus);
#endif
}
return rc;
}
Expand All @@ -691,15 +736,24 @@ static inline arcus_return_t do_arcus_zk_close(memcached_st *mc)

clear_count();

#ifdef REMOVE_LOCK_ARCUS
#else
pthread_mutex_lock(&lock_arcus);
#endif
do {
arcus= static_cast<arcus_st *>(memcached_get_server_manager(mc));
if (not arcus) {
rc= ARCUS_ERROR; break;
}

#ifdef REMOVE_LOCK_ARCUS
pthread_mutex_lock(&lock_arcus);
#endif
/* Delete the (expired) session. */
arcus->zk.myid.client_id= 0;
#ifdef REMOVE_LOCK_ARCUS
pthread_mutex_unlock(&lock_arcus);
#endif

/* Clear connect result */
arcus->zk.conn_result= ARCUS_SUCCESS;
Expand All @@ -716,7 +770,10 @@ static inline arcus_return_t do_arcus_zk_close(memcached_st *mc)
}
}
} while(0);
#ifdef REMOVE_LOCK_ARCUS
#else
pthread_mutex_unlock(&lock_arcus);
#endif

return rc;
}
Expand Down Expand Up @@ -771,9 +828,17 @@ void arcus_server_check_for_update(memcached_st *ptr)
#endif
{
/* master's cache list was changed, update member's cache list */
#ifdef REMOVE_LOCK_ARCUS
pthread_mutex_lock(&lock_update_serverlist);
#else
pthread_mutex_lock(&lock_arcus);
#endif
(void)memcached_pool_update_member(arcus->pool, ptr);
#ifdef REMOVE_LOCK_ARCUS
pthread_mutex_unlock(&lock_update_serverlist);
#else
pthread_mutex_unlock(&lock_arcus);
#endif
}
}
}
Expand Down Expand Up @@ -947,9 +1012,17 @@ static inline void do_arcus_zk_update_cachelist_by_string(memcached_st *mc,
}
}

#ifdef REMOVE_LOCK_ARCUS
pthread_mutex_lock(&lock_update_serverlist);
#else
pthread_mutex_lock(&lock_arcus);
#endif
do_arcus_update_cachelist(mc, serverinfo, servercount);
#ifdef REMOVE_LOCK_ARCUS
pthread_mutex_unlock(&lock_update_serverlist);
#else
pthread_mutex_unlock(&lock_arcus);
#endif

libmemcached_free(mc, serverinfo);
}
Expand Down Expand Up @@ -1035,7 +1108,10 @@ static inline void do_arcus_zk_update_cachelist(memcached_st *mc,
{
arcus_st *arcus;

#ifdef REMOVE_LOCK_ARCUS
#else
pthread_mutex_lock(&lock_arcus);
#endif
do {
arcus= static_cast<arcus_st *>(memcached_get_server_manager(mc));
if (not arcus) {
Expand All @@ -1062,7 +1138,10 @@ static inline void do_arcus_zk_update_cachelist(memcached_st *mc,
libmemcached_free(mc, serverinfo);
}
} while(0);
#ifdef REMOVE_LOCK_ARCUS
#else
pthread_mutex_unlock(&lock_arcus);
#endif
}

static inline void do_arcus_zk_watch_and_update_cachelist(memcached_st *mc,
Expand Down Expand Up @@ -1114,9 +1193,15 @@ static inline void do_arcus_zk_watcher_cachelist(zhandle_t *zh __attribute__((un
ZOO_LOG_INFO(("ZOO_CHILD_EVENT from ZK cache list"));

memcached_st *mc= static_cast<memcached_st *>(ctx_mc);
#ifdef REMOVE_LOCK_ARCUS
#else
pthread_mutex_lock(&lock_arcus);
#endif
arcus_st *arcus= static_cast<arcus_st *>(memcached_get_server_manager(mc));
#ifdef REMOVE_LOCK_ARCUS
#else
pthread_mutex_unlock(&lock_arcus);
#endif
if (not arcus || not arcus->zk.handle) {
ZOO_LOG_ERROR(("arcus is null"));
return;
Expand Down Expand Up @@ -1147,11 +1232,17 @@ static inline void do_arcus_zk_watcher_global(zhandle_t *zh,
return;
}

#ifdef REMOVE_LOCK_ARCUS
#else
pthread_mutex_lock(&lock_arcus);
#endif
arcus= static_cast<arcus_st *>(memcached_get_server_manager(mc));
if (not arcus || not arcus->zk.handle) {
ZOO_LOG_ERROR(("arcus is null"));
#ifdef REMOVE_LOCK_ARCUS
#else
pthread_mutex_unlock(&lock_arcus);
#endif
return;
}

Expand All @@ -1160,6 +1251,9 @@ static inline void do_arcus_zk_watcher_global(zhandle_t *zh,
ZOO_LOG_WARN(("SESSION_STATE=CONNECTED, to %s", arcus->zk.ensemble_list));

const clientid_t *id= zoo_client_id(zh);
#ifdef REMOVE_LOCK_ARCUS
pthread_mutex_lock(&lock_arcus);
#endif
if (arcus->zk.myid.client_id == 0 or arcus->zk.myid.client_id != id->client_id) {
ZOO_LOG_DEBUG(("Previous sessionid : 0x%llx", (long long) arcus->zk.myid.client_id));
arcus->zk.myid= *id;
Expand All @@ -1172,12 +1266,18 @@ static inline void do_arcus_zk_watcher_global(zhandle_t *zh,
else if (state == ZOO_CONNECTING_STATE or state == ZOO_ASSOCIATING_STATE)
{
ZOO_LOG_WARN(("SESSION_STATE=CONNECTING, to %s", arcus->zk.ensemble_list));
#ifdef REMOVE_LOCK_ARCUS
#else
pthread_mutex_unlock(&lock_arcus);
#endif
}
else if (state == ZOO_AUTH_FAILED_STATE)
{
arcus->zk.conn_result= ARCUS_AUTH_FAILED;
#ifdef REMOVE_LOCK_ARCUS
#else
pthread_mutex_unlock(&lock_arcus);
#endif
if (arcus->zk_mgr.running) {
ZOO_LOG_WARN(("SESSION_STATE=AUTH_FAILED, create a new client after closing failed one"));
pthread_mutex_lock(&arcus->zk_mgr.lock);
Expand All @@ -1192,7 +1292,10 @@ static inline void do_arcus_zk_watcher_global(zhandle_t *zh,
else if (state == ZOO_EXPIRED_SESSION_STATE)
{
ZOO_LOG_WARN(("SESSION_STATE=EXPIRED_SESSION, create a new client after closing expired one"));
#ifdef REMOVE_LOCK_ARCUS
#else
pthread_mutex_unlock(&lock_arcus);
#endif
if (arcus->zk_mgr.running) {
pthread_mutex_lock(&arcus->zk_mgr.lock);
arcus->zk_mgr.request.reconnect_process= true;
Expand All @@ -1211,7 +1314,10 @@ static inline arcus_return_t do_arcus_zk_manager_start(memcached_st *mc)
arcus_return_t rc= ARCUS_SUCCESS;

/* Start arcus zk manager thread */
#ifdef REMOVE_LOCK_ARCUS
#else
pthread_mutex_lock(&lock_arcus);
#endif
do {
arcus= static_cast<arcus_st *>(memcached_get_server_manager(mc));
if (not arcus || not arcus->zk.handle) {
Expand All @@ -1235,15 +1341,21 @@ static inline arcus_return_t do_arcus_zk_manager_start(memcached_st *mc)
do_arcus_zk_manager_wakeup(mc, true);
pthread_mutex_unlock(&arcus->zk_mgr.lock);
} while(0);
#ifdef REMOVE_LOCK_ARCUS
#else
pthread_mutex_unlock(&lock_arcus);
#endif

if (rc != ARCUS_SUCCESS) {
return rc;
}

ZOO_LOG_WARN(("Waiting for the cache server list..."));

#ifdef REMOVE_LOCK_ARCUS
#else
pthread_mutex_lock(&lock_arcus);
#endif
arcus= static_cast<arcus_st *>(memcached_get_server_manager(mc));
if (arcus && arcus->zk.is_initializing) {
struct timeval now;
Expand All @@ -1259,7 +1371,10 @@ static inline arcus_return_t do_arcus_zk_manager_start(memcached_st *mc)
rc= ARCUS_ERROR;
}
}
#ifdef REMOVE_LOCK_ARCUS
#else
pthread_mutex_unlock(&lock_arcus);
#endif

if (rc == ARCUS_SUCCESS) {
ZOO_LOG_WARN(("Done"));
Expand All @@ -1271,9 +1386,15 @@ static inline void do_arcus_zk_manager_stop(memcached_st *mc)
{
arcus_st *arcus;

#ifdef REMOVE_LOCK_ARCUS
#else
pthread_mutex_lock(&lock_arcus);
#endif
arcus= static_cast<arcus_st *>(memcached_get_server_manager(mc));
#ifdef REMOVE_LOCK_ARCUS
#else
pthread_mutex_unlock(&lock_arcus);
#endif
if (arcus && arcus->zk_mgr.running) {
ZOO_LOG_WARN(("Wait for the arcus zookeeper manager to stop"));
arcus->zk_mgr.reqstop= true;
Expand Down
1 change: 1 addition & 0 deletions libmemcached/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
#define POOL_UPDATE_SERVERLIST 1
#define POOL_MORE_CONCURRENCY 1
#define KETAMA_HASH_COLLSION 1
#define REMOVE_LOCK_ARCUS 1

/* Public defines */
#define MEMCACHED_DEFAULT_PORT 11211
Expand Down