Skip to content
Open
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
f6b8dd6
feat(stream): support schema evolution of virtual table
JinqingKuang Nov 3, 2025
feded46
feat(stream): support meta change in virtual table
wangmm0220 Nov 3, 2025
84ee8cc
enh: support trigger original reader deploy times
dapan1121 Nov 19, 2025
367ac43
Merge remote-tracking branch 'origin/3.0' into feat/TD-37208-3.0
dapan1121 Nov 19, 2025
2212f10
feat(stream): [TD-37208] process schema changed in vtable
wangmm0220 Dec 9, 2025
19e439d
fix(stream): reviewd by gemini
wangmm0220 Dec 10, 2025
3e320b1
fix(stream): add test cases
wangmm0220 Dec 12, 2025
044063a
fix(stream): remove trigger periodically pulling vtable info
JinqingKuang Dec 15, 2025
74288d8
fix(tmq): get spicific uid info in vtableinfo requestion
wangmm0220 Dec 17, 2025
f5b6c24
fix(tmq): merge from 3.0
wangmm0220 Dec 19, 2025
ef9933c
fix(tmq): add tableBlock
wangmm0220 Dec 19, 2025
ad0a00d
fix(stream): add tableBlock processing in trigger
JinqingKuang Dec 19, 2025
207756f
feat(strea): fix col bytes error
wangmm0220 Dec 22, 2025
a0f70f0
fix(tmq): support vtable change in reader
wangmm0220 Dec 23, 2025
7fcb880
fix(tmq): support vtable change in reader
wangmm0220 Dec 24, 2025
0d66910
fix(tmq): support vtable change in reader
wangmm0220 Dec 24, 2025
263c0f0
Merge branch '3.0' into feat/TD-37208-3.0
wangmm0220 Dec 25, 2025
ecf5d41
fix(tmq): support vtable change in reader
wangmm0220 Dec 25, 2025
ce93b69
fix(stream): conflicts from 3.0
wangmm0220 Jan 4, 2026
b5ab8e8
fix(stream): add test case
wangmm0220 Jan 4, 2026
fc6eff1
Merge branch '3.0' into feat/TD-37208-3.0
wangmm0220 Mar 9, 2026
42f4934
fix(stream): fix cases for vtable in stream
wangmm0220 Mar 12, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,27 +28,27 @@ def test_stream_meta_change_vtable(self):
"""

tdStream.createSnode()
tdSql.execute(f"alter all dnodes 'debugflag 131';")
tdSql.execute(f"alter all dnodes 'stdebugflag 131';")
tdSql.execute(f"alter all dnodes 'qdebugflag 143';")
tdSql.execute(f"alter all dnodes 'stdebugflag 135';")

streams = []
# streams.append(self.Basic37208()) # [ok] add ctb and drop ctb from stb
streams.append(self.Basic0()) # [ok] add ctb and drop ctb from stb
streams.append(self.Basic1()) # [ok] drop data source table
streams.append(self.Basic37208()) # [ok] add ctb and drop ctb from stb
# streams.append(self.Basic0()) # [ok] add ctb and drop ctb from stb
# streams.append(self.Basic1()) # [ok] drop data source table

# streams.append(self.Basic2()) # tag过滤时,修改tag的值,从满足流条件,到不满足流条件; 从不满足流条件,到满足流条件 [fail]
# # streams.append(self.Basic2()) # tag过滤时,修改tag的值,从满足流条件,到不满足流条件; 从不满足流条件,到满足流条件 [fail]

# TD-36750 [流计算开发阶段] 虚拟表+删除pre_filter(cbigint >=1)中cbigint列后,应该没有符合条件的数据了,不会触发计算窗口
# TD-38126 pre_filter 在 %%trows 且触发表为虚拟表时不可用
# streams.append(self.Basic3()) # [ok]
# # TD-36750 [流计算开发阶段] 虚拟表+删除pre_filter(cbigint >=1)中cbigint列后,应该没有符合条件的数据了,不会触发计算窗口
# # TD-38126 pre_filter 在 %%trows 且触发表为虚拟表时不可用
# # streams.append(self.Basic3()) # [ok]

streams.append(self.Basic4()) # [ok]
streams.append(self.Basic5()) # [ok]
# streams.append(self.Basic4()) # [ok]
# streams.append(self.Basic5()) # [ok]

# TD-37144 [流计算开发阶段] 删除流结果表后继续触发了也没有重建,不符合预期
# streams.append(self.Basic6()) # [fail]
# # TD-37144 [流计算开发阶段] 删除流结果表后继续触发了也没有重建,不符合预期
# # streams.append(self.Basic6()) # [fail]

streams.append(self.Basic7()) # [ok]
# streams.append(self.Basic7()) # [ok]

tdStream.checkAll(streams)

Expand All @@ -70,30 +70,20 @@ def create(self):
tdSql.execute(f"create table {self.db}.ct5 using {self.db}.{self.stbName} (tint, tbigint)tags(5, 5)")

tdSql.execute(f"create vtable {self.db}.vct1 (cint from {self.db}.ct1.cint) using {self.db}.{self.vstbName} tags(1,1)")
# tdSql.execute(f"create vtable vct2 (cint from {self.db}.ct2.cint) using {self.db}.{self.vstbName} tags(2,2)")
# tdSql.execute(f"create vtable vct4 (cint from {self.db}.ct4.cint) using {self.db}.{self.vstbName} tags(4,4)")
# tdSql.execute(f"create vtable vct5 (cint from {self.db}.ct5.cint) using {self.db}.{self.vstbName} tags(5,5)")

# tdSql.execute(
# f"create stream {self.db}.s37208_g count_window(1) from {self.vstbName} partition by tbname, tint into {self.db}.res_stb OUTPUT_SUBTABLE(CONCAT('res_stb_', tbname)) (firstts, lastts, cnt_v, sum_v, avg_v) as select first(_c0), last_row(_c0), count(cint), sum(cint), avg(cint) from %%trows;"
# )

# tdSql.error(f"alter table vct2 set tag tint = 9")

# tdSql.execute(
# f"create stream {self.db}.s37208_g_f count_window(1) from {self.vstbName} partition by tbname, tint stream_options(DELETE_OUTPUT_TABLE) into {self.db}.res_stb_f OUTPUT_SUBTABLE(CONCAT('res_stb_f_', tbname)) (firstts, lastts, cnt_v, sum_v, avg_v) as select first(_c0), last_row(_c0), count(cint), sum(cint), avg(cint) from %%trows;"
# )

tdSql.execute(f"create table if not exists {self.db}.tt (cts timestamp, cint int, cfloat float, cdouble double, cvar varchar(4)) tags (tint int, tbigint bigint)")
tdSql.execute(f"use {self.db}")

tdSql.execute(
f"create stream {self.db}.s37208_g_t1 state_window(cint) from {self.db}.tt partition by tbname, tint stream_options(pre_filter(cdouble >= 10)) into {self.db}.res_stb_t1 as select first(_c0), last_row(_c0), sum(cfloat) from %%trows;"
f"create stream {self.db}.s1 count_window(1) from {self.vstbName} partition by tbname, tint into {self.db}.res_stb OUTPUT_SUBTABLE(CONCAT('res_stb_', tbname)) (firstts, lastts, cnt_v, sum_v, avg_v) as select first(_c0), last_row(_c0), count(cint), sum(cint), avg(cint) from %%trows;"
)

# tdSql.execute(
# f"create stream {self.db}.s37208_g_t2 count_window(1) from {self.vstbName} partition by tbname, tint stream_options(pre_filter(tbigint >= 10)) into res_stb_t2 OUTPUT_SUBTABLE(CONCAT('res_stb_t2_', tbname)) (firstts, lastts, cnt_v, sum_v, avg_v) as select first(_c0), last_row(_c0), count(cint), sum(cint), avg(cint) from %%tbname where cts >= _twstart and cts <= _twend;"
# )
tdSql.execute(
f"create stream {self.db}.s2 count_window(1) from {self.vstbName} partition by tbname, tint stream_options(DELETE_OUTPUT_TABLE) into {self.db}.res_stb_1 OUTPUT_SUBTABLE(CONCAT('res_stb_1_', tbname)) (firstts, lastts, cnt_v, sum_v, avg_v) as select first(_c0), last_row(_c0), count(cint), sum(cint), avg(cint) from %%trows;"
)

tdSql.execute(
f"create stream {self.db}.s3 count_window(1) from {self.vstbName} partition by tbname, tint stream_options(PRE_FILTER(tbigint <= 2)) into {self.db}.res_stb_2 OUTPUT_SUBTABLE(CONCAT('res_stb_2_', tbname)) (firstts, lastts, cnt_v, sum_v, avg_v) as select first(_c0), last_row(_c0), count(cint), sum(cint), avg(cint) from {self.vstbName};"
)
Comment on lines +84 to +86
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

For consistency with other stream definitions in this file, it's better to use lowercase for SQL keywords like pre_filter. Also, the trailing semicolon in the f-string is unnecessary and can be removed.

Suggested change
tdSql.execute(
f"create stream {self.db}.s3 count_window(1) from {self.vstbName} partition by tbname, tint stream_options(PRE_FILTER(tbigint <= 2)) into {self.db}.res_stb_2 OUTPUT_SUBTABLE(CONCAT('res_stb_2_', tbname)) (firstts, lastts, cnt_v, sum_v, avg_v) as select first(_c0), last_row(_c0), count(cint), sum(cint), avg(cint) from {self.vstbName};"
)
tdSql.execute(
f"create stream {self.db}.s3 count_window(1) from {self.vstbName} partition by tbname, tint stream_options(pre_filter(tbigint <= 2)) into {self.db}.res_stb_2 OUTPUT_SUBTABLE(CONCAT('res_stb_2_', tbname)) (firstts, lastts, cnt_v, sum_v, avg_v) as select first(_c0), last_row(_c0), count(cint), sum(cint), avg(cint) from {self.vstbName}"
)


def insert1(self):
tdSql.execute(f"create vtable {self.db}.vct2 (cint from {self.db}.ct2.cint) using {self.db}.{self.vstbName} tags(2,2)")
Expand All @@ -106,13 +96,13 @@ def insert1(self):

def check1(self):
tdSql.checkResultsByFunc(
sql=f'select * from information_schema.ins_tables where db_name="{self.db}" and table_name like "res_stb_f%"',
sql=f'select * from information_schema.ins_tables where db_name="{self.db}" and table_name like "res_stb_1%"',
func=lambda: tdSql.getRows() == 2,
)

tdSql.checkResultsByFunc(
sql=f'select * from information_schema.ins_tables where db_name="{self.db}" and table_name like "res_stb_%"',
func=lambda: tdSql.getRows() == 4,
func=lambda: tdSql.getRows() == 6,
)

tdSql.checkResultsByFunc(
Expand Down Expand Up @@ -165,19 +155,32 @@ def check2(self):

def insert3(self):
tdSql.execute(f"create vtable {self.db}.vct3 (cint from {self.db}.ct3.cint) using {self.db}.{self.vstbName} tags(10,10)")
tdSql.execute(f"create vtable {self.db}.vct4 (cint from {self.db}.ct3.cint) using {self.db}.{self.vstbName} tags(20,20)")
tdSql.execute(f"create vtable {self.db}.vct4 (cint from {self.db}.ct4.cint) using {self.db}.{self.vstbName} tags(-20,-20)")
tdSql.execute(f"create vtable {self.db}.vct5 (cint from {self.db}.ct2.cint) using {self.db}.{self.vstbName} tags(20,20)")
time.sleep(5)
tdSql.execute(f"insert into ct4 values ('2025-01-01 00:01:00', 1);")
tdSql.execute(f"insert into ct3 values ('2025-01-01 00:01:00', 1);")
tdSql.execute(f"alter table {self.db}.ct1 set tag tbigint = 30")
tdSql.execute(f"insert into ct2 values ('2025-01-05 10:01:00', 1);")

tdSql.execute(f"alter table {self.db}.vct1 set tag tbigint = 30")
time.sleep(5)
tdSql.execute(f"insert into {self.db}.ct1 values ('2025-01-03 00:00:00', 1);")
tdSql.execute(f"alter table {self.db}.ct3 set tag tbigint = 3")
tdSql.execute(f"alter table {self.db}.ct4 set tag tbigint = 30")

tdSql.execute(f"alter table {self.db}.vct3 set tag tbigint = -3")
tdSql.execute(f"alter table {self.db}.vct4 set tag tbigint = -30")
tdSql.execute(f"alter table {self.db}.vct5 set tag tbigint = 60")
time.sleep(5)
tdSql.execute(f"insert into ct4 values ('2025-01-02 00:01:00', 1);")
tdSql.execute(f"insert into ct3 values ('2025-01-02 00:01:00', 1);")

tdSql.execute(f"alter vtable {self.db}.vct4 alter column cint set {self.db}.ct2.cint")
time.sleep(5)
tdSql.execute(f"insert into ct2 values ('2025-01-05 10:01:10', 1);")

tdSql.execute(f"alter vtable {self.db}.vct4 alter column cint set null")
time.sleep(5)
tdSql.execute(f"insert into ct2 values ('2025-01-05 10:01:11', 1);")
Comment on lines 160 to +182
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The insert3 method contains multiple time.sleep(5) calls. These hardcoded sleeps can make tests slower and potentially flaky. Consider refactoring these waits into a helper method, for example _wait_for_stream_processing(seconds=5). This would improve readability by making the purpose of the wait explicit and centralize the wait duration, making it easier to adjust if needed. A more robust solution would be to use a polling mechanism to check for the expected state instead of a fixed sleep time.


def check3(self):
tdSql.checkResultsByFunc(
sql=f'select * from information_schema.ins_tables where db_name="{self.db}" and table_name like "res_stb_%"',
Comment on lines 155 to 186
Copy link

Copilot AI Mar 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The check3 method references output table names res_stb_t2_vct1, res_stb_t2_vct3, and res_stb_f_vct4 (lines 191, 201, 211), but none of the three streams defined in create() generate tables with those prefixes. Stream s1 uses prefix res_stb_, stream s2 uses res_stb_1_, and stream s3 uses res_stb_2_. These check queries appear to be stale references from the old commented-out stream definitions (s37208_g_t2res_stb_t2_ and s37208_g_fres_stb_f_). If Basic37208 is re-enabled, these checks will always fail because the expected result tables don't exist under those names.

Copilot uses AI. Check for mistakes.
Expand Down Expand Up @@ -218,6 +221,7 @@ def check3(self):
and tdSql.compareData(1, 3, 1)
and tdSql.compareData(1, 4, 1),
)

class Basic0(StreamCheckItem):
def __init__(self):
self.db = "sdb0"
Expand Down
Loading