[FEATURE] Support message TTL and dead letter queue #832

@GhostBoyBoy

References:
1. https://www.rabbitmq.com/ttl.html
2. https://www.rabbitmq.com/dlx.html

Implementation:

  1. The queue is created with the parameter set to PersistentQueue, and the Pulsar topic is fetched at creation time. Expired entries can then be found by going through topic -> managedLedger -> cursor and using an API similar to cursor.asyncFindNewestMatching:
     cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries, entry -> {
         try {
             // Read the publish timestamp from the entry and compare it with the TTL.
             long entryTimestamp = Commands.getEntryTimestamp(entry.getDataBuffer());
             return MessageImpl.isEntryExpired(messageTTLInSeconds, entryTimestamp);
         } catch (Exception e) {
             log.error("[{}][{}] Error deserializing message for expiry check", topicName, subName, e);
         } finally {
             // Always release the entry, whether or not the check succeeded.
             entry.release();
         }
         // Entries that could not be parsed are treated as not expired.
         return false;
     }, this, null);
  2. A periodic task is needed to keep checking for expired messages. According to the RabbitMQ TTL definition, it only needs to check whether the message at the head of the queue has expired.
  3. According to the definition of a dead letter queue, messages must be sent to the dead letter queue when they expire. The dead letter queue parameters are maintained by AmqpQueue, so the periodic task from step 2 can send expired messages directly to the dead letter queue if the user has configured one; otherwise it deletes them from the queue.
  4. When both a per-queue and a per-message TTL are specified, the lower of the two values is used.
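
Steps 2 to 4 can be sketched in plain Java, independent of Pulsar. This is only an illustration under stated assumptions: the queue is an in-memory Deque, and the names TtlExpirySketch, Msg, effectiveTtl and expireHead are hypothetical, not part of AmqpQueue or any existing API. A null dead letter list stands for "no dead letter queue configured".

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.OptionalLong;

class TtlExpirySketch {

    /** Hypothetical in-memory message: publish time plus an optional per-message TTL (ms). */
    static final class Msg {
        final String body;
        final long publishTimeMillis;
        final OptionalLong perMessageTtlMillis;

        Msg(String body, long publishTimeMillis, OptionalLong perMessageTtlMillis) {
            this.body = body;
            this.publishTimeMillis = publishTimeMillis;
            this.perMessageTtlMillis = perMessageTtlMillis;
        }
    }

    /** Lower of the per-queue and per-message TTL; empty means the message never expires. */
    static OptionalLong effectiveTtl(OptionalLong queueTtl, OptionalLong messageTtl) {
        if (queueTtl.isPresent() && messageTtl.isPresent()) {
            return OptionalLong.of(Math.min(queueTtl.getAsLong(), messageTtl.getAsLong()));
        }
        return queueTtl.isPresent() ? queueTtl : messageTtl;
    }

    /**
     * One run of the periodic task: only the head of the queue is inspected, and the
     * loop stops at the first message that has not expired. Expired messages are moved
     * to deadLetterQueue when it is non-null, otherwise they are dropped.
     * Returns the number of messages removed.
     */
    static int expireHead(Deque<Msg> queue, OptionalLong queueTtl,
                          List<Msg> deadLetterQueue, long nowMillis) {
        int expired = 0;
        while (!queue.isEmpty()) {
            Msg head = queue.peekFirst();
            OptionalLong ttl = effectiveTtl(queueTtl, head.perMessageTtlMillis);
            if (!ttl.isPresent() || nowMillis - head.publishTimeMillis < ttl.getAsLong()) {
                break; // head is still alive, so nothing behind it is examined
            }
            queue.pollFirst();
            if (deadLetterQueue != null) {
                deadLetterQueue.add(head);
            }
            expired++;
        }
        return expired;
    }

    public static void main(String[] args) {
        Deque<Msg> queue = new ArrayDeque<>();
        queue.add(new Msg("a", 0L, OptionalLong.of(500)));   // effective TTL 500 ms
        queue.add(new Msg("b", 0L, OptionalLong.empty()));   // effective TTL 1000 ms
        queue.add(new Msg("c", 0L, OptionalLong.of(100)));   // effective TTL 100 ms
        List<Msg> dlq = new ArrayList<>();
        int removed = expireHead(queue, OptionalLong.of(1000), dlq, 600L);
        System.out.println(removed + " expired, " + queue.size() + " remaining");
    }
}
```

In this sketch message "c" stays in the queue even though its own TTL has passed, because the check stops at the unexpired head "b". That matches the head-only check described in step 2.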

Implementation of the dead letter queue. A message is dead-lettered when any of the following happens:
1. The message is negatively acknowledged by a consumer using basic.reject or basic.nack with the requeue parameter set to false; or
2. The message expires due to per-message TTL; or
3. The message is dropped because its queue exceeded a length limit.
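
The first condition could be expressed as a small decision function. The sketch below is an assumption about how the handling might look, not existing code: DeadLetterSketch, Outcome and onNegativeAck are hypothetical names, and dlqConfigured stands for "the user configured a dead letter queue on this queue".

```java
class DeadLetterSketch {

    /** What happens to a message after basic.reject or basic.nack. */
    enum Outcome { REQUEUE, DEAD_LETTER, DROP }

    /**
     * requeue = true puts the message back on the queue. requeue = false removes it:
     * it is dead-lettered when a dead letter queue is configured, otherwise discarded.
     */
    static Outcome onNegativeAck(boolean requeue, boolean dlqConfigured) {
        if (requeue) {
            return Outcome.REQUEUE;
        }
        return dlqConfigured ? Outcome.DEAD_LETTER : Outcome.DROP;
    }

    public static void main(String[] args) {
        System.out.println(onNegativeAck(false, true));
        System.out.println(onNegativeAck(false, false));
        System.out.println(onNegativeAck(true, true));
    }
}
```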

For 1: Implement the API semantics of basic.reject and basic.nack.
For 2: As described above.
For 3:
The maximum number of messages can be set by supplying the x-max-length queue declaration argument with a non-negative integer value.
The maximum length in bytes can be set by supplying the x-max-length-bytes queue declaration argument with a non-negative integer value.
Both parameters are available in AmqpQueue; maybe this can be done with the backlog.
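
To make case 3 concrete, here is a minimal in-memory sketch of enforcing both limits by removing messages from the head of the queue, which is RabbitMQ's default overflow behavior. It does not use the Pulsar backlog; MaxLengthSketch and enforceLimits are hypothetical names, and a negative limit is used here to mean "argument not set". The returned messages are the ones that would be dead-lettered or deleted.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class MaxLengthSketch {

    /**
     * Removes messages from the head of the queue until both limits hold.
     * maxLength mirrors x-max-length, maxLengthBytes mirrors x-max-length-bytes.
     * Returns the removed messages in removal order.
     */
    static List<byte[]> enforceLimits(Deque<byte[]> queue, long maxLength, long maxLengthBytes) {
        long totalBytes = 0;
        for (byte[] message : queue) {
            totalBytes += message.length;
        }
        List<byte[]> dropped = new ArrayList<>();
        while (!queue.isEmpty()
                && ((maxLength >= 0 && queue.size() > maxLength)
                    || (maxLengthBytes >= 0 && totalBytes > maxLengthBytes))) {
            byte[] head = queue.pollFirst();
            totalBytes -= head.length;
            dropped.add(head);
        }
        return dropped;
    }

    public static void main(String[] args) {
        Deque<byte[]> queue = new ArrayDeque<>();
        queue.add(new byte[4]);
        queue.add(new byte[4]);
        queue.add(new byte[4]);
        List<byte[]> dropped = enforceLimits(queue, 2, -1);
        System.out.println(dropped.size() + " dropped, " + queue.size() + " remaining");
    }
}
```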

Labels: type/feature (indicates new functionality)
