diff --git a/docs/source/architecture.md b/docs/source/architecture.md index 738b1f7..5cc9702 100644 --- a/docs/source/architecture.md +++ b/docs/source/architecture.md @@ -102,6 +102,7 @@ PyMAD-NG supports a wide range of Python-native and NumPy types, which are autom | `range`, `np.geomspace` | range types | encoded structure | Conversion is handled by {func}`MAD.send` and `{func}`MAD.recv`, both methods on the {class}`MAD` class. + --- ## Dynamic Attributes & Autocompletion diff --git a/src/pymadng/madp_classes.py b/src/pymadng/madp_classes.py index cd131bd..596c476 100644 --- a/src/pymadng/madp_classes.py +++ b/src/pymadng/madp_classes.py @@ -12,7 +12,8 @@ from collections.abc import Iterable # TODO: Are you able to store the actual parent? (jgray 2023) -# TODO: Allow __setitem__ to work with multiple indices (Should be a simple recursive loop) (jgray 2023) +# TODO: Allow __setitem__ to work with multiple indices (jgray 2023) +# TODO: Change __iter__ to use the MAD-NG iterator, ipairs and pairs also (jgray 2025) MADX_methods = ["load", "open_env", "close_env"] @@ -72,10 +73,7 @@ def __sub__(self, rhs): def __truediv__(self, rhs): return self.__generate_operation__(rhs, "/") - def __mod__(self, rhs): - return self.__generate_operation__(rhs, "%") - - def __eq__(self, rhs): + def __eq__(self, rhs) -> bool: if isinstance(rhs, type(self)) and self._name == rhs._name: return True else: @@ -88,21 +86,48 @@ def __generate_operation__(self, rhs, operator: str): ).send(rhs) return rtrn - def __len__(self): + def __len__(self) -> int: return self._mad.protected_variable_retrieval(f"#{self._name}") - def __str__(self): + def __str__(self) -> str: + """ + Convert the MAD-NG reference to a string. + This method retrieves the value of the reference and converts it to a string. + If the value is a high-level MAD reference, it returns its string representation. + Otherwise, it returns the string representation of the value. + + Returns: + str: The string representation of the MAD-NG reference. + """ val = self._mad.recv_vars(self._name) if isinstance(val, high_level_mad_ref): return repr(val) else: return str(val) - def eval(self): + def eval(self) -> Any: + """ + Evaluate the reference and return the value. + + Returns: + Any: The evaluated value of the reference in MAD-NG. + """ return self._mad.recv_vars(self._name) - def __repr__(self): # TODO: This should be better (jgray 2024) - return f"MAD-NG Object(Name: {self._name}, Parent: {self._parent}, Process: {repr(self._mad)})" + def __repr__(self): + """ + Provide a detailed string representation of the MAD-NG object. + + Returns: + str: A string containing the object's name, parent, process, and type. + """ + return ( + f"<{self.__class__.__name__}(" + f"Name: {self._name}, " + f"Parent: {self._parent}, " + f"Process: {repr(self._mad)}, " + f"Type: {type(self).__name__})>" + ) def __dir__(self) -> Iterable[str]: name = self._name @@ -164,6 +189,9 @@ def __call__(self, *args, **kwargs): return last_obj def __iter__(self): + self._mad.send(f"{self._mad.py_name}:send({self._name}:is_instanceOf(sequence))") + is_seq = self._mad.recv() + assert is_seq, "Iteration is only supported for sequences for now" self._iterindex = -1 return self diff --git a/src/pymadng/madp_object.py b/src/pymadng/madp_object.py index 55937d7..7dd9c07 100644 --- a/src/pymadng/madp_object.py +++ b/src/pymadng/madp_object.py @@ -402,20 +402,18 @@ def __getitem__(self, var_name: str) -> Any: # ----------------------------------------------------------------------------------------------# - def eval(self, input: str) -> Any: + def eval(self, expression: str) -> Any: """ Evaluate an expression in the MAD-NG environment. - Assigns the result to a temporary variable and returns its value. - Args: - input (str): The expression to evaluate. + expression (str): The expression to evaluate. Returns: - The evaluated result. + Any: The result of the evaluated expression. """ rtrn = self.__get_mad_reflast() - self.send(f"{rtrn._name} =" + input) + self.send(f"{rtrn._name} = {expression}") return rtrn.eval() def evaluate_in_madx_environment(self, input: str) -> None: diff --git a/src/pymadng/madp_pymad.py b/src/pymadng/madp_pymad.py index 7b86ba6..be493ca 100644 --- a/src/pymadng/madp_pymad.py +++ b/src/pymadng/madp_pymad.py @@ -6,6 +6,7 @@ import struct import subprocess import sys +import threading from pathlib import Path from typing import Any, TYPE_CHECKING, TextIO from contextlib import suppress @@ -85,16 +86,9 @@ def __init__( startupChunk = ( f"MAD.pymad '{py_name}' {{_dbg = {lua_debug_flag}}} :__ini({mad_write})" ) - original_sigint_handler = signal.getsignal(signal.SIGINT) - def delete_process(sig, frame): - self.close() - signal.signal(signal.SIGINT, original_sigint_handler) - raise KeyboardInterrupt("MAD process was interrupted, and has been deleted") - - signal.signal( - signal.SIGINT, delete_process - ) # Delete the process if interrupted + if threading.current_thread() is threading.main_thread(): + self._setup_signal_handler() # Start the process self.process = subprocess.Popen( @@ -150,6 +144,16 @@ def delete_process(sig, frame): self.set_error_handler(True) self.raise_on_madng_error = True + def _setup_signal_handler(self): + original_sigint_handler = signal.getsignal(signal.SIGINT) + + def delete_process(sig, frame): + self.close() + signal.signal(signal.SIGINT, original_sigint_handler) + raise KeyboardInterrupt("MAD process was interrupted, and has been deleted") + + signal.signal(signal.SIGINT, delete_process) + def send_range(self, start: float, stop: float, size: int) -> None: """Send a linear range (numpy array) to MAD-NG. diff --git a/tests/comm_test.py b/tests/comm_test.py deleted file mode 100644 index f055edc..0000000 --- a/tests/comm_test.py +++ /dev/null @@ -1,456 +0,0 @@ -from pathlib import Path -import time -import unittest - -import numpy as np - -from pymadng import MAD - -inputs_folder = Path(__file__).parent / "inputs" -# TODO: Test the following functions: -# - eval -# - error on stdout = something strange - -class TestExecution(unittest.TestCase): - - def test_recv_and_exec(self): - with MAD() as mad: - mad.send("""py:send([==[mad.send('''py:send([=[mad.send("py:send([[a = 100/2]])")]=])''')]==])""") - mad.recv_and_exec() - mad.recv_and_exec() - a = mad.recv_and_exec()["a"] - self.assertEqual(a, 50) - - def test_err(self): - with MAD(stdout="/dev/null", redirect_sterr=True) as mad: - mad.send("py:__err(true)") - mad.send("1+1") #Load error - self.assertRaises(RuntimeError, mad.recv) - mad.send("py:__err(true)") - mad.send("print(nil/2)") #Runtime error - self.assertRaises(RuntimeError, mad.recv) - -class TestStrings(unittest.TestCase): - - def test_recv(self): - with MAD() as mad: - mad.send("py:send('hi')") - mad.send("""py:send([[Multiline string should work - -Like So.]])""") - self.assertEqual(mad.recv(), 'hi') - self.assertEqual(mad.receive(), 'Multiline string should work\n\nLike So.') - - def test_send(self): - with MAD() as mad: - initString = "asdfghjkl;" - mad.send("str = py:recv(); py:send(str .. str)") - mad.send(initString) - self.assertEqual(mad.recv(), initString * 2) - mad.send("str2 = py:recv(); py:send(str2 .. str2)") - initString = """Py Multiline string should work - -Like So.]])""" - mad.send(initString) - self.assertEqual(mad.recv(), initString * 2) - - def test_protected_send(self): - with MAD(stdout="/dev/null", redirect_sterr=True, raise_on_madng_error=False) as mad: - mad.send("py:send('hello world'); a = nil/2") - self.assertEqual(mad.recv(), "hello world") # python should not crash - mad.send("py:send(1)") - self.assertEqual(mad.recv(), 1) # Check that the error did not affect the pipe - - mad.protected_send("a = nil/2") - self.assertRaises(RuntimeError, mad.recv) # python should receive an error - - mad.psend("a = nil/2") - self.assertRaises(RuntimeError, mad.recv) - - -class TestNil(unittest.TestCase): - - def test_send_recv(self): - with MAD() as mad: - mad.send(""" - local myNil = py:recv() - py:send(myNil) - py:send(nil) - py:send() - """) - mad.send(None) - self.assertIsNone(mad.recv()) - self.assertIsNone(mad.recv()) - self.assertIsNone(mad.recv()) - -class TestList(unittest.TestCase): - - def test_send_recv(self): - with MAD() as mad: - myList = [[1, 2, 3, 4, 5, 6, 7, 8, 9]] * 2 - mad.send(""" - local list = py:recv() - list[1][1] = 10 - list[2][1] = 10 - py:send(list) - """) - mad.send(myList) - myList[0][0] = 10 - myList[1][0] = 10 - self.assertEqual(mad.recv(), myList) - - def test_send_recv_wref(self): - with MAD() as mad: - mad.send(""" - list = {MAD.object "a" {a = 2}, MAD.object "b" {b = 2}} - list2 = {1, 2, 3, 4, 5, a = 10, b = 3, c = 4} - py:send(list) - py:send(list2) - """) - list1 = mad.recv("list") - list2 = mad.recv("list2") - self.assertEqual(len(list1), 2) - self.assertEqual([x for x in list2], [1, 2, 3, 4, 5]) - self.assertEqual(list2["a"], 10) - self.assertEqual(list2["b"], 3) - self.assertEqual(list2["c"], 4) - self.assertEqual(list1[0].a, 2) - self.assertEqual(list1[1].b, 2) - - -class TestNums(unittest.TestCase): - - eps = 2**-52 - tiny = 2**-1022 - huge = 2**1023 - flt_lst = [0, tiny, 2**-64, 2**-63, 2**-53, eps, 2**-52, 2*eps, 2**-32, 2**-31, 1e-9, - 0.1-eps, 0.1, 0.1+eps, 0.5, 0.7-eps, 0.7, 0.7+eps, 1-eps, 1, 1+eps, - 1.1, 1.7, 2, 10, 1e2, 1e3, 1e6, 1e9, 2**31, 2**32, 2**52, 2**53, - 2**63, 2**64, huge] - - def test_send_recv_int(self): - with MAD() as mad: - int_lst = [0, 1, 2, 10, 1e2, 1e3, 1e6, 1e9, 2**31-1] - for i in range(len(int_lst)): - mad.send(""" - local is_integer in MAD.typeid - local num = py:recv() - py:send( num) - py:send(-num) - py:send(is_integer(num)) - """) - mad.send(int_lst[i]) - recv_num = mad.recv() - self.assertEqual(recv_num, int_lst[i]) - self.assertTrue(isinstance(recv_num, np.int32)) - recv_num = mad.recv() - self.assertEqual(recv_num, -int_lst[i]) - self.assertTrue(isinstance(recv_num, np.int32)) - self.assertTrue(mad.recv()) - - def test_send_recv_num(self): - with MAD() as mad: - for i in range(len(self.flt_lst)): - mad.send(""" - local num = py:recv() - local negative = py:recv() - py:send(num) - py:send(negative) - py:send(num * 1.61) - """) - mad.send(self.flt_lst[i]) - mad.send(-self.flt_lst[i]) - self.assertEqual(mad.recv(), self.flt_lst[i]) #Check individual floats - self.assertEqual(mad.recv(), -self.flt_lst[i]) #Check negation - self.assertEqual(mad.recv(), self.flt_lst[i] * 1.61) #Check manipulation - - def test_send_recv_cpx(self): - with MAD() as mad: - for i in range(len(self.flt_lst)): - for j in range(len(self.flt_lst)): - mad.send(""" - local my_cpx = py:recv() - py:send(my_cpx) - py:send(-my_cpx) - py:send(my_cpx * 1.31i) - """) - my_cpx = self.flt_lst[i] + 1j * self.flt_lst[j] - mad.send(my_cpx) - self.assertEqual(mad.recv(), my_cpx) - self.assertEqual(mad.recv(), -my_cpx) - self.assertEqual(mad.recv(), my_cpx * 1.31j) - -class TestMatrices(unittest.TestCase): - - def test_send_recv_imat(self): - with MAD() as mad: - mad.send(""" - local imat = py:recv() - py:send(imat) - py:send(MAD.imatrix(3, 5):seq()) - """) - imat = np.random.randint(0, 255, (5, 5), dtype=np.int32) - mad.send(imat) - self.assertTrue(np.all(mad.recv() == imat)) - self.assertTrue(np.all(mad.recv() == np.arange(1, 16).reshape(3, 5))) - - def test_send_recv_mat(self): - with MAD() as mad: - mad.send(""" - local mat = py:recv() - py:send(mat) - py:send(MAD.matrix(3, 5):seq() / 2) - """) - mat = np.arange(1, 25).reshape(4, 6) / 4 - mad.send(mat) - self.assertTrue(np.all(mad.recv() == mat)) - self.assertTrue(np.all(mad.recv() == np.arange(1, 16).reshape(3, 5) / 2)) - - def test_send_recv_cmat(self): - with MAD() as mad: - mad.send(""" - local cmat = py:recv() - py:send(cmat) - py:send(MAD.cmatrix(3, 5):seq() / 2i) - """) - cmat = np.arange(1, 25).reshape(4, 6) / 4 + 1j * np.arange(1, 25).reshape(4, 6) / 4 - mad.send(cmat) - self.assertTrue(np.all(mad.recv() == cmat)) - self.assertTrue(np.all(mad.recv() == (np.arange(1, 16).reshape(3, 5) / 2j))) - -class TestRngs(unittest.TestCase): - - def test_recv(self): - with MAD() as mad: - mad.send(""" - irng = MAD.range(3, 11, 2) - rng = MAD.nrange(3.5, 21.4, 12) - lrng = MAD.nlogrange(1, 20, 20) - py:send(irng) - py:send(rng) - py:send(lrng) - py:send(irng:totable()) - py:send(rng:totable()) - py:send(lrng:totable()) - """) - self.assertEqual(mad.recv(), range(3 , 12 , 2)) #MAD is inclusive, python is exclusive (on stop) - self.assertTrue (np.allclose(mad.recv(), np.linspace(3.5, 21.4, 12))) - self.assertTrue (np.allclose(mad.recv(), np.geomspace(1, 20, 20))) - self.assertEqual(mad.recv(), list(range(3, 12, 2))) #MAD is inclusive, python is exclusive (on stop) - self.assertTrue (np.allclose(mad.recv(), np.linspace(3.5, 21.4, 12))) - self.assertTrue (np.allclose(mad.recv(), np.geomspace(1, 20, 20))) - - def test_send(self): - with MAD() as mad: - mad.send(""" - irng = py:recv() + 1 - rng = py:recv() + 2 - lrng = py:recv() - py:send(irng:totable()) - py:send(rng:totable()) - py:send(lrng:totable()) - """) - mad.send(range(3, 10, 1)) - mad.send_range(3.5, 21.4, 14) - mad.send_logrange(1, 20, 20) - self.assertEqual(mad.recv(), list(range(4, 12, 1))) - self.assertTrue (np.allclose(mad.recv(), np.linspace(5.5, 23.4, 14))) - self.assertTrue (np.allclose(mad.recv(), np.geomspace(1, 20, 20))) - -class TestBool(unittest.TestCase): - - def test_send_recv(self): - with MAD() as mad: - mad.send(""" - bool1 = py:recv() - bool2 = py:recv() - py:send(not bool1) - py:send(not bool2) - """) - mad.send(True ) - mad.send(False) - self.assertEqual(mad.recv(), False) - self.assertEqual(mad.recv(), True) - -class TestMono(unittest.TestCase): - - def test_recv(self): - with MAD() as mad: - mad.send(""" - local m = MAD.monomial({1, 2, 3, 4, 6, 20, 100}) - py:send(m) - local m = MAD.monomial("WZ346oy") - py:send(m) - """) - - self.assertTrue(np.all(mad.recv() == [1, 2, 3, 4, 6, 20, 100])) - self.assertTrue(np.all(mad.recv() == [32, 35, 3, 4, 6, 50, 60])) - - def test_send_recv(self): - with MAD() as mad: - mad.send(""" - local m1 = py:recv() - local m2 = py:recv() - py:send(m1 + m2) - """) - pym1 = np.random.randint(0, 255, 20, dtype=np.ubyte) - pym2 = np.random.randint(0, 255, 20, dtype=np.ubyte) - mad.send(pym1) - mad.send(pym2) - mad_res = mad.recv() - self.assertTrue(np.all(mad_res == pym1+pym2)) - #Check the return is a monomial - self.assertEqual(mad_res.dtype, np.dtype("ubyte")) - -class TestTPSA(unittest.TestCase): - - def test_recv_real(self): - with MAD() as mad: - mad.send(""" - local d = MAD.gtpsad(3, 6) - res = MAD.tpsa(6):set(1,2):set(2, 1) - res2 = res:copy():set(3, 1) - res3 = res2:copy():set(4, 1) - py:send(res:axypbzpc(res2, res3, 1, 2, 3)) - """) - monomials, coefficients = mad.recv() - self.assertTrue(np.all(monomials[0] == [0, 0, 0])) - self.assertTrue(np.all(monomials[1] == [1, 0, 0])) - self.assertTrue(np.all(monomials[2] == [0, 1, 0])) - self.assertTrue(np.all(monomials[3] == [0, 0, 1])) - self.assertTrue(np.all(monomials[4] == [2, 0, 0])) - self.assertTrue(np.all(monomials[5] == [1, 1, 0])) - self.assertTrue(np.all(coefficients == [11, 6, 4, 2, 1, 1])) - - def test_recv_cpx(self): - with MAD() as mad: - mad.send(""" - local d = MAD.gtpsad(3, 6) - res = MAD.ctpsa(6):set(1,2+1i):set(2, 1+2i) - res2 = res:copy():set(3, 1+2i) - res3 = res2:copy():set(4, 1+2i) - py:send(res:axypbzpc(res2, res3, 1, 2, 3)) - """) - monomials, coefficients = mad.recv() - self.assertTrue(np.all(monomials[0] == [0, 0, 0])) - self.assertTrue(np.all(monomials[1] == [1, 0, 0])) - self.assertTrue(np.all(monomials[2] == [0, 1, 0])) - self.assertTrue(np.all(monomials[3] == [0, 0, 1])) - self.assertTrue(np.all(monomials[4] == [2, 0, 0])) - self.assertTrue(np.all(monomials[5] == [1, 1, 0])) - self.assertTrue(np.all(coefficients == [10+6j, 2+14j, 2+9j, 2+4j, -3+4j, -3+4j])) - - def test_send_tpsa(self): - with MAD() as mad: - mad.send(""" - tab = py:recv() - py:send(tab) - """) - monos = np.asarray([[0, 0, 0], [1, 0, 0], [0, 1, 0], [0, 0, 1], [2, 0, 0], [1, 1, 0]], dtype=np.uint8) - coefficients = [11, 6, 4, 2, 1, 1] - mad.send_tpsa(monos, coefficients) - self.assertTrue(mad.recv("tab"), ["000", "100", "010", "001", "200", "110"].extend(coefficients)) #intentional? - - def test_send_ctpsa(self): - with MAD() as mad: - mad.send(""" - tab = py:recv() - py:send(tab) - """) - monos = np.asarray([[0, 0, 0], [1, 0, 0], [0, 1, 0], [0, 0, 1], [2, 0, 0], [1, 1, 0]], dtype=np.uint8) - coefficients = [10+6j, 2+14j, 2+9j, 2+4j, -3+4j, -3+4j] - mad.send_cpx_tpsa(monos, coefficients) - self.assertTrue(mad.recv("tab"), ["000", "100", "010", "001", "200", "110"].extend(coefficients)) #intentional? - - def test_send_recv_damap(self): - with MAD() as mad: - mad.send(""" - py:__err(true) - local sin in MAD.gmath - MAD.gtpsad(6, 5) - local M = MAD.damap {xy = 5} - M[1] = 1 ; M[3] = 2 ; M[5] = 3 - M[2] = 2 ; M[4] = 1 ; M[6] = 1 - res = sin(M[1]) * sin(M[3]) - py:send(res) - recved = MAD.tpsa():fromtable(py:recv()) - py:send(recved) - """) - init = mad.recv() - mad.send_tpsa(*init) - final = mad.recv() - self.assertTrue((init[0] == final[0]).all()) - self.assertTrue((init[1] == final[1]).all()) - - -class TestOutput(unittest.TestCase): - - def test_print(self): - with MAD() as mad: - mad.send("py:send('hello world')") - self.assertEqual(mad.recv(), "hello world") # Check printing does not affect pipe - -class TestDebug(unittest.TestCase): - test_log1 = inputs_folder/"test.log" - def test_logfile(self): - example_log = inputs_folder/"example.log" - test_log2 = inputs_folder/"test2.log" - - with MAD(stdout=self.test_log1, debug=True, raise_on_madng_error=False) as mad: - pass - time.sleep(0.1) #Wait for file to be written - with open(self.test_log1, "r") as f: - with open(example_log, "r") as f2: - self.assertEqual(f.read(), f2.read()) - - with MAD(stdout=test_log2, debug=True) as mad: - mad.send("!This is a line that does nothing") - mad.send("print('hello world')") - - with open(test_log2) as f: - text = f.read() - self.assertTrue("[!This is a line that does nothing]" in text) - self.assertTrue("hello world\n" in text) - self.assertTrue("[print('hello world')]" in text) - - with MAD(stdout=test_log2, debug=False) as mad: - mad.send("!This is a line that does nothing") - mad.send("print('hello world')") - - with open(test_log2) as f: - self.assertEqual(f.read(), "hello world\n") - - self.test_log1.unlink() - test_log2.unlink() - - def test_err(self): - # Run debug without stderr redirection - with open(self.test_log1, "w") as f: - with MAD(debug=True, stdout=f, raise_on_madng_error=False) as mad: - mad.psend("a = nil/2") - # receive the error before closing the pipe - self.assertRaises(RuntimeError, mad.recv) - with open(self.test_log1) as f: - # Check command was sent - file_text = f.read() - self.assertTrue("[py:__err(true); a = nil/2; py:__err(false);]" in file_text) - # Check error was not in stdout - self.assertFalse("***pymad.run:" in file_text) - - # Run debug with stderr redirection - with MAD(stdout=self.test_log1, redirect_sterr=True) as mad: - mad.psend("a = nil/2") - # receive the error before closing the pipe - self.assertRaises(RuntimeError, mad.recv) - with open(self.test_log1) as f: - # Check command was sent - file_text = f.read() - # Check error was in stdout - self.assertEqual("***pymad.run:", file_text[:13]) - self.test_log1.unlink() - - - - -if __name__ == '__main__': - unittest.main() diff --git a/tests/obj_test.py b/tests/obj_test.py deleted file mode 100644 index e84d70b..0000000 --- a/tests/obj_test.py +++ /dev/null @@ -1,460 +0,0 @@ -import os -import sys -import unittest - -import numpy as np -import pandas -import tfs - -from pymadng import MAD -from pymadng.madp_classes import high_level_mad_ref - -# TODO: Test the following functions: -# - evaluate_in_madx_environment -# - send_vars -# - recv_vars -# - quote_strings -# - create_deferred_expression -# - dir (on mad object or last) -# - globals -# - history -# - % (mod) -# - __str__ on mad references (low priority) - -# TODO: Test iterate through an object - -class TestLoad(unittest.TestCase): - a = np.arange(1, 21).reshape(4, 5) - b = np.arange(1, 7).reshape(2, 3) - c = (np.arange(1, 16) + 1j).reshape(5, 3) - - def test_load(self): - with MAD() as mad: - mad.load("MAD", "matrix") - self.assertTrue(mad.send("py:send(matrix == MAD.matrix)").recv()) - - mad.load("MAD.gmath") - self.assertTrue(mad.send("py:send(sin == MAD.gmath.sin)").recv()) - self.assertTrue(mad.send("py:send(cos == MAD.gmath.cos)").recv()) - self.assertTrue(mad.send("py:send(tan == MAD.gmath.tan)").recv()) - - mad.load("MAD.element", "quadrupole", "sextupole", "drift") - self.assertTrue( - mad.send("py:send(quadrupole == MAD.element.quadrupole)").recv() - ) - self.assertTrue( - mad.send("py:send(sextupole == MAD.element.sextupole )").recv() - ) - self.assertTrue( - mad.send("py:send(drift == MAD.element.drift )").recv() - ) - - def test_run_file(self): - with open("test.mad", "w") as f: - f.write(""" - local matrix, cmatrix in MAD - a = matrix(4, 5):seq() - b = cmatrix(2, 3):seq() - """) - with MAD(stdout="/dev/null", redirect_sterr=True) as mad: - mad.loadfile("test.mad") - self.assertIsNone(mad.matrix) - self.assertTrue(np.all(mad.a == self.a)) - self.assertTrue(np.all(mad.b == self.b)) - os.remove("test.mad") - - def test_load_file(self): - with open("test.mad", "w") as f: - f.write(""" - local matrix, cmatrix in MAD - local a = matrix(4, 5):seq() - local c = cmatrix(5, 3):seq() + 1i - return {res1 = a * c, res2 = a * c:conj()} - """) - with MAD() as mad: - mad.loadfile("test.mad", "res1", "res2") - self.assertTrue(np.all(mad.res1 == np.matmul(self.a, self.c))) - self.assertTrue(np.all(mad.res2 == np.matmul(self.a, self.c.conj()))) - os.remove("test.mad") - - -class TestGetSet(unittest.TestCase): - def test_get(self): - with MAD(stdout="/dev/null", redirect_sterr=True) as mad: - mad.load("element", "quadrupole") - self.assertEqual(mad.asdfg, None) - mad.send("""qd = quadrupole {knl={0, 0.25}, l = 1}""") - mad.send("""qf = quadrupole {qd = qd}""") - qd, qf = mad["qd", "qf"] - self.assertEqual(qd._name, "qd") - self.assertEqual(qd._parent, None) - self.assertEqual(qd._mad, mad._MAD__process) - self.assertEqual(qd.knl, [0, 0.25]) - self.assertEqual(qd.l, 1) - self.assertRaises(AttributeError, lambda: qd.asdfg) - self.assertRaises(KeyError, lambda: qd["asdfg"]) - self.assertRaises(IndexError, lambda: qd[1]) - self.assertTrue(isinstance(qf.qd, high_level_mad_ref)) - self.assertEqual(qf.qd.knl, [0, 0.25]) - self.assertEqual(qf.qd.l, 1) - self.assertEqual(qf.qd, qd) - - mad.send("objList = {qd, qf, qd, qf, qd} py:send(objList)") - objList = mad.recv("objList") - for i in range(len(objList)): - if i % 2 != 0: - self.assertTrue(isinstance(objList[i].qd, high_level_mad_ref)) - self.assertEqual(objList[i].qd._parent, f"objList[{i+1}]") - self.assertEqual(objList[i].qd.knl, [0, 0.25]) - self.assertEqual(objList[i].qd.l, 1) - self.assertEqual(objList[i].qd, qd) - - else: - self.assertEqual(objList[i].knl, [0, 0.25]) - self.assertEqual(objList[i].l, 1) - self.assertEqual(objList[i], qd) - self.assertEqual(objList[i]._parent, "objList") - - def test_set(self): # Need more? - with MAD() as mad: - mad.load("element", "quadrupole") - mad.send("""qd = quadrupole {knl={0, 0.25}, l = 1} py:send(qd)""") - mad["qd2"] = mad.recv("qd") - self.assertEqual(mad.qd2._name, "qd2") - self.assertEqual(mad.qd2._parent, None) - self.assertEqual(mad.qd2._mad, mad._MAD__process) - self.assertEqual(mad.qd2.knl, [0, 0.25]) - self.assertEqual(mad.qd2.l, 1) - self.assertEqual(mad.qd2, mad.qd) - mad["a", "b"] = mad.MAD.gmath.reim(9.75 + 1.5j) - self.assertEqual(mad.a, 9.75) - self.assertEqual(mad.b, 1.5) - mad.send("f = \\-> (1, 2, 3, 9, 8, 7)") - mad["r1", "r2", "r3", "r4", "r5", "r6"] = mad.f() - self.assertEqual(mad.r1, 1) - self.assertEqual(mad.r2, 2) - self.assertEqual(mad.r3, 3) - self.assertEqual(mad.r4, 9) - self.assertEqual(mad.r5, 8) - self.assertEqual(mad.r6, 7) - - -class TestObjFun(unittest.TestCase): - def test_call_obj(self): - with MAD() as mad: - mad.load("element", "quadrupole", "sextupole") - qd = mad.quadrupole(knl=[0, 0.25], l=1) - sd = mad.sextupole(knl=[0, 0.25, 0.5], l=1) - - mad["qd"] = qd - self.assertEqual(mad.qd._name, "qd") - self.assertEqual(mad.qd._parent, None) - self.assertEqual(mad.qd._mad, mad._MAD__process) - self.assertEqual(mad.qd.knl, [0, 0.25]) - self.assertEqual(mad.qd.l, 1) - - sdc = sd - mad["sd"] = sd - del sd - self.assertEqual(mad.sd._name, "sd") - self.assertEqual(mad.sd._parent, None) - self.assertEqual(mad.sd._mad, mad._MAD__process) - self.assertEqual(mad.sd.knl, [0, 0.25, 0.5]) - self.assertEqual(mad.sd.l, 1) - - # Reference counting - qd = mad.quadrupole(knl=[0, 0.3], l=1) - self.assertEqual(sdc._name, "_last[2]") - self.assertEqual(qd._name, "_last[3]") - self.assertEqual(qd.knl, [0, 0.3]) - qd = mad.quadrupole(knl=[0, 0.25], l=1) - self.assertEqual(qd._name, "_last[1]") - - def test_call_last(self): - with MAD() as mad: - mad.send("func_test = \\a-> \\b-> \\c-> a+b*c") - self.assertRaises(TypeError, lambda: mad.MAD()) - self.assertEqual(mad.func_test(1)(2)(3), 7) - - def test_call_fail(self): - with MAD(stdout="/dev/null", redirect_sterr=True) as mad: - mad.send("func_test = \\a-> \\b-> \\c-> 'a'+b") - mad.func_test(1)(2)(3) - self.assertRaises(RuntimeError, lambda: mad.recv()) - self.assertRaises( - RuntimeError, lambda: mad.mtable.read("'abad.tfs'").eval() - ) - - def test_call_func(self): - with MAD() as mad: - mad.load("element", "quadrupole") - mad["qd"] = mad.quadrupole(knl=[0, 0.25], l=1) - mad.qd.select() - mad["qdSelected"] = mad.qd.is_selected() - self.assertTrue(mad.qdSelected) - mad.qd.deselect() - mad["qdSelected"] = mad.qd.is_selected() - self.assertFalse(mad.qdSelected) - mad.qd.set_variables({"l": 2}) - self.assertEqual(mad.qd.l, 2) - - def test_mult_rtrn(self): - with MAD() as mad: - mad.send(""" - obj = MAD.object "obj" {a = 1, b = 2} - local function mult_rtrn () - obj2 = MAD.object "obj2" {a = 2, b = 3} - return obj, obj, obj, obj2 - end - last_rtn = __mklast__(mult_rtrn()) - lastobj = __mklast__(obj) - notLast = {mult_rtrn()} - """) - - mad["o11", "o12", "o13", "o2"] = mad._MAD__get_mad_ref("last_rtn") - mad["p11", "p12", "p13", "p2"] = mad._MAD__get_mad_ref("notLast") - mad["objCpy"] = mad._MAD__get_mad_ref( - "lastobj" - ) # Test single object in __mklast__ - self.assertEqual(mad.o11.a, 1) - self.assertEqual(mad.o11.b, 2) - self.assertEqual(mad.o12.a, 1) - self.assertEqual(mad.o12.b, 2) - self.assertEqual(mad.o13.a, 1) - self.assertEqual(mad.o13.b, 2) - self.assertEqual(mad.o2.a, 2) - self.assertEqual(mad.o2.b, 3) - self.assertEqual(mad.p11.a, 1) - self.assertEqual(mad.p11.b, 2) - self.assertEqual(mad.p12.a, 1) - self.assertEqual(mad.p12.b, 2) - self.assertEqual(mad.p13.a, 1) - self.assertEqual(mad.p13.b, 2) - self.assertEqual(mad.p2.a, 2) - self.assertEqual(mad.p2.b, 3) - self.assertEqual(mad.objCpy, mad.obj) - - def test_MADX(self): - from pymadng import MAD - - mad = MAD() - self.assertEqual(mad.MADX.abs(-1), 1) - self.assertEqual(mad.MADX.ceil(1.2), 2) - - self.assertEqual(mad.MAD.MADX.abs(-1), 1) - self.assertEqual(mad.MAD.MADX.ceil(1.2), 2) - - self.assertEqual(mad.MADX.MAD.MADX.abs(-1), 1) - self.assertEqual(mad.MADX.MAD.MADX.ceil(1.2), 2) - - with open("test.seq", "w") as f: - f.write(""" - qd: quadrupole, l=1, knl:={0, 0.25}; - """) - mad.MADX.load("'test.seq'") - self.assertEqual(mad.MADX.qd.l, 1) - self.assertEqual(mad.MADX.qd.knl, [0, 0.25]) - os.remove("test.seq") - - -class TestOps(unittest.TestCase): - def test_matrix(self): - with MAD() as mad: - mad.load("MAD", "matrix") - pyMat = np.arange(1, 101).reshape((10, 10)) - - mad["mat"] = mad.matrix(10).seq(2) + 2 - self.assertTrue(np.all(mad.mat == pyMat + 4)) - - mad["mat"] = mad.matrix(10).seq() / 3 - self.assertTrue(np.allclose(mad.mat, pyMat / 3)) - - mad["mat"] = mad.matrix(10).seq() * 4 - self.assertTrue(np.all(mad.mat == pyMat * 4)) - - mad["mat"] = mad.matrix(10).seq() ** 3 - self.assertTrue(np.all(mad.mat == np.linalg.matrix_power(pyMat, 3))) - - mad["mat"] = mad.matrix(10).seq() + 2 / 3 * 4**3 # bidmas - self.assertTrue(np.all(mad.mat == pyMat + 2 / 3 * 4**3)) - - # conversions - self.assertTrue( - np.all(np.array(mad.MAD.matrix(10).seq()) == np.arange(1, 101)) - ) - self.assertTrue(np.all(list(mad.MAD.matrix(10).seq()) == np.arange(1, 101))) - self.assertTrue(np.all(mad.MAD.matrix(10).seq().eval() == pyMat)) - self.assertEqual(np.sin(1), mad.math.sin(1).eval()) - self.assertAlmostEqual( - np.cos(0.5), mad.math.cos(0.5).eval(), None, None, 4e-16 - ) - - # temp vars - res = ( - ( - (mad.matrix(3).seq().emul(2) * mad.matrix(3).seq(3) + 3) * 2 - + mad.matrix(3).seq(2) - ) - - mad.matrix(3).seq(4) - ).eval() - np_mat = np.arange(9).reshape((3, 3)) + 1 - exp = ((np.matmul((np_mat * 2), (np_mat + 3)) + 3) * 2 + (np_mat + 2)) - ( - np_mat + 4 - ) - self.assertTrue(np.all(exp == res)) - - -class TestArgsAndKwargs(unittest.TestCase): - def test_args(self): - with MAD() as mad: - mad.load("MAD", "matrix", "cmatrix") - mad["m1"] = mad.matrix(3).seq() - mad["m2"] = mad.matrix(3).eye(2).mul(mad.m1) - mad["m3"] = mad.matrix(3).seq().emul(mad.m1) - mad["cm1"] = mad.cmatrix(3).seq(1j).map([1, 5, 9], "\\mn, n -> mn - 1i") - self.assertTrue(np.all(mad.m2 == mad.m1 * 2)) - self.assertTrue(np.all(mad.m3 == (mad.m1 * mad.m1))) - self.assertTrue(np.all(mad.cm1 == mad.m1 + 1j - np.eye(3) * 1j)) - # Add bool - - def test_kwargs(self): - with MAD() as mad: - mad.load("element", "sextupole") - mad["m1"] = mad.MAD.matrix(3).seq() - sd = mad.sextupole( - knl=[0, 0.25j, 1 + 1j], - l=1, - alist=[1, 2, 3, 5], - abool=True, - opposite=False, - mat=mad.m1, - ) - self.assertEqual(sd.knl, [0, 0.25j, 1 + 1j]) - self.assertEqual(sd.l, 1) - self.assertEqual(sd.alist, [1, 2, 3, 5]) - self.assertEqual(sd.abool, True) - self.assertEqual(sd.opposite, False) - self.assertTrue(np.all(sd.mat == np.arange(9).reshape((3, 3)) + 1)) - - -class TestDir(unittest.TestCase): - def test_dir(self): - with MAD() as mad: - mad.load("MAD", "gfunc", "element", "object") - mad.load("element", "quadrupole") - mad.load("gfunc", "functor") - obj_dir = dir(mad.object) - mad.send("my_obj = object {a1 = 2, a2 = functor(\\s->s.a1), a3 = \\s->s.a1}") - obj_exp = sorted(["a1", "a2()", "a3"] + obj_dir) - self.assertEqual(dir(mad.my_obj), obj_exp) - self.assertEqual(mad.my_obj.a1, mad.my_obj.a3) - - quad_exp = dir(mad.quadrupole) - self.assertEqual( - dir(mad.quadrupole(knl=[0, 0.3], l=1)), quad_exp - ) # Dir of instance of class should be the same as the class - self.assertEqual( - dir(mad.quadrupole(asd=10, qwe=20)), sorted(quad_exp + ["asd", "qwe"]) - ) # Adding to the instance should change the dir - - -class TestDataFrame(unittest.TestCase): - def generalDataFrame(self, headers, DataFrame): - mad = MAD() - mad.send(""" -test = mtable{ - {"string"}, "number", "integer", "complex", "boolean", "list", "table", "range",! "generator", - name = "test", - header = {"string", "number", "integer", "complex", "boolean", "list", "table", "range"}, - string = "string", - number = 1.234567890, - integer = 12345670, - complex = 1.3 + 1.2i, - boolean = true, - list = {1, 2, 3, 4, 5}, - table = {1, 2, ["key"] = "value"}, - range = MAD.range(1, 11), -} - + {"a", 1.1, 1, 1 + 2i, true , {1, 2 }, {1 , 2 , ["3" ] = 3 }, MAD.range(1, 11),} - + {"b", 2.2, 2, 2 + 3i, false, {3, 4 }, {4 , 5 , ["6" ] = 6 }, MAD.range(2, 12),} - + {"c", 3.3, 3, 3 + 4i, true , {5, 6 }, {7 , 8 , ["9" ] = 9 }, MAD.range(3, 13),} - + {"d", 4.4, 4, 4 + 5i, false, {7, 8 }, {10, 11, ["12"] = 12}, MAD.range(4, 14),} - + {"e", 5.5, 5, 5 + 6i, true , {9, 10}, {13, 14, ["15"] = 15}, MAD.range(5, 15),} - -test:addcol("generator", \\ri, m -> m:getcol("number")[ri] + 1i * m:getcol("number")[ri]) -test:write("test") - """) - df = mad.test.to_df() - self.assertTrue(isinstance(df, DataFrame)) - header = getattr(df, headers) - self.assertEqual(header["name"], "test") - self.assertEqual(header["string"], "string") - self.assertEqual(header["number"], 1.234567890) - self.assertEqual(header["integer"], 12345670) - self.assertEqual(header["complex"], 1.3 + 1.2j) - self.assertEqual(header["boolean"], True) - self.assertEqual(header["list"], [1, 2, 3, 4, 5]) - tbl = getattr(df, headers)["table"] - self.assertEqual([x for x in tbl], [1, 2]) - self.assertEqual(tbl["key"], "value") - - self.assertEqual(df["string"].tolist(), ["a", "b", "c", "d", "e"]) - self.assertEqual(df["number"].tolist(), [1.1, 2.2, 3.3, 4.4, 5.5]) - self.assertEqual(df["integer"].tolist(), [1, 2, 3, 4, 5]) - self.assertEqual( - df["complex"].tolist(), [1 + 2j, 2 + 3j, 3 + 4j, 4 + 5j, 5 + 6j] - ) - self.assertEqual(df["boolean"].tolist(), [True, False, True, False, True]) - self.assertEqual(df["list"].tolist(), [[1, 2], [3, 4], [5, 6], [7, 8], [9, 10]]) - tbl = df["table"].tolist() - for i in range(len(tbl)): - lst = tbl[i] - self.assertEqual([lst[0], lst[1]], [i * 3 + 1, i * 3 + 2]) - self.assertEqual(lst[str((i + 1) * 3)], (i + 1) * 3) - self.assertEqual( - df["range"].tolist(), - [range(1, 12), range(2, 13), range(3, 14), range(4, 15), range(5, 16)], - ) - - def test_tfsDataFrame(self): - self.generalDataFrame("headers", tfs.TfsDataFrame) - - def test_pandasDataFrame(self): - sys.modules["tfs"] = None # Remove tfs-pandas - self.generalDataFrame("attrs", pandas.DataFrame) - del sys.modules["tfs"] - - def test_failure(self): - with MAD() as mad: - mad.send(""" -test = mtable{"string", "number"} + {"a", 1.1} + {"b", 2.2} - """) - pandas = sys.modules["pandas"] - sys.modules["pandas"] = None - self.assertRaises(ImportError, lambda: mad.test.to_df()) - sys.modules["pandas"] = pandas - df = mad.test.to_df() - self.assertTrue(isinstance(df, tfs.TfsDataFrame)) - self.assertEqual(df["string"].tolist(), ["a", "b"]) - self.assertEqual(df["number"].tolist(), [1.1, 2.2]) - - -# class TestSpeed(unittest.TestCase): - -# def test_benchmark(self): -# with MAD() as mad: -# mad.load("element", "quadrupole") -# mad.send(""" -# qd = quadrupole {knl={0, 0.25}, l = 1} -# py:send(qd) -# """) -# qd = mad.recv("qd") -# start = time.time() -# for i in range(int(1e5)): -# mad["qf"] = qd -# mad.qd -# total = time.time() - start -# self.assertAlmostEqual(total, 5, None, None, 5) # 1 second +/- 1 second - -if __name__ == "__main__": - unittest.main() diff --git a/tests/test_communication.py b/tests/test_communication.py new file mode 100644 index 0000000..5b6847b --- /dev/null +++ b/tests/test_communication.py @@ -0,0 +1,89 @@ +import threading +import unittest +from unittest.mock import patch + +from pymadng import MAD + + +class TestExecution(unittest.TestCase): + def test_recv_and_exec(self): + with MAD() as mad: + mad.send("""py:send([==[mad.send('''py:send([=[mad.send("py:send([[a = 100/2]])")]=])''')]==])""") + mad.recv_and_exec() + mad.recv_and_exec() + a = mad.recv_and_exec()["a"] + self.assertEqual(a, 50) + + def test_err(self): + with MAD(stdout="/dev/null", redirect_sterr=True) as mad: + mad.send("py:__err(true)") + mad.send("1+1") #Load error + self.assertRaises(RuntimeError, mad.recv) + mad.send("py:__err(true)") + mad.send("print(nil/2)") #Runtime error + self.assertRaises(RuntimeError, mad.recv) + +class TestStrings(unittest.TestCase): + def test_recv(self): + with MAD() as mad: + mad.send("py:send('hi')") + mad.send("""py:send([[Multiline string should work + +Like So.]])""") + self.assertEqual(mad.recv(), 'hi') + self.assertEqual(mad.receive(), 'Multiline string should work\n\nLike So.') + + def test_send(self): + with MAD() as mad: + initString = "asdfghjkl;" + mad.send("str = py:recv(); py:send(str .. str)") + mad.send(initString) + self.assertEqual(mad.recv(), initString * 2) + mad.send("str2 = py:recv(); py:send(str2 .. str2)") + initString = """Py Multiline string should work + +Like So.]])""" + mad.send(initString) + self.assertEqual(mad.recv(), initString * 2) + + def test_protected_send(self): + with MAD(stdout="/dev/null", redirect_sterr=True, raise_on_madng_error=False) as mad: + mad.send("py:send('hello world'); a = nil/2") + self.assertEqual(mad.recv(), "hello world") # python should not crash + mad.send("py:send(1)") + self.assertEqual(mad.recv(), 1) # Check that the error did not affect the pipe + + mad.protected_send("a = nil/2") + self.assertRaises(RuntimeError, mad.recv) # python should receive an error + + mad.psend("a = nil/2") + self.assertRaises(RuntimeError, mad.recv) + + +class TestOutput(unittest.TestCase): + + def test_print(self): + with MAD() as mad: + mad.send("py:send('hello world')") + self.assertEqual(mad.recv(), "hello world") # Check printing does not affect pipe + +class TestSignalHandler(unittest.TestCase): + @patch("pymadng.madp_pymad.mad_process._setup_signal_handler") + def test_signal_handler_called_in_main_thread(self, mock_setup_signal_handler): + with patch("pathlib.Path.exists", return_value=True): + MAD() + mock_setup_signal_handler.assert_called_once() + + @patch("pymadng.madp_pymad.mad_process._setup_signal_handler") + def test_signal_handler_not_called_in_non_main_thread(self, mock_setup_signal_handler): + def create_mad_process(): + with patch("pathlib.Path.exists", return_value=True): + MAD() + + thread = threading.Thread(target=create_mad_process) + thread.start() + thread.join() + mock_setup_signal_handler.assert_not_called() + +if __name__ == "__main__": + unittest.main() \ No newline at end of file diff --git a/tests/test_debug.py b/tests/test_debug.py new file mode 100644 index 0000000..b27a711 --- /dev/null +++ b/tests/test_debug.py @@ -0,0 +1,69 @@ +import unittest +from pathlib import Path +from pymadng import MAD +import time + +inputs_folder = Path(__file__).parent / "inputs" + +class TestDebug(unittest.TestCase): + test_log1 = inputs_folder / "test.log" + + def test_logfile(self): + example_log = inputs_folder / "example.log" + test_log2 = inputs_folder / "test2.log" + + with MAD(stdout=self.test_log1, debug=True, raise_on_madng_error=False) as mad: + pass + time.sleep(0.1) # Wait for file to be written + with open(self.test_log1, "r") as f: + with open(example_log, "r") as f2: + self.assertEqual(f.read(), f2.read()) + + with MAD(stdout=test_log2, debug=True) as mad: + mad.send("!This is a line that does nothing") + mad.send("print('hello world')") + + with open(test_log2) as f: + text = f.read() + self.assertTrue("[!This is a line that does nothing]" in text) + self.assertTrue("hello world\n" in text) + self.assertTrue("[print('hello world')]" in text) + + with MAD(stdout=test_log2, debug=False) as mad: + mad.send("!This is a line that does nothing") + mad.send("print('hello world')") + + with open(test_log2) as f: + self.assertEqual(f.read(), "hello world\n") + + self.test_log1.unlink() + test_log2.unlink() + + def test_err(self): + # Run debug without stderr redirection + with open(self.test_log1, "w") as f: + with MAD(debug=True, stdout=f, raise_on_madng_error=False) as mad: + mad.psend("a = nil/2") + # receive the error before closing the pipe + self.assertRaises(RuntimeError, mad.recv) + with open(self.test_log1) as f: + # Check command was sent + file_text = f.read() + self.assertTrue("[py:__err(true); a = nil/2; py:__err(false);]" in file_text) + # Check error was not in stdout + self.assertFalse("***pymad.run:" in file_text) + + # Run debug with stderr redirection + with MAD(stdout=self.test_log1, redirect_sterr=True) as mad: + mad.psend("a = nil/2") + # receive the error before closing the pipe + self.assertRaises(RuntimeError, mad.recv) + with open(self.test_log1) as f: + # Check command was sent + file_text = f.read() + # Check error was in stdout + self.assertEqual("***pymad.run:", file_text[:13]) + self.test_log1.unlink() + +if __name__ == '__main__': + unittest.main() \ No newline at end of file diff --git a/tests/test_io_and_loading.py b/tests/test_io_and_loading.py new file mode 100644 index 0000000..30739b8 --- /dev/null +++ b/tests/test_io_and_loading.py @@ -0,0 +1,69 @@ +import os +import unittest +import numpy as np +from pymadng import MAD + +class TestLoad(unittest.TestCase): + a = np.arange(1, 21).reshape(4, 5) + b = np.arange(1, 7).reshape(2, 3) + c = (np.arange(1, 16) + 1j).reshape(5, 3) + + def test_load(self): + with MAD() as mad: + mad.load("MAD", "matrix") + self.assertTrue(mad.send("py:send(matrix == MAD.matrix)").recv()) + + mad.load("MAD.gmath") + self.assertTrue(mad.send("py:send(sin == MAD.gmath.sin)").recv()) + self.assertTrue(mad.send("py:send(cos == MAD.gmath.cos)").recv()) + self.assertTrue(mad.send("py:send(tan == MAD.gmath.tan)").recv()) + + mad.load("MAD.element", "quadrupole", "sextupole", "drift") + self.assertTrue( + mad.send("py:send(quadrupole == MAD.element.quadrupole)").recv() + ) + self.assertTrue( + mad.send("py:send(sextupole == MAD.element.sextupole )").recv() + ) + self.assertTrue( + mad.send("py:send(drift == MAD.element.drift )").recv() + ) + + def test_run_file(self): + with open("test.mad", "w") as f: + f.write(""" + local matrix, cmatrix in MAD + a = matrix(4, 5):seq() + b = cmatrix(2, 3):seq() + """) + with MAD(stdout="/dev/null", redirect_sterr=True) as mad: + mad.loadfile("test.mad") + self.assertIsNone(mad.matrix) + self.assertTrue(np.all(mad.a == self.a)) + self.assertTrue(np.all(mad.b == self.b)) + os.remove("test.mad") + + def test_load_file(self): + with open("test.mad", "w") as f: + f.write(""" + local matrix, cmatrix in MAD + local a = matrix(4, 5):seq() + local c = cmatrix(5, 3):seq() + 1i + return {res1 = a * c, res2 = a * c:conj()} + """) + with MAD() as mad: + mad.loadfile("test.mad", "res1", "res2") + self.assertTrue(np.all(mad.res1 == np.matmul(self.a, self.c))) + self.assertTrue(np.all(mad.res2 == np.matmul(self.a, self.c.conj()))) + os.remove("test.mad") + + def test_globals(self): + with MAD() as mad: + mad.send_vars(a=1, b=2, c=3) + global_vars = mad.globals() + self.assertIn("a", global_vars) + self.assertIn("b", global_vars) + self.assertIn("c", global_vars) + +if __name__ == "__main__": + unittest.main() \ No newline at end of file diff --git a/tests/test_misc_types.py b/tests/test_misc_types.py new file mode 100644 index 0000000..7cbe7b9 --- /dev/null +++ b/tests/test_misc_types.py @@ -0,0 +1,225 @@ +from pathlib import Path +import time +import unittest + +import numpy as np + +from pymadng import MAD + +inputs_folder = Path(__file__).parent / "inputs" +# TODO: Test the following functions: +# - eval +# - error on stdout = something strange + +class TestNil(unittest.TestCase): + + def test_send_recv(self): + with MAD() as mad: + mad.send(""" + local myNil = py:recv() + py:send(myNil) + py:send(nil) + py:send() + """) + mad.send(None) + self.assertIsNone(mad.recv()) + self.assertIsNone(mad.recv()) + self.assertIsNone(mad.recv()) + +class TestRngs(unittest.TestCase): + + def test_recv(self): + with MAD() as mad: + mad.send(""" + irng = MAD.range(3, 11, 2) + rng = MAD.nrange(3.5, 21.4, 12) + lrng = MAD.nlogrange(1, 20, 20) + py:send(irng) + py:send(rng) + py:send(lrng) + py:send(irng:totable()) + py:send(rng:totable()) + py:send(lrng:totable()) + """) + self.assertEqual(mad.recv(), range(3 , 12 , 2)) #MAD is inclusive, python is exclusive (on stop) + self.assertTrue (np.allclose(mad.recv(), np.linspace(3.5, 21.4, 12))) + self.assertTrue (np.allclose(mad.recv(), np.geomspace(1, 20, 20))) + self.assertEqual(mad.recv(), list(range(3, 12, 2))) #MAD is inclusive, python is exclusive (on stop) + self.assertTrue (np.allclose(mad.recv(), np.linspace(3.5, 21.4, 12))) + self.assertTrue (np.allclose(mad.recv(), np.geomspace(1, 20, 20))) + + def test_send(self): + with MAD() as mad: + mad.send(""" + irng = py:recv() + 1 + rng = py:recv() + 2 + lrng = py:recv() + py:send(irng:totable()) + py:send(rng:totable()) + py:send(lrng:totable()) + """) + mad.send(range(3, 10, 1)) + mad.send_range(3.5, 21.4, 14) + mad.send_logrange(1, 20, 20) + self.assertEqual(mad.recv(), list(range(4, 12, 1))) + self.assertTrue (np.allclose(mad.recv(), np.linspace(5.5, 23.4, 14))) + self.assertTrue (np.allclose(mad.recv(), np.geomspace(1, 20, 20))) + +class TestBool(unittest.TestCase): + + def test_send_recv(self): + with MAD() as mad: + mad.send(""" + bool1 = py:recv() + bool2 = py:recv() + py:send(not bool1) + py:send(not bool2) + """) + mad.send(True ) + mad.send(False) + self.assertEqual(mad.recv(), False) + self.assertEqual(mad.recv(), True) + +class TestMono(unittest.TestCase): + + def test_recv(self): + with MAD() as mad: + mad.send(""" + local m = MAD.monomial({1, 2, 3, 4, 6, 20, 100}) + py:send(m) + local m = MAD.monomial("WZ346oy") + py:send(m) + """) + + self.assertTrue(np.all(mad.recv() == [1, 2, 3, 4, 6, 20, 100])) + self.assertTrue(np.all(mad.recv() == [32, 35, 3, 4, 6, 50, 60])) + + def test_send_recv(self): + with MAD() as mad: + mad.send(""" + local m1 = py:recv() + local m2 = py:recv() + py:send(m1 + m2) + """) + pym1 = np.random.randint(0, 255, 20, dtype=np.ubyte) + pym2 = np.random.randint(0, 255, 20, dtype=np.ubyte) + mad.send(pym1) + mad.send(pym2) + mad_res = mad.recv() + self.assertTrue(np.all(mad_res == pym1+pym2)) + #Check the return is a monomial + self.assertEqual(mad_res.dtype, np.dtype("ubyte")) + +class TestTPSA(unittest.TestCase): + + def test_recv_real(self): + with MAD() as mad: + mad.send(""" + local d = MAD.gtpsad(3, 6) + res = MAD.tpsa(6):set(1,2):set(2, 1) + res2 = res:copy():set(3, 1) + res3 = res2:copy():set(4, 1) + py:send(res:axypbzpc(res2, res3, 1, 2, 3)) + """) + monomials, coefficients = mad.recv() + self.assertTrue(np.all(monomials[0] == [0, 0, 0])) + self.assertTrue(np.all(monomials[1] == [1, 0, 0])) + self.assertTrue(np.all(monomials[2] == [0, 1, 0])) + self.assertTrue(np.all(monomials[3] == [0, 0, 1])) + self.assertTrue(np.all(monomials[4] == [2, 0, 0])) + self.assertTrue(np.all(monomials[5] == [1, 1, 0])) + self.assertTrue(np.all(coefficients == [11, 6, 4, 2, 1, 1])) + + def test_recv_cpx(self): + with MAD() as mad: + mad.send(""" + local d = MAD.gtpsad(3, 6) + res = MAD.ctpsa(6):set(1,2+1i):set(2, 1+2i) + res2 = res:copy():set(3, 1+2i) + res3 = res2:copy():set(4, 1+2i) + py:send(res:axypbzpc(res2, res3, 1, 2, 3)) + """) + monomials, coefficients = mad.recv() + self.assertTrue(np.all(monomials[0] == [0, 0, 0])) + self.assertTrue(np.all(monomials[1] == [1, 0, 0])) + self.assertTrue(np.all(monomials[2] == [0, 1, 0])) + self.assertTrue(np.all(monomials[3] == [0, 0, 1])) + self.assertTrue(np.all(monomials[4] == [2, 0, 0])) + self.assertTrue(np.all(monomials[5] == [1, 1, 0])) + self.assertTrue(np.all(coefficients == [10+6j, 2+14j, 2+9j, 2+4j, -3+4j, -3+4j])) + + # Might be worth checking if the tab can be converted into a tpsa from Monomials. (jgray 2025) + def test_send_tpsa(self): + with MAD() as mad: + mad.send(""" + tab = py:recv() + index_part = {} + for i = 1, #tab do + index_part[i] = tab[i] + end + py:send(tab) + py:send(index_part) + """) + monos = np.asarray([[0, 0, 0], [1, 0, 0], [0, 1, 0], [0, 0, 1], [2, 0, 0], [1, 1, 0]], dtype=np.uint8) + coefficients = [11, 6, 4, 2, 1, 1] + expected_index = ["000", "100", "010", "001", "200", "110"] + + mad.send_tpsa(monos, coefficients) + + whole_tab = mad.recv("tab") + index_part = mad.recv("index_part") + self.assertEqual(index_part, expected_index) + for i in range(len(whole_tab)): + self.assertTrue(whole_tab[i] == expected_index[i]) + + for i, key in enumerate(expected_index): + self.assertTrue(whole_tab[key] == coefficients[i]) + + def test_send_ctpsa(self): + with MAD() as mad: + mad.send(""" + tab = py:recv() + index_part = {} + for i = 1, #tab do + index_part[i] = tab[i] + end + py:send(tab) + py:send(index_part) + """) + monos = np.asarray([[0, 0, 0], [1, 0, 0], [0, 1, 0], [0, 0, 1], [2, 0, 0], [1, 1, 0]], dtype=np.uint8) + coefficients = [10+6j, 2+14j, 2+9j, 2+4j, -3+4j, -3+4j] + expected_index = ["000", "100", "010", "001", "200", "110"] + + mad.send_cpx_tpsa(monos, coefficients) + + whole_tab = mad.recv("tab") + index_part = mad.recv("index_part") + self.assertEqual(index_part, expected_index) + for i in range(len(whole_tab)): + self.assertTrue(whole_tab[i] == expected_index[i]) + for i, key in enumerate(expected_index): + self.assertTrue(whole_tab[key] == coefficients[i]) + + def test_send_recv_damap(self): + with MAD() as mad: + mad.send(""" + py:__err(true) + local sin in MAD.gmath + MAD.gtpsad(6, 5) + local M = MAD.damap {xy = 5} + M[1] = 1 ; M[3] = 2 ; M[5] = 3 + M[2] = 2 ; M[4] = 1 ; M[6] = 1 + res = sin(M[1]) * sin(M[3]) + py:send(res) + recved = MAD.tpsa():fromtable(py:recv()) + py:send(recved) + """) + init = mad.recv() + mad.send_tpsa(*init) + final = mad.recv() + self.assertTrue((init[0] == final[0]).all()) + self.assertTrue((init[1] == final[1]).all()) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_numeric_types.py b/tests/test_numeric_types.py new file mode 100644 index 0000000..fff265d --- /dev/null +++ b/tests/test_numeric_types.py @@ -0,0 +1,137 @@ +import unittest +import numpy as np +from pymadng import MAD + +class TestList(unittest.TestCase): + def test_send_recv(self): + with MAD() as mad: + myList = [[1, 2, 3, 4, 5, 6, 7, 8, 9]] * 2 + mad.send(""" + local list = py:recv() + list[1][1] = 10 + list[2][1] = 10 + py:send(list) + """) + mad.send(myList) + myList[0][0] = 10 + myList[1][0] = 10 + self.assertEqual(mad.recv(), myList) + + def test_send_recv_wref(self): + with MAD() as mad: + mad.send(""" + list = {MAD.object "a" {a = 2}, MAD.object "b" {b = 2}} + list2 = {1, 2, 3, 4, 5, a = 10, b = 3, c = 4} + py:send(list) + py:send(list2) + """) + list1 = mad.recv("list") + list2 = mad.recv("list2") + self.assertEqual(len(list1), 2) + self.assertEqual([x for x in list2], [1, 2, 3, 4, 5]) + self.assertEqual(list2["a"], 10) + self.assertEqual(list2["b"], 3) + self.assertEqual(list2["c"], 4) + self.assertEqual(list1[0].a, 2) + self.assertEqual(list1[1].b, 2) + +class TestNums(unittest.TestCase): + eps = 2**-52 + tiny = 2**-1022 + huge = 2**1023 + flt_lst = [0, tiny, 2**-64, 2**-63, 2**-53, eps, 2**-52, 2*eps, 2**-32, 2**-31, 1e-9, + 0.1-eps, 0.1, 0.1+eps, 0.5, 0.7-eps, 0.7, 0.7+eps, 1-eps, 1, 1+eps, + 1.1, 1.7, 2, 10, 1e2, 1e3, 1e6, 1e9, 2**31, 2**32, 2**52, 2**53, + 2**63, 2**64, huge] + + def test_send_recv_int(self): + with MAD() as mad: + int_lst = [0, 1, 2, 10, 1e2, 1e3, 1e6, 1e9, 2**31-1] + for i in range(len(int_lst)): + mad.send(""" + local is_integer in MAD.typeid + local num = py:recv() + py:send( num) + py:send(-num) + py:send(is_integer(num)) + """) + mad.send(int_lst[i]) + recv_num = mad.recv() + self.assertEqual(recv_num, int_lst[i]) + self.assertTrue(isinstance(recv_num, np.int32)) + recv_num = mad.recv() + self.assertEqual(recv_num, -int_lst[i]) + self.assertTrue(isinstance(recv_num, np.int32)) + self.assertTrue(mad.recv()) + + def test_send_recv_num(self): + with MAD() as mad: + for i in range(len(self.flt_lst)): + mad.send(""" + local num = py:recv() + local negative = py:recv() + py:send(num) + py:send(negative) + py:send(num * 1.61) + """) + mad.send(self.flt_lst[i]) + mad.send(-self.flt_lst[i]) + self.assertEqual(mad.recv(), self.flt_lst[i]) #Check individual floats + self.assertEqual(mad.recv(), -self.flt_lst[i]) #Check negation + self.assertEqual(mad.recv(), self.flt_lst[i] * 1.61) #Check manipulation + + def test_send_recv_cpx(self): + with MAD() as mad: + for i in range(len(self.flt_lst)): + for j in range(len(self.flt_lst)): + mad.send(""" + local my_cpx = py:recv() + py:send(my_cpx) + py:send(-my_cpx) + py:send(my_cpx * 1.31i) + """) + my_cpx = self.flt_lst[i] + 1j * self.flt_lst[j] + mad.send(my_cpx) + self.assertEqual(mad.recv(), my_cpx) + self.assertEqual(mad.recv(), -my_cpx) + self.assertEqual(mad.recv(), my_cpx * 1.31j) + +class TestMatrices(unittest.TestCase): + def test_send_recv_imat(self): + with MAD() as mad: + mad.send(""" + local imat = py:recv() + py:send(imat) + py:send(MAD.imatrix(3, 5):seq()) + """) + imat = np.random.randint(0, 255, (5, 5), dtype=np.int32) + mad.send(imat) + self.assertTrue(np.all(mad.recv() == imat)) + self.assertTrue(np.all(mad.recv() == np.arange(1, 16).reshape(3, 5))) + + def test_send_recv_mat(self): + with MAD() as mad: + mad.send(""" + local mat = py:recv() + py:send(mat) + py:send(MAD.matrix(3, 5):seq() / 2) + """) + mat = np.arange(1, 25).reshape(4, 6) / 4 + mad.send(mat) + self.assertTrue(np.all(mad.recv() == mat)) + self.assertTrue(np.all(mad.recv() == np.arange(1, 16).reshape(3, 5) / 2)) + + def test_send_recv_cmat(self): + with MAD() as mad: + mad.send(""" + local cmat = py:recv() + py:send(cmat) + py:send(MAD.cmatrix(3, 5):seq() / 2i) + """) + cmat = np.arange(1, 25).reshape(4, 6) / 4 + 1j * np.arange(1, 25).reshape(4, 6) / 4 + mad.send(cmat) + self.assertTrue(np.all(mad.recv() == cmat)) + self.assertTrue(np.all(mad.recv() == (np.arange(1, 16).reshape(3, 5) / 2j))) + +if __name__ == "__main__": + unittest.main() \ No newline at end of file diff --git a/tests/test_object_wrapping.py b/tests/test_object_wrapping.py new file mode 100644 index 0000000..3908940 --- /dev/null +++ b/tests/test_object_wrapping.py @@ -0,0 +1,490 @@ +import math +import os +import sys +import unittest + +import numpy as np +import pandas +import tfs + +from pymadng import MAD +from pymadng.madp_classes import high_level_mad_ref, mad_high_level_last_ref + +# TODO: Test the following functions: +# - __str__ on mad references (low priority) + +class TestGetSet(unittest.TestCase): + def test_get(self): + with MAD(stdout="/dev/null", redirect_sterr=True) as mad: + mad.load("element", "quadrupole") + self.assertEqual(mad.asdfg, None) + mad.send("""qd = quadrupole {knl={0, 0.25}, l = 1}""") + mad.send("""qf = quadrupole {qd = qd}""") + qd, qf = mad["qd", "qf"] + self.assertEqual(qd._name, "qd") + self.assertEqual(qd._parent, None) + self.assertEqual(qd._mad, mad._MAD__process) + self.assertEqual(qd.knl, [0, 0.25]) + self.assertEqual(qd.l, 1) + self.assertRaises(AttributeError, lambda: qd.asdfg) + self.assertRaises(KeyError, lambda: qd["asdfg"]) + self.assertRaises(IndexError, lambda: qd[1]) + self.assertTrue(isinstance(qf.qd, high_level_mad_ref)) + self.assertEqual(qf.qd.knl, [0, 0.25]) + self.assertEqual(qf.qd.l, 1) + self.assertEqual(qf.qd, qd) + + mad.send("objList = {qd, qf, qd, qf, qd} py:send(objList)") + objList = mad.recv("objList") + for i in range(len(objList)): + if i % 2 != 0: + self.assertTrue(isinstance(objList[i].qd, high_level_mad_ref)) + self.assertEqual(objList[i].qd._parent, f"objList[{i + 1}]") + self.assertEqual(objList[i].qd.knl, [0, 0.25]) + self.assertEqual(objList[i].qd.l, 1) + self.assertEqual(objList[i].qd, qd) + + else: + self.assertEqual(objList[i].knl, [0, 0.25]) + self.assertEqual(objList[i].l, 1) + self.assertEqual(objList[i], qd) + self.assertEqual(objList[i]._parent, "objList") + + def test_set(self): # Need more? + with MAD() as mad: + mad.load("element", "quadrupole") + mad.send("""qd = quadrupole {knl={0, 0.25}, l = 1} py:send(qd)""") + mad["qd2"] = mad.recv("qd") + self.assertEqual(mad.qd2._name, "qd2") + self.assertEqual(mad.qd2._parent, None) + self.assertEqual(mad.qd2._mad, mad._MAD__process) + self.assertEqual(mad.qd2.knl, [0, 0.25]) + self.assertEqual(mad.qd2.l, 1) + self.assertEqual(mad.qd2, mad.qd) + mad["a", "b"] = mad.MAD.gmath.reim(9.75 + 1.5j) + self.assertEqual(mad.a, 9.75) + self.assertEqual(mad.b, 1.5) + mad.send("f = \\-> (1, 2, 3, 9, 8, 7)") + mad["r1", "r2", "r3", "r4", "r5", "r6"] = mad.f() + self.assertEqual(mad.r1, 1) + self.assertEqual(mad.r2, 2) + self.assertEqual(mad.r3, 3) + self.assertEqual(mad.r4, 9) + self.assertEqual(mad.r5, 8) + self.assertEqual(mad.r6, 7) + + def test_send_vars(self): + with MAD() as mad: + mad.send_vars(a=1, b=2.5, c="test", d=[1, 2, 3]) + self.assertEqual(mad.a, 1) + self.assertEqual(mad.b, 2.5) + self.assertEqual(mad.c, "test") + self.assertEqual(mad.d, [1, 2, 3]) + + def test_recv_vars(self): + with MAD() as mad: + mad.send_vars(a=1, b=2.5, c="test", d=[1, 2, 3]) + a, b, c, d = mad.recv_vars("a", "b", "c", "d") + self.assertEqual(a, 1) + self.assertEqual(b, 2.5) + self.assertEqual(c, "test") + self.assertEqual(d, [1, 2, 3]) + + def test_quote_strings(self): + with MAD() as mad: + self.assertEqual(mad.quote_strings("test"), "'test'") + self.assertEqual(mad.quote_strings(["a", "b", "c"]), ["'a'", "'b'", "'c'"]) + + def test_create_deferred_expression(self): + with MAD() as mad: + deferred = mad.create_deferred_expression(a="x + y", b="x * y") + mad.send_vars(x=2, y=3) + self.assertEqual(deferred.a, 5) # x + y = 2 + 3 + self.assertEqual(deferred.b, 6) # x * y = 2 * 3 + mad.send_vars(x=5, y=10) + self.assertEqual(deferred.a, 15) # x + y = 5 + 10 + self.assertEqual(deferred.b, 50) + + mad["deferred"] = mad.create_deferred_expression(a="x - y", b="x / y") + mad.send_vars(x=10, y=2) + self.assertEqual(mad.deferred.a, 8) # x - y = 10 - 2 + self.assertEqual(mad.deferred.b, 5) # x / y = 10 / 2 + + +class TestObjFun(unittest.TestCase): + def test_call_obj(self): + with MAD() as mad: + mad.load("element", "quadrupole", "sextupole") + qd = mad.quadrupole(knl=[0, 0.25], l=1) + sd = mad.sextupole(knl=[0, 0.25, 0.5], l=1) + + mad["qd"] = qd + self.assertEqual(mad.qd._name, "qd") + self.assertEqual(mad.qd._parent, None) + self.assertEqual(mad.qd._mad, mad._MAD__process) + self.assertEqual(mad.qd.knl, [0, 0.25]) + self.assertEqual(mad.qd.l, 1) + + sdc = sd + mad["sd"] = sd + del sd + self.assertEqual(mad.sd._name, "sd") + self.assertEqual(mad.sd._parent, None) + self.assertEqual(mad.sd._mad, mad._MAD__process) + self.assertEqual(mad.sd.knl, [0, 0.25, 0.5]) + self.assertEqual(mad.sd.l, 1) + + # Reference counting + qd = mad.quadrupole(knl=[0, 0.3], l=1) + self.assertEqual(sdc._name, "_last[2]") + self.assertEqual(qd._name, "_last[3]") + self.assertEqual(qd.knl, [0, 0.3]) + qd = mad.quadrupole(knl=[0, 0.25], l=1) + self.assertEqual(qd._name, "_last[1]") + + def test_call_last(self): + with MAD() as mad: + mad.send("func_test = \\a-> \\b-> \\c-> a+b*c") + self.assertRaises(TypeError, lambda: mad.MAD()) + self.assertEqual(mad.func_test(1)(2)(3), 7) + + def test_call_fail(self): + with MAD(stdout="/dev/null", redirect_sterr=True) as mad: + mad.send("func_test = \\a-> \\b-> \\c-> 'a'+b") + mad.func_test(1)(2)(3) + self.assertRaises(RuntimeError, lambda: mad.recv()) + self.assertRaises( + RuntimeError, lambda: mad.mtable.read("'abad.tfs'").eval() + ) + + def test_call_func(self): + with MAD() as mad: + mad.load("element", "quadrupole") + mad["qd"] = mad.quadrupole(knl=[0, 0.25], l=1) + mad.qd.select() + mad["qdSelected"] = mad.qd.is_selected() + self.assertTrue(mad.qdSelected) + mad.qd.deselect() + mad["qdSelected"] = mad.qd.is_selected() + self.assertFalse(mad.qdSelected) + mad.qd.set_variables({"l": 2}) + self.assertEqual(mad.qd.l, 2) + + def test_mult_rtrn(self): + with MAD() as mad: + mad.send(""" + obj = MAD.object "obj" {a = 1, b = 2} + local function mult_rtrn () + obj2 = MAD.object "obj2" {a = 2, b = 3} + return obj, obj, obj, obj2 + end + last_rtn = __mklast__(mult_rtrn()) + lastobj = __mklast__(obj) + notLast = {mult_rtrn()} + """) + + mad["o11", "o12", "o13", "o2"] = mad._MAD__get_mad_ref("last_rtn") + mad["p11", "p12", "p13", "p2"] = mad._MAD__get_mad_ref("notLast") + mad["objCpy"] = mad._MAD__get_mad_ref( + "lastobj" + ) # Test single object in __mklast__ + self.assertEqual(mad.o11.a, 1) + self.assertEqual(mad.o11.b, 2) + self.assertEqual(mad.o12.a, 1) + self.assertEqual(mad.o12.b, 2) + self.assertEqual(mad.o13.a, 1) + self.assertEqual(mad.o13.b, 2) + self.assertEqual(mad.o2.a, 2) + self.assertEqual(mad.o2.b, 3) + self.assertEqual(mad.p11.a, 1) + self.assertEqual(mad.p11.b, 2) + self.assertEqual(mad.p12.a, 1) + self.assertEqual(mad.p12.b, 2) + self.assertEqual(mad.p13.a, 1) + self.assertEqual(mad.p13.b, 2) + self.assertEqual(mad.p2.a, 2) + self.assertEqual(mad.p2.b, 3) + self.assertEqual(mad.objCpy, mad.obj) + + def test_MADX(self): + from pymadng import MAD + + mad = MAD() + self.assertEqual(mad.MADX.abs(-1), 1) + self.assertEqual(mad.MADX.ceil(1.2), 2) + + self.assertEqual(mad.MAD.MADX.abs(-1), 1) + self.assertEqual(mad.MAD.MADX.ceil(1.2), 2) + + self.assertEqual(mad.MADX.MAD.MADX.abs(-1), 1) + self.assertEqual(mad.MADX.MAD.MADX.ceil(1.2), 2) + + with open("test.seq", "w") as f: + f.write(""" + qd: quadrupole, l=1, knl:={0, 0.25}; + """) + mad.MADX.load("'test.seq'") + self.assertEqual(mad.MADX.qd.l, 1) + self.assertEqual(mad.MADX.qd.knl, [0, 0.25]) + os.remove("test.seq") + + def test_evaluate_in_madx_environment(self): + with MAD() as mad: + madx_code = """ + qd = quadrupole {l=1, knl:={0, 0.25}} + """ + mad.evaluate_in_madx_environment(madx_code) + self.assertEqual(mad.MADX.qd.l, 1) + self.assertEqual(mad.MADX.qd.knl, [0, 0.25]) + + +class TestOps(unittest.TestCase): + def test_matrix(self): + with MAD() as mad: + mad.load("MAD", "matrix") + pyMat = np.arange(1, 101).reshape((10, 10)) + + mad["mat"] = mad.matrix(10).seq(2) + 2 + self.assertTrue(np.all(mad.mat == pyMat + 4)) + + mad["mat"] = mad.matrix(10).seq() / 3 + self.assertTrue(np.allclose(mad.mat, pyMat / 3)) + + mad["mat"] = mad.matrix(10).seq() * 4 + self.assertTrue(np.all(mad.mat == pyMat * 4)) + + mad["mat"] = mad.matrix(10).seq() ** 3 + self.assertTrue(np.all(mad.mat == np.linalg.matrix_power(pyMat, 3))) + + mad["mat"] = mad.matrix(10).seq() + 2 / 3 * 4**3 # bidmas + self.assertTrue(np.all(mad.mat == pyMat + 2 / 3 * 4**3)) + + # conversions + self.assertTrue( + np.all(np.array(mad.MAD.matrix(10).seq()) == np.arange(1, 101)) + ) + self.assertTrue(np.all(list(mad.MAD.matrix(10).seq()) == np.arange(1, 101))) + self.assertTrue(np.all(mad.MAD.matrix(10).seq().eval() == pyMat)) + self.assertEqual(np.sin(1), mad.math.sin(1).eval()) + self.assertAlmostEqual( + np.cos(0.5), mad.math.cos(0.5).eval(), None, None, 4e-16 + ) + + # temp vars + res = ( + ( + (mad.matrix(3).seq().emul(2) * mad.matrix(3).seq(3) + 3) * 2 + + mad.matrix(3).seq(2) + ) + - mad.matrix(3).seq(4) + ).eval() + np_mat = np.arange(9).reshape((3, 3)) + 1 + exp = ((np.matmul((np_mat * 2), (np_mat + 3)) + 3) * 2 + (np_mat + 2)) - ( + np_mat + 4 + ) + self.assertTrue(np.all(exp == res)) + + +class TestArgsAndKwargs(unittest.TestCase): + def test_args(self): + with MAD() as mad: + mad.load("MAD", "matrix", "cmatrix") + mad["m1"] = mad.matrix(3).seq() + mad["m2"] = mad.matrix(3).eye(2).mul(mad.m1) + mad["m3"] = mad.matrix(3).seq().emul(mad.m1) + mad["cm1"] = mad.cmatrix(3).seq(1j).map([1, 5, 9], "\\mn, n -> mn - 1i") + self.assertTrue(np.all(mad.m2 == mad.m1 * 2)) + self.assertTrue(np.all(mad.m3 == (mad.m1 * mad.m1))) + self.assertTrue(np.all(mad.cm1 == mad.m1 + 1j - np.eye(3) * 1j)) + # Add bool + + def test_kwargs(self): + with MAD() as mad: + mad.load("element", "sextupole") + mad["m1"] = mad.MAD.matrix(3).seq() + sd = mad.sextupole( + knl=[0, 0.25j, 1 + 1j], + l=1, + alist=[1, 2, 3, 5], + abool=True, + opposite=False, + mat=mad.m1, + ) + self.assertEqual(sd.knl, [0, 0.25j, 1 + 1j]) + self.assertEqual(sd.l, 1) + self.assertEqual(sd.alist, [1, 2, 3, 5]) + self.assertEqual(sd.abool, True) + self.assertEqual(sd.opposite, False) + self.assertTrue(np.all(sd.mat == np.arange(9).reshape((3, 3)) + 1)) + + +class TestDir(unittest.TestCase): + def test_dir(self): + with MAD() as mad: + mad.load("MAD", "gfunc", "element", "object") + mad.load("element", "quadrupole") + mad.load("gfunc", "functor") + obj_dir = dir(mad.object) + mad.send( + "my_obj = object {a1 = 2, a2 = functor(\\s->s.a1), a3 = \\s->s.a1}" + ) + obj_exp = sorted(["a1", "a2()", "a3"] + obj_dir) + self.assertEqual(dir(mad.my_obj), obj_exp) + self.assertEqual(mad.my_obj.a1, mad.my_obj.a3) + + quad_exp = dir(mad.quadrupole) + self.assertEqual( + dir(mad.quadrupole(knl=[0, 0.3], l=1)), quad_exp + ) # Dir of instance of class should be the same as the class + self.assertEqual( + dir(mad.quadrupole(asd=10, qwe=20)), sorted(quad_exp + ["asd", "qwe"]) + ) # Adding to the instance should change the dir + + def test_dir_on_mad_object(self): + with MAD() as mad: + mad.load("MAD", "object") + mad.send("my_obj = object {a = 1, b = 2, c = 3}") + expected_dir = sorted(["a", "b", "c"] + dir(mad.object)) + self.assertGreater(len(expected_dir), 0) + self.assertEqual(dir(mad.my_obj), expected_dir) + + def test_dir_on_last_object(self): + with MAD() as mad: + mad.load("MAD", "object") + mad.send("last_obj = __mklast__(object {x = 10, y = 20})") + mad["last"] = mad._MAD__get_mad_ref("last_obj") + expected_dir = sorted(["x", "y"] + dir(mad.object)) + self.assertGreater(len(expected_dir), 0) + self.assertEqual(dir(mad.last), expected_dir) + + def test_history(self): + with MAD(debug=True, stdout="/dev/null") as mad: + mad.send("a = 1") + mad.send("b = 2") + mad.send("c = a + b") + history = mad.history() + self.assertIn("a = 1", history) + self.assertIn("b = 2", history) + self.assertIn("c = a + b", history) + + +class TestDataFrame(unittest.TestCase): + def generalDataFrame(self, headers, DataFrame): + mad = MAD() + mad.send(""" +test = mtable{ + {"string"}, "number", "integer", "complex", "boolean", "list", "table", "range",! "generator", + name = "test", + header = {"string", "number", "integer", "complex", "boolean", "list", "table", "range"}, + string = "string", + number = 1.234567890, + integer = 12345670, + complex = 1.3 + 1.2i, + boolean = true, + list = {1, 2, 3, 4, 5}, + table = {1, 2, ["key"] = "value"}, + range = MAD.range(1, 11), +} + + {"a", 1.1, 1, 1 + 2i, true , {1, 2 }, {1 , 2 , ["3" ] = 3 }, MAD.range(1, 11),} + + {"b", 2.2, 2, 2 + 3i, false, {3, 4 }, {4 , 5 , ["6" ] = 6 }, MAD.range(2, 12),} + + {"c", 3.3, 3, 3 + 4i, true , {5, 6 }, {7 , 8 , ["9" ] = 9 }, MAD.range(3, 13),} + + {"d", 4.4, 4, 4 + 5i, false, {7, 8 }, {10, 11, ["12"] = 12}, MAD.range(4, 14),} + + {"e", 5.5, 5, 5 + 6i, true , {9, 10}, {13, 14, ["15"] = 15}, MAD.range(5, 15),} + +test:addcol("generator", \\ri, m -> m:getcol("number")[ri] + 1i * m:getcol("number")[ri]) +test:write("test") + """) + df = mad.test.to_df() + self.assertTrue(isinstance(df, DataFrame)) + header = getattr(df, headers) + self.assertEqual(header["name"], "test") + self.assertEqual(header["string"], "string") + self.assertEqual(header["number"], 1.234567890) + self.assertEqual(header["integer"], 12345670) + self.assertEqual(header["complex"], 1.3 + 1.2j) + self.assertEqual(header["boolean"], True) + self.assertEqual(header["list"], [1, 2, 3, 4, 5]) + tbl = getattr(df, headers)["table"] + self.assertEqual([x for x in tbl], [1, 2]) + self.assertEqual(tbl["key"], "value") + + self.assertEqual(df["string"].tolist(), ["a", "b", "c", "d", "e"]) + self.assertEqual(df["number"].tolist(), [1.1, 2.2, 3.3, 4.4, 5.5]) + self.assertEqual(df["integer"].tolist(), [1, 2, 3, 4, 5]) + self.assertEqual( + df["complex"].tolist(), [1 + 2j, 2 + 3j, 3 + 4j, 4 + 5j, 5 + 6j] + ) + self.assertEqual(df["boolean"].tolist(), [True, False, True, False, True]) + self.assertEqual(df["list"].tolist(), [[1, 2], [3, 4], [5, 6], [7, 8], [9, 10]]) + tbl = df["table"].tolist() + for i in range(len(tbl)): + lst = tbl[i] + self.assertEqual([lst[0], lst[1]], [i * 3 + 1, i * 3 + 2]) + self.assertEqual(lst[str((i + 1) * 3)], (i + 1) * 3) + self.assertEqual( + df["range"].tolist(), + [range(1, 12), range(2, 13), range(3, 14), range(4, 15), range(5, 16)], + ) + + def test_tfsDataFrame(self): + self.generalDataFrame("headers", tfs.TfsDataFrame) + + def test_pandasDataFrame(self): + sys.modules["tfs"] = None # Remove tfs-pandas + self.generalDataFrame("attrs", pandas.DataFrame) + del sys.modules["tfs"] + + def test_failure(self): + with MAD() as mad: + mad.send(""" +test = mtable{"string", "number"} + {"a", 1.1} + {"b", 2.2} + """) + pandas = sys.modules["pandas"] + sys.modules["pandas"] = None + self.assertRaises(ImportError, lambda: mad.test.to_df()) + sys.modules["pandas"] = pandas + df = mad.test.to_df() + self.assertTrue(isinstance(df, tfs.TfsDataFrame)) + self.assertEqual(df["string"].tolist(), ["a", "b"]) + self.assertEqual(df["number"].tolist(), [1.1, 2.2]) + + +class TestEval(unittest.TestCase): + def test_eval(self): + with MAD() as mad: + mad.send("a = 10; b = 20") + result = mad.eval("a + b") + self.assertEqual(result, 30) + + mad.send("c = {1, 2, 3, 4}") + result = mad.eval("c[2]") # Lua indexing starts at 1 + self.assertEqual(result, 2) + + mad.send("d = MAD.matrix(2, 2):seq()") + result = mad.eval("d[2]") # Accessing matrix element + self.assertEqual(result, 2) + + def test_eval_class(self): + with MAD() as mad: + result = mad.math.sqrt(2) + mad.math.log(10) + self.assertTrue(isinstance(result, mad_high_level_last_ref)) + self.assertEqual(result.eval(), math.sqrt(2) + math.log(10)) + + +class TestIteration(unittest.TestCase): + def test_iterate_through_object(self): + with MAD() as mad: + mad.send(""" + qd = MAD.element.quadrupole "qd" {knl = {0, 0.25}, l = 1.6} + my_obj = sequence {qd, qd, qd, qd} + """) + for elem in mad.my_obj: + if elem.name == "qd": + self.assertEqual(elem.l, 1.6) + self.assertEqual(elem.knl, [0, 0.25]) + else: + self.assertEqual(elem.kind, "marker") + + +if __name__ == "__main__": + unittest.main()