@@ -313,10 +313,8 @@ def emit(
313313
314314 # Incremental: skip if already processed (cache key exists)
315315 cache_key = self ._make_emit_cache_key (data )
316- if self .incremental :
317- if cache_key and self .check_tag (cache_key ):
318- self .log .info ("Skipping emit (incremental)" , cache_key = cache_key )
319- return
316+ if self .should_skip_incremental (cache_key ):
317+ return
320318 # Store cache key in data for marking complete at store stage
321319 data ["_emit_cache_key" ] = cache_key
322320
@@ -358,6 +356,14 @@ def mark_emit_complete(self, data: dict[str, Any]) -> None:
358356 self .set_tag (cache_key , datetime .now ())
359357 self .log .debug ("Marked emit complete" , cache_key = cache_key )
360358
359+ def should_skip_incremental (self , cache_key : str | None ) -> bool :
360+ """Test if an emit_cache_key already exists"""
361+ if self .incremental :
362+ if cache_key and self .check_tag (cache_key ):
363+ self .log .info ("Skipping (incremental)" , cache_key = cache_key )
364+ return True
365+ return False
366+
361367 def recurse (
362368 self , data : dict [str , Any ] | None = None , delay : int | None = None
363369 ) -> None :
@@ -369,6 +375,10 @@ def recurse(
369375 def execute (self , data : dict [str , Any ]) -> Any :
370376 """Execute the stage method with the given data."""
371377 try :
378+ # maybe someone else was faster:
379+ cache_key = self ._make_emit_cache_key (data )
380+ if self .should_skip_incremental (cache_key ):
381+ return
372382 self .log .info (
373383 "Executing stage" ,
374384 method = self .stage .method_name ,
0 commit comments