Skip to content

Commit 5d7f96f

Browse files
committed
Refactor defaults for OpenShift clusters.
We now automatically created ingresses for OpenShift clusters and the connect info automatically switches to using ingress data for calculating connect info. This is required as OpenShift clusters are always in private subnets that arent accessible anyways.
1 parent be66f1b commit 5d7f96f

5 files changed

Lines changed: 341 additions & 119 deletions

File tree

deployment/caodeploy/deployer.go

Lines changed: 125 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,14 @@ import (
66
"time"
77

88
"github.com/couchbase/gocbcorex"
9+
"github.com/couchbase/gocbcorex/cbhttpx"
10+
"github.com/couchbase/gocbcorex/cbmgmtx"
911
"github.com/couchbaselabs/cbdinocluster/clusterdef"
1012
"github.com/couchbaselabs/cbdinocluster/deployment"
1113
"github.com/couchbaselabs/cbdinocluster/deployment/commondeploy"
1214
"github.com/couchbaselabs/cbdinocluster/utils/caocontrol"
1315
"github.com/couchbaselabs/cbdinocluster/utils/cbdcuuid"
16+
"github.com/couchbaselabs/gocbconnstr/v2"
1417
"github.com/pkg/errors"
1518
"go.uber.org/zap"
1619
corev1 "k8s.io/api/core/v1"
@@ -23,6 +26,27 @@ const (
2326
CngServiceName = CouchbaseClusterName + "-cloud-native-gateway-service"
2427
)
2528

29+
func withAgent[T any](d *Deployer, ctx context.Context, clusterID string, fn func(agent *gocbcorex.Agent) (T, error)) (T, error) {
30+
agent, err := d.getAgent(ctx, clusterID, "")
31+
if err != nil {
32+
var zero T
33+
return zero, errors.Wrap(err, "failed to get cluster agent")
34+
}
35+
defer agent.Close()
36+
37+
return fn(agent)
38+
}
39+
40+
func withMgmtx[T any](d *Deployer, ctx context.Context, clusterID string, fn func(helper commondeploy.MgmtxHelper) (T, error)) (T, error) {
41+
mgmt, err := d.getMgmtx(ctx, clusterID)
42+
if err != nil {
43+
var zero T
44+
return zero, errors.Wrap(err, "failed to get mgmt client")
45+
}
46+
47+
return fn(commondeploy.MgmtxHelper{Mgmt: mgmt})
48+
}
49+
2650
type Deployer struct {
2751
logger *zap.Logger
2852
client *caocontrol.Controller
@@ -42,63 +66,57 @@ func NewDeployer(opts *NewDeployerOptions) (*Deployer, error) {
4266
}, nil
4367
}
4468

45-
func (d *Deployer) getAgent(ctx context.Context, clusterID string, bucketName string) (*gocbcorex.Agent, error) {
69+
func (d *Deployer) getAdminAuth(ctx context.Context, clusterID string) (string, string, error) {
4670
namespaceName, err := d.getClusterNamespace(ctx, clusterID)
4771
if err != nil {
48-
return nil, errors.Wrap(err, "failed to get cluster namespace")
72+
return "", "", errors.Wrap(err, "failed to get cluster namespace")
4973
}
5074

5175
secret, err := d.client.GetSecret(ctx, namespaceName, "cbdc2-admin-auth")
5276
if err != nil {
53-
return nil, errors.Wrap(err, "failed to get admin auth secret")
77+
return "", "", errors.Wrap(err, "failed to get admin auth secret")
5478
}
5579

5680
username := string(secret.Data[corev1.BasicAuthUsernameKey])
5781
password := string(secret.Data[corev1.BasicAuthPasswordKey])
5882

59-
service, err := d.client.GetService(ctx, namespaceName, CouchbaseClusterName+"-ui")
83+
return username, password, nil
84+
}
85+
86+
func (d *Deployer) getAgent(ctx context.Context, clusterID string, bucketName string) (*gocbcorex.Agent, error) {
87+
username, password, err := d.getAdminAuth(ctx, clusterID)
6088
if err != nil {
61-
return nil, errors.Wrap(err, "failed to get cluster ui service")
89+
return nil, errors.Wrap(err, "failed to get admin auth")
6290
}
6391

64-
nodes, err := d.client.GetNodes(ctx)
92+
connectInfo, err := d.GetConnectInfo(ctx, clusterID)
6593
if err != nil {
66-
return nil, errors.Wrap(err, "failed to get k8s nodes")
94+
return nil, errors.Wrap(err, "failed to get connect info")
6795
}
6896

69-
var externalIP string
70-
for _, node := range nodes.Items {
71-
for _, address := range node.Status.Addresses {
72-
externalIP = address.Address
73-
break
74-
}
75-
if externalIP != "" {
76-
break
77-
}
78-
}
79-
if externalIP == "" {
80-
return nil, errors.New("could not identify node IP to use")
97+
if connectInfo.ConnStr == "" {
98+
return nil, errors.New("no data endpoint available")
8199
}
82100

83-
var httpPort int32
84-
var memdPort int32
85-
for _, port := range service.Spec.Ports {
86-
switch port.Name {
87-
case "couchbase-ui":
88-
httpPort = port.NodePort
89-
case "data":
90-
memdPort = port.NodePort
91-
}
101+
baseSpec, err := gocbconnstr.Parse(connectInfo.ConnStr)
102+
if err != nil {
103+
return nil, errors.Wrap(err, "failed to parse connstr")
92104
}
93-
if httpPort == 0 {
94-
return nil, errors.New("could not identify mgmt port")
105+
106+
resolvedSpec, err := gocbconnstr.Resolve(baseSpec)
107+
if err != nil {
108+
return nil, errors.Wrap(err, "failed to resolve connstr")
95109
}
96-
if memdPort == 0 {
97-
return nil, errors.New("could not identify data port")
110+
111+
var httpAddrs []string
112+
for _, host := range resolvedSpec.HttpHosts {
113+
httpAddrs = append(httpAddrs, fmt.Sprintf("%s:%d", host.Host, host.Port))
98114
}
99115

100-
httpEndpoint := fmt.Sprintf("%s:%d", externalIP, httpPort)
101-
memdEndpoint := fmt.Sprintf("%s:%d", externalIP, memdPort)
116+
var memdAddrs []string
117+
for _, host := range resolvedSpec.MemdHosts {
118+
memdAddrs = append(memdAddrs, fmt.Sprintf("%s:%d", host.Host, host.Port))
119+
}
102120

103121
agent, err := gocbcorex.CreateAgent(ctx, gocbcorex.AgentOptions{
104122
Logger: d.logger.Named("agent"),
@@ -109,8 +127,8 @@ func (d *Deployer) getAgent(ctx context.Context, clusterID string, bucketName st
109127
Password: password,
110128
},
111129
SeedConfig: gocbcorex.SeedConfig{
112-
HTTPAddrs: []string{httpEndpoint},
113-
MemdAddrs: []string{memdEndpoint},
130+
HTTPAddrs: httpAddrs,
131+
MemdAddrs: memdAddrs,
114132
NetworkType: "external",
115133
},
116134
})
@@ -121,6 +139,35 @@ func (d *Deployer) getAgent(ctx context.Context, clusterID string, bucketName st
121139
return agent, nil
122140
}
123141

142+
func (d *Deployer) getMgmtx(ctx context.Context, clusterID string) (*cbmgmtx.Management, error) {
143+
username, password, err := d.getAdminAuth(ctx, clusterID)
144+
if err != nil {
145+
return nil, errors.Wrap(err, "failed to get admin auth")
146+
}
147+
148+
connectInfo, err := d.GetConnectInfo(ctx, clusterID)
149+
if err != nil {
150+
return nil, errors.Wrap(err, "failed to get connect info")
151+
}
152+
153+
if connectInfo.Mgmt == "" && connectInfo.MgmtTls == "" {
154+
return nil, errors.New("no management endpoint available")
155+
}
156+
157+
endpoint := connectInfo.Mgmt
158+
if endpoint == "" {
159+
endpoint = connectInfo.MgmtTls
160+
}
161+
162+
return &cbmgmtx.Management{
163+
Endpoint: endpoint,
164+
Auth: &cbhttpx.BasicAuth{
165+
Username: username,
166+
Password: password,
167+
},
168+
}, nil
169+
}
170+
124171
func (d *Deployer) formatExpiry(expiry time.Time) string {
125172
if expiry.IsZero() {
126173
return "none"
@@ -378,6 +425,15 @@ func (d *Deployer) NewCluster(ctx context.Context, def *clusterdef.Cluster) (dep
378425
}
379426
}
380427

428+
if isOpenShift {
429+
// In OpenShift, the only way to access the cluster is through a route, so we
430+
// set it up by default every time the cluster is allocated.
431+
err = d.EnableIngresses(ctx, clusterID.String())
432+
if err != nil {
433+
return nil, errors.Wrap(err, "failed to enable ingresses")
434+
}
435+
}
436+
381437
return ClusterInfo{
382438
ClusterID: clusterID.String(),
383439
Expiry: time.Time{},
@@ -587,6 +643,22 @@ func (d *Deployer) DisableIngresses(ctx context.Context, clusterID string) error
587643
}
588644

589645
func (d *Deployer) GetConnectInfo(ctx context.Context, clusterID string) (*deployment.ConnectInfo, error) {
646+
isOpenShift, err := d.client.IsOpenShift(ctx)
647+
if err != nil {
648+
return nil, errors.Wrap(err, "failed to detect whether we are using openshift")
649+
}
650+
651+
if isOpenShift {
652+
// In OpenShift, it's not possible to directly address the cluster externally, so we just
653+
// use the ingress connectivity information unambiguously.
654+
ingressConnInfo, err := d.GetIngressConnectInfo(ctx, clusterID)
655+
if err != nil {
656+
return nil, errors.Wrap(err, "failed to get ingress connect info")
657+
}
658+
659+
return ingressConnInfo, nil
660+
}
661+
590662
namespaceName, err := d.getClusterNamespace(ctx, clusterID)
591663
if err != nil {
592664
return nil, err
@@ -732,33 +804,22 @@ func (d *Deployer) DeleteUser(ctx context.Context, clusterID string, username st
732804
return errors.New("caodeploy does not support deleting users")
733805
}
734806

735-
func withAgent[T any](d *Deployer, ctx context.Context, clusterID string, fn func(agent *gocbcorex.Agent) (T, error)) (T, error) {
736-
agent, err := d.getAgent(ctx, clusterID, "")
737-
if err != nil {
738-
var zero T
739-
return zero, errors.Wrap(err, "failed to get cluster agent")
740-
}
741-
defer agent.Close()
742-
743-
return fn(agent)
744-
}
745-
746807
func (d *Deployer) ListBuckets(ctx context.Context, clusterID string) ([]deployment.BucketInfo, error) {
747-
return withAgent(d, ctx, clusterID, func(agent *gocbcorex.Agent) ([]deployment.BucketInfo, error) {
748-
return commondeploy.ClusterHelper{Agent: agent}.ListBuckets(ctx)
808+
return withMgmtx(d, ctx, clusterID, func(h commondeploy.MgmtxHelper) ([]deployment.BucketInfo, error) {
809+
return h.ListBuckets(ctx)
749810
})
750811
}
751812

752813
func (d *Deployer) CreateBucket(ctx context.Context, clusterID string, opts *deployment.CreateBucketOptions) error {
753-
_, err := withAgent(d, ctx, clusterID, func(agent *gocbcorex.Agent) (struct{}, error) {
754-
return struct{}{}, commondeploy.ClusterHelper{Agent: agent}.CreateBucket(ctx, opts)
814+
_, err := withMgmtx(d, ctx, clusterID, func(h commondeploy.MgmtxHelper) (struct{}, error) {
815+
return struct{}{}, h.CreateBucket(ctx, opts)
755816
})
756817
return err
757818
}
758819

759820
func (d *Deployer) DeleteBucket(ctx context.Context, clusterID string, bucketName string) error {
760-
_, err := withAgent(d, ctx, clusterID, func(agent *gocbcorex.Agent) (struct{}, error) {
761-
return struct{}{}, commondeploy.ClusterHelper{Agent: agent}.DeleteBucket(ctx, bucketName)
821+
_, err := withMgmtx(d, ctx, clusterID, func(h commondeploy.MgmtxHelper) (struct{}, error) {
822+
return struct{}{}, h.DeleteBucket(ctx, bucketName)
762823
})
763824
return err
764825
}
@@ -796,40 +857,40 @@ func (d *Deployer) GetMetrics(ctx context.Context, clusterID string) (string, er
796857

797858
func (d *Deployer) ExecuteQuery(ctx context.Context, clusterID string, query string) (string, error) {
798859
return withAgent(d, ctx, clusterID, func(agent *gocbcorex.Agent) (string, error) {
799-
return commondeploy.ClusterHelper{Agent: agent}.ExecuteQuery(ctx, query)
860+
return commondeploy.AgentHelper{Agent: agent}.ExecuteQuery(ctx, query)
800861
})
801862
}
802863

803864
func (d *Deployer) ListCollections(ctx context.Context, clusterID string, bucketName string) ([]deployment.ScopeInfo, error) {
804-
return withAgent(d, ctx, clusterID, func(agent *gocbcorex.Agent) ([]deployment.ScopeInfo, error) {
805-
return commondeploy.ClusterHelper{Agent: agent}.ListCollections(ctx, bucketName)
865+
return withMgmtx(d, ctx, clusterID, func(h commondeploy.MgmtxHelper) ([]deployment.ScopeInfo, error) {
866+
return h.ListCollections(ctx, bucketName)
806867
})
807868
}
808869

809870
func (d *Deployer) CreateScope(ctx context.Context, clusterID string, bucketName, scopeName string) error {
810-
_, err := withAgent(d, ctx, clusterID, func(agent *gocbcorex.Agent) (struct{}, error) {
811-
return struct{}{}, commondeploy.ClusterHelper{Agent: agent}.CreateScope(ctx, bucketName, scopeName)
871+
_, err := withMgmtx(d, ctx, clusterID, func(h commondeploy.MgmtxHelper) (struct{}, error) {
872+
return struct{}{}, h.CreateScope(ctx, bucketName, scopeName)
812873
})
813874
return err
814875
}
815876

816877
func (d *Deployer) CreateCollection(ctx context.Context, clusterID string, bucketName, scopeName, collectionName string) error {
817-
_, err := withAgent(d, ctx, clusterID, func(agent *gocbcorex.Agent) (struct{}, error) {
818-
return struct{}{}, commondeploy.ClusterHelper{Agent: agent}.CreateCollection(ctx, bucketName, scopeName, collectionName)
878+
_, err := withMgmtx(d, ctx, clusterID, func(h commondeploy.MgmtxHelper) (struct{}, error) {
879+
return struct{}{}, h.CreateCollection(ctx, bucketName, scopeName, collectionName)
819880
})
820881
return err
821882
}
822883

823884
func (d *Deployer) DeleteScope(ctx context.Context, clusterID string, bucketName, scopeName string) error {
824-
_, err := withAgent(d, ctx, clusterID, func(agent *gocbcorex.Agent) (struct{}, error) {
825-
return struct{}{}, commondeploy.ClusterHelper{Agent: agent}.DeleteScope(ctx, bucketName, scopeName)
885+
_, err := withMgmtx(d, ctx, clusterID, func(h commondeploy.MgmtxHelper) (struct{}, error) {
886+
return struct{}{}, h.DeleteScope(ctx, bucketName, scopeName)
826887
})
827888
return err
828889
}
829890

830891
func (d *Deployer) DeleteCollection(ctx context.Context, clusterID string, bucketName, scopeName, collectionName string) error {
831-
_, err := withAgent(d, ctx, clusterID, func(agent *gocbcorex.Agent) (struct{}, error) {
832-
return struct{}{}, commondeploy.ClusterHelper{Agent: agent}.DeleteCollection(ctx, bucketName, scopeName, collectionName)
892+
_, err := withMgmtx(d, ctx, clusterID, func(h commondeploy.MgmtxHelper) (struct{}, error) {
893+
return struct{}{}, h.DeleteCollection(ctx, bucketName, scopeName, collectionName)
833894
})
834895
return err
835896
}

0 commit comments

Comments
 (0)