-
Notifications
You must be signed in to change notification settings - Fork 158
Expand file tree
/
Copy pathutil.py
More file actions
106 lines (89 loc) · 3.92 KB
/
util.py
File metadata and controls
106 lines (89 loc) · 3.92 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
import re
import time
import uuid
from data_validation import clients, exceptions
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from ibis.expr.types.relations import Table as IbisTable
from ibis.backends.base import BaseBackend
def timed_call(log_txt, fn, *args, **kwargs):
    """Call ``fn(*args, **kwargs)``, log how long it took at DEBUG level, and return its result.

    Args:
        log_txt: Text placed at the start of the debug log message.
        fn: The callable to invoke.
        *args: Positional arguments forwarded to ``fn``.
        **kwargs: Keyword arguments forwarded to ``fn``.

    Returns:
        Whatever ``fn`` returns. If ``fn`` raises, the exception propagates
        and nothing is logged.
    """
    # perf_counter is monotonic, so the measured duration cannot be skewed
    # (or go negative) when the system wall clock is adjusted mid-call.
    started = time.perf_counter()
    result = fn(*args, **kwargs)
    elapsed = time.perf_counter() - started
    # Let logging do the formatting so it is skipped when DEBUG is disabled.
    logging.debug("%s elapsed: %ss", log_txt, round(elapsed, 2))
    return result
def split_not_in_quotes(
    to_split: str, sep: str = " ", exclude_empty_tokens: bool = False
) -> list:
    """Split a string by a separator but only when the separator is not inside quotes.

    The re pattern is taken from this comment:
    https://stackoverflow.com/a/2787979/10979853
    The commenter's words should the link ever go stale:
        Each time it finds a semicolon, the lookahead scans the entire remaining string,
        making sure there's an even number of single-quotes and an even number of double-quotes.
        (Single-quotes inside double-quoted fields, or vice-versa, are ignored.) If the
        lookahead succeeds, the semicolon is a delimiter.

    The pattern treats every occurrence of sep as a delimiter, so back to back
    separators (e.g. consecutive spaces) produce empty tokens. Use
    exclude_empty_tokens to drop them.

    Args:
        to_split: The string to split.
        sep: The literal separator text. It is escaped before being placed in
            the pattern, so characters such as "|" or "." are matched literally.
        exclude_empty_tokens: When True, empty tokens are removed from the result.

    Returns:
        The list of tokens. Quote characters are kept in the tokens.
    """
    # Escape the separator so regex metacharacters in it cannot alter the pattern.
    pattern = re.escape(sep) + r"""(?=(?:[^'"]|'[^']*'|"[^"]*")*$)"""
    tokens = re.split(pattern, to_split)
    if exclude_empty_tokens:
        return [t for t in tokens if t]
    return tokens
def dvt_config_string_to_dict(config_string: str) -> dict:
    """Convert JSON in a string to a dict.

    Args:
        config_string: JSON text. Single-quoted pseudo-JSON such as
            "{'key': 'value'}" is also accepted. A dict is returned unchanged.

    Returns:
        The parsed value, or None when config_string is empty/falsy.

    Raises:
        exceptions.ValidationException: If the text cannot be parsed as JSON,
            either as given or after converting single quotes to double quotes.
    """
    if not config_string:
        return None
    if isinstance(config_string, dict):
        return config_string
    try:
        # Parse the text as given first: blindly swapping quotes would corrupt
        # valid JSON whose string values contain an apostrophe.
        return json.loads(config_string)
    except json.JSONDecodeError:
        pass
    try:
        # Fall back to treating single quotes as JSON string delimiters.
        return json.loads(config_string.replace("'", '"'))
    except json.JSONDecodeError as exc:
        raise exceptions.ValidationException(
            f"Invalid JSON format in connection parameter dictionary string: {config_string}"
        ) from exc
def ibis_table_to_sql(ibis_table: "IbisTable", alchemy_client: "BaseBackend") -> str:
    """Render the SQL text for an Ibis table expression.

    The client is needed to look up the dialect; without it the output
    contains generic literals.

    Args:
        ibis_table: The table expression to compile.
        alchemy_client: The backend client. May be falsy, in which case the
            plain compiled expression is stringified.

    Returns:
        The SQL string for the table expression.
    """
    uses_sqlalchemy = alchemy_client and clients.is_sqlalchemy_backend(alchemy_client)
    if not uses_sqlalchemy:
        # Non-sqlalchemy backends already have their parameters bound.
        return str(ibis_table.compile())

    # For a sqlalchemy backend, ask sqla to bind the variables inline using
    # the dialect of the client's connection.
    target_dialect = alchemy_client.con.dialect
    bound_statement = ibis_table.compile().compile(
        dialect=target_dialect, compile_kwargs={"literal_binds": True}
    )
    return str(bound_statement)
def dvt_temp_object_name(prefix: str = "dvt_temp") -> str:
    """Generate a random name for when DVT needs to create a temporary object.

    Args:
        prefix: The prefix to use for the temporary object name. Must be a
            non-empty string of ASCII letters, digits and underscores only.

    Returns:
        The prefix, an underscore, and 8 random lowercase hex characters.

    Raises:
        exceptions.ValidationException: If prefix is not a string or contains
            any character other than ASCII letters, digits and underscore.
    """
    # fullmatch rather than match with "^...$": "$" also matches just before a
    # trailing newline, which would let a prefix such as "abc\n" through.
    if not isinstance(prefix, str) or not re.fullmatch(r"[a-zA-Z0-9_]+", prefix):
        raise exceptions.ValidationException(
            f"Invalid prefix: '{prefix}'. Only alphanumeric and underscore characters are allowed."
        )
    # UUID.hex is already lowercase, so no further case normalization is needed.
    return f"{prefix}_{uuid.uuid4().hex[:8]}"