From 12670a017ddb14fbf4a932799051dcfe21dd6c78 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Fri, 15 Jan 2021 07:09:03 +0100 Subject: Common 6b5db53: Update zmq.hpp, TCPReceiveServer, EDI decoder and output --- lib/edi/PFT.cpp | 40 ++++++++++++++++++++++++++++++++++++++++ lib/edi/PFT.hpp | 7 ++++++- lib/edi/STIDecoder.cpp | 18 +++++++++++------- lib/edi/STIDecoder.hpp | 7 ++++--- lib/edi/STIWriter.cpp | 1 + lib/edi/STIWriter.hpp | 1 + lib/edi/common.cpp | 31 +++++++++++++++++++++---------- lib/edi/common.hpp | 16 +++++++++++++--- 8 files changed, 97 insertions(+), 24 deletions(-) (limited to 'lib/edi') diff --git a/lib/edi/PFT.cpp b/lib/edi/PFT.cpp index 158b206..85d6b63 100644 --- a/lib/edi/PFT.cpp +++ b/lib/edi/PFT.cpp @@ -23,6 +23,7 @@ #include #include #include +#include #include #include #include @@ -108,12 +109,19 @@ class FECDecoder { }; size_t Fragment::loadData(const std::vector &buf) +{ + return loadData(buf, 0); +} + +size_t Fragment::loadData(const std::vector &buf, int received_on_port) { const size_t header_len = 14; if (buf.size() < header_len) { return 0; } + this->received_on_port = received_on_port; + size_t index = 0; // Parse PFT Fragment Header (ETSI TS 102 821 V1.4.1 ch7.1) @@ -461,6 +469,32 @@ std::string AFBuilder::visualise() const return ss.str(); } +std::string AFBuilder::visualise_fragment_origins() const +{ + stringstream ss; + if (_fragments.size() == 0) { + return "No fragments"; + } + else { + ss << _fragments.size() << " fragments: "; + } + + std::map port_count; + + for (const auto& f : _fragments) { + port_count[f.second.received_on_port]++; + } + + for (const auto& p : port_count) { + ss << "p" << p.first << " " << + std::round(100.0 * ((double)p.second) / (double)_fragments.size()) << "% "; + } + + ss << "\n"; + + return ss.str(); +} + void PFT::pushPFTFrag(const Fragment &fragment) { // Start decoding the first pseq we receive. In normal @@ -518,6 +552,9 @@ std::vector PFT::getNextAFPacket() if (builder.canAttemptToDecode() == dar_t::yes) { auto afpacket = builder.extractAF(); assert(not afpacket.empty()); + if (m_verbose) { + etiLog.level(debug) << "Fragment origin stats: " << builder.visualise_fragment_origins(); + } incrementNextPseq(); return afpacket; } @@ -533,6 +570,9 @@ std::vector PFT::getNextAFPacket() if (afpacket.empty()) { etiLog.log(debug,"pseq %d timed out after RS", m_next_pseq); } + if (m_verbose) { + etiLog.level(debug) << "Fragment origin stats: " << builder.visualise_fragment_origins(); + } incrementNextPseq(); return afpacket; } diff --git a/lib/edi/PFT.hpp b/lib/edi/PFT.hpp index 208fd70..08dca45 100644 --- a/lib/edi/PFT.hpp +++ b/lib/edi/PFT.hpp @@ -36,11 +36,14 @@ using findex_t = uint32_t; // findex is a 24-bit value class Fragment { public: + int received_on_port = 0; + // Load the data for one fragment from buf into // the Fragment. // \returns the number of bytes of useful data found in buf // A non-zero return value doesn't imply a valid fragment // the isValid() method must be used to verify this. + size_t loadData(const std::vector &buf, int received_on_port); size_t loadData(const std::vector &buf); bool isValid() const { return _valid; } @@ -111,7 +114,9 @@ class AFBuilder return {_fragments.size(), _Fcount}; } - std::string visualise(void) const; + std::string visualise() const; + + std::string visualise_fragment_origins() const; /* The user of this instance can keep track of the lifetime of this * builder diff --git a/lib/edi/STIDecoder.cpp b/lib/edi/STIDecoder.cpp index b6b9878..99f7c11 100644 --- a/lib/edi/STIDecoder.cpp +++ b/lib/edi/STIDecoder.cpp @@ -60,9 +60,9 @@ void STIDecoder::push_bytes(const vector &buf) m_dispatcher.push_bytes(buf); } -void STIDecoder::push_packet(const vector &buf) +void STIDecoder::push_packet(Packet &pack) { - m_dispatcher.push_packet(buf); + m_dispatcher.push_packet(pack); } void STIDecoder::setMaxDelay(int num_af_packets) @@ -107,7 +107,7 @@ bool STIDecoder::decode_dsti(const std::vector& value, const tag_name_t uint8_t dfcth = (dstiHeader >> 8) & 0x1F; uint8_t dfctl = dstiHeader & 0xFF; - md.dflc = dfcth * 250 + dfctl; // modulo 5000 counter + md.dlfc = dfcth * 250 + dfctl; // modulo 5000 counter const size_t expected_length = 2 + (md.stihf ? 3 : 0) + @@ -115,10 +115,14 @@ bool STIDecoder::decode_dsti(const std::vector& value, const tag_name_t (md.rfadf ? 9 : 0); if (value.size() != expected_length) { - throw std::runtime_error("EDI dsti: decoding error:" - "value.size() != expected_length: " + - to_string(value.size()) + " " + - to_string(expected_length)); + etiLog.level(warn) << "EDI dsti: decoding error: " << + "value.size() != expected_length: " << + value.size() << " " << + expected_length << " " << + (md.stihf ? "STIHF " : " ") << + (md.atstf ? "ATSTF " : " ") << + (md.rfadf ? "RFADF " : " "); + return false; } if (md.stihf) { diff --git a/lib/edi/STIDecoder.hpp b/lib/edi/STIDecoder.hpp index 9d55728..28887f2 100644 --- a/lib/edi/STIDecoder.hpp +++ b/lib/edi/STIDecoder.hpp @@ -34,7 +34,7 @@ struct sti_management_data { bool stihf; bool atstf; bool rfadf; - uint16_t dflc; + uint16_t dlfc; uint32_t tsta; }; @@ -104,14 +104,15 @@ class STIDecoder { /* Push bytes into the decoder. The buf can contain more * than a single packet. This is useful when reading from streams - * (files, TCP) + * (files, TCP). Pushing an empty buf will clear the internal decoder + * state to ensure realignment (e.g. on stream reconnection) */ void push_bytes(const std::vector &buf); /* Push a complete packet into the decoder. Useful for UDP and other * datagram-oriented protocols. */ - void push_packet(const std::vector &buf); + void push_packet(Packet &pack); /* Set the maximum delay in number of AF Packets before we * abandon decoding a given pseq. diff --git a/lib/edi/STIWriter.cpp b/lib/edi/STIWriter.cpp index a7e4f20..29f0124 100644 --- a/lib/edi/STIWriter.cpp +++ b/lib/edi/STIWriter.cpp @@ -123,6 +123,7 @@ void STIWriter::assemble() // TODO check time validity sti_frame_t stiFrame; + stiFrame.dlfc = m_management_data.dlfc; stiFrame.frame = move(m_payload.istd); stiFrame.timestamp.seconds = m_seconds; stiFrame.timestamp.utco = m_utco; diff --git a/lib/edi/STIWriter.hpp b/lib/edi/STIWriter.hpp index fc08e97..a7a5cda 100644 --- a/lib/edi/STIWriter.hpp +++ b/lib/edi/STIWriter.hpp @@ -32,6 +32,7 @@ namespace EdiDecoder { struct sti_frame_t { std::vector frame; + uint16_t dlfc; frame_timestamp_t timestamp; audio_level_data audio_levels; odr_version_data version_data; diff --git a/lib/edi/common.cpp b/lib/edi/common.cpp index 306261a..7907656 100644 --- a/lib/edi/common.cpp +++ b/lib/edi/common.cpp @@ -22,6 +22,7 @@ #include "buffer_unpack.hpp" #include "Log.h" #include "crc.h" +#include #include #include #include @@ -142,6 +143,12 @@ void TagDispatcher::set_verbose(bool verbose) void TagDispatcher::push_bytes(const vector &buf) { + if (buf.empty()) { + m_input_data.clear(); + m_last_seq_valid = false; + return; + } + copy(buf.begin(), buf.end(), back_inserter(m_input_data)); while (m_input_data.size() > 2) { @@ -194,14 +201,16 @@ void TagDispatcher::push_bytes(const vector &buf) } } else { - etiLog.log(warn,"Unknown %c!", *m_input_data.data()); + etiLog.log(warn, "Unknown 0x%02x!", *m_input_data.data()); m_input_data.erase(m_input_data.begin()); } } } -void TagDispatcher::push_packet(const vector &buf) +void TagDispatcher::push_packet(const Packet &packet) { + auto& buf = packet.buf; + if (buf.size() < 2) { throw std::invalid_argument("Not enough bytes to read EDI packet header"); } @@ -216,7 +225,7 @@ void TagDispatcher::push_packet(const vector &buf) } else if (buf[0] == 'P' and buf[1] == 'F') { PFT::Fragment fragment; - fragment.loadData(buf); + fragment.loadData(buf, packet.received_on_port); if (fragment.isValid()) { m_pft.pushPFTFrag(fragment); @@ -232,11 +241,10 @@ void TagDispatcher::push_packet(const vector &buf) } } else { - const char packettype[3] = {(char)buf[0], (char)buf[1], '\0'}; std::stringstream ss; - ss << "Unknown EDI packet "; - ss << packettype; - throw std::invalid_argument(ss.str()); + ss << "Unknown EDI packet " << std::hex << (int)buf[0] << " " << (int)buf[1]; + m_ignored_tags.clear(); + throw invalid_argument(ss.str()); } } @@ -268,6 +276,7 @@ decode_state_t TagDispatcher::decode_afpacket( const uint16_t expected_seq = m_last_seq + 1; if (expected_seq != seq) { etiLog.level(warn) << "EDI AF Packet sequence error, " << seq; + m_ignored_tags.clear(); } } else { @@ -303,8 +312,7 @@ decode_state_t TagDispatcher::decode_afpacket( uint16_t packet_crc = read_16b(input_data.begin() + AFPACKET_HEADER_LEN + taglength); if (packet_crc != crc) { - throw invalid_argument( - "AF Packet crc wrong"); + throw invalid_argument("AF Packet crc wrong"); } else { vector payload(taglength); @@ -379,7 +387,10 @@ bool TagDispatcher::decode_tagpacket(const vector &payload) } if (not found) { - etiLog.log(warn, "Ignoring unknown TAG %s", tag.c_str()); + if (std::find(m_ignored_tags.begin(), m_ignored_tags.end(), tag) == m_ignored_tags.end()) { + etiLog.log(warn, "Ignoring unknown TAG %s", tag.c_str()); + m_ignored_tags.push_back(tag); + } break; } diff --git a/lib/edi/common.hpp b/lib/edi/common.hpp index c8c4bb3..14b91ba 100644 --- a/lib/edi/common.hpp +++ b/lib/edi/common.hpp @@ -60,6 +60,14 @@ using tag_name_t = std::array; std::string tag_name_to_human_readable(const tag_name_t& name); +struct Packet { + std::vector buf; + int received_on_port; + + Packet(std::vector&& b) : buf(b), received_on_port(0) { } + Packet() {} +}; + /* The TagDispatcher takes care of decoding EDI, with or without PFT, and * will call functions when TAGs are encountered. * @@ -72,17 +80,17 @@ class TagDispatcher { void set_verbose(bool verbose); - /* Push bytes into the decoder. The buf can contain more * than a single packet. This is useful when reading from streams - * (files, TCP) + * (files, TCP). Pushing an empty buf will clear the internal decoder + * state to ensure realignment (e.g. on stream reconnection) */ void push_bytes(const std::vector &buf); /* Push a complete packet into the decoder. Useful for UDP and other * datagram-oriented protocols. */ - void push_packet(const std::vector &buf); + void push_packet(const Packet &packet); /* Set the maximum delay in number of AF Packets before we * abandon decoding a given pseq. @@ -113,6 +121,8 @@ class TagDispatcher { std::map m_handlers; std::function m_af_packet_completed; tagpacket_handler m_tagpacket_handler; + + std::vector m_ignored_tags; }; // Data carried inside the ODRv EDI TAG -- cgit v1.2.3