|
| 1 | +"""Optimisation demonstration.""" |
| 2 | + |
| 3 | +# fmt: off |
| 4 | +import typing as _t |
| 5 | + |
| 6 | +from plugboard.component import Component, IOController as IO |
| 7 | +from plugboard.process import ProcessBuilder |
| 8 | +from plugboard.schemas import ComponentArgsDict, ProcessSpec, ProcessArgsSpec, ObjectiveSpec |
| 9 | +from plugboard.schemas.tune import FloatParameterSpec |
| 10 | +from plugboard.tune import Tuner |
| 11 | +import math |
| 12 | + |
| 13 | + |
| 14 | +# --8<-- [start:components] |
| 15 | +class Iterator(Component): |
| 16 | + """Creates a sequence of x values.""" |
| 17 | + |
| 18 | + io = IO(outputs=["x"]) |
| 19 | + |
| 20 | + def __init__(self, iters: int, **kwargs: _t.Unpack[ComponentArgsDict]) -> None: |
| 21 | + super().__init__(**kwargs) |
| 22 | + self._iters = iters |
| 23 | + |
| 24 | + async def init(self) -> None: |
| 25 | + self._seq = iter(range(self._iters)) |
| 26 | + |
| 27 | + async def step(self) -> None: |
| 28 | + try: |
| 29 | + self.x = next(self._seq) |
| 30 | + except StopIteration: |
| 31 | + await self.io.close() |
| 32 | + |
| 33 | + |
| 34 | +class Trajectory(Component): |
| 35 | + """Computes the height of a projectile.""" |
| 36 | + |
| 37 | + io = IO(inputs=["x"], outputs=["y"]) |
| 38 | + |
| 39 | + def __init__( |
| 40 | + self, angle: float = 30, velocity: float = 20, **kwargs: _t.Unpack[ComponentArgsDict] |
| 41 | + ) -> None: |
| 42 | + super().__init__(**kwargs) |
| 43 | + self._angle_radians = math.radians(angle) |
| 44 | + self._v0 = velocity |
| 45 | + |
| 46 | + async def step(self) -> None: |
| 47 | + self._logger.info("Calculating trajectory", x=self.x) |
| 48 | + self.y = self.x * math.tan(self._angle_radians) - (9.81 * self.x**2) / ( |
| 49 | + 2 * self._v0**2 * math.cos(self._angle_radians) ** 2 |
| 50 | + ) |
| 51 | + |
| 52 | + |
| 53 | +class MaxHeight(Component): |
| 54 | + """Record the maximum height achieved.""" |
| 55 | + |
| 56 | + io = IO(inputs=["y"], outputs=["max_y"]) |
| 57 | + |
| 58 | + def __init__(self, **kwargs: _t.Unpack[ComponentArgsDict]) -> None: |
| 59 | + super().__init__(**kwargs) |
| 60 | + self.max_y: float = 0 |
| 61 | + |
| 62 | + async def step(self) -> None: |
| 63 | + self.max_y = max(self.y, self.max_y) |
| 64 | +# --8<-- [end:components] |
| 65 | + |
| 66 | + |
| 67 | +if __name__ == "__main__": |
| 68 | + # --8<-- [start:define_process] |
| 69 | + process_spec = ProcessSpec( |
| 70 | + args=ProcessArgsSpec( |
| 71 | + components=[ |
| 72 | + {"type": "hello_tuner.Iterator", "args": {"name": "horizontal", "iters": 100}}, |
| 73 | + { |
| 74 | + "type": "hello_tuner.Trajectory", |
| 75 | + "args": {"name": "trajectory", "angle": 30, "velocity": 20}, |
| 76 | + }, |
| 77 | + {"type": "hello_tuner.MaxHeight", "args": {"name": "max-height"}}, |
| 78 | + ], |
| 79 | + connectors=[ |
| 80 | + {"source": "horizontal.x", "target": "trajectory.x"}, |
| 81 | + {"source": "trajectory.y", "target": "max-height.y"}, |
| 82 | + ], |
| 83 | + ), |
| 84 | + type="plugboard.process.LocalProcess", |
| 85 | + ) |
| 86 | + # Check that the process spec can be built |
| 87 | + _ = ProcessBuilder.build(spec=process_spec) |
| 88 | + # --8<-- [end:define_process] |
| 89 | + # --8<-- [start:run_tuner] |
| 90 | + tuner = Tuner( |
| 91 | + objective=ObjectiveSpec( # (1)! |
| 92 | + object_type="component", |
| 93 | + object_name="max-height", |
| 94 | + field_type="field", |
| 95 | + field_name="max_y", |
| 96 | + ), |
| 97 | + parameters=[ |
| 98 | + FloatParameterSpec( # (2)! |
| 99 | + object_type="component", |
| 100 | + object_name="trajectory", |
| 101 | + field_type="arg", |
| 102 | + field_name="angle", |
| 103 | + lower=0, |
| 104 | + upper=90, |
| 105 | + ), |
| 106 | + FloatParameterSpec( |
| 107 | + object_type="component", |
| 108 | + object_name="trajectory", |
| 109 | + field_type="arg", |
| 110 | + field_name="velocity", |
| 111 | + lower=0, |
| 112 | + upper=100, |
| 113 | + ), |
| 114 | + ], |
| 115 | + num_samples=40, # (3)! |
| 116 | + max_concurrent=4, # (4)! |
| 117 | + mode="max", # (5)! |
| 118 | + ) |
| 119 | + result = tuner.run(spec=process_spec) |
| 120 | + print( |
| 121 | + f"Best parameters: angle={result.config['trajectory.angle']}, velocity={result.config['trajectory.velocity']}" |
| 122 | + ) |
| 123 | + print(f"Best max height: {result.metrics['max-height.max_y']}") |
| 124 | + # --8<-- [end:run_tuner] |
0 commit comments