Skip to content
95 changes: 93 additions & 2 deletions rocketpy/motors/motor.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,22 @@
import base64
import re
import tempfile
import warnings
import xml.etree.ElementTree as ET
from abc import ABC, abstractmethod
from functools import cached_property
from os import path
from os import path, remove

import numpy as np

import requests
import logging
from ..mathutils.function import Function, funcify_method
from ..plots.motor_plots import _MotorPlots
from ..prints.motor_prints import _MotorPrints
from ..tools import parallel_axis_theorem_from_com, tuple_handler

logger = logging.getLogger(__name__)


# pylint: disable=too-many-public-methods
class Motor(ABC):
Expand Down Expand Up @@ -1913,6 +1918,92 @@ def load_from_rse_file(
interpolation_method=interpolation_method,
coordinate_system_orientation=coordinate_system_orientation,
)

@staticmethod
def load_from_thrustcurve_api(name: str, **kwargs):
"""
Creates a Motor instance by downloading a .eng file from the ThrustCurve API
based on the given motor name.

Parameters
----------
name : str
The motor name according to the API (e.g., "Cesaroni_M1670" or "M1670").
Both manufacturer-prefixed and shorthand names are commonly used; if multiple
motors match the search, the first result is used.
**kwargs :
Additional arguments passed to the Motor constructor or loader, such as
dry_mass, nozzle_radius, etc.

Returns
-------
instance : GenericMotor
A new GenericMotor instance initialized using the downloaded .eng file.

Raises
------
ValueError
If no motor is found or if the downloaded .eng data is missing.
requests.exceptions.RequestException
If a network or HTTP error occurs during the API call.
"""
base_url = "https://www.thrustcurve.org/api/v1"

# Step 1. Search motor
response = requests.get(f"{base_url}/search.json", params={"commonName": name})
response.raise_for_status()
data = response.json()

if not data.get("results"):
raise ValueError(
f"No motor found for name '{name}'. "
"Please verify the motor name format (e.g., 'Cesaroni_M1670' or 'M1670') and try again."
)

motor_info = data["results"][0]
motor_id = motor_info.get("motorId")
designation = motor_info.get("designation", "").replace("/", "-")
manufacturer = motor_info.get("manufacturer", "")
# Logging the fact that the motor was found
logger.info(f"Motor found: {designation} ({manufacturer})")

# Step 2. Download the .eng file
dl_response = requests.get(
f"{base_url}/download.json",
params={"motorIds": motor_id, "format": "RASP", "data": "file"},
)
dl_response.raise_for_status()
dl_data = dl_response.json()

if not dl_data.get("results"):
raise ValueError(f"No .eng file found for motor '{name}' in the ThrustCurve API.")

data_base64 = dl_data["results"][0].get("data")
if not data_base64:
raise ValueError(f"Downloaded .eng data for motor '{name}' is empty or invalid.")

data_bytes = base64.b64decode(data_base64)

# Step 3. Create the motor from the .eng file
tmp_path = None
try:
# create a temporary file that persists until we explicitly remove it
with tempfile.NamedTemporaryFile(suffix=".eng", delete=False) as tmp_file:
tmp_file.write(data_bytes)
tmp_file.flush()
tmp_path = tmp_file.name


motor_instance = GenericMotor.load_from_eng_file(tmp_path, **kwargs)
return motor_instance
finally:
# Ensuring the temporary file is removed
if tmp_path and path.exists(tmp_path):
try:
remove(tmp_path)
except OSError:
# If cleanup fails, don't raise: we don't want to mask prior exceptions.
pass

def all_info(self):
"""Prints out all data and graphs available about the Motor."""
Expand Down
88 changes: 88 additions & 0 deletions tests/unit/motors/test_genericmotor.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import numpy as np
import pytest
import scipy.integrate
import requests
import base64


from rocketpy import Function, Motor

Expand Down Expand Up @@ -211,3 +214,88 @@ def test_load_from_rse_file(generic_motor):
assert thrust_curve[0][1] == 0.0 # First thrust point
assert thrust_curve[-1][0] == 2.2 # Last point of time
assert thrust_curve[-1][1] == 0.0 # Last thrust point

def test_load_from_thrustcurve_api(monkeypatch, generic_motor):
"""
Tests the GenericMotor.load_from_thrustcurve_api method with mocked ThrustCurve API responses.
Parameters
----------
monkeypatch : pytest.MonkeyPatch
The pytest monkeypatch fixture for mocking.
generic_motor : rocketpy.GenericMotor
The GenericMotor object to be used in the tests.

"""

class MockResponse:
def __init__(self, json_data):
self._json_data = json_data

def json(self):
return self._json_data

def raise_for_status(self):
# Simulate a successful HTTP response (200)
return None

# Provide mocked responses for the two endpoints: search.json and download.json
def mock_get(url, params=None):
if "search.json" in url:
# Return a mock search result with a motorId and designation
return MockResponse(
{
"results": [
{
"motorId": "12345",
"designation": "Cesaroni_M1670",
"manufacturer": "Cesaroni",
}
]
}
)
elif "download.json" in url:
# Read the local .eng file and return its base64-encoded content as the API would
eng_path = "data/motors/cesaroni/Cesaroni_M1670.eng"
with open(eng_path, "rb") as f:
encoded = base64.b64encode(f.read()).decode("utf-8")
return MockResponse({"results": [{"data": encoded}]})
else:
raise RuntimeError(f"Unexpected URL called in test mock: {url}")

monkeypatch.setattr(requests, "get", mock_get)

# Expected parameters from the original test
burn_time = (0, 3.9)
dry_mass = 5.231 - 3.101 # 2.130 kg
propellant_initial_mass = 3.101
chamber_radius = 75 / 1000
chamber_height = 757 / 1000
nozzle_radius = chamber_radius * 0.85 # 85% of chamber radius

average_thrust = 1545.218
total_impulse = 6026.350
max_thrust = 2200.0
exhaust_velocity = 1943.357

# Call the method using the class (works if it's a staticmethod); using type(generic_motor)
# ensures test works if the method is invoked on a GenericMotor instance in the project
motor = type(generic_motor).load_from_thrustcurve_api("M1670")

# Assertions (same as original)
assert motor.burn_time == burn_time
assert motor.dry_mass == dry_mass
assert motor.propellant_initial_mass == propellant_initial_mass
assert motor.chamber_radius == chamber_radius
assert motor.chamber_height == chamber_height
assert motor.chamber_position == 0
assert motor.average_thrust == pytest.approx(average_thrust)
assert motor.total_impulse == pytest.approx(total_impulse)
assert motor.exhaust_velocity.average(*burn_time) == pytest.approx(exhaust_velocity)
assert motor.max_thrust == pytest.approx(max_thrust)
assert motor.nozzle_radius == pytest.approx(nozzle_radius)

# testing thrust curve equality against the local .eng import (as in original test)
_, _, points = Motor.import_eng("data/motors/cesaroni/Cesaroni_M1670.eng")
assert motor.thrust.y_array == pytest.approx(
Function(points, "Time (s)", "Thrust (N)", "linear", "zero").y_array
)
Loading