Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ classifiers = [
]

requires-python = ">=3.10"
dependencies = []
dependencies = [
"loguru>=0.7.3",
]

[build-system]
requires = ["uv_build>=0.7.19,<0.8"]
Expand Down
16 changes: 9 additions & 7 deletions src/chordnet/net.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import threading
from typing import Callable, Tuple

from loguru import logger as log

from .address import Address

callback = Callable[[str, list[str]], str| Address | None]
Expand Down Expand Up @@ -86,7 +88,7 @@ def send_request(
request_args = ':'.join(str(arg) for arg in args)
request = f"{method}:{request_args}"
if (method == "TRACE_SUCCESSOR"):
print ('[SENDING TRACE REQ]', request)
log.debug("[SENDING TRACE REQ]", request)
# Send the request
sock.send(request.encode())

Expand All @@ -96,13 +98,13 @@ def send_request(
return response

except socket.timeout:
print("Request timed out", file=sys.stderr)
log.info("Request timed out")
return None
except ConnectionRefusedError:
print("Connection refused", file=sys.stderr)
log.info("Connection refused")
return None
except Exception as e:
print(f"Network request error: {e}", file=sys.stderr)
log.info(f"Network request error: {e}")
return None


Expand All @@ -129,7 +131,7 @@ def _listen_for_connections(self) -> None:
).start()
except Exception as e:
if self._running:
print(f"Error accepting connection: {e}\n")
log.info(f"Error accepting connection: {e}\n")
sys.stderr.write(f"Error accepting connection: {e}\n")
sys.stderr.flush()

Expand All @@ -149,13 +151,13 @@ def _handle_connection(self, client_socket: socket.socket) -> None:
method, *args = request.split(':')

if method == 'TRACE_SUCCESSOR':
print(f"[NET]Received request: {request}", file=sys.stderr)
log.debug(f"[NET]Received request: {request}")

# Dispatch to appropriate method
response = self._request_handler(method, args)

if method == 'TRACE_SUCCESSOR':
print(f"[NET]Sent response: {response}", file=sys.stderr)
log.debug(f"[NET]Sent response: {response}")

# Send response
client_socket.send(str(response).encode())
Expand Down
43 changes: 22 additions & 21 deletions src/chordnet/node.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
"""node.py: Represents a node on a ring."""
import sys
from typing import Callable, Tuple

from loguru import logger as log

from .address import Address
from .net import _Net

Expand Down Expand Up @@ -95,7 +96,7 @@ def join(self, known_ip: str, known_port: int) -> None:
self.finger_table[0] = self._parse_address(response)
msg = f"Node {self.address.key} joined the ring. " \
"Successor: {self.successor().key}"
print(msg, file=sys.stderr)
log.info(msg)
else:
raise ValueError("Failed to find successor. Join failed")

Expand All @@ -104,7 +105,7 @@ def join(self, known_ip: str, known_port: int) -> None:


except Exception as e:
print(f"Join failed: {e}")
log.info(f"Join failed: {e}")
raise


Expand All @@ -118,15 +119,15 @@ def fix_fingers(self) -> None:
gap = (2 ** self._next) % (2 ** Address._M)

start = self.address.key + gap
#print(f"fixing finger {self._next}. gap is {gap}, " \
#log.info(f"fixing finger {self._next}. gap is {gap}, " \
#"start of interval is: {start}")

try:
# Find the successor for this finger's start position
responsible_node = self.find_successor(start)
self.finger_table[self._next] = responsible_node
except Exception as e:
print(f"fix_fingers failed for finger {self._next}: {e}")
log.debug(f"fix_fingers failed for finger {self._next}: {e}")

# Move to the next finger table entry, wrapping around if necessary
self._next = (self._next + 1) % Address._M
Expand All @@ -153,7 +154,7 @@ def start_periodic_tasks(self, interval=1.0):
"""
if self._fix_fingers_timer and self._fix_fingers_timer.is_alive():
# Timer is already running, no need to start again
print("Periodic tasks are already running.", file=sys.stderr)
log.info("Periodic tasks are already running.")
return
self.is_running = True
self._run_fix_fingers(interval)
Expand All @@ -173,7 +174,7 @@ def log_finger_table(self) -> None:
for i, finger in enumerate(self.finger_table):
message += f" Finger[{i}] -> {finger}\n"

print(message, file=sys.stderr)
log.info(message)

def find_successor(self, id: int) -> Address:
"""Finds the successor node for a given identifier.
Expand Down Expand Up @@ -210,7 +211,7 @@ def find_successor(self, id: int) -> Address:
return successor if successor else self.address

except Exception as e:
print(f"Find successor failed: {e}")
log.info(f"Find successor failed: {e}")
# Fallback to local successor if network request fails
return curr_successor if curr_successor else self.address

Expand Down Expand Up @@ -278,31 +279,31 @@ def stabilize(self) -> None:

try:
# Get the predecessor of the current successor
#print(f"stabilize: checking successor {self.successor().key}" \
#for predecessor", file=sys.stderr)
#log.info(f"stabilize: checking successor {self.successor().key}" \
#for predecessor")
x_response = self._net.send_request(
curr_successor, 'GET_PREDECESSOR')

#print(f"stabilize: predecessor found: {x_response}",
#log.info(f"stabilize: predecessor found: {x_response}",
#file=sys.stderr)
x = self._parse_address(x_response)

if x and self._is_between(
self.address.key, curr_successor.key, x.key
):
self.finger_table[0] = x
#print(
#log.info(
#f"stabilize: updated successor to {self.successor().key}",
#file=sys.stderr)
# otherwise, we just notify them that we exist.
# This is usually for the first joiner to a ring.

#print(f"Node {self.address} - Updated Successor:" \
#log.info(f"Node {self.address} - Updated Successor:" \
#"{self.successor()}, Predecessor: {self.predecessor}",
#file=sys.stderr)

except Exception as e:
print(f"Stabilize failed: {e}", file=sys.stderr)
log.info(f"Stabilize failed: {e}")
finally:
self.notify(self.successor())

Expand Down Expand Up @@ -332,7 +333,7 @@ def notify(self, potential_successor: Address | None)-> bool:
else:
return False
except Exception as e:
print(f"Notify failed: {e}", file=sys.stderr)
log.info(f"Notify failed: {e}")
return False


Expand Down Expand Up @@ -449,7 +450,7 @@ def trace_successor(
id,
curr_hops
)
print(f"Raw response: {response}", file=sys.stderr) # Debugging line
log.debug(f"Raw response: {response}") # Debugging line
assert response is not None
parts = response.split(":")
if len(parts) != 4:
Expand All @@ -459,14 +460,14 @@ def trace_successor(
# resolved_node.key = int(node_key)
response_split = response.split(":")
address = ':'.join(response_split[:-1])
print ("[trace]Joined Address :", address)
log.info("[trace]Joined Address :", address)
# address = '':'.join(response[:2])
return address, int(hops)+1

# return self._parse_address(response), hops

except Exception as e:
print(f"trace successor failed: {e}")
log.info(f"trace successor failed: {e}")
# Fallback to local successor if network request fails
return str(self.successor()), -1

Expand All @@ -490,14 +491,14 @@ def _process_request(
elif method == "TRACE_SUCCESSOR":
try:
id, hops = int(args[0]), int(args[1])
print ("[NODE] Current ID ", id, "Current hops ", hops)
log.info("[NODE] Current ID ", id, "Current hops ", hops)
successor, hops = self.trace_successor(id, hops)

print ("SUCCESSSOR NODE :", successor, "HOPS :", hops)
log.info("SUCCESSSOR NODE :", successor, "HOPS :", hops)
returnString = f"{successor}:{hops}"
return returnString
except Exception as e:
print(f"TRACE_SUCCESSOR error: {e}", file=sys.stderr)
log.info(f"TRACE_SUCCESSOR error: {e}")
return "ERROR:Invalid TRACE_SUCCESSOR Request"

elif method == 'GET_PREDECESSOR':
Expand Down
26 changes: 26 additions & 0 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.