forked from move-coop/parsons
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_dbsync.py
More file actions
214 lines (165 loc) · 7.38 KB
/
test_dbsync.py
File metadata and controls
214 lines (165 loc) · 7.38 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
import tempfile
import unittest
from abc import ABC
from pathlib import Path
from typing import Optional
import pytest
from parsons import DBSync, Postgres, Redshift, Table
from parsons.databases.database_connector import DatabaseConnector
from parsons.databases.sqlite import Sqlite
from test.test_databases.fakes import FakeDatabase
from test.utils import assert_matching_tables, mark_live_test
# Directory containing this test module; the sample CSV fixtures are resolved relative to it.
_dir = Path(__file__).parent
# Schema name used as the default `temp_schema` for the test tables and interpolated into
# the setup/teardown SQL of the live-database test classes.
TEMP_SCHEMA = "parsons_test"
class TestDBSync(ABC, unittest.TestCase):
    """Shared DBSync test suite, run once per database backend.

    Concrete subclasses set ``db`` to the connector class under test and may
    override ``setup_sql``, ``teardown_sql``, ``temp_schema`` or
    ``initialize_db_connections``. The base class itself is skipped.
    """

    # SQL run against both databases before each test (skipped when falsy).
    setup_sql: Optional[str] = None
    # SQL run against both databases after each test (skipped when falsy).
    teardown_sql: Optional[str] = None
    # Schema prefix for the test tables; a falsy value leaves the table names unqualified.
    temp_schema: Optional[str] = TEMP_SCHEMA
    # Connector class instantiated for the source and destination databases.
    db: type[DatabaseConnector]

    @classmethod
    def setUpClass(cls):
        """Skip the abstract base class; defer to unittest for concrete subclasses."""
        if cls is TestDBSync:
            raise unittest.SkipTest(f"{cls.__name__} is an abstract base class")
        super().setUpClass()

    def setUp(self):
        """Connect to both databases, run any setup SQL and load the source table."""
        self.initialize_db_connections()

        if self.setup_sql:
            self.source_db.query(self.setup_sql)
            self.destination_db.query(self.setup_sql)

        # Load dummy data to parsons tables
        self.table1 = Table.from_csv(str(_dir / "test_data/sample_table_1.csv"))
        self.table2 = Table.from_csv(str(_dir / "test_data/sample_table_2.csv"))

        # Qualify the table names with the temp schema only when one is configured.
        self.source_table = (
            f"{self.temp_schema}.source_table" if self.temp_schema else "source_table"
        )
        self.destination_table = (
            f"{self.temp_schema}.destination_table" if self.temp_schema else "destination_table"
        )

        # Create source table
        self.source_db.copy(self.table1, self.source_table, if_exists="truncate")

        self.set_up_db_sync()

    def set_up_db_sync(self, **kwargs) -> None:
        """(Re)create ``self.db_sync``, forwarding ``kwargs`` to the DBSync constructor."""
        self.db_sync = DBSync(self.source_db, self.destination_db, **kwargs)

    def initialize_db_connections(self) -> None:
        """Instantiate the source and destination connectors with no arguments."""
        self.source_db = self.db()
        self.destination_db = self.db()

    def tearDown(self):
        """Run the teardown SQL, if any, against both databases."""
        if self.teardown_sql:
            self.source_db.query(self.teardown_sql)
            self.destination_db.query(self.teardown_sql)

    def assert_matching_tables(self) -> None:
        """Assert that the source and destination tables hold matching data."""
        source = self.source_db.query(f"SELECT * FROM {self.source_table}")
        destination = self.destination_db.query(f"SELECT * FROM {self.destination_table}")
        assert_matching_tables(source, destination)

    def table_sync_full(self, if_exists: str, **kwargs):
        """Run a full sync from the source table to the destination table."""
        self.db_sync.table_sync_full(
            self.source_table, self.destination_table, if_exists=if_exists, **kwargs
        )

    def test_table_sync_full_drop(self):
        self.table_sync_full(if_exists="drop")
        self.assert_matching_tables()

    def test_table_sync_full_order_by(self):
        self.table_sync_full(if_exists="drop", order_by="data")

        destination_table = self.destination_db.table(self.destination_table)
        rows = destination_table.get_rows()

        # Check that the rows were inserted in the expected order
        assert rows[0]["pk"] == "010"
        assert rows[1]["pk"] == "012"
        assert rows[2]["pk"] == "028"

    def test_table_sync_full_truncate(self):
        self.table_sync_full(if_exists="truncate")
        self.assert_matching_tables()

    def test_table_sync_full_empty_table(self):
        # Empty the source table
        self.source_db.table(self.source_table).truncate()

        # Attempt to sync. The test only requires that this does not raise.
        self.table_sync_full(if_exists="drop", verify_row_count=False)

    def test_table_sync_full_chunk(self):
        # Test chunking in full sync. Chunk sizes are DBSync constructor options, so
        # rebuild the sync object instead of setting an ad-hoc attribute on it.
        self.set_up_db_sync(read_chunk_size=10, write_chunk_size=10)
        self.table_sync_full(if_exists="drop")
        self.assert_matching_tables()

    def test_table_sync_incremental(self):
        # Test that an incremental sync picks up rows appended to the source table
        # after the destination was first loaded.
        self.destination_db.copy(self.table1, self.destination_table)
        self.source_db.copy(self.table2, self.source_table, if_exists="append")
        self.db_sync.table_sync_incremental(self.source_table, self.destination_table, "pk")
        self.assert_matching_tables()

    def test_table_sync_incremental_chunk(self):
        # Test chunking of incremental sync. Chunk sizes are DBSync constructor options,
        # so rebuild the sync object instead of setting an ad-hoc attribute on it.
        self.set_up_db_sync(read_chunk_size=10, write_chunk_size=10)
        self.destination_db.copy(self.table1, self.destination_table)
        self.source_db.copy(self.table2, self.source_table, if_exists="append")
        self.db_sync.table_sync_incremental(self.source_table, self.destination_table, "pk")
        self.assert_matching_tables()

    def test_table_sync_incremental_create_destination_table(self):
        # Test that an incremental sync works if the destination table does not exist.
        self.db_sync.table_sync_incremental(self.source_table, self.destination_table, "pk")
        self.assert_matching_tables()

    def test_table_sync_incremental_empty_table(self):
        # Test an incremental sync of a table when the source table is empty.
        # Empty the source table
        self.source_db.table(self.source_table).truncate()

        # Attempt to sync. The test only requires that this does not raise.
        self.db_sync.table_sync_incremental(
            self.source_table, self.destination_table, "pk", verify_row_count=False
        )
class TestFakeDBSync(TestDBSync):
    """Run the shared DBSync suite against FakeDatabase.

    Adds retry and chunking tests that use the fake's ``setup_table(..., failures=N)``
    hook and its recorded ``copy_call_args``.
    """

    db = FakeDatabase

    def test_table_sync_full_with_retry(self):
        # Have the copy fail twice. The failures are registered under the table the
        # sync actually writes to (self.destination_table), matching the no-retry
        # test below, so that the retry path is really exercised.
        self.destination_db.setup_table(self.destination_table, Table(), failures=2)

        # Allow two retries so the copy can still succeed after the two failures.
        self.set_up_db_sync(retries=2)
        self.table_sync_full(if_exists="drop")

        self.assert_matching_tables()

    def test_table_sync_full_without_retry(self):
        # Have the copy fail
        self.destination_db.setup_table(self.destination_table, Table(), failures=1)

        # Make sure the sync results in an exception
        with pytest.raises(ValueError, match="Canned error"):
            self.table_sync_full(if_exists="drop")

    def test_table_sync_full_read_chunk(self):
        # NOTE(review): this test runs with the DBSync built in setUp (no explicit
        # chunk sizes); the "read chunks of 2" description below looks inherited
        # from an earlier version of the test -- confirm.
        self.table_sync_full(if_exists="drop")
        self.assert_matching_tables()

        # Make sure copy was called the expected number of times
        # read chunks of 2, 5 rows to write.. should be 3 copy calls
        # NOTE(review): the assertion measures the length of the first recorded entry
        # rather than the number of entries -- confirm against FakeDatabase.
        recorded = self.destination_db.copy_call_args[0]
        assert len(recorded) == 3, recorded

    def test_table_sync_full_write_chunk(self):
        self.set_up_db_sync(
            read_chunk_size=1,
            write_chunk_size=3,
        )
        self.table_sync_full(if_exists="drop")
        self.assert_matching_tables()

        # Make sure copy was called the expected number of times
        # NOTE(review): the assertion measures the length of the first recorded entry
        # rather than the number of entries -- confirm against FakeDatabase.
        recorded = self.destination_db.copy_call_args[0]
        assert len(recorded) == 3, recorded
class TestSqliteDBSync(TestDBSync):
    """Run the shared DBSync suite against two file-backed Sqlite databases."""

    db = Sqlite
    # No schema prefix: the tests use unqualified table names for this backend.
    temp_schema = None

    def initialize_db_connections(self) -> None:
        """Point the source and destination connectors at fresh temporary files."""
        self.source_db = self.db(self._make_temp_db_path())
        self.destination_db = self.db(self._make_temp_db_path())

    def _make_temp_db_path(self) -> str:
        """Create an empty temp file, schedule its removal, and return its path.

        Unlike ``tempfile.mkstemp()[1]``, this does not leave an open OS-level file
        descriptor behind, and the file is deleted once the test finishes.
        """
        # delete=False keeps the file on disk after the handle is closed, so the
        # connector can open it by path.
        with tempfile.NamedTemporaryFile(delete=False) as handle:
            db_path = handle.name
        # Cleanups run after tearDown, even when the test itself fails.
        self.addCleanup(Path(db_path).unlink, missing_ok=True)
        return db_path
# Live tests: these run against a real Postgres database. To enable them, set the
# environment variable LIVE_TEST='TRUE'.
@mark_live_test
class TestPostgresDBSync(TestDBSync):
    """Run the shared DBSync suite against Postgres inside a throwaway schema."""

    db = Postgres

    # Drop the temp schema once each test is done.
    teardown_sql = f"\nDROP SCHEMA IF EXISTS {TEMP_SCHEMA} CASCADE;\n"

    # Start every test from an empty temp schema, dropping any leftover one first.
    setup_sql = (
        f"\nDROP SCHEMA IF EXISTS {TEMP_SCHEMA} CASCADE;"
        f"\nCREATE SCHEMA {TEMP_SCHEMA};\n"
    )
# Live tests: these run against a real Redshift database. To enable them, set the
# environment variable LIVE_TEST='TRUE'.
@mark_live_test
class TestRedshiftDBSync(TestPostgresDBSync):
    """Run the shared DBSync suite against Redshift.

    The schema setup and teardown SQL is inherited from TestPostgresDBSync.
    """

    db = Redshift