diff options
| author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2019-09-18 11:49:11 +0200 | 
|---|---|---|
| committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2019-09-18 11:49:11 +0200 | 
| commit | 015427d9e74f34dc7d0f7fbad4ad1eaad6537cce (patch) | |
| tree | 01ae9249194fbff234e32b59c07f7894aef16878 /src | |
| parent | 9c2e691744f96ae7ace8b82385b080ee9d858906 (diff) | |
| download | dabmux-015427d9e74f34dc7d0f7fbad4ad1eaad6537cce.tar.gz dabmux-015427d9e74f34dc7d0f7fbad4ad1eaad6537cce.tar.bz2 dabmux-015427d9e74f34dc7d0f7fbad4ad1eaad6537cce.zip | |
EDI in: add audio levels metadata and source version
Diffstat (limited to 'src')
| -rw-r--r-- | src/ConfigParser.cpp | 2 | ||||
| -rw-r--r-- | src/ManagementServer.cpp | 20 | ||||
| -rw-r--r-- | src/ManagementServer.h | 4 | ||||
| -rw-r--r-- | src/input/Edi.cpp | 74 | ||||
| -rw-r--r-- | src/input/Edi.h | 4 | 
5 files changed, 95 insertions, 9 deletions
| diff --git a/src/ConfigParser.cpp b/src/ConfigParser.cpp index b1e785a..2fb375b 100644 --- a/src/ConfigParser.cpp +++ b/src/ConfigParser.cpp @@ -956,7 +956,7 @@ static void setup_subchannel_from_ptree(shared_ptr<DabSubchannel>& subchan,              }          }          else if (proto == "edi") { -            subchan->input = make_shared<Inputs::Edi>(); +            subchan->input = make_shared<Inputs::Edi>(subchanuid);          }          else if (proto == "stp") {              subchan->input = make_shared<Inputs::Sti_d_Rtp>(); diff --git a/src/ManagementServer.cpp b/src/ManagementServer.cpp index 201fc7b..783a40b 100644 --- a/src/ManagementServer.cpp +++ b/src/ManagementServer.cpp @@ -485,6 +485,14 @@ void InputStat::notifyOverrun(void)      }  } +void InputStat::notifyVersion(const std::string& version, uint32_t uptime_s) +{ +    unique_lock<mutex> lock(m_mutex); + +    m_version = version; +    m_uptime_s = uptime_s; +} +  std::string InputStat::encodeValuesJSON()  {      std::ostringstream ss; @@ -548,6 +556,13 @@ std::string InputStat::encodeValuesJSON()          return dB;      }; +    auto version = m_version; +    size_t pos = 0; +    while ((pos = version.find("\"", pos)) != std::string::npos) { +         version.replace(pos, 1, "\\\""); +         pos++; +    } +      ss <<      "{ \"inputstat\" : {"          "\"min_fill\": " << min_fill_buffer << ", " @@ -557,7 +572,10 @@ std::string InputStat::encodeValuesJSON()          "\"peak_left_slow\": " << to_dB(peak_left) << ", "          "\"peak_right_slow\": " << to_dB(peak_right) << ", "          "\"num_underruns\": " << m_num_underruns << ", " -        "\"num_overruns\": " << m_num_overruns << ", "; +        "\"num_overruns\": " << m_num_overruns << ", " +        "\"version\": \"" << version << "\", " +        "\"uptime\": " << m_uptime_s << ", " +        ;      ss << "\"state\": "; diff --git a/src/ManagementServer.h b/src/ManagementServer.h index 18af48c..5b52957 100644 --- a/src/ManagementServer.h +++ b/src/ManagementServer.h @@ -100,6 +100,7 @@ class InputStat          void notifyPeakLevels(int peak_left, int peak_right);          void notifyUnderrun(void);          void notifyOverrun(void); +        void notifyVersion(const std::string& version, uint32_t uptime_s);          std::string encodeValuesJSON(void);          input_state_t determineState(void); @@ -131,6 +132,9 @@ class InputStat          size_t m_short_window_length = 0; +        std::string m_version; +        uint32_t m_uptime_s = 0; +          /************* STATE ***************/          /* Variables used for determining the input state */          int m_glitch_counter = 0; // saturating counter diff --git a/src/input/Edi.cpp b/src/input/Edi.cpp index 247c16f..f6d4c07 100644 --- a/src/input/Edi.cpp +++ b/src/input/Edi.cpp @@ -52,14 +52,18 @@ constexpr size_t MAX_FRAMES_QUEUED = 1000;  constexpr size_t MAX_FRAMES_QUEUED_PREBUFFERING = 500;  /* When not using timestamping, how many frames to prebuffer. - * TODO should be configurable as ZMQ. - */ + * TODO should be configurable as ZMQ.  */  constexpr size_t NUM_FRAMES_PREBUFFERING = 10; +/* Consider the buffer to be full on the receive side if it's above the overrun threshold. */ +constexpr size_t MAX_FRAMES_OVERRUN = 900; -Edi::Edi() : + +Edi::Edi(const std::string& name) :      m_tcp_receive_server(TCP_BLOCKSIZE),      m_sti_writer(), -    m_sti_decoder(m_sti_writer, VERBOSE) +    m_sti_decoder(m_sti_writer, VERBOSE), +    m_name(name), +    m_stats(name)  { }  Edi::~Edi() { @@ -99,7 +103,7 @@ void Edi::open(const std::string& name)          throw runtime_error("Cannot parse EDI input URI");      } -    m_name = name; +    m_stats.registerAtServer();      m_running = true;      m_thread = std::thread(&Edi::m_run, this); @@ -107,6 +111,9 @@ void Edi::open(const std::string& name)  size_t Edi::readFrame(uint8_t *buffer, size_t size)  { +    // Save stats data in bytes, not in frames +    m_stats.notifyBuffer(m_frames.size() * size); +      EdiDecoder::sti_frame_t sti;      if (m_is_prebuffering) {          m_is_prebuffering = m_frames.size() < NUM_FRAMES_PREBUFFERING; @@ -117,6 +124,7 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size)          return 0;      }      else if (not m_pending_sti_frame.frame.empty()) { +        // Can only happen when switching from timestamp-based buffer management!          if (m_pending_sti_frame.frame.size() != size) {              etiLog.level(debug) << "EDI input " << m_name << " size mismatch: " <<                  m_pending_sti_frame.frame.size() << " received, " << size << " requested"; @@ -124,6 +132,13 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size)              return 0;          }          else { +            if (not m_pending_sti_frame.version_data.version.empty()) { +                m_stats.notifyVersion( +                        m_pending_sti_frame.version_data.version, +                        m_pending_sti_frame.version_data.uptime_s); +            } +            m_stats.notifyPeakLevels(m_pending_sti_frame.audio_levels.left, m_pending_sti_frame.audio_levels.right); +              copy(m_pending_sti_frame.frame.begin(),                      m_pending_sti_frame.frame.end(),                      buffer); @@ -137,6 +152,28 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size)              return 0;          }          else if (sti.frame.size() == size) { +            // Steady-state when everything works well +            if (m_frames.size() > MAX_FRAMES_OVERRUN) { +                m_stats.notifyOverrun(); + +                /* If the buffer is too full, we drop as many frames as needed +                 * to get down to the prebuffering size. We would like to have our buffer +                 * filled to the prebuffering length. */ +                size_t over_max = m_frames.size() - NUM_FRAMES_PREBUFFERING; + +                while (over_max--) { +                    EdiDecoder::sti_frame_t discard; +                    m_frames.try_pop(discard); +                } +            } + +            if (not sti.version_data.version.empty()) { +                m_stats.notifyVersion( +                        sti.version_data.version, +                        sti.version_data.uptime_s); +            } +            m_stats.notifyPeakLevels(sti.audio_levels.left, sti.audio_levels.right); +              copy(sti.frame.cbegin(), sti.frame.cend(), buffer);              return size;          } @@ -151,6 +188,7 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size)          memset(buffer, 0, size * sizeof(*buffer));          m_is_prebuffering = true;          etiLog.level(info) << "EDI input " << m_name << " re-enabling pre-buffering"; +        m_stats.notifyUnderrun();          return 0;      }  } @@ -161,6 +199,8 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utc          m_frames.try_pop(m_pending_sti_frame);      } +    m_stats.notifyBuffer(m_frames.size() * size); +      if (m_is_prebuffering) {          if (m_pending_sti_frame.frame.empty()) {              memset(buffer, 0, size); @@ -178,6 +218,15 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utc                      m_is_prebuffering = false;                      etiLog.level(warn) << "EDI input " << m_name <<                          " valid timestamp, pre-buffering complete"; + +                    if (not m_pending_sti_frame.version_data.version.empty()) { +                        m_stats.notifyVersion( +                                m_pending_sti_frame.version_data.version, +                                m_pending_sti_frame.version_data.uptime_s); +                    } + +                    m_stats.notifyPeakLevels(m_pending_sti_frame.audio_levels.left, +                            m_pending_sti_frame.audio_levels.right);                      copy(m_pending_sti_frame.frame.cbegin(), m_pending_sti_frame.frame.cend(), buffer);                      m_pending_sti_frame.frame.clear();                      return size; @@ -188,6 +237,7 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utc                      if (m_frames.size() >= MAX_FRAMES_QUEUED_PREBUFFERING) {                          m_pending_sti_frame.frame.clear();                      } +                    m_stats.notifyUnderrun();                      memset(buffer, 0, size);                      return 0;                  } @@ -215,12 +265,15 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utc              etiLog.level(warn) << "EDI input " << m_name <<                  " empty, re-enabling pre-buffering";              memset(buffer, 0, size); +            m_stats.notifyUnderrun(); +            m_is_prebuffering = true;              return 0;          }          else if (not sti_frame.timestamp.valid()) {              etiLog.level(warn) << "EDI input " << m_name << -                " invalid timestamp, re-enabling pre-buffering"; +                " invalid timestamp, ignoring";              memset(buffer, 0, size); +            m_stats.notifyUnderrun();              return 0;          }          else { @@ -228,6 +281,7 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utc              const double offset = m_pending_sti_frame.timestamp.diff_ms(ts_req);              if (offset > 24e-3) { +                m_stats.notifyUnderrun();                  m_is_prebuffering = true;                  etiLog.level(warn) << "EDI input " << m_name <<                      " timestamp out of bounds, re-enabling pre-buffering"; @@ -235,6 +289,14 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utc                  return 0;              }              else { +                if (not sti_frame.version_data.version.empty()) { +                    m_stats.notifyVersion( +                            sti_frame.version_data.version, +                            sti_frame.version_data.uptime_s); +                } + +                m_stats.notifyPeakLevels(sti_frame.audio_levels.left, +                        sti_frame.audio_levels.right);                  copy(sti_frame.frame.cbegin(), sti_frame.frame.cend(), buffer);                  return size;              } diff --git a/src/input/Edi.h b/src/input/Edi.h index 8f270d0..56bbf65 100644 --- a/src/input/Edi.h +++ b/src/input/Edi.h @@ -36,6 +36,7 @@  #include "edi/STIDecoder.hpp"  #include "edi/STIWriter.hpp"  #include "ThreadsafeQueue.h" +#include "ManagementServer.h"  namespace Inputs { @@ -47,7 +48,7 @@ namespace Inputs {   */  class Edi : public InputBase {      public: -        Edi(); +        Edi(const std::string& name);          Edi(const Edi&) = delete;          Edi& operator=(const Edi&) = delete;          ~Edi(); @@ -81,6 +82,7 @@ class Edi : public InputBase {          bool m_is_prebuffering = true;          std::string m_name; +        InputStat m_stats;  };  }; | 
