Skip to content

Commit 177c08b

Browse files
committed
Add interceptor on connection open just after connection is opened and not after transaction is created
1 parent 6005837 commit 177c08b

10 files changed

+200
-93
lines changed

Projects/Dotmim.Sync.Core/CoreProvider.ApplyChanges.cs

+67-63
Original file line numberDiff line numberDiff line change
@@ -31,88 +31,93 @@ public abstract partial class CoreProvider
3131
using (connection = this.CreateConnection())
3232
{
3333
await connection.OpenAsync();
34+
await this.InterceptAsync(new ConnectionOpenArgs(context, connection));
3435

3536
// Create a transaction
36-
applyTransaction = connection.BeginTransaction();
37-
38-
context.SyncStage = SyncStage.DatabaseChangesApplying;
37+
using (applyTransaction = connection.BeginTransaction())
38+
{
3939

40-
// Launch any interceptor if available
41-
await this.InterceptAsync(new ConnectionOpenArgs(context, connection, applyTransaction));
42-
await this.InterceptAsync(new DatabaseChangesApplyingArgs(context, connection, applyTransaction));
40+
await this.InterceptAsync(new TransactionOpenArgs(context, connection, applyTransaction));
4341

44-
// Disable check constraints
45-
if (this.Options.DisableConstraintsOnApplyChanges)
46-
changeApplicationAction = this.DisableConstraints(context, message.Schema, connection, applyTransaction, message.FromScope);
42+
context.SyncStage = SyncStage.DatabaseChangesApplying;
4743

48-
// -----------------------------------------------------
49-
// 0) Check if we are in a reinit mode
50-
// -----------------------------------------------------
51-
if (context.SyncWay == SyncWay.Download && context.SyncType != SyncType.Normal)
52-
{
53-
changeApplicationAction = this.ResetInternal(context, message.Schema, connection, applyTransaction, message.FromScope);
44+
// Launch any interceptor if available
45+
await this.InterceptAsync(new DatabaseChangesApplyingArgs(context, connection, applyTransaction));
5446

55-
// Rollback
56-
if (changeApplicationAction == ChangeApplicationAction.Rollback)
57-
throw new SyncException("Rollback during reset tables", context.SyncStage, SyncExceptionType.Rollback);
58-
}
47+
// Disable check constraints
48+
if (this.Options.DisableConstraintsOnApplyChanges)
49+
changeApplicationAction = this.DisableConstraints(context, message.Schema, connection, applyTransaction, message.FromScope);
5950

60-
// -----------------------------------------------------
61-
// 1) Applying deletes. Do not apply deletes if we are in a new database
62-
// -----------------------------------------------------
63-
if (!message.FromScope.IsNewScope)
64-
{
65-
// for delete we must go from Up to Down
66-
foreach (var table in message.Schema.Tables.Reverse())
51+
// -----------------------------------------------------
52+
// 0) Check if we are in a reinit mode
53+
// -----------------------------------------------------
54+
if (context.SyncWay == SyncWay.Download && context.SyncType != SyncType.Normal)
6755
{
68-
changeApplicationAction = await this.ApplyChangesInternalAsync(table, context, message, connection,
69-
applyTransaction, DmRowState.Deleted, changesApplied);
56+
changeApplicationAction = this.ResetInternal(context, message.Schema, connection, applyTransaction, message.FromScope);
57+
58+
// Rollback
59+
if (changeApplicationAction == ChangeApplicationAction.Rollback)
60+
throw new SyncException("Rollback during reset tables", context.SyncStage, SyncExceptionType.Rollback);
7061
}
7162

72-
// Rollback
73-
if (changeApplicationAction == ChangeApplicationAction.Rollback)
63+
// -----------------------------------------------------
64+
// 1) Applying deletes. Do not apply deletes if we are in a new database
65+
// -----------------------------------------------------
66+
if (!message.FromScope.IsNewScope)
7467
{
75-
RaiseRollbackException(context, "Rollback during applying deletes");
76-
}
77-
}
68+
// for delete we must go from Up to Down
69+
foreach (var table in message.Schema.Tables.Reverse())
70+
{
71+
changeApplicationAction = await this.ApplyChangesInternalAsync(table, context, message, connection,
72+
applyTransaction, DmRowState.Deleted, changesApplied);
73+
}
7874

79-
// -----------------------------------------------------
80-
// 2) Applying Inserts and Updates. Apply in table order
81-
// -----------------------------------------------------
82-
foreach (var table in message.Schema.Tables)
83-
{
84-
changeApplicationAction = await this.ApplyChangesInternalAsync(table, context, message, connection,
85-
applyTransaction, DmRowState.Added, changesApplied);
75+
// Rollback
76+
if (changeApplicationAction == ChangeApplicationAction.Rollback)
77+
{
78+
RaiseRollbackException(context, "Rollback during applying deletes");
79+
}
80+
}
8681

87-
// Rollback
88-
if (changeApplicationAction == ChangeApplicationAction.Rollback)
82+
// -----------------------------------------------------
83+
// 2) Applying Inserts and Updates. Apply in table order
84+
// -----------------------------------------------------
85+
foreach (var table in message.Schema.Tables)
8986
{
90-
RaiseRollbackException(context, "Rollback during applying inserts");
91-
}
87+
changeApplicationAction = await this.ApplyChangesInternalAsync(table, context, message, connection,
88+
applyTransaction, DmRowState.Added, changesApplied);
9289

93-
changeApplicationAction = await this.ApplyChangesInternalAsync(table, context, message, connection,
94-
applyTransaction, DmRowState.Modified, changesApplied);
90+
// Rollback
91+
if (changeApplicationAction == ChangeApplicationAction.Rollback)
92+
{
93+
RaiseRollbackException(context, "Rollback during applying inserts");
94+
}
9595

96-
// Rollback
97-
if (changeApplicationAction == ChangeApplicationAction.Rollback)
98-
{
99-
RaiseRollbackException(context, "Rollback during applying updates");
96+
changeApplicationAction = await this.ApplyChangesInternalAsync(table, context, message, connection,
97+
applyTransaction, DmRowState.Modified, changesApplied);
98+
99+
// Rollback
100+
if (changeApplicationAction == ChangeApplicationAction.Rollback)
101+
{
102+
RaiseRollbackException(context, "Rollback during applying updates");
103+
}
100104
}
101-
}
102105

103106

104-
// Progress & Interceptor
105-
context.SyncStage = SyncStage.DatabaseChangesApplied;
106-
var databaseChangesAppliedArgs = new DatabaseChangesAppliedArgs(context, connection, applyTransaction);
107-
this.ReportProgress(context, databaseChangesAppliedArgs, connection, applyTransaction);
108-
await this.InterceptAsync(databaseChangesAppliedArgs);
107+
// Progress & Interceptor
108+
context.SyncStage = SyncStage.DatabaseChangesApplied;
109+
var databaseChangesAppliedArgs = new DatabaseChangesAppliedArgs(context, connection, applyTransaction);
110+
this.ReportProgress(context, databaseChangesAppliedArgs, connection, applyTransaction);
111+
await this.InterceptAsync(databaseChangesAppliedArgs);
109112

110-
// Re enable check constraints
111-
if (this.Options.DisableConstraintsOnApplyChanges)
112-
changeApplicationAction = this.EnableConstraints(context, message.Schema, connection, applyTransaction, message.FromScope);
113+
// Re enable check constraints
114+
if (this.Options.DisableConstraintsOnApplyChanges)
115+
changeApplicationAction = this.EnableConstraints(context, message.Schema, connection, applyTransaction, message.FromScope);
113116

114-
applyTransaction.Commit();
117+
await this.InterceptAsync(new TransactionCommitArgs(context, connection, applyTransaction));
118+
applyTransaction.Commit();
115119

120+
}
116121

117122
return (context, changesApplied);
118123
}
@@ -383,9 +388,8 @@ internal async Task<ChangeApplicationAction> ApplyChangesInternalAsync(
383388
/// </summary>
384389
internal async Task<(ChangeApplicationAction, int, DmRow)> HandleConflictAsync(DbSyncAdapter syncAdapter, SyncContext context, SyncConflict conflict, ConflictResolutionPolicy policy, ScopeInfo scope, long fromScopeLocalTimeStamp, DbConnection connection, DbTransaction transaction)
385390
{
386-
DmRow finalRow = null;
387-
var conflictApplyAction = ApplyAction.Continue;
388-
391+
DmRow finalRow;
392+
ApplyAction conflictApplyAction;
389393
(conflictApplyAction, finalRow) = await this.GetConflictActionAsync(context, conflict, policy, connection, transaction);
390394

391395
// Default behavior and an error occured

Projects/Dotmim.Sync.Core/CoreProvider.Database.cs

+18-9
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ public abstract partial class CoreProvider
2424
public async Task DeprovisionAsync(SyncConfiguration configuration, SyncProvision provision)
2525
{
2626
DbConnection connection = null;
27+
DbTransaction transaction = null;
2728
try
2829
{
2930
if (configuration.Schema == null || !configuration.Schema.HasTables)
@@ -34,10 +35,11 @@ public async Task DeprovisionAsync(SyncConfiguration configuration, SyncProvisio
3435
using (connection = this.CreateConnection())
3536
{
3637
await connection.OpenAsync();
38+
await this.InterceptAsync(new ConnectionOpenArgs(null, connection));
3739

38-
using (var transaction = connection.BeginTransaction())
40+
using (transaction = connection.BeginTransaction())
3941
{
40-
await this.InterceptAsync(new ConnectionOpenArgs(null, connection, transaction));
42+
await this.InterceptAsync(new TransactionOpenArgs(null, connection, transaction));
4143

4244
// Load the configuration
4345
this.ReadSchema(configuration.Schema, connection, transaction);
@@ -87,6 +89,7 @@ public async Task DeprovisionAsync(SyncConfiguration configuration, SyncProvisio
8789
// Launch any interceptor if available
8890
await this.InterceptAsync(new DatabaseDeprovisionedArgs(null, provision, configuration.Schema, null, connection, transaction));
8991

92+
await this.InterceptAsync(new TransactionCommitArgs(null, connection, transaction));
9093
transaction.Commit();
9194
}
9295
}
@@ -100,7 +103,7 @@ public async Task DeprovisionAsync(SyncConfiguration configuration, SyncProvisio
100103
if (connection != null && connection.State != ConnectionState.Closed)
101104
connection.Close();
102105

103-
await this.InterceptAsync(new ConnectionCloseArgs(null, connection, null));
106+
await this.InterceptAsync(new ConnectionCloseArgs(null, connection, transaction));
104107
}
105108

106109
}
@@ -111,6 +114,7 @@ public async Task DeprovisionAsync(SyncConfiguration configuration, SyncProvisio
111114
public async Task ProvisionAsync(SyncConfiguration configuration, SyncProvision provision)
112115
{
113116
DbConnection connection = null;
117+
DbTransaction transaction = null;
114118

115119
try
116120
{
@@ -122,10 +126,11 @@ public async Task ProvisionAsync(SyncConfiguration configuration, SyncProvision
122126
using (connection = this.CreateConnection())
123127
{
124128
await connection.OpenAsync();
129+
await this.InterceptAsync(new ConnectionOpenArgs(null, connection));
125130

126-
using (var transaction = connection.BeginTransaction())
131+
using (transaction = connection.BeginTransaction())
127132
{
128-
await this.InterceptAsync(new ConnectionOpenArgs(null, connection, transaction));
133+
await this.InterceptAsync(new TransactionOpenArgs(null, connection, transaction));
129134

130135
// Load the configuration
131136
this.ReadSchema(configuration.Schema, connection, transaction);
@@ -181,6 +186,7 @@ public async Task ProvisionAsync(SyncConfiguration configuration, SyncProvision
181186

182187
// call any interceptor
183188
await this.InterceptAsync(new DatabaseProvisionedArgs(null, provision, configuration.Schema, null, connection, transaction));
189+
await this.InterceptAsync(new TransactionCommitArgs(null, connection, transaction));
184190

185191
transaction.Commit();
186192
}
@@ -198,7 +204,7 @@ public async Task ProvisionAsync(SyncConfiguration configuration, SyncProvision
198204
if (connection != null && connection.State != ConnectionState.Closed)
199205
connection.Close();
200206

201-
await this.InterceptAsync(new ConnectionCloseArgs(null, connection, null));
207+
await this.InterceptAsync(new ConnectionCloseArgs(null, connection, transaction));
202208
}
203209
}
204210

@@ -209,6 +215,7 @@ public async Task ProvisionAsync(SyncConfiguration configuration, SyncProvision
209215
public virtual async Task<SyncContext> EnsureDatabaseAsync(SyncContext context, MessageEnsureDatabase message)
210216
{
211217
DbConnection connection = null;
218+
DbTransaction transaction = null;
212219
try
213220
{
214221
// Event progress
@@ -220,11 +227,12 @@ public virtual async Task<SyncContext> EnsureDatabaseAsync(SyncContext context,
220227
using (connection = this.CreateConnection())
221228
{
222229
await connection.OpenAsync();
230+
await this.InterceptAsync(new ConnectionOpenArgs(context, connection));
223231

224-
using (var transaction = connection.BeginTransaction())
232+
using (transaction = connection.BeginTransaction())
225233
{
226234
// Interceptors
227-
await this.InterceptAsync(new ConnectionOpenArgs(context, connection, transaction));
235+
await this.InterceptAsync(new TransactionOpenArgs(context, connection, transaction));
228236

229237
var beforeArgs = new DatabaseProvisioningArgs(context, SyncProvision.All, message.Schema, connection, transaction);
230238
await this.InterceptAsync(beforeArgs);
@@ -275,6 +283,7 @@ public virtual async Task<SyncContext> EnsureDatabaseAsync(SyncContext context,
275283
this.ReportProgress(context, args);
276284
await this.InterceptAsync(args);
277285

286+
await this.InterceptAsync(new TransactionCommitArgs(context, connection, transaction));
278287
transaction.Commit();
279288
}
280289

@@ -293,7 +302,7 @@ public virtual async Task<SyncContext> EnsureDatabaseAsync(SyncContext context,
293302
if (connection != null && connection.State != ConnectionState.Closed)
294303
connection.Close();
295304

296-
await this.InterceptAsync(new ConnectionCloseArgs(context, connection, null));
305+
await this.InterceptAsync(new ConnectionCloseArgs(context, connection, transaction));
297306
}
298307
}
299308

Projects/Dotmim.Sync.Core/CoreProvider.GetChanges.cs

+9-4
Original file line numberDiff line numberDiff line change
@@ -179,12 +179,13 @@ public abstract partial class CoreProvider
179179
{
180180
// Open the connection
181181
await connection.OpenAsync();
182+
await this.InterceptAsync(new ConnectionOpenArgs(context, connection));
182183

183184
using (var transaction = connection.BeginTransaction())
184185
{
185186
try
186187
{
187-
await this.InterceptAsync(new ConnectionOpenArgs(context, connection, transaction));
188+
await this.InterceptAsync(new TransactionOpenArgs(context, connection, transaction));
188189

189190
// changes that will be returned as selected changes
190191
var changes = new DatabaseChangesSelected();
@@ -336,6 +337,7 @@ public abstract partial class CoreProvider
336337
await this.InterceptAsync(args);
337338
}
338339

340+
await this.InterceptAsync(new TransactionCommitArgs(context, connection, transaction));
339341
transaction.Commit();
340342

341343
// generate the batchpartinfo
@@ -426,14 +428,16 @@ private static void SetSelectChangesCommonParameters(SyncContext context, ScopeI
426428

427429
using (var connection = this.CreateConnection())
428430
{
431+
DbTransaction transaction = null;
429432
try
430433
{
431434
// Open the connection
432435
await connection.OpenAsync();
436+
await this.InterceptAsync(new ConnectionOpenArgs(context, connection));
433437

434-
using (var transaction = connection.BeginTransaction())
438+
using (transaction = connection.BeginTransaction())
435439
{
436-
await this.InterceptAsync(new ConnectionOpenArgs(context, connection, transaction));
440+
await this.InterceptAsync(new TransactionOpenArgs(context, connection, transaction));
437441

438442
// create the in memory changes set
439443
var changesSet = new DmSet(configTables.DmSetName);
@@ -648,6 +652,7 @@ private static void SetSelectChangesCommonParameters(SyncContext context, ScopeI
648652

649653
}
650654

655+
await this.InterceptAsync(new TransactionCommitArgs(context, connection, transaction));
651656
transaction.Commit();
652657
}
653658

@@ -661,7 +666,7 @@ private static void SetSelectChangesCommonParameters(SyncContext context, ScopeI
661666
if (connection != null && connection.State == ConnectionState.Open)
662667
connection.Close();
663668

664-
await this.InterceptAsync(new ConnectionCloseArgs(context, connection, null));
669+
await this.InterceptAsync(new ConnectionCloseArgs(context, connection, transaction));
665670
}
666671

667672

0 commit comments

Comments
 (0)