diff options
| -rw-r--r-- | lib/edi/STIWriter.cpp | 1 | ||||
| -rw-r--r-- | lib/edi/common.cpp | 25 | ||||
| -rw-r--r-- | lib/edi/common.hpp | 4 | ||||
| -rw-r--r-- | src/input/Edi.cpp | 48 | ||||
| -rw-r--r-- | src/input/Edi.h | 3 | ||||
| -rw-r--r-- | src/utils.cpp | 2 | 
6 files changed, 72 insertions, 11 deletions
| diff --git a/lib/edi/STIWriter.cpp b/lib/edi/STIWriter.cpp index 6964eb1..399922a 100644 --- a/lib/edi/STIWriter.cpp +++ b/lib/edi/STIWriter.cpp @@ -120,6 +120,7 @@ void STIWriter::assemble()      m_stiFrame.frame = m_payload.istd;      m_stiFrame.timestamp.seconds = m_seconds;      m_stiFrame.timestamp.utco = m_utco; +    m_stiFrame.timestamp.tsta = m_management_data.tsta;  }  sti_frame_t STIWriter::getFrame() diff --git a/lib/edi/common.cpp b/lib/edi/common.cpp index bc0fa1b..b4b0c79 100644 --- a/lib/edi/common.cpp +++ b/lib/edi/common.cpp @@ -25,18 +25,31 @@  #include <iomanip>  #include <sstream>  #include <cassert> +#include <cmath>  #include <cstdio>  namespace EdiDecoder {  using namespace std; +bool frame_timestamp_t::valid() const +{ +    return tsta != 0xFFFFFF; +} +  string frame_timestamp_t::to_string() const  {      const time_t seconds_in_unix_epoch = to_unix_epoch();      stringstream ss; -    ss << "Timestamp: " << std::put_time(std::gmtime(&seconds_in_unix_epoch), "%c %Z"); +    if (valid()) { +        ss << "Timestamp: "; +    } +    else { +        ss << "Timestamp not valid: "; +    } +    ss << std::put_time(std::gmtime(&seconds_in_unix_epoch), "%c %Z") << +        " + " << ((double)tsta / 16384000.0);      return ss.str();  } @@ -48,6 +61,16 @@ time_t frame_timestamp_t::to_unix_epoch() const      return 946684800 + seconds - utco;  } +std::chrono::system_clock::time_point frame_timestamp_t::to_system_clock() const +{ +    auto ts = chrono::system_clock::from_time_t(to_unix_epoch()); + +    // PPS offset in seconds = tsta / 16384000 +    ts += chrono::nanoseconds(std::lrint(tsta / 0.016384)); + +    return ts; +} +  TagDispatcher::TagDispatcher(          std::function<void()>&& af_packet_completed, bool verbose) : diff --git a/lib/edi/common.hpp b/lib/edi/common.hpp index 1433004..887bc3d 100644 --- a/lib/edi/common.hpp +++ b/lib/edi/common.hpp @@ -23,6 +23,7 @@  #include "PFT.hpp"  #include <functional>  #include <map> +#include <chrono>  #include <string>  #include <vector>  #include <cstddef> @@ -33,9 +34,12 @@ namespace EdiDecoder {  struct frame_timestamp_t {      uint32_t seconds = 0;      uint32_t utco = 0; +    uint32_t tsta = 0; // According to EN 300 797 Annex B +    bool valid() const;      std::string to_string() const;      time_t to_unix_epoch() const; +    std::chrono::system_clock::time_point to_system_clock() const;  };  struct decode_state_t { diff --git a/src/input/Edi.cpp b/src/input/Edi.cpp index 8aee296..765a355 100644 --- a/src/input/Edi.cpp +++ b/src/input/Edi.cpp @@ -27,6 +27,7 @@  #include "input/Edi.h"  #include <regex> +#include <chrono>  #include <stdexcept>  #include <sstream>  #include <cstring> @@ -41,7 +42,7 @@ namespace Inputs {  constexpr bool VERBOSE = false;  constexpr size_t TCP_BLOCKSIZE = 2048; -constexpr size_t MAX_FRAMES_QUEUED = 10; +constexpr size_t MAX_FRAMES_QUEUED = 1000;  Edi::Edi() :      m_tcp_receive_server(TCP_BLOCKSIZE), @@ -96,23 +97,53 @@ int Edi::open(const std::string& name)  int Edi::readFrame(uint8_t* buffer, size_t size)  { -    vector<uint8_t> frame; +    if (m_pending_sti_frame.frame.empty()) { +        m_frames.try_pop(m_pending_sti_frame); +    } + +    if (not m_pending_sti_frame.frame.empty()) { +        if (m_pending_sti_frame.frame.size() != size) { +            etiLog.level(debug) << "EDI input " << m_name << " size mismatch: " << +                m_pending_sti_frame.frame.size() << " received, " << size << " requested"; +            memset(buffer, 0, size * sizeof(*buffer)); +        } +        else { +            const auto now = chrono::system_clock::now(); + +            if (m_pending_sti_frame.timestamp.to_system_clock() <= now) { +                etiLog.level(debug) << "EDI input take frame with TS " << +                    m_pending_sti_frame.timestamp.to_string() << " queue size " << m_frames.size(); + +                std::copy(m_pending_sti_frame.frame.cbegin(), m_pending_sti_frame.frame.cend(), buffer); +                m_pending_sti_frame.frame.clear(); +            } +            else { +                etiLog.level(debug) << "EDI input skip frame with TS " << +                    m_pending_sti_frame.timestamp.to_string() << " queue size " << m_frames.size(); +            } +        } +    } + +    return size; + +#if 0 +    EdiDecoder::sti_frame_t sti;      if (m_is_prebuffering) {          m_is_prebuffering = m_frames.size() < 10;          if (not m_is_prebuffering) {              etiLog.level(info) << "EDI input " << m_name << " pre-buffering complete.";          }      } -    else if (m_frames.try_pop(frame)) { -        if (frame.size() == 0) { +    else if (m_frames.try_pop(sti)) { +        if (sti.frame.size() == 0) {              etiLog.level(debug) << "EDI input " << m_name << " empty frame";              return 0;          } -        else if (frame.size() == size) { -            std::copy(frame.cbegin(), frame.cend(), buffer); +        else if (sti.frame.size() == size) { +            std::copy(sti.frame.cbegin(), sti.frame.cend(), buffer);          }          else { -            etiLog.level(debug) << "EDI input " << m_name << " size mismatch: " << frame.size() << +            etiLog.level(debug) << "EDI input " << m_name << " size mismatch: " << sti.frame.size() <<                  " received, " << size << " requested";              memset(buffer, 0, size * sizeof(*buffer));          } @@ -123,6 +154,7 @@ int Edi::readFrame(uint8_t* buffer, size_t size)          etiLog.level(info) << "EDI input " << m_name << " re-enabling pre-buffering";      }      return size; +#endif  }  void Edi::m_run() @@ -159,7 +191,7 @@ void Edi::m_run()          const auto sti = m_sti_writer.getFrame();          if (not sti.frame.empty()) { -            m_frames.push_wait_if_full(move(sti.frame), MAX_FRAMES_QUEUED); +            m_frames.push_wait_if_full(move(sti), MAX_FRAMES_QUEUED);              work_done = true;          } diff --git a/src/input/Edi.h b/src/input/Edi.h index 7b3dc04..66ff682 100644 --- a/src/input/Edi.h +++ b/src/input/Edi.h @@ -71,7 +71,8 @@ class Edi : public InputBase {          EdiDecoder::STIDecoder m_sti_decoder;          std::thread m_thread;          std::atomic<bool> m_running = ATOMIC_VAR_INIT(false); -        ThreadsafeQueue<std::vector<uint8_t> > m_frames; +        ThreadsafeQueue<EdiDecoder::sti_frame_t> m_frames; +        EdiDecoder::sti_frame_t m_pending_sti_frame;          bool m_is_prebuffering = true; diff --git a/src/utils.cpp b/src/utils.cpp index 721c145..3e3e86e 100644 --- a/src/utils.cpp +++ b/src/utils.cpp @@ -328,7 +328,7 @@ void printSubchannels(const vec_sp_subchannel& subchannels)          etiLog.level(info) << "   URI:     " << subchannel->inputUri;          switch (subchannel->type) {              case subchannel_type_t::DABAudio: -                etiLog.log(info, " type:       DAbAudio"); +                etiLog.log(info, " type:       DABAudio");                  break;              case subchannel_type_t::DABPlusAudio:                  etiLog.log(info, " type:       DABPlusAudio"); | 
