
Commit f43d991

Modify nydus-image chunkdict save
Add two fields, image_name and version_name, to the chunk table in the database. Modify the database storage logic to save new data without erasing the original data.

Signed-off-by: Zhao Yuan <[email protected]>
1 parent 5cf3e43 commit f43d991
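
After this change, the caller tags every chunk row with the image and version it came from. A minimal sketch of the new call shape, assuming hypothetical CLI wiring (only the save_metadata signature comes from this diff; the arguments to Deduplicate::new are placeholders):

    // Hypothetical call site: the `bootstrap_path`, `config`, and `db_url`
    // bindings are assumptions, not shown in this commit.
    let mut dedup = Deduplicate::<SqliteDatabase>::new(bootstrap_path, config, db_url)?;

    // New in this commit: chunks are tagged with image_name/version_name,
    // and repeated saves accumulate rows instead of recreating the database.
    let blob_infos = dedup.save_metadata("redis".to_string(), "7.0.5".to_string())?;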

File tree

2 files changed: +64 -184 lines changed

src/bin/nydus-image/deduplicate.rs (+54 -141)
@@ -11,6 +11,7 @@ use nydus_storage::device::BlobInfo;
 use rusqlite::{params, Connection};
 use std::fs;
 use std::path::Path;
+use std::result::Result::Ok;
 use std::sync::{Arc, Mutex};
 
 #[derive(Debug)]
@@ -66,15 +67,11 @@ pub struct SqliteDatabase {
 
 impl SqliteDatabase {
     pub fn new(database_url: &str) -> Result<Self, rusqlite::Error> {
-        // Delete the database file if it exists.
+        // Connect to a database that already exists
        if let Ok(metadata) = fs::metadata(database_url) {
            if metadata.is_file() {
-                if let Err(err) = fs::remove_file(database_url) {
-                    warn!(
-                        "Warning: Unable to delete existing database file: {:?}.",
-                        err
-                    );
-                }
+            } else {
+                panic!("Warning: Unable to find existing database file.");
            }
        }
 
@@ -95,24 +92,6 @@ impl SqliteDatabase {
             blob_table,
         })
     }
-
-    pub fn connect(database_url: &str) -> Result<Self, rusqlite::Error> {
-        // Connect to a database that already exists
-        if let Ok(metadata) = fs::metadata(database_url) {
-            if metadata.is_file() {
-            } else {
-                panic!("Warning: Unable to find existing database file.");
-            }
-        }
-
-        let chunk_table = ChunkTable::new(database_url)?;
-        let blob_table = BlobTable::new(database_url)?;
-
-        Ok(Self {
-            chunk_table,
-            blob_table,
-        })
-    }
 }
 
 impl Database for SqliteDatabase {
@@ -163,11 +142,15 @@ impl Deduplicate<SqliteDatabase> {
         Ok(Self { sb, db })
     }
 
-    pub fn save_metadata(&mut self) -> anyhow::Result<Vec<Arc<BlobInfo>>> {
+    pub fn save_metadata(
+        &mut self,
+        image_name: String,
+        version_name: String,
+    ) -> anyhow::Result<Vec<Arc<BlobInfo>>> {
         self.create_tables()?;
         let blob_infos = self.sb.superblock.get_blob_infos();
         self.insert_blobs(&blob_infos)?;
-        self.insert_chunks(&blob_infos)?;
+        self.insert_chunks(&blob_infos, image_name, version_name)?;
         Ok(blob_infos)
     }
 
@@ -194,14 +177,21 @@ impl Deduplicate<SqliteDatabase> {
         Ok(())
     }
 
-    fn insert_chunks(&mut self, blob_infos: &[Arc<BlobInfo>]) -> anyhow::Result<()> {
+    fn insert_chunks(
+        &mut self,
+        blob_infos: &[Arc<BlobInfo>],
+        image_name: String,
+        version_name: String,
+    ) -> anyhow::Result<()> {
         let process_chunk = &mut |t: &Tree| -> Result<()> {
             let node = t.lock_node();
             for chunk in &node.chunks {
                 let index = chunk.inner.blob_index();
                 let chunk_blob_id = blob_infos[index as usize].blob_id();
                 self.db
                     .insert_chunk(&Chunk {
+                        image_name: image_name.to_string(),
+                        version_name: version_name.to_string(),
                         chunk_blob_id,
                         chunk_digest: chunk.inner.id().to_string(),
                         chunk_compressed_size: chunk.inner.compressed_size(),
@@ -220,75 +210,14 @@ impl Deduplicate<SqliteDatabase> {
     }
 }
 
-pub struct Algorithm<D: Database + Send + Sync> {
-    algorithm_name: String,
-    db: D,
-}
-
-impl Algorithm<SqliteDatabase> {
-    pub fn new(algorithm: String, db_url: &str) -> anyhow::Result<Self> {
-        let algorithm_name = algorithm;
-        let db = SqliteDatabase::connect(db_url)?;
-        Ok(Self { algorithm_name, db })
-    }
-
-    pub fn chunkdict_generate(&mut self) -> anyhow::Result<Vec<Chunk>> {
-        let all_chunks = self.db.chunk_table.list_all()?;
-        let chunkdict = match &self.algorithm_name as &str {
-            "exponential_smoothing" => Self::exponential_smoothing(self, all_chunks)?,
-            _ => {
-                bail!("Unsupported algorithm name:, please use a valid algorithm name, such as exponential_smoothing")
-            }
-        };
-        Ok(chunkdict)
-    }
-
-    // Algorithm "exponential_smoothing"
-    // List all chunk and sort them by the order in chunk table
-    // Score each chunk by "exponential_smoothing" formula
-    // Select chunks whose score is greater than threshold and generate chunk dictionary
-    fn exponential_smoothing(&mut self, all_chunks: Vec<Chunk>) -> anyhow::Result<Vec<Chunk>> {
-        let alpha = 0.5;
-        let previou_length = 1000;
-        let threshold = 0.1;
-        let mut smoothed_data = Vec::new();
-        for (chunk_index, chunk) in all_chunks.iter().enumerate() {
-            let mut is_duplicate: f64 = 0.0;
-            let mut temp = 0;
-            if chunk_index > previou_length {
-                temp = chunk_index - previou_length;
-            }
-
-            for previou_index in all_chunks.iter().take(chunk_index).skip(temp) {
-                if chunk.chunk_digest == previou_index.chunk_digest {
-                    is_duplicate = 1.0;
-                    break;
-                }
-            }
-            if chunk_index == 0 {
-                smoothed_data.push(0.0);
-            } else {
-                let smoothed_score: f64 =
-                    alpha * is_duplicate + (1.0 - alpha) * smoothed_data[chunk_index - 1];
-                smoothed_data.push(smoothed_score);
-            }
-        }
-        let mut chunkdict: Vec<Chunk> = Vec::new();
-        for i in 0..smoothed_data.len() {
-            let chunk = Chunk {
-                chunk_blob_id: all_chunks[i].chunk_blob_id.clone(),
-                chunk_digest: all_chunks[i].chunk_digest.clone(),
-                chunk_compressed_offset: all_chunks[i].chunk_compressed_offset,
-                chunk_uncompressed_offset: all_chunks[i].chunk_uncompressed_offset,
-                chunk_compressed_size: all_chunks[i].chunk_compressed_size,
-                chunk_uncompressed_size: all_chunks[i].chunk_uncompressed_size,
-            };
-            if smoothed_data[i] > threshold {
-                chunkdict.push(chunk);
-            }
-        }
-        Ok(chunkdict)
+#[allow(dead_code)]
+#[derive(Debug)]
+struct DataPoint {
+    image_name: String,
+    chunk_list: Vec<Chunk>,
+    visited: bool,
+    clustered: bool,
+    cluster_id: i32,
 }
 
 pub trait Table<T, Err>: Sync + Send + Sized + 'static
@@ -311,7 +240,7 @@ where
     fn list_paged(&self, offset: i64, limit: i64) -> Result<Vec<T>, Err>;
 }
 
-#[derive(Debug)]
+#[derive()]
 pub struct ChunkTable {
     conn: Arc<Mutex<Connection>>,
 }
@@ -332,8 +261,10 @@ impl ChunkTable {
     }
 }
 
-#[derive(Debug)]
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
 pub struct Chunk {
+    image_name: String,
+    version_name: String,
     chunk_blob_id: String,
     chunk_digest: String,
     chunk_compressed_size: u32,
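
The broadened derives (Clone, PartialEq, Eq, Hash) make Chunk usable as a set or map key, which any chunk-level deduplication or clustering will need. For example (illustrative only, not from this commit):

    use std::collections::HashSet;

    // Collapse exact duplicate chunk rows; requires the Eq + Hash derives
    // this hunk adds to Chunk.
    let unique: HashSet<Chunk> = chunks.into_iter().collect();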
@@ -359,6 +290,8 @@ impl Table<Chunk, DatabaseError> for ChunkTable {
             .execute(
                 "CREATE TABLE IF NOT EXISTS chunk (
                     id INTEGER PRIMARY KEY,
+                    image_name TEXT,
+                    version_name TEXT,
                     chunk_blob_id TEXT NOT NULL,
                     chunk_digest TEXT,
                     chunk_compressed_size INT,
@@ -378,16 +311,20 @@ impl Table<Chunk, DatabaseError> for ChunkTable {
             .map_err(|e| DatabaseError::PoisonError(e.to_string()))?
             .execute(
                 "INSERT INTO chunk(
+                    image_name,
+                    version_name,
                     chunk_blob_id,
                     chunk_digest,
                     chunk_compressed_size,
                     chunk_uncompressed_size,
                     chunk_compressed_offset,
                     chunk_uncompressed_offset
                 )
-                VALUES (?1, ?2, ?3, ?4, ?5, ?6);
+                VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8);
                 ",
                 rusqlite::params![
+                    chunk.image_name,
+                    chunk.version_name,
                     chunk.chunk_blob_id,
                     chunk.chunk_digest,
                     chunk.chunk_compressed_size,
@@ -425,18 +362,20 @@ impl Table<Chunk, DatabaseError> for ChunkTable {
             .map_err(|e| DatabaseError::PoisonError(e.to_string()))?;
         let mut stmt: rusqlite::Statement<'_> = conn_guard
             .prepare(
-                "SELECT id, chunk_blob_id, chunk_digest, chunk_compressed_size,
+                "SELECT id, image_name, version_name, chunk_blob_id, chunk_digest, chunk_compressed_size,
                 chunk_uncompressed_size, chunk_compressed_offset, chunk_uncompressed_offset from chunk
                 ORDER BY id LIMIT ?1 OFFSET ?2",
             )?;
         let chunk_iterator = stmt.query_map(params![limit, offset], |row| {
             Ok(Chunk {
-                chunk_blob_id: row.get(1)?,
-                chunk_digest: row.get(2)?,
-                chunk_compressed_size: row.get(3)?,
-                chunk_uncompressed_size: row.get(4)?,
-                chunk_compressed_offset: row.get(5)?,
-                chunk_uncompressed_offset: row.get(6)?,
+                image_name: row.get(1)?,
+                version_name: row.get(2)?,
+                chunk_blob_id: row.get(3)?,
+                chunk_digest: row.get(4)?,
+                chunk_compressed_size: row.get(5)?,
+                chunk_uncompressed_size: row.get(6)?,
+                chunk_compressed_offset: row.get(7)?,
+                chunk_uncompressed_offset: row.get(8)?,
             })
         })?;
         let mut chunks = Vec::new();
@@ -593,6 +532,8 @@ mod tests {
         let chunk_table = ChunkTable::new_in_memory()?;
         chunk_table.create()?;
         let chunk = Chunk {
+            image_name: "REDIS".to_string(),
+            version_name: "1.0.0".to_string(),
             chunk_blob_id: "BLOB123".to_string(),
             chunk_digest: "DIGEST123".to_string(),
             chunk_compressed_size: 512,
@@ -602,6 +543,8 @@ mod tests {
         };
         chunk_table.insert(&chunk)?;
         let chunks = chunk_table.list_all()?;
+        assert_eq!(chunks[0].image_name, chunk.image_name);
+        assert_eq!(chunks[0].version_name, chunk.version_name);
         assert_eq!(chunks.len(), 1);
         assert_eq!(chunks[0].chunk_blob_id, chunk.chunk_blob_id);
         assert_eq!(chunks[0].chunk_digest, chunk.chunk_digest);
@@ -648,6 +591,8 @@ mod tests {
         for i in 0..200 {
             let i64 = i as u64;
             let chunk = Chunk {
+                image_name: format!("REDIS{}", i),
+                version_name: format!("1.0.0{}", i),
                 chunk_blob_id: format!("BLOB{}", i),
                 chunk_digest: format!("DIGEST{}", i),
                 chunk_compressed_size: i,
@@ -659,6 +604,8 @@ mod tests {
         }
         let chunks = chunk_table.list_paged(100, 100)?;
         assert_eq!(chunks.len(), 100);
+        assert_eq!(chunks[0].image_name, "REDIS100");
+        assert_eq!(chunks[0].version_name, "1.0.0100");
         assert_eq!(chunks[0].chunk_blob_id, "BLOB100");
         assert_eq!(chunks[0].chunk_digest, "DIGEST100");
         assert_eq!(chunks[0].chunk_compressed_size, 100);
@@ -668,38 +615,4 @@ mod tests {
         Ok(())
     }
 
-    #[test]
-    fn test_alalgorithm_exponential_smoothing() -> Result<(), Box<dyn std::error::Error>> {
-        let algorithm = String::from("exponential_smoothing");
-        let db_url = "./metadata.db";
-        let chunk_table = ChunkTable::new(db_url)?;
-        chunk_table.clear()?;
-        chunk_table.create()?;
-        for i in 0..200 {
-            let i64 = i as u64;
-            let chunk = Chunk {
-                chunk_blob_id: format!("BLOB{}", i),
-                chunk_digest: format!("DIGEST{}", (i + 1) % 2),
-                chunk_compressed_size: i,
-                chunk_uncompressed_size: i * 2,
-                chunk_compressed_offset: i64 * 3,
-                chunk_uncompressed_offset: i64 * 4,
-            };
-            chunk_table.insert(&chunk)?;
-        }
-        let mut algorithm = Algorithm::<SqliteDatabase>::new(algorithm, db_url)?;
-        assert_eq!(
-            algorithm.algorithm_name,
-            "exponential_smoothing".to_string()
-        );
-        let chunkdict = algorithm.chunkdict_generate()?;
-        assert_eq!(chunkdict.len(), 198);
-        assert_eq!(chunkdict[0].chunk_blob_id, "BLOB2");
-        assert_eq!(chunkdict[0].chunk_digest, "DIGEST1");
-        assert_eq!(chunkdict[0].chunk_compressed_size, 2);
-        assert_eq!(chunkdict[0].chunk_uncompressed_size, 4);
-        assert_eq!(chunkdict[0].chunk_compressed_offset, 6);
-        assert_eq!(chunkdict[0].chunk_uncompressed_offset, 8);
-        Ok(())
-    }
 }
