aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/dabmux.rs156
-rw-r--r--src/ui.rs32
2 files changed, 133 insertions, 55 deletions
diff --git a/src/dabmux.rs b/src/dabmux.rs
index dbf6ea9..af81b68 100644
--- a/src/dabmux.rs
+++ b/src/dabmux.rs
@@ -1,11 +1,16 @@
+use std::collections::HashMap;
+
use anyhow::anyhow;
+use serde::Deserialize;
use serde_json::Value;
+use log::info;
const ZMQ_TIMEOUT : i64 = 2000;
pub struct DabMux {
ctx : zmq::Context,
rc_endpoint : String,
+ stats_endpoint : String,
}
pub struct Param {
@@ -20,7 +25,8 @@ impl DabMux {
let ctx = zmq::Context::new();
Self {
ctx,
- rc_endpoint : "tcp://127.0.0.1:12722".to_owned()
+ rc_endpoint : "tcp://127.0.0.1:12722".to_owned(),
+ stats_endpoint : "tcp://127.0.0.1:12720".to_owned(),
}
}
@@ -67,67 +73,131 @@ impl DabMux {
Ok(all_params)
}
- pub fn get_rc_parameters(&mut self) -> anyhow::Result<Vec<Param>> {
- let sock = self.ctx.socket(zmq::REQ)?;
- sock.connect(&self.rc_endpoint)?;
- sock.send("showjson", 0)?;
-
- let mut msg = zmq::Message::new();
- let mut items = [
- sock.as_poll_item(zmq::POLLIN),
- ];
+ fn poll_multipart(sock: &zmq::Socket) -> anyhow::Result<Vec<String>> {
+ let mut items = [ sock.as_poll_item(zmq::POLLIN), ];
zmq::poll(&mut items, ZMQ_TIMEOUT).unwrap();
if items[0].is_readable() {
- sock.recv(&mut msg, 0)?;
- let msg = msg.as_str().ok_or(anyhow!("RC response is not a str"))?;
+ let mut parts = Vec::new();
+ for part in sock.recv_multipart(0)? {
+ let p = String::from_utf8(part)?;
+ parts.push(p);
+ }
+ Ok(parts)
+ }
+ else {
+ Err(anyhow!("ZMQ timeout"))
+ }
+ }
- // JSON structure:
- // { "module1": { "param1": "value", "param2": "value" }, "module2": { ... } }
- let v: Value = serde_json::from_str(msg)?;
- Self::value_to_params(v)
+ fn poll_message(sock: &zmq::Socket) -> anyhow::Result<String> {
+ let parts = Self::poll_multipart(&sock)?;
+ if parts.len() == 1 {
+ Ok(parts[0].clone())
}
else {
- Err(anyhow!("Timeout reading RC"))
+ info!("multipart returned: {}", parts.join(","));
+ return Err(anyhow!("unexpected multipart answer"));
}
}
- pub fn set_rc_parameter(&mut self, module: &str, param: &str, value: &str) -> anyhow::Result<Value> {
+ pub fn get_rc_parameters(&mut self) -> anyhow::Result<Vec<Param>> {
let sock = self.ctx.socket(zmq::REQ)?;
sock.connect(&self.rc_endpoint)?;
- sock.send_multipart(["set", module, param, value], 0)?;
+ sock.send("showjson", 0)?;
- let mut items = [
- sock.as_poll_item(zmq::POLLIN),
- ];
- zmq::poll(&mut items, ZMQ_TIMEOUT).unwrap();
- if items[0].is_readable() {
- let mut parts = sock.recv_multipart(0)?;
+ let msg = Self::poll_message(&sock)?;
+
+ // JSON structure:
+ // { "module1": { "param1": "value", "param2": "value" }, "module2": { ... } }
+ let v: Value = serde_json::from_str(&msg)?;
+ Self::value_to_params(v)
+ }
- let j : serde_json::Value = parts.drain(..)
- .map(|p| match String::from_utf8(p)
- {
- Ok(s) => s,
- Err(_) => "???".to_owned(),
- })
- .collect();
+ pub fn set_rc_parameter(&mut self, module: &str, param: &str, value: &str) -> anyhow::Result<()> {
+ let sock = self.ctx.socket(zmq::REQ)?;
+ sock.connect(&self.rc_endpoint)?;
+ sock.send_multipart(["set", module, param, value], 0)?;
- let j_arr = j.as_array().unwrap();
+ let resp = Self::poll_multipart(&sock)?;
- //eprintln!("SET_RC: {}", j);
- if j_arr[0].as_str() == Some("ok") {
- Ok(j)
+ //eprintln!("SET_RC: {}", j);
+ if resp.len() > 0 && resp[0] == "ok" {
+ Ok(())
+ }
+ else {
+ if resp.len() > 1 && resp[0] == "fail" {
+ Err(anyhow!(format!("Failed to set RC: {}", resp[1])))
}
else {
- if j_arr.len() > 0 && j_arr[1].is_string() {
- Err(anyhow!(format!("Failed to set RC: {}", j_arr[1].as_str().unwrap())))
- }
- else {
- Err(anyhow!("Failed to set RC: unknown error"))
- }
+ Err(anyhow!("Failed to set RC: unknown error"))
+ }
+ }
+ }
+
+ pub fn get_stats(&mut self) -> anyhow::Result<Stats> {
+ let sock = self.ctx.socket(zmq::REQ)?;
+ sock.connect(&self.stats_endpoint)?;
+ sock.send("info", 0)?;
+
+ let info_json : Value = serde_json::from_str(&Self::poll_message(&sock)?)?;
+
+ if let Some(service) = info_json.get("service")
+ .and_then(|v| v.as_str())
+ {
+ if !service.starts_with("ODR-DabMux") {
+ info!("stats info service is {}", service);
+ return Err(anyhow!("Wrong service in stats"));
+ }
+
+ let version = info_json.get("version")
+ .and_then(|v| v.as_str())
+ .or(Some("UNKNOWN"))
+ .unwrap()
+ .to_owned();
+
+ sock.send("values", 0)?;
+ let values_json : Value = serde_json::from_str(&Self::poll_message(&sock)?)?;
+ match values_json.get("values")
+ .and_then(|v| v.as_object()) {
+ Some(v) => {
+ let mut input_stats : HashMap<String, InputStat> = HashMap::new();
+
+ for (k, v) in v {
+ let is = v.get("inputstat")
+ .ok_or(anyhow!("inputstat missing"))?;
+ let stat : InputStat = serde_json::from_value(is.clone())?;
+ input_stats.insert(k.clone(), stat);
+ }
+
+ Ok(Stats { version, input_stats })
+ },
+ None => Err(anyhow!("values isn't an object")),
}
}
else {
- Err(anyhow!("Timeout reading RC"))
+ Err(anyhow!("Missing service in stats response"))
}
}
}
+
+#[derive(Debug)]
+pub struct Stats {
+ pub version : String,
+ pub input_stats : HashMap<String, InputStat>,
+}
+
+#[derive(Debug, Deserialize)]
+pub struct InputStat {
+ pub max_fill : u32,
+ pub min_fill : u32,
+ pub num_underruns : u64,
+ pub num_overruns : u64,
+ pub peak_left : i32,
+ pub peak_right : i32,
+ pub peak_left_slow : i32,
+ pub peak_right_slow : i32,
+ pub state : Option<String>,
+ pub version : Option<String>,
+ pub uptime : Option<u64>,
+ pub last_tist_offset : i32,
+}
diff --git a/src/ui.rs b/src/ui.rs
index 4a73ba4..99a96a5 100644
--- a/src/ui.rs
+++ b/src/ui.rs
@@ -7,6 +7,7 @@ use axum::{
http::StatusCode,
routing::{get, post},
};
+use log::info;
use serde::Deserialize;
use tower_http::services::ServeDir;
@@ -52,26 +53,31 @@ struct DashboardTemplate<'a> {
title: &'a str,
page: ActivePage,
conf: config::Config,
- errors: Option<String>,
params: Vec<crate::dabmux::Param>,
+ params_errors: Option<String>,
+ stats: Option<crate::dabmux::Stats>,
+ stats_errors: Option<String>,
}
async fn dashboard(State(state): State<SharedState>) -> DashboardTemplate<'static> {
- let (conf, params_result) = {
+ let (conf, params_result, stats_result) = {
let mut st = state.lock().unwrap();
let params_result = st.dabmux.get_rc_parameters();
+ let stats_result = st.dabmux.get_stats();
+ info!("STATS: {:?}", stats_result);
- (st.conf.clone(), params_result)
+ (st.conf.clone(), params_result, stats_result)
};
- let (params, errors) = match params_result {
- Ok(v) => {
- (v, None)
- },
- Err(e) => {
- (Vec::new(), Some(format!("{}", e)))
- },
+ let (params, params_errors) = match params_result {
+ Ok(v) => (v, None),
+ Err(e) => (Vec::new(), Some(format!("{}", e))),
+ };
+
+ let (stats, stats_errors) = match stats_result {
+ Ok(v) => (Some(v), None),
+ Err(e) => (None, Some(format!("{}", e))),
};
DashboardTemplate {
@@ -79,7 +85,9 @@ async fn dashboard(State(state): State<SharedState>) -> DashboardTemplate<'stati
conf,
page: ActivePage::Dashboard,
params,
- errors,
+ params_errors,
+ stats,
+ stats_errors,
}
}
@@ -100,7 +108,7 @@ async fn post_rc(
};
match set_rc_result {
- Ok(v) => (StatusCode::OK, v.as_str().or(Some("")).unwrap().to_owned()),
+ Ok(()) => (StatusCode::OK, "".to_owned()),
Err(e) => (StatusCode::BAD_REQUEST, e.to_string()),
}
}