forked from dbt-labs/dbt-core
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_system_client.py
More file actions
187 lines (149 loc) · 6.57 KB
/
test_system_client.py
File metadata and controls
187 lines (149 loc) · 6.57 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
import os
import shutil
import stat
import unittest
from tempfile import mkdtemp, NamedTemporaryFile
from dbt.exceptions import ExecutableError, WorkingDirectoryError
import dbt.clients.system
class SystemClient(unittest.TestCase):
    """Tests for ``dbt.clients.system.make_file``.

    Every test works against a ``profiles.yml`` path inside a fresh
    temporary directory that is created in ``setUp`` and removed in
    ``tearDown``.
    """

    def setUp(self):
        """Create a scratch directory and compute the profile path in it."""
        super().setUp()
        self.tmp_dir = mkdtemp()
        # os.path.join keeps the separator correct on every platform.
        self.profiles_path = os.path.join(self.tmp_dir, 'profiles.yml')

    def set_up_profile(self):
        """Write a pre-existing profile file containing ``ORIGINAL_TEXT``."""
        with open(self.profiles_path, 'w') as f:
            f.write('ORIGINAL_TEXT')

    def get_profile_text(self):
        """Return the current contents of the profile file."""
        with open(self.profiles_path, 'r') as f:
            return f.read()

    def tearDown(self):
        """Remove the scratch directory on a best-effort basis."""
        # Cleanup stays best-effort, but unlike a bare ``except`` this does
        # not swallow KeyboardInterrupt/SystemExit or unrelated bugs.
        shutil.rmtree(self.tmp_dir, ignore_errors=True)

    def test__make_file_when_exists(self):
        """An existing file is left untouched and a falsy value is returned."""
        self.set_up_profile()
        written = dbt.clients.system.make_file(
            self.profiles_path, contents='NEW_TEXT'
        )

        self.assertFalse(written)
        self.assertEqual(self.get_profile_text(), 'ORIGINAL_TEXT')

    def test__make_file_when_not_exists(self):
        """A missing file is created with the given contents."""
        written = dbt.clients.system.make_file(
            self.profiles_path, contents='NEW_TEXT'
        )

        self.assertTrue(written)
        self.assertEqual(self.get_profile_text(), 'NEW_TEXT')

    def test__make_file_with_overwrite(self):
        """With ``overwrite=True`` an existing file is replaced."""
        self.set_up_profile()
        written = dbt.clients.system.make_file(
            self.profiles_path, contents='NEW_TEXT', overwrite=True
        )

        self.assertTrue(written)
        self.assertEqual(self.get_profile_text(), 'NEW_TEXT')
class TestRunCmd(unittest.TestCase):
    """Test `run_cmd`.

    Don't mock out subprocess, in order to expose any OS-level differences.
    """

    not_a_file = 'zzzbbfasdfasdfsdaq'

    def setUp(self):
        """Build a temp tree: a run directory, an empty file, a missing path."""
        self.tempdir = mkdtemp()
        self.run_dir = os.path.join(self.tempdir, 'run_dir')
        self.does_not_exist = os.path.join(self.tempdir, 'does_not_exist')
        self.empty_file = os.path.join(self.tempdir, 'empty_file')
        # A command that is expected to exist and print "hello" on each OS.
        if os.name == 'nt':
            self.exists_cmd = ['cmd', '/C', 'echo', 'hello']
        else:
            self.exists_cmd = ['echo', 'hello']

        os.mkdir(self.run_dir)
        with open(self.empty_file, 'w'):
            pass  # "touch"

    def tearDown(self):
        """Remove the temp tree created in ``setUp``."""
        shutil.rmtree(self.tempdir)

    def test__executable_does_not_exist(self):
        """A missing executable raises ExecutableError naming the path."""
        with self.assertRaises(ExecutableError) as exc:
            dbt.clients.system.run_cmd(self.run_dir, [self.does_not_exist])

        msg = str(exc.exception).lower()

        self.assertIn('path', msg)
        self.assertIn('could not find', msg)
        self.assertIn(self.does_not_exist.lower(), msg)

    def test__not_exe(self):
        """A non-executable file raises ExecutableError naming the file."""
        with self.assertRaises(ExecutableError) as exc:
            dbt.clients.system.run_cmd(self.run_dir, [self.empty_file])

        msg = str(exc.exception).lower()
        if os.name == 'nt':
            # on windows, this means it's not an executable at all!
            self.assertIn('not executable', msg)
        else:
            # on linux, this means you don't have executable permissions on it
            self.assertIn('permissions', msg)
        self.assertIn(self.empty_file.lower(), msg)

    def test__cwd_does_not_exist(self):
        """A missing working directory raises WorkingDirectoryError."""
        with self.assertRaises(WorkingDirectoryError) as exc:
            dbt.clients.system.run_cmd(self.does_not_exist, self.exists_cmd)
        msg = str(exc.exception).lower()
        self.assertIn('does not exist', msg)
        self.assertIn(self.does_not_exist.lower(), msg)

    def test__cwd_not_directory(self):
        """A regular file used as cwd raises WorkingDirectoryError."""
        with self.assertRaises(WorkingDirectoryError) as exc:
            dbt.clients.system.run_cmd(self.empty_file, self.exists_cmd)

        msg = str(exc.exception).lower()
        self.assertIn('not a directory', msg)
        self.assertIn(self.empty_file.lower(), msg)

    def test__cwd_no_permissions(self):
        """An inaccessible working directory raises WorkingDirectoryError."""
        # it would be nice to add a windows test. Possible path to that is via
        # `psexec` (to get SYSTEM privs), use `icacls` to set permissions on
        # the directory for the test user. I'm pretty sure windows users can't
        # create files that they themselves cannot access.
        if os.name == 'nt':
            # Report a skip instead of a silent pass so the gap is visible.
            self.skipTest('no permissions test implemented for windows')

        # The superuser bypasses directory permission checks, so the error
        # under test cannot be provoked when running as root.
        if hasattr(os, 'geteuid') and os.geteuid() == 0:
            self.skipTest('root is not subject to directory permissions')

        # read-only -> cannot cd to it
        os.chmod(self.run_dir, stat.S_IRUSR)
        try:
            with self.assertRaises(WorkingDirectoryError) as exc:
                dbt.clients.system.run_cmd(self.run_dir, self.exists_cmd)
        finally:
            # Restore access so tearDown can always remove the directory.
            os.chmod(self.run_dir, stat.S_IRWXU)

        msg = str(exc.exception).lower()
        self.assertIn('permissions', msg)
        self.assertIn(self.run_dir.lower(), msg)

    def test__ok(self):
        """A valid command in a valid cwd returns its stdout and stderr."""
        out, err = dbt.clients.system.run_cmd(self.run_dir, self.exists_cmd)
        self.assertEqual(out.strip(), b'hello')
        self.assertEqual(err.strip(), b'')
class TestFindMatching(unittest.TestCase):
    """Tests for ``dbt.clients.system.find_matching`` with a ``*.sql`` pattern."""

    def setUp(self):
        """Create a base directory containing one nested search directory."""
        self.base_dir = mkdtemp()
        self.tempdir = mkdtemp(dir=self.base_dir)

    def tearDown(self):
        """Remove the base directory on a best-effort basis."""
        # Cleanup stays best-effort, but unlike a bare ``except`` this does
        # not swallow KeyboardInterrupt/SystemExit or unrelated bugs.
        shutil.rmtree(self.base_dir, ignore_errors=True)

    def _assert_single_match(self, suffix):
        """Create one temp file ending in *suffix* and expect exactly that match.

        The file lives in ``self.tempdir``; the search is rooted at
        ``self.base_dir`` and given the nested directory's name as the only
        relative path to search.
        """
        with NamedTemporaryFile(
            prefix='sql-files', suffix=suffix, dir=self.tempdir
        ) as named_file:
            file_path = os.path.dirname(named_file.name)
            relative_path = os.path.basename(file_path)
            out = dbt.clients.system.find_matching(
                self.base_dir, [relative_path], '*.sql'
            )
            # Fail with a clear assertion (not an IndexError) when nothing
            # or too much was matched.
            self.assertEqual(len(out), 1)
            expected_output = [{
                'searched_path': relative_path,
                'absolute_path': named_file.name,
                'relative_path': os.path.basename(named_file.name),
                # The mtime is not pinned by the test; echo the actual value.
                'modification_time': out[0]['modification_time'],
            }]
            self.assertEqual(out, expected_output)

    def test_find_matching_lowercase_file_pattern(self):
        """A ``.sql`` file is matched by the ``*.sql`` pattern."""
        self._assert_single_match('.sql')

    def test_find_matching_uppercase_file_pattern(self):
        """A ``.SQL`` file is also matched by the ``*.sql`` pattern."""
        self._assert_single_match('.SQL')

    def test_find_matching_file_pattern_not_found(self):
        """A ``.SQLT`` file is not matched by the ``*.sql`` pattern."""
        with NamedTemporaryFile(
            prefix='sql-files', suffix='.SQLT', dir=self.tempdir
        ):
            out = dbt.clients.system.find_matching(self.tempdir, [''], '*.sql')
            self.assertEqual(out, [])