Skip to content

Commit 9584c0f

Browse files
authored
Merge pull request #11 from psteinroe/chore/increase-days-ahead
chore: create 7 partitions in advance instead of 3
2 parents 0938e37 + afab194 commit 9584c0f

3 files changed

Lines changed: 41 additions & 24 deletions

File tree

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -329,7 +329,7 @@ When the replication slot is invalidated (WAL exceeded `max_slot_wal_keep_size`)
329329
Postgres Stream automatically manages daily partitions in the background:
330330

331331
**Retention policy:**
332-
- **Creates partitions** 3 days in advance (today, tomorrow, day after)
332+
- **Creates partitions** 7 days in advance
333333
- **Drops partitions** older than 7 days
334334
- **Runs on startup** and then daily
335335

src/maintenance.rs

Lines changed: 35 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@ use crate::{
1212
store::StreamStore,
1313
};
1414

15+
const DAYS_AHEAD_TO_CREATE: u32 = 7;
16+
const RETENTION_DAYS: u32 = 7;
17+
1518
/// Domain representation of a partition with its date-based metadata.
1619
#[derive(Debug, Clone)]
1720
pub struct PartitionInfo {
@@ -117,9 +120,13 @@ pub async fn run_maintenance<S: SchemaStore>(store: &StreamStore<S>) -> EtlResul
117120
let result = async {
118121
let existing_partitions = store.load_partitions(SCHEMA_NAME, EVENTS_TABLE).await?;
119122

120-
let policy = RetentionPolicy::new(7);
121-
let (to_drop, to_create) =
122-
policy.plan_maintenance(existing_partitions, EVENTS_TABLE, 3, start);
123+
let policy = RetentionPolicy::new(RETENTION_DAYS);
124+
let (to_drop, to_create) = policy.plan_maintenance(
125+
existing_partitions,
126+
EVENTS_TABLE,
127+
DAYS_AHEAD_TO_CREATE,
128+
start,
129+
);
123130

124131
for partition in to_drop {
125132
info!("Dropping partition: {}", partition.name);
@@ -196,7 +203,7 @@ mod tests {
196203

197204
#[test]
198205
fn test_retention_policy_plan_maintenance_drop_old() {
199-
let policy = RetentionPolicy::new(7);
206+
let policy = RetentionPolicy::new(RETENTION_DAYS);
200207
let now = Utc.with_ymd_and_hms(2024, 3, 15, 12, 0, 0).unwrap();
201208

202209
// Partitions: one old (should drop), one current, one future
@@ -206,7 +213,8 @@ mod tests {
206213
PartitionInfo::for_date("events", NaiveDate::from_ymd_opt(2024, 3, 16).unwrap()), // tomorrow
207214
];
208215

209-
let (to_drop, _to_create) = policy.plan_maintenance(partitions, "events", 3, now);
216+
let (to_drop, _to_create) =
217+
policy.plan_maintenance(partitions, "events", DAYS_AHEAD_TO_CREATE, now);
210218

211219
assert_eq!(to_drop.len(), 1);
212220
assert_eq!(
@@ -217,50 +225,57 @@ mod tests {
217225

218226
#[test]
219227
fn test_retention_policy_plan_maintenance_create_future() {
220-
let policy = RetentionPolicy::new(7);
228+
let policy = RetentionPolicy::new(RETENTION_DAYS);
221229
let now = Utc.with_ymd_and_hms(2024, 3, 15, 12, 0, 0).unwrap();
222230

223-
// Only today exists, should create 2 more (tomorrow and day after)
231+
// Only today exists, should create 6 more (days 1-6 ahead)
224232
let partitions = vec![PartitionInfo::for_date(
225233
"events",
226234
NaiveDate::from_ymd_opt(2024, 3, 15).unwrap(),
227235
)];
228236

229-
let (to_drop, to_create) = policy.plan_maintenance(partitions, "events", 3, now);
237+
let (to_drop, to_create) =
238+
policy.plan_maintenance(partitions, "events", DAYS_AHEAD_TO_CREATE, now);
230239

231240
assert_eq!(to_drop.len(), 0);
232-
assert_eq!(to_create.len(), 2);
241+
assert_eq!(to_create.len(), 6);
242+
// Verify first and last created partitions
233243
assert_eq!(
234244
to_create.first().expect("first element exists").date,
235245
NaiveDate::from_ymd_opt(2024, 3, 16).unwrap()
236246
);
237247
assert_eq!(
238-
to_create.get(1).expect("second element exists").date,
239-
NaiveDate::from_ymd_opt(2024, 3, 17).unwrap()
248+
to_create.last().expect("last element exists").date,
249+
NaiveDate::from_ymd_opt(2024, 3, 21).unwrap()
240250
);
241251
}
242252

243253
#[test]
244254
fn test_retention_policy_plan_maintenance_no_changes_needed() {
245-
let policy = RetentionPolicy::new(7);
255+
let policy = RetentionPolicy::new(RETENTION_DAYS);
246256
let now = Utc.with_ymd_and_hms(2024, 3, 15, 12, 0, 0).unwrap();
247257

248-
// All partitions already exist
258+
// All partitions already exist (today + 6 days ahead)
249259
let partitions = vec![
250260
PartitionInfo::for_date("events", NaiveDate::from_ymd_opt(2024, 3, 15).unwrap()),
251261
PartitionInfo::for_date("events", NaiveDate::from_ymd_opt(2024, 3, 16).unwrap()),
252262
PartitionInfo::for_date("events", NaiveDate::from_ymd_opt(2024, 3, 17).unwrap()),
263+
PartitionInfo::for_date("events", NaiveDate::from_ymd_opt(2024, 3, 18).unwrap()),
264+
PartitionInfo::for_date("events", NaiveDate::from_ymd_opt(2024, 3, 19).unwrap()),
265+
PartitionInfo::for_date("events", NaiveDate::from_ymd_opt(2024, 3, 20).unwrap()),
266+
PartitionInfo::for_date("events", NaiveDate::from_ymd_opt(2024, 3, 21).unwrap()),
253267
];
254268

255-
let (to_drop, to_create) = policy.plan_maintenance(partitions, "events", 3, now);
269+
let (to_drop, to_create) =
270+
policy.plan_maintenance(partitions, "events", DAYS_AHEAD_TO_CREATE, now);
256271

257272
assert_eq!(to_drop.len(), 0);
258273
assert_eq!(to_create.len(), 0);
259274
}
260275

261276
#[test]
262277
fn test_retention_policy_plan_maintenance_at_retention_boundary() {
263-
let policy = RetentionPolicy::new(7);
278+
let policy = RetentionPolicy::new(RETENTION_DAYS);
264279
let now = Utc.with_ymd_and_hms(2024, 3, 15, 12, 0, 0).unwrap();
265280

266281
// Partition exactly at cutoff (7 days old) should be kept
@@ -269,6 +284,7 @@ mod tests {
269284
PartitionInfo::for_date("events", NaiveDate::from_ymd_opt(2024, 3, 7).unwrap()), // 8 days old - should drop
270285
];
271286

287+
// Using days_ahead=1 to focus on testing the retention boundary behavior
272288
let (to_drop, _to_create) = policy.plan_maintenance(partitions, "events", 1, now);
273289

274290
assert_eq!(to_drop.len(), 1);
@@ -280,17 +296,18 @@ mod tests {
280296

281297
#[test]
282298
fn test_retention_policy_combined_drop_and_create() {
283-
let policy = RetentionPolicy::new(7);
299+
let policy = RetentionPolicy::new(RETENTION_DAYS);
284300
let now = Utc.with_ymd_and_hms(2024, 3, 15, 12, 0, 0).unwrap();
285301

286302
let partitions = vec![
287303
PartitionInfo::for_date("events", NaiveDate::from_ymd_opt(2024, 3, 1).unwrap()), // old - drop
288304
PartitionInfo::for_date("events", NaiveDate::from_ymd_opt(2024, 3, 15).unwrap()), // today - keep
289305
];
290306

291-
let (to_drop, to_create) = policy.plan_maintenance(partitions, "events", 3, now);
307+
let (to_drop, to_create) =
308+
policy.plan_maintenance(partitions, "events", DAYS_AHEAD_TO_CREATE, now);
292309

293310
assert_eq!(to_drop.len(), 1);
294-
assert_eq!(to_create.len(), 2); // tomorrow and day after
311+
assert_eq!(to_create.len(), 6); // days 1-6 ahead
295312
}
296313
}

tests/maintenance_tests.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ async fn test_initial_partitions_created() {
1717
let _stream: PgStream<MemorySink, PostgresStore> =
1818
PgStream::create(config, sink, store).await.unwrap();
1919

20-
// Check that partitions exist (today + 2 days ahead = 3 partitions)
20+
// Check that partitions exist (today + 6 days ahead = 7 partitions)
2121
let count: (i64,) = sqlx::query_as(
2222
"select count(*) from pg_tables
2323
where schemaname = 'pgstream'
@@ -27,7 +27,7 @@ async fn test_initial_partitions_created() {
2727
.await
2828
.unwrap();
2929

30-
assert_eq!(count.0, 3, "Should create 3 initial partitions");
30+
assert_eq!(count.0, 7, "Should create 7 initial partitions");
3131
}
3232

3333
#[tokio::test(flavor = "multi_thread")]
@@ -77,7 +77,7 @@ async fn test_maintenance_creates_future_partitions() {
7777
.unwrap();
7878
run_maintenance(&stream_store).await.unwrap();
7979

80-
// Should now have 3 partitions again
80+
// Should now have 7 partitions again (today + 6 days ahead)
8181
let count_after: (i64,) = sqlx::query_as(
8282
"select count(*) from pg_tables
8383
where schemaname = 'pgstream'
@@ -86,7 +86,7 @@ async fn test_maintenance_creates_future_partitions() {
8686
.fetch_one(&db.pool)
8787
.await
8888
.unwrap();
89-
assert_eq!(count_after.0, 3);
89+
assert_eq!(count_after.0, 7);
9090
}
9191

9292
#[tokio::test(flavor = "multi_thread")]
@@ -213,5 +213,5 @@ async fn test_maintenance_idempotent() {
213213
.await
214214
.unwrap();
215215

216-
assert_eq!(count.0, 3, "Running maintenance twice should be idempotent");
216+
assert_eq!(count.0, 7, "Running maintenance twice should be idempotent");
217217
}

0 commit comments

Comments
 (0)