Skip to content

Commit 4af9a5e

Browse files
authored
feat: updates for Prism UI (#1224)
Multiple changes - Modified dashboards and filters to use an async-safe RwLock - Modified dashboards and filters to send back the list of all available items (any user can GET all items) - Bugfix for stats - Modified stats format to be sent back as u64 instead of `format!("{number} Bytes")` - The `fetch_daily_stats_from_ingestors()` function will read the `stream.json` files for each ingestor and calculate date-wise stats instead of calling every ingestor's /metrics endpoint
1 parent 8381e72 commit 4af9a5e

File tree

10 files changed

+269
-230
lines changed

10 files changed

+269
-230
lines changed

src/handlers/http/cluster/mod.rs

+23-58
Original file line numberDiff line numberDiff line change
@@ -367,64 +367,29 @@ pub async fn sync_role_update_with_ingestors(
367367
Ok(())
368368
}
369369

370-
pub async fn fetch_daily_stats_from_ingestors(
371-
stream_name: &str,
370+
pub fn fetch_daily_stats_from_ingestors(
372371
date: &str,
372+
stream_meta_list: &[ObjectStoreFormat],
373373
) -> Result<Stats, StreamError> {
374-
let mut total_events_ingested: u64 = 0;
375-
let mut total_ingestion_size: u64 = 0;
376-
let mut total_storage_size: u64 = 0;
377-
378-
let ingestor_infos = get_ingestor_info().await.map_err(|err| {
379-
error!("Fatal: failed to get ingestor info: {:?}", err);
380-
StreamError::Anyhow(err)
381-
})?;
382-
for ingestor in ingestor_infos.iter() {
383-
let uri = Url::parse(&format!(
384-
"{}{}/metrics",
385-
&ingestor.domain_name,
386-
base_path_without_preceding_slash()
387-
))
388-
.map_err(|err| {
389-
StreamError::Anyhow(anyhow::anyhow!("Invalid URL in Ingestor Metadata: {}", err))
390-
})?;
391-
392-
let res = HTTP_CLIENT
393-
.get(uri)
394-
.header(header::AUTHORIZATION, &ingestor.token)
395-
.header(header::CONTENT_TYPE, "application/json")
396-
.send()
397-
.await;
398-
399-
if let Ok(res) = res {
400-
let text = res
401-
.text()
402-
.await
403-
.map_err(|err| StreamError::Anyhow(anyhow::anyhow!("Request failed: {}", err)))?;
404-
let lines: Vec<Result<String, std::io::Error>> =
405-
text.lines().map(|line| Ok(line.to_owned())).collect_vec();
406-
407-
let sample = prometheus_parse::Scrape::parse(lines.into_iter())
408-
.map_err(|err| {
409-
StreamError::Anyhow(anyhow::anyhow!(
410-
"Invalid URL in Ingestor Metadata: {}",
411-
err
412-
))
413-
})?
414-
.samples;
415-
416-
let (events_ingested, ingestion_size, storage_size) =
417-
Metrics::get_daily_stats_from_samples(sample, stream_name, date);
418-
total_events_ingested += events_ingested;
419-
total_ingestion_size += ingestion_size;
420-
total_storage_size += storage_size;
374+
// for the given date, get the stats from the ingestors
375+
let mut events_ingested = 0;
376+
let mut ingestion_size = 0;
377+
let mut storage_size = 0;
378+
379+
for meta in stream_meta_list.iter() {
380+
for manifest in meta.snapshot.manifest_list.iter() {
381+
if manifest.time_lower_bound.date_naive().to_string() == date {
382+
events_ingested += manifest.events_ingested;
383+
ingestion_size += manifest.ingestion_size;
384+
storage_size += manifest.storage_size;
385+
}
421386
}
422387
}
423388

424389
let stats = Stats {
425-
events: total_events_ingested,
426-
ingestion: total_ingestion_size,
427-
storage: total_storage_size,
390+
events: events_ingested,
391+
ingestion: ingestion_size,
392+
storage: storage_size,
428393
};
429394
Ok(stats)
430395
}
@@ -474,17 +439,17 @@ pub async fn fetch_stats_from_ingestors(
474439
Utc::now(),
475440
IngestionStats::new(
476441
count,
477-
format!("{} Bytes", ingestion_size),
442+
ingestion_size,
478443
lifetime_count,
479-
format!("{} Bytes", lifetime_ingestion_size),
444+
lifetime_ingestion_size,
480445
deleted_count,
481-
format!("{} Bytes", deleted_ingestion_size),
446+
deleted_ingestion_size,
482447
"json",
483448
),
484449
StorageStats::new(
485-
format!("{} Bytes", storage_size),
486-
format!("{} Bytes", lifetime_storage_size),
487-
format!("{} Bytes", deleted_storage_size),
450+
storage_size,
451+
lifetime_storage_size,
452+
deleted_storage_size,
488453
"parquet",
489454
),
490455
);

src/handlers/http/cluster/utils.rs

+16-65
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
use crate::{handlers::http::base_path_without_preceding_slash, HTTP_CLIENT};
2020
use actix_web::http::header;
2121
use chrono::{DateTime, Utc};
22-
use itertools::Itertools;
2322
use serde::{Deserialize, Serialize};
2423
use tracing::error;
2524
use url::Url;
@@ -81,22 +80,22 @@ impl ClusterInfo {
8180
#[derive(Debug, Default, Serialize, Deserialize)]
8281
pub struct IngestionStats {
8382
pub count: u64,
84-
pub size: String,
83+
pub size: u64,
8584
pub format: String,
8685
pub lifetime_count: u64,
87-
pub lifetime_size: String,
86+
pub lifetime_size: u64,
8887
pub deleted_count: u64,
89-
pub deleted_size: String,
88+
pub deleted_size: u64,
9089
}
9190

9291
impl IngestionStats {
9392
pub fn new(
9493
count: u64,
95-
size: String,
94+
size: u64,
9695
lifetime_count: u64,
97-
lifetime_size: String,
96+
lifetime_size: u64,
9897
deleted_count: u64,
99-
deleted_size: String,
98+
deleted_size: u64,
10099
format: &str,
101100
) -> Self {
102101
Self {
@@ -113,14 +112,14 @@ impl IngestionStats {
113112

114113
#[derive(Debug, Default, Serialize, Deserialize)]
115114
pub struct StorageStats {
116-
pub size: String,
115+
pub size: u64,
117116
pub format: String,
118-
pub lifetime_size: String,
119-
pub deleted_size: String,
117+
pub lifetime_size: u64,
118+
pub deleted_size: u64,
120119
}
121120

122121
impl StorageStats {
123-
pub fn new(size: String, lifetime_size: String, deleted_size: String, format: &str) -> Self {
122+
pub fn new(size: u64, lifetime_size: u64, deleted_size: u64, format: &str) -> Self {
124123
Self {
125124
size,
126125
format: format.to_string(),
@@ -143,71 +142,23 @@ pub fn merge_quried_stats(stats: Vec<QueriedStats>) -> QueriedStats {
143142
.fold(IngestionStats::default(), |acc, x| IngestionStats {
144143
count: acc.count + x.count,
145144

146-
size: format!(
147-
"{} Bytes",
148-
acc.size.split(' ').collect_vec()[0]
149-
.parse::<u64>()
150-
.unwrap_or_default()
151-
+ x.size.split(' ').collect_vec()[0]
152-
.parse::<u64>()
153-
.unwrap_or_default()
154-
),
145+
size: acc.size + x.size,
155146
format: x.format.clone(),
156147
lifetime_count: acc.lifetime_count + x.lifetime_count,
157-
lifetime_size: format!(
158-
"{} Bytes",
159-
acc.lifetime_size.split(' ').collect_vec()[0]
160-
.parse::<u64>()
161-
.unwrap_or_default()
162-
+ x.lifetime_size.split(' ').collect_vec()[0]
163-
.parse::<u64>()
164-
.unwrap_or_default()
165-
),
148+
lifetime_size: acc.lifetime_size + x.lifetime_size,
166149
deleted_count: acc.deleted_count + x.deleted_count,
167-
deleted_size: format!(
168-
"{} Bytes",
169-
acc.deleted_size.split(' ').collect_vec()[0]
170-
.parse::<u64>()
171-
.unwrap_or_default()
172-
+ x.deleted_size.split(' ').collect_vec()[0]
173-
.parse::<u64>()
174-
.unwrap_or_default()
175-
),
150+
deleted_size: acc.deleted_size + x.deleted_size,
176151
});
177152

178153
let cumulative_storage =
179154
stats
180155
.iter()
181156
.map(|x| &x.storage)
182157
.fold(StorageStats::default(), |acc, x| StorageStats {
183-
size: format!(
184-
"{} Bytes",
185-
acc.size.split(' ').collect_vec()[0]
186-
.parse::<u64>()
187-
.unwrap_or_default()
188-
+ x.size.split(' ').collect_vec()[0]
189-
.parse::<u64>()
190-
.unwrap_or_default()
191-
),
158+
size: acc.size + x.size,
192159
format: x.format.clone(),
193-
lifetime_size: format!(
194-
"{} Bytes",
195-
acc.lifetime_size.split(' ').collect_vec()[0]
196-
.parse::<u64>()
197-
.unwrap_or_default()
198-
+ x.lifetime_size.split(' ').collect_vec()[0]
199-
.parse::<u64>()
200-
.unwrap_or_default()
201-
),
202-
deleted_size: format!(
203-
"{} Bytes",
204-
acc.deleted_size.split(' ').collect_vec()[0]
205-
.parse::<u64>()
206-
.unwrap_or_default()
207-
+ x.deleted_size.split(' ').collect_vec()[0]
208-
.parse::<u64>()
209-
.unwrap_or_default()
210-
),
160+
lifetime_size: acc.lifetime_size + x.lifetime_size,
161+
deleted_size: acc.deleted_size + x.deleted_size,
211162
});
212163

213164
QueriedStats::new(

src/handlers/http/logstream.rs

+6-6
Original file line numberDiff line numberDiff line change
@@ -262,17 +262,17 @@ pub async fn get_stats(
262262
let stats = {
263263
let ingestion_stats = IngestionStats::new(
264264
stats.current_stats.events,
265-
format!("{} Bytes", stats.current_stats.ingestion),
265+
stats.current_stats.ingestion,
266266
stats.lifetime_stats.events,
267-
format!("{} Bytes", stats.lifetime_stats.ingestion),
267+
stats.lifetime_stats.ingestion,
268268
stats.deleted_stats.events,
269-
format!("{} Bytes", stats.deleted_stats.ingestion),
269+
stats.deleted_stats.ingestion,
270270
"json",
271271
);
272272
let storage_stats = StorageStats::new(
273-
format!("{} Bytes", stats.current_stats.storage),
274-
format!("{} Bytes", stats.lifetime_stats.storage),
275-
format!("{} Bytes", stats.deleted_stats.storage),
273+
stats.current_stats.storage,
274+
stats.lifetime_stats.storage,
275+
stats.deleted_stats.storage,
276276
"parquet",
277277
);
278278

src/handlers/http/modal/query/querier_logstream.rs

+37-8
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use actix_web::{
2626
use bytes::Bytes;
2727
use chrono::Utc;
2828
use http::StatusCode;
29+
use relative_path::RelativePathBuf;
2930
use tokio::sync::Mutex;
3031
use tracing::{error, warn};
3132

@@ -44,7 +45,7 @@ use crate::{
4445
hottier::HotTierManager,
4546
parseable::{StreamNotFound, PARSEABLE},
4647
stats::{self, Stats},
47-
storage::StreamType,
48+
storage::{ObjectStoreFormat, StreamType, STREAM_ROOT_DIRECTORY},
4849
};
4950

5051
pub async fn delete(stream_name: Path<String>) -> Result<impl Responder, StreamError> {
@@ -151,7 +152,35 @@ pub async fn get_stats(
151152

152153
if !date_value.is_empty() {
153154
let querier_stats = get_stats_date(&stream_name, date_value).await?;
154-
let ingestor_stats = fetch_daily_stats_from_ingestors(&stream_name, date_value).await?;
155+
156+
// this function requires all the ingestor stream jsons
157+
let path = RelativePathBuf::from_iter([&stream_name, STREAM_ROOT_DIRECTORY]);
158+
let obs = PARSEABLE
159+
.storage
160+
.get_object_store()
161+
.get_objects(
162+
Some(&path),
163+
Box::new(|file_name| {
164+
file_name.starts_with(".ingestor") && file_name.ends_with("stream.json")
165+
}),
166+
)
167+
.await?;
168+
169+
let mut ingestor_stream_jsons = Vec::new();
170+
for ob in obs {
171+
let stream_metadata: ObjectStoreFormat = match serde_json::from_slice(&ob) {
172+
Ok(d) => d,
173+
Err(e) => {
174+
error!("Failed to parse stream metadata: {:?}", e);
175+
continue;
176+
}
177+
};
178+
ingestor_stream_jsons.push(stream_metadata);
179+
}
180+
181+
let ingestor_stats =
182+
fetch_daily_stats_from_ingestors(date_value, &ingestor_stream_jsons)?;
183+
155184
let total_stats = Stats {
156185
events: querier_stats.events + ingestor_stats.events,
157186
ingestion: querier_stats.ingestion + ingestor_stats.ingestion,
@@ -180,17 +209,17 @@ pub async fn get_stats(
180209
let stats = {
181210
let ingestion_stats = IngestionStats::new(
182211
stats.current_stats.events,
183-
format!("{} Bytes", stats.current_stats.ingestion),
212+
stats.current_stats.ingestion,
184213
stats.lifetime_stats.events,
185-
format!("{} Bytes", stats.lifetime_stats.ingestion),
214+
stats.lifetime_stats.ingestion,
186215
stats.deleted_stats.events,
187-
format!("{} Bytes", stats.deleted_stats.ingestion),
216+
stats.deleted_stats.ingestion,
188217
"json",
189218
);
190219
let storage_stats = StorageStats::new(
191-
format!("{} Bytes", stats.current_stats.storage),
192-
format!("{} Bytes", stats.lifetime_stats.storage),
193-
format!("{} Bytes", stats.deleted_stats.storage),
220+
stats.current_stats.storage,
221+
stats.lifetime_stats.storage,
222+
stats.deleted_stats.storage,
194223
"parquet",
195224
);
196225

0 commit comments

Comments (0)