aboutsummaryrefslogtreecommitdiffstats
path: root/src/db.rs
blob: 68455b3193a66dbea2e9b1ad7023e9f44165d954 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
use std::time::{SystemTime, UNIX_EPOCH};
use std::io;

use log::debug;
use sqlx::{SqlitePool, sqlite::SqliteRow, Row};

/// Handle to the node's SQLite storage for received frames.
///
/// `Clone` is derived, and the frame counter is a plain `u64`, so every clone
/// carries its own copy of the counter: frames stored through one clone are
/// not reflected in the value reported by another.
#[derive(Clone)]
pub struct Database {
    /// Connection pool used for every query issued by this handle.
    pool: SqlitePool,
    /// Row count of `frames_received` read at construction time, plus one for
    /// each frame stored through this particular handle since then.
    num_frames_received: u64,
}

/// One row of the `frames_received` table.
#[derive(Debug)]
pub struct Packet {
    /// Value of the row's `id` column.
    pub id: i64,
    /// Reception time in UTC, built from the whole-second Unix timestamp held
    /// in the `received_at` column.
    pub received_at: chrono::DateTime<chrono::Utc>,
    /// Raw bytes of the `content` column.
    pub content: Vec<u8>,
}

impl sqlx::FromRow<'_, SqliteRow> for Packet {
    fn from_row(row: &SqliteRow) -> Result<Self, sqlx::Error> {
        Ok(Self {
            id: row.try_get("id")?,
            received_at: {
                let row : i64 = row.try_get("received_at")?;
                chrono::DateTime::from_timestamp(row, 0).expect("Convert timestamp to chrono")
            },
            content: row.try_get("content")?,

        })
    }
}

impl Database {
    /// Path of the SQLite database file, relative to the working directory.
    const DB_FILE: &'static str = "cats-radio-node.db";

    /// Opens (creating it if necessary) the database file, runs the embedded
    /// SQLx migrations and reads the current number of stored frames.
    ///
    /// # Panics
    ///
    /// Panics if the file cannot be created, the pool cannot connect, the
    /// migrations fail, or the initial row count cannot be read.
    pub async fn new() -> Self {
        // Ensure the database file exists. `create_new` fails with
        // `AlreadyExists` when the file is already there, which is fine.
        match std::fs::OpenOptions::new()
            .write(true)
            .create_new(true)
            .open(Self::DB_FILE)
        {
            Ok(_file) => (),
            Err(e) if e.kind() == io::ErrorKind::AlreadyExists => (),
            Err(e) => panic!("Failed to ensure DB exists: {e}"),
        }

        let pool = SqlitePool::connect(&format!("sqlite:{}", Self::DB_FILE))
            .await
            .expect("could not connect to the SQLite database");

        // Running the migrator against the pool borrows a connection only for
        // the duration of the migration instead of holding one for the rest
        // of this function.
        sqlx::migrate!()
            .run(&pool)
            .await
            .expect("could not run SQLx migrations");

        let stored_frames: i64 = sqlx::query_scalar(r#"SELECT COUNT(id) FROM frames_received"#)
            .fetch_one(&pool)
            .await
            .expect("could not count frames");

        Self {
            pool,
            num_frames_received: u64::try_from(stored_frames)
                .expect("COUNT() never yields a negative value"),
        }
    }

    /// Returns the frame counter held by this handle: the row count read in
    /// [`Database::new`] plus one per successful [`Database::store_packet`]
    /// call made through this handle.
    pub fn get_num_received_frames(&self) -> u64 {
        self.num_frames_received
    }

    /// Inserts `packet` into `frames_received`, stamped with the current
    /// system time in whole seconds since the Unix epoch, and bumps the
    /// frame counter.
    ///
    /// # Errors
    ///
    /// Returns an error if the system clock reads earlier than the Unix
    /// epoch, if the timestamp does not fit in an `i64`, or if the INSERT
    /// fails. The counter is left untouched in all of these cases.
    pub async fn store_packet(&mut self, packet: &[u8]) -> anyhow::Result<()> {
        // A mis-set clock (e.g. a board without RTC) is an environmental
        // condition, not a bug: report it instead of panicking.
        let since_epoch = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map_err(|e| anyhow::anyhow!("system clock is before the Unix epoch: {e}"))?;

        let timestamp_i64 = i64::try_from(since_epoch.as_secs())?;

        let id = sqlx::query(r#"INSERT INTO frames_received (received_at, content) VALUES ( ?1 , ?2 )"#)
            .bind(timestamp_i64)
            .bind(packet)
            .execute(&self.pool)
            .await?
            .last_insert_rowid();

        self.num_frames_received += 1;

        debug!("INSERTed row {id}");
        Ok(())
    }

    /// Returns up to `count` packets, newest first.
    ///
    /// `received_at` only has one-second resolution, so `id` is used as a
    /// tie-breaker to keep the order of packets received within the same
    /// second deterministic (later insert first).
    ///
    /// NOTE: `count` is passed straight to SQL `LIMIT`; SQLite treats a
    /// negative limit as "no upper bound".
    ///
    /// # Errors
    ///
    /// Returns an error if the query fails or a row cannot be decoded.
    pub async fn get_most_recent_packets(&mut self, count: i64) -> anyhow::Result<Vec<Packet>> {
        let packets = sqlx::query_as::<_, Packet>(r#"
               SELECT id, received_at, content
               FROM frames_received
               ORDER BY received_at DESC, id DESC
               LIMIT ?1"#)
            .bind(count)
            .fetch_all(&self.pool)
            .await?;

        Ok(packets)
    }

    /// Returns every packet whose `received_at` is strictly greater than
    /// `unix_timestamp` (seconds since the Unix epoch), oldest first.
    ///
    /// Because stored timestamps are whole seconds and the comparison is
    /// strict, packets stored during second `unix_timestamp` itself are not
    /// returned. `id` breaks ties between packets received within the same
    /// second so the order is deterministic.
    ///
    /// # Errors
    ///
    /// Returns an error if the query fails or a row cannot be decoded.
    pub async fn get_packets_since(&mut self, unix_timestamp: i64) -> anyhow::Result<Vec<Packet>> {
        let packets = sqlx::query_as::<_, Packet>(r#"
               SELECT id, received_at, content
               FROM frames_received
               WHERE received_at > ?1
               ORDER BY received_at, id"#)
            .bind(unix_timestamp)
            .fetch_all(&self.pool)
            .await?;

        Ok(packets)
    }
}