@wzhramc

@wzhramc wzhramc commented Sep 4, 2025

Motivation

This PIP proposes extending Pulsar Functions with configuration-based automatic/managed transaction support to enable exactly-once processing semantics. Currently, Pulsar Functions cannot publish to multiple topics transactionally, which is a significant limitation for use cases requiring atomic multi-topic operations.

Does this pull request potentially affect one of the following parts:

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

@github-actions github-actions bot added the PIP and doc labels Sep 4, 2025
@lhotari
Member

lhotari commented Sep 4, 2025

@wzhramc Please start a new thread on the dev mailing list about this proposal. You can reference the previous mailing list thread https://lists.apache.org/thread/rll8qyovpd7t9v5yxth25qo44zksbgkn in the new thread.
One reason to start the new thread is to reserve PIP-439 for this proposal since the first proposal to start a discussion with the PIP number reserves it.

lhotari
lhotari previously approved these changes Sep 9, 2025
Member

@lhotari lhotari left a comment

LGTM, just a minor comment about the pulsar_function_txn_latency metric: it should be disabled by default to reduce the volume and cardinality of the metrics.

@lhotari
Member

lhotari commented Sep 9, 2025

One potential future improvement could be to add support for transaction batching. In transaction batching, multiple incoming messages would be processed within the same transaction due to performance reasons so the number of transactions can be reduced. There would need to be transactionBatchingMaxEntries ("entry" would refer to an incoming batch message which could contain multiple messages) and transactionBatchingQuietPeriodMs configuration parameters to control this type of batching. In most cases, it would be useful to configure transactionBatchingMaxEntries to at least 1 so that batch index acknowledgement state wouldn't have to be held across transactions. The transactionBatchingQuietPeriodMs would control how long to wait until "closing" the transaction batch in case the incoming messages don't flow in continuously and transactionBatchingMaxEntries isn't hit.
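
A minimal sketch of the batch-closing rule described in this comment, assuming a hypothetical `TransactionBatchPolicy` helper that is not part of the PIP or of any Pulsar API. Only the two parameter names come from the comment; the class, its methods, and the explicit `nowMs` clock argument are illustrative, and the quiet period is assumed to be measured from the most recent entry.

```java
/** Hypothetical helper: decides when the current transaction batch should be closed. */
final class TransactionBatchPolicy {
    private final int maxEntries;      // mirrors transactionBatchingMaxEntries
    private final long quietPeriodMs;  // mirrors transactionBatchingQuietPeriodMs
    private int entriesInBatch = 0;
    private long lastEntryAtMs = 0L;

    TransactionBatchPolicy(int maxEntries, long quietPeriodMs) {
        if (maxEntries < 1) {
            throw new IllegalArgumentException("maxEntries must be >= 1");
        }
        if (quietPeriodMs < 0) {
            throw new IllegalArgumentException("quietPeriodMs must be >= 0");
        }
        this.maxEntries = maxEntries;
        this.quietPeriodMs = quietPeriodMs;
    }

    /** Records one incoming entry; returns true when the batch is full. */
    boolean onEntry(long nowMs) {
        entriesInBatch++;
        lastEntryAtMs = nowMs;
        return entriesInBatch >= maxEntries;
    }

    /** Returns true when a non-empty batch has received no entry for the quiet period. */
    boolean quietPeriodElapsed(long nowMs) {
        return entriesInBatch > 0 && nowMs - lastEntryAtMs >= quietPeriodMs;
    }

    /** Call after the transaction for this batch has been committed or aborted. */
    void reset() {
        entriesInBatch = 0;
    }
}
```

In this sketch the caller would commit the transaction and call `reset()` whenever either method returns true. With `maxEntries` set to 1, every entry closes its own batch, which corresponds to one transaction per incoming entry.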

@lhotari
Member

lhotari commented Sep 9, 2025

> One potential future improvement could be to add support for transaction batching. In transaction batching, multiple incoming messages would be processed within the same transaction due to performance reasons so the number of transactions can be reduced. There would need to be transactionBatchingMaxEntries ("entry" would refer to an incoming batch message which could contain multiple messages) and transactionBatchingQuietPeriodMs configuration parameters to control this type of batching. In most cases, it would be useful to configure transactionBatchingMaxEntries to at least 1 so that batch index acknowledgement state wouldn't have to be held across transactions. The transactionBatchingQuietPeriodMs would control how long to wait until "closing" the transaction batch in case the incoming messages don't flow in continuously and transactionBatchingMaxEntries isn't hit.

@wzhramc For high-scale use cases, it might be necessary to implement "transaction batching" so that unnecessary load isn't added to the broker (including the transaction coordinator) when enabling transactions. It might be useful to cover the "transaction batching" aspect directly in PIP-439 if you'd be ready to do that before proceeding to the vote. It's also possible to handle that later in another PIP.

@lhotari lhotari dismissed their stale review September 9, 2025 12:27

Waiting for feedback on the "transaction batching" aspect.

Contributor

@jiangpengcheng jiangpengcheng left a comment

LGTM, just have one question:

For an async function that calls the context.newOutputMessage method, how do we make newOutputMessage use the right transaction? Shall we create a new newOutputMessage overload that accepts a transaction as an argument?

@lhotari
Member

lhotari commented Sep 9, 2025

> LGTM, just have one question:
>
> For an async function that calls the context.newOutputMessage method, how do we make newOutputMessage use the right transaction? Shall we create a new newOutputMessage overload that accepts a transaction as an argument?

@jiangpengcheng The async function doesn't have access to the Transaction in managed transactions. I think that the context must have a reference so that any new messages would get added to the active transaction for that asynchronous call chain.
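
A rough sketch of the idea in this reply, assuming hypothetical `SketchTxn` and `SketchContext` types that only stand in for the real Pulsar `Transaction` and `Context` interfaces. It shows a per-invocation context that captures its transaction at construction time, so messages created later in an asynchronous call chain, possibly on another thread, are still attached to the transaction of the invocation that started the chain.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical stand-in for a transaction; not the Pulsar Transaction interface. */
final class SketchTxn {
    final String id;
    // Messages that would be published when this transaction commits.
    final List<String> pending = new CopyOnWriteArrayList<>();

    SketchTxn(String id) {
        this.id = id;
    }
}

/** Hypothetical per-invocation context that holds a reference to its transaction. */
final class SketchContext {
    private final SketchTxn txn;

    SketchContext(SketchTxn txn) {
        this.txn = txn;
    }

    /** User code never sees the transaction; the context attaches the message to it. */
    void newOutputMessage(String topic, String payload) {
        txn.pending.add(topic + ":" + payload);
    }
}

final class AsyncFunctionSketch {
    /** An async function: the continuation may run on a different thread. */
    static CompletableFuture<Void> process(String input, SketchContext ctx) {
        return CompletableFuture
                .supplyAsync(() -> input.toUpperCase())
                .thenAccept(result -> ctx.newOutputMessage("out", result));
    }
}
```

Because the binding lives in the context object rather than in a thread-local, two invocations running concurrently keep their output messages separate as long as each receives its own context instance. Whether the actual implementation would create one context per invocation is an assumption here.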

@wzhramc
Author

wzhramc commented Sep 9, 2025

> > One potential future improvement could be to add support for transaction batching. In transaction batching, multiple incoming messages would be processed within the same transaction due to performance reasons so the number of transactions can be reduced. There would need to be transactionBatchingMaxEntries ("entry" would refer to an incoming batch message which could contain multiple messages) and transactionBatchingQuietPeriodMs configuration parameters to control this type of batching. In most cases, it would be useful to configure transactionBatchingMaxEntries to at least 1 so that batch index acknowledgement state wouldn't have to be held across transactions. The transactionBatchingQuietPeriodMs would control how long to wait until "closing" the transaction batch in case the incoming messages don't flow in continuously and transactionBatchingMaxEntries isn't hit.

> @wzhramc For high-scale use cases, it might be necessary to implement "transaction batching" so that unnecessary load isn't added to the broker (including the transaction coordinator) when enabling transactions. It might be useful to cover the "transaction batching" aspect directly in PIP-439 if you'd be ready to do that before proceeding to the vote. It's also possible to handle that later in another PIP.

I'm not really familiar with batch processing in Pulsar functions in general, so it would take a bit more time for me to figure it out... I would eventually need help with that as well. It would be nice if someone could help me out with this ^^'

@lhotari
Member

lhotari commented Sep 9, 2025

> I'm not really familiar with batch processing in Pulsar functions in general, so it would take a bit more time for me to figure it out... I would eventually need help with that as well. It would be nice if someone could help me out with this ^^'

Transaction batching wouldn't be directly related to Pulsar batching. It's simply about using the same transaction to process multiple incoming messages. This would be necessary to achieve high performance without putting too much pressure on the transaction coordinator and adding too much latency to processing. We could proceed without having this in PIP-439 and come back to this after testing the performance.

@wzhramc
Author

wzhramc commented Sep 9, 2025

> > I'm not really familiar with batch processing in Pulsar functions in general, so it would take a bit more time for me to figure it out... I would eventually need help with that as well. It would be nice if someone could help me out with this ^^'

> Transaction batching wouldn't be directly related to Pulsar batching. It's simply about using the same transaction to process multiple incoming messages. This would be necessary to achieve high performance without putting too much pressure on the transaction coordinator and adding too much latency to processing. We could proceed without having this in PIP-439 and come back to this after testing the performance.

Ah! That makes a lot of sense! Sorry, I mixed it up with Pulsar batching. I can include it in the PIP, it would be pretty neat to have this right away.

@lhotari
Member

lhotari commented Sep 9, 2025

> Ah! That makes a lot of sense! Sorry, I mixed it up with Pulsar batching. I can include it in the PIP, it would be pretty neat to have this right away.

It might be tricky to implement directly, so a phased implementation could be useful in any case. One benefit of having it in this PIP is that we could find a way to make the function configuration "future proof". The transaction batching feature makes sense only in the MANAGED mode.

@wzhramc wzhramc changed the title from "[improve][pip] PIP-439: Adding Transaction Support to Pulsar Functions Through Auto-Transaction Wrapping" to "[improve][pip] PIP-439: Adding Transaction Support to Pulsar Functions Through Managed Transaction Wrapping" Sep 9, 2025
@wzhramc
Author

wzhramc commented Sep 9, 2025

> > Ah! That makes a lot of sense! Sorry, I mixed it up with Pulsar batching. I can include it in the PIP, it would be pretty neat to have this right away.

> It might be tricky to implement directly, so a phased implementation could be useful in any case. One benefit of having it in this PIP is that we could find a way to make the function configuration "future proof". The transaction batching feature makes sense only in the MANAGED mode.

@lhotari I've added a section and updated the configuration classes/definitions + metrics for Transaction Batching. I would appreciate it if you could review it when you have a chance

@lhotari
Member

lhotari commented Sep 9, 2025

> @lhotari I've added a section and updated the configuration classes/definitions + metrics for Transaction Batching. I would appreciate it if you could review it when you have a chance

@wzhramc Looks good, just minor details about the default values.

@wzhramc
Author

wzhramc commented Sep 9, 2025

> > @lhotari I've added a section and updated the configuration classes/definitions + metrics for Transaction Batching. I would appreciate it if you could review it when you have a chance

> @wzhramc Looks good, just minor details about the default values.

I fixed the minor details in the latest commit. I'll start the PIP Voting now

@Technoboy- Technoboy- added this to the 4.2.0 milestone Sep 20, 2025