From 1d833b718845b97a5b1d90f33b547b1772bc0708 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Sun, 7 Jan 2018 08:49:29 +0100 Subject: Add new flowgraph path for metadata --- src/BlockPartitioner.cpp | 45 ++++++++++++++++++++++++------ src/BlockPartitioner.h | 21 ++++++++------ src/DabModulator.cpp | 13 +++++++-- src/DabModulator.h | 9 ++++-- src/EtiReader.cpp | 7 +++-- src/FicSource.cpp | 24 +++++++++++++++- src/FicSource.h | 11 ++++++-- src/Flowgraph.cpp | 68 ++++++++++++++++++++++++++++++++++++++++------ src/Flowgraph.h | 14 +++++++--- src/ModPlugin.h | 17 ++++++++++++ src/OutputFile.cpp | 56 ++++++++++++++++++++++++++++---------- src/OutputFile.h | 22 +++++++++++---- src/OutputMemory.cpp | 14 +++++++++- src/OutputMemory.h | 16 ++++++++--- src/TimestampDecoder.cpp | 48 ++++++++++++++++++++++++++++++-- src/TimestampDecoder.h | 71 ++++++++++++++++-------------------------------- 16 files changed, 341 insertions(+), 115 deletions(-) diff --git a/src/BlockPartitioner.cpp b/src/BlockPartitioner.cpp index 9e9f80b..54405d9 100644 --- a/src/BlockPartitioner.cpp +++ b/src/BlockPartitioner.cpp @@ -1,6 +1,11 @@ /* Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) + + Copyright (C) 2018 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://opendigitalradio.org */ /* This file is part of ODR-DabMod. @@ -21,6 +26,8 @@ #include "BlockPartitioner.h" #include "PcDebug.h" +#include "Log.h" +#include "TimestampDecoder.h" #include #include @@ -31,6 +38,7 @@ BlockPartitioner::BlockPartitioner(unsigned mode, unsigned phase) : ModMux(), + ModMetadata(), d_mode(mode) { PDEBUG("BlockPartitioner::BlockPartitioner(%i)\n", mode); @@ -68,17 +76,11 @@ BlockPartitioner::BlockPartitioner(unsigned mode, unsigned phase) : d_cifNb = 0; // For Synchronisation purpose, count nb of CIF to drop d_cifPhase = phase % d_cifCount; + d_metaPhase = phase % d_cifCount; d_cifSize = 864 * 8; } -BlockPartitioner::~BlockPartitioner() -{ - PDEBUG("BlockPartitioner::~BlockPartitioner()\n"); - -} - - // dataIn[0] -> FIC // dataIn[1] -> CIF int BlockPartitioner::process(std::vector dataIn, Buffer* dataOut) @@ -124,10 +126,10 @@ int BlockPartitioner::process(std::vector dataIn, Buffer* dataOut) uint8_t* out = reinterpret_cast(dataOut->getData()); // Copy FIC data - PDEBUG("Writting FIC %zu bytes to %zu\n", d_ficSize, d_cifNb * d_ficSize); + PDEBUG("Writing FIC %zu bytes to %zu\n", d_ficSize, d_cifNb * d_ficSize); memcpy(out + (d_cifNb * d_ficSize), fic, d_ficSize); // Copy CIF data - PDEBUG("Writting CIF %u bytes to %zu\n", 864 * 8, + PDEBUG("Writing CIF %u bytes to %zu\n", 864 * 8, (d_cifCount * d_ficSize) + (d_cifNb * 864 * 8)); memcpy(out + (d_cifCount * d_ficSize) + (d_cifNb * 864 * 8), cif, 864 * 8); @@ -137,3 +139,28 @@ int BlockPartitioner::process(std::vector dataIn, Buffer* dataOut) return d_cifNb == 0; } + +meta_vec_t BlockPartitioner::process_metadata(const meta_vec_t& metadataIn) +{ + // Synchronize CIF phase + if (d_metaPhase != 0) { + if (++d_metaPhase == d_cifCount) { + d_metaPhase = 0; + } + // Drop this metadata + return {}; + } + + if (d_cifNb == 1) { + d_meta.clear(); + } + + std::copy(metadataIn.begin(), metadataIn.end(), std::back_inserter(d_meta)); + + if (d_cifNb == 0) { + return d_meta; + } + else { + return {}; + } +} diff --git a/src/BlockPartitioner.h b/src/BlockPartitioner.h index 90cffa3..a4656a1 100644 --- a/src/BlockPartitioner.h +++ b/src/BlockPartitioner.h @@ -1,6 +1,11 @@ /* Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) + + Copyright (C) 2018 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://opendigitalradio.org */ /* This file is part of ODR-DabMod. @@ -25,32 +30,32 @@ # include #endif - #include "ModPlugin.h" #include +#include -#include - - -class BlockPartitioner : public ModMux +class BlockPartitioner : public ModMux, public ModMetadata { public: BlockPartitioner(unsigned mode, unsigned phase); - virtual ~BlockPartitioner(); - BlockPartitioner(const BlockPartitioner&); - BlockPartitioner& operator=(const BlockPartitioner&); int process(std::vector dataIn, Buffer* dataOut); const char* name() { return "BlockPartitioner"; } + // The implementation assumes process_metadata is always called after process + virtual meta_vec_t process_metadata(const meta_vec_t& metadataIn); + protected: int d_mode; size_t d_ficSize; size_t d_cifCount; size_t d_cifNb; size_t d_cifPhase; + size_t d_metaPhase; size_t d_cifSize; size_t d_outputFramesize; size_t d_outputFramecount; + + meta_vec_t d_meta; }; diff --git a/src/DabModulator.cpp b/src/DabModulator.cpp index 0818f4f..f1ea071 100644 --- a/src/DabModulator.cpp +++ b/src/DabModulator.cpp @@ -3,7 +3,7 @@ Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2017 + Copyright (C) 2018 Matthias P. Braendli, matthias.braendli@mpb.li http://opendigitalradio.org @@ -218,7 +218,7 @@ int DabModulator::process(Buffer* dataOut) rcs.enrol(cifPoly.get()); } - auto myOutput = make_shared(dataOut); + myOutput = make_shared(dataOut); shared_ptr cifRes; if (m_settings.outputRate != 2048000) { @@ -392,3 +392,12 @@ int DabModulator::process(Buffer* dataOut) return myFlowgraph->run(); } +meta_vec_t DabModulator::process_metadata(const meta_vec_t& metadataIn) +{ + if (myOutput) { + return myOutput->get_latest_metadata(); + } + + return {}; +} + diff --git a/src/DabModulator.h b/src/DabModulator.h index 6878853..e806c92 100644 --- a/src/DabModulator.h +++ b/src/DabModulator.h @@ -3,7 +3,7 @@ Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2017 + Copyright (C) 2018 Matthias P. Braendli, matthias.braendli@mpb.li http://opendigitalradio.org @@ -46,7 +46,7 @@ #include "TII.h" -class DabModulator : public ModInput +class DabModulator : public ModInput, public ModMetadata { public: DabModulator(EtiSource& etiSource, @@ -55,6 +55,9 @@ public: int process(Buffer* dataOut); const char* name() { return "DabModulator"; } + virtual meta_vec_t process_metadata( + const meta_vec_t& metadataIn); + /* Required to get the timestamp */ EtiSource* getEtiSource() { return &myEtiSource; } @@ -73,5 +76,7 @@ protected: size_t mySymSize; size_t myFicSizeOut; size_t myFicSizeIn; + + std::shared_ptr myOutput; }; diff --git a/src/EtiReader.cpp b/src/EtiReader.cpp index d84ed1f..dc5df84 100644 --- a/src/EtiReader.cpp +++ b/src/EtiReader.cpp @@ -281,6 +281,8 @@ int EtiReader::loadEtiData(const Buffer& dataIn) myTimestampDecoder.updateTimestampEti(eti_fc.FP & 0x3, eti_eoh.MNSC, getPPSOffset(), eti_fc.FCT); + myFicSource->loadTimestamp(myTimestampDecoder.getTimestamp()); + return dataIn.getLength() - input_size; } @@ -533,8 +535,9 @@ void EdiReader::assemble() const std::time_t posix_timestamp_1_jan_2000 = 946684800; auto utc_ts = posix_timestamp_1_jan_2000 + m_seconds - m_utco; - m_timestamp_decoder.updateTimestampEdi( - utc_ts, m_fc.tsta, m_fc.fct()); + m_timestamp_decoder.updateTimestampEdi(utc_ts, m_fc.tsta, m_fc.fct(), m_fc.fp); + + myFicSource->loadTimestamp(m_timestamp_decoder.getTimestamp()); m_frameReady = true; } diff --git a/src/FicSource.cpp b/src/FicSource.cpp index 04197db..2b95085 100644 --- a/src/FicSource.cpp +++ b/src/FicSource.cpp @@ -2,7 +2,7 @@ Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2016 + Copyright (C) 2018 Matthias P. Braendli, matthias.braendli@mpb.li http://opendigitalradio.org @@ -26,6 +26,8 @@ #include "FicSource.h" #include "PcDebug.h" +#include "Log.h" +#include "TimestampDecoder.h" #include #include @@ -92,3 +94,23 @@ int FicSource::process(Buffer* outputData) return outputData->getLength(); } +void FicSource::loadTimestamp(const std::shared_ptr& ts) +{ + d_ts = ts; +} + + +meta_vec_t FicSource::process_metadata(const meta_vec_t& metadataIn) +{ + if (not d_ts) { + return {}; + } + + using namespace std; + meta_vec_t md_vec; + flowgraph_metadata meta; + meta.ts = d_ts; + md_vec.push_back(meta); + return md_vec; +} + diff --git a/src/FicSource.h b/src/FicSource.h index 77ac741..93c1a7f 100644 --- a/src/FicSource.h +++ b/src/FicSource.h @@ -36,7 +36,7 @@ #include #include -class FicSource : public ModInput +class FicSource : public ModInput, public ModMetadata { public: FicSource(unsigned ficf, unsigned mid); @@ -45,12 +45,17 @@ public: const std::vector& get_rules(); void loadFicData(const Buffer& fic); - int process(Buffer* outputData); - const char* name() { return "FicSource"; } + int process(Buffer* outputData) override; + const char* name() override { return "FicSource"; } + + void loadTimestamp(const std::shared_ptr& ts); + virtual meta_vec_t process_metadata( + const meta_vec_t& metadataIn) override; private: size_t d_framesize; Buffer d_buffer; + std::shared_ptr d_ts; std::vector d_puncturing_rules; }; diff --git a/src/Flowgraph.cpp b/src/Flowgraph.cpp index 465ef41..0b80a8c 100644 --- a/src/Flowgraph.cpp +++ b/src/Flowgraph.cpp @@ -26,6 +26,8 @@ #include "Flowgraph.h" #include "PcDebug.h" +#include "Log.h" +#include "TimestampDecoder.h" #include #include #include @@ -57,9 +59,10 @@ Node::~Node() assert(myOutputBuffers.size() == 0); } -void Node::addOutputBuffer(Buffer::sptr& buffer) +void Node::addOutputBuffer(Buffer::sptr& buffer, Metadata_vec_sptr& md) { myOutputBuffers.push_back(buffer); + myOutputMetadata.push_back(md); #if DEBUG std::string fname = string(myPlugin->name()) + "-" + to_string(myDebugFiles.size()) + @@ -71,7 +74,7 @@ void Node::addOutputBuffer(Buffer::sptr& buffer) #endif } -void Node::removeOutputBuffer(Buffer::sptr& buffer) +void Node::removeOutputBuffer(Buffer::sptr& buffer, Metadata_vec_sptr& md) { auto it = std::find( myOutputBuffers.begin(), @@ -89,14 +92,23 @@ void Node::removeOutputBuffer(Buffer::sptr& buffer) #endif myOutputBuffers.erase(it); } + + auto mdit = std::find( + myOutputMetadata.begin(), + myOutputMetadata.end(), + md); + if (mdit != myOutputMetadata.end()) { + myOutputMetadata.erase(mdit); + } } -void Node::addInputBuffer(Buffer::sptr& buffer) +void Node::addInputBuffer(Buffer::sptr& buffer, Metadata_vec_sptr& md) { myInputBuffers.push_back(buffer); + myInputMetadata.push_back(md); } -void Node::removeInputBuffer(Buffer::sptr& buffer) +void Node::removeInputBuffer(Buffer::sptr& buffer, Metadata_vec_sptr& md) { auto it = std::find( myInputBuffers.begin(), @@ -105,6 +117,14 @@ void Node::removeInputBuffer(Buffer::sptr& buffer) if (it != myInputBuffers.end()) { myInputBuffers.erase(it); } + + auto mdit = std::find( + myInputMetadata.begin(), + myInputMetadata.end(), + md); + if (mdit != myInputMetadata.end()) { + myInputMetadata.erase(mdit); + } } int Node::process() @@ -127,6 +147,36 @@ int Node::process() } int ret = myPlugin->process(inBuffers, outBuffers); + + // Collect all incoming metadata into a single vector + meta_vec_t all_input_mds; + for (auto& md_vec_sp : myInputMetadata) { + if (md_vec_sp) { + copy(md_vec_sp->begin(), md_vec_sp->end(), + back_inserter(all_input_mds)); + } + } + + auto mod_meta = dynamic_pointer_cast(myPlugin); + if (mod_meta) { + auto outputMetadata = mod_meta->process_metadata(all_input_mds); + // Distribute the result metadata to all outputs + for (auto& out_md : myOutputMetadata) { + out_md->clear(); + std::move(outputMetadata.begin(), outputMetadata.end(), + std::back_inserter(*out_md)); + } + } + else { + // Propagate the unmodified input metadata to all outputs + for (auto& out_md : myOutputMetadata) { + out_md->clear(); + std::move(all_input_mds.begin(), all_input_mds.end(), + std::back_inserter(*out_md)); + } + } + + #if DEBUG assert(myDebugFiles.size() == myOutputBuffers.size()); @@ -158,8 +208,10 @@ Edge::Edge(shared_ptr& srcNode, shared_ptr& dstNode) : this); myBuffer = make_shared(); - srcNode->addOutputBuffer(myBuffer); - dstNode->addInputBuffer(myBuffer); + myMetadata = make_shared >(); + + srcNode->addOutputBuffer(myBuffer, myMetadata); + dstNode->addInputBuffer(myBuffer, myMetadata); } @@ -168,8 +220,8 @@ Edge::~Edge() PDEBUG("Edge::~Edge() @ %p\n", this); if (myBuffer) { - mySrcNode->removeOutputBuffer(myBuffer); - myDstNode->removeInputBuffer(myBuffer); + mySrcNode->removeOutputBuffer(myBuffer, myMetadata); + myDstNode->removeInputBuffer(myBuffer, myMetadata); } } diff --git a/src/Flowgraph.h b/src/Flowgraph.h index ebb7314..2742824 100644 --- a/src/Flowgraph.h +++ b/src/Flowgraph.h @@ -39,6 +39,8 @@ #include #include +using Metadata_vec_sptr = std::shared_ptr >; + class Node { public: @@ -55,15 +57,18 @@ public: myProcessTime += processTime; } - void addOutputBuffer(Buffer::sptr& buffer); - void removeOutputBuffer(Buffer::sptr& buffer); + void addOutputBuffer(Buffer::sptr& buffer, Metadata_vec_sptr& md); + void removeOutputBuffer(Buffer::sptr& buffer, Metadata_vec_sptr& md); - void addInputBuffer(Buffer::sptr& buffer); - void removeInputBuffer(Buffer::sptr& buffer); + void addInputBuffer(Buffer::sptr& buffer, Metadata_vec_sptr& md); + void removeInputBuffer(Buffer::sptr& buffer, Metadata_vec_sptr& md); protected: std::list myInputBuffers; std::list myOutputBuffers; + std::list myInputMetadata; + std::list myOutputMetadata; + #if DEBUG std::list myDebugFiles; #endif @@ -85,6 +90,7 @@ protected: std::shared_ptr mySrcNode; std::shared_ptr myDstNode; std::shared_ptr myBuffer; + std::shared_ptr > myMetadata; }; diff --git a/src/ModPlugin.h b/src/ModPlugin.h index d3aa780..3c3e8b3 100644 --- a/src/ModPlugin.h +++ b/src/ModPlugin.h @@ -119,3 +119,20 @@ public: std::vector dataOut); virtual int process(Buffer* dataIn) = 0; }; + +struct frame_timestamp; +struct flowgraph_metadata { + std::shared_ptr ts; +}; + + +using meta_vec_t = std::vector; + +/* Some ModPlugins also support metadata */ +class ModMetadata { + public: + // Receives metadata from all inputs, and process them, and output + // a sequence of metadata. + virtual meta_vec_t process_metadata(const meta_vec_t& metadataIn) = 0; +}; + diff --git a/src/OutputFile.cpp b/src/OutputFile.cpp index 23d5523..3bb45c4 100644 --- a/src/OutputFile.cpp +++ b/src/OutputFile.cpp @@ -2,7 +2,7 @@ Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2016 + Copyright (C) 2018 Matthias P. Braendli, matthias.braendli@mpb.li http://opendigitalradio.org @@ -26,47 +26,73 @@ #include "OutputFile.h" #include "PcDebug.h" +#include "Log.h" +#include "TimestampDecoder.h" #include #include #include +using namespace std; + OutputFile::OutputFile(std::string filename) : ModOutput(), + ModMetadata(), myFilename(filename) { PDEBUG("OutputFile::OutputFile(filename: %s) @ %p\n", filename.c_str(), this); - myFile = fopen(filename.c_str(), "w"); - if (myFile == NULL) { + FILE* fd = fopen(filename.c_str(), "w"); + if (fd == nullptr) { perror(filename.c_str()); throw std::runtime_error( "OutputFile::OutputFile() unable to open file!"); } -} - - -OutputFile::~OutputFile() -{ - PDEBUG("OutputFile::~OutputFile() @ %p\n", this); - - if (myFile != NULL) { - fclose(myFile); - } + myFile.reset(fd); } int OutputFile::process(Buffer* dataIn) { PDEBUG("OutputFile::process(%p)\n", dataIn); - assert(dataIn != NULL); + assert(dataIn != nullptr); - if (fwrite(dataIn->getData(), dataIn->getLength(), 1, myFile) == 0) { + if (fwrite(dataIn->getData(), dataIn->getLength(), 1, myFile.get()) == 0) { throw std::runtime_error( "OutputFile::process() unable to write to file!"); } return dataIn->getLength(); } + +meta_vec_t OutputFile::process_metadata(const meta_vec_t& metadataIn) +{ + stringstream ss; + + if (metadataIn.empty()) { + etiLog.level(debug) << "OutputFile: no mdIn"; + } + + for (const auto& md : metadataIn) { + if (md.ts) { + ss << "FCT=" << md.ts->fct << + " FP=" << (int)md.ts->fp; + if (md.ts->timestamp_valid) { + ss << " ts=" << md.ts->timestamp_sec << + "+" << md.ts->timestamp_pps << ", "; + } + else { + ss << " no ts"; + } + } + else { + ss << "void, "; + } + } + + etiLog.level(debug) << "Output File got metadata: " << ss.str(); + + return {}; +} diff --git a/src/OutputFile.h b/src/OutputFile.h index 7121ef3..2b92369 100644 --- a/src/OutputFile.h +++ b/src/OutputFile.h @@ -2,7 +2,7 @@ Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2016 + Copyright (C) 2018 Matthias P. Braendli, matthias.braendli@mpb.li http://opendigitalradio.org @@ -36,19 +36,29 @@ #include #include #include +#include +struct FILEDeleter { + void operator()(FILE* fd) { + if (fd) { + fclose(fd); + } + } +}; -class OutputFile : public ModOutput +class OutputFile : public ModOutput, public ModMetadata { public: OutputFile(std::string filename); - virtual ~OutputFile(); - virtual int process(Buffer* dataIn); - const char* name() { return "OutputFile"; } + virtual int process(Buffer* dataIn) override; + const char* name() override { return "OutputFile"; } + + virtual meta_vec_t process_metadata( + const meta_vec_t& metadataIn) override; protected: std::string myFilename; - FILE* myFile; + std::unique_ptr myFile; }; diff --git a/src/OutputMemory.cpp b/src/OutputMemory.cpp index 6e2fd49..5f24095 100644 --- a/src/OutputMemory.cpp +++ b/src/OutputMemory.cpp @@ -2,7 +2,7 @@ Copyright (C) 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2016 + Copyright (C) 2018 Matthias P. Braendli, matthias.braendli@mpb.li http://opendigitalradio.org @@ -26,6 +26,7 @@ #include "OutputMemory.h" #include "PcDebug.h" +#include "Log.h" #include #include @@ -94,3 +95,14 @@ int OutputMemory::process(Buffer* dataIn) return myDataOut->getLength(); } +meta_vec_t OutputMemory::process_metadata(const meta_vec_t& metadataIn) +{ + myMetadata = metadataIn; + return {}; +} + +meta_vec_t OutputMemory::get_latest_metadata() +{ + return myMetadata; +} + diff --git a/src/OutputMemory.h b/src/OutputMemory.h index 715cb2d..f0a5fbb 100644 --- a/src/OutputMemory.h +++ b/src/OutputMemory.h @@ -2,7 +2,7 @@ Copyright (C) 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2016 + Copyright (C) 2018 Matthias P. Braendli, matthias.braendli@mpb.li http://opendigitalradio.org @@ -46,18 +46,26 @@ #include "ModPlugin.h" -class OutputMemory : public ModOutput +class OutputMemory : public ModOutput, public ModMetadata { public: OutputMemory(Buffer* dataOut); virtual ~OutputMemory(); - virtual int process(Buffer* dataIn); - const char* name() { return "OutputMemory"; } + OutputMemory(OutputMemory& other) = delete; + OutputMemory& operator=(OutputMemory& other) = delete; + + virtual int process(Buffer* dataIn) override; + const char* name() override { return "OutputMemory"; } + virtual meta_vec_t process_metadata( + const meta_vec_t& metadataIn) override; + + meta_vec_t get_latest_metadata(void); void setOutput(Buffer* dataOut); protected: Buffer* myDataOut; + meta_vec_t myMetadata; #if OUTPUT_MEM_HISTOGRAM // keep track of max value diff --git a/src/TimestampDecoder.cpp b/src/TimestampDecoder.cpp index 26deb60..6cf2875 100644 --- a/src/TimestampDecoder.cpp +++ b/src/TimestampDecoder.cpp @@ -37,6 +37,22 @@ //#define MDEBUG(fmt, args...) fprintf (LOG, "*****" fmt , ## args) #define MDEBUG(fmt, args...) PDEBUG(fmt, ## args) +TimestampDecoder::TimestampDecoder(double& offset_s, unsigned tist_delay_stages) : + RemoteControllable("tist"), + timestamp_offset(offset_s), + m_tist_delay_stages(tist_delay_stages) +{ + // Properly initialise temp_time + memset(&temp_time, 0, sizeof(temp_time)); + const time_t timep = 0; + gmtime_r(&timep, &temp_time); + + RC_ADD_PARAMETER(offset, "TIST offset [s]"); + RC_ADD_PARAMETER(timestamp, "FCT and timestamp [s]"); + + etiLog.level(info) << "Setting up timestamp decoder with " << + timestamp_offset << " offset"; +} void TimestampDecoder::calculateTimestamp(frame_timestamp& ts) { @@ -48,6 +64,7 @@ void TimestampDecoder::calculateTimestamp(frame_timestamp& ts) ts_queued->timestamp_sec = time_secs; ts_queued->timestamp_pps = time_pps; ts_queued->fct = latestFCT; + ts_queued->fp = latestFP; ts_queued->timestamp_refresh = offset_changed; offset_changed = false; @@ -102,7 +119,29 @@ void TimestampDecoder::calculateTimestamp(frame_timestamp& ts) //ts.print("calc2 "); } -void TimestampDecoder::pushMNSCData(int framephase, uint16_t mnsc) +std::shared_ptr TimestampDecoder::getTimestamp() +{ + std::shared_ptr ts = + std::make_shared(); + + /* Push new timestamp into queue */ + ts->timestamp_valid = full_timestamp_received; + ts->timestamp_sec = time_secs; + ts->timestamp_pps = time_pps; + ts->fct = latestFCT; + ts->fp = latestFP; + + ts->timestamp_refresh = offset_changed; + offset_changed = false; + + MDEBUG("time_secs=%d, time_pps=%f\n", time_secs, + (double)time_pps / 16384000.0); + *ts += timestamp_offset; + + return ts; +} + +void TimestampDecoder::pushMNSCData(uint8_t framephase, uint16_t mnsc) { struct eti_MNSC_TIME_0 *mnsc0; struct eti_MNSC_TIME_1 *mnsc1; @@ -190,7 +229,7 @@ void TimestampDecoder::updateTimestampPPS(uint32_t pps) } void TimestampDecoder::updateTimestampEti( - int framephase, + uint8_t framephase, uint16_t mnsc, uint32_t pps, // In units of 1/16384000 s int32_t fct) @@ -198,16 +237,19 @@ void TimestampDecoder::updateTimestampEti( updateTimestampPPS(pps); pushMNSCData(framephase, mnsc); latestFCT = fct; + latestFP = framephase; } void TimestampDecoder::updateTimestampEdi( uint32_t seconds_utc, uint32_t pps, // In units of 1/16384000 s - int32_t fct) + int32_t fct, + uint8_t framephase) { time_secs = seconds_utc; time_pps = pps; latestFCT = fct; + latestFP = framephase; full_timestamp_received = true; } diff --git a/src/TimestampDecoder.h b/src/TimestampDecoder.h index 943b241..4bbf5cc 100644 --- a/src/TimestampDecoder.h +++ b/src/TimestampDecoder.h @@ -40,6 +40,7 @@ struct frame_timestamp { // Which frame count does this timestamp apply to int32_t fct; + uint8_t fp; // Frame Phase uint32_t timestamp_sec; uint32_t timestamp_pps; // In units of 1/16384000 s @@ -56,6 +57,7 @@ struct frame_timestamp this->timestamp_valid = rhs.timestamp_valid; this->timestamp_refresh = rhs.timestamp_refresh; this->fct = rhs.fct; + this->fp = rhs.fp; } return *this; @@ -103,49 +105,23 @@ struct frame_timestamp class TimestampDecoder : public RemoteControllable { public: - TimestampDecoder( - /* The modulator adds this offset to the TIST to define time of - * frame transmission - */ - double& offset_s, - - /* Specifies by how many stages the timestamp must be delayed. - * (e.g. The FIRFilter is pipelined, therefore we must increase - * tist_delay_stages by one if the filter is used - */ - unsigned tist_delay_stages) : - RemoteControllable("tist"), - timestamp_offset(offset_s) - { - m_tist_delay_stages = tist_delay_stages; - inhibit_second_update = 0; - time_pps = 0.0; - time_secs = 0; - latestFCT = 0; - enableDecode = false; - full_timestamp_received = false; - - // Properly initialise temp_time - memset(&temp_time, 0, sizeof(temp_time)); - const time_t timep = 0; - gmtime_r(&timep, &temp_time); - - offset_changed = false; - - RC_ADD_PARAMETER(offset, "TIST offset [s]"); - RC_ADD_PARAMETER(timestamp, "FCT and timestamp [s]"); - - etiLog.level(info) << "Setting up timestamp decoder with " << - timestamp_offset << " offset"; - - }; + /* offset_s: The modulator adds this offset to the TIST to define time of + * frame transmission + * + * tist_delay_stages: Specifies by how many stages the timestamp must + * be delayed. (e.g. The FIRFilter is pipelined, therefore we must + * increase tist_delay_stages by one if the filter is used + */ + TimestampDecoder(double& offset_s, unsigned tist_delay_stages); /* Calculate the timestamp for the current frame. */ void calculateTimestamp(frame_timestamp& ts); + std::shared_ptr getTimestamp(void); + /* Update timestamp data from ETI */ void updateTimestampEti( - int framephase, + uint8_t framephase, uint16_t mnsc, uint32_t pps, // In units of 1/16384000 s int32_t fct); @@ -154,7 +130,8 @@ class TimestampDecoder : public RemoteControllable void updateTimestampEdi( uint32_t seconds_utc, uint32_t pps, // In units of 1/16384000 s - int32_t fct); + int32_t fct, + uint8_t framephase); /*********** REMOTE CONTROL ***************/ @@ -171,7 +148,7 @@ class TimestampDecoder : public RemoteControllable protected: /* Push a new MNSC field into the decoder */ - void pushMNSCData(int framephase, uint16_t mnsc); + void pushMNSCData(uint8_t framephase, uint16_t mnsc); /* Each frame contains the TIST field with the PPS offset. * For each frame, this function must be called to update @@ -191,21 +168,22 @@ class TimestampDecoder : public RemoteControllable void updateTimestampSeconds(uint32_t secs); struct tm temp_time; - uint32_t time_secs; - int32_t latestFCT; - uint32_t time_pps; + uint32_t time_secs = 0; + int32_t latestFCT = 0; + uint32_t latestFP = 0; + uint32_t time_pps = 0; double& timestamp_offset; unsigned m_tist_delay_stages; - int inhibit_second_update; - bool offset_changed; + int inhibit_second_update = 0; + bool offset_changed = false; /* When the type or identifier don't match, the decoder must * be disabled */ - bool enableDecode; + bool enableDecode = false; /* Disable timstamps until full time has been received */ - bool full_timestamp_received; + bool full_timestamp_received = false; /* when pipelining, we must shift the calculated timestamps * through this queue. Otherwise, it would not be possible to @@ -213,6 +191,5 @@ class TimestampDecoder : public RemoteControllable * FIRFilter (1 stage pipeline) */ std::queue > queue_timestamps; - }; -- cgit v1.2.3 From 23b369e1145c5652778345827b7df9c33e09d0e8 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Sun, 7 Jan 2018 09:21:25 +0100 Subject: Add metadata latency for all PipelinedModCodec --- src/DabMod.cpp | 6 ++++++ src/ModPlugin.cpp | 19 ++++++++++++++++--- src/ModPlugin.h | 52 +++++++++++++++++++++++++++++++--------------------- src/OutputFile.cpp | 22 +++++++++++++--------- src/OutputFile.h | 6 ++++++ 5 files changed, 72 insertions(+), 33 deletions(-) diff --git a/src/DabMod.cpp b/src/DabMod.cpp index 25a93bf..4a4ea82 100644 --- a/src/DabMod.cpp +++ b/src/DabMod.cpp @@ -426,6 +426,12 @@ int launch_modulator(int argc, char* argv[]) } #endif + // TODO remove + auto output_as_file = dynamic_pointer_cast(output); + if (output_as_file) { + output_as_file->setETISource(modulator->getEtiSource()); + } + inputReader->PrintInfo(); run_modulator_state_t st = run_modulator(m); diff --git a/src/ModPlugin.cpp b/src/ModPlugin.cpp index c39d883..f86bfb2 100644 --- a/src/ModPlugin.cpp +++ b/src/ModPlugin.cpp @@ -74,9 +74,9 @@ int ModOutput::process( PipelinedModCodec::PipelinedModCodec() : ModCodec(), - m_number_of_runs(0), m_input_queue(), m_output_queue(), + m_metadata_fifo(), m_running(false), m_thread() { @@ -107,7 +107,7 @@ int PipelinedModCodec::process(Buffer* dataIn, Buffer* dataOut) m_input_queue.push(inbuffer); - if (m_number_of_runs > 0) { + if (m_ready_to_output_data) { std::shared_ptr outbuffer; m_output_queue.wait_and_pop(outbuffer); @@ -116,13 +116,26 @@ int PipelinedModCodec::process(Buffer* dataIn, Buffer* dataOut) else { dataOut->setLength(dataIn->getLength()); memset(dataOut->getData(), 0, dataOut->getLength()); - m_number_of_runs++; + m_ready_to_output_data = true; } return dataOut->getLength(); } +meta_vec_t PipelinedModCodec::process_metadata(const meta_vec_t& metadataIn) +{ + m_metadata_fifo.push_back(metadataIn); + if (m_metadata_fifo.size() == 2) { + auto r = std::move(m_metadata_fifo.front()); + m_metadata_fifo.pop_front(); + return r; + } + else { + return {}; + } +} + void PipelinedModCodec::process_thread() { set_thread_name(name()); diff --git a/src/ModPlugin.h b/src/ModPlugin.h index 3c3e8b3..c0f1c1a 100644 --- a/src/ModPlugin.h +++ b/src/ModPlugin.h @@ -30,16 +30,34 @@ # include #endif - #include "Buffer.h" #include "ThreadsafeQueue.h" - -#include +#include #include #include #include #include +// All flowgraph elements derive from ModPlugin, or a variant of it. +// Some ModPlugins also support handling metadata. + +struct frame_timestamp; +struct flowgraph_metadata { + std::shared_ptr ts; +}; + +using meta_vec_t = std::vector; + +/* ModPlugins that support metadata derive from ModMetadata */ +class ModMetadata { + public: + // Receives metadata from all inputs, and process them, and output + // a sequence of metadata. + virtual meta_vec_t process_metadata(const meta_vec_t& metadataIn) = 0; +}; + + +/* Abstract base class for all flowgraph elements */ class ModPlugin { public: @@ -69,7 +87,11 @@ public: virtual int process(Buffer* const dataIn, Buffer* dataOut) = 0; }; -class PipelinedModCodec : public ModCodec +/* Pipelined ModCodecs run their processing in a separate thread, and + * have a one-call-to-process() latency. Because of this latency, they + * must also handle the metadata + */ +class PipelinedModCodec : public ModCodec, public ModMetadata { public: PipelinedModCodec(); @@ -82,6 +104,8 @@ public: virtual int process(Buffer* const dataIn, Buffer* dataOut) final; virtual const char* name() = 0; + virtual meta_vec_t process_metadata(const meta_vec_t& metadataIn) final; + protected: // Once the instance implementing PipelinedModCodec has been constructed, // it must call start_pipeline_thread() @@ -89,11 +113,13 @@ protected: virtual int internal_process(Buffer* const dataIn, Buffer* dataOut) = 0; private: - size_t m_number_of_runs; + bool m_ready_to_output_data = false; ThreadsafeQueue > m_input_queue; ThreadsafeQueue > m_output_queue; + std::deque m_metadata_fifo; + std::atomic m_running; std::thread m_thread; void process_thread(void); @@ -120,19 +146,3 @@ public: virtual int process(Buffer* dataIn) = 0; }; -struct frame_timestamp; -struct flowgraph_metadata { - std::shared_ptr ts; -}; - - -using meta_vec_t = std::vector; - -/* Some ModPlugins also support metadata */ -class ModMetadata { - public: - // Receives metadata from all inputs, and process them, and output - // a sequence of metadata. - virtual meta_vec_t process_metadata(const meta_vec_t& metadataIn) = 0; -}; - diff --git a/src/OutputFile.cpp b/src/OutputFile.cpp index 3bb45c4..481e858 100644 --- a/src/OutputFile.cpp +++ b/src/OutputFile.cpp @@ -77,22 +77,26 @@ meta_vec_t OutputFile::process_metadata(const meta_vec_t& metadataIn) for (const auto& md : metadataIn) { if (md.ts) { - ss << "FCT=" << md.ts->fct << + ss << " FCT=" << md.ts->fct << " FP=" << (int)md.ts->fp; - if (md.ts->timestamp_valid) { - ss << " ts=" << md.ts->timestamp_sec << - "+" << md.ts->timestamp_pps << ", "; - } - else { - ss << " no ts"; - } } else { - ss << "void, "; + ss << " void, "; } } + if (myEtiSource) { + frame_timestamp ts; + myEtiSource->calculateTimestamp(ts); + ss << " ETI FCT=" << ts.fct; + } + etiLog.level(debug) << "Output File got metadata: " << ss.str(); return {}; } + +void OutputFile::setETISource(EtiSource *etiSource) +{ + myEtiSource = etiSource; +} diff --git a/src/OutputFile.h b/src/OutputFile.h index 2b92369..97fdcb7 100644 --- a/src/OutputFile.h +++ b/src/OutputFile.h @@ -32,6 +32,7 @@ #include "ModPlugin.h" +#include "EtiReader.h" #include #include @@ -57,7 +58,12 @@ public: virtual meta_vec_t process_metadata( const meta_vec_t& metadataIn) override; + void setETISource(EtiSource *etiSource); + protected: + // TODO remove + EtiSource *myEtiSource = nullptr; + std::string myFilename; std::unique_ptr myFile; }; -- cgit v1.2.3 From 37b4c4591d6835c4b88ded16286d6f4145aea367 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Sun, 7 Jan 2018 09:25:22 +0100 Subject: Replace boost lock by std lock in TII --- src/TII.cpp | 6 +++--- src/TII.h | 8 ++++---- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/TII.cpp b/src/TII.cpp index 89cd6d0..3c5823b 100644 --- a/src/TII.cpp +++ b/src/TII.cpp @@ -2,7 +2,7 @@ Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2017 + Copyright (C) 2018 Matthias P. Braendli, matthias.braendli@mpb.li http://opendigitalradio.org @@ -187,7 +187,7 @@ int TII::process(Buffer* dataIn, Buffer* dataOut) memset(dataOut->getData(), 0, dataOut->getLength()); if (m_conf.enable and m_insert) { - boost::mutex::scoped_lock lock(m_enabled_carriers_mutex); + std::lock_guard lock(m_enabled_carriers_mutex); complexf* in = reinterpret_cast(dataIn->getData()); complexf* out = reinterpret_cast(dataOut->getData()); @@ -231,7 +231,7 @@ void TII::enable_carrier(int k) { void TII::prepare_pattern() { int comb = m_conf.comb; // Convert from unsigned to signed - boost::mutex::scoped_lock lock(m_enabled_carriers_mutex); + std::lock_guard lock(m_enabled_carriers_mutex); // Clear previous pattern for (size_t i = 0; i < m_enabled_carriers.size(); i++) { diff --git a/src/TII.h b/src/TII.h index b0ffdb3..b86dbbf 100644 --- a/src/TII.h +++ b/src/TII.h @@ -2,7 +2,7 @@ Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2017 + Copyright (C) 2018 Matthias P. Braendli, matthias.braendli@mpb.li http://opendigitalradio.org @@ -35,8 +35,8 @@ #include "ModPlugin.h" #include "RemoteControl.h" -#include -#include +#include +#include #include #include #include @@ -118,7 +118,7 @@ class TII : public ModCodec, public RemoteControllable // m_enabled_carriers is read by modulator thread, and written // to by RC thread. - mutable boost::mutex m_enabled_carriers_mutex; + mutable std::mutex m_enabled_carriers_mutex; // m_enabled_carriers is true only for the first carrier in the // active pair -- cgit v1.2.3 From 0315433aef00644085d2278af405eaedbc184c5c Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Sun, 7 Jan 2018 09:40:06 +0100 Subject: use unique_ptr to handle FILE* in Log --- src/Log.cpp | 39 ++++++++++++++++++++++++++------------- src/Log.h | 41 ++++++++++++----------------------------- src/OutputFile.h | 10 ++-------- 3 files changed, 40 insertions(+), 50 deletions(-) diff --git a/src/Log.cpp b/src/Log.cpp index 0792fcf..f2219eb 100644 --- a/src/Log.cpp +++ b/src/Log.cpp @@ -109,6 +109,17 @@ LogLine Logger::level(log_level_t level) return LogLine(this, level); } +LogToFile::LogToFile(const std::string& filename) : name("FILE") +{ + FILE* fd = fopen(filename.c_str(), "a"); + if (fd == nullptr) { + fprintf(stderr, "Cannot open log file !"); + throw std::runtime_error("Cannot open log file !"); + } + + log_file.reset(fd); +} + void LogToFile::log(log_level_t level, const std::string& message) { if (level != log_level_t::trace) { @@ -116,9 +127,9 @@ void LogToFile::log(log_level_t level, const std::string& message) "DEBUG", "INFO", "WARN", "ERROR", "ALERT", "EMERG"}; // fprintf is thread-safe - fprintf(log_file, SYSLOG_IDENT ": %s: %s\n", + fprintf(log_file.get(), SYSLOG_IDENT ": %s: %s\n", log_level_text[(size_t)level], message.c_str()); - fflush(log_file); + fflush(log_file.get()); } } @@ -142,31 +153,33 @@ void LogToSyslog::log(log_level_t level, const std::string& message) } } -LogTracer::LogTracer(const string& trace_filename) +LogTracer::LogTracer(const string& trace_filename) : name("TRACE") { - name = "TRACE"; etiLog.level(info) << "Setting up TRACE to " << trace_filename; - m_trace_file = fopen(trace_filename.c_str(), "a"); - if (m_trace_file == NULL) { + FILE* fd = fopen(trace_filename.c_str(), "a"); + if (fd == nullptr) { fprintf(stderr, "Cannot open trace file !"); throw std::runtime_error("Cannot open trace file !"); } + m_trace_file.reset(fd); - auto now = chrono::steady_clock::now().time_since_epoch(); - m_trace_micros_startup = - chrono::duration_cast(now).count(); + using namespace std::chrono; + auto now = steady_clock::now().time_since_epoch(); + m_trace_micros_startup = duration_cast(now).count(); - fprintf(m_trace_file, "0,TRACER,startup at %ld\n", m_trace_micros_startup); + fprintf(m_trace_file.get(), + "0,TRACER,startup at %ld\n", m_trace_micros_startup); } void LogTracer::log(log_level_t level, const std::string& message) { if (level == log_level_t::trace) { - const auto now = chrono::steady_clock::now().time_since_epoch(); - const auto micros = chrono::duration_cast(now).count(); + using namespace std::chrono; + const auto now = steady_clock::now().time_since_epoch(); + const auto micros = duration_cast(now).count(); - fprintf(m_trace_file, "%ld,%s\n", + fprintf(m_trace_file.get(), "%ld,%s\n", micros - m_trace_micros_startup, message.c_str()); } diff --git a/src/Log.h b/src/Log.h index ae252a6..0e09bc9 100644 --- a/src/Log.h +++ b/src/Log.h @@ -57,7 +57,7 @@ static const std::string levels_as_str[] = class LogBackend { public: virtual void log(log_level_t level, const std::string& message) = 0; - virtual std::string get_name() = 0; + virtual std::string get_name() const = 0; }; /** A Logging backend for Syslog */ @@ -73,7 +73,7 @@ class LogToSyslog : public LogBackend { void log(log_level_t level, const std::string& message); - std::string get_name() { return name; } + std::string get_name() const { return name; } private: const std::string name; @@ -84,27 +84,15 @@ class LogToSyslog : public LogBackend { class LogToFile : public LogBackend { public: - LogToFile(const std::string& filename) : name("FILE") { - log_file = fopen(filename.c_str(), "a"); - if (log_file == NULL) { - fprintf(stderr, "Cannot open log file !"); - throw std::runtime_error("Cannot open log file !"); - } - } - - ~LogToFile() { - if (log_file != NULL) { - fclose(log_file); - } - } - + LogToFile(const std::string& filename); void log(log_level_t level, const std::string& message); - - std::string get_name() { return name; } + std::string get_name() const { return name; } private: const std::string name; - FILE* log_file; + + struct FILEDeleter{ void operator()(FILE* fd){ if(fd) fclose(fd);}}; + std::unique_ptr log_file; LogToFile(const LogToFile& other) = delete; const LogToFile& operator=(const LogToFile& other) = delete; @@ -113,19 +101,14 @@ class LogToFile : public LogBackend { class LogTracer : public LogBackend { public: LogTracer(const std::string& filename); - - ~LogTracer() { - if (m_trace_file != NULL) { - fclose(m_trace_file); - } - } - void log(log_level_t level, const std::string& message); - std::string get_name() { return name; } + std::string get_name() const { return name; } private: std::string name; - uint64_t m_trace_micros_startup; - FILE* m_trace_file; + uint64_t m_trace_micros_startup = 0; + + struct FILEDeleter{ void operator()(FILE* fd){ if(fd) fclose(fd);}}; + std::unique_ptr m_trace_file; LogTracer(const LogTracer& other) = delete; const LogTracer& operator=(const LogTracer& other) = delete; diff --git a/src/OutputFile.h b/src/OutputFile.h index 97fdcb7..a586921 100644 --- a/src/OutputFile.h +++ b/src/OutputFile.h @@ -39,14 +39,6 @@ #include #include -struct FILEDeleter { - void operator()(FILE* fd) { - if (fd) { - fclose(fd); - } - } -}; - class OutputFile : public ModOutput, public ModMetadata { public: @@ -65,6 +57,8 @@ protected: EtiSource *myEtiSource = nullptr; std::string myFilename; + + struct FILEDeleter{ void operator()(FILE* fd){ if (fd) fclose(fd); }}; std::unique_ptr myFile; }; -- cgit v1.2.3