Skip to content

Commit a8c8ed4

Browse files
authored
feat: prism post datasets API (#1236)
--------- Signed-off-by: Devdutt Shenoi <[email protected]>
1 parent 9dda1bd commit a8c8ed4

File tree

6 files changed

+230
-12
lines changed

6 files changed

+230
-12
lines changed

src/handlers/http/modal/query_server.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,8 @@ impl ParseableServer for QueryServer {
7676
.service(
7777
web::scope(&prism_base_path())
7878
.service(Server::get_prism_home())
79-
.service(Server::get_prism_logstream()),
79+
.service(Server::get_prism_logstream())
80+
.service(Server::get_prism_datasets()),
8081
)
8182
.service(Server::get_generated());
8283
}

src/handlers/http/modal/server.rs

+13-1
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,8 @@ impl ParseableServer for Server {
9595
.service(
9696
web::scope(&prism_base_path())
9797
.service(Server::get_prism_home())
98-
.service(Server::get_prism_logstream()),
98+
.service(Server::get_prism_logstream())
99+
.service(Server::get_prism_datasets()),
99100
)
100101
.service(Self::get_ingest_otel_factory())
101102
.service(Self::get_generated());
@@ -180,6 +181,17 @@ impl Server {
180181
)
181182
}
182183

184+
pub fn get_prism_datasets() -> Scope {
185+
web::scope("/datasets").route(
186+
"",
187+
web::post()
188+
.to(http::prism_logstream::post_datasets)
189+
.authorize_for_stream(Action::GetStreamInfo)
190+
.authorize_for_stream(Action::GetStats)
191+
.authorize_for_stream(Action::GetRetention),
192+
)
193+
}
194+
183195
pub fn get_metrics_webscope() -> Scope {
184196
web::scope("/metrics").service(
185197
web::resource("").route(web::get().to(metrics::get).authorize(Action::Metrics)),

src/handlers/http/prism_logstream.rs

+21-3
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,33 @@
1717
*/
1818

1919
use actix_web::{
20-
web::{self, Path},
21-
Responder,
20+
web::{self, Json, Path},
21+
HttpRequest, Responder,
2222
};
2323

24-
use crate::prism::logstream::{get_prism_logstream_info, PrismLogstreamError};
24+
use crate::{
25+
prism::logstream::{get_prism_logstream_info, PrismDatasetRequest, PrismLogstreamError},
26+
utils::actix::extract_session_key_from_req,
27+
};
2528

2629
/// This API is essentially just combining the responses of /info and /schema together
2730
pub async fn get_info(stream_name: Path<String>) -> Result<impl Responder, PrismLogstreamError> {
2831
let prism_logstream_info = get_prism_logstream_info(&stream_name).await?;
2932

3033
Ok(web::Json(prism_logstream_info))
3134
}
35+
36+
/// A combination of /stats, /retention, /hottier, /info, /counts and /query
37+
pub async fn post_datasets(
38+
dataset_req: Option<Json<PrismDatasetRequest>>,
39+
req: HttpRequest,
40+
) -> Result<impl Responder, PrismLogstreamError> {
41+
let session_key = extract_session_key_from_req(&req)?;
42+
let dataset = dataset_req
43+
.map(|Json(r)| r)
44+
.unwrap_or_default()
45+
.get_datasets(session_key)
46+
.await?;
47+
48+
Ok(web::Json(dataset))
49+
}

src/prism/logstream/mod.rs

+188-2
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@ use actix_web::http::header::ContentType;
2222
use arrow_schema::Schema;
2323
use chrono::Utc;
2424
use http::StatusCode;
25-
use serde::Serialize;
25+
use serde::{Deserialize, Serialize};
26+
use serde_json::{json, Value};
27+
use tracing::{debug, warn};
2628

2729
use crate::{
2830
handlers::http::{
@@ -31,11 +33,18 @@ use crate::{
3133
utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats},
3234
},
3335
logstream::error::StreamError,
34-
query::update_schema_when_distributed,
36+
query::{into_query, update_schema_when_distributed, Query, QueryError},
3537
},
38+
hottier::{HotTierError, HotTierManager, StreamHotTier},
3639
parseable::{StreamNotFound, PARSEABLE},
40+
query::{error::ExecuteError, execute, CountsRequest, CountsResponse, QUERY_SESSION},
41+
rbac::{map::SessionKey, role::Action, Users},
3742
stats,
3843
storage::{retention::Retention, StreamInfo, StreamType},
44+
utils::{
45+
arrow::record_batches_to_json,
46+
time::{TimeParseError, TimeRange},
47+
},
3948
LOCK_EXPECT,
4049
};
4150

@@ -185,6 +194,168 @@ async fn get_stream_info_helper(stream_name: &str) -> Result<StreamInfo, StreamE
185194
Ok(stream_info)
186195
}
187196

197+
/// Response structure for Prism dataset queries.
198+
/// Contains information about a stream, its statistics, retention policy,
199+
/// and query results.
200+
#[derive(Serialize)]
201+
pub struct PrismDatasetResponse {
202+
/// Name of the stream
203+
stream: String,
204+
/// Basic information about the stream
205+
info: StreamInfo,
206+
/// Statistics for the queried timeframe
207+
stats: QueriedStats,
208+
/// Retention policy details
209+
retention: Retention,
210+
/// Hot tier information if available
211+
hottier: Option<StreamHotTier>,
212+
/// Count of records in the specified time range
213+
counts: CountsResponse,
214+
/// Collection of distinct values for source identifiers
215+
distinct_sources: Value,
216+
}
217+
218+
/// Request parameters for retrieving Prism dataset information.
219+
/// Defines which streams to query
220+
#[derive(Deserialize, Default)]
221+
#[serde(rename_all = "camelCase")]
222+
pub struct PrismDatasetRequest {
223+
/// List of stream names to query
224+
#[serde(default)]
225+
streams: Vec<String>,
226+
}
227+
228+
impl PrismDatasetRequest {
229+
/// Retrieves dataset information for all specified streams.
230+
///
231+
/// Processes each stream in the request and compiles their information.
232+
/// Streams that don't exist or can't be accessed are skipped.
233+
///
234+
/// # Returns
235+
/// - `Ok(Vec<PrismDatasetResponse>)`: List of responses for successfully processed streams
236+
/// - `Err(PrismLogstreamError)`: If a critical error occurs during processing
237+
///
238+
/// # Note
239+
/// 1. This method won't fail if individual streams fail - it will only include
240+
/// successfully processed streams in the result.
241+
/// 2. On receiving an empty stream list, we return for all streams the user is able to query for
242+
pub async fn get_datasets(
243+
mut self,
244+
key: SessionKey,
245+
) -> Result<Vec<PrismDatasetResponse>, PrismLogstreamError> {
246+
let is_empty = self.streams.is_empty();
247+
if is_empty {
248+
self.streams = PARSEABLE.streams.list();
249+
}
250+
251+
let mut responses = vec![];
252+
for stream in self.streams.iter() {
253+
if Users.authorize(key.clone(), Action::ListStream, Some(stream), None)
254+
!= crate::rbac::Response::Authorized
255+
{
256+
// Don't warn if listed from Parseable
257+
if !is_empty {
258+
warn!("Unauthorized access requested for stream: {stream}");
259+
}
260+
continue;
261+
}
262+
263+
if PARSEABLE.check_or_load_stream(stream).await {
264+
debug!("Stream not found: {stream}");
265+
continue;
266+
}
267+
268+
let PrismLogstreamInfo {
269+
info,
270+
stats,
271+
retention,
272+
..
273+
} = get_prism_logstream_info(stream).await?;
274+
275+
let hottier = match HotTierManager::global() {
276+
Some(hot_tier_manager) => {
277+
let stats = hot_tier_manager.get_hot_tier(stream).await?;
278+
Some(stats)
279+
}
280+
_ => None,
281+
};
282+
let records = CountsRequest {
283+
stream: stream.clone(),
284+
start_time: "1h".to_owned(),
285+
end_time: "now".to_owned(),
286+
num_bins: 10,
287+
}
288+
.get_bin_density()
289+
.await?;
290+
let counts = CountsResponse {
291+
fields: vec!["start_time".into(), "end_time".into(), "count".into()],
292+
records,
293+
};
294+
295+
// Retrieve distinct values for source identifiers
296+
// Returns None if fields aren't present or if query fails
297+
let ips = self.get_distinct_entries(stream, "p_src_ip").await.ok();
298+
let user_agents = self.get_distinct_entries(stream, "p_user_agent").await.ok();
299+
300+
responses.push(PrismDatasetResponse {
301+
stream: stream.clone(),
302+
info,
303+
stats,
304+
retention,
305+
hottier,
306+
counts,
307+
distinct_sources: json!({
308+
"ips": ips,
309+
"user_agents": user_agents
310+
}),
311+
})
312+
}
313+
314+
Ok(responses)
315+
}
316+
317+
/// Retrieves distinct values for a specific field in a stream.
318+
///
319+
/// # Parameters
320+
/// - `stream_name`: Name of the stream to query
321+
/// - `field`: Field name to get distinct values for
322+
///
323+
/// # Returns
324+
/// - `Ok(Vec<String>)`: List of distinct values found for the field
325+
/// - `Err(QueryError)`: If the query fails or field doesn't exist
326+
async fn get_distinct_entries(
327+
&self,
328+
stream_name: &str,
329+
field: &str,
330+
) -> Result<Vec<String>, QueryError> {
331+
let query = Query {
332+
query: format!("SELECT DISTINCT({field}) FOR {stream_name}"),
333+
start_time: "1h".to_owned(),
334+
end_time: "now".to_owned(),
335+
send_null: false,
336+
filter_tags: None,
337+
fields: true,
338+
};
339+
let time_range = TimeRange::parse_human_time("1h", "now")?;
340+
341+
let session_state = QUERY_SESSION.state();
342+
let query = into_query(&query, &session_state, time_range).await?;
343+
let (records, _) = execute(query, stream_name).await?;
344+
let response = record_batches_to_json(&records)?;
345+
// Extract field values from the JSON response
346+
let values = response
347+
.iter()
348+
.flat_map(|row| {
349+
row.get(field)
350+
.and_then(|s| s.as_str())
351+
.map(|s| s.to_string())
352+
})
353+
.collect();
354+
355+
Ok(values)
356+
}
357+
}
358+
188359
#[derive(Debug, thiserror::Error)]
189360
pub enum PrismLogstreamError {
190361
#[error("Error: {0}")]
@@ -193,6 +364,16 @@ pub enum PrismLogstreamError {
193364
StreamError(#[from] StreamError),
194365
#[error("StreamNotFound: {0}")]
195366
StreamNotFound(#[from] StreamNotFound),
367+
#[error("Hottier: {0}")]
368+
Hottier(#[from] HotTierError),
369+
#[error("Query: {0}")]
370+
Query(#[from] QueryError),
371+
#[error("TimeParse: {0}")]
372+
TimeParse(#[from] TimeParseError),
373+
#[error("Execute: {0}")]
374+
Execute(#[from] ExecuteError),
375+
#[error("Auth: {0}")]
376+
Auth(#[from] actix_web::Error),
196377
}
197378

198379
impl actix_web::ResponseError for PrismLogstreamError {
@@ -201,6 +382,11 @@ impl actix_web::ResponseError for PrismLogstreamError {
201382
PrismLogstreamError::Anyhow(_) => StatusCode::INTERNAL_SERVER_ERROR,
202383
PrismLogstreamError::StreamError(e) => e.status_code(),
203384
PrismLogstreamError::StreamNotFound(_) => StatusCode::NOT_FOUND,
385+
PrismLogstreamError::Hottier(_) => StatusCode::INTERNAL_SERVER_ERROR,
386+
PrismLogstreamError::Query(_) => StatusCode::INTERNAL_SERVER_ERROR,
387+
PrismLogstreamError::TimeParse(_) => StatusCode::NOT_FOUND,
388+
PrismLogstreamError::Execute(_) => StatusCode::INTERNAL_SERVER_ERROR,
389+
PrismLogstreamError::Auth(_) => StatusCode::UNAUTHORIZED,
204390
}
205391
}
206392

src/response.rs

+1-2
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,7 @@ pub struct QueryResponse {
3333
impl QueryResponse {
3434
pub fn to_http(&self) -> Result<HttpResponse, QueryError> {
3535
info!("{}", "Returning query results");
36-
let records: Vec<&RecordBatch> = self.records.iter().collect();
37-
let mut json_records = record_batches_to_json(&records)?;
36+
let mut json_records = record_batches_to_json(&self.records)?;
3837

3938
if self.fill_null {
4039
for map in &mut json_records {

src/utils/arrow/mod.rs

+5-3
Original file line numberDiff line numberDiff line change
@@ -90,10 +90,12 @@ pub fn replace_columns(
9090
/// * Result<Vec<Map<String, Value>>>
9191
///
9292
/// A vector of JSON objects representing the record batches.
93-
pub fn record_batches_to_json(records: &[&RecordBatch]) -> Result<Vec<Map<String, Value>>> {
93+
pub fn record_batches_to_json(records: &[RecordBatch]) -> Result<Vec<Map<String, Value>>> {
9494
let buf = vec![];
9595
let mut writer = arrow_json::ArrayWriter::new(buf);
96-
writer.write_batches(records)?;
96+
for record in records {
97+
writer.write(record)?;
98+
}
9799
writer.finish()?;
98100

99101
let buf = writer.into_inner();
@@ -188,7 +190,7 @@ mod tests {
188190
#[test]
189191
fn check_empty_json_to_record_batches() {
190192
let r = RecordBatch::new_empty(Arc::new(Schema::empty()));
191-
let rb = vec![&r];
193+
let rb = vec![r];
192194
let batches = record_batches_to_json(&rb).unwrap();
193195
assert_eq!(batches, vec![]);
194196
}

0 commit comments

Comments
 (0)