Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 107 additions & 0 deletions spec/api/federation_links_spec.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
require "../spec_helper.cr"
require "uri"

def create_federation_link(server, upstream_name = "spec-upstream", exchange_name = "spec-ex")
vhost = server.vhosts["/"]
upstream = LavinMQ::Federation::Upstream.new(vhost, upstream_name, server.amqp_url, exchange_name)
vhost.upstreams.add(upstream)

vhost.declare_exchange(exchange_name, "topic", durable: false, auto_delete: false)
exchange = vhost.exchanges[exchange_name]
vhost.declare_queue("spec-q", durable: false, auto_delete: false)
queue = vhost.queues["spec-q"]
exchange.bind(queue, "#")

link = upstream.link(exchange)
wait_for { link.state.running? }
{upstream, link}
end

describe LavinMQ::HTTP::MainController do
describe "PUT /api/federation-links/:vhost/:upstream/:name/pause" do
it "should return 404 for non-existing link" do
with_http_server do |http, _s|
vhost_url = URI.encode_path_segment("/")
response = http.put("/api/federation-links/#{vhost_url}/non-existing/non-existing/pause")
response.status_code.should eq 404
end
end

it "should pause link and return 204" do
with_http_server do |http, s|
upstream, link = create_federation_link(s)
vhost_url = URI.encode_path_segment("/")
upstream_url = URI.encode_path_segment(upstream.name)
link_url = URI.encode_path_segment(link.name)

response = http.put("/api/federation-links/#{vhost_url}/#{upstream_url}/#{link_url}/pause")
response.status_code.should eq 204

link.state.paused?.should be_true
ensure
upstream.try &.close
end
end

it "should return 422 if link is already paused" do
with_http_server do |http, s|
upstream, link = create_federation_link(s)
link.pause
wait_for { link.state.paused? }

vhost_url = URI.encode_path_segment("/")
upstream_url = URI.encode_path_segment(upstream.name)
link_url = URI.encode_path_segment(link.name)

response = http.put("/api/federation-links/#{vhost_url}/#{upstream_url}/#{link_url}/pause")
response.status_code.should eq 422
ensure
upstream.try &.close
end
end
end

describe "PUT /api/federation-links/:vhost/:upstream/:name/resume" do
it "should return 404 for non-existing link" do
with_http_server do |http, _s|
vhost_url = URI.encode_path_segment("/")
response = http.put("/api/federation-links/#{vhost_url}/non-existing/non-existing/resume")
response.status_code.should eq 404
end
end

it "should resume a paused link and return 204" do
with_http_server do |http, s|
upstream, link = create_federation_link(s)
link.pause
wait_for { link.state.paused? }

vhost_url = URI.encode_path_segment("/")
upstream_url = URI.encode_path_segment(upstream.name)
link_url = URI.encode_path_segment(link.name)

response = http.put("/api/federation-links/#{vhost_url}/#{upstream_url}/#{link_url}/resume")
response.status_code.should eq 204

wait_for { link.state.running? }
ensure
upstream.try &.close
end
end

it "should return 422 if link is already running" do
with_http_server do |http, s|
upstream, link = create_federation_link(s)

vhost_url = URI.encode_path_segment("/")
upstream_url = URI.encode_path_segment(upstream.name)
link_url = URI.encode_path_segment(link.name)

response = http.put("/api/federation-links/#{vhost_url}/#{upstream_url}/#{link_url}/resume")
response.status_code.should eq 422
ensure
upstream.try &.close
end
end
end
end
130 changes: 130 additions & 0 deletions spec/upstream_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -793,6 +793,136 @@ describe LavinMQ::Federation::Upstream do
end
end

describe "pause and resume" do
it "should pause an exchange link" do
with_amqp_server do |s|
vhost = s.vhosts["/"]
upstream = LavinMQ::Federation::Upstream.new(vhost, "ef pause test", s.amqp_url, "upstream_ex")

with_channel(s) do |ch|
downstream_ex = ch.exchange("downstream_ex", "topic")
downstream_q = ch.queue("downstream_q")
downstream_q.bind(downstream_ex.name, "#")
link = upstream.link(vhost.exchanges[downstream_ex.name])
wait_for { link.state.running? }

link.pause
wait_for { link.state.paused? }
link.state.paused?.should be_true
end
ensure
upstream.try &.close
end
end

it "should resume a paused exchange link" do
with_amqp_server do |s|
vhost = s.vhosts["/"]
upstream = LavinMQ::Federation::Upstream.new(vhost, "ef resume test", s.amqp_url, "upstream_ex")

with_channel(s) do |ch|
downstream_ex = ch.exchange("downstream_ex", "topic")
downstream_q = ch.queue("downstream_q")
downstream_q.bind(downstream_ex.name, "#")
link = upstream.link(vhost.exchanges[downstream_ex.name])
wait_for { link.state.running? }

link.pause
wait_for { link.state.paused? }

link.resume
wait_for { link.state.running? }
link.state.running?.should be_true
end
ensure
upstream.try &.close
end
end

it "should not federate messages while paused" do
with_amqp_server do |s|
vhost = s.vhosts["/"]
upstream = LavinMQ::Federation::Upstream.new(vhost, "ef pause msg test", s.amqp_url, "upstream_ex")

with_channel(s) do |ch|
downstream_ex = ch.exchange("downstream_ex", "topic")
downstream_q = ch.queue("downstream_q")
downstream_q.bind(downstream_ex.name, "#")
link = upstream.link(vhost.exchanges[downstream_ex.name])
wait_for { link.state.running? }

link.pause
wait_for { link.state.paused? }

upstream_ex = ch.exchange("upstream_ex", "topic", passive: true)
upstream_ex.publish "should not arrive", "rk"

# Give some time for the message to potentially arrive
10.times { Fiber.yield }
sleep 50.milliseconds

msgs = [] of AMQP::Client::DeliverMessage
downstream_q.subscribe { |msg| msgs << msg }
sleep 50.milliseconds
msgs.size.should eq 0
end
ensure
upstream.try &.close
end
end

it "should resume federating messages after resume" do
with_amqp_server do |s|
vhost = s.vhosts["/"]
upstream = LavinMQ::Federation::Upstream.new(vhost, "ef resume msg test", s.amqp_url, "upstream_ex")

with_channel(s) do |ch|
downstream_ex = ch.exchange("downstream_ex", "topic")
downstream_q = ch.queue("downstream_q")
downstream_q.bind(downstream_ex.name, "#")
link = upstream.link(vhost.exchanges[downstream_ex.name])
wait_for { link.state.running? }

link.pause
wait_for { link.state.paused? }

link.resume
wait_for { link.state.running? }

upstream_ex = ch.exchange("upstream_ex", "topic", passive: true)
upstream_ex.publish "federate after resume", "rk"
msgs = [] of AMQP::Client::DeliverMessage
downstream_q.subscribe { |msg| msgs << msg }
wait_for { msgs.size == 1 }
msgs.first.body_io.to_s.should eq "federate after resume"
end
ensure
upstream.try &.close
end
end

it "should show paused status in details_tuple" do
with_amqp_server do |s|
vhost = s.vhosts["/"]
upstream = LavinMQ::Federation::Upstream.new(vhost, "ef status test", s.amqp_url, "upstream_ex")

with_channel(s) do |ch|
downstream_ex = ch.exchange("downstream_ex", "topic")
downstream_q = ch.queue("downstream_q")
downstream_q.bind(downstream_ex.name, "#")
link = upstream.link(vhost.exchanges[downstream_ex.name])
wait_for { link.state.running? }

link.pause
wait_for { link.state.paused? }
link.details_tuple[:status].should eq "paused"
end
ensure
upstream.try &.close
end
end
end

describe "exchange federation chain" do
it "append to x-bound-from" do
with_http_server do |_http, s|
Expand Down
69 changes: 62 additions & 7 deletions src/lavinmq/federation/link.cr
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
require "amqp-client"
require "digest/sha1"
require "../observable"
require "../logger"
require "../sortable_json"
Expand All @@ -22,13 +23,19 @@ module LavinMQ
@upstream_channel : ::AMQP::Client::Channel?
@metadata : ::Log::Metadata
@state_changed = Channel(State?).new
@paused_file_path : String

def initialize(@upstream : Upstream)
@metadata = ::Log::Metadata.new(nil, {vhost: @upstream.vhost.name, upstream: @upstream.name})
@log = Logger.new(Log, @metadata)
uri = @upstream.uri
ui = uri.userinfo
@scrubbed_uri = ui.nil? ? uri.to_s : uri.to_s.sub("#{ui}@", "")
hash = Digest::SHA1.hexdigest("#{@upstream.vhost.name}/#{@upstream.name}/#{name}")
@paused_file_path = File.join(Config.instance.data_dir, "federation.#{hash}.paused")
if File.exists?(@paused_file_path)
@state = State::Paused
end
end

def details_tuple
Expand All @@ -54,11 +61,49 @@ module LavinMQ
end

def run
if @state.paused?
@state_changed.close
return
end
@log.info { "Starting" }
spawn(run_loop, name: "Federation link #{@upstream.vhost.name}/#{name}")
Fiber.yield
end

def pause
return if @state.terminated?
File.write(@paused_file_path, name)
@log.info { "Pausing federation link #{name}" }
state(State::Paused)
@upstream_connection.try &.close
end

def resume
return unless @state.paused?
delete_paused_file
@log.info { "Resuming federation link #{name}" }
# Wait for the old run_loop fiber to fully exit.
# It closes @state_changed in its ensure block, which unblocks this receive.
old_channel = @state_changed
old_channel.receive? unless old_channel.closed?
@state_changed = Channel(State?).new
state(State::Stopped)
spawn(run_loop, name: "Federation link #{@upstream.vhost.name}/#{name}")
Fiber.yield
end

def paused?
@state.paused?
end

def running?
@state.running?
end

def delete_paused_file
FileUtils.rm(@paused_file_path) if File.exists?(@paused_file_path)
end

private def state(state)
@log.debug { "state change from=#{@state} to=#{state}" }
@last_changed = RoughTime.unix_ms
Expand All @@ -70,34 +115,41 @@ module LavinMQ
# Does not trigger reconnect, but a graceful close
def terminate
return if @state.terminated?
delete_paused_file
state(State::Terminating)
@upstream_connection.try &.close
end

private def should_stop_loop?
stop_link? || @state.paused?
end

private def run_loop
loop do
break if stop_link?
break if should_stop_loop?
state(State::Starting)
start_link
break if stop_link?
break if should_stop_loop?
state(State::Stopped)
wait_before_reconnect
break if stop_link?
break if should_stop_loop?
@log.info { "Federation try reconnect" }
rescue ex
break if stop_link?
break if should_stop_loop?
@log.info { "Federation link state=#{@state} error=#{ex.inspect}" }
state(State::Stopped)
@error = ex.message
wait_before_reconnect
break if stop_link?
break if should_stop_loop?
@log.info { "Federation try reconnect" }
end
@log.info { "Federation link stopped" }
ensure
state(State::Terminated)
unless @state.paused?
state(State::Terminated)
@log.info { "Terminated" }
end
@state_changed.close
@log.info { "Terminated" }
end

private def wait_before_reconnect
Expand All @@ -108,6 +160,7 @@ module LavinMQ
break
when event = @state_changed.receive?
break if stop_link?(event)
break if event.try &.paused?
@log.debug { "#wait_before_reconnect @state_changed.received? triggerd " \
"@state_changed.closed?=#{@state_changed.closed?}" }
end
Expand Down Expand Up @@ -167,6 +220,7 @@ module LavinMQ
::AMQP::Client.start(upstream_uri) do |upstream_connection|
upstream_connection.on_close do
next if stop_link?
next if @state.paused?
state(State::Stopped)
end
yield @upstream_connection = upstream_connection
Expand All @@ -177,6 +231,7 @@ module LavinMQ
Starting
Running
Stopped
Paused
Terminating
Terminated
Error
Expand Down
Loading
Loading