diff options
Diffstat (limited to 'src/dabmux.rs')
-rw-r--r-- | src/dabmux.rs | 156 |
1 files changed, 113 insertions, 43 deletions
diff --git a/src/dabmux.rs b/src/dabmux.rs index dbf6ea9..af81b68 100644 --- a/src/dabmux.rs +++ b/src/dabmux.rs @@ -1,11 +1,16 @@ +use std::collections::HashMap; + use anyhow::anyhow; +use serde::Deserialize; use serde_json::Value; +use log::info; const ZMQ_TIMEOUT : i64 = 2000; pub struct DabMux { ctx : zmq::Context, rc_endpoint : String, + stats_endpoint : String, } pub struct Param { @@ -20,7 +25,8 @@ impl DabMux { let ctx = zmq::Context::new(); Self { ctx, - rc_endpoint : "tcp://127.0.0.1:12722".to_owned() + rc_endpoint : "tcp://127.0.0.1:12722".to_owned(), + stats_endpoint : "tcp://127.0.0.1:12720".to_owned(), } } @@ -67,67 +73,131 @@ impl DabMux { Ok(all_params) } - pub fn get_rc_parameters(&mut self) -> anyhow::Result<Vec<Param>> { - let sock = self.ctx.socket(zmq::REQ)?; - sock.connect(&self.rc_endpoint)?; - sock.send("showjson", 0)?; - - let mut msg = zmq::Message::new(); - let mut items = [ - sock.as_poll_item(zmq::POLLIN), - ]; + fn poll_multipart(sock: &zmq::Socket) -> anyhow::Result<Vec<String>> { + let mut items = [ sock.as_poll_item(zmq::POLLIN), ]; zmq::poll(&mut items, ZMQ_TIMEOUT).unwrap(); if items[0].is_readable() { - sock.recv(&mut msg, 0)?; - let msg = msg.as_str().ok_or(anyhow!("RC response is not a str"))?; + let mut parts = Vec::new(); + for part in sock.recv_multipart(0)? { + let p = String::from_utf8(part)?; + parts.push(p); + } + Ok(parts) + } + else { + Err(anyhow!("ZMQ timeout")) + } + } - // JSON structure: - // { "module1": { "param1": "value", "param2": "value" }, "module2": { ... } } - let v: Value = serde_json::from_str(msg)?; - Self::value_to_params(v) + fn poll_message(sock: &zmq::Socket) -> anyhow::Result<String> { + let parts = Self::poll_multipart(&sock)?; + if parts.len() == 1 { + Ok(parts[0].clone()) } else { - Err(anyhow!("Timeout reading RC")) + info!("multipart returned: {}", parts.join(",")); + return Err(anyhow!("unexpected multipart answer")); } } - pub fn set_rc_parameter(&mut self, module: &str, param: &str, value: &str) -> anyhow::Result<Value> { + pub fn get_rc_parameters(&mut self) -> anyhow::Result<Vec<Param>> { let sock = self.ctx.socket(zmq::REQ)?; sock.connect(&self.rc_endpoint)?; - sock.send_multipart(["set", module, param, value], 0)?; + sock.send("showjson", 0)?; - let mut items = [ - sock.as_poll_item(zmq::POLLIN), - ]; - zmq::poll(&mut items, ZMQ_TIMEOUT).unwrap(); - if items[0].is_readable() { - let mut parts = sock.recv_multipart(0)?; + let msg = Self::poll_message(&sock)?; + + // JSON structure: + // { "module1": { "param1": "value", "param2": "value" }, "module2": { ... } } + let v: Value = serde_json::from_str(&msg)?; + Self::value_to_params(v) + } - let j : serde_json::Value = parts.drain(..) - .map(|p| match String::from_utf8(p) - { - Ok(s) => s, - Err(_) => "???".to_owned(), - }) - .collect(); + pub fn set_rc_parameter(&mut self, module: &str, param: &str, value: &str) -> anyhow::Result<()> { + let sock = self.ctx.socket(zmq::REQ)?; + sock.connect(&self.rc_endpoint)?; + sock.send_multipart(["set", module, param, value], 0)?; - let j_arr = j.as_array().unwrap(); + let resp = Self::poll_multipart(&sock)?; - //eprintln!("SET_RC: {}", j); - if j_arr[0].as_str() == Some("ok") { - Ok(j) + //eprintln!("SET_RC: {}", j); + if resp.len() > 0 && resp[0] == "ok" { + Ok(()) + } + else { + if resp.len() > 1 && resp[0] == "fail" { + Err(anyhow!(format!("Failed to set RC: {}", resp[1]))) } else { - if j_arr.len() > 0 && j_arr[1].is_string() { - Err(anyhow!(format!("Failed to set RC: {}", j_arr[1].as_str().unwrap()))) - } - else { - Err(anyhow!("Failed to set RC: unknown error")) - } + Err(anyhow!("Failed to set RC: unknown error")) + } + } + } + + pub fn get_stats(&mut self) -> anyhow::Result<Stats> { + let sock = self.ctx.socket(zmq::REQ)?; + sock.connect(&self.stats_endpoint)?; + sock.send("info", 0)?; + + let info_json : Value = serde_json::from_str(&Self::poll_message(&sock)?)?; + + if let Some(service) = info_json.get("service") + .and_then(|v| v.as_str()) + { + if !service.starts_with("ODR-DabMux") { + info!("stats info service is {}", service); + return Err(anyhow!("Wrong service in stats")); + } + + let version = info_json.get("version") + .and_then(|v| v.as_str()) + .or(Some("UNKNOWN")) + .unwrap() + .to_owned(); + + sock.send("values", 0)?; + let values_json : Value = serde_json::from_str(&Self::poll_message(&sock)?)?; + match values_json.get("values") + .and_then(|v| v.as_object()) { + Some(v) => { + let mut input_stats : HashMap<String, InputStat> = HashMap::new(); + + for (k, v) in v { + let is = v.get("inputstat") + .ok_or(anyhow!("inputstat missing"))?; + let stat : InputStat = serde_json::from_value(is.clone())?; + input_stats.insert(k.clone(), stat); + } + + Ok(Stats { version, input_stats }) + }, + None => Err(anyhow!("values isn't an object")), } } else { - Err(anyhow!("Timeout reading RC")) + Err(anyhow!("Missing service in stats response")) } } } + +#[derive(Debug)] +pub struct Stats { + pub version : String, + pub input_stats : HashMap<String, InputStat>, +} + +#[derive(Debug, Deserialize)] +pub struct InputStat { + pub max_fill : u32, + pub min_fill : u32, + pub num_underruns : u64, + pub num_overruns : u64, + pub peak_left : i32, + pub peak_right : i32, + pub peak_left_slow : i32, + pub peak_right_slow : i32, + pub state : Option<String>, + pub version : Option<String>, + pub uptime : Option<u64>, + pub last_tist_offset : i32, +} |