Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 20 additions & 27 deletions ceres/src/api_service/mono_api_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ use neptune::neptune_engine::Diff;
use regex::Regex;

use callisto::sea_orm_active_enums::ConvTypeEnum;
use callisto::{mega_cl, mega_tag, mega_tree};
use callisto::{mega_cl, mega_refs, mega_tag, mega_tree};
use common::errors::MegaError;
use common::model::{Pagination, TagInfo};
use jupiter::utils::converter::{FromMegaModel, IntoMegaModel};
Expand Down Expand Up @@ -411,7 +411,7 @@ impl ApiHandler for MonoApiService {

async fn get_root_tree(&self) -> Tree {
let storage = self.storage.mono_storage();
let refs = storage.get_ref("/").await.unwrap().unwrap();
let refs = storage.get_main_ref("/").await.unwrap().unwrap();

Tree::from_mega_model(
storage
Expand Down Expand Up @@ -618,7 +618,7 @@ impl ApiHandler for MonoApiService {
// lightweight refs from refs table under path
let repo_path = repo_path.as_deref().unwrap_or("/");
let mut lightweight_refs: Vec<TagInfo> = vec![];
if let Ok(refs) = mono_storage.get_refs(repo_path).await {
if let Ok(refs) = mono_storage.get_all_refs(repo_path, false).await {
for r in refs {
if r.ref_name.starts_with("refs/tags/") {
let tag_name = r.ref_name.trim_start_matches("refs/tags/").to_string();
Expand Down Expand Up @@ -869,16 +869,10 @@ impl MonoApiService {
// try to write ref; if ref write fails, rollback DB insert
let path_str = repo_path.unwrap_or_else(|| "/".to_string());
let tree_hash = common::utils::ZERO_ID.to_string();
if let Err(e) = mono_storage
.save_ref(
&path_str,
Some(full_ref.clone()),
&object_id,
&tree_hash,
false,
)
.await
{
let refs =
mega_refs::Model::new(&path_str, full_ref.clone(), object_id, tree_hash, false);

if let Err(e) = mono_storage.save_refs(refs).await {
// attempt to remove DB record
if let Err(del_e) = mono_storage.delete_tag_by_name(&name).await {
tracing::error!(
Expand Down Expand Up @@ -915,19 +909,18 @@ impl MonoApiService {
let path_str = repo_path.unwrap_or_else(|| "/".to_string());
let object_id = target.clone().unwrap_or_default();
let tree_hash = common::utils::ZERO_ID.to_string();
mono_storage
.save_ref(
&path_str,
Some(full_ref.clone()),
&object_id,
&tree_hash,
false,
)
.await
.map_err(|e| {
tracing::error!("Failed to write lightweight tag ref: {}", e);
GitError::CustomError("[code:500] Failed to write lightweight tag ref".to_string())
})?;

let refs = mega_refs::Model::new(
&path_str,
full_ref.clone(),
object_id.clone(),
tree_hash,
false,
);
mono_storage.save_refs(refs).await.map_err(|e| {
tracing::error!("Failed to write lightweight tag ref: {}", e);
GitError::CustomError("[code:500] Failed to write lightweight tag ref".to_string())
})?;
// Fetch saved ref to use its creation time
let saved_ref = mono_storage
.get_ref_by_name(&full_ref)
Expand Down Expand Up @@ -1016,7 +1009,7 @@ impl MonoApiService {
}
pub async fn merge_cl(&self, username: &str, cl: mega_cl::Model) -> Result<(), GitError> {
let storage = self.storage.mono_storage();
let refs = storage.get_ref(&cl.path).await.unwrap().unwrap();
let refs = storage.get_main_ref(&cl.path).await.unwrap().unwrap();

if cl.from_hash == refs.ref_commit_hash {
let commit: Commit = Commit::from_mega_model(
Expand Down
2 changes: 1 addition & 1 deletion ceres/src/merge_checker/cl_sync_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ impl Checker for ClSyncChecker {
let refs = self
.storage
.mono_storage()
.get_ref(&cl_info.path)
.get_main_ref(&cl_info.path)
.await?
.expect("Err: CL Related Refs Not Found");
Ok(serde_json::json!({
Expand Down
4 changes: 2 additions & 2 deletions ceres/src/pack/import_repo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ impl RepoHandler for ImportRepo {
false
}

async fn head_hash(&self) -> (String, Vec<Refs>) {
async fn refs_with_head_hash(&self) -> (String, Vec<Refs>) {
let result = self
.storage
.git_db_storage()
Expand Down Expand Up @@ -349,7 +349,7 @@ impl ImportRepo {

// 3. get root ref
let mut root_ref = storage
.get_ref("/")
.get_main_ref("/")
.await?
.ok_or_else(|| MegaError::with_message("root ref not found"))?;

Expand Down
2 changes: 1 addition & 1 deletion ceres/src/pack/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub mod monorepo;
pub trait RepoHandler: Send + Sync + 'static {
fn is_monorepo(&self) -> bool;

async fn head_hash(&self) -> (String, Vec<Refs>);
async fn refs_with_head_hash(&self) -> (String, Vec<Refs>);

async fn receiver_handler(
self: Arc<Self>,
Expand Down
145 changes: 74 additions & 71 deletions ceres/src/pack/monorepo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@ use tokio::sync::{RwLock, mpsc};
use tokio_stream::wrappers::ReceiverStream;

use bellatrix::{Bellatrix, orion_client::BuildInfo, orion_client::OrionBuildRequest};
use callisto::{entity_ext::generate_link, mega_cl, raw_blob, sea_orm_active_enums::ConvTypeEnum};
use callisto::{
entity_ext::generate_link, mega_cl, mega_refs, raw_blob, sea_orm_active_enums::ConvTypeEnum,
};
use common::{
errors::MegaError,
utils::{self, MEGA_BRANCH_NAME},
utils::{self, ZERO_ID},
};
use git_internal::{
errors::GitError,
Expand Down Expand Up @@ -57,81 +59,84 @@ impl RepoHandler for MonoRepo {
true
}

async fn head_hash(&self) -> (String, Vec<Refs>) {
async fn refs_with_head_hash(&self) -> (String, Vec<Refs>) {
let storage = self.storage.mono_storage();

let result = storage.get_refs(self.path.to_str().unwrap()).await.unwrap();
let path_refs = storage
.get_all_refs(self.path.to_str().unwrap(), false)
.await
.unwrap();

let heads_exist = result
let heads_exist = path_refs
.iter()
.any(|x| x.ref_name == common::utils::MEGA_BRANCH_NAME);

let refs = if heads_exist {
let refs: Vec<Refs> = result.into_iter().map(|x| x.into()).collect();
let refs: Vec<Refs> = path_refs.into_iter().map(|x| x.into()).collect();
refs
} else {
let target_path = self.path.clone();
let refs = storage.get_ref("/").await.unwrap().unwrap();
let tree_hash = refs.ref_tree_hash.clone();
let mut refs = vec![];

let mut tree: Tree =
Tree::from_mega_model(storage.get_tree_by_hash(&tree_hash).await.unwrap().unwrap());
let root_refs = storage.get_all_refs("/", true).await.unwrap();

let commit: Commit = Commit::from_mega_model(
storage
.get_commit_by_hash(&refs.ref_commit_hash)
.await
.unwrap()
.unwrap(),
);

for component in target_path.components() {
if component != Component::RootDir {
let path_name = component.as_os_str().to_str().unwrap();
let sha1 = tree
.tree_items
.iter()
.find(|x| x.name == path_name)
.map(|x| x.id);
if let Some(sha1) = sha1 {
tree = Tree::from_mega_model(
storage
.get_trees_by_hashes(vec![sha1.to_string()])
.await
.unwrap()[0]
.clone(),
);
} else {
return self.find_head_hash(vec![]);
for root_ref in root_refs {
let (tree_hash, commit_hash) = (root_ref.ref_tree_hash, root_ref.ref_commit_hash);
let mut tree: Tree = Tree::from_mega_model(
storage.get_tree_by_hash(&tree_hash).await.unwrap().unwrap(),
);

let commit: Commit = Commit::from_mega_model(
storage
.get_commit_by_hash(&commit_hash)
.await
.unwrap()
.unwrap(),
);

for component in target_path.components() {
if component != Component::RootDir {
let path_compo_name = component.as_os_str().to_str().unwrap();
let path_compo_hash = tree
.tree_items
.iter()
.find(|x| x.name == path_compo_name)
.map(|x| x.id);
if let Some(hash) = path_compo_hash {
tree = Tree::from_mega_model(
storage
.get_tree_by_hash(&hash.to_string())
.await
.unwrap()
.unwrap(),
);
} else {
return (ZERO_ID.to_string(), vec![]);
}
}
}
}
let c = Commit::new(
commit.author,
commit.committer,
tree.id,
vec![],
&commit.message,
);

let c = Commit::new(
commit.author,
commit.committer,
tree.id,
vec![],
&commit.message,
);
storage
.save_ref(
self.path.to_str().unwrap(),
None,
&c.id.to_string(),
&c.tree_id.to_string(),
let new_mega_ref = mega_refs::Model::new(
&self.path,
root_ref.ref_name.clone(),
c.id.to_string(),
c.tree_id.to_string(),
false,
)
.await
.unwrap();
storage.save_mega_commits(vec![c.clone()]).await.unwrap();

vec![Refs {
ref_name: MEGA_BRANCH_NAME.to_string(),
ref_hash: c.id.to_string(),
default_branch: true,
..Default::default()
}]
);

storage.save_refs(new_mega_ref.clone()).await.unwrap();
storage.save_mega_commits(vec![c.clone()]).await.unwrap();

refs.push(new_mega_ref.into());
}
refs
};
self.find_head_hash(refs)
}
Expand Down Expand Up @@ -305,16 +310,14 @@ impl RepoHandler for MonoRepo {
cl_ref.ref_tree_hash = c.tree_id.to_string();
storage.update_ref(cl_ref).await.unwrap();
} else {
storage
.save_ref(
self.path.to_str().unwrap(),
Some(ref_name),
&refs.new_id,
&c.tree_id.to_string(),
true,
)
.await
.unwrap();
let refs = mega_refs::Model::new(
&self.path,
ref_name,
refs.new_id.clone(),
c.tree_id.to_string(),
true,
);
storage.save_refs(refs).await.unwrap();
}
}
Ok(())
Expand Down
16 changes: 10 additions & 6 deletions ceres/src/protocol/smart.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::collections::HashSet;
use std::pin::Pin;

use anyhow::Result;
Expand Down Expand Up @@ -64,7 +65,7 @@ impl SmartProtocol {
let service_type = self.service_type.unwrap();

// The stream MUST include capability declarations behind a NUL on the first ref.
let (head_hash, git_refs) = repo_handler.head_hash().await;
let (head_hash, git_refs) = repo_handler.refs_with_head_hash().await;
let name = if head_hash == ZERO_ID {
"capabilities^{}"
} else {
Expand Down Expand Up @@ -92,8 +93,8 @@ impl SmartProtocol {
) -> Result<(ReceiverStream<Vec<u8>>, BytesMut), ProtocolError> {
let repo_handler = self.repo_handler().await?;

let mut want: Vec<String> = Vec::new();
let mut have: Vec<String> = Vec::new();
let mut want: HashSet<String> = HashSet::new();
let mut have: HashSet<String> = HashSet::new();
let mut last_common_commit = String::new();

let mut read_first_line = false;
Expand All @@ -112,10 +113,10 @@ impl SmartProtocol {

match commands {
b"want" => {
want.push(String::from_utf8(dst[5..45].to_vec()).unwrap());
want.insert(String::from_utf8(dst[5..45].to_vec()).unwrap());
}
b"have" => {
have.push(String::from_utf8(dst[5..45].to_vec()).unwrap());
have.insert(String::from_utf8(dst[5..45].to_vec()).unwrap());
}
b"done" => break,
other => {
Expand All @@ -142,8 +143,11 @@ impl SmartProtocol {
let pack_data;
let mut protocol_buf = BytesMut::new();

let want: Vec<String> = want.into_iter().collect();
let have: Vec<String> = have.into_iter().collect();

if have.is_empty() {
pack_data = repo_handler.full_pack(want.clone()).await.unwrap();
pack_data = repo_handler.full_pack(want).await.unwrap();
add_pkt_line_string(&mut protocol_buf, String::from("NAK\n"));
} else {
if self.capabilities.contains(&Capability::MultiAckDetailed) {
Expand Down
25 changes: 25 additions & 0 deletions jupiter/callisto/src/entity_ext/mega_refs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
use crate::entity_ext::generate_id;
use crate::mega_refs;
use std::path::Path;

impl mega_refs::Model {
pub fn new<P: AsRef<Path>>(
path: P,
ref_name: String,
ref_commit_hash: String,
ref_tree_hash: String,
is_cl: bool,
) -> Self {
let now = chrono::Utc::now().naive_utc();
Self {
id: generate_id(),
path: path.as_ref().to_str().unwrap().to_string(),
ref_name,
ref_commit_hash,
ref_tree_hash,
created_at: now,
updated_at: now,
is_cl,
}
}
}
1 change: 1 addition & 0 deletions jupiter/callisto/src/entity_ext/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ pub mod label;
pub mod mega_cl;
pub mod mega_conversation;
pub mod mega_issue;
pub mod mega_refs;
pub mod reactions;

use idgenerator::IdInstance;
Expand Down
Loading
Loading