aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2024-01-03 21:57:27 +0100
committerMatthias P. Braendli <matthias.braendli@mpb.li>2024-01-03 21:57:27 +0100
commitc2742cde3d034b2af9bbcee90765338ee094e6cc (patch)
treec35399d5d52725efbf3ebaaee83752574bf2ba1c /src
parent43201470d4092f54c176dd5ebd2d7b0bbf15192e (diff)
downloadcats-radio-node-c2742cde3d034b2af9bbcee90765338ee094e6cc.tar.gz
cats-radio-node-c2742cde3d034b2af9bbcee90765338ee094e6cc.tar.bz2
cats-radio-node-c2742cde3d034b2af9bbcee90765338ee094e6cc.zip
Add src/radio and freq config
Diffstat (limited to 'src')
-rw-r--r--src/config.rs2
-rw-r--r--src/main.rs64
-rw-r--r--src/radio.rs161
-rw-r--r--src/ui.rs24
4 files changed, 227 insertions, 24 deletions
diff --git a/src/config.rs b/src/config.rs
index d54e01a..7700b45 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -69,6 +69,7 @@ impl Default for BeaconConfig {
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Config {
+ pub freq: u32, // kHz
pub callsign: String,
pub ssid: u8,
#[serde(default)]
@@ -81,6 +82,7 @@ pub struct Config {
impl Default for Config {
fn default() -> Self {
Config {
+ freq: 430500,
callsign: "CHANGEME".to_owned(),
ssid: 0,
icon: 0,
diff --git a/src/main.rs b/src/main.rs
index 20fedf6..9a4ae2b 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,6 +1,11 @@
+use anyhow::Context;
+use log::{debug, info, warn, error};
use std::sync::{Arc, Mutex};
+use tokio::sync::mpsc;
use sqlx::{Connection, SqliteConnection};
+use radio::{RadioManager, MAX_PACKET_LEN};
+mod radio;
mod config;
mod ui;
@@ -13,8 +18,7 @@ type SharedState = Arc<Mutex<AppState>>;
#[tokio::main]
async fn main() -> std::io::Result<()> {
-
- // simple_logger::
+ simple_logger::SimpleLogger::new().env().init().unwrap();
let mut conn = SqliteConnection::connect("sqlite:cats-radio-node.db").await.unwrap();
sqlx::migrate!()
@@ -24,12 +28,66 @@ async fn main() -> std::io::Result<()> {
let conf = config::Config::load().expect("Could not load config");
+ if conf.freq == 0 {
+ warn!("Frequency {0} is zero, disabling radio", conf.freq);
+ }
+ else if !(430000..=436380).contains(&conf.freq) {
+ error!("Frequency {} kHz out of range (430MHz - 436.375MHz), skipping radio setup", conf.freq);
+ }
+ else {
+ info!("Setting up radio");
+ let (packet_tx, mut packet_receive) = mpsc::channel(16);
+ let (packet_send, packet_rx) = mpsc::channel(16);
+ let mut radio = RadioManager::new(packet_tx, packet_rx).expect("Could not initialize radio");
+
+ let channel = ((conf.freq - 430000) / 25) as u8;
+ radio.set_channel(channel);
+ let actual_freq = 430000 + 25 * channel as u32;
+ info!("Setting up radio on {actual_freq} kHz...");
+
+ tokio::task::spawn(async move {
+ loop {
+ if let Err(e) = radio.process_forever().await {
+ error!("Radio error: {e}")
+ }
+ }
+ });
+
+ tokio::task::spawn(async move {
+ loop {
+ match packet_receive
+ .recv()
+ .await
+ .context("Packet receive channel died") {
+ Ok((packet, rssi)) => {
+ debug!("RX RSSI {} len {}", rssi, packet.len());
+ let mut buf = [0; MAX_PACKET_LEN];
+ match ham_cats::packet::Packet::fully_decode(&packet, &mut buf) {
+ Ok(packet) => {
+ if let Some(ident) = packet.identification() {
+ debug!(" From {}-{}", ident.callsign, ident.ssid);
+ }
+ // TODO save to db
+ }
+ Err(e) => {
+ warn!("Failed to decode packet: {}", e);
+ }
+ }
+ },
+ Err(e) => warn!("Failed to decode packet: {}", e),
+ }
+ }
+ });
+ }
+
let shared_state = Arc::new(Mutex::new(AppState {
conf,
db: Mutex::new(conn)
}));
- ui::serve(3000, shared_state).await;
+ let port = 3000;
+ info!("Setting up listener on port {port}");
+ ui::serve(port, shared_state).await;
Ok(())
}
diff --git a/src/radio.rs b/src/radio.rs
new file mode 100644
index 0000000..77f7c0f
--- /dev/null
+++ b/src/radio.rs
@@ -0,0 +1,161 @@
+use anyhow::{anyhow, bail, Context};
+use rand::{thread_rng, Rng};
+use rf4463::{config::RADIO_CONFIG_CATS, Rf4463};
+use rppal::{
+ gpio::{Gpio, OutputPin},
+ hal::Delay,
+ spi::{Bus, Mode, SlaveSelect, Spi},
+};
+use std::{
+ sync::Arc,
+ time::{Duration, Instant},
+};
+use tokio::sync::{
+ mpsc::{error::TryRecvError, Receiver, Sender},
+ Mutex,
+};
+
+pub const MAX_PACKET_LEN: usize = 8191;
+
+pub struct RadioManager {
+ radio: Rf4463<Spi, OutputPin, OutputPin, Delay>,
+
+ tx: Sender<(Vec<u8>, f64)>,
+ rx: Receiver<Vec<u8>>,
+ rx_buf: [u8; MAX_PACKET_LEN],
+ temperature: Arc<Mutex<f32>>,
+}
+
+impl RadioManager {
+ pub fn new(tx: Sender<(Vec<u8>, f64)>, rx: Receiver<Vec<u8>>) -> anyhow::Result<Self> {
+ let spi = Spi::new(Bus::Spi0, SlaveSelect::Ss0, 1_000_000, Mode::Mode0)?;
+ let gpio = Gpio::new()?;
+ let sdn = gpio.get(22)?.into_output();
+ let cs = gpio.get(24)?.into_output();
+
+ let delay = Delay::new();
+
+ let mut radio = Rf4463::new(spi, sdn, cs, delay, &mut RADIO_CONFIG_CATS.clone())
+ .map_err(|e| anyhow!("{e:?}"))?;
+ radio.set_channel(20);
+
+ let rx_buf = [0; MAX_PACKET_LEN];
+ let temperature = Arc::new(Mutex::new(radio.get_temp()?));
+
+ Ok(Self {
+ radio,
+ tx,
+ rx,
+ rx_buf,
+ temperature,
+ })
+ }
+
+ pub fn set_channel(&mut self, channel: u8) {
+ self.radio.set_channel(channel);
+ }
+
+ pub fn temperature_mutex(&self) -> Arc<Mutex<f32>> {
+ self.temperature.clone()
+ }
+
+ pub async fn process_forever(&mut self) -> anyhow::Result<()> {
+ loop {
+ self.tick().await?;
+
+ *self.temperature.lock().await = self.radio.get_temp()?;
+
+ match self.rx.try_recv() {
+ Ok(pkt) => {
+ self.tx(&pkt).await?;
+ }
+ Err(TryRecvError::Empty) => {}
+ Err(TryRecvError::Disconnected) => {
+ bail!("RX channel disconnected")
+ }
+ }
+
+ tokio::time::sleep(Duration::from_millis(25)).await;
+ }
+ }
+
+ async fn tick(&mut self) -> anyhow::Result<()> {
+ if self.radio.is_idle() {
+ self.radio
+ .start_rx(None, false)
+ .map_err(|e| anyhow!("{e}"))?;
+
+ tokio::time::sleep(Duration::from_millis(25)).await;
+ }
+
+ self.radio
+ .interrupt(Some(&mut self.rx_buf), None)
+ .map_err(|e| anyhow!("{e:?}"))?;
+
+ if let Some(data) = self
+ .radio
+ .finish_rx(&mut self.rx_buf)
+ .map_err(|e| anyhow!("{e}"))?
+ {
+ self.radio
+ .start_rx(None, false)
+ .map_err(|e| anyhow!("{e}"))?;
+
+ self.tx
+ .send((data.data().to_vec(), data.rssi()))
+ .await
+ .ok()
+ .context("TX channel died")?;
+ }
+
+ Ok(())
+ }
+
+ async fn tx(&mut self, data: &[u8]) -> anyhow::Result<()> {
+ // ensures we don't tx over a packet,
+ // and adds some random delay so that every node
+ // if offset slightly
+ self.tx_delay().await?;
+
+ self.radio.start_tx(data).map_err(|e| anyhow!("{e:?}"))?;
+
+ const TIMEOUT: Duration = Duration::from_secs(10);
+ let start_time = Instant::now();
+ while !self.radio.is_idle() {
+ self.radio
+ .interrupt(None, Some(data))
+ .map_err(|e| anyhow!("{e:?}"))?;
+
+ if start_time + TIMEOUT < Instant::now() {
+ bail!("Timeout while transmitting");
+ }
+
+ tokio::time::sleep(Duration::from_millis(25)).await;
+ }
+
+ Ok(())
+ }
+
+ async fn tx_delay(&mut self) -> anyhow::Result<()> {
+ loop {
+ let delay_ms = thread_rng().gen_range(0..50);
+
+ // since delay_ms < 100 we can safely sleep without calling tick
+ tokio::time::sleep(Duration::from_millis(delay_ms)).await;
+
+ let mut rx = false;
+
+ while self.radio.is_busy_rxing()? {
+ rx = true;
+ self.tick().await?;
+
+ tokio::time::sleep(Duration::from_millis(25)).await;
+ }
+
+ if !rx {
+ // didn't rx a packet, so we're safe to leave
+ break Ok(());
+ }
+ }
+ }
+}
diff --git a/src/ui.rs b/src/ui.rs
index aebcd4a..5a13bc0 100644
--- a/src/ui.rs
+++ b/src/ui.rs
@@ -21,27 +21,7 @@ pub async fn serve(port: u16, shared_state: SharedState) {
.route("/send", get(send))
.route("/settings", get(show_settings).post(post_settings))
.nest_service("/static", ServeDir::new("static"))
- /* requires tracing and tower, e.g.
- * tower = { version = "0.4", features = ["util", "timeout"] }
- * tower-http = { version = "0.5.0", features = ["add-extension", "trace"] }
- * tracing = "0.1"
- * tracing-subscriber = { version = "0.3", features = ["env-filter"] }
- .layer(
- ServiceBuilder::new()
- .layer(HandleErrorLayer::new(|error: BoxError| async move {
- ,if error.is::<tower::timeout::error::Elapsed>() {
- Ok(StatusCode::REQUEST_TIMEOUT)
- } else {
- Err((
- StatusCode::INTERNAL_SERVER_ERROR,
- format!("Unhandled internal error: {error}"),
- ))
- }
- }))
- .timeout(Duration::from_secs(10))
- .layer(TraceLayer::new_for_http())
- .into_inner(),
- )*/
+ /* For an example for timeouts and tracing, have a look at the git history */
.with_state(shared_state);
let listener = tokio::net::TcpListener::bind(("0.0.0.0", port)).await.unwrap();
@@ -122,6 +102,7 @@ async fn show_settings(State(state): State<SharedState>) -> SettingsTemplate<'st
#[derive(Deserialize, Debug)]
struct FormConfig {
+ freq: String,
callsign: String,
ssid: String,
icon: String,
@@ -164,6 +145,7 @@ impl TryFrom<FormConfig> for config::Config {
fn try_from(value: FormConfig) -> Result<Self, Self::Error> {
Ok(config::Config {
+ freq: value.freq.parse()?,
callsign: value.callsign,
ssid: value.ssid.parse()?,
icon: value.icon.parse()?,