Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 23 additions & 34 deletions src/IntSetupRSEs.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,17 @@
from rucio.client import Client
from rucio.common.exception import RSEProtocolNotSupported, RSENotFound, Duplicate

RSES_TO_SET = ['T1_US_FNAL_Disk', 'T2_CH_CERN', 'T2_US_Purdue',]
RSES_WITHOUT_INPUT = ['T3_US_NERSC', 'T2_US_Wisconsin', 'T2_US_Nebraska', 'T2_US_UCSD']
RSES_ONLY_INPUT = ['T1_US_FNAL_Tape', 'T0_CH_CERN_Tape', 'T1_IT_CNAF_Tape']

#RSES_TO_SET = ['T1_US_FNAL_Disk']
#RSES_WITHOUT_INPUT = []
#RSES_ONLY_INPUT = []

RSES_TO_SET = ['T2_CH_CERN']
RSES_INPUT = ['T2_DE_DESY']



INT_SETTINGS = {'availability_read': True, 'availability_write': True, 'availability_delete': True}
INT_ATTRIBUTES = {'ddm_quota': 0, 'cms_type': 'int', 'reaper': True}
INT_ATTRIBUTES = {'dm_weight': 1, 'cms_type': 'int', 'reaper': True}
INPUT_SETTINGS = {'availability_read': True, 'availability_write': False, 'availability_delete': False}
INPUT_ATTRIBUTES = {'ddm_quota': 0, 'cms_type': 'input', 'reaper': False}
INPUT_ATTRIBUTES = {'dm_weight': 1,'cms_type': 'input', 'reaper': False}

DEFAULT_PORTS = {'gsiftp': 2811, 'root': 1094, 'davs': 1094}
DEFAULT_PORTS = {'root': 1094, 'davs': 2880}


def create_rse(client, rse_name, rse_type='DISK', settings=None):
Expand Down Expand Up @@ -71,7 +65,7 @@ def overwrite_protocols(client, rse_name, new_protocols=None):
logging.debug("Cannot remove protocol %s from %s", scheme, rse_name)

for new_proto in new_protocols:
if new_proto['scheme'] in ['srm', 'srmv2', 'gsiftp', 'root', 'davs']:
if new_proto['scheme'] in ['root', 'davs']:
try:
print('Adding %s to %s' % (new_proto['scheme'], rse_name))
client.add_protocol(rse=rse_name, params=new_proto)
Expand All @@ -82,19 +76,12 @@ def overwrite_protocols(client, rse_name, new_protocols=None):
def set_rse_attributes(client, rse_name, attributes):
attributes = attributes or []

# FIXME: This might be used later. If not delete
try:
current_attributes = client.list_rse_attributes(rse=rse_name)
except RSENotFound:
current_attributes = {}

# changed = False

for (key, value) in attributes.items():
print('Setting %s=%s for %s' % (key, value, rse_name))
client.add_rse_attribute(rse=rse_name, key=key, value=value)
changed = True

# return changed
return changed


def rewrite_protocols(protocols, pfns, read_write=False, enforce_prefix=True):
Expand All @@ -114,7 +101,6 @@ def rewrite_protocols(protocols, pfns, read_write=False, enforce_prefix=True):
r'(?P<scheme>\w+)://(?P<host>[a-zA-Z0-9\-\.]+):(?P<port>\d+)(?P<wsp>\/.*\=)(?P<prefix>.*)',
r'(?P<scheme>\w+)://(?P<host>[a-zA-Z0-9\-\.]+)(?P<wsp>\/.*\=)(?P<prefix>.*)',
]
#{'type': 1, 'regexp': re.compile(r'(\w+)://([a-zA-Z0-9\-\.]+):(\d+)(\/.*\=)(.*)')},

int_root = pfns['cms:/store/test/rucio/int/']
for regex in regexes:
Expand All @@ -135,10 +121,10 @@ def rewrite_protocols(protocols, pfns, read_write=False, enforce_prefix=True):

if read_write:
new_domains = {'lan': {'delete': 0, 'read': 0, 'write': 0},
'wan': {'delete': 1, 'read': 1, 'third_party_copy': 1, 'write': 1}}
'wan': {'delete': 1, 'read': 1, 'third_party_copy_read': 1,'third_party_copy_write': 1, 'write': 1}}
else:
new_domains = {'lan': {'delete': 0, 'read': 0, 'write': 0},
'wan': {'delete': 0, 'read': 1, 'third_party_copy': 1, 'write': 0}}
'wan': {'delete': 0, 'read': 1, 'third_party_copy_read': 1,'third_party_copy_write': 1, 'write': 0}}

for protocol in protocols:
new_protocol = copy.deepcopy(protocol)
Expand All @@ -150,13 +136,15 @@ def rewrite_protocols(protocols, pfns, read_write=False, enforce_prefix=True):
new_protocol['prefix'] = prefix
# Need to handle SRM separately
extended_attributes = protocol['extended_attributes'] or {}
if 'web_service_path' in extended_attributes:
web_path = extended_attributes['web_service_path']
if 'tfc' in extended_attributes:
tfc = extended_attributes['tfc']
tfc_proto = extended_attributes['tfc_proto']
new_protocol['extended_attributes'] = {}
new_protocol['extended_attributes']['web_service_path'] = web_path
prefix = prefix.replace(web_path, '')
new_protocol['prefix'] = prefix
if protocol['scheme'] in ['gsiftp', 'srm']:
new_protocol['extended_attributes']['tfc'] = tfc
new_protocol['extended_attributes']['tfc_proto'] = tfc_proto
if protocol['scheme'] in ['davs', 'root']:
if protocol['scheme']=='root':
new_protocol['domains']['wan'] = {'delete': 0, 'read': 3, 'third_party_copy_read': 3, 'third_party_copy_write': 0, 'write': 0}
new_protocols.append(new_protocol)

return new_protocols
Expand All @@ -166,10 +154,11 @@ def rewrite_protocols(protocols, pfns, read_write=False, enforce_prefix=True):
rci = Client(rucio_host='http://cms-rucio-int.cern.ch', auth_host='https://cms-rucio-auth-int.cern.ch',
account='root')
rcp = Client(rucio_host='http://cms-rucio.cern.ch', auth_host='https://cms-rucio-auth.cern.ch',
account='ewv')
account='transfer_ops')


input_rses = set(RSES_TO_SET+RSES_ONLY_INPUT)
write_rses = set(RSES_TO_SET+RSES_WITHOUT_INPUT)
input_rses = set(RSES_TO_SET+RSES_INPUT)
write_rses = set(RSES_TO_SET)

for rse_name in set(RSES_TO_SET+RSES_WITHOUT_INPUT+RSES_ONLY_INPUT):
# Fetch the values needed from the production RSE
Expand Down Expand Up @@ -200,4 +189,4 @@ def rewrite_protocols(protocols, pfns, read_write=False, enforce_prefix=True):
set_rse_attributes(client=rci, rse_name=input_rse, attributes=input_attributes)
rse_protocols = rse['protocols']
input_protocols = rewrite_protocols(rse_protocols, pfns, read_write=False, enforce_prefix=False)
overwrite_protocols(client=rci, rse_name=input_rse, new_protocols=input_protocols)
overwrite_protocols(client=rci, rse_name=input_rse, new_protocols=input_protocols)
32 changes: 17 additions & 15 deletions src/IntSyncDatasets.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#! /usr/bin/env python3
import json
import logging
from pprint import pprint

from rucio.client import Client
from rucio.common.exception import (DataIdentifierAlreadyExists, DuplicateContent, DuplicateRule, FileAlreadyExists,
Expand All @@ -24,7 +25,7 @@ def sync_block(rcp, rci, name, destinations=None):
states = replica['states']
rses = [(INPUT_RSE % rse) for rse in all_rses if states[rse] == 'AVAILABLE']
file_dids = [{'scope': 'cms', 'name': lfn}]
# rci.add_did(scope=SCOPE, name=lfn, type='FILE')
# rci.add_did(scope=SCOPE, name=lfn, did_type='FILE')
# try:
# rci.attach_dids(scope=SCOPE, name=name, dids=file_dids)
# except DuplicateContent:
Expand All @@ -34,7 +35,6 @@ def sync_block(rcp, rci, name, destinations=None):
# pdb.set_trace()
try:
result = rci.add_replicas(rse=rse, files=new_replicas)
print(rse, new_replicas, result)
except RSENotFound:
print('Source RSE %s not found. No replica made.' % rse)
try:
Expand All @@ -50,24 +50,26 @@ def sync_block(rcp, rci, name, destinations=None):
rci = Client(rucio_host='http://cms-rucio-int.cern.ch', auth_host='https://cms-rucio-auth-int.cern.ch',
account='root')
rcp = Client(rucio_host='http://cms-rucio.cern.ch', auth_host='https://cms-rucio-auth.cern.ch',
account='ewv')
account='transfer_ops')

with open('int_wmcore_datasets.json', 'r') as wmcore_file:
containers = json.load(wmcore_file)
#with open('int_wmcore_datasets.json', 'r') as wmcore_file:
# containers = json.load(wmcore_file)
containers = [{'name':'/JetMET1/Run2023D-22Sep2023_v1-v1/MINIAOD'}]
destinations = ['T2_CH_CERN']

for container in containers:
name = container['name']
if '#' in name: # This is already a block, make the container
block_names = [container['name']]
container_name, _ = container['name'].split('#')
try:
rci.add_did(scope=SCOPE, name=container_name, type='CONTAINER')
rci.add_did(scope=SCOPE, name=container_name, did_type='CONTAINER')
except DataIdentifierAlreadyExists:
logging.debug('Container already existed')
else:
blocks = rcp.list_content(scope=SCOPE, name=name)
try:
rci.add_did(scope=SCOPE, name=name, type='CONTAINER')
rci.add_did(scope=SCOPE, name=name, did_type='CONTAINER')
except DataIdentifierAlreadyExists:
logging.debug('Container already existed')
block_names = sorted([block['name'] for block in blocks])
Expand All @@ -76,7 +78,7 @@ def sync_block(rcp, rci, name, destinations=None):
for block_name in block_names:
block_dids = [{'scope': 'cms', 'name': block_name}]
try:
rci.add_did(scope=SCOPE, name=block_name, type='DATASET')
rci.add_did(scope=SCOPE, name=block_name, did_type='DATASET')
except DataIdentifierAlreadyExists:
logging.debug('Block already existed')
try:
Expand All @@ -85,10 +87,10 @@ def sync_block(rcp, rci, name, destinations=None):
logging.debug('Block already attached')
except DataIdentifierNotFound:
print('DID not found %s' % block_name)
sync_block(rcp=rcp, rci=rci, name=block_name) # destinations=container['destinations']
for destination in container.get('destinations', ['T1_US_FNAL_Disk', 'T2_CH_CERN']):
dest_rse = INT_RSE % destination
try:
rci.add_replication_rule(block_dids, 1, dest_rse, account='transfer_ops')
except DuplicateRule:
print('Rule already made for %s at %s' % (name, dest_rse))
sync_block(rcp=rcp, rci=rci, name=block_name)
for destination in destinations:
dest_rse = INT_RSE % destination
try:
rci.add_replication_rule(dids=[{'scope':'cms','name':name}], copies=1, rse_expression=dest_rse, account='transfer_ops')
except DuplicateRule:
print('Rule already made for %s at %s' % (name, dest_rse))