diff --git a/Cargo.toml b/Cargo.toml index 65ac2f45a..74cbad9d7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,7 +33,7 @@ bellatrix = { path = "orion-server/bellatrix" } context = { path = "context" } neptune = { path = "neptune" } -git-internal = "0.1.0" + git-internal = "0.2.1" #==== anyhow = "1.0.100" @@ -51,11 +51,12 @@ tokio-stream = "0.1.17" tokio-util = "0.7.16" async-trait = "0.1.89" async-stream = "0.3.6" +async-recursion = "1.1.1" futures = "0.3.31" futures-util = "0.3.31" axum = { version = "0.8.6", features = ["macros", "json"] } axum-extra = "0.10.3" -russh = "0.52.1" +russh = "0.54.6" tower-http = "0.6.6" tower = "0.5.2" tower-sessions = { version = "0.12.3", features = ["memory-store"] } diff --git a/ceres/Cargo.toml b/ceres/Cargo.toml index 242bb489b..55d5badbf 100644 --- a/ceres/Cargo.toml +++ b/ceres/Cargo.toml @@ -15,6 +15,7 @@ common = { workspace = true } jupiter = { workspace = true } callisto = { workspace = true } git-internal = { workspace = true } + neptune = { workspace = true } bellatrix = { workspace = true } @@ -32,6 +33,7 @@ chrono = { workspace = true } futures = { workspace = true } bytes = { workspace = true } async-trait = { workspace = true } +async-recursion = { workspace = true } rand = { workspace = true } ring = { workspace = true } hex = { workspace = true } diff --git a/ceres/src/api_service/import_api_service.rs b/ceres/src/api_service/import_api_service.rs index e19d89d91..51a831a3e 100644 --- a/ceres/src/api_service/import_api_service.rs +++ b/ceres/src/api_service/import_api_service.rs @@ -8,6 +8,7 @@ use tokio::sync::Mutex; use async_trait::async_trait; use git_internal::errors::GitError; use git_internal::hash::SHA1; +use git_internal::internal::metadata::{EntryMeta, MetaAttached}; use git_internal::internal::object::commit::Commit; use git_internal::internal::object::tree::{Tree, TreeItem, TreeItemMode}; use git_internal::internal::pack::entry::Entry; @@ -443,12 +444,21 @@ impl ApiHandler for ImportApiService { Commit::from_tree_id(new_root_id, vec![parent_id], &payload.commit_message); let new_commit_id = new_commit.id.to_string(); - let mut entries: Vec = Vec::new(); + let mut entries: Vec> = Vec::new(); for t in updated_trees.iter().cloned() { - entries.push(Entry::from(t)); + entries.push(MetaAttached { + inner: Entry::from(t), + meta: EntryMeta::new(), + }); } - entries.push(Entry::from(new_blob.clone())); - entries.push(Entry::from(new_commit.clone())); + entries.push(MetaAttached { + inner: Entry::from(new_blob.clone()), + meta: EntryMeta::new(), + }); + entries.push(MetaAttached { + inner: Entry::from(new_commit.clone()), + meta: EntryMeta::new(), + }); git_storage .save_entry(self.repo.repo_id, entries) .await @@ -788,6 +798,9 @@ impl ImportApiService { tag_name: name, tagger: tagger_info, message: message.unwrap_or_default(), + pack_id: String::new(), + pack_offset: 0, + created_at: chrono::Utc::now().naive_utc(), } } diff --git a/ceres/src/api_service/mono_api_service.rs b/ceres/src/api_service/mono_api_service.rs index 6b97fdc93..92ee176a9 100644 --- a/ceres/src/api_service/mono_api_service.rs +++ b/ceres/src/api_service/mono_api_service.rs @@ -52,6 +52,7 @@ use bytes::Bytes; use common::utils::MEGA_BRANCH_NAME; use git_internal::errors::GitError; use git_internal::hash::SHA1; +use git_internal::internal::metadata::EntryMeta; use git_internal::internal::object::blob::Blob; use git_internal::internal::object::commit::Commit; use git_internal::internal::object::tree::{Tree, TreeItem, TreeItemMode}; @@ -283,7 +284,7 @@ impl ApiHandler for MonoApiService { let save_trees: Vec = save_trees .into_iter() .map(|save_t| { - let mut tree_model: mega_tree::Model = save_t.into_mega_model(); + let mut tree_model: mega_tree::Model = save_t.into_mega_model(EntryMeta::new()); tree_model.commit_id.clone_from(&new_commit_id); tree_model.into() }) @@ -390,7 +391,7 @@ impl ApiHandler for MonoApiService { let save_trees: Vec = save_trees .into_iter() .map(|save_t| { - let mut tree_model: mega_tree::Model = save_t.into_mega_model(); + let mut tree_model: mega_tree::Model = save_t.into_mega_model(EntryMeta::new()); tree_model.commit_id.clone_from(&new_commit_id); tree_model.into() }) @@ -1045,6 +1046,8 @@ impl MonoApiService { tag_name: name, tagger: tagger_info, message: message.unwrap_or_default(), + pack_id: String::new(), + pack_offset: 0, created_at: chrono::Utc::now().naive_utc(), } } @@ -1229,7 +1232,7 @@ impl MonoApiService { .clone() .into_iter() .map(|save_t| { - let mut tree_model: mega_tree::Model = save_t.into_mega_model(); + let mut tree_model: mega_tree::Model = save_t.into_mega_model(EntryMeta::new()); tree_model.commit_id.clone_from(&new_commit_id); tree_model.into() }) diff --git a/ceres/src/pack/import_repo.rs b/ceres/src/pack/import_repo.rs index bcbd47054..a56a0af98 100644 --- a/ceres/src/pack/import_repo.rs +++ b/ceres/src/pack/import_repo.rs @@ -1,3 +1,6 @@ +use async_recursion::async_recursion; +use async_trait::async_trait; +use futures::StreamExt; use std::{ collections::{HashMap, HashSet}, path::PathBuf, @@ -7,9 +10,6 @@ use std::{ atomic::{AtomicUsize, Ordering}, }, }; - -use async_trait::async_trait; -use futures::StreamExt; use tokio::sync::{ Mutex, mpsc::{self}, @@ -18,6 +18,7 @@ use tokio_stream::wrappers::ReceiverStream; use callisto::{mega_tree, raw_blob, sea_orm_active_enums::RefTypeEnum}; use common::errors::MegaError; +use git_internal::internal::metadata::{EntryMeta, MetaAttached}; use git_internal::{ errors::GitError, internal::{ @@ -65,14 +66,23 @@ impl RepoHandler for ImportRepo { async fn post_receive_pack(&self) -> Result<(), MegaError> { let _guard = self.shared.lock().await; + self.traverses_tree_and_update_filepath().await?; self.attach_to_monorepo_parent().await } - async fn save_entry(&self, entry_list: Vec) -> Result<(), MegaError> { + async fn save_entry( + &self, + entry_list: Vec>, + ) -> Result<(), MegaError> { let storage = self.storage.git_db_storage(); storage.save_entry(self.repo.repo_id, entry_list).await } + async fn update_pack_id(&self, temp_pack_id: &str, pack_id: &str) -> Result<(), MegaError> { + let storage = self.storage.git_db_storage(); + storage.update_pack_id(temp_pack_id, pack_id).await + } + async fn check_entry(&self, _: &Entry) -> Result<(), GitError> { Ok(()) } @@ -96,7 +106,10 @@ impl RepoHandler for ImportRepo { match model { Ok(m) => { let c: Commit = Commit::from_git_model(m); - let entry = c.into(); + let entry = MetaAttached { + inner: c.into(), + meta: EntryMeta::new(), + }; entry_tx.send(entry).await.unwrap(); } Err(err) => eprintln!("Error: {err:?}"), @@ -109,7 +122,10 @@ impl RepoHandler for ImportRepo { match model { Ok(m) => { let t: Tree = Tree::from_git_model(m); - let entry = t.into(); + let entry = MetaAttached { + inner: t.into(), + meta: EntryMeta::new(), + }; entry_tx.send(entry).await.unwrap(); } Err(err) => eprintln!("Error: {err:?}"), @@ -139,7 +155,26 @@ impl RepoHandler for ImportRepo { // TODO handle storage type let data = m.data.unwrap_or_default(); let b: Blob = Blob::from_content_bytes(data); - let entry: Entry = b.into(); + // let blob_with_data = storage.get_blobs_by_hashes(repo_id,vec![b.id.to_string()]).await?.iter().next().unwrap(); + let blob_with_data = storage + .get_blobs_by_hashes(repo_id, vec![b.id.to_string()]) + .await + .expect("get_blobs_by_hashes failed") + .into_iter() + .next() + .expect("blob metadata not found"); + + let meta_data = EntryMeta { + pack_id: Some(blob_with_data.pack_id.clone()), + pack_offset: Some(blob_with_data.pack_offset as usize), + file_path: Some(blob_with_data.file_path.clone()), + is_delta: Some(blob_with_data.is_delta_in_pack), + }; + + let entry = MetaAttached { + inner: b.into(), + meta: meta_data, + }; sender_clone.send(entry).await.unwrap(); } Err(err) => eprintln!("Error: {err:?}"), @@ -154,7 +189,10 @@ impl RepoHandler for ImportRepo { let tags = storage.get_tags_by_repo_id(repo_id).await.unwrap(); for m in tags.into_iter() { let c: Tag = Tag::from_git_model(m); - let entry: Entry = c.into(); + let entry = MetaAttached { + inner: c.into(), + meta: EntryMeta::new(), + }; entry_tx.send(entry).await.unwrap(); } drop(entry_tx); @@ -256,7 +294,13 @@ impl RepoHandler for ImportRepo { Some(&entry_tx), ) .await; - entry_tx.send(c.into()).await.unwrap(); + entry_tx + .send(MetaAttached { + inner: c.into(), + meta: EntryMeta::new(), + }) + .await + .unwrap(); } drop(entry_tx); @@ -285,6 +329,34 @@ impl RepoHandler for ImportRepo { .await } + async fn get_blob_metadata_by_hashes( + &self, + hashes: Vec, + ) -> Result, MegaError> { + let models = self + .storage + .git_db_storage() + .get_blobs_by_hashes(self.repo.repo_id, hashes) + .await?; + + let map = models + .into_iter() + .map(|blob| { + ( + blob.blob_id.clone(), + EntryMeta { + pack_id: Some(blob.pack_id.clone()), + pack_offset: Some(blob.pack_offset as usize), + file_path: Some(blob.file_path.clone()), + is_delta: Some(blob.is_delta_in_pack), + }, + ) + }) + .collect::>(); + + Ok(map) + } + async fn update_refs(&self, refs: &RefCommand) -> Result<(), GitError> { let storage = self.storage.git_db_storage(); match refs.command_type { @@ -324,9 +396,65 @@ impl RepoHandler for ImportRepo { .await .unwrap() } + + async fn traverses_tree_and_update_filepath(&self) -> Result<(), MegaError> { + //let (current_head, refs) = self.head_hash().await; + let (current_head, _refs) = self.refs_with_head_hash().await; + let commit = Commit::from_git_model( + self.storage + .git_db_storage() + .get_commit_by_hash(self.repo.repo_id, ¤t_head) + .await? + .unwrap(), + ); + + let root_tree = Tree::from_git_model( + self.storage + .git_db_storage() + .get_tree_by_hash(self.repo.repo_id, &commit.tree_id.to_string()) + .await? + .unwrap() + .clone(), + ); + self.traverses_and_update_filepath(root_tree, PathBuf::new()) + .await?; + Ok(()) + } } impl ImportRepo { + #[async_recursion] + async fn traverses_and_update_filepath( + &self, + tree: Tree, + path: PathBuf, + ) -> Result<(), MegaError> { + for item in tree.tree_items { + if item.is_tree() { + let tree = Tree::from_git_model( + self.storage + .git_db_storage() + .get_tree_by_hash(self.repo.repo_id, &item.id.to_string()) + .await? + .unwrap() + .clone(), + ); + + // 递归调用 + self.traverses_and_update_filepath(tree, path.join(item.name)) + .await?; + } else { + let id = item.id.to_string(); + self.storage + .git_db_storage() + .update_git_blob_filepath(&id, path.join(item.name).to_str().unwrap()) + .await?; + } + } + + Ok(()) + } + // attach import repo to monorepo parent tree pub(crate) async fn attach_to_monorepo_parent(&self) -> Result<(), MegaError> { // 1. find branch command @@ -379,7 +507,7 @@ impl ImportRepo { let save_trees: Vec = save_trees .into_iter() .map(|tree| { - let mut model: mega_tree::Model = tree.into_mega_model(); + let mut model: mega_tree::Model = tree.into_mega_model(EntryMeta::new()); model.commit_id = new_commit.id.to_string(); model.into() }) diff --git a/ceres/src/pack/mod.rs b/ceres/src/pack/mod.rs index c56d2d8b3..4015b56f1 100644 --- a/ceres/src/pack/mod.rs +++ b/ceres/src/pack/mod.rs @@ -1,3 +1,7 @@ +use async_trait::async_trait; +use bytes::Bytes; +use futures::{Stream, future::join_all}; +use std::collections::HashMap; use std::{ collections::HashSet, pin::Pin, @@ -6,10 +10,6 @@ use std::{ atomic::{AtomicUsize, Ordering}, }, }; - -use async_trait::async_trait; -use bytes::Bytes; -use futures::{Stream, future::join_all}; use sysinfo::System; use tokio::sync::{Semaphore, mpsc::UnboundedReceiver}; use tokio_stream::wrappers::ReceiverStream; @@ -21,6 +21,8 @@ use common::{ errors::{MegaError, ProtocolError}, utils::ZERO_ID, }; +use git_internal::hash::SHA1; +use git_internal::internal::metadata::{EntryMeta, MetaAttached}; use git_internal::internal::pack::Pack; use git_internal::{ errors::GitError, @@ -32,6 +34,7 @@ use git_internal::{ pack::entry::Entry, }, }; +use uuid::Uuid; pub mod import_repo; pub mod monorepo; @@ -44,14 +47,18 @@ pub trait RepoHandler: Send + Sync + 'static { async fn receiver_handler( self: Arc, - mut rx: UnboundedReceiver, + mut rx: UnboundedReceiver>, + mut rx_pack_id: UnboundedReceiver, ) -> Result<(), GitError> { let mut entry_list = vec![]; - let semaphore = Arc::new(Semaphore::new(4)); + let semaphore = Arc::new(Semaphore::new(1)); //这里暂时改动 let mut join_tasks = vec![]; - while let Some(entry) = rx.recv().await { - self.check_entry(&entry).await?; + let temp_pack_id = Uuid::new_v4().to_string(); + + while let Some(mut entry) = rx.recv().await { + self.check_entry(&entry.inner).await?; + entry.meta.set_pack_id(temp_pack_id.clone()); entry_list.push(entry); if entry_list.len() >= 1000 { let acquired = semaphore.clone().acquire_owned().await.unwrap(); @@ -84,12 +91,37 @@ pub trait RepoHandler: Send + Sync + 'static { } } } + + // receive pack_id and update it + if let Some(real_pack_id) = rx_pack_id.recv().await { + let real_pack_id_str = real_pack_id.to_string(); + tracing::debug!( + "Received real pack_id: {}, updating database from temp_pack_id: {}", + real_pack_id_str, + temp_pack_id + ); + + // 通过数据库操作更新 pack_id + if let Err(e) = self.update_pack_id(&temp_pack_id, &real_pack_id_str).await { + tracing::error!("Failed to update pack_id in database: {:?}", e); + return Err(GitError::CustomError(format!( + "Failed to update pack_id: {:?}", + e + ))); + } + } + Ok(()) } async fn post_receive_pack(&self) -> Result<(), MegaError>; - async fn save_entry(&self, entry_list: Vec) -> Result<(), MegaError>; + async fn save_entry( + &self, + entry_list: Vec>, + ) -> Result<(), MegaError>; + + async fn update_pack_id(&self, temp_pack_id: &str, pack_id: &str) -> Result<(), MegaError>; async fn check_entry(&self, entry: &Entry) -> Result<(), GitError>; @@ -116,6 +148,11 @@ pub trait RepoHandler: Send + Sync + 'static { hashes: Vec, ) -> Result, MegaError>; + async fn get_blob_metadata_by_hashes( + &self, + hashes: Vec, + ) -> Result, MegaError>; + async fn update_refs(&self, refs: &RefCommand) -> Result<(), GitError>; async fn check_commit_exist(&self, hash: &str) -> bool; @@ -136,7 +173,13 @@ pub trait RepoHandler: Send + Sync + 'static { &self, pack_config: &PackConfig, stream: Pin> + Send>>, - ) -> Result, ProtocolError> { + ) -> Result< + ( + UnboundedReceiver>, + UnboundedReceiver, + ), + ProtocolError, + > { let total_mem = || { let sys = System::new_all(); Ok(sys.total_memory() as usize) @@ -149,14 +192,16 @@ pub trait RepoHandler: Send + Sync + 'static { }; let (sender, receiver) = tokio::sync::mpsc::unbounded_channel(); + let (pack_id_sender, pack_id_receiver) = tokio::sync::mpsc::unbounded_channel(); + let p = Pack::new( None, Some(cache_mem), Some(pack_config.pack_decode_cache_path.clone()), pack_config.clean_cache_after_decode, ); - p.decode_stream(stream, sender).await; - Ok(receiver) + p.decode_stream(stream, sender, Some(pack_id_sender)).await; + Ok((receiver, pack_id_receiver)) } async fn traverse_for_count( @@ -209,7 +254,7 @@ pub trait RepoHandler: Send + Sync + 'static { &self, tree: Tree, exist_objs: &mut HashSet, - sender: Option<&tokio::sync::mpsc::Sender>, + sender: Option<&tokio::sync::mpsc::Sender>>, ) { let mut search_tree_ids = vec![]; let mut search_blob_ids = vec![]; @@ -226,11 +271,25 @@ pub trait RepoHandler: Send + Sync + 'static { } if let Some(sender) = sender { - let blobs = self.get_blobs_by_hashes(search_blob_ids).await.unwrap(); + let blobs = self + .get_blobs_by_hashes(search_blob_ids.clone()) + .await + .unwrap(); + let blobs_ext_data = self + .get_blob_metadata_by_hashes(search_blob_ids) + .await + .unwrap(); for b in blobs { let data = b.data.unwrap_or_default(); let blob: Blob = Blob::from_content_bytes(data); - sender.send(blob.into()).await.unwrap(); + let ext_data = blobs_ext_data.get(&b.sha1).unwrap(); + sender + .send(MetaAttached { + inner: blob.into(), + meta: ext_data.to_owned(), + }) + .await + .unwrap(); } } @@ -240,7 +299,15 @@ pub trait RepoHandler: Send + Sync + 'static { } if let Some(sender) = sender { - sender.send(tree.into()).await.unwrap(); + sender + .send(MetaAttached { + inner: tree.into(), + meta: EntryMeta::new(), + }) + .await + .unwrap(); } } + + async fn traverses_tree_and_update_filepath(&self) -> Result<(), MegaError>; } diff --git a/ceres/src/pack/monorepo.rs b/ceres/src/pack/monorepo.rs index af314a3ed..0a15fce77 100644 --- a/ceres/src/pack/monorepo.rs +++ b/ceres/src/pack/monorepo.rs @@ -1,3 +1,5 @@ +use async_recursion::async_recursion; +use async_trait::async_trait; use std::{ collections::{HashMap, HashSet}, path::{Component, Path, PathBuf}, @@ -8,8 +10,6 @@ use std::{ }, vec, }; - -use async_trait::async_trait; use tokio::sync::{RwLock, mpsc}; use tokio_stream::wrappers::ReceiverStream; @@ -21,6 +21,7 @@ use common::{ errors::MegaError, utils::{self, ZERO_ID}, }; +use git_internal::internal::metadata::{EntryMeta, MetaAttached}; use git_internal::{ errors::GitError, hash::SHA1, @@ -143,11 +144,15 @@ impl RepoHandler for MonoRepo { async fn post_receive_pack(&self) -> Result<(), MegaError> { self.save_or_update_cl().await?; + self.traverses_tree_and_update_filepath().await?; self.post_cl_operation().await?; Ok(()) } - async fn save_entry(&self, entry_list: Vec) -> Result<(), MegaError> { + async fn save_entry( + &self, + entry_list: Vec>, + ) -> Result<(), MegaError> { let storage = self.storage.mono_storage(); let current_commit = self.current_commit.read().await; let commit_id = if let Some(commit) = &*current_commit { @@ -160,6 +165,11 @@ impl RepoHandler for MonoRepo { .await } + async fn update_pack_id(&self, temp_pack_id: &str, pack_id: &str) -> Result<(), MegaError> { + let storage = self.storage.mono_storage(); + storage.update_pack_id(temp_pack_id, pack_id).await + } + async fn check_entry(&self, entry: &Entry) -> Result<(), GitError> { if self.current_commit.read().await.is_none() { if entry.obj_type == ObjectType::Commit { @@ -262,7 +272,7 @@ impl RepoHandler for MonoRepo { let (stream_tx, stream_rx) = mpsc::channel(pack_config.channel_message_size); let encoder = PackEncoder::new(obj_num.into_inner(), 0, stream_tx); encoder.encode_async(entry_rx).await.unwrap(); - + // todo: For now, send metadata only for blob objects. for c in want_commits { self.traverse( want_trees.get(&c.tree_id).unwrap().clone(), @@ -270,7 +280,13 @@ impl RepoHandler for MonoRepo { Some(&entry_tx), ) .await; - entry_tx.send(c.into()).await.unwrap(); + entry_tx + .send(MetaAttached { + inner: c.into(), + meta: EntryMeta::new(), + }) + .await + .unwrap(); } drop(entry_tx); @@ -299,6 +315,34 @@ impl RepoHandler for MonoRepo { .await } + async fn get_blob_metadata_by_hashes( + &self, + hashes: Vec, + ) -> Result, MegaError> { + let models = self + .storage + .mono_storage() + .get_mega_blobs_by_hashes(hashes) + .await?; + + let map = models + .into_iter() + .map(|blob| { + ( + blob.blob_id.clone(), + EntryMeta { + pack_id: Some(blob.pack_id.clone()), + pack_offset: Some(blob.pack_offset as usize), + file_path: Some(blob.file_path.clone()), + is_delta: Some(blob.is_delta_in_pack), + }, + ) + }) + .collect::>(); + + Ok(map) + } + async fn update_refs(&self, refs: &RefCommand) -> Result<(), GitError> { let storage = self.storage.mono_storage(); let current_commit = self.current_commit.read().await; @@ -335,9 +379,144 @@ impl RepoHandler for MonoRepo { async fn check_default_branch(&self) -> bool { true } + + async fn traverses_tree_and_update_filepath(&self) -> Result<(), MegaError> { + let commit_guard = self.current_commit.read().await; + + let commit_opt = match commit_guard.as_ref() { + Some(commit) => commit, + None => { + tracing::info!( + "Skipping file path update: no current commit available. \ + This typically occurs when only updating references or pushing empty pack files." + ); + return Ok(()); + } + }; + + let tree_hashes = vec![commit_opt.tree_id.to_string()]; + let trees = self + .storage + .mono_storage() + .get_trees_by_hashes(tree_hashes) + .await + .map_err(|e| { + MegaError::with_message(format!( + "Failed to retrieve root tree for commit {}: {}", + commit_opt.id, e + )) + })?; + + if trees.is_empty() { + return Err(MegaError::with_message(format!( + "Root tree {} not found for commit {}", + commit_opt.tree_id, commit_opt.id + ))); + } + + let root_tree = Tree::from_mega_model(trees[0].clone()); + + tracing::info!( + "Starting file path update for commit {} with root tree {}", + commit_opt.id, + commit_opt.tree_id + ); + + self.traverses_and_update_filepath(root_tree, PathBuf::new()) + .await + .map_err(|e| { + MegaError::with_message(format!( + "Failed to update file paths for commit {}: {}", + commit_opt.id, e + )) + })?; + + tracing::info!( + "Successfully completed file path update for commit {}", + commit_opt.id + ); + + Ok(()) + } } impl MonoRepo { + #[async_recursion] + async fn traverses_and_update_filepath( + &self, + tree: Tree, + path: PathBuf, + ) -> Result<(), MegaError> { + for item in tree.tree_items { + let item_path = path.join(&item.name); + + if item.is_tree() { + let tree_hash = item.id.to_string(); + let trees = self + .storage + .mono_storage() + .get_trees_by_hashes(vec![tree_hash.clone()]) + .await + .map_err(|e| { + MegaError::with_message(format!( + "Failed to retrieve tree {} at path '{}': {}", + tree_hash, + item_path.display(), + e + )) + })?; + + if trees.is_empty() { + return Err(MegaError::with_message(format!( + "Tree {} not found at path '{}'", + tree_hash, + item_path.display() + ))); + } + + let child_tree = Tree::from_mega_model(trees[0].clone()); + + self.traverses_and_update_filepath(child_tree, item_path.clone()) + .await + .map_err(|e| { + MegaError::with_message(format!( + "Failed to process subtree {} at path '{}': {}", + tree_hash, + item_path.display(), + e + )) + })?; + } else { + let blob_id = item.id.to_string(); + let file_path_str = item_path.to_str().ok_or_else(|| { + MegaError::with_message(format!( + "Invalid UTF-8 path for blob {}: '{}'", + blob_id, + item_path.display() + )) + })?; + + self.storage + .mono_storage() + .update_blob_filepath(&blob_id, file_path_str) + .await + .map_err(|e| { + MegaError::with_message(format!( + "Failed to update file path for blob {} at '{}': {}", + blob_id, file_path_str, e + )) + })?; + + tracing::debug!( + "Updated file path for blob {} to '{}'", + blob_id, + file_path_str + ); + } + } + + Ok(()) + } async fn fetch_or_new_cl_link(&self) -> Result { let storage = self.storage.cl_storage(); let path_str = self.path.to_str().unwrap(); @@ -591,7 +770,14 @@ fn get_plain_items(tree: &Tree) -> Vec<(PathBuf, SHA1)> { #[cfg(test)] mod test { - use std::path::{Component, Path}; + use crate::pack::RepoHandler; + use crate::pack::monorepo::MonoRepo; + use git_internal::hash::SHA1; + use git_internal::internal::object::tree::{Tree, TreeItem, TreeItemMode}; + use std::path::{Component, Path, PathBuf}; + use std::str::FromStr; + use std::sync::Arc; + use tokio::sync::RwLock; #[test] fn get_component_reverse() { @@ -606,4 +792,81 @@ mod test { assert_eq!(vec!["d.txt", "c", "b", "a"], reversed); // ["d.txt", "c", "b", "a"] } + + // 创建测试用的 MonoRepo 实例 + async fn create_test_mono_repo() -> MonoRepo { + use bellatrix::Bellatrix; + use common::config::BuildConfig; + use jupiter::tests::test_storage; + use tempfile::TempDir; + + // 创建临时目录和测试存储 + let temp_dir = TempDir::new().expect("Failed to create temporary directory"); + let storage = test_storage(temp_dir.path()).await; + + // 创建测试 Bellatrix + let bellatrix = Arc::new(Bellatrix::new(BuildConfig::default())); + + MonoRepo { + storage, + path: PathBuf::from("/test/repo"), + from_hash: "from_hash".to_string(), + to_hash: "to_hash".to_string(), + current_commit: Arc::new(RwLock::new(None)), + cl_link: Arc::new(RwLock::new(None)), + bellatrix, + username: Some("test_user".to_string()), + } + } + + #[tokio::test] + async fn test_traverses_tree_and_update_filepath_with_no_commit() { + let mono_repo = create_test_mono_repo().await; + + // 测试当 current_commit 为 None 时的情况 + let result = mono_repo.traverses_tree_and_update_filepath().await; + + // 应该优雅地处理这种情况,记录日志并跳过更新 + assert!( + result.is_ok(), + "Should handle None current_commit gracefully" + ); + } + + #[tokio::test] + async fn test_traverses_and_update_filepath_with_files() { + let mono_repo = create_test_mono_repo().await; + + // 创建测试树结构 + let blob_sha1 = SHA1::from_str("1234567890abcdef1234567890abcdef12345678").unwrap(); + let tree_items = vec![TreeItem { + mode: TreeItemMode::Blob, + name: "test_file.txt".to_string(), + id: blob_sha1, + }]; + + let tree = Tree { + id: SHA1::from_str("abcdef1234567890abcdef1234567890abcdef12").unwrap(), + tree_items, + }; + + let path = PathBuf::from("src"); + + // 测试遍历和更新文件路径 + let result = mono_repo.traverses_and_update_filepath(tree, path).await; + + // 注意:这个测试需要配置正确的数据库环境才能真正验证 + // 在实际测试环境中,应该验证数据库中的 file_path 字段是否正确更新 + println!("Test result: {:?}", result); + } + + // 验证 UTF-8 路径处理 + #[test] + fn test_utf8_path_handling() { + let path = PathBuf::from("src/测试文件.txt"); + let path_str = path.to_str(); + + assert!(path_str.is_some(), "Should handle UTF-8 paths correctly"); + assert_eq!(path_str.unwrap(), "src/测试文件.txt"); + } } diff --git a/ceres/src/protocol/smart.rs b/ceres/src/protocol/smart.rs index ce208c81d..676f6c1bb 100644 --- a/ceres/src/protocol/smart.rs +++ b/ceres/src/protocol/smart.rs @@ -220,7 +220,10 @@ impl SmartProtocol { .unpack_stream(&self.storage.config().pack, data_stream) .await?; - let unpack_result = repo_handler.clone().receiver_handler(receiver).await; + let unpack_result = repo_handler + .clone() + .receiver_handler(receiver.0, receiver.1) + .await; // write "unpack ok\n to report" add_pkt_line_string(&mut report_status, "unpack ok\n".to_owned()); diff --git a/jupiter/callisto/src/git_blob.rs b/jupiter/callisto/src/git_blob.rs index a2d1a4e4b..9524c473e 100644 --- a/jupiter/callisto/src/git_blob.rs +++ b/jupiter/callisto/src/git_blob.rs @@ -12,6 +12,10 @@ pub struct Model { pub blob_id: String, pub name: Option, pub size: i32, + pub pack_id: String, + pub file_path: String, + pub pack_offset: i64, + pub is_delta_in_pack: bool, pub created_at: DateTime, } diff --git a/jupiter/callisto/src/git_commit.rs b/jupiter/callisto/src/git_commit.rs index cd3e7186e..fedaeb5ce 100644 --- a/jupiter/callisto/src/git_commit.rs +++ b/jupiter/callisto/src/git_commit.rs @@ -18,6 +18,8 @@ pub struct Model { pub committer: Option, #[sea_orm(column_type = "Text", nullable)] pub content: Option, + pub pack_id: String, + pub pack_offset: i64, pub created_at: DateTime, } diff --git a/jupiter/callisto/src/git_tag.rs b/jupiter/callisto/src/git_tag.rs index a401a777b..91615113d 100644 --- a/jupiter/callisto/src/git_tag.rs +++ b/jupiter/callisto/src/git_tag.rs @@ -19,6 +19,8 @@ pub struct Model { pub tagger: String, #[sea_orm(column_type = "Text")] pub message: String, + pub pack_id: String, + pub pack_offset: i64, pub created_at: DateTime, } diff --git a/jupiter/callisto/src/git_tree.rs b/jupiter/callisto/src/git_tree.rs index 8ca2a176b..8192249d2 100644 --- a/jupiter/callisto/src/git_tree.rs +++ b/jupiter/callisto/src/git_tree.rs @@ -13,6 +13,8 @@ pub struct Model { #[sea_orm(column_type = "VarBinary(StringLen::None)")] pub sub_trees: Vec, pub size: i32, + pub pack_id: String, + pub pack_offset: i64, pub created_at: DateTime, } diff --git a/jupiter/callisto/src/mega_blob.rs b/jupiter/callisto/src/mega_blob.rs index c647e1fd3..637412baa 100644 --- a/jupiter/callisto/src/mega_blob.rs +++ b/jupiter/callisto/src/mega_blob.rs @@ -14,6 +14,10 @@ pub struct Model { #[sea_orm(column_type = "Text")] pub name: String, pub size: i32, + pub pack_id: String, + pub file_path: String, + pub pack_offset: i64, + pub is_delta_in_pack: bool, pub created_at: DateTime, } diff --git a/jupiter/callisto/src/mega_commit.rs b/jupiter/callisto/src/mega_commit.rs index 95333e3a7..4c2b745cc 100644 --- a/jupiter/callisto/src/mega_commit.rs +++ b/jupiter/callisto/src/mega_commit.rs @@ -18,6 +18,8 @@ pub struct Model { pub committer: Option, #[sea_orm(column_type = "Text", nullable)] pub content: Option, + pub pack_id: String, + pub pack_offset: i64, pub created_at: DateTime, } diff --git a/jupiter/callisto/src/mega_tag.rs b/jupiter/callisto/src/mega_tag.rs index 32d337c45..94895ebb3 100644 --- a/jupiter/callisto/src/mega_tag.rs +++ b/jupiter/callisto/src/mega_tag.rs @@ -18,6 +18,8 @@ pub struct Model { pub tagger: String, #[sea_orm(column_type = "Text")] pub message: String, + pub pack_id: String, + pub pack_offset: i64, pub created_at: DateTime, } diff --git a/jupiter/callisto/src/mega_tree.rs b/jupiter/callisto/src/mega_tree.rs index 97b61953c..d24b9b4b3 100644 --- a/jupiter/callisto/src/mega_tree.rs +++ b/jupiter/callisto/src/mega_tree.rs @@ -14,6 +14,8 @@ pub struct Model { pub sub_trees: Vec, pub size: i32, pub commit_id: String, + pub pack_id: String, + pub pack_offset: i64, pub created_at: DateTime, } diff --git a/jupiter/src/migration/m20251027_062734_add_metadata_to_object.rs b/jupiter/src/migration/m20251027_062734_add_metadata_to_object.rs new file mode 100644 index 000000000..e10a8b965 --- /dev/null +++ b/jupiter/src/migration/m20251027_062734_add_metadata_to_object.rs @@ -0,0 +1,512 @@ +use sea_orm_migration::prelude::*; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + // Add columns to MegaBlob table one by one for SQLite compatibility + manager + .alter_table( + Table::alter() + .table(MegaBlob::Table) + .add_column( + ColumnDef::new(MegaBlob::PackId) + .string() + .not_null() + .default(""), + ) + .to_owned(), + ) + .await?; + + manager + .alter_table( + Table::alter() + .table(MegaBlob::Table) + .add_column( + ColumnDef::new(MegaBlob::FilePath) + .string() + .not_null() + .default(""), + ) + .to_owned(), + ) + .await?; + + manager + .alter_table( + Table::alter() + .table(MegaBlob::Table) + .add_column( + ColumnDef::new(MegaBlob::PackOffset) + .big_integer() + .default(0), + ) + .to_owned(), + ) + .await?; + + manager + .alter_table( + Table::alter() + .table(MegaBlob::Table) + .add_column( + ColumnDef::new(MegaBlob::IsDeltaInPack) + .boolean() + .not_null() + .default(false), + ) + .to_owned(), + ) + .await?; + + // Add columns to GitBlob table one by one for SQLite compatibility + manager + .alter_table( + Table::alter() + .table(GitBlob::Table) + .add_column( + ColumnDef::new(GitBlob::PackId) + .string() + .not_null() + .default(""), + ) + .to_owned(), + ) + .await?; + + manager + .alter_table( + Table::alter() + .table(GitBlob::Table) + .add_column( + ColumnDef::new(GitBlob::FilePath) + .string() + .not_null() + .default(""), + ) + .to_owned(), + ) + .await?; + + manager + .alter_table( + Table::alter() + .table(GitBlob::Table) + .add_column(ColumnDef::new(GitBlob::PackOffset).big_integer().default(0)) + .to_owned(), + ) + .await?; + + manager + .alter_table( + Table::alter() + .table(GitBlob::Table) + .add_column( + ColumnDef::new(GitBlob::IsDeltaInPack) + .boolean() + .not_null() + .default(false), + ) + .to_owned(), + ) + .await?; + + // Add columns to MegaCommit table one by one for SQLite compatibility + manager + .alter_table( + Table::alter() + .table(MegaCommit::Table) + .add_column( + ColumnDef::new(MegaCommit::PackId) + .string() + .not_null() + .default(""), + ) + .to_owned(), + ) + .await?; + + manager + .alter_table( + Table::alter() + .table(MegaCommit::Table) + .add_column( + ColumnDef::new(MegaCommit::PackOffset) + .big_integer() + .default(0), + ) + .to_owned(), + ) + .await?; + + // Add columns to GitCommit table one by one for SQLite compatibility + manager + .alter_table( + Table::alter() + .table(GitCommit::Table) + .add_column( + ColumnDef::new(GitCommit::PackId) + .string() + .not_null() + .default(""), + ) + .to_owned(), + ) + .await?; + + manager + .alter_table( + Table::alter() + .table(GitCommit::Table) + .add_column( + ColumnDef::new(GitCommit::PackOffset) + .big_integer() + .default(0), + ) + .to_owned(), + ) + .await?; + + // Add columns to MegaTag table one by one for SQLite compatibility + manager + .alter_table( + Table::alter() + .table(MegaTag::Table) + .add_column( + ColumnDef::new(MegaTag::PackId) + .string() + .not_null() + .default(""), + ) + .to_owned(), + ) + .await?; + + manager + .alter_table( + Table::alter() + .table(MegaTag::Table) + .add_column(ColumnDef::new(MegaTag::PackOffset).big_integer().default(0)) + .to_owned(), + ) + .await?; + + // Add columns to GitTag table one by one for SQLite compatibility + manager + .alter_table( + Table::alter() + .table(GitTag::Table) + .add_column( + ColumnDef::new(GitTag::PackId) + .string() + .not_null() + .default(""), + ) + .to_owned(), + ) + .await?; + + manager + .alter_table( + Table::alter() + .table(GitTag::Table) + .add_column(ColumnDef::new(GitTag::PackOffset).big_integer().default(0)) + .to_owned(), + ) + .await?; + + // Add columns to MegaTree table one by one for SQLite compatibility + manager + .alter_table( + Table::alter() + .table(MegaTree::Table) + .add_column( + ColumnDef::new(MegaTree::PackId) + .string() + .not_null() + .default(""), + ) + .to_owned(), + ) + .await?; + + manager + .alter_table( + Table::alter() + .table(MegaTree::Table) + .add_column( + ColumnDef::new(MegaTree::PackOffset) + .big_integer() + .default(0), + ) + .to_owned(), + ) + .await?; + + // Add columns to GitTree table one by one for SQLite compatibility + manager + .alter_table( + Table::alter() + .table(GitTree::Table) + .add_column( + ColumnDef::new(GitTree::PackId) + .string() + .not_null() + .default(""), + ) + .to_owned(), + ) + .await?; + + manager + .alter_table( + Table::alter() + .table(GitTree::Table) + .add_column(ColumnDef::new(GitTree::PackOffset).big_integer().default(0)) + .to_owned(), + ) + .await?; + Ok(()) + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + // Drop columns from MegaBlob table one by one for SQLite compatibility + manager + .alter_table( + Table::alter() + .table(MegaBlob::Table) + .drop_column(MegaBlob::IsDeltaInPack) + .to_owned(), + ) + .await?; + manager + .alter_table( + Table::alter() + .table(MegaBlob::Table) + .drop_column(MegaBlob::PackOffset) + .to_owned(), + ) + .await?; + manager + .alter_table( + Table::alter() + .table(MegaBlob::Table) + .drop_column(MegaBlob::FilePath) + .to_owned(), + ) + .await?; + manager + .alter_table( + Table::alter() + .table(MegaBlob::Table) + .drop_column(MegaBlob::PackId) + .to_owned(), + ) + .await?; + + // Drop columns from MegaCommit table one by one for SQLite compatibility + manager + .alter_table( + Table::alter() + .table(MegaCommit::Table) + .drop_column(MegaCommit::PackOffset) + .to_owned(), + ) + .await?; + manager + .alter_table( + Table::alter() + .table(MegaCommit::Table) + .drop_column(MegaCommit::PackId) + .to_owned(), + ) + .await?; + + // Drop columns from MegaTag table one by one for SQLite compatibility + manager + .alter_table( + Table::alter() + .table(MegaTag::Table) + .drop_column(MegaTag::PackOffset) + .to_owned(), + ) + .await?; + manager + .alter_table( + Table::alter() + .table(MegaTag::Table) + .drop_column(MegaTag::PackId) + .to_owned(), + ) + .await?; + + // Drop columns from MegaTree table one by one for SQLite compatibility + manager + .alter_table( + Table::alter() + .table(MegaTree::Table) + .drop_column(MegaTree::PackOffset) + .to_owned(), + ) + .await?; + manager + .alter_table( + Table::alter() + .table(MegaTree::Table) + .drop_column(MegaTree::PackId) + .to_owned(), + ) + .await?; + + // Drop columns from GitBlob table one by one for SQLite compatibility + manager + .alter_table( + Table::alter() + .table(GitBlob::Table) + .drop_column(GitBlob::IsDeltaInPack) + .to_owned(), + ) + .await?; + manager + .alter_table( + Table::alter() + .table(GitBlob::Table) + .drop_column(GitBlob::PackOffset) + .to_owned(), + ) + .await?; + manager + .alter_table( + Table::alter() + .table(GitBlob::Table) + .drop_column(GitBlob::FilePath) + .to_owned(), + ) + .await?; + manager + .alter_table( + Table::alter() + .table(GitBlob::Table) + .drop_column(GitBlob::PackId) + .to_owned(), + ) + .await?; + + // Drop columns from GitCommit table one by one for SQLite compatibility + manager + .alter_table( + Table::alter() + .table(GitCommit::Table) + .drop_column(GitCommit::PackOffset) + .to_owned(), + ) + .await?; + manager + .alter_table( + Table::alter() + .table(GitCommit::Table) + .drop_column(GitCommit::PackId) + .to_owned(), + ) + .await?; + + // Drop columns from GitTag table one by one for SQLite compatibility + manager + .alter_table( + Table::alter() + .table(GitTag::Table) + .drop_column(GitTag::PackOffset) + .to_owned(), + ) + .await?; + manager + .alter_table( + Table::alter() + .table(GitTag::Table) + .drop_column(GitTag::PackId) + .to_owned(), + ) + .await?; + + // Drop columns from GitTree table one by one for SQLite compatibility + manager + .alter_table( + Table::alter() + .table(GitTree::Table) + .drop_column(GitTree::PackOffset) + .to_owned(), + ) + .await?; + manager + .alter_table( + Table::alter() + .table(GitTree::Table) + .drop_column(GitTree::PackId) + .to_owned(), + ) + .await?; + + Ok(()) + } +} + +#[derive(Iden)] +enum MegaBlob { + Table, + PackId, + FilePath, + PackOffset, + IsDeltaInPack, +} +#[derive(Iden)] +enum GitBlob { + Table, + PackId, + FilePath, + PackOffset, + IsDeltaInPack, +} + +#[derive(Iden)] +enum MegaCommit { + Table, + PackId, + PackOffset, +} + +#[derive(Iden)] +enum GitCommit { + Table, + PackId, + PackOffset, +} + +#[derive(Iden)] +enum MegaTag { + Table, + PackId, + PackOffset, +} +#[derive(Iden)] +enum GitTag { + Table, + PackId, + PackOffset, +} + +#[derive(Iden)] +enum MegaTree { + Table, + PackId, + PackOffset, +} + +#[derive(Iden)] +enum GitTree { + Table, + PackId, + PackOffset, +} diff --git a/jupiter/src/migration/mod.rs b/jupiter/src/migration/mod.rs index 20626324d..a6f5bcdcb 100644 --- a/jupiter/src/migration/mod.rs +++ b/jupiter/src/migration/mod.rs @@ -63,7 +63,11 @@ mod m20250930_024736_mr_to_cl; mod m20251011_091944_tasks_mr_id_to_cl_id; mod m20251012_071700_mr_to_cl_batch; mod m20251021_073817_rename_mr_sync_to_cl_sync; + mod m20251026_065433_drop_user_table; + +mod m20251027_062734_add_metadata_to_object; + /// Creates a primary key column definition with big integer type. /// /// # Arguments @@ -115,6 +119,7 @@ impl MigratorTrait for Migrator { Box::new(m20251012_071700_mr_to_cl_batch::Migration), Box::new(m20251021_073817_rename_mr_sync_to_cl_sync::Migration), Box::new(m20251026_065433_drop_user_table::Migration), + Box::new(m20251027_062734_add_metadata_to_object::Migration), ] } } diff --git a/jupiter/src/service/blame_service.rs b/jupiter/src/service/blame_service.rs index 7a90e1537..7a3fcd577 100644 --- a/jupiter/src/service/blame_service.rs +++ b/jupiter/src/service/blame_service.rs @@ -1163,6 +1163,7 @@ mod tests { use crate::storage::base_storage::StorageConnector; use callisto::mega_refs; use common::utils::MEGA_BRANCH_NAME; + use git_internal::internal::metadata::EntryMeta; use git_internal::internal::object::blob::Blob; use git_internal::internal::object::commit::Commit; use git_internal::internal::object::signature::{Signature, SignatureType}; @@ -1285,7 +1286,7 @@ enable_https = true let save_trees: Vec = vec![tree1, tree2, tree3.clone()] .into_iter() .map(|tree| { - let mut tree_model: mega_tree::Model = tree.into_mega_model(); + let mut tree_model: mega_tree::Model = tree.into_mega_model(EntryMeta::new()); tree_model.commit_id = "test".to_string(); tree_model.into() }) @@ -1504,7 +1505,7 @@ enable_https = true let save_trees: Vec = vec![tree.clone()] .into_iter() .map(|tree| { - let mut tree_model: mega_tree::Model = tree.into_mega_model(); + let mut tree_model: mega_tree::Model = tree.into_mega_model(EntryMeta::new()); tree_model.commit_id = "test".to_string(); tree_model.into() }) @@ -1660,7 +1661,7 @@ enable_https = true let save_trees: Vec = vec![tree.clone()] .into_iter() .map(|tree| { - let mut tree_model: mega_tree::Model = tree.into_mega_model(); + let mut tree_model: mega_tree::Model = tree.into_mega_model(EntryMeta::new()); tree_model.commit_id = "test".to_string(); tree_model.into() }) diff --git a/jupiter/src/storage/git_db_storage.rs b/jupiter/src/storage/git_db_storage.rs index 219cd21ee..c48a12b5e 100644 --- a/jupiter/src/storage/git_db_storage.rs +++ b/jupiter/src/storage/git_db_storage.rs @@ -4,10 +4,11 @@ use std::sync::Arc; use callisto::sea_orm_active_enums::RefTypeEnum; use common::utils::generate_id; use futures::{Stream, StreamExt, stream}; +use git_internal::internal::metadata::{EntryMeta, MetaAttached}; use sea_orm::sea_query::Expr; use sea_orm::{ - ActiveModelTrait, ColumnTrait, DbBackend, DbErr, EntityTrait, IntoActiveModel, QueryFilter, - QueryTrait, Set, + ActiveModelTrait, ColumnTrait, DatabaseTransaction, DbBackend, DbErr, EntityTrait, + IntoActiveModel, QueryFilter, QueryTrait, Set, TransactionTrait, }; use sea_orm::{PaginatorTrait, QueryOrder}; use tokio::sync::Mutex; @@ -146,7 +147,11 @@ impl GitDbStorage { Ok(result > 0) } - pub async fn save_entry(&self, repo_id: i64, entry_list: Vec) -> Result<(), MegaError> { + pub async fn save_entry( + &self, + repo_id: i64, + entry_list: Vec>, + ) -> Result<(), MegaError> { let git_objects = Arc::new(Mutex::new(GitObjects { commits: Vec::new(), trees: Vec::new(), @@ -160,8 +165,8 @@ impl GitDbStorage { let git_objects = git_objects.clone(); async move { - let raw_obj = process_entry(entry); - let model = raw_obj.convert_to_git_model(); + let raw_obj = process_entry(entry.inner); + let model = raw_obj.convert_to_git_model(entry.meta); let mut git_objects = git_objects.lock().await; match model { @@ -198,6 +203,80 @@ impl GitDbStorage { Ok(()) } + pub async fn update_pack_id(&self, temp_pack_id: &str, pack_id: &str) -> Result<(), MegaError> { + let conn = self.get_connection(); + + // + let txn: DatabaseTransaction = conn.begin().await?; + + // + let tables = [ + ( + "git_blob", + git_blob::Entity::update_many() + .col_expr(git_blob::Column::PackId, Expr::value(pack_id)) + .filter(git_blob::Column::PackId.eq(temp_pack_id)) + .exec(&txn) + .await?, + ), + ( + "git_tree", + git_tree::Entity::update_many() + .col_expr(git_tree::Column::PackId, Expr::value(pack_id)) + .filter(git_tree::Column::PackId.eq(temp_pack_id)) + .exec(&txn) + .await?, + ), + ( + "git_tag", + git_tag::Entity::update_many() + .col_expr(git_tag::Column::PackId, Expr::value(pack_id)) + .filter(git_tag::Column::PackId.eq(temp_pack_id)) + .exec(&txn) + .await?, + ), + ( + "git_commit", + git_commit::Entity::update_many() + .col_expr(git_commit::Column::PackId, Expr::value(pack_id)) + .filter(git_commit::Column::PackId.eq(temp_pack_id)) + .exec(&txn) + .await?, + ), + ]; + + // + for (name, res) in tables { + if res.rows_affected > 0 { + tracing::info!(" git object Updated {} rows in {}", res.rows_affected, name); + } + } + + // + txn.commit().await?; + Ok(()) + } + + pub async fn update_git_blob_filepath( + &self, + blob_id: &String, + file_path: &str, + ) -> Result<(), MegaError> { + if let Some(model) = git_blob::Entity::find() + .filter(git_blob::Column::BlobId.eq(blob_id)) + .one(self.get_connection()) + .await? + { + let mut active: git_blob::ActiveModel = model.into(); + + active.file_path = Set(file_path.to_string()); + + active.update(self.get_connection()).await?; + } + + Ok(()) + } + /// Finds a Git repository with an exact match on the repository path. /// /// # Arguments diff --git a/jupiter/src/storage/mono_storage.rs b/jupiter/src/storage/mono_storage.rs index 39df75540..4f48f6160 100644 --- a/jupiter/src/storage/mono_storage.rs +++ b/jupiter/src/storage/mono_storage.rs @@ -4,26 +4,28 @@ use std::sync::{Arc, Mutex}; use futures::stream::FuturesUnordered; use futures::{StreamExt, stream}; + use sea_orm::ActiveValue::Set; use sea_orm::{ - ActiveModelTrait, ColumnTrait, Condition, EntityTrait, IntoActiveModel, PaginatorTrait, - QueryFilter, QueryOrder, QuerySelect, + ActiveModelTrait, ColumnTrait, Condition, DatabaseTransaction, EntityTrait, IntoActiveModel, + PaginatorTrait, QueryFilter, QueryOrder, QuerySelect, TransactionTrait, }; +use crate::storage::base_storage::{BaseStorage, StorageConnector}; +use crate::storage::commit_binding_storage::CommitBindingStorage; +use crate::storage::user_storage::UserStorage; +use crate::utils::converter::MegaModelConverter; +use crate::utils::converter::{IntoMegaModel, MegaObjectModel, ToRawBlob, process_entry}; use callisto::{mega_blob, mega_commit, mega_refs, mega_tag, mega_tree, raw_blob}; use common::config::MonoConfig; use common::errors::MegaError; use common::model::Pagination; use common::utils::MEGA_BRANCH_NAME; +use git_internal::internal::metadata::{EntryMeta, MetaAttached}; use git_internal::internal::object::ObjectTrait; use git_internal::internal::object::blob::Blob; use git_internal::internal::{object::commit::Commit, pack::entry::Entry}; - -use crate::storage::base_storage::{BaseStorage, StorageConnector}; -use crate::storage::commit_binding_storage::CommitBindingStorage; -use crate::storage::user_storage::UserStorage; -use crate::utils::converter::MegaModelConverter; -use crate::utils::converter::{IntoMegaModel, MegaObjectModel, ToRawBlob, process_entry}; +use sea_orm::sea_query::Expr; #[derive(Clone)] pub struct MonoStorage { @@ -221,7 +223,7 @@ impl MonoStorage { pub async fn save_entry( &self, commit_id: &str, - entry_list: Vec, + entry_list: Vec>, authenticated_username: Option, ) -> Result<(), MegaError> { let git_objects = Arc::new(Mutex::new(GitObjects { @@ -240,10 +242,10 @@ impl MonoStorage { let git_objects = git_objects.clone(); let commits_to_process = commits_to_process.clone(); async move { - let entry_data = entry.data.clone(); - let entry_hash = entry.hash; - let raw_obj = process_entry(entry); - let model = raw_obj.convert_to_mega_model(); + let entry_data = entry.inner.data.clone(); + let entry_hash = entry.inner.hash; + let raw_obj = process_entry(entry.inner); + let model = raw_obj.convert_to_mega_model(entry.meta); let mut git_objects = git_objects.lock().unwrap(); match model { MegaObjectModel::Commit(commit) => { @@ -302,6 +304,80 @@ impl MonoStorage { Ok(()) } + pub async fn update_blob_filepath( + &self, + blob_id: &str, + file_path: &str, + ) -> Result<(), MegaError> { + if let Some(model) = mega_blob::Entity::find() + .filter(mega_blob::Column::BlobId.eq(blob_id)) + .one(self.get_connection()) + .await? + { + let mut active: mega_blob::ActiveModel = model.into(); + + active.file_path = Set(file_path.to_string()); + + active.update(self.get_connection()).await?; + } + + Ok(()) + } + + pub async fn update_pack_id(&self, temp_pack_id: &str, pack_id: &str) -> Result<(), MegaError> { + let conn = self.get_connection(); + + // + let txn: DatabaseTransaction = conn.begin().await?; + + // + let tables = [ + ( + "mega_blob", + mega_blob::Entity::update_many() + .col_expr(mega_blob::Column::PackId, Expr::value(pack_id)) + .filter(mega_blob::Column::PackId.eq(temp_pack_id)) + .exec(&txn) + .await?, + ), + ( + "mega_tree", + mega_tree::Entity::update_many() + .col_expr(mega_tree::Column::PackId, Expr::value(pack_id)) + .filter(mega_tree::Column::PackId.eq(temp_pack_id)) + .exec(&txn) + .await?, + ), + ( + "mega_tag", + mega_tag::Entity::update_many() + .col_expr(mega_tag::Column::PackId, Expr::value(pack_id)) + .filter(mega_tag::Column::PackId.eq(temp_pack_id)) + .exec(&txn) + .await?, + ), + ( + "mega_commit", + mega_commit::Entity::update_many() + .col_expr(mega_commit::Column::PackId, Expr::value(pack_id)) + .filter(mega_commit::Column::PackId.eq(temp_pack_id)) + .exec(&txn) + .await?, + ), + ]; + + // + for (name, res) in tables { + if res.rows_affected > 0 { + tracing::info!("mega object Updated {} rows in {}", res.rows_affected, name); + } + } + + // + txn.commit().await?; + Ok(()) + } + /// Process commit author bindings async fn process_commit_bindings( &self, @@ -351,7 +427,7 @@ impl MonoStorage { return; } let converter = MegaModelConverter::init(mono_config); - let commit: mega_commit::Model = converter.commit.into_mega_model(); + let commit: mega_commit::Model = converter.commit.into_mega_model(EntryMeta::default()); mega_commit::Entity::insert(commit.into_active_model()) .exec(self.get_connection()) .await @@ -372,7 +448,7 @@ impl MonoStorage { pub async fn save_mega_commits(&self, commits: Vec) -> Result<(), MegaError> { let save_models: Vec = commits .into_iter() - .map(|c| c.into_mega_model()) + .map(|c| c.into_mega_model(EntryMeta::default())) .map(|m| m.into_active_model()) .collect(); self.batch_save_model(save_models).await.unwrap(); @@ -386,7 +462,7 @@ impl MonoStorage { ) -> Result<(), MegaError> { let mega_blobs: Vec = blobs .iter() - .map(|b| (*b).clone().into_mega_model()) + .map(|b| (*b).clone().into_mega_model(EntryMeta::default())) .map(|mut m: mega_blob::Model| { m.commit_id = commit_id.to_owned(); m.into_active_model() diff --git a/jupiter/src/utils/converter.rs b/jupiter/src/utils/converter.rs index 2ca1c7842..3d807c31b 100644 --- a/jupiter/src/utils/converter.rs +++ b/jupiter/src/utils/converter.rs @@ -12,6 +12,7 @@ use callisto::{ sea_orm_active_enums::StorageTypeEnum, }; +use git_internal::internal::metadata::EntryMeta; use git_internal::internal::object::tree::{TreeItem, TreeItemMode}; use git_internal::internal::pack::entry::Entry; // git_internal types @@ -55,12 +56,12 @@ fn commit_from_model( pub trait IntoMegaModel { type MegaTarget; - fn into_mega_model(self) -> Self::MegaTarget; + fn into_mega_model(self, ext_meta: EntryMeta) -> Self::MegaTarget; } pub trait IntoGitModel { type GitTarget; - fn into_git_model(self) -> Self::GitTarget; + fn into_git_model(self, ext_meta: EntryMeta) -> Self::GitTarget; } pub trait FromMegaModel { @@ -97,25 +98,25 @@ pub enum MegaObjectModel { } impl GitObject { - pub fn convert_to_mega_model(self) -> MegaObjectModel { + pub fn convert_to_mega_model(self, meta: EntryMeta) -> MegaObjectModel { match self { - GitObject::Commit(commit) => MegaObjectModel::Commit(commit.into_mega_model()), - GitObject::Tree(tree) => MegaObjectModel::Tree(tree.into_mega_model()), + GitObject::Commit(commit) => MegaObjectModel::Commit(commit.into_mega_model(meta)), + GitObject::Tree(tree) => MegaObjectModel::Tree(tree.into_mega_model(meta)), GitObject::Blob(blob) => { - MegaObjectModel::Blob(blob.clone().into_mega_model(), blob.to_raw_blob()) + MegaObjectModel::Blob(blob.clone().into_mega_model(meta), blob.to_raw_blob()) } - GitObject::Tag(tag) => MegaObjectModel::Tag(tag.into_mega_model()), + GitObject::Tag(tag) => MegaObjectModel::Tag(tag.into_mega_model(meta)), } } - pub fn convert_to_git_model(self) -> GitObjectModel { + pub fn convert_to_git_model(self, meta: EntryMeta) -> GitObjectModel { match self { - GitObject::Commit(commit) => GitObjectModel::Commit(commit.into_git_model()), - GitObject::Tree(tree) => GitObjectModel::Tree(tree.into_git_model()), + GitObject::Commit(commit) => GitObjectModel::Commit(commit.into_git_model(meta)), + GitObject::Tree(tree) => GitObjectModel::Tree(tree.into_git_model(meta)), GitObject::Blob(blob) => { - GitObjectModel::Blob(blob.clone().into_git_model(), blob.to_raw_blob()) + GitObjectModel::Blob(blob.clone().into_git_model(meta), blob.to_raw_blob()) } - GitObject::Tag(tag) => GitObjectModel::Tag(tag.into_git_model()), + GitObject::Tag(tag) => GitObjectModel::Tag(tag.into_git_model(meta)), } } } @@ -144,13 +145,17 @@ impl IntoMegaModel for Blob { /// # Returns /// /// A new mega_blob::Model instance populated with data from the blob - fn into_mega_model(self) -> Self::MegaTarget { + fn into_mega_model(self, meta: EntryMeta) -> Self::MegaTarget { mega_blob::Model { id: generate_id(), blob_id: self.id.to_string(), size: 0, commit_id: String::new(), name: String::new(), + pack_id: meta.pack_id.unwrap_or_default(), + file_path: meta.file_path.unwrap_or_default(), + pack_offset: meta.pack_offset.unwrap_or(0) as i64, + is_delta_in_pack: meta.is_delta.unwrap_or(false), created_at: chrono::Utc::now().naive_utc(), } } @@ -172,7 +177,7 @@ impl IntoMegaModel for Commit { /// # Panics /// /// This function will panic if author or committer signature data cannot be converted to bytes - fn into_mega_model(self) -> Self::MegaTarget { + fn into_mega_model(self, meta: EntryMeta) -> Self::MegaTarget { mega_commit::Model { id: generate_id(), commit_id: self.id.to_string(), @@ -187,6 +192,8 @@ impl IntoMegaModel for Commit { String::from_utf8_lossy(&self.committer.to_data().unwrap()).to_string(), ), content: Some(self.message.clone()), + pack_id: meta.pack_id.unwrap_or_default(), + pack_offset: meta.pack_offset.unwrap_or(0) as i64, created_at: chrono::Utc::now().naive_utc(), } } @@ -208,7 +215,7 @@ impl IntoMegaModel for Tag { /// # Panics /// /// This function will panic if tagger signature data cannot be converted to bytes - fn into_mega_model(self) -> Self::MegaTarget { + fn into_mega_model(self, meta: EntryMeta) -> Self::MegaTarget { mega_tag::Model { id: generate_id(), tag_id: self.id.to_string(), @@ -217,6 +224,8 @@ impl IntoMegaModel for Tag { tag_name: self.tag_name, tagger: String::from_utf8_lossy(&self.tagger.to_data().unwrap()).to_string(), message: self.message, + pack_id: meta.pack_id.unwrap_or_default(), + pack_offset: meta.pack_offset.unwrap_or(0) as i64, created_at: chrono::Utc::now().naive_utc(), } } @@ -238,13 +247,15 @@ impl IntoMegaModel for Tree { /// # Panics /// /// This function will panic if the tree's data cannot be serialized - fn into_mega_model(self) -> Self::MegaTarget { + fn into_mega_model(self, meta: EntryMeta) -> Self::MegaTarget { mega_tree::Model { id: generate_id(), tree_id: self.id.to_string(), sub_trees: self.to_data().unwrap(), size: 0, commit_id: String::new(), + pack_id: meta.pack_id.unwrap_or_default(), + pack_offset: meta.pack_offset.unwrap_or(0) as i64, created_at: chrono::Utc::now().naive_utc(), } } @@ -291,13 +302,17 @@ impl IntoGitModel for Blob { /// # Returns /// /// A new git_blob::Model instance populated with data from the blob - fn into_git_model(self) -> Self::GitTarget { + fn into_git_model(self, meta: EntryMeta) -> Self::GitTarget { git_blob::Model { id: generate_id(), repo_id: 0, blob_id: self.id.to_string(), size: 0, name: None, + pack_id: meta.pack_id.unwrap_or_default(), + pack_offset: meta.pack_offset.unwrap_or(0) as i64, + file_path: meta.file_path.unwrap_or_default(), + is_delta_in_pack: meta.is_delta.unwrap_or(false), created_at: chrono::Utc::now().naive_utc(), } } @@ -320,7 +335,7 @@ impl IntoGitModel for Commit { /// # Panics /// /// This function will panic if author or committer signature data cannot be converted to bytes - fn into_git_model(self) -> Self::GitTarget { + fn into_git_model(self, meta: EntryMeta) -> Self::GitTarget { git_commit::Model { id: generate_id(), repo_id: 0, @@ -336,6 +351,8 @@ impl IntoGitModel for Commit { String::from_utf8_lossy(&self.committer.to_data().unwrap()).to_string(), ), content: Some(self.message.clone()), + pack_id: meta.pack_id.unwrap_or_default(), + pack_offset: meta.pack_offset.unwrap_or(0) as i64, created_at: chrono::Utc::now().naive_utc(), } } @@ -358,7 +375,7 @@ impl IntoGitModel for Tag { /// # Panics /// /// This function will panic if tagger signature data cannot be converted to bytes - fn into_git_model(self) -> Self::GitTarget { + fn into_git_model(self, meta: EntryMeta) -> Self::GitTarget { git_tag::Model { id: generate_id(), repo_id: 0, @@ -368,6 +385,8 @@ impl IntoGitModel for Tag { tag_name: self.tag_name, tagger: String::from_utf8_lossy(&self.tagger.to_data().unwrap()).to_string(), message: self.message, + pack_id: meta.pack_id.unwrap_or_default(), + pack_offset: meta.pack_offset.unwrap_or(0) as i64, created_at: chrono::Utc::now().naive_utc(), } } @@ -390,13 +409,15 @@ impl IntoGitModel for Tree { /// # Panics /// /// This function will panic if the tree's data cannot be serialized - fn into_git_model(self) -> Self::GitTarget { + fn into_git_model(self, meta: EntryMeta) -> Self::GitTarget { git_tree::Model { id: generate_id(), repo_id: 0, tree_id: self.id.to_string(), sub_trees: self.to_data().unwrap(), size: 0, + pack_id: meta.pack_id.unwrap_or_default(), + pack_offset: meta.pack_offset.unwrap_or(0) as i64, created_at: chrono::Utc::now().naive_utc(), } } @@ -461,7 +482,7 @@ pub struct MegaModelConverter { impl MegaModelConverter { fn traverse_from_root(&self) { let root_tree = &self.root_tree; - let mut mega_tree: mega_tree::Model = root_tree.clone().into_mega_model(); + let mut mega_tree: mega_tree::Model = root_tree.clone().into_mega_model(EntryMeta::new()); mega_tree.commit_id = self.commit.id.to_string(); self.mega_trees .borrow_mut() @@ -473,7 +494,8 @@ impl MegaModelConverter { for item in &tree.tree_items { if item.mode == TreeItemMode::Tree { let child_tree = self.tree_maps.get(&item.id).unwrap(); - let mut mega_tree: mega_tree::Model = child_tree.clone().into_mega_model(); + let mut mega_tree: mega_tree::Model = + child_tree.clone().into_mega_model(EntryMeta::new()); mega_tree.commit_id = self.commit.id.to_string(); self.mega_trees .borrow_mut() @@ -481,7 +503,8 @@ impl MegaModelConverter { self.traverse_for_update(child_tree); } else { let blob = self.blob_maps.get(&item.id).unwrap(); - let mut mega_blob: mega_blob::Model = blob.clone().into_mega_model(); + let mut mega_blob: mega_blob::Model = + blob.clone().into_mega_model(EntryMeta::new()); mega_blob.commit_id = self.commit.id.to_string(); self.mega_blobs .borrow_mut()