@@ -147,6 +147,9 @@ typedef struct {
147147 char * mc_ipport ; // this memcached ip:port string
148148 char * hostip ; // localhost server IP
149149 int port ; // memcached port number
150+ #ifdef CONFIG_FAILSTOP
151+ bool mc_failstop ; // memcached automatic failstop on/off
152+ #endif
150153 int hb_failstop ; // memcached heartbeat failstop
151154 int hb_timeout ; // memcached heartbeat timeout
152155 int zk_timeout ; // Zookeeper session timeout
@@ -172,6 +175,9 @@ arcus_zk_conf arcus_conf = {
172175 .svc = NULL ,
173176 .mc_ipport = NULL ,
174177 .hostip = NULL ,
178+ #ifdef CONFIG_FAILSTOP
179+ .mc_failstop = true,
180+ #endif
175181 .hb_failstop = HEART_BEAT_DFT_FAILSTOP ,
176182 .hb_timeout = HEART_BEAT_DFT_TIMEOUT ,
177183 .zk_timeout = DEFAULT_ZK_TO ,
@@ -243,6 +249,10 @@ struct sm {
243249 volatile bool timer_running ;
244250#endif
245251
252+ #ifdef CONFIG_FAILSTOP
253+ volatile bool mc_pause ;
254+ #endif
255+
246256 pthread_t state_tid ;
247257#ifdef ENABLE_SUICIDE_UPON_DISCONNECT
248258 /* A timer thread to fail-stop the server
@@ -336,6 +346,35 @@ static int wait_count(int timeout)
336346 return rc ;
337347}
338348
349+ #ifdef CONFIG_FAILSTOP
350+ static int
351+ arcus_zk_client_init (zk_info_t * zinfo )
352+ {
353+ inc_count (1 );
354+ zinfo -> zh = zookeeper_init (zinfo -> ensemble_list , arcus_zk_watcher ,
355+ arcus_conf .zk_timeout , & zinfo -> myid , zinfo , 0 );
356+ if (!zinfo -> zh ) {
357+ arcus_conf .logger -> log (EXTENSION_LOG_WARNING , NULL ,
358+ "zookeeper_init() failed (%s error)\n" , strerror (errno ));
359+ return EX_PROTOCOL ;
360+ }
361+ /* wait until above init callback is called
362+ * We need to wait until ZOO_CONNECTED_STATE
363+ */
364+ if (wait_count (arcus_conf .zk_timeout ) != 0 ) {
365+ /* zoo_state(zh) != ZOO_CONNECTED_STATE */
366+ arcus_conf .logger -> log (EXTENSION_LOG_WARNING , NULL ,
367+ "cannot to be ZOO_CONNECTED_STATE\n" );
368+ zookeeper_close (zinfo -> zh );
369+ zinfo -> zh = NULL ;
370+ inc_count (-1 );
371+ return EX_PROTOCOL ;
372+ }
373+
374+ return 0 ;
375+ }
376+ #endif
377+
339378// Arcus zookeeper global watch callback routine
340379static void
341380arcus_zk_watcher (zhandle_t * wzh , int type , int state , const char * path , void * cxt )
@@ -371,6 +410,22 @@ arcus_zk_watcher(zhandle_t *wzh, int type, int state, const char *path, void *cx
371410 arcus_exit (wzh , EX_NOPERM );
372411 }
373412 else if (state == ZOO_EXPIRED_SESSION_STATE ) {
413+ #ifdef CONFIG_FAILSTOP
414+ if (arcus_conf .mc_failstop ) {
415+ // very likely that memcached process exited and restarted within
416+ // session timeout
417+ arcus_conf .logger -> log (EXTENSION_LOG_WARNING , NULL , "Expired state. shutting down\n" );
418+ // send SMS here??
419+ arcus_exit (wzh , EX_TEMPFAIL );
420+ } else {
421+ arcus_conf .logger -> log (EXTENSION_LOG_WARNING , NULL , "Expired state. pausing memcached (mc_failstop: off)\n" );
422+
423+ sm_lock ();
424+ sm_info .mc_pause = true;
425+ sm_wakeup (true);
426+ sm_unlock ();
427+ }
428+ #else
374429 // very likely that memcached process exited and restarted within
375430 // session timeout
376431 arcus_conf .logger -> log (EXTENSION_LOG_WARNING , NULL , "Expired state. shutting down\n" );
@@ -381,6 +436,7 @@ arcus_zk_watcher(zhandle_t *wzh, int type, int state, const char *path, void *cx
381436 * to avoid stale cache data access (rehashing already happened)
382437 */
383438 //arcus_zk_init(arcus_conf.ensemble_list, settings );
439+ #endif
384440 }
385441 else if (state == ZOO_ASSOCIATING_STATE || state == ZOO_CONNECTING_STATE ) {
386442 /* we get these when connection to Ensemble is dropped and retrying
@@ -1181,6 +1237,14 @@ void arcus_zk_init(char *ensemble_list, int zk_to,
11811237
11821238 gettimeofday (& start_time , 0 );
11831239
1240+ #ifdef CONFIG_FAILSTOP
1241+ rc = arcus_zk_client_init (zinfo );
1242+ if (rc != 0 ) {
1243+ arcus_conf .logger -> log (EXTENSION_LOG_WARNING , NULL ,
1244+ "Failed to initialize zk client\n" );
1245+ arcus_exit (NULL , rc );
1246+ }
1247+ #else
11841248 inc_count (1 );
11851249 zinfo -> zh = zookeeper_init (zinfo -> ensemble_list , arcus_zk_watcher ,
11861250 arcus_conf .zk_timeout , & zinfo -> myid , zinfo , 0 );
@@ -1198,6 +1262,7 @@ void arcus_zk_init(char *ensemble_list, int zk_to,
11981262 "cannot to be ZOO_CONNECTED_STATE\n" );
11991263 arcus_exit (zk_info .zh , EX_PROTOCOL );
12001264 }
1265+ #endif
12011266
12021267 /* setting main zk */
12031268 main_zk = zinfo ;
@@ -1434,6 +1499,136 @@ int arcus_zk_get_ensemble(char *buf, int size)
14341499 return ret ;
14351500}
14361501
1502+ #ifdef CONFIG_FAILSTOP
1503+ int arcus_zk_rejoin_ensemble ()
1504+ {
1505+ int ret = 0 ;
1506+
1507+ pthread_mutex_lock (& zk_lock );
1508+
1509+ if (main_zk -> zh != NULL ) {
1510+ pthread_mutex_unlock (& zk_lock );
1511+ arcus_conf .logger -> log (EXTENSION_LOG_WARNING , NULL ,
1512+ "Failed to rejoin ensemble. It's already a member of cloud.\n" );
1513+ return -1 ;
1514+ }
1515+
1516+ assert (sm_info .mc_pause == true);
1517+
1518+ do {
1519+ struct timeval start_time , end_time ;
1520+ long difftime_us ;
1521+
1522+ /* initialize Arcus ZK stats */
1523+ memset (& azk_stat , 0 , sizeof (azk_stat ));
1524+
1525+ memset (& main_zk -> myid , 0 , sizeof (clientid_t ));
1526+
1527+ assert (main_zk -> ensemble_list );
1528+
1529+ gettimeofday (& start_time , 0 );
1530+
1531+ ret = arcus_zk_client_init (main_zk );
1532+ if (ret != 0 ) {
1533+ arcus_conf .logger -> log (EXTENSION_LOG_WARNING , NULL ,
1534+ "Failed to initialize zk client.\n" );
1535+ break ;
1536+ }
1537+
1538+ /* check zk root directory and get the serice code */
1539+ if (zk_root == NULL ) {
1540+ zk_root = "/arcus" ; /* set zk root directory */
1541+ ret = arcus_check_server_mapping (main_zk -> zh , zk_root );
1542+ if (ret != 0 ) {
1543+ arcus_conf .logger -> log (EXTENSION_LOG_WARNING , NULL ,
1544+ "Failed to check server mapping for this cache node. "
1545+ "(zk_root=%s)\n" , zk_root );
1546+ }
1547+ }
1548+
1549+ /* create "/cache_list/{svc}/ip:port-hostname" ephemeral znode */
1550+ ret = arcus_create_ephemeral_znode (main_zk -> zh , zk_root );
1551+ if (ret != 0 ) {
1552+ arcus_conf .logger -> log (EXTENSION_LOG_WARNING , NULL ,
1553+ "arcus_create_ephemeral_znode() failed.\n" );
1554+ break ;
1555+ }
1556+
1557+ /* log this rejoin activity in /arcus/cache_server_log */
1558+ arcus_zk_log (main_zk -> zh , "rejoin" );
1559+
1560+ gettimeofday (& end_time , 0 );
1561+ difftime_us = (end_time .tv_sec * 1000000 + end_time .tv_usec ) -
1562+ (start_time .tv_sec * 1000000 + start_time .tv_usec );
1563+
1564+ /* We have finished registering this memcached instance to Arcus cluster */
1565+ arcus_conf .logger -> log (EXTENSION_LOG_INFO , NULL ,
1566+ "Memcached rejoined Arcus cache cloud for \"%s\" service "
1567+ "(took %ld microsec)\n" , arcus_conf .svc , difftime_us );
1568+
1569+ // "recv" timeout is actually the session timeout
1570+ // ZK client ping period is recv_timeout / 3.
1571+ arcus_conf .logger -> log (EXTENSION_LOG_INFO , NULL ,
1572+ "ZooKeeper session timeout: %d sec\n" , zoo_recv_timeout (main_zk -> zh )/1000 );
1573+
1574+ assert (arcus_conf .ch );
1575+
1576+ struct String_vector strv = { 0 , NULL };
1577+ /* 2nd argument, NULL means no watcher */
1578+ if (arcus_read_ZK_children (main_zk -> zh , arcus_conf .cluster_path , NULL , & strv ) <= 0 ) {
1579+ arcus_conf .logger -> log (EXTENSION_LOG_WARNING , NULL ,
1580+ "Failed to read cache list from ZK.\n" );
1581+ ret = -1 ; break ;
1582+ }
1583+
1584+ if (update_cluster_config (& strv ) != 0 ) {
1585+ arcus_conf .logger -> log (EXTENSION_LOG_WARNING , NULL ,
1586+ "Failed to update cluster config.\n" );
1587+ ret = -1 ; break ;
1588+ }
1589+
1590+ deallocate_String_vector (& strv );
1591+
1592+ arcus_conf .logger -> log (EXTENSION_LOG_WARNING , NULL ,
1593+ "Memcached is watching Arcus cache cloud for \"%s\"\n" , arcus_conf .cluster_path );
1594+
1595+
1596+ /* Wake up the state machine thread.
1597+ * Tell it to refresh the hash ring (cluster_config).
1598+ */
1599+ sm_lock ();
1600+ sm_info .mc_pause = false;
1601+ /* Don't care if we just read the list above. Do it again. */
1602+ sm_info .request .update_cache_list = true;
1603+ sm_wakeup (true);
1604+ sm_unlock ();
1605+ } while (0 );
1606+
1607+ if (ret != 0 ) {
1608+ if (main_zk -> zh ) {
1609+ zookeeper_close (main_zk -> zh );
1610+ main_zk -> zh = NULL ;
1611+ }
1612+ ret = -1 ;
1613+ }
1614+ pthread_mutex_unlock (& zk_lock );
1615+
1616+ return ret ;
1617+ }
1618+
1619+ void arcus_zk_set_mcfailstop (bool mcfailstop )
1620+ {
1621+ pthread_mutex_lock (& arcus_conf .lock );
1622+ arcus_conf .mc_failstop = mcfailstop ;
1623+ pthread_mutex_unlock (& arcus_conf .lock );
1624+ }
1625+
1626+ bool arcus_zk_get_mcfailstop (void )
1627+ {
1628+ return arcus_conf .mc_failstop ;
1629+ }
1630+ #endif
1631+
14371632int arcus_zk_set_hbtimeout (int hbtimeout )
14381633{
14391634 int ret = 0 ;
@@ -1535,6 +1730,9 @@ static void *sm_state_thread(void *arg)
15351730 bool sm_retry = false;
15361731 bool shutdown_by_me = false;
15371732
1733+ #ifdef CONFIG_FAILSTOP
1734+ sm_info .mc_pause = false;
1735+ #endif
15381736 sm_info .state_running = true;
15391737 while (!arcus_zk_shutdown )
15401738 {
@@ -1566,6 +1764,23 @@ static void *sm_state_thread(void *arg)
15661764 if (arcus_zk_shutdown )
15671765 break ;
15681766
1767+ #ifdef CONFIG_FAILSTOP
1768+ if (sm_info .mc_pause == true) {
1769+ bool paused = false;
1770+ pthread_mutex_lock (& zk_lock );
1771+ if (sm_info .mc_pause == true) {
1772+ if (main_zk -> zh != NULL ) {
1773+ zookeeper_close (main_zk -> zh );
1774+ main_zk -> zh = NULL ;
1775+ arcus_conf .logger -> log (EXTENSION_LOG_WARNING , NULL , "zk connection closed\n" );
1776+ }
1777+ paused = true;
1778+ }
1779+ pthread_mutex_unlock (& zk_lock );
1780+ if (paused ) continue ;
1781+ }
1782+ #endif
1783+
15691784 /* Read the latest hash ring */
15701785 if (smreq .update_cache_list ) {
15711786 struct String_vector strv_cache_list = {0 , NULL };
0 commit comments