Skip to content

Commit 892c9d2

Browse files
committed
Reset later
1 parent 3dd13bc commit 892c9d2

File tree

7 files changed

+438
-24
lines changed

7 files changed

+438
-24
lines changed

README.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,14 @@ The following types of tables aren't currently supported:
125125
- Tables with deferrable unique constraints.
126126
- Referring foreign keys on a different schema than the original table.
127127

128+
### Limitations when partitioning:
129+
130+
- The CHANGE_LOG strategy must be used when doing partitioning.
131+
- Unique constraints are skipped, as they are not supported on partitioned
132+
tables. The user should add them manually after the process is complete or
133+
after the setup step.
134+
- Referring foreign keys are not supported yet.
135+
128136
## Required user permissions (or privileges)
129137

130138
Unless the user is a superuser, they may lack certain privileges to run

src/psycopack/_commands.py

Lines changed: 235 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from textwrap import dedent
55
from typing import Iterator
66

7-
from . import _cur, _introspect
7+
from . import _cur, _introspect, _partition
88
from . import _psycopg as psycopg
99

1010

@@ -16,11 +16,13 @@ def __init__(
1616
cur: _cur.Cursor,
1717
introspector: _introspect.Introspector,
1818
schema: str,
19+
partition_config: _partition.PartitionConfig | None = None,
1920
) -> None:
2021
self.conn = conn
2122
self.cur = cur
2223
self.introspector = introspector
2324
self.schema = schema
25+
self.partition_config = partition_config
2426

2527
def drop_constraint(self, *, table: str, constraint: str) -> None:
2628
self.cur.execute(
@@ -49,6 +51,17 @@ def drop_table_if_exists(self, *, table: str) -> None:
4951
)
5052

5153
def create_copy_table(self, *, base_table: str, copy_table: str) -> None:
    """
    Create the copy table for ``base_table``.

    Dispatches to the partitioned variant when a partition config is set on
    this instance, and to the non-partitioned variant otherwise.
    """
    if not self.partition_config:
        # Default behaviour: a plain, non-partitioned copy.
        self._create_non_partitioned_table(
            base_table=base_table, copy_table=copy_table
        )
        return

    self._create_partitioned_table(base_table=base_table, copy_table=copy_table)
61+
62+
def _create_non_partitioned_table(
63+
self, *, base_table: str, copy_table: str
64+
) -> None:
5265
self.cur.execute(
5366
psycopg.sql.SQL(
5467
dedent("""
@@ -64,6 +77,185 @@ def create_copy_table(self, *, base_table: str, copy_table: str) -> None:
6477
.as_string(self.conn)
6578
)
6679

80+
def _create_partitioned_table(self, *, base_table: str, copy_table: str) -> None:
    """
    Create ``copy_table`` as a range-partitioned table shaped like
    ``base_table`` (LIKE ... INCLUDING DEFAULTS), then create its partitions.

    The partition key is the column named in the partition config.
    """
    config = self.partition_config
    assert config is not None
    assert isinstance(config.strategy, _partition.DateRangeStrategy)

    # Parent table: holds no rows itself, only the partitioning definition.
    create_parent = psycopg.sql.SQL(
        dedent("""
            CREATE TABLE {schema}.{copy_table}
            (LIKE {schema}.{table} INCLUDING DEFAULTS)
            PARTITION BY RANGE ({partition_column});
            """)
    ).format(
        table=psycopg.sql.Identifier(base_table),
        copy_table=psycopg.sql.Identifier(copy_table),
        schema=psycopg.sql.Identifier(self.schema),
        partition_column=psycopg.sql.Identifier(config.column),
    )
    self.cur.execute(create_parent.as_string(self.conn))

    # Child partitions are created up front, right after the parent.
    self._create_partitions(base_table=base_table, copy_table=copy_table)
104+
105+
def _create_partitions(self, *, base_table: str, copy_table: str) -> None:
    """
    Create consecutive date-range partitions for ``copy_table``.

    The range is derived from the minimum and maximum values of the
    partition column in ``base_table``: it starts at the aligned first
    partition start date and stops once the next partition start reaches
    the computed last partition end date.
    """
    config = self.partition_config
    assert config is not None
    strategy = config.strategy
    assert isinstance(strategy, _partition.DateRangeStrategy)

    extra_ahead = config.num_of_extra_partitions_ahead

    lowest = self.introspector.get_min_partition_date_value(
        table=base_table, column=config.column
    )
    highest = self.introspector.get_max_partition_date_value(
        table=base_table, column=config.column
    )
    lower_bound = self._get_first_partition_start_date(
        min_value=lowest, strategy=strategy
    )
    stop_at = self._get_last_partition_end_date(
        max_value=highest,
        strategy=strategy,
        num_of_extra_partitions=extra_ahead,
    )

    # Each iteration creates one partition and advances to where it ended,
    # so partitions are contiguous.
    while lower_bound < stop_at:
        suffix = self._get_partition_suffix(
            current_partition_start=lower_bound, strategy=strategy
        )
        upper_bound = self._get_partition_end_boundary(
            current_partition_start=lower_bound, strategy=strategy
        )
        self._create_datetime_partition(
            base_table=base_table,
            copy_table=copy_table,
            partition_suffix=suffix,
            start=lower_bound,
            end=upper_bound,
        )
        lower_bound = upper_bound
146+
147+
def _get_first_partition_start_date(
    self, *, min_value: datetime.date, strategy: _partition.DateRangeStrategy
) -> datetime.date:
    """
    Return the start date of the first partition.

    For DAY: ``min_value`` is returned unchanged.
    For MONTH: the first day of ``min_value``'s month is returned.

    Raises ValueError for any other ``partition_by`` value.
    """
    interval = strategy.partition_by

    if interval == _partition.PartitionInterval.MONTH:
        # Snap back to the start of the month.
        return min_value.replace(day=1)

    if interval == _partition.PartitionInterval.DAY:
        return min_value

    raise ValueError(f"Unsupported partition_by: {strategy.partition_by}")
162+
163+
def _get_last_partition_end_date(
    self,
    *,
    max_value: datetime.date,
    strategy: _partition.DateRangeStrategy,
    num_of_extra_partitions: int,
) -> datetime.date:
    """
    Return the date at which partition creation should stop.

    For DAY: ``max_value`` plus ``num_of_extra_partitions`` days.
    For MONTH: the first day of ``max_value``'s month, moved forward by
    ``num_of_extra_partitions`` months.

    Raises ValueError for any other ``partition_by`` value.
    """
    # NOTE(review): with num_of_extra_partitions == 0 the result is the start
    # of the interval that contains max_value. If callers treat the result as
    # an exclusive upper bound, that interval gets no partition - confirm the
    # config never allows 0 here.
    interval = strategy.partition_by

    if interval == _partition.PartitionInterval.DAY:
        return max_value + datetime.timedelta(days=num_of_extra_partitions)

    if interval == _partition.PartitionInterval.MONTH:
        # timedelta has no "months" argument, so step one month at a time:
        # from the first of a month, adding 32 days always lands in the
        # following month, and replace(day=1) snaps back to its first day.
        month_start = max_value.replace(day=1)
        for _ in range(num_of_extra_partitions):
            month_start = (month_start + datetime.timedelta(days=32)).replace(
                day=1
            )
        return month_start

    raise ValueError(f"Unsupported partition_by: {strategy.partition_by}")
187+
188+
def _get_partition_end_boundary(
    self,
    *,
    current_partition_start: datetime.date,
    strategy: _partition.DateRangeStrategy,
) -> datetime.date:
    """
    Return the end boundary of the partition starting at
    ``current_partition_start``.

    For DAY: the following day.
    For MONTH: ``current_partition_start`` plus 32 days, snapped to day 1.

    Raises ValueError for any other ``partition_by`` value.
    """
    interval = strategy.partition_by

    if interval == _partition.PartitionInterval.DAY:
        one_day = datetime.timedelta(days=1)
        return current_partition_start + one_day

    if interval == _partition.PartitionInterval.MONTH:
        # NOTE(review): this yields the first day of the *next* month only
        # when the start is early enough in its month (e.g. day 1) -
        # presumably starts are always month-aligned; confirm against caller.
        overshoot = current_partition_start + datetime.timedelta(days=32)
        return overshoot.replace(day=1)

    raise ValueError(f"Unsupported partition_by: {strategy.partition_by}")
208+
209+
def _get_partition_suffix(
    self,
    *,
    current_partition_start: datetime.date,
    strategy: _partition.DateRangeStrategy,
) -> str:
    """
    Build the date-based suffix for a partition name.

    For DAY: ``p`` followed by YYYYMMDD, e.g. p20250101.
    For MONTH: ``p`` followed by YYYYMM, e.g. p202501.

    Raises ValueError for any other ``partition_by`` value.
    """
    interval = strategy.partition_by

    if interval == _partition.PartitionInterval.DAY:
        date_format = "%Y%m%d"
    elif interval == _partition.PartitionInterval.MONTH:
        date_format = "%Y%m"
    else:
        raise ValueError(f"Unsupported partition_by: {strategy.partition_by}")

    return "p" + current_partition_start.strftime(date_format)
228+
229+
def _create_datetime_partition(
    self,
    *,
    base_table: str,
    copy_table: str,
    partition_suffix: str,
    start: datetime.date,
    end: datetime.date,
) -> None:
    """
    Create one partition of ``copy_table`` FOR VALUES FROM ``start`` TO ``end``.

    The partition is named ``{base_table}_{partition_suffix}`` and is created
    in this instance's schema.
    """
    partition_name = f"{base_table}_{partition_suffix}"

    statement = psycopg.sql.SQL(
        dedent("""
            CREATE TABLE {schema}.{partition_name}
            PARTITION OF {schema}.{copy_table}
            FOR VALUES FROM ({start}) TO ({end});
            """)
    ).format(
        schema=psycopg.sql.Identifier(self.schema),
        partition_name=psycopg.sql.Identifier(partition_name),
        copy_table=psycopg.sql.Identifier(copy_table),
        # Bounds are rendered as SQL literals rather than identifiers.
        start=psycopg.sql.Literal(start),
        end=psycopg.sql.Literal(end),
    )
    self.cur.execute(statement.as_string(self.conn))
258+
67259
def drop_sequence_if_exists(self, *, seq: str) -> None:
68260
self.cur.execute(
69261
psycopg.sql.SQL("DROP SEQUENCE IF EXISTS {schema}.{seq};")
@@ -109,17 +301,37 @@ def set_table_id_seq(self, *, table: str, seq: str, pk_column: str) -> None:
109301
)
110302

111303
def add_pk(self, *, table: str, pk_column: str) -> None:
    """
    Add a primary key to ``table``.

    Without a partition config the key is ``(pk_column)``. With one, the
    partition column is appended, because a primary key on a partitioned
    table must include all partitioning columns. If the partition column is
    ``pk_column`` itself it is listed only once, since naming the same column
    twice in a primary key is rejected by the database.
    """
    column_names = [pk_column]
    if self.partition_config and self.partition_config.column != pk_column:
        column_names.append(self.partition_config.column)

    # Joining a single identifier renders the same SQL as the plain
    # one-column form, so both cases share one statement.
    pk_columns = psycopg.sql.SQL(", ").join(
        [psycopg.sql.Identifier(name) for name in column_names]
    )
    self.cur.execute(
        psycopg.sql.SQL(
            "ALTER TABLE {schema}.{table} ADD PRIMARY KEY ({pk_columns});"
        )
        .format(
            table=psycopg.sql.Identifier(table),
            pk_columns=pk_columns,
            schema=psycopg.sql.Identifier(self.schema),
        )
        .as_string(self.conn)
    )
123335

124336
def create_copy_function(
125337
self,
@@ -511,12 +723,24 @@ def create_unique_constraint_using_idx(
511723
def create_not_valid_constraint_from_def(
512724
self, *, table: str, constraint: str, definition: str, is_validated: bool
513725
) -> None:
726+
# For partitioned tables, we can't use NOT VALID on foreign keys
727+
# So we need to remove it from the definition
728+
is_fk = "FOREIGN KEY" in definition.upper()
729+
if self.partition_config and is_fk and not is_validated:
730+
# Remove NOT VALID from the definition for partitioned tables
731+
definition = definition.replace(" NOT VALID", "").replace("NOT VALID", "")
732+
514733
add_constraint_sql = dedent("""
515734
ALTER TABLE {schema}.{table}
516735
ADD CONSTRAINT {constraint}
517736
{definition}
518737
""")
519-
if is_validated:
738+
# Only add NOT VALID if:
739+
# 1. The constraint is validated (so we make it NOT VALID temporarily)
740+
# 2. AND it's not a FK on a partitioned table (which doesn't support NOT VALID)
741+
should_add_not_valid = is_validated and not (self.partition_config and is_fk)
742+
743+
if should_add_not_valid:
520744
# If the definition is for a valid constraint, alter it to be not
521745
# valid manually so that it can be created ONLINE.
522746
add_constraint_sql += " NOT VALID"

src/psycopack/_introspect.py

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import dataclasses
2+
import datetime
23
from textwrap import dedent
34

45
from . import _const, _cur
@@ -539,7 +540,9 @@ def get_primary_key_info(self, *, table: str) -> PrimaryKey | None:
539540
def get_pk_sequence_name(self, *, table: str) -> str:
540541
pk_info = self.get_primary_key_info(table=table)
541542
assert pk_info is not None
542-
assert len(pk_info.columns) == 1
543+
# Return empty string for composite PKs (e.g., partitioned tables)
544+
if len(pk_info.columns) != 1:
545+
return ""
543546

544547
if pk_info.identity_type:
545548
self.cur.execute(
@@ -761,3 +764,55 @@ def is_table_owner(self, *, table: str, schema: str) -> bool:
761764
result = self.cur.fetchone()
762765
assert result is not None
763766
return bool(result[0])
767+
768+
def get_current_date(self) -> datetime.date:
    """
    Return the result of ``SELECT CURRENT_DATE`` run through this cursor.
    """
    self.cur.execute("SELECT CURRENT_DATE;")
    row = self.cur.fetchone()
    assert row is not None
    today = row[0]
    assert isinstance(today, datetime.date)
    return today
775+
776+
def get_min_partition_date_value(self, *, table: str, column: str) -> datetime.date:
    """
    Return the smallest value of ``column`` in ``table``, cast to a date.

    When MIN() yields NULL (no rows, or no non-NULL values), the query falls
    back to CURRENT_DATE.
    """
    query = psycopg.sql.SQL(
        "SELECT COALESCE(MIN({column})::DATE, CURRENT_DATE) FROM {schema}.{table};"
    ).format(
        column=psycopg.sql.Identifier(column),
        schema=psycopg.sql.Identifier(self.schema),
        table=psycopg.sql.Identifier(table),
    )
    self.cur.execute(query.as_string(self.conn))

    row = self.cur.fetchone()
    assert row is not None
    lowest = row[0]
    assert isinstance(lowest, datetime.date)
    return lowest
797+
798+
def get_max_partition_date_value(self, *, table: str, column: str) -> datetime.date:
    """
    Return the largest value of ``column`` in ``table``, cast to a date.

    When MAX() yields NULL (no rows, or no non-NULL values), the query falls
    back to CURRENT_DATE.
    """
    query = psycopg.sql.SQL(
        "SELECT COALESCE(MAX({column})::DATE, CURRENT_DATE) FROM {schema}.{table};"
    ).format(
        column=psycopg.sql.Identifier(column),
        schema=psycopg.sql.Identifier(self.schema),
        table=psycopg.sql.Identifier(table),
    )
    self.cur.execute(query.as_string(self.conn))

    row = self.cur.fetchone()
    assert row is not None
    highest = row[0]
    assert isinstance(highest, datetime.date)
    return highest

0 commit comments

Comments
 (0)