Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 35 additions & 29 deletions src/Driver/Client/Websocket/WSClient.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
using System;
using System.Diagnostics;
using System.Net.WebSockets;
using TDengine.Driver.Impl.WebSocketMethods;

namespace TDengine.Driver.Client.Websocket
Expand All @@ -10,6 +9,8 @@ public class WSClient : ITDengineClient
private Connection _connection;
private readonly TimeZoneInfo _tz;
private readonly ConnectionStringBuilder _builder;
private readonly object _reconnectLock = new object();


public WSClient(ConnectionStringBuilder builder)
{
Expand Down Expand Up @@ -66,42 +67,47 @@ private void Reconnect()
{
if (!_builder.AutoReconnect)
return;

Connection connection = null;
for (int i = 0; i < _builder.ReconnectRetryCount; i++)
lock (_reconnectLock)
{
try
{
// sleep
System.Threading.Thread.Sleep(_builder.ReconnectIntervalMs);
connection = new Connection(GetUrl(_builder), _builder.Username, _builder.Password,
_builder.Database, _builder.ConnTimeout, _builder.ReadTimeout, _builder.WriteTimeout,
_builder.EnableCompression);
connection.Connect();
break;
}
catch (Exception)
if (_connection != null && _connection.IsAvailable()) // connection is available, no need to reconnect
return;

Connection connection = null;
for (int i = 0; i < _builder.ReconnectRetryCount; i++)
{
if (connection != null)
try
{
connection.Close();
connection = null;
// sleep
System.Threading.Thread.Sleep(_builder.ReconnectIntervalMs);
connection = new Connection(GetUrl(_builder), _builder.Username, _builder.Password,
_builder.Database, _builder.ConnTimeout, _builder.ReadTimeout, _builder.WriteTimeout,
_builder.EnableCompression);
connection.Connect();
break;
}
catch (Exception)
{
if (connection != null)
{
connection.Close();
connection = null;
}
}
}
}

if (connection == null)
{
throw new TDengineError((int)TDengineError.InternalErrorCode.WS_RECONNECT_FAILED,
"websocket connection reconnect failed");
}
if (connection == null)
{
throw new TDengineError((int)TDengineError.InternalErrorCode.WS_RECONNECT_FAILED,
"websocket connection reconnect failed");
}

if (_connection != null)
{
_connection.Close();
}
if (_connection != null)
{
_connection.Close();
}

_connection = connection;
_connection = connection;
}
}

public IStmt StmtInit()
Expand Down
12 changes: 7 additions & 5 deletions src/Driver/Client/Websocket/WSRows.cs
Original file line number Diff line number Diff line change
Expand Up @@ -97,16 +97,18 @@ private List<TDengineMeta> ParseMetas(WSStmtUseResultResp result)

public void Dispose()
{
if (_freed)
{
return;
}
if (_freed) return;

_freed = true;
if (_connection != null && _connection.IsAvailable())
if (_connection == null || !_connection.IsAvailable()) return;
try
{
_connection.FreeResult(_resultId);
}
catch (Exception)
{
// ignored
}
}

public long GetBytes(int ordinal, long dataOffset, byte[] buffer, int bufferOffset, int length)
Expand Down
28 changes: 17 additions & 11 deletions src/Driver/Client/Websocket/WSStmt.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ namespace TDengine.Driver.Client.Websocket
{
public class WSStmt : IStmt
{
private ulong _stmt;
private readonly ulong _stmt;
private readonly TimeZoneInfo _tz;
private Connection _connection;
private bool closed;
private long lastAffected;
private readonly Connection _connection;
private bool _closed;
private long _lastAffected;
private bool _isInsert;

public WSStmt(ulong stmt, TimeZoneInfo tz, Connection connection)
Expand All @@ -22,13 +22,18 @@ public WSStmt(ulong stmt, TimeZoneInfo tz, Connection connection)

public void Dispose()
{
if (closed)
if (_closed) return;

_closed = true;
if (_connection == null || !_connection.IsAvailable()) return;
try
{
return;
_connection.StmtClose(_stmt);
}
catch (Exception)
{
// ignored
}

_connection.StmtClose(_stmt);
closed = true;
}

public void Prepare(string query)
Expand Down Expand Up @@ -219,12 +224,12 @@ public void AddBatch()
public void Exec()
{
var resp = _connection.StmtExec(_stmt);
lastAffected = resp.Affected;
_lastAffected = resp.Affected;
}

public long Affected()
{
return lastAffected;
return _lastAffected;
}

public IRows Result()
Expand All @@ -233,6 +238,7 @@ public IRows Result()
{
return new WSRows((int)Affected());
}

var resp = _connection.StmtUseResult(_stmt);
return new WSRows(resp, _connection, _tz);
}
Expand Down
Loading
Loading