Commit db4a68d

update log source in stream info (#1231)

Updated to store multiple log source formats per stream, each with a list of known fields. Added:
- known-fields lists for otel-logs, otel-traces, and otel-metrics
- logic to add a log source to a stream
- logic to add fields to an existing log source
- logic to merge log sources when fetching from storage (in distributed mode)

Co-authored-by: Devdutt Shenoi <[email protected]>
1 parent 7f32288 commit db4a68d

File tree

14 files changed (+546 -120 lines)


src/connectors/kafka/processor.rs (+7 -2)

@@ -28,7 +28,7 @@ use tracing::{debug, error};
 use crate::{
     connectors::common::processor::Processor,
     event::{
-        format::{json, EventFormat, LogSource},
+        format::{json, EventFormat, LogSourceEntry},
         Event as ParseableEvent,
     },
     parseable::PARSEABLE,
@@ -49,9 +49,14 @@ impl ParseableSinkProcessor {
             .first()
             .map(|r| r.topic.as_str())
             .unwrap_or_default();
+        let log_source_entry = LogSourceEntry::default();

         PARSEABLE
-            .create_stream_if_not_exists(stream_name, StreamType::UserDefined, LogSource::Json)
+            .create_stream_if_not_exists(
+                stream_name,
+                StreamType::UserDefined,
+                vec![log_source_entry],
+            )
             .await?;

         let stream = PARSEABLE.get_stream(stream_name)?;
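The Kafka sink used to tag every auto-created stream as LogSource::Json; it now passes a single default LogSourceEntry instead. The sketch below is a minimal, self-contained illustration of what that default amounts to; the two types are trimmed mirrors of the crate's own, and treating Json as the #[default] variant is an assumption suggested by the replaced LogSource::Json argument.

use std::collections::HashSet;

// Trimmed mirrors of the crate types, for illustration only.
#[derive(Default, Debug, Clone, PartialEq, Eq, Hash)]
enum LogSource {
    #[allow(dead_code)]
    Kinesis,
    #[default]
    Json, // assumed default variant; the replaced LogSource::Json argument suggests it
}

#[derive(Default, Debug)]
struct LogSourceEntry {
    log_source_format: LogSource,
    fields: HashSet<String>,
}

fn main() {
    // What the Kafka sink now registers for a newly created topic stream:
    // the Json format with an empty known-fields set.
    let entry = LogSourceEntry::default();
    assert_eq!(entry.log_source_format, LogSource::Json);
    assert!(entry.fields.is_empty());
}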

src/event/format/mod.rs (+18 -1)

@@ -44,7 +44,7 @@ static TIME_FIELD_NAME_PARTS: [&str; 2] = ["time", "date"];
 type EventSchema = Vec<Arc<Field>>;

 /// Source of the logs, used to perform special processing for certain sources
-#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Hash)]
 pub enum LogSource {
     // AWS Kinesis sends logs in the format of a json array
     Kinesis,
@@ -92,6 +92,23 @@ impl Display for LogSource {
     }
 }

+/// Contains the format name and a list of known field names that are associated with the said format.
+/// Stored on disk as part of `ObjectStoreFormat` in stream.json
+#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+pub struct LogSourceEntry {
+    pub log_source_format: LogSource,
+    pub fields: HashSet<String>,
+}
+
+impl LogSourceEntry {
+    pub fn new(log_source_format: LogSource, fields: HashSet<String>) -> Self {
+        LogSourceEntry {
+            log_source_format,
+            fields,
+        }
+    }
+}
+
 // Global Trait for event format
 // This trait is implemented by all the event formats
 pub trait EventFormat: Sized {
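The new Hash derive on LogSource is what lets the format act as a map key when entries from several nodes are combined. The actual merge code is not part of this excerpt, so the following is only a hypothetical sketch of the "merge log sources when fetching from storage" step described in the commit message, written as if it sat in this module next to LogSourceEntry: same-format entries collapse into one, and their known-field sets are unioned.

use std::collections::{HashMap, HashSet};

// Hypothetical merge routine; the real implementation is not shown in this diff.
// Entries sharing a format are collapsed and their field sets unioned.
fn merge_log_sources(per_node: Vec<Vec<LogSourceEntry>>) -> Vec<LogSourceEntry> {
    let mut merged: HashMap<LogSource, HashSet<String>> = HashMap::new();
    for entry in per_node.into_iter().flatten() {
        merged
            .entry(entry.log_source_format) // needs the Hash derive added above
            .or_default()
            .extend(entry.fields);
    }
    merged
        .into_iter()
        .map(|(format, fields)| LogSourceEntry::new(format, fields))
        .collect()
}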

src/handlers/http/ingest.rs (+48 -8)

@@ -16,7 +16,7 @@
  *
  */

-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};

 use actix_web::web::{Json, Path};
 use actix_web::{http::header::ContentType, HttpRequest, HttpResponse};
@@ -28,10 +28,13 @@ use serde_json::Value;

 use crate::event;
 use crate::event::error::EventError;
-use crate::event::format::{self, EventFormat, LogSource};
+use crate::event::format::{self, EventFormat, LogSource, LogSourceEntry};
 use crate::handlers::{LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY};
 use crate::metadata::SchemaVersion;
 use crate::option::Mode;
+use crate::otel::logs::OTEL_LOG_KNOWN_FIELD_LIST;
+use crate::otel::metrics::OTEL_METRICS_KNOWN_FIELD_LIST;
+use crate::otel::traces::OTEL_TRACES_KNOWN_FIELD_LIST;
 use crate::parseable::{StreamNotFound, PARSEABLE};
 use crate::storage::{ObjectStorageError, StreamType};
 use crate::utils::header_parsing::ParseHeaderError;
@@ -55,9 +58,6 @@ pub async fn ingest(req: HttpRequest, Json(json): Json<Value>) -> Result<HttpRes
     if internal_stream_names.contains(&stream_name) {
         return Err(PostError::InternalStream(stream_name));
     }
-    PARSEABLE
-        .create_stream_if_not_exists(&stream_name, StreamType::UserDefined, LogSource::default())
-        .await?;

     let log_source = req
         .headers()
@@ -72,6 +72,15 @@ pub async fn ingest(req: HttpRequest, Json(json): Json<Value>) -> Result<HttpRes
         return Err(PostError::OtelNotSupported);
     }

+    let log_source_entry = LogSourceEntry::new(log_source.clone(), HashSet::new());
+    PARSEABLE
+        .create_stream_if_not_exists(
+            &stream_name,
+            StreamType::UserDefined,
+            vec![log_source_entry],
+        )
+        .await?;
+
     flatten_and_push_logs(json, &stream_name, &log_source).await?;

     Ok(HttpResponse::Ok().finish())
@@ -119,8 +128,20 @@ pub async fn handle_otel_logs_ingestion(
     }

     let stream_name = stream_name.to_str().unwrap().to_owned();
+
+    let log_source_entry = LogSourceEntry::new(
+        log_source.clone(),
+        OTEL_LOG_KNOWN_FIELD_LIST
+            .iter()
+            .map(|&s| s.to_string())
+            .collect(),
+    );
     PARSEABLE
-        .create_stream_if_not_exists(&stream_name, StreamType::UserDefined, LogSource::OtelLogs)
+        .create_stream_if_not_exists(
+            &stream_name,
+            StreamType::UserDefined,
+            vec![log_source_entry],
+        )
         .await?;

     flatten_and_push_logs(json, &stream_name, &log_source).await?;
@@ -146,11 +167,18 @@
         return Err(PostError::IncorrectLogSource(LogSource::OtelMetrics));
     }
     let stream_name = stream_name.to_str().unwrap().to_owned();
+    let log_source_entry = LogSourceEntry::new(
+        log_source.clone(),
+        OTEL_METRICS_KNOWN_FIELD_LIST
+            .iter()
+            .map(|&s| s.to_string())
+            .collect(),
+    );
     PARSEABLE
         .create_stream_if_not_exists(
             &stream_name,
             StreamType::UserDefined,
-            LogSource::OtelMetrics,
+            vec![log_source_entry],
         )
         .await?;

@@ -178,8 +206,20 @@ pub async fn handle_otel_traces_ingestion(
         return Err(PostError::IncorrectLogSource(LogSource::OtelTraces));
     }
     let stream_name = stream_name.to_str().unwrap().to_owned();
+    let log_source_entry = LogSourceEntry::new(
+        log_source.clone(),
+        OTEL_TRACES_KNOWN_FIELD_LIST
+            .iter()
+            .map(|&s| s.to_string())
+            .collect(),
+    );
+
     PARSEABLE
-        .create_stream_if_not_exists(&stream_name, StreamType::UserDefined, LogSource::OtelTraces)
+        .create_stream_if_not_exists(
+            &stream_name,
+            StreamType::UserDefined,
+            vec![log_source_entry],
+        )
         .await?;

     flatten_and_push_logs(json, &stream_name, &log_source).await?;
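Two things changed in ingest() itself: stream creation moved below the OtelNotSupported check, so a rejected OTEL post no longer creates a stream as a side effect, and each OTEL handler now seeds its stream with the known-field list for its signal. All three handlers use the same conversion from a static &str array to the HashSet<String> that LogSourceEntry::new takes; here is a standalone sketch of that pattern, with placeholder field names rather than the real contents of OTEL_LOG_KNOWN_FIELD_LIST.

use std::collections::HashSet;

// Placeholder stand-in; the real constants live in src/otel/{logs,metrics,traces}.rs.
const KNOWN_FIELD_LIST: [&str; 3] = ["field_a", "field_b", "field_c"];

fn main() {
    // Same conversion the OTEL handlers perform: &'static str array -> HashSet<String>.
    let fields: HashSet<String> = KNOWN_FIELD_LIST
        .iter()
        .map(|&s| s.to_string())
        .collect();
    assert_eq!(fields.len(), 3);
    assert!(fields.contains("field_b"));
}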

src/handlers/http/logstream.rs (+8)

@@ -309,6 +309,14 @@ pub async fn get_stream_info(stream_name: Path<String>) -> Result<impl Responder
         None
     };

+    let stream_log_source = storage
+        .get_log_source_from_storage(&stream_name)
+        .await
+        .unwrap_or_default();
+    PARSEABLE
+        .update_log_source(&stream_name, stream_log_source)
+        .await?;
+
     let hash_map = PARSEABLE.streams.read().unwrap();
     let stream_meta = hash_map
         .get(&stream_name)
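get_stream_info now refreshes the in-memory log source list from storage before building the response, which is how a query node in a distributed deployment picks up entries written by other nodes. Note the unwrap_or_default(): a failed storage read degrades to an empty list rather than failing the whole /info request. A standalone illustration of that fallback:

fn main() {
    // unwrap_or_default() swallows the error and yields Vec::new(),
    // mirroring how a failed stream.json read is handled above.
    let ok: Result<Vec<String>, &str> = Ok(vec!["otel-logs".to_string()]);
    let err: Result<Vec<String>, &str> = Err("stream.json unreadable");
    assert_eq!(ok.unwrap_or_default().len(), 1);
    assert!(err.unwrap_or_default().is_empty());
}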

src/metadata.rs (+3 -3)

@@ -24,7 +24,7 @@ use std::num::NonZeroU32;
 use std::sync::Arc;

 use crate::catalog::snapshot::ManifestItem;
-use crate::event::format::LogSource;
+use crate::event::format::LogSourceEntry;
 use crate::metrics::{
     EVENTS_INGESTED, EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE, EVENTS_INGESTED_SIZE_DATE,
     EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_INGESTED, LIFETIME_EVENTS_INGESTED_SIZE,
@@ -87,7 +87,7 @@ pub struct LogStreamMetadata {
     pub static_schema_flag: bool,
     pub hot_tier_enabled: bool,
     pub stream_type: StreamType,
-    pub log_source: LogSource,
+    pub log_source: Vec<LogSourceEntry>,
 }

 impl LogStreamMetadata {
@@ -101,7 +101,7 @@
         static_schema: HashMap<String, Arc<Field>>,
         stream_type: StreamType,
         schema_version: SchemaVersion,
-        log_source: LogSource,
+        log_source: Vec<LogSourceEntry>,
     ) -> Self {
         LogStreamMetadata {
             created_at: if created_at.is_empty() {
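With log_source now a Vec<LogSourceEntry>, one stream's in-memory metadata can record several source formats side by side. A short sketch of building such a list with the constructor added in src/event/format/mod.rs; the variants come from this diff, while the field name is purely illustrative.

use std::collections::HashSet;

// Assumes LogSource and LogSourceEntry from src/event/format/mod.rs are in scope.
fn example_log_sources() -> Vec<LogSourceEntry> {
    // A stream that has received both OTEL logs and plain JSON events.
    let otel_fields: HashSet<String> = ["illustrative_field"]
        .iter()
        .map(|&s| s.to_string())
        .collect();
    vec![
        LogSourceEntry::new(LogSource::OtelLogs, otel_fields),
        LogSourceEntry::new(LogSource::Json, HashSet::new()),
    ]
}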
