Skip to content

Commit 2031ade

Browse files
committed
Add full support for subscription
Implemented according to https://spec.graphql.org/October2021/#sec-Subscription
1 parent 4fd3562 commit 2031ade

11 files changed

+500
-26
lines changed

README.md

+72-6
Original file line numberDiff line numberDiff line change
@@ -351,14 +351,16 @@ inject() ->
351351
'Faction' => faction_resource,
352352
...
353353
'Query' => query_resource,
354-
'Mutation' => mutation_resource
354+
'Mutation' => mutation_resource,
355+
'Subscription' => subscription_resource
355356
}
356357
},
357358
ok = graphql:load_schema(Map, SchemaData),
358359
Root = {root,
359360
#{
360361
query => 'Query',
361362
mutation => 'Mutation',
363+
subscription => 'Subscription',
362364
interfaces => []
363365
}},
364366
ok = graphql:insert_schema_definition(Root),
@@ -386,10 +388,14 @@ run(Doc, OpName, Vars, Req, State) ->
386388
ok = graphql:validate(AST2),
387389
Coerced = graphql:type_check_params(FunEnv, OpName, Vars),
388390
Ctx = #{ params => Coerced, operation_name => OpName },
389-
Response = graphql:execute(Ctx, AST2),
390-
Req2 = cowboy_req:set_resp_body(encode_json(Response), Req),
391-
{ok, Reply} = cowboy_req:reply(200, Req2),
392-
{halt, Reply, State}
391+
case graphql:execute(Ctx, AST2) of
392+
{subscription, Subscription, SubscriptionContext} ->
393+
{cowboy_loop, Req, store_subscription(Subscription, SubscriptionContext, State)};
394+
Response ->
395+
Req2 = cowboy_req:set_resp_body(encode_json(Response), Req),
396+
{ok, Reply} = cowboy_req:reply(200, Req2),
397+
{halt, Reply, State}
398+
end
393399
catch
394400
throw:Err ->
395401
err(400, Err, Req, State)
@@ -579,8 +585,15 @@ handling field requests in that object. The module looks like:
579585

580586
execute(Ctx, SrcObj, <<"f">>, Args) ->
581587
{ok, 42};
588+
execute(Ctx, SrcObj, <<"heavy">>, Args) ->
589+
Token = graphql:token(Ctxt),
590+
Worker = spawn_link(fun() ->
591+
Result = calculate_heavy_value(Args),
592+
graphql:reply_cast(Token, {ok, Result})
593+
end),
594+
{defer, Token, #{timeout => 5000, worker => Worker}};
582595
execute(Ctx, SrcObj, Field, Args) ->
583-
default
596+
default...
584597
```
585598

586599
The only function which is needed is the `execute/4` function which is
@@ -599,9 +612,62 @@ called by the system whenever a field is requested in that object. The
599612
of a backing store and then moving the *cursor* onto that object and
600613
calling the correct object resource for that type. The `SrcObj` is
601614
set to point to the object that is currently being operated upon.
615+
For `query` and `mutation` the **root** field `SrcObj` is set to `none`, for
616+
`subscription` it is set to `{subscription_value(), Event}`.
602617
* `Field` - The field in the object which is requested.
603618
* `Args` - A map of field arguments. See the next section.
604619

620+
Possible return values of `execute/4` callback are:
621+
622+
* `{ok, Value}` - the value of the field (either scalar for scalar fields or arbitrarily
623+
complex object for object fields); `Value` can be `null` for nullable fields
624+
* `{ok, [{ok, Value}, ..]}` for array fields
625+
* `{ok, Val, [Auxilary, ..]}` all the `Auxilary` lists returned during operation execution will be
626+
concatenated and returned as a `.aux` field in the result map.
627+
* `{error, Reason}` for execution errors (`Reason` is either `atom()` or `binary()` or record
628+
that will be used to construct the resulting `.errors` field)
629+
* `{defer, Token, undefined | #{timeout => timeout(), worker => pid()}}` tells graphql execution
630+
engine that the value of the field is calculated asynchronously and will be delivered later to it
631+
via `graphql:reply_cast/2`.
632+
* `timeout` - tells graphql engine to give-up waiting for the value after this many milliseconds
633+
* `worker` - tells graphql engine that the value is calculated by this Erlang process (worker),
634+
so the engine will monitor this process and will report error if the process crashes.
635+
* `{defer, Token}` same as `{defer, Token, undefined}`
636+
637+
Any value can be either returned from the function or thrown using `graphql:throw/1`.
638+
639+
### Subscription resolution modules
640+
641+
Subscription root object is mapped onto an Erlang module responsible for
642+
initiation of the subscription and for handling field requests in received events.
643+
The module is similar to [Output object Resources](#output-object-resources) but
644+
they have one extra callback.
645+
646+
```erlang
647+
-module(object_resource).
648+
649+
-export([subscribe/3, execute/4]).
650+
651+
subscribe(Ctx, <<"commentAdded">>, #{<<"topic">> := TopicId}) ->
652+
Subscription = #my_subscription{} = topic:subscribe_new_comments(TopicId),
653+
{subscription, Subscription}.
654+
655+
execute(Ctx, {#my_subscription{}, Comment}, <<"commentAdded">>, _Args) ->
656+
{ok, Comment}.
657+
658+
```
659+
660+
The `subscribe/3` callback is called whenever `subscription` operation is
661+
executed (it returns `{subscription, Subscription, OpaqueCtx}`). This callback
662+
should initialize the subscription and return `{subscription, Subscription}`
663+
tuple, where `Subscription` is whatever could be necessary to properly map
664+
the events received from pub-sub system to the correct subscription.
665+
666+
Whenever the new event `Event` is received from pubs-sub system,
667+
`graphql:handle_subscription_event(ExtraCtx, Subscription, OpaqueCtx, Event)`
668+
must be called which will call `execute/4` callback which will act the same
669+
way as described in [Output object Resources](#output-object-resources).
670+
605671
#### Field Argument rules
606672

607673
In GraphQL, field arguments follow a specific pattern:

src/graphql.erl

+32-6
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@
1212
type_check/1, type_check_params/3,
1313
insert_root/1,
1414
validate/1,
15-
execute/1, execute/2
15+
execute/1, execute/2,
16+
handle_subscription_event/4
1617
]).
1718

1819
-export([
@@ -45,9 +46,15 @@
4546
-type json() :: number() | binary() | true | false | null | #{ binary() | atom() => json() } | [json()] .
4647
-type param_context() :: json().
4748

49+
-type execute_result() :: #{data => graphql:json(),
50+
errors => [#{message := binary(),
51+
path => [binary()],
52+
extensions => map()}],
53+
aux => [any()]}.
54+
4855
-type schema_definition() :: {atom(), #{ atom() => term() }}.
4956

50-
-export_type([json/0, param_context/0]).
57+
-export_type([json/0, param_context/0, execute_result/0]).
5158

5259
-type token() :: {'$graphql_token', pid(), reference(), reference()}.
5360
-type defer_map() :: #{ worker => pid(),
@@ -57,10 +64,15 @@
5764
-type name() :: {name, pos_integer(), binary()} | binary().
5865
-type document() :: #document{}.
5966
-type directive() :: #directive{}.
60-
-export_type([directive/0,
6167

68+
-type subscription_value() :: any().
69+
-type subscription_ctx() :: graphql_execute:subscription_ctx().
70+
71+
-export_type([directive/0,
6272
token/0,
63-
schema_field/0]).
73+
schema_field/0,
74+
subscription_value/0,
75+
subscription_ctx/0]).
6476

6577
-define(DEFAULT_TIMEOUT, 3000).
6678

@@ -153,12 +165,14 @@ type_check(AST) ->
153165
type_check_params(FunEnv, OpName, Vars) ->
154166
graphql_check:check_params(FunEnv, OpName, Vars).
155167

156-
-spec execute(document()) -> #{ atom() => json() }.
168+
-spec execute(document()) -> execute_result()
169+
| {subscription, subscription_value(), subscription_ctx()}.
157170
execute(AST) ->
158171
Ctx = #{ params => #{}, default_timeout => ?DEFAULT_TIMEOUT },
159172
execute(Ctx, AST).
160173

161-
-spec execute(context(), document()) -> #{ atom() => json() }.
174+
-spec execute(context(), document()) -> execute_result()
175+
| {subscription, subscription_value(), subscription_ctx()}.
162176
execute(#{default_timeout := _DT } = Ctx, AST) ->
163177
graphql_execute:x(Ctx, AST);
164178
execute(Ctx, AST) ->
@@ -168,6 +182,18 @@ execute(Ctx, AST) ->
168182
Result -> Result
169183
end.
170184

185+
%% @doc Executes subscription MapSourceToResponseEvent step
186+
%% `subscription_value()' and `subscription_ctx()' are the values returned as
187+
%% `{subscription, SubValue, SubCtx} = graphql:execute(...)'
188+
%% A tuple `{Subscription, Event}' will be passed as 2nd argument to the `execute/4' callback
189+
%%
190+
%% @param ExtraCtx additional map fields that needs to be added to execution context
191+
%% @param Event arbitrary event that was produced by the subscription
192+
-spec handle_subscription_event(context(), subscription_value(), subscription_ctx(), any()) ->
193+
#{ atom() => json() }.
194+
handle_subscription_event(ExtraCtx, Subscription, SubscriptionCtx, Event) ->
195+
graphql_execute:x_subscription_event(ExtraCtx, Subscription, SubscriptionCtx, Event).
196+
171197
%% @doc insert_schema_definition/1 loads a schema definition into the Graph Schema
172198
%% @end
173199
-spec insert_schema_definition(schema_definition()) -> ok | {error, Reason}

src/graphql_execute.erl

+110-9
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,10 @@
88
-compile(inline_list_funcs).
99
-compile({inline_size, 50}).
1010

11-
-export([x/1, x/2]).
11+
-export([x/1, x/2, x_subscription_event/4]).
1212
-export([builtin_input_coercer/1]).
13+
-export_type([subscription_ctx/0]).
14+
1315
-type source() :: reference().
1416
-type demonitor() :: {reference(), pid()} .
1517

@@ -74,27 +76,50 @@
7476
work = #{} :: #{ source() => defer_closure() },
7577
timeout :: non_neg_integer() }).
7678

77-
-spec x(graphql:ast()) -> #{ atom() => graphql:json() }.
79+
-record(subscription,
80+
{ orig_context,
81+
op,
82+
fragments}).
83+
84+
-opaque subscription_ctx() :: #subscription{}.
85+
86+
-spec x(graphql:ast()) ->
87+
graphql:execute_result()
88+
| {subscription, graphql:subscription_value(), subscription_ctx()}.
7889
x(X) -> x(#{ params => #{} }, X).
7990

80-
-spec x(term(), graphql:ast()) -> #{ atom() => graphql:json() }.
91+
-spec x(term(), graphql:ast()) ->
92+
graphql:execute_result()
93+
| {subscription, graphql:subscription_value(), subscription_ctx()}.
8194
x(Ctx, X) ->
8295
Canon = canon_context(Ctx),
8396
execute_request(Canon, X).
8497

98+
-spec x_subscription_event(map(), graphql:subscription_value(), subscription_ctx(), any()) ->
99+
#{atom() => graphql:json()}.
100+
x_subscription_event(ExtraCtx, Subscription,
101+
#subscription{orig_context = OrigContext,
102+
op = Op,
103+
fragments = Frags}, Event) ->
104+
CanonCtx = canon_context(maps:merge(OrigContext, ExtraCtx)),
105+
Ctx = CanonCtx#ectx{ frags = Frags,
106+
defer_request_id = make_ref() },
107+
InitialValue = {Subscription, Event},
108+
execute_query(Ctx#ectx{ op_type = subscription }, Op, InitialValue).
109+
85110
execute_request(InitialCtx, #document { definitions = Operations }) ->
86111
{Frags, Ops} = lists:partition(fun (#frag {}) -> true;(_) -> false end, Operations),
87112
Ctx = InitialCtx#ectx{ frags = fragments(Frags),
88113
defer_request_id = make_ref() },
89114
case get_operation(Ctx, Ops) of
90115
{ok, #op { ty = {query, _} } = Op } ->
91-
execute_query(Ctx#ectx{ op_type = query }, Op);
92-
{ok, #op { ty = {subscription, _} } = Op } ->
93-
execute_query(Ctx#ectx{ op_type = subscription }, Op);
116+
execute_query(Ctx#ectx{ op_type = query }, Op, none);
94117
{ok, #op { ty = undefined } = Op } ->
95-
execute_query(Ctx#ectx{ op_type = query }, Op);
118+
execute_query(Ctx#ectx{ op_type = query }, Op, none);
96119
{ok, #op { ty = {mutation, _} } = Op } ->
97120
execute_mutation(Ctx#ectx{ op_type = mutation }, Op);
121+
{ok, #op { ty = {subscription, _} } = Op } ->
122+
create_source_event_stream(Ctx#ectx{ op_type = subscription }, Op);
98123
{error, Reason} ->
99124
{error, Errs} = err(Ctx, Reason),
100125
complete_top_level(undefined, Errs)
@@ -124,12 +149,13 @@ collect_auxiliary_data() ->
124149
end.
125150

126151
execute_query(#ectx{ defer_request_id = ReqId } = Ctx,
127-
#op { selection_set = SSet, schema = QType }) ->
152+
#op { selection_set = SSet, schema = QType },
153+
InitialValue) ->
128154
#object_type{} = QType,
129155
case execute_sset(Ctx#ectx{ path = [],
130156
defer_process = self(),
131157
defer_target = top_level },
132-
SSet, QType, none) of
158+
SSet, QType, InitialValue) of
133159
{ok, Res, Errs} ->
134160
complete_top_level(Res, Errs);
135161
#work { items = WL, monitor = Ms, demonitors = [] , timeout = TimeOut} ->
@@ -154,6 +180,34 @@ execute_mutation(Ctx, #op { selection_set = SSet,
154180
complete_top_level(Res, Errs)
155181
end.
156182

183+
%% 6.2.3.1 https://spec.graphql.org/October2021/#sec-Source-Stream
184+
create_source_event_stream(#ectx{ frags = Frags, ctx = OrigCtx} = Ctx,
185+
#op { selection_set = SSet,
186+
schema = QType } = Op) ->
187+
#object_type{} = QType,
188+
GroupedFieldSet = collect_fields(Ctx, QType, SSet),
189+
case orddict:to_list(GroupedFieldSet) of
190+
[{Key, [Field]}] ->
191+
FieldName = name(Field),
192+
#schema_field { directives = Directives } = lookup_field(Field, QType),
193+
Args = resolve_args(Ctx, Field),
194+
Fun = subscribe_resolver_function(QType),
195+
CtxP = add_path(Ctx, Key),
196+
case resolve_field_event_stream(CtxP, QType, FieldName, Directives, Fun, Args) of
197+
{subscription, Sub} ->
198+
SubCtx = #subscription{orig_context = OrigCtx,
199+
op = Op,
200+
fragments = Frags},
201+
{subscription, Sub, SubCtx};
202+
{error, Reason} ->
203+
{error, Errs} = err(CtxP, Reason),
204+
complete_top_level(undefined, Errs)
205+
end;
206+
_Other ->
207+
{error, Errs} = err(Ctx, subscription_must_have_one_root_field),
208+
complete_top_level(undefined, Errs)
209+
end.
210+
157211
execute_sset(#ectx{ defer_target = DeferTarget } = Ctx, SSet, Type, Value) ->
158212
GroupedFields = collect_fields(Ctx, Type, SSet),
159213
Self = make_ref(),
@@ -557,6 +611,44 @@ resolve_field_value(#ectx { op_type = OpType,
557611
{error, {resolver_crash, M}}
558612
end.
559613

614+
resolve_field_event_stream(#ectx { op_type = OpType,
615+
ctx = CallerContext },
616+
#object_type { id = OID,
617+
directives = ODirectives} = ObjectType,
618+
Name, FDirectives, Fun, Args) ->
619+
AnnotatedCallerCtx =
620+
CallerContext#{ op_type => OpType,
621+
field => Name,
622+
field_directives => format_directives(FDirectives),
623+
object_type => OID,
624+
object_directives => format_directives(ODirectives)
625+
},
626+
try Fun(AnnotatedCallerCtx, Name, Args) of
627+
V ->
628+
case handle_subscribe_resolver_result(V) of
629+
wrong ->
630+
Obj = graphql_schema:id(ObjectType),
631+
report_wrong_return(Obj, Name, Fun, V),
632+
{error, {wrong_resolver_return, {Obj, Name}}};
633+
Res -> Res
634+
end
635+
catch
636+
throw:{'$graphql_throw', Msg} ->
637+
case handle_subscribe_resolver_result(Msg) of
638+
wrong ->
639+
Obj = graphql_schema:id(ObjectType),
640+
report_wrong_return(Obj, Name, Fun, Msg),
641+
{error, {wrong_resolver_return, {Obj, Name}}};
642+
Res -> Res
643+
end;
644+
?EXCEPTION(Cl, Err, Stacktrace) ->
645+
M = #{ type => graphql_schema:id(ObjectType),
646+
field => Name,
647+
stack => ?GET_STACK(Stacktrace),
648+
class => Cl,
649+
error => Err},
650+
{error, {resolver_crash, M}}
651+
end.
560652

561653
handle_resolver_result({error, Reason}) ->
562654
{error, {resolver_error, Reason}};
@@ -571,6 +663,12 @@ handle_resolver_result({defer, Token, DeferStateMap}) ->
571663
{defer, Token, DeferStateMap};
572664
handle_resolver_result(_Unknown) -> wrong.
573665

666+
handle_subscribe_resolver_result({subscription, V}) ->
667+
{subscription, V};
668+
handle_subscribe_resolver_result({error, Reason}) ->
669+
{error, {resolver_error, Reason}};
670+
handle_subscribe_resolver_result(_Unknown) -> wrong.
671+
574672
complete_value(Ctx, Ty, Fields, {ok, Value}) when is_binary(Ty) ->
575673
error_logger:warning_msg(
576674
"Canary: Type lookup during value completion for: ~p~n",
@@ -929,6 +1027,9 @@ resolver_function(#object_type {
9291027
resolver_function(#object_type { resolve_module = M }, undefined) ->
9301028
fun M:execute/4.
9311029

1030+
subscribe_resolver_function(#object_type { resolve_module = M }) ->
1031+
fun M:subscribe/3.
1032+
9321033
%% -- OUTPUT COERCION ------------------------------------
9331034

9341035
builtin_input_coercer(X) ->

0 commit comments

Comments
 (0)