[feature] Message payload processor PIP-96 #1011

Closed


ksankeerth

Hi Team,

This PR introduces the message payload processor feature (PIP-96) to the Go client.

  • added support for a custom payload processor
  • added a default payload processor
  • added a byte slice reader to process messages, similar to the Java client

(In Java, the methods for reading messages are implemented as part of MessagePayloadContext: https://github.com/apache/pulsar/blob/master/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessagePayloadContext.java#L65-#L79
In this PR, MessagePayloadContext only carries the information, and the methods above are moved into a separate reader.)

  • added tests for the Pulsar message format and a custom message format,
    similar to the Java implementation
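
A minimal sketch of the split described above, in which the context only carries information and a separate byte slice reader does the reading. Every name here (PayloadContext, PayloadProcessor, sliceReader, LengthPrefixedProcessor) is hypothetical and is not taken from the PR, and the 4-byte length-prefixed framing is an invented stand-in for a custom entry format, not the Pulsar batch format.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// PayloadContext (hypothetical name) only carries information about the entry;
// it deliberately has no read methods.
type PayloadContext struct {
	NumMessages int // number of messages contained in the entry
}

// PayloadProcessor (hypothetical name) turns a raw entry payload into
// individual messages and passes each one to consume.
type PayloadProcessor interface {
	Process(ctx PayloadContext, payload []byte, consume func(msg []byte)) error
}

var errShortBuffer = errors.New("payload too short")

// sliceReader (hypothetical) walks a byte slice record by record.
type sliceReader struct {
	buf []byte
	pos int
}

// next returns the next record: a 4-byte big-endian length, then that many bytes.
func (r *sliceReader) next() ([]byte, error) {
	if len(r.buf)-r.pos < 4 {
		return nil, errShortBuffer
	}
	n := int(binary.BigEndian.Uint32(r.buf[r.pos:]))
	r.pos += 4
	if n < 0 || n > len(r.buf)-r.pos {
		return nil, errShortBuffer
	}
	rec := r.buf[r.pos : r.pos+n]
	r.pos += n
	return rec, nil
}

// LengthPrefixedProcessor is a toy custom processor for the invented framing.
type LengthPrefixedProcessor struct{}

func (LengthPrefixedProcessor) Process(ctx PayloadContext, payload []byte, consume func(msg []byte)) error {
	r := &sliceReader{buf: payload}
	for i := 0; i < ctx.NumMessages; i++ {
		rec, err := r.next()
		if err != nil {
			return fmt.Errorf("message %d: %w", i, err)
		}
		consume(rec)
	}
	return nil
}

// encode builds a payload in the invented framing (helper for the demo).
func encode(msgs ...string) []byte {
	var out []byte
	for _, m := range msgs {
		var hdr [4]byte
		binary.BigEndian.PutUint32(hdr[:], uint32(len(m)))
		out = append(out, hdr[:]...)
		out = append(out, m...)
	}
	return out
}

func main() {
	var p PayloadProcessor = LengthPrefixedProcessor{}
	err := p.Process(PayloadContext{NumMessages: 2}, encode("a", "bc"), func(m []byte) {
		fmt.Printf("message: %s\n", m)
	})
	if err != nil {
		fmt.Println("error:", err)
	}
}
```

Keeping the reader separate from the context means a custom processor can swap in its own framing logic without touching the struct that carries entry information.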

Fixes #962

Motivation

As per the PIP-96 proposal, a pluggable message payload processor gives the client the ability to process message payloads itself. This PR implements the same functionality for the Go client.

Modifications

  • In the Java client, the checksum is validated only if the leading bytes match the magic number. The current Go client, however, always tries to verify the checksum and returns an error if it is absent. As a result, using KoP with the Kafka entry.format causes errors when parsing MessageMeta. To avoid this, checksum validation is now separated from message reading (which also allows more precise error logging) and follows the same logic as the Java client.

  • A few previously unexported structs and APIs are now exported, because the client needs them when processing messages.
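
The magic-number-gated checksum logic can be sketched as follows. This is a standalone illustration, not the PR's code: the function names stripChecksum and withChecksum are hypothetical, and it assumes the Pulsar frame layout in which a 2-byte magic number 0x0e01 is followed by a 4-byte CRC32-C checksum of everything after it.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
	"hash/crc32"
)

// magicCrc32c marks a frame that carries a CRC32-C checksum (assumed layout).
const magicCrc32c uint16 = 0x0e01

var castagnoli = crc32.MakeTable(crc32.Castagnoli)

// ErrChecksumMismatch is returned when the stored and computed checksums differ.
var ErrChecksumMismatch = errors.New("checksum mismatch")

// stripChecksum (hypothetical name) validates the checksum only when the
// magic number is present. Without the magic number the buffer is returned
// untouched, so entries in another format are not rejected at this stage.
func stripChecksum(buf []byte) (rest []byte, hadChecksum bool, err error) {
	if len(buf) < 2 || binary.BigEndian.Uint16(buf) != magicCrc32c {
		return buf, false, nil
	}
	if len(buf) < 6 {
		return nil, true, errors.New("truncated checksum header")
	}
	want := binary.BigEndian.Uint32(buf[2:6])
	rest = buf[6:]
	if got := crc32.Checksum(rest, castagnoli); got != want {
		return nil, true, fmt.Errorf("%w: stored %#x, computed %#x", ErrChecksumMismatch, want, got)
	}
	return rest, true, nil
}

// withChecksum (hypothetical helper) prepends the magic number and checksum.
func withChecksum(body []byte) []byte {
	out := make([]byte, 6, 6+len(body))
	binary.BigEndian.PutUint16(out, magicCrc32c)
	binary.BigEndian.PutUint32(out[2:], crc32.Checksum(body, castagnoli))
	return append(out, body...)
}

func main() {
	// Frame with magic number and checksum: validated.
	_, had, err := stripChecksum(withChecksum([]byte("metadata+payload")))
	fmt.Println(had, err)

	// Frame without the magic number: passed through without validation.
	_, had, err = stripChecksum([]byte("no magic here"))
	fmt.Println(had, err)
}
```

Returning hadChecksum separately from err lets the caller log whether validation was skipped or failed, which is the kind of precise error reporting the first bullet mentions.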

Verifying this change

  • Make sure that the change passes the CI checks.

This change added tests and can be verified as follows:

  • added tests with the default payload processor, which processes Pulsar-format messages
  • added tests with a custom payload processor for a different entry format

Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API: (yes, no breaking changes)
  1. New field in ConsumerOptions (MessagePayloadProcessor)
  consumer, err := client.Subscribe(ConsumerOptions{
  	Topic:                   topic,
  	SubscriptionName:        "my-sub",
  	Type:                    Exclusive,
  	MessagePayloadProcessor: DefaultPayloadProcessor{},
  })
  2. The message struct was previously unexported. It is now exported:

Old

type message struct {

New

type MessageImpl struct {
	publishTime         time.Time
  • The schema: (no)
  • The default values of configurations: (no)
  • The wire protocol: (no)

Documentation

This PR adds a new feature. I have added Godoc comments in the code. If additional docs or a separate PR are required, I can contribute them.

    - added support for custom payload processor
    - added default payload processor for batch and single messages
    - added test for pulsar message format and custom message format similar
      to java implementation
@ksankeerth
Author

Hi Team,

I have fixed one test that was failing and addressed lint issues in the codebase. However, there is still another test that is failing in the latest CI run:

https://github.com/apache/pulsar-client-go/actions/runs/4721764498/jobs/8446233403?pr=1011#step:5:9785

I am working on fixing this test as well, but I wanted to bring to your attention that it also fails on the upstream master branch (when testing locally). I haven't made any changes to the code related to this test.

Could you please assist me in identifying the root cause of this failure? I appreciate your help.

--- FAIL: TestBatchIndexAck (1.00s)
    --- FAIL: TestBatchIndexAck/TestBatchIndexAck_WithResponse_true_Cumulative_true_AckGroupingOption_1000_10 (0.10s)
    --- FAIL: TestBatchIndexAck/TestBatchIndexAck_WithResponse_true_Cumulative_false_AckGroupingOption_1000_10 (0.08s)
    --- FAIL: TestBatchIndexAck/TestBatchIndexAck_WithResponse_false_Cumulative_true_AckGroupingOption_1000_10 (0.08s)
    --- FAIL: TestBatchIndexAck/TestBatchIndexAck_WithResponse_false_Cumulative_false_AckGroupingOption_1000_10 (0.08s)
    --- FAIL: TestBatchIndexAck/TestBatchIndexAck_WithResponse_true_Cumulative_true_AckGroupingOption_0_0 (0.08s)
    --- FAIL: TestBatchIndexAck/TestBatchIndexAck_WithResponse_true_Cumulative_false_AckGroupingOption_0_0 (0.08s)
    --- FAIL: TestBatchIndexAck/TestBatchIndexAck_WithResponse_false_Cumulative_true_AckGroupingOption_0_0 (0.09s)
    --- FAIL: TestBatchIndexAck/TestBatchIndexAck_WithResponse_false_Cumulative_false_AckGroupingOption_0_0 (0.08s)
    --- FAIL: TestBatchIndexAck/TestBatchIndexAck_WithResponse_true_Cumulative_true_AckGroupingOption_1000_0 (0.08s)
    --- FAIL: TestBatchIndexAck/TestBatchIndexAck_WithResponse_true_Cumulative_false_AckGroupingOption_1000_0 (0.08s)
    --- FAIL: TestBatchIndexAck/TestBatchIndexAck_WithResponse_false_Cumulative_true_AckGroupingOption_1000_0 (0.09s)
    --- FAIL: TestBatchIndexAck/TestBatchIndexAck_WithResponse_false_Cumulative_false_AckGroupingOption_1000_0 (0.08s)

Thanks in advance!

@RobertIndie RobertIndie self-requested a review May 11, 2023 06:59
@RobertIndie RobertIndie added this to the v0.11.0 milestone May 11, 2023
@RobertIndie
Member

I am working on fixing this test as well, but I wanted to bring to your attention that it is also failing on the upstream master branch (when testing locally). I haven't made any changes to the code related to this test.

@ksankeerth
I can't reproduce the issue on the master branch. Did you turn on acknowledgmentAtBatchIndexLevelEnabled for the broker?

@RobertIndie RobertIndie modified the milestones: v0.11.0, v0.12.0 Jul 4, 2023
@RobertIndie RobertIndie modified the milestones: v0.12.0, v0.13.0 Jan 10, 2024
@ksankeerth ksankeerth closed this by deleting the head repository Apr 6, 2024
@RobertIndie RobertIndie removed this from the v0.13.0 milestone Jun 27, 2024