From eadac6465d83ec6e26a45fd90d96ec5e081ce7b2 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Fri, 20 Sep 2024 22:45:23 +0200 Subject: Get stats working --- src/dabmux.rs | 156 ++++++++++++++++++++++++++++++++++++++++++---------------- src/ui.rs | 32 +++++++----- 2 files changed, 133 insertions(+), 55 deletions(-) (limited to 'src') diff --git a/src/dabmux.rs b/src/dabmux.rs index dbf6ea9..af81b68 100644 --- a/src/dabmux.rs +++ b/src/dabmux.rs @@ -1,11 +1,16 @@ +use std::collections::HashMap; + use anyhow::anyhow; +use serde::Deserialize; use serde_json::Value; +use log::info; const ZMQ_TIMEOUT : i64 = 2000; pub struct DabMux { ctx : zmq::Context, rc_endpoint : String, + stats_endpoint : String, } pub struct Param { @@ -20,7 +25,8 @@ impl DabMux { let ctx = zmq::Context::new(); Self { ctx, - rc_endpoint : "tcp://127.0.0.1:12722".to_owned() + rc_endpoint : "tcp://127.0.0.1:12722".to_owned(), + stats_endpoint : "tcp://127.0.0.1:12720".to_owned(), } } @@ -67,67 +73,131 @@ impl DabMux { Ok(all_params) } - pub fn get_rc_parameters(&mut self) -> anyhow::Result> { - let sock = self.ctx.socket(zmq::REQ)?; - sock.connect(&self.rc_endpoint)?; - sock.send("showjson", 0)?; - - let mut msg = zmq::Message::new(); - let mut items = [ - sock.as_poll_item(zmq::POLLIN), - ]; + fn poll_multipart(sock: &zmq::Socket) -> anyhow::Result> { + let mut items = [ sock.as_poll_item(zmq::POLLIN), ]; zmq::poll(&mut items, ZMQ_TIMEOUT).unwrap(); if items[0].is_readable() { - sock.recv(&mut msg, 0)?; - let msg = msg.as_str().ok_or(anyhow!("RC response is not a str"))?; + let mut parts = Vec::new(); + for part in sock.recv_multipart(0)? { + let p = String::from_utf8(part)?; + parts.push(p); + } + Ok(parts) + } + else { + Err(anyhow!("ZMQ timeout")) + } + } - // JSON structure: - // { "module1": { "param1": "value", "param2": "value" }, "module2": { ... } } - let v: Value = serde_json::from_str(msg)?; - Self::value_to_params(v) + fn poll_message(sock: &zmq::Socket) -> anyhow::Result { + let parts = Self::poll_multipart(&sock)?; + if parts.len() == 1 { + Ok(parts[0].clone()) } else { - Err(anyhow!("Timeout reading RC")) + info!("multipart returned: {}", parts.join(",")); + return Err(anyhow!("unexpected multipart answer")); } } - pub fn set_rc_parameter(&mut self, module: &str, param: &str, value: &str) -> anyhow::Result { + pub fn get_rc_parameters(&mut self) -> anyhow::Result> { let sock = self.ctx.socket(zmq::REQ)?; sock.connect(&self.rc_endpoint)?; - sock.send_multipart(["set", module, param, value], 0)?; + sock.send("showjson", 0)?; - let mut items = [ - sock.as_poll_item(zmq::POLLIN), - ]; - zmq::poll(&mut items, ZMQ_TIMEOUT).unwrap(); - if items[0].is_readable() { - let mut parts = sock.recv_multipart(0)?; + let msg = Self::poll_message(&sock)?; + + // JSON structure: + // { "module1": { "param1": "value", "param2": "value" }, "module2": { ... } } + let v: Value = serde_json::from_str(&msg)?; + Self::value_to_params(v) + } - let j : serde_json::Value = parts.drain(..) - .map(|p| match String::from_utf8(p) - { - Ok(s) => s, - Err(_) => "???".to_owned(), - }) - .collect(); + pub fn set_rc_parameter(&mut self, module: &str, param: &str, value: &str) -> anyhow::Result<()> { + let sock = self.ctx.socket(zmq::REQ)?; + sock.connect(&self.rc_endpoint)?; + sock.send_multipart(["set", module, param, value], 0)?; - let j_arr = j.as_array().unwrap(); + let resp = Self::poll_multipart(&sock)?; - //eprintln!("SET_RC: {}", j); - if j_arr[0].as_str() == Some("ok") { - Ok(j) + //eprintln!("SET_RC: {}", j); + if resp.len() > 0 && resp[0] == "ok" { + Ok(()) + } + else { + if resp.len() > 1 && resp[0] == "fail" { + Err(anyhow!(format!("Failed to set RC: {}", resp[1]))) } else { - if j_arr.len() > 0 && j_arr[1].is_string() { - Err(anyhow!(format!("Failed to set RC: {}", j_arr[1].as_str().unwrap()))) - } - else { - Err(anyhow!("Failed to set RC: unknown error")) - } + Err(anyhow!("Failed to set RC: unknown error")) + } + } + } + + pub fn get_stats(&mut self) -> anyhow::Result { + let sock = self.ctx.socket(zmq::REQ)?; + sock.connect(&self.stats_endpoint)?; + sock.send("info", 0)?; + + let info_json : Value = serde_json::from_str(&Self::poll_message(&sock)?)?; + + if let Some(service) = info_json.get("service") + .and_then(|v| v.as_str()) + { + if !service.starts_with("ODR-DabMux") { + info!("stats info service is {}", service); + return Err(anyhow!("Wrong service in stats")); + } + + let version = info_json.get("version") + .and_then(|v| v.as_str()) + .or(Some("UNKNOWN")) + .unwrap() + .to_owned(); + + sock.send("values", 0)?; + let values_json : Value = serde_json::from_str(&Self::poll_message(&sock)?)?; + match values_json.get("values") + .and_then(|v| v.as_object()) { + Some(v) => { + let mut input_stats : HashMap = HashMap::new(); + + for (k, v) in v { + let is = v.get("inputstat") + .ok_or(anyhow!("inputstat missing"))?; + let stat : InputStat = serde_json::from_value(is.clone())?; + input_stats.insert(k.clone(), stat); + } + + Ok(Stats { version, input_stats }) + }, + None => Err(anyhow!("values isn't an object")), } } else { - Err(anyhow!("Timeout reading RC")) + Err(anyhow!("Missing service in stats response")) } } } + +#[derive(Debug)] +pub struct Stats { + pub version : String, + pub input_stats : HashMap, +} + +#[derive(Debug, Deserialize)] +pub struct InputStat { + pub max_fill : u32, + pub min_fill : u32, + pub num_underruns : u64, + pub num_overruns : u64, + pub peak_left : i32, + pub peak_right : i32, + pub peak_left_slow : i32, + pub peak_right_slow : i32, + pub state : Option, + pub version : Option, + pub uptime : Option, + pub last_tist_offset : i32, +} diff --git a/src/ui.rs b/src/ui.rs index 4a73ba4..99a96a5 100644 --- a/src/ui.rs +++ b/src/ui.rs @@ -7,6 +7,7 @@ use axum::{ http::StatusCode, routing::{get, post}, }; +use log::info; use serde::Deserialize; use tower_http::services::ServeDir; @@ -52,26 +53,31 @@ struct DashboardTemplate<'a> { title: &'a str, page: ActivePage, conf: config::Config, - errors: Option, params: Vec, + params_errors: Option, + stats: Option, + stats_errors: Option, } async fn dashboard(State(state): State) -> DashboardTemplate<'static> { - let (conf, params_result) = { + let (conf, params_result, stats_result) = { let mut st = state.lock().unwrap(); let params_result = st.dabmux.get_rc_parameters(); + let stats_result = st.dabmux.get_stats(); + info!("STATS: {:?}", stats_result); - (st.conf.clone(), params_result) + (st.conf.clone(), params_result, stats_result) }; - let (params, errors) = match params_result { - Ok(v) => { - (v, None) - }, - Err(e) => { - (Vec::new(), Some(format!("{}", e))) - }, + let (params, params_errors) = match params_result { + Ok(v) => (v, None), + Err(e) => (Vec::new(), Some(format!("{}", e))), + }; + + let (stats, stats_errors) = match stats_result { + Ok(v) => (Some(v), None), + Err(e) => (None, Some(format!("{}", e))), }; DashboardTemplate { @@ -79,7 +85,9 @@ async fn dashboard(State(state): State) -> DashboardTemplate<'stati conf, page: ActivePage::Dashboard, params, - errors, + params_errors, + stats, + stats_errors, } } @@ -100,7 +108,7 @@ async fn post_rc( }; match set_rc_result { - Ok(v) => (StatusCode::OK, v.as_str().or(Some("")).unwrap().to_owned()), + Ok(()) => (StatusCode::OK, "".to_owned()), Err(e) => (StatusCode::BAD_REQUEST, e.to_string()), } } -- cgit v1.2.3