Skip to content

Commit 31992c5

Browse files
committed
fixed websocket duplication bug
made button swap play/pause
1 parent 69f0ead commit 31992c5

File tree

5 files changed

+187
-76
lines changed

5 files changed

+187
-76
lines changed

frontend_api/src/lib.rs

Lines changed: 44 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,8 @@
11
use axum::{routing::get, Router};
2-
use futures_channel::mpsc::{unbounded, UnboundedSender};
3-
use futures_util::stream::SelectNextSome;
2+
use futures_channel::mpsc::unbounded;
43
use futures_util::{SinkExt, StreamExt};
5-
use maud::{html, Markup};
4+
use maud::html;
65
use messages::DisplayMessage;
7-
use std::collections::VecDeque;
86
use std::net::SocketAddr;
97
use std::{
108
collections::HashMap,
@@ -21,26 +19,11 @@ use tokio_tungstenite::{
2119
tungstenite::{Error, Message, Result},
2220
};
2321

24-
type Tx = UnboundedSender<Message>;
25-
pub type ConnectionMap = Arc<Mutex<HashMap<SocketAddr, Tx>>>;
26-
type EventQueues = Arc<Mutex<Queues>>;
22+
mod routes;
23+
mod types;
24+
use routes::{admin, index};
2725

28-
static EVENT_QUEUE_ACTIVE: std::sync::atomic::AtomicBool = std::sync::atomic::AtomicBool::new(true);
29-
static TTS_QUEUE_ACTIVE: std::sync::atomic::AtomicBool = std::sync::atomic::AtomicBool::new(false);
30-
31-
pub struct Queues {
32-
pub events: VecDeque<DisplayMessage>,
33-
pub tts: VecDeque<DisplayMessage>,
34-
}
35-
36-
impl Queues {
37-
pub fn new() -> Queues {
38-
Queues {
39-
events: VecDeque::new(),
40-
tts: VecDeque::new(),
41-
}
42-
}
43-
}
26+
use crate::types::{ConnectionMap, EventQueues, Queues};
4427

4528
pub struct FrontendApi {
4629
ws_address: String,
@@ -82,19 +65,26 @@ impl FrontendApi {
8265
});
8366

8467
// Process the Queues on a new thread
85-
8668
let queue_connection_state = connection_state.clone();
69+
let event_queue = message_queue_arc.clone();
8770
tokio::spawn(async move {
8871
loop {
72+
let active = types::EVENT_QUEUE_ACTIVE.load(std::sync::atomic::Ordering::SeqCst);
73+
if !active {
74+
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
75+
continue;
76+
}
77+
8978
let message = {
90-
let mut queues = message_queue_arc.lock().unwrap();
91-
queues.events.pop_front()
79+
let mut queues = event_queue.lock().unwrap();
80+
queues.unpublished_events.pop_front()
9281
};
9382

9483
let Some(message) = message else {
9584
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
9685
continue;
9786
};
87+
9888
//Make html message to send to frontend
9989
//<div id="alerts" hx-swap-oob="true">
10090
let html_message = html! {
@@ -103,6 +93,10 @@ impl FrontendApi {
10393
img src=(message.image_url) {}
10494
}
10595
};
96+
97+
let mut bad_websockets = vec![];
98+
99+
//Send message to all connected websockets
106100
{
107101
let mut websocket_state = queue_connection_state.lock().unwrap();
108102
for (&addr, tx) in websocket_state.iter_mut() {
@@ -111,6 +105,7 @@ impl FrontendApi {
111105
.is_err()
112106
{
113107
println!("closing websocket message to: {} ==========", addr);
108+
bad_websockets.push(addr);
114109
}
115110
}
116111
}
@@ -130,6 +125,7 @@ impl FrontendApi {
130125
.is_err()
131126
{
132127
println!("closing websocket message to: {} ==========", addr);
128+
bad_websockets.push(addr);
133129
}
134130
}
135131
}
@@ -140,6 +136,7 @@ impl FrontendApi {
140136
});
141137

142138
let https_address = self.http_address.clone();
139+
let event_queues = message_queue_arc.clone();
143140
tokio::spawn(async move {
144141
let listener = TcpListener::bind(&https_address)
145142
.await
@@ -148,9 +145,16 @@ impl FrontendApi {
148145
let app = Router::new()
149146
.route("/", get(index))
150147
.route("/admin", get(admin))
148+
.route("/events/latest", get(routes::get_latest_unpublished_events))
149+
.route("/tts", get(routes::get_all_events_in_queue))
150+
.route("/events", get(routes::get_all_events_in_queue))
151+
.route("/events/latest/all", get(routes::get_latest_events))
152+
.route("/events/pause", get(routes::pause_events))
153+
.route("/events/start", get(routes::resume_events))
151154
//TODO: understand where to put our assets
152155
// Remember that these need served by nginx in production
153-
.nest_service("/assets", ServeDir::new("assets"));
156+
.nest_service("/assets", ServeDir::new("assets"))
157+
.with_state(event_queues.clone());
154158

155159
// run it
156160
axum::serve(listener, app).await.unwrap();
@@ -174,40 +178,22 @@ impl FrontendApi {
174178
}
175179
}
176180

177-
#[derive(askama::Template)]
178-
#[template(path = "index.html")]
179-
struct IndexTemplate {}
180-
181-
#[derive(askama::Template)]
182-
#[template(path = "admin.html")]
183-
struct AdminTemplate {}
184-
185-
async fn index() -> IndexTemplate {
186-
IndexTemplate {}
187-
}
188-
189-
async fn admin() -> AdminTemplate {
190-
AdminTemplate {}
191-
}
192-
193181
async fn handle_message(
194182
connection_state: ConnectionMap,
195183
event_queues: EventQueues,
196184
message: Option<DisplayMessage>,
197185
) {
198186
match message {
199187
Some(message) => {
200-
let mut state2 = connection_state.lock().unwrap();
201-
for (&addr, tx) in state2.iter_mut() {
202-
println!("Sending message to: {}", addr);
188+
let mut queues = event_queues.lock().unwrap();
203189

204-
//Enqueue message
205-
{
206-
let mut queues = event_queues.lock().unwrap();
207-
//TODO: need to handle different types of messages
190+
//TODO: Store different types of messages in different queues
191+
queues.unpublished_events.push_back(message.clone());
208192

209-
queues.events.push_back(message.clone());
210-
}
193+
//add to latest events, remove oldest if over 10
194+
queues.latest_events.push_back(message.clone());
195+
if queues.latest_events.len() > 10 {
196+
queues.latest_events.pop_front();
211197
}
212198
}
213199
None => panic!("Error receiving message"),
@@ -247,6 +233,7 @@ async fn handle_connection(
247233
println!("Received a message from {}: {}", peer, msg.to_text()?);
248234
ws_sender.send(msg).await?;
249235
} else if msg.is_close() {
236+
println!("Issue with connection: {}", peer);
250237
break;
251238
}
252239
}
@@ -258,7 +245,11 @@ async fn handle_connection(
258245
msg = rx.next() => {
259246
let msg = msg.unwrap();
260247
println!("Sending message to {}: {}", peer, msg.to_text()?);
261-
ws_sender.send(msg).await?;
248+
let res = ws_sender.send(msg).await;
249+
if res.is_err() {
250+
println!("Error sending message to {}", peer);
251+
break;
252+
}
262253
}
263254
}
264255
}

frontend_api/src/main.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,11 @@ async fn main() {
2020

2121
//TODO: write some test something to send a message to the receiver
2222

23+
let mut count = 0;
2324
loop {
24-
println!("Sending message");
25+
count += 1;
2526
let display_message = messages::DisplayMessage {
26-
message: "hello from htmx".to_string(),
27+
message: format!("hello from htmx {}", count),
2728
image_url: "".to_string(),
2829
sound_url: "".to_string(),
2930
display_time: 5000,
@@ -35,6 +36,6 @@ async fn main() {
3536

3637
let _ = tx.send(display_message).unwrap();
3738

38-
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
39+
tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
3940
}
4041
}

frontend_api/src/routes.rs

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
use crate::types::EventQueues;
2+
use axum::{extract::State, http::StatusCode};
3+
use maud::{html, Markup};
4+
5+
#[derive(askama::Template)]
6+
#[template(path = "index.html")]
7+
pub struct IndexTemplate {}
8+
9+
#[derive(askama::Template)]
10+
#[template(path = "admin.html")]
11+
pub struct AdminTemplate {
12+
pub enabled: bool,
13+
}
14+
15+
pub async fn index() -> IndexTemplate {
16+
IndexTemplate {}
17+
}
18+
19+
pub async fn admin() -> AdminTemplate {
20+
AdminTemplate {
21+
enabled: crate::types::EVENT_QUEUE_ACTIVE.load(std::sync::atomic::Ordering::SeqCst),
22+
}
23+
}
24+
25+
pub async fn get_latest_unpublished_events(
26+
State(queues): State<EventQueues>,
27+
) -> Result<Markup, (StatusCode, String)> {
28+
let queues = queues.lock().unwrap();
29+
let range = if queues.unpublished_events.len() < 10 {
30+
0..queues.unpublished_events.len()
31+
} else {
32+
0..10
33+
};
34+
let events = queues.unpublished_events.range(range);
35+
36+
Ok(html! {
37+
ul class="waiting" {
38+
@for event in events {
39+
li { (event.message) }
40+
}
41+
}
42+
})
43+
}
44+
45+
pub async fn get_latest_events(
46+
State(queues): State<EventQueues>,
47+
) -> Result<Markup, (StatusCode, String)> {
48+
let queues = queues.lock().unwrap();
49+
let events = queues.latest_events.clone();
50+
Ok(html! {
51+
ul {
52+
@for event in events {
53+
li { (event.message) }
54+
}
55+
}
56+
})
57+
}
58+
59+
pub async fn pause_events() -> Result<Markup, (StatusCode, String)> {
60+
crate::types::EVENT_QUEUE_ACTIVE.store(false, std::sync::atomic::Ordering::SeqCst);
61+
Ok(html! {
62+
button id="event-queue-toggle" hx-get="/events/start" hx-swap="outerHTML" hx-target="#event-queue-toggle" { "Start" }
63+
})
64+
}
65+
66+
pub async fn resume_events() -> Result<Markup, (StatusCode, String)> {
67+
crate::types::EVENT_QUEUE_ACTIVE.store(true, std::sync::atomic::Ordering::SeqCst);
68+
Ok(html! {
69+
button id="event-queue-toggle" hx-get="/events/pause" hx-swap="outerHTML" hx-target="#event-queue-toggle" { "Pause" }
70+
})
71+
}
72+
73+
pub async fn get_all_events_in_queue(
74+
State(queues): State<EventQueues>,
75+
) -> Result<Markup, (StatusCode, String)> {
76+
let queues = queues.lock().unwrap();
77+
let events = queues.unpublished_events.clone();
78+
Ok(html! {
79+
ul {
80+
@for event in events {
81+
li { (event.message) }
82+
}
83+
}
84+
})
85+
}

frontend_api/src/types.rs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
use futures_channel::mpsc::UnboundedSender;
2+
use messages::DisplayMessage;
3+
use std::{
4+
collections::{HashMap, VecDeque},
5+
net::SocketAddr,
6+
sync::{Arc, Mutex},
7+
};
8+
use tokio_tungstenite::tungstenite::Message;
9+
pub type Tx = UnboundedSender<Message>;
10+
pub type ConnectionMap = Arc<Mutex<HashMap<SocketAddr, Tx>>>;
11+
pub type EventQueues = Arc<Mutex<Queues>>;
12+
13+
pub struct Queues {
14+
pub unpublished_events: VecDeque<DisplayMessage>,
15+
pub tts: VecDeque<DisplayMessage>,
16+
pub latest_events: VecDeque<DisplayMessage>,
17+
pub last_sub: Option<DisplayMessage>,
18+
}
19+
20+
pub static EVENT_QUEUE_ACTIVE: std::sync::atomic::AtomicBool =
21+
std::sync::atomic::AtomicBool::new(true);
22+
23+
impl Queues {
24+
pub fn new() -> Queues {
25+
Queues {
26+
unpublished_events: VecDeque::new(),
27+
tts: VecDeque::new(),
28+
latest_events: VecDeque::new(),
29+
last_sub: None,
30+
}
31+
}
32+
}

frontend_api/templates/admin.html

Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -15,32 +15,34 @@
1515
<main class="flex flex-row justify-center w-full">
1616
<!-- Should display 2 queues with buttons at the bottom -->
1717
<div>
18-
<div class="queue">
19-
<h1>Events</h1>
20-
<div class="">
21-
<h1>TODO</h1>
22-
<div id="todo-queue"></div>
18+
<div class="queue" hx-get="/events/latest/all" hx-target="#latest" hx-swap="innerHTML"
19+
hx-trigger="every 1s">
20+
<h1>Last Events</h1>
21+
<li id="latest"></li>
22+
<div class="button-holder">
2323
</div>
24-
<div class="">
25-
<h1>DOING</h1>
26-
<div id="doing-queue"></div>
24+
</div>
25+
<div class="queue" hx-get="/events/latest" hx-target="#waiting" hx-swap="innerHTML"
26+
hx-trigger="every 2s">
27+
<h1>In Queue</h1>
28+
<li id="waiting"></li>
29+
<div class="button-holder">
30+
{%- if enabled %}
31+
<button id="event-queue-toggle" hx-get="/events/pause" hx-swap="outerHTML"
32+
hx-target="#event-queue-toggle">Stop</button>
33+
{% else %}
34+
<button id="event-queue-toggle" hx-get="/events/start" hx-swap="outerHTML"
35+
hx-target="#event-queue-toggle">Start</button>
36+
{% endif %}
2737
</div>
2838
</div>
29-
<div class="queue">
39+
<div class="queue" hx-get="/tts" hx-swap="innerHTML" hx-target="tts" hx-trigger="every 2s">
3040
<h1>TTS</h1>
31-
<div>
32-
<h1>DONE</h1>
33-
<div id=""></div>
34-
</div>
35-
<div class="">
36-
<h1>ARCHIVED</h1>
37-
<div id="archived-queue"></div>
41+
<li id="tts"></li>
42+
<div class="button-holder">
43+
<button id="play-next" hx-swap="none">Play Next</button>
3844
</div>
3945
</div>
40-
<div class="">
41-
<button id="event-queue-start-stop">Stop</button>
42-
<button id="tts">Play Next TTS</button>
43-
</div>
4446
</div>
4547
<!-- Should display notifications and alerts -->
4648
<div hx-ext="ws" ws-connect="ws://localhost:9000/">

0 commit comments

Comments
 (0)