18
18
*/
19
19
package org .apache .pulsar .client .impl ;
20
20
21
- import com .google .common .collect .Maps ;
22
21
import com .google .common .collect .Sets ;
23
- import lombok .Cleanup ;
22
+ import java .nio .charset .StandardCharsets ;
23
+ import java .util .ArrayList ;
24
+ import java .util .List ;
25
+ import java .util .Map ;
26
+ import java .util .Set ;
27
+ import java .util .UUID ;
28
+ import java .util .concurrent .ConcurrentHashMap ;
29
+ import java .util .concurrent .TimeUnit ;
30
+ import java .util .concurrent .atomic .AtomicBoolean ;
31
+ import java .util .concurrent .atomic .AtomicLong ;
24
32
import org .apache .commons .lang3 .RandomStringUtils ;
33
+ import org .apache .pulsar .client .api .BatcherBuilder ;
25
34
import org .apache .pulsar .client .api .Consumer ;
35
+ import org .apache .pulsar .client .api .ConsumerBuilder ;
36
+ import org .apache .pulsar .client .api .KeySharedPolicy ;
26
37
import org .apache .pulsar .client .api .MessageId ;
27
38
import org .apache .pulsar .client .api .Producer ;
28
39
import org .apache .pulsar .client .api .ProducerConsumerBase ;
29
- import org .apache .pulsar .client .api .PulsarClient ;
30
40
import org .apache .pulsar .client .api .PulsarClientException ;
41
+ import org .apache .pulsar .client .api .Range ;
31
42
import org .apache .pulsar .client .api .SubscriptionType ;
43
+ import org .apache .pulsar .common .util .Murmur3_32Hash ;
32
44
import org .awaitility .Awaitility ;
33
45
import org .testng .Assert ;
34
46
import org .testng .annotations .AfterMethod ;
35
47
import org .testng .annotations .BeforeMethod ;
36
48
import org .testng .annotations .DataProvider ;
37
49
import org .testng .annotations .Test ;
38
- import java .nio .charset .StandardCharsets ;
39
- import java .util .ArrayList ;
40
- import java .util .List ;
41
- import java .util .Map ;
42
- import java .util .Set ;
43
- import java .util .UUID ;
44
- import java .util .concurrent .TimeUnit ;
45
- import java .util .concurrent .atomic .AtomicBoolean ;
46
- import java .util .concurrent .atomic .AtomicLong ;
47
50
48
51
@ Test (groups = "broker-impl" )
49
52
public class KeySharedSubscriptionTest extends ProducerConsumerBase {
@@ -70,91 +73,58 @@ public Object[][] subType() {
70
73
@ Test (dataProvider = "subType" )
71
74
public void testCanRecoverConsumptionWhenLiftMaxUnAckedMessagesRestriction (SubscriptionType subscriptionType )
72
75
throws PulsarClientException {
73
- PulsarClient pulsarClient = PulsarClient .builder ().
74
- serviceUrl (lookupUrl .toString ())
75
- .build ();
76
76
final int totalMsg = 1000 ;
77
77
String topic = "broker-close-test-" + RandomStringUtils .randomAlphabetic (5 );
78
- Map <Consumer <?>, List <MessageId >> nameToId = Maps . newConcurrentMap ();
78
+ Map <Consumer <?>, List <MessageId >> nameToId = new ConcurrentHashMap <> ();
79
79
Set <MessageId > pubMessages = Sets .newConcurrentHashSet ();
80
80
Set <MessageId > recMessages = Sets .newConcurrentHashSet ();
81
81
AtomicLong lastActiveTime = new AtomicLong ();
82
82
AtomicBoolean canAcknowledgement = new AtomicBoolean (false );
83
83
84
- @ Cleanup
85
- Consumer <byte []> consumer1 = pulsarClient .newConsumer ()
86
- .topic (topic )
87
- .subscriptionName ("sub-1" )
88
- .subscriptionType (subscriptionType )
89
- .consumerName ("con-1" )
90
- .messageListener ((cons1 , msg ) -> {
91
- lastActiveTime .set (System .currentTimeMillis ());
92
- nameToId .computeIfAbsent (cons1 ,(k ) -> new ArrayList <>())
93
- .add (msg .getMessageId ());
94
- recMessages .add (msg .getMessageId ());
95
- if (canAcknowledgement .get ()) {
96
- try {
97
- cons1 .acknowledge (msg );
98
- } catch (PulsarClientException e ) {
99
- throw new RuntimeException (e );
100
- }
101
- }
102
- })
103
- .subscribe ();
104
- @ Cleanup
105
- Consumer <byte []> consumer2 = pulsarClient .newConsumer ()
106
- .topic (topic )
107
- .subscriptionName ("sub-1" )
108
- .subscriptionType (subscriptionType )
109
- .messageListener ((cons2 , msg ) -> {
110
- lastActiveTime .set (System .currentTimeMillis ());
111
- nameToId .computeIfAbsent (cons2 ,(k ) -> new ArrayList <>())
112
- .add (msg .getMessageId ());
113
- recMessages .add (msg .getMessageId ());
114
- if (canAcknowledgement .get ()) {
115
- try {
116
- cons2 .acknowledge (msg );
117
- } catch (PulsarClientException e ) {
118
- throw new RuntimeException (e );
84
+ List <Consumer <?>> consumerList = new ArrayList <>();
85
+ // create 3 consumers
86
+ for (int i = 0 ; i < 3 ; i ++) {
87
+ ConsumerBuilder <byte []> builder = pulsarClient .newConsumer ()
88
+ .topic (topic )
89
+ .subscriptionName ("sub-1" )
90
+ .subscriptionType (subscriptionType )
91
+ .messageListener ((consumer , msg ) -> {
92
+ lastActiveTime .set (System .currentTimeMillis ());
93
+ nameToId .computeIfAbsent (consumer , (k ) -> new ArrayList <>())
94
+ .add (msg .getMessageId ());
95
+ recMessages .add (msg .getMessageId ());
96
+ if (canAcknowledgement .get ()) {
97
+ try {
98
+ consumer .acknowledge (msg );
99
+ } catch (PulsarClientException e ) {
100
+ throw new RuntimeException (e );
101
+ }
119
102
}
120
- }
121
- })
122
- .consumerName ("con-2" )
123
- .subscribe ();
124
- @ Cleanup
125
- Consumer <byte []> consumer3 = pulsarClient .newConsumer ()
126
- .topic (topic )
127
- .subscriptionName ("sub-1" )
128
- .subscriptionType (subscriptionType )
129
- .messageListener ((cons3 , msg ) -> {
130
- lastActiveTime .set (System .currentTimeMillis ());
131
- nameToId .computeIfAbsent (cons3 ,(k ) -> new ArrayList <>())
132
- .add (msg .getMessageId ());
133
- recMessages .add (msg .getMessageId ());
134
- if (canAcknowledgement .get ()) {
135
- try {
136
- cons3 .acknowledge (msg );
137
- } catch (PulsarClientException e ) {
138
- throw new RuntimeException (e );
139
- }
140
- }
141
- })
142
- .consumerName ("con-3" )
143
- .subscribe ();
103
+ });
104
+
105
+ if (subscriptionType == SubscriptionType .Key_Shared ) {
106
+ // ensure every consumer can be distributed messages
107
+ int hash = Murmur3_32Hash .getInstance ().makeHash (("key-" + i ).getBytes ())
108
+ % KeySharedPolicy .DEFAULT_HASH_RANGE_SIZE ;
109
+ builder .keySharedPolicy (KeySharedPolicy .stickyHashRange ().ranges (Range .of (hash , hash )));
110
+ }
111
+
112
+ consumerList .add (builder .subscribe ());
113
+ }
144
114
145
- @ Cleanup
146
115
Producer <byte []> producer = pulsarClient .newProducer ()
147
116
.topic (topic )
148
117
.enableBatching (true )
149
118
.batchingMaxPublishDelay (1 , TimeUnit .MILLISECONDS )
150
119
// We chose 9 because the maximum unacked message is 10
151
120
.batchingMaxMessages (9 )
121
+ .batcherBuilder (BatcherBuilder .KEY_BASED )
152
122
.create ();
153
123
154
124
for (int i = 0 ; i < totalMsg ; i ++) {
155
- producer . sendAsync ( UUID .randomUUID ().toString ()
156
- . getBytes ( StandardCharsets . UTF_8 ) )
157
- .thenAccept (pubMessages ::add );
125
+ byte [] msg = UUID .randomUUID ().toString (). getBytes ( StandardCharsets . UTF_8 );
126
+ producer . newMessage (). key ( "key-" + ( i % 3 )). value ( msg )
127
+ .sendAsync (). thenAccept (pubMessages ::add );
158
128
}
159
129
160
130
// Wait for all consumers can not read more messages. the consumers are stuck by max unacked messages.
@@ -176,7 +146,7 @@ public void testCanRecoverConsumptionWhenLiftMaxUnAckedMessagesRestriction(Subsc
176
146
177
147
// Wait for all consumers to continue receiving messages.
178
148
Awaitility .await ()
179
- .atMost (30 , TimeUnit .SECONDS )
149
+ .atMost (15 , TimeUnit .SECONDS )
180
150
.pollDelay (5 , TimeUnit .SECONDS )
181
151
.until (() ->
182
152
(System .currentTimeMillis () - lastActiveTime .get ()) > TimeUnit .SECONDS .toMillis (5 ));
@@ -186,5 +156,11 @@ public void testCanRecoverConsumptionWhenLiftMaxUnAckedMessagesRestriction(Subsc
186
156
Assert .assertEquals (pubMessages .size (), totalMsg );
187
157
Assert .assertEquals (pubMessages .size (), recMessages .size ());
188
158
Assert .assertTrue (recMessages .containsAll (pubMessages ));
159
+
160
+ // cleanup
161
+ producer .close ();
162
+ for (Consumer <?> consumer : consumerList ) {
163
+ consumer .close ();
164
+ }
189
165
}
190
166
}
0 commit comments