@@ -8,7 +8,7 @@ def test_query_with_stream(test_environment, test_stream):
8
8
# Test the Query object with stream
9
9
query = (
10
10
Query (env = test_environment )
11
- .sql (query = f"SELECT * FROM test_stream" )
11
+ .sql (query = f"SELECT * FROM test_stream where _tp_time > earliest_ts() " )
12
12
.create ()
13
13
)
14
14
@@ -22,6 +22,19 @@ def test_query_with_stream(test_environment, test_stream):
22
22
assert "sql" in metadata
23
23
assert "id" in metadata
24
24
25
+ limit = 1
26
+ count = 0
27
+
28
+ results = []
29
+ for event in query .result ():
30
+ if event .event == "message" :
31
+ results .extend (json .loads (event .data ))
32
+ count += 1
33
+ if count >= limit :
34
+ break
35
+
36
+ assert len (results ) == 4
37
+
25
38
query .cancel ()
26
39
# Without query.cancel(), there will be an error:
27
40
# OSError: [Errno 9] Bad file descriptor And the query cannot be deleted
@@ -32,30 +45,17 @@ def test_query_with_stream(test_environment, test_stream):
32
45
33
46
34
47
def test_query_with_table (test_environment , test_stream ):
35
- #Test the Query Object with history table
36
- data = [["time" , "data" ], [[1 , "efgh" ]]]
37
- test_stream .ingest (* data )
38
-
39
48
time .sleep (3 )
40
- # Without sleep(3), there will be an error:
41
- # FAILED test_query.py::test_query_with_table - KeyError: 'id'
42
-
49
+
43
50
query = (
44
51
Query (env = test_environment )
45
52
.sql (query = "SELECT * FROM table(test_stream)" )
46
53
.create ()
47
54
)
48
55
49
- limit = 2
50
- count = 0
51
-
52
56
results = []
53
57
for event in query .result ():
54
58
if event .event == "message" :
55
59
results .extend (json .loads (event .data ))
56
- count += 1
57
- if count >= limit :
58
- break
59
- print (results )
60
- assert len (results ) == 2
60
+ assert len (results ) == 4
61
61
query .delete ()
0 commit comments