summaryrefslogtreecommitdiffstats
path: root/lib/edi/PFT.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'lib/edi/PFT.cpp')
-rw-r--r--lib/edi/PFT.cpp574
1 files changed, 574 insertions, 0 deletions
diff --git a/lib/edi/PFT.cpp b/lib/edi/PFT.cpp
new file mode 100644
index 0000000..158b206
--- /dev/null
+++ b/lib/edi/PFT.cpp
@@ -0,0 +1,574 @@
+/* ------------------------------------------------------------------
+ * Copyright (C) 2017 AVT GmbH - Fabien Vercasson
+ * Copyright (C) 2017 Matthias P. Braendli
+ * matthias.braendli@mpb.li
+ *
+ * http://opendigitalradio.org
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ * -------------------------------------------------------------------
+ */
+
+#include <cstdio>
+#include <cassert>
+#include <cstring>
+#include <sstream>
+#include <stdexcept>
+#include <algorithm>
+#include "crc.h"
+#include "PFT.hpp"
+#include "Log.h"
+#include "buffer_unpack.hpp"
+extern "C" {
+#include "fec/fec.h"
+}
+
+namespace EdiDecoder {
+namespace PFT {
+
+using namespace std;
+
+const findex_t NUM_AFBUILDERS_TO_KEEP = 10;
+
+static bool checkCRC(const uint8_t *buf, size_t size)
+{
+ const uint16_t crc_from_packet = read_16b(buf + size - 2);
+ uint16_t crc_calc = 0xffff;
+ crc_calc = crc16(crc_calc, buf, size - 2);
+ crc_calc ^= 0xffff;
+
+ return crc_from_packet == crc_calc;
+}
+
+class FECDecoder {
+ public:
+ FECDecoder() {
+ m_rs_handler = init_rs_char(
+ symsize, gfPoly, firstRoot, primElem, nroots, pad);
+ }
+ FECDecoder(const FECDecoder& other) = delete;
+ FECDecoder& operator=(const FECDecoder& other) = delete;
+ ~FECDecoder() {
+ free_rs_char(m_rs_handler);
+ }
+
+ // return -1 in case of failure, non-negative value if errors
+ // were corrected.
+ // Known positions of erasures should be given in eras_pos to
+ // improve decoding probability. After calling this function
+ // eras_pos will contain the positions of the corrected errors.
+ int decode(vector<uint8_t> &data, vector<int> &eras_pos) {
+ assert(data.size() == N);
+ const size_t no_eras = eras_pos.size();
+
+ eras_pos.resize(nroots);
+ int num_err = decode_rs_char(m_rs_handler, data.data(),
+ eras_pos.data(), no_eras);
+ if (num_err > 0) {
+ eras_pos.resize(num_err);
+ }
+ return num_err;
+ }
+
+ // return -1 in case of failure, non-negative value if errors
+ // were corrected. No known erasures.
+ int decode(vector<uint8_t> &data) {
+ assert(data.size() == N);
+ int num_err = decode_rs_char(m_rs_handler, data.data(), nullptr, 0);
+ return num_err;
+ }
+
+ private:
+ void* m_rs_handler;
+
+ const int firstRoot = 1; // Discovered by analysing EDI dump
+ const int gfPoly = 0x11d;
+
+ // The encoding has to be 255, 207 always, because the chunk has to
+ // be padded at the end, and not at the beginning as libfec would
+ // do
+ const size_t N = 255;
+ const size_t K = 207;
+ const int primElem = 1;
+ const int symsize = 8;
+ const size_t nroots = N - K; // For EDI PFT, this must be 48
+ const size_t pad = ((1 << symsize) - 1) - N; // is 255-N
+
+};
+
+size_t Fragment::loadData(const std::vector<uint8_t> &buf)
+{
+ const size_t header_len = 14;
+ if (buf.size() < header_len) {
+ return 0;
+ }
+
+ size_t index = 0;
+
+ // Parse PFT Fragment Header (ETSI TS 102 821 V1.4.1 ch7.1)
+ if (not (buf[0] == 'P' and buf[1] == 'F') ) {
+ throw invalid_argument("Invalid PFT SYNC bytes");
+ }
+ index += 2; // Psync
+
+ _Pseq = read_16b(buf.begin()+index); index += 2;
+ _Findex = read_24b(buf.begin()+index); index += 3;
+ _Fcount = read_24b(buf.begin()+index); index += 3;
+ _FEC = unpack1bit(buf[index], 0);
+ _Addr = unpack1bit(buf[index], 1);
+ _Plen = read_16b(buf.begin()+index) & 0x3FFF; index += 2;
+
+ const size_t required_len = header_len +
+ (_FEC ? 1 : 0) +
+ (_Addr ? 2 : 0) +
+ 2; // CRC
+ if (buf.size() < required_len) {
+ return 0;
+ }
+
+ // Optional RS Header
+ _RSk = 0;
+ _RSz = 0;
+ if (_FEC) {
+ _RSk = buf[index]; index += 1;
+ _RSz = buf[index]; index += 1;
+ }
+
+ // Optional transport header
+ _Source = 0;
+ _Dest = 0;
+ if (_Addr) {
+ _Source = read_16b(buf.begin()+index); index += 2;
+ _Dest = read_16b(buf.begin()+index); index += 2;
+ }
+
+ index += 2;
+ const bool crc_valid = checkCRC(buf.data(), index);
+ const bool buf_has_enough_data = (buf.size() >= index + _Plen);
+
+ if (not buf_has_enough_data) {
+ return 0;
+ }
+
+ _valid = ((not _FEC) or crc_valid) and buf_has_enough_data;
+
+#if 0
+ if (!_valid) {
+ stringstream ss;
+ ss << "Invalid PF fragment: ";
+ if (_FEC) {
+ ss << " RSk=" << (uint32_t)_RSk << " RSz=" << (uint32_t)_RSz;
+ }
+
+ if (_Addr) {
+ ss << " Source=" << _Source << " Dest=" << _Dest;
+ }
+ etiLog.log(debug, "%s\n", ss.str().c_str());
+ }
+#endif
+
+ _payload.clear();
+ if (_valid) {
+ copy( buf.begin()+index,
+ buf.begin()+index+_Plen,
+ back_inserter(_payload));
+ index += _Plen;
+ }
+
+ return index;
+}
+
+
+AFBuilder::AFBuilder(pseq_t Pseq, findex_t Fcount, size_t lifetime)
+{
+ _Pseq = Pseq;
+ _Fcount = Fcount;
+ assert(lifetime > 0);
+ lifeTime = lifetime;
+}
+
+void AFBuilder::pushPFTFrag(const Fragment &frag)
+{
+ if (_Pseq != frag.Pseq() or _Fcount != frag.Fcount()) {
+ throw invalid_argument("Invalid PFT fragment Pseq or Fcount");
+ }
+ const auto Findex = frag.Findex();
+ const bool fragment_already_received = _fragments.count(Findex);
+
+ if (not fragment_already_received)
+ {
+ _fragments[Findex] = frag;
+ }
+}
+
+bool Fragment::checkConsistency(const Fragment& other) const
+{
+ /* Consistency check, TS 102 821 Clause 7.3.2.
+ *
+ * Every PFT Fragment produced from a single AF or RS Packet shall have
+ * the same values in all of the PFT Header fields except for the Findex,
+ * Plen and HCRC fields.
+ */
+
+ return other._Fcount == _Fcount and
+ other._FEC == _FEC and
+ other._RSk == _RSk and
+ other._RSz == _RSz and
+ other._Addr == _Addr and
+ other._Source == _Source and
+ other._Dest == _Dest and
+
+ /* The Plen field of all fragments shall be the s for the initial f-1
+ * fragments and s - (L%f) for the final fragment.
+ * Note that when Reed Solomon has been used, all fragments will be of
+ * length s.
+ */
+ (_FEC ? other._Plen == _Plen : true);
+}
+
+
+AFBuilder::decode_attempt_result_t AFBuilder::canAttemptToDecode() const
+{
+ if (_fragments.empty()) {
+ return AFBuilder::decode_attempt_result_t::no;
+ }
+
+ if (_fragments.size() == _Fcount) {
+ return AFBuilder::decode_attempt_result_t::yes;
+ }
+
+ /* Check that all fragments are consistent */
+ const Fragment& first = _fragments.begin()->second;
+ if (not std::all_of(_fragments.begin(), _fragments.end(),
+ [&](const pair<int, Fragment>& pair) {
+ const Fragment& frag = pair.second;
+ return first.checkConsistency(frag) and _Pseq == frag.Pseq();
+ }) ) {
+ throw invalid_argument("Inconsistent PFT fragments");
+ }
+
+ // Calculate the minimum number of fragments necessary to apply FEC.
+ // This can't be done with the last fragment that may have a
+ // smaller size
+ // ETSI TS 102 821 V1.4.1 ch 7.4.4
+ auto frag_it = _fragments.begin();
+ if (frag_it->second.Fcount() == _Fcount - 1) {
+ frag_it++;
+
+ if (frag_it == _fragments.end()) {
+ return AFBuilder::decode_attempt_result_t::no;
+ }
+ }
+
+ const Fragment& frag = frag_it->second;
+
+ if ( frag.FEC() )
+ {
+ const uint16_t _Plen = frag.Plen();
+
+ /* max number of RS chunks that may have been sent */
+ const uint32_t _cmax = (_Fcount*_Plen) / (frag.RSk()+48);
+ assert(_cmax > 0);
+
+ /* Receiving _rxmin fragments does not guarantee that decoding
+ * will succeed! */
+ const uint32_t _rxmin = _Fcount - (_cmax*48)/_Plen;
+
+ if (_fragments.size() >= _rxmin) {
+ return AFBuilder::decode_attempt_result_t::maybe;
+ }
+ }
+
+ return AFBuilder::decode_attempt_result_t::no;
+}
+
+std::vector<uint8_t> AFBuilder::extractAF() const
+{
+ if (not _af_packet.empty()) {
+ return _af_packet;
+ }
+
+ bool ok = false;
+
+ if (canAttemptToDecode() != AFBuilder::decode_attempt_result_t::no) {
+
+ auto frag_it = _fragments.begin();
+ if (frag_it->second.Fcount() == _Fcount - 1) {
+ frag_it++;
+
+ if (frag_it == _fragments.end()) {
+ throw std::runtime_error("Invalid attempt at extracting AF");
+ }
+ }
+
+ const Fragment& ref_frag = frag_it->second;
+ const auto RSk = ref_frag.RSk();
+ const auto RSz = ref_frag.RSz();
+ const auto Plen = ref_frag.Plen();
+
+ if ( ref_frag.FEC() )
+ {
+ const uint32_t cmax = (_Fcount*Plen) / (RSk+48);
+
+ // Keep track of erasures (missing fragments) for
+ // every chunk
+ map<int, vector<int> > erasures;
+
+
+ // Assemble fragments into a RS block, immediately
+ // deinterleaving it.
+ vector<uint8_t> rs_block(Plen * _Fcount);
+ for (size_t j = 0; j < _Fcount; j++) {
+ const bool fragment_present = _fragments.count(j);
+ if (fragment_present) {
+ const auto& fragment = _fragments.at(j).payload();
+
+ if (j != _Fcount - 1 and fragment.size() != Plen) {
+ throw runtime_error("Incorrect fragment length " +
+ to_string(fragment.size()) + " " +
+ to_string(Plen));
+ }
+
+ if (j == _Fcount - 1 and fragment.size() > Plen) {
+ throw runtime_error("Incorrect last fragment length " +
+ to_string(fragment.size()) + " " +
+ to_string(Plen));
+ }
+
+ size_t k = 0;
+ for (; k < fragment.size(); k++) {
+ rs_block[k * _Fcount + j] = fragment[k];
+ }
+
+ for (; k < Plen; k++) {
+ rs_block[k * _Fcount + j] = 0x00;
+ }
+ }
+ else {
+ // fill with zeros if fragment is missing
+ for (size_t k = 0; k < Plen; k++) {
+ rs_block[k * _Fcount + j] = 0x00;
+
+ const size_t chunk_ix = (k * _Fcount + j) / (RSk + 48);
+ const size_t chunk_offset = (k * _Fcount + j) % (RSk + 48);
+ erasures[chunk_ix].push_back(chunk_offset);
+ }
+ }
+ }
+
+ // The RS block is a concatenation of chunks of RSk bytes + 48 parity
+ // followed by RSz padding
+
+ FECDecoder fec;
+ for (size_t i = 0; i < cmax; i++) {
+ // We need to pad the chunk ourself
+ vector<uint8_t> chunk(255);
+ const auto& block_begin = rs_block.begin() + (RSk + 48) * i;
+ copy(block_begin, block_begin + RSk, chunk.begin());
+ // bytes between RSk and 207 are 0x00 already
+ copy(block_begin + RSk, block_begin + RSk + 48,
+ chunk.begin() + 207);
+
+ int errors_corrected = -1;
+ if (erasures.count(i)) {
+ errors_corrected = fec.decode(chunk, erasures[i]);
+ }
+ else {
+ errors_corrected = fec.decode(chunk);
+ }
+
+ if (errors_corrected == -1) {
+ _af_packet.clear();
+ return {};
+ }
+
+#if 0
+ if (errors_corrected > 0) {
+ etiLog.log(debug, "Corrected %d errors at ", errors_corrected);
+ for (const auto &index : erasures[i]) {
+ etiLog.log(debug, " %d", index);
+ }
+ etiLog.log(debug, "\n");
+ }
+#endif
+
+ _af_packet.insert(_af_packet.end(), chunk.begin(), chunk.begin() + RSk);
+ }
+
+ _af_packet.resize(_af_packet.size() - RSz);
+ }
+ else {
+ // No FEC: just assemble fragments
+
+ for (size_t j = 0; j < _Fcount; ++j) {
+ const bool fragment_present = _fragments.count(j);
+ if (fragment_present)
+ {
+ const auto& fragment = _fragments.at(j);
+
+ _af_packet.insert(_af_packet.end(),
+ fragment.payload().begin(),
+ fragment.payload().end());
+ }
+ else {
+ throw logic_error("Missing fragment");
+ }
+ }
+ }
+
+ // EDI specific, must have a CRC.
+ if( _af_packet.size() >= 12 ) {
+ ok = checkCRC(_af_packet.data(), _af_packet.size());
+
+ if (not ok) {
+ etiLog.log(debug, "Too many errors to reconstruct AF from %zu/%u"
+ " PFT fragments\n", _fragments.size(), _Fcount);
+ }
+ }
+ }
+
+ if (not ok) {
+ _af_packet.clear();
+ }
+
+ return _af_packet;
+}
+
+std::string AFBuilder::visualise() const
+{
+ stringstream ss;
+ ss << "|";
+ for (size_t i = 0; i < _Fcount; i++) {
+ if (_fragments.count(i)) {
+ ss << ".";
+ }
+ else {
+ ss << " ";
+ }
+ }
+ ss << "| " << AFBuilder::dar_to_string(canAttemptToDecode()) << " " << lifeTime;
+ return ss.str();
+}
+
+void PFT::pushPFTFrag(const Fragment &fragment)
+{
+ // Start decoding the first pseq we receive. In normal
+ // operation without interruptions, the map should
+ // never become empty
+ if (m_afbuilders.empty()) {
+ m_next_pseq = fragment.Pseq();
+ etiLog.log(debug,"Initialise next_pseq to %u\n", m_next_pseq);
+ }
+
+ if (m_afbuilders.count(fragment.Pseq()) == 0) {
+ // The AFBuilder wants to know the lifetime in number of fragments,
+ // we know the delay in number of AF packets. Every AF packet
+ // is cut into Fcount fragments.
+ const size_t lifetime = fragment.Fcount() * m_max_delay;
+
+ // Build the afbuilder in the map in-place
+ m_afbuilders.emplace(std::piecewise_construct,
+ /* key */
+ std::forward_as_tuple(fragment.Pseq()),
+ /* builder */
+ std::forward_as_tuple(fragment.Pseq(), fragment.Fcount(), lifetime));
+ }
+
+ auto& p = m_afbuilders.at(fragment.Pseq());
+ p.pushPFTFrag(fragment);
+
+ if (m_verbose) {
+ etiLog.log(debug, "Got frag %u:%u, afbuilders: ",
+ fragment.Pseq(), fragment.Findex());
+ for (const auto &k : m_afbuilders) {
+ const bool isNextPseq = (m_next_pseq == k.first);
+ etiLog.level(debug) << (isNextPseq ? "->" : " ") <<
+ k.first << " " << k.second.visualise();
+ }
+ }
+}
+
+
+std::vector<uint8_t> PFT::getNextAFPacket()
+{
+ if (m_afbuilders.count(m_next_pseq) == 0) {
+ if (m_afbuilders.size() > m_max_delay) {
+ m_afbuilders.clear();
+ etiLog.level(debug) << " Reinit";
+ }
+
+ return {};
+ }
+
+ auto &builder = m_afbuilders.at(m_next_pseq);
+
+ using dar_t = AFBuilder::decode_attempt_result_t;
+
+ if (builder.canAttemptToDecode() == dar_t::yes) {
+ auto afpacket = builder.extractAF();
+ assert(not afpacket.empty());
+ incrementNextPseq();
+ return afpacket;
+ }
+ else if (builder.canAttemptToDecode() == dar_t::maybe) {
+ if (builder.lifeTime > 0) {
+ builder.lifeTime--;
+ }
+
+ if (builder.lifeTime == 0) {
+ // Attempt Reed-Solomon decoding
+ auto afpacket = builder.extractAF();
+
+ if (afpacket.empty()) {
+ etiLog.log(debug,"pseq %d timed out after RS", m_next_pseq);
+ }
+ incrementNextPseq();
+ return afpacket;
+ }
+ }
+ else {
+ if (builder.lifeTime > 0) {
+ builder.lifeTime--;
+ }
+
+ if (builder.lifeTime == 0) {
+ etiLog.log(debug, "pseq %d timed out\n", m_next_pseq);
+ incrementNextPseq();
+ }
+ }
+
+ return {};
+}
+
+void PFT::setMaxDelay(size_t num_af_packets)
+{
+ m_max_delay = num_af_packets;
+}
+
+void PFT::setVerbose(bool enable)
+{
+ m_verbose = enable;
+}
+
+void PFT::incrementNextPseq()
+{
+ if (m_afbuilders.count(m_next_pseq - NUM_AFBUILDERS_TO_KEEP) > 0) {
+ m_afbuilders.erase(m_next_pseq - NUM_AFBUILDERS_TO_KEEP);
+ }
+
+ m_next_pseq++;
+}
+
+}
+}