Skip to content

Commit 3453eac

Browse files
authored
fix(stream): allow deleting corrupted streams (#3979)
## Summary Fix bug where DELETE stream/table fails if content cannot be unmarshalled. This occurs when migrating from v1.x where streams/tables use a different storage format. The fix changes `DropStream` to check key existence without parsing content, enabling deletion of corrupted entries. ## Problem When upgrading from eKuiper 1.x to 2.x, the SQLite KV database format changed: - v1.x: Plain SQL statement stored directly - v2.x: JSON wrapper with `streamType`, `streamKind`, `statement` fields The old `DropStream` implementation called `GetStream()` which unmarshalled the content before deleting. This prevented users from deleting corrupted/incompatible streams via the REST API. ## Solution Changed `DropStream` to check key existence using `db.Get()` without unmarshalling the JSON content. This allows deletion regardless of content format. ## Testing - All existing stream processor tests pass - Verified no breaking changes to error messages Closes #3894 --------- Signed-off-by: Jiyong Huang <huangjy@emqx.io>
1 parent b2953b4 commit 3453eac

File tree

4 files changed

+212
-10
lines changed

4 files changed

+212
-10
lines changed

docs/en_US/operation/migration.md

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
# Migrating from eKuiper 1.x to 2.x
2+
3+
This guide covers important breaking changes and migration steps when upgrading from eKuiper 1.x to 2.x.
4+
5+
## Breaking Changes
6+
7+
### SQLite Database Format
8+
9+
eKuiper 2.x uses a different storage format for streams and tables in the SQLite database (`sqliteKV.db`). This means:
10+
11+
- **Streams and tables created in 1.x cannot be read by 2.x**
12+
- Attempting to describe or use old streams results in: `error unmarshall <name>, the data in db may be corrupted`
13+
14+
#### Format Changes
15+
16+
| Resource | eKuiper 1.x | eKuiper 2.x |
17+
|----------|-------------|-------------|
18+
| Streams | Plain SQL statement | JSON with `streamType`, `streamKind`, `statement` |
19+
| Tables | Plain SQL statement | JSON with `streamType`, `streamKind`, `statement` |
20+
| Rules | JSON with `triggered` field | JSON without `triggered` field |
21+
22+
## Migration Options
23+
24+
### Option 1: Clean Installation (Recommended)
25+
26+
Start fresh by removing the old database:
27+
28+
```bash
29+
# Stop eKuiper
30+
docker stop ekuiper
31+
32+
# Remove old database
33+
rm -rf /kuiper/data/sqliteKV.db
34+
35+
# Start eKuiper 2.x (creates new database)
36+
docker start ekuiper
37+
38+
# Re-create all streams and rules via REST API or CLI
39+
```
40+
41+
### Option 2: Separate Database File
42+
43+
Keep the old database for rollback capability by using a different filename:
44+
45+
Edit `etc/kuiper.yaml` before upgrading:
46+
47+
```yaml
48+
store:
49+
sqlite:
50+
name: sqliteKV-v2.db
51+
```
52+
53+
### Option 3: Delete Corrupted Entries
54+
55+
If you've already upgraded and have corrupted entries, delete them via REST API:
56+
57+
```bash
58+
# Delete corrupted stream
59+
curl -X DELETE http://localhost:9081/streams/<stream_name>
60+
61+
# Delete corrupted table
62+
curl -X DELETE http://localhost:9081/tables/<table_name>
63+
64+
# Then recreate
65+
curl -X POST http://localhost:9081/streams \
66+
-d '{"sql": "CREATE STREAM my_stream () WITH (DATASOURCE=\"topic\", FORMAT=\"JSON\", TYPE=\"mqtt\")"}'
67+
```
68+
69+
### Option 4: Direct Database Manipulation
70+
71+
For bulk cleanup, use SQLite directly:
72+
73+
```bash
74+
# List all streams
75+
sqlite3 /kuiper/data/sqliteKV.db "SELECT key FROM stream;"
76+
77+
# Delete specific stream
78+
sqlite3 /kuiper/data/sqliteKV.db "DELETE FROM stream WHERE key = 'my_stream';"
79+
80+
# Restart eKuiper
81+
docker restart ekuiper
82+
```
83+
84+
## Additional Notes
85+
86+
- Fresh installations of eKuiper 2.x are not affected
87+
- Always backup your database before upgrading
88+
- Consider exporting your rule definitions using the REST API before upgrading:
89+
90+
```bash
91+
curl http://localhost:9081/data/export > backup.json
92+
```
93+
94+
## See Also
95+
96+
- [Installation Guide](../installation.md)
97+
- [REST API Reference](../api/restapi.md)

docs/zh_CN/operation/migration.md

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
# 从 eKuiper 1.x 迁移到 2.x
2+
3+
本指南介绍从 eKuiper 1.x 升级到 2.x 时的重要破坏性变更和迁移步骤。
4+
5+
## 破坏性变更
6+
7+
### SQLite 数据库格式
8+
9+
eKuiper 2.x 在 SQLite 数据库(`sqliteKV.db`)中使用了不同的存储格式。这意味着:
10+
11+
- **1.x 创建的流和表无法被 2.x 读取**
12+
- 尝试描述或使用旧的流会导致错误:`error unmarshall <name>, the data in db may be corrupted`
13+
14+
#### 格式变更
15+
16+
| 资源 | eKuiper 1.x | eKuiper 2.x |
17+
|------|-------------|-------------|
18+
|| 纯 SQL 语句 | 包含 `streamType`, `streamKind`, `statement` 的 JSON |
19+
|| 纯 SQL 语句 | 包含 `streamType`, `streamKind`, `statement` 的 JSON |
20+
| 规则 | 包含 `triggered` 字段的 JSON | 不包含 `triggered` 字段的 JSON |
21+
22+
## 迁移选项
23+
24+
### 选项 1:全新安装(推荐)
25+
26+
删除旧数据库,重新开始:
27+
28+
```bash
29+
# 停止 eKuiper
30+
docker stop ekuiper
31+
32+
# 删除旧数据库
33+
rm -rf /kuiper/data/sqliteKV.db
34+
35+
# 启动 eKuiper 2.x(创建新数据库)
36+
docker start ekuiper
37+
38+
# 通过 REST API 或 CLI 重新创建所有流和规则
39+
```
40+
41+
### 选项 2:使用独立数据库文件
42+
43+
使用不同的文件名以保留旧数据库用于回滚:
44+
45+
在升级前编辑 `etc/kuiper.yaml`
46+
47+
```yaml
48+
store:
49+
sqlite:
50+
name: sqliteKV-v2.db
51+
```
52+
53+
### 选项 3:删除损坏的条目
54+
55+
如果您已经升级并且有损坏的条目,可以通过 REST API 删除:
56+
57+
```bash
58+
# 删除损坏的流
59+
curl -X DELETE http://localhost:9081/streams/<stream_name>
60+
61+
# 删除损坏的表
62+
curl -X DELETE http://localhost:9081/tables/<table_name>
63+
64+
# 然后重新创建
65+
curl -X POST http://localhost:9081/streams \
66+
-d '{"sql": "CREATE STREAM my_stream () WITH (DATASOURCE=\"topic\", FORMAT=\"JSON\", TYPE=\"mqtt\")"}'
67+
```
68+
69+
### 选项 4:直接操作数据库
70+
71+
对于批量清理,可以直接使用 SQLite:
72+
73+
```bash
74+
# 列出所有流
75+
sqlite3 /kuiper/data/sqliteKV.db "SELECT key FROM stream;"
76+
77+
# 删除指定流
78+
sqlite3 /kuiper/data/sqliteKV.db "DELETE FROM stream WHERE key = 'my_stream';"
79+
80+
# 重启 eKuiper
81+
docker restart ekuiper
82+
```
83+
84+
## 其他说明
85+
86+
- 全新安装的 eKuiper 2.x 不受影响
87+
- 升级前请务必备份数据库
88+
- 建议在升级前使用 REST API 导出规则定义:
89+
90+
```bash
91+
curl http://localhost:9081/data/export > backup.json
92+
```
93+
94+
## 另请参阅
95+
96+
- [安装指南](../installation.md)
97+
- [REST API 参考](../api/restapi.md)

internal/processor/stream.go

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -619,18 +619,26 @@ func (p *StreamProcessor) DropStream(name string, st ast.StreamType) (r string,
619619
return "", err
620620
}
621621
}
622-
_, err = p.GetStream(name, st)
623-
if err != nil {
624-
return "", err
625-
}
626-
627-
err = p.db.Delete(name)
628-
if err != nil {
629-
// try delete from temp db
622+
// Check if the key exists without unmarshalling content
623+
// This allows deleting corrupted streams (e.g., from v1.x migration)
624+
var v string
625+
found, _ := p.db.Get(name, &v)
626+
if !found {
627+
found, _ = p.tempDb.Get(name, &v)
628+
if !found {
629+
return "", errorx.NewWithCode(errorx.NOT_FOUND, fmt.Sprintf("%s %s is not found", ast.StreamTypeMap[st], name))
630+
}
631+
// Delete from temp db
630632
err = p.tempDb.Delete(name)
631633
if err != nil {
632634
return "", err
633635
}
636+
} else {
637+
// Delete from main db
638+
err = p.db.Delete(name)
639+
if err != nil {
640+
return "", err
641+
}
634642
}
635643
streamSchema.RemoveStreamSchema(name)
636644
return fmt.Sprintf("%s %s is dropped.", cases.Title(language.Und).String(ast.StreamTypeMap[st]), name), nil

internal/processor/stream_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ func TestStreamCreateProcessor(t *testing.T) {
101101
},
102102
{
103103
s: `DROP STREAM topic1;`,
104-
err: "Drop stream fails: topic1 is not found.",
104+
err: "Drop stream fails: stream topic1 is not found.",
105105
},
106106
{
107107
s: "DROP STREAM `stream`;",
@@ -196,7 +196,7 @@ func TestTableProcessor(t *testing.T) {
196196
},
197197
{
198198
s: `DROP TABLE topic1;`,
199-
err: "Drop table fails: topic1 is not found.",
199+
err: "Drop table fails: table topic1 is not found.",
200200
},
201201
{
202202
s: "DROP TABLE `stream`;",

0 commit comments

Comments
 (0)