Skip to content

Commit 13abfd1

Browse files
committed
Consider null values in empty StreamPendingSummary (#3793)
* Consider null values in empty StreamPendingSummary * java stream
1 parent d175ce0 commit 13abfd1

File tree

2 files changed

+49
-34
lines changed

2 files changed

+49
-34
lines changed

src/main/java/redis/clients/jedis/BuilderFactory.java

+8-9
Original file line numberDiff line numberDiff line change
@@ -1486,15 +1486,14 @@ public StreamPendingSummary build(Object data) {
14861486
}
14871487

14881488
List<Object> objectList = (List<Object>) data;
1489-
long total = BuilderFactory.LONG.build(objectList.get(0));
1490-
String minId = SafeEncoder.encode((byte[]) objectList.get(1));
1491-
String maxId = SafeEncoder.encode((byte[]) objectList.get(2));
1492-
List<List<Object>> consumerObjList = (List<List<Object>>) objectList.get(3);
1493-
Map<String, Long> map = new HashMap<>(consumerObjList.size());
1494-
for (List<Object> consumerObj : consumerObjList) {
1495-
map.put(SafeEncoder.encode((byte[]) consumerObj.get(0)), Long.parseLong(SafeEncoder.encode((byte[]) consumerObj.get(1))));
1496-
}
1497-
return new StreamPendingSummary(total, new StreamEntryID(minId), new StreamEntryID(maxId), map);
1489+
long total = LONG.build(objectList.get(0));
1490+
StreamEntryID minId = STREAM_ENTRY_ID.build(objectList.get(1));
1491+
StreamEntryID maxId = STREAM_ENTRY_ID.build(objectList.get(2));
1492+
Map<String, Long> map = objectList.get(3) == null ? null
1493+
: ((List<List<Object>>) objectList.get(3)).stream().collect(
1494+
Collectors.toMap(pair -> STRING.build(pair.get(0)),
1495+
pair -> Long.parseLong(STRING.build(pair.get(1)))));
1496+
return new StreamPendingSummary(total, minId, maxId, map);
14981497
}
14991498

15001499
@Override

src/test/java/redis/clients/jedis/commands/jedis/StreamsCommandsTest.java

+41-25
Original file line numberDiff line numberDiff line change
@@ -523,14 +523,22 @@ public void xack() {
523523

524524
@Test
525525
public void xpendingWithParams() {
526+
final String stream = "xpendeing-stream";
527+
528+
assertEquals("OK", jedis.xgroupCreate(stream, "xpendeing-group", null, true));
529+
530+
// Get the summary from empty stream
531+
StreamPendingSummary emptySummary = jedis.xpending(stream, "xpendeing-group");
532+
assertEquals(0, emptySummary.getTotal());
533+
assertNull(emptySummary.getMinId());
534+
assertNull(emptySummary.getMaxId());
535+
assertNull(emptySummary.getConsumerMessageCount());
536+
526537
Map<String, String> map = new HashMap<>();
527538
map.put("f1", "v1");
528-
StreamEntryID id1 = jedis.xadd("xpendeing-stream", (StreamEntryID) null, map);
539+
StreamEntryID id1 = jedis.xadd(stream, (StreamEntryID) null, map);
529540

530-
assertEquals("OK", jedis.xgroupCreate("xpendeing-stream", "xpendeing-group", null, false));
531-
532-
Map<String, StreamEntryID> streamQeury1 = singletonMap(
533-
"xpendeing-stream", StreamEntryID.UNRECEIVED_ENTRY);
541+
Map<String, StreamEntryID> streamQeury1 = singletonMap(stream, StreamEntryID.UNRECEIVED_ENTRY);
534542

535543
// Read the event from Stream put it on pending
536544
List<Entry<String, List<StreamEntry>>> range = jedis.xreadGroup("xpendeing-group",
@@ -539,8 +547,14 @@ public void xpendingWithParams() {
539547
assertEquals(1, range.get(0).getValue().size());
540548
assertEquals(map, range.get(0).getValue().get(0).getFields());
541549

550+
// Get the summary about the pending messages
551+
StreamPendingSummary pendingSummary = jedis.xpending(stream, "xpendeing-group");
552+
assertEquals(1, pendingSummary.getTotal());
553+
assertEquals(id1, pendingSummary.getMinId());
554+
assertEquals(1l, pendingSummary.getConsumerMessageCount().get("xpendeing-consumer").longValue());
555+
542556
// Get the pending event
543-
List<StreamPendingEntry> pendingRange = jedis.xpending("xpendeing-stream", "xpendeing-group",
557+
List<StreamPendingEntry> pendingRange = jedis.xpending(stream, "xpendeing-group",
544558
new XPendingParams().count(3).consumer("xpendeing-consumer"));
545559
assertEquals(1, pendingRange.size());
546560
assertEquals(id1, pendingRange.get(0).getID());
@@ -549,41 +563,41 @@ public void xpendingWithParams() {
549563
assertTrue(pendingRange.get(0).toString().contains("xpendeing-consumer"));
550564

551565
// Without consumer
552-
pendingRange = jedis.xpending("xpendeing-stream", "xpendeing-group", new XPendingParams().count(3));
566+
pendingRange = jedis.xpending(stream, "xpendeing-group", new XPendingParams().count(3));
553567
assertEquals(1, pendingRange.size());
554568
assertEquals(id1, pendingRange.get(0).getID());
555569
assertEquals(1, pendingRange.get(0).getDeliveredTimes());
556570
assertEquals("xpendeing-consumer", pendingRange.get(0).getConsumerName());
557571

558572
// with idle
559-
pendingRange = jedis.xpending("xpendeing-stream", "xpendeing-group",
573+
pendingRange = jedis.xpending(stream, "xpendeing-group",
560574
new XPendingParams().idle(Duration.ofMinutes(1).toMillis()).count(3));
561575
assertEquals(0, pendingRange.size());
562576
}
563577

564578
@Test
565579
public void xpendingRange() {
580+
final String stream = "xpendeing-stream";
566581
Map<String, String> map = new HashMap<>();
567582
map.put("foo", "bar");
568-
StreamEntryID m1 = jedis.xadd("xpendeing-stream", (StreamEntryID) null, map);
569-
StreamEntryID m2 = jedis.xadd("xpendeing-stream", (StreamEntryID) null, map);
570-
jedis.xgroupCreate("xpendeing-stream", "xpendeing-group", null, false);
583+
StreamEntryID m1 = jedis.xadd(stream, (StreamEntryID) null, map);
584+
StreamEntryID m2 = jedis.xadd(stream, (StreamEntryID) null, map);
585+
jedis.xgroupCreate(stream, "xpendeing-group", null, false);
571586

572587
// read 1 message from the group with each consumer
573-
Map<String, StreamEntryID> streamQeury = singletonMap(
574-
"xpendeing-stream", StreamEntryID.UNRECEIVED_ENTRY);
588+
Map<String, StreamEntryID> streamQeury = singletonMap(stream, StreamEntryID.UNRECEIVED_ENTRY);
575589
jedis.xreadGroup("xpendeing-group", "consumer1", XReadGroupParams.xReadGroupParams().count(1), streamQeury);
576590
jedis.xreadGroup("xpendeing-group", "consumer2", XReadGroupParams.xReadGroupParams().count(1), streamQeury);
577591

578-
List<StreamPendingEntry> response = jedis.xpending("xpendeing-stream", "xpendeing-group",
592+
List<StreamPendingEntry> response = jedis.xpending(stream, "xpendeing-group",
579593
XPendingParams.xPendingParams("(0", "+", 5));
580594
assertEquals(2, response.size());
581595
assertEquals(m1, response.get(0).getID());
582596
assertEquals("consumer1", response.get(0).getConsumerName());
583597
assertEquals(m2, response.get(1).getID());
584598
assertEquals("consumer2", response.get(1).getConsumerName());
585599

586-
response = jedis.xpending("xpendeing-stream", "xpendeing-group",
600+
response = jedis.xpending(stream, "xpendeing-group",
587601
XPendingParams.xPendingParams(StreamEntryID.MINIMUM_ID, StreamEntryID.MAXIMUM_ID, 5));
588602
assertEquals(2, response.size());
589603
assertEquals(m1, response.get(0).getID());
@@ -594,18 +608,19 @@ public void xpendingRange() {
594608

595609
@Test
596610
public void xclaimWithParams() {
611+
final String stream = "xpendeing-stream";
597612
Map<String, String> map = new HashMap<>();
598613
map.put("f1", "v1");
599-
jedis.xadd("xpendeing-stream", (StreamEntryID) null, map);
614+
jedis.xadd(stream, (StreamEntryID) null, map);
600615

601-
assertEquals("OK", jedis.xgroupCreate("xpendeing-stream", "xpendeing-group", null, false));
616+
assertEquals("OK", jedis.xgroupCreate(stream, "xpendeing-group", null, false));
602617

603618
// Read the event from Stream put it on pending
604619
jedis.xreadGroup("xpendeing-group", "xpendeing-consumer", XReadGroupParams.xReadGroupParams().count(1).block(1),
605-
singletonMap("xpendeing-stream", StreamEntryID.UNRECEIVED_ENTRY));
620+
singletonMap(stream, StreamEntryID.UNRECEIVED_ENTRY));
606621

607622
// Get the pending event
608-
List<StreamPendingEntry> pendingRange = jedis.xpending("xpendeing-stream", "xpendeing-group",
623+
List<StreamPendingEntry> pendingRange = jedis.xpending(stream, "xpendeing-group",
609624
null, null, 3, "xpendeing-consumer");
610625
// Sleep for 100ms so we can claim events pending for more than 50ms
611626
try {
@@ -614,7 +629,7 @@ public void xclaimWithParams() {
614629
e.printStackTrace();
615630
}
616631

617-
List<StreamEntry> streamEntrys = jedis.xclaim("xpendeing-stream", "xpendeing-group",
632+
List<StreamEntry> streamEntrys = jedis.xclaim(stream, "xpendeing-group",
618633
"xpendeing-consumer2", 50, XClaimParams.xClaimParams().idle(0).retryCount(0),
619634
pendingRange.get(0).getID());
620635
assertEquals(1, streamEntrys.size());
@@ -624,18 +639,19 @@ public void xclaimWithParams() {
624639

625640
@Test
626641
public void xclaimJustId() {
642+
final String stream = "xpendeing-stream";
627643
Map<String, String> map = new HashMap<>();
628644
map.put("f1", "v1");
629-
jedis.xadd("xpendeing-stream", (StreamEntryID) null, map);
645+
jedis.xadd(stream, (StreamEntryID) null, map);
630646

631-
assertEquals("OK", jedis.xgroupCreate("xpendeing-stream", "xpendeing-group", null, false));
647+
assertEquals("OK", jedis.xgroupCreate(stream, "xpendeing-group", null, false));
632648

633649
// Read the event from Stream put it on pending
634650
jedis.xreadGroup("xpendeing-group", "xpendeing-consumer", XReadGroupParams.xReadGroupParams().count(1).block(1),
635-
singletonMap("xpendeing-stream", StreamEntryID.UNRECEIVED_ENTRY));
651+
singletonMap(stream, StreamEntryID.UNRECEIVED_ENTRY));
636652

637653
// Get the pending event
638-
List<StreamPendingEntry> pendingRange = jedis.xpending("xpendeing-stream", "xpendeing-group",
654+
List<StreamPendingEntry> pendingRange = jedis.xpending(stream, "xpendeing-group",
639655
null, null, 3, "xpendeing-consumer");
640656
// Sleep for 100ms so we can claim events pending for more than 50ms
641657
try {
@@ -644,7 +660,7 @@ public void xclaimJustId() {
644660
e.printStackTrace();
645661
}
646662

647-
List<StreamEntryID> streamEntryIDS = jedis.xclaimJustId("xpendeing-stream", "xpendeing-group",
663+
List<StreamEntryID> streamEntryIDS = jedis.xclaimJustId(stream, "xpendeing-group",
648664
"xpendeing-consumer2", 50, XClaimParams.xClaimParams().idle(0).retryCount(0),
649665
pendingRange.get(0).getID());
650666
assertEquals(1, streamEntryIDS.size());

0 commit comments

Comments
 (0)