Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## v2.8.9 - 2026-03-05

### Features:

- Data subscription supports token authentication

## v2.8.6 - 2025-10-27

### Features:
Expand Down
6 changes: 3 additions & 3 deletions ci/release-ws.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,6 @@ sed "1,9d" CHANGELOG.md >> CHANGELOG.md2
mv CHANGELOG.md2 CHANGELOG.md

# git commit -a -m "release(taos-ws-py): v$newv"
#git push
#git tag taos-ws-py-v$newv
#git push --force origin taos-ws-py-v$newv:taos-ws-py-v$newv
# git push
# git tag taos-ws-py-v$newv
# git push --force origin taos-ws-py-v$newv:taos-ws-py-v$newv
10 changes: 5 additions & 5 deletions ci/release.sh
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ $ci/changelog-generate.sh >> CHANGELOG.md2
sed "1,9d" CHANGELOG.md >> CHANGELOG.md2
mv CHANGELOG.md2 CHANGELOG.md

git commit -a -m "release: v$newv"
git push
git tag v$newv
git push --force origin v$newv:v$newv
#bash build_doc.sh
# git commit -a -m "release: v$newv"
# git push
# git tag v$newv
# git push --force origin v$newv:v$newv
# bash build_doc.sh
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "taospy"
version = "2.8.6"
version = "2.8.9"
description = "The official TDengine Python connector"
authors = ["Taosdata Inc. <support@taosdata.com>"]
license = "MIT"
Expand Down
6 changes: 6 additions & 0 deletions taos-ws-py/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file.
The format is based on [Conventional Changelog](https://www.conventionalcommits.org/en/v1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## v0.6.6 - 2026-03-05

### Features:

- Data subscription supports token authentication

## v0.6.5 - 2025-12-31

### Features:
Expand Down
24 changes: 12 additions & 12 deletions taos-ws-py/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion taos-ws-py/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "taos-ws-py"
version = "0.6.5"
version = "0.6.6"
edition = "2021"
publish = false
license = "MIT"
Expand Down
11 changes: 11 additions & 0 deletions taos-ws-py/src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ impl Consumer {
}

let mut tz = dsn_cfg.get("timezone").cloned();
if let Some(token) = dsn_cfg.remove("td.connect.bearer_token") {
dsn_cfg.set("bearer_token", token);
}

if let Some(args) = conf {
if let Some(scheme) = args
Expand Down Expand Up @@ -105,6 +108,12 @@ impl Consumer {
if let Some(value) = args.get_item("td.connect.token").or(args.get_item("token")) {
dsn_cfg.set("token", value.extract::<String>()?);
}
if let Some(value) = args
.get_item("td.connect.bearer_token")
.or(args.get_item("bearer_token"))
{
dsn_cfg.set("bearer_token", value.extract::<String>()?);
}
if let Some(value) = args.get_item("timezone") {
tz = Some(value.extract::<String>()?);
}
Expand Down Expand Up @@ -135,6 +144,8 @@ impl Consumer {
"td.connect.user",
"td.connect.pass",
"group.id",
"bearer_token",
"td.connect.bearer_token",
];

for (key, value) in args {
Expand Down
126 changes: 124 additions & 2 deletions taos-ws-py/tests/test_tmq.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
import pytest
import taosws
import time
import utils
from taosws import Consumer


Expand Down Expand Up @@ -55,7 +58,6 @@ def setup():
"alter table `table` drop column new1",
]
for statement in statements:
# print(statement)
cursor.execute(statement)


Expand All @@ -69,7 +71,6 @@ def test_tmq():
"min.poll.rows": "129",
}
consumer = Consumer(conf)

consumer.subscribe(["ws_tmq_meta"])

while 1:
Expand All @@ -95,6 +96,127 @@ def test_tmq():
consumer.unsubscribe()


@pytest.mark.skipif(not utils.TEST_TD_ENTERPRISE, reason="only for TDengine Enterprise")
def test_tmq_with_token():
conn = taosws.connect("ws://localhost:6041")
try:
conn.execute("drop token if exists token_1772607422")
conn.execute("drop topic if exists topic_1772607422")
conn.execute("drop database if exists test_1772607422")
conn.execute("create database test_1772607422")
conn.execute("create topic topic_1772607422 as database test_1772607422")

rs = conn.query(f"create token token_1772607422 from user {utils.test_username()}")
token = next(iter(rs))[0]

consumer = Consumer(conf={
"td.connect.websocket.scheme": "ws",
"td.connect.ip": "localhost",
"td.connect.port": 6041,
"td.connect.user": "invalid_user",
"td.connect.pass": "invalid_pass",
"td.connect.bearer_token": token,
"group.id": "1001",
"client.id": "1001",
})
consumer.subscribe(["topic_1772607422"])
consumer.unsubscribe()

consumer2 = Consumer(conf={
"td.connect.websocket.scheme": "ws",
"td.connect.ip": "localhost",
"td.connect.port": 6041,
"td.connect.user": "invalid_user",
"td.connect.pass": "invalid_pass",
"bearer_token": token,
"group.id": "1001",
"client.id": "1001",
})
consumer2.subscribe(["topic_1772607422"])
consumer2.unsubscribe()

consumer3 = Consumer(conf={
"td.connect.websocket.scheme": "ws",
"td.connect.ip": "localhost",
"td.connect.port": 6041,
"td.connect.user": "invalid_user",
"td.connect.pass": "invalid_pass",
"td.connect.bearer_token": token,
"bearer_token": "invalid_token",
"group.id": "1001",
"client.id": "1001",
})
consumer3.subscribe(["topic_1772607422"])
consumer3.unsubscribe()

consumer4 = Consumer(dsn = f"ws://invalid_user:invalid_pass@localhost:6041?group.id=1001&client.id=1001&td.connect.bearer_token={token}")
consumer4.subscribe(["topic_1772607422"])
consumer4.unsubscribe()

consumer5 = Consumer(dsn = f"ws://invalid_user:invalid_pass@localhost:6041?group.id=1001&client.id=1001&bearer_token={token}")
consumer5.subscribe(["topic_1772607422"])
consumer5.unsubscribe()

consumer6 = Consumer(dsn = f"ws://invalid_user:invalid_pass@localhost:6041?group.id=1001&client.id=1001&td.connect.bearer_token={token}&bearer_token=invalid_token")
consumer6.subscribe(["topic_1772607422"])
consumer6.unsubscribe()
finally:
time.sleep(3)
conn.execute("drop token if exists token_1772607422")
conn.execute("drop topic if exists topic_1772607422")
conn.execute("drop database if exists test_1772607422")
conn.close()


@pytest.mark.skipif(not utils.TEST_TD_ENTERPRISE, reason="only for TDengine Enterprise")
def test_tmq_with_invalid_token():
conn = taosws.connect("ws://localhost:6041")
try:
conn.execute("drop topic if exists topic_1772611547")
conn.execute("drop database if exists test_1772611547")
conn.execute("create database test_1772611547")
conn.execute("create topic topic_1772611547 as database test_1772611547")

consumer = Consumer(conf={
"td.connect.websocket.scheme": "ws",
"td.connect.ip": "localhost",
"td.connect.port": 6041,
"td.connect.user": "invalid_user",
"td.connect.pass": "invalid_pass",
"td.connect.bearer_token": "invalid_token",
"group.id": "1001",
"client.id": "1001",
})
with pytest.raises(Exception, match=r"init tscObj with token failed"):
consumer.subscribe(["topic_1772611547"])

consumer2 = Consumer(conf={
"td.connect.websocket.scheme": "ws",
"td.connect.ip": "localhost",
"td.connect.port": 6041,
"td.connect.user": "invalid_user",
"td.connect.pass": "invalid_pass",
"bearer_token": "invalid_token",
"group.id": "1001",
"client.id": "1001",
})
with pytest.raises(Exception, match=r"init tscObj with token failed"):
consumer2.subscribe(["topic_1772611547"])

consumer3 = Consumer(dsn = f"ws://invalid_user:invalid_pass@localhost:6041?group.id=1001&client.id=1001&td.connect.bearer_token=invalid_token")
with pytest.raises(Exception, match=r"init tscObj with token failed"):
consumer3.subscribe(["topic_1772611547"])

consumer4 = Consumer(dsn = f"ws://invalid_user:invalid_pass@localhost:6041?group.id=1001&client.id=1001&bearer_token=invalid_token")
with pytest.raises(Exception, match=r"init tscObj with token failed"):
consumer4.subscribe(["topic_1772611547"])
finally:
time.sleep(3)
conn.execute("drop topic if exists topic_1772611547")
conn.execute("drop database if exists test_1772611547")
conn.close()


def show_env():
import os

Expand Down
2 changes: 1 addition & 1 deletion taos/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "2.8.6"
__version__ = '2.8.9'
11 changes: 6 additions & 5 deletions taos/cinterface.py
Original file line number Diff line number Diff line change
Expand Up @@ -1955,13 +1955,14 @@ def tmq_conf_set_auto_commit_cb(conf, cb, param):
_UNSUPPORTED["tmq_consumer_new"] = err


def tmq_consumer_new(conf, errstrlen=0):
# type (c_void_p, c_char_p, c_int) -> c_void_p
def tmq_consumer_new(conf):
# type: (c_void_p) -> c_void_p
_check_if_supported()
buf = ctypes.create_string_buffer(errstrlen)
tmq = cast(_libtaos.tmq_consumer_new(conf, buf, errstrlen), c_void_p)
errstr = ctypes.create_string_buffer(256)
tmq = cast(_libtaos.tmq_consumer_new(conf, errstr, 255), c_void_p)
if tmq.value is None:
raise TmqError("failed on tmq_consumer_new()")
err = errstr.value.decode("utf-8", errors="replace")
raise TmqError(f"failed on tmq_consumer_new(), err: {err}")
return tmq


Expand Down
4 changes: 0 additions & 4 deletions taos/error.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,6 @@ def __init__(self, msg=None, errno=0xFFFF, affected_rows=0):
def __str__(self):
return self._full_msg + "(affected rows: %d)" % self.affected_rows

# @property
# def affected_rows(self):
# return self.affected_rows


class LinesError(DatabaseError):
"""taos_insert_lines errors."""
Expand Down
Loading
Loading