Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions .claude/review-criteria.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
Report only issues you are confident about:
- Bugs and logic errors
- Security vulnerabilities
- Performance issues (especially allocations in hot paths)
- Missing error handling
- Crystal anti-patterns

Quality bar:
- Only report issues you'd block a PR for. Skip stylistic nits and minor suggestions.
- If you start describing an issue and realize it's not actually a problem, drop it. Do not include findings you've talked yourself out of.
- Each finding must have a concrete file:line reference and a clear explanation of the actual impact.

If no problems found, say "No issues found."

Do NOT:
- Check CI status or previous commits
- Run specs
- Use emojis
- Explore the codebase broadly — only read specific files when needed
1 change: 1 addition & 0 deletions .claude/review-model
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
claude-opus-4-5
23 changes: 22 additions & 1 deletion src/lavinmq/etcd.cr
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ module LavinMQ
range_end = key.to_slice.dup
range_end.update(-1) { |v| v + 1 }
json = post("/v3/kv/range", %({"key":"#{Base64.strict_encode key}","range_end":"#{Base64.strict_encode range_end}"}))
kvs = json["kvs"].as_a
kvs = json["kvs"]?.try(&.as_a) || return Hash(String, String).new
h = Hash(String, String).new(initial_capacity: kvs.size)
kvs.each do |kv|
key = Base64.decode_string kv["key"].as_s
Expand Down Expand Up @@ -162,6 +162,27 @@ module LavinMQ
end
end

def member_list : Array(NamedTuple(name: String, peer_urls: String, client_urls: String, learner: Bool))
json = post("/v3/cluster/member/list", "{}")
members = json["members"]?.try(&.as_a) || return [] of NamedTuple(name: String, peer_urls: String, client_urls: String, learner: Bool)
members.map do |m|
name = m["name"]?.try(&.as_s) || ""
peer_urls = m["peerURLs"]?.try(&.as_a.map(&.as_s).join(",")) || ""
client_urls = m["clientURLs"]?.try(&.as_a.map(&.as_s).join(",")) || ""
learner = m["isLearner"]?.try(&.as_bool) || false
{name: name, peer_urls: peer_urls, client_urls: client_urls, learner: learner}
end
end

def election_leader(name) : String?
json = post("/v3/election/leader", %({"name":"#{Base64.strict_encode name}"}))
if value = json.dig?("kv", "value")
Base64.decode_string(value.as_s)
end
rescue Error
nil
end

private def post(path, body) : JSON::Any
with_tcp do |conn|
return post_request(conn.socket, conn.address, conn.auth, path, body)
Expand Down
67 changes: 67 additions & 0 deletions src/lavinmqctl/cli.cr
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require "../lavinmq/shovel/constants"
require "../lavinmq/federation/constants"
require "../lavinmq/definitions_generator"
require "../lavinmq/auth/user"
require "../lavinmq/etcd"

class LavinMQCtl
@options = {} of String => String
Expand Down Expand Up @@ -60,6 +61,12 @@ class LavinMQCtl
if host = ENV["LAVINMQCTL_HOST"]?
@options["host"] = host
end
if ep = ENV["LAVINMQ_CLUSTERING_ETCD_ENDPOINTS"]?
@options["etcd-endpoints"] = ep
end
if pfx = ENV["LAVINMQ_CLUSTERING_ETCD_PREFIX"]?
@options["etcd-prefix"] = pfx
end
global_options
parse_cmd
end
Expand Down Expand Up @@ -224,6 +231,15 @@ class LavinMQCtl
@args["queue"] = JSON::Any.new(v)
end
end
@parser.on("list_in_sync_replicas", "List nodes in the in-sync replica set") do
@cmd = "list_in_sync_replicas"
self.banner = "Usage: #{PROGRAM_NAME} list_in_sync_replicas"
end
@parser.on("list_etcd_members", "List etcd cluster members") do
@cmd = "list_etcd_members"
self.banner = "Usage: #{PROGRAM_NAME} list_etcd_members"
end

@parser.on("-v", "--version", "Show version") { @io.puts LavinMQ::VERSION; exit 0 }
@parser.on("--build-info", "Show build information") { @io.puts LavinMQ::BUILD_INFO; exit 0 }
@parser.on("-h", "--help", "Show this help") do
Expand Down Expand Up @@ -279,6 +295,8 @@ class LavinMQCtl
when "list_federations" then list_federations
when "add_federation" then add_federation
when "delete_federation" then delete_federation
when "list_in_sync_replicas" then list_in_sync_replicas
when "list_etcd_members" then list_etcd_members
when "stop_app"
when "start_app"
else
Expand Down Expand Up @@ -389,6 +407,14 @@ class LavinMQCtl
end
@options["format"] = v
end
@parser.on("--etcd-endpoints=ENDPOINTS",
"Comma-separated etcd endpoints (or LAVINMQ_CLUSTERING_ETCD_ENDPOINTS)") do |v|
@options["etcd-endpoints"] = v
end
@parser.on("--etcd-prefix=PREFIX",
"etcd key prefix used by LavinMQ (default: lavinmq)") do |v|
@options["etcd-prefix"] = v
end
end

private def quiet?
Expand Down Expand Up @@ -925,4 +951,45 @@ class LavinMQCtl
resp = http.delete url
handle_response(resp, 204)
end

private def list_etcd_members
endpoints = @options["etcd-endpoints"]? || "localhost:2379"
etcd = LavinMQ::Etcd.new(endpoints)
members = etcd.member_list
abort "No members found. Is etcd running?" if members.empty?
output members, ["name", "peer_urls", "client_urls", "learner"]
end

private def list_in_sync_replicas
endpoints = @options["etcd-endpoints"]? || "localhost:2379"
prefix = @options["etcd-prefix"]? || "lavinmq"
etcd = LavinMQ::Etcd.new(endpoints)

isr_str = etcd.get("#{prefix}/isr") ||
abort "No ISR data found in etcd (key: #{prefix}/isr). Is LavinMQ running in a cluster?"
node_ids = isr_str.split(',').map(&.strip).reject(&.empty?)

# Build node_id -> address from etcd election candidates.
# Each candidate stores its advertised_uri under {prefix}/leader/{hex_lease_id}
# where hex_lease_id.to_i64(16).to_s(36) == node_id
candidates = etcd.get_prefix("#{prefix}/leader/")
id_to_uri = candidates.each_with_object(Hash(String, String).new) do |(key, uri), h|
hex = key.split('/').last
h[hex.to_i64(16).to_s(36)] = uri
end

leader_uri = etcd.election_leader("#{prefix}/leader")

output node_ids.map { |id|
address = id_to_uri[id]? || ""
role = if leader_uri.nil?
"unknown"
elsif address == leader_uri
"leader"
else
"follower"
end
{node_id: id, address: address, role: role}
}, ["node_id", "address", "role"]
end
end
Loading