1
- import { AbstractPowerSyncDatabase , BaseListener , BaseObserver , CompilableQuery } from '@powersync/common' ;
1
+ import { AbstractPowerSyncDatabase , BaseListener , BaseObserver , CompilableQuery , Disposable } from '@powersync/common' ;
2
2
import { AdditionalOptions } from './hooks/useQuery' ;
3
3
4
4
export class Query < T > {
@@ -9,9 +9,10 @@ export class Query<T> {
9
9
10
10
export interface WatchedQueryListener extends BaseListener {
11
11
onUpdate : ( ) => void ;
12
+ disposed : ( ) => void ;
12
13
}
13
14
14
- export class WatchedQuery extends BaseObserver < WatchedQueryListener > {
15
+ export class WatchedQuery extends BaseObserver < WatchedQueryListener > implements Disposable {
15
16
readyPromise : Promise < void > ;
16
17
isReady : boolean = false ;
17
18
currentData : any [ ] | undefined ;
@@ -26,14 +27,12 @@ export class WatchedQuery extends BaseObserver<WatchedQueryListener> {
26
27
27
28
readonly query : Query < unknown > ;
28
29
readonly options : AdditionalOptions ;
29
- private disposer : ( ) => void ;
30
30
31
- constructor ( db : AbstractPowerSyncDatabase , query : Query < unknown > , options : AdditionalOptions , disposer : ( ) => void ) {
31
+ constructor ( db : AbstractPowerSyncDatabase , query : Query < unknown > , options : AdditionalOptions ) {
32
32
super ( ) ;
33
33
this . db = db ;
34
34
this . query = query ;
35
35
this . options = options ;
36
- this . disposer = disposer ;
37
36
38
37
this . readyPromise = new Promise ( ( resolve ) => {
39
38
this . resolveReady = resolve ;
@@ -89,6 +88,7 @@ export class WatchedQuery extends BaseObserver<WatchedQueryListener> {
89
88
90
89
async fetchData ( ) {
91
90
try {
91
+ await new Promise ( ( resolve ) => setTimeout ( resolve , 18000 ) ) ;
92
92
const result =
93
93
typeof this . query . rawQuery == 'string'
94
94
? await this . db . getAll ( this . query . sqlStatement , this . query . queryParameters )
@@ -106,7 +106,8 @@ export class WatchedQuery extends BaseObserver<WatchedQueryListener> {
106
106
if ( this . controller != null ) {
107
107
return ;
108
108
}
109
- if ( this . listeners . size == 0 && this . temporaryHolds . size == 0 ) {
109
+
110
+ if ( this . onUpdateListenersCount ( ) == 0 && this . temporaryHolds . size == 0 ) {
110
111
return ;
111
112
}
112
113
@@ -145,7 +146,7 @@ export class WatchedQuery extends BaseObserver<WatchedQueryListener> {
145
146
this . currentError = undefined ;
146
147
this . resolveReady ?.( ) ;
147
148
148
- this . iterateListeners ( ( l ) => l ? .onUpdate ( ) ) ;
149
+ this . iterateListeners ( ( l ) => l . onUpdate ?. ( ) ) ;
149
150
}
150
151
151
152
private setError ( error : any ) {
@@ -154,21 +155,29 @@ export class WatchedQuery extends BaseObserver<WatchedQueryListener> {
154
155
this . currentError = error ;
155
156
this . resolveReady ?.( ) ;
156
157
157
- this . iterateListeners ( ( l ) => l ?. onUpdate ( ) ) ;
158
+ this . iterateListeners ( ( l ) => l . onUpdate ?.( ) ) ;
159
+ }
160
+
161
+ private onUpdateListenersCount ( ) : number {
162
+ return Array . from ( this . listeners ) . filter ( ( listener ) => listener . onUpdate !== undefined ) . length ;
158
163
}
159
164
160
165
private maybeDispose ( ) {
161
- if ( this . listeners . size == 0 && this . temporaryHolds . size == 0 ) {
166
+ if ( this . onUpdateListenersCount ( ) == 0 && this . temporaryHolds . size == 0 ) {
162
167
this . controller ?. abort ( ) ;
163
168
this . controller = undefined ;
164
169
this . isReady = false ;
165
170
this . currentData = undefined ;
166
171
this . currentError = undefined ;
167
- this . disposer ?. ( ) ;
172
+ this . dispose ( ) ;
168
173
169
174
this . readyPromise = new Promise ( ( resolve , reject ) => {
170
175
this . resolveReady = resolve ;
171
176
} ) ;
172
177
}
173
178
}
179
+
180
+ async dispose ( ) {
181
+ this . iterateAsyncListeners ( async ( l ) => l . disposed ?.( ) ) ;
182
+ }
174
183
}
0 commit comments