aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2024-01-07 22:22:51 +0100
committerMatthias P. Braendli <matthias.braendli@mpb.li>2024-01-07 22:22:51 +0100
commit241622ff7fd612eb521dec8829153851406c5884 (patch)
tree3d7b17caa99c7b08f01d94fad9cbdca249ff2612 /src
parentb40299538a73d25d096d8f58f1468cd7f647a3f9 (diff)
downloadcats-radio-node-241622ff7fd612eb521dec8829153851406c5884.tar.gz
cats-radio-node-241622ff7fd612eb521dec8829153851406c5884.tar.bz2
cats-radio-node-241622ff7fd612eb521dec8829153851406c5884.zip
Store incoming frames in DB and show most recent 10 in dashboard
Diffstat (limited to 'src')
-rw-r--r--src/bin/fake-radio.rs93
-rw-r--r--src/db.rs60
-rw-r--r--src/main.rs73
-rw-r--r--src/ui.rs55
4 files changed, 200 insertions, 81 deletions
diff --git a/src/bin/fake-radio.rs b/src/bin/fake-radio.rs
index fd38149..b7daf67 100644
--- a/src/bin/fake-radio.rs
+++ b/src/bin/fake-radio.rs
@@ -1,5 +1,4 @@
use anyhow::{anyhow, Context};
-use tokio::net::UdpSocket;
use ham_cats::{
buffer::Buffer,
whisker::{Arbitrary, Identification, Gps},
@@ -7,46 +6,7 @@ use ham_cats::{
const MAX_PACKET_LEN : usize = 8191;
-#[tokio::main]
-async fn main() -> std::io::Result<()> {
- eprintln!("Sending example packet");
-
- let packet = build_example_packet().await.unwrap();
- let sock = UdpSocket::bind("127.0.0.1:9074").await.unwrap();
- sock.send_to(&packet, "127.0.0.1:9073").await.unwrap();
-
- eprintln!("Receiving packets. Ctrl-C to stop");
-
- let mut data = [0; MAX_PACKET_LEN];
- while let Ok((len, _addr)) = sock.recv_from(&mut data).await {
- let mut buf = [0; MAX_PACKET_LEN];
- match ham_cats::packet::Packet::fully_decode(&data[2..len], &mut buf) {
- Ok(packet) => {
- if let Some(ident) = packet.identification() {
- eprintln!(" Ident {}-{}", ident.callsign, ident.ssid);
- }
-
- if let Some(gps) = packet.gps() {
- eprintln!(" GPS {} {}", gps.latitude(), gps.longitude());
- }
-
- let mut comment = [0; 1024];
- if let Ok(c) = packet.comment(&mut comment) {
- eprintln!(" Comment {}", c);
- }
-
- eprintln!(" With {} Arbitrary whiskers", packet.arbitrary_iter().count());
- },
- Err(e) => {
- eprintln!(" Cannot decode packet of length {} {:?}", len, e);
- }
- }
- }
-
- Ok(())
-}
-
-async fn build_example_packet() -> anyhow::Result<Vec<u8>> {
+fn build_example_packet(comment: &str) -> anyhow::Result<Vec<u8>> {
let callsign = "EX4MPLE";
let ssid = 0;
let icon = 0;
@@ -59,7 +19,7 @@ async fn build_example_packet() -> anyhow::Result<Vec<u8>> {
)
.map_err(|e| anyhow!("Could not add identification to packet: {e}"))?;
- pkt.add_comment("Debugging packet")
+ pkt.add_comment(comment)
.map_err(|e| anyhow!("Could not add comment to packet: {e}"))?;
let latitude = 46.5;
@@ -89,3 +49,52 @@ async fn build_example_packet() -> anyhow::Result<Vec<u8>> {
Ok(data.to_vec())
}
+
+fn main() -> std::io::Result<()> {
+ std::thread::spawn(receive_loop);
+
+ eprintln!("Receiving messages. Write a comment and press ENTER to send. Ctrl-C to stop");
+ let mut stdin_lines = std::io::stdin().lines();
+ let sock = std::net::UdpSocket::bind("127.0.0.1:9075").unwrap();
+
+ while let Some(Ok(line)) = stdin_lines.next() {
+ eprintln!("Sending with comment = {}", line);
+
+ let packet = build_example_packet(&line).unwrap();
+ sock.send_to(&packet, "127.0.0.1:9073").unwrap();
+ }
+
+ Ok(())
+}
+
+fn receive_loop() {
+ let sock = std::net::UdpSocket::bind("127.0.0.1:9074").unwrap();
+ let mut data = [0; MAX_PACKET_LEN];
+ while let Ok((len, _addr)) = sock.recv_from(&mut data) {
+ eprintln!("Packet of length {}", len);
+
+ let mut buf = [0; MAX_PACKET_LEN];
+ match ham_cats::packet::Packet::fully_decode(&data[2..len], &mut buf) {
+ Ok(packet) => {
+ if let Some(ident) = packet.identification() {
+ eprintln!(" Ident {}-{}", ident.callsign, ident.ssid);
+ }
+
+ if let Some(gps) = packet.gps() {
+ eprintln!(" GPS {} {}", gps.latitude(), gps.longitude());
+ }
+
+ let mut comment = [0; 1024];
+ if let Ok(c) = packet.comment(&mut comment) {
+ eprintln!(" Comment {}", c);
+ }
+
+ eprintln!(" With {} Arbitrary whiskers", packet.arbitrary_iter().count());
+ },
+ Err(e) => {
+ eprintln!(" Cannot decode {:?}", e);
+ }
+ }
+ }
+}
+
diff --git a/src/db.rs b/src/db.rs
new file mode 100644
index 0000000..26f0c32
--- /dev/null
+++ b/src/db.rs
@@ -0,0 +1,60 @@
+use std::time::{SystemTime, UNIX_EPOCH};
+
+use log::debug;
+use sqlx::SqlitePool;
+
+#[derive(Clone)]
+pub struct Database {
+ pool : SqlitePool
+}
+
+#[derive(sqlx::FromRow, Debug)]
+pub struct Packet {
+ pub id : i64,
+ pub received_at : i64,
+ pub content : Vec<u8>,
+}
+
+impl Database {
+ pub async fn new() -> Self {
+ let pool = SqlitePool::connect("sqlite:cats-radio-node.db").await.unwrap();
+ let mut conn = pool.acquire().await.unwrap();
+
+ sqlx::migrate!()
+ .run(&mut conn)
+ .await
+ .expect("could not run SQLx migrations");
+
+ Self { pool }
+ }
+
+ pub async fn store_packet(&mut self, packet: &[u8]) -> anyhow::Result<()> {
+ let timestamp = SystemTime::now()
+ .duration_since(UNIX_EPOCH)
+ .expect("Time went backwards");
+
+ let timestamp_i64 : i64 = timestamp.as_secs().try_into()?;
+
+ let id = sqlx::query(r#"INSERT INTO frames_received (received_at, content) VALUES ( ?1 , ?2 )"#)
+ .bind(timestamp_i64).bind(packet)
+ .execute(&self.pool)
+ .await?
+ .last_insert_rowid();
+
+ debug!("INSERTed row {id}");
+ Ok(())
+ }
+
+ pub async fn get_most_recent_packets(&mut self, count: i64) -> anyhow::Result<Vec<Packet>> {
+ let results = sqlx::query_as(r#"
+ SELECT id, received_at, content
+ FROM frames_received
+ ORDER BY received_at DESC
+ LIMIT ?1"#)
+ .bind(count)
+ .fetch_all(&self.pool)
+ .await?;
+
+ Ok(results)
+ }
+}
diff --git a/src/main.rs b/src/main.rs
index 0a427e9..796dd7f 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -3,16 +3,16 @@ use log::{debug, info, warn, error};
use std::sync::{Arc, Mutex};
use tokio::net::UdpSocket;
use tokio::sync::mpsc;
-use sqlx::{Connection, SqliteConnection};
use radio::{RadioManager, MAX_PACKET_LEN};
+mod db;
mod radio;
mod config;
mod ui;
struct AppState {
conf : config::Config,
- db : Mutex<SqliteConnection>,
+ db : db::Database,
transmit_queue : mpsc::Sender<Vec<u8>>,
}
@@ -22,17 +22,17 @@ type SharedState = Arc<Mutex<AppState>>;
async fn main() -> std::io::Result<()> {
simple_logger::SimpleLogger::new().env().init().unwrap();
- let mut conn = SqliteConnection::connect("sqlite:cats-radio-node.db").await.unwrap();
- sqlx::migrate!()
- .run(&mut conn)
- .await
- .expect("could not run SQLx migrations");
-
let conf = config::Config::load().expect("Could not load config");
let (radio_rx_queue, mut packet_receive) = mpsc::channel(16);
let (packet_send, mut radio_tx_queue) = mpsc::channel::<Vec<u8>>(16);
+ let shared_state = Arc::new(Mutex::new(AppState {
+ conf : conf.clone(),
+ db : db::Database::new().await,
+ transmit_queue: packet_send.clone(),
+ }));
+
if conf.freq == 0 {
warn!("Frequency {0} is zero, disabling radio. Fake receiver udp 127.0.0.1:9073, sending to 9074", conf.freq);
let sock_r = Arc::new(UdpSocket::bind("127.0.0.1:9073").await?);
@@ -74,39 +74,38 @@ async fn main() -> std::io::Result<()> {
}
}
});
+ }
- tokio::task::spawn(async move {
- loop {
- match packet_receive
- .recv()
- .await
- .context("Packet receive channel died") {
- Ok((packet, rssi)) => {
- debug!("RX RSSI {} len {}", rssi, packet.len());
- let mut buf = [0; MAX_PACKET_LEN];
- match ham_cats::packet::Packet::fully_decode(&packet, &mut buf) {
- Ok(packet) => {
- if let Some(ident) = packet.identification() {
- debug!(" From {}-{}", ident.callsign, ident.ssid);
- }
- // TODO save to db
+ let shared_state_receive = shared_state.clone();
+ tokio::task::spawn(async move {
+ loop {
+ match packet_receive
+ .recv()
+ .await
+ .context("Packet receive channel died") {
+ Ok((packet_data, rssi)) => {
+ debug!("RX RSSI {} len {}", rssi, packet_data.len());
+ let mut buf = [0; MAX_PACKET_LEN];
+ match ham_cats::packet::Packet::fully_decode(&packet_data[2..], &mut buf) {
+ Ok(packet) => {
+ if let Some(ident) = packet.identification() {
+ debug!(" From {}-{}", ident.callsign, ident.ssid);
}
- Err(e) => {
- warn!("Failed to decode packet: {}", e);
+
+ let mut db = shared_state_receive.lock().unwrap().db.clone();
+ if let Err(e) = db.store_packet(&packet_data).await {
+ warn!("Failed to write to sqlite: {}", e);
}
}
- },
- Err(e) => warn!("Failed to decode packet: {}", e),
- }
- }
- });
- }
-
- let shared_state = Arc::new(Mutex::new(AppState {
- conf,
- db: Mutex::new(conn),
- transmit_queue: packet_send.clone(),
- }));
+ Err(e) => {
+ warn!("Failed to decode packet: {}", e);
+ }
+ }
+ },
+ Err(e) => warn!("Failed to decode packet: {}", e),
+ }
+ }
+ });
let port = 3000;
info!("Setting up listener on port {port}");
diff --git a/src/ui.rs b/src/ui.rs
index aedf56c..3ce2ab8 100644
--- a/src/ui.rs
+++ b/src/ui.rs
@@ -1,7 +1,7 @@
use anyhow::{anyhow, Context};
use std::str::FromStr;
use axum::Json;
-use log::info;
+use log::{info, warn};
use serde::Deserialize;
use askama::Template;
use axum::{
@@ -19,7 +19,7 @@ use ham_cats::{
whisker::Identification,
};
-use crate::config;
+use crate::{config, radio::MAX_PACKET_LEN};
use crate::SharedState;
pub async fn serve(port: u16, shared_state: SharedState) {
@@ -51,13 +51,64 @@ struct DashboardTemplate<'a> {
title: &'a str,
page: ActivePage,
conf: config::Config,
+ packets: Vec<UIPacket>,
+}
+
+struct UIPacket {
+ pub received_at : i64,
+
+ pub from_callsign : String,
+ pub from_ssid : u8,
+
+ pub comment : Option<String>,
}
async fn dashboard(State(state): State<SharedState>) -> DashboardTemplate<'static> {
+ let mut db = state.lock().unwrap().db.clone();
+
+ let packets = match db.get_most_recent_packets(10).await {
+ Ok(v) => v,
+ Err(e) => {
+ warn!("Dashboard will have empty packet list: {}", e);
+ Vec::new()
+ },
+ }.iter()
+ .filter_map(|db_packet| {
+ let mut buf = [0; MAX_PACKET_LEN];
+ match ham_cats::packet::Packet::fully_decode(&db_packet.content[2..], &mut buf) {
+ Ok(p) => {
+ if let Some(ident) = p.identification() {
+
+ let mut commentbuf = [0; 1024];
+ let comment = match p.comment(&mut commentbuf) {
+ Ok(c) => Some(c.to_owned()),
+ Err(_) => None,
+ };
+
+ Some(UIPacket {
+ received_at : db_packet.received_at,
+ from_callsign : ident.callsign.to_string(),
+ from_ssid : ident.ssid,
+ comment
+ })
+ }
+ else {
+ None
+ }
+ },
+ Err(e) => {
+ warn!("Failed to decode packet {}: {}", db_packet.id, e);
+ None
+ },
+ }
+ })
+ .collect();
+
DashboardTemplate {
title: "Dashboard",
conf: state.lock().unwrap().conf.clone(),
page: ActivePage::Dashboard,
+ packets
}
}