summaryrefslogtreecommitdiffstats
path: root/lib/edi
diff options
context:
space:
mode:
Diffstat (limited to 'lib/edi')
-rw-r--r--lib/edi/PFT.cpp40
-rw-r--r--lib/edi/PFT.hpp7
-rw-r--r--lib/edi/STIDecoder.cpp18
-rw-r--r--lib/edi/STIDecoder.hpp7
-rw-r--r--lib/edi/STIWriter.cpp1
-rw-r--r--lib/edi/STIWriter.hpp1
-rw-r--r--lib/edi/common.cpp31
-rw-r--r--lib/edi/common.hpp16
8 files changed, 97 insertions, 24 deletions
diff --git a/lib/edi/PFT.cpp b/lib/edi/PFT.cpp
index 158b206..85d6b63 100644
--- a/lib/edi/PFT.cpp
+++ b/lib/edi/PFT.cpp
@@ -23,6 +23,7 @@
#include <cstdio>
#include <cassert>
#include <cstring>
+#include <cmath>
#include <sstream>
#include <stdexcept>
#include <algorithm>
@@ -109,11 +110,18 @@ class FECDecoder {
size_t Fragment::loadData(const std::vector<uint8_t> &buf)
{
+ return loadData(buf, 0);
+}
+
+size_t Fragment::loadData(const std::vector<uint8_t> &buf, int received_on_port)
+{
const size_t header_len = 14;
if (buf.size() < header_len) {
return 0;
}
+ this->received_on_port = received_on_port;
+
size_t index = 0;
// Parse PFT Fragment Header (ETSI TS 102 821 V1.4.1 ch7.1)
@@ -461,6 +469,32 @@ std::string AFBuilder::visualise() const
return ss.str();
}
+std::string AFBuilder::visualise_fragment_origins() const
+{
+ stringstream ss;
+ if (_fragments.size() == 0) {
+ return "No fragments";
+ }
+ else {
+ ss << _fragments.size() << " fragments: ";
+ }
+
+ std::map<int, size_t> port_count;
+
+ for (const auto& f : _fragments) {
+ port_count[f.second.received_on_port]++;
+ }
+
+ for (const auto& p : port_count) {
+ ss << "p" << p.first << " " <<
+ std::round(100.0 * ((double)p.second) / (double)_fragments.size()) << "% ";
+ }
+
+ ss << "\n";
+
+ return ss.str();
+}
+
void PFT::pushPFTFrag(const Fragment &fragment)
{
// Start decoding the first pseq we receive. In normal
@@ -518,6 +552,9 @@ std::vector<uint8_t> PFT::getNextAFPacket()
if (builder.canAttemptToDecode() == dar_t::yes) {
auto afpacket = builder.extractAF();
assert(not afpacket.empty());
+ if (m_verbose) {
+ etiLog.level(debug) << "Fragment origin stats: " << builder.visualise_fragment_origins();
+ }
incrementNextPseq();
return afpacket;
}
@@ -533,6 +570,9 @@ std::vector<uint8_t> PFT::getNextAFPacket()
if (afpacket.empty()) {
etiLog.log(debug,"pseq %d timed out after RS", m_next_pseq);
}
+ if (m_verbose) {
+ etiLog.level(debug) << "Fragment origin stats: " << builder.visualise_fragment_origins();
+ }
incrementNextPseq();
return afpacket;
}
diff --git a/lib/edi/PFT.hpp b/lib/edi/PFT.hpp
index 208fd70..08dca45 100644
--- a/lib/edi/PFT.hpp
+++ b/lib/edi/PFT.hpp
@@ -36,11 +36,14 @@ using findex_t = uint32_t; // findex is a 24-bit value
class Fragment
{
public:
+ int received_on_port = 0;
+
// Load the data for one fragment from buf into
// the Fragment.
// \returns the number of bytes of useful data found in buf
// A non-zero return value doesn't imply a valid fragment
// the isValid() method must be used to verify this.
+ size_t loadData(const std::vector<uint8_t> &buf, int received_on_port);
size_t loadData(const std::vector<uint8_t> &buf);
bool isValid() const { return _valid; }
@@ -111,7 +114,9 @@ class AFBuilder
return {_fragments.size(), _Fcount};
}
- std::string visualise(void) const;
+ std::string visualise() const;
+
+ std::string visualise_fragment_origins() const;
/* The user of this instance can keep track of the lifetime of this
* builder
diff --git a/lib/edi/STIDecoder.cpp b/lib/edi/STIDecoder.cpp
index b6b9878..99f7c11 100644
--- a/lib/edi/STIDecoder.cpp
+++ b/lib/edi/STIDecoder.cpp
@@ -60,9 +60,9 @@ void STIDecoder::push_bytes(const vector<uint8_t> &buf)
m_dispatcher.push_bytes(buf);
}
-void STIDecoder::push_packet(const vector<uint8_t> &buf)
+void STIDecoder::push_packet(Packet &pack)
{
- m_dispatcher.push_packet(buf);
+ m_dispatcher.push_packet(pack);
}
void STIDecoder::setMaxDelay(int num_af_packets)
@@ -107,7 +107,7 @@ bool STIDecoder::decode_dsti(const std::vector<uint8_t>& value, const tag_name_t
uint8_t dfcth = (dstiHeader >> 8) & 0x1F;
uint8_t dfctl = dstiHeader & 0xFF;
- md.dflc = dfcth * 250 + dfctl; // modulo 5000 counter
+ md.dlfc = dfcth * 250 + dfctl; // modulo 5000 counter
const size_t expected_length = 2 +
(md.stihf ? 3 : 0) +
@@ -115,10 +115,14 @@ bool STIDecoder::decode_dsti(const std::vector<uint8_t>& value, const tag_name_t
(md.rfadf ? 9 : 0);
if (value.size() != expected_length) {
- throw std::runtime_error("EDI dsti: decoding error:"
- "value.size() != expected_length: " +
- to_string(value.size()) + " " +
- to_string(expected_length));
+ etiLog.level(warn) << "EDI dsti: decoding error: " <<
+ "value.size() != expected_length: " <<
+ value.size() << " " <<
+ expected_length << " " <<
+ (md.stihf ? "STIHF " : " ") <<
+ (md.atstf ? "ATSTF " : " ") <<
+ (md.rfadf ? "RFADF " : " ");
+ return false;
}
if (md.stihf) {
diff --git a/lib/edi/STIDecoder.hpp b/lib/edi/STIDecoder.hpp
index 9d55728..28887f2 100644
--- a/lib/edi/STIDecoder.hpp
+++ b/lib/edi/STIDecoder.hpp
@@ -34,7 +34,7 @@ struct sti_management_data {
bool stihf;
bool atstf;
bool rfadf;
- uint16_t dflc;
+ uint16_t dlfc;
uint32_t tsta;
};
@@ -104,14 +104,15 @@ class STIDecoder {
/* Push bytes into the decoder. The buf can contain more
* than a single packet. This is useful when reading from streams
- * (files, TCP)
+ * (files, TCP). Pushing an empty buf will clear the internal decoder
+ * state to ensure realignment (e.g. on stream reconnection)
*/
void push_bytes(const std::vector<uint8_t> &buf);
/* Push a complete packet into the decoder. Useful for UDP and other
* datagram-oriented protocols.
*/
- void push_packet(const std::vector<uint8_t> &buf);
+ void push_packet(Packet &pack);
/* Set the maximum delay in number of AF Packets before we
* abandon decoding a given pseq.
diff --git a/lib/edi/STIWriter.cpp b/lib/edi/STIWriter.cpp
index a7e4f20..29f0124 100644
--- a/lib/edi/STIWriter.cpp
+++ b/lib/edi/STIWriter.cpp
@@ -123,6 +123,7 @@ void STIWriter::assemble()
// TODO check time validity
sti_frame_t stiFrame;
+ stiFrame.dlfc = m_management_data.dlfc;
stiFrame.frame = move(m_payload.istd);
stiFrame.timestamp.seconds = m_seconds;
stiFrame.timestamp.utco = m_utco;
diff --git a/lib/edi/STIWriter.hpp b/lib/edi/STIWriter.hpp
index fc08e97..a7a5cda 100644
--- a/lib/edi/STIWriter.hpp
+++ b/lib/edi/STIWriter.hpp
@@ -32,6 +32,7 @@ namespace EdiDecoder {
struct sti_frame_t {
std::vector<uint8_t> frame;
+ uint16_t dlfc;
frame_timestamp_t timestamp;
audio_level_data audio_levels;
odr_version_data version_data;
diff --git a/lib/edi/common.cpp b/lib/edi/common.cpp
index 306261a..7907656 100644
--- a/lib/edi/common.cpp
+++ b/lib/edi/common.cpp
@@ -22,6 +22,7 @@
#include "buffer_unpack.hpp"
#include "Log.h"
#include "crc.h"
+#include <algorithm>
#include <sstream>
#include <cassert>
#include <cmath>
@@ -142,6 +143,12 @@ void TagDispatcher::set_verbose(bool verbose)
void TagDispatcher::push_bytes(const vector<uint8_t> &buf)
{
+ if (buf.empty()) {
+ m_input_data.clear();
+ m_last_seq_valid = false;
+ return;
+ }
+
copy(buf.begin(), buf.end(), back_inserter(m_input_data));
while (m_input_data.size() > 2) {
@@ -194,14 +201,16 @@ void TagDispatcher::push_bytes(const vector<uint8_t> &buf)
}
}
else {
- etiLog.log(warn,"Unknown %c!", *m_input_data.data());
+ etiLog.log(warn, "Unknown 0x%02x!", *m_input_data.data());
m_input_data.erase(m_input_data.begin());
}
}
}
-void TagDispatcher::push_packet(const vector<uint8_t> &buf)
+void TagDispatcher::push_packet(const Packet &packet)
{
+ auto& buf = packet.buf;
+
if (buf.size() < 2) {
throw std::invalid_argument("Not enough bytes to read EDI packet header");
}
@@ -216,7 +225,7 @@ void TagDispatcher::push_packet(const vector<uint8_t> &buf)
}
else if (buf[0] == 'P' and buf[1] == 'F') {
PFT::Fragment fragment;
- fragment.loadData(buf);
+ fragment.loadData(buf, packet.received_on_port);
if (fragment.isValid()) {
m_pft.pushPFTFrag(fragment);
@@ -232,11 +241,10 @@ void TagDispatcher::push_packet(const vector<uint8_t> &buf)
}
}
else {
- const char packettype[3] = {(char)buf[0], (char)buf[1], '\0'};
std::stringstream ss;
- ss << "Unknown EDI packet ";
- ss << packettype;
- throw std::invalid_argument(ss.str());
+ ss << "Unknown EDI packet " << std::hex << (int)buf[0] << " " << (int)buf[1];
+ m_ignored_tags.clear();
+ throw invalid_argument(ss.str());
}
}
@@ -268,6 +276,7 @@ decode_state_t TagDispatcher::decode_afpacket(
const uint16_t expected_seq = m_last_seq + 1;
if (expected_seq != seq) {
etiLog.level(warn) << "EDI AF Packet sequence error, " << seq;
+ m_ignored_tags.clear();
}
}
else {
@@ -303,8 +312,7 @@ decode_state_t TagDispatcher::decode_afpacket(
uint16_t packet_crc = read_16b(input_data.begin() + AFPACKET_HEADER_LEN + taglength);
if (packet_crc != crc) {
- throw invalid_argument(
- "AF Packet crc wrong");
+ throw invalid_argument("AF Packet crc wrong");
}
else {
vector<uint8_t> payload(taglength);
@@ -379,7 +387,10 @@ bool TagDispatcher::decode_tagpacket(const vector<uint8_t> &payload)
}
if (not found) {
- etiLog.log(warn, "Ignoring unknown TAG %s", tag.c_str());
+ if (std::find(m_ignored_tags.begin(), m_ignored_tags.end(), tag) == m_ignored_tags.end()) {
+ etiLog.log(warn, "Ignoring unknown TAG %s", tag.c_str());
+ m_ignored_tags.push_back(tag);
+ }
break;
}
diff --git a/lib/edi/common.hpp b/lib/edi/common.hpp
index c8c4bb3..14b91ba 100644
--- a/lib/edi/common.hpp
+++ b/lib/edi/common.hpp
@@ -60,6 +60,14 @@ using tag_name_t = std::array<uint8_t, 4>;
std::string tag_name_to_human_readable(const tag_name_t& name);
+struct Packet {
+ std::vector<uint8_t> buf;
+ int received_on_port;
+
+ Packet(std::vector<uint8_t>&& b) : buf(b), received_on_port(0) { }
+ Packet() {}
+};
+
/* The TagDispatcher takes care of decoding EDI, with or without PFT, and
* will call functions when TAGs are encountered.
*
@@ -72,17 +80,17 @@ class TagDispatcher {
void set_verbose(bool verbose);
-
/* Push bytes into the decoder. The buf can contain more
* than a single packet. This is useful when reading from streams
- * (files, TCP)
+ * (files, TCP). Pushing an empty buf will clear the internal decoder
+ * state to ensure realignment (e.g. on stream reconnection)
*/
void push_bytes(const std::vector<uint8_t> &buf);
/* Push a complete packet into the decoder. Useful for UDP and other
* datagram-oriented protocols.
*/
- void push_packet(const std::vector<uint8_t> &buf);
+ void push_packet(const Packet &packet);
/* Set the maximum delay in number of AF Packets before we
* abandon decoding a given pseq.
@@ -113,6 +121,8 @@ class TagDispatcher {
std::map<std::string, tag_handler> m_handlers;
std::function<void()> m_af_packet_completed;
tagpacket_handler m_tagpacket_handler;
+
+ std::vector<std::string> m_ignored_tags;
};
// Data carried inside the ODRv EDI TAG