from dagster._utils.error import SerializableErrorInfo

if TYPE_CHECKING:
+    from dagster._core.instance import DagsterInstance
    from dagster._daemon.daemon import DaemonIterator


@@ -140,6 +141,78 @@ def _is_retryable_asset_backfill_error(e: Exception):
    return not isinstance(e, (DagsterError, check.CheckError))


+def execute_backfill_iteration_with_instigation_logger(
+    backfill: "PartitionBackfill",
+    logger: logging.Logger,
+    workspace_process_context: IWorkspaceProcessContext,
+    instance: "DagsterInstance",
+    debug_crash_flags: Optional[Mapping[str, int]] = None,
+) -> Iterable:
+    with _get_instigation_logger_if_log_storage_enabled(instance, backfill, logger) as _logger:
+        # create a logger that will always include the backfill_id as an `extra`
+        backfill_logger = cast(
+            logging.Logger,
+            logging.LoggerAdapter(_logger, extra={"backfill_id": backfill.backfill_id}),
+        )
+        try:
+            if backfill.is_asset_backfill:
+                yield from execute_asset_backfill_iteration(
+                    backfill, backfill_logger, workspace_process_context, instance
+                )
+            else:
+                yield from execute_job_backfill_iteration(
+                    backfill,
+                    backfill_logger,
+                    workspace_process_context,
+                    debug_crash_flags,
+                    instance,
+                )
+        except Exception as e:
+            backfill = check.not_none(instance.get_backfill(backfill.backfill_id))
+            if (
+                backfill.is_asset_backfill
+                and backfill.status == BulkActionStatus.REQUESTED
+                and backfill.failure_count < _get_max_asset_backfill_retries()
+                and _is_retryable_asset_backfill_error(e)
+            ):
+                if isinstance(e, (DagsterUserCodeUnreachableError, DagsterCodeLocationLoadError)):
+                    try:
+                        raise DagsterUserCodeUnreachableError(
+                            "Unable to reach the code server. Backfill will resume once the code server is available."
+                        ) from e
+                    except:
+                        error_info = DaemonErrorCapture.process_exception(
+                            sys.exc_info(),
+                            logger=backfill_logger,
+                            log_message=f"Backfill failed for {backfill.backfill_id} due to unreachable code server and will retry",
+                        )
+                        instance.update_backfill(backfill.with_error(error_info))
+                else:
+                    error_info = DaemonErrorCapture.process_exception(
+                        sys.exc_info(),
+                        logger=backfill_logger,
+                        log_message=f"Backfill failed for {backfill.backfill_id} and will retry.",
+                    )
+                    instance.update_backfill(
+                        backfill.with_error(error_info).with_failure_count(
+                            backfill.failure_count + 1
+                        )
+                    )
+            else:
+                error_info = DaemonErrorCapture.process_exception(
+                    sys.exc_info(),
+                    logger=backfill_logger,
+                    log_message=f"Backfill failed for {backfill.backfill_id}",
+                )
+                instance.update_backfill(
+                    backfill.with_status(BulkActionStatus.FAILED)
+                    .with_error(error_info)
+                    .with_failure_count(backfill.failure_count + 1)
+                    .with_end_timestamp(get_current_timestamp())
+                )
+            yield error_info
+
+
def execute_backfill_jobs(
    workspace_process_context: IWorkspaceProcessContext,
    logger: logging.Logger,
@@ -155,100 +228,41 @@ def execute_backfill_jobs(

        # refetch, in case the backfill was updated in the meantime
        backfill = cast(PartitionBackfill, instance.get_backfill(backfill_id))
-        with _get_instigation_logger_if_log_storage_enabled(instance, backfill, logger) as _logger:
-            # create a logger that will always include the backfill_id as an `extra`
-            backfill_logger = cast(
-                logging.Logger,
-                logging.LoggerAdapter(_logger, extra={"backfill_id": backfill.backfill_id}),
-            )

-            try:
-                if threadpool_executor:
-                    if backfill_futures is None:
-                        check.failed(
-                            "backfill_futures dict must be passed with threadpool_executor"
-                        )
-
-                    # only allow one backfill per backfill job to be in flight
-                    if backfill_id in backfill_futures and not backfill_futures[backfill_id].done():
-                        continue
-
-                    if backfill.is_asset_backfill:
-                        future = threadpool_executor.submit(
-                            return_as_list(execute_asset_backfill_iteration),
-                            backfill,
-                            backfill_logger,
-                            workspace_process_context,
-                            instance,
-                        )
-                    else:
-                        future = threadpool_executor.submit(
-                            return_as_list(execute_job_backfill_iteration),
-                            backfill,
-                            backfill_logger,
-                            workspace_process_context,
-                            debug_crash_flags,
-                            instance,
-                        )
-                    backfill_futures[backfill_id] = future
-                    yield
+        try:
+            if threadpool_executor:
+                if backfill_futures is None:
+                    check.failed("backfill_futures dict must be passed with threadpool_executor")
+
+                # only allow one backfill per backfill job to be in flight
+                if backfill_id in backfill_futures and not backfill_futures[backfill_id].done():
+                    continue
+
+                future = threadpool_executor.submit(
+                    return_as_list(execute_backfill_iteration_with_instigation_logger),
+                    backfill,
+                    logger,
+                    workspace_process_context,
+                    instance,
+                    debug_crash_flags,
+                )
+
+                backfill_futures[backfill_id] = future
+                yield
+
+            else:
+                yield from execute_backfill_iteration_with_instigation_logger(
+                    backfill,
+                    logger,
+                    workspace_process_context,
+                    instance,
+                    debug_crash_flags,
+                )

-                else:
-                    if backfill.is_asset_backfill:
-                        yield from execute_asset_backfill_iteration(
-                            backfill, backfill_logger, workspace_process_context, instance
-                        )
-                    else:
-                        yield from execute_job_backfill_iteration(
-                            backfill,
-                            backfill_logger,
-                            workspace_process_context,
-                            debug_crash_flags,
-                            instance,
-                        )
-            except Exception as e:
-                backfill = check.not_none(instance.get_backfill(backfill.backfill_id))
-                if (
-                    backfill.is_asset_backfill
-                    and backfill.status == BulkActionStatus.REQUESTED
-                    and backfill.failure_count < _get_max_asset_backfill_retries()
-                    and _is_retryable_asset_backfill_error(e)
-                ):
-                    if isinstance(
-                        e, (DagsterUserCodeUnreachableError, DagsterCodeLocationLoadError)
-                    ):
-                        try:
-                            raise DagsterUserCodeUnreachableError(
-                                "Unable to reach the code server. Backfill will resume once the code server is available."
-                            ) from e
-                        except:
-                            error_info = DaemonErrorCapture.process_exception(
-                                sys.exc_info(),
-                                logger=backfill_logger,
-                                log_message=f"Backfill failed for {backfill.backfill_id} due to unreachable code server and will retry",
-                            )
-                            instance.update_backfill(backfill.with_error(error_info))
-                    else:
-                        error_info = DaemonErrorCapture.process_exception(
-                            sys.exc_info(),
-                            logger=backfill_logger,
-                            log_message=f"Backfill failed for {backfill.backfill_id} and will retry.",
-                        )
-                        instance.update_backfill(
-                            backfill.with_error(error_info).with_failure_count(
-                                backfill.failure_count + 1
-                            )
-                        )
-                else:
-                    error_info = DaemonErrorCapture.process_exception(
-                        sys.exc_info(),
-                        logger=backfill_logger,
-                        log_message=f"Backfill failed for {backfill.backfill_id}",
-                    )
-                    instance.update_backfill(
-                        backfill.with_status(BulkActionStatus.FAILED)
-                        .with_error(error_info)
-                        .with_failure_count(backfill.failure_count + 1)
-                        .with_end_timestamp(get_current_timestamp())
-                    )
-                yield error_info
+        except Exception:
+            error_info = DaemonErrorCapture.process_exception(
+                exc_info=sys.exc_info(),
+                logger=logger,
+                log_message=f"BackfillDaemon caught an error for backfill {backfill_id}",
+            )
+            yield error_info
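
A minimal, self-contained sketch of the submit pattern used in the threadpool branch above. It only illustrates why a generator function is wrapped before being handed to the executor: submitting the generator function directly would just create a generator object without running it. The `return_as_list` defined here is a hypothetical stand-in for Dagster's helper of the same name, and `fake_backfill_iteration` is an invented generator, not the daemon's real iteration.

# Sketch under assumptions: stand-in return_as_list and a fake iteration generator.
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable, List, TypeVar

T = TypeVar("T")


def return_as_list(gen_fn: Callable[..., Iterable[T]]) -> Callable[..., List[T]]:
    # stand-in wrapper: run the generator to completion inside the worker thread
    # and return everything it yielded as a list
    def inner(*args, **kwargs) -> List[T]:
        return list(gen_fn(*args, **kwargs))

    return inner


def fake_backfill_iteration(backfill_id: str) -> Iterable[str]:
    # invented generator standing in for the extracted backfill iteration
    yield f"{backfill_id}: submitted runs"
    yield f"{backfill_id}: checked run completion"


with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(return_as_list(fake_backfill_iteration), "my_backfill")
    print(future.result())
    # ['my_backfill: submitted runs', 'my_backfill: checked run completion']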