@@ -22,7 +22,25 @@ use tokio_tungstenite::{
2222
2323type Tx = UnboundedSender < Message > ;
2424pub type ConnectionMap = Arc < Mutex < HashMap < SocketAddr , Tx > > > ;
25- type MessageQueue = Arc < Mutex < VecDeque < DisplayMessage > > > ;
25+ type EventQueues = Arc < Mutex < Queues > > ;
26+
27+ static EVENT_QUEUE_ACTIVE : std:: sync:: atomic:: AtomicBool = std:: sync:: atomic:: AtomicBool :: new ( true ) ;
28+ static TTS_QUEUE_ACTIVE : std:: sync:: atomic:: AtomicBool = std:: sync:: atomic:: AtomicBool :: new ( false ) ;
29+ static IS_DISPLAYING : std:: sync:: atomic:: AtomicBool = std:: sync:: atomic:: AtomicBool :: new ( false ) ;
30+
31+ pub struct Queues {
32+ pub events : VecDeque < DisplayMessage > ,
33+ pub tts : VecDeque < DisplayMessage > ,
34+ }
35+
36+ impl Queues {
37+ pub fn new ( ) -> Queues {
38+ Queues {
39+ events : VecDeque :: new ( ) ,
40+ tts : VecDeque :: new ( ) ,
41+ }
42+ }
43+ }
2644
2745pub struct FrontendApi {
2846 ws_address : String ,
@@ -49,15 +67,32 @@ impl FrontendApi {
4967 println ! ( "Listening on: {}" , self . ws_address) ;
5068
5169 let connection_state = self . connection_state . clone ( ) ;
52- let message_queue_arc: MessageQueue = Arc :: new ( Mutex :: new ( VecDeque :: new ( ) ) ) ;
70+ let message_queue_arc: EventQueues = Arc :: new ( Mutex :: new ( Queues :: new ( ) ) ) ;
71+
72+ //TODO: Need to fetch un presented messages from database
5373
74+ let queue = message_queue_arc. clone ( ) ;
75+ let state = connection_state. clone ( ) ;
76+ // Listen for incoming events and store them in the queues
5477 tokio:: spawn ( async move {
5578 loop {
5679 let msg = ( & mut receiver) . recv ( ) . await ;
57- handle_message ( connection_state . clone ( ) , message_queue_arc . clone ( ) , msg) . await ;
80+ handle_message ( state . clone ( ) , queue . clone ( ) , msg) ;
5881 }
5982 } ) ;
6083
84+ // Process the Queues on a new thread
85+
86+ //tokio::spawn(async move {
87+ // loop {
88+ // let mut queues = message_queue_arc.lock().unwrap();
89+ // if !queues.events.is_empty() {
90+ // let message = queues.events.pop_front();
91+ // handle_message(connection_state.clone(), message_queue_arc.clone(), message);
92+ // }
93+ // }
94+ //});
95+
6196 let https_address = self . http_address . clone ( ) ;
6297 tokio:: spawn ( async move {
6398 let listener = TcpListener :: bind ( & https_address)
@@ -66,6 +101,7 @@ impl FrontendApi {
66101 // build our application
67102 let app = Router :: new ( )
68103 . route ( "/" , get ( index) )
104+ . route ( "/admin" , get ( admin) )
69105 //TODO: understand where to put our assets
70106 // Remember that these need served by nginx in production
71107 . nest_service ( "/assets" , ServeDir :: new ( "assets" ) ) ;
@@ -96,13 +132,21 @@ impl FrontendApi {
96132#[ template( path = "index.html" ) ]
97133struct IndexTemplate { }
98134
135+ #[ derive( askama:: Template ) ]
136+ #[ template( path = "admin.html" ) ]
137+ struct AdminTemplate { }
138+
99139async fn index ( ) -> IndexTemplate {
100140 IndexTemplate { }
101141}
102142
103- async fn handle_message (
143+ async fn admin ( ) -> AdminTemplate {
144+ AdminTemplate { }
145+ }
146+
147+ fn handle_message (
104148 connection_state : ConnectionMap ,
105- message_queue : MessageQueue ,
149+ event_queues : EventQueues ,
106150 message : Option < DisplayMessage > ,
107151) {
108152 match message {
@@ -113,14 +157,17 @@ async fn handle_message(
113157
114158 //Enqueue message
115159 {
116- let mut message_queue = message_queue. lock ( ) . unwrap ( ) ;
117- message_queue. push_back ( message. clone ( ) ) ;
160+ let mut queues = event_queues. lock ( ) . unwrap ( ) ;
161+ //TODO: need to handle different types of messages
162+
163+ queues. events . push_back ( message. clone ( ) ) ;
118164 }
119165
120166 //Make html message to send to frontend
121167 //<div id="alerts" hx-swap-oob="true">
168+ let trigger = format ! ( "delay:{}ms" , message. display_time) ;
122169 let html_message = html ! {
123- div id="alerts " hx-swap-oob= "true" {
170+ div id="notifications " hx-swap= "afterend" hx-target= "notifications" ws-send= "done" hx-trigger= ( trigger ) {
124171 h1 { ( message. message) }
125172 img src=( message. image_url) { }
126173 }
@@ -158,6 +205,7 @@ async fn handle_connection(
158205 state. lock ( ) . unwrap ( ) . insert ( peer, tx) ;
159206 }
160207 let ( mut ws_sender, mut ws_receiver) = ws_stream. split ( ) ;
208+ println ! ( "Connection state: {:?}" , state. lock( ) . unwrap( ) . keys( ) ) ;
161209 loop {
162210 tokio:: select! {
163211 msg = ws_receiver. next( ) => {
0 commit comments