1
- using Dotmim . Sync . Builders ;
1
+ using Dotmim . Sync . Batch ;
2
+ using Dotmim . Sync . Builders ;
2
3
using Dotmim . Sync . Enumerations ;
3
4
using Microsoft . Extensions . Logging ;
4
5
using System ;
@@ -38,6 +39,9 @@ public abstract partial class BaseOrchestrator
38
39
39
40
var schemaTables = message . Schema . Tables . SortByDependencies ( tab => tab . GetRelations ( ) . Select ( r => r . GetParentTable ( ) ) ) ;
40
41
42
+ // contains list of table that have been done
43
+ var doneTables = new List < SyncTable > ( ) ;
44
+
41
45
// Disable check constraints
42
46
// Because Sqlite does not support "PRAGMA foreign_keys=OFF" Inside a transaction
43
47
// Report this disabling constraints brefore opening a transaction
@@ -59,17 +63,27 @@ public abstract partial class BaseOrchestrator
59
63
// 1) Applying Inserts and Updates. Apply in table order
60
64
// -----------------------------------------------------
61
65
if ( hasChanges )
66
+ {
67
+ doneTables . Clear ( ) ;
62
68
foreach ( var table in schemaTables )
63
- await this . InternalApplyTableChangesAsync ( context , table , message , connection ,
69
+ {
70
+ await this . InternalApplyTableChangesAsync ( context , table , message , doneTables , connection ,
64
71
transaction , DataRowState . Modified , changesApplied , cancellationToken , progress ) . ConfigureAwait ( false ) ;
72
+ }
73
+ }
65
74
66
75
// -----------------------------------------------------
67
76
// 2) Applying Deletes. Do not apply deletes if we are in a new database
68
77
// -----------------------------------------------------
69
78
if ( ! message . IsNew && hasChanges )
79
+ {
80
+ doneTables . Clear ( ) ;
70
81
foreach ( var table in schemaTables . Reverse ( ) )
71
- await this . InternalApplyTableChangesAsync ( context , table , message , connection ,
82
+ {
83
+ await this . InternalApplyTableChangesAsync ( context , table , message , doneTables , connection ,
72
84
transaction , DataRowState . Deleted , changesApplied , cancellationToken , progress ) . ConfigureAwait ( false ) ;
85
+ }
86
+ }
73
87
74
88
// Re enable check constraints
75
89
if ( message . DisableConstraintsOnApplyChanges )
@@ -216,7 +230,7 @@ private async Task<bool> InternalApplyConflictUpdateAsync(SyncContext context, D
216
230
/// <summary>
217
231
/// Apply changes internal method for one type of query: Insert, Update or Delete for every batch from a table
218
232
/// </summary>
219
- private async Task InternalApplyTableChangesAsync ( SyncContext context , SyncTable schemaTable , MessageApplyChanges message ,
233
+ private async Task InternalApplyTableChangesAsync ( SyncContext context , SyncTable schemaTable , MessageApplyChanges message , List < SyncTable > doneTables ,
220
234
DbConnection connection , DbTransaction transaction , DataRowState applyType , DatabaseChangesApplied changesApplied ,
221
235
CancellationToken cancellationToken , IProgress < ProgressArgs > progress )
222
236
{
@@ -250,12 +264,19 @@ private async Task InternalApplyTableChangesAsync(SyncContext context, SyncTable
250
264
var enumerableOfTables = message . Changes . GetTableAsync ( schemaTable . TableName , schemaTable . SchemaName , message . SerializerFactory , this ) ;
251
265
var enumeratorOfTable = enumerableOfTables . GetAsyncEnumerator ( ) ;
252
266
267
+ // List of batchpartinfos during the iteration
268
+ var batchPartinfos = new List < BatchPartInfo > ( ) ;
269
+
253
270
// getting the table to be applied
254
271
// we may have multiple batch files, so we can have multipe sync tables with the same name
255
272
// We can say that dmTable may be contained in several files
256
273
while ( await enumeratorOfTable . MoveNextAsync ( ) )
257
274
{
258
- var syncTable = enumeratorOfTable . Current ;
275
+ var syncTable = enumeratorOfTable . Current . SyncTable ;
276
+
277
+ // add curent batch part info
278
+ if ( enumeratorOfTable . Current . BatchPartInfo != null )
279
+ batchPartinfos . Add ( enumeratorOfTable . Current . BatchPartInfo ) ;
259
280
260
281
if ( syncTable == null || syncTable . Rows == null || syncTable . Rows . Count == 0 )
261
282
continue ;
@@ -334,8 +355,46 @@ private async Task InternalApplyTableChangesAsync(SyncContext context, SyncTable
334
355
await this . InterceptAsync ( tableChangesBatchAppliedArgs , cancellationToken ) . ConfigureAwait ( false ) ;
335
356
this . ReportProgress ( context , progress , tableChangesBatchAppliedArgs , connection , transaction ) ;
336
357
}
358
+
359
+ }
360
+
361
+ // table processeed
362
+ // we can add it to the list of done tables
363
+ doneTables . Add ( schemaTable ) ;
364
+
365
+ // Let's see if we can close the batchpartinfo
366
+ // we can close it if all the tables contains in the bpiTables are already processed
367
+ foreach ( var batchPartinInfo in batchPartinfos )
368
+ {
369
+ var isDoneTable = false ;
370
+ // for each table in the current file
371
+ foreach ( var batchPartTableInfo in batchPartinInfo . Tables )
372
+ {
373
+ // check if all tables in batch part info are done
374
+ isDoneTable = doneTables . Any ( doneTable =>
375
+ {
376
+ var sc = SyncGlobalization . DataSourceStringComparison ;
377
+ var innerTableSchemaName = string . IsNullOrEmpty ( doneTable . SchemaName ) ? string . Empty : doneTable . SchemaName ;
378
+ var batchPartTableSchemaName = string . IsNullOrEmpty ( batchPartTableInfo . SchemaName ) ? string . Empty : batchPartTableInfo . SchemaName ;
379
+ return string . Equals ( doneTable . TableName , batchPartTableInfo . TableName , sc ) && string . Equals ( innerTableSchemaName , batchPartTableSchemaName ) ;
380
+ } ) ;
381
+
382
+ // the current table is not done yet, don't need to continue to iterate
383
+ // over the other tables information
384
+ if ( ! isDoneTable )
385
+ break ;
386
+
387
+ }
388
+
389
+ if ( isDoneTable )
390
+ {
391
+ batchPartinInfo . Data . Dispose ( ) ;
392
+ batchPartinInfo . Data = null ;
393
+ }
337
394
}
338
395
396
+
397
+
339
398
// Report the overall changes applied for the current table
340
399
if ( tableChangesApplied != null )
341
400
{
@@ -387,7 +446,7 @@ private async Task InternalApplyTableChangesAsync(SyncContext context, SyncTable
387
446
// Get command
388
447
var command = await syncAdapter . GetCommandAsync ( dbCommandType , connection , transaction ) ;
389
448
390
- if ( command == null ) return ( 0 , 0 ) ;
449
+ if ( command == null ) return ( 0 , 0 ) ;
391
450
392
451
// Launch any interceptor if available
393
452
var args = new TableChangesBatchApplyingArgs ( context , changesTable , applyType , command , connection , transaction ) ;
0 commit comments