@@ -91,6 +91,10 @@ type ClusterProxy interface {
9191 // GetLogCollector returns the machine log collector for the Kubernetes cluster.
9292 GetLogCollector () ClusterLogCollector
9393
94+ // Create creates objects using the clusterProxy client.
95+ // It will return an error if any object already exists.
96+ Create (ctx context.Context , resources []byte , options ... CreateOption ) error
97+
9498 // CreateOrUpdate creates or updates objects using the clusterProxy client
9599 CreateOrUpdate (ctx context.Context , resources []byte , options ... CreateOrUpdateOption ) error
96100
@@ -105,11 +109,42 @@ type ClusterProxy interface {
105109 Dispose (context.Context )
106110}
107111
112+ // createConfig contains options for use with Create.
113+ type createConfig struct {
114+ labelSelector labels.Selector
115+ createOpts []client.CreateOption
116+ pollTimeout , pollInterval time.Duration
117+ }
118+
119+ // CreateOption is a configuration option supplied to Create.
120+ type CreateOption func (* createConfig )
121+
122+ // CreateWithLabelSelector allows definition of the LabelSelector to be used in Create.
123+ func CreateWithLabelSelector (labelSelector labels.Selector ) CreateOption {
124+ return func (c * createConfig ) {
125+ c .labelSelector = labelSelector
126+ }
127+ }
128+
129+ // CreateWithCreateOpts allows definition of the Create options to be used in resource Create.
130+ func CreateWithCreateOpts (createOpts ... client.CreateOption ) CreateOption {
131+ return func (c * createConfig ) {
132+ c .createOpts = createOpts
133+ }
134+ }
135+
136+ // CreateWithPolling enables retries over the specified interval.
137+ func CreateWithPolling (pollTimeout , pollInterval time.Duration ) CreateOption {
138+ return func (c * createConfig ) {
139+ c .pollTimeout = pollTimeout
140+ c .pollInterval = pollInterval
141+ }
142+ }
143+
108144// createOrUpdateConfig contains options for use with CreateOrUpdate.
109145type createOrUpdateConfig struct {
110- labelSelector labels.Selector
111- createOpts []client.CreateOption
112- updateOpts []client.UpdateOption
146+ createConfig
147+ updateOpts []client.UpdateOption
113148}
114149
115150// CreateOrUpdateOption is a configuration option supplied to CreateOrUpdate.
@@ -136,6 +171,14 @@ func WithUpdateOpts(updateOpts ...client.UpdateOption) CreateOrUpdateOption {
136171 }
137172}
138173
174+ // WithPolling enables retries over the specified interval.
175+ func WithPolling (pollTimeout , pollInterval time.Duration ) CreateOrUpdateOption {
176+ return func (c * createOrUpdateConfig ) {
177+ c .pollTimeout = pollTimeout
178+ c .pollInterval = pollInterval
179+ }
180+ }
181+
139182// ClusterLogCollector defines an object that can collect logs from a machine.
140183type ClusterLogCollector interface {
141184 // CollectMachineLog collects log from a machine.
@@ -306,6 +349,56 @@ func (p *clusterProxy) GetCache(ctx context.Context) cache.Cache {
306349 return p .cache
307350}
308351
352+ // Create creates objects using the clusterProxy client.
353+ // It will return an error if any object already exists.
354+ // Defaults to use FieldValidation: strict, which can be overwritten with CreateOptions.
355+ func (p * clusterProxy ) Create (ctx context.Context , resources []byte , opts ... CreateOption ) error {
356+ Expect (ctx ).NotTo (BeNil (), "ctx is required for CreateOrUpdate" )
357+ Expect (resources ).NotTo (BeNil (), "resources is required for CreateOrUpdate" )
358+ labelSelector := labels .Everything ()
359+ config := & createConfig {}
360+ for _ , opt := range opts {
361+ opt (config )
362+ }
363+ if config .labelSelector != nil {
364+ labelSelector = config .labelSelector
365+ }
366+ // Prepending field validation strict so that it is used per default, but can still be overwritten.
367+ config .createOpts = append ([]client.CreateOption {client .FieldValidation ("Strict" )}, config .createOpts ... )
368+ objs , err := yaml .ToUnstructured (resources )
369+ if err != nil {
370+ return err
371+ }
372+
373+ retryDisabled := config .pollTimeout == 0 && config .pollInterval == 0
374+ var retErrs []error
375+ for _ , o := range objs {
376+ labels := labels .Set (o .GetLabels ())
377+ if labelSelector .Matches (labels ) {
378+ var err error
379+ if retryDisabled {
380+ err = p .GetClient ().Create (ctx , & o , config .createOpts ... )
381+ } else {
382+ err = wait .PollUntilContextTimeout (ctx , config .pollInterval , config .pollTimeout , true /*immediate*/ , func (ctx context.Context ) (bool , error ) {
383+ if err := p .GetClient ().Create (ctx , & o , config .createOpts ... ); err != nil {
384+ if apierrors .IsAlreadyExists (err ) {
385+ // Retrying won't help. Abort early.
386+ return false , fmt .Errorf ("create %s %s %s: %v" , o .GetAPIVersion (), o .GetKind (), klog .KObj (& o ), err )
387+ }
388+ log .Logf ("error creating %s %s %s, will retry: %v" , o .GetAPIVersion (), o .GetKind (), klog .KObj (& o ), err )
389+ return false , nil
390+ }
391+ return true , nil
392+ })
393+ }
394+ if err != nil {
395+ retErrs = append (retErrs , err )
396+ }
397+ }
398+ }
399+ return kerrors .NewAggregate (retErrs )
400+ }
401+
309402// CreateOrUpdate creates or updates objects using the clusterProxy client.
310403// Defaults to use FieldValidation: strict, which can be overwritten with CreateOrUpdateOptions.
311404func (p * clusterProxy ) CreateOrUpdate (ctx context.Context , resources []byte , opts ... CreateOrUpdateOption ) error {
@@ -327,6 +420,7 @@ func (p *clusterProxy) CreateOrUpdate(ctx context.Context, resources []byte, opt
327420 return err
328421 }
329422
423+ retryDisabled := config .pollTimeout == 0 && config .pollInterval == 0
330424 existingObject := & unstructured.Unstructured {}
331425 var retErrs []error
332426 for _ , o := range objs {
@@ -341,15 +435,43 @@ func (p *clusterProxy) CreateOrUpdate(ctx context.Context, resources []byte, opt
341435 if err := p .GetClient ().Get (ctx , objectKey , existingObject ); err != nil {
342436 // Expected error -- if the object does not exist, create it
343437 if apierrors .IsNotFound (err ) {
344- if err := p .GetClient ().Create (ctx , & o , config .createOpts ... ); err != nil {
438+ var err error
439+ if retryDisabled {
440+ err = p .GetClient ().Create (ctx , & o , config .createOpts ... )
441+ } else {
442+ err = wait .PollUntilContextTimeout (ctx , config .pollInterval , config .pollTimeout , true /*immediate*/ , func (ctx context.Context ) (bool , error ) {
443+ if err := p .GetClient ().Create (ctx , & o , config .createOpts ... ); err != nil {
444+ if apierrors .IsAlreadyExists (err ) {
445+ // Retrying won't help. Abort early.
446+ return false , fmt .Errorf ("create %s %s %s: %v" , o .GetAPIVersion (), o .GetKind (), klog .KObj (& o ), err )
447+ }
448+ log .Logf ("error creating %s %s %s, will retry: %v" , o .GetAPIVersion (), o .GetKind (), klog .KObj (& o ), err )
449+ return false , nil
450+ }
451+ return true , nil
452+ })
453+ }
454+ if err != nil {
345455 retErrs = append (retErrs , err )
346456 }
347457 } else {
348458 retErrs = append (retErrs , err )
349459 }
350460 } else {
351461 o .SetResourceVersion (existingObject .GetResourceVersion ())
352- if err := p .GetClient ().Update (ctx , & o , config .updateOpts ... ); err != nil {
462+ var err error
463+ if retryDisabled {
464+ err = p .GetClient ().Update (ctx , & o , config .updateOpts ... )
465+ } else {
466+ err = wait .PollUntilContextTimeout (ctx , config .pollInterval , config .pollTimeout , true /*immediate*/ , func (ctx context.Context ) (bool , error ) {
467+ if err := p .GetClient ().Update (ctx , & o , config .updateOpts ... ); err != nil {
468+ log .Logf ("error creating %s %s %s, will retry: %v" , o .GetAPIVersion (), o .GetKind (), klog .KObj (& o ), err )
469+ return false , nil
470+ }
471+ return true , nil
472+ })
473+ }
474+ if err != nil {
353475 retErrs = append (retErrs , err )
354476 }
355477 }
0 commit comments