Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions docs/routing.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@ All network elements implement a `dispatch` function that depends on the content
ensures that routing information is adequately interpreted, and that the appropriate protocol is performed by the
desired node or group of nodes. The sequence of events in this case is as follows:

1. **Node A** sends a packet to the router
1. **InstrumentProvider A** sends a packet to the router
2. The router determines the validity of the sender and the receiver
3. The router extracts the signature of the packet, and separates the implementation into data, control and routing
backplane functions
4. The router sends the packet to **Node B**
5. **Node B** executes code implementing classical or quantum functions
6. **Node B** prepares a response packet with `(source, destination)` being reversed from the original request
4. The router sends the packet to **InstrumentProvider B**
5. **InstrumentProvider B** executes code implementing classical or quantum functions
6. **InstrumentProvider B** prepares a response packet with `(source, destination)` being reversed from the original request
7. The router proceeds repeats steps 2-4
8. If the packet `hops` value is zero, no further routing happens

Expand Down
58 changes: 28 additions & 30 deletions src/pqnstack/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import typer

from pqnstack.base.errors import InvalidNetworkConfigurationError
from pqnstack.network.node import Node
from pqnstack.network.instrument_provider import InstrumentProvider
from pqnstack.network.router import Router

# TODO: check if this way of handling logging from a command line script is ok.
Expand Down Expand Up @@ -40,51 +40,49 @@ def _verify_instruments_config(instruments: list[dict[str, str]]) -> dict[str, d
return ins


def _load_and_parse_node_config(
def _load_and_parse_provider_config(
config_path: Path | str, kwargs: dict[str, str | int], instruments: dict[str, dict[str, str]]
) -> tuple[dict[str, str | int], dict[str, dict[str, str]]]:
path = Path(config_path)
with path.open("rb") as f:
config = tomllib.load(f)

if "node" not in config:
msg = (
f"Config file {config_path} does not contain a node section. Add node configuration under '[node]' section."
)
if "provider" not in config:
msg = f"Config file {config_path} does not contain a provider section. Add provider configuration under '[provider]' section."
raise InvalidNetworkConfigurationError(msg)

node = config["node"]
if "name" in node:
kwargs["name"] = str(node["name"])
if "router_name" in node:
kwargs["router_name"] = str(node["router_name"])
if "host" in node:
kwargs["host"] = str(node["host"])
if "port" in node:
kwargs["port"] = int(node["port"])
if "beat_period" in node:
kwargs["beat_period"] = int(node["beat_period"])

if "instruments" in node:
instruments = _verify_instruments_config(node["instruments"])
provider = config["provider"]
if "name" in provider:
kwargs["name"] = str(provider["name"])
if "router_name" in provider:
kwargs["router_name"] = str(provider["router_name"])
if "host" in provider:
kwargs["host"] = str(provider["host"])
if "port" in provider:
kwargs["port"] = int(provider["port"])
if "beat_period" in provider:
kwargs["beat_period"] = int(provider["beat_period"])

if "instruments" in provider:
instruments = _verify_instruments_config(provider["instruments"])

return kwargs, instruments


@app.command()
def start_node( # noqa: PLR0913
name: Annotated[str | None, typer.Option(help="Name of the Node.")] = None,
def start_provider( # noqa: PLR0913
name: Annotated[str | None, typer.Option(help="Name of the InstrumentProvider.")] = None,
router_name: Annotated[
str | None, typer.Option(help="Name of the router this node will talk to (default: 'router1').")
str | None, typer.Option(help="Name of the router this provider will talk to (default: 'router1').")
] = None,
host: Annotated[
str | None,
typer.Option(
help="Host address (IP) of the node (default: 'localhost'). Usually the IP address of the Router this node will talk to."
help="Host address (IP) of the provider (default: 'localhost'). Usually the IP address of the Router this provider will talk to."
),
] = None,
port: Annotated[
int | None, typer.Option(help="Port of the node (default: 5555). Has to be the same port as the Router.")
int | None, typer.Option(help="Port of the provider (default: 5555). Has to be the same port as the Router.")
] = None,
beat_period: Annotated[int | None, typer.Option(help="Heartbeat period in milliseconds (default: 1000)")] = None,
instruments: Annotated[
Expand All @@ -98,15 +96,15 @@ def start_node( # noqa: PLR0913
] = None,
) -> None:
"""
Start a PQN Node.
Start a PQN InstrumentProvider.

Can be configured by passing arguments directly into the command line but it is recommended to use a config file if instruments will be added.
"""
kwargs: dict[str, str | int] = {}
ins: dict[str, dict[str, str]] = {}

if config:
kwargs, ins = _load_and_parse_node_config(config, kwargs, ins)
kwargs, ins = _load_and_parse_provider_config(config, kwargs, ins)

if name:
kwargs["name"] = name
Expand All @@ -123,11 +121,11 @@ def start_node( # noqa: PLR0913
ins = {**ins, **json.loads(instruments)}

if "name" not in kwargs:
msg = "Node name is required"
msg = "InstrumentProvider name is required"
raise InvalidNetworkConfigurationError(msg)

node = Node(**kwargs, **ins) # type: ignore[arg-type]
node.start()
provider = InstrumentProvider(**kwargs, **ins) # type: ignore[arg-type]
provider.start()


def _load_and_parse_router_config(config_path: Path | str, kwargs: dict[str, str | int]) -> dict[str, str | int]:
Expand Down
26 changes: 13 additions & 13 deletions src/pqnstack/network/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ class InstrumentClientInit(NamedTuple):
router_name: str
timeout: int
instrument_name: str
node_name: str
provider_name: str


class InstrumentClient(ClientBase):
Expand All @@ -158,26 +158,26 @@ def __init__(self, init_args: InstrumentClientInit) -> None:
)

self.instrument_name = init_args.instrument_name
self.node_name = init_args.node_name
self.provider_name = init_args.provider_name

def trigger_operation(self, operation: str, *args: Any, **kwargs: Any) -> Any:
packet = self.create_control_packet(
self.node_name, self.instrument_name + ":OPERATION:" + operation, (args, kwargs)
self.provider_name, self.instrument_name + ":OPERATION:" + operation, (args, kwargs)
)
response = self.ask(packet)

return response.payload

def trigger_parameter(self, parameter: str, *args: Any, **kwargs: Any) -> Any:
packet = self.create_control_packet(
self.node_name, self.instrument_name + ":PARAMETER:" + parameter, (args, kwargs)
self.provider_name, self.instrument_name + ":PARAMETER:" + parameter, (args, kwargs)
)

response = self.ask(packet)
return response.payload

def get_info(self) -> DeviceInfo:
packet = self.create_control_packet(self.node_name, self.instrument_name + ":INFO:", ((), {}))
packet = self.create_control_packet(self.provider_name, self.instrument_name + ":INFO:", ((), {}))

response = self.ask(packet)
if not isinstance(response.payload, DeviceInfo):
Expand All @@ -194,7 +194,7 @@ class ProxyInstrumentInit(NamedTuple):
router_name: str
instrument_name: str
timeout: int
node_name: str
provider_name: str
desc: str
address: str
parameters: set[str]
Expand All @@ -219,7 +219,7 @@ def __init__(self, init_args: ProxyInstrumentInit) -> None:
self.parameters = init_args.parameters
self.operations = init_args.operations

self.node_name = init_args.node_name
self.provider_name = init_args.provider_name
self.router_name = init_args.router_name

# The client's name is the instrument name with "_client" appended and a random 6 character string appended.
Expand All @@ -236,7 +236,7 @@ def __init__(self, init_args: ProxyInstrumentInit) -> None:
router_name=self.router_name,
timeout=self.timeout,
instrument_name=self.name,
node_name=self.node_name,
provider_name=self.provider_name,
)
self.client = InstrumentClient(instrument_client_init)

Expand Down Expand Up @@ -278,8 +278,8 @@ def ping(self, destination: str) -> Packet | None:
)
return self.ask(ping_packet)

def get_available_devices(self, node_name: str) -> dict[str, str]:
packet = self.create_data_packet(node_name, "GET_DEVICES", None)
def get_available_devices(self, provider_name: str) -> dict[str, str]:
packet = self.create_data_packet(provider_name, "GET_DEVICES", None)
response = self.ask(packet)

if not isinstance(response.payload, dict):
Expand All @@ -288,8 +288,8 @@ def get_available_devices(self, node_name: str) -> dict[str, str]:

return response.payload

def get_device(self, node_name: str, device_name: str) -> DeviceDriver:
packet = self.create_data_packet(node_name, "GET_DEVICE_STRUCTURE", device_name)
def get_device(self, provider_name: str, device_name: str) -> DeviceDriver:
packet = self.create_data_packet(provider_name, "GET_DEVICE_STRUCTURE", device_name)

response = self.ask(packet)

Expand All @@ -309,7 +309,7 @@ def get_device(self, node_name: str, device_name: str) -> DeviceDriver:
router_name=self.router_name,
timeout=self.timeout,
instrument_name=response.payload["name"],
node_name=node_name,
provider_name=provider_name,
parameters=set(response.payload["parameters"]),
operations=response.payload["operations"],
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
# University of Illinois Urbana-Champaign
# Public Quantum Network
#
# NCSA/Illinois Computes
import datetime
import importlib
import logging
Expand All @@ -21,7 +17,7 @@
logger = logging.getLogger(__name__)


class Node:
class InstrumentProvider:
def __init__(
self,
name: str,
Expand All @@ -32,20 +28,20 @@ def __init__(
**instruments: dict[str, Any],
) -> None:
"""
Node class for PQN.
InstrumentProvider class for PQN.

A Node is the class that talks with real hardware and performs experiments. It talks to a
A InstrumentProvider is the class that talks with real hardware and performs experiments. It talks to a
single `Router` instance through zqm and awaits for instructions from it. Every `beat_interval` milliseconds,
sends a registration packet to the router.
This is done so if the router goes offline, the node can reconnect to the router automatically.
This is done so if the router goes offline, the provider can reconnect to the router automatically.

:param name: Name for the Node.
:param host: Hostname or IP address of the Router this node talks to.
:param port: Port of the name of the Router this node talks to.
:param router_name: Name of the Router this node talks to.
:param name: Name for the InstrumentProvider.
:param host: Hostname or IP address of the Router this provider talks to.
:param port: Port of the name of the Router this provider talks to.
:param router_name: Name of the Router this provider talks to.
:param beat_period: Interval in milliseconds to send a beat to the Router.
:param instruments: Instruments is a Dictionary holding the necessary instructions to initialize any hardware
the Node talks to. The keys are the names of the instruments, every key has another dictionary as its value
the InstrumentProvider talks to. The keys are the names of the instruments, every key has another dictionary as its value
with all the necessary instructions to initialize the instrument. Inside of the dictionary for the specific
instrument, a key called 'import' is required holding the import path for that specific instrument.
Note that the name is not necessary since that is the key of the dictionary.
Expand Down Expand Up @@ -127,7 +123,7 @@ def instantiate_instruments(self) -> None:
def start(self) -> None:
self.instantiate_instruments()

logger.info("Starting node %s at %s", self.name, self.address)
logger.info("Starting provider %s at %s", self.name, self.address)
self.context = zmq.Context()
self.socket = self.context.socket(zmq.DEALER)
self.socket.setsockopt_string(zmq.IDENTITY, self.name)
Expand Down Expand Up @@ -227,7 +223,7 @@ def _beat(self) -> None:

self.socket.connect(self.address)
reg_packet = create_registration_packet(
source=self.name, destination=self.router_name, payload=NetworkElementClass.NODE, hops=0
source=self.name, destination=self.router_name, payload=NetworkElementClass.PROVIDER, hops=0
)
self.socket.send(pickle.dumps(reg_packet))
logger.info("Sent registration packet to router at %s", self.address)
Expand All @@ -237,7 +233,7 @@ def _beat(self) -> None:
logger.warning("Error while sending beat to router at %s", self.address)

def _handle_reg_acknowledge(self) -> None:
logger.info("Node %s is connected to router at %s", self.name, self.address)
logger.info("InstrumentProvider %s is connected to router at %s", self.name, self.address)
self.running = True
self._beats_since_reply = 0
self._last_received_beat = datetime.datetime.now(tz=datetime.UTC)
Expand Down Expand Up @@ -384,7 +380,7 @@ def _handle_instrument_control(self, packet: Packet) -> Packet:
return self._create_control_packet(packet.source, f"{ins_name}:INFO", instrument.info())

# All the possible packet options should have been handled by now, so if we get here, something went wrong.
msg = f"Something inside node {self.name} went wrong. Check that your packet is correct and try again."
msg = f"Something inside provider {self.name} went wrong. Check that your packet is correct and try again."
return self._create_error_packet(packet.source, msg)

def _create_error_packet(self, destination: str, error_msg: str) -> Packet:
Expand Down
2 changes: 1 addition & 1 deletion src/pqnstack/network/packet.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

class NetworkElementClass(Enum):
ROUTER = auto()
NODE = auto()
PROVIDER = auto()
CLIENT = auto()
TELEMETRY = auto()

Expand Down
20 changes: 8 additions & 12 deletions src/pqnstack/network/router.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
# University of Illinois Urbana-Champaign
# Public Quantum Network
#
# NCSA/Illinois Computes
import copy
import logging
import pickle
Expand All @@ -27,7 +23,7 @@ def __init__(self, name: str, host: str = "localhost", port: int = 5555) -> None

# FIXME, breaking this into 3 different dictionaries is probably not the way to go.
self.routers: dict[str, bytes] = {} # Holds what other routers are in the network
self.nodes: dict[str, bytes] = {}
self.providers: dict[str, bytes] = {}
self.clients: dict[str, bytes] = {}

self.context: zmq.Context[zmq.Socket[bytes]] | None = None
Expand Down Expand Up @@ -65,9 +61,9 @@ def handle_registration(self, identity_binary: bytes, packet: Packet) -> None:
self.handle_packet_error(identity_binary, f"Router {self.name} is not the destination")
return
match packet.payload:
case NetworkElementClass.NODE:
self.nodes[packet.source] = identity_binary
logger.info("Node %s registered", identity_binary)
case NetworkElementClass.PROVIDER:
self.providers[packet.source] = identity_binary
logger.info("InstrumentProvider %s registered", identity_binary)
case NetworkElementClass.CLIENT:
self.clients[packet.source] = identity_binary
logger.info("Client %s registered", identity_binary)
Expand All @@ -90,19 +86,19 @@ def handle_pass_packet(self, identity_binary: bytes, packet: Packet) -> None:
if packet.destination == self.name:
logger.info("Packet destination is self, dropping")

elif packet.destination in self.nodes or packet.destination in self.clients:
logger.info("Packet destination is a node called %s, routing message there", packet.destination)
elif packet.destination in self.providers or packet.destination in self.clients:
logger.info("Packet destination is a provider called %s, routing message there", packet.destination)
forward_packet = copy.copy(packet)
forward_packet.hops += 1
dest = self.nodes.get(packet.destination) or self.clients.get(packet.destination)
dest = self.providers.get(packet.destination) or self.clients.get(packet.destination)
if dest is None:
self.handle_packet_error(identity_binary, f"Destination {packet.destination} not found.")
return
self._send(dest, forward_packet)
logger.info("Sent packet to %s", packet.destination)

else:
logger.info("Packet destination is not a node will ask other routers in system")
logger.info("Packet destination is not a provider will ask other routers in system")
# FIXME: This is temporary and should be replaced with the routing algorithm.
self.handle_packet_error(identity_binary, "Routing not implemented yet.")

Expand Down
6 changes: 3 additions & 3 deletions tests/messaging/blocking_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@
if __name__ == "__main__":
c = Client()

# ping node
ping_reply = c.ping("node1")
# ping provider
ping_reply = c.ping("provider1")
logger.info(ping_reply)

instrument = c.get_device("node1", "dummy1")
instrument = c.get_device("provider1", "dummy1")
logger.info(instrument)

# blocking operation
Expand Down
Loading