Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit f6ba5df

Browse files
authoredJun 22, 2024··
Feature/create stream 4 cluster (#93)
* update stream creation parameters * update test
1 parent f5973ca commit f6ba5df

File tree

6 files changed

+74
-14
lines changed

6 files changed

+74
-14
lines changed
 

‎python/test/conftest.py

+8
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import pytest
66
import os
7+
import time
78
from timeplus.dbapi import connect
89

910
from timeplus import Environment
@@ -33,18 +34,25 @@ def test_stream(test_environment):
3334

3435
try:
3536
stream.delete()
37+
time.sleep(3)
3638
except Exception:
3739
pass
3840

41+
time.sleep(3)
42+
3943
# Create a new stream
4044
stream = (
4145
Stream(env=test_environment)
4246
.name(stream_name)
4347
.column("time", "integer")
4448
.column("data", "string")
49+
.replication_factor(3)
50+
.shards(3)
4551
.create()
4652
)
4753

54+
time.sleep(3)
55+
4856
value = [["time", "data"], [[0, "abcd"]]]
4957
stream.ingest(*value)
5058
# Provide the stream to the test

‎python/test/test_ingest_stream.py

+19-7
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,19 @@
55
from timeplus import Stream, Query
66
import datetime
77

8+
time_wait = 5
89

910
def test_ingest(test_environment, test_stream):
11+
# there is one row data in test stream already
12+
time.sleep(time_wait)
13+
# wait previous data ingested
1014
data = [["time", "data"], [[1, "efgh"]]]
1115
try:
1216
test_stream.ingest(*data)
1317
except Exception as e:
1418
pytest.fail(f"Ingest method failed with exception {e}")
1519

16-
time.sleep(3)
20+
time.sleep(time_wait)
1721

1822
query = (
1923
Query(env=test_environment)
@@ -26,7 +30,6 @@ def test_ingest(test_environment, test_stream):
2630
if event.event == "message":
2731
results.extend(json.loads(event.data))
2832
print(results)
29-
print(results[1][1])
3033

3134
assert len(results) > 1, "No data returned from the stream"
3235
assert results[1][0] == 1, "Returned time does not match the ingested integer"
@@ -42,8 +45,12 @@ def test_stream_ingest_lines(test_environment,test_stream):
4245
Stream(env=test_environment)
4346
.name("test_stream")
4447
.column("raw", "string")
48+
.replication_factor(3)
49+
.shards(3)
4550
.create()
4651
)
52+
time.sleep(time_wait)
53+
4754
payload = '{"time":1,"data":"abcd"}\n{"time":2,"data":"xyz"}'
4855

4956
# Ingest data in 'lines' format
@@ -52,7 +59,7 @@ def test_stream_ingest_lines(test_environment,test_stream):
5259
except Exception as e:
5360
pytest.fail(f"Ingest lines method failed with exception {e}")
5461

55-
time.sleep(3)
62+
time.sleep(time_wait)
5663

5764
query = (
5865
Query(env=test_environment)
@@ -85,16 +92,20 @@ def test_stream_ingest_raw(test_environment,test_stream):
8592
Stream(env=test_environment)
8693
.name("test_stream")
8794
.column("raw", "string")
95+
.replication_factor(3)
96+
.shards(3)
8897
.create()
8998
)
9099

100+
time.sleep(time_wait)
101+
91102
# Ingest data in 'raw' format
92103
try:
93104
stream.ingest(payload=payload, format="raw")
94105
except Exception as e:
95106
pytest.fail(f"Ingest raw method failed with exception {e}")
96107

97-
time.sleep(3)
108+
time.sleep(time_wait)
98109

99110
query = (
100111
Query(env=test_environment)
@@ -115,6 +126,9 @@ def test_stream_ingest_raw(test_environment,test_stream):
115126

116127

117128
def test_json_ingest(test_environment, test_stream):
129+
# there is one row data in test stream already
130+
time.sleep(time_wait)
131+
# wait previous data ingested
118132
payload = """
119133
{"time":2,"data":"hello"}
120134
{"time":1,"data":"world"}
@@ -126,7 +140,7 @@ def test_json_ingest(test_environment, test_stream):
126140
except Exception as e:
127141
pytest.fail(f"Ingest streaming method failed with exception {e}")
128142

129-
time.sleep(3)
143+
time.sleep(time_wait)
130144

131145
query = (
132146
Query(env=test_environment)
@@ -138,8 +152,6 @@ def test_json_ingest(test_environment, test_stream):
138152
if event.event == "message":
139153
results.extend(json.loads(event.data))
140154
print(results)
141-
print(results[0][0])
142-
print(results[0])
143155

144156
assert len(results) > 1, "No data returned from the stream"
145157
assert results[1][0] == 2, "Returned data does not match the ingested data"

‎python/test/test_sqlalchemy_query.py

+22-2
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import time
12
from sqlalchemy import text, select, MetaData, Table
23
from timeplus import View
34

@@ -58,11 +59,30 @@ def test_table_names(engine):
5859
assert "car_live_data" in tables
5960

6061

61-
def test_view_names(engine):
62+
def test_view_names(test_environment, engine):
63+
view_name = "example_mv"
64+
view = View(env=test_environment).name(view_name)
65+
66+
try:
67+
view.delete()
68+
except Exception:
69+
pass
70+
71+
view = (
72+
View(env=test_environment)
73+
.name(view_name)
74+
.query("select * from test_stream")
75+
.create()
76+
)
77+
78+
time.sleep(1)
79+
6280
with engine.connect() as conn:
6381
views = engine.dialect.get_view_names(conn)
6482
print(views)
65-
assert "car_info" in views
83+
assert "example_mv" in views
84+
85+
view.delete()
6686

6787

6888
def test_materialized_view_names(engine,test_environment,test_stream):

‎python/timeplus/dbapi.py

+11-4
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import itertools
44
from collections import namedtuple
55

6-
from timeplus import Environment, Query, Stream, View
6+
from timeplus import Environment, Query, Stream, View, ExternalStream
77
from timeplus.error import Error
88

99

@@ -151,20 +151,27 @@ def __exit__(self, *exc):
151151
# since some SQL is not exposed through API yet
152152
def _exist_table(self, name):
153153
stream = Stream(env=self.env).name(name)
154-
return stream.exist()
154+
external_stream = ExternalStream(env=self.env).name(name)
155+
return stream.exist() or external_stream.exist()
155156

156157
def _exist_view(self, name):
157158
view = View(env=self.env).name(name)
158159
return view.exist()
159160

160161
def _get_table(self, name):
161-
return Stream(env=self.env).name(name).get()
162+
try:
163+
return Stream(env=self.env).name(name).get()
164+
except:
165+
return ExternalStream(env=self.env).name(name).get()
162166

163167
def _get_view(self, name):
164168
return View(env=self.env).name(name).get()
165169

166170
def _list_table(self):
167-
return Stream(env=self.env).list()
171+
streams = Stream(env=self.env).list()
172+
estreams = ExternalStream(env=self.env).list()
173+
174+
return streams + estreams
168175

169176
def _list_view(self):
170177
return View(env=self.env).list()

‎python/timeplus/stream.py

+13
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ def __init__(self, env):
2929
self._description = None
3030
self._mode = "append"
3131
self._primary_key = None
32+
self._replication_factor = None
33+
self._shards = None
3234

3335
def name(self, stream_name):
3436
"""
@@ -93,6 +95,14 @@ def primary_key(self, primary_key):
9395
self._primary_key = primary_key
9496
return self
9597

98+
def replication_factor(self, replication_factor):
99+
self._replication_factor = replication_factor
100+
return self
101+
102+
def shards(self, shards):
103+
self._shards = shards
104+
return self
105+
96106
def create(self):
97107
"""
98108
Sends a request to the API to create the stream.
@@ -125,6 +135,9 @@ def create(self):
125135
body["description"] = self._description if self._description else None
126136
body["mode"] = self._mode if self._mode else None
127137
body["primary_key"] = self._primary_key if self._primary_key else None
138+
body["replication_factor"] = self._replication_factor if self._replication_factor else None
139+
body["shards"] = self._shards if self._shards else None
140+
128141

129142
try:
130143
self._metadata = self._api_instance.v1beta2_streams_post(body)

‎python/timeplus/version.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = "1.4.0" # noqa: F401
1+
__version__ = "1.4.1" # noqa: F401

0 commit comments

Comments
 (0)
Please sign in to comment.