Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions examples/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@ use milvus::index::{IndexParams, IndexType};
use milvus::options::LoadOptions;
use milvus::query::QueryOptions;
use milvus::schema::{CollectionSchema, CollectionSchemaBuilder};
use milvus::{
client::Client, collection::Collection, data::FieldColumn, error::Error, schema::FieldSchema,
};
use milvus::{client::Client, data::FieldColumn, error::Error, schema::FieldSchema};
use std::collections::HashMap;

use rand::prelude::*;
Expand Down
14 changes: 8 additions & 6 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ impl Interceptor for AuthInterceptor {
mut req: Request<()>,
) -> std::result::Result<tonic::Request<()>, tonic::Status> {
if let Some(ref token) = self.token {
let header_value = format!("{}", token);
let header_value = token;
req.metadata_mut()
.insert("authorization", header_value.parse().unwrap());
}
Expand Down Expand Up @@ -92,6 +92,7 @@ where
pub struct Client {
pub(crate) client: MilvusServiceClient<InterceptedService<Channel, AuthInterceptor>>,
pub(crate) collection_cache: CollectionCache,
pub(crate) db_name: String,
}

impl Client {
Expand Down Expand Up @@ -137,7 +138,8 @@ impl Client {

Ok(Self {
client: client.clone(),
collection_cache: CollectionCache::new(client),
collection_cache: CollectionCache::new(client, ""),
db_name: String::new(),
})
}

Expand All @@ -151,7 +153,7 @@ impl Client {
.clone()
.flush(FlushRequest {
base: Some(MsgBase::new(MsgType::Flush)),
db_name: "".to_string(),
db_name: self.db_name.clone(),
collection_names: collections.into_iter().map(|x| x.to_string()).collect(),
})
.await?
Expand Down Expand Up @@ -190,7 +192,7 @@ impl Client {
.clone()
.create_alias(crate::proto::milvus::CreateAliasRequest {
base: Some(MsgBase::new(MsgType::CreateAlias)),
db_name: "".to_string(), // reserved
db_name: self.db_name.clone(),
collection_name,
alias,
})
Expand Down Expand Up @@ -218,7 +220,7 @@ impl Client {
.clone()
.drop_alias(crate::proto::milvus::DropAliasRequest {
base: Some(MsgBase::new(MsgType::DropAlias)),
db_name: "".to_string(), // reserved
db_name: self.db_name.clone(),
alias,
})
.await?
Expand Down Expand Up @@ -248,7 +250,7 @@ impl Client {
.clone()
.alter_alias(crate::proto::milvus::AlterAliasRequest {
base: Some(MsgBase::new(MsgType::AlterAlias)),
db_name: "".to_string(), // reserved
db_name: self.db_name.clone(),
collection_name,
alias,
})
Expand Down
50 changes: 29 additions & 21 deletions src/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use crate::proto::milvus::{
ReleaseCollectionRequest, ShowCollectionsRequest,
};
use crate::proto::schema::DataType;
use crate::schema::CollectionSchema;
use crate::schema::{CollectionSchema, FieldSchema};
use crate::types::*;
use crate::utils::status_to_result;
use crate::value::Value;
Expand Down Expand Up @@ -55,7 +55,7 @@ pub struct Collection {
// pub num_partitions: usize,
pub consistency_level: ConsistencyLevel,
pub description: String,
pub fields: Vec<Field>,
pub fields: Vec<FieldSchema>,
// pub enable_dynamic_field: bool,
}

Expand All @@ -64,25 +64,30 @@ pub(crate) struct CollectionCache {
collections: dashmap::DashMap<String, Collection>,
timestamps: dashmap::DashMap<String, Timestamp>,
client: MilvusServiceClient<InterceptedService<Channel, AuthInterceptor>>,
db_name: String,
}

impl CollectionCache {
pub fn new(client: MilvusServiceClient<InterceptedService<Channel, AuthInterceptor>>) -> Self {
pub fn new(
client: MilvusServiceClient<InterceptedService<Channel, AuthInterceptor>>,
db_name: &str,
) -> Self {
Self {
collections: dashmap::DashMap::new(),
timestamps: dashmap::DashMap::new(),
client: client,
client,
db_name: db_name.to_string(),
}
}

pub async fn get<'a>(&self, name: &str) -> Result<Collection> {
pub async fn get(&self, name: &str) -> Result<Collection> {
if !self.local_exist(name) {
let resp = self
.client
.clone()
.describe_collection(DescribeCollectionRequest {
base: Some(MsgBase::new(MsgType::DescribeCollection)),
db_name: "".to_owned(),
db_name: self.db_name.clone(),
collection_name: name.into(),
collection_id: 0,
time_stamp: 0,
Expand Down Expand Up @@ -115,7 +120,7 @@ impl CollectionCache {
}

pub fn get_timestamp(&self, name: &str) -> Option<Timestamp> {
self.timestamps.get(name).map(|v| v.value().clone())
self.timestamps.get(name).map(|v| *v.value())
}

fn local_exist(&self, name: &str) -> bool {
Expand All @@ -134,7 +139,7 @@ impl From<proto::milvus::DescribeCollectionResponse> for Collection {
// num_partitions: value.partitions_num as usize,
consistency_level: ConsistencyLevel::from_i32(value.consistency_level).unwrap(),
description: schema.description,
fields: schema.fields.into_iter().map(|f| Field::from(f)).collect(),
fields: schema.fields.into_iter().map(FieldSchema::from).collect(),
// enable_dynamic_field: value.enable_dynamic_field,
}
}
Expand Down Expand Up @@ -182,8 +187,6 @@ impl From<GetCompactionStateResponse> for CompactionState {
}
}

type ConcurrentHashMap<K, V> = tokio::sync::RwLock<std::collections::HashMap<K, V>>;

impl Client {
/// Creates a new collection with the specified schema and options.
///
Expand Down Expand Up @@ -211,6 +214,7 @@ impl Client {
schema: buf.to_vec(),
shards_num: options.shard_num,
consistency_level: options.consistency_level as i32,
db_name: self.db_name.clone(),
..Default::default()
})
.await?
Expand Down Expand Up @@ -238,7 +242,7 @@ impl Client {
.drop_collection(DropCollectionRequest {
base: Some(MsgBase::new(MsgType::DropCollection)),
collection_name: name.into(),
..Default::default()
db_name: self.db_name.clone(),
})
.await?
.into_inner(),
Expand All @@ -256,6 +260,7 @@ impl Client {
.clone()
.show_collections(ShowCollectionsRequest {
base: Some(MsgBase::new(MsgType::ShowCollections)),
db_name: self.db_name.clone(),
..Default::default()
})
.await?
Expand Down Expand Up @@ -283,7 +288,7 @@ impl Client {
.clone()
.describe_collection(DescribeCollectionRequest {
base: Some(MsgBase::new(MsgType::DescribeCollection)),
db_name: "".to_owned(),
db_name: self.db_name.clone(),
collection_name: name.into(),
collection_id: 0,
time_stamp: 0,
Expand Down Expand Up @@ -315,7 +320,7 @@ impl Client {
.clone()
.has_collection(HasCollectionRequest {
base: Some(MsgBase::new(MsgType::HasCollection)),
db_name: "".to_string(),
db_name: self.db_name.clone(),
collection_name: name.clone(),
time_stamp: 0,
})
Expand Down Expand Up @@ -365,7 +370,7 @@ impl Client {
.clone()
.get_collection_statistics(proto::milvus::GetCollectionStatisticsRequest {
base: Some(MsgBase::new(MsgType::GetCollectionStatistics)),
db_name: "".into(),
db_name: self.db_name.clone(),
collection_name: name.to_owned(),
})
.await?
Expand Down Expand Up @@ -401,11 +406,12 @@ impl Client {
.clone()
.load_collection(LoadCollectionRequest {
base: Some(MsgBase::new(MsgType::LoadCollection)),
db_name: "".to_string(),
db_name: self.db_name.clone(),
collection_name: collection_name.clone(),
replica_number: options.replica_number,
resource_groups: vec![],
refresh: false,
..Default::default()
})
.await?
.into_inner(),
Expand Down Expand Up @@ -455,7 +461,7 @@ impl Client {
.clone()
.get_load_state(proto::milvus::GetLoadStateRequest {
base: Some(MsgBase::new(MsgType::Undefined)),
db_name: "".into(),
db_name: self.db_name.clone(),
collection_name: collection_name.into(),
partition_names: options.partition_names,
})
Expand Down Expand Up @@ -485,7 +491,7 @@ impl Client {
.clone()
.release_collection(ReleaseCollectionRequest {
base: Some(MsgBase::new(MsgType::ReleaseCollection)),
db_name: "".to_string(),
db_name: self.db_name.clone(),
collection_name: collection_name.into(),
})
.await?
Expand All @@ -502,7 +508,7 @@ impl Client {
.clone()
.flush(FlushRequest {
base: Some(MsgBase::new(MsgType::Flush)),
db_name: "".to_string(),
db_name: self.db_name.clone(),
collection_names: vec![collection_name.into()],
})
.await?
Expand All @@ -528,7 +534,7 @@ impl Client {
.clone()
.create_index(CreateIndexRequest {
base: Some(MsgBase::new(MsgType::CreateIndex)),
db_name: "".to_string(),
db_name: self.db_name.clone(),
collection_name: collection_name.into(),
field_name,
extra_params: index_params.extra_kv_params(),
Expand Down Expand Up @@ -593,7 +599,7 @@ impl Client {
.clone()
.describe_index(DescribeIndexRequest {
base: Some(MsgBase::new(MsgType::DescribeIndex)),
db_name: "".to_string(),
db_name: self.db_name.clone(),
collection_name: collection_name.into(),
field_name: field_name.into(),
index_name: "".to_string(),
Expand All @@ -615,7 +621,7 @@ impl Client {
.clone()
.drop_index(DropIndexRequest {
base: Some(MsgBase::new(MsgType::DropIndex)),
db_name: "".to_string(),
db_name: self.db_name.clone(),
collection_name: collection_name.into(),
field_name: field_name.into(),
index_name: "".to_string(),
Expand All @@ -637,6 +643,8 @@ impl Client {
.manual_compaction(ManualCompactionRequest {
collection_id: collection.id,
timetravel: 0,
db_name: self.db_name.clone(),
..Default::default()
})
.await?
.into_inner();
Expand Down
4 changes: 4 additions & 0 deletions src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,10 @@ impl FieldColumn {
self.value.len() / self.dim as usize
}

pub fn is_empty(&self) -> bool {
self.len() == 0
}

pub fn copy_with_metadata(&self) -> Self {
Self {
dim: self.dim,
Expand Down
Loading