5
5
import static java .util .stream .Collectors .toMap ;
6
6
7
7
import com .provectus .kafka .ui .api .MessagesApi ;
8
+ import com .provectus .kafka .ui .exception .ValidationException ;
8
9
import com .provectus .kafka .ui .model .ConsumerPosition ;
9
10
import com .provectus .kafka .ui .model .CreateTopicMessageDTO ;
10
11
import com .provectus .kafka .ui .model .MessageFilterTypeDTO ;
18
19
import java .util .List ;
19
20
import java .util .Map ;
20
21
import java .util .Optional ;
22
+ import javax .annotation .Nullable ;
21
23
import javax .validation .Valid ;
22
24
import lombok .RequiredArgsConstructor ;
23
25
import lombok .extern .slf4j .Slf4j ;
@@ -63,18 +65,22 @@ public Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> getTopicMessages(String
63
65
String keySerde ,
64
66
String valueSerde ,
65
67
ServerWebExchange exchange ) {
68
+ seekType = seekType != null ? seekType : SeekTypeDTO .BEGINNING ;
69
+ seekDirection = seekDirection != null ? seekDirection : SeekDirectionDTO .FORWARD ;
70
+ filterQueryType = filterQueryType != null ? filterQueryType : MessageFilterTypeDTO .STRING_CONTAINS ;
71
+ int recordsLimit =
72
+ Optional .ofNullable (limit ).map (s -> Math .min (s , MAX_LOAD_RECORD_LIMIT )).orElse (DEFAULT_LOAD_RECORD_LIMIT );
73
+
66
74
var positions = new ConsumerPosition (
67
- seekType != null ? seekType : SeekTypeDTO . BEGINNING ,
68
- parseSeekTo ( topicName , seekTo ) ,
69
- seekDirection
75
+ seekType ,
76
+ topicName ,
77
+ parseSeekTo ( topicName , seekType , seekTo )
70
78
);
71
- int recordsLimit = Optional .ofNullable (limit )
72
- .map (s -> Math .min (s , MAX_LOAD_RECORD_LIMIT ))
73
- .orElse (DEFAULT_LOAD_RECORD_LIMIT );
74
79
return Mono .just (
75
80
ResponseEntity .ok (
76
81
messagesService .loadMessages (
77
- getCluster (clusterName ), topicName , positions , q , filterQueryType , recordsLimit , keySerde , valueSerde )
82
+ getCluster (clusterName ), topicName , positions , q , filterQueryType ,
83
+ recordsLimit , seekDirection , keySerde , valueSerde )
78
84
)
79
85
);
80
86
}
@@ -92,9 +98,13 @@ public Mono<ResponseEntity<Void>> sendTopicMessages(
92
98
* The format is [partition]::[offset] for specifying offsets
93
99
* or [partition]::[timestamp in millis] for specifying timestamps.
94
100
*/
95
- private Map <TopicPartition , Long > parseSeekTo (String topic , List <String > seekTo ) {
101
+ @ Nullable
102
+ private Map <TopicPartition , Long > parseSeekTo (String topic , SeekTypeDTO seekType , List <String > seekTo ) {
96
103
if (seekTo == null || seekTo .isEmpty ()) {
97
- return Map .of ();
104
+ if (seekType == SeekTypeDTO .LATEST || seekType == SeekTypeDTO .BEGINNING ) {
105
+ return null ;
106
+ }
107
+ throw new ValidationException ("seekTo should be set if seekType is " + seekType );
98
108
}
99
109
return seekTo .stream ()
100
110
.map (p -> {
0 commit comments