Skip to content

Commit a3673d6

Browse files
author
tlonny
committed
Add unsafePullChangesAsyncGenerator to SyncArgs
unsafePullChangesAsyncGenerator is an alternative to pullChanges (now made optional), that returns an AsyncGenerator of PullResults. This should make large updates/initial syncs more tractable as the sync can be split up into several smaller chunks. 1. Updated TS types to be robust discriminated union (i.e. either pullChanges OR unsafePullChangesAsyncGenerator HAVE to be present) 2. Updated Flow types to make pullChanges and unsafePullChangesAsyncGenerator optional 3. Added an invariant check to ensure there is at least one pull strategy provided (for JS lads) 4. Added code that "lifts" the result from pullChanges into an AsyncGenerator - ensuring only 1 code path going forward 5. Added a loop that pulls of the generator and performs DB writes until the generator is exhausted. 6. Added tests 7. Added babel support for async generators
1 parent 5c46322 commit a3673d6

File tree

8 files changed

+175
-76
lines changed

8 files changed

+175
-76
lines changed

babel.config.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ const plugins = [
4141
'@babel/plugin-proposal-nullish-coalescing-operator',
4242
'@babel/plugin-transform-shorthand-properties',
4343
'@babel/plugin-transform-spread',
44+
'@babel/plugin-transform-async-generator-functions',
4445
[
4546
'@babel/plugin-proposal-object-rest-spread',
4647
{

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@
8888
"@babel/plugin-syntax-flow": "^7.24.7",
8989
"@babel/plugin-syntax-jsx": "^7.24.7",
9090
"@babel/plugin-transform-arrow-functions": "^7.24.7",
91+
"@babel/plugin-transform-async-generator-functions": "^7.25.9",
9192
"@babel/plugin-transform-async-to-generator": "^7.24.7",
9293
"@babel/plugin-transform-block-scoping": "^7.24.7",
9394
"@babel/plugin-transform-classes": "^7.24.7",

src/sync/impl/__tests__/synchronize.test.js

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -384,6 +384,66 @@ describe('synchronize', () => {
384384
expect(log.phase).toBe('ready to pull')
385385
expect(log.error).toBe(error)
386386
})
387+
it('streams pullChanges chunks from an async generator', async () => {
388+
const { database, projects } = makeDatabase()
389+
const onWillApplyRemoteChanges = jest.fn()
390+
391+
async function* chunkedPullChanges() {
392+
yield {
393+
changes: makeChangeSet({
394+
mock_projects: {
395+
created: [{ id: 'stream_project', name: 'remote-initial' }],
396+
},
397+
}),
398+
timestamp: 1500,
399+
}
400+
await delay(1)
401+
yield {
402+
changes: makeChangeSet({
403+
mock_projects: {
404+
updated: [{ id: 'stream_project', name: 'remote-updated' }],
405+
},
406+
}),
407+
timestamp: 2500,
408+
}
409+
}
410+
411+
await synchronize({
412+
database,
413+
unsafePullChangesAsyncGenerator: chunkedPullChanges,
414+
pushChanges: jest.fn(),
415+
onWillApplyRemoteChanges,
416+
})
417+
418+
expect(onWillApplyRemoteChanges).toHaveBeenCalledTimes(2)
419+
await expectSyncedAndMatches(projects, 'stream_project', { name: 'remote-updated' })
420+
expect(await getLastPulledAt(database)).toBe(2500)
421+
})
422+
it('rejects when chunked async generator yields no results', async () => {
423+
const { database } = makeDatabase()
424+
425+
async function* emptyGenerator() {}
426+
427+
await expectToRejectWithMessage(
428+
synchronize({
429+
database,
430+
unsafePullChangesAsyncGenerator: emptyGenerator,
431+
pushChanges: jest.fn(),
432+
}),
433+
/A pullChanges\(\) function must yield at least one result/,
434+
)
435+
})
436+
it('rejects when pullChanges and unsafePullChangesAsyncGenerator not provided', async () => {
437+
const { database } = makeDatabase()
438+
439+
await expectToRejectWithMessage(
440+
synchronize({
441+
database,
442+
pushChanges: jest.fn(),
443+
}),
444+
/Either pullChanges or unsafePullChangesAsyncGenerator must be provided/,
445+
)
446+
})
387447
it('can recover from push failure', async () => {
388448
const { database, projects } = makeDatabase()
389449

src/sync/impl/synchronize.d.ts

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,3 @@
11
import type { SyncArgs } from '../index'
22

3-
export default function synchronize({
4-
database,
5-
pullChanges,
6-
onDidPullChanges,
7-
pushChanges,
8-
sendCreatedAsUpdated,
9-
migrationsEnabledAtVersion,
10-
log,
11-
conflictResolver,
12-
_unsafeBatchPerCollection,
13-
unsafeTurbo,
14-
}: SyncArgs): Promise<void>
3+
export default function synchronize(params : SyncArgs): Promise<void>

src/sync/impl/synchronize.js

Lines changed: 90 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,16 @@ import {
1313
} from './index'
1414
import { ensureSameDatabase, isChangeSetEmpty, changeSetCount } from './helpers'
1515
import type { SyncArgs, Timestamp, SyncPullStrategy } from '../index'
16+
import type { SyncPullResult } from "..";
17+
18+
async function* liftToAsyncGenerator<T>(promise: Promise<T>): AsyncGenerator<T, void, void> {
19+
yield await promise
20+
}
1621

1722
export default async function synchronize({
1823
database,
1924
pullChanges,
25+
unsafePullChangesAsyncGenerator,
2026
onWillApplyRemoteChanges,
2127
onDidPullChanges,
2228
pushChanges,
@@ -27,14 +33,15 @@ export default async function synchronize({
2733
_unsafeBatchPerCollection,
2834
unsafeTurbo,
2935
}: SyncArgs): Promise<void> {
36+
3037
const resetCount = database._resetCount
3138
log && (log.startedAt = new Date())
3239
log && (log.phase = 'starting')
3340

3441
// TODO: Wrap the three computionally intensive phases in `requestIdleCallback`
3542

3643
// pull phase
37-
const lastPulledAt = await getLastPulledAt(database)
44+
let lastPulledAt = await getLastPulledAt(database)
3845
log && (log.lastPulledAt = lastPulledAt)
3946

4047
const { schemaVersion, migration, shouldSaveSchemaVersion } = await getMigrationInfo(
@@ -45,79 +52,101 @@ export default async function synchronize({
4552
)
4653
log && (log.phase = 'ready to pull')
4754

48-
// $FlowFixMe
49-
const pullResult = await pullChanges({
50-
lastPulledAt,
51-
schemaVersion,
52-
migration,
53-
})
54-
log && (log.phase = 'pulled')
55-
56-
let newLastPulledAt: Timestamp = (pullResult: any).timestamp
57-
const remoteChangeCount = pullResult.changes ? changeSetCount(pullResult.changes) : NaN
55+
let pullChunks: AsyncGenerator<SyncPullResult, void, void>
5856

59-
if (onWillApplyRemoteChanges) {
60-
await onWillApplyRemoteChanges({ remoteChangeCount })
57+
if(unsafePullChangesAsyncGenerator) {
58+
// $FlowFixMe
59+
pullChunks = unsafePullChangesAsyncGenerator({ lastPulledAt, schemaVersion, migration })
60+
} else if(pullChanges) {
61+
pullChunks = liftToAsyncGenerator(pullChanges({ lastPulledAt, schemaVersion, migration }))
62+
} else {
63+
invariant(false, 'Either pullChanges or unsafePullChangesAsyncGenerator must be provided')
6164
}
6265

63-
await database.write(async () => {
64-
ensureSameDatabase(database, resetCount)
65-
invariant(
66-
lastPulledAt === (await getLastPulledAt(database)),
67-
'[Sync] Concurrent synchronization is not allowed. More than one synchronize() call was running at the same time, and the later one was aborted before committing results to local database.',
68-
)
66+
let newLastPulledAt: Timestamp | null = null
67+
let result = await pullChunks.next()
68+
log && (log.phase = 'pulled')
69+
70+
// To answer your question - yes it would be nice to use a for await loop here
71+
// however, because of the use of 'fast-async' (see babel config), this syntax is *not* supported and will cause the code to hang...
72+
while(!result.done) {
73+
const pullResult = result.value
74+
let chunkNewLastPulledAt: Timestamp = (pullResult: any).timestamp
75+
newLastPulledAt = chunkNewLastPulledAt
76+
const remoteChangeCount = pullResult.changes ? changeSetCount(pullResult.changes) : NaN
6977

70-
if (unsafeTurbo) {
78+
if (onWillApplyRemoteChanges) {
79+
await onWillApplyRemoteChanges({ remoteChangeCount })
80+
}
81+
82+
await database.write(async () => {
83+
ensureSameDatabase(database, resetCount)
7184
invariant(
72-
!_unsafeBatchPerCollection,
73-
'unsafeTurbo must not be used with _unsafeBatchPerCollection',
85+
lastPulledAt === (await getLastPulledAt(database)),
86+
'[Sync] Concurrent synchronization is not allowed. More than one synchronize() call was running at the same time, and the later one was aborted before committing results to local database.',
7487
)
88+
89+
if (unsafeTurbo) {
90+
invariant(
91+
!_unsafeBatchPerCollection,
92+
'unsafeTurbo must not be used with _unsafeBatchPerCollection',
93+
)
94+
invariant(
95+
'syncJson' in pullResult || 'syncJsonId' in pullResult,
96+
'missing syncJson/syncJsonId',
97+
)
98+
invariant(lastPulledAt === null, 'unsafeTurbo can only be used as the first sync')
99+
100+
const syncJsonId = pullResult.syncJsonId || Math.floor(Math.random() * 1000000000)
101+
102+
if (pullResult.syncJson) {
103+
await database.adapter.provideSyncJson(syncJsonId, pullResult.syncJson)
104+
}
105+
106+
const resultRest = await database.adapter.unsafeLoadFromSync(syncJsonId)
107+
chunkNewLastPulledAt = resultRest.timestamp
108+
onDidPullChanges && onDidPullChanges(resultRest)
109+
}
110+
111+
log && (log.newLastPulledAt = chunkNewLastPulledAt)
75112
invariant(
76-
'syncJson' in pullResult || 'syncJsonId' in pullResult,
77-
'missing syncJson/syncJsonId',
113+
typeof chunkNewLastPulledAt === 'number' && chunkNewLastPulledAt > 0,
114+
`pullChanges() returned invalid timestamp ${chunkNewLastPulledAt}. timestamp must be a non-zero number`,
78115
)
79-
invariant(lastPulledAt === null, 'unsafeTurbo can only be used as the first sync')
80-
81-
const syncJsonId = pullResult.syncJsonId || Math.floor(Math.random() * 1000000000)
82116

83-
if (pullResult.syncJson) {
84-
await database.adapter.provideSyncJson(syncJsonId, pullResult.syncJson)
117+
if (!unsafeTurbo) {
118+
// $FlowFixMe
119+
const { changes: remoteChanges, ...resultRest } = pullResult
120+
log && (log.remoteChangeCount = remoteChangeCount)
121+
// $FlowFixMe
122+
await applyRemoteChanges(remoteChanges, {
123+
db: database,
124+
strategy: ((pullResult: any).experimentalStrategy: ?SyncPullStrategy),
125+
sendCreatedAsUpdated,
126+
log,
127+
conflictResolver,
128+
_unsafeBatchPerCollection,
129+
})
130+
onDidPullChanges && onDidPullChanges(resultRest)
85131
}
86132

87-
const resultRest = await database.adapter.unsafeLoadFromSync(syncJsonId)
88-
newLastPulledAt = resultRest.timestamp
89-
onDidPullChanges && onDidPullChanges(resultRest)
90-
}
91-
92-
log && (log.newLastPulledAt = newLastPulledAt)
93-
invariant(
94-
typeof newLastPulledAt === 'number' && newLastPulledAt > 0,
95-
`pullChanges() returned invalid timestamp ${newLastPulledAt}. timestamp must be a non-zero number`,
96-
)
97-
98-
if (!unsafeTurbo) {
99-
// $FlowFixMe
100-
const { changes: remoteChanges, ...resultRest } = pullResult
101-
log && (log.remoteChangeCount = remoteChangeCount)
102-
// $FlowFixMe
103-
await applyRemoteChanges(remoteChanges, {
104-
db: database,
105-
strategy: ((pullResult: any).experimentalStrategy: ?SyncPullStrategy),
106-
sendCreatedAsUpdated,
107-
log,
108-
conflictResolver,
109-
_unsafeBatchPerCollection,
110-
})
111-
onDidPullChanges && onDidPullChanges(resultRest)
112-
}
133+
log && (log.phase = 'applied remote changes')
134+
await setLastPulledAt(database, chunkNewLastPulledAt)
113135

114-
log && (log.phase = 'applied remote changes')
115-
await setLastPulledAt(database, newLastPulledAt)
136+
if (shouldSaveSchemaVersion) {
137+
await setLastPulledSchemaVersion(database, schemaVersion)
138+
}
139+
}, 'sync-synchronize-apply')
140+
lastPulledAt = await getLastPulledAt(database)
141+
result = await pullChunks.next()
142+
}
116143

117-
if (shouldSaveSchemaVersion) {
118-
await setLastPulledSchemaVersion(database, schemaVersion)
119-
}
120-
}, 'sync-synchronize-apply')
144+
if (newLastPulledAt === null) {
145+
invariant(
146+
typeof newLastPulledAt === 'number',
147+
`A pullChanges() function must yield at least one result`,
148+
)
149+
}
121150

122151
// push phase
123152
if (pushChanges) {

src/sync/index.d.ts

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,8 @@ export type SyncConflictResolver = (
5656
resolved: DirtyRaw,
5757
) => DirtyRaw
5858

59-
export type SyncArgs = $Exact<{
59+
type BaseSyncArgs = $Exact<{
6060
database: Database
61-
pullChanges: (_: SyncPullArgs) => Promise<SyncPullResult>
6261
pushChanges?: (_: SyncPushArgs) => Promise<SyncPushResult | undefined | void>
6362
// version at which support for migration syncs was added - the version BEFORE first syncable migration
6463
migrationsEnabledAtVersion?: SchemaVersion
@@ -88,6 +87,16 @@ export type SyncArgs = $Exact<{
8887
onWillApplyRemoteChanges?: (info: $Exact<{ remoteChangeCount: number }>) => Promise<void>
8988
}>
9089

90+
type SimpleSyncArgs = $Exact<BaseSyncArgs & {
91+
pullChanges: (_: SyncPullArgs) => Promise<SyncPullResult>
92+
}>
93+
94+
type ChunkedSyncArgs = $Exact<BaseSyncArgs & {
95+
unsafePullChangesAsyncGenerator: (_: SyncPullArgs) => AsyncGenerator<SyncPullResult, void, void>
96+
}>
97+
98+
export type SyncArgs = SimpleSyncArgs | ChunkedSyncArgs
99+
91100
export function synchronize(args: SyncArgs): Promise<void>
92101

93102
export function hasUnsyncedChanges({ database }: $Exact<{ database: Database }>): Promise<boolean>

src/sync/index.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,6 @@ export type SyncConflictResolver = (
8585
// TODO: JSDoc'ify this
8686
export type SyncArgs = $Exact<{
8787
database: Database,
88-
pullChanges: (SyncPullArgs) => Promise<SyncPullResult>,
8988
pushChanges?: (SyncPushArgs) => Promise<?SyncPushResult>,
9089
// version at which support for migration syncs was added - the version BEFORE first syncable migration
9190
migrationsEnabledAtVersion?: SchemaVersion,
@@ -113,6 +112,8 @@ export type SyncArgs = $Exact<{
113112
// when processing a very large sync (could be useful for replacement syncs). Note that remote change count
114113
// is NaN in turbo mode.
115114
onWillApplyRemoteChanges?: (info: $Exact<{ remoteChangeCount: number }>) => Promise<void>,
115+
pullChanges?: (_: SyncPullArgs) => Promise<SyncPullResult>,
116+
unsafePullChangesAsyncGenerator?: (_: SyncPullArgs) => AsyncGenerator<SyncPullResult, void, void>,
116117
}>
117118

118119
/**

yarn.lock

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1231,6 +1231,15 @@
12311231
dependencies:
12321232
"@babel/helper-plugin-utils" "^7.25.9"
12331233

1234+
"@babel/plugin-transform-async-generator-functions@^7.25.9":
1235+
version "7.25.9"
1236+
resolved "https://registry.yarnpkg.com/@babel/plugin-transform-async-generator-functions/-/plugin-transform-async-generator-functions-7.25.9.tgz#1b18530b077d18a407c494eb3d1d72da505283a2"
1237+
integrity sha512-RXV6QAzTBbhDMO9fWwOmwwTuYaiPbggWQ9INdZqAYeSHyG7FzQ+nOZaUUjNwKv9pV3aE4WFqFm1Hnbci5tBCAw==
1238+
dependencies:
1239+
"@babel/helper-plugin-utils" "^7.25.9"
1240+
"@babel/helper-remap-async-to-generator" "^7.25.9"
1241+
"@babel/traverse" "^7.25.9"
1242+
12341243
"@babel/plugin-transform-async-to-generator@^7.20.0":
12351244
version "7.24.7"
12361245
resolved "https://registry.yarnpkg.com/@babel/plugin-transform-async-to-generator/-/plugin-transform-async-to-generator-7.24.7.tgz#72a3af6c451d575842a7e9b5a02863414355bdcc"

0 commit comments

Comments
 (0)