From 96de00a85de28690e908f857d6b5caac822a89c5 Mon Sep 17 00:00:00 2001 From: marcosf2 Date: Tue, 2 Sep 2025 09:59:24 -0500 Subject: [PATCH 01/11] First version of working websockets --- src/pqnstack/app/api/main.py | 4 ++++ src/pqnstack/app/api/routes/handshake.py | 15 +++++++++++++++ src/pqnstack/app/api/routes/websocket.py | 24 ++++++++++++++++++++++++ src/pqnstack/app/core/config.py | 3 +++ 4 files changed, 46 insertions(+) create mode 100644 src/pqnstack/app/api/routes/handshake.py create mode 100644 src/pqnstack/app/api/routes/websocket.py diff --git a/src/pqnstack/app/api/main.py b/src/pqnstack/app/api/main.py index d50d7c12..a36b74de 100644 --- a/src/pqnstack/app/api/main.py +++ b/src/pqnstack/app/api/main.py @@ -5,6 +5,8 @@ from pqnstack.app.api.routes import rng from pqnstack.app.api.routes import serial from pqnstack.app.api.routes import timetagger +from pqnstack.app.api.routes import websocket +from pqnstack.app.api.routes import handshake api_router = APIRouter() api_router.include_router(chsh.router) @@ -12,3 +14,5 @@ api_router.include_router(timetagger.router) api_router.include_router(rng.router) api_router.include_router(serial.router) +api_router.include_router(websocket.router) +api_router.include_router(handshake.router) \ No newline at end of file diff --git a/src/pqnstack/app/api/routes/handshake.py b/src/pqnstack/app/api/routes/handshake.py new file mode 100644 index 00000000..72fbac6b --- /dev/null +++ b/src/pqnstack/app/api/routes/handshake.py @@ -0,0 +1,15 @@ +from fastapi import APIRouter + +from pqnstack.app.core.config import state, state_change_event + +router = APIRouter() + + + +@router.get("/handshake") +async def handshake() -> bool: + state.incoming = True + + state_change_event.set() + return True + diff --git a/src/pqnstack/app/api/routes/websocket.py b/src/pqnstack/app/api/routes/websocket.py new file mode 100644 index 00000000..e8e8df5b --- /dev/null +++ b/src/pqnstack/app/api/routes/websocket.py @@ -0,0 +1,24 @@ +from fastapi import APIRouter, WebSocket, WebSocketDisconnect + +router = APIRouter() + +@router.websocket("/ws") +async def websocket_endpoint(websocket: WebSocket): + """ + A simple websocket endpoint that accepts a connection, receives messages, + and sends a response back. + """ + from pqnstack.app.core.config import state, state_change_event + await websocket.accept() + await websocket.send_text("HELLO HELLO HELLO") + try: + while True: + await state_change_event.wait() # Wait for a state change event + if state.incoming: + await websocket.send_text("PING PING PING") + state.incoming = False + # To prevent sending this message in a loop, you might want to reset the state: + + state_change_event.clear() # Reset the event for the next change + except WebSocketDisconnect: + print("Client disconnected") diff --git a/src/pqnstack/app/core/config.py b/src/pqnstack/app/core/config.py index da78729f..7a9c7a67 100644 --- a/src/pqnstack/app/core/config.py +++ b/src/pqnstack/app/core/config.py @@ -1,5 +1,6 @@ import logging from functools import lru_cache +import asyncio from pydantic import BaseModel from pydantic import Field @@ -72,6 +73,7 @@ def get_settings() -> Settings: class NodeState(BaseModel): + incoming: bool = False chsh_request_basis: list[float] = [22.5, 67.5] # FIXME: Use enums for this qkd_basis_list: list[QKDEncodingBasis] = [ @@ -94,3 +96,4 @@ class NodeState(BaseModel): state = NodeState() +state_change_event = asyncio.Event() From aa148121c25e56a756ded1fe7789d0509251f84c Mon Sep 17 00:00:00 2001 From: marcosf2 Date: Thu, 4 Sep 2025 15:30:54 -0500 Subject: [PATCH 02/11] Implements basic state handling, as well as client disconnects detection --- configs/config_app_example.toml | 2 + src/pqnstack/app/api/main.py | 8 +- src/pqnstack/app/api/routes/coordination.py | 109 ++++++++++++++++++++ src/pqnstack/app/api/routes/debug.py | 11 ++ src/pqnstack/app/api/routes/handshake.py | 15 --- src/pqnstack/app/api/routes/websocket.py | 24 ----- src/pqnstack/app/core/config.py | 21 +++- 7 files changed, 146 insertions(+), 44 deletions(-) create mode 100644 src/pqnstack/app/api/routes/coordination.py create mode 100644 src/pqnstack/app/api/routes/debug.py delete mode 100644 src/pqnstack/app/api/routes/handshake.py delete mode 100644 src/pqnstack/app/api/routes/websocket.py diff --git a/configs/config_app_example.toml b/configs/config_app_example.toml index 81c4a476..84c8a813 100644 --- a/configs/config_app_example.toml +++ b/configs/config_app_example.toml @@ -1,5 +1,7 @@ # MAKE SURE TO RENAME THIS FILE TO config.toml AND PLACE IT IN THE ROOT OF THE PROJECT +node_name = "example_node" + # Router configuration router_name = "router1" router_address = "xx.xx.xx.xx" # Replace with actual IP address diff --git a/src/pqnstack/app/api/main.py b/src/pqnstack/app/api/main.py index a36b74de..fe65fa93 100644 --- a/src/pqnstack/app/api/main.py +++ b/src/pqnstack/app/api/main.py @@ -5,8 +5,8 @@ from pqnstack.app.api.routes import rng from pqnstack.app.api.routes import serial from pqnstack.app.api.routes import timetagger -from pqnstack.app.api.routes import websocket -from pqnstack.app.api.routes import handshake +from pqnstack.app.api.routes import coordination +from pqnstack.app.api.routes import debug api_router = APIRouter() api_router.include_router(chsh.router) @@ -14,5 +14,5 @@ api_router.include_router(timetagger.router) api_router.include_router(rng.router) api_router.include_router(serial.router) -api_router.include_router(websocket.router) -api_router.include_router(handshake.router) \ No newline at end of file +api_router.include_router(coordination.router) +api_router.include_router(debug.router) \ No newline at end of file diff --git a/src/pqnstack/app/api/routes/coordination.py b/src/pqnstack/app/api/routes/coordination.py new file mode 100644 index 00000000..cb040382 --- /dev/null +++ b/src/pqnstack/app/api/routes/coordination.py @@ -0,0 +1,109 @@ +import logging +import asyncio + +from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Request, HTTPException, status + +from pqnstack.app.api.deps import ClientDep +from pqnstack.app.core.config import state, state_change_event, settings, user_replied_event + +logger = logging.getLogger(__name__) + + +router = APIRouter(prefix="/coordination", tags=["coordination"]) + + +@router.post("/collect_follower") +async def collect_follower(address: str, http_client: ClientDep) -> bool: + + logger.info("Requesting client at %s to follow", address) + + ret = await http_client.post(f"http://{address}/coordination/follow_requested?leaders_name={settings.node_name}") + if ret.status_code != 200: + raise HTTPException(status_code=ret.status_code, detail=ret.text) + + + if ret.json() is True: + state.leading = True + state.followers_address = address + logger.info("Successfully collected follower") + return True + if ret.json() is False: + logger.info("Follower rejected follow request") + return False + + raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR , detail="Could not collect follower for unknown reasons") + + +@router.post('/follow_requested') +async def follow_requested(request: Request, leaders_name: str) -> bool: + + logger.info("Requesting client at %s to follow", leaders_name) + print("state.client_listening", state.client_listening_for_follower_requests) + print("state.following", state.following) + # Check if the client is ready to accept a follower request and that node is not already following someone. + if state.client_listening_for_follower_requests and not state.following: + + # Load the state with the incoming info of the request + state.leaders_address = request.client.host + state.leaders_name = leaders_name + state.following_requested = True + # Trigger the state change to get the websocket to send question to user + state_change_event.set() + + logger.info("Asking user to accept follow request from %s (%s)", leaders_name, request.client.host) + await user_replied_event.wait() # Wait for a state change event to see if user accepted + user_replied_event.clear() # Reset the event for the next change + if state.following_requested_user_response: + logger.info(f"Follow request from {request.client.host} accepted.") + state.client_listening_for_follower_requests = True + state.following = True + state.leaders_address = request.client.host + state_change_event.set() + return True + + logger.info(f"Follow request from {request.client.host} rejected.") + # Clean up the state if user rejected + state.leaders_address = None + state.leaders_name = None + state.following_requested = False + + else: + logger.info("Request rejected because %s", ("client is not listening for requests" if state.client_listening_for_follower_requests else "this node is already following someone")) + + return False + + +@router.websocket("/ws") +async def websocket_endpoint(websocket: WebSocket): + """ + A simple websocket endpoint that accepts a connection, receives messages, + and sends a response back. + """ + from pqnstack.app.core.config import state, state_change_event + await websocket.accept() + logger.info("Client connected to websocket for multiplayer coordination.") + state.client_listening_for_follower_requests = True + + async def state_change_handler(): + while True: + await state_change_event.wait() # Wait for a state change event + if state.following_requested: + logger.debug("Websocket detected a follow request, asking user for response.") + await websocket.send_text(f"Do you want to accept a connection from {state.leaders_name} ({state.leaders_address})?") + state_change_event.clear() # Reset the event for the next change + + state_change_task = asyncio.create_task(state_change_handler()) + + try: + while True: + response = await websocket.receive_text() + state.following_requested_user_response = response.lower() in ["true", "yes", "y"] + state.following_requested = False + logger.debug("Websocket received a response from user: %s", state.following_requested_user_response) + user_replied_event.set() + + except WebSocketDisconnect: + logger.info("Client disconnected from websocket for multiplayer coordination.") + state.client_listening_for_follower_requests = False + finally: + state_change_task.cancel() \ No newline at end of file diff --git a/src/pqnstack/app/api/routes/debug.py b/src/pqnstack/app/api/routes/debug.py new file mode 100644 index 00000000..0250aa90 --- /dev/null +++ b/src/pqnstack/app/api/routes/debug.py @@ -0,0 +1,11 @@ +from fastapi import APIRouter + +from pqnstack.app.core.config import state + +router = APIRouter(prefix='/debug', tags=['debug']) + + +@router.get('/state') +async def get_state(): + return state + diff --git a/src/pqnstack/app/api/routes/handshake.py b/src/pqnstack/app/api/routes/handshake.py deleted file mode 100644 index 72fbac6b..00000000 --- a/src/pqnstack/app/api/routes/handshake.py +++ /dev/null @@ -1,15 +0,0 @@ -from fastapi import APIRouter - -from pqnstack.app.core.config import state, state_change_event - -router = APIRouter() - - - -@router.get("/handshake") -async def handshake() -> bool: - state.incoming = True - - state_change_event.set() - return True - diff --git a/src/pqnstack/app/api/routes/websocket.py b/src/pqnstack/app/api/routes/websocket.py deleted file mode 100644 index e8e8df5b..00000000 --- a/src/pqnstack/app/api/routes/websocket.py +++ /dev/null @@ -1,24 +0,0 @@ -from fastapi import APIRouter, WebSocket, WebSocketDisconnect - -router = APIRouter() - -@router.websocket("/ws") -async def websocket_endpoint(websocket: WebSocket): - """ - A simple websocket endpoint that accepts a connection, receives messages, - and sends a response back. - """ - from pqnstack.app.core.config import state, state_change_event - await websocket.accept() - await websocket.send_text("HELLO HELLO HELLO") - try: - while True: - await state_change_event.wait() # Wait for a state change event - if state.incoming: - await websocket.send_text("PING PING PING") - state.incoming = False - # To prevent sending this message in a loop, you might want to reset the state: - - state_change_event.clear() # Reset the event for the next change - except WebSocketDisconnect: - print("Client disconnected") diff --git a/src/pqnstack/app/core/config.py b/src/pqnstack/app/core/config.py index 7a9c7a67..078dd9b6 100644 --- a/src/pqnstack/app/core/config.py +++ b/src/pqnstack/app/core/config.py @@ -33,6 +33,7 @@ class QKDSettings(BaseModel): class Settings(BaseSettings): + node_name: str = "node1" router_name: str = "router1" router_address: str = "localhost" router_port: int = 5555 @@ -73,7 +74,24 @@ def get_settings() -> Settings: class NodeState(BaseModel): - incoming: bool = False + + # FIXME: Make sure we are checking for the client_listening_for_follower_requests state everywhere. + client_listening_for_follower_requests: bool = False + + # Leader's state + leading: bool = False + followers_address: str | None = None + + + # Follower's state + following: bool = False + following_requested: bool = False + # User's response to the follow request. None if no response yet, True if accepted, False if rejected. + following_requested_user_response: bool | None = None + # The address of the leader this node is following. None if not following anyone. + leaders_address: str | None = None + leaders_name: str | None = None + chsh_request_basis: list[float] = [22.5, 67.5] # FIXME: Use enums for this qkd_basis_list: list[QKDEncodingBasis] = [ @@ -97,3 +115,4 @@ class NodeState(BaseModel): state = NodeState() state_change_event = asyncio.Event() +user_replied_event = asyncio.Event() From d782a756270730fcae487fcca981f0e2ad8f97da Mon Sep 17 00:00:00 2001 From: marcosf2 Date: Thu, 4 Sep 2025 18:00:31 -0500 Subject: [PATCH 03/11] cleans up code and deals with ruff and mypy --- src/pqnstack/app/api/routes/coordination.py | 137 +++++++++++++------- src/pqnstack/app/api/routes/debug.py | 8 +- src/pqnstack/app/core/config.py | 7 +- 3 files changed, 98 insertions(+), 54 deletions(-) diff --git a/src/pqnstack/app/api/routes/coordination.py b/src/pqnstack/app/api/routes/coordination.py index cb040382..93a533e5 100644 --- a/src/pqnstack/app/api/routes/coordination.py +++ b/src/pqnstack/app/api/routes/coordination.py @@ -1,10 +1,18 @@ -import logging import asyncio +import logging -from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Request, HTTPException, status +from fastapi import APIRouter +from fastapi import HTTPException +from fastapi import Request +from fastapi import WebSocket +from fastapi import WebSocketDisconnect +from fastapi import status from pqnstack.app.api.deps import ClientDep -from pqnstack.app.core.config import state, state_change_event, settings, user_replied_event +from pqnstack.app.core.config import ask_user_for_follow_event +from pqnstack.app.core.config import settings +from pqnstack.app.core.config import state +from pqnstack.app.core.config import user_replied_event logger = logging.getLogger(__name__) @@ -12,16 +20,34 @@ router = APIRouter(prefix="/coordination", tags=["coordination"]) +# TODO: Send a disconnection message if I was following someone. +@router.post("/reset_coordination_state") +async def reset_coordination_state() -> None: + """Reset the coordination state of the node.""" + state.leading = False + state.followers_address = None + state.following = False + state.following_requested = False + state.following_requested_user_response = None + state.leaders_address = None + state.leaders_name = None + + @router.post("/collect_follower") async def collect_follower(address: str, http_client: ClientDep) -> bool: + """ + Endpoint called by a leader node (this one) to request a follower node (other node) to follow it. + Returns + ------- + True if the follower accepted the request, False otherwise. + """ logger.info("Requesting client at %s to follow", address) ret = await http_client.post(f"http://{address}/coordination/follow_requested?leaders_name={settings.node_name}") - if ret.status_code != 200: + if ret.status_code != status.HTTP_200_OK: raise HTTPException(status_code=ret.status_code, detail=ret.text) - if ret.json() is True: state.leading = True state.followers_address = address @@ -31,79 +57,98 @@ async def collect_follower(address: str, http_client: ClientDep) -> bool: logger.info("Follower rejected follow request") return False - raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR , detail="Could not collect follower for unknown reasons") + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Could not collect follower for unknown reasons" + ) -@router.post('/follow_requested') +@router.post("/follow_requested") async def follow_requested(request: Request, leaders_name: str) -> bool: + """ + Endpoint is called by a leader node (other node) to request this node to follow it. + + Returns + ------- + True if the follow request is accepted, False otherwise. + """ + logger.debug("Requesting client at %s to follow", leaders_name) + + if request.client is None: + raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Request lacks the clients host") + leaders_address = request.client.host - logger.info("Requesting client at %s to follow", leaders_name) - print("state.client_listening", state.client_listening_for_follower_requests) - print("state.following", state.following) # Check if the client is ready to accept a follower request and that node is not already following someone. if state.client_listening_for_follower_requests and not state.following: - - # Load the state with the incoming info of the request - state.leaders_address = request.client.host - state.leaders_name = leaders_name state.following_requested = True # Trigger the state change to get the websocket to send question to user - state_change_event.set() + ask_user_for_follow_event.set() - logger.info("Asking user to accept follow request from %s (%s)", leaders_name, request.client.host) + logger.debug("Asking user to accept follow request from %s (%s)", leaders_name, leaders_address) await user_replied_event.wait() # Wait for a state change event to see if user accepted user_replied_event.clear() # Reset the event for the next change if state.following_requested_user_response: - logger.info(f"Follow request from {request.client.host} accepted.") - state.client_listening_for_follower_requests = True + logger.debug("Follow request from %s accepted.", leaders_address) state.following = True - state.leaders_address = request.client.host - state_change_event.set() + state.leaders_name = leaders_name + state.leaders_address = leaders_address + ask_user_for_follow_event.set() return True - logger.info(f"Follow request from {request.client.host} rejected.") + logger.debug("Follow request from %s rejected.", leaders_address) # Clean up the state if user rejected state.leaders_address = None state.leaders_name = None state.following_requested = False else: - logger.info("Request rejected because %s", ("client is not listening for requests" if state.client_listening_for_follower_requests else "this node is already following someone")) + logger.info( + "Request rejected because %s", + ( + "client is not listening for requests" + if state.client_listening_for_follower_requests + else "this node is already following someone" + ), + ) return False -@router.websocket("/ws") -async def websocket_endpoint(websocket: WebSocket): - """ - A simple websocket endpoint that accepts a connection, receives messages, - and sends a response back. - """ - from pqnstack.app.core.config import state, state_change_event +@router.websocket("/follow_requested_alerts") +async def follow_requested_alert(websocket: WebSocket) -> None: + """Websocket endpoint is used to alert the client when a follow request is received. It also handles the response from the client.""" await websocket.accept() logger.info("Client connected to websocket for multiplayer coordination.") state.client_listening_for_follower_requests = True - async def state_change_handler(): + async def ask_user_for_follow_handler() -> None: + """Task that waits for the ask_user_for_follow_event event and sends a message to the client if a follow request is detected.""" while True: - await state_change_event.wait() # Wait for a state change event + await ask_user_for_follow_event.wait() # Wait for a state change event if state.following_requested: logger.debug("Websocket detected a follow request, asking user for response.") - await websocket.send_text(f"Do you want to accept a connection from {state.leaders_name} ({state.leaders_address})?") - state_change_event.clear() # Reset the event for the next change - - state_change_task = asyncio.create_task(state_change_handler()) + await websocket.send_text( + f"Do you want to accept a connection from {state.leaders_name} ({state.leaders_address})?" + ) + ask_user_for_follow_event.clear() # Reset the event for the next change + + async def client_message_handler() -> None: + """Task that waits for a message from the client and handles the response. It also handles the case where the client disconnects.""" + try: + while True: + response = await websocket.receive_text() + state.following_requested_user_response = response.lower() in ["true", "yes", "y"] + state.following_requested = False + logger.debug("Websocket received a response from user: %s", state.following_requested_user_response) + user_replied_event.set() + except WebSocketDisconnect: + logger.info("Client disconnected from websocket for multiplayer coordination.") + state.client_listening_for_follower_requests = False + + state_change_task = asyncio.create_task(ask_user_for_follow_handler()) + client_message_task = asyncio.create_task(client_message_handler()) try: - while True: - response = await websocket.receive_text() - state.following_requested_user_response = response.lower() in ["true", "yes", "y"] - state.following_requested = False - logger.debug("Websocket received a response from user: %s", state.following_requested_user_response) - user_replied_event.set() - - except WebSocketDisconnect: - logger.info("Client disconnected from websocket for multiplayer coordination.") - state.client_listening_for_follower_requests = False + await asyncio.gather(state_change_task, client_message_task) finally: - state_change_task.cancel() \ No newline at end of file + state_change_task.cancel() + client_message_task.cancel() diff --git a/src/pqnstack/app/api/routes/debug.py b/src/pqnstack/app/api/routes/debug.py index 0250aa90..1e2af119 100644 --- a/src/pqnstack/app/api/routes/debug.py +++ b/src/pqnstack/app/api/routes/debug.py @@ -1,11 +1,11 @@ from fastapi import APIRouter +from pqnstack.app.core.config import NodeState from pqnstack.app.core.config import state -router = APIRouter(prefix='/debug', tags=['debug']) +router = APIRouter(prefix="/debug", tags=["debug"]) -@router.get('/state') -async def get_state(): +@router.get("/state") +async def get_state() -> NodeState: return state - diff --git a/src/pqnstack/app/core/config.py b/src/pqnstack/app/core/config.py index 078dd9b6..a28b5baa 100644 --- a/src/pqnstack/app/core/config.py +++ b/src/pqnstack/app/core/config.py @@ -1,6 +1,6 @@ +import asyncio import logging from functools import lru_cache -import asyncio from pydantic import BaseModel from pydantic import Field @@ -74,7 +74,6 @@ def get_settings() -> Settings: class NodeState(BaseModel): - # FIXME: Make sure we are checking for the client_listening_for_follower_requests state everywhere. client_listening_for_follower_requests: bool = False @@ -82,9 +81,9 @@ class NodeState(BaseModel): leading: bool = False followers_address: str | None = None - # Follower's state following: bool = False + # Other node requested this node to follow it. following_requested: bool = False # User's response to the follow request. None if no response yet, True if accepted, False if rejected. following_requested_user_response: bool | None = None @@ -114,5 +113,5 @@ class NodeState(BaseModel): state = NodeState() -state_change_event = asyncio.Event() +ask_user_for_follow_event = asyncio.Event() user_replied_event = asyncio.Event() From 09a0616216250087576ccbcf72d2a00ddf849265 Mon Sep 17 00:00:00 2001 From: marcosf2 Date: Thu, 11 Sep 2025 13:03:00 -0500 Subject: [PATCH 04/11] Moved all coordination state into its own model and dep --- src/pqnstack/app/api/deps.py | 10 ++++ src/pqnstack/app/api/routes/coordination.py | 64 +++++++++++---------- src/pqnstack/app/core/config.py | 11 ++-- 3 files changed, 50 insertions(+), 35 deletions(-) diff --git a/src/pqnstack/app/api/deps.py b/src/pqnstack/app/api/deps.py index 610116ef..22e70555 100644 --- a/src/pqnstack/app/api/deps.py +++ b/src/pqnstack/app/api/deps.py @@ -4,6 +4,9 @@ import httpx from fastapi import Depends +from pqnstack.app.core.config import CoordinationState +from pqnstack.app.core.config import state + async def get_http_client() -> AsyncGenerator[httpx.AsyncClient, None]: async with httpx.AsyncClient(timeout=600_000) as client: @@ -11,3 +14,10 @@ async def get_http_client() -> AsyncGenerator[httpx.AsyncClient, None]: ClientDep = Annotated[httpx.AsyncClient, Depends(get_http_client)] + + +async def get_coordination_state() -> AsyncGenerator[CoordinationState, None]: + yield state.coordination_state + + +CoordinationStateDep = Annotated[CoordinationState, Depends(get_coordination_state)] diff --git a/src/pqnstack/app/api/routes/coordination.py b/src/pqnstack/app/api/routes/coordination.py index 93a533e5..b9096996 100644 --- a/src/pqnstack/app/api/routes/coordination.py +++ b/src/pqnstack/app/api/routes/coordination.py @@ -9,9 +9,9 @@ from fastapi import status from pqnstack.app.api.deps import ClientDep +from pqnstack.app.api.deps import CoordinationStateDep from pqnstack.app.core.config import ask_user_for_follow_event from pqnstack.app.core.config import settings -from pqnstack.app.core.config import state from pqnstack.app.core.config import user_replied_event logger = logging.getLogger(__name__) @@ -22,19 +22,19 @@ # TODO: Send a disconnection message if I was following someone. @router.post("/reset_coordination_state") -async def reset_coordination_state() -> None: +async def reset_coordination_state(coord: CoordinationStateDep) -> None: """Reset the coordination state of the node.""" - state.leading = False - state.followers_address = None - state.following = False - state.following_requested = False - state.following_requested_user_response = None - state.leaders_address = None - state.leaders_name = None + coord.leading = False + coord.followers_address = "" + coord.following = False + coord.following_requested = False + coord.following_requested_user_response = None + coord.leaders_address = "" + coord.leaders_name = "" @router.post("/collect_follower") -async def collect_follower(address: str, http_client: ClientDep) -> bool: +async def collect_follower(address: str, coord: CoordinationStateDep, http_client: ClientDep) -> bool: """ Endpoint called by a leader node (this one) to request a follower node (other node) to follow it. @@ -49,8 +49,8 @@ async def collect_follower(address: str, http_client: ClientDep) -> bool: raise HTTPException(status_code=ret.status_code, detail=ret.text) if ret.json() is True: - state.leading = True - state.followers_address = address + coord.leading = True + coord.followers_address = address logger.info("Successfully collected follower") return True if ret.json() is False: @@ -63,7 +63,7 @@ async def collect_follower(address: str, http_client: ClientDep) -> bool: @router.post("/follow_requested") -async def follow_requested(request: Request, leaders_name: str) -> bool: +async def follow_requested(request: Request, leaders_name: str, coord: CoordinationStateDep) -> bool: """ Endpoint is called by a leader node (other node) to request this node to follow it. @@ -78,34 +78,36 @@ async def follow_requested(request: Request, leaders_name: str) -> bool: leaders_address = request.client.host # Check if the client is ready to accept a follower request and that node is not already following someone. - if state.client_listening_for_follower_requests and not state.following: - state.following_requested = True + if coord.client_listening_for_follower_requests and not coord.following: + coord.following_requested = True + coord.leaders_name = leaders_name + coord.leaders_address = leaders_address # Trigger the state change to get the websocket to send question to user ask_user_for_follow_event.set() logger.debug("Asking user to accept follow request from %s (%s)", leaders_name, leaders_address) await user_replied_event.wait() # Wait for a state change event to see if user accepted user_replied_event.clear() # Reset the event for the next change - if state.following_requested_user_response: + if coord.following_requested_user_response: logger.debug("Follow request from %s accepted.", leaders_address) - state.following = True - state.leaders_name = leaders_name - state.leaders_address = leaders_address + coord.following = True + coord.leaders_name = leaders_name + coord.leaders_address = leaders_address ask_user_for_follow_event.set() return True logger.debug("Follow request from %s rejected.", leaders_address) # Clean up the state if user rejected - state.leaders_address = None - state.leaders_name = None - state.following_requested = False + coord.leaders_address = "" + coord.leaders_name = "" + coord.following_requested = False else: logger.info( "Request rejected because %s", ( "client is not listening for requests" - if state.client_listening_for_follower_requests + if coord.client_listening_for_follower_requests else "this node is already following someone" ), ) @@ -114,20 +116,20 @@ async def follow_requested(request: Request, leaders_name: str) -> bool: @router.websocket("/follow_requested_alerts") -async def follow_requested_alert(websocket: WebSocket) -> None: +async def follow_requested_alert(websocket: WebSocket, coord: CoordinationStateDep) -> None: """Websocket endpoint is used to alert the client when a follow request is received. It also handles the response from the client.""" await websocket.accept() logger.info("Client connected to websocket for multiplayer coordination.") - state.client_listening_for_follower_requests = True + coord.client_listening_for_follower_requests = True async def ask_user_for_follow_handler() -> None: """Task that waits for the ask_user_for_follow_event event and sends a message to the client if a follow request is detected.""" while True: await ask_user_for_follow_event.wait() # Wait for a state change event - if state.following_requested: + if coord.following_requested: logger.debug("Websocket detected a follow request, asking user for response.") await websocket.send_text( - f"Do you want to accept a connection from {state.leaders_name} ({state.leaders_address})?" + f"Do you want to accept a connection from {coord.leaders_name} ({coord.leaders_address})?" ) ask_user_for_follow_event.clear() # Reset the event for the next change @@ -136,13 +138,13 @@ async def client_message_handler() -> None: try: while True: response = await websocket.receive_text() - state.following_requested_user_response = response.lower() in ["true", "yes", "y"] - state.following_requested = False - logger.debug("Websocket received a response from user: %s", state.following_requested_user_response) + coord.following_requested_user_response = response.lower() in ["true", "yes", "y"] + coord.following_requested = False + logger.debug("Websocket received a response from user: %s", coord.following_requested_user_response) user_replied_event.set() except WebSocketDisconnect: logger.info("Client disconnected from websocket for multiplayer coordination.") - state.client_listening_for_follower_requests = False + coord.client_listening_for_follower_requests = False state_change_task = asyncio.create_task(ask_user_for_follow_handler()) client_message_task = asyncio.create_task(client_message_handler()) diff --git a/src/pqnstack/app/core/config.py b/src/pqnstack/app/core/config.py index a28b5baa..2e7a762d 100644 --- a/src/pqnstack/app/core/config.py +++ b/src/pqnstack/app/core/config.py @@ -73,13 +73,13 @@ def get_settings() -> Settings: settings = get_settings() -class NodeState(BaseModel): +class CoordinationState(BaseModel): # FIXME: Make sure we are checking for the client_listening_for_follower_requests state everywhere. client_listening_for_follower_requests: bool = False # Leader's state leading: bool = False - followers_address: str | None = None + followers_address: str = "" # Follower's state following: bool = False @@ -88,9 +88,12 @@ class NodeState(BaseModel): # User's response to the follow request. None if no response yet, True if accepted, False if rejected. following_requested_user_response: bool | None = None # The address of the leader this node is following. None if not following anyone. - leaders_address: str | None = None - leaders_name: str | None = None + leaders_address: str = "" + leaders_name: str = "" + +class NodeState(BaseModel): + coordination_state: CoordinationState = CoordinationState() chsh_request_basis: list[float] = [22.5, 67.5] # FIXME: Use enums for this qkd_basis_list: list[QKDEncodingBasis] = [ From 413f199319eb079402220f8e10c8faad4125cd8e Mon Sep 17 00:00:00 2001 From: marcosf2 Date: Wed, 17 Sep 2025 11:14:12 -0500 Subject: [PATCH 05/11] Implemented feedback to coordination.py --- src/pqnstack/app/api/routes/coordination.py | 92 ++++++++++++--------- 1 file changed, 53 insertions(+), 39 deletions(-) diff --git a/src/pqnstack/app/api/routes/coordination.py b/src/pqnstack/app/api/routes/coordination.py index b9096996..7479531f 100644 --- a/src/pqnstack/app/api/routes/coordination.py +++ b/src/pqnstack/app/api/routes/coordination.py @@ -7,6 +7,7 @@ from fastapi import WebSocket from fastapi import WebSocketDisconnect from fastapi import status +from pydantic import BaseModel from pqnstack.app.api.deps import ClientDep from pqnstack.app.api.deps import CoordinationStateDep @@ -17,12 +18,24 @@ logger = logging.getLogger(__name__) +class FollowRequestResponse(BaseModel): + accepted: bool + + +class CollectFollowerResponse(BaseModel): + success: bool + + +class ResetCoordinationStateResponse(BaseModel): + message: str = "Coordination state reset successfully" + + router = APIRouter(prefix="/coordination", tags=["coordination"]) # TODO: Send a disconnection message if I was following someone. -@router.post("/reset_coordination_state") -async def reset_coordination_state(coord: CoordinationStateDep) -> None: +@router.post("/reset_coordination_state", response_model=ResetCoordinationStateResponse) +async def reset_coordination_state(coord: CoordinationStateDep) -> ResetCoordinationStateResponse: """Reset the coordination state of the node.""" coord.leading = False coord.followers_address = "" @@ -31,16 +44,17 @@ async def reset_coordination_state(coord: CoordinationStateDep) -> None: coord.following_requested_user_response = None coord.leaders_address = "" coord.leaders_name = "" + return ResetCoordinationStateResponse() -@router.post("/collect_follower") -async def collect_follower(address: str, coord: CoordinationStateDep, http_client: ClientDep) -> bool: +@router.post("/collect_follower", response_model=CollectFollowerResponse) +async def collect_follower(address: str, coord: CoordinationStateDep, http_client: ClientDep) -> CollectFollowerResponse: """ Endpoint called by a leader node (this one) to request a follower node (other node) to follow it. Returns ------- - True if the follower accepted the request, False otherwise. + CollectFollowerResponse indicating if the follower accepted the request. """ logger.info("Requesting client at %s to follow", address) @@ -48,28 +62,29 @@ async def collect_follower(address: str, coord: CoordinationStateDep, http_clien if ret.status_code != status.HTTP_200_OK: raise HTTPException(status_code=ret.status_code, detail=ret.text) - if ret.json() is True: + response_data = ret.json() + if response_data.get("accepted") is True: coord.leading = True coord.followers_address = address logger.info("Successfully collected follower") - return True - if ret.json() is False: + return CollectFollowerResponse(success=True) + if response_data.get("accepted") is False: logger.info("Follower rejected follow request") - return False + return CollectFollowerResponse(success=False) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Could not collect follower for unknown reasons" ) -@router.post("/follow_requested") -async def follow_requested(request: Request, leaders_name: str, coord: CoordinationStateDep) -> bool: +@router.post("/follow_requested", response_model=FollowRequestResponse) +async def follow_requested(request: Request, leaders_name: str, coord: CoordinationStateDep) -> FollowRequestResponse: """ Endpoint is called by a leader node (other node) to request this node to follow it. Returns ------- - True if the follow request is accepted, False otherwise. + FollowRequestResponse indicating if the follow request is accepted. """ logger.debug("Requesting client at %s to follow", leaders_name) @@ -78,41 +93,40 @@ async def follow_requested(request: Request, leaders_name: str, coord: Coordinat leaders_address = request.client.host # Check if the client is ready to accept a follower request and that node is not already following someone. - if coord.client_listening_for_follower_requests and not coord.following: - coord.following_requested = True - coord.leaders_name = leaders_name - coord.leaders_address = leaders_address - # Trigger the state change to get the websocket to send question to user - ask_user_for_follow_event.set() - - logger.debug("Asking user to accept follow request from %s (%s)", leaders_name, leaders_address) - await user_replied_event.wait() # Wait for a state change event to see if user accepted - user_replied_event.clear() # Reset the event for the next change - if coord.following_requested_user_response: - logger.debug("Follow request from %s accepted.", leaders_address) - coord.following = True - coord.leaders_name = leaders_name - coord.leaders_address = leaders_address - ask_user_for_follow_event.set() - return True - - logger.debug("Follow request from %s rejected.", leaders_address) - # Clean up the state if user rejected - coord.leaders_address = "" - coord.leaders_name = "" - coord.following_requested = False - - else: + if not coord.client_listening_for_follower_requests or coord.following: logger.info( "Request rejected because %s", ( "client is not listening for requests" - if coord.client_listening_for_follower_requests + if not coord.client_listening_for_follower_requests else "this node is already following someone" ), ) + return FollowRequestResponse(accepted=False) + + coord.following_requested = True + coord.leaders_name = leaders_name + coord.leaders_address = leaders_address + # Trigger the state change to get the websocket to send question to user + ask_user_for_follow_event.set() + + logger.debug("Asking user to accept follow request from %s (%s)", leaders_name, leaders_address) + await user_replied_event.wait() # Wait for a state change event to see if user accepted + user_replied_event.clear() # Reset the event for the next change + if coord.following_requested_user_response: + logger.debug("Follow request from %s accepted.", leaders_address) + coord.following = True + coord.leaders_name = leaders_name + coord.leaders_address = leaders_address + ask_user_for_follow_event.set() + return FollowRequestResponse(accepted=True) - return False + logger.debug("Follow request from %s rejected.", leaders_address) + # Clean up the state if user rejected + coord.leaders_address = "" + coord.leaders_name = "" + coord.following_requested = False + return FollowRequestResponse(accepted=False) @router.websocket("/follow_requested_alerts") From 81315258c43b8b958e4c5450acfed29379042c79 Mon Sep 17 00:00:00 2001 From: marcosf2 Date: Wed, 17 Sep 2025 11:34:33 -0500 Subject: [PATCH 06/11] Removed CoordinationState object --- src/pqnstack/app/api/deps.py | 10 +--- src/pqnstack/app/api/routes/coordination.py | 66 ++++++++++----------- src/pqnstack/app/core/config.py | 14 +++-- 3 files changed, 46 insertions(+), 44 deletions(-) diff --git a/src/pqnstack/app/api/deps.py b/src/pqnstack/app/api/deps.py index 22e70555..706a0f96 100644 --- a/src/pqnstack/app/api/deps.py +++ b/src/pqnstack/app/api/deps.py @@ -4,8 +4,8 @@ import httpx from fastapi import Depends -from pqnstack.app.core.config import CoordinationState -from pqnstack.app.core.config import state +from pqnstack.app.core.config import NodeState +from pqnstack.app.core.config import get_state async def get_http_client() -> AsyncGenerator[httpx.AsyncClient, None]: @@ -16,8 +16,4 @@ async def get_http_client() -> AsyncGenerator[httpx.AsyncClient, None]: ClientDep = Annotated[httpx.AsyncClient, Depends(get_http_client)] -async def get_coordination_state() -> AsyncGenerator[CoordinationState, None]: - yield state.coordination_state - - -CoordinationStateDep = Annotated[CoordinationState, Depends(get_coordination_state)] +StateDep = Annotated[NodeState, Depends(get_state)] diff --git a/src/pqnstack/app/api/routes/coordination.py b/src/pqnstack/app/api/routes/coordination.py index 7479531f..86b61d25 100644 --- a/src/pqnstack/app/api/routes/coordination.py +++ b/src/pqnstack/app/api/routes/coordination.py @@ -10,7 +10,7 @@ from pydantic import BaseModel from pqnstack.app.api.deps import ClientDep -from pqnstack.app.api.deps import CoordinationStateDep +from pqnstack.app.api.deps import StateDep from pqnstack.app.core.config import ask_user_for_follow_event from pqnstack.app.core.config import settings from pqnstack.app.core.config import user_replied_event @@ -35,20 +35,20 @@ class ResetCoordinationStateResponse(BaseModel): # TODO: Send a disconnection message if I was following someone. @router.post("/reset_coordination_state", response_model=ResetCoordinationStateResponse) -async def reset_coordination_state(coord: CoordinationStateDep) -> ResetCoordinationStateResponse: +async def reset_coordination_state(state: StateDep) -> ResetCoordinationStateResponse: """Reset the coordination state of the node.""" - coord.leading = False - coord.followers_address = "" - coord.following = False - coord.following_requested = False - coord.following_requested_user_response = None - coord.leaders_address = "" - coord.leaders_name = "" + state.leading = False + state.followers_address = "" + state.following = False + state.following_requested = False + state.following_requested_user_response = None + state.leaders_address = "" + state.leaders_name = "" return ResetCoordinationStateResponse() @router.post("/collect_follower", response_model=CollectFollowerResponse) -async def collect_follower(address: str, coord: CoordinationStateDep, http_client: ClientDep) -> CollectFollowerResponse: +async def collect_follower(address: str, state: StateDep, http_client: ClientDep) -> CollectFollowerResponse: """ Endpoint called by a leader node (this one) to request a follower node (other node) to follow it. @@ -64,8 +64,8 @@ async def collect_follower(address: str, coord: CoordinationStateDep, http_clien response_data = ret.json() if response_data.get("accepted") is True: - coord.leading = True - coord.followers_address = address + state.leading = True + state.followers_address = address logger.info("Successfully collected follower") return CollectFollowerResponse(success=True) if response_data.get("accepted") is False: @@ -78,7 +78,7 @@ async def collect_follower(address: str, coord: CoordinationStateDep, http_clien @router.post("/follow_requested", response_model=FollowRequestResponse) -async def follow_requested(request: Request, leaders_name: str, coord: CoordinationStateDep) -> FollowRequestResponse: +async def follow_requested(request: Request, leaders_name: str, state: StateDep) -> FollowRequestResponse: """ Endpoint is called by a leader node (other node) to request this node to follow it. @@ -93,57 +93,57 @@ async def follow_requested(request: Request, leaders_name: str, coord: Coordinat leaders_address = request.client.host # Check if the client is ready to accept a follower request and that node is not already following someone. - if not coord.client_listening_for_follower_requests or coord.following: + if not state.client_listening_for_follower_requests or state.following: logger.info( "Request rejected because %s", ( "client is not listening for requests" - if not coord.client_listening_for_follower_requests + if not state.client_listening_for_follower_requests else "this node is already following someone" ), ) return FollowRequestResponse(accepted=False) - coord.following_requested = True - coord.leaders_name = leaders_name - coord.leaders_address = leaders_address + state.following_requested = True + state.leaders_name = leaders_name + state.leaders_address = leaders_address # Trigger the state change to get the websocket to send question to user ask_user_for_follow_event.set() logger.debug("Asking user to accept follow request from %s (%s)", leaders_name, leaders_address) await user_replied_event.wait() # Wait for a state change event to see if user accepted user_replied_event.clear() # Reset the event for the next change - if coord.following_requested_user_response: + if state.following_requested_user_response: logger.debug("Follow request from %s accepted.", leaders_address) - coord.following = True - coord.leaders_name = leaders_name - coord.leaders_address = leaders_address + state.following = True + state.leaders_name = leaders_name + state.leaders_address = leaders_address ask_user_for_follow_event.set() return FollowRequestResponse(accepted=True) logger.debug("Follow request from %s rejected.", leaders_address) # Clean up the state if user rejected - coord.leaders_address = "" - coord.leaders_name = "" - coord.following_requested = False + state.leaders_address = "" + state.leaders_name = "" + state.following_requested = False return FollowRequestResponse(accepted=False) @router.websocket("/follow_requested_alerts") -async def follow_requested_alert(websocket: WebSocket, coord: CoordinationStateDep) -> None: +async def follow_requested_alert(websocket: WebSocket, state: StateDep) -> None: """Websocket endpoint is used to alert the client when a follow request is received. It also handles the response from the client.""" await websocket.accept() logger.info("Client connected to websocket for multiplayer coordination.") - coord.client_listening_for_follower_requests = True + state.client_listening_for_follower_requests = True async def ask_user_for_follow_handler() -> None: """Task that waits for the ask_user_for_follow_event event and sends a message to the client if a follow request is detected.""" while True: await ask_user_for_follow_event.wait() # Wait for a state change event - if coord.following_requested: + if state.following_requested: logger.debug("Websocket detected a follow request, asking user for response.") await websocket.send_text( - f"Do you want to accept a connection from {coord.leaders_name} ({coord.leaders_address})?" + f"Do you want to accept a connection from {state.leaders_name} ({state.leaders_address})?" ) ask_user_for_follow_event.clear() # Reset the event for the next change @@ -152,13 +152,13 @@ async def client_message_handler() -> None: try: while True: response = await websocket.receive_text() - coord.following_requested_user_response = response.lower() in ["true", "yes", "y"] - coord.following_requested = False - logger.debug("Websocket received a response from user: %s", coord.following_requested_user_response) + state.following_requested_user_response = response.lower() in ["true", "yes", "y"] + state.following_requested = False + logger.debug("Websocket received a response from user: %s", state.following_requested_user_response) user_replied_event.set() except WebSocketDisconnect: logger.info("Client disconnected from websocket for multiplayer coordination.") - coord.client_listening_for_follower_requests = False + state.client_listening_for_follower_requests = False state_change_task = asyncio.create_task(ask_user_for_follow_handler()) client_message_task = asyncio.create_task(client_message_handler()) diff --git a/src/pqnstack/app/core/config.py b/src/pqnstack/app/core/config.py index 2e7a762d..0b072092 100644 --- a/src/pqnstack/app/core/config.py +++ b/src/pqnstack/app/core/config.py @@ -73,7 +73,9 @@ def get_settings() -> Settings: settings = get_settings() -class CoordinationState(BaseModel): + +class NodeState(BaseModel): + # Coordination state # FIXME: Make sure we are checking for the client_listening_for_follower_requests state everywhere. client_listening_for_follower_requests: bool = False @@ -91,10 +93,10 @@ class CoordinationState(BaseModel): leaders_address: str = "" leaders_name: str = "" - -class NodeState(BaseModel): - coordination_state: CoordinationState = CoordinationState() + # CHSH state chsh_request_basis: list[float] = [22.5, 67.5] + + # QKD state # FIXME: Use enums for this qkd_basis_list: list[QKDEncodingBasis] = [ QKDEncodingBasis.DA, @@ -118,3 +120,7 @@ class NodeState(BaseModel): state = NodeState() ask_user_for_follow_event = asyncio.Event() user_replied_event = asyncio.Event() + + +def get_state() -> NodeState: + return state From 08cf41cf69474f1c8cf2f560e7ca5a8648b08d99 Mon Sep 17 00:00:00 2001 From: marcosf2 Date: Wed, 17 Sep 2025 11:41:05 -0500 Subject: [PATCH 07/11] State is now used through a dep --- src/pqnstack/app/api/routes/chsh.py | 4 ++-- src/pqnstack/app/api/routes/debug.py | 4 ++-- src/pqnstack/app/api/routes/qkd.py | 8 +++++--- src/pqnstack/app/core/config.py | 1 - 4 files changed, 9 insertions(+), 8 deletions(-) diff --git a/src/pqnstack/app/api/routes/chsh.py b/src/pqnstack/app/api/routes/chsh.py index 375e84bb..3eeeb490 100644 --- a/src/pqnstack/app/api/routes/chsh.py +++ b/src/pqnstack/app/api/routes/chsh.py @@ -6,8 +6,8 @@ from fastapi import status from pqnstack.app.api.deps import ClientDep +from pqnstack.app.api.deps import StateDep from pqnstack.app.core.config import settings -from pqnstack.app.core.config import state from pqnstack.app.core.models import calculate_chsh_expectation_error from pqnstack.network.client import Client @@ -126,7 +126,7 @@ async def chsh( @router.post("/request-angle-by-basis") -async def request_angle_by_basis(index: int, *, perp: bool = False) -> bool: +async def request_angle_by_basis(index: int, state: StateDep, *, perp: bool = False) -> bool: client = Client(host=settings.router_address, port=settings.router_port, timeout=600_000) hwp = client.get_device(settings.chsh_settings.request_hwp[0], settings.chsh_settings.request_hwp[1]) if hwp is None: diff --git a/src/pqnstack/app/api/routes/debug.py b/src/pqnstack/app/api/routes/debug.py index 1e2af119..12768c32 100644 --- a/src/pqnstack/app/api/routes/debug.py +++ b/src/pqnstack/app/api/routes/debug.py @@ -1,11 +1,11 @@ from fastapi import APIRouter +from pqnstack.app.api.deps import StateDep from pqnstack.app.core.config import NodeState -from pqnstack.app.core.config import state router = APIRouter(prefix="/debug", tags=["debug"]) @router.get("/state") -async def get_state() -> NodeState: +async def get_state(state: StateDep) -> NodeState: return state diff --git a/src/pqnstack/app/api/routes/qkd.py b/src/pqnstack/app/api/routes/qkd.py index f676b2ce..914f40d5 100644 --- a/src/pqnstack/app/api/routes/qkd.py +++ b/src/pqnstack/app/api/routes/qkd.py @@ -7,8 +7,8 @@ from fastapi import status from pqnstack.app.api.deps import ClientDep +from pqnstack.app.api.deps import StateDep from pqnstack.app.core.config import settings -from pqnstack.app.core.config import state from pqnstack.constants import BasisBool from pqnstack.constants import QKDEncodingBasis from pqnstack.network.client import Client @@ -21,6 +21,7 @@ async def _qkd( follower_node_address: str, http_client: ClientDep, + state: StateDep, timetagger_address: str | None = None, ) -> list[int]: logger.debug("Starting QKD") @@ -106,6 +107,7 @@ def get_outcome(state: int, basis: int, choice: int, counts: int) -> int: async def qkd( follower_node_address: str, http_client: ClientDep, + state: StateDep, timetagger_address: str | None = None, ) -> list[int]: """Perform a QKD protocol with the given follower node.""" @@ -120,7 +122,7 @@ async def qkd( @router.post("/single_bit") -async def request_qkd_single_pass() -> bool: +async def request_qkd_single_pass(state: StateDep) -> bool: client = Client(host=settings.router_address, port=settings.router_port, timeout=600_000) hwp = client.get_device(settings.qkd_settings.request_hwp[0], settings.qkd_settings.request_hwp[1]) @@ -149,7 +151,7 @@ async def request_qkd_single_pass() -> bool: @router.post("/request_basis_list") -def request_qkd_basis_list(leader_basis_list: list[str]) -> list[str]: +def request_qkd_basis_list(leader_basis_list: list[str], state: StateDep) -> list[str]: """Return the list of basis angles for QKD.""" # Check that lengths match if len(leader_basis_list) != len(state.qkd_request_basis_list): diff --git a/src/pqnstack/app/core/config.py b/src/pqnstack/app/core/config.py index 0b072092..b5aea03d 100644 --- a/src/pqnstack/app/core/config.py +++ b/src/pqnstack/app/core/config.py @@ -73,7 +73,6 @@ def get_settings() -> Settings: settings = get_settings() - class NodeState(BaseModel): # Coordination state # FIXME: Make sure we are checking for the client_listening_for_follower_requests state everywhere. From b4bfac8318f16af5ad3c8950d17e0a80a68bb3a7 Mon Sep 17 00:00:00 2001 From: marcosf2 Date: Wed, 17 Sep 2025 13:08:57 -0500 Subject: [PATCH 08/11] mypy and ruff fixes --- src/pqnstack/app/api/routes/coordination.py | 6 +++--- src/pqnstack/app/api/routes/qkd.py | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/pqnstack/app/api/routes/coordination.py b/src/pqnstack/app/api/routes/coordination.py index 86b61d25..1e618873 100644 --- a/src/pqnstack/app/api/routes/coordination.py +++ b/src/pqnstack/app/api/routes/coordination.py @@ -34,7 +34,7 @@ class ResetCoordinationStateResponse(BaseModel): # TODO: Send a disconnection message if I was following someone. -@router.post("/reset_coordination_state", response_model=ResetCoordinationStateResponse) +@router.post("/reset_coordination_state") async def reset_coordination_state(state: StateDep) -> ResetCoordinationStateResponse: """Reset the coordination state of the node.""" state.leading = False @@ -47,7 +47,7 @@ async def reset_coordination_state(state: StateDep) -> ResetCoordinationStateRes return ResetCoordinationStateResponse() -@router.post("/collect_follower", response_model=CollectFollowerResponse) +@router.post("/collect_follower") async def collect_follower(address: str, state: StateDep, http_client: ClientDep) -> CollectFollowerResponse: """ Endpoint called by a leader node (this one) to request a follower node (other node) to follow it. @@ -77,7 +77,7 @@ async def collect_follower(address: str, state: StateDep, http_client: ClientDep ) -@router.post("/follow_requested", response_model=FollowRequestResponse) +@router.post("/follow_requested") async def follow_requested(request: Request, leaders_name: str, state: StateDep) -> FollowRequestResponse: """ Endpoint is called by a leader node (other node) to request this node to follow it. diff --git a/src/pqnstack/app/api/routes/qkd.py b/src/pqnstack/app/api/routes/qkd.py index 914f40d5..35a97695 100644 --- a/src/pqnstack/app/api/routes/qkd.py +++ b/src/pqnstack/app/api/routes/qkd.py @@ -118,7 +118,7 @@ async def qkd( detail="QKD basis list is empty", ) - return await _qkd(follower_node_address, http_client, timetagger_address) + return await _qkd(follower_node_address, http_client, state, timetagger_address) @router.post("/single_bit") From 64d888461fbb84dff9b144cfb876a1e41972483d Mon Sep 17 00:00:00 2001 From: marcosf2 Date: Wed, 24 Sep 2025 16:50:47 -0500 Subject: [PATCH 09/11] Minor bugfix --- src/pqnstack/app/api/routes/coordination.py | 27 ++++++++++++++------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/src/pqnstack/app/api/routes/coordination.py b/src/pqnstack/app/api/routes/coordination.py index 1e618873..ded798b9 100644 --- a/src/pqnstack/app/api/routes/coordination.py +++ b/src/pqnstack/app/api/routes/coordination.py @@ -93,7 +93,7 @@ async def follow_requested(request: Request, leaders_name: str, state: StateDep) leaders_address = request.client.host # Check if the client is ready to accept a follower request and that node is not already following someone. - if not state.client_listening_for_follower_requests or state.following: + if not state.client_listening_for_follower_requests and state.following: logger.info( "Request rejected because %s", ( @@ -118,7 +118,6 @@ async def follow_requested(request: Request, leaders_name: str, state: StateDep) state.following = True state.leaders_name = leaders_name state.leaders_address = leaders_address - ask_user_for_follow_event.set() return FollowRequestResponse(accepted=True) logger.debug("Follow request from %s rejected.", leaders_address) @@ -139,13 +138,23 @@ async def follow_requested_alert(websocket: WebSocket, state: StateDep) -> None: async def ask_user_for_follow_handler() -> None: """Task that waits for the ask_user_for_follow_event event and sends a message to the client if a follow request is detected.""" while True: - await ask_user_for_follow_event.wait() # Wait for a state change event - if state.following_requested: - logger.debug("Websocket detected a follow request, asking user for response.") - await websocket.send_text( - f"Do you want to accept a connection from {state.leaders_name} ({state.leaders_address})?" - ) - ask_user_for_follow_event.clear() # Reset the event for the next change + try: + await ask_user_for_follow_event.wait() # Wait for a state change event + if state.following_requested: + logger.debug("Websocket detected a follow request, asking user for response.") + try: + await websocket.send_text( + f"Do you want to accept a connection from {state.leaders_name} ({state.leaders_address})?" + ) + except RuntimeError as e: + if "websocket.close" in str(e) or "response already completed" in str(e): + logger.debug("WebSocket already closed, cannot send message") + break + raise + ask_user_for_follow_event.clear() # Reset the event for the next change + except Exception as e: + logger.error("Error in ask_user_for_follow_handler: %s", e) + break async def client_message_handler() -> None: """Task that waits for a message from the client and handles the response. It also handles the case where the client disconnects.""" From 26d73c4abb186a6cb7148de072148245bdcc5700 Mon Sep 17 00:00:00 2001 From: marcosf2 Date: Wed, 15 Oct 2025 18:31:35 -0500 Subject: [PATCH 10/11] ruff mypy checks --- src/pqnstack/app/api/main.py | 6 +++--- src/pqnstack/app/api/routes/coordination.py | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/pqnstack/app/api/main.py b/src/pqnstack/app/api/main.py index fe65fa93..175e0899 100644 --- a/src/pqnstack/app/api/main.py +++ b/src/pqnstack/app/api/main.py @@ -1,12 +1,12 @@ from fastapi import APIRouter from pqnstack.app.api.routes import chsh +from pqnstack.app.api.routes import coordination +from pqnstack.app.api.routes import debug from pqnstack.app.api.routes import qkd from pqnstack.app.api.routes import rng from pqnstack.app.api.routes import serial from pqnstack.app.api.routes import timetagger -from pqnstack.app.api.routes import coordination -from pqnstack.app.api.routes import debug api_router = APIRouter() api_router.include_router(chsh.router) @@ -15,4 +15,4 @@ api_router.include_router(rng.router) api_router.include_router(serial.router) api_router.include_router(coordination.router) -api_router.include_router(debug.router) \ No newline at end of file +api_router.include_router(debug.router) diff --git a/src/pqnstack/app/api/routes/coordination.py b/src/pqnstack/app/api/routes/coordination.py index ded798b9..fb7ac6a1 100644 --- a/src/pqnstack/app/api/routes/coordination.py +++ b/src/pqnstack/app/api/routes/coordination.py @@ -152,8 +152,8 @@ async def ask_user_for_follow_handler() -> None: break raise ask_user_for_follow_event.clear() # Reset the event for the next change - except Exception as e: - logger.error("Error in ask_user_for_follow_handler: %s", e) + except Exception: + logger.exception("Error in ask_user_for_follow_handler") break async def client_message_handler() -> None: From cd72572d8592c41afe45fe85e50740c1e665fd9b Mon Sep 17 00:00:00 2001 From: marcosf2 Date: Fri, 17 Oct 2025 16:36:23 -0500 Subject: [PATCH 11/11] Checking for websocket connection before sending message --- src/pqnstack/app/api/routes/coordination.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/src/pqnstack/app/api/routes/coordination.py b/src/pqnstack/app/api/routes/coordination.py index fb7ac6a1..35d77850 100644 --- a/src/pqnstack/app/api/routes/coordination.py +++ b/src/pqnstack/app/api/routes/coordination.py @@ -93,7 +93,7 @@ async def follow_requested(request: Request, leaders_name: str, state: StateDep) leaders_address = request.client.host # Check if the client is ready to accept a follower request and that node is not already following someone. - if not state.client_listening_for_follower_requests and state.following: + if not state.client_listening_for_follower_requests or state.following: logger.info( "Request rejected because %s", ( @@ -142,15 +142,13 @@ async def ask_user_for_follow_handler() -> None: await ask_user_for_follow_event.wait() # Wait for a state change event if state.following_requested: logger.debug("Websocket detected a follow request, asking user for response.") - try: + if websocket.client_state.name == "CONNECTED": await websocket.send_text( f"Do you want to accept a connection from {state.leaders_name} ({state.leaders_address})?" ) - except RuntimeError as e: - if "websocket.close" in str(e) or "response already completed" in str(e): - logger.debug("WebSocket already closed, cannot send message") - break - raise + else: + logger.debug("WebSocket not connected, cannot send message") + break ask_user_for_follow_event.clear() # Reset the event for the next change except Exception: logger.exception("Error in ask_user_for_follow_handler")