Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions crates/crates_io_database/src/models/git_index_sync_queue.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
use crate::schema::git_index_sync_queue;
use diesel::{prelude::*, sql_types::Integer};
use diesel_async::{AsyncPgConnection, RunQueryDsl};

/// A row of the `git_index_sync_queue` table: one crate waiting to be synced
/// to the Git index.
#[derive(Debug, HasQuery, QueryableByName)]
#[diesel(table_name = git_index_sync_queue)]
pub struct GitIndexSyncQueueItem {
/// The name of the crate to be synced.
pub crate_name: String,
/// Timestamp when the sync was queued.
pub created_at: chrono::DateTime<chrono::Utc>,
}

/// Insertable form of a `git_index_sync_queue` row.
///
/// Only the crate name is supplied by the caller; no `created_at` value is
/// sent with the insert.
#[derive(Debug, Insertable)]
#[diesel(table_name = git_index_sync_queue, check_for_backend(diesel::pg::Pg))]
struct NewGitIndexSyncQueueItem<'a> {
/// The name of the crate to enqueue.
pub crate_name: &'a str,
}

impl GitIndexSyncQueueItem {
    /// Adds `crate_name` to the sync queue.
    ///
    /// Queuing a crate that is already present is a successful no-op.
    pub async fn queue(conn: &mut AsyncPgConnection, crate_name: &str) -> QueryResult<()> {
        let new_item = NewGitIndexSyncQueueItem { crate_name };

        // A conflict means the crate is already queued. Leave the existing row untouched so
        // that its original creation time is preserved.
        let insert = diesel::insert_into(git_index_sync_queue::table)
            .values(new_item)
            .on_conflict_do_nothing();

        insert.execute(conn).await.map(|_| ())
    }

    /// Removes up to `limit` of the oldest entries from the queue and returns them.
    ///
    /// You will most likely want to call this inside a transaction, so that the deletion can
    /// be rolled back if downstream processing fails.
    pub async fn fetch_batch(
        conn: &mut AsyncPgConnection,
        limit: i32,
    ) -> QueryResult<Vec<GitIndexSyncQueueItem>> {
        let query = diesel::sql_query(include_str!("git_index_sync_queue_fetch_batch.sql"))
            .bind::<Integer, _>(limit);

        query.load(conn).await
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
WITH
batch AS (
    -- Pick the oldest queued crates and lock their rows for the rest of the
    -- transaction. SKIP LOCKED lets a concurrent worker pass over rows that
    -- another transaction has already claimed instead of blocking on them.
    SELECT
        crate_name
    FROM
        git_index_sync_queue
    ORDER BY
        created_at ASC
    LIMIT
        $1
    FOR UPDATE SKIP LOCKED
)
DELETE FROM git_index_sync_queue USING batch
Copy link
Member

@syphar syphar Dec 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a note:

using a postgres table as a queue like this has one problem:

if our handler code fails, the entries are still removed from the queue.
Also, multiple workers aren't supported.

If these things are an issue, you could do it the same way we do it in docs.rs:

  • start a transaction
  • SELECT [...] LIMIT 100 FOR UPDATE SKIP LOCKED
  • do your work
  • DELETE the entries
  • commit the transaction

Through this,

  • other workers just skip over these locked records, and
  • in case of an error, your transaction is rolled back, and the records are unlocked again.

WHERE
git_index_sync_queue.crate_name = batch.crate_name
RETURNING
git_index_sync_queue.crate_name,
git_index_sync_queue.created_at;
2 changes: 2 additions & 0 deletions crates/crates_io_database/src/models/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub use self::dependency::{Dependency, DependencyKind, ReverseDependency};
pub use self::download::VersionDownload;
pub use self::email::{Email, NewEmail};
pub use self::follow::Follow;
pub use self::git_index_sync_queue::GitIndexSyncQueueItem;
pub use self::keyword::{CrateKeyword, Keyword};
pub use self::krate::{Crate, CrateName, NewCrate};
pub use self::owner::{CrateOwner, Owner, OwnerKind};
Expand All @@ -31,6 +32,7 @@ pub mod dependency;
pub mod download;
mod email;
mod follow;
mod git_index_sync_queue;
mod keyword;
pub mod krate;
mod owner;
Expand Down
50 changes: 32 additions & 18 deletions crates/crates_io_database/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -610,6 +610,19 @@ diesel::table! {
}
}

diesel::table! {
use diesel::sql_types::*;
use diesel_full_text_search::Tsvector;

// `crate_name` (named in parentheses below) is the table's primary key.
/// Queue for crates that need to be synced to the Git index
git_index_sync_queue (crate_name) {
/// The name of the crate to be synced
crate_name -> Text,
/// Timestamp when the sync was queued
created_at -> Timestamptz,
}
}

diesel::table! {
use diesel::sql_types::*;
use diesel_full_text_search::Tsvector;
Expand Down Expand Up @@ -745,6 +758,24 @@ diesel::table! {
}
}

diesel::table! {
/// Representation of the `recent_crate_downloads` view.
///
/// This data represents the downloads in the last 90 days.
/// This view does not contain realtime data.
/// It is refreshed by the `update-downloads` script.
recent_crate_downloads (crate_id) {
/// The `crate_id` column of the `recent_crate_downloads` view.
///
/// Its SQL type is `Integer`.
crate_id -> Integer,
/// The `downloads` column of the `recent_crate_downloads` view.
///
/// Its SQL type is `BigInt`.
downloads -> BigInt,
}
}

diesel::table! {
use diesel::sql_types::*;
use diesel_full_text_search::Tsvector;
Expand All @@ -768,24 +799,6 @@ diesel::table! {
}
}

diesel::table! {
/// Representation of the `recent_crate_downloads` view.
///
/// This data represents the downloads in the last 90 days.
/// This view does not contain realtime data.
/// It is refreshed by the `update-downloads` script.
recent_crate_downloads (crate_id) {
/// The `crate_id` column of the `recent_crate_downloads` view.
///
/// Its SQL type is `Integer`.
crate_id -> Integer,
/// The `downloads` column of the `recent_crate_downloads` view.
///
/// Its SQL type is `BigInt`.
downloads -> BigInt,
}
}

diesel::table! {
use diesel::sql_types::*;
use diesel_full_text_search::Tsvector;
Expand Down Expand Up @@ -1272,6 +1285,7 @@ diesel::allow_tables_to_appear_in_same_query!(
dependencies,
emails,
follows,
git_index_sync_queue,
keywords,
metadata,
processed_log_files,
Expand Down
55 changes: 24 additions & 31 deletions crates/crates_io_index/repo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,20 +177,21 @@ impl Repository {
Ok(head.target().unwrap())
}

/// Commits the specified file with the specified commit message and pushes
/// the commit to the `master` branch on the `origin` remote.
///
/// Note that `modified_file` expects a file path **relative** to the
/// repository working folder!
/// Commits the specified file with specified commit message.
#[instrument(skip_all, fields(message = %msg))]
fn perform_commit_and_push(&self, msg: &str, modified_file: &Path) -> anyhow::Result<()> {
pub fn commit_file(&self, msg: &str, modified_file: &Path) -> anyhow::Result<()> {
info!("Committing \"{msg}\"");

// Strip the checkout path from the path.
let relative_path = modified_file.strip_prefix(self.checkout_path.path())?;

// git add $file
let mut index = self.repository.index()?;

if self.checkout_path.path().join(modified_file).exists() {
index.add_path(modified_file)?;
if self.checkout_path.path().join(relative_path).exists() {
index.add_path(relative_path)?;
} else {
index.remove_path(modified_file)?;
index.remove_path(relative_path)?;
}

index.write()?;
Expand All @@ -204,7 +205,7 @@ impl Repository {
self.repository
.commit(Some("HEAD"), &sig, &sig, msg, &tree, &[&parent])?;

self.push()
Ok(())
}

/// Gets a list of files that have been modified since a given `starting_commit`
Expand Down Expand Up @@ -252,29 +253,21 @@ impl Repository {
Ok(files)
}

/// Push the current branch to the provided refname
#[instrument(skip_all)]
fn push(&self) -> anyhow::Result<()> {
self.run_command(Command::new("git").args(["push", "origin", "HEAD:master"]))
}

/// Commits the specified file with the specified commit message and pushes
/// the commit to the `master` branch on the `origin` remote.
///
/// Note that `modified_file` expects an **absolute** file path!
/// Push the current branch to the `master` branch on the `origin` remote.
///
/// This function also prints the commit message and a success or failure
/// message to the console.
pub fn commit_and_push(&self, message: &str, modified_file: &Path) -> anyhow::Result<()> {
info!("Committing and pushing \"{message}\"");
/// This function also emits a success or failure message at INFO level.
#[instrument(skip_all)]
pub fn push(&self) -> anyhow::Result<()> {
info!("Pushing commits");
if let Err(err) =
self.run_command(Command::new("git").args(["push", "origin", "HEAD:master"]))
{
error!(?err, "Push errored");
} else {
info!("Push finished");
}

let relative_path = modified_file.strip_prefix(self.checkout_path.path())?;
self.perform_commit_and_push(message, relative_path)
.map(|_| info!("Commit and push finished for \"{message}\""))
.map_err(|err| {
error!(?err, "Commit and push for \"{message}\" errored");
err
})
Ok(())
}

/// Fetches any changes from the `origin` remote and performs a hard reset
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- Drop the Git index sync queue table.
-- IF EXISTS keeps this from failing when the table is already gone.
DROP TABLE IF EXISTS git_index_sync_queue;
14 changes: 14 additions & 0 deletions migrations/2025-11-07-202524_create_git_index_sync_queue/up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
-- Create a table to batch pending syncs to the Git crate index.
--
-- `crate_name` is the primary key, so a crate can appear in the queue at most
-- once. `created_at` falls back to CURRENT_TIMESTAMP when an insert does not
-- supply a value.
CREATE TABLE git_index_sync_queue (
crate_name TEXT PRIMARY KEY NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP
);

COMMENT ON TABLE git_index_sync_queue IS 'Queue for crates that need to be synced to the Git index';

COMMENT ON COLUMN git_index_sync_queue.crate_name IS 'The name of the crate to be synced';

COMMENT ON COLUMN git_index_sync_queue.created_at IS 'Timestamp when the sync was queued';

-- Index for efficient batch processing (oldest first).
CREATE INDEX idx_git_index_sync_queue_created_at ON git_index_sync_queue (created_at);
6 changes: 3 additions & 3 deletions src/bin/crates-admin/delete_crate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use anyhow::Context;
use chrono::{DateTime, Utc};
use colored::Colorize;
use crates_io::controllers::krate::delete::max_downloads;
use crates_io::models::{NewDeletedCrate, User};
use crates_io::models::{GitIndexSyncQueueItem, NewDeletedCrate, User};
use crates_io::schema::{crate_downloads, deleted_crates};
use crates_io::worker::jobs;
use crates_io::{db, schema::crates};
Expand Down Expand Up @@ -106,12 +106,12 @@ pub async fn run(opts: Opts) -> anyhow::Result<()> {
};

info!("{name}: Enqueuing background jobs…");
let git_index_job = jobs::SyncToGitIndex::new(name);
GitIndexSyncQueueItem::queue(&mut conn, &name).await?;
let sparse_index_job = jobs::SyncToSparseIndex::new(name);
let delete_from_storage_job = jobs::DeleteCrateFromStorage::new(name.into());

if let Err(error) = tokio::try_join!(
git_index_job.enqueue(&mut conn),
jobs::SyncToGitIndex.enqueue(&mut conn),
sparse_index_job.enqueue(&mut conn),
delete_from_storage_job.enqueue(&mut conn),
) {
Expand Down
6 changes: 3 additions & 3 deletions src/bin/crates-admin/delete_version.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::dialoguer;
use anyhow::Context;
use crates_io::models::update_default_version;
use crates_io::models::{GitIndexSyncQueueItem, update_default_version};
use crates_io::schema::crates;
use crates_io::storage::Storage;
use crates_io::worker::jobs;
Expand Down Expand Up @@ -95,11 +95,11 @@ pub async fn run(opts: Opts) -> anyhow::Result<()> {
let crate_name = &opts.crate_name;

info!(%crate_name, "Enqueuing index sync jobs");
let git_index_job = jobs::SyncToGitIndex::new(crate_name);
GitIndexSyncQueueItem::queue(&mut conn, crate_name).await?;
let sparse_index_job = jobs::SyncToSparseIndex::new(crate_name);

if let Err(error) = tokio::try_join!(
git_index_job.enqueue(&mut conn),
jobs::SyncToGitIndex.enqueue(&mut conn),
sparse_index_job.enqueue(&mut conn),
) {
warn!(%crate_name, "Failed to enqueue background job: {error}");
Expand Down
4 changes: 3 additions & 1 deletion src/bin/crates-admin/enqueue_job.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use anyhow::Result;
use chrono::NaiveDate;
use crates_io::db;
use crates_io::models::GitIndexSyncQueueItem;
use crates_io::schema::{background_jobs, crates};
use crates_io::worker::jobs;
use crates_io_worker::BackgroundJob;
Expand Down Expand Up @@ -163,7 +164,8 @@ pub async fn run(command: Command) -> Result<()> {
jobs::rss::SyncCratesFeed.enqueue(&mut conn).await?;
}
Command::SyncToGitIndex { name } => {
jobs::SyncToGitIndex::new(name).enqueue(&mut conn).await?;
GitIndexSyncQueueItem::queue(&mut conn, &name).await?;
jobs::SyncToGitIndex.enqueue(&mut conn).await?;
}
Command::SyncToSparseIndex { name } => {
jobs::SyncToSparseIndex::new(name)
Expand Down
6 changes: 3 additions & 3 deletions src/bin/crates-admin/yank_version.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::dialoguer;
use crates_io::db;
use crates_io::models::{Crate, Version};
use crates_io::models::{Crate, GitIndexSyncQueueItem, Version};
use crates_io::schema::versions;
use crates_io::worker::jobs::{SyncToGitIndex, SyncToSparseIndex, UpdateDefaultVersion};
use crates_io_worker::BackgroundJob;
Expand Down Expand Up @@ -67,12 +67,12 @@ async fn yank(opts: Opts, conn: &mut AsyncPgConnection) -> anyhow::Result<()> {
.execute(conn)
.await?;

let git_index_job = SyncToGitIndex::new(&krate.name);
GitIndexSyncQueueItem::queue(conn, &krate.name).await?;
let sparse_index_job = SyncToSparseIndex::new(&krate.name);
let update_default_version_job = UpdateDefaultVersion::new(krate.id);

tokio::try_join!(
git_index_job.enqueue(conn),
SyncToGitIndex.enqueue(conn),
sparse_index_job.enqueue(conn),
update_default_version_job.enqueue(conn),
)?;
Expand Down
5 changes: 3 additions & 2 deletions src/controllers/krate/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use axum::extract::rejection::QueryRejection;
use axum::extract::{FromRequestParts, Query};
use bigdecimal::ToPrimitive;
use chrono::{TimeDelta, Utc};
use crates_io_database::models::GitIndexSyncQueueItem;
use crates_io_database::schema::deleted_crates;
use crates_io_worker::BackgroundJob;
use diesel::prelude::*;
Expand Down Expand Up @@ -132,12 +133,12 @@ pub async fn delete_crate(
.execute(conn)
.await?;

let git_index_job = jobs::SyncToGitIndex::new(&krate.name);
GitIndexSyncQueueItem::queue(conn, &krate.name).await?;
let sparse_index_job = jobs::SyncToSparseIndex::new(&krate.name);
let delete_from_storage_job = jobs::DeleteCrateFromStorage::new(path.name);

tokio::try_join!(
git_index_job.enqueue(conn),
jobs::SyncToGitIndex.enqueue(conn),
sparse_index_job.enqueue(conn),
delete_from_storage_job.enqueue(conn),
)?;
Expand Down
Loading
Loading