Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "chordnet"
version = "1.0.0"
version = "1.1.0"
license = "MIT"
license-files = ["LICENSE"]
description = "Implementation of the chord peer-to-peer networking protocol, introduced by Stoica et al."
Expand Down Expand Up @@ -77,7 +77,7 @@ explicit = true

[dependency-groups]
dev = [
"bpython>=0.25",
"ipython>=8.37.0",
"mypy>=1.17.1",
"pdoc>=15.0.4",
"pre-commit>=4.3.0",
Expand Down
30 changes: 30 additions & 0 deletions scripts/auto/anchor.py
Original file line number Diff line number Diff line change
@@ -1 +1,31 @@
"""anchor.py: creates and starts an anchor node for testing."""
import sys

import IPython

from chordnet import Node as ChordNode


def main() -> None:
"""Creates a new ring with this computer as the only node."""
if len(sys.argv) != 3:
print("usage: [uv run] python anchor.py ip_addr port_no")
exit(1)

ip = sys.argv[1]
port = int(sys.argv[2])

node = ChordNode(ip, port, debug=True)
# create (start) the ring
node.create()
print(f"Node created as \"node\": {node.address}", file=sys.stderr)
repl_locals = {
'node': node,
}
print("starting repl. access `node`")
IPython.embed(user_ns=repl_locals)
node.stop()


if __name__ == '__main__':
main()
33 changes: 33 additions & 0 deletions scripts/auto/joiner.py
Original file line number Diff line number Diff line change
@@ -1 +1,34 @@
"""joiner.py: creates a joining node for debugging."""
import sys

import IPython

from chordnet import Node as ChordNode


def main() -> None:
"""Creates a new ring with this computer as the only node."""
if len(sys.argv) != 5:
print("usage: [uv run] python " \
"joiner.py this_ip this_port target_ip target_port")
exit(1)

# Get IP and port from command line arguments
ip = sys.argv[1]
port = int(sys.argv[2])
target_ip = sys.argv[3]
target_port = int(sys.argv[4])

# Create and join node
node = ChordNode(ip, port, debug=True)
node.join(target_ip, target_port)
repl_locals = {
'node': node,
}
print("starting repl. access `node`, advance with `step(node)`")
IPython.embed(user_ns=repl_locals)
node.stop()


if __name__ == '__main__':
main()
6 changes: 3 additions & 3 deletions scripts/manual/anchor.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""anchor.py: creates and starts an anchor node for testing."""
import sys

import bpython
import IPython
from step import step #type: ignore

from chordnet import Node as ChordNode
Expand All @@ -16,7 +16,7 @@ def main() -> None:
ip = sys.argv[1]
port = int(sys.argv[2])

node = ChordNode(ip, port)
node = ChordNode(ip, port, daemon=False)
# create (start) the ring
node.create()
print(f"Node created as \"node\": {node.address}", file=sys.stderr)
Expand All @@ -25,7 +25,7 @@ def main() -> None:
'step': step,
}
print("starting repl. access `node`, advance with `step(node)`")
bpython.embed(locals_=repl_locals)
IPython.embed(user_ns=repl_locals)
node.stop()


Expand Down
6 changes: 3 additions & 3 deletions scripts/manual/joiner.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""joiner.py: creates a joining node for debugging."""
import sys

import bpython
import IPython
from step import step #type: ignore

from chordnet import Node as ChordNode
Expand All @@ -21,14 +21,14 @@ def main() -> None:
target_port = int(sys.argv[4])

# Create and join node
node = ChordNode(ip, port)
node = ChordNode(ip, port, daemon=False)
node.join(target_ip, target_port)
repl_locals = {
'node': node,
'step': step,
}
print("starting repl. access `node`, advance with `step(node)`")
bpython.embed(locals_=repl_locals)
IPython.embed(user_ns=repl_locals)
node.stop()


Expand Down
3 changes: 1 addition & 2 deletions src/chordnet/net.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,6 @@ def _handle_connection(self, client_socket: socket.socket) -> None:
# Send response
client_socket.send(str(response).encode())
except Exception as e:
sys.stderr.write(f"Error handling connection: {e}\n")
sys.stderr.flush()
log.error(f"Error handling connection: {e}")
finally:
client_socket.close()
126 changes: 84 additions & 42 deletions src/chordnet/node.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""node.py: Represents a node on a ring."""
import threading
from typing import Callable, Tuple

from loguru import logger as log
Expand Down Expand Up @@ -33,14 +34,27 @@ class Node:
finger_table: list[Address | None]
_next: int
_net: _Net
_timer: threading.Timer | None
is_running: bool

def __init__(self, ip: str, port: int) -> None:
_use_daemon: bool
_interval: float
_debug: bool

def __init__(self,
ip: str,
port: int,
daemon:bool=True,
interval:float=1.0,
debug:bool=False
) -> None:
"""Initializes a new Chord node.

Args:
ip (str): IP address for the node.
port (int): Port number to listen on.
ip: IP address for the node.
port: Port number to listen on.
daemon: whether to run the daemon.
interval: daemon interval.
debug: whether to print node state after every daemon run.
"""
self.address = Address(ip, port)

Expand All @@ -53,6 +67,11 @@ def __init__(self, ip: str, port: int) -> None:
self._net = _Net(ip, port, self._process_request)
self.is_running = False

self._use_daemon = daemon
self._interval = interval
self._debug = debug
self._timer = None

def successor(self) -> Address | None:
"""Alias for self.finger_table[0]."""
return self.finger_table[0]
Expand Down Expand Up @@ -131,43 +150,42 @@ def fix_fingers(self) -> None:

# Move to the next finger table entry, wrapping around if necessary
self._next = (self._next + 1) % Address._M
'''
def _run_fix_fingers(self, interval=1.0):
"""
Periodically invokes fix_fingers every `interval` seconds.

def _daemon(self) -> None:
"""Runs fix_fingers and stabilize periodically.

Args:
interval (float): Time interval between updates (in seconds).
interval: Time interval between periodic calls.
debug: Whether to print node state
"""
self.fix_fingers()
# Schedule the next execution
self._fix_fingers_timer = threading.Timer(
interval, self._run_fix_fingers, args=[interval])
self._fix_fingers_timer.start()
if self._use_daemon and self.is_running:
try:
self.stabilize()
self.fix_fingers()
if self._debug:
print(f"pred: {self.predecessor}, succ: {self.successor()}")
print(self.finger_table)

except Exception as e:
# Catch any unhandled exception within the daemon's tasks
# and log it properly. This is crucial for debugging.
log.error(
f"Unhandled exception in daemon for {self.address}: {e}"
)
# You might want to optionally stop the daemon here
# if continuous failures are problematic
# self.stop()

finally:
# Always reschedule the timer, even if an exception occurred,
# unless you decided to stop the daemon above.
if self.is_running:
self._timer = threading.Timer(self._interval, self._daemon)
self._timer.daemon = True
self._timer.start()

def start_periodic_tasks(self, interval=1.0):
"""
Starts periodic tasks for the node, including fix_fingers.

Args:
interval (float): Time interval between periodic calls.
"""
if self._fix_fingers_timer and self._fix_fingers_timer.is_alive():
# Timer is already running, no need to start again
log.info("Periodic tasks are already running.")
return
self.is_running = True
self._run_fix_fingers(interval)

def stop_periodic_tasks(self):
"""
Stops periodic tasks for the node gracefully.
"""
if self._fix_fingers_timer:
self._fix_fingers_timer.cancel()
self._fix_fingers_timer = None
self.is_running = False
'''
def log_finger_table(self) -> None:
"""Logs the entire finger table to the log file."""
message = "Current Finger Table:\n"
Expand Down Expand Up @@ -272,9 +290,15 @@ def stabilize(self) -> None:
# set successor to x
# notify successor about this node
curr_successor = self.successor()
if curr_successor is None:
if curr_successor is None or curr_successor == self.address:
# if we have a predecessor, then its a 2 node ring
# complete the circle
if self.predecessor and self.predecessor != self.address:
self.finger_table[0] = self.predecessor
return



x = None

try:
Expand Down Expand Up @@ -338,20 +362,35 @@ def notify(self, potential_successor: Address | None)-> bool:


def start(self) -> None:
"""Starts the Chord node's network listener.
"""Starts the Chord node's daemon and network listener.

Begins accepting incoming network connections in a separate thread.

Args:
daemon: whether to run a daemon
(runs fix_fingers and stabilize periodically
interval: interval daemon sleeps for (only relevant if daemon=True)
debug: whether to print the state of the node
(again only relevant to daemon)
"""
self._net.start()
self.is_running = True
if self._use_daemon:
self._daemon()



def stop(self) -> None:
"""Gracefully stops the Chord node's network listener.
"""Gracefully stops the Chord node's daemon and network listener.

Closes the server socket and waits for the network thread to terminate.
"""
self._net.stop()
if self._timer:
self._timer.cancel()
self._timer = None
self.is_running = False




Expand Down Expand Up @@ -408,9 +447,11 @@ def _be_notified(self, notifying_node: Address) -> bool:
True if the node was accepted as a predecessor, False otherwise.
"""
# Update predecessor if necessary
if not self.predecessor or self._is_between(
self.predecessor.key, self.address.key, notifying_node.key
):
my_key = self.address.key
their_key = notifying_node.key
if not self.predecessor \
or self.predecessor == self.address \
or self._is_between(self.predecessor.key, my_key, their_key):
self.predecessor = notifying_node
return True
else:
Expand Down Expand Up @@ -513,7 +554,8 @@ def _process_request(
[args[0], args[1], args[2]])
)
assert notifier is not None
return "OK" if self._be_notified(notifier) else "IGNORED"
new_predecessor = self._be_notified(notifier)
return "OK" if new_predecessor else "IGNORED"

except (ValueError, AssertionError):
return "INVALID_NODE"
Expand Down
Loading