
[Feature Request]: Dynamic partition key in WriteToKinesis #33632

Open
@Nakachi-S

Description


What would you like to happen?

I am currently using WriteToKinesis to transfer data from Pub/Sub to Kinesis.
https://beam.apache.org/releases/pydoc/2.60.0/apache_beam.io.kinesis.html#apache_beam.io.kinesis.WriteToKinesis

Here is the sample code:

import logging

import apache_beam as beam
from apache_beam.io.kinesis import WriteToKinesis
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.runners import PipelineState
from apache_beam.runners.dataflow.dataflow_runner import DataflowPipelineResult


class CustomOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument('--kinesis_stream_name', default=None, type=str, required=True)
        parser.add_argument('--aws_access_key', default=None, type=str, required=True)
        parser.add_argument('--aws_secret_key', default=None, type=str, required=True)
        parser.add_argument('--input_subscription', default=None, type=str, required=True)


def run(argv=None):
    custom_options = CustomOptions()
    custom_options_dic = custom_options.get_all_options()

    input_subscription = custom_options_dic['input_subscription']

    with beam.Pipeline(options=custom_options) as p:
        (
            p
            | f'Read from Pubsub {input_subscription.replace("/", "_")}' >> beam.io.gcp.pubsub.ReadFromPubSub(subscription=input_subscription).with_output_types(bytes)
            | f'Write to Kinesis {custom_options.kinesis_stream_name}'
            >> WriteToKinesis(
                stream_name=custom_options.kinesis_stream_name,
                aws_access_key=custom_options.aws_access_key,
                aws_secret_key=custom_options.aws_secret_key,
                region="ap-northeast-1",
                producer_properties={"RecordTtl": "3000000"},
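                # Static partition key: this is the value I would like to derive from each record instead.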
                partition_key="1",
            )
        )


if __name__ == '__main__':
    DataflowPipelineResult.wait_until_finish = lambda duration=None: PipelineState.DONE
    logging.getLogger().setLevel(logging.INFO)
    run()

At this time, partition_key can only be specified as a static string.
I would like to refer to the actual contents of each record coming from Pub/Sub and use that as the partition key.
For example, I would like to use the value of a key named key1 in each record as its partition key.
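
For illustration only, here is a rough sketch of the kind of API I have in mind. The partition_key_fn parameter and the key_from_record helper do not exist in the current WriteToKinesis API; they are hypothetical, and they assume each Pub/Sub payload is JSON-encoded bytes containing a key1 field.

import json

def key_from_record(record: bytes) -> str:
    # Hypothetical helper: use the value of "key1" in each record as its partition key.
    return str(json.loads(record.decode("utf-8"))["key1"])

WriteToKinesis(
    stream_name=custom_options.kinesis_stream_name,
    aws_access_key=custom_options.aws_access_key,
    aws_secret_key=custom_options.aws_secret_key,
    region="ap-northeast-1",
    producer_properties={"RecordTtl": "3000000"},
    partition_key_fn=key_from_record,  # hypothetical parameter, not in the current API
)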

#23570
In that issue this is pointed out as a problematic design, because a static partition key means that records can only ever be written to a single shard.

Do you have any ideas?
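
As a possible interim workaround (my own assumption, not something I have verified at scale), I am considering bypassing WriteToKinesis and writing the records from a custom DoFn with boto3, setting PartitionKey per record. Note that this gives up the producer_properties / KPL behaviour and relies on boto3's own credential handling, so it is only a sketch:

import json

import apache_beam as beam
import boto3


class WriteToKinesisWithDynamicKey(beam.DoFn):
    """Workaround sketch: write each record with boto3, using its "key1" value as the partition key."""

    def __init__(self, stream_name, region):
        self.stream_name = stream_name
        self.region = region

    def setup(self):
        # One Kinesis client per DoFn instance.
        self.client = boto3.client("kinesis", region_name=self.region)

    def process(self, record: bytes):
        partition_key = str(json.loads(record.decode("utf-8"))["key1"])
        self.client.put_record(
            StreamName=self.stream_name,
            Data=record,
            PartitionKey=partition_key,
        )

It would then replace the WriteToKinesis step in the pipeline above, for example:

    (
        p
        | beam.io.gcp.pubsub.ReadFromPubSub(subscription=input_subscription).with_output_types(bytes)
        | beam.ParDo(WriteToKinesisWithDynamicKey(custom_options.kinesis_stream_name, "ap-northeast-1"))
    )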

Issue Priority

Priority: 2 (default / most feature requests should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Infrastructure
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
