diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java
index 32d341e6134..709efcdc185 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java
@@ -5,13 +5,14 @@
import static com.provectus.kafka.ui.model.rbac.permission.TopicAction.MESSAGES_READ;
import static com.provectus.kafka.ui.serde.api.Serde.Target.KEY;
import static com.provectus.kafka.ui.serde.api.Serde.Target.VALUE;
-import static java.util.stream.Collectors.toMap;
import com.provectus.kafka.ui.api.MessagesApi;
-import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.model.ConsumerPosition;
import com.provectus.kafka.ui.model.CreateTopicMessageDTO;
+import com.provectus.kafka.ui.model.MessageFilterIdDTO;
+import com.provectus.kafka.ui.model.MessageFilterRegistrationDTO;
import com.provectus.kafka.ui.model.MessageFilterTypeDTO;
+import com.provectus.kafka.ui.model.PollingModeDTO;
import com.provectus.kafka.ui.model.SeekDirectionDTO;
import com.provectus.kafka.ui.model.SeekTypeDTO;
import com.provectus.kafka.ui.model.SerdeUsageDTO;
@@ -25,14 +26,11 @@
import com.provectus.kafka.ui.service.DeserializationService;
import com.provectus.kafka.ui.service.MessagesService;
import java.util.List;
-import java.util.Map;
import java.util.Optional;
-import javax.annotation.Nullable;
import javax.validation.Valid;
+import javax.validation.ValidationException;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.kafka.common.TopicPartition;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.ServerWebExchange;
@@ -76,6 +74,7 @@ public Mono<ResponseEntity<SmartFilterTestExecutionResultDTO>> executeSmartFilte
.map(ResponseEntity::ok);
}
+ @Deprecated
@Override
public Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> getTopicMessages(String clusterName,
String topicName,
@@ -88,6 +87,23 @@ public Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> getTopicMessages(String
String keySerde,
String valueSerde,
ServerWebExchange exchange) {
+ throw new ValidationException("Not supported");
+ }
+
+
+ @Override
+ public Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> getTopicMessagesV2(String clusterName, String topicName,
+ PollingModeDTO mode,
+ List<Integer> partitions,
+ Integer limit,
+ String stringFilter,
+ String smartFilterId,
+ Long offset,
+ Long timestamp,
+ String keySerde,
+ String valueSerde,
+ String cursor,
+ ServerWebExchange exchange) {
var contextBuilder = AccessContext.builder()
.cluster(clusterName)
.topic(topicName)
@@ -98,27 +114,26 @@ public Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> getTopicMessages(String
contextBuilder.auditActions(AuditAction.VIEW);
}
- seekType = seekType != null ? seekType : SeekTypeDTO.BEGINNING;
- seekDirection = seekDirection != null ? seekDirection : SeekDirectionDTO.FORWARD;
- filterQueryType = filterQueryType != null ? filterQueryType : MessageFilterTypeDTO.STRING_CONTAINS;
-
- var positions = new ConsumerPosition(
- seekType,
- topicName,
- parseSeekTo(topicName, seekType, seekTo)
- );
- Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> job = Mono.just(
- ResponseEntity.ok(
- messagesService.loadMessages(
- getCluster(clusterName), topicName, positions, q, filterQueryType,
- limit, seekDirection, keySerde, valueSerde)
- )
- );
-
- var context = contextBuilder.build();
- return validateAccess(context)
- .then(job)
- .doOnEach(sig -> audit(context, sig));
+ var accessContext = contextBuilder.build();
+
+ Flux<TopicMessageEventDTO> messagesFlux;
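+ // a non-null cursor fully describes the next-page request, so the remaining query params are ignored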
+ if (cursor != null) {
+ messagesFlux = messagesService.loadMessages(getCluster(clusterName), topicName, cursor);
+ } else {
+ messagesFlux = messagesService.loadMessages(
+ getCluster(clusterName),
+ topicName,
+ ConsumerPosition.create(mode, topicName, partitions, timestamp, offset),
+ stringFilter,
+ smartFilterId,
+ limit,
+ keySerde,
+ valueSerde
+ );
+ }
+ return accessControlService.validateAccess(accessContext)
+ .then(Mono.just(ResponseEntity.ok(messagesFlux)))
+ .doOnEach(sig -> auditService.audit(accessContext, sig));
}
@Override
@@ -140,34 +155,6 @@ public Mono<ResponseEntity<Void>> sendTopicMessages(
).doOnEach(sig -> audit(context, sig));
}
- /**
- * The format is [partition]::[offset] for specifying offsets
- * or [partition]::[timestamp in millis] for specifying timestamps.
- */
- @Nullable
- private Map<TopicPartition, Long> parseSeekTo(String topic, SeekTypeDTO seekType, List<String> seekTo) {
- if (seekTo == null || seekTo.isEmpty()) {
- if (seekType == SeekTypeDTO.LATEST || seekType == SeekTypeDTO.BEGINNING) {
- return null;
- }
- throw new ValidationException("seekTo should be set if seekType is " + seekType);
- }
- return seekTo.stream()
- .map(p -> {
- String[] split = p.split("::");
- if (split.length != 2) {
- throw new IllegalArgumentException(
- "Wrong seekTo argument format. See API docs for details");
- }
-
- return Pair.of(
- new TopicPartition(topic, Integer.parseInt(split[0])),
- Long.parseLong(split[1])
- );
- })
- .collect(toMap(Pair::getKey, Pair::getValue));
- }
-
@Override
public Mono<ResponseEntity<TopicSerdeSuggestionDTO>> getSerdes(String clusterName,
String topicName,
@@ -195,7 +182,20 @@ public Mono<ResponseEntity<TopicSerdeSuggestionDTO>> getSerdes(String clusterNam
);
}
+ @Override
+ public Mono<ResponseEntity<MessageFilterIdDTO>> registerFilter(String clusterName,
+ String topicName,
+ Mono<MessageFilterRegistrationDTO> registration,
+ ServerWebExchange exchange) {
+ final Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
+ .cluster(clusterName)
+ .topic(topicName)
+ .topicActions(MESSAGES_READ)
+ .build());
-
+ return validateAccess.then(registration)
+ .map(reg -> messagesService.registerMessageFilter(reg.getFilterCode()))
+ .map(id -> ResponseEntity.ok(new MessageFilterIdDTO().id(id)));
+ }
}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/AbstractEmitter.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/AbstractEmitter.java
index ec576a1d1a6..21ef0b43adb 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/AbstractEmitter.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/AbstractEmitter.java
@@ -1,6 +1,7 @@
package com.provectus.kafka.ui.emitter;
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
+import jakarta.annotation.Nullable;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.utils.Bytes;
import reactor.core.publisher.FluxSink;
@@ -21,12 +22,14 @@ protected PolledRecords poll(FluxSink<TopicMessageEventDTO> sink, EnhancedConsum
return records;
}
- protected boolean sendLimitReached() {
+ protected boolean isSendLimitReached() {
return messagesProcessing.limitReached();
}
- protected void send(FluxSink<TopicMessageEventDTO> sink, Iterable<ConsumerRecord<Bytes, Bytes>> records) {
- messagesProcessing.send(sink, records);
+ protected void send(FluxSink<TopicMessageEventDTO> sink,
+ Iterable<ConsumerRecord<Bytes, Bytes>> records,
+ @Nullable Cursor.Tracking cursor) {
+ messagesProcessing.send(sink, records, cursor);
}
protected void sendPhase(FluxSink<TopicMessageEventDTO> sink, String name) {
@@ -37,8 +40,9 @@ protected void sendConsuming(FluxSink<TopicMessageEventDTO> sink, PolledRecords
messagesProcessing.sentConsumingInfo(sink, records);
}
- protected void sendFinishStatsAndCompleteSink(FluxSink<TopicMessageEventDTO> sink) {
- messagesProcessing.sendFinishEvent(sink);
+ // cursor is null if the target partitions were fully polled (no need to do paging)
+ protected void sendFinishStatsAndCompleteSink(FluxSink<TopicMessageEventDTO> sink, @Nullable Cursor.Tracking cursor) {
+ messagesProcessing.sendFinishEvents(sink, cursor);
sink.complete();
}
}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardEmitter.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardEmitter.java
index cdc45336e46..75aa21bdf83 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardEmitter.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardEmitter.java
@@ -18,18 +18,15 @@ public BackwardEmitter(Supplier<EnhancedConsumer> consumerSupplier,
int messagesPerPage,
ConsumerRecordDeserializer deserializer,
Predicate<TopicMessageDTO> filter,
- PollingSettings pollingSettings) {
+ PollingSettings pollingSettings,
+ Cursor.Tracking cursor) {
super(
consumerSupplier,
consumerPosition,
messagesPerPage,
- new MessagesProcessing(
- deserializer,
- filter,
- false,
- messagesPerPage
- ),
- pollingSettings
+ new MessagesProcessing(deserializer, filter, false, messagesPerPage),
+ pollingSettings,
+ cursor
);
}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ConsumingStats.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ConsumingStats.java
index b0737e1cb9c..17b519434b4 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ConsumingStats.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ConsumingStats.java
@@ -2,6 +2,8 @@
import com.provectus.kafka.ui.model.TopicMessageConsumingDTO;
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
+import com.provectus.kafka.ui.model.TopicMessageNextPageCursorDTO;
+import javax.annotation.Nullable;
import reactor.core.publisher.FluxSink;
class ConsumingStats {
@@ -26,10 +28,15 @@ void incFilterApplyError() {
filterApplyErrors++;
}
- void sendFinishEvent(FluxSink<TopicMessageEventDTO> sink) {
+ void sendFinishEvent(FluxSink<TopicMessageEventDTO> sink, @Nullable Cursor.Tracking cursor) {
sink.next(
new TopicMessageEventDTO()
.type(TopicMessageEventDTO.TypeEnum.DONE)
+ .cursor(
+ cursor != null
+ ? new TopicMessageNextPageCursorDTO().id(cursor.registerCursor())
+ : null
+ )
.consuming(createConsumingStats())
);
}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/Cursor.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/Cursor.java
new file mode 100644
index 00000000000..f0fd135bacf
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/Cursor.java
@@ -0,0 +1,90 @@
+package com.provectus.kafka.ui.emitter;
+
+import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.Table;
+import com.provectus.kafka.ui.model.ConsumerPosition;
+import com.provectus.kafka.ui.model.PollingModeDTO;
+import com.provectus.kafka.ui.model.TopicMessageDTO;
+import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import org.apache.kafka.common.TopicPartition;
+
+public record Cursor(ConsumerRecordDeserializer deserializer,
+ ConsumerPosition consumerPosition,
+ Predicate<TopicMessageDTO> filter,
+ int limit) {
+
+ public static class Tracking {
+ private final ConsumerRecordDeserializer deserializer;
+ private final ConsumerPosition originalPosition;
+ private final Predicate<TopicMessageDTO> filter;
+ private final int limit;
+ private final Function<Cursor, String> registerAction;
+
+ //topic -> partition -> offset
+ private final Table<String, Integer, Long> trackingOffsets = HashBasedTable.create();
+
+ public Tracking(ConsumerRecordDeserializer deserializer,
+ ConsumerPosition originalPosition,
+ Predicate<TopicMessageDTO> filter,
+ int limit,
+ Function<Cursor, String> registerAction) {
+ this.deserializer = deserializer;
+ this.originalPosition = originalPosition;
+ this.filter = filter;
+ this.limit = limit;
+ this.registerAction = registerAction;
+ }
+
+ void trackOffset(String topic, int partition, long offset) {
+ trackingOffsets.put(topic, partition, offset);
+ }
+
+ void initOffsets(Map<TopicPartition, Long> initialSeekOffsets) {
+ initialSeekOffsets.forEach((tp, off) -> trackOffset(tp.topic(), tp.partition(), off));
+ }
+
+ private Map<TopicPartition, Long> getOffsetsMap(int offsetToAdd) {
+ Map<TopicPartition, Long> result = new HashMap<>();
+ trackingOffsets.rowMap()
+ .forEach((topic, partsMap) ->
+ partsMap.forEach((p, off) -> result.put(new TopicPartition(topic, p), off + offsetToAdd)));
+ return result;
+ }
+
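+ // Snapshots the tracked offsets into a new Cursor (flipping the polling mode to a plain
+ // offset-based one) and registers it via registerAction, returning the id that the client
+ // can pass back to fetch the next page.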
+ String registerCursor() {
+ return registerAction.apply(
+ new Cursor(
+ deserializer,
+ new ConsumerPosition(
+ switch (originalPosition.pollingMode()) {
+ case TO_OFFSET, TO_TIMESTAMP, LATEST -> PollingModeDTO.TO_OFFSET;
+ case FROM_OFFSET, FROM_TIMESTAMP, EARLIEST -> PollingModeDTO.FROM_OFFSET;
+ case TAILING -> throw new IllegalStateException();
+ },
+ originalPosition.topic(),
+ originalPosition.partitions(),
+ null,
+ new ConsumerPosition.Offsets(
+ null,
+ getOffsetsMap(
+ switch (originalPosition.pollingMode()) {
+ case TO_OFFSET, TO_TIMESTAMP, LATEST -> 0;
+ // when doing forward polling we need to start from latest msg's offset + 1
+ case FROM_OFFSET, FROM_TIMESTAMP, EARLIEST -> 1;
+ case TAILING -> throw new IllegalStateException();
+ }
+ )
+ )
+ ),
+ filter,
+ limit
+ )
+ );
+ }
+ }
+
+}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardEmitter.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardEmitter.java
index 5c915fb2e8c..6627bc45c10 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardEmitter.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardEmitter.java
@@ -18,18 +18,15 @@ public ForwardEmitter(Supplier<EnhancedConsumer> consumerSupplier,
int messagesPerPage,
ConsumerRecordDeserializer deserializer,
Predicate<TopicMessageDTO> filter,
- PollingSettings pollingSettings) {
+ PollingSettings pollingSettings,
+ Cursor.Tracking cursor) {
super(
consumerSupplier,
consumerPosition,
messagesPerPage,
- new MessagesProcessing(
- deserializer,
- filter,
- true,
- messagesPerPage
- ),
- pollingSettings
+ new MessagesProcessing(deserializer, filter, true, messagesPerPage),
+ pollingSettings,
+ cursor
);
}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardRecordEmitter.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardRecordEmitter.java
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/MessageFilters.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/MessageFilters.java
index 6e9f8a8bbe3..44f372474c6 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/MessageFilters.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/MessageFilters.java
@@ -1,7 +1,6 @@
package com.provectus.kafka.ui.emitter;
import com.provectus.kafka.ui.exception.ValidationException;
-import com.provectus.kafka.ui.model.MessageFilterTypeDTO;
import com.provectus.kafka.ui.model.TopicMessageDTO;
import groovy.json.JsonSlurper;
import java.util.function.Predicate;
@@ -22,23 +21,16 @@ public class MessageFilters {
private MessageFilters() {
}
- public static Predicate<TopicMessageDTO> createMsgFilter(String query, MessageFilterTypeDTO type) {
- switch (type) {
- case STRING_CONTAINS:
- return containsStringFilter(query);
- case GROOVY_SCRIPT:
- return groovyScriptFilter(query);
- default:
- throw new IllegalStateException("Unknown query type: " + type);
- }
+ public static Predicate<TopicMessageDTO> noop() {
+ return e -> true;
}
- static Predicate<TopicMessageDTO> containsStringFilter(String string) {
+ public static Predicate<TopicMessageDTO> containsStringFilter(String string) {
return msg -> StringUtils.contains(msg.getKey(), string)
|| StringUtils.contains(msg.getContent(), string);
}
- static Predicate<TopicMessageDTO> groovyScriptFilter(String script) {
+ public static Predicate<TopicMessageDTO> groovyScriptFilter(String script) {
var engine = getGroovyEngine();
var compiledScript = compileScript(engine, script);
var jsonSlurper = new JsonSlurper();
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/MessagesProcessing.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/MessagesProcessing.java
index df8505a2e9a..8b8332e0398 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/MessagesProcessing.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/MessagesProcessing.java
@@ -39,7 +39,9 @@ boolean limitReached() {
return limit != null && sentMessages >= limit;
}
- void send(FluxSink<TopicMessageEventDTO> sink, Iterable<ConsumerRecord<Bytes, Bytes>> polled) {
+ void send(FluxSink<TopicMessageEventDTO> sink,
+ Iterable<ConsumerRecord<Bytes, Bytes>> polled,
+ @Nullable Cursor.Tracking cursor) {
sortForSending(polled, ascendingSortBeforeSend)
.forEach(rec -> {
if (!limitReached() && !sink.isCancelled()) {
@@ -53,6 +55,9 @@ void send(FluxSink sink, Iterable sink, PolledRecords polled
}
}
- void sendFinishEvent(FluxSink<TopicMessageEventDTO> sink) {
+ void sendFinishEvents(FluxSink<TopicMessageEventDTO> sink, @Nullable Cursor.Tracking cursor) {
if (!sink.isCancelled()) {
- consumingStats.sendFinishEvent(sink);
+ consumingStats.sendFinishEvent(sink, cursor);
}
}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/OffsetsInfo.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/OffsetsInfo.java
index 24e2a0aa889..a8d8e6a8911 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/OffsetsInfo.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/OffsetsInfo.java
@@ -1,6 +1,7 @@
package com.provectus.kafka.ui.emitter;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
@@ -62,4 +63,8 @@ long summaryOffsetsRange() {
return cnt.getValue();
}
+ public Set<TopicPartition> allTargetPartitions() {
+ return Sets.union(nonEmptyPartitions, emptyPartitions);
+ }
+
}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/PolledRecords.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/PolledRecords.java
index bc6bd95d5f6..94169f1b634 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/PolledRecords.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/PolledRecords.java
@@ -3,6 +3,7 @@
import java.time.Duration;
import java.util.Iterator;
import java.util.List;
+import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
@@ -32,6 +33,10 @@ public Iterator<ConsumerRecord<Bytes, Bytes>> iterator() {
return records.iterator();
}
+ public Set<TopicPartition> partitions() {
+ return records.partitions();
+ }
+
private static int calculatePolledRecSize(Iterable<ConsumerRecord<Bytes, Bytes>> recs) {
int polledBytes = 0;
for (ConsumerRecord<Bytes, Bytes> rec : recs) {
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/RangePollingEmitter.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/RangePollingEmitter.java
index af6dc7d0693..8abcd4772e4 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/RangePollingEmitter.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/RangePollingEmitter.java
@@ -17,6 +17,7 @@
abstract class RangePollingEmitter extends AbstractEmitter {
private final Supplier<EnhancedConsumer> consumerSupplier;
+ private final Cursor.Tracking cursor;
protected final ConsumerPosition consumerPosition;
protected final int messagesPerPage;
@@ -24,11 +25,13 @@ protected RangePollingEmitter(Supplier<EnhancedConsumer> consumerSupplier,
ConsumerPosition consumerPosition,
int messagesPerPage,
MessagesProcessing messagesProcessing,
- PollingSettings pollingSettings) {
+ PollingSettings pollingSettings,
+ Cursor.Tracking cursor) {
super(messagesProcessing, pollingSettings);
this.consumerPosition = consumerPosition;
this.messagesPerPage = messagesPerPage;
this.consumerSupplier = consumerSupplier;
+ this.cursor = cursor;
}
protected record FromToOffset(/*inclusive*/ long from, /*exclusive*/ long to) {
@@ -46,18 +49,20 @@ public void accept(FluxSink<TopicMessageEventDTO> sink) {
try (EnhancedConsumer consumer = consumerSupplier.get()) {
sendPhase(sink, "Consumer created");
var seekOperations = SeekOperations.create(consumer, consumerPosition);
+ cursor.initOffsets(seekOperations.getOffsetsForSeek());
+
TreeMap<TopicPartition, FromToOffset> pollRange = nextPollingRange(new TreeMap<>(), seekOperations);
log.debug("Starting from offsets {}", pollRange);
- while (!sink.isCancelled() && !pollRange.isEmpty() && !sendLimitReached()) {
+ while (!sink.isCancelled() && !pollRange.isEmpty() && !isSendLimitReached()) {
var polled = poll(consumer, sink, pollRange);
- send(sink, polled);
+ send(sink, polled, cursor);
pollRange = nextPollingRange(pollRange, seekOperations);
}
if (sink.isCancelled()) {
log.debug("Polling finished due to sink cancellation");
}
- sendFinishStatsAndCompleteSink(sink);
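+ // emit a next-page cursor only if part of the requested range is still left to poll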
+ sendFinishStatsAndCompleteSink(sink, pollRange.isEmpty() ? null : cursor);
log.debug("Polling finished");
} catch (InterruptException kafkaInterruptException) {
log.debug("Polling finished due to thread interruption");
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/SeekOperations.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/SeekOperations.java
index 4de027bdb23..8fa2cfeb0bb 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/SeekOperations.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/SeekOperations.java
@@ -1,13 +1,13 @@
package com.provectus.kafka.ui.emitter;
+import static com.provectus.kafka.ui.model.PollingModeDTO.TO_TIMESTAMP;
+import static java.util.Objects.requireNonNull;
+
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
import com.provectus.kafka.ui.model.ConsumerPosition;
-import com.provectus.kafka.ui.model.SeekTypeDTO;
+import com.provectus.kafka.ui.model.PollingModeDTO;
import java.util.HashMap;
import java.util.Map;
-import java.util.stream.Collectors;
-import javax.annotation.Nullable;
import lombok.AccessLevel;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.mutable.MutableLong;
@@ -22,17 +22,11 @@ public class SeekOperations {
private final Map<TopicPartition, Long> offsetsForSeek; //only contains non-empty partitions!
public static SeekOperations create(Consumer<?, ?> consumer, ConsumerPosition consumerPosition) {
- OffsetsInfo offsetsInfo;
- if (consumerPosition.getSeekTo() == null) {
- offsetsInfo = new OffsetsInfo(consumer, consumerPosition.getTopic());
- } else {
- offsetsInfo = new OffsetsInfo(consumer, consumerPosition.getSeekTo().keySet());
- }
- return new SeekOperations(
- consumer,
- offsetsInfo,
- getOffsetsForSeek(consumer, offsetsInfo, consumerPosition.getSeekType(), consumerPosition.getSeekTo())
- );
+ OffsetsInfo offsetsInfo = consumerPosition.partitions().isEmpty()
+ ? new OffsetsInfo(consumer, consumerPosition.topic())
+ : new OffsetsInfo(consumer, consumerPosition.partitions());
+ var offsetsToSeek = getOffsetsForSeek(consumer, offsetsInfo, consumerPosition);
+ return new SeekOperations(consumer, offsetsInfo, offsetsToSeek);
}
public void assignAndSeekNonEmptyPartitions() {
@@ -75,27 +69,26 @@ public Map<TopicPartition, Long> getOffsetsForSeek() {
@VisibleForTesting
static Map<TopicPartition, Long> getOffsetsForSeek(Consumer<?, ?> consumer,
OffsetsInfo offsetsInfo,
- SeekTypeDTO seekType,
- @Nullable Map<TopicPartition, Long> seekTo) {
- switch (seekType) {
- case LATEST:
- return consumer.endOffsets(offsetsInfo.getNonEmptyPartitions());
- case BEGINNING:
- return consumer.beginningOffsets(offsetsInfo.getNonEmptyPartitions());
- case OFFSET:
- Preconditions.checkNotNull(seekTo);
- return fixOffsets(offsetsInfo, seekTo);
- case TIMESTAMP:
- Preconditions.checkNotNull(seekTo);
- return offsetsForTimestamp(consumer, offsetsInfo, seekTo);
- default:
- throw new IllegalStateException();
- }
+ ConsumerPosition position) {
+ return switch (position.pollingMode()) {
+ case TAILING -> consumer.endOffsets(offsetsInfo.allTargetPartitions());
+ case LATEST -> consumer.endOffsets(offsetsInfo.getNonEmptyPartitions());
+ case EARLIEST -> consumer.beginningOffsets(offsetsInfo.getNonEmptyPartitions());
+ case FROM_OFFSET, TO_OFFSET -> fixOffsets(offsetsInfo, requireNonNull(position.offsets()));
+ case FROM_TIMESTAMP, TO_TIMESTAMP ->
+ offsetsForTimestamp(consumer, position.pollingMode(), offsetsInfo, requireNonNull(position.timestamp()));
+ };
}
- private static Map<TopicPartition, Long> fixOffsets(OffsetsInfo offsetsInfo, Map<TopicPartition, Long> offsets) {
- offsets = new HashMap<>(offsets);
- offsets.keySet().retainAll(offsetsInfo.getNonEmptyPartitions());
+ private static Map<TopicPartition, Long> fixOffsets(OffsetsInfo offsetsInfo,
+ ConsumerPosition.Offsets positionOffset) {
+ var offsets = new HashMap<TopicPartition, Long>();
+ if (positionOffset.offset() != null) {
+ offsetsInfo.getNonEmptyPartitions().forEach(tp -> offsets.put(tp, positionOffset.offset()));
+ } else {
+ offsets.putAll(requireNonNull(positionOffset.tpOffsets()));
+ offsets.keySet().retainAll(offsetsInfo.getNonEmptyPartitions());
+ }
Map<TopicPartition, Long> result = new HashMap<>();
offsets.forEach((tp, targetOffset) -> {
@@ -112,13 +105,25 @@ private static Map<TopicPartition, Long> fixOffsets(OffsetsInfo offsetsInfo, Map
return result;
}
- private static Map<TopicPartition, Long> offsetsForTimestamp(Consumer<?, ?> consumer, OffsetsInfo offsetsInfo,
- Map<TopicPartition, Long> timestamps) {
- timestamps = new HashMap<>(timestamps);
- timestamps.keySet().retainAll(offsetsInfo.getNonEmptyPartitions());
+ private static Map<TopicPartition, Long> offsetsForTimestamp(Consumer<?, ?> consumer,
+ PollingModeDTO pollingMode,
+ OffsetsInfo offsetsInfo,
+ Long timestamp) {
+ Map<TopicPartition, Long> timestamps = new HashMap<>();
+ offsetsInfo.getNonEmptyPartitions().forEach(tp -> timestamps.put(tp, timestamp));
- return consumer.offsetsForTimes(timestamps).entrySet().stream()
- .filter(e -> e.getValue() != null)
- .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset()));
+ Map<TopicPartition, Long> result = new HashMap<>();
+ consumer.offsetsForTimes(timestamps).forEach((tp, offsetAndTimestamp) -> {
+ if (offsetAndTimestamp == null) {
+ if (pollingMode == TO_TIMESTAMP && offsetsInfo.getNonEmptyPartitions().contains(tp)) {
+ // if no offset was returned this means that *all* timestamps are lower
+ // than the target timestamp. In case of TO_TIMESTAMP mode we need to read from the end of the partition
+ result.put(tp, offsetsInfo.getEndOffsets().get(tp));
+ }
+ } else {
+ result.put(tp, offsetAndTimestamp.offset());
+ }
+ });
+ return result;
}
}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/TailingEmitter.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/TailingEmitter.java
index c3f04fe8cc2..dd73f743710 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/TailingEmitter.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/TailingEmitter.java
@@ -35,7 +35,7 @@ public void accept(FluxSink<TopicMessageEventDTO> sink) {
while (!sink.isCancelled()) {
sendPhase(sink, "Polling");
var polled = poll(sink, consumer);
- send(sink, polled);
+ send(sink, polled, null);
}
sink.complete();
log.debug("Tailing finished");
@@ -55,5 +55,4 @@ private void assignAndSeek(EnhancedConsumer consumer) {
consumer.assign(seekOffsets.keySet());
seekOffsets.forEach(consumer::seek);
}
-
}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/ConsumerPosition.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/ConsumerPosition.java
index 9d77923fbc6..51f4e51f7c6 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/ConsumerPosition.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/ConsumerPosition.java
@@ -1,14 +1,72 @@
package com.provectus.kafka.ui.model;
+import com.google.common.base.Preconditions;
+import com.provectus.kafka.ui.exception.ValidationException;
+import java.util.List;
import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
import javax.annotation.Nullable;
-import lombok.Value;
import org.apache.kafka.common.TopicPartition;
-@Value
-public class ConsumerPosition {
- SeekTypeDTO seekType;
- String topic;
- @Nullable
- Map<TopicPartition, Long> seekTo; // null if positioning should apply to all tps
+public record ConsumerPosition(PollingModeDTO pollingMode,
+ String topic,
+ List<TopicPartition> partitions, //all partitions if list is empty
+ @Nullable Long timestamp,
+ @Nullable Offsets offsets) {
+
+ public record Offsets(@Nullable Long offset, //should be applied to all partitions
+ @Nullable Map<TopicPartition, Long> tpOffsets) {
+ public Offsets {
+ // only one of properties should be set
+ Preconditions.checkArgument((offset == null && tpOffsets != null) || (offset != null && tpOffsets == null));
+ }
+ }
+
+ public static ConsumerPosition create(PollingModeDTO pollingMode,
+ String topic,
+ @Nullable List<Integer> partitions,
+ @Nullable Long timestamp,
+ @Nullable Long offset) {
+ @Nullable var offsets = parseAndValidateOffsets(pollingMode, offset);
+
+ var topicPartitions = Optional.ofNullable(partitions).orElse(List.of())
+ .stream()
+ .map(p -> new TopicPartition(topic, p))
+ .collect(Collectors.toList());
+
+ // if offsets are specified, infer the partitions list from them
+ topicPartitions = (offsets != null && offsets.tpOffsets() != null)
+ ? List.copyOf(offsets.tpOffsets().keySet())
+ : topicPartitions;
+
+ return new ConsumerPosition(
+ pollingMode,
+ topic,
+ topicPartitions,
+ validateTimestamp(pollingMode, timestamp),
+ offsets
+ );
+ }
+
+ private static Long validateTimestamp(PollingModeDTO pollingMode, @Nullable Long ts) {
+ if (pollingMode == PollingModeDTO.FROM_TIMESTAMP || pollingMode == PollingModeDTO.TO_TIMESTAMP) {
+ if (ts == null) {
+ throw new ValidationException("timestamp not provided for " + pollingMode);
+ }
+ }
+ return ts;
+ }
+
+ private static Offsets parseAndValidateOffsets(PollingModeDTO pollingMode,
+ @Nullable Long offset) {
+ if (pollingMode == PollingModeDTO.FROM_OFFSET || pollingMode == PollingModeDTO.TO_OFFSET) {
+ if (offset == null) {
+ throw new ValidationException("offsets not provided for " + pollingMode);
+ }
+ return new Offsets(offset, null);
+ }
+ return null;
+ }
+
}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java
index 620bd840861..b14b885c56a 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java
@@ -1,8 +1,13 @@
package com.provectus.kafka.ui.service;
+import com.google.common.base.Charsets;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.hash.Hashing;
import com.google.common.util.concurrent.RateLimiter;
import com.provectus.kafka.ui.config.ClustersProperties;
import com.provectus.kafka.ui.emitter.BackwardEmitter;
+import com.provectus.kafka.ui.emitter.Cursor;
import com.provectus.kafka.ui.emitter.ForwardEmitter;
import com.provectus.kafka.ui.emitter.MessageFilters;
import com.provectus.kafka.ui.emitter.TailingEmitter;
@@ -11,12 +16,12 @@
import com.provectus.kafka.ui.model.ConsumerPosition;
import com.provectus.kafka.ui.model.CreateTopicMessageDTO;
import com.provectus.kafka.ui.model.KafkaCluster;
-import com.provectus.kafka.ui.model.MessageFilterTypeDTO;
-import com.provectus.kafka.ui.model.SeekDirectionDTO;
+import com.provectus.kafka.ui.model.PollingModeDTO;
import com.provectus.kafka.ui.model.SmartFilterTestExecutionDTO;
import com.provectus.kafka.ui.model.SmartFilterTestExecutionResultDTO;
import com.provectus.kafka.ui.model.TopicMessageDTO;
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
+import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
import com.provectus.kafka.ui.serdes.ProducerRecordCreator;
import com.provectus.kafka.ui.util.SslPropertiesUtil;
import java.time.Instant;
@@ -27,12 +32,12 @@
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.producer.KafkaProducer;
@@ -50,8 +55,11 @@
@Slf4j
public class MessagesService {
+ private static final long SALT_FOR_HASHING = ThreadLocalRandom.current().nextLong();
+
private static final int DEFAULT_MAX_PAGE_SIZE = 500;
private static final int DEFAULT_PAGE_SIZE = 100;
+
// limiting UI messages rate to 20/sec in tailing mode
private static final int TAILING_UI_MESSAGE_THROTTLE_RATE = 20;
@@ -61,6 +69,12 @@ public class MessagesService {
private final int maxPageSize;
private final int defaultPageSize;
+ private final Cache<String, Predicate<TopicMessageDTO>> registeredFilters = CacheBuilder.newBuilder()
+ .maximumSize(PollingCursorsStorage.MAX_SIZE)
+ .build();
+
+ private final PollingCursorsStorage cursorsStorage = new PollingCursorsStorage();
+
public MessagesService(AdminClientService adminClientService,
DeserializationService deserializationService,
ConsumerGroupService consumerGroupService,
@@ -86,10 +100,7 @@ private Mono withExistingTopic(KafkaCluster cluster, String to
public static SmartFilterTestExecutionResultDTO execSmartFilterTest(SmartFilterTestExecutionDTO execData) {
Predicate<TopicMessageDTO> predicate;
try {
- predicate = MessageFilters.createMsgFilter(
- execData.getFilterCode(),
- MessageFilterTypeDTO.GROOVY_SCRIPT
- );
+ predicate = MessageFilters.groovyScriptFilter(execData.getFilterCode());
} catch (Exception e) {
log.info("Smart filter '{}' compilation error", execData.getFilterCode(), e);
return new SmartFilterTestExecutionResultDTO()
@@ -197,67 +208,103 @@ public static KafkaProducer<byte[], byte[]> createProducer(KafkaCluster cluster,
return new KafkaProducer<>(properties);
}
- public Flux<TopicMessageEventDTO> loadMessages(KafkaCluster cluster, String topic,
+ public Flux<TopicMessageEventDTO> loadMessages(KafkaCluster cluster,
+ String topic,
ConsumerPosition consumerPosition,
- @Nullable String query,
- MessageFilterTypeDTO filterQueryType,
- @Nullable Integer pageSize,
- SeekDirectionDTO seekDirection,
+ @Nullable String containsStringFilter,
+ @Nullable String filterId,
+ @Nullable Integer limit,
@Nullable String keySerde,
@Nullable String valueSerde) {
+ return loadMessages(
+ cluster,
+ topic,
+ deserializationService.deserializerFor(cluster, topic, keySerde, valueSerde),
+ consumerPosition,
+ getMsgFilter(containsStringFilter, filterId),
+ fixPageSize(limit)
+ );
+ }
+
+ public Flux<TopicMessageEventDTO> loadMessages(KafkaCluster cluster, String topic, String cursorId) {
+ Cursor cursor = cursorsStorage.getCursor(cursorId)
+ .orElseThrow(() -> new ValidationException("Next page cursor not found. Maybe it was evicted from cache."));
+ return loadMessages(
+ cluster,
+ topic,
+ cursor.deserializer(),
+ cursor.consumerPosition(),
+ cursor.filter(),
+ cursor.limit()
+ );
+ }
+
+ private Flux<TopicMessageEventDTO> loadMessages(KafkaCluster cluster,
+ String topic,
+ ConsumerRecordDeserializer deserializer,
+ ConsumerPosition consumerPosition,
+ Predicate<TopicMessageDTO> filter,
+ int limit) {
return withExistingTopic(cluster, topic)
.flux()
.publishOn(Schedulers.boundedElastic())
- .flatMap(td -> loadMessagesImpl(cluster, topic, consumerPosition, query,
- filterQueryType, fixPageSize(pageSize), seekDirection, keySerde, valueSerde));
- }
-
- private int fixPageSize(@Nullable Integer pageSize) {
- return Optional.ofNullable(pageSize)
- .filter(ps -> ps > 0 && ps <= maxPageSize)
- .orElse(defaultPageSize);
+ .flatMap(td -> loadMessagesImpl(cluster, deserializer, consumerPosition, filter, limit));
}
private Flux<TopicMessageEventDTO> loadMessagesImpl(KafkaCluster cluster,
- String topic,
+ ConsumerRecordDeserializer deserializer,
ConsumerPosition consumerPosition,
- @Nullable String query,
- MessageFilterTypeDTO filterQueryType,
- int limit,
- SeekDirectionDTO seekDirection,
- @Nullable String keySerde,
- @Nullable String valueSerde) {
-
- var deserializer = deserializationService.deserializerFor(cluster, topic, keySerde, valueSerde);
- var filter = getMsgFilter(query, filterQueryType);
- var emitter = switch (seekDirection) {
- case FORWARD -> new ForwardEmitter(
+ Predicate<TopicMessageDTO> filter,
+ int limit) {
+ var emitter = switch (consumerPosition.pollingMode()) {
+ case TO_OFFSET, TO_TIMESTAMP, LATEST -> new BackwardEmitter(
() -> consumerGroupService.createConsumer(cluster),
- consumerPosition, limit, deserializer, filter, cluster.getPollingSettings()
+ consumerPosition,
+ limit,
+ deserializer,
+ filter,
+ cluster.getPollingSettings(),
+ cursorsStorage.createNewCursor(deserializer, consumerPosition, filter, limit)
);
- case BACKWARD -> new BackwardEmitter(
+ case FROM_OFFSET, FROM_TIMESTAMP, EARLIEST -> new ForwardEmitter(
() -> consumerGroupService.createConsumer(cluster),
- consumerPosition, limit, deserializer, filter, cluster.getPollingSettings()
+ consumerPosition,
+ limit,
+ deserializer,
+ filter,
+ cluster.getPollingSettings(),
+ cursorsStorage.createNewCursor(deserializer, consumerPosition, filter, limit)
);
case TAILING -> new TailingEmitter(
() -> consumerGroupService.createConsumer(cluster),
- consumerPosition, deserializer, filter, cluster.getPollingSettings()
+ consumerPosition,
+ deserializer,
+ filter,
+ cluster.getPollingSettings()
);
};
return Flux.create(emitter)
- .map(throttleUiPublish(seekDirection));
+ .map(throttleUiPublish(consumerPosition.pollingMode()));
}
- private Predicate<TopicMessageDTO> getMsgFilter(String query,
- MessageFilterTypeDTO filterQueryType) {
- if (StringUtils.isEmpty(query)) {
- return evt -> true;
+ private Predicate<TopicMessageDTO> getMsgFilter(@Nullable String containsStrFilter,
+ @Nullable String smartFilterId) {
+ Predicate<TopicMessageDTO> messageFilter = MessageFilters.noop();
+ if (containsStrFilter != null) {
+ messageFilter = messageFilter.and(MessageFilters.containsStringFilter(containsStrFilter));
}
- return MessageFilters.createMsgFilter(query, filterQueryType);
+ if (smartFilterId != null) {
+ var registered = registeredFilters.getIfPresent(smartFilterId);
+ if (registered == null) {
+ throw new ValidationException("No filter was registered with id " + smartFilterId);
+ }
+ messageFilter = messageFilter.and(registered);
+ }
+ return messageFilter;
}
- private UnaryOperator<TopicMessageEventDTO> throttleUiPublish(SeekDirectionDTO seekDirection) {
- if (seekDirection == SeekDirectionDTO.TAILING) {
+ private UnaryOperator<TopicMessageEventDTO> throttleUiPublish(PollingModeDTO pollingMode) {
+ if (pollingMode == PollingModeDTO.TAILING) {
RateLimiter rateLimiter = RateLimiter.create(TAILING_UI_MESSAGE_THROTTLE_RATE);
return m -> {
rateLimiter.acquire(1);
@@ -269,4 +316,22 @@ private UnaryOperator<TopicMessageEventDTO> throttleUiPublish(SeekDirectionDTO seekDirection) {
return UnaryOperator.identity();
}
+ private int fixPageSize(@Nullable Integer pageSize) {
+ return Optional.ofNullable(pageSize)
+ .filter(ps -> ps > 0 && ps <= maxPageSize)
+ .orElse(defaultPageSize);
+ }
+
+ public String registerMessageFilter(String groovyCode) {
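+ // the filter id is a salted hash of the filter code: identical filters map to the same id
+ // within this instance, while the random salt keeps ids from being precomputed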
+ String saltedCode = groovyCode + SALT_FOR_HASHING;
+ String filterId = Hashing.sha256()
+ .hashString(saltedCode, Charsets.UTF_8)
+ .toString()
+ .substring(0, 8);
+ if (registeredFilters.getIfPresent(filterId) == null) {
+ registeredFilters.put(filterId, MessageFilters.groovyScriptFilter(groovyCode));
+ }
+ return filterId;
+ }
+
}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/PollingCursorsStorage.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/PollingCursorsStorage.java
new file mode 100644
index 00000000000..98094b5113b
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/PollingCursorsStorage.java
@@ -0,0 +1,45 @@
+package com.provectus.kafka.ui.service;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.provectus.kafka.ui.emitter.Cursor;
+import com.provectus.kafka.ui.model.ConsumerPosition;
+import com.provectus.kafka.ui.model.TopicMessageDTO;
+import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Predicate;
+import org.apache.commons.lang3.RandomStringUtils;
+
+public class PollingCursorsStorage {
+
+ public static final int MAX_SIZE = 10_000;
+
+ private final Cache<String, Cursor> cursorsCache = CacheBuilder.newBuilder()
+ .maximumSize(MAX_SIZE)
+ .build();
+
+
+ public Cursor.Tracking createNewCursor(ConsumerRecordDeserializer deserializer,
+ ConsumerPosition originalPosition,
+ Predicate<TopicMessageDTO> filter,
+ int limit) {
+ return new Cursor.Tracking(deserializer, originalPosition, filter, limit, this::register);
+ }
+
+ public Optional getCursor(String id) {
+ return Optional.ofNullable(cursorsCache.getIfPresent(id));
+ }
+
+ public String register(Cursor cursor) {
+ var id = RandomStringUtils.random(8, true, true);
+ cursorsCache.put(id, cursor);
+ return id;
+ }
+
+ @VisibleForTesting
+ public Map<String, Cursor> asMap() {
+ return cursorsCache.asMap();
+ }
+}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/TopicAnalysisService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/TopicAnalysisService.java
index 2523aae89ec..692c63109fa 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/TopicAnalysisService.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/TopicAnalysisService.java
@@ -1,6 +1,6 @@
package com.provectus.kafka.ui.service.analyze;
-import static com.provectus.kafka.ui.model.SeekTypeDTO.BEGINNING;
+import static com.provectus.kafka.ui.model.PollingModeDTO.EARLIEST;
import com.provectus.kafka.ui.emitter.EnhancedConsumer;
import com.provectus.kafka.ui.emitter.SeekOperations;
@@ -14,6 +14,7 @@
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import lombok.RequiredArgsConstructor;
@@ -104,7 +105,8 @@ public void run() {
consumer.partitionsFor(topicId.topicName)
.forEach(tp -> partitionStats.put(tp.partition(), new TopicAnalysisStats()));
- var seekOperations = SeekOperations.create(consumer, new ConsumerPosition(BEGINNING, topicId.topicName, null));
+ var seekOperations =
+ SeekOperations.create(consumer, new ConsumerPosition(EARLIEST, topicId.topicName, List.of(), null, null));
long summaryOffsetsRange = seekOperations.summaryOffsetsRange();
seekOperations.assignAndSeekNonEmptyPartitions();
diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConsumerTests.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConsumerTests.java
index ff11aa6656a..b925ea607f2 100644
--- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConsumerTests.java
+++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConsumerTests.java
@@ -56,7 +56,7 @@ public void shouldDeleteRecords() {
}
long count = webTestClient.get()
- .uri("/api/clusters/{clusterName}/topics/{topicName}/messages", LOCAL, topicName)
+ .uri("/api/clusters/{clusterName}/topics/{topicName}/messages/v2?m=EARLIEST", LOCAL, topicName)
.accept(TEXT_EVENT_STREAM)
.exchange()
.expectStatus()
diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/CursorTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/CursorTest.java
new file mode 100644
index 00000000000..88be63fe67b
--- /dev/null
+++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/CursorTest.java
@@ -0,0 +1,195 @@
+package com.provectus.kafka.ui.emitter;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.provectus.kafka.ui.AbstractIntegrationTest;
+import com.provectus.kafka.ui.model.ConsumerPosition;
+import com.provectus.kafka.ui.model.PollingModeDTO;
+import com.provectus.kafka.ui.model.TopicMessageEventDTO;
+import com.provectus.kafka.ui.producer.KafkaTestProducer;
+import com.provectus.kafka.ui.serde.api.Serde;
+import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
+import com.provectus.kafka.ui.serdes.PropertyResolverImpl;
+import com.provectus.kafka.ui.serdes.builtin.StringSerde;
+import com.provectus.kafka.ui.service.PollingCursorsStorage;
+import com.provectus.kafka.ui.util.ApplicationMetrics;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.function.Consumer;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import reactor.core.publisher.Flux;
+import reactor.test.StepVerifier;
+
+class CursorTest extends AbstractIntegrationTest {
+
+ static final String TOPIC = CursorTest.class.getSimpleName() + "_" + UUID.randomUUID();
+ static final int MSGS_IN_PARTITION = 20;
+ static final int PAGE_SIZE = (MSGS_IN_PARTITION / 2) + 1; //to poll full data set in 2 iterations
+
+ final PollingCursorsStorage cursorsStorage = new PollingCursorsStorage();
+
+ @BeforeAll
+ static void setup() {
+ createTopic(new NewTopic(TOPIC, 1, (short) 1));
+ try (var producer = KafkaTestProducer.forKafka(kafka)) {
+ for (int i = 0; i < MSGS_IN_PARTITION; i++) {
+ producer.send(new ProducerRecord<>(TOPIC, "msg_" + i));
+ }
+ }
+ }
+
+ @AfterAll
+ static void cleanup() {
+ deleteTopic(TOPIC);
+ }
+
+ @Test
+ void backwardEmitter() {
+ var consumerPosition = new ConsumerPosition(PollingModeDTO.LATEST, TOPIC, List.of(), null, null);
+ var emitter = createBackwardEmitter(consumerPosition);
+ emitMessages(emitter, PAGE_SIZE);
+ var cursor = assertCursor(
+ PollingModeDTO.TO_OFFSET,
+ offsets -> assertThat(offsets)
+ .hasSize(1)
+ .containsEntry(new TopicPartition(TOPIC, 0), 9L)
+ );
+
+ // polling remaining records using registered cursor
+ emitter = createBackwardEmitterWithCursor(cursor);
+ emitMessages(emitter, MSGS_IN_PARTITION - PAGE_SIZE);
+ //checking no new cursors registered
+ assertThat(cursorsStorage.asMap()).hasSize(1).containsValue(cursor);
+ }
+
+ @Test
+ void forwardEmitter() {
+ var consumerPosition = new ConsumerPosition(PollingModeDTO.EARLIEST, TOPIC, List.of(), null, null);
+ var emitter = createForwardEmitter(consumerPosition);
+ emitMessages(emitter, PAGE_SIZE);
+ var cursor = assertCursor(
+ PollingModeDTO.FROM_OFFSET,
+ offsets -> assertThat(offsets)
+ .hasSize(1)
+ .containsEntry(new TopicPartition(TOPIC, 0), 11L)
+ );
+
+ //polling remaining records using registered cursor
+ emitter = createForwardEmitterWithCursor(cursor);
+ emitMessages(emitter, MSGS_IN_PARTITION - PAGE_SIZE);
+ //checking no new cursors registered
+ assertThat(cursorsStorage.asMap()).hasSize(1).containsValue(cursor);
+ }
+
+ private Cursor assertCursor(PollingModeDTO expectedMode,
+ Consumer