diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 3c79e1e..b6dabe8 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -238,8 +238,11 @@ jobs: - name: test id: test + env: + TDENGINE_CLOUD_ENDPOINT: ${{ secrets.TDENGINE_CLOUD_ENDPOINT }} + TDENGINE_CLOUD_TOKEN: ${{ secrets.TDENGINE_CLOUD_TOKEN }} run: | - sudo dotnet test --logger "console;verbosity=detailed" --collect:"XPlat Code Coverage" --results-directory:./testresults + dotnet test --logger "console;verbosity=detailed" --collect:"XPlat Code Coverage" --results-directory:./testresults timeout-minutes: 15 - uses: actions/upload-artifact@v4 diff --git a/test/Driver.Test/Client/Query/Client.cs b/test/Driver.Test/Client/Query/Client.cs index 678181b..634e000 100644 --- a/test/Driver.Test/Client/Query/Client.cs +++ b/test/Driver.Test/Client/Query/Client.cs @@ -10,34 +10,28 @@ namespace Driver.Test.Client.Query public partial class Client { private readonly ITestOutputHelper _output; - private readonly string _createTableSql; private readonly string _nativeConnectString; private readonly string _wsConnectString; + private readonly string? _cloudConnectString; public Client(ITestOutputHelper output) { this._output = output; - this._createTableSql = "create table if not exists all_type(ts timestamp," + - "c1 bool," + - "c2 tinyint," + - "c3 smallint," + - "c4 int," + - "c5 bigint," + - "c6 tinyint unsigned," + - "c7 smallint unsigned," + - "c8 int unsigned," + - "c9 bigint unsigned," + - "c10 float," + - "c11 double," + - "c12 binary(20)," + - "c13 nchar(20)," + - "c14 varbinary(20)," + - "c15 geometry(100)" + - ")" + - "tags(t json)"; this._nativeConnectString = "host=localhost;port=6030;username=root;password=taosdata"; this._wsConnectString = "protocol=WebSocket;host=localhost;port=6041;useSSL=false;username=root;password=taosdata;enableCompression=true"; + var cloudHost = Environment.GetEnvironmentVariable("TDENGINE_CLOUD_ENDPOINT"); + var cloudToken = Environment.GetEnvironmentVariable("TDENGINE_CLOUD_TOKEN"); + if (!string.IsNullOrEmpty(cloudHost) && !string.IsNullOrEmpty(cloudToken)) + { + this._cloudConnectString = GetCloudConnectString(cloudHost, cloudToken); + } + } + + private static string GetCloudConnectString(string host, string token) + { + return + $"protocol=WebSocket;host={host};port=443;useSSL=true;token={token};enableCompression=true"; } private object?[][] GenerateValue(TDenginePrecision precision, out string sql) @@ -102,6 +96,29 @@ public Client(ITestOutputHelper output) }; } + private static string GenerateCreateTableSql(string tableName) + { + var createTableSql = $"create table if not exists {tableName} (ts timestamp," + + "c1 bool," + + "c2 tinyint," + + "c3 smallint," + + "c4 int," + + "c5 bigint," + + "c6 tinyint unsigned," + + "c7 smallint unsigned," + + "c8 int unsigned," + + "c9 bigint unsigned," + + "c10 float," + + "c11 double," + + "c12 binary(20)," + + "c13 nchar(20)," + + "c14 varbinary(20)," + + "c15 geometry(100)" + + ")" + + "tags(t json)"; + return createTableSql; + } + private static Array[] TransposeToTypedArrays(object?[][] data) { var aTs = new DateTime[] { (DateTime)data[0][0]!, (DateTime)data[1][0]! }; @@ -139,23 +156,36 @@ private string PrecisionString(TDenginePrecision precision) return "ms"; } + private static bool IsCloudTest(ConnectionStringBuilder builder) + { + return !string.IsNullOrEmpty(builder.Token); + } + private void QueryTest(string connectString, string db, TDenginePrecision precision) { var data = this.GenerateValue(precision, out var insertSql); - var builder = new ConnectionStringBuilder(connectString); + var inCloud = IsCloudTest(builder); using (var client = DbDriver.Open(builder)) { + var now = DateTime.Now; + var superTableName = $"all_type_stb_{now.Ticks}"; + var subTableName = $"all_type_ctb_{now.Ticks}"; try { - client.Exec($"drop database if exists {db}"); - client.Exec($"create database {db} precision '{PrecisionString(precision)}'"); + if (!inCloud) + { + client.Exec($"drop database if exists {db}"); + client.Exec($"create database {db} precision '{PrecisionString(precision)}'"); + } + client.Exec($"use {db}"); - client.Exec(this._createTableSql); - string insertQuery = string.Format("insert into t1 using all_type tags('{{\"a\":\"b\"}}') {0}", - insertSql); + var createTableSql = GenerateCreateTableSql(superTableName); + client.Exec(createTableSql); + string insertQuery = + $"insert into {subTableName} using {superTableName} tags('{{\"a\":\"b\"}}') {insertSql}"; client.Exec(insertQuery); - string query = "select * from all_type order by ts asc"; + string query = $"select * from {superTableName} order by ts asc"; using (var rows = client.Query(query)) { this.AssertColumn(rows); @@ -169,7 +199,11 @@ private void QueryTest(string connectString, string db, TDenginePrecision precis } finally { - client.Exec($"drop database if exists {db}"); + client.Exec($"drop table if exists {superTableName}"); + if (!inCloud) + { + client.Exec($"drop database if exists {db}"); + } } } } @@ -177,20 +211,28 @@ private void QueryTest(string connectString, string db, TDenginePrecision precis private void QueryWithReqIDTest(string connectString, string db, TDenginePrecision precision) { var data = this.GenerateValue(precision, out var insertSql); - var builder = new ConnectionStringBuilder(connectString); + var inCloud = IsCloudTest(builder); using (var client = DbDriver.Open(builder)) { + var now = DateTime.Now; + var superTableName = $"all_type_stb_{now.Ticks}"; + var subTableName = $"all_type_ctb_{now.Ticks}"; try { - client.Exec($"drop database if exists {db}", ReqId.GetReqId()); - client.Exec($"create database {db} precision '{PrecisionString(precision)}'", ReqId.GetReqId()); + if (!inCloud) + { + client.Exec($"drop database if exists {db}", ReqId.GetReqId()); + client.Exec($"create database {db} precision '{PrecisionString(precision)}'", ReqId.GetReqId()); + } + client.Exec($"use {db}", ReqId.GetReqId()); - client.Exec(this._createTableSql, ReqId.GetReqId()); - string insertQuery = string.Format("insert into t1 using all_type tags('{{\"a\":\"b\"}}') {0}", - insertSql); + string createTableSql = GenerateCreateTableSql(superTableName); + client.Exec(createTableSql, ReqId.GetReqId()); + string insertQuery = + $"insert into {subTableName} using {superTableName} tags('{{\"a\":\"b\"}}') {insertSql}"; client.Exec(insertQuery, ReqId.GetReqId()); - string query = "select * from all_type order by ts asc"; + string query = $"select * from {superTableName} order by ts asc"; using (var rows = client.Query(query, ReqId.GetReqId())) { this.AssertColumn(rows); @@ -204,7 +246,11 @@ private void QueryWithReqIDTest(string connectString, string db, TDenginePrecisi } finally { - client.Exec($"drop database if exists {db}"); + client.Exec($"drop table if exists {superTableName}", ReqId.GetReqId()); + if (!inCloud) + { + client.Exec($"drop database if exists {db}", ReqId.GetReqId()); + } } } } @@ -213,16 +259,24 @@ private void QueryWithReqIDTest(string connectString, string db, TDenginePrecisi private void StmtTest(string connectString, string db, TDenginePrecision precision) { var data = this.GenerateValue(precision, out _); - var builder = new ConnectionStringBuilder(connectString); + var inCloud = IsCloudTest(builder); using (var client = DbDriver.Open(builder)) { + var now = DateTime.Now; + var superTableName = $"all_type_stb_{now.Ticks}"; + var subTableName = $"all_type_ctb_{now.Ticks}"; try { - client.Exec($"drop database if exists {db}"); - client.Exec($"create database {db} precision '{PrecisionString(precision)}'"); + if (!inCloud) + { + client.Exec($"drop database if exists {db}"); + client.Exec($"create database {db} precision '{PrecisionString(precision)}'"); + } + client.Exec($"use {db}"); - client.Exec(this._createTableSql); + var createTableSql = GenerateCreateTableSql(superTableName); + client.Exec(createTableSql); var stmt = client.StmtInit(); StringBuilder questionMarks = new StringBuilder(); var count = data[0].Length; @@ -236,10 +290,10 @@ private void StmtTest(string connectString, string db, TDenginePrecision precisi } var values = questionMarks.ToString(); - stmt.Prepare($"insert into ? using all_type tags(?) values({values})"); + stmt.Prepare($"insert into ? using {superTableName} tags(?) values({values})"); var isInsert = stmt.IsInsert(); Assert.True(isInsert); - stmt.SetTableName("t1"); + stmt.SetTableName(subTableName); stmt.SetTags(new object[] { "{\"a\":\"b\"}" }); stmt.BindRow(data[0]); stmt.BindRow(data[1]); @@ -247,7 +301,7 @@ private void StmtTest(string connectString, string db, TDenginePrecision precisi stmt.Exec(); var affected = stmt.Affected(); Assert.Equal((long)2, affected); - stmt.Prepare("select * from all_type where ts >= ? order by ts asc"); + stmt.Prepare($"select * from {superTableName} where ts >= ? order by ts asc"); isInsert = stmt.IsInsert(); Assert.False(isInsert); stmt.BindRow(new object[] { data[0][0]! }); @@ -266,7 +320,11 @@ private void StmtTest(string connectString, string db, TDenginePrecision precisi } finally { - client.Exec($"drop database if exists {db}"); + client.Exec($"drop table if exists {superTableName}"); + if (!inCloud) + { + client.Exec($"drop database if exists {db}"); + } } } } @@ -275,16 +333,24 @@ private void StmtTest(string connectString, string db, TDenginePrecision precisi private void StmtWithReqIDTest(string connectString, string db, TDenginePrecision precision) { var data = this.GenerateValue(precision, out _); - var builder = new ConnectionStringBuilder(connectString); + var inCloud = IsCloudTest(builder); using (var client = DbDriver.Open(builder)) { + var now = DateTime.Now; + var superTableName = $"all_type_stb_{now.Ticks}"; + var subTableName = $"all_type_ctb_{now.Ticks}"; try { - client.Exec($"drop database if exists {db}", ReqId.GetReqId()); - client.Exec($"create database {db} precision '{PrecisionString(precision)}'", ReqId.GetReqId()); + if (!inCloud) + { + client.Exec($"drop database if exists {db}", ReqId.GetReqId()); + client.Exec($"create database {db} precision '{PrecisionString(precision)}'", ReqId.GetReqId()); + } + client.Exec($"use {db}", ReqId.GetReqId()); - client.Exec(this._createTableSql, ReqId.GetReqId()); + var createTableSql = GenerateCreateTableSql(superTableName); + client.Exec(createTableSql, ReqId.GetReqId()); var stmt = client.StmtInit(ReqId.GetReqId()); StringBuilder questionMarks = new StringBuilder(); var count = data[0].Length; @@ -298,10 +364,10 @@ private void StmtWithReqIDTest(string connectString, string db, TDenginePrecisio } var values = questionMarks.ToString(); - stmt.Prepare($"insert into ? using all_type tags(?) values({values})"); + stmt.Prepare($"insert into ? using {superTableName} tags(?) values({values})"); var isInsert = stmt.IsInsert(); Assert.True(isInsert); - stmt.SetTableName("t1"); + stmt.SetTableName(subTableName); stmt.SetTags(new object[] { "{\"a\":\"b\"}" }); stmt.BindRow(data[0]); stmt.BindRow(data[1]); @@ -309,7 +375,7 @@ private void StmtWithReqIDTest(string connectString, string db, TDenginePrecisio stmt.Exec(); var affected = stmt.Affected(); Assert.Equal((long)2, affected); - stmt.Prepare("select * from all_type where ts >= ? order by ts asc"); + stmt.Prepare($"select * from {superTableName} where ts >= ? order by ts asc"); isInsert = stmt.IsInsert(); Assert.False(isInsert); stmt.BindRow(new object[] { data[0][0]! }); @@ -328,7 +394,11 @@ private void StmtWithReqIDTest(string connectString, string db, TDenginePrecisio } finally { - client.Exec($"drop database if exists {db}"); + client.Exec($"drop table if exists {superTableName}", ReqId.GetReqId()); + if (!inCloud) + { + client.Exec($"drop database if exists {db}"); + } } } } @@ -341,19 +411,29 @@ private void StmtBindColumnsTest(string connectString, string db, TDenginePrecis var builder = new ConnectionStringBuilder(connectString); + var inCloud = IsCloudTest(builder); using (var client = DbDriver.Open(builder)) { + var now = DateTime.Now; + var superTableName = $"all_type_stb_{now.Ticks}"; + var subTableName = $"all_type_ctb_{now.Ticks}"; try { - client.Exec($"drop database if exists {db}", ReqId.GetReqId()); - client.Exec($"create database {db} precision '{PrecisionString(precision)}'", ReqId.GetReqId()); + if (!inCloud) + { + client.Exec($"drop database if exists {db}", ReqId.GetReqId()); + client.Exec($"create database {db} precision '{PrecisionString(precision)}'", ReqId.GetReqId()); + } + client.Exec($"use {db}"); - client.Exec(this._createTableSql); + var createTableSql = GenerateCreateTableSql(superTableName); + client.Exec(createTableSql); var stmt = client.StmtInit(ReqId.GetReqId()); - stmt.Prepare("insert into ? using all_type tags(?) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"); + stmt.Prepare( + $"insert into ? using {superTableName} tags(?) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"); var isInsert = stmt.IsInsert(); Assert.True(isInsert); - stmt.SetTableName("t1"); + stmt.SetTableName(subTableName); stmt.SetTags(new object[] { "{\"a\":\"b\"}" }); var fields = stmt.GetColFields(); stmt.BindColumn(fields, transposedData); @@ -361,7 +441,7 @@ private void StmtBindColumnsTest(string connectString, string db, TDenginePrecis stmt.Exec(); var affected = stmt.Affected(); Assert.Equal((long)2, affected); - stmt.Prepare("select * from all_type where ts >= ? order by ts asc"); + stmt.Prepare($"select * from {superTableName} where ts >= ? order by ts asc"); isInsert = stmt.IsInsert(); Assert.False(isInsert); stmt.BindRow(new object[] { data[0][0]! }); @@ -380,7 +460,11 @@ private void StmtBindColumnsTest(string connectString, string db, TDenginePrecis } finally { - client.Exec($"drop database if exists {db}"); + client.Exec($"drop table if exists {superTableName}"); + if (!inCloud) + { + client.Exec($"drop database if exists {db}", ReqId.GetReqId()); + } } } } @@ -389,20 +473,26 @@ private void StmtBindColumnsTest(string connectString, string db, TDenginePrecis private void VarbinaryTest(string connectString, string db) { DateTime dateTime = DateTime.Now; - var ts = (dateTime.ToUniversalTime().Ticks - TDengineConstant.TimeZero.Ticks) * 100; - var now = TDengineConstant.ConvertTimeToDatetime(ts, TDenginePrecision.TSDB_TIME_PRECISION_NANO); + var ts = (dateTime.ToUniversalTime().Ticks - TDengineConstant.TimeZero.Ticks) / 10000; + var now = TDengineConstant.ConvertTimeToDatetime(ts, TDenginePrecision.TSDB_TIME_PRECISION_MILLI); var builder = new ConnectionStringBuilder(connectString); + var inCloud = IsCloudTest(builder); using (var client = DbDriver.Open(builder)) { + var tableName = $"test_varbinary_{dateTime.Ticks}"; try { - client.Exec($"drop database if exists {db}", ReqId.GetReqId()); - client.Exec($"create database {db} precision 'ns'"); + if (!inCloud) + { + client.Exec($"drop database if exists {db}", ReqId.GetReqId()); + client.Exec($"create database {db} precision 'ms'"); + } + client.Exec($"use {db}"); - client.Exec("create table if not exists test_varbinary(ts timestamp,c1 varbinary(65517))"); + client.Exec($"create table if not exists {tableName}(ts timestamp,c1 varbinary(65517))"); var stmt = client.StmtInit(ReqId.GetReqId()); - stmt.Prepare("insert into test_varbinary values(?,?)"); + stmt.Prepare($"insert into {tableName} values(?,?)"); var isInsert = stmt.IsInsert(); Assert.True(isInsert); var fields = stmt.GetColFields(); @@ -418,7 +508,7 @@ private void VarbinaryTest(string connectString, string db) stmt.Exec(); var affected = stmt.Affected(); Assert.Equal((long)1, affected); - stmt.Prepare("select * from test_varbinary where c1 = ?"); + stmt.Prepare($"select * from {tableName} where c1 = ?"); stmt.BindRow(new object[] { data }); stmt.AddBatch(); stmt.Exec(); @@ -437,7 +527,11 @@ private void VarbinaryTest(string connectString, string db) } finally { - client.Exec($"drop database if exists {db}"); + client.Exec($"drop table if exists {tableName}"); + if (!inCloud) + { + client.Exec($"drop database if exists {db}"); + } } } } @@ -447,12 +541,17 @@ private void InfluxDBTest(string connectString, string db) { var builder = new ConnectionStringBuilder(connectString); + var inCloud = IsCloudTest(builder); using (var client = DbDriver.Open(builder)) { try { - client.Exec($"drop database if exists {db}"); - client.Exec($"create database {db} precision 'ns'"); + if (!inCloud) + { + client.Exec($"drop database if exists {db}"); + client.Exec($"create database {db} precision 'ns'"); + } + client.Exec($"use {db}"); var data = @"http_response,host=host161,method=GET,result=success,server=http://localhost,status_code=404 response_time=0.003226372,http_response_code=404i,content_length=19i,result_type=""success"",result_code=0i 1648090640000000000 @@ -555,7 +654,10 @@ private void InfluxDBTest(string connectString, string db) } finally { - client.Exec($"drop database if exists {db}"); + if (!inCloud) + { + client.Exec($"drop database if exists {db}"); + } } } } @@ -564,12 +666,17 @@ private void TelnetTest(string connectString, string db) { var builder = new ConnectionStringBuilder(connectString); + var inCloud = IsCloudTest(builder); using (var client = DbDriver.Open(builder)) { try { - client.Exec($"drop database if exists {db}"); - client.Exec($"create database {db} precision 'ns'"); + if (!inCloud) + { + client.Exec($"drop database if exists {db}"); + client.Exec($"create database {db} precision 'ns'"); + } + client.Exec($"use {db}"); var data = new string[] { @@ -586,7 +693,10 @@ private void TelnetTest(string connectString, string db) } finally { - client.Exec($"drop database if exists {db}"); + if (!inCloud) + { + client.Exec($"drop database if exists {db}"); + } } } } @@ -595,12 +705,17 @@ private void SMLJsonTest(string connectString, string db) { var builder = new ConnectionStringBuilder(connectString); + var inCloud = IsCloudTest(builder); using (var client = DbDriver.Open(builder)) { try { - client.Exec($"drop database if exists {db}"); - client.Exec($"create database {db} precision 'ns'"); + if (!inCloud) + { + client.Exec($"drop database if exists {db}"); + client.Exec($"create database {db} precision 'ns'"); + } + client.Exec($"use {db}"); var data = new string[] { @@ -624,7 +739,10 @@ private void SMLJsonTest(string connectString, string db) } finally { - client.Exec($"drop database if exists {db}"); + if (!inCloud) + { + client.Exec($"drop database if exists {db}"); + } } } } @@ -674,14 +792,19 @@ private void QueryConcurrencyTest(string connectString, string db) { var precision = TDenginePrecision.TSDB_TIME_PRECISION_MILLI; var builder = new ConnectionStringBuilder(connectString); + var inCloud = IsCloudTest(builder); var client = DbDriver.Open(builder); var count = 30; try { - client.Exec($"drop database if exists {db}"); - client.Exec($"create database {db} precision '{PrecisionString(precision)}'"); + if (!inCloud) + { + client.Exec($"drop database if exists {db}"); + client.Exec($"create database {db} precision '{PrecisionString(precision)}'"); + } + client.Exec($"use {db}"); - client.Exec("create table t1 (ts timestamp, a int, b float, c binary(10))"); + client.Exec("create table if not exists t1 (ts timestamp, a int, b float, c binary(10))"); var ts = new long[count]; var dateTime = DateTime.Now; var tsv = new DateTime[count]; @@ -734,7 +857,12 @@ private void QueryConcurrencyTest(string connectString, string db) } finally { - client.Exec($"drop database if exists {db}"); + client.Exec($"drop table if exists t1"); + if (!inCloud) + { + client.Exec($"drop database if exists {db}"); + } + client.Dispose(); } } diff --git a/test/Driver.Test/Client/Query/Cloud.cs b/test/Driver.Test/Client/Query/Cloud.cs new file mode 100644 index 0000000..f118063 --- /dev/null +++ b/test/Driver.Test/Client/Query/Cloud.cs @@ -0,0 +1,91 @@ +using System; +using System.Runtime.CompilerServices; +using System.Text; +using TDengine.Driver; +using TDengine.Driver.Client; +using Xunit; +using Xunit.Sdk; + +namespace Driver.Test.Client.Query +{ + public partial class Client + { + private void RunCloudTest(Action testAction, string testName) + { + var db = "cs_test"; + if (string.IsNullOrEmpty(this._cloudConnectString)) + { + _output.WriteLine($"Cloud connection string is not set. Skipping {testName}."); + return; + } + + testAction(this._cloudConnectString, db); + } + + [Fact] + public void CloudQueryTest() + { + RunCloudTest((conn, db) => this.QueryTest(conn, db, TDenginePrecision.TSDB_TIME_PRECISION_MILLI), + nameof(CloudQueryTest)); + } + + [Fact] + public void CloudQueryWithReqIDMSTest() + { + RunCloudTest((conn, db) => this.QueryWithReqIDTest(conn, db, TDenginePrecision.TSDB_TIME_PRECISION_MILLI), + nameof(CloudQueryWithReqIDMSTest)); + } + + [Fact] + public void CloudStmtMSTest() + { + RunCloudTest((conn, db) => this.StmtTest(conn, db, TDenginePrecision.TSDB_TIME_PRECISION_MILLI), + nameof(CloudStmtMSTest)); + } + + + [Fact] + public void CloudStmtWithReqIDMSTest() + { + RunCloudTest((conn, db) => this.StmtWithReqIDTest(conn, db, TDenginePrecision.TSDB_TIME_PRECISION_MILLI), + nameof(CloudStmtWithReqIDMSTest)); + } + + [Fact] + public void CloudStmtColumnsMSTest() + { + RunCloudTest((conn, db) => this.StmtBindColumnsTest(conn, db, TDenginePrecision.TSDB_TIME_PRECISION_MILLI), + nameof(CloudStmtColumnsMSTest)); + } + + [Fact] + public void CloudVarbinaryTest() + { + RunCloudTest(this.VarbinaryTest, nameof(CloudVarbinaryTest)); + } + + [Fact] + public void CloudInfluxDBTest() + { + RunCloudTest(this.InfluxDBTest, nameof(CloudInfluxDBTest)); + } + + [Fact] + public void CloudTelnetTest() + { + RunCloudTest(this.TelnetTest, nameof(CloudTelnetTest)); + } + + [Fact] + public void CloudSMLJsonTest() + { + RunCloudTest(this.SMLJsonTest, nameof(CloudSMLJsonTest)); + } + + [Fact] + public void CloudConcurrencyTest() + { + RunCloudTest(this.QueryConcurrencyTest, nameof(CloudConcurrencyTest)); + } + } +} \ No newline at end of file diff --git a/test/Driver.Test/Client/TMQ/Client.cs b/test/Driver.Test/Client/TMQ/Client.cs index afd8dbe..5adb64e 100644 --- a/test/Driver.Test/Client/TMQ/Client.cs +++ b/test/Driver.Test/Client/TMQ/Client.cs @@ -16,10 +16,12 @@ public partial class Consumer private readonly string _nativeConnectString; private readonly string _wsConnectString; private readonly string _createTableSql; + private readonly string? _cloudConnectString; private readonly Dictionary _nativeTMQCfg; private readonly Dictionary _nativeTMQCfgAutoCommit; private readonly Dictionary _wsTMQCfg; private readonly Dictionary _wsTMQCfgAutoCommit; + private readonly Dictionary? _cloudTMQCfg; public Consumer(ITestOutputHelper output) @@ -28,8 +30,34 @@ public Consumer(ITestOutputHelper output) this._nativeConnectString = "host=127.0.0.1;port=6030;username=root;password=taosdata"; this._wsConnectString = "protocol=WebSocket;host=127.0.0.1;port=6041;useSSL=false;username=root;password=taosdata;enableCompression=true"; + var cloudHost = Environment.GetEnvironmentVariable("TDENGINE_CLOUD_ENDPOINT"); + var cloudToken = Environment.GetEnvironmentVariable("TDENGINE_CLOUD_TOKEN"); + if (!string.IsNullOrEmpty(cloudHost) && !string.IsNullOrEmpty(cloudToken)) + { + this._cloudConnectString = GetCloudConnectString(cloudHost, cloudToken); + var now = DateTime.Now; + var clientId = $"cs_test_{now.Ticks}"; + var goupId = $"cs_test_group_{now.Ticks}"; + this._cloudTMQCfg = new Dictionary() + { + { "td.connect.type", "WebSocket" }, + { "group.id", goupId }, + { "auto.offset.reset", "latest" }, + { "td.connect.ip", cloudHost }, + { "token", cloudToken }, + { "td.connect.port", "443" }, + { "client.id", clientId }, + { "enable.auto.commit", "false" }, + { "msg.with.table.name", "true" }, + { "useSSL", "true" }, + { "ws.message.enableCompression", "true" }, + { "session.timeout.ms", "12000" }, + { "max.poll.interval.ms", "300000" }, + { "min.poll.rows", "20" }, + }; + } - this._createTableSql = "create table if not exists all_type(ts timestamp," + + this._createTableSql = "create table if not exists tmq_all_type(ts timestamp," + "c1 bool," + "c2 tinyint," + "c3 smallint," + @@ -119,12 +147,24 @@ public Consumer(ITestOutputHelper output) }; } + private static string GetCloudConnectString(string host, string token) + { + return + $"protocol=WebSocket;host={host};port=443;useSSL=true;token={token};enableCompression=true"; + } + + private static bool IsCloudTest(Dictionary cfg) + { + return cfg.ContainsKey("token") && !string.IsNullOrEmpty(cfg["token"]); + } + private void NewConsumerTest(string connectString, string db, string topic, Dictionary cfg) { var builder = new ConnectionStringBuilder(connectString); using (var client = DbDriver.Open(builder)) { + var isCloud = IsCloudTest(cfg); try { string[] sqlCommands = @@ -134,11 +174,22 @@ private void NewConsumerTest(string connectString, string db, string topic, Dict $"create database if not exists {db} vgroups 2 WAL_RETENTION_PERIOD 86400", $"use {db}", this._createTableSql, - "create table if not exists ct0 using all_type tags(1000)", - "create table if not exists ct1 using all_type tags(2000)", - "create table if not exists ct2 using all_type tags(3000)", - $"create topic if not exists {topic} as stable all_type" + "create table if not exists ct0 using tmq_all_type tags(1000)", + "create table if not exists ct1 using tmq_all_type tags(2000)", + "create table if not exists ct2 using tmq_all_type tags(3000)", + $"create topic if not exists {topic} as stable tmq_all_type" }; + if (isCloud) + { + sqlCommands = new string[] + { + $"use {db}", + "create table if not exists ct0 using tmq_all_type tags(1000)", + "create table if not exists ct1 using tmq_all_type tags(2000)", + "create table if not exists ct2 using tmq_all_type tags(3000)", + }; + } + foreach (var sqlCommand in sqlCommands) { DoRequest(client, sqlCommand); @@ -148,12 +199,6 @@ private void NewConsumerTest(string connectString, string db, string topic, Dict DateTime now = new DateTime(dateTime.Year, dateTime.Month, dateTime.Day, dateTime.Hour, dateTime.Minute, dateTime.Second, dateTime.Millisecond, dateTime.Kind); - for (int i = 0; i < 3; i++) - { - var sql = - $"insert into ct{i} values('{now.ToString("yyyy-MM-dd'T'HH:mm:ss.fffK")}',true,2,3,4,5,6,7,8,9,10,11,'binary','nchar','varbinary','POINT(100 100)')"; - DoRequest(client, sql); - } var consumer = new ConsumerBuilder>(cfg).Build(); consumer.Subscribe($"{topic}"); @@ -163,15 +208,12 @@ private void NewConsumerTest(string connectString, string db, string topic, Dict Assert.Single(topics); Assert.Equal($"{topic}", topics[0]); _output.WriteLine(assignment.ToString()); - var position1 = consumer.Position(assignment[0]); - Assert.Equal(0, position1); - var position2 = consumer.Position(assignment[1]); - Assert.Equal(0, position2); var messageCount = 0; for (int i = 0; i < 5; i++) { - using (var result = consumer.Consume(100)) + using (var result = consumer.Consume(500)) { + _output.WriteLine($"{result}"); if (messageCount == 3) { break; @@ -179,6 +221,13 @@ private void NewConsumerTest(string connectString, string db, string topic, Dict if (result == null) { + for (int j = 0; j < 3; j++) + { + var sql = + $"insert into ct{j} values('{now.ToString("yyyy-MM-dd'T'HH:mm:ss.fffK")}',true,2,3,4,5,6,7,8,9,10,11,'binary','nchar','varbinary','POINT(100 100)')"; + DoRequest(client, sql); + } + continue; } @@ -211,10 +260,13 @@ private void NewConsumerTest(string connectString, string db, string topic, Dict } finally { - Thread.Sleep(3000); - DoRequest(client, $"drop topic if exists {topic}"); - Thread.Sleep(3000); - DoRequest(client, $"drop database if exists {db}"); + if (!isCloud) + { + Thread.Sleep(3000); + DoRequest(client, $"drop topic if exists {topic}"); + Thread.Sleep(3000); + DoRequest(client, $"drop database if exists {db}"); + } } } } @@ -239,10 +291,10 @@ private void ConsumerSeekTest(string connectString, string db, string topic, Dic $"create database if not exists {db} vgroups 2 WAL_RETENTION_PERIOD 86400", $"use {db}", this._createTableSql, - "create table if not exists ct0 using all_type tags(1000)", - "create table if not exists ct1 using all_type tags(2000)", - "create table if not exists ct2 using all_type tags(3000)", - $"create topic if not exists {topic} as stable all_type" + "create table if not exists ct0 using tmq_all_type tags(1000)", + "create table if not exists ct1 using tmq_all_type tags(2000)", + "create table if not exists ct2 using tmq_all_type tags(3000)", + $"create topic if not exists {topic} as stable tmq_all_type" }; foreach (var sqlCommand in sqlCommands) { @@ -382,10 +434,10 @@ private void ConsumerCommitTest(string connectString, string db, string topic, D $"create database if not exists {db} vgroups 2 WAL_RETENTION_PERIOD 86400", $"use {db}", this._createTableSql, - "create table if not exists ct0 using all_type tags(1000)", - "create table if not exists ct1 using all_type tags(2000)", - "create table if not exists ct2 using all_type tags(3000)", - $"create topic if not exists {topic} as stable all_type" + "create table if not exists ct0 using tmq_all_type tags(1000)", + "create table if not exists ct1 using tmq_all_type tags(2000)", + "create table if not exists ct2 using tmq_all_type tags(3000)", + $"create topic if not exists {topic} as stable tmq_all_type" }; foreach (var sqlCommand in sqlCommands) { @@ -490,10 +542,10 @@ private void ConsumerAutoCommitTest(string connectString, string db, string topi $"create database if not exists {db} vgroups 2 WAL_RETENTION_PERIOD 86400", $"use {db}", this._createTableSql, - "create table if not exists ct0 using all_type tags(1000)", - "create table if not exists ct1 using all_type tags(2000)", - "create table if not exists ct2 using all_type tags(3000)", - $"create topic if not exists {topic} as stable all_type" + "create table if not exists ct0 using tmq_all_type tags(1000)", + "create table if not exists ct1 using tmq_all_type tags(2000)", + "create table if not exists ct2 using tmq_all_type tags(3000)", + $"create topic if not exists {topic} as stable tmq_all_type" }; foreach (var sqlCommand in sqlCommands) {