diff --git a/CHANGELOG.md b/CHANGELOG.md index 02ab134..f8d189f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,10 @@ # Changelog +## 3.8.4 + +- Added `GqlSubscription` class for composable GraphQL subscription query construction. +- Added `LayrzConnector.subscribe()` method to open WebSocket subscriptions using the `graphql-transport-ws` protocol. + ## 3.8.3 - Added `attributes` field to `TableItem` and `TableItemInput` models to support asset attributes in workspace tables. diff --git a/lib/src/api/api.dart b/lib/src/api/api.dart index b457bcb..a983176 100644 --- a/lib/src/api/api.dart +++ b/lib/src/api/api.dart @@ -1,9 +1,11 @@ library; +import 'dart:async'; import 'dart:convert'; import 'package:dio/dio.dart'; import 'package:freezed_annotation/freezed_annotation.dart'; +import 'package:web_socket_channel/web_socket_channel.dart'; part 'api.freezed.dart'; part 'api.g.dart'; diff --git a/lib/src/api/src/api_connector.dart b/lib/src/api/src/api_connector.dart index bde6028..5f146bb 100644 --- a/lib/src/api/src/api_connector.dart +++ b/lib/src/api/src/api_connector.dart @@ -52,4 +52,92 @@ class LayrzConnector { 'operationName': null, }); } + + /// [subscribe] opens a WebSocket connection and executes a [GqlSubscription] using the + /// `graphql-transport-ws` protocol. Each server `next` message is emitted on the returned stream + /// as a decoded `Map`. The stream closes when the server sends `complete` or + /// the caller cancels the subscription. + Stream> subscribe(GqlSubscription gql) { + final wsUri = uri.replace(scheme: uri.scheme == 'https' ? 'wss' : 'ws'); + + final controller = StreamController>(); + WebSocketChannel? channel; + StreamSubscription? sub; + + // Use a unique subscription id per call. + final id = DateTime.now().microsecondsSinceEpoch.toString(); + + final wsHeaders = Map.from(headers)..remove('Content-Type'); + + Future connect() async { + channel = WebSocketChannel.connect( + wsUri, + protocols: ['graphql-transport-ws'], + ); + await channel!.ready; + + // Send connection_init with auth headers as payload. + channel!.sink.add(jsonEncode({'type': 'connection_init', 'payload': wsHeaders})); + + final variables = { + for (final v in gql.variables) + if (v.value != null) v.name: v.value, + }; + + sub = channel!.stream.listen( + (raw) { + final msg = jsonDecode(raw as String) as Map; + final type = msg['type'] as String?; + + switch (type) { + case 'connection_ack': + // Send the subscribe message once acknowledged. + channel!.sink.add(jsonEncode({ + 'id': id, + 'type': 'subscribe', + 'payload': { + 'query': gql.generated, + 'variables': variables, + 'operationName': null, + }, + })); + case 'next': + if (msg['id'] == id) { + final data = msg['payload']?['data']; + if (data is Map && !controller.isClosed) { + controller.add(data); + } + } + case 'error': + if (msg['id'] == id && !controller.isClosed) { + controller.addError(msg['payload'] ?? 'Subscription error'); + } + case 'complete': + if (msg['id'] == id) { + controller.close(); + } + } + }, + onError: (e) { + if (!controller.isClosed) controller.addError(e); + }, + onDone: () { + if (!controller.isClosed) controller.close(); + }, + ); + } + + connect(); + + controller.onCancel = () { + // Send complete to server before closing. + try { + channel?.sink.add(jsonEncode({'id': id, 'type': 'complete'})); + } catch (_) {} + sub?.cancel(); + channel?.sink.close(); + }; + + return controller.stream; + } } diff --git a/lib/src/api/src/gql_builder/gql.dart b/lib/src/api/src/gql_builder/gql.dart index 8fa7ff6..8c8dca6 100644 --- a/lib/src/api/src/gql_builder/gql.dart +++ b/lib/src/api/src/gql_builder/gql.dart @@ -41,7 +41,7 @@ abstract class Gql { buffer.write('}\n\n'); } - buffer.write(this is GqlMutation ? 'mutation' : 'query'); + buffer.write(this is GqlMutation ? 'mutation' : this is GqlSubscription ? 'subscription' : 'query'); if (name != null) { buffer.write(' $name'); @@ -50,8 +50,9 @@ abstract class Gql { if (variables.isNotEmpty) { buffer.write('('); buffer.write(variables.map((v) => '\$${v.name}: ${_renderType(v)}').join(', ')); - buffer.write(') {\n'); + buffer.write(')'); } + buffer.write(' {\n'); for (final field in fields) { buffer.write('${_writeField(field)}\n'); @@ -159,3 +160,8 @@ class GqlQuery extends Gql { class GqlMutation extends Gql { GqlMutation({super.name, required super.variables, super.fields, super.includeTypename}); } + +/// [GqlSubscription] represents a GraphQL subscription operation, used with [LayrzConnector.subscribe]. +class GqlSubscription extends Gql { + GqlSubscription({super.name, required super.variables, super.fields, super.includeTypename}); +} diff --git a/pubspec.yaml b/pubspec.yaml index 1214384..16aecd7 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -1,7 +1,7 @@ description: Layrz API models for Dart/Flutter. This package contains the models used by the Layrz API. name: layrz_models -version: "3.8.3" +version: "3.8.4" repository: https://github.com/goldenm-software/layrz_models environment: @@ -13,6 +13,7 @@ dependencies: sdk: flutter dio: ^5.9.0 + web_socket_channel: ^3.0.1 web: ^1.1.0 collection: ^1.18.0 recase: ^4.1.0