Skip to content

Commit 825cd99

Browse files
committed
Fix starting a queue on a specific node
The `:node` option was incorrectly preserved when starting a queue, which would crash it. Now the `:node` is dropped after scoping the start signal. Closes #1382
1 parent a68363f commit 825cd99

File tree

2 files changed

+16
-3
lines changed

2 files changed

+16
-3
lines changed

lib/oban.ex

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -954,14 +954,15 @@ defmodule Oban do
954954
def start_queue(name \\ __MODULE__, [_ | _] = opts) do
955955
conf = config(name)
956956

957-
validate_queue_opts!(opts, ~w(local_only queue)a)
957+
validate_queue_opts!(opts, ~w(local_only node queue)a)
958958
validate_engine_meta!(conf, opts)
959959

960960
data =
961961
opts
962962
|> Map.new()
963963
|> Map.put(:action, :start)
964964
|> Map.put(:ident, scope_signal(conf, opts))
965+
|> Map.drop([:node])
965966

966967
Notifier.notify(conf, :signal, data)
967968
end

test/oban_test.exs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -486,7 +486,7 @@ defmodule ObanTest do
486486
assert_invalid_opts(name, :start_queue, wat: -1)
487487
end
488488

489-
test "starting individual queues dynamically" do
489+
test "starting queues dynamically" do
490490
name = start_supervised_oban!(queues: [alpha: 9])
491491

492492
assert :ok = Oban.start_queue(name, queue: :gamma, limit: 5, refresh_interval: 10)
@@ -500,7 +500,7 @@ defmodule ObanTest do
500500
end)
501501
end
502502

503-
test "starting individual queues only on the local node" do
503+
test "starting queues only on the local node" do
504504
name1 = start_supervised_oban!(queues: [])
505505
name2 = start_supervised_oban!(queues: [])
506506

@@ -511,6 +511,18 @@ defmodule ObanTest do
511511
refute supervised_queue?(name2, "alpha")
512512
end)
513513
end
514+
515+
test "starting queues on a specific node" do
516+
name1 = start_supervised_oban!(node: "worker.1", queues: [])
517+
name2 = start_supervised_oban!(node: "worker.2", queues: [])
518+
519+
assert :ok = Oban.start_queue(name1, queue: :alpha, limit: 1, node: "worker.1")
520+
521+
with_backoff(fn ->
522+
assert supervised_queue?(name1, "alpha")
523+
refute supervised_queue?(name2, "alpha")
524+
end)
525+
end
514526
end
515527

516528
describe "stop_queue/2" do

0 commit comments

Comments
 (0)