/*
Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications
Research Center Canada)
Copyright (C) 2025
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
A TCP Socket server that serves state information and statistics for
monitoring purposes, and also serves the internal configuration
property tree.
*/
/*
This file is part of ODR-DabMux.
ODR-DabMux is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
ODR-DabMux is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
*/
#include
#include
#include
#include
#include
#include
#include
#include "ManagementServer.h"
#include "Log.h"
using namespace std;
/* Value reported as "min_fill" when no buffer fill statistics are available. */
static constexpr long MIN_FILL_BUFFER_UNDEF = -1;

/* For silence detection, we count the number of occurrences the audio level
 * falls below a threshold.
 *
 * The counter is decreased for each frame that has good audio level.
 *
 * The counter saturates, and this value defines for how long the
 * input will be considered silent after a cut.
 *
 * If the count exceeds INPUT_AUDIO_LEVEL_SILENCE_COUNT, the input changes
 * state to Silence.
 *
 * These are typed constants rather than macros, consistent with the other
 * tuning constants of this file. */
static constexpr int INPUT_AUDIO_LEVEL_THRESHOLD = -50;          // dB
static constexpr int INPUT_AUDIO_LEVEL_SILENCE_COUNT = 100;      // superframes (120ms)
static constexpr int INPUT_AUDIO_LEVEL_COUNT_SATURATION = 500;   // superframes (120ms)
/* An example of how the state changes work.
 * The timeout is set to expire in 30 minutes
 * at each under-/overrun.
 *
 * The glitch counter is increased by one for each glitch (can be a
 * saturating counter), and set to zero when the counter timeout expires.
 *
 * The state then simply depends on the glitch counter value.
 *
 * Graphical example:
 *
 * state           STREAMING    | UNSTABLE | STREAMING
 * xruns         U      U       U
 * glitch
 *  counter   0  1      2       3          0
 * reset
 *  timeout   \  |\     |\      |\
 *             \ | \    | \     | \
 *              \|  \   |  \    |  \
 *                   \  |   \   |   \
 *                    \ |    \  |    \
 *                     \|     \ |     \
 *                             \|      \
 *                                      \
 *                                       \
 *                                        \
 * timeout expires ________________________\
 *                              <--30min--->
 */
/* Time without any under-/overrun after which the glitch counter is reset. */
static constexpr std::chrono::minutes INPUT_COUNTER_RESET_TIME{30};

/* Number of glitches tolerated in the Streaming state; once the glitch
 * counter reaches this value, the input is considered Unstable. */
static constexpr int INPUT_UNSTABLE_THRESHOLD{3};

/* Min/max buffer fill information is kept for 30s, so that even a slow
 * poller still gets meaningful values. */
static constexpr std::chrono::seconds BUFFER_STATS_KEEP_DURATION{30};

/* Audio levels change faster than buffer levels, so they are meant to be
 * polled much more often. Taking the peak over too much data would hide the
 * interesting short-time fluctuations, hence the short window. Keeping a
 * longer history at the same time provides a statistic that still catches
 * rare peaks for slow pollers. */
static constexpr std::chrono::milliseconds PEAK_STATS_SHORT_WINDOW{500};
static constexpr std::chrono::minutes PEAK_STATS_KEEP_DURATION{5};
/* Access the single, process-wide ManagementServer instance.
 *
 * The instance is a function-local static, constructed on the first call.
 *
 * Warning: do not use it in the destructor of another global object: you
 * cannot tell which of the two gets destroyed first. */
ManagementServer& get_mgmt_server()
{
    static ManagementServer instance;
    return instance;
}
/* Register an input so that its statistics are served.
 *
 * The raw pointer is stored under the id returned by is->get_name(). A
 * second registration with an id that is already known is logged as an
 * error and otherwise ignored (the first registration stays in place). */
void ManagementServer::register_input(InputStat* is)
{
    unique_lock lock(m_statsmutex);

    const std::string input_id = is->get_name();
    const bool inserted = m_input_stats.emplace(input_id, is).second;
    if (not inserted) {
        etiLog.level(error) <<
            "Double registration in MGMT Server with id '" <<
            input_id << "'";
    }
}
/* Forget the input registered under id; unknown ids are silently ignored. */
void ManagementServer::unregister_input(std::string id)
{
    unique_lock lock(m_statsmutex);

    // Erasing by key does nothing when the key is absent.
    m_input_stats.erase(id);
}
// Outputs never disappear once created, so there is no removal counterpart:
// each call replaces the statistics stored for the output listening on
// listen_port.
void ManagementServer::update_edi_tcp_output_stat(
        uint16_t listen_port,
        const std::vector& stats)
{
    unique_lock lock(m_statsmutex);
    m_output_stats.insert_or_assign(listen_port, stats);
}
/* Check whether an input with the given id is registered.
 *
 * Returns true if it is. Otherwise an error is logged and false is
 * returned. */
bool ManagementServer::isInputRegistered(std::string& id)
{
    unique_lock lock(m_statsmutex);

    if (m_input_stats.find(id) == m_input_stats.end()) {
        // The previous message read "does was not registered".
        etiLog.level(error) <<
            "Management Server: id '" <<
            id << "' was not registered";
        return false;
    }
    return true;
}
std::string ManagementServer::get_json_stats_for_http(std::optional clocktai_expires_at) const
{
json::map_t j;
j["version"] = VERSION;
j["global_input_state"] = std::nullopt; // TODO
if (clocktai_expires_at) {
j["clock_tai_expiry"] = *clocktai_expires_at;
}
else {
j["clock_tai_expiry"] = std::nullopt;
}
j["inputs"] = get_input_values();
auto ov = get_output_values();
j["outputs"] = ov.values;
j["num_output_connections"] = ov.total_num_connections;
return json::map_to_json(j);
}
/* List the ids of all registered inputs.
 *
 * Returns a map with a single "config" entry holding the ids. */
json::map_t ManagementServer::get_input_config_json()
{
    unique_lock lock(m_statsmutex);

    std::vector<json::value_t> input_ids;
    for (const auto& entry : m_input_stats) {
        const json::value_t input_id = entry.first;
        input_ids.push_back(input_id);
    }

    json::map_t config;
    config["config"] = input_ids;
    return config;
}
/* Snapshot the statistics of all registered inputs.
 *
 * Returns a map from input id to the result of that input's encodeValues().
 * m_statsmutex is held while the inputs are queried. */
json::map_t ManagementServer::get_input_values() const
{
    unique_lock lock(m_statsmutex);

    json::map_t values_by_id;
    for (const auto& [input_id, input] : m_input_stats) {
        values_by_id[input_id] = input->encodeValues();
    }
    return values_by_id;
}
/* Collect the connection statistics of all EDI/TCP outputs.
 *
 * For each output, an entry "edi_tcp_<port>" holds "num_connections" and the
 * list of "remote_addresses". The total number of connections over all
 * outputs is accumulated in total_num_connections. */
ManagementServer::output_stats ManagementServer::get_output_values() const
{
    unique_lock lock(m_statsmutex);

    output_stats ret;

    // Bind by reference: the per-port vectors do not need to be copied, and a
    // distinct name avoids shadowing the output_stats type.
    for (const auto& [listen_port, connections] : m_output_stats) {
        ret.total_num_connections += connections.size();

        std::vector<json::value_t> remote_addresses;
        remote_addresses.reserve(connections.size());
        for (const auto& connection : connections) {
            remote_addresses.emplace_back(connection.remote_address.to_string());
        }

        json::map_t o;
        o["num_connections"] = connections.size();
        o["remote_addresses"] = std::move(remote_addresses);

        const string key = "edi_tcp_" + to_string(listen_port);
        ret.values[key] = std::move(o);
    }

    return ret;
}
ManagementServer::ManagementServer() :
m_zmq_context(),
m_zmq_sock(m_zmq_context, ZMQ_REP),
m_running(false),
m_fault(false)
{ }
/* Stop the server thread and wait for it to finish.
 *
 * m_restarter_thread is joined first. A std::thread that is still joinable
 * when destroyed calls std::terminate(), which would otherwise abort the
 * program at exit whenever restart() had been used. Joining it first also
 * makes sure a pending restart has finished replacing m_thread before that
 * thread is stopped and joined below.
 *
 * NOTE(review): serverThread() sets m_running = true itself when it starts,
 * so a server thread launched by a restart right before this point could
 * overwrite the false written here and keep join() waiting. */
ManagementServer::~ManagementServer()
{
    if (m_restarter_thread.joinable()) {
        m_restarter_thread.join();
    }

    m_running = false;
    if (m_thread.joinable()) {
        m_thread.join();
        m_fault = false;
    }
}
/* Remember the listening port and, if it is positive, start the server
 * thread. A port of zero or less leaves the management server disabled. */
void ManagementServer::open(int listenport)
{
    m_listenport = listenport;

    if (m_listenport <= 0) {
        return;
    }

    m_thread = std::thread(&ManagementServer::serverThread, this);
}
/* Restart the server thread in the background.
 *
 * The work is done by restart_thread() running on m_restarter_thread. A
 * previous restarter that is still joinable is joined first: assigning a new
 * thread to a joinable std::thread calls std::terminate(), so a second call
 * to restart() would otherwise abort the program. */
void ManagementServer::restart()
{
    if (m_restarter_thread.joinable()) {
        m_restarter_thread.join();
    }
    m_restarter_thread = thread(&ManagementServer::restart_thread, this, 0);
}
// This runs in a separate thread, because
// it would take too long to be done in the main loop
// thread.
void ManagementServer::restart_thread(long)
{
m_running = false;
if (m_thread.joinable()) {
m_thread.join();
m_fault = false;
}
m_thread = thread(&ManagementServer::serverThread, this);
}
void ManagementServer::serverThread()
{
m_running = true;
m_fault = false;
try {
std::string bind_addr = "tcp://127.0.0.1:" + to_string(m_listenport);
m_zmq_sock.bind(bind_addr.c_str());
zmq::pollitem_t pollItems[] = { {m_zmq_sock, 0, ZMQ_POLLIN, 0} };
while (m_running) {
zmq::poll(pollItems, 1, 1000);
if (pollItems[0].revents & ZMQ_POLLIN) {
zmq::message_t zmq_message;
const auto r = m_zmq_sock.recv(zmq_message);
if (r.has_value()) {
handle_message(zmq_message);
}
}
}
}
catch (const exception &e) {
etiLog.level(error) << "Exception in ManagementServer: " <<
e.what();
}
m_fault = true;
}
/* Handle one request received on the REP socket and send exactly one reply.
 *
 * Known requests are "info", "config", "values", "output_values" and
 * "getptree". Anything else is logged and answered with "Invalid command".
 *
 * A ZMQ REP socket only accepts the next request after a reply has been
 * sent. If building the answer throws (for instance while serialising the
 * property tree), an error string is therefore still sent. Otherwise the next
 * recv() in serverThread() fails and the server thread stops. */
void ManagementServer::handle_message(zmq::message_t& zmq_message)
{
    const std::string data(static_cast<const char*>(zmq_message.data()),
            zmq_message.size());

    std::string answerstr;

    try {
        std::stringstream answer;

        if (data == "info") {
            answer <<
                "{ " <<
                "\"service\": \"" <<
                PACKAGE_NAME << " " <<
#if defined(GITVERSION)
                GITVERSION <<
#else
                PACKAGE_VERSION <<
#endif
                " MGMT Server\", "
                <<
                "\"version\": \"" <<
#if defined(GITVERSION)
                GITVERSION <<
#else
                PACKAGE_VERSION <<
#endif
                "\" "
                << "}\n";
        }
        else if (data == "config") {
            answer << json::map_to_json(get_input_config_json());
        }
        else if (data == "values") {
            json::map_t root;
            root["values"] = get_input_values();
            answer << json::map_to_json(root);
        }
        else if (data == "output_values") {
            json::map_t root;
            auto ov = get_output_values();
            root["output_values"] = ov.values;
            answer << json::map_to_json(root);
        }
        else if (data == "getptree") {
            unique_lock lock(m_configmutex);
            boost::property_tree::json_parser::write_json(answer, m_pt);
        }
        else {
            etiLog.level(warn) << "ManagementServer: Invalid request '" << data << "'";
            answer << "Invalid command";
        }

        answerstr = answer.str();
    }
    catch (const std::exception& e) {
        etiLog.level(error) <<
            "MGMT server caught exception: " <<
            e.what();
        // Still answer, so that the REP socket can receive the next request.
        answerstr = std::string("Error: ") + e.what();
    }

    try {
        zmq::const_buffer message(answerstr.data(), answerstr.size());
        m_zmq_sock.send(message, zmq::send_flags::none);
    }
    catch (const std::exception& e) {
        etiLog.level(error) <<
            "MGMT server caught exception: " <<
            e.what();
    }
}
/* Store a copy of the configuration property tree, served by "getptree".
 *
 * The tree is stored unconditionally. Storing it only while m_running was
 * set silently dropped updates that arrived before the server thread had
 * started (serverThread() sets m_running itself) or during a restart
 * (restart_thread() clears it). In that case "getptree" kept answering with
 * a stale or empty tree. */
void ManagementServer::update_ptree(const boost::property_tree::ptree& pt)
{
    unique_lock lock(m_configmutex);
    m_pt = pt;
}
/************************************************/
InputStat::InputStat(const std::string& name) :
m_name(name),
m_time_last_event(std::chrono::steady_clock::now())
{
}
/* Unregister from the management server, which keeps a raw pointer to this
 * object and would otherwise keep dereferencing it. */
InputStat::~InputStat()
{
    ManagementServer& server = get_mgmt_server();
    server.unregister_input(m_name);
}
void InputStat::registerAtServer()
{
get_mgmt_server().register_input(this);
}
/* Record the current buffer fill level.
 *
 * Samples are added at the front (newest first). Samples older than
 * BUFFER_STATS_KEEP_DURATION are discarded right away. */
void InputStat::notifyBuffer(long bufsize)
{
    unique_lock lock(m_mutex);

    const auto now = std::chrono::steady_clock::now();
    m_buffer_fill_stats.push_front({now, bufsize});
    prune_statistics(now);
}
/* Remember the most recent timestamp offset; only the last value is kept
 * and it is reported as "last_tist_offset". */
void InputStat::notifyTimestampOffset(double offset)
{
    unique_lock lock(m_mutex);
    m_last_tist_offset = offset;
}
void InputStat::notifyPeakLevels(int peak_left, int peak_right)
{
unique_lock lock(m_mutex);
using namespace std::chrono;
const auto time_now = steady_clock::now();
m_peak_stats.push_front({time_now, peak_left, peak_right});
prune_statistics(time_now);
if (m_peak_stats.size() >= 2) {
// Calculate the peak over the short window
vector short_peaks;
copy_if(m_peak_stats.begin(), m_peak_stats.end(),
back_inserter(short_peaks),
[&](const peak_stat_t& ps) {
return ps.timestamp + PEAK_STATS_SHORT_WINDOW >= time_now;
});
const auto& short_left_peak_max = max_element(
short_peaks.begin(), short_peaks.end(),
[](const peak_stat_t& lhs, const peak_stat_t& rhs) {
return lhs.peak_left < rhs.peak_left;
});
const auto& short_right_peak_max = max_element(
short_peaks.begin(), short_peaks.end(),
[](const peak_stat_t& lhs, const peak_stat_t& rhs) {
return lhs.peak_right < rhs.peak_right;
});
// Using the lower of the two channels allows us to detect if only one
// channel is silent.
const int lower_peak = min(
short_left_peak_max->peak_left, short_right_peak_max->peak_right);
// State
const int16_t int16_max = std::numeric_limits::max();
int peak_dB = lower_peak ?
round(20*log10((double)lower_peak / int16_max)) :
-90;
if (peak_dB < INPUT_AUDIO_LEVEL_THRESHOLD) {
if (m_silence_counter < INPUT_AUDIO_LEVEL_COUNT_SATURATION) {
m_silence_counter++;
}
}
else {
if (m_silence_counter > 0) {
m_silence_counter--;
}
}
}
}
/* Record a buffer underrun.
 *
 * Counts the underrun and restarts the glitch-counter reset timer. The
 * glitch counter is incremented until it reaches INPUT_UNSTABLE_THRESHOLD.
 * Once it is already there, the stored audio level samples are cleared
 * instead, since level notifications are no longer received. */
void InputStat::notifyUnderrun()
{
    unique_lock lock(m_mutex);

    ++m_num_underruns;

    m_time_last_event = std::chrono::steady_clock::now();

    const bool glitch_counter_saturated =
        m_glitch_counter >= INPUT_UNSTABLE_THRESHOLD;
    if (glitch_counter_saturated) {
        m_peak_stats.clear();
    }
    else {
        ++m_glitch_counter;
    }
}
/* Record a buffer overrun.
 *
 * Counts the overrun, restarts the glitch-counter reset timer, and
 * increments the glitch counter, saturating at INPUT_UNSTABLE_THRESHOLD. */
void InputStat::notifyOverrun()
{
    unique_lock lock(m_mutex);

    ++m_num_overruns;

    m_time_last_event = std::chrono::steady_clock::now();

    const bool glitch_counter_saturated =
        m_glitch_counter >= INPUT_UNSTABLE_THRESHOLD;
    if (not glitch_counter_saturated) {
        ++m_glitch_counter;
    }
}
/* Remember the version string and uptime (in seconds) reported by the input;
 * both are included in encodeValues(). */
void InputStat::notifyVersion(const std::string& version, uint32_t uptime_s)
{
    unique_lock lock(m_mutex);
    m_uptime_s = uptime_s;
    m_version = version;
}
/* Build the JSON statistics of this input.
 *
 * Returns a map with a single "inputstat" entry holding:
 *  - buffer fill min/max (min is MIN_FILL_BUFFER_UNDEF without samples);
 *  - short-window and slow peak levels per channel, in dB relative to the
 *    int16_t maximum (-90 for a zero peak);
 *  - the underrun/overrun counters, last timestamp offset, version and
 *    uptime;
 *  - the state computed by determineState().
 * m_mutex is held for the whole call. */
json::map_t InputStat::encodeValues()
{
    const int16_t int16_max = std::numeric_limits<int16_t>::max();

    unique_lock lock(m_mutex);

    const auto by_left = [](const peak_stat_t& lhs, const peak_stat_t& rhs) {
        return lhs.peak_left < rhs.peak_left;
    };
    const auto by_right = [](const peak_stat_t& lhs, const peak_stat_t& rhs) {
        return lhs.peak_right < rhs.peak_right;
    };

    int peak_left_short = 0;
    int peak_right_short = 0;
    int peak_left = 0;
    int peak_right = 0;

    if (not m_peak_stats.empty()) {
        peak_left = max_element(m_peak_stats.begin(), m_peak_stats.end(),
                by_left)->peak_left;
        peak_right = max_element(m_peak_stats.begin(), m_peak_stats.end(),
                by_right)->peak_right;

        // The short window here is the m_short_window_length most recent
        // samples (the deque is filled from the front). NOTE(review): this is
        // a sample count, unlike the time-based PEAK_STATS_SHORT_WINDOW used
        // by notifyPeakLevels().
        if (m_peak_stats.size() > m_short_window_length) {
            const auto short_end = m_peak_stats.begin() + m_short_window_length;
            peak_left_short = max_element(m_peak_stats.begin(), short_end,
                    by_left)->peak_left;
            peak_right_short = max_element(m_peak_stats.begin(), short_end,
                    by_right)->peak_right;
        }
        else {
            peak_left_short = peak_left;
            peak_right_short = peak_right;
        }
    }

    long min_fill_buffer = MIN_FILL_BUFFER_UNDEF;
    long max_fill_buffer = 0;
    if (not m_buffer_fill_stats.empty()) {
        const auto buffer_min_max_fill = minmax_element(
                m_buffer_fill_stats.begin(), m_buffer_fill_stats.end(),
                [](const fill_stat_t& lhs, const fill_stat_t& rhs) {
                    return lhs.bufsize < rhs.bufsize;
                });
        min_fill_buffer = buffer_min_max_fill.first->bufsize;
        max_fill_buffer = buffer_min_max_fill.second->bufsize;
    }

    /* convert to dB */
    auto to_dB = [](int p) {
        int dB = -90;
        if (p) {
            dB = round(20*log10((double)p / int16_max));
        }
        return dB;
    };

    // Escape double quotes in the version string received from the input.
    // The replacement is two characters long, so the search must resume
    // after both of them. Resuming at pos + 1 found the very same quote
    // again and looped forever, growing the string each time.
    // NOTE(review): backslashes are not escaped; confirm whether
    // json::map_to_json() escapes strings itself, in which case this manual
    // escaping may be unnecessary.
    auto version = m_version;
    const std::string escaped_quote = "\\\"";
    size_t pos = 0;
    while ((pos = version.find('"', pos)) != std::string::npos) {
        version.replace(pos, 1, escaped_quote);
        pos += escaped_quote.size();
    }

    json::map_t inputstat;
    inputstat["min_fill"] = min_fill_buffer;
    inputstat["max_fill"] = max_fill_buffer;
    inputstat["peak_left"] = to_dB(peak_left_short);
    inputstat["peak_right"] = to_dB(peak_right_short);
    inputstat["peak_left_slow"] = to_dB(peak_left);
    inputstat["peak_right_slow"] = to_dB(peak_right);
    inputstat["num_underruns"] = m_num_underruns;
    inputstat["num_overruns"] = m_num_overruns;
    inputstat["last_tist_offset"] = m_last_tist_offset;
    inputstat["version"] = version;
    inputstat["uptime"] = m_uptime_s;

    inputstat["state"] = "";
    switch (determineState()) {
        case input_state_t::NoData:
            inputstat["state"] = "NoData (1)";
            break;
        case input_state_t::Unstable:
            inputstat["state"] = "Unstable (2)";
            break;
        case input_state_t::Silence:
            inputstat["state"] = "Silent (3)";
            break;
        case input_state_t::Streaming:
            inputstat["state"] = "Streaming (4)";
            break;
    }

    json::map_t ret;
    ret["inputstat"] = inputstat;
    return ret;
}
/* Compute the current state of the input.
 *
 * First prunes expired statistics. It then resets the glitch counter if no
 * under-/overrun happened during the last INPUT_COUNTER_RESET_TIME. The
 * result is, by order of precedence:
 *  - NoData    when every stored buffer fill sample is zero, or none exist;
 *  - Unstable  when the glitch counter reached INPUT_UNSTABLE_THRESHOLD;
 *  - Silence   when the silence counter exceeds
 *              INPUT_AUDIO_LEVEL_SILENCE_COUNT;
 *  - Streaming otherwise.
 * Does not lock m_mutex itself; encodeValues() calls it with the lock held. */
input_state_t InputStat::determineState()
{
    const auto now = std::chrono::steady_clock::now();
    prune_statistics(now);

    const bool reset_timeout_expired =
        (now - m_time_last_event) > INPUT_COUNTER_RESET_TIME;
    if (reset_timeout_expired) {
        m_glitch_counter = 0;
    }

    const bool has_data = std::any_of(
            m_buffer_fill_stats.begin(), m_buffer_fill_stats.end(),
            [](const fill_stat_t& fs) { return fs.bufsize != 0; });
    if (not has_data) {
        return input_state_t::NoData;
    }

    if (m_glitch_counter >= INPUT_UNSTABLE_THRESHOLD) {
        return input_state_t::Unstable;
    }

    return (m_silence_counter > INPUT_AUDIO_LEVEL_SILENCE_COUNT) ?
        input_state_t::Silence :
        input_state_t::Streaming;
}
void InputStat::prune_statistics(const std::chrono::time_point& time_now)
{
// Keep only stats whose timestamp are more recent than
// BUFFER_STATS_KEEP_DURATION ago
m_buffer_fill_stats.erase(remove_if(
m_buffer_fill_stats.begin(), m_buffer_fill_stats.end(),
[&](const fill_stat_t& fs) {
return fs.timestamp + BUFFER_STATS_KEEP_DURATION < time_now;
}),
m_buffer_fill_stats.end());
// Keep only stats whose timestamp are more recent than
// BUFFER_STATS_KEEP_DURATION ago
m_peak_stats.erase(remove_if(
m_peak_stats.begin(), m_peak_stats.end(),
[&](const peak_stat_t& ps) {
return ps.timestamp + PEAK_STATS_KEEP_DURATION < time_now;
}),
m_peak_stats.end());
}