Skip to content

Commit b6c2ab3

Browse files
authored
feat: add --decode-values flag to metactl dump-raft-log-wal (#19448)
* feat: add --decode-values flag to metactl dump-raft-log-wal Previously, `dump-raft-log-wal` displayed protobuf-encoded values as raw bytes (e.g., `Update("[binary]")`). The new `--decode-values` flag uses key prefixes (`__fd_database_by_id`, `__fd_roles`, etc.) to route values to their protobuf types via `FromToProto`, rendering them as debug structs (`DatabaseMeta { engine: "", ... }`). The decoder is in `meta/process` (which already has all proto-conv deps) while the dump logic is in `meta/control` (alongside other metactl utilities), keeping `meta/binaries` as thin CLI dispatch. Example: databend-metactl dump-raft-log-wal --raft-dir .databend/meta1 --decode-values RaftLog: ChunkId(00_000_000_000_000_000_000) R-00015: ... Append(... txn:TxnRequest{if:[...] then:[Put(Put key=__fd_database_by_id/1),...] else:[]}) txn.if_then[0].put __fd_database_by_id/1: DatabaseMeta { engine: "", engine_options: {}, options: {}, ... } txn.if_then[1].put __fd_db_id_list/test_tenant/default: DbIdList { id_list: [1] } R-00017: ... Append(... txn:TxnRequest{if:[...] then:[Put(Put key=__fd_roles/test_tenant/public)] else:[...]}) txn.if_then[0].put __fd_roles/test_tenant/public: RoleInfo { name: "public", grants: UserGrantSet { entries: [], roles: {} }, ... } Changes: - Add `pb_value_decoder` module with `decode_pb_value()` (30 key-prefix mappings) and `decode_cmd_values()` for `UpsertKV`/`Transaction` cmds - Add `dump_raft_log_wal` module using `Dump::write_with()` to append decoded lines after each `Append(Normal(...))` WAL record - Add static WAL integration test with a real 283KB WAL fixture containing `__fd_*` keys that decode to actual protobuf types * chore: add trailing slash to prefix matching * fix: use fixed timestamp in WAL decode test for cross-platform determinism `DatabaseMeta::default()` calls `Utc::now()`, whose protobuf varint encoding size varies with the timestamp value (e.g., 214 bytes locally vs 220 on CI). 
Replace with an explicit construction using a fixed timestamp (`2024-01-01T00:00:00Z`) so the encoded size is deterministic (200 bytes) across all platforms. Also remove the now-unused `normalize_timestamps` helper and `regex` dev-dependency since the fixed timestamp makes masking unnecessary. * feat: add --raw flag to metactl dump-raft-log-wal When debugging cross-platform protobuf encoding differences, the `--decode-values` flag shows decoded structs but hides the actual byte encoding. The new `--raw` / `-R` flag dumps raw protobuf byte arrays so you can see exactly what bytes differ. Both flags can be combined. Changes: - Add `raw_cmd_values()` and `raw_txn_op()` parallel to their decode counterparts, formatting bytes as `[10, 0, 26, ...]` - Thread `raw` bool through `dump_wal()`, collecting extra lines before consuming the WAL record to avoid borrow conflicts Example: metactl dump-raft-log-wal --raft-dir .databend/meta --raw R-00001: [...) Size(200): Append(...) raw(__fd_database_by_id/123): [10, 0, 26, 12, ...] metactl dump-raft-log-wal --raft-dir .databend/meta -V -R R-00001: [...) Size(200): Append(...) value(__fd_database_by_id/123): DatabaseMeta { engine: "", ... } raw(__fd_database_by_id/123): [10, 0, 26, 12, ...]
1 parent d7c8bc6 commit b6c2ab3

File tree

13 files changed

+2521
-71
lines changed

13 files changed

+2521
-71
lines changed

Cargo.lock

Lines changed: 7 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/meta/binaries/metactl/main.rs

Lines changed: 1 addition & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -318,29 +318,6 @@ return metrics, nil
318318
) -> Result<Arc<ClientHandle<DatabendRuntime>>, CreationError> {
319319
lua_support::new_grpc_client(addresses)
320320
}
321-
322-
async fn dump_raft_log_wal(&self, args: &DumpRaftLogWalArgs) -> anyhow::Result<()> {
323-
use std::path::PathBuf;
324-
325-
use databend_meta_raft_store::raft_log::Config;
326-
use databend_meta_raft_store::raft_log::DumpApi;
327-
328-
let mut wal_dir = PathBuf::from(&args.raft_dir);
329-
wal_dir.push("df_meta");
330-
wal_dir.push("V004");
331-
wal_dir.push("log");
332-
333-
let config = Arc::new(Config {
334-
dir: wal_dir.to_string_lossy().to_string(),
335-
..Default::default()
336-
});
337-
338-
let dump = databend_meta_raft_store::raft_log::Dump::<
339-
databend_meta_raft_store::raft_log_v004::RaftLogTypes,
340-
>::new(config)?;
341-
dump.write_display(io::stdout())?;
342-
Ok(())
343-
}
344321
}
345322

346323
#[derive(Debug, Clone, Deserialize, Subcommand)]
@@ -684,7 +661,7 @@ async fn main() -> anyhow::Result<()> {
684661
app.get_metrics(args).await?;
685662
}
686663
CtlCommand::DumpRaftLogWal(args) => {
687-
app.dump_raft_log_wal(args).await?;
664+
databend_common_meta_control::dump_raft_log_wal::dump_raft_log_wal(args)?;
688665
}
689666
},
690667
// for backward compatibility

src/meta/control/Cargo.toml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ edition = { workspace = true }
1010
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
1111

1212
[dependencies]
13+
databend-common-meta-process = { workspace = true }
1314
databend-common-tracing = { workspace = true }
1415
databend-meta = { workspace = true }
1516
databend-meta-admin = { workspace = true }
@@ -34,6 +35,12 @@ url = { workspace = true }
3435

3536
[dev-dependencies]
3637
anyhow = { workspace = true }
38+
chrono = { workspace = true }
39+
databend-common-meta-app = { workspace = true }
40+
databend-common-proto-conv = { workspace = true }
41+
prost = { workspace = true }
42+
tempfile = { workspace = true }
43+
tokio = { workspace = true }
3744

3845
[build-dependencies]
3946
databend-common-building = { workspace = true }

src/meta/control/src/args.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -301,4 +301,12 @@ pub struct DumpRaftLogWalArgs {
301301
/// The dir to store persisted meta state, e.g., `.databend/meta1`
302302
#[clap(long)]
303303
pub raft_dir: String,
304+
305+
/// Decode protobuf-encoded values in UpsertKV and Transaction operations
306+
#[clap(short = 'V', long, default_value_t = false)]
307+
pub decode_values: bool,
308+
309+
/// Show raw protobuf bytes for values in UpsertKV and Transaction operations
310+
#[clap(short = 'R', long, default_value_t = false)]
311+
pub raw: bool,
304312
}
Lines changed: 287 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,287 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::io;
16+
use std::io::Write;
17+
use std::path::Path;
18+
use std::path::PathBuf;
19+
use std::sync::Arc;
20+
21+
use databend_common_meta_process::pb_value_decoder::decode_cmd_values;
22+
use databend_common_meta_process::pb_value_decoder::raw_cmd_values;
23+
use databend_meta_raft_store::raft_log::Config;
24+
use databend_meta_raft_store::raft_log::Dump;
25+
use databend_meta_raft_store::raft_log::DumpApi;
26+
use databend_meta_raft_store::raft_log::WALRecord;
27+
use databend_meta_raft_store::raft_log::dump_writer::write_record_display;
28+
use databend_meta_raft_store::raft_log_v004::RaftLogTypes;
29+
use databend_meta_types::raft_types::EntryPayload;
30+
31+
use crate::args::DumpRaftLogWalArgs;
32+
33+
/// Dump the raft-log WAL found under `args.raft_dir` to stdout.
///
/// The WAL directory is derived as `<raft_dir>/df_meta/V004/log`; the
/// `decode_values` and `raw` flags from `args` are forwarded unchanged to
/// [`dump_wal`], whose result is returned as-is.
pub fn dump_raft_log_wal(args: &DumpRaftLogWalArgs) -> anyhow::Result<()> {
34+
// Append the fixed sub-path components to the user-supplied raft dir.
let mut wal_dir = PathBuf::from(&args.raft_dir);
35+
wal_dir.push("df_meta");
36+
wal_dir.push("V004");
37+
wal_dir.push("log");
38+
39+
dump_wal(&wal_dir, args.decode_values, args.raw, io::stdout())
40+
}
41+
42+
/// Dump the WAL stored in `wal_dir` to the writer `w`.
///
/// - When both `decode_values` and `raw` are `false`, the whole dump is
///   delegated to `Dump::write_display`.
/// - Otherwise a `RaftLog:` header is written, then every record is written
///   with `write_record_display`. For a successfully read `Append` record
///   whose payload is `EntryPayload::Normal`, the lines returned by
///   `decode_cmd_values` (if `decode_values`) and then `raw_cmd_values`
///   (if `raw`) are written after the record, one per line.
///
/// Errors from opening the dump or from writing to `w` are propagated.
pub fn dump_wal(
43+
wal_dir: &Path,
44+
decode_values: bool,
45+
raw: bool,
46+
mut w: impl Write,
47+
) -> anyhow::Result<()> {
48+
// Only `dir` is overridden; every other config field keeps its default.
let config = Arc::new(Config {
49+
dir: wal_dir.to_string_lossy().to_string(),
50+
..Default::default()
51+
});
52+
53+
let dump = Dump::<RaftLogTypes>::new(config)?;
54+
55+
// Fast path: no extra per-record output requested.
if !decode_values && !raw {
56+
dump.write_display(&mut w)?;
57+
return Ok(());
58+
}
59+
60+
// Header line for the custom per-record path (the tests in this file expect
// the plain-display path to start with the same line).
writeln!(w, "RaftLog:")?;
61+
62+
dump.write_with(|chunk_id, idx, res| {
63+
let mut extra_lines = vec![];
64+
65+
// Collect the extra lines while `res` is only borrowed: it is moved into
// `write_record_display` below and cannot be inspected afterwards.
if let Ok((_seg, WALRecord::Append(_log_id, payload))) = &res
66+
&& let EntryPayload::Normal(log_entry) = &payload.0
67+
{
68+
if decode_values {
69+
extra_lines.extend(decode_cmd_values(&log_entry.cmd));
70+
}
71+
// Raw lines are appended after decoded lines when both flags are set.
if raw {
72+
extra_lines.extend(raw_cmd_values(&log_entry.cmd));
73+
}
74+
}
75+
76+
write_record_display(&mut w, chunk_id, idx, res)?;
77+
78+
for line in extra_lines {
79+
writeln!(w, "{}", line)?;
80+
}
81+
Ok(())
82+
})?;
83+
84+
Ok(())
85+
}
86+
87+
#[cfg(test)]
88+
mod tests {
89+
use std::collections::BTreeMap;
90+
use std::sync::Arc;
91+
92+
use chrono::TimeZone;
93+
use chrono::Utc;
94+
use databend_common_meta_app::schema::DatabaseMeta;
95+
use databend_common_proto_conv::FromToProto;
96+
use databend_meta_raft_store::raft_log::Config;
97+
use databend_meta_raft_store::raft_log::api::raft_log_writer::RaftLogWriter;
98+
use databend_meta_raft_store::raft_log_v004::Cw;
99+
use databend_meta_raft_store::raft_log_v004::RaftLogV004;
100+
use databend_meta_raft_store::raft_log_v004::util::blocking_flush;
101+
use databend_meta_types::Cmd;
102+
use databend_meta_types::LogEntry;
103+
use databend_meta_types::UpsertKV;
104+
use databend_meta_types::raft_types::EntryPayload;
105+
use databend_meta_types::raft_types::new_log_id;
106+
use prost::Message;
107+
108+
use super::*;
109+
110+
// With both flags off, `dump_wal` output for a WAL holding a single blank
// entry must be exactly the plain display: header, chunk id, the initial
// state record and the appended blank record.
#[tokio::test]
111+
async fn test_dump_wal_without_decode() -> anyhow::Result<()> {
112+
let tmp = tempfile::tempdir()?;
113+
let wal_dir = tmp.path().join("log");
114+
std::fs::create_dir_all(&wal_dir)?;
115+
116+
let config = Arc::new(Config {
117+
dir: wal_dir.to_string_lossy().to_string(),
118+
..Default::default()
119+
});
120+
121+
// Write one blank entry and flush it.
// NOTE(review): the writer is dropped before dumping, presumably so the
// WAL is fully released before it is re-read — confirm.
let mut log = RaftLogV004::open(config)?;
122+
log.append([(Cw(new_log_id(1, 0, 0)), Cw(EntryPayload::Blank))])?;
123+
blocking_flush(&mut log).await?;
124+
drop(log);
125+
126+
// Dump into an in-memory buffer so the output can be compared exactly.
let mut buf = Vec::new();
127+
dump_wal(&wal_dir, false, false, &mut buf)?;
128+
let output = String::from_utf8(buf)?;
129+
130+
assert_eq!(
131+
output,
132+
concat!(
133+
"RaftLog:\n",
134+
"ChunkId(00_000_000_000_000_000_000)\n",
135+
" R-00000: [000_000_000, 000_000_018) Size(18): RaftLogState(RaftLogState(vote: None, last: None, committed: None, purged: None, user_data: None))\n",
136+
" R-00001: [000_000_018, 000_000_070) Size(52): Append(log_id: T1-N0.0, payload: blank)\n",
137+
)
138+
);
139+
140+
Ok(())
141+
}
142+
143+
// With `decode_values` on, an `UpsertKV` on a `__fd_database_by_id/` key must
// be followed by a `value(<key>): ...` line showing the protobuf value
// decoded as a `DatabaseMeta` debug struct.
#[tokio::test]
144+
async fn test_dump_wal_decode_upsert_kv() -> anyhow::Result<()> {
145+
let tmp = tempfile::tempdir()?;
146+
let wal_dir = tmp.path().join("log");
147+
std::fs::create_dir_all(&wal_dir)?;
148+
149+
let config = Arc::new(Config {
150+
dir: wal_dir.to_string_lossy().to_string(),
151+
..Default::default()
152+
});
153+
154+
// Use a fixed timestamp to ensure deterministic protobuf encoding size
155+
// across platforms. `Utc::now()` in `DatabaseMeta::default()` produces
156+
// different varint sizes depending on the timestamp value.
157+
let ts = Utc.with_ymd_and_hms(2024, 1, 1, 0, 0, 0).unwrap();
158+
let meta = DatabaseMeta {
159+
engine: "".to_string(),
160+
engine_options: BTreeMap::new(),
161+
options: BTreeMap::new(),
162+
created_on: ts,
163+
updated_on: ts,
164+
comment: "".to_string(),
165+
drop_on: None,
166+
gc_in_progress: false,
167+
};
168+
// Convert to the protobuf message and encode it to bytes; these bytes are
// the value stored under the key.
let pb = meta.to_pb()?;
169+
let mut pb_buf = vec![];
170+
pb.encode(&mut pb_buf)?;
171+
172+
let cmd = Cmd::UpsertKV(UpsertKV::update("__fd_database_by_id/123", &pb_buf));
173+
let log_entry = LogEntry::new(cmd);
174+
let payload = EntryPayload::Normal(log_entry);
175+
176+
let mut log = RaftLogV004::open(config)?;
177+
log.append([(Cw(new_log_id(1, 0, 0)), Cw(payload))])?;
178+
blocking_flush(&mut log).await?;
179+
drop(log);
180+
181+
// decode_values = true, raw = false.
let mut buf = Vec::new();
182+
dump_wal(&wal_dir, true, false, &mut buf)?;
183+
let output = String::from_utf8(buf)?;
184+
185+
// The record line itself still shows the value as `[binary]`; the decoded
// form appears only on the extra `value(...)` line.
assert_eq!(
186+
output,
187+
concat!(
188+
"RaftLog:\n",
189+
"ChunkId(00_000_000_000_000_000_000)\n",
190+
" R-00000: [000_000_000, 000_000_018) Size(18): RaftLogState(RaftLogState(vote: None, last: None, committed: None, purged: None, user_data: None))\n",
191+
r#" R-00001: [000_000_018, 000_000_218) Size(200): Append(log_id: T1-N0.0, payload: normal: cmd: upsert_kv:__fd_database_by_id/123(GE(0)) = Update("[binary]") (None))"#,
192+
"\n",
193+
r#" value(__fd_database_by_id/123): DatabaseMeta { engine: "", engine_options: {}, options: {}, created_on: 2024-01-01T00:00:00Z, updated_on: 2024-01-01T00:00:00Z, comment: "", drop_on: None, gc_in_progress: false }"#,
194+
"\n",
195+
)
196+
);
197+
198+
Ok(())
199+
}
200+
201+
// Covers `raw` alone and `raw` combined with `decode_values` against the same
// WAL: the raw bytes must appear on a `raw(<key>): [..]` line, and when both
// flags are set the `value(...)` line must come before the `raw(...)` line.
#[tokio::test]
202+
async fn test_dump_wal_raw() -> anyhow::Result<()> {
203+
let tmp = tempfile::tempdir()?;
204+
let wal_dir = tmp.path().join("log");
205+
std::fs::create_dir_all(&wal_dir)?;
206+
207+
let config = Arc::new(Config {
208+
dir: wal_dir.to_string_lossy().to_string(),
209+
..Default::default()
210+
});
211+
212+
// Fixed timestamp so the encoded bytes (and thus the record size asserted
// below) do not depend on when the test runs.
let ts = Utc.with_ymd_and_hms(2024, 1, 1, 0, 0, 0).unwrap();
213+
let meta = DatabaseMeta {
214+
engine: "".to_string(),
215+
engine_options: BTreeMap::new(),
216+
options: BTreeMap::new(),
217+
created_on: ts,
218+
updated_on: ts,
219+
comment: "".to_string(),
220+
drop_on: None,
221+
gc_in_progress: false,
222+
};
223+
let pb = meta.to_pb()?;
224+
let mut pb_buf = vec![];
225+
pb.encode(&mut pb_buf)?;
226+
227+
// Expected rendering of the raw bytes: decimal values joined by ", " and
// wrapped in brackets, e.g. `[10, 0, 26]`.
let raw_bytes_str = format!(
228+
"[{}]",
229+
pb_buf
230+
.iter()
231+
.map(|b| b.to_string())
232+
.collect::<Vec<_>>()
233+
.join(", ")
234+
);
235+
236+
let cmd = Cmd::UpsertKV(UpsertKV::update("__fd_database_by_id/123", &pb_buf));
237+
let log_entry = LogEntry::new(cmd);
238+
let payload = EntryPayload::Normal(log_entry);
239+
240+
let mut log = RaftLogV004::open(config)?;
241+
log.append([(Cw(new_log_id(1, 0, 0)), Cw(payload))])?;
242+
blocking_flush(&mut log).await?;
243+
drop(log);
244+
245+
// --raw only
246+
let mut buf = Vec::new();
247+
dump_wal(&wal_dir, false, true, &mut buf)?;
248+
let output = String::from_utf8(buf)?;
249+
250+
// The concatenated literal is used as a `format!` template; the single
// `{}` placeholder receives `raw_bytes_str`.
let expected = format!(
251+
concat!(
252+
"RaftLog:\n",
253+
"ChunkId(00_000_000_000_000_000_000)\n",
254+
" R-00000: [000_000_000, 000_000_018) Size(18): RaftLogState(RaftLogState(vote: None, last: None, committed: None, purged: None, user_data: None))\n",
255+
r#" R-00001: [000_000_018, 000_000_218) Size(200): Append(log_id: T1-N0.0, payload: normal: cmd: upsert_kv:__fd_database_by_id/123(GE(0)) = Update("[binary]") (None))"#,
256+
"\n",
257+
" raw(__fd_database_by_id/123): {}\n",
258+
),
259+
raw_bytes_str
260+
);
261+
262+
assert_eq!(output, expected);
263+
264+
// --decode-values --raw (both)
265+
let mut buf = Vec::new();
266+
dump_wal(&wal_dir, true, true, &mut buf)?;
267+
let output = String::from_utf8(buf)?;
268+
269+
// Braces in the `value(...)` line are doubled (`{{`, `}}`) because this
// literal is a `format!` template and they must be emitted literally.
let expected = format!(
270+
concat!(
271+
"RaftLog:\n",
272+
"ChunkId(00_000_000_000_000_000_000)\n",
273+
" R-00000: [000_000_000, 000_000_018) Size(18): RaftLogState(RaftLogState(vote: None, last: None, committed: None, purged: None, user_data: None))\n",
274+
r#" R-00001: [000_000_018, 000_000_218) Size(200): Append(log_id: T1-N0.0, payload: normal: cmd: upsert_kv:__fd_database_by_id/123(GE(0)) = Update("[binary]") (None))"#,
275+
"\n",
276+
r#" value(__fd_database_by_id/123): DatabaseMeta {{ engine: "", engine_options: {{}}, options: {{}}, created_on: 2024-01-01T00:00:00Z, updated_on: 2024-01-01T00:00:00Z, comment: "", drop_on: None, gc_in_progress: false }}"#,
277+
"\n",
278+
" raw(__fd_database_by_id/123): {}\n",
279+
),
280+
raw_bytes_str
281+
);
282+
283+
assert_eq!(output, expected);
284+
285+
Ok(())
286+
}
287+
}

0 commit comments

Comments
 (0)