Skip to content

Commit c7a62b4

Browse files
authored
Merge pull request #172 from art2ip/juno_24_0108
2 parents 72a9a25 + 16098b1 commit c7a62b4

File tree

1 file changed

+71
-51
lines changed

1 file changed

+71
-51
lines changed

cmd/etcdsvr/etcdsvr.py

+71-51
Original file line number | Diff line number | Diff line change
@@ -24,6 +24,7 @@
2424
import logging
2525
import os
2626
import random
27+
import shlex
2728
import signal
2829
import socket
2930
import subprocess
@@ -62,13 +63,27 @@ def ip_in_use(ip, port):
6263
s.close()
6364
return True
6465

65-
except socket.error as e:
66+
except socket.error:
6667
pass
67-
except Exception as e:
68+
except Exception:
6869
pass
6970

7071
return False
71-
72+
73+
def run_cmd(cmd, get_result=False):
    """Run a command line without a shell and optionally return its stdout.

    Args:
        cmd: Command line as a single string. It is split into an argv
            list with shlex.split and executed directly, so shell syntax
            such as pipes or redirects is not interpreted.
        get_result: When true, stdout and stderr of the child are captured
            through pipes and the captured stdout text is returned. When
            false, the child inherits this process's stdout/stderr.

    Returns:
        The captured stdout text when get_result is true and the command
        exits with status 0; otherwise an empty string.
    """
    # Capture both streams only when the caller wants the output back.
    stream = subprocess.PIPE if get_result else None
    try:
        proc = subprocess.run(shlex.split(cmd), stdout=stream, stderr=stream,
                              universal_newlines=True, check=True)
    except subprocess.CalledProcessError:
        # Best effort: a non-zero exit status is reported as empty output.
        return ""
    # proc.stdout is None when nothing was captured; str(None) would hand
    # callers the literal text "None", so normalize that case to "".
    return proc.stdout or ""
86+
7287
class Config():
7388
# Init default.
7489
def __init__(self):
@@ -95,8 +110,8 @@ def get_members(self):
95110
cmd_list = "%s member list" % (etcd_cmd)
96111
members = ""
97112
try:
98-
members = subprocess.check_output(cmd_list, shell=False)
99-
except subprocess.CalledProcessError as e:
113+
members = run_cmd(cmd_list, get_result=True)
114+
except subprocess.CalledProcessError:
100115
pass
101116
return members
102117

@@ -105,25 +120,23 @@ def display_status(self):
105120

106121
os.environ["ETCDCTL_API"] = "3"
107122
etcd_cmd = '%s/etcdctl --endpoints="%s"' % (os.getcwd(), self.cluster_endpoints)
108-
cmd_list = "%s member list 2>&1 | cat" % (etcd_cmd)
109-
cmd_status = "%s endpoint status 2>&1 | cat" % (etcd_cmd)
110-
cmd_health = "%s endpoint health 2>&1 | cat" % (etcd_cmd)
111-
112-
out = etcd_cmd
113-
out += "\n\n===== member list\n" + subprocess.check_output(cmd_list, shell=False)
114-
print(out)
115-
out = "===== endpoint status\n" + subprocess.check_output(cmd_status, shell=False)
116-
print(out)
117-
out = "===== endpoint health\n" + subprocess.check_output(cmd_health, shell=False)
118-
print(out)
123+
cmd_list = "%s member list" % (etcd_cmd)
124+
cmd_status = "%s endpoint status" % (etcd_cmd)
125+
cmd_health = "%s endpoint health" % (etcd_cmd)
126+
127+
print(etcd_cmd + "\n\n===== member list\n")
128+
run_cmd(cmd_list)
129+
130+
print("\n===== endpoint status\n")
131+
run_cmd(cmd_status)
132+
133+
print("\n===== endpoint health\n")
134+
run_cmd(cmd_health)
119135

120136
# Join an existing cluster.
121137
def join_cluster(self):
122138

123139
etcd_cmd = '%s/etcdctl --endpoints="%s"' % (os.getcwd(), self.cluster_endpoints)
124-
cmd_select = "%s member list | grep ', %s, http' | awk -F',' '{print $1}'" % (
125-
etcd_cmd, self.etcd_name
126-
)
127140

128141
cmd_add = "%s member add %s --peer-urls=%s" % (
129142
etcd_cmd, self.etcd_name, self.peer_url
@@ -133,41 +146,48 @@ def join_cluster(self):
133146

134147
ok = True
135148
err = None
136-
resp = ">> Members:\n"
137149
try:
138150
os.environ["ETCDCTL_API"] = "3"
139-
resp += self.get_members()
151+
text = self.get_members()
140152

141153
hexid = ""
142-
154+
resp = ">> Members:\n" + text
155+
print(resp)
156+
143157
# Remove the current entry if any
144-
resp += "\n>> Select:\n%s\n\n" % (cmd_select)
145-
hexid = subprocess.check_output(cmd_select, shell=False)
158+
lines = text.split("\n")
159+
for li in lines:
160+
tokens = li.split(", ")
161+
if len(tokens) > 3 and self.etcd_name == tokens[2]:
162+
hexid = tokens[0]
163+
break
146164

147165
if len(hexid) > 0:
148-
cmd_remove = "%s member remove %s" % (etcd_cmd, hexid)
149-
resp += "\n>> Remove:\n%s\n\n" % (cmd_remove)
150-
151-
resp += subprocess.check_output(cmd_remove, stderr=subprocess.STDOUT, shell=False)
166+
cmd_remove = "%s member remove %s\n\n" % (etcd_cmd, hexid)
167+
print("\n>> Remove:\n%s" % (cmd_remove))
168+
resp += cmd_remove
169+
170+
run_cmd(cmd_remove)
152171
sleep(5)
153172

154173
# Add a new entry
155-
resp += "\n>> Add:\n%s\n\n" % (cmd_add)
156-
157-
resp += subprocess.check_output(cmd_add, stderr=subprocess.STDOUT, shell=False)
158-
159-
resp += "\n>> Members:\n"
160-
resp += self.get_members()
161-
resp += "\n"
162-
163-
resp += cmd_rm
164-
resp += "\n"
174+
msg = "\n>> Add:\n%s\n\n" % (cmd_add)
175+
print(msg)
176+
resp += msg
177+
178+
run_cmd(cmd_add)
179+
msg = "\n>> Members:\n" + self.get_members()
180+
print(msg)
181+
resp += msg
182+
183+
msg = "\n" + cmd_rm + "\n"
184+
print(msg)
185+
resp += msg
165186

166187
except subprocess.CalledProcessError as e:
167188
err = e.output
168189
ok = False
169190

170-
print(resp)
171191
with open("join.log", "w+") as f:
172192
f.write(resp)
173193

@@ -189,7 +209,6 @@ def add_json_cfg(self):
189209
h["advertise-client-urls"] = client_url
190210
h["initial-advertise-peer-urls"] = self.peer_url
191211

192-
dir = self.etcd_name + ".etcd"
193212
if self.is_existing_cluster:
194213
# Join an existing cluster
195214
h["initial-cluster-state"] = "existing"
@@ -339,23 +358,22 @@ def sig_handler(self, sig, frame):
339358
def is_endpoint_healthy(self, wait_time):
340359
os.environ["ETCDCTL_API"] = "3"
341360
etcd_cmd = '%s/etcdctl --endpoints="%s"' % (os.getcwd(), self.local_endpoint)
342-
cmd_health = "%s endpoint health 2>&1 | cat" % (etcd_cmd)
361+
cmd_health = "%s endpoint health" % (etcd_cmd)
343362
result = ""
344363

345364
now = int(time())
346-
for i in range(50):
347-
sleep(2)
365+
for i in range(10):
366+
sleep(5)
348367
t = int(time()) - now
349368
if t > wait_time:
350369
break
351370

352-
if t > 60:
353-
msg = "unhealthy_%s" % (self.etcd_name)
371+
if t > 50:
372+
msg = "unhealthy_%s retry ..." % (self.etcd_name)
354373
self.logger.error("[MANAGER] %s" % (msg))
355374

356-
result = subprocess.check_output(cmd_health, shell=False)
357-
health_check_bytes = str.encode("is healthy")
358-
if health_check_bytes in result:
375+
result = run_cmd(cmd_health, get_result=True)
376+
if "is health" in result:
359377
return True
360378

361379
self.logger.info("[MANAGER] %s" % (result))
@@ -418,8 +436,8 @@ def watch_and_recycle(self, cfg):
418436
print(" ")
419437
self.logger.info("[MANAGER] Started etcd process %d" % (self.pid))
420438

421-
wait_time = 85 + random.randint(0,10)
422-
while False:
439+
wait_time = 60 + random.randint(0,10)
440+
while False: #not self.is_endpoint_healthy(wait_time):
423441

424442
if restartCount > 0:
425443
self.shutdown_server()
@@ -436,6 +454,8 @@ def watch_and_recycle(self, cfg):
436454
self.shutdown(-1) # exit
437455

438456
print("Starting etcd process %d succeeded." % (self.pid))
457+
if os.path.exists("join.log"):
458+
os.remove("join.log")
439459
self.server.wait()
440460

441461
# etcd server has exited.
@@ -469,7 +489,7 @@ def watch_and_recycle(self, cfg):
469489

470490
with open("./etcdsvr.pid", "w") as f:
471491
f.write("%d\n" % (os.getpid()))
472-
492+
473493
cfg = Config()
474494
err = cfg.parse_cfg(False)
475495
if err:

0 commit comments

Comments
 (0)