Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions internal/cmd/local/helm/airbyte_values.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,13 @@ type ValuesOpts struct {
ImagePullSecret string
DisableAuth bool
LocalStorage bool
EnablePsql17 bool
}

const (
Psql17AirbyteTag = "1.7.0-17"
)

func BuildAirbyteValues(ctx context.Context, opts ValuesOpts) (string, error) {
span := trace.SpanFromContext(ctx)

Expand All @@ -33,6 +38,10 @@ func BuildAirbyteValues(ctx context.Context, opts ValuesOpts) (string, error) {
vals = append(vals, "global.storage.type=local")
}

if opts.EnablePsql17 {
vals = append(vals, "postgresql.image.tag="+Psql17AirbyteTag)
}

span.SetAttributes(
attribute.Bool("low-resource-mode", opts.LowResourceMode),
attribute.Bool("insecure-cookies", opts.InsecureCookies),
Expand Down
26 changes: 25 additions & 1 deletion internal/cmd/local/local/cmd.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package local

import (
"errors"
"fmt"
"io/fs"
"net/http"
"os"
"path"
"path/filepath"
"time"

Expand All @@ -12,6 +15,7 @@ import (
"github.com/airbytehq/abctl/internal/cmd/local/k8s/kind"
"github.com/airbytehq/abctl/internal/cmd/local/paths"
"github.com/airbytehq/abctl/internal/common"
"github.com/airbytehq/abctl/internal/pgdata"
"k8s.io/client-go/rest"

"github.com/airbytehq/abctl/internal/cmd/local/k8s"
Expand Down Expand Up @@ -199,7 +203,7 @@ func DefaultK8s(kubecfg, kubectx string) (k8s.Client, error) {
// local filesystem. It returns true if the MinIO data directory exists.
// Otherwise it returns false.
func SupportMinio() (bool, error) {
minioPath := filepath.Join(paths.Data, pvMinio)
minioPath := filepath.Join(paths.Data, paths.PvMinio)
f, err := os.Stat(minioPath)
if err != nil {
if os.IsNotExist(err) {
Expand All @@ -211,3 +215,23 @@ func SupportMinio() (bool, error) {

return f.IsDir(), nil
}

// EnablePsql17 checks if PostgreSQL data needs patching by examining the
// local PostgreSQL data directory. It returns true if the directory doesn't
// exist or contains PostgreSQL version 17. Otherwise it returns false.
func EnablePsql17() (bool, error) {
pgData := pgdata.New(&pgdata.Config{
Path: path.Join(paths.Data, paths.PvPsql, "pgdata"),
})

pgVersion, err := pgData.Version()
if err != nil && !errors.Is(err, fs.ErrNotExist) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

errors.Is will handle the nil case for you, you don't need to check it explicitly

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason why I can't do that is because I am using !. It would also return error when err is nil. See - https://go.dev/play/p/CAN89EgXbav

return false, fmt.Errorf("failed to determine if any previous psql version exists: %w", err)
}

if pgVersion == "" || pgVersion == "17" {
return true, nil
}

return false, nil
}
54 changes: 23 additions & 31 deletions internal/cmd/local/local/install.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@ import (
"github.com/airbytehq/abctl/internal/cmd/local/helm"
"github.com/airbytehq/abctl/internal/cmd/local/k8s"
"github.com/airbytehq/abctl/internal/cmd/local/localerr"
"github.com/airbytehq/abctl/internal/cmd/local/migrate"
"github.com/airbytehq/abctl/internal/cmd/local/paths"
"github.com/airbytehq/abctl/internal/common"
"github.com/airbytehq/abctl/internal/telemetry"
"github.com/airbytehq/abctl/internal/merge"
"github.com/airbytehq/abctl/internal/trace"
helmclient "github.com/mittwald/go-helm-client"
"github.com/pterm/pterm"
Expand All @@ -36,11 +35,6 @@ import (
)

const (
// persistent volume constants, these are named to match the values given in the helm chart
pvMinio = "airbyte-minio-pv"
pvLocal = "airbyte-local-pv"
pvPsql = "airbyte-volume-db"

// persistent volume claim constants, these are named to match the values given in the helm chart
pvcMinio = "airbyte-minio-pv-claim-airbyte-minio-0"
pvcLocal = "airbyte-storage-pvc"
Expand All @@ -52,10 +46,10 @@ type InstallOpts struct {
HelmValuesYaml string
AirbyteChartLoc string
Secrets []string
Migrate bool
Hosts []string
ExtraVolumeMounts []k8s.ExtraVolumeMount
LocalStorage bool
EnablePsql17 bool

DockerServer string
DockerUser string
Expand Down Expand Up @@ -158,16 +152,23 @@ func (c *Command) persistentVolumeClaim(ctx context.Context, namespace, name, vo

// PrepImages determines the docker images needed by the chart, pulls them, and loads them into the cluster.
// This is best effort, so errors are dropped here.
func (c *Command) PrepImages(ctx context.Context, cluster k8s.Cluster, opts *InstallOpts) {
func (c *Command) PrepImages(ctx context.Context, cluster k8s.Cluster, opts *InstallOpts, withImages ...string) {
ctx, span := trace.NewSpan(ctx, "command.PrepImages")
defer span.End()

for _, image := range withImages {
pterm.Info.Printfln("Patching image %s", image)
}

manifest, err := images.FindImagesFromChart(opts.HelmValuesYaml, opts.AirbyteChartLoc, opts.HelmChartVersion)
if err != nil {
pterm.Debug.Printfln("error building image manifest: %s", err)
return
}

// Merge images with the manifest.
manifest = merge.DockerImages(manifest, withImages)

cluster.LoadImages(ctx, c.docker.Client, manifest)
}

Expand All @@ -192,40 +193,31 @@ func (c *Command) Install(ctx context.Context, opts *InstallOpts) error {
pterm.Info.Printfln("Namespace '%s' already exists", common.AirbyteNamespace)
}

// Storage volumes.
if opts.LocalStorage {
if err := c.persistentVolume(ctx, common.AirbyteNamespace, pvLocal); err != nil {
if err := c.persistentVolume(ctx, common.AirbyteNamespace, paths.PvLocal); err != nil {
return err
}

if err := c.persistentVolumeClaim(ctx, common.AirbyteNamespace, pvcLocal, paths.PvLocal); err != nil {
return err
}
} else {
if err := c.persistentVolume(ctx, common.AirbyteNamespace, pvMinio); err != nil {
if err := c.persistentVolume(ctx, common.AirbyteNamespace, paths.PvMinio); err != nil {
return err
}
}

if err := c.persistentVolume(ctx, common.AirbyteNamespace, pvPsql); err != nil {
return err
}

span.SetAttributes(attribute.Bool("migrate", opts.Migrate))
if opts.Migrate {
c.spinner.UpdateText("Migrating airbyte data")
if err := c.tel.Wrap(ctx, telemetry.Migrate, func() error { return migrate.FromDockerVolume(ctx, c.docker.Client, "airbyte_db") }); err != nil {
pterm.Error.Println("Failed to migrate data from previous Airbyte installation")
return fmt.Errorf("unable to migrate data from previous airbyte installation: %w", err)
if err := c.persistentVolumeClaim(ctx, common.AirbyteNamespace, pvcMinio, paths.PvMinio); err != nil {
return err
}
}

if opts.LocalStorage {
if err := c.persistentVolumeClaim(ctx, common.AirbyteNamespace, pvcLocal, pvLocal); err != nil {
return err
}
} else {
if err := c.persistentVolumeClaim(ctx, common.AirbyteNamespace, pvcMinio, pvMinio); err != nil {
return err
}
// PSQL volumes.
if err := c.persistentVolume(ctx, common.AirbyteNamespace, paths.PvPsql); err != nil {
return err
}

if err := c.persistentVolumeClaim(ctx, common.AirbyteNamespace, pvcPsql, pvPsql); err != nil {
if err := c.persistentVolumeClaim(ctx, common.AirbyteNamespace, pvcPsql, paths.PvPsql); err != nil {
return err
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,6 @@ global:
memory: 4Gi
storage:
type: local
postgresql:
image:
tag: 1.7.0-17
22 changes: 19 additions & 3 deletions internal/cmd/local/local_install.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ type InstallCmd struct {
Host []string `help:"HTTP ingress host."`
InsecureCookies bool `help:"Allow cookies to be served over HTTP."`
LowResourceMode bool `help:"Run Airbyte in low resource mode."`
Migrate bool `help:"Migrate data from a previous Docker Compose Airbyte installation."`
NoBrowser bool `help:"Disable launching a browser post install."`
Port int `default:"8000" help:"HTTP ingress port."`
Secret []string `type:"existingfile" help:"An Airbyte helm chart secret file."`
Expand Down Expand Up @@ -60,13 +59,22 @@ func (i *InstallCmd) InstallOpts(ctx context.Context, user string) (*local.Insta
pterm.Warning.Println("Found MinIO physical volume. Consider migrating it to local storage (see project docs)")
}

enablePsql17, err := local.EnablePsql17()
if err != nil {
return nil, err
}

if !enablePsql17 {
pterm.Warning.Println("Psql 13 detected. Consider upgrading to 17")
}

opts := &local.InstallOpts{
HelmChartVersion: i.ChartVersion,
AirbyteChartLoc: helm.LocateLatestAirbyteChart(i.ChartVersion, i.Chart),
Secrets: i.Secret,
Migrate: i.Migrate,
Hosts: i.Host,
LocalStorage: !supportMinio,
EnablePsql17: enablePsql17,
ExtraVolumeMounts: extraVolumeMounts,
DockerServer: i.DockerServer,
DockerUser: i.DockerUsername,
Expand All @@ -81,6 +89,7 @@ func (i *InstallCmd) InstallOpts(ctx context.Context, user string) (*local.Insta
LowResourceMode: i.LowResourceMode,
DisableAuth: i.DisableAuth,
LocalStorage: !supportMinio,
EnablePsql17: enablePsql17,
}

if opts.DockerAuth() {
Expand Down Expand Up @@ -115,11 +124,18 @@ func (i *InstallCmd) Run(ctx context.Context, provider k8s.Provider, telClient t
return fmt.Errorf("unable to determine docker installation status: %w", err)
}

// Overrides Helm chart images.
overrideImages := []string{}

opts, err := i.InstallOpts(ctx, telClient.User())
if err != nil {
return err
}

if opts.EnablePsql17 {
overrideImages = append(overrideImages, "airbyte/db:"+helm.Psql17AirbyteTag)
}

return telClient.Wrap(ctx, telemetry.Install, func() error {
spinner.UpdateText(fmt.Sprintf("Checking for existing Kubernetes cluster '%s'", provider.ClusterName))

Expand Down Expand Up @@ -180,7 +196,7 @@ func (i *InstallCmd) Run(ctx context.Context, provider k8s.Provider, telClient t
}

spinner.UpdateText("Pulling images")
lc.PrepImages(ctx, cluster, opts)
lc.PrepImages(ctx, cluster, opts, overrideImages...)

if err := lc.Install(ctx, opts); err != nil {
spinner.Fail("Unable to install Airbyte locally")
Expand Down
1 change: 1 addition & 0 deletions internal/cmd/local/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ func TestInstallOpts(t *testing.T) {
HelmValuesYaml: string(b),
AirbyteChartLoc: "/test/path/to/chart",
LocalStorage: true,
EnablePsql17: true,
}
opts, err := cmd.InstallOpts(context.Background(), "test-user")
if err != nil {
Expand Down
Loading