Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,6 @@ require (
golang.org/x/term v0.27.0 // indirect
golang.org/x/text v0.21.0 // indirect
golang.org/x/time v0.8.0 // indirect
golang.org/x/tools v0.28.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241209162323-e6fa225c2576 // indirect
google.golang.org/grpc v1.68.1 // indirect
google.golang.org/protobuf v1.35.2 // indirect
Expand All @@ -183,5 +182,3 @@ require (
sigs.k8s.io/structured-merge-diff/v4 v4.4.3 // indirect
sigs.k8s.io/yaml v1.4.0 // indirect
)

tool go.uber.org/mock/mockgen
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -511,8 +511,6 @@ go.opentelemetry.io/proto/otlp v1.3.1 h1:TrMUixzpM0yuc/znrFTP9MMRh8trP93mkCiDVeX
go.opentelemetry.io/proto/otlp v1.3.1/go.mod h1:0X1WI4de4ZsLrrJNLAQbFeLCm3T7yBkR0XqQ7niQU+8=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/mock v0.5.0 h1:KAMbZvZPyBPWgD14IrIQ38QCyjwpvVVV6K/bHl1IwQU=
go.uber.org/mock v0.5.0/go.mod h1:ge71pBPLYDk7QIi1LupWxdAykm7KIEFchiOqd6z7qMM=
go.uber.org/mock v0.5.2 h1:LbtPTcP8A5k9WPXj54PPPbjcI4Y6lhyOZXn+VS7wNko=
go.uber.org/mock v0.5.2/go.mod h1:wLlUxC2vVTPTaE3UD51E0BGOAElKrILxhVSDYQLld5o=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
Expand Down
134 changes: 134 additions & 0 deletions internal/api/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package api

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"path"

"github.com/airbytehq/abctl/internal/auth/oidc"
)

// Client is the Airbyte API client
type Client struct {
baseURL string
authClient *oidc.AuthenticatedClient
}

// NewClient creates a new Airbyte API client
func NewClient(baseURL string, authClient *oidc.AuthenticatedClient) *Client {
return &Client{
baseURL: baseURL,
authClient: authClient,
}
}

// doRequest performs an authenticated API request to the public API
func (c *Client) doRequest(ctx context.Context, method, endpoint string, body interface{}) (*http.Response, error) {
return c.doRequestWithQuery(ctx, method, endpoint, nil, body)
}

// doRequestWithQuery performs an authenticated API request with query parameters
func (c *Client) doRequestWithQuery(ctx context.Context, method, endpoint string, queryParams url.Values, body interface{}) (*http.Response, error) {
// Build full URL
u, err := url.Parse(c.baseURL)
if err != nil {
return nil, fmt.Errorf("invalid base URL: %w", err)
}
u.Path = path.Join(u.Path, "api", "public", "v1", endpoint)

// Add query parameters if provided
if queryParams != nil {
u.RawQuery = queryParams.Encode()
}

// Serialize body if provided
var bodyReader io.Reader
if body != nil {
jsonBody, err := json.Marshal(body)
if err != nil {
return nil, fmt.Errorf("failed to marshal request body: %w", err)
}
// Debug: Print the JSON being sent
fmt.Printf("DEBUG - Request body JSON: %s\n", string(jsonBody))
bodyReader = bytes.NewReader(jsonBody)
}

// Create request
req, err := http.NewRequestWithContext(ctx, method, u.String(), bodyReader)
if err != nil {
return nil, fmt.Errorf("failed to create request: %w", err)
}

// Set content type for requests with body
if body != nil {
req.Header.Set("Content-Type", "application/json")
}

// The AuthenticatedClient will automatically add the Authorization header
// with the bearer token and handle token refresh if needed
return c.authClient.Do(req)
}

// doRequestWithPath performs an authenticated API request with a custom path
func (c *Client) doRequestWithPath(ctx context.Context, method, fullPath string, body interface{}) (*http.Response, error) {
// Build full URL
u, err := url.Parse(c.baseURL)
if err != nil {
return nil, fmt.Errorf("invalid base URL: %w", err)
}
u.Path = path.Join(u.Path, fullPath)

// Serialize body if provided
var bodyReader io.Reader
if body != nil {
jsonBody, err := json.Marshal(body)
if err != nil {
return nil, fmt.Errorf("failed to marshal request body: %w", err)
}
bodyReader = bytes.NewReader(jsonBody)
}

// Create request
req, err := http.NewRequestWithContext(ctx, method, u.String(), bodyReader)
if err != nil {
return nil, fmt.Errorf("failed to create request: %w", err)
}

// Set content type for requests with body
if body != nil {
req.Header.Set("Content-Type", "application/json")
}

// The AuthenticatedClient will automatically add the Authorization header
// with the bearer token and handle token refresh if needed
return c.authClient.Do(req)
}

// parseResponse parses the API response into the provided interface
func parseResponse(resp *http.Response, v interface{}) error {
defer resp.Body.Close()

// Read the full response body for better error reporting
body, err := io.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("failed to read response body: %w", err)
}

if resp.StatusCode >= 400 {
return fmt.Errorf("API error: status %d, body: %s", resp.StatusCode, string(body))
}

if v != nil {
if err := json.Unmarshal(body, v); err != nil {
return fmt.Errorf("failed to decode response (status %d): %w\nResponse body: %s",
resp.StatusCode, err, string(body))
}
}

return nil
}
89 changes: 89 additions & 0 deletions internal/api/dataplanes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package api

import (
"context"
"fmt"
)

// DataPlane represents an Airbyte data plane
type DataPlane struct {
ID string `json:"id,omitempty"`
Name string `json:"name"`
RegionID string `json:"regionId"`
Status string `json:"status,omitempty"`
}

// CreateDataPlaneRequest represents the request to create a data plane
type CreateDataPlaneRequest struct {
Name string `json:"name"`
RegionID string `json:"regionId"`
}

// CreateDataPlaneResponse represents the response from creating a data plane
type CreateDataPlaneResponse struct {
DataPlaneID string `json:"dataplaneId"`
ClientID string `json:"clientId"`
ClientSecret string `json:"clientSecret"`
}

// CreateDataPlane creates a new data plane
func (c *Client) CreateDataPlane(ctx context.Context, req *CreateDataPlaneRequest) (*CreateDataPlaneResponse, error) {
resp, err := c.doRequest(ctx, "POST", "dataplanes", req)
if err != nil {
return nil, fmt.Errorf("failed to create data plane: %w", err)
}

var response CreateDataPlaneResponse
if err := parseResponse(resp, &response); err != nil {
return nil, err
}

return &response, nil
}

// ListDataPlanes lists all data planes
func (c *Client) ListDataPlanes(ctx context.Context, regionID string) ([]*DataPlane, error) {
endpoint := "dataplanes"
if regionID != "" {
endpoint = fmt.Sprintf("dataplanes?regionId=%s", regionID)
}

resp, err := c.doRequest(ctx, "GET", endpoint, nil)
if err != nil {
return nil, fmt.Errorf("failed to list data planes: %w", err)
}

var dataPlanes []*DataPlane
if err := parseResponse(resp, &dataPlanes); err != nil {
return nil, err
}

return dataPlanes, nil
}

// GetDataPlane gets a data plane by ID
func (c *Client) GetDataPlane(ctx context.Context, dataPlaneID string) (*DataPlane, error) {
endpoint := fmt.Sprintf("dataplanes/%s", dataPlaneID)
resp, err := c.doRequest(ctx, "GET", endpoint, nil)
if err != nil {
return nil, fmt.Errorf("failed to get data plane: %w", err)
}

var dataPlane DataPlane
if err := parseResponse(resp, &dataPlane); err != nil {
return nil, err
}

return &dataPlane, nil
}

// DeleteDataPlane deletes a data plane
func (c *Client) DeleteDataPlane(ctx context.Context, dataPlaneID string) error {
endpoint := fmt.Sprintf("dataplanes/%s", dataPlaneID)
resp, err := c.doRequest(ctx, "DELETE", endpoint, nil)
if err != nil {
return fmt.Errorf("failed to delete data plane: %w", err)
}

return parseResponse(resp, nil)
}
30 changes: 30 additions & 0 deletions internal/api/instance.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package api

import (
"context"
"fmt"
)

// InstanceConfiguration represents the instance configuration
type InstanceConfiguration struct {
Edition string `json:"edition"`
Version string `json:"version"`
LicenseType string `json:"licenseType,omitempty"`
TrackingStrategy string `json:"trackingStrategy,omitempty"`
}

// GetInstanceConfiguration gets the instance configuration
func (c *Client) GetInstanceConfiguration(ctx context.Context) (*InstanceConfiguration, error) {
// Note: This endpoint uses v1 instead of public/v1
resp, err := c.doRequestWithPath(ctx, "GET", "/api/v1/instance_configuration", nil)
if err != nil {
return nil, fmt.Errorf("failed to get instance configuration: %w", err)
}

var config InstanceConfiguration
if err := parseResponse(resp, &config); err != nil {
return nil, err
}

return &config, nil
}
49 changes: 49 additions & 0 deletions internal/api/organizations.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package api

import (
"context"
"fmt"
)

// Organization represents an Airbyte organization
type Organization struct {
ID string `json:"organizationId"`
Name string `json:"organizationName"`
Email string `json:"email"`
}

// OrganizationResponse represents the API response for organizations
type OrganizationResponse struct {
Data []*Organization `json:"data"`
}

// ListOrganizations lists all organizations the user has access to
func (c *Client) ListOrganizations(ctx context.Context) ([]*Organization, error) {
resp, err := c.doRequest(ctx, "GET", "organizations", nil)
if err != nil {
return nil, fmt.Errorf("failed to list organizations: %w", err)
}

var response OrganizationResponse
if err := parseResponse(resp, &response); err != nil {
return nil, fmt.Errorf("failed to list organizations: %w", err)
}

return response.Data, nil
}

// GetOrganization gets an organization by ID
func (c *Client) GetOrganization(ctx context.Context, organizationID string) (*Organization, error) {
endpoint := fmt.Sprintf("organizations/%s", organizationID)
resp, err := c.doRequest(ctx, "GET", endpoint, nil)
if err != nil {
return nil, fmt.Errorf("failed to get organization: %w", err)
}

var org Organization
if err := parseResponse(resp, &org); err != nil {
return nil, err
}

return &org, nil
}
Loading