From 241622ff7fd612eb521dec8829153851406c5884 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Sun, 7 Jan 2024 22:22:51 +0100 Subject: Store incoming frames in DB and show most recent 10 in dashboard --- Cargo.lock | 79 +++++++++++++--------------------- cats-radio-node.db | Bin 16384 -> 0 bytes migrations/20231230_create.sql | 4 +- src/bin/fake-radio.rs | 93 ++++++++++++++++++++++------------------- src/db.rs | 60 ++++++++++++++++++++++++++ src/main.rs | 73 ++++++++++++++++---------------- src/ui.rs | 55 +++++++++++++++++++++++- templates/dashboard.html | 12 +++++- 8 files changed, 241 insertions(+), 135 deletions(-) create mode 100644 src/db.rs diff --git a/Cargo.lock b/Cargo.lock index bd58099..a11d633 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -85,7 +85,7 @@ dependencies = [ "proc-macro2", "quote", "serde", - "syn 2.0.46", + "syn 2.0.48", ] [[package]] @@ -143,7 +143,7 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" dependencies = [ "proc-macro2", "quote", - "syn 2.0.46", + "syn 2.0.48", ] [[package]] @@ -154,7 +154,7 @@ checksum = "c980ee35e870bd1a4d2c8294d4c04d0499e67bca1e4b5cefcc693c2fa00caea9" dependencies = [ "proc-macro2", "quote", - "syn 2.0.46", + "syn 2.0.48", ] [[package]] @@ -439,9 +439,9 @@ checksum = "06ea2b9bc92be3c2baa9334a323ebca2d6f074ff852cd1d7b11064035cd3868f" [[package]] name = "cpufeatures" -version = "0.2.11" +version = "0.2.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce420fe07aecd3e67c5f910618fe65e94158f6dcc0adf44e00d69ce2bdfe0fd0" +checksum = "53fe5e26ff1b7aef8bca9c6080520cfb8d9333c7568e1829cef191a9723e5504" dependencies = [ "libc", ] @@ -1114,9 +1114,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.151" +version = "0.2.152" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "302d7ab3130588088d277783b1e2d2e10c9e9e4a16dd9050e6ec93fb3e7048f4" +checksum = "13e3bf6590cbc649f4d1a3eefc9d5d6eb746f5200ffb04e5e142700b8faa56e7" [[package]] name = "libm" @@ -1406,7 +1406,7 @@ checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405" dependencies = [ "proc-macro2", "quote", - "syn 2.0.46", + "syn 2.0.48", ] [[package]] @@ -1462,9 +1462,9 @@ checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" [[package]] name = "proc-macro2" -version = "1.0.74" +version = "1.0.76" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2de98502f212cfcea8d0bb305bd0f49d7ebdd75b64ba0a68f937d888f4e0d6db" +checksum = "95fc56cda0b5c3325f5fbbd7ff9fda9e02bb00bb3dac51252d2f1bfa1cb8cc8c" dependencies = [ "unicode-ident", ] @@ -1713,29 +1713,29 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.194" +version = "1.0.195" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b114498256798c94a0689e1a15fec6005dee8ac1f41de56404b67afc2a4b773" +checksum = "63261df402c67811e9ac6def069e4786148c4563f4b50fd4bf30aa370d626b02" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.194" +version = "1.0.195" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a3385e45322e8f9931410f01b3031ec534c3947d0e94c18049af4d9f9907d4e0" +checksum = "46fe8f8603d81ba86327b23a2e9cdf49e1255fb94a4c5f297f6ee0547178ea2c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.46", + "syn 2.0.48", ] [[package]] name = "serde_json" -version = "1.0.110" +version = "1.0.111" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6fbd975230bada99c8bb618e0c365c2eefa219158d5c6c29610fd09ff1833257" +checksum = "176e46fa42316f18edd598015a5166857fc835ec732f5215eac6b7bdbf0a84f4" dependencies = [ "itoa", "ryu", @@ -1890,12 +1890,11 @@ dependencies = [ [[package]] name = "spin_sleep" -version = "1.1.1" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cafa7900db085f4354dbc7025e25d7a839a14360ea13b5fc4fd717f2d3b23134" +checksum = "368a978649eaf70006b082e79c832bd72556ac1393eaf564d686e919dca2347f" dependencies = [ - "once_cell", - "winapi", + "windows-sys 0.52.0", ] [[package]] @@ -2148,9 +2147,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.46" +version = "2.0.48" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89456b690ff72fddcecf231caedbe615c59480c93358a93dfae7fc29e3ebbf0e" +checksum = "0f3531638e407dfc0814761abb7c00a5b54992b849452a0646b7f65c9f770f3f" dependencies = [ "proc-macro2", "quote", @@ -2199,7 +2198,7 @@ checksum = "fa0faa943b50f3db30a20aa7e265dbc66076993efed8463e8de414e5d06d3471" dependencies = [ "proc-macro2", "quote", - "syn 2.0.46", + "syn 2.0.48", ] [[package]] @@ -2285,7 +2284,7 @@ checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.46", + "syn 2.0.48", ] [[package]] @@ -2465,7 +2464,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.46", + "syn 2.0.48", ] [[package]] @@ -2599,28 +2598,6 @@ version = "1.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "22fc3756b8a9133049b26c7f61ab35416c130e8c09b660f5b3958b446f52cc50" -[[package]] -name = "winapi" -version = "0.3.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" -dependencies = [ - "winapi-i686-pc-windows-gnu", - "winapi-x86_64-pc-windows-gnu", -] - -[[package]] -name = "winapi-i686-pc-windows-gnu" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" - -[[package]] -name = "winapi-x86_64-pc-windows-gnu" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" - [[package]] name = "windows-sys" version = "0.48.0" @@ -2755,9 +2732,9 @@ checksum = "dff9641d1cd4be8d1a070daf9e3773c5f67e78b4d9d42263020c057706765c04" [[package]] name = "winnow" -version = "0.5.32" +version = "0.5.33" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8434aeec7b290e8da5c3f0d628cb0eac6cabcb31d14bb74f779a08109a5914d6" +checksum = "b7520bbdec7211caa7c4e682eb1fbe07abe20cee6756b6e00f537c82c11816aa" dependencies = [ "memchr", ] @@ -2788,7 +2765,7 @@ checksum = "9ce1b18ccd8e73a9321186f97e46f9f04b778851177567b1975109d26a08d2a6" dependencies = [ "proc-macro2", "quote", - "syn 2.0.46", + "syn 2.0.48", ] [[package]] diff --git a/cats-radio-node.db b/cats-radio-node.db index f4b42dd..e69de29 100644 Binary files a/cats-radio-node.db and b/cats-radio-node.db differ diff --git a/migrations/20231230_create.sql b/migrations/20231230_create.sql index 23d7282..261c18d 100644 --- a/migrations/20231230_create.sql +++ b/migrations/20231230_create.sql @@ -1,6 +1,6 @@ CREATE TABLE IF NOT EXISTS frames_received ( id INTEGER NOT NULL PRIMARY KEY, - recevied_at INTEGER, - content VARCHAR + received_at INTEGER, + content BLOB ); diff --git a/src/bin/fake-radio.rs b/src/bin/fake-radio.rs index fd38149..b7daf67 100644 --- a/src/bin/fake-radio.rs +++ b/src/bin/fake-radio.rs @@ -1,5 +1,4 @@ use anyhow::{anyhow, Context}; -use tokio::net::UdpSocket; use ham_cats::{ buffer::Buffer, whisker::{Arbitrary, Identification, Gps}, @@ -7,46 +6,7 @@ use ham_cats::{ const MAX_PACKET_LEN : usize = 8191; -#[tokio::main] -async fn main() -> std::io::Result<()> { - eprintln!("Sending example packet"); - - let packet = build_example_packet().await.unwrap(); - let sock = UdpSocket::bind("127.0.0.1:9074").await.unwrap(); - sock.send_to(&packet, "127.0.0.1:9073").await.unwrap(); - - eprintln!("Receiving packets. Ctrl-C to stop"); - - let mut data = [0; MAX_PACKET_LEN]; - while let Ok((len, _addr)) = sock.recv_from(&mut data).await { - let mut buf = [0; MAX_PACKET_LEN]; - match ham_cats::packet::Packet::fully_decode(&data[2..len], &mut buf) { - Ok(packet) => { - if let Some(ident) = packet.identification() { - eprintln!(" Ident {}-{}", ident.callsign, ident.ssid); - } - - if let Some(gps) = packet.gps() { - eprintln!(" GPS {} {}", gps.latitude(), gps.longitude()); - } - - let mut comment = [0; 1024]; - if let Ok(c) = packet.comment(&mut comment) { - eprintln!(" Comment {}", c); - } - - eprintln!(" With {} Arbitrary whiskers", packet.arbitrary_iter().count()); - }, - Err(e) => { - eprintln!(" Cannot decode packet of length {} {:?}", len, e); - } - } - } - - Ok(()) -} - -async fn build_example_packet() -> anyhow::Result> { +fn build_example_packet(comment: &str) -> anyhow::Result> { let callsign = "EX4MPLE"; let ssid = 0; let icon = 0; @@ -59,7 +19,7 @@ async fn build_example_packet() -> anyhow::Result> { ) .map_err(|e| anyhow!("Could not add identification to packet: {e}"))?; - pkt.add_comment("Debugging packet") + pkt.add_comment(comment) .map_err(|e| anyhow!("Could not add comment to packet: {e}"))?; let latitude = 46.5; @@ -89,3 +49,52 @@ async fn build_example_packet() -> anyhow::Result> { Ok(data.to_vec()) } + +fn main() -> std::io::Result<()> { + std::thread::spawn(receive_loop); + + eprintln!("Receiving messages. Write a comment and press ENTER to send. Ctrl-C to stop"); + let mut stdin_lines = std::io::stdin().lines(); + let sock = std::net::UdpSocket::bind("127.0.0.1:9075").unwrap(); + + while let Some(Ok(line)) = stdin_lines.next() { + eprintln!("Sending with comment = {}", line); + + let packet = build_example_packet(&line).unwrap(); + sock.send_to(&packet, "127.0.0.1:9073").unwrap(); + } + + Ok(()) +} + +fn receive_loop() { + let sock = std::net::UdpSocket::bind("127.0.0.1:9074").unwrap(); + let mut data = [0; MAX_PACKET_LEN]; + while let Ok((len, _addr)) = sock.recv_from(&mut data) { + eprintln!("Packet of length {}", len); + + let mut buf = [0; MAX_PACKET_LEN]; + match ham_cats::packet::Packet::fully_decode(&data[2..len], &mut buf) { + Ok(packet) => { + if let Some(ident) = packet.identification() { + eprintln!(" Ident {}-{}", ident.callsign, ident.ssid); + } + + if let Some(gps) = packet.gps() { + eprintln!(" GPS {} {}", gps.latitude(), gps.longitude()); + } + + let mut comment = [0; 1024]; + if let Ok(c) = packet.comment(&mut comment) { + eprintln!(" Comment {}", c); + } + + eprintln!(" With {} Arbitrary whiskers", packet.arbitrary_iter().count()); + }, + Err(e) => { + eprintln!(" Cannot decode {:?}", e); + } + } + } +} + diff --git a/src/db.rs b/src/db.rs new file mode 100644 index 0000000..26f0c32 --- /dev/null +++ b/src/db.rs @@ -0,0 +1,60 @@ +use std::time::{SystemTime, UNIX_EPOCH}; + +use log::debug; +use sqlx::SqlitePool; + +#[derive(Clone)] +pub struct Database { + pool : SqlitePool +} + +#[derive(sqlx::FromRow, Debug)] +pub struct Packet { + pub id : i64, + pub received_at : i64, + pub content : Vec, +} + +impl Database { + pub async fn new() -> Self { + let pool = SqlitePool::connect("sqlite:cats-radio-node.db").await.unwrap(); + let mut conn = pool.acquire().await.unwrap(); + + sqlx::migrate!() + .run(&mut conn) + .await + .expect("could not run SQLx migrations"); + + Self { pool } + } + + pub async fn store_packet(&mut self, packet: &[u8]) -> anyhow::Result<()> { + let timestamp = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("Time went backwards"); + + let timestamp_i64 : i64 = timestamp.as_secs().try_into()?; + + let id = sqlx::query(r#"INSERT INTO frames_received (received_at, content) VALUES ( ?1 , ?2 )"#) + .bind(timestamp_i64).bind(packet) + .execute(&self.pool) + .await? + .last_insert_rowid(); + + debug!("INSERTed row {id}"); + Ok(()) + } + + pub async fn get_most_recent_packets(&mut self, count: i64) -> anyhow::Result> { + let results = sqlx::query_as(r#" + SELECT id, received_at, content + FROM frames_received + ORDER BY received_at DESC + LIMIT ?1"#) + .bind(count) + .fetch_all(&self.pool) + .await?; + + Ok(results) + } +} diff --git a/src/main.rs b/src/main.rs index 0a427e9..796dd7f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,16 +3,16 @@ use log::{debug, info, warn, error}; use std::sync::{Arc, Mutex}; use tokio::net::UdpSocket; use tokio::sync::mpsc; -use sqlx::{Connection, SqliteConnection}; use radio::{RadioManager, MAX_PACKET_LEN}; +mod db; mod radio; mod config; mod ui; struct AppState { conf : config::Config, - db : Mutex, + db : db::Database, transmit_queue : mpsc::Sender>, } @@ -22,17 +22,17 @@ type SharedState = Arc>; async fn main() -> std::io::Result<()> { simple_logger::SimpleLogger::new().env().init().unwrap(); - let mut conn = SqliteConnection::connect("sqlite:cats-radio-node.db").await.unwrap(); - sqlx::migrate!() - .run(&mut conn) - .await - .expect("could not run SQLx migrations"); - let conf = config::Config::load().expect("Could not load config"); let (radio_rx_queue, mut packet_receive) = mpsc::channel(16); let (packet_send, mut radio_tx_queue) = mpsc::channel::>(16); + let shared_state = Arc::new(Mutex::new(AppState { + conf : conf.clone(), + db : db::Database::new().await, + transmit_queue: packet_send.clone(), + })); + if conf.freq == 0 { warn!("Frequency {0} is zero, disabling radio. Fake receiver udp 127.0.0.1:9073, sending to 9074", conf.freq); let sock_r = Arc::new(UdpSocket::bind("127.0.0.1:9073").await?); @@ -74,39 +74,38 @@ async fn main() -> std::io::Result<()> { } } }); + } - tokio::task::spawn(async move { - loop { - match packet_receive - .recv() - .await - .context("Packet receive channel died") { - Ok((packet, rssi)) => { - debug!("RX RSSI {} len {}", rssi, packet.len()); - let mut buf = [0; MAX_PACKET_LEN]; - match ham_cats::packet::Packet::fully_decode(&packet, &mut buf) { - Ok(packet) => { - if let Some(ident) = packet.identification() { - debug!(" From {}-{}", ident.callsign, ident.ssid); - } - // TODO save to db + let shared_state_receive = shared_state.clone(); + tokio::task::spawn(async move { + loop { + match packet_receive + .recv() + .await + .context("Packet receive channel died") { + Ok((packet_data, rssi)) => { + debug!("RX RSSI {} len {}", rssi, packet_data.len()); + let mut buf = [0; MAX_PACKET_LEN]; + match ham_cats::packet::Packet::fully_decode(&packet_data[2..], &mut buf) { + Ok(packet) => { + if let Some(ident) = packet.identification() { + debug!(" From {}-{}", ident.callsign, ident.ssid); } - Err(e) => { - warn!("Failed to decode packet: {}", e); + + let mut db = shared_state_receive.lock().unwrap().db.clone(); + if let Err(e) = db.store_packet(&packet_data).await { + warn!("Failed to write to sqlite: {}", e); } } - }, - Err(e) => warn!("Failed to decode packet: {}", e), - } - } - }); - } - - let shared_state = Arc::new(Mutex::new(AppState { - conf, - db: Mutex::new(conn), - transmit_queue: packet_send.clone(), - })); + Err(e) => { + warn!("Failed to decode packet: {}", e); + } + } + }, + Err(e) => warn!("Failed to decode packet: {}", e), + } + } + }); let port = 3000; info!("Setting up listener on port {port}"); diff --git a/src/ui.rs b/src/ui.rs index aedf56c..3ce2ab8 100644 --- a/src/ui.rs +++ b/src/ui.rs @@ -1,7 +1,7 @@ use anyhow::{anyhow, Context}; use std::str::FromStr; use axum::Json; -use log::info; +use log::{info, warn}; use serde::Deserialize; use askama::Template; use axum::{ @@ -19,7 +19,7 @@ use ham_cats::{ whisker::Identification, }; -use crate::config; +use crate::{config, radio::MAX_PACKET_LEN}; use crate::SharedState; pub async fn serve(port: u16, shared_state: SharedState) { @@ -51,13 +51,64 @@ struct DashboardTemplate<'a> { title: &'a str, page: ActivePage, conf: config::Config, + packets: Vec, +} + +struct UIPacket { + pub received_at : i64, + + pub from_callsign : String, + pub from_ssid : u8, + + pub comment : Option, } async fn dashboard(State(state): State) -> DashboardTemplate<'static> { + let mut db = state.lock().unwrap().db.clone(); + + let packets = match db.get_most_recent_packets(10).await { + Ok(v) => v, + Err(e) => { + warn!("Dashboard will have empty packet list: {}", e); + Vec::new() + }, + }.iter() + .filter_map(|db_packet| { + let mut buf = [0; MAX_PACKET_LEN]; + match ham_cats::packet::Packet::fully_decode(&db_packet.content[2..], &mut buf) { + Ok(p) => { + if let Some(ident) = p.identification() { + + let mut commentbuf = [0; 1024]; + let comment = match p.comment(&mut commentbuf) { + Ok(c) => Some(c.to_owned()), + Err(_) => None, + }; + + Some(UIPacket { + received_at : db_packet.received_at, + from_callsign : ident.callsign.to_string(), + from_ssid : ident.ssid, + comment + }) + } + else { + None + } + }, + Err(e) => { + warn!("Failed to decode packet {}: {}", db_packet.id, e); + None + }, + } + }) + .collect(); + DashboardTemplate { title: "Dashboard", conf: state.lock().unwrap().conf.clone(), page: ActivePage::Dashboard, + packets } } diff --git a/templates/dashboard.html b/templates/dashboard.html index b2cc5f4..be176c4 100644 --- a/templates/dashboard.html +++ b/templates/dashboard.html @@ -1,6 +1,16 @@ {% include "head.html" %}
- Dashboard! +

Dashboard

+
+

Ten most recent packets

+
    + {% for packet in packets %} +
  • {{ packet.received_at|e }} {{ packet.from_callsign|e }}-{{ packet.from_ssid|e }} + {% match packet.comment %}{% when Some with (val) %}{{ val|e }}{% when None %}N/A{% endmatch %} +
  • + {% endfor %} +
+
{% include "foot.html" %} {# vi:set et sw=2 ts=2: #} -- cgit v1.2.3