|
1 | 1 | import json |
2 | 2 | import textwrap |
3 | | -import os |
4 | 3 | from typing import Any, Mapping, Union, Literal, Optional |
5 | 4 | import dagster as dg |
6 | 5 | from hooli_data_eng.utils import get_env |
|
17 | 16 | from datetime import datetime |
18 | 17 | from hooli_data_eng.defs.dbt.dbt_code_version import get_current_dbt_code_version |
19 | 18 | from hooli_data_eng.defs.dbt.resources import resource_def |
20 | | -from github import Github |
21 | 19 |
|
22 | 20 | from datetime import timedelta |
23 | 21 | from dagster.preview.freshness import apply_freshness_policy |
@@ -242,7 +240,6 @@ def _dbt_asset(context: dg.AssetExecutionContext, dbt: DbtCliResource): |
242 | 240 | asset_checks=checks, |
243 | 241 | sensors=sensors, |
244 | 242 | resources=resource_def[get_env()], |
245 | | - jobs=[get_slim_ci_job()], |
246 | 243 | ) |
247 | 244 |
|
248 | 245 | defs = defs.map_resolved_asset_specs( |
@@ -297,116 +294,3 @@ def dbt_code_version_sensor(context: dg.SensorEvaluationContext): |
297 | 294 | return dbt_code_version_sensor |
298 | 295 |
|
299 | 296 |
|
300 | | -def get_slim_ci_job(): |
301 | | - # This op will be used to run slim CI |
302 | | - @dg.op(out={}) |
303 | | - def dbt_slim_ci(context: dg.OpExecutionContext, dbt: DbtCliResource): |
304 | | - dbt_command = [ |
305 | | - "build", |
306 | | - "--select", |
307 | | - "state:modified.body+", |
308 | | - "--defer", |
309 | | - "--state", |
310 | | - dbt.state_path, |
311 | | - ] |
312 | | - |
313 | | - dbt_cli_task = dbt.cli( |
314 | | - args=dbt_command, |
315 | | - manifest=dbt_project.manifest_path, |
316 | | - dagster_dbt_translator=get_hooli_translator(), |
317 | | - ) |
318 | | - |
319 | | - # Collect results and track successful models |
320 | | - successful_models = [] |
321 | | - |
322 | | - for event in dbt_cli_task.stream().fetch_row_counts().fetch_column_metadata(): |
323 | | - yield event |
324 | | - |
325 | | - # Get run results to identify successful models |
326 | | - try: |
327 | | - run_results_json = dbt_cli_task.get_artifact("run_results.json") |
328 | | - for result in run_results_json.get("results", []): |
329 | | - if result.get("status") == "success": |
330 | | - model_name = result.get("unique_id", "").split(".")[-1] |
331 | | - if model_name: |
332 | | - successful_models.append(model_name) |
333 | | - |
334 | | - context.log.info(f"Successfully ran {len(successful_models)} dbt models: {successful_models}") |
335 | | - |
336 | | - # Post GitHub comment if in CI environment |
337 | | - _post_github_comment(context, successful_models) |
338 | | - |
339 | | - except Exception as e: |
340 | | - context.log.warning(f"Could not parse run results or post GitHub comment: {e}") |
341 | | - |
342 | | - def _post_github_comment(context: dg.OpExecutionContext, successful_models: list): |
343 | | - """Post a comment to the GitHub PR about successful dbt models""" |
344 | | - # Only post comments in CI/branch environment |
345 | | - if get_env() != "BRANCH": |
346 | | - context.log.info("Not in branch environment, skipping GitHub comment") |
347 | | - return |
348 | | - |
349 | | - # Get GitHub token and repo info from environment |
350 | | - github_token = os.getenv("GITHUB_TOKEN") |
351 | | - github_repo = os.getenv("GITHUB_REPOSITORY") # format: owner/repo - available by default |
352 | | - |
353 | | - # Get PR number from GitHub event (available in pull_request events) |
354 | | - github_event_path = os.getenv("GITHUB_EVENT_PATH") |
355 | | - github_pr_number = None |
356 | | - |
357 | | - if github_event_path: |
358 | | - try: |
359 | | - with open(github_event_path, 'r') as f: |
360 | | - event_data = json.load(f) |
361 | | - github_pr_number = event_data.get("number") or event_data.get("pull_request", {}).get("number") |
362 | | - except Exception as e: |
363 | | - context.log.warning(f"Could not parse GitHub event data: {e}") |
364 | | - |
365 | | - # Fallback to GITHUB_REF for PR number extraction |
366 | | - if not github_pr_number: |
367 | | - github_ref = os.getenv("GITHUB_REF") # format: refs/pull/{pr_number}/merge |
368 | | - if github_ref and github_ref.startswith("refs/pull/") and github_ref.endswith("/merge"): |
369 | | - try: |
370 | | - github_pr_number = int(github_ref.split("/")[2]) |
371 | | - except (IndexError, ValueError): |
372 | | - pass |
373 | | - |
374 | | - if not all([github_token, github_repo, github_pr_number]): |
375 | | - context.log.warning( |
376 | | - f"Missing required GitHub information. " |
377 | | - f"Token: {'✓' if github_token else '✗'}, " |
378 | | - f"Repo: {'✓' if github_repo else '✗'}, " |
379 | | - f"PR: {'✓' if github_pr_number else '✗'}. " |
380 | | - "Cannot post comment." |
381 | | - ) |
382 | | - return |
383 | | - |
384 | | - try: |
385 | | - g = Github(github_token) |
386 | | - repo = g.get_repo(github_repo) |
387 | | - pr = repo.get_pull(int(github_pr_number)) |
388 | | - |
389 | | - if not successful_models: |
390 | | - comment_body = "✅ **dbt Slim CI Results**\n\nNo dbt models were modified or ran successfully in this PR." |
391 | | - else: |
392 | | - model_list = "\n".join([f"- `{model}`" for model in sorted(successful_models)]) |
393 | | - comment_body = f"""✅ **dbt Slim CI Results** |
394 | | -
|
395 | | -Successfully ran **{len(successful_models)}** dbt model(s): |
396 | | -
|
397 | | -{model_list} |
398 | | -
|
399 | | -All models passed validation and tests!""" |
400 | | - |
401 | | - pr.create_issue_comment(comment_body) |
402 | | - context.log.info(f"Posted GitHub comment to PR #{github_pr_number}") |
403 | | - |
404 | | - except Exception as e: |
405 | | - context.log.error(f"Failed to post GitHub comment: {e}") |
406 | | - |
407 | | - # This job will be triggered by Pull Request and should only run new or changed dbt models |
408 | | - @dg.job(name="dbt_slim_ci_with_github_job") |
409 | | - def dbt_slim_ci_with_github_job(): |
410 | | - dbt_slim_ci() |
411 | | - |
412 | | - return dbt_slim_ci_with_github_job |
0 commit comments