From 6a9d4b12280f7d0411a44aaad9528d9cbb725b94 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Wed, 19 May 2021 14:47:55 +0200 Subject: Common 44ae39c: Make SEQ and PSEQ available on EDI receive and improve error handling --- lib/edi/ETIDecoder.cpp | 4 ++-- lib/edi/PFT.cpp | 63 +++++++++++++++++++++++++++++++++----------------- lib/edi/PFT.hpp | 19 ++++++++++----- lib/edi/common.cpp | 26 +++++++++++++-------- lib/edi/common.hpp | 14 +++++++++-- 5 files changed, 85 insertions(+), 41 deletions(-) diff --git a/lib/edi/ETIDecoder.cpp b/lib/edi/ETIDecoder.cpp index 656f50b..a377692 100644 --- a/lib/edi/ETIDecoder.cpp +++ b/lib/edi/ETIDecoder.cpp @@ -69,7 +69,7 @@ void ETIDecoder::setMaxDelay(int num_af_packets) #define AFPACKET_HEADER_LEN 10 // includes SYNC -bool ETIDecoder::decode_starptr(const std::vector& value, const tag_name_t& n) +bool ETIDecoder::decode_starptr(const std::vector& value, const tag_name_t& /*n*/) { if (value.size() != 0x40 / 8) { etiLog.log(warn, "Incorrect length %02lx for *PTR", value.size()); @@ -89,7 +89,7 @@ bool ETIDecoder::decode_starptr(const std::vector& value, const tag_nam return true; } -bool ETIDecoder::decode_deti(const std::vector& value, const tag_name_t& n) +bool ETIDecoder::decode_deti(const std::vector& value, const tag_name_t& /*n*/) { /* uint16_t detiHeader = fct | (fcth << 8) | (rfudf << 13) | (ficf << 14) | (atstf << 15); diff --git a/lib/edi/PFT.cpp b/lib/edi/PFT.cpp index 85d6b63..25f2d1f 100644 --- a/lib/edi/PFT.cpp +++ b/lib/edi/PFT.cpp @@ -1,6 +1,6 @@ /* ------------------------------------------------------------------ * Copyright (C) 2017 AVT GmbH - Fabien Vercasson - * Copyright (C) 2017 Matthias P. Braendli + * Copyright (C) 2021 Matthias P. Braendli * matthias.braendli@mpb.li * * http://opendigitalradio.org @@ -126,7 +126,7 @@ size_t Fragment::loadData(const std::vector &buf, int received_on_port) // Parse PFT Fragment Header (ETSI TS 102 821 V1.4.1 ch7.1) if (not (buf[0] == 'P' and buf[1] == 'F') ) { - throw invalid_argument("Invalid PFT SYNC bytes"); + throw runtime_error("Invalid PFT SYNC bytes"); } index += 2; // Psync @@ -208,15 +208,30 @@ AFBuilder::AFBuilder(pseq_t Pseq, findex_t Fcount, size_t lifetime) void AFBuilder::pushPFTFrag(const Fragment &frag) { - if (_Pseq != frag.Pseq() or _Fcount != frag.Fcount()) { - throw invalid_argument("Invalid PFT fragment Pseq or Fcount"); + if (_Pseq != frag.Pseq()) { + throw logic_error("Invalid PFT fragment Pseq"); } - const auto Findex = frag.Findex(); - const bool fragment_already_received = _fragments.count(Findex); - if (not fragment_already_received) - { - _fragments[Findex] = frag; + if (_Fcount != frag.Fcount()) { + etiLog.level(warn) << "Discarding fragment with invalid fcount"; + } + else { + const auto Findex = frag.Findex(); + const bool fragment_already_received = _fragments.count(Findex); + + if (not fragment_already_received) { + bool consistent = true; + if (_fragments.size() > 0) { + consistent = frag.checkConsistency(_fragments.cbegin()->second); + } + + if (consistent) { + _fragments[Findex] = frag; + } + else { + etiLog.level(warn) << "Discard fragment"; + } + } } } @@ -246,7 +261,7 @@ bool Fragment::checkConsistency(const Fragment& other) const } -AFBuilder::decode_attempt_result_t AFBuilder::canAttemptToDecode() const +AFBuilder::decode_attempt_result_t AFBuilder::canAttemptToDecode() { if (_fragments.empty()) { return AFBuilder::decode_attempt_result_t::no; @@ -263,7 +278,8 @@ AFBuilder::decode_attempt_result_t AFBuilder::canAttemptToDecode() const const Fragment& frag = pair.second; return first.checkConsistency(frag) and _Pseq == frag.Pseq(); }) ) { - throw invalid_argument("Inconsistent PFT fragments"); + _fragments.clear(); + throw runtime_error("Inconsistent PFT fragments"); } // Calculate the minimum number of fragments necessary to apply FEC. @@ -301,7 +317,7 @@ AFBuilder::decode_attempt_result_t AFBuilder::canAttemptToDecode() const return AFBuilder::decode_attempt_result_t::no; } -std::vector AFBuilder::extractAF() const +std::vector AFBuilder::extractAF() { if (not _af_packet.empty()) { return _af_packet; @@ -310,13 +326,12 @@ std::vector AFBuilder::extractAF() const bool ok = false; if (canAttemptToDecode() != AFBuilder::decode_attempt_result_t::no) { - auto frag_it = _fragments.begin(); if (frag_it->second.Fcount() == _Fcount - 1) { frag_it++; if (frag_it == _fragments.end()) { - throw std::runtime_error("Invalid attempt at extracting AF"); + throw runtime_error("Invalid attempt at extracting AF"); } } @@ -343,12 +358,14 @@ std::vector AFBuilder::extractAF() const const auto& fragment = _fragments.at(j).payload(); if (j != _Fcount - 1 and fragment.size() != Plen) { + _fragments.clear(); throw runtime_error("Incorrect fragment length " + to_string(fragment.size()) + " " + to_string(Plen)); } if (j == _Fcount - 1 and fragment.size() > Plen) { + _fragments.clear(); throw runtime_error("Incorrect last fragment length " + to_string(fragment.size()) + " " + to_string(Plen)); @@ -453,7 +470,7 @@ std::vector AFBuilder::extractAF() const return _af_packet; } -std::string AFBuilder::visualise() const +std::string AFBuilder::visualise() { stringstream ss; ss << "|"; @@ -525,7 +542,7 @@ void PFT::pushPFTFrag(const Fragment &fragment) if (m_verbose) { etiLog.log(debug, "Got frag %u:%u, afbuilders: ", fragment.Pseq(), fragment.Findex()); - for (const auto &k : m_afbuilders) { + for (auto &k : m_afbuilders) { const bool isNextPseq = (m_next_pseq == k.first); etiLog.level(debug) << (isNextPseq ? "->" : " ") << k.first << " " << k.second.visualise(); @@ -534,15 +551,17 @@ void PFT::pushPFTFrag(const Fragment &fragment) } -std::vector PFT::getNextAFPacket() +afpacket_pft_t PFT::getNextAFPacket() { + afpacket_pft_t af; + if (m_afbuilders.count(m_next_pseq) == 0) { if (m_afbuilders.size() > m_max_delay) { m_afbuilders.clear(); etiLog.level(debug) << " Reinit"; } - return {}; + return af; } auto &builder = m_afbuilders.at(m_next_pseq); @@ -555,8 +574,9 @@ std::vector PFT::getNextAFPacket() if (m_verbose) { etiLog.level(debug) << "Fragment origin stats: " << builder.visualise_fragment_origins(); } + af.pseq = m_next_pseq; + af.af_packet = afpacket; incrementNextPseq(); - return afpacket; } else if (builder.canAttemptToDecode() == dar_t::maybe) { if (builder.lifeTime > 0) { @@ -573,8 +593,9 @@ std::vector PFT::getNextAFPacket() if (m_verbose) { etiLog.level(debug) << "Fragment origin stats: " << builder.visualise_fragment_origins(); } + af.pseq = m_next_pseq; + af.af_packet = afpacket; incrementNextPseq(); - return afpacket; } } else { @@ -588,7 +609,7 @@ std::vector PFT::getNextAFPacket() } } - return {}; + return af; } void PFT::setMaxDelay(size_t num_af_packets) diff --git a/lib/edi/PFT.hpp b/lib/edi/PFT.hpp index 08dca45..aa5b9d3 100644 --- a/lib/edi/PFT.hpp +++ b/lib/edi/PFT.hpp @@ -1,6 +1,6 @@ /* ------------------------------------------------------------------ * Copyright (C) 2017 AVT GmbH - Fabien Vercasson - * Copyright (C) 2017 Matthias P. Braendli + * Copyright (C) 2021 Matthias P. Braendli * matthias.braendli@mpb.li * * http://opendigitalradio.org @@ -101,20 +101,20 @@ class AFBuilder void pushPFTFrag(const Fragment &frag); /* Assess if it may be possible to decode this AF packet */ - decode_attempt_result_t canAttemptToDecode() const; + decode_attempt_result_t canAttemptToDecode(); /* Try to build the AF with received fragments. * Apply error correction if necessary (missing packets/CRC errors) * \return an empty vector if building the AF is not possible */ - std::vector extractAF(void) const; + std::vector extractAF(); std::pair numberOfFragments(void) const { return {_fragments.size(), _Fcount}; } - std::string visualise() const; + std::string visualise(); std::string visualise_fragment_origins() const; @@ -135,6 +135,13 @@ class AFBuilder findex_t _Fcount; }; +struct afpacket_pft_t +{ + // validity of the struct is given by af_packet begin empty or not. + std::vector af_packet; + pseq_t pseq = 0; +}; + class PFT { public: @@ -145,7 +152,7 @@ class PFT * * \return an empty vector if building the AF is not possible */ - std::vector getNextAFPacket(void); + afpacket_pft_t getNextAFPacket(); /* Set the maximum delay in number of AF Packets before we * abandon decoding a given pseq. @@ -156,7 +163,7 @@ class PFT void setVerbose(bool enable); private: - void incrementNextPseq(void); + void incrementNextPseq(); pseq_t m_next_pseq; size_t m_max_delay = 10; // in AF packets diff --git a/lib/edi/common.cpp b/lib/edi/common.cpp index 7907656..2f20391 100644 --- a/lib/edi/common.cpp +++ b/lib/edi/common.cpp @@ -132,7 +132,7 @@ std::string tag_name_to_human_readable(const tag_name_t& name) TagDispatcher::TagDispatcher( std::function&& af_packet_completed) : m_af_packet_completed(move(af_packet_completed)), - m_tagpacket_handler([](const std::vector& ignore){}) + m_tagpacket_handler([](const std::vector& /*ignore*/){}) { } @@ -145,7 +145,7 @@ void TagDispatcher::push_bytes(const vector &buf) { if (buf.empty()) { m_input_data.clear(); - m_last_seq_valid = false; + m_last_sequences.seq_valid = false; return; } @@ -168,6 +168,7 @@ void TagDispatcher::push_bytes(const vector &buf) m_input_data = remaining_data; } + m_last_sequences.pseq_valid = false; if (st.complete) { m_af_packet_completed(); } @@ -192,8 +193,10 @@ void TagDispatcher::push_bytes(const vector &buf) } auto af = m_pft.getNextAFPacket(); - if (not af.empty()) { - decode_state_t st = decode_afpacket(af); + if (not af.af_packet.empty()) { + const decode_state_t st = decode_afpacket(af.af_packet); + m_last_sequences.pseq = af.pseq; + m_last_sequences.pseq_valid = true; if (st.complete) { m_af_packet_completed(); @@ -217,6 +220,7 @@ void TagDispatcher::push_packet(const Packet &packet) if (buf[0] == 'A' and buf[1] == 'F') { const decode_state_t st = decode_afpacket(buf); + m_last_sequences.pseq_valid = false; if (st.complete) { m_af_packet_completed(); @@ -232,8 +236,10 @@ void TagDispatcher::push_packet(const Packet &packet) } auto af = m_pft.getNextAFPacket(); - if (not af.empty()) { - const decode_state_t st = decode_afpacket(af); + if (not af.af_packet.empty()) { + const decode_state_t st = decode_afpacket(af.af_packet); + m_last_sequences.pseq = af.pseq; + m_last_sequences.pseq_valid = true; if (st.complete) { m_af_packet_completed(); @@ -272,8 +278,8 @@ decode_state_t TagDispatcher::decode_afpacket( } // SEQ wraps at 0xFFFF, unsigned integer overflow is intentional - if (m_last_seq_valid) { - const uint16_t expected_seq = m_last_seq + 1; + if (m_last_sequences.seq_valid) { + const uint16_t expected_seq = m_last_sequences.seq + 1; if (expected_seq != seq) { etiLog.level(warn) << "EDI AF Packet sequence error, " << seq; m_ignored_tags.clear(); @@ -281,9 +287,9 @@ decode_state_t TagDispatcher::decode_afpacket( } else { etiLog.level(info) << "EDI AF Packet initial sequence number: " << seq; - m_last_seq_valid = true; + m_last_sequences.seq_valid = true; } - m_last_seq = seq; + m_last_sequences.seq = seq; bool has_crc = (input_data[8] & 0x80) ? true : false; uint8_t major_revision = (input_data[8] & 0x70) >> 4; diff --git a/lib/edi/common.hpp b/lib/edi/common.hpp index 14b91ba..e8c57c1 100644 --- a/lib/edi/common.hpp +++ b/lib/edi/common.hpp @@ -68,6 +68,13 @@ struct Packet { Packet() {} }; +struct seq_info_t { + bool seq_valid = false; + uint16_t seq = 0; + bool pseq_valid = false; + uint16_t pseq = 0; +}; + /* The TagDispatcher takes care of decoding EDI, with or without PFT, and * will call functions when TAGs are encountered. * @@ -110,13 +117,16 @@ class TagDispatcher { using tagpacket_handler = std::function&)>; void register_tagpacket_handler(tagpacket_handler&& h); + seq_info_t get_seq_info() const { + return m_last_sequences; + } + private: decode_state_t decode_afpacket(const std::vector &input_data); bool decode_tagpacket(const std::vector &payload); PFT::PFT m_pft; - bool m_last_seq_valid = false; - uint16_t m_last_seq = 0; + seq_info_t m_last_sequences; std::vector m_input_data; std::map m_handlers; std::function m_af_packet_completed; -- cgit v1.2.3