diff options
author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2024-01-07 22:22:51 +0100 |
---|---|---|
committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2024-01-07 22:22:51 +0100 |
commit | 241622ff7fd612eb521dec8829153851406c5884 (patch) | |
tree | 3d7b17caa99c7b08f01d94fad9cbdca249ff2612 /src | |
parent | b40299538a73d25d096d8f58f1468cd7f647a3f9 (diff) | |
download | cats-radio-node-241622ff7fd612eb521dec8829153851406c5884.tar.gz cats-radio-node-241622ff7fd612eb521dec8829153851406c5884.tar.bz2 cats-radio-node-241622ff7fd612eb521dec8829153851406c5884.zip |
Store incoming frames in DB and show most recent 10 in dashboard
Diffstat (limited to 'src')
-rw-r--r-- | src/bin/fake-radio.rs | 93 | ||||
-rw-r--r-- | src/db.rs | 60 | ||||
-rw-r--r-- | src/main.rs | 73 | ||||
-rw-r--r-- | src/ui.rs | 55 |
4 files changed, 200 insertions, 81 deletions
diff --git a/src/bin/fake-radio.rs b/src/bin/fake-radio.rs index fd38149..b7daf67 100644 --- a/src/bin/fake-radio.rs +++ b/src/bin/fake-radio.rs @@ -1,5 +1,4 @@ use anyhow::{anyhow, Context}; -use tokio::net::UdpSocket; use ham_cats::{ buffer::Buffer, whisker::{Arbitrary, Identification, Gps}, @@ -7,46 +6,7 @@ use ham_cats::{ const MAX_PACKET_LEN : usize = 8191; -#[tokio::main] -async fn main() -> std::io::Result<()> { - eprintln!("Sending example packet"); - - let packet = build_example_packet().await.unwrap(); - let sock = UdpSocket::bind("127.0.0.1:9074").await.unwrap(); - sock.send_to(&packet, "127.0.0.1:9073").await.unwrap(); - - eprintln!("Receiving packets. Ctrl-C to stop"); - - let mut data = [0; MAX_PACKET_LEN]; - while let Ok((len, _addr)) = sock.recv_from(&mut data).await { - let mut buf = [0; MAX_PACKET_LEN]; - match ham_cats::packet::Packet::fully_decode(&data[2..len], &mut buf) { - Ok(packet) => { - if let Some(ident) = packet.identification() { - eprintln!(" Ident {}-{}", ident.callsign, ident.ssid); - } - - if let Some(gps) = packet.gps() { - eprintln!(" GPS {} {}", gps.latitude(), gps.longitude()); - } - - let mut comment = [0; 1024]; - if let Ok(c) = packet.comment(&mut comment) { - eprintln!(" Comment {}", c); - } - - eprintln!(" With {} Arbitrary whiskers", packet.arbitrary_iter().count()); - }, - Err(e) => { - eprintln!(" Cannot decode packet of length {} {:?}", len, e); - } - } - } - - Ok(()) -} - -async fn build_example_packet() -> anyhow::Result<Vec<u8>> { +fn build_example_packet(comment: &str) -> anyhow::Result<Vec<u8>> { let callsign = "EX4MPLE"; let ssid = 0; let icon = 0; @@ -59,7 +19,7 @@ async fn build_example_packet() -> anyhow::Result<Vec<u8>> { ) .map_err(|e| anyhow!("Could not add identification to packet: {e}"))?; - pkt.add_comment("Debugging packet") + pkt.add_comment(comment) .map_err(|e| anyhow!("Could not add comment to packet: {e}"))?; let latitude = 46.5; @@ -89,3 +49,52 @@ async fn build_example_packet() -> anyhow::Result<Vec<u8>> { Ok(data.to_vec()) } + +fn main() -> std::io::Result<()> { + std::thread::spawn(receive_loop); + + eprintln!("Receiving messages. Write a comment and press ENTER to send. Ctrl-C to stop"); + let mut stdin_lines = std::io::stdin().lines(); + let sock = std::net::UdpSocket::bind("127.0.0.1:9075").unwrap(); + + while let Some(Ok(line)) = stdin_lines.next() { + eprintln!("Sending with comment = {}", line); + + let packet = build_example_packet(&line).unwrap(); + sock.send_to(&packet, "127.0.0.1:9073").unwrap(); + } + + Ok(()) +} + +fn receive_loop() { + let sock = std::net::UdpSocket::bind("127.0.0.1:9074").unwrap(); + let mut data = [0; MAX_PACKET_LEN]; + while let Ok((len, _addr)) = sock.recv_from(&mut data) { + eprintln!("Packet of length {}", len); + + let mut buf = [0; MAX_PACKET_LEN]; + match ham_cats::packet::Packet::fully_decode(&data[2..len], &mut buf) { + Ok(packet) => { + if let Some(ident) = packet.identification() { + eprintln!(" Ident {}-{}", ident.callsign, ident.ssid); + } + + if let Some(gps) = packet.gps() { + eprintln!(" GPS {} {}", gps.latitude(), gps.longitude()); + } + + let mut comment = [0; 1024]; + if let Ok(c) = packet.comment(&mut comment) { + eprintln!(" Comment {}", c); + } + + eprintln!(" With {} Arbitrary whiskers", packet.arbitrary_iter().count()); + }, + Err(e) => { + eprintln!(" Cannot decode {:?}", e); + } + } + } +} + diff --git a/src/db.rs b/src/db.rs new file mode 100644 index 0000000..26f0c32 --- /dev/null +++ b/src/db.rs @@ -0,0 +1,60 @@ +use std::time::{SystemTime, UNIX_EPOCH}; + +use log::debug; +use sqlx::SqlitePool; + +#[derive(Clone)] +pub struct Database { + pool : SqlitePool +} + +#[derive(sqlx::FromRow, Debug)] +pub struct Packet { + pub id : i64, + pub received_at : i64, + pub content : Vec<u8>, +} + +impl Database { + pub async fn new() -> Self { + let pool = SqlitePool::connect("sqlite:cats-radio-node.db").await.unwrap(); + let mut conn = pool.acquire().await.unwrap(); + + sqlx::migrate!() + .run(&mut conn) + .await + .expect("could not run SQLx migrations"); + + Self { pool } + } + + pub async fn store_packet(&mut self, packet: &[u8]) -> anyhow::Result<()> { + let timestamp = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("Time went backwards"); + + let timestamp_i64 : i64 = timestamp.as_secs().try_into()?; + + let id = sqlx::query(r#"INSERT INTO frames_received (received_at, content) VALUES ( ?1 , ?2 )"#) + .bind(timestamp_i64).bind(packet) + .execute(&self.pool) + .await? + .last_insert_rowid(); + + debug!("INSERTed row {id}"); + Ok(()) + } + + pub async fn get_most_recent_packets(&mut self, count: i64) -> anyhow::Result<Vec<Packet>> { + let results = sqlx::query_as(r#" + SELECT id, received_at, content + FROM frames_received + ORDER BY received_at DESC + LIMIT ?1"#) + .bind(count) + .fetch_all(&self.pool) + .await?; + + Ok(results) + } +} diff --git a/src/main.rs b/src/main.rs index 0a427e9..796dd7f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,16 +3,16 @@ use log::{debug, info, warn, error}; use std::sync::{Arc, Mutex}; use tokio::net::UdpSocket; use tokio::sync::mpsc; -use sqlx::{Connection, SqliteConnection}; use radio::{RadioManager, MAX_PACKET_LEN}; +mod db; mod radio; mod config; mod ui; struct AppState { conf : config::Config, - db : Mutex<SqliteConnection>, + db : db::Database, transmit_queue : mpsc::Sender<Vec<u8>>, } @@ -22,17 +22,17 @@ type SharedState = Arc<Mutex<AppState>>; async fn main() -> std::io::Result<()> { simple_logger::SimpleLogger::new().env().init().unwrap(); - let mut conn = SqliteConnection::connect("sqlite:cats-radio-node.db").await.unwrap(); - sqlx::migrate!() - .run(&mut conn) - .await - .expect("could not run SQLx migrations"); - let conf = config::Config::load().expect("Could not load config"); let (radio_rx_queue, mut packet_receive) = mpsc::channel(16); let (packet_send, mut radio_tx_queue) = mpsc::channel::<Vec<u8>>(16); + let shared_state = Arc::new(Mutex::new(AppState { + conf : conf.clone(), + db : db::Database::new().await, + transmit_queue: packet_send.clone(), + })); + if conf.freq == 0 { warn!("Frequency {0} is zero, disabling radio. Fake receiver udp 127.0.0.1:9073, sending to 9074", conf.freq); let sock_r = Arc::new(UdpSocket::bind("127.0.0.1:9073").await?); @@ -74,39 +74,38 @@ async fn main() -> std::io::Result<()> { } } }); + } - tokio::task::spawn(async move { - loop { - match packet_receive - .recv() - .await - .context("Packet receive channel died") { - Ok((packet, rssi)) => { - debug!("RX RSSI {} len {}", rssi, packet.len()); - let mut buf = [0; MAX_PACKET_LEN]; - match ham_cats::packet::Packet::fully_decode(&packet, &mut buf) { - Ok(packet) => { - if let Some(ident) = packet.identification() { - debug!(" From {}-{}", ident.callsign, ident.ssid); - } - // TODO save to db + let shared_state_receive = shared_state.clone(); + tokio::task::spawn(async move { + loop { + match packet_receive + .recv() + .await + .context("Packet receive channel died") { + Ok((packet_data, rssi)) => { + debug!("RX RSSI {} len {}", rssi, packet_data.len()); + let mut buf = [0; MAX_PACKET_LEN]; + match ham_cats::packet::Packet::fully_decode(&packet_data[2..], &mut buf) { + Ok(packet) => { + if let Some(ident) = packet.identification() { + debug!(" From {}-{}", ident.callsign, ident.ssid); } - Err(e) => { - warn!("Failed to decode packet: {}", e); + + let mut db = shared_state_receive.lock().unwrap().db.clone(); + if let Err(e) = db.store_packet(&packet_data).await { + warn!("Failed to write to sqlite: {}", e); } } - }, - Err(e) => warn!("Failed to decode packet: {}", e), - } - } - }); - } - - let shared_state = Arc::new(Mutex::new(AppState { - conf, - db: Mutex::new(conn), - transmit_queue: packet_send.clone(), - })); + Err(e) => { + warn!("Failed to decode packet: {}", e); + } + } + }, + Err(e) => warn!("Failed to decode packet: {}", e), + } + } + }); let port = 3000; info!("Setting up listener on port {port}"); @@ -1,7 +1,7 @@ use anyhow::{anyhow, Context}; use std::str::FromStr; use axum::Json; -use log::info; +use log::{info, warn}; use serde::Deserialize; use askama::Template; use axum::{ @@ -19,7 +19,7 @@ use ham_cats::{ whisker::Identification, }; -use crate::config; +use crate::{config, radio::MAX_PACKET_LEN}; use crate::SharedState; pub async fn serve(port: u16, shared_state: SharedState) { @@ -51,13 +51,64 @@ struct DashboardTemplate<'a> { title: &'a str, page: ActivePage, conf: config::Config, + packets: Vec<UIPacket>, +} + +struct UIPacket { + pub received_at : i64, + + pub from_callsign : String, + pub from_ssid : u8, + + pub comment : Option<String>, } async fn dashboard(State(state): State<SharedState>) -> DashboardTemplate<'static> { + let mut db = state.lock().unwrap().db.clone(); + + let packets = match db.get_most_recent_packets(10).await { + Ok(v) => v, + Err(e) => { + warn!("Dashboard will have empty packet list: {}", e); + Vec::new() + }, + }.iter() + .filter_map(|db_packet| { + let mut buf = [0; MAX_PACKET_LEN]; + match ham_cats::packet::Packet::fully_decode(&db_packet.content[2..], &mut buf) { + Ok(p) => { + if let Some(ident) = p.identification() { + + let mut commentbuf = [0; 1024]; + let comment = match p.comment(&mut commentbuf) { + Ok(c) => Some(c.to_owned()), + Err(_) => None, + }; + + Some(UIPacket { + received_at : db_packet.received_at, + from_callsign : ident.callsign.to_string(), + from_ssid : ident.ssid, + comment + }) + } + else { + None + } + }, + Err(e) => { + warn!("Failed to decode packet {}: {}", db_packet.id, e); + None + }, + } + }) + .collect(); + DashboardTemplate { title: "Dashboard", conf: state.lock().unwrap().conf.clone(), page: ActivePage::Dashboard, + packets } } |