Skip to content

Commit 6e6ed49

Browse files
committed
basics working
1 parent 9dc2b08 commit 6e6ed49

File tree

4 files changed

+26
-40
lines changed

4 files changed

+26
-40
lines changed

lib/absinthe/phase/subscription/subscribe_self.ex

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,14 @@ defmodule Absinthe.Phase.Subscription.SubscribeSelf do
2323

2424
with {:ok, config} <- get_config(field, context, blueprint) do
2525
field_keys = get_field_keys(field, config)
26-
subscription_id = get_subscription_id(config, blueprint, options)
26+
context_id = get_context_id(config)
27+
document_id = get_document_id(config, blueprint, options)
2728

28-
for field_key <- field_keys,
29-
do: Absinthe.Subscription.subscribe(pubsub, field_key, subscription_id, blueprint)
29+
subscription_id = subscription_id(context_id, document_id)
30+
31+
for field_key <- field_keys do
32+
Absinthe.Subscription.subscribe(pubsub, blueprint, field_key, context_id, subscription_id)
33+
end
3034

3135
{:replace, blueprint,
3236
[
@@ -137,10 +141,7 @@ defmodule Absinthe.Phase.Subscription.SubscribeSelf do
137141
end
138142
end
139143

140-
defp get_subscription_id(config, blueprint, options) do
141-
context_id = get_context_id(config)
142-
document_id = get_document_id(config, blueprint, options)
143-
144+
defp subscription_id(context_id, document_id) do
144145
"__absinthe__:doc:#{context_id}:#{document_id}"
145146
end
146147

lib/absinthe/subscription.ex

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -107,21 +107,22 @@ defmodule Absinthe.Subscription do
107107
defp fetch_fields(_, _), do: []
108108

109109
@doc false
110-
def subscribe(pubsub, field_key, doc_id, doc) do
110+
def subscribe(pubsub, doc, field_key, context_id, subscription_id) do
111111
dup_registry = pubsub |> registry_name(:duplicate)
112112
uniq_registry = pubsub |> registry_name(:unique)
113113

114114
doc_value = {
115-
doc_id,
115+
subscription_id,
116+
context_id,
116117
%{
117118
initial_phases: PipelineSerializer.pack(doc.initial_phases),
118119
source: doc.source
119120
}
120121
}
121122

122-
_ = Registry.register(uniq_registry, {self(), :context}, doc.execution.context)
123+
_ = Registry.register(uniq_registry, {self(), context_id}, doc.execution.context)
123124
{:ok, _} = Registry.register(dup_registry, field_key, doc_value)
124-
{:ok, _} = Registry.register(dup_registry, {self(), doc_id}, field_key)
125+
{:ok, _} = Registry.register(dup_registry, {self(), subscription_id}, field_key)
125126
end
126127

127128
@doc false
@@ -145,8 +146,8 @@ defmodule Absinthe.Subscription do
145146
dup_registry
146147
|> Registry.lookup(key)
147148
|> Enum.map(fn match ->
148-
{pid, {doc_id, doc}} = match
149-
{_, context} = Registry.lookup(uniq_registry, {pid, doc_id, :context})
149+
{pid, {doc_id, context_id, doc}} = match
150+
[{_, context}] = Registry.lookup(uniq_registry, {pid, context_id})
150151

151152
doc =
152153
Map.update!(doc, :initial_phases, fn phases ->

lib/absinthe/subscription/proxy_supervisor.ex

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@ defmodule Absinthe.Subscription.ProxySupervisor do
33

44
use Supervisor
55

6-
def start_link([pubsub, registry, pool_size]) do
7-
Supervisor.start_link(__MODULE__, {pubsub, registry, pool_size})
6+
def start_link([pubsub, pool_size]) do
7+
Supervisor.start_link(__MODULE__, {pubsub, pool_size})
88
end
99

1010
def init({pubsub, pool_size}) do

test/absinthe/execution/subscription_test.exs

Lines changed: 9 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,15 @@ defmodule Absinthe.Execution.SubscriptionTest do
4848
field :id, :id
4949
field :name, :string
5050

51+
field :email, :string do
52+
resolve fn
53+
_, _, %{context: %{admin: true}} ->
54+
55+
_, _, _ ->
56+
{:error, "unauthorized"}
57+
end
58+
end
59+
5160
field :group, :group do
5261
resolve fn user, _, %{context: %{test_pid: pid}} ->
5362
batch({__MODULE__, :batch_get_group, pid}, nil, fn _results ->
@@ -393,31 +402,6 @@ defmodule Absinthe.Execution.SubscriptionTest do
393402
assert String.contains?(error_log, "boom")
394403
end
395404

396-
@tag :pending
397-
test "different subscription docs are batched together" do
398-
opts = [context: %{test_pid: self()}]
399-
400-
assert {:ok, %{"subscribed" => doc1}} =
401-
run_subscription("subscription { user { group { name } id} }", Schema, opts)
402-
403-
# different docs required for test, otherwise they get deduplicated from the start
404-
assert {:ok, %{"subscribed" => doc2}} =
405-
run_subscription("subscription { user { group { name } id name} }", Schema, opts)
406-
407-
user = %{id: "1", name: "Alicia", group: %{name: "Elixir Users"}}
408-
409-
Absinthe.Subscription.publish(PubSub, user, user: ["*", user.id])
410-
411-
assert_receive({:broadcast, %{topic: ^doc1, result: %{data: _}}})
412-
assert_receive({:broadcast, %{topic: ^doc2, result: %{data: %{"user" => user}}}})
413-
414-
assert user["group"]["name"] == "Elixir Users"
415-
416-
# we should get this just once due to batching
417-
assert_receive(:batch_get_group)
418-
refute_receive(:batch_get_group)
419-
end
420-
421405
test "subscription docs with different contexts don't leak context" do
422406
ctx1 = %{test_pid: self(), user: 1}
423407

0 commit comments

Comments
 (0)