Skip to content

Commit 1a8796c

Browse files
committed
compact and optimize immediately
1 parent 4b8f472 commit 1a8796c

1 file changed

Lines changed: 41 additions & 17 deletions

File tree

src/database.rs

Lines changed: 41 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -90,21 +90,47 @@ impl Database {
9090
self.batch_queue = Some(batch_queue);
9191
self
9292
}
93-
93+
9494
/// Start background maintenance schedulers for optimize and vacuum operations
9595
pub async fn start_maintenance_schedulers(self) -> Result<Self> {
9696
use tokio_cron_scheduler::{Job, JobScheduler};
97-
97+
9898
let scheduler = JobScheduler::new().await?;
9999
let db = Arc::new(self.clone());
100-
100+
101+
// Run immediate optimize and vacuum operations at startup
102+
info!("Running immediate optimize and vacuum on startup");
103+
let startup_db = db.clone();
104+
tokio::spawn(async move {
105+
info!("Starting immediate optimize operation on all tables");
106+
107+
// Run optimize first
108+
for (project_id, (_, _, table)) in startup_db.project_configs.read().await.iter() {
109+
info!("Optimizing table for project '{}' on startup", project_id);
110+
if let Err(e) = startup_db.optimize_table(table).await {
111+
error!("Startup optimize failed for {}: {}", project_id, e);
112+
}
113+
}
114+
115+
// Then run vacuum on the optimized tables
116+
let retention_hours = env::var("TIMEFUSION_VACUUM_RETENTION_HOURS").unwrap_or_else(|_| "336".to_string()).parse::<u64>().unwrap_or(336);
117+
info!("Starting immediate vacuum operation on all tables (retention: {}h)", retention_hours);
118+
119+
for (project_id, (_, _, table)) in startup_db.project_configs.read().await.iter() {
120+
info!("Vacuuming table for project '{}' on startup", project_id);
121+
startup_db.vacuum_table(table, retention_hours).await;
122+
}
123+
124+
info!("Completed startup maintenance operations");
125+
});
126+
101127
// Optimize job - every 3 hours
102128
let optimize_job = Job::new_async("0 0 */3 * * *", {
103129
let db = db.clone();
104130
move |_, _| {
105131
let db = db.clone();
106132
Box::pin(async move {
107-
info!("Running optimize on all tables");
133+
info!("Running scheduled optimize on all tables");
108134
for (project_id, (_, _, table)) in db.project_configs.read().await.iter() {
109135
if let Err(e) = db.optimize_table(table).await {
110136
error!("Optimize failed for {}: {}", project_id, e);
@@ -113,40 +139,38 @@ impl Database {
113139
})
114140
}
115141
})?;
116-
142+
117143
scheduler.add(optimize_job).await?;
118-
144+
119145
// Vacuum job - daily at 3AM
120146
let vacuum_job = Job::new_async("0 0 3 * * *", {
121147
let db = db.clone();
122148
move |_, _| {
123149
let db = db.clone();
124150
Box::pin(async move {
125-
info!("Running vacuum on all tables");
126-
let retention_hours = env::var("TIMEFUSION_VACUUM_RETENTION_HOURS")
127-
.unwrap_or_else(|_| "336".to_string())
128-
.parse::<u64>().unwrap_or(336);
129-
151+
info!("Running scheduled vacuum on all tables");
152+
let retention_hours = env::var("TIMEFUSION_VACUUM_RETENTION_HOURS").unwrap_or_else(|_| "336".to_string()).parse::<u64>().unwrap_or(336);
153+
130154
for (project_id, (_, _, table)) in db.project_configs.read().await.iter() {
131155
info!("Vacuuming {} (retention: {}h)", project_id, retention_hours);
132156
db.vacuum_table(table, retention_hours).await;
133157
}
134158
})
135159
}
136160
})?;
137-
161+
138162
scheduler.add(vacuum_job).await?;
139-
163+
140164
// Start the scheduler
141165
scheduler.start().await?;
142-
166+
143167
// Handle shutdown
144168
let shutdown = self.maintenance_shutdown.clone();
145169
tokio::spawn(async move {
146170
shutdown.cancelled().await;
147171
info!("Shutting down maintenance scheduler");
148172
});
149-
173+
150174
Ok(self)
151175
}
152176

@@ -423,7 +447,7 @@ impl Database {
423447

424448
let new_table = write_op.await?;
425449
*table = new_table;
426-
450+
427451
// Note: Checkpointing, optimization, and vacuum are now managed by scheduled jobs
428452
}
429453

@@ -736,7 +760,7 @@ impl TableProvider for ProjectRoutingTable {
736760
// Create a physical plan from the logical plan.
737761
// Check that the schema of the plan matches the schema of this table.
738762
match self.schema().logically_equivalent_names_and_types(&input.schema()) {
739-
Ok(_) => info!("Schema validation passed"),
763+
Ok(_) => debug!("insert_into; Schema validation passed"),
740764
Err(e) => {
741765
error!("Schema validation failed: {}", e);
742766
return Err(e);

0 commit comments

Comments
 (0)