Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions internal/abctl/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package abctl

// Config holds configuration extracted from Airbyte installations.
// May be moved to a more specific package as the architecture evolves.
type Config struct {
AirbyteAPIHost string // From AIRBYTE_API_HOST env var
AirbyteURL string // From AIRBYTE_URL env var
AirbyteAuthURL string // From AB_AIRBYTE_AUTH_IDENTITY_PROVIDER_OIDC_ENDPOINTS_AUTHORIZATION_SERVER_ENDPOINT env var
}
2 changes: 2 additions & 0 deletions internal/cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cmd
import (
"context"

"github.com/airbytehq/abctl/internal/cmd/config"
"github.com/airbytehq/abctl/internal/cmd/images"
"github.com/airbytehq/abctl/internal/cmd/local"
"github.com/airbytehq/abctl/internal/cmd/version"
Expand All @@ -20,6 +21,7 @@ func (v verbose) BeforeApply() error {
}

type Cmd struct {
Config config.Cmd `cmd:"" help:"Manage abctl configuration."`
Local local.Cmd `cmd:"" help:"Manage the local Airbyte installation."`
Images images.Cmd `cmd:"" help:"Manage images used by Airbyte and abctl."`
Version version.Cmd `cmd:"" help:"Display version information."`
Expand Down
6 changes: 6 additions & 0 deletions internal/cmd/config/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package config

// Cmd represents the config command group
type Cmd struct {
Init InitCmd `cmd:"" help:"Initialize abctl configuration from existing Airbyte installation."`
}
127 changes: 127 additions & 0 deletions internal/cmd/config/init.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package config

import (
"context"
"fmt"
"strings"
"time"

"github.com/airbytehq/abctl/internal/k8s"
"github.com/airbytehq/abctl/internal/service"
"github.com/pterm/pterm"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// InitCmd represents the init command
type InitCmd struct {
Namespace string `flag:"" short:"n" help:"Target namespace (default: current kubeconfig context)."`
Force bool `flag:"" help:"Overwrite existing abctl ConfigMap."`
FromConfigmap string `flag:"" help:"Source ConfigMap name (default: auto-detect via -airbyte-env suffix)."`
}

// Run executes the init command
func (c *InitCmd) Run(ctx context.Context, provider k8s.Provider) error {
pterm.Info.Println("Initializing abctl configuration...")

pterm.Info.Printf("Using namespace: %s\n", c.Namespace)
pterm.Debug.Printf("Provider kubeconfig: %s\n", provider.Kubeconfig)
pterm.Debug.Printf("Provider context: %s\n", provider.Context)

// Create k8s client using standard kubeconfig resolution (KUBECONFIG env var or ~/.kube/config)
k8sClient, err := service.DefaultK8s("", "")
if err != nil {
return fmt.Errorf("failed to create k8s client: %w", err)
}

// Determine source ConfigMap name
sourceConfigMapName := c.FromConfigmap
if sourceConfigMapName == "" {
// Find ConfigMap with -airbyte-env suffix
sourceConfigMapName, err = findAirbyteEnvConfigMap(ctx, k8sClient, c.Namespace)
if err != nil {
return fmt.Errorf("failed to auto-detect Airbyte ConfigMap: %w", err)
}
}

pterm.Info.Printf("Reading from ConfigMap: %s\n", sourceConfigMapName)

// Read the source ConfigMap
pterm.Debug.Printf("Attempting to get ConfigMap: namespace=%s, name=%s\n", c.Namespace, sourceConfigMapName)
sourceConfigMap, err := k8sClient.ConfigMapGet(ctx, c.Namespace, sourceConfigMapName)
if err != nil {
return fmt.Errorf("failed to read ConfigMap %s/%s: %w", c.Namespace, sourceConfigMapName, err)
}

pterm.Success.Printf("Found ConfigMap %s with %d keys\n", sourceConfigMapName, len(sourceConfigMap.Data))

// Extract key configuration values
config, err := k8s.AbctlConfigFromData(sourceConfigMap.Data)
if err != nil {
return fmt.Errorf("failed to extract configuration: %w", err)
}

pterm.Info.Printf("Extracted configuration:\n")
pterm.Info.Printf(" Airbyte API Host: %s\n", config.AirbyteAPIHost)
pterm.Info.Printf(" Airbyte URL: %s\n", config.AirbyteURL)

// Check if abctl ConfigMap already exists
const abctlConfigMapName = "abctl"
_, err = k8sClient.ConfigMapGet(ctx, c.Namespace, abctlConfigMapName)
if err == nil && !c.Force {
return fmt.Errorf("abctl ConfigMap already exists in namespace %s, use --force to overwrite", c.Namespace)
}

// Create abctl ConfigMap
abctlConfigMap := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: abctlConfigMapName,
Namespace: c.Namespace,
Annotations: map[string]string{
"abctl.airbyte.com/initialized-from": sourceConfigMapName,
"abctl.airbyte.com/initialized-at": time.Now().Format(time.RFC3339),
},
},
Data: map[string]string{
"airbyteApiHost": config.AirbyteAPIHost,
"airbyteURL": config.AirbyteURL,
"airbyteAuthURL": config.AirbyteAuthURL,
},
}

// Create or update the ConfigMap
if err = k8sClient.ConfigMapCreate(ctx, abctlConfigMap); err != nil {
// If creation failed and --force is set, try to update
if c.Force {
if updateErr := k8sClient.ConfigMapUpdate(ctx, abctlConfigMap); updateErr != nil {
return fmt.Errorf("failed to create or update abctl ConfigMap: %w", updateErr)
}
pterm.Success.Printf("Updated abctl ConfigMap in namespace %s\n", c.Namespace)
} else {
return fmt.Errorf("failed to create abctl ConfigMap: %w", err)
}
} else {
pterm.Success.Printf("Created abctl ConfigMap in namespace %s\n", c.Namespace)
}

pterm.Info.Println("Configuration initialization completed successfully")

return nil
}

// findAirbyteEnvConfigMap finds a ConfigMap whose name ends with "-airbyte-env" suffix
func findAirbyteEnvConfigMap(ctx context.Context, k8sClient k8s.Client, namespace string) (string, error) {
configMaps, err := k8sClient.ConfigMapList(ctx, namespace)
if err != nil {
return "", fmt.Errorf("failed to list ConfigMaps: %w", err)
}

const suffix = "-airbyte-env"
for _, cm := range configMaps.Items {
if strings.HasSuffix(cm.Name, suffix) {
return cm.Name, nil
}
}

return "", fmt.Errorf("no ConfigMap with suffix %q found in namespace %s", suffix, namespace)
}
30 changes: 30 additions & 0 deletions internal/k8s/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,15 @@ type Client interface {
SecretDeleteCollection(ctx context.Context, namespace, _type string) error
SecretGet(ctx context.Context, namespace, name string) (*corev1.Secret, error)

// ConfigMapGet retrieves a ConfigMap by name
ConfigMapGet(ctx context.Context, namespace, name string) (*corev1.ConfigMap, error)
// ConfigMapList lists ConfigMaps in a namespace
ConfigMapList(ctx context.Context, namespace string) (*corev1.ConfigMapList, error)
// ConfigMapCreate creates a new ConfigMap
ConfigMapCreate(ctx context.Context, configMap *corev1.ConfigMap) error
// ConfigMapUpdate updates an existing ConfigMap
ConfigMapUpdate(ctx context.Context, configMap *corev1.ConfigMap) error

ServiceGet(ctx context.Context, namespace, name string) (*corev1.Service, error)

StreamPodLogs(ctx context.Context, namespace string, podName string, since time.Time) (io.ReadCloser, error)
Expand Down Expand Up @@ -336,3 +345,24 @@ func (d *DefaultK8sClient) StreamPodLogs(ctx context.Context, namespace string,
func (d *DefaultK8sClient) PodList(ctx context.Context, namespace string) (*corev1.PodList, error) {
return d.ClientSet.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{})
}

// ConfigMapGet retrieves a ConfigMap by name
func (d *DefaultK8sClient) ConfigMapGet(ctx context.Context, namespace, name string) (*corev1.ConfigMap, error) {
return d.ClientSet.CoreV1().ConfigMaps(namespace).Get(ctx, name, metav1.GetOptions{})
}

// ConfigMapCreate creates a new ConfigMap
func (d *DefaultK8sClient) ConfigMapCreate(ctx context.Context, configMap *corev1.ConfigMap) error {
_, err := d.ClientSet.CoreV1().ConfigMaps(configMap.Namespace).Create(ctx, configMap, metav1.CreateOptions{})
return err
}

// ConfigMapUpdate updates an existing ConfigMap
func (d *DefaultK8sClient) ConfigMapUpdate(ctx context.Context, configMap *corev1.ConfigMap) error {
_, err := d.ClientSet.CoreV1().ConfigMaps(configMap.Namespace).Update(ctx, configMap, metav1.UpdateOptions{})
return err
}

func (d *DefaultK8sClient) ConfigMapList(ctx context.Context, namespace string) (*corev1.ConfigMapList, error) {
return d.ClientSet.CoreV1().ConfigMaps(namespace).List(ctx, metav1.ListOptions{})
}
Loading