Skip to content

Commit 2e5e4fd

Browse files
aw-transform: Add union_events_split
1 parent 7d55fca commit 2e5e4fd

File tree

2 files changed

+181
-0
lines changed

2 files changed

+181
-0
lines changed

aw-transform/src/lib.rs

+3
Original file line numberDiff line numberDiff line change
@@ -44,3 +44,6 @@ pub use filter_period::filter_period_intersect;
4444

4545
mod split_url;
4646
pub use split_url::split_url_event;
47+
48+
mod union;
49+
pub use union::union_events_split;

aw-transform/src/union.rs

+178
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
use std::vec::Vec;
2+
3+
use serde_json::{Map, Value};
4+
5+
use aw_models::Event;
6+
7+
fn merge_value(a: &mut Value, b: &Value) {
8+
match (a, b) {
9+
(&mut Value::Object(ref mut a), &Value::Object(ref b)) => {
10+
for (kb, vb) in b {
11+
merge_value(a.entry(kb.clone()).or_insert(Value::Null), vb);
12+
}
13+
}
14+
(a, b) => {
15+
*a = b.clone();
16+
}
17+
}
18+
}
19+
20+
fn merge_map(map1: &mut Map<String, Value>, map2: &Map<String, Value>) {
21+
for (k1, mut v1) in map1.iter_mut() {
22+
if let Some(v2) = map2.get(k1) {
23+
merge_value(&mut v1, &v2);
24+
println!("{:?}", v1);
25+
}
26+
}
27+
for (k2, v2) in map2.iter() {
28+
if !map1.contains_key(k2) {
29+
map1.insert(k2.to_string(), v2.clone());
30+
}
31+
}
32+
}
33+
34+
/// events1 is the "master" list of events and if an event in events2
35+
/// intersects it the intersecting part will be removed from the original
36+
/// event and split into a new event and merges the data. It also differs from
37+
/// a normal intersection in that the part that does not intersect from the
38+
/// "master" events will still be kept, but if it also intersects that interval
39+
/// will be removed.
40+
///
41+
/// NOTE: It is technically only a union of the event1, not event2.
42+
/// Maybe we should improve that in the future?
43+
///
44+
/// Example:
45+
/// ```ignore
46+
/// |---------|--------------------|
47+
/// | events1 |[a ][b ] |
48+
/// | events2 | [c ] [d ]|
49+
/// | result |[a ][ac][bc][b ] |
50+
/// |---------|--------------------|
51+
/// ```
52+
pub fn union_events_split(events1: Vec<Event>, events2: &Vec<Event>) -> Vec<Event> {
53+
let mut events: Vec<Event> = Vec::new();
54+
55+
'event1: for mut event1 in events1 {
56+
let event1_endtime = event1.calculate_endtime();
57+
'event2: for event2 in events2 {
58+
// Check that events intersect, otherwise skip
59+
if event2.timestamp > event1_endtime {
60+
continue 'event2;
61+
}
62+
let event2_endtime = event2.calculate_endtime();
63+
if event2_endtime < event1.timestamp {
64+
continue 'event2;
65+
}
66+
// Find the events common intersection
67+
let intersect_timestamp = std::cmp::max(event1.timestamp, event2.timestamp);
68+
let intersect_endtime = std::cmp::min(event1_endtime, event2_endtime);
69+
let intersect_duration = intersect_endtime - intersect_timestamp;
70+
71+
// If event1 starts before event2, add that event
72+
if intersect_timestamp > event1.timestamp {
73+
let prepended_event = Event {
74+
id: None,
75+
timestamp: event1.timestamp,
76+
duration: intersect_timestamp - event1.timestamp,
77+
data: event1.data.clone(),
78+
};
79+
events.push(prepended_event);
80+
}
81+
82+
// Add intersecting event
83+
let mut intersect_data = event1.data.clone();
84+
merge_map(&mut intersect_data, &event2.data);
85+
let intersecting_event = Event {
86+
id: None,
87+
timestamp: intersect_timestamp,
88+
duration: intersect_duration,
89+
data: intersect_data,
90+
};
91+
events.push(intersecting_event);
92+
93+
// Update event1 to end at end of common event
94+
event1.timestamp = intersect_endtime;
95+
event1.duration = event1_endtime - intersect_endtime;
96+
if event1.duration.num_milliseconds() <= 0 {
97+
continue 'event1;
98+
}
99+
}
100+
events.push(event1);
101+
}
102+
103+
events
104+
}
105+
106+
#[cfg(test)]
107+
mod tests {
108+
use super::*;
109+
110+
use chrono::DateTime;
111+
use chrono::Duration;
112+
use serde_json::json;
113+
use std::str::FromStr;
114+
115+
#[test]
116+
fn test_merge_data() {
117+
/* test merge same */
118+
let mut d1 = json_map! {"test": json!(1)};
119+
let d2 = d1.clone();
120+
merge_map(&mut d1, &d2);
121+
assert_eq!(d1, d2);
122+
123+
/* test merge different keys */
124+
let mut d1 = json_map! {"test1": json!(1)};
125+
let d2 = json_map! {"test2": json!(2)};
126+
merge_map(&mut d1, &d2);
127+
assert_eq!(d1, json_map! {"test1": json!(1), "test2": json!(2)});
128+
129+
/* test merge intersecting objects */
130+
let mut d1 = json_map! {"test": json_map!{"a": json!(1)}};
131+
let d2 = json_map! {"test": json_map!{"b": json!(2)}};
132+
merge_map(&mut d1, &d2);
133+
assert_eq!(
134+
d1,
135+
json_map! {"test": json_map!{"a": json!(1), "b": json!(2)}}
136+
);
137+
138+
/* test non-object conflict, prefer map1 value */
139+
// TODO: This does not work yet!
140+
// It should be a pretty rare use-case anyway
141+
/*
142+
let mut d1 = json_map!{"test": json!(1)};
143+
let d1_orig = d1.clone();
144+
let d2 = json_map!{"test": json!(2)};
145+
merge_map(&mut d1, &d2);
146+
assert_eq!(d1, d1_orig);
147+
*/
148+
}
149+
150+
#[test]
151+
fn test_union_events_split() {
152+
// Test intersection, before and after
153+
let e1 = Event {
154+
id: None,
155+
timestamp: DateTime::from_str("2000-01-01T00:00:00Z").unwrap(),
156+
duration: Duration::seconds(3),
157+
data: json_map! {"test": json!(1)},
158+
};
159+
let mut e2 = e1.clone();
160+
e2.timestamp = DateTime::from_str("2000-01-01T00:00:01Z").unwrap();
161+
e2.duration = Duration::seconds(1);
162+
163+
let res = union_events_split(vec![e1.clone()], &vec![e2.clone()]);
164+
assert_eq!(res.len(), 3);
165+
assert_eq!(res[0].id, None);
166+
assert_eq!(res[0].timestamp, e1.timestamp);
167+
assert_eq!(res[0].duration, Duration::seconds(1));
168+
assert_eq!(res[0].data, json_map! {"test": json!(1)});
169+
assert_eq!(res[1].id, None);
170+
assert_eq!(res[1].timestamp, e2.timestamp);
171+
assert_eq!(res[1].duration, Duration::seconds(1));
172+
assert_eq!(res[1].data, json_map! {"test": json!(1)});
173+
assert_eq!(res[2].id, None);
174+
assert_eq!(res[2].timestamp, e2.timestamp + e2.duration);
175+
assert_eq!(res[2].duration, Duration::seconds(1));
176+
assert_eq!(res[2].data, json_map! {"test": json!(1)});
177+
}
178+
}

0 commit comments

Comments
 (0)