Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 60 additions & 15 deletions src/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::convert::TryFrom;

use crate::config;
use crate::data::FieldColumn;
use crate::error::{Error as SuperError, Result};
use crate::proto::milvus::{
AlterCollectionFieldRequest, AlterCollectionRequest, CreateCollectionRequest,
DropCollectionRequest, FlushRequest, GetCompactionStateRequest, GetCompactionStateResponse,
HasCollectionRequest, LoadCollectionRequest, ManualCompactionRequest, ManualCompactionResponse,
ReleaseCollectionRequest, ShowCollectionsRequest,
GetFlushStateRequest, HasCollectionRequest, LoadCollectionRequest, ManualCompactionRequest,
ManualCompactionResponse, ReleaseCollectionRequest, ShowCollectionsRequest,
};
use crate::proto::schema::DataType;
use crate::schema::{CollectionSchema, CollectionSchemaBuilder};
Expand Down Expand Up @@ -84,7 +86,7 @@ impl CollectionCache {
Self {
collections: dashmap::DashMap::new(),
timestamps: dashmap::DashMap::new(),
client: client,
client,
}
}

Expand All @@ -93,7 +95,7 @@ impl CollectionCache {
self.timestamps.clear();
}

pub async fn get<'a>(&self, name: &str) -> Result<Collection> {
pub async fn get(&self, name: &str) -> Result<Collection> {
if !self.local_exist(name) {
let resp = self
.client
Expand Down Expand Up @@ -133,7 +135,7 @@ impl CollectionCache {
}

pub fn get_timestamp(&self, name: &str) -> Option<Timestamp> {
self.timestamps.get(name).map(|v| v.value().clone())
self.timestamps.get(name).map(|v| *v.value())
}

fn local_exist(&self, name: &str) -> bool {
Expand All @@ -150,9 +152,9 @@ impl From<proto::milvus::DescribeCollectionResponse> for Collection {
auto_id: schema.auto_id,
num_shards: value.shards_num as usize,
// num_partitions: value.partitions_num as usize,
consistency_level: ConsistencyLevel::from_i32(value.consistency_level).unwrap(),
consistency_level: ConsistencyLevel::try_from(value.consistency_level).unwrap(),
description: schema.description,
fields: schema.fields.into_iter().map(|f| Field::from(f)).collect(),
fields: schema.fields.into_iter().map(Field::from).collect(),
// enable_dynamic_field: value.enable_dynamic_field,
}
}
Expand Down Expand Up @@ -191,7 +193,7 @@ pub struct CompactionState {
impl From<GetCompactionStateResponse> for CompactionState {
fn from(value: GetCompactionStateResponse) -> Self {
Self {
state: crate::proto::common::CompactionState::from_i32(value.state).unwrap(),
state: crate::proto::common::CompactionState::try_from(value.state).unwrap(),
executing_plan_num: value.executing_plan_no,
timeout_plan_num: value.timeout_plan_no,
completed_plan_num: value.completed_plan_no,
Expand Down Expand Up @@ -374,10 +376,10 @@ impl Client {
.clone()
.rename_collection(crate::proto::milvus::RenameCollectionRequest {
base: Some(MsgBase::new(MsgType::RenameCollection)),
db_name: db_name,
db_name,
old_name: name,
new_name: new_name,
new_db_name: new_db_name,
new_name,
new_db_name,
})
.await?
.into_inner();
Expand Down Expand Up @@ -576,7 +578,7 @@ impl Client {
db_name: "".to_string(),
collection_name: collection_name.into(),
field_name: field_name.into(),
properties: properties,
properties,
delete_keys: vec![],
})
.await?
Expand Down Expand Up @@ -618,7 +620,7 @@ impl Client {
db_name: "".to_string(),
collection_name: collection_name.into(),
collection_id: 0,
properties: properties,
properties,
delete_keys: Vec::new(),
})
.await?
Expand Down Expand Up @@ -679,20 +681,63 @@ impl Client {
where
S: Into<String>,
{
let name = collection_name.into();
let res = self
.client
.clone()
.flush(FlushRequest {
base: Some(MsgBase::new(MsgType::Flush)),
db_name: "".to_string(),
collection_names: vec![collection_name.into()],
collection_names: vec![name.clone()],
})
.await?
.into_inner();

status_to_result(&res.status)?;

Ok(())
let flush_ts = match res.coll_flush_ts.get(&name) {
Some(&ts) => ts,
None => {
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
return Ok(());
}
};
let segment_i_ds = res
.coll_seg_i_ds
.get(&name)
.map(|a| a.data.clone())
.unwrap_or_default();

let mut stub = self.client.clone();
let request = GetFlushStateRequest {
segment_i_ds: segment_i_ds.clone(),
flush_ts,
db_name: "".to_string(),
collection_name: name.clone(),
};
for _ in 0..60 {
let state = match tokio::time::timeout(
std::time::Duration::from_secs(5),
stub.get_flush_state(request.clone()),
)
.await
{
Ok(Ok(resp)) => resp.into_inner(),
Ok(Err(e)) => return Err(e.into()),
Err(_) => {
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
continue;
}
};
status_to_result(&state.status)?;
if state.flushed {
return Ok(());
}
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
}
Err(SuperError::Unexpected(
"flush did not complete within 30s".to_owned(),
))
}

/// manual compaction
Expand Down