diff options
Diffstat (limited to 'lib/edi')
-rw-r--r-- | lib/edi/PFT.cpp | 63 | ||||
-rw-r--r-- | lib/edi/PFT.hpp | 19 | ||||
-rw-r--r-- | lib/edi/STIDecoder.cpp | 11 | ||||
-rw-r--r-- | lib/edi/STIDecoder.hpp | 2 | ||||
-rw-r--r-- | lib/edi/STIWriter.cpp | 3 | ||||
-rw-r--r-- | lib/edi/STIWriter.hpp | 19 | ||||
-rw-r--r-- | lib/edi/common.cpp | 26 | ||||
-rw-r--r-- | lib/edi/common.hpp | 14 |
8 files changed, 102 insertions, 55 deletions
diff --git a/lib/edi/PFT.cpp b/lib/edi/PFT.cpp index 85d6b63..25f2d1f 100644 --- a/lib/edi/PFT.cpp +++ b/lib/edi/PFT.cpp @@ -1,6 +1,6 @@ /* ------------------------------------------------------------------ * Copyright (C) 2017 AVT GmbH - Fabien Vercasson - * Copyright (C) 2017 Matthias P. Braendli + * Copyright (C) 2021 Matthias P. Braendli * matthias.braendli@mpb.li * * http://opendigitalradio.org @@ -126,7 +126,7 @@ size_t Fragment::loadData(const std::vector<uint8_t> &buf, int received_on_port) // Parse PFT Fragment Header (ETSI TS 102 821 V1.4.1 ch7.1) if (not (buf[0] == 'P' and buf[1] == 'F') ) { - throw invalid_argument("Invalid PFT SYNC bytes"); + throw runtime_error("Invalid PFT SYNC bytes"); } index += 2; // Psync @@ -208,15 +208,30 @@ AFBuilder::AFBuilder(pseq_t Pseq, findex_t Fcount, size_t lifetime) void AFBuilder::pushPFTFrag(const Fragment &frag) { - if (_Pseq != frag.Pseq() or _Fcount != frag.Fcount()) { - throw invalid_argument("Invalid PFT fragment Pseq or Fcount"); + if (_Pseq != frag.Pseq()) { + throw logic_error("Invalid PFT fragment Pseq"); } - const auto Findex = frag.Findex(); - const bool fragment_already_received = _fragments.count(Findex); - if (not fragment_already_received) - { - _fragments[Findex] = frag; + if (_Fcount != frag.Fcount()) { + etiLog.level(warn) << "Discarding fragment with invalid fcount"; + } + else { + const auto Findex = frag.Findex(); + const bool fragment_already_received = _fragments.count(Findex); + + if (not fragment_already_received) { + bool consistent = true; + if (_fragments.size() > 0) { + consistent = frag.checkConsistency(_fragments.cbegin()->second); + } + + if (consistent) { + _fragments[Findex] = frag; + } + else { + etiLog.level(warn) << "Discard fragment"; + } + } } } @@ -246,7 +261,7 @@ bool Fragment::checkConsistency(const Fragment& other) const } -AFBuilder::decode_attempt_result_t AFBuilder::canAttemptToDecode() const +AFBuilder::decode_attempt_result_t AFBuilder::canAttemptToDecode() { if (_fragments.empty()) { return AFBuilder::decode_attempt_result_t::no; @@ -263,7 +278,8 @@ AFBuilder::decode_attempt_result_t AFBuilder::canAttemptToDecode() const const Fragment& frag = pair.second; return first.checkConsistency(frag) and _Pseq == frag.Pseq(); }) ) { - throw invalid_argument("Inconsistent PFT fragments"); + _fragments.clear(); + throw runtime_error("Inconsistent PFT fragments"); } // Calculate the minimum number of fragments necessary to apply FEC. @@ -301,7 +317,7 @@ AFBuilder::decode_attempt_result_t AFBuilder::canAttemptToDecode() const return AFBuilder::decode_attempt_result_t::no; } -std::vector<uint8_t> AFBuilder::extractAF() const +std::vector<uint8_t> AFBuilder::extractAF() { if (not _af_packet.empty()) { return _af_packet; @@ -310,13 +326,12 @@ std::vector<uint8_t> AFBuilder::extractAF() const bool ok = false; if (canAttemptToDecode() != AFBuilder::decode_attempt_result_t::no) { - auto frag_it = _fragments.begin(); if (frag_it->second.Fcount() == _Fcount - 1) { frag_it++; if (frag_it == _fragments.end()) { - throw std::runtime_error("Invalid attempt at extracting AF"); + throw runtime_error("Invalid attempt at extracting AF"); } } @@ -343,12 +358,14 @@ std::vector<uint8_t> AFBuilder::extractAF() const const auto& fragment = _fragments.at(j).payload(); if (j != _Fcount - 1 and fragment.size() != Plen) { + _fragments.clear(); throw runtime_error("Incorrect fragment length " + to_string(fragment.size()) + " " + to_string(Plen)); } if (j == _Fcount - 1 and fragment.size() > Plen) { + _fragments.clear(); throw runtime_error("Incorrect last fragment length " + to_string(fragment.size()) + " " + to_string(Plen)); @@ -453,7 +470,7 @@ std::vector<uint8_t> AFBuilder::extractAF() const return _af_packet; } -std::string AFBuilder::visualise() const +std::string AFBuilder::visualise() { stringstream ss; ss << "|"; @@ -525,7 +542,7 @@ void PFT::pushPFTFrag(const Fragment &fragment) if (m_verbose) { etiLog.log(debug, "Got frag %u:%u, afbuilders: ", fragment.Pseq(), fragment.Findex()); - for (const auto &k : m_afbuilders) { + for (auto &k : m_afbuilders) { const bool isNextPseq = (m_next_pseq == k.first); etiLog.level(debug) << (isNextPseq ? "->" : " ") << k.first << " " << k.second.visualise(); @@ -534,15 +551,17 @@ void PFT::pushPFTFrag(const Fragment &fragment) } -std::vector<uint8_t> PFT::getNextAFPacket() +afpacket_pft_t PFT::getNextAFPacket() { + afpacket_pft_t af; + if (m_afbuilders.count(m_next_pseq) == 0) { if (m_afbuilders.size() > m_max_delay) { m_afbuilders.clear(); etiLog.level(debug) << " Reinit"; } - return {}; + return af; } auto &builder = m_afbuilders.at(m_next_pseq); @@ -555,8 +574,9 @@ std::vector<uint8_t> PFT::getNextAFPacket() if (m_verbose) { etiLog.level(debug) << "Fragment origin stats: " << builder.visualise_fragment_origins(); } + af.pseq = m_next_pseq; + af.af_packet = afpacket; incrementNextPseq(); - return afpacket; } else if (builder.canAttemptToDecode() == dar_t::maybe) { if (builder.lifeTime > 0) { @@ -573,8 +593,9 @@ std::vector<uint8_t> PFT::getNextAFPacket() if (m_verbose) { etiLog.level(debug) << "Fragment origin stats: " << builder.visualise_fragment_origins(); } + af.pseq = m_next_pseq; + af.af_packet = afpacket; incrementNextPseq(); - return afpacket; } } else { @@ -588,7 +609,7 @@ std::vector<uint8_t> PFT::getNextAFPacket() } } - return {}; + return af; } void PFT::setMaxDelay(size_t num_af_packets) diff --git a/lib/edi/PFT.hpp b/lib/edi/PFT.hpp index 08dca45..aa5b9d3 100644 --- a/lib/edi/PFT.hpp +++ b/lib/edi/PFT.hpp @@ -1,6 +1,6 @@ /* ------------------------------------------------------------------ * Copyright (C) 2017 AVT GmbH - Fabien Vercasson - * Copyright (C) 2017 Matthias P. Braendli + * Copyright (C) 2021 Matthias P. Braendli * matthias.braendli@mpb.li * * http://opendigitalradio.org @@ -101,20 +101,20 @@ class AFBuilder void pushPFTFrag(const Fragment &frag); /* Assess if it may be possible to decode this AF packet */ - decode_attempt_result_t canAttemptToDecode() const; + decode_attempt_result_t canAttemptToDecode(); /* Try to build the AF with received fragments. * Apply error correction if necessary (missing packets/CRC errors) * \return an empty vector if building the AF is not possible */ - std::vector<uint8_t> extractAF(void) const; + std::vector<uint8_t> extractAF(); std::pair<findex_t, findex_t> numberOfFragments(void) const { return {_fragments.size(), _Fcount}; } - std::string visualise() const; + std::string visualise(); std::string visualise_fragment_origins() const; @@ -135,6 +135,13 @@ class AFBuilder findex_t _Fcount; }; +struct afpacket_pft_t +{ + // validity of the struct is given by af_packet begin empty or not. + std::vector<uint8_t> af_packet; + pseq_t pseq = 0; +}; + class PFT { public: @@ -145,7 +152,7 @@ class PFT * * \return an empty vector if building the AF is not possible */ - std::vector<uint8_t> getNextAFPacket(void); + afpacket_pft_t getNextAFPacket(); /* Set the maximum delay in number of AF Packets before we * abandon decoding a given pseq. @@ -156,7 +163,7 @@ class PFT void setVerbose(bool enable); private: - void incrementNextPseq(void); + void incrementNextPseq(); pseq_t m_next_pseq; size_t m_max_delay = 10; // in AF packets diff --git a/lib/edi/STIDecoder.cpp b/lib/edi/STIDecoder.cpp index 99f7c11..d387f1e 100644 --- a/lib/edi/STIDecoder.cpp +++ b/lib/edi/STIDecoder.cpp @@ -72,7 +72,7 @@ void STIDecoder::setMaxDelay(int num_af_packets) #define AFPACKET_HEADER_LEN 10 // includes SYNC -bool STIDecoder::decode_starptr(const std::vector<uint8_t>& value, const tag_name_t& n) +bool STIDecoder::decode_starptr(const std::vector<uint8_t>& value, const tag_name_t& /*n*/) { if (value.size() != 0x40 / 8) { etiLog.log(warn, "Incorrect length %02lx for *PTR", value.size()); @@ -92,7 +92,7 @@ bool STIDecoder::decode_starptr(const std::vector<uint8_t>& value, const tag_nam return true; } -bool STIDecoder::decode_dsti(const std::vector<uint8_t>& value, const tag_name_t& n) +bool STIDecoder::decode_dsti(const std::vector<uint8_t>& value, const tag_name_t& /*n*/) { size_t offset = 0; @@ -200,7 +200,7 @@ bool STIDecoder::decode_stardmy(const std::vector<uint8_t>&, const tag_name_t&) return true; } -bool STIDecoder::decode_odraudiolevel(const std::vector<uint8_t>& value, const tag_name_t& n) +bool STIDecoder::decode_odraudiolevel(const std::vector<uint8_t>& value, const tag_name_t& /*n*/) { constexpr size_t expected_length = 2 * sizeof(int16_t); @@ -223,7 +223,7 @@ bool STIDecoder::decode_odraudiolevel(const std::vector<uint8_t>& value, const t return true; } -bool STIDecoder::decode_odrversion(const std::vector<uint8_t>& value, const tag_name_t& n) +bool STIDecoder::decode_odrversion(const std::vector<uint8_t>& value, const tag_name_t& /*n*/) { const auto vd = parse_odr_version_data(value); m_data_collector.update_odr_version(vd); @@ -233,7 +233,8 @@ bool STIDecoder::decode_odrversion(const std::vector<uint8_t>& value, const tag_ void STIDecoder::packet_completed() { - m_data_collector.assemble(); + auto seq = m_dispatcher.get_seq_info(); + m_data_collector.assemble(seq); } } diff --git a/lib/edi/STIDecoder.hpp b/lib/edi/STIDecoder.hpp index 28887f2..f85f789 100644 --- a/lib/edi/STIDecoder.hpp +++ b/lib/edi/STIDecoder.hpp @@ -87,7 +87,7 @@ class STIDataCollector { virtual void update_audio_levels(const audio_level_data& data) = 0; virtual void update_odr_version(const odr_version_data& data) = 0; - virtual void assemble() = 0; + virtual void assemble(seq_info_t sequences) = 0; }; /* The STIDecoder takes care of decoding the EDI TAGs related to the transport diff --git a/lib/edi/STIWriter.cpp b/lib/edi/STIWriter.cpp index 29f0124..1171065 100644 --- a/lib/edi/STIWriter.cpp +++ b/lib/edi/STIWriter.cpp @@ -106,7 +106,7 @@ void STIWriter::update_edi_time( } -void STIWriter::assemble() +void STIWriter::assemble(seq_info_t seq) { if (not m_proto_valid) { throw std::runtime_error("Cannot assemble STI before protocol"); @@ -130,6 +130,7 @@ void STIWriter::assemble() stiFrame.timestamp.tsta = m_management_data.tsta; stiFrame.audio_levels = m_audio_levels; stiFrame.version_data = m_version_data; + stiFrame.sequence_counters = seq; m_frame_callback(move(stiFrame)); diff --git a/lib/edi/STIWriter.hpp b/lib/edi/STIWriter.hpp index a7a5cda..2454a74 100644 --- a/lib/edi/STIWriter.hpp +++ b/lib/edi/STIWriter.hpp @@ -36,6 +36,7 @@ struct sti_frame_t { frame_timestamp_t timestamp; audio_level_data audio_levels; odr_version_data version_data; + seq_info_t sequence_counters; }; class STIWriter : public STIDataCollector { @@ -48,22 +49,22 @@ class STIWriter : public STIDataCollector { virtual void update_protocol( const std::string& proto, uint16_t major, - uint16_t minor); + uint16_t minor) override; - virtual void update_stat(uint8_t stat, uint16_t spid); + virtual void update_stat(uint8_t stat, uint16_t spid) override; virtual void update_edi_time( uint32_t utco, - uint32_t seconds); + uint32_t seconds) override; - virtual void update_rfad(std::array<uint8_t, 9> rfad); - virtual void update_sti_management(const sti_management_data& data); - virtual void add_payload(sti_payload_data&& payload); + virtual void update_rfad(std::array<uint8_t, 9> rfad) override; + virtual void update_sti_management(const sti_management_data& data) override; + virtual void add_payload(sti_payload_data&& payload) override; - virtual void update_audio_levels(const audio_level_data& data); - virtual void update_odr_version(const odr_version_data& data); + virtual void update_audio_levels(const audio_level_data& data) override; + virtual void update_odr_version(const odr_version_data& data) override; - virtual void assemble(void); + virtual void assemble(seq_info_t seq) override; private: std::function<void(sti_frame_t&&)> m_frame_callback; diff --git a/lib/edi/common.cpp b/lib/edi/common.cpp index 7907656..2f20391 100644 --- a/lib/edi/common.cpp +++ b/lib/edi/common.cpp @@ -132,7 +132,7 @@ std::string tag_name_to_human_readable(const tag_name_t& name) TagDispatcher::TagDispatcher( std::function<void()>&& af_packet_completed) : m_af_packet_completed(move(af_packet_completed)), - m_tagpacket_handler([](const std::vector<uint8_t>& ignore){}) + m_tagpacket_handler([](const std::vector<uint8_t>& /*ignore*/){}) { } @@ -145,7 +145,7 @@ void TagDispatcher::push_bytes(const vector<uint8_t> &buf) { if (buf.empty()) { m_input_data.clear(); - m_last_seq_valid = false; + m_last_sequences.seq_valid = false; return; } @@ -168,6 +168,7 @@ void TagDispatcher::push_bytes(const vector<uint8_t> &buf) m_input_data = remaining_data; } + m_last_sequences.pseq_valid = false; if (st.complete) { m_af_packet_completed(); } @@ -192,8 +193,10 @@ void TagDispatcher::push_bytes(const vector<uint8_t> &buf) } auto af = m_pft.getNextAFPacket(); - if (not af.empty()) { - decode_state_t st = decode_afpacket(af); + if (not af.af_packet.empty()) { + const decode_state_t st = decode_afpacket(af.af_packet); + m_last_sequences.pseq = af.pseq; + m_last_sequences.pseq_valid = true; if (st.complete) { m_af_packet_completed(); @@ -217,6 +220,7 @@ void TagDispatcher::push_packet(const Packet &packet) if (buf[0] == 'A' and buf[1] == 'F') { const decode_state_t st = decode_afpacket(buf); + m_last_sequences.pseq_valid = false; if (st.complete) { m_af_packet_completed(); @@ -232,8 +236,10 @@ void TagDispatcher::push_packet(const Packet &packet) } auto af = m_pft.getNextAFPacket(); - if (not af.empty()) { - const decode_state_t st = decode_afpacket(af); + if (not af.af_packet.empty()) { + const decode_state_t st = decode_afpacket(af.af_packet); + m_last_sequences.pseq = af.pseq; + m_last_sequences.pseq_valid = true; if (st.complete) { m_af_packet_completed(); @@ -272,8 +278,8 @@ decode_state_t TagDispatcher::decode_afpacket( } // SEQ wraps at 0xFFFF, unsigned integer overflow is intentional - if (m_last_seq_valid) { - const uint16_t expected_seq = m_last_seq + 1; + if (m_last_sequences.seq_valid) { + const uint16_t expected_seq = m_last_sequences.seq + 1; if (expected_seq != seq) { etiLog.level(warn) << "EDI AF Packet sequence error, " << seq; m_ignored_tags.clear(); @@ -281,9 +287,9 @@ decode_state_t TagDispatcher::decode_afpacket( } else { etiLog.level(info) << "EDI AF Packet initial sequence number: " << seq; - m_last_seq_valid = true; + m_last_sequences.seq_valid = true; } - m_last_seq = seq; + m_last_sequences.seq = seq; bool has_crc = (input_data[8] & 0x80) ? true : false; uint8_t major_revision = (input_data[8] & 0x70) >> 4; diff --git a/lib/edi/common.hpp b/lib/edi/common.hpp index 14b91ba..e8c57c1 100644 --- a/lib/edi/common.hpp +++ b/lib/edi/common.hpp @@ -68,6 +68,13 @@ struct Packet { Packet() {} }; +struct seq_info_t { + bool seq_valid = false; + uint16_t seq = 0; + bool pseq_valid = false; + uint16_t pseq = 0; +}; + /* The TagDispatcher takes care of decoding EDI, with or without PFT, and * will call functions when TAGs are encountered. * @@ -110,13 +117,16 @@ class TagDispatcher { using tagpacket_handler = std::function<void(const std::vector<uint8_t>&)>; void register_tagpacket_handler(tagpacket_handler&& h); + seq_info_t get_seq_info() const { + return m_last_sequences; + } + private: decode_state_t decode_afpacket(const std::vector<uint8_t> &input_data); bool decode_tagpacket(const std::vector<uint8_t> &payload); PFT::PFT m_pft; - bool m_last_seq_valid = false; - uint16_t m_last_seq = 0; + seq_info_t m_last_sequences; std::vector<uint8_t> m_input_data; std::map<std::string, tag_handler> m_handlers; std::function<void()> m_af_packet_completed; |