Skip to content

Commit 9f93e61

Browse files
authored
feat(io): support httppull state (#3804)
Signed-off-by: Song Gao <disxiaofei@163.com>
1 parent 76bf37e commit 9f93e61

File tree

5 files changed

+152
-5
lines changed

5 files changed

+152
-5
lines changed

docs/en_US/guide/sources/builtin/http_pull.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ default:
3838
headers:
3939
Accept: application/json
4040
# how to check the response status, by status code or by body
41+
states:
42+
# state_key: state_value
4143
responseType: code
4244
# # Get token
4345
# oAuth:
@@ -82,6 +84,7 @@ Use can specify the global HTTP pull settings here. The configuration items spec
8284
- `body`: The body of request, such as `'{"data": "data", "method": 1}'`
8385
- `bodyType`: Body type, it could be none|text|json|html|xml|javascript|format.
8486
- `headers`: The HTTP request headers that you want to send along with the HTTP request.
87+
- `states`: The states of the http_pull source, given as key-value pairs. The values can be rendered into the request URL and are updated at runtime.
8588
- `responseType`: Define how to parse the HTTP response. There are two types defined:
8689
- `code`: To check the response status from the HTTP status code.
8790
- `body`: To check the response status from the response body. The body must be "application/json" content type and contains a "code" field.
@@ -131,6 +134,12 @@ The following configurations are designed under the assumption that the authenti
131134

132135
`incremental`: If it's set to `true`, then will compare with the last result; If the responses of two requests are the same, then will skip sending out the result.
133136

137+
#### State Update
138+
139+
States are values that are updated dynamically at runtime. When creating the http_pull source, you can specify the initial states with the `states` property. Currently, states can be rendered in the HTTP request URL using the [data template](../../sinks/data_template.md) syntax; for example, with the state `a: 1`, the URL `http://localhost/param?a={{.a}}` is requested as `http://localhost/param?a=1`.
140+
141+
States can also be updated from the http_pull results: when a result contains a field with the same name as a state key, that state is set to the field's value. When QoS is set to 1, the states are periodically flushed to disk and loaded again after the next boot.
142+
134143
#### Dynamic Properties
135144

136145
Dynamic properties adapt in real time and can be employed to customize the HTTP request's URL, body, and header. The format for these properties is based on the [data template](../../sinks/data_template.md) syntax.

docs/zh_CN/guide/sources/builtin/http_pull.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ default:
3838
headers:
3939
Accept: application/json
4040
# 如何检查响应状态,支持通过状态码或 body
41+
states:
42+
# state_key: state_value
4143
responseType: code
4244
# 获取 token
4345
# oAuth:
@@ -82,6 +84,7 @@ application_conf: #Conf_key
8284
- `body`:请求的正文,例如 `{"data": "data", "method": 1}`
8385
- `bodyType`:正文类型,可选值 none、text、json、html、xml、javascript、format.
8486
- `headers`:需要与 HTTP 请求一起发送的 HTTP 请求标头。
87+
- `states`: httppull 的参数状态,可以通过渲染的方式更新 url。
8588
- `responseType`:定义如何解析 HTTP 响应。目前支持两种方式:
8689
- `code`:通过 HTTP 响应码判断响应状态。
8790
- `body`:通过 HTTP 响应正文判断响应状态。要求响应正文为 JSON 格式且其中包含 code 字段。
@@ -130,6 +133,12 @@ OAuth 2.0 是一个授权协议,让 API 客户端有限度地访问网络服
130133

131134
`incremental`:如设置为 `true`,则将与上次的结果进行比较;如果两次请求的响应相同,则将跳过发送结果。
132135

136+
#### 状态更新
137+
138+
状态是指在运行时会动态更新的键值对,在创建 http_pull 源时可以通过 `states` 属性指定初始状态。状态目前可以渲染在 HTTP 请求的 URL 中,其语法基于[数据模板](../../sinks/data_template.md)格式。
139+
140+
状态还可以根据 http_pull 的返回结果进行更新:当结果中的字段与某个状态的键同名时,该状态会被更新为该字段的值。当 QoS 设置为 1 时,状态将会被定期落盘,并在下一次启动后加载。
141+
133142
#### 动态属性
134143

135144
动态属性是指在运行时会动态更新的属性。您可以使用动态属性来指定 HTTP 请求的 URL、正文和标头。其语法基于[数据模板](../../sinks/data_template.md)格式的动态属性。

internal/io/http/httppull_source.go

Lines changed: 48 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,28 @@ import (
2626
type HttpPullSource struct {
2727
*ClientConf
2828
lastMD5 string
29+
psc *pullSourceConfig
30+
}
31+
32+
func (hps *HttpPullSource) GetOffset() (any, error) {
33+
return hps.psc.States, nil
34+
}
35+
36+
func (hps *HttpPullSource) Rewind(offset any) error {
37+
m, ok := offset.(map[string]interface{})
38+
if ok {
39+
for k, v := range m {
40+
hps.psc.States[k] = v
41+
}
42+
}
43+
return nil
44+
}
45+
46+
func (hps *HttpPullSource) ResetOffset(input map[string]any) error {
47+
for k, v := range input {
48+
hps.psc.States[k] = v
49+
}
50+
return nil
2951
}
3052

3153
func (hps *HttpPullSource) Pull(ctx api.StreamContext, trigger time.Time, ingest api.TupleIngest, ingestError api.ErrorIngest) {
@@ -47,30 +69,32 @@ func (hps *HttpPullSource) Connect(ctx api.StreamContext, sch api.StatusChangeHa
4769
}
4870

4971
type pullSourceConfig struct {
50-
Path string `json:"datasource"`
72+
Path string `json:"datasource"`
73+
States map[string]any `json:"states"`
5174
}
5275

5376
func (hps *HttpPullSource) Provision(ctx api.StreamContext, configs map[string]any) error {
54-
pc := &pullSourceConfig{}
77+
pc := &pullSourceConfig{States: map[string]any{}}
5578
if err := cast.MapToStruct(configs, pc); err != nil {
5679
return err
5780
}
5881
if hps.ClientConf == nil {
5982
hps.ClientConf = &ClientConf{}
6083
}
84+
hps.psc = pc
6185
return hps.InitConf(ctx, pc.Path, configs)
6286
}
6387

6488
func (hps *HttpPullSource) doPull(ctx api.StreamContext) ([]map[string]any, error) {
65-
result, latestMD5, err := doPull(ctx, hps.ClientConf, hps.lastMD5)
89+
result, latestMD5, err := hps.doPullInternal(ctx, hps.ClientConf, hps.lastMD5)
6690
if err != nil {
6791
return nil, err
6892
}
6993
hps.lastMD5 = latestMD5
7094
return result, nil
7195
}
7296

73-
func doPull(ctx api.StreamContext, c *ClientConf, lastMD5 string) ([]map[string]any, string, error) {
97+
func (hps *HttpPullSource) doPullInternal(ctx api.StreamContext, c *ClientConf, lastMD5 string) ([]map[string]any, string, error) {
7498
headers, err := c.parseHeaders(ctx, c.tokens)
7599
if err != nil {
76100
return nil, "", err
@@ -79,7 +103,14 @@ func doPull(ctx api.StreamContext, c *ClientConf, lastMD5 string) ([]map[string]
79103
if err != nil {
80104
return nil, "", err
81105
}
82-
resp, err := httpx.Send(ctx.GetLogger(), c.client, c.config.BodyType, c.config.Method, c.config.Url, headers, []byte(newBody))
106+
newUrl := c.config.Url
107+
if len(hps.psc.States) > 0 {
108+
newUrl, err = ctx.ParseTemplate(c.config.Url, hps.psc.States)
109+
if err != nil {
110+
return nil, "", err
111+
}
112+
}
113+
resp, err := httpx.Send(ctx.GetLogger(), c.client, c.config.BodyType, c.config.Method, newUrl, headers, []byte(newBody))
83114
if err != nil {
84115
return nil, "", err
85116
}
@@ -88,9 +119,21 @@ func doPull(ctx api.StreamContext, c *ClientConf, lastMD5 string) ([]map[string]
88119
if err != nil {
89120
return nil, "", err
90121
}
122+
hps.updateState(results)
91123
return results, newMD5, nil
92124
}
93125

126+
func (hps *HttpPullSource) updateState(results []map[string]interface{}) {
127+
for _, r := range results {
128+
for k, v := range r {
129+
_, ok := hps.psc.States[k]
130+
if ok {
131+
hps.psc.States[k] = v
132+
}
133+
}
134+
}
135+
}
136+
94137
// GetSource returns a newly allocated, zero-valued HttpPullSource as an
// api.Source.
func GetSource() api.Source {
	src := new(HttpPullSource)
	return src
}

internal/io/http/httppull_source_test.go

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"encoding/json"
1919
"net/http"
2020
"net/http/httptest"
21+
"strconv"
2122
"testing"
2223
"time"
2324

@@ -90,6 +91,14 @@ func handleRefresh(w http.ResponseWriter, r *http.Request) {
9091
json.NewEncoder(w).Encode(resp)
9192
}
9293

94+
// handleParam serves the /param endpoint used by the state tests.
//
// It reads the integer query parameter "a" and answers with the JSON object
// {"a": a+1} and status 200. A missing or non-integer "a" is answered with
// 400 Bad Request, so a request whose URL template was not rendered fails
// visibly instead of being treated as a=0.
func handleParam(w http.ResponseWriter, r *http.Request) {
	v, err := strconv.ParseInt(r.URL.Query().Get("a"), 10, 64)
	if err != nil {
		http.Error(w, "invalid query parameter a: "+err.Error(), http.StatusBadRequest)
		return
	}
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusOK)
	// The status line has already been sent, so an encoding failure can no
	// longer be reported to the client; the error is dropped deliberately.
	_ = json.NewEncoder(w).Encode(map[string]any{"a": v + 1})
}
101+
93102
func createServer() *httptest.Server {
94103
router := http.NewServeMux()
95104
router.HandleFunc("/get", handleGet)
@@ -98,10 +107,65 @@ func createServer() *httptest.Server {
98107
router.HandleFunc("/codeErr", handleCodeErr)
99108
router.HandleFunc("/auth", handleAuth)
100109
router.HandleFunc("/refresh", handleRefresh)
110+
router.HandleFunc("/param", handleParam)
101111
server := httptest.NewServer(router)
102112
return server
103113
}
104114

115+
func TestHttpPullStateRewind(t *testing.T) {
116+
ctx := mockContext.NewMockContext("1", "2")
117+
require.NotNil(t, GetSource())
118+
source := &HttpPullSource{}
119+
require.NoError(t, source.Provision(ctx, map[string]any{
120+
"url": "http://www.mock.com",
121+
"datasource": "/dd",
122+
"method": "get",
123+
"states": map[string]interface{}{"a": 1},
124+
}))
125+
v, err := source.GetOffset()
126+
require.NoError(t, err)
127+
require.NotNil(t, v)
128+
require.NoError(t, source.Rewind(map[string]any{"a": 1}))
129+
require.NoError(t, source.ResetOffset(map[string]any{"a": 1}))
130+
}
131+
132+
func TestHttpPullStateSource(t *testing.T) {
133+
server := createServer()
134+
defer func() {
135+
server.Close()
136+
}()
137+
ctx := mockContext.NewMockContext("1", "2")
138+
source := &HttpPullSource{}
139+
require.NoError(t, source.Provision(ctx, map[string]any{
140+
"url": server.URL,
141+
"datasource": "/param?a={{.a}}",
142+
"method": "get",
143+
"states": map[string]interface{}{"a": 1},
144+
}))
145+
require.NoError(t, source.Connect(ctx, func(status string, message string) {
146+
// do nothing
147+
}))
148+
dataCh := make(chan any, 1)
149+
source.Pull(ctx, time.Now(), func(ctx api.StreamContext, data any, meta map[string]any, ts time.Time) {
150+
dataCh <- data
151+
}, func(ctx api.StreamContext, err error) {})
152+
require.Equal(t, []map[string]interface{}{
153+
{
154+
"a": float64(2),
155+
},
156+
}, <-dataCh)
157+
source.Pull(ctx, time.Now(), func(ctx api.StreamContext, data any, meta map[string]any, ts time.Time) {
158+
dataCh <- data
159+
}, func(ctx api.StreamContext, err error) {})
160+
require.Equal(t, []map[string]interface{}{
161+
{
162+
"a": float64(3),
163+
},
164+
}, <-dataCh)
165+
source.Close(ctx)
166+
close(dataCh)
167+
}
168+
105169
func TestHttpPullSource(t *testing.T) {
106170
server := createServer()
107171
defer func() {

internal/io/http/lookup_source.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package http
1717
import (
1818
"github.com/lf-edge/ekuiper/contract/v2/api"
1919

20+
"github.com/lf-edge/ekuiper/v2/internal/pkg/httpx"
2021
"github.com/lf-edge/ekuiper/v2/pkg/cast"
2122
)
2223

@@ -96,3 +97,24 @@ func GetLookUpSource() api.Source {
9697
}
9798

9899
var _ api.LookupSource = &HttpLookupSource{}
100+
101+
func doPull(ctx api.StreamContext, c *ClientConf, lastMD5 string) ([]map[string]any, string, error) {
102+
headers, err := c.parseHeaders(ctx, c.tokens)
103+
if err != nil {
104+
return nil, "", err
105+
}
106+
newBody, err := ctx.ParseTemplate(c.config.Body, c.tokens)
107+
if err != nil {
108+
return nil, "", err
109+
}
110+
resp, err := httpx.Send(ctx.GetLogger(), c.client, c.config.BodyType, c.config.Method, c.config.Url, headers, []byte(newBody))
111+
if err != nil {
112+
return nil, "", err
113+
}
114+
defer resp.Body.Close()
115+
results, newMD5, err := c.parseResponse(ctx, resp, lastMD5, true, false)
116+
if err != nil {
117+
return nil, "", err
118+
}
119+
return results, newMD5, nil
120+
}

0 commit comments

Comments
 (0)