diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/ConfigParser.cpp | 8 | ||||
| -rw-r--r-- | src/DabMultiplexer.cpp | 12 | ||||
| -rw-r--r-- | src/MuxElements.cpp | 11 | ||||
| -rw-r--r-- | src/MuxElements.h | 12 | ||||
| -rw-r--r-- | src/input/Edi.cpp | 91 | ||||
| -rw-r--r-- | src/input/Edi.h | 25 | ||||
| -rw-r--r-- | src/input/inputs.h | 15 | 
7 files changed, 130 insertions, 44 deletions
| diff --git a/src/ConfigParser.cpp b/src/ConfigParser.cpp index 2fb375b..d45cfef 100644 --- a/src/ConfigParser.cpp +++ b/src/ConfigParser.cpp @@ -956,7 +956,9 @@ static void setup_subchannel_from_ptree(shared_ptr<DabSubchannel>& subchan,              }          }          else if (proto == "edi") { -            subchan->input = make_shared<Inputs::Edi>(subchanuid); +            auto inedi = make_shared<Inputs::Edi>(subchanuid); +            rcs.enrol(inedi.get()); +            subchan->input = inedi;          }          else if (proto == "stp") {              subchan->input = make_shared<Inputs::Sti_d_Rtp>(); @@ -1023,10 +1025,10 @@ static void setup_subchannel_from_ptree(shared_ptr<DabSubchannel>& subchan,      const string bufferManagement = pt.get("buffer-management", "prebuffering");      if (bufferManagement == "prebuffering") { -        subchan->bufferManagement = BufferManagement::Prebuffering; +        subchan->input->setBufferManagement(Inputs::BufferManagement::Prebuffering);      }      else if (bufferManagement == "timestamped") { -        subchan->bufferManagement = BufferManagement::Timestamped; +        subchan->input->setBufferManagement(Inputs::BufferManagement::Timestamped);      }      else {          throw runtime_error("Subchannel with uid " + subchanuid + " has invalid buffer-management !"); diff --git a/src/DabMultiplexer.cpp b/src/DabMultiplexer.cpp index 549850a..9bd3c3f 100644 --- a/src/DabMultiplexer.cpp +++ b/src/DabMultiplexer.cpp @@ -592,17 +592,9 @@ void DabMultiplexer::mux_frame(std::vector<std::shared_ptr<DabOutput> >& outputs          edi::TagESTn& tag = edi_subchannelToTag[subchannel.get()];          int sizeSubchannel = subchannel->getSizeByte(); -        int result = -1; -        switch (subchannel->bufferManagement) { -            case BufferManagement::Prebuffering: -                result = subchannel->input->readFrame(&etiFrame[index], sizeSubchannel); -                break; -            case BufferManagement::Timestamped: -                // no need to check enableTist because we always increment the timestamp -                result = subchannel->input->readFrame(&etiFrame[index], +        // no need to check enableTist because we always increment the timestamp +        int result = subchannel->readFrame(&etiFrame[index],                          sizeSubchannel, edi_time + m_tist_offset, tai_utc_offset, timestamp); -                break; -        }          if (result < 0) {              etiLog.log(info, diff --git a/src/MuxElements.cpp b/src/MuxElements.cpp index ad1fcd4..81466a8 100644 --- a/src/MuxElements.cpp +++ b/src/MuxElements.cpp @@ -784,6 +784,17 @@ unsigned short DabSubchannel::getSizeDWord() const      return (bitrate * 3) >> 3;  } +size_t DabSubchannel::readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utco, uint32_t tsta) +{ +    switch (input->getBufferManagement()) { +        case Inputs::BufferManagement::Prebuffering: +            return input->readFrame(buffer, size); +        case Inputs::BufferManagement::Timestamped: +            return input->readFrame(buffer, size, seconds, utco, tsta); +    } +    throw logic_error("Unhandled case"); +} +  LinkageSet::LinkageSet(const std::string& name,          uint16_t lsn,          bool active, diff --git a/src/MuxElements.h b/src/MuxElements.h index f691093..0f7e621 100644 --- a/src/MuxElements.h +++ b/src/MuxElements.h @@ -338,14 +338,6 @@ struct dabProtection {      };  }; -enum class BufferManagement { -    // Use a buffer in the input that doesn't consider timestamps -    Prebuffering, - -    // Buffer incoming data until a given timestamp is reached -    Timestamped, -}; -  class DabSubchannel  {  public: @@ -364,10 +356,12 @@ public:      // Calculate subchannel size in number of uint64_t      unsigned short getSizeDWord(void) const; +    // Read from the input, using the correct buffer management +    size_t readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utco, uint32_t tsta); +      std::string uid;      std::string inputUri; -    BufferManagement bufferManagement = BufferManagement::Prebuffering;      std::shared_ptr<Inputs::InputBase> input;      unsigned char id = 0;      subchannel_type_t type = subchannel_type_t::DABAudio; diff --git a/src/input/Edi.cpp b/src/input/Edi.cpp index f6d4c07..98e8e9c 100644 --- a/src/input/Edi.cpp +++ b/src/input/Edi.cpp @@ -44,27 +44,23 @@ namespace Inputs {  constexpr bool VERBOSE = false;  constexpr size_t TCP_BLOCKSIZE = 2048; -/* Absolute max number of frames to be queued, both with and without timestamping */ -constexpr size_t MAX_FRAMES_QUEUED = 1000; - -/* When using timestamping, start discarding the front of the queue once the queue - * is this full. Must be smaller than MAX_FRAMES_QUEUED.  */ -constexpr size_t MAX_FRAMES_QUEUED_PREBUFFERING = 500; - -/* When not using timestamping, how many frames to prebuffer. - * TODO should be configurable as ZMQ.  */ -constexpr size_t NUM_FRAMES_PREBUFFERING = 10; -/* Consider the buffer to be full on the receive side if it's above the overrun threshold. */ -constexpr size_t MAX_FRAMES_OVERRUN = 900; - -  Edi::Edi(const std::string& name) : +    RemoteControllable(name),      m_tcp_receive_server(TCP_BLOCKSIZE),      m_sti_writer(),      m_sti_decoder(m_sti_writer, VERBOSE),      m_name(name),      m_stats(name) -{ } +{ +    RC_ADD_PARAMETER(buffermanagement, +            "Set type of buffer management to use [prebuffering, timestamped]"); + +    RC_ADD_PARAMETER(buffer, +            "Size of the input buffer [24ms frames]"); + +    RC_ADD_PARAMETER(prebuffering, +            "Min buffer level before streaming starts [24ms frames]"); +}  Edi::~Edi() {      m_running = false; @@ -116,7 +112,7 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size)      EdiDecoder::sti_frame_t sti;      if (m_is_prebuffering) { -        m_is_prebuffering = m_frames.size() < NUM_FRAMES_PREBUFFERING; +        m_is_prebuffering = m_frames.size() < m_num_frames_prebuffering;          if (not m_is_prebuffering) {              etiLog.level(info) << "EDI input " << m_name << " pre-buffering complete.";          } @@ -153,13 +149,13 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size)          }          else if (sti.frame.size() == size) {              // Steady-state when everything works well -            if (m_frames.size() > MAX_FRAMES_OVERRUN) { +            if (m_frames.size() > m_max_frames_overrun) {                  m_stats.notifyOverrun();                  /* If the buffer is too full, we drop as many frames as needed                   * to get down to the prebuffering size. We would like to have our buffer                   * filled to the prebuffering length. */ -                size_t over_max = m_frames.size() - NUM_FRAMES_PREBUFFERING; +                size_t over_max = m_frames.size() - m_num_frames_prebuffering;                  while (over_max--) {                      EdiDecoder::sti_frame_t discard; @@ -234,7 +230,7 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utc                  else {                      // Wait more, but erase the front of the frame queue to avoid                      // stalling on one frame with incorrect timestamp -                    if (m_frames.size() >= MAX_FRAMES_QUEUED_PREBUFFERING) { +                    if (m_frames.size() >= m_max_frames_overrun) {                          m_pending_sti_frame.frame.clear();                      }                      m_stats.notifyUnderrun(); @@ -338,7 +334,10 @@ void Edi::m_run()          const auto sti = m_sti_writer.getFrame();          if (not sti.frame.empty()) { -            m_frames.push_wait_if_full(move(sti), MAX_FRAMES_QUEUED); +            // We should not wait here, because we want the complete input buffering +            // happening inside m_frames. Using the blocking function is only a protection +            // against runaway memory usage if something goes wrong in the consumer. +            m_frames.push_wait_if_full(move(sti), m_max_frames_overrun * 2);              work_done = true;          } @@ -363,4 +362,56 @@ void Edi::close()      m_udp_sock.close();  } + +void Edi::set_parameter(const std::string& parameter, const std::string& value) +{ +    if (parameter == "buffer") { +        size_t new_limit = atol(value.c_str()); +        m_max_frames_overrun = new_limit; +    } +    else if (parameter == "prebuffering") { +        size_t new_limit = atol(value.c_str()); +        m_num_frames_prebuffering = new_limit; +    } +    else if (parameter == "buffermanagement") { +        if (value == "prebuffering") { +            setBufferManagement(Inputs::BufferManagement::Prebuffering); +        } +        else if (value == "timestamped") { +            setBufferManagement(Inputs::BufferManagement::Timestamped); +        } +        else { +            throw ParameterError("Invalid value for '" + parameter + "' in controllable " + get_rc_name()); +        } +    } +    else { +        throw ParameterError("Parameter '" + parameter + "' is not exported by controllable " + get_rc_name()); +    } +} + +const std::string Edi::get_parameter(const std::string& parameter) const +{ +    stringstream ss; +    if (parameter == "buffer") { +        ss << m_max_frames_overrun; +    } +    else if (parameter == "prebuffering") { +        ss << m_num_frames_prebuffering; +    } +    else if (parameter == "buffermanagement") { +        switch (getBufferManagement()) { +            case Inputs::BufferManagement::Prebuffering: +                ss << "prebuffering"; +                break; +            case Inputs::BufferManagement::Timestamped: +                ss << "Timestamped"; +                break; +        } +    } +    else { +        throw ParameterError("Parameter '" + parameter + "' is not exported by controllable " + get_rc_name()); +    } +    return ss.str(); +} +  } diff --git a/src/input/Edi.h b/src/input/Edi.h index 56bbf65..0a44139 100644 --- a/src/input/Edi.h +++ b/src/input/Edi.h @@ -46,7 +46,7 @@ namespace Inputs {   *   * This way, the EDI decoding happens in a separate thread.   */ -class Edi : public InputBase { +class Edi : public InputBase, public RemoteControllable {      public:          Edi(const std::string& name);          Edi(const Edi&) = delete; @@ -59,6 +59,10 @@ class Edi : public InputBase {          virtual int setBitrate(int bitrate);          virtual void close(); +        /* Remote control */ +        virtual void set_parameter(const std::string& parameter, const std::string& value); +        virtual const std::string get_parameter(const std::string& parameter) const; +      protected:          void m_run(); @@ -75,12 +79,29 @@ class Edi : public InputBase {          std::atomic<bool> m_running = ATOMIC_VAR_INIT(false);          ThreadsafeQueue<EdiDecoder::sti_frame_t> m_frames; +        std::mutex m_rc_params_mutex; +        // InputBase defines bufferManagement, which must also be guarded by that mutex +          // Used in timestamp-based buffer management          EdiDecoder::sti_frame_t m_pending_sti_frame; -        // Used in prebuffering-based buffer management +        // State variable used in prebuffering-based buffer management          bool m_is_prebuffering = true; +        /* When using prebuffering, consider the buffer to be full on the +         * receive side if it's above the overrun threshold. +         * +         * When using timestamping, start discarding the front of the queue once the queue +         * is this full. Must be smaller than m_max_frames_queued. +         * +         * Parameter 'buffer' inside RC. */ +        std::atomic<size_t> m_max_frames_overrun = ATOMIC_VAR_INIT(1000); + +        /* When not using timestamping, how many frames to prebuffer. +         * Parameter 'prebuffering' inside RC. */ +        std::atomic<size_t> m_num_frames_prebuffering = ATOMIC_VAR_INIT(10); + +          std::string m_name;          InputStat m_stats;  }; diff --git a/src/input/inputs.h b/src/input/inputs.h index b99a88f..5d4fc60 100644 --- a/src/input/inputs.h +++ b/src/input/inputs.h @@ -35,6 +35,15 @@  namespace Inputs { +enum class BufferManagement { +    // Use a buffer in the input that doesn't consider timestamps +    Prebuffering, + +    // Buffer incoming data until a given timestamp is reached +    Timestamped, +}; + +  /* New input object base */  class InputBase {      public: @@ -73,8 +82,14 @@ class InputBase {          virtual void close() = 0;          virtual ~InputBase() {} + +        void setBufferManagement(BufferManagement bm) { m_bufferManagement = bm; } +        BufferManagement getBufferManagement() const { return m_bufferManagement; } +      protected:          InputBase() {} + +        std::atomic<BufferManagement> m_bufferManagement = ATOMIC_VAR_INIT(BufferManagement::Prebuffering);  };  }; | 
