Skip to content

Commit 8381e72

Browse files
authored
refactor: utility Minute handles slotting by minute (#1203)
1 parent 59ba6a8 commit 8381e72

File tree

3 files changed

+101
-26
lines changed

3 files changed

+101
-26
lines changed

src/parseable/streams.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ use crate::{
5252
metrics,
5353
option::Mode,
5454
storage::{object_storage::to_bytes, retention::Retention, StreamType},
55-
utils::minute_to_slot,
55+
utils::time::Minute,
5656
LOCK_EXPECT, OBJECT_STORE_DATA_GRANULARITY,
5757
};
5858

@@ -166,7 +166,7 @@ impl Stream {
166166
"{stream_hash}.date={}.hour={:02}.minute={}.{}{hostname}.data.{ARROW_FILE_EXTENSION}",
167167
parsed_timestamp.date(),
168168
parsed_timestamp.hour(),
169-
minute_to_slot(parsed_timestamp.minute(), OBJECT_STORE_DATA_GRANULARITY).unwrap(),
169+
Minute::from(parsed_timestamp).to_slot(OBJECT_STORE_DATA_GRANULARITY),
170170
custom_partition_values
171171
.iter()
172172
.sorted_by_key(|v| v.0)
@@ -890,7 +890,7 @@ mod tests {
890890
"{stream_hash}.date={}.hour={:02}.minute={}.{}.data.{ARROW_FILE_EXTENSION}",
891891
parsed_timestamp.date(),
892892
parsed_timestamp.hour(),
893-
minute_to_slot(parsed_timestamp.minute(), OBJECT_STORE_DATA_GRANULARITY).unwrap(),
893+
Minute::from(parsed_timestamp).to_slot(OBJECT_STORE_DATA_GRANULARITY),
894894
hostname::get().unwrap().into_string().unwrap()
895895
));
896896

@@ -924,7 +924,7 @@ mod tests {
924924
"{stream_hash}.date={}.hour={:02}.minute={}.key1=value1.key2=value2.{}.data.{ARROW_FILE_EXTENSION}",
925925
parsed_timestamp.date(),
926926
parsed_timestamp.hour(),
927-
minute_to_slot(parsed_timestamp.minute(), OBJECT_STORE_DATA_GRANULARITY).unwrap(),
927+
Minute::from(parsed_timestamp).to_slot(OBJECT_STORE_DATA_GRANULARITY),
928928
hostname::get().unwrap().into_string().unwrap()
929929
));
930930

src/utils/mod.rs

-17
Original file line numberDiff line numberDiff line change
@@ -35,23 +35,6 @@ use regex::Regex;
3535
use sha2::{Digest, Sha256};
3636
use tracing::debug;
3737

38-
/// Convert minutes to a slot range
39-
/// e.g. given minute = 15 and OBJECT_STORE_DATA_GRANULARITY = 10 returns "10-19"
40-
pub fn minute_to_slot(minute: u32, data_granularity: u32) -> Option<String> {
41-
if minute >= 60 {
42-
return None;
43-
}
44-
45-
let block_n = minute / data_granularity;
46-
let block_start = block_n * data_granularity;
47-
if data_granularity == 1 {
48-
return Some(format!("{block_start:02}"));
49-
}
50-
51-
let block_end = (block_n + 1) * data_granularity - 1;
52-
Some(format!("{block_start:02}-{block_end:02}"))
53-
}
54-
5538
pub fn get_ingestor_id() -> String {
5639
let now = Utc::now().to_rfc3339();
5740
let id = get_hash(&now).to_string().split_at(15).0.to_string();

src/utils/time.rs

+97-5
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,7 @@
1616
*
1717
*/
1818

19-
use chrono::{DateTime, NaiveDate, TimeDelta, Timelike, Utc};
20-
21-
use super::minute_to_slot;
19+
use chrono::{DateTime, NaiveDate, NaiveDateTime, TimeDelta, Timelike, Utc};
2220

2321
#[derive(Debug, thiserror::Error)]
2422
pub enum TimeParseError {
@@ -231,8 +229,8 @@ impl TimeRange {
231229
prefixes: &mut Vec<String>,
232230
) {
233231
let mut push_prefix = |block: u32| {
234-
if let Some(minute_slot) = minute_to_slot(block * data_granularity, data_granularity) {
235-
let prefix = format!("{hour_prefix}minute={minute_slot}/");
232+
if let Ok(minute) = Minute::try_from(block * data_granularity) {
233+
let prefix = format!("{hour_prefix}minute={}/", minute.to_slot(data_granularity));
236234
prefixes.push(prefix);
237235
}
238236
};
@@ -266,6 +264,61 @@ impl TimeRange {
266264
}
267265
}
268266

267+
/// Represents a minute value (0-59) and provides methods for converting it to a slot range.
268+
///
269+
/// # Examples
270+
///
271+
/// ```
272+
/// use crate::utils::time::Minute;
273+
///
274+
/// let minute = Minute::try_from(15).unwrap();
275+
/// assert_eq!(minute.to_slot(10), "10-19");
276+
/// ```
277+
#[derive(Debug, Clone, Copy)]
278+
pub struct Minute {
279+
block: u32,
280+
}
281+
282+
impl TryFrom<u32> for Minute {
283+
type Error = u32;
284+
285+
/// Returns a Minute if block is an acceptable minute value, else returns it as is
286+
fn try_from(block: u32) -> Result<Self, Self::Error> {
287+
if block >= 60 {
288+
return Err(block);
289+
}
290+
291+
Ok(Self { block })
292+
}
293+
}
294+
295+
impl From<NaiveDateTime> for Minute {
296+
fn from(timestamp: NaiveDateTime) -> Self {
297+
Self {
298+
block: timestamp.minute(),
299+
}
300+
}
301+
}
302+
303+
impl Minute {
304+
/// Convert minutes to a slot range
305+
/// e.g. given minute = 15 and OBJECT_STORE_DATA_GRANULARITY = 10 returns "10-19"
306+
///
307+
/// ### PANICS
308+
/// If the provided `data_granularity` value isn't cleanly divisble from 60
309+
pub fn to_slot(self, data_granularity: u32) -> String {
310+
assert!(60 % data_granularity == 0);
311+
let block_n = self.block / data_granularity;
312+
let block_start = block_n * data_granularity;
313+
if data_granularity == 1 {
314+
return format!("{block_start:02}");
315+
}
316+
317+
let block_end = (block_n + 1) * data_granularity - 1;
318+
format!("{block_start:02}-{block_end:02}")
319+
}
320+
}
321+
269322
#[cfg(test)]
270323
mod tests {
271324
use super::*;
@@ -421,4 +474,43 @@ mod tests {
421474
let left = prefixes.iter().map(String::as_str).collect::<Vec<&str>>();
422475
assert_eq!(left.as_slice(), right);
423476
}
477+
478+
#[test]
479+
fn valid_minute_to_minute_slot() {
480+
let res = Minute::try_from(10);
481+
assert!(res.is_ok());
482+
assert_eq!(res.unwrap().to_slot(1), "10");
483+
}
484+
485+
#[test]
486+
fn invalid_minute() {
487+
assert!(Minute::try_from(100).is_err());
488+
}
489+
490+
#[test]
491+
fn minute_from_timestamp() {
492+
let timestamp =
493+
NaiveDateTime::parse_from_str("2025-01-01 02:03", "%Y-%m-%d %H:%M").unwrap();
494+
assert_eq!(Minute::from(timestamp).to_slot(1), "03");
495+
}
496+
497+
#[test]
498+
fn slot_5_min_from_timestamp() {
499+
let timestamp =
500+
NaiveDateTime::parse_from_str("2025-01-01 02:03", "%Y-%m-%d %H:%M").unwrap();
501+
assert_eq!(Minute::from(timestamp).to_slot(5), "00-04");
502+
}
503+
504+
#[test]
505+
fn slot_30_min_from_timestamp() {
506+
let timestamp =
507+
NaiveDateTime::parse_from_str("2025-01-01 02:33", "%Y-%m-%d %H:%M").unwrap();
508+
assert_eq!(Minute::from(timestamp).to_slot(30), "30-59");
509+
}
510+
511+
#[test]
512+
#[should_panic]
513+
fn illegal_slot_granularity() {
514+
Minute::try_from(0).unwrap().to_slot(40);
515+
}
424516
}

0 commit comments

Comments
 (0)