|
| 1 | +import copy |
| 2 | +import logging |
1 | 3 | import os |
2 | 4 |
|
3 | 5 | import jsons |
4 | | -from starlette.testclient import TestClient |
5 | | - |
6 | 6 | from fastapi import FastAPI |
7 | 7 | from pymongo import MongoClient |
8 | 8 |
|
9 | | -from .. import mongodb_url |
10 | | -from ..model.machine import Machine |
11 | | -from ..controller import machine_controller |
12 | | - |
| 9 | +from lat2db import mongodb_url |
| 10 | +from lat2db.controller import machine_controller |
| 11 | +from lat2db.model.beam_position_monitor import BeamPositionMonitor |
| 12 | +from lat2db.model.bending import Bending |
| 13 | +from lat2db.model.cavity import Cavity |
| 14 | +from lat2db.model.drift import Drift |
| 15 | +from lat2db.model.machine import Machine |
| 16 | +from lat2db.model.marker import Marker |
| 17 | +from lat2db.model.quadrupole import Quadrupole |
| 18 | +from lat2db.model.sequencer import Sequencer |
| 19 | +from lat2db.model.sextupole import Sextupole |
| 20 | +from lat2db.model.steerer import Steerer |
13 | 21 |
|
14 | 22 | app = FastAPI() |
15 | 23 | app.include_router(machine_controller.router, tags=["machines"], prefix="/machine") |
16 | 24 | app.mongodb_client = MongoClient(mongodb_url) |
17 | 25 | DB_NAME = os.environ.get("MONGODB_DB", "bessyii") |
18 | 26 | app.database = app.mongodb_client[DB_NAME] |
19 | 27 |
|
20 | | -def set_machine(machine: Machine): |
21 | | -# return machine |
| 28 | +logger = logging.getLogger("tools") |
| 29 | + |
| 30 | + |
| 31 | +def create_machine(lat): |
| 32 | + machine = Machine() |
| 33 | + machine.set_base_parameters( lat ) |
| 34 | + |
| 35 | + # iterate through each row in lat.elements |
| 36 | + for row_ in lat.elements: |
| 37 | + |
| 38 | + # make a copy of the row so that changes don't affect original data |
| 39 | + row = copy.copy(row_) |
| 40 | + |
| 41 | + # print the row |
| 42 | + print(f'{row} ----') |
22 | 43 |
|
| 44 | + # revamp parameters as required for the dataclasses |
| 45 | + row.setdefault("length", row.pop("L", 0e0)) # rename "L" to "length" |
| 46 | + type_name = row['type'] |
| 47 | + # Ensure 'passmethod' and 'tags' keys exist in the row dictionary |
| 48 | + row.setdefault("passmethod", None) |
| 49 | + row.setdefault("tags", None) |
| 50 | + if type_name in ["Bending", "Quadrupole", "Sextupole", "Steerer"]: |
| 51 | + row.setdefault("main_multipole_strength", row.pop("K", 0e0)) # rename "K" to "main_multipole_strength" |
| 52 | + row.setdefault("number_of_integration_steps", |
| 53 | + row.pop("N", 1)) # rename "N" to "number_of_integration_steps" |
| 54 | + row.setdefault("method", row.pop("Method", 4)) # rename "Method" to "method" |
| 55 | + elif type_name == "Cavity": |
| 56 | + row.setdefault("frequency", row.pop("Frequency", 0e0)) # rename "Frequency" to "frequency" |
| 57 | + row.setdefault("voltage", row.pop("Voltage", 0e0)) # rename "Voltage" to "voltage" |
| 58 | + row.setdefault("harmonic_number", |
| 59 | + row.pop("Harmonicnumber", 0e0)) # rename "HarmonicNumber" to "harmonic_number" |
| 60 | + else: |
| 61 | + pass # do nothing if type_name is not recognized |
| 62 | + |
| 63 | + if type_name == "Bending": |
| 64 | + row.setdefault("bending_angle", row.pop("T", 0e0)) # rename "T" to "bending_angle" |
| 65 | + row.setdefault("entry_angle", row.pop("T1", 0e0)) # rename "T1" to "entry_angle" |
| 66 | + row.setdefault("exit_angle", row.pop("T2", 0e0)) # rename "T2" to "exit_angle" |
| 67 | + |
| 68 | + # create a Sequencer instance using the modified row |
| 69 | + sequence_item = Sequencer(**row) |
| 70 | + |
| 71 | + # add the sequence_item to the appropriate list in the machine instance based on its type_name |
| 72 | + type_dict = { |
| 73 | + "Drift": (Drift, machine.add_drift), |
| 74 | + "Marker": (Marker, machine.add_marker), |
| 75 | + "Sextupole": (Sextupole, machine.add_sextupole), |
| 76 | + "Steerer": (Steerer, machine.add_steerer()), |
| 77 | + "Bending": (Bending, machine.add_bending), |
| 78 | + "Quadrupole": (Quadrupole, machine.add_quadrupole), |
| 79 | + "Bpm": (BeamPositionMonitor, machine.add_beam_position_monitor), |
| 80 | + "Cavity": (Cavity, machine.add_cavity), |
| 81 | + } |
| 82 | + |
| 83 | + type_class, type_method = type_dict.get(type_name, (None, None)) |
| 84 | + if type_class is None or type_method is None: |
| 85 | + if type_name in ['Horizontalsteerer','Verticalsteerer']: |
| 86 | + continue #ignore the two steerers |
| 87 | + else: |
| 88 | + raise KeyError(f"Don't know type {type_name}") |
| 89 | + sequence_item.set_properties(row) |
| 90 | + type_instance = type_class(**row) |
| 91 | + machine.add_to_sequence(sequence_item) |
| 92 | + type_method(type_instance) |
| 93 | + |
| 94 | + # return machine |
| 95 | + from starlette.testclient import TestClient |
23 | 96 | with TestClient(app) as client: |
24 | | - response = client.post("/machine/machine", json=jsons.dump(machine.to_dict())) |
| 97 | + response = client.post("/machine/machine", json=jsons.dump(machine)) |
25 | 98 | if response.status_code != 201: |
26 | 99 | raise AssertionError(f"Got response {response}") |
27 | | - |
|
0 commit comments