Skip to content

Commit d805656

Browse files
authored
Merge pull request #756 from el-ev/fix_memory_monitoring
fix(mercury): fix memory recorder for delta objects
2 parents 1329e67 + 1fb3389 commit d805656

File tree

3 files changed

+79
-25
lines changed

3 files changed

+79
-25
lines changed

mercury/src/internal/pack/cache_object.rs

Lines changed: 44 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,9 @@ use lru_mem::{HeapSize, MemSize};
99
use serde::{Deserialize, Serialize};
1010
use threadpool::ThreadPool;
1111

12+
use crate::internal::pack::entry::Entry;
1213
use crate::internal::pack::utils;
1314
use crate::{hash::SHA1, internal::object::types::ObjectType};
14-
use crate::internal::pack::entry::Entry;
1515

1616
// /// record heap-size of all CacheObjects, used for memory limit.
1717
// static CACHE_OBJS_MEM_SIZE: AtomicUsize = AtomicUsize::new(0);
@@ -56,7 +56,14 @@ pub struct CacheObject {
5656
pub data_decompress: Vec<u8>,
5757
pub offset: usize,
5858
pub hash: SHA1,
59-
pub mem_recorder: Option<Arc<AtomicUsize>> // record mem-size of all CacheObjects of a Pack
59+
/// If a [`CacheObject`] is an [`ObjectType::HashDelta`] or an [`ObjectType::OffsetDelta`],
60+
/// it will expand to another [`CacheObject`] of other types. To prevent potential OOM,
61+
/// we record the size of the expanded object as well as that of the object itself.
62+
///
63+
/// See [Comment in PR #755](https://github.com/web3infra-foundation/mega/pull/755#issuecomment-2543100481) for more details.
64+
#[serde(skip, default = "usize::default")]
65+
pub delta_final_size: usize,
66+
pub mem_recorder: Option<Arc<AtomicUsize>>, // record mem-size of all CacheObjects of a Pack
6067
}
6168

6269
impl Clone for CacheObject {
@@ -68,6 +75,7 @@ impl Clone for CacheObject {
6875
data_decompress: self.data_decompress.clone(),
6976
offset: self.offset,
7077
hash: self.hash,
78+
delta_final_size: self.delta_final_size,
7179
mem_recorder: self.mem_recorder.clone(),
7280
};
7381
obj.record_mem_size();
@@ -87,6 +95,7 @@ impl Default for CacheObject {
8795
obj_type: ObjectType::Blob,
8896
offset: 0,
8997
hash: SHA1::default(),
98+
delta_final_size: 0,
9099
mem_recorder: None,
91100
};
92101
obj.record_mem_size();
@@ -98,8 +107,21 @@ impl Default for CacheObject {
98107
// ! the implementation of HeapSize is not exact: it only counts the heap size of `data_decompress` plus `delta_final_size`
99108
// Note that: mem_size == value_size + heap_size, and we only need to impl HeapSize because value_size is known
100109
impl HeapSize for CacheObject {
110+
/// For [`ObjectType::OffsetDelta`] and [`ObjectType::HashDelta`],
111+
/// `delta_final_size` is the size of the expanded object;
112+
/// for other types, `delta_final_size` is 0 as they won't expand.
101113
fn heap_size(&self) -> usize {
102-
self.data_decompress.heap_size()
114+
// To those who are concerned about why these two values are added,
115+
// let's consider the lifetime of two `CacheObject`s, say `delta_obj`
116+
// and `final_obj` in the function `Pack::rebuild_delta`.
117+
//
118+
// `delta_obj` is dropped only after `Pack::rebuild_delta` returns,
119+
// but the space for `final_obj` is allocated in that function.
120+
//
121+
// Therefore, during the execution of `Pack::rebuild_delta`, both `delta_obj`
122+
// and `final_obj` coexist. The maximum memory usage is the sum of the memory
123+
// usage of `delta_obj` and `final_obj`.
124+
self.data_decompress.heap_size() + self.delta_final_size
103125
}
104126
}
105127

@@ -111,7 +133,6 @@ impl Drop for CacheObject {
111133
if let Some(mem_recorder) = &self.mem_recorder {
112134
mem_recorder.fetch_sub((*self).mem_size(), Ordering::SeqCst);
113135
}
114-
115136
}
116137
}
117138

@@ -146,14 +167,15 @@ impl MemSizeRecorder for CacheObject {
146167
}
147168

148169
impl CacheObject {
149-
/// Create a new CacheObject witch is not offset_delta or hash_delta
170+
/// Create a new CacheObject which is neither [`ObjectType::OffsetDelta`] nor [`ObjectType::HashDelta`].
150171
pub fn new_for_undeltified(obj_type: ObjectType, data: Vec<u8>, offset: usize) -> Self {
151172
let hash = utils::calculate_object_hash(obj_type, &data);
152173
CacheObject {
153174
data_decompress: data,
154175
obj_type,
155176
offset,
156177
hash,
178+
delta_final_size: 0, // Only delta objects have `delta_final_size`
157179
mem_recorder: None,
158180
..Default::default()
159181
}
@@ -162,13 +184,11 @@ impl CacheObject {
162184
/// transform the CacheObject to Entry
163185
pub fn to_entry(&self) -> Entry {
164186
match self.obj_type {
165-
ObjectType::Blob | ObjectType::Tree | ObjectType::Commit | ObjectType::Tag => {
166-
Entry {
167-
obj_type: self.obj_type,
168-
data: self.data_decompress.clone(),
169-
hash: self.hash,
170-
}
171-
}
187+
ObjectType::Blob | ObjectType::Tree | ObjectType::Commit | ObjectType::Tag => Entry {
188+
obj_type: self.obj_type,
189+
data: self.data_decompress.clone(),
190+
hash: self.hash,
191+
},
172192
_ => {
173193
unreachable!("delta object should not persist!")
174194
}
@@ -177,10 +197,16 @@ impl CacheObject {
177197
}
178198

179199
/// trait alias for simple use
180-
pub trait ArcWrapperBounds: HeapSize + Serialize + for<'a> Deserialize<'a> + Send + Sync + 'static {}
200+
pub trait ArcWrapperBounds:
201+
HeapSize + Serialize + for<'a> Deserialize<'a> + Send + Sync + 'static
202+
{
203+
}
181204
// You must impl `Alias Trait` for all the `T` satisfying Constraints
182205
// Or, `T` will not satisfy `Alias Trait` even if it satisfies the Original traits
183-
impl<T: HeapSize + Serialize + for<'a> Deserialize<'a> + Send + Sync + 'static> ArcWrapperBounds for T {}
206+
impl<T: HeapSize + Serialize + for<'a> Deserialize<'a> + Send + Sync + 'static> ArcWrapperBounds
207+
for T
208+
{
209+
}
184210

185211
/// Wraps `Arc` in a local type so that the third-party `HeapSize` trait can be implemented for it.
186212
/// Because of use Arc in LruCache, the LruCache is not clear whether a pointer will drop the referenced
@@ -300,6 +326,7 @@ mod test {
300326
obj_type: ObjectType::Blob,
301327
offset: 0,
302328
hash: SHA1::new(&vec![0; 20]),
329+
delta_final_size: 0,
303330
mem_recorder: None,
304331
};
305332
assert!(a.heap_size() == 1024);
@@ -318,6 +345,7 @@ mod test {
318345
obj_type: ObjectType::Blob,
319346
offset: 0,
320347
hash: SHA1::new(&vec![0; 20]),
348+
delta_final_size: 0,
321349
mem_recorder: None,
322350
};
323351
println!("a.heap_size() = {}", a.heap_size());
@@ -329,6 +357,7 @@ mod test {
329357
obj_type: ObjectType::Blob,
330358
offset: 0,
331359
hash: SHA1::new(&vec![1; 20]),
360+
delta_final_size: 0,
332361
mem_recorder: None,
333362
};
334363
{
@@ -433,6 +462,7 @@ mod test {
433462
obj_type: ObjectType::Blob,
434463
offset: 0,
435464
hash: SHA1::new(&vec![0; 20]),
465+
delta_final_size: 0,
436466
mem_recorder: None,
437467
};
438468
let s = bincode::serialize(&a).unwrap();

mercury/src/internal/pack/decode.rs

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -275,11 +275,15 @@ impl Pack {
275275
})
276276
.unwrap();
277277

278+
let mut reader = Cursor::new(&data);
279+
let (_, final_size) = utils::read_delta_object_size(&mut reader)?;
280+
278281
Ok(CacheObject {
279282
base_offset,
280283
data_decompress: data,
281284
obj_type: t,
282285
offset: init_offset,
286+
delta_final_size: final_size,
283287
mem_recorder: None,
284288
..Default::default()
285289
})
@@ -292,12 +296,16 @@ impl Pack {
292296

293297
let (data, raw_size) = self.decompress_data(pack, size)?;
294298
*offset += raw_size;
299+
300+
let mut reader = Cursor::new(&data);
301+
let (_, final_size) = utils::read_delta_object_size(&mut reader)?;
295302

296303
Ok(CacheObject {
297304
base_ref: ref_sha1,
298305
data_decompress: data,
299306
obj_type: t,
300307
offset: init_offset,
308+
delta_final_size: final_size,
301309
mem_recorder: None,
302310
..Default::default()
303311
})
@@ -306,8 +314,6 @@ impl Pack {
306314
}
307315

308316
/// Decodes a pack file from a given Read and BufRead source and get a vec of objects.
309-
///
310-
///
311317
pub fn decode<F>(&mut self, pack: &mut (impl BufRead + Send), callback: F) -> Result<(), GitError>
312318
where
313319
F: Fn(Entry, usize) + Sync + Send + 'static
@@ -338,7 +344,7 @@ impl Pack {
338344
let mut offset: usize = 12;
339345
let mut i = 0;
340346
while i < self.number {
341-
// log per 2000&more then 1 se objects
347+
// log per 1000 objects and 1 second
342348
if i%1000 == 0 {
343349
let time_now = time.elapsed().as_millis();
344350
if time_now - last_update_time > 1000 {
@@ -536,16 +542,15 @@ impl Pack {
536542

537543
let mut stream = Cursor::new(&delta_obj.data_decompress);
538544

539-
// Read the base object size & Result Size
545+
// Read the base object size and the result size
540546
// (Size Encoding)
541-
let base_size = utils::read_varint_le(&mut stream).unwrap().0;
542-
let result_size = utils::read_varint_le(&mut stream).unwrap().0;
547+
let (base_size, result_size) = utils::read_delta_object_size(&mut stream).unwrap();
543548

544549
// Get the base object raw data
545550
let base_info = &base_obj.data_decompress;
546-
assert_eq!(base_info.len() as u64, base_size);
551+
assert_eq!(base_info.len(), base_size, "Base object size mismatch");
547552

548-
let mut result = Vec::with_capacity(result_size as usize);
553+
let mut result = Vec::with_capacity(result_size);
549554

550555
loop {
551556
// Check if the stream has ended, meaning the new object is done
@@ -597,14 +602,15 @@ impl Pack {
597602
}
598603
}
599604
}
600-
assert_eq!(result_size, result.len() as u64);
605+
assert_eq!(result_size, result.len(), "Result size mismatch");
601606

602607
let hash = utils::calculate_object_hash(base_obj.obj_type, &result);
603608
// create new obj from `delta_obj` & `result` instead of modifying `delta_obj` for heap-size recording
604609
CacheObject {
605610
data_decompress: result,
606611
obj_type: base_obj.obj_type, // Same as the Type of base object
607612
hash,
613+
delta_final_size: 0, // The new object is not a delta obj, so set its final size to 0.
608614
mem_recorder: None, // This field (an `Arc`) can't be moved out of `delta_obj` by `struct update syntax`
609615
..delta_obj // This syntax actually moves `delta_obj` into `new_obj`
610616
} // Canonical form (Complete Object)

mercury/src/internal/pack/utils.rs

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -153,8 +153,6 @@ pub fn read_varint_le<R: Read>(reader: &mut R) -> io::Result<(u64, usize)> {
153153
Ok((value, offset))
154154
}
155155

156-
157-
158156
/// The offset for an OffsetDelta object(big-endian order)
159157
/// # Arguments
160158
///
@@ -232,6 +230,26 @@ pub fn read_partial_int<R: Read>(
232230
Ok(value)
233231
}
234232

233+
/// Reads the base size and result size of a delta object from the given stream.
234+
///
235+
/// **Note**: The stream MUST be positioned at the start of the delta object.
236+
///
237+
/// The base size and result size are encoded as variable-length integers in little-endian order.
238+
///
239+
/// The base size is the size of the base object, and the result size is the size of the result object.
240+
///
241+
/// # Parameters
242+
/// * `stream`: The stream from which the sizes are read.
243+
///
244+
/// # Returns
245+
/// Returns a tuple containing the base size and result size.
246+
///
247+
pub fn read_delta_object_size<R: Read>(stream: &mut R) -> io::Result<(usize, usize)> {
248+
let base_size = read_varint_le(stream)?.0 as usize;
249+
let result_size = read_varint_le(stream)?.0 as usize;
250+
Ok((base_size, result_size))
251+
}
252+
235253
/// Calculate the SHA1 hash of the given object.
236254
/// <br> "`<type> <size>\0<content>`"
237255
/// <br> data: The decompressed content of the object

0 commit comments

Comments
 (0)