Skip to content

Commit 9e8d744

Browse files
authored
Merge pull request #56 from kraken-tech/partitioning_by_date
Partition by date
2 parents 3dd13bc + 9a51968 commit 9e8d744

File tree

8 files changed

+769
-27
lines changed

8 files changed

+769
-27
lines changed

README.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,14 @@ The following types of tables aren't currently supported:
125125
- Tables with deferrable unique constraints.
126126
- Referring foreign keys on a different schema than the original table.
127127

128+
### Limitations when partitioning:
129+
130+
- The CHANGE_LOG strategy must be used when doing partitioning.
131+
- Unique constraints are skipped, as they are not supported on partitioned
132+
tables. The user should add them manually after the process is complete or
133+
after the setup step.
134+
- No support for referring fks yet.
135+
128136
## Required user permissions (or privileges)
129137

130138
Unless the user is a superuser, they may lack certain privileges to run

src/psycopack/__init__.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from ._conn import get_db_connection
66
from ._cur import get_cursor
77
from ._introspect import BackfillBatch
8+
from ._partition import DateRangeStrategy, PartitionConfig, PartitionInterval
89
from ._registry import RegistryException, UnexpectedSyncStrategy
910
from ._repack import (
1011
BasePsycopackError,
@@ -18,6 +19,7 @@
1819
NoReferencesPrivilege,
1920
NoReferringTableOwnership,
2021
NotTableOwner,
22+
PartitioningForTableWithReferringFKs,
2123
PostBackfillBatchCallback,
2224
PrimaryKeyNotFound,
2325
Psycopack,
@@ -35,6 +37,7 @@
3537
"BackfillBatch",
3638
"BasePsycopackError",
3739
"CompositePrimaryKey",
40+
"DateRangeStrategy",
3841
"DeferrableUniqueConstraint",
3942
"FailureDueToLockTimeout",
4043
"InheritedTable",
@@ -45,11 +48,14 @@
4548
"NoReferencesPrivilege",
4649
"NoReferringTableOwnership",
4750
"NotTableOwner",
51+
"PartitionConfig",
52+
"PartitionInterval",
53+
"PartitioningForTableWithReferringFKs",
4854
"PostBackfillBatchCallback",
4955
"PrimaryKeyNotFound",
56+
"Psycopack",
5057
"ReferringForeignKeyInDifferentSchema",
5158
"RegistryException",
52-
"Psycopack",
5359
"Stage",
5460
"SyncStrategy",
5561
"TableDoesNotExist",

src/psycopack/_commands.py

Lines changed: 239 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from textwrap import dedent
55
from typing import Iterator
66

7-
from . import _cur, _introspect
7+
from . import _cur, _introspect, _partition
88
from . import _psycopg as psycopg
99

1010

@@ -16,11 +16,13 @@ def __init__(
1616
cur: _cur.Cursor,
1717
introspector: _introspect.Introspector,
1818
schema: str,
19+
partition_config: _partition.PartitionConfig | None = None,
1920
) -> None:
2021
self.conn = conn
2122
self.cur = cur
2223
self.introspector = introspector
2324
self.schema = schema
25+
self.partition_config = partition_config
2426

2527
def drop_constraint(self, *, table: str, constraint: str) -> None:
2628
self.cur.execute(
@@ -49,6 +51,17 @@ def drop_table_if_exists(self, *, table: str) -> None:
4951
)
5052

5153
def create_copy_table(self, *, base_table: str, copy_table: str) -> None:
54+
if self.partition_config:
55+
return self._create_partitioned_table(
56+
base_table=base_table, copy_table=copy_table
57+
)
58+
59+
# Create a non-partitioned table (default behavior)
60+
self._create_non_partitioned_table(base_table=base_table, copy_table=copy_table)
61+
62+
def _create_non_partitioned_table(
63+
self, *, base_table: str, copy_table: str
64+
) -> None:
5265
self.cur.execute(
5366
psycopg.sql.SQL(
5467
dedent("""
@@ -64,6 +77,189 @@ def create_copy_table(self, *, base_table: str, copy_table: str) -> None:
6477
.as_string(self.conn)
6578
)
6679

80+
def _create_partitioned_table(self, *, base_table: str, copy_table: str) -> None:
81+
assert self.partition_config is not None
82+
assert isinstance(self.partition_config.strategy, _partition.DateRangeStrategy)
83+
84+
# Create the parent partitioned table
85+
self.cur.execute(
86+
psycopg.sql.SQL(
87+
dedent("""
88+
CREATE TABLE {schema}.{copy_table}
89+
(LIKE {schema}.{table} INCLUDING DEFAULTS)
90+
PARTITION BY RANGE ({partition_column});
91+
""")
92+
)
93+
.format(
94+
table=psycopg.sql.Identifier(base_table),
95+
copy_table=psycopg.sql.Identifier(copy_table),
96+
schema=psycopg.sql.Identifier(self.schema),
97+
partition_column=psycopg.sql.Identifier(self.partition_config.column),
98+
)
99+
.as_string(self.conn)
100+
)
101+
102+
# Create partitions ahead of time
103+
self._create_partitions(base_table=base_table, copy_table=copy_table)
104+
105+
def _create_partitions(self, *, base_table: str, copy_table: str) -> None:
106+
assert self.partition_config is not None
107+
strategy = self.partition_config.strategy
108+
assert isinstance(strategy, _partition.DateRangeStrategy)
109+
110+
num_of_extra_partitions = self.partition_config.num_of_extra_partitions_ahead
111+
112+
min_value = self.introspector.get_min_partition_date_value(
113+
table=base_table, column=self.partition_config.column
114+
)
115+
max_value = self.introspector.get_max_partition_date_value(
116+
table=base_table, column=self.partition_config.column
117+
)
118+
partition_start = self._get_first_partition_start_date(
119+
min_value=min_value, strategy=strategy
120+
)
121+
partition_end = self._get_last_partition_end_date(
122+
max_value=max_value,
123+
strategy=strategy,
124+
num_of_extra_partitions=num_of_extra_partitions,
125+
)
126+
127+
# Create partitions from partition_start to partition_end
128+
current_partition_start = partition_start
129+
130+
while current_partition_start < partition_end:
131+
partition_suffix = self._get_partition_suffix(
132+
current_partition_start=current_partition_start, strategy=strategy
133+
)
134+
135+
current_partition_end = self._get_partition_end_boundary(
136+
current_partition_start=current_partition_start, strategy=strategy
137+
)
138+
self._create_datetime_partition(
139+
base_table=base_table,
140+
copy_table=copy_table,
141+
partition_suffix=partition_suffix,
142+
start=current_partition_start,
143+
end=current_partition_end,
144+
)
145+
current_partition_start = current_partition_end
146+
147+
def _get_first_partition_start_date(
148+
self, *, min_value: datetime.date, strategy: _partition.DateRangeStrategy
149+
) -> datetime.date:
150+
"""
151+
Align the minimum value to partition boundaries.
152+
For DAY: uses the exact min_value
153+
For MONTH: aligns to the first day of the month
154+
"""
155+
if strategy.partition_by == _partition.PartitionInterval.DAY:
156+
return min_value
157+
elif strategy.partition_by == _partition.PartitionInterval.MONTH:
158+
# Align to start of month
159+
return min_value.replace(day=1)
160+
else:
161+
raise ValueError(f"Unsupported partition_by: {strategy.partition_by}")
162+
163+
def _get_last_partition_end_date(
164+
self,
165+
*,
166+
max_value: datetime.date,
167+
strategy: _partition.DateRangeStrategy,
168+
num_of_extra_partitions: int,
169+
) -> datetime.date:
170+
"""
171+
Calculate the end date for partitioning.
172+
Always adds 1 interval unit to cover max_value (since range partitions are
173+
exclusive on the upper bound), plus num_of_extra_partitions.
174+
For DAY: adds 1 + num_of_extra_partitions days
175+
For MONTH: adds 1 + num_of_extra_partitions months
176+
"""
177+
if strategy.partition_by == _partition.PartitionInterval.DAY:
178+
# Add 1 day to ensure max_value is covered, plus extra partitions
179+
return max_value + datetime.timedelta(days=1 + num_of_extra_partitions)
180+
elif strategy.partition_by == _partition.PartitionInterval.MONTH:
181+
# Add 1 month to ensure max_value is covered, plus extra partitions
182+
# Add months by advancing to first of month and adding 32*months,
183+
# then normalising. This is because timedelta doesn't accept
184+
# "months" as argument.
185+
temp_date = max_value.replace(day=1)
186+
for _ in range(1 + num_of_extra_partitions):
187+
temp_date = (temp_date + datetime.timedelta(days=32)).replace(day=1)
188+
return temp_date
189+
else:
190+
raise ValueError(f"Unsupported partition_by: {strategy.partition_by}")
191+
192+
def _get_partition_end_boundary(
193+
self,
194+
*,
195+
current_partition_start: datetime.date,
196+
strategy: _partition.DateRangeStrategy,
197+
) -> datetime.date:
198+
"""
199+
Calculate the end boundary for a single partition.
200+
For DAY: adds 1 day
201+
For MONTH: advances to the first day of the next month
202+
"""
203+
if strategy.partition_by == _partition.PartitionInterval.DAY:
204+
return current_partition_start + datetime.timedelta(days=1)
205+
elif strategy.partition_by == _partition.PartitionInterval.MONTH:
206+
# Next month boundary
207+
return (current_partition_start + datetime.timedelta(days=32)).replace(
208+
day=1
209+
)
210+
else:
211+
raise ValueError(f"Unsupported partition_by: {strategy.partition_by}")
212+
213+
def _get_partition_suffix(
214+
self,
215+
*,
216+
current_partition_start: datetime.date,
217+
strategy: _partition.DateRangeStrategy,
218+
) -> str:
219+
"""
220+
Generate a date-based partition suffix.
221+
For DAY: returns p20250101 (YYYYMMDD format)
222+
For MONTH: returns p202501 (YYYYMM format)
223+
"""
224+
if strategy.partition_by == _partition.PartitionInterval.DAY:
225+
# Format: p20250101 (YYYYMMDD)
226+
return f"p{current_partition_start.strftime('%Y%m%d')}"
227+
elif strategy.partition_by == _partition.PartitionInterval.MONTH:
228+
# Format: p202501 (YYYYMM)
229+
return f"p{current_partition_start.strftime('%Y%m')}"
230+
else:
231+
raise ValueError(f"Unsupported partition_by: {strategy.partition_by}")
232+
233+
def _create_datetime_partition(
234+
self,
235+
*,
236+
base_table: str,
237+
copy_table: str,
238+
partition_suffix: str,
239+
start: datetime.date,
240+
end: datetime.date,
241+
) -> None:
242+
"""Create a single datetime range partition."""
243+
self.cur.execute(
244+
psycopg.sql.SQL(
245+
dedent("""
246+
CREATE TABLE {schema}.{partition_name}
247+
PARTITION OF {schema}.{copy_table}
248+
FOR VALUES FROM ({start}) TO ({end});
249+
""")
250+
)
251+
.format(
252+
schema=psycopg.sql.Identifier(self.schema),
253+
partition_name=psycopg.sql.Identifier(
254+
f"{base_table}_{partition_suffix}"
255+
),
256+
copy_table=psycopg.sql.Identifier(copy_table),
257+
start=psycopg.sql.Literal(start),
258+
end=psycopg.sql.Literal(end),
259+
)
260+
.as_string(self.conn)
261+
)
262+
67263
def drop_sequence_if_exists(self, *, seq: str) -> None:
68264
self.cur.execute(
69265
psycopg.sql.SQL("DROP SEQUENCE IF EXISTS {schema}.{seq};")
@@ -109,17 +305,37 @@ def set_table_id_seq(self, *, table: str, seq: str, pk_column: str) -> None:
109305
)
110306

111307
def add_pk(self, *, table: str, pk_column: str) -> None:
112-
self.cur.execute(
113-
psycopg.sql.SQL(
114-
"ALTER TABLE {schema}.{table} ADD PRIMARY KEY ({pk_column});"
308+
# For partitioned tables, the PK must include all partitioning columns
309+
if self.partition_config:
310+
pk_columns = psycopg.sql.SQL(", ").join(
311+
[
312+
psycopg.sql.Identifier(pk_column),
313+
psycopg.sql.Identifier(self.partition_config.column),
314+
]
115315
)
116-
.format(
117-
table=psycopg.sql.Identifier(table),
118-
pk_column=psycopg.sql.Identifier(pk_column),
119-
schema=psycopg.sql.Identifier(self.schema),
316+
self.cur.execute(
317+
psycopg.sql.SQL(
318+
"ALTER TABLE {schema}.{table} ADD PRIMARY KEY ({pk_columns});"
319+
)
320+
.format(
321+
table=psycopg.sql.Identifier(table),
322+
pk_columns=pk_columns,
323+
schema=psycopg.sql.Identifier(self.schema),
324+
)
325+
.as_string(self.conn)
326+
)
327+
else:
328+
self.cur.execute(
329+
psycopg.sql.SQL(
330+
"ALTER TABLE {schema}.{table} ADD PRIMARY KEY ({pk_column});"
331+
)
332+
.format(
333+
table=psycopg.sql.Identifier(table),
334+
pk_column=psycopg.sql.Identifier(pk_column),
335+
schema=psycopg.sql.Identifier(self.schema),
336+
)
337+
.as_string(self.conn)
120338
)
121-
.as_string(self.conn)
122-
)
123339

124340
def create_copy_function(
125341
self,
@@ -511,12 +727,24 @@ def create_unique_constraint_using_idx(
511727
def create_not_valid_constraint_from_def(
512728
self, *, table: str, constraint: str, definition: str, is_validated: bool
513729
) -> None:
730+
# For partitioned tables, we can't use NOT VALID on foreign keys
731+
# So we need to remove it from the definition
732+
is_fk = "FOREIGN KEY" in definition.upper()
733+
if self.partition_config and is_fk and not is_validated:
734+
# Remove NOT VALID from the definition for partitioned tables
735+
definition = definition.replace(" NOT VALID", "").replace("NOT VALID", "")
736+
514737
add_constraint_sql = dedent("""
515738
ALTER TABLE {schema}.{table}
516739
ADD CONSTRAINT {constraint}
517740
{definition}
518741
""")
519-
if is_validated:
742+
# Only add NOT VALID if:
743+
# 1. The constraint is validated (so we make it NOT VALID temporarily)
744+
# 2. AND it's not a FK on a partitioned table (which doesn't support NOT VALID)
745+
should_add_not_valid = is_validated and not (self.partition_config and is_fk)
746+
747+
if should_add_not_valid:
520748
# If the definition is for a valid constraint, alter it to be not
521749
# valid manually so that it can be created ONLINE.
522750
add_constraint_sql += " NOT VALID"

0 commit comments

Comments
 (0)