Skip to content

Commit e2b3d4b

Browse files
panawalaweilong.pwl
andauthored
add spl consume processor (#348)
Co-authored-by: weilong.pwl <[email protected]>
1 parent af158f8 commit e2b3d4b

File tree

7 files changed

+37
-6
lines changed

7 files changed

+37
-6
lines changed

consumer/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ LogHubConfig是提供给用户的配置类,用于配置消费策略,您可
6666
|AutoCommitDisabled|是否禁用sdk自动提交checkpoint|非必填,默认不会禁用|
6767
|AutoCommitIntervalInMS|自动提交checkpoint的时间间隔|非必填,单位为MS,默认时间为60s|
6868
|Query|过滤规则 基于规则消费时必须设置对应规则 如 *| where a = 'xxx'|非必填|
69+
|Processor|消费处理器 基于规则消费时设置对应的消费处理标识 如 consume-processor-1, 推荐使用Processor代替Query|非必填|
6970

7071

7172
**自定义 logger**

consumer/config.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ type LogHubConfig struct {
1616
//:param Project:
1717
//:param Logstore:
1818
//:param Query: Filter rules Corresponding rules must be set when consuming based on rules, such as *| where a = 'xxx'
19+
//:param Processor: name of consume processor, default is empty
1920
//:param ConsumerGroupName:
2021
//:param ConsumerName:
2122
//:param CursorPosition: This options is used for initialization, will be ignored once consumer group is created and each shard has beeen started to be consumed.
@@ -60,6 +61,7 @@ type LogHubConfig struct {
6061
Project string
6162
Logstore string
6263
Query string
64+
Processor string
6365
ConsumerGroupName string
6466
ConsumerName string
6567
CursorPosition string

consumer/consumer_client.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,7 @@ func (consumer *ConsumerClient) pullLogs(shardId int, cursor string) (gl *sls.Lo
148148
Query: consumer.option.Query,
149149
LogGroupMaxCount: consumer.option.MaxFetchLogGroupCount,
150150
CompressType: consumer.option.CompressType,
151+
Processor: consumer.option.Processor,
151152
}
152153
for retry := 0; retry < 3; retry++ {
153154
gl, plm, err = consumer.client.PullLogsWithQuery(plr)

consumer/shard_worker.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ func (c *ShardConsumerWorker) sleepUtilNextFetch(lastFetchSuccessTime time.Time,
197197

198198
lastFetchRawSize := plm.RawSize
199199
lastFetchGroupCount := plm.Count
200-
if c.client.option.Query != "" {
200+
if c.client.option.Query != "" || c.client.option.Processor != "" {
201201
lastFetchRawSize = plm.RawSizeBeforeQuery
202202
lastFetchGroupCount = plm.DataCountBeforeQuery
203203
}

consumer/worker_test.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,28 @@ func TestConsumerQueryNoData(t *testing.T) {
8383

8484
}
8585

86+
func TestConsumerProcessor(t *testing.T) {
87+
option := LogHubConfig{
88+
Endpoint: os.Getenv("LOG_TEST_ENDPOINT"),
89+
CredentialsProvider: sls.NewStaticCredentialsProvider(
90+
os.Getenv("LOG_TEST_ACCESS_KEY_ID"),
91+
os.Getenv("LOG_TEST_ACCESS_KEY_SECRET"), ""),
92+
Project: os.Getenv("LOG_TEST_PROJECT"),
93+
Logstore: os.Getenv("LOG_TEST_LOGSTORE"),
94+
ConsumerGroupName: "test-consumer",
95+
ConsumerName: "test-consumer-1",
96+
CursorPosition: END_CURSOR,
97+
Processor: "consume-processor-1753854227-0001",
98+
}
99+
100+
worker := InitConsumerWorkerWithCheckpointTracker(option, process)
101+
102+
worker.Start()
103+
time.Sleep(time.Second * 2000)
104+
worker.StopAndWait()
105+
106+
}
107+
86108
func TestConsumerWithLogId(t *testing.T) {
87109
option := LogHubConfig{
88110
Endpoint: os.Getenv("LOG_TEST_ENDPOINT"),

example/consumer/query_demo/simple_demo_with_query.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,11 @@ func main() {
2828
CursorPosition: consumerLibrary.SPECIAL_TIMER_CURSOR,
2929
CursorStartTime: 1706077849,
3030
// Query is for log pre-handling before return to client, more info refer to https://www.alibabacloud.com/help/zh/sls/user-guide/rule-based-consumption
31-
Query: "* | where cast(body_bytes_sent as bigint) > 14000",
31+
// Query: "* | where cast(body_bytes_sent as bigint) > 14000",
32+
33+
// Processor is for rule based consumption, more info refer to https://www.alibabacloud.com/help/zh/sls/user-guide/rule-based-consumption
34+
// prefer to use processor instead of query, query will be deprecated in the future
35+
Processor: "consume-processor-1753854227-0001",
3236
}
3337

3438
consumerWorker := consumerLibrary.InitConsumerWorkerWithCheckpointTracker(option, process)

model.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ type PullLogRequest struct {
7373
Query string
7474
// Deprecated: PullMode is not used
7575
PullMode string
76-
QueryId string
76+
Processor string
7777
CompressType int
7878
}
7979

@@ -88,10 +88,11 @@ func (plr *PullLogRequest) ToURLParams() url.Values {
8888
if plr.Query != "" {
8989
urlVal.Add("query", plr.Query)
9090
urlVal.Add("pullMode", "scan_on_stream")
91-
if plr.QueryId != "" {
92-
urlVal.Add("queryId", plr.QueryId)
93-
}
9491
}
92+
if plr.Processor != "" {
93+
urlVal.Add("processor", plr.Processor)
94+
}
95+
9596
return urlVal
9697
}
9798

0 commit comments

Comments
 (0)