1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

4 changes: 3 additions & 1 deletion docs/en/config.md
@@ -15,6 +15,8 @@ Different tasks may require extra configs, refer to [task templates](/docs/templ
| password | database connection password | password | - |
| max_connections | max connections for source database | 10 | currently 10, may be dynamically adjusted in the future |
| batch_size | number of extracted records in a batch | 10000 | same as [pipeline] buffer_size |
| parallel_size | number of parallel workers used to extract a single table | 4 | 1 |
| partition_cols | partition column used to split data during snapshot migration; only a single column is supported | json:[{"db":"db_1","tb":"tb_1","partition_col":"id"},{"db":"db_2","tb":"tb_2","partition_col":"id"}] | - |
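A hypothetical snapshot-task `[extractor]` section combining the two new options is sketched below; every key and value is illustrative rather than taken from this PR:

```ini
[extractor]
db_type=mysql
extract_type=snapshot
url=mysql://user1:password1@127.0.0.1:3307
batch_size=10000
# four workers extract each table in parallel
parallel_size=4
# single-column range split per table; the value must start with "json:"
partition_cols=json:[{"db":"db_1","tb":"tb_1","partition_col":"id"}]
```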

## URL escaping

@@ -59,7 +61,7 @@ url=mysql://user1:abc%25%24%23%3F%40@127.0.0.1:3307?ssl-mode=disabled
- All configurations support multiple items, which are separated by ",". Example: do_dbs=db_1,db_2.
- Set to \* to match all. Example: do_dbs=\*.
- Keep empty to match nothing. Example: ignore_dbs=.
- `ignore_cols` and `where_conditions` are in JSON format; the value must start with "json:".
- do_events takes one or more values from **insert**, **update**, and **delete**.

## Priority
3 changes: 2 additions & 1 deletion docs/zh/config.md
@@ -15,7 +15,8 @@
| password | 数据库连接密码 | password | - |
| max_connections | 最大连接数 | 10 | 目前是 10,未来可能会动态适配 |
| batch_size | 批量拉取数据条数 | 10000 | 和 [pipeline] buffer_size 一致 |
| parallel_size | 全量同步时,单表并行拉取任务数 | 4 | 1 |
| partition_cols | 全量同步时,指定分区列,用于数据切分,仅支持单列 | json:[{"db":"db_1","tb":"tb_1","partition_col":"id"},{"db":"db_2","tb":"tb_2","partition_col":"id"}] | - |

## url 转义

- 如果用户名/密码中包含特殊字符,需要对相应部分进行通用的 url 百分号转义,如:
3 changes: 3 additions & 0 deletions dt-common/src/config/extractor_config.rs
@@ -32,6 +32,7 @@ pub enum ExtractorConfig {
sample_interval: usize,
parallel_size: usize,
batch_size: usize,
partition_cols: String,
},

MysqlCdc {
@@ -65,7 +66,9 @@ pub enum ExtractorConfig {
schema: String,
tb: String,
sample_interval: usize,
parallel_size: usize,
batch_size: usize,
partition_cols: String,
},

PgCdc {
4 changes: 4 additions & 0 deletions dt-common/src/config/task_config.rs
@@ -88,6 +88,7 @@ const URL: &str = "url";
const BATCH_SIZE: &str = "batch_size";
const MAX_CONNECTIONS: &str = "max_connections";
const SAMPLE_INTERVAL: &str = "sample_interval";
const PARTITION_COLS: &str = "partition_cols";
const HEARTBEAT_INTERVAL_SECS: &str = "heartbeat_interval_secs";
const KEEPALIVE_INTERVAL_SECS: &str = "keepalive_interval_secs";
const HEARTBEAT_TB: &str = "heartbeat_tb";
@@ -200,6 +201,7 @@ impl TaskConfig {
sample_interval: loader.get_with_default(EXTRACTOR, SAMPLE_INTERVAL, 1),
parallel_size: loader.get_with_default(EXTRACTOR, PARALLEL_SIZE, 1),
batch_size,
partition_cols: loader.get_optional(EXTRACTOR, PARTITION_COLS),
},

ExtractType::Cdc => ExtractorConfig::MysqlCdc {
@@ -284,7 +286,9 @@
schema: String::new(),
tb: String::new(),
sample_interval: loader.get_with_default(EXTRACTOR, SAMPLE_INTERVAL, 1),
parallel_size: loader.get_with_default(EXTRACTOR, PARALLEL_SIZE, 1),
batch_size,
partition_cols: loader.get_optional(EXTRACTOR, PARTITION_COLS),
},

ExtractType::Cdc => ExtractorConfig::PgCdc {
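The `partition_cols` value is carried through the config as a raw string. Below is a minimal sketch of how such a `json:`-prefixed value could be decoded with serde; the struct and helper names are hypothetical, not the PR's actual types:

```rust
use serde::Deserialize;

// Hypothetical shape of one partition_cols entry; field names mirror the
// documented JSON: {"db": "...", "tb": "...", "partition_col": "..."}.
#[derive(Debug, Deserialize)]
struct PartitionColEntry {
    db: String,
    tb: String,
    partition_col: String,
}

// Strip the "json:" prefix required by the docs, then parse a JSON array.
fn parse_partition_cols(raw: &str) -> anyhow::Result<Vec<PartitionColEntry>> {
    let json = raw.trim_start_matches("json:");
    Ok(serde_json::from_str(json)?)
}
```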
153 changes: 153 additions & 0 deletions dt-common/src/meta/col_value.rs
@@ -1,8 +1,10 @@
use std::{
cmp,
collections::hash_map::DefaultHasher,
hash::{Hash, Hasher},
};

use anyhow::bail;
use mongodb::bson::{Bson, Document};
use serde::{Deserialize, Serialize, Serializer};

@@ -54,6 +56,103 @@ impl std::fmt::Display for ColValue {
}

impl ColValue {
pub fn is_integer(&self) -> bool {
matches!(
self,
Self::Tiny { .. }
| Self::UnsignedTiny { .. }
| Self::Short { .. }
| Self::UnsignedShort { .. }
| Self::Long { .. }
| Self::UnsignedLong { .. }
| Self::LongLong { .. }
| Self::UnsignedLongLong { .. }
)
}

pub fn is_float(&self) -> bool {
matches!(self, Self::Float { .. } | Self::Double { .. })
}

pub fn is_decimal(&self) -> bool {
matches!(self, Self::Decimal { .. })
}

pub fn is_string(&self) -> bool {
matches!(self, Self::String { .. })
}

pub fn convert_into_integer_128(&self) -> anyhow::Result<i128> {
match self {
Self::Tiny(v) => Ok(*v as i128),
Self::UnsignedTiny(v) => Ok(*v as i128),
Self::Short(v) => Ok(*v as i128),
Self::UnsignedShort(v) => Ok(*v as i128),
Self::Long(v) => Ok(*v as i128),
Self::UnsignedLong(v) => Ok(*v as i128),
Self::LongLong(v) => Ok(*v as i128),
Self::UnsignedLongLong(v) => Ok(*v as i128),
_ => bail!("cannot convert {:?} into a 128-bit integer", self),
}
}

pub fn add_integer_128(&self, t: i128) -> anyhow::Result<Self> {
match self {
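// NOTE: unsigned variants are clamped at the corresponding signed max
// (e.g. UnsignedTiny at i8::MAX), matching the tests below.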
Self::Tiny(v) => Ok(Self::Tiny(cmp::min(*v as i128 + t, i8::MAX as i128) as i8)),
Self::UnsignedTiny(v) => Ok(Self::UnsignedTiny(
cmp::min(*v as i128 + t, i8::MAX as i128) as u8,
)),
Self::Short(v) => Ok(Self::Short(
cmp::min(*v as i128 + t, i16::MAX as i128) as i16
)),
Self::UnsignedShort(v) => Ok(Self::UnsignedShort(cmp::min(
*v as i128 + t,
i16::MAX as i128,
) as u16)),
Self::Long(v) => Ok(Self::Long(cmp::min(*v as i128 + t, i32::MAX as i128) as i32)),
Self::UnsignedLong(v) => Ok(Self::UnsignedLong(cmp::min(
*v as i128 + t,
i32::MAX as i128,
) as u32)),
Self::LongLong(v) => Ok(Self::LongLong(
cmp::min(*v as i128 + t, i64::MAX as i128) as i64
)),
Self::UnsignedLongLong(v) => Ok(Self::UnsignedLongLong(cmp::min(
*v as i128 + t,
i64::MAX as i128,
) as u64)),
_ => bail!("cannot add a 128-bit integer to {}", self),
}
}

pub fn convert_into_float_64(&self) -> anyhow::Result<f64> {
match self {
Self::Float(v) => Ok(*v as f64),
Self::Double(v) => Ok(*v),
_ => bail!("cannot convert {:?} into double", self),
}
}

pub fn is_same_value(&self, other: &ColValue) -> bool {
match (self, other) {
(ColValue::Float(v1), ColValue::Float(v2)) => {
if v1.is_nan() && v2.is_nan() {
true
} else {
v1 == v2
}
}
(ColValue::Double(v1), ColValue::Double(v2)) => {
if v1.is_nan() && v2.is_nan() {
true
} else {
v1 == v2
}
}
_ => self == other,
}
}

pub fn hash_code(&self) -> u64 {
match self {
ColValue::None => 0,
@@ -284,3 +383,57 @@ impl From<Bson> for ColValue {
}
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_is_same_value() {
let v1 = ColValue::Float(f32::NAN);
let v2 = ColValue::Double(f64::NAN);
let v3 = ColValue::None;
let v4 = ColValue::Long(7);

assert!(v1.is_same_value(&ColValue::Float(f32::NAN)));
assert!(v2.is_same_value(&ColValue::Double(f64::NAN)));
assert!(v3.is_same_value(&ColValue::None));
assert!(v4.is_same_value(&ColValue::Long(7)));
}

#[test]
fn test_add_integer_128() {
let cases = vec![
(ColValue::Tiny(10), 20, Some(ColValue::Tiny(30))),
(ColValue::Short(1000), 2000, Some(ColValue::Short(3000))),
(ColValue::Long(50), -20, Some(ColValue::Long(30))),
(ColValue::Tiny(100), 50, Some(ColValue::Tiny(127))),
// i64::MAX boundary check
(
ColValue::LongLong(i64::MAX - 5),
10,
Some(ColValue::LongLong(i64::MAX)),
),
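// unsigned values are clamped at the signed max too (i8::MAX = 127 here)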
(
ColValue::UnsignedTiny(100),
50,
Some(ColValue::UnsignedTiny(127)),
),
// --- Error Case ---
(ColValue::String("test".into()), 1, None),
];

for (index, (input, delta, expected)) in cases.into_iter().enumerate() {
let result = input.add_integer_128(delta);

match expected {
Some(exp_val) => {
assert_eq!(result.unwrap(), exp_val, "Failed at case #{}", index);
}
None => {
assert!(result.is_err(), "Case #{} should fail", index);
}
}
}
}
}
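Taken together, `convert_into_integer_128` and `add_integer_128` provide the arithmetic needed for range splitting. A rough sketch of how they could derive per-worker upper bounds, assuming `ColValue: Clone`; this is illustrative, not the extractor's actual code:

```rust
// Split the observed [min, max] range of a numeric partition column into
// `parallel_size` upper bounds, one per extraction worker.
fn partition_bounds(
    min: &ColValue,
    max: &ColValue,
    parallel_size: usize,
) -> anyhow::Result<Vec<ColValue>> {
    let lo = min.convert_into_integer_128()?;
    let hi = max.convert_into_integer_128()?;
    // step of at least 1 so the loop always advances
    let step = ((hi - lo) / parallel_size.max(1) as i128).max(1);
    let mut bounds = Vec::with_capacity(parallel_size);
    let mut cur = min.clone();
    for _ in 0..parallel_size {
        // add_integer_128 clamps at the column type's max, so the last
        // bound saturates instead of overflowing
        cur = cur.add_integer_128(step)?;
        bounds.push(cur.clone());
    }
    Ok(bounds)
}
```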
47 changes: 47 additions & 0 deletions dt-common/src/meta/mysql/mysql_col_type.rs
@@ -93,6 +93,17 @@ pub enum MysqlColType {
}

impl MysqlColType {
pub fn is_integer(&self) -> bool {
matches!(
self,
Self::TinyInt { .. }
| Self::SmallInt { .. }
| Self::MediumInt { .. }
| Self::Int { .. }
| Self::BigInt { .. }
)
}

pub fn is_string(&self) -> bool {
matches!(
self,
@@ -104,4 +115,40 @@ impl MysqlColType {
| Self::LongText { .. }
)
}

pub fn can_be_splitted(&self) -> bool {
// Whether the type can be used in `max`/`min` aggregate operations and `order by` comparisons.
// Enum/Set comparisons differ between `max`/`min` and `order by`, so those types are excluded here.
// Compatible with MySQL 5.7+. Reference: https://dev.mysql.com/doc/refman/5.7/en/aggregate-functions.html#function_max.
matches!(
self,
MysqlColType::TinyInt { .. }
| MysqlColType::SmallInt { .. }
| MysqlColType::MediumInt { .. }
| MysqlColType::Int { .. }
| MysqlColType::BigInt { .. }
| MysqlColType::Float
| MysqlColType::Double
| MysqlColType::Decimal { .. }
| MysqlColType::Date { .. }
| MysqlColType::DateTime { .. }
| MysqlColType::Time { .. }
| MysqlColType::Timestamp { .. }
| MysqlColType::Year
| MysqlColType::Char { .. }
| MysqlColType::Varchar { .. }
| MysqlColType::TinyText { .. }
| MysqlColType::MediumText { .. }
| MysqlColType::Text { .. }
| MysqlColType::LongText { .. }
| MysqlColType::Binary { .. }
| MysqlColType::VarBinary { .. }
| MysqlColType::TinyBlob
| MysqlColType::MediumBlob
| MysqlColType::Blob
| MysqlColType::LongBlob
| MysqlColType::Bit
| MysqlColType::Json
)
}
}
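A type passing `can_be_splitted` can safely feed a boundary probe like the hedged sketch below (identifiers are examples; this is not the PR's actual SQL):

```rust
// MIN/MAX and ORDER BY must agree on ordering for range splitting to be
// correct, which is the property can_be_splitted guards. Enum/Set fail it:
// MAX()/MIN() compare their string values while ORDER BY uses index positions.
fn boundary_query(db: &str, tb: &str, partition_col: &str) -> String {
    format!("SELECT MIN(`{partition_col}`), MAX(`{partition_col}`) FROM `{db}`.`{tb}`")
}
```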
1 change: 0 additions & 1 deletion dt-common/src/meta/mysql/mysql_meta_fetcher.rs
@@ -100,7 +100,6 @@ impl MysqlMetaFetcher {
let basic = RdbTbMeta {
schema: schema.to_string(),
tb: tb.to_string(),
order_cols_are_nullable: order_cols.iter().any(|col| nullable_cols.contains(col)),
cols,
nullable_cols,
col_origin_type_map,
3 changes: 1 addition & 2 deletions dt-common/src/meta/order_key.rs
@@ -2,10 +2,9 @@ use serde::{Deserialize, Serialize};
use serde_json::json;

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum OrderKey {
#[serde(rename = "single")]
Single((String, Option<String>)),
#[serde(rename = "composite")]
Composite(Vec<(String, Option<String>)>),
}

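The swap to `rename_all` should be wire-compatible with the removed per-variant renames; a quick hedged check, assuming serde's default externally tagged encoding:

```rust
use dt_common::meta::order_key::OrderKey;

fn main() {
    // With rename_all = "lowercase", Single still serializes under the old
    // "single" tag that #[serde(rename = "single")] used to produce.
    let key = OrderKey::Single(("id".to_string(), Some("42".to_string())));
    assert_eq!(
        serde_json::to_string(&key).unwrap(),
        r#"{"single":["id","42"]}"#
    );
}
```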
8 changes: 8 additions & 0 deletions dt-common/src/meta/pg/pg_col_type.rs
@@ -33,4 +33,12 @@ impl PgColType {
pub fn is_user_defined(&self) -> bool {
"U" == self.category
}

pub fn is_integer(&self) -> bool {
self.value_type.is_integer()
}

pub fn can_be_splitted(&self) -> bool {
self.value_type.can_be_splitted()
}
}
1 change: 0 additions & 1 deletion dt-common/src/meta/pg/pg_meta_manager.rs
@@ -92,7 +92,6 @@ impl PgMetaManager {
let basic = RdbTbMeta {
schema: schema.to_string(),
tb: tb.to_string(),
order_cols_are_nullable: order_cols.iter().any(|col| nullable_cols.contains(col)),
cols,
nullable_cols,
col_origin_type_map,
27 changes: 27 additions & 0 deletions dt-common/src/meta/pg/pg_value_type.rs
@@ -227,4 +227,31 @@ impl PgValueType {
_ => PgValueType::String,
}
}

pub fn is_integer(&self) -> bool {
matches!(
self,
Self::Int16 { .. } | Self::Int32 { .. } | Self::Int64 { .. }
)
}

pub fn can_be_splitted(&self) -> bool {
// Whether the type can be used in `max`/`min` aggregate operations and `order by` comparisons.
// Compatible with PostgreSQL 14+. Reference: https://www.postgresql.org/docs/14/functions-aggregate.html
matches!(
self,
PgValueType::Int32
| PgValueType::Int16
| PgValueType::Int64
| PgValueType::Float32
| PgValueType::Float64
| PgValueType::Numeric
| PgValueType::TimestampTZ
| PgValueType::Timestamp
| PgValueType::Time
| PgValueType::TimeTZ
| PgValueType::Date
| PgValueType::String
)
}
}
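On the PostgreSQL side the same probe would use double-quoted identifiers and a schema qualifier, matching the `schema`/`tb` fields in the Pg extractor config; again a hypothetical mirror, not the PR's SQL:

```rust
fn pg_boundary_query(schema: &str, tb: &str, partition_col: &str) -> String {
    // Double quotes preserve case-sensitive PG identifiers.
    format!(
        r#"SELECT MIN("{partition_col}"), MAX("{partition_col}") FROM "{schema}"."{tb}""#
    )
}
```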