diff --git a/src/IntSetupRSEs.py b/src/IntSetupRSEs.py index c25fbcd..918c52c 100755 --- a/src/IntSetupRSEs.py +++ b/src/IntSetupRSEs.py @@ -8,23 +8,17 @@ from rucio.client import Client from rucio.common.exception import RSEProtocolNotSupported, RSENotFound, Duplicate -RSES_TO_SET = ['T1_US_FNAL_Disk', 'T2_CH_CERN', 'T2_US_Purdue',] -RSES_WITHOUT_INPUT = ['T3_US_NERSC', 'T2_US_Wisconsin', 'T2_US_Nebraska', 'T2_US_UCSD'] -RSES_ONLY_INPUT = ['T1_US_FNAL_Tape', 'T0_CH_CERN_Tape', 'T1_IT_CNAF_Tape'] - -#RSES_TO_SET = ['T1_US_FNAL_Disk'] -#RSES_WITHOUT_INPUT = [] -#RSES_ONLY_INPUT = [] - +RSES_TO_SET = ['T2_CH_CERN'] +RSES_INPUT = ['T2_DE_DESY'] INT_SETTINGS = {'availability_read': True, 'availability_write': True, 'availability_delete': True} -INT_ATTRIBUTES = {'ddm_quota': 0, 'cms_type': 'int', 'reaper': True} +INT_ATTRIBUTES = {'dm_weight': 1, 'cms_type': 'int', 'reaper': True} INPUT_SETTINGS = {'availability_read': True, 'availability_write': False, 'availability_delete': False} -INPUT_ATTRIBUTES = {'ddm_quota': 0, 'cms_type': 'input', 'reaper': False} +INPUT_ATTRIBUTES = {'dm_weight': 1,'cms_type': 'input', 'reaper': False} -DEFAULT_PORTS = {'gsiftp': 2811, 'root': 1094, 'davs': 1094} +DEFAULT_PORTS = {'root': 1094, 'davs': 2880} def create_rse(client, rse_name, rse_type='DISK', settings=None): @@ -71,7 +65,7 @@ def overwrite_protocols(client, rse_name, new_protocols=None): logging.debug("Cannot remove protocol %s from %s", scheme, rse_name) for new_proto in new_protocols: - if new_proto['scheme'] in ['srm', 'srmv2', 'gsiftp', 'root', 'davs']: + if new_proto['scheme'] in ['root', 'davs']: try: print('Adding %s to %s' % (new_proto['scheme'], rse_name)) client.add_protocol(rse=rse_name, params=new_proto) @@ -82,19 +76,12 @@ def overwrite_protocols(client, rse_name, new_protocols=None): def set_rse_attributes(client, rse_name, attributes): attributes = attributes or [] - # FIXME: This might be used later. If not delete - try: - current_attributes = client.list_rse_attributes(rse=rse_name) - except RSENotFound: - current_attributes = {} - - # changed = False - for (key, value) in attributes.items(): print('Setting %s=%s for %s' % (key, value, rse_name)) client.add_rse_attribute(rse=rse_name, key=key, value=value) + changed = True - # return changed + return changed def rewrite_protocols(protocols, pfns, read_write=False, enforce_prefix=True): @@ -114,7 +101,6 @@ def rewrite_protocols(protocols, pfns, read_write=False, enforce_prefix=True): r'(?P\w+)://(?P[a-zA-Z0-9\-\.]+):(?P\d+)(?P\/.*\=)(?P.*)', r'(?P\w+)://(?P[a-zA-Z0-9\-\.]+)(?P\/.*\=)(?P.*)', ] -#{'type': 1, 'regexp': re.compile(r'(\w+)://([a-zA-Z0-9\-\.]+):(\d+)(\/.*\=)(.*)')}, int_root = pfns['cms:/store/test/rucio/int/'] for regex in regexes: @@ -135,10 +121,10 @@ def rewrite_protocols(protocols, pfns, read_write=False, enforce_prefix=True): if read_write: new_domains = {'lan': {'delete': 0, 'read': 0, 'write': 0}, - 'wan': {'delete': 1, 'read': 1, 'third_party_copy': 1, 'write': 1}} + 'wan': {'delete': 1, 'read': 1, 'third_party_copy_read': 1,'third_party_copy_write': 1, 'write': 1}} else: new_domains = {'lan': {'delete': 0, 'read': 0, 'write': 0}, - 'wan': {'delete': 0, 'read': 1, 'third_party_copy': 1, 'write': 0}} + 'wan': {'delete': 0, 'read': 1, 'third_party_copy_read': 1,'third_party_copy_write': 1, 'write': 0}} for protocol in protocols: new_protocol = copy.deepcopy(protocol) @@ -150,13 +136,15 @@ def rewrite_protocols(protocols, pfns, read_write=False, enforce_prefix=True): new_protocol['prefix'] = prefix # Need to handle SRM separately extended_attributes = protocol['extended_attributes'] or {} - if 'web_service_path' in extended_attributes: - web_path = extended_attributes['web_service_path'] + if 'tfc' in extended_attributes: + tfc = extended_attributes['tfc'] + tfc_proto = extended_attributes['tfc_proto'] new_protocol['extended_attributes'] = {} - new_protocol['extended_attributes']['web_service_path'] = web_path - prefix = prefix.replace(web_path, '') - new_protocol['prefix'] = prefix - if protocol['scheme'] in ['gsiftp', 'srm']: + new_protocol['extended_attributes']['tfc'] = tfc + new_protocol['extended_attributes']['tfc_proto'] = tfc_proto + if protocol['scheme'] in ['davs', 'root']: + if protocol['scheme']=='root': + new_protocol['domains']['wan'] = {'delete': 0, 'read': 3, 'third_party_copy_read': 3, 'third_party_copy_write': 0, 'write': 0} new_protocols.append(new_protocol) return new_protocols @@ -166,10 +154,11 @@ def rewrite_protocols(protocols, pfns, read_write=False, enforce_prefix=True): rci = Client(rucio_host='http://cms-rucio-int.cern.ch', auth_host='https://cms-rucio-auth-int.cern.ch', account='root') rcp = Client(rucio_host='http://cms-rucio.cern.ch', auth_host='https://cms-rucio-auth.cern.ch', - account='ewv') + account='transfer_ops') + - input_rses = set(RSES_TO_SET+RSES_ONLY_INPUT) - write_rses = set(RSES_TO_SET+RSES_WITHOUT_INPUT) + input_rses = set(RSES_TO_SET+RSES_INPUT) + write_rses = set(RSES_TO_SET) for rse_name in set(RSES_TO_SET+RSES_WITHOUT_INPUT+RSES_ONLY_INPUT): # Fetch the values needed from the production RSE @@ -200,4 +189,4 @@ def rewrite_protocols(protocols, pfns, read_write=False, enforce_prefix=True): set_rse_attributes(client=rci, rse_name=input_rse, attributes=input_attributes) rse_protocols = rse['protocols'] input_protocols = rewrite_protocols(rse_protocols, pfns, read_write=False, enforce_prefix=False) - overwrite_protocols(client=rci, rse_name=input_rse, new_protocols=input_protocols) + overwrite_protocols(client=rci, rse_name=input_rse, new_protocols=input_protocols) \ No newline at end of file diff --git a/src/IntSyncDatasets.py b/src/IntSyncDatasets.py index ec80aec..4fd77f5 100755 --- a/src/IntSyncDatasets.py +++ b/src/IntSyncDatasets.py @@ -1,6 +1,7 @@ #! /usr/bin/env python3 import json import logging +from pprint import pprint from rucio.client import Client from rucio.common.exception import (DataIdentifierAlreadyExists, DuplicateContent, DuplicateRule, FileAlreadyExists, @@ -24,7 +25,7 @@ def sync_block(rcp, rci, name, destinations=None): states = replica['states'] rses = [(INPUT_RSE % rse) for rse in all_rses if states[rse] == 'AVAILABLE'] file_dids = [{'scope': 'cms', 'name': lfn}] - # rci.add_did(scope=SCOPE, name=lfn, type='FILE') + # rci.add_did(scope=SCOPE, name=lfn, did_type='FILE') # try: # rci.attach_dids(scope=SCOPE, name=name, dids=file_dids) # except DuplicateContent: @@ -34,7 +35,6 @@ def sync_block(rcp, rci, name, destinations=None): # pdb.set_trace() try: result = rci.add_replicas(rse=rse, files=new_replicas) - print(rse, new_replicas, result) except RSENotFound: print('Source RSE %s not found. No replica made.' % rse) try: @@ -50,10 +50,12 @@ def sync_block(rcp, rci, name, destinations=None): rci = Client(rucio_host='http://cms-rucio-int.cern.ch', auth_host='https://cms-rucio-auth-int.cern.ch', account='root') rcp = Client(rucio_host='http://cms-rucio.cern.ch', auth_host='https://cms-rucio-auth.cern.ch', - account='ewv') + account='transfer_ops') - with open('int_wmcore_datasets.json', 'r') as wmcore_file: - containers = json.load(wmcore_file) + #with open('int_wmcore_datasets.json', 'r') as wmcore_file: + # containers = json.load(wmcore_file) + containers = [{'name':'/JetMET1/Run2023D-22Sep2023_v1-v1/MINIAOD'}] + destinations = ['T2_CH_CERN'] for container in containers: name = container['name'] @@ -61,13 +63,13 @@ def sync_block(rcp, rci, name, destinations=None): block_names = [container['name']] container_name, _ = container['name'].split('#') try: - rci.add_did(scope=SCOPE, name=container_name, type='CONTAINER') + rci.add_did(scope=SCOPE, name=container_name, did_type='CONTAINER') except DataIdentifierAlreadyExists: logging.debug('Container already existed') else: blocks = rcp.list_content(scope=SCOPE, name=name) try: - rci.add_did(scope=SCOPE, name=name, type='CONTAINER') + rci.add_did(scope=SCOPE, name=name, did_type='CONTAINER') except DataIdentifierAlreadyExists: logging.debug('Container already existed') block_names = sorted([block['name'] for block in blocks]) @@ -76,7 +78,7 @@ def sync_block(rcp, rci, name, destinations=None): for block_name in block_names: block_dids = [{'scope': 'cms', 'name': block_name}] try: - rci.add_did(scope=SCOPE, name=block_name, type='DATASET') + rci.add_did(scope=SCOPE, name=block_name, did_type='DATASET') except DataIdentifierAlreadyExists: logging.debug('Block already existed') try: @@ -85,10 +87,10 @@ def sync_block(rcp, rci, name, destinations=None): logging.debug('Block already attached') except DataIdentifierNotFound: print('DID not found %s' % block_name) - sync_block(rcp=rcp, rci=rci, name=block_name) # destinations=container['destinations'] - for destination in container.get('destinations', ['T1_US_FNAL_Disk', 'T2_CH_CERN']): - dest_rse = INT_RSE % destination - try: - rci.add_replication_rule(block_dids, 1, dest_rse, account='transfer_ops') - except DuplicateRule: - print('Rule already made for %s at %s' % (name, dest_rse)) + sync_block(rcp=rcp, rci=rci, name=block_name) + for destination in destinations: + dest_rse = INT_RSE % destination + try: + rci.add_replication_rule(dids=[{'scope':'cms','name':name}], copies=1, rse_expression=dest_rse, account='transfer_ops') + except DuplicateRule: + print('Rule already made for %s at %s' % (name, dest_rse)) \ No newline at end of file