Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions parsons/databases/database_connector.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from abc import ABC, abstractmethod
from typing import Optional
from typing import Optional, Union
from parsons.etl.table import Table


Expand Down Expand Up @@ -151,7 +151,7 @@ def copy(self, tbl: Table, table_name: str, if_exists: str):
pass

@abstractmethod
def query(self, sql: str, parameters: Optional[list] = None) -> Optional[Table]:
def query(self, sql: str, parameters: Optional[Union[list, dict]] = None) -> Optional[Table]:
"""Execute a query against the database. Will return ``None`` if the query returns empty.

To include python variables in your query, it is recommended to pass them as parameters,
Expand Down
8 changes: 7 additions & 1 deletion parsons/databases/db_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,13 @@ def table_sync_incremental(
"Destination tables %s does not exist, running a full sync",
destination_table,
)
self.table_sync_full(source_table, destination_table, order_by=primary_key, **kwargs)
self.table_sync_full(
source_table,
destination_table,
order_by=primary_key,
verify_row_count=verify_row_count,
**kwargs,
)
return

# Check that the source table primary key is distinct
Expand Down
22 changes: 19 additions & 3 deletions parsons/databases/postgres/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,22 @@ class Postgres(PostgresCore, Alchemy, DatabaseConnector):
Seconds to timeout if connection not established.
"""

def __init__(self, username=None, password=None, host=None, db=None, port=5432, timeout=10):
def __init__(
self,
username: Optional[str] = None,
password: Optional[str] = None,
host: Optional[str] = None,
db: Optional[str] = None,
port: Optional[int] = None,
timeout: int = 10,
) -> None:
super().__init__()

self.username = username or os.environ.get("PGUSER")
self.password = password or os.environ.get("PGPASSWORD")
self.host = host or os.environ.get("PGHOST")
self.db = db or os.environ.get("PGDATABASE")
self.port = port or os.environ.get("PGPORT")
self.port = port or os.environ.get("PGPORT") or 5432

# Check if there is a pgpass file. Psycopg2 will search for this file first when
# creating a connection.
Expand Down Expand Up @@ -88,7 +96,15 @@ def copy(
self.query_with_connection(sql, connection, commit=False)
logger.info(f"{table_name} created.")

sql = f"""COPY "{table_name}" ("{'","'.join(tbl.columns)}") FROM STDIN CSV HEADER;"""
# appropriately quote table name, in case it has a schema
if "." in table_name:
quoted_table_name = '"' + '"."'.join(table_name.split(".")) + '"'
else:
quoted_table_name = '"' + table_name + '"'

sql = (
f"""COPY {quoted_table_name} ("{'","'.join(tbl.columns)}") FROM STDIN CSV HEADER;"""
)

with self.cursor(connection) as cursor:
cursor.copy_expert(sql, open(tbl.to_csv(), "r"))
Expand Down
3 changes: 2 additions & 1 deletion parsons/databases/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ def get_rows(self, offset=0, chunk_size=None, order_by=None):
if chunk_size:
sql += f" LIMIT {chunk_size}"

sql += f" OFFSET {offset}"
if offset:
sql += f" OFFSET {offset}"

return self.db.query(sql)

Expand Down
3 changes: 2 additions & 1 deletion requirements-dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ pytest-mock==3.14.0
pytest==8.3.3
requests-mock==1.12.1
ruff==0.6.9
testfixtures==8.3.0
testfixtures==8.3.0
testcontainers==4.9.0
19 changes: 19 additions & 0 deletions test/test_databases/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import pytest
import os
from testcontainers.postgres import PostgresContainer


@pytest.fixture(scope="class")
def postgres_container():
    """Run a throwaway Postgres container for the lifetime of a test class.

    While the container is up, the ``PG*`` environment variables point at it so that
    ``parsons.Postgres(port=None)`` connects to this container automatically. Whatever
    those variables held beforehand is put back when the fixture is torn down.

    Yields:
        None. Tests rely on the environment variables, not on a fixture value.
    """
    pg_vars = ("PGUSER", "PGPASSWORD", "PGHOST", "PGDATABASE", "PGPORT")

    with PostgresContainer("postgres:9.5") as postgres:
        # Snapshot the current values so the container's settings do not leak into
        # tests that run after this fixture has been torn down.
        previous = {name: os.environ.get(name) for name in pg_vars}
        try:
            os.environ["PGUSER"] = "test"
            os.environ["PGPASSWORD"] = "test"
            os.environ["PGHOST"] = "localhost"
            os.environ["PGDATABASE"] = "test"
            # os.environ only accepts str values; coerce in case the mapped port
            # comes back as an int.
            os.environ["PGPORT"] = str(postgres.get_exposed_port(5432))
            yield
        finally:
            for name, value in previous.items():
                if value is None:
                    os.environ.pop(name, None)
                else:
                    os.environ[name] = value
20 changes: 18 additions & 2 deletions test/test_databases/fakes.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,22 @@
from parsons.etl.table import Table
import logging
from parsons.databases.database_connector import DatabaseConnector
from typing import Union, Optional

logger = logging.getLogger(__name__)


class FakeDatabase:
class FakeDatabase(DatabaseConnector):
def __init__(self):
self.table_map = {}
self.copy_call_args = []

def query(self, sql: str, parameters: Optional[Union[list, dict]] = None) -> Table:
    """Fake query: ignores ``sql`` and ``parameters`` and returns a new empty ``Table``."""
    empty_result = Table()
    return empty_result

def table_exists(self, table_name: str) -> bool:
    """Report whether ``table_name`` is a key of ``self.table_map``."""
    registered = self.table_map
    return table_name in registered

def setup_table(self, table_name, data, failures=0):
self.table_map[table_name] = {
"failures": failures,
Expand Down Expand Up @@ -91,12 +99,20 @@ def exists(self):
return self.data is not None

def get_rows(self, offset=0, chunk_size=None, order_by=None):
    """Return a window of this fake table's rows.

    Args:
        offset: Index of the first row to include.
        chunk_size: Maximum number of rows to include. When falsy (``None`` or ``0``),
            every row from ``offset`` onward is included.
        order_by: Column to sort by before slicing; no sorting is done when falsy.

    Returns:
        ``self.data`` unchanged when it is falsy, otherwise a ``Table`` built from the
        selected rows.
    """
    if not self.data:
        return self.data

    # NOTE(review): cut() over every column presumably yields a separate table, so the
    # sort below does not reorder self.data -- confirm against Table.cut.
    data = self.data.cut(*self.data.columns)

    if order_by:
        data.sort(order_by)

    # Only compute an end index when a chunk size was given; adding None to the offset
    # would raise TypeError.
    if chunk_size:
        subset = data[offset : chunk_size + offset]
    else:
        subset = data[offset:]

    return Table(subset)

def get_new_rows_count(self, primary_key_col, start_value=None):
data = self.data.select_rows(lambda row: row[primary_key_col] > start_value)
Expand Down
198 changes: 99 additions & 99 deletions test/test_databases/test_data/sample_table_1.csv
Original file line number Diff line number Diff line change
@@ -1,101 +1,101 @@
pk,data
1,0.679486656
2,0.848844061
3,0.598265158
4,0.735555144
5,0.075005578
6,0.981531357
7,0.385244935
8,0.697549388
9,0.2448082
10,0.001719608
11,0.088980714
12,0.004997709
13,0.138848063
14,0.782436804
15,0.834330693
16,0.661956764
17,0.484198472
18,0.120525256
19,0.489294504
20,0.363148191
21,0.366096106
22,0.727252033
23,0.757665108
24,0.475220701
25,0.829202054
26,0.650770171
27,0.604401885
28,0.005841475
29,0.204282967
30,0.924787585
31,0.587613061
32,0.016397599
33,0.138880218
34,0.425946177
35,0.100646395
36,0.456084315
37,0.327708803
38,0.468574829
39,0.863829174
40,0.605790821
41,0.220630181
42,0.28198559
43,0.091765302
44,0.46970548
45,0.293797116
46,0.737265057
47,0.248236546
48,0.507833394
49,0.143234494
50,0.702242695
51,0.9176936
52,0.060715068
53,0.995546862
54,0.864823893
55,0.796310749
56,0.426268862
57,0.181820151
58,0.987232576
59,0.775017236
60,0.040703773
61,0.24926236
62,0.462959934
63,0.087065088
64,0.478892271
65,0.940646558
66,0.184419399
67,0.486187147
68,0.791061583
69,0.250508352
70,0.281071437
71,0.986594243
72,0.056997511
73,0.40651719
74,0.342951771
75,0.326230227
76,0.215776004
77,0.348375215
78,0.288171773
79,0.473194662
80,0.618250648
81,0.401636822
82,0.083181865
83,0.256641255
84,0.273200035
85,0.761393426
86,0.953584614
87,0.88312547
88,0.576319503
89,0.413691538
90,0.419546137
91,0.676809949
92,0.147364027
93,0.825263695
94,0.137755125
95,0.456635225
96,0.845755931
97,0.354521767
98,0.691127801
99,0.91811167
001,0.679486656
002,0.848844061
003,0.598265158
004,0.735555144
005,0.075005578
006,0.981531357
007,0.385244935
008,0.697549388
009,0.2448082
010,0.001719608
011,0.088980714
012,0.004997709
013,0.138848063
014,0.782436804
015,0.834330693
016,0.661956764
017,0.484198472
018,0.120525256
019,0.489294504
020,0.363148191
021,0.366096106
022,0.727252033
023,0.757665108
024,0.475220701
025,0.829202054
026,0.650770171
027,0.604401885
028,0.005841475
029,0.204282967
030,0.924787585
031,0.587613061
032,0.016397599
033,0.138880218
034,0.425946177
035,0.100646395
036,0.456084315
037,0.327708803
038,0.468574829
039,0.863829174
040,0.605790821
041,0.220630181
042,0.28198559
043,0.091765302
044,0.46970548
045,0.293797116
046,0.737265057
047,0.248236546
048,0.507833394
049,0.143234494
050,0.702242695
051,0.9176936
052,0.060715068
053,0.995546862
054,0.864823893
055,0.796310749
056,0.426268862
057,0.181820151
058,0.987232576
059,0.775017236
060,0.040703773
061,0.24926236
062,0.462959934
063,0.087065088
064,0.478892271
065,0.940646558
066,0.184419399
067,0.486187147
068,0.791061583
069,0.250508352
070,0.281071437
071,0.986594243
072,0.056997511
073,0.40651719
074,0.342951771
075,0.326230227
076,0.215776004
077,0.348375215
078,0.288171773
079,0.473194662
080,0.618250648
081,0.401636822
082,0.083181865
083,0.256641255
084,0.273200035
085,0.761393426
086,0.953584614
087,0.88312547
088,0.576319503
089,0.413691538
090,0.419546137
091,0.676809949
092,0.147364027
093,0.825263695
094,0.137755125
095,0.456635225
096,0.845755931
097,0.354521767
098,0.691127801
099,0.91811167
100,0.58538417
Loading
Loading