-
Notifications
You must be signed in to change notification settings - Fork 14
Expand file tree
/
Copy pathchange_set.py
More file actions
480 lines (411 loc) · 19.5 KB
/
change_set.py
File metadata and controls
480 lines (411 loc) · 19.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
import glob
import json
import os
import re
import string
from collections import Counter
from typing import Dict, Optional
from beartype import BeartypeConf, BeartypeStrategy, beartype
from beartype.typing import Callable, List
from loguru import logger
from pydantic import BaseModel, Field
from rich.console import Console
from rich.table import Table
from dbt_jobs_as_code.client import DBTCloud, DBTCloudException
from dbt_jobs_as_code.loader.load import LoadingJobsYAMLError, load_job_configuration
from dbt_jobs_as_code.schemas import check_env_var_same, check_job_mapping_same
from dbt_jobs_as_code.schemas.custom_environment_variable import CustomEnvironmentVariablePayload
from dbt_jobs_as_code.schemas.job import JobDefinition
# Dynamically create a new @nobeartype decorator disabling type-checking.
# It is obtained by configuring beartype with the O0 strategy and is applied
# to filter_config in this module.
nobeartype = beartype(conf=BeartypeConf(strategy=BeartypeStrategy.O0))
def json_serializer_type(obj):
    """Fallback serializer for ``json.dumps(..., default=...)``.

    Classes (instances of ``type``) are rendered as their ``__name__``.
    Anything else is rejected with a ``TypeError``.
    """
    if not isinstance(obj, type):
        raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")
    return obj.__name__
class Change(BaseModel):
    """Describes what a given change is and how to apply it."""

    # Label shown for the change (a job identifier, or "<job>:<env var>")
    identifier: str
    # Kind of object concerned; this module uses "job" and "env var overwrite"
    type: str
    # Operation to perform, e.g. "create", "update" or "delete"
    action: str
    proj_id: int
    env_id: int
    # Callable invoked by apply() with `parameters` expanded as keyword arguments
    sync_function: Callable
    parameters: dict
    differences: Optional[Dict] = {}

    def __str__(self):
        """Return ``"<ACTION> <Type> <identifier>"`` for display purposes."""
        verb = self.action.upper()
        kind = string.capwords(self.type)
        return f"{verb} {kind} {self.identifier}"

    def apply(self):
        """Run the sync function with the stored parameters and return its result."""
        call_kwargs = self.parameters
        return self.sync_function(**call_kwargs)
class ChangeSet(BaseModel):
    """Store the set of changes to be displayed or applied."""

    # Changes in the order they were appended
    root: List[Change] = []
    # Set to False by apply() as soon as one change raises DBTCloudException
    apply_success: bool = True
    # One summary dict per change that apply() completed successfully
    applied_changes: list[dict] = Field(default_factory=list)

    def __iter__(self):
        """Iterate over the stored changes."""
        return iter(self.root)

    def append(self, change: Change):
        """Add a change at the end of the set."""
        self.root.append(change)

    def __str__(self):
        """Return one line per change, using each change's string form."""
        return "\n".join(str(item) for item in self.root)

    def to_table(self) -> Table:
        """Return a table representation of the changeset."""
        table = Table(title="Changes detected")

        table.add_column("Action", style="cyan", no_wrap=True)
        table.add_column("Type", style="magenta")
        table.add_column("ID", style="green")
        table.add_column("Proj ID", style="yellow")
        table.add_column("Env ID", style="red")

        for item in self.root:
            row = (
                item.action.upper(),
                string.capwords(item.type),
                item.identifier,
                str(item.proj_id),
                str(item.env_id),
            )
            table.add_row(*row)

        return table

    def to_json(self) -> dict:
        """Return a structured JSON representation of the changeset."""
        job_entries = []
        env_var_entries = []

        for item in self.root:
            # Pick the bucket first; changes of any other type are left out
            if item.type == "job":
                bucket = job_entries
            elif item.type == "env var overwrite":
                bucket = env_var_entries
            else:
                continue

            bucket.append(
                {
                    "action": item.action.upper(),
                    "type": string.capwords(item.type),
                    "identifier": item.identifier,
                    "project_id": item.proj_id,
                    "environment_id": item.env_id,
                    "differences": item.differences,
                }
            )

        return {
            "job_changes": job_entries,
            "env_var_overwrite_changes": env_var_entries,
        }

    def __len__(self):
        """Return the number of stored changes."""
        return len(self.root)

    def to_applied_json(self) -> dict:
        """Return the changes recorded by apply(), grouped by change type."""
        job_entries = [
            entry for entry in self.applied_changes if entry["type"] == "job"
        ]
        env_var_entries = [
            entry for entry in self.applied_changes if entry["type"] == "env var overwrite"
        ]
        return {
            "job_changes": job_entries,
            "env_var_overwrite_changes": env_var_entries,
        }

    def apply(self, fail_fast: bool = False):
        """Apply every change in order and record the ones that succeeded.

        `apply_success` and `applied_changes` are reset first. A change raising
        DBTCloudException flips `apply_success` to False; the loop then stops
        when `fail_fast` is true and moves on to the next change otherwise.
        """
        self.apply_success = True
        self.applied_changes = []

        for item in self.root:
            try:
                outcome = item.apply()

                record = {
                    "action": item.action.upper(),
                    "type": item.type,
                    "identifier": item.identifier,
                    "project_id": item.proj_id,
                    "environment_id": item.env_id,
                }

                if item.type == "job":
                    if isinstance(outcome, JobDefinition):
                        record["job_id"] = outcome.id
                    else:
                        # Fall back to the job handed to the sync function, if any
                        fallback_job = item.parameters.get("job")
                        record["job_id"] = (
                            fallback_job.id if isinstance(fallback_job, JobDefinition) else None
                        )
                elif item.type == "env var overwrite":
                    if isinstance(outcome, CustomEnvironmentVariablePayload):
                        record["env_var_id"] = outcome.id
                        record["job_id"] = outcome.job_definition_id
                    else:
                        record["env_var_id"] = item.parameters.get("env_var_id")
                        record["job_id"] = item.parameters.get("job_id")

                self.applied_changes.append(record)
            except DBTCloudException:
                self.apply_success = False
                if fail_fast:
                    logger.error(f"Operation failed for {item}, stopping due to --fail-fast")
                    break
# Don't bear type this function as we do some odd things in tests
@nobeartype
def filter_config(
    defined_jobs: dict[str, JobDefinition], project_ids: List[int], environment_ids: List[int]
) -> dict[str, JobDefinition]:
    """Keep only the jobs matching the requested project and environment ids.

    An empty list of ids means "no restriction" for that dimension. A warning
    is logged for every job dropped by a given restriction, and a new dict is
    returned; `defined_jobs` itself is not modified.
    """
    dropped_keys: set[str] = set()

    if len(environment_ids) > 0:
        for key, job_def in defined_jobs.items():
            if job_def.environment_id in environment_ids:
                continue
            dropped_keys.add(key)
            logger.warning(
                f"For Job# {job_def.identifier}, environment_id(s) provided as arguments does not match the ID's in Jobs YAML file!!!"
            )

    if len(project_ids) > 0:
        for key, job_def in defined_jobs.items():
            if job_def.project_id in project_ids:
                continue
            dropped_keys.add(key)
            logger.warning(
                f"For Job# {job_def.identifier}, project_id(s) provided as arguments does not match the ID's in Jobs YAML file!!!"
            )

    kept_jobs = {}
    for key, job_def in defined_jobs.items():
        if key not in dropped_keys:
            kept_jobs[key] = job_def
    return kept_jobs
def _check_no_duplicate_job_identifier(remote_jobs: List[JobDefinition]):
    """Log an error for every dbt Cloud job sharing its identifier with another job.

    Jobs whose identifier is None are ignored. Nothing is raised or returned.
    """
    occurrences = Counter(
        job.identifier for job in remote_jobs if job.identifier is not None
    )
    duplicated = {ident for ident, seen in occurrences.items() if seen > 1}

    for job in remote_jobs:
        if job.identifier not in duplicated:
            continue
        account_url = os.environ.get("DBT_BASE_URL", "https://cloud.getdbt.com")
        logger.error(
            f"The job {job.id} has a duplicate identifier '{job.identifier}' in dbt Cloud: {job.to_url(account_url=account_url)}"
        )
def _check_single_account_id(defined_jobs: List[JobDefinition]):
    """Check that all the jobs defined in the YAML target a single account.

    If more than one distinct ``account_id`` is found, an error is logged
    listing them. Nothing is raised and nothing is returned.
    """
    # dict.fromkeys removes duplicates while keeping the first-seen order,
    # so the message lists each account id once, as a plain list.
    account_ids = list(dict.fromkeys(job.account_id for job in defined_jobs))
    if len(account_ids) > 1:
        logger.error(
            f"There are different account_id in the jobs YAML file: {account_ids}"
        )
def build_change_set(
    config: str,
    yml_vars: Optional[str],
    disable_ssl_verification: bool,
    project_ids: List[int],
    environment_ids: List[int],
    limit_projects_envs_to_yml: bool = False,
    exclude_identifiers_matching: Optional[str] = None,
    output_json: bool = False,
    use_desc_for_id: bool = False,
):
    """Compare the jobs defined in YML files with the ones in dbt Cloud.

    Nothing is applied here: the function only builds and returns the
    ChangeSet describing the job and env var overwrite changes to perform.

    Args:
        config: Path or glob pattern of the jobs YML file(s). A directory is
            expanded to all the ``*.yml`` files directly inside it.
        yml_vars: Optional glob pattern of the files holding YML variables.
        disable_ssl_verification: Passed as is to the dbt Cloud client.
        project_ids: Project ids used to filter the YML jobs and the dbt
            Cloud jobs. An empty list means no filtering.
        environment_ids: Environment ids used the same way as `project_ids`.
        limit_projects_envs_to_yml: When true, all YML jobs are kept and the
            project/environment ids are derived from them instead.
        exclude_identifiers_matching: Optional regex; dbt Cloud jobs whose
            identifier matches it are left out of the comparison.
        output_json: When true, the informational logs and the console diff
            are not emitted (warnings and errors still are).
        use_desc_for_id: Passed as is to the dbt Cloud client.

    Returns:
        The ChangeSet to display or apply. It is empty when no config file is
        found, when no job remains after filtering or when the exclusion
        regex is invalid.

    Raises:
        SystemExit: With code 1 when the jobs YML files cannot be loaded.
    """
    # If the config is a directory, we automatically search for all the `*.yml` files in this directory
    if os.path.isdir(config):
        config = os.path.join(config, "*.yml")

    # Get list of files matching the glob pattern
    config_files = glob.glob(config, recursive=True)
    if not config_files:
        logger.error(f"No files found matching pattern: {config}")
        return ChangeSet()

    yml_vars_files = glob.glob(yml_vars, recursive=True) if yml_vars else None

    try:
        configuration = load_job_configuration(config_files, yml_vars_files)
    except (LoadingJobsYAMLError, KeyError) as e:
        logger.error(f"Error loading jobs YAML file ({type(e).__name__}): {e}")
        # Raise SystemExit directly: the exit() builtin is injected by the
        # `site` module for interactive use and is not meant for programs.
        raise SystemExit(1)

    if limit_projects_envs_to_yml:
        # if limit_projects_envs_to_yml is True, we keep all the YML jobs
        defined_jobs = configuration.jobs
        # and only the remote jobs with project_id and environment_id existing in the job YML file are considered
        project_ids = list({job.project_id for job in defined_jobs.values()})
        environment_ids = list({job.environment_id for job in defined_jobs.values()})
    else:
        # If a project_id or environment_id is passed in as a parameter (one or multiple), check if these match the ID's in Jobs YAML file, otherwise add a warning and continue the process
        unfiltered_defined_jobs = configuration.jobs
        defined_jobs = filter_config(unfiltered_defined_jobs, project_ids, environment_ids)

    # Without any job there is no account_id to build the client from
    if len(defined_jobs) == 0:
        logger.warning(
            "No jobs found in the Jobs YAML file after filtering based on the project_id and environment_id provided as arguments!!!"
        )
        return ChangeSet()

    _check_single_account_id(list(defined_jobs.values()))

    dbt_cloud = DBTCloud(
        account_id=list(defined_jobs.values())[0].account_id,
        api_key=os.environ.get("DBT_API_KEY"),
        base_url=os.environ.get("DBT_BASE_URL", "https://cloud.getdbt.com"),
        disable_ssl_verification=disable_ssl_verification,
        use_desc_for_id=use_desc_for_id,
    )
    cloud_jobs = dbt_cloud.get_jobs(project_ids=project_ids, environment_ids=environment_ids)
    _check_no_duplicate_job_identifier(cloud_jobs)
    tracked_jobs = {job.identifier: job for job in cloud_jobs if job.identifier is not None}

    # Filter out jobs based on exclude_identifiers_matching regex if provided
    if exclude_identifiers_matching:
        # Only the compilation can raise re.error, so only it is guarded
        try:
            exclude_pattern = re.compile(exclude_identifiers_matching)
        except re.error as e:
            logger.error(f"Invalid regex pattern '{exclude_identifiers_matching}': {e}")
            return ChangeSet()

        filtered_tracked_jobs = {}
        excluded_count = 0
        for identifier, job in tracked_jobs.items():
            if exclude_pattern.search(identifier):
                excluded_count += 1
                if not output_json:
                    logger.debug(
                        f"Excluding job with identifier '{identifier}' (matches pattern)"
                    )
            else:
                filtered_tracked_jobs[identifier] = job
        tracked_jobs = filtered_tracked_jobs
        if not output_json and excluded_count > 0:
            logger.info(
                f"Excluded {excluded_count} jobs matching pattern '{exclude_identifiers_matching}'"
            )

    dbt_cloud_change_set = ChangeSet()

    # Use sets to find jobs for different operations
    shared_jobs = set(defined_jobs.keys()).intersection(set(tracked_jobs.keys()))
    created_jobs = set(defined_jobs.keys()) - set(tracked_jobs.keys())
    deleted_jobs = set(tracked_jobs.keys()) - set(defined_jobs.keys())

    # Update changed jobs
    if not output_json:
        logger.info("Detected {count} existing jobs.", count=len(shared_jobs))
    for identifier in shared_jobs:
        if not output_json:
            logger.info("Checking for differences in {identifier}", identifier=identifier)
        is_same, diff_data = check_job_mapping_same(
            source_job=defined_jobs[identifier], dest_job=tracked_jobs[identifier]
        )
        if not is_same:
            dbt_cloud_change = Change(
                identifier=identifier,
                type="job",
                action="update",
                proj_id=defined_jobs[identifier].project_id,
                env_id=defined_jobs[identifier].environment_id,
                sync_function=dbt_cloud.update_job,
                parameters={"job": defined_jobs[identifier]},
                differences=diff_data.get("differences", {}) if diff_data else {},
            )
            dbt_cloud_change_set.append(dbt_cloud_change)
            # The YML job takes the id of its dbt Cloud counterpart for the update
            defined_jobs[identifier].id = tracked_jobs[identifier].id
            if not output_json:
                console = Console()
                console.print(
                    f"❌ Job {identifier} is different - Diff:\n{json.dumps(diff_data, indent=2, default=json_serializer_type)}"
                )
        elif not output_json:
            logger.success(f"✅ Job {identifier} is identical")

    # Create new jobs
    if not output_json:
        logger.info("Detected {count} new jobs.", count=len(created_jobs))
    for identifier in created_jobs:
        dbt_cloud_change = Change(
            identifier=identifier,
            type="job",
            action="create",
            proj_id=defined_jobs[identifier].project_id,
            env_id=defined_jobs[identifier].environment_id,
            sync_function=dbt_cloud.create_job,
            parameters={"job": defined_jobs[identifier]},
        )
        dbt_cloud_change_set.append(dbt_cloud_change)

    # Remove Deleted Jobs
    if not output_json:
        logger.info("Detected {count} deleted jobs.", count=len(deleted_jobs))
    for identifier in deleted_jobs:
        dbt_cloud_change = Change(
            identifier=identifier,
            type="job",
            action="delete",
            proj_id=tracked_jobs[identifier].project_id,
            env_id=tracked_jobs[identifier].environment_id,
            sync_function=dbt_cloud.delete_job,
            parameters={"job": tracked_jobs[identifier]},
        )
        dbt_cloud_change_set.append(dbt_cloud_change)

    # -- ENV VARS --
    # Map the identifiers of the jobs existing in dbt Cloud to their IDs for further API calls
    mapping_job_identifier_job_id = dbt_cloud.build_mapping_job_identifier_job_id(cloud_jobs)
    if not output_json:
        logger.debug(f"Mapping of job identifier to id: {mapping_job_identifier_job_id}")

    # Replicate the env vars from the YML to dbt Cloud
    for job in defined_jobs.values():
        if job.identifier in mapping_job_identifier_job_id:  # the job already exists
            job_id = mapping_job_identifier_job_id[job.identifier]
            all_env_vars_for_job = dbt_cloud.get_env_vars(project_id=job.project_id, job_id=job_id)
            for env_var_yml in job.custom_environment_variables:
                env_var_yml.job_definition_id = job_id
                same_env_var, env_var_id, diff_data = check_env_var_same(
                    source_env_var=env_var_yml, dest_env_vars=all_env_vars_for_job
                )
                if not same_env_var and diff_data:
                    # No old value means a creation, no new value a deletion
                    action = (
                        "CREATE"
                        if diff_data.get("old_value") is None
                        else "DELETE"
                        if diff_data.get("new_value") is None
                        else "UPDATE"
                    )
                    dbt_cloud_change = Change(
                        identifier=f"{job.identifier}:{env_var_yml.name}",
                        type="env var overwrite",
                        action=action,
                        proj_id=job.project_id,
                        env_id=job.environment_id,
                        sync_function=dbt_cloud.update_env_var,
                        parameters={
                            "project_id": job.project_id,
                            "job_id": job_id,
                            "custom_env_var": env_var_yml,
                            "env_var_id": env_var_id,
                        },
                        differences=diff_data,
                    )
                    dbt_cloud_change_set.append(dbt_cloud_change)
        else:  # the job doesn't exist yet so it doesn't have an ID
            for env_var_yml in job.custom_environment_variables:
                dbt_cloud_change = Change(
                    identifier=f"{job.identifier}:{env_var_yml.name}",
                    type="env var overwrite",
                    action="create",
                    proj_id=job.project_id,
                    env_id=job.environment_id,
                    sync_function=dbt_cloud.update_env_var,
                    parameters={
                        "project_id": job.project_id,
                        "job_id": None,
                        "custom_env_var": env_var_yml,
                        "env_var_id": None,
                        "yml_job_identifier": job.identifier,
                    },
                )
                dbt_cloud_change_set.append(dbt_cloud_change)

    # Delete the env vars from dbt Cloud that are not in the yml
    for job in defined_jobs.values():
        # we only delete env var overwrite if the job already exists
        if job.identifier in mapping_job_identifier_job_id:
            job_id = mapping_job_identifier_job_id[job.identifier]

            # We get the env vars currently set in dbt Cloud for this job
            env_var_dbt_cloud = dbt_cloud.get_env_vars(project_id=job.project_id, job_id=job_id)

            # And we get the list of env vars defined for a given job in the YML
            env_vars_for_job = [env_var.name for env_var in job.custom_environment_variables]

            for env_var, env_var_val in env_var_dbt_cloud.items():
                # If the env var is not in the YML but is defined at the "job" level in dbt Cloud, we delete it
                if env_var not in env_vars_for_job and env_var_val.id:
                    if not output_json:
                        logger.info(f"{env_var} not in the YML file but in the dbt Cloud job")
                    dbt_cloud_change = Change(
                        identifier=f"{job.identifier}:{env_var}",
                        type="env var overwrite",
                        action="delete",
                        proj_id=job.project_id,
                        env_id=job.environment_id,
                        sync_function=dbt_cloud.delete_env_var,
                        parameters={
                            "project_id": job.project_id,
                            "env_var_id": env_var_val.id,
                        },
                    )
                    dbt_cloud_change_set.append(dbt_cloud_change)

    # Filtering out the change set, if project_id(s), environment_id(s) are passed as arguments to function
    # TODO: Confirm if this is the desired functionality, remove otherwise
    return dbt_cloud_change_set