@@ -94,6 +94,64 @@ def prepareTransformationTasks(self, transBody, taskDict, owner="", ownerGroup="
9494
9595 return S_OK (taskDict )
9696
97+ @staticmethod
98+ def _buildOperationsBodyForTask (transJson , task , owner , ownerGroup ):
99+ """
100+ :param transJson: list of lists of string and dictionaries, e.g.:
101+
102+ .. code :: python
103+
104+ body = [ ( "ReplicateAndRegister", { "SourceSE":"FOO-SRM", "TargetSE":"TASK:TargetSE" }),
105+ ( "RemoveReplica", { "TargetSE":"FOO-SRM" } ),
106+ ]
107+
108+ If a value of an operation parameter in the body starts with ``TASK:``,
109+ we take it from the taskDict.
110+ For example ``TASK:TargetSE`` is replaced with ``task['TargetSE']``
111+
112+ :param dict taskDict: dictionary of tasks, modified in this function
113+ :param str owner: owner used for the requests
114+ :param str onwerGroup: dirac group used for the requests
115+
116+ :returns: Request
117+ """
118+
119+ files = []
120+ oRequest = Request ()
121+ if isinstance (task ["InputData" ], list ):
122+ files = task ["InputData" ]
123+ elif isinstance (task ["InputData" ], str ):
124+ files = task ["InputData" ].split (";" )
125+
126+ # create the operations from the json structure
127+ for operationTuple in transJson :
128+ op = Operation ()
129+ op .Type = operationTuple [0 ]
130+ for parameter , value in operationTuple [1 ].items ():
131+ # Here we massage a bit the body to replace some parameters
132+ # with what we have in the task.
133+ try :
134+ taskKey = value .split ("TASK:" )[1 ]
135+ value = task [taskKey ]
136+ # Either the attribute is not a string (AttributeError)
137+ # or it does not start with 'TASK:' (IndexError)
138+ except (AttributeError , IndexError ):
139+ pass
140+ # That happens when the requested substitution is not
141+ # a key in the task, and that's a problem
142+ except KeyError :
143+ raise StopTaskIteration (f"Parameter { taskKey } does not exist in taskDict" )
144+
145+ setattr (op , parameter , value )
146+
147+ for lfn in files :
148+ opFile = File ()
149+ opFile .LFN = lfn
150+ op .addFile (opFile )
151+
152+ oRequest .addOperation (op )
153+ return oRequest
154+
97155 def _multiOperationsBody (self , transJson , taskDict , owner , ownerGroup ):
98156 """Deal with a Request that has multiple operations
99157
@@ -120,41 +178,7 @@ def _multiOperationsBody(self, transJson, taskDict, owner, ownerGroup):
120178 transID = task ["TransformationID" ]
121179 if not task .get ("InputData" ):
122180 raise StopTaskIteration ("No input data" )
123- files = []
124-
125- oRequest = Request ()
126- if isinstance (task ["InputData" ], list ):
127- files = task ["InputData" ]
128- elif isinstance (task ["InputData" ], str ):
129- files = task ["InputData" ].split (";" )
130-
131- # create the operations from the json structure
132- for operationTuple in transJson :
133- op = Operation ()
134- op .Type = operationTuple [0 ]
135- for parameter , value in operationTuple [1 ].items ():
136- # Here we massage a bit the body to replace some parameters
137- # with what we have in the task.
138- try :
139- taskKey = value .split ("TASK:" )[1 ]
140- value = task [taskKey ]
141- # Either the attribute is not a string (AttributeError)
142- # or it does not start with 'TASK:' (IndexError)
143- except (AttributeError , IndexError ):
144- pass
145- # That happens when the requested substitution is not
146- # a key in the task, and that's a problem
147- except KeyError :
148- raise StopTaskIteration (f"Parameter { taskKey } does not exist in taskDict" )
149-
150- setattr (op , parameter , value )
151-
152- for lfn in files :
153- opFile = File ()
154- opFile .LFN = lfn
155- op .addFile (opFile )
156-
157- oRequest .addOperation (op )
181+ oRequest = self ._buildOperationsBodyForTask (transJson , task , owner , ownerGroup )
158182
159183 result = self ._assignRequestToTask (oRequest , taskDict , transID , taskID , owner , ownerGroup )
160184 if not result ["OK" ]:
0 commit comments