Skip to content

[Bug]: PubsubIO on Flink Runner not acknowledging old messages #32461

Open
@xzhang2sc

Description

@xzhang2sc

What happened?

I'm using "org.apache.beam:beam-runners-flink-1.18:2.57.0".
When I read from pubsub, I found it's not able to acknowledging messages that are generated before the job starts. As a result, the messages are sent to Flink repeatedly, the number of unacked messages stay flat.
I also observed a similiar issue to this one #31510
The ack message count can be higher than the message produce rate.

It can be reproduced with the following code, it's simply reading from pubsub and print out a string.
args

 - "--runner=FlinkRunner"
 - "--attachedMode=false"
 - "--checkpointingInterval=10000"
 - "--unalignedCheckpointEnabled=true"
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import java.util.concurrent.ThreadLocalRandom;

public class Test {
    public static void main(String[] args) {
        FlinkPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
                .withValidation().withoutStrictParsing().as(FlinkPipelineOptions.class);
        Pipeline pipeline = Pipeline.create(options);
        PCollection<PubsubMessage> pubsubMessages = pipeline.apply(
                        PubsubIO.readMessages().fromSubscription(
                                "xxx"))
                .apply("print", ParDo.of(new DoFn<PubsubMessage, PubsubMessage>() {
                    @ProcessElement
                    public void processElement(ProcessContext c) {
                        if (ThreadLocalRandom.current().nextDouble() < 0.01) {
                            System.out.println("##################");
                            c.output(c.element());
                        }
                    }
                }));
        pipeline.run();
    }
}

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
    Component: Java SDK
    Component: Go SDK
    Component: Typescript SDK
    Component: IO connector
    Component: Beam YAML
    Component: Beam examples
    Component: Beam playground
    Component: Beam katas
    Component: Website
    Component: Infrastructure
    Component: Spark Runner
    Component: Flink Runner
    Component: Samza Runner
    Component: Twister2 Runner
    Component: Hazelcast Jet Runner
    Component: Google Cloud Dataflow Runner

Activity

liferoad

liferoad commented on Sep 16, 2024

@liferoad
Contributor

@je-ik Is this something you could provide some help? or any guideline to fix this issue?

xzhang2sc

xzhang2sc commented on Sep 16, 2024

@xzhang2sc
Author

I have a suspicion that the job needs permission to access pubsub metrics (oldest unacked message age) to work properly, verifying that.

xzhang2sc

xzhang2sc commented on Sep 16, 2024

@xzhang2sc
Author

I found it's able to acknowledging old messages after I got the permission to access pubsub metrics. However, the number is not adding up.
[update: I don't think accessing pubsub metrics is helping]

In the past 30 minutes, the ack message count stays well about 150/s, in total it should've ack'ed 150 * 60 * 30 = 270k messages, but the unacked messages only dropped 8k. The publish rate is about 10/s, which is negligible.

Screenshot 2024-09-16 at 10 42 08 AM
xzhang2sc

xzhang2sc commented on Sep 16, 2024

@xzhang2sc
Author

I found this assumption quite problematic, and the consequence of a wrong watermark is actually dramatic.

This assumes Pubsub delivers the oldest (in Pubsub processing time) available message at least once a minute

If pubsub didn't deliver an old message during the past minute, then the estimated watermark will be wrong. If the watermark has already progressed, then it means old messages don't get acked properly and they will be delivered repeatedly.

In summary I think there are two problems:

  1. the inaccuracy in estimated watermark results in old messages not acked.
  2. The ack message count metric doesn't align with the actual ack'ed messages count. The metrics seems way higher than the actual ack'ed message count.
je-ik

je-ik commented on Sep 17, 2024

@je-ik
Contributor

What is your ack deadline in PubSub? FlinkRunner can ack messages only after checkpoint, default ack deadline is 10 seconds and your checkpoint interval is aligned with that (--checkpointingInterval=10000). This could cause issues you observe, you might try to either decrese checkpoint interval or increase ack deadline.

xzhang2sc

xzhang2sc commented on Sep 17, 2024

@xzhang2sc
Author

My ACK deadline is 600s, so that shouldn't be the issue

xzhang2sc

xzhang2sc commented on Sep 18, 2024

@xzhang2sc
Author

@liferoad @je-ik PubsubIO is basically unusable on Flink runner, but maybe I'm missing some configurations. Is it possible to bump up the priority of this issue?

je-ik

je-ik commented on Sep 18, 2024

@je-ik
Contributor

Adding @Abacn @kennknowles who might have more context.

zendesk-kjaanson

zendesk-kjaanson commented on Jan 27, 2025

@zendesk-kjaanson

I am adding my experience trying to use PubsubIO with FlinkRunner and Python for the past few months.

  • Getting PubsubIO working via expansion transform requires adding jobserver jar manually to the expansion service classpath, otherwise it is not registered and trying to use this will give an error regarding transform URN not found. Quick peek into the underlying code shoed that it is not using ExternalTransform base class as KafkaIO is using? Not sure whats happening there.
  • There is some kind of bug when trying to send PubsubMessage with attributes. Can't remember what was the issue since I simply wanted to get the thing working and did not need attribute sending actually.
  • After getting PubsubIO seemingly functional in Python FlinkRunner pipeline, then small amounts of data (1-40 messages per min) go thrgouh, but if there is any volume that starts to resemble possible production volume the pipeline will consume messages until it hits checkpoint interval limit and then fail processing and start consuming messages from the start. Some of the processed messages will get sent to sink, repeatedly. I played around with different combinations of checkpointing intervals and pubsub ack deadlines but nothing really helped.

In the end I switched to KafkaIO and that works nicely (when using the use_deprecated_read mode).

For now I don't think PubsubIO is in any way usable with FlinkRunner.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

      Participants

      @liferoad@je-ik@xzhang2sc@zendesk-kjaanson

      Issue actions

        [Bug]: PubsubIO on Flink Runner not acknowledging old messages · Issue #32461 · apache/beam