Skip to content
This repository was archived by the owner on Apr 9, 2022. It is now read-only.

Commit 1411c98

Browse files
authored
Merge pull request #38 from ievsiukovi/master
Race condition when complete connection async send
2 parents def0041 + 9e07853 commit 1411c98

File tree

1 file changed

+18
-7
lines changed

1 file changed

+18
-7
lines changed

kafka-sharp/kafka-sharp/Network/Connection.cs

+18-7
Original file line numberDiff line numberDiff line change
@@ -368,6 +368,20 @@ private void CleanSend()
368368
_sendContext.Promise = null;
369369
}
370370

371+
private void CompleteSend()
372+
{
373+
var promise = _sendContext.Promise;
374+
CleanSend();
375+
promise.SetResult(SuccessResult);
376+
}
377+
378+
private void FaultSend(Exception ex)
379+
{
380+
var promise = _sendContext.Promise;
381+
CleanSend();
382+
promise.SetException(ex);
383+
}
384+
371385
// Async send loop body
372386
private static void OnSendCompleted(ISocket sender, ISocketAsyncEventArgs saea)
373387
{
@@ -381,9 +395,8 @@ private static void OnSendCompleted(ISocket sender, ISocketAsyncEventArgs saea)
381395

382396
if (saea.SocketError != SocketError.Success)
383397
{
384-
connection._sendContext.Promise.SetException(new TransportException(TransportError.WriteError,
385-
new SocketException((int) saea.SocketError)));
386-
connection.CleanSend();
398+
connection.FaultSend(new TransportException(TransportError.WriteError,
399+
new SocketException((int)saea.SocketError)));
387400
return;
388401
}
389402

@@ -402,8 +415,7 @@ private static void OnSendCompleted(ISocket sender, ISocketAsyncEventArgs saea)
402415
}
403416
else
404417
{
405-
connection._sendContext.Promise.SetResult(SuccessResult);
406-
connection.CleanSend();
418+
connection.CompleteSend();
407419
}
408420
}
409421
}
@@ -430,8 +442,7 @@ private void LoopSend(int from, int count)
430442
}
431443
catch (Exception ex)
432444
{
433-
CleanSend();
434-
_sendContext.Promise.SetException(new TransportException(TransportError.WriteError, ex));
445+
FaultSend(new TransportException(TransportError.WriteError, ex));
435446
}
436447
}
437448

0 commit comments

Comments
 (0)