From 2dac8f5fa6d63a71a726ec373af9bf45f22de8b7 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Fri, 6 Jan 2017 16:22:40 +0100 Subject: EDI: handle pseq resync and SIGINT --- lib/edi/ETIDecoder.cpp | 2 -- lib/edi/PFT.cpp | 43 +++++++++++++++++-------------- lib/edi/PFT.hpp | 15 +++++------ src/DabMod.cpp | 69 +++++++++++++++++++++++--------------------------- src/EtiReader.cpp | 16 ++++++++---- src/EtiReader.h | 6 ++++- 6 files changed, 78 insertions(+), 73 deletions(-) diff --git a/lib/edi/ETIDecoder.cpp b/lib/edi/ETIDecoder.cpp index baede11..939706b 100644 --- a/lib/edi/ETIDecoder.cpp +++ b/lib/edi/ETIDecoder.cpp @@ -331,7 +331,6 @@ bool ETIDecoder::decode_deti(const vector &value) to_string(expected_length)); } - etiLog.level(debug) << "EDI DETI"; m_data_collector.update_err(stat); m_data_collector.update_mnsc(mnsc); @@ -402,7 +401,6 @@ bool ETIDecoder::decode_estn(const vector &value, uint8_t n) value.end(), back_inserter(stc.mst)); - etiLog.level(debug) << "EDI ESTn " << (int)stc.stream_index; m_data_collector.add_subchannel(stc); return true; diff --git a/lib/edi/PFT.cpp b/lib/edi/PFT.cpp index fa5840c..4bd1030 100644 --- a/lib/edi/PFT.cpp +++ b/lib/edi/PFT.cpp @@ -1,6 +1,6 @@ /* ------------------------------------------------------------------ * Copyright (C) 2017 AVT GmbH - Fabien Vercasson - * Copyright (C) 2016 Matthias P. Braendli + * Copyright (C) 2017 Matthias P. Braendli * matthias.braendli@mpb.li * * http://opendigitalradio.org @@ -35,7 +35,6 @@ extern "C" { } namespace EdiDecoder { - namespace PFT { using namespace std; @@ -191,10 +190,11 @@ size_t Fragment::loadData(const std::vector &buf) } -AFBuilder::AFBuilder(pseq_t Pseq, findex_t Fcount, int lifetime) +AFBuilder::AFBuilder(pseq_t Pseq, findex_t Fcount, size_t lifetime) { _Pseq = Pseq; _Fcount = Fcount; + assert(lifetime > 0); lifeTime = lifetime; } @@ -459,7 +459,7 @@ void PFT::pushPFTFrag(const Fragment &fragment) // The AFBuilder wants to know the lifetime in number of fragments, // we know the delay in number of AF packets. Every AF packet // is cut into Fcount fragments. - const int lifetime = fragment.Fcount() * m_max_delay; + const size_t lifetime = fragment.Fcount() * m_max_delay; // Build the afbuilder in the map in-place m_afbuilders.emplace(std::piecewise_construct, @@ -485,26 +485,34 @@ void PFT::pushPFTFrag(const Fragment &fragment) std::vector PFT::getNextAFPacket() { if (m_afbuilders.count(m_next_pseq) == 0) { - assert(m_afbuilders.empty()); + if (m_afbuilders.size() > m_max_delay) { + m_afbuilders.clear(); + etiLog.level(debug) << " Reinit"; + } + return {}; } auto &builder = m_afbuilders.at(m_next_pseq); - //const auto lt = builder.lifeTime; //const auto nf = builder.numberOfFragments(); using dar_t = AFBuilder::decode_attempt_result_t; if (builder.canAttemptToDecode() == dar_t::yes) { - //etiLog.log(debug, "pseq %d (%d %d/%d) yes\n", m_next_pseq, lt, nf.first, nf.second); + //etiLog.log(debug, "pseq %d (%d %d/%d) yes\n", + // m_next_pseq, lt, nf.first, nf.second); auto afpacket = builder.extractAF(); assert(not afpacket.empty()); incrementNextPseq(); return afpacket; } else if (builder.canAttemptToDecode() == dar_t::maybe) { - //etiLog.log(debug, "pseq %d (%d %d/%d) maybe\n", m_next_pseq, lt, nf.first, nf.second); - builder.lifeTime--; + //etiLog.log(debug, "pseq %d (%d %d/%d) maybe\n", + // m_next_pseq, lt, nf.first, nf.second); + if (builder.lifeTime > 0) { + builder.lifeTime--; + } + if (builder.lifeTime == 0) { // Attempt Reed-Solomon decoding auto afpacket = builder.extractAF(); @@ -517,8 +525,12 @@ std::vector PFT::getNextAFPacket() } } else { - //etiLog.log(debug, "pseq %d (%d %d/%d) no\n", m_next_pseq, lt, nf.first, nf.second); - builder.lifeTime--; + //etiLog.log(debug, "pseq %d (%d %d/%d) no\n", + // m_next_pseq, lt, nf.first, nf.second); + if (builder.lifeTime > 0) { + builder.lifeTime--; + } + if (builder.lifeTime == 0) { etiLog.log(debug, "pseq %d timed out\n", m_next_pseq); incrementNextPseq(); @@ -528,17 +540,11 @@ std::vector PFT::getNextAFPacket() return {}; } -void PFT::setMaxDelay(int num_af_packets) +void PFT::setMaxDelay(size_t num_af_packets) { m_max_delay = num_af_packets; } -bool PFT::isPacketBuildable(pseq_t pseq) const -{ - return m_afbuilders.count(pseq) > 0 and - not m_afbuilders.at(pseq).extractAF().empty(); -} - void PFT::incrementNextPseq() { if (m_afbuilders.count(m_next_pseq - NUM_AFBUILDERS_TO_KEEP) > 0) { @@ -549,5 +555,4 @@ void PFT::incrementNextPseq() } } - } diff --git a/lib/edi/PFT.hpp b/lib/edi/PFT.hpp index 9ab14ae..9fb96a7 100644 --- a/lib/edi/PFT.hpp +++ b/lib/edi/PFT.hpp @@ -1,6 +1,6 @@ /* ------------------------------------------------------------------ * Copyright (C) 2017 AVT GmbH - Fabien Vercasson - * Copyright (C) 2016 Matthias P. Braendli + * Copyright (C) 2017 Matthias P. Braendli * matthias.braendli@mpb.li * * http://opendigitalradio.org @@ -22,7 +22,6 @@ #pragma once #include -#include #include #include #include @@ -84,7 +83,7 @@ class AFBuilder no, // Not enough fragments present to permit RS }; - AFBuilder(pseq_t Pseq, findex_t Fcount, int lifetime); + AFBuilder(pseq_t Pseq, findex_t Fcount, size_t lifetime); void pushPFTFrag(const Fragment &frag); @@ -105,7 +104,7 @@ class AFBuilder /* The user of this instance can keep track of the lifetime of this * builder */ - int lifeTime; + size_t lifeTime; private: @@ -125,7 +124,7 @@ class PFT void pushPFTFrag(const Fragment &fragment); /* Try to build the AF packet for the next pseq. This might - * skip one pseq according to the maximum delay setting. + * skip one or more pseq according to the maximum delay setting. * * \return an empty vector if building the AF is not possible */ @@ -134,15 +133,13 @@ class PFT /* Set the maximum delay in number of AF Packets before we * abandon decoding a given pseq. */ - void setMaxDelay(int num_af_packets); + void setMaxDelay(size_t num_af_packets); private: - bool isPacketBuildable(pseq_t pseq) const; - void incrementNextPseq(void); pseq_t m_next_pseq; - int m_max_delay = 10; + size_t m_max_delay = 10; // Keep one AFBuilder for each Pseq std::map m_afbuilders; diff --git a/src/DabMod.cpp b/src/DabMod.cpp index e48e748..a5c0de6 100644 --- a/src/DabMod.cpp +++ b/src/DabMod.cpp @@ -3,7 +3,7 @@ Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2016 + Copyright (C) 2017 Matthias P. Braendli, matthias.braendli@mpb.li http://opendigitalradio.org @@ -777,51 +777,46 @@ int launch_modulator(int argc, char* argv[]) set_thread_name("modulator"); if (ediUdpInput.isEnabled()) { - while (run_again) { - Flowgraph flowgraph; + Flowgraph flowgraph; - etiLog.level(debug) << "Build mod"; - auto modulator = make_shared( - ediReader, tiiConfig, outputRate, clockRate, - dabMode, gainMode, digitalgain, normalise, - filterTapsFilename); - - etiLog.level(debug) << "Connect"; - if (format_converter) { - flowgraph.connect(modulator, format_converter); - flowgraph.connect(format_converter, output); - } - else { - flowgraph.connect(modulator, output); - } + auto modulator = make_shared( + ediReader, tiiConfig, outputRate, clockRate, + dabMode, gainMode, digitalgain, normalise, + filterTapsFilename); - etiLog.level(debug) << "SetETISource"; + if (format_converter) { + flowgraph.connect(modulator, format_converter); + flowgraph.connect(format_converter, output); + } + else { + flowgraph.connect(modulator, output); + } #if defined(HAVE_OUTPUT_UHD) - if (useUHDOutput) { - ((OutputUHD*)output.get())->setETISource(modulator->getEtiSource()); - } + if (useUHDOutput) { + ((OutputUHD*)output.get())->setETISource(modulator->getEtiSource()); + } #endif - etiLog.level(debug) << "Loop"; - size_t framecount = 0; - while (true) { - while (not ediReader.isFrameReady()) { - ediUdpInput.rxPacket(); - } - etiLog.level(debug) << "Frame Ready"; - framecount++; - flowgraph.run(); - etiLog.level(debug) << "now clear"; - ediReader.clearFrame(); - - /* Check every once in a while if the remote control - * is still working */ - if ((framecount % 250) == 0) { - rcs.check_faults(); + size_t framecount = 0; + bool running = true; + while (running) { + while (not ediReader.isFrameReady()) { + bool success = ediUdpInput.rxPacket(); + if (not success) { + running = false; + break; } } + framecount++; + flowgraph.run(); + ediReader.clearFrame(); + /* Check every once in a while if the remote control + * is still working */ + if ((framecount % 250) == 0) { + rcs.check_faults(); + } } } else { diff --git a/src/EtiReader.cpp b/src/EtiReader.cpp index 3207a1f..e646392 100644 --- a/src/EtiReader.cpp +++ b/src/EtiReader.cpp @@ -308,7 +308,6 @@ uint32_t EtiReader::getPPSOffset() unsigned EdiReader::getMode() { if (not m_fc_valid) { - assert(false); throw std::runtime_error("Trying to access Mode before it is ready!"); } return m_fc.mid; @@ -463,7 +462,6 @@ void EdiReader::add_subchannel(const EdiDecoder::eti_stc_data& stc) void EdiReader::assemble() { - etiLog.level(debug) << "Calling assemble"; if (not m_proto_valid) { throw std::logic_error("Cannot assemble EDI data before protocol"); } @@ -529,7 +527,7 @@ int EdiUdpInput::Open(const std::string& uri) return ret; } -void EdiUdpInput::rxPacket() +bool EdiUdpInput::rxPacket() { const size_t packsize = 8192; UdpPacket packet(packsize); @@ -538,13 +536,21 @@ void EdiUdpInput::rxPacket() if (ret == 0) { const auto &buf = packet.getBuffer(); if (packet.getSize() == packsize) { - fprintf(stderr, "Warning, possible UDP truncation\n"); + etiLog.log(warn, "Warning, possible UDP truncation"); } m_decoder.push_packet(buf); + return true; } else { - fprintf(stderr, "Socket error: %s\n", inetErrMsg); + if (inetErrNo == EINTR) { + return false; + } + else { + stringstream ss; + ss << "EDI UDP Socket error: " << inetErrMsg; + throw std::runtime_error(ss.str()); + } } } diff --git a/src/EtiReader.h b/src/EtiReader.h index 04d627d..78f0d3d 100644 --- a/src/EtiReader.h +++ b/src/EtiReader.h @@ -176,7 +176,11 @@ class EdiUdpInput { bool isEnabled(void) const { return m_enabled; } - void rxPacket(void); + /* Receive a packet and give it to the decoder. Returns + * true if a packet was received, false in case of socket + * read was interrupted by a signal. + */ + bool rxPacket(void); private: bool m_enabled; -- cgit v1.2.3