44from textwrap import dedent
55from typing import Iterator
66
7- from . import _cur , _introspect
7+ from . import _cur , _introspect , _partition
88from . import _psycopg as psycopg
99
1010
@@ -16,11 +16,13 @@ def __init__(
1616 cur : _cur .Cursor ,
1717 introspector : _introspect .Introspector ,
1818 schema : str ,
19+ partition_config : _partition .PartitionConfig | None = None ,
1920 ) -> None :
2021 self .conn = conn
2122 self .cur = cur
2223 self .introspector = introspector
2324 self .schema = schema
25+ self .partition_config = partition_config
2426
2527 def drop_constraint (self , * , table : str , constraint : str ) -> None :
2628 self .cur .execute (
@@ -49,6 +51,17 @@ def drop_table_if_exists(self, *, table: str) -> None:
4951 )
5052
5153 def create_copy_table (self , * , base_table : str , copy_table : str ) -> None :
54+ if self .partition_config :
55+ return self ._create_partitioned_table (
56+ base_table = base_table , copy_table = copy_table
57+ )
58+
59+ # Create a non-partitioned table (default behavior)
60+ self ._create_non_partitioned_table (base_table = base_table , copy_table = copy_table )
61+
62+ def _create_non_partitioned_table (
63+ self , * , base_table : str , copy_table : str
64+ ) -> None :
5265 self .cur .execute (
5366 psycopg .sql .SQL (
5467 dedent ("""
@@ -64,6 +77,189 @@ def create_copy_table(self, *, base_table: str, copy_table: str) -> None:
6477 .as_string (self .conn )
6578 )
6679
80+ def _create_partitioned_table (self , * , base_table : str , copy_table : str ) -> None :
81+ assert self .partition_config is not None
82+ assert isinstance (self .partition_config .strategy , _partition .DateRangeStrategy )
83+
84+ # Create the parent partitioned table
85+ self .cur .execute (
86+ psycopg .sql .SQL (
87+ dedent ("""
88+ CREATE TABLE {schema}.{copy_table}
89+ (LIKE {schema}.{table} INCLUDING DEFAULTS)
90+ PARTITION BY RANGE ({partition_column});
91+ """ )
92+ )
93+ .format (
94+ table = psycopg .sql .Identifier (base_table ),
95+ copy_table = psycopg .sql .Identifier (copy_table ),
96+ schema = psycopg .sql .Identifier (self .schema ),
97+ partition_column = psycopg .sql .Identifier (self .partition_config .column ),
98+ )
99+ .as_string (self .conn )
100+ )
101+
102+ # Create partitions ahead of time
103+ self ._create_partitions (base_table = base_table , copy_table = copy_table )
104+
105+ def _create_partitions (self , * , base_table : str , copy_table : str ) -> None :
106+ assert self .partition_config is not None
107+ strategy = self .partition_config .strategy
108+ assert isinstance (strategy , _partition .DateRangeStrategy )
109+
110+ num_of_extra_partitions = self .partition_config .num_of_extra_partitions_ahead
111+
112+ min_value = self .introspector .get_min_partition_date_value (
113+ table = base_table , column = self .partition_config .column
114+ )
115+ max_value = self .introspector .get_max_partition_date_value (
116+ table = base_table , column = self .partition_config .column
117+ )
118+ partition_start = self ._get_first_partition_start_date (
119+ min_value = min_value , strategy = strategy
120+ )
121+ partition_end = self ._get_last_partition_end_date (
122+ max_value = max_value ,
123+ strategy = strategy ,
124+ num_of_extra_partitions = num_of_extra_partitions ,
125+ )
126+
127+ # Create partitions from partition_start to partition_end
128+ current_partition_start = partition_start
129+
130+ while current_partition_start < partition_end :
131+ partition_suffix = self ._get_partition_suffix (
132+ current_partition_start = current_partition_start , strategy = strategy
133+ )
134+
135+ current_partition_end = self ._get_partition_end_boundary (
136+ current_partition_start = current_partition_start , strategy = strategy
137+ )
138+ self ._create_datetime_partition (
139+ base_table = base_table ,
140+ copy_table = copy_table ,
141+ partition_suffix = partition_suffix ,
142+ start = current_partition_start ,
143+ end = current_partition_end ,
144+ )
145+ current_partition_start = current_partition_end
146+
147+ def _get_first_partition_start_date (
148+ self , * , min_value : datetime .date , strategy : _partition .DateRangeStrategy
149+ ) -> datetime .date :
150+ """
151+ Align the minimum value to partition boundaries.
152+ For DAY: uses the exact min_value
153+ For MONTH: aligns to the first day of the month
154+ """
155+ if strategy .partition_by == _partition .PartitionInterval .DAY :
156+ return min_value
157+ elif strategy .partition_by == _partition .PartitionInterval .MONTH :
158+ # Align to start of month
159+ return min_value .replace (day = 1 )
160+ else :
161+ raise ValueError (f"Unsupported partition_by: { strategy .partition_by } " )
162+
163+ def _get_last_partition_end_date (
164+ self ,
165+ * ,
166+ max_value : datetime .date ,
167+ strategy : _partition .DateRangeStrategy ,
168+ num_of_extra_partitions : int ,
169+ ) -> datetime .date :
170+ """
171+ Calculate the end date for partitioning.
172+ Always adds 1 interval unit to cover max_value (since range partitions are
173+ exclusive on the upper bound), plus num_of_extra_partitions.
174+ For DAY: adds 1 + num_of_extra_partitions days
175+ For MONTH: adds 1 + num_of_extra_partitions months
176+ """
177+ if strategy .partition_by == _partition .PartitionInterval .DAY :
178+ # Add 1 day to ensure max_value is covered, plus extra partitions
179+ return max_value + datetime .timedelta (days = 1 + num_of_extra_partitions )
180+ elif strategy .partition_by == _partition .PartitionInterval .MONTH :
181+ # Add 1 month to ensure max_value is covered, plus extra partitions
182+ # Add months by advancing to first of month and adding 32*months,
183+ # then normalising. This is because timedelta doesn't accept
184+ # "months" as argument.
185+ temp_date = max_value .replace (day = 1 )
186+ for _ in range (1 + num_of_extra_partitions ):
187+ temp_date = (temp_date + datetime .timedelta (days = 32 )).replace (day = 1 )
188+ return temp_date
189+ else :
190+ raise ValueError (f"Unsupported partition_by: { strategy .partition_by } " )
191+
192+ def _get_partition_end_boundary (
193+ self ,
194+ * ,
195+ current_partition_start : datetime .date ,
196+ strategy : _partition .DateRangeStrategy ,
197+ ) -> datetime .date :
198+ """
199+ Calculate the end boundary for a single partition.
200+ For DAY: adds 1 day
201+ For MONTH: advances to the first day of the next month
202+ """
203+ if strategy .partition_by == _partition .PartitionInterval .DAY :
204+ return current_partition_start + datetime .timedelta (days = 1 )
205+ elif strategy .partition_by == _partition .PartitionInterval .MONTH :
206+ # Next month boundary
207+ return (current_partition_start + datetime .timedelta (days = 32 )).replace (
208+ day = 1
209+ )
210+ else :
211+ raise ValueError (f"Unsupported partition_by: { strategy .partition_by } " )
212+
213+ def _get_partition_suffix (
214+ self ,
215+ * ,
216+ current_partition_start : datetime .date ,
217+ strategy : _partition .DateRangeStrategy ,
218+ ) -> str :
219+ """
220+ Generate a date-based partition suffix.
221+ For DAY: returns p20250101 (YYYYMMDD format)
222+ For MONTH: returns p202501 (YYYYMM format)
223+ """
224+ if strategy .partition_by == _partition .PartitionInterval .DAY :
225+ # Format: p20250101 (YYYYMMDD)
226+ return f"p{ current_partition_start .strftime ('%Y%m%d' )} "
227+ elif strategy .partition_by == _partition .PartitionInterval .MONTH :
228+ # Format: p202501 (YYYYMM)
229+ return f"p{ current_partition_start .strftime ('%Y%m' )} "
230+ else :
231+ raise ValueError (f"Unsupported partition_by: { strategy .partition_by } " )
232+
233+ def _create_datetime_partition (
234+ self ,
235+ * ,
236+ base_table : str ,
237+ copy_table : str ,
238+ partition_suffix : str ,
239+ start : datetime .date ,
240+ end : datetime .date ,
241+ ) -> None :
242+ """Create a single datetime range partition."""
243+ self .cur .execute (
244+ psycopg .sql .SQL (
245+ dedent ("""
246+ CREATE TABLE {schema}.{partition_name}
247+ PARTITION OF {schema}.{copy_table}
248+ FOR VALUES FROM ({start}) TO ({end});
249+ """ )
250+ )
251+ .format (
252+ schema = psycopg .sql .Identifier (self .schema ),
253+ partition_name = psycopg .sql .Identifier (
254+ f"{ base_table } _{ partition_suffix } "
255+ ),
256+ copy_table = psycopg .sql .Identifier (copy_table ),
257+ start = psycopg .sql .Literal (start ),
258+ end = psycopg .sql .Literal (end ),
259+ )
260+ .as_string (self .conn )
261+ )
262+
67263 def drop_sequence_if_exists (self , * , seq : str ) -> None :
68264 self .cur .execute (
69265 psycopg .sql .SQL ("DROP SEQUENCE IF EXISTS {schema}.{seq};" )
@@ -109,17 +305,37 @@ def set_table_id_seq(self, *, table: str, seq: str, pk_column: str) -> None:
109305 )
110306
111307 def add_pk (self , * , table : str , pk_column : str ) -> None :
112- self .cur .execute (
113- psycopg .sql .SQL (
114- "ALTER TABLE {schema}.{table} ADD PRIMARY KEY ({pk_column});"
308+ # For partitioned tables, the PK must include all partitioning columns
309+ if self .partition_config :
310+ pk_columns = psycopg .sql .SQL (", " ).join (
311+ [
312+ psycopg .sql .Identifier (pk_column ),
313+ psycopg .sql .Identifier (self .partition_config .column ),
314+ ]
115315 )
116- .format (
117- table = psycopg .sql .Identifier (table ),
118- pk_column = psycopg .sql .Identifier (pk_column ),
119- schema = psycopg .sql .Identifier (self .schema ),
316+ self .cur .execute (
317+ psycopg .sql .SQL (
318+ "ALTER TABLE {schema}.{table} ADD PRIMARY KEY ({pk_columns});"
319+ )
320+ .format (
321+ table = psycopg .sql .Identifier (table ),
322+ pk_columns = pk_columns ,
323+ schema = psycopg .sql .Identifier (self .schema ),
324+ )
325+ .as_string (self .conn )
326+ )
327+ else :
328+ self .cur .execute (
329+ psycopg .sql .SQL (
330+ "ALTER TABLE {schema}.{table} ADD PRIMARY KEY ({pk_column});"
331+ )
332+ .format (
333+ table = psycopg .sql .Identifier (table ),
334+ pk_column = psycopg .sql .Identifier (pk_column ),
335+ schema = psycopg .sql .Identifier (self .schema ),
336+ )
337+ .as_string (self .conn )
120338 )
121- .as_string (self .conn )
122- )
123339
124340 def create_copy_function (
125341 self ,
@@ -511,12 +727,24 @@ def create_unique_constraint_using_idx(
511727 def create_not_valid_constraint_from_def (
512728 self , * , table : str , constraint : str , definition : str , is_validated : bool
513729 ) -> None :
730+ # For partitioned tables, we can't use NOT VALID on foreign keys
731+ # So we need to remove it from the definition
732+ is_fk = "FOREIGN KEY" in definition .upper ()
733+ if self .partition_config and is_fk and not is_validated :
734+ # Remove NOT VALID from the definition for partitioned tables
735+ definition = definition .replace (" NOT VALID" , "" ).replace ("NOT VALID" , "" )
736+
514737 add_constraint_sql = dedent ("""
515738 ALTER TABLE {schema}.{table}
516739 ADD CONSTRAINT {constraint}
517740 {definition}
518741 """ )
519- if is_validated :
742+ # Only add NOT VALID if:
743+ # 1. The constraint is validated (so we make it NOT VALID temporarily)
744+ # 2. AND it's not a FK on a partitioned table (which doesn't support NOT VALID)
745+ should_add_not_valid = is_validated and not (self .partition_config and is_fk )
746+
747+ if should_add_not_valid :
520748 # If the definition is for a valid constraint, alter it to be not
521749 # valid manually so that it can be created ONLINE.
522750 add_constraint_sql += " NOT VALID"
0 commit comments