Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ fs2 = "0.4"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "00e1d312ecc4b11ffb3c48161d61b03ed0e848a5" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "5e358318890606a2961e12dcdf9674f6763cec3a" }
hex = "0.4"
http = "1"
humantime = "2.1"
Expand Down
45 changes: 45 additions & 0 deletions src/common/meta/src/ddl/create_table/template.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use api::v1::{ColumnDef, CreateTableExpr, SemanticType};
use common_telemetry::warn;
use snafu::{OptionExt, ResultExt};
use store_api::metric_engine_consts::LOGICAL_TABLE_METADATA_KEY;
use store_api::region_request::RegionRequirements;
use store_api::storage::{RegionId, RegionNumber};
use table::metadata::{TableId, TableInfo};

Expand Down Expand Up @@ -70,6 +71,7 @@ pub fn build_template_from_raw_table_info(table_info: &TableInfo) -> Result<Crea
path: String::new(),
options,
partition: None,
requirements: None,
};

Ok(template)
Expand Down Expand Up @@ -126,6 +128,7 @@ pub fn build_template_from_raw_table_info_for_physical_table(
path: String::new(),
options,
partition: None,
requirements: None,
};

Ok(template)
Expand Down Expand Up @@ -188,6 +191,7 @@ pub(crate) fn build_template(create_table_expr: &CreateTableExpr) -> Result<Crea
path: String::new(),
options: create_table_expr.table_options.clone(),
partition: None,
requirements: None,
};

Ok(template)
Expand All @@ -198,20 +202,27 @@ pub struct CreateRequestBuilder {
template: CreateRequest,
/// Optional. Only for metric engine.
physical_table_id: Option<TableId>,
requirements: RegionRequirements,
}

impl CreateRequestBuilder {
pub fn new(template: CreateRequest, physical_table_id: Option<TableId>) -> Self {
Self {
template,
physical_table_id,
requirements: RegionRequirements::empty(),
}
}

pub fn template(&self) -> &CreateRequest {
&self.template
}

pub fn with_requirements(mut self, requirements: RegionRequirements) -> Self {
self.requirements = requirements;
self
}

pub fn build_one(
&self,
region_id: RegionId,
Expand All @@ -223,6 +234,7 @@ impl CreateRequestBuilder {

request.region_id = region_id.as_u64();
request.path = storage_path;
request.requirements = Some(self.requirements.into());
// Stores the encoded wal options into the request options.
prepare_wal_options(&mut request.options, region_id, region_wal_options);
request.partition = Some(prepare_partition_expr(region_id, partition_exprs));
Expand Down Expand Up @@ -278,6 +290,7 @@ mod tests {
path: String::new(),
options: Default::default(),
partition: None,
requirements: None,
};
let builder = CreateRequestBuilder::new(template, None);

Expand All @@ -293,6 +306,38 @@ mod tests {
&partition_exprs,
);
assert_eq!(r0.partition.as_ref().unwrap().expression, expr_a);
assert_eq!(
r0.requirements.map(RegionRequirements::from),
Some(RegionRequirements::empty())
);
}

#[test]
fn test_build_one_sets_explicit_requirements() {
let template = CreateRequest {
region_id: 0,
engine: "mito".to_string(),
column_defs: vec![],
primary_key: vec![],
path: String::new(),
options: Default::default(),
partition: None,
requirements: None,
};
let builder = CreateRequestBuilder::new(template, None)
.with_requirements(RegionRequirements::object_storage());

let request = builder.build_one(
RegionId::new(42, 0),
"/p".to_string(),
&Default::default(),
&Default::default(),
);

assert_eq!(
request.requirements.map(RegionRequirements::from),
Some(RegionRequirements::object_storage())
);
}

#[test]
Expand Down
18 changes: 10 additions & 8 deletions src/file-engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ struct EngineInner {

type EngineInnerRef = Arc<EngineInner>;

fn ensure_open_requirements(
fn ensure_region_requirements(
requirements: RegionRequirements,
object_store: &ObjectStore,
) -> EngineResult<()> {
Expand Down Expand Up @@ -276,6 +276,8 @@ impl EngineInner {
return Ok(0);
}

ensure_region_requirements(request.requirements, &self.object_store)?;

let res = FileRegion::create(region_id, request, &self.object_store).await;
let region = res.inspect_err(|err| {
error!(
Expand Down Expand Up @@ -307,7 +309,7 @@ impl EngineInner {
return Ok(0);
}

ensure_open_requirements(request.requirements, &self.object_store)?;
ensure_region_requirements(request.requirements, &self.object_store)?;

let res = FileRegion::open(region_id, request, &self.object_store).await;
let region = res.inspect_err(|err| {
Expand Down Expand Up @@ -402,13 +404,13 @@ mod tests {
}

#[test]
fn test_empty_open_requirements_are_supported() {
ensure_open_requirements(RegionRequirements::empty(), &build_fs_object_store()).unwrap();
fn test_empty_region_requirements_are_supported() {
ensure_region_requirements(RegionRequirements::empty(), &build_fs_object_store()).unwrap();
}

#[test]
fn test_object_storage_open_requirement_rejects_fs_object_store() {
let err = ensure_open_requirements(
fn test_object_storage_region_requirement_rejects_fs_object_store() {
let err = ensure_region_requirements(
RegionRequirements::object_storage(),
&build_fs_object_store(),
)
Expand All @@ -418,8 +420,8 @@ mod tests {
}

#[test]
fn test_object_storage_open_requirement_accepts_s3_object_store() {
ensure_open_requirements(
fn test_object_storage_region_requirement_accepts_s3_object_store() {
ensure_region_requirements(
RegionRequirements::object_storage(),
&build_s3_object_store(),
)
Expand Down
3 changes: 3 additions & 0 deletions src/file-engine/src/region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ mod tests {
table_dir: "create_region_dir/".to_string(),
path_type: PathType::Bare,
partition_expr_json: Some("".to_string()),
requirements: Default::default(),
};
let region_id = RegionId::new(1, 0);

Expand Down Expand Up @@ -167,6 +168,7 @@ mod tests {
table_dir: region_dir.clone(),
path_type: PathType::Bare,
partition_expr_json: Some("".to_string()),
requirements: Default::default(),
};
let region_id = RegionId::new(1, 0);

Expand Down Expand Up @@ -210,6 +212,7 @@ mod tests {
table_dir: region_dir.clone(),
path_type: PathType::Bare,
partition_expr_json: Some("".to_string()),
requirements: Default::default(),
};
let region_id = RegionId::new(1, 0);

Expand Down
17 changes: 14 additions & 3 deletions src/meta-srv/src/procedure/repartition/allocate_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::{debug, info};
use serde::{Deserialize, Deserializer, Serialize};
use snafu::{OptionExt, ResultExt};
use store_api::region_request::RegionRequirements;
use store_api::storage::{RegionId, RegionNumber, TableId};
use table::metadata::TableInfo;
use table::table_reference::TableReference;
Expand Down Expand Up @@ -382,7 +383,8 @@ impl AllocateRegion {
table_id,
request
);
let builder = CreateRequestBuilder::new(request, None);
let builder = CreateRequestBuilder::new(request, None)
.with_requirements(RegionRequirements::object_storage());
let region_count = region_routes.len();
let wal_region_count = wal_options.len();
info!(
Expand All @@ -404,6 +406,8 @@ mod tests {
use std::collections::HashMap;
use std::sync::Arc;

use api::v1::region::region_request::Body;
use common_meta::ddl::test_util::datanode_handler::DatanodeWatcher;
use common_meta::key::TableMetadataManagerRef;
use common_meta::key::datanode_table::DatanodeTableKey;
use common_meta::peer::Peer;
Expand All @@ -412,7 +416,7 @@ mod tests {
use common_procedure::{ContextProvider, ProcedureId, ProcedureState};
use common_procedure_test::MockContextProvider;
use store_api::storage::RegionId;
use tokio::sync::watch;
use tokio::sync::{mpsc, watch};
use uuid::Uuid;

use super::*;
Expand Down Expand Up @@ -741,7 +745,8 @@ mod tests {
)
.await;

let node_manager = Arc::new(MockDatanodeManager::new(()));
let (sender, mut receiver) = mpsc::channel(1);
let node_manager = Arc::new(MockDatanodeManager::new(DatanodeWatcher::new(sender)));
let mut ctx = new_parent_context(&env, node_manager, table_id);
ctx.persistent_ctx.plans = vec![RepartitionPlanEntry {
group_id: Uuid::new_v4(),
Expand Down Expand Up @@ -770,6 +775,12 @@ mod tests {

state.next(&mut ctx, &procedure_ctx).await.unwrap();

let (_, request) = receiver.recv().await.unwrap();
let Some(Body::Create(create)) = request.body else {
unreachable!()
};
assert!(create.requirements.unwrap().object_storage);

let region_ids = current_parent_region_routes(&ctx)
.await
.into_iter()
Expand Down
1 change: 1 addition & 0 deletions src/meta-srv/src/procedure/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ fn test_region_request_builder() {
path: String::new(),
options: HashMap::new(),
partition: None,
requirements: None,
};
assert_eq!(template.template(), &expected);
}
Expand Down
17 changes: 16 additions & 1 deletion src/metric-engine/src/engine/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,7 @@ impl MetricEngineInner {
table_dir: request.table_dir.clone(),
path_type: PathType::Metadata,
partition_expr_json: Some("".to_string()),
requirements: request.requirements,
}
}

Expand Down Expand Up @@ -654,7 +655,7 @@ mod test {
use common_meta::ddl::utils::{parse_column_metadatas, parse_manifest_infos_from_extensions};
use common_query::prelude::{greptime_timestamp, greptime_value};
use store_api::metric_engine_consts::{METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY};
use store_api::region_request::BatchRegionDdlRequest;
use store_api::region_request::{BatchRegionDdlRequest, RegionRequirements};

use super::*;
use crate::config::EngineConfig;
Expand Down Expand Up @@ -699,6 +700,7 @@ mod test {
primary_key: vec![],
options: HashMap::new(),
partition_expr_json: Some("".to_string()),
requirements: RegionRequirements::object_storage(),
};
let result = MetricEngineInner::verify_region_create_request(&request);
assert!(result.is_err());
Expand Down Expand Up @@ -748,6 +750,7 @@ mod test {
.into_iter()
.collect(),
partition_expr_json: Some("".to_string()),
requirements: Default::default(),
};
MetricEngineInner::verify_region_create_request(&request).unwrap();

Expand Down Expand Up @@ -790,6 +793,7 @@ mod test {
.into_iter()
.collect(),
partition_expr_json: Some("".to_string()),
requirements: Default::default(),
};
MetricEngineInner::verify_region_create_request(&request).unwrap();
}
Expand Down Expand Up @@ -823,6 +827,7 @@ mod test {
primary_key: vec![],
options: HashMap::new(),
partition_expr_json: Some("".to_string()),
requirements: Default::default(),
};
MetricEngineInner::verify_region_create_request(&request).unwrap_err();

Expand Down Expand Up @@ -876,6 +881,7 @@ mod test {
table_dir: "/test_dir".to_string(),
path_type: PathType::Bare,
partition_expr_json: Some("".to_string()),
requirements: RegionRequirements::object_storage(),
};

// set up
Expand All @@ -893,6 +899,10 @@ mod test {
vec![ReservedColumnId::table_id(), ReservedColumnId::tsid(), 1]
);
assert!(data_region_request.options.contains_key("ttl"));
assert_eq!(
data_region_request.requirements,
RegionRequirements::object_storage()
);

// check create metadata region request
let metadata_region_request = engine_inner.create_request_for_metadata_region(&request);
Expand All @@ -903,6 +913,10 @@ mod test {
"forever"
);
assert!(!metadata_region_request.options.contains_key("skip_wal"));
assert_eq!(
metadata_region_request.requirements,
RegionRequirements::object_storage()
);
}

#[tokio::test]
Expand Down Expand Up @@ -951,6 +965,7 @@ mod test {
table_dir: "/test_dir".to_string(),
path_type: PathType::Bare,
partition_expr_json: Some("".to_string()),
requirements: Default::default(),
};

let env = TestEnv::new().await;
Expand Down
3 changes: 3 additions & 0 deletions src/metric-engine/src/engine/create/extract_new_columns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ mod tests {
table_dir: "test".to_string(),
path_type: PathType::Bare,
partition_expr_json: Some("".to_string()),
requirements: Default::default(),
},
),
(
Expand All @@ -118,6 +119,7 @@ mod tests {
table_dir: "test".to_string(),
path_type: PathType::Bare,
partition_expr_json: Some("".to_string()),
requirements: Default::default(),
},
),
];
Expand Down Expand Up @@ -171,6 +173,7 @@ mod tests {
table_dir: "test".to_string(),
path_type: PathType::Bare,
partition_expr_json: Some("".to_string()),
requirements: Default::default(),
},
)];

Expand Down
Loading
Loading