Skip to content

Commit 9d7a52e

Browse files
authored
feat: extract multi-pk/uk tables by batch (#436)
feat: extract multi pk/uk tables by batch
1 parent ad22de2 commit 9d7a52e

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

67 files changed

+1636
-418
lines changed

dt-common/src/meta/adaptor/mysql_col_value_convertor.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -324,8 +324,8 @@ impl MysqlColValueConvertor {
324324
| MysqlColType::TinyBlob
325325
| MysqlColType::MediumBlob
326326
| MysqlColType::Blob
327-
| MysqlColType::LongBlob
328-
| MysqlColType::Unknown => {
327+
| MysqlColType::LongBlob => ColValue::Blob(hex::decode(value_str)?),
328+
MysqlColType::Unknown => {
329329
bail! {Error::Unexpected(format!(
330330
"unsupported column type: {:?}",
331331
col_type

dt-common/src/meta/adaptor/pg_col_value_convertor.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,10 @@ impl PgColValueConvertor {
3131
value_str: &str,
3232
meta_manager: &mut PgMetaManager,
3333
) -> anyhow::Result<ColValue> {
34+
if value_str.is_empty() {
35+
return Ok(ColValue::None);
36+
}
37+
3438
if col_type.parent_oid != 0 {
3539
let parent_col_type = meta_manager.get_col_type_by_oid(col_type.parent_oid)?;
3640
return Self::from_str(&parent_col_type, value_str, meta_manager);
@@ -75,7 +79,7 @@ impl PgColValueConvertor {
7579
let bytes = hex::decode(value_str.trim_start_matches(r#"\x"#))?;
7680
ColValue::Blob(bytes)
7781
} else {
78-
ColValue::String(value_str)
82+
ColValue::Blob(hex::decode(value_str)?)
7983
}
8084
}
8185

dt-common/src/meta/avro/avro_converter.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ impl AvroConverter {
5757
pub async fn row_data_to_avro_key(&mut self, row_data: &RowData) -> anyhow::Result<String> {
5858
if let Some(tb_meta) = self.get_tb_meta(row_data).await? {
5959
let convert = |col_values: &HashMap<String, ColValue>| {
60-
if let Some(col) = &tb_meta.order_col {
60+
if let Some(col) = tb_meta.order_cols.first() {
6161
if let Some(value) = col_values.get(col) {
6262
return value.to_option_string();
6363
}

dt-common/src/meta/col_value.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,6 @@ use std::{
66
use mongodb::bson::Document;
77
use serde::{Deserialize, Serialize, Serializer};
88

9-
use crate::utils::sql_util::SqlUtil;
10-
119
// #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
1210
// #[serde(tag = "type", content = "value")]
1311
#[derive(Debug, Clone, PartialEq, Deserialize)]
@@ -86,7 +84,7 @@ impl ColValue {
8684
ColValue::Timestamp(v) => Some(v.to_string()),
8785
ColValue::Year(v) => Some(v.to_string()),
8886
ColValue::String(v) => Some(v.to_string()),
89-
ColValue::RawString(v) => Some(SqlUtil::binary_to_str(v).0),
87+
ColValue::RawString(v) => Some(hex::encode(v)),
9088
ColValue::Bit(v) => Some(v.to_string()),
9189
ColValue::Set(v) => Some(v.to_string()),
9290
ColValue::Set2(v) => Some(v.to_string()),
@@ -95,7 +93,7 @@ impl ColValue {
9593
ColValue::Json(v) => Some(format!("{:?}", v)),
9694
ColValue::Json2(v) => Some(v.to_string()),
9795
ColValue::Json3(v) => Some(v.to_string()),
98-
ColValue::Blob(v) => Some(SqlUtil::binary_to_str(v).0),
96+
ColValue::Blob(v) => Some(hex::encode(v)),
9997
ColValue::MongoDoc(v) => Some(v.to_string()),
10098
ColValue::Bool(v) => Some(v.to_string()),
10199
ColValue::None => Option::None,

dt-common/src/meta/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ pub mod foxlake;
1212
pub mod kafka;
1313
pub mod mongo;
1414
pub mod mysql;
15+
pub mod order_key;
1516
pub mod pg;
1617
pub mod position;
1718
pub mod rdb_meta_manager;

dt-common/src/meta/mysql/mysql_dbengine_meta_center.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ impl MysqlDbEngineMetaCenter {
6060
let query = sqlx::query(&ddl_data.query);
6161
if let Err(error) = query.execute(&conn_pool).await {
6262
if self.ddl_conflict_policy == ConflictPolicyEnum::Ignore {
63-
log_error!("failed to sync dll to meta_center: {}", error);
63+
log_error!("failed to sync ddl to meta_center: {}", error);
6464
} else {
6565
conn_pool.close().await;
6666
bail!(error);

dt-common/src/meta/mysql/mysql_meta_fetcher.rs

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
1-
use std::collections::HashMap;
1+
use std::collections::{HashMap, HashSet};
22

3-
use crate::{config::config_enums::DbType, error::Error, meta::ddl_meta::ddl_data::DdlData};
3+
use crate::{
4+
config::config_enums::DbType,
5+
error::Error,
6+
meta::{ddl_meta::ddl_data::DdlData, rdb_meta_manager::RDB_PRIMARY_KEY_FLAG},
7+
};
48
use anyhow::{bail, Ok};
59
use futures::TryStreamExt;
610

@@ -82,11 +86,11 @@ impl MysqlMetaFetcher {
8286
) -> anyhow::Result<&'a MysqlTbMeta> {
8387
let full_name = format!("{}.{}", schema, tb);
8488
if !self.cache.contains_key(&full_name) {
85-
let (cols, col_origin_type_map, col_type_map) =
89+
let (cols, col_origin_type_map, col_type_map, nullable_cols) =
8690
Self::parse_cols(&self.conn_pool, &self.db_type, schema, tb).await?;
8791
let key_map = Self::parse_keys(&self.conn_pool, schema, tb).await?;
88-
let (order_col, partition_col, id_cols) =
89-
RdbMetaManager::parse_rdb_cols(&key_map, &cols)?;
92+
let (order_cols, partition_col, id_cols) =
93+
RdbMetaManager::parse_rdb_cols(&key_map, &cols, &nullable_cols)?;
9094
// disable get_foreign_keys since we don't support foreign key check,
9195
// also querying them is very slow, which may cause terrible performance issue if there were many tables in a CDC task.
9296
let (foreign_keys, ref_by_foreign_keys) = (vec![], vec![]);
@@ -96,10 +100,12 @@ impl MysqlMetaFetcher {
96100
let basic = RdbTbMeta {
97101
schema: schema.to_string(),
98102
tb: tb.to_string(),
103+
order_cols_are_nullable: order_cols.iter().any(|col| nullable_cols.contains(col)),
99104
cols,
105+
nullable_cols,
100106
col_origin_type_map,
101107
key_map,
102-
order_col,
108+
order_cols,
103109
partition_col,
104110
id_cols,
105111
foreign_keys,
@@ -123,10 +129,12 @@ impl MysqlMetaFetcher {
123129
Vec<String>,
124130
HashMap<String, String>,
125131
HashMap<String, MysqlColType>,
132+
HashSet<String>,
126133
)> {
127134
let mut cols = Vec::new();
128135
let mut col_origin_type_map = HashMap::new();
129136
let mut col_type_map = HashMap::new();
137+
let mut nullable_cols = HashSet::new();
130138

131139
let sql = if matches!(db_type, DbType::Mysql) {
132140
"SELECT * FROM information_schema.columns
@@ -153,7 +161,12 @@ impl MysqlMetaFetcher {
153161
cols.push(col.clone());
154162
let (origin_type, col_type) = Self::get_col_type(&row).await?;
155163
col_origin_type_map.insert(col.clone(), origin_type);
156-
col_type_map.insert(col, col_type);
164+
col_type_map.insert(col.clone(), col_type);
165+
166+
let is_nullable = row.try_get::<String, _>(IS_NULLABLE)?.to_lowercase() == "yes";
167+
if is_nullable {
168+
nullable_cols.insert(col);
169+
}
157170
}
158171

159172
if cols.is_empty() {
@@ -162,7 +175,7 @@ impl MysqlMetaFetcher {
162175
schema, tb
163176
)) }
164177
}
165-
Ok((cols, col_origin_type_map, col_type_map))
178+
Ok((cols, col_origin_type_map, col_type_map, nullable_cols))
166179
}
167180

168181
async fn get_col_type(row: &MySqlRow) -> anyhow::Result<(String, MysqlColType)> {
@@ -338,7 +351,10 @@ impl MysqlMetaFetcher {
338351
// | a | 0 | PRIMARY | 2 | value | A | 0 | NULL | NULL | | BTREE | | |
339352
// | a | 0 | some_uk_name | 1 | value | A | 0 | NULL | NULL | | BTREE | | |
340353
// +-------+------------+--------------+--------------+-------------+-----------+-------------+----------+--------+------+------------+---------+---------------+
341-
let key_name: String = row.try_get("Key_name")?;
354+
let mut key_name: String = row.try_get("Key_name")?;
355+
if key_name == "PRIMARY" {
356+
key_name = RDB_PRIMARY_KEY_FLAG.to_string();
357+
}
342358
let col_name: String = row.try_get("Column_name")?;
343359
if let Some(key_cols) = key_map.get_mut(&key_name) {
344360
key_cols.push(col_name);

dt-common/src/meta/order_key.rs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
use serde::{Deserialize, Serialize};
2+
use serde_json::json;
3+
4+
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
5+
pub enum OrderKey {
6+
#[serde(rename = "single")]
7+
Single((String, Option<String>)),
8+
#[serde(rename = "composite")]
9+
Composite(Vec<(String, Option<String>)>),
10+
}
11+
12+
impl std::fmt::Display for OrderKey {
13+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
14+
write!(f, "{}", json!(self))
15+
}
16+
}
17+
18+
#[cfg(test)]
19+
mod tests {
20+
use super::*;
21+
#[test]
22+
fn test_from_str() {
23+
let strs = [
24+
r#"{"single":["id",null]}"#,
25+
r#"{"single":["id","1"]}"#,
26+
r#"{"composite":[["id","1"],["name",null]]}"#,
27+
r#"{"composite":[["id","1"],["name","test"]]}"#,
28+
];
29+
30+
let expected = [
31+
r#"{"single":["id",null]}"#,
32+
r#"{"single":["id","1"]}"#,
33+
r#"{"composite":[["id","1"],["name",null]]}"#,
34+
r#"{"composite":[["id","1"],["name","test"]]}"#,
35+
];
36+
37+
for (str, expected) in strs.iter().zip(expected.iter()) {
38+
let order_key: OrderKey = serde_json::from_str(str).unwrap();
39+
assert_eq!(expected, &serde_json::to_string(&order_key).unwrap());
40+
}
41+
}
42+
}

dt-common/src/meta/pg/pg_meta_manager.rs

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
1-
use std::collections::HashMap;
1+
use std::collections::{HashMap, HashSet};
22

3-
use crate::{error::Error, meta::ddl_meta::ddl_data::DdlData};
3+
use crate::{
4+
error::Error,
5+
meta::{ddl_meta::ddl_data::DdlData, rdb_meta_manager::RDB_PRIMARY_KEY_FLAG},
6+
};
47
use anyhow::{bail, Context};
58
use futures::TryStreamExt;
69
use sqlx::{Pool, Postgres, Row};
@@ -76,11 +79,11 @@ impl PgMetaManager {
7679
let full_name = format!(r#""{}"."{}""#, schema, tb);
7780
if !self.name_to_tb_meta.contains_key(&full_name) {
7881
let oid = Self::get_oid(&self.conn_pool, schema, tb).await?;
79-
let (cols, col_origin_type_map, col_type_map) =
82+
let (cols, col_origin_type_map, col_type_map, nullable_cols) =
8083
Self::parse_cols(&self.conn_pool, &mut self.type_registry, schema, tb).await?;
8184
let key_map = Self::parse_keys(&self.conn_pool, schema, tb).await?;
82-
let (order_col, partition_col, id_cols) =
83-
RdbMetaManager::parse_rdb_cols(&key_map, &cols)?;
85+
let (order_cols, partition_col, id_cols) =
86+
RdbMetaManager::parse_rdb_cols(&key_map, &cols, &nullable_cols)?;
8487
// disable get_foreign_keys since we don't support foreign key check
8588
let (foreign_keys, ref_by_foreign_keys) = (vec![], vec![]);
8689
// let (foreign_keys, ref_by_foreign_keys) =
@@ -89,10 +92,12 @@ impl PgMetaManager {
8992
let basic = RdbTbMeta {
9093
schema: schema.to_string(),
9194
tb: tb.to_string(),
95+
order_cols_are_nullable: order_cols.iter().any(|col| nullable_cols.contains(col)),
9296
cols,
97+
nullable_cols,
9398
col_origin_type_map,
9499
key_map,
95-
order_col,
100+
order_cols,
96101
partition_col,
97102
id_cols,
98103
foreign_keys,
@@ -133,22 +138,29 @@ impl PgMetaManager {
133138
Vec<String>,
134139
HashMap<String, String>,
135140
HashMap<String, PgColType>,
141+
HashSet<String>,
136142
)> {
137143
let mut cols = Vec::new();
138144
let mut col_origin_type_map = HashMap::new();
139145
let mut col_type_map = HashMap::new();
146+
let mut nullable_cols = HashSet::new();
140147

141148
// get cols of the table
142149
let sql = format!(
143-
"SELECT column_name FROM information_schema.columns
150+
"SELECT column_name, is_nullable FROM information_schema.columns
144151
WHERE table_schema='{}' AND table_name = '{}'
145152
ORDER BY ordinal_position;",
146153
schema, tb
147154
);
148155
let mut rows = sqlx::query(&sql).fetch(conn_pool);
149156
while let Some(row) = rows.try_next().await? {
150157
let col: String = row.try_get("column_name")?;
151-
cols.push(col);
158+
cols.push(col.clone());
159+
160+
let is_nullable = row.try_get::<String, _>("is_nullable")?.to_lowercase() == "yes";
161+
if is_nullable {
162+
nullable_cols.insert(col);
163+
}
152164
}
153165

154166
// get col_type_oid of the table
@@ -178,7 +190,7 @@ impl PgMetaManager {
178190
col_type_map.insert(col, col_type);
179191
}
180192

181-
Ok((cols, col_origin_type_map, col_type_map))
193+
Ok((cols, col_origin_type_map, col_type_map, nullable_cols))
182194
}
183195

184196
async fn parse_keys(
@@ -214,7 +226,7 @@ impl PgMetaManager {
214226
let constraint_type: String = row.try_get("constraint_type")?;
215227
let mut key_name: String = row.try_get("constraint_name")?;
216228
if constraint_type == "PRIMARY KEY" {
217-
key_name = "primary".to_string();
229+
key_name = RDB_PRIMARY_KEY_FLAG.to_string();
218230
}
219231

220232
// key_map

0 commit comments

Comments
 (0)