Skip to content

[improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client #23398

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: master
Choose a base branch
from

Conversation

grssam
Copy link
Contributor

@grssam grssam commented Oct 4, 2024

Pulsar protocol currently does not relay rate limiting information over to clients, thus it handles throttling by pausing the entire connection. This leads to noisy neighbour issue for the topics sharing the connection but are not causing/leading to quota breaches.

I am proposing to add a ThrottleProducer command which brokers will send to clients and expect the clients to stop sending further messages until the specified time. The proposal also includes the changes needed for the Java client.

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

@github-actions github-actions bot added PIP doc-not-needed Your PR changes do not impact docs labels Oct 4, 2024
Copy link
Member

@lhotari lhotari left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good work @grssam! Great proposal.

I added one long comment about a small detail related to the metrics and the throttling exception.

Copy link
Contributor

@rdhabalia rdhabalia left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think every issue is already addressed and I don't see any specific need for these issue.

@rdhabalia
Copy link
Contributor

I have few questions:

  1. what happens when broker sends Throttle Command to client?
  • how does it impact client application? what happens when client-app calls producer.sendAsync() -> if client lib stops sending messages then client will eventually gets queue-full-execption?
  • Or client will eventually get time-out exceptions for all the messages sitting into the queue>? I guess that could be the likely behavior and in that case, does it make sense to send publish-error-code instead throttle-command from broker. So, broker can send ERROR_CODE.THROTTLING and client can just handle it with same behavior as it has received throttle-command so, we might not need a separate command here and it can be achievable with an error-code.
  1. I think client doesn't have to send ACK for Throttle command back to broker as it's not useful to broker and it seems it's one side notification only.

  2. How does client perform backoff? PIP says 1 second but server could have rate-limiting with different unit as well? and it doesn't make sense to configure on the client side? are we also considering exponential backoff at client (starting with 10ms) and let client retry and see if producer can publish more messages or broker will again send throttle cmd/error.

@grssam
Copy link
Contributor Author

grssam commented Oct 31, 2024

@rdhabalia Please find replies inline below

  1. what happens when broker sends Throttle Command to client?
  • how does it impact client application? what happens when client-app calls producer.sendAsync() -> if client lib stops sending messages then client will eventually gets queue-full-execption?

Yes, the client holds those messages in a new queue until the throttle time has passed. During this time, it is possible that queue full exception may also come if the user has configured that flag.

This is explained in detail in this section of the PIP - https://github.com/apache/pulsar/pull/23398/files#diff-22a644af9af6cae8684b378ec31740275d87a6d96e7f823a63ded1ba86b2581cR265

Please let me know if the details need to be improved further.

  • Or client will eventually get time-out exceptions for all the messages sitting into the queue>? I guess that could be the likely behavior and in that case, does it make sense to send publish-error-code instead throttle-command from broker. So, broker can send ERROR_CODE.THROTTLING and client can just handle it with same behavior as it has received throttle-command so, we might not need a separate command here and it can be achievable with an error-code.

This behavior is unchanged overall. while the client is potentially preventing some of the messages sitting in the pending queue to be even sent out to the TCP pipe, the general behavior remains same with respect to timeout that anything in pending queue gets timedout if SendReceipt isn't received from server. The only difference is that now we don't throw a timeout exception if we know the client was being throttled for majority of the messageSendTimeout duration explained more in this section https://github.com/apache/pulsar/pull/23398/files#diff-22a644af9af6cae8684b378ec31740275d87a6d96e7f823a63ded1ba86b2581cR280

  1. I think client doesn't have to send ACK for Throttle command back to broker as it's not useful to broker and it seems it's one side notification only.

The only reason this throttle ack receipt is introduced is to handle older clients, or clients which haven't implemented this logic of stopping further production for the duration of the throttle time. This is important because if there is no throttle ack receipt , the server assumes that its a client that doesn't understand this new protocol and thus, the server will switch back to the older method of throttling - i.e. pausing TCP channel read.

Let me know if that makes sense.

  1. How does client perform backoff? PIP says 1 second but server could have rate-limiting with different unit as well? and it doesn't make sense to configure on the client side?

Are you referring to the throttling time calculation section - https://github.com/apache/pulsar/pull/23398/files#diff-22a644af9af6cae8684b378ec31740275d87a6d96e7f823a63ded1ba86b2581cR234

This is done on the server side.. so if the frequency/unit of token bucket refill is different than 1 second, it will be accommodated accordingly. The client doesn't make any assumptions on this frequency/unit, it simply blocks further produce until the time specified in the command.

are we also considering exponential backoff at client (starting with 10ms) and let client retry and see if producer can publish more messages or broker will again send throttle cmd/error.

I thought about this, but fundamentally, since we are using uniformly filling token bucket design for rate limiting, it would be very difficult to figure out what kind of exponential back-off to use for sending back the throttling time back to the client. Moreover, since on server side, its a uniform token bucket, sending exponential back-off based time back to client may result in the client over-throttling than needed.

@rdhabalia
Copy link
Contributor

This behavior is unchanged overall. while the client is potentially preventing some of the messages sitting in the pending queue to be even sent out to the TCP pipe

Umm.. I am still not clear. Can't we send THROTTLE_ERROR_CODE and handle it instead of introducing new command?

The only reason this throttle ack receipt is introduced is to handle older clients,

Correct. In that case, check client-version at broker side and don't trigger this workflow if client doesn't support the version. So, we don't need ACK response back from client. For information: check ServerCnx.java -> int clientProtocolVersion = connect.getProtocolVersion();

This is done on the server side.. so if the frequency/unit of token bucket refill is different than 1 second, it will be accommodated accordingly. The client doesn't make any assumptions on this frequency/unit,

got it.

@grssam
Copy link
Contributor Author

grssam commented Nov 3, 2024

Umm.. I am still not clear. Can't we send THROTTLE_ERROR_CODE and handle it instead of introducing new command?

Could you please elaborate a bit more? Not 100% sure what you mean here.. Are you saying that we simply treat all timeouts as throttling issue and return THROTTLE_ERROR_CODE instead of TIMEOUT without the need of server sending any info out to the client (via the new command?)

Correct. In that case, check client-version at broker side and don't trigger this workflow if client doesn't support the version. So, we don't need ACK response back from client. For information: check ServerCnx.java -> int clientProtocolVersion = connect.getProtocolVersion();

Actually, i checked around this on slack. seems like feature flags are the new approach of handling this and protocol version only bumps up in case of major changes in the protocol.
In fact, I am actually using this feature flag approach to not even bother sending this command to clients who have not implemented handling of this new command.

The concern still remains about bad actors - intentional or unintentional.
Intentional - a client says it supports the handling of this throttle command but still misbehaves due to code bug, or any other reason.
Unintentional - TCP is a 2 way stream (or internally, it opens 2 channels).. there can be a choking of outbound messages from broker, thus the throttle command gets held up and clients continue to bombard the broker.
Both of the above situations lead to noisy neighbor issue on the broker side and the client side alike. It essentially leads to no rate limiting.

@rdhabalia
Copy link
Contributor

Could you please elaborate a bit more? Not 100% sure what you mean here.. Are you saying that we simply treat all timeouts as throttling issue and return THROTTLE_ERROR_CODE instead of TIMEOUT without the need of server sending any info out to the client (via the new command?)

Sure, I was suggesting to send a failure ServerError.Throttled to producer as it is just a notification from server to client. and let client receive this error during publish and do necessary backoff.

Actually, i checked around this on slack. seems like feature flags are the new approach of handling this and protocol version only bumps up in case of major changes in the protocol.

Umm.. That should not be true because we can increase this version when we want to check client's compatibility and this should be the example of this usecase.

@grssam
Copy link
Contributor Author

grssam commented Nov 5, 2024

Sure, I was suggesting to send a failure ServerError.Throttled to producer as it is just a notification from server to client. and let client receive this error during publish and do necessary backoff.

Ah got it now. So there are a few reasons for this not being a "response" to a send request :

  • CommandSendError command doesn't have the provision to notify the time for which the producer is throttled. Adding that optional field just for one of the ServerError enums doesn't seem right.
  • Due to the distributed nature of a producer, relying on communication as a response to a Send request might not work out. Suppose 10 producers connected to a topic, one of them might not even have produced yet but the quota might have breached.. So we need to send communication to all 10 producers to throttle themselves.
  • Throttling doesn't actually lead to failures - If the client has reasonable timeouts (default being 30s is more than reasonable) then even with throttling, the message produce will pass. Moreover, every producer could be joining in with a different client sentTimeout value.. Thus, it is not possible in the current protocol approach to manage or figure out whethar a throttling on the broker side would actually lead to client side error at all. Thus, this is decoupled from send response.

Umm.. That should not be true because we can increase this version when we want to check client's compatibility and this should be the example of this usecase.

I suspect this is done because not every client wanted to implement a feature present in protocol version X, but still wanted to implement something present in version Y where X < Y.. thus this independent approach of "supported feature flag" was introduced. This is the slack discussion thread if you want to join in the conversation.

Comment on lines +240 to +241
In case (a), as per the current token bucket based quota tracking introduced in PIP-322, we (lazily) resume channel
reads on the next second mark. i.e. the throttling is only applied at max for 1 second.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After PIP-322, there's no longer any max 1 second throttling. That was the case before PIP-322 since the tokens in the token bucket couldn't get negative. After PIP-322, it will count all tokens and throttle until there's tokens available. You can find the implementation in org.apache.pulsar.broker.qos.AsyncTokenBucket#calculateThrottlingDuration:

/**
* Calculate the required throttling duration in nanoseconds to fill up the bucket with the minimum amount of
* tokens.
* This method shouldn't be called from the hot path since it calculates a consistent value for the tokens which
* isn't necessary on the hotpath.
*/
public long calculateThrottlingDuration() {
long currentTokens = consumeTokensAndMaybeUpdateTokensBalance(0, true);
if (currentTokens == Long.MIN_VALUE) {
throw new IllegalArgumentException(
"Unexpected result from updateAndConsumeTokens with forceUpdateTokens set to true");
}
if (currentTokens > 0) {
return 0L;
}
// currentTokens is negative, so subtracting a negative value results in adding the absolute value (-(-x) -> +x)
long needTokens = getTargetAmountOfTokensAfterThrottling() - currentTokens;
return (needTokens * getRatePeriodNanos()) / getRate();
}

Used in PublishRateLimiterImpl to calculate the throttling duration:
private long calculateThrottlingDurationNanos() {
AsyncTokenBucket currentTokenBucketOnMessage = tokenBucketOnMessage;
long throttlingDurationNanos = 0L;
if (currentTokenBucketOnMessage != null) {
throttlingDurationNanos = currentTokenBucketOnMessage.calculateThrottlingDuration();
}
AsyncTokenBucket currentTokenBucketOnByte = tokenBucketOnByte;
if (currentTokenBucketOnByte != null) {
throttlingDurationNanos = Math.max(throttlingDurationNanos,
currentTokenBucketOnByte.calculateThrottlingDuration());
}
return throttlingDurationNanos;
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lhotari but the fact that new tokens are added (lazily) every second to the bucket effectively limit the throttling to 1 second, right? the producers may get throttled right away the very next nano second again ..

Copy link
Member

@lhotari lhotari Nov 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lhotari but the fact that new tokens are added (lazily) every second to the bucket effectively limit the throttling to 1 second, right? the producers may get throttled right away the very next nano second again ..

@grssam No, after PIP-322, tokens will be calculated based on the elapsed time and the configured rate of adding tokens. The rate period of 1 second is only impacting how the rate of adding tokens is expressed. It's possible that some of the current documentation is outdated and hasn't been updated to capture this detail.
A different matter is that the maximum number of tokens in the bucket is limited by the bucket size. It's currently set to the configured rate since in Pulsar, we have a single number for configuring publishing rate. The DTO class is https://github.com/apache/pulsar/blob/master/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/PublishRate.java .
After PIP-322, there's no resetting of the tokens and the number of consumed tokens will often become negative. That is necessary in an asynchronous token bucket implementation like there is in Pulsar. When tokens are consumed, the operation is never blocked when that happens. The throttling will happen when tokens become negative and the throttling period will last until time has elapsed until there's a minimum amount of tokens in the bucket again. By default, the minimum amount of tokens is 16 milliseconds of worth tokens with the configured rate. Let's say if the rate is 1000 / second and tokens have gone 500 tokens negative, the throttle duration will be 516 milliseconds. This is how the current rate limiting works in Pulsar after PIP-322.

When PIP-322 started, we had discussions around having configurable options for allowing bursting. That's all possible with the current foundation laid with PIP-322 changes. The simplest form of bursting configuration would be to have separate configuration options for the bucket size so that it's not dependent on the configuration of the rate of adding tokens as it is currently. There could also be variable rates for adding tokens as we have discussed in the past, but that would require additional work to implement it. Adding basic bursting support would be something to consider before adding more advanced features since bursting support is already technically implemented with the token bucket size and adding bursting support is mainly about adding the configuration options and documentation for that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was busy with other production issue during the PIP-322 time and it was merged (impl included) within a week or two.. but overall, my understanding was that in practice, we are sticking to the strict rate limiter design and only the under the hood implementation is being changed to be more efficient and future proof?

But now the way you are explaining that tokens could be -500 - seems like we have gone back to the non-existent ex-default poller based rate limiter that was there prior to 322? What is preventing a topic to burst and reach -10000 tokens? Sure, it would take longer for the bucket to come back to 0 and thus, the produce to that topic would be blocked for longer time than the next second mark, but the damage would have already been done with respect to noisy neighbor (at the broker level) due to exhausting other physical resources like disk bandwidth etc..

I will have to read the code in detail to understand this better.. but please correct me if the above understanding is wrong.

Having said that, if my understanding is correct - isn't this a much bigger challenge than the concern you have raised in your comment regarding calculating RTT?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lhotari - Can you please provide some insights here?

But now the way you are explaining that tokens could be -500 - seems like we have gone back to the non-existent ex-default poller based rate limiter that was there prior to 322? What is preventing a topic to burst and reach -10000 tokens? Sure, it would take longer for the bucket to come back to 0 and thus, the produce to that topic would be blocked for longer time than the next second mark, but the damage would have already been done with respect to noisy neighbor (at the broker level) due to exhausting other physical resources like disk bandwidth etc..

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In Pulsar, the rate limiting is non-blocking and it doesn't drop traffic. For Netty, it's not a feasible approach to block in the network layer. This is one of the reasons for the non-blocking approach. The implications of this is that when a client sends (publishes) messages and it arrives to the broker, it will always be handled as fast as possible. This is the reason why it must be allowed to have as many negative tokens as has been published. The rate will smoothen over time since the throttling delay will last until there are a minimal amount of tokens available.

but overall, my understanding was that in practice, we are sticking to the strict rate limiter design and only the under the hood implementation is being changed to be more efficient and future proof?

What' your definition of "strict rate limiter" design?

But now the way you are explaining that tokens could be -500 - seems like we have gone back to the non-existent ex-default poller based rate limiter that was there prior to 322? What is preventing a topic to burst and reach -10000 tokens?

I hope that the explanation of the non-blocking nature of Pulsar's rate limiting in the first paragraph of this comment explains why tokens can go to any negative value that is necessary to handle the already in progress messages until throttling can be applied to prevent new messages to enter the broker.

Sure, it would take longer for the bucket to come back to 0 and thus, the produce to that topic would be blocked for longer time than the next second mark, but the damage would have already been done with respect to noisy neighbor (at the broker level) due to exhausting other physical resources like disk bandwidth etc..

In PIP-322, there's no "next second mark". I hope that the explanation of the non-blocking and non-dropping nature of Pulsar rate limiting explains that.

Having said that, if my understanding is correct - isn't this a much bigger challenge than the concern you have raised in your comment regarding calculating RTT?

What challenge are you referring to?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In Pulsar, the rate limiting is non-blocking and it doesn't drop traffic. For Netty, it's not a feasible approach to block in the network layer. This is one of the reasons for the non-blocking approach. The implications of this is that when a client sends (publishes) messages and it arrives to the broker, it will always be handled as fast as possible. This is the reason why it must be allowed to have as many negative tokens as has been published. The rate will smoothen over time since the throttling delay will last until there are a minimal amount of tokens available.

This wasn't the case before PIP-322.. Specifically, it was mentioned that overall user facing behavior isn't changing. It was mentioned multiple times:

https://github.com/apache/pulsar/pull/21680/files#diff-988544056ef2793c7360d39cdeb7f020416985d08bbc0072fee1a4aebf18246dR29

https://github.com/apache/pulsar/pull/21680/files#diff-988544056ef2793c7360d39cdeb7f020416985d08bbc0072fee1a4aebf18246dR179

https://github.com/apache/pulsar/pull/21680/files#diff-988544056ef2793c7360d39cdeb7f020416985d08bbc0072fee1a4aebf18246dR369

If I look at it from a user perspective, to me, this has now regressed from the way it was pre PIP-322. Let's take a look:

Before PIP-322:

  • Default poller based throttling - we can define the polling frequency and bad producers can burst as much as they want between 2 polling checks, once every second.
  • Precise poller based throttling - very strict rate limiter, doesn't allow a single byte/qps over the value. In theory, has locking contention, but in practice (I have personally done numerous different workload tests) has batter CPU performance than the default poller based throttling.

After PIP-322:

  • Async token based throttling - bad producers can burst as much as they want between an unconfigurable time window. They cannot do this every second, but depends on how much they burst.

So, post PIP-322 there is no strict rate limiter, but possibly a some what better version of the default rate limiter pre-PIP-322 assuming that hardware doesn't degrade.

What' your definition of "strict rate limiter" design?

I think its very straight forward - a rate limiter where "infinite" or even significant throughput above the limit is not allowed and is blocked.

I hope that the explanation of the non-blocking nature of Pulsar's rate limiting in the first paragraph of this comment explains why tokens can go to any negative value that is necessary to handle the already in progress messages until throttling can be applied to prevent new messages to enter the broker.

Sure, in theory, it sounds good. But this is a MAJOR concern in practice. There can be bad actors leading to sudden spike in traffic. Sure, they will be blocked for relevant time, but when they come back again, they will spike as much as they want once again.

imagine this case - a topic with 1000 qps rate limit. So essentially 1000 tokens gets added every second spread over the 16ms time frame you've mentioned.
A bad actor can lead to a burst of 10,000 - then it gets throttled for 10 seconds, then it can again do 10,000.

At a macro level, it may seem that throttling is working very well and things are getting smoothed out, but that's not how hardware works. That single 10,000 burst would have added heavy toll to the underlying disk or network leading to noisy neighbor impact on all other topics present on that broker. How is this being contained?

What challenge are you referring to?

the non existence of RTT in this proposal is practically a non-concern if the tokens can go to any humongous negative value. That's what I am referring to.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This wasn't the case before PIP-322.. Specifically, it was mentioned that overall user facing behavior isn't changing.

If I look at it from a user perspective, to me, this has now regressed from the way it was pre PIP-322. Let's take a look:

From the user's perspective, PIP-322 is a significant improvement. There aren't any regressions or breaking changes in behavior. If you test the PIP-322 solution, you will notice that it behaves in very reasonable ways and provide good results.

Async token based throttling - bad producers can burst as much as they want between an unconfigurable time window. They cannot do this every second, but depends on how much they burst.

This isn't the case.
Pulsar has never dropped or rejected traffic when a producer goes over the limit. Before PIP-322, the behavior was that a bad producer can burst as much as they want. That has been fixed in PIP-322.
If there's a burst and the tokens go negative, the TCP throttling will stop reading more messages from the network layer (Netty) and it will throttle until there are available tokens available. Due to the use of Netty's io.netty.handler.flow.FlowControlHandler in the channel pipeline, Pulsar can accept messages one-by-one when it's throttling. This is how the tokens wouldn't go infinitely negative even when there's a large spike. FlowControlHandler has been in use before too so things haven't change there.
In Pulsar, we don't have a way to configure the token bucket size separately from the rate at the moment. The token bucket will fill up in 1 second and that's why spikes are limited to that 1 second worth of tokens.

I think its very straight forward - a rate limiter where "infinite" or even significant throughput above the limit is not allowed and is blocked.

As mentioned above, there's no infinite rate limiter in PIP-322 implementation. The problem you are describing doesn't exist in PIP-322.

Sure, in theory, it sounds good. But this is a MAJOR concern in practice. There can be bad actors leading to sudden spike in traffic. Sure, they will be blocked for relevant time, but when they come back again, they will spike as much as they want once again.

imagine this case - a topic with 1000 qps rate limit. So essentially 1000 tokens gets added every second spread over the 16ms time frame you've mentioned.
A bad actor can lead to a burst of 10,000 - then it gets throttled for 10 seconds, then it can again do 10,000.

This problem doesn't exist in the PIP-322 implementation. Messages (== batch messages, a.k.a "entries") are handled one-by-one with FlowControlHandler. The maximum amount it can go over the limit is by one entry.
If the configured rate is very low, 1 batch message can cause the tokens to go negative. That's a correct way to handle rate limiting in Pulsar.

At a macro level, it may seem that throttling is working very well and things are getting smoothed out, but that's not how hardware works. That single 10,000 burst would have added heavy toll to the underlying disk or network leading to noisy neighbor impact on all other topics present on that broker. How is this being contained?

This is not a problem since the PIP-322 rate limiting is really a "strict" rate limiting implementation. I hope the above explanations clarify this.

the non existence of RTT in this proposal is practically a non-concern if the tokens can go to any humongous negative value. That's what I am referring to.

tokens don't go to humongous negative values as explained above.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't the case. Pulsar has never dropped or rejected traffic when a producer goes over the limit. Before PIP-322, the behavior was that a bad producer can burst as much as they want.

Not if the broker was configured to use PreciseRateLimieter. Moreover, even as per the default configurations, a bad producer could only burst for upto first 10ms of a every second.

That has been fixed in PIP-322. If there's a burst and the tokens go negative, the TCP throttling will stop reading more messages from the network layer (Netty) and it will throttle until there are available tokens available. Due to the use of Netty's io.netty.handler.flow.FlowControlHandler in the channel pipeline, Pulsar can accept messages one-by-one when it's throttling. This is how the tokens wouldn't go infinitely negative even when there's a large spike. FlowControlHandler has been in use before too so things haven't change there. In Pulsar, we don't have a way to configure the token bucket size separately from the rate at the moment. The token bucket will fill up in 1 second and that's why spikes are limited to that 1 second worth of tokens.

I am not talking about the case when it's already throttling. I am talking about the case during the 16ms (Default resolution time, and as far as I see, non-configurable by pulsar users), the token check takes a fast path as shouldUpdateTokensImmediately returns false and thus tokens are just added into a pendingConsumedTokens variable instead of actually checking if there are enough tokens left in the bucket.

ref: https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucket.java#L153

Please let me know if my read on this code piece is incorrect, but as I understand, multiple producers can send multiple entries during the resolutionNanos time frame and they all will be allowed through. This can even go beyond the 1 second worth of tokens cap that you are referring to.

imagine this case - a topic with 1000 qps rate limit. So essentially 1000 tokens gets added every second spread over the 16ms time frame you've mentioned.
A bad actor can lead to a burst of 10,000 - then it gets throttled for 10 seconds, then it can again do 10,000.

This problem doesn't exist in the PIP-322 implementation. Messages (== batch messages, a.k.a "entries") are handled one-by-one with FlowControlHandler. The maximum amount it can go over the limit is by one entry. If the configured rate is very low, 1 batch message can cause the tokens to go negative. That's a correct way to handle rate limiting in Pulsar.

Can you point me to the code piece which enforces this? My read on the code and the java doc is different. I followed the code, and I see no such check that only allows 1 batch message to make the tokens go negative. As I read the code (referenced above), any number of batches can be produced within the resolutionNanos time duration without actually reducing the tokens.

Copy link
Member

@lhotari lhotari left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I apologize for the late reviews on this PIP-385 document PR, #23398.

I support accepting PIP-385, although the current design contains a few major challenges that haven't been addressed yet. From my point of view, we can close the vote and accept this PIP. To make the implementation meet its goals, these design challenges will need to be addressed as part of future changes and captured in the PIP document to ensure the design is properly documented. We cannot simply proceed in a waterfall manner and expect that the design for PIP-385 is complete with implementation as the next phase. There will need to be iteration between design, experimentation, implementation, and validation.

The key design challenges that need to be addressed include:

  • Flow Control solution to avoid TCP-level throttling: The proposed throttling command and period-based solution faces challenges due to queuing between client and broker. When a client stops sending, the broker continues receiving messages in the queue, making flow control less accurate. While a permit/credit-based approach was initially considered, the throttling command solution can work with additional design considerations and will most likely lead to a simpler solution than a permit/credit based approach for producer flow control.
  • Queue Size Estimation: For the flow control solution, the broker needs a way to estimate messages queued between client and broker. One potential solution is using enhanced Ping/Pong commands with request IDs and timestamps to calculate RTT and estimate the queue size. This could help the broker send throttle commands early enough to avoid TCP-level throttling, which cannot be removed even when the client supports PIP-385. Removing TCP-level throttling would result in out-of-memory issues or higher memory usage compared to the current behavior in the Pulsar Broker.
  • Fair Queuing for Multi-Producer Topics: The current design doesn't address how to ensure fair quota sharing among multiple producers on a topic. While PIP-322 has a simple fairness solution, solving fair queuing for this new throttling mechanism will likely be more challenging.

Additionally, there's a Pulsar Java client issue (out of scope for PIP-385):

  • Non-blocking Producer Flow Control: The current blocking approach in the Java client is problematic for async APIs. A separate PIP may be needed to implement non-blocking producer flow control for the Pulsar Java client async API.

We aren't addressing a simple problem. That's why producer flow control hasn't been solved in Pulsar by now. Accepting PIP-385 will be beneficial as it commits us as a community to solving this problem and continuing to make progress.

Thank you, Girish, for driving this effort!

Most of the logic to actually block a message until throttling is lifted resides inside `ProducerImpl::processOpSendMsg`

If the configured `sendTimeout` is less than the remaining throttled duration while we are being throttled, we simply fail
with rate limit exception before adding the message to `pendingMessages`.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we call it a "producer throttled exception" since the concept used in the commands is "throttle producer"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i can rework the language here but its throttle exception as explained in the below sections

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's better to be consistent and call it "producer throttled exception" instead of "rate limit exception".

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, will do everywhere

Comment on lines +227 to +229
* For calls arising from `PublishRateLimiterImpl` class, add logic in `ServerCnxThrottleTracker.java` to send the
command to client and wait for response for the max configured duration before calling `changeAutoRead`. It checks for
feature availability first.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic might need some more careful thought.

It feels that in the case that the client supports the throttling command, that the broker should be able to accept that the rate goes over the limit in a shorter period of time and if this continues for too long, the throttling by changing auto read would have to happen since the broker could run out of memory if this doesn't happen.

One way to handle this is to send the throttling command to the client as soon as the tokens are below a certain threshold. If the tokens get consumed to a certain value (it can be negative), the broker would have to change auto read to false.

The flow control will be less accurate when using the throttling command and period based solution. The reason for this is that there's a queue between the client and the broker. When the client stops sending, the broker will continue to receive all produced messages that are in the queue. This is the reason why I was more in favor of a permit (credit) based flow control initially when we started the discussion about this PIP. However, I think that the problem can be solved with the throttling command based solution, but it will most likely require additional details that aren't currently in this PIP. We can figure out the details along the way when we get further with the implementation.

If the broker would know how many messages there are in the queues between the client and the broker, it would be possible for the broker to take this into account when calculating the throttling period and when it should send out the command to throttle the producer. However, this would most likely require a separate control channel between the broker and the client to pass this information.

It is also possible that this problem makes the throttling command useless for producer flow control without the need to constantly use the TCP level throttling. This would make this PIP not meet the original goals.

There are many ways around the challenges, but solving this aspect is perhaps the hardest problem in implementing this PIP so that it meets the original goals.

One possible solution could be to estimate the queue size by using the Pulsar's Ping/Pong commands. For this purpose, the broker would have to send Ping commands frequently and measure the round trip time from the Pong responses. However, the current CommandPing and CommandPong messages don't currently contain any fields, so passing a request id for correlating the Pong response with the Ping request isn't currently possible in the protocol. It would be necessary to add the request_id field as there is for other commands to support using Ping/Pong for calculating RTT.

The way how RTT is useful in preventing the need to block the reads (by changing auto read to false) is that the broker could assume that the client continues to produce messages with a constant rate and so that it sends the throttle command early enough that the threshold to block reads isn't met. I don't currently see other simple solutions to solve this problem. Perhaps there are also others. I think I had this RTT solution in mind when I first accepted that we can move forward with the throttle command based flow control instead of using permits based flow control. The permits based flow control has other problems so it might eventually be easier to solve flow control with the throttle command based flow control. In the permits based flow control, one of the challenges is that there could be multiple producers and if permits have been sent out to one producer, it could potentially prevent other producers from producing. It's also hard to integrate the permits based flow control to multiple ways of throttling at multiple levels. That's why I think that throttle command based flow control is a better way forward, as long as we are able to solve the challenge that I hopefully described well enough in this comment.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic might need some more careful thought.

It feels that in the case that the client supports the throttling command, that the broker should be able to accept that the rate goes over the limit in a shorter period of time and if this continues for too long, the throttling by changing auto read would have to happen since the broker could run out of memory if this doesn't happen.

One way to handle this is to send the throttling command to the client as soon as the tokens are below a certain threshold. If the tokens get consumed to a certain value (it can be negative), the broker would have to change auto read to false.

The flow control will be less accurate when using the throttling command and period based solution. The reason for this is that there's a queue between the client and the broker. When the client stops sending, the broker will continue to receive all produced messages that are in the queue. This is the reason why I was more in favor of a permit (credit) based flow control initially when we started the discussion about this PIP. However, I think that the problem can be solved with the throttling command based solution, but it will most likely require additional details that aren't currently in this PIP. We can figure out the details along the way when we get further with the implementation.

My major concern with permit based (and even if we put in a threshold to send in the command, it kind of falls in same category) is that its a major challenge to implement it correctly and efficiently in a distributed manner.
The producers here are distributed. We cannot make assumptions on the message rate spread of those producers, specially given how the batching works (round robin) from a client's perspective - leading to sudden bursts to a subset of producers for a topic at any given time. Managing permits in this situation at a sub second level is almost impossible to achieve without massively overshooting or undershooting the rate limiting mark.

I do understand that the producers could snoop in many more messages before they the throttle acknowledgement is read, but doing the other way round also is not feasible i.e. blocking the channel read until all throttle receipts are received ... as the channel is paused...

If the broker would know how many messages there are in the queues between the client and the broker, it would be possible for the broker to take this into account when calculating the throttling period and when it should send out the command to throttle the producer. However, this would most likely require a separate control channel between the broker and the client to pass this information.

It is also possible that this problem makes the throttling command useless for producer flow control without the need to constantly use the TCP level throttling. This would make this PIP not meet the original goals.

There are many ways around the challenges, but solving this aspect is perhaps the hardest problem in implementing this PIP so that it meets the original goals.

One possible solution could be to estimate the queue size by using the Pulsar's Ping/Pong commands. For this purpose, the broker would have to send Ping commands frequently and measure the round trip time from the Pong responses. However, the current CommandPing and CommandPong messages don't currently contain any fields, so passing a request id for correlating the Pong response with the Ping request isn't currently possible in the protocol. It would be necessary to add the request_id field as there is for other commands to support using Ping/Pong for calculating RTT.

The way how RTT is useful in preventing the need to block the reads (by changing auto read to false) is that the broker could assume that the client continues to produce messages with a constant rate and so that it sends the throttle command early enough that the threshold to block reads isn't met. I don't currently see other simple solutions to solve this problem. Perhaps there are also others. I think I had this RTT solution in mind when I first accepted that we can move forward with the throttle command based flow control instead of using permits based flow control. The permits based flow control has other problems so it might eventually be easier to solve flow control with the throttle command based flow control. In the permits based flow control, one of the challenges is that there could be multiple producers and if permits have been sent out to one producer, it could potentially prevent other producers from producing. It's also hard to integrate the permits based flow control to multiple ways of throttling at multiple levels. That's why I think that throttle command based flow control is a better way forward, as long as we are able to solve the challenge that I hopefully described well enough in this comment.

RTT time is one aspect that can help predicting and anticipating the throttling, but there are other aspects as well. TCP is dual channel, but in each way (sending the throttle command, and receiving back the ack) there could be pending messages. Although, the processing of those messages is async, so clearing any pending message read (on broker) or send receipt read (on client side) and reaching the actual throttle/throttle receipt command should be quick enough. This adds to the the network based RTT.

Moreover, calculating RTT using ping/pong would actually also face similar challenge based on the noisiness and variability of the producer rate from the producer in any point in time.. (batches and round robin partition assignment leading to burst patterns at a sub-second level)

This part of the proposal definitely needs more work to improve things, but I think its a good starting point with the pros overtaking the cons.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My major concern with permit based (and even if we put in a threshold to send in the command, it kind of falls in same category) is that its a major challenge to implement it correctly and efficiently in a distributed manner. The producers here are distributed. We cannot make assumptions on the message rate spread of those producers, specially given how the batching works (round robin) from a client's perspective - leading to sudden bursts to a subset of producers for a topic at any given time. Managing permits in this situation at a sub second level is almost impossible to achieve without massively overshooting or undershooting the rate limiting mark.

I do understand that the producers could snoop in many more messages before they the throttle acknowledgement is read, but doing the other way round also is not feasible i.e. blocking the channel read until all throttle receipts are received ... as the channel is paused...

I agree, permits based solution would be challenging too. However, the challenges could be solved there more accurately since the broker could send the client more specific information about the limits. In Pulsar rate limits are expressed in both messages count and size (bytes). We don't have to go into the solution details here and we can focus on addressing the challenges in the throttling period based solution.

RTT time is one aspect that can help predicting and anticipating the throttling, but there are other aspects as well. TCP is dual channel, but in each way (sending the throttle command, and receiving back the ack) there could be pending messages. Although, the processing of those messages is async, so clearing any pending message read (on broker) or send receipt read (on client side) and reaching the actual throttle/throttle receipt command should be quick enough. This adds to the the network based RTT.

Moreover, calculating RTT using ping/pong would actually also face similar challenge based on the noisiness and variability of the producer rate from the producer in any point in time.. (batches and round robin partition assignment leading to burst patterns at a sub-second level)

This part of the proposal definitely needs more work to improve things, but I think its a good starting point with the pros overtaking the cons.

The RTT could be calculated continuously so that the broker would be able to estimate how long it takes until the client pauses delivery when a throttle command is sent to the client. I don't see any specific problem in calculating the RTT. If there's noisiness, that impacts also the commands. With timestamps, it's possible to find out what the delay is in both directions. The clock differences of the broker and the client can be calculated initially when the connection gets established, before the connection is congested. This all would be a sufficiently accurate estimation of the clock difference and after this, it's possible to calculate how long it takes to travel from broker to client and from client to broker. This would be changing all the time and that's why the Ping/Pong messages would have to be flowing all the time when the connection isn't idling.

The reason why this information is needed is that without the information, the TCP level throttling would be needed. The primary goal of PIP-385 is to avoid that. The way how it could be avoided is sending the throttle producer command from the broker to the client early enough so that the client receives it on time so that it can impact the traffic.

There would have to be some level of allowance for bursting and that's why the throttling command solution will have a higher peak memory consumption. The peak memory usage can be reduced by improving the algorithm and the prediction logic. The simplest version would assume that the same message rate (count & bytes) is constant and using the RTT information, it could predict how long it would have to ask the client to throttle.

An improvement could be to have specific fields in the throttle producer response to include client side producer queue information of how many messages and what total size there is waiting to be sent to the broker. Metrics about the current rate could also help on the broker side to adjust the following throttle commands it sends to the client.

I think it's necessary to handle this before PIP-385 could be useful and prevent TCP level throttling in taking place.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I looked at many scenarios of extreme cases and average cases and RTT does seem to improve the situation about micro-bursting while we are waiting for throttle command receipt.
Although, I think calculating the RTT would need many more changes in the protocol, so I am proposing that I will raise a followup PIP for it, since RTT isn't specifically for this use case but can be used at other places as well.
What are your thoughts?

Implementation wise, we can ensure that these two PIP's land together, or are cherry-picked to be in the same version if needed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although, I think calculating the RTT would need many more changes in the protocol, so I am proposing that I will raise a followup PIP for it, since RTT isn't specifically for this use case but can be used at other places as well.
What are your thoughts?

I think that calculating RTT is relevant to be able to make PIP-385 useful. That's why I don't see a problem in introducing protocol changes for that purpose in the same PIP.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although, I think calculating the RTT would need many more changes in the protocol, so I am proposing that I will raise a followup PIP for it, since RTT isn't specifically for this use case but can be used at other places as well.
What are your thoughts?

I think that calculating RTT is relevant to be able to make PIP-385 useful. That's why I don't see a problem in introducing protocol changes for that purpose in the same PIP.

Honestly, I have opposing views here. PIP-385 will improve situation what-so-ever, with or without RTT.
I've already gotten 3 votes here. If I introduce another major protocol change as part of the same PIP, I will have to re-initiate dsicussion and voting.

Honestly, with the massive delays in review, I am opposed to landing massive changes at once.

If you strongly believe that RTT is a must have (which I don't), then the implementation can be planned accordingly, but I do not see a point in increasing the scope of this PIP as it has already gotten enough discussion and votes.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Honestly, I have opposing views here. PIP-385 will improve situation what-so-ever, with or without RTT.
I've already gotten 3 votes here. If I introduce another major protocol change as part of the same PIP, I will have to re-initiate dsicussion and voting.

It's possible to make progress incrementally too. I'm not blocking you of making progress. The current PIP-385 doesn't currently describe how the rate limiting is achieved in a way how the buffers on the broker side stay within limits so that broker doesn't have to use TCP throttling to stop buffer growth. You can choose to ignore that problem for now, but it's something that we will have to address sooner or later. Incremental approaches are very useful so that we don't have to speculate about possible issues. At the same time, there needs to be a sufficient test cases where the PIP-385 solution can show that buffers stay within limits without TCP throttling. If that's not met, the solution will be useless. It's always good to have a failing test case to guide the implementation. Practice is usually a better teacher than long theoretical discussions. We can come back to these challenges when it becomes relevant in the implementation.

If you strongly believe that RTT is a must have (which I don't), then the implementation can be planned accordingly, but I do not see a point in increasing the scope of this PIP as it has already gotten enough discussion and votes.

I'm not imposing any restrictions on the implementation as long as the success criteria for PIP-385 is met. I think that RTT will be useful in finding a solution for making PIP-385 successful. I could always be wrong with my gut feeling.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

think that RTT will be useful in finding a solution for making PIP-385 successful. I could always be wrong with my gut feeling.

If I understand correctly, your core concern is that the throttle command and the receipt back will have certain delays which will vary per producer, to tackle this, we need to understand RTT for each producer and pre-emptively send the throttle command.

What I can do in this PIP without changing the design much is to use the proposed config throttleProducerReceiptWaitTimeMillis (after some renaming) and based on the configured value, pre-emptively send the throttle command. Ofcourse, it wont be 100% accurate, but it would be tunable by the users.

Then once RTT is available, we can switch over to that.

Comment on lines +250 to +251
* We maintain a boolean and an enum field at a producer level to capture if the producer is being throttled or not and
the reason for current throttling.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This boolean field might not be very useful for clients. Similar to the ProducerThrottledException logic where the exception is used when the connection has been throttled for more than 80% of the time (for some observation period), it would be useful to have such a "producer utilization metric" available. (U = B/T
where U = utilization, B = total time the producer was throttled during T, the observation period, explained in https://github.com/apache/pulsar/pull/23398/files#r1787817397) With such a metric, the client could slow down producing so that it doesn't go over a given threshold in the case that the client has a way to control how fast it produces messages.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"slowing down" would mean anticipation.. and again, with producers being distributed.. its very tricky to handle anticipation about the future.

while the utilization can certainly be added in as a metric that can be consumed easily, this boolean gives a more direct answer.

in a partitioned producer, its even more useful while selecting the next partition in the round robin manner.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

while the utilization can certainly be added in as a metric that can be consumed easily, this boolean gives a more direct answer.

It does give a direct answer, but it's pretty much useless. In a real scenario, when the producer is producing faster than the limit, the throttling will be continuously switching on and off. If is't not throttling when the application checks it, it doesn't really tell much about the current situation. That's why the utilization metric is much more informative. It will provide probabilistic information for the client application whether the sending will get throttled or not.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does give a direct answer, but it's pretty much useless. In a real scenario, when the producer is producing faster than the limit, the throttling will be continuously switching on and off.

We have discussed this internally with the potential users and we already have a few use cases which will greatly benefit from this.
Yes, the throttling will be continuously switching on and off, but in scenarios where the upstream can do a retry after a delay, sending them an early exit saves many resources (threads on both upstream and pulsar client side) .

This is mostly coming from use cases similar to a RestBus on top of Pulsar serving to many topics at once.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the throttling will be continuously switching on and off, but in scenarios where the upstream can do a retry after a delay, sending them an early exit saves many resources (threads on both upstream and pulsar client side) .

I don't think that the boolean value provides necessary information for this type of use case since the value could be unthrottled even in cases where the throttling is happening a lot (it just happens to be unthrottled when the value is read) and publish latency is high. A sliding window publish latency information would be more useful. Isn't the publish latency the relevant metric for the use case that you presented?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think that the boolean value provides necessary information for this type of use case since the value could be unthrottled even in cases where the throttling is happening a lot (it just happens to be unthrottled when the value is read) and publish latency is high. A sliding window publish latency information would be more useful. Isn't the publish latency the relevant metric for the use case that you presented?

You are assuming that this boolean would be accessed only once in a while? The example that you provided, sure, that very instant, that single call is a true negative, but the very next call, or a few calls later - it would give the right result that throttling is happening.
latency metrics cannot be used as a source of truth here as the latencies can be near "timeout" for N number of reason, not necessarily resulting in/caused by throttling on the topic.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are assuming that this boolean would be accessed only once in a while? The example that you provided, sure, that very instant, that single call is a true negative, but the very next call, or a few calls later - it would give the right result that throttling is happening.
latency metrics cannot be used as a source of truth here as the latencies can be near "timeout" for N number of reason, not necessarily resulting in/caused by throttling on the topic.

my assumptions of how throttling will be changing on and off is perhaps very different from your assumption. It could happen multiple times a second, depending on the accuracy and configuration of the implementation. My opinion is simply that a boolean metric is not useful as a metric for the use case that you presented. A utilization (or probability) metric or a latency based metric would be more useful for such decisions. I could always be wrong about this and we don't have to get stuck on this particular detail. It's possible to revisit later based on real usage experiences.

Comment on lines +245 to +246
Thus, for case (a), we will be asking the client to pause further message sending for the remainder of the second and in
case (b), we will send `0` as the channel read is anyway paused.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't make sense. The previous comment starting "This logic might need some more careful thought." contains more details about the problem to solve. The solution should take all ways how the broker might throttle producing and calculate a single throttling period.

In the previous comment, I suggested frequently calculating the RTT of the client-broker connection using Ping/Pong commands and sending the throttle producer command early enough so that TCP level throttling (switching auto read to false) wouldn't have to take place. The similar solution should be expanded to cover all ways that producers get rate limited. For the broker level throttling, a similar approach could be used. My assumption is that by using the up-to-date RTT information, the broker could predict when the throttling would be needed and send it out considering the lag that there is until the command takes effect.

Besides RTT calculation, It could be useful to add a timestamp field to the CommandPing and CommandPong messages besides the request_id field since that information could be used to also estimate the clock differences between the client and the broker. The timestamps might be useful in calculating how long it takes for a message to arrive on the broker since it was created in the client.

I'm getting more confident that the problem is solvable, but we need more design and experimentation to get it solved. Ignoring this problem would make this PIP not very useful since it wouldn't meet the goal of avoiding the TCP level throttling. That cannot be disabled completely since the broker will quickly run out of memory at high publish rates. I hope that the problem is clear and we are aligned to address it as part of this PIP.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this comment specifically about the case b?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this comment specifically about the case b?

yes

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While I understand that the way broker is handling the situation is different in case b than case a, but from a protocol perspective, there is no difference. In both cases, client gets the ProducerThrottled command and sends back receipt. Client is again expected to stop producing further.

case b can happen due to either memory consumption per connection and pending message count per connection exceeding the configured value via the configs maxMessagePublishBufferSizeInMB and maxPendingPublishRequestsPerConnection respectively. Both of these situations have a direct impact on broker's performance right away and are not just the cause of noisy neighbour to other topics sharing the connection.

Even if we effectively calculate RTT for the connection, we are still left with the challenge to correctly "predict" when the memory would exceed. message count could be easier to do but since this is affecting multiple topics, the message size pattern could be very different across those topics. Thus, in any situation, if we predict when the overall connection would exceed the memory footprint, it would either undershoot or overshoot the limit.

Taking a step back, for case a, we are actually introducing the comment so that we specifically do not need to pause the channel so that the non-breaching topics may continue to produce. In case b, this need doesn't exist.. so I do not see why we would still like to keep the channel open for read? Do you see any specific reason to let the channel read in resumed state and wait for all topics of the connection to send back a throttle receipt? Specially we may be already exceeding physical resources.


### Blocking messages to be sent during throttling

Most of the logic to actually block a message until throttling is lifted resides inside `ProducerImpl::processOpSendMsg`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the current client, this blocking approach is problematic since it also applies to the Pulsar Java client async API. Blocking threads in asynchronous code is something to avoid and there's currently not a great way to do that in the Pulsar Java client.
In the Pulsar Reactive Java client, it required a relatively complex solution to implement non-blocking backpressure on top of the Pulsar Java client async API. It's handled by keeping a limited amount of messages in flight. The way to implement that isn't trivial for Reactive Streams / Project Reactor (one part of implementation is in InflightLimiter class).
It would be great if the Pulsar Java client async API had a non-blocking way for producer flow control.
This is a broad problem that needs a separate PIP.

supported) without worrying about response and then call `changeAutoRead`.
* capture `AbstractTopic::getTotalPublishRateLimitCounter` per publish rate limit counter and add relevant attribute in
the rate limit metric

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The broker changes aren't currently considering how to address multi-producer topics. In that case there's the challenge of "fair queuing". Each producer should get a fair share of the quota. This problem should be addressed in the high level design.
In the current solution, there isn't a perfect solution for "fair queuing", however there's a simple solution for achieving some level of fairness. The current PIP-322 based solution is in

private void unthrottleQueuedProducers(ScheduledExecutorService executor) {
if (!processingQueuedProducers.compareAndSet(false, true)) {
// another thread is already processing unthrottling
return;
}
try {
Producer producer;
long throttlingDuration = 0L;
// unthrottle as many producers as possible while there are token available
while ((throttlingDuration = calculateThrottlingDurationNanos()) == 0L
&& (producer = unthrottlingQueue.poll()) != null) {
producer.decrementThrottleCount();
throttledProducersCount.decrementAndGet();
}
// if there are still producers to be unthrottled, schedule unthrottling again
// after another throttling period
if (throttledProducersCount.get() > 0) {
scheduleUnthrottling(executor, throttlingDuration);
}
} finally {
processingQueuedProducers.set(false);
}
}

For PIP-385, solving fair queuing will most likely be harder.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The broker changes aren't currently considering how to address multi-producer topics. In that case there's the challenge of "fair queuing". Each producer should get a fair share of the quota. This problem should be addressed in the high level design. In the current solution, there isn't a perfect solution for "fair queuing", however there's a simple solution for achieving some level of fairness. The current PIP-322 based solution is in

Can you elaborate more about this concern? Which specific part needs to hand "fair queuing"? Maybe an example could help me understand. But I am answering below based on our past discussions on slack and my understanding of what you mean here..

I am trying to stray away from "fair share" issue all together as it is very difficult to solve in general. From a topic and broker perspective, the only fair thing is to not receive any further messages from any producer. Now, if we try to figure out throttle time differently for different producers based on some historic data (rate/etc), that may seem fair at first. Or even if we actually distribute the rate among all producers, which also may seem fair at first. But there are problems in both the situations:

  • The producer count isn't fixed. It may vary within a single throttling period itself.
  • The producers themselves aren't sticky. Partitioned topics is a very big real world use case and the way it works is that at any given point in time, a partitioned producer is only writing to a single partition. This "time" is defined by the batch wait time and batch max size. This value in practice can be double digit ms. So suppose a topic is throttled for 300ms, effectively, in that 300 ms, only 3 of 10 connected producers would have actually produced.. At a micro level, the distribution is not uniform at all.
  • Given this bursty nature of producers at a sub second granularity, doing either uniform distribution (of permits/throttle time) or historic rate based - both would lead to unfair situation. Thus, i feel first cum first serve might actually work out the fairest.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you elaborate more about this concern? Which specific part needs to hand "fair queuing"? Maybe an example could help me understand. But I am answering below based on our past discussions on slack and my understanding of what you mean here..

Well if there are multiple producer producing to the same topic, the throttling solution will have to throttle all producers somewhat evenly so that all producers get a fair share. It doesn't have to be perfect. Ignoring this aspect would cause the solution to not meet the goals. The goal is to avoid TCP level throttling. That means that with multiple producers, there's a need to send out throttle commands to all producers in a way that they produce about the same amount, thus it's called "fair queuing". If there would be priorities, that information could be later used to give some producers more "shares". Obviously prioritization could be scoped out from this phase. However, it's something that might be useful later and is typically a concern in multi-tenant systems with different QoS levels.

I am trying to stray away from "fair share" issue all together as it is very difficult to solve in general. From a topic and broker perspective, the only fair thing is to not receive any further messages from any producer. Now, if we try to figure out throttle time differently for different producers based on some historic data (rate/etc), that may seem fair at first. Or even if we actually distribute the rate among all producers, which also may seem fair at first. But there are problems in both the situations:

It's not an option to stay away from the problem unless we admit that this solution doesn't support multiple producers.

  • The producer count isn't fixed. It may vary within a single throttling period itself.

The changing number of producers isn't a problem. The throttling solution on the broker side will have to consider numerous changes and the changing number of producers doesn't make that harder.

  • The producers themselves aren't sticky. Partitioned topics is a very big real world use case and the way it works is that at any given point in time, a partitioned producer is only writing to a single partition. This "time" is defined by the batch wait time and batch max size. This value in practice can be double digit ms. So suppose a topic is throttled for 300ms, effectively, in that 300 ms, only 3 of 10 connected producers would have actually produced.. At a micro level, the distribution is not uniform at all.

This will impact accuracy, but it's not a reason to ignore fairness. Let's take a look at the current solution for fairness in PublishRateLimiterImpl. The solution is very simple. When publishing is throttled, the producer will be added to a unthrottling queue. When the throttling period is over, producer will be unthrottled one by one, as long as it doesn't go over the limit again. This doesn't ensure perfect fairness in any way, but the solution for good enough for a starting point. For the throttling command based throttling a similar solution wouldn't really work.
It would have to be designed in a different way. However it doesn't have to be perfect either.

  • Given this bursty nature of producers at a sub second granularity, doing either uniform distribution (of permits/throttle time) or historic rate based - both would lead to unfair situation. Thus, i feel first cum first serve might actually work out the fairest.

This is a challenge in accuracy. We simply need a good enough solution which ensures some level of fairness while the goal of PIP-385 is achieved. We want to avoid having to throttle at the TCP level. As we know, that cannot be removed since clients might misbehave and that could lead to the broker running out of memory or much higher memory unless the challenge is solved.

Unless this was a hard problem to solve we would have added producer throttling much easier. I'm not making up design problems here when I'm bringing up these points. These are essential complexity of the problem domain that must be solved in a good enough way to meet the requirements. I'm sure that we'll get there when we learn more about the problem, experiment with different solutions and continue iterating until we find those good enough solutions. It won't happen just by writing text in a design document. We will need to do many experiments along the way. That's how we will make progress.

One of the starting points could be to describe a good acceptance test plan for PIP-385. That would clarify what we are really expecting from this feature and how we will find out when we have a good enough solution in place.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well if there are multiple producer producing to the same topic, the throttling solution will have to throttle all producers somewhat evenly so that all producers get a fair share. It doesn't have to be perfect. Ignoring this aspect would cause the solution to not meet the goals. The goal is to avoid TCP level throttling. That means that with multiple producers, there's a need to send out throttle commands to all producers in a way that they produce about the same amount, thus it's called "fair queuing". If there would be priorities, that information could be later used to give some producers more "shares". Obviously prioritization could be scoped out from this phase. However, it's something that might be useful later and is typically a concern in multi-tenant systems with different QoS levels.

There seems to be a disconnect here. Let's take a step back.

A topic may have N number of producers, but when the topic beaches the quota, beyond that point, no further messages from any of the N producers should be accommodated. As per the current design, all the connected producers of the topic will get the ProducerThrottled command and all of the connected producers will pause further producer to the topic for the requested time.

I still don't really understand where the "fair queuing" comes into picture here? Since rate limiting works at a topic granularity and not a producer granularity, we can't really do anything fair for the producers. Even if priorities come into picture, I believe, by design, they should be at a topic level and not at producer level.

It's not an option to stay away from the problem unless we admit that this solution doesn't support multiple producers.

Please explain how it doesn't support multiple producers? If current rate limiting supports multiple producers, I am not degrading anything from that. Unless you mean to say that current rate limiting design and implementation in pulsar also doesn't support multiple producers.

Again, @lhotari - I would request you to give an example of what you are calling "fair queuing among producers of a single topic" and how are multiple producers not supported in the current proposal.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There seems to be a disconnect here. Let's take a step back.

A topic may have N number of producers, but when the topic beaches the quota, beyond that point, no further messages from any of the N producers should be accommodated. As per the current design, all the connected producers of the topic will get the ProducerThrottled command and all of the connected producers will pause further producer to the topic for the requested time.

It's not a feasible approach to synchronize the commands across all producer and expect them to be paused all at once. Each producer will receive that command at a different point in time depending on different factors such as queue lengths. Each producer would be also producing at independent rate so the results not be uniform if all producers are paused for the same duration of time.

I still don't really understand where the "fair queuing" comes into picture here? Since rate limiting works at a topic granularity and not a producer granularity, we can't really do anything fair for the producers. Even if priorities come into picture, I believe, by design, they should be at a topic level and not at producer level.

When you have multiple producers producing to the same topic, the expectation is that each producer would be able to produce roughly the same amount of messages. Isn't that a reasonable expectation?

Please explain how it doesn't support multiple producers? If current rate limiting supports multiple producers, I am not degrading anything from that. Unless you mean to say that current rate limiting design and implementation in pulsar also doesn't support multiple producers.

In the current solution fair queuing is relatively easy to achieve since there isn't a long feedback cycle in the way how rate limiting is controlled with the TCP level throttling. In the producer throttle command based solution, there's a relatively long delay from the point in time when the command is sent to the producer until it impacts the rate limiting. That changes the dynamics in a way that makes it harder to avoid the buffers growing on the broker side and hitting limits which require TCP level throttling. The producer throttle command will work only in the case that it's able to keep the buffers on the broker side in control so that the limits aren't hit. When the producer produces faster than the limit, it will be constantly hitting the limit and there will be a need to throttle and un-throttle to keep the rate between the limits. The broker will have to predict when to send the command so that this strategy would be effectively.

Again, @lhotari - I would request you to give an example of what you are calling "fair queuing among producers of a single topic" and how are multiple producers not supported in the current proposal.

I hope that the above explanations provide more clarity. Let's say that you have a rate limit of 1 MB/s and there are 3 producers currently sending at 500kB/s, 100kB/s and 5MB/s. The "fair queuing" aspect of this is that the expectation is that each producer is able to produce about 333kB/s. If all producers would be pausing the same amount of time, the producer with the highest current rate would be getting most of the bandwidth. The producer with the highest throughput would likely also have the longest queue length and that's why sending the producer command would take longer to take effect on that producer. Eventually it would pause for the duration specified in the producer throttle command. The pausing will start at a different point of time and also end at a different point in time. That's not a problem itself. The problem is the lack of fairness.
The reason why I was explaining the importance of having the RTT information is that the commands could be sent early so that the broker buffers says in the limits so that the broker doesn't have to use TCP throttling to prevent the buffers growing over limits. I hope this explanation helps.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not a feasible approach to synchronize the commands across all producer and expect them to be paused all at once. Each producer will receive that command at a different point in time depending on different factors such as queue lengths. Each producer would be also producing at independent rate so the results not be uniform if all producers are paused for the same duration of time.

2 questions here -

  1. from an application (users of that topic) point of view - why are the 2 producers different? a single topic is supposed to be homogeneous use case
  2. How exactly, or rather, on what basis, would it be decided to send throttle command to one producer vs other for the same topic? there is no such mechanism in pulsar right now.

When you have multiple producers producing to the same topic, the expectation is that each producer would be able to produce roughly the same amount of messages. Isn't that a reasonable expectation?

Sure, yes. Since a single topic is supposed to be a homogeneous use case, all producers connected to the topic are supposed to be roughly equal (except for the micro-batching that i explained previously for partitioned topics' partitions).

In the same breath - all producers producing to the same topic are also also equal priority and from an app perspective, it doesn't matter which producer is producing. (this will be used below)

I hope that the above explanations provide more clarity.

No it didn't.

Let's say that you have a rate limit of 1 MB/s and there are 3 producers currently sending at 500kB/s, 100kB/s and 5MB/s. The "fair queuing" aspect of this is that the expectation is that each producer is able to produce about 333kB/s.

Let me ask this - why? Why does it matter that each producer does 333kB/s. At the end of the day, the topic has to get 1MB/s - it can be from any producer (or a combination of) since all producers are equal in nature.

I've already established that there can be micro-batches due to partitioned topics' rolling assignment and batch max wait time etc..

If all producers would be pausing the same amount of time, the producer with the highest current rate would be getting most of the bandwidth. The producer with the highest throughput would likely also have the longest queue length and that's why sending the producer command would take longer to take effect on that producer.

Sure, yes, that's possible that even after accounting for RTT, this highest current rate producer may have higher queue length so the throttle receipt command may take longer to get back - but that's where the config throttleProducerReceiptWaitTimeMillis comes into picture. Where we will simply block the channel read post this time. This will only be relevant for that particular producer.

Eventually it would pause for the duration specified in the producer throttle command. The pausing will start at a different point of time and also end at a different point in time. That's not a problem itself. The problem is the lack of fairness. The reason why I was explaining the importance of having the RTT information is that the commands could be sent early so that the broker buffers says in the limits so that the broker doesn't have to use TCP throttling to prevent the buffers growing over limits. I hope this explanation helps.

I understand the RTT point. It is useful. I still don't understand the need for fairness here. In the example of 3 producers 500Kbps, 100KBps and 5MBps on a topic with 1MBps limit, essentially, the incoming traffic is 5.6MBps and the permitted traffic is 1MBps, so we have to reject/stop the delta - 4.6MBps. Why does it matter if all of this 4.6MBps is being blocked from the 5MBps producer? or any other combination.. There is practically no difference in the end result.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, yes, that's possible that even after accounting for RTT, this highest current rate producer may have higher queue length so the throttle receipt command may take longer to get back - but that's where the config throttleProducerReceiptWaitTimeMillis comes into picture. Where we will simply block the channel read post this time. This will only be relevant for that particular producer.

In the current design of PIP-385, I understand that this relates to the current design "For the case where connection level breaches happen - i.e. breach due to maxPendingPublishRequestsPerConnection, maxMessagePublishBufferSizeInMB or broker level rate limit - we continue to pause the connection, but we still send the ThrottleProducer command in order to inform the client about the reason for any potential timeout. The reason we continue to pause reads is that we are already breaching memory limits, thus, even if the client sends a ThrottleProducerReceipt response, we won't be able to read it until further pending messages before that are read."

this isn't practical at all. It's fine to have such assumptions that it could work this way, but It's not practical. The reason for this is that if you'd wait for the limit to be exceeded and if the command is sent at that point, the producer will continue to produce until it receives and processes the throttling command. If we'd wish to avoid broker buffers from going over limits, the receiving of new messages will have to be throttled with TCP level throttling. TCP level throttling is something that I believe that would need to be avoided so that we could really meet the goals of PIP-385 so that producers and consumers sharing the same TCP connection wouldn't interfere. In the motivation of PIP-385, the first point is "Noisy neighbors - Even if one topic is exceeding the quota, since the entire channel read is paused, all topics sharing the same connect (for example - using the same java client object) get rate limited."

As mentioned in other comments, we don't have to stop making progress. Let's get to the implementation and have good integration test cases to measure whether the goals of PIP-385 are met. That will help us iterate and learn on the go.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this isn't practical at all. It's fine to have such assumptions that it could work this way, but It's not practical. The reason for this is that if you'd wait for the limit to be exceeded and if the command is sent at that point,

I am a bit confused on what you are referring to here? As per the proposal:

  1. topic quota breaches
  2. throttle command is sent to producers of that topic (irrespective of the connection)
  3. broker wait for the throttle receipt from producer until throttleProducerReceiptWaitTimeMillis time
  4. If any producers hasn't responded back (received by broker) within that time frame, we disable channel auto-read for those producers' connections.

As I understand from the discussions so far, the concern is only around the additional messages received during this throttleProducerReceiptWaitTimeMillis duration, and thus, the whole RTT thing came into picture. I've proposed a solution for that as well in the other comment thread.

For case (b) that I have mentioned in the design, the order of events is exactly how it is currently in pulsar, except for the additional command sending to producers:

  1. resource limits breaches for the connection (can be from any topic)
  2. throttle command is sent to producers of that connection (not related to a specific topic now)
  3. we disable channel auto-read for the connection in question.

This is exactly how it behaves today in pulsar, with the addition of the step 2

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
doc-not-needed Your PR changes do not impact docs PIP
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants