From b4d788a065fb3338df26c44a94d804c194af4955 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Mon, 11 May 2020 21:04:18 +0200 Subject: Update common EDI input and adapt to API changes --- lib/Socket.cpp | 7 +++++- lib/Socket.h | 4 ++-- lib/edi/ETIDecoder.cpp | 37 +++++++++++++++++++++++------- lib/edi/ETIDecoder.hpp | 28 ++++++++++++++++------- lib/edi/common.cpp | 62 +++++++++++++++++++++++++++++++++----------------- lib/edi/common.hpp | 26 +++++++++++++++++---- src/EtiReader.cpp | 4 ++-- src/EtiReader.h | 30 ++++++++++++------------ 8 files changed, 137 insertions(+), 61 deletions(-) diff --git a/lib/Socket.cpp b/lib/Socket.cpp index 159de7e..bc1b179 100644 --- a/lib/Socket.cpp +++ b/lib/Socket.cpp @@ -2,7 +2,7 @@ Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2019 + Copyright (C) 2020 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org @@ -932,6 +932,11 @@ void TCPSendClient::sendall(const std::vector& buffer) } m_queue.push(buffer); + + if (m_queue.size() > MAX_QUEUE_SIZE) { + vector discard; + m_queue.try_pop(discard); + } } void TCPSendClient::process() diff --git a/lib/Socket.h b/lib/Socket.h index 84def40..8c6f8a9 100644 --- a/lib/Socket.h +++ b/lib/Socket.h @@ -2,7 +2,7 @@ Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2019 + Copyright (C) 2020 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org @@ -311,7 +311,7 @@ class TCPSendClient { bool m_is_connected = false; TCPSocket m_sock; - static constexpr size_t MAX_QUEUE_SIZE = 1024; + static constexpr size_t MAX_QUEUE_SIZE = 512; ThreadsafeQueue > m_queue; std::atomic m_running; std::string m_exception_data; diff --git a/lib/edi/ETIDecoder.cpp b/lib/edi/ETIDecoder.cpp index 1fa9c3c..88a7333 100644 --- a/lib/edi/ETIDecoder.cpp +++ b/lib/edi/ETIDecoder.cpp @@ -1,5 +1,5 @@ /* - Copyright (C) 2019 + Copyright (C) 2020 Matthias P. Braendli, matthias.braendli@mpb.li http://opendigitalradio.org @@ -30,9 +30,9 @@ namespace EdiDecoder { using namespace std; -ETIDecoder::ETIDecoder(ETIDataCollector& data_collector, bool verbose) : +ETIDecoder::ETIDecoder(ETIDataCollector& data_collector) : m_data_collector(data_collector), - m_dispatcher(std::bind(&ETIDecoder::packet_completed, this), verbose) + m_dispatcher(std::bind(&ETIDecoder::packet_completed, this)) { using std::placeholders::_1; using std::placeholders::_2; @@ -44,6 +44,12 @@ ETIDecoder::ETIDecoder(ETIDataCollector& data_collector, bool verbose) : std::bind(&ETIDecoder::decode_estn, this, _1, _2)); m_dispatcher.register_tag("*dmy", std::bind(&ETIDecoder::decode_stardmy, this, _1, _2)); + m_dispatcher.register_tagpacket_handler(std::bind(&ETIDecoder::decode_tagpacket, this, _1)); +} + +void ETIDecoder::set_verbose(bool verbose) +{ + m_dispatcher.set_verbose(verbose); } void ETIDecoder::push_bytes(const vector &buf) @@ -63,7 +69,7 @@ void ETIDecoder::setMaxDelay(int num_af_packets) #define AFPACKET_HEADER_LEN 10 // includes SYNC -bool ETIDecoder::decode_starptr(const vector &value, uint16_t) +bool ETIDecoder::decode_starptr(const std::vector& value, const tag_name_t& n) { if (value.size() != 0x40 / 8) { etiLog.log(warn, "Incorrect length %02lx for *PTR", value.size()); @@ -83,7 +89,7 @@ bool ETIDecoder::decode_starptr(const vector &value, uint16_t) return true; } -bool ETIDecoder::decode_deti(const vector &value, uint16_t) +bool ETIDecoder::decode_deti(const std::vector& value, const tag_name_t& n) { /* uint16_t detiHeader = fct | (fcth << 8) | (rfudf << 13) | (ficf << 14) | (atstf << 15); @@ -145,6 +151,8 @@ bool ETIDecoder::decode_deti(const vector &value, uint16_t) i += 4; m_data_collector.update_edi_time(utco, seconds); + m_received_tagpacket.timestamp.utco = utco; + m_received_tagpacket.timestamp.seconds = seconds; fc.tsta = read_24b(value.begin() + i); i += 3; @@ -152,8 +160,12 @@ bool ETIDecoder::decode_deti(const vector &value, uint16_t) else { // Null timestamp, ETSI ETS 300 799, C.2.2 fc.tsta = 0xFFFFFF; + m_received_tagpacket.timestamp.utco = 0; + m_received_tagpacket.timestamp.seconds = 0; } + m_received_tagpacket.timestamp.tsta = fc.tsta; + if (fc.ficf) { vector fic(fic_length); @@ -183,12 +195,13 @@ bool ETIDecoder::decode_deti(const vector &value, uint16_t) return true; } -bool ETIDecoder::decode_estn(const vector &value, uint16_t n) +bool ETIDecoder::decode_estn(const std::vector& value, const tag_name_t& name) { uint32_t sstc = read_24b(value.begin()); eti_stc_data stc; + const uint8_t n = name[3]; stc.stream_index = n - 1; // n is 1-indexed stc.scid = (sstc >> 18) & 0x3F; stc.sad = (sstc >> 8) & 0x3FF; @@ -207,14 +220,22 @@ bool ETIDecoder::decode_estn(const vector &value, uint16_t n) return true; } -bool ETIDecoder::decode_stardmy(const vector& /*value*/, uint16_t) +bool ETIDecoder::decode_stardmy(const std::vector&, const tag_name_t&) +{ + return true; +} + +bool ETIDecoder::decode_tagpacket(const std::vector& value) { + m_received_tagpacket.tagpacket = value; return true; } void ETIDecoder::packet_completed() { - m_data_collector.assemble(); + ReceivedTagPacket tp; + swap(tp, m_received_tagpacket); + m_data_collector.assemble(move(tp)); } } diff --git a/lib/edi/ETIDecoder.hpp b/lib/edi/ETIDecoder.hpp index f5d0b81..ffa9037 100644 --- a/lib/edi/ETIDecoder.hpp +++ b/lib/edi/ETIDecoder.hpp @@ -1,5 +1,5 @@ /* - Copyright (C) 2019 + Copyright (C) 2020 Matthias P. Braendli, matthias.braendli@mpb.li http://opendigitalradio.org @@ -57,6 +57,12 @@ struct eti_stc_data { uint16_t stl(void) const { return mst.size() / 8; } }; +struct ReceivedTagPacket { + std::vector tagpacket; + frame_timestamp_t timestamp; +}; + + /* A class that receives multiplex data must implement the interface described * in the ETIDataCollector. This can be e.g. a converter to ETI, or something that * prepares data structures for a modulator. @@ -87,8 +93,9 @@ class ETIDataCollector { virtual void add_subchannel(eti_stc_data&& stc) = 0; - // Tell the ETIWriter that the AFPacket is complete - virtual void assemble(void) = 0; + // Tell the consumer that the AFPacket is complete, and include + // the raw received TAGs + virtual void assemble(ReceivedTagPacket&& tagpacket) = 0; }; /* The ETIDecoder takes care of decoding the EDI TAGs related to the transport @@ -99,7 +106,9 @@ class ETIDataCollector { */ class ETIDecoder { public: - ETIDecoder(ETIDataCollector& data_collector, bool verbose); + ETIDecoder(ETIDataCollector& data_collector); + + void set_verbose(bool verbose); /* Push bytes into the decoder. The buf can contain more * than a single packet. This is useful when reading from streams @@ -118,16 +127,19 @@ class ETIDecoder { void setMaxDelay(int num_af_packets); private: - bool decode_starptr(const std::vector &value, uint16_t); - bool decode_deti(const std::vector &value, uint16_t); - bool decode_estn(const std::vector &value, uint16_t n); - bool decode_stardmy(const std::vector &value, uint16_t); + bool decode_starptr(const std::vector& value, const tag_name_t& n); + bool decode_deti(const std::vector& value, const tag_name_t& n); + bool decode_estn(const std::vector& value, const tag_name_t& n); + bool decode_stardmy(const std::vector& value, const tag_name_t& n); + + bool decode_tagpacket(const std::vector& value); void packet_completed(); ETIDataCollector& m_data_collector; TagDispatcher m_dispatcher; + ReceivedTagPacket m_received_tagpacket; }; } diff --git a/lib/edi/common.cpp b/lib/edi/common.cpp index 470b3ba..3005802 100644 --- a/lib/edi/common.cpp +++ b/lib/edi/common.cpp @@ -1,5 +1,5 @@ /* - Copyright (C) 2019 + Copyright (C) 2020 Matthias P. Braendli, matthias.braendli@mpb.li http://opendigitalradio.org @@ -26,6 +26,7 @@ #include #include #include +#include namespace EdiDecoder { @@ -66,7 +67,7 @@ time_t frame_timestamp_t::to_unix_epoch() const return 946684800 + seconds - utco; } -double frame_timestamp_t::diff_ms(const frame_timestamp_t& other) const +double frame_timestamp_t::diff_s(const frame_timestamp_t& other) const { const double lhs = (double)seconds + (tsta / 16384000.0); const double rhs = (double)other.seconds + (other.tsta / 16384000.0); @@ -111,10 +112,30 @@ std::chrono::system_clock::time_point frame_timestamp_t::to_system_clock() const return ts; } +std::string tag_name_to_human_readable(const tag_name_t& name) +{ + std::string s; + for (const uint8_t c : name) { + if (isprint(c)) { + s += (char)c; + } + else { + char escaped[5]; + snprintf(escaped, 5, "\\x%02x", c); + s += escaped; + } + } + return s; +} TagDispatcher::TagDispatcher( - std::function&& af_packet_completed, bool verbose) : - m_af_packet_completed(move(af_packet_completed)) + std::function&& af_packet_completed) : + m_af_packet_completed(move(af_packet_completed)), + m_tagpacket_handler([](const std::vector& ignore){}) +{ +} + +void TagDispatcher::set_verbose(bool verbose) { m_pft.setVerbose(verbose); } @@ -295,6 +316,11 @@ void TagDispatcher::register_tag(const std::string& tag, tag_handler&& h) m_handlers[tag] = move(h); } +void TagDispatcher::register_tagpacket_handler(tagpacket_handler&& h) +{ + m_tagpacket_handler = move(h); +} + bool TagDispatcher::decode_tagpacket(const vector &payload) { @@ -326,31 +352,23 @@ bool TagDispatcher::decode_tagpacket(const vector &payload) break; } + const array tag_name({ + (uint8_t)tag_sz[0], (uint8_t)tag_sz[1], (uint8_t)tag_sz[2], (uint8_t)tag_sz[3] + }); vector tag_value(taglength); copy( payload.begin() + i+8, payload.begin() + i+8+taglength, tag_value.begin()); - bool tagsuccess = false; + bool tagsuccess = true; bool found = false; for (auto tag_handler : m_handlers) { - if (tag_handler.first.size() == 4 and tag_handler.first == tag) { - found = true; - tagsuccess = tag_handler.second(tag_value, 0); - } - else if (tag_handler.first.size() == 3 and - tag.substr(0, 3) == tag_handler.first) { - found = true; - uint8_t n = tag_sz[3]; - tagsuccess = tag_handler.second(tag_value, n); - } - else if (tag_handler.first.size() == 2 and - tag.substr(0, 2) == tag_handler.first) { + if ( (tag_handler.first.size() == 4 and tag == tag_handler.first) or + (tag_handler.first.size() == 3 and tag.substr(0, 3) == tag_handler.first) or + (tag_handler.first.size() == 2 and tag.substr(0, 2) == tag_handler.first) or + (tag_handler.first.size() == 1 and tag.substr(0, 1) == tag_handler.first)) { found = true; - uint16_t n = 0; - n = (uint16_t)(tag_sz[2]) << 8; - n |= (uint16_t)(tag_sz[3]); - tagsuccess = tag_handler.second(tag_value, n); + tagsuccess &= tag_handler.second(tag_value, tag_name); } } @@ -366,6 +384,8 @@ bool TagDispatcher::decode_tagpacket(const vector &payload) } } + m_tagpacket_handler(payload); + return success; } diff --git a/lib/edi/common.hpp b/lib/edi/common.hpp index 2a9c683..498b28a 100644 --- a/lib/edi/common.hpp +++ b/lib/edi/common.hpp @@ -1,5 +1,5 @@ /* - Copyright (C) 2019 + Copyright (C) 2020 Matthias P. Braendli, matthias.braendli@mpb.li http://opendigitalradio.org @@ -41,7 +41,7 @@ struct frame_timestamp_t { std::time_t to_unix_epoch() const; std::chrono::system_clock::time_point to_system_clock() const; - double diff_ms(const frame_timestamp_t& other) const; + double diff_s(const frame_timestamp_t& other) const; frame_timestamp_t& operator+=(const std::chrono::milliseconds& ms); @@ -55,6 +55,10 @@ struct decode_state_t { size_t num_bytes_consumed; }; +using tag_name_t = std::array; + +std::string tag_name_to_human_readable(const tag_name_t& name); + /* The TagDispatcher takes care of decoding EDI, with or without PFT, and * will call functions when TAGs are encountered. * @@ -63,7 +67,10 @@ struct decode_state_t { */ class TagDispatcher { public: - TagDispatcher(std::function&& af_packet_completed, bool verbose); + TagDispatcher(std::function&& af_packet_completed); + + void set_verbose(bool verbose); + /* Push bytes into the decoder. The buf can contain more * than a single packet. This is useful when reading from streams @@ -81,9 +88,19 @@ class TagDispatcher { */ void setMaxDelay(int num_af_packets); - using tag_handler = std::function, uint16_t)>; + /* Handler function for a tag. The first argument contains the tag value, + * the second argument contains the tag name */ + using tag_handler = std::function&, const tag_name_t&)>; + + /* Register a handler for a tag. If the tag string can be length 0, 1, 2, 3 or 4. + * If is shorter than 4, it will perform a longest match on the tag name. + */ void register_tag(const std::string& tag, tag_handler&& h); + /* The complete tagpacket can also be retrieved */ + using tagpacket_handler = std::function&)>; + void register_tagpacket_handler(tagpacket_handler&& h); + private: decode_state_t decode_afpacket(const std::vector &input_data); bool decode_tagpacket(const std::vector &payload); @@ -93,6 +110,7 @@ class TagDispatcher { std::vector m_input_data; std::map m_handlers; std::function m_af_packet_completed; + tagpacket_handler m_tagpacket_handler; }; // Data carried inside the ODRv EDI TAG diff --git a/src/EtiReader.cpp b/src/EtiReader.cpp index be4ba8e..33194b2 100644 --- a/src/EtiReader.cpp +++ b/src/EtiReader.cpp @@ -475,7 +475,7 @@ void EdiReader::add_subchannel(EdiDecoder::eti_stc_data&& stc) } } -void EdiReader::assemble() +void EdiReader::assemble(EdiDecoder::ReceivedTagPacket&& tagpacket) { if (not m_proto_valid) { throw std::logic_error("Cannot assemble EDI data before protocol"); @@ -640,7 +640,7 @@ bool EdiTransport::rxPacket() EdiInput::EdiInput(double& tist_offset_s, float edi_max_delay_ms) : ediReader(tist_offset_s), - decoder(ediReader, false), + decoder(ediReader), ediTransport(decoder) { if (edi_max_delay_ms > 0.0f) { diff --git a/src/EtiReader.h b/src/EtiReader.h index cda181d..be3dd27 100644 --- a/src/EtiReader.h +++ b/src/EtiReader.h @@ -2,7 +2,7 @@ Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2019 + Copyright (C) 2020 Matthias P. Braendli, matthias.braendli@mpb.li http://opendigitalradio.org @@ -138,11 +138,11 @@ class EdiReader : public EtiSource, public EdiDecoder::ETIDataCollector public: EdiReader(double& tist_offset_s); - virtual unsigned getMode(); - virtual unsigned getFp(); - virtual unsigned getFct(); - virtual bool sourceContainsTimestamp(); - virtual const std::vector > getSubchannels() const; + virtual unsigned getMode() override; + virtual unsigned getFp() override; + virtual unsigned getFct() override; + virtual bool sourceContainsTimestamp() override; + virtual const std::vector > getSubchannels() const override; virtual bool isFrameReady(void); virtual void clearFrame(void); @@ -152,29 +152,29 @@ public: virtual void update_protocol( const std::string& proto, uint16_t major, - uint16_t minor); + uint16_t minor) override; // Update the data for the frame characterisation - virtual void update_fc_data(const EdiDecoder::eti_fc_data& fc_data); + virtual void update_fc_data(const EdiDecoder::eti_fc_data& fc_data) override; - virtual void update_fic(std::vector&& fic); + virtual void update_fic(std::vector&& fic) override; - virtual void update_err(uint8_t err); + virtual void update_err(uint8_t err) override; // In addition to TSTA in ETI, EDI also transports more time // stamp information. virtual void update_edi_time( uint32_t utco, - uint32_t seconds); + uint32_t seconds) override; - virtual void update_mnsc(uint16_t mnsc); + virtual void update_mnsc(uint16_t mnsc) override; - virtual void update_rfu(uint16_t rfu); + virtual void update_rfu(uint16_t rfu) override; - virtual void add_subchannel(EdiDecoder::eti_stc_data&& stc); + virtual void add_subchannel(EdiDecoder::eti_stc_data&& stc) override; // Gets called by the EDI library to tell us that all data for a frame was given to us - virtual void assemble(void); + virtual void assemble(EdiDecoder::ReceivedTagPacket&& tagpacket) override; private: bool m_proto_valid = false; bool m_frameReady = false; -- cgit v1.2.3