Skip to content
Closed
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 26 additions & 9 deletions src/transfer_raw_zip.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,19 @@ def register(
key="arcBackup",
value="SLAC_RAW_DISK_BKUP:need",
)
self.did_client.set_metadata(
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please make this a separate commit.

scope="raw",
name=datasets[-1],
key="SafeCopyCheck",
value="need",
)


def comma_separated_list(arg):
"""
Custom type function to split a comma-separated string into a list.
"""
return arg.split(',')


def parse_args():
Expand All @@ -321,9 +334,9 @@ def parse_args():
help="Butler repository from which data is transferred.",
)
parser.add_argument(
"torepo",
type=str,
help="Repository to which data is transferred.",
"torepos",
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would keep the name but add nargs="+" and not use comma_separated_list.

type=comma_separated_list,
help="Comma separated list of repositories to which data is transferred.",
)

parser.add_argument(
Expand Down Expand Up @@ -461,7 +474,7 @@ def process_exposure(exp: DimensionRecord, instrument: str) -> None:
instrument: `str`
The name of the instrument corresponding to the exposure.
"""
global logger, config, source_butler, dest_butler, rucio_interface
global logger, config, source_butler, dest_butlers, rucio_interface

# Check several times (before each major step) for existence of the
# result to avoid work in case of race conditions
Expand Down Expand Up @@ -653,12 +666,14 @@ def process_exposure(exp: DimensionRecord, instrument: str) -> None:

logger.info("Transferring dimension records to destination Butler repo")
if not config.dry_run:
dest_butler.transfer_dimension_records_from(source_butler, refs)
for dest_butler in dest_butlers:
dest_butler.transfer_dimension_records_from(source_butler, refs)

logger.info("Ingesting zip: %s", dest_path)
if not config.dry_run:
with time_this(logger, "Ingesting zip"):
dest_butler.ingest_zip(dest_path, transfer="direct")
for dest_butler in dest_butlers:
dest_butler.ingest_zip(dest_path, transfer="direct")

if config.rucio_rse:
logger.info("Registering zip in Rucio")
Expand All @@ -684,13 +699,13 @@ def process_exposure(exp: DimensionRecord, instrument: str) -> None:
config: argparse.Namespace
logger: logging.Logger
source_butler: Butler
dest_butler: Butler
dest_butlers: list[Butler]
rucio_interface: RucioInterface


def initialize():
"""Set up the global variables."""
global config, source_butler, dest_butler, logger, rucio_interface
global config, source_butler, dest_butlers, logger, rucio_interface

config = parse_args()

Expand All @@ -711,7 +726,9 @@ def initialize():
logger.warning("dry_run=True, no writes")

source_butler = Butler(config.fromrepo, skymap="lsst_cells_v1")
dest_butler = Butler(config.torepo, writeable=True)
for torepo in config.torepos:
dest_butler = Butler(torepo, writeable=True)
dest_butlers.append(dest_butler)
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More compact is:

Suggested change
for torepo in config.torepos:
dest_butler = Butler(torepo, writeable=True)
dest_butlers.append(dest_butler)
dest_butlers = [Butler(torepo, writeable=True) for torepo in config.torepos]

This should also solve the immediate test failure.


if config.rucio_rse:
rucio_interface = RucioInterface(config.rucio_rse, config.scope)
Expand Down
Loading