Skip to content
This repository has been archived by the owner on Jan 19, 2023. It is now read-only.

Commit

Permalink
Merge pull request #1 from pleo-io/feat/ignore-undefined-metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
Luca Florio authored Mar 12, 2021
2 parents b19ef96 + 790533c commit d3b9177
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 40 deletions.
22 changes: 14 additions & 8 deletions dbtmetabase/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@

__version__ = '0.5.2'

def export(dbt_path: str,

def export(dbt_path: str,
mb_host: str, mb_user: str, mb_password: str,
database: str, schema: str,
mb_https = True, sync = True, sync_timeout = 30,
includes = [], excludes = []):
mb_https = True, sync = True, sync_timeout = 30,
includes = [], excludes = [],
ignore_undefined = False):
"""Exports models from dbt to Metabase.
Arguments:
Expand All @@ -26,24 +28,26 @@ def export(dbt_path: str,
sync_timeout {int} -- Synchronization timeout in seconds. (default: {30})
includes {list} -- Model names to limit processing to. (default: {[]})
excludes {list} -- Model names to exclude. (default: {[]})
ignore_undefined {bool} -- Ignore properties not defined in dbt model. (default: {False})
"""

mbc = MetabaseClient(mb_host, mb_user, mb_password, mb_https)
models = DbtReader(dbt_path).read_models(
includes=includes,
includes=includes,
excludes=excludes
)

if sync:
if not mbc.sync_and_wait(database, schema, models, sync_timeout):
logging.critical("Sync timeout reached, models still not compatible")
return

mbc.export_models(database, schema, models)

mbc.export_models(database, schema, models, ignore_undefined)


def main(args: list = None):
import argparse

logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s', level=logging.INFO)

parser = argparse.ArgumentParser(
Expand All @@ -61,6 +65,7 @@ def main(args: list = None):
parser.add_argument('--sync_timeout', metavar='SECS', type=int, default=30, help="synchronization timeout (in secs)")
parser.add_argument('--includes', metavar='MODELS', nargs='*', default=[], help="model names to limit processing to")
parser.add_argument('--excludes', metavar='MODELS', nargs='*', default=[], help="model names to exclude")
parser.add_argument('--ignore_undefined', metavar='IGNORE_UNDEFINED', type=bool, default=False, help="ignore properties not defined in dbt models")
parsed = parser.parse_args(args=args)

if parsed.command == 'export':
Expand All @@ -75,5 +80,6 @@ def main(args: list = None):
sync=parsed.sync,
sync_timeout=parsed.sync_timeout,
includes=parsed.includes,
excludes=parsed.excludes
excludes=parsed.excludes,
ignore_undefined=parsed.ignore_undefined
)
71 changes: 39 additions & 32 deletions dbtmetabase/metabase.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def __init__(self, host: str, user: str, password: str, https = True):
self.protocol = "https" if https else "http"
self.session_id = self.get_session_id(user, password)
logging.info("Session established successfully")

def get_session_id(self, user: str, password: str) -> str:
"""Obtains new session ID from API.
Expand All @@ -43,7 +43,7 @@ def get_session_id(self, user: str, password: str) -> str:
'username': user,
'password': password
})['id']

def sync_and_wait(self, database: str, schema: str, models: list, timeout = 30) -> bool:
"""Synchronize with the database and wait for schema compatibility.
Expand All @@ -67,7 +67,7 @@ def sync_and_wait(self, database: str, schema: str, models: list, timeout = 30)
if not database_id:
logging.critical("Cannot find database by name %s", database)
return

self.api('post', f'/api/database/{database_id}/sync_schema')

deadline = int(time.time()) + timeout
Expand Down Expand Up @@ -108,35 +108,37 @@ def models_compatible(self, database_id: str, schema: str, models: list) -> bool
if column_name not in table_lookup:
logging.warn("Column %s not found in model %s", column_name, model_name)
are_models_compatible = False

return are_models_compatible

def export_models(self, database: str, schema: str, models: list):
def export_models(self, database: str, schema: str, models: list, ignore_undefined: bool):
"""Exports dbt models to Metabase database schema.
Arguments:
database {str} -- Metabase database name.
schema {str} -- Metabase schema name.
models {list} -- List of dbt models read from project.
ignore_undefined {bool} -- Ignore undefined properties.
"""

database_id = self.find_database_id(database)
if not database_id:
logging.critical("Cannot find database by name %s", database)
return

table_lookup, field_lookup = self.build_metadata_lookups(database_id, schema)

for model in models:
self.export_model(model, table_lookup, field_lookup)
def export_model(self, model: dict, table_lookup: dict, field_lookup: dict):
self.export_model(model, table_lookup, field_lookup, ignore_undefined)

def export_model(self, model: dict, table_lookup: dict, field_lookup: dict, ignore_undefined: bool):
"""Exports one dbt model to Metabase database schema.
Arguments:
model {dict} -- One dbt model read from project.
table_lookup {dict} -- Dictionary of Metabase tables indexed by name.
field_lookup {dict} -- Dictionary of Metabase fields indexed by name, indexed by table name.
ignore_undefined {bool} -- Ignore undefined properties.
"""

model_name = model['name'].upper()
Expand All @@ -157,15 +159,16 @@ def export_model(self, model: dict, table_lookup: dict, field_lookup: dict):
logging.info("Table %s is up-to-date", model_name)

for column in model.get('columns', []):
self.export_column(model_name, column, field_lookup)
def export_column(self, model_name: str, column: dict, field_lookup: dict):
self.export_column(model_name, column, field_lookup, ignore_undefined)

def export_column(self, model_name: str, column: dict, field_lookup: dict, ignore_undefined: bool):
"""Exports one dbt column to Metabase database schema.
Arguments:
model_name {str} -- One dbt model name read from project.
column {dict} -- One dbt column read from project.
field_lookup {dict} -- Dictionary of Metabase fields indexed by name, indexed by table name.
ignore_undefined {bool} -- Ignore undefined properties.
"""

column_name = column['name'].upper()
Expand All @@ -174,7 +177,7 @@ def export_column(self, model_name: str, column: dict, field_lookup: dict):
if not field:
logging.error('Field %s.%s does not exist in Metabase', model_name, column_name)
return

field_id = field['id']
fk_target_field_id = None
if column.get('special_type') == 'type/FK':
Expand All @@ -183,35 +186,39 @@ def export_column(self, model_name: str, column: dict, field_lookup: dict):
fk_target_field_id = field_lookup.get(target_table, {}) \
.get(target_field, {}) \
.get('id')

if fk_target_field_id:
self.api('put', f'/api/field/{fk_target_field_id}', json={
'special_type': 'type/PK'
})
else:
logging.error("Unable to find foreign key target %s.%s", target_table, target_field)

# Nones are not accepted, default to normal
if not column.get('visibility_type'):
column['visibility_type'] = 'normal'

api_field = self.api('get', f'/api/field/{field_id}')

if api_field['description'] != column.get('description') or \
api_field['special_type'] != column.get('special_type') or \
api_field['visibility_type'] != column.get('visibility_type') or \
api_field['fk_target_field_id'] != fk_target_field_id:
payload = {}
payload_fields = ['description', 'special_type', 'visibility_type']
for name in payload_fields:
mb_value = api_field[name]
dbt_value = column.get(name)
# Add null properties to payload only if they should not be ignored
if mb_value != dbt_value and (dbt_value or not ignore_undefined):
# Nones are not accepted, default to normal
if name == 'visibility_type':
payload[name] = 'normal'
else:
payload[name] = dbt_value

if api_field['fk_target_field_id'] != fk_target_field_id and (fk_target_field_id or not ignore_undefined):
payload['fk_target_field_id'] = fk_target_field_id

if payload:
# Update with new values
self.api('put', f'/api/field/{field_id}', json={
'description': column.get('description'),
'special_type': column.get('special_type'),
'visibility_type': column.get('visibility_type'),
'fk_target_field_id': fk_target_field_id
})
self.api('put', f'/api/field/{field_id}', json=payload)
logging.info("Updated field %s.%s successfully", model_name, column_name)
else:
logging.info("Field %s.%s is up-to-date", model_name, column_name)

def find_database_id(self, name: str) -> str:
"""Finds Metabase database ID by name.
Expand All @@ -226,7 +233,7 @@ def find_database_id(self, name: str) -> str:
if database['name'].upper() == name.upper():
return database['id']
return None

def build_metadata_lookups(self, database_id: str, schema: str) -> (dict, dict):
"""Builds table and field lookups.
Expand Down Expand Up @@ -262,7 +269,7 @@ def build_metadata_lookups(self, database_id: str, schema: str) -> (dict, dict):
table_field_lookup[field_name] = field

field_lookup[table_name] = table_field_lookup

return table_lookup, field_lookup

def api(self, method: str, path: str, authenticated = True, critical = True, **kwargs) -> Any:
Expand All @@ -285,7 +292,7 @@ def api(self, method: str, path: str, authenticated = True, critical = True, **k
kwargs['headers'] = headers
else:
headers = kwargs['headers'].copy()

if authenticated:
headers['X-Metabase-Session'] = self.session_id

Expand Down

0 comments on commit d3b9177

Please sign in to comment.