From d18935e22e3c687c152bd35ad05c66bf5a19e00a Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Fri, 24 Oct 2014 22:05:27 +0200 Subject: Refactor StatsServer to be a bit more generic --- src/StatsServer.cpp | 77 ++++++++++++++++++++++++++--------------------------- src/StatsServer.h | 34 +++++++++++++++-------- src/dabInputZmq.cpp | 14 +++++----- src/dabInputZmq.h | 3 +++ 4 files changed, 71 insertions(+), 57 deletions(-) diff --git a/src/StatsServer.cpp b/src/StatsServer.cpp index 25e1b76..558a479 100644 --- a/src/StatsServer.cpp +++ b/src/StatsServer.cpp @@ -38,10 +38,12 @@ #include "StatsServer.h" #include "Log.h" -void StatsServer::registerInput(std::string id) +void StatsServer::registerInput(InputStat* is) { boost::mutex::scoped_lock lock(m_mutex); + std::string id(is->get_name()); + if (m_inputStats.count(id) == 1) { etiLog.level(error) << "Double registration in Stats Server with id '" << @@ -49,44 +51,23 @@ void StatsServer::registerInput(std::string id) return; } - InputStat is; m_inputStats[id] = is; } -void StatsServer::notifyBuffer(std::string id, long bufsize) +void StatsServer::unregisterInput(std::string id) { boost::mutex::scoped_lock lock(m_mutex); - if (isInputRegistered(id)) - m_inputStats[id].notifyBuffer(bufsize); + if (m_inputStats.count(id) == 1) { + m_inputStats.erase(id); + } } -void StatsServer::notifyPeakLevels(std::string id, int peak_left, int peak_right) -{ - boost::mutex::scoped_lock lock(m_mutex); - if (isInputRegistered(id)) - m_inputStats[id].notifyPeakLevels(peak_left, peak_right); -} - -void StatsServer::notifyUnderrun(std::string id) -{ - boost::mutex::scoped_lock lock(m_mutex); - - if (isInputRegistered(id)) - m_inputStats[id].notifyUnderrun(); -} - -void StatsServer::notifyOverrun(std::string id) +bool StatsServer::isInputRegistered(std::string& id) { boost::mutex::scoped_lock lock(m_mutex); - if (isInputRegistered(id)) - m_inputStats[id].notifyOverrun(); -} - -bool StatsServer::isInputRegistered(std::string& id) -{ if (m_inputStats.count(id) == 0) { etiLog.level(error) << "Stats Server id '" << @@ -98,10 +79,12 @@ bool StatsServer::isInputRegistered(std::string& id) std::string StatsServer::getConfigJSON() { + boost::mutex::scoped_lock lock(m_mutex); + std::ostringstream ss; ss << "{ \"config\" : [\n"; - std::map::iterator iter; + std::map::iterator iter; int i = 0; for(iter = m_inputStats.begin(); iter != m_inputStats.end(); ++iter, i++) @@ -122,24 +105,26 @@ std::string StatsServer::getConfigJSON() std::string StatsServer::getValuesJSON() { + boost::mutex::scoped_lock lock(m_mutex); + std::ostringstream ss; ss << "{ \"values\" : {\n"; - std::map::iterator iter; + std::map::iterator iter; int i = 0; for(iter = m_inputStats.begin(); iter != m_inputStats.end(); ++iter, i++) { const std::string& id = iter->first; - InputStat& stats = iter->second; + InputStat* stats = iter->second; if (i > 0) { ss << " ,\n"; } ss << " \"" << id << "\" : "; - ss << stats.encodeValuesJSON(); - stats.reset(); + ss << stats->encodeValuesJSON(); + stats->reset(); } ss << "}\n}\n"; @@ -149,24 +134,26 @@ std::string StatsServer::getValuesJSON() std::string StatsServer::getStateJSON() { + boost::mutex::scoped_lock lock(m_mutex); + std::ostringstream ss; ss << "{\n"; - std::map::iterator iter; + std::map::iterator iter; int i = 0; for(iter = m_inputStats.begin(); iter != m_inputStats.end(); ++iter, i++) { const std::string& id = iter->first; - InputStat& stats = iter->second; + InputStat* stats = iter->second; if (i > 0) { ss << " ,\n"; } ss << " \"" << id << "\" : "; - ss << stats.encodeStateJSON(); - stats.reset(); + ss << stats->encodeStateJSON(); + stats->reset(); } ss << "}\n"; @@ -278,17 +265,14 @@ void StatsServer::serverThread() } if (strcmp(buffer, "config\n") == 0) { - boost::mutex::scoped_lock lock(m_mutex); std::string json = getConfigJSON(); n = write(accepted_sock, json.c_str(), json.size()); } else if (strcmp(buffer, "values\n") == 0) { - boost::mutex::scoped_lock lock(m_mutex); std::string json = getValuesJSON(); n = write(accepted_sock, json.c_str(), json.size()); } else if (strcmp(buffer, "state\n") == 0) { - boost::mutex::scoped_lock lock(m_mutex); std::string json = getStateJSON(); n = write(accepted_sock, json.c_str(), json.size()); } @@ -315,6 +299,17 @@ end_serverthread: } } +/************************************************/ + +void InputStat::registerAtServer() +{ + global_stats->registerInput(this); +} + +InputStat::~InputStat() +{ + global_stats->unregisterInput(m_name); +} std::string InputStat::encodeValuesJSON() { @@ -322,6 +317,8 @@ std::string InputStat::encodeValuesJSON() const int16_t int16_max = std::numeric_limits::max(); + boost::mutex::scoped_lock lock(m_mutex); + /* convert to dB */ int dB_l = peak_left ? round(20*log10((double)peak_left / int16_max)) : -90; int dB_r = peak_right ? round(20*log10((double)peak_right / int16_max)) : -90; @@ -369,6 +366,8 @@ std::string InputStat::encodeStateJSON() input_state_t InputStat::determineState(void) { + boost::mutex::scoped_lock lock(m_mutex); + time_t now = time(NULL); input_state_t state; diff --git a/src/StatsServer.h b/src/StatsServer.h index 731d8ca..26f90d4 100644 --- a/src/StatsServer.h +++ b/src/StatsServer.h @@ -146,7 +146,8 @@ enum input_state_t class InputStat { public: - InputStat() { + InputStat(std::string name) : m_name(name) + { /* Statistics */ num_underruns = 0; num_overruns = 0; @@ -162,6 +163,9 @@ class InputStat reset(); } + void registerAtServer(void); + + ~InputStat(); // Gets called each time the statistics are transmitted, // and resets the counters to zero @@ -174,11 +178,15 @@ class InputStat peak_right = 0; } + std::string& get_name(void) { return m_name; } + /* This function is called for every frame read by * the multiplexer */ void notifyBuffer(long bufsize) { + boost::mutex::scoped_lock lock(m_mutex); + // Statistics if (bufsize > max_fill_buffer) { max_fill_buffer = bufsize; @@ -198,6 +206,8 @@ class InputStat void notifyPeakLevels(int peak_left, int peak_right) { + boost::mutex::scoped_lock lock(m_mutex); + // Statistics if (peak_left > this->peak_left) { this->peak_left = peak_left; @@ -233,6 +243,8 @@ class InputStat void notifyUnderrun(void) { + boost::mutex::scoped_lock lock(m_mutex); + // Statistics num_underruns++; @@ -245,6 +257,8 @@ class InputStat void notifyOverrun(void) { + boost::mutex::scoped_lock lock(m_mutex); + // Statistics num_overruns++; @@ -262,6 +276,8 @@ class InputStat input_state_t determineState(void); private: + std::string m_name; + /************ STATISTICS ***********/ // minimum and maximum buffer fill since last reset long min_fill_buffer; @@ -283,6 +299,9 @@ class InputStat time_t m_time_last_buffer_nonempty; bool m_buffer_empty; + // The mutex that has to be held during all notify and readout + mutable boost::mutex m_mutex; + }; class StatsServer @@ -313,19 +332,12 @@ class StatsServer m_thread.join(); } - void registerInput(std::string id); + void registerInput(InputStat* is); + void unregisterInput(std::string id); bool fault_detected() { return m_fault; } void restart(void); - /* The following notify functions are used by the input to - * inform the StatsServer about new values - */ - void notifyBuffer(std::string id, long bufsize); - void notifyPeakLevels(std::string id, int peak_left, int peak_right); - void notifyUnderrun(std::string id); - void notifyOverrun(std::string id); - private: void restart_thread(long); @@ -348,7 +360,7 @@ class StatsServer int m_sock; /******* Statistics Data ********/ - std::map m_inputStats; + std::map m_inputStats; /* Return a description of the configuration that will * allow to define what graphs to be created diff --git a/src/dabInputZmq.cpp b/src/dabInputZmq.cpp index 7e79c73..9dab6c9 100644 --- a/src/dabInputZmq.cpp +++ b/src/dabInputZmq.cpp @@ -233,7 +233,7 @@ int DabInputZmqBase::open(const std::string inputUri) rebind(); // We want to appear in the statistics ! - global_stats->registerInput(m_name); + m_stats.registerAtServer(); return 0; } @@ -264,7 +264,7 @@ int DabInputZmqBase::readFrame(void* buffer, int size) /* Notify of a buffer overrun, and drop some frames */ if (m_frame_buffer.size() >= m_config.buffer_size) { - global_stats->notifyOverrun(m_name); + m_stats.notifyOverrun(); /* If the buffer is really too full, we drop as many frames as needed * to get down to the prebuffering size. We would like to have our buffer @@ -308,13 +308,13 @@ int DabInputZmqBase::readFrame(void* buffer, int size) m_name.c_str()); /* During prebuffering, give a zeroed frame to the mux */ - global_stats->notifyUnderrun(m_name); + m_stats.notifyUnderrun(); memset(buffer, 0, size); return size; } // Save stats data in bytes, not in frames - global_stats->notifyBuffer(m_name, m_frame_buffer.size() * size); + m_stats.notifyBuffer(m_frame_buffer.size() * size); if (m_frame_buffer.empty()) { etiLog.log(warn, "inputZMQ %s input empty, re-enabling pre-buffering\n", @@ -323,7 +323,7 @@ int DabInputZmqBase::readFrame(void* buffer, int size) m_prebuf_current = m_config.prebuffering; /* We have no data to give, we give a zeroed frame */ - global_stats->notifyUnderrun(m_name); + m_stats.notifyUnderrun(); memset(buffer, 0, size); return size; } @@ -373,7 +373,7 @@ int DabInputZmqMPEG::readFromSocket(size_t framesize) datalen = frame->datasize; data = ZMQ_FRAME_DATA(frame); - global_stats->notifyPeakLevels(m_name, frame->audiolevel_left, + m_stats.notifyPeakLevels(frame->audiolevel_left, frame->audiolevel_right); } @@ -444,7 +444,7 @@ int DabInputZmqAAC::readFromSocket(size_t framesize) datalen = frame->datasize; data = ZMQ_FRAME_DATA(frame); - global_stats->notifyPeakLevels(m_name, frame->audiolevel_left, + m_stats.notifyPeakLevels(frame->audiolevel_left, frame->audiolevel_right); } diff --git a/src/dabInputZmq.h b/src/dabInputZmq.h index 1597298..2ba846b 100644 --- a/src/dabInputZmq.h +++ b/src/dabInputZmq.h @@ -167,6 +167,7 @@ class DabInputZmqBase : public DabInputBase, public RemoteControllable { m_bitrate(0), m_enable_input(true), m_config(config), + m_stats(m_name), m_prebuf_current(0) { RC_ADD_PARAMETER(enable, "If the input is enabled. Set to zero to empty the buffer."); @@ -231,6 +232,8 @@ class DabInputZmqBase : public DabInputBase, public RemoteControllable { std::string m_inputUri; + InputStat m_stats; + private: int m_prebuf_current; }; -- cgit v1.2.3