Skip to content

Commit 7094779

Browse files
replicators: Implement MySQL JSON print
MySQL has its own implementation for printing JSON, which is implicitly called when executing a SELECT on a table that has a JSON column. This implementation adds spaces after the colon and comma in each key-value pair. During snapshot we retrieve the data via SELECT * FROM table, which returns the JSON data in the MySQL own format. When MySQL sends data via binlog, it sends the json without the extra spaces. Our current implementation to lookup a row, requires a full match of the row values, which fails when the JSON data is not in the MySQL format. This commit adds a new function to the MySQL connector to print the JSON data using MySQL format. This function is used in the replicator to transform the JSON data before converting it to DfValue. Fixes: REA-4724 Fixes: #1361 Release-Note-Core: Fixes an issue where the MySQL replicator fails to match rows when the table has JSON fields, causing the record to become stalled. Change-Id: I62e1885b2ee08f498e621e52ac4044bcc1664019 Reviewed-on: https://gerrit.readyset.name/c/readyset/+/7896 Tested-by: Buildkite CI Reviewed-by: Johnathan Davis <[email protected]>
1 parent 7c63128 commit 7094779

File tree

3 files changed

+135
-1
lines changed

3 files changed

+135
-1
lines changed

replicators/src/mysql_connector/connector.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use replication_offset::ReplicationOffset;
2626
use rust_decimal::Decimal;
2727
use tracing::{error, info, warn};
2828

29+
use super::utils::mysql_json_print;
2930
use crate::mysql_connector::utils::mysql_pad_collation_column;
3031
use crate::noria_adapter::{Connector, ReplicationAction};
3132

@@ -1111,7 +1112,7 @@ fn binlog_row_to_noria_row(
11111112
BinlogValue::Jsonb(val) => {
11121113
let json: Result<serde_json::Value, _> = val.clone().try_into(); // urgh no TryFrom impl
11131114
match json {
1114-
Ok(val) => Ok(DfValue::from(val.to_string())),
1115+
Ok(val) => Ok(DfValue::from(mysql_json_print(&val))),
11151116
Err(JsonbToJsonError::Opaque) => match val {
11161117
jsonb::Value::Opaque(opaque_val) => {
11171118
// As far as I can *tell* Opaque is just a raw JSON string, which we

replicators/src/mysql_connector/utils.rs

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use std::string::FromUtf8Error;
22
use std::sync::Arc;
33

4+
use itertools::Itertools;
45
use mysql_common::collations::{self, Collation, CollationId};
56
use mysql_srv::ColumnType;
67
use readyset_data::DfValue;
@@ -58,6 +59,39 @@ pub fn parse_mysql_version(version: &str) -> mysql_async::Result<u32> {
5859
Ok(major * 10000 + minor * 100 + patch)
5960
}
6061

62+
/// MySQL has its own implementation of json print that deserializes the json and adds a space after
63+
/// the ":" in between key->value and adds a space after the "," in between key->value pairs.
64+
/// This function is a re-implementation of that.
65+
/// More details can be found at [1].
66+
///
67+
/// # Arguments
68+
/// * `json` - the json value to print
69+
///
70+
/// # Returns
71+
/// This function returns a string that represents the printed json
72+
///
73+
/// [1]: https://linear.app/readyset/issue/REA-4724/
74+
pub fn mysql_json_print(json: &serde_json::Value) -> String {
75+
match json {
76+
serde_json::Value::Object(obj) => {
77+
let res = obj
78+
.iter()
79+
.map(|(key, value)| format!("\"{}\": {}", key, mysql_json_print(value)))
80+
.join(", ");
81+
format!("{{{}}}", res)
82+
}
83+
serde_json::Value::Array(arr) => {
84+
let res = arr.iter().map(mysql_json_print).join(", ");
85+
86+
format!("[{}]", res)
87+
}
88+
serde_json::Value::String(s) => format!("\"{}\"", s),
89+
serde_json::Value::Number(n) => n.to_string(),
90+
serde_json::Value::Bool(b) => b.to_string(),
91+
serde_json::Value::Null => "null".to_string(),
92+
}
93+
}
94+
6195
#[cfg(test)]
6296
mod tests {
6397
use super::*;
@@ -76,4 +110,28 @@ mod tests {
76110
let version_number = parse_mysql_version(version).unwrap();
77111
assert_eq!(version_number, 80023);
78112
}
113+
114+
#[test]
115+
fn test_mysql_json_print() {
116+
let json = serde_json::json!({
117+
"key1": "value1",
118+
"key2": {
119+
"key3": "value3",
120+
"key4": {
121+
"key5": "value5"
122+
}
123+
},
124+
"key6": [
125+
"value6",
126+
{
127+
"key7": "value7"
128+
}
129+
]
130+
});
131+
let pretty_json = mysql_json_print(&json);
132+
assert_eq!(
133+
pretty_json,
134+
r#"{"key1": "value1", "key2": {"key3": "value3", "key4": {"key5": "value5"}}, "key6": ["value6", {"key7": "value7"}]}"#
135+
);
136+
}
79137
}

replicators/tests/tests.rs

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3542,3 +3542,78 @@ async fn mysql_handle_dml_in_statement_events() {
35423542
assert_eq!(caches.len(), 0);
35433543
shutdown_tx.shutdown().await;
35443544
}
3545+
3546+
#[tokio::test(flavor = "multi_thread")]
3547+
#[serial_test::serial]
3548+
#[slow]
3549+
async fn mysql_replicate_json_field() {
3550+
readyset_tracing::init_test_logging();
3551+
let url = mysql_url();
3552+
let mut client = DbConnection::connect(&url).await.unwrap();
3553+
3554+
client
3555+
.query(
3556+
"DROP TABLE IF EXISTS j_table;
3557+
CREATE TABLE j_table (id INT PRIMARY KEY, data JSON, c CHAR(1));
3558+
INSERT INTO j_table (id, data, c) VALUES (1, '{\"age\":30,\"car\": [\"Ford\", \"BMW\", \"Fiat\"], \"name\": \"John\"}', 'A');
3559+
INSERT INTO j_table (id, data, c) VALUES (2, NULL, 'A');",
3560+
)
3561+
.await
3562+
.unwrap();
3563+
3564+
let (mut ctx, shutdown_tx) = TestHandle::start_noria(url.to_string(), None)
3565+
.await
3566+
.unwrap();
3567+
ctx.notification_channel
3568+
.as_mut()
3569+
.unwrap()
3570+
.snapshot_completed()
3571+
.await
3572+
.unwrap();
3573+
3574+
// Check that the row is replicated correctly
3575+
ctx.check_results(
3576+
"j_table",
3577+
"Snapshot1",
3578+
&[
3579+
&[
3580+
DfValue::Int(1),
3581+
DfValue::Text(
3582+
"{\"age\": 30, \"car\": [\"Ford\", \"BMW\", \"Fiat\"], \"name\": \"John\"}"
3583+
.into(),
3584+
),
3585+
DfValue::Text("A".into()),
3586+
],
3587+
&[DfValue::Int(2), DfValue::None, DfValue::Text("A".into())],
3588+
],
3589+
)
3590+
.await
3591+
.unwrap();
3592+
3593+
// Update the JSON data
3594+
client
3595+
.query("UPDATE j_table SET c = 'B' WHERE id = 1 OR id = 2;")
3596+
.await
3597+
.unwrap();
3598+
3599+
// Check that the update is replicated correctly
3600+
ctx.check_results(
3601+
"j_table",
3602+
"Replication",
3603+
&[
3604+
&[
3605+
DfValue::Int(1),
3606+
DfValue::Text(
3607+
"{\"age\": 30, \"car\": [\"Ford\", \"BMW\", \"Fiat\"], \"name\": \"John\"}"
3608+
.into(),
3609+
),
3610+
DfValue::Text("B".into()),
3611+
],
3612+
&[DfValue::Int(2), DfValue::None, DfValue::Text("B".into())],
3613+
],
3614+
)
3615+
.await
3616+
.unwrap();
3617+
3618+
shutdown_tx.shutdown().await;
3619+
}

0 commit comments

Comments
 (0)