From 03967733d70220e2de7af3cdad320aec5c82ede1 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Tue, 25 Jun 2019 10:50:23 +0200 Subject: Add more EDI input improvements --- Makefile.am | 1 + lib/Socket.cpp | 97 +++++++- lib/Socket.h | 36 ++- lib/edi/PFT.cpp | 574 ++++++++++++++++++++++++++++++++++++++++++++++ lib/edi/PFT.hpp | 166 ++++++++++++++ lib/edi/STIDecoder.cpp | 191 +++++++++++++++ lib/edi/STIDecoder.hpp | 122 ++++++++++ lib/edi/STIWriter.cpp | 138 +++++++++++ lib/edi/STIWriter.hpp | 84 +++++++ lib/edi/buffer_unpack.hpp | 62 +++++ lib/edi/common.cpp | 300 ++++++++++++++++++++++++ lib/edi/common.hpp | 88 +++++++ src/input/Edi.cpp | 189 +++++++++++++++ src/input/Edi.h | 82 +++++++ 14 files changed, 2117 insertions(+), 13 deletions(-) create mode 100644 lib/edi/PFT.cpp create mode 100644 lib/edi/PFT.hpp create mode 100644 lib/edi/STIDecoder.cpp create mode 100644 lib/edi/STIDecoder.hpp create mode 100644 lib/edi/STIWriter.cpp create mode 100644 lib/edi/STIWriter.hpp create mode 100644 lib/edi/buffer_unpack.hpp create mode 100644 lib/edi/common.cpp create mode 100644 lib/edi/common.hpp create mode 100644 src/input/Edi.cpp create mode 100644 src/input/Edi.h diff --git a/Makefile.am b/Makefile.am index 80d24e0..6e5aa71 100644 --- a/Makefile.am +++ b/Makefile.am @@ -168,6 +168,7 @@ odr_dabmux_SOURCES =src/DabMux.cpp \ lib/edi/PFT.h \ lib/edi/common.cpp \ lib/edi/common.h \ + lib/edi/buffer_unpack.hpp \ lib/ReedSolomon.h \ lib/ReedSolomon.cpp \ lib/Socket.h \ diff --git a/lib/Socket.cpp b/lib/Socket.cpp index 9b404eb..cd70a8e 100644 --- a/lib/Socket.cpp +++ b/lib/Socket.cpp @@ -274,6 +274,8 @@ void UDPSocket::setMulticastTTL(int ttl) } } +UDPReceiver::UDPReceiver() { } + UDPReceiver::~UDPReceiver() { m_stop = true; m_sock.close(); @@ -355,7 +357,7 @@ TCPSocket::~TCPSocket() TCPSocket::TCPSocket(TCPSocket&& other) : m_sock(other.m_sock), - m_remote_address(other.m_remote_address) + m_remote_address(move(other.m_remote_address)) { if (other.m_sock != -1) { other.m_sock = -1; @@ -364,9 +366,9 @@ TCPSocket::TCPSocket(TCPSocket&& other) : TCPSocket& TCPSocket::operator=(TCPSocket&& other) { - m_sock = other.m_sock; - m_remote_address = other.m_remote_address; + swap(m_remote_address, other.m_remote_address); + m_sock = other.m_sock; if (other.m_sock != -1) { other.m_sock = -1; } @@ -487,14 +489,21 @@ void TCPSocket::listen(int port, const string& name) freeaddrinfo(result); + if (m_sock != INVALID_SOCKET) { #if defined(HAVE_SO_NOSIGPIPE) - int val = 1; - if (setsockopt(m_sock, SOL_SOCKET, SO_NOSIGPIPE, - &val, sizeof(val)) < 0) { - throw std::runtime_error("Can't set SO_NOSIGPIPE"); - } + int val = 1; + if (setsockopt(m_sock, SOL_SOCKET, SO_NOSIGPIPE, + &val, sizeof(val)) < 0) { + throw std::runtime_error("Can't set SO_NOSIGPIPE"); + } #endif + int ret = ::listen(m_sock, 0); + if (ret == -1) { + throw std::runtime_error(string("Could not listen: ") + strerror(errno)); + } + } + if (rp == nullptr) { throw runtime_error("Could not bind"); } @@ -814,4 +823,76 @@ void TCPDataDispatcher::process() } } +TCPReceiveServer::TCPReceiveServer(size_t blocksize) : + m_blocksize(blocksize) +{ +} + +void TCPReceiveServer::start(int listen_port, const std::string& address) +{ + m_listener_socket.listen(listen_port, address); + + m_running = true; + m_listener_thread = std::thread(&TCPReceiveServer::process, this); +} + +TCPReceiveServer::~TCPReceiveServer() +{ + m_running = false; + if (m_listener_thread.joinable()) { + m_listener_thread.join(); + } +} + +vector TCPReceiveServer::receive() +{ + vector buffer; + m_queue.try_pop(buffer); + + // we can ignore try_pop()'s return value, because + // if it is unsuccessful the buffer is not touched. + return buffer; +} + +void TCPReceiveServer::process() +{ + constexpr int timeout_ms = 1000; + constexpr int disconnect_timeout_ms = 10000; + constexpr int max_num_timeouts = disconnect_timeout_ms / timeout_ms; + + while (m_running) { + auto sock = m_listener_socket.accept(timeout_ms); + + int num_timeouts = 0; + + while (m_running and sock.valid()) { + try { + vector buf(m_blocksize); + ssize_t r = sock.recv(buf.data(), buf.size(), 0, timeout_ms); + if (r < 0) { + throw logic_error("Invalid recv return value"); + } + else if (r == 0) { + sock.close(); + break; + } + else { + buf.resize(r); + m_queue.push(move(buf)); + } + } + catch (const TCPSocket::Interrupted&) { + break; + } + catch (const TCPSocket::Timeout&) { + num_timeouts++; + } + + if (num_timeouts > max_num_timeouts) { + sock.close(); + } + } + } +} + } diff --git a/lib/Socket.h b/lib/Socket.h index 2393584..8bb7fe1 100644 --- a/lib/Socket.h +++ b/lib/Socket.h @@ -127,7 +127,7 @@ class UDPSocket /* Threaded UDP receiver */ class UDPReceiver { public: - UDPReceiver() : m_port(0), m_thread(), m_stop(false), m_packets() {} + UDPReceiver(); ~UDPReceiver(); UDPReceiver(const UDPReceiver&) = delete; UDPReceiver operator=(const UDPReceiver&) = delete; @@ -142,12 +142,12 @@ class UDPReceiver { private: void m_run(void); - int m_port; + int m_port = 0; std::string m_bindto; std::string m_mcastaddr; - size_t m_max_packets_queued; + size_t m_max_packets_queued = 1; std::thread m_thread; - std::atomic m_stop; + std::atomic m_stop = ATOMIC_VAR_INIT(false); ThreadsafeQueue m_packets; UDPSocket m_sock; }; @@ -254,7 +254,7 @@ class TCPDataDispatcher void write(const std::vector& data); private: - void process(void); + void process(); size_t m_max_queue_size; @@ -265,4 +265,30 @@ class TCPDataDispatcher std::list m_connections; }; +/* A TCP Server to receive data, which abstracts the handling of connects and disconnects. + */ +class TCPReceiveServer { + public: + TCPReceiveServer(size_t blocksize); + ~TCPReceiveServer(); + TCPReceiveServer(const TCPReceiveServer&) = delete; + TCPReceiveServer& operator=(const TCPReceiveServer&) = delete; + + void start(int listen_port, const std::string& address); + + // Return a vector that contains up to blocksize bytes of data, or + // and empty vector if no data is available. + std::vector receive(); + + private: + void process(); + + size_t m_blocksize = 0; + ThreadsafeQueue > m_queue; + std::atomic m_running; + std::string m_exception_data; + std::thread m_listener_thread; + TCPSocket m_listener_socket; +}; + } diff --git a/lib/edi/PFT.cpp b/lib/edi/PFT.cpp new file mode 100644 index 0000000..aff7929 --- /dev/null +++ b/lib/edi/PFT.cpp @@ -0,0 +1,574 @@ +/* ------------------------------------------------------------------ + * Copyright (C) 2017 AVT GmbH - Fabien Vercasson + * Copyright (C) 2017 Matthias P. Braendli + * matthias.braendli@mpb.li + * + * http://opendigitalradio.org + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. + * See the License for the specific language governing permissions + * and limitations under the License. + * ------------------------------------------------------------------- + */ + +#include +#include +#include +#include +#include +#include +#include "crc.h" +#include "PFT.hpp" +#include "Log.h" +#include "buffer_unpack.hpp" +extern "C" { +#include "fec/fec.h" +} + +namespace EdiDecoder { +namespace PFT { + +using namespace std; + +const findex_t NUM_AFBUILDERS_TO_KEEP = 10; + +static bool checkCRC(const uint8_t *buf, size_t size) +{ + const uint16_t crc_from_packet = read_16b(buf + size - 2); + uint16_t crc_calc = 0xffff; + crc_calc = crc16(crc_calc, buf, size - 2); + crc_calc ^= 0xffff; + + return crc_from_packet == crc_calc; +} + +class FECDecoder { + public: + FECDecoder() { + m_rs_handler = init_rs_char( + symsize, gfPoly, firstRoot, primElem, nroots, pad); + } + FECDecoder(const FECDecoder& other) = delete; + FECDecoder& operator=(const FECDecoder& other) = delete; + ~FECDecoder() { + free_rs_char(m_rs_handler); + } + + // return -1 in case of failure, non-negative value if errors + // were corrected. + // Known positions of erasures should be given in eras_pos to + // improve decoding probability. After calling this function + // eras_pos will contain the positions of the corrected errors. + int decode(vector &data, vector &eras_pos) { + assert(data.size() == N); + const size_t no_eras = eras_pos.size(); + + eras_pos.resize(nroots); + int num_err = decode_rs_char(m_rs_handler, data.data(), + eras_pos.data(), no_eras); + if (num_err > 0) { + eras_pos.resize(num_err); + } + return num_err; + } + + // return -1 in case of failure, non-negative value if errors + // were corrected. No known erasures. + int decode(vector &data) { + assert(data.size() == N); + int num_err = decode_rs_char(m_rs_handler, data.data(), nullptr, 0); + return num_err; + } + + private: + void* m_rs_handler; + + const int firstRoot = 1; // Discovered by analysing EDI dump + const int gfPoly = 0x11d; + + // The encoding has to be 255, 207 always, because the chunk has to + // be padded at the end, and not at the beginning as libfec would + // do + const size_t N = 255; + const size_t K = 207; + const int primElem = 1; + const int symsize = 8; + const size_t nroots = N - K; // For EDI PFT, this must be 48 + const size_t pad = ((1 << symsize) - 1) - N; // is 255-N + +}; + +size_t Fragment::loadData(const std::vector &buf) +{ + const size_t header_len = 14; + if (buf.size() < header_len) { + return 0; + } + + size_t index = 0; + + // Parse PFT Fragment Header (ETSI TS 102 821 V1.4.1 ch7.1) + if (not (buf[0] == 'P' and buf[1] == 'F') ) { + throw invalid_argument("Invalid PFT SYNC bytes"); + } + index += 2; // Psync + + _Pseq = read_16b(buf.begin()+index); index += 2; + _Findex = read_24b(buf.begin()+index); index += 3; + _Fcount = read_24b(buf.begin()+index); index += 3; + _FEC = unpack1bit(buf[index], 0); + _Addr = unpack1bit(buf[index], 1); + _Plen = read_16b(buf.begin()+index) & 0x3FFF; index += 2; + + const size_t required_len = header_len + + (_FEC ? 1 : 0) + + (_Addr ? 2 : 0) + + 2; // CRC + if (buf.size() < required_len) { + return 0; + } + + // Optional RS Header + _RSk = 0; + _RSz = 0; + if (_FEC) { + _RSk = buf[index]; index += 1; + _RSz = buf[index]; index += 1; + } + + // Optional transport header + _Source = 0; + _Dest = 0; + if (_Addr) { + _Source = read_16b(buf.begin()+index); index += 2; + _Dest = read_16b(buf.begin()+index); index += 2; + } + + index += 2; + const bool crc_valid = checkCRC(buf.data(), index); + const bool buf_has_enough_data = (buf.size() >= index + _Plen); + + if (not buf_has_enough_data) { + return 0; + } + + _valid = ((not _FEC) or crc_valid) and buf_has_enough_data; + +#if 0 + if (!_valid) { + stringstream ss; + ss << "Invalid PF fragment: "; + if (_FEC) { + ss << " RSk=" << (uint32_t)_RSk << " RSz=" << (uint32_t)_RSz; + } + + if (_Addr) { + ss << " Source=" << _Source << " Dest=" << _Dest; + } + etiLog.log(debug, "%s\n", ss.str().c_str()); + } +#endif + + _payload.clear(); + if (_valid) { + copy( buf.begin()+index, + buf.begin()+index+_Plen, + back_inserter(_payload)); + index += _Plen; + } + + return index; +} + + +AFBuilder::AFBuilder(pseq_t Pseq, findex_t Fcount, size_t lifetime) +{ + _Pseq = Pseq; + _Fcount = Fcount; + assert(lifetime > 0); + lifeTime = lifetime; +} + +void AFBuilder::pushPFTFrag(const Fragment &frag) +{ + if (_Pseq != frag.Pseq() or _Fcount != frag.Fcount()) { + throw invalid_argument("Invalid PFT fragment Pseq or Fcount"); + } + const auto Findex = frag.Findex(); + const bool fragment_already_received = _fragments.count(Findex); + + if (not fragment_already_received) + { + _fragments[Findex] = frag; + } +} + +bool Fragment::checkConsistency(const Fragment& other) const +{ + /* Consistency check, TS 102 821 Clause 7.3.2. + * + * Every PFT Fragment produced from a single AF or RS Packet shall have + * the same values in all of the PFT Header fields except for the Findex, + * Plen and HCRC fields. + */ + + return other._Fcount == _Fcount and + other._FEC == _FEC and + other._RSk == _RSk and + other._RSz == _RSz and + other._Addr == _Addr and + other._Source == _Source and + other._Dest == _Dest and + + /* The Plen field of all fragments shall be the s for the initial f-1 + * fragments and s - (L%f) for the final fragment. + * Note that when Reed Solomon has been used, all fragments will be of + * length s. + */ + (_FEC ? other._Plen == _Plen : true); +} + + +AFBuilder::decode_attempt_result_t AFBuilder::canAttemptToDecode() const +{ + if (_fragments.empty()) { + return AFBuilder::decode_attempt_result_t::no; + } + + if (_fragments.size() == _Fcount) { + return AFBuilder::decode_attempt_result_t::yes; + } + + /* Check that all fragments are consistent */ + const Fragment& first = _fragments.begin()->second; + if (not std::all_of(_fragments.begin(), _fragments.end(), + [&](const pair& pair) { + const Fragment& frag = pair.second; + return first.checkConsistency(frag) and _Pseq == frag.Pseq(); + }) ) { + throw invalid_argument("Inconsistent PFT fragments"); + } + + // Calculate the minimum number of fragments necessary to apply FEC. + // This can't be done with the last fragment that may have a + // smaller size + // ETSI TS 102 821 V1.4.1 ch 7.4.4 + auto frag_it = _fragments.begin(); + if (frag_it->second.Fcount() == _Fcount - 1) { + frag_it++; + + if (frag_it == _fragments.end()) { + return AFBuilder::decode_attempt_result_t::no; + } + } + + const Fragment& frag = frag_it->second; + + if ( frag.FEC() ) + { + const uint16_t _Plen = frag.Plen(); + + /* max number of RS chunks that may have been sent */ + const uint32_t _cmax = (_Fcount*_Plen) / (frag.RSk()+48); + assert(_cmax > 0); + + /* Receiving _rxmin fragments does not guarantee that decoding + * will succeed! */ + const uint32_t _rxmin = _Fcount - (_cmax*48)/_Plen; + + if (_fragments.size() >= _rxmin) { + return AFBuilder::decode_attempt_result_t::maybe; + } + } + + return AFBuilder::decode_attempt_result_t::no; +} + +std::vector AFBuilder::extractAF() const +{ + if (not _af_packet.empty()) { + return _af_packet; + } + + bool ok = false; + + if (canAttemptToDecode() != AFBuilder::decode_attempt_result_t::no) { + + auto frag_it = _fragments.begin(); + if (frag_it->second.Fcount() == _Fcount - 1) { + frag_it++; + + if (frag_it == _fragments.end()) { + throw std::runtime_error("Invalid attempt at extracting AF"); + } + } + + const Fragment& ref_frag = frag_it->second; + const auto RSk = ref_frag.RSk(); + const auto RSz = ref_frag.RSz(); + const auto Plen = ref_frag.Plen(); + + if ( ref_frag.FEC() ) + { + const uint32_t cmax = (_Fcount*Plen) / (RSk+48); + + // Keep track of erasures (missing fragments) for + // every chunk + map > erasures; + + + // Assemble fragments into a RS block, immediately + // deinterleaving it. + vector rs_block(Plen * _Fcount); + for (size_t j = 0; j < _Fcount; j++) { + const bool fragment_present = _fragments.count(j); + if (fragment_present) { + const auto& fragment = _fragments.at(j).payload(); + + if (j != _Fcount - 1 and fragment.size() != Plen) { + throw runtime_error("Incorrect fragment length " + + to_string(fragment.size()) + " " + + to_string(Plen)); + } + + if (j == _Fcount - 1 and fragment.size() > Plen) { + throw runtime_error("Incorrect last fragment length " + + to_string(fragment.size()) + " " + + to_string(Plen)); + } + + size_t k = 0; + for (; k < fragment.size(); k++) { + rs_block[k * _Fcount + j] = fragment[k]; + } + + for (; k < Plen; k++) { + rs_block[k * _Fcount + j] = 0x00; + } + } + else { + // fill with zeros if fragment is missing + for (size_t k = 0; k < Plen; k++) { + rs_block[k * _Fcount + j] = 0x00; + + const size_t chunk_ix = (k * _Fcount + j) / (RSk + 48); + const size_t chunk_offset = (k * _Fcount + j) % (RSk + 48); + erasures[chunk_ix].push_back(chunk_offset); + } + } + } + + // The RS block is a concatenation of chunks of RSk bytes + 48 parity + // followed by RSz padding + + FECDecoder fec; + for (size_t i = 0; i < cmax; i++) { + // We need to pad the chunk ourself + vector chunk(255); + const auto& block_begin = rs_block.begin() + (RSk + 48) * i; + copy(block_begin, block_begin + RSk, chunk.begin()); + // bytes between RSk and 207 are 0x00 already + copy(block_begin + RSk, block_begin + RSk + 48, + chunk.begin() + 207); + + int errors_corrected = -1; + if (erasures.count(i)) { + errors_corrected = fec.decode(chunk, erasures[i]); + } + else { + errors_corrected = fec.decode(chunk); + } + + if (errors_corrected == -1) { + _af_packet.clear(); + return {}; + } + +#if 0 + if (errors_corrected > 0) { + etiLog.log(debug, "Corrected %d errors at ", errors_corrected); + for (const auto &index : erasures[i]) { + etiLog.log(debug, " %d", index); + } + etiLog.log(debug, "\n"); + } +#endif + + _af_packet.insert(_af_packet.end(), chunk.begin(), chunk.begin() + RSk); + } + + _af_packet.resize(_af_packet.size() - RSz); + } + else { + // No FEC: just assemble fragments + + for (size_t j = 0; j < _Fcount; ++j) { + const bool fragment_present = _fragments.count(j); + if (fragment_present) + { + const auto& fragment = _fragments.at(j); + + _af_packet.insert(_af_packet.end(), + fragment.payload().begin(), + fragment.payload().end()); + } + else { + throw logic_error("Missing fragment"); + } + } + } + + // EDI specific, must have a CRC. + if( _af_packet.size() >= 12 ) { + ok = checkCRC(_af_packet.data(), _af_packet.size()); + + if (not ok) { + etiLog.log(debug, "Too many errors to reconstruct AF from %zu/%u" + " PFT fragments\n", _fragments.size(), _Fcount); + } + } + } + + if (not ok) { + _af_packet.clear(); + } + + return _af_packet; +} + +std::string AFBuilder::visualise() const +{ + stringstream ss; + ss << "|"; + for (size_t i = 0; i < _Fcount; i++) { + if (_fragments.count(i)) { + ss << "."; + } + else { + ss << " "; + } + } + ss << "| " << AFBuilder::dar_to_string(canAttemptToDecode()) << " " << lifeTime; + return ss.str(); +} + +void PFT::pushPFTFrag(const Fragment &fragment) +{ + // Start decoding the first pseq we receive. In normal + // operation without interruptions, the map should + // never become empty + if (m_afbuilders.empty()) { + m_next_pseq = fragment.Pseq(); + etiLog.log(debug,"Initialise next_pseq to %u\n", m_next_pseq); + } + + if (m_afbuilders.count(fragment.Pseq()) == 0) { + // The AFBuilder wants to know the lifetime in number of fragments, + // we know the delay in number of AF packets. Every AF packet + // is cut into Fcount fragments. + const size_t lifetime = fragment.Fcount() * m_max_delay; + + // Build the afbuilder in the map in-place + m_afbuilders.emplace(std::piecewise_construct, + /* key */ + std::forward_as_tuple(fragment.Pseq()), + /* builder */ + std::forward_as_tuple(fragment.Pseq(), fragment.Fcount(), lifetime)); + } + + auto& p = m_afbuilders.at(fragment.Pseq()); + p.pushPFTFrag(fragment); + + if (m_verbose) { + etiLog.log(debug, "Got frag %u:%u, afbuilders: ", + fragment.Pseq(), fragment.Findex()); + for (const auto &k : m_afbuilders) { + const bool isNextPseq = (m_next_pseq == k.first); + etiLog.level(debug) << (isNextPseq ? "->" : " ") << + k.first << " " << k.second.visualise(); + } + } +} + + +std::vector PFT::getNextAFPacket() +{ + if (m_afbuilders.count(m_next_pseq) == 0) { + if (m_afbuilders.size() > m_max_delay) { + m_afbuilders.clear(); + etiLog.level(debug) << " Reinit"; + } + + return {}; + } + + auto &builder = m_afbuilders.at(m_next_pseq); + + using dar_t = AFBuilder::decode_attempt_result_t; + + if (builder.canAttemptToDecode() == dar_t::yes) { + auto afpacket = builder.extractAF(); + assert(not afpacket.empty()); + incrementNextPseq(); + return afpacket; + } + else if (builder.canAttemptToDecode() == dar_t::maybe) { + if (builder.lifeTime > 0) { + builder.lifeTime--; + } + + if (builder.lifeTime == 0) { + // Attempt Reed-Solomon decoding + auto afpacket = builder.extractAF(); + + if (afpacket.empty()) { + etiLog.log(debug,"pseq %d timed out after RS", m_next_pseq); + } + incrementNextPseq(); + return afpacket; + } + } + else { + if (builder.lifeTime > 0) { + builder.lifeTime--; + } + + if (builder.lifeTime == 0) { + etiLog.log(debug, "pseq %d timed out\n", m_next_pseq); + incrementNextPseq(); + } + } + + return {}; +} + +void PFT::setMaxDelay(size_t num_af_packets) +{ + m_max_delay = num_af_packets; +} + +void PFT::setVerbose(bool enable) +{ + m_verbose = enable; +} + +void PFT::incrementNextPseq() +{ + if (m_afbuilders.count(m_next_pseq - NUM_AFBUILDERS_TO_KEEP) > 0) { + m_afbuilders.erase(m_next_pseq - NUM_AFBUILDERS_TO_KEEP); + } + + m_next_pseq++; +} + +} +} diff --git a/lib/edi/PFT.hpp b/lib/edi/PFT.hpp new file mode 100644 index 0000000..779509b --- /dev/null +++ b/lib/edi/PFT.hpp @@ -0,0 +1,166 @@ +/* ------------------------------------------------------------------ + * Copyright (C) 2017 AVT GmbH - Fabien Vercasson + * Copyright (C) 2017 Matthias P. Braendli + * matthias.braendli@mpb.li + * + * http://opendigitalradio.org + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. + * See the License for the specific language governing permissions + * and limitations under the License. + * ------------------------------------------------------------------- + */ + +#pragma once +#include +#include +#include +#include + +namespace EdiDecoder { +namespace PFT { + +using pseq_t = uint16_t; +using findex_t = uint32_t; // findex is a 24-bit value + +class Fragment +{ + public: + // Load the data for one fragment from buf into + // the Fragment. + // \returns the number of bytes of useful data found in buf + // A non-zero return value doesn't imply a valid fragment + // the isValid() method must be used to verify this. + size_t loadData(const std::vector &buf); + + bool isValid() const { return _valid; } + pseq_t Pseq() const { return _Pseq; } + findex_t Findex() const { return _Findex; } + findex_t Fcount() const { return _Fcount; } + bool FEC() const { return _FEC; } + uint16_t Plen() const { return _Plen; } + uint8_t RSk() const { return _RSk; } + uint8_t RSz() const { return _RSz; } + const std::vector& payload() const + { return _payload; } + + bool checkConsistency(const Fragment& other) const; + + private: + std::vector _payload; + + pseq_t _Pseq = 0; + findex_t _Findex = 0; + findex_t _Fcount = 0; + bool _FEC = false; + bool _Addr = false; + uint16_t _Plen = 0; + uint8_t _RSk = 0; + uint8_t _RSz = 0; + uint16_t _Source = 0; + uint16_t _Dest = 0; + bool _valid = false; +}; + +/* The AFBuilder collects Fragments and builds an Application Frame + * out of them. It does error correction if necessary + */ +class AFBuilder +{ + public: + enum class decode_attempt_result_t { + yes, // The AF packet can be build because all fragments are present + maybe, // RS decoding may correctly decode the AF packet + no, // Not enough fragments present to permit RS + }; + + static std::string dar_to_string(decode_attempt_result_t dar) { + switch (dar) { + case decode_attempt_result_t::yes: return "y"; + case decode_attempt_result_t::no: return "n"; + case decode_attempt_result_t::maybe: return "m"; + } + return "?"; + } + + AFBuilder(pseq_t Pseq, findex_t Fcount, size_t lifetime); + + void pushPFTFrag(const Fragment &frag); + + /* Assess if it may be possible to decode this AF packet */ + decode_attempt_result_t canAttemptToDecode() const; + + /* Try to build the AF with received fragments. + * Apply error correction if necessary (missing packets/CRC errors) + * \return an empty vector if building the AF is not possible + */ + std::vector extractAF(void) const; + + std::pair + numberOfFragments(void) const { + return {_fragments.size(), _Fcount}; + } + + std::string visualise(void) const; + + /* The user of this instance can keep track of the lifetime of this + * builder + */ + size_t lifeTime; + + private: + + // A map from fragment index to fragment + std::map _fragments; + + // cached version of decoded AF packet + mutable std::vector _af_packet; + + pseq_t _Pseq; + findex_t _Fcount; +}; + +class PFT +{ + public: + void pushPFTFrag(const Fragment &fragment); + + /* Try to build the AF packet for the next pseq. This might + * skip one or more pseq according to the maximum delay setting. + * + * \return an empty vector if building the AF is not possible + */ + std::vector getNextAFPacket(void); + + /* Set the maximum delay in number of AF Packets before we + * abandon decoding a given pseq. + */ + void setMaxDelay(size_t num_af_packets); + + /* Enable verbose fprintf */ + void setVerbose(bool enable); + + private: + void incrementNextPseq(void); + + pseq_t m_next_pseq; + size_t m_max_delay = 10; // in AF packets + + // Keep one AFBuilder for each Pseq + std::map m_afbuilders; + + bool m_verbose = 0; +}; + +} + +} diff --git a/lib/edi/STIDecoder.cpp b/lib/edi/STIDecoder.cpp new file mode 100644 index 0000000..ca8cead --- /dev/null +++ b/lib/edi/STIDecoder.cpp @@ -0,0 +1,191 @@ +/* + Copyright (C) 2019 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://opendigitalradio.org + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License along + with this program; if not, write to the Free Software Foundation, Inc., + 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ +#include "STIDecoder.hpp" +#include "buffer_unpack.hpp" +#include "crc.h" +#include "Log.h" +#include +#include +#include + +namespace EdiDecoder { + +using namespace std; + +STIDecoder::STIDecoder(STIDataCollector& data_collector, bool verbose) : + m_data_collector(data_collector), + m_dispatcher(std::bind(&STIDecoder::packet_completed, this), verbose) +{ + using std::placeholders::_1; + using std::placeholders::_2; + m_dispatcher.register_tag("*ptr", + std::bind(&STIDecoder::decode_starptr, this, _1, _2)); + m_dispatcher.register_tag("dsti", + std::bind(&STIDecoder::decode_dsti, this, _1, _2)); + m_dispatcher.register_tag("ss", + std::bind(&STIDecoder::decode_ssn, this, _1, _2)); + m_dispatcher.register_tag("*dmy", + std::bind(&STIDecoder::decode_stardmy, this, _1, _2)); +} + +void STIDecoder::push_bytes(const vector &buf) +{ + m_dispatcher.push_bytes(buf); +} + +void STIDecoder::push_packet(const vector &buf) +{ + m_dispatcher.push_packet(buf); +} + +void STIDecoder::setMaxDelay(int num_af_packets) +{ + m_dispatcher.setMaxDelay(num_af_packets); +} + +#define AFPACKET_HEADER_LEN 10 // includes SYNC + +bool STIDecoder::decode_starptr(const vector &value, uint16_t) +{ + if (value.size() != 0x40 / 8) { + etiLog.log(warn, "Incorrect length %02lx for *PTR", value.size()); + return false; + } + + char protocol_sz[5]; + protocol_sz[4] = '\0'; + copy(value.begin(), value.begin() + 4, protocol_sz); + string protocol(protocol_sz); + + uint16_t major = read_16b(value.begin() + 4); + uint16_t minor = read_16b(value.begin() + 6); + + m_data_collector.update_protocol(protocol, major, minor); + + return true; +} + +bool STIDecoder::decode_dsti(const vector &value, uint16_t) +{ + size_t offset = 0; + + const uint16_t dstiHeader = read_16b(value.begin() + offset); + offset += 2; + + sti_management_data md; + + md.stihf = (dstiHeader >> 15) & 0x1; + md.atstf = (dstiHeader >> 14) & 0x1; + md.rfadf = (dstiHeader >> 13) & 0x1; + uint8_t dfcth = (dstiHeader >> 8) & 0x1F; + uint8_t dfctl = dstiHeader & 0xFF; + + md.dflc = dfcth * 250 + dfctl; // modulo 5000 counter + + const size_t expected_length = 2 + + (md.stihf ? 3 : 0) + + (md.atstf ? 1 + 4 + 3 : 0) + + (md.rfadf ? 9 : 0); + + if (value.size() != expected_length) { + throw std::logic_error("EDI dsti: Assertion error:" + "value.size() != expected_length: " + + to_string(value.size()) + " " + + to_string(expected_length)); + } + + if (md.stihf) { + const uint8_t stat = value[offset++]; + const uint16_t spid = read_16b(value.begin() + offset); + m_data_collector.update_stat(stat, spid); + offset += 2; + } + + if (md.atstf) { + uint8_t utco = value[offset]; + offset++; + + uint32_t seconds = read_32b(value.begin() + offset); + offset += 4; + + m_data_collector.update_edi_time(utco, seconds); + + md.tsta = read_24b(value.begin() + offset); + offset += 3; + } + else { + // Null timestamp, ETSI ETS 300 799, C.2.2 + md.tsta = 0xFFFFFF; + } + + + if (md.rfadf) { + std::array rfad; + copy(value.cbegin() + offset, + value.cbegin() + offset + 9, + rfad.begin()); + offset += 9; + + m_data_collector.update_rfad(rfad); + } + + m_data_collector.update_sti_management(md); + + return true; +} + +bool STIDecoder::decode_ssn(const vector &value, uint16_t n) +{ + sti_payload_data sti; + + sti.stream_index = n - 1; // n is 1-indexed + sti.rfa = value[0] >> 3; + sti.tid = value[0] & 0x07; + + uint16_t istc = read_24b(value.begin() + 1); + sti.tidext = istc >> 13; + sti.crcstf = (istc >> 12) & 0x1; + sti.stid = istc & 0xFFF; + + if (sti.rfa != 0) { + etiLog.level(warn) << "EDI: rfa field in SSnn tag non-null"; + } + + copy( value.cbegin() + 3, + value.cend(), + back_inserter(sti.istd)); + + m_data_collector.add_payload(move(sti)); + + return true; +} + +bool STIDecoder::decode_stardmy(const vector& /*value*/, uint16_t) +{ + return true; +} + +void STIDecoder::packet_completed() +{ + m_data_collector.assemble(); +} + +} diff --git a/lib/edi/STIDecoder.hpp b/lib/edi/STIDecoder.hpp new file mode 100644 index 0000000..201a176 --- /dev/null +++ b/lib/edi/STIDecoder.hpp @@ -0,0 +1,122 @@ +/* + Copyright (C) 2019 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://opendigitalradio.org + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License along + with this program; if not, write to the Free Software Foundation, Inc., + 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ +#pragma once + +#include "common.hpp" +#include +#include +#include +#include + +namespace EdiDecoder { + +// Information for STI-D Management +struct sti_management_data { + bool stihf; + bool atstf; + bool rfadf; + uint16_t dflc; + + uint32_t tsta; +}; + +// Information for a subchannel available in EDI +struct sti_payload_data { + uint16_t stream_index; + uint8_t rfa; + uint8_t tid; + uint8_t tidext; + bool crcstf; + uint16_t stid; + std::vector istd; + + // Return the length of ISTD in bytes + uint16_t stl(void) const { return istd.size(); } +}; + +/* A class that receives STI data must implement the interface described + * in the STIDataCollector. This can be e.g. a converter to ETI, or something that + * prepares data structures for a modulator. + */ +class STIDataCollector { + public: + // Tell the ETIWriter what EDI protocol we receive in *ptr. + // This is not part of the ETI data, but is used as check + virtual void update_protocol( + const std::string& proto, + uint16_t major, + uint16_t minor) = 0; + + // STAT error field and service provider ID + virtual void update_stat(uint8_t stat, uint16_t spid) = 0; + + // In addition to TSTA in ETI, EDI also transports more time + // stamp information. + virtual void update_edi_time(uint32_t utco, uint32_t seconds) = 0; + + virtual void update_rfad(std::array rfad) = 0; + virtual void update_sti_management(const sti_management_data& data) = 0; + + virtual void add_payload(sti_payload_data&& payload) = 0; + + virtual void assemble() = 0; +}; + +/* The STIDecoder takes care of decoding the EDI TAGs related to the transport + * of ETI(NI) data inside AF and PF packets. + * + * PF packets are handed over to the PFT decoder, which will in turn return + * AF packets. AF packets are directly handled (TAG extraction) here. + */ +class STIDecoder { + public: + STIDecoder(STIDataCollector& data_collector, bool verbose); + + /* Push bytes into the decoder. The buf can contain more + * than a single packet. This is useful when reading from streams + * (files, TCP) + */ + void push_bytes(const std::vector &buf); + + /* Push a complete packet into the decoder. Useful for UDP and other + * datagram-oriented protocols. + */ + void push_packet(const std::vector &buf); + + /* Set the maximum delay in number of AF Packets before we + * abandon decoding a given pseq. + */ + void setMaxDelay(int num_af_packets); + + private: + bool decode_starptr(const std::vector &value, uint16_t); + bool decode_dsti(const std::vector &value, uint16_t); + bool decode_ssn(const std::vector &value, uint16_t n); + bool decode_stardmy(const std::vector &value, uint16_t); + + void packet_completed(); + + STIDataCollector& m_data_collector; + TagDispatcher m_dispatcher; + +}; + +} diff --git a/lib/edi/STIWriter.cpp b/lib/edi/STIWriter.cpp new file mode 100644 index 0000000..6964eb1 --- /dev/null +++ b/lib/edi/STIWriter.cpp @@ -0,0 +1,138 @@ +/* + Copyright (C) 2019 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://opendigitalradio.org + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License along + with this program; if not, write to the Free Software Foundation, Inc., + 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ +#include "STIWriter.hpp" +#include "crc.h" +#include "Log.h" +#include +#include +#include +#include +#include +#include +#include +#include + +namespace EdiDecoder { + +using namespace std; + +void STIWriter::update_protocol( + const std::string& proto, + uint16_t major, + uint16_t minor) +{ + m_proto_valid = (proto == "DSTI" and major == 0 and minor == 0); + + if (not m_proto_valid) { + throw std::invalid_argument("Wrong EDI protocol"); + } +} + +void STIWriter::reinit() +{ + m_proto_valid = false; + m_management_data_valid = false; + m_stat_valid = false; + m_time_valid = false; + m_payload_valid = false; + m_stiFrame.frame.clear(); +} + +void STIWriter::update_stat(uint8_t stat, uint16_t spid) +{ + m_stat = stat; + m_spid = spid; + m_stat_valid = true; + + if (m_stat != 0xFF) { + etiLog.log(warn, "STI errorlevel %02x", m_stat); + } +} + +void STIWriter::update_rfad(std::array rfad) +{ + (void)rfad; +} + +void STIWriter::update_sti_management(const sti_management_data& data) +{ + m_management_data = data; + m_management_data_valid = true; +} + +void STIWriter::add_payload(sti_payload_data&& payload) +{ + m_payload = move(payload); + m_payload_valid = true; +} + +void STIWriter::update_edi_time( + uint32_t utco, + uint32_t seconds) +{ + if (not m_proto_valid) { + throw std::logic_error("Cannot update time before protocol"); + } + + m_utco = utco; + m_seconds = seconds; + + // TODO check validity + m_time_valid = true; +} + + +void STIWriter::assemble() +{ + if (not m_proto_valid) { + throw std::logic_error("Cannot assemble STI before protocol"); + } + + if (not m_management_data_valid) { + throw std::logic_error("Cannot assemble STI before management data"); + } + + if (not m_payload_valid) { + throw std::logic_error("Cannot assemble STI without frame data"); + } + + // TODO check time validity + + // Do copies so as to preserve existing payload data + m_stiFrame.frame = m_payload.istd; + m_stiFrame.timestamp.seconds = m_seconds; + m_stiFrame.timestamp.utco = m_utco; +} + +sti_frame_t STIWriter::getFrame() +{ + if (m_stiFrame.frame.empty()) { + return {}; + } + + sti_frame_t sti; + swap(sti, m_stiFrame); + reinit(); + return sti; +} + +} + diff --git a/lib/edi/STIWriter.hpp b/lib/edi/STIWriter.hpp new file mode 100644 index 0000000..a75cb69 --- /dev/null +++ b/lib/edi/STIWriter.hpp @@ -0,0 +1,84 @@ +/* + Copyright (C) 2019 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://opendigitalradio.org + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License along + with this program; if not, write to the Free Software Foundation, Inc., + 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ +#pragma once + +#include "common.hpp" +#include "STIDecoder.hpp" +#include +#include +#include +#include + +namespace EdiDecoder { + +struct sti_frame_t { + std::vector frame; + frame_timestamp_t timestamp; +}; + +class STIWriter : public STIDataCollector { + public: + // Tell the ETIWriter what EDI protocol we receive in *ptr. + // This is not part of the ETI data, but is used as check + virtual void update_protocol( + const std::string& proto, + uint16_t major, + uint16_t minor); + + virtual void update_stat(uint8_t stat, uint16_t spid); + + virtual void update_edi_time( + uint32_t utco, + uint32_t seconds); + + virtual void update_rfad(std::array rfad); + virtual void update_sti_management(const sti_management_data& data); + virtual void add_payload(sti_payload_data&& payload); + + virtual void assemble(void); + + // Return the assembled frame or an empty frame if not ready + sti_frame_t getFrame(); + + private: + void reinit(void); + + bool m_proto_valid = false; + + bool m_management_data_valid = false; + sti_management_data m_management_data; + + bool m_stat_valid = false; + uint8_t m_stat = 0; + uint16_t m_spid = 0; + + bool m_time_valid = false; + uint32_t m_utco = 0; + uint32_t m_seconds = 0; + + bool m_payload_valid = false; + sti_payload_data m_payload; + + sti_frame_t m_stiFrame; +}; + +} + diff --git a/lib/edi/buffer_unpack.hpp b/lib/edi/buffer_unpack.hpp new file mode 100644 index 0000000..05a1534 --- /dev/null +++ b/lib/edi/buffer_unpack.hpp @@ -0,0 +1,62 @@ +/* + Copyright (C) 2016 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://opendigitalradio.org + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License along + with this program; if not, write to the Free Software Foundation, Inc., + 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#pragma once +#include + +namespace EdiDecoder { + +template +uint16_t read_16b(T buf) +{ + uint16_t value = 0; + value = (uint16_t)(buf[0]) << 8; + value |= (uint16_t)(buf[1]); + return value; +} + +template +uint32_t read_24b(T buf) +{ + uint32_t value = 0; + value = (uint32_t)(buf[0]) << 16; + value |= (uint32_t)(buf[1]) << 8; + value |= (uint32_t)(buf[2]); + return value; +} + +template +uint32_t read_32b(T buf) +{ + uint32_t value = 0; + value = (uint32_t)(buf[0]) << 24; + value |= (uint32_t)(buf[1]) << 16; + value |= (uint32_t)(buf[2]) << 8; + value |= (uint32_t)(buf[3]); + return value; +} + +inline uint32_t unpack1bit(uint8_t byte, int bitpos) +{ + return (byte & 1 << (7-bitpos)) > (7-bitpos); +} + +} diff --git a/lib/edi/common.cpp b/lib/edi/common.cpp new file mode 100644 index 0000000..bc0fa1b --- /dev/null +++ b/lib/edi/common.cpp @@ -0,0 +1,300 @@ +/* + Copyright (C) 2019 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://opendigitalradio.org + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License along + with this program; if not, write to the Free Software Foundation, Inc., + 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ +#include "common.hpp" +#include "buffer_unpack.hpp" +#include "Log.h" +#include "crc.h" +#include +#include +#include +#include + +namespace EdiDecoder { + +using namespace std; + +string frame_timestamp_t::to_string() const +{ + const time_t seconds_in_unix_epoch = to_unix_epoch(); + + stringstream ss; + ss << "Timestamp: " << std::put_time(std::gmtime(&seconds_in_unix_epoch), "%c %Z"); + return ss.str(); +} + +time_t frame_timestamp_t::to_unix_epoch() const +{ + // EDI epoch: 2000-01-01T00:00:00Z + // Convert using + // TZ=UTC python -c 'import datetime; print(datetime.datetime(2000,1,1,0,0,0,0).strftime("%s"))' + return 946684800 + seconds - utco; +} + + +TagDispatcher::TagDispatcher( + std::function&& af_packet_completed, bool verbose) : + m_af_packet_completed(move(af_packet_completed)) +{ + m_pft.setVerbose(verbose); +} + +void TagDispatcher::push_bytes(const vector &buf) +{ + copy(buf.begin(), buf.end(), back_inserter(m_input_data)); + + while (m_input_data.size() > 2) { + if (m_input_data[0] == 'A' and m_input_data[1] == 'F') { + const decode_state_t st = decode_afpacket(m_input_data); + + if (st.num_bytes_consumed == 0 and not st.complete) { + // We need to refill our buffer + break; + } + + if (st.num_bytes_consumed) { + vector remaining_data; + copy(m_input_data.begin() + st.num_bytes_consumed, + m_input_data.end(), + back_inserter(remaining_data)); + m_input_data = remaining_data; + } + + if (st.complete) { + m_af_packet_completed(); + } + } + else if (m_input_data[0] == 'P' and m_input_data[1] == 'F') { + PFT::Fragment fragment; + const size_t fragment_bytes = fragment.loadData(m_input_data); + + if (fragment_bytes == 0) { + // We need to refill our buffer + break; + } + + vector remaining_data; + copy(m_input_data.begin() + fragment_bytes, + m_input_data.end(), + back_inserter(remaining_data)); + m_input_data = remaining_data; + + if (fragment.isValid()) { + m_pft.pushPFTFrag(fragment); + } + + auto af = m_pft.getNextAFPacket(); + if (not af.empty()) { + decode_state_t st = decode_afpacket(af); + + if (st.complete) { + m_af_packet_completed(); + } + } + } + else { + etiLog.log(warn,"Unknown %c!", *m_input_data.data()); + m_input_data.erase(m_input_data.begin()); + } + } +} + +void TagDispatcher::push_packet(const vector &buf) +{ + if (buf.size() < 2) { + throw std::invalid_argument("Not enough bytes to read EDI packet header"); + } + + if (buf[0] == 'A' and buf[1] == 'F') { + const decode_state_t st = decode_afpacket(buf); + + if (st.complete) { + m_af_packet_completed(); + } + + } + else if (buf[0] == 'P' and buf[1] == 'F') { + PFT::Fragment fragment; + fragment.loadData(buf); + + if (fragment.isValid()) { + m_pft.pushPFTFrag(fragment); + } + + auto af = m_pft.getNextAFPacket(); + if (not af.empty()) { + const decode_state_t st = decode_afpacket(af); + + if (st.complete) { + m_af_packet_completed(); + } + } + } + else { + const char packettype[3] = {(char)buf[0], (char)buf[1], '\0'}; + std::stringstream ss; + ss << "Unknown EDI packet "; + ss << packettype; + throw std::invalid_argument(ss.str()); + } +} + +void TagDispatcher::setMaxDelay(int num_af_packets) +{ + m_pft.setMaxDelay(num_af_packets); +} + + +#define AFPACKET_HEADER_LEN 10 // includes SYNC +decode_state_t TagDispatcher::decode_afpacket( + const std::vector &input_data) +{ + if (input_data.size() < AFPACKET_HEADER_LEN) { + return {false, 0}; + } + + // read length from packet + uint32_t taglength = read_32b(input_data.begin() + 2); + uint16_t seq = read_16b(input_data.begin() + 6); + + const size_t crclength = 2; + if (input_data.size() < AFPACKET_HEADER_LEN + taglength + crclength) { + return {false, 0}; + } + + if (m_last_seq + 1 != seq) { + etiLog.level(warn) << "EDI AF Packet sequence error, " << seq; + } + m_last_seq = seq; + + bool has_crc = (input_data[8] & 0x80) ? true : false; + uint8_t major_revision = (input_data[8] & 0x70) >> 4; + uint8_t minor_revision = input_data[8] & 0x0F; + if (major_revision != 1 or minor_revision != 0) { + throw invalid_argument("EDI AF Packet has wrong revision " + + to_string(major_revision) + "." + to_string(minor_revision)); + } + uint8_t pt = input_data[9]; + if (pt != 'T') { + // only support Tag + return {false, 0}; + } + + + if (not has_crc) { + throw invalid_argument("AF packet not supported, has no CRC"); + } + + uint16_t crc = 0xffff; + for (size_t i = 0; i < AFPACKET_HEADER_LEN + taglength; i++) { + crc = crc16(crc, &input_data[i], 1); + } + crc ^= 0xffff; + + uint16_t packet_crc = read_16b(input_data.begin() + AFPACKET_HEADER_LEN + taglength); + + if (packet_crc != crc) { + throw invalid_argument( + "AF Packet crc wrong"); + } + else { + vector payload(taglength); + copy(input_data.begin() + AFPACKET_HEADER_LEN, + input_data.begin() + AFPACKET_HEADER_LEN + taglength, + payload.begin()); + + return {decode_tagpacket(payload), + AFPACKET_HEADER_LEN + taglength + 2}; + } +} + +void TagDispatcher::register_tag(const std::string& tag, tag_handler&& h) +{ + m_handlers[tag] = move(h); +} + + +bool TagDispatcher::decode_tagpacket(const vector &payload) +{ + size_t length = 0; + + bool success = true; + + for (size_t i = 0; i + 8 < payload.size(); i += 8 + length) { + char tag_sz[5]; + tag_sz[4] = '\0'; + copy(payload.begin() + i, payload.begin() + i + 4, tag_sz); + + string tag(tag_sz); + + uint32_t taglength = read_32b(payload.begin() + i + 4); + + if (taglength % 8 != 0) { + etiLog.log(warn, "Invalid tag length!"); + break; + } + taglength /= 8; + + length = taglength; + + vector tag_value(taglength); + copy( payload.begin() + i+8, + payload.begin() + i+8+taglength, + tag_value.begin()); + + bool tagsuccess = false; + bool found = false; + for (auto tag_handler : m_handlers) { + if (tag_handler.first.size() == 4 and tag_handler.first == tag) { + found = true; + tagsuccess = tag_handler.second(tag_value, 0); + } + else if (tag_handler.first.size() == 3 and + tag.substr(0, 3) == tag_handler.first) { + found = true; + uint8_t n = tag_sz[3]; + tagsuccess = tag_handler.second(tag_value, n); + } + else if (tag_handler.first.size() == 2 and + tag.substr(0, 2) == tag_handler.first) { + found = true; + uint16_t n = 0; + n = (uint16_t)(tag_sz[2]) << 8; + n |= (uint16_t)(tag_sz[3]); + tagsuccess = tag_handler.second(tag_value, n); + } + } + + if (not found) { + etiLog.log(warn, "Ignoring unknown TAG %s", tag.c_str()); + break; + } + + if (not tagsuccess) { + etiLog.log(warn, "Error decoding TAG %s", tag.c_str()); + success = tagsuccess; + break; + } + } + + return success; +} + +} diff --git a/lib/edi/common.hpp b/lib/edi/common.hpp new file mode 100644 index 0000000..1433004 --- /dev/null +++ b/lib/edi/common.hpp @@ -0,0 +1,88 @@ +/* + Copyright (C) 2019 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://opendigitalradio.org + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License along + with this program; if not, write to the Free Software Foundation, Inc., + 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ +#pragma once + +#include "PFT.hpp" +#include +#include +#include +#include +#include +#include + +namespace EdiDecoder { + +struct frame_timestamp_t { + uint32_t seconds = 0; + uint32_t utco = 0; + + std::string to_string() const; + time_t to_unix_epoch() const; +}; + +struct decode_state_t { + decode_state_t(bool _complete, size_t _num_bytes_consumed) : + complete(_complete), num_bytes_consumed(_num_bytes_consumed) {} + bool complete; + size_t num_bytes_consumed; +}; + +/* The TagDispatcher takes care of decoding EDI, with or without PFT, and + * will call functions when TAGs are encountered. + * + * PF packets are handed over to the PFT decoder, which will in turn return + * AF packets. AF packets are directly dispatched to the TAG functions. + */ +class TagDispatcher { + public: + TagDispatcher(std::function&& af_packet_completed, bool verbose); + + /* Push bytes into the decoder. The buf can contain more + * than a single packet. This is useful when reading from streams + * (files, TCP) + */ + void push_bytes(const std::vector &buf); + + /* Push a complete packet into the decoder. Useful for UDP and other + * datagram-oriented protocols. + */ + void push_packet(const std::vector &buf); + + /* Set the maximum delay in number of AF Packets before we + * abandon decoding a given pseq. + */ + void setMaxDelay(int num_af_packets); + + using tag_handler = std::function, uint16_t)>; + void register_tag(const std::string& tag, tag_handler&& h); + + private: + decode_state_t decode_afpacket(const std::vector &input_data); + bool decode_tagpacket(const std::vector &payload); + + PFT::PFT m_pft; + uint16_t m_last_seq = 0; + std::vector m_input_data; + std::map m_handlers; + std::function m_af_packet_completed; +}; + +} diff --git a/src/input/Edi.cpp b/src/input/Edi.cpp new file mode 100644 index 0000000..8aee296 --- /dev/null +++ b/src/input/Edi.cpp @@ -0,0 +1,189 @@ +/* + Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications + Research Center Canada) + + Copyright (C) 2019 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://www.opendigitalradio.org + */ +/* + This file is part of ODR-DabMux. + + ODR-DabMux is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + ODR-DabMux is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with ODR-DabMux. If not, see . + */ + +#include "input/Edi.h" + +#include +#include +#include +#include +#include +#include +#include +#include "utils.h" + +using namespace std; + +namespace Inputs { + +constexpr bool VERBOSE = false; +constexpr size_t TCP_BLOCKSIZE = 2048; +constexpr size_t MAX_FRAMES_QUEUED = 10; + +Edi::Edi() : + m_tcp_receive_server(TCP_BLOCKSIZE), + m_sti_writer(), + m_sti_decoder(m_sti_writer, VERBOSE) +{ } + +Edi::~Edi() { + m_running = false; + if (m_thread.joinable()) { + m_thread.join(); + } +} + +int Edi::open(const std::string& name) +{ + const std::regex re_udp("udp://:([0-9]+)"); + const std::regex re_tcp("tcp://(.*):([0-9]+)"); + + lock_guard lock(m_mutex); + + m_running = false; + if (m_thread.joinable()) { + m_thread.join(); + } + + std::smatch m; + if (std::regex_match(name, m, re_udp)) { + const int udp_port = std::stoi(m[1].str()); + m_input_used = InputUsed::UDP; + m_udp_sock.reinit(udp_port); + m_udp_sock.setBlocking(false); + // TODO multicast + } + else if (std::regex_match(name, m, re_tcp)) { + m_input_used = InputUsed::TCP; + const string addr = m[1].str(); + const int tcp_port = std::stoi(m[2].str()); + m_tcp_receive_server.start(tcp_port, addr); + } + else { + throw runtime_error("Cannot parse EDI input URI"); + } + + m_name = name; + + m_running = true; + m_thread = std::thread(&Edi::m_run, this); + + return 0; +} + +int Edi::readFrame(uint8_t* buffer, size_t size) +{ + vector frame; + if (m_is_prebuffering) { + m_is_prebuffering = m_frames.size() < 10; + if (not m_is_prebuffering) { + etiLog.level(info) << "EDI input " << m_name << " pre-buffering complete."; + } + } + else if (m_frames.try_pop(frame)) { + if (frame.size() == 0) { + etiLog.level(debug) << "EDI input " << m_name << " empty frame"; + return 0; + } + else if (frame.size() == size) { + std::copy(frame.cbegin(), frame.cend(), buffer); + } + else { + etiLog.level(debug) << "EDI input " << m_name << " size mismatch: " << frame.size() << + " received, " << size << " requested"; + memset(buffer, 0, size * sizeof(*buffer)); + } + } + else { + memset(buffer, 0, size * sizeof(*buffer)); + m_is_prebuffering = true; + etiLog.level(info) << "EDI input " << m_name << " re-enabling pre-buffering"; + } + return size; +} + +void Edi::m_run() +{ + while (m_running) { + bool work_done = false; + + switch (m_input_used) { + case InputUsed::UDP: + { + constexpr size_t packsize = 2048; + const auto packet = m_udp_sock.receive(packsize); + if (packet.buffer.size() == packsize) { + fprintf(stderr, "Warning, possible UDP truncation\n"); + } + if (not packet.buffer.empty()) { + m_sti_decoder.push_packet(packet.buffer); + work_done = true; + } + } + break; + case InputUsed::TCP: + { + auto packet = m_tcp_receive_server.receive(); + if (not packet.empty()) { + m_sti_decoder.push_bytes(packet); + work_done = true; + } + } + break; + default: + throw logic_error("unimplemented input"); + } + + const auto sti = m_sti_writer.getFrame(); + if (not sti.frame.empty()) { + m_frames.push_wait_if_full(move(sti.frame), MAX_FRAMES_QUEUED); + work_done = true; + } + + if (not work_done) { + // Avoid fast loop + this_thread::sleep_for(chrono::milliseconds(12)); + } + } +} + +int Edi::setBitrate(int bitrate) +{ + if (bitrate <= 0) { + etiLog.level(error) << "Invalid bitrate (" << bitrate << ") for " << m_name; + return -1; + } + + return bitrate; +} + +int Edi::close() +{ + m_udp_sock.close(); + return 0; +} + +} diff --git a/src/input/Edi.h b/src/input/Edi.h new file mode 100644 index 0000000..7b3dc04 --- /dev/null +++ b/src/input/Edi.h @@ -0,0 +1,82 @@ +/* + Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications + Research Center Canada) + + Copyright (C) 2019 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://www.opendigitalradio.org + */ +/* + This file is part of ODR-DabMux. + + ODR-DabMux is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + ODR-DabMux is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with ODR-DabMux. If not, see . + */ + +#pragma once + +#include +#include +#include +#include +#include +#include "Socket.h" +#include "input/inputs.h" +#include "edi/STIDecoder.hpp" +#include "edi/STIWriter.hpp" +#include "ThreadsafeQueue.h" + +namespace Inputs { + +/* + * Receives EDI from UDP or TCP in a separate thread and pushes that data + * into the STIDecoder. Complete frames are then put into a queue for the consumer. + * + * This way, the EDI decoding happens in a separate thread. + */ +class Edi : public InputBase { + public: + Edi(); + Edi(const Edi&) = delete; + Edi& operator=(const Edi&) = delete; + ~Edi(); + + virtual int open(const std::string& name); + virtual int readFrame(uint8_t* buffer, size_t size); + virtual int setBitrate(int bitrate); + virtual int close(); + + protected: + void m_run(); + + std::mutex m_mutex; + + enum class InputUsed { Invalid, UDP, TCP }; + InputUsed m_input_used = InputUsed::Invalid; + Socket::UDPSocket m_udp_sock; + Socket::TCPReceiveServer m_tcp_receive_server; + + EdiDecoder::STIWriter m_sti_writer; + EdiDecoder::STIDecoder m_sti_decoder; + std::thread m_thread; + std::atomic m_running = ATOMIC_VAR_INIT(false); + ThreadsafeQueue > m_frames; + + bool m_is_prebuffering = true; + + std::string m_name; +}; + +}; + -- cgit v1.2.3