Skip to content

PIP-341: Pluggable client metrics tracker interface #22145

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
305 changes: 305 additions & 0 deletions pip/pip-341.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,305 @@
# PIP-341: Pluggable client metrics tracker interface

# Background knowledge

The Pulsar Java client records stats/metrics when `statsIntervalSeconds` is configured to a positive number (default is 60 seconds). The current implementation (`ProducerStatsRecorderImpl`/`ConsumerStatsRecorderImpl`) will perodically log stats and reset them based on this interval.

The user can manually retrieve the stat values by calling `getStats()` on the actual `Consumer`/`Producer` objects, however this will just return a snapshot of the metrics at that instant.

The current tracked `Consumer` stats are:
```java
// Current window value
long getNumMsgsReceived()
long getNumBytesReceived()
long getNumAcksSent()
long getNumAcksFailed()
long getNumReceiveFailed()
long getNumBatchReceiveFailed()
double getRateMsgsReceived()
double getRateBytesReceived()

// Cumulative total
long getTotalMsgsReceived()
long getTotalBytesReceived()
long getTotalReceivedFailed()
long getTotaBatchReceivedFailed()
long getTotalAcksSent()
long getTotalAcksFailed()

// Current value
Integer getMsgNumInReceiverQueue()
Map<Long, Integer> getMsgNumInSubReceiverQueue()
```

The current tracked `Producer` stats are:
```java
// Current window value
long getNumMsgsSent()
long getNumBytesSent()
long getNumSendFailed()
long getNumAcksReceived()
double getSendMsgsRate()
double getSendBytesRate()
double getSendLatencyMillis50pct()
double getSendLatencyMillis75pct()
double getSendLatencyMillis95pct()
double getSendLatencyMillis99pct()
double getSendLatencyMillis999pct()
double getSendLatencyMillisMax()

// Cumulative total
long getTotalMsgsSent()
long getTotalBytesSent()
long getTotalSendFailed()
long getTotalAcksReceived()

// Current value
int getPendingQueueSize()
```

If connecting to a partitioned topic, then `PartitionedTopicProducerStatsRecorderImpl`/`MultiTopicConsumerStatsRecorderImpl` recorders are also created and these recorders simply aggregate the stat values from the per-partition stat recorders.

# Motivation
The current implementation poses several challenges:
1. Using the `Consumer`/`Producer` `getStats()` API will likely result in data loss as the reset interval is done internally, so it is difficult to perfectly call the API to capture all data in the window
2. No pluggable mechanism to support other metric frameworks (ex. OTel, custom in-house solution, etc)
3. When enabled, the recorder(s) will record all stats even if the user only wants to track certain stats

# Goals

## In Scope
- Provide a pluggable client metrics tracker interface
- Some default implementation(s): `LoggingMetricsTracker` which replaces current behavior
- User can provide their own custom implementation
- Compatibility with existing public `Producer`/`Consumer`'s `getStats` API

## Out of Scope
- OTeL tracker (TBD if it should be included in this PIP or a separate one)

# High Level Design
We will add a simple metrics tracking interface for client/producer/consumer which will contain methods to record certain events in each class (ex. `recordProducerCreated`, `recordMessagesSent`, `recordMessageReceived`, etc). The underlying client/producer/consumer will call these methods, and the implementing class can decide on how to handle the event(s).

We will add a `MetricsTrackerFactory` interface at the `PulsarClient` level which will create the individual metrics trackers whenever a client/producer/consumer is created. This `MetricsTrackerFactory` will be configurable by the user when creating a `PulsarClient`.

# Detailed Design

## Design & Implementation Details
### MetricsTrackerFactory
The `MetricsTrackerFactory` is responsible for creating the metrics trackers, but also receiving any external configurations required by the implementation via a `configure` method:
```java
public interface MetricsTrackerFactory {
void configure(Map<String, String> params);

PulsarClientMetricsTracker create(PulsarClient client);

ProducerMetricsTracker create(PulsarClient client, Producer<?> producer);

ConsumerMetricsTracker create(PulsarClient client, Consumer<?> consumer);
}
```
`PulsarClientMetricsTracker#create` is called when a `PulsarClient` is created, while `ProducerMetricsTracker#create`/`ConsumerMetricsTracker#create` is called when a `Producer`/`Consumer` is created. The underlying client/producer/consumer object is passed as a parameter so that the metric tracker can query information such as topic or subscription name.

`MetricsTrackerFactory` will be configured in `ClientBuilder`, and the user can either provide a fully instantiated object or provide the class name along with any extra configurations:
```java
ClientBuilder metricsTrackerFactory(MetricsTrackerFactory metricsTrackerFactory);

ClientBuilder metricsTrackerFactory(String metricsTrackerFactoryClassName, Map<String, String> params)
```
The 2nd API (class name + params) will attempt to create the provided class and call the `MetricsTrackerFactory#configure` method with the provided params. This is useful for configuration-driven users, and this approach is similar to the one taken in the `Authentication` implementation.

### PulsarClientMetricsTracker
`PulsarClientMetricsTracker` is responsible for recording events occurring in the `PulsarClient`.
```java
public interface PulsarClientMetricsTracker {
void recordProducerCreated(Producer<?> producer);

void recordConsumerCreated(Consumer<?> consumer);

void close();
}
```

### ConsumerMetricsTracker
`ConsumerMetricsTracker` is responsible for recording events occurring in the consumer.
```java
public interface ConsumerMetricsTracker {
void recordMessageReceived(Message<?> message);

void recordAcksSent(long numAcks);

void recordAckFailed();

void recordReceiveFailed();

void recordBatchReceiveFailed();

void close();

ConsumerStats getStats();
}
```

### ProducerMetricsTracker
`PulsarClientMetricsTracker` is responsible for recording events occurring in the producer.
```java
public interface ProducerMetricsTracker {
void recordMessagesSent(long numMsgs, long totalMsgsSize);

void recordSendFailed();

void recordAckReceived(long sendTimeNs);

void close();

ProducerStats getStats();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to force this new interface implementation to implement that?

}
```

### Metric tracker creation in producer/consumer
While `PulsarClientMetricsTracker` creation fits naturally in the `PulsarClientImpl` class, the producer/consumer metric tracker is more interesting as the producer/consumer class structure is a bit more complex.

Producer/consumer have base classes `ProducerBase`/`ConsumerBase`, and both the single-topic/partition classes (`ProducerImpl`/`ConsumerImpl`) and the multi-topic/partitioned wrapper classes (`PartitionedProducerImpl`/`MultiTopicsConsumerImpl`) extend the `ProducerBase`/`ConsumerBase` class. We need a metrics tracker to record events in `ProducerImpl`/`ConsumerImpl`, but we also need to record events occurring in `PartitionedProducerImpl`/`MultiTopicsConsumerImpl`. For example, `batchReceiveFailed` is recorded at [MultiTopicsConsumerImpl.java#L404](https://github.com/apache/pulsar/blob/fc2e314c3560eb5011ab3e5e3ebf66fa0b9e6d4e/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java#L404).

Since we need to record events in both layers, we can create and store the metric trackers in the base classes `ProducerBase`/`ConsumerBase`, so that both `ProducerImpl`/`ConsumerImpl` and `PartitionedProducerImpl`/`MultiTopicsConsumerImpl` will have its own metric tracker.

For a non-partitioned topic, there will only be one producer/consumer metrics tracker. For partitioned topics, there will be one producer/consumer tracker per partition (topic name `{partitionedTopicName}-partition-{partitionNum}`), and an additional tracker on the multi-topic/partitioned wrapper (topic name `{partitionedTopicName}`). It is up to the metric tracker implementation to decide on how to handle this (ex. aggregation at `partitionedTopicName` level).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might need to make it easy for implementors to prepare a partition number label if it is a partition. We can have another interface targeted at that or some other means. Let's avoid topic parsing :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could add partitionIndex as a parameter to the MetricsTrackerFactory#create producer/consumer metrics tracker methods. Value will be -1 for non-partitioned topics. This is the same approach as Producer/ConsumerImpl creation

* @param partitionIndex partition index of a partitioned topic. the value -1 is used for non-partitioned topics.
* @param conf producer configuration
* @param schema topic schema
* @param interceptors producer interceptors
* @param producerCreatedFuture future for signaling completion of async producer creation
* @param <T> message type class
*
* @return a producer instance
*/
protected <T> ProducerImpl<T> newProducerImpl(String topic, int partitionIndex,
ProducerConfigurationData conf,
Schema<T> schema,
ProducerInterceptors interceptors,
CompletableFuture<Producer<T>> producerCreatedFuture,
Optional<String> overrideProducerName) {
return new ProducerImpl<>(PulsarClientImpl.this, topic, conf, producerCreatedFuture, partitionIndex, schema,
interceptors, overrideProducerName);
}


### Error handling
Exceptions thrown by the metric tracker methods may impact internal producer/consumer functionality so it may be ideal to wrap all calls with a try/catch. This can be implemented by adding a wrapper which will hold the metric tracker and wrap each call with a try/catch (similar approach as interceptors).

```java
public class ProducerMetricsTrackerWrapper implements Closeable {
private static final Logger log = LoggerFactory.getLogger(ProducerMetricsTrackerWrapper.class);

private final ProducerMetricsTracker metricsTracker;

public ProducerMetricsTrackerWrapper(ProducerMetricsTracker metricsTracker) {
this.metricsTracker = metricsTracker;
}

public void recordMessagesSent(long numMsgs, long totalMsgsSize) {
try {
this.metricsTracker.recordMessagesSent(numMsgs, totalMsgsSize);
} catch (Throwable e) {
log.warn("Error executing producer metrics tracker recordMessagesSent call ", e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably throttled.

}
}

// Other ProducerMetricsTracker methods...
}
```

A `ConsumerMetricsTrackerWrapper` will be used for the consumer.

### No-Op implementation
To avoid unnecessary null checks, we will provide a `NoOpMetricsTrackerFactory` which is configured if no factory has been provided. It will create `NoOpProducerMetricsTracker` and `NoOpConsumerMetricsTracker` metric trackers which will have empty implementations to do nothing for each event.

### LoggingMetricsTrackerFactory
The default configured metrics tracker factory will be the `LoggingMetricsTrackerFactory` as it will replace the existing functionality.

The logging producer/consumer metrics tracker will initialize and hold an instance of the underlying stats recorder, and forward all calls to the stats recorder.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer to do that in the infrastructure and not a concrete implementation.
and after 2 versions, remove it.


Example:
```java
public class LoggingConsumerMetricsTracker implements ConsumerMetricsTracker {
private final ConsumerStatsRecorder statsRecorder;

public LoggingConsumerMetricsTracker(PulsarClient pulsarClient, Consumer<?> consumer) {
if (consumer instanceof MultiTopicsConsumerImpl) {
this.statsRecorder = new MultiTopicConsumerStatsRecorderImpl(consumer);
} else if (consumer instanceof ConsumerImpl){
this.statsRecorder = new ConsumerStatsRecorderImpl((PulsarClientImpl) pulsarClient, null, consumer);
} else {
this.statsRecorder = new ConsumerStatsDisabled();
}
}

@Override
public void recordMessageReceived(Message<?> message) {
statsRecorder.updateNumMsgsReceived(message);
}

// Other methods...
}
```

This allows us to completely replace all existing calls to the stat recorders with our metrics trackers, while still maintaining backwards compatibility with the previous behavior.

### Backwards compatibility with statsIntervalSeconds configuration
We can mark the `statsIntervalSeconds` client configuration as deprecated since implementation-specific configurations should be provided via the params map when constructing the metrics tracker in `ClientBuilder` (`statsIntervalSeconds` is specific to `LoggingMetricsTrackerFactory`).

We will set the default metrics tracker factory as the `LoggingMetricsTrackerFactory` to maintain backwards compatibility, however we still need to do some checks:
1. If a non-default non-null metrics tracker factory (ex. OTel one) is configured, then use it
2. Else if `statsIntervalSeconds=0`, then the user has explicitly disabled client metrics so we must use the `NoOpMetricsTrackerFactory`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As said before, I wouldn't couple those two.

3. Else, use `LoggingMetricsTrackerFactory`

Inside `LoggingMetricsTrackerFactory`, we must also derive `statsIntervalSeconds`:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see it a bit differently.
The producerStats, consumerStats, and its matching interval config will be marked deprecated and removed in 2 subsequent versions.
Users can choose a LoggingMetricsTrackerFactory, which they configure with a new parameter, or build on their own and provide the factory instance, already configured.

1. First try to use `statsIntervalSeconds` if it is configured via the new params map in `ClientBuilder metricsTrackerFactory(String metricsTrackerFactoryClassName, Map<String, String> params)`
2. Otherwise, try to use the deprecated `statsIntervalSeconds` value if it is greater than 0
3. Finally, default to 60 seconds if `statsIntervalSeconds` is not configured anywhere

### Supporting existing Consumer/Producer getStats API
There are two major challenges with supporting the existing producer/consumer `getStats()` API:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you have the two mechanisms in place, this problem disappears. This implementation will be deleted in two versions from now.

1. The returned object depends on whether the topic is partitioned or non-partitioned (`PartitionedTopicProducerStatsRecorderImpl`/`MultiTopicConsumerStatsRecorderImpl` vs `ProducerStatsRecorderImpl`/`ConsumerStatsRecorderImpl`). This is problematic for our metrics tracker interface as it means the user must know when to provide the partitioned vs non-partitioned stats recorder by checking the producer/consumer class type.
2. The existing stat recorders do not provide setter/builder/supplier methods to construct a stats recorder with specific values (they only provide the increment methods). This is problematic as the user will have to maintain an updated stats recorder instance in the metrics tracker, which can result in duplicate event recording (one call to stats recorder, and another call to the user's implementation for each event).

We can alleviate these issues by making two changes:
1. Consolidate to a new stats supplier class to handle both partitioned & non-partitioned cases
2. Add setter/builder/supplier methods to the new stats supplier class to allow the metrics tracker to supply stat values

#### Consolidated ProducerStatsSupplier/ConsumerStatsSupplier
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the context here. Maybe you should explain in background knowledge what are those suppliers.

We are able to consolidate into a single class as the partitioned stats recorder just aggregates the stat values across all partitions by maintaining a `partitionStats` map of partition to stats object. Furthermore, this map is only populated & aggregated (via `updateCumulativeStats`) when the user calls `getStats()`. All of this logic can be handled in a single class as we can simply check the `partitionStats` map to determine if the stats supplier is handling a partitioned vs non-partitioned topic (`partitionStats` map is empty or not), and aggregate accordingly.

#### Setter/builder/supplier methods
Calling producer/consumer `getStats()` will forward the call to our metrics tracker's `getStats()` method, so we need a way for users to construct a `ProducerStats`/`ConsumerStats` object. We can do this by adding setter/builder/supplier methods to our new `ProducerStatsSupplier/ConsumerStatsSupplier` class so the user can asynchronously provide the values on-demand as opposed to having to maintain an updated instance of the recorder on each event recording.

## Public-facing Changes
A new set of metrics tracker interfaces will be added:
- `MetricsTrackerFactory`
- `PulsarClientMetricsTracker`
- `ProducerMetricsTracker`
- `ConsumerMetricsTracker`

`ClientBuilder` will have new methods for configuring the metrics tracker factory:
```java
ClientBuilder metricsTrackerFactory(MetricsTrackerFactory metricsTrackerFactory);

ClientBuilder metricsTrackerFactory(String metricsTrackerFactoryClassName, Map<String, String> params)
```

And `ProducerStatsSupplier`/`ConsumerStatsSupplier` are new implementations of the `ProducerStats`/`ConsumerStats` interfaces.

# Backward & Forward Compatibility

## Revert
Downgrading Pulsar will simply remove the newly added metrics tracker, and revert the functionality back to using `statsIntervalSeconds` to determine if producer/consumer stats recorder is enabled.

## Upgrade
There are no specific upgrade steps required aside from configuring the desired metrics tracker as we will ensure backwards compatibility.

# Alternatives

## Producer/ConsumerStats compatibility strategies
We have a few options for compatibility between the metrics tracker and the old producer/consumer stats system:
1. Maintain full compatibility with the existing Producer/Consumer `getStats()` API (this is the current proposal)
2. Metrics tracker is a completely separate flow, and deprecate all of the old producer/consumer stats system
3. Deprecate the `getStats()` API, and only implement it in the default `LoggingMetricsTracker`

Option 1 is ideal if we have to maintain compatibility with the existing `getStats()` API.

Option 2 works if we decide to completely deprecate the old stats system.

Option 3 is a "meet-in-the-middle" solution although it feels awkward as it would add an already deprecated method (`getStats()`) to the new metrics tracker interface.

Ultimately, it all depends on if we need to provide support for the `getStats()` API.

## Supporting multiple metrics trackers
If we want to support configuring multiple metrics trackers, then we again need to look at the `Producer`/`Consumer`'s `getStats()` API:
1. We need to define what the expected behavior is when calling `getStats()` if multiple metrics trackers are configured (ex. which one will be returned if any?)
2. If we can deprecate the `getStats()` API, then there is no issue with supporting multiple metrics trackers

# General Notes

# Links
* Mailing List discussion thread:
* Mailing List voting thread: