7
7
8
8
use std:: thread:: JoinHandle ;
9
9
use std:: sync:: mpsc:: { Receiver , Sender } ;
10
+ use std:: time:: { Duration , SystemTime } ;
10
11
use log:: { extend_and_hash, hash, Entry , Event , Sha256Hash } ;
11
12
12
13
pub struct Historian {
@@ -20,29 +21,48 @@ pub enum ExitReason {
20
21
RecvDisconnected ,
21
22
SendDisconnected ,
22
23
}
24
+ fn log_event (
25
+ sender : & Sender < Entry > ,
26
+ num_hashes : & mut u64 ,
27
+ end_hash : & mut Sha256Hash ,
28
+ event : Event ,
29
+ ) -> Result < ( ) , ( Entry , ExitReason ) > {
30
+ if let Event :: UserDataKey ( key) = event {
31
+ * end_hash = extend_and_hash ( end_hash, & key) ;
32
+ }
33
+ let entry = Entry {
34
+ end_hash : * end_hash,
35
+ num_hashes : * num_hashes,
36
+ event,
37
+ } ;
38
+ if let Err ( _) = sender. send ( entry. clone ( ) ) {
39
+ return Err ( ( entry, ExitReason :: SendDisconnected ) ) ;
40
+ }
41
+ * num_hashes = 0 ;
42
+ Ok ( ( ) )
43
+ }
23
44
24
45
fn log_events (
25
46
receiver : & Receiver < Event > ,
26
47
sender : & Sender < Entry > ,
27
48
num_hashes : & mut u64 ,
28
49
end_hash : & mut Sha256Hash ,
50
+ epoch : SystemTime ,
51
+ num_ticks : & mut u64 ,
52
+ ms_per_tick : Option < u64 > ,
29
53
) -> Result < ( ) , ( Entry , ExitReason ) > {
30
54
use std:: sync:: mpsc:: TryRecvError ;
31
55
loop {
56
+ if let Some ( ms) = ms_per_tick {
57
+ let now = SystemTime :: now ( ) ;
58
+ if now > epoch + Duration :: from_millis ( ( * num_ticks + 1 ) * ms) {
59
+ log_event ( sender, num_hashes, end_hash, Event :: Tick ) ?;
60
+ * num_ticks += 1 ;
61
+ }
62
+ }
32
63
match receiver. try_recv ( ) {
33
64
Ok ( event) => {
34
- if let Event :: UserDataKey ( key) = event {
35
- * end_hash = extend_and_hash ( end_hash, & key) ;
36
- }
37
- let entry = Entry {
38
- end_hash : * end_hash,
39
- num_hashes : * num_hashes,
40
- event,
41
- } ;
42
- if let Err ( _) = sender. send ( entry. clone ( ) ) {
43
- return Err ( ( entry, ExitReason :: SendDisconnected ) ) ;
44
- }
45
- * num_hashes = 0 ;
65
+ log_event ( sender, num_hashes, end_hash, event) ?;
46
66
}
47
67
Err ( TryRecvError :: Empty ) => {
48
68
return Ok ( ( ) ) ;
@@ -63,15 +83,26 @@ fn log_events(
63
83
/// sending back Entry messages until either the receiver or sender channel is closed.
64
84
pub fn create_logger (
65
85
start_hash : Sha256Hash ,
86
+ ms_per_tick : Option < u64 > ,
66
87
receiver : Receiver < Event > ,
67
88
sender : Sender < Entry > ,
68
89
) -> JoinHandle < ( Entry , ExitReason ) > {
69
90
use std:: thread;
70
91
thread:: spawn ( move || {
71
92
let mut end_hash = start_hash;
72
93
let mut num_hashes = 0 ;
94
+ let mut num_ticks = 0 ;
95
+ let epoch = SystemTime :: now ( ) ;
73
96
loop {
74
- if let Err ( err) = log_events ( & receiver, & sender, & mut num_hashes, & mut end_hash) {
97
+ if let Err ( err) = log_events (
98
+ & receiver,
99
+ & sender,
100
+ & mut num_hashes,
101
+ & mut end_hash,
102
+ epoch,
103
+ & mut num_ticks,
104
+ ms_per_tick,
105
+ ) {
75
106
return err;
76
107
}
77
108
end_hash = hash ( & end_hash) ;
@@ -81,11 +112,11 @@ pub fn create_logger(
81
112
}
82
113
83
114
impl Historian {
84
- pub fn new ( start_hash : & Sha256Hash ) -> Self {
115
+ pub fn new ( start_hash : & Sha256Hash , ms_per_tick : Option < u64 > ) -> Self {
85
116
use std:: sync:: mpsc:: channel;
86
117
let ( sender, event_receiver) = channel ( ) ;
87
118
let ( entry_sender, receiver) = channel ( ) ;
88
- let thread_hdl = create_logger ( * start_hash, event_receiver, entry_sender) ;
119
+ let thread_hdl = create_logger ( * start_hash, ms_per_tick , event_receiver, entry_sender) ;
89
120
Historian {
90
121
sender,
91
122
receiver,
@@ -98,14 +129,13 @@ impl Historian {
98
129
mod tests {
99
130
use super :: * ;
100
131
use log:: * ;
132
+ use std:: thread:: sleep;
133
+ use std:: time:: Duration ;
101
134
102
135
#[ test]
103
136
fn test_historian ( ) {
104
- use std:: thread:: sleep;
105
- use std:: time:: Duration ;
106
-
107
137
let zero = Sha256Hash :: default ( ) ;
108
- let hist = Historian :: new ( & zero) ;
138
+ let hist = Historian :: new ( & zero, None ) ;
109
139
110
140
hist. sender . send ( Event :: Tick ) . unwrap ( ) ;
111
141
sleep ( Duration :: new ( 0 , 1_000_000 ) ) ;
@@ -129,12 +159,30 @@ mod tests {
129
159
#[ test]
130
160
fn test_historian_closed_sender ( ) {
131
161
let zero = Sha256Hash :: default ( ) ;
132
- let hist = Historian :: new ( & zero) ;
162
+ let hist = Historian :: new ( & zero, None ) ;
133
163
drop ( hist. receiver ) ;
134
164
hist. sender . send ( Event :: Tick ) . unwrap ( ) ;
135
165
assert_eq ! (
136
166
hist. thread_hdl. join( ) . unwrap( ) . 1 ,
137
167
ExitReason :: SendDisconnected
138
168
) ;
139
169
}
170
+
171
+ #[ test]
172
+ fn test_ticking_historian ( ) {
173
+ let zero = Sha256Hash :: default ( ) ;
174
+ let hist = Historian :: new ( & zero, Some ( 20 ) ) ;
175
+ sleep ( Duration :: from_millis ( 30 ) ) ;
176
+ hist. sender . send ( Event :: UserDataKey ( zero) ) . unwrap ( ) ;
177
+ sleep ( Duration :: from_millis ( 15 ) ) ;
178
+ drop ( hist. sender ) ;
179
+ assert_eq ! (
180
+ hist. thread_hdl. join( ) . unwrap( ) . 1 ,
181
+ ExitReason :: RecvDisconnected
182
+ ) ;
183
+
184
+ let entries: Vec < Entry > = hist. receiver . iter ( ) . collect ( ) ;
185
+ assert ! ( entries. len( ) > 1 ) ;
186
+ assert ! ( verify_slice( & entries, & zero) ) ;
187
+ }
140
188
}
0 commit comments