summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2018-01-07 10:38:55 +0100
committerMatthias P. Braendli <matthias.braendli@mpb.li>2018-01-07 10:38:55 +0100
commit8953a94893fe39a10b044ea62cf6971d36801880 (patch)
tree0cd0ab921827400f3ada7449827bbffaa97e7450 /src
parente0f9c8909ecba56da4c7a2ec3507b8af19b737bd (diff)
parent0315433aef00644085d2278af405eaedbc184c5c (diff)
downloaddabmod-8953a94893fe39a10b044ea62cf6971d36801880.tar.gz
dabmod-8953a94893fe39a10b044ea62cf6971d36801880.tar.bz2
dabmod-8953a94893fe39a10b044ea62cf6971d36801880.zip
Merge branch 'flowgraphmetadata' into outputRefactoring
Diffstat (limited to 'src')
-rw-r--r--src/BlockPartitioner.cpp45
-rw-r--r--src/BlockPartitioner.h21
-rw-r--r--src/DabMod.cpp6
-rw-r--r--src/DabModulator.cpp13
-rw-r--r--src/DabModulator.h9
-rw-r--r--src/EtiReader.cpp7
-rw-r--r--src/FicSource.cpp24
-rw-r--r--src/FicSource.h11
-rw-r--r--src/Flowgraph.cpp68
-rw-r--r--src/Flowgraph.h14
-rw-r--r--src/Log.cpp39
-rw-r--r--src/Log.h41
-rw-r--r--src/ModPlugin.cpp19
-rw-r--r--src/ModPlugin.h37
-rw-r--r--src/OutputFile.cpp60
-rw-r--r--src/OutputFile.h24
-rw-r--r--src/OutputMemory.cpp14
-rw-r--r--src/OutputMemory.h16
-rw-r--r--src/TII.cpp6
-rw-r--r--src/TII.h8
-rw-r--r--src/TimestampDecoder.cpp48
-rw-r--r--src/TimestampDecoder.h71
22 files changed, 428 insertions, 173 deletions
diff --git a/src/BlockPartitioner.cpp b/src/BlockPartitioner.cpp
index 9e9f80b..54405d9 100644
--- a/src/BlockPartitioner.cpp
+++ b/src/BlockPartitioner.cpp
@@ -1,6 +1,11 @@
/*
Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty
the Queen in Right of Canada (Communications Research Center Canada)
+
+ Copyright (C) 2018
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://opendigitalradio.org
*/
/*
This file is part of ODR-DabMod.
@@ -21,6 +26,8 @@
#include "BlockPartitioner.h"
#include "PcDebug.h"
+#include "Log.h"
+#include "TimestampDecoder.h"
#include <stdio.h>
#include <stdexcept>
@@ -31,6 +38,7 @@
BlockPartitioner::BlockPartitioner(unsigned mode, unsigned phase) :
ModMux(),
+ ModMetadata(),
d_mode(mode)
{
PDEBUG("BlockPartitioner::BlockPartitioner(%i)\n", mode);
@@ -68,17 +76,11 @@ BlockPartitioner::BlockPartitioner(unsigned mode, unsigned phase) :
d_cifNb = 0;
// For Synchronisation purpose, count nb of CIF to drop
d_cifPhase = phase % d_cifCount;
+ d_metaPhase = phase % d_cifCount;
d_cifSize = 864 * 8;
}
-BlockPartitioner::~BlockPartitioner()
-{
- PDEBUG("BlockPartitioner::~BlockPartitioner()\n");
-
-}
-
-
// dataIn[0] -> FIC
// dataIn[1] -> CIF
int BlockPartitioner::process(std::vector<Buffer*> dataIn, Buffer* dataOut)
@@ -124,10 +126,10 @@ int BlockPartitioner::process(std::vector<Buffer*> dataIn, Buffer* dataOut)
uint8_t* out = reinterpret_cast<uint8_t*>(dataOut->getData());
// Copy FIC data
- PDEBUG("Writting FIC %zu bytes to %zu\n", d_ficSize, d_cifNb * d_ficSize);
+ PDEBUG("Writing FIC %zu bytes to %zu\n", d_ficSize, d_cifNb * d_ficSize);
memcpy(out + (d_cifNb * d_ficSize), fic, d_ficSize);
// Copy CIF data
- PDEBUG("Writting CIF %u bytes to %zu\n", 864 * 8,
+ PDEBUG("Writing CIF %u bytes to %zu\n", 864 * 8,
(d_cifCount * d_ficSize) + (d_cifNb * 864 * 8));
memcpy(out + (d_cifCount * d_ficSize) + (d_cifNb * 864 * 8), cif, 864 * 8);
@@ -137,3 +139,28 @@ int BlockPartitioner::process(std::vector<Buffer*> dataIn, Buffer* dataOut)
return d_cifNb == 0;
}
+
+meta_vec_t BlockPartitioner::process_metadata(const meta_vec_t& metadataIn)
+{
+ // Synchronize CIF phase
+ if (d_metaPhase != 0) {
+ if (++d_metaPhase == d_cifCount) {
+ d_metaPhase = 0;
+ }
+ // Drop this metadata
+ return {};
+ }
+
+ if (d_cifNb == 1) {
+ d_meta.clear();
+ }
+
+ std::copy(metadataIn.begin(), metadataIn.end(), std::back_inserter(d_meta));
+
+ if (d_cifNb == 0) {
+ return d_meta;
+ }
+ else {
+ return {};
+ }
+}
diff --git a/src/BlockPartitioner.h b/src/BlockPartitioner.h
index 90cffa3..a4656a1 100644
--- a/src/BlockPartitioner.h
+++ b/src/BlockPartitioner.h
@@ -1,6 +1,11 @@
/*
Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty
the Queen in Right of Canada (Communications Research Center Canada)
+
+ Copyright (C) 2018
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://opendigitalradio.org
*/
/*
This file is part of ODR-DabMod.
@@ -25,32 +30,32 @@
# include <config.h>
#endif
-
#include "ModPlugin.h"
#include <vector>
+#include <cstddef>
-#include <sys/types.h>
-
-
-class BlockPartitioner : public ModMux
+class BlockPartitioner : public ModMux, public ModMetadata
{
public:
BlockPartitioner(unsigned mode, unsigned phase);
- virtual ~BlockPartitioner();
- BlockPartitioner(const BlockPartitioner&);
- BlockPartitioner& operator=(const BlockPartitioner&);
int process(std::vector<Buffer*> dataIn, Buffer* dataOut);
const char* name() { return "BlockPartitioner"; }
+ // The implementation assumes process_metadata is always called after process
+ virtual meta_vec_t process_metadata(const meta_vec_t& metadataIn);
+
protected:
int d_mode;
size_t d_ficSize;
size_t d_cifCount;
size_t d_cifNb;
size_t d_cifPhase;
+ size_t d_metaPhase;
size_t d_cifSize;
size_t d_outputFramesize;
size_t d_outputFramecount;
+
+ meta_vec_t d_meta;
};
diff --git a/src/DabMod.cpp b/src/DabMod.cpp
index 12cfa43..9880938 100644
--- a/src/DabMod.cpp
+++ b/src/DabMod.cpp
@@ -425,6 +425,12 @@ int launch_modulator(int argc, char* argv[])
((Output::SDR*)output.get())->setETISource(modulator->getEtiSource());
}
+ // TODO remove
+ auto output_as_file = dynamic_pointer_cast<OutputFile>(output);
+ if (output_as_file) {
+ output_as_file->setETISource(modulator->getEtiSource());
+ }
+
inputReader->PrintInfo();
run_modulator_state_t st = run_modulator(m);
diff --git a/src/DabModulator.cpp b/src/DabModulator.cpp
index 1ea06de..337a595 100644
--- a/src/DabModulator.cpp
+++ b/src/DabModulator.cpp
@@ -3,7 +3,7 @@
Her Majesty the Queen in Right of Canada (Communications Research
Center Canada)
- Copyright (C) 2017
+ Copyright (C) 2018
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
@@ -221,7 +221,7 @@ int DabModulator::process(Buffer* dataOut)
rcs.enrol(cifPoly.get());
}
- auto myOutput = make_shared<OutputMemory>(dataOut);
+ myOutput = make_shared<OutputMemory>(dataOut);
shared_ptr<Resampler> cifRes;
if (m_settings.outputRate != 2048000) {
@@ -376,3 +376,12 @@ int DabModulator::process(Buffer* dataOut)
return myFlowgraph->run();
}
+meta_vec_t DabModulator::process_metadata(const meta_vec_t& metadataIn)
+{
+ if (myOutput) {
+ return myOutput->get_latest_metadata();
+ }
+
+ return {};
+}
+
diff --git a/src/DabModulator.h b/src/DabModulator.h
index 6878853..e806c92 100644
--- a/src/DabModulator.h
+++ b/src/DabModulator.h
@@ -3,7 +3,7 @@
Her Majesty the Queen in Right of Canada (Communications Research
Center Canada)
- Copyright (C) 2017
+ Copyright (C) 2018
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
@@ -46,7 +46,7 @@
#include "TII.h"
-class DabModulator : public ModInput
+class DabModulator : public ModInput, public ModMetadata
{
public:
DabModulator(EtiSource& etiSource,
@@ -55,6 +55,9 @@ public:
int process(Buffer* dataOut);
const char* name() { return "DabModulator"; }
+ virtual meta_vec_t process_metadata(
+ const meta_vec_t& metadataIn);
+
/* Required to get the timestamp */
EtiSource* getEtiSource() { return &myEtiSource; }
@@ -73,5 +76,7 @@ protected:
size_t mySymSize;
size_t myFicSizeOut;
size_t myFicSizeIn;
+
+ std::shared_ptr<OutputMemory> myOutput;
};
diff --git a/src/EtiReader.cpp b/src/EtiReader.cpp
index d84ed1f..dc5df84 100644
--- a/src/EtiReader.cpp
+++ b/src/EtiReader.cpp
@@ -281,6 +281,8 @@ int EtiReader::loadEtiData(const Buffer& dataIn)
myTimestampDecoder.updateTimestampEti(eti_fc.FP & 0x3,
eti_eoh.MNSC, getPPSOffset(), eti_fc.FCT);
+ myFicSource->loadTimestamp(myTimestampDecoder.getTimestamp());
+
return dataIn.getLength() - input_size;
}
@@ -533,8 +535,9 @@ void EdiReader::assemble()
const std::time_t posix_timestamp_1_jan_2000 = 946684800;
auto utc_ts = posix_timestamp_1_jan_2000 + m_seconds - m_utco;
- m_timestamp_decoder.updateTimestampEdi(
- utc_ts, m_fc.tsta, m_fc.fct());
+ m_timestamp_decoder.updateTimestampEdi(utc_ts, m_fc.tsta, m_fc.fct(), m_fc.fp);
+
+ myFicSource->loadTimestamp(m_timestamp_decoder.getTimestamp());
m_frameReady = true;
}
diff --git a/src/FicSource.cpp b/src/FicSource.cpp
index 04197db..2b95085 100644
--- a/src/FicSource.cpp
+++ b/src/FicSource.cpp
@@ -2,7 +2,7 @@
Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty
the Queen in Right of Canada (Communications Research Center Canada)
- Copyright (C) 2016
+ Copyright (C) 2018
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
@@ -26,6 +26,8 @@
#include "FicSource.h"
#include "PcDebug.h"
+#include "Log.h"
+#include "TimestampDecoder.h"
#include <stdexcept>
#include <string>
@@ -92,3 +94,23 @@ int FicSource::process(Buffer* outputData)
return outputData->getLength();
}
+void FicSource::loadTimestamp(const std::shared_ptr<struct frame_timestamp>& ts)
+{
+ d_ts = ts;
+}
+
+
+meta_vec_t FicSource::process_metadata(const meta_vec_t& metadataIn)
+{
+ if (not d_ts) {
+ return {};
+ }
+
+ using namespace std;
+ meta_vec_t md_vec;
+ flowgraph_metadata meta;
+ meta.ts = d_ts;
+ md_vec.push_back(meta);
+ return md_vec;
+}
+
diff --git a/src/FicSource.h b/src/FicSource.h
index 77ac741..93c1a7f 100644
--- a/src/FicSource.h
+++ b/src/FicSource.h
@@ -36,7 +36,7 @@
#include <vector>
#include <sys/types.h>
-class FicSource : public ModInput
+class FicSource : public ModInput, public ModMetadata
{
public:
FicSource(unsigned ficf, unsigned mid);
@@ -45,12 +45,17 @@ public:
const std::vector<PuncturingRule>& get_rules();
void loadFicData(const Buffer& fic);
- int process(Buffer* outputData);
- const char* name() { return "FicSource"; }
+ int process(Buffer* outputData) override;
+ const char* name() override { return "FicSource"; }
+
+ void loadTimestamp(const std::shared_ptr<struct frame_timestamp>& ts);
+ virtual meta_vec_t process_metadata(
+ const meta_vec_t& metadataIn) override;
private:
size_t d_framesize;
Buffer d_buffer;
+ std::shared_ptr<struct frame_timestamp> d_ts;
std::vector<PuncturingRule> d_puncturing_rules;
};
diff --git a/src/Flowgraph.cpp b/src/Flowgraph.cpp
index 465ef41..0b80a8c 100644
--- a/src/Flowgraph.cpp
+++ b/src/Flowgraph.cpp
@@ -26,6 +26,8 @@
#include "Flowgraph.h"
#include "PcDebug.h"
+#include "Log.h"
+#include "TimestampDecoder.h"
#include <string>
#include <memory>
#include <algorithm>
@@ -57,9 +59,10 @@ Node::~Node()
assert(myOutputBuffers.size() == 0);
}
-void Node::addOutputBuffer(Buffer::sptr& buffer)
+void Node::addOutputBuffer(Buffer::sptr& buffer, Metadata_vec_sptr& md)
{
myOutputBuffers.push_back(buffer);
+ myOutputMetadata.push_back(md);
#if DEBUG
std::string fname = string(myPlugin->name()) +
"-" + to_string(myDebugFiles.size()) +
@@ -71,7 +74,7 @@ void Node::addOutputBuffer(Buffer::sptr& buffer)
#endif
}
-void Node::removeOutputBuffer(Buffer::sptr& buffer)
+void Node::removeOutputBuffer(Buffer::sptr& buffer, Metadata_vec_sptr& md)
{
auto it = std::find(
myOutputBuffers.begin(),
@@ -89,14 +92,23 @@ void Node::removeOutputBuffer(Buffer::sptr& buffer)
#endif
myOutputBuffers.erase(it);
}
+
+ auto mdit = std::find(
+ myOutputMetadata.begin(),
+ myOutputMetadata.end(),
+ md);
+ if (mdit != myOutputMetadata.end()) {
+ myOutputMetadata.erase(mdit);
+ }
}
-void Node::addInputBuffer(Buffer::sptr& buffer)
+void Node::addInputBuffer(Buffer::sptr& buffer, Metadata_vec_sptr& md)
{
myInputBuffers.push_back(buffer);
+ myInputMetadata.push_back(md);
}
-void Node::removeInputBuffer(Buffer::sptr& buffer)
+void Node::removeInputBuffer(Buffer::sptr& buffer, Metadata_vec_sptr& md)
{
auto it = std::find(
myInputBuffers.begin(),
@@ -105,6 +117,14 @@ void Node::removeInputBuffer(Buffer::sptr& buffer)
if (it != myInputBuffers.end()) {
myInputBuffers.erase(it);
}
+
+ auto mdit = std::find(
+ myInputMetadata.begin(),
+ myInputMetadata.end(),
+ md);
+ if (mdit != myInputMetadata.end()) {
+ myInputMetadata.erase(mdit);
+ }
}
int Node::process()
@@ -127,6 +147,36 @@ int Node::process()
}
int ret = myPlugin->process(inBuffers, outBuffers);
+
+ // Collect all incoming metadata into a single vector
+ meta_vec_t all_input_mds;
+ for (auto& md_vec_sp : myInputMetadata) {
+ if (md_vec_sp) {
+ copy(md_vec_sp->begin(), md_vec_sp->end(),
+ back_inserter(all_input_mds));
+ }
+ }
+
+ auto mod_meta = dynamic_pointer_cast<ModMetadata>(myPlugin);
+ if (mod_meta) {
+ auto outputMetadata = mod_meta->process_metadata(all_input_mds);
+ // Distribute the result metadata to all outputs
+ for (auto& out_md : myOutputMetadata) {
+ out_md->clear();
+ std::move(outputMetadata.begin(), outputMetadata.end(),
+ std::back_inserter(*out_md));
+ }
+ }
+ else {
+ // Propagate the unmodified input metadata to all outputs
+ for (auto& out_md : myOutputMetadata) {
+ out_md->clear();
+ std::move(all_input_mds.begin(), all_input_mds.end(),
+ std::back_inserter(*out_md));
+ }
+ }
+
+
#if DEBUG
assert(myDebugFiles.size() == myOutputBuffers.size());
@@ -158,8 +208,10 @@ Edge::Edge(shared_ptr<Node>& srcNode, shared_ptr<Node>& dstNode) :
this);
myBuffer = make_shared<Buffer>();
- srcNode->addOutputBuffer(myBuffer);
- dstNode->addInputBuffer(myBuffer);
+ myMetadata = make_shared<vector<flowgraph_metadata> >();
+
+ srcNode->addOutputBuffer(myBuffer, myMetadata);
+ dstNode->addInputBuffer(myBuffer, myMetadata);
}
@@ -168,8 +220,8 @@ Edge::~Edge()
PDEBUG("Edge::~Edge() @ %p\n", this);
if (myBuffer) {
- mySrcNode->removeOutputBuffer(myBuffer);
- myDstNode->removeInputBuffer(myBuffer);
+ mySrcNode->removeOutputBuffer(myBuffer, myMetadata);
+ myDstNode->removeInputBuffer(myBuffer, myMetadata);
}
}
diff --git a/src/Flowgraph.h b/src/Flowgraph.h
index ebb7314..2742824 100644
--- a/src/Flowgraph.h
+++ b/src/Flowgraph.h
@@ -39,6 +39,8 @@
#include <list>
#include <cstdio>
+using Metadata_vec_sptr = std::shared_ptr<std::vector<flowgraph_metadata> >;
+
class Node
{
public:
@@ -55,15 +57,18 @@ public:
myProcessTime += processTime;
}
- void addOutputBuffer(Buffer::sptr& buffer);
- void removeOutputBuffer(Buffer::sptr& buffer);
+ void addOutputBuffer(Buffer::sptr& buffer, Metadata_vec_sptr& md);
+ void removeOutputBuffer(Buffer::sptr& buffer, Metadata_vec_sptr& md);
- void addInputBuffer(Buffer::sptr& buffer);
- void removeInputBuffer(Buffer::sptr& buffer);
+ void addInputBuffer(Buffer::sptr& buffer, Metadata_vec_sptr& md);
+ void removeInputBuffer(Buffer::sptr& buffer, Metadata_vec_sptr& md);
protected:
std::list<Buffer::sptr> myInputBuffers;
std::list<Buffer::sptr> myOutputBuffers;
+ std::list<Metadata_vec_sptr> myInputMetadata;
+ std::list<Metadata_vec_sptr> myOutputMetadata;
+
#if DEBUG
std::list<FILE*> myDebugFiles;
#endif
@@ -85,6 +90,7 @@ protected:
std::shared_ptr<Node> mySrcNode;
std::shared_ptr<Node> myDstNode;
std::shared_ptr<Buffer> myBuffer;
+ std::shared_ptr<std::vector<flowgraph_metadata> > myMetadata;
};
diff --git a/src/Log.cpp b/src/Log.cpp
index 0792fcf..f2219eb 100644
--- a/src/Log.cpp
+++ b/src/Log.cpp
@@ -109,6 +109,17 @@ LogLine Logger::level(log_level_t level)
return LogLine(this, level);
}
+LogToFile::LogToFile(const std::string& filename) : name("FILE")
+{
+ FILE* fd = fopen(filename.c_str(), "a");
+ if (fd == nullptr) {
+ fprintf(stderr, "Cannot open log file !");
+ throw std::runtime_error("Cannot open log file !");
+ }
+
+ log_file.reset(fd);
+}
+
void LogToFile::log(log_level_t level, const std::string& message)
{
if (level != log_level_t::trace) {
@@ -116,9 +127,9 @@ void LogToFile::log(log_level_t level, const std::string& message)
"DEBUG", "INFO", "WARN", "ERROR", "ALERT", "EMERG"};
// fprintf is thread-safe
- fprintf(log_file, SYSLOG_IDENT ": %s: %s\n",
+ fprintf(log_file.get(), SYSLOG_IDENT ": %s: %s\n",
log_level_text[(size_t)level], message.c_str());
- fflush(log_file);
+ fflush(log_file.get());
}
}
@@ -142,31 +153,33 @@ void LogToSyslog::log(log_level_t level, const std::string& message)
}
}
-LogTracer::LogTracer(const string& trace_filename)
+LogTracer::LogTracer(const string& trace_filename) : name("TRACE")
{
- name = "TRACE";
etiLog.level(info) << "Setting up TRACE to " << trace_filename;
- m_trace_file = fopen(trace_filename.c_str(), "a");
- if (m_trace_file == NULL) {
+ FILE* fd = fopen(trace_filename.c_str(), "a");
+ if (fd == nullptr) {
fprintf(stderr, "Cannot open trace file !");
throw std::runtime_error("Cannot open trace file !");
}
+ m_trace_file.reset(fd);
- auto now = chrono::steady_clock::now().time_since_epoch();
- m_trace_micros_startup =
- chrono::duration_cast<chrono::microseconds>(now).count();
+ using namespace std::chrono;
+ auto now = steady_clock::now().time_since_epoch();
+ m_trace_micros_startup = duration_cast<microseconds>(now).count();
- fprintf(m_trace_file, "0,TRACER,startup at %ld\n", m_trace_micros_startup);
+ fprintf(m_trace_file.get(),
+ "0,TRACER,startup at %ld\n", m_trace_micros_startup);
}
void LogTracer::log(log_level_t level, const std::string& message)
{
if (level == log_level_t::trace) {
- const auto now = chrono::steady_clock::now().time_since_epoch();
- const auto micros = chrono::duration_cast<chrono::microseconds>(now).count();
+ using namespace std::chrono;
+ const auto now = steady_clock::now().time_since_epoch();
+ const auto micros = duration_cast<microseconds>(now).count();
- fprintf(m_trace_file, "%ld,%s\n",
+ fprintf(m_trace_file.get(), "%ld,%s\n",
micros - m_trace_micros_startup,
message.c_str());
}
diff --git a/src/Log.h b/src/Log.h
index ae252a6..0e09bc9 100644
--- a/src/Log.h
+++ b/src/Log.h
@@ -57,7 +57,7 @@ static const std::string levels_as_str[] =
class LogBackend {
public:
virtual void log(log_level_t level, const std::string& message) = 0;
- virtual std::string get_name() = 0;
+ virtual std::string get_name() const = 0;
};
/** A Logging backend for Syslog */
@@ -73,7 +73,7 @@ class LogToSyslog : public LogBackend {
void log(log_level_t level, const std::string& message);
- std::string get_name() { return name; }
+ std::string get_name() const { return name; }
private:
const std::string name;
@@ -84,27 +84,15 @@ class LogToSyslog : public LogBackend {
class LogToFile : public LogBackend {
public:
- LogToFile(const std::string& filename) : name("FILE") {
- log_file = fopen(filename.c_str(), "a");
- if (log_file == NULL) {
- fprintf(stderr, "Cannot open log file !");
- throw std::runtime_error("Cannot open log file !");
- }
- }
-
- ~LogToFile() {
- if (log_file != NULL) {
- fclose(log_file);
- }
- }
-
+ LogToFile(const std::string& filename);
void log(log_level_t level, const std::string& message);
-
- std::string get_name() { return name; }
+ std::string get_name() const { return name; }
private:
const std::string name;
- FILE* log_file;
+
+ struct FILEDeleter{ void operator()(FILE* fd){ if(fd) fclose(fd);}};
+ std::unique_ptr<FILE, FILEDeleter> log_file;
LogToFile(const LogToFile& other) = delete;
const LogToFile& operator=(const LogToFile& other) = delete;
@@ -113,19 +101,14 @@ class LogToFile : public LogBackend {
class LogTracer : public LogBackend {
public:
LogTracer(const std::string& filename);
-
- ~LogTracer() {
- if (m_trace_file != NULL) {
- fclose(m_trace_file);
- }
- }
-
void log(log_level_t level, const std::string& message);
- std::string get_name() { return name; }
+ std::string get_name() const { return name; }
private:
std::string name;
- uint64_t m_trace_micros_startup;
- FILE* m_trace_file;
+ uint64_t m_trace_micros_startup = 0;
+
+ struct FILEDeleter{ void operator()(FILE* fd){ if(fd) fclose(fd);}};
+ std::unique_ptr<FILE, FILEDeleter> m_trace_file;
LogTracer(const LogTracer& other) = delete;
const LogTracer& operator=(const LogTracer& other) = delete;
diff --git a/src/ModPlugin.cpp b/src/ModPlugin.cpp
index c39d883..f86bfb2 100644
--- a/src/ModPlugin.cpp
+++ b/src/ModPlugin.cpp
@@ -74,9 +74,9 @@ int ModOutput::process(
PipelinedModCodec::PipelinedModCodec() :
ModCodec(),
- m_number_of_runs(0),
m_input_queue(),
m_output_queue(),
+ m_metadata_fifo(),
m_running(false),
m_thread()
{
@@ -107,7 +107,7 @@ int PipelinedModCodec::process(Buffer* dataIn, Buffer* dataOut)
m_input_queue.push(inbuffer);
- if (m_number_of_runs > 0) {
+ if (m_ready_to_output_data) {
std::shared_ptr<Buffer> outbuffer;
m_output_queue.wait_and_pop(outbuffer);
@@ -116,13 +116,26 @@ int PipelinedModCodec::process(Buffer* dataIn, Buffer* dataOut)
else {
dataOut->setLength(dataIn->getLength());
memset(dataOut->getData(), 0, dataOut->getLength());
- m_number_of_runs++;
+ m_ready_to_output_data = true;
}
return dataOut->getLength();
}
+meta_vec_t PipelinedModCodec::process_metadata(const meta_vec_t& metadataIn)
+{
+ m_metadata_fifo.push_back(metadataIn);
+ if (m_metadata_fifo.size() == 2) {
+ auto r = std::move(m_metadata_fifo.front());
+ m_metadata_fifo.pop_front();
+ return r;
+ }
+ else {
+ return {};
+ }
+}
+
void PipelinedModCodec::process_thread()
{
set_thread_name(name());
diff --git a/src/ModPlugin.h b/src/ModPlugin.h
index d3aa780..c0f1c1a 100644
--- a/src/ModPlugin.h
+++ b/src/ModPlugin.h
@@ -30,16 +30,34 @@
# include <config.h>
#endif
-
#include "Buffer.h"
#include "ThreadsafeQueue.h"
-
-#include <sys/types.h>
+#include <cstddef>
#include <vector>
#include <memory>
#include <thread>
#include <atomic>
+// All flowgraph elements derive from ModPlugin, or a variant of it.
+// Some ModPlugins also support handling metadata.
+
+struct frame_timestamp;
+struct flowgraph_metadata {
+ std::shared_ptr<struct frame_timestamp> ts;
+};
+
+using meta_vec_t = std::vector<flowgraph_metadata>;
+
+/* ModPlugins that support metadata derive from ModMetadata */
+class ModMetadata {
+ public:
+ // Receives metadata from all inputs, and process them, and output
+ // a sequence of metadata.
+ virtual meta_vec_t process_metadata(const meta_vec_t& metadataIn) = 0;
+};
+
+
+/* Abstract base class for all flowgraph elements */
class ModPlugin
{
public:
@@ -69,7 +87,11 @@ public:
virtual int process(Buffer* const dataIn, Buffer* dataOut) = 0;
};
-class PipelinedModCodec : public ModCodec
+/* Pipelined ModCodecs run their processing in a separate thread, and
+ * have a one-call-to-process() latency. Because of this latency, they
+ * must also handle the metadata
+ */
+class PipelinedModCodec : public ModCodec, public ModMetadata
{
public:
PipelinedModCodec();
@@ -82,6 +104,8 @@ public:
virtual int process(Buffer* const dataIn, Buffer* dataOut) final;
virtual const char* name() = 0;
+ virtual meta_vec_t process_metadata(const meta_vec_t& metadataIn) final;
+
protected:
// Once the instance implementing PipelinedModCodec has been constructed,
// it must call start_pipeline_thread()
@@ -89,11 +113,13 @@ protected:
virtual int internal_process(Buffer* const dataIn, Buffer* dataOut) = 0;
private:
- size_t m_number_of_runs;
+ bool m_ready_to_output_data = false;
ThreadsafeQueue<std::shared_ptr<Buffer> > m_input_queue;
ThreadsafeQueue<std::shared_ptr<Buffer> > m_output_queue;
+ std::deque<meta_vec_t> m_metadata_fifo;
+
std::atomic<bool> m_running;
std::thread m_thread;
void process_thread(void);
@@ -119,3 +145,4 @@ public:
std::vector<Buffer*> dataOut);
virtual int process(Buffer* dataIn) = 0;
};
+
diff --git a/src/OutputFile.cpp b/src/OutputFile.cpp
index 23d5523..481e858 100644
--- a/src/OutputFile.cpp
+++ b/src/OutputFile.cpp
@@ -2,7 +2,7 @@
Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty
the Queen in Right of Canada (Communications Research Center Canada)
- Copyright (C) 2016
+ Copyright (C) 2018
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
@@ -26,47 +26,77 @@
#include "OutputFile.h"
#include "PcDebug.h"
+#include "Log.h"
+#include "TimestampDecoder.h"
#include <string>
#include <assert.h>
#include <stdexcept>
+using namespace std;
+
OutputFile::OutputFile(std::string filename) :
ModOutput(),
+ ModMetadata(),
myFilename(filename)
{
PDEBUG("OutputFile::OutputFile(filename: %s) @ %p\n",
filename.c_str(), this);
- myFile = fopen(filename.c_str(), "w");
- if (myFile == NULL) {
+ FILE* fd = fopen(filename.c_str(), "w");
+ if (fd == nullptr) {
perror(filename.c_str());
throw std::runtime_error(
"OutputFile::OutputFile() unable to open file!");
}
-}
-
-
-OutputFile::~OutputFile()
-{
- PDEBUG("OutputFile::~OutputFile() @ %p\n", this);
-
- if (myFile != NULL) {
- fclose(myFile);
- }
+ myFile.reset(fd);
}
int OutputFile::process(Buffer* dataIn)
{
PDEBUG("OutputFile::process(%p)\n", dataIn);
- assert(dataIn != NULL);
+ assert(dataIn != nullptr);
- if (fwrite(dataIn->getData(), dataIn->getLength(), 1, myFile) == 0) {
+ if (fwrite(dataIn->getData(), dataIn->getLength(), 1, myFile.get()) == 0) {
throw std::runtime_error(
"OutputFile::process() unable to write to file!");
}
return dataIn->getLength();
}
+
+meta_vec_t OutputFile::process_metadata(const meta_vec_t& metadataIn)
+{
+ stringstream ss;
+
+ if (metadataIn.empty()) {
+ etiLog.level(debug) << "OutputFile: no mdIn";
+ }
+
+ for (const auto& md : metadataIn) {
+ if (md.ts) {
+ ss << " FCT=" << md.ts->fct <<
+ " FP=" << (int)md.ts->fp;
+ }
+ else {
+ ss << " void, ";
+ }
+ }
+
+ if (myEtiSource) {
+ frame_timestamp ts;
+ myEtiSource->calculateTimestamp(ts);
+ ss << " ETI FCT=" << ts.fct;
+ }
+
+ etiLog.level(debug) << "Output File got metadata: " << ss.str();
+
+ return {};
+}
+
+void OutputFile::setETISource(EtiSource *etiSource)
+{
+ myEtiSource = etiSource;
+}
diff --git a/src/OutputFile.h b/src/OutputFile.h
index 7121ef3..a586921 100644
--- a/src/OutputFile.h
+++ b/src/OutputFile.h
@@ -2,7 +2,7 @@
Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty
the Queen in Right of Canada (Communications Research Center Canada)
- Copyright (C) 2016
+ Copyright (C) 2018
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
@@ -32,23 +32,33 @@
#include "ModPlugin.h"
+#include "EtiReader.h"
#include <string>
#include <stdio.h>
#include <sys/types.h>
+#include <memory>
-
-class OutputFile : public ModOutput
+class OutputFile : public ModOutput, public ModMetadata
{
public:
OutputFile(std::string filename);
- virtual ~OutputFile();
- virtual int process(Buffer* dataIn);
- const char* name() { return "OutputFile"; }
+ virtual int process(Buffer* dataIn) override;
+ const char* name() override { return "OutputFile"; }
+
+ virtual meta_vec_t process_metadata(
+ const meta_vec_t& metadataIn) override;
+
+ void setETISource(EtiSource *etiSource);
protected:
+ // TODO remove
+ EtiSource *myEtiSource = nullptr;
+
std::string myFilename;
- FILE* myFile;
+
+ struct FILEDeleter{ void operator()(FILE* fd){ if (fd) fclose(fd); }};
+ std::unique_ptr<FILE, FILEDeleter> myFile;
};
diff --git a/src/OutputMemory.cpp b/src/OutputMemory.cpp
index 6e2fd49..5f24095 100644
--- a/src/OutputMemory.cpp
+++ b/src/OutputMemory.cpp
@@ -2,7 +2,7 @@
Copyright (C) 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in
Right of Canada (Communications Research Center Canada)
- Copyright (C) 2016
+ Copyright (C) 2018
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
@@ -26,6 +26,7 @@
#include "OutputMemory.h"
#include "PcDebug.h"
+#include "Log.h"
#include <stdexcept>
#include <string.h>
@@ -94,3 +95,14 @@ int OutputMemory::process(Buffer* dataIn)
return myDataOut->getLength();
}
+meta_vec_t OutputMemory::process_metadata(const meta_vec_t& metadataIn)
+{
+ myMetadata = metadataIn;
+ return {};
+}
+
+meta_vec_t OutputMemory::get_latest_metadata()
+{
+ return myMetadata;
+}
+
diff --git a/src/OutputMemory.h b/src/OutputMemory.h
index 715cb2d..f0a5fbb 100644
--- a/src/OutputMemory.h
+++ b/src/OutputMemory.h
@@ -2,7 +2,7 @@
Copyright (C) 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in
Right of Canada (Communications Research Center Canada)
- Copyright (C) 2016
+ Copyright (C) 2018
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
@@ -46,18 +46,26 @@
#include "ModPlugin.h"
-class OutputMemory : public ModOutput
+class OutputMemory : public ModOutput, public ModMetadata
{
public:
OutputMemory(Buffer* dataOut);
virtual ~OutputMemory();
- virtual int process(Buffer* dataIn);
- const char* name() { return "OutputMemory"; }
+ OutputMemory(OutputMemory& other) = delete;
+ OutputMemory& operator=(OutputMemory& other) = delete;
+
+ virtual int process(Buffer* dataIn) override;
+ const char* name() override { return "OutputMemory"; }
+ virtual meta_vec_t process_metadata(
+ const meta_vec_t& metadataIn) override;
+
+ meta_vec_t get_latest_metadata(void);
void setOutput(Buffer* dataOut);
protected:
Buffer* myDataOut;
+ meta_vec_t myMetadata;
#if OUTPUT_MEM_HISTOGRAM
// keep track of max value
diff --git a/src/TII.cpp b/src/TII.cpp
index 89cd6d0..3c5823b 100644
--- a/src/TII.cpp
+++ b/src/TII.cpp
@@ -2,7 +2,7 @@
Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty
the Queen in Right of Canada (Communications Research Center Canada)
- Copyright (C) 2017
+ Copyright (C) 2018
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
@@ -187,7 +187,7 @@ int TII::process(Buffer* dataIn, Buffer* dataOut)
memset(dataOut->getData(), 0, dataOut->getLength());
if (m_conf.enable and m_insert) {
- boost::mutex::scoped_lock lock(m_enabled_carriers_mutex);
+ std::lock_guard<std::mutex> lock(m_enabled_carriers_mutex);
complexf* in = reinterpret_cast<complexf*>(dataIn->getData());
complexf* out = reinterpret_cast<complexf*>(dataOut->getData());
@@ -231,7 +231,7 @@ void TII::enable_carrier(int k) {
void TII::prepare_pattern() {
int comb = m_conf.comb; // Convert from unsigned to signed
- boost::mutex::scoped_lock lock(m_enabled_carriers_mutex);
+ std::lock_guard<std::mutex> lock(m_enabled_carriers_mutex);
// Clear previous pattern
for (size_t i = 0; i < m_enabled_carriers.size(); i++) {
diff --git a/src/TII.h b/src/TII.h
index b0ffdb3..b86dbbf 100644
--- a/src/TII.h
+++ b/src/TII.h
@@ -2,7 +2,7 @@
Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty
the Queen in Right of Canada (Communications Research Center Canada)
- Copyright (C) 2017
+ Copyright (C) 2018
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
@@ -35,8 +35,8 @@
#include "ModPlugin.h"
#include "RemoteControl.h"
-#include <boost/thread.hpp>
-#include <sys/types.h>
+#include <cstddef>
+#include <thread>
#include <complex>
#include <vector>
#include <string>
@@ -118,7 +118,7 @@ class TII : public ModCodec, public RemoteControllable
// m_enabled_carriers is read by modulator thread, and written
// to by RC thread.
- mutable boost::mutex m_enabled_carriers_mutex;
+ mutable std::mutex m_enabled_carriers_mutex;
// m_enabled_carriers is true only for the first carrier in the
// active pair
diff --git a/src/TimestampDecoder.cpp b/src/TimestampDecoder.cpp
index 26deb60..6cf2875 100644
--- a/src/TimestampDecoder.cpp
+++ b/src/TimestampDecoder.cpp
@@ -37,6 +37,22 @@
//#define MDEBUG(fmt, args...) fprintf (LOG, "*****" fmt , ## args)
#define MDEBUG(fmt, args...) PDEBUG(fmt, ## args)
+TimestampDecoder::TimestampDecoder(double& offset_s, unsigned tist_delay_stages) :
+ RemoteControllable("tist"),
+ timestamp_offset(offset_s),
+ m_tist_delay_stages(tist_delay_stages)
+{
+ // Properly initialise temp_time
+ memset(&temp_time, 0, sizeof(temp_time));
+ const time_t timep = 0;
+ gmtime_r(&timep, &temp_time);
+
+ RC_ADD_PARAMETER(offset, "TIST offset [s]");
+ RC_ADD_PARAMETER(timestamp, "FCT and timestamp [s]");
+
+ etiLog.level(info) << "Setting up timestamp decoder with " <<
+ timestamp_offset << " offset";
+}
void TimestampDecoder::calculateTimestamp(frame_timestamp& ts)
{
@@ -48,6 +64,7 @@ void TimestampDecoder::calculateTimestamp(frame_timestamp& ts)
ts_queued->timestamp_sec = time_secs;
ts_queued->timestamp_pps = time_pps;
ts_queued->fct = latestFCT;
+ ts_queued->fp = latestFP;
ts_queued->timestamp_refresh = offset_changed;
offset_changed = false;
@@ -102,7 +119,29 @@ void TimestampDecoder::calculateTimestamp(frame_timestamp& ts)
//ts.print("calc2 ");
}
-void TimestampDecoder::pushMNSCData(int framephase, uint16_t mnsc)
+std::shared_ptr<frame_timestamp> TimestampDecoder::getTimestamp()
+{
+ std::shared_ptr<frame_timestamp> ts =
+ std::make_shared<frame_timestamp>();
+
+ /* Push new timestamp into queue */
+ ts->timestamp_valid = full_timestamp_received;
+ ts->timestamp_sec = time_secs;
+ ts->timestamp_pps = time_pps;
+ ts->fct = latestFCT;
+ ts->fp = latestFP;
+
+ ts->timestamp_refresh = offset_changed;
+ offset_changed = false;
+
+ MDEBUG("time_secs=%d, time_pps=%f\n", time_secs,
+ (double)time_pps / 16384000.0);
+ *ts += timestamp_offset;
+
+ return ts;
+}
+
+void TimestampDecoder::pushMNSCData(uint8_t framephase, uint16_t mnsc)
{
struct eti_MNSC_TIME_0 *mnsc0;
struct eti_MNSC_TIME_1 *mnsc1;
@@ -190,7 +229,7 @@ void TimestampDecoder::updateTimestampPPS(uint32_t pps)
}
void TimestampDecoder::updateTimestampEti(
- int framephase,
+ uint8_t framephase,
uint16_t mnsc,
uint32_t pps, // In units of 1/16384000 s
int32_t fct)
@@ -198,16 +237,19 @@ void TimestampDecoder::updateTimestampEti(
updateTimestampPPS(pps);
pushMNSCData(framephase, mnsc);
latestFCT = fct;
+ latestFP = framephase;
}
void TimestampDecoder::updateTimestampEdi(
uint32_t seconds_utc,
uint32_t pps, // In units of 1/16384000 s
- int32_t fct)
+ int32_t fct,
+ uint8_t framephase)
{
time_secs = seconds_utc;
time_pps = pps;
latestFCT = fct;
+ latestFP = framephase;
full_timestamp_received = true;
}
diff --git a/src/TimestampDecoder.h b/src/TimestampDecoder.h
index 2272fe0..0953f76 100644
--- a/src/TimestampDecoder.h
+++ b/src/TimestampDecoder.h
@@ -40,6 +40,7 @@ struct frame_timestamp
{
// Which frame count does this timestamp apply to
int32_t fct;
+ uint8_t fp; // Frame Phase
uint32_t timestamp_sec;
uint32_t timestamp_pps; // In units of 1/16384000 s
@@ -56,6 +57,7 @@ struct frame_timestamp
this->timestamp_valid = rhs.timestamp_valid;
this->timestamp_refresh = rhs.timestamp_refresh;
this->fct = rhs.fct;
+ this->fp = rhs.fp;
}
return *this;
@@ -115,49 +117,23 @@ struct frame_timestamp
class TimestampDecoder : public RemoteControllable
{
public:
- TimestampDecoder(
- /* The modulator adds this offset to the TIST to define time of
- * frame transmission
- */
- double& offset_s,
-
- /* Specifies by how many stages the timestamp must be delayed.
- * (e.g. The FIRFilter is pipelined, therefore we must increase
- * tist_delay_stages by one if the filter is used
- */
- unsigned tist_delay_stages) :
- RemoteControllable("tist"),
- timestamp_offset(offset_s)
- {
- m_tist_delay_stages = tist_delay_stages;
- inhibit_second_update = 0;
- time_pps = 0.0;
- time_secs = 0;
- latestFCT = 0;
- enableDecode = false;
- full_timestamp_received = false;
-
- // Properly initialise temp_time
- memset(&temp_time, 0, sizeof(temp_time));
- const time_t timep = 0;
- gmtime_r(&timep, &temp_time);
-
- offset_changed = false;
-
- RC_ADD_PARAMETER(offset, "TIST offset [s]");
- RC_ADD_PARAMETER(timestamp, "FCT and timestamp [s]");
-
- etiLog.level(info) << "Setting up timestamp decoder with " <<
- timestamp_offset << " offset";
-
- };
+ /* offset_s: The modulator adds this offset to the TIST to define time of
+ * frame transmission
+ *
+ * tist_delay_stages: Specifies by how many stages the timestamp must
+ * be delayed. (e.g. The FIRFilter is pipelined, therefore we must
+ * increase tist_delay_stages by one if the filter is used
+ */
+ TimestampDecoder(double& offset_s, unsigned tist_delay_stages);
/* Calculate the timestamp for the current frame. */
void calculateTimestamp(frame_timestamp& ts);
+ std::shared_ptr<frame_timestamp> getTimestamp(void);
+
/* Update timestamp data from ETI */
void updateTimestampEti(
- int framephase,
+ uint8_t framephase,
uint16_t mnsc,
uint32_t pps, // In units of 1/16384000 s
int32_t fct);
@@ -166,7 +142,8 @@ class TimestampDecoder : public RemoteControllable
void updateTimestampEdi(
uint32_t seconds_utc,
uint32_t pps, // In units of 1/16384000 s
- int32_t fct);
+ int32_t fct,
+ uint8_t framephase);
/*********** REMOTE CONTROL ***************/
@@ -183,7 +160,7 @@ class TimestampDecoder : public RemoteControllable
protected:
/* Push a new MNSC field into the decoder */
- void pushMNSCData(int framephase, uint16_t mnsc);
+ void pushMNSCData(uint8_t framephase, uint16_t mnsc);
/* Each frame contains the TIST field with the PPS offset.
* For each frame, this function must be called to update
@@ -203,21 +180,22 @@ class TimestampDecoder : public RemoteControllable
void updateTimestampSeconds(uint32_t secs);
struct tm temp_time;
- uint32_t time_secs;
- int32_t latestFCT;
- uint32_t time_pps;
+ uint32_t time_secs = 0;
+ int32_t latestFCT = 0;
+ uint32_t latestFP = 0;
+ uint32_t time_pps = 0;
double& timestamp_offset;
unsigned m_tist_delay_stages;
- int inhibit_second_update;
- bool offset_changed;
+ int inhibit_second_update = 0;
+ bool offset_changed = false;
/* When the type or identifier don't match, the decoder must
* be disabled
*/
- bool enableDecode;
+ bool enableDecode = false;
/* Disable timstamps until full time has been received */
- bool full_timestamp_received;
+ bool full_timestamp_received = false;
/* when pipelining, we must shift the calculated timestamps
* through this queue. Otherwise, it would not be possible to
@@ -225,6 +203,5 @@ class TimestampDecoder : public RemoteControllable
* FIRFilter (1 stage pipeline)
*/
std::queue<std::shared_ptr<frame_timestamp> > queue_timestamps;
-
};