Skip to content

Commit 1775851

Browse files
committed
Preserve deck URI when a node is aborted
Abort builds its own node event and never set DeckUri, so aborting a run wiped the deck from the UI even though deck.html still existed. Re-attach it when the file is present, matching the recovery path. Fixes #7610 Signed-off-by: Moritz Althaus <moritzalthaus@gmx.de>
1 parent de65211 commit 1775851

2 files changed

Lines changed: 70 additions & 0 deletions

File tree

flytepropeller/pkg/controller/nodes/executor.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -933,6 +933,14 @@ func (c *nodeExecutor) Abort(ctx context.Context, h interfaces.NodeHandler, nCtx
933933
}
934934

935935
targetEntity := common.GetTargetEntity(ctx, nCtx)
936+
937+
// This event replaces the running one, so re-attach the deck URI or it is lost.
938+
deckFile := ioutils.NewRemoteFileOutputPaths(ctx, nCtx.DataStore(), nCtx.NodeStatus().GetOutputDir(), nil).GetDeckPath()
939+
var deckURI string
940+
if md, err := nCtx.DataStore().Head(ctx, deckFile); err == nil && md.Exists() {
941+
deckURI = deckFile.String()
942+
}
943+
936944
err := nCtx.EventsRecorder().RecordNodeEvent(ctx, &event.NodeExecutionEvent{
937945
Id: nodeExecutionID,
938946
Phase: core.NodeExecution_ABORTED,
@@ -943,6 +951,7 @@ func (c *nodeExecutor) Abort(ctx context.Context, h interfaces.NodeHandler, nCtx
943951
Message: reason,
944952
},
945953
},
954+
DeckUri: deckURI,
946955
ProducerId: c.clusterID,
947956
ReportedAt: ptypes.TimestampNow(),
948957
IsInDynamicChain: dynamic,

flytepropeller/pkg/controller/nodes/executor_test.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1657,6 +1657,7 @@ func TestNodeExecutor_AbortHandler(t *testing.T) {
16571657
ns := &mocks.ExecutableNodeStatus{}
16581658
ns.EXPECT().GetPhase().Return(v1alpha1.NodePhaseRunning)
16591659
ns.EXPECT().GetDataDir().Return(storage.DataReference("s3:/foo"))
1660+
ns.EXPECT().GetOutputDir().Return(storage.DataReference("out"))
16601661
nl.EXPECT().GetNodeExecutionStatus(mock.Anything, id).Return(ns)
16611662
nl.EXPECT().GetNode(id).Return(n, true)
16621663
incompatibleClusterErr := fakeEventRecorder{nodeErr: &eventsErr.EventError{Code: eventsErr.AlreadyExists, Cause: fmt.Errorf("err")}}
@@ -1667,8 +1668,15 @@ func TestNodeExecutor_AbortHandler(t *testing.T) {
16671668
h.EXPECT().Finalize(mock.Anything, mock.Anything).Return(nil)
16681669
hf.EXPECT().GetHandler(v1alpha1.NodeKindStart).Return(h, nil)
16691670

1671+
pbStore := &storageMocks.ComposedProtobufStore{}
1672+
pbStore.EXPECT().Head(mock.Anything, storage.DataReference(deckPath)).Return(nil, fmt.Errorf("no deck"))
1673+
refConstructor := &storageMocks.ReferenceConstructor{}
1674+
refConstructor.On("ConstructReference", mock.Anything, storage.DataReference("out"), "deck.html").
1675+
Return(storage.DataReference(deckPath), nil)
1676+
16701677
nodeExecutor := &nodeExecutor{
16711678
nodeRecorder: incompatibleClusterErr,
1679+
store: &storage.DataStore{ComposedProtobufStore: pbStore, ReferenceConstructor: refConstructor},
16721680
}
16731681
nExec := recursiveNodeExecutor{
16741682
nodeExecutor: nodeExecutor,
@@ -1708,6 +1716,59 @@ func TestNodeExecutor_AbortHandler(t *testing.T) {
17081716
})
17091717
}
17101718

1719+
// deckMetadata reports a deck file that exists; existsMetadata reports the opposite.
1720+
type deckMetadata struct{ existsMetadata }
1721+
1722+
func (deckMetadata) Exists() bool { return true }
1723+
1724+
func TestNodeExecutor_Abort_ReattachesDeck(t *testing.T) {
1725+
ctx := context.Background()
1726+
1727+
h := &nodemocks.NodeHandler{}
1728+
h.EXPECT().Abort(mock.Anything, mock.Anything, "aborting").Return(nil)
1729+
h.EXPECT().Finalize(mock.Anything, mock.Anything).Return(nil)
1730+
1731+
pbStore := &storageMocks.ComposedProtobufStore{}
1732+
pbStore.EXPECT().Head(mock.Anything, storage.DataReference(deckPath)).Return(deckMetadata{}, nil)
1733+
refConstructor := &storageMocks.ReferenceConstructor{}
1734+
refConstructor.On("ConstructReference", mock.Anything, storage.DataReference("out"), "deck.html").
1735+
Return(storage.DataReference(deckPath), nil)
1736+
store := &storage.DataStore{ComposedProtobufStore: pbStore, ReferenceConstructor: refConstructor}
1737+
1738+
ns := &mocks.ExecutableNodeStatus{}
1739+
ns.EXPECT().GetOutputDir().Return(storage.DataReference("out"))
1740+
execContext := &mocks4.ExecutionContext{}
1741+
execContext.EXPECT().GetEventVersion().Return(v1alpha1.EventVersion0)
1742+
execContext.EXPECT().GetParentInfo().Return(nil)
1743+
n := &mocks.ExecutableNode{}
1744+
n.EXPECT().GetWorkflowNode().Return(nil)
1745+
noTask := ""
1746+
n.EXPECT().GetTaskID().Return(&noTask)
1747+
nm := &nodemocks.NodeExecutionMetadata{}
1748+
nm.EXPECT().GetNodeExecutionID().Return(&core.NodeExecutionIdentifier{
1749+
ExecutionId: &core.WorkflowExecutionIdentifier{Project: "p", Domain: "d", Name: "w"},
1750+
NodeId: "n0",
1751+
})
1752+
1753+
var recorded *event.NodeExecutionEvent
1754+
rec := &nodemocks.EventRecorder{}
1755+
rec.EXPECT().RecordNodeEvent(mock.Anything, mock.Anything, mock.Anything).
1756+
Run(func(_ context.Context, ev *event.NodeExecutionEvent, _ *config.EventConfig) { recorded = ev }).
1757+
Return(nil)
1758+
1759+
nCtx := &nodemocks.NodeExecutionContext{}
1760+
nCtx.EXPECT().ExecutionContext().Return(execContext)
1761+
nCtx.EXPECT().NodeExecutionMetadata().Return(nm)
1762+
nCtx.EXPECT().Node().Return(n)
1763+
nCtx.EXPECT().NodeStatus().Return(ns)
1764+
nCtx.EXPECT().DataStore().Return(store)
1765+
nCtx.EXPECT().EventsRecorder().Return(rec)
1766+
1767+
executor := &nodeExecutor{store: store, eventConfig: &config.EventConfig{}}
1768+
assert.NoError(t, executor.Abort(ctx, h, nCtx, "aborting", true))
1769+
assert.Equal(t, deckPath, recorded.GetDeckUri())
1770+
}
1771+
17111772
func TestNodeExecutor_FinalizeHandler(t *testing.T) {
17121773
ctx := context.Background()
17131774
exec := recursiveNodeExecutor{}

0 commit comments

Comments
 (0)