From 47f59cfe19e7d4cc43b4a19c746504016b0881d8 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Tue, 23 Jan 2024 09:20:21 +0100 Subject: Create settings applied template, replace incoming by chat --- Cargo.lock | 48 ++++++++++ Cargo.toml | 7 +- src/main.rs | 28 +++++- src/ui.rs | 189 ++++++++++++++++++++++++++++++++-------- templates/chat.html | 6 ++ templates/head.html | 6 +- templates/incoming.html | 6 -- templates/settings_applied.html | 13 +++ 8 files changed, 251 insertions(+), 52 deletions(-) create mode 100644 templates/chat.html delete mode 100644 templates/incoming.html create mode 100644 templates/settings_applied.html diff --git a/Cargo.lock b/Cargo.lock index dff1ed8..debd451 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -212,6 +212,7 @@ checksum = "1236b4b292f6c4d6dc34604bb5120d85c3fe1d1aa596bd5cc52ca054d13e7b9e" dependencies = [ "async-trait", "axum-core 0.4.3", + "base64", "bytes", "futures-util", "http 1.0.0", @@ -230,8 +231,10 @@ dependencies = [ "serde_json", "serde_path_to_error", "serde_urlencoded", + "sha1", "sync_wrapper", "tokio", + "tokio-tungstenite", "tower", "tower-layer", "tower-service", @@ -405,9 +408,11 @@ dependencies = [ "rf4463", "rppal", "serde", + "serde_json", "simple_logger", "sqlx", "tokio", + "tokio-tungstenite", "toml", "tonic", "tower-http", @@ -530,6 +535,12 @@ dependencies = [ "typenum", ] +[[package]] +name = "data-encoding" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e962a19be5cfc3f3bf6dd8f61eb50107f356ad6270fbb3ed41476571db78be5" + [[package]] name = "der" version = "0.7.8" @@ -2418,6 +2429,18 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-tungstenite" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c83b561d025642014097b66e6c1bb422783339e0909e4429cde4749d1990bc38" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite", +] + [[package]] name = "tokio-util" version = "0.7.10" @@ -2610,6 +2633,25 @@ dependencies = [ "wintun", ] +[[package]] +name = "tungstenite" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ef1a641ea34f399a848dea702823bbecfb4c486f911735368f1f137cb8257e1" +dependencies = [ + "byteorder", + "bytes", + "data-encoding", + "http 1.0.0", + "httparse", + "log", + "rand", + "sha1", + "thiserror", + "url", + "utf-8", +] + [[package]] name = "typenum" version = "1.17.0" @@ -2681,6 +2723,12 @@ version = "2.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da" +[[package]] +name = "utf-8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" + [[package]] name = "vcpkg" version = "0.2.15" diff --git a/Cargo.toml b/Cargo.toml index 419374f..9502cf8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,14 +9,17 @@ license = "MIT" anyhow = "1.0" askama = { version = "0.12", features = ["with-axum"] } askama_axum = "0.4" -axum = "0.7" +axum = { version = "0.7", features = ["ws"] } +#axum-extra = "0.7" chrono = "0.4" simple_logger = "4.3" log = "0.4" serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" toml = "0.8" sqlx = { version = "0.7", features = [ "runtime-tokio-rustls", "sqlite"]} tokio = { version = "1", features = ["full"] } +tokio-tungstenite = "0.21" tower-http = { version = "0.5.0", features = ["fs"] } futures-core = "0.3" @@ -31,8 +34,6 @@ tonic = { version = "0.10", features = ["tls", "tls-roots"] } async-stream = "0.3" rand = "0.8" -# Websockets example in https://github.com/tokio-rs/axum/tree/main/examples/websockets -# tokio-tungstenite = "0.21" [[bin]] name = "fake-radio" diff --git a/src/main.rs b/src/main.rs index 6b79581..49ef58e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,7 +2,7 @@ use anyhow::{anyhow, Context}; use log::{debug, info, warn, error}; use std::sync::{Arc, Mutex}; use tokio::net::UdpSocket; -use tokio::sync::mpsc; +use tokio::sync::{mpsc, broadcast}; use radio::{RadioManager, MAX_PACKET_LEN}; mod db; @@ -10,10 +10,17 @@ mod radio; mod config; mod ui; +#[derive(Clone, serde::Serialize)] +struct WSChatMessage { + from: String, + message: String, +} + struct AppState { conf : config::Config, db : db::Database, transmit_queue : mpsc::Sender>, + ws_broadcast : broadcast::Sender, start_time : chrono::DateTime, } @@ -61,6 +68,7 @@ async fn main() -> std::io::Result<()> { conf : conf.clone(), db : db::Database::new().await, transmit_queue : packet_send.clone(), + ws_broadcast : broadcast::Sender::new(2), start_time : chrono::Utc::now(), })); @@ -115,11 +123,27 @@ async fn main() -> std::io::Result<()> { let mut buf = [0; MAX_PACKET_LEN]; match ham_cats::packet::Packet::fully_decode(&packet_data, &mut buf) { Ok(packet) => { + let (mut db, ws_broadcast) = { + let g = shared_state_receive.lock().unwrap(); + (g.db.clone(), g.ws_broadcast.clone()) + }; + if let Some(ident) = packet.identification() { debug!(" From {}-{}", ident.callsign, ident.ssid); + + let mut commentbuf = [0u8, 255]; + if let Ok(comment) = packet.comment(&mut commentbuf) { + let m = WSChatMessage { + from: format!("{}-{}", ident.callsign, ident.ssid), + message: comment.to_owned() + }; + match ws_broadcast.send(m) { + Ok(num) => debug!("Send WS message to {num}"), + Err(_) => debug!("No WS receivers currently"), + } + } } - let mut db = shared_state_receive.lock().unwrap().db.clone(); if let Err(e) = db.store_packet(&packet_data).await { warn!("Failed to write to sqlite: {}", e); } diff --git a/src/ui.rs b/src/ui.rs index afc6398..cbf493e 100644 --- a/src/ui.rs +++ b/src/ui.rs @@ -1,17 +1,22 @@ -use anyhow::{anyhow, Context}; +use std::ops::ControlFlow; +use std::net::SocketAddr; use std::str::FromStr; -use axum::Json; -use log::{info, warn}; -use serde::Deserialize; +use anyhow::{anyhow, Context}; use askama::Template; use axum::{ - extract::State, - routing::{get, post}, - Router, - response::Html, Form, + Json, + Router, + extract::State, + extract::{ws::{Message, WebSocket, WebSocketUpgrade}, ConnectInfo}, http::StatusCode, + response::Html, + response::IntoResponse, + routing::{get, post}, }; +use futures::{StreamExt, SinkExt}; +use log::{debug, info, warn}; +use serde::Deserialize; use tower_http::services::ServeDir; use ham_cats::{ @@ -25,7 +30,8 @@ use crate::SharedState; pub async fn serve(port: u16, shared_state: SharedState) { let app = Router::new() .route("/", get(dashboard)) - .route("/incoming", get(incoming)) + .route("/chat", get(chat)) + .route("/chat/ws", get(ws_handler)) .route("/send", get(send)) .route("/api/send_packet", post(post_packet)) .route("/settings", get(show_settings).post(post_settings)) @@ -40,9 +46,10 @@ pub async fn serve(port: u16, shared_state: SharedState) { #[derive(PartialEq)] enum ActivePage { Dashboard, - Incoming, + Chat, Send, Settings, + None, } #[derive(Template)] @@ -123,21 +130,110 @@ async fn dashboard(State(state): State) -> DashboardTemplate<'stati } #[derive(Template)] -#[template(path = "incoming.html")] -struct IncomingTemplate<'a> { +#[template(path = "chat.html")] +struct ChatTemplate<'a> { title: &'a str, page: ActivePage, conf: config::Config, } -async fn incoming(State(state): State) -> IncomingTemplate<'static> { - IncomingTemplate { - title: "Incoming", +async fn chat(State(state): State) -> ChatTemplate<'static> { + ChatTemplate { + title: "Chat", conf: state.lock().unwrap().conf.clone(), - page: ActivePage::Incoming, + page: ActivePage::Chat, } } +async fn ws_handler( + State(state): State, + ws: WebSocketUpgrade, + ConnectInfo(addr): ConnectInfo) -> impl IntoResponse { + info!("User at {addr} connected."); + let rx = state.lock().unwrap().ws_broadcast.subscribe(); + ws.on_upgrade(move |socket| handle_socket(socket, rx, addr)) +} + +async fn handle_socket( + mut socket: WebSocket, + mut rx: tokio::sync::broadcast::Receiver, + who: SocketAddr) { + if socket.send(Message::Ping(vec![1, 2, 3])).await.is_ok() { + info!("Pinged {who}..."); + } else { + info!("Could not ping {who}!"); + return; + } + let (mut sender, mut receiver) = socket.split(); + + let mut send_task = tokio::spawn(async move { + while let Ok(m) = rx.recv().await { + if let Ok(m_json) = serde_json::to_string(&m) { + if sender + .send(Message::Text(m_json)) + .await + .is_err() + { + return; + } + } + } + }); + + let mut recv_task = tokio::spawn(async move { + while let Some(Ok(msg)) = receiver.next().await { + if process_message(msg, who).is_break() { + break; + } + } + }); + + // If any one of the tasks exit, abort the other. + tokio::select! { + _rv_a = (&mut send_task) => { + recv_task.abort(); + }, + _rv_b = (&mut recv_task) => { + send_task.abort(); + } + } + + info!("Websocket context {who} destroyed"); +} + +fn process_message(msg: Message, who: SocketAddr) -> ControlFlow<(), ()> { + match msg { + Message::Text(t) => { + debug!(">>> {who} sent str: {t:?}"); + } + Message::Binary(d) => { + debug!(">>> {} sent {} bytes: {:?}", who, d.len(), d); + } + Message::Close(c) => { + if let Some(cf) = c { + debug!( + ">>> {} sent close with code {} and reason `{}`", + who, cf.code, cf.reason + ); + } else { + debug!(">>> {who} somehow sent close message without CloseFrame"); + } + return ControlFlow::Break(()); + } + + Message::Pong(v) => { + debug!(">>> {who} sent pong with {v:?}"); + } + // You should never need to manually handle Message::Ping, as axum's websocket library + // will do so for you automagically by replying with Pong and copying the v according to + // spec. But if you need the contents of the pings you can see them here. + Message::Ping(v) => { + debug!(">>> {who} sent ping with {v:?}"); + } + } + ControlFlow::Continue(()) +} + #[derive(Template)] #[template(path = "send.html")] struct SendTemplate<'a> { @@ -305,40 +401,57 @@ impl TryFrom for config::Config { } } -async fn post_settings(State(state): State, Form(input): Form) -> (StatusCode, Html) { +#[derive(Template)] +#[template(path = "settings_applied.html")] +struct SettingsAppliedTemplate<'a> { + title: &'a str, + page: ActivePage, + conf: config::Config, + ok: bool, + error_message: &'a str, + error_reason: String, +} + +async fn post_settings( + State(state): State, + Form(input): Form) -> (StatusCode, SettingsAppliedTemplate<'static>) { + match config::Config::try_from(input) { Ok(c) => { match c.store() { Ok(()) => { state.lock().unwrap().conf.clone_from(&c); - (StatusCode::OK, Html( - r#" - -

Configuration updated. If you enabled or disabled tunnel, please restart the cats-radio-node process.

-

To dashboard

- "#.to_owned())) + (StatusCode::OK, SettingsAppliedTemplate { + title: "Settings", + conf: c, + page: ActivePage::None, + ok: true, + error_message: "", + error_reason: "".to_owned(), + }) } Err(e) => { - (StatusCode::INTERNAL_SERVER_ERROR, Html( - format!(r#" - -

Internal Server Error: Could not write config

-

{}

- - "#, e))) + (StatusCode::INTERNAL_SERVER_ERROR, SettingsAppliedTemplate { + title: "Settings", + conf : c, + page: ActivePage::None, + ok: false, + error_message: "Failed to store config", + error_reason: e.to_string(), + }) }, } - }, Err(e) => { - (StatusCode::BAD_REQUEST, Html( - format!(r#" - -

Error interpreting POST data

-

{}

- - "#, e))) + (StatusCode::BAD_REQUEST, SettingsAppliedTemplate { + title: "Settings", + conf: state.lock().unwrap().conf.clone(), + page: ActivePage::None, + ok: false, + error_message: "Error interpreting POST data", + error_reason: e.to_string(), + }) }, } } diff --git a/templates/chat.html b/templates/chat.html new file mode 100644 index 0000000..584bde5 --- /dev/null +++ b/templates/chat.html @@ -0,0 +1,6 @@ +{% include "head.html" %} +
+

Chat

+
+{% include "foot.html" %} +{# vi:set et sw=2 ts=2: #} diff --git a/templates/head.html b/templates/head.html index 99a4bca..3c611ed 100644 --- a/templates/head.html +++ b/templates/head.html @@ -24,9 +24,9 @@ Dashboard - -
  • - Incoming + +
  • + Chat
  • diff --git a/templates/incoming.html b/templates/incoming.html deleted file mode 100644 index 2a43623..0000000 --- a/templates/incoming.html +++ /dev/null @@ -1,6 +0,0 @@ -{% include "head.html" %} -
    - Incoming! -
    -{% include "foot.html" %} -{# vi:set et sw=2 ts=2: #} diff --git a/templates/settings_applied.html b/templates/settings_applied.html new file mode 100644 index 0000000..f536772 --- /dev/null +++ b/templates/settings_applied.html @@ -0,0 +1,13 @@ +{% include "head.html" %} +
    + {% if ok %} +

    Configuration updated

    +

    If you enabled or disabled tunnel, please restart the cats-radio-node process.

    + {% else %} +

    Configuration update failed

    +

    {{ error_message }}:

    +

    {{ error_reason }}:

    + {% endif %} +
    +{% include "foot.html" %} +{# vi:set et sw=2 ts=2: #} -- cgit v1.2.3