Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 83 additions & 70 deletions .github/workflows/10_feature_dbt_checks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,13 @@ jobs:
# Full refresh control variables
FULL_REFRESH_FLAG: ${{ contains(github.event.pull_request.labels.*.name, 'full-refresh') && '--full-refresh' || '' }}

# Needed for dbt-api
DATACOVES__API_ENDPOINT: ${{ vars.DATACOVES__API_ENDPOINT }}
DATACOVES__API_TOKEN: ${{ secrets.DATACOVES__API_TOKEN }}
DATACOVES__ACCOUNT_ID: ${{ vars.DATACOVES__ACCOUNT_ID }}
DATACOVES__PROJECT_SLUG: ${{ vars.DATACOVES__PROJECT_SLUG }}
DATACOVES__ENVIRONMENT_SLUG: ${{ vars.DATACOVES__ENVIRONMENT_SLUG }}

steps:
- name: Checkout branch
uses: actions/[email protected]
Expand All @@ -89,77 +96,83 @@ jobs:
run: "git diff origin/${{ github.event.pull_request.base.ref }} HEAD --name-status"

- name: Install dbt packages
run: "dbt deps"

- name: Create PR database
run: dbt --no-write-json run-operation create_database

- name: Get prod manifest
id: prod_manifest
run: "../automate/dbt/get_artifacts.sh"

- name: Clone incremental models that are directly or indirectly affected by the change
if: ${{ steps.prod_manifest.outputs.manifest_found == 'true' }}
run: |
dbt clone -s state:modified+,config.materialized:incremental,state:old --state logs
dbt clone -s state:modified+,config.materialized:snapshot,state:old --state logs

##### Governance Checks
# this first runs dbt but creates empty tables, this is enough to then run the hooks and fail fast

# We need to run observe model so that post hook works
- name: Run Observe Model
run: "dbt build --fail-fast -s L1_inlets.observe"

# There is an issue with --empty and dynamic tables so need to run them by themselves
- name: Governance run of dynamic tables
if: ${{ steps.prod_manifest.outputs.manifest_found == 'false' }}
run: "dbt build --fail-fast -s config.materialized:dynamic_table stg_test_failures resource_type:seed"

# There is an issue with --empty and dynamic tables so need to run them by themselves
- name: Governance run of dynamic tables
if: ${{ steps.prod_manifest.outputs.manifest_found == 'true' }}
run: "dbt build --fail-fast -s config.materialized:dynamic_table stg_test_failures --defer --state logs"

# There is an issue with --empty and dynamic tables so need to exclude them
- name: Governance run of dbt with EMPTY models using slim mode
if: ${{ steps.prod_manifest.outputs.manifest_found == 'true' }}
run: "dbt build --fail-fast --defer --state logs --select state:modified+ --empty --exclude config.materialized:dynamic_table ${{ env.FULL_REFRESH_FLAG }}"

# There is an issue with --empty and dynamic tables so need to exclude
- name: Governance run of dbt with EMPTY models using full run
if: ${{ steps.prod_manifest.outputs.manifest_found == 'false' }}
run: "dbt build --fail-fast --empty --exclude config.materialized:dynamic_table ${{ env.FULL_REFRESH_FLAG }}"

- name: Generate Docs Combining Prod and branch catalog.json
if: ${{ steps.prod_manifest.outputs.catalog_found == 'true' }}
run: "dbt-coves generate docs --merge-deferred --state logs"

- name: Generate dbt Docs
if: ${{ steps.prod_manifest.outputs.catalog_found == 'false' }}
run: "dbt docs generate"

- name: Run governance checks
run: "pre-commit run --from-ref origin/${{ github.event.pull_request.base.ref }} --to-ref HEAD"

##### Real dbt run given that we passed governance checks
- name: Run dbt build slim mode
if: ${{ steps.prod_manifest.outputs.manifest_found == 'true' }}
run: "dbt build --fail-fast --defer --state logs --select state:modified+ ${{ env.FULL_REFRESH_FLAG }}"

- name: Run dbt build full run
if: ${{ steps.prod_manifest.outputs.manifest_found == 'false' }}
run: "dbt build --fail-fast ${{ env.FULL_REFRESH_FLAG }}"

- name: Grant access to PR database
id: grant-access-to-database
run: "dbt --no-write-json run-operation grant_access_to_pr_database"

# We drop the database when there is a failure to grant access to the db because
# most likely the schema was not set properly in dbt_project.yml so models built to default schema
- name: Drop PR database on Failure to grant security access
if: always() && (env.DATACOVES__DROP_DB_ON_FAIL == 'true') && (steps.grant-access-to-database.outcome == 'failure')
run: "dbt --no-write-json run-operation drop_recreate_db --args '{db_name: ${{env.DATACOVES__MAIN__DATABASE}}, recreate: False}'" # yamllint disable-line rule:line-length
dbt deps
dbt compile
../automate/dbt/push_dbt_artifacts.py

# - name: Install dbt packages
# run: "dbt deps"

# - name: Create PR database
# run: dbt --no-write-json run-operation create_database

# - name: Get prod manifest
# id: prod_manifest
# run: "../automate/dbt/get_artifacts.sh"

# - name: Clone incremental models that are directly or indirectly affected by the change
# if: ${{ steps.prod_manifest.outputs.manifest_found == 'true' }}
# run: |
# dbt clone -s state:modified+,config.materialized:incremental,state:old --state logs
# dbt clone -s state:modified+,config.materialized:snapshot,state:old --state logs

# ##### Governance Checks
# # this first runs dbt but creates empty tables, this is enough to then run the hooks and fail fast

# # We need to run observe model so that post hook works
# - name: Run Observe Model
# run: "dbt build --fail-fast -s L1_inlets.observe"

# # There is an issue with --empty and dynamic tables so need to run them by themselves
# - name: Governance run of dynamic tables
# if: ${{ steps.prod_manifest.outputs.manifest_found == 'false' }}
# run: "dbt build --fail-fast -s config.materialized:dynamic_table stg_test_failures resource_type:seed"

# # There is an issue with --empty and dynamic tables so need to run them by themselves
# - name: Governance run of dynamic tables
# if: ${{ steps.prod_manifest.outputs.manifest_found == 'true' }}
# run: "dbt build --fail-fast -s config.materialized:dynamic_table stg_test_failures --defer --state logs"

# # There is an issue with --empty and dynamic tables so need to exclude them
# - name: Governance run of dbt with EMPTY models using slim mode
# if: ${{ steps.prod_manifest.outputs.manifest_found == 'true' }}
# run: "dbt build --fail-fast --defer --state logs --select state:modified+ --empty --exclude config.materialized:dynamic_table ${{ env.FULL_REFRESH_FLAG }}"

# # There is an issue with --empty and dynamic tables so need to exclude
# - name: Governance run of dbt with EMPTY models using full run
# if: ${{ steps.prod_manifest.outputs.manifest_found == 'false' }}
# run: "dbt build --fail-fast --empty --exclude config.materialized:dynamic_table ${{ env.FULL_REFRESH_FLAG }}"

# - name: Generate Docs Combining Prod and branch catalog.json
# if: ${{ steps.prod_manifest.outputs.catalog_found == 'true' }}
# run: "dbt-coves generate docs --merge-deferred --state logs"

# - name: Generate dbt Docs
# if: ${{ steps.prod_manifest.outputs.catalog_found == 'false' }}
# run: "dbt docs generate"

# - name: Run governance checks
# run: "pre-commit run --from-ref origin/${{ github.event.pull_request.base.ref }} --to-ref HEAD"

# ##### Real dbt run given that we passed governance checks
# - name: Run dbt build slim mode
# if: ${{ steps.prod_manifest.outputs.manifest_found == 'true' }}
# run: "dbt build --fail-fast --defer --state logs --select state:modified+ ${{ env.FULL_REFRESH_FLAG }}"

# - name: Run dbt build full run
# if: ${{ steps.prod_manifest.outputs.manifest_found == 'false' }}
# run: "dbt build --fail-fast ${{ env.FULL_REFRESH_FLAG }}"

# - name: Grant access to PR database
# id: grant-access-to-database
# run: "dbt --no-write-json run-operation grant_access_to_pr_database"

# # We drop the database when there is a failure to grant access to the db because
# # most likely the schema was not set properly in dbt_project.yml so models built to default schema
# - name: Drop PR database on Failure to grant security access
# if: always() && (env.DATACOVES__DROP_DB_ON_FAIL == 'true') && (steps.grant-access-to-database.outcome == 'failure')
# run: "dbt --no-write-json run-operation drop_recreate_db --args '{db_name: ${{env.DATACOVES__MAIN__DATABASE}}, recreate: False}'" # yamllint disable-line rule:line-length

dbt-job-status:
runs-on: ubuntu-latest
Expand Down
175 changes: 175 additions & 0 deletions automate/dbt/push_dbt_artifacts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
#!/usr/bin/env -S uv run
# /// script
# dependencies = [
# "requests",
# "python-dotenv",
# "rich",
# ]
# ///

import requests
import os
from dotenv import load_dotenv
import json
from rich import print_json
from rich.console import Console
from rich.table import Table


# load_dotenv() runs before the reads below so they see anything it adds
# to the process environment.
load_dotenv()

# API location and credentials. Each value is None when its variable is unset.
base_url = os.environ.get("DATACOVES__API_ENDPOINT")
token = os.environ.get("DATACOVES__API_TOKEN")

# Account / project / environment identifiers interpolated into the API paths.
account_id = os.environ.get("DATACOVES__ACCOUNT_ID")
project_slug = os.environ.get("DATACOVES__PROJECT_SLUG")
environment_slug = os.environ.get("DATACOVES__ENVIRONMENT_SLUG")

# Directory whose target/ sub-directory holds the files that get uploaded.
dbt_home = os.environ.get("DATACOVES__DBT_HOME")


#######################################
# Utility for api interactions
#######################################
def print_responce(r):
print("STATUS:", r.status_code)

response_text = r.text

try:
parsed_json = json.loads(response_text)
print_json(data=parsed_json)
except json.JSONDecodeError:
print("RESPONSE:", response_text)

print("-----------------------")

def print_table(items, keys_to_show, title="Items"):
    """Print a table showing only the requested keys of each dictionary.

    Args:
        items: Iterable of dictionaries, one per table row.
        keys_to_show: Keys to display, one column per key, in this order.
        title: Title passed to the table.
    """
    console = Console()
    table = Table(title=title)

    # One style per column; the palette wraps around when there are more
    # columns than colors.
    palette = [
        "blue", "bright_green", "yellow", "green", "cyan",
        "magenta", "red", "bright_cyan", "bright_magenta", "bright_yellow",
    ]

    # Column headers are the key names with underscores turned into spaces
    # and title-cased.
    for position, key in enumerate(keys_to_show):
        heading = key.replace('_', ' ').title()
        table.add_column(heading, style=palette[position % len(palette)])

    # A key missing from an item is shown as "N/A"; every value is stringified.
    for item in items:
        table.add_row(*[str(item.get(key, "N/A")) for key in keys_to_show])

    console.print(table)

def get_endpoint(endpoint: str) -> str:
    """Build the absolute URL for an API path.

    The module-level ``base_url`` and ``endpoint`` are joined with exactly
    one slash, so the path may be given with or without a leading slash and
    the base URL with or without a trailing slash.

    Args:
        endpoint: API path, e.g. ``"api/v3/healthcheck"``.

    Returns:
        The full URL.

    Raises:
        RuntimeError: If ``base_url`` is empty or unset, which would
            otherwise produce a URL such as ``"None/api/..."``.
    """
    if not base_url:
        raise RuntimeError("DATACOVES__API_ENDPOINT is not set")

    # Strip the separators on both sides so "base/" + "/path" does not become
    # "base//path".
    return f"{base_url.rstrip('/')}/{endpoint.lstrip('/')}"

def get_headers() -> dict:
    """Return the headers sent with every API request.

    Returns:
        A dict asking for a JSON response and carrying the module-level
        ``token`` as a bearer credential.
    """
    headers = {"Accept": "application/json"}
    headers["Authorization"] = f"Bearer {token}"
    return headers

#######################################
# Get information
#######################################

def health_check():
    """Call the API healthcheck endpoint and print the response."""
    print("Checking Health of api")

    r = requests.get(
        # No leading slash, matching the other callers: get_endpoint() adds
        # the separator itself, so "/api/..." produced "<base>//api/...".
        url=get_endpoint(endpoint="api/v3/healthcheck"),
        headers=get_headers(),
        # requests waits indefinitely without a timeout; bound it so a stalled
        # server cannot hang the job.
        timeout=30,
    )

    print_responce(r)

#######################################
# Working with files
#######################################

def list_project_files(account_id: int, project_slug: str):
    """Fetch the files stored at project level.

    Args:
        account_id: Account identifier, interpolated into the URL as-is.
        project_slug: Slug of the project whose files are listed.

    Returns:
        The value under the ``"data"`` key of the JSON response, or an empty
        dict when that key is absent.
    """
    print(f"Listing files for project: {project_slug}")

    r = requests.get(
        url=get_endpoint(endpoint=f"api/v3/accounts/{account_id}/projects/{project_slug}/files"),
        headers=get_headers(),
        # requests waits indefinitely without a timeout; bound it so a stalled
        # server cannot hang the job.
        timeout=30,
    )

    return r.json().get("data", {})

def upload_env_file(account_id: int, project_slug: str, env_slug: str,
                    filename: str, is_manifest: bool = False,
                    dag_id: str = None, run_id: str = None, use_multipart: bool = False):
    """Upload one file from ``<dbt_home>/target`` to an environment.

    Args:
        account_id: Account identifier, interpolated into the URL as-is.
        project_slug: Slug of the project that owns the environment.
        env_slug: Slug of the environment receiving the file.
        filename: Name of the file inside ``<dbt_home>/target``; also sent
            as the ``filename`` form field.
        is_manifest: Sent as the lowercase string ``"true"``/``"false"`` in
            the ``is_manifest`` form field.
        dag_id: Currently unused; kept so existing callers keep working.
        run_id: Currently unused; kept so existing callers keep working.
        use_multipart: Currently unused; kept so existing callers keep working.

    Raises:
        OSError: If the file cannot be opened.
    """
    print(f"Uploading file {filename} to project: {project_slug} in environment: {env_slug}")

    data = {
        'filename': filename,
        'is_manifest': str(is_manifest).lower()
    }

    # Open the file in a context manager so the handle is closed once the
    # request finishes, including when the request raises.
    with open(f"{dbt_home}/target/{filename}", "rb") as handle:
        r = requests.post(
            url=get_endpoint(endpoint=f"api/v3/accounts/{account_id}/projects/{project_slug}/environments/{env_slug}/files"),
            headers=get_headers(),
            files={"file": (filename, handle)},
            data=data,
            # requests waits indefinitely without a timeout; uploads get a
            # longer allowance than the plain GET calls.
            timeout=120,
        )

    print_responce(r)

def promote_env_file(account_id: int, project_slug: str, env_slug: str,
                     filename: str):
    """Promote an environment-level file to project level and print the response.

    Args:
        account_id: Account identifier, interpolated into the URL as-is.
        project_slug: Slug of the project that owns the environment.
        env_slug: Slug of the environment currently holding the file.
        filename: Name of the file to promote.
    """
    print(f"Promoting file {filename} in environment: {env_slug} to project level ({project_slug})")

    r = requests.post(
        url=get_endpoint(endpoint=f"api/v3/accounts/{account_id}/projects/{project_slug}/environments/{env_slug}/files/{filename}/promote"),
        headers=get_headers(),
        # requests waits indefinitely without a timeout; bound it so a stalled
        # server cannot hang the job.
        timeout=30,
    )

    print_responce(r)

def delete_project_file(account_id: int, project_slug: str, filename: str):
    """Delete a project-level file and print the response.

    Args:
        account_id: Account identifier, interpolated into the URL as-is.
        project_slug: Slug of the project that holds the file.
        filename: Name of the file to delete.
    """
    print(f"Deleting file {filename} from project: {project_slug}")

    r = requests.delete(
        url=get_endpoint(endpoint=f"api/v3/accounts/{account_id}/projects/{project_slug}/files/{filename}"),
        headers=get_headers(),
        # requests waits indefinitely without a timeout; bound it so a stalled
        # server cannot hang the job.
        timeout=30,
    )

    print_responce(r)

if __name__ == "__main__":
    # Get information
    health_check()

    # Columns shown in the final file listing.
    cols = ["environment_slug", 'filename', 'metadata', 'inserted_at']

    # UPLOAD FILES
    # Supporting files: upload all of them first, then promote each one.
    filenames = ["graph.gpickle", "graph_summary.json", "partial_parse.msgpack"]
    for artifact in filenames:
        upload_env_file(account_id, project_slug, environment_slug, artifact)

    for artifact in filenames:
        promote_env_file(account_id, project_slug, environment_slug, artifact)

    # The manifest goes last and is flagged as the manifest on upload.
    upload_env_file(account_id, project_slug, environment_slug, "manifest.json", is_manifest=True)
    promote_env_file(account_id, project_slug, environment_slug, "manifest.json")

    # delete_project_file(account_id, project_slug, "manifest.json")

    # SHOW FILE DETAILS
    print_table(list_project_files(account_id, project_slug), cols)
11 changes: 5 additions & 6 deletions orchestrate/dags/daily_loan_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,13 @@ def extract_and_load_fivetran():
tooltip="dlt Extract and Load"
)
def extract_and_load_dlt():
@task.datacoves_bash
@task.datacoves_bash(
env = datacoves_utils.set_dlt_env_vars({"destinations": ["main_load_keypair"]}),
append_env=True
)
def load_loans_data():
from orchestrate.utils import datacoves_utils

env_vars = datacoves_utils.set_dlt_env_vars({"destinations": ["main_load_keypair"]})
env_exports = datacoves_utils.generate_env_exports(env_vars)

return f"{env_exports}; cd load/dlt && ./loans_data.py"
return "cd load/dlt && ./loans_data.py"

load_loans_data()

Expand Down
12 changes: 5 additions & 7 deletions orchestrate/dags/other_examples/load_dlt.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,12 @@
)
def load_with_dlt():

@task.datacoves_bash
@task.datacoves_bash(
env = datacoves_utils.set_dlt_env_vars({"destinations": ["main_load_keypair"]}),
append_env=True
)
def load_us_population():
from orchestrate.utils import datacoves_utils

env_vars = datacoves_utils.set_dlt_env_vars({"destinations": ["main_load_keypair"]})
env_exports = datacoves_utils.generate_env_exports(env_vars)

return f"{env_exports}; cd load/dlt && ./us_population.py"
return "cd load/dlt && ./us_population.py"

load_us_population()

Expand Down
Loading