feat(stream): support the change of vtable in stream#34715
feat(stream): support the change of vtable in stream#34715wangmm0220 wants to merge 22 commits into3.0from
Conversation
Summary of ChangesHello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request enhances the stream processing capabilities by introducing support for dynamic modifications to virtual tables (vtables). The changes are validated through an updated test case that demonstrates the system's ability to correctly handle ALTER VTABLE operations, specifically when altering a vtable column's source or setting it to null, ensuring robust and adaptable stream processing. Highlights
🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console. Changelog
Activity
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Pull request overview
This PR updates stream processing tests to support the "alter vtable" operations in streams, specifically testing scenarios where virtual table (vtable) column source references are changed or nulled. The key changes are in the Basic37208 test class within test_meta_change_vtable.py.
Changes:
- Replaced the old placeholder debug flags (
debugflag 131/stdebugflag 131) with more specific flags (qdebugflag 143/stdebugflag 135) that produce more detailed vtable query and stream debug logs. - Rewrote
Basic37208.create()to define three focused streams (s1,s2,s3) targeting vtable operations instead of the previous commented-out approach. - Expanded
insert3to exercise vtable tag changes and the newALTER VTABLE ... ALTER COLUMN ... SETandSET NULLoperations; addedvct3,vct4,vct5setup for wider coverage.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
You can also share your feedback on Copilot code review. Take the survey.
|
|
||
| def insert3(self): | ||
| tdSql.execute(f"create vtable {self.db}.vct3 (cint from {self.db}.ct3.cint) using {self.db}.{self.vstbName} tags(10,10)") | ||
| tdSql.execute(f"create vtable {self.db}.vct4 (cint from {self.db}.ct3.cint) using {self.db}.{self.vstbName} tags(20,20)") | ||
| tdSql.execute(f"create vtable {self.db}.vct4 (cint from {self.db}.ct4.cint) using {self.db}.{self.vstbName} tags(-20,-20)") | ||
| tdSql.execute(f"create vtable {self.db}.vct5 (cint from {self.db}.ct2.cint) using {self.db}.{self.vstbName} tags(20,20)") | ||
| time.sleep(5) | ||
| tdSql.execute(f"insert into ct4 values ('2025-01-01 00:01:00', 1);") | ||
| tdSql.execute(f"insert into ct3 values ('2025-01-01 00:01:00', 1);") | ||
| tdSql.execute(f"alter table {self.db}.ct1 set tag tbigint = 30") | ||
| tdSql.execute(f"insert into ct2 values ('2025-01-05 10:01:00', 1);") | ||
|
|
||
| tdSql.execute(f"alter table {self.db}.vct1 set tag tbigint = 30") | ||
| time.sleep(5) | ||
| tdSql.execute(f"insert into {self.db}.ct1 values ('2025-01-03 00:00:00', 1);") | ||
| tdSql.execute(f"alter table {self.db}.ct3 set tag tbigint = 3") | ||
| tdSql.execute(f"alter table {self.db}.ct4 set tag tbigint = 30") | ||
|
|
||
| tdSql.execute(f"alter table {self.db}.vct3 set tag tbigint = -3") | ||
| tdSql.execute(f"alter table {self.db}.vct4 set tag tbigint = -30") | ||
| tdSql.execute(f"alter table {self.db}.vct5 set tag tbigint = 60") | ||
| time.sleep(5) | ||
| tdSql.execute(f"insert into ct4 values ('2025-01-02 00:01:00', 1);") | ||
| tdSql.execute(f"insert into ct3 values ('2025-01-02 00:01:00', 1);") | ||
|
|
||
| tdSql.execute(f"alter vtable {self.db}.vct4 alter column cint set {self.db}.ct2.cint") | ||
| time.sleep(5) | ||
| tdSql.execute(f"insert into ct2 values ('2025-01-05 10:01:10', 1);") | ||
|
|
||
| tdSql.execute(f"alter vtable {self.db}.vct4 alter column cint set null") | ||
| time.sleep(5) | ||
| tdSql.execute(f"insert into ct2 values ('2025-01-05 10:01:11', 1);") | ||
|
|
||
| def check3(self): | ||
| tdSql.checkResultsByFunc( | ||
| sql=f'select * from information_schema.ins_tables where db_name="{self.db}" and table_name like "res_stb_%"', |
There was a problem hiding this comment.
The check3 method references output table names res_stb_t2_vct1, res_stb_t2_vct3, and res_stb_f_vct4 (lines 191, 201, 211), but none of the three streams defined in create() generate tables with those prefixes. Stream s1 uses prefix res_stb_, stream s2 uses res_stb_1_, and stream s3 uses res_stb_2_. These check queries appear to be stale references from the old commented-out stream definitions (s37208_g_t2 → res_stb_t2_ and s37208_g_f → res_stb_f_). If Basic37208 is re-enabled, these checks will always fail because the expected result tables don't exist under those names.
There was a problem hiding this comment.
Code Review
This pull request enhances stream processing capabilities by adding support for metadata changes in virtual tables. The changes are demonstrated through updates to a Python test file, which now includes scenarios for altering virtual table tags and columns. My review focuses on improving code consistency and test reliability. Specifically, I've pointed out an inconsistent SQL keyword casing and suggested refactoring repeated time.sleep calls to make the tests more robust and maintainable. Overall, the changes are a good addition and the tests seem to cover the new functionality well.
| tdSql.execute( | ||
| f"create stream {self.db}.s3 count_window(1) from {self.vstbName} partition by tbname, tint stream_options(PRE_FILTER(tbigint <= 2)) into {self.db}.res_stb_2 OUTPUT_SUBTABLE(CONCAT('res_stb_2_', tbname)) (firstts, lastts, cnt_v, sum_v, avg_v) as select first(_c0), last_row(_c0), count(cint), sum(cint), avg(cint) from {self.vstbName};" | ||
| ) |
There was a problem hiding this comment.
For consistency with other stream definitions in this file, it's better to use lowercase for SQL keywords like pre_filter. Also, the trailing semicolon in the f-string is unnecessary and can be removed.
| tdSql.execute( | |
| f"create stream {self.db}.s3 count_window(1) from {self.vstbName} partition by tbname, tint stream_options(PRE_FILTER(tbigint <= 2)) into {self.db}.res_stb_2 OUTPUT_SUBTABLE(CONCAT('res_stb_2_', tbname)) (firstts, lastts, cnt_v, sum_v, avg_v) as select first(_c0), last_row(_c0), count(cint), sum(cint), avg(cint) from {self.vstbName};" | |
| ) | |
| tdSql.execute( | |
| f"create stream {self.db}.s3 count_window(1) from {self.vstbName} partition by tbname, tint stream_options(pre_filter(tbigint <= 2)) into {self.db}.res_stb_2 OUTPUT_SUBTABLE(CONCAT('res_stb_2_', tbname)) (firstts, lastts, cnt_v, sum_v, avg_v) as select first(_c0), last_row(_c0), count(cint), sum(cint), avg(cint) from {self.vstbName}" | |
| ) |
| time.sleep(5) | ||
| tdSql.execute(f"insert into ct4 values ('2025-01-01 00:01:00', 1);") | ||
| tdSql.execute(f"insert into ct3 values ('2025-01-01 00:01:00', 1);") | ||
| tdSql.execute(f"alter table {self.db}.ct1 set tag tbigint = 30") | ||
| tdSql.execute(f"insert into ct2 values ('2025-01-05 10:01:00', 1);") | ||
|
|
||
| tdSql.execute(f"alter table {self.db}.vct1 set tag tbigint = 30") | ||
| time.sleep(5) | ||
| tdSql.execute(f"insert into {self.db}.ct1 values ('2025-01-03 00:00:00', 1);") | ||
| tdSql.execute(f"alter table {self.db}.ct3 set tag tbigint = 3") | ||
| tdSql.execute(f"alter table {self.db}.ct4 set tag tbigint = 30") | ||
|
|
||
| tdSql.execute(f"alter table {self.db}.vct3 set tag tbigint = -3") | ||
| tdSql.execute(f"alter table {self.db}.vct4 set tag tbigint = -30") | ||
| tdSql.execute(f"alter table {self.db}.vct5 set tag tbigint = 60") | ||
| time.sleep(5) | ||
| tdSql.execute(f"insert into ct4 values ('2025-01-02 00:01:00', 1);") | ||
| tdSql.execute(f"insert into ct3 values ('2025-01-02 00:01:00', 1);") | ||
|
|
||
| tdSql.execute(f"alter vtable {self.db}.vct4 alter column cint set {self.db}.ct2.cint") | ||
| time.sleep(5) | ||
| tdSql.execute(f"insert into ct2 values ('2025-01-05 10:01:10', 1);") | ||
|
|
||
| tdSql.execute(f"alter vtable {self.db}.vct4 alter column cint set null") | ||
| time.sleep(5) | ||
| tdSql.execute(f"insert into ct2 values ('2025-01-05 10:01:11', 1);") |
There was a problem hiding this comment.
The insert3 method contains multiple time.sleep(5) calls. These hardcoded sleeps can make tests slower and potentially flaky. Consider refactoring these waits into a helper method, for example _wait_for_stream_processing(seconds=5). This would improve readability by making the purpose of the wait explicit and centralize the wait duration, making it easier to adjust if needed. A more robust solution would be to use a polling mechanism to check for the expected state instead of a fixed sleep time.
Description
Issue(s)
Checklist
Please check the items in the checklist if applicable.