Skip to content

Commit ececef2

Browse files
committed
feat(io): rest support formdata
Signed-off-by: Jiyong Huang <huangjy@emqx.io>
1 parent ddfb03b commit ececef2

File tree

5 files changed

+179
-17
lines changed

5 files changed

+179
-17
lines changed

internal/io/http/client.go

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -66,14 +66,16 @@ type RefreshTokenConf struct {
6666
}
6767

6868
type RawConf struct {
69-
Url string `json:"url"`
70-
Method string `json:"method"`
71-
Body string `json:"body"`
72-
BodyType string `json:"bodyType"`
73-
Format string `json:"format"`
74-
Headers map[string]string `json:"headers"`
75-
Timeout cast.DurationConf `json:"timeout"`
76-
Incremental bool `json:"incremental"`
69+
Url string `json:"url"`
70+
Method string `json:"method"`
71+
Body string `json:"body"`
72+
BodyType string `json:"bodyType"`
73+
Format string `json:"format"`
74+
Headers map[string]string `json:"headers"`
75+
FormData map[string]string `json:"formData"`
76+
FileFieldName string `json:"fileFieldName"`
77+
Timeout cast.DurationConf `json:"timeout"`
78+
Incremental bool `json:"incremental"`
7779

7880
OAuth map[string]map[string]interface{} `json:"oauth"`
7981
SendSingle bool `json:"sendSingle"`
@@ -92,7 +94,7 @@ type bodyResp struct {
9294
Code int `json:"code"`
9395
}
9496

95-
var bodyTypeMap = map[string]string{"none": "", "text": "text/plain", "json": "application/json", "html": "text/html", "xml": "application/xml", "javascript": "application/javascript", "form": "", "binary": "application/octet-stream"}
97+
var bodyTypeMap = map[string]string{"none": "", "text": "text/plain", "json": "application/json", "html": "text/html", "xml": "application/xml", "javascript": "application/javascript", "form": "", "binary": "application/octet-stream", "formdata": "multipart/form-data"}
9698

9799
// newTransport allows EdgeX Foundry, protected by OpenZiti to override and obtain a transport
98100
// protected by OpenZiti's zero trust connectivity. See client_edgex.go where this function is
@@ -142,13 +144,13 @@ func (cc *ClientConf) InitConf(device string, props map[string]interface{}) erro
142144
if _, ok2 := bodyTypeMap[strings.ToLower(c.BodyType)]; ok2 {
143145
c.BodyType = strings.ToLower(c.BodyType)
144146
} else {
145-
return fmt.Errorf("Not valid body type value %v.", c.BodyType)
147+
return fmt.Errorf("Invalid body type value %v.", c.BodyType)
146148
}
147149
switch c.ResponseType {
148150
case "code", "body":
149151
// correct
150152
default:
151-
return fmt.Errorf("Not valid response type value %v.", c.ResponseType)
153+
return fmt.Errorf("Invalid response type value %v.", c.ResponseType)
152154
}
153155
err := httpx.IsHttpUrl(c.Url)
154156
if err != nil {

internal/io/http/rest_sink.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ func (r *RestSink) Collect(ctx api.StreamContext, item api.RawTuple) error {
6565
headers := r.config.Headers
6666
bodyType := r.config.BodyType
6767
method := r.config.Method
68+
formData := r.config.FormData
6869
u := r.config.Url
6970

7071
if dp, ok := item.(api.HasDynamicProps); ok {
@@ -93,6 +94,14 @@ func (r *RestSink) Collect(ctx api.StreamContext, item api.RawTuple) error {
9394
if ok {
9495
u = nu
9596
}
97+
if bodyType == "formdata" {
98+
for k, v := range formData {
99+
nv, ok := dp.DynamicProps(v)
100+
if ok {
101+
formData[k] = nv
102+
}
103+
}
104+
}
96105
}
97106

98107
switch r.config.Compression {
@@ -108,7 +117,7 @@ func (r *RestSink) Collect(ctx api.StreamContext, item api.RawTuple) error {
108117
headers["Content-Encoding"] = "gzip"
109118
}
110119

111-
resp, err := httpx.Send(ctx.GetLogger(), r.client, bodyType, method, u, headers, item.Raw())
120+
resp, err := httpx.SendWithFormData(ctx.GetLogger(), r.client, bodyType, method, u, headers, formData, r.config.FileFieldName, item.Raw())
112121
failpoint.Inject("recoverAbleErr", func() {
113122
err = errors.New("connection reset by peer")
114123
})
@@ -123,10 +132,10 @@ func (r *RestSink) Collect(ctx api.StreamContext, item api.RawTuple) error {
123132
method,
124133
u, string(item.Raw())))
125134
}
126-
return fmt.Errorf(`rest sink fails to send out the data:err=%s recoverAble=%v method=%s path="%s" request_body="%s"`,
135+
return fmt.Errorf(`rest sink fails to send out the data:err=%s recoverAble=%v method=%s path="%s"`,
127136
originErr.Error(),
128137
recoverAble,
129-
method, u, string(item.Raw()))
138+
method, u)
130139
} else {
131140
logger.Debugf("rest sink got response %v", resp)
132141
_, b, err := r.parseResponse(ctx, resp, "", r.config.DebugResp, false)

internal/io/http/rest_sink_test.go

Lines changed: 76 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright 2024 EMQ Technologies Co., Ltd.
1+
// Copyright 2024-2025 EMQ Technologies Co., Ltd.
22
//
33
// Licensed under the Apache License, Version 2.0 (the "License");
44
// you may not use this file except in compliance with the License.
@@ -291,3 +291,78 @@ func TestRestSinkRecoverErr(t *testing.T) {
291291
require.Error(t, err)
292292
require.True(t, errorx.IsIOError(err))
293293
}
294+
295+
func TestFormData(t *testing.T) {
296+
config := map[string]any{
297+
"method": "post",
298+
//"url": "http://localhost/test", //set dynamically to the test server
299+
"bodyType": "formdata",
300+
"sendSingle": true,
301+
"fileFieldName": "d",
302+
"formData": map[string]any{
303+
"tp": "1",
304+
},
305+
}
306+
data := []byte{0x1, 0x2, 0x3, 0x4, 0x5, 0x6}
307+
308+
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
309+
err := r.ParseMultipartForm(10 << 20)
310+
require.NoError(t, err)
311+
file, _, err := r.FormFile("d") // "file" 是表单字段名
312+
require.NoError(t, err)
313+
defer file.Close()
314+
dd, err := io.ReadAll(file)
315+
require.NoError(t, err)
316+
require.Equal(t, data, dd)
317+
require.Equal(t, "1", r.Form.Get("tp"))
318+
w.WriteHeader(http.StatusOK)
319+
w.Write([]byte("文件上传成功"))
320+
}))
321+
defer ts.Close()
322+
// send request and wait result
323+
s := &RestSink{}
324+
config["url"] = ts.URL
325+
ctx := mockContext.NewMockContext("testFormData", "op")
326+
e := s.Provision(ctx, config)
327+
assert.NoError(t, e)
328+
e = s.Connect(ctx, func(status string, message string) {
329+
// do nothing
330+
})
331+
assert.NoError(t, e)
332+
e = s.Collect(ctx, &xsql.RawTuple{
333+
Rawdata: data,
334+
})
335+
assert.NoError(t, e)
336+
err := s.Close(ctx)
337+
assert.NoError(t, err)
338+
}
339+
340+
func TestFormDataErr(t *testing.T) {
341+
config := map[string]any{
342+
"method": "post",
343+
"url": "http://localhost/test", //not exist
344+
"bodyType": "formdata",
345+
"sendSingle": true,
346+
"fileFieldName": "d",
347+
"formData": map[string]any{
348+
"tp": "1",
349+
},
350+
}
351+
data := []byte{0x1, 0x2, 0x3, 0x4, 0x5, 0x6}
352+
353+
// send request and wait result
354+
s := &RestSink{}
355+
ctx := mockContext.NewMockContext("testFormData", "op")
356+
e := s.Provision(ctx, config)
357+
assert.NoError(t, e)
358+
e = s.Connect(ctx, func(status string, message string) {
359+
// do nothing
360+
})
361+
assert.NoError(t, e)
362+
e = s.Collect(ctx, &xsql.RawTuple{
363+
Rawdata: data,
364+
})
365+
assert.NotNil(t, e)
366+
err := s.Close(ctx)
367+
assert.NoError(t, err)
368+
}

internal/pkg/httpx/http.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,22 +20,29 @@ import (
2020
"errors"
2121
"fmt"
2222
"io"
23+
"mime/multipart"
2324
"net/http"
2425
"net/url"
2526
"os"
27+
"strconv"
2628
"strings"
2729
"time"
2830

2931
"github.com/lf-edge/ekuiper/contract/v2/api"
3032
"github.com/pingcap/failpoint"
3133

3234
"github.com/lf-edge/ekuiper/v2/internal/conf"
35+
"github.com/lf-edge/ekuiper/v2/pkg/timex"
3336
)
3437

3538
var BodyTypeMap = map[string]string{"none": "", "text": "text/plain", "json": "application/json", "html": "text/html", "xml": "application/xml", "javascript": "application/javascript", "form": "application/x-www-form-urlencoded;param=value"}
3639

3740
// Send v must be a []byte or map
3841
func Send(logger api.Logger, client *http.Client, bodyType string, method string, u string, headers map[string]string, v any) (*http.Response, error) {
42+
return SendWithFormData(logger, client, bodyType, method, u, headers, nil, "", v)
43+
}
44+
45+
func SendWithFormData(logger api.Logger, client *http.Client, bodyType string, method string, u string, headers map[string]string, formData map[string]string, formFieldName string, v any) (*http.Response, error) {
3946
var req *http.Request
4047
var err error
4148
switch bodyType {
@@ -69,6 +76,41 @@ func Send(logger api.Logger, client *http.Client, bodyType string, method string
6976
if req.Header.Get("Content-Type") == "" {
7077
req.Header.Set("Content-Type", BodyTypeMap[bodyType])
7178
}
79+
case "formdata":
80+
var requestBody bytes.Buffer
81+
writer := multipart.NewWriter(&requestBody)
82+
fileField, err := writer.CreateFormFile(formFieldName, strconv.FormatInt(timex.GetNowInMilli(), 10))
83+
if err != nil {
84+
return nil, fmt.Errorf("fail to create file field: %v", err)
85+
}
86+
var payload io.Reader
87+
switch t := v.(type) {
88+
case []byte:
89+
payload = bytes.NewBuffer(t)
90+
case string:
91+
payload = bytes.NewBufferString(t)
92+
default:
93+
return nil, fmt.Errorf("http send only supports bytes but receive invalid content: %v", v)
94+
}
95+
_, err = io.Copy(fileField, payload)
96+
if err != nil {
97+
return nil, fmt.Errorf("fail to copy payload to file field: %v", err)
98+
}
99+
for k, v := range formData {
100+
err := writer.WriteField(k, v)
101+
if err != nil {
102+
logger.Errorf("fail write form data field %s: %v", k, err)
103+
}
104+
}
105+
err = writer.Close()
106+
if err != nil {
107+
logger.Errorf("fail to close writer: %v", err)
108+
}
109+
req, err = http.NewRequest(method, u, &requestBody)
110+
if err != nil {
111+
return nil, fmt.Errorf("fail to create request: %v", err)
112+
}
113+
req.Header.Set("Content-Type", writer.FormDataContentType())
72114
default:
73115
return nil, fmt.Errorf("unsupported body type %s", bodyType)
74116
}

internal/pkg/httpx/http_test.go

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright 2022 EMQ Technologies Co., Ltd.
1+
// Copyright 2022-2025 EMQ Technologies Co., Ltd.
22
//
33
// Licensed under the Apache License, Version 2.0 (the "License");
44
// you may not use this file except in compliance with the License.
@@ -14,7 +14,13 @@
1414

1515
package httpx
1616

17-
import "testing"
17+
import (
18+
"testing"
19+
20+
"github.com/stretchr/testify/require"
21+
22+
"github.com/lf-edge/ekuiper/v2/internal/conf"
23+
)
1824

1925
func TestIsUrl(t *testing.T) {
2026
urls := []string{
@@ -38,3 +44,31 @@ func TestIsUrl(t *testing.T) {
3844
}
3945
}
4046
}
47+
48+
func TestErr(t *testing.T) {
49+
tests := []struct {
50+
name string
51+
u string
52+
data any
53+
err string
54+
}{
55+
{
56+
name: "wrong data",
57+
u: "http://noexist.org",
58+
data: 45,
59+
err: "http send only supports bytes but receive invalid content: 45",
60+
},
61+
{
62+
name: "wrong url",
63+
u: "\\\abc",
64+
data: "test",
65+
err: "fail to create request: parse \"\\\\\\abc\": net/url: invalid control character in URL",
66+
},
67+
}
68+
for _, test := range tests {
69+
t.Run(test.name, func(t *testing.T) {
70+
_, err := SendWithFormData(conf.Log, nil, "formdata", "POST", test.u, nil, nil, "", test.data)
71+
require.EqualError(t, err, test.err)
72+
})
73+
}
74+
}

0 commit comments

Comments
 (0)