11use axum:: { routing:: get, Router } ;
22use futures_channel:: mpsc:: { unbounded, UnboundedSender } ;
3+ use futures_util:: stream:: SelectNextSome ;
34use futures_util:: { SinkExt , StreamExt } ;
45use maud:: { html, Markup } ;
56use messages:: DisplayMessage ;
@@ -26,7 +27,6 @@ type EventQueues = Arc<Mutex<Queues>>;
2627
2728static EVENT_QUEUE_ACTIVE : std:: sync:: atomic:: AtomicBool = std:: sync:: atomic:: AtomicBool :: new ( true ) ;
2829static TTS_QUEUE_ACTIVE : std:: sync:: atomic:: AtomicBool = std:: sync:: atomic:: AtomicBool :: new ( false ) ;
29- static IS_DISPLAYING : std:: sync:: atomic:: AtomicBool = std:: sync:: atomic:: AtomicBool :: new ( false ) ;
3030
3131pub struct Queues {
3232 pub events : VecDeque < DisplayMessage > ,
@@ -77,21 +77,67 @@ impl FrontendApi {
7777 tokio:: spawn ( async move {
7878 loop {
7979 let msg = ( & mut receiver) . recv ( ) . await ;
80- handle_message ( state. clone ( ) , queue. clone ( ) , msg) ;
80+ handle_message ( state. clone ( ) , queue. clone ( ) , msg) . await ;
8181 }
8282 } ) ;
8383
8484 // Process the Queues on a new thread
8585
86- //tokio::spawn(async move {
87- // loop {
88- // let mut queues = message_queue_arc.lock().unwrap();
89- // if !queues.events.is_empty() {
90- // let message = queues.events.pop_front();
91- // handle_message(connection_state.clone(), message_queue_arc.clone(), message);
92- // }
93- // }
94- //});
86+ let queue_connection_state = connection_state. clone ( ) ;
87+ tokio:: spawn ( async move {
88+ loop {
89+ let message = {
90+ let mut queues = message_queue_arc. lock ( ) . unwrap ( ) ;
91+ queues. events . pop_front ( )
92+ } ;
93+
94+ let Some ( message) = message else {
95+ tokio:: time:: sleep ( tokio:: time:: Duration :: from_millis ( 500 ) ) . await ;
96+ continue ;
97+ } ;
98+ //Make html message to send to frontend
99+ //<div id="alerts" hx-swap-oob="true">
100+ let html_message = html ! {
101+ div id="notifications" hx-swap="afterend" hx-target="notifications" {
102+ h1 { ( message. message) }
103+ img src=( message. image_url) { }
104+ }
105+ } ;
106+ {
107+ let mut websocket_state = queue_connection_state. lock ( ) . unwrap ( ) ;
108+ for ( & addr, tx) in websocket_state. iter_mut ( ) {
109+ if tx
110+ . unbounded_send ( Message :: Text ( html_message. clone ( ) . into ( ) ) )
111+ . is_err ( )
112+ {
113+ println ! ( "closing websocket message to: {} ==========" , addr) ;
114+ }
115+ }
116+ }
117+
118+ //Pause for a bit to allow the message to be displayed
119+ tokio:: time:: sleep ( tokio:: time:: Duration :: from_millis ( 5000 ) ) . await ;
120+
121+ let html_message = html ! {
122+ div id="notifications" hx-swap="delete" hx-target="notifications" {
123+ }
124+ } ;
125+ {
126+ let mut websocket_state = queue_connection_state. lock ( ) . unwrap ( ) ;
127+ for ( & addr, tx) in websocket_state. iter_mut ( ) {
128+ if tx
129+ . unbounded_send ( Message :: Text ( html_message. clone ( ) . into ( ) ) )
130+ . is_err ( )
131+ {
132+ println ! ( "closing websocket message to: {} ==========" , addr) ;
133+ }
134+ }
135+ }
136+
137+ //Pause a bit before running queue again
138+ tokio:: time:: sleep ( tokio:: time:: Duration :: from_millis ( 500 ) ) . await ;
139+ }
140+ } ) ;
95141
96142 let https_address = self . http_address . clone ( ) ;
97143 tokio:: spawn ( async move {
@@ -144,7 +190,7 @@ async fn admin() -> AdminTemplate {
144190 AdminTemplate { }
145191}
146192
147- fn handle_message (
193+ async fn handle_message (
148194 connection_state : ConnectionMap ,
149195 event_queues : EventQueues ,
150196 message : Option < DisplayMessage > ,
@@ -162,22 +208,6 @@ fn handle_message(
162208
163209 queues. events . push_back ( message. clone ( ) ) ;
164210 }
165-
166- //Make html message to send to frontend
167- //<div id="alerts" hx-swap-oob="true">
168- let trigger = format ! ( "delay:{}ms" , message. display_time) ;
169- let html_message = html ! {
170- div id="notifications" hx-swap="afterend" hx-target="notifications" ws-send="done" hx-trigger=( trigger) {
171- h1 { ( message. message) }
172- img src=( message. image_url) { }
173- }
174- } ;
175- if tx
176- . unbounded_send ( Message :: Text ( html_message. clone ( ) . into ( ) ) )
177- . is_err ( )
178- {
179- println ! ( "closing websocket message to: {} ==========" , addr) ;
180- }
181211 }
182212 }
183213 None => panic ! ( "Error receiving message" ) ,
0 commit comments