diff options
| author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2018-01-07 10:38:55 +0100 | 
|---|---|---|
| committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2018-01-07 10:38:55 +0100 | 
| commit | 8953a94893fe39a10b044ea62cf6971d36801880 (patch) | |
| tree | 0cd0ab921827400f3ada7449827bbffaa97e7450 /src | |
| parent | e0f9c8909ecba56da4c7a2ec3507b8af19b737bd (diff) | |
| parent | 0315433aef00644085d2278af405eaedbc184c5c (diff) | |
| download | dabmod-8953a94893fe39a10b044ea62cf6971d36801880.tar.gz dabmod-8953a94893fe39a10b044ea62cf6971d36801880.tar.bz2 dabmod-8953a94893fe39a10b044ea62cf6971d36801880.zip  | |
Merge branch 'flowgraphmetadata' into outputRefactoring
Diffstat (limited to 'src')
| -rw-r--r-- | src/BlockPartitioner.cpp | 45 | ||||
| -rw-r--r-- | src/BlockPartitioner.h | 21 | ||||
| -rw-r--r-- | src/DabMod.cpp | 6 | ||||
| -rw-r--r-- | src/DabModulator.cpp | 13 | ||||
| -rw-r--r-- | src/DabModulator.h | 9 | ||||
| -rw-r--r-- | src/EtiReader.cpp | 7 | ||||
| -rw-r--r-- | src/FicSource.cpp | 24 | ||||
| -rw-r--r-- | src/FicSource.h | 11 | ||||
| -rw-r--r-- | src/Flowgraph.cpp | 68 | ||||
| -rw-r--r-- | src/Flowgraph.h | 14 | ||||
| -rw-r--r-- | src/Log.cpp | 39 | ||||
| -rw-r--r-- | src/Log.h | 41 | ||||
| -rw-r--r-- | src/ModPlugin.cpp | 19 | ||||
| -rw-r--r-- | src/ModPlugin.h | 37 | ||||
| -rw-r--r-- | src/OutputFile.cpp | 60 | ||||
| -rw-r--r-- | src/OutputFile.h | 24 | ||||
| -rw-r--r-- | src/OutputMemory.cpp | 14 | ||||
| -rw-r--r-- | src/OutputMemory.h | 16 | ||||
| -rw-r--r-- | src/TII.cpp | 6 | ||||
| -rw-r--r-- | src/TII.h | 8 | ||||
| -rw-r--r-- | src/TimestampDecoder.cpp | 48 | ||||
| -rw-r--r-- | src/TimestampDecoder.h | 71 | 
22 files changed, 428 insertions, 173 deletions
diff --git a/src/BlockPartitioner.cpp b/src/BlockPartitioner.cpp index 9e9f80b..54405d9 100644 --- a/src/BlockPartitioner.cpp +++ b/src/BlockPartitioner.cpp @@ -1,6 +1,11 @@  /*     Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty     the Queen in Right of Canada (Communications Research Center Canada) + +   Copyright (C) 2018 +   Matthias P. Braendli, matthias.braendli@mpb.li + +    http://opendigitalradio.org   */  /*     This file is part of ODR-DabMod. @@ -21,6 +26,8 @@  #include "BlockPartitioner.h"  #include "PcDebug.h" +#include "Log.h" +#include "TimestampDecoder.h"  #include <stdio.h>  #include <stdexcept> @@ -31,6 +38,7 @@  BlockPartitioner::BlockPartitioner(unsigned mode, unsigned phase) :      ModMux(), +    ModMetadata(),      d_mode(mode)  {      PDEBUG("BlockPartitioner::BlockPartitioner(%i)\n", mode); @@ -68,17 +76,11 @@ BlockPartitioner::BlockPartitioner(unsigned mode, unsigned phase) :      d_cifNb = 0;      // For Synchronisation purpose, count nb of CIF to drop      d_cifPhase = phase % d_cifCount; +    d_metaPhase = phase % d_cifCount;      d_cifSize = 864 * 8;  } -BlockPartitioner::~BlockPartitioner() -{ -    PDEBUG("BlockPartitioner::~BlockPartitioner()\n"); - -} - -  // dataIn[0] -> FIC  // dataIn[1] -> CIF  int BlockPartitioner::process(std::vector<Buffer*> dataIn, Buffer* dataOut) @@ -124,10 +126,10 @@ int BlockPartitioner::process(std::vector<Buffer*> dataIn, Buffer* dataOut)      uint8_t* out = reinterpret_cast<uint8_t*>(dataOut->getData());      // Copy FIC data -    PDEBUG("Writting FIC %zu bytes to %zu\n", d_ficSize, d_cifNb * d_ficSize); +    PDEBUG("Writing FIC %zu bytes to %zu\n", d_ficSize, d_cifNb * d_ficSize);      memcpy(out + (d_cifNb * d_ficSize), fic, d_ficSize);      // Copy CIF data -    PDEBUG("Writting CIF %u bytes to %zu\n", 864 * 8, +    PDEBUG("Writing CIF %u bytes to %zu\n", 864 * 8,              (d_cifCount * d_ficSize) + (d_cifNb * 864 * 8));      memcpy(out + (d_cifCount * d_ficSize) + (d_cifNb * 864 * 8), cif, 864 * 8); @@ -137,3 +139,28 @@ int BlockPartitioner::process(std::vector<Buffer*> dataIn, Buffer* dataOut)      return d_cifNb == 0;  } + +meta_vec_t BlockPartitioner::process_metadata(const meta_vec_t& metadataIn) +{ +    // Synchronize CIF phase +    if (d_metaPhase != 0) { +        if (++d_metaPhase == d_cifCount) { +            d_metaPhase = 0; +        } +        // Drop this metadata +        return {}; +    } + +    if (d_cifNb == 1) { +        d_meta.clear(); +    } + +    std::copy(metadataIn.begin(), metadataIn.end(), std::back_inserter(d_meta)); + +    if (d_cifNb == 0) { +        return d_meta; +    } +    else { +        return {}; +    } +} diff --git a/src/BlockPartitioner.h b/src/BlockPartitioner.h index 90cffa3..a4656a1 100644 --- a/src/BlockPartitioner.h +++ b/src/BlockPartitioner.h @@ -1,6 +1,11 @@  /*     Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty     the Queen in Right of Canada (Communications Research Center Canada) + +   Copyright (C) 2018 +   Matthias P. Braendli, matthias.braendli@mpb.li + +    http://opendigitalradio.org   */  /*     This file is part of ODR-DabMod. @@ -25,32 +30,32 @@  #   include <config.h>  #endif -  #include "ModPlugin.h"  #include <vector> +#include <cstddef> -#include <sys/types.h> - - -class BlockPartitioner : public ModMux +class BlockPartitioner : public ModMux, public ModMetadata  {  public:      BlockPartitioner(unsigned mode, unsigned phase); -    virtual ~BlockPartitioner(); -    BlockPartitioner(const BlockPartitioner&); -    BlockPartitioner& operator=(const BlockPartitioner&);      int process(std::vector<Buffer*> dataIn, Buffer* dataOut);      const char* name() { return "BlockPartitioner"; } +    // The implementation assumes process_metadata is always called after process +    virtual meta_vec_t process_metadata(const meta_vec_t& metadataIn); +  protected:      int d_mode;      size_t d_ficSize;      size_t d_cifCount;      size_t d_cifNb;      size_t d_cifPhase; +    size_t d_metaPhase;      size_t d_cifSize;      size_t d_outputFramesize;      size_t d_outputFramecount; + +    meta_vec_t d_meta;  }; diff --git a/src/DabMod.cpp b/src/DabMod.cpp index 12cfa43..9880938 100644 --- a/src/DabMod.cpp +++ b/src/DabMod.cpp @@ -425,6 +425,12 @@ int launch_modulator(int argc, char* argv[])                  ((Output::SDR*)output.get())->setETISource(modulator->getEtiSource());              } +            // TODO remove +            auto output_as_file = dynamic_pointer_cast<OutputFile>(output); +            if (output_as_file) { +                output_as_file->setETISource(modulator->getEtiSource()); +            } +              inputReader->PrintInfo();              run_modulator_state_t st = run_modulator(m); diff --git a/src/DabModulator.cpp b/src/DabModulator.cpp index 1ea06de..337a595 100644 --- a/src/DabModulator.cpp +++ b/src/DabModulator.cpp @@ -3,7 +3,7 @@     Her Majesty the Queen in Right of Canada (Communications Research     Center Canada) -   Copyright (C) 2017 +   Copyright (C) 2018     Matthias P. Braendli, matthias.braendli@mpb.li      http://opendigitalradio.org @@ -221,7 +221,7 @@ int DabModulator::process(Buffer* dataOut)              rcs.enrol(cifPoly.get());          } -        auto myOutput = make_shared<OutputMemory>(dataOut); +        myOutput = make_shared<OutputMemory>(dataOut);          shared_ptr<Resampler> cifRes;          if (m_settings.outputRate != 2048000) { @@ -376,3 +376,12 @@ int DabModulator::process(Buffer* dataOut)      return myFlowgraph->run();  } +meta_vec_t DabModulator::process_metadata(const meta_vec_t& metadataIn) +{ +    if (myOutput) { +        return myOutput->get_latest_metadata(); +    } + +    return {}; +} + diff --git a/src/DabModulator.h b/src/DabModulator.h index 6878853..e806c92 100644 --- a/src/DabModulator.h +++ b/src/DabModulator.h @@ -3,7 +3,7 @@     Her Majesty the Queen in Right of Canada (Communications Research     Center Canada) -   Copyright (C) 2017 +   Copyright (C) 2018     Matthias P. Braendli, matthias.braendli@mpb.li      http://opendigitalradio.org @@ -46,7 +46,7 @@  #include "TII.h" -class DabModulator : public ModInput +class DabModulator : public ModInput, public ModMetadata  {  public:      DabModulator(EtiSource& etiSource, @@ -55,6 +55,9 @@ public:      int process(Buffer* dataOut);      const char* name() { return "DabModulator"; } +    virtual meta_vec_t process_metadata( +            const meta_vec_t& metadataIn); +      /* Required to get the timestamp */      EtiSource* getEtiSource() { return &myEtiSource; } @@ -73,5 +76,7 @@ protected:      size_t mySymSize;      size_t myFicSizeOut;      size_t myFicSizeIn; + +    std::shared_ptr<OutputMemory> myOutput;  }; diff --git a/src/EtiReader.cpp b/src/EtiReader.cpp index d84ed1f..dc5df84 100644 --- a/src/EtiReader.cpp +++ b/src/EtiReader.cpp @@ -281,6 +281,8 @@ int EtiReader::loadEtiData(const Buffer& dataIn)      myTimestampDecoder.updateTimestampEti(eti_fc.FP & 0x3,              eti_eoh.MNSC, getPPSOffset(), eti_fc.FCT); +    myFicSource->loadTimestamp(myTimestampDecoder.getTimestamp()); +      return dataIn.getLength() - input_size;  } @@ -533,8 +535,9 @@ void EdiReader::assemble()      const std::time_t posix_timestamp_1_jan_2000 = 946684800;      auto utc_ts = posix_timestamp_1_jan_2000 + m_seconds - m_utco; -    m_timestamp_decoder.updateTimestampEdi( -            utc_ts, m_fc.tsta, m_fc.fct()); +    m_timestamp_decoder.updateTimestampEdi(utc_ts, m_fc.tsta, m_fc.fct(), m_fc.fp); + +    myFicSource->loadTimestamp(m_timestamp_decoder.getTimestamp());      m_frameReady = true;  } diff --git a/src/FicSource.cpp b/src/FicSource.cpp index 04197db..2b95085 100644 --- a/src/FicSource.cpp +++ b/src/FicSource.cpp @@ -2,7 +2,7 @@     Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty     the Queen in Right of Canada (Communications Research Center Canada) -   Copyright (C) 2016 +   Copyright (C) 2018     Matthias P. Braendli, matthias.braendli@mpb.li      http://opendigitalradio.org @@ -26,6 +26,8 @@  #include "FicSource.h"  #include "PcDebug.h" +#include "Log.h" +#include "TimestampDecoder.h"  #include <stdexcept>  #include <string> @@ -92,3 +94,23 @@ int FicSource::process(Buffer* outputData)      return outputData->getLength();  } +void FicSource::loadTimestamp(const std::shared_ptr<struct frame_timestamp>& ts) +{ +    d_ts = ts; +} + + +meta_vec_t FicSource::process_metadata(const meta_vec_t& metadataIn) +{ +    if (not d_ts) { +        return {}; +    } + +    using namespace std; +    meta_vec_t md_vec; +    flowgraph_metadata meta; +    meta.ts = d_ts; +    md_vec.push_back(meta); +    return md_vec; +} + diff --git a/src/FicSource.h b/src/FicSource.h index 77ac741..93c1a7f 100644 --- a/src/FicSource.h +++ b/src/FicSource.h @@ -36,7 +36,7 @@  #include <vector>  #include <sys/types.h> -class FicSource : public ModInput +class FicSource : public ModInput, public ModMetadata  {  public:      FicSource(unsigned ficf, unsigned mid); @@ -45,12 +45,17 @@ public:      const std::vector<PuncturingRule>& get_rules();      void loadFicData(const Buffer& fic); -    int process(Buffer* outputData); -    const char* name() { return "FicSource"; } +    int process(Buffer* outputData) override; +    const char* name() override { return "FicSource"; } + +    void loadTimestamp(const std::shared_ptr<struct frame_timestamp>& ts); +    virtual meta_vec_t process_metadata( +            const meta_vec_t& metadataIn) override;  private:      size_t d_framesize;      Buffer d_buffer; +    std::shared_ptr<struct frame_timestamp> d_ts;      std::vector<PuncturingRule> d_puncturing_rules;  }; diff --git a/src/Flowgraph.cpp b/src/Flowgraph.cpp index 465ef41..0b80a8c 100644 --- a/src/Flowgraph.cpp +++ b/src/Flowgraph.cpp @@ -26,6 +26,8 @@  #include "Flowgraph.h"  #include "PcDebug.h" +#include "Log.h" +#include "TimestampDecoder.h"  #include <string>  #include <memory>  #include <algorithm> @@ -57,9 +59,10 @@ Node::~Node()      assert(myOutputBuffers.size() == 0);  } -void Node::addOutputBuffer(Buffer::sptr& buffer) +void Node::addOutputBuffer(Buffer::sptr& buffer, Metadata_vec_sptr& md)  {      myOutputBuffers.push_back(buffer); +    myOutputMetadata.push_back(md);  #if DEBUG      std::string fname = string(myPlugin->name()) +          "-" + to_string(myDebugFiles.size()) + @@ -71,7 +74,7 @@ void Node::addOutputBuffer(Buffer::sptr& buffer)  #endif  } -void Node::removeOutputBuffer(Buffer::sptr& buffer) +void Node::removeOutputBuffer(Buffer::sptr& buffer, Metadata_vec_sptr& md)  {      auto it = std::find(              myOutputBuffers.begin(), @@ -89,14 +92,23 @@ void Node::removeOutputBuffer(Buffer::sptr& buffer)  #endif          myOutputBuffers.erase(it);      } + +    auto mdit = std::find( +            myOutputMetadata.begin(), +            myOutputMetadata.end(), +            md); +    if (mdit != myOutputMetadata.end()) { +        myOutputMetadata.erase(mdit); +    }  } -void Node::addInputBuffer(Buffer::sptr& buffer) +void Node::addInputBuffer(Buffer::sptr& buffer, Metadata_vec_sptr& md)  {      myInputBuffers.push_back(buffer); +    myInputMetadata.push_back(md);  } -void Node::removeInputBuffer(Buffer::sptr& buffer) +void Node::removeInputBuffer(Buffer::sptr& buffer, Metadata_vec_sptr& md)  {      auto it = std::find(              myInputBuffers.begin(), @@ -105,6 +117,14 @@ void Node::removeInputBuffer(Buffer::sptr& buffer)      if (it != myInputBuffers.end()) {          myInputBuffers.erase(it);      } + +    auto mdit = std::find( +            myInputMetadata.begin(), +            myInputMetadata.end(), +            md); +    if (mdit != myInputMetadata.end()) { +        myInputMetadata.erase(mdit); +    }  }  int Node::process() @@ -127,6 +147,36 @@ int Node::process()      }      int ret = myPlugin->process(inBuffers, outBuffers); + +    // Collect all incoming metadata into a single vector +    meta_vec_t all_input_mds; +    for (auto& md_vec_sp : myInputMetadata) { +        if (md_vec_sp) { +            copy(md_vec_sp->begin(), md_vec_sp->end(), +                    back_inserter(all_input_mds)); +        } +    } + +    auto mod_meta = dynamic_pointer_cast<ModMetadata>(myPlugin); +    if (mod_meta) { +        auto outputMetadata = mod_meta->process_metadata(all_input_mds); +        // Distribute the result metadata to all outputs +        for (auto& out_md : myOutputMetadata) { +            out_md->clear(); +            std::move(outputMetadata.begin(), outputMetadata.end(), +                    std::back_inserter(*out_md)); +        } +    } +    else { +        // Propagate the unmodified input metadata to all outputs +        for (auto& out_md : myOutputMetadata) { +            out_md->clear(); +            std::move(all_input_mds.begin(), all_input_mds.end(), +                    std::back_inserter(*out_md)); +        } +    } + +  #if DEBUG      assert(myDebugFiles.size() == myOutputBuffers.size()); @@ -158,8 +208,10 @@ Edge::Edge(shared_ptr<Node>& srcNode, shared_ptr<Node>& dstNode) :              this);      myBuffer = make_shared<Buffer>(); -    srcNode->addOutputBuffer(myBuffer); -    dstNode->addInputBuffer(myBuffer); +    myMetadata = make_shared<vector<flowgraph_metadata> >(); + +    srcNode->addOutputBuffer(myBuffer, myMetadata); +    dstNode->addInputBuffer(myBuffer, myMetadata);  } @@ -168,8 +220,8 @@ Edge::~Edge()      PDEBUG("Edge::~Edge() @ %p\n", this);      if (myBuffer) { -        mySrcNode->removeOutputBuffer(myBuffer); -        myDstNode->removeInputBuffer(myBuffer); +        mySrcNode->removeOutputBuffer(myBuffer, myMetadata); +        myDstNode->removeInputBuffer(myBuffer, myMetadata);      }  } diff --git a/src/Flowgraph.h b/src/Flowgraph.h index ebb7314..2742824 100644 --- a/src/Flowgraph.h +++ b/src/Flowgraph.h @@ -39,6 +39,8 @@  #include <list>  #include <cstdio> +using Metadata_vec_sptr = std::shared_ptr<std::vector<flowgraph_metadata> >; +  class Node  {  public: @@ -55,15 +57,18 @@ public:          myProcessTime += processTime;      } -    void addOutputBuffer(Buffer::sptr& buffer); -    void removeOutputBuffer(Buffer::sptr& buffer); +    void addOutputBuffer(Buffer::sptr& buffer, Metadata_vec_sptr& md); +    void removeOutputBuffer(Buffer::sptr& buffer, Metadata_vec_sptr& md); -    void addInputBuffer(Buffer::sptr& buffer); -    void removeInputBuffer(Buffer::sptr& buffer); +    void addInputBuffer(Buffer::sptr& buffer, Metadata_vec_sptr& md); +    void removeInputBuffer(Buffer::sptr& buffer, Metadata_vec_sptr& md);  protected:      std::list<Buffer::sptr> myInputBuffers;      std::list<Buffer::sptr> myOutputBuffers; +    std::list<Metadata_vec_sptr> myInputMetadata; +    std::list<Metadata_vec_sptr> myOutputMetadata; +  #if DEBUG      std::list<FILE*> myDebugFiles;  #endif @@ -85,6 +90,7 @@ protected:      std::shared_ptr<Node> mySrcNode;      std::shared_ptr<Node> myDstNode;      std::shared_ptr<Buffer> myBuffer; +    std::shared_ptr<std::vector<flowgraph_metadata> > myMetadata;  }; diff --git a/src/Log.cpp b/src/Log.cpp index 0792fcf..f2219eb 100644 --- a/src/Log.cpp +++ b/src/Log.cpp @@ -109,6 +109,17 @@ LogLine Logger::level(log_level_t level)      return LogLine(this, level);  } +LogToFile::LogToFile(const std::string& filename) : name("FILE") +{ +    FILE* fd = fopen(filename.c_str(), "a"); +    if (fd == nullptr) { +        fprintf(stderr, "Cannot open log file !"); +        throw std::runtime_error("Cannot open log file !"); +    } + +    log_file.reset(fd); +} +  void LogToFile::log(log_level_t level, const std::string& message)  {      if (level != log_level_t::trace) { @@ -116,9 +127,9 @@ void LogToFile::log(log_level_t level, const std::string& message)              "DEBUG", "INFO", "WARN", "ERROR", "ALERT", "EMERG"};          // fprintf is thread-safe -        fprintf(log_file, SYSLOG_IDENT ": %s: %s\n", +        fprintf(log_file.get(), SYSLOG_IDENT ": %s: %s\n",                  log_level_text[(size_t)level], message.c_str()); -        fflush(log_file); +        fflush(log_file.get());      }  } @@ -142,31 +153,33 @@ void LogToSyslog::log(log_level_t level, const std::string& message)      }  } -LogTracer::LogTracer(const string& trace_filename) +LogTracer::LogTracer(const string& trace_filename) : name("TRACE")  { -    name = "TRACE";      etiLog.level(info) << "Setting up TRACE to " << trace_filename; -    m_trace_file = fopen(trace_filename.c_str(), "a"); -    if (m_trace_file == NULL) { +    FILE* fd = fopen(trace_filename.c_str(), "a"); +    if (fd == nullptr) {          fprintf(stderr, "Cannot open trace file !");          throw std::runtime_error("Cannot open trace file !");      } +    m_trace_file.reset(fd); -    auto now = chrono::steady_clock::now().time_since_epoch(); -    m_trace_micros_startup = -        chrono::duration_cast<chrono::microseconds>(now).count(); +    using namespace std::chrono; +    auto now = steady_clock::now().time_since_epoch(); +    m_trace_micros_startup = duration_cast<microseconds>(now).count(); -    fprintf(m_trace_file, "0,TRACER,startup at %ld\n", m_trace_micros_startup); +    fprintf(m_trace_file.get(), +            "0,TRACER,startup at %ld\n", m_trace_micros_startup);  }  void LogTracer::log(log_level_t level, const std::string& message)  {      if (level == log_level_t::trace) { -        const auto now = chrono::steady_clock::now().time_since_epoch(); -        const auto micros = chrono::duration_cast<chrono::microseconds>(now).count(); +        using namespace std::chrono; +        const auto now = steady_clock::now().time_since_epoch(); +        const auto micros = duration_cast<microseconds>(now).count(); -        fprintf(m_trace_file, "%ld,%s\n", +        fprintf(m_trace_file.get(), "%ld,%s\n",                  micros - m_trace_micros_startup,                  message.c_str());      } @@ -57,7 +57,7 @@ static const std::string levels_as_str[] =  class LogBackend {      public:          virtual void log(log_level_t level, const std::string& message) = 0; -        virtual std::string get_name() = 0; +        virtual std::string get_name() const = 0;  };  /** A Logging backend for Syslog */ @@ -73,7 +73,7 @@ class LogToSyslog : public LogBackend {          void log(log_level_t level, const std::string& message); -        std::string get_name() { return name; } +        std::string get_name() const { return name; }      private:          const std::string name; @@ -84,27 +84,15 @@ class LogToSyslog : public LogBackend {  class LogToFile : public LogBackend {      public: -        LogToFile(const std::string& filename) : name("FILE") { -            log_file = fopen(filename.c_str(), "a"); -            if (log_file == NULL) { -                fprintf(stderr, "Cannot open log file !"); -                throw std::runtime_error("Cannot open log file !"); -            } -        } - -        ~LogToFile() { -            if (log_file != NULL) { -                fclose(log_file); -            } -        } - +        LogToFile(const std::string& filename);          void log(log_level_t level, const std::string& message); - -        std::string get_name() { return name; } +        std::string get_name() const { return name; }      private:          const std::string name; -        FILE* log_file; + +        struct FILEDeleter{ void operator()(FILE* fd){ if(fd) fclose(fd);}}; +        std::unique_ptr<FILE, FILEDeleter> log_file;          LogToFile(const LogToFile& other) = delete;          const LogToFile& operator=(const LogToFile& other) = delete; @@ -113,19 +101,14 @@ class LogToFile : public LogBackend {  class LogTracer : public LogBackend {      public:          LogTracer(const std::string& filename); - -        ~LogTracer() { -            if (m_trace_file != NULL) { -                fclose(m_trace_file); -            } -        } -          void log(log_level_t level, const std::string& message); -        std::string get_name() { return name; } +        std::string get_name() const { return name; }      private:          std::string name; -        uint64_t m_trace_micros_startup; -        FILE* m_trace_file; +        uint64_t m_trace_micros_startup = 0; + +        struct FILEDeleter{ void operator()(FILE* fd){ if(fd) fclose(fd);}}; +        std::unique_ptr<FILE, FILEDeleter> m_trace_file;          LogTracer(const LogTracer& other) = delete;          const LogTracer& operator=(const LogTracer& other) = delete; diff --git a/src/ModPlugin.cpp b/src/ModPlugin.cpp index c39d883..f86bfb2 100644 --- a/src/ModPlugin.cpp +++ b/src/ModPlugin.cpp @@ -74,9 +74,9 @@ int ModOutput::process(  PipelinedModCodec::PipelinedModCodec() :      ModCodec(), -    m_number_of_runs(0),      m_input_queue(),      m_output_queue(), +    m_metadata_fifo(),      m_running(false),      m_thread()  { @@ -107,7 +107,7 @@ int PipelinedModCodec::process(Buffer* dataIn, Buffer* dataOut)      m_input_queue.push(inbuffer); -    if (m_number_of_runs > 0) { +    if (m_ready_to_output_data) {          std::shared_ptr<Buffer> outbuffer;          m_output_queue.wait_and_pop(outbuffer); @@ -116,13 +116,26 @@ int PipelinedModCodec::process(Buffer* dataIn, Buffer* dataOut)      else {          dataOut->setLength(dataIn->getLength());          memset(dataOut->getData(), 0, dataOut->getLength()); -        m_number_of_runs++; +        m_ready_to_output_data = true;      }      return dataOut->getLength();  } +meta_vec_t PipelinedModCodec::process_metadata(const meta_vec_t& metadataIn) +{ +    m_metadata_fifo.push_back(metadataIn); +    if (m_metadata_fifo.size() == 2) { +        auto r = std::move(m_metadata_fifo.front()); +        m_metadata_fifo.pop_front(); +        return r; +    } +    else { +        return {}; +    } +} +  void PipelinedModCodec::process_thread()  {      set_thread_name(name()); diff --git a/src/ModPlugin.h b/src/ModPlugin.h index d3aa780..c0f1c1a 100644 --- a/src/ModPlugin.h +++ b/src/ModPlugin.h @@ -30,16 +30,34 @@  #   include <config.h>  #endif -  #include "Buffer.h"  #include "ThreadsafeQueue.h" - -#include <sys/types.h> +#include <cstddef>  #include <vector>  #include <memory>  #include <thread>  #include <atomic> +// All flowgraph elements derive from ModPlugin, or a variant of it. +// Some ModPlugins also support handling metadata. + +struct frame_timestamp; +struct flowgraph_metadata { +    std::shared_ptr<struct frame_timestamp> ts; +}; + +using meta_vec_t = std::vector<flowgraph_metadata>; + +/* ModPlugins that support metadata derive from ModMetadata */ +class ModMetadata { +    public: +        // Receives metadata from all inputs, and process them, and output +        // a sequence of metadata. +        virtual meta_vec_t process_metadata(const meta_vec_t& metadataIn) = 0; +}; + + +/* Abstract base class for all flowgraph elements */  class ModPlugin  {  public: @@ -69,7 +87,11 @@ public:      virtual int process(Buffer* const dataIn, Buffer* dataOut) = 0;  }; -class PipelinedModCodec : public ModCodec +/* Pipelined ModCodecs run their processing in a separate thread, and + * have a one-call-to-process() latency. Because of this latency, they + * must also handle the metadata + */ +class PipelinedModCodec : public ModCodec, public ModMetadata  {  public:      PipelinedModCodec(); @@ -82,6 +104,8 @@ public:      virtual int process(Buffer* const dataIn, Buffer* dataOut) final;      virtual const char* name() = 0; +    virtual meta_vec_t process_metadata(const meta_vec_t& metadataIn) final; +  protected:      // Once the instance implementing PipelinedModCodec has been constructed,      // it must call start_pipeline_thread() @@ -89,11 +113,13 @@ protected:      virtual int internal_process(Buffer* const dataIn, Buffer* dataOut) = 0;  private: -    size_t m_number_of_runs; +    bool m_ready_to_output_data = false;      ThreadsafeQueue<std::shared_ptr<Buffer> > m_input_queue;      ThreadsafeQueue<std::shared_ptr<Buffer> > m_output_queue; +    std::deque<meta_vec_t> m_metadata_fifo; +      std::atomic<bool> m_running;      std::thread m_thread;      void process_thread(void); @@ -119,3 +145,4 @@ public:              std::vector<Buffer*> dataOut);      virtual int process(Buffer* dataIn) = 0;  }; + diff --git a/src/OutputFile.cpp b/src/OutputFile.cpp index 23d5523..481e858 100644 --- a/src/OutputFile.cpp +++ b/src/OutputFile.cpp @@ -2,7 +2,7 @@     Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty     the Queen in Right of Canada (Communications Research Center Canada) -   Copyright (C) 2016 +   Copyright (C) 2018     Matthias P. Braendli, matthias.braendli@mpb.li      http://opendigitalradio.org @@ -26,47 +26,77 @@  #include "OutputFile.h"  #include "PcDebug.h" +#include "Log.h" +#include "TimestampDecoder.h"  #include <string>  #include <assert.h>  #include <stdexcept> +using namespace std; +  OutputFile::OutputFile(std::string filename) :      ModOutput(), +    ModMetadata(),      myFilename(filename)  {      PDEBUG("OutputFile::OutputFile(filename: %s) @ %p\n",              filename.c_str(), this); -    myFile = fopen(filename.c_str(), "w"); -    if (myFile == NULL) { +    FILE* fd = fopen(filename.c_str(), "w"); +    if (fd == nullptr) {          perror(filename.c_str());          throw std::runtime_error(                  "OutputFile::OutputFile() unable to open file!");      } -} - - -OutputFile::~OutputFile() -{ -    PDEBUG("OutputFile::~OutputFile() @ %p\n", this); - -    if (myFile != NULL) { -        fclose(myFile); -    } +    myFile.reset(fd);  }  int OutputFile::process(Buffer* dataIn)  {      PDEBUG("OutputFile::process(%p)\n", dataIn); -    assert(dataIn != NULL); +    assert(dataIn != nullptr); -    if (fwrite(dataIn->getData(), dataIn->getLength(), 1, myFile) == 0) { +    if (fwrite(dataIn->getData(), dataIn->getLength(), 1, myFile.get()) == 0) {          throw std::runtime_error(                  "OutputFile::process() unable to write to file!");      }      return dataIn->getLength();  } + +meta_vec_t OutputFile::process_metadata(const meta_vec_t& metadataIn) +{ +    stringstream ss; + +    if (metadataIn.empty()) { +        etiLog.level(debug) << "OutputFile: no mdIn"; +    } + +    for (const auto& md : metadataIn) { +        if (md.ts) { +            ss << " FCT=" << md.ts->fct << +                " FP=" << (int)md.ts->fp; +        } +        else { +            ss << " void, "; +        } +    } + +    if (myEtiSource) { +        frame_timestamp ts; +        myEtiSource->calculateTimestamp(ts); +        ss << " ETI FCT=" << ts.fct; +    } + +    etiLog.level(debug) << "Output File got metadata: " << ss.str(); + +    return {}; +} + +void OutputFile::setETISource(EtiSource *etiSource) +{ +    myEtiSource = etiSource; +} diff --git a/src/OutputFile.h b/src/OutputFile.h index 7121ef3..a586921 100644 --- a/src/OutputFile.h +++ b/src/OutputFile.h @@ -2,7 +2,7 @@     Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty     the Queen in Right of Canada (Communications Research Center Canada) -   Copyright (C) 2016 +   Copyright (C) 2018     Matthias P. Braendli, matthias.braendli@mpb.li      http://opendigitalradio.org @@ -32,23 +32,33 @@  #include "ModPlugin.h" +#include "EtiReader.h"  #include <string>  #include <stdio.h>  #include <sys/types.h> +#include <memory> - -class OutputFile : public ModOutput +class OutputFile : public ModOutput, public ModMetadata  {  public:      OutputFile(std::string filename); -    virtual ~OutputFile(); -    virtual int process(Buffer* dataIn); -    const char* name() { return "OutputFile"; } +    virtual int process(Buffer* dataIn) override; +    const char* name() override { return "OutputFile"; } + +    virtual meta_vec_t process_metadata( +            const meta_vec_t& metadataIn) override; + +    void setETISource(EtiSource *etiSource);  protected: +    // TODO remove +    EtiSource *myEtiSource = nullptr; +      std::string myFilename; -    FILE* myFile; + +    struct FILEDeleter{ void operator()(FILE* fd){ if (fd) fclose(fd); }}; +    std::unique_ptr<FILE, FILEDeleter> myFile;  }; diff --git a/src/OutputMemory.cpp b/src/OutputMemory.cpp index 6e2fd49..5f24095 100644 --- a/src/OutputMemory.cpp +++ b/src/OutputMemory.cpp @@ -2,7 +2,7 @@     Copyright (C) 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in     Right of Canada (Communications Research Center Canada) -   Copyright (C) 2016 +   Copyright (C) 2018     Matthias P. Braendli, matthias.braendli@mpb.li      http://opendigitalradio.org @@ -26,6 +26,7 @@  #include "OutputMemory.h"  #include "PcDebug.h" +#include "Log.h"  #include <stdexcept>  #include <string.h> @@ -94,3 +95,14 @@ int OutputMemory::process(Buffer* dataIn)      return myDataOut->getLength();  } +meta_vec_t OutputMemory::process_metadata(const meta_vec_t& metadataIn) +{ +    myMetadata = metadataIn; +    return {}; +} + +meta_vec_t OutputMemory::get_latest_metadata() +{ +    return myMetadata; +} + diff --git a/src/OutputMemory.h b/src/OutputMemory.h index 715cb2d..f0a5fbb 100644 --- a/src/OutputMemory.h +++ b/src/OutputMemory.h @@ -2,7 +2,7 @@     Copyright (C) 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in     Right of Canada (Communications Research Center Canada) -   Copyright (C) 2016 +   Copyright (C) 2018     Matthias P. Braendli, matthias.braendli@mpb.li      http://opendigitalradio.org @@ -46,18 +46,26 @@  #include "ModPlugin.h" -class OutputMemory : public ModOutput +class OutputMemory : public ModOutput, public ModMetadata  {  public:      OutputMemory(Buffer* dataOut);      virtual ~OutputMemory(); -    virtual int process(Buffer* dataIn); -    const char* name() { return "OutputMemory"; } +    OutputMemory(OutputMemory& other) = delete; +    OutputMemory& operator=(OutputMemory& other) = delete; + +    virtual int process(Buffer* dataIn) override; +    const char* name() override { return "OutputMemory"; } +    virtual meta_vec_t process_metadata( +            const meta_vec_t& metadataIn) override; + +    meta_vec_t get_latest_metadata(void);      void setOutput(Buffer* dataOut);  protected:      Buffer* myDataOut; +    meta_vec_t myMetadata;  #if OUTPUT_MEM_HISTOGRAM      // keep track of max value diff --git a/src/TII.cpp b/src/TII.cpp index 89cd6d0..3c5823b 100644 --- a/src/TII.cpp +++ b/src/TII.cpp @@ -2,7 +2,7 @@     Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty     the Queen in Right of Canada (Communications Research Center Canada) -   Copyright (C) 2017 +   Copyright (C) 2018     Matthias P. Braendli, matthias.braendli@mpb.li      http://opendigitalradio.org @@ -187,7 +187,7 @@ int TII::process(Buffer* dataIn, Buffer* dataOut)      memset(dataOut->getData(), 0,  dataOut->getLength());      if (m_conf.enable and m_insert) { -        boost::mutex::scoped_lock lock(m_enabled_carriers_mutex); +        std::lock_guard<std::mutex> lock(m_enabled_carriers_mutex);          complexf* in = reinterpret_cast<complexf*>(dataIn->getData());          complexf* out = reinterpret_cast<complexf*>(dataOut->getData()); @@ -231,7 +231,7 @@ void TII::enable_carrier(int k) {  void TII::prepare_pattern() {      int comb = m_conf.comb; // Convert from unsigned to signed -    boost::mutex::scoped_lock lock(m_enabled_carriers_mutex); +    std::lock_guard<std::mutex> lock(m_enabled_carriers_mutex);      // Clear previous pattern      for (size_t i = 0; i < m_enabled_carriers.size(); i++) { @@ -2,7 +2,7 @@     Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty     the Queen in Right of Canada (Communications Research Center Canada) -   Copyright (C) 2017 +   Copyright (C) 2018     Matthias P. Braendli, matthias.braendli@mpb.li      http://opendigitalradio.org @@ -35,8 +35,8 @@  #include "ModPlugin.h"  #include "RemoteControl.h" -#include <boost/thread.hpp> -#include <sys/types.h> +#include <cstddef> +#include <thread>  #include <complex>  #include <vector>  #include <string> @@ -118,7 +118,7 @@ class TII : public ModCodec, public RemoteControllable          // m_enabled_carriers is read by modulator thread, and written          // to by RC thread. -        mutable boost::mutex m_enabled_carriers_mutex; +        mutable std::mutex m_enabled_carriers_mutex;          // m_enabled_carriers is true only for the first carrier in the          // active pair diff --git a/src/TimestampDecoder.cpp b/src/TimestampDecoder.cpp index 26deb60..6cf2875 100644 --- a/src/TimestampDecoder.cpp +++ b/src/TimestampDecoder.cpp @@ -37,6 +37,22 @@  //#define MDEBUG(fmt, args...) fprintf (LOG, "*****" fmt , ## args)  #define MDEBUG(fmt, args...) PDEBUG(fmt, ## args) +TimestampDecoder::TimestampDecoder(double& offset_s, unsigned tist_delay_stages) : +        RemoteControllable("tist"), +        timestamp_offset(offset_s), +        m_tist_delay_stages(tist_delay_stages) +{ +    // Properly initialise temp_time +    memset(&temp_time, 0, sizeof(temp_time)); +    const time_t timep = 0; +    gmtime_r(&timep, &temp_time); + +    RC_ADD_PARAMETER(offset, "TIST offset [s]"); +    RC_ADD_PARAMETER(timestamp, "FCT and timestamp [s]"); + +    etiLog.level(info) << "Setting up timestamp decoder with " << +        timestamp_offset << " offset"; +}  void TimestampDecoder::calculateTimestamp(frame_timestamp& ts)  { @@ -48,6 +64,7 @@ void TimestampDecoder::calculateTimestamp(frame_timestamp& ts)      ts_queued->timestamp_sec = time_secs;      ts_queued->timestamp_pps = time_pps;      ts_queued->fct = latestFCT; +    ts_queued->fp = latestFP;      ts_queued->timestamp_refresh = offset_changed;      offset_changed = false; @@ -102,7 +119,29 @@ void TimestampDecoder::calculateTimestamp(frame_timestamp& ts)      //ts.print("calc2 ");  } -void TimestampDecoder::pushMNSCData(int framephase, uint16_t mnsc) +std::shared_ptr<frame_timestamp> TimestampDecoder::getTimestamp() +{ +    std::shared_ptr<frame_timestamp> ts = +        std::make_shared<frame_timestamp>(); + +    /* Push new timestamp into queue */ +    ts->timestamp_valid = full_timestamp_received; +    ts->timestamp_sec = time_secs; +    ts->timestamp_pps = time_pps; +    ts->fct = latestFCT; +    ts->fp = latestFP; + +    ts->timestamp_refresh = offset_changed; +    offset_changed = false; + +    MDEBUG("time_secs=%d, time_pps=%f\n", time_secs, +            (double)time_pps / 16384000.0); +    *ts += timestamp_offset; + +    return ts; +} + +void TimestampDecoder::pushMNSCData(uint8_t framephase, uint16_t mnsc)  {      struct eti_MNSC_TIME_0 *mnsc0;      struct eti_MNSC_TIME_1 *mnsc1; @@ -190,7 +229,7 @@ void TimestampDecoder::updateTimestampPPS(uint32_t pps)  }  void TimestampDecoder::updateTimestampEti( -        int framephase, +        uint8_t framephase,          uint16_t mnsc,          uint32_t pps, // In units of 1/16384000 s          int32_t fct) @@ -198,16 +237,19 @@ void TimestampDecoder::updateTimestampEti(      updateTimestampPPS(pps);      pushMNSCData(framephase, mnsc);      latestFCT = fct; +    latestFP = framephase;  }  void TimestampDecoder::updateTimestampEdi(          uint32_t seconds_utc,          uint32_t pps, // In units of 1/16384000 s -        int32_t fct) +        int32_t fct, +        uint8_t framephase)  {      time_secs = seconds_utc;      time_pps  = pps;      latestFCT = fct; +    latestFP = framephase;      full_timestamp_received = true;  } diff --git a/src/TimestampDecoder.h b/src/TimestampDecoder.h index 2272fe0..0953f76 100644 --- a/src/TimestampDecoder.h +++ b/src/TimestampDecoder.h @@ -40,6 +40,7 @@ struct frame_timestamp  {      // Which frame count does this timestamp apply to      int32_t fct; +    uint8_t fp; // Frame Phase      uint32_t timestamp_sec;      uint32_t timestamp_pps; // In units of 1/16384000 s @@ -56,6 +57,7 @@ struct frame_timestamp              this->timestamp_valid = rhs.timestamp_valid;              this->timestamp_refresh = rhs.timestamp_refresh;              this->fct = rhs.fct; +            this->fp = rhs.fp;          }          return *this; @@ -115,49 +117,23 @@ struct frame_timestamp  class TimestampDecoder : public RemoteControllable  {      public: -        TimestampDecoder( -                /* The modulator adds this offset to the TIST to define time of -                 * frame transmission -                 */ -                double& offset_s, - -                /* Specifies by how many stages the timestamp must be delayed. -                 * (e.g. The FIRFilter is pipelined, therefore we must increase -                 * tist_delay_stages by one if the filter is used -                 */ -                unsigned tist_delay_stages) : -            RemoteControllable("tist"), -            timestamp_offset(offset_s) -        { -            m_tist_delay_stages = tist_delay_stages; -            inhibit_second_update = 0; -            time_pps = 0.0; -            time_secs = 0; -            latestFCT = 0; -            enableDecode = false; -            full_timestamp_received = false; - -            // Properly initialise temp_time -            memset(&temp_time, 0, sizeof(temp_time)); -            const time_t timep = 0; -            gmtime_r(&timep, &temp_time); - -            offset_changed = false; - -            RC_ADD_PARAMETER(offset, "TIST offset [s]"); -            RC_ADD_PARAMETER(timestamp, "FCT and timestamp [s]"); - -            etiLog.level(info) << "Setting up timestamp decoder with " << -                timestamp_offset << " offset"; - -        }; +        /* offset_s: The modulator adds this offset to the TIST to define time of +         * frame transmission +         * +         * tist_delay_stages: Specifies by how many stages the timestamp must +         * be delayed.  (e.g. The FIRFilter is pipelined, therefore we must +         * increase tist_delay_stages by one if the filter is used +         */ +        TimestampDecoder(double& offset_s, unsigned tist_delay_stages);          /* Calculate the timestamp for the current frame. */          void calculateTimestamp(frame_timestamp& ts); +        std::shared_ptr<frame_timestamp> getTimestamp(void); +          /* Update timestamp data from ETI */          void updateTimestampEti( -                int framephase, +                uint8_t framephase,                  uint16_t mnsc,                  uint32_t pps, // In units of 1/16384000 s                  int32_t fct); @@ -166,7 +142,8 @@ class TimestampDecoder : public RemoteControllable          void updateTimestampEdi(                  uint32_t seconds_utc,                  uint32_t pps, // In units of 1/16384000 s -                int32_t fct); +                int32_t fct, +                uint8_t framephase);          /*********** REMOTE CONTROL ***************/ @@ -183,7 +160,7 @@ class TimestampDecoder : public RemoteControllable      protected:          /* Push a new MNSC field into the decoder */ -        void pushMNSCData(int framephase, uint16_t mnsc); +        void pushMNSCData(uint8_t framephase, uint16_t mnsc);          /* Each frame contains the TIST field with the PPS offset.           * For each frame, this function must be called to update @@ -203,21 +180,22 @@ class TimestampDecoder : public RemoteControllable          void updateTimestampSeconds(uint32_t secs);          struct tm temp_time; -        uint32_t time_secs; -        int32_t latestFCT; -        uint32_t time_pps; +        uint32_t time_secs = 0; +        int32_t latestFCT = 0; +        uint32_t latestFP = 0; +        uint32_t time_pps = 0;          double& timestamp_offset;          unsigned m_tist_delay_stages; -        int inhibit_second_update; -        bool offset_changed; +        int inhibit_second_update = 0; +        bool offset_changed = false;          /* When the type or identifier don't match, the decoder must           * be disabled           */ -        bool enableDecode; +        bool enableDecode = false;          /* Disable timstamps until full time has been received */ -        bool full_timestamp_received; +        bool full_timestamp_received = false;          /* when pipelining, we must shift the calculated timestamps           * through this queue. Otherwise, it would not be possible to @@ -225,6 +203,5 @@ class TimestampDecoder : public RemoteControllable           * FIRFilter (1 stage pipeline)           */          std::queue<std::shared_ptr<frame_timestamp> > queue_timestamps; -  };  | 
