Skip to content

Commit bd052aa

Browse files
authored
Daemon (#20)
* missed a log statement * add daemon code (back) in, simplify * start isn't user facing, update bools to reflect * update manual scripts to make sure daemon is disabled * init timer to None * switch from bpython to IPython to avoid greenlet exception
1 parent 9b2955d commit bd052aa

File tree

8 files changed

+325
-305
lines changed

8 files changed

+325
-305
lines changed

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ explicit = true
7777

7878
[dependency-groups]
7979
dev = [
80-
"bpython>=0.25",
80+
"ipython>=8.37.0",
8181
"mypy>=1.17.1",
8282
"pdoc>=15.0.4",
8383
"pre-commit>=4.3.0",

scripts/auto/anchor.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,31 @@
11
"""anchor.py: creates and starts an anchor node for testing."""
2+
import sys
3+
4+
import IPython
5+
6+
from chordnet import Node as ChordNode
7+
8+
9+
def main() -> None:
10+
"""Creates a new ring with this computer as the only node."""
11+
if len(sys.argv) != 3:
12+
print("usage: [uv run] python anchor.py ip_addr port_no")
13+
exit(1)
14+
15+
ip = sys.argv[1]
16+
port = int(sys.argv[2])
17+
18+
node = ChordNode(ip, port, debug=True)
19+
# create (start) the ring
20+
node.create()
21+
print(f"Node created as \"node\": {node.address}", file=sys.stderr)
22+
repl_locals = {
23+
'node': node,
24+
}
25+
print("starting repl. access `node`")
26+
IPython.embed(user_ns=repl_locals)
27+
node.stop()
28+
29+
30+
if __name__ == '__main__':
31+
main()

scripts/auto/joiner.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,34 @@
11
"""joiner.py: creates a joining node for debugging."""
2+
import sys
3+
4+
import IPython
5+
6+
from chordnet import Node as ChordNode
7+
8+
9+
def main() -> None:
10+
"""Creates a new ring with this computer as the only node."""
11+
if len(sys.argv) != 5:
12+
print("usage: [uv run] python " \
13+
"joiner.py this_ip this_port target_ip target_port")
14+
exit(1)
15+
16+
# Get IP and port from command line arguments
17+
ip = sys.argv[1]
18+
port = int(sys.argv[2])
19+
target_ip = sys.argv[3]
20+
target_port = int(sys.argv[4])
21+
22+
# Create and join node
23+
node = ChordNode(ip, port, debug=True)
24+
node.join(target_ip, target_port)
25+
repl_locals = {
26+
'node': node,
27+
}
28+
print("starting repl. access `node`, advance with `step(node)`")
29+
IPython.embed(user_ns=repl_locals)
30+
node.stop()
31+
32+
33+
if __name__ == '__main__':
34+
main()

scripts/manual/anchor.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
"""anchor.py: creates and starts an anchor node for testing."""
22
import sys
33

4-
import bpython
4+
import IPython
55
from step import step #type: ignore
66

77
from chordnet import Node as ChordNode
@@ -16,7 +16,7 @@ def main() -> None:
1616
ip = sys.argv[1]
1717
port = int(sys.argv[2])
1818

19-
node = ChordNode(ip, port)
19+
node = ChordNode(ip, port, daemon=False)
2020
# create (start) the ring
2121
node.create()
2222
print(f"Node created as \"node\": {node.address}", file=sys.stderr)
@@ -25,7 +25,7 @@ def main() -> None:
2525
'step': step,
2626
}
2727
print("starting repl. access `node`, advance with `step(node)`")
28-
bpython.embed(locals_=repl_locals)
28+
IPython.embed(user_ns=repl_locals)
2929
node.stop()
3030

3131

scripts/manual/joiner.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
"""joiner.py: creates a joining node for debugging."""
22
import sys
33

4-
import bpython
4+
import IPython
55
from step import step #type: ignore
66

77
from chordnet import Node as ChordNode
@@ -21,14 +21,14 @@ def main() -> None:
2121
target_port = int(sys.argv[4])
2222

2323
# Create and join node
24-
node = ChordNode(ip, port)
24+
node = ChordNode(ip, port, daemon=False)
2525
node.join(target_ip, target_port)
2626
repl_locals = {
2727
'node': node,
2828
'step': step,
2929
}
3030
print("starting repl. access `node`, advance with `step(node)`")
31-
bpython.embed(locals_=repl_locals)
31+
IPython.embed(user_ns=repl_locals)
3232
node.stop()
3333

3434

src/chordnet/net.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,6 @@ def _handle_connection(self, client_socket: socket.socket) -> None:
162162
# Send response
163163
client_socket.send(str(response).encode())
164164
except Exception as e:
165-
sys.stderr.write(f"Error handling connection: {e}\n")
166-
sys.stderr.flush()
165+
log.error(f"Error handling connection: {e}")
167166
finally:
168167
client_socket.close()

src/chordnet/node.py

Lines changed: 70 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
"""node.py: Represents a node on a ring."""
2+
import threading
23
from typing import Callable, Tuple
34

45
from loguru import logger as log
@@ -33,14 +34,27 @@ class Node:
3334
finger_table: list[Address | None]
3435
_next: int
3536
_net: _Net
37+
_timer: threading.Timer | None
3638
is_running: bool
37-
38-
def __init__(self, ip: str, port: int) -> None:
39+
_use_daemon: bool
40+
_interval: float
41+
_debug: bool
42+
43+
def __init__(self,
44+
ip: str,
45+
port: int,
46+
daemon:bool=True,
47+
interval:float=1.0,
48+
debug:bool=False
49+
) -> None:
3950
"""Initializes a new Chord node.
4051
4152
Args:
42-
ip (str): IP address for the node.
43-
port (int): Port number to listen on.
53+
ip: IP address for the node.
54+
port: Port number to listen on.
55+
daemon: whether to run the daemon.
56+
interval: daemon interval.
57+
debug: whether to print node state after every daemon run.
4458
"""
4559
self.address = Address(ip, port)
4660

@@ -53,6 +67,11 @@ def __init__(self, ip: str, port: int) -> None:
5367
self._net = _Net(ip, port, self._process_request)
5468
self.is_running = False
5569

70+
self._use_daemon = daemon
71+
self._interval = interval
72+
self._debug = debug
73+
self._timer = None
74+
5675
def successor(self) -> Address | None:
5776
"""Alias for self.finger_table[0]."""
5877
return self.finger_table[0]
@@ -131,43 +150,42 @@ def fix_fingers(self) -> None:
131150

132151
# Move to the next finger table entry, wrapping around if necessary
133152
self._next = (self._next + 1) % Address._M
134-
'''
135-
def _run_fix_fingers(self, interval=1.0):
136-
"""
137-
Periodically invokes fix_fingers every `interval` seconds.
153+
154+
def _daemon(self) -> None:
155+
"""Runs fix_fingers and stabilize periodically.
138156
139157
Args:
140-
interval (float): Time interval between updates (in seconds).
158+
interval: Time interval between periodic calls.
159+
debug: Whether to print node state
141160
"""
142-
self.fix_fingers()
143-
# Schedule the next execution
144-
self._fix_fingers_timer = threading.Timer(
145-
interval, self._run_fix_fingers, args=[interval])
146-
self._fix_fingers_timer.start()
161+
if self._use_daemon and self.is_running:
162+
try:
163+
self.stabilize()
164+
self.fix_fingers()
165+
if self._debug:
166+
print(f"pred: {self.predecessor}, succ: {self.successor()}")
167+
print(self.finger_table)
168+
169+
except Exception as e:
170+
# Catch any unhandled exception within the daemon's tasks
171+
# and log it properly. This is crucial for debugging.
172+
log.error(
173+
f"Unhandled exception in daemon for {self.address}: {e}"
174+
)
175+
# You might want to optionally stop the daemon here
176+
# if continuous failures are problematic
177+
# self.stop()
178+
179+
finally:
180+
# Always reschedule the timer, even if an exception occurred,
181+
# unless you decided to stop the daemon above.
182+
if self.is_running:
183+
self._timer = threading.Timer(self._interval, self._daemon)
184+
self._timer.daemon = True
185+
self._timer.start()
147186

148-
def start_periodic_tasks(self, interval=1.0):
149-
"""
150-
Starts periodic tasks for the node, including fix_fingers.
151187

152-
Args:
153-
interval (float): Time interval between periodic calls.
154-
"""
155-
if self._fix_fingers_timer and self._fix_fingers_timer.is_alive():
156-
# Timer is already running, no need to start again
157-
log.info("Periodic tasks are already running.")
158-
return
159-
self.is_running = True
160-
self._run_fix_fingers(interval)
161188

162-
def stop_periodic_tasks(self):
163-
"""
164-
Stops periodic tasks for the node gracefully.
165-
"""
166-
if self._fix_fingers_timer:
167-
self._fix_fingers_timer.cancel()
168-
self._fix_fingers_timer = None
169-
self.is_running = False
170-
'''
171189
def log_finger_table(self) -> None:
172190
"""Logs the entire finger table to the log file."""
173191
message = "Current Finger Table:\n"
@@ -338,20 +356,35 @@ def notify(self, potential_successor: Address | None)-> bool:
338356

339357

340358
def start(self) -> None:
341-
"""Starts the Chord node's network listener.
359+
"""Starts the Chord node's daemon and network listener.
342360
343361
Begins accepting incoming network connections in a separate thread.
362+
363+
Args:
364+
daemon: whether to run a daemon
365+
(runs fix_fingers and stabilize periodically
366+
interval: interval daemon sleeps for (only relevant if daemon=True)
367+
debug: whether to print the state of the node
368+
(again only relevant to daemon)
344369
"""
345370
self._net.start()
371+
self.is_running = True
372+
if self._use_daemon:
373+
self._daemon()
346374

347375

348376

349377
def stop(self) -> None:
350-
"""Gracefully stops the Chord node's network listener.
378+
"""Gracefully stops the Chord node's daemon and network listener.
351379
352380
Closes the server socket and waits for the network thread to terminate.
353381
"""
354382
self._net.stop()
383+
if self._timer:
384+
self._timer.cancel()
385+
self._timer = None
386+
self.is_running = False
387+
355388

356389

357390

0 commit comments

Comments
 (0)