@@ -166,5 +166,123 @@ def validate_events(events_file, spec_dir):
166166 exit (1 )
167167
168168
169+ @cli .command ()
170+ @click .option ('--scenario' , required = True , help = 'Scenario name to run' )
171+ @click .option ('--output-dir' , required = True , help = 'Output directory for events' )
172+ def run_scenario (scenario , output_dir ):
173+ """Run a specific scenario for CI/CD workflow using dbt-ol wrapper"""
174+ import subprocess
175+ import os
176+
177+ click .echo (f"🚀 Running scenario: { scenario } " )
178+ click .echo (f"📁 Output directory: { output_dir } \n " )
179+
180+ # Validate scenario exists
181+ scenario_path = Path (__file__ ).parent .parent / "scenarios" / scenario
182+ if not scenario_path .exists ():
183+ click .echo (f"❌ Scenario not found: { scenario } " )
184+ exit (1 )
185+
186+ # Ensure output directory exists
187+ output_path = Path (output_dir )
188+ output_path .mkdir (parents = True , exist_ok = True )
189+
190+ # Path to runner directory
191+ runner_dir = Path (__file__ ).parent .parent / "runner"
192+
193+ # Create scenario-specific output directory
194+ scenario_output_dir = output_path / scenario
195+ scenario_output_dir .mkdir (parents = True , exist_ok = True )
196+
197+ # Temporary events file for this run
198+ temp_events_file = scenario_output_dir / "openlineage_events.jsonl"
199+
200+ # Backup and modify openlineage.yml
201+ openlineage_config = runner_dir / "openlineage.yml"
202+ openlineage_backup = runner_dir / "openlineage.yml.backup"
203+
204+ import shutil
205+ import yaml
206+
207+ try :
208+ # Backup original config
209+ if openlineage_config .exists ():
210+ shutil .copy (openlineage_config , openlineage_backup )
211+
212+ # Update config to write to our output directory
213+ config = {
214+ 'transport' : {
215+ 'type' : 'file' ,
216+ 'log_file_path' : str (temp_events_file .absolute ()),
217+ 'append' : False
218+ }
219+ }
220+
221+ with open (openlineage_config , 'w' ) as f :
222+ yaml .dump (config , f )
223+
224+ click .echo ("📝 Updated OpenLineage configuration" )
225+
226+ # Run dbt-ol commands (wrapper that emits OpenLineage events)
227+ click .echo ("🔨 Running dbt-ol seed..." )
228+ result = subprocess .run (
229+ ['dbt-ol' , 'seed' , '--project-dir' , str (runner_dir ), '--profiles-dir' , str (runner_dir ),
230+ '--vars' , f'scenario: { scenario } ' , '--no-version-check' ],
231+ cwd = runner_dir ,
232+ check = True
233+ )
234+
235+ click .echo ("🔨 Running dbt-ol run..." )
236+ subprocess .run (
237+ ['dbt-ol' , 'run' , '--project-dir' , str (runner_dir ), '--profiles-dir' , str (runner_dir ),
238+ '--vars' , f'scenario: { scenario } ' , '--no-version-check' ],
239+ cwd = runner_dir ,
240+ check = True
241+ )
242+
243+ click .echo ("🔨 Running dbt-ol test..." )
244+ result = subprocess .run (
245+ ['dbt-ol' , 'test' , '--project-dir' , str (runner_dir ), '--profiles-dir' , str (runner_dir ),
246+ '--vars' , f'scenario: { scenario } ' , '--no-version-check' ],
247+ cwd = runner_dir
248+ )
249+ if result .returncode != 0 :
250+ click .echo ("⚠️ dbt test had failures (continuing to capture events)" )
251+
252+ # The file transport creates individual JSON files with timestamps
253+ # Find and rename them to sequential format
254+ import glob
255+ event_files = sorted (glob .glob (str (scenario_output_dir / "openlineage_events.jsonl-*.json" )))
256+
257+ if event_files :
258+ click .echo (f"📋 Generated { len (event_files )} OpenLineage events" )
259+
260+ # Rename to sequential format
261+ for i , event_file in enumerate (event_files , 1 ):
262+ old_path = Path (event_file )
263+ new_path = scenario_output_dir / f"event_{ i :03d} .json"
264+ old_path .rename (new_path )
265+
266+ click .echo (f"✅ Events written to { scenario_output_dir } " )
267+ else :
268+ click .echo (f"⚠️ No events generated in { scenario_output_dir } " )
269+
270+ exit (0 )
271+
272+ except subprocess .CalledProcessError as e :
273+ click .echo (f"❌ dbt command failed: { e } " )
274+ if e .output :
275+ click .echo (f" Output: { e .output .decode ()} " )
276+ exit (1 )
277+ except Exception as e :
278+ click .echo (f"❌ Error running scenario: { e } " )
279+ exit (1 )
280+ finally :
281+ # Restore original config
282+ if openlineage_backup .exists ():
283+ shutil .move (openlineage_backup , openlineage_config )
284+ click .echo ("🔄 Restored original OpenLineage configuration" )
285+
286+
169287if __name__ == '__main__' :
170288 cli ()
0 commit comments