Skip to content

Commit c31a346

Browse files
committed
fix(backend): persistence agent - workflow not found error should be a permanent error (#4486)
* fix(backend): workflow not found error should be permanent
* failing test case
* Fix logic
* fix another case
* Switched to not found error
* not found error should be permanent
1 parent 668a3ec commit c31a346

File tree

5 files changed

+88
-9
lines changed

5 files changed

+88
-9
lines changed

backend/src/agent/persistence/client/pipeline_client.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -85,8 +85,10 @@ func (p *PipelineClient) ReportWorkflow(workflow *util.Workflow) error {
8585

8686
if err != nil {
8787
statusCode, _ := status.FromError(err)
88-
if statusCode.Code() == codes.InvalidArgument {
89-
// Do not retry if there is something wrong with the workflow
88+
if statusCode.Code() == codes.InvalidArgument || statusCode.Code() == codes.NotFound {
89+
// Do not retry if either:
90+
// * there is something wrong with the workflow
91+
// * the workflow has been deleted by someone else
9092
return util.NewCustomError(err, util.CUSTOM_CODE_PERMANENT,
9193
"Error while reporting workflow resource (code: %v, message: %v): %v, %+v",
9294
statusCode.Code(),

backend/src/apiserver/client/workflow_fake.go

+19-3
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@ import (
2323
"github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
2424
"github.com/golang/glog"
2525
"github.com/pkg/errors"
26+
k8errors "k8s.io/apimachinery/pkg/api/errors"
2627
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28+
k8schema "k8s.io/apimachinery/pkg/runtime/schema"
2729
"k8s.io/apimachinery/pkg/types"
2830
"k8s.io/apimachinery/pkg/watch"
2931
)
@@ -55,7 +57,7 @@ func (c *FakeWorkflowClient) Get(name string, options v1.GetOptions) (*v1alpha1.
5557
if ok {
5658
return workflow, nil
5759
}
58-
return nil, errors.New("not found")
60+
return nil, k8errors.NewNotFound(k8schema.ParseGroupResource("workflows.argoproj.io"), name)
5961
}
6062

6163
func (c *FakeWorkflowClient) List(opts v1.ListOptions) (*v1alpha1.WorkflowList, error) {
@@ -69,11 +71,20 @@ func (c *FakeWorkflowClient) Watch(opts v1.ListOptions) (watch.Interface, error)
6971
}
7072

7173
func (c *FakeWorkflowClient) Update(workflow *v1alpha1.Workflow) (*v1alpha1.Workflow, error) {
72-
return nil, nil
74+
name := workflow.GetObjectMeta().GetName()
75+
_, ok := c.workflows[name]
76+
if ok {
77+
return workflow, nil
78+
}
79+
return nil, k8errors.NewNotFound(k8schema.ParseGroupResource("workflows.argoproj.io"), name)
7380
}
7481

7582
func (c *FakeWorkflowClient) Delete(name string, options *v1.DeleteOptions) error {
76-
return nil
83+
_, ok := c.workflows[name]
84+
if ok {
85+
return nil
86+
}
87+
return k8errors.NewNotFound(k8schema.ParseGroupResource("workflows.argoproj.io"), name)
7788
}
7889

7990
func (c *FakeWorkflowClient) DeleteCollection(options *v1.DeleteOptions,
@@ -85,6 +96,11 @@ func (c *FakeWorkflowClient) DeleteCollection(options *v1.DeleteOptions,
8596
func (c *FakeWorkflowClient) Patch(name string, pt types.PatchType, data []byte,
8697
subresources ...string) (*v1alpha1.Workflow, error) {
8798

99+
_, ok := c.workflows[name]
100+
if !ok {
101+
return nil, k8errors.NewNotFound(k8schema.ParseGroupResource("workflows.argoproj.io"), name)
102+
}
103+
88104
var dat map[string]interface{}
89105
json.Unmarshal(data, &dat)
90106

backend/src/apiserver/resource/resource_manager.go

+17-2
Original file line numberDiff line numberDiff line change
@@ -708,7 +708,14 @@ func (r *ResourceManager) ReportWorkflowResource(workflow *util.Workflow) error
708708
// If workflow's final state has been persisted, the workflow should be garbage collected.
709709
err := r.getWorkflowClient(workflow.Namespace).Delete(workflow.Name, &v1.DeleteOptions{})
710710
if err != nil {
711-
return util.NewInternalServerError(err, "Failed to delete the completed workflow for run %s", runId)
711+
// A fix for kubeflow/pipelines#4484, persistence agent might have an outdated item in its workqueue, so it will
712+
// report workflows that no longer exist. It's important to return a not found error, so that persistence
713+
// agent won't retry again.
714+
if util.IsNotFound(err) {
715+
return util.NewNotFoundError(err, "Failed to delete the completed workflow for run %s", runId)
716+
} else {
717+
return util.NewInternalServerError(err, "Failed to delete the completed workflow for run %s", runId)
718+
}
712719
}
713720
}
714721

@@ -777,7 +784,15 @@ func (r *ResourceManager) ReportWorkflowResource(workflow *util.Workflow) error
777784
if workflow.IsInFinalState() {
778785
err := AddWorkflowLabel(r.getWorkflowClient(workflow.Namespace), workflow.Name, util.LabelKeyWorkflowPersistedFinalState, "true")
779786
if err != nil {
780-
return util.Wrap(err, "Failed to add PersistedFinalState label to workflow")
787+
message := fmt.Sprintf("Failed to add PersistedFinalState label to workflow %s", workflow.GetName())
788+
// A fix for kubeflow/pipelines#4484, persistence agent might have an outdated item in its workqueue, so it will
789+
// report workflows that no longer exist. It's important to return a not found error, so that persistence
790+
// agent won't retry again.
791+
if util.IsNotFound(err) {
792+
return util.NewNotFoundError(err, message)
793+
} else {
794+
return util.Wrapf(err, message)
795+
}
781796
}
782797
}
783798

backend/src/apiserver/resource/resource_manager_test.go

+39-2
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
"github.com/pkg/errors"
3232
"github.com/spf13/viper"
3333
"github.com/stretchr/testify/assert"
34+
"github.com/stretchr/testify/require"
3435
"google.golang.org/grpc/codes"
3536
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3637
"k8s.io/apimachinery/pkg/types"
@@ -670,7 +671,7 @@ func TestCreateRun_NullWorkflowSpec(t *testing.T) {
670671
apiRun := &api.Run{
671672
Name: "run1",
672673
PipelineSpec: &api.PipelineSpec{
673-
WorkflowManifest: "null", // this situation occurs for real when the manifest file disappears from object store in some way due to retention policy or manual deletion.
674+
WorkflowManifest: "null", // this situation occurs for real when the manifest file disappears from object store in some way due to retention policy or manual deletion.
674675
Parameters: []*api.Parameter{
675676
{Name: "param1", Value: "world"},
676677
},
@@ -1489,7 +1490,7 @@ func TestReportWorkflowResource_WorkflowMissingRunID(t *testing.T) {
14891490
defer store.Close()
14901491
workflow := util.NewWorkflow(&v1alpha1.Workflow{
14911492
ObjectMeta: v1.ObjectMeta{
1492-
Name: run.Name,
1493+
Name: run.Name,
14931494
},
14941495
})
14951496
err := manager.ReportWorkflowResource(workflow)
@@ -1519,6 +1520,24 @@ func TestReportWorkflowResource_WorkflowCompleted(t *testing.T) {
15191520
assert.Equal(t, wf.Labels[util.LabelKeyWorkflowPersistedFinalState], "true")
15201521
}
15211522

1523+
func TestReportWorkflowResource_WorkflowCompleted_WorkflowNotFound(t *testing.T) {
1524+
store, manager, run := initWithOneTimeRun(t)
1525+
defer store.Close()
1526+
workflow := util.NewWorkflow(&v1alpha1.Workflow{
1527+
ObjectMeta: v1.ObjectMeta{
1528+
Name: "non-existent-workflow",
1529+
Namespace: "kubeflow",
1530+
UID: types.UID(run.UUID),
1531+
Labels: map[string]string{util.LabelKeyWorkflowRunId: run.UUID},
1532+
},
1533+
Status: v1alpha1.WorkflowStatus{Phase: v1alpha1.NodeFailed},
1534+
})
1535+
err := manager.ReportWorkflowResource(workflow)
1536+
require.NotNil(t, err)
1537+
assert.Equalf(t, codes.NotFound, err.(*util.UserError).ExternalStatusCode(), "Expected not found error, but got %s", err.Error())
1538+
assert.Contains(t, err.Error(), "Failed to add PersistedFinalState label")
1539+
}
1540+
15221541
func TestReportWorkflowResource_WorkflowCompleted_FinalStatePersisted(t *testing.T) {
15231542
store, manager, run := initWithOneTimeRun(t)
15241543
defer store.Close()
@@ -1536,6 +1555,24 @@ func TestReportWorkflowResource_WorkflowCompleted_FinalStatePersisted(t *testing
15361555
assert.Nil(t, err)
15371556
}
15381557

1558+
func TestReportWorkflowResource_WorkflowCompleted_FinalStatePersisted_WorkflowNotFound(t *testing.T) {
1559+
store, manager, run := initWithOneTimeRun(t)
1560+
defer store.Close()
1561+
workflow := util.NewWorkflow(&v1alpha1.Workflow{
1562+
ObjectMeta: v1.ObjectMeta{
1563+
Name: "non-existent-workflow",
1564+
Namespace: "kubeflow",
1565+
UID: types.UID(run.UUID),
1566+
Labels: map[string]string{util.LabelKeyWorkflowRunId: run.UUID, util.LabelKeyWorkflowPersistedFinalState: "true"},
1567+
},
1568+
Status: v1alpha1.WorkflowStatus{Phase: v1alpha1.NodeFailed},
1569+
})
1570+
err := manager.ReportWorkflowResource(workflow)
1571+
require.NotNil(t, err)
1572+
assert.Equalf(t, codes.NotFound, err.(*util.UserError).ExternalStatusCode(), "Expected not found error, but got %s", err.Error())
1573+
assert.Contains(t, err.Error(), "Failed to delete the completed workflow")
1574+
}
1575+
15391576
func TestReportWorkflowResource_WorkflowCompleted_FinalStatePersisted_DeleteFailed(t *testing.T) {
15401577
store, manager, run := initWithOneTimeRun(t)
15411578
manager.argoClient = client.NewFakeArgoClientWithBadWorkflow()

backend/src/common/util/error.go

+9
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,15 @@ func NewInternalServerError(err error, internalMessageFormat string,
144144
codes.Internal)
145145
}
146146

147+
func NewNotFoundError(err error, externalMessageFormat string,
148+
a ...interface{}) *UserError {
149+
externalMessage := fmt.Sprintf(externalMessageFormat, a...)
150+
return newUserError(
151+
errors.Wrapf(err, fmt.Sprintf("NotFoundError: %v", externalMessage)),
152+
externalMessage,
153+
codes.NotFound)
154+
}
155+
147156
func NewResourceNotFoundError(resourceType string, resourceName string) *UserError {
148157
externalMessage := fmt.Sprintf("%s %s not found.", resourceType, resourceName)
149158
return newUserError(

0 commit comments

Comments
 (0)