Skip to content

Commit 4739be2

Browse files
authored
Merge pull request #188 from analyst-collective/development
Release 0.5.1
2 parents d50bb54 + 17e5539 commit 4739be2

25 files changed

+959
-139
lines changed

.bumpversion.cfg

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
[bumpversion]
2-
current_version = 0.5.0
2+
current_version = 0.5.1
33
commit = True
44
tag = True
55

README.md

+2-2
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ dbt (data build tool) helps analysts write reliable, modular code using a workfl
88
- [What is dbt]?
99
- Read the [dbt viewpoint]
1010
- [Installation]
11-
- Join the [chat][gittr-url] on Gittr.
11+
- Join the [chat][slack-url] on Slack for live questions and support.
1212

1313

1414
## Code of Conduct
@@ -18,7 +18,7 @@ Everyone interacting in the dbt project's codebases, issue trackers, chat rooms,
1818

1919

2020
[PyPA Code of Conduct]: https://www.pypa.io/en/latest/code-of-conduct/
21-
[gittr-url]: https://gitter.im/analyst-collective/dbt
21+
[slack-url]: http://ac-slackin.herokuapp.com/
2222
[Installation]: http://dbt.readthedocs.io/en/master/guide/setup/
2323
[What is dbt]: http://dbt.readthedocs.io/en/master/about/overview/
2424
[dbt viewpoint]: http://dbt.readthedocs.io/en/master/about/viewpoint/

dbt/archival.py

+65
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
2+
from __future__ import print_function
3+
import dbt.targets
4+
import dbt.schema
5+
import dbt.templates
6+
import jinja2
7+
8+
9+
class Archival(object):
    """Builds the SQL that archives one source table into a target table.

    The archive table holds every source column plus four bookkeeping
    columns (``valid_from``, ``valid_to``, ``scd_id``, ``dbt_updated_at``).
    """

    def __init__(self, project, archive_model):
        """Remember the project/model and resolve the run target and schema.

        Args:
            project: object exposing ``run_environment()``; its result is
                passed to ``dbt.targets.get_target``.
            archive_model: object exposing ``source_schema``,
                ``target_schema``, ``source_table``, ``target_table``,
                ``unique_key`` and ``updated_at`` (all read by ``compile``).
        """
        self.archive_model = archive_model
        self.project = project

        self.target = dbt.targets.get_target(self.project.run_environment())
        self.schema = dbt.schema.Schema(self.project, self.target)

    def compile(self):
        """Create the target schema/table and return the rendered archive SQL.

        Returns:
            The ``dbt.templates.SCDArchiveTemplate`` text rendered with the
            source columns and the archive model's settings.

        Raises:
            RuntimeError: if no columns are found for the source table.
        """
        source_schema = self.archive_model.source_schema
        target_schema = self.archive_model.target_schema
        source_table = self.archive_model.source_table
        target_table = self.archive_model.target_table
        unique_key = self.archive_model.unique_key
        updated_at = self.archive_model.updated_at

        self.schema.create_schema(target_schema)

        source_columns = self.schema.get_columns_in_table(source_schema, source_table)

        # An empty column list is treated as "the source table is missing".
        if not source_columns:
            raise RuntimeError('Source table "{}"."{}" does not exist'.format(source_schema, source_table))

        extra_cols = [
            dbt.schema.Column("valid_from", "timestamp", None),
            dbt.schema.Column("valid_to", "timestamp", None),
            dbt.schema.Column("scd_id", "text", None),
            dbt.schema.Column("dbt_updated_at", "timestamp", None),
        ]

        dest_columns = source_columns + extra_cols
        self.schema.create_table(target_schema, target_table, dest_columns, sort=updated_at, dist=unique_key)

        env = jinja2.Environment()

        # Only the source columns are exposed to the template; the extra
        # bookkeeping columns are not part of the context.
        ctx = {
            "columns": source_columns,
            "updated_at": updated_at,
            "unique_key": unique_key,
            "source_schema": source_schema,
            "source_table": source_table,
            "target_schema": target_schema,
            "target_table": target_table,
        }

        base_query = dbt.templates.SCDArchiveTemplate
        template = env.from_string(base_query, globals=ctx)
        rendered = template.render(ctx)

        return rendered

    def runtime_compile(self, compiled_model):
        """Compile ``compiled_model`` with this object's context plus its own.

        Args:
            compiled_model: object exposing ``context()`` (a mapping) and
                ``compile(context)``.

        Returns:
            Whatever ``compiled_model.compile`` returns.
        """
        # ``context`` is never assigned by this class, so fall back to an
        # empty mapping when no caller has attached one. The copy keeps the
        # shared mapping from being mutated by the per-model update below.
        context = dict(getattr(self, 'context', None) or {})
        context.update(compiled_model.context())
        return compiled_model.compile(context)
65+

dbt/compilation.py

+74-8
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,19 @@
77
from dbt.source import Source
88
from dbt.utils import find_model_by_fqn, find_model_by_name, dependency_projects, split_path, This, Var, compiler_error
99
from dbt.linker import Linker
10+
import dbt.targets
11+
import dbt.templates
1012
import time
1113
import sqlparse
1214

15+
CompilableEntities = ["models", "tests", "archives", "analyses"]
16+
1317
class Compiler(object):
1418
def __init__(self, project, create_template_class):
1519
self.project = project
1620
self.create_template = create_template_class()
21+
self.macro_generator = None
22+
self.target = self.get_target()
1723

1824
def initialize(self):
1925
if not os.path.exists(self.project['target-path']):
@@ -24,7 +30,7 @@ def initialize(self):
2430

2531
def get_target(self):
2632
target_cfg = self.project.run_environment()
27-
return RedshiftTarget(target_cfg)
33+
return dbt.targets.get_target(target_cfg)
2834

2935
def model_sources(self, this_project, own_project=None):
3036
if own_project is None:
@@ -35,9 +41,21 @@ def model_sources(self, this_project, own_project=None):
3541
return Source(this_project, own_project=own_project).get_models(paths, self.create_template)
3642
elif self.create_template.label == 'test':
3743
return Source(this_project, own_project=own_project).get_test_models(paths, self.create_template)
44+
elif self.create_template.label == 'archive':
45+
return []
3846
else:
3947
raise RuntimeError("unexpected create template type: '{}'".format(self.create_template.label))
4048

49+
def get_macros(self, this_project, own_project=None):
    """Collect the macros found under a project's ``macro-paths``.

    Args:
        this_project: passed to ``Source`` as the first argument.
        own_project: the project whose ``macro-paths`` setting is read;
            defaults to ``this_project`` when ``None``.

    Returns:
        The result of ``Source(...).get_macros(paths)``.
    """
    owner = this_project if own_project is None else own_project
    macro_paths = owner.get('macro-paths', [])
    source = Source(this_project, own_project=owner)
    return source.get_macros(macro_paths)
54+
55+
def get_archives(self, project):
    """Return the archives of ``project``.

    A fresh ``dbt.templates.ArchiveInsertTemplate`` is handed to
    ``Source.get_archives``; ``project`` acts as both the running project
    and the owning project.
    """
    insert_template = dbt.templates.ArchiveInsertTemplate()
    source = Source(project, own_project=project)
    return source.get_archives(insert_template)
58+
4159
def project_schemas(self):
4260
source_paths = self.project.get('source-paths', [])
4361
return Source(self.project).get_schemas(source_paths)
@@ -147,16 +165,30 @@ def wrapped_do_ref(*args):
147165

148166
def get_context(self, linker, model, models):
149167
context = self.project.context()
168+
169+
# built-ins
150170
context['ref'] = self.__ref(linker, context, model, models)
151171
context['config'] = self.__model_config(model, linker)
152172
context['this'] = This(context['env']['schema'], model.immediate_name, model.name)
153-
context['compiled_at'] = time.strftime('%Y-%m-%d %H:%M:%S')
154173
context['var'] = Var(model, context=context)
174+
175+
# these get re-interpolated at runtime!
176+
context['run_started_at'] = '{{ run_started_at }}'
177+
context['invocation_id'] = '{{ invocation_id }}'
178+
179+
# add in context from run target
180+
context.update(self.target.context)
181+
182+
# add in macros (can we cache these somehow?)
183+
for macro_name, macro in self.macro_generator(context):
184+
context[macro_name] = macro
185+
155186
return context
156187

157188
def compile_model(self, linker, model, models):
158189
try:
159-
jinja = jinja2.Environment(loader=jinja2.FileSystemLoader(searchpath=model.root_dir))
190+
fs_loader = jinja2.FileSystemLoader(searchpath=model.root_dir)
191+
jinja = jinja2.Environment(loader=fs_loader)
160192

161193
# this is a dumb jinja2 bug -- on windows, forward slashes are EXPECTED
162194
posix_filepath = '/'.join(split_path(model.rel_filepath))
@@ -169,8 +201,8 @@ def compile_model(self, linker, model, models):
169201

170202
return rendered
171203

172-
def write_graph_file(self, linker):
173-
filename = 'graph-{}.yml'.format(self.create_template.label)
204+
def write_graph_file(self, linker, label):
205+
filename = 'graph-{}.yml'.format(label)
174206
graph_path = os.path.join(self.project['target-path'], filename)
175207
linker.write_graph(graph_path)
176208

@@ -297,13 +329,39 @@ def compile_schema_tests(self, linker):
297329

298330
return written_tests
299331

332+
def generate_macros(self, all_macros):
    """Build a callable that expands every macro source for a context.

    Args:
        all_macros: iterable of objects exposing ``get_macros(ctx)``.

    Returns:
        A function ``do_gen(ctx)`` that returns one flat list containing
        the items produced by each object's ``get_macros(ctx)``, in order.
    """
    def do_gen(ctx):
        return [
            generated
            for macro_source in all_macros
            for generated in macro_source.get_macros(ctx)
        ]

    return do_gen
340+
341+
def compile_archives(self):
    """Compile every archive of the project and write the archive graph.

    For each archive, the compiled SQL is written to the archive's build
    path and its serialized form is recorded in a fresh ``Linker``. The
    graph file is then written with the ``'archive'`` label.

    Returns:
        The archives returned by ``get_archives`` for this project.
    """
    archive_linker = Linker()
    archives = self.get_archives(self.project)

    for item in archives:
        compiled_sql = item.compile()
        archive_linker.update_node_data(tuple(item.fqn), item.serialize())
        self.__write(item.build_path(), compiled_sql)

    self.write_graph_file(archive_linker, 'archive')
    return archives
353+
300354
def compile(self, dry=False):
301355
linker = Linker()
302356

303357
all_models = self.model_sources(this_project=self.project)
358+
all_macros = self.get_macros(this_project=self.project)
304359

305360
for project in dependency_projects(self.project):
306361
all_models.extend(self.model_sources(this_project=self.project, own_project=project))
362+
all_macros.extend(self.get_macros(this_project=self.project, own_project=project))
363+
364+
self.macro_generator = self.generate_macros(all_macros)
307365

308366
enabled_models = [model for model in all_models if model.is_enabled]
309367

@@ -314,11 +372,19 @@ def compile(self, dry=False):
314372

315373
self.validate_models_unique(compiled_models)
316374
self.validate_models_unique(written_schema_tests)
317-
self.write_graph_file(linker)
375+
self.write_graph_file(linker, self.create_template.label)
318376

319-
if self.create_template.label != 'test':
377+
if self.create_template.label not in ['test', 'archive']:
320378
written_analyses = self.compile_analyses(linker, compiled_models)
321379
else:
322380
written_analyses = []
323381

324-
return len(written_models), len(written_schema_tests), len(written_analyses)
382+
383+
compiled_archives = self.compile_archives()
384+
385+
return {
386+
"models": len(written_models),
387+
"tests" : len(written_schema_tests),
388+
"archives": len(compiled_archives),
389+
"analyses" : len(written_analyses)
390+
}

dbt/compiled_model.py

+34
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
import hashlib
2+
import jinja2
3+
from dbt.utils import compiler_error
24

35
class CompiledModel(object):
46
def __init__(self, fqn, data):
57
self.fqn = fqn
68
self.data = data
9+
self.nice_name = ".".join(fqn)
710

811
# these are set just before the models are executed
912
self.tmp_drop_type = None
@@ -12,6 +15,7 @@ def __init__(self, fqn, data):
1215

1316
self.skip = False
1417
self._contents = None
18+
self.compiled_contents = None
1519

1620
def __getitem__(self, key):
1721
return self.data[key]
@@ -20,6 +24,9 @@ def hashed_name(self):
2024
fqn_string = ".".join(self.fqn)
2125
return hashlib.md5(fqn_string.encode('utf-8')).hexdigest()
2226

27+
def context(self):
28+
return self.data
29+
2330
def hashed_contents(self):
2431
return hashlib.md5(self.contents.encode('utf-8')).hexdigest()
2532

@@ -39,6 +46,15 @@ def contents(self):
3946
self._contents = fh.read()
4047
return self._contents
4148

49+
def compile(self, context):
    """Render this model's contents as a Jinja2 template.

    Args:
        context: mapping of variables passed to the template's ``render``.

    Returns:
        The rendered text, which is also stored on ``compiled_contents``.
        A ``TemplateSyntaxError`` is handed to ``compiler_error`` together
        with this model instead of being propagated directly.
    """
    raw_sql = self.contents
    try:
        template = jinja2.Environment().from_string(raw_sql)
        rendered = template.render(context)
    except jinja2.exceptions.TemplateSyntaxError as err:
        compiler_error(self, str(err))
    else:
        self.compiled_contents = rendered
        return rendered
57+
4258
@property
4359
def materialization(self):
4460
return self.data['materialized']
@@ -98,13 +114,31 @@ def prepare(self, existing, target):
98114
def __repr__(self):
99115
return "<CompiledModel {}.{}: {}>".format(self.data['project_name'], self.name, self.data['build_path'])
100116

117+
class CompiledArchive(CompiledModel):
    """Compiled model variant produced for the ``'archive'`` run type."""

    def __init__(self, fqn, data):
        """Delegate construction unchanged to ``CompiledModel``."""
        super(CompiledArchive, self).__init__(fqn, data)

    def __repr__(self):
        info = self.data
        return "<CompiledArchive {}.{}: {}>".format(info['project_name'], self.name, info['build_path'])

    def should_execute(self):
        """Always report ``True``."""
        return True

    def should_rename(self):
        """Always report ``False``."""
        return False

    def prepare(self, existing, target):
        """Record ``target`` on the instance; ``existing`` is not used."""
        self.target = target
132+
101133
def make_compiled_model(fqn, data):
    """Instantiate the compiled-model class matching ``data['dbt_run_type']``.

    Args:
        fqn: fully-qualified name, passed through to the constructor.
        data: mapping that must contain ``'dbt_run_type'``; passed through
            to the constructor.

    Returns:
        ``CompiledModel`` for ``'run'``/``'dry-run'``, ``CompiledTest`` for
        ``'test'``, ``CompiledArchive`` for ``'archive'``.

    Raises:
        RuntimeError: for any other run type.
    """
    kind = data['dbt_run_type']

    if kind in ('run', 'dry-run'):
        return CompiledModel(fqn, data)
    if kind == 'test':
        return CompiledTest(fqn, data)
    if kind == 'archive':
        return CompiledArchive(fqn, data)

    raise RuntimeError("invalid run_type given: {}".format(kind))
110144

dbt/main.py

+7
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import dbt.task.init as init_task
1717
import dbt.task.seed as seed_task
1818
import dbt.task.test as test_task
19+
import dbt.task.archive as archive_task
1920
import dbt.tracking
2021

2122

@@ -71,9 +72,14 @@ def handle(args):
7172
sub = subs.add_parser('deps', parents=[base_subparser])
7273
sub.set_defaults(cls=deps_task.DepsTask, which='deps')
7374

75+
sub = subs.add_parser('archive', parents=[base_subparser])
76+
sub.add_argument('--threads', type=int, required=False, help="Specify number of threads to use while archiving tables. Overrides settings in profiles.yml")
77+
sub.set_defaults(cls=archive_task.ArchiveTask, which='archive')
78+
7479
sub = subs.add_parser('run', parents=[base_subparser])
7580
sub.add_argument('--dry', action='store_true', help="'dry run' models")
7681
sub.add_argument('--models', required=False, nargs='+', help="Specify the models to run. All models depending on these models will also be run")
82+
sub.add_argument('--threads', type=int, required=False, help="Specify number of threads to use while executing models. Overrides settings in profiles.yml")
7783
sub.set_defaults(cls=run_task.RunTask, which='run')
7884

7985
sub = subs.add_parser('seed', parents=[base_subparser])
@@ -83,6 +89,7 @@ def handle(args):
8389
sub = subs.add_parser('test', parents=[base_subparser])
8490
sub.add_argument('--skip-test-creates', action='store_true', help="Don't create temporary views to validate model SQL")
8591
sub.add_argument('--validate', action='store_true', help='Run constraint validations from schema.yml files')
92+
sub.add_argument('--threads', type=int, required=False, help="Specify number of threads to use while executing tests. Overrides settings in profiles.yml")
8693
sub.set_defaults(cls=test_task.TestTask, which='test')
8794

8895
if len(args) == 0: return p.print_help()

0 commit comments

Comments
 (0)