aboutsummaryrefslogtreecommitdiffstats
path: root/src/main.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/main.rs')
-rw-r--r--src/main.rs73
1 files changed, 36 insertions, 37 deletions
diff --git a/src/main.rs b/src/main.rs
index 0a427e9..796dd7f 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -3,16 +3,16 @@ use log::{debug, info, warn, error};
use std::sync::{Arc, Mutex};
use tokio::net::UdpSocket;
use tokio::sync::mpsc;
-use sqlx::{Connection, SqliteConnection};
use radio::{RadioManager, MAX_PACKET_LEN};
+mod db;
mod radio;
mod config;
mod ui;
struct AppState {
conf : config::Config,
- db : Mutex<SqliteConnection>,
+ db : db::Database,
transmit_queue : mpsc::Sender<Vec<u8>>,
}
@@ -22,17 +22,17 @@ type SharedState = Arc<Mutex<AppState>>;
async fn main() -> std::io::Result<()> {
simple_logger::SimpleLogger::new().env().init().unwrap();
- let mut conn = SqliteConnection::connect("sqlite:cats-radio-node.db").await.unwrap();
- sqlx::migrate!()
- .run(&mut conn)
- .await
- .expect("could not run SQLx migrations");
-
let conf = config::Config::load().expect("Could not load config");
let (radio_rx_queue, mut packet_receive) = mpsc::channel(16);
let (packet_send, mut radio_tx_queue) = mpsc::channel::<Vec<u8>>(16);
+ let shared_state = Arc::new(Mutex::new(AppState {
+ conf : conf.clone(),
+ db : db::Database::new().await,
+ transmit_queue: packet_send.clone(),
+ }));
+
if conf.freq == 0 {
warn!("Frequency {0} is zero, disabling radio. Fake receiver udp 127.0.0.1:9073, sending to 9074", conf.freq);
let sock_r = Arc::new(UdpSocket::bind("127.0.0.1:9073").await?);
@@ -74,39 +74,38 @@ async fn main() -> std::io::Result<()> {
}
}
});
+ }
- tokio::task::spawn(async move {
- loop {
- match packet_receive
- .recv()
- .await
- .context("Packet receive channel died") {
- Ok((packet, rssi)) => {
- debug!("RX RSSI {} len {}", rssi, packet.len());
- let mut buf = [0; MAX_PACKET_LEN];
- match ham_cats::packet::Packet::fully_decode(&packet, &mut buf) {
- Ok(packet) => {
- if let Some(ident) = packet.identification() {
- debug!(" From {}-{}", ident.callsign, ident.ssid);
- }
- // TODO save to db
+ let shared_state_receive = shared_state.clone();
+ tokio::task::spawn(async move {
+ loop {
+ match packet_receive
+ .recv()
+ .await
+ .context("Packet receive channel died") {
+ Ok((packet_data, rssi)) => {
+ debug!("RX RSSI {} len {}", rssi, packet_data.len());
+ let mut buf = [0; MAX_PACKET_LEN];
+ match ham_cats::packet::Packet::fully_decode(&packet_data[2..], &mut buf) {
+ Ok(packet) => {
+ if let Some(ident) = packet.identification() {
+ debug!(" From {}-{}", ident.callsign, ident.ssid);
}
- Err(e) => {
- warn!("Failed to decode packet: {}", e);
+
+ let mut db = shared_state_receive.lock().unwrap().db.clone();
+ if let Err(e) = db.store_packet(&packet_data).await {
+ warn!("Failed to write to sqlite: {}", e);
}
}
- },
- Err(e) => warn!("Failed to decode packet: {}", e),
- }
- }
- });
- }
-
- let shared_state = Arc::new(Mutex::new(AppState {
- conf,
- db: Mutex::new(conn),
- transmit_queue: packet_send.clone(),
- }));
+ Err(e) => {
+ warn!("Failed to decode packet: {}", e);
+ }
+ }
+ },
+ Err(e) => warn!("Failed to decode packet: {}", e),
+ }
+ }
+ });
let port = 3000;
info!("Setting up listener on port {port}");