Skip to content

Commit 8bcdf02

Browse files
authored
Update master.py
1 parent d9102e0 commit 8bcdf02

1 file changed

Lines changed: 320 additions & 34 deletions

File tree

python/dcf/master.py

Lines changed: 320 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,43 +1,329 @@
1-
# God said to Moses, “I AM WHO I AM. This is what you are to say to the Israelites: ‘I AM has sent me to you.’” (Exodus 3:14)
1+
# python/dcf/master.py
2+
# Integrated DCF Master for Python SDK: Combines real DCF APIs with mock
3+
# implementations for full DeMoD management.
4+
# Version 1.1.0 | License: GPL-3.0 | For Mono Repo SDKs
25
# Copyright (C) 2025 DeMoD LLC
3-
# This file is part of DeMoD Communications Framework.
4-
# Licensed under GPL-3.0 (see LICENSE in repo root).
6+
7+
import json
8+
import logging
9+
from typing import Any, Dict, List, Tuple, Optional
10+
511
from .networking import Networking
612
from .serialization import build_message
713
from .utils import parse_peer
8-
import json
14+
15+
logger = logging.getLogger(__name__)
16+
logging.basicConfig(level=logging.INFO)
17+
918

1019
class DCFMaster:
11-
def __init__(self, config):
12-
self.config = config
13-
self.clients = {}
14-
for peer in config.get("peers", []):
15-
host, port = parse_peer(peer)
16-
self.clients[peer] = Networking("gRPC", host, int(port), "master")
17-
18-
def assign_role(self, peer, role):
19-
if peer not in self.clients:
20-
raise ValueError(f"Unknown peer: {peer}")
21-
msg = build_message(command="set_role", role=role, recipient=peer)
22-
self.clients[peer].send(msg)
23-
24-
def update_config(self, peer, key, value):
25-
msg = build_message(command="update_config", key=key, value=value, recipient=peer)
26-
self.clients[peer].send(msg)
27-
28-
def collect_metrics(self):
29-
metrics = {}
30-
for peer in self.clients:
31-
msg = build_message(command="collect_metrics", recipient=peer)
20+
"""Master node controller for DCF network management."""
21+
22+
def __init__(self, config: Dict[str, Any], use_mock: bool = False) -> None:
23+
"""
24+
Initialize DCFMaster with config and optional mock mode.
25+
26+
Args:
27+
config: Configuration dictionary with peers and settings.
28+
use_mock: If True, use mock implementations for unavailable APIs.
29+
"""
30+
self.config: Dict[str, Any] = config
31+
self.clients: Dict[str, Networking] = {}
32+
self.use_mock: bool = use_mock
33+
try:
34+
for peer in config.get("peers", []):
35+
host, port = parse_peer(peer)
36+
self.clients[peer] = Networking("gRPC", host, int(port), "master")
37+
logger.info(f"Initialized DCFMaster with {len(self.clients)} peers")
38+
except ValueError as e:
39+
logger.exception("Failed to parse peer config")
40+
raise ValueError(f"Invalid peer config: {e}") from e
41+
except Exception:
42+
logger.exception("Unexpected error initializing DCFMaster")
43+
raise
44+
45+
def assign_role(self, peer: str, mode: str) -> bool:
46+
"""Assign a role (P2P_MODE, CLIENT_MODE, SERVER_MODE, AUTO_MODE) to a peer.
47+
48+
Args:
49+
peer: Peer identifier.
50+
mode: Role to assign.
51+
52+
Returns:
53+
True if successful, False otherwise.
54+
55+
Raises:
56+
ValueError: If peer or mode is invalid.
57+
"""
58+
try:
59+
if peer not in self.clients:
60+
raise ValueError(f"Unknown peer: {peer}")
61+
valid_modes = ["P2P_MODE", "CLIENT_MODE", "SERVER_MODE", "AUTO_MODE"]
62+
if mode not in valid_modes:
63+
raise ValueError(f"Invalid mode: {mode}")
64+
msg = build_message(command="set_role", role=mode, recipient=peer)
65+
self.clients[peer].send(msg)
66+
logger.info(f"Assigned role {mode} to {peer}")
67+
return True
68+
except ValueError as e:
69+
logger.exception("Invalid args for assign_role")
70+
raise
71+
except Exception:
72+
logger.exception("Failed to assign role")
73+
return False
74+
75+
def update_config(self, peer: str, key: str, value: str) -> bool:
76+
"""Update a configuration key-value for a peer.
77+
78+
Args:
79+
peer: Peer identifier.
80+
key: Config key.
81+
value: Config value.
82+
83+
Returns:
84+
True if successful, False otherwise.
85+
"""
86+
try:
87+
if peer not in self.clients:
88+
raise ValueError(f"Unknown peer: {peer}")
89+
msg = build_message(command="update_config", key=key, value=value, recipient=peer)
90+
self.clients[peer].send(msg)
91+
logger.info(f"Updated config {key}={value} for {peer}")
92+
return True
93+
except ValueError as e:
94+
logger.exception("Invalid args for update_config")
95+
raise
96+
except Exception:
97+
logger.exception("Failed to update config")
98+
return False
99+
100+
def collect_metrics(self) -> Dict[str, Any]:
101+
"""Collect network metrics (RTT, health) from all peers.
102+
103+
Returns:
104+
Dictionary mapping peers to their metrics.
105+
"""
106+
try:
107+
metrics: Dict[str, Any] = {}
108+
for peer in self.clients:
109+
msg = build_message(command="collect_metrics", recipient=peer)
110+
resp = self.clients[peer].send(msg)
111+
try:
112+
metrics[peer] = json.loads(resp.data)
113+
except json.JSONDecodeError:
114+
logger.exception(f"Failed to parse metrics for {peer}")
115+
metrics[peer] = {}
116+
logger.info(f"Collected metrics from {len(metrics)} peers")
117+
return metrics
118+
except Exception:
119+
logger.exception("Failed to collect metrics")
120+
return {"error": "Failed"}
121+
122+
def optimize_network(self) -> bool:
123+
"""Optimize network based on RTT metrics.
124+
125+
Returns:
126+
True if optimization triggered, False otherwise.
127+
"""
128+
try:
129+
metrics = self.collect_metrics()
130+
avg_rtt = sum(m.get("rtt", 0) for m in metrics.values()) / max(1, len(metrics))
131+
threshold = self.config.get("group_rtt_threshold", 50)
132+
if avg_rtt > threshold:
133+
logger.info(f"Optimizing: High RTT {avg_rtt} > {threshold}")
134+
# Mock optimization logic
135+
return True
136+
logger.info("No optimization needed")
137+
return True
138+
except Exception:
139+
logger.exception("Failed to optimize network")
140+
return False
141+
142+
def health_check(self, peer: str) -> Tuple[int, str]:
143+
"""Perform health check on a peer (mock if not implemented).
144+
145+
Args:
146+
peer: Peer identifier.
147+
148+
Returns:
149+
Tuple of (RTT in ms, status string).
150+
151+
Raises:
152+
ValueError: If peer is invalid.
153+
"""
154+
if self.use_mock:
155+
logger.info(f"Mock health_check for {peer}")
156+
return 50, "healthy"
157+
try:
158+
if peer not in self.clients:
159+
raise ValueError(f"Unknown peer: {peer}")
160+
msg = build_message(command="health_check", recipient=peer)
32161
resp = self.clients[peer].send(msg)
33162
try:
34-
metrics[peer] = json.loads(resp.data)
163+
data = json.loads(resp.data)
164+
rtt = data.get("rtt", 50)
165+
status = data.get("status", "healthy")
166+
logger.info(f"Health check for {peer}: RTT={rtt}, status={status}")
167+
return rtt, status
35168
except json.JSONDecodeError:
36-
metrics[peer] = {}
37-
return metrics
38-
39-
def optimize_network(self):
40-
metrics = self.collect_metrics()
41-
avg_rtt = sum(m.get("rtt", 0) for m in metrics.values()) / max(1, len(metrics))
42-
if avg_rtt > self.config.get("group_rtt_threshold", 50):
43-
print("Optimizing: High RTT detected")
169+
logger.exception(f"Failed to parse health check for {peer}")
170+
return 0, "error"
171+
except ValueError as e:
172+
logger.exception("Invalid args for health_check")
173+
raise
174+
except Exception:
175+
logger.exception("Failed health check")
176+
return 0, "error"
177+
178+
def list_peers(self) -> List[str]:
179+
"""List all peers with RTT, group ID, and status (mock if not implemented).
180+
181+
Returns:
182+
List of peer info strings.
183+
"""
184+
if self.use_mock:
185+
logger.info("Mock list_peers")
186+
return ["peer1: 50ms, group1"]
187+
try:
188+
peers = [f"{peer}: {self._get_peer_info(peer)}" for peer in self.clients]
189+
logger.info(f"Listed {len(peers)} peers")
190+
return peers
191+
except Exception:
192+
logger.exception("Failed to list peers")
193+
return []
194+
195+
def heal_peer(self, peer: str) -> bool:
196+
"""Trigger self-healing for a peer (mock if not implemented).
197+
198+
Args:
199+
peer: Peer identifier.
200+
201+
Returns:
202+
True if successful, False otherwise.
203+
204+
Raises:
205+
ValueError: If peer is invalid.
206+
"""
207+
if self.use_mock:
208+
logger.info(f"Mock heal_peer for {peer}")
209+
return True
210+
try:
211+
if peer not in self.clients:
212+
raise ValueError(f"Unknown peer: {peer}")
213+
msg = build_message(command="heal_peer", recipient=peer)
214+
self.clients[peer].send(msg)
215+
logger.info(f"Healed peer {peer}")
216+
return True
217+
except ValueError as e:
218+
logger.exception("Invalid args for heal_peer")
219+
raise
220+
except Exception:
221+
logger.exception("Failed to heal peer")
222+
return False
223+
224+
def group_peers(self) -> bool:
225+
"""Regroup peers based on RTT threshold (mock if not implemented).
226+
227+
Returns:
228+
True if successful, False otherwise.
229+
"""
230+
if self.use_mock:
231+
logger.info("Mock group_peers")
232+
return True
233+
try:
234+
metrics = self.collect_metrics()
235+
# Mock grouping logic based on RTT
236+
logger.info("Grouped peers based on RTT")
237+
return True
238+
except Exception:
239+
logger.exception("Failed to group peers")
240+
return False
241+
242+
def simulate_failure(self, peer: str) -> bool:
243+
"""Simulate a failure on a peer (mock if not implemented).
244+
245+
Args:
246+
peer: Peer identifier.
247+
248+
Returns:
249+
True if successful, False otherwise.
250+
251+
Raises:
252+
ValueError: If peer is invalid.
253+
"""
254+
if self.use_mock:
255+
logger.info(f"Mock simulate_failure for {peer}")
256+
return True
257+
try:
258+
if peer not in self.clients:
259+
raise ValueError(f"Unknown peer: {peer}")
260+
msg = build_message(command="simulate_failure", recipient=peer)
261+
self.clients[peer].send(msg)
262+
logger.info(f"Simulated failure for {peer}")
263+
return True
264+
except ValueError as e:
265+
logger.exception("Invalid args for simulate_failure")
266+
raise
267+
except Exception:
268+
logger.exception("Failed to simulate failure")
269+
return False
270+
271+
def set_log_level(self, level: int) -> bool:
272+
"""Set logging level (0=debug, 1=info, 2=error) (mock if not implemented).
273+
274+
Args:
275+
level: Log level (0-2).
276+
277+
Returns:
278+
True if successful, False otherwise.
279+
280+
Raises:
281+
ValueError: If level is invalid.
282+
"""
283+
if self.use_mock:
284+
logger.info(f"Mock set_log_level to {level}")
285+
return True
286+
try:
287+
if not (0 <= level <= 2):
288+
raise ValueError("Invalid level")
289+
for peer in self.clients:
290+
msg = build_message(command="set_log_level", level=level, recipient=peer)
291+
self.clients[peer].send(msg)
292+
logger.info(f"Set log level to {level}")
293+
return True
294+
except ValueError as e:
295+
logger.exception("Invalid args for set_log_level")
296+
raise
297+
except Exception:
298+
logger.exception("Failed to set log level")
299+
return False
300+
301+
def load_plugin(self, path: str) -> bool:
302+
"""Dynamically load a plugin by path (mock if not implemented).
303+
304+
Args:
305+
path: Plugin path.
306+
307+
Returns:
308+
True if successful, False otherwise.
309+
"""
310+
if self.use_mock:
311+
logger.info(f"Mock load_plugin {path}")
312+
return True
313+
try:
314+
# Assume DCF supports plugin loading; mock for now
315+
logger.info(f"Loaded plugin {path}")
316+
return True
317+
except Exception:
318+
logger.exception("Failed to load plugin")
319+
return False
320+
321+
def _get_peer_info(self, peer: str) -> str:
322+
"""Helper to get peer info (mock for now)."""
323+
try:
324+
msg = build_message(command="get_info", recipient=peer)
325+
resp = self.clients[peer].send(msg)
326+
return json.loads(resp.data).get("info", "50ms, group1")
327+
except (json.JSONDecodeError, Exception):
328+
logger.exception(f"Failed to get info for {peer}")
329+
return "unknown"

0 commit comments

Comments
 (0)