Skip to content

Commit 32db107

Browse files
author
Amit Kumar Das
authored
feat(client): add resuable kubernetes client package (#172)
Signed-off-by: AmitKumarDas <[email protected]>
1 parent 5bf151d commit 32db107

9 files changed

+196
-82
lines changed

pkg/kubernetes/retry.go

+30-9
Original file line numberDiff line numberDiff line change
@@ -37,16 +37,24 @@ func (rt *RetryTimeout) Error() string {
3737
// Retryable helps executing user provided functions as
3838
// conditions in a repeated manner till this condition succeeds
3939
type Retryable struct {
40-
Message string
40+
//Message string
4141

4242
WaitTimeout time.Duration
4343
WaitInterval time.Duration
44+
45+
// RunOnce will run the function only once
46+
//
47+
// NOTE:
48+
// In other words, this makes the retry option
49+
// as a No Operation i.e. noop
50+
RunOnce bool
4451
}
4552

4653
// RetryConfig helps in creating an instance of Retryable
4754
type RetryConfig struct {
4855
WaitTimeout *time.Duration
4956
WaitInterval *time.Duration
57+
RunOnce bool
5058
}
5159

5260
// NewRetry returns a new instance of Retryable
@@ -55,34 +63,46 @@ func NewRetry(config RetryConfig) *Retryable {
5563
timeout := 60 * time.Second
5664
// sleep interval defaults to 1 second
5765
interval := 1 * time.Second
66+
5867
// override timeout with user specified value
5968
if config.WaitTimeout != nil {
6069
timeout = *config.WaitTimeout
6170
}
71+
6272
// override interval with user specified value
6373
if config.WaitInterval != nil {
6474
interval = *config.WaitInterval
6575
}
76+
6677
return &Retryable{
6778
WaitTimeout: timeout,
6879
WaitInterval: interval,
80+
RunOnce: config.RunOnce,
6981
}
7082
}
7183

7284
// Waitf retries this provided function as a condition till
7385
// this condition succeeds.
7486
//
75-
// Clients invoking this method need to return appropriate
76-
// values in the function implementation to let this function
77-
// to be either returned, or exited or retried.
87+
// NOTE:
88+
// Clients invoking this method need to return appropriate
89+
// values (i.e. bool & error) within the condition implementation.
90+
// These return values let the condition to be either returned or
91+
// retried.
7892
func (r *Retryable) Waitf(
79-
condition func() (bool, error),
80-
message string,
81-
args ...interface{},
93+
condition func() (bool, error), // condition that gets retried
94+
msgFormat string,
95+
msgArgs ...interface{},
8296
) error {
97+
if r.RunOnce {
98+
// No need to retry if this condition is meant to be run once
99+
_, err := condition()
100+
return err
101+
}
102+
83103
context := fmt.Sprintf(
84-
message,
85-
args...,
104+
msgFormat,
105+
msgArgs...,
86106
)
87107
// mark the start time
88108
start := time.Now()
@@ -130,6 +150,7 @@ func (r *Retryable) Waitf(
130150
context,
131151
)
132152
}
153+
// retry after sleeping for specified interval
133154
time.Sleep(r.WaitInterval)
134155
}
135156
}

pkg/kubernetes/utility.go

+134-53
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,12 @@ limitations under the License.
1717
package kubernetes
1818

1919
import (
20+
"sync"
21+
"time"
22+
2023
"github.com/pkg/errors"
2124
apiextnv1beta1 "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1beta1"
25+
"k8s.io/client-go/discovery"
2226
"k8s.io/client-go/kubernetes"
2327
"k8s.io/client-go/rest"
2428
"k8s.io/klog/v2"
@@ -47,13 +51,14 @@ type UtilityFuncs struct {
4751
getAPIForAPIVersionAndResourceFn func(string, string) *dynamicdiscovery.APIResource
4852
}
4953

50-
// Utility exposes instances needed to invoke kubernetes
51-
// api operations
54+
// Utility provides options to invoke kubernetes APIs
5255
type Utility struct {
5356
*UtilityFuncs
5457
Retry *Retryable
5558

56-
apiDiscovery *dynamicdiscovery.APIResourceDiscovery
59+
kubeConfig *rest.Config
60+
61+
apiResourceDiscovery *dynamicdiscovery.APIResourceDiscovery
5762

5863
// dynamic client to invoke kubernetes operations
5964
// against kubernetes native as well as custom resources
@@ -79,53 +84,62 @@ type Utility struct {
7984
err error
8085
}
8186

82-
func (u *Utility) setTeardownFlag(config UtilityConfig) {
83-
u.isTeardown = config.IsTeardown
84-
}
87+
var doOnce sync.Once
8588

86-
func (u *Utility) setAPIDiscovery(config UtilityConfig) {
87-
u.apiDiscovery = config.APIDiscovery
88-
}
89+
// singleton instance of Utility to be used across the
90+
// project to invoke Kubernetes operations
91+
var singleton *Utility
8992

90-
func (u *Utility) setCRDClient(config UtilityConfig) {
91-
if config.KubeConfig == nil {
92-
u.err = errors.Errorf(
93-
"Failed to set crd client: Nil kube config provided",
94-
)
95-
return
93+
// Singleton returns a new or existing instance of Utility
94+
func Singleton(config UtilityConfig) (*Utility, error) {
95+
if singleton != nil {
96+
return singleton, nil
9697
}
97-
u.crdClient, u.err = apiextnv1beta1.NewForConfig(
98-
config.KubeConfig,
99-
)
98+
var err error
99+
doOnce.Do(func() {
100+
singleton, err = NewUtility(config)
101+
})
102+
return singleton, err
100103
}
101104

102-
func (u *Utility) setDynamicClientset(config UtilityConfig) {
103-
u.dynamicClientset, u.err = dynamicclientset.New(
104-
config.KubeConfig,
105-
config.APIDiscovery,
106-
)
107-
}
108-
109-
func (u *Utility) setKubeClientset(config UtilityConfig) {
110-
u.kubeClientset, u.err = kubernetes.NewForConfig(
111-
config.KubeConfig,
112-
)
113-
}
114-
115-
// NewUtility returns a new instance of Fixture
105+
// NewUtility returns a new instance of Kubernetes utility
116106
func NewUtility(config UtilityConfig) (*Utility, error) {
117-
// check retry
107+
// initialize retry instance to default behaviour
118108
var retry = NewRetry(RetryConfig{})
119109
if config.Retry != nil {
110+
// override from config if available
120111
retry = config.Retry
121112
}
113+
114+
// initialize utility instance
122115
u := &Utility{
123116
UtilityFuncs: &UtilityFuncs{},
124117
Retry: retry,
118+
kubeConfig: config.KubeConfig,
125119
}
120+
121+
if u.kubeConfig == nil {
122+
// Set kube config to in-cluster config
123+
u.kubeConfig, u.err = rest.InClusterConfig()
124+
if u.err != nil {
125+
return nil, errors.Wrapf(
126+
u.err,
127+
"Failed to create k8s utility",
128+
)
129+
}
130+
}
131+
132+
// setup options to mutate utility instance
133+
// based on the provided config
134+
//
135+
// NOTE:
136+
// Following order needs to be maintained
126137
var setters = []func(UtilityConfig){
138+
// pre settings
127139
u.setTeardownFlag,
128-
u.setAPIDiscovery,
140+
u.setAPIResourceDiscoveryOrDefault,
141+
142+
// post settings
129143
u.setCRDClient,
130144
u.setDynamicClientset,
131145
u.setKubeClientset,
@@ -139,11 +153,50 @@ func NewUtility(config UtilityConfig) (*Utility, error) {
139153
return u, nil
140154
}
141155

142-
// Teardown deletes resources created through this instance
143-
func (u *Utility) Teardown() {
144-
if !u.isTeardown {
156+
func (u *Utility) setTeardownFlag(config UtilityConfig) {
157+
u.isTeardown = config.IsTeardown
158+
}
159+
160+
func (u *Utility) setAPIResourceDiscoveryOrDefault(config UtilityConfig) {
161+
u.apiResourceDiscovery = config.APIDiscovery
162+
if u.apiResourceDiscovery != nil {
145163
return
146164
}
165+
discoveryClient := discovery.NewDiscoveryClientForConfigOrDie(
166+
u.kubeConfig,
167+
)
168+
d := dynamicdiscovery.NewAPIResourceDiscoverer(discoveryClient)
169+
170+
// This needs to be started with appropriate refresh interval
171+
// before being used. We set to 30 seconds discovery interval.
172+
// In other words, if new CRDs are applied then it will take
173+
// this interval for these new CRDs to be discovered.
174+
d.Start(time.Duration(30) * time.Second)
175+
176+
u.apiResourceDiscovery = d
177+
}
178+
179+
func (u *Utility) setCRDClient(config UtilityConfig) {
180+
u.crdClient, u.err = apiextnv1beta1.NewForConfig(
181+
u.kubeConfig,
182+
)
183+
}
184+
185+
func (u *Utility) setDynamicClientset(config UtilityConfig) {
186+
u.dynamicClientset, u.err = dynamicclientset.New(
187+
u.kubeConfig,
188+
u.apiResourceDiscovery, // this must be set previously
189+
)
190+
}
191+
192+
func (u *Utility) setKubeClientset(config UtilityConfig) {
193+
u.kubeClientset, u.err = kubernetes.NewForConfig(
194+
u.kubeConfig,
195+
)
196+
}
197+
198+
// MustTeardown deletes resources created through this instance
199+
func (u *Utility) MustTeardown() {
147200
// cleanup in descending order
148201
for i := len(u.teardownFuncs) - 1; i >= 0; i-- {
149202
teardown := u.teardownFuncs[i]
@@ -173,13 +226,31 @@ func (u *Utility) Teardown() {
173226
}
174227
}
175228

176-
// AddToTeardown adds the given teardown function to
229+
// Teardown optionally deletes resources created through this
230+
// instance
231+
func (u *Utility) Teardown() {
232+
if !u.isTeardown {
233+
return
234+
}
235+
u.MustTeardown()
236+
}
237+
238+
// MustAddToTeardown adds the given teardown function to
239+
// the list of teardown functions
240+
func (u *Utility) MustAddToTeardown(teardown func() error) {
241+
if teardown == nil {
242+
return
243+
}
244+
u.teardownFuncs = append(u.teardownFuncs, teardown)
245+
}
246+
247+
// AddToTeardown optionally adds the given teardown function to
177248
// the list of teardown functions
178249
func (u *Utility) AddToTeardown(teardown func() error) {
179250
if !u.isTeardown {
180251
return
181252
}
182-
u.teardownFuncs = append(u.teardownFuncs, teardown)
253+
u.MustAddToTeardown(teardown)
183254
}
184255

185256
// GetClientForAPIVersionAndKind returns the dynamic client for the
@@ -198,32 +269,42 @@ func (u *Utility) GetClientForAPIVersionAndKind(
198269
}
199270

200271
// GetClientForAPIVersionAndResource returns the dynamic client for the
201-
// given api version & resource
272+
// given api version & resource name
202273
func (u *Utility) GetClientForAPIVersionAndResource(
203274
apiversion string,
204-
resource string,
275+
resourceName string,
205276
) (*clientset.ResourceClient, error) {
206277
if u.getClientForAPIVersionAndResourceFn != nil {
207-
return u.getClientForAPIVersionAndResourceFn(apiversion, resource)
278+
return u.getClientForAPIVersionAndResourceFn(apiversion, resourceName)
208279
}
209280
return u.dynamicClientset.GetClientForAPIVersionAndResource(
210281
apiversion,
211-
resource,
282+
resourceName,
212283
)
213284
}
214285

215-
// GetAPIForAPIVersionAndResource returns the discovered api based
216-
// on the provided api version & resource
217-
func (u *Utility) GetAPIForAPIVersionAndResource(
286+
// GetAPIResourceForAPIVersionAndResourceName returns the
287+
// discovered API resource based on the provided api version &
288+
// resource name
289+
func (u *Utility) GetAPIResourceForAPIVersionAndResourceName(
218290
apiversion string,
219-
resource string,
291+
resourceName string,
220292
) *dynamicdiscovery.APIResource {
221293
if u.getAPIForAPIVersionAndResourceFn != nil {
222-
return u.getAPIForAPIVersionAndResourceFn(apiversion, resource)
294+
return u.getAPIForAPIVersionAndResourceFn(apiversion, resourceName)
223295
}
224-
return u.apiDiscovery.
225-
GetAPIForAPIVersionAndResource(
226-
apiversion,
227-
resource,
228-
)
296+
return u.apiResourceDiscovery.GetAPIForAPIVersionAndResource(
297+
apiversion,
298+
resourceName,
299+
)
300+
}
301+
302+
// GetAPIResourceDiscovery returns the api resource discovery instance
303+
func (u *Utility) GetAPIResourceDiscovery() *dynamicdiscovery.APIResourceDiscovery {
304+
return u.apiResourceDiscovery
305+
}
306+
307+
// GetKubeConfig returns the Kubernetes config instance
308+
func (u *Utility) GetKubeConfig() *rest.Config {
309+
return u.kubeConfig
229310
}

pkg/recipe/base.go

+8-4
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,18 @@ limitations under the License.
1616

1717
package recipe
1818

19-
import types "mayadata.io/d-operators/types/recipe"
19+
import (
20+
"mayadata.io/d-operators/pkg/kubernetes"
21+
types "mayadata.io/d-operators/types/recipe"
22+
)
2023

2124
// BaseRunner is the common runner used by all action runners
2225
type BaseRunner struct {
2326
*Fixture
24-
TaskIndex int
25-
TaskName string
26-
Retry *Retryable
27+
TaskIndex int
28+
TaskName string
29+
//Retry *Retryable
30+
Retry *kubernetes.Retryable
2731
FailFastRule types.FailFastRule
2832
}
2933

0 commit comments

Comments
 (0)