Skip to content

Commit 5d7c35b

Browse files
author
tlonny
committed
Proposal for chunked pulls
1 parent f5c34eb commit 5d7c35b

File tree

1 file changed

+67
-61
lines changed

1 file changed

+67
-61
lines changed

src/sync/impl/synchronize.js

Lines changed: 67 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,18 @@ import {
1414
import { ensureSameDatabase, isChangeSetEmpty, changeSetCount } from './helpers'
1515
import type { SyncArgs, Timestamp, SyncPullStrategy } from '../index'
1616

17+
const liftToGenerator = async function* (promise) {
18+
const result = await promise
19+
yield result
20+
}
21+
1722
export default async function synchronize({
1823
database,
1924
pullChanges,
2025
onWillApplyRemoteChanges,
2126
onDidPullChanges,
2227
pushChanges,
28+
useAsyncGeneratorForPull = false,
2329
sendCreatedAsUpdated = false,
2430
migrationsEnabledAtVersion,
2531
log,
@@ -45,79 +51,79 @@ export default async function synchronize({
4551
)
4652
log && (log.phase = 'ready to pull')
4753

48-
// $FlowFixMe
49-
const pullResult = await pullChanges({
50-
lastPulledAt,
51-
schemaVersion,
52-
migration,
53-
})
54-
log && (log.phase = 'pulled')
54+
const pullGenerator = useAsyncGeneratorForPull
55+
? pullChanges({ lastPulledAt, schemaVersion, migration })
56+
: liftToGenerator(pullChanges({ lastPulledAt, schemaVersion, migration }))
5557

56-
let newLastPulledAt: Timestamp = (pullResult: any).timestamp
57-
const remoteChangeCount = pullResult.changes ? changeSetCount(pullResult.changes) : NaN
58+
for await (const pullResult of pullGenerator) {
59+
log && (log.phase = 'pulled')
5860

59-
if (onWillApplyRemoteChanges) {
60-
await onWillApplyRemoteChanges({ remoteChangeCount })
61-
}
61+
let newLastPulledAt: Timestamp = (pullResult: any).timestamp
62+
const remoteChangeCount = pullResult.changes ? changeSetCount(pullResult.changes) : NaN
6263

63-
await database.write(async () => {
64-
ensureSameDatabase(database, resetCount)
65-
invariant(
66-
lastPulledAt === (await getLastPulledAt(database)),
67-
'[Sync] Concurrent synchronization is not allowed. More than one synchronize() call was running at the same time, and the later one was aborted before committing results to local database.',
68-
)
64+
if (onWillApplyRemoteChanges) {
65+
await onWillApplyRemoteChanges({ remoteChangeCount })
66+
}
6967

70-
if (unsafeTurbo) {
71-
invariant(
72-
!_unsafeBatchPerCollection,
73-
'unsafeTurbo must not be used with _unsafeBatchPerCollection',
74-
)
68+
await database.write(async () => {
69+
ensureSameDatabase(database, resetCount)
7570
invariant(
76-
'syncJson' in pullResult || 'syncJsonId' in pullResult,
77-
'missing syncJson/syncJsonId',
71+
lastPulledAt === (await getLastPulledAt(database)),
72+
'[Sync] Concurrent synchronization is not allowed. More than one synchronize() call was running at the same time, and the later one was aborted before committing results to local database.',
7873
)
79-
invariant(lastPulledAt === null, 'unsafeTurbo can only be used as the first sync')
8074

81-
const syncJsonId = pullResult.syncJsonId || Math.floor(Math.random() * 1000000000)
82-
83-
if (pullResult.syncJson) {
84-
await database.adapter.provideSyncJson(syncJsonId, pullResult.syncJson)
75+
if (unsafeTurbo) {
76+
invariant(
77+
!_unsafeBatchPerCollection,
78+
'unsafeTurbo must not be used with _unsafeBatchPerCollection',
79+
)
80+
invariant(
81+
'syncJson' in pullResult || 'syncJsonId' in pullResult,
82+
'missing syncJson/syncJsonId',
83+
)
84+
invariant(lastPulledAt === null, 'unsafeTurbo can only be used as the first sync')
85+
86+
const syncJsonId = pullResult.syncJsonId || Math.floor(Math.random() * 1000000000)
87+
88+
if (pullResult.syncJson) {
89+
await database.adapter.provideSyncJson(syncJsonId, pullResult.syncJson)
90+
}
91+
92+
const resultRest = await database.adapter.unsafeLoadFromSync(syncJsonId)
93+
newLastPulledAt = resultRest.timestamp
94+
onDidPullChanges && onDidPullChanges(resultRest)
8595
}
8696

87-
const resultRest = await database.adapter.unsafeLoadFromSync(syncJsonId)
88-
newLastPulledAt = resultRest.timestamp
89-
onDidPullChanges && onDidPullChanges(resultRest)
90-
}
97+
log && (log.newLastPulledAt = newLastPulledAt)
98+
invariant(
99+
typeof newLastPulledAt === 'number' && newLastPulledAt > 0,
100+
`pullChanges() returned invalid timestamp ${newLastPulledAt}. timestamp must be a non-zero number`,
101+
)
91102

92-
log && (log.newLastPulledAt = newLastPulledAt)
93-
invariant(
94-
typeof newLastPulledAt === 'number' && newLastPulledAt > 0,
95-
`pullChanges() returned invalid timestamp ${newLastPulledAt}. timestamp must be a non-zero number`,
96-
)
97-
98-
if (!unsafeTurbo) {
99-
// $FlowFixMe
100-
const { changes: remoteChanges, ...resultRest } = pullResult
101-
log && (log.remoteChangeCount = remoteChangeCount)
102-
// $FlowFixMe
103-
await applyRemoteChanges(remoteChanges, {
104-
db: database,
105-
strategy: ((pullResult: any).experimentalStrategy: ?SyncPullStrategy),
106-
sendCreatedAsUpdated,
107-
log,
108-
conflictResolver,
109-
_unsafeBatchPerCollection,
110-
})
111-
onDidPullChanges && onDidPullChanges(resultRest)
112-
}
103+
if (!unsafeTurbo) {
104+
// $FlowFixMe
105+
const { changes: remoteChanges, ...resultRest } = pullResult
106+
log && (log.remoteChangeCount = remoteChangeCount)
107+
// $FlowFixMe
108+
await applyRemoteChanges(remoteChanges, {
109+
db: database,
110+
strategy: ((pullResult: any).experimentalStrategy: ?SyncPullStrategy),
111+
sendCreatedAsUpdated,
112+
log,
113+
conflictResolver,
114+
_unsafeBatchPerCollection,
115+
})
116+
onDidPullChanges && onDidPullChanges(resultRest)
117+
}
113118

114-
log && (log.phase = 'applied remote changes')
115-
await setLastPulledAt(database, newLastPulledAt)
119+
log && (log.phase = 'applied remote changes')
120+
await setLastPulledAt(database, newLastPulledAt)
116121

117-
if (shouldSaveSchemaVersion) {
118-
await setLastPulledSchemaVersion(database, schemaVersion)
119-
}
120-
}, 'sync-synchronize-apply')
122+
if (shouldSaveSchemaVersion) {
123+
await setLastPulledSchemaVersion(database, schemaVersion)
124+
}
125+
}, 'sync-synchronize-apply')
126+
}
121127

122128
// push phase
123129
if (pushChanges) {

0 commit comments

Comments
 (0)