Skip to content

Commit 5dee73c

Browse files
authored
test(http): add oauth test with header (#3998)
## Description This PR adds verification tests for the OAuth2 client credentials flow in REST sink and HTTP client, specifically validating the support for custom headers (e.g. `Content-Type: application/x-www-form-urlencoded`) which is required for Azure Event Hubs and other providers. ## Type of change - [x] Test cases --------- Signed-off-by: Jiyong Huang <[email protected]>
1 parent 2507eb1 commit 5dee73c

File tree

2 files changed

+187
-0
lines changed

2 files changed

+187
-0
lines changed

internal/io/http/client_test.go

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ package http
1717
import (
1818
"encoding/json"
1919
"fmt"
20+
"io"
21+
"net/http"
22+
"net/http/httptest"
2023
"testing"
2124

2225
"github.com/stretchr/testify/require"
@@ -204,3 +207,95 @@ func TestClientAuth(t *testing.T) {
204207
require.NoError(t, c.auth(ctx))
205208
require.NoError(t, c.refresh(ctx))
206209
}
210+
211+
func TestOAuthClientCredentials(t *testing.T) {
212+
// 1. Create a mock OAuth server
213+
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
214+
if r.URL.Path == "/token" {
215+
// Verify Header
216+
contentType := r.Header.Get("Content-Type")
217+
if contentType != "application/x-www-form-urlencoded" {
218+
w.WriteHeader(http.StatusBadRequest)
219+
w.Write([]byte(fmt.Sprintf("Invalid Content-Type: %s", contentType)))
220+
return
221+
}
222+
223+
// Verify Body
224+
body, err := io.ReadAll(r.Body)
225+
if err != nil {
226+
w.WriteHeader(http.StatusInternalServerError)
227+
return
228+
}
229+
bodyStr := string(body)
230+
expectedBody := "grant_type=client_credentials&client_id=client_id&client_secret=client_secret&scope=test"
231+
if bodyStr != expectedBody {
232+
w.WriteHeader(http.StatusBadRequest)
233+
w.Write([]byte(fmt.Sprintf("Invalid Body: %s", bodyStr)))
234+
return
235+
}
236+
237+
// Return Token
238+
w.Header().Set("Content-Type", "application/json")
239+
json.NewEncoder(w).Encode(map[string]interface{}{
240+
"access_token": "mock_access_token",
241+
"expires_in": 3600,
242+
})
243+
return
244+
}
245+
246+
// Verify Protected Resource Access
247+
if r.URL.Path == "/data" {
248+
authHeader := r.Header.Get("Authorization")
249+
if authHeader != "Bearer mock_access_token" {
250+
w.WriteHeader(http.StatusUnauthorized)
251+
return
252+
}
253+
w.WriteHeader(http.StatusOK)
254+
w.Write([]byte(`{"status":"ok"}`))
255+
return
256+
}
257+
258+
w.WriteHeader(http.StatusNotFound)
259+
}))
260+
defer ts.Close()
261+
262+
// 2. Configure Client with OAuth
263+
ctx := mockContext.NewMockContext("rule1", "op1")
264+
c := &ClientConf{}
265+
266+
// Simulation of user configuration
267+
props := map[string]interface{}{
268+
"url": ts.URL + "/data",
269+
"method": "POST",
270+
"headers": map[string]interface{}{
271+
"Authorization": "Bearer {{.access_token}}",
272+
},
273+
"oauth": map[string]interface{}{
274+
"access": map[string]interface{}{
275+
"url": ts.URL + "/token",
276+
// Manually constructed body for client credentials
277+
"body": "grant_type=client_credentials&client_id=client_id&client_secret=client_secret&scope=test",
278+
// WORKAROUND: Explicitly set Content-Type header
279+
"headers": map[string]interface{}{
280+
"Content-Type": "application/x-www-form-urlencoded",
281+
},
282+
"expire": "3600",
283+
},
284+
},
285+
}
286+
287+
err := c.InitConf(ctx, "", props)
288+
require.NoError(t, err)
289+
290+
// 3. Connect (Triggers Auth)
291+
// This is where the auth flow happens. If it fails (e.g. wrong content type), this should error.
292+
err = c.Conn(ctx)
293+
require.NoError(t, err, "Connection failed, likely due to auth failure")
294+
295+
// 4. Send Data (Verifies Token Usage)
296+
data, _ := json.Marshal(map[string]interface{}{"data": 123})
297+
resp, err := c.Send(ctx, "json", "POST", c.config.Url, c.parsedHeaders, nil, "", data)
298+
require.NoError(t, err)
299+
defer resp.Body.Close()
300+
require.Equal(t, http.StatusOK, resp.StatusCode)
301+
}

internal/io/http/rest_sink_test.go

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -430,3 +430,95 @@ func TestRestSinkAuth(t *testing.T) {
430430
})
431431
}
432432
}
433+
434+
func TestRestSinkOAuthClientCredentials(t *testing.T) {
435+
// 1. Create a mock OAuth server
436+
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
437+
if r.URL.Path == "/token" {
438+
// Verify Header
439+
contentType := r.Header.Get("Content-Type")
440+
if contentType != "application/x-www-form-urlencoded" {
441+
w.WriteHeader(http.StatusBadRequest)
442+
w.Write([]byte(fmt.Sprintf("Invalid Content-Type: %s", contentType)))
443+
return
444+
}
445+
446+
// Verify Body
447+
body, err := io.ReadAll(r.Body)
448+
if err != nil {
449+
w.WriteHeader(http.StatusInternalServerError)
450+
return
451+
}
452+
bodyStr := string(body)
453+
expectedBody := "grant_type=client_credentials&client_id=client_id&client_secret=client_secret&scope=test"
454+
if bodyStr != expectedBody {
455+
w.WriteHeader(http.StatusBadRequest)
456+
w.Write([]byte(fmt.Sprintf("Invalid Body: %s", bodyStr)))
457+
return
458+
}
459+
460+
// Return Token
461+
w.Header().Set("Content-Type", "application/json")
462+
json.NewEncoder(w).Encode(map[string]interface{}{
463+
"access_token": "mock_access_token",
464+
"expires_in": 3600,
465+
})
466+
return
467+
}
468+
469+
// Verify Protected Resource Access
470+
if r.URL.Path == "/data" {
471+
authHeader := r.Header.Get("Authorization")
472+
if authHeader != "Bearer mock_access_token" {
473+
w.WriteHeader(http.StatusUnauthorized)
474+
return
475+
}
476+
w.WriteHeader(http.StatusOK)
477+
w.Write([]byte(`{"status":"ok"}`))
478+
return
479+
}
480+
481+
w.WriteHeader(http.StatusNotFound)
482+
}))
483+
defer ts.Close()
484+
485+
// 2. Configure Rest Sink with OAuth Workaround
486+
ctx := mockContext.NewMockContext("ruleRestOAuth", "op")
487+
s := &RestSink{}
488+
err := s.Provision(ctx, map[string]interface{}{
489+
"url": ts.URL + "/data",
490+
"method": "POST",
491+
"headers": map[string]interface{}{
492+
"Authorization": "Bearer {{.access_token}}",
493+
},
494+
"oauth": map[string]interface{}{
495+
"access": map[string]interface{}{
496+
"url": ts.URL + "/token",
497+
// Manually constructed body for client credentials
498+
"body": "grant_type=client_credentials&client_id=client_id&client_secret=client_secret&scope=test",
499+
// WORKAROUND: Explicitly set Content-Type header
500+
"headers": map[string]interface{}{
501+
"Content-Type": "application/x-www-form-urlencoded",
502+
},
503+
"expire": "3600",
504+
},
505+
},
506+
})
507+
require.NoError(t, err)
508+
509+
// 3. Connect (Triggers Auth via Conn())
510+
err = s.Connect(ctx, func(status string, message string) {
511+
// do nothing
512+
})
513+
require.NoError(t, err, "RestSink Connect failed, likely OAuth auth failed")
514+
515+
// 4. Collect (Send Data verifying token)
516+
data := &xsql.RawTuple{
517+
Rawdata: []byte(`{"data":123}`),
518+
}
519+
err = s.Collect(ctx, data)
520+
require.NoError(t, err)
521+
522+
err = s.Close(ctx)
523+
require.NoError(t, err)
524+
}

0 commit comments

Comments
 (0)