diff options
| author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2014-10-24 22:05:27 +0200 | 
|---|---|---|
| committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2014-10-24 22:05:27 +0200 | 
| commit | d18935e22e3c687c152bd35ad05c66bf5a19e00a (patch) | |
| tree | 805e41b15e7092939fa1a06ba5b4c45cfff93e32 | |
| parent | 31e8a4fd8e4c057fffc84c071cc0b71a7989795c (diff) | |
| download | dabmux-d18935e22e3c687c152bd35ad05c66bf5a19e00a.tar.gz dabmux-d18935e22e3c687c152bd35ad05c66bf5a19e00a.tar.bz2 dabmux-d18935e22e3c687c152bd35ad05c66bf5a19e00a.zip  | |
Refactor StatsServer to be a bit more generic
| -rw-r--r-- | src/StatsServer.cpp | 77 | ||||
| -rw-r--r-- | src/StatsServer.h | 34 | ||||
| -rw-r--r-- | src/dabInputZmq.cpp | 14 | ||||
| -rw-r--r-- | src/dabInputZmq.h | 3 | 
4 files changed, 71 insertions, 57 deletions
diff --git a/src/StatsServer.cpp b/src/StatsServer.cpp index 25e1b76..558a479 100644 --- a/src/StatsServer.cpp +++ b/src/StatsServer.cpp @@ -38,10 +38,12 @@  #include "StatsServer.h"  #include "Log.h" -void StatsServer::registerInput(std::string id) +void StatsServer::registerInput(InputStat* is)  {      boost::mutex::scoped_lock lock(m_mutex); +    std::string id(is->get_name()); +      if (m_inputStats.count(id) == 1) {          etiLog.level(error) <<              "Double registration in Stats Server with id '" << @@ -49,44 +51,23 @@ void StatsServer::registerInput(std::string id)          return;      } -    InputStat is;      m_inputStats[id] = is;  } -void StatsServer::notifyBuffer(std::string id, long bufsize) +void StatsServer::unregisterInput(std::string id)  {      boost::mutex::scoped_lock lock(m_mutex); -    if (isInputRegistered(id)) -        m_inputStats[id].notifyBuffer(bufsize); +    if (m_inputStats.count(id) == 1) { +        m_inputStats.erase(id); +    }  } -void StatsServer::notifyPeakLevels(std::string id, int peak_left, int peak_right) -{ -    boost::mutex::scoped_lock lock(m_mutex); -    if (isInputRegistered(id)) -        m_inputStats[id].notifyPeakLevels(peak_left, peak_right); -} - -void StatsServer::notifyUnderrun(std::string id) -{ -    boost::mutex::scoped_lock lock(m_mutex); - -    if (isInputRegistered(id)) -        m_inputStats[id].notifyUnderrun(); -} - -void StatsServer::notifyOverrun(std::string id) +bool StatsServer::isInputRegistered(std::string& id)  {      boost::mutex::scoped_lock lock(m_mutex); -    if (isInputRegistered(id)) -        m_inputStats[id].notifyOverrun(); -} - -bool StatsServer::isInputRegistered(std::string& id) -{      if (m_inputStats.count(id) == 0) {          etiLog.level(error) <<              "Stats Server id '" << @@ -98,10 +79,12 @@ bool StatsServer::isInputRegistered(std::string& id)  std::string StatsServer::getConfigJSON()  { +    boost::mutex::scoped_lock lock(m_mutex); +      std::ostringstream ss;      ss << "{ \"config\" : [\n"; -    std::map<std::string,InputStat>::iterator iter; +    std::map<std::string,InputStat*>::iterator iter;      int i = 0;      for(iter = m_inputStats.begin(); iter != m_inputStats.end();              ++iter, i++) @@ -122,24 +105,26 @@ std::string StatsServer::getConfigJSON()  std::string StatsServer::getValuesJSON()  { +    boost::mutex::scoped_lock lock(m_mutex); +      std::ostringstream ss;      ss << "{ \"values\" : {\n"; -    std::map<std::string,InputStat>::iterator iter; +    std::map<std::string,InputStat*>::iterator iter;      int i = 0;      for(iter = m_inputStats.begin(); iter != m_inputStats.end();              ++iter, i++)      {          const std::string& id = iter->first; -        InputStat& stats = iter->second; +        InputStat* stats = iter->second;          if (i > 0) {              ss << " ,\n";          }          ss << " \"" << id << "\" : "; -        ss << stats.encodeValuesJSON(); -        stats.reset(); +        ss << stats->encodeValuesJSON(); +        stats->reset();      }      ss << "}\n}\n"; @@ -149,24 +134,26 @@ std::string StatsServer::getValuesJSON()  std::string StatsServer::getStateJSON()  { +    boost::mutex::scoped_lock lock(m_mutex); +      std::ostringstream ss;      ss << "{\n"; -    std::map<std::string,InputStat>::iterator iter; +    std::map<std::string,InputStat*>::iterator iter;      int i = 0;      for(iter = m_inputStats.begin(); iter != m_inputStats.end();              ++iter, i++)      {          const std::string& id = iter->first; -        InputStat& stats = iter->second; +        InputStat* stats = iter->second;          if (i > 0) {              ss << " ,\n";          }          ss << " \"" << id << "\" : "; -        ss << stats.encodeStateJSON(); -        stats.reset(); +        ss << stats->encodeStateJSON(); +        stats->reset();      }      ss << "}\n"; @@ -278,17 +265,14 @@ void StatsServer::serverThread()              }              if (strcmp(buffer, "config\n") == 0) { -                boost::mutex::scoped_lock lock(m_mutex);                  std::string json = getConfigJSON();                  n = write(accepted_sock, json.c_str(), json.size());              }              else if (strcmp(buffer, "values\n") == 0) { -                boost::mutex::scoped_lock lock(m_mutex);                  std::string json = getValuesJSON();                  n = write(accepted_sock, json.c_str(), json.size());              }              else if (strcmp(buffer, "state\n") == 0) { -                boost::mutex::scoped_lock lock(m_mutex);                  std::string json = getStateJSON();                  n = write(accepted_sock, json.c_str(), json.size());              } @@ -315,6 +299,17 @@ end_serverthread:      }  } +/************************************************/ + +void InputStat::registerAtServer() +{ +    global_stats->registerInput(this); +} + +InputStat::~InputStat() +{ +    global_stats->unregisterInput(m_name); +}  std::string InputStat::encodeValuesJSON()  { @@ -322,6 +317,8 @@ std::string InputStat::encodeValuesJSON()      const int16_t int16_max = std::numeric_limits<int16_t>::max(); +    boost::mutex::scoped_lock lock(m_mutex); +      /* convert to dB */      int dB_l = peak_left  ? round(20*log10((double)peak_left / int16_max))  : -90;      int dB_r = peak_right ? round(20*log10((double)peak_right / int16_max)) : -90; @@ -369,6 +366,8 @@ std::string InputStat::encodeStateJSON()  input_state_t InputStat::determineState(void)  { +    boost::mutex::scoped_lock lock(m_mutex); +      time_t now = time(NULL);      input_state_t state; diff --git a/src/StatsServer.h b/src/StatsServer.h index 731d8ca..26f90d4 100644 --- a/src/StatsServer.h +++ b/src/StatsServer.h @@ -146,7 +146,8 @@ enum input_state_t  class InputStat  {      public: -        InputStat() { +        InputStat(std::string name) : m_name(name) +        {              /* Statistics */              num_underruns = 0;              num_overruns = 0; @@ -162,6 +163,9 @@ class InputStat              reset();          } +        void registerAtServer(void); + +        ~InputStat();          // Gets called each time the statistics are transmitted,          // and resets the counters to zero @@ -174,11 +178,15 @@ class InputStat              peak_right = 0;          } +        std::string& get_name(void) { return m_name; } +          /* This function is called for every frame read by           * the multiplexer           */          void notifyBuffer(long bufsize)          { +            boost::mutex::scoped_lock lock(m_mutex); +              // Statistics              if (bufsize > max_fill_buffer) {                  max_fill_buffer = bufsize; @@ -198,6 +206,8 @@ class InputStat          void notifyPeakLevels(int peak_left, int peak_right)          { +            boost::mutex::scoped_lock lock(m_mutex); +              // Statistics              if (peak_left > this->peak_left) {                  this->peak_left = peak_left; @@ -233,6 +243,8 @@ class InputStat          void notifyUnderrun(void)          { +            boost::mutex::scoped_lock lock(m_mutex); +              // Statistics              num_underruns++; @@ -245,6 +257,8 @@ class InputStat          void notifyOverrun(void)          { +            boost::mutex::scoped_lock lock(m_mutex); +              // Statistics              num_overruns++; @@ -262,6 +276,8 @@ class InputStat          input_state_t determineState(void);      private: +        std::string m_name; +          /************ STATISTICS ***********/          // minimum and maximum buffer fill since last reset          long min_fill_buffer; @@ -283,6 +299,9 @@ class InputStat          time_t m_time_last_buffer_nonempty;          bool m_buffer_empty; +        // The mutex that has to be held during all notify and readout +        mutable boost::mutex m_mutex; +  };  class StatsServer @@ -313,19 +332,12 @@ class StatsServer              m_thread.join();          } -        void registerInput(std::string id); +        void registerInput(InputStat* is); +        void unregisterInput(std::string id);          bool fault_detected() { return m_fault; }          void restart(void); -        /* The following notify functions are used by the input to -         * inform the StatsServer about new values -         */ -        void notifyBuffer(std::string id, long bufsize); -        void notifyPeakLevels(std::string id, int peak_left, int peak_right); -        void notifyUnderrun(std::string id); -        void notifyOverrun(std::string id); -      private:          void restart_thread(long); @@ -348,7 +360,7 @@ class StatsServer          int m_sock;          /******* Statistics Data ********/ -        std::map<std::string, InputStat> m_inputStats; +        std::map<std::string, InputStat*> m_inputStats;          /* Return a description of the configuration that will           * allow to define what graphs to be created diff --git a/src/dabInputZmq.cpp b/src/dabInputZmq.cpp index 7e79c73..9dab6c9 100644 --- a/src/dabInputZmq.cpp +++ b/src/dabInputZmq.cpp @@ -233,7 +233,7 @@ int DabInputZmqBase::open(const std::string inputUri)      rebind();      // We want to appear in the statistics ! -    global_stats->registerInput(m_name); +    m_stats.registerAtServer();      return 0;  } @@ -264,7 +264,7 @@ int DabInputZmqBase::readFrame(void* buffer, int size)      /* Notify of a buffer overrun, and drop some frames */      if (m_frame_buffer.size() >= m_config.buffer_size) { -        global_stats->notifyOverrun(m_name); +        m_stats.notifyOverrun();          /* If the buffer is really too full, we drop as many frames as needed           * to get down to the prebuffering size. We would like to have our buffer @@ -308,13 +308,13 @@ int DabInputZmqBase::readFrame(void* buffer, int size)                  m_name.c_str());          /* During prebuffering, give a zeroed frame to the mux */ -        global_stats->notifyUnderrun(m_name); +        m_stats.notifyUnderrun();          memset(buffer, 0, size);          return size;      }      // Save stats data in bytes, not in frames -    global_stats->notifyBuffer(m_name, m_frame_buffer.size() * size); +    m_stats.notifyBuffer(m_frame_buffer.size() * size);      if (m_frame_buffer.empty()) {          etiLog.log(warn, "inputZMQ %s input empty, re-enabling pre-buffering\n", @@ -323,7 +323,7 @@ int DabInputZmqBase::readFrame(void* buffer, int size)          m_prebuf_current = m_config.prebuffering;          /* We have no data to give, we give a zeroed frame */ -        global_stats->notifyUnderrun(m_name); +        m_stats.notifyUnderrun();          memset(buffer, 0, size);          return size;      } @@ -373,7 +373,7 @@ int DabInputZmqMPEG::readFromSocket(size_t framesize)          datalen = frame->datasize;          data = ZMQ_FRAME_DATA(frame); -        global_stats->notifyPeakLevels(m_name, frame->audiolevel_left, +        m_stats.notifyPeakLevels(frame->audiolevel_left,                  frame->audiolevel_right);      } @@ -444,7 +444,7 @@ int DabInputZmqAAC::readFromSocket(size_t framesize)          datalen = frame->datasize;          data = ZMQ_FRAME_DATA(frame); -        global_stats->notifyPeakLevels(m_name, frame->audiolevel_left, +        m_stats.notifyPeakLevels(frame->audiolevel_left,                  frame->audiolevel_right);      } diff --git a/src/dabInputZmq.h b/src/dabInputZmq.h index 1597298..2ba846b 100644 --- a/src/dabInputZmq.h +++ b/src/dabInputZmq.h @@ -167,6 +167,7 @@ class DabInputZmqBase : public DabInputBase, public RemoteControllable {              m_bitrate(0),              m_enable_input(true),              m_config(config), +            m_stats(m_name),              m_prebuf_current(0) {                  RC_ADD_PARAMETER(enable,                          "If the input is enabled. Set to zero to empty the buffer."); @@ -231,6 +232,8 @@ class DabInputZmqBase : public DabInputBase, public RemoteControllable {          std::string m_inputUri; +        InputStat m_stats; +      private:          int m_prebuf_current;  };  | 
