Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/pqnstack/app/api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
from pqnstack.app.api.routes import chsh
from pqnstack.app.api.routes import qkd
from pqnstack.app.api.routes import rng
from pqnstack.app.api.routes import serial
from pqnstack.app.api.routes import timetagger

api_router = APIRouter()
api_router.include_router(chsh.router)
api_router.include_router(qkd.router)
api_router.include_router(timetagger.router)
api_router.include_router(rng.router)
api_router.include_router(serial.router)
28 changes: 28 additions & 0 deletions src/pqnstack/app/api/routes/serial.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import logging
from typing import Annotated

from fastapi import APIRouter
from fastapi import Depends
from pydantic import BaseModel

from pqnstack.app.core.config import settings
from pqnstack.pqn.drivers.rotaryencoder import SerialRotaryEncoder

logger = logging.getLogger(__name__)
router = APIRouter(prefix="/serial", tags=["measure"])


def get_rotary_encoder() -> SerialRotaryEncoder:
return settings.rotary_encoder


SERDep = Annotated[SerialRotaryEncoder, Depends(get_rotary_encoder)]


class AngleResponse(BaseModel):
theta: float


@router.get("/")
async def read_angle(rotary_encoder: SERDep) -> AngleResponse:
return AngleResponse(theta=rotary_encoder.read())
5 changes: 5 additions & 0 deletions src/pqnstack/app/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

from pqnstack.constants import BellState
from pqnstack.constants import QKDEncodingBasis
from pqnstack.pqn.drivers.rotaryencoder import SerialRotaryEncoder
from pqnstack.pqn.protocols.measurement import MeasurementConfig

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -39,6 +40,10 @@ class Settings(BaseSettings):
bell_state: BellState = BellState.Phi_plus
timetagger: tuple[str, str] | None = None # Name of the timetagger to use for the CHSH experiment.

rotary_encoder: SerialRotaryEncoder = SerialRotaryEncoder(
label="rotary_encoder", address="/dev/ttyACM0", offset_degrees=0.0
)
Comment on lines +43 to +45

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this the automatic discovery of the serial address? What happens if the pi doesn't have this address?


model_config = SettingsConfigDict(toml_file="./config.toml", env_file=".env", env_file_encoding="utf-8")

@classmethod
Expand Down
30 changes: 30 additions & 0 deletions src/pqnstack/pqn/drivers/rotaryencoder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import atexit
from dataclasses import dataclass
from dataclasses import field

import serial


@dataclass(slots=True)
class SerialRotaryEncoder:
label: str
address: str
offset_degrees: float = 0.0
_conn: serial.Serial = field(init=False, repr=False)

def __post_init__(self) -> None:
self._conn = serial.Serial(self.address, baudrate=115200, timeout=1)
self._conn.write(b"open_channel")
self._conn.read(100)
self._conn.write(b"ready")
self._conn.read(100)

atexit.register(self.close)

def close(self) -> None:
self._conn.close()

def read(self) -> float:
self._conn.write(b"ANGLE?\n")
angle = self._conn.readline().decode().strip()
return float(angle) + self.offset_degrees