Skip to content

Commit bfc296c

Browse files
authored
Support Heartbeat to a specific discover service server (#16)
* Support heartbeat to a specific service discover server * fix style
1 parent c274d56 commit bfc296c

File tree

5 files changed

+284
-4
lines changed

5 files changed

+284
-4
lines changed

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
.idea
22
dist
33
build
4-
*.egg-info
4+
*.egg-info
5+
*.tar.gz

lmcache_frontend/app.py

Lines changed: 82 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
# SPDX-License-Identifier: Apache-2.0
2+
23
import argparse
34
import asyncio
45
import json
@@ -12,12 +13,17 @@
1213
from fastapi.staticfiles import StaticFiles
1314
from starlette.responses import FileResponse, PlainTextResponse
1415

16+
from .heartbeat import HeartbeatService
17+
1518
# Create router
1619
router = APIRouter()
1720

1821
# Global variable to store target nodes
1922
target_nodes = []
2023

24+
# Initialize heartbeat service with app context
25+
heartbeat_service: HeartbeatService = HeartbeatService()
26+
2127

2228
# Load configuration file
2329
def load_config(config_path=None):
@@ -253,6 +259,40 @@ async def health_check():
253259
return {"status": "healthy", "service": "lmcache-monitor"}
254260

255261

262+
@router.get("/api/heartbeat/status")
263+
async def get_heartbeat_status():
264+
"""Get heartbeat status"""
265+
return heartbeat_service.status()
266+
267+
268+
@router.post("/api/heartbeat/start")
269+
async def start_heartbeat_api(request: Request):
270+
"""Start heartbeat service"""
271+
try:
272+
data = await request.json()
273+
heartbeat_url = data.get("heartbeat_url")
274+
initial_delay = data.get("initial_delay", 0)
275+
interval = data.get("interval", 30)
276+
277+
if not heartbeat_url:
278+
raise HTTPException(status_code=400, detail="heartbeat_url is required")
279+
280+
heartbeat_service.start(heartbeat_url, initial_delay, interval)
281+
return {"status": "success", "message": "Heartbeat service started"}
282+
except Exception as e:
283+
raise HTTPException(status_code=500, detail=str(e))
284+
285+
286+
@router.post("/api/heartbeat/stop")
287+
async def stop_heartbeat_api():
288+
"""Stop heartbeat service"""
289+
try:
290+
heartbeat_service.stop()
291+
return {"status": "success", "message": "Heartbeat service stopped"}
292+
except Exception as e:
293+
raise HTTPException(status_code=500, detail=str(e))
294+
295+
256296
@router.get("/")
257297
async def serve_frontend():
258298
"""Return the frontend homepage"""
@@ -359,6 +399,24 @@ def main():
359399
help="Directly specify target nodes as a JSON string. "
360400
'Example: \'[{"name":"node1","host":"127.0.0.1","port":8001}]\'',
361401
)
402+
parser.add_argument(
403+
"--heartbeat-url",
404+
type=str,
405+
default=None,
406+
help="Heartbeat service URL, e.g.: http://example.com/heartbeat",
407+
)
408+
parser.add_argument(
409+
"--heartbeat-initial-delay",
410+
type=int,
411+
default=0,
412+
help="Initial delay before starting heartbeat (seconds), default 0",
413+
)
414+
parser.add_argument(
415+
"--heartbeat-interval",
416+
type=int,
417+
default=30,
418+
help="Heartbeat interval (seconds), default 30",
419+
)
362420

363421
args = parser.parse_args()
364422

@@ -385,7 +443,30 @@ def main():
385443
print(f"Monitoring service running at http://{args.host}:{args.port}")
386444
print(f"Node management: http://{args.host}:{args.port}/static/index.html")
387445

388-
uvicorn.run(app, host=args.host, port=args.port)
446+
# Start heartbeat service if URL is configured
447+
if args.heartbeat_url:
448+
# Set application configuration for heartbeat service
449+
heartbeat_service.set_app_config(args.host, args.port, target_nodes)
450+
451+
print("Starting heartbeat service...")
452+
print(f"Heartbeat URL: {args.heartbeat_url}")
453+
print(f"Initial delay: {args.heartbeat_initial_delay}s")
454+
print(f"Interval: {args.heartbeat_interval}s")
455+
print(f"API Address: http://{args.host}:{args.port}")
456+
print(f"Target nodes count: {len(target_nodes)}")
457+
458+
heartbeat_service.start(
459+
args.heartbeat_url, args.heartbeat_initial_delay, args.heartbeat_interval
460+
)
461+
else:
462+
print("Heartbeat URL not configured, heartbeat disabled")
463+
464+
try:
465+
uvicorn.run(app, host=args.host, port=args.port)
466+
finally:
467+
# Stop heartbeat service when app closes
468+
print("Shutting down application...")
469+
heartbeat_service.stop()
389470

390471

391472
if __name__ == "__main__":

lmcache_frontend/heartbeat.py

Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
# SPDX-License-Identifier: Apache-2.0
2+
"""Heartbeat service module for periodic health reporting"""
3+
4+
import asyncio
5+
import os
6+
import socket
7+
import threading
8+
from datetime import datetime
9+
10+
import httpx
11+
12+
13+
class HeartbeatService:
14+
"""Periodic heartbeat service to report system status"""
15+
16+
def __init__(self):
17+
self.thread = None
18+
self.stop_event = threading.Event()
19+
self.startup_time = datetime.now()
20+
self.app_host = "0.0.0.0"
21+
self.app_port = 8000
22+
self.target_nodes = []
23+
24+
def get_local_ip(self):
25+
"""Get local IP address"""
26+
try:
27+
# First try to get IP by connecting to external address
28+
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
29+
s.connect(("8.8.8.8", 80))
30+
ip = s.getsockname()[0]
31+
if ip and ip != "127.0.0.1" and not ip.startswith("192.168."):
32+
return ip
33+
34+
# If above method fails, try getting IP from hostname
35+
hostname = socket.gethostname()
36+
ip = socket.gethostbyname(hostname)
37+
if ip and ip != "127.0.0.1" and not ip.startswith("192.168."):
38+
return ip
39+
40+
# Finally try to get IP from all network interfaces
41+
import re
42+
import subprocess
43+
44+
# Use ifconfig on macOS/Linux to get IP
45+
try:
46+
result = subprocess.run(
47+
["ifconfig"], capture_output=True, text=True, timeout=5
48+
)
49+
if result.returncode == 0:
50+
# Search for IP addresses
51+
ip_pattern = r"inet (\d+\.\d+\.\d+\.\d+)"
52+
matches = re.findall(ip_pattern, result.stdout)
53+
for match in matches:
54+
if match != "127.0.0.1" and not match.startswith("169.254."):
55+
return match
56+
except:
57+
pass
58+
59+
return "127.0.0.1"
60+
61+
except Exception as e:
62+
print(f"Error getting local IP: {str(e)}")
63+
return "127.0.0.1"
64+
65+
def set_app_config(self, host: str, port: int, target_nodes: list):
66+
"""Set application configuration for heartbeat reporting"""
67+
self.app_host = host
68+
self.app_port = port
69+
self.target_nodes = target_nodes
70+
71+
async def send_heartbeat(self, heartbeat_url: str):
72+
"""Send heartbeat request"""
73+
try:
74+
api_address = f"http://{self.get_local_ip()}:{self.app_port}"
75+
version = await self._get_version_from_nodes()
76+
if version:
77+
print(f"Got version from target nodes: {version}")
78+
79+
params = {
80+
"pid": os.getpid(),
81+
"api_address": api_address,
82+
"version": version or "1.0.0",
83+
"other_info": f"startup_time:{self.startup_time.isoformat()},target_nodes_count:{len(self.target_nodes)}",
84+
}
85+
86+
async with httpx.AsyncClient(timeout=10.0) as client:
87+
response = await client.get(heartbeat_url, params=params)
88+
response.raise_for_status()
89+
print(
90+
f"Heartbeat sent successfully: {heartbeat_url} - Status: {response.status_code}"
91+
)
92+
return True
93+
except Exception as e:
94+
print(f"Heartbeat send failed: {heartbeat_url} - Error: {str(e)}")
95+
return False
96+
97+
async def _get_version_from_nodes(self):
98+
"""Get version from target nodes"""
99+
if not self.target_nodes:
100+
return None
101+
102+
for node in self.target_nodes:
103+
try:
104+
async with httpx.AsyncClient(timeout=5.0) as client:
105+
response = await client.get(
106+
f"http://localhost:{self.app_port}/proxy2/{node['name']}/version"
107+
)
108+
109+
if response.status_code == 200 and response.content:
110+
content = response.content.decode("utf-8").strip()
111+
# Try to remove surrounding quotes
112+
if (content.startswith('"') and content.endswith('"')) or (
113+
content.startswith("'") and content.endswith("'")
114+
):
115+
content = content[1:-1]
116+
return content
117+
118+
except Exception as e:
119+
print(f"Failed to get version from node {node['name']}: {str(e)}")
120+
continue
121+
122+
return None
123+
124+
def worker(self, heartbeat_url: str, initial_delay: int, interval: int):
125+
"""Heartbeat background thread worker function"""
126+
local_ip = self.get_local_ip()
127+
print(
128+
f"Heartbeat thread started - Local IP: {local_ip}, Service URL: {heartbeat_url}"
129+
)
130+
print(f"Initial delay: {initial_delay}s, Interval: {interval}s")
131+
132+
if initial_delay > 0:
133+
print(f"Waiting initial delay {initial_delay}s...")
134+
if self.stop_event.wait(initial_delay):
135+
print("Heartbeat thread stopped during initial delay")
136+
return
137+
138+
loop = asyncio.new_event_loop()
139+
asyncio.set_event_loop(loop)
140+
141+
try:
142+
while not self.stop_event.is_set():
143+
try:
144+
loop.run_until_complete(self.send_heartbeat(heartbeat_url))
145+
except Exception as e:
146+
print(f"Heartbeat send exception: {str(e)}")
147+
148+
if self.stop_event.wait(interval):
149+
break
150+
finally:
151+
loop.close()
152+
print("Heartbeat thread stopped")
153+
154+
def start(self, heartbeat_url: str, initial_delay: int = 0, interval: int = 30):
155+
"""Start heartbeat thread"""
156+
if self.thread and self.thread.is_alive():
157+
print("Heartbeat thread is already running")
158+
return
159+
160+
self.stop_event.clear()
161+
self.thread = threading.Thread(
162+
target=self.worker,
163+
args=(heartbeat_url, initial_delay, interval),
164+
daemon=True,
165+
)
166+
self.thread.start()
167+
print("Heartbeat thread started")
168+
169+
def stop(self):
170+
"""Stop heartbeat thread"""
171+
if self.thread and self.thread.is_alive():
172+
print("Stopping heartbeat thread...")
173+
self.stop_event.set()
174+
self.thread.join(timeout=5)
175+
if self.thread.is_alive():
176+
print("Warning: Heartbeat thread didn't stop within 5 seconds")
177+
else:
178+
print("Heartbeat thread stopped successfully")
179+
else:
180+
print("Heartbeat thread is not running")
181+
182+
def status(self):
183+
"""Get current heartbeat status"""
184+
is_running = self.thread and self.thread.is_alive()
185+
return {
186+
"running": is_running,
187+
"local_ip": self.get_local_ip(),
188+
"startup_time": self.startup_time.isoformat(),
189+
"current_time": datetime.now().isoformat(),
190+
}

lmcache_frontend/lmcache_plugin/scheduler_lmc_frontend_plugin.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,15 @@ def handle_exit(signum, frame):
4646
sys.argv = [
4747
sys.argv[0],
4848
"--port",
49-
str(int(port) if port else 8000),
49+
str(int(port)) if port is not None else "8000",
5050
"--host",
5151
"0.0.0.0",
5252
]
53+
54+
for key, value in config.extra_config.items():
55+
if key.startswith("plugin.frontend."):
56+
arg_name = "--" + key.replace("plugin.frontend.", "")
57+
sys.argv.extend([arg_name, str(value)])
5358
if nodes:
5459
if isinstance(nodes, (list, dict)):
5560
nodes = json.dumps(nodes)

requirements/build.txt

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,7 @@ packaging>=24.2
22
setuptools>=77.0.3,<81.0.0
33
setuptools_scm>=8
44
wheel
5-
5+
fastapi==0.104.1
6+
uvicorn[standard]==0.24.0
7+
httpx==0.25.2
8+
starlette==0.27.0

0 commit comments

Comments
 (0)