From 241622ff7fd612eb521dec8829153851406c5884 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Sun, 7 Jan 2024 22:22:51 +0100 Subject: Store incoming frames in DB and show most recent 10 in dashboard --- src/main.rs | 73 ++++++++++++++++++++++++++++++------------------------------- 1 file changed, 36 insertions(+), 37 deletions(-) (limited to 'src/main.rs') diff --git a/src/main.rs b/src/main.rs index 0a427e9..796dd7f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,16 +3,16 @@ use log::{debug, info, warn, error}; use std::sync::{Arc, Mutex}; use tokio::net::UdpSocket; use tokio::sync::mpsc; -use sqlx::{Connection, SqliteConnection}; use radio::{RadioManager, MAX_PACKET_LEN}; +mod db; mod radio; mod config; mod ui; struct AppState { conf : config::Config, - db : Mutex, + db : db::Database, transmit_queue : mpsc::Sender>, } @@ -22,17 +22,17 @@ type SharedState = Arc>; async fn main() -> std::io::Result<()> { simple_logger::SimpleLogger::new().env().init().unwrap(); - let mut conn = SqliteConnection::connect("sqlite:cats-radio-node.db").await.unwrap(); - sqlx::migrate!() - .run(&mut conn) - .await - .expect("could not run SQLx migrations"); - let conf = config::Config::load().expect("Could not load config"); let (radio_rx_queue, mut packet_receive) = mpsc::channel(16); let (packet_send, mut radio_tx_queue) = mpsc::channel::>(16); + let shared_state = Arc::new(Mutex::new(AppState { + conf : conf.clone(), + db : db::Database::new().await, + transmit_queue: packet_send.clone(), + })); + if conf.freq == 0 { warn!("Frequency {0} is zero, disabling radio. Fake receiver udp 127.0.0.1:9073, sending to 9074", conf.freq); let sock_r = Arc::new(UdpSocket::bind("127.0.0.1:9073").await?); @@ -74,39 +74,38 @@ async fn main() -> std::io::Result<()> { } } }); + } - tokio::task::spawn(async move { - loop { - match packet_receive - .recv() - .await - .context("Packet receive channel died") { - Ok((packet, rssi)) => { - debug!("RX RSSI {} len {}", rssi, packet.len()); - let mut buf = [0; MAX_PACKET_LEN]; - match ham_cats::packet::Packet::fully_decode(&packet, &mut buf) { - Ok(packet) => { - if let Some(ident) = packet.identification() { - debug!(" From {}-{}", ident.callsign, ident.ssid); - } - // TODO save to db + let shared_state_receive = shared_state.clone(); + tokio::task::spawn(async move { + loop { + match packet_receive + .recv() + .await + .context("Packet receive channel died") { + Ok((packet_data, rssi)) => { + debug!("RX RSSI {} len {}", rssi, packet_data.len()); + let mut buf = [0; MAX_PACKET_LEN]; + match ham_cats::packet::Packet::fully_decode(&packet_data[2..], &mut buf) { + Ok(packet) => { + if let Some(ident) = packet.identification() { + debug!(" From {}-{}", ident.callsign, ident.ssid); } - Err(e) => { - warn!("Failed to decode packet: {}", e); + + let mut db = shared_state_receive.lock().unwrap().db.clone(); + if let Err(e) = db.store_packet(&packet_data).await { + warn!("Failed to write to sqlite: {}", e); } } - }, - Err(e) => warn!("Failed to decode packet: {}", e), - } - } - }); - } - - let shared_state = Arc::new(Mutex::new(AppState { - conf, - db: Mutex::new(conn), - transmit_queue: packet_send.clone(), - })); + Err(e) => { + warn!("Failed to decode packet: {}", e); + } + } + }, + Err(e) => warn!("Failed to decode packet: {}", e), + } + } + }); let port = 3000; info!("Setting up listener on port {port}"); -- cgit v1.2.3