Skip to content

Commit 6dfb045

Browse files
sevenEngfede1024
authored andcommitted
Add key and timestamp to tailed messages (#35)
* Add key and timestmap to tailed messages * Show key and timestamp together with message payload * Cargo fmt the code
1 parent f2b0d8d commit 6dfb045

File tree

4 files changed

+68
-10
lines changed

4 files changed

+68
-10
lines changed

resources/web_server/public/my_css.css

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,14 @@ h3 {
2626
padding-bottom: 3px;
2727
margin-top: 0px;
2828
margin-bottom: 0px;
29+
display: flex;
30+
}
31+
32+
.message-key,
33+
.message-ts {
34+
padding-right: 5px;
35+
margin-right: 5px;
36+
border-right: 1px solid #eee;
2937
}
3038

3139
div.topic_tailer {

resources/web_server/public/my_js.js

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,23 @@ function error_to_graphic(cell) {
106106
$(cell).html(symbol);
107107
}
108108

109+
function message_to_tailer_entry(msg) {
110+
var ts_text;
111+
if (msg["created_at"]) {
112+
ts_text = (new Date(msg["created_at"])).toISOString() + " Created";
113+
} else if (msg["appended_at"]) {
114+
ts_text = (new Date(msg["appended_at"])).toISOString() + " Appended";
115+
} else {
116+
ts_text = "N/A";
117+
}
118+
119+
var entry = $("<div>", {class: "message"});
120+
entry.append($("<div>", { class: "message-key", text: msg["key"] ? msg["key"] : "N/A" }));
121+
entry.append($("<div>", { class: "message-ts", text: ts_text }));
122+
entry.append($("<div>", { class: "message-payload", text: msg["payload"] }));
123+
return entry;
124+
}
125+
109126
$(document).ready(function() {
110127
$('#datatable-brokers-ajax').each(function(index) {
111128
$(this).DataTable({
@@ -375,9 +392,7 @@ function background_tailer(cluster_id, topic_name, tailer_id) {
375392
messages = JSON.parse(data);
376393
for (var i = 0; i < messages.length; i++) {
377394
var message = messages[i];
378-
var p = $("<p>", {class: "message"});
379-
p.append(truncate(message[2], max_msg_length));
380-
div_tailer.append(p);
395+
div_tailer.append(message_to_tailer_entry(message));
381396
}
382397
if (bottom)
383398
scroll_to_bottom(div_tailer);

src/live_consumer.rs

Lines changed: 41 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use rdkafka::config::ClientConfig;
22
use rdkafka::consumer::{BaseConsumer, Consumer, EmptyConsumerContext};
33
use rdkafka::message::BorrowedMessage;
4+
use rdkafka::message::Timestamp::*;
45
use rdkafka::Message;
56
use rocket::http::RawStr;
67
use rocket::State;
@@ -183,8 +184,18 @@ impl LiveConsumerStore {
183184

184185
// TODO: check log in case of error
185186

187+
#[derive(Serialize)]
188+
struct TailedMessage {
189+
partition: i32,
190+
offset: i64,
191+
key: Option<String>,
192+
created_at: Option<i64>,
193+
appended_at: Option<i64>,
194+
payload: String,
195+
}
196+
186197
#[get("/api/tailer/<cluster_id>/<topic>/<id>")]
187-
pub fn test_live_consumer_api(
198+
pub fn topic_tailer_api(
188199
cluster_id: ClusterId,
189200
topic: &RawStr,
190201
id: u64,
@@ -217,16 +228,40 @@ pub fn test_live_consumer_api(
217228

218229
let mut output = Vec::new();
219230
for message in consumer.poll(100, Duration::from_secs(3)) {
220-
let payload = message
231+
let key = message
232+
.key()
233+
.map(|bytes| String::from_utf8_lossy(bytes))
234+
.map(|cow_str| cow_str.into_owned());
235+
236+
let mut created_at = None;
237+
let mut appended_at = None;
238+
match message.timestamp() {
239+
CreateTime(ctime) => created_at = Some(ctime),
240+
LogAppendTime(atime) => appended_at = Some(atime),
241+
NotAvailable => (),
242+
}
243+
244+
let original_payload = message
221245
.payload()
222246
.map(|bytes| String::from_utf8_lossy(bytes))
223247
.unwrap_or(Cow::Borrowed(""));
224-
if payload.len() > 1024 {
225-
let truncated = format!("{}...", payload.chars().take(1024).collect::<String>());
226-
output.push(json! {(message.partition(), message.offset(), truncated)});
248+
let payload = if original_payload.len() > 1024 {
249+
format!(
250+
"{}...",
251+
original_payload.chars().take(1024).collect::<String>()
252+
)
227253
} else {
228-
output.push(json! {(message.partition(), message.offset(), payload)});
254+
original_payload.into_owned()
229255
};
256+
257+
output.push(TailedMessage {
258+
partition: message.partition(),
259+
offset: message.offset(),
260+
key,
261+
created_at,
262+
appended_at,
263+
payload,
264+
})
230265
}
231266

232267
Ok(json!(output).to_string())

src/web_server/server.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ pub fn run_server(executor: &ThreadPoolExecutor, cache: Cache, config: &Config)
127127
api::topic_groups,
128128
api::topic_search,
129129
api::topic_topology,
130-
live_consumer::test_live_consumer_api,
130+
live_consumer::topic_tailer_api,
131131
],
132132
)
133133
.launch();

0 commit comments

Comments
 (0)