aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Cargo.lock48
-rw-r--r--Cargo.toml7
-rw-r--r--src/main.rs28
-rw-r--r--src/ui.rs189
-rw-r--r--templates/chat.html (renamed from templates/incoming.html)2
-rw-r--r--templates/head.html6
-rw-r--r--templates/settings_applied.html13
7 files changed, 246 insertions, 47 deletions
diff --git a/Cargo.lock b/Cargo.lock
index dff1ed8..debd451 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -212,6 +212,7 @@ checksum = "1236b4b292f6c4d6dc34604bb5120d85c3fe1d1aa596bd5cc52ca054d13e7b9e"
dependencies = [
"async-trait",
"axum-core 0.4.3",
+ "base64",
"bytes",
"futures-util",
"http 1.0.0",
@@ -230,8 +231,10 @@ dependencies = [
"serde_json",
"serde_path_to_error",
"serde_urlencoded",
+ "sha1",
"sync_wrapper",
"tokio",
+ "tokio-tungstenite",
"tower",
"tower-layer",
"tower-service",
@@ -405,9 +408,11 @@ dependencies = [
"rf4463",
"rppal",
"serde",
+ "serde_json",
"simple_logger",
"sqlx",
"tokio",
+ "tokio-tungstenite",
"toml",
"tonic",
"tower-http",
@@ -531,6 +536,12 @@ dependencies = [
]
[[package]]
+name = "data-encoding"
+version = "2.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7e962a19be5cfc3f3bf6dd8f61eb50107f356ad6270fbb3ed41476571db78be5"
+
+[[package]]
name = "der"
version = "0.7.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2419,6 +2430,18 @@ dependencies = [
]
[[package]]
+name = "tokio-tungstenite"
+version = "0.21.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c83b561d025642014097b66e6c1bb422783339e0909e4429cde4749d1990bc38"
+dependencies = [
+ "futures-util",
+ "log",
+ "tokio",
+ "tungstenite",
+]
+
+[[package]]
name = "tokio-util"
version = "0.7.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2611,6 +2634,25 @@ dependencies = [
]
[[package]]
+name = "tungstenite"
+version = "0.21.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9ef1a641ea34f399a848dea702823bbecfb4c486f911735368f1f137cb8257e1"
+dependencies = [
+ "byteorder",
+ "bytes",
+ "data-encoding",
+ "http 1.0.0",
+ "httparse",
+ "log",
+ "rand",
+ "sha1",
+ "thiserror",
+ "url",
+ "utf-8",
+]
+
+[[package]]
name = "typenum"
version = "1.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2682,6 +2724,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da"
[[package]]
+name = "utf-8"
+version = "0.7.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9"
+
+[[package]]
name = "vcpkg"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
diff --git a/Cargo.toml b/Cargo.toml
index 419374f..9502cf8 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -9,14 +9,17 @@ license = "MIT"
anyhow = "1.0"
askama = { version = "0.12", features = ["with-axum"] }
askama_axum = "0.4"
-axum = "0.7"
+axum = { version = "0.7", features = ["ws"] }
+#axum-extra = "0.7"
chrono = "0.4"
simple_logger = "4.3"
log = "0.4"
serde = { version = "1.0", features = ["derive"] }
+serde_json = "1.0"
toml = "0.8"
sqlx = { version = "0.7", features = [ "runtime-tokio-rustls", "sqlite"]}
tokio = { version = "1", features = ["full"] }
+tokio-tungstenite = "0.21"
tower-http = { version = "0.5.0", features = ["fs"] }
futures-core = "0.3"
@@ -31,8 +34,6 @@ tonic = { version = "0.10", features = ["tls", "tls-roots"] }
async-stream = "0.3"
rand = "0.8"
-# Websockets example in https://github.com/tokio-rs/axum/tree/main/examples/websockets
-# tokio-tungstenite = "0.21"
[[bin]]
name = "fake-radio"
diff --git a/src/main.rs b/src/main.rs
index 6b79581..49ef58e 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -2,7 +2,7 @@ use anyhow::{anyhow, Context};
use log::{debug, info, warn, error};
use std::sync::{Arc, Mutex};
use tokio::net::UdpSocket;
-use tokio::sync::mpsc;
+use tokio::sync::{mpsc, broadcast};
use radio::{RadioManager, MAX_PACKET_LEN};
mod db;
@@ -10,10 +10,17 @@ mod radio;
mod config;
mod ui;
+#[derive(Clone, serde::Serialize)]
+struct WSChatMessage {
+ from: String,
+ message: String,
+}
+
struct AppState {
conf : config::Config,
db : db::Database,
transmit_queue : mpsc::Sender<Vec<u8>>,
+ ws_broadcast : broadcast::Sender<WSChatMessage>,
start_time : chrono::DateTime<chrono::Utc>,
}
@@ -61,6 +68,7 @@ async fn main() -> std::io::Result<()> {
conf : conf.clone(),
db : db::Database::new().await,
transmit_queue : packet_send.clone(),
+ ws_broadcast : broadcast::Sender::new(2),
start_time : chrono::Utc::now(),
}));
@@ -115,11 +123,27 @@ async fn main() -> std::io::Result<()> {
let mut buf = [0; MAX_PACKET_LEN];
match ham_cats::packet::Packet::fully_decode(&packet_data, &mut buf) {
Ok(packet) => {
+ let (mut db, ws_broadcast) = {
+ let g = shared_state_receive.lock().unwrap();
+ (g.db.clone(), g.ws_broadcast.clone())
+ };
+
if let Some(ident) = packet.identification() {
debug!(" From {}-{}", ident.callsign, ident.ssid);
+
+ let mut commentbuf = [0u8, 255];
+ if let Ok(comment) = packet.comment(&mut commentbuf) {
+ let m = WSChatMessage {
+ from: format!("{}-{}", ident.callsign, ident.ssid),
+ message: comment.to_owned()
+ };
+ match ws_broadcast.send(m) {
+ Ok(num) => debug!("Send WS message to {num}"),
+ Err(_) => debug!("No WS receivers currently"),
+ }
+ }
}
- let mut db = shared_state_receive.lock().unwrap().db.clone();
if let Err(e) = db.store_packet(&packet_data).await {
warn!("Failed to write to sqlite: {}", e);
}
diff --git a/src/ui.rs b/src/ui.rs
index afc6398..cbf493e 100644
--- a/src/ui.rs
+++ b/src/ui.rs
@@ -1,17 +1,22 @@
-use anyhow::{anyhow, Context};
+use std::ops::ControlFlow;
+use std::net::SocketAddr;
use std::str::FromStr;
-use axum::Json;
-use log::{info, warn};
-use serde::Deserialize;
+use anyhow::{anyhow, Context};
use askama::Template;
use axum::{
- extract::State,
- routing::{get, post},
- Router,
- response::Html,
Form,
+ Json,
+ Router,
+ extract::State,
+ extract::{ws::{Message, WebSocket, WebSocketUpgrade}, ConnectInfo},
http::StatusCode,
+ response::Html,
+ response::IntoResponse,
+ routing::{get, post},
};
+use futures::{StreamExt, SinkExt};
+use log::{debug, info, warn};
+use serde::Deserialize;
use tower_http::services::ServeDir;
use ham_cats::{
@@ -25,7 +30,8 @@ use crate::SharedState;
pub async fn serve(port: u16, shared_state: SharedState) {
let app = Router::new()
.route("/", get(dashboard))
- .route("/incoming", get(incoming))
+ .route("/chat", get(chat))
+ .route("/chat/ws", get(ws_handler))
.route("/send", get(send))
.route("/api/send_packet", post(post_packet))
.route("/settings", get(show_settings).post(post_settings))
@@ -40,9 +46,10 @@ pub async fn serve(port: u16, shared_state: SharedState) {
#[derive(PartialEq)]
enum ActivePage {
Dashboard,
- Incoming,
+ Chat,
Send,
Settings,
+ None,
}
#[derive(Template)]
@@ -123,21 +130,110 @@ async fn dashboard(State(state): State<SharedState>) -> DashboardTemplate<'stati
}
#[derive(Template)]
-#[template(path = "incoming.html")]
-struct IncomingTemplate<'a> {
+#[template(path = "chat.html")]
+struct ChatTemplate<'a> {
title: &'a str,
page: ActivePage,
conf: config::Config,
}
-async fn incoming(State(state): State<SharedState>) -> IncomingTemplate<'static> {
- IncomingTemplate {
- title: "Incoming",
+async fn chat(State(state): State<SharedState>) -> ChatTemplate<'static> {
+ ChatTemplate {
+ title: "Chat",
conf: state.lock().unwrap().conf.clone(),
- page: ActivePage::Incoming,
+ page: ActivePage::Chat,
}
}
+async fn ws_handler(
+ State(state): State<SharedState>,
+ ws: WebSocketUpgrade,
+ ConnectInfo(addr): ConnectInfo<SocketAddr>) -> impl IntoResponse {
+ info!("User at {addr} connected.");
+ let rx = state.lock().unwrap().ws_broadcast.subscribe();
+ ws.on_upgrade(move |socket| handle_socket(socket, rx, addr))
+}
+
+async fn handle_socket(
+ mut socket: WebSocket,
+ mut rx: tokio::sync::broadcast::Receiver<crate::WSChatMessage>,
+ who: SocketAddr) {
+ if socket.send(Message::Ping(vec![1, 2, 3])).await.is_ok() {
+ info!("Pinged {who}...");
+ } else {
+ info!("Could not ping {who}!");
+ return;
+ }
+ let (mut sender, mut receiver) = socket.split();
+
+ let mut send_task = tokio::spawn(async move {
+ while let Ok(m) = rx.recv().await {
+ if let Ok(m_json) = serde_json::to_string(&m) {
+ if sender
+ .send(Message::Text(m_json))
+ .await
+ .is_err()
+ {
+ return;
+ }
+ }
+ }
+ });
+
+ let mut recv_task = tokio::spawn(async move {
+ while let Some(Ok(msg)) = receiver.next().await {
+ if process_message(msg, who).is_break() {
+ break;
+ }
+ }
+ });
+
+ // If any one of the tasks exit, abort the other.
+ tokio::select! {
+ _rv_a = (&mut send_task) => {
+ recv_task.abort();
+ },
+ _rv_b = (&mut recv_task) => {
+ send_task.abort();
+ }
+ }
+
+ info!("Websocket context {who} destroyed");
+}
+
+fn process_message(msg: Message, who: SocketAddr) -> ControlFlow<(), ()> {
+ match msg {
+ Message::Text(t) => {
+ debug!(">>> {who} sent str: {t:?}");
+ }
+ Message::Binary(d) => {
+ debug!(">>> {} sent {} bytes: {:?}", who, d.len(), d);
+ }
+ Message::Close(c) => {
+ if let Some(cf) = c {
+ debug!(
+ ">>> {} sent close with code {} and reason `{}`",
+ who, cf.code, cf.reason
+ );
+ } else {
+ debug!(">>> {who} somehow sent close message without CloseFrame");
+ }
+ return ControlFlow::Break(());
+ }
+
+ Message::Pong(v) => {
+ debug!(">>> {who} sent pong with {v:?}");
+ }
+ // You should never need to manually handle Message::Ping, as axum's websocket library
+ // will do so for you automagically by replying with Pong and copying the v according to
+ // spec. But if you need the contents of the pings you can see them here.
+ Message::Ping(v) => {
+ debug!(">>> {who} sent ping with {v:?}");
+ }
+ }
+ ControlFlow::Continue(())
+}
+
#[derive(Template)]
#[template(path = "send.html")]
struct SendTemplate<'a> {
@@ -305,40 +401,57 @@ impl TryFrom<FormConfig> for config::Config {
}
}
-async fn post_settings(State(state): State<SharedState>, Form(input): Form<FormConfig>) -> (StatusCode, Html<String>) {
+#[derive(Template)]
+#[template(path = "settings_applied.html")]
+struct SettingsAppliedTemplate<'a> {
+ title: &'a str,
+ page: ActivePage,
+ conf: config::Config,
+ ok: bool,
+ error_message: &'a str,
+ error_reason: String,
+}
+
+async fn post_settings(
+ State(state): State<SharedState>,
+ Form(input): Form<FormConfig>) -> (StatusCode, SettingsAppliedTemplate<'static>) {
+
match config::Config::try_from(input) {
Ok(c) => {
match c.store() {
Ok(()) => {
state.lock().unwrap().conf.clone_from(&c);
- (StatusCode::OK, Html(
- r#"<!doctype html>
- <html><head></head><body>
- <p>Configuration updated. If you enabled or disabled tunnel, please restart the cats-radio-node process.</p>
- <p>To <a href="/">dashboard</a></p>
- </body></html>"#.to_owned()))
+ (StatusCode::OK, SettingsAppliedTemplate {
+ title: "Settings",
+ conf: c,
+ page: ActivePage::None,
+ ok: true,
+ error_message: "",
+ error_reason: "".to_owned(),
+ })
}
Err(e) => {
- (StatusCode::INTERNAL_SERVER_ERROR, Html(
- format!(r#"<!doctype html>
- <html><head></head>
- <body><p>Internal Server Error: Could not write config</p>
- <p>{}</p>
- </body>
- </html>"#, e)))
+ (StatusCode::INTERNAL_SERVER_ERROR, SettingsAppliedTemplate {
+ title: "Settings",
+ conf : c,
+ page: ActivePage::None,
+ ok: false,
+ error_message: "Failed to store config",
+ error_reason: e.to_string(),
+ })
},
}
-
},
Err(e) => {
- (StatusCode::BAD_REQUEST, Html(
- format!(r#"<!doctype html>
- <html><head></head>
- <body><p>Error interpreting POST data</p>
- <p>{}</p>
- </body>
- </html>"#, e)))
+ (StatusCode::BAD_REQUEST, SettingsAppliedTemplate {
+ title: "Settings",
+ conf: state.lock().unwrap().conf.clone(),
+ page: ActivePage::None,
+ ok: false,
+ error_message: "Error interpreting POST data",
+ error_reason: e.to_string(),
+ })
},
}
}
diff --git a/templates/incoming.html b/templates/chat.html
index 2a43623..584bde5 100644
--- a/templates/incoming.html
+++ b/templates/chat.html
@@ -1,6 +1,6 @@
{% include "head.html" %}
<div class="content">
- Incoming!
+ <h1>Chat</h1>
</div>
{% include "foot.html" %}
{# vi:set et sw=2 ts=2: #}
diff --git a/templates/head.html b/templates/head.html
index 99a4bca..3c611ed 100644
--- a/templates/head.html
+++ b/templates/head.html
@@ -24,9 +24,9 @@
<i class="w-8 fa fa-home" aria-hidden="true"></i><span>Dashboard</span>
</li>
</a>
- <a href="/incoming" class="">
- <li class="rounded-md mt-1 p-3 {% if page == ActivePage::Incoming %} bg-sky-200 text-sky-900 {% endif %} hover:bg-sky-300">
- <i class="w-8 fa fa-inbox" aria-hidden="true"></i><span>Incoming</span>
+ <a href="/chat" class="">
+ <li class="rounded-md mt-1 p-3 {% if page == ActivePage::Chat %} bg-sky-200 text-sky-900 {% endif %} hover:bg-sky-300">
+ <i class="w-8 fa fa-comments" aria-hidden="true"></i><span>Chat</span>
</li>
</a>
<a href="/send" class="">
diff --git a/templates/settings_applied.html b/templates/settings_applied.html
new file mode 100644
index 0000000..f536772
--- /dev/null
+++ b/templates/settings_applied.html
@@ -0,0 +1,13 @@
+{% include "head.html" %}
+<div class="content">
+ {% if ok %}
+ <h1>Configuration updated</h1>
+ <p>If you enabled or disabled tunnel, please restart the cats-radio-node process.</p>
+ {% else %}
+ <h1>Configuration update failed</h1>
+ <p>{{ error_message }}:</p>
+ <p>{{ error_reason }}:</p>
+ {% endif %}
+</div>
+{% include "foot.html" %}
+{# vi:set et sw=2 ts=2: #}