1717from zephyr .execution import ZephyrContext
1818from zephyr .writers import write_parquet_file
1919
20- from .conftest import CallCounter
21-
2220
2321@pytest .fixture
2422def sample_data ():
@@ -191,27 +189,11 @@ def test_chaining_operations(zephyr_ctx):
191189
192190
193191def test_lazy_evaluation ():
194- """Test that operations are lazy until backend executes."""
195- call_count = 0
196-
197- def counting_fn (x ):
198- nonlocal call_count
199- call_count += 1
200- return x * 2
201-
202- # Create dataset with map - should not execute yet
203- ds = Dataset .from_list ([1 , 2 , 3 ]).map (counting_fn )
204- assert call_count == 0
205-
206- # Now execute - should call function
207- client = LocalClient ()
208- ctx = ZephyrContext (client = client , max_workers = 1 , resources = ResourceConfig (cpu = 1 , ram = "512m" ), name = "test-dataset" )
209- try :
210- result = list (ctx .execute (ds ))
211- assert result == [2 , 4 , 6 ]
212- assert call_count == 3
213- finally :
214- ctx .shutdown ()
192+ """Test that dataset construction does not execute operations eagerly."""
193+ sentinel = []
194+ _ = Dataset .from_list ([1 , 2 , 3 ]).map (lambda x : sentinel .append (x ) or x * 2 )
195+ # Pipeline was built but nothing executed yet
196+ assert sentinel == []
215197
216198
217199def test_empty_dataset (zephyr_ctx ):
@@ -992,21 +974,20 @@ def test_skip_existing_clean_run(tmp_path, sample_input_files):
992974 output_dir = tmp_path / "output"
993975 output_dir .mkdir ()
994976
995- counter = CallCounter ()
996977 ds = (
997978 Dataset .from_files (f"{ sample_input_files } /*.jsonl" )
998- .flat_map (lambda x : counter . counting_flat_map ( x ) )
999- .map (lambda x : counter . counting_map ( x ) )
979+ .flat_map (load_file )
980+ .map (lambda x : { ** x , "processed" : True } )
1000981 .write_jsonl (str (output_dir / "output-{shard:05d}.jsonl" ), skip_existing = True )
1001982 )
1002983
1003984 try :
1004985 result = list (ctx .execute (ds ))
1005986 assert len (result ) == 3
1006987 assert all (Path (p ).exists () for p in result )
1007- assert counter . flat_map_count == 3 # All files loaded
1008- assert counter . map_count == 3 # All items mapped
1009- assert sorted ( counter . processed_ids ) == [ 0 , 1 , 2 ] # All shards ran
988+ for p in result :
989+ records = [ json . loads ( line ) for line in Path ( p ). read_text (). strip (). splitlines ()]
990+ assert all ( r . get ( "processed" ) for r in records )
1010991 finally :
1011992 ctx .shutdown ()
1012993
@@ -1018,25 +999,28 @@ def test_skip_existing_one_file_exists(tmp_path, sample_input_files):
1018999 output_dir = tmp_path / "output"
10191000 output_dir .mkdir ()
10201001
1021- # Manually create one output file (shard 1)
1002+ # Manually create one output file (shard 1) — no "processed" flag
10221003 with open (output_dir / "output-00001.jsonl" , "w" ) as f :
1023- f .write ('{"id": 1, "processed": true}\n' )
1004+ f .write ('{"id": 1, "skipped": true}\n' )
10241005
1025- counter = CallCounter ()
10261006 ds = (
10271007 Dataset .from_files (f"{ sample_input_files } /*.jsonl" )
1028- .flat_map (lambda x : counter . counting_flat_map ( x ) )
1029- .map (lambda x : counter . counting_map ( x ) )
1008+ .flat_map (load_file )
1009+ .map (lambda x : { ** x , "processed" : True } )
10301010 .write_jsonl (str (output_dir / "output-{shard:05d}.jsonl" ), skip_existing = True )
10311011 )
10321012
10331013 try :
10341014 result = list (ctx .execute (ds ))
10351015 assert len (result ) == 3
10361016 assert all (Path (p ).exists () for p in result )
1037- assert counter .flat_map_count == 2 # Only 2 files loaded (shard 1 skipped)
1038- assert counter .map_count == 2 # Only 2 items mapped
1039- assert sorted (counter .processed_ids ) == [0 , 2 ] # Only shards 0 and 2 ran
1017+ # Shard 1 was skipped — its file still has the pre-existing content
1018+ shard1 = [json .loads (line ) for line in (output_dir / "output-00001.jsonl" ).read_text ().strip ().splitlines ()]
1019+ assert shard1 == [{"id" : 1 , "skipped" : True }]
1020+ # Shards 0 and 2 ran — they have "processed" flag
1021+ for shard_file in ["output-00000.jsonl" , "output-00002.jsonl" ]:
1022+ records = [json .loads (line ) for line in (output_dir / shard_file ).read_text ().strip ().splitlines ()]
1023+ assert all (r .get ("processed" ) for r in records )
10401024 finally :
10411025 ctx .shutdown ()
10421026
@@ -1048,36 +1032,38 @@ def test_skip_existing_all_files_exist(tmp_path, sample_input_files):
10481032 output_dir = tmp_path / "output"
10491033 output_dir .mkdir ()
10501034
1051- counter = CallCounter ()
10521035 ds = (
10531036 Dataset .from_files (f"{ sample_input_files } /*.jsonl" )
1054- .flat_map (lambda x : counter . counting_flat_map ( x ) )
1055- .map (lambda x : counter . counting_map ( x ) )
1037+ .flat_map (load_file )
1038+ .map (lambda x : { ** x , "processed" : True } )
10561039 .write_jsonl (str (output_dir / "output-{shard:05d}.jsonl" ), skip_existing = True )
10571040 )
10581041
10591042 try :
10601043 # First run: create all output files
10611044 result = list (ctx .execute (ds ))
10621045 assert len (result ) == 3
1063- assert counter .flat_map_count == 3
1064- assert counter .map_count == 3
1065- assert sorted (counter .processed_ids ) == [0 , 1 , 2 ] # All shards ran
1046+ assert all (Path (p ).exists () for p in result )
1047+ for p in result :
1048+ records = [json .loads (line ) for line in Path (p ).read_text ().strip ().splitlines ()]
1049+ assert all (r .get ("processed" ) for r in records )
10661050
1067- # Second run: all files exist, nothing should process
1068- counter .reset ()
1069- ds = (
1051+ # Snapshot file contents
1052+ contents = {p : Path (p ).read_text () for p in result }
1053+
1054+ # Second run: all files exist, nothing should be rewritten
1055+ ds2 = (
10701056 Dataset .from_files (f"{ sample_input_files } /*.jsonl" )
1071- .flat_map (counter . counting_flat_map )
1072- .map (counter . counting_map )
1057+ .flat_map (load_file )
1058+ .map (lambda x : { ** x , "rerun" : True } )
10731059 .write_jsonl (str (output_dir / "output-{shard:05d}.jsonl" ), skip_existing = True )
10741060 )
10751061
1076- result = list (ctx .execute (ds ))
1077- assert len (result ) == 3
1078- assert counter . flat_map_count == 0 # Nothing loaded
1079- assert counter . map_count == 0 # Nothing mapped
1080- assert counter . processed_ids == [] # No shards ran
1062+ result2 = list (ctx .execute (ds2 ))
1063+ assert len (result2 ) == 3
1064+ # Files should be untouched — still have "processed", not "rerun"
1065+ for p in result2 :
1066+ assert Path ( p ). read_text () == contents [ p ]
10811067 finally :
10821068 ctx .shutdown ()
10831069
0 commit comments