diff options
author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2024-01-23 09:20:21 +0100 |
---|---|---|
committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2024-01-23 09:21:53 +0100 |
commit | 47f59cfe19e7d4cc43b4a19c746504016b0881d8 (patch) | |
tree | c19ad3199769a2daea1861d3e6ae612e40061ca7 /src | |
parent | 8671c1bd2f89dc83c4d495ae0b0e8c814661cc74 (diff) | |
download | cats-radio-node-47f59cfe19e7d4cc43b4a19c746504016b0881d8.tar.gz cats-radio-node-47f59cfe19e7d4cc43b4a19c746504016b0881d8.tar.bz2 cats-radio-node-47f59cfe19e7d4cc43b4a19c746504016b0881d8.zip |
Create settings applied template, replace incoming by chat
Diffstat (limited to 'src')
-rw-r--r-- | src/main.rs | 28 | ||||
-rw-r--r-- | src/ui.rs | 189 |
2 files changed, 177 insertions, 40 deletions
diff --git a/src/main.rs b/src/main.rs index 6b79581..49ef58e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,7 +2,7 @@ use anyhow::{anyhow, Context}; use log::{debug, info, warn, error}; use std::sync::{Arc, Mutex}; use tokio::net::UdpSocket; -use tokio::sync::mpsc; +use tokio::sync::{mpsc, broadcast}; use radio::{RadioManager, MAX_PACKET_LEN}; mod db; @@ -10,10 +10,17 @@ mod radio; mod config; mod ui; +#[derive(Clone, serde::Serialize)] +struct WSChatMessage { + from: String, + message: String, +} + struct AppState { conf : config::Config, db : db::Database, transmit_queue : mpsc::Sender<Vec<u8>>, + ws_broadcast : broadcast::Sender<WSChatMessage>, start_time : chrono::DateTime<chrono::Utc>, } @@ -61,6 +68,7 @@ async fn main() -> std::io::Result<()> { conf : conf.clone(), db : db::Database::new().await, transmit_queue : packet_send.clone(), + ws_broadcast : broadcast::Sender::new(2), start_time : chrono::Utc::now(), })); @@ -115,11 +123,27 @@ async fn main() -> std::io::Result<()> { let mut buf = [0; MAX_PACKET_LEN]; match ham_cats::packet::Packet::fully_decode(&packet_data, &mut buf) { Ok(packet) => { + let (mut db, ws_broadcast) = { + let g = shared_state_receive.lock().unwrap(); + (g.db.clone(), g.ws_broadcast.clone()) + }; + if let Some(ident) = packet.identification() { debug!(" From {}-{}", ident.callsign, ident.ssid); + + let mut commentbuf = [0u8, 255]; + if let Ok(comment) = packet.comment(&mut commentbuf) { + let m = WSChatMessage { + from: format!("{}-{}", ident.callsign, ident.ssid), + message: comment.to_owned() + }; + match ws_broadcast.send(m) { + Ok(num) => debug!("Send WS message to {num}"), + Err(_) => debug!("No WS receivers currently"), + } + } } - let mut db = shared_state_receive.lock().unwrap().db.clone(); if let Err(e) = db.store_packet(&packet_data).await { warn!("Failed to write to sqlite: {}", e); } @@ -1,17 +1,22 @@ -use anyhow::{anyhow, Context}; +use std::ops::ControlFlow; +use std::net::SocketAddr; use std::str::FromStr; -use axum::Json; -use log::{info, warn}; -use serde::Deserialize; +use anyhow::{anyhow, Context}; use askama::Template; use axum::{ - extract::State, - routing::{get, post}, - Router, - response::Html, Form, + Json, + Router, + extract::State, + extract::{ws::{Message, WebSocket, WebSocketUpgrade}, ConnectInfo}, http::StatusCode, + response::Html, + response::IntoResponse, + routing::{get, post}, }; +use futures::{StreamExt, SinkExt}; +use log::{debug, info, warn}; +use serde::Deserialize; use tower_http::services::ServeDir; use ham_cats::{ @@ -25,7 +30,8 @@ use crate::SharedState; pub async fn serve(port: u16, shared_state: SharedState) { let app = Router::new() .route("/", get(dashboard)) - .route("/incoming", get(incoming)) + .route("/chat", get(chat)) + .route("/chat/ws", get(ws_handler)) .route("/send", get(send)) .route("/api/send_packet", post(post_packet)) .route("/settings", get(show_settings).post(post_settings)) @@ -40,9 +46,10 @@ pub async fn serve(port: u16, shared_state: SharedState) { #[derive(PartialEq)] enum ActivePage { Dashboard, - Incoming, + Chat, Send, Settings, + None, } #[derive(Template)] @@ -123,21 +130,110 @@ async fn dashboard(State(state): State<SharedState>) -> DashboardTemplate<'stati } #[derive(Template)] -#[template(path = "incoming.html")] -struct IncomingTemplate<'a> { +#[template(path = "chat.html")] +struct ChatTemplate<'a> { title: &'a str, page: ActivePage, conf: config::Config, } -async fn incoming(State(state): State<SharedState>) -> IncomingTemplate<'static> { - IncomingTemplate { - title: "Incoming", +async fn chat(State(state): State<SharedState>) -> ChatTemplate<'static> { + ChatTemplate { + title: "Chat", conf: state.lock().unwrap().conf.clone(), - page: ActivePage::Incoming, + page: ActivePage::Chat, } } +async fn ws_handler( + State(state): State<SharedState>, + ws: WebSocketUpgrade, + ConnectInfo(addr): ConnectInfo<SocketAddr>) -> impl IntoResponse { + info!("User at {addr} connected."); + let rx = state.lock().unwrap().ws_broadcast.subscribe(); + ws.on_upgrade(move |socket| handle_socket(socket, rx, addr)) +} + +async fn handle_socket( + mut socket: WebSocket, + mut rx: tokio::sync::broadcast::Receiver<crate::WSChatMessage>, + who: SocketAddr) { + if socket.send(Message::Ping(vec![1, 2, 3])).await.is_ok() { + info!("Pinged {who}..."); + } else { + info!("Could not ping {who}!"); + return; + } + let (mut sender, mut receiver) = socket.split(); + + let mut send_task = tokio::spawn(async move { + while let Ok(m) = rx.recv().await { + if let Ok(m_json) = serde_json::to_string(&m) { + if sender + .send(Message::Text(m_json)) + .await + .is_err() + { + return; + } + } + } + }); + + let mut recv_task = tokio::spawn(async move { + while let Some(Ok(msg)) = receiver.next().await { + if process_message(msg, who).is_break() { + break; + } + } + }); + + // If any one of the tasks exit, abort the other. + tokio::select! { + _rv_a = (&mut send_task) => { + recv_task.abort(); + }, + _rv_b = (&mut recv_task) => { + send_task.abort(); + } + } + + info!("Websocket context {who} destroyed"); +} + +fn process_message(msg: Message, who: SocketAddr) -> ControlFlow<(), ()> { + match msg { + Message::Text(t) => { + debug!(">>> {who} sent str: {t:?}"); + } + Message::Binary(d) => { + debug!(">>> {} sent {} bytes: {:?}", who, d.len(), d); + } + Message::Close(c) => { + if let Some(cf) = c { + debug!( + ">>> {} sent close with code {} and reason `{}`", + who, cf.code, cf.reason + ); + } else { + debug!(">>> {who} somehow sent close message without CloseFrame"); + } + return ControlFlow::Break(()); + } + + Message::Pong(v) => { + debug!(">>> {who} sent pong with {v:?}"); + } + // You should never need to manually handle Message::Ping, as axum's websocket library + // will do so for you automagically by replying with Pong and copying the v according to + // spec. But if you need the contents of the pings you can see them here. + Message::Ping(v) => { + debug!(">>> {who} sent ping with {v:?}"); + } + } + ControlFlow::Continue(()) +} + #[derive(Template)] #[template(path = "send.html")] struct SendTemplate<'a> { @@ -305,40 +401,57 @@ impl TryFrom<FormConfig> for config::Config { } } -async fn post_settings(State(state): State<SharedState>, Form(input): Form<FormConfig>) -> (StatusCode, Html<String>) { +#[derive(Template)] +#[template(path = "settings_applied.html")] +struct SettingsAppliedTemplate<'a> { + title: &'a str, + page: ActivePage, + conf: config::Config, + ok: bool, + error_message: &'a str, + error_reason: String, +} + +async fn post_settings( + State(state): State<SharedState>, + Form(input): Form<FormConfig>) -> (StatusCode, SettingsAppliedTemplate<'static>) { + match config::Config::try_from(input) { Ok(c) => { match c.store() { Ok(()) => { state.lock().unwrap().conf.clone_from(&c); - (StatusCode::OK, Html( - r#"<!doctype html> - <html><head></head><body> - <p>Configuration updated. If you enabled or disabled tunnel, please restart the cats-radio-node process.</p> - <p>To <a href="/">dashboard</a></p> - </body></html>"#.to_owned())) + (StatusCode::OK, SettingsAppliedTemplate { + title: "Settings", + conf: c, + page: ActivePage::None, + ok: true, + error_message: "", + error_reason: "".to_owned(), + }) } Err(e) => { - (StatusCode::INTERNAL_SERVER_ERROR, Html( - format!(r#"<!doctype html> - <html><head></head> - <body><p>Internal Server Error: Could not write config</p> - <p>{}</p> - </body> - </html>"#, e))) + (StatusCode::INTERNAL_SERVER_ERROR, SettingsAppliedTemplate { + title: "Settings", + conf : c, + page: ActivePage::None, + ok: false, + error_message: "Failed to store config", + error_reason: e.to_string(), + }) }, } - }, Err(e) => { - (StatusCode::BAD_REQUEST, Html( - format!(r#"<!doctype html> - <html><head></head> - <body><p>Error interpreting POST data</p> - <p>{}</p> - </body> - </html>"#, e))) + (StatusCode::BAD_REQUEST, SettingsAppliedTemplate { + title: "Settings", + conf: state.lock().unwrap().conf.clone(), + page: ActivePage::None, + ok: false, + error_message: "Error interpreting POST data", + error_reason: e.to_string(), + }) }, } } |