diff options
Diffstat (limited to 'src/ManagementServer.cpp')
| -rw-r--r-- | src/ManagementServer.cpp | 96 |
1 files changed, 60 insertions, 36 deletions
diff --git a/src/ManagementServer.cpp b/src/ManagementServer.cpp index 599d744..7344b8b 100644 --- a/src/ManagementServer.cpp +++ b/src/ManagementServer.cpp @@ -2,7 +2,7 @@ Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2018 + Copyright (C) 2025 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org @@ -28,13 +28,12 @@ along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>. */ -#include <errno.h> -#include <string.h> -#include <math.h> -#include <stdint.h> -#include <limits> #include <sstream> #include <algorithm> +#include <cstring> +#include <cmath> +#include <cstdint> +#include <limits> #include <boost/version.hpp> #include "ManagementServer.h" #include "Log.h" @@ -96,11 +95,6 @@ INPUT_COUNTER_RESET_TIME = std::chrono::minutes(30); static constexpr int INPUT_UNSTABLE_THRESHOLD = 3; -/* For how long the input buffers must be empty before we move an input to the - * NoData state. */ -static constexpr auto -INPUT_NODATA_TIMEOUT = std::chrono::seconds(30); - /* Keep 30s of min/max buffer fill information so that we can catch meaningful * values even if we have a slow poller */ static constexpr auto @@ -127,37 +121,42 @@ ManagementServer& get_mgmt_server() */ } -void ManagementServer::registerInput(InputStat* is) +void ManagementServer::register_input(InputStat* is) { unique_lock<mutex> lock(m_statsmutex); std::string id(is->get_name()); - if (m_inputStats.count(id) == 1) { + if (m_input_stats.count(id) == 1) { etiLog.level(error) << "Double registration in MGMT Server with id '" << id << "'"; return; } - m_inputStats[id] = is; + m_input_stats[id] = is; } -void ManagementServer::unregisterInput(std::string id) +void ManagementServer::unregister_input(std::string id) { unique_lock<mutex> lock(m_statsmutex); - if (m_inputStats.count(id) == 1) { - m_inputStats.erase(id); + if (m_input_stats.count(id) == 1) { + m_input_stats.erase(id); } } +// outputs will never disappear, no need to have a "remove" logic +void ManagementServer::update_edi_tcp_output_stat(uint16_t listen_port, size_t num_connections) +{ + m_output_stats[listen_port] = num_connections; +} bool ManagementServer::isInputRegistered(std::string& id) { unique_lock<mutex> lock(m_statsmutex); - if (m_inputStats.count(id) == 0) { + if (m_input_stats.count(id) == 0) { etiLog.level(error) << "Management Server: id '" << id << "' does was not registered"; @@ -166,7 +165,7 @@ bool ManagementServer::isInputRegistered(std::string& id) return true; } -std::string ManagementServer::getStatConfigJSON() +std::string ManagementServer::get_input_config_json() { unique_lock<mutex> lock(m_statsmutex); @@ -175,7 +174,7 @@ std::string ManagementServer::getStatConfigJSON() std::map<std::string,InputStat*>::iterator iter; int i = 0; - for(iter = m_inputStats.begin(); iter != m_inputStats.end(); + for (iter = m_input_stats.begin(); iter != m_input_stats.end(); ++iter, i++) { std::string id = iter->first; @@ -192,16 +191,15 @@ std::string ManagementServer::getStatConfigJSON() return ss.str(); } -std::string ManagementServer::getValuesJSON() +std::string ManagementServer::get_input_values_json() { unique_lock<mutex> lock(m_statsmutex); std::ostringstream ss; ss << "{ \"values\" : {\n"; - std::map<std::string,InputStat*>::iterator iter; int i = 0; - for(iter = m_inputStats.begin(); iter != m_inputStats.end(); + for (auto iter = m_input_stats.begin(); iter != m_input_stats.end(); ++iter, i++) { const std::string& id = iter->first; @@ -220,6 +218,31 @@ std::string ManagementServer::getValuesJSON() return ss.str(); } +std::string ManagementServer::get_output_values_json() +{ + unique_lock<mutex> lock(m_statsmutex); + + std::ostringstream ss; + ss << "{ \"output_values\" : {\n"; + + int i = 0; + for (auto iter = m_output_stats.begin(); iter != m_output_stats.end(); + ++iter, i++) + { + auto listen_port = iter->first; + auto num_connections = iter->second; + if (i > 0) { + ss << " ,\n"; + } + ss << " \"edi_tcp_" << listen_port << "\" : { \"num_connections\": " << + num_connections << "} "; + } + + ss << "}\n}\n"; + + return ss.str(); +} + ManagementServer::ManagementServer() : m_zmq_context(), m_zmq_sock(m_zmq_context, ZMQ_REP), @@ -280,8 +303,10 @@ void ManagementServer::serverThread() if (pollItems[0].revents & ZMQ_POLLIN) { zmq::message_t zmq_message; - m_zmq_sock.recv(zmq_message); - handle_message(zmq_message); + const auto r = m_zmq_sock.recv(zmq_message); + if (r.has_value()) { + handle_message(zmq_message); + } } } } @@ -321,10 +346,13 @@ void ManagementServer::handle_message(zmq::message_t& zmq_message) << "}\n"; } else if (data == "config") { - answer << getStatConfigJSON(); + answer << get_input_config_json(); } else if (data == "values") { - answer << getValuesJSON(); + answer << get_input_values_json(); + } + else if (data == "output_values") { + answer << get_output_values_json(); } else if (data == "getptree") { unique_lock<mutex> lock(m_configmutex); @@ -364,12 +392,12 @@ InputStat::InputStat(const std::string& name) : InputStat::~InputStat() { - get_mgmt_server().unregisterInput(m_name); + get_mgmt_server().unregister_input(m_name); } void InputStat::registerAtServer() { - get_mgmt_server().registerInput(this); + get_mgmt_server().register_input(this); } void InputStat::notifyBuffer(long bufsize) @@ -445,7 +473,7 @@ void InputStat::notifyPeakLevels(int peak_left, int peak_right) } } -void InputStat::notifyUnderrun(void) +void InputStat::notifyUnderrun() { unique_lock<mutex> lock(m_mutex); @@ -464,7 +492,7 @@ void InputStat::notifyUnderrun(void) } } -void InputStat::notifyOverrun(void) +void InputStat::notifyOverrun() { unique_lock<mutex> lock(m_mutex); @@ -610,11 +638,7 @@ input_state_t InputStat::determineState() // STATE CALCULATION - /* If the buffer has been empty for more than - * INPUT_NODATA_TIMEOUT, we go to the NoData state. - * - * Consider an empty deque to be NoData too. - */ + /* Consider an empty deque to be NoData. */ if (std::all_of( m_buffer_fill_stats.begin(), m_buffer_fill_stats.end(), [](const fill_stat_t& fs) { return fs.bufsize == 0; }) ) { |
