Skip to content

Commit b946abc

Browse files
authored
feat(mono): refactor directory update sync with refs(#1569) (#1587)
Signed-off-by: MYUU <[email protected]>
1 parent 6d6d78a commit b946abc

File tree

3 files changed

+175
-93
lines changed

3 files changed

+175
-93
lines changed

ceres/src/api_service/mono_api_service.rs

Lines changed: 93 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -49,11 +49,13 @@ use crate::model::third_party::{ThirdPartyClient, ThirdPartyRepoTrait};
4949
use crate::protocol::{SmartProtocol, TransportProtocol};
5050
use async_trait::async_trait;
5151
use bytes::Bytes;
52+
use common::utils::MEGA_BRANCH_NAME;
5253
use git_internal::errors::GitError;
5354
use git_internal::hash::SHA1;
5455
use git_internal::internal::object::blob::Blob;
5556
use git_internal::internal::object::commit::Commit;
5657
use git_internal::internal::object::tree::{Tree, TreeItem, TreeItemMode};
58+
use jupiter::storage::mono_storage::RefUpdateData;
5759
use neptune::model::diff_model::DiffItem;
5860
use neptune::neptune_engine::Diff;
5961
use regex::Regex;
@@ -79,9 +81,9 @@ pub struct TreeUpdateResult {
7981
pub ref_updates: Vec<RefUpdate>,
8082
}
8183

82-
pub enum RefUpdate {
83-
Update { path: String, tree_id: SHA1 },
84-
Delete { path: String },
84+
pub struct RefUpdate {
85+
path: String,
86+
tree_id: SHA1,
8587
}
8688

8789
#[async_trait]
@@ -123,7 +125,7 @@ impl ApiHandler for MonoApiService {
123125

124126
// Apply and save
125127
let new_commit_id = self
126-
.apply_update_result(&result, &payload.commit_message)
128+
.apply_update_result(&result, &payload.commit_message, None)
127129
.await?;
128130
storage
129131
.save_mega_blobs(vec![&new_blob], &new_commit_id)
@@ -273,7 +275,7 @@ impl ApiHandler for MonoApiService {
273275
target_tree.id,
274276
)?;
275277
let new_commit_id = self
276-
.apply_update_result(&update_result, &entry_info.commit_msg())
278+
.apply_update_result(&update_result, &entry_info.commit_msg(), None)
277279
.await?;
278280

279281
storage.save_mega_blobs(vec![&blob], &new_commit_id).await?;
@@ -377,7 +379,7 @@ impl ApiHandler for MonoApiService {
377379
let update_result =
378380
self.build_result_by_chain(PathBuf::from(prefix), update_chain, target_tree.id)?;
379381
let new_commit_id = self
380-
.apply_update_result(&update_result, &entry_info.commit_msg())
382+
.apply_update_result(&update_result, &entry_info.commit_msg(), None)
381383
.await?;
382384

383385
storage
@@ -1030,9 +1032,12 @@ impl MonoApiService {
10301032
// because only parent tree is needed so we skip current directory
10311033
let update_chain = self.search_tree_for_update(path.parent().unwrap()).await?;
10321034
let result = self.build_result_by_chain(path, update_chain, commit.tree_id)?;
1033-
self.apply_update_result(&result, "cl merge generated commit")
1034-
.await?;
1035-
// remove refs start with path except cl type
1035+
self.apply_update_result(
1036+
&result,
1037+
"cl merge generated commit",
1038+
Some(cl.link.as_str()),
1039+
)
1040+
.await?;
10361041
storage.remove_none_cl_refs(&cl.path).await.unwrap();
10371042
// TODO: self.clean_dangling_commits().await;
10381043
}
@@ -1054,38 +1059,56 @@ impl MonoApiService {
10541059
Ok(())
10551060
}
10561061

1057-
/// Traverse parent trees and update them with the new commit's tree hash.
1058-
/// This function only prepares updated trees and optionally a new parent commit.
1062+
/// Update parent trees along the given update chain with the new child tree hash.
1063+
/// This function prepares all updated trees and their associated ref updates.
1064+
/// Trees that do not depend on each other (e.g., sibling directories) can be updated in parallel.
1065+
/// No new commits are created; only tree objects and ref updates are produced.
10591066
pub fn build_result_by_chain(
10601067
&self,
10611068
mut path: PathBuf,
10621069
mut update_chain: Vec<Arc<Tree>>,
10631070
mut updated_tree_hash: SHA1,
10641071
) -> Result<TreeUpdateResult, GitError> {
1072+
/// Normalizes a path string for use as a ref path.
///
/// Every trailing `/` is stripped. If nothing is left afterwards (the input
/// was empty or consisted only of `/` characters), `"/"` is returned so the
/// root is always represented by a single slash.
fn clean_path_str(path: &str) -> String {
1073+
let s = path.trim_end_matches('/');
1074+
// An empty remainder means the input denoted the root (or was empty).
if s.is_empty() {
1075+
"/".to_string()
1076+
} else {
1077+
s.to_string()
1078+
}
1079+
}
1080+
10651081
let mut updated_trees = Vec::new();
10661082
let mut ref_updates = Vec::new();
1083+
let mut path_str = path.to_string_lossy().to_string();
1084+
1085+
loop {
1086+
let clean_path = clean_path_str(&path_str);
1087+
1088+
ref_updates.push(RefUpdate {
1089+
path: clean_path,
1090+
tree_id: updated_tree_hash,
1091+
});
1092+
1093+
if update_chain.is_empty() {
1094+
break;
1095+
}
10671096

1068-
while let Some(tree) = update_chain.pop() {
10691097
let cloned_path = path.clone();
10701098
let name = cloned_path
10711099
.file_name()
10721100
.and_then(|n| n.to_str())
10731101
.ok_or_else(|| GitError::CustomError("Invalid path".into()))?;
10741102
path.pop();
1103+
path_str = path.to_string_lossy().to_string();
1104+
1105+
let tree = update_chain
1106+
.pop()
1107+
.ok_or_else(|| GitError::CustomError("Empty update chain".into()))?;
10751108

10761109
let new_tree = self.update_tree_hash(tree, name, updated_tree_hash)?;
10771110
updated_tree_hash = new_tree.id;
10781111
updated_trees.push(new_tree);
1079-
1080-
let path_str = path.to_string_lossy().to_string();
1081-
if path == Path::new("/") {
1082-
ref_updates.push(RefUpdate::Update {
1083-
path: path_str,
1084-
tree_id: updated_tree_hash,
1085-
});
1086-
} else {
1087-
ref_updates.push(RefUpdate::Delete { path: path_str });
1088-
}
10891112
}
10901113

10911114
Ok(TreeUpdateResult {
@@ -1098,52 +1121,62 @@ impl MonoApiService {
10981121
&self,
10991122
result: &TreeUpdateResult,
11001123
commit_msg: &str,
1124+
cl_link: Option<&str>,
11011125
) -> Result<String, GitError> {
11021126
let storage = self.storage.mono_storage();
11031127
let mut new_commit_id = String::new();
1128+
let mut commits: Vec<Commit> = Vec::new();
1129+
1130+
let paths: Vec<&str> = result.ref_updates.iter().map(|r| r.path.as_str()).collect();
1131+
1132+
let cl_refs_formatted = cl_link.map(|cl| format!("refs/cl/{}", cl));
1133+
let cl_refs: Option<Vec<&str>> = cl_refs_formatted
1134+
.as_ref()
1135+
.map(|formatted| vec![formatted.as_str(), MEGA_BRANCH_NAME]);
1136+
1137+
let refs = storage
1138+
.get_refs_for_paths_and_cls(&paths, cl_refs.as_deref())
1139+
.await?;
1140+
1141+
let mut updates: Vec<RefUpdateData> = Vec::new();
11041142

11051143
for update in &result.ref_updates {
1106-
match update {
1107-
RefUpdate::Update { path, tree_id } => {
1108-
// update can only be root path
1109-
if let Some(mut p_ref) = storage
1110-
.get_ref(path)
1111-
.await
1112-
.map_err(|e| GitError::CustomError(e.to_string()))?
1113-
{
1114-
let commit = Commit::from_tree_id(
1115-
*tree_id,
1116-
vec![SHA1::from_str(&p_ref.ref_commit_hash).unwrap()],
1117-
commit_msg,
1118-
);
1119-
new_commit_id = commit.id.to_string();
1120-
p_ref.ref_commit_hash = new_commit_id.clone();
1121-
p_ref.ref_tree_hash = tree_id.to_string();
1122-
storage
1123-
.update_ref(p_ref)
1124-
.await
1125-
.map_err(|e| GitError::CustomError(e.to_string()))?;
1126-
storage
1127-
.save_mega_commits(vec![commit])
1128-
.await
1129-
.map_err(|e| GitError::CustomError(e.to_string()))?;
1130-
}
1131-
}
1132-
RefUpdate::Delete { path } => {
1133-
if let Some(p_ref) = storage
1134-
.get_ref(path)
1135-
.await
1136-
.map_err(|e| GitError::CustomError(e.to_string()))?
1137-
{
1138-
storage
1139-
.remove_ref(p_ref)
1140-
.await
1141-
.map_err(|e| GitError::CustomError(e.to_string()))?;
1142-
}
1144+
if let Some(p_ref) = refs.iter().find(|r| r.path == update.path) {
1145+
let commit = Commit::from_tree_id(
1146+
update.tree_id,
1147+
vec![SHA1::from_str(&p_ref.ref_commit_hash).unwrap()],
1148+
commit_msg,
1149+
);
1150+
let commit_id = commit.id.to_string();
1151+
new_commit_id = commit_id.clone();
1152+
commits.push(commit);
1153+
1154+
let mut push_update = |ref_name: &str| {
1155+
updates.push(RefUpdateData {
1156+
path: p_ref.path.clone(),
1157+
ref_name: ref_name.to_string(),
1158+
commit_id: commit_id.clone(),
1159+
tree_hash: update.tree_id.to_string(),
1160+
});
1161+
};
1162+
1163+
push_update(&p_ref.ref_name);
1164+
if p_ref.ref_name.starts_with("refs/cl/") {
1165+
push_update(MEGA_BRANCH_NAME);
11431166
}
11441167
}
11451168
}
11461169

1170+
storage
1171+
.batch_update_by_path_concurrent(updates)
1172+
.await
1173+
.map_err(|e| GitError::CustomError(e.to_string()))?;
1174+
1175+
storage
1176+
.save_mega_commits(commits)
1177+
.await
1178+
.map_err(|e| GitError::CustomError(e.to_string()))?;
1179+
11471180
let save_trees: Vec<mega_tree::ActiveModel> = result
11481181
.updated_trees
11491182
.clone()
@@ -1154,6 +1187,7 @@ impl MonoApiService {
11541187
tree_model.into()
11551188
})
11561189
.collect();
1190+
11571191
storage
11581192
.batch_save_model(save_trees)
11591193
.await

jupiter/src/storage/mono_storage.rs

Lines changed: 81 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
1+
use std::collections::HashMap;
12
use std::ops::Deref;
23
use std::sync::{Arc, Mutex};
34

5+
use futures::stream::FuturesUnordered;
46
use futures::{StreamExt, stream};
7+
use sea_orm::ActiveValue::Set;
58
use sea_orm::{
6-
ActiveModelTrait, ColumnTrait, EntityTrait, IntoActiveModel, PaginatorTrait, QueryFilter,
7-
QueryOrder, QuerySelect,
9+
ActiveModelTrait, ColumnTrait, Condition, EntityTrait, IntoActiveModel, PaginatorTrait,
10+
QueryFilter, QueryOrder, QuerySelect,
811
};
912

1013
use crate::utils::converter::{IntoMegaModel, MegaObjectModel, ToRawBlob, process_entry};
@@ -43,6 +46,14 @@ struct GitObjects {
4346
tags: Vec<mega_tag::ActiveModel>,
4447
}
4548

49+
/// One pending update of a `mega_refs` row, as consumed by
/// `batch_update_by_path_concurrent`.
///
/// `path` and `ref_name` together select the row to change; `commit_id` and
/// `tree_hash` are the new values written to it.
#[derive(Debug)]
50+
pub struct RefUpdateData {
51+
/// Compared for equality against the row's `path` column.
pub path: String,
52+
/// Compared for equality against the row's `ref_name` column.
pub ref_name: String,
53+
/// New value stored in the row's `ref_commit_hash`.
pub commit_id: String,
54+
/// New value stored in the row's `ref_tree_hash`.
pub tree_hash: String,
55+
}
56+
4657
impl MonoStorage {
4758
pub fn user_storage(&self) -> UserStorage {
4859
UserStorage {
@@ -82,9 +93,11 @@ impl MonoStorage {
8293
Ok(())
8394
}
8495

96+
/// Removes non-CL refs under the given path, but keeps the ref matching the path itself.
8597
pub async fn remove_none_cl_refs(&self, path: &str) -> Result<(), MegaError> {
8698
mega_refs::Entity::delete_many()
8799
.filter(mega_refs::Column::Path.starts_with(path))
100+
.filter(mega_refs::Column::Path.ne(path))
88101
.filter(mega_refs::Column::IsCl.eq(false))
89102
.exec(self.get_connection())
90103
.await?;
@@ -98,6 +111,25 @@ impl MonoStorage {
98111
Ok(())
99112
}
100113

114+
/// Loads the `mega_refs` rows for a set of paths, restricted by ref name.
///
/// * `paths` - only rows whose `path` is one of these values are returned.
/// * `cls` - when `Some`, only rows whose `ref_name` is one of the given
///   names are returned; when `None`, only rows whose `ref_name` equals
///   `MEGA_BRANCH_NAME` are returned.
///
/// The result is ordered by `ref_name` ascending.
///
/// # Errors
///
/// Propagates any error returned by the database query.
pub async fn get_refs_for_paths_and_cls(
115+
&self,
116+
paths: &[&str],
117+
cls: Option<&[&str]>,
118+
) -> Result<Vec<mega_refs::Model>, MegaError> {
119+
// Base query: restrict to the requested paths, sorted by ref name.
let mut query = mega_refs::Entity::find()
120+
.filter(mega_refs::Column::Path.is_in(paths.iter().copied()))
121+
.order_by_asc(mega_refs::Column::RefName);
122+
123+
// Narrow by ref name: the caller-supplied list, or the main branch by default.
if let Some(cls_values) = cls {
124+
query = query.filter(mega_refs::Column::RefName.is_in(cls_values.iter().copied()));
125+
} else {
126+
query = query.filter(mega_refs::Column::RefName.eq(MEGA_BRANCH_NAME));
127+
}
128+
129+
let result = query.all(self.get_connection()).await?;
130+
Ok(result)
131+
}
132+
101133
pub async fn get_refs(&self, path: &str) -> Result<Vec<mega_refs::Model>, MegaError> {
102134
let result = mega_refs::Entity::find()
103135
.filter(mega_refs::Column::Path.eq(path))
@@ -149,6 +181,53 @@ impl MonoStorage {
149181
ref_data.update(self.get_connection()).await.unwrap();
150182
Ok(())
151183
}
184+
pub async fn batch_update_by_path_concurrent(
185+
&self,
186+
updates: Vec<RefUpdateData>,
187+
) -> Result<(), MegaError> {
188+
let conn = self.get_connection();
189+
let mut condition = Condition::any();
190+
for update in &updates {
191+
condition = condition.add(
192+
Condition::all()
193+
.add(mega_refs::Column::Path.eq(update.path.clone()))
194+
.add(mega_refs::Column::RefName.eq(update.ref_name.clone())),
195+
);
196+
}
197+
198+
let existing_refs: Vec<mega_refs::Model> = mega_refs::Entity::find()
199+
.filter(condition)
200+
.all(conn)
201+
.await?;
202+
203+
let ref_map: HashMap<(String, String), mega_refs::Model> = existing_refs
204+
.into_iter()
205+
.map(|r| ((r.path.clone(), r.ref_name.clone()), r))
206+
.collect();
207+
208+
let mut futures = FuturesUnordered::new();
209+
210+
for update in updates {
211+
if let Some(ref_data) = ref_map.get(&(update.path.clone(), update.ref_name.clone())) {
212+
let conn = conn.clone();
213+
let mut active: mega_refs::ActiveModel = ref_data.clone().into();
214+
215+
futures.push(async move {
216+
active.ref_commit_hash = Set(update.commit_id);
217+
active.ref_tree_hash = Set(update.tree_hash);
218+
active.updated_at = Set(chrono::Utc::now().naive_utc());
219+
active.update(&conn).await?;
220+
Ok::<(), MegaError>(())
221+
});
222+
}
223+
}
224+
225+
while let Some(res) = futures.next().await {
226+
res?;
227+
}
228+
229+
Ok(())
230+
}
152231

153232
pub async fn save_entry(
154233
&self,

0 commit comments

Comments
 (0)