@@ -76,6 +76,9 @@ class SupabaseStreamBuilder extends Stream<SupabaseStreamEvent> {
76
76
/// Count of record to be returned
77
77
int ? _limit;
78
78
79
+ /// Flag if the stream has at least one time been subscribed to realtime
80
+ bool _gotSubscribed = false ;
81
+
79
82
SupabaseStreamBuilder ({
80
83
required PostgrestQueryBuilder queryBuilder,
81
84
required String realtimeTopic,
@@ -209,14 +212,15 @@ class SupabaseStreamBuilder extends Stream<SupabaseStreamEvent> {
209
212
.subscribe ((status, [error]) {
210
213
switch (status) {
211
214
case RealtimeSubscribeStatus .subscribed:
212
- // Get first data when realtime is subscribed and reload all data
213
- // from postgrest if e.g. got a channel error and is resubscribed
214
- _getPostgrestData ();
215
+ // Reload all data after a reconnect from postgrest
216
+ // First data from postgrest gets loaded before the realtime connect
217
+ if (_gotSubscribed) {
218
+ _getPostgrestData ();
219
+ }
220
+ _gotSubscribed = true ;
215
221
break ;
216
222
case RealtimeSubscribeStatus .closed:
217
- if (! (_streamController? .isClosed ?? true )) {
218
- _streamController? .close ();
219
- }
223
+ _streamController? .close ();
220
224
break ;
221
225
case RealtimeSubscribeStatus .timedOut:
222
226
_addException (RealtimeSubscribeException (status, error));
@@ -226,6 +230,7 @@ class SupabaseStreamBuilder extends Stream<SupabaseStreamEvent> {
226
230
break ;
227
231
}
228
232
});
233
+ _getPostgrestData ();
229
234
}
230
235
231
236
Future <void > _getPostgrestData () async {
@@ -271,6 +276,10 @@ class SupabaseStreamBuilder extends Stream<SupabaseStreamEvent> {
271
276
_addStream ();
272
277
} catch (error, stackTrace) {
273
278
_addException (error, stackTrace);
279
+ // In case the postgrest call fails, there is no need to keep the
280
+ // realtime connection open
281
+ _channel? .unsubscribe ();
282
+ _streamController? .close ();
274
283
}
275
284
}
276
285
0 commit comments