Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions flytepropeller/pkg/controller/nodes/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -933,6 +933,14 @@ func (c *nodeExecutor) Abort(ctx context.Context, h interfaces.NodeHandler, nCtx
}

targetEntity := common.GetTargetEntity(ctx, nCtx)

// This event replaces the running one, so re-attach the deck URI or it is lost.
deckFile := ioutils.NewRemoteFileOutputPaths(ctx, nCtx.DataStore(), nCtx.NodeStatus().GetOutputDir(), nil).GetDeckPath()
var deckURI string
if md, err := nCtx.DataStore().Head(ctx, deckFile); err == nil && md.Exists() {
deckURI = deckFile.String()
}

err := nCtx.EventsRecorder().RecordNodeEvent(ctx, &event.NodeExecutionEvent{
Id: nodeExecutionID,
Phase: core.NodeExecution_ABORTED,
Expand All @@ -943,6 +951,7 @@ func (c *nodeExecutor) Abort(ctx context.Context, h interfaces.NodeHandler, nCtx
Message: reason,
},
},
DeckUri: deckURI,
ProducerId: c.clusterID,
ReportedAt: ptypes.TimestampNow(),
IsInDynamicChain: dynamic,
Expand Down
61 changes: 61 additions & 0 deletions flytepropeller/pkg/controller/nodes/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1657,6 +1657,7 @@ func TestNodeExecutor_AbortHandler(t *testing.T) {
ns := &mocks.ExecutableNodeStatus{}
ns.EXPECT().GetPhase().Return(v1alpha1.NodePhaseRunning)
ns.EXPECT().GetDataDir().Return(storage.DataReference("s3:/foo"))
ns.EXPECT().GetOutputDir().Return(storage.DataReference("out"))
nl.EXPECT().GetNodeExecutionStatus(mock.Anything, id).Return(ns)
nl.EXPECT().GetNode(id).Return(n, true)
incompatibleClusterErr := fakeEventRecorder{nodeErr: &eventsErr.EventError{Code: eventsErr.AlreadyExists, Cause: fmt.Errorf("err")}}
Expand All @@ -1667,8 +1668,15 @@ func TestNodeExecutor_AbortHandler(t *testing.T) {
h.EXPECT().Finalize(mock.Anything, mock.Anything).Return(nil)
hf.EXPECT().GetHandler(v1alpha1.NodeKindStart).Return(h, nil)

pbStore := &storageMocks.ComposedProtobufStore{}
pbStore.EXPECT().Head(mock.Anything, storage.DataReference(deckPath)).Return(nil, fmt.Errorf("no deck"))
refConstructor := &storageMocks.ReferenceConstructor{}
refConstructor.On("ConstructReference", mock.Anything, storage.DataReference("out"), "deck.html").
Return(storage.DataReference(deckPath), nil)

nodeExecutor := &nodeExecutor{
nodeRecorder: incompatibleClusterErr,
store: &storage.DataStore{ComposedProtobufStore: pbStore, ReferenceConstructor: refConstructor},
}
nExec := recursiveNodeExecutor{
nodeExecutor: nodeExecutor,
Expand Down Expand Up @@ -1708,6 +1716,59 @@ func TestNodeExecutor_AbortHandler(t *testing.T) {
})
}

// deckMetadata reports a deck file that exists; existsMetadata reports the opposite.
type deckMetadata struct{ existsMetadata }

func (deckMetadata) Exists() bool { return true }

func TestNodeExecutor_Abort_ReattachesDeck(t *testing.T) {
ctx := context.Background()

h := &nodemocks.NodeHandler{}
h.EXPECT().Abort(mock.Anything, mock.Anything, "aborting").Return(nil)
h.EXPECT().Finalize(mock.Anything, mock.Anything).Return(nil)

pbStore := &storageMocks.ComposedProtobufStore{}
pbStore.EXPECT().Head(mock.Anything, storage.DataReference(deckPath)).Return(deckMetadata{}, nil)
refConstructor := &storageMocks.ReferenceConstructor{}
refConstructor.On("ConstructReference", mock.Anything, storage.DataReference("out"), "deck.html").
Return(storage.DataReference(deckPath), nil)
store := &storage.DataStore{ComposedProtobufStore: pbStore, ReferenceConstructor: refConstructor}

ns := &mocks.ExecutableNodeStatus{}
ns.EXPECT().GetOutputDir().Return(storage.DataReference("out"))
execContext := &mocks4.ExecutionContext{}
execContext.EXPECT().GetEventVersion().Return(v1alpha1.EventVersion0)
execContext.EXPECT().GetParentInfo().Return(nil)
n := &mocks.ExecutableNode{}
n.EXPECT().GetWorkflowNode().Return(nil)
noTask := ""
n.EXPECT().GetTaskID().Return(&noTask)
nm := &nodemocks.NodeExecutionMetadata{}
nm.EXPECT().GetNodeExecutionID().Return(&core.NodeExecutionIdentifier{
ExecutionId: &core.WorkflowExecutionIdentifier{Project: "p", Domain: "d", Name: "w"},
NodeId: "n0",
})

var recorded *event.NodeExecutionEvent
rec := &nodemocks.EventRecorder{}
rec.EXPECT().RecordNodeEvent(mock.Anything, mock.Anything, mock.Anything).
Run(func(_ context.Context, ev *event.NodeExecutionEvent, _ *config.EventConfig) { recorded = ev }).
Return(nil)

nCtx := &nodemocks.NodeExecutionContext{}
nCtx.EXPECT().ExecutionContext().Return(execContext)
nCtx.EXPECT().NodeExecutionMetadata().Return(nm)
nCtx.EXPECT().Node().Return(n)
nCtx.EXPECT().NodeStatus().Return(ns)
nCtx.EXPECT().DataStore().Return(store)
nCtx.EXPECT().EventsRecorder().Return(rec)

executor := &nodeExecutor{store: store, eventConfig: &config.EventConfig{}}
assert.NoError(t, executor.Abort(ctx, h, nCtx, "aborting", true))
assert.Equal(t, deckPath, recorded.GetDeckUri())
}

func TestNodeExecutor_FinalizeHandler(t *testing.T) {
ctx := context.Background()
exec := recursiveNodeExecutor{}
Expand Down
Loading