Skip to content

Commit e150506

Browse files
Delete all references to the global state (#567)
1 parent 728d0e5 commit e150506

File tree

7 files changed

+10
-477
lines changed

7 files changed

+10
-477
lines changed

src/murfey/server/__init__.py

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@
5757
get_security_config,
5858
)
5959
from murfey.util.processing_params import default_spa_parameters
60-
from murfey.util.state import global_state
6160
from murfey.util.tomo import midpoint
6261

6362
try:
@@ -2258,17 +2257,6 @@ def feedback_callback(header: dict, message: dict) -> None:
22582257
time.sleep(2)
22592258
_transport_object.transport.nack(header, requeue=True)
22602259
return None
2261-
if global_state.get("data_collection_group_ids") and isinstance(
2262-
global_state["data_collection_group_ids"], dict
2263-
):
2264-
global_state["data_collection_group_ids"] = {
2265-
**global_state["data_collection_group_ids"],
2266-
message.get("tag"): dcgid,
2267-
}
2268-
else:
2269-
global_state["data_collection_group_ids"] = {
2270-
message.get("tag"): dcgid
2271-
}
22722260
_transport_object.transport.ack(header)
22732261
if dcg_hooks := entry_points().select(
22742262
group="murfey.hooks", name="data_collection_group"

src/murfey/server/api/__init__.py

Lines changed: 0 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -76,13 +76,10 @@
7676
BLSampleImageParameters,
7777
BLSampleParameters,
7878
BLSubSampleParameters,
79-
ClearanceKeys,
8079
ClientInfo,
81-
ContextInfo,
8280
CurrentGainRef,
8381
DCGroupParameters,
8482
DCParameters,
85-
File,
8683
FoilHoleParameters,
8784
FractionationParameters,
8885
GainReference,
@@ -107,7 +104,6 @@
107104
Visit,
108105
)
109106
from murfey.util.processing_params import default_spa_parameters
110-
from murfey.util.state import global_state
111107
from murfey.util.tomo import midpoint
112108
from murfey.workflows.spa.flush_spa_preprocess import (
113109
register_foil_hole,
@@ -1023,23 +1019,6 @@ def visit_info(
10231019
return None
10241020

10251021

1026-
@router.post("/visits/{visit_name}/context")
1027-
async def register_context(context_info: ContextInfo):
1028-
await ws.manager.broadcast(f"Context registered: {context_info}")
1029-
await ws.manager.set_state("experiment_type", context_info.experiment_type)
1030-
await ws.manager.set_state(
1031-
"acquisition_software", context_info.acquisition_software
1032-
)
1033-
1034-
1035-
@router.post("/visits/{visit_name}/files")
1036-
async def add_file(file: File):
1037-
message = f"File {file} transferred"
1038-
log.info(message)
1039-
await ws.manager.broadcast(f"File {file} transferred")
1040-
return file
1041-
1042-
10431022
@router.post("/instruments/{instrument_name}/feedback")
10441023
async def send_murfey_message(instrument_name: str, msg: RegistrationMessage):
10451024
if _transport_object:
@@ -1322,7 +1301,6 @@ async def request_tomography_preprocessing(
13221301
db.add(for_stash)
13231302
db.commit()
13241303
db.close()
1325-
# await ws.manager.broadcast(f"Pre-processing requested for {ppath.name}")
13261304
return proc_file
13271305

13281306

@@ -1795,42 +1773,6 @@ async def make_gif(
17951773
return {"output_gif": str(output_path)}
17961774

17971775

1798-
@router.post("/visits/{visit_name}/clean_state")
1799-
async def clean_state(visit_name: str, for_clearance: ClearanceKeys):
1800-
if global_state.get("data_collection_group_ids") and isinstance(
1801-
global_state["data_collection_group_ids"], dict
1802-
):
1803-
global_state["data_collection_group_ids"] = {
1804-
k: v
1805-
for k, v in global_state["data_collection_group_ids"].items()
1806-
if k not in for_clearance.data_collection_group
1807-
}
1808-
if global_state.get("data_collection_ids") and isinstance(
1809-
global_state["data_collection_ids"], dict
1810-
):
1811-
global_state["data_collection_ids"] = {
1812-
k: v
1813-
for k, v in global_state["data_collection_ids"].items()
1814-
if k not in for_clearance.data_collection
1815-
}
1816-
if global_state.get("processing_job_ids") and isinstance(
1817-
global_state["processing_job_ids"], dict
1818-
):
1819-
global_state["processing_job_ids"] = {
1820-
k: v
1821-
for k, v in global_state["processing_job_ids"].items()
1822-
if k not in for_clearance.processing_job
1823-
}
1824-
if global_state.get("autoproc_program_ids") and isinstance(
1825-
global_state["autoproc_program_ids"], dict
1826-
):
1827-
global_state["autoproc_program_ids"] = {
1828-
k: v
1829-
for k, v in global_state["autoproc_program_ids"].items()
1830-
if k not in for_clearance.autoproc_program
1831-
}
1832-
1833-
18341776
@router.get("/new_client_id/")
18351777
async def new_client_id(db=murfey_db):
18361778
clients = db.exec(select(ClientEnvironment)).all()

src/murfey/server/demo_api.py

Lines changed: 1 addition & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -62,11 +62,9 @@
6262
)
6363
from murfey.util.models import (
6464
ClientInfo,
65-
ContextInfo,
6665
CurrentGainRef,
6766
DCGroupParameters,
6867
DCParameters,
69-
File,
7068
FoilHoleParameters,
7169
FractionationParameters,
7270
GainReference,
@@ -89,7 +87,6 @@
8987
Visit,
9088
)
9189
from murfey.util.processing_params import default_spa_parameters
92-
from murfey.util.state import global_state
9390
from murfey.workflows.spa.picking import _register_picked_particles_use_diameter
9491

9592
log = logging.getLogger("murfey.server.demo_api")
@@ -900,26 +897,6 @@ def visit_info(request: Request, visit_name: str):
900897
)
901898

902899

903-
@router.post("/visits/{visit_name}/context")
904-
async def register_context(context_info: ContextInfo):
905-
log.info(
906-
f"Context {context_info.experiment_type}:{context_info.acquisition_software} registered"
907-
)
908-
await ws.manager.broadcast(f"Context registered: {context_info}")
909-
await ws.manager.set_state("experiment_type", context_info.experiment_type)
910-
await ws.manager.set_state(
911-
"acquisition_software", context_info.acquisition_software
912-
)
913-
914-
915-
@router.post("/visits/{visit_name}/files")
916-
async def add_file(file: File):
917-
message = f"File {file} transferred"
918-
log.info(message)
919-
await ws.manager.broadcast(f"File {file} transferred")
920-
return file
921-
922-
923900
@router.post("/feedback")
924901
async def send_murfey_message(msg: RegistrationMessage):
925902
pass
@@ -1447,15 +1424,6 @@ def register_dc_group(
14471424
db.add(murfey_app_3d)
14481425
db.commit()
14491426

1450-
if global_state.get("data_collection_group_ids") and isinstance(
1451-
global_state["data_collection_group_ids"], dict
1452-
):
1453-
global_state["data_collection_group_ids"] = {
1454-
**global_state["data_collection_group_ids"],
1455-
dcg_params.tag: dcgid,
1456-
}
1457-
else:
1458-
global_state["data_collection_group_ids"] = {dcg_params.tag: dcgid}
14591427
if dcg_params.atlas:
14601428
_flush_grid_square_records(
14611429
{"session_id": session_id, "tag": dcg_params.tag}, demo=True
@@ -1511,15 +1479,6 @@ def start_dc(
15111479
db.add(murfey_app)
15121480
db.commit()
15131481
db.close()
1514-
if global_state.get("data_collection_ids") and isinstance(
1515-
global_state["data_collection_ids"], dict
1516-
):
1517-
global_state["data_collection_ids"] = {
1518-
**global_state["data_collection_ids"],
1519-
dc_params.tag: 1,
1520-
}
1521-
else:
1522-
global_state["data_collection_ids"] = {dc_params.tag: 1}
15231482
if dc_params.exposure_time:
15241483
prom.exposure_time.set(dc_params.exposure_time)
15251484
return dc_params
@@ -1529,35 +1488,8 @@ def start_dc(
15291488
def register_proc(
15301489
visit_name, session_id: MurfeySessionID, proc_params: ProcessingJobParameters
15311490
):
1491+
# This should probably do something
15321492
log.info("Registering processing job")
1533-
if global_state.get("processing_job_ids"):
1534-
assert isinstance(global_state["processing_job_ids"], dict)
1535-
global_state["processing_job_ids"] = {
1536-
**{
1537-
k: v
1538-
for k, v in global_state["processing_job_ids"].items()
1539-
if k != proc_params.tag
1540-
},
1541-
proc_params.tag: {
1542-
**global_state["processing_job_ids"].get(proc_params.tag, {}),
1543-
proc_params.recipe: 1,
1544-
},
1545-
}
1546-
else:
1547-
global_state["processing_job_ids"] = {proc_params.tag: {proc_params.recipe: 1}}
1548-
if global_state.get("autoproc_program_ids"):
1549-
assert isinstance(global_state["autoproc_program_ids"], dict)
1550-
global_state["autoproc_program_ids"] = {
1551-
**global_state["autoproc_program_ids"],
1552-
proc_params.tag: {
1553-
**global_state["autoproc_program_ids"].get(proc_params.tag, {}),
1554-
proc_params.recipe: 1,
1555-
},
1556-
}
1557-
else:
1558-
global_state["autoproc_program_ids"] = {
1559-
proc_params.tag: {proc_params.recipe: 1}
1560-
}
15611493
log.info("Processing job registered")
15621494
return proc_params
15631495

src/murfey/server/websocket.py

Lines changed: 9 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import json
55
import logging
66
from datetime import datetime
7-
from typing import Any, Dict, Generic, TypeVar, Union
7+
from typing import Any, Dict, TypeVar, Union
88

99
from fastapi import APIRouter, WebSocket, WebSocketDisconnect
1010
from sqlmodel import select
@@ -13,19 +13,16 @@
1313
from murfey.server.murfey_db import get_murfey_db_session
1414
from murfey.util import sanitise
1515
from murfey.util.db import ClientEnvironment
16-
from murfey.util.state import State, global_state
1716

1817
T = TypeVar("T")
1918

2019
ws = APIRouter(prefix="/ws", tags=["websocket"])
2120
log = logging.getLogger("murfey.server.websocket")
2221

2322

24-
class ConnectionManager(Generic[T]):
25-
def __init__(self, state: State[T]):
23+
class ConnectionManager:
24+
def __init__(self):
2625
self.active_connections: Dict[int | str, WebSocket] = {}
27-
self._state = state
28-
self._state.subscribe(self._broadcast_state_update)
2926

3027
async def connect(
3128
self, websocket: WebSocket, client_id: int | str, register_client: bool = True
@@ -38,7 +35,6 @@ async def connect(
3835
"To register a client the client ID must be an integer"
3936
)
4037
self._register_new_client(client_id)
41-
await websocket.send_json({"message": "state-full", "state": self._state.data})
4238

4339
@staticmethod
4440
def _register_new_client(client_id: int):
@@ -48,9 +44,7 @@ def _register_new_client(client_id: int):
4844
murfey_db.commit()
4945
murfey_db.close()
5046

51-
def disconnect(
52-
self, websocket: WebSocket, client_id: int | str, unregister_client: bool = True
53-
):
47+
def disconnect(self, client_id: int | str, unregister_client: bool = True):
5448
self.active_connections.pop(client_id)
5549
if unregister_client:
5650
murfey_db = next(get_murfey_db_session())
@@ -67,33 +61,14 @@ async def broadcast(self, message: str):
6761
for connection in self.active_connections:
6862
await self.active_connections[connection].send_text(message)
6963

70-
async def _broadcast_state_update(
71-
self, attribute: str, value: T | None, message: str = "state-update"
72-
):
73-
for connection in self.active_connections:
74-
await self.active_connections[connection].send_json(
75-
{"message": message, "attribute": attribute, "value": value}
76-
)
77-
78-
async def set_state(self, attribute: str, value: T):
79-
log.info(
80-
f"State attribute {sanitise(attribute)!r} set to {sanitise(str(value))!r}"
81-
)
82-
await self._state.set(attribute, value)
83-
84-
async def delete_state(self, attribute: str):
85-
log.info(f"State attribute {sanitise(attribute)!r} removed")
86-
await self._state.delete(attribute)
87-
8864

89-
manager = ConnectionManager(global_state)
65+
manager = ConnectionManager()
9066

9167

9268
@ws.websocket("/test/{client_id}")
9369
async def websocket_endpoint(websocket: WebSocket, client_id: int):
9470
await manager.connect(websocket, client_id)
9571
await manager.broadcast(f"Client {client_id} joined")
96-
await manager.set_state(f"Client {client_id}", "joined")
9772
try:
9873
while True:
9974
data = await websocket.receive_text()
@@ -111,9 +86,8 @@ async def websocket_endpoint(websocket: WebSocket, client_id: int):
11186
select(ClientEnvironment).where(ClientEnvironment.client_id == client_id)
11287
).one()
11388
prom.monitoring_switch.labels(visit=client_env.visit).set(0)
114-
manager.disconnect(websocket, client_id)
89+
manager.disconnect(client_id)
11590
await manager.broadcast(f"Client #{client_id} disconnected")
116-
await manager.delete_state(f"Client {client_id}")
11791

11892

11993
@ws.websocket("/connect/{client_id}")
@@ -122,7 +96,6 @@ async def websocket_connection_endpoint(
12296
):
12397
await manager.connect(websocket, client_id, register_client=False)
12498
await manager.broadcast(f"Client {client_id} joined")
125-
await manager.set_state(f"Client {client_id}", "joined")
12699
try:
127100
while True:
128101
data = await websocket.receive_text()
@@ -138,9 +111,8 @@ async def websocket_connection_endpoint(
138111
await manager.broadcast(f"Client #{client_id} sent message {data}")
139112
except WebSocketDisconnect:
140113
log.info(f"Disconnecting Client {sanitise(str(client_id))}")
141-
manager.disconnect(websocket, client_id, unregister_client=False)
114+
manager.disconnect(client_id, unregister_client=False)
142115
await manager.broadcast(f"Client #{client_id} disconnected")
143-
await manager.delete_state(f"Client {client_id}")
144116

145117

146118
async def check_connections(active_connections):
@@ -178,7 +150,7 @@ async def close_ws_connection(client_id: int):
178150
murfey_db.close()
179151
client_id_str = str(client_id).replace("\r\n", "").replace("\n", "")
180152
log.info(f"Disconnecting {client_id_str}")
181-
manager.disconnect(manager.active_connections[client_id], client_id)
153+
manager.disconnect(client_id)
182154
prom.monitoring_switch.labels(visit=visit_name).set(0)
183155
await manager.broadcast(f"Client #{client_id} disconnected")
184156

@@ -187,5 +159,5 @@ async def close_ws_connection(client_id: int):
187159
async def close_unrecorded_ws_connection(client_id: Union[int, str]):
188160
client_id_str = str(client_id).replace("\r\n", "").replace("\n", "")
189161
log.info(f"Disconnecting {client_id_str}")
190-
manager.disconnect(manager.active_connections[client_id], client_id)
162+
manager.disconnect(client_id)
191163
await manager.broadcast(f"Client #{client_id} disconnected")

src/murfey/util/models.py

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -102,11 +102,6 @@ class SessionInfo(BaseModel):
102102
rescale: bool = True
103103

104104

105-
class ContextInfo(BaseModel):
106-
experiment_type: str
107-
acquisition_software: str
108-
109-
110105
class ClientInfo(BaseModel):
111106
id: int
112107

@@ -127,13 +122,6 @@ class RsyncerInfo(BaseModel):
127122
tag: str = ""
128123

129124

130-
class ClearanceKeys(BaseModel):
131-
data_collection_group: List[str]
132-
data_collection: List[str]
133-
processing_job: List[str]
134-
autoproc_program: List[str]
135-
136-
137125
class GainReference(BaseModel):
138126
gain_ref: Path
139127
rescale: bool = True

0 commit comments

Comments
 (0)