feat(storage): backend for SQLite on job storage #125
Conversation
Pull request overview
Adds a SQLite-backed storage layer to persist jobs across process restarts/crashes, replacing the prior in-memory job map in the storage process.
Changes:
- Introduces a pluggable `Backend` trait with a `SqliteBackend` implementation plus SQL migrations.
- Updates the storage process initialization to accept a home directory and create/use `storage.sqlite`.
- Adds SQLx offline workflow support (`SQLX_OFFLINE`, `.sqlx` metadata, `sqlx-prepare`) and documents local dev prerequisites.
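For context, the pluggable abstraction described above might look roughly like the sketch below. The trait and method names (`Backend`, `create_job`, `claim_jobs`) come from this review; the stand-in `Job` struct, the synchronous signatures, and the in-memory `MemBackend` are simplifications for illustration — the real trait is presumably async over SQLx and also exposes retrieval/update methods.

```rust
use std::time::SystemTime;

// Minimal stand-in for the real Job type.
#[derive(Clone, Debug, PartialEq)]
pub struct Job {
    pub id: String,
    pub scheduled_at: SystemTime,
}

// Pluggable storage backend, simplified to two of the methods this review mentions.
pub trait Backend {
    fn create_job(&mut self, job: Job) -> Result<Job, String>;
    fn claim_jobs(&mut self, max: usize, start: SystemTime, end: SystemTime)
        -> Result<Vec<Job>, String>;
}

// Trivial in-memory implementation, standing in for SqliteBackend.
#[derive(Default)]
pub struct MemBackend {
    jobs: Vec<Job>,
}

impl Backend for MemBackend {
    fn create_job(&mut self, job: Job) -> Result<Job, String> {
        self.jobs.push(job.clone());
        Ok(job)
    }

    fn claim_jobs(&mut self, max: usize, start: SystemTime, end: SystemTime)
        -> Result<Vec<Job>, String>
    {
        // Claim up to `max` jobs whose scheduled time falls in [start, end],
        // removing them from the pending set.
        let claimed: Vec<Job> = self
            .jobs
            .iter()
            .filter(|j| j.scheduled_at >= start && j.scheduled_at <= end)
            .take(max)
            .cloned()
            .collect();
        self.jobs.retain(|j| !claimed.contains(j));
        Ok(claimed)
    }
}
```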
Reviewed changes
Copilot reviewed 13 out of 16 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| `src/storage/src/lib.rs` | Switches storage from in-memory state to a backend abstraction and wires IPC handlers to backend calls. |
| `src/storage/src/backend.rs` | Adds the `Backend` trait and exports the sqlite backend module. |
| `src/storage/src/backend/sqlite.rs` | Implements SQLite persistence using SQLx (migrations, CRUD-ish operations, claiming). |
| `src/storage/src/backend/sqlite/migrations/001_create_table_jobs.sql` | Creates the jobs table and supporting indexes. |
| `src/storage/Cargo.toml` | Adds dependencies needed for the SQLx/SQLite backend (sqlx, serde_json, async-trait, etc.). |
| `src/mate/src/proto/job.rs` | Centralizes job status/result strings into constants and uses them in `Display`/`FromStr`. |
| `src/cli/src/process/storage.rs` | Makes `StorageProcess::new` async and passes the home directory through. |
| `src/cli/src/cli/cmd/component/storage.rs` | Computes the storage home path and passes it into the storage process. |
| `README.md` | Documents dev prerequisites (Rust toolchain, SQLx CLI). |
| `Justfile` | Adds a `sqlx-prepare` helper and a `dkc` helper task. |
| `Cargo.toml` | Adds sqlx to workspace dependencies. |
| `Cargo.lock` | Locks new transitive dependencies from SQLx and sqlite support. |
| `.sqlx/query-ad36629d0f34f5d236818a2ad2a236cf5dc9c1c33cec41a5325cd954000f3e1a.json` | Adds SQLx offline metadata for the insert query. |
| `.gitignore` | Ignores `.env`. |
| `.env.example` | Provides a sample `DATABASE_URL` for SQLx tooling. |
| `.cargo/config.toml` | Enables SQLx offline mode for builds. |
Files not reviewed (1)
- .sqlx/query-ad36629d0f34f5d236818a2ad2a236cf5dc9c1c33cec41a5325cd954000f3e1a.json: Language not supported
Comments suppressed due to low confidence (7)
src/storage/src/lib.rs:94
`ClaimJobs` receives a `(start, end)` window from the scheduler, but storage discards `start` and uses `SystemTime::now()` instead. This can lead to inconsistent behavior if the scheduler and storage clocks differ or if the scheduler ever sends a non-"now" start. Consider passing the provided `start` through to `backend.claim_jobs(...)`.

```rust
MessagePayload::ClaimJobs((_, end)) => {
    match self
        .backend
        .claim_jobs(MAX_JOBS_PER_BATCH, SystemTime::now(), end)
        .await
```
src/storage/src/backend/sqlite.rs:174
`update_job_completed` ignores the case where the job id doesn't exist: `execute(...)` returning 0 affected rows still results in `Ok(())`. Consider checking `rows_affected()` and returning an error when no row was updated so callers get a reliable signal (this also matches the previous behavior, which surfaced "job not found").

```rust
sqlx::query("UPDATE jobs SET status = ?, result = ?, completed_at = ? WHERE id = ?")
    .bind(status)
    .bind(result_json)
    .bind(completed_at)
    .bind(id)
    .execute(&self.pool)
    .await?;
Ok(())
```
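A minimal sketch of the suggested check, with a hypothetical helper name and a plain `String` error standing in for whatever error type the crate uses:

```rust
// Hypothetical helper: turn "0 rows updated" into an explicit error so a
// missing job id surfaces instead of silently returning Ok(()).
fn ensure_row_updated(rows_affected: u64, id: &str) -> Result<(), String> {
    if rows_affected == 0 {
        Err(format!("job not found: {id}"))
    } else {
        Ok(())
    }
}
```

At the call site this would be something like `ensure_row_updated(result.rows_affected(), &id)?` on the value returned by `.execute(&self.pool).await?`.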
src/storage/src/backend/sqlite.rs:124
`create_job` only inserts a subset of `Job` fields, so persisted rows rely on DB defaults for `errors`, `attempts`, and `max_attempts`. This drops any non-default `max_attempts` set on the `Job` before storing (and would also ignore existing `errors`/`attempts` if those ever need to be persisted). Consider inserting these fields explicitly (and `started_at`/`completed_at`/`result` when present) to ensure round-tripping is lossless.

```rust
async fn create_job(&self, job: Job) -> Result<Job> {
    let id = job.id.to_string();
    let args = serde_json::to_string(&job.args)?;
    let task = job.task.to_string();
    let status = job.status.to_string();
    let scheduled_at = into_unix_timestamp(job.scheduled_at)?;
    let record = sqlx::query_as!(
        JobRecord,
        r#"
        INSERT INTO jobs (
            id,
            name,
            args,
            status,
            scheduled_at,
            task,
            started_at,
            completed_at
        ) VALUES (
            $1,
            $2,
            $3,
            $4,
            $5,
            $6,
            $7,
            $8
        ) RETURNING *"#,
        id,
        job.name,
        args,
        status,
        scheduled_at,
        task,
        Option::<i64>::None,
        Option::<i64>::None,
    )
```
src/storage/src/lib.rs:29
`Storage::new` builds the sqlite path via `home.to_str().unwrap()`, which can panic on non-UTF8 paths, and it doesn't ensure the parent directory exists (e.g. `~/.mate`). This can make first-run startup fail. Consider creating the directory with `std::fs::create_dir_all(&home_parent)` and passing a `&Path`/`PathBuf` into `SqliteBackend::new` instead of unwrapping a UTF-8 string.

```rust
pub async fn new(transport: Box<dyn Transport>, home: PathBuf) -> Result<Self> {
    let ipc = Arc::new(IpcServer::new(IPC_SENDER_STORAGE, transport));
    let home = home.join("storage.sqlite");
    let backend = Arc::new(SqliteBackend::new(home.to_str().unwrap()).await?);
```
src/storage/src/lib.rs:98
On backend errors, `QueryJobs` and `ClaimJobs` currently return `JobsResult(vec![])`, which makes storage/database failures indistinguishable from "no jobs" and can cause the scheduler/API to silently stop seeing jobs. At minimum, log the error; ideally propagate an error payload (or add a dedicated error response) so callers can surface/alert on storage failures.

```rust
MessagePayload::QueryJobs(query) => match self.backend.retrieve_jobs(query).await {
    Ok(jobs) => Some(MessagePayload::JobsResult(jobs)),
    Err(_err) => Some(MessagePayload::JobsResult(vec![])),
},
MessagePayload::ClaimJobs((_, end)) => {
    match self
        .backend
        .claim_jobs(MAX_JOBS_PER_BATCH, SystemTime::now(), end)
        .await
    {
        Ok(jobs) => Some(MessagePayload::JobsResult(jobs)),
        Err(_err) => Some(MessagePayload::JobsResult(vec![])),
    }
```
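One hedged option for the "propagate an error payload" suggestion, using stand-in types — the real `MessagePayload` and `Job` live in the mate proto, and the `JobsError` variant here is hypothetical:

```rust
// Stand-in reply type; JobsError is a hypothetical new variant.
#[derive(Debug, PartialEq)]
enum Reply {
    JobsResult(Vec<String>),
    JobsError(String),
}

// Map the backend result without swallowing failures: log the error, then
// return an error-carrying payload the caller can distinguish from "no jobs".
fn reply_for(res: Result<Vec<String>, String>) -> Reply {
    match res {
        Ok(jobs) => Reply::JobsResult(jobs),
        Err(err) => {
            eprintln!("storage backend error: {err}"); // or tracing::error!
            Reply::JobsError(err)
        }
    }
}
```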
src/cli/src/cli/cmd/component/storage.rs:25
`home_dir().unwrap()` can panic (and `std::env::home_dir` is deprecated). Since this path is used to place the sqlite DB, consider using a non-deprecated home resolution (or reading from config) and returning a clear error when the home directory can't be determined. Also ensure the `~/.mate` directory is created before starting storage so sqlite file creation doesn't fail due to a missing parent directory.

```rust
let config = Config::from_file(&self.config)?;
let mut home = home_dir().unwrap();
home.push(".mate");
let transport = make_transport(config.clone(), ProcessType::Storage).await?;
let mut storage = StorageProcess::new(transport, home).await?;
```
src/storage/src/backend/sqlite.rs:220
`into_system_time` casts `timestamp: i64` to `u64` before building a `Duration`. If a negative timestamp ever gets into the DB, this will wrap to a huge `u64` and can produce a far-future `SystemTime` instead of erroring. Consider validating `timestamp >= 0` before casting, and returning an error for negative values.

```rust
fn into_system_time(timestamp: i64) -> Result<SystemTime> {
    SystemTime::UNIX_EPOCH
        .checked_add(Duration::from_secs(timestamp as u64))
        .context("Invalid timestamp")
```
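The suggested guard could look like this self-contained sketch (the real code presumably returns an `anyhow::Result`; a `String` error stands in here):

```rust
use std::time::{Duration, SystemTime};

// Reject negative timestamps instead of letting `as u64` wrap to a huge value.
fn into_system_time(timestamp: i64) -> Result<SystemTime, String> {
    let secs = u64::try_from(timestamp)
        .map_err(|_| format!("invalid negative timestamp: {timestamp}"))?;
    SystemTime::UNIX_EPOCH
        .checked_add(Duration::from_secs(secs))
        .ok_or_else(|| "timestamp overflows SystemTime".to_string())
}
```

`u64::try_from` makes the sign check explicit and unskippable, rather than relying on a separate `if timestamp < 0` branch.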
Force-pushed from b3f4ea0 to 0d2a717
Pull request overview
Copilot reviewed 13 out of 16 changed files in this pull request and generated 4 comments.
Files not reviewed (1)
- .sqlx/query-ad36629d0f34f5d236818a2ad2a236cf5dc9c1c33cec41a5325cd954000f3e1a.json: Language not supported
Comments suppressed due to low confidence (5)
src/storage/src/lib.rs:29
`Storage::new` builds a filesystem path and then calls `home.to_str().unwrap()`. This can panic on non-UTF8 paths, and it also doesn't ensure the parent directory exists (SQLite will fail to create the DB file if `~/.mate/` hasn't been created). Prefer passing a `Path`/`PathBuf` through to `SqliteConnectOptions::filename` and `create_dir_all(home.parent())` before opening the DB.

```rust
pub async fn new(transport: Box<dyn Transport>, home: PathBuf) -> Result<Self> {
    let ipc = Arc::new(IpcServer::new(IPC_SENDER_STORAGE, transport));
    let home = home.join("storage.sqlite");
    let backend = Arc::new(SqliteBackend::new(home.to_str().unwrap()).await?);
```
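The path handling could be sketched as below. Assumptions: `SqliteConnectOptions::filename` accepts `impl AsRef<Path>`, so no UTF-8 unwrap is needed, and `prepare_db_path` is a hypothetical helper name:

```rust
use std::fs;
use std::io;
use std::path::{Path, PathBuf};

// Create the storage home (e.g. ~/.mate) if missing and return the DB path
// as a PathBuf, avoiding the lossy home.to_str().unwrap().
fn prepare_db_path(home: &Path) -> io::Result<PathBuf> {
    fs::create_dir_all(home)?;
    Ok(home.join("storage.sqlite"))
}
```

The returned `PathBuf` can then be handed through to `SqliteBackend::new` / `SqliteConnectOptions::filename` directly.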
src/storage/src/lib.rs:94
`ClaimJobs` discards the `start` timestamp provided by the scheduler (`ClaimJobs((_, end))`) and substitutes `SystemTime::now()`. This can shrink/shift the intended claim window and potentially miss jobs near the boundary. Use the `(start, end)` values from the message when calling `backend.claim_jobs`.

```rust
MessagePayload::ClaimJobs((_, end)) => {
    match self
        .backend
        .claim_jobs(MAX_JOBS_PER_BATCH, SystemTime::now(), end)
        .await
```
src/storage/src/lib.rs:103
`handle_message` no longer handles several `MessagePayload` variants that other components send to Storage (e.g. `UpdateJobStatus` from the scheduler, and `JobStarted`/`JobFailed` from executors). As a result, job state transitions won't be persisted/updated. Add match arms for these variants and implement the corresponding backend updates.

```rust
MessagePayload::Ping => Some(MessagePayload::Pong),
MessagePayload::Shutdown => Some(MessagePayload::ShutdownAck),
_ => None,
}
```
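A hedged sketch of the missing arms with stand-in types: the real `UpdateJobStatus`, `JobStarted`, and `JobFailed` variants and the backend methods they should call live in the mate proto and `Backend` trait; the payload fields and return type here are invented for illustration:

```rust
// Stand-in payload with the variants the review says are unhandled.
enum MessagePayload {
    UpdateJobStatus(String, String), // (job id, new status)
    JobStarted(String),              // job id
    JobFailed(String, String),       // (job id, error)
    Ping,
}

// Route every job-state transition to a persistence call instead of
// letting a catch-all `_ => None` drop it.
fn handle(msg: MessagePayload) -> Option<String> {
    match msg {
        MessagePayload::UpdateJobStatus(id, status) => Some(format!("persist {id} -> {status}")),
        MessagePayload::JobStarted(id) => Some(format!("persist {id} -> running")),
        MessagePayload::JobFailed(id, err) => Some(format!("persist {id} failed: {err}")),
        MessagePayload::Ping => Some("pong".to_string()),
    }
}
```

Enumerating the variants without a `_` arm also makes the compiler flag any future payload the storage process forgets to handle.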
src/storage/src/lib.rs:89
On backend errors, `QueryJobs` returns an empty list. This will make API callers think there are simply no jobs instead of surfacing a storage failure. At minimum, log the error; ideally introduce an error-carrying response variant so callers can distinguish failure from an empty result set.

```rust
MessagePayload::QueryJobs(query) => match self.backend.retrieve_jobs(query).await {
    Ok(jobs) => Some(MessagePayload::JobsResult(jobs)),
    Err(_err) => Some(MessagePayload::JobsResult(vec![])),
},
```
src/storage/src/lib.rs:98
In the `ClaimJobs` handler, backend errors are swallowed by returning an empty `JobsResult`. This can silently stall scheduling while hiding the underlying storage issue. Please at least log the error (and ideally propagate it in a way the scheduler can detect).

```rust
Ok(jobs) => Some(MessagePayload::JobsResult(jobs)),
Err(_err) => Some(MessagePayload::JobsResult(vec![])),
}
```
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Pull request overview
Copilot reviewed 13 out of 16 changed files in this pull request and generated 2 comments.
Files not reviewed (1)
- .sqlx/query-111a167ea1da690f9b2fdb8112d0fecf80a6ff4d202adc5772bf195ea7ad3a05.json: Language not supported
Comments suppressed due to low confidence (5)
src/cli/src/cli/cmd/component/storage.rs:25
`home` is set to `~/.mate`, but the directory is not created before SQLite is opened. `SqliteConnectOptions::create_if_missing(true)` creates the DB file, but won't create missing parent directories, so storage can fail to start on a fresh machine. Consider creating the directory (e.g., `create_dir_all`) before launching `StorageProcess`.

```rust
let config = Config::from_file(&self.config)?;
let mut home = home_dir().context("Failed to get home directory")?;
home.push(".mate");
let transport = make_transport(config.clone(), ProcessType::Storage).await?;
let mut storage = StorageProcess::new(transport, home).await?;
```
src/storage/src/lib.rs:29
`home.to_str().unwrap()` can panic for non-UTF8 paths, and `SqliteConnectOptions::filename(...)` accepts a `Path`-like value. Prefer passing the `PathBuf`/`&Path` directly (or returning a proper error) instead of unwrapping.

```rust
let home = home.join("storage.sqlite");
let backend = Arc::new(SqliteBackend::new(home.to_str().unwrap()).await?);
```
src/storage/src/lib.rs:89
On backend errors, `QueryJobs` currently returns an empty list, which makes API callers interpret "storage failed" as "no jobs exist". Consider logging the error and propagating it to callers (e.g., change `JobsResult` to carry `Result<Vec<Job>, String>` or add a dedicated error payload).

```rust
MessagePayload::QueryJobs(query) => match self.backend.retrieve_jobs(query).await {
    Ok(jobs) => Some(MessagePayload::JobsResult(jobs)),
    Err(_err) => Some(MessagePayload::JobsResult(vec![])),
},
```
src/storage/src/lib.rs:98
Similarly, returning `JobsResult(vec![])` on `claim_jobs` failure hides the fact that the scheduler can't load work and may leave the system idle without any visible error. At minimum, log the error; ideally propagate an error to the scheduler so it can surface/handle it explicitly.

```rust
MessagePayload::ClaimJobs((_, end)) => {
    match self
        .backend
        .claim_jobs(MAX_JOBS_PER_BATCH, SystemTime::now(), end)
        .await
    {
        Ok(jobs) => Some(MessagePayload::JobsResult(jobs)),
        Err(_err) => Some(MessagePayload::JobsResult(vec![])),
    }
```
src/storage/src/backend/sqlite.rs:203
`update_job_completed` doesn't verify that a row was actually updated (e.g., when the job ID doesn't exist). This can silently acknowledge success while losing the completion update. Consider checking `rows_affected()` and returning an error when it's 0.

```rust
JobResult::Success(_) => {
    sqlx::query(
        r#"UPDATE jobs
        SET
            status = 'completed',
            result = ?,
            completed_at = ?,
            attempts = attempts + 1
        WHERE id = ?"#,
    )
    .bind(result_json)
    .bind(completed_at)
    .bind(id)
    .execute(&self.pool)
    .await?;
}
JobResult::Failure(error) => {
    sqlx::query(
        r#"UPDATE jobs
        SET
            status = 'failed',
            result = ?,
            completed_at = ?,
            attempts = attempts + 1,
            errors = json_insert(errors, '$[#]', ?)
        WHERE id = ?"#,
    )
    .bind(result_json)
    .bind(completed_at)
    .bind(error)
    .bind(id)
    .execute(&self.pool)
    .await?;
}
}
Ok(())
```
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Provides support for storing jobs in SQLite so they persist after the mate server shuts down or crashes.