From d14814a92377084177753c7a60d83a9307ad0672 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Sat, 16 Jan 2021 08:06:09 +0100 Subject: Update common code to latest, update zmq.hpp and adapt --- lib/edi/ETIDecoder.cpp | 6 +++--- lib/edi/ETIDecoder.hpp | 6 +++--- lib/edi/PFT.cpp | 40 ++++++++++++++++++++++++++++++++++++++++ lib/edi/PFT.hpp | 7 ++++++- lib/edi/README.md | 2 +- lib/edi/common.cpp | 31 +++++++++++++++++++++---------- lib/edi/common.hpp | 16 +++++++++++++--- 7 files changed, 87 insertions(+), 21 deletions(-) (limited to 'lib/edi') diff --git a/lib/edi/ETIDecoder.cpp b/lib/edi/ETIDecoder.cpp index 88a7333..656f50b 100644 --- a/lib/edi/ETIDecoder.cpp +++ b/lib/edi/ETIDecoder.cpp @@ -57,9 +57,9 @@ void ETIDecoder::push_bytes(const vector &buf) m_dispatcher.push_bytes(buf); } -void ETIDecoder::push_packet(const vector &buf) +void ETIDecoder::push_packet(Packet& pack) { - m_dispatcher.push_packet(buf); + m_dispatcher.push_packet(pack); } void ETIDecoder::setMaxDelay(int num_af_packets) @@ -107,7 +107,7 @@ bool ETIDecoder::decode_deti(const std::vector& value, const tag_name_t uint8_t fcth = (detiHeader >> 8) & 0x1F; uint8_t fct = detiHeader & 0xFF; - fc.dflc = fcth * 250 + fct; // modulo 5000 counter + fc.dlfc = fcth * 250 + fct; // modulo 5000 counter uint32_t etiHeader = read_32b(value.begin() + 2); diff --git a/lib/edi/ETIDecoder.hpp b/lib/edi/ETIDecoder.hpp index ffa9037..e0865ce 100644 --- a/lib/edi/ETIDecoder.hpp +++ b/lib/edi/ETIDecoder.hpp @@ -38,11 +38,11 @@ struct eti_fc_data { bool atstf; uint32_t tsta; bool ficf; - uint16_t dflc; + uint16_t dlfc; uint8_t mid; uint8_t fp; - uint8_t fct(void) const { return dflc % 250; } + uint8_t fct(void) const { return dlfc % 250; } }; // Information for a subchannel available in EDI @@ -119,7 +119,7 @@ class ETIDecoder { /* Push a complete packet into the decoder. Useful for UDP and other * datagram-oriented protocols. */ - void push_packet(const std::vector &buf); + void push_packet(Packet &pack); /* Set the maximum delay in number of AF Packets before we * abandon decoding a given pseq. diff --git a/lib/edi/PFT.cpp b/lib/edi/PFT.cpp index 158b206..85d6b63 100644 --- a/lib/edi/PFT.cpp +++ b/lib/edi/PFT.cpp @@ -23,6 +23,7 @@ #include #include #include +#include #include #include #include @@ -108,12 +109,19 @@ class FECDecoder { }; size_t Fragment::loadData(const std::vector &buf) +{ + return loadData(buf, 0); +} + +size_t Fragment::loadData(const std::vector &buf, int received_on_port) { const size_t header_len = 14; if (buf.size() < header_len) { return 0; } + this->received_on_port = received_on_port; + size_t index = 0; // Parse PFT Fragment Header (ETSI TS 102 821 V1.4.1 ch7.1) @@ -461,6 +469,32 @@ std::string AFBuilder::visualise() const return ss.str(); } +std::string AFBuilder::visualise_fragment_origins() const +{ + stringstream ss; + if (_fragments.size() == 0) { + return "No fragments"; + } + else { + ss << _fragments.size() << " fragments: "; + } + + std::map port_count; + + for (const auto& f : _fragments) { + port_count[f.second.received_on_port]++; + } + + for (const auto& p : port_count) { + ss << "p" << p.first << " " << + std::round(100.0 * ((double)p.second) / (double)_fragments.size()) << "% "; + } + + ss << "\n"; + + return ss.str(); +} + void PFT::pushPFTFrag(const Fragment &fragment) { // Start decoding the first pseq we receive. In normal @@ -518,6 +552,9 @@ std::vector PFT::getNextAFPacket() if (builder.canAttemptToDecode() == dar_t::yes) { auto afpacket = builder.extractAF(); assert(not afpacket.empty()); + if (m_verbose) { + etiLog.level(debug) << "Fragment origin stats: " << builder.visualise_fragment_origins(); + } incrementNextPseq(); return afpacket; } @@ -533,6 +570,9 @@ std::vector PFT::getNextAFPacket() if (afpacket.empty()) { etiLog.log(debug,"pseq %d timed out after RS", m_next_pseq); } + if (m_verbose) { + etiLog.level(debug) << "Fragment origin stats: " << builder.visualise_fragment_origins(); + } incrementNextPseq(); return afpacket; } diff --git a/lib/edi/PFT.hpp b/lib/edi/PFT.hpp index 208fd70..08dca45 100644 --- a/lib/edi/PFT.hpp +++ b/lib/edi/PFT.hpp @@ -36,11 +36,14 @@ using findex_t = uint32_t; // findex is a 24-bit value class Fragment { public: + int received_on_port = 0; + // Load the data for one fragment from buf into // the Fragment. // \returns the number of bytes of useful data found in buf // A non-zero return value doesn't imply a valid fragment // the isValid() method must be used to verify this. + size_t loadData(const std::vector &buf, int received_on_port); size_t loadData(const std::vector &buf); bool isValid() const { return _valid; } @@ -111,7 +114,9 @@ class AFBuilder return {_fragments.size(), _Fcount}; } - std::string visualise(void) const; + std::string visualise() const; + + std::string visualise_fragment_origins() const; /* The user of this instance can keep track of the lifetime of this * builder diff --git a/lib/edi/README.md b/lib/edi/README.md index b6ab67a..535a65c 100644 --- a/lib/edi/README.md +++ b/lib/edi/README.md @@ -1 +1 @@ -These files are copied from the odr-edilib project. +These files are copied from the common ODR code repository. diff --git a/lib/edi/common.cpp b/lib/edi/common.cpp index 306261a..7907656 100644 --- a/lib/edi/common.cpp +++ b/lib/edi/common.cpp @@ -22,6 +22,7 @@ #include "buffer_unpack.hpp" #include "Log.h" #include "crc.h" +#include #include #include #include @@ -142,6 +143,12 @@ void TagDispatcher::set_verbose(bool verbose) void TagDispatcher::push_bytes(const vector &buf) { + if (buf.empty()) { + m_input_data.clear(); + m_last_seq_valid = false; + return; + } + copy(buf.begin(), buf.end(), back_inserter(m_input_data)); while (m_input_data.size() > 2) { @@ -194,14 +201,16 @@ void TagDispatcher::push_bytes(const vector &buf) } } else { - etiLog.log(warn,"Unknown %c!", *m_input_data.data()); + etiLog.log(warn, "Unknown 0x%02x!", *m_input_data.data()); m_input_data.erase(m_input_data.begin()); } } } -void TagDispatcher::push_packet(const vector &buf) +void TagDispatcher::push_packet(const Packet &packet) { + auto& buf = packet.buf; + if (buf.size() < 2) { throw std::invalid_argument("Not enough bytes to read EDI packet header"); } @@ -216,7 +225,7 @@ void TagDispatcher::push_packet(const vector &buf) } else if (buf[0] == 'P' and buf[1] == 'F') { PFT::Fragment fragment; - fragment.loadData(buf); + fragment.loadData(buf, packet.received_on_port); if (fragment.isValid()) { m_pft.pushPFTFrag(fragment); @@ -232,11 +241,10 @@ void TagDispatcher::push_packet(const vector &buf) } } else { - const char packettype[3] = {(char)buf[0], (char)buf[1], '\0'}; std::stringstream ss; - ss << "Unknown EDI packet "; - ss << packettype; - throw std::invalid_argument(ss.str()); + ss << "Unknown EDI packet " << std::hex << (int)buf[0] << " " << (int)buf[1]; + m_ignored_tags.clear(); + throw invalid_argument(ss.str()); } } @@ -268,6 +276,7 @@ decode_state_t TagDispatcher::decode_afpacket( const uint16_t expected_seq = m_last_seq + 1; if (expected_seq != seq) { etiLog.level(warn) << "EDI AF Packet sequence error, " << seq; + m_ignored_tags.clear(); } } else { @@ -303,8 +312,7 @@ decode_state_t TagDispatcher::decode_afpacket( uint16_t packet_crc = read_16b(input_data.begin() + AFPACKET_HEADER_LEN + taglength); if (packet_crc != crc) { - throw invalid_argument( - "AF Packet crc wrong"); + throw invalid_argument("AF Packet crc wrong"); } else { vector payload(taglength); @@ -379,7 +387,10 @@ bool TagDispatcher::decode_tagpacket(const vector &payload) } if (not found) { - etiLog.log(warn, "Ignoring unknown TAG %s", tag.c_str()); + if (std::find(m_ignored_tags.begin(), m_ignored_tags.end(), tag) == m_ignored_tags.end()) { + etiLog.log(warn, "Ignoring unknown TAG %s", tag.c_str()); + m_ignored_tags.push_back(tag); + } break; } diff --git a/lib/edi/common.hpp b/lib/edi/common.hpp index c8c4bb3..14b91ba 100644 --- a/lib/edi/common.hpp +++ b/lib/edi/common.hpp @@ -60,6 +60,14 @@ using tag_name_t = std::array; std::string tag_name_to_human_readable(const tag_name_t& name); +struct Packet { + std::vector buf; + int received_on_port; + + Packet(std::vector&& b) : buf(b), received_on_port(0) { } + Packet() {} +}; + /* The TagDispatcher takes care of decoding EDI, with or without PFT, and * will call functions when TAGs are encountered. * @@ -72,17 +80,17 @@ class TagDispatcher { void set_verbose(bool verbose); - /* Push bytes into the decoder. The buf can contain more * than a single packet. This is useful when reading from streams - * (files, TCP) + * (files, TCP). Pushing an empty buf will clear the internal decoder + * state to ensure realignment (e.g. on stream reconnection) */ void push_bytes(const std::vector &buf); /* Push a complete packet into the decoder. Useful for UDP and other * datagram-oriented protocols. */ - void push_packet(const std::vector &buf); + void push_packet(const Packet &packet); /* Set the maximum delay in number of AF Packets before we * abandon decoding a given pseq. @@ -113,6 +121,8 @@ class TagDispatcher { std::map m_handlers; std::function m_af_packet_completed; tagpacket_handler m_tagpacket_handler; + + std::vector m_ignored_tags; }; // Data carried inside the ODRv EDI TAG -- cgit v1.2.3