Skip to content

feat(go/adbc/driver/flightsql): Add OAuth Support to Flight Client #2651

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 28 commits into from
Apr 17, 2025
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
39da2c4
initial attempt. working with token exchange
xborder Mar 20, 2025
c10af80
add oauth support for flight client
xborder Mar 25, 2025
7758432
add oauth tests
xborder Mar 25, 2025
bb8a1f1
add missing configurations to token exchange flow
xborder Mar 25, 2025
10cd341
add missing configurations to client credentials
xborder Mar 25, 2025
775e779
revert changes to example_usage_test
xborder Mar 25, 2025
3f83c43
fix(go/adbc/driver/flightsql): tests linting
xborder Mar 25, 2025
e74fb32
fix(go/adbc/driver/flightsql): reset test suit db instead of server
xborder Mar 25, 2025
823e6b8
chore(go/adbc/driver/flightsql): refactor set auth header to a function
xborder Apr 7, 2025
a0ad006
chore(go/adbc/driver/flightsql): refactor flightsql_oauth implementation
xborder Apr 7, 2025
737e207
chore(go/adbc/driver/flightsql): small fixes
xborder Apr 7, 2025
9ff438c
chore(go/adbc/driver/flightsql): moved token to flightsq configuration
xborder Apr 7, 2025
f8ce583
chore(go/adbc/driver/flightsql): fix token key
xborder Apr 7, 2025
78fc5fc
chore(go/adbc/driver/flightsql): simplify setting token in SetOptions
xborder Apr 7, 2025
b1f4a4d
fix(go/adbc/driver/flightsql): throw error if triggering oauth but to…
xborder Apr 7, 2025
1fc80e7
test(go/adbc/driver/flightsql): test to fail oauth if token is set
xborder Apr 7, 2025
39693e2
chore(go/adbc): remove token as universal option
xborder Apr 8, 2025
2a84059
chore(go/adbc/driver/flightsql): replace oauth impl
xborder Apr 8, 2025
a2b2fe3
test(go/adbc/driver/flightsql): adapt OAuth tests with tls
xborder Apr 8, 2025
bded2f0
chore(go/adbc/driver/flightsql): replace TokenSource with PerRPCCrede…
xborder Apr 9, 2025
76fc327
chore(go/adbc/driver/flightsql): remove atoi
xborder Apr 9, 2025
4d763b9
docs(source/driver): document oauth options
xborder Apr 11, 2025
ee56375
chore(go/adbc/driver/flightsql): simplification
xborder Apr 11, 2025
e7dc917
chore(go/adbc/driver/flightsql): code improvements
xborder Apr 14, 2025
64b8143
fix(go/adbc/driver/flightsql): fix expected message
xborder Apr 14, 2025
9e98498
docs(docs/source/driver): typo
xborder Apr 16, 2025
9a665d0
docs(docs/source/driver): remove go only from docs
xborder Apr 17, 2025
c94f2ee
fix(go/adbc/driver/flightsql): small nits
xborder Apr 17, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go/adbc/adbc.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ const (
OptionKeyURI = "uri"
OptionKeyUsername = "username"
OptionKeyPassword = "password"
OptionKeyToken = "token"
)

type OptionIsolationLevel string
Expand Down
294 changes: 294 additions & 0 deletions go/adbc/driver/flightsql/flightsql_adbc_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"errors"
"fmt"
"net"
"net/http"
"net/http/httptest"
"net/textproto"
"os"
"strconv"
Expand Down Expand Up @@ -150,6 +152,10 @@ func TestGetObjects(t *testing.T) {
suite.Run(t, &GetObjectsTests{})
}

func TestOauth(t *testing.T) {
suite.Run(t, &OAuthTests{})
}

// ---- AuthN Tests --------------------

type AuthnTestServer struct {
Expand Down Expand Up @@ -249,6 +255,294 @@ func (suite *AuthnTests) TestBearerTokenUpdated() {
defer reader.Release()
}

type OAuthTests struct {
ServerBasedTests

oauthServer *httptest.Server
mockOAuthServer *MockOAuthServer
}

// MockOAuthServer simulates an OAuth 2.0 server for testing
type MockOAuthServer struct {
// Track calls to validate server behavior
clientCredentialsCalls int
tokenExchangeCalls int
}

func (m *MockOAuthServer) handleTokenRequest(w http.ResponseWriter, r *http.Request) {
// Parse the form to get the request parameters
if err := r.ParseForm(); err != nil {
http.Error(w, "Invalid request", http.StatusBadRequest)
return
}

grantType := r.FormValue("grant_type")

switch grantType {
case "client_credentials":
m.clientCredentialsCalls++
// Validate client credentials
clientID := r.FormValue("client_id")
clientSecret := r.FormValue("client_secret")

if clientID == "test-client" && clientSecret == "test-secret" {
// Return a valid token response
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{
"access_token": "test-client-token",
"token_type": "bearer",
"expires_in": 3600
}`))

return
}

case "urn:ietf:params:oauth:grant-type:token-exchange":
m.tokenExchangeCalls++
// Validate token exchange parameters
subjectToken := r.FormValue("subject_token")
subjectTokenType := r.FormValue("subject_token_type")

if subjectToken == "test-subject-token" &&
subjectTokenType == "urn:ietf:params:oauth:token-type:jwt" {
// Return a valid token response
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{
"access_token": "test-exchanged-token",
"token_type": "bearer",
"expires_in": 3600
}`))
return
}
}

// Default: return error for invalid request
http.Error(w, "Invalid request", http.StatusBadRequest)
}

func oauthTestUnary(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil, status.Error(codes.InvalidArgument, "Could not get metadata")
}
auth := md.Get("authorization")
if len(auth) == 0 {
return nil, status.Error(codes.Unauthenticated, "No token")
} else if auth[0] != "Bearer test-exchanged-token" && auth[0] != "Bearer test-client-token" {
return nil, status.Error(codes.Unauthenticated, "Invalid token for unary call: "+auth[0])
}

md.Set("authorization", "Bearer final")
ctx = metadata.NewOutgoingContext(ctx, md)
return handler(ctx, req)
}

func (suite *OAuthTests) SetupSuite() {
suite.mockOAuthServer = &MockOAuthServer{}
suite.oauthServer = httptest.NewServer(http.HandlerFunc(suite.mockOAuthServer.handleTokenRequest))
suite.DoSetupSuite(&AuthnTestServer{}, []flight.ServerMiddleware{
{Unary: oauthTestUnary},
}, map[string]string{})
}

func (suite *OAuthTests) TearDownSuite() {
suite.oauthServer.Close()
}

func (suite *OAuthTests) SetupTest() {
// override inherited setup to reset db and open the database during test execution
var err error
suite.db, err = (driver.NewDriver(memory.DefaultAllocator)).NewDatabaseWithOptions(
map[string]string{
"uri": "grpc+tcp://" + suite.s.Addr().String(),
})
suite.Require().NoError(err)
}

func (suite *OAuthTests) TearDownTest() {
suite.db.Close()
}

func (suite *OAuthTests) TestToken() {
err := suite.db.SetOptions(map[string]string{
adbc.OptionKeyToken: "test-client-token",
})
suite.Require().NoError(err)

suite.openAndExecuteQuery("a-query")
}

func (suite *OAuthTests) TestTokenExchangeFlow() {
err := suite.db.SetOptions(map[string]string{
driver.OptionKeyOauthFlow: strconv.Itoa(driver.TokenExchange),
adbc.OptionKeyToken: "test-subject-token",
driver.OptionKeySubjectTokenType: "urn:ietf:params:oauth:token-type:jwt",
driver.OptionKeyTokenURI: suite.oauthServer.URL,
})
suite.Require().NoError(err)

suite.openAndExecuteQuery("a-query")
suite.Equal(1, suite.mockOAuthServer.tokenExchangeCalls, "Token exchange flow should be called once")
}

func (suite *OAuthTests) TestClientCredentialsFlow() {
var err error
err = suite.db.SetOptions(map[string]string{
driver.OptionKeyOauthFlow: strconv.Itoa(driver.ClientCredentials),
driver.OptionKeyClientId: "test-client",
driver.OptionKeyClientSecret: "test-secret",
driver.OptionKeyTokenURI: suite.oauthServer.URL,
})
suite.Require().NoError(err)

suite.cnxn, err = suite.db.Open(context.Background())
suite.Require().NoError(err)
defer suite.cnxn.Close()

suite.openAndExecuteQuery("a-query")
// golang/oauth2 tries to call the token endpoint sending the client credentials in the authentication header,
// if it fails, it retries sending the client credentials in the request body.
// See https://code.google.com/p/goauth2/issues/detail?id=31 for background.
suite.Equal(2, suite.mockOAuthServer.clientCredentialsCalls, "Client credentials flow should be called once")
}

func (suite *OAuthTests) openAndExecuteQuery(query string) {
var err error
suite.cnxn, err = suite.db.Open(context.Background())
suite.Require().NoError(err)
defer suite.cnxn.Close()

stmt, err := suite.cnxn.NewStatement()
suite.Require().NoError(err)
defer stmt.Close()

suite.Require().NoError(stmt.SetSqlQuery(query))
reader, _, err := stmt.ExecuteQuery(context.Background())
suite.NoError(err)
defer reader.Release()
}

func (suite *OAuthTests) TestMissingRequiredParamsTokenExchange() {
testCases := []struct {
name string
options map[string]string
expectedErrorMsg string
}{
{
name: "Missing token",
options: map[string]string{
driver.OptionKeyOauthFlow: strconv.Itoa(driver.TokenExchange),
driver.OptionKeySubjectTokenType: "urn:ietf:params:oauth:token-type:jwt",
driver.OptionKeyTokenURI: suite.oauthServer.URL,
},
expectedErrorMsg: "Not Implemented: oauth flow not implemented: 4",
},
{
name: "Missing subject token type",
options: map[string]string{
driver.OptionKeyOauthFlow: strconv.Itoa(driver.TokenExchange),
adbc.OptionKeyToken: "test-subject-token",
driver.OptionKeyTokenURI: suite.oauthServer.URL,
},
expectedErrorMsg: "token Exchange grant requires subject token type",
},
{
name: "Missing token URI",
options: map[string]string{
driver.OptionKeyOauthFlow: strconv.Itoa(driver.TokenExchange),
adbc.OptionKeyToken: "test-subject-token",
driver.OptionKeySubjectTokenType: "urn:ietf:params:oauth:token-type:jwt",
},
expectedErrorMsg: "token exchange grant requires token URI",
},
}

for _, tc := range testCases {
suite.Run(tc.name, func() {
// We need to set options with the driver's SetOptions method
err := suite.db.SetOptions(tc.options)
suite.Error(err, "Expected error for missing parameters")
suite.Contains(err.Error(), tc.expectedErrorMsg)
})
}
}
func (suite *OAuthTests) TestMissingRequiredParamsClientCredentials() {
testCases := []struct {
name string
options map[string]string
expectedErrorMsg string
}{
{
name: "Missing client ID",
options: map[string]string{
driver.OptionKeyOauthFlow: strconv.Itoa(driver.ClientCredentials),
driver.OptionKeyClientSecret: "test-secret",
driver.OptionKeyTokenURI: suite.oauthServer.URL,
},
expectedErrorMsg: "client credentials grant requires client_id",
},
{
name: "Missing client secret",
options: map[string]string{
driver.OptionKeyOauthFlow: strconv.Itoa(driver.ClientCredentials),
driver.OptionKeyClientId: "test-client",
driver.OptionKeyTokenURI: suite.oauthServer.URL,
},
expectedErrorMsg: "client credentials grant requires client_secret",
},
{
name: "Missing token URI",
options: map[string]string{
driver.OptionKeyOauthFlow: strconv.Itoa(driver.ClientCredentials),
driver.OptionKeyClientId: "test-client",
driver.OptionKeyClientSecret: "test-secret",
},
expectedErrorMsg: "client credentials grant requires token_uri",
},
}

for _, tc := range testCases {
suite.Run(tc.name, func() {
// We need to set options with the driver's SetOptions method
err := suite.db.SetOptions(tc.options)
suite.Error(err, "Expected error for missing parameters")
suite.Contains(err.Error(), tc.expectedErrorMsg)
})
}
}

func (suite *OAuthTests) TestInvalidOAuthFlow() {
testCases := []struct {
name string
options map[string]string
expectedErrorMsg string
}{
{
name: "Invalid flow value",
options: map[string]string{
driver.OptionKeyOauthFlow: "invalid-flow",
adbc.OptionKeyToken: "test-token",
},
expectedErrorMsg: "Invalid Argument: unsupported option",
},
{
name: "Unsupported flow type",
options: map[string]string{
driver.OptionKeyOauthFlow: "99", // Using a number that doesn't map to a defined flow
},
expectedErrorMsg: "Not Implemented: oauth flow not implemented: 99",
},
}

for _, tc := range testCases {
suite.Run(tc.name, func() {
err := suite.db.SetOptions(tc.options)
suite.Error(err, "Expected error for invalid OAuth flow")
suite.Contains(err.Error(), tc.expectedErrorMsg)
})
}
}

// ---- Grpc Dialer Options Tests --------------

type DialerOptionsTests struct {
Expand Down
Loading
Loading