24
24
import logging
25
25
import os
26
26
import random
27
+ import shlex
27
28
import signal
28
29
import socket
29
30
import subprocess
@@ -62,13 +63,27 @@ def ip_in_use(ip, port):
62
63
s .close ()
63
64
return True
64
65
65
- except socket .error as e :
66
+ except socket .error :
66
67
pass
67
- except Exception as e :
68
+ except Exception :
68
69
pass
69
70
70
71
return False
71
-
72
+
73
def run_cmd(cmd, get_result=False):
    """Run a command line without a shell and optionally return its output.

    Args:
        cmd: Command line as a single string. It is tokenized with
            ``shlex.split`` and executed directly (no shell), so shell
            syntax such as pipes or redirections is not interpreted.
        get_result: When True, capture the command's output and return it.
            When False, the child writes straight to this process's
            stdout/stderr and nothing is captured.

    Returns:
        The captured text (stdout and stderr interleaved) when
        ``get_result`` is True, otherwise an empty string. A non-zero exit
        status is not treated as an error: whatever the command printed is
        still returned so callers can inspect or log it.

    Raises:
        OSError: If the executable cannot be started (for example, it does
            not exist). ``subprocess.CalledProcessError`` is never raised.
    """
    argv = shlex.split(cmd)

    if not get_result:
        # Nothing is captured in this mode, so report an empty result
        # rather than the string form of a missing stdout.
        subprocess.run(argv)
        return ""

    # Fold stderr into stdout: callers search the returned text for status
    # messages, and some tools emit those on stderr (NOTE(review): confirm
    # for the etcdctl version in use).
    proc = subprocess.run(argv, stdout=subprocess.PIPE,
                          stderr=subprocess.STDOUT, universal_newlines=True)
    return proc.stdout or ""
86
+
72
87
class Config ():
73
88
# Init default.
74
89
def __init__ (self ):
@@ -95,8 +110,8 @@ def get_members(self):
95
110
cmd_list = "%s member list" % (etcd_cmd )
96
111
members = ""
97
112
try :
98
- members = subprocess . check_output (cmd_list , shell = False )
99
- except subprocess .CalledProcessError as e :
113
+ members = run_cmd (cmd_list , get_result = True )
114
+ except subprocess .CalledProcessError :
100
115
pass
101
116
return members
102
117
@@ -105,25 +120,23 @@ def display_status(self):
105
120
106
121
os .environ ["ETCDCTL_API" ] = "3"
107
122
etcd_cmd = '%s/etcdctl --endpoints="%s"' % (os .getcwd (), self .cluster_endpoints )
108
- cmd_list = "%s member list 2>&1 | cat" % (etcd_cmd )
109
- cmd_status = "%s endpoint status 2>&1 | cat" % (etcd_cmd )
110
- cmd_health = "%s endpoint health 2>&1 | cat" % (etcd_cmd )
111
-
112
- out = etcd_cmd
113
- out += "\n \n ===== member list\n " + subprocess .check_output (cmd_list , shell = False )
114
- print (out )
115
- out = "===== endpoint status\n " + subprocess .check_output (cmd_status , shell = False )
116
- print (out )
117
- out = "===== endpoint health\n " + subprocess .check_output (cmd_health , shell = False )
118
- print (out )
123
+ cmd_list = "%s member list" % (etcd_cmd )
124
+ cmd_status = "%s endpoint status" % (etcd_cmd )
125
+ cmd_health = "%s endpoint health" % (etcd_cmd )
126
+
127
+ print (etcd_cmd + "\n \n ===== member list\n " )
128
+ run_cmd (cmd_list )
129
+
130
+ print ("\n ===== endpoint status\n " )
131
+ run_cmd (cmd_status )
132
+
133
+ print ("\n ===== endpoint health\n " )
134
+ run_cmd (cmd_health )
119
135
120
136
# Join an existing cluster.
121
137
def join_cluster (self ):
122
138
123
139
etcd_cmd = '%s/etcdctl --endpoints="%s"' % (os .getcwd (), self .cluster_endpoints )
124
- cmd_select = "%s member list | grep ', %s, http' | awk -F',' '{print $1}'" % (
125
- etcd_cmd , self .etcd_name
126
- )
127
140
128
141
cmd_add = "%s member add %s --peer-urls=%s" % (
129
142
etcd_cmd , self .etcd_name , self .peer_url
@@ -133,41 +146,48 @@ def join_cluster(self):
133
146
134
147
ok = True
135
148
err = None
136
- resp = ">> Members:\n "
137
149
try :
138
150
os .environ ["ETCDCTL_API" ] = "3"
139
- resp + = self .get_members ()
151
+ text = self .get_members ()
140
152
141
153
hexid = ""
142
-
154
+ resp = ">> Members:\n " + text
155
+ print (resp )
156
+
143
157
# Remove the current entry if any
144
- resp += "\n >> Select:\n %s\n \n " % (cmd_select )
145
- hexid = subprocess .check_output (cmd_select , shell = False )
158
+ lines = text .split ("\n " )
159
+ for li in lines :
160
+ tokens = li .split (", " )
161
+ if len (tokens ) > 3 and self .etcd_name == tokens [2 ]:
162
+ hexid = tokens [0 ]
163
+ break
146
164
147
165
if len (hexid ) > 0 :
148
- cmd_remove = "%s member remove %s" % (etcd_cmd , hexid )
149
- resp += "\n >> Remove:\n %s\n \n " % (cmd_remove )
150
-
151
- resp += subprocess .check_output (cmd_remove , stderr = subprocess .STDOUT , shell = False )
166
+ cmd_remove = "%s member remove %s\n \n " % (etcd_cmd , hexid )
167
+ print ("\n >> Remove:\n %s" % (cmd_remove ))
168
+ resp += cmd_remove
169
+
170
+ run_cmd (cmd_remove )
152
171
sleep (5 )
153
172
154
173
# Add a new entry
155
- resp += "\n >> Add:\n %s\n \n " % (cmd_add )
156
-
157
- resp += subprocess .check_output (cmd_add , stderr = subprocess .STDOUT , shell = False )
158
-
159
- resp += "\n >> Members:\n "
160
- resp += self .get_members ()
161
- resp += "\n "
162
-
163
- resp += cmd_rm
164
- resp += "\n "
174
+ msg = "\n >> Add:\n %s\n \n " % (cmd_add )
175
+ print (msg )
176
+ resp += msg
177
+
178
+ run_cmd (cmd_add )
179
+ msg = "\n >> Members:\n " + self .get_members ()
180
+ print (msg )
181
+ resp += msg
182
+
183
+ msg = "\n " + cmd_rm + "\n "
184
+ print (msg )
185
+ resp += msg
165
186
166
187
except subprocess .CalledProcessError as e :
167
188
err = e .output
168
189
ok = False
169
190
170
- print (resp )
171
191
with open ("join.log" , "w+" ) as f :
172
192
f .write (resp )
173
193
@@ -189,7 +209,6 @@ def add_json_cfg(self):
189
209
h ["advertise-client-urls" ] = client_url
190
210
h ["initial-advertise-peer-urls" ] = self .peer_url
191
211
192
- dir = self .etcd_name + ".etcd"
193
212
if self .is_existing_cluster :
194
213
# Join an existing cluster
195
214
h ["initial-cluster-state" ] = "existing"
@@ -339,23 +358,22 @@ def sig_handler(self, sig, frame):
339
358
def is_endpoint_healthy (self , wait_time ):
340
359
os .environ ["ETCDCTL_API" ] = "3"
341
360
etcd_cmd = '%s/etcdctl --endpoints="%s"' % (os .getcwd (), self .local_endpoint )
342
- cmd_health = "%s endpoint health 2>&1 | cat " % (etcd_cmd )
361
+ cmd_health = "%s endpoint health" % (etcd_cmd )
343
362
result = ""
344
363
345
364
now = int (time ())
346
- for i in range (50 ):
347
- sleep (2 )
365
+ for i in range (10 ):
366
+ sleep (5 )
348
367
t = int (time ()) - now
349
368
if t > wait_time :
350
369
break
351
370
352
- if t > 60 :
353
- msg = "unhealthy_%s" % (self .etcd_name )
371
+ if t > 50 :
372
+ msg = "unhealthy_%s retry ... " % (self .etcd_name )
354
373
self .logger .error ("[MANAGER] %s" % (msg ))
355
374
356
- result = subprocess .check_output (cmd_health , shell = False )
357
- health_check_bytes = str .encode ("is healthy" )
358
- if health_check_bytes in result :
375
+ result = run_cmd (cmd_health , get_result = True )
376
+ if "is health" in result :
359
377
return True
360
378
361
379
self .logger .info ("[MANAGER] %s" % (result ))
@@ -418,8 +436,8 @@ def watch_and_recycle(self, cfg):
418
436
print (" " )
419
437
self .logger .info ("[MANAGER] Started etcd process %d" % (self .pid ))
420
438
421
- wait_time = 85 + random .randint (0 ,10 )
422
- while False :
439
+ wait_time = 60 + random .randint (0 ,10 )
440
+ while False : #not self.is_endpoint_healthy(wait_time):
423
441
424
442
if restartCount > 0 :
425
443
self .shutdown_server ()
@@ -436,6 +454,8 @@ def watch_and_recycle(self, cfg):
436
454
self .shutdown (- 1 ) # exit
437
455
438
456
print ("Starting etcd process %d succeeded." % (self .pid ))
457
+ if os .path .exists ("join.log" ):
458
+ os .remove ("join.log" )
439
459
self .server .wait ()
440
460
441
461
# etcd server has exited.
@@ -469,7 +489,7 @@ def watch_and_recycle(self, cfg):
469
489
470
490
with open ("./etcdsvr.pid" , "w" ) as f :
471
491
f .write ("%d\n " % (os .getpid ()))
472
-
492
+
473
493
cfg = Config ()
474
494
err = cfg .parse_cfg (False )
475
495
if err :
0 commit comments