1
1
#!/usr/bin/env python
2
2
# -*- coding: utf-8 -*-
3
- # Copyright 1999-2024 Alibaba Group Holding Ltd.
3
+ # Copyright 1999-2025 Alibaba Group Holding Ltd.
4
4
#
5
5
# Licensed under the Apache License, Version 2.0 (the "License");
6
6
# you may not use this file except in compliance with the License.
14
14
# See the License for the specific language governing permissions and
15
15
# limitations under the License.
16
16
17
+ import json
18
+ import logging
17
19
import time
18
20
from collections import OrderedDict
19
21
23
25
from .instance import Instance
24
26
from .xflow import XFlow
25
27
28
+ logger = logging .getLogger (__name__ )
29
+
26
30
27
31
class XFlows (Iterable ):
28
32
marker = serializers .XMLNodeField ("Marker" )
@@ -139,15 +143,31 @@ def run_xflow(
139
143
self , xflow_instance = None , project = None , hints = None , parameters = None , ** kw
140
144
):
141
145
project = project or self .parent
142
- hints = hints or {}
146
+ props = kw .get ("properties" ) or OrderedDict ()
147
+ props .update (hints or {})
143
148
if options .ml .xflow_settings :
144
- hints .update (options .ml .xflow_settings )
145
- if hints :
146
- kw ["properties" ] = hints
149
+ props .update (options .ml .xflow_settings )
147
150
if options .biz_id :
148
- if kw .get ("properties" ) is None :
149
- kw ["properties" ] = OrderedDict ()
150
- kw ["properties" ]["biz_id" ] = str (options .biz_id )
151
+ props ["biz_id" ] = str (options .biz_id )
152
+
153
+ if options .default_task_settings :
154
+ settings = options .default_task_settings .copy ()
155
+ exist_settings = json .loads (
156
+ props .get ("settings" ) or "{}" , object_pairs_hook = OrderedDict
157
+ )
158
+ settings .update (exist_settings )
159
+ str_settings = OrderedDict ()
160
+ for k , v in settings .items ():
161
+ if isinstance (v , six .string_types ):
162
+ str_settings [k ] = v
163
+ elif isinstance (v , bool ):
164
+ str_settings [k ] = "true" if v else "false"
165
+ else :
166
+ str_settings [k ] = str (v )
167
+ props ["settings" ] = json .dumps (str_settings )
168
+
169
+ if props :
170
+ kw ["properties" ] = props
151
171
if parameters :
152
172
new_params = OrderedDict ()
153
173
for k , v in six .iteritems (parameters ):
@@ -158,11 +178,15 @@ def run_xflow(
158
178
else :
159
179
new_params [k ] = v
160
180
parameters = new_params
161
- return project .instances .create (
162
- xml = self ._gen_xflow_instance_xml (
163
- xflow_instance = xflow_instance , parameters = parameters , ** kw
164
- )
181
+
182
+ inst_xml = self ._gen_xflow_instance_xml (
183
+ xflow_instance = xflow_instance , parameters = parameters , ** kw
165
184
)
185
+ try :
186
+ return project .instances .create (xml = inst_xml )
187
+ except :
188
+ logger .error ("Failed to create xflow instance. Job XML:\n %s" , inst_xml )
189
+ raise
166
190
167
191
class XFlowResult (XMLRemoteModel ):
168
192
class XFlowAction (XMLRemoteModel ):
0 commit comments