Skip to content

Commit 19f64ec

Browse files
feat(launcher): Add credential refresh to launcher (#12708)
Signed-off-by: Caroline DeVoto <[email protected]> Co-authored-by: Caroline DeVoto <[email protected]> Co-authored-by: droctothorpe <[email protected]>
1 parent 22731c6 commit 19f64ec

File tree

2 files changed

+57
-4
lines changed

2 files changed

+57
-4
lines changed

backend/src/v2/component/launcher_v2.go

Lines changed: 55 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,14 @@ func stopWaitingArtifacts(artifacts map[string]*pipelinespec.ArtifactList) {
162162
}
163163
}
164164

165+
// OpenBucketConfig stores the parameters that are passed into the objectstore.OpenBucket function.
166+
type OpenBucketConfig struct {
167+
ctx context.Context
168+
k8sClient kubernetes.Interface
169+
namespace string
170+
config *objectstore.Config
171+
}
172+
165173
// Execute calls executeV2, updates the cache, and publishes the results to MLMD.
166174
func (l *LauncherV2) Execute(ctx context.Context) (err error) {
167175
defer func() {
@@ -218,7 +226,20 @@ func (l *LauncherV2) Execute(ctx context.Context) (err error) {
218226
if err != nil {
219227
return err
220228
}
221-
bucket, err := objectstore.OpenBucket(ctx, l.clientManager.K8sClient(), l.options.Namespace, bucketConfig)
229+
230+
openBucketConfig := &OpenBucketConfig{
231+
ctx: ctx,
232+
k8sClient: l.clientManager.K8sClient(),
233+
namespace: l.options.Namespace,
234+
config: bucketConfig,
235+
}
236+
237+
bucket, err := objectstore.OpenBucket(
238+
openBucketConfig.ctx,
239+
openBucketConfig.k8sClient,
240+
openBucketConfig.namespace,
241+
openBucketConfig.config,
242+
)
222243
if err != nil {
223244
return err
224245
}
@@ -238,6 +259,7 @@ func (l *LauncherV2) Execute(ctx context.Context) (err error) {
238259
l.clientManager.K8sClient(),
239260
l.options.PublishLogs,
240261
l.options.CaCertPath,
262+
openBucketConfig,
241263
)
242264
if err != nil {
243265
return err
@@ -361,6 +383,7 @@ func executeV2(
361383
k8sClient kubernetes.Interface,
362384
publishLogs string,
363385
customCAPath string,
386+
openBucketConfig *OpenBucketConfig,
364387
) (*pipelinespec.ExecutorOutput, []*metadata.OutputArtifact, error) {
365388

366389
// Add parameter default values to executorInput, if there is not already a user input.
@@ -399,15 +422,38 @@ func executeV2(
399422
if err != nil {
400423
return nil, nil, err
401424
}
402-
// TODO(Bobgy): should we log metadata per each artifact, or batched after uploading all artifacts.
425+
403426
outputArtifacts, err := uploadOutputArtifacts(ctx, executorInput, executorOutput, uploadOutputArtifactsOptions{
404427
bucketConfig: bucketConfig,
405428
bucket: bucket,
406429
metadataClient: metadataClient,
407430
})
431+
408432
if err != nil {
409-
return nil, nil, err
433+
glog.Errorf("Failed to upload output artifacts: %v", err)
434+
435+
glog.Info("Refreshing credentials before retrying artifacts upload.")
436+
bucket, err = objectstore.OpenBucket(
437+
openBucketConfig.ctx,
438+
openBucketConfig.k8sClient,
439+
openBucketConfig.namespace,
440+
openBucketConfig.config,
441+
)
442+
if err != nil {
443+
return nil, nil, err
444+
}
445+
446+
glog.Info("Executing second uploadOutputArtifacts attempt.")
447+
outputArtifacts, err = uploadOutputArtifacts(ctx, executorInput, executorOutput, uploadOutputArtifactsOptions{
448+
bucketConfig: bucketConfig,
449+
bucket: bucket,
450+
metadataClient: metadataClient,
451+
})
452+
if err != nil {
453+
return nil, nil, err
454+
}
410455
}
456+
411457
// TODO(Bobgy): only return executor output. Merge info in output artifacts
412458
// to executor output.
413459
return executorOutput, outputArtifacts, nil
@@ -612,7 +658,12 @@ type uploadOutputArtifactsOptions struct {
612658
metadataClient metadata.ClientInterface
613659
}
614660

615-
func uploadOutputArtifacts(ctx context.Context, executorInput *pipelinespec.ExecutorInput, executorOutput *pipelinespec.ExecutorOutput, opts uploadOutputArtifactsOptions) ([]*metadata.OutputArtifact, error) {
661+
func uploadOutputArtifacts(
662+
ctx context.Context,
663+
executorInput *pipelinespec.ExecutorInput,
664+
executorOutput *pipelinespec.ExecutorOutput,
665+
opts uploadOutputArtifactsOptions,
666+
) ([]*metadata.OutputArtifact, error) {
616667
// Register artifacts with MLMD.
617668
outputArtifacts := make([]*metadata.OutputArtifact, 0, len(executorInput.GetOutputs().GetArtifacts()))
618669
for name, artifactList := range executorInput.GetOutputs().GetArtifacts() {

backend/src/v2/component/launcher_v2_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ func Test_executeV2_Parameters(t *testing.T) {
103103
fakeKubernetesClientset,
104104
"false",
105105
"",
106+
&OpenBucketConfig{context.Background(), fakeKubernetesClientset, "namespace", bucketConfig},
106107
)
107108

108109
if test.wantErr {
@@ -165,6 +166,7 @@ func Test_executeV2_publishLogs(t *testing.T) {
165166
fakeKubernetesClientset,
166167
"false",
167168
"",
169+
&OpenBucketConfig{context.Background(), fakeKubernetesClientset, "namespace", bucketConfig},
168170
)
169171

170172
if test.wantErr {

0 commit comments

Comments
 (0)