
[Bug]: Prism crashed if Flatten and GroupByKey share the same input #34643

Closed
@shunping

Description


What happened?

The simple pipeline below crashes the Prism runner. It has the shape C = [A, B] | Flatten() and D = A | GroupByKey(), so A feeds both the Flatten and the GroupByKey.

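import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Assumes a Beam release whose Python SDK ships the PrismRunner.
options = PipelineOptions(["--runner=PrismRunner"])

with beam.Pipeline(options=options) as p:
    side1 = p | 'side1' >> beam.Create([('a', 1)])

    # ('another_type') is a plain str, not a tuple, so side2 has a
    # different element type (and coder) than the KV-typed side1.
    second_element = [('another_type')]
    side2 = p | 'side2' >> beam.Create(second_element)

    _ = (side1, side2) | beam.Flatten() | "print1" >> beam.Map(print)

    _ = side1 | beam.GroupByKey() | "print2" >> beam.Map(print)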

The error log:

   0: panic in stage.Execute bundle processing goroutine: zero length key: stage-004 ref_PCollection_PCollection_3, stage: &{ID:stage-003 transforms:[ref_AppliedPTransform_side1-FlatMap-lambda-at-core-py-3970-_4 ref_AppliedPTransform_side1-Map-decode-_6] primaryInput:ref_PCollection_PCollection_1 outputs:[{Transform:ref_AppliedPTransform_side1-Map-decode-_6 Local:None Global:ref_PCollection_PCollection_3}] sideInputs:[] internalCols:[ref_PCollection_PCollection_2] envID:ref_Environment_default_environment_1 finalize:false stateful:false onWindowExpiration:{TransformID: TimerFamily:} hasTimers:[] processingTimeTimers:map[] stateTypeLen:map[] exe:<nil> inputTransformID:stage-003_source inputInfo:{GlobalID:ref_PCollection_PCollection_1 WindowCoder:0 WDec:0x10314e680 WEnc:0x10314e680 EDec:0x100f51320 KeyDec:<nil>} desc:0x140002dafc0 prepareSides:0x100f70bd0 SinkToPCollection:map[ref_AppliedPTransform_side1-Map-decode-_6_None:ref_PCollection_PCollection_3] OutputsToCoders:map[ref_PCollection_PCollection_3:{GlobalID:ref_PCollection_PCollection_3 WindowCoder:0 WDec:0x10314e680 WEnc:0x10314e680 EDec:0x100f51320 KeyDec:<nil>}] baseProgTick:{v:100000000}},stackTrace:
goroutine 47 [running]:
runtime/debug.Stack()
	.../go/pkg/mod/golang.org/.../src/runtime/debug/stack.go:26 +0x64
github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal.(*stage).Execute.func1()
	.../Projects/beam-dev-go-prism/sdks/go/pkg/beam/runners/prism/internal/stage.go:118 +0x4c
panic({0x101c795a0?, 0x140009001a0?})
	../go/pkg/mod/golang.org/.../src/runtime/panic.go:792 +0xf0
github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/engine.(*aggregateStageKind).addPending(0x10314e680, 0x1400062d520, 0x140001decc0, {0x14000533c20, 0x1, 0x1})
	.../Projects/beam-dev-go-prism/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:1276 +0xac8
github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/engine.(*stageState).AddPending(0x1400062d520, 0x140001decc0, {0x14000533b80, 0x1, 0x1})
	.../Projects/beam-dev-go-prism/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:1250 +0xec
github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/engine.(*ElementManager).PersistBundle(0x140001decc0, {{0x140008082d0, 0x9}, {0x14000808397, 0x7}, 0x20c49ba5e353f7}, 0x140006f9590, {0x14000711830, 0x0, 0x0, ...}, ...)
	.../Projects/beam-dev-go-prism/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:841 +0x11c4
github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal.(*stage).Execute(0x14000642480, {0x102169cb0, 0x140001f1c20}, 0x140003006c0, 0x14000178000, 0x14000176280, 0x140001decc0, {{0x140008082d0, 0x9}, {0x14000808397, ...}, ...})
	.../Projects/beam-dev-go-prism/sdks/go/pkg/beam/runners/prism/internal/stage.go:333 +0x2324
github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal.executePipeline.func4()
	.../Projects/beam-dev-go-prism/sdks/go/pkg/beam/runners/prism/internal/execute.go:355 +0x104
golang.org/x/sync/errgroup.(*Group).Go.func1()
	...g/go/pkg/mod/golang.org/x/sync@.../errgroup/errgroup.go:79 +0xa8
created by golang.org/x/sync/errgroup.(*Group).Go in goroutine 84
	.../go/pkg/mod/golang.org/x/sync@.../errgroup/errgroup.go:76 +0xfc

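This is a failing case I discovered while working on #34641. It also shows that the previous approach of overwriting the coders of Flatten's input PCollections can be problematic: side1 feeds both the Flatten and the GroupByKey, so changing its coder to suit the Flatten appears to leave the GroupByKey stage unable to decode keys, which is consistent with the "zero length key" panic above.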
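To make that failure mode concrete, here is a toy Python model. It is not Prism's Go code, and every name in it (`Graph`, `overwrite_flatten_inputs`, the coder strings) is hypothetical. The point it illustrates: a PCollection has a single coder shared by all of its consumers, so forcing a Flatten input onto the Flatten's output coder also changes what any other consumer of that PCollection (here, the GroupByKey) reads.

```python
# Toy model of a pipeline graph; NOT Prism's real data structures.
# Each PCollection has exactly one coder, shared by every transform that reads it.
from dataclasses import dataclass, field


@dataclass
class Graph:
    coders: dict[str, str]  # PCollection id -> coder name (hypothetical labels)
    consumers: dict[str, list[str]] = field(default_factory=dict)  # PCollection id -> reader transform ids

    def add(self, transform: str, inputs: list[str]) -> None:
        """Register `transform` as a consumer of each PCollection in `inputs`."""
        for pc in inputs:
            self.consumers.setdefault(pc, []).append(transform)


def overwrite_flatten_inputs(
    g: Graph, flatten: str, inputs: list[str], out_coder: str
) -> list[tuple[str, str]]:
    """Naive rewrite: force every Flatten input onto the Flatten's output coder.

    Returns (PCollection, other consumer) pairs whose coder was silently
    changed underneath a transform other than the Flatten.
    """
    affected = []
    for pc in inputs:
        if g.coders[pc] != out_coder:
            for t in g.consumers.get(pc, []):
                if t != flatten:
                    affected.append((pc, t))
        g.coders[pc] = out_coder  # mutates the shared coder for ALL consumers
    return affected


# The issue's topology: side1 -> {Flatten, GroupByKey}, side2 -> {Flatten}.
g = Graph(coders={"side1": "kv_coder", "side2": "str_coder"})
g.add("Flatten", ["side1", "side2"])
g.add("GroupByKey", ["side1"])
print(overwrite_flatten_inputs(g, "Flatten", ["side1", "side2"], "union_coder"))
# → [('side1', 'GroupByKey')]
```

In this model the GroupByKey still expects a KV coder, but after the rewrite `side1` carries `union_coder`, mirroring how a shared input makes the coder overwrite unsafe.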

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Infrastructure
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
