Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/build_and_test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ on:
pull_request:

env:
EUPS_DISTRIB_VERSION: "w_2025_19"
EUPS_DISTRIB_VERSION: "w_2025_41"

jobs:
test:
Expand Down
2 changes: 0 additions & 2 deletions src/check_non_raw.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,6 @@ def read_dsrs(fd: io.TextIOBase, dimensions: DimensionUniverse) -> DatasetRef:

def dbretry(retry_label: str, func: Any, *args, **kwargs) -> Any:
"""Retry a database-dependent function call up to 10 times."""
global logger

retries = 0
max_retries = 10
while retries < max_retries:
Expand Down
6 changes: 0 additions & 6 deletions src/generate_non_raw.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,6 @@ def parse_args():

def dbretry(retry_label: str, func: Any, *args, **kwargs) -> Any:
"""Retry a database-dependent function call up to 10 times."""
global logger

retries = 0
max_retries = 10
while retries < max_retries:
Expand All @@ -89,8 +87,6 @@ def dbretry(retry_label: str, func: Any, *args, **kwargs) -> Any:

def gather_collection_info(collection: str):
"""Get information for a collection and its children."""
global logger, butler

collection_info = butler.collections.get_info(collection)
calibration_collections = set()
tagged_collections = set()
Expand Down Expand Up @@ -121,8 +117,6 @@ def gather_collection_info(collection: str):

def generate_dstypes(collection: str) -> set[DatasetType]:
"""Generate a list of exportable dataset types from a collection."""
global butler, logger

all_types = butler.registry.queryDatasetTypes("*")
collection_info = butler.collections.query_info(collection, include_summary=True)
calibration_collections, tagged_collections = gather_collection_info(collection)
Expand Down
12 changes: 9 additions & 3 deletions src/transfer_from_list.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,13 @@ def parse_args():
action="store_true",
help="Register any new dataset types.",
)
parser.add_argument(
"--transfer",
type=str,
required=False,
default="copy",
help="Transfer type (default=copy).",
)
parser.add_argument(
"--log",
type=str,
Expand Down Expand Up @@ -111,8 +118,6 @@ def read_dsrs(fd: io.TextIOBase, dimensions: DimensionUniverse) -> DatasetRef:

def dbretry(retry_label: str, func: Any, *args, **kwargs) -> Any:
"""Retry a database-dependent function call up to 10 times."""
global logger

retries = 0
max_retries = 10
while retries < max_retries:
Expand All @@ -132,6 +137,7 @@ def dbretry(retry_label: str, func: Any, *args, **kwargs) -> Any:

def main():
global logger

config = parse_args()

CliLog.initLog(longlog=True)
Expand All @@ -155,7 +161,7 @@ def main():
dest_butler.transfer_from,
source_butler,
batch,
transfer="copy",
transfer=config.transfer,
skip_missing=True,
register_dataset_types=config.register_dataset_types,
transfer_dimensions=False,
Expand Down
42 changes: 18 additions & 24 deletions src/transfer_non_raw.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,6 @@ def parse_args():


def transfer_data_query(data_query):
global config, source_butler, dest_butler

all_types = source_butler.registry.queryDatasetTypes(data_query.dataset_types)
collections_info = source_butler.collections.query_info(
data_query.collections, include_summary=True
Expand Down Expand Up @@ -157,7 +155,6 @@ def transfer_data_query(data_query):


def transfer_dimension(dimension, dataset_type, data_query, ok_timespan):
global config, source_butler, logger
try:
# data_query.where goes last to avoid injection overriding timespan
dim_where = f"({dimension}.timespan OVERLAPS :ok_timespan)"
Expand Down Expand Up @@ -196,7 +193,6 @@ def transfer_dimension(dimension, dataset_type, data_query, ok_timespan):


def transfer_dataset_type(dataset_type, collections, where, bind):
global source_butler, logger
logger.debug(f"Querying datasets: {where} {bind}")
dataset_refs = list(
# ok to have empty results because this is used with batching.
Expand Down Expand Up @@ -226,6 +222,24 @@ def transfer_dataset_type(dataset_type, collections, where, bind):
)


def main():
    """Entry point: load the configured data queries and transfer each one.

    Reads the queries either from ``--config-file`` (YAML file) or from the
    inline ``--dataqueries`` argument, then hands every query to
    ``transfer_data_query``.  Returns 0 on success.
    """
    initialize()

    if not config.config_file:
        logger.info("Using dataqueries: %s", config.dataqueries)
        queries = DataQuery.from_yaml(config.dataqueries)
    else:
        logger.info("using config file %s", config.config_file)
        with open(config.config_file, "r") as f:
            queries = DataQuery.from_yaml(f)
    logger.info("data_queries %s", queries)

    for query in queries:
        logger.info("Processing %s", query)
        transfer_data_query(query)
    return 0


config: argparse.Namespace
logger: logging.Logger
source_butler: Butler
Expand All @@ -252,25 +266,5 @@ def initialize():
dest_butler = Butler(config.torepo, writeable=True)


def main():
    """Load the configured data queries and transfer each one.

    Returns
    -------
    status : `int`
        0 on success (suitable as a process exit code).
    """
    # NOTE(review): `global` is not strictly needed here — both names are
    # only read, never rebound, so module scope already applies.
    global config, logger

    initialize()

    # Queries come either from a YAML config file or from the inline
    # --dataqueries argument; both paths produce a list of DataQuery.
    if config.config_file:
        logger.info("using config file %s", config.config_file)
        with open(config.config_file, "r") as f:
            data_queries = DataQuery.from_yaml(f)
    else:
        logger.info("Using dataqueries: %s", config.dataqueries)
        data_queries = DataQuery.from_yaml(config.dataqueries)
    logger.info("data_queries %s", data_queries)

    # Process queries sequentially; any exception from transfer_data_query
    # propagates and aborts the run.
    for data_query in data_queries:
        logger.info("Processing %s", data_query)
        transfer_data_query(data_query)
    return 0


if __name__ == "__main__":
main()
47 changes: 19 additions & 28 deletions src/transfer_raw_zip.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,6 @@ def _add_replica(
dry_run: `bool`
If true, only log, do not write anything.
"""
global logger

logger.info("Adding replica to %s: %s", self.rucio_rse, did)
if dry_run:
return
Expand Down Expand Up @@ -203,8 +201,6 @@ def _add_file_to_dataset(self, did: dict, dataset_id: str, dry_run: bool) -> Non
dry_run: `bool`
If true, only log, do not write anything.
"""
global logger

logger.info(
"Registering %s in dataset %s, RSE %s", did, dataset_id, self.rucio_rse
)
Expand Down Expand Up @@ -411,8 +407,6 @@ def transfer_data_query(data_query: DataQuery) -> None:
data_query: `DataQuery`
The query and associated embargo time.
"""
global logger, config, source_butler

# End of window is now - embargo length
end_time = config.now - TimeDelta(data_query.embargo_hours * 3600, format="sec")
# If window is defined, then start is that much before the end
Expand Down Expand Up @@ -461,8 +455,6 @@ def process_exposure(exp: DimensionRecord, instrument: str) -> None:
instrument: `str`
The name of the instrument corresponding to the exposure.
"""
global logger, config, source_butler, dest_butler, rucio_interface

# Check several times (before each major step) for existence of the
# result to avoid work in case of race conditions
zip_name = f"{exp.obs_id}.zip"
Expand Down Expand Up @@ -688,6 +680,25 @@ def process_exposure(exp: DimensionRecord, instrument: str) -> None:
rucio_interface: RucioInterface


def main():
    """Main function."""
    initialize()

    with open(config.config_file, "r") as f:
        queries = DataQuery.from_yaml(f)
    logger.info("data_queries %s", queries)

    # Validate every query up front: raw transfers only accept the
    # canonical raw collection and the "raw" dataset type.
    for q in queries:
        expected_collection = f"{q.instrument}/raw/all"
        if not (q.collections == expected_collection and q.dataset_types == "raw"):
            raise ValueError(f"Invalid data query for raws: {q}")

    for q in queries:
        logger.info("Processing %s", q)
        transfer_data_query(q)


def initialize():
"""Set up the global variables."""
global config, source_butler, dest_butler, logger, rucio_interface
Expand Down Expand Up @@ -717,25 +728,5 @@ def initialize():
rucio_interface = RucioInterface(config.rucio_rse, config.scope)


def main():
    """Load raw-transfer data queries, validate them, and process each one.

    Raises
    ------
    ValueError
        If any query does not target the ``{instrument}/raw/all``
        collection with dataset type ``raw``.
    """
    # NOTE(review): `global` is not strictly needed here — both names are
    # only read, never rebound, so module scope already applies.
    global config, logger
    initialize()

    with open(config.config_file, "r") as f:
        data_queries = DataQuery.from_yaml(f)
    logger.info("data_queries %s", data_queries)
    # Validate all queries before transferring anything, so a bad entry
    # aborts the run without partial work.
    for query in data_queries:
        if (
            query.collections != f"{query.instrument}/raw/all"
            or query.dataset_types != "raw"
        ):
            raise ValueError(f"Invalid data query for raws: {query}")

    for data_query in data_queries:
        logger.info("Processing %s", data_query)
        transfer_data_query(data_query)


if __name__ == "__main__":
main()