diff options
author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2024-01-03 21:57:27 +0100 |
---|---|---|
committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2024-01-03 21:57:27 +0100 |
commit | c2742cde3d034b2af9bbcee90765338ee094e6cc (patch) | |
tree | c35399d5d52725efbf3ebaaee83752574bf2ba1c /src | |
parent | 43201470d4092f54c176dd5ebd2d7b0bbf15192e (diff) | |
download | cats-radio-node-c2742cde3d034b2af9bbcee90765338ee094e6cc.tar.gz cats-radio-node-c2742cde3d034b2af9bbcee90765338ee094e6cc.tar.bz2 cats-radio-node-c2742cde3d034b2af9bbcee90765338ee094e6cc.zip |
Add src/radio and freq config
Diffstat (limited to 'src')
-rw-r--r-- | src/config.rs | 2 | ||||
-rw-r--r-- | src/main.rs | 64 | ||||
-rw-r--r-- | src/radio.rs | 161 | ||||
-rw-r--r-- | src/ui.rs | 24 |
4 files changed, 227 insertions, 24 deletions
diff --git a/src/config.rs b/src/config.rs index d54e01a..7700b45 100644 --- a/src/config.rs +++ b/src/config.rs @@ -69,6 +69,7 @@ impl Default for BeaconConfig { #[derive(Debug, Serialize, Deserialize, Clone)] pub struct Config { + pub freq: u32, // kHz pub callsign: String, pub ssid: u8, #[serde(default)] @@ -81,6 +82,7 @@ pub struct Config { impl Default for Config { fn default() -> Self { Config { + freq: 430500, callsign: "CHANGEME".to_owned(), ssid: 0, icon: 0, diff --git a/src/main.rs b/src/main.rs index 20fedf6..9a4ae2b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,11 @@ +use anyhow::Context; +use log::{debug, info, warn, error}; use std::sync::{Arc, Mutex}; +use tokio::sync::mpsc; use sqlx::{Connection, SqliteConnection}; +use radio::{RadioManager, MAX_PACKET_LEN}; +mod radio; mod config; mod ui; @@ -13,8 +18,7 @@ type SharedState = Arc<Mutex<AppState>>; #[tokio::main] async fn main() -> std::io::Result<()> { - - // simple_logger:: + simple_logger::SimpleLogger::new().env().init().unwrap(); let mut conn = SqliteConnection::connect("sqlite:cats-radio-node.db").await.unwrap(); sqlx::migrate!() @@ -24,12 +28,66 @@ async fn main() -> std::io::Result<()> { let conf = config::Config::load().expect("Could not load config"); + if conf.freq == 0 { + warn!("Frequency {0} is zero, disabling radio", conf.freq); + } + else if !(430000..=436380).contains(&conf.freq) { + error!("Frequency {} kHz out of range (430MHz - 436.375MHz), skipping radio setup", conf.freq); + } + else { + info!("Setting up radio"); + let (packet_tx, mut packet_receive) = mpsc::channel(16); + let (packet_send, packet_rx) = mpsc::channel(16); + let mut radio = RadioManager::new(packet_tx, packet_rx).expect("Could not initialize radio"); + + let channel = ((conf.freq - 430000) / 25) as u8; + radio.set_channel(channel); + let actual_freq = 430000 + 25 * channel as u32; + info!("Setting up radio on {actual_freq} kHz..."); + + tokio::task::spawn(async move { + loop { + if let Err(e) = radio.process_forever().await { + error!("Radio error: {e}") + } + } + }); + + tokio::task::spawn(async move { + loop { + match packet_receive + .recv() + .await + .context("Packet receive channel died") { + Ok((packet, rssi)) => { + debug!("RX RSSI {} len {}", rssi, packet.len()); + let mut buf = [0; MAX_PACKET_LEN]; + match ham_cats::packet::Packet::fully_decode(&packet, &mut buf) { + Ok(packet) => { + if let Some(ident) = packet.identification() { + debug!(" From {}-{}", ident.callsign, ident.ssid); + } + // TODO save to db + } + Err(e) => { + warn!("Failed to decode packet: {}", e); + } + } + }, + Err(e) => warn!("Failed to decode packet: {}", e), + } + } + }); + } + let shared_state = Arc::new(Mutex::new(AppState { conf, db: Mutex::new(conn) })); - ui::serve(3000, shared_state).await; + let port = 3000; + info!("Setting up listener on port {port}"); + ui::serve(port, shared_state).await; Ok(()) } diff --git a/src/radio.rs b/src/radio.rs new file mode 100644 index 0000000..77f7c0f --- /dev/null +++ b/src/radio.rs @@ -0,0 +1,161 @@ +use anyhow::{anyhow, bail, Context}; +use rand::{thread_rng, Rng}; +use rf4463::{config::RADIO_CONFIG_CATS, Rf4463}; +use rppal::{ + gpio::{Gpio, OutputPin}, + hal::Delay, + spi::{Bus, Mode, SlaveSelect, Spi}, +}; +use std::{ + sync::Arc, + time::{Duration, Instant}, +}; +use tokio::sync::{ + mpsc::{error::TryRecvError, Receiver, Sender}, + Mutex, +}; + +pub const MAX_PACKET_LEN: usize = 8191; + +pub struct RadioManager { + radio: Rf4463<Spi, OutputPin, OutputPin, Delay>, + + tx: Sender<(Vec<u8>, f64)>, + rx: Receiver<Vec<u8>>, + rx_buf: [u8; MAX_PACKET_LEN], + temperature: Arc<Mutex<f32>>, +} + +impl RadioManager { + pub fn new(tx: Sender<(Vec<u8>, f64)>, rx: Receiver<Vec<u8>>) -> anyhow::Result<Self> { + let spi = Spi::new(Bus::Spi0, SlaveSelect::Ss0, 1_000_000, Mode::Mode0)?; + let gpio = Gpio::new()?; + let sdn = gpio.get(22)?.into_output(); + let cs = gpio.get(24)?.into_output(); + + let delay = Delay::new(); + + let mut radio = Rf4463::new(spi, sdn, cs, delay, &mut RADIO_CONFIG_CATS.clone()) + .map_err(|e| anyhow!("{e:?}"))?; + radio.set_channel(20); + + let rx_buf = [0; MAX_PACKET_LEN]; + let temperature = Arc::new(Mutex::new(radio.get_temp()?)); + + Ok(Self { + radio, + tx, + rx, + rx_buf, + temperature, + }) + } + + pub fn set_channel(&mut self, channel: u8) { + self.radio.set_channel(channel); + } + + pub fn temperature_mutex(&self) -> Arc<Mutex<f32>> { + self.temperature.clone() + } + + pub async fn process_forever(&mut self) -> anyhow::Result<()> { + loop { + self.tick().await?; + + *self.temperature.lock().await = self.radio.get_temp()?; + + match self.rx.try_recv() { + Ok(pkt) => { + self.tx(&pkt).await?; + } + Err(TryRecvError::Empty) => {} + Err(TryRecvError::Disconnected) => { + bail!("RX channel disconnected") + } + } + + tokio::time::sleep(Duration::from_millis(25)).await; + } + } + + async fn tick(&mut self) -> anyhow::Result<()> { + if self.radio.is_idle() { + self.radio + .start_rx(None, false) + .map_err(|e| anyhow!("{e}"))?; + + tokio::time::sleep(Duration::from_millis(25)).await; + } + + self.radio + .interrupt(Some(&mut self.rx_buf), None) + .map_err(|e| anyhow!("{e:?}"))?; + + if let Some(data) = self + .radio + .finish_rx(&mut self.rx_buf) + .map_err(|e| anyhow!("{e}"))? + { + self.radio + .start_rx(None, false) + .map_err(|e| anyhow!("{e}"))?; + + self.tx + .send((data.data().to_vec(), data.rssi())) + .await + .ok() + .context("TX channel died")?; + } + + Ok(()) + } + + async fn tx(&mut self, data: &[u8]) -> anyhow::Result<()> { + // ensures we don't tx over a packet, + // and adds some random delay so that every node + // if offset slightly + self.tx_delay().await?; + + self.radio.start_tx(data).map_err(|e| anyhow!("{e:?}"))?; + + const TIMEOUT: Duration = Duration::from_secs(10); + let start_time = Instant::now(); + while !self.radio.is_idle() { + self.radio + .interrupt(None, Some(data)) + .map_err(|e| anyhow!("{e:?}"))?; + + if start_time + TIMEOUT < Instant::now() { + bail!("Timeout while transmitting"); + } + + tokio::time::sleep(Duration::from_millis(25)).await; + } + + Ok(()) + } + + async fn tx_delay(&mut self) -> anyhow::Result<()> { + loop { + let delay_ms = thread_rng().gen_range(0..50); + + // since delay_ms < 100 we can safely sleep without calling tick + tokio::time::sleep(Duration::from_millis(delay_ms)).await; + + let mut rx = false; + + while self.radio.is_busy_rxing()? { + rx = true; + self.tick().await?; + + tokio::time::sleep(Duration::from_millis(25)).await; + } + + if !rx { + // didn't rx a packet, so we're safe to leave + break Ok(()); + } + } + } +} @@ -21,27 +21,7 @@ pub async fn serve(port: u16, shared_state: SharedState) { .route("/send", get(send)) .route("/settings", get(show_settings).post(post_settings)) .nest_service("/static", ServeDir::new("static")) - /* requires tracing and tower, e.g. - * tower = { version = "0.4", features = ["util", "timeout"] } - * tower-http = { version = "0.5.0", features = ["add-extension", "trace"] } - * tracing = "0.1" - * tracing-subscriber = { version = "0.3", features = ["env-filter"] } - .layer( - ServiceBuilder::new() - .layer(HandleErrorLayer::new(|error: BoxError| async move { - ,if error.is::<tower::timeout::error::Elapsed>() { - Ok(StatusCode::REQUEST_TIMEOUT) - } else { - Err(( - StatusCode::INTERNAL_SERVER_ERROR, - format!("Unhandled internal error: {error}"), - )) - } - })) - .timeout(Duration::from_secs(10)) - .layer(TraceLayer::new_for_http()) - .into_inner(), - )*/ + /* For an example for timeouts and tracing, have a look at the git history */ .with_state(shared_state); let listener = tokio::net::TcpListener::bind(("0.0.0.0", port)).await.unwrap(); @@ -122,6 +102,7 @@ async fn show_settings(State(state): State<SharedState>) -> SettingsTemplate<'st #[derive(Deserialize, Debug)] struct FormConfig { + freq: String, callsign: String, ssid: String, icon: String, @@ -164,6 +145,7 @@ impl TryFrom<FormConfig> for config::Config { fn try_from(value: FormConfig) -> Result<Self, Self::Error> { Ok(config::Config { + freq: value.freq.parse()?, callsign: value.callsign, ssid: value.ssid.parse()?, icon: value.icon.parse()?, |