@@ -52,4 +52,92 @@ class LayrzConnector {
5252 'operationName' : null ,
5353 });
5454 }
55+
56+ /// [subscribe] opens a WebSocket connection and executes a [GqlSubscription] using the
57+ /// `graphql-transport-ws` protocol. Each server `next` message is emitted on the returned stream
58+ /// as a decoded `Map<String, dynamic>` . The stream closes when the server sends `complete` or
59+ /// the caller cancels the subscription.
60+ Stream <Map <String , dynamic >> subscribe (GqlSubscription gql) {
61+ final wsUri = uri.replace (scheme: uri.scheme == 'https' ? 'wss' : 'ws' );
62+
63+ final controller = StreamController <Map <String , dynamic >>();
64+ WebSocketChannel ? channel;
65+ StreamSubscription <dynamic >? sub;
66+
67+ // Use a unique subscription id per call.
68+ final id = DateTime .now ().microsecondsSinceEpoch.toString ();
69+
70+ final wsHeaders = Map <String , dynamic >.from (headers)..remove ('Content-Type' );
71+
72+ Future <void > connect () async {
73+ channel = WebSocketChannel .connect (
74+ wsUri,
75+ protocols: ['graphql-transport-ws' ],
76+ );
77+ await channel! .ready;
78+
79+ // Send connection_init with auth headers as payload.
80+ channel! .sink.add (jsonEncode ({'type' : 'connection_init' , 'payload' : wsHeaders}));
81+
82+ final variables = < String , dynamic > {
83+ for (final v in gql.variables)
84+ if (v.value != null ) v.name: v.value,
85+ };
86+
87+ sub = channel! .stream.listen (
88+ (raw) {
89+ final msg = jsonDecode (raw as String ) as Map <String , dynamic >;
90+ final type = msg['type' ] as String ? ;
91+
92+ switch (type) {
93+ case 'connection_ack' :
94+ // Send the subscribe message once acknowledged.
95+ channel! .sink.add (jsonEncode ({
96+ 'id' : id,
97+ 'type' : 'subscribe' ,
98+ 'payload' : {
99+ 'query' : gql.generated,
100+ 'variables' : variables,
101+ 'operationName' : null ,
102+ },
103+ }));
104+ case 'next' :
105+ if (msg['id' ] == id) {
106+ final data = msg['payload' ]? ['data' ];
107+ if (data is Map <String , dynamic > && ! controller.isClosed) {
108+ controller.add (data);
109+ }
110+ }
111+ case 'error' :
112+ if (msg['id' ] == id && ! controller.isClosed) {
113+ controller.addError (msg['payload' ] ?? 'Subscription error' );
114+ }
115+ case 'complete' :
116+ if (msg['id' ] == id) {
117+ controller.close ();
118+ }
119+ }
120+ },
121+ onError: (e) {
122+ if (! controller.isClosed) controller.addError (e);
123+ },
124+ onDone: () {
125+ if (! controller.isClosed) controller.close ();
126+ },
127+ );
128+ }
129+
130+ connect ();
131+
132+ controller.onCancel = () {
133+ // Send complete to server before closing.
134+ try {
135+ channel? .sink.add (jsonEncode ({'id' : id, 'type' : 'complete' }));
136+ } catch (_) {}
137+ sub? .cancel ();
138+ channel? .sink.close ();
139+ };
140+
141+ return controller.stream;
142+ }
55143}
0 commit comments