Skip to content

Commit e4861f5

Browse files
bug fixes for site invalidation mode #994
bug fixes for site invalidation mode
2 parents b790c1d + 32f8681 commit e4861f5

3 files changed

Lines changed: 31 additions & 11 deletions

File tree

DMOps/file_invalidation_tool/src/container_invalidation.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from CMSSpark.spark_utils import get_spark_session
77
from hadoop_queries import get_df_rse_locks, get_df_rse_replicas, get_df_contents
88
from pyspark.sql.window import Window
9+
from file_invalidation import includes_rse_safe
910

1011

1112
@click.command()
@@ -15,7 +16,6 @@
1516
help='RSE to look at')
1617
@click.option('--mode', required=False, type=click.Choice(['rucio','spark']), default='rucio', help='List generation mode')
1718
def invalidate_containers(filename,rse, mode):
18-
#TODO: Check rse option
1919

2020
if mode=='rucio':
2121
#Start Rucio Client
@@ -77,7 +77,7 @@ def invalidate_containers(filename,rse, mode):
7777
df_rules = pd.concat([df_rules,df_rules_i])
7878

7979
if rse is not None:
80-
df_rules['includes_rse'] = df_rules['rse_expression'].apply(lambda exp: {'rse':rse} in list(rucio_client.list_rses(rse_expression=exp)))
80+
df_rules['includes_rse'] = df_rules['rse_expression'].apply(lambda exp: includes_rse_safe(rucio_client,exp, rse))
8181
df_rules = df_rules.loc[df_rules.includes_rse]
8282

8383
df_rules.columns = df_rules.columns.str.upper()

DMOps/file_invalidation_tool/src/dataset_invalidation.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
from pyspark.sql.functions import col, collect_list, concat_ws
2020
from hadoop_queries import get_df_rse_locks, get_df_rse_replicas, get_df_contents, get_df_dataset_level_rules
2121
from pyspark.sql.window import Window
22+
from file_invalidation import includes_rse_safe
2223

2324
@click.command()
2425
@click.option('--filename', required=True, default=None, type=str,
@@ -65,7 +66,7 @@ def invalidate_datasets(filename,rse, mode):
6566
df_rules = pd.DataFrame(columns=['rse','rule_id'])
6667

6768
if rse is not None:
68-
df_rules['includes_rse'] = df_rules['rse_expression'].apply(lambda exp: {'rse':rse} in list(rucio_client.list_rses(rse_expression=exp)))
69+
df_rules['includes_rse'] = df_rules['rse_expression'].apply(lambda exp: includes_rse_safe(rucio_client,exp, rse))
6970
df_rules = df_rules.loc[df_rules.includes_rse]
7071

7172
df_rules.columns = df_rules.columns.str.upper()

DMOps/file_invalidation_tool/src/file_invalidation.py

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,23 @@
1919
from pyspark.sql.window import Window
2020
import pandas as pd
2121
from rucio.client import Client
22+
from rucio.common.exception import InvalidRSEExpression, DataIdentifierNotFound
23+
24+
def includes_rse_safe(client,exp, rse):
25+
try:
26+
return {'rse': rse} in list(
27+
client.list_rses(rse_expression=exp)
28+
)
29+
except InvalidRSEExpression as e:
30+
return False
31+
except Exception as e:
32+
return False
33+
34+
def list_rules_safe(client,d):
35+
try:
36+
return client.list_associated_rules_for_file(scope=d["scope"], name=d["name"])
37+
except DataIdentifierNotFound:
38+
return []
2239

2340

2441
@click.command()
@@ -59,16 +76,18 @@ def invalidate_files(filename, rse, mode):
5976
df_delete.drop_duplicates().to_csv('/input/rucio_replicas_inv.csv',index=False)
6077

6178
#Replicas to the rules
62-
df_rules = pd.DataFrame(columns=['subscription_id', 'rse_expression', 'source_replica_expression', 'ignore_account_limit', 'created_at', 'account', 'copies', 'activity', 'priority', 'updated_at', 'scope', 'expires_at', 'grouping', 'name', 'weight', 'notification', 'comments', 'did_type', 'locked', 'stuck_at', 'child_rule_id', 'state', 'locks_ok_cnt', 'purge_replicas', 'eol_at', 'id', 'error', 'locks_replicating_cnt', 'ignore_availability', 'split_container', 'locks_stuck_cnt', 'meta', 'bytes'])
63-
rules = [list(rucio_client.list_associated_rules_for_file(scope=d['scope'],name=d['name'])) for d in dict_delete]
64-
for r in rules:
65-
if df_rules.shape[0]==0 and len(r)>0:
66-
df_rules = pd.DataFrame(r)
67-
else:
68-
df_rules = pd.concat([df_rules,pd.DataFrame(r)],axis=0)
79+
rules = [rule for d in dict_delete for rule in list_rules_safe(rucio_client,d)]
80+
81+
df_rules = pd.DataFrame(
82+
rules,
83+
columns=['subscription_id', 'rse_expression', 'source_replica_expression','ignore_account_limit', 'created_at', 'account', 'copies', 'activity',
84+
'priority', 'updated_at', 'scope', 'expires_at', 'grouping', 'name','weight', 'notification', 'comments', 'did_type', 'locked', 'stuck_at',
85+
'child_rule_id', 'state', 'locks_ok_cnt', 'purge_replicas', 'eol_at','id', 'error', 'locks_replicating_cnt', 'ignore_availability',
86+
'split_container', 'locks_stuck_cnt', 'meta', 'bytes']
87+
)
6988

7089
if rse is not None:
71-
df_rules['includes_rse'] = df_rules['rse_expression'].apply(lambda exp: {'rse':rse} in list(rucio_client.list_rses(rse_expression=exp)))
90+
df_rules['includes_rse'] = df_rules['rse_expression'].apply(lambda exp: includes_rse_safe(rucio_client,exp, rse))
7291
df_rules = df_rules.loc[df_rules.includes_rse]
7392

7493
#Rules protecting the replicas at File level

0 commit comments

Comments
 (0)