Skip to content

Latest commit

 

History

History

Folders and files

NameName
Last commit message
Last commit date

parent directory

..
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

README.md

Orbit

Partitioned Consumer Groups

Artifact 0.1.0 0.1.1 Dependencies Help javadoc Maven Central

Initial implementation of a client-side partitioned consumer group feature for NATS streams leveraging some of the new features introduced in nats-server version 2.11.

Note that post 2.11 versions of nats-server may include new features related to the consumer group use case that could render this client-side library unneeded (or make much smaller)

Overview

This library enables the parallelization through partitioning of the consumption of messages from a stream while ensuring a strict order of not just delivery but also successful consumption of the messages using all or parts of the message's subject as a partitioning key.

In JetStream terms, strictly ordered consumption is achieved when you set the consumer's 'max acks pending' value to 1. However, setting this on a JetStream consumer has the very unfortunate side effect of being very low throughput (limited by the network latency and processing speed) and not being horizontally scalable: only one message is being delivered and processed synchronously at a time from that JetStream consumer, no matter how many instances of the consuming application are deployed.

The library allows the creation of 'consumer groups' on Stream, where each 'member' of the consumer group can consume from the group in parallel (with max acks pending 1 if needed), with the guarantee that in no way more than one message for a particular key can be consumed at the same time. Client applications wanting to consume messages from the group simply do so using a 'member name' and providing a callback. Even if more than one instance of a member is deployed, only one of those instances will be delivered messages at a time.

The library takes care of the partitioning and the mapping of the partitions between the members of the group, the idea being that it is mostly transparent to the consuming application's developers who only need to join a consumer group, providing a member name and a callback to process and acknowledge the message when successfully processed.

NATS Partitioned consumer groups come in two flavors: elastic and static.

Static partitioned consumer groups assume that the stream already has a partition number present as the first token of the message's subjects (something that can be done automatically when messages are stored into to the stream by setting a subject transform for the stream). You can only create and delete static consumer groups. Any change to the consumer group's config in the KV bucket will cause all the member instances for all members of the group to stop consuming.

Elastic partitioned consumer groups on the other hand are implemented differently: the stream doesn't need to already contain a partition number subject token and you can administratively add and drop members from the consumer group's config whenever you want without having to delete and re-create the consumer (like you have to with static consumer groups).

In both cases In both cases you must specify when creating the consumer group the maximum number of members for the group (which is actually the number of partitions used when partitioning the messages), plus a list of "members" (named instances of the consuming application). The library takes care of distributing the members over the list of partitions using either a 'balanced' distribution (the partitions are evenly distributed between the members) or 'mappings' (where you assign administratively the mappings of partitions to the members). The membership list or mappings must be specified once at consumer group creation time for static consumer groups, but can be changed at any time for elastic consumer groups.

Each consumer groups has a configuration which is stored in a KV bucket (named static-consumer-groups or elastic-consumer-groups).

Static

Static consumer groups operate on a stream where the partition number has already been inserted in the subject as the first token of the messages. In this mode of operation, the library creates JetStream consumers (one per member of the group) directly on the stream. This is not elastic: you create the consumer with a list of members once, and you can not adjust that membership list or mapping for the life of the consumer group (if you want to change the mapping, up to you to delete and re-create the static partitioned consumer group, and to figure out which sequence number you may want this new static partitioned consumer group to start from).

Elastic

Elastic consumer groups operate on any stream, the messages in the stream do not have the partition number present in their subjects. The membership list (or mapping) for the consumer can be adjusted administratively at any time and up to the max number of members defined initially. The consumer group library in this case creates a new work-queue stream that sources from the stream, inserting the partition number subject token on the way. The consumer group library takes care of creating this sourced stream and managing all the consumers on this stream according to the current membership, the developer only needs to provide a stream name, consumer group name and a member name and callback and make sure to ack the messages. You can specify (at creation time) a maximum size (in number of messages or bytes) for this working queue stream, but be aware that once this stream has reached its limit, it will pause the sourcing for at least 1 second (expecting messages to be consumed from the consumer group, thereby making room for more messages to be sourced) so you will want to set this value to more than 1 second's worth of message consumption by the clients of the consumer group or this could result in small delays in the consumption of messages from the consumer group.

High availability

You can deploy and run multiple instances of the consuming application using the same member name, in that case only one of the running instances of the member will be 'pinned' and have messages delivered to it (thereby the other instances are effectively in hot standby). There are functions (ElasticMemberStepDown() and StaticMemberStepDown) to force a change of the currently pinned member instance.

The importance of AckWait for reactivity to faults

The timers related to how quickly consumer group member instances react to faults (for example the currently pinned instance getting killed or suspended for an extended period of time) are related and derived from the AckWait value passed when joining the consumer group to consume messages, due to the limit of nats.go current implementation of Consume, if the AckWait value passed is less than 2 seconds and the consumer group is caught up with the head of the stream, then you may see some (slow and harmless) flapping of the active instance for the members in the consumer group. If the AckWait value passed is 0 then the default AckWait value of 5 seconds is used. Note that in the case of static consumer groups without acknowledgements, you can adjust the Pinned TTL value by specifying an AckWait value in the ConsumerConfig you pass to static consume.

Using Partitioned Consumer Groups

For the client application programmer, there is one basic functionality exposed by both static and elastic partitioned consumer groups: join and consume messages (when selected) from a named consumer group on a stream by specifying a member name, a regular JetStream consumer config, and a callback. The library takes care of stripping the partition number token from the subject such that you can use any existing callback code you may already have as is.

There are also administrative functions to create and delete consumer groups, plus, in the case of elastic consumer groups only, the ability to add/drop members or to change the custom member to partition mappings on an existing elastic consumer group.

CLI

Included is a small command line interface tool, named cg and located in the cli directory, that allows you to manage consumer groups, as well as test or demonstrate the functionality.

This cg CLI tool can be used by passing it commands and arguments directly, or with an interactive prompt using the prompt command (e.g. cg static prompt).

For more details on the CLI visit the Partitioned Consumer Groups CLI Project

Binaries

You can download the latest cg.jar archived in a tar file cg.tar or zip file cg.zip

Demo walkthrough

Static

Create a stream "foo" that automatically partitions over 10 partitions using static_stream_setup.sh, then generate some traffic (a new message every 10ms) for that stream using generate_traffic.sh.

Create a static consumer group named "cg" on the stream in question, with two members defined called "m1" and "m2": java -jar cg.jar static create balanced foo cg 10 '>' m1 m2

Start consuming messages with a simulated processing time of 20ms from an instance of member "m1": java -jar cg.jar static consume foo cg m1 --sleep 25ms. Run in another window cg again to consume as member m2 a second, run multiple instances of members m1 and m2, kill the active one (the one receiving messages) and watch as one of the other instances takes over.

Elastic

Create a stream 'foo' that captures messages on the subjects foo.*, then generate some traffic (a new message every 10ms) for that stream using generate_traffic.sh.

Create an elastic consumer group named "cg", partitioning over 10 partitions using the second token (first * wildcard in the filter "foo.*") in the subject as the partitioning key: java -jar cg.jar elastic create foo cg 10.

At this point the elastic consumer group is created, but no members have been added to it yet. But you can start instances of your consuming members already (e.g. java -jar cg.jar elastic consume foo cg m1 for an instance of a member "m1"), for example start instances of members "m1", "m2" and "m3". At this point none of those members are receiving messages.

Add "m1" and "m2" to the membership: java -jar cg.jar elastic add foo cg m1 m2, see how they start receiving messages. Then drop "m1" from the membership java -jar cg.jar elastic drop foo cg m1, add it again, and each time watch as the consumer starts and stops receiving messages, run another consumer "m3" and add/drop it from the membership, etc...

As soon as the elastic consumer group is created, you can start instances of consuming clients (e.g. java -jar cg.jar elastic consume foo cg m1), and they will start to consume messages as soon as (and as long as) they are in the group's membership.

Example

To start consuming from a static consumer group, you call StaticConsumerGroup.consume. To start consuming from an elastic consumer group you call ElasticConsumerGroup.consume. These calls will return a ConsumerGroupConsumeContext. Assuming no exception is thrown,this will create a few threads to handle handles consumption and monitoring for changes in the consumer group's config.

e.g. for static

ConsumerGroupConsumeContext ctx = StaticConsumerGroup.consume(nc, streamName, consumerGroupName, memberName, messageHandler, consumerConfig);

The arguments are:

  • nc is a NATS connection object.

  • streamName is the name of the Stream on which the consumer group has been created.

  • consumerGroupName is the name of the consumer group that has been created on the stream.

  • memberName is the name of the member you want to join the consumer group as.

  • messageHandler is a callback function that gets invoked and passed the messages for consumption. Note that if you are using an elastic consumer group you must explicitly acknowledge (positively or negatively) the message in your callback.

  • config is a regular JetStream consumer config to use by the library as a template when actually creating the JetStream consumers. For elastic consumers the acknowledgement policy must be explicit. For static consumer groups, it doesn't have to, but if you want to do strictly one at a time processing, you will need to use explicit acks in order for max acks pending 1 to apply. Note that this configuration being used as template means that some of the values will be overwritten and can be left empty (e.g. names and durable names, filters, priority groups) or will be overwritten. Note that there is a relationship that must be maintained between the ack wait time, the consumer fetch time out, and the pinned TTL values to avoid 'flapping' of the pinned client so those timeout and TTL values are computed from the value of AckWait passed in the ConsumerConfig, and as such the failover time between instances of the same member are affected by the AckWait value.

  • The ConsumerGroupConsumeContext returned lets you stop the consumption and get a future that completes when the consumer stops (e.g. when explicitly stopped, or when the consumer group gets deleted) or an error occurs.

You can look at the cg CLI tool's source code for examples of how to create and consume for both static and elastic consumer groups.

Requirements

Partitioned consumer groups require NATS server version 2.11 or above.


Copyright (c) 2025 Synadia Communications Inc. All Rights Reserved. See LICENSE and NOTICE file for details.