diff options
-rw-r--r-- | Cargo.lock | 48 | ||||
-rw-r--r-- | Cargo.toml | 7 | ||||
-rw-r--r-- | src/main.rs | 28 | ||||
-rw-r--r-- | src/ui.rs | 189 | ||||
-rw-r--r-- | templates/chat.html (renamed from templates/incoming.html) | 2 | ||||
-rw-r--r-- | templates/head.html | 6 | ||||
-rw-r--r-- | templates/settings_applied.html | 13 |
7 files changed, 246 insertions, 47 deletions
@@ -212,6 +212,7 @@ checksum = "1236b4b292f6c4d6dc34604bb5120d85c3fe1d1aa596bd5cc52ca054d13e7b9e" dependencies = [ "async-trait", "axum-core 0.4.3", + "base64", "bytes", "futures-util", "http 1.0.0", @@ -230,8 +231,10 @@ dependencies = [ "serde_json", "serde_path_to_error", "serde_urlencoded", + "sha1", "sync_wrapper", "tokio", + "tokio-tungstenite", "tower", "tower-layer", "tower-service", @@ -405,9 +408,11 @@ dependencies = [ "rf4463", "rppal", "serde", + "serde_json", "simple_logger", "sqlx", "tokio", + "tokio-tungstenite", "toml", "tonic", "tower-http", @@ -531,6 +536,12 @@ dependencies = [ ] [[package]] +name = "data-encoding" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e962a19be5cfc3f3bf6dd8f61eb50107f356ad6270fbb3ed41476571db78be5" + +[[package]] name = "der" version = "0.7.8" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2419,6 +2430,18 @@ dependencies = [ ] [[package]] +name = "tokio-tungstenite" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c83b561d025642014097b66e6c1bb422783339e0909e4429cde4749d1990bc38" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite", +] + +[[package]] name = "tokio-util" version = "0.7.10" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2611,6 +2634,25 @@ dependencies = [ ] [[package]] +name = "tungstenite" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ef1a641ea34f399a848dea702823bbecfb4c486f911735368f1f137cb8257e1" +dependencies = [ + "byteorder", + "bytes", + "data-encoding", + "http 1.0.0", + "httparse", + "log", + "rand", + "sha1", + "thiserror", + "url", + "utf-8", +] + +[[package]] name = "typenum" version = "1.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2682,6 +2724,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da" [[package]] +name = "utf-8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" + +[[package]] name = "vcpkg" version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -9,14 +9,17 @@ license = "MIT" anyhow = "1.0" askama = { version = "0.12", features = ["with-axum"] } askama_axum = "0.4" -axum = "0.7" +axum = { version = "0.7", features = ["ws"] } +#axum-extra = "0.7" chrono = "0.4" simple_logger = "4.3" log = "0.4" serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" toml = "0.8" sqlx = { version = "0.7", features = [ "runtime-tokio-rustls", "sqlite"]} tokio = { version = "1", features = ["full"] } +tokio-tungstenite = "0.21" tower-http = { version = "0.5.0", features = ["fs"] } futures-core = "0.3" @@ -31,8 +34,6 @@ tonic = { version = "0.10", features = ["tls", "tls-roots"] } async-stream = "0.3" rand = "0.8" -# Websockets example in https://github.com/tokio-rs/axum/tree/main/examples/websockets -# tokio-tungstenite = "0.21" [[bin]] name = "fake-radio" diff --git a/src/main.rs b/src/main.rs index 6b79581..49ef58e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,7 +2,7 @@ use anyhow::{anyhow, Context}; use log::{debug, info, warn, error}; use std::sync::{Arc, Mutex}; use tokio::net::UdpSocket; -use tokio::sync::mpsc; +use tokio::sync::{mpsc, broadcast}; use radio::{RadioManager, MAX_PACKET_LEN}; mod db; @@ -10,10 +10,17 @@ mod radio; mod config; mod ui; +#[derive(Clone, serde::Serialize)] +struct WSChatMessage { + from: String, + message: String, +} + struct AppState { conf : config::Config, db : db::Database, transmit_queue : mpsc::Sender<Vec<u8>>, + ws_broadcast : broadcast::Sender<WSChatMessage>, start_time : chrono::DateTime<chrono::Utc>, } @@ -61,6 +68,7 @@ async fn main() -> std::io::Result<()> { conf : conf.clone(), db : db::Database::new().await, transmit_queue : packet_send.clone(), + ws_broadcast : broadcast::Sender::new(2), start_time : chrono::Utc::now(), })); @@ -115,11 +123,27 @@ async fn main() -> std::io::Result<()> { let mut buf = [0; MAX_PACKET_LEN]; match ham_cats::packet::Packet::fully_decode(&packet_data, &mut buf) { Ok(packet) => { + let (mut db, ws_broadcast) = { + let g = shared_state_receive.lock().unwrap(); + (g.db.clone(), g.ws_broadcast.clone()) + }; + if let Some(ident) = packet.identification() { debug!(" From {}-{}", ident.callsign, ident.ssid); + + let mut commentbuf = [0u8, 255]; + if let Ok(comment) = packet.comment(&mut commentbuf) { + let m = WSChatMessage { + from: format!("{}-{}", ident.callsign, ident.ssid), + message: comment.to_owned() + }; + match ws_broadcast.send(m) { + Ok(num) => debug!("Send WS message to {num}"), + Err(_) => debug!("No WS receivers currently"), + } + } } - let mut db = shared_state_receive.lock().unwrap().db.clone(); if let Err(e) = db.store_packet(&packet_data).await { warn!("Failed to write to sqlite: {}", e); } @@ -1,17 +1,22 @@ -use anyhow::{anyhow, Context}; +use std::ops::ControlFlow; +use std::net::SocketAddr; use std::str::FromStr; -use axum::Json; -use log::{info, warn}; -use serde::Deserialize; +use anyhow::{anyhow, Context}; use askama::Template; use axum::{ - extract::State, - routing::{get, post}, - Router, - response::Html, Form, + Json, + Router, + extract::State, + extract::{ws::{Message, WebSocket, WebSocketUpgrade}, ConnectInfo}, http::StatusCode, + response::Html, + response::IntoResponse, + routing::{get, post}, }; +use futures::{StreamExt, SinkExt}; +use log::{debug, info, warn}; +use serde::Deserialize; use tower_http::services::ServeDir; use ham_cats::{ @@ -25,7 +30,8 @@ use crate::SharedState; pub async fn serve(port: u16, shared_state: SharedState) { let app = Router::new() .route("/", get(dashboard)) - .route("/incoming", get(incoming)) + .route("/chat", get(chat)) + .route("/chat/ws", get(ws_handler)) .route("/send", get(send)) .route("/api/send_packet", post(post_packet)) .route("/settings", get(show_settings).post(post_settings)) @@ -40,9 +46,10 @@ pub async fn serve(port: u16, shared_state: SharedState) { #[derive(PartialEq)] enum ActivePage { Dashboard, - Incoming, + Chat, Send, Settings, + None, } #[derive(Template)] @@ -123,21 +130,110 @@ async fn dashboard(State(state): State<SharedState>) -> DashboardTemplate<'stati } #[derive(Template)] -#[template(path = "incoming.html")] -struct IncomingTemplate<'a> { +#[template(path = "chat.html")] +struct ChatTemplate<'a> { title: &'a str, page: ActivePage, conf: config::Config, } -async fn incoming(State(state): State<SharedState>) -> IncomingTemplate<'static> { - IncomingTemplate { - title: "Incoming", +async fn chat(State(state): State<SharedState>) -> ChatTemplate<'static> { + ChatTemplate { + title: "Chat", conf: state.lock().unwrap().conf.clone(), - page: ActivePage::Incoming, + page: ActivePage::Chat, } } +async fn ws_handler( + State(state): State<SharedState>, + ws: WebSocketUpgrade, + ConnectInfo(addr): ConnectInfo<SocketAddr>) -> impl IntoResponse { + info!("User at {addr} connected."); + let rx = state.lock().unwrap().ws_broadcast.subscribe(); + ws.on_upgrade(move |socket| handle_socket(socket, rx, addr)) +} + +async fn handle_socket( + mut socket: WebSocket, + mut rx: tokio::sync::broadcast::Receiver<crate::WSChatMessage>, + who: SocketAddr) { + if socket.send(Message::Ping(vec![1, 2, 3])).await.is_ok() { + info!("Pinged {who}..."); + } else { + info!("Could not ping {who}!"); + return; + } + let (mut sender, mut receiver) = socket.split(); + + let mut send_task = tokio::spawn(async move { + while let Ok(m) = rx.recv().await { + if let Ok(m_json) = serde_json::to_string(&m) { + if sender + .send(Message::Text(m_json)) + .await + .is_err() + { + return; + } + } + } + }); + + let mut recv_task = tokio::spawn(async move { + while let Some(Ok(msg)) = receiver.next().await { + if process_message(msg, who).is_break() { + break; + } + } + }); + + // If any one of the tasks exit, abort the other. + tokio::select! { + _rv_a = (&mut send_task) => { + recv_task.abort(); + }, + _rv_b = (&mut recv_task) => { + send_task.abort(); + } + } + + info!("Websocket context {who} destroyed"); +} + +fn process_message(msg: Message, who: SocketAddr) -> ControlFlow<(), ()> { + match msg { + Message::Text(t) => { + debug!(">>> {who} sent str: {t:?}"); + } + Message::Binary(d) => { + debug!(">>> {} sent {} bytes: {:?}", who, d.len(), d); + } + Message::Close(c) => { + if let Some(cf) = c { + debug!( + ">>> {} sent close with code {} and reason `{}`", + who, cf.code, cf.reason + ); + } else { + debug!(">>> {who} somehow sent close message without CloseFrame"); + } + return ControlFlow::Break(()); + } + + Message::Pong(v) => { + debug!(">>> {who} sent pong with {v:?}"); + } + // You should never need to manually handle Message::Ping, as axum's websocket library + // will do so for you automagically by replying with Pong and copying the v according to + // spec. But if you need the contents of the pings you can see them here. + Message::Ping(v) => { + debug!(">>> {who} sent ping with {v:?}"); + } + } + ControlFlow::Continue(()) +} + #[derive(Template)] #[template(path = "send.html")] struct SendTemplate<'a> { @@ -305,40 +401,57 @@ impl TryFrom<FormConfig> for config::Config { } } -async fn post_settings(State(state): State<SharedState>, Form(input): Form<FormConfig>) -> (StatusCode, Html<String>) { +#[derive(Template)] +#[template(path = "settings_applied.html")] +struct SettingsAppliedTemplate<'a> { + title: &'a str, + page: ActivePage, + conf: config::Config, + ok: bool, + error_message: &'a str, + error_reason: String, +} + +async fn post_settings( + State(state): State<SharedState>, + Form(input): Form<FormConfig>) -> (StatusCode, SettingsAppliedTemplate<'static>) { + match config::Config::try_from(input) { Ok(c) => { match c.store() { Ok(()) => { state.lock().unwrap().conf.clone_from(&c); - (StatusCode::OK, Html( - r#"<!doctype html> - <html><head></head><body> - <p>Configuration updated. If you enabled or disabled tunnel, please restart the cats-radio-node process.</p> - <p>To <a href="/">dashboard</a></p> - </body></html>"#.to_owned())) + (StatusCode::OK, SettingsAppliedTemplate { + title: "Settings", + conf: c, + page: ActivePage::None, + ok: true, + error_message: "", + error_reason: "".to_owned(), + }) } Err(e) => { - (StatusCode::INTERNAL_SERVER_ERROR, Html( - format!(r#"<!doctype html> - <html><head></head> - <body><p>Internal Server Error: Could not write config</p> - <p>{}</p> - </body> - </html>"#, e))) + (StatusCode::INTERNAL_SERVER_ERROR, SettingsAppliedTemplate { + title: "Settings", + conf : c, + page: ActivePage::None, + ok: false, + error_message: "Failed to store config", + error_reason: e.to_string(), + }) }, } - }, Err(e) => { - (StatusCode::BAD_REQUEST, Html( - format!(r#"<!doctype html> - <html><head></head> - <body><p>Error interpreting POST data</p> - <p>{}</p> - </body> - </html>"#, e))) + (StatusCode::BAD_REQUEST, SettingsAppliedTemplate { + title: "Settings", + conf: state.lock().unwrap().conf.clone(), + page: ActivePage::None, + ok: false, + error_message: "Error interpreting POST data", + error_reason: e.to_string(), + }) }, } } diff --git a/templates/incoming.html b/templates/chat.html index 2a43623..584bde5 100644 --- a/templates/incoming.html +++ b/templates/chat.html @@ -1,6 +1,6 @@ {% include "head.html" %} <div class="content"> - Incoming! + <h1>Chat</h1> </div> {% include "foot.html" %} {# vi:set et sw=2 ts=2: #} diff --git a/templates/head.html b/templates/head.html index 99a4bca..3c611ed 100644 --- a/templates/head.html +++ b/templates/head.html @@ -24,9 +24,9 @@ <i class="w-8 fa fa-home" aria-hidden="true"></i><span>Dashboard</span> </li> </a> - <a href="/incoming" class=""> - <li class="rounded-md mt-1 p-3 {% if page == ActivePage::Incoming %} bg-sky-200 text-sky-900 {% endif %} hover:bg-sky-300"> - <i class="w-8 fa fa-inbox" aria-hidden="true"></i><span>Incoming</span> + <a href="/chat" class=""> + <li class="rounded-md mt-1 p-3 {% if page == ActivePage::Chat %} bg-sky-200 text-sky-900 {% endif %} hover:bg-sky-300"> + <i class="w-8 fa fa-comments" aria-hidden="true"></i><span>Chat</span> </li> </a> <a href="/send" class=""> diff --git a/templates/settings_applied.html b/templates/settings_applied.html new file mode 100644 index 0000000..f536772 --- /dev/null +++ b/templates/settings_applied.html @@ -0,0 +1,13 @@ +{% include "head.html" %} +<div class="content"> + {% if ok %} + <h1>Configuration updated</h1> + <p>If you enabled or disabled tunnel, please restart the cats-radio-node process.</p> + {% else %} + <h1>Configuration update failed</h1> + <p>{{ error_message }}:</p> + <p>{{ error_reason }}:</p> + {% endif %} +</div> +{% include "foot.html" %} +{# vi:set et sw=2 ts=2: #} |