diff --git a/README.md b/README.md index 90fd5b6..b508564 100644 --- a/README.md +++ b/README.md @@ -76,6 +76,10 @@ import xboinc as xb # prepare the line line = xt.Line.from_json("path/to/your/line.json") +# Add monitors in the line if needed +monitor = xt.ParticlesMonitor(...) +line.append("my_monitor", monitor) + # create a job manager job_manager = xb.JobSubmitter( user="mycernshortname", @@ -107,6 +111,8 @@ job_manager.submit() Note that the jobs will be executed on a single CPU core from a volunteer computer, we therefore recommend balancing the workload across multiple jobs to optimize the usage of available resources. Xboinc will offer a time estimate for each job, which can help you to decide how many particles to track in each job. Note also that we are currently enforcing a lower time limit of 90 seconds for each job, as it becomes not practical to use the BOINC platform for jobs that take less time than that. +Moreover, to avoid excessive bandwidth/disk usage, we currently limit the size of a single job to 1 GB. Consider this when allocating large monitors in the line, as this has a direct impact on the resulting footprint of the input and output files transmitted. + ## Retrieve the results When the jobs are completed, the Xboinc server will store the results in your allocated folder in compressed tar files. You can decompress and explore them by using the `JobRetriever` class from the `xboinc` package. The simplest way to do that is: @@ -114,8 +120,13 @@ When the jobs are completed, the Xboinc server will store the results in your al ```python import xboinc as xb -for job_name, job_metadata, result_particles in xb.JobRetriever.iterate("mycernshortname", "a_relevant_study_name", dev_server=True): - print(f"Job {job_name} completed with particles: {result_particles.to_dict()}") +for job_name, job_metadata, result_particles, line_of_monitors, io_buffer in xb.JobRetriever.iterate("mycernshortname", "a_relevant_study_name", dev_server=True): + print(f"Job {job_name} completed!") + # Tracked particles are available directly + print(result_particles.at_turn) + # Monitors are stored in a separate line with only monitors inside + # You can access them by their given name as you would in your original line! + print(line_of_monitors.element_dict["my_monitor"].x) print(f"Job metadata: {job_metadata}") ``` diff --git a/docs/source/conf.py b/docs/source/conf.py index 0509238..7dd23d4 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -10,7 +10,7 @@ "Frederik F. Van der Veken, Carlo E. Montanari, Davide Di Croce, Giovanni Iadorala" ) copyright = f"2025, {author}" -release = "0.4.1" +release = "0.5.0" version = release # -- General configuration --------------------------------------------------- diff --git a/pyproject.toml b/pyproject.toml index 3cdc3c6..df09484 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,14 +24,14 @@ include = [ "LICENSE", "NOTICE" ] python = '>=3.9' numpy = '>=1.0' packaging = '>=23.0' -xobjects = ">=0.5.0" +xobjects = '==0.5.2' xdeps = '==0.10.5' -xpart = '==0.23.0' -xtrack = '==0.84.7' -xfields = '==0.24.0' -xcoll = '==0.6.1' +xpart = '==0.23.1' +xtrack = '==0.88.2' +xfields = '==0.25.1' +xcoll = '==0.6.2' xaux = '==0.3.5' -xsuite = '==0.32.3' +xsuite = '==0.36.1' [poetry.group.dev.dependencies] pytest = ">7" diff --git a/tests/test_01_compilation_and_running.py b/tests/test_01_compilation_and_running.py index 034f23c..69bda6d 100644 --- a/tests/test_01_compilation_and_running.py +++ b/tests/test_01_compilation_and_running.py @@ -7,11 +7,13 @@ import os import subprocess import time +import pandas as pd from pathlib import Path from typing import Optional, Tuple import numpy as np import pytest +import xcoll as xc import xtrack as xt import xboinc as xb @@ -119,6 +121,13 @@ def create_test_particles( The tracking line and initial particle distribution. """ line = xt.Line.from_json(TestConfig.LINE_FILE) + monitor = xt.ParticlesMonitor( + start_at_turn=0, + stop_at_turn=TestConfig.NUM_TURNS, + particle_id_range=(0, 10), + ) + line.append("my_monitor", monitor) + line.build_tracker() x_norm = np.linspace(-15, 15, TestConfig.NUM_PARTICLES) @@ -232,8 +241,43 @@ def assert_particles_equal( for attr in attributes: values1 = getattr(particles1, attr) values2 = getattr(particles2, attr) + assert np.array_equal( + + values1, values2 + ), f"{context}: {attr} values are not equal" + + +def assert_monitors_equal( + monitor1: xt.ParticlesMonitor, monitor2: xt.ParticlesMonitor, context: str +) -> None: + """ + Assert that two particle monitor objects are equivalent. + + Parameters + ---------- + monitor1, monitor2 : xt.ParticlesMonitor + The particle monitor objects to compare. + context : str + Description of the comparison context for error messages. + """ + attributes = [ + "particle_id", + "state", + "at_turn", + "x", + "y", + "zeta", + "px", + "py", + "delta", + ] + + for attr in attributes: + values1 = getattr(monitor1, attr) + values2 = getattr(monitor2, attr) assert np.array_equal( values1, values2 + ), f"{context}: {attr} values are not equal" @@ -523,6 +567,11 @@ def test_consistency_with_xtrack(skip_version_check, cleanup_files): xb_state_boinc.particles, f"xboinc vs xtrack (at_element={at_element})", ) + assert_monitors_equal( + line.element_dict["my_monitor"], + xb_state_boinc.monitors.element_dict["my_monitor"], + f"xboinc vs xtrack (at_element={at_element})", + ) # Test different stop elements stop_elements = ["ip2", 3500] @@ -568,3 +617,90 @@ def test_consistency_with_xtrack(skip_version_check, cleanup_files): xb_state.particles, f"{exec_name} vs xtrack (ele_stop={ele_stop})", ) + assert_monitors_equal( + line.element_dict["my_monitor"], + xb_state.monitors.element_dict["my_monitor"], + f"{exec_name} vs xtrack (ele_stop={ele_stop})", + ) + + +@pytest.mark.parametrize( + "use_boinc", + [ + False, + pytest.param( + True, + marks=pytest.mark.skipif( + not TestConfig.vcpkg_available(), + reason="VCPKG + BOINC installation not found", + ), + ), + ], + ids=["w/o BOINC api", "with BOINC api"], +) +def test_impact_table(use_boinc, skip_version_check, cleanup_files): + # Get line and collimators + line = xt.Line.from_json(xb._pkg_root.parent / 'tests' / 'data' / 'xcoll' / 'lhc_run3_b1.json') + colldb = xc.CollimatorDatabase.from_yaml(xb._pkg_root.parent / 'tests' / 'data' / 'xcoll' / 'lhc_run3.yaml', beam=1) + colldb.install_everest_collimators(verbose=True, line=line) + df_with_coll = line.check_aperture() + assert not np.any(df_with_coll.has_aperture_problem) + copy_line = line.copy() + + # Start interaction record + impacts = xc.InteractionRecord.start(line=line) + copy_impacts = xc.InteractionRecord.start(line=copy_line) + + # Build tracker, assign optics and generate particles + line.build_tracker() + copy_line.build_tracker() + + # Final touches... + line.collimators.assign_optics() + part = line['tcp.d6l7.b1'].generate_pencil(5000) + line.scattering.enable() + + # Create input file + input_file = Path.cwd() / TestConfig.INPUT_FILE + xb_input = xb.XbInput( + line=line, + particles=part, + num_turns=20, + checkpoint_every=100, + io_buffer=line.tracker.io_buffer, + ) + xb_input.to_binary(input_file) + + # Track with Xsuite + line.track(part, num_turns=20) + ref_df = impacts.to_pandas() + blank_df = copy_impacts.to_pandas() + + # They should NOT be equal!!! + assert not ref_df.equals(blank_df) + + # Track with Xboinc + executable_test = get_executable_path(use_boinc=False) + run_xboinc_tracking(executable_test) + output_file = Path.cwd() / TestConfig.OUTPUT_FILE + xb_state = xb.XbState.from_binary(output_file) + + # Inject xb_state io_buffer into new line + xb_state.place_io_buffer(copy_line) + xb_df = copy_impacts.to_pandas() + + # Now, these two DataFrames should be equal + pd.testing.assert_frame_equal(ref_df, xb_df) + + # If VCPKG is available, re-run the test with BOINC + if TestConfig.vcpkg_available(): + # Track with Xboinc + executable_test = get_executable_path(use_boinc=True) + run_xboinc_tracking(executable_test) + output_file = Path.cwd() / TestConfig.OUTPUT_FILE + xb_state = xb.XbState.from_binary(output_file) + + # Inject xb_state io_buffer into new line + xb_state.place_io_buffer(copy_line) + xb_df = copy_impacts.to_pandas() + pd.testing.assert_frame_equal(ref_df, xb_df) diff --git a/tests/test_version.py b/tests/test_version.py index ec280a9..d7f1c91 100644 --- a/tests/test_version.py +++ b/tests/test_version.py @@ -3,12 +3,17 @@ # Copyright (c) CERN, 2025. # ########################################### # -import pytest import sys +import pytest + from xboinc import __version__, __xsuite__versions__ from xboinc.simulation_io import XbVersion -from xboinc.simulation_io.version import _version_to_int, _int_to_version, assert_versions +from xboinc.simulation_io.version import ( + _int_to_version, + _version_to_int, + assert_versions, +) def test_version(): @@ -25,13 +30,13 @@ def test_xb_ver(): def test_xsuite_versions(): expected_version = { - 'xobjects' : '0.5.0', + 'xobjects' : '0.5.2', 'xdeps' : '0.10.5', - 'xpart' : '0.23.0', - 'xtrack' : '0.84.7', - 'xfields' : '0.24.0', - 'xcoll' : '0.6.1', - 'xaux' : '0.3.5' + 'xpart' : '0.23.1', + 'xtrack' : '0.88.2', + 'xfields' : '0.25.1', + 'xcoll' : '0.6.2', + 'xaux' : '0.3.5', } current_version = {} diff --git a/xboinc/executable/CMakeLists.txt b/xboinc/executable/CMakeLists.txt index 46c8ea2..292fd3e 100644 --- a/xboinc/executable/CMakeLists.txt +++ b/xboinc/executable/CMakeLists.txt @@ -6,7 +6,7 @@ # ################################################################################ cmake_minimum_required(VERSION 3.16) -project(xboinc VERSION 0.4.0 LANGUAGES C CXX) +project(xboinc VERSION 0.5.0 LANGUAGES C CXX) # === Compiler and Build Options === set(CMAKE_C_STANDARD 99) diff --git a/xboinc/executable/main.cpp b/xboinc/executable/main.cpp index 4c49c1c..e5475b6 100644 --- a/xboinc/executable/main.cpp +++ b/xboinc/executable/main.cpp @@ -83,6 +83,10 @@ static void XB_fprintf(int8_t verbose_level, FILE *stream, char *format, ...) static FILE* XB_fopen(char *filename, const char *mode); static FILE* XB_fopen_allow_null(char *filename, const char *mode); static int8_t* XB_file_to_buffer(FILE *fid, int8_t *buf_in); +static void XB_line_to_monitors(XbInput &xb_input, XbState &xb_state); +static void XB_monitors_to_line(XbInput &xb_input, XbState &xb_state); +static void XB_io_buffer_to_line(XbInput &xb_input, XbState &xb_state); +static void XB_line_to_io_buffer(XbInput &xb_input, XbState &xb_state); static int XB_do_checkpoint(XbInput xb_input, XbState xb_state); @@ -154,6 +158,8 @@ int main(int argc, char **argv){ FILE* checkpoint_state = XB_fopen_allow_null(XB_CHECKPOINT_FILE, "rb"); if (checkpoint_state){ XB_file_to_buffer(checkpoint_state, (int8_t*) xb_state); + XB_monitors_to_line(xb_input, xb_state); + XB_io_buffer_to_line(xb_input, xb_state); current_turn = XbState_get__i_turn(xb_state); XB_fprintf(1, stdout, "Loaded checkpoint, continuing from turn %d.\n", (int) current_turn); } else { @@ -173,6 +179,17 @@ int main(int argc, char **argv){ const int64_t num_elements = XbInput_get_num_elements(xb_input); const int64_t ele_start = XbInput_get_ele_start(xb_input); const int64_t ele_stop = XbInput_get_ele_stop(xb_input); + const int64_t size_io_buffer = XbInput_get_size_io_buffer(xb_input); + int8_t *io_buffer; + if (size_io_buffer == 0) + { + io_buffer = NULL; + } + else + { + io_buffer = reinterpret_cast(XbInput_getp1_io_buffer(xb_input, 0)); + } + XB_fprintf(1, stdout, "num_turns: %d\n", (int) num_turns); XB_fprintf(1, stdout, "num_elements: %d\n", (int) num_elements); XB_fprintf(1, stdout, "ele_start: %d\n", (int) ele_start); @@ -237,7 +254,7 @@ int main(int argc, char **argv){ 0.0, // double line_length, (needed only for backtracking) NULL, // int8_t* buffer_tbt_monitor, 0, // int64_t offset_tbt_monitor - NULL // int8_t* io_buffer, + io_buffer // int8_t* io_buffer, ); current_turn += step_turns; XB_fprintf(2, stdout, "Tracked turn %i\n", current_turn); @@ -248,6 +265,8 @@ int main(int argc, char **argv){ boinc_time_to_checkpoint() || #endif (checkpoint_every > 0 && current_turn % checkpoint_every == 0) ){ + XB_line_to_monitors(xb_input, xb_state); + XB_line_to_io_buffer(xb_input, xb_state); retval = XB_do_checkpoint(xb_input, xb_state); if (retval) { XB_fprintf(0, stderr, "Checkpointing failed!\n"); @@ -268,7 +287,8 @@ int main(int argc, char **argv){ } // End main loop =========== // ========================== - + XB_line_to_monitors(xb_input, xb_state); + XB_line_to_io_buffer(xb_input, xb_state); XB_fprintf(1, stdout, "Finished tracking\n"); // Write output @@ -376,6 +396,119 @@ static int8_t* XB_file_to_buffer(FILE *fid, int8_t *buf_in){ } +static void XB_line_to_monitors(XbInput &xb_input, XbState &xb_state) { + XB_fprintf(1, stdout, "Moving monitors data from line to output.\n"); + int64_t num_monitors = XbInput_get_num_monitors(xb_input); + if (num_monitors == 0) { + XB_fprintf(1, stdout, "No monitors in the line.\n"); + return; + } + + ElementRefData elem_ref_data = XbInput_getp_line_metadata(xb_input); + ElementRefData monitors_metadata = XbState_getp__monitors_metadata(xb_state); + + for (int64_t i = 0; i < num_monitors; i++) { + int64_t idx = XbInput_get_idx_monitors(xb_input, i); + int64_t size = XbInput_get_size_monitors(xb_input, i); + + XB_fprintf(1, stdout, "Monitor %ld size: %ld, idx: %ld\n", i, size, idx); + + if (idx < 0 || idx >= XbInput_get_num_elements(xb_input)) { + XB_fprintf(0, stderr, "Monitor index %ld out of bounds.\n", idx); + continue; + } + void* monitor_from = MyElementRefData_member_elements(elem_ref_data, idx); + void* monitor_to = MyElementRefData_member_elements(monitors_metadata, i); + if (!monitor_from || !monitor_to) { + XB_fprintf(0, stderr, "Monitor pointer invalid for index %ld.\n", idx); + continue; + } + + memcpy(monitor_to, monitor_from, size * sizeof(int8_t)); + } +} + + +static void XB_monitors_to_line(XbInput &xb_input, XbState &xb_state) { + XB_fprintf(1, stdout, "Moving monitors data from checkpoint to line.\n"); + int64_t num_monitors = XbInput_get_num_monitors(xb_input); + if (num_monitors == 0) { + XB_fprintf(1, stdout, "No monitors in the line.\n"); + return; + } + + ElementRefData elem_ref_data = XbInput_getp_line_metadata(xb_input); + ElementRefData monitors_metadata = XbState_getp__monitors_metadata(xb_state); + + for (int64_t i = 0; i < num_monitors; i++) { + int64_t idx = XbInput_get_idx_monitors(xb_input, i); + int64_t size = XbInput_get_size_monitors(xb_input, i); + + XB_fprintf(1, stdout, "Monitor %ld size: %ld, idx: %ld\n", i, size, idx); + + if (idx < 0 || idx >= XbInput_get_num_elements(xb_input)) + { + XB_fprintf(0, stderr, "Monitor index %ld out of bounds.\n", idx); + continue; + } + void* monitor_from = MyElementRefData_member_elements(monitors_metadata, i); + void* monitor_to = MyElementRefData_member_elements(elem_ref_data, idx); + if (!monitor_from || !monitor_to) { + XB_fprintf(0, stderr, "Monitor pointer invalid for index %ld.\n", idx); + continue; + } + + memcpy(monitor_to, monitor_from, size * sizeof(int8_t)); + } +} + + +static void XB_io_buffer_to_line(XbInput &xb_input, XbState &xb_state) { + XB_fprintf(1, stdout, "Moving I/O buffer data from checkpoint to line.\n"); + int64_t size_io_buffer = XbInput_get_size_io_buffer(xb_input); + if (size_io_buffer == 0) { + XB_fprintf(1, stdout, "No I/O buffer data to move.\n"); + return; + } + + int8_t *io_buffer_to = reinterpret_cast(XbInput_getp1_io_buffer(xb_input, 0)); + int8_t *io_buffer_from = reinterpret_cast(XbState_getp1__io_buffer(xb_state, 0)); + + if (!io_buffer_from || !io_buffer_to) { + XB_fprintf(0, stderr, "Invalid I/O buffer pointer.\n"); + return; + } + + XB_fprintf(1, stdout, "Moving %ld bytes from I/O buffer to line.\n", size_io_buffer); + + memcpy(io_buffer_to, io_buffer_from, size_io_buffer * sizeof(int8_t)); +} + + +static void XB_line_to_io_buffer(XbInput &xb_input, XbState &xb_state) { + XB_fprintf(1, stdout, "Moving line data from line to I/O buffer.\n"); + int64_t size_io_buffer = XbInput_get_size_io_buffer(xb_input); + if (size_io_buffer == 0) { + XB_fprintf(1, stdout, "No I/O buffer data to move.\n"); + return; + } + + int8_t *io_buffer_from = reinterpret_cast(XbInput_getp1_io_buffer(xb_input, 0)); + int8_t *io_buffer_to = reinterpret_cast(XbState_getp1__io_buffer(xb_state, 0)); + + int64_t len_from = XbInput_len_io_buffer(xb_input); + int64_t len_to = XbState_len__io_buffer(xb_state); + + if (!io_buffer_from || !io_buffer_to) { + XB_fprintf(0, stderr, "Invalid I/O buffer pointer.\n"); + return; + } + + XB_fprintf(1, stdout, "Moving %ld bytes from I/O buffer to line.\n", size_io_buffer); + + memcpy(io_buffer_to, io_buffer_from, size_io_buffer * sizeof(int8_t)); +} + static int XB_do_checkpoint(XbInput xb_input, XbState xb_state) { FILE *chkp_fid = XB_fopen(XB_CHECKPOINT_FILE, "wb"); if (!chkp_fid) { diff --git a/xboinc/executable/xtrack.h b/xboinc/executable/xtrack.h index b797da2..161816a 100644 --- a/xboinc/executable/xtrack.h +++ b/xboinc/executable/xtrack.h @@ -9,7 +9,21 @@ #define XB_XTRACK_HEADERS typedef struct ElementRefData_s * ElementRefData; -typedef struct ParticlesData_s * ParticlesData; + +// We manually re-define ElementRefData_member_elements to a copycat +// MyElementRefData_member_elements to avoid the name clash and the big +// headache it would take to extract the static inline void* function coming +// from the original xtrack tracker code. +void *MyElementRefData_member_elements(const ElementRefData obj, int64_t i0) +{ + int64_t offset = 0; + offset += 16; + offset += 16 + i0 * 16; + offset += *(int64_t *)((char *)obj + offset); + return (void *)((char *)obj + offset); +} + +typedef struct ParticlesData_s *ParticlesData; void track_line(int8_t*,ElementRefData,ParticlesData,int,int,int,int,int,int,int,double,int8_t*,int64_t,int8_t*); typedef struct XbState_s * XbState; @@ -18,6 +32,9 @@ void XbState_set__i_turn(XbState,int64_t); ParticlesData XbState_getp__particles(XbState); int64_t XbState_get__particles__capacity(const XbState); int64_t XbState_get__particles_state(const XbState,int64_t); +int64_t *XbState_getp1__io_buffer(XbState, int64_t); +int64_t XbState_len__io_buffer(const XbState); +ElementRefData XbState_getp__monitors_metadata(XbState); typedef struct XbInput_s * XbInput; int64_t XbInput_get__version_xboinc_version(const XbInput); @@ -28,6 +45,12 @@ int64_t XbInput_get_num_turns(const XbInput); int64_t XbInput_get_num_elements(const XbInput); int64_t XbInput_get_ele_start(const XbInput); int64_t XbInput_get_ele_stop(const XbInput); +int64_t XbInput_get_num_monitors(const XbInput); +int64_t XbInput_get_size_io_buffer(const XbInput); +int64_t XbInput_get_idx_monitors(const XbInput, int64_t); +int64_t XbInput_get_size_monitors(const XbInput, int64_t); +int64_t *XbInput_getp1_io_buffer(XbInput, int64_t); +int64_t XbInput_len_io_buffer(const XbInput); XbState XbInput_getp_xb_state(XbInput); int64_t XbInput_get_xb_state__xsize(const XbInput); diff --git a/xboinc/general.py b/xboinc/general.py index 4d6bb99..2efc5ee 100644 --- a/xboinc/general.py +++ b/xboinc/general.py @@ -19,12 +19,13 @@ # making a minor release. If a new package needs to be pinned, add it here with # a random version number, and similarily in the pyproject.toml __xsuite__versions__ = { - 'xobjects' : '0.5.0', + 'xobjects' : '0.5.2', 'xdeps' : '0.10.5', - 'xpart' : '0.23.0', - 'xtrack' : '0.84.7', - 'xfields' : '0.24.0', - 'xcoll' : '0.6.1', + 'xpart' : '0.23.1', + 'xtrack' : '0.88.2', + 'xfields' : '0.25.1', + 'xcoll' : '0.6.2', + 'xsuite' : '0.36.1', 'xaux' : '0.3.5', } # ============================================================================== diff --git a/xboinc/retrieve.py b/xboinc/retrieve.py index 7790982..58df591 100644 --- a/xboinc/retrieve.py +++ b/xboinc/retrieve.py @@ -7,6 +7,7 @@ from warnings import warn import pandas as pd +import xtrack as xt from tqdm.auto import tqdm from xaux import FsPath, eos_accessible @@ -294,7 +295,7 @@ def get_study_status(self, study_name, verbose=False): return result_job_names, diff_jobs - def iterate_results(self, study_name): + def iterate_results(self, study_name, line=None): """ Iterate over all results for a specific study. @@ -305,11 +306,19 @@ def iterate_results(self, study_name): ---------- study_name : str Name of the study to iterate over + line : xt.Line, optional + If provided, the io_buffer from the results will be placed into its tracker and yielded. If not provided, the io_buffer will be yielded as is. By default, None. Yields ------ - tuple of (str, dict, xpart.Particles) - Job name, corresponding metadata, and particles object for each result + tuple of (str, dict, xtrack.Particles, xtrack.Line, xtrack.Line or BufferNumpy) + - Job name + - Metadata about the job + - Corresponding particles object + - A Line containing the monitors that were contained in the Line + with the resulting data + - The provided line with its updated io_buffer or the original + io_buffer Raises ------ @@ -324,9 +333,10 @@ def iterate_results(self, study_name): Examples -------- >>> retriever = JobRetriever('myuser', dev_server=True) - >>> for job_name, particles in retriever.iterate_results('my_study'): + >>> for job_name, particles, monitors in retriever.iterate_results('my_study'): ... print(f"Processing job: {job_name}") ... print(f"Number of particles: {len(particles.x)}") + ... print(f"Number of monitors: {len(monitors.element_dict)}") """ if study_name not in self._df["study_name"].unique(): raise ValueError(f"Study name {study_name} not found in results.") @@ -342,6 +352,11 @@ def iterate_results(self, study_name): UserWarning, ) continue + + if line is None: + yield job_name, result.particles, result.monitors, result.io_buffer + else: + result.place_io_buffer(line) try: with open(json_file, "r") as f: metadata = json.load(f) @@ -357,7 +372,7 @@ def iterate_results(self, study_name): UserWarning, ) metadata = {} - yield job_name, metadata, result.particles + yield job_name, metadata, result.particles, result.monitors, line def clean(self, study_name): """ @@ -419,8 +434,8 @@ def iterate(cls, user, study_name, dev_server=False, silent=False): Yields ------ - tuple of (str, xpart.Particles) - Job name and corresponding particles object for each result + tuple of (str, xtrack.Particles) + track.Line) Job name and corresponding particles object for each result Examples -------- diff --git a/xboinc/simulation_io/default_tracker.py b/xboinc/simulation_io/default_tracker.py index 0487a8b..38fbf47 100644 --- a/xboinc/simulation_io/default_tracker.py +++ b/xboinc/simulation_io/default_tracker.py @@ -6,139 +6,32 @@ # ============================================================================== # IMPORTANT # ============================================================================== -# Only make changes to this file just before a minor version bump (need a -# separate commit though) to avoid having multiple xboinc versions with +# Only make changes to this file just before a minor version bump (need a +# separate commit though) to avoid having multiple xboinc versions with # out-of-sync executables. # ============================================================================== import xtrack as xt -import xfields as xf -import xcoll as xc - -from xtrack.beam_elements import * -from xtrack.monitors import * -from xtrack.random import * -from xtrack.multisetter import MultiSetter +from xsuite.kernel_definitions import ( + DEFAULT_XCOLL_ELEMENTS, + DEFAULT_XF_ELEMENTS, + NO_SYNRAD_ELEMENTS, + ONLY_XTRACK_ELEMENTS, +) from .version import assert_versions -ONLY_XTRACK_ELEMENTS = [ - Drift, - Multipole, - Bend, - RBend, - Quadrupole, - Sextupole, - Octupole, - Magnet, - SecondOrderTaylorMap, - Marker, - ReferenceEnergyIncrease, - Cavity, - Elens, - Wire, - Solenoid, - RFMultipole, - DipoleEdge, - MultipoleEdge, - SimpleThinBend, - SimpleThinQuadrupole, - LineSegmentMap, - FirstOrderTaylorMap, - NonLinearLens, - # Slices - DriftSlice, - DriftSliceBend, - DriftSliceRBend, - DriftSliceOctupole, - DriftSliceQuadrupole, - DriftSliceSextupole, - ThickSliceBend, - ThickSliceRBend, - ThickSliceOctupole, - ThickSliceQuadrupole, - ThickSliceSextupole, - ThickSliceSolenoid, - ThinSliceBend, - ThinSliceRBend, - ThinSliceBendEntry, - ThinSliceBendExit, - ThinSliceRBendEntry, - ThinSliceRBendExit, - ThinSliceQuadrupoleEntry, - ThinSliceQuadrupoleExit, - ThinSliceSextupoleEntry, - ThinSliceSextupoleExit, - ThinSliceOctupoleEntry, - ThinSliceOctupoleExit, - ThinSliceOctupole, - ThinSliceQuadrupole, - ThinSliceSextupole, - # Transformations - XYShift, - ZetaShift, - XRotation, - SRotation, - YRotation, - # Apertures - LimitEllipse, - LimitRectEllipse, - LimitRect, - LimitRacetrack, - LimitPolygon, - LongitudinalLimitRect, - # Monitors - BeamPositionMonitor, - BeamSizeMonitor, - BeamProfileMonitor, - LastTurnsMonitor, - ParticlesMonitor, -] - -NO_SYNRAD_ELEMENTS = [ - Exciter, -] - -# Xfields elements -DEFAULT_XF_ELEMENTS = [ - xf.BeamBeamBiGaussian2D, - xf.BeamBeamBiGaussian3D, - xf.SpaceChargeBiGaussian, -] - -# Xcoll elements -DEFAULT_XCOLL_ELEMENTS = [ - # ZetaShift, - xc.BlackAbsorber, - xc.EverestBlock, - xc.EverestCollimator, - xc.EverestCrystal, - xc.BlowUp, - xc.EmittanceMonitor, -] - -NON_TRACKING_ELEMENTS = [ - RandomUniform, - RandomExponential, - RandomNormal, - RandomRutherford, - MultiSetter, -] - -default_element_classes = ( +default_element_classes = list(set( ONLY_XTRACK_ELEMENTS + NO_SYNRAD_ELEMENTS + DEFAULT_XF_ELEMENTS + DEFAULT_XCOLL_ELEMENTS -) +)) # The class ElementRefData is dynamically generated inside the tracker. We # extract it here and use it to create the line metadata inside XbInput ElementRefData = xt.tracker._element_ref_data_class_from_element_classes( - ONLY_XTRACK_ELEMENTS - + NO_SYNRAD_ELEMENTS - + DEFAULT_XF_ELEMENTS - + DEFAULT_XCOLL_ELEMENTS, + default_element_classes, ) if {f.name for f in ElementRefData._fields} != {'elements', 'names'}: raise RuntimeError("The definition of `ElementRefData` has changed inside Xtrack! " diff --git a/xboinc/simulation_io/input.py b/xboinc/simulation_io/input.py index a41c626..e0b744b 100644 --- a/xboinc/simulation_io/input.py +++ b/xboinc/simulation_io/input.py @@ -6,21 +6,37 @@ # ============================================================================== # IMPORTANT # ============================================================================== -# Only make changes to this file just before a minor version bump (need a -# separate commit though) to avoid having multiple xboinc versions with +# Only make changes to this file just before a minor version bump (need a +# separate commit though) to avoid having multiple xboinc versions with # out-of-sync executables. # ============================================================================== -import numpy as np from pathlib import Path +import numpy as np import xobjects as xo -import xpart as xp import xtrack as xt - +from xcoll.beam_elements.monitor import EmittanceMonitor +from xtrack.monitors import ( + ParticlesMonitor, + LastTurnsMonitor, + BeamSizeMonitor, + BeamProfileMonitor, + BeamPositionMonitor, +) + +from .default_tracker import ElementRefData +from .output import XbState, _build_line_metadata from .version import XbVersion, assert_versions -from .default_tracker import default_element_classes, get_default_config, ElementRefData -from .output import XbState + +ALLOWED_MONITOR_CLASSES = ( + EmittanceMonitor, + ParticlesMonitor, + LastTurnsMonitor, + BeamSizeMonitor, + BeamProfileMonitor, + BeamPositionMonitor, +) # TODO: line.particle_ref is not dumped nor retrieved... Why is this no issue? # TODO: parity @@ -38,15 +54,19 @@ class XbInput(xo.Struct): - _version = XbVersion # This HAS to be the first field! - num_turns = xo.Int64 - num_elements = xo.Int64 - ele_start = xo.Int64 # The start index of the elements in the line - ele_stop = xo.Int64 # The end index of the elements in the line + _version = XbVersion # This HAS to be the first field! + num_turns = xo.Int64 + num_elements = xo.Int64 + ele_start = xo.Int64 # The start index of the elements in the line + ele_stop = xo.Int64 # The end index of the elements in the line checkpoint_every = xo.Int64 - _parity_check = xo.Int64 # TODO - xb_state = XbState - line_metadata = xo.Ref(ElementRefData) + num_monitors = xo.Int64 # Number of monitors in the line + size_io_buffer = xo.Int64 # Size of the I/O buffer + io_buffer = xo.Ref(xo.Int64[:]) # I/O buffer to be used in the tracking + idx_monitors = xo.Ref(xo.Int64[:]) # Indices of the monitors in the line + size_monitors = xo.Ref(xo.Int64[:]) # Buffer size of the monitors + line_metadata = xo.Ref(ElementRefData) + xb_state = xo.Ref(XbState) # This HAS to be the last field! def __init__(self, **kwargs): """ @@ -71,6 +91,8 @@ def __init__(self, **kwargs): checkpoint_every : Int64, optional When to checkpoint. The default value -1 represents no checkpointing. + io_buffer : BufferNumpy, optional + The I/O buffer to use for the simulation. By default is None. store_element_names : bool, optional Whether or not to store the element names in the binary. Defaults to True. @@ -78,68 +100,121 @@ def __init__(self, **kwargs): The other xofields are generated automatically and will be overwritten if provided. """ - assert_versions() - kwargs['_version'] = XbVersion() - kwargs.setdefault('_buffer', _xboinc_context.new_buffer()) - kwargs.setdefault('checkpoint_every', -1) - kwargs.setdefault('ele_start', 0) - kwargs.setdefault('ele_stop', -1) # Will be set to the number of elements in the line + # NOTE: In this initialization, it is important to maintain the order of + # the fields and of their initialization, as this will affect directly + # their position in the resulting buffer we operate with. At the end of + # this initialization, the XbState and its internal monitors-only line + # will be at the end of the buffer, enabling a simple cut-and-paste + # manouver at the C++ level in the final Xboinc executable. + + # Set up version and buffer + kwargs["_version"] = XbVersion() + kwargs.setdefault("_buffer", _xboinc_context.new_buffer()) + kwargs.setdefault("checkpoint_every", -1) + + # Handle element start/stop positions + kwargs.setdefault("ele_start", 0) + kwargs.setdefault("ele_stop", -1) # -1 means end of line + + line = kwargs.pop("line", None) + if not isinstance(line, xt.Line): + raise ValueError("Must provide a valid `line` to XbInput.") + + # Convert element name to index if needed if isinstance(kwargs["ele_start"], str): - kwargs["ele_start"] = kwargs["line"].element_names.index(kwargs["ele_start"]) + kwargs["ele_start"] = line.element_names.index(kwargs["ele_start"]) if isinstance(kwargs["ele_stop"], str): - kwargs["ele_stop"] = kwargs["line"].element_names.index(kwargs["ele_stop"]) + kwargs["ele_stop"] = line.element_names.index(kwargs["ele_stop"]) - # Pre-build particles / XbState; will be moved to correct buffer at XoStruct init - particles = kwargs.pop('particles', None) - xb_state = kwargs.get('xb_state', None) - if particles is not None: - if xb_state is not None: - raise ValueError("Use `xb_state` or `particles`, not both.") - kwargs['xb_state'] = XbState(particles=particles, _i_turn=0) - elif xb_state is None or not isinstance(xb_state, XbState): - raise ValueError("Need to provide `xb_state` or `particles`.") - # Get the line, build the metadata after building the XoStruct - # We need to do it like this because the elements are not moved correctly - line = kwargs.pop('line', None) - if kwargs.pop('line_metadata', None) is not None: + # Validate line metadata handling + if kwargs.pop("line_metadata", None) is not None: raise ValueError("Cannot provide the line metadata directly!") - store_element_names = kwargs.pop('store_element_names', True) + store_element_names = kwargs.pop("store_element_names", True) + + # Process monitors data + monitor_indices = [] + monitor_sizes = [] + monitor_line = xt.Line() + + for name, element in line.element_dict.items(): + if isinstance(element, ALLOWED_MONITOR_CLASSES): + monitor_line.append(name, element) + monitor_indices.append(line.element_names.index(name)) + monitor_sizes.append(element._xobject._size) + + kwargs["num_monitors"] = len(monitor_indices) + kwargs["idx_monitors"] = np.array(monitor_indices, dtype=np.int64) + kwargs["size_monitors"] = np.array(monitor_sizes, dtype=np.int64) + + kwargs.setdefault("io_buffer", None) + if kwargs["io_buffer"] is None: + kwargs["size_io_buffer"] = 0 + else: + kwargs["size_io_buffer"] = kwargs["io_buffer"].capacity + kwargs["io_buffer"] = kwargs["io_buffer"].buffer.view(np.int64).copy() + + # Initialize the parent class super().__init__(**kwargs) - self.line_metadata = _build_line_metadata(line, _buffer=self._buffer, - store_element_names=store_element_names) + + # Handle particles or state + particles = kwargs.pop("particles", None) + xb_state = kwargs.get("xb_state", None) + + if particles is not None and xb_state is not None: + raise ValueError("Use either `xb_state` or `particles`, not both.") + + # Set up line metadata + self.line_metadata = _build_line_metadata( + line, + _buffer=self._buffer, + store_element_names=store_element_names + ) self.num_elements = len(line.elements) - # Start position - if particles.start_tracking_at_element >= 0: + if particles is not None: + self.xb_state = XbState( + particles=particles, + monitor_line=monitor_line, + _i_turn=0, + _buffer=self._buffer, + store_element_names=store_element_names, + _io_buffer=kwargs["io_buffer"], + ) + elif not isinstance(xb_state, XbState): + raise ValueError("Need to provide either `xb_state` or `particles`.") + + # Handle element start position + if hasattr(particles, 'start_tracking_at_element') and particles.start_tracking_at_element >= 0: if self.ele_start != 0: raise ValueError( - "The argument ele_start is used, but particles.start_tracking_at_element is set as well. " - "Please use only one of those methods." + "Both ele_start argument and particles.start_tracking_at_element are set. " + "Please use only one method." ) self.ele_start = particles.start_tracking_at_element - if self.ele_start == -1: - self.ele_start = 0 - - assert self.ele_start >= 0 - assert self.ele_start <= self.num_elements + # Ensure valid element positions + self.ele_start = max(0, self.ele_start) + assert 0 <= self.ele_start <= self.num_elements assert self.num_turns > 0 + # Handle element stop position if self.ele_stop == -1: self.ele_stop = self.num_elements else: - if isinstance(self.ele_stop, str): - self.ele_stop = self.line.element_names.index(self.ele_stop) - assert self.ele_stop >= 0 - assert self.ele_stop <= self.num_elements + assert 0 <= self.ele_stop <= self.num_elements if self.ele_stop <= self.ele_start: - # Correct for overflow: + # Correct for overflow - need extra turn self.num_turns += 1 + # Final steps, shrink the buffer _shrink(self._buffer) + # Then, check where the xb_state starts in the buffer, and + # consequently evaluate its size, necessary for C++ manipulations + self.xb_state._xsize = self._buffer.capacity - self.xb_state._offset + @classmethod def from_binary(cls, filename, offset=0, raise_version_error=True): """ @@ -158,19 +233,23 @@ def from_binary(cls, filename, offset=0, raise_version_error=True): # Read binary filename = Path(filename) - with filename.open('rb') as fid: + with filename.open("rb") as fid: state_bytes = fid.read() buffer_data = _xboinc_context.new_buffer(capacity=len(state_bytes)) buffer_data.buffer[:] = np.frombuffer(state_bytes, dtype=np.int8) # Cast to XbVersion to verify versions of xsuite packages version_offset = -1 for field in cls._fields: - if field.name == '_version': + if field.name == "_version": version_offset = field.offset if version_offset == -1: raise ValueError("No xofield `_version` found in XbInput!") - xb_ver = XbVersion._from_buffer(buffer=buffer_data, offset=offset+version_offset) - if not xb_ver.assert_version(raise_error=raise_version_error, filename=filename): + xb_ver = XbVersion._from_buffer( + buffer=buffer_data, offset=offset + version_offset + ) + if not xb_ver.assert_version( + raise_error=raise_version_error, filename=filename + ): return None # Retrieve simulation input return cls._from_buffer(buffer=buffer_data, offset=offset) @@ -191,7 +270,7 @@ def to_binary(self, filename): _shrink(self._buffer) assert self._offset == 0 filename = Path(filename).expanduser().resolve() - with filename.open('wb') as fid: + with filename.open("wb") as fid: fid.write(self._buffer.buffer.tobytes()) @property @@ -200,7 +279,9 @@ def version(self): @property def line(self): - elements = [el._DressingClass(_xobject=el) for el in self.line_metadata.elements] + elements = [ + el._DressingClass(_xobject=el) for el in self.line_metadata.elements + ] names = self.line_metadata.names if len(names) == 0: n = len(elements) @@ -211,67 +292,21 @@ def line(self): @line.setter def line(self, val): # Only works as long as line_metadata is an xo.Ref, but we try to avoid this - raise NotImplementedError + raise NotImplementedError("Setting line metadata is not supported yet.") @property def particles(self): return self.xb_state.particles -def _build_line_metadata(line, _buffer=None, store_element_names=True): - # Create the ElementRefData from a given line - line_id = id(line) - # TODO: caching currently doesn't work - _previous_line_cache = {} - if line_id not in _previous_line_cache: - _check_config(line) - _check_compatible_elements(line) - if _buffer is None: - _buffer = _xboinc_context.new_buffer() - names = list(line.element_names) if store_element_names else [] - element_ref_data = ElementRefData( - elements=len(line.element_names), - names=names, - _buffer=_buffer, - ) - element_ref_data.elements = [ - line.element_dict[name]._xobject for name in line.element_names - ] - _previous_line_cache[line_id] = element_ref_data - - return _previous_line_cache[line_id] - -def _check_config(line): - # Check that the present config is on Xboinc - default_config_hash = get_default_config() - for key, val in default_config_hash: - if key not in line.config: - print(f"Warning: Configuration option `{key}` not found in line.config! " - + f"Set to Xboinc default `{val}`.") - elif val != line.config[key]: - print(f"Warning: Configuration option `{key}` set to `{line.config[key]}` " - + f"in line.config! Not supported by Xboinc. Overwritten to default `{val}`.") - for key in set(line.config.keys()) - {k[0] for k in default_config_hash}: - print(f"Warning: Configuration option `{key}` requested in line.config!" - + f"Not supported by Xboinc. Ignored.") - -def _check_compatible_elements(line): - # Check that all elements are supported by Xboinc - default_elements = [d.__name__ for d in default_element_classes] - for ee in np.unique([ee.__class__.__name__ for ee in line.elements]): - if ee not in default_elements: - raise ValueError(f"Element of type {ee} not supported " - + f"in this version of xboinc!") - - def _shrink(buffer): # Shrink a buffer by removing all free capacity if buffer.get_free() > 0: new_capacity = buffer.capacity - buffer.get_free() newbuff = buffer._new_buffer(new_capacity) buffer.copy_to_native( - dest=newbuff, dest_offset=0, source_offset=0, nbytes=new_capacity - ) + dest=newbuff, dest_offset=0, source_offset=0, nbytes=new_capacity + ) buffer.buffer = newbuff buffer.capacity = new_capacity buffer.chunks = [] diff --git a/xboinc/simulation_io/output.py b/xboinc/simulation_io/output.py index 1c0508b..e7b1561 100644 --- a/xboinc/simulation_io/output.py +++ b/xboinc/simulation_io/output.py @@ -18,16 +18,19 @@ import xobjects as xo import xtrack as xt +from .default_tracker import ElementRefData, default_element_classes, get_default_config from .version import XbVersion, assert_versions class XbState(xo.Struct): - _version = XbVersion # This HAS to be the first field! - _i_turn = xo.Int64 # Current turn in tracking (not necessarily the same as particles.at_turn) - _xsize = xo.Int64 # Needed to have access to the size in C + _version = XbVersion # This HAS to be the first field! + _i_turn = xo.Int64 # Current turn in tracking + _xsize = xo.Int64 # Needed to have access to the size in C _particles = xt.Particles._XoStruct + _io_buffer = xo.Ref(xo.Int64[:]) # I/O buffer to be returned + _monitors_metadata = xo.Ref(ElementRefData) - def __init__(self, **kwargs): + def __init__(self, monitor_line=None, **kwargs): """ Parameters ---------- @@ -36,14 +39,25 @@ def __init__(self, **kwargs): """ assert_versions() - kwargs['_version'] = XbVersion() - particles = kwargs.pop('particles', None) + kwargs["_version"] = XbVersion() + particles = kwargs.pop("particles", None) if particles is None or not isinstance(particles, xt.Particles): raise ValueError("Need to provide `particles` to XbState.") - kwargs['_particles'] = particles._xobject + kwargs["_particles"] = particles._xobject + super().__init__(**kwargs) - self._xsize = self._size + if monitor_line is None: + self._monitors_metadata = ElementRefData() + elif isinstance(monitor_line, xt.Line): + self._monitors_metadata = _build_line_metadata( + monitor_line, + _buffer=kwargs["_buffer"], + store_element_names=kwargs.get("store_element_names", True), + ) + + # self._xsize HAS to be set externally in order to perform the + # appropriate buffer inspections @classmethod def from_binary(cls, filename, offset=0, raise_version_error=True): @@ -63,19 +77,23 @@ def from_binary(cls, filename, offset=0, raise_version_error=True): # Read binary filename = Path(filename) - with filename.open('rb') as fid: + with filename.open("rb") as fid: state_bytes = fid.read() buffer_data = xo.ContextCpu().new_buffer(capacity=len(state_bytes)) buffer_data.buffer[:] = np.frombuffer(state_bytes, dtype=np.int8) # Cast to XbVersion to verify versions of xsuite packages version_offset = -1 for field in cls._fields: - if field.name == '_version': + if field.name == "_version": version_offset = field.offset if version_offset == -1: raise ValueError("No xofield `_version` found in XbState!") - xb_ver = XbVersion._from_buffer(buffer=buffer_data, offset=offset+version_offset) - if not xb_ver.assert_version(raise_error=raise_version_error, filename=filename): + xb_ver = XbVersion._from_buffer( + buffer=buffer_data, offset=offset + version_offset + ) + if not xb_ver.assert_version( + raise_error=raise_version_error, filename=filename + ): return None # Retrieve simulation state return cls._from_buffer(buffer=buffer_data, offset=offset) @@ -93,12 +111,13 @@ def to_binary(self, filename): ------- None. """ - assert self._offset == 0 # TODO: create new buffer if this is not the case (like when XbState inside XbInput) + assert ( + self._offset == 0 + ) # TODO: create new buffer if this is not the case (like when XbState inside XbInput) filename = Path(filename).expanduser().resolve() - with filename.open('wb') as fid: + with filename.open("wb") as fid: fid.write(self._buffer.buffer.tobytes()) - @property def version(self): return self._version @@ -110,3 +129,84 @@ def particles(self): @property def i_turn(self): return self._i_turn + + @property + def monitors(self): + elements = [ + el._DressingClass(_xobject=el) for el in self._monitors_metadata.elements + ] + names = self._monitors_metadata.names + if len(np.array(names)) == 0: + # line is empty, return empty Line + return xt.Line(elements=[], element_names=[]) + if len(names) == 0: + n = len(elements) + digits = int(np.ceil(np.log10(n))) + names = [f"el_{i:>0{digits}}" for i in range(n)] + return xt.Line(elements=elements, element_names=names) + + @property + def io_buffer(self): + return self._io_buffer + + def place_io_buffer(self, line): + """Given a xt.Line, place the io_buffer into the tracker.""" + line.tracker.io_buffer.update_from_nplike( + 0, "int8", self._io_buffer.to_nparray().view(np.int8) + ) + return line + + +def _check_config(line): + # Check that the present config is on Xboinc + default_config_hash = get_default_config() + for key, val in default_config_hash: + if key not in line.config: + print( + f"Warning: Configuration option `{key}` not found in line.config! " + + f"Set to Xboinc default `{val}`." + ) + elif val != line.config[key]: + print( + f"Warning: Configuration option `{key}` set to `{line.config[key]}` " + + f"in line.config! Not supported by Xboinc. Overwritten to default `{val}`." + ) + for key in set(line.config.keys()) - {k[0] for k in default_config_hash}: + print( + f"Warning: Configuration option `{key}` requested in line.config!" + + "Not supported by Xboinc. Ignored." + ) + + +def _check_compatible_elements(line): + # Check that all elements are supported by Xboinc + default_elements = [d.__name__ for d in default_element_classes] + for ee in np.unique([ee.__class__.__name__ for ee in line.elements]): + if ee not in default_elements: + raise ValueError( + f"Element of type {ee} not supported " + f"in this version of xboinc!" + ) + + +def _build_line_metadata(line, _buffer=None, store_element_names=True): + # Create the ElementRefData from a given line + line_id = id(line) + # TODO: caching currently doesn't work + _previous_line_cache = {} + if line_id not in _previous_line_cache: + _check_config(line) + _check_compatible_elements(line) + if _buffer is None: + _buffer = _xboinc_context.new_buffer() + names = list(line.element_names) if store_element_names else [] + element_ref_data = ElementRefData( + elements=len(line.element_names), + names=names, + _buffer=_buffer, + ) + element_ref_data.elements = [ + line.element_dict[name]._xobject for name in line.element_names + ] + _previous_line_cache[line_id] = element_ref_data + + return _previous_line_cache[line_id] diff --git a/xboinc/submit.py b/xboinc/submit.py index d24e254..7021a3f 100644 --- a/xboinc/submit.py +++ b/xboinc/submit.py @@ -45,6 +45,9 @@ UPPER_TIME_BOUND = 3 * 24 * 60 * 60 # seconds, maximum time, 3 days SWEET_SPOT_TIME = 8 * 60 * 60 # seconds, default "ideal" time for a job, 8 hours +# For now, let's set a size limit of 1GB for job inputs, set constant in bytes +XB_INPUT_SIZE_LIMIT = 1 * 1024 * 1024 * 1024 # 1GB + def _get_num_elements_from_line(line): """ @@ -205,6 +208,7 @@ def add( particles, line=None, checkpoint_every=-1, + with_records=False, **kwargs, ): """ @@ -238,6 +242,10 @@ def add( checkpoint_every : int, optional Checkpoint interval in turns. Default is -1 (no checkpointing). If positive, simulation state will be saved every N turns. + with_records : bool, optional + If True, the io_buffer of the line, used for internal logging and + impact tables, will be included in the job submission. By default, + this is False. **kwargs Additional job metadata to be included in the job JSON file. @@ -342,6 +350,11 @@ def add( } with json_file.open("w", encoding="utf-8") as fid: json.dump(json_dict, fid, cls=xo.JEncoder) + + if with_records: + if not hasattr(line.tracker, "io_buffer"): + raise ValueError("Line tracker is missing the io_buffer attribute. Have you built the tracker and activated the internal logging?") + data = XbInput( num_turns=num_turns, line=line, @@ -350,7 +363,16 @@ def add( store_element_names=False, ele_start=ele_start, ele_stop=-ele_stop, + io_buffer=line.tracker.io_buffer if with_records else None, ) + + # check the size of data + if data._buffer.capacity > XB_INPUT_SIZE_LIMIT: + raise ValueError( + f"Input data size of {data._buffer.capacity} bytes exceeds limit of {XB_INPUT_SIZE_LIMIT} bytes. " + "Please reduce the size of the job by revising the number of particles or the extent of the monitors included." + ) + data.to_binary(bin_file) self._json_files += [json_file] self._bin_files += [bin_file] @@ -369,6 +391,7 @@ def slice_and_add( ele_stop=-1, particles, line=None, + with_records=False, checkpoint_every=-1, target_execution_time=SWEET_SPOT_TIME, **kwargs, @@ -398,6 +421,9 @@ def slice_and_add( The tracking line for this specific job. If None, uses the line provided during JobSubmitter initialization. Providing a line per job is slower due to repeated preprocessing. + with_records : bool, optional + If True, the job will include additional tracking information by + passing the io_buffer. Default is False. checkpoint_every : int, optional Checkpoint interval in turns. Default is -1 (no checkpointing). If positive, simulation state will be saved every N turns. @@ -468,7 +494,17 @@ def slice_and_add( else: mask[i * part_per_job : (i + 1) * part_per_job] = True sliced_particles = particles.filter(mask) - self.add(job_name=f"{base_job_name}_{i}", num_turns=num_turns, ele_start=ele_start, ele_stop=ele_stop, particles=sliced_particles, line=line, checkpoint_every=checkpoint_every, **kwargs) + self.add( + job_name=f"{base_job_name}_{i}", + num_turns=num_turns, + ele_start=ele_start, + ele_stop=ele_stop, + particles=sliced_particles, + line=line, + with_records=with_records, + checkpoint_every=checkpoint_every, + **kwargs, + ) def submit(self): """