summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/BlockPartitioner.cpp44
-rw-r--r--src/BlockPartitioner.h21
-rw-r--r--src/ConfigParser.cpp112
-rw-r--r--src/ConfigParser.h27
-rw-r--r--src/DabMod.cpp119
-rw-r--r--src/DabModulator.cpp69
-rw-r--r--src/DabModulator.h7
-rw-r--r--src/EtiReader.cpp27
-rw-r--r--src/EtiReader.h11
-rw-r--r--src/FIRFilter.cpp5
-rw-r--r--src/FIRFilter.h13
-rw-r--r--src/FicSource.cpp24
-rw-r--r--src/FicSource.h11
-rw-r--r--src/Flowgraph.cpp73
-rw-r--r--src/Flowgraph.h16
-rw-r--r--src/GainControl.cpp2
-rw-r--r--src/GainControl.h4
-rw-r--r--src/GuardIntervalInserter.cpp2
-rw-r--r--src/InputFileReader.cpp67
-rw-r--r--src/InputReader.h204
-rw-r--r--src/InputTcpReader.cpp7
-rw-r--r--src/InputZeroMQReader.cpp114
-rw-r--r--src/Log.cpp39
-rw-r--r--src/Log.h41
-rw-r--r--src/MemlessPoly.cpp5
-rw-r--r--src/MemlessPoly.h3
-rw-r--r--src/ModPlugin.cpp29
-rw-r--r--src/ModPlugin.h52
-rw-r--r--src/OfdmGenerator.cpp8
-rw-r--r--src/OutputFile.cpp65
-rw-r--r--src/OutputFile.h22
-rw-r--r--src/OutputMemory.cpp14
-rw-r--r--src/OutputMemory.h16
-rw-r--r--src/OutputSoapy.cpp287
-rw-r--r--src/OutputSoapy.h138
-rw-r--r--src/OutputUHD.cpp1035
-rw-r--r--src/OutputUHD.h250
-rw-r--r--src/OutputZeroMQ.cpp9
-rw-r--r--src/OutputZeroMQ.h7
-rw-r--r--src/Resampler.h1
-rw-r--r--src/Socket.h25
-rw-r--r--src/TII.cpp6
-rw-r--r--src/TII.h8
-rw-r--r--src/TimestampDecoder.cpp115
-rw-r--r--src/TimestampDecoder.h124
-rw-r--r--src/Utils.cpp16
-rw-r--r--src/Utils.h22
-rw-r--r--src/output/Feedback.cpp (renamed from src/OutputUHDFeedback.cpp)80
-rw-r--r--src/output/Feedback.h (renamed from src/OutputUHDFeedback.h)33
-rw-r--r--src/output/SDR.cpp425
-rw-r--r--src/output/SDR.h95
-rw-r--r--src/output/SDRDevice.h137
-rw-r--r--src/output/Soapy.cpp274
-rw-r--r--src/output/Soapy.h98
-rw-r--r--src/output/UHD.cpp500
-rw-r--r--src/output/UHD.h127
-rw-r--r--src/output/USRPTime.cpp283
-rw-r--r--src/output/USRPTime.h116
58 files changed, 2953 insertions, 2531 deletions
diff --git a/src/BlockPartitioner.cpp b/src/BlockPartitioner.cpp
index 9e9f80b..5767650 100644
--- a/src/BlockPartitioner.cpp
+++ b/src/BlockPartitioner.cpp
@@ -1,6 +1,11 @@
/*
Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty
the Queen in Right of Canada (Communications Research Center Canada)
+
+ Copyright (C) 2018
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://opendigitalradio.org
*/
/*
This file is part of ODR-DabMod.
@@ -21,6 +26,7 @@
#include "BlockPartitioner.h"
#include "PcDebug.h"
+#include "Log.h"
#include <stdio.h>
#include <stdexcept>
@@ -31,6 +37,7 @@
BlockPartitioner::BlockPartitioner(unsigned mode, unsigned phase) :
ModMux(),
+ ModMetadata(),
d_mode(mode)
{
PDEBUG("BlockPartitioner::BlockPartitioner(%i)\n", mode);
@@ -68,17 +75,11 @@ BlockPartitioner::BlockPartitioner(unsigned mode, unsigned phase) :
d_cifNb = 0;
// For Synchronisation purpose, count nb of CIF to drop
d_cifPhase = phase % d_cifCount;
+ d_metaPhase = phase % d_cifCount;
d_cifSize = 864 * 8;
}
-BlockPartitioner::~BlockPartitioner()
-{
- PDEBUG("BlockPartitioner::~BlockPartitioner()\n");
-
-}
-
-
// dataIn[0] -> FIC
// dataIn[1] -> CIF
int BlockPartitioner::process(std::vector<Buffer*> dataIn, Buffer* dataOut)
@@ -124,10 +125,10 @@ int BlockPartitioner::process(std::vector<Buffer*> dataIn, Buffer* dataOut)
uint8_t* out = reinterpret_cast<uint8_t*>(dataOut->getData());
// Copy FIC data
- PDEBUG("Writting FIC %zu bytes to %zu\n", d_ficSize, d_cifNb * d_ficSize);
+ PDEBUG("Writing FIC %zu bytes to %zu\n", d_ficSize, d_cifNb * d_ficSize);
memcpy(out + (d_cifNb * d_ficSize), fic, d_ficSize);
// Copy CIF data
- PDEBUG("Writting CIF %u bytes to %zu\n", 864 * 8,
+ PDEBUG("Writing CIF %u bytes to %zu\n", 864 * 8,
(d_cifCount * d_ficSize) + (d_cifNb * 864 * 8));
memcpy(out + (d_cifCount * d_ficSize) + (d_cifNb * 864 * 8), cif, 864 * 8);
@@ -137,3 +138,28 @@ int BlockPartitioner::process(std::vector<Buffer*> dataIn, Buffer* dataOut)
return d_cifNb == 0;
}
+
+meta_vec_t BlockPartitioner::process_metadata(const meta_vec_t& metadataIn)
+{
+ // Synchronize CIF phase
+ if (d_metaPhase != 0) {
+ if (++d_metaPhase == d_cifCount) {
+ d_metaPhase = 0;
+ }
+ // Drop this metadata
+ return {};
+ }
+
+ if (d_cifNb == 1) {
+ d_meta.clear();
+ }
+
+ std::copy(metadataIn.begin(), metadataIn.end(), std::back_inserter(d_meta));
+
+ if (d_cifNb == 0) {
+ return d_meta;
+ }
+ else {
+ return {};
+ }
+}
diff --git a/src/BlockPartitioner.h b/src/BlockPartitioner.h
index 90cffa3..a4656a1 100644
--- a/src/BlockPartitioner.h
+++ b/src/BlockPartitioner.h
@@ -1,6 +1,11 @@
/*
Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty
the Queen in Right of Canada (Communications Research Center Canada)
+
+ Copyright (C) 2018
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://opendigitalradio.org
*/
/*
This file is part of ODR-DabMod.
@@ -25,32 +30,32 @@
# include <config.h>
#endif
-
#include "ModPlugin.h"
#include <vector>
+#include <cstddef>
-#include <sys/types.h>
-
-
-class BlockPartitioner : public ModMux
+class BlockPartitioner : public ModMux, public ModMetadata
{
public:
BlockPartitioner(unsigned mode, unsigned phase);
- virtual ~BlockPartitioner();
- BlockPartitioner(const BlockPartitioner&);
- BlockPartitioner& operator=(const BlockPartitioner&);
int process(std::vector<Buffer*> dataIn, Buffer* dataOut);
const char* name() { return "BlockPartitioner"; }
+ // The implementation assumes process_metadata is always called after process
+ virtual meta_vec_t process_metadata(const meta_vec_t& metadataIn);
+
protected:
int d_mode;
size_t d_ficSize;
size_t d_cifCount;
size_t d_cifNb;
size_t d_cifPhase;
+ size_t d_metaPhase;
size_t d_cifSize;
size_t d_outputFramesize;
size_t d_outputFramecount;
+
+ meta_vec_t d_meta;
};
diff --git a/src/ConfigParser.cpp b/src/ConfigParser.cpp
index 2c93a57..7603c1e 100644
--- a/src/ConfigParser.cpp
+++ b/src/ConfigParser.cpp
@@ -3,7 +3,7 @@
Her Majesty the Queen in Right of Canada (Communications Research
Center Canada)
- Copyright (C) 2017
+ Copyright (C) 2018
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
@@ -29,15 +29,17 @@
# include "config.h"
#endif
+#include <cstdint>
+#include <boost/property_tree/ptree.hpp>
+#include <boost/property_tree/ini_parser.hpp>
+
#include "ConfigParser.h"
#include "porting.h"
#include "Utils.h"
#include "Log.h"
#include "DabModulator.h"
+#include "output/SDR.h"
-#include <unistd.h>
-#include <boost/property_tree/ptree.hpp>
-#include <boost/property_tree/ini_parser.hpp>
using namespace std;
@@ -202,87 +204,101 @@ static void parse_configfile(
if (output_selected == "file") {
try {
mod_settings.outputName = pt.get<std::string>("fileoutput.filename");
+ mod_settings.fileOutputShowMetadata =
+ (pt.get("fileoutput.show_metadata", 0) > 0);
}
catch (std::exception &e) {
std::cerr << "Error: " << e.what() << "\n";
std::cerr << " Configuration does not specify file name for file output\n";
throw std::runtime_error("Configuration error");
}
- mod_settings.useFileOutput = 1;
+ mod_settings.useFileOutput = true;
mod_settings.fileOutputFormat = pt.get("fileoutput.format", mod_settings.fileOutputFormat);
}
#if defined(HAVE_OUTPUT_UHD)
else if (output_selected == "uhd") {
- OutputUHDConfig outputuhd_conf;
+ Output::SDRDeviceConfig sdr_device_config;
- outputuhd_conf.device = pt.get("uhdoutput.device", "");
- outputuhd_conf.usrpType = pt.get("uhdoutput.type", "");
- outputuhd_conf.subDevice = pt.get("uhdoutput.subdevice", "");
- outputuhd_conf.masterClockRate = pt.get<long>("uhdoutput.master_clock_rate", 0);
+ string device = pt.get("uhdoutput.device", "");
+ const auto usrpType = pt.get("uhdoutput.type", "");
+ if (usrpType != "") {
+ if (not device.empty()) {
+ device += ",";
+ }
+ device += "type=" + usrpType;
+ }
+ sdr_device_config.device = device;
+
+ sdr_device_config.subDevice = pt.get("uhdoutput.subdevice", "");
+ sdr_device_config.masterClockRate = pt.get<long>("uhdoutput.master_clock_rate", 0);
- if (outputuhd_conf.device.find("master_clock_rate") != std::string::npos) {
+ if (sdr_device_config.device.find("master_clock_rate") != std::string::npos) {
std::cerr << "Warning:"
"setting master_clock_rate in [uhd] device is deprecated !\n";
}
- if (outputuhd_conf.device.find("type=") != std::string::npos) {
+ if (sdr_device_config.device.find("type=") != std::string::npos) {
std::cerr << "Warning:"
"setting type in [uhd] device is deprecated !\n";
}
- outputuhd_conf.txgain = pt.get("uhdoutput.txgain", 0.0);
- outputuhd_conf.rxgain = pt.get("uhdoutput.rxgain", 0.0);
- outputuhd_conf.frequency = pt.get<double>("uhdoutput.frequency", 0);
+ sdr_device_config.txgain = pt.get("uhdoutput.txgain", 0.0);
+ sdr_device_config.tx_antenna = pt.get("uhdoutput.tx_antenna", "");
+ sdr_device_config.rx_antenna = pt.get("uhdoutput.rx_antenna", "RX2");
+ sdr_device_config.rxgain = pt.get("uhdoutput.rxgain", 0.0);
+ sdr_device_config.frequency = pt.get<double>("uhdoutput.frequency", 0);
std::string chan = pt.get<std::string>("uhdoutput.channel", "");
- outputuhd_conf.dabMode = mod_settings.dabMode;
+ sdr_device_config.dabMode = mod_settings.dabMode;
- if (outputuhd_conf.frequency == 0 && chan == "") {
+ if (sdr_device_config.frequency == 0 && chan == "") {
std::cerr << " UHD output enabled, but neither frequency nor channel defined.\n";
throw std::runtime_error("Configuration error");
}
- else if (outputuhd_conf.frequency == 0) {
- outputuhd_conf.frequency = parseChannel(chan);
+ else if (sdr_device_config.frequency == 0) {
+ sdr_device_config.frequency = parseChannel(chan);
}
- else if (outputuhd_conf.frequency != 0 && chan != "") {
+ else if (sdr_device_config.frequency != 0 && chan != "") {
std::cerr << " UHD output: cannot define both frequency and channel.\n";
throw std::runtime_error("Configuration error");
}
- outputuhd_conf.lo_offset = pt.get<double>("uhdoutput.lo_offset", 0);
+ sdr_device_config.lo_offset = pt.get<double>("uhdoutput.lo_offset", 0);
- outputuhd_conf.refclk_src = pt.get("uhdoutput.refclk_source", "internal");
- outputuhd_conf.pps_src = pt.get("uhdoutput.pps_source", "none");
- outputuhd_conf.pps_polarity = pt.get("uhdoutput.pps_polarity", "pos");
+ sdr_device_config.refclk_src = pt.get("uhdoutput.refclk_source", "internal");
+ sdr_device_config.pps_src = pt.get("uhdoutput.pps_source", "none");
+ sdr_device_config.pps_polarity = pt.get("uhdoutput.pps_polarity", "pos");
std::string behave = pt.get("uhdoutput.behaviour_refclk_lock_lost", "ignore");
if (behave == "crash") {
- outputuhd_conf.refclk_lock_loss_behaviour = CRASH;
+ sdr_device_config.refclk_lock_loss_behaviour = Output::CRASH;
}
else if (behave == "ignore") {
- outputuhd_conf.refclk_lock_loss_behaviour = IGNORE;
+ sdr_device_config.refclk_lock_loss_behaviour = Output::IGNORE;
}
else {
std::cerr << "Error: UHD output: behaviour_refclk_lock_lost invalid." << std::endl;
throw std::runtime_error("Configuration error");
}
- outputuhd_conf.maxGPSHoldoverTime = pt.get("uhdoutput.max_gps_holdover_time", 0);
+ sdr_device_config.maxGPSHoldoverTime = pt.get("uhdoutput.max_gps_holdover_time", 0);
- outputuhd_conf.dpdFeedbackServerPort = pt.get<long>("uhdoutput.dpd_port", 0);
+ sdr_device_config.dpdFeedbackServerPort = pt.get<long>("uhdoutput.dpd_port", 0);
- mod_settings.outputuhd_conf = outputuhd_conf;
- mod_settings.useUHDOutput = 1;
+ mod_settings.sdr_device_config = sdr_device_config;
+ mod_settings.useUHDOutput = true;
}
#endif
#if defined(HAVE_SOAPYSDR)
else if (output_selected == "soapysdr") {
- auto& outputsoapy_conf = mod_settings.outputsoapy_conf;
+ auto& outputsoapy_conf = mod_settings.sdr_device_config;
outputsoapy_conf.device = pt.get("soapyoutput.device", "");
outputsoapy_conf.masterClockRate = pt.get<long>("soapyoutput.master_clock_rate", 0);
outputsoapy_conf.txgain = pt.get("soapyoutput.txgain", 0.0);
+ outputsoapy_conf.tx_antenna = pt.get("soapyoutput.tx_antenna", "");
+ outputsoapy_conf.lo_offset = pt.get<double>("soapyoutput.lo_offset", 0.0);
outputsoapy_conf.frequency = pt.get<double>("soapyoutput.frequency", 0);
std::string chan = pt.get<std::string>("soapyoutput.channel", "");
outputsoapy_conf.dabMode = mod_settings.dabMode;
@@ -299,14 +315,16 @@ static void parse_configfile(
throw std::runtime_error("Configuration error");
}
- mod_settings.useSoapyOutput = 1;
+ outputsoapy_conf.dpdFeedbackServerPort = pt.get<long>("soapyoutput.dpd_port", 0);
+
+ mod_settings.useSoapyOutput = true;
}
#endif
#if defined(HAVE_ZEROMQ)
else if (output_selected == "zmq") {
mod_settings.outputName = pt.get<std::string>("zmqoutput.listen");
mod_settings.zmqOutputSocketType = pt.get<std::string>("zmqoutput.socket_type");
- mod_settings.useZeroMQOutput = 1;
+ mod_settings.useZeroMQOutput = true;
}
#endif
else {
@@ -315,9 +333,9 @@ static void parse_configfile(
}
#if defined(HAVE_OUTPUT_UHD)
- mod_settings.outputuhd_conf.enableSync = (pt.get("delaymanagement.synchronous", 0) == 1);
- mod_settings.outputuhd_conf.muteNoTimestamps = (pt.get("delaymanagement.mutenotimestamps", 0) == 1);
- if (mod_settings.outputuhd_conf.enableSync) {
+ mod_settings.sdr_device_config.enableSync = (pt.get("delaymanagement.synchronous", 0) == 1);
+ mod_settings.sdr_device_config.muteNoTimestamps = (pt.get("delaymanagement.mutenotimestamps", 0) == 1);
+ if (mod_settings.sdr_device_config.enableSync) {
std::string delay_mgmt = pt.get<std::string>("delaymanagement.management", "");
std::string fixedoffset = pt.get<std::string>("delaymanagement.fixedoffset", "");
std::string offset_filename = pt.get<std::string>("delaymanagement.dynamicoffsetfile", "");
@@ -388,11 +406,11 @@ void parse_args(int argc, char **argv, mod_settings_t& mod_settings)
}
#endif
mod_settings.outputName = optarg;
- mod_settings.useFileOutput = 1;
+ mod_settings.useFileOutput = true;
break;
case 'F':
#if defined(HAVE_OUTPUT_UHD)
- mod_settings.outputuhd_conf.frequency = strtof(optarg, NULL);
+ mod_settings.sdr_device_config.frequency = strtof(optarg, NULL);
#endif
break;
case 'g':
@@ -400,7 +418,7 @@ void parse_args(int argc, char **argv, mod_settings_t& mod_settings)
break;
case 'G':
#if defined(HAVE_OUTPUT_UHD)
- mod_settings.outputuhd_conf.txgain = strtod(optarg, NULL);
+ mod_settings.sdr_device_config.txgain = strtod(optarg, NULL);
#endif
break;
case 'l':
@@ -408,9 +426,7 @@ void parse_args(int argc, char **argv, mod_settings_t& mod_settings)
break;
case 'o':
mod_settings.tist_offset_s = strtod(optarg, NULL);
-#if defined(HAVE_OUTPUT_UHD)
- mod_settings.outputuhd_conf.enableSync = true;
-#endif
+ mod_settings.sdr_device_config.enableSync = true;
break;
case 'm':
mod_settings.dabMode = strtol(optarg, NULL, 0);
@@ -427,11 +443,13 @@ void parse_args(int argc, char **argv, mod_settings_t& mod_settings)
fprintf(stderr, "Options -u and -f are mutually exclusive\n");
throw std::invalid_argument("Invalid command line options");
}
- mod_settings.outputuhd_conf.device = optarg;
- mod_settings.outputuhd_conf.refclk_src = "internal";
- mod_settings.outputuhd_conf.pps_src = "none";
- mod_settings.outputuhd_conf.pps_polarity = "pos";
- mod_settings.useUHDOutput = 1;
+ mod_settings.sdr_device_config.device = optarg;
+ mod_settings.sdr_device_config.refclk_src = "internal";
+ mod_settings.sdr_device_config.pps_src = "none";
+ mod_settings.sdr_device_config.pps_polarity = "pos";
+ mod_settings.useUHDOutput = true;
+#else
+ throw std::invalid_argument("Cannot select UHD output, not compiled in!");
#endif
break;
case 'V':
diff --git a/src/ConfigParser.h b/src/ConfigParser.h
index 0be3558..dc5ac4f 100644
--- a/src/ConfigParser.h
+++ b/src/ConfigParser.h
@@ -34,23 +34,21 @@
#include <string>
#include "GainControl.h"
#include "TII.h"
-#if defined(HAVE_OUTPUT_UHD)
-# include "OutputUHD.h"
-#endif
-#if defined(HAVE_SOAPYSDR)
-# include "OutputSoapy.h"
-#endif
+#include "output/SDR.h"
+#include "output/UHD.h"
+#include "output/Soapy.h"
#define ZMQ_INPUT_MAX_FRAME_QUEUE 500
struct mod_settings_t {
std::string outputName;
- int useZeroMQOutput = 0;
+ bool useZeroMQOutput = false;
std::string zmqOutputSocketType = "";
- int useFileOutput = 0;
+ bool useFileOutput = false;
std::string fileOutputFormat = "complexf";
- int useUHDOutput = 0;
- int useSoapyOutput = 0;
+ bool fileOutputShowMetadata = false;
+ bool useUHDOutput = false;
+ bool useSoapyOutput = false;
size_t outputRate = 2048000;
size_t clockRate = 0;
@@ -61,7 +59,6 @@ struct mod_settings_t {
float gainmodeVariance = 4.0f;
// To handle the timestamp offset of the modulator
- unsigned tist_delay_stages = 1; // because GainControl is pipelined
double tist_offset_s = 0.0;
bool loop = false;
@@ -85,12 +82,8 @@ struct mod_settings_t {
// Settings for the OFDM windowing
unsigned ofdmWindowOverlap = 0;
-#if defined(HAVE_OUTPUT_UHD)
- OutputUHDConfig outputuhd_conf;
-#endif
-
-#if defined(HAVE_SOAPYSDR)
- OutputSoapyConfig outputsoapy_conf;
+#if defined(HAVE_OUTPUT_UHD) || defined(HAVE_SOAPYSDR)
+ Output::SDRDeviceConfig sdr_device_config;
#endif
};
diff --git a/src/DabMod.cpp b/src/DabMod.cpp
index 25a93bf..8a0ee03 100644
--- a/src/DabMod.cpp
+++ b/src/DabMod.cpp
@@ -37,12 +37,9 @@
#include "OutputFile.h"
#include "FormatConverter.h"
#include "FrameMultiplexer.h"
-#if defined(HAVE_OUTPUT_UHD)
-# include "OutputUHD.h"
-#endif
-#if defined(HAVE_SOAPYSDR)
-# include "OutputSoapy.h"
-#endif
+#include "output/SDR.h"
+#include "output/UHD.h"
+#include "output/Soapy.h"
#include "OutputZeroMQ.h"
#include "InputReader.h"
#include "PcDebug.h"
@@ -95,18 +92,12 @@ void signalHandler(int signalNb)
struct modulator_data
{
- modulator_data() :
- inputReader(nullptr),
- framecount(0),
- flowgraph(nullptr),
- etiReader(nullptr) {}
-
- InputReader* inputReader;
+ std::shared_ptr<InputReader> inputReader;
Buffer data;
- uint64_t framecount;
+ uint64_t framecount = 0;
- Flowgraph* flowgraph;
- EtiReader* etiReader;
+ Flowgraph* flowgraph = nullptr;
+ EtiReader* etiReader = nullptr;
};
enum class run_modulator_state_t {
@@ -116,7 +107,7 @@ enum class run_modulator_state_t {
reconfigure // Some sort of change of configuration we cannot handle happened
};
-run_modulator_state_t run_modulator(modulator_data& m);
+static run_modulator_state_t run_modulator(modulator_data& m);
static void printModSettings(const mod_settings_t& mod_settings)
{
@@ -133,15 +124,15 @@ static void printModSettings(const mod_settings_t& mod_settings)
else if (mod_settings.useUHDOutput) {
fprintf(stderr, " UHD\n"
" Device: %s\n"
- " Type: %s\n"
+ " Subdevice: %s\n"
" master_clock_rate: %ld\n"
" refclk: %s\n"
" pps source: %s\n",
- mod_settings.outputuhd_conf.device.c_str(),
- mod_settings.outputuhd_conf.usrpType.c_str(),
- mod_settings.outputuhd_conf.masterClockRate,
- mod_settings.outputuhd_conf.refclk_src.c_str(),
- mod_settings.outputuhd_conf.pps_src.c_str());
+ mod_settings.sdr_device_config.device.c_str(),
+ mod_settings.sdr_device_config.subDevice.c_str(),
+ mod_settings.sdr_device_config.masterClockRate,
+ mod_settings.sdr_device_config.refclk_src.c_str(),
+ mod_settings.sdr_device_config.pps_src.c_str());
}
#endif
#if defined(HAVE_SOAPYSDR)
@@ -149,8 +140,8 @@ static void printModSettings(const mod_settings_t& mod_settings)
fprintf(stderr, " SoapySDR\n"
" Device: %s\n"
" master_clock_rate: %ld\n",
- mod_settings.outputsoapy_conf.device.c_str(),
- mod_settings.outputsoapy_conf.masterClockRate);
+ mod_settings.sdr_device_config.device.c_str(),
+ mod_settings.sdr_device_config.masterClockRate);
}
#endif
else if (mod_settings.useZeroMQOutput) {
@@ -180,7 +171,7 @@ static shared_ptr<ModOutput> prepare_output(
if (s.useFileOutput) {
if (s.fileOutputFormat == "complexf") {
- output = make_shared<OutputFile>(s.outputName);
+ output = make_shared<OutputFile>(s.outputName, s.fileOutputShowMetadata);
}
else if (s.fileOutputFormat == "complexf_normalised") {
if (s.gainMode == GainMode::GAIN_FIX)
@@ -189,7 +180,7 @@ static shared_ptr<ModOutput> prepare_output(
s.normalise = 1.0f / normalise_factor_file_max;
else if (s.gainMode == GainMode::GAIN_VAR)
s.normalise = 1.0f / normalise_factor_file_var;
- output = make_shared<OutputFile>(s.outputName);
+ output = make_shared<OutputFile>(s.outputName, s.fileOutputShowMetadata);
}
else if (s.fileOutputFormat == "s8" or
s.fileOutputFormat == "u8") {
@@ -198,7 +189,7 @@ static shared_ptr<ModOutput> prepare_output(
// [0; 255]
s.normalise = 127.0f / normalise_factor;
- output = make_shared<OutputFile>(s.outputName);
+ output = make_shared<OutputFile>(s.outputName, s.fileOutputShowMetadata);
}
else {
throw runtime_error("File output format " + s.fileOutputFormat +
@@ -208,18 +199,20 @@ static shared_ptr<ModOutput> prepare_output(
#if defined(HAVE_OUTPUT_UHD)
else if (s.useUHDOutput) {
s.normalise = 1.0f / normalise_factor;
- s.outputuhd_conf.sampleRate = s.outputRate;
- output = make_shared<OutputUHD>(s.outputuhd_conf);
- rcs.enrol((OutputUHD*)output.get());
+ s.sdr_device_config.sampleRate = s.outputRate;
+ auto uhddevice = make_shared<Output::UHD>(s.sdr_device_config);
+ output = make_shared<Output::SDR>(s.sdr_device_config, uhddevice);
+ rcs.enrol((Output::SDR*)output.get());
}
#endif
#if defined(HAVE_SOAPYSDR)
else if (s.useSoapyOutput) {
/* We normalise the same way as for the UHD output */
s.normalise = 1.0f / normalise_factor;
- s.outputsoapy_conf.sampleRate = s.outputRate;
- output = make_shared<OutputSoapy>(s.outputsoapy_conf);
- rcs.enrol((OutputSoapy*)output.get());
+ s.sdr_device_config.sampleRate = s.outputRate;
+ auto soapydevice = make_shared<Output::Soapy>(s.sdr_device_config);
+ output = make_shared<Output::SDR>(s.sdr_device_config, soapydevice);
+ rcs.enrol((Output::SDR*)output.get());
}
#endif
#if defined(HAVE_ZEROMQ)
@@ -270,16 +263,8 @@ int launch_modulator(int argc, char* argv[])
throw std::runtime_error("Configuration error");
}
- // When using the FIRFilter, increase the modulator offset pipelining delay
- // by the correct amount
- if (not mod_settings.filterTapsFilename.empty()) {
- mod_settings.tist_delay_stages += FIRFILTER_PIPELINE_DELAY;
- }
-
printModSettings(mod_settings);
- modulator_data m;
-
shared_ptr<FormatConverter> format_converter;
if (mod_settings.useFileOutput and
(mod_settings.fileOutputFormat == "s8" or
@@ -296,7 +281,7 @@ int launch_modulator(int argc, char* argv[])
set_thread_name("modulator");
if (mod_settings.inputTransport == "edi") {
- EdiReader ediReader(mod_settings.tist_offset_s, mod_settings.tist_delay_stages);
+ EdiReader ediReader(mod_settings.tist_offset_s);
EdiDecoder::ETIDecoder ediInput(ediReader, false);
if (mod_settings.edi_max_delay_ms > 0.0f) {
// setMaxDelay wants number of AF packets, which correspond to 24ms ETI frames
@@ -321,17 +306,6 @@ int launch_modulator(int argc, char* argv[])
flowgraph.connect(modulator, output);
}
-#if defined(HAVE_OUTPUT_UHD)
- if (mod_settings.useUHDOutput) {
- ((OutputUHD*)output.get())->setETISource(modulator->getEtiSource());
- }
-#endif
-#if defined(HAVE_SOAPYSDR)
- if (mod_settings.useSoapyOutput) {
- ((OutputSoapy*)output.get())->setETISource(modulator->getEtiSource());
- }
-#endif
-
size_t framecount = 0;
while (running) {
@@ -397,11 +371,12 @@ int launch_modulator(int argc, char* argv[])
while (run_again) {
Flowgraph flowgraph;
- m.inputReader = inputReader.get();
+ modulator_data m;
+ m.inputReader = inputReader;
m.flowgraph = &flowgraph;
m.data.setLength(6144);
- EtiReader etiReader(mod_settings.tist_offset_s, mod_settings.tist_delay_stages);
+ EtiReader etiReader(mod_settings.tist_offset_s);
m.etiReader = &etiReader;
auto input = make_shared<InputMemory>(&m.data);
@@ -415,17 +390,6 @@ int launch_modulator(int argc, char* argv[])
flowgraph.connect(modulator, output);
}
-#if defined(HAVE_OUTPUT_UHD)
- if (mod_settings.useUHDOutput) {
- ((OutputUHD*)output.get())->setETISource(modulator->getEtiSource());
- }
-#endif
-#if defined(HAVE_SOAPYSDR)
- if (mod_settings.useSoapyOutput) {
- ((OutputSoapy*)output.get())->setETISource(modulator->getEtiSource());
- }
-#endif
-
inputReader->PrintInfo();
run_modulator_state_t st = run_modulator(m);
@@ -490,7 +454,7 @@ int launch_modulator(int argc, char* argv[])
return ret;
}
-run_modulator_state_t run_modulator(modulator_data& m)
+static run_modulator_state_t run_modulator(modulator_data& m)
{
auto ret = run_modulator_state_t::failure;
try {
@@ -527,13 +491,26 @@ run_modulator_state_t run_modulator(modulator_data& m)
}
}
if (framesize == 0) {
- etiLog.level(info) << "End of file reached.";
+ if (dynamic_pointer_cast<InputFileReader>(m.inputReader)) {
+ etiLog.level(info) << "End of file reached.";
+ running = 0;
+ ret = run_modulator_state_t::normal_end;
+ }
+#if defined(HAVE_ZEROMQ)
+ else if (dynamic_pointer_cast<InputZeroMQReader>(m.inputReader)) {
+ /* An empty frame marks a timeout. We ignore it, but we are
+ * now able to handle SIGINT properly.
+ */
+ }
+#endif // defined(HAVE_ZEROMQ)
+ // No need to handle the TCP input in a special way to get SIGINT working,
+ // because recv() will return with EINTR.
}
else {
etiLog.level(error) << "Input read error.";
+ running = 0;
+ ret = run_modulator_state_t::normal_end;
}
- running = 0;
- ret = run_modulator_state_t::normal_end;
}
}
catch (const zmq_input_overflow& e) {
diff --git a/src/DabModulator.cpp b/src/DabModulator.cpp
index 8ba8af6..5b77d18 100644
--- a/src/DabModulator.cpp
+++ b/src/DabModulator.cpp
@@ -3,7 +3,7 @@
Her Majesty the Queen in Right of Canada (Communications Research
Center Canada)
- Copyright (C) 2017
+ Copyright (C) 2018
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
@@ -162,10 +162,13 @@ int DabModulator::process(Buffer* dataOut)
}
}
- auto cifCicEq = make_shared<CicEqualizer>(
+ shared_ptr<CicEqualizer> cifCicEq;
+ if (useCicEq) {
+ cifCicEq = make_shared<CicEqualizer>(
myNbCarriers,
(float)mySpacing * (float)m_settings.outputRate / 2048000.0f,
cic_ratio);
+ }
shared_ptr<TII> tii;
shared_ptr<PhaseReference> tiiRef;
@@ -218,7 +221,7 @@ int DabModulator::process(Buffer* dataOut)
rcs.enrol(cifPoly.get());
}
- auto myOutput = make_shared<OutputMemory>(dataOut);
+ myOutput = make_shared<OutputMemory>(dataOut);
shared_ptr<Resampler> cifRes;
if (m_settings.outputRate != 2048000) {
@@ -347,43 +350,24 @@ int DabModulator::process(Buffer* dataOut)
myFlowgraph->connect(tii, cifSig);
}
- if (useCicEq) {
- myFlowgraph->connect(cifSig, cifCicEq);
- myFlowgraph->connect(cifCicEq, cifOfdm);
- }
- else {
- myFlowgraph->connect(cifSig, cifOfdm);
- }
- myFlowgraph->connect(cifOfdm, cifGain);
- myFlowgraph->connect(cifGain, cifGuard);
-
- auto cifOut = cifPoly ?
- static_pointer_cast<ModPlugin>(cifPoly) :
- static_pointer_cast<ModPlugin>(myOutput);
-
- if (cifFilter) {
- myFlowgraph->connect(cifGuard, cifFilter);
- if (cifRes) {
- myFlowgraph->connect(cifFilter, cifRes);
- myFlowgraph->connect(cifRes, cifOut);
- }
- else {
- myFlowgraph->connect(cifFilter, cifOut);
- }
- }
- else {
- if (cifRes) {
- myFlowgraph->connect(cifGuard, cifRes);
- myFlowgraph->connect(cifRes, cifOut);
- }
- else {
- myFlowgraph->connect(cifGuard, cifOut);
+ shared_ptr<ModPlugin> prev_plugin = static_pointer_cast<ModPlugin>(cifSig);
+ const std::list<shared_ptr<ModPlugin> > plugins({
+ static_pointer_cast<ModPlugin>(cifCicEq),
+ static_pointer_cast<ModPlugin>(cifOfdm),
+ static_pointer_cast<ModPlugin>(cifGain),
+ static_pointer_cast<ModPlugin>(cifGuard),
+ static_pointer_cast<ModPlugin>(cifFilter), // optional block
+ static_pointer_cast<ModPlugin>(cifRes), // optional block
+ static_pointer_cast<ModPlugin>(cifPoly), // optional block
+ static_pointer_cast<ModPlugin>(myOutput),
+ });
+
+ for (auto& p : plugins) {
+ if (p) {
+ myFlowgraph->connect(prev_plugin, p);
+ prev_plugin = p;
}
}
-
- if (cifPoly) {
- myFlowgraph->connect(cifPoly, myOutput);
- }
}
////////////////////////////////////////////////////////////////////
@@ -392,3 +376,12 @@ int DabModulator::process(Buffer* dataOut)
return myFlowgraph->run();
}
+meta_vec_t DabModulator::process_metadata(const meta_vec_t& metadataIn)
+{
+ if (myOutput) {
+ return myOutput->get_latest_metadata();
+ }
+
+ return {};
+}
+
diff --git a/src/DabModulator.h b/src/DabModulator.h
index 2d7bc35..e84ce96 100644
--- a/src/DabModulator.h
+++ b/src/DabModulator.h
@@ -3,7 +3,7 @@
Her Majesty the Queen in Right of Canada (Communications Research
Center Canada)
- Copyright (C) 2017
+ Copyright (C) 2018
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
@@ -46,7 +46,7 @@
#include "TII.h"
-class DabModulator : public ModInput
+class DabModulator : public ModInput, public ModMetadata
{
public:
DabModulator(EtiSource& etiSource,
@@ -55,6 +55,9 @@ public:
int process(Buffer* dataOut);
const char* name() { return "DabModulator"; }
+ virtual meta_vec_t process_metadata(
+ const meta_vec_t& metadataIn);
+
/* Required to get the timestamp */
EtiSource* getEtiSource() { return &myEtiSource; }
diff --git a/src/EtiReader.cpp b/src/EtiReader.cpp
index d84ed1f..17f4953 100644
--- a/src/EtiReader.cpp
+++ b/src/EtiReader.cpp
@@ -54,10 +54,9 @@ enum ETI_READER_STATE {
EtiReader::EtiReader(
- double& tist_offset_s,
- unsigned tist_delay_stages) :
+ double& tist_offset_s) :
state(EtiReaderStateSync),
- myTimestampDecoder(tist_offset_s, tist_delay_stages),
+ myTimestampDecoder(tist_offset_s),
eti_fc_valid(false)
{
rcs.enrol(&myTimestampDecoder);
@@ -281,6 +280,8 @@ int EtiReader::loadEtiData(const Buffer& dataIn)
myTimestampDecoder.updateTimestampEti(eti_fc.FP & 0x3,
eti_eoh.MNSC, getPPSOffset(), eti_fc.FCT);
+ myFicSource->loadTimestamp(myTimestampDecoder.getTimestamp());
+
return dataIn.getLength() - input_size;
}
@@ -290,11 +291,6 @@ bool EtiReader::sourceContainsTimestamp()
/* See ETS 300 799, Annex C.2.2 */
}
-void EtiReader::calculateTimestamp(struct frame_timestamp& ts)
-{
- myTimestampDecoder.calculateTimestamp(ts);
-}
-
uint32_t EtiReader::getPPSOffset()
{
if (!sourceContainsTimestamp()) {
@@ -309,9 +305,8 @@ uint32_t EtiReader::getPPSOffset()
}
EdiReader::EdiReader(
- double& tist_offset_s,
- unsigned tist_delay_stages) :
- m_timestamp_decoder(tist_offset_s, tist_delay_stages)
+ double& tist_offset_s) :
+ m_timestamp_decoder(tist_offset_s)
{
rcs.enrol(&m_timestamp_decoder);
}
@@ -359,11 +354,6 @@ bool EdiReader::sourceContainsTimestamp()
return m_fc.tsta != 0xFFFFFF;
}
-void EdiReader::calculateTimestamp(struct frame_timestamp& ts)
-{
- m_timestamp_decoder.calculateTimestamp(ts);
-}
-
bool EdiReader::isFrameReady()
{
return m_frameReady;
@@ -533,8 +523,9 @@ void EdiReader::assemble()
const std::time_t posix_timestamp_1_jan_2000 = 946684800;
auto utc_ts = posix_timestamp_1_jan_2000 + m_seconds - m_utco;
- m_timestamp_decoder.updateTimestampEdi(
- utc_ts, m_fc.tsta, m_fc.fct());
+ m_timestamp_decoder.updateTimestampEdi(utc_ts, m_fc.tsta, m_fc.fct(), m_fc.fp);
+
+ myFicSource->loadTimestamp(m_timestamp_decoder.getTimestamp());
m_frameReady = true;
}
diff --git a/src/EtiReader.h b/src/EtiReader.h
index f3a9764..8270592 100644
--- a/src/EtiReader.h
+++ b/src/EtiReader.h
@@ -59,7 +59,6 @@ public:
/* Returns true if we have valid time stamps in the ETI*/
virtual bool sourceContainsTimestamp() = 0;
- virtual void calculateTimestamp(struct frame_timestamp& ts) = 0;
/* Return the FIC source to be used for modulation */
virtual std::shared_ptr<FicSource>& getFic(void);
@@ -75,9 +74,7 @@ protected:
class EtiReader : public EtiSource
{
public:
- EtiReader(
- double& tist_offset_s,
- unsigned tist_delay_stages);
+ EtiReader(double& tist_offset_s);
virtual unsigned getMode();
virtual unsigned getFp();
@@ -88,7 +85,6 @@ public:
int loadEtiData(const Buffer& dataIn);
virtual bool sourceContainsTimestamp();
- virtual void calculateTimestamp(struct frame_timestamp& ts);
virtual const std::vector<std::shared_ptr<SubchannelSource> > getSubchannels() const;
@@ -118,14 +114,11 @@ private:
class EdiReader : public EtiSource, public EdiDecoder::DataCollector
{
public:
- EdiReader(
- double& tist_offset_s,
- unsigned tist_delay_stages);
+ EdiReader(double& tist_offset_s);
virtual unsigned getMode();
virtual unsigned getFp();
virtual bool sourceContainsTimestamp();
- virtual void calculateTimestamp(struct frame_timestamp& ts);
virtual const std::vector<std::shared_ptr<SubchannelSource> > getSubchannels() const;
virtual bool isFrameReady(void);
diff --git a/src/FIRFilter.cpp b/src/FIRFilter.cpp
index bc2314a..96ad1b9 100644
--- a/src/FIRFilter.cpp
+++ b/src/FIRFilter.cpp
@@ -87,6 +87,11 @@ FIRFilter::FIRFilter(const std::string& taps_file) :
start_pipeline_thread();
}
+FIRFilter::~FIRFilter()
+{
+ stop_pipeline_thread();
+}
+
void FIRFilter::load_filter_taps(const std::string &tapsFile)
{
std::vector<float> filter_taps;
diff --git a/src/FIRFilter.h b/src/FIRFilter.h
index a63bfb9..d04c456 100644
--- a/src/FIRFilter.h
+++ b/src/FIRFilter.h
@@ -53,20 +53,21 @@ class FIRFilter : public PipelinedModCodec, public RemoteControllable
{
public:
FIRFilter(const std::string& taps_file);
- virtual ~FIRFilter() = default;
+ FIRFilter(const FIRFilter& other) = delete;
+ FIRFilter& operator=(const FIRFilter& other) = delete;
+ virtual ~FIRFilter();
- const char* name() { return "FIRFilter"; }
+ const char* name() override { return "FIRFilter"; }
/******* REMOTE CONTROL ********/
virtual void set_parameter(const std::string& parameter,
- const std::string& value);
+ const std::string& value) override;
virtual const std::string get_parameter(
- const std::string& parameter) const;
-
+ const std::string& parameter) const override;
protected:
- virtual int internal_process(Buffer* const dataIn, Buffer* dataOut);
+ virtual int internal_process(Buffer* const dataIn, Buffer* dataOut) override;
void load_filter_taps(const std::string &tapsFile);
std::string m_taps_file;
diff --git a/src/FicSource.cpp b/src/FicSource.cpp
index 04197db..2b95085 100644
--- a/src/FicSource.cpp
+++ b/src/FicSource.cpp
@@ -2,7 +2,7 @@
Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty
the Queen in Right of Canada (Communications Research Center Canada)
- Copyright (C) 2016
+ Copyright (C) 2018
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
@@ -26,6 +26,8 @@
#include "FicSource.h"
#include "PcDebug.h"
+#include "Log.h"
+#include "TimestampDecoder.h"
#include <stdexcept>
#include <string>
@@ -92,3 +94,23 @@ int FicSource::process(Buffer* outputData)
return outputData->getLength();
}
+void FicSource::loadTimestamp(const std::shared_ptr<struct frame_timestamp>& ts)
+{
+ d_ts = ts;
+}
+
+
+meta_vec_t FicSource::process_metadata(const meta_vec_t& metadataIn)
+{
+ if (not d_ts) {
+ return {};
+ }
+
+ using namespace std;
+ meta_vec_t md_vec;
+ flowgraph_metadata meta;
+ meta.ts = d_ts;
+ md_vec.push_back(meta);
+ return md_vec;
+}
+
diff --git a/src/FicSource.h b/src/FicSource.h
index 77ac741..93c1a7f 100644
--- a/src/FicSource.h
+++ b/src/FicSource.h
@@ -36,7 +36,7 @@
#include <vector>
#include <sys/types.h>
-class FicSource : public ModInput
+class FicSource : public ModInput, public ModMetadata
{
public:
FicSource(unsigned ficf, unsigned mid);
@@ -45,12 +45,17 @@ public:
const std::vector<PuncturingRule>& get_rules();
void loadFicData(const Buffer& fic);
- int process(Buffer* outputData);
- const char* name() { return "FicSource"; }
+ int process(Buffer* outputData) override;
+ const char* name() override { return "FicSource"; }
+
+ void loadTimestamp(const std::shared_ptr<struct frame_timestamp>& ts);
+ virtual meta_vec_t process_metadata(
+ const meta_vec_t& metadataIn) override;
private:
size_t d_framesize;
Buffer d_buffer;
+ std::shared_ptr<struct frame_timestamp> d_ts;
std::vector<PuncturingRule> d_puncturing_rules;
};
diff --git a/src/Flowgraph.cpp b/src/Flowgraph.cpp
index 465ef41..506832c 100644
--- a/src/Flowgraph.cpp
+++ b/src/Flowgraph.cpp
@@ -2,7 +2,7 @@
Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty
the Queen in Right of Canada (Communications Research Center Canada)
- Copyright (C) 2016
+ Copyright (C) 2018
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
@@ -26,6 +26,7 @@
#include "Flowgraph.h"
#include "PcDebug.h"
+#include "Log.h"
#include <string>
#include <memory>
#include <algorithm>
@@ -57,9 +58,10 @@ Node::~Node()
assert(myOutputBuffers.size() == 0);
}
-void Node::addOutputBuffer(Buffer::sptr& buffer)
+void Node::addOutputBuffer(Buffer::sptr& buffer, Metadata_vec_sptr& md)
{
myOutputBuffers.push_back(buffer);
+ myOutputMetadata.push_back(md);
#if DEBUG
std::string fname = string(myPlugin->name()) +
"-" + to_string(myDebugFiles.size()) +
@@ -71,7 +73,7 @@ void Node::addOutputBuffer(Buffer::sptr& buffer)
#endif
}
-void Node::removeOutputBuffer(Buffer::sptr& buffer)
+void Node::removeOutputBuffer(Buffer::sptr& buffer, Metadata_vec_sptr& md)
{
auto it = std::find(
myOutputBuffers.begin(),
@@ -89,14 +91,23 @@ void Node::removeOutputBuffer(Buffer::sptr& buffer)
#endif
myOutputBuffers.erase(it);
}
+
+ auto mdit = std::find(
+ myOutputMetadata.begin(),
+ myOutputMetadata.end(),
+ md);
+ if (mdit != myOutputMetadata.end()) {
+ myOutputMetadata.erase(mdit);
+ }
}
-void Node::addInputBuffer(Buffer::sptr& buffer)
+void Node::addInputBuffer(Buffer::sptr& buffer, Metadata_vec_sptr& md)
{
myInputBuffers.push_back(buffer);
+ myInputMetadata.push_back(md);
}
-void Node::removeInputBuffer(Buffer::sptr& buffer)
+void Node::removeInputBuffer(Buffer::sptr& buffer, Metadata_vec_sptr& md)
{
auto it = std::find(
myInputBuffers.begin(),
@@ -105,6 +116,14 @@ void Node::removeInputBuffer(Buffer::sptr& buffer)
if (it != myInputBuffers.end()) {
myInputBuffers.erase(it);
}
+
+ auto mdit = std::find(
+ myInputMetadata.begin(),
+ myInputMetadata.end(),
+ md);
+ if (mdit != myInputMetadata.end()) {
+ myInputMetadata.erase(mdit);
+ }
}
int Node::process()
@@ -127,6 +146,37 @@ int Node::process()
}
int ret = myPlugin->process(inBuffers, outBuffers);
+
+ // Collect all incoming metadata into a single vector
+ meta_vec_t all_input_mds;
+ for (auto& md_vec_sp : myInputMetadata) {
+ if (md_vec_sp) {
+ move(md_vec_sp->begin(), md_vec_sp->end(),
+ back_inserter(all_input_mds));
+ md_vec_sp->clear();
+ }
+ }
+
+ auto mod_meta = dynamic_pointer_cast<ModMetadata>(myPlugin);
+ if (mod_meta) {
+ auto outputMetadata = mod_meta->process_metadata(all_input_mds);
+ // Distribute the result metadata to all outputs
+ for (auto& out_md : myOutputMetadata) {
+ out_md->clear();
+ std::move(outputMetadata.begin(), outputMetadata.end(),
+ std::back_inserter(*out_md));
+ }
+ }
+ else {
+ // Propagate the unmodified input metadata to all outputs
+ for (auto& out_md : myOutputMetadata) {
+ out_md->clear();
+ std::move(all_input_mds.begin(), all_input_mds.end(),
+ std::back_inserter(*out_md));
+ }
+ }
+
+
#if DEBUG
assert(myDebugFiles.size() == myOutputBuffers.size());
@@ -158,8 +208,10 @@ Edge::Edge(shared_ptr<Node>& srcNode, shared_ptr<Node>& dstNode) :
this);
myBuffer = make_shared<Buffer>();
- srcNode->addOutputBuffer(myBuffer);
- dstNode->addInputBuffer(myBuffer);
+ myMetadata = make_shared<vector<flowgraph_metadata> >();
+
+ srcNode->addOutputBuffer(myBuffer, myMetadata);
+ dstNode->addInputBuffer(myBuffer, myMetadata);
}
@@ -168,8 +220,8 @@ Edge::~Edge()
PDEBUG("Edge::~Edge() @ %p\n", this);
if (myBuffer) {
- mySrcNode->removeOutputBuffer(myBuffer);
- myDstNode->removeInputBuffer(myBuffer);
+ mySrcNode->removeOutputBuffer(myBuffer, myMetadata);
+ myDstNode->removeInputBuffer(myBuffer, myMetadata);
}
}
@@ -186,9 +238,8 @@ Flowgraph::~Flowgraph()
{
PDEBUG("Flowgraph::~Flowgraph() @ %p\n", this);
- stringstream ss;
-
if (myProcessTime) {
+ stringstream ss;
ss << "Process time:\n";
char node_time_sz[1024] = {};
diff --git a/src/Flowgraph.h b/src/Flowgraph.h
index ebb7314..b074ee6 100644
--- a/src/Flowgraph.h
+++ b/src/Flowgraph.h
@@ -2,7 +2,7 @@
Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty
the Queen in Right of Canada (Communications Research Center Canada)
- Copyright (C) 2016
+ Copyright (C) 2018
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
@@ -39,6 +39,8 @@
#include <list>
#include <cstdio>
+using Metadata_vec_sptr = std::shared_ptr<std::vector<flowgraph_metadata> >;
+
class Node
{
public:
@@ -55,15 +57,18 @@ public:
myProcessTime += processTime;
}
- void addOutputBuffer(Buffer::sptr& buffer);
- void removeOutputBuffer(Buffer::sptr& buffer);
+ void addOutputBuffer(Buffer::sptr& buffer, Metadata_vec_sptr& md);
+ void removeOutputBuffer(Buffer::sptr& buffer, Metadata_vec_sptr& md);
- void addInputBuffer(Buffer::sptr& buffer);
- void removeInputBuffer(Buffer::sptr& buffer);
+ void addInputBuffer(Buffer::sptr& buffer, Metadata_vec_sptr& md);
+ void removeInputBuffer(Buffer::sptr& buffer, Metadata_vec_sptr& md);
protected:
std::list<Buffer::sptr> myInputBuffers;
std::list<Buffer::sptr> myOutputBuffers;
+ std::list<Metadata_vec_sptr> myInputMetadata;
+ std::list<Metadata_vec_sptr> myOutputMetadata;
+
#if DEBUG
std::list<FILE*> myDebugFiles;
#endif
@@ -85,6 +90,7 @@ protected:
std::shared_ptr<Node> mySrcNode;
std::shared_ptr<Node> myDstNode;
std::shared_ptr<Buffer> myBuffer;
+ std::shared_ptr<std::vector<flowgraph_metadata> > myMetadata;
};
diff --git a/src/GainControl.cpp b/src/GainControl.cpp
index 0411482..5657fc2 100644
--- a/src/GainControl.cpp
+++ b/src/GainControl.cpp
@@ -75,7 +75,7 @@ GainControl::GainControl(size_t framesize,
GainControl::~GainControl()
{
- PDEBUG("GainControl::~GainControl() @ %p\n", this);
+ stop_pipeline_thread();
}
int GainControl::internal_process(Buffer* const dataIn, Buffer* dataOut)
diff --git a/src/GainControl.h b/src/GainControl.h
index e9eaa8c..b4579cd 100644
--- a/src/GainControl.h
+++ b/src/GainControl.h
@@ -57,8 +57,8 @@ class GainControl : public PipelinedModCodec, public RemoteControllable
float varVariance);
virtual ~GainControl();
- GainControl(const GainControl&);
- GainControl& operator=(const GainControl&);
+ GainControl(const GainControl&) = delete;
+ GainControl& operator=(const GainControl&) = delete;
const char* name() override { return "GainControl"; }
diff --git a/src/GuardIntervalInserter.cpp b/src/GuardIntervalInserter.cpp
index afb9213..79692f5 100644
--- a/src/GuardIntervalInserter.cpp
+++ b/src/GuardIntervalInserter.cpp
@@ -97,8 +97,6 @@ void GuardIntervalInserter::update_window(size_t new_window_overlap)
}
}
-#pragma GCC optimize ("O0")
-
int GuardIntervalInserter::process(Buffer* const dataIn, Buffer* dataOut)
{
PDEBUG("GuardIntervalInserter::process(dataIn: %p, dataOut: %p)\n",
diff --git a/src/InputFileReader.cpp b/src/InputFileReader.cpp
index 84f0be4..5e93477 100644
--- a/src/InputFileReader.cpp
+++ b/src/InputFileReader.cpp
@@ -3,7 +3,7 @@
Her Majesty the Queen in Right of Canada (Communications Research
Center Canada)
- Copyrigth (C) 2013
+ Copyrigth (C) 2018
Matthias P. Braendli, matthias.braendli@mpb.li
@@ -47,28 +47,29 @@ int InputFileReader::Open(std::string filename, bool loop)
{
filename_ = filename;
loop_ = loop;
- inputfile_ = fopen(filename_.c_str(), "r");
- if (inputfile_ == NULL) {
+ FILE* fd = fopen(filename_.c_str(), "r");
+ if (fd == nullptr) {
etiLog.level(error) << "Unable to open input file!";
perror(filename_.c_str());
return -1;
}
+ inputfile_.reset(fd);
return IdentifyType();
}
int InputFileReader::Rewind()
{
- rewind(inputfile_); // Also clears the EOF flag
+ rewind(inputfile_.get()); // Also clears the EOF flag
return IdentifyType();
}
int InputFileReader::IdentifyType()
{
- EtiStreamType streamType = ETI_STREAM_TYPE_NONE;
+ EtiStreamType streamType = EtiStreamType::None;
struct stat inputFileStat;
- fstat(fileno(inputfile_), &inputFileStat);
+ fstat(fileno(inputfile_.get()), &inputFileStat);
inputfilelength_ = inputFileStat.st_size;
uint32_t sync;
@@ -77,22 +78,22 @@ int InputFileReader::IdentifyType()
char discard_buffer[6144];
- if (fread(&sync, sizeof(sync), 1, inputfile_) != 1) {
+ if (fread(&sync, sizeof(sync), 1, inputfile_.get()) != 1) {
etiLog.level(error) << "Unable to read sync in input file!";
perror(filename_.c_str());
return -1;
}
if ((sync == 0x49c5f8ff) || (sync == 0xb63a07ff)) {
- streamType = ETI_STREAM_TYPE_RAW;
+ streamType = EtiStreamType::Raw;
if (inputfilelength_ > 0) {
nbframes_ = inputfilelength_ / 6144;
}
else {
nbframes_ = ~0;
}
- if (fseek(inputfile_, -sizeof(sync), SEEK_CUR) != 0) {
+ if (fseek(inputfile_.get(), -sizeof(sync), SEEK_CUR) != 0) {
// if the seek fails, consume the rest of the frame
- if (fread(discard_buffer, 6144 - sizeof(sync), 1, inputfile_)
+ if (fread(discard_buffer, 6144 - sizeof(sync), 1, inputfile_.get())
!= 1) {
etiLog.level(error) << "Unable to read from input file!";
perror(filename_.c_str());
@@ -104,7 +105,7 @@ int InputFileReader::IdentifyType()
}
nbFrames = sync;
- if (fread(&frameSize, sizeof(frameSize), 1, inputfile_) != 1) {
+ if (fread(&frameSize, sizeof(frameSize), 1, inputfile_.get()) != 1) {
etiLog.level(error) << "Unable to read frame size in input file!";
perror(filename_.c_str());
return -1;
@@ -114,7 +115,7 @@ int InputFileReader::IdentifyType()
sync |= ((uint32_t)frameSize) << 16;
if ((sync == 0x49c5f8ff) || (sync == 0xb63a07ff)) {
- streamType = ETI_STREAM_TYPE_STREAMED;
+ streamType = EtiStreamType::Streamed;
frameSize = nbFrames & 0xffff;
if (inputfilelength_ > 0) {
nbframes_ = inputfilelength_ / (frameSize + 2);
@@ -122,9 +123,9 @@ int InputFileReader::IdentifyType()
else {
nbframes_ = ~0;
}
- if (fseek(inputfile_, -6, SEEK_CUR) != 0) {
+ if (fseek(inputfile_.get(), -6, SEEK_CUR) != 0) {
// if the seek fails, consume the rest of the frame
- if (fread(discard_buffer, frameSize - 4, 1, inputfile_)
+ if (fread(discard_buffer, frameSize - 4, 1, inputfile_.get())
!= 1) {
etiLog.level(error) << "Unable to read from input file!";
perror(filename_.c_str());
@@ -135,16 +136,16 @@ int InputFileReader::IdentifyType()
return 0;
}
- if (fread(&sync, sizeof(sync), 1, inputfile_) != 1) {
+ if (fread(&sync, sizeof(sync), 1, inputfile_.get()) != 1) {
etiLog.level(error) << "Unable to read nb frame in input file!";
perror(filename_.c_str());
return -1;
}
if ((sync == 0x49c5f8ff) || (sync == 0xb63a07ff)) {
- streamType = ETI_STREAM_TYPE_FRAMED;
- if (fseek(inputfile_, -6, SEEK_CUR) != 0) {
+ streamType = EtiStreamType::Framed;
+ if (fseek(inputfile_.get(), -6, SEEK_CUR) != 0) {
// if the seek fails, consume the rest of the frame
- if (fread(discard_buffer, frameSize - 4, 1, inputfile_)
+ if (fread(discard_buffer, frameSize - 4, 1, inputfile_.get())
!= 1) {
etiLog.level(error) << "Unable to read from input file!";
perror(filename_.c_str());
@@ -160,21 +161,21 @@ int InputFileReader::IdentifyType()
for (size_t i = 10; i < 6144 + 10; ++i) {
sync >>= 8;
sync &= 0xffffff;
- if (fread((uint8_t*)&sync + 3, 1, 1, inputfile_) != 1) {
+ if (fread((uint8_t*)&sync + 3, 1, 1, inputfile_.get()) != 1) {
etiLog.level(error) << "Unable to read from input file!";
perror(filename_.c_str());
return -1;
}
if ((sync == 0x49c5f8ff) || (sync == 0xb63a07ff)) {
- streamType = ETI_STREAM_TYPE_RAW;
+ streamType = EtiStreamType::Raw;
if (inputfilelength_ > 0) {
nbframes_ = (inputfilelength_ - i) / 6144;
}
else {
nbframes_ = ~0;
}
- if (fseek(inputfile_, -sizeof(sync), SEEK_CUR) != 0) {
- if (fread(discard_buffer, 6144 - sizeof(sync), 1, inputfile_)
+ if (fseek(inputfile_.get(), -sizeof(sync), SEEK_CUR) != 0) {
+ if (fread(discard_buffer, 6144 - sizeof(sync), 1, inputfile_.get())
!= 1) {
etiLog.level(error) << "Unable to read from input file!";
perror(filename_.c_str());
@@ -190,17 +191,17 @@ int InputFileReader::IdentifyType()
return -1;
}
-void InputFileReader::PrintInfo()
+void InputFileReader::PrintInfo() const
{
fprintf(stderr, "Input file format: ");
switch (streamtype_) {
- case ETI_STREAM_TYPE_RAW:
+ case EtiStreamType::Raw:
fprintf(stderr, "raw");
break;
- case ETI_STREAM_TYPE_STREAMED:
+ case EtiStreamType::Streamed:
fprintf(stderr, "streamed");
break;
- case ETI_STREAM_TYPE_FRAMED:
+ case EtiStreamType::Framed:
fprintf(stderr, "framed");
break;
default:
@@ -221,15 +222,15 @@ int InputFileReader::GetNextFrame(void* buffer)
{
uint16_t frameSize;
- if (streamtype_ == ETI_STREAM_TYPE_RAW) {
+ if (streamtype_ == EtiStreamType::Raw) {
frameSize = 6144;
}
else {
- if (fread(&frameSize, sizeof(frameSize), 1, inputfile_) != 1) {
+ if (fread(&frameSize, sizeof(frameSize), 1, inputfile_.get()) != 1) {
etiLog.level(error) << "Reached end of file.";
if (loop_) {
if (Rewind() == 0) {
- if (fread(&frameSize, sizeof(frameSize), 1, inputfile_) != 1) {
+ if (fread(&frameSize, sizeof(frameSize), 1, inputfile_.get()) != 1) {
PDEBUG("Error after rewinding file!\n");
etiLog.level(error) << "Error after rewinding file!";
return -1;
@@ -252,15 +253,15 @@ int InputFileReader::GetNextFrame(void* buffer)
}
PDEBUG("Frame size: %u\n", frameSize);
- size_t read_bytes = fread(buffer, 1, frameSize, inputfile_);
+ size_t read_bytes = fread(buffer, 1, frameSize, inputfile_.get());
if ( loop_ &&
- streamtype_ == ETI_STREAM_TYPE_RAW && //implies frameSize == 6144
- read_bytes == 0 && feof(inputfile_)) {
+ streamtype_ == EtiStreamType::Raw && //implies frameSize == 6144
+ read_bytes == 0 && feof(inputfile_.get())) {
// in case of an EOF from a RAW that we loop, rewind
// otherwise, we won't tolerate it
if (Rewind() == 0) {
- read_bytes = fread(buffer, 1, frameSize, inputfile_);
+ read_bytes = fread(buffer, 1, frameSize, inputfile_.get());
}
else {
PDEBUG("Impossible to rewind file!\n");
diff --git a/src/InputReader.h b/src/InputReader.h
index 7d6b373..07326cf 100644
--- a/src/InputReader.h
+++ b/src/InputReader.h
@@ -3,7 +3,7 @@
Her Majesty the Queen in Right of Canada (Communications Research
Center Canada)
- Copyright (C) 2016
+ Copyright (C) 2018
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
@@ -33,6 +33,7 @@
#include <cstdio>
#include <vector>
+#include <atomic>
#include <memory>
#if defined(HAVE_ZEROMQ)
# include "zmq.hpp"
@@ -40,47 +41,8 @@
#endif
#include "porting.h"
#include "Log.h"
-#include "lib/UdpSocket.h"
-#include <sys/socket.h>
-#include <netinet/in.h>
#include <unistd.h>
-#include <netdb.h>
-#include <arpa/inet.h>
-#define SOCKET int
#define INVALID_SOCKET -1
-#define SOCKET_ERROR -1
-
-/* Known types of input streams. Description taken from the CRC mmbTools forum.
-
- All numbers are little-endian.
-
- Framed format is used for file recording. It is the default format. The
- padding can be removed from data. Format:
- uint32_t nbFrames
- for each frame
- uint16_t frameSize
- uint8_t data[frameSize]
-
- Streamed format is used for streamed applications. As the total number of
- frames is unknown before end of transmission, the corresponding field is
- removed. The padding can be removed from data. Format:
- for each frame
- uint16_t frameSize
- uint8_t data[frameSize]
-
- Raw format is a bit-by-bit (but byte aligned on sync) recording of a G.703
- data stream. The padding is always present. Format:
- for each frame
- uint8_t data[6144]
-
- Please note that our raw format can also be referred to as ETI(NI, G.703) or ETI(NI).
-*/
-enum EtiStreamType {
- ETI_STREAM_TYPE_NONE = 0,
- ETI_STREAM_TYPE_RAW,
- ETI_STREAM_TYPE_STREAMED,
- ETI_STREAM_TYPE_FRAMED,
-};
class InputReader
{
@@ -91,43 +53,25 @@ class InputReader
virtual int GetNextFrame(void* buffer) = 0;
// Print some information
- virtual void PrintInfo() = 0;
+ virtual void PrintInfo() const = 0;
};
class InputFileReader : public InputReader
{
public:
- InputFileReader() :
- streamtype_(ETI_STREAM_TYPE_NONE),
- inputfile_(NULL) { }
-
- ~InputFileReader()
- {
- if (inputfile_ != NULL) {
- fprintf(stderr, "\nClosing input file...\n");
-
- fclose(inputfile_);
- }
- }
+ InputFileReader() = default;
+ InputFileReader(const InputFileReader& other) = delete;
+ InputFileReader& operator=(const InputFileReader& other) = delete;
// open file and determine stream type
// When loop=1, GetNextFrame will never return 0
int Open(std::string filename, bool loop);
// Print information about the file opened
- void PrintInfo();
-
+ void PrintInfo() const;
int GetNextFrame(void* buffer);
- EtiStreamType GetStreamType()
- {
- return streamtype_;
- }
-
private:
- InputFileReader(const InputFileReader& other) = delete;
- InputFileReader& operator=(const InputFileReader& other) = delete;
-
int IdentifyType();
// Rewind the file, and replay anew
@@ -136,19 +80,60 @@ class InputFileReader : public InputReader
bool loop_; // if shall we loop the file over and over
std::string filename_;
- EtiStreamType streamtype_;
- FILE* inputfile_;
- size_t inputfilelength_;
- uint64_t nbframes_; // 64-bit because 32-bit overflow is
- // after 2**32 * 24ms ~= 3.3 years
+ /* Known types of input streams. Description taken from the CRC
+ * mmbTools forum. All values are are little-endian. */
+ enum class EtiStreamType {
+ /* Not yet identified */
+ None,
+
+ /* Raw format is a bit-by-bit (but byte aligned on sync) recording
+ * of a G.703 data stream. The padding is always present.
+ * The raw format can also be referred to as ETI(NI, G.703) or ETI(NI).
+ * Format:
+ for each frame:
+ uint8_t data[6144]
+ */
+ Raw,
+
+ /* Streamed format is used for streamed applications. As the total
+ * number of frames is unknown before end of transmission, the
+ * corresponding field is removed. The padding can be removed from
+ * data.
+ * Format:
+ for each frame:
+ uint16_t frameSize
+ uint8_t data[frameSize]
+ */
+ Streamed,
+
+ /* Framed format is used for file recording. It is the default format.
+ * The padding can be removed from data.
+ * Format:
+ uint32_t nbFrames
+ for each frame:
+ uint16_t frameSize
+ uint8_t data[frameSize]
+ */
+ Framed,
+ };
+
+ EtiStreamType streamtype_ = EtiStreamType::None;
+ struct FILEDeleter{ void operator()(FILE* fd){ if(fd) fclose(fd);}};
+ std::unique_ptr<FILE, FILEDeleter> inputfile_;
+
+ size_t inputfilelength_ = 0;
+ uint64_t nbframes_ = 0; // 64-bit because 32-bit overflow is
+ // after 2**32 * 24ms ~= 3.3 years
};
class InputTcpReader : public InputReader
{
public:
InputTcpReader();
- ~InputTcpReader();
+ InputTcpReader(const InputTcpReader& other) = delete;
+ InputTcpReader& operator=(const InputTcpReader& other) = delete;
+ virtual ~InputTcpReader();
// Endpoint is either host:port or tcp://host:port
void Open(const std::string& endpoint);
@@ -159,89 +144,54 @@ class InputTcpReader : public InputReader
virtual int GetNextFrame(void* buffer);
// Print some information
- virtual void PrintInfo();
+ virtual void PrintInfo() const;
private:
- InputTcpReader(const InputTcpReader& other) = delete;
- InputTcpReader& operator=(const InputTcpReader& other) = delete;
- SOCKET m_sock;
+ int m_sock = INVALID_SOCKET;
std::string m_uri;
};
struct zmq_input_overflow : public std::exception
{
- const char* what () const throw ()
- {
- return "InputZMQ buffer overflow";
- }
+ const char* what () const throw ()
+ {
+ return "InputZMQ buffer overflow";
+ }
};
#if defined(HAVE_ZEROMQ)
/* A ZeroMQ input. See www.zeromq.org for more info */
-struct InputZeroMQThreadData
-{
- ThreadsafeQueue<std::shared_ptr<std::vector<uint8_t> > > *in_messages;
- std::string uri;
- size_t max_queued_frames;
-};
-
-class InputZeroMQWorker
+class InputZeroMQReader : public InputReader
{
public:
- InputZeroMQWorker() :
- running(false),
- zmqcontext(1),
- m_to_drop(0) { }
+ InputZeroMQReader() = default;
+ InputZeroMQReader(const InputZeroMQReader& other) = delete;
+ InputZeroMQReader& operator=(const InputZeroMQReader& other) = delete;
+ ~InputZeroMQReader();
- void Start(struct InputZeroMQThreadData* workerdata);
- void Stop();
+ int Open(const std::string& uri, size_t max_queued_frames);
+ int GetNextFrame(void* buffer);
+ void PrintInfo() const;
- bool is_running(void) { return running; }
private:
- bool running;
+ std::atomic<bool> m_running = ATOMIC_VAR_INIT(false);
+ std::string m_uri;
+ size_t m_max_queued_frames = 0;
+ ThreadsafeQueue<std::shared_ptr<std::vector<uint8_t> > > m_in_messages;
- void RecvProcess(struct InputZeroMQThreadData* workerdata);
+ void RecvProcess(void);
- zmq::context_t zmqcontext; // is thread-safe
- boost::thread recv_thread;
+ zmq::context_t m_zmqcontext; // is thread-safe
+ boost::thread m_recv_thread;
/* We must be careful to keep frame phase consistent. If we
* drop a single ETI frame, we will break the transmission
* frame vs. ETI frame phase.
*
- * Here we keep track of how many ETI frames we must drop
+ * Here we keep track of how many ETI frames we must drop.
*/
- int m_to_drop;
-};
-
-class InputZeroMQReader : public InputReader
-{
- public:
- InputZeroMQReader()
- {
- workerdata_.in_messages = &in_messages_;
- }
-
- ~InputZeroMQReader()
- {
- worker_.Stop();
- }
-
- int Open(const std::string& uri, size_t max_queued_frames);
-
- int GetNextFrame(void* buffer);
-
- void PrintInfo();
-
- private:
- InputZeroMQReader(const InputZeroMQReader& other) = delete;
- InputZeroMQReader& operator=(const InputZeroMQReader& other) = delete;
- std::string uri_;
-
- InputZeroMQWorker worker_;
- ThreadsafeQueue<std::shared_ptr<std::vector<uint8_t> > > in_messages_;
- struct InputZeroMQThreadData workerdata_;
+ int m_to_drop = 0;
};
#endif
diff --git a/src/InputTcpReader.cpp b/src/InputTcpReader.cpp
index 94ec0ad..9a93ad1 100644
--- a/src/InputTcpReader.cpp
+++ b/src/InputTcpReader.cpp
@@ -32,9 +32,12 @@
#include "InputReader.h"
#include "PcDebug.h"
#include "Utils.h"
-#include <sys/socket.h>
#include <unistd.h>
#include <errno.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <netdb.h>
+#include <arpa/inet.h>
InputTcpReader::InputTcpReader()
{
@@ -126,7 +129,7 @@ int InputTcpReader::GetNextFrame(void* buffer)
return r;
}
-void InputTcpReader::PrintInfo()
+void InputTcpReader::PrintInfo() const
{
fprintf(stderr, "Input TCP:\n");
fprintf(stderr, " Receiving from %s\n\n", m_uri.c_str());
diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp
index 783f0f5..f6a816a 100644
--- a/src/InputZeroMQReader.cpp
+++ b/src/InputZeroMQReader.cpp
@@ -3,7 +3,7 @@
Her Majesty the Queen in Right of Canada (Communications Research
Center Canada)
- Copyright (C) 2017
+ Copyright (C) 2018
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
@@ -68,29 +68,35 @@ struct zmq_dab_message_t
#define ZMQ_DAB_MESSAGE_T_HEADERSIZE \
(sizeof(uint32_t) + NUM_FRAMES_PER_ZMQ_MESSAGE*sizeof(uint16_t))
+InputZeroMQReader::~InputZeroMQReader()
+{
+ m_running = false;
+ m_zmqcontext.close();
+ if (m_recv_thread.joinable()) {
+ m_recv_thread.join();
+ }
+}
+
int InputZeroMQReader::Open(const string& uri, size_t max_queued_frames)
{
// The URL might start with zmq+tcp://
if (uri.substr(0, 4) == "zmq+") {
- uri_ = uri.substr(4);
+ m_uri = uri.substr(4);
}
else {
- uri_ = uri;
+ m_uri = uri;
}
- workerdata_.uri = uri_;
- workerdata_.max_queued_frames = max_queued_frames;
- // launch receiver thread
- worker_.Start(&workerdata_);
+ m_max_queued_frames = max_queued_frames;
+
+ m_recv_thread = boost::thread(&InputZeroMQReader::RecvProcess, this);
return 0;
}
int InputZeroMQReader::GetNextFrame(void* buffer)
{
- const size_t framesize = 6144;
-
- if (not worker_.is_running()) {
+ if (not m_running) {
return 0;
}
@@ -100,77 +106,100 @@ int InputZeroMQReader::GetNextFrame(void* buffer)
* (4 ETI frames in TM1) and we should make sure that
* we can serve the data required for a full transmission frame.
*/
- if (in_messages_.size() < 4) {
+ if (m_in_messages.size() < 4) {
const size_t prebuffering = 10;
etiLog.log(trace, "ZMQ,wait1");
- in_messages_.wait_and_pop(incoming, prebuffering);
+ m_in_messages.wait_and_pop(incoming, prebuffering);
}
else {
etiLog.log(trace, "ZMQ,wait2");
- in_messages_.wait_and_pop(incoming);
+ m_in_messages.wait_and_pop(incoming);
}
etiLog.log(trace, "ZMQ,pop");
- if (not worker_.is_running()) {
+ if (not m_running) {
throw zmq_input_overflow();
}
- memcpy(buffer, &incoming->front(), framesize);
+
+ const size_t framesize = 6144;
+ if (incoming->empty()) {
+ return 0;
+ }
+ else if (incoming->size() == framesize) {
+ memcpy(buffer, &incoming->front(), framesize);
+ }
+ else {
+ throw logic_error("ZMQ ETI not 6144");
+ }
return framesize;
}
-void InputZeroMQReader::PrintInfo()
+void InputZeroMQReader::PrintInfo() const
{
fprintf(stderr, "Input ZeroMQ:\n");
- fprintf(stderr, " Receiving from %s\n\n", uri_.c_str());
+ fprintf(stderr, " Receiving from %s\n\n", m_uri.c_str());
}
-// ------------- Worker functions
-
-void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata)
+void InputZeroMQReader::RecvProcess()
{
set_thread_name("zmqinput");
- size_t queue_size = 0;
+ m_running = true;
+ size_t queue_size = 0;
bool buffer_full = false;
- zmq::socket_t subscriber(zmqcontext, ZMQ_SUB);
+ zmq::socket_t subscriber(m_zmqcontext, ZMQ_SUB);
// zmq sockets are not thread safe. That's why
// we create it here, and not at object creation.
bool success = true;
try {
- subscriber.connect(workerdata->uri.c_str());
+ subscriber.connect(m_uri.c_str());
}
catch (zmq::error_t& err) {
- etiLog.level(error) << "Failed to connect ZeroMQ socket to '" << workerdata->uri << "': '" << err.what() << "'";
+ etiLog.level(error) << "Failed to connect ZeroMQ socket to '" <<
+ m_uri << "': '" << err.what() << "'";
success = false;
}
if (success) try {
- subscriber.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); // subscribe to all messages
+ // subscribe to all messages
+ subscriber.setsockopt(ZMQ_SUBSCRIBE, NULL, 0);
}
catch (zmq::error_t& err) {
- etiLog.level(error) << "Failed to subscribe ZeroMQ socket to messages: '" << err.what() << "'";
+ etiLog.level(error) << "Failed to subscribe ZeroMQ socket to messages: '" <<
+ err.what() << "'";
success = false;
}
if (success) try {
- while (running)
- {
+ while (m_running) {
zmq::message_t incoming;
+ zmq::pollitem_t items[1];
+ items[0].socket = subscriber;
+ items[0].events = ZMQ_POLLIN;
+ const int zmq_timeout_ms = 100;
+ const int num_events = zmq::poll(items, 1, zmq_timeout_ms);
+ if (num_events == 0) {
+ // timeout is signalled by an empty buffer
+ auto buf = make_shared<vector<uint8_t> >();
+ m_in_messages.push(buf);
+ continue;
+ }
+
subscriber.recv(&incoming);
if (m_to_drop) {
- queue_size = workerdata->in_messages->size();
+ queue_size = m_in_messages.size();
if (queue_size > 4) {
- workerdata->in_messages->notify();
+ m_in_messages.notify();
}
m_to_drop--;
}
- else if (queue_size < workerdata->max_queued_frames) {
+ else if (queue_size < m_max_queued_frames) {
if (buffer_full) {
etiLog.level(info) << "ZeroMQ buffer recovered: " <<
queue_size << " elements";
@@ -214,14 +243,14 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata)
offset += framesize;
- queue_size = workerdata->in_messages->push(buf);
+ queue_size = m_in_messages.push(buf);
etiLog.log(trace, "ZMQ,push %zu", queue_size);
}
}
}
}
else {
- workerdata->in_messages->notify();
+ m_in_messages.notify();
if (!buffer_full) {
etiLog.level(warn) << "ZeroMQ buffer overfull !";
@@ -230,7 +259,7 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata)
throw runtime_error("ZMQ input full");
}
- queue_size = workerdata->in_messages->size();
+ queue_size = m_in_messages.size();
/* Drop three more incoming ETI frames before
* we start accepting them again, to guarantee
@@ -256,21 +285,8 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata)
subscriber.close();
- running = false;
- workerdata->in_messages->notify();
-}
-
-void InputZeroMQWorker::Start(struct InputZeroMQThreadData* workerdata)
-{
- running = true;
- recv_thread = boost::thread(&InputZeroMQWorker::RecvProcess, this, workerdata);
-}
-
-void InputZeroMQWorker::Stop()
-{
- running = false;
- zmqcontext.close();
- recv_thread.join();
+ m_running = false;
+ m_in_messages.notify();
}
#endif
diff --git a/src/Log.cpp b/src/Log.cpp
index 0792fcf..f2219eb 100644
--- a/src/Log.cpp
+++ b/src/Log.cpp
@@ -109,6 +109,17 @@ LogLine Logger::level(log_level_t level)
return LogLine(this, level);
}
+LogToFile::LogToFile(const std::string& filename) : name("FILE")
+{
+ FILE* fd = fopen(filename.c_str(), "a");
+ if (fd == nullptr) {
+ fprintf(stderr, "Cannot open log file !");
+ throw std::runtime_error("Cannot open log file !");
+ }
+
+ log_file.reset(fd);
+}
+
void LogToFile::log(log_level_t level, const std::string& message)
{
if (level != log_level_t::trace) {
@@ -116,9 +127,9 @@ void LogToFile::log(log_level_t level, const std::string& message)
"DEBUG", "INFO", "WARN", "ERROR", "ALERT", "EMERG"};
// fprintf is thread-safe
- fprintf(log_file, SYSLOG_IDENT ": %s: %s\n",
+ fprintf(log_file.get(), SYSLOG_IDENT ": %s: %s\n",
log_level_text[(size_t)level], message.c_str());
- fflush(log_file);
+ fflush(log_file.get());
}
}
@@ -142,31 +153,33 @@ void LogToSyslog::log(log_level_t level, const std::string& message)
}
}
-LogTracer::LogTracer(const string& trace_filename)
+LogTracer::LogTracer(const string& trace_filename) : name("TRACE")
{
- name = "TRACE";
etiLog.level(info) << "Setting up TRACE to " << trace_filename;
- m_trace_file = fopen(trace_filename.c_str(), "a");
- if (m_trace_file == NULL) {
+ FILE* fd = fopen(trace_filename.c_str(), "a");
+ if (fd == nullptr) {
fprintf(stderr, "Cannot open trace file !");
throw std::runtime_error("Cannot open trace file !");
}
+ m_trace_file.reset(fd);
- auto now = chrono::steady_clock::now().time_since_epoch();
- m_trace_micros_startup =
- chrono::duration_cast<chrono::microseconds>(now).count();
+ using namespace std::chrono;
+ auto now = steady_clock::now().time_since_epoch();
+ m_trace_micros_startup = duration_cast<microseconds>(now).count();
- fprintf(m_trace_file, "0,TRACER,startup at %ld\n", m_trace_micros_startup);
+ fprintf(m_trace_file.get(),
+ "0,TRACER,startup at %ld\n", m_trace_micros_startup);
}
void LogTracer::log(log_level_t level, const std::string& message)
{
if (level == log_level_t::trace) {
- const auto now = chrono::steady_clock::now().time_since_epoch();
- const auto micros = chrono::duration_cast<chrono::microseconds>(now).count();
+ using namespace std::chrono;
+ const auto now = steady_clock::now().time_since_epoch();
+ const auto micros = duration_cast<microseconds>(now).count();
- fprintf(m_trace_file, "%ld,%s\n",
+ fprintf(m_trace_file.get(), "%ld,%s\n",
micros - m_trace_micros_startup,
message.c_str());
}
diff --git a/src/Log.h b/src/Log.h
index ae252a6..0e09bc9 100644
--- a/src/Log.h
+++ b/src/Log.h
@@ -57,7 +57,7 @@ static const std::string levels_as_str[] =
class LogBackend {
public:
virtual void log(log_level_t level, const std::string& message) = 0;
- virtual std::string get_name() = 0;
+ virtual std::string get_name() const = 0;
};
/** A Logging backend for Syslog */
@@ -73,7 +73,7 @@ class LogToSyslog : public LogBackend {
void log(log_level_t level, const std::string& message);
- std::string get_name() { return name; }
+ std::string get_name() const { return name; }
private:
const std::string name;
@@ -84,27 +84,15 @@ class LogToSyslog : public LogBackend {
class LogToFile : public LogBackend {
public:
- LogToFile(const std::string& filename) : name("FILE") {
- log_file = fopen(filename.c_str(), "a");
- if (log_file == NULL) {
- fprintf(stderr, "Cannot open log file !");
- throw std::runtime_error("Cannot open log file !");
- }
- }
-
- ~LogToFile() {
- if (log_file != NULL) {
- fclose(log_file);
- }
- }
-
+ LogToFile(const std::string& filename);
void log(log_level_t level, const std::string& message);
-
- std::string get_name() { return name; }
+ std::string get_name() const { return name; }
private:
const std::string name;
- FILE* log_file;
+
+ struct FILEDeleter{ void operator()(FILE* fd){ if(fd) fclose(fd);}};
+ std::unique_ptr<FILE, FILEDeleter> log_file;
LogToFile(const LogToFile& other) = delete;
const LogToFile& operator=(const LogToFile& other) = delete;
@@ -113,19 +101,14 @@ class LogToFile : public LogBackend {
class LogTracer : public LogBackend {
public:
LogTracer(const std::string& filename);
-
- ~LogTracer() {
- if (m_trace_file != NULL) {
- fclose(m_trace_file);
- }
- }
-
void log(log_level_t level, const std::string& message);
- std::string get_name() { return name; }
+ std::string get_name() const { return name; }
private:
std::string name;
- uint64_t m_trace_micros_startup;
- FILE* m_trace_file;
+ uint64_t m_trace_micros_startup = 0;
+
+ struct FILEDeleter{ void operator()(FILE* fd){ if(fd) fclose(fd);}};
+ std::unique_ptr<FILE, FILEDeleter> m_trace_file;
LogTracer(const LogTracer& other) = delete;
const LogTracer& operator=(const LogTracer& other) = delete;
diff --git a/src/MemlessPoly.cpp b/src/MemlessPoly.cpp
index ae097c9..5d1c02d 100644
--- a/src/MemlessPoly.cpp
+++ b/src/MemlessPoly.cpp
@@ -99,6 +99,11 @@ MemlessPoly::MemlessPoly(const std::string& coefs_file, unsigned int num_threads
start_pipeline_thread();
}
+MemlessPoly::~MemlessPoly()
+{
+ stop_pipeline_thread();
+}
+
void MemlessPoly::load_coefficients(const std::string &coefFile)
{
std::ifstream coef_fstream(coefFile.c_str());
diff --git a/src/MemlessPoly.h b/src/MemlessPoly.h
index 612934f..4c67d46 100644
--- a/src/MemlessPoly.h
+++ b/src/MemlessPoly.h
@@ -59,6 +59,9 @@ class MemlessPoly : public PipelinedModCodec, public RemoteControllable
{
public:
MemlessPoly(const std::string& coefs_file, unsigned int num_threads);
+ MemlessPoly(const MemlessPoly& other) = delete;
+ MemlessPoly& operator=(const MemlessPoly& other) = delete;
+ virtual ~MemlessPoly();
virtual const char* name() { return "MemlessPoly"; }
diff --git a/src/ModPlugin.cpp b/src/ModPlugin.cpp
index c39d883..d567a90 100644
--- a/src/ModPlugin.cpp
+++ b/src/ModPlugin.cpp
@@ -72,17 +72,7 @@ int ModOutput::process(
return process(dataIn[0]);
}
-PipelinedModCodec::PipelinedModCodec() :
- ModCodec(),
- m_number_of_runs(0),
- m_input_queue(),
- m_output_queue(),
- m_running(false),
- m_thread()
-{
-}
-
-PipelinedModCodec::~PipelinedModCodec()
+void PipelinedModCodec::stop_pipeline_thread()
{
m_input_queue.push({});
if (m_thread.joinable()) {
@@ -107,7 +97,7 @@ int PipelinedModCodec::process(Buffer* dataIn, Buffer* dataOut)
m_input_queue.push(inbuffer);
- if (m_number_of_runs > 0) {
+ if (m_ready_to_output_data) {
std::shared_ptr<Buffer> outbuffer;
m_output_queue.wait_and_pop(outbuffer);
@@ -116,13 +106,26 @@ int PipelinedModCodec::process(Buffer* dataIn, Buffer* dataOut)
else {
dataOut->setLength(dataIn->getLength());
memset(dataOut->getData(), 0, dataOut->getLength());
- m_number_of_runs++;
+ m_ready_to_output_data = true;
}
return dataOut->getLength();
}
+meta_vec_t PipelinedModCodec::process_metadata(const meta_vec_t& metadataIn)
+{
+ m_metadata_fifo.push_back(metadataIn);
+ if (m_metadata_fifo.size() == 2) {
+ auto r = std::move(m_metadata_fifo.front());
+ m_metadata_fifo.pop_front();
+ return r;
+ }
+ else {
+ return {};
+ }
+}
+
void PipelinedModCodec::process_thread()
{
set_thread_name(name());
diff --git a/src/ModPlugin.h b/src/ModPlugin.h
index d3aa780..e9cfa21 100644
--- a/src/ModPlugin.h
+++ b/src/ModPlugin.h
@@ -30,16 +30,34 @@
# include <config.h>
#endif
-
#include "Buffer.h"
#include "ThreadsafeQueue.h"
-
-#include <sys/types.h>
+#include <cstddef>
#include <vector>
#include <memory>
#include <thread>
#include <atomic>
+// All flowgraph elements derive from ModPlugin, or a variant of it.
+// Some ModPlugins also support handling metadata.
+
+struct frame_timestamp;
+struct flowgraph_metadata {
+ std::shared_ptr<struct frame_timestamp> ts;
+};
+
+using meta_vec_t = std::vector<flowgraph_metadata>;
+
+/* ModPlugins that support metadata derive from ModMetadata */
+class ModMetadata {
+ public:
+ // Receives metadata from all inputs, and process them, and output
+ // a sequence of metadata.
+ virtual meta_vec_t process_metadata(const meta_vec_t& metadataIn) = 0;
+};
+
+
+/* Abstract base class for all flowgraph elements */
class ModPlugin
{
public:
@@ -47,6 +65,7 @@ public:
std::vector<Buffer*> dataIn,
std::vector<Buffer*> dataOut) = 0;
virtual const char* name() = 0;
+ virtual ~ModPlugin() = default;
};
/* Inputs are sources, the output buffers without reading any */
@@ -69,32 +88,38 @@ public:
virtual int process(Buffer* const dataIn, Buffer* dataOut) = 0;
};
-class PipelinedModCodec : public ModCodec
+/* Pipelined ModCodecs run their processing in a separate thread, and
+ * have a one-call-to-process() latency. Because of this latency, they
+ * must also handle the metadata
+ */
+class PipelinedModCodec : public ModCodec, public ModMetadata
{
public:
- PipelinedModCodec();
- PipelinedModCodec(const PipelinedModCodec&) = delete;
- PipelinedModCodec& operator=(const PipelinedModCodec&) = delete;
- PipelinedModCodec(PipelinedModCodec&&) = delete;
- PipelinedModCodec& operator=(PipelinedModCodec&&) = delete;
- ~PipelinedModCodec();
-
virtual int process(Buffer* const dataIn, Buffer* dataOut) final;
virtual const char* name() = 0;
+ virtual meta_vec_t process_metadata(const meta_vec_t& metadataIn) final;
+
protected:
// Once the instance implementing PipelinedModCodec has been constructed,
// it must call start_pipeline_thread()
void start_pipeline_thread(void);
+ // To avoid race conditions on teardown, plugins must call
+ // stop_pipeline_thread in their destructor.
+ void stop_pipeline_thread(void);
+
+ // The real processing must be implemented in internal_process
virtual int internal_process(Buffer* const dataIn, Buffer* dataOut) = 0;
private:
- size_t m_number_of_runs;
+ bool m_ready_to_output_data = false;
ThreadsafeQueue<std::shared_ptr<Buffer> > m_input_queue;
ThreadsafeQueue<std::shared_ptr<Buffer> > m_output_queue;
- std::atomic<bool> m_running;
+ std::deque<meta_vec_t> m_metadata_fifo;
+
+ std::atomic<bool> m_running = ATOMIC_VAR_INIT(false);
std::thread m_thread;
void process_thread(void);
};
@@ -119,3 +144,4 @@ public:
std::vector<Buffer*> dataOut);
virtual int process(Buffer* dataIn) = 0;
};
+
diff --git a/src/OfdmGenerator.cpp b/src/OfdmGenerator.cpp
index b00d66b..57e0e0e 100644
--- a/src/OfdmGenerator.cpp
+++ b/src/OfdmGenerator.cpp
@@ -139,6 +139,14 @@ OfdmGenerator::~OfdmGenerator()
fftwf_destroy_plan(myFftPlan);
}
+ if (myCfrPostClip) {
+ fftwf_free(myCfrPostClip);
+ }
+
+ if (myCfrPostFft) {
+ fftwf_free(myCfrPostFft);
+ }
+
if (myCfrFft) {
fftwf_destroy_plan(myCfrFft);
}
diff --git a/src/OutputFile.cpp b/src/OutputFile.cpp
index 23d5523..46a9ec9 100644
--- a/src/OutputFile.cpp
+++ b/src/OutputFile.cpp
@@ -2,7 +2,7 @@
Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty
the Queen in Right of Canada (Communications Research Center Canada)
- Copyright (C) 2016
+ Copyright (C) 2018
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
@@ -26,47 +26,78 @@
#include "OutputFile.h"
#include "PcDebug.h"
+#include "Log.h"
+#include "TimestampDecoder.h"
#include <string>
#include <assert.h>
#include <stdexcept>
-OutputFile::OutputFile(std::string filename) :
- ModOutput(),
+using namespace std;
+
+OutputFile::OutputFile(const std::string& filename, bool show_metadata) :
+ ModOutput(), ModMetadata(),
+ myShowMetadata(show_metadata),
myFilename(filename)
{
PDEBUG("OutputFile::OutputFile(filename: %s) @ %p\n",
filename.c_str(), this);
- myFile = fopen(filename.c_str(), "w");
- if (myFile == NULL) {
+ FILE* fd = fopen(filename.c_str(), "w");
+ if (fd == nullptr) {
perror(filename.c_str());
throw std::runtime_error(
"OutputFile::OutputFile() unable to open file!");
}
-}
-
-
-OutputFile::~OutputFile()
-{
- PDEBUG("OutputFile::~OutputFile() @ %p\n", this);
-
- if (myFile != NULL) {
- fclose(myFile);
- }
+ myFile.reset(fd);
}
int OutputFile::process(Buffer* dataIn)
{
PDEBUG("OutputFile::process(%p)\n", dataIn);
- assert(dataIn != NULL);
+ assert(dataIn != nullptr);
- if (fwrite(dataIn->getData(), dataIn->getLength(), 1, myFile) == 0) {
+ if (fwrite(dataIn->getData(), dataIn->getLength(), 1, myFile.get()) == 0) {
throw std::runtime_error(
"OutputFile::process() unable to write to file!");
}
return dataIn->getLength();
}
+
+meta_vec_t OutputFile::process_metadata(const meta_vec_t& metadataIn)
+{
+ if (myShowMetadata) {
+ stringstream ss;
+
+ for (const auto& md : metadataIn) {
+ if (md.ts) {
+ ss << " FCT=" << md.ts->fct <<
+ " FP=" << (int)md.ts->fp;
+ if (md.ts->timestamp_valid) {
+ ss << " TS=" << md.ts->timestamp_sec << " + " <<
+ std::fixed
+ << (double)md.ts->timestamp_pps / 163840000.0 << ";";
+ }
+ else {
+ ss << " TS invalid;";
+ }
+ }
+ else {
+ ss << " void, ";
+ }
+ }
+
+ if (metadataIn.empty()) {
+ etiLog.level(debug) << "Output File got no mdIn";
+ }
+ else {
+ etiLog.level(debug) << "Output File got metadata: " << ss.str();
+ }
+
+ }
+ return {};
+}
+
diff --git a/src/OutputFile.h b/src/OutputFile.h
index 7121ef3..745e672 100644
--- a/src/OutputFile.h
+++ b/src/OutputFile.h
@@ -2,7 +2,7 @@
Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty
the Queen in Right of Canada (Communications Research Center Canada)
- Copyright (C) 2016
+ Copyright (C) 2018
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
@@ -32,23 +32,29 @@
#include "ModPlugin.h"
+#include "EtiReader.h"
#include <string>
#include <stdio.h>
#include <sys/types.h>
+#include <memory>
-
-class OutputFile : public ModOutput
+class OutputFile : public ModOutput, public ModMetadata
{
public:
- OutputFile(std::string filename);
- virtual ~OutputFile();
+ OutputFile(const std::string& filename, bool show_metadata);
+
+ virtual int process(Buffer* dataIn) override;
+ const char* name() override { return "OutputFile"; }
- virtual int process(Buffer* dataIn);
- const char* name() { return "OutputFile"; }
+ virtual meta_vec_t process_metadata(
+ const meta_vec_t& metadataIn) override;
protected:
+ bool myShowMetadata = false;
std::string myFilename;
- FILE* myFile;
+
+ struct FILEDeleter{ void operator()(FILE* fd){ if (fd) fclose(fd); }};
+ std::unique_ptr<FILE, FILEDeleter> myFile;
};
diff --git a/src/OutputMemory.cpp b/src/OutputMemory.cpp
index 6e2fd49..5f24095 100644
--- a/src/OutputMemory.cpp
+++ b/src/OutputMemory.cpp
@@ -2,7 +2,7 @@
Copyright (C) 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in
Right of Canada (Communications Research Center Canada)
- Copyright (C) 2016
+ Copyright (C) 2018
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
@@ -26,6 +26,7 @@
#include "OutputMemory.h"
#include "PcDebug.h"
+#include "Log.h"
#include <stdexcept>
#include <string.h>
@@ -94,3 +95,14 @@ int OutputMemory::process(Buffer* dataIn)
return myDataOut->getLength();
}
+meta_vec_t OutputMemory::process_metadata(const meta_vec_t& metadataIn)
+{
+ myMetadata = metadataIn;
+ return {};
+}
+
+meta_vec_t OutputMemory::get_latest_metadata()
+{
+ return myMetadata;
+}
+
diff --git a/src/OutputMemory.h b/src/OutputMemory.h
index 715cb2d..f0a5fbb 100644
--- a/src/OutputMemory.h
+++ b/src/OutputMemory.h
@@ -2,7 +2,7 @@
Copyright (C) 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in
Right of Canada (Communications Research Center Canada)
- Copyright (C) 2016
+ Copyright (C) 2018
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
@@ -46,18 +46,26 @@
#include "ModPlugin.h"
-class OutputMemory : public ModOutput
+class OutputMemory : public ModOutput, public ModMetadata
{
public:
OutputMemory(Buffer* dataOut);
virtual ~OutputMemory();
- virtual int process(Buffer* dataIn);
- const char* name() { return "OutputMemory"; }
+ OutputMemory(OutputMemory& other) = delete;
+ OutputMemory& operator=(OutputMemory& other) = delete;
+
+ virtual int process(Buffer* dataIn) override;
+ const char* name() override { return "OutputMemory"; }
+ virtual meta_vec_t process_metadata(
+ const meta_vec_t& metadataIn) override;
+
+ meta_vec_t get_latest_metadata(void);
void setOutput(Buffer* dataOut);
protected:
Buffer* myDataOut;
+ meta_vec_t myMetadata;
#if OUTPUT_MEM_HISTOGRAM
// keep track of max value
diff --git a/src/OutputSoapy.cpp b/src/OutputSoapy.cpp
deleted file mode 100644
index 699501c..0000000
--- a/src/OutputSoapy.cpp
+++ /dev/null
@@ -1,287 +0,0 @@
-/*
- Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Her Majesty the
- Queen in Right of Canada (Communications Research Center Canada)
-
- Copyright (C) 2017
- Matthias P. Braendli, matthias.braendli@mpb.li
-
- http://opendigitalradio.org
-
-DESCRIPTION:
- It is an output driver using the SoapySDR library that can output to
- many devices.
-*/
-
-/*
- This file is part of ODR-DabMod.
-
- ODR-DabMod is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMod is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with ODR-DabMod. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#include "OutputSoapy.h"
-#ifdef HAVE_SOAPYSDR
-
-#include <SoapySDR/Errors.hpp>
-#include <deque>
-#include <chrono>
-
-#include "Log.h"
-#include "Utils.h"
-
-#include <stdio.h>
-
-static const size_t FRAMES_MAX_SIZE = 2;
-
-
-using namespace std;
-
-
-
-OutputSoapy::OutputSoapy(OutputSoapyConfig& config) :
- ModOutput(),
- RemoteControllable("soapy"),
- m_conf(config),
- m_device(nullptr)
-{
- RC_ADD_PARAMETER(txgain, "SoapySDR analog daughterboard TX gain");
- RC_ADD_PARAMETER(freq, "SoapySDR transmission frequency");
- RC_ADD_PARAMETER(overflows, "SoapySDR overflow count [r/o]");
- RC_ADD_PARAMETER(underflows, "SoapySDR underflow count [r/o]");
-
- etiLog.level(info) <<
- "OutputSoapy:Creating the device with: " <<
- config.device;
- try
- {
- m_device = SoapySDR::Device::make(config.device);
- stringstream ss;
- ss << "SoapySDR driver=" << m_device->getDriverKey();
- ss << " hardware=" << m_device->getHardwareKey();
- for (const auto &it : m_device->getHardwareInfo())
- {
- ss << " " << it.first << "=" << it.second;
- }
- }
- catch (const std::exception &ex)
- {
- etiLog.level(error) << "Error making SoapySDR device: " <<
- ex.what();
- throw std::runtime_error("Cannot create SoapySDR output");
- }
-
- m_device->setMasterClockRate(config.masterClockRate);
- etiLog.level(info) << "SoapySDR master clock rate set to " <<
- m_device->getMasterClockRate()/1000.0 << " kHz";
-
- m_device->setSampleRate(SOAPY_SDR_TX, 0, m_conf.sampleRate);
- etiLog.level(info) << "OutputSoapySDR:Actual TX rate: " <<
- m_device->getSampleRate(SOAPY_SDR_TX, 0) / 1000.0 <<
- " ksps.";
-
- m_device->setFrequency(SOAPY_SDR_TX, 0, m_conf.frequency);
- m_conf.frequency = m_device->getFrequency(SOAPY_SDR_TX, 0);
- etiLog.level(info) << "OutputSoapySDR:Actual frequency: " <<
- m_conf.frequency / 1000.0 <<
- " kHz.";
-
- m_device->setGain(SOAPY_SDR_TX, 0, m_conf.txgain);
- etiLog.level(info) << "OutputSoapySDR:Actual tx gain: " <<
- m_device->getGain(SOAPY_SDR_TX, 0);
-
-}
-
-OutputSoapy::~OutputSoapy()
-{
- m_worker.stop();
- if (m_device != nullptr) {
- SoapySDR::Device::unmake(m_device);
- }
-}
-
-void SoapyWorker::stop()
-{
- running = false;
- queue.push({});
- if (m_thread.joinable()) {
- m_thread.join();
- }
-}
-
-void SoapyWorker::start(SoapySDR::Device *device)
-{
- m_device = device;
- underflows = 0;
- overflows = 0;
- running = true;
- m_thread = std::thread(&SoapyWorker::process_start, this);
-}
-
-void SoapyWorker::process_start()
-{
- // Set thread priority to realtime
- if (int ret = set_realtime_prio(1)) {
- etiLog.level(error) << "Could not set priority for SoapySDR worker:" << ret;
- }
-
- set_thread_name("soapyworker");
-
- std::vector<size_t> channels;
- channels.push_back(0);
- auto stream = m_device->setupStream(SOAPY_SDR_TX, "CF32", channels);
- m_device->activateStream(stream);
- process(stream);
- m_device->closeStream(stream);
- running = false;
- etiLog.level(warn) << "SoapySDR worker terminated";
-}
-
-void SoapyWorker::process(SoapySDR::Stream *stream)
-{
- while (running) {
- struct SoapyWorkerFrameData frame;
- queue.wait_and_pop(frame);
-
- // The frame buffer contains bytes representing FC32 samples
- const complexf *buf = reinterpret_cast<complexf*>(frame.buf.data());
- const size_t numSamples = frame.buf.size() / sizeof(complexf);
- if ((frame.buf.size() % sizeof(complexf)) != 0) {
- throw std::runtime_error("OutputSoapy: invalid buffer size");
- }
-
- // Stream MTU is in samples, not bytes.
- const size_t mtu = m_device->getStreamMTU(stream);
-
- size_t num_acc_samps = 0;
- while (running && (num_acc_samps < numSamples)) {
- const void *buffs[1];
- buffs[0] = buf + num_acc_samps;
-
- const size_t samps_to_send = std::min(numSamples - num_acc_samps, mtu);
-
- int flags = 0;
-
- auto ret = m_device->writeStream(stream, buffs, samps_to_send, flags);
-
- if (ret == SOAPY_SDR_TIMEOUT) {
- continue;
- }
- else if (ret == SOAPY_SDR_OVERFLOW) {
- overflows++;
- continue;
- }
- else if (ret == SOAPY_SDR_UNDERFLOW) {
- underflows++;
- continue;
- }
-
- if (ret < 0) {
- etiLog.level(error) << "Unexpected stream error " <<
- SoapySDR::errToStr(ret);
- running = false;
- }
-
- num_acc_samps += ret;
- }
- }
-}
-
-int OutputSoapy::process(Buffer* dataIn)
-{
- if (first_run) {
- m_worker.start(m_device);
- first_run = false;
- }
- else if (!m_worker.running) {
- etiLog.level(error) << "OutputSoapy: worker thread died";
- throw std::runtime_error("Fault in OutputSoapy");
- }
-
- SoapyWorkerFrameData frame;
- m_eti_source->calculateTimestamp(frame.ts);
-
-
- if (frame.ts.fct == -1) {
- etiLog.level(info) <<
- "OutputSoapy: dropping one frame with invalid FCT";
- }
- else {
- const uint8_t* pInData = reinterpret_cast<uint8_t*>(dataIn->getData());
- frame.buf.resize(dataIn->getLength());
- std::copy(pInData, pInData + dataIn->getLength(),
- frame.buf.begin());
- m_worker.queue.push_wait_if_full(frame, FRAMES_MAX_SIZE);
- }
-
- return dataIn->getLength();
-}
-
-
-void OutputSoapy::setETISource(EtiSource *etiSource)
-{
- m_eti_source = etiSource;
-}
-
-void OutputSoapy::set_parameter(const string& parameter, const string& value)
-{
- stringstream ss(value);
- ss.exceptions ( stringstream::failbit | stringstream::badbit );
-
- if (parameter == "txgain") {
- ss >> m_conf.txgain;
- m_device->setGain(SOAPY_SDR_TX, 0, m_conf.txgain);
- }
- else if (parameter == "freq") {
- ss >> m_conf.frequency;
- m_device->setFrequency(SOAPY_SDR_TX, 0, m_conf.frequency);
- m_conf.frequency = m_device->getFrequency(SOAPY_SDR_TX, 0);
- }
- else if (parameter == "underflows") {
- throw ParameterError("Parameter 'underflows' is read-only");
- }
- else if (parameter == "overflows") {
- throw ParameterError("Parameter 'overflows' is read-only");
- }
- else {
- stringstream ss_err;
- ss_err << "Parameter '" << parameter
- << "' is not exported by controllable " << get_rc_name();
- throw ParameterError(ss_err.str());
- }
-}
-
-const string OutputSoapy::get_parameter(const string& parameter) const
-{
- stringstream ss;
- if (parameter == "txgain") {
- ss << m_conf.txgain;
- }
- else if (parameter == "freq") {
- ss << m_conf.frequency;
- }
- else if (parameter == "underflows") {
- ss << m_worker.underflows;
- }
- else if (parameter == "overflows") {
- ss << m_worker.overflows;
- }
- else {
- ss << "Parameter '" << parameter <<
- "' is not exported by controllable " << get_rc_name();
- throw ParameterError(ss.str());
- }
- return ss.str();
-}
-
-#endif // HAVE_SOAPYSDR
-
diff --git a/src/OutputSoapy.h b/src/OutputSoapy.h
deleted file mode 100644
index 230f11b..0000000
--- a/src/OutputSoapy.h
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Her Majesty the
- Queen in Right of Canada (Communications Research Center Canada)
-
- Copyright (C) 2017
- Matthias P. Braendli, matthias.braendli@mpb.li
-
- http://opendigitalradio.org
-
-DESCRIPTION:
- It is an output driver using the SoapySDR library that can output to
- many devices.
-*/
-
-/*
- This file is part of ODR-DabMod.
-
- ODR-DabMod is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMod is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with ODR-DabMod. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#pragma once
-
-#ifdef HAVE_CONFIG_H
-# include <config.h>
-#endif
-
-#ifdef HAVE_SOAPYSDR
-#include <SoapySDR/Version.hpp>
-#include <SoapySDR/Modules.hpp>
-#include <SoapySDR/Registry.hpp>
-#include <SoapySDR/Device.hpp>
-
-#include <string>
-#include <memory>
-
-#include "ModPlugin.h"
-#include "EtiReader.h"
-#include "RemoteControl.h"
-#include "ThreadsafeQueue.h"
-
-typedef std::complex<float> complexf;
-
-/* This structure is used as initial configuration for the Soapy output.
- * It must also contain all remote-controllable settings, otherwise
- * they will get lost on a modulator restart. */
-struct OutputSoapyConfig {
- std::string device;
-
- long masterClockRate = 32768000;
- unsigned sampleRate = 2048000;
- double frequency = 0.0;
- double txgain = 0.0;
- unsigned dabMode = 0;
-};
-
-// Each frame contains one OFDM frame, and its
-// associated timestamp
-struct SoapyWorkerFrameData {
- // Buffer holding frame data
- std::vector<uint8_t> buf;
-
- // A full timestamp contains a TIST according to standard
- // and time information within MNSC with tx_second.
- struct frame_timestamp ts;
-};
-
-class SoapyWorker
-{
- public:
- ThreadsafeQueue<SoapyWorkerFrameData> queue;
- SoapySDR::Device *m_device;
- std::atomic<bool> running;
- size_t underflows;
- size_t overflows;
-
- SoapyWorker() {}
- SoapyWorker(const SoapyWorker&) = delete;
- SoapyWorker operator=(const SoapyWorker&) = delete;
- ~SoapyWorker() { stop(); }
-
- void start(SoapySDR::Device *device);
- void stop(void);
-
- private:
- std::thread m_thread;
-
- void process_start(void);
- void process(SoapySDR::Stream *stream);
-};
-
-class OutputSoapy: public ModOutput, public RemoteControllable
-{
- public:
- OutputSoapy(OutputSoapyConfig& config);
- OutputSoapy(const OutputSoapy& other) = delete;
- OutputSoapy& operator=(const OutputSoapy& other) = delete;
- ~OutputSoapy();
-
- int process(Buffer* dataIn);
-
- const char* name() { return "OutputSoapy"; }
-
- void setETISource(EtiSource *etiSource);
-
- /*********** REMOTE CONTROL ***************/
-
- /* Base function to set parameters. */
- virtual void set_parameter(const std::string& parameter,
- const std::string& value);
-
- /* Getting a parameter always returns a string. */
- virtual const std::string get_parameter(
- const std::string& parameter) const;
-
-
- protected:
- SoapyWorker m_worker;
- EtiSource *m_eti_source;
- OutputSoapyConfig& m_conf;
-
- SoapySDR::Device *m_device;
-
- bool first_run = true;
-};
-
-
-#endif //HAVE_SOAPYSDR
diff --git a/src/OutputUHD.cpp b/src/OutputUHD.cpp
deleted file mode 100644
index b533075..0000000
--- a/src/OutputUHD.cpp
+++ /dev/null
@@ -1,1035 +0,0 @@
-/*
- Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Her Majesty the
- Queen in Right of Canada (Communications Research Center Canada)
-
- Copyright (C) 2017
- Matthias P. Braendli, matthias.braendli@mpb.li
-
- http://opendigitalradio.org
- */
-/*
- This file is part of ODR-DabMod.
-
- ODR-DabMod is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMod is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with ODR-DabMod. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#include "OutputUHD.h"
-
-#ifdef HAVE_OUTPUT_UHD
-
-#include "PcDebug.h"
-#include "Log.h"
-#include "RemoteControl.h"
-#include "Utils.h"
-
-#include <boost/thread/future.hpp>
-
-#include <uhd/utils/msg.hpp>
-
-#include <cmath>
-#include <iostream>
-#include <assert.h>
-#include <stdexcept>
-#include <stdio.h>
-#include <time.h>
-#include <errno.h>
-#include <unistd.h>
-#include <pthread.h>
-
-using namespace std;
-
-// Maximum number of frames that can wait in frames
-static const size_t FRAMES_MAX_SIZE = 8;
-
-typedef std::complex<float> complexf;
-
-std::string stringtrim(const std::string &s)
-{
- auto wsfront = std::find_if_not(s.begin(), s.end(),
- [](int c){ return std::isspace(c);} );
- return std::string(wsfront,
- std::find_if_not(s.rbegin(),
- std::string::const_reverse_iterator(wsfront),
- [](int c){ return std::isspace(c);} ).base());
-}
-
-void uhd_msg_handler(uhd::msg::type_t type, const std::string &msg)
-{
- if (type == uhd::msg::warning) {
- etiLog.level(warn) << "UHD Warning: " << msg;
- }
- else if (type == uhd::msg::error) {
- etiLog.level(error) << "UHD Error: " << msg;
- }
- else {
- // do not print very short U messages and such
- if (stringtrim(msg).size() != 1) {
- etiLog.level(debug) << "UHD Message: " << msg;
- }
- }
-}
-
-static void tune_usrp_to(
- uhd::usrp::multi_usrp::sptr usrp,
- double lo_offset,
- double frequency)
-{
- if (lo_offset != 0.0) {
- etiLog.level(info) << std::fixed << std::setprecision(3) <<
- "OutputUHD:Setting freq to " << frequency <<
- " with LO offset " << lo_offset << "...";
-
- const auto tr = uhd::tune_request_t(frequency, lo_offset);
- uhd::tune_result_t result = usrp->set_tx_freq(tr);
-
- etiLog.level(debug) << "OutputUHD:" <<
- std::fixed << std::setprecision(0) <<
- " Target RF: " << result.target_rf_freq <<
- " Actual RF: " << result.actual_rf_freq <<
- " Target DSP: " << result.target_dsp_freq <<
- " Actual DSP: " << result.actual_dsp_freq;
- }
- else {
- //set the centre frequency
- etiLog.level(info) << std::fixed << std::setprecision(3) <<
- "OutputUHD:Setting freq to " << frequency << "...";
- usrp->set_tx_freq(frequency);
- }
-
- usrp->set_rx_freq(frequency);
-}
-
-// Check function for GPS TIMELOCK sensor from the ODR LEA-M8F board GPSDO
-bool check_gps_timelock(uhd::usrp::multi_usrp::sptr usrp)
-{
- try {
- std::string sensor_value(
- usrp->get_mboard_sensor("gps_timelock", 0).to_pp_string());
-
- if (sensor_value.find("TIME LOCKED") == std::string::npos) {
- etiLog.level(warn) << "OutputUHD: gps_timelock " << sensor_value;
- return false;
- }
-
- return true;
- }
- catch (uhd::lookup_error &e) {
- etiLog.level(warn) << "OutputUHD: no gps_timelock sensor";
- return false;
- }
-}
-
-// Check function for GPS LOCKED sensor from the Ettus GPSDO
-bool check_gps_locked(uhd::usrp::multi_usrp::sptr usrp)
-{
- try {
- uhd::sensor_value_t sensor_value(
- usrp->get_mboard_sensor("gps_locked", 0));
- if (not sensor_value.to_bool()) {
- etiLog.level(warn) << "OutputUHD: gps_locked " <<
- sensor_value.to_pp_string();
- return false;
- }
-
- return true;
- }
- catch (uhd::lookup_error &e) {
- etiLog.level(warn) << "OutputUHD: no gps_locked sensor";
- return false;
- }
-}
-
-
-OutputUHD::OutputUHD(
- OutputUHDConfig& config) :
- ModOutput(),
- RemoteControllable("uhd"),
- myConf(config),
- // Since we don't know the buffer size, we cannot initialise
- // the buffers at object initialisation.
- myDelayBuf(0),
- running(false)
-{
- myConf.muting = true; // is remote-controllable, and reset by the GPS fix check
- myConf.staticDelayUs = 0; // is remote-controllable
-
- // Variables needed for GPS fix check
- first_gps_fix_check.tv_sec = 0;
- last_gps_fix_check.tv_sec = 0;
- time_last_frame.tv_sec = 0;
-
-
- std::stringstream device;
- device << myConf.device;
-
- if (myConf.masterClockRate != 0) {
- if (device.str() != "") {
- device << ",";
- }
- device << "master_clock_rate=" << myConf.masterClockRate;
- }
-
- if (myConf.usrpType != "") {
- if (device.str() != "") {
- device << ",";
- }
- device << "type=" << myConf.usrpType;
- }
-
- MDEBUG("OutputUHD::OutputUHD(device: %s) @ %p\n",
- device.str().c_str(), this);
-
- /* register the parameters that can be remote controlled */
- RC_ADD_PARAMETER(txgain, "UHD analog daughterboard TX gain");
- RC_ADD_PARAMETER(rxgain, "UHD analog daughterboard RX gain for DPD feedback");
- RC_ADD_PARAMETER(freq, "UHD transmission frequency");
- RC_ADD_PARAMETER(muting, "Mute the output by stopping the transmitter");
- RC_ADD_PARAMETER(staticdelay, "Set static delay (uS) between 0 and 96000");
- RC_ADD_PARAMETER(underruns, "Read-only counter of number of underruns");
- RC_ADD_PARAMETER(latepackets, "Read-only counter of number of late packets");
- RC_ADD_PARAMETER(frames, "Read-only counter of number of frames modulated");
-
- uhd::msg::register_handler(uhd_msg_handler);
-
- uhd::set_thread_priority_safe();
-
- etiLog.log(info, "OutputUHD:Creating the usrp device with: %s...",
- device.str().c_str());
-
- myUsrp = uhd::usrp::multi_usrp::make(device.str());
-
- etiLog.log(info, "OutputUHD:Using device: %s...",
- myUsrp->get_pp_string().c_str());
-
- if (myConf.masterClockRate != 0.0) {
- double master_clk_rate = myUsrp->get_master_clock_rate();
- etiLog.log(debug, "OutputUHD:Checking master clock rate: %f...",
- master_clk_rate);
-
- if (fabs(master_clk_rate - myConf.masterClockRate) >
- (myConf.masterClockRate * 1e-6)) {
- throw std::runtime_error("Cannot set USRP master_clock_rate. Aborted.");
- }
- }
-
- MDEBUG("OutputUHD:Setting REFCLK and PPS input...\n");
-
- if (myConf.refclk_src == "gpsdo-ettus") {
- myUsrp->set_clock_source("gpsdo");
- }
- else {
- myUsrp->set_clock_source(myConf.refclk_src);
- }
- myUsrp->set_time_source(myConf.pps_src);
-
- if (myConf.subDevice != "") {
- myUsrp->set_tx_subdev_spec(uhd::usrp::subdev_spec_t(myConf.subDevice),
- uhd::usrp::multi_usrp::ALL_MBOARDS);
- }
-
- etiLog.level(debug) << "UHD clock source is " << myUsrp->get_clock_source(0);
-
- etiLog.level(debug) << "UHD time source is " << myUsrp->get_time_source(0);
-
- myUsrp->set_tx_rate(myConf.sampleRate);
- etiLog.log(debug, "OutputUHD:Set rate to %d. Actual TX Rate: %f sps...",
- myConf.sampleRate, myUsrp->get_tx_rate());
-
- if (fabs(myUsrp->get_tx_rate() / myConf.sampleRate) >
- myConf.sampleRate * 1e-6) {
- throw std::runtime_error("Cannot set USRP sample rate. Aborted.");
- }
-
- tune_usrp_to(myUsrp, myConf.lo_offset, myConf.frequency);
-
- myConf.frequency = myUsrp->get_tx_freq();
- etiLog.level(info) << std::fixed << std::setprecision(3) <<
- "OutputUHD:Actual TX frequency: " << myConf.frequency;
-
- etiLog.level(info) << std::fixed << std::setprecision(3) <<
- "OutputUHD:Actual RX frequency: " << myUsrp->get_tx_freq();
-
- myUsrp->set_tx_gain(myConf.txgain);
- etiLog.log(debug, "OutputUHD:Actual TX Gain: %f", myUsrp->get_tx_gain());
-
- etiLog.log(debug, "OutputUHD:Mute on missing timestamps: %s",
- myConf.muteNoTimestamps ? "enabled" : "disabled");
-
- // preparing output thread worker data
- sync_and_ts_valid = false;
-
- SetDelayBuffer(myConf.dabMode);
-
- myUsrp->set_rx_rate(myConf.sampleRate);
- etiLog.log(debug, "OutputUHD:Actual RX Rate: %f sps.", myUsrp->get_rx_rate());
-
- myUsrp->set_rx_antenna("RX2");
- etiLog.log(debug, "OutputUHD:Set RX Antenna: %s",
- myUsrp->get_rx_antenna().c_str());
-
- myUsrp->set_rx_gain(myConf.rxgain);
- etiLog.log(debug, "OutputUHD:Actual RX Gain: %f", myUsrp->get_rx_gain());
-
- uhdFeedback = std::make_shared<OutputUHDFeedback>(
- myUsrp, myConf.dpdFeedbackServerPort, myConf.sampleRate);
-
- MDEBUG("OutputUHD:UHD ready.\n");
-}
-
-bool OutputUHD::refclk_loss_needs_check() const
-{
- if (suppress_refclk_loss_check) {
- return false;
- }
- return myConf.refclk_src != "internal";
-}
-
-bool OutputUHD::gpsfix_needs_check() const
-{
- if (myConf.refclk_src == "internal") {
- return false;
- }
- else if (myConf.refclk_src == "gpsdo") {
- return (myConf.maxGPSHoldoverTime != 0);
- }
- else if (myConf.refclk_src == "gpsdo-ettus") {
- return (myConf.maxGPSHoldoverTime != 0);
- }
- else {
- return false;
- }
-}
-
-bool OutputUHD::gpsdo_is_ettus() const
-{
- return (myConf.refclk_src == "gpsdo-ettus");
-}
-
-OutputUHD::~OutputUHD()
-{
- stop_threads();
-}
-
-void OutputUHD::stop_threads()
-{
- running.store(false);
- uhd_thread.interrupt();
- uhd_thread.join();
- async_rx_thread.join();
-}
-
-
-void OutputUHD::setETISource(EtiSource *etiSource)
-{
- myEtiSource = etiSource;
-}
-
-int transmission_frame_duration_ms(unsigned int dabMode)
-{
- switch (dabMode) {
- // could happen when called from constructor and we take the mode from ETI
- case 0: return 0;
-
- case 1: return 96;
- case 2: return 24;
- case 3: return 24;
- case 4: return 48;
- default:
- throw std::runtime_error("OutputUHD: invalid DAB mode");
- }
-}
-
-void OutputUHD::SetDelayBuffer(unsigned int dabMode)
-{
- // find out the duration of the transmission frame (Table 2 in ETSI 300 401)
- myTFDurationMs = transmission_frame_duration_ms(dabMode);
-
- // The buffer size equals the number of samples per transmission frame so
- // we calculate it by multiplying the duration of the transmission frame
- // with the samplerate.
- myDelayBuf.resize(myTFDurationMs * myConf.sampleRate / 1000);
-}
-
-int OutputUHD::process(Buffer* dataIn)
-{
- if (not gps_fix_verified) {
- if (gpsfix_needs_check()) {
- initial_gps_check();
-
- if (num_checks_without_gps_fix == 0) {
- set_usrp_time();
- gps_fix_verified = true;
- myConf.muting = false;
- }
- }
- else {
- set_usrp_time();
- gps_fix_verified = true;
- myConf.muting = false;
- }
- }
- else {
- if (first_run) {
- etiLog.level(debug) << "OutputUHD: UHD initialising...";
-
- // we only set the delay buffer from the dab mode signaled in ETI if the
- // dab mode was not set in contructor
- if (myTFDurationMs == 0) {
- SetDelayBuffer(myEtiSource->getMode());
- }
-
- running.store(true);
- uhd_thread = boost::thread(&OutputUHD::workerthread, this);
- async_rx_thread = boost::thread(
- &OutputUHD::print_async_thread, this);
-
- lastLen = dataIn->getLength();
- first_run = false;
- etiLog.level(debug) << "OutputUHD: UHD initialising complete";
- }
-
- if (lastLen != dataIn->getLength()) {
- // I expect that this never happens.
- etiLog.level(emerg) <<
- "OutputUHD: Fatal error, input length changed from " << lastLen <<
- " to " << dataIn->getLength();
- throw std::runtime_error("Non-constant input length!");
- }
-
- sync_and_ts_valid = myConf.enableSync and
- myEtiSource->sourceContainsTimestamp();
-
- if (gpsfix_needs_check()) {
- try {
- check_gps();
- }
- catch (std::runtime_error& e) {
- running.store(false);
- etiLog.level(error) << e.what();
- }
- }
-
- // Prepare the frame for the worker
- UHDWorkerFrameData frame;
- frame.buf.resize(dataIn->getLength());
-
- // calculate delay and fill buffer
- uint32_t noSampleDelay = (myConf.staticDelayUs * (myConf.sampleRate / 1000)) / 1000;
- uint32_t noByteDelay = noSampleDelay * sizeof(complexf);
-
- const uint8_t* pInData = (uint8_t*)dataIn->getData();
-
- uint8_t *pTmp = &frame.buf[0];
- if (noByteDelay) {
- // copy remain from delaybuf
- memcpy(pTmp, &myDelayBuf[0], noByteDelay);
- // copy new data
- memcpy(&pTmp[noByteDelay], pInData, dataIn->getLength() - noByteDelay);
- // copy remaining data to delay buf
- memcpy(&myDelayBuf[0], &pInData[dataIn->getLength() - noByteDelay], noByteDelay);
- }
- else {
- std::copy(pInData, pInData + dataIn->getLength(),
- frame.buf.begin());
- }
-
- myEtiSource->calculateTimestamp(frame.ts);
-
- if (not running.load()) {
- uhd_thread.interrupt();
- uhd_thread.join();
- async_rx_thread.join();
- first_run = true;
-
- etiLog.level(error) << "OutputUHD UHD worker failed";
- throw std::runtime_error("UHD worker failed");
- }
-
- if (frame.ts.fct == -1) {
- etiLog.level(info) <<
- "OutputUHD: dropping one frame with invalid FCT";
- }
- else {
- try {
- uhdFeedback->set_tx_frame(frame.buf, frame.ts);
- }
- catch (const runtime_error& e) {
- etiLog.level(warn) <<
- "OutputUHD: Feedback server failed, restarting...";
-
- uhdFeedback = std::make_shared<OutputUHDFeedback>(
- myUsrp, myConf.dpdFeedbackServerPort, myConf.sampleRate);
- }
-
- size_t num_frames = m_frames.push_wait_if_full(frame,
- FRAMES_MAX_SIZE);
- etiLog.log(trace, "UHD,push %zu", num_frames);
- }
- }
-
- return dataIn->getLength();
-}
-
-
-void OutputUHD::set_usrp_time()
-{
- if (myConf.enableSync and (myConf.pps_src == "none")) {
- etiLog.level(warn) <<
- "OutputUHD: WARNING:"
- " you are using synchronous transmission without PPS input!";
-
- struct timespec now;
- if (clock_gettime(CLOCK_REALTIME, &now)) {
- perror("OutputUHD:Error: could not get time: ");
- etiLog.level(error) << "OutputUHD: could not get time";
- }
- else {
- myUsrp->set_time_now(uhd::time_spec_t(now.tv_sec));
- etiLog.level(info) << "OutputUHD: Setting USRP time to " <<
- std::fixed <<
- uhd::time_spec_t(now.tv_sec).get_real_secs();
- }
- }
-
- if (myConf.pps_src != "none") {
- /* handling time for synchronisation: wait until the next full
- * second, and set the USRP time at next PPS */
- struct timespec now;
- time_t seconds;
- if (clock_gettime(CLOCK_REALTIME, &now)) {
- etiLog.level(error) << "OutputUHD: could not get time :" <<
- strerror(errno);
- throw std::runtime_error("OutputUHD: could not get time.");
- }
- else {
- seconds = now.tv_sec;
-
- MDEBUG("OutputUHD:sec+1: %ld ; now: %ld ...\n", seconds+1, now.tv_sec);
- while (seconds + 1 > now.tv_sec) {
- usleep(1);
- if (clock_gettime(CLOCK_REALTIME, &now)) {
- etiLog.level(error) << "OutputUHD: could not get time :" <<
- strerror(errno);
- throw std::runtime_error("OutputUHD: could not get time.");
- }
- }
- MDEBUG("OutputUHD:sec+1: %ld ; now: %ld ...\n", seconds+1, now.tv_sec);
- /* We are now shortly after the second change. */
-
- usleep(200000); // 200ms, we want the PPS to be later
- myUsrp->set_time_unknown_pps(uhd::time_spec_t(seconds + 2));
- etiLog.level(info) << "OutputUHD: Setting USRP time next pps to " <<
- std::fixed <<
- uhd::time_spec_t(seconds + 2).get_real_secs();
- }
-
- usleep(1e6);
- etiLog.log(info, "OutputUHD: USRP time %f\n",
- myUsrp->get_time_now().get_real_secs());
- }
-}
-
-void OutputUHD::initial_gps_check()
-{
- if (first_gps_fix_check.tv_sec == 0) {
- etiLog.level(info) << "Waiting for GPS fix";
-
- if (clock_gettime(CLOCK_MONOTONIC, &first_gps_fix_check) != 0) {
- stringstream ss;
- ss << "clock_gettime failure: " << strerror(errno);
- throw std::runtime_error(ss.str());
- }
- }
-
- check_gps();
-
- if (last_gps_fix_check.tv_sec >
- first_gps_fix_check.tv_sec + initial_gps_fix_wait) {
- stringstream ss;
- ss << "GPS did not show time lock in " << initial_gps_fix_wait << " seconds";
- throw std::runtime_error(ss.str());
- }
-
- if (time_last_frame.tv_sec == 0) {
- if (clock_gettime(CLOCK_MONOTONIC, &time_last_frame) != 0) {
- stringstream ss;
- ss << "clock_gettime failure: " << strerror(errno);
- throw std::runtime_error(ss.str());
- }
- }
-
- struct timespec now;
- if (clock_gettime(CLOCK_MONOTONIC, &now) != 0) {
- stringstream ss;
- ss << "clock_gettime failure: " << strerror(errno);
- throw std::runtime_error(ss.str());
- }
-
- long delta_us = timespecdiff_us(time_last_frame, now);
- long wait_time_us = transmission_frame_duration_ms(myConf.dabMode);
-
- if (wait_time_us - delta_us > 0) {
- usleep(wait_time_us - delta_us);
- }
-
- time_last_frame.tv_nsec += wait_time_us * 1000;
- if (time_last_frame.tv_nsec >= 1000000000L) {
- time_last_frame.tv_nsec -= 1000000000L;
- time_last_frame.tv_sec++;
- }
-}
-
-void OutputUHD::check_gps()
-{
- struct timespec time_now;
- if (clock_gettime(CLOCK_MONOTONIC, &time_now) != 0) {
- stringstream ss;
- ss << "clock_gettime failure: " << strerror(errno);
- throw std::runtime_error(ss.str());
- }
-
- // Divide interval by two because we alternate between
- // launch and check
- if (gpsfix_needs_check() and
- last_gps_fix_check.tv_sec + gps_fix_check_interval/2.0 <
- time_now.tv_sec) {
- last_gps_fix_check = time_now;
-
- // Alternate between launching thread and checking the
- // result.
- if (gps_fix_task.joinable()) {
- if (gps_fix_future.has_value()) {
-
- gps_fix_future.wait();
-
- gps_fix_task.join();
-
- if (not gps_fix_future.get()) {
- if (num_checks_without_gps_fix == 0) {
- etiLog.level(alert) <<
- "OutputUHD: GPS Time Lock lost";
- }
- num_checks_without_gps_fix++;
- }
- else {
- if (num_checks_without_gps_fix) {
- etiLog.level(info) <<
- "OutputUHD: GPS Time Lock recovered";
- }
- num_checks_without_gps_fix = 0;
- }
-
- if (gps_fix_check_interval * num_checks_without_gps_fix >
- myConf.maxGPSHoldoverTime) {
- std::stringstream ss;
- ss << "Lost GPS Time Lock for " << gps_fix_check_interval *
- num_checks_without_gps_fix << " seconds";
- throw std::runtime_error(ss.str());
- }
- }
- }
- else {
- // Checking the sensor here takes too much
- // time, it has to be done in a separate thread.
- if (gpsdo_is_ettus()) {
- gps_fix_pt = boost::packaged_task<bool>(
- boost::bind(check_gps_locked, myUsrp) );
- }
- else {
- gps_fix_pt = boost::packaged_task<bool>(
- boost::bind(check_gps_timelock, myUsrp) );
- }
- gps_fix_future = gps_fix_pt.get_future();
-
- gps_fix_task = boost::thread(boost::move(gps_fix_pt));
- }
- }
-}
-
-void OutputUHD::workerthread()
-{
- // Set thread priority to realtime
- if (int ret = set_realtime_prio(1)) {
- etiLog.level(error) << "Could not set priority for UHD worker:" << ret;
- }
-
- set_thread_name("uhdworker");
-
- last_tx_time_initialised = false;
-
- uhd::stream_args_t stream_args("fc32"); //complex floats
- myTxStream = myUsrp->get_tx_stream(stream_args);
-
- md.start_of_burst = false;
- md.end_of_burst = false;
-
- num_underflows = 0;
- num_late_packets = 0;
-
- size_t last_num_underflows = 0;
- size_t pop_prebuffering = FRAMES_MAX_SIZE;
-
- while (running.load()) {
- md.has_time_spec = false;
- md.time_spec = uhd::time_spec_t(0.0);
-
- struct UHDWorkerFrameData frame;
- etiLog.log(trace, "UHD,wait");
- m_frames.wait_and_pop(frame, pop_prebuffering);
- etiLog.log(trace, "UHD,pop");
-
- handle_frame(&frame);
- num_frames_modulated++;
-
- /* Ensure we fill frames after every underrun and
- * at startup to reduce underrun likelihood. */
- if (last_num_underflows < num_underflows) {
- pop_prebuffering = FRAMES_MAX_SIZE;
- }
- else {
- pop_prebuffering = 1;
- }
- last_num_underflows = num_underflows;
- }
- running.store(false);
- etiLog.level(warn) << "UHD worker terminated";
-}
-
-void OutputUHD::handle_frame(const struct UHDWorkerFrameData *frame)
-{
- // Transmit timeout
- static const double tx_timeout = 20.0;
-
- // Check for ref_lock
- if (refclk_loss_needs_check()) {
- try {
- if (not myUsrp->get_mboard_sensor("ref_locked", 0).to_bool()) {
- etiLog.log(alert,
- "OutputUHD: External reference clock lock lost !");
- if (myConf.refclk_lock_loss_behaviour == CRASH) {
- throw std::runtime_error(
- "OutputUHD: External reference clock lock lost.");
- }
- }
- }
- catch (uhd::lookup_error &e) {
- suppress_refclk_loss_check = true;
- etiLog.log(warn, "OutputUHD: This USRP does not have mboard "
- "sensor for ext clock loss. Check disabled.");
- }
- }
-
- double usrp_time = myUsrp->get_time_now().get_real_secs();
- bool timestamp_discontinuity = false;
-
- if (sync_and_ts_valid) {
- // Tx time from MNSC and TIST
- uint32_t tx_second = frame->ts.timestamp_sec;
- uint32_t tx_pps = frame->ts.timestamp_pps;
-
- if (!frame->ts.timestamp_valid) {
- /* We have not received a full timestamp through
- * MNSC. We sleep through the frame.
- */
- etiLog.level(info) <<
- "OutputUHD: Throwing sample " << frame->ts.fct <<
- " away: incomplete timestamp " << tx_second <<
- " / " << tx_pps;
- usleep(20000); //TODO should this be TM-dependant ?
- return;
- }
-
- if (last_tx_time_initialised) {
- const size_t sizeIn = frame->buf.size() / sizeof(complexf);
- uint64_t increment = (uint64_t)sizeIn * 16384000ul /
- (uint64_t)myConf.sampleRate;
- // samps * ticks/s / (samps/s)
- // (samps * ticks * s) / (s * samps)
- // ticks
-
- uint32_t expected_sec = last_tx_second + increment / 16384000ul;
- uint32_t expected_pps = last_tx_pps + increment % 16384000ul;
-
- while (expected_pps >= 16384000) {
- expected_sec++;
- expected_pps -= 16384000;
- }
-
- if (expected_sec != tx_second or
- expected_pps != tx_pps) {
- etiLog.level(warn) << "OutputUHD: timestamp irregularity!" <<
- std::fixed <<
- " Expected " <<
- expected_sec << "+" << (double)expected_pps/16384000.0 <<
- "(" << expected_pps << ")" <<
- " Got " <<
- tx_second << "+" << (double)tx_pps/16384000.0 <<
- "(" << tx_pps << ")";
-
- timestamp_discontinuity = true;
- }
- }
-
- last_tx_second = tx_second;
- last_tx_pps = tx_pps;
- last_tx_time_initialised = true;
-
- double pps_offset = tx_pps / 16384000.0;
-
- md.has_time_spec = true;
- md.time_spec = uhd::time_spec_t(tx_second, pps_offset);
- etiLog.log(trace, "UHD,tist %f", md.time_spec.get_real_secs());
-
- // md is defined, let's do some checks
- if (md.time_spec.get_real_secs() + tx_timeout < usrp_time) {
- etiLog.level(warn) <<
- "OutputUHD: Timestamp in the past! offset: " <<
- std::fixed <<
- md.time_spec.get_real_secs() - usrp_time <<
- " (" << usrp_time << ")"
- " frame " << frame->ts.fct <<
- ", tx_second " << tx_second <<
- ", pps " << pps_offset;
- return;
- }
-
- if (md.time_spec.get_real_secs() > usrp_time + TIMESTAMP_ABORT_FUTURE) {
- etiLog.level(error) <<
- "OutputUHD: Timestamp way too far in the future! offset: " <<
- std::fixed <<
- md.time_spec.get_real_secs() - usrp_time;
- throw std::runtime_error("Timestamp error. Aborted.");
- }
- }
- else { // !sync_and_ts_valid
- if (myConf.muting or myConf.muteNoTimestamps) {
- /* There was some error decoding the timestamp */
- if (myConf.muting) {
- etiLog.log(info,
- "OutputUHD: Muting sample %d requested\n",
- frame->ts.fct);
- }
- else {
- etiLog.log(info,
- "OutputUHD: Muting sample %d : no timestamp\n",
- frame->ts.fct);
- }
- usleep(20000);
- return;
- }
- }
-
- tx_frame(frame, timestamp_discontinuity);
-}
-
-void OutputUHD::tx_frame(const struct UHDWorkerFrameData *frame, bool ts_update)
-{
- const double tx_timeout = 20.0;
- const size_t sizeIn = frame->buf.size() / sizeof(complexf);
- const complexf* in_data = reinterpret_cast<const complexf*>(&frame->buf[0]);
-
- size_t usrp_max_num_samps = myTxStream->get_max_num_samps();
- size_t num_acc_samps = 0; //number of accumulated samples
- while (running.load() and (not myConf.muting) and (num_acc_samps < sizeIn)) {
- size_t samps_to_send = std::min(sizeIn - num_acc_samps, usrp_max_num_samps);
-
- uhd::tx_metadata_t md_tx = md;
-
- //ensure the the last packet has EOB set if the timestamps has been
- //refreshed and need to be reconsidered.
- md_tx.end_of_burst = (
- sync_and_ts_valid and
- (frame->ts.timestamp_refresh or ts_update) and
- samps_to_send <= usrp_max_num_samps );
-
-
- //send a single packet
- size_t num_tx_samps = myTxStream->send(
- &in_data[num_acc_samps],
- samps_to_send, md_tx, tx_timeout);
- etiLog.log(trace, "UHD,sent %zu of %zu", num_tx_samps, samps_to_send);
-
- num_acc_samps += num_tx_samps;
-
- md_tx.time_spec = md.time_spec +
- uhd::time_spec_t(0, num_tx_samps/myConf.sampleRate);
-
- if (num_tx_samps == 0) {
- etiLog.log(warn,
- "OutputUHD::workerthread() unable to write to device, skipping frame!\n");
- break;
- }
- }
-}
-
-void OutputUHD::print_async_thread()
-{
- while (running.load()) {
- uhd::async_metadata_t async_md;
- if (myUsrp->get_device()->recv_async_msg(async_md, 1)) {
- const char* uhd_async_message = "";
- bool failure = false;
- switch (async_md.event_code) {
- case uhd::async_metadata_t::EVENT_CODE_BURST_ACK:
- break;
- case uhd::async_metadata_t::EVENT_CODE_UNDERFLOW:
- uhd_async_message = "Underflow";
- num_underflows++;
- break;
- case uhd::async_metadata_t::EVENT_CODE_SEQ_ERROR:
- uhd_async_message = "Packet loss between host and device.";
- failure = true;
- break;
- case uhd::async_metadata_t::EVENT_CODE_TIME_ERROR:
- uhd_async_message = "Packet had time that was late.";
- num_late_packets++;
- break;
- case uhd::async_metadata_t::EVENT_CODE_UNDERFLOW_IN_PACKET:
- uhd_async_message = "Underflow occurred inside a packet.";
- failure = true;
- break;
- case uhd::async_metadata_t::EVENT_CODE_SEQ_ERROR_IN_BURST:
- uhd_async_message = "Packet loss within a burst.";
- failure = true;
- break;
- default:
- uhd_async_message = "unknown event code";
- failure = true;
- break;
- }
-
- if (failure) {
- etiLog.level(alert) <<
- "Received Async UHD Message '" <<
- uhd_async_message << "' at time " <<
- md.time_spec.get_real_secs();
-
- }
- }
-
- auto time_now = std::chrono::steady_clock::now();
- if (last_print_time + std::chrono::seconds(1) < time_now) {
- const double usrp_time =
- myUsrp->get_time_now().get_real_secs();
-
- if ( (num_underflows > num_underflows_previous) or
- (num_late_packets > num_late_packets_previous)) {
- etiLog.log(info,
- "OutputUHD status (usrp time: %f): "
- "%d underruns and %d late packets since last status.\n",
- usrp_time,
- num_underflows, num_late_packets);
- }
-
- num_underflows_previous = num_underflows;
- num_late_packets_previous = num_late_packets;
-
- last_print_time = time_now;
- }
- }
-}
-
-// =======================================
-// Remote Control for UHD
-// =======================================
-void OutputUHD::set_parameter(const string& parameter, const string& value)
-{
- stringstream ss(value);
- ss.exceptions ( stringstream::failbit | stringstream::badbit );
-
- if (parameter == "txgain") {
- ss >> myConf.txgain;
- myUsrp->set_tx_gain(myConf.txgain);
- }
- else if (parameter == "rxgain") {
- ss >> myConf.rxgain;
- myUsrp->set_rx_gain(myConf.rxgain);
- }
- else if (parameter == "freq") {
- ss >> myConf.frequency;
- tune_usrp_to(myUsrp, myConf.lo_offset, myConf.frequency);
- myConf.frequency = myUsrp->get_tx_freq();
- }
- else if (parameter == "muting") {
- ss >> myConf.muting;
- }
- else if (parameter == "staticdelay") {
- int64_t adjust;
- ss >> adjust;
- if (adjust > (myTFDurationMs * 1000))
- { // reset static delay for values outside range
- myConf.staticDelayUs = 0;
- }
- else
- { // the new adjust value is added to the existing delay and the result
- // is wrapped around at TF duration
- int newStaticDelayUs = myConf.staticDelayUs + adjust;
- if (newStaticDelayUs > (myTFDurationMs * 1000))
- myConf.staticDelayUs = newStaticDelayUs - (myTFDurationMs * 1000);
- else if (newStaticDelayUs < 0)
- myConf.staticDelayUs = newStaticDelayUs + (myTFDurationMs * 1000);
- else
- myConf.staticDelayUs = newStaticDelayUs;
- }
- }
- else if (parameter == "underruns" or
- parameter == "latepackets" or
- parameter == "frames") {
- throw ParameterError("Parameter " + parameter + " is read-only.");
- }
- else {
- stringstream ss_err;
- ss_err << "Parameter '" << parameter
- << "' is not exported by controllable " << get_rc_name();
- throw ParameterError(ss_err.str());
- }
-}
-
-const string OutputUHD::get_parameter(const string& parameter) const
-{
- stringstream ss;
- if (parameter == "txgain") {
- ss << myConf.txgain;
- }
- else if (parameter == "rxgain") {
- ss << myConf.rxgain;
- }
- else if (parameter == "freq") {
- ss << myConf.frequency;
- }
- else if (parameter == "muting") {
- ss << myConf.muting;
- }
- else if (parameter == "staticdelay") {
- ss << myConf.staticDelayUs;
- }
- else if (parameter == "underruns") {
- ss << num_underflows;
- }
- else if (parameter == "latepackets") {
- ss << num_late_packets;
- }
- else if (parameter == "frames") {
- ss << num_frames_modulated;
- }
- else {
- ss << "Parameter '" << parameter <<
- "' is not exported by controllable " << get_rc_name();
- throw ParameterError(ss.str());
- }
- return ss.str();
-}
-
-#endif // HAVE_OUTPUT_UHD
-
diff --git a/src/OutputUHD.h b/src/OutputUHD.h
deleted file mode 100644
index 9213183..0000000
--- a/src/OutputUHD.h
+++ /dev/null
@@ -1,250 +0,0 @@
-/*
- Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Her Majesty the
- Queen in Right of Canada (Communications Research Center Canada)
-
- Copyright (C) 2017
- Matthias P. Braendli, matthias.braendli@mpb.li
-
- http://opendigitalradio.org
-
-DESCRIPTION:
- It is an output driver for the USRP family of devices, and uses the UHD
- library. This version is multi-threaded. A separate thread sends the data to
- the device.
-
- Data between the modulator and the UHD thread are exchanged through a
- threadsafe queue.
-*/
-
-/*
- This file is part of ODR-DabMod.
-
- ODR-DabMod is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMod is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with ODR-DabMod. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#pragma once
-
-#ifdef HAVE_CONFIG_H
-# include <config.h>
-#endif
-
-#ifdef HAVE_OUTPUT_UHD
-
-#include <uhd/utils/thread_priority.hpp>
-#include <uhd/utils/safe_main.hpp>
-#include <uhd/usrp/multi_usrp.hpp>
-#include <boost/thread.hpp>
-#include <deque>
-#include <chrono>
-#include <memory>
-#include <string>
-#include <atomic>
-
-#include "Log.h"
-#include "ModPlugin.h"
-#include "EtiReader.h"
-#include "TimestampDecoder.h"
-#include "RemoteControl.h"
-#include "ThreadsafeQueue.h"
-#include "OutputUHDFeedback.h"
-
-#include <stdio.h>
-#include <sys/types.h>
-
-//#define MDEBUG(fmt, args...) fprintf(LOG, fmt , ## args)
-#define MDEBUG(fmt, args...)
-
-// If the timestamp is further in the future than
-// 100 seconds, abort
-#define TIMESTAMP_ABORT_FUTURE 100
-
-// Add a delay to increase buffers when
-// frames are too far in the future
-#define TIMESTAMP_MARGIN_FUTURE 0.5
-
-typedef std::complex<float> complexf;
-
-// Each frame contains one OFDM frame, and its
-// associated timestamp
-struct UHDWorkerFrameData {
- // Buffer holding frame data
- std::vector<uint8_t> buf;
-
- // A full timestamp contains a TIST according to standard
- // and time information within MNSC with tx_second.
- struct frame_timestamp ts;
-};
-
-enum refclk_lock_loss_behaviour_t { CRASH, IGNORE };
-
-/* This structure is used as initial configuration for OutputUHD.
- * It must also contain all remote-controllable settings, otherwise
- * they will get lost on a modulator restart. */
-struct OutputUHDConfig {
- std::string device;
- std::string usrpType; // e.g. b100, b200, usrp2
-
- // The USRP1 can accept two daughterboards
- std::string subDevice; // e.g. A:0
-
- long masterClockRate = 32768000;
- unsigned sampleRate = 2048000;
- double frequency = 0.0;
- double lo_offset = 0.0;
- double txgain = 0.0;
- double rxgain = 0.0;
- bool enableSync = false;
-
- // When working with timestamps, mute the frames that
- // do not have a timestamp
- bool muteNoTimestamps = false;
- unsigned dabMode = 0;
- unsigned maxGPSHoldoverTime = 0;
-
- /* allowed values : auto, int, sma, mimo */
- std::string refclk_src;
-
- /* allowed values : int, sma, mimo */
- std::string pps_src;
-
- /* allowed values : pos, neg */
- std::string pps_polarity;
-
- /* What to do when the reference clock PLL loses lock */
- refclk_lock_loss_behaviour_t refclk_lock_loss_behaviour;
-
- // muting can only be changed using the remote control
- bool muting = false;
-
- // static delay in microseconds
- int staticDelayUs = 0;
-
- // TCP port on which to serve TX and RX samples for the
- // digital pre distortion learning tool
- uint16_t dpdFeedbackServerPort = 0;
-};
-
-class OutputUHD: public ModOutput, public RemoteControllable {
- public:
- OutputUHD(OutputUHDConfig& config);
- OutputUHD(const OutputUHD& other) = delete;
- OutputUHD operator=(const OutputUHD& other) = delete;
- ~OutputUHD();
-
- int process(Buffer* dataIn);
-
- const char* name() { return "OutputUHD"; }
-
- void setETISource(EtiSource *etiSource);
-
- /*********** REMOTE CONTROL ***************/
-
- /* Base function to set parameters. */
- virtual void set_parameter(const std::string& parameter,
- const std::string& value);
-
- /* Getting a parameter always returns a string. */
- virtual const std::string get_parameter(
- const std::string& parameter) const;
-
- protected:
- EtiSource *myEtiSource = nullptr;
- OutputUHDConfig& myConf;
- uhd::usrp::multi_usrp::sptr myUsrp;
- std::shared_ptr<boost::barrier> mySyncBarrier;
- bool first_run = true;
- bool gps_fix_verified = false;
- std::shared_ptr<OutputUHDFeedback> uhdFeedback;
-
- private:
- // Resize the internal delay buffer according to the dabMode and
- // the sample rate.
- void SetDelayBuffer(unsigned int dabMode);
-
- // data
- // The remote-controllable static delay is in the OutputUHDConfig
- int myTFDurationMs; // TF duration in milliseconds
- std::vector<complexf> myDelayBuf;
- size_t lastLen = 0;
-
- // GPS Fix check variables
- int num_checks_without_gps_fix = 1;
- struct timespec first_gps_fix_check;
- struct timespec last_gps_fix_check;
- struct timespec time_last_frame;
- boost::packaged_task<bool> gps_fix_pt;
- boost::unique_future<bool> gps_fix_future;
- boost::thread gps_fix_task;
-
- // Wait time in seconds to get fix
- static const int initial_gps_fix_wait = 180;
-
- // Interval for checking the GPS at runtime
- static constexpr double gps_fix_check_interval = 10.0; // seconds
-
- // Asynchronous message statistics
- size_t num_underflows = 0;
- size_t num_late_packets = 0;
- size_t num_underflows_previous = 0;
- size_t num_late_packets_previous = 0;
-
- size_t num_frames_modulated = 0;
-
- uhd::tx_metadata_t md;
- bool last_tx_time_initialised = false;
- uint32_t last_tx_second = 0;
- uint32_t last_tx_pps = 0;
-
- // Used to print statistics once a second
- std::chrono::steady_clock::time_point last_print_time;
-
- bool sync_and_ts_valid = false;
-
- ThreadsafeQueue<UHDWorkerFrameData> m_frames;
-
- // Returns true if we want to verify loss of refclk
- bool refclk_loss_needs_check(void) const;
- bool suppress_refclk_loss_check = false;
-
- // Returns true if we want to check for the gps_timelock sensor
- bool gpsfix_needs_check(void) const;
-
- // Return true if the gpsdo is from ettus, false if it is the ODR
- // LEA-M8F board is used
- bool gpsdo_is_ettus(void) const;
-
- std::atomic<bool> running;
- boost::thread uhd_thread;
- boost::thread async_rx_thread;
- void stop_threads(void);
-
- uhd::tx_streamer::sptr myTxStream;
-
- // The worker thread decouples the modulator from UHD
- void workerthread();
- void handle_frame(const struct UHDWorkerFrameData *frame);
- void tx_frame(const struct UHDWorkerFrameData *frame, bool ts_update);
-
- // Poll asynchronous metadata from UHD
- void print_async_thread(void);
-
- void check_gps();
-
- void set_usrp_time();
-
- void initial_gps_check();
-};
-
-#endif // HAVE_OUTPUT_UHD
-
diff --git a/src/OutputZeroMQ.cpp b/src/OutputZeroMQ.cpp
index 93fe3c0..69f4aa1 100644
--- a/src/OutputZeroMQ.cpp
+++ b/src/OutputZeroMQ.cpp
@@ -32,8 +32,8 @@
#if defined(HAVE_ZEROMQ)
-OutputZeroMQ::OutputZeroMQ(std::string endpoint, int type, Buffer* dataOut)
- : ModOutput(),
+OutputZeroMQ::OutputZeroMQ(std::string endpoint, int type, Buffer* dataOut) :
+ ModOutput(),
m_type(type),
m_zmq_context(1),
m_zmq_sock(m_zmq_context, type),
@@ -59,11 +59,6 @@ OutputZeroMQ::OutputZeroMQ(std::string endpoint, int type, Buffer* dataOut)
m_zmq_sock.bind(m_endpoint.c_str());
}
-OutputZeroMQ::~OutputZeroMQ()
-{
- PDEBUG("OutputZeroMQ::~OutputZeroMQ() @ %p\n", this);
-}
-
int OutputZeroMQ::process(Buffer* dataIn)
{
PDEBUG("OutputZeroMQ::process"
diff --git a/src/OutputZeroMQ.h b/src/OutputZeroMQ.h
index 3107225..be2451b 100644
--- a/src/OutputZeroMQ.h
+++ b/src/OutputZeroMQ.h
@@ -38,10 +38,9 @@
class OutputZeroMQ : public ModOutput
{
public:
- OutputZeroMQ(std::string endpoint, int type, Buffer* dataOut = NULL);
- virtual ~OutputZeroMQ();
- virtual int process(Buffer* dataIn);
- const char* name() { return m_name.c_str(); }
+ OutputZeroMQ(std::string endpoint, int type, Buffer* dataOut = nullptr);
+ virtual int process(Buffer* dataIn) override;
+ const char* name() override { return m_name.c_str(); }
protected:
int m_type; // zmq socket type
diff --git a/src/Resampler.h b/src/Resampler.h
index d9d9d89..ed94a8c 100644
--- a/src/Resampler.h
+++ b/src/Resampler.h
@@ -59,7 +59,6 @@ protected:
FFT_PLAN myFftPlan2;
size_t L;
size_t M;
- size_t K;
size_t myFftSizeIn;
size_t myFftSizeOut;
FFT_TYPE* myFftIn;
diff --git a/src/Socket.h b/src/Socket.h
index 39554ca..392e758 100644
--- a/src/Socket.h
+++ b/src/Socket.h
@@ -49,6 +49,14 @@ class TCPSocket {
if ((m_sock = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
throw std::runtime_error("Can't create TCP socket");
}
+
+#if defined(HAVE_SO_NOSIGPIPE)
+ int val = 1;
+ if (setsockopt(m_sock, SOL_SOCKET, SO_NOSIGPIPE,
+ &val, sizeof(val)) < 0) {
+ throw std::runtime_error("Can't set SO_NOSIGPIPE");
+ }
+#endif
}
~TCPSocket() {
@@ -89,11 +97,12 @@ class TCPSocket {
addr.sin_addr.s_addr = htonl(INADDR_ANY);
const int reuse = 1;
- if (setsockopt(m_sock, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) < 0) {
+ if (setsockopt(m_sock, SOL_SOCKET, SO_REUSEADDR,
+ &reuse, sizeof(reuse)) < 0) {
throw std::runtime_error("Can't reuse address for TCP socket");
}
- if (bind(m_sock, (struct sockaddr*)&addr, sizeof(addr)) < 0) {
+ if (::bind(m_sock, (struct sockaddr*)&addr, sizeof(addr)) < 0) {
close();
throw std::runtime_error("Can't bind TCP socket");
}
@@ -138,8 +147,16 @@ class TCPSocket {
{
uint8_t *buf = (uint8_t*)buffer;
while (buflen > 0) {
- // Set MSG_NOSIGNAL to avoid that this thread gets a SIGPIPE
- ssize_t sent = send(m_sock, buf, buflen, MSG_NOSIGNAL);
+ /* On Linux, the MSG_NOSIGNAL flag ensures that the process
+ * would not receive a SIGPIPE and die.
+ * Other systems have SO_NOSIGPIPE set on the socket for the
+ * same effect. */
+#if defined(HAVE_MSG_NOSIGNAL)
+ const int flags = MSG_NOSIGNAL;
+#else
+ const int flags = 0;
+#endif
+ ssize_t sent = ::send(m_sock, buf, buflen, flags);
if (sent < 0) {
return -1;
}
diff --git a/src/TII.cpp b/src/TII.cpp
index 89cd6d0..3c5823b 100644
--- a/src/TII.cpp
+++ b/src/TII.cpp
@@ -2,7 +2,7 @@
Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty
the Queen in Right of Canada (Communications Research Center Canada)
- Copyright (C) 2017
+ Copyright (C) 2018
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
@@ -187,7 +187,7 @@ int TII::process(Buffer* dataIn, Buffer* dataOut)
memset(dataOut->getData(), 0, dataOut->getLength());
if (m_conf.enable and m_insert) {
- boost::mutex::scoped_lock lock(m_enabled_carriers_mutex);
+ std::lock_guard<std::mutex> lock(m_enabled_carriers_mutex);
complexf* in = reinterpret_cast<complexf*>(dataIn->getData());
complexf* out = reinterpret_cast<complexf*>(dataOut->getData());
@@ -231,7 +231,7 @@ void TII::enable_carrier(int k) {
void TII::prepare_pattern() {
int comb = m_conf.comb; // Convert from unsigned to signed
- boost::mutex::scoped_lock lock(m_enabled_carriers_mutex);
+ std::lock_guard<std::mutex> lock(m_enabled_carriers_mutex);
// Clear previous pattern
for (size_t i = 0; i < m_enabled_carriers.size(); i++) {
diff --git a/src/TII.h b/src/TII.h
index b0ffdb3..b86dbbf 100644
--- a/src/TII.h
+++ b/src/TII.h
@@ -2,7 +2,7 @@
Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty
the Queen in Right of Canada (Communications Research Center Canada)
- Copyright (C) 2017
+ Copyright (C) 2018
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
@@ -35,8 +35,8 @@
#include "ModPlugin.h"
#include "RemoteControl.h"
-#include <boost/thread.hpp>
-#include <sys/types.h>
+#include <cstddef>
+#include <thread>
#include <complex>
#include <vector>
#include <string>
@@ -118,7 +118,7 @@ class TII : public ModCodec, public RemoteControllable
// m_enabled_carriers is read by modulator thread, and written
// to by RC thread.
- mutable boost::mutex m_enabled_carriers_mutex;
+ mutable std::mutex m_enabled_carriers_mutex;
// m_enabled_carriers is true only for the first carrier in the
// active pair
diff --git a/src/TimestampDecoder.cpp b/src/TimestampDecoder.cpp
index 26deb60..b942c37 100644
--- a/src/TimestampDecoder.cpp
+++ b/src/TimestampDecoder.cpp
@@ -24,93 +24,61 @@
along with ODR-DabMod. If not, see <http://www.gnu.org/licenses/>.
*/
-#include <queue>
#include <iostream>
#include <fstream>
#include <string>
-#include <sys/types.h>
#include "PcDebug.h"
#include "TimestampDecoder.h"
-#include "Eti.h"
#include "Log.h"
+#include "Eti.h"
//#define MDEBUG(fmt, args...) fprintf (LOG, "*****" fmt , ## args)
#define MDEBUG(fmt, args...) PDEBUG(fmt, ## args)
+TimestampDecoder::TimestampDecoder(double& offset_s) :
+ RemoteControllable("tist"),
+ timestamp_offset(offset_s)
+{
+ // Properly initialise temp_time
+ memset(&temp_time, 0, sizeof(temp_time));
+ const time_t timep = 0;
+ gmtime_r(&timep, &temp_time);
+
+ RC_ADD_PARAMETER(offset, "TIST offset [s]");
+ RC_ADD_PARAMETER(timestamp, "FCT and timestamp [s]");
-void TimestampDecoder::calculateTimestamp(frame_timestamp& ts)
+ etiLog.level(info) << "Setting up timestamp decoder with " <<
+ timestamp_offset << " offset";
+}
+
+std::shared_ptr<frame_timestamp> TimestampDecoder::getTimestamp()
{
- std::shared_ptr<frame_timestamp> ts_queued =
- std::make_shared<frame_timestamp>();
+ auto ts = std::make_shared<frame_timestamp>();
- /* Push new timestamp into queue */
- ts_queued->timestamp_valid = full_timestamp_received;
- ts_queued->timestamp_sec = time_secs;
- ts_queued->timestamp_pps = time_pps;
- ts_queued->fct = latestFCT;
+ ts->timestamp_valid = full_timestamp_received;
+ ts->timestamp_sec = time_secs;
+ ts->timestamp_pps = time_pps;
+ ts->fct = latestFCT;
+ ts->fp = latestFP;
- ts_queued->timestamp_refresh = offset_changed;
+ ts->timestamp_refresh = offset_changed;
offset_changed = false;
MDEBUG("time_secs=%d, time_pps=%f\n", time_secs,
(double)time_pps / 16384000.0);
- *ts_queued += timestamp_offset;
-
- queue_timestamps.push(ts_queued);
-
- /* Here, the queue size is one more than the pipeline delay, because
- * we've just added a new element in the queue.
- *
- * Therefore, use <= and not < for comparison
- */
- if (queue_timestamps.size() <= m_tist_delay_stages) {
- //fprintf(stderr, "* %zu %u ", queue_timestamps.size(), m_tist_delay_stages);
- /* Return invalid timestamp until the queue is full */
- ts.timestamp_valid = false;
- ts.timestamp_sec = 0;
- ts.timestamp_pps = 0;
- ts.timestamp_refresh = false;
- ts.fct = -1;
- }
- else {
- //fprintf(stderr, ". %zu ", queue_timestamps.size());
- /* Return timestamp from queue */
- ts_queued = queue_timestamps.front();
- queue_timestamps.pop();
- /*fprintf(stderr, "ts_queued v:%d, sec:%d, pps:%f, ref:%d\n",
- ts_queued->timestamp_valid,
- ts_queued->timestamp_sec,
- ts_queued->timestamp_pps_offset,
- ts_queued->timestamp_refresh);*/
- ts = *ts_queued;
- /*fprintf(stderr, "ts v:%d, sec:%d, pps:%f, ref:%d\n\n",
- ts.timestamp_valid,
- ts.timestamp_sec,
- ts.timestamp_pps_offset,
- ts.timestamp_refresh);*/
- }
-
- MDEBUG("Timestamp queue size %zu, delay_calc %u\n",
- queue_timestamps.size(),
- m_tist_delay_stages);
-
- if (queue_timestamps.size() > m_tist_delay_stages) {
- etiLog.level(error) << "Error: Timestamp queue is too large : size " <<
- queue_timestamps.size() << "! This should not happen !";
- }
+ *ts += timestamp_offset;
- //ts.print("calc2 ");
+ return ts;
}
-void TimestampDecoder::pushMNSCData(int framephase, uint16_t mnsc)
+void TimestampDecoder::pushMNSCData(uint8_t framephase, uint16_t mnsc)
{
struct eti_MNSC_TIME_0 *mnsc0;
struct eti_MNSC_TIME_1 *mnsc1;
struct eti_MNSC_TIME_2 *mnsc2;
struct eti_MNSC_TIME_3 *mnsc3;
- switch (framephase)
- {
+ switch (framephase) {
case 0:
mnsc0 = (struct eti_MNSC_TIME_0*)&mnsc;
enableDecode = (mnsc0->type == 0) &&
@@ -126,10 +94,10 @@ void TimestampDecoder::pushMNSCData(int framephase, uint16_t mnsc)
temp_time.tm_sec = mnsc1->second_tens * 10 + mnsc1->second_unit;
temp_time.tm_min = mnsc1->minute_tens * 10 + mnsc1->minute_unit;
- if (!mnsc1->sync_to_frame)
- {
+ if (!mnsc1->sync_to_frame) {
enableDecode = false;
- PDEBUG("TimestampDecoder: MNSC time info is not synchronised to frame\n");
+ PDEBUG("TimestampDecoder: "
+ "MNSC time info is not synchronised to frame\n");
}
break;
@@ -145,9 +113,7 @@ void TimestampDecoder::pushMNSCData(int framephase, uint16_t mnsc)
temp_time.tm_mon = (mnsc3->month_tens * 10 + mnsc3->month_unit) - 1;
temp_time.tm_year = (mnsc3->year_tens * 10 + mnsc3->year_unit) + 100;
- if (enableDecode)
- {
- full_timestamp_received = true;
+ if (enableDecode) {
updateTimestampSeconds(mktime(&temp_time));
}
break;
@@ -160,15 +126,14 @@ void TimestampDecoder::pushMNSCData(int framephase, uint16_t mnsc)
void TimestampDecoder::updateTimestampSeconds(uint32_t secs)
{
- if (inhibit_second_update > 0)
- {
+ if (inhibit_second_update > 0) {
MDEBUG("TimestampDecoder::updateTimestampSeconds(%d) inhibit\n", secs);
inhibit_second_update--;
}
- else
- {
+ else {
MDEBUG("TimestampDecoder::updateTimestampSeconds(%d) apply\n", secs);
time_secs = secs;
+ full_timestamp_received = true;
}
}
@@ -176,8 +141,7 @@ void TimestampDecoder::updateTimestampPPS(uint32_t pps)
{
MDEBUG("TimestampDecoder::updateTimestampPPS(%f)\n", (double)pps / 16384000.0);
- if (time_pps > pps) // Second boundary crossed
- {
+ if (time_pps > pps) { // Second boundary crossed
MDEBUG("TimestampDecoder::updateTimestampPPS crossed second\n");
// The second for the next eight frames will not
@@ -190,7 +154,7 @@ void TimestampDecoder::updateTimestampPPS(uint32_t pps)
}
void TimestampDecoder::updateTimestampEti(
- int framephase,
+ uint8_t framephase,
uint16_t mnsc,
uint32_t pps, // In units of 1/16384000 s
int32_t fct)
@@ -198,16 +162,19 @@ void TimestampDecoder::updateTimestampEti(
updateTimestampPPS(pps);
pushMNSCData(framephase, mnsc);
latestFCT = fct;
+ latestFP = framephase;
}
void TimestampDecoder::updateTimestampEdi(
uint32_t seconds_utc,
uint32_t pps, // In units of 1/16384000 s
- int32_t fct)
+ int32_t fct,
+ uint8_t framephase)
{
time_secs = seconds_utc;
time_pps = pps;
latestFCT = fct;
+ latestFP = framephase;
full_timestamp_received = true;
}
diff --git a/src/TimestampDecoder.h b/src/TimestampDecoder.h
index db8f816..33d9992 100644
--- a/src/TimestampDecoder.h
+++ b/src/TimestampDecoder.h
@@ -2,7 +2,7 @@
Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Her Majesty the
Queen in Right of Canada (Communications Research Center Canada)
- Copyright (C) 2017
+ Copyright (C) 2018
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
@@ -26,41 +26,24 @@
#pragma once
-#include <queue>
+#include <cstdint>
#include <memory>
#include <string>
-#include <time.h>
#include <math.h>
#include <stdio.h>
-#include "Eti.h"
-#include "Log.h"
#include "RemoteControl.h"
struct frame_timestamp
{
// Which frame count does this timestamp apply to
int32_t fct;
+ uint8_t fp; // Frame Phase
uint32_t timestamp_sec;
uint32_t timestamp_pps; // In units of 1/16384000 s
- bool timestamp_valid;
+ bool timestamp_valid = false;
bool timestamp_refresh;
- frame_timestamp() = default;
- frame_timestamp(const frame_timestamp& other) = default;
- frame_timestamp& operator=(const frame_timestamp &rhs)
- {
- if (this != &rhs) {
- this->timestamp_sec = rhs.timestamp_sec;
- this->timestamp_pps = rhs.timestamp_pps;
- this->timestamp_valid = rhs.timestamp_valid;
- this->timestamp_refresh = rhs.timestamp_refresh;
- this->fct = rhs.fct;
- }
-
- return *this;
- }
-
frame_timestamp& operator+=(const double& diff)
{
double offset_pps, offset_secs;
@@ -69,16 +52,14 @@ struct frame_timestamp
this->timestamp_sec += lrintf(offset_secs);
this->timestamp_pps += lrintf(offset_pps * 16384000.0);
- while (this->timestamp_pps >= 16384000)
- {
+ while (this->timestamp_pps >= 16384000) {
this->timestamp_pps -= 16384000;
this->timestamp_sec += 1;
- };
+ }
return *this;
}
- const frame_timestamp operator+(const double diff)
- {
+ const frame_timestamp operator+(const double diff) {
frame_timestamp ts = *this;
ts += diff;
return ts;
@@ -88,8 +69,19 @@ struct frame_timestamp
return timestamp_pps / 16384000.0;
}
- void print(const char* t)
- {
+ double get_real_secs() const {
+ double t = timestamp_sec;
+ t += pps_offset();
+ return t;
+ }
+
+ long long int get_ns() const {
+ long long int ns = timestamp_sec * 1000000000ull;
+ ns += llrint((double)timestamp_pps / 0.016384);
+ return ns;
+ }
+
+ void print(const char* t) const {
fprintf(stderr,
"%s <frame_timestamp(%s, %d, %.9f, %d)>\n",
t, this->timestamp_valid ? "valid" : "invalid",
@@ -103,49 +95,16 @@ struct frame_timestamp
class TimestampDecoder : public RemoteControllable
{
public:
- TimestampDecoder(
- /* The modulator adds this offset to the TIST to define time of
- * frame transmission
- */
- double& offset_s,
-
- /* Specifies by how many stages the timestamp must be delayed.
- * (e.g. The FIRFilter is pipelined, therefore we must increase
- * tist_delay_stages by one if the filter is used
- */
- unsigned tist_delay_stages) :
- RemoteControllable("tist"),
- timestamp_offset(offset_s)
- {
- m_tist_delay_stages = tist_delay_stages;
- inhibit_second_update = 0;
- time_pps = 0.0;
- time_secs = 0;
- latestFCT = 0;
- enableDecode = false;
- full_timestamp_received = false;
-
- // Properly initialise temp_time
- memset(&temp_time, 0, sizeof(temp_time));
- const time_t timep = 0;
- gmtime_r(&timep, &temp_time);
-
- offset_changed = false;
-
- RC_ADD_PARAMETER(offset, "TIST offset [s]");
- RC_ADD_PARAMETER(timestamp, "FCT and timestamp [s]");
-
- etiLog.level(info) << "Setting up timestamp decoder with " <<
- timestamp_offset << " offset";
-
- };
-
- /* Calculate the timestamp for the current frame. */
- void calculateTimestamp(frame_timestamp& ts);
+ /* offset_s: The modulator adds this offset to the TIST to define time of
+ * frame transmission
+ */
+ TimestampDecoder(double& offset_s);
+
+ std::shared_ptr<frame_timestamp> getTimestamp(void);
/* Update timestamp data from ETI */
void updateTimestampEti(
- int framephase,
+ uint8_t framephase,
uint16_t mnsc,
uint32_t pps, // In units of 1/16384000 s
int32_t fct);
@@ -154,7 +113,8 @@ class TimestampDecoder : public RemoteControllable
void updateTimestampEdi(
uint32_t seconds_utc,
uint32_t pps, // In units of 1/16384000 s
- int32_t fct);
+ int32_t fct,
+ uint8_t framephase);
/*********** REMOTE CONTROL ***************/
@@ -171,7 +131,7 @@ class TimestampDecoder : public RemoteControllable
protected:
/* Push a new MNSC field into the decoder */
- void pushMNSCData(int framephase, uint16_t mnsc);
+ void pushMNSCData(uint8_t framephase, uint16_t mnsc);
/* Each frame contains the TIST field with the PPS offset.
* For each frame, this function must be called to update
@@ -191,28 +151,20 @@ class TimestampDecoder : public RemoteControllable
void updateTimestampSeconds(uint32_t secs);
struct tm temp_time;
- uint32_t time_secs;
- int32_t latestFCT;
- uint32_t time_pps;
+ uint32_t time_secs = 0;
+ int32_t latestFCT = 0;
+ uint32_t latestFP = 0;
+ uint32_t time_pps = 0;
double& timestamp_offset;
- unsigned m_tist_delay_stages;
- int inhibit_second_update;
- bool offset_changed;
+ int inhibit_second_update = 0;
+ bool offset_changed = false;
/* When the type or identifier don't match, the decoder must
* be disabled
*/
- bool enableDecode;
+ bool enableDecode = false;
/* Disable timstamps until full time has been received */
- bool full_timestamp_received;
-
- /* when pipelining, we must shift the calculated timestamps
- * through this queue. Otherwise, it would not be possible to
- * synchronise two modulators if only one uses (for instance) the
- * FIRFilter (1 stage pipeline)
- */
- std::queue<std::shared_ptr<frame_timestamp> > queue_timestamps;
-
+ bool full_timestamp_received = false;
};
diff --git a/src/Utils.cpp b/src/Utils.cpp
index f423dc1..b4816d3 100644
--- a/src/Utils.cpp
+++ b/src/Utils.cpp
@@ -3,7 +3,7 @@
Her Majesty the Queen in Right of Canada (Communications Research
Center Canada)
- Copyright (C) 2017
+ Copyright (C) 2018
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
@@ -214,3 +214,17 @@ double parseChannel(const std::string& chan)
}
return freq;
}
+
+std::chrono::milliseconds transmission_frame_duration(unsigned int dabmode)
+{
+ using namespace std::chrono;
+ switch (dabmode) {
+ case 1: return milliseconds(96);
+ case 2: return milliseconds(24);
+ case 3: return milliseconds(24);
+ case 4: return milliseconds(48);
+ default:
+ throw std::runtime_error("invalid DAB mode");
+ }
+}
+
diff --git a/src/Utils.h b/src/Utils.h
index 6a36baf..9e88488 100644
--- a/src/Utils.h
+++ b/src/Utils.h
@@ -3,7 +3,7 @@
Her Majesty the Queen in Right of Canada (Communications Research
Center Canada)
- Copyright (C) 2017
+ Copyright (C) 2018
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
@@ -36,6 +36,7 @@
#include <stdio.h>
#include <time.h>
#include <string>
+#include <chrono>
void printUsage(const char* progName);
@@ -43,22 +44,6 @@ void printVersion(void);
void printStartupInfo(void);
-inline long timespecdiff_us(struct timespec& oldTime, struct timespec& time)
-{
- long tv_sec;
- long tv_nsec;
- if (time.tv_nsec < oldTime.tv_nsec) {
- tv_sec = time.tv_sec - 1 - oldTime.tv_sec;
- tv_nsec = 1000000000L + time.tv_nsec - oldTime.tv_nsec;
- }
- else {
- tv_sec = time.tv_sec - oldTime.tv_sec;
- tv_nsec = time.tv_nsec - oldTime.tv_nsec;
- }
-
- return tv_sec * 1000 + tv_nsec / 1000;
-}
-
// Set SCHED_RR with priority prio (0=lowest)
int set_realtime_prio(int prio);
@@ -68,3 +53,6 @@ void set_thread_name(const char *name);
// Convert a channel like 10A to a frequency
double parseChannel(const std::string& chan);
+// dabMode is either 1, 2, 3, 4, corresponding to TM I, TM II, TM III and TM IV.
+// throws a runtime_error if dabMode is not one of these values.
+std::chrono::milliseconds transmission_frame_duration(unsigned int dabmode);
diff --git a/src/OutputUHDFeedback.cpp b/src/output/Feedback.cpp
index 68783f2..f0bbd98 100644
--- a/src/OutputUHDFeedback.cpp
+++ b/src/output/Feedback.cpp
@@ -34,41 +34,41 @@ DESCRIPTION:
# include <config.h>
#endif
-#ifdef HAVE_OUTPUT_UHD
-
#include <vector>
#include <complex>
#include <cstring>
-#include <uhd/types/stream_cmd.hpp>
#include <sys/socket.h>
#include <errno.h>
#include <poll.h>
#include <boost/date_time/posix_time/posix_time.hpp>
-#include "OutputUHDFeedback.h"
+#include "output/Feedback.h"
#include "Utils.h"
#include "Socket.h"
using namespace std;
-typedef std::complex<float> complexf;
-OutputUHDFeedback::OutputUHDFeedback(
- uhd::usrp::multi_usrp::sptr usrp,
+namespace Output {
+
+DPDFeedbackServer::DPDFeedbackServer(
+ std::shared_ptr<SDRDevice> device,
uint16_t port,
- uint32_t sampleRate)
+ uint32_t sampleRate) :
+ m_port(port),
+ m_sampleRate(sampleRate),
+ m_device(device)
{
- m_port = port;
- m_sampleRate = sampleRate;
- m_usrp = usrp;
-
if (m_port) {
m_running.store(true);
- rx_burst_thread = boost::thread(&OutputUHDFeedback::ReceiveBurstThread, this);
- burst_tcp_thread = boost::thread(&OutputUHDFeedback::ServeFeedbackThread, this);
+ rx_burst_thread = boost::thread(
+ &DPDFeedbackServer::ReceiveBurstThread, this);
+
+ burst_tcp_thread = boost::thread(
+ &DPDFeedbackServer::ServeFeedbackThread, this);
}
}
-OutputUHDFeedback::~OutputUHDFeedback()
+DPDFeedbackServer::~DPDFeedbackServer()
{
m_running.store(false);
@@ -83,12 +83,12 @@ OutputUHDFeedback::~OutputUHDFeedback()
}
}
-void OutputUHDFeedback::set_tx_frame(
+void DPDFeedbackServer::set_tx_frame(
const std::vector<uint8_t> &buf,
const struct frame_timestamp &buf_ts)
{
if (not m_running) {
- throw runtime_error("OutputUHDFeedback not running");
+ throw runtime_error("DPDFeedbackServer not running");
}
boost::mutex::scoped_lock lock(burstRequest.mutex);
@@ -131,13 +131,10 @@ void OutputUHDFeedback::set_tx_frame(
}
}
-void OutputUHDFeedback::ReceiveBurstThread()
+void DPDFeedbackServer::ReceiveBurstThread()
{
try {
- set_thread_name("uhdreceiveburst");
-
- uhd::stream_args_t stream_args("fc32"); //complex floats
- auto rxStream = m_usrp->get_rx_stream(stream_args);
+ set_thread_name("dpdreceiveburst");
while (m_running) {
boost::mutex::scoped_lock lock(burstRequest.mutex);
@@ -148,43 +145,40 @@ void OutputUHDFeedback::ReceiveBurstThread()
if (not m_running) break;
- uhd::stream_cmd_t cmd(
- uhd::stream_cmd_t::stream_mode_t::STREAM_MODE_NUM_SAMPS_AND_DONE);
- cmd.num_samps = burstRequest.num_samples;
- cmd.stream_now = false;
+ const size_t num_samps = burstRequest.num_samples;
- double pps = burstRequest.rx_pps / 16384000.0;
- cmd.time_spec = uhd::time_spec_t(burstRequest.rx_second, pps);
+ frame_timestamp ts;
+ ts.timestamp_sec = burstRequest.rx_second;
+ ts.timestamp_pps = burstRequest.rx_pps;
+ ts.timestamp_valid = true;
// We need to free the mutex while we recv(), because otherwise we block the
// TX thread
lock.unlock();
- const double usrp_time = m_usrp->get_time_now().get_real_secs();
- const double cmd_time = cmd.time_spec.get_real_secs();
-
- rxStream->issue_stream_cmd(cmd);
+ const double device_time = m_device->get_real_secs();
+ const double cmd_time = ts.get_real_secs();
- uhd::rx_metadata_t md;
-
- std::vector<uint8_t> buf(cmd.num_samps * sizeof(complexf));
+ std::vector<uint8_t> buf(num_samps * sizeof(complexf));
const double timeout = 60;
- size_t samples_read = rxStream->recv(&buf[0], cmd.num_samps, md, timeout);
+ size_t samples_read = m_device->receive_frame(
+ reinterpret_cast<complexf*>(buf.data()),
+ num_samps, ts, timeout);
lock.lock();
burstRequest.rx_samples = std::move(buf);
burstRequest.rx_samples.resize(samples_read * sizeof(complexf));
// The recv might have happened at another time than requested
- burstRequest.rx_second = md.time_spec.get_full_secs();
- burstRequest.rx_pps = md.time_spec.get_frac_secs() * 16384000.0;
+ burstRequest.rx_second = ts.timestamp_sec;
+ burstRequest.rx_pps = ts.timestamp_pps;
etiLog.level(debug) << "DPD: acquired " << samples_read <<
" RX feedback samples " <<
"at time " << burstRequest.tx_second << " + " <<
std::fixed << burstRequest.tx_pps / 16384000.0 <<
- " Delta=" << cmd_time - usrp_time;
+ " Delta=" << cmd_time - device_time;
burstRequest.state = BurstRequestState::Acquired;
@@ -205,7 +199,7 @@ void OutputUHDFeedback::ReceiveBurstThread()
m_running.store(false);
}
-void OutputUHDFeedback::ServeFeedback()
+void DPDFeedbackServer::ServeFeedback()
{
TCPSocket m_server_sock;
m_server_sock.listen(m_port);
@@ -335,9 +329,9 @@ void OutputUHDFeedback::ServeFeedback()
}
}
-void OutputUHDFeedback::ServeFeedbackThread()
+void DPDFeedbackServer::ServeFeedbackThread()
{
- set_thread_name("uhdservefeedback");
+ set_thread_name("dpdfeedbackserver");
while (m_running) {
try {
@@ -359,4 +353,4 @@ void OutputUHDFeedback::ServeFeedbackThread()
m_running.store(false);
}
-#endif
+} // namespace Output
diff --git a/src/OutputUHDFeedback.h b/src/output/Feedback.h
index 80d287f..2cad508 100644
--- a/src/OutputUHDFeedback.h
+++ b/src/output/Feedback.h
@@ -36,11 +36,6 @@ DESCRIPTION:
# include <config.h>
#endif
-#ifdef HAVE_OUTPUT_UHD
-
-#include <uhd/utils/thread_priority.hpp>
-#include <uhd/utils/safe_main.hpp>
-#include <uhd/usrp/multi_usrp.hpp>
#include <boost/thread.hpp>
#include <memory>
#include <string>
@@ -48,6 +43,9 @@ DESCRIPTION:
#include "Log.h"
#include "TimestampDecoder.h"
+#include "output/SDRDevice.h"
+
+namespace Output {
enum class BurstRequestState {
None, // To pending request
@@ -56,7 +54,7 @@ enum class BurstRequestState {
Acquired, // Both TX and RX frames are ready
};
-struct UHDReceiveBurstRequest {
+struct FeedbackBurstRequest {
// All fields in this struct are protected
mutable boost::mutex mutex;
boost::condition_variable mutex_notification;
@@ -83,21 +81,21 @@ struct UHDReceiveBurstRequest {
};
// Serve TX samples and RX feedback samples over a TCP connection
-class OutputUHDFeedback {
+class DPDFeedbackServer {
public:
- OutputUHDFeedback(
- uhd::usrp::multi_usrp::sptr usrp,
- uint16_t port,
+ DPDFeedbackServer(
+ std::shared_ptr<SDRDevice> device,
+ uint16_t port, // Set to 0 to disable the Feedbackserver
uint32_t sampleRate);
- OutputUHDFeedback(const OutputUHDFeedback& other) = delete;
- OutputUHDFeedback& operator=(const OutputUHDFeedback& other) = delete;
- ~OutputUHDFeedback();
+ DPDFeedbackServer(const DPDFeedbackServer& other) = delete;
+ DPDFeedbackServer& operator=(const DPDFeedbackServer& other) = delete;
+ ~DPDFeedbackServer();
void set_tx_frame(const std::vector<uint8_t> &buf,
const struct frame_timestamp& ts);
private:
- // Thread that reacts to burstRequests and receives from the USRP
+ // Thread that reacts to burstRequests and receives from the SDR device
void ReceiveBurstThread(void);
// Thread that listens for requests over TCP to get TX and RX feedback
@@ -107,13 +105,12 @@ class OutputUHDFeedback {
boost::thread rx_burst_thread;
boost::thread burst_tcp_thread;
- UHDReceiveBurstRequest burstRequest;
+ FeedbackBurstRequest burstRequest;
std::atomic_bool m_running;
uint16_t m_port = 0;
uint32_t m_sampleRate = 0;
- uhd::usrp::multi_usrp::sptr m_usrp;
+ std::shared_ptr<SDRDevice> m_device;
};
-
-#endif // HAVE_OUTPUT_UHD
+} // namespace Output
diff --git a/src/output/SDR.cpp b/src/output/SDR.cpp
new file mode 100644
index 0000000..34341bd
--- /dev/null
+++ b/src/output/SDR.cpp
@@ -0,0 +1,425 @@
+/*
+ Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Her Majesty the
+ Queen in Right of Canada (Communications Research Center Canada)
+
+ Copyright (C) 2018
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://opendigitalradio.org
+ */
+/*
+ This file is part of ODR-DabMod.
+
+ ODR-DabMod is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as
+ published by the Free Software Foundation, either version 3 of the
+ License, or (at your option) any later version.
+
+ ODR-DabMod is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with ODR-DabMod. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "output/SDR.h"
+
+#include "PcDebug.h"
+#include "Log.h"
+#include "RemoteControl.h"
+#include "Utils.h"
+
+#include <cmath>
+#include <iostream>
+#include <assert.h>
+#include <stdexcept>
+#include <stdio.h>
+#include <time.h>
+#include <errno.h>
+#include <unistd.h>
+#include <pthread.h>
+
+using namespace std;
+
+namespace Output {
+
+// Maximum number of frames that can wait in frames
+static constexpr size_t FRAMES_MAX_SIZE = 8;
+
+// If the timestamp is further in the future than
+// 100 seconds, abort
+static constexpr double TIMESTAMP_ABORT_FUTURE = 100;
+
+// Add a delay to increase buffers when
+// frames are too far in the future
+static constexpr double TIMESTAMP_MARGIN_FUTURE = 0.5;
+
+SDR::SDR(SDRDeviceConfig& config, std::shared_ptr<SDRDevice> device) :
+ ModOutput(), ModMetadata(), RemoteControllable("sdr"),
+ m_config(config),
+ m_device(device)
+{
+ // muting is remote-controllable
+ m_config.muting = false;
+
+ m_device_thread = std::thread(&SDR::process_thread_entry, this);
+
+ m_dpd_feedback_server = make_shared<DPDFeedbackServer>(
+ m_device,
+ m_config.dpdFeedbackServerPort,
+ m_config.sampleRate);
+}
+
+SDR::~SDR()
+{
+ m_running.store(false);
+
+ FrameData end_marker;
+ end_marker.buf.clear();
+ m_queue.push(end_marker);
+
+ if (m_device_thread.joinable()) {
+ m_device_thread.join();
+ }
+}
+
+int SDR::process(Buffer *dataIn)
+{
+ if (not m_running) {
+ throw std::runtime_error("SDR thread failed");
+ }
+
+ const uint8_t* pDataIn = (uint8_t*)dataIn->getData();
+ m_frame.resize(dataIn->getLength());
+ std::copy(pDataIn, pDataIn + dataIn->getLength(),
+ m_frame.begin());
+
+ // We will effectively transmit the frame once we got the metadata.
+
+ return dataIn->getLength();
+}
+
+meta_vec_t SDR::process_metadata(const meta_vec_t& metadataIn)
+{
+ if (m_device and m_running) {
+ FrameData frame;
+ frame.buf = std::move(m_frame);
+
+ if (metadataIn.empty()) {
+ etiLog.level(info) <<
+ "SDR output: dropping one frame with invalid FCT";
+ }
+ else {
+ /* In transmission modes where several ETI frames are needed to
+ * build one transmission frame (like in TM 1), we will have
+ * several entries in metadataIn. Take the first one, which
+ * comes from the earliest ETI frame.
+ * This behaviour is different to earlier versions of ODR-DabMod,
+ * which took the timestamp from the latest ETI frame.
+ */
+ frame.ts = *(metadataIn[0].ts);
+
+ // TODO check device running
+
+ try {
+ if (m_dpd_feedback_server) {
+ m_dpd_feedback_server->set_tx_frame(frame.buf, frame.ts);
+ }
+ }
+ catch (const runtime_error& e) {
+ etiLog.level(warn) <<
+ "SDR output: Feedback server failed, restarting...";
+
+ m_dpd_feedback_server = std::make_shared<DPDFeedbackServer>(
+ m_device,
+ m_config.dpdFeedbackServerPort,
+ m_config.sampleRate);
+ }
+
+ size_t num_frames = m_queue.push_wait_if_full(frame,
+ FRAMES_MAX_SIZE);
+ etiLog.log(trace, "SDR,push %zu", num_frames);
+ }
+ }
+ else {
+ // Ignore frame
+ }
+ return {};
+}
+
+
+void SDR::process_thread_entry()
+{
+ // Set thread priority to realtime
+ if (int ret = set_realtime_prio(1)) {
+ etiLog.level(error) << "Could not set priority for SDR device thread:" << ret;
+ }
+
+ set_thread_name("sdrdevice");
+
+ last_tx_time_initialised = false;
+
+ size_t last_num_underflows = 0;
+ size_t pop_prebuffering = FRAMES_MAX_SIZE;
+
+ m_running.store(true);
+
+ try {
+ while (m_running.load()) {
+ struct FrameData frame;
+ etiLog.log(trace, "SDR,wait");
+ m_queue.wait_and_pop(frame, pop_prebuffering);
+ etiLog.log(trace, "SDR,pop");
+
+ if (m_running.load() == false or frame.buf.empty()) {
+ break;
+ }
+
+ if (m_device) {
+ handle_frame(frame);
+
+ const auto rs = m_device->get_run_statistics();
+
+ /* Ensure we fill frames after every underrun and
+ * at startup to reduce underrun likelihood. */
+ if (last_num_underflows < rs.num_underruns) {
+ pop_prebuffering = FRAMES_MAX_SIZE;
+ }
+ else {
+ pop_prebuffering = 1;
+ }
+
+ last_num_underflows = rs.num_underruns;
+ }
+ }
+ }
+ catch (const runtime_error& e) {
+ etiLog.level(error) << "SDR output thread caught runtime error: " << e.what();
+ }
+
+ m_running.store(false);
+}
+
+const char* SDR::name()
+{
+ if (m_device) {
+ m_name = "OutputSDR(";
+ m_name += m_device->device_name();
+ m_name += ")";
+ }
+ else {
+ m_name = "OutputSDR(<no device>)";
+ }
+ return m_name.c_str();
+}
+
+void SDR::sleep_through_frame()
+{
+ using namespace std::chrono;
+
+ const auto now = steady_clock::now();
+
+ if (not t_last_frame_initialised) {
+ t_last_frame = now;
+ t_last_frame_initialised = true;
+ }
+
+ const auto delta = now - t_last_frame;
+ const auto wait_time = transmission_frame_duration(m_config.dabMode);
+
+ if (wait_time > delta) {
+ this_thread::sleep_for(wait_time - delta);
+ }
+
+ t_last_frame += wait_time;
+}
+
+void SDR::handle_frame(struct FrameData& frame)
+{
+ // Assumes m_device is valid
+
+ constexpr double tx_timeout = 20.0;
+
+ if (not m_device->is_clk_source_ok()) {
+ sleep_through_frame();
+ return;
+ }
+
+ double device_time = m_device->get_real_secs();
+ const auto& time_spec = frame.ts;
+
+ if (m_config.enableSync and m_config.muteNoTimestamps and
+ not time_spec.timestamp_valid) {
+ sleep_through_frame();
+ etiLog.log(info,
+ "OutputSDR: Muting sample %d : no timestamp\n",
+ frame.ts.fct);
+ return;
+ }
+
+ if (m_config.enableSync and time_spec.timestamp_valid) {
+ // Tx time from MNSC and TIST
+ const uint32_t tx_second = frame.ts.timestamp_sec;
+ const uint32_t tx_pps = frame.ts.timestamp_pps;
+
+ if (not frame.ts.timestamp_valid) {
+ /* We have not received a full timestamp through
+ * MNSC. We sleep through the frame.
+ */
+ etiLog.level(info) <<
+ "OutputSDR: Throwing sample " << frame.ts.fct <<
+ " away: incomplete timestamp " << tx_second <<
+ " / " << tx_pps;
+ return;
+ }
+
+ if (last_tx_time_initialised) {
+ const size_t sizeIn = frame.buf.size() / sizeof(complexf);
+ uint64_t increment = (uint64_t)sizeIn * 16384000ul /
+ (uint64_t)m_config.sampleRate;
+ // samps * ticks/s / (samps/s)
+ // (samps * ticks * s) / (s * samps)
+ // ticks
+
+ uint32_t expected_sec = last_tx_second + increment / 16384000ul;
+ uint32_t expected_pps = last_tx_pps + increment % 16384000ul;
+
+ while (expected_pps >= 16384000) {
+ expected_sec++;
+ expected_pps -= 16384000;
+ }
+
+ if (expected_sec != tx_second or expected_pps != tx_pps) {
+ etiLog.level(warn) << "OutputSDR: timestamp irregularity!" <<
+ std::fixed <<
+ " Expected " <<
+ expected_sec << "+" << (double)expected_pps/16384000.0 <<
+ "(" << expected_pps << ")" <<
+ " Got " <<
+ tx_second << "+" << (double)tx_pps/16384000.0 <<
+ "(" << tx_pps << ")";
+
+ frame.ts.timestamp_refresh = true;
+ }
+ }
+
+ last_tx_second = tx_second;
+ last_tx_pps = tx_pps;
+ last_tx_time_initialised = true;
+
+ const double pps_offset = tx_pps / 16384000.0;
+
+ etiLog.log(trace, "SDR,tist %f", time_spec.get_real_secs());
+
+ if (time_spec.get_real_secs() + tx_timeout < device_time) {
+ etiLog.level(warn) <<
+ "OutputSDR: Timestamp in the past! offset: " <<
+ std::fixed <<
+ time_spec.get_real_secs() - device_time <<
+ " (" << device_time << ")"
+ " frame " << frame.ts.fct <<
+ ", tx_second " << tx_second <<
+ ", pps " << pps_offset;
+ return;
+ }
+
+ if (time_spec.get_real_secs() > device_time + TIMESTAMP_ABORT_FUTURE) {
+ etiLog.level(error) <<
+ "OutputSDR: Timestamp way too far in the future! offset: " <<
+ std::fixed <<
+ time_spec.get_real_secs() - device_time;
+ throw std::runtime_error("Timestamp error. Aborted.");
+ }
+ }
+
+ if (m_config.muting) {
+ etiLog.log(info,
+ "OutputSDR: Muting sample %d requested\n",
+ frame.ts.fct);
+ return;
+ }
+
+ m_device->transmit_frame(frame);
+}
+
+// =======================================
+// Remote Control
+// =======================================
+void SDR::set_parameter(const string& parameter, const string& value)
+{
+ stringstream ss(value);
+ ss.exceptions ( stringstream::failbit | stringstream::badbit );
+
+ if (parameter == "txgain") {
+ ss >> m_config.txgain;
+ m_device->set_txgain(m_config.txgain);
+ }
+ else if (parameter == "rxgain") {
+ ss >> m_config.rxgain;
+ m_device->set_rxgain(m_config.rxgain);
+ }
+ else if (parameter == "freq") {
+ ss >> m_config.frequency;
+ m_device->tune(m_config.lo_offset, m_config.frequency);
+ m_config.frequency = m_device->get_tx_freq();
+ }
+ else if (parameter == "muting") {
+ ss >> m_config.muting;
+ }
+ else if (parameter == "underruns" or
+ parameter == "latepackets" or
+ parameter == "frames") {
+ throw ParameterError("Parameter " + parameter + " is read-only.");
+ }
+ else {
+ stringstream ss_err;
+ ss_err << "Parameter '" << parameter
+ << "' is not exported by controllable " << get_rc_name();
+ throw ParameterError(ss_err.str());
+ }
+}
+
+const string SDR::get_parameter(const string& parameter) const
+{
+ stringstream ss;
+ if (parameter == "txgain") {
+ ss << m_config.txgain;
+ }
+ else if (parameter == "rxgain") {
+ ss << m_config.rxgain;
+ }
+ else if (parameter == "freq") {
+ ss << m_config.frequency;
+ }
+ else if (parameter == "muting") {
+ ss << m_config.muting;
+ }
+ else if (parameter == "underruns" or
+ parameter == "latepackets" or
+ parameter == "frames" ) {
+ if (not m_device) {
+ throw ParameterError("OutputSDR has no device");
+ }
+ const auto stat = m_device->get_run_statistics();
+
+ if (parameter == "underruns") {
+ ss << stat.num_underruns;
+ }
+ else if (parameter == "latepackets") {
+ ss << stat.num_late_packets;
+ }
+ else if (parameter == "frames") {
+ ss << stat.num_frames_modulated;
+ }
+ }
+ else {
+ ss << "Parameter '" << parameter <<
+ "' is not exported by controllable " << get_rc_name();
+ throw ParameterError(ss.str());
+ }
+ return ss.str();
+}
+
+} // namespace Output
diff --git a/src/output/SDR.h b/src/output/SDR.h
new file mode 100644
index 0000000..a55f7c0
--- /dev/null
+++ b/src/output/SDR.h
@@ -0,0 +1,95 @@
+/*
+ Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Her Majesty the
+ Queen in Right of Canada (Communications Research Center Canada)
+
+ Copyright (C) 2018
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://opendigitalradio.org
+
+DESCRIPTION:
+ Common interface for all SDR outputs
+*/
+
+/*
+ This file is part of ODR-DabMod.
+
+ ODR-DabMod is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as
+ published by the Free Software Foundation, either version 3 of the
+ License, or (at your option) any later version.
+
+ ODR-DabMod is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with ODR-DabMod. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#ifdef HAVE_CONFIG_H
+# include <config.h>
+#endif
+
+#include <chrono>
+#include "ModPlugin.h"
+#include "EtiReader.h"
+#include "output/SDRDevice.h"
+#include "output/Feedback.h"
+
+namespace Output {
+
+using complexf = std::complex<float>;
+
+class SDR : public ModOutput, public ModMetadata, public RemoteControllable {
+ public:
+ SDR(SDRDeviceConfig& config, std::shared_ptr<SDRDevice> device);
+ SDR(const SDR& other) = delete;
+ SDR operator=(const SDR& other) = delete;
+ ~SDR();
+
+ virtual int process(Buffer *dataIn) override;
+ virtual meta_vec_t process_metadata(const meta_vec_t& metadataIn) override;
+
+ virtual const char* name() override;
+
+ /*********** REMOTE CONTROL ***************/
+
+ /* Base function to set parameters. */
+ virtual void set_parameter(const std::string& parameter,
+ const std::string& value) override;
+
+ /* Getting a parameter always returns a string. */
+ virtual const std::string get_parameter(
+ const std::string& parameter) const override;
+
+ private:
+ void process_thread_entry(void);
+ void handle_frame(struct FrameData &frame);
+ void sleep_through_frame(void);
+
+ SDRDeviceConfig& m_config;
+
+ std::atomic<bool> m_running = ATOMIC_VAR_INIT(false);
+ std::thread m_device_thread;
+ std::vector<uint8_t> m_frame;
+ ThreadsafeQueue<FrameData> m_queue;
+
+ std::shared_ptr<SDRDevice> m_device;
+ std::string m_name;
+
+ std::shared_ptr<DPDFeedbackServer> m_dpd_feedback_server;
+
+ bool last_tx_time_initialised = false;
+ uint32_t last_tx_second = 0;
+ uint32_t last_tx_pps = 0;
+
+ bool t_last_frame_initialised = false;
+ std::chrono::steady_clock::time_point t_last_frame;
+};
+
+}
+
diff --git a/src/output/SDRDevice.h b/src/output/SDRDevice.h
new file mode 100644
index 0000000..bd1a518
--- /dev/null
+++ b/src/output/SDRDevice.h
@@ -0,0 +1,137 @@
+/*
+ Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Her Majesty the
+ Queen in Right of Canada (Communications Research Center Canada)
+
+ Copyright (C) 2017
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://opendigitalradio.org
+
+DESCRIPTION:
+ Common interface for all SDR outputs
+*/
+
+/*
+ This file is part of ODR-DabMod.
+
+ ODR-DabMod is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as
+ published by the Free Software Foundation, either version 3 of the
+ License, or (at your option) any later version.
+
+ ODR-DabMod is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with ODR-DabMod. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#ifdef HAVE_CONFIG_H
+# include <config.h>
+#endif
+
+#include <cstdint>
+#include <string>
+#include <vector>
+#include <complex>
+
+#include "TimestampDecoder.h"
+
+namespace Output {
+
+enum refclk_lock_loss_behaviour_t { CRASH, IGNORE };
+
+using complexf = std::complex<float>;
+
+/* This structure is used as initial configuration for all SDR devices.
+ * It must also contain all remote-controllable settings, otherwise
+ * they will get lost on a modulator restart. */
+struct SDRDeviceConfig {
+ std::string device;
+ std::string subDevice; // For UHD
+ std::string tx_antenna;
+ std::string rx_antenna;
+
+ long masterClockRate = 32768000;
+ unsigned sampleRate = 2048000;
+ double frequency = 0.0;
+ double lo_offset = 0.0;
+ double txgain = 0.0;
+ double rxgain = 0.0;
+ bool enableSync = false;
+
+ // When working with timestamps, mute the frames that
+ // do not have a timestamp
+ bool muteNoTimestamps = false;
+ unsigned dabMode = 0;
+ unsigned maxGPSHoldoverTime = 0;
+
+ /* allowed values for UHD : auto, int, sma, mimo */
+ std::string refclk_src;
+
+ /* allowed values for UHD : int, sma, mimo */
+ std::string pps_src;
+
+ /* allowed values for UHD : pos, neg */
+ std::string pps_polarity;
+
+ /* What to do when the reference clock PLL loses lock */
+ refclk_lock_loss_behaviour_t refclk_lock_loss_behaviour;
+
+ // muting can only be changed using the remote control
+ bool muting = false;
+
+ // TCP port on which to serve TX and RX samples for the
+ // digital pre distortion learning tool
+ uint16_t dpdFeedbackServerPort = 0;
+};
+
+// Each frame contains one OFDM frame, and its
+// associated timestamp
+struct FrameData {
+ // Buffer holding frame data
+ std::vector<uint8_t> buf;
+
+ // A full timestamp contains a TIST according to standard
+ // and time information within MNSC with tx_second.
+ struct frame_timestamp ts;
+};
+
+
+// All SDR Devices must implement the SDRDevice interface
+class SDRDevice {
+ public:
+ struct RunStatistics {
+ size_t num_underruns;
+ size_t num_late_packets;
+ size_t num_overruns;
+ size_t num_frames_modulated;
+ };
+
+ virtual void tune(double lo_offset, double frequency) = 0;
+ virtual double get_tx_freq(void) const = 0;
+ virtual void set_txgain(double txgain) = 0;
+ virtual double get_txgain(void) const = 0;
+ virtual void transmit_frame(const struct FrameData& frame) = 0;
+ virtual RunStatistics get_run_statistics(void) const = 0;
+ virtual double get_real_secs(void) const = 0;
+ virtual void set_rxgain(double rxgain) = 0;
+ virtual double get_rxgain(void) const = 0;
+ virtual size_t receive_frame(
+ complexf *buf,
+ size_t num_samples,
+ struct frame_timestamp& ts,
+ double timeout_secs) = 0;
+
+
+ // Return true if GPS and reference clock inputs are ok
+ virtual bool is_clk_source_ok(void) const = 0;
+
+ virtual const char* device_name(void) const = 0;
+};
+
+} // namespace Output
diff --git a/src/output/Soapy.cpp b/src/output/Soapy.cpp
new file mode 100644
index 0000000..8ee420e
--- /dev/null
+++ b/src/output/Soapy.cpp
@@ -0,0 +1,274 @@
+/*
+ Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Her Majesty the
+ Queen in Right of Canada (Communications Research Center Canada)
+
+ Copyright (C) 2018
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://opendigitalradio.org
+
+DESCRIPTION:
+ It is an output driver using the SoapySDR library that can output to
+ many devices.
+*/
+
+/*
+ This file is part of ODR-DabMod.
+
+ ODR-DabMod is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as
+ published by the Free Software Foundation, either version 3 of the
+ License, or (at your option) any later version.
+
+ ODR-DabMod is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with ODR-DabMod. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "output/Soapy.h"
+
+#ifdef HAVE_SOAPYSDR
+
+#include <SoapySDR/Errors.hpp>
+#include <chrono>
+#include <cstdio>
+
+#include "Log.h"
+#include "Utils.h"
+
+using namespace std;
+
+namespace Output {
+
+static constexpr size_t FRAMES_MAX_SIZE = 2;
+
+Soapy::Soapy(SDRDeviceConfig& config) :
+ SDRDevice(),
+ m_conf(config)
+{
+ etiLog.level(info) <<
+ "Soapy:Creating the device with: " <<
+ m_conf.device;
+
+ try {
+ m_device = SoapySDR::Device::make(m_conf.device);
+ stringstream ss;
+ ss << "SoapySDR driver=" << m_device->getDriverKey();
+ ss << " hardware=" << m_device->getHardwareKey();
+ for (const auto &it : m_device->getHardwareInfo()) {
+ ss << " " << it.first << "=" << it.second;
+ }
+ }
+ catch (const std::exception &ex) {
+ etiLog.level(error) << "Error making SoapySDR device: " <<
+ ex.what();
+ throw std::runtime_error("Cannot create SoapySDR output");
+ }
+
+ m_device->setMasterClockRate(m_conf.masterClockRate);
+ etiLog.level(info) << "SoapySDR master clock rate set to " <<
+ std::fixed << std::setprecision(4) <<
+ m_device->getMasterClockRate()/1000.0 << " kHz";
+
+ m_device->setSampleRate(SOAPY_SDR_TX, 0, m_conf.sampleRate);
+ etiLog.level(info) << "SoapySDR:Actual TX rate: " <<
+ std::fixed << std::setprecision(4) <<
+ m_device->getSampleRate(SOAPY_SDR_TX, 0) / 1000.0 <<
+ " ksps.";
+
+ tune(m_conf.lo_offset, m_conf.frequency);
+ m_conf.frequency = m_device->getFrequency(SOAPY_SDR_TX, 0);
+ etiLog.level(info) << "SoapySDR:Actual frequency: " <<
+ std::fixed << std::setprecision(3) <<
+ m_conf.frequency / 1000.0 << " kHz.";
+
+ m_device->setGain(SOAPY_SDR_TX, 0, m_conf.txgain);
+ etiLog.level(info) << "SoapySDR:Actual tx gain: " <<
+ std::fixed << std::setprecision(2) <<
+ m_device->getGain(SOAPY_SDR_TX, 0);
+
+ if (not m_conf.tx_antenna.empty()) {
+ m_device->setAntenna(SOAPY_SDR_TX, 0, m_conf.tx_antenna);
+ }
+ etiLog.level(info) << "SoapySDR:Actual tx antenna: " <<
+ m_device->getAntenna(SOAPY_SDR_TX, 0);
+
+ const std::vector<size_t> channels({0});
+ m_tx_stream = m_device->setupStream(SOAPY_SDR_TX, "CF32", channels);
+ m_device->activateStream(m_tx_stream);
+
+ m_rx_stream = m_device->setupStream(SOAPY_SDR_RX, "CF32", channels);
+}
+
+Soapy::~Soapy()
+{
+ if (m_device != nullptr) {
+ if (m_tx_stream != nullptr) {
+ m_device->closeStream(m_tx_stream);
+ }
+ SoapySDR::Device::unmake(m_device);
+ }
+}
+
+void Soapy::tune(double lo_offset, double frequency)
+{
+ if (not m_device) throw runtime_error("Soapy device not set up");
+
+ SoapySDR::Kwargs offset_arg;
+ offset_arg["OFFSET"] = to_string(lo_offset);
+ m_device->setFrequency(SOAPY_SDR_TX, 0, m_conf.frequency, offset_arg);
+}
+
+double Soapy::get_tx_freq(void) const
+{
+ if (not m_device) throw runtime_error("Soapy device not set up");
+
+ // TODO lo offset
+ return m_device->getFrequency(SOAPY_SDR_TX, 0);
+}
+
+void Soapy::set_txgain(double txgain)
+{
+ m_conf.txgain = txgain;
+ if (not m_device) throw runtime_error("Soapy device not set up");
+ m_device->setGain(SOAPY_SDR_TX, 0, m_conf.txgain);
+}
+
+double Soapy::get_txgain(void) const
+{
+ if (not m_device) throw runtime_error("Soapy device not set up");
+ return m_device->getGain(SOAPY_SDR_TX, 0);
+}
+
+SDRDevice::RunStatistics Soapy::get_run_statistics(void) const
+{
+ RunStatistics rs;
+ rs.num_underruns = underflows;
+ rs.num_overruns = overflows;
+ rs.num_late_packets = late_packets;
+ rs.num_frames_modulated = num_frames_modulated;
+ return rs;
+}
+
+
+double Soapy::get_real_secs(void) const
+{
+ if (m_device) {
+ long long time_ns = m_device->getHardwareTime();
+ return time_ns / 1e9;
+ }
+ else {
+ return 0.0;
+ }
+}
+
+void Soapy::set_rxgain(double rxgain)
+{
+ m_device->setGain(SOAPY_SDR_RX, 0, m_conf.rxgain);
+ m_conf.rxgain = m_device->getGain(SOAPY_SDR_RX, 0);
+}
+
+double Soapy::get_rxgain(void) const
+{
+ return m_device->getGain(SOAPY_SDR_RX, 0);
+}
+
+size_t Soapy::receive_frame(
+ complexf *buf,
+ size_t num_samples,
+ struct frame_timestamp& ts,
+ double timeout_secs)
+{
+ int flags = 0;
+ long long timeNs = ts.get_ns();
+ const size_t numElems = num_samples;
+
+ void *buffs[1];
+ buffs[0] = buf;
+
+ m_device->activateStream(m_rx_stream, flags, timeNs, numElems);
+
+ auto ret = m_device->readStream(m_tx_stream, buffs, num_samples, flags, timeNs);
+
+ m_device->deactivateStream(m_rx_stream);
+
+ // TODO update effective receive ts
+
+ if (ret < 0) {
+ throw runtime_error("Soapy readStream error: " + to_string(ret));
+ }
+
+ return ret;
+}
+
+
+bool Soapy::is_clk_source_ok() const
+{
+ // TODO
+ return true;
+}
+
+const char* Soapy::device_name(void) const
+{
+ return "Soapy";
+}
+
+void Soapy::transmit_frame(const struct FrameData& frame)
+{
+ if (not m_device) throw runtime_error("Soapy device not set up");
+
+ // TODO timestamps
+
+ // The frame buffer contains bytes representing FC32 samples
+ const complexf *buf = reinterpret_cast<const complexf*>(frame.buf.data());
+ const size_t numSamples = frame.buf.size() / sizeof(complexf);
+ if ((frame.buf.size() % sizeof(complexf)) != 0) {
+ throw std::runtime_error("Soapy: invalid buffer size");
+ }
+
+ // Stream MTU is in samples, not bytes.
+ const size_t mtu = m_device->getStreamMTU(m_tx_stream);
+
+ size_t num_acc_samps = 0;
+ while (num_acc_samps < numSamples) {
+ const void *buffs[1];
+ buffs[0] = buf + num_acc_samps;
+
+ const size_t samps_to_send = std::min(numSamples - num_acc_samps, mtu);
+
+ int flags = 0;
+
+ auto ret = m_device->writeStream(m_tx_stream, buffs, samps_to_send, flags);
+
+ if (ret == SOAPY_SDR_TIMEOUT) {
+ continue;
+ }
+ else if (ret == SOAPY_SDR_OVERFLOW) {
+ overflows++;
+ continue;
+ }
+ else if (ret == SOAPY_SDR_UNDERFLOW) {
+ underflows++;
+ continue;
+ }
+
+ if (ret < 0) {
+ etiLog.level(error) << "Unexpected stream error " <<
+ SoapySDR::errToStr(ret);
+ throw std::runtime_error("Fault in Soapy");
+ }
+
+ num_acc_samps += ret;
+ }
+ num_frames_modulated++;
+}
+
+} // namespace Output
+
+#endif // HAVE_SOAPYSDR
+
+
diff --git a/src/output/Soapy.h b/src/output/Soapy.h
new file mode 100644
index 0000000..67b280d
--- /dev/null
+++ b/src/output/Soapy.h
@@ -0,0 +1,98 @@
+/*
+ Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Her Majesty the
+ Queen in Right of Canada (Communications Research Center Canada)
+
+ Copyright (C) 2018
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://opendigitalradio.org
+
+DESCRIPTION:
+ It is an output driver using the SoapySDR library that can output to
+ many devices.
+*/
+
+/*
+ This file is part of ODR-DabMod.
+
+ ODR-DabMod is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as
+ published by the Free Software Foundation, either version 3 of the
+ License, or (at your option) any later version.
+
+ ODR-DabMod is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with ODR-DabMod. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#ifdef HAVE_CONFIG_H
+# include <config.h>
+#endif
+
+#ifdef HAVE_SOAPYSDR
+#include <SoapySDR/Version.hpp>
+#include <SoapySDR/Modules.hpp>
+#include <SoapySDR/Registry.hpp>
+#include <SoapySDR/Device.hpp>
+
+#include <string>
+#include <memory>
+
+#include "output/SDR.h"
+#include "ModPlugin.h"
+#include "EtiReader.h"
+#include "RemoteControl.h"
+#include "ThreadsafeQueue.h"
+
+namespace Output {
+
+class Soapy : public Output::SDRDevice
+{
+ public:
+ Soapy(SDRDeviceConfig& config);
+ Soapy(const Soapy& other) = delete;
+ Soapy& operator=(const Soapy& other) = delete;
+ ~Soapy();
+
+ virtual void tune(double lo_offset, double frequency) override;
+ virtual double get_tx_freq(void) const override;
+ virtual void set_txgain(double txgain) override;
+ virtual double get_txgain(void) const override;
+ virtual void transmit_frame(const struct FrameData& frame) override;
+ virtual RunStatistics get_run_statistics(void) const override;
+ virtual double get_real_secs(void) const override;
+
+ virtual void set_rxgain(double rxgain) override;
+ virtual double get_rxgain(void) const override;
+ virtual size_t receive_frame(
+ complexf *buf,
+ size_t num_samples,
+ struct frame_timestamp& ts,
+ double timeout_secs) override;
+
+ // Return true if GPS and reference clock inputs are ok
+ virtual bool is_clk_source_ok(void) const override;
+ virtual const char* device_name(void) const override;
+
+ private:
+ SDRDeviceConfig& m_conf;
+ SoapySDR::Device *m_device = nullptr;
+ SoapySDR::Stream *m_tx_stream = nullptr;
+ SoapySDR::Stream *m_rx_stream = nullptr;
+
+ size_t underflows = 0;
+ size_t overflows = 0;
+ size_t late_packets = 0;
+ size_t num_frames_modulated = 0;
+};
+
+} // namespace Output
+
+#endif //HAVE_SOAPYSDR
+
diff --git a/src/output/UHD.cpp b/src/output/UHD.cpp
new file mode 100644
index 0000000..2c571fd
--- /dev/null
+++ b/src/output/UHD.cpp
@@ -0,0 +1,500 @@
+/*
+ Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Her Majesty the
+ Queen in Right of Canada (Communications Research Center Canada)
+
+ Copyright (C) 2018
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://opendigitalradio.org
+ */
+/*
+ This file is part of ODR-DabMod.
+
+ ODR-DabMod is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as
+ published by the Free Software Foundation, either version 3 of the
+ License, or (at your option) any later version.
+
+ ODR-DabMod is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with ODR-DabMod. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "output/UHD.h"
+
+#ifdef HAVE_OUTPUT_UHD
+
+//#define MDEBUG(fmt, args...) fprintf(LOG, fmt , ## args)
+#define MDEBUG(fmt, args...)
+
+#include "PcDebug.h"
+#include "Log.h"
+#include "RemoteControl.h"
+#include "Utils.h"
+
+#include <boost/thread/future.hpp>
+
+#include <uhd/utils/msg.hpp>
+
+#include <cmath>
+#include <iostream>
+#include <assert.h>
+#include <stdexcept>
+#include <stdio.h>
+#include <time.h>
+#include <errno.h>
+#include <unistd.h>
+#include <pthread.h>
+
+using namespace std;
+
+namespace Output {
+
+// Maximum number of frames that can wait in frames
+static const size_t FRAMES_MAX_SIZE = 8;
+
+static std::string stringtrim(const std::string &s)
+{
+ auto wsfront = std::find_if_not(s.begin(), s.end(),
+ [](int c){ return std::isspace(c);} );
+ return std::string(wsfront,
+ std::find_if_not(s.rbegin(),
+ std::string::const_reverse_iterator(wsfront),
+ [](int c){ return std::isspace(c);} ).base());
+}
+
+static void uhd_msg_handler(uhd::msg::type_t type, const std::string &msg)
+{
+ if (type == uhd::msg::warning) {
+ etiLog.level(warn) << "UHD Warning: " << msg;
+ }
+ else if (type == uhd::msg::error) {
+ etiLog.level(error) << "UHD Error: " << msg;
+ }
+ else {
+ // do not print very short U messages and such
+ if (stringtrim(msg).size() != 1) {
+ etiLog.level(debug) << "UHD Message: " << msg;
+ }
+ }
+}
+
+
+
+UHD::UHD(SDRDeviceConfig& config) :
+ SDRDevice(),
+ m_conf(config),
+ m_running(false)
+{
+ std::stringstream device;
+ device << m_conf.device;
+
+ if (m_conf.masterClockRate != 0) {
+ if (device.str() != "") {
+ device << ",";
+ }
+ device << "master_clock_rate=" << m_conf.masterClockRate;
+ }
+
+ MDEBUG("OutputUHD::OutputUHD(device: %s) @ %p\n",
+ device.str().c_str(), this);
+
+ uhd::msg::register_handler(uhd_msg_handler);
+
+ uhd::set_thread_priority_safe();
+
+ etiLog.log(info, "OutputUHD:Creating the usrp device with: %s...",
+ device.str().c_str());
+
+ m_usrp = uhd::usrp::multi_usrp::make(device.str());
+
+ etiLog.log(info, "OutputUHD:Using device: %s...",
+ m_usrp->get_pp_string().c_str());
+
+ if (m_conf.masterClockRate != 0.0) {
+ double master_clk_rate = m_usrp->get_master_clock_rate();
+ etiLog.log(debug, "OutputUHD:Checking master clock rate: %f...",
+ master_clk_rate);
+
+ if (fabs(master_clk_rate - m_conf.masterClockRate) >
+ (m_conf.masterClockRate * 1e-6)) {
+ throw std::runtime_error("Cannot set USRP master_clock_rate. Aborted.");
+ }
+ }
+
+ MDEBUG("OutputUHD:Setting REFCLK and PPS input...\n");
+
+ if (m_conf.refclk_src == "gpsdo-ettus") {
+ m_usrp->set_clock_source("gpsdo");
+ }
+ else {
+ m_usrp->set_clock_source(m_conf.refclk_src);
+ }
+ m_usrp->set_time_source(m_conf.pps_src);
+
+ m_device_time = std::make_shared<USRPTime>(m_usrp, m_conf);
+
+ if (m_conf.subDevice != "") {
+ m_usrp->set_tx_subdev_spec(uhd::usrp::subdev_spec_t(m_conf.subDevice),
+ uhd::usrp::multi_usrp::ALL_MBOARDS);
+ }
+
+ etiLog.level(debug) << "UHD clock source is " << m_usrp->get_clock_source(0);
+
+ etiLog.level(debug) << "UHD time source is " << m_usrp->get_time_source(0);
+
+ m_usrp->set_tx_rate(m_conf.sampleRate);
+ etiLog.log(debug, "OutputUHD:Set rate to %d. Actual TX Rate: %f sps...",
+ m_conf.sampleRate, m_usrp->get_tx_rate());
+
+ if (fabs(m_usrp->get_tx_rate() / m_conf.sampleRate) >
+ m_conf.sampleRate * 1e-6) {
+ throw std::runtime_error("Cannot set USRP sample rate. Aborted.");
+ }
+
+ tune(m_conf.lo_offset, m_conf.frequency);
+
+ m_conf.frequency = m_usrp->get_tx_freq();
+ etiLog.level(debug) << std::fixed << std::setprecision(3) <<
+ "OutputUHD:Actual TX frequency: " << m_conf.frequency;
+
+ etiLog.level(debug) << std::fixed << std::setprecision(3) <<
+ "OutputUHD:Actual RX frequency: " << m_usrp->get_tx_freq();
+
+ m_usrp->set_tx_gain(m_conf.txgain);
+ m_conf.txgain = m_usrp->get_tx_gain();
+ etiLog.log(debug, "OutputUHD:Actual TX Gain: %f", m_conf.txgain);
+
+ etiLog.log(debug, "OutputUHD:Mute on missing timestamps: %s",
+ m_conf.muteNoTimestamps ? "enabled" : "disabled");
+
+ m_usrp->set_rx_rate(m_conf.sampleRate);
+ etiLog.log(debug, "OutputUHD:Actual RX Rate: %f sps.", m_usrp->get_rx_rate());
+
+ if (not m_conf.rx_antenna.empty()) {
+ m_usrp->set_rx_antenna(m_conf.rx_antenna);
+ }
+ etiLog.log(debug, "OutputUHD:Actual RX Antenna: %s",
+ m_usrp->get_rx_antenna().c_str());
+
+ if (not m_conf.tx_antenna.empty()) {
+ m_usrp->set_tx_antenna(m_conf.tx_antenna);
+ }
+ etiLog.log(debug, "OutputUHD:Actual TX Antenna: %s",
+ m_usrp->get_tx_antenna().c_str());
+
+ m_usrp->set_rx_gain(m_conf.rxgain);
+ etiLog.log(debug, "OutputUHD:Actual RX Gain: %f", m_usrp->get_rx_gain());
+
+ const uhd::stream_args_t stream_args("fc32"); //complex floats
+ m_rx_stream = m_usrp->get_rx_stream(stream_args);
+ m_tx_stream = m_usrp->get_tx_stream(stream_args);
+
+ m_running.store(true);
+ m_async_rx_thread = boost::thread(&UHD::print_async_thread, this);
+
+ MDEBUG("OutputUHD:UHD ready.\n");
+}
+
+UHD::~UHD()
+{
+ stop_threads();
+}
+
+void UHD::tune(double lo_offset, double frequency)
+{
+ if (lo_offset != 0.0) {
+ etiLog.level(info) << std::fixed << std::setprecision(3) <<
+ "OutputUHD:Setting freq to " << frequency <<
+ " with LO offset " << lo_offset << "...";
+
+ const auto tr = uhd::tune_request_t(frequency, lo_offset);
+ uhd::tune_result_t result = m_usrp->set_tx_freq(tr);
+
+ etiLog.level(debug) << "OutputUHD: TX freq" <<
+ std::fixed << std::setprecision(0) <<
+ " Target RF: " << result.target_rf_freq <<
+ " Actual RF: " << result.actual_rf_freq <<
+ " Target DSP: " << result.target_dsp_freq <<
+ " Actual DSP: " << result.actual_dsp_freq;
+
+ uhd::tune_result_t result_rx = m_usrp->set_rx_freq(tr);
+
+ etiLog.level(debug) << "OutputUHD: RX freq" <<
+ std::fixed << std::setprecision(0) <<
+ " Target RF: " << result_rx.target_rf_freq <<
+ " Actual RF: " << result_rx.actual_rf_freq <<
+ " Target DSP: " << result_rx.target_dsp_freq <<
+ " Actual DSP: " << result_rx.actual_dsp_freq;
+ }
+ else {
+ //set the centre frequency
+ etiLog.level(info) << std::fixed << std::setprecision(3) <<
+ "OutputUHD:Setting freq to " << frequency << "...";
+ m_usrp->set_tx_freq(frequency);
+
+ m_usrp->set_rx_freq(frequency);
+ }
+}
+
+double UHD::get_tx_freq(void) const
+{
+ return m_usrp->get_tx_freq();
+}
+
+void UHD::set_txgain(double txgain)
+{
+ m_usrp->set_tx_gain(txgain);
+ m_conf.txgain = m_usrp->get_tx_gain();
+}
+
+double UHD::get_txgain(void) const
+{
+ return m_usrp->get_tx_gain();
+}
+
+void UHD::transmit_frame(const struct FrameData& frame)
+{
+ const double tx_timeout = 20.0;
+ const size_t sizeIn = frame.buf.size() / sizeof(complexf);
+ const complexf* in_data = reinterpret_cast<const complexf*>(&frame.buf[0]);
+
+ uhd::tx_metadata_t md_tx;
+
+ bool tx_allowed = true;
+
+ // muting and mutenotimestamp is handled by SDR
+ if (m_conf.enableSync and frame.ts.timestamp_valid) {
+ uhd::time_spec_t timespec(
+ frame.ts.timestamp_sec, frame.ts.pps_offset());
+ md_tx.time_spec = timespec;
+ md_tx.has_time_spec = true;
+ }
+ else {
+ md_tx.has_time_spec = false;
+ }
+
+ size_t usrp_max_num_samps = m_tx_stream->get_max_num_samps();
+ size_t num_acc_samps = 0; //number of accumulated samples
+ while (tx_allowed and m_running.load() and (num_acc_samps < sizeIn)) {
+ size_t samps_to_send = std::min(sizeIn - num_acc_samps, usrp_max_num_samps);
+
+ const bool eob_because_muting = m_conf.muting;
+
+ // ensure the the last packet has EOB set if the timestamps has been
+ // refreshed and need to be reconsidered. If muting was set, set the
+ // EOB and quit the loop afterwards, to avoid an underrun.
+ md_tx.end_of_burst = eob_because_muting or (
+ frame.ts.timestamp_valid and
+ frame.ts.timestamp_refresh and
+ samps_to_send <= usrp_max_num_samps );
+
+ //send a single packet
+ size_t num_tx_samps = m_tx_stream->send(
+ &in_data[num_acc_samps],
+ samps_to_send, md_tx, tx_timeout);
+ etiLog.log(trace, "UHD,sent %zu of %zu", num_tx_samps, samps_to_send);
+
+ num_acc_samps += num_tx_samps;
+
+ md_tx.time_spec = md_tx.time_spec +
+ uhd::time_spec_t(0, num_tx_samps/m_conf.sampleRate);
+
+ if (num_tx_samps == 0) {
+ etiLog.log(warn,
+ "OutputUHD unable to write to device, skipping frame!");
+ break;
+ }
+
+ if (eob_because_muting) {
+ break;
+ }
+ }
+
+ num_frames_modulated++;
+}
+
+
+SDRDevice::RunStatistics UHD::get_run_statistics(void) const
+{
+ RunStatistics rs;
+ rs.num_underruns = num_underflows;
+ rs.num_overruns = num_overflows;
+ rs.num_late_packets = num_late_packets;
+ rs.num_frames_modulated = num_frames_modulated;
+ return rs;
+}
+
+double UHD::get_real_secs(void) const
+{
+ return m_usrp->get_time_now().get_real_secs();
+}
+
+void UHD::set_rxgain(double rxgain)
+{
+ m_usrp->set_rx_gain(m_conf.rxgain);
+ m_conf.rxgain = m_usrp->get_rx_gain();
+}
+
+double UHD::get_rxgain() const
+{
+ return m_usrp->get_rx_gain();
+}
+
+size_t UHD::receive_frame(
+ complexf *buf,
+ size_t num_samples,
+ struct frame_timestamp& ts,
+ double timeout_secs)
+{
+ uhd::stream_cmd_t cmd(
+ uhd::stream_cmd_t::stream_mode_t::STREAM_MODE_NUM_SAMPS_AND_DONE);
+ cmd.num_samps = num_samples;
+ cmd.stream_now = false;
+ cmd.time_spec = uhd::time_spec_t(ts.timestamp_sec, ts.pps_offset());
+
+ m_rx_stream->issue_stream_cmd(cmd);
+
+ uhd::rx_metadata_t md_rx;
+
+ constexpr double timeout = 60;
+ size_t samples_read = m_rx_stream->recv(buf, num_samples, md_rx, timeout);
+
+ // Update the ts with the effective receive TS
+ ts.timestamp_sec = md_rx.time_spec.get_full_secs();
+ ts.timestamp_pps = md_rx.time_spec.get_frac_secs() * 16384000.0;
+ return samples_read;
+}
+
+// Return true if GPS and reference clock inputs are ok
+bool UHD::is_clk_source_ok(void) const
+{
+ bool ok = true;
+
+ if (refclk_loss_needs_check()) {
+ try {
+ if (not m_usrp->get_mboard_sensor("ref_locked", 0).to_bool()) {
+ ok = false;
+
+ etiLog.level(alert) <<
+ "OutputUHD: External reference clock lock lost !";
+
+ if (m_conf.refclk_lock_loss_behaviour == CRASH) {
+ throw std::runtime_error(
+ "OutputUHD: External reference clock lock lost.");
+ }
+ }
+ }
+ catch (uhd::lookup_error &e) {
+ suppress_refclk_loss_check = true;
+ etiLog.log(warn, "OutputUHD: This USRP does not have mboard "
+ "sensor for ext clock loss. Check disabled.");
+ }
+ }
+
+ if (m_device_time) {
+ ok |= m_device_time->verify_time();
+ }
+
+ return ok;
+}
+
+const char* UHD::device_name(void) const
+{
+ return "UHD";
+}
+
+
+bool UHD::refclk_loss_needs_check() const
+{
+ if (suppress_refclk_loss_check) {
+ return false;
+ }
+ return m_conf.refclk_src != "internal";
+}
+
+void UHD::stop_threads()
+{
+ m_running.store(false);
+ if (m_async_rx_thread.joinable()) {
+ m_async_rx_thread.join();
+ }
+}
+
+
+
+void UHD::print_async_thread()
+{
+ while (m_running.load()) {
+ uhd::async_metadata_t async_md;
+ if (m_usrp->get_device()->recv_async_msg(async_md, 1)) {
+ const char* uhd_async_message = "";
+ bool failure = false;
+ switch (async_md.event_code) {
+ case uhd::async_metadata_t::EVENT_CODE_BURST_ACK:
+ break;
+ case uhd::async_metadata_t::EVENT_CODE_UNDERFLOW:
+ uhd_async_message = "Underflow";
+ num_underflows++;
+ break;
+ case uhd::async_metadata_t::EVENT_CODE_SEQ_ERROR:
+ uhd_async_message = "Packet loss between host and device.";
+ failure = true;
+ break;
+ case uhd::async_metadata_t::EVENT_CODE_TIME_ERROR:
+ uhd_async_message = "Packet had time that was late.";
+ num_late_packets++;
+ break;
+ case uhd::async_metadata_t::EVENT_CODE_UNDERFLOW_IN_PACKET:
+ uhd_async_message = "Underflow occurred inside a packet.";
+ failure = true;
+ break;
+ case uhd::async_metadata_t::EVENT_CODE_SEQ_ERROR_IN_BURST:
+ uhd_async_message = "Packet loss within a burst.";
+ failure = true;
+ break;
+ default:
+ uhd_async_message = "unknown event code";
+ failure = true;
+ break;
+ }
+
+ if (failure) {
+ etiLog.level(alert) <<
+ "Received Async UHD Message '" <<
+ uhd_async_message << "' at time " <<
+ async_md.time_spec.get_real_secs();
+ }
+ }
+
+ auto time_now = std::chrono::steady_clock::now();
+ if (last_print_time + std::chrono::seconds(1) < time_now) {
+ const double usrp_time =
+ m_usrp->get_time_now().get_real_secs();
+
+ if ( (num_underflows > num_underflows_previous) or
+ (num_late_packets > num_late_packets_previous)) {
+ etiLog.log(info,
+ "OutputUHD status (usrp time: %f): "
+ "%d underruns and %d late packets since last status.\n",
+ usrp_time,
+ num_underflows - num_underflows_previous,
+ num_late_packets - num_late_packets_previous);
+ }
+
+ num_underflows_previous = num_underflows;
+ num_late_packets_previous = num_late_packets;
+
+ last_print_time = time_now;
+ }
+ }
+}
+
+} // namespace Output
+
+#endif // HAVE_OUTPUT_UHD
+
diff --git a/src/output/UHD.h b/src/output/UHD.h
new file mode 100644
index 0000000..b34455c
--- /dev/null
+++ b/src/output/UHD.h
@@ -0,0 +1,127 @@
+/*
+ Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Her Majesty the
+ Queen in Right of Canada (Communications Research Center Canada)
+
+ Copyright (C) 2018
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://opendigitalradio.org
+
+DESCRIPTION:
+ It is an output driver for the USRP family of devices, and uses the UHD
+ library.
+*/
+
+/*
+ This file is part of ODR-DabMod.
+
+ ODR-DabMod is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as
+ published by the Free Software Foundation, either version 3 of the
+ License, or (at your option) any later version.
+
+ ODR-DabMod is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with ODR-DabMod. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#ifdef HAVE_CONFIG_H
+# include <config.h>
+#endif
+
+#ifdef HAVE_OUTPUT_UHD
+
+#include <uhd/utils/thread_priority.hpp>
+#include <uhd/utils/safe_main.hpp>
+#include <uhd/usrp/multi_usrp.hpp>
+#include <chrono>
+#include <memory>
+#include <string>
+#include <atomic>
+
+#include "Log.h"
+#include "output/SDR.h"
+#include "output/USRPTime.h"
+#include "TimestampDecoder.h"
+#include "RemoteControl.h"
+#include "ThreadsafeQueue.h"
+
+#include <stdio.h>
+#include <sys/types.h>
+
+// If the timestamp is further in the future than
+// 100 seconds, abort
+#define TIMESTAMP_ABORT_FUTURE 100
+
+// Add a delay to increase buffers when
+// frames are too far in the future
+#define TIMESTAMP_MARGIN_FUTURE 0.5
+
+namespace Output {
+
+class UHD : public Output::SDRDevice
+{
+ public:
+ UHD(SDRDeviceConfig& config);
+ UHD(const UHD& other) = delete;
+ UHD& operator=(const UHD& other) = delete;
+ ~UHD();
+
+ virtual void tune(double lo_offset, double frequency) override;
+ virtual double get_tx_freq(void) const override;
+ virtual void set_txgain(double txgain) override;
+ virtual double get_txgain(void) const override;
+ virtual void transmit_frame(const struct FrameData& frame) override;
+ virtual RunStatistics get_run_statistics(void) const override;
+ virtual double get_real_secs(void) const override;
+
+ virtual void set_rxgain(double rxgain) override;
+ virtual double get_rxgain(void) const override;
+ virtual size_t receive_frame(
+ complexf *buf,
+ size_t num_samples,
+ struct frame_timestamp& ts,
+ double timeout_secs) override;
+
+ // Return true if GPS and reference clock inputs are ok
+ virtual bool is_clk_source_ok(void) const override;
+ virtual const char* device_name(void) const override;
+
+ private:
+ SDRDeviceConfig& m_conf;
+ uhd::usrp::multi_usrp::sptr m_usrp;
+ uhd::tx_streamer::sptr m_tx_stream;
+ uhd::rx_streamer::sptr m_rx_stream;
+ std::shared_ptr<USRPTime> m_device_time;
+
+ size_t num_underflows = 0;
+ size_t num_overflows = 0;
+ size_t num_late_packets = 0;
+ size_t num_frames_modulated = 0;
+ size_t num_underflows_previous = 0;
+ size_t num_late_packets_previous = 0;
+
+ // Used to print statistics once a second
+ std::chrono::steady_clock::time_point last_print_time;
+
+ // Returns true if we want to verify loss of refclk
+ bool refclk_loss_needs_check(void) const;
+ mutable bool suppress_refclk_loss_check = false;
+
+ // Poll asynchronous metadata from UHD
+ std::atomic<bool> m_running;
+ boost::thread m_async_rx_thread;
+ void stop_threads(void);
+ void print_async_thread(void);
+};
+
+} // namespace Output
+
+#endif // HAVE_OUTPUT_UHD
+
diff --git a/src/output/USRPTime.cpp b/src/output/USRPTime.cpp
new file mode 100644
index 0000000..935d56b
--- /dev/null
+++ b/src/output/USRPTime.cpp
@@ -0,0 +1,283 @@
+/*
+ Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Her Majesty the
+ Queen in Right of Canada (Communications Research Center Canada)
+
+ Copyright (C) 2018
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://opendigitalradio.org
+
+DESCRIPTION:
+ The part of the UHD output that takes care of the GPSDO.
+*/
+
+/*
+ This file is part of ODR-DabMod.
+
+ ODR-DabMod is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as
+ published by the Free Software Foundation, either version 3 of the
+ License, or (at your option) any later version.
+
+ ODR-DabMod is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with ODR-DabMod. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "output/USRPTime.h"
+
+#ifdef HAVE_OUTPUT_UHD
+
+//#define MDEBUG(fmt, args...) fprintf(LOG, fmt , ## args)
+#define MDEBUG(fmt, args...)
+
+namespace Output {
+
+using namespace std;
+
+
+// Check function for GPS TIMELOCK sensor from the ODR LEA-M8F board GPSDO
+static bool check_gps_timelock(uhd::usrp::multi_usrp::sptr& usrp)
+{
+ try {
+ const string sensor_value =
+ usrp->get_mboard_sensor("gps_timelock", 0).to_pp_string();
+
+ if (sensor_value.find("TIME LOCKED") == string::npos) {
+ etiLog.level(warn) << "OutputUHD: gps_timelock " << sensor_value;
+ return false;
+ }
+
+ return true;
+ }
+ catch (const uhd::lookup_error &e) {
+ etiLog.level(warn) << "OutputUHD: no gps_timelock sensor";
+ return false;
+ }
+}
+
+// Check function for GPS LOCKED sensor from the Ettus GPSDO
+static bool check_gps_locked(uhd::usrp::multi_usrp::sptr& usrp)
+{
+ try {
+ const uhd::sensor_value_t sensor_value(
+ usrp->get_mboard_sensor("gps_locked", 0));
+ if (not sensor_value.to_bool()) {
+ etiLog.level(warn) << "OutputUHD: gps_locked " <<
+ sensor_value.to_pp_string();
+ return false;
+ }
+
+ return true;
+ }
+ catch (const uhd::lookup_error &e) {
+ etiLog.level(warn) << "OutputUHD: no gps_locked sensor";
+ return false;
+ }
+}
+
+
+USRPTime::USRPTime(
+ uhd::usrp::multi_usrp::sptr usrp,
+ SDRDeviceConfig& conf) :
+ m_usrp(usrp),
+ m_conf(conf),
+ time_last_check(timepoint_t::clock::now())
+{
+ if (m_conf.pps_src == "none") {
+ if (m_conf.enableSync) {
+ etiLog.level(warn) <<
+ "OutputUHD: WARNING:"
+ " you are using synchronous transmission without PPS input!";
+ }
+
+ set_usrp_time_from_localtime();
+ }
+ else if (m_conf.pps_src == "pps" or m_conf.pps_src == "gpsdo") {
+ set_usrp_time_from_pps();
+ }
+ else {
+ throw std::runtime_error("USRPTime not implemented yet: " +
+ m_conf.pps_src);
+ }
+}
+
+bool USRPTime::verify_time()
+{
+ if (not gpsfix_needs_check()) {
+ return true;
+ }
+
+ /* During bootup, we say the gpsdo is not ok, and we poll the GPSDO until
+ * we reach lock. Then we sync time. If we do not reach lock in time, we
+ * crash.
+ *
+ * Once we are synced and we have lock, everything ok. If we lose lock for
+ * a number of seconds, we switch to the lost_fix state.
+ *
+ * In the lost fix state, we return false to get the TX muted, and we monitor.
+ * If the fix comes back, we unmute. If we reach the timeout, we crash.
+ */
+
+ check_gps();
+
+ const auto duration_without_fix =
+ gps_fix_check_interval * num_checks_without_gps_fix;
+
+ switch (gps_state) {
+ case gps_state_e::bootup:
+ if (duration_without_fix > initial_gps_fix_wait) {
+ throw runtime_error("GPS did not fix in " +
+ to_string(initial_gps_fix_wait) + " seconds");
+ }
+
+ if (num_checks_without_gps_fix == 0) {
+ if (m_conf.pps_src != "none") {
+ set_usrp_time_from_pps();
+ }
+ gps_state = gps_state_e::monitor_fix;
+ return true;
+ }
+
+ return false;
+
+ case gps_state_e::monitor_fix:
+ if (duration_without_fix > m_conf.maxGPSHoldoverTime) {
+ throw runtime_error("Lost GPS Fix for " +
+ to_string(duration_without_fix) + " seconds");
+ }
+
+ return true;
+ }
+
+ throw logic_error("End of USRPTime::verify_time() reached");
+}
+
+void USRPTime::check_gps()
+{
+ timepoint_t time_now = timepoint_t::clock::now();
+
+ // Divide interval by two because we alternate between
+ // launch and check
+ const auto checkinterval = chrono::seconds(lrint(gps_fix_check_interval/2.0));
+
+ if (gpsfix_needs_check() and time_last_check + checkinterval < time_now) {
+ time_last_check = time_now;
+
+ // Alternate between launching thread and checking the
+ // result.
+ if (gps_fix_task.joinable()) {
+ if (gps_fix_future.has_value()) {
+
+ gps_fix_future.wait();
+
+ gps_fix_task.join();
+
+ if (not gps_fix_future.get()) {
+ if (num_checks_without_gps_fix == 0) {
+ etiLog.level(alert) << "OutputUHD: GPS Time Lock lost";
+ }
+ num_checks_without_gps_fix++;
+ }
+ else {
+ if (num_checks_without_gps_fix) {
+ etiLog.level(info) << "OutputUHD: GPS Time Lock recovered";
+ }
+ num_checks_without_gps_fix = 0;
+ }
+ }
+ }
+ else {
+ // Checking the sensor here takes too much
+ // time, it has to be done in a separate thread.
+ if (gpsdo_is_ettus()) {
+ gps_fix_pt = boost::packaged_task<bool>(
+ boost::bind(check_gps_locked, m_usrp) );
+ }
+ else {
+ gps_fix_pt = boost::packaged_task<bool>(
+ boost::bind(check_gps_timelock, m_usrp) );
+ }
+ gps_fix_future = gps_fix_pt.get_future();
+
+ gps_fix_task = boost::thread(boost::move(gps_fix_pt));
+ }
+ }
+}
+
+bool USRPTime::gpsfix_needs_check() const
+{
+ if (m_conf.refclk_src == "internal") {
+ return false;
+ }
+ else if (m_conf.refclk_src == "gpsdo") {
+ return (m_conf.maxGPSHoldoverTime != 0);
+ }
+ else if (m_conf.refclk_src == "gpsdo-ettus") {
+ return (m_conf.maxGPSHoldoverTime != 0);
+ }
+ else {
+ return false;
+ }
+}
+
+bool USRPTime::gpsdo_is_ettus() const
+{
+ return (m_conf.refclk_src == "gpsdo-ettus");
+}
+
+/* Return a uhd:time_spec representing current system time
+ * with 1ms granularity. */
+static uhd::time_spec_t uhd_timespec_now(void)
+{
+ using namespace std::chrono;
+ auto n = system_clock::now();
+ const long long ticks = duration_cast<milliseconds>(n.time_since_epoch()).count();
+ return uhd::time_spec_t::from_ticks(ticks, 1000);
+}
+
+void USRPTime::set_usrp_time_from_localtime()
+{
+ const auto t = uhd_timespec_now();
+ m_usrp->set_time_now(t);
+
+ etiLog.level(info) << "OutputUHD: Setting USRP time to " <<
+ std::fixed << t.get_real_secs();
+}
+
+void USRPTime::set_usrp_time_from_pps()
+{
+ using namespace std::chrono;
+
+ /* handling time for synchronisation: wait until the next full
+ * second, and set the USRP time at next PPS */
+ auto now = uhd_timespec_now();
+ const time_t secs_since_epoch = now.get_full_secs();
+
+ while (secs_since_epoch + 1 > now.get_full_secs()) {
+ this_thread::sleep_for(milliseconds(1));
+ now = uhd_timespec_now();
+ }
+ /* We are now shortly after the second change.
+ * Wait 200ms to ensure the PPS comes later. */
+ this_thread::sleep_for(milliseconds(200));
+
+ const auto time_set = uhd::time_spec_t(secs_since_epoch + 2);
+ etiLog.level(info) << "OutputUHD: Setting USRP time next pps to " <<
+ std::fixed << time_set.get_real_secs();
+ m_usrp->set_time_next_pps(time_set);
+
+ // The UHD doc says we need to give the USRP one second to update
+ // all the internal registers.
+ this_thread::sleep_for(seconds(1));
+ etiLog.level(info) << "OutputUHD: USRP time " <<
+ std::fixed << m_usrp->get_time_now().get_real_secs();
+}
+
+} // namespace Output
+
+#endif // HAVE_OUTPUT_UHD
diff --git a/src/output/USRPTime.h b/src/output/USRPTime.h
new file mode 100644
index 0000000..7527f21
--- /dev/null
+++ b/src/output/USRPTime.h
@@ -0,0 +1,116 @@
+/*
+ Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Her Majesty the
+ Queen in Right of Canada (Communications Research Center Canada)
+
+ Copyright (C) 2017
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://opendigitalradio.org
+
+DESCRIPTION:
+ The part of the UHD output that takes care of the GPSDO and setting device
+ time.
+*/
+
+/*
+ This file is part of ODR-DabMod.
+
+ ODR-DabMod is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as
+ published by the Free Software Foundation, either version 3 of the
+ License, or (at your option) any later version.
+
+ ODR-DabMod is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with ODR-DabMod. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#ifdef HAVE_CONFIG_H
+# include <config.h>
+#endif
+
+#ifdef HAVE_OUTPUT_UHD
+
+#include <uhd/usrp/multi_usrp.hpp>
+#include <chrono>
+#include <memory>
+#include <string>
+#include <atomic>
+
+#include "Log.h"
+#include "output/SDR.h"
+#include "TimestampDecoder.h"
+#include "RemoteControl.h"
+#include "ThreadsafeQueue.h"
+
+#include <stdio.h>
+#include <sys/types.h>
+
+namespace Output {
+
+class USRPTime {
+ public:
+ USRPTime( uhd::usrp::multi_usrp::sptr usrp,
+ SDRDeviceConfig& conf);
+
+ // Verifies the GPSDO state, that the device time is ok.
+ // Returns true if all ok.
+ // Should be called more often than the gps_fix_check_interval
+ bool verify_time(void);
+
+ // Wait time in seconds to get fix
+ static const int initial_gps_fix_wait = 180;
+
+ // Interval for checking the GPS at runtime
+ static constexpr double gps_fix_check_interval = 10.0; // seconds
+
+ private:
+ enum class gps_state_e {
+ /* At startup, the LEA-M8F GPSDO gets issued a hotstart request to
+ * make sure we will not sync time on a PPS edge that is generated
+ * while the GPSDO is in holdover. In the bootup state, we wait for
+ * the first PPS after hotstart, and then sync time.
+ */
+ bootup,
+
+ /* Once the system is up, we check lock every now and then. If the
+ * fix is lost for too long, we crash.
+ */
+ monitor_fix,
+ };
+
+ void check_gps();
+
+ uhd::usrp::multi_usrp::sptr m_usrp;
+ SDRDeviceConfig& m_conf;
+
+ gps_state_e gps_state = gps_state_e::bootup;
+ int num_checks_without_gps_fix = 1;
+
+ using timepoint_t = std::chrono::time_point<std::chrono::steady_clock>;
+ timepoint_t time_last_check;
+
+ boost::packaged_task<bool> gps_fix_pt;
+ boost::unique_future<bool> gps_fix_future;
+ boost::thread gps_fix_task;
+
+ // Returns true if we want to check for the gps_timelock sensor
+ bool gpsfix_needs_check(void) const;
+
+ // Return true if the gpsdo is from ettus, false if it is the ODR
+ // LEA-M8F board is used
+ bool gpsdo_is_ettus(void) const;
+
+ void set_usrp_time_from_localtime(void);
+ void set_usrp_time_from_pps(void);
+};
+
+} // namespace Output
+
+#endif // HAVE_OUTPUT_UHD