Skip to content

Commit 935aec8

Browse files
author
Yinwei Li
committed
Add apis for query and partition
What's changed: 1. client.rs: Add comment for list_aliases 2. collectoin.rs: Change the return type for describe_collection() from `collection` to `DescribeCollection`,containing more info than before,which now acts like pymilvus. 3. partitions.rs: Add load_partitions and release_partitions,following their options' struct. 4. query.rs: Add get() and it's following functions and struct,like extract_primary_field and pack_pks_expr,like pymilvus.Add options for query() to use and some build functions for QueryOptions. 5. test/: Change some settings and details to adapt to the changes above Why Change Some Struct: To catch up with pymivus,we need to approve optional params,so we need to change the option struct Signed-off-by: Yinwei Li <yinwei.li@zilliz.com>
1 parent 8e8107e commit 935aec8

File tree

8 files changed

+596
-73
lines changed

8 files changed

+596
-73
lines changed

src/client.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,15 @@ impl Client {
281281
Ok((resp.alias, resp.collection, resp.db_name))
282282
}
283283

284+
/// List a collection's aliases
285+
///
286+
/// # Arguments
287+
///
288+
/// * `collection_name` - The name of the collection.
289+
///
290+
/// # Returns
291+
///
292+
/// Returns a `Result` containing the database name, collection name, and aliases.
284293
pub async fn list_aliases(
285294
&self,
286295
collection_name: &str,

src/collection.rs

Lines changed: 36 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,18 @@ pub struct Collection {
6060
// pub enable_dynamic_field: bool,
6161
}
6262

63+
/// Return type for describe collection,containing enough messages
64+
pub struct DescribeCollection {
65+
pub collection_name: String,
66+
pub collection_id: i64,
67+
pub shards_num: i32,
68+
pub aliases: Vec<String>,
69+
pub consistency_level: i32,
70+
pub properties: Vec<proto::common::KeyValuePair>,
71+
pub num_partitions: i64,
72+
pub schema: crate::proto::schema::CollectionSchema,
73+
}
74+
6375
#[derive(Debug, Clone)]
6476
pub(crate) struct CollectionCache {
6577
collections: dashmap::DashMap<String, Collection>,
@@ -275,7 +287,7 @@ impl Client {
275287
/// # Returns
276288
///
277289
/// Returns a `Result` containing the `Collection` information if successful, or an error if the collection does not exist or cannot be accessed.
278-
pub async fn describe_collection<S>(&self, name: S) -> Result<Collection>
290+
pub async fn describe_collection<S>(&self, name: S) -> Result<DescribeCollection>
279291
where
280292
S: Into<String>,
281293
{
@@ -294,7 +306,16 @@ impl Client {
294306

295307
status_to_result(&resp.status)?;
296308

297-
Ok(resp.into())
309+
Ok(DescribeCollection {
310+
collection_name: resp.collection_name,
311+
collection_id: resp.collection_id,
312+
shards_num: resp.shards_num,
313+
aliases: resp.aliases,
314+
consistency_level: resp.consistency_level,
315+
properties: resp.properties,
316+
num_partitions: resp.num_partitions,
317+
schema: resp.schema.unwrap_or_default(),
318+
})
298319
}
299320

300321
/// Checks if a collection with the given name exists.
@@ -328,26 +349,30 @@ impl Client {
328349
Ok(res.value)
329350
}
330351

331-
332-
pub async fn rename_collection<S>(&self, name: S, new_name: S,options:Option<(String,String)>) -> Result<()>
352+
pub async fn rename_collection<S>(
353+
&self,
354+
name: S,
355+
new_name: S,
356+
options: Option<(String, String)>,
357+
) -> Result<()>
333358
where
334359
S: Into<String>,
335360
{
336361
let name = name.into();
337362
let new_name = new_name.into();
338-
let (db_name,new_db_name)=match options {
339-
Some((db_name,new_db_name))=>(db_name,new_db_name),
340-
None=>("".to_string(),"".to_string()),
363+
let (db_name, new_db_name) = match options {
364+
Some((db_name, new_db_name)) => (db_name, new_db_name),
365+
None => ("".to_string(), "".to_string()),
341366
};
342367
let res = self
343368
.client
344369
.clone()
345370
.rename_collection(crate::proto::milvus::RenameCollectionRequest {
346371
base: Some(MsgBase::new(MsgType::RenameCollection)),
347-
db_name:db_name,
348-
old_name:name,
349-
new_name:new_name,
350-
new_db_name:new_db_name,
372+
db_name: db_name,
373+
old_name: name,
374+
new_name: new_name,
375+
new_db_name: new_db_name,
351376
})
352377
.await?
353378
.into_inner();

src/partition.rs

Lines changed: 196 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,41 @@ use crate::{
1010
utils::status_to_result,
1111
};
1212

13+
/// load_partitions' waitting time
14+
const WAIT_LOAD_DURATION_MS: u64 = 100;
15+
16+
#[derive(Debug, Clone)]
17+
pub struct LoadPartitionsOption {
18+
resource_groups: Vec<String>,
19+
refresh: bool,
20+
load_fields: Vec<String>,
21+
skip_load_dynamic_field: bool,
22+
load_params: HashMap<String, String>,
23+
}
24+
25+
impl Default for LoadPartitionsOption {
26+
fn default() -> Self {
27+
LoadPartitionsOption {
28+
resource_groups: Vec::new(),
29+
refresh: false,
30+
load_fields: Vec::new(),
31+
skip_load_dynamic_field: false,
32+
load_params: HashMap::new(),
33+
}
34+
}
35+
}
36+
1337
impl Client {
38+
/// Creates a new partition in the specified collection.
39+
///
40+
/// # Arguments
41+
///
42+
/// * `collection_name` - The name of the collection where the partition will be created.
43+
/// * `partition_name` - The name of the partition to be created.
44+
///
45+
/// # Returns
46+
///
47+
/// Returns a `Result` indicating success or failure.
1448
pub async fn create_partition(
1549
&self,
1650
collection_name: String,
@@ -30,6 +64,16 @@ impl Client {
3064
))
3165
}
3266

67+
/// Drops a partition from the specified collection.
68+
///
69+
/// # Arguments
70+
///
71+
/// * `collection_name` - The name of the collection containing the partition.
72+
/// * `partition_name` - The name of the partition to be dropped.
73+
///
74+
/// # Returns
75+
///
76+
/// Returns a `Result` indicating success or failure.
3377
pub async fn drop_partition(
3478
&self,
3579
collection_name: String,
@@ -49,6 +93,15 @@ impl Client {
4993
))
5094
}
5195

96+
/// Retrieves a list of all partitions in the specified collection.
97+
///
98+
/// # Arguments
99+
///
100+
/// * `collection_name` - The name of the collection to list partitions for.
101+
///
102+
/// # Returns
103+
///
104+
/// Returns a `Result` containing a vector of partition names if successful, or an error if the operation fails.
52105
pub async fn list_partitions(&self, collection_name: String) -> Result<Vec<String>> {
53106
let res = self
54107
.client
@@ -67,6 +120,16 @@ impl Client {
67120
Ok(res.partition_names)
68121
}
69122

123+
/// Checks if a partition exists in the specified collection.
124+
///
125+
/// # Arguments
126+
///
127+
/// * `collection_name` - The name of the collection to check.
128+
/// * `partition_name` - The name of the partition to check for existence.
129+
///
130+
/// # Returns
131+
///
132+
/// Returns a `Result` containing a boolean indicating whether the partition exists.
70133
pub async fn has_partition(
71134
&self,
72135
collection_name: String,
@@ -87,6 +150,16 @@ impl Client {
87150
Ok(res.value)
88151
}
89152

153+
/// Retrieves statistics for a specific partition.
154+
///
155+
/// # Arguments
156+
///
157+
/// * `collection_name` - The name of the collection containing the partition.
158+
/// * `partition_name` - The name of the partition to get statistics for.
159+
///
160+
/// # Returns
161+
///
162+
/// Returns a `Result` containing a HashMap of statistics key-value pairs.
90163
pub async fn get_partition_stats(
91164
&self,
92165
collection_name: String,
@@ -108,51 +181,127 @@ impl Client {
108181
Ok(res.stats.into_iter().map(|s| (s.key, s.value)).collect())
109182
}
110183

111-
// pub async fn load_partitions<S: Into<String>, I: IntoIterator<Item = S>>(
112-
// &self,
113-
// collection_name: S,
114-
// partition_names: I,
115-
// replica_number: i32,
116-
// ) -> Result<()> {
117-
// let names: Vec<String> = partition_names.into_iter().map(|x| x.to_string()).collect();
118-
// status_to_result(&Some(
119-
// self.client
120-
// .clone()
121-
// .load_partitions(proto::milvus::LoadPartitionsRequest {
122-
// base: Some(MsgBase::new(MsgType::LoadPartitions)),
123-
// db_name: "".to_string(),
124-
// collection_name: collection_name.into(),
125-
// replica_number,
126-
// partition_names: names.clone(),
127-
// })
128-
// .await?
129-
// .into_inner(),
130-
// ))?;
131-
132-
// loop {
133-
// if self.get_loading_progress(&names).await? >= 100 {
134-
// return Ok(());
135-
// }
136-
137-
// tokio::time::sleep(Duration::from_millis(config::WAIT_LOAD_DURATION_MS)).await;
138-
// }
139-
// }
140-
141-
// pub async fn release_partitions<S: ToString, I: IntoIterator<Item = S>>(
142-
// &self,
143-
// partition_names: I,
144-
// ) -> Result<()> {
145-
// status_to_result(&Some(
146-
// self.client
147-
// .clone()
148-
// .release_partitions(ReleasePartitionsRequest {
149-
// base: Some(MsgBase::new(MsgType::ReleasePartitions)),
150-
// db_name: "".to_string(),
151-
// collection_name: self.schema().name.to_string(),
152-
// partition_names: partition_names.into_iter().map(|x| x.to_string()).collect(),
153-
// })
154-
// .await?
155-
// .into_inner(),
156-
// ))
157-
// }
184+
/// Gets the loading progress for specified partitions.
185+
///
186+
/// # Arguments
187+
///
188+
/// * `collection_name` - The name of the collection.
189+
/// * `partition_names` - An iterator of partition names to check progress for.
190+
///
191+
/// # Returns
192+
///
193+
/// Returns a `Result` containing the loading progress percentage (0-100).
194+
async fn get_loading_progress<'a, S, I>(
195+
&self,
196+
collection_name: S,
197+
partition_names: I,
198+
) -> Result<i64>
199+
where
200+
S: Into<String>,
201+
I: IntoIterator<Item = &'a String>,
202+
{
203+
let partition_names: Vec<String> = partition_names.into_iter().map(|x| x.into()).collect();
204+
let resp = self
205+
.client
206+
.clone()
207+
.get_loading_progress(crate::proto::milvus::GetLoadingProgressRequest {
208+
base: Some(MsgBase::new(MsgType::LoadPartitions)),
209+
db_name: "".to_string(),
210+
collection_name: collection_name.into(),
211+
partition_names: partition_names,
212+
})
213+
.await?
214+
.into_inner();
215+
216+
status_to_result(&resp.status)?;
217+
Ok(resp.progress)
218+
}
219+
220+
/// Loads partitions into memory with configurable options.
221+
///
222+
/// This method loads the specified partitions into memory and waits for the loading
223+
/// process to complete. The method polls the loading progress until it reaches 100%.
224+
///
225+
/// # Arguments
226+
///
227+
/// * `collection_name` - The name of the collection containing the partitions.
228+
/// * `partition_names` - An iterator of partition names to load.
229+
/// * `replica_number` - The number of replicas to load.
230+
/// * `options` - Optional configuration for the loading process.
231+
///
232+
/// # Returns
233+
///
234+
/// Returns a `Result` indicating success or failure. The method will wait until
235+
/// all partitions are fully loaded before returning.
236+
pub async fn load_partitions<S: Into<String> + Copy, I: IntoIterator<Item = S>>(
237+
&self,
238+
collection_name: S,
239+
partition_names: I,
240+
replica_number: i32,
241+
options: Option<LoadPartitionsOption>,
242+
) -> Result<()> {
243+
let names: Vec<String> = partition_names.into_iter().map(|x| x.into()).collect();
244+
let options = options.unwrap_or_default();
245+
246+
status_to_result(&Some(
247+
self.client
248+
.clone()
249+
.load_partitions(proto::milvus::LoadPartitionsRequest {
250+
base: Some(MsgBase::new(MsgType::LoadPartitions)),
251+
db_name: "".to_string(),
252+
collection_name: collection_name.into(),
253+
replica_number,
254+
partition_names: names.clone(),
255+
resource_groups: options.resource_groups,
256+
refresh: options.refresh,
257+
load_fields: options.load_fields,
258+
skip_load_dynamic_field: options.skip_load_dynamic_field,
259+
load_params: options.load_params,
260+
})
261+
.await?
262+
.into_inner(),
263+
))?;
264+
265+
loop {
266+
if self.get_loading_progress(collection_name, &names).await? >= 100 {
267+
return Ok(());
268+
}
269+
270+
tokio::time::sleep(tokio::time::Duration::from_millis(WAIT_LOAD_DURATION_MS)).await;
271+
}
272+
}
273+
274+
/// Releases partitions from memory.
275+
///
276+
/// This method releases the specified partitions from memory, freeing up
277+
/// system resources. After releasing, the partitions will no longer be
278+
/// available for queries until they are loaded again.
279+
///
280+
/// # Arguments
281+
///
282+
/// * `collection_name` - The name of the collection containing the partitions.
283+
/// * `partition_names` - An iterator of partition names to release.
284+
///
285+
/// # Returns
286+
///
287+
/// Returns a `Result` indicating success or failure.
288+
pub async fn release_partitions<S: Into<String>, I: IntoIterator<Item = S>>(
289+
&self,
290+
collection_name: S,
291+
partition_names: I,
292+
) -> Result<()> {
293+
let names: Vec<String> = partition_names.into_iter().map(|x| x.into()).collect();
294+
status_to_result(&Some(
295+
self.client
296+
.clone()
297+
.release_partitions(crate::proto::milvus::ReleasePartitionsRequest {
298+
base: Some(MsgBase::new(MsgType::ReleasePartitions)),
299+
db_name: "".to_string(),
300+
collection_name: collection_name.into(),
301+
partition_names: names,
302+
})
303+
.await?
304+
.into_inner(),
305+
))
306+
}
158307
}

0 commit comments

Comments
 (0)