Skip to content

Commit fd246bd

Browse files
tabVersion, lmatz
authored and committed
feat: support more types for canal json (#10053)
1 parent f67b9b5 commit fd246bd

File tree

2 files changed

+60
-72
lines changed

2 files changed

+60
-72
lines changed

src/connector/src/parser/canal/simd_json_parser.rs

Lines changed: 26 additions & 50 deletions
Original file line number | Diff line number | Diff line change
@@ -12,25 +12,20 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use std::str::FromStr;
16-
1715
use anyhow::anyhow;
1816
use futures_async_stream::try_stream;
19-
use risingwave_common::cast::{
20-
str_to_date, str_to_time, str_to_timestamp, str_with_time_zone_to_timestamptz,
21-
};
22-
use risingwave_common::error::ErrorCode::{InternalError, ProtocolError};
17+
use risingwave_common::error::ErrorCode::ProtocolError;
2318
use risingwave_common::error::{Result, RwError};
24-
use risingwave_common::types::{DataType, Datum, Decimal, ScalarImpl};
19+
use risingwave_common::types::{DataType, Datum};
2520
use risingwave_common::util::iter_util::ZipEqFast;
2621
use simd_json::{BorrowedValue, StaticNode, ValueAccess};
2722

23+
use crate::impl_common_parser_logic;
2824
use crate::parser::canal::operators::*;
29-
use crate::parser::common::json_object_smart_get_value;
25+
use crate::parser::common::{do_parse_simd_json_value, json_object_smart_get_value};
3026
use crate::parser::util::at_least_one_ok;
3127
use crate::parser::{SourceStreamChunkRowWriter, WriteGuard};
32-
use crate::source::{SourceColumnDesc, SourceContextRef};
33-
use crate::{ensure_rust_type, ensure_str, impl_common_parser_logic};
28+
use crate::source::{SourceColumnDesc, SourceContextRef, SourceFormat};
3429

3530
const AFTER: &str = "data";
3631
const BEFORE: &str = "old";
@@ -104,7 +99,6 @@ impl CanalJsonParser {
10499
})
105100
})
106101
.collect::<Vec<Result<_>>>();
107-
108102
at_least_one_ok(results)
109103
}
110104
CANAL_UPDATE_EVENT => {
@@ -152,7 +146,6 @@ impl CanalJsonParser {
152146
})
153147
})
154148
.collect::<Vec<Result<_>>>();
155-
156149
at_least_one_ok(results)
157150
}
158151
CANAL_DELETE_EVENT => {
@@ -195,70 +188,41 @@ fn cannal_simd_json_parse_value(
195188
) -> Result<Datum> {
196189
match value {
197190
None | Some(BorrowedValue::Static(StaticNode::Null)) => Ok(None),
198-
Some(v) => Ok(Some(cannal_do_parse_simd_json_value(dtype, v).map_err(
199-
|e| {
191+
Some(v) => Ok(Some(
192+
do_parse_simd_json_value(&SourceFormat::CanalJson, dtype, v).map_err(|e| {
200193
tracing::warn!("failed to parse type '{}' from json: {}", dtype, e);
201194
anyhow!("failed to parse type '{}' from json: {}", dtype, e)
202-
},
203-
)?)),
195+
})?,
196+
)),
204197
}
205198
}
206199

207-
#[inline]
208-
fn cannal_do_parse_simd_json_value(dtype: &DataType, v: &BorrowedValue<'_>) -> Result<ScalarImpl> {
209-
let v = match dtype {
210-
// mysql use tinyint to represent boolean
211-
DataType::Boolean => ScalarImpl::Bool(ensure_rust_type!(v, i16) != 0),
212-
DataType::Int16 => ScalarImpl::Int16(ensure_rust_type!(v, i16)),
213-
DataType::Int32 => ScalarImpl::Int32(ensure_rust_type!(v, i32)),
214-
DataType::Int64 => ScalarImpl::Int64(ensure_rust_type!(v, i64)),
215-
DataType::Float32 => ScalarImpl::Float32(ensure_rust_type!(v, f32).into()),
216-
DataType::Float64 => ScalarImpl::Float64(ensure_rust_type!(v, f64).into()),
217-
// FIXME: decimal should have more precision than f64
218-
DataType::Decimal => Decimal::from_str(ensure_str!(v, "string"))
219-
.map_err(|_| anyhow!("parse decimal from string err {}", v))?
220-
.into(),
221-
DataType::Varchar => ensure_str!(v, "varchar").to_string().into(),
222-
DataType::Date => str_to_date(ensure_str!(v, "date"))?.into(),
223-
DataType::Time => str_to_time(ensure_str!(v, "time"))?.into(),
224-
DataType::Timestamp => str_to_timestamp(ensure_str!(v, "string"))?.into(),
225-
DataType::Timestamptz => {
226-
str_with_time_zone_to_timestamptz(ensure_str!(v, "string"))?.into()
227-
}
228-
_ => {
229-
return Err(RwError::from(InternalError(format!(
230-
"cannal data source not support type {}",
231-
dtype
232-
))))
233-
}
234-
};
235-
Ok(v)
236-
}
237-
238200
#[cfg(test)]
239201
mod tests {
240-
241202
use std::str::FromStr;
242203

243204
use risingwave_common::array::Op;
244205
use risingwave_common::cast::str_to_timestamp;
245206
use risingwave_common::row::Row;
246-
use risingwave_common::types::{DataType, Decimal, ScalarImpl, ToOwnedDatum};
207+
use risingwave_common::types::{DataType, Decimal, JsonbVal, ScalarImpl, ToOwnedDatum};
208+
use serde_json::Value;
247209

248210
use super::*;
249211
use crate::parser::SourceStreamChunkBuilder;
250212
use crate::source::SourceColumnDesc;
251213

252214
#[tokio::test]
253215
async fn test_data_types() {
254-
let payload = br#"{"id":0,"database":"test","table":"data_type","pkNames":["id"],"isDdl":false,"type":"INSERT","es":1682057341424,"ts":1682057382913,"sql":"","sqlType":{"id":4,"tinyint":-6,"smallint":5,"mediumint":4,"int":4,"bigint":-5,"float":7,"double":8,"decimal":3,"date":91,"datetime":93,"time":92,"timestamp":93,"char":1,"varchar":12,"binary":2004,"varbinary":2004,"blob":2004,"text":2005,"enum":4,"set":-7},"mysqlType":{"binary":"binary","varbinary":"varbinary","enum":"enum","set":"set","bigint":"bigint","float":"float","datetime":"datetime","varchar":"varchar","smallint":"smallint","mediumint":"mediumint","double":"double","date":"date","char":"char","id":"int","tinyint":"tinyint","decimal":"decimal","blob":"blob","text":"text","int":"int","time":"time","timestamp":"timestamp"},"old":null,"data":[{"id":"1","tinyint":"5","smallint":"136","mediumint":"172113","int":"1801160058","bigint":"3916589616287113937","float":"0","double":"0.15652","decimal":"1.20364700","date":"2023-04-20","datetime":"2023-02-15 13:01:36","time":"20:23:41","timestamp":"2022-10-13 12:12:54","char":"Kathleen","varchar":"atque esse fugiat et quibusdam qui.","binary":"Joseph\u0000\u0000\u0000\u0000","varbinary":"Douglas","blob":"ducimus ut in commodi necessitatibus error magni repellat exercitationem!","text":"rerum sunt nulla quo quibusdam velit doloremque.","enum":"1","set":"1"}]}"#;
216+
let payload = br#"{"id":0,"database":"test","table":"data_type","pkNames":["id"],"isDdl":false,"type":"INSERT","es":1682057341424,"ts":1682057382913,"sql":"","sqlType":{"id":4,"tinyint":-6,"smallint":5,"mediumint":4,"int":4,"bigint":-5,"float":7,"double":8,"decimal":3,"date":91,"datetime":93,"time":92,"timestamp":93,"char":1,"varchar":12,"binary":2004,"varbinary":2004,"blob":2004,"text":2005,"enum":4,"set":-7,"json":12},"mysqlType":{"binary":"binary","varbinary":"varbinary","enum":"enum","set":"set","bigint":"bigint","float":"float","datetime":"datetime","varchar":"varchar","smallint":"smallint","mediumint":"mediumint","double":"double","date":"date","char":"char","id":"int","tinyint":"tinyint","decimal":"decimal","blob":"blob","text":"text","int":"int","time":"time","timestamp":"timestamp","json":"json"},"old":null,"data":[{"id":"1","tinyint":"5","smallint":"136","mediumint":"172113","int":"1801160058","bigint":"3916589616287113937","float":"0","double":"0.15652","decimal":"1.20364700","date":"2023-04-20","datetime":"2023-02-15 13:01:36","time":"20:23:41","timestamp":"2022-10-13 12:12:54","char":"Kathleen","varchar":"atque esse fugiat et quibusdam qui.","binary":"Joseph\u0000\u0000\u0000\u0000","varbinary":"Douglas","blob":"ducimus ut in commodi necessitatibus error magni repellat exercitationem!","text":"rerum sunt nulla quo quibusdam velit doloremque.","enum":"1","set":"1","json":"{\"a\": 1, \"b\": 2}"}]}"#;
255217
let descs = vec![
256218
SourceColumnDesc::simple("id", DataType::Int32, 0.into()),
257219
SourceColumnDesc::simple("date", DataType::Date, 1.into()),
258220
SourceColumnDesc::simple("datetime", DataType::Timestamp, 2.into()),
259221
SourceColumnDesc::simple("time", DataType::Time, 3.into()),
260222
SourceColumnDesc::simple("timestamp", DataType::Timestamp, 4.into()),
261223
SourceColumnDesc::simple("char", DataType::Varchar, 5.into()),
224+
SourceColumnDesc::simple("binary", DataType::Bytea, 6.into()),
225+
SourceColumnDesc::simple("json", DataType::Jsonb, 7.into()),
262226
];
263227
let parser = CanalJsonParser::new(descs.clone(), Default::default()).unwrap();
264228

@@ -299,6 +263,18 @@ mod tests {
299263
row.datum_at(5).to_owned_datum(),
300264
Some(ScalarImpl::Utf8(Box::from("Kathleen".to_string())))
301265
);
266+
assert_eq!(
267+
row.datum_at(6).to_owned_datum(),
268+
Some(ScalarImpl::Bytea(Box::from(
269+
"Joseph\u{0}\u{0}\u{0}\u{0}".as_bytes()
270+
)))
271+
);
272+
assert_eq!(
273+
row.datum_at(7).to_owned_datum(),
274+
Some(ScalarImpl::Jsonb(JsonbVal::from(Value::from(
275+
"{\"a\": 1, \"b\": 2}".to_string()
276+
))))
277+
);
302278
}
303279

304280
#[tokio::test]

src/connector/src/parser/common.rs

Lines changed: 34 additions & 22 deletions
Original file line number | Diff line number | Diff line change
@@ -30,7 +30,9 @@ use simd_json::value::StaticNode;
3030
use simd_json::{BorrowedValue, ValueAccess};
3131

3232
use crate::source::SourceFormat;
33-
use crate::{ensure_i16, ensure_i32, ensure_i64, ensure_str, simd_json_ensure_float};
33+
use crate::{
34+
ensure_i16, ensure_i32, ensure_i64, ensure_rust_type, ensure_str, simd_json_ensure_float,
35+
};
3436
pub(crate) fn json_object_smart_get_value<'a, 'b>(
3537
v: &'b simd_json::BorrowedValue<'a>,
3638
key: Cow<'b, str>,
@@ -47,13 +49,14 @@ pub(crate) fn json_object_smart_get_value<'a, 'b>(
4749
None
4850
}
4951

50-
fn do_parse_simd_json_value(
52+
pub(crate) fn do_parse_simd_json_value(
5153
format: &SourceFormat,
5254
dtype: &DataType,
5355
v: &BorrowedValue<'_>,
5456
) -> Result<ScalarImpl> {
55-
let v = match dtype {
56-
DataType::Boolean => match v {
57+
let v = match (dtype, format) {
58+
(DataType::Boolean, SourceFormat::CanalJson) => (ensure_rust_type!(v, i16) != 0).into(),
59+
(DataType::Boolean, _) => match v {
5760
BorrowedValue::Static(StaticNode::Bool(b)) => (*b).into(),
5861
// debezium converts bool to int, false -> 0, true -> 1, for mysql and postgres
5962
BorrowedValue::Static(v) => match v.as_i64() {
@@ -63,13 +66,18 @@ fn do_parse_simd_json_value(
6366
},
6467
_ => anyhow::bail!("expect bool, but found {v}"),
6568
},
66-
DataType::Int16 => ensure_i16!(v, i16).into(),
67-
DataType::Int32 => ensure_i32!(v, i32).into(),
68-
DataType::Int64 => ensure_i64!(v, i64).into(),
69-
DataType::Int256 => Int256::from_str(ensure_str!(v, "quoted int256"))?.into(),
70-
DataType::Serial => anyhow::bail!("serial should not be parsed"),
69+
(DataType::Int16, SourceFormat::CanalJson) => ensure_rust_type!(v, i16).into(),
70+
(DataType::Int16, _) => ensure_i16!(v, i16).into(),
71+
(DataType::Int32, SourceFormat::CanalJson) => ensure_rust_type!(v, i32).into(),
72+
(DataType::Int32, _) => ensure_i32!(v, i32).into(),
73+
(DataType::Int64, SourceFormat::CanalJson) => ensure_rust_type!(v, i64).into(),
74+
(DataType::Int64, _) => ensure_i64!(v, i64).into(),
75+
(DataType::Int256, _) => Int256::from_str(ensure_str!(v, "quoted int256"))?.into(),
76+
(DataType::Serial, _) => anyhow::bail!("serial should not be parsed"),
77+
// if the value is too large, str parsing to f32 will fail
78+
(DataType::Float32, SourceFormat::CanalJson) => ensure_rust_type!(v, f32).into(),
7179
// when f32 overflows, the value is converted to `inf` which is inappropriate
72-
DataType::Float32 => {
80+
(DataType::Float32, _) => {
7381
let scalar_val = ScalarImpl::Float32((simd_json_ensure_float!(v, f32) as f32).into());
7482
if let ScalarImpl::Float32(f) = scalar_val {
7583
if f.0.is_infinite() {
@@ -78,13 +86,17 @@ fn do_parse_simd_json_value(
7886
}
7987
scalar_val
8088
}
81-
DataType::Float64 => simd_json_ensure_float!(v, f64).into(),
89+
(DataType::Float64, SourceFormat::CanalJson) => ensure_rust_type!(v, f64).into(),
90+
(DataType::Float64, _) => simd_json_ensure_float!(v, f64).into(),
91+
(DataType::Decimal, SourceFormat::CanalJson) => Decimal::from_str(ensure_str!(v, "string"))
92+
.map_err(|_| anyhow!("parse decimal from string err {}", v))?
93+
.into(),
8294
// FIXME: decimal should have more precision than f64
83-
DataType::Decimal => Decimal::try_from(simd_json_ensure_float!(v, Decimal))
95+
(DataType::Decimal, _) => Decimal::try_from(simd_json_ensure_float!(v, Decimal))
8496
.map_err(|_| anyhow!("expect decimal"))?
8597
.into(),
86-
DataType::Varchar => ensure_str!(v, "varchar").to_string().into(),
87-
DataType::Bytea => match format {
98+
(DataType::Varchar, _) => ensure_str!(v, "varchar").to_string().into(),
99+
(DataType::Bytea, _) => match format {
88100
// debezium converts postgres bytea to base64 format
89101
SourceFormat::DebeziumJson => ScalarImpl::Bytea(
90102
base64::engine::general_purpose::STANDARD
@@ -94,15 +106,15 @@ fn do_parse_simd_json_value(
94106
),
95107
_ => ScalarImpl::Bytea(str_to_bytea(ensure_str!(v, "bytea")).map_err(|e| anyhow!(e))?),
96108
},
97-
DataType::Date => match v {
109+
(DataType::Date, _) => match v {
98110
BorrowedValue::String(s) => str_to_date(s).map_err(|e| anyhow!(e))?.into(),
99111
BorrowedValue::Static(_) => {
100112
// debezium converts date to i32 for mysql and postgres
101113
Date::with_days_since_unix_epoch(ensure_i32!(v, i32))?.into()
102114
}
103115
_ => anyhow::bail!("expect date, but found {v}"),
104116
},
105-
DataType::Time => {
117+
(DataType::Time, _) => {
106118
match v {
107119
BorrowedValue::String(s) => str_to_time(s).map_err(|e| anyhow!(e))?.into(),
108120
BorrowedValue::Static(_) => {
@@ -123,14 +135,14 @@ fn do_parse_simd_json_value(
123135
_ => anyhow::bail!("expect time, but found {v}"),
124136
}
125137
}
126-
DataType::Timestamp => match v {
138+
(DataType::Timestamp, _) => match v {
127139
BorrowedValue::String(s) => str_to_timestamp(s).map_err(|e| anyhow!(e))?.into(),
128140
BorrowedValue::Static(_) => i64_to_timestamp(ensure_i64!(v, i64))
129141
.map_err(|e| anyhow!(e))?
130142
.into(),
131143
_ => anyhow::bail!("expect timestamp, but found {v}"),
132144
},
133-
DataType::Timestamptz => match v {
145+
(DataType::Timestamptz, _) => match v {
134146
BorrowedValue::String(s) => str_with_time_zone_to_timestamptz(s)
135147
.map_err(|e| anyhow!(e))?
136148
.into(),
@@ -139,7 +151,7 @@ fn do_parse_simd_json_value(
139151
.into(),
140152
_ => anyhow::bail!("expect timestamptz, but found {v}"),
141153
},
142-
DataType::Jsonb => {
154+
(DataType::Jsonb, _) => {
143155
// jsonb will be output as a string in debezium format
144156
if *format == SourceFormat::DebeziumJson {
145157
ScalarImpl::Jsonb(JsonbVal::from_str(ensure_str!(v, "jsonb"))?)
@@ -149,7 +161,7 @@ fn do_parse_simd_json_value(
149161
ScalarImpl::Jsonb(JsonbVal::from_serde(v))
150162
}
151163
}
152-
DataType::Struct(struct_type_info) => {
164+
(DataType::Struct(struct_type_info), _) => {
153165
let fields: Vec<Option<ScalarImpl>> = struct_type_info
154166
.field_names
155167
.iter()
@@ -164,7 +176,7 @@ fn do_parse_simd_json_value(
164176
.collect::<Result<Vec<Datum>>>()?;
165177
ScalarImpl::Struct(StructValue::new(fields))
166178
}
167-
DataType::List(item_type) => {
179+
(DataType::List(item_type), _) => {
168180
if let BorrowedValue::Array(values) = v {
169181
let values = values
170182
.iter()
@@ -179,7 +191,7 @@ fn do_parse_simd_json_value(
179191
return Err(anyhow!(err_msg));
180192
}
181193
}
182-
DataType::Interval => match format {
194+
(DataType::Interval, _) => match format {
183195
SourceFormat::DebeziumJson => {
184196
ScalarImpl::Interval(Interval::from_iso_8601(ensure_str!(v, "interval"))?)
185197
}

0 commit comments

Comments
 (0)