Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 55 additions & 4 deletions backend/src/v2/component/launcher_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,14 @@ func stopWaitingArtifacts(artifacts map[string]*pipelinespec.ArtifactList) {
}
}

// OpenBucketConfig stores the parameters that are passed into the objectstore.OpenBucket function.
type OpenBucketConfig struct {
ctx context.Context
k8sClient kubernetes.Interface
namespace string
config *objectstore.Config
}

// Execute calls executeV2, updates the cache, and publishes the results to MLMD.
func (l *LauncherV2) Execute(ctx context.Context) (err error) {
defer func() {
Expand Down Expand Up @@ -218,7 +226,20 @@ func (l *LauncherV2) Execute(ctx context.Context) (err error) {
if err != nil {
return err
}
bucket, err := objectstore.OpenBucket(ctx, l.clientManager.K8sClient(), l.options.Namespace, bucketConfig)

openBucketConfig := &OpenBucketConfig{
ctx: ctx,
k8sClient: l.clientManager.K8sClient(),
namespace: l.options.Namespace,
config: bucketConfig,
}

bucket, err := objectstore.OpenBucket(
openBucketConfig.ctx,
openBucketConfig.k8sClient,
openBucketConfig.namespace,
openBucketConfig.config,
)
if err != nil {
return err
}
Expand All @@ -238,6 +259,7 @@ func (l *LauncherV2) Execute(ctx context.Context) (err error) {
l.clientManager.K8sClient(),
l.options.PublishLogs,
l.options.CaCertPath,
openBucketConfig,
)
if err != nil {
return err
Expand Down Expand Up @@ -361,6 +383,7 @@ func executeV2(
k8sClient kubernetes.Interface,
publishLogs string,
customCAPath string,
openBucketConfig *OpenBucketConfig,
) (*pipelinespec.ExecutorOutput, []*metadata.OutputArtifact, error) {

// Add parameter default values to executorInput, if there is not already a user input.
Expand Down Expand Up @@ -399,15 +422,38 @@ func executeV2(
if err != nil {
return nil, nil, err
}
// TODO(Bobgy): should we log metadata per each artifact, or batched after uploading all artifacts.

outputArtifacts, err := uploadOutputArtifacts(ctx, executorInput, executorOutput, uploadOutputArtifactsOptions{
bucketConfig: bucketConfig,
bucket: bucket,
metadataClient: metadataClient,
})

if err != nil {
return nil, nil, err
glog.Errorf("Failed to upload output artifacts: %v", err)

glog.Info("Refreshing credentials before retrying artifacts upload.")
bucket, err = objectstore.OpenBucket(
openBucketConfig.ctx,
openBucketConfig.k8sClient,
openBucketConfig.namespace,
openBucketConfig.config,
)
if err != nil {
return nil, nil, err
}

glog.Info("Executing second uploadOutputArtifacts attempt.")
outputArtifacts, err = uploadOutputArtifacts(ctx, executorInput, executorOutput, uploadOutputArtifactsOptions{
bucketConfig: bucketConfig,
bucket: bucket,
metadataClient: metadataClient,
})
if err != nil {
return nil, nil, err
}
}

// TODO(Bobgy): only return executor output. Merge info in output artifacts
// to executor output.
return executorOutput, outputArtifacts, nil
Expand Down Expand Up @@ -612,7 +658,12 @@ type uploadOutputArtifactsOptions struct {
metadataClient metadata.ClientInterface
}

func uploadOutputArtifacts(ctx context.Context, executorInput *pipelinespec.ExecutorInput, executorOutput *pipelinespec.ExecutorOutput, opts uploadOutputArtifactsOptions) ([]*metadata.OutputArtifact, error) {
func uploadOutputArtifacts(
ctx context.Context,
executorInput *pipelinespec.ExecutorInput,
executorOutput *pipelinespec.ExecutorOutput,
opts uploadOutputArtifactsOptions,
) ([]*metadata.OutputArtifact, error) {
// Register artifacts with MLMD.
outputArtifacts := make([]*metadata.OutputArtifact, 0, len(executorInput.GetOutputs().GetArtifacts()))
for name, artifactList := range executorInput.GetOutputs().GetArtifacts() {
Expand Down
2 changes: 2 additions & 0 deletions backend/src/v2/component/launcher_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ func Test_executeV2_Parameters(t *testing.T) {
fakeKubernetesClientset,
"false",
"",
&OpenBucketConfig{context.Background(), fakeKubernetesClientset, "namespace", bucketConfig},
)

if test.wantErr {
Expand Down Expand Up @@ -165,6 +166,7 @@ func Test_executeV2_publishLogs(t *testing.T) {
fakeKubernetesClientset,
"false",
"",
&OpenBucketConfig{context.Background(), fakeKubernetesClientset, "namespace", bucketConfig},
)

if test.wantErr {
Expand Down
Loading