diff --git a/spec/api/federation_links_spec.cr b/spec/api/federation_links_spec.cr new file mode 100644 index 0000000000..0e7ca8bf93 --- /dev/null +++ b/spec/api/federation_links_spec.cr @@ -0,0 +1,107 @@ +require "../spec_helper.cr" +require "uri" + +def create_federation_link(server, upstream_name = "spec-upstream", exchange_name = "spec-ex") + vhost = server.vhosts["/"] + upstream = LavinMQ::Federation::Upstream.new(vhost, upstream_name, server.amqp_url, exchange_name) + vhost.upstreams.add(upstream) + + vhost.declare_exchange(exchange_name, "topic", durable: false, auto_delete: false) + exchange = vhost.exchanges[exchange_name] + vhost.declare_queue("spec-q", durable: false, auto_delete: false) + queue = vhost.queues["spec-q"] + exchange.bind(queue, "#") + + link = upstream.link(exchange) + wait_for { link.state.running? } + {upstream, link} +end + +describe LavinMQ::HTTP::MainController do + describe "PUT /api/federation-links/:vhost/:upstream/:name/pause" do + it "should return 404 for non-existing link" do + with_http_server do |http, _s| + vhost_url = URI.encode_path_segment("/") + response = http.put("/api/federation-links/#{vhost_url}/non-existing/non-existing/pause") + response.status_code.should eq 404 + end + end + + it "should pause link and return 204" do + with_http_server do |http, s| + upstream, link = create_federation_link(s) + vhost_url = URI.encode_path_segment("/") + upstream_url = URI.encode_path_segment(upstream.name) + link_url = URI.encode_path_segment(link.name) + + response = http.put("/api/federation-links/#{vhost_url}/#{upstream_url}/#{link_url}/pause") + response.status_code.should eq 204 + + link.state.paused?.should be_true + ensure + upstream.try &.close + end + end + + it "should return 422 if link is already paused" do + with_http_server do |http, s| + upstream, link = create_federation_link(s) + link.pause + wait_for { link.state.paused? } + + vhost_url = URI.encode_path_segment("/") + upstream_url = URI.encode_path_segment(upstream.name) + link_url = URI.encode_path_segment(link.name) + + response = http.put("/api/federation-links/#{vhost_url}/#{upstream_url}/#{link_url}/pause") + response.status_code.should eq 422 + ensure + upstream.try &.close + end + end + end + + describe "PUT /api/federation-links/:vhost/:upstream/:name/resume" do + it "should return 404 for non-existing link" do + with_http_server do |http, _s| + vhost_url = URI.encode_path_segment("/") + response = http.put("/api/federation-links/#{vhost_url}/non-existing/non-existing/resume") + response.status_code.should eq 404 + end + end + + it "should resume a paused link and return 204" do + with_http_server do |http, s| + upstream, link = create_federation_link(s) + link.pause + wait_for { link.state.paused? } + + vhost_url = URI.encode_path_segment("/") + upstream_url = URI.encode_path_segment(upstream.name) + link_url = URI.encode_path_segment(link.name) + + response = http.put("/api/federation-links/#{vhost_url}/#{upstream_url}/#{link_url}/resume") + response.status_code.should eq 204 + + wait_for { link.state.running? } + ensure + upstream.try &.close + end + end + + it "should return 422 if link is already running" do + with_http_server do |http, s| + upstream, link = create_federation_link(s) + + vhost_url = URI.encode_path_segment("/") + upstream_url = URI.encode_path_segment(upstream.name) + link_url = URI.encode_path_segment(link.name) + + response = http.put("/api/federation-links/#{vhost_url}/#{upstream_url}/#{link_url}/resume") + response.status_code.should eq 422 + ensure + upstream.try &.close + end + end + end +end diff --git a/spec/upstream_spec.cr b/spec/upstream_spec.cr index 90d60be70d..d43f06d624 100644 --- a/spec/upstream_spec.cr +++ b/spec/upstream_spec.cr @@ -793,6 +793,136 @@ describe LavinMQ::Federation::Upstream do end end + describe "pause and resume" do + it "should pause an exchange link" do + with_amqp_server do |s| + vhost = s.vhosts["/"] + upstream = LavinMQ::Federation::Upstream.new(vhost, "ef pause test", s.amqp_url, "upstream_ex") + + with_channel(s) do |ch| + downstream_ex = ch.exchange("downstream_ex", "topic") + downstream_q = ch.queue("downstream_q") + downstream_q.bind(downstream_ex.name, "#") + link = upstream.link(vhost.exchanges[downstream_ex.name]) + wait_for { link.state.running? } + + link.pause + wait_for { link.state.paused? } + link.state.paused?.should be_true + end + ensure + upstream.try &.close + end + end + + it "should resume a paused exchange link" do + with_amqp_server do |s| + vhost = s.vhosts["/"] + upstream = LavinMQ::Federation::Upstream.new(vhost, "ef resume test", s.amqp_url, "upstream_ex") + + with_channel(s) do |ch| + downstream_ex = ch.exchange("downstream_ex", "topic") + downstream_q = ch.queue("downstream_q") + downstream_q.bind(downstream_ex.name, "#") + link = upstream.link(vhost.exchanges[downstream_ex.name]) + wait_for { link.state.running? } + + link.pause + wait_for { link.state.paused? } + + link.resume + wait_for { link.state.running? } + link.state.running?.should be_true + end + ensure + upstream.try &.close + end + end + + it "should not federate messages while paused" do + with_amqp_server do |s| + vhost = s.vhosts["/"] + upstream = LavinMQ::Federation::Upstream.new(vhost, "ef pause msg test", s.amqp_url, "upstream_ex") + + with_channel(s) do |ch| + downstream_ex = ch.exchange("downstream_ex", "topic") + downstream_q = ch.queue("downstream_q") + downstream_q.bind(downstream_ex.name, "#") + link = upstream.link(vhost.exchanges[downstream_ex.name]) + wait_for { link.state.running? } + + link.pause + wait_for { link.state.paused? } + + upstream_ex = ch.exchange("upstream_ex", "topic", passive: true) + upstream_ex.publish "should not arrive", "rk" + + # Give some time for the message to potentially arrive + 10.times { Fiber.yield } + sleep 50.milliseconds + + msgs = [] of AMQP::Client::DeliverMessage + downstream_q.subscribe { |msg| msgs << msg } + sleep 50.milliseconds + msgs.size.should eq 0 + end + ensure + upstream.try &.close + end + end + + it "should resume federating messages after resume" do + with_amqp_server do |s| + vhost = s.vhosts["/"] + upstream = LavinMQ::Federation::Upstream.new(vhost, "ef resume msg test", s.amqp_url, "upstream_ex") + + with_channel(s) do |ch| + downstream_ex = ch.exchange("downstream_ex", "topic") + downstream_q = ch.queue("downstream_q") + downstream_q.bind(downstream_ex.name, "#") + link = upstream.link(vhost.exchanges[downstream_ex.name]) + wait_for { link.state.running? } + + link.pause + wait_for { link.state.paused? } + + link.resume + wait_for { link.state.running? } + + upstream_ex = ch.exchange("upstream_ex", "topic", passive: true) + upstream_ex.publish "federate after resume", "rk" + msgs = [] of AMQP::Client::DeliverMessage + downstream_q.subscribe { |msg| msgs << msg } + wait_for { msgs.size == 1 } + msgs.first.body_io.to_s.should eq "federate after resume" + end + ensure + upstream.try &.close + end + end + + it "should show paused status in details_tuple" do + with_amqp_server do |s| + vhost = s.vhosts["/"] + upstream = LavinMQ::Federation::Upstream.new(vhost, "ef status test", s.amqp_url, "upstream_ex") + + with_channel(s) do |ch| + downstream_ex = ch.exchange("downstream_ex", "topic") + downstream_q = ch.queue("downstream_q") + downstream_q.bind(downstream_ex.name, "#") + link = upstream.link(vhost.exchanges[downstream_ex.name]) + wait_for { link.state.running? } + + link.pause + wait_for { link.state.paused? } + link.details_tuple[:status].should eq "paused" + end + ensure + upstream.try &.close + end + end + end + describe "exchange federation chain" do it "append to x-bound-from" do with_http_server do |_http, s| diff --git a/src/lavinmq/federation/link.cr b/src/lavinmq/federation/link.cr index 7bbec55b30..c75573e9bc 100644 --- a/src/lavinmq/federation/link.cr +++ b/src/lavinmq/federation/link.cr @@ -1,4 +1,5 @@ require "amqp-client" +require "digest/sha1" require "../observable" require "../logger" require "../sortable_json" @@ -22,6 +23,7 @@ module LavinMQ @upstream_channel : ::AMQP::Client::Channel? @metadata : ::Log::Metadata @state_changed = Channel(State?).new + @paused_file_path : String def initialize(@upstream : Upstream) @metadata = ::Log::Metadata.new(nil, {vhost: @upstream.vhost.name, upstream: @upstream.name}) @@ -29,6 +31,11 @@ module LavinMQ uri = @upstream.uri ui = uri.userinfo @scrubbed_uri = ui.nil? ? uri.to_s : uri.to_s.sub("#{ui}@", "") + hash = Digest::SHA1.hexdigest("#{@upstream.vhost.name}/#{@upstream.name}/#{name}") + @paused_file_path = File.join(Config.instance.data_dir, "federation.#{hash}.paused") + if File.exists?(@paused_file_path) + @state = State::Paused + end end def details_tuple @@ -54,11 +61,49 @@ module LavinMQ end def run + if @state.paused? + @state_changed.close + return + end @log.info { "Starting" } spawn(run_loop, name: "Federation link #{@upstream.vhost.name}/#{name}") Fiber.yield end + def pause + return if @state.terminated? + File.write(@paused_file_path, name) + @log.info { "Pausing federation link #{name}" } + state(State::Paused) + @upstream_connection.try &.close + end + + def resume + return unless @state.paused? + delete_paused_file + @log.info { "Resuming federation link #{name}" } + # Wait for the old run_loop fiber to fully exit. + # It closes @state_changed in its ensure block, which unblocks this receive. + old_channel = @state_changed + old_channel.receive? unless old_channel.closed? + @state_changed = Channel(State?).new + state(State::Stopped) + spawn(run_loop, name: "Federation link #{@upstream.vhost.name}/#{name}") + Fiber.yield + end + + def paused? + @state.paused? + end + + def running? + @state.running? + end + + def delete_paused_file + FileUtils.rm(@paused_file_path) if File.exists?(@paused_file_path) + end + private def state(state) @log.debug { "state change from=#{@state} to=#{state}" } @last_changed = RoughTime.unix_ms @@ -70,34 +115,41 @@ module LavinMQ # Does not trigger reconnect, but a graceful close def terminate return if @state.terminated? + delete_paused_file state(State::Terminating) @upstream_connection.try &.close end + private def should_stop_loop? + stop_link? || @state.paused? + end + private def run_loop loop do - break if stop_link? + break if should_stop_loop? state(State::Starting) start_link - break if stop_link? + break if should_stop_loop? state(State::Stopped) wait_before_reconnect - break if stop_link? + break if should_stop_loop? @log.info { "Federation try reconnect" } rescue ex - break if stop_link? + break if should_stop_loop? @log.info { "Federation link state=#{@state} error=#{ex.inspect}" } state(State::Stopped) @error = ex.message wait_before_reconnect - break if stop_link? + break if should_stop_loop? @log.info { "Federation try reconnect" } end @log.info { "Federation link stopped" } ensure - state(State::Terminated) + unless @state.paused? + state(State::Terminated) + @log.info { "Terminated" } + end @state_changed.close - @log.info { "Terminated" } end private def wait_before_reconnect @@ -108,6 +160,7 @@ module LavinMQ break when event = @state_changed.receive? break if stop_link?(event) + break if event.try &.paused? @log.debug { "#wait_before_reconnect @state_changed.received? triggerd " \ "@state_changed.closed?=#{@state_changed.closed?}" } end @@ -167,6 +220,7 @@ module LavinMQ ::AMQP::Client.start(upstream_uri) do |upstream_connection| upstream_connection.on_close do next if stop_link? + next if @state.paused? state(State::Stopped) end yield @upstream_connection = upstream_connection @@ -177,6 +231,7 @@ module LavinMQ Starting Running Stopped + Paused Terminating Terminated Error diff --git a/src/lavinmq/federation/upstream.cr b/src/lavinmq/federation/upstream.cr index 5cae82c9b8..251d996932 100644 --- a/src/lavinmq/federation/upstream.cr +++ b/src/lavinmq/federation/upstream.cr @@ -39,6 +39,10 @@ module LavinMQ @q_links.values + @ex_links.values end + def find_link(resource_name : String) : Link? + @q_links[resource_name]? || @ex_links[resource_name]? + end + # declare queue on upstream # consume queue and publish to downstream exchange # declare upstream exchange (passive) diff --git a/src/lavinmq/federation/upstream_store.cr b/src/lavinmq/federation/upstream_store.cr index d40459224f..ed217c37aa 100644 --- a/src/lavinmq/federation/upstream_store.cr +++ b/src/lavinmq/federation/upstream_store.cr @@ -61,6 +61,10 @@ module LavinMQ end end + def find_link(upstream_name : String, resource_name : String) : Upstream::Link? + @upstreams[upstream_name]?.try &.find_link(resource_name) + end + def link(name, resource : Queue | Exchange) @upstreams[name]?.try &.link(resource) end diff --git a/src/lavinmq/http/controller/main.cr b/src/lavinmq/http/controller/main.cr index b7ce6cf22e..76af770b16 100644 --- a/src/lavinmq/http/controller/main.cr +++ b/src/lavinmq/http/controller/main.cr @@ -159,6 +159,36 @@ module LavinMQ end end + put "/api/federation-links/:vhost/:upstream/:name/pause" do |context, params| + with_vhost(context, params) do |vhost| + if link = vhost.upstreams.find_link(params["upstream"], params["name"]) + if !link.running? + context.response.status_code = 422 + next + end + link.pause + context.response.status_code = 204 + else + context.response.status_code = 404 + end + end + end + + put "/api/federation-links/:vhost/:upstream/:name/resume" do |context, params| + with_vhost(context, params) do |vhost| + if link = vhost.upstreams.find_link(params["upstream"], params["name"]) + if !link.paused? + context.response.status_code = 422 + next + end + link.resume + context.response.status_code = 204 + else + context.response.status_code = 404 + end + end + end + get "/api/extensions" do |context, _params| Tuple.new.to_json(context.response) context diff --git a/static/js/federation.js b/static/js/federation.js index b73f99c1ef..1c438203f4 100644 --- a/static/js/federation.js +++ b/static/js/federation.js @@ -50,18 +50,43 @@ const upstreamsTable = Table.renderTable('upstreamTable', utOpts, (tr, item) => Table.renderCell(tr, 11, buttons, 'right') }) -const linksOpts = { url: linksUrl, keyColumns: ['vhost', 'name'], countId: 'links-count' } +const linksOpts = { url: linksUrl, keyColumns: ['vhost', 'upstream', 'resource'], countId: 'links-count' } -Table.renderTable('linksTable', linksOpts, (tr, item) => { +const linksTable = Table.renderTable('linksTable', linksOpts, (tr, item) => { const resourceDiv = document.createElement('span') resourceDiv.textContent = item.resource resourceDiv.appendChild(document.createElement('br')) resourceDiv.appendChild(document.createElement('small')).textContent = item.type Table.renderCell(tr, 0, item.vhost) - Table.renderCell(tr, 1, item.name) + Table.renderCell(tr, 1, item.upstream) Table.renderCell(tr, 2, decodeURI(item.uri)) Table.renderCell(tr, 3, resourceDiv) - Table.renderCell(tr, 4, item.timestamp) + Table.renderCell(tr, 4, item.status) + Table.renderCell(tr, 5, item.timestamp) + + const isRunning = ['running', 'starting'].includes(item.status) + const pauseLabel = isRunning ? 'Pause' : 'Resume' + const pauseBtn = DOM.button.edit({ + click: function () { + const action = isRunning ? 'pause' : 'resume' + const url = HTTP.url`api/federation-links/${item.vhost}/${item.upstream}/${item.resource}/${action}` + if (!window.confirm('Are you sure?')) return + HTTP.request('PUT', url) + .then(() => { + linksTable.reload() + DOM.toast(`Federation link ${item.resource} ${isRunning ? 'paused' : 'resumed'}`) + }) + .catch((err) => { + console.error(err) + DOM.toast.error(`Federation link ${item.resource} failed to ${isRunning ? 'pause' : 'resume'}`) + }) + }, + text: pauseLabel + }) + const buttons = document.createElement('div') + buttons.classList.add('buttons') + buttons.append(pauseBtn) + Table.renderCell(tr, 6, buttons, 'right') }) document.querySelector('#createUpstream').addEventListener('submit', function (evt) { diff --git a/views/federation.ecr b/views/federation.ecr index ab8e2b216e..64b52f113c 100644 --- a/views/federation.ecr +++ b/views/federation.ecr @@ -49,7 +49,9 @@