@@ -2856,18 +2856,17 @@ private Task CopyBatchesAsyncContinued(BulkCopySimpleResultSet internalResults,
28562856 task ,
28572857 source ,
28582858 state : this ,
2859- onSuccess : ( object state ) =>
2859+ onSuccess : state =>
28602860 {
2861- SqlBulkCopy sqlBulkCopy = ( SqlBulkCopy ) state ;
2862- Task continuedTask = sqlBulkCopy . CopyBatchesAsyncContinuedOnSuccess ( internalResults , updateBulkCommandText , cts , source ) ;
2861+ Task continuedTask = state . CopyBatchesAsyncContinuedOnSuccess ( internalResults , updateBulkCommandText , cts , source ) ;
28632862 if ( continuedTask == null )
28642863 {
28652864 // Continuation finished sync, recall into CopyBatchesAsync to continue
2866- sqlBulkCopy . CopyBatchesAsync ( internalResults , updateBulkCommandText , cts , source ) ;
2865+ state . CopyBatchesAsync ( internalResults , updateBulkCommandText , cts , source ) ;
28672866 }
28682867 } ,
2869- onFailure : static ( Exception _ , object state ) => ( ( SqlBulkCopy ) state ) . CopyBatchesAsyncContinuedOnError ( cleanupParser : false ) ,
2870- onCancellation : static ( object state ) => ( ( SqlBulkCopy ) state ) . CopyBatchesAsyncContinuedOnError ( cleanupParser : true ) ) ;
2868+ onFailure : static ( state , _ ) => state . CopyBatchesAsyncContinuedOnError ( cleanupParser : false ) ,
2869+ onCancellation : static state => state . CopyBatchesAsyncContinuedOnError ( cleanupParser : true ) ) ;
28712870
28722871 return source . Task ;
28732872 }
@@ -2918,24 +2917,23 @@ private Task CopyBatchesAsyncContinuedOnSuccess(BulkCopySimpleResultSet internal
29182917 writeTask ,
29192918 source ,
29202919 state : this ,
2921- onSuccess : ( object state ) =>
2920+ onSuccess : state =>
29222921 {
2923- SqlBulkCopy sqlBulkCopy = ( SqlBulkCopy ) state ;
29242922 try
29252923 {
2926- sqlBulkCopy . RunParser ( ) ;
2927- sqlBulkCopy . CommitTransaction ( ) ;
2924+ state . RunParser ( ) ;
2925+ state . CommitTransaction ( ) ;
29282926 }
29292927 catch ( Exception )
29302928 {
2931- sqlBulkCopy . CopyBatchesAsyncContinuedOnError ( cleanupParser : false ) ;
2929+ state . CopyBatchesAsyncContinuedOnError ( cleanupParser : false ) ;
29322930 throw ;
29332931 }
29342932
29352933 // Always call back into CopyBatchesAsync
2936- sqlBulkCopy . CopyBatchesAsync ( internalResults , updateBulkCommandText , cts , source ) ;
2934+ state . CopyBatchesAsync ( internalResults , updateBulkCommandText , cts , source ) ;
29372935 } ,
2938- onFailure : static ( Exception _ , object state ) => ( ( SqlBulkCopy ) state ) . CopyBatchesAsyncContinuedOnError ( cleanupParser : false ) ) ;
2936+ onFailure : static ( state , _ ) => state . CopyBatchesAsyncContinuedOnError ( cleanupParser : false ) ) ;
29392937 return source . Task ;
29402938 }
29412939 }
@@ -3190,21 +3188,20 @@ private void WriteToServerInternalRestAsync(CancellationToken cts, TaskCompletio
31903188
31913189 // No need to cancel timer since SqlBulkCopy creates specific task source for reconnection.
31923190 AsyncHelper . SetTimeoutExceptionWithState (
3193- completion : cancellableReconnectTS ,
3194- timeout : BulkCopyTimeout ,
3191+ taskCompletionSource : cancellableReconnectTS ,
3192+ timeoutInSeconds : BulkCopyTimeout ,
31953193 state : _destinationTableName ,
3196- onFailure : static state =>
3197- SQL . BulkLoadInvalidDestinationTable ( ( string ) state , SQL . CR_ReconnectTimeout ( ) ) ,
3194+ onTimeout : static state => SQL . BulkLoadInvalidDestinationTable ( state , SQL . CR_ReconnectTimeout ( ) ) ,
31983195 cancellationToken : CancellationToken . None
31993196 ) ;
32003197
32013198 AsyncHelper . ContinueTaskWithState (
3202- task : cancellableReconnectTS . Task ,
3203- completion : source ,
3199+ taskToContinue : cancellableReconnectTS . Task ,
3200+ taskCompletionSource : source ,
32043201 state : regReconnectCancel ,
3205- onSuccess : ( object state ) =>
3202+ onSuccess : state =>
32063203 {
3207- ( ( StrongBox < CancellationTokenRegistration > ) state ) . Value . Dispose ( ) ;
3204+ state . Value . Dispose ( ) ;
32083205 if ( _parserLock != null )
32093206 {
32103207 _parserLock . Release ( ) ;
@@ -3214,10 +3211,22 @@ private void WriteToServerInternalRestAsync(CancellationToken cts, TaskCompletio
32143211 _parserLock . Wait ( canReleaseFromAnyThread : true ) ;
32153212 WriteToServerInternalRestAsync ( cts , source ) ;
32163213 } ,
3217- onFailure : static ( _ , state ) => ( ( StrongBox < CancellationTokenRegistration > ) state ) . Value . Dispose ( ) ,
3218- onCancellation : static state => ( ( StrongBox < CancellationTokenRegistration > ) state ) . Value . Dispose ( ) ,
3219- exceptionConverter : ex => SQL . BulkLoadInvalidDestinationTable ( _destinationTableName , ex )
3220- ) ;
3214+ onFailure : ( regReconnectCancel2 , exception ) =>
3215+ {
3216+ regReconnectCancel2 . Value . Dispose ( ) ;
3217+
3218+ // Convert exception and set it on the source
3219+ // Note: This is safe because the helper will only try to set the
3220+ // exception and b/c it is already set will pass without setting
3221+ // to the original exception.
3222+ Exception convertedException = SQL . BulkLoadInvalidDestinationTable (
3223+ _destinationTableName ,
3224+ exception ) ;
3225+ source . TrySetException ( convertedException ) ;
3226+ } ,
3227+ onCancellation : static regReconnectCancel2 =>
3228+ regReconnectCancel2 . Value . Dispose ( ) ) ;
3229+
32213230 return ;
32223231 }
32233232 else
0 commit comments