From 965792f13ca5bbaf563ae924774e8354235b3ba4 Mon Sep 17 00:00:00 2001 From: Annie Blomgren Date: Mon, 9 Mar 2026 11:42:02 +0100 Subject: [PATCH 1/5] Add lavinmqctl list_in_sync_replicas command Queries etcd directly to list nodes in the in-sync replica set, their advertised addresses, and whether each node is leader or follower. Co-Authored-By: Claude Sonnet 4.6 --- src/lavinmq/etcd.cr | 9 ++++++++ src/lavinmqctl/cli.cr | 50 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 59 insertions(+) diff --git a/src/lavinmq/etcd.cr b/src/lavinmq/etcd.cr index 16bea5b815..2036547d79 100644 --- a/src/lavinmq/etcd.cr +++ b/src/lavinmq/etcd.cr @@ -162,6 +162,15 @@ module LavinMQ end end + def election_leader(name) : String? + json = post("/v3/election/leader", %({"name":"#{Base64.strict_encode name}"})) + if value = json.dig?("kv", "value") + Base64.decode_string(value.as_s) + end + rescue Error + nil + end + private def post(path, body) : JSON::Any with_tcp do |conn| return post_request(conn.socket, conn.address, conn.auth, path, body) diff --git a/src/lavinmqctl/cli.cr b/src/lavinmqctl/cli.cr index 543a91f59e..43d6c01d89 100644 --- a/src/lavinmqctl/cli.cr +++ b/src/lavinmqctl/cli.cr @@ -7,6 +7,7 @@ require "../lavinmq/shovel/constants" require "../lavinmq/federation/constants" require "../lavinmq/definitions_generator" require "../lavinmq/auth/user" +require "../lavinmq/etcd" class LavinMQCtl @options = {} of String => String @@ -60,6 +61,12 @@ class LavinMQCtl if host = ENV["LAVINMQCTL_HOST"]? @options["host"] = host end + if ep = ENV["LAVINMQ_CLUSTERING_ETCD_ENDPOINTS"]? + @options["etcd-endpoints"] = ep + end + if pfx = ENV["LAVINMQ_CLUSTERING_ETCD_PREFIX"]? + @options["etcd-prefix"] = pfx + end global_options parse_cmd end @@ -224,6 +231,12 @@ class LavinMQCtl @args["queue"] = JSON::Any.new(v) end end + @parser.on("list_in_sync_replicas", "List nodes in the in-sync replica set") do + @cmd = "list_in_sync_replicas" + self.banner = "Usage: #{PROGRAM_NAME} list_in_sync_replicas" + end + + @parser.separator("") @parser.on("-v", "--version", "Show version") { @io.puts LavinMQ::VERSION; exit 0 } @parser.on("--build-info", "Show build information") { @io.puts LavinMQ::BUILD_INFO; exit 0 } @parser.on("-h", "--help", "Show this help") do @@ -279,6 +292,7 @@ class LavinMQCtl when "list_federations" then list_federations when "add_federation" then add_federation when "delete_federation" then delete_federation + when "list_in_sync_replicas" then list_in_sync_replicas when "stop_app" when "start_app" else @@ -389,6 +403,14 @@ class LavinMQCtl end @options["format"] = v end + @parser.on("--etcd-endpoints=ENDPOINTS", + "Comma-separated etcd endpoints (or LAVINMQ_CLUSTERING_ETCD_ENDPOINTS)") do |v| + @options["etcd-endpoints"] = v + end + @parser.on("--etcd-prefix=PREFIX", + "etcd key prefix used by LavinMQ (default: lavinmq)") do |v| + @options["etcd-prefix"] = v + end end private def quiet? @@ -925,4 +947,32 @@ class LavinMQCtl resp = http.delete url handle_response(resp, 204) end + + private def list_in_sync_replicas + endpoints = @options["etcd-endpoints"]? || + abort "Specify etcd endpoints with --etcd-endpoints or LAVINMQ_CLUSTERING_ETCD_ENDPOINTS" + prefix = @options["etcd-prefix"]? || "lavinmq" + etcd = LavinMQ::Etcd.new(endpoints) + + isr_str = etcd.get("#{prefix}/isr") || + abort "No ISR data found in etcd (key: #{prefix}/isr). Is LavinMQ running in a cluster?" + node_ids = isr_str.split(',').map(&.strip).reject(&.empty?) + + # Build node_id -> address from etcd election candidates. + # Each candidate stores its advertised_uri under {prefix}/leader/{hex_lease_id} + # where hex_lease_id.to_i64(16).to_s(36) == node_id + candidates = etcd.get_prefix("#{prefix}/leader/") + id_to_uri = candidates.each_with_object(Hash(String, String).new) do |(key, uri), h| + hex = key.split('/').last + h[hex.to_i64(16).to_s(36)] = uri + end + + leader_uri = etcd.election_leader("#{prefix}/leader") + + output node_ids.map { |id| + address = id_to_uri[id]? || "" + role = address == leader_uri ? "leader" : "follower" + {node_id: id, address: address, role: role} + }, ["node_id", "address", "role"] + end end From 369aac31d131fa06ecf24925f187365473e006cb Mon Sep 17 00:00:00 2001 From: Annie Blomgren Date: Mon, 9 Mar 2026 11:59:20 +0100 Subject: [PATCH 2/5] Format cli.cr Co-Authored-By: Claude Sonnet 4.6 --- src/lavinmqctl/cli.cr | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/lavinmqctl/cli.cr b/src/lavinmqctl/cli.cr index 43d6c01d89..2a9784e537 100644 --- a/src/lavinmqctl/cli.cr +++ b/src/lavinmqctl/cli.cr @@ -404,11 +404,11 @@ class LavinMQCtl @options["format"] = v end @parser.on("--etcd-endpoints=ENDPOINTS", - "Comma-separated etcd endpoints (or LAVINMQ_CLUSTERING_ETCD_ENDPOINTS)") do |v| + "Comma-separated etcd endpoints (or LAVINMQ_CLUSTERING_ETCD_ENDPOINTS)") do |v| @options["etcd-endpoints"] = v end @parser.on("--etcd-prefix=PREFIX", - "etcd key prefix used by LavinMQ (default: lavinmq)") do |v| + "etcd key prefix used by LavinMQ (default: lavinmq)") do |v| @options["etcd-prefix"] = v end end From 57ff12fd8e25b14b240b98c6b81b8cd28b9c3ebd Mon Sep 17 00:00:00 2001 From: Annie Blomgren Date: Mon, 9 Mar 2026 13:02:13 +0100 Subject: [PATCH 3/5] Address review feedback on list_in_sync_replicas - Handle empty etcd response in get_prefix (no kvs key) - Show "unknown" role when no leader is elected - Default --etcd-endpoints to localhost:2379 - Remove blank separator before version/help flags Co-Authored-By: Claude Sonnet 4.6 --- src/lavinmq/etcd.cr | 2 +- src/lavinmqctl/cli.cr | 12 ++++++++---- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/src/lavinmq/etcd.cr b/src/lavinmq/etcd.cr index 2036547d79..4c2f1a47d4 100644 --- a/src/lavinmq/etcd.cr +++ b/src/lavinmq/etcd.cr @@ -56,7 +56,7 @@ module LavinMQ range_end = key.to_slice.dup range_end.update(-1) { |v| v + 1 } json = post("/v3/kv/range", %({"key":"#{Base64.strict_encode key}","range_end":"#{Base64.strict_encode range_end}"})) - kvs = json["kvs"].as_a + kvs = json["kvs"]?.try(&.as_a) || return Hash(String, String).new h = Hash(String, String).new(initial_capacity: kvs.size) kvs.each do |kv| key = Base64.decode_string kv["key"].as_s diff --git a/src/lavinmqctl/cli.cr b/src/lavinmqctl/cli.cr index 2a9784e537..c97a094667 100644 --- a/src/lavinmqctl/cli.cr +++ b/src/lavinmqctl/cli.cr @@ -236,7 +236,6 @@ class LavinMQCtl self.banner = "Usage: #{PROGRAM_NAME} list_in_sync_replicas" end - @parser.separator("") @parser.on("-v", "--version", "Show version") { @io.puts LavinMQ::VERSION; exit 0 } @parser.on("--build-info", "Show build information") { @io.puts LavinMQ::BUILD_INFO; exit 0 } @parser.on("-h", "--help", "Show this help") do @@ -949,8 +948,7 @@ class LavinMQCtl end private def list_in_sync_replicas - endpoints = @options["etcd-endpoints"]? || - abort "Specify etcd endpoints with --etcd-endpoints or LAVINMQ_CLUSTERING_ETCD_ENDPOINTS" + endpoints = @options["etcd-endpoints"]? || "localhost:2379" prefix = @options["etcd-prefix"]? || "lavinmq" etcd = LavinMQ::Etcd.new(endpoints) @@ -971,7 +969,13 @@ class LavinMQCtl output node_ids.map { |id| address = id_to_uri[id]? || "" - role = address == leader_uri ? "leader" : "follower" + role = if leader_uri.nil? + "unknown" + elsif address == leader_uri + "leader" + else + "follower" + end {node_id: id, address: address, role: role} }, ["node_id", "address", "role"] end From 2980bb7aef06afdc55243df93385ab378272a827 Mon Sep 17 00:00:00 2001 From: Annie Blomgren Date: Mon, 9 Mar 2026 13:19:04 +0100 Subject: [PATCH 4/5] Add .claude/review-criteria.md and review-model for CI review workflow Co-Authored-By: Claude Sonnet 4.6 --- .claude/review-criteria.md | 19 +++++++++++++++++++ .claude/review-model | 1 + 2 files changed, 20 insertions(+) create mode 100644 .claude/review-criteria.md create mode 100644 .claude/review-model diff --git a/.claude/review-criteria.md b/.claude/review-criteria.md new file mode 100644 index 0000000000..ccf23246ee --- /dev/null +++ b/.claude/review-criteria.md @@ -0,0 +1,19 @@ +Report only issues you are confident about: +- Bugs and logic errors +- Security vulnerabilities +- Performance issues (especially allocations in hot paths) +- Missing error handling +- Crystal anti-patterns + +Quality bar: +- Only report issues you'd block a PR for. Skip stylistic nits and minor suggestions. +- If you start describing an issue and realize it's not actually a problem, drop it. Do not include findings you've talked yourself out of. +- Each finding must have a concrete file:line reference and a clear explanation of the actual impact. + +If no problems found, say "No issues found." + +Do NOT: +- Check CI status or previous commits +- Run specs +- Use emojis +- Explore the codebase broadly — only read specific files when needed diff --git a/.claude/review-model b/.claude/review-model new file mode 100644 index 0000000000..cc494a456b --- /dev/null +++ b/.claude/review-model @@ -0,0 +1 @@ +claude-opus-4-5 \ No newline at end of file From cb075aecbab716b6eaff7384c494f299b833da5a Mon Sep 17 00:00:00 2001 From: Loofeit Date: Tue, 31 Mar 2026 09:16:17 +0200 Subject: [PATCH 5/5] Add lavinmqctl list_etcd_members command Lists etcd cluster members using the MemberList API, showing name, peer_urls, client_urls and learner status. Co-Authored-By: Claude Sonnet 4.6 --- src/lavinmq/etcd.cr | 12 ++++++++++++ src/lavinmqctl/cli.cr | 13 +++++++++++++ 2 files changed, 25 insertions(+) diff --git a/src/lavinmq/etcd.cr b/src/lavinmq/etcd.cr index 4c2f1a47d4..46496f27d2 100644 --- a/src/lavinmq/etcd.cr +++ b/src/lavinmq/etcd.cr @@ -162,6 +162,18 @@ module LavinMQ end end + def member_list : Array(NamedTuple(name: String, peer_urls: String, client_urls: String, learner: Bool)) + json = post("/v3/cluster/member/list", "{}") + members = json["members"]?.try(&.as_a) || return [] of NamedTuple(name: String, peer_urls: String, client_urls: String, learner: Bool) + members.map do |m| + name = m["name"]?.try(&.as_s) || "" + peer_urls = m["peerURLs"]?.try(&.as_a.map(&.as_s).join(",")) || "" + client_urls = m["clientURLs"]?.try(&.as_a.map(&.as_s).join(",")) || "" + learner = m["isLearner"]?.try(&.as_bool) || false + {name: name, peer_urls: peer_urls, client_urls: client_urls, learner: learner} + end + end + def election_leader(name) : String? json = post("/v3/election/leader", %({"name":"#{Base64.strict_encode name}"})) if value = json.dig?("kv", "value") diff --git a/src/lavinmqctl/cli.cr b/src/lavinmqctl/cli.cr index c97a094667..cf5d342441 100644 --- a/src/lavinmqctl/cli.cr +++ b/src/lavinmqctl/cli.cr @@ -235,6 +235,10 @@ class LavinMQCtl @cmd = "list_in_sync_replicas" self.banner = "Usage: #{PROGRAM_NAME} list_in_sync_replicas" end + @parser.on("list_etcd_members", "List etcd cluster members") do + @cmd = "list_etcd_members" + self.banner = "Usage: #{PROGRAM_NAME} list_etcd_members" + end @parser.on("-v", "--version", "Show version") { @io.puts LavinMQ::VERSION; exit 0 } @parser.on("--build-info", "Show build information") { @io.puts LavinMQ::BUILD_INFO; exit 0 } @@ -292,6 +296,7 @@ class LavinMQCtl when "add_federation" then add_federation when "delete_federation" then delete_federation when "list_in_sync_replicas" then list_in_sync_replicas + when "list_etcd_members" then list_etcd_members when "stop_app" when "start_app" else @@ -947,6 +952,14 @@ class LavinMQCtl handle_response(resp, 204) end + private def list_etcd_members + endpoints = @options["etcd-endpoints"]? || "localhost:2379" + etcd = LavinMQ::Etcd.new(endpoints) + members = etcd.member_list + abort "No members found. Is etcd running?" if members.empty? + output members, ["name", "peer_urls", "client_urls", "learner"] + end + private def list_in_sync_replicas endpoints = @options["etcd-endpoints"]? || "localhost:2379" prefix = @options["etcd-prefix"]? || "lavinmq"