aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/main.rs28
-rw-r--r--src/ui.rs189
2 files changed, 177 insertions, 40 deletions
diff --git a/src/main.rs b/src/main.rs
index 6b79581..49ef58e 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -2,7 +2,7 @@ use anyhow::{anyhow, Context};
use log::{debug, info, warn, error};
use std::sync::{Arc, Mutex};
use tokio::net::UdpSocket;
-use tokio::sync::mpsc;
+use tokio::sync::{mpsc, broadcast};
use radio::{RadioManager, MAX_PACKET_LEN};
mod db;
@@ -10,10 +10,17 @@ mod radio;
mod config;
mod ui;
+#[derive(Clone, serde::Serialize)]
+struct WSChatMessage {
+ from: String,
+ message: String,
+}
+
struct AppState {
conf : config::Config,
db : db::Database,
transmit_queue : mpsc::Sender<Vec<u8>>,
+ ws_broadcast : broadcast::Sender<WSChatMessage>,
start_time : chrono::DateTime<chrono::Utc>,
}
@@ -61,6 +68,7 @@ async fn main() -> std::io::Result<()> {
conf : conf.clone(),
db : db::Database::new().await,
transmit_queue : packet_send.clone(),
+ ws_broadcast : broadcast::Sender::new(2),
start_time : chrono::Utc::now(),
}));
@@ -115,11 +123,27 @@ async fn main() -> std::io::Result<()> {
let mut buf = [0; MAX_PACKET_LEN];
match ham_cats::packet::Packet::fully_decode(&packet_data, &mut buf) {
Ok(packet) => {
+ let (mut db, ws_broadcast) = {
+ let g = shared_state_receive.lock().unwrap();
+ (g.db.clone(), g.ws_broadcast.clone())
+ };
+
if let Some(ident) = packet.identification() {
debug!(" From {}-{}", ident.callsign, ident.ssid);
+
+ let mut commentbuf = [0u8, 255];
+ if let Ok(comment) = packet.comment(&mut commentbuf) {
+ let m = WSChatMessage {
+ from: format!("{}-{}", ident.callsign, ident.ssid),
+ message: comment.to_owned()
+ };
+ match ws_broadcast.send(m) {
+ Ok(num) => debug!("Send WS message to {num}"),
+ Err(_) => debug!("No WS receivers currently"),
+ }
+ }
}
- let mut db = shared_state_receive.lock().unwrap().db.clone();
if let Err(e) = db.store_packet(&packet_data).await {
warn!("Failed to write to sqlite: {}", e);
}
diff --git a/src/ui.rs b/src/ui.rs
index afc6398..cbf493e 100644
--- a/src/ui.rs
+++ b/src/ui.rs
@@ -1,17 +1,22 @@
-use anyhow::{anyhow, Context};
+use std::ops::ControlFlow;
+use std::net::SocketAddr;
use std::str::FromStr;
-use axum::Json;
-use log::{info, warn};
-use serde::Deserialize;
+use anyhow::{anyhow, Context};
use askama::Template;
use axum::{
- extract::State,
- routing::{get, post},
- Router,
- response::Html,
Form,
+ Json,
+ Router,
+ extract::State,
+ extract::{ws::{Message, WebSocket, WebSocketUpgrade}, ConnectInfo},
http::StatusCode,
+ response::Html,
+ response::IntoResponse,
+ routing::{get, post},
};
+use futures::{StreamExt, SinkExt};
+use log::{debug, info, warn};
+use serde::Deserialize;
use tower_http::services::ServeDir;
use ham_cats::{
@@ -25,7 +30,8 @@ use crate::SharedState;
pub async fn serve(port: u16, shared_state: SharedState) {
let app = Router::new()
.route("/", get(dashboard))
- .route("/incoming", get(incoming))
+ .route("/chat", get(chat))
+ .route("/chat/ws", get(ws_handler))
.route("/send", get(send))
.route("/api/send_packet", post(post_packet))
.route("/settings", get(show_settings).post(post_settings))
@@ -40,9 +46,10 @@ pub async fn serve(port: u16, shared_state: SharedState) {
#[derive(PartialEq)]
enum ActivePage {
Dashboard,
- Incoming,
+ Chat,
Send,
Settings,
+ None,
}
#[derive(Template)]
@@ -123,21 +130,110 @@ async fn dashboard(State(state): State<SharedState>) -> DashboardTemplate<'stati
}
#[derive(Template)]
-#[template(path = "incoming.html")]
-struct IncomingTemplate<'a> {
+#[template(path = "chat.html")]
+struct ChatTemplate<'a> {
title: &'a str,
page: ActivePage,
conf: config::Config,
}
-async fn incoming(State(state): State<SharedState>) -> IncomingTemplate<'static> {
- IncomingTemplate {
- title: "Incoming",
+async fn chat(State(state): State<SharedState>) -> ChatTemplate<'static> {
+ ChatTemplate {
+ title: "Chat",
conf: state.lock().unwrap().conf.clone(),
- page: ActivePage::Incoming,
+ page: ActivePage::Chat,
}
}
+async fn ws_handler(
+ State(state): State<SharedState>,
+ ws: WebSocketUpgrade,
+ ConnectInfo(addr): ConnectInfo<SocketAddr>) -> impl IntoResponse {
+ info!("User at {addr} connected.");
+ let rx = state.lock().unwrap().ws_broadcast.subscribe();
+ ws.on_upgrade(move |socket| handle_socket(socket, rx, addr))
+}
+
+async fn handle_socket(
+ mut socket: WebSocket,
+ mut rx: tokio::sync::broadcast::Receiver<crate::WSChatMessage>,
+ who: SocketAddr) {
+ if socket.send(Message::Ping(vec![1, 2, 3])).await.is_ok() {
+ info!("Pinged {who}...");
+ } else {
+ info!("Could not ping {who}!");
+ return;
+ }
+ let (mut sender, mut receiver) = socket.split();
+
+ let mut send_task = tokio::spawn(async move {
+ while let Ok(m) = rx.recv().await {
+ if let Ok(m_json) = serde_json::to_string(&m) {
+ if sender
+ .send(Message::Text(m_json))
+ .await
+ .is_err()
+ {
+ return;
+ }
+ }
+ }
+ });
+
+ let mut recv_task = tokio::spawn(async move {
+ while let Some(Ok(msg)) = receiver.next().await {
+ if process_message(msg, who).is_break() {
+ break;
+ }
+ }
+ });
+
+ // If any one of the tasks exit, abort the other.
+ tokio::select! {
+ _rv_a = (&mut send_task) => {
+ recv_task.abort();
+ },
+ _rv_b = (&mut recv_task) => {
+ send_task.abort();
+ }
+ }
+
+ info!("Websocket context {who} destroyed");
+}
+
+fn process_message(msg: Message, who: SocketAddr) -> ControlFlow<(), ()> {
+ match msg {
+ Message::Text(t) => {
+ debug!(">>> {who} sent str: {t:?}");
+ }
+ Message::Binary(d) => {
+ debug!(">>> {} sent {} bytes: {:?}", who, d.len(), d);
+ }
+ Message::Close(c) => {
+ if let Some(cf) = c {
+ debug!(
+ ">>> {} sent close with code {} and reason `{}`",
+ who, cf.code, cf.reason
+ );
+ } else {
+ debug!(">>> {who} somehow sent close message without CloseFrame");
+ }
+ return ControlFlow::Break(());
+ }
+
+ Message::Pong(v) => {
+ debug!(">>> {who} sent pong with {v:?}");
+ }
+ // You should never need to manually handle Message::Ping, as axum's websocket library
+ // will do so for you automagically by replying with Pong and copying the v according to
+ // spec. But if you need the contents of the pings you can see them here.
+ Message::Ping(v) => {
+ debug!(">>> {who} sent ping with {v:?}");
+ }
+ }
+ ControlFlow::Continue(())
+}
+
#[derive(Template)]
#[template(path = "send.html")]
struct SendTemplate<'a> {
@@ -305,40 +401,57 @@ impl TryFrom<FormConfig> for config::Config {
}
}
-async fn post_settings(State(state): State<SharedState>, Form(input): Form<FormConfig>) -> (StatusCode, Html<String>) {
+#[derive(Template)]
+#[template(path = "settings_applied.html")]
+struct SettingsAppliedTemplate<'a> {
+ title: &'a str,
+ page: ActivePage,
+ conf: config::Config,
+ ok: bool,
+ error_message: &'a str,
+ error_reason: String,
+}
+
+async fn post_settings(
+ State(state): State<SharedState>,
+ Form(input): Form<FormConfig>) -> (StatusCode, SettingsAppliedTemplate<'static>) {
+
match config::Config::try_from(input) {
Ok(c) => {
match c.store() {
Ok(()) => {
state.lock().unwrap().conf.clone_from(&c);
- (StatusCode::OK, Html(
- r#"<!doctype html>
- <html><head></head><body>
- <p>Configuration updated. If you enabled or disabled tunnel, please restart the cats-radio-node process.</p>
- <p>To <a href="/">dashboard</a></p>
- </body></html>"#.to_owned()))
+ (StatusCode::OK, SettingsAppliedTemplate {
+ title: "Settings",
+ conf: c,
+ page: ActivePage::None,
+ ok: true,
+ error_message: "",
+ error_reason: "".to_owned(),
+ })
}
Err(e) => {
- (StatusCode::INTERNAL_SERVER_ERROR, Html(
- format!(r#"<!doctype html>
- <html><head></head>
- <body><p>Internal Server Error: Could not write config</p>
- <p>{}</p>
- </body>
- </html>"#, e)))
+ (StatusCode::INTERNAL_SERVER_ERROR, SettingsAppliedTemplate {
+ title: "Settings",
+ conf : c,
+ page: ActivePage::None,
+ ok: false,
+ error_message: "Failed to store config",
+ error_reason: e.to_string(),
+ })
},
}
-
},
Err(e) => {
- (StatusCode::BAD_REQUEST, Html(
- format!(r#"<!doctype html>
- <html><head></head>
- <body><p>Error interpreting POST data</p>
- <p>{}</p>
- </body>
- </html>"#, e)))
+ (StatusCode::BAD_REQUEST, SettingsAppliedTemplate {
+ title: "Settings",
+ conf: state.lock().unwrap().conf.clone(),
+ page: ActivePage::None,
+ ok: false,
+ error_message: "Error interpreting POST data",
+ error_reason: e.to_string(),
+ })
},
}
}