
[Bug]: Prism failed on pipelines with reshuffle after windowing #34829

Closed
@shunping

Description


What happened?

The following pipeline fails to run on Prism after #34348:

```python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.periodicsequence import PeriodicImpulse
from apache_beam.transforms.window import FixedWindows
from apache_beam.utils.timestamp import Timestamp

# Requires a Prism job server that is already running on localhost:8073.
options = PipelineOptions([
    "--streaming",
    "--job_server_timeout=600",
    "--environment_type=LOOPBACK",
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8073",
])


class InitCount(beam.DoFn):
  """Maps every input element to the constant 1."""

  def process(self, element):
    return [1]


class PlusOne(beam.DoFn):
  """Prints each element and emits element + 1."""

  def process(self, element):
    print(element)
    return [element + 1]


duration = 5  # seconds: used as both the impulse interval and the window size
fixed_window = FixedWindows(size=duration)

with beam.Pipeline(options=options) as p:
  # Unbounded source that fires once every `duration` seconds.
  unbounded_source = p | "s2" >> PeriodicImpulse(
      start_timestamp=Timestamp.now(), fire_interval=duration)
  windowed = unbounded_source | "w1" >> beam.WindowInto(fixed_window)
  c = windowed | "i2" >> beam.ParDo(InitCount())
  # Reshuffle after windowing: the step this report is about.
  _ = (c | beam.Reshuffle() | beam.ParDo(PlusOne()))
```

Error:

```
ERROR:root:prism error building stage stage-002: unknown coder urn key: beam:coder:windowed_value:v1
```


The URN beam:coder:windowed_value:v1 is introduced here as a component coder nested inside a TupleCoder.

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Infrastructure
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
