Skip to content

Commit 85d026f

Browse files
authored
feat(stream): support temp stream (#3940)
Signed-off-by: Jiyong Huang <huangjy@emqx.io>
1 parent 07b3642 commit 85d026f

File tree

21 files changed

+916
-39
lines changed

21 files changed

+916
-39
lines changed

docs/en_US/guide/streams/overview.md

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ Below is the list of data types supported.
5858
| TIMESTAMP | true | The field to represent the event's timestamp. If specified, the rule will run with event time. Otherwise, it will run with processing time. Please refer to [timestamp management](../../sqls/windows.md#timestamp-management) for details. |
5959
| TIMESTAMP_FORMAT | true | The default format to be used when converting string to or from datetime type. |
6060
| VERSION | true | Version of the stream, check [versioning](#versioning)|
61+
| TEMP | true | Whether the stream is temporary. Temporary streams are stored in memory only and will be lost when eKuiper restarts. Default is false. Check [Temporary Streams](#temporary-streams) for more details. |
6162

6263
**Example 1,**
6364

@@ -202,6 +203,28 @@ demoBin (
202203

203204
If "BINARY" format stream is defined as schemaless, a default field named `self` will be assigned for the binary payload.
204205

206+
## Temporary Streams
207+
208+
Temporary streams are in-memory streams that are not persisted to disk. They are useful for intermediate data processing
209+
or testing scenarios where persistence is not required. Temporary streams have the following characteristics:
210+
211+
- **In-memory storage**: Stream definitions are stored in memory only and will be lost when eKuiper restarts.
212+
- **Cannot be replaced**: Once created, temporary streams cannot be replaced using `REPLACE STREAM` statement.
213+
- **Usage restriction**: Temporary streams can only be used by temporary rules. Non-temporary rules cannot reference
214+
temporary streams.
215+
216+
### Creating a Temporary Stream
217+
218+
To create a temporary stream, set the `TEMP` option to `true`:
219+
220+
```sql
221+
CREATE
222+
STREAM temp_sensor (
223+
temperature FLOAT,
224+
humidity FLOAT
225+
) WITH (DATASOURCE="sensor/data", FORMAT="json", TEMP="true");
226+
```
227+
205228
## Versioning
206229

207230
The stream can have an optional **version** field to control updates. When you update a stream, the system compares the

docs/zh_CN/guide/streams/overview.md

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ CREATE STREAM
5858
| TIMESTAMP || 代表该事件时间戳的字段名。如果有设置,则使用此流的规则将采用事件时间;否则将采用处理时间。详情请看[时间戳管理](../../sqls/windows.md#时间戳管理)|
5959
| TIMESTAMP_FORMAT || 字符串和时间格式转换时使用的默认格式。 |
6060
| VERSION || 流的版本号,详情情况[版本控制](#版本控制)|
61+
| TEMP || 流是否为临时流。临时流仅存储在内存中,eKuiper 重启后会丢失。默认为 false。详情请参阅[临时流](#临时流)|
6162

6263
**示例1**
6364

@@ -196,6 +197,26 @@ demoBin (
196197

197198
如果 "BINARY" 格式流定义为 schemaless,数据将会解析到默认的名为 `self` 的字段。
198199

200+
## 临时流
201+
202+
临时流是仅存储在内存中的流,不会持久化到磁盘。它们适用于中间数据处理或测试场景,在这些场景中不需要持久化。临时流具有以下特性:
203+
204+
- **内存存储**: 流定义仅存储在内存中,eKuiper 重启后会丢失。
205+
- **不可替换**: 一旦创建,临时流不能使用 `REPLACE STREAM` 语句进行替换。
206+
- **使用限制**: 临时流只能被临时规则使用。非临时规则不能引用临时流。
207+
208+
### 创建临时流
209+
210+
要创建临时流,请将 `TEMP` 选项设置为 `true`:
211+
212+
```sql
213+
CREATE
214+
STREAM temp_sensor (
215+
temperature FLOAT,
216+
humidity FLOAT
217+
) WITH (DATASOURCE="sensor/data", FORMAT="json", TEMP="true");
218+
```
219+
199220
## 版本控制
200221

201222
Stream 可以包含一个可选的 **version** 字段,用于控制更新逻辑。当您更新一个流时,系统会将新的版本字符串与现有版本进行比较。只有当新版本在

fvt/rule_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,14 +119,15 @@ func (s *RuleTestSuite) TestRuleDisableBufferFullDiscard() {
119119
resp, err := client.CreateConf("sources/simulator/confKeys/sim1", conf)
120120
s.Require().NoError(err)
121121
s.Require().Equal(http.StatusOK, resp.StatusCode)
122-
streamSql := `{"sql": "create stream sim1() WITH (TYPE=\"simulator\", CONF_KEY=\"sim1\")"}`
122+
streamSql := `{"sql": "create stream sim1() WITH (TYPE=\"simulator\", CONF_KEY=\"sim1\", TEMP=\"true\")"}`
123123
resp, err = client.CreateStream(streamSql)
124124
s.Require().NoError(err)
125125
s.T().Log(GetResponseText(resp))
126126
s.Require().Equal(http.StatusCreated, resp.StatusCode)
127127
ruleSql := `{
128128
"id": "ruleSim1",
129129
"sql": "SELECT * FROM sim1",
130+
"temp":true,
130131
"actions": [
131132
{
132133
"memory":{
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
package memory
2+
3+
import (
4+
"fmt"
5+
"strings"
6+
"sync"
7+
8+
"github.com/lf-edge/ekuiper/v2/pkg/kv"
9+
)
10+
11+
type memoryKvStore struct {
12+
data map[string]string
13+
mu sync.RWMutex
14+
}
15+
16+
func NewMemoryKV() kv.KeyValue {
17+
return &memoryKvStore{
18+
data: make(map[string]string),
19+
}
20+
}
21+
22+
func (m *memoryKvStore) Open() error {
23+
return nil
24+
}
25+
26+
func (m *memoryKvStore) Close() error {
27+
return nil
28+
}
29+
30+
func (m *memoryKvStore) Set(key string, value interface{}) error {
31+
m.mu.Lock()
32+
defer m.mu.Unlock()
33+
if v, ok := value.(string); ok {
34+
m.data[key] = v
35+
return nil
36+
}
37+
return fmt.Errorf("value must be string")
38+
}
39+
40+
func (m *memoryKvStore) Setnx(key string, value interface{}) error {
41+
m.mu.Lock()
42+
defer m.mu.Unlock()
43+
if _, ok := m.data[key]; ok {
44+
return fmt.Errorf("key %s already exists", key)
45+
}
46+
if v, ok := value.(string); ok {
47+
m.data[key] = v
48+
return nil
49+
}
50+
return fmt.Errorf("value must be string")
51+
}
52+
53+
func (m *memoryKvStore) Get(key string, value interface{}) (bool, error) {
54+
m.mu.RLock()
55+
defer m.mu.RUnlock()
56+
if v, ok := m.data[key]; ok {
57+
if ptr, ok := value.(*string); ok && ptr != nil {
58+
*ptr = v
59+
}
60+
return true, nil
61+
}
62+
return false, nil
63+
}
64+
65+
func (m *memoryKvStore) Delete(key string) error {
66+
m.mu.Lock()
67+
defer m.mu.Unlock()
68+
delete(m.data, key)
69+
return nil
70+
}
71+
72+
func (m *memoryKvStore) Keys() ([]string, error) {
73+
m.mu.RLock()
74+
defer m.mu.RUnlock()
75+
keys := make([]string, 0, len(m.data))
76+
for k := range m.data {
77+
keys = append(keys, k)
78+
}
79+
return keys, nil
80+
}
81+
82+
func (m *memoryKvStore) All() (map[string]string, error) {
83+
m.mu.RLock()
84+
defer m.mu.RUnlock()
85+
// Return a copy to avoid race conditions if the caller modifies the map (though signature returns map[string]string, usually it's better to copy)
86+
// But matching the interface, let's just return a copy.
87+
result := make(map[string]string, len(m.data))
88+
for k, v := range m.data {
89+
result[k] = v
90+
}
91+
return result, nil
92+
}
93+
94+
func (m *memoryKvStore) Clean() error {
95+
m.mu.Lock()
96+
defer m.mu.Unlock()
97+
m.data = make(map[string]string)
98+
return nil
99+
}
100+
101+
func (m *memoryKvStore) Drop() error {
102+
return m.Clean()
103+
}
104+
105+
func (m *memoryKvStore) SetKeyedState(key string, value interface{}) error {
106+
return m.Set(key, value)
107+
}
108+
109+
func (m *memoryKvStore) GetKeyedState(key string) (interface{}, error) {
110+
m.mu.RLock()
111+
defer m.mu.RUnlock()
112+
if v, ok := m.data[key]; ok {
113+
return v, nil
114+
}
115+
return nil, nil
116+
}
117+
118+
func (m *memoryKvStore) GetByPrefix(prefix string) (map[string][]byte, error) {
119+
m.mu.RLock()
120+
defer m.mu.RUnlock()
121+
result := make(map[string][]byte)
122+
for k, v := range m.data {
123+
if strings.HasPrefix(k, prefix) {
124+
result[k] = []byte(v)
125+
}
126+
}
127+
return result, nil
128+
}

0 commit comments

Comments
 (0)