aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/Buffer.cpp1
-rw-r--r--src/CicEqualizer.cpp12
-rw-r--r--src/DabMod.cpp476
-rw-r--r--src/DabModulator.cpp153
-rw-r--r--src/DabModulator.h14
-rw-r--r--src/EtiReader.cpp43
-rw-r--r--src/EtiReader.h23
-rw-r--r--src/FIRFilter.cpp21
-rw-r--r--src/FIRFilter.h8
-rw-r--r--src/Flowgraph.cpp82
-rw-r--r--src/Flowgraph.h32
-rw-r--r--src/FrameMultiplexer.cpp8
-rw-r--r--src/FrameMultiplexer.h9
-rw-r--r--src/InputFileReader.cpp49
-rw-r--r--src/InputReader.h42
-rw-r--r--src/InputZeroMQReader.cpp75
-rw-r--r--src/Makefile.am127
-rw-r--r--src/OutputMemory.h2
-rw-r--r--src/OutputUHD.cpp880
-rw-r--r--src/OutputUHD.h93
-rw-r--r--src/OutputZeroMQ.cpp28
-rw-r--r--src/OutputZeroMQ.h5
-rw-r--r--src/RemoteControl.cpp6
-rw-r--r--src/RemoteControl.h10
-rw-r--r--src/ThreadsafeQueue.h24
-rw-r--r--src/TimestampDecoder.cpp112
-rw-r--r--src/TimestampDecoder.h84
-rw-r--r--src/Utils.cpp118
-rw-r--r--src/Utils.h62
29 files changed, 1463 insertions, 1136 deletions
diff --git a/src/Buffer.cpp b/src/Buffer.cpp
index aa0ef4c..fa7f52f 100644
--- a/src/Buffer.cpp
+++ b/src/Buffer.cpp
@@ -47,6 +47,7 @@ Buffer::Buffer(size_t len, const void *data)
Buffer::~Buffer()
{
+ PDEBUG("Buffer::~Buffer() len=%zu, data=%p\n", len, data);
free(data);
}
diff --git a/src/CicEqualizer.cpp b/src/CicEqualizer.cpp
index d8eb2ee..a9c0dd6 100644
--- a/src/CicEqualizer.cpp
+++ b/src/CicEqualizer.cpp
@@ -46,11 +46,12 @@ CicEqualizer::CicEqualizer(size_t nbCarriers, size_t spacing, int R) :
float angle = pi * k / spacing;
if (k == 0) {
myFilter[i] = 1.0f;
- } else {
- myFilter[i] = sinf(angle / R) / sinf(angle * M);
- myFilter[i] = fabsf(myFilter[i]) * R * M;
- myFilter[i] = powf(myFilter[i], N);
- }
+ }
+ else {
+ myFilter[i] = sinf(angle / R) / sinf(angle * M);
+ myFilter[i] = fabsf(myFilter[i]) * R * M;
+ myFilter[i] = powf(myFilter[i], N);
+ }
PDEBUG("HCic[%zu -> %i] = %f (%f dB) -> angle: %f\n",
i, k,myFilter[i], 20.0 * log10(myFilter[i]), angle);
}
@@ -93,3 +94,4 @@ int CicEqualizer::process(Buffer* const dataIn, Buffer* dataOut)
return sizeOut;
}
+
diff --git a/src/DabMod.cpp b/src/DabMod.cpp
index 214231c..1fc7e3c 100644
--- a/src/DabMod.cpp
+++ b/src/DabMod.cpp
@@ -3,7 +3,7 @@
Her Majesty the Queen in Right of Canada (Communications Research
Center Canada)
- Copyright (C) 2014
+ Copyright (C) 2014, 2015
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
@@ -30,7 +30,7 @@
#endif
#include "porting.h"
-
+#include "Utils.h"
#include "Log.h"
#include "DabModulator.h"
#include "InputMemory.h"
@@ -46,6 +46,8 @@
#include "FIRFilter.h"
#include "RemoteControl.h"
+#include <boost/shared_ptr.hpp>
+#include <boost/make_shared.hpp>
#include <boost/property_tree/ptree.hpp>
#include <boost/property_tree/ini_parser.hpp>
#include <complex>
@@ -68,9 +70,12 @@
# define memalign(a, b) malloc(b)
#endif
+#define ZMQ_INPUT_MAX_FRAME_QUEUE 500
+
typedef std::complex<float> complexf;
+using namespace boost;
volatile sig_atomic_t running = 1;
@@ -81,113 +86,51 @@ void signalHandler(int signalNb)
running = 0;
}
-
-void printUsage(char* progName, FILE* out = stderr)
+struct modulator_data
{
- fprintf(out, "Welcome to %s %s, compiled at %s, %s\n\n",
- PACKAGE,
-#if defined(GITVERSION)
- GITVERSION,
-#else
- VERSION,
-#endif
- __DATE__, __TIME__);
- fprintf(out, "Usage with configuration file:\n");
- fprintf(out, "\t%s [-C] config_file.ini\n\n", progName);
-
- fprintf(out, "Usage with command line options:\n");
- fprintf(out, "\t%s"
- " input"
- " (-f filename | -u uhddevice -F frequency) "
- " [-G txgain]"
- " [-o offset]"
- " [-O offsetfile]"
- " [-T filter_taps_file]"
- " [-a gain]"
- " [-c clockrate]"
- " [-g gainMode]"
- " [-h]"
- " [-l]"
- " [-m dabMode]"
- " [-r samplingRate]"
- "\n", progName);
- fprintf(out, "Where:\n");
- fprintf(out, "input: ETI input filename (default: stdin).\n");
- fprintf(out, "-f name: Use file output with given filename. (use /dev/stdout for standard output)\n");
- fprintf(out, "-u device: Use UHD output with given device string. (use "" for default device)\n");
- fprintf(out, "-F frequency: Set the transmit frequency when using UHD output. (mandatory option when using UHD)\n");
- fprintf(out, "-G txgain: Set the transmit gain for the UHD driver (default: 0)\n");
- fprintf(out, "-o: (UHD only) Set the timestamp offset added to the timestamp in the ETI. The offset is a double.\n");
- fprintf(out, "-O: (UHD only) Set the file containing the timestamp offset added to the timestamp in the ETI.\n"
- "The file is read every six seconds, and must contain a double value.\n");
- fprintf(out, " Specifying either -o or -O has two implications: It enables synchronous transmission,\n"
- " requiring an external REFCLK and PPS signal and frames that do not contain a valid timestamp\n"
- " get muted.\n\n");
- fprintf(out, "-T taps_file: Enable filtering before the output, using the specified file containing the filter taps.\n");
- fprintf(out, "-a gain: Apply digital amplitude gain.\n");
- fprintf(out, "-c rate: Set the DAC clock rate and enable Cic Equalisation.\n");
- fprintf(out, "-g: Set computation gain mode: "
- "%u FIX, %u MAX, %u VAR\n", GAIN_FIX, GAIN_MAX, GAIN_VAR);
- fprintf(out, "-h: Print this help.\n");
- fprintf(out, "-l: Loop file when reach end of file.\n");
- fprintf(out, "-m mode: Set DAB mode: (0: auto, 1-4: force).\n");
- fprintf(out, "-r rate: Set output sampling rate (default: 2048000).\n");
-}
+ modulator_data() :
+ inputReader(NULL),
+ framecount(0),
+ flowgraph(NULL),
+ rcs(NULL) {}
+ InputReader* inputReader;
+ Buffer data;
+ uint64_t framecount;
-void printVersion(FILE *out = stderr)
-{
- fprintf(out, "Welcome to %s %s, compiled at %s, %s\n\n",
- PACKAGE, VERSION, __DATE__, __TIME__);
- fprintf(out,
- " ODR-DabMod is copyright (C) Her Majesty the Queen in Right of Canada,\n"
- " 2009, 2010, 2011, 2012 Communications Research Centre (CRC),\n"
- " and\n"
- " Copyright (C) 2014 Matthias P. Braendli, matthias.braendli@mpb.li\n"
- "\n"
- " http://opendigitalradio.org\n"
- "\n"
- " This program is available free of charge and is licensed to you on a\n"
- " non-exclusive basis; you may not redistribute it.\n"
- "\n"
- " This program is provided \"AS IS\" in the hope that it will be useful, but\n"
- " WITHOUT ANY WARRANTY with respect to its accurancy or usefulness; witout\n"
- " even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR\n"
- " PURPOSE and NONINFRINGEMENT.\n"
- "\n"
- " In no event shall CRC be LIABLE for any LOSS, DAMAGE or COST that may be\n"
- " incurred in connection with the use of this software.\n"
- "\n"
-#if USE_KISS_FFT
- "ODR-DabMod makes use of the following open source packages:\n"
- " Kiss FFT v1.2.9 (Revised BSD) - http://kissfft.sourceforge.net/\n"
-#endif
- );
+ Flowgraph* flowgraph;
+ RemoteControllers* rcs;
+};
-}
+enum run_modulator_state {
+ MOD_FAILURE,
+ MOD_NORMAL_END,
+ MOD_AGAIN
+};
+run_modulator_state run_modulator(modulator_data& m);
-int main(int argc, char* argv[])
+int launch_modulator(int argc, char* argv[])
{
int ret = 0;
bool loop = false;
std::string inputName = "";
std::string inputTransport = "file";
+ unsigned inputMaxFramesQueued = ZMQ_INPUT_MAX_FRAME_QUEUE;
std::string outputName;
int useZeroMQOutput = 0;
+ std::string zmqOutputSocketType = "";
int useFileOutput = 0;
std::string fileOutputFormat = "complexf";
int useUHDOutput = 0;
- uint64_t frame = 0;
size_t outputRate = 2048000;
size_t clockRate = 0;
unsigned dabMode = 0;
float digitalgain = 1.0f;
float normalise = 1.0f;
GainMode gainMode = GAIN_VAR;
- Buffer data;
/* UHD requires the input I and Q samples to be in the interval
@@ -211,26 +154,25 @@ int main(int argc, char* argv[])
OutputUHDConfig outputuhd_conf;
#endif
+ modulator_data m;
+
// To handle the timestamp offset of the modulator
- struct modulator_offset_config modconf;
- modconf.use_offset_file = false;
- modconf.use_offset_fixed = false;
- modconf.delay_calculation_pipeline_stages = 0;
+ unsigned tist_delay_stages = 0;
+ double tist_offset_s = 0.0;
- Flowgraph* flowgraph = NULL;
- DabModulator* modulator = NULL;
- InputMemory* input = NULL;
- FormatConverter* format_converter = NULL;
- ModOutput* output = NULL;
+ shared_ptr<Flowgraph> flowgraph(new Flowgraph());
+ shared_ptr<FormatConverter> format_converter;
+ shared_ptr<ModOutput> output;
RemoteControllers rcs;
+ m.rcs = &rcs;
- Logger logger;
- InputFileReader inputFileReader(logger);
+ bool run_again = true;
+
+ InputFileReader inputFileReader;
#if defined(HAVE_ZEROMQ)
- InputZeroMQReader inputZeroMQReader(logger);
+ shared_ptr<InputZeroMQReader> inputZeroMQReader(new InputZeroMQReader());
#endif
- InputReader* inputReader;
struct sigaction sa;
memset(&sa, 0, sizeof(struct sigaction));
@@ -271,7 +213,7 @@ int main(int argc, char* argv[])
#if defined(HAVE_OUTPUT_UHD)
if (useUHDOutput) {
fprintf(stderr, "Options -u and -f are mutually exclusive\n");
- goto END_MAIN;
+ throw std::invalid_argument("Invalid command line options");
}
#endif
outputName = optarg;
@@ -294,25 +236,7 @@ int main(int argc, char* argv[])
loop = true;
break;
case 'o':
- if (modconf.use_offset_file)
- {
- fprintf(stderr, "Options -o and -O are mutually exclusive\n");
- goto END_MAIN;
- }
- modconf.use_offset_fixed = true;
- modconf.offset_fixed = strtod(optarg, NULL);
-#if defined(HAVE_OUTPUT_UHD)
- outputuhd_conf.enableSync = true;
-#endif
- break;
- case 'O':
- if (modconf.use_offset_fixed)
- {
- fprintf(stderr, "Options -o and -O are mutually exclusive\n");
- goto END_MAIN;
- }
- modconf.use_offset_file = true;
- modconf.offset_filename = std::string(optarg);
+ tist_offset_s = strtod(optarg, NULL);
#if defined(HAVE_OUTPUT_UHD)
outputuhd_conf.enableSync = true;
#endif
@@ -330,7 +254,7 @@ int main(int argc, char* argv[])
#if defined(HAVE_OUTPUT_UHD)
if (useFileOutput) {
fprintf(stderr, "Options -u and -f are mutually exclusive\n");
- goto END_MAIN;
+ throw std::invalid_argument("Invalid command line options");
}
outputuhd_conf.device = optarg;
useUHDOutput = 1;
@@ -338,17 +262,17 @@ int main(int argc, char* argv[])
break;
case 'V':
printVersion();
- goto END_MAIN;
+ throw std::invalid_argument("");
break;
case '?':
case 'h':
printUsage(argv[0]);
- goto END_MAIN;
+ throw std::invalid_argument("");
break;
default:
fprintf(stderr, "Option '%c' not coded yet!\n", c);
ret = -1;
- goto END_MAIN;
+ throw std::invalid_argument("Invalid command line options");
}
}
@@ -389,7 +313,7 @@ int main(int argc, char* argv[])
// No argument given ? You can't be serious ! Show usage.
if (argc == 1) {
printUsage(argv[0]);
- goto END_MAIN;
+ throw std::invalid_argument("Invalid command line options");
}
// If only one argument is given, interpret as configuration file name
@@ -408,8 +332,9 @@ int main(int argc, char* argv[])
}
catch (boost::property_tree::ini_parser::ini_parser_error &e)
{
- fprintf(stderr, "Error, cannot read configuration file '%s'\n", configuration_file.c_str());
- goto END_MAIN;
+ std::cerr << "Error, cannot read configuration file '" << configuration_file.c_str() << "'" << std::endl;
+ std::cerr << " " << e.what() << std::endl;
+ throw std::runtime_error("Cannot read configuration file");
}
// remote controller:
@@ -422,7 +347,7 @@ int main(int argc, char* argv[])
catch (std::exception &e) {
std::cerr << "Error: " << e.what() << "\n";
std::cerr << " telnet remote control enabled, but no telnetport defined.\n";
- goto END_MAIN;
+ throw std::runtime_error("Configuration error");
}
}
@@ -437,7 +362,7 @@ int main(int argc, char* argv[])
catch (std::exception &e) {
std::cerr << "Error: " << e.what() << "\n";
std::cerr << " zmq remote control enabled, but no endpoint defined.\n";
- goto END_MAIN;
+ throw std::runtime_error("Configuration error");
}
}
#endif
@@ -448,12 +373,15 @@ int main(int argc, char* argv[])
}
inputTransport = pt.get("input.transport", "file");
+ inputMaxFramesQueued = pt.get("input.max_frames_queued",
+ ZMQ_INPUT_MAX_FRAME_QUEUE);
+
inputName = pt.get("input.source", "/dev/stdin");
// log parameters:
if (pt.get("log.syslog", 0) == 1) {
LogToSyslog* log_syslog = new LogToSyslog();
- logger.register_backend(log_syslog);
+ etiLog.register_backend(log_syslog);
}
if (pt.get("log.filelog", 0) == 1) {
@@ -464,11 +392,11 @@ int main(int argc, char* argv[])
catch (std::exception &e) {
std::cerr << "Error: " << e.what() << "\n";
std::cerr << " Configuration enables file log, but does not specify log filename\n";
- goto END_MAIN;
+ throw std::runtime_error("Configuration error");
}
LogToFile* log_file = new LogToFile(logfilename);
- logger.register_backend(log_file);
+ etiLog.register_backend(log_file);
}
@@ -487,7 +415,7 @@ int main(int argc, char* argv[])
catch (std::exception &e) {
std::cerr << "Error: " << e.what() << "\n";
std::cerr << " Configuration enables firfilter, but does not specify filter taps file\n";
- goto END_MAIN;
+ throw std::runtime_error("Configuration error");
}
}
@@ -499,7 +427,7 @@ int main(int argc, char* argv[])
catch (std::exception &e) {
std::cerr << "Error: " << e.what() << "\n";
std::cerr << " Configuration does not specify output\n";
- goto END_MAIN;
+ throw std::runtime_error("Configuration error");
}
if (output_selected == "file") {
@@ -509,7 +437,7 @@ int main(int argc, char* argv[])
catch (std::exception &e) {
std::cerr << "Error: " << e.what() << "\n";
std::cerr << " Configuration does not specify file name for file output\n";
- goto END_MAIN;
+ throw std::runtime_error("Configuration error");
}
useFileOutput = 1;
@@ -535,11 +463,11 @@ int main(int argc, char* argv[])
outputuhd_conf.txgain = pt.get("uhdoutput.txgain", 0.0);
outputuhd_conf.frequency = pt.get<double>("uhdoutput.frequency", 0);
std::string chan = pt.get<std::string>("uhdoutput.channel", "");
- outputuhd_conf.dabMode = dabMode;
+ outputuhd_conf.dabMode = dabMode;
if (outputuhd_conf.frequency == 0 && chan == "") {
std::cerr << " UHD output enabled, but neither frequency nor channel defined.\n";
- goto END_MAIN;
+ throw std::runtime_error("Configuration error");
}
else if (outputuhd_conf.frequency == 0) {
double freq;
@@ -583,13 +511,13 @@ int main(int argc, char* argv[])
else if (chan == "13F") freq = 239200000;
else {
std::cerr << " UHD output: channel " << chan << " does not exist in table\n";
- goto END_MAIN;
+ throw std::out_of_range("UHD channel selection error");
}
outputuhd_conf.frequency = freq;
}
else if (outputuhd_conf.frequency != 0 && chan != "") {
std::cerr << " UHD output: cannot define both frequency and channel.\n";
- goto END_MAIN;
+ throw std::runtime_error("Configuration error");
}
@@ -607,44 +535,44 @@ int main(int argc, char* argv[])
}
else {
std::cerr << "Error: UHD output: behaviour_refclk_lock_lost invalid." << std::endl;
- goto END_MAIN;
+ throw std::runtime_error("Configuration error");
}
+ outputuhd_conf.maxGPSHoldoverTime = pt.get("uhdoutput.max_gps_holdover_time", 0);
+
useUHDOutput = 1;
}
#endif
#if defined(HAVE_ZEROMQ)
else if (output_selected == "zmq") {
outputName = pt.get<std::string>("zmqoutput.listen");
+ zmqOutputSocketType = pt.get<std::string>("zmqoutput.socket_type");
useZeroMQOutput = 1;
}
#endif
else {
std::cerr << "Error: Invalid output defined.\n";
- goto END_MAIN;
+ throw std::runtime_error("Configuration error");
}
#if defined(HAVE_OUTPUT_UHD)
outputuhd_conf.enableSync = (pt.get("delaymanagement.synchronous", 0) == 1);
if (outputuhd_conf.enableSync) {
+ std::string delay_mgmt = pt.get<std::string>("delaymanagement.management", "");
+ std::string fixedoffset = pt.get<std::string>("delaymanagement.fixedoffset", "");
+ std::string offset_filename = pt.get<std::string>("delaymanagement.dynamicoffsetfile", "");
+
+ if (not(delay_mgmt.empty() and fixedoffset.empty() and offset_filename.empty())) {
+ std::cerr << "Warning: you are using the old config syntax for the offset management.\n";
+ std::cerr << " Please see the example.ini configuration for the new settings.\n";
+ }
+
try {
- std::string delay_mgmt = pt.get<std::string>("delaymanagement.management");
- if (delay_mgmt == "fixed") {
- modconf.offset_fixed = pt.get<double>("delaymanagement.fixedoffset");
- modconf.use_offset_fixed = true;
- }
- else if (delay_mgmt == "dynamic") {
- modconf.offset_filename = pt.get<std::string>("delaymanagement.dynamicoffsetfile");
- modconf.use_offset_file = true;
- }
- else {
- throw std::runtime_error("invalid management value");
- }
+ tist_offset_s = pt.get<double>("delaymanagement.offset");
}
catch (std::exception &e) {
- std::cerr << "Error: " << e.what() << "\n";
- std::cerr << " Synchronised transmission enabled, but delay management specification is incomplete.\n";
- goto END_MAIN;
+ std::cerr << "Error: delaymanagement: synchronous is enabled, but no offset defined!\n";
+ throw std::runtime_error("Configuration error");
}
}
@@ -653,23 +581,17 @@ int main(int argc, char* argv[])
}
if (rcs.get_no_controllers() == 0) {
- logger.level(warn) << "No Remote-Control started";
+ etiLog.level(warn) << "No Remote-Control started";
rcs.add_controller(new RemoteControllerDummy());
}
- logger.level(info) << "Starting up";
-
- if (!(modconf.use_offset_file || modconf.use_offset_fixed)) {
- logger.level(debug) << "No Modulator offset defined, setting to 0";
- modconf.use_offset_fixed = true;
- modconf.offset_fixed = 0;
- }
+ etiLog.level(info) << "Starting up";
// When using the FIRFilter, increase the modulator offset pipelining delay
// by the correct amount
if (filterTapsFilename != "") {
- modconf.delay_calculation_pipeline_stages += FIRFILTER_PIPELINE_DELAY;
+ tist_delay_stages += FIRFILTER_PIPELINE_DELAY;
}
// Setting ETI input filename
@@ -697,14 +619,14 @@ int main(int argc, char* argv[])
fprintf(stderr, "\n");
printUsage(argv[0]);
ret = -1;
- logger.level(error) << "Received invalid command line arguments";
- goto END_MAIN;
+ etiLog.level(error) << "Received invalid command line arguments";
+ throw std::invalid_argument("Invalid command line options");
}
if (!useFileOutput && !useUHDOutput && !useZeroMQOutput) {
- logger.level(error) << "Output not specified";
+ etiLog.level(error) << "Output not specified";
fprintf(stderr, "Must specify output !");
- goto END_MAIN;
+ throw std::runtime_error("Configuration error");
}
// Print settings
@@ -721,16 +643,22 @@ int main(int argc, char* argv[])
fprintf(stderr, " UHD\n"
" Device: %s\n"
" Type: %s\n"
- " master_clock_rate: %ld\n",
+ " master_clock_rate: %ld\n"
+ " refclk: %s\n"
+ " pps source: %s\n",
outputuhd_conf.device.c_str(),
outputuhd_conf.usrpType.c_str(),
- outputuhd_conf.masterClockRate);
+ outputuhd_conf.masterClockRate,
+ outputuhd_conf.refclk_src.c_str(),
+ outputuhd_conf.pps_src.c_str());
}
#endif
else if (useZeroMQOutput) {
fprintf(stderr, " ZeroMQ\n"
- " Listening on: %s\n",
- outputName.c_str());
+ " Listening on: %s\n"
+ " Socket type : %s\n",
+ outputName.c_str(),
+ zmqOutputSocketType.c_str());
}
fprintf(stderr, " Sampling rate: ");
@@ -748,96 +676,147 @@ int main(int argc, char* argv[])
// Opening ETI input file
if (inputFileReader.Open(inputName, loop) == -1) {
fprintf(stderr, "Unable to open input file!\n");
- logger.level(error) << "Unable to open input file!";
+ etiLog.level(error) << "Unable to open input file!";
ret = -1;
- goto END_MAIN;
+ throw std::runtime_error("Unable to open input");
}
- inputReader = &inputFileReader;
+ m.inputReader = &inputFileReader;
}
else if (inputTransport == "zeromq") {
#if !defined(HAVE_ZEROMQ)
fprintf(stderr, "Error, ZeroMQ input transport selected, but not compiled in!\n");
ret = -1;
- goto END_MAIN;
+ throw std::runtime_error("Unable to open input");
#else
- // The URL might start with zmq+tcp://
- if (inputName.substr(0, 4) == "zmq+") {
- inputZeroMQReader.Open(inputName.substr(4));
- }
- else {
- inputZeroMQReader.Open(inputName);
- }
- inputReader = &inputZeroMQReader;
+ inputZeroMQReader->Open(inputName, inputMaxFramesQueued);
+ m.inputReader = inputZeroMQReader.get();
#endif
}
else
{
fprintf(stderr, "Error, invalid input transport %s selected!\n", inputTransport.c_str());
ret = -1;
- goto END_MAIN;
+ throw std::runtime_error("Unable to open input");
}
if (useFileOutput) {
if (fileOutputFormat == "complexf") {
- output = new OutputFile(outputName);
+ output = make_shared<OutputFile>(outputName);
}
else if (fileOutputFormat == "s8") {
// We must normalise the samples to the interval [-127.0; 127.0]
normalise = 127.0f / normalise_factor;
- format_converter = new FormatConverter();
+ format_converter = make_shared<FormatConverter>();
- output = new OutputFile(outputName);
+ output = make_shared<OutputFile>(outputName);
}
}
#if defined(HAVE_OUTPUT_UHD)
else if (useUHDOutput) {
-
normalise = 1.0f / normalise_factor;
-
outputuhd_conf.sampleRate = outputRate;
- try {
- output = new OutputUHD(outputuhd_conf, logger);
- ((OutputUHD*)output)->enrol_at(rcs);
- }
- catch (std::exception& e) {
- logger.level(error) << "UHD initialisation failed:" << e.what();
- goto END_MAIN;
- }
+ output = make_shared<OutputUHD>(outputuhd_conf);
+ ((OutputUHD*)output.get())->enrol_at(rcs);
}
#endif
#if defined(HAVE_ZEROMQ)
else if (useZeroMQOutput) {
/* We normalise the same way as for the UHD output */
normalise = 1.0f / normalise_factor;
-
- output = new OutputZeroMQ(outputName);
+ if (zmqOutputSocketType == "pub") {
+ output = make_shared<OutputZeroMQ>(outputName, ZMQ_PUB);
+ }
+ else if (zmqOutputSocketType == "rep") {
+ output = make_shared<OutputZeroMQ>(outputName, ZMQ_REP);
+ }
+ else {
+ std::stringstream ss;
+ ss << "ZeroMQ output socket type " << zmqOutputSocketType << " invalid";
+ throw std::invalid_argument(ss.str());
+ }
}
#endif
- flowgraph = new Flowgraph();
- data.setLength(6144);
- input = new InputMemory(&data);
- modulator = new DabModulator(modconf, &rcs, logger, outputRate, clockRate,
- dabMode, gainMode, digitalgain, normalise, filterTapsFilename);
- flowgraph->connect(input, modulator);
- if (format_converter) {
- flowgraph->connect(modulator, format_converter);
- flowgraph->connect(format_converter, output);
- }
- else {
- flowgraph->connect(modulator, output);
- }
+
+ while (run_again) {
+ Flowgraph flowgraph;
+
+ m.flowgraph = &flowgraph;
+ m.data.setLength(6144);
+
+ shared_ptr<InputMemory> input(new InputMemory(&m.data));
+ shared_ptr<DabModulator> modulator(
+ new DabModulator(tist_offset_s, tist_delay_stages, &rcs,
+ outputRate, clockRate, dabMode, gainMode, digitalgain,
+ normalise, filterTapsFilename));
+
+ flowgraph.connect(input, modulator);
+ if (format_converter) {
+ flowgraph.connect(modulator, format_converter);
+ flowgraph.connect(format_converter, output);
+ }
+ else {
+ flowgraph.connect(modulator, output);
+ }
#if defined(HAVE_OUTPUT_UHD)
- if (useUHDOutput) {
- ((OutputUHD*)output)->setETIReader(modulator->getEtiReader());
- }
+ if (useUHDOutput) {
+ ((OutputUHD*)output.get())->setETIReader(modulator->getEtiReader());
+ }
+#endif
+
+ m.inputReader->PrintInfo();
+
+ run_modulator_state st = run_modulator(m);
+
+ switch (st) {
+ case MOD_FAILURE:
+ etiLog.level(error) << "Modulator failure.";
+ run_again = false;
+ ret = 1;
+ break;
+#if defined(HAVE_ZEROMQ)
+ case MOD_AGAIN:
+ etiLog.level(warn) << "Restart modulator.";
+ running = true;
+ if (inputTransport == "zeromq") {
+ run_again = true;
+
+ // Create a new input reader
+ inputZeroMQReader = make_shared<InputZeroMQReader>();
+ inputZeroMQReader->Open(inputName, inputMaxFramesQueued);
+ m.inputReader = inputZeroMQReader.get();
+ }
+ break;
#endif
+ case MOD_NORMAL_END:
+ default:
+ etiLog.level(info) << "modulator stopped.";
+ ret = 0;
+ run_again = false;
+ break;
+ }
- inputReader->PrintInfo();
+ fprintf(stderr, "\n\n");
+ etiLog.level(info) << m.framecount << " DAB frames encoded";
+ etiLog.level(info) << ((float)m.framecount * 0.024f) << " seconds encoded";
+
+ m.data.setLength(0);
+ }
+
+ ////////////////////////////////////////////////////////////////////////
+ // Cleaning things
+ ////////////////////////////////////////////////////////////////////////
+ etiLog.level(info) << "Terminating";
+ return ret;
+}
+
+run_modulator_state run_modulator(modulator_data& m)
+{
+ run_modulator_state ret = MOD_FAILURE;
try {
while (running) {
@@ -846,57 +825,68 @@ int main(int argc, char* argv[])
PDEBUG("*****************************************\n");
PDEBUG("* Starting main loop\n");
PDEBUG("*****************************************\n");
- while ((framesize = inputReader->GetNextFrame(data.getData())) > 0) {
+ while ((framesize = m.inputReader->GetNextFrame(m.data.getData())) > 0) {
if (!running) {
break;
}
- frame++;
+ m.framecount++;
PDEBUG("*****************************************\n");
- PDEBUG("* Read frame %lu\n", frame);
+ PDEBUG("* Read frame %lu\n", m.framecount);
PDEBUG("*****************************************\n");
////////////////////////////////////////////////////////////////
- // Proccessing data
+ // Processing data
////////////////////////////////////////////////////////////////
- flowgraph->run();
+ m.flowgraph->run();
/* Check every once in a while if the remote control
* is still working */
- if (rcs.get_no_controllers() > 0 && (frame % 250) == 0) {
- rcs.check_faults();
+ if (m.rcs->get_no_controllers() > 0 && (m.framecount % 250) == 0) {
+ m.rcs->check_faults();
}
}
if (framesize == 0) {
- fprintf(stderr, "End of file reached.\n");
+ etiLog.level(info) << "End of file reached.";
}
else {
- fprintf(stderr, "Input read error.\n");
+ etiLog.level(error) << "Input read error.";
}
running = 0;
+ ret = MOD_NORMAL_END;
}
+#if defined(HAVE_OUTPUT_UHD)
+ } catch (fct_discontinuity_error& e) {
+ // The OutputUHD saw a FCT discontinuity
+ etiLog.level(warn) << e.what();
+ ret = MOD_AGAIN;
+#endif
+ } catch (zmq_input_overflow& e) {
+ // The ZeroMQ input has overflowed its buffer
+ etiLog.level(warn) << e.what();
+ ret = MOD_AGAIN;
} catch (std::exception& e) {
- fprintf(stderr, "EXCEPTION: %s\n", e.what());
- ret = -1;
+ etiLog.level(error) << "Exception caught: " << e.what();
+ ret = MOD_FAILURE;
}
-END_MAIN:
- ////////////////////////////////////////////////////////////////////////
- // Cleaning things
- ////////////////////////////////////////////////////////////////////////
- fprintf(stderr, "\n\n");
- fprintf(stderr, "%lu DAB frames encoded\n", frame);
- fprintf(stderr, "%f seconds encoded\n", (float)frame * 0.024f);
-
- fprintf(stderr, "\nCleaning flowgraph...\n");
- delete flowgraph;
-
- // Cif
- fprintf(stderr, "\nCleaning buffers...\n");
-
- logger.level(info) << "Terminating";
-
return ret;
}
+int main(int argc, char* argv[])
+{
+ try {
+ return launch_modulator(argc, argv);
+ }
+ catch (std::invalid_argument& e) {
+ std::string what(e.what());
+ if (not what.empty()) {
+ std::cerr << "Modulator error: " << what << std::endl;
+ }
+ }
+ catch (std::runtime_error& e) {
+ std::cerr << "Modulator runtime error: " << e.what() << std::endl;
+ }
+}
+
diff --git a/src/DabModulator.cpp b/src/DabModulator.cpp
index 2664a08..35ef7cb 100644
--- a/src/DabModulator.cpp
+++ b/src/DabModulator.cpp
@@ -3,8 +3,10 @@
Her Majesty the Queen in Right of Canada (Communications Research
Center Canada)
- Includes modifications for which no copyright is claimed
- 2012, Matthias P. Braendli, matthias.braendli@mpb.li
+ Copyright (C) 2015
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://opendigitalradio.org
*/
/*
This file is part of ODR-DabMod.
@@ -50,25 +52,24 @@
#include "RemoteControl.h"
#include "Log.h"
+using namespace boost;
DabModulator::DabModulator(
- struct modulator_offset_config& modconf,
+ double tist_offset_s, unsigned tist_delay_stages,
RemoteControllers* rcs,
- Logger& logger,
unsigned outputRate, unsigned clockRate,
unsigned dabMode, GainMode gainMode,
float digGain, float normalise,
std::string filterTapsFilename
) :
ModCodec(ModFormat(1), ModFormat(0)),
- myLogger(logger),
myOutputRate(outputRate),
myClockRate(clockRate),
myDabMode(dabMode),
myGainMode(gainMode),
myDigGain(digGain),
myNormalise(normalise),
- myEtiReader(EtiReader(modconf, myLogger)),
+ myEtiReader(EtiReader(tist_offset_s, tist_delay_stages, rcs)),
myFlowgraph(NULL),
myFilterTapsFilename(filterTapsFilename),
myRCs(rcs)
@@ -155,62 +156,65 @@ int DabModulator::process(Buffer* const dataIn, Buffer* dataOut)
////////////////////////////////////////////////////////////////
// CIF data initialisation
////////////////////////////////////////////////////////////////
- FrameMultiplexer* cifMux = NULL;
- PrbsGenerator* cifPrbs = NULL;
- BlockPartitioner* cifPart = NULL;
- QpskSymbolMapper* cifMap = NULL;
- FrequencyInterleaver* cifFreq = NULL;
- PhaseReference* cifRef = NULL;
- DifferentialModulator* cifDiff = NULL;
- NullSymbol* cifNull = NULL;
- SignalMultiplexer* cifSig = NULL;
- CicEqualizer* cifCicEq = NULL;
- OfdmGenerator* cifOfdm = NULL;
- GainControl* cifGain = NULL;
- GuardIntervalInserter* cifGuard = NULL;
- FIRFilter* cifFilter = NULL;
- Resampler* cifRes = NULL;
-
- cifPrbs = new PrbsGenerator(864 * 8, 0x110);
- cifMux = new FrameMultiplexer(myFicSizeOut + 864 * 8,
- &myEtiReader.getSubchannels());
- cifPart = new BlockPartitioner(mode, myEtiReader.getFp());
- cifMap = new QpskSymbolMapper(myNbCarriers);
- cifRef = new PhaseReference(mode);
- cifFreq = new FrequencyInterleaver(mode);
- cifDiff = new DifferentialModulator(myNbCarriers);
- cifNull = new NullSymbol(myNbCarriers);
- cifSig = new SignalMultiplexer(
- (1 + myNbSymbols) * myNbCarriers * sizeof(complexf));
-
+ shared_ptr<PrbsGenerator> cifPrbs(new PrbsGenerator(864 * 8, 0x110));
+ shared_ptr<FrameMultiplexer> cifMux(
+ new FrameMultiplexer(myFicSizeOut + 864 * 8,
+ &myEtiReader.getSubchannels()));
+
+ shared_ptr<BlockPartitioner> cifPart(
+ new BlockPartitioner(mode, myEtiReader.getFp()));
+
+ shared_ptr<QpskSymbolMapper> cifMap(new QpskSymbolMapper(myNbCarriers));
+ shared_ptr<PhaseReference> cifRef(new PhaseReference(mode));
+ shared_ptr<FrequencyInterleaver> cifFreq(new FrequencyInterleaver(mode));
+ shared_ptr<DifferentialModulator> cifDiff(
+ new DifferentialModulator(myNbCarriers));
+
+ shared_ptr<NullSymbol> cifNull(new NullSymbol(myNbCarriers));
+ shared_ptr<SignalMultiplexer> cifSig(new SignalMultiplexer(
+ (1 + myNbSymbols) * myNbCarriers * sizeof(complexf)));
+
+ // TODO this needs a review
+ bool useCicEq = false;
+ unsigned cic_ratio = 1;
if (myClockRate) {
- unsigned ratio = myClockRate / myOutputRate;
- ratio /= 4; // FPGA DUC
+ cic_ratio = myClockRate / myOutputRate;
+ cic_ratio /= 4; // FPGA DUC
if (myClockRate == 400000000) { // USRP2
- if (ratio & 1) { // odd
- cifCicEq = new CicEqualizer(myNbCarriers,
- (float)mySpacing * (float)myOutputRate / 2048000.0f,
- ratio);
+ if (cic_ratio & 1) { // odd
+ useCicEq = true;
} // even, no filter
- } else {
- cifCicEq = new CicEqualizer(myNbCarriers,
- (float)mySpacing * (float)myOutputRate / 2048000.0f,
- ratio);
+ }
+ else {
+ useCicEq = true;
}
}
- cifOfdm = new OfdmGenerator((1 + myNbSymbols), myNbCarriers, mySpacing);
- cifGain = new GainControl(mySpacing, myGainMode, myDigGain, myNormalise);
+ shared_ptr<CicEqualizer> cifCicEq(new CicEqualizer(myNbCarriers,
+ (float)mySpacing * (float)myOutputRate / 2048000.0f,
+ cic_ratio));
+
+
+ shared_ptr<OfdmGenerator> cifOfdm(
+ new OfdmGenerator((1 + myNbSymbols), myNbCarriers, mySpacing));
+
+ shared_ptr<GainControl> cifGain(
+ new GainControl(mySpacing, myGainMode, myDigGain, myNormalise));
+
cifGain->enrol_at(*myRCs);
- cifGuard = new GuardIntervalInserter(myNbSymbols, mySpacing,
- myNullSize, mySymSize);
+ shared_ptr<GuardIntervalInserter> cifGuard(
+ new GuardIntervalInserter(myNbSymbols, mySpacing,
+ myNullSize, mySymSize));
+
+ FIRFilter* cifFilter = NULL;
if (myFilterTapsFilename != "") {
cifFilter = new FIRFilter(myFilterTapsFilename);
cifFilter->enrol_at(*myRCs);
}
- myOutput = new OutputMemory();
+ shared_ptr<OutputMemory> myOutput(new OutputMemory(dataOut));
+ Resampler* cifRes = NULL;
if (myOutputRate != 2048000) {
cifRes = new Resampler(2048000, myOutputRate, mySpacing);
} else {
@@ -222,10 +226,7 @@ int DabModulator::process(Buffer* const dataIn, Buffer* dataOut)
////////////////////////////////////////////////////////////////
// Processing FIC
////////////////////////////////////////////////////////////////
- FicSource* fic = myEtiReader.getFic();
- PrbsGenerator* ficPrbs = NULL;
- ConvEncoder* ficConv = NULL;
- PuncturingEncoder* ficPunc = NULL;
+ shared_ptr<FicSource> fic(myEtiReader.getFic());
////////////////////////////////////////////////////////////////
// Data initialisation
////////////////////////////////////////////////////////////////
@@ -241,13 +242,13 @@ int DabModulator::process(Buffer* const dataIn, Buffer* dataOut)
PDEBUG(" Framesize: %zu\n", fic->getFramesize());
// Configuring prbs generator
- ficPrbs = new PrbsGenerator(myFicSizeIn, 0x110);
+ shared_ptr<PrbsGenerator> ficPrbs(new PrbsGenerator(myFicSizeIn, 0x110));
// Configuring convolutionnal encoder
- ficConv = new ConvEncoder(myFicSizeIn);
+ shared_ptr<ConvEncoder> ficConv(new ConvEncoder(myFicSizeIn));
// Configuring puncturing encoder
- ficPunc = new PuncturingEncoder();
+ shared_ptr<PuncturingEncoder> ficPunc(new PuncturingEncoder());
std::vector<PuncturingRule*> rules = fic->get_rules();
std::vector<PuncturingRule*>::const_iterator rule;
for (rule = rules.begin(); rule != rules.end(); ++rule) {
@@ -267,16 +268,12 @@ int DabModulator::process(Buffer* const dataIn, Buffer* dataOut)
////////////////////////////////////////////////////////////////
// Configuring subchannels
////////////////////////////////////////////////////////////////
- std::vector<SubchannelSource*> subchannels =
+ std::vector<shared_ptr<SubchannelSource> > subchannels =
myEtiReader.getSubchannels();
- std::vector<SubchannelSource*>::const_iterator subchannel;
+ std::vector<shared_ptr<SubchannelSource> >::const_iterator subchannel;
for (subchannel = subchannels.begin();
subchannel != subchannels.end();
++subchannel) {
- PrbsGenerator* subchPrbs = NULL;
- ConvEncoder* subchConv = NULL;
- PuncturingEncoder* subchPunc = NULL;
- TimeInterleaver* subchInterleaver = NULL;
////////////////////////////////////////////////////////////
// Data initialisation
@@ -307,13 +304,17 @@ int DabModulator::process(Buffer* const dataIn, Buffer* dataOut)
(*subchannel)->protectionOption());
// Configuring prbs genrerator
- subchPrbs = new PrbsGenerator(subchSizeIn, 0x110);
+ shared_ptr<PrbsGenerator> subchPrbs(
+ new PrbsGenerator(subchSizeIn, 0x110));
// Configuring convolutionnal encoder
- subchConv = new ConvEncoder(subchSizeIn);
+ shared_ptr<ConvEncoder> subchConv(
+ new ConvEncoder(subchSizeIn));
// Configuring puncturing encoder
- subchPunc = new PuncturingEncoder();
+ shared_ptr<PuncturingEncoder> subchPunc(
+ new PuncturingEncoder());
+
std::vector<PuncturingRule*> rules = (*subchannel)->get_rules();
std::vector<PuncturingRule*>::const_iterator rule;
for (rule = rules.begin(); rule != rules.end(); ++rule) {
@@ -326,7 +327,8 @@ int DabModulator::process(Buffer* const dataIn, Buffer* dataOut)
subchPunc->append_tail_rule(PuncturingRule(3, 0xcccccc));
// Configuring time interleaver
- subchInterleaver = new TimeInterleaver(subchSizeOut);
+ shared_ptr<TimeInterleaver> subchInterleaver(
+ new TimeInterleaver(subchSizeOut));
myFlowgraph->connect(*subchannel, subchPrbs);
myFlowgraph->connect(subchPrbs, subchConv);
@@ -342,7 +344,7 @@ int DabModulator::process(Buffer* const dataIn, Buffer* dataOut)
myFlowgraph->connect(cifFreq, cifDiff);
myFlowgraph->connect(cifNull, cifSig);
myFlowgraph->connect(cifDiff, cifSig);
- if (myClockRate) {
+ if (useCicEq) {
myFlowgraph->connect(cifSig, cifCicEq);
myFlowgraph->connect(cifCicEq, cifOfdm);
} else {
@@ -352,18 +354,21 @@ int DabModulator::process(Buffer* const dataIn, Buffer* dataOut)
myFlowgraph->connect(cifGain, cifGuard);
if (myFilterTapsFilename != "") {
- myFlowgraph->connect(cifGuard, cifFilter);
+ shared_ptr<FIRFilter> cifFilterptr(cifFilter);
+ myFlowgraph->connect(cifGuard, cifFilterptr);
if (cifRes != NULL) {
- myFlowgraph->connect(cifFilter, cifRes);
- myFlowgraph->connect(cifRes, myOutput);
+ shared_ptr<Resampler> res(cifRes);
+ myFlowgraph->connect(cifFilterptr, res);
+ myFlowgraph->connect(res, myOutput);
} else {
- myFlowgraph->connect(cifFilter, myOutput);
+ myFlowgraph->connect(cifFilterptr, myOutput);
}
}
else { //no filtering
if (cifRes != NULL) {
- myFlowgraph->connect(cifGuard, cifRes);
- myFlowgraph->connect(cifRes, myOutput);
+ shared_ptr<Resampler> res(cifRes);
+ myFlowgraph->connect(cifGuard, res);
+ myFlowgraph->connect(res, myOutput);
} else {
myFlowgraph->connect(cifGuard, myOutput);
}
@@ -374,6 +379,6 @@ int DabModulator::process(Buffer* const dataIn, Buffer* dataOut)
////////////////////////////////////////////////////////////////////
// Proccessing data
////////////////////////////////////////////////////////////////////
- myOutput->setOutput(dataOut);
return myFlowgraph->run();
}
+
diff --git a/src/DabModulator.h b/src/DabModulator.h
index 84c9926..1a9e477 100644
--- a/src/DabModulator.h
+++ b/src/DabModulator.h
@@ -3,8 +3,10 @@
Her Majesty the Queen in Right of Canada (Communications Research
Center Canada)
- Includes modifications for which no copyright is claimed
- 2012, Matthias P. Braendli, matthias.braendli@mpb.li
+ Copyright (C) 2015
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://opendigitalradio.org
*/
/*
This file is part of ODR-DabMod.
@@ -32,6 +34,7 @@
#include <sys/types.h>
#include <string>
+#include <boost/shared_ptr.hpp>
#include "ModCodec.h"
#include "EtiReader.h"
@@ -46,9 +49,8 @@ class DabModulator : public ModCodec
{
public:
DabModulator(
- struct modulator_offset_config& modconf,
+ double tist_offset_s, unsigned tist_delay_stages,
RemoteControllers* rcs,
- Logger& logger,
unsigned outputRate = 2048000, unsigned clockRate = 0,
unsigned dabMode = 0, GainMode gainMode = GAIN_VAR,
float digGain = 1.0, float normalise = 1.0,
@@ -63,8 +65,6 @@ public:
EtiReader* getEtiReader() { return &myEtiReader; }
protected:
- Logger& myLogger;
-
void setMode(unsigned mode);
unsigned myOutputRate;
@@ -88,5 +88,5 @@ protected:
size_t myFicSizeIn;
};
-
#endif // DAB_MODULATOR_H
+
diff --git a/src/EtiReader.cpp b/src/EtiReader.cpp
index fe54f55..f584275 100644
--- a/src/EtiReader.cpp
+++ b/src/EtiReader.cpp
@@ -2,7 +2,7 @@
Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty
the Queen in Right of Canada (Communications Research Center Canada)
- Copyright (C) 2014
+ Copyright (C) 2014, 2015
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
@@ -34,6 +34,7 @@
#include <string.h>
#include <arpa/inet.h>
+using namespace boost;
enum ETI_READER_STATE {
EtiReaderStateNbFrame,
@@ -50,16 +51,20 @@ enum ETI_READER_STATE {
};
-EtiReader::EtiReader(struct modulator_offset_config& modconf,
- Logger& logger) :
- myLogger(logger),
+EtiReader::EtiReader(
+ double tist_offset_s,
+ unsigned tist_delay_stages,
+ RemoteControllers* rcs) :
state(EtiReaderStateSync),
myFicSource(NULL),
- myTimestampDecoder(modconf, myLogger)
+ myTimestampDecoder(tist_offset_s, tist_delay_stages)
{
PDEBUG("EtiReader::EtiReader()\n");
+ myTimestampDecoder.enrol_at(*rcs);
+
myCurrentFrame = 0;
+ eti_fc_valid = false;
}
EtiReader::~EtiReader()
@@ -69,9 +74,6 @@ EtiReader::~EtiReader()
// if (myFicSource != NULL) {
// delete myFicSource;
// }
-// for (unsigned i = 0; i < mySources.size(); ++i) {
-// delete mySources[i];
-// }
}
@@ -83,23 +85,29 @@ FicSource* EtiReader::getFic()
unsigned EtiReader::getMode()
{
+ if (not eti_fc_valid) {
+ throw std::runtime_error("Trying to access Mode before it is ready!");
+ }
return eti_fc.MID;
}
unsigned EtiReader::getFp()
{
+ if (not eti_fc_valid) {
+ throw std::runtime_error("Trying to access FP before it is ready!");
+ }
return eti_fc.FP;
}
-const std::vector<SubchannelSource*>& EtiReader::getSubchannels()
+const std::vector<boost::shared_ptr<SubchannelSource> >& EtiReader::getSubchannels()
{
return mySources;
}
-int EtiReader::process(Buffer* dataIn)
+int EtiReader::process(const Buffer* dataIn)
{
PDEBUG("EtiReader::process(dataIn: %p)\n", dataIn);
PDEBUG(" state: %u\n", state);
@@ -146,6 +154,7 @@ int EtiReader::process(Buffer* dataIn)
return dataIn->getLength() - input_size;
}
memcpy(&eti_fc, in, 4);
+ eti_fc_valid = true;
input_size -= 4;
framesize -= 4;
in += 4;
@@ -171,13 +180,12 @@ int EtiReader::process(Buffer* dataIn)
(memcmp(&eti_stc[0], in, 4 * eti_fc.NST))) {
PDEBUG("New stc!\n");
eti_stc.resize(eti_fc.NST);
- for (unsigned i = 0; i < mySources.size(); ++i) {
- delete mySources[i];
- }
- mySources.resize(eti_fc.NST);
memcpy(&eti_stc[0], in, 4 * eti_fc.NST);
+
+ mySources.clear();
for (unsigned i = 0; i < eti_fc.NST; ++i) {
- mySources[i] = new SubchannelSource(eti_stc[i]);
+ mySources.push_back(shared_ptr<SubchannelSource>(
+ new SubchannelSource(eti_stc[i])));
PDEBUG("Sstc %u:\n", i);
PDEBUG(" Stc%i.scid: %i\n", i, eti_stc[i].SCID);
PDEBUG(" Stc%i.sad: %u\n", i, eti_stc[i].getStartAddress());
@@ -281,11 +289,6 @@ int EtiReader::process(Buffer* dataIn)
myTimestampDecoder.updateTimestampEti(eti_fc.FP & 0x3,
eti_eoh.MNSC, getPPSOffset(), eti_fc.FCT);
- if (eti_fc.FCT % 125 == 0) //every 3 seconds is fine enough
- {
- myTimestampDecoder.updateModulatorOffset();
- }
-
return dataIn->getLength() - input_size;
}
diff --git a/src/EtiReader.h b/src/EtiReader.h
index 209b208..84ad9b4 100644
--- a/src/EtiReader.h
+++ b/src/EtiReader.h
@@ -2,7 +2,7 @@
Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty
the Queen in Right of Canada (Communications Research Center Canada)
- Copyright (C) 2014
+ Copyright (C) 2014, 2015
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
@@ -41,12 +41,16 @@
#include <vector>
#include <stdint.h>
#include <sys/types.h>
+#include <boost/shared_ptr.hpp>
class EtiReader
{
public:
- EtiReader(struct modulator_offset_config& modconf, Logger& logger);
+ EtiReader(
+ double tist_offset_s,
+ unsigned tist_delay_stages,
+ RemoteControllers* rcs);
virtual ~EtiReader();
EtiReader(const EtiReader&);
EtiReader& operator=(const EtiReader&);
@@ -54,8 +58,8 @@ public:
FicSource* getFic();
unsigned getMode();
unsigned getFp();
- const std::vector<SubchannelSource*>& getSubchannels();
- int process(Buffer* dataIn);
+ const std::vector<boost::shared_ptr<SubchannelSource> >& getSubchannels();
+ int process(const Buffer* dataIn);
void calculateTimestamp(struct frame_timestamp& ts)
{
@@ -66,9 +70,6 @@ public:
bool sourceContainsTimestamp();
protected:
- /* Main program logger */
- Logger& myLogger;
-
/* Transform the ETI TIST to a PPS offset in ms */
double getPPSOffset();
@@ -83,14 +84,14 @@ protected:
eti_EOF eti_eof;
eti_TIST eti_tist;
FicSource* myFicSource;
- std::vector<SubchannelSource*> mySources;
+ std::vector<boost::shared_ptr<SubchannelSource> > mySources;
TimestampDecoder myTimestampDecoder;
-
+
private:
size_t myCurrentFrame;
- bool time_ext_enabled;
- unsigned long timestamp_seconds;
+ bool eti_fc_valid;
};
#endif // ETI_READER_H
+
diff --git a/src/FIRFilter.cpp b/src/FIRFilter.cpp
index 805c6d2..b1ce618 100644
--- a/src/FIRFilter.cpp
+++ b/src/FIRFilter.cpp
@@ -36,6 +36,8 @@
#include <iostream>
#include <fstream>
+#include <boost/make_shared.hpp>
+
#ifdef __AVX__
# include <immintrin.h>
#else
@@ -58,11 +60,10 @@ void FIRFilterWorker::process(struct FIRFilterWorkerData *fwd)
// the incoming buffer
while(running) {
- Buffer* dataIn;
+ boost::shared_ptr<Buffer> dataIn;
fwd->input_queue.wait_and_pop(dataIn);
- Buffer* dataOut;
- dataOut = new Buffer();
+ boost::shared_ptr<Buffer> dataOut = boost::make_shared<Buffer>();
dataOut->setLength(dataIn->getLength());
PDEBUG("FIRFilterWorker: dataIn->getLength() %zu\n", dataIn->getLength());
@@ -91,7 +92,7 @@ void FIRFilterWorker::process(struct FIRFilterWorkerData *fwd)
fprintf(stderr, "FIRFilterWorker: out not aligned %p ", out);
throw std::runtime_error("FIRFilterWorker: out not aligned");
}
-
+
clock_gettime(CLOCK_THREAD_CPUTIME_ID, &time_start);
__m256 AVXout;
@@ -141,7 +142,7 @@ void FIRFilterWorker::process(struct FIRFilterWorkerData *fwd)
fprintf(stderr, "FIRFilterWorker: out not aligned %p ", out);
throw std::runtime_error("FIRFilterWorker: out not aligned");
}
-
+
clock_gettime(CLOCK_THREAD_CPUTIME_ID, &time_start);
__m128 SSEout;
@@ -290,11 +291,10 @@ void FIRFilterWorker::process(struct FIRFilterWorkerData *fwd)
}
}
#endif
-
+
calculationTime += (time_end.tv_sec - time_start.tv_sec) * 1000000000L +
time_end.tv_nsec - time_start.tv_nsec;
fwd->output_queue.push(dataOut);
- delete dataIn;
}
}
@@ -393,17 +393,16 @@ int FIRFilter::process(Buffer* const dataIn, Buffer* dataOut)
// This thread creates the dataIn buffer, and deletes
// the outgoing buffer
- Buffer* inbuffer = new Buffer(dataIn->getLength(), dataIn->getData());
+ boost::shared_ptr<Buffer> inbuffer =
+ boost::make_shared<Buffer>(dataIn->getLength(), dataIn->getData());
firwd.input_queue.push(inbuffer);
if (number_of_runs > 2) {
- Buffer* outbuffer;
+ boost::shared_ptr<Buffer> outbuffer;
firwd.output_queue.wait_and_pop(outbuffer);
dataOut->setData(outbuffer->getData(), outbuffer->getLength());
-
- delete outbuffer;
}
else {
dataOut->setLength(dataIn->getLength());
diff --git a/src/FIRFilter.h b/src/FIRFilter.h
index 0ecae3e..751be91 100644
--- a/src/FIRFilter.h
+++ b/src/FIRFilter.h
@@ -30,7 +30,7 @@
#endif
#include <boost/thread.hpp>
-#include "ThreadsafeQueue.h"
+#include <boost/shared_ptr.hpp>
#include "RemoteControl.h"
#include "ModCodec.h"
@@ -52,8 +52,8 @@ struct FIRFilterWorkerData {
/* Thread-safe queues to give data to and get data from
* the worker
*/
- ThreadsafeQueue<Buffer*> input_queue;
- ThreadsafeQueue<Buffer*> output_queue;
+ ThreadsafeQueue<boost::shared_ptr<Buffer> > input_queue;
+ ThreadsafeQueue<boost::shared_ptr<Buffer> > output_queue;
/* Remote-control can change the taps while the filter
* runs. This lock makes sure nothing bad happens when
@@ -127,5 +127,5 @@ protected:
struct FIRFilterWorkerData firwd;
};
-
#endif //FIRFILTER_H
+
diff --git a/src/Flowgraph.cpp b/src/Flowgraph.cpp
index dd9c68b..3844e86 100644
--- a/src/Flowgraph.cpp
+++ b/src/Flowgraph.cpp
@@ -1,6 +1,11 @@
/*
Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty
the Queen in Right of Canada (Communications Research Center Canada)
+
+ Copyright (C) 2015
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://opendigitalradio.org
*/
/*
This file is part of ODR-DabMod.
@@ -38,16 +43,18 @@
#include <sys/time.h>
#endif
+using namespace boost;
-typedef std::vector<Node*>::iterator NodeIterator;
-typedef std::vector<Edge*>::iterator EdgeIterator;
+typedef std::vector<shared_ptr<Node> >::iterator NodeIterator;
+typedef std::vector<shared_ptr<Edge> >::iterator EdgeIterator;
-Node::Node(ModPlugin* plugin) :
+Node::Node(shared_ptr<ModPlugin> plugin) :
myPlugin(plugin),
myProcessTime(0)
{
- PDEBUG("Node::Node(plugin(%s): %p) @ %p\n", plugin->name(), plugin, this);
+ PDEBUG("Node::Node(plugin(%s): %p) @ %p\n",
+ plugin->name(), plugin.get(), this);
}
@@ -56,24 +63,21 @@ Node::~Node()
{
PDEBUG("Node::~Node() @ %p\n", this);
- if (myPlugin != NULL) {
- delete myPlugin;
- }
assert(myInputBuffers.size() == 0);
assert(myOutputBuffers.size() == 0);
}
-Edge::Edge(Node* srcNode, Node* dstNode) :
+Edge::Edge(shared_ptr<Node>& srcNode, shared_ptr<Node>& dstNode) :
mySrcNode(srcNode),
myDstNode(dstNode)
{
PDEBUG("Edge::Edge(srcNode(%s): %p, dstNode(%s): %p) @ %p\n",
- srcNode->plugin()->name(), srcNode,
- dstNode->plugin()->name(), dstNode,
+ srcNode->plugin()->name(), srcNode.get(),
+ dstNode->plugin()->name(), dstNode.get(),
this);
- myBuffer = new Buffer();
+ myBuffer = shared_ptr<Buffer>(new Buffer());
srcNode->myOutputBuffers.push_back(myBuffer);
dstNode->myInputBuffers.push_back(myBuffer);
}
@@ -83,7 +87,7 @@ Edge::~Edge()
{
PDEBUG("Edge::~Edge() @ %p\n", this);
- std::vector<Buffer*>::iterator buffer;
+ std::vector<shared_ptr<Buffer> >::iterator buffer;
if (myBuffer != NULL) {
for (buffer = mySrcNode->myOutputBuffers.begin();
buffer != mySrcNode->myOutputBuffers.end();
@@ -102,7 +106,6 @@ Edge::~Edge()
break;
}
}
- delete myBuffer;
}
}
@@ -110,9 +113,26 @@ Edge::~Edge()
int Node::process()
{
PDEBUG("Edge::process()\n");
- PDEBUG(" Plugin name: %s (%p)\n", myPlugin->name(), myPlugin);
+ PDEBUG(" Plugin name: %s (%p)\n", myPlugin->name(), myPlugin.get());
+
+ // the plugin process() still wants vector<Buffer*>
+ // arguments.
+ std::vector<Buffer*> inBuffers;
+ std::vector<shared_ptr<Buffer> >::iterator buffer;
+ for (buffer = myInputBuffers.begin();
+ buffer != myInputBuffers.end();
+ ++buffer) {
+ inBuffers.push_back(buffer->get());
+ }
- return myPlugin->process(myInputBuffers, myOutputBuffers);
+ std::vector<Buffer*> outBuffers;
+ for (buffer = myOutputBuffers.begin();
+ buffer != myOutputBuffers.end();
+ ++buffer) {
+ outBuffers.push_back(buffer->get());
+ }
+
+ return myPlugin->process(inBuffers, outBuffers);
}
@@ -128,35 +148,26 @@ Flowgraph::~Flowgraph()
{
PDEBUG("Flowgraph::~Flowgraph() @ %p\n", this);
- std::vector<Edge*>::const_iterator edge;
- for (edge = edges.begin(); edge != edges.end(); ++edge) {
- delete *edge;
- }
-
if (myProcessTime) {
fprintf(stderr, "Process time:\n");
- }
- std::vector<Node*>::const_iterator node;
- for (node = nodes.begin(); node != nodes.end(); ++node) {
- if (myProcessTime) {
+
+ std::vector<shared_ptr<Node> >::const_iterator node;
+ for (node = nodes.begin(); node != nodes.end(); ++node) {
fprintf(stderr, " %30s: %10u us (%2.2f %%)\n",
(*node)->plugin()->name(),
(unsigned)(*node)->processTime(),
(*node)->processTime() * 100.0 / myProcessTime);
}
- delete *node;
- }
- if (myProcessTime) {
+
fprintf(stderr, " %30s: %10u us (100.00 %%)\n", "total",
(unsigned)myProcessTime);
}
}
-
-void Flowgraph::connect(ModPlugin* input, ModPlugin* output)
+void Flowgraph::connect(shared_ptr<ModPlugin> input, shared_ptr<ModPlugin> output)
{
PDEBUG("Flowgraph::connect(input(%s): %p, output(%s): %p)\n",
- input->name(), input, output->name(), output);
+ input->name(), input.get(), output->name(), output.get());
NodeIterator inputNode;
NodeIterator outputNode;
@@ -167,7 +178,7 @@ void Flowgraph::connect(ModPlugin* input, ModPlugin* output)
}
}
if (inputNode == nodes.end()) {
- inputNode = nodes.insert(nodes.end(), new Node(input));
+ inputNode = nodes.insert(nodes.end(), shared_ptr<Node>(new Node(input)));
}
for (outputNode = nodes.begin(); outputNode != nodes.end(); ++outputNode) {
@@ -176,14 +187,14 @@ void Flowgraph::connect(ModPlugin* input, ModPlugin* output)
}
}
if (outputNode == nodes.end()) {
- outputNode = nodes.insert(nodes.end(), new Node(output));
+ outputNode = nodes.insert(nodes.end(), shared_ptr<Node>(new Node(output)));
for (inputNode = nodes.begin(); inputNode != nodes.end(); ++inputNode) {
if ((*inputNode)->plugin() == input) {
break;
}
}
} else if (inputNode > outputNode) {
- Node* node = *outputNode;
+ shared_ptr<Node> node = *outputNode;
nodes.erase(outputNode);
outputNode = nodes.insert(nodes.end(), node);
for (inputNode = nodes.begin(); inputNode != nodes.end(); ++inputNode) {
@@ -196,7 +207,7 @@ void Flowgraph::connect(ModPlugin* input, ModPlugin* output)
assert((*inputNode)->plugin() == input);
assert((*outputNode)->plugin() == output);
- edges.push_back(new Edge(*inputNode, *outputNode));
+ edges.push_back(shared_ptr<Edge>(new Edge(*inputNode, *outputNode)));
}
@@ -204,7 +215,7 @@ bool Flowgraph::run()
{
PDEBUG("Flowgraph::run()\n");
- std::vector<Node*>::const_iterator node;
+ std::vector<shared_ptr<Node> >::const_iterator node;
timeval start, stop;
time_t diff;
@@ -224,3 +235,4 @@ bool Flowgraph::run()
}
return true;
}
+
diff --git a/src/Flowgraph.h b/src/Flowgraph.h
index 178b6a9..1129668 100644
--- a/src/Flowgraph.h
+++ b/src/Flowgraph.h
@@ -1,6 +1,11 @@
/*
Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty
the Queen in Right of Canada (Communications Research Center Canada)
+
+ Copyright (C) 2015
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://opendigitalradio.org
*/
/*
This file is part of ODR-DabMod.
@@ -32,20 +37,21 @@
#include <sys/types.h>
#include <vector>
+#include <boost/shared_ptr.hpp>
class Node
{
public:
- Node(ModPlugin* plugin);
+ Node(boost::shared_ptr<ModPlugin> plugin);
~Node();
Node(const Node&);
Node& operator=(const Node&);
- ModPlugin* plugin() { return myPlugin; }
+ boost::shared_ptr<ModPlugin> plugin() { return myPlugin; }
- std::vector<Buffer*> myInputBuffers;
- std::vector<Buffer*> myOutputBuffers;
+ std::vector<boost::shared_ptr<Buffer> > myInputBuffers;
+ std::vector<boost::shared_ptr<Buffer> > myOutputBuffers;
int process();
time_t processTime() { return myProcessTime; }
@@ -54,7 +60,7 @@ public:
}
protected:
- ModPlugin* myPlugin;
+ boost::shared_ptr<ModPlugin> myPlugin;
time_t myProcessTime;
};
@@ -62,15 +68,15 @@ protected:
class Edge
{
public:
- Edge(Node* src, Node* dst);
+ Edge(boost::shared_ptr<Node>& src, boost::shared_ptr<Node>& dst);
~Edge();
Edge(const Edge&);
Edge& operator=(const Edge&);
protected:
- Node* mySrcNode;
- Node* myDstNode;
- Buffer* myBuffer;
+ boost::shared_ptr<Node> mySrcNode;
+ boost::shared_ptr<Node> myDstNode;
+ boost::shared_ptr<Buffer> myBuffer;
};
@@ -82,14 +88,16 @@ public:
Flowgraph(const Flowgraph&);
Flowgraph& operator=(const Flowgraph&);
- void connect(ModPlugin* input, ModPlugin* output);
+ void connect(boost::shared_ptr<ModPlugin> input,
+ boost::shared_ptr<ModPlugin> output);
bool run();
protected:
- std::vector<Node*> nodes;
- std::vector<Edge*> edges;
+ std::vector<boost::shared_ptr<Node> > nodes;
+ std::vector<boost::shared_ptr<Edge> > edges;
time_t myProcessTime;
};
#endif // FLOWGRAPH_H
+
diff --git a/src/FrameMultiplexer.cpp b/src/FrameMultiplexer.cpp
index c5e58b7..843f72d 100644
--- a/src/FrameMultiplexer.cpp
+++ b/src/FrameMultiplexer.cpp
@@ -30,8 +30,11 @@
typedef std::complex<float> complexf;
+using namespace boost;
-FrameMultiplexer::FrameMultiplexer(size_t framesize, const std::vector<SubchannelSource*>* subchannels) :
+FrameMultiplexer::FrameMultiplexer(
+ size_t framesize,
+ const std::vector<shared_ptr<SubchannelSource> >* subchannels) :
ModMux(ModFormat(framesize), ModFormat(framesize)),
d_frameSize(framesize),
mySubchannels(subchannels)
@@ -76,7 +79,7 @@ int FrameMultiplexer::process(std::vector<Buffer*> dataIn, Buffer* dataOut)
++in;
// Write subchannel
assert(mySubchannels->size() == dataIn.size() - 1);
- std::vector<SubchannelSource*>::const_iterator subchannel =
+ std::vector<shared_ptr<SubchannelSource> >::const_iterator subchannel =
mySubchannels->begin();
while (in != dataIn.end()) {
assert((*subchannel)->framesizeCu() * 8 == (*in)->getLength());
@@ -88,3 +91,4 @@ int FrameMultiplexer::process(std::vector<Buffer*> dataIn, Buffer* dataOut)
return dataOut->getLength();
}
+
diff --git a/src/FrameMultiplexer.h b/src/FrameMultiplexer.h
index f1bd587..ba571f6 100644
--- a/src/FrameMultiplexer.h
+++ b/src/FrameMultiplexer.h
@@ -29,7 +29,7 @@
#include "ModMux.h"
#include "SubchannelSource.h"
-
+#include <boost/shared_ptr.hpp>
#include <sys/types.h>
@@ -37,7 +37,8 @@
class FrameMultiplexer : public ModMux
{
public:
- FrameMultiplexer(size_t frameSize, const std::vector<SubchannelSource*>* subchannels);
+ FrameMultiplexer(size_t frameSize,
+ const std::vector<boost::shared_ptr<SubchannelSource> >* subchannels);
virtual ~FrameMultiplexer();
FrameMultiplexer(const FrameMultiplexer&);
FrameMultiplexer& operator=(const FrameMultiplexer&);
@@ -48,8 +49,8 @@ public:
protected:
size_t d_frameSize;
- const std::vector<SubchannelSource*>* mySubchannels;
+ const std::vector<boost::shared_ptr<SubchannelSource> >* mySubchannels;
};
-
#endif // FRAME_MULTIPLEXER_H
+
diff --git a/src/InputFileReader.cpp b/src/InputFileReader.cpp
index 205fbfa..84f0be4 100644
--- a/src/InputFileReader.cpp
+++ b/src/InputFileReader.cpp
@@ -49,8 +49,7 @@ int InputFileReader::Open(std::string filename, bool loop)
loop_ = loop;
inputfile_ = fopen(filename_.c_str(), "r");
if (inputfile_ == NULL) {
- fprintf(stderr, "Unable to open input file!\n");
- logger_.level(error) << "Unable to open input file!";
+ etiLog.level(error) << "Unable to open input file!";
perror(filename_.c_str());
return -1;
}
@@ -79,8 +78,7 @@ int InputFileReader::IdentifyType()
char discard_buffer[6144];
if (fread(&sync, sizeof(sync), 1, inputfile_) != 1) {
- fprintf(stderr, "Unable to read sync in input file!\n");
- logger_.level(error) << "Unable to read sync in input file!";
+ etiLog.level(error) << "Unable to read sync in input file!";
perror(filename_.c_str());
return -1;
}
@@ -96,8 +94,7 @@ int InputFileReader::IdentifyType()
// if the seek fails, consume the rest of the frame
if (fread(discard_buffer, 6144 - sizeof(sync), 1, inputfile_)
!= 1) {
- fprintf(stderr, "Unable to read from input file!\n");
- logger_.level(error) << "Unable to read from input file!";
+ etiLog.level(error) << "Unable to read from input file!";
perror(filename_.c_str());
return -1;
}
@@ -108,8 +105,7 @@ int InputFileReader::IdentifyType()
nbFrames = sync;
if (fread(&frameSize, sizeof(frameSize), 1, inputfile_) != 1) {
- fprintf(stderr, "Unable to read frame size in input file!\n");
- logger_.level(error) << "Unable to read frame size in input file!";
+ etiLog.level(error) << "Unable to read frame size in input file!";
perror(filename_.c_str());
return -1;
}
@@ -130,8 +126,7 @@ int InputFileReader::IdentifyType()
// if the seek fails, consume the rest of the frame
if (fread(discard_buffer, frameSize - 4, 1, inputfile_)
!= 1) {
- fprintf(stderr, "Unable to read from input file!\n");
- logger_.level(error) << "Unable to read from input file!";
+ etiLog.level(error) << "Unable to read from input file!";
perror(filename_.c_str());
return -1;
}
@@ -141,8 +136,7 @@ int InputFileReader::IdentifyType()
}
if (fread(&sync, sizeof(sync), 1, inputfile_) != 1) {
- fprintf(stderr, "Unable to read nb frame in input file!\n");
- logger_.level(error) << "Unable to read nb frame in input file!";
+ etiLog.level(error) << "Unable to read nb frame in input file!";
perror(filename_.c_str());
return -1;
}
@@ -152,8 +146,7 @@ int InputFileReader::IdentifyType()
// if the seek fails, consume the rest of the frame
if (fread(discard_buffer, frameSize - 4, 1, inputfile_)
!= 1) {
- fprintf(stderr, "Unable to read from input file!\n");
- logger_.level(error) << "Unable to read from input file!";
+ etiLog.level(error) << "Unable to read from input file!";
perror(filename_.c_str());
return -1;
}
@@ -168,8 +161,7 @@ int InputFileReader::IdentifyType()
sync >>= 8;
sync &= 0xffffff;
if (fread((uint8_t*)&sync + 3, 1, 1, inputfile_) != 1) {
- fprintf(stderr, "Unable to read from input file!\n");
- logger_.level(error) << "Unable to read from input file!";
+ etiLog.level(error) << "Unable to read from input file!";
perror(filename_.c_str());
return -1;
}
@@ -184,8 +176,7 @@ int InputFileReader::IdentifyType()
if (fseek(inputfile_, -sizeof(sync), SEEK_CUR) != 0) {
if (fread(discard_buffer, 6144 - sizeof(sync), 1, inputfile_)
!= 1) {
- fprintf(stderr, "Unable to read from input file!\n");
- logger_.level(error) << "Unable to read from input file!";
+ etiLog.level(error) << "Unable to read from input file!";
perror(filename_.c_str());
return -1;
}
@@ -195,8 +186,7 @@ int InputFileReader::IdentifyType()
}
}
- fprintf(stderr, "Bad input file format!\n");
- logger_.level(error) << "Bad input file format!";
+ etiLog.level(error) << "Bad input file format!";
return -1;
}
@@ -236,18 +226,18 @@ int InputFileReader::GetNextFrame(void* buffer)
}
else {
if (fread(&frameSize, sizeof(frameSize), 1, inputfile_) != 1) {
- logger_.level(error) << "Reached end of file.";
+ etiLog.level(error) << "Reached end of file.";
if (loop_) {
if (Rewind() == 0) {
if (fread(&frameSize, sizeof(frameSize), 1, inputfile_) != 1) {
PDEBUG("Error after rewinding file!\n");
- logger_.level(error) << "Error after rewinding file!";
+ etiLog.level(error) << "Error after rewinding file!";
return -1;
}
}
else {
PDEBUG("Impossible to rewind file!\n");
- logger_.level(error) << "Impossible to rewind file!";
+ etiLog.level(error) << "Impossible to rewind file!";
return -1;
}
}
@@ -257,8 +247,7 @@ int InputFileReader::GetNextFrame(void* buffer)
}
}
if (frameSize > 6144) { // there might be a better limit
- logger_.level(error) << "Wrong frame size " << frameSize << " in ETI file!";
- fprintf(stderr, "Wrong frame size %u in ETI file!\n", frameSize);
+ etiLog.level(error) << "Wrong frame size " << frameSize << " in ETI file!";
return -1;
}
@@ -275,7 +264,7 @@ int InputFileReader::GetNextFrame(void* buffer)
}
else {
PDEBUG("Impossible to rewind file!\n");
- logger_.level(error) << "Impossible to rewind file!";
+ etiLog.level(error) << "Impossible to rewind file!";
return -1;
}
}
@@ -285,12 +274,8 @@ int InputFileReader::GetNextFrame(void* buffer)
// A short read of a frame (i.e. reading an incomplete frame)
// is not tolerated. Input files must not contain incomplete frames
if (read_bytes != 0) {
- fprintf(stderr,
- "Unable to read a complete frame of %u data bytes from input file!\n",
- frameSize);
-
- perror(filename_.c_str());
- logger_.level(error) << "Unable to read from input file!";
+ etiLog.level(error) <<
+ "Unable to read a complete frame of " << frameSize << " data bytes from input file!";
return -1;
}
else {
diff --git a/src/InputReader.h b/src/InputReader.h
index 3e0dcab..b262cc9 100644
--- a/src/InputReader.h
+++ b/src/InputReader.h
@@ -3,7 +3,7 @@
Her Majesty the Queen in Right of Canada (Communications Research
Center Canada)
- Copyrigth (C) 2013
+ Copyrigth (C) 2013, 2015
Matthias P. Braendli, matthias.braendli@mpb.li
*/
/*
@@ -31,6 +31,8 @@
#endif
#include <cstdio>
+#include <vector>
+#include <boost/shared_ptr.hpp>
#if defined(HAVE_ZEROMQ)
# include "zmq.hpp"
# include "ThreadsafeQueue.h"
@@ -85,15 +87,15 @@ class InputReader
class InputFileReader : public InputReader
{
public:
- InputFileReader(Logger logger) :
+ InputFileReader() :
streamtype_(ETI_STREAM_TYPE_NONE),
- inputfile_(NULL), logger_(logger) {};
+ inputfile_(NULL) { }
~InputFileReader()
{
- fprintf(stderr, "\nClosing input file...\n");
-
if (inputfile_ != NULL) {
+ fprintf(stderr, "\nClosing input file...\n");
+
fclose(inputfile_);
}
}
@@ -113,6 +115,9 @@ class InputFileReader : public InputReader
}
private:
+ InputFileReader(const InputFileReader& other);
+ InputFileReader& operator=(const InputFileReader& other);
+
int IdentifyType();
// Rewind the file, and replay anew
@@ -123,20 +128,30 @@ class InputFileReader : public InputReader
std::string filename_;
EtiStreamType streamtype_;
FILE* inputfile_;
- Logger logger_;
size_t inputfilelength_;
uint64_t nbframes_; // 64-bit because 32-bit overflow is
// after 2**32 * 24ms ~= 3.3 years
};
+struct zmq_input_overflow : public std::exception
+{
+ const char* what () const throw ()
+ {
+ return "InputZMQ buffer overflow";
+ }
+};
+
#if defined(HAVE_ZEROMQ)
/* A ZeroMQ input. See www.zeromq.org for more info */
struct InputZeroMQThreadData
{
- ThreadsafeQueue<uint8_t*> *in_messages;
+ ThreadsafeQueue<boost::shared_ptr<std::vector<uint8_t> > > *in_messages;
std::string uri;
+ unsigned max_queued_frames;
+
+ bool running;
};
class InputZeroMQWorker
@@ -168,10 +183,10 @@ class InputZeroMQWorker
class InputZeroMQReader : public InputReader
{
public:
- InputZeroMQReader(Logger logger) :
- logger_(logger), in_messages_(10)
+ InputZeroMQReader()
{
workerdata_.in_messages = &in_messages_;
+ workerdata_.running = false;
}
~InputZeroMQReader()
@@ -179,21 +194,22 @@ class InputZeroMQReader : public InputReader
worker_.Stop();
}
- int Open(std::string uri);
+ int Open(const std::string& uri, unsigned max_queued_frames);
int GetNextFrame(void* buffer);
void PrintInfo();
private:
- InputZeroMQReader(const InputZeroMQReader& other) {}
- Logger logger_;
+ InputZeroMQReader(const InputZeroMQReader& other);
+ InputZeroMQReader& operator=(const InputZeroMQReader& other);
std::string uri_;
InputZeroMQWorker worker_;
- ThreadsafeQueue<uint8_t*> in_messages_;
+ ThreadsafeQueue<boost::shared_ptr<std::vector<uint8_t> > > in_messages_;
struct InputZeroMQThreadData workerdata_;
};
#endif
#endif
+
diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp
index f7f5702..36d4e4b 100644
--- a/src/InputZeroMQReader.cpp
+++ b/src/InputZeroMQReader.cpp
@@ -3,7 +3,7 @@
Her Majesty the Queen in Right of Canada (Communications Research
Center Canada)
- Copyright (C) 2013, 2014
+ Copyright (C) 2013, 2014, 2015
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
@@ -37,12 +37,11 @@
#include <stdint.h>
#include "zmq.hpp"
#include <boost/thread/thread.hpp>
+#include <boost/make_shared.hpp>
#include "porting.h"
#include "InputReader.h"
#include "PcDebug.h"
-#define MAX_QUEUE_SIZE 50
-
#define NUM_FRAMES_PER_ZMQ_MESSAGE 4
/* A concatenation of four ETI frames,
* whose maximal size is 6144.
@@ -64,10 +63,18 @@ struct zmq_dab_message_t
uint8_t buf[NUM_FRAMES_PER_ZMQ_MESSAGE*6144];
};
-int InputZeroMQReader::Open(std::string uri)
+int InputZeroMQReader::Open(const std::string& uri, unsigned max_queued_frames)
{
- uri_ = uri;
+ // The URL might start with zmq+tcp://
+ if (uri.substr(0, 4) == "zmq+") {
+ uri_ = uri.substr(4);
+ }
+ else {
+ uri_ = uri;
+ }
+
workerdata_.uri = uri;
+ workerdata_.max_queued_frames = max_queued_frames;
// launch receiver thread
worker_.Start(&workerdata_);
@@ -78,12 +85,25 @@ int InputZeroMQReader::GetNextFrame(void* buffer)
{
const size_t framesize = 6144;
- uint8_t* incoming;
- in_messages_.wait_and_pop(incoming);
+ boost::shared_ptr<std::vector<uint8_t> > incoming;
- memcpy(buffer, incoming, framesize);
+ /* Do some prebuffering because reads will happen in bursts
+ * (4 ETI frames in TM1) and we should make sure that
+ * we can serve the data required for a full transmission frame.
+ */
+ if (in_messages_.size() < 4) {
+ const size_t prebuffering = 10;
+ in_messages_.wait_and_pop(incoming, prebuffering);
+ }
+ else {
+ in_messages_.wait_and_pop(incoming);
+ }
- delete incoming;
+ if (! workerdata_.running) {
+ throw zmq_input_overflow();
+ }
+
+ memcpy(buffer, &incoming->front(), framesize);
return framesize;
}
@@ -123,18 +143,16 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata)
}
m_to_drop--;
}
- else if (queue_size < MAX_QUEUE_SIZE) {
+ else if (queue_size < workerdata->max_queued_frames) {
if (buffer_full) {
- fprintf(stderr, "ZeroMQ buffer recovered: %zu elements\n",
- queue_size);
+ etiLog.level(info) << "ZeroMQ buffer recovered: " << queue_size << " elements";
buffer_full = false;
}
zmq_dab_message_t* dab_msg = (zmq_dab_message_t*)incoming.data();
if (dab_msg->version != 1) {
- fprintf(stderr, "ZeroMQ input: wrong packet version %d\n",
- dab_msg->version);
+ etiLog.level(error) << "ZeroMQ wrong packet version " << dab_msg->version;
}
int offset = sizeof(dab_msg->version) +
@@ -145,23 +163,20 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata)
if (dab_msg->buflen[i] <= 0 ||
dab_msg->buflen[i] > 6144)
{
- fprintf(stderr, "ZeroMQ buffer %d: invalid length %d\n",
- i, dab_msg->buflen[i]);
+ etiLog.level(error) << "ZeroMQ buffer " << i << " has invalid length " <<
+ dab_msg->buflen[i];
// TODO error handling
}
else {
- uint8_t* buf = new uint8_t[6144];
+ boost::shared_ptr<std::vector<uint8_t> > buf =
+ boost::make_shared<std::vector<uint8_t> >(6144, 0x55);
const int framesize = dab_msg->buflen[i];
- memcpy(buf,
+ memcpy(&buf->front(),
((uint8_t*)incoming.data()) + offset,
framesize);
- // pad to 6144 bytes
- memset(&((uint8_t*)buf)[framesize],
- 0x55, 6144 - framesize);
-
offset += framesize;
queue_size = workerdata->in_messages->push(buf);
@@ -172,9 +187,10 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata)
workerdata->in_messages->notify();
if (!buffer_full) {
- fprintf(stderr, "ZeroMQ buffer overfull !\n");
+ etiLog.level(warn) << "ZeroMQ buffer overfull !";
buffer_full = true;
+ throw std::runtime_error("ZMQ input full");
}
queue_size = workerdata->in_messages->size();
@@ -188,23 +204,28 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata)
}
if (queue_size < 5) {
- fprintf(stderr, "ZeroMQ buffer underfull: %zu elements !\n",
- queue_size);
+ etiLog.level(warn) << "ZeroMQ buffer low: " << queue_size << " elements !";
}
}
}
catch (zmq::error_t& err) {
- fprintf(stderr, "ZeroMQ error in RecvProcess: '%s'\n", err.what());
+ etiLog.level(error) << "ZeroMQ error in RecvProcess: '" << err.what() << "'";
+ }
+ catch (std::exception& err) {
}
- fprintf(stderr, "ZeroMQ input worker terminated\n");
+ etiLog.level(info) << "ZeroMQ input worker terminated";
subscriber.close();
+
+ workerdata->running = false;
+ workerdata->in_messages->notify();
}
void InputZeroMQWorker::Start(struct InputZeroMQThreadData* workerdata)
{
running = true;
+ workerdata->running = true;
recv_thread = boost::thread(&InputZeroMQWorker::RecvProcess, this, workerdata);
}
diff --git a/src/Makefile.am b/src/Makefile.am
deleted file mode 100644
index f8ba7c2..0000000
--- a/src/Makefile.am
+++ /dev/null
@@ -1,127 +0,0 @@
-# Copyright (C) 2007, 2008, 2009, 2010, 2011, 2012 Her Majesty the
-# Queen in Right of Canada (Communications Research Center Canada)
-
-# This file is part of ODR-DabMod.
-#
-# ODR-DabMod is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as
-# published by the Free Software Foundation, either version 3 of the
-# License, or (at your option) any later version.
-#
-# ODR-DabMod is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with ODR-DabMod. If not, see <http://www.gnu.org/licenses/>.
-
-if IS_GIT_REPO
-GITVERSION_FLAGS = -DGITVERSION="\"`git describe`\""
-else
-GITVERSION_FLAGS =
-endif
-
-if HAVE_SSE
-SIMD_CFLAGS = -msse -msse2
-else
-SIMD_CFLAGS =
-endif
-
-bin_PROGRAMS = odr-dabmod
-
-if USE_KISS_FFT
-FFT_DIR=$(top_builddir)/lib/kiss_fft129
-FFT_INC=-I$(FFT_DIR) -I$(FFT_DIR)/tools
-FFT_SRC=$(FFT_DIR)/kiss_fft.c \
- $(FFT_DIR)/kiss_fft.h \
- $(FFT_DIR)/tools/kiss_fftr.c \
- $(FFT_DIR)/tools/kiss_fftr.h \
- kiss_fftsimd.c \
- kiss_fftsimd.h
-FFT_FLG=-ffast-math
-
-.PHONY: kiss_fft129 reed-solomon-4.0
-
-DabModulator.cpp: $(FFT_DIR)
-
-BUILT_SOURCES: $(FFT_DIR)
-
-FFT_LDADD=
-
-$(FFT_DIR):
- if [ ! -e $(FFT_DIR) ]; then \
- tar xzf $(top_srcdir)/lib/kiss_fft129.tar.gz -C $(top_builddir)/lib; \
- fi
-
-else
-FFT_LDADD=
-FFT_DIR=
-FFT_INC=
-FFT_SRC=
-FFT_FLG=
-endif
-
-odr_dabmod_CPPFLAGS = -Wall \
- $(FFT_INC) $(FFT_FLG) $(SIMD_CFLAGS) $(GITVERSION_FLAGS)
-odr_dabmod_LDADD = $(FFT_LDADD)
-odr_dabmod_SOURCES = DabMod.cpp \
- PcDebug.h \
- porting.c porting.h \
- DabModulator.cpp DabModulator.h \
- Buffer.cpp Buffer.h \
- ModCodec.cpp ModCodec.h \
- ModPlugin.cpp ModPlugin.h \
- ModFormat.cpp ModFormat.h \
- EtiReader.cpp EtiReader.h \
- Eti.cpp Eti.h \
- FicSource.cpp FicSource.h \
- FIRFilter.cpp FIRFilter.h \
- ModInput.cpp ModInput.h \
- PuncturingRule.cpp PuncturingRule.h \
- PuncturingEncoder.cpp PuncturingEncoder.h \
- SubchannelSource.cpp SubchannelSource.h \
- Flowgraph.cpp Flowgraph.h \
- GainControl.cpp GainControl.h \
- OutputMemory.cpp OutputMemory.h \
- OutputZeroMQ.cpp OutputZeroMQ.h \
- TimestampDecoder.h TimestampDecoder.cpp \
- OutputUHD.cpp OutputUHD.h \
- ModOutput.cpp ModOutput.h \
- InputMemory.cpp InputMemory.h \
- InputFileReader.cpp InputZeroMQReader.cpp InputReader.h \
- OutputFile.cpp OutputFile.h \
- FrameMultiplexer.cpp FrameMultiplexer.h \
- ModMux.cpp ModMux.h \
- PrbsGenerator.cpp PrbsGenerator.h \
- BlockPartitioner.cpp BlockPartitioner.h \
- QpskSymbolMapper.cpp QpskSymbolMapper.h \
- FrequencyInterleaver.cpp FrequencyInterleaver.h \
- PhaseReference.cpp PhaseReference.h \
- DifferentialModulator.cpp DifferentialModulator.h \
- NullSymbol.cpp NullSymbol.h \
- SignalMultiplexer.cpp SignalMultiplexer.h \
- CicEqualizer.cpp CicEqualizer.h \
- OfdmGenerator.cpp OfdmGenerator.h \
- GuardIntervalInserter.cpp GuardIntervalInserter.h \
- Resampler.cpp Resampler.h \
- ConvEncoder.cpp ConvEncoder.h \
- TimeInterleaver.cpp TimeInterleaver.h \
- ThreadsafeQueue.h \
- Log.cpp Log.h \
- RemoteControl.cpp RemoteControl.h \
- FormatConverter.cpp FormatConverter.h \
- zmq.hpp
-
-nodist_odr_dabmod_SOURCES = $(FFT_SRC)
-
-dist_bin_SCRIPTS = crc-dwap.py
-
-if USE_KISS_FFT
-EXTRA_DIST = kiss_fftsimd.c kiss_fftsimd.h
-
-clean-local:
- rm -rf $(FFT_DIR)
-
-endif
-
diff --git a/src/OutputMemory.h b/src/OutputMemory.h
index 2dd49c5..56cbc01 100644
--- a/src/OutputMemory.h
+++ b/src/OutputMemory.h
@@ -50,7 +50,7 @@
class OutputMemory : public ModOutput
{
public:
- OutputMemory(Buffer* dataOut = NULL);
+ OutputMemory(Buffer* dataOut);
virtual ~OutputMemory();
virtual int process(Buffer* dataIn, Buffer* dataOut);
const char* name() { return "OutputMemory"; }
diff --git a/src/OutputUHD.cpp b/src/OutputUHD.cpp
index bfd24a8..b815a4c 100644
--- a/src/OutputUHD.cpp
+++ b/src/OutputUHD.cpp
@@ -2,7 +2,7 @@
Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Her Majesty the
Queen in Right of Canada (Communications Research Center Canada)
- Copyright (C) 2014
+ Copyright (C) 2014, 2015
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
@@ -31,6 +31,11 @@
#include "PcDebug.h"
#include "Log.h"
#include "RemoteControl.h"
+#include "Utils.h"
+
+#include <boost/thread/future.hpp>
+
+#include <uhd/utils/msg.hpp>
#include <cmath>
#include <iostream>
@@ -46,23 +51,61 @@ using namespace std;
typedef std::complex<float> complexf;
+void uhd_msg_handler(uhd::msg::type_t type, const std::string &msg)
+{
+ if (type == uhd::msg::warning) {
+ etiLog.level(warn) << "UHD Warning: " << msg;
+ }
+ else if (type == uhd::msg::error) {
+ etiLog.level(error) << "UHD Error: " << msg;
+ }
+}
+
+// Check function for GPS fixtype
+bool check_gps_fix_ok(uhd::usrp::multi_usrp::sptr usrp)
+{
+ try {
+ std::string fixtype(
+ usrp->get_mboard_sensor("gps_fixtype", 0).to_pp_string());
+
+ if (fixtype.find("3d fix") == std::string::npos) {
+ etiLog.level(warn) << "OutputUHD: " << fixtype;
+
+ return false;
+ }
+
+ return true;
+ }
+ catch (uhd::lookup_error &e) {
+ etiLog.level(warn) << "OutputUHD: no gps_fixtype sensor";
+ return false;
+ }
+}
+
+
OutputUHD::OutputUHD(
- OutputUHDConfig& config,
- Logger& logger) :
+ const OutputUHDConfig& config) :
ModOutput(ModFormat(1), ModFormat(0)),
RemoteControllable("uhd"),
- myLogger(logger),
myConf(config),
// Since we don't know the buffer size, we cannot initialise
// the buffers at object initialisation.
first_run(true),
+ gps_fix_verified(false),
activebuffer(1),
myDelayBuf(0)
{
- myMuting = 0; // is remote-controllable
+ myMuting = true; // is remote-controllable, and reset by the GPS fix check
myStaticDelayUs = 0; // is remote-controllable
+ // Variables needed for GPS fix check
+ num_checks_without_gps_fix = 1;
+ first_gps_fix_check.tv_sec = 0;
+ last_gps_fix_check.tv_sec = 0;
+ time_last_frame.tv_sec = 0;
+
+
#if FAKE_UHD
MDEBUG("OutputUHD:Using fake UHD output");
#else
@@ -91,7 +134,10 @@ OutputUHD::OutputUHD(
RC_ADD_PARAMETER(freq, "UHD transmission frequency");
RC_ADD_PARAMETER(muting, "Mute the output by stopping the transmitter");
RC_ADD_PARAMETER(staticdelay, "Set static delay (uS) between 0 and 96000");
- RC_ADD_PARAMETER(iqbalance, "Set I/Q balance between 0 and 1.0");
+
+ // TODO: find out how to use boost::bind to give the logger to the
+ // uhd_msg_handler
+ uhd::msg::register_handler(uhd_msg_handler);
uhd::set_thread_priority_safe();
@@ -152,59 +198,7 @@ OutputUHD::OutputUHD(
MDEBUG("OutputUHD:Mute on missing timestamps: %s ...\n",
myConf.muteNoTimestamps ? "enabled" : "disabled");
- if (myConf.enableSync && (myConf.pps_src == "none")) {
- myLogger.level(warn) <<
- "OutputUHD: WARNING:"
- " you are using synchronous transmission without PPS input!";
-
- struct timespec now;
- if (clock_gettime(CLOCK_REALTIME, &now)) {
- perror("OutputUHD:Error: could not get time: ");
- myLogger.level(error) << "OutputUHD: could not get time";
- }
- else {
- myUsrp->set_time_now(uhd::time_spec_t(now.tv_sec));
- myLogger.level(info) << "OutputUHD: Setting USRP time to " <<
- uhd::time_spec_t(now.tv_sec).get_real_secs();
- }
- }
-
- if (myConf.pps_src != "none") {
- /* handling time for synchronisation: wait until the next full
- * second, and set the USRP time at next PPS */
- struct timespec now;
- time_t seconds;
- if (clock_gettime(CLOCK_REALTIME, &now)) {
- myLogger.level(error) << "OutputUHD: could not get time :" <<
- strerror(errno);
- throw std::runtime_error("OutputUHD: could not get time.");
- }
- else {
- seconds = now.tv_sec;
-
- MDEBUG("OutputUHD:sec+1: %ld ; now: %ld ...\n", seconds+1, now.tv_sec);
- while (seconds + 1 > now.tv_sec) {
- usleep(1);
- if (clock_gettime(CLOCK_REALTIME, &now)) {
- myLogger.level(error) << "OutputUHD: could not get time :" <<
- strerror(errno);
- throw std::runtime_error("OutputUHD: could not get time.");
- }
- }
- MDEBUG("OutputUHD:sec+1: %ld ; now: %ld ...\n", seconds+1, now.tv_sec);
- /* We are now shortly after the second change. */
-
- usleep(200000); // 200ms, we want the PPS to be later
- myUsrp->set_time_unknown_pps(uhd::time_spec_t(seconds + 2));
- myLogger.level(info) << "OutputUHD: Setting USRP time next pps to " <<
- uhd::time_spec_t(seconds + 2).get_real_secs();
- }
-
- usleep(1e6);
- myLogger.log(info, "OutputUHD: USRP time %f\n",
- myUsrp->get_time_now().get_real_secs());
- }
-
+ set_usrp_time();
// preparing output thread worker data
uwd.myUsrp = myUsrp;
@@ -215,24 +209,27 @@ OutputUHD::OutputUHD(
uwd.sampleRate = myConf.sampleRate;
uwd.sourceContainsTimestamp = false;
uwd.muteNoTimestamps = myConf.muteNoTimestamps;
- uwd.logger = &myLogger;
uwd.refclk_lock_loss_behaviour = myConf.refclk_lock_loss_behaviour;
if (myConf.refclk_src == "internal") {
uwd.check_refclk_loss = false;
+ uwd.check_gpsfix = false;
+ }
+ else if (myConf.refclk_src == "gpsdo") {
+ uwd.check_refclk_loss = true;
+ uwd.check_gpsfix = (myConf.maxGPSHoldoverTime != 0);
}
else {
uwd.check_refclk_loss = true;
+ uwd.check_gpsfix = false;
}
- SetDelayBuffer(config.dabMode);
+ SetDelayBuffer(myConf.dabMode);
shared_ptr<barrier> b(new barrier(2));
mySyncBarrier = b;
uwd.sync_barrier = b;
- worker.start(&uwd);
-
MDEBUG("OutputUHD:UHD ready.\n");
}
@@ -247,32 +244,30 @@ OutputUHD::~OutputUHD()
}
}
-void OutputUHD::SetDelayBuffer(unsigned int dabMode)
+int transmission_frame_duration_ms(unsigned int dabMode)
{
- // find out the duration of the transmission frame (Table 2 in ETSI 300 401)
switch (dabMode) {
- case 0: // could happen when called from constructor and we take the mode from ETI
- myTFDurationMs = 0;
- break;
- case 1:
- myTFDurationMs = 96;
- break;
- case 2:
- myTFDurationMs = 24;
- break;
- case 3:
- myTFDurationMs = 24;
- break;
- case 4:
- myTFDurationMs = 48;
- break;
- default:
- throw std::runtime_error("OutPutUHD: invalid DAB mode");
- }
- // The buffer size equals the number of samples per transmission frame so
- // we calculate it by multiplying the duration of the transmission frame
- // with the samplerate.
- myDelayBuf.resize(myTFDurationMs * myConf.sampleRate / 1000);
+ // could happen when called from constructor and we take the mode from ETI
+ case 0: return 0;
+
+ case 1: return 96;
+ case 2: return 24;
+ case 3: return 24;
+ case 4: return 48;
+ default:
+ throw std::runtime_error("OutputUHD: invalid DAB mode");
+ }
+}
+
+void OutputUHD::SetDelayBuffer(unsigned int dabMode)
+{
+ // find out the duration of the transmission frame (Table 2 in ETSI 300 401)
+ myTFDurationMs = transmission_frame_duration_ms(dabMode);
+
+ // The buffer size equals the number of samples per transmission frame so
+ // we calculate it by multiplying the duration of the transmission frame
+ // with the samplerate.
+ myDelayBuf.resize(myTFDurationMs * myConf.sampleRate / 1000);
}
int OutputUHD::process(Buffer* dataIn, Buffer* dataOut)
@@ -286,8 +281,25 @@ int OutputUHD::process(Buffer* dataIn, Buffer* dataOut)
// the first buffer
// We will only wait on the barrier on the subsequent calls to
// OutputUHD::process
- if (first_run) {
- myLogger.level(debug) << "OutputUHD: UHD initialising...";
+ if (not gps_fix_verified) {
+ if (uwd.check_gpsfix) {
+ initial_gps_check();
+
+ if (num_checks_without_gps_fix == 0) {
+ set_usrp_time();
+ gps_fix_verified = true;
+ myMuting = false;
+ }
+ }
+ else {
+ gps_fix_verified = true;
+ myMuting = false;
+ }
+ }
+ else if (first_run) {
+ etiLog.level(debug) << "OutputUHD: UHD initialising...";
+
+ worker.start(&uwd);
uwd.bufsize = dataIn->getLength();
uwd.frame0.buf = malloc(uwd.bufsize);
@@ -310,28 +322,53 @@ int OutputUHD::process(Buffer* dataIn, Buffer* dataOut)
default: break;
}
- // we only set the delay buffer from the dab mode signaled in ETI if the
- // dab mode was not set in contructor
- if (myTFDurationMs == 0)
- SetDelayBuffer(myEtiReader->getMode());
+ // we only set the delay buffer from the dab mode signaled in ETI if the
+ // dab mode was not set in contructor
+ if (myTFDurationMs == 0) {
+ SetDelayBuffer(myEtiReader->getMode());
+ }
activebuffer = 1;
lastLen = uwd.bufsize;
first_run = false;
- myLogger.level(debug) << "OutputUHD: UHD initialising complete";
+ etiLog.level(debug) << "OutputUHD: UHD initialising complete";
}
else {
if (lastLen != dataIn->getLength()) {
// I expect that this never happens.
- myLogger.level(emerg) <<
+ etiLog.level(emerg) <<
"OutputUHD: Fatal error, input length changed from " << lastLen <<
" to " << dataIn->getLength();
throw std::runtime_error("Non-constant input length!");
}
+
+ if (uwd.check_gpsfix) {
+ try {
+ check_gps();
+ }
+ catch (std::runtime_error& e) {
+ uwd.running = false;
+ etiLog.level(error) << e.what();
+ }
+ }
+
mySyncBarrier.get()->wait();
+ if (!uwd.running) {
+ worker.stop();
+ first_run = true;
+ if (uwd.failed_due_to_fct) {
+ throw fct_discontinuity_error();
+ }
+ else {
+ etiLog.level(error) <<
+ "OutputUHD: Error, UHD worker failed";
+ throw std::runtime_error("UHD worker failed");
+ }
+ }
+
// write into the our buffer while
// the worker sends the other.
@@ -371,56 +408,226 @@ int OutputUHD::process(Buffer* dataIn, Buffer* dataOut)
}
return uwd.bufsize;
+}
+
+
+void OutputUHD::set_usrp_time()
+{
+ if (myConf.enableSync && (myConf.pps_src == "none")) {
+ etiLog.level(warn) <<
+ "OutputUHD: WARNING:"
+ " you are using synchronous transmission without PPS input!";
+
+ struct timespec now;
+ if (clock_gettime(CLOCK_REALTIME, &now)) {
+ perror("OutputUHD:Error: could not get time: ");
+ etiLog.level(error) << "OutputUHD: could not get time";
+ }
+ else {
+ myUsrp->set_time_now(uhd::time_spec_t(now.tv_sec));
+ etiLog.level(info) << "OutputUHD: Setting USRP time to " <<
+ uhd::time_spec_t(now.tv_sec).get_real_secs();
+ }
+ }
+
+ if (myConf.pps_src != "none") {
+ /* handling time for synchronisation: wait until the next full
+ * second, and set the USRP time at next PPS */
+ struct timespec now;
+ time_t seconds;
+ if (clock_gettime(CLOCK_REALTIME, &now)) {
+ etiLog.level(error) << "OutputUHD: could not get time :" <<
+ strerror(errno);
+ throw std::runtime_error("OutputUHD: could not get time.");
+ }
+ else {
+ seconds = now.tv_sec;
+
+ MDEBUG("OutputUHD:sec+1: %ld ; now: %ld ...\n", seconds+1, now.tv_sec);
+ while (seconds + 1 > now.tv_sec) {
+ usleep(1);
+ if (clock_gettime(CLOCK_REALTIME, &now)) {
+ etiLog.level(error) << "OutputUHD: could not get time :" <<
+ strerror(errno);
+ throw std::runtime_error("OutputUHD: could not get time.");
+ }
+ }
+ MDEBUG("OutputUHD:sec+1: %ld ; now: %ld ...\n", seconds+1, now.tv_sec);
+ /* We are now shortly after the second change. */
+
+ usleep(200000); // 200ms, we want the PPS to be later
+ myUsrp->set_time_unknown_pps(uhd::time_spec_t(seconds + 2));
+ etiLog.level(info) << "OutputUHD: Setting USRP time next pps to " <<
+ uhd::time_spec_t(seconds + 2).get_real_secs();
+ }
+ usleep(1e6);
+ etiLog.log(info, "OutputUHD: USRP time %f\n",
+ myUsrp->get_time_now().get_real_secs());
+ }
}
-void UHDWorker::process()
+void OutputUHD::initial_gps_check()
{
- int workerbuffer = 0;
- time_t tx_second = 0;
- double pps_offset = 0;
- double last_pps = 2.0;
- double usrp_time;
+ if (first_gps_fix_check.tv_sec == 0) {
+ etiLog.level(info) << "Waiting for GPS fix";
- //const struct timespec hundred_nano = {0, 100};
+ if (clock_gettime(CLOCK_MONOTONIC, &first_gps_fix_check) != 0) {
+ stringstream ss;
+ ss << "clock_gettime failure: " << strerror(errno);
+ throw std::runtime_error(ss.str());
+ }
+ }
- size_t sizeIn;
- struct UHDWorkerFrameData* frame;
+ check_gps();
- size_t num_acc_samps; //number of accumulated samples
- //int write_fail_count;
+ if (last_gps_fix_check.tv_sec >
+ first_gps_fix_check.tv_sec + initial_gps_fix_wait) {
+ stringstream ss;
+ ss << "GPS did not fix in " << initial_gps_fix_wait << " seconds";
+ throw std::runtime_error(ss.str());
+ }
- // Transmit timeout
- const double timeout = 0.2;
+ if (time_last_frame.tv_sec == 0) {
+ if (clock_gettime(CLOCK_MONOTONIC, &time_last_frame) != 0) {
+ stringstream ss;
+ ss << "clock_gettime failure: " << strerror(errno);
+ throw std::runtime_error(ss.str());
+ }
+ }
+
+ struct timespec now;
+ if (clock_gettime(CLOCK_MONOTONIC, &now) != 0) {
+ stringstream ss;
+ ss << "clock_gettime failure: " << strerror(errno);
+ throw std::runtime_error(ss.str());
+ }
+
+ long delta_us = timespecdiff_us(time_last_frame, now);
+ long wait_time_us = transmission_frame_duration_ms(myConf.dabMode);
+
+ if (wait_time_us - delta_us > 0) {
+ usleep(wait_time_us - delta_us);
+ }
+
+ time_last_frame.tv_nsec += wait_time_us * 1000;
+ if (time_last_frame.tv_nsec >= 1000000000L) {
+ time_last_frame.tv_nsec -= 1000000000L;
+ time_last_frame.tv_sec++;
+ }
+}
+
+void OutputUHD::check_gps()
+{
+ struct timespec time_now;
+ if (clock_gettime(CLOCK_MONOTONIC, &time_now) != 0) {
+ stringstream ss;
+ ss << "clock_gettime failure: " << strerror(errno);
+ throw std::runtime_error(ss.str());
+ }
+
+ // Divide interval by two because we alternate between
+ // launch and check
+ if (uwd.check_gpsfix and
+ last_gps_fix_check.tv_sec + gps_fix_check_interval/2.0 <
+ time_now.tv_sec) {
+ last_gps_fix_check = time_now;
+
+ // Alternate between launching thread and checking the
+ // result.
+ if (gps_fix_task.joinable()) {
+ if (gps_fix_future.has_value()) {
+
+ gps_fix_future.wait();
+
+ gps_fix_task.join();
+
+ if (not gps_fix_future.get()) {
+ if (num_checks_without_gps_fix == 0) {
+ etiLog.level(alert) <<
+ "OutputUHD: GPS Fix lost";
+ }
+ num_checks_without_gps_fix++;
+ }
+ else {
+ if (num_checks_without_gps_fix) {
+ etiLog.level(info) <<
+ "OutputUHD: GPS Fix recovered";
+ }
+ num_checks_without_gps_fix = 0;
+ }
+
+ if (gps_fix_check_interval * num_checks_without_gps_fix >
+ myConf.maxGPSHoldoverTime) {
+ std::stringstream ss;
+ ss << "Lost GPS fix for " << gps_fix_check_interval *
+ num_checks_without_gps_fix << " seconds";
+ throw std::runtime_error(ss.str());
+ }
+ }
+ }
+ else {
+ // Checking the sensor here takes too much
+ // time, it has to be done in a separate thread.
+ gps_fix_pt = boost::packaged_task<bool>(
+ boost::bind(check_gps_fix_ok, myUsrp) );
+
+ gps_fix_future = gps_fix_pt.get_future();
+
+ gps_fix_task = boost::thread(boost::move(gps_fix_pt));
+ }
+ }
+}
+
+//============================ UHD Worker ========================
+
+void UHDWorker::process_errhandler()
+{
+ try {
+ process();
+ }
+ catch (fct_discontinuity_error& e) {
+ etiLog.level(warn) << e.what();
+ uwd->failed_due_to_fct = true;
+ }
+
+ uwd->running = false;
+ uwd->sync_barrier.get()->wait();
+ etiLog.level(warn) << "UHD worker terminated";
+}
+
+void UHDWorker::process()
+{
+ int workerbuffer = 0;
+ tx_second = 0;
+ pps_offset = 0.0;
+ last_pps = 2.0;
#if FAKE_UHD == 0
uhd::stream_args_t stream_args("fc32"); //complex floats
- uhd::tx_streamer::sptr myTxStream = uwd->myUsrp->get_tx_stream(stream_args);
- size_t usrp_max_num_samps = myTxStream->get_max_num_samps();
-#else
- size_t usrp_max_num_samps = 2048; // arbitrarily chosen
+ myTxStream = uwd->myUsrp->get_tx_stream(stream_args);
#endif
- const complexf* in;
-
- uhd::tx_metadata_t md;
md.start_of_burst = false;
- md.end_of_burst = false;
+ md.end_of_burst = false;
+
+ expected_next_fct = -1;
- int expected_next_fct = -1;
+ num_underflows = 0;
+ num_late_packets = 0;
- while (running) {
- bool fct_discontinuity = false;
- md.has_time_spec = false;
- md.time_spec = uhd::time_spec_t(0.0);
- num_acc_samps = 0;
- //write_fail_count = 0;
+ while (uwd->running) {
+ fct_discontinuity = false;
+ md.has_time_spec = false;
+ md.time_spec = uhd::time_spec_t(0.0);
/* Wait for barrier */
// this wait will hopefully always be the second one
// because modulation should be quicker than transmission
uwd->sync_barrier.get()->wait();
+ struct UHDWorkerFrameData* frame;
+
if (workerbuffer == 0) {
frame = &(uwd->frame0);
}
@@ -432,241 +639,240 @@ void UHDWorker::process()
"UHDWorker.process: workerbuffer is neither 0 nor 1 !");
}
- in = reinterpret_cast<const complexf*>(frame->buf);
- pps_offset = frame->ts.timestamp_pps_offset;
+ handle_frame(frame);
- // Tx second from MNSC
- tx_second = frame->ts.timestamp_sec;
+ // swap buffers
+ workerbuffer = (workerbuffer + 1) % 2;
+ }
+}
- sizeIn = uwd->bufsize / sizeof(complexf);
+void UHDWorker::handle_frame(const struct UHDWorkerFrameData *frame)
+{
+ // Transmit timeout
+ static const double tx_timeout = 20.0;
- /* Verify that the FCT value is correct. If we miss one transmission
- * frame we must interrupt UHD and resync to the timestamps
- */
- if (expected_next_fct != -1) {
- if (expected_next_fct != (int)frame->ts.fct) {
- uwd->logger->level(warn) <<
- "OutputUHD: Incorrect expect fct " << frame->ts.fct;
+ pps_offset = frame->ts.timestamp_pps_offset;
- fct_discontinuity = true;
- }
- }
+ // Tx second from MNSC
+ tx_second = frame->ts.timestamp_sec;
- expected_next_fct = (frame->ts.fct + uwd->fct_increment) % 250;
+ /* Verify that the FCT value is correct. If we miss one transmission
+ * frame we must interrupt UHD and resync to the timestamps
+ */
+ if (frame->ts.fct == -1) {
+ etiLog.level(info) <<
+ "OutputUHD: dropping one frame with invalid FCT";
+ return;
+ }
+ if (expected_next_fct != -1) {
+ if (expected_next_fct != (int)frame->ts.fct) {
+ etiLog.level(warn) <<
+ "OutputUHD: Incorrect expect fct " << frame->ts.fct <<
+ ", expected " << expected_next_fct;
+
+ fct_discontinuity = true;
+ throw fct_discontinuity_error();
+ }
+ }
- // Check for ref_lock
- if (uwd->check_refclk_loss)
- {
- try {
- // TODO: Is this check specific to the B100 and USRP2 ?
- if (! uwd->myUsrp->get_mboard_sensor("ref_locked", 0).to_bool()) {
- uwd->logger->log(alert,
- "OutputUHD: External reference clock lock lost !");
- if (uwd->refclk_lock_loss_behaviour == CRASH) {
- throw std::runtime_error(
- "OutputUHD: External reference clock lock lost.");
- }
+ expected_next_fct = (frame->ts.fct + uwd->fct_increment) % 250;
+
+ // Check for ref_lock
+ if (uwd->check_refclk_loss) {
+ try {
+ // TODO: Is this check specific to the B100 and USRP2 ?
+ if (! uwd->myUsrp->get_mboard_sensor("ref_locked", 0).to_bool()) {
+ etiLog.log(alert,
+ "OutputUHD: External reference clock lock lost !");
+ if (uwd->refclk_lock_loss_behaviour == CRASH) {
+ throw std::runtime_error(
+ "OutputUHD: External reference clock lock lost.");
}
}
- catch (uhd::lookup_error &e) {
- uwd->check_refclk_loss = false;
- uwd->logger->log(warn,
- "OutputUHD: This USRP does not have mboard sensor for ext clock loss."
- " Check disabled.");
- }
}
+ catch (uhd::lookup_error &e) {
+ uwd->check_refclk_loss = false;
+ etiLog.log(warn,
+ "OutputUHD: This USRP does not have mboard sensor for ext clock loss."
+ " Check disabled.");
+ }
+ }
- usrp_time = uwd->myUsrp->get_time_now().get_real_secs();
-
- if (uwd->sourceContainsTimestamp) {
- if (!frame->ts.timestamp_valid) {
- /* We have not received a full timestamp through
- * MNSC. We sleep through the frame.
- */
- uwd->logger->level(info) <<
- "OutputUHD: Throwing sample " << frame->ts.fct <<
- " away: incomplete timestamp " << tx_second <<
- " + " << pps_offset;
- usleep(20000); //TODO should this be TM-dependant ?
- goto loopend;
- }
-
- md.has_time_spec = true;
- md.time_spec = uhd::time_spec_t(tx_second, pps_offset);
-
- // md is defined, let's do some checks
- if (md.time_spec.get_real_secs() + 0.2 < usrp_time) {
- uwd->logger->level(warn) <<
- "OutputUHD: Timestamp in the past! offset: " <<
- md.time_spec.get_real_secs() - usrp_time <<
- " (" << usrp_time << ")"
- " frame " << frame->ts.fct <<
- ", tx_second " << tx_second <<
- ", pps " << pps_offset;
- goto loopend; //skip the frame
- }
+ double usrp_time = uwd->myUsrp->get_time_now().get_real_secs();
- if (md.time_spec.get_real_secs() > usrp_time + TIMESTAMP_MARGIN_FUTURE) {
- uwd->logger->level(warn) <<
- "OutputUHD: Timestamp too far in the future! offset: " <<
- md.time_spec.get_real_secs() - usrp_time;
- usleep(20000); //sleep so as to fill buffers
- }
- if (md.time_spec.get_real_secs() > usrp_time + TIMESTAMP_ABORT_FUTURE) {
- uwd->logger->level(error) <<
- "OutputUHD: Timestamp way too far in the future! offset: " <<
- md.time_spec.get_real_secs() - usrp_time;
- throw std::runtime_error("Timestamp error. Aborted.");
- }
+ if (uwd->sourceContainsTimestamp) {
+ if (!frame->ts.timestamp_valid) {
+ /* We have not received a full timestamp through
+ * MNSC. We sleep through the frame.
+ */
+ etiLog.level(info) <<
+ "OutputUHD: Throwing sample " << frame->ts.fct <<
+ " away: incomplete timestamp " << tx_second <<
+ " + " << pps_offset;
+ usleep(20000); //TODO should this be TM-dependant ?
+ return;
+ }
- if (last_pps > pps_offset) {
- uwd->logger->log(info,
- "OutputUHD (usrp time: %f): frame %d;"
- " tx_second %zu; pps %.9f\n",
- usrp_time,
- frame->ts.fct, tx_second, pps_offset);
- }
+ md.has_time_spec = true;
+ md.time_spec = uhd::time_spec_t(tx_second, pps_offset);
+
+ // md is defined, let's do some checks
+ if (md.time_spec.get_real_secs() + tx_timeout < usrp_time) {
+ etiLog.level(warn) <<
+ "OutputUHD: Timestamp in the past! offset: " <<
+ md.time_spec.get_real_secs() - usrp_time <<
+ " (" << usrp_time << ")"
+ " frame " << frame->ts.fct <<
+ ", tx_second " << tx_second <<
+ ", pps " << pps_offset;
+ return;
+ }
+ if (md.time_spec.get_real_secs() > usrp_time + TIMESTAMP_ABORT_FUTURE) {
+ etiLog.level(error) <<
+ "OutputUHD: Timestamp way too far in the future! offset: " <<
+ md.time_spec.get_real_secs() - usrp_time;
+ throw std::runtime_error("Timestamp error. Aborted.");
}
- else { // !uwd->sourceContainsTimestamp
- if (uwd->muting || uwd->muteNoTimestamps) {
- /* There was some error decoding the timestamp
- */
- if (uwd->muting) {
- uwd->logger->log(info,
- "OutputUHD: Muting sample %d requested\n",
- frame->ts.fct);
- }
- else {
- uwd->logger->log(info,
- "OutputUHD: Muting sample %d : no timestamp\n",
- frame->ts.fct);
- }
- usleep(20000);
- goto loopend;
+ }
+ else { // !uwd->sourceContainsTimestamp
+ if (uwd->muting || uwd->muteNoTimestamps) {
+ /* There was some error decoding the timestamp
+ */
+ if (uwd->muting) {
+ etiLog.log(info,
+ "OutputUHD: Muting sample %d requested\n",
+ frame->ts.fct);
}
+ else {
+ etiLog.log(info,
+ "OutputUHD: Muting sample %d : no timestamp\n",
+ frame->ts.fct);
+ }
+ usleep(20000);
+ return;
}
+ }
- PDEBUG("UHDWorker::process:max_num_samps: %zu.\n",
- usrp_max_num_samps);
+ tx_frame(frame);
- while (running && !uwd->muting && (num_acc_samps < sizeIn)) {
- size_t samps_to_send = std::min(sizeIn - num_acc_samps, usrp_max_num_samps);
+ if (last_pps > pps_offset) {
+ if (num_underflows or num_late_packets) {
+ etiLog.log(info,
+ "OutputUHD status (usrp time: %f): "
+ "%d underruns and %d late packets since last status.\n",
+ usrp_time,
+ num_underflows, num_late_packets);
+ }
+ num_underflows = 0;
+ num_late_packets = 0;
+ }
- //ensure the the last packet has EOB set if the timestamps has been
- //refreshed and need to be reconsidered.
- //Also, if we saw that the FCT did not increment as expected, which
- //could be due to a lost incoming packet.
- md.end_of_burst = (
- uwd->sourceContainsTimestamp &&
- (frame->ts.timestamp_refresh || fct_discontinuity) &&
- samps_to_send <= usrp_max_num_samps );
+ last_pps = pps_offset;
+}
+void UHDWorker::tx_frame(const struct UHDWorkerFrameData *frame)
+{
+ const double tx_timeout = 20.0;
+ const size_t sizeIn = uwd->bufsize / sizeof(complexf);
+ const complexf* in_data = reinterpret_cast<const complexf*>(frame->buf);
-#if FAKE_UHD
- // This is probably very approximate
- usleep( (1000000 / uwd->sampleRate) * samps_to_send);
- size_t num_tx_samps = samps_to_send;
+#if FAKE_UHD == 0
+ size_t usrp_max_num_samps = myTxStream->get_max_num_samps();
#else
- //send a single packet
- size_t num_tx_samps = myTxStream->send(
- &in[num_acc_samps],
- samps_to_send, md, timeout);
+ size_t usrp_max_num_samps = 2048; // arbitrarily chosen
#endif
- num_acc_samps += num_tx_samps;
+ size_t num_acc_samps = 0; //number of accumulated samples
+ while (uwd->running && !uwd->muting && (num_acc_samps < sizeIn)) {
+ size_t samps_to_send = std::min(sizeIn - num_acc_samps, usrp_max_num_samps);
- md.time_spec = uhd::time_spec_t(tx_second, pps_offset)
- + uhd::time_spec_t(0, num_acc_samps/uwd->sampleRate);
+ //ensure the the last packet has EOB set if the timestamps has been
+ //refreshed and need to be reconsidered.
+ //Also, if we saw that the FCT did not increment as expected, which
+ //could be due to a lost incoming packet.
+ md.end_of_burst = (
+ uwd->sourceContainsTimestamp &&
+ (frame->ts.timestamp_refresh || fct_discontinuity) &&
+ samps_to_send <= usrp_max_num_samps );
- /*
- fprintf(stderr, "*** pps_offset %f, md.time_spec %f, usrp->now %f\n",
- pps_offset,
- md.time_spec.get_real_secs(),
- uwd->myUsrp->get_time_now().get_real_secs());
- // */
-
- if (num_tx_samps == 0) {
-#if 1
- uwd->logger->log(warn,
- "UHDWorker::process() unable to write to device, skipping frame!\n");
- break;
+#if FAKE_UHD
+ // This is probably very approximate
+ usleep( (1000000 / uwd->sampleRate) * samps_to_send);
+ size_t num_tx_samps = samps_to_send;
#else
- // This has been disabled, because if there is a write failure,
- // we'd better not insist and try to go on transmitting future
- // frames.
- // The goal is not to try to send by all means possible. It's
- // more important to make sure the SFN is not disturbed.
-
- fprintf(stderr, "F");
- nanosleep(&hundred_nano, NULL);
- write_fail_count++;
- if (write_fail_count >= 3) {
- double ts = md.time_spec.get_real_secs();
- double t_usrp = uwd->myUsrp->get_time_now().get_real_secs();
-
- fprintf(stderr, "*** USRP write fail count %d\n", write_fail_count);
- fprintf(stderr, "*** delta %f, md.time_spec %f, usrp->now %f\n",
- ts - t_usrp,
- ts, t_usrp);
-
- fprintf(stderr, "UHDWorker::process() unable to write to device, skipping frame!\n");
- break;
- }
+ //send a single packet
+ size_t num_tx_samps = myTxStream->send(
+ &in_data[num_acc_samps],
+ samps_to_send, md, tx_timeout);
#endif
- }
-#if FAKE_UHD == 0
- uhd::async_metadata_t async_md;
- if (uwd->myUsrp->get_device()->recv_async_msg(async_md, 0)) {
- const char* uhd_async_message = "";
- bool failure = true;
- switch (async_md.event_code) {
- case uhd::async_metadata_t::EVENT_CODE_BURST_ACK:
- failure = false;
- break;
- case uhd::async_metadata_t::EVENT_CODE_UNDERFLOW:
- uhd_async_message = "Underflow";
- break;
- case uhd::async_metadata_t::EVENT_CODE_SEQ_ERROR:
- uhd_async_message = "Packet loss between host and device.";
- break;
- case uhd::async_metadata_t::EVENT_CODE_TIME_ERROR:
- uhd_async_message = "Packet had time that was late.";
- break;
- case uhd::async_metadata_t::EVENT_CODE_UNDERFLOW_IN_PACKET:
- uhd_async_message = "Underflow occurred inside a packet.";
- break;
- case uhd::async_metadata_t::EVENT_CODE_SEQ_ERROR_IN_BURST:
- uhd_async_message = "Packet loss within a burst.";
- break;
- default:
- uhd_async_message = "unknown event code";
- break;
- }
+ num_acc_samps += num_tx_samps;
- if (failure) {
- uwd->logger->level(alert) << "Near frame " <<
- frame->ts.fct << ": Received Async UHD Message '" <<
- uhd_async_message << "'";
+ md.time_spec = uhd::time_spec_t(tx_second, pps_offset)
+ + uhd::time_spec_t(0, num_acc_samps/uwd->sampleRate);
- }
- }
-#endif
+ if (num_tx_samps == 0) {
+ etiLog.log(warn,
+ "UHDWorker::process() unable to write to device, skipping frame!\n");
+ break;
}
- last_pps = pps_offset;
-
-loopend:
- // swap buffers
- workerbuffer = (workerbuffer + 1) % 2;
+ print_async_metadata(frame);
}
+}
- uwd->logger->level(warn) << "UHD worker terminated";
+void UHDWorker::print_async_metadata(const struct UHDWorkerFrameData *frame)
+{
+#if FAKE_UHD == 0
+ uhd::async_metadata_t async_md;
+ if (uwd->myUsrp->get_device()->recv_async_msg(async_md, 0)) {
+ const char* uhd_async_message = "";
+ bool failure = false;
+ switch (async_md.event_code) {
+ case uhd::async_metadata_t::EVENT_CODE_BURST_ACK:
+ break;
+ case uhd::async_metadata_t::EVENT_CODE_UNDERFLOW:
+ uhd_async_message = "Underflow";
+ num_underflows++;
+ break;
+ case uhd::async_metadata_t::EVENT_CODE_SEQ_ERROR:
+ uhd_async_message = "Packet loss between host and device.";
+ failure = true;
+ break;
+ case uhd::async_metadata_t::EVENT_CODE_TIME_ERROR:
+ uhd_async_message = "Packet had time that was late.";
+ num_late_packets++;
+ break;
+ case uhd::async_metadata_t::EVENT_CODE_UNDERFLOW_IN_PACKET:
+ uhd_async_message = "Underflow occurred inside a packet.";
+ failure = true;
+ break;
+ case uhd::async_metadata_t::EVENT_CODE_SEQ_ERROR_IN_BURST:
+ uhd_async_message = "Packet loss within a burst.";
+ failure = true;
+ break;
+ default:
+ uhd_async_message = "unknown event code";
+ failure = true;
+ break;
+ }
+
+ if (failure) {
+ etiLog.level(alert) << "Near frame " <<
+ frame->ts.fct << ": Received Async UHD Message '" <<
+ uhd_async_message << "'";
+
+ }
+ }
+#endif
}
+// =======================================
+// Remote Control for UHD
+// =======================================
void OutputUHD::set_parameter(const string& parameter, const string& value)
{
@@ -688,26 +894,21 @@ void OutputUHD::set_parameter(const string& parameter, const string& value)
else if (parameter == "staticdelay") {
int64_t adjust;
ss >> adjust;
- if (adjust > (myTFDurationMs * 1000))
- { // reset static delay for values outside range
- myStaticDelayUs = 0;
- }
- else
- { // the new adjust value is added to the existing delay and the result
- // is wrapped around at TF duration
- int newStaticDelayUs = myStaticDelayUs + adjust;
- if (newStaticDelayUs > (myTFDurationMs * 1000))
- myStaticDelayUs = newStaticDelayUs - (myTFDurationMs * 1000);
- else if (newStaticDelayUs < 0)
- myStaticDelayUs = newStaticDelayUs + (myTFDurationMs * 1000);
- else
- myStaticDelayUs = newStaticDelayUs;
- }
- }
- else if (parameter == "iqbalance") {
- ss >> myConf.frequency;
- myUsrp->set_tx_freq(myConf.frequency);
- myConf.frequency = myUsrp->get_tx_freq();
+ if (adjust > (myTFDurationMs * 1000))
+ { // reset static delay for values outside range
+ myStaticDelayUs = 0;
+ }
+ else
+ { // the new adjust value is added to the existing delay and the result
+ // is wrapped around at TF duration
+ int newStaticDelayUs = myStaticDelayUs + adjust;
+ if (newStaticDelayUs > (myTFDurationMs * 1000))
+ myStaticDelayUs = newStaticDelayUs - (myTFDurationMs * 1000);
+ else if (newStaticDelayUs < 0)
+ myStaticDelayUs = newStaticDelayUs + (myTFDurationMs * 1000);
+ else
+ myStaticDelayUs = newStaticDelayUs;
+ }
}
else {
stringstream ss;
@@ -741,3 +942,4 @@ const string OutputUHD::get_parameter(const string& parameter) const
}
#endif // HAVE_OUTPUT_UHD
+
diff --git a/src/OutputUHD.h b/src/OutputUHD.h
index d002e98..633de04 100644
--- a/src/OutputUHD.h
+++ b/src/OutputUHD.h
@@ -2,7 +2,7 @@
Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Her Majesty the
Queen in Right of Canada (Communications Research Center Canada)
- Copyright (C) 2014
+ Copyright (C) 2014, 2015
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
@@ -50,6 +50,7 @@ DESCRIPTION:
#include <boost/thread/thread.hpp>
#include <boost/thread/barrier.hpp>
#include <boost/shared_ptr.hpp>
+#include <boost/atomic.hpp>
#include <list>
#include <string>
@@ -83,9 +84,20 @@ struct UHDWorkerFrameData {
struct frame_timestamp ts;
};
+struct fct_discontinuity_error : public std::exception
+{
+ const char* what () const throw ()
+ {
+ return "FCT discontinuity detected";
+ }
+};
+
enum refclk_lock_loss_behaviour_t { CRASH, IGNORE };
struct UHDWorkerData {
+ boost::atomic<bool> running;
+ bool failed_due_to_fct;
+
#if FAKE_UHD == 0
uhd::usrp::multi_usrp::sptr myUsrp;
#endif
@@ -109,6 +121,9 @@ struct UHDWorkerData {
// If we want to verify loss of refclk
bool check_refclk_loss;
+ // If we want to check for the gps_fixtype sensor
+ bool check_gpsfix;
+
// muting set by remote control
bool muting;
@@ -118,9 +133,6 @@ struct UHDWorkerData {
// What to do when the reference clock PLL loses lock
refclk_lock_loss_behaviour_t refclk_lock_loss_behaviour;
- // The common logger
- Logger* logger;
-
// What transmission mode we're using defines by how
// much the FCT should increment for each
// transmission frame.
@@ -130,31 +142,44 @@ struct UHDWorkerData {
class UHDWorker {
public:
- UHDWorker () {
- running = false;
- }
-
void start(struct UHDWorkerData *uhdworkerdata) {
- running = true;
uwd = uhdworkerdata;
- uhd_thread = boost::thread(&UHDWorker::process, this);
+
+ uwd->running = true;
+ uwd->failed_due_to_fct = false;
+ uhd_thread = boost::thread(&UHDWorker::process_errhandler, this);
}
void stop() {
- running = false;
+ uwd->running = false;
uhd_thread.interrupt();
uhd_thread.join();
}
- void process();
+ private:
+ // Asynchronous message statistics
+ int num_underflows;
+ int num_late_packets;
+ bool fct_discontinuity;
+ int expected_next_fct;
+ uhd::tx_metadata_t md;
+ time_t tx_second;
+ double pps_offset;
+ double last_pps;
+
+ void print_async_metadata(const struct UHDWorkerFrameData *frame);
+
+ void handle_frame(const struct UHDWorkerFrameData *frame);
+ void tx_frame(const struct UHDWorkerFrameData *frame);
- private:
struct UHDWorkerData *uwd;
- bool running;
boost::thread uhd_thread;
uhd::tx_streamer::sptr myTxStream;
+
+ void process();
+ void process_errhandler();
};
/* This structure is used as initial configuration for OutputUHD */
@@ -171,7 +196,8 @@ struct OutputUHDConfig {
double txgain;
bool enableSync;
bool muteNoTimestamps;
- unsigned dabMode;
+ unsigned dabMode;
+ unsigned maxGPSHoldoverTime;
/* allowed values : auto, int, sma, mimo */
std::string refclk_src;
@@ -190,9 +216,7 @@ struct OutputUHDConfig {
class OutputUHD: public ModOutput, public RemoteControllable {
public:
- OutputUHD(
- OutputUHDConfig& config,
- Logger& logger);
+ OutputUHD(const OutputUHDConfig& config);
~OutputUHD();
int process(Buffer* dataIn, Buffer* dataOut);
@@ -218,13 +242,16 @@ class OutputUHD: public ModOutput, public RemoteControllable {
protected:
- Logger& myLogger;
+ OutputUHD(const OutputUHD& other);
+ OutputUHD& operator=(const OutputUHD& other);
+
EtiReader *myEtiReader;
OutputUHDConfig myConf;
uhd::usrp::multi_usrp::sptr myUsrp;
boost::shared_ptr<boost::barrier> mySyncBarrier;
UHDWorker worker;
bool first_run;
+ bool gps_fix_verified;
struct UHDWorkerData uwd;
int activebuffer;
@@ -232,14 +259,36 @@ class OutputUHD: public ModOutput, public RemoteControllable {
bool myMuting;
private:
- // methods
- void SetDelayBuffer(unsigned int dabMode);
+ // Resize the internal delay buffer according to the dabMode and
+ // the sample rate.
+ void SetDelayBuffer(unsigned int dabMode);
// data
int myStaticDelayUs; // static delay in microseconds
- int myTFDurationMs; // TF duration in milliseconds
+ int myTFDurationMs; // TF duration in milliseconds
std::vector<complexf> myDelayBuf;
size_t lastLen;
+
+ // GPS Fix check variables
+ int num_checks_without_gps_fix;
+ struct timespec first_gps_fix_check;
+ struct timespec last_gps_fix_check;
+ struct timespec time_last_frame;
+ boost::packaged_task<bool> gps_fix_pt;
+ boost::unique_future<bool> gps_fix_future;
+ boost::thread gps_fix_task;
+
+ // Wait time in seconds to get fix
+ static const int initial_gps_fix_wait = 60;
+
+ // Interval for checking the GPS at runtime
+ static const double gps_fix_check_interval = 10.0; // seconds
+
+ void check_gps();
+
+ void set_usrp_time();
+
+ void initial_gps_check();
};
#endif // HAVE_OUTPUT_UHD
diff --git a/src/OutputZeroMQ.cpp b/src/OutputZeroMQ.cpp
index 793e473..da4473e 100644
--- a/src/OutputZeroMQ.cpp
+++ b/src/OutputZeroMQ.cpp
@@ -32,19 +32,31 @@
#if defined(HAVE_ZEROMQ)
-OutputZeroMQ::OutputZeroMQ(std::string endpoint, Buffer* dataOut)
+OutputZeroMQ::OutputZeroMQ(std::string endpoint, int type, Buffer* dataOut)
: ModOutput(ModFormat(1), ModFormat(0)),
+ m_type(type),
m_zmq_context(1),
- m_zmq_pub_sock(m_zmq_context, ZMQ_PUB),
+ m_zmq_sock(m_zmq_context, type),
m_endpoint(endpoint)
{
PDEBUG("OutputZeroMQ::OutputZeroMQ(%p) @ %p\n", dataOut, this);
std::stringstream ss;
- ss << "OutputZeroMQ(" << m_endpoint << ")";
+ ss << "OutputZeroMQ(" << m_endpoint << " ";
+
+ if (type == ZMQ_PUB) {
+ ss << "ZMQ_PUB";
+ }
+ else if (type == ZMQ_REP) {
+ ss << "ZMQ_REP";
+ }
+ else {
+ throw std::invalid_argument("ZMQ socket type unknown");
+ }
+ ss << ")";
m_name = ss.str();
- m_zmq_pub_sock.bind(m_endpoint.c_str());
+ m_zmq_sock.bind(m_endpoint.c_str());
}
OutputZeroMQ::~OutputZeroMQ()
@@ -58,7 +70,13 @@ int OutputZeroMQ::process(Buffer* dataIn, Buffer* dataOut)
"(dataIn: %p, dataOut: %p)\n",
dataIn, dataOut);
- m_zmq_pub_sock.send(dataIn->getData(), dataIn->getLength());
+ if (m_type == ZMQ_REP) {
+ // A ZMQ_REP socket requires a request first
+ zmq::message_t msg;
+ m_zmq_sock.recv(&msg);
+ }
+
+ m_zmq_sock.send(dataIn->getData(), dataIn->getLength());
return dataIn->getLength();
}
diff --git a/src/OutputZeroMQ.h b/src/OutputZeroMQ.h
index a80eab4..85f85a7 100644
--- a/src/OutputZeroMQ.h
+++ b/src/OutputZeroMQ.h
@@ -39,14 +39,15 @@
class OutputZeroMQ : public ModOutput
{
public:
- OutputZeroMQ(std::string endpoint, Buffer* dataOut = NULL);
+ OutputZeroMQ(std::string endpoint, int type, Buffer* dataOut = NULL);
virtual ~OutputZeroMQ();
virtual int process(Buffer* dataIn, Buffer* dataOut);
const char* name() { return m_name.c_str(); }
protected:
+ int m_type; // zmq socket type
zmq::context_t m_zmq_context; // handle for the zmq context
- zmq::socket_t m_zmq_pub_sock; // handle for the zmq publisher socket
+ zmq::socket_t m_zmq_sock; // handle for the zmq publisher socket
std::string m_endpoint; // On which port to listen: e.g.
// tcp://*:58300
diff --git a/src/RemoteControl.cpp b/src/RemoteControl.cpp
index 7a2af00..21a6c81 100644
--- a/src/RemoteControl.cpp
+++ b/src/RemoteControl.cpp
@@ -58,8 +58,10 @@ void RemoteControllerTelnet::restart_thread(long)
void RemoteControllerTelnet::process(long)
{
- m_welcome = "ODR-DabMod Remote Control CLI\nWrite 'help' for help.\n**********\n";
- m_prompt = "> ";
+ std::string m_welcome = "ODR-DabMod Remote Control CLI\n"
+ "Write 'help' for help.\n"
+ "**********\n";
+ std::string m_prompt = "> ";
std::string in_message;
size_t length;
diff --git a/src/RemoteControl.h b/src/RemoteControl.h
index 89a1583..1b5e447 100644
--- a/src/RemoteControl.h
+++ b/src/RemoteControl.h
@@ -34,7 +34,7 @@
#endif
#if defined(HAVE_ZEROMQ)
-#include <zmq.hpp>
+#include "zmq.hpp"
#endif
#include <list>
@@ -50,6 +50,7 @@
#include <boost/thread.hpp>
#include <stdexcept>
+#include "Log.h"
#define RC_ADD_PARAMETER(p, desc) { \
std::vector<std::string> p; \
@@ -114,8 +115,8 @@ class RemoteControllers {
it != m_controllers.end(); ++it) {
if ((*it)->fault_detected())
{
- fprintf(stderr,
- "Detected Remote Control fault, restarting it\n");
+ etiLog.level(warn) <<
+ "Detected Remote Control fault, restarting it";
(*it)->restart();
}
}
@@ -289,9 +290,6 @@ class RemoteControllerTelnet : public BaseRemoteController {
/* This controller commands the controllables in the cohort */
std::list<RemoteControllable*> m_cohort;
- std::string m_welcome;
- std::string m_prompt;
-
int m_port;
};
diff --git a/src/ThreadsafeQueue.h b/src/ThreadsafeQueue.h
index 78e9ef0..e5e83ef 100644
--- a/src/ThreadsafeQueue.h
+++ b/src/ThreadsafeQueue.h
@@ -38,25 +38,14 @@
* that pushes elements into the queue, and one consumer that
* retrieves the elements.
*
- * The queue can make the consumer block until enough elements
- * are available.
+ * The queue can make the consumer block until an element
+ * is available.
*/
template<typename T>
class ThreadsafeQueue
{
public:
- /* Create a new queue without any minimum required
- * fill before it is possible to pop an element
- */
- ThreadsafeQueue() : the_required_size(1) {}
-
- /* Create a queue where it has to contain at least
- * required_size elements before pop is possible
- */
- ThreadsafeQueue(size_t required_size) : the_required_size(required_size) {
- }
-
/* Push one element into the queue, and notify another thread that
* might be waiting.
*
@@ -87,14 +76,14 @@ public:
size_t size() const
{
+ boost::mutex::scoped_lock lock(the_mutex);
return the_queue.size();
}
bool try_pop(T& popped_value)
{
boost::mutex::scoped_lock lock(the_mutex);
- if(the_queue.size() < the_required_size)
- {
+ if (the_queue.empty()) {
return false;
}
@@ -103,10 +92,10 @@ public:
return true;
}
- void wait_and_pop(T& popped_value)
+ void wait_and_pop(T& popped_value, size_t prebuffering = 1)
{
boost::mutex::scoped_lock lock(the_mutex);
- while(the_queue.size() < the_required_size) {
+ while (the_queue.size() < prebuffering) {
the_condition_variable.wait(lock);
}
@@ -118,7 +107,6 @@ private:
std::queue<T> the_queue;
mutable boost::mutex the_mutex;
boost::condition_variable the_condition_variable;
- size_t the_required_size;
};
#endif
diff --git a/src/TimestampDecoder.cpp b/src/TimestampDecoder.cpp
index 96c84c0..5044366 100644
--- a/src/TimestampDecoder.cpp
+++ b/src/TimestampDecoder.cpp
@@ -2,7 +2,7 @@
Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Her Majesty the
Queen in Right of Canada (Communications Research Center Canada)
- Copyright (C) 2014
+ Copyright (C) 2014, 2015
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
@@ -29,6 +29,7 @@
#include <fstream>
#include <string>
#include <boost/lexical_cast.hpp>
+#include <boost/make_shared.hpp>
#include <sys/types.h>
#include "PcDebug.h"
#include "TimestampDecoder.h"
@@ -41,7 +42,8 @@
void TimestampDecoder::calculateTimestamp(struct frame_timestamp& ts)
{
- struct frame_timestamp* ts_queued = new struct frame_timestamp;
+ boost::shared_ptr<struct frame_timestamp> ts_queued =
+ boost::make_shared<struct frame_timestamp>();
/* Push new timestamp into queue */
ts_queued->timestamp_valid = full_timestamp_received_mnsc;
@@ -62,14 +64,14 @@ void TimestampDecoder::calculateTimestamp(struct frame_timestamp& ts)
*
* Therefore, use <= and not < for comparison
*/
- if (queue_timestamps.size() <= modconfig.delay_calculation_pipeline_stages) {
- //fprintf(stderr, "* %zu %u ", queue_timestamps.size(), modconfig.delay_calculation_pipeline_stages);
+ if (queue_timestamps.size() <= m_tist_delay_stages) {
+ //fprintf(stderr, "* %zu %u ", queue_timestamps.size(), m_tist_delay_stages);
/* Return invalid timestamp until the queue is full */
ts.timestamp_valid = false;
ts.timestamp_sec = 0;
ts.timestamp_pps_offset = 0;
ts.timestamp_refresh = false;
- ts.fct = 0;
+ ts.fct = -1;
}
else {
//fprintf(stderr, ". %zu ", queue_timestamps.size());
@@ -87,16 +89,14 @@ void TimestampDecoder::calculateTimestamp(struct frame_timestamp& ts)
ts.timestamp_sec,
ts.timestamp_pps_offset,
ts.timestamp_refresh);*/
-
- delete ts_queued;
}
MDEBUG("Timestamp queue size %zu, delay_calc %u\n",
queue_timestamps.size(),
- modconfig.delay_calculation_pipeline_stages);
+ m_tist_delay_stages);
- if (queue_timestamps.size() > modconfig.delay_calculation_pipeline_stages) {
- myLogger.level(error) << "Error: Timestamp queue is too large : size " <<
+ if (queue_timestamps.size() > m_tist_delay_stages) {
+ etiLog.level(error) << "Error: Timestamp queue is too large : size " <<
queue_timestamps.size() << "! This should not happen !";
}
@@ -191,84 +191,48 @@ void TimestampDecoder::updateTimestampEti(
int framephase,
uint16_t mnsc,
double pps,
- uint32_t fct)
+ int32_t fct)
{
updateTimestampPPS(pps);
pushMNSCData(framephase, mnsc);
latestFCT = fct;
}
-
-bool TimestampDecoder::updateModulatorOffset()
+void TimestampDecoder::set_parameter(
+ const std::string& parameter,
+ const std::string& value)
{
using namespace std;
- using boost::lexical_cast;
- using boost::bad_lexical_cast;
- if (modconfig.use_offset_fixed)
- {
- timestamp_offset = modconfig.offset_fixed;
- return true;
- }
- else if (modconfig.use_offset_file)
- {
- bool r = false;
- double newoffset;
+ stringstream ss(value);
+ ss.exceptions ( stringstream::failbit | stringstream::badbit );
- std::string filedata;
- ifstream filestream;
-
- try
- {
- filestream.open(modconfig.offset_filename.c_str());
- if (!filestream.eof())
- {
- getline(filestream, filedata);
- try
- {
- newoffset = lexical_cast<double>(filedata);
- r = true;
- }
- catch (bad_lexical_cast& e)
- {
- myLogger.level(error) <<
- "Error parsing timestamp offset from file '" <<
- modconfig.offset_filename << "'";
- r = false;
- }
- }
- else
- {
- myLogger.level(error) <<
- "Error reading from timestamp offset file: eof reached\n";
- r = false;
- }
- filestream.close();
- }
- catch (exception& e)
- {
- myLogger.level(error) << "Error opening timestamp offset file\n";
- r = false;
- }
-
-
- if (r)
- {
- if (timestamp_offset != newoffset)
- {
- timestamp_offset = newoffset;
- myLogger.level(info) <<
- "TimestampDecoder::updateTimestampOffset: new offset is " <<
- timestamp_offset;
- offset_changed = true;
- }
+ if (parameter == "offset") {
+ ss >> timestamp_offset;
+ offset_changed = true;
+ }
+ else {
+ stringstream ss;
+ ss << "Parameter '" << parameter
+ << "' is not exported by controllable " << get_rc_name();
+ throw ParameterError(ss.str());
+ }
+}
- }
+const std::string TimestampDecoder::get_parameter(
+ const std::string& parameter) const
+{
+ using namespace std;
- return r;
+ stringstream ss;
+ if (parameter == "offset") {
+ ss << timestamp_offset;
}
else {
- return false;
+ ss << "Parameter '" << parameter <<
+ "' is not exported by controllable " << get_rc_name();
+ throw ParameterError(ss.str());
}
+ return ss.str();
}
diff --git a/src/TimestampDecoder.h b/src/TimestampDecoder.h
index 0c393e4..d8ab633 100644
--- a/src/TimestampDecoder.h
+++ b/src/TimestampDecoder.h
@@ -2,7 +2,7 @@
Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Her Majesty the
Queen in Right of Canada (Communications Research Center Canada)
- Copyright (C) 2014
+ Copyright (C) 2014, 2015
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
@@ -28,34 +28,19 @@
#define TIMESTAMP_DECODER_H
#include <queue>
+#include <boost/shared_ptr.hpp>
#include <string>
#include <time.h>
#include <math.h>
#include <stdio.h>
#include "Eti.h"
#include "Log.h"
-
-struct modulator_offset_config
-{
- bool use_offset_fixed;
- double offset_fixed;
- /* These two fields are used when the modulator is run with a fixed offset */
-
- bool use_offset_file;
- std::string offset_filename;
- /* These two fields are used when the modulator reads the offset from a file */
-
- unsigned delay_calculation_pipeline_stages;
- /* Specifies by how many stages the timestamp must be delayed.
- * (e.g. The FIRFilter is pipelined, therefore we must increase
- * delay_calculation_pipeline_stages by one if the filter is used
- */
-};
+#include "RemoteControl.h"
struct frame_timestamp
{
// Which frame count does this timestamp apply to
- uint32_t fct;
+ int32_t fct;
uint32_t timestamp_sec;
double timestamp_pps_offset;
@@ -101,21 +86,32 @@ struct frame_timestamp
void print(const char* t)
{
fprintf(stderr,
- "%s <struct frame_timestamp(%s, %d, %.9f)>\n",
+ "%s <struct frame_timestamp(%s, %d, %.9f, %d)>\n",
t, this->timestamp_valid ? "valid" : "invalid",
- this->timestamp_sec, this->timestamp_pps_offset);
+ this->timestamp_sec, this->timestamp_pps_offset,
+ this->fct);
}
};
/* This module decodes MNSC time information */
-class TimestampDecoder
+class TimestampDecoder : public RemoteControllable
{
public:
TimestampDecoder(
- struct modulator_offset_config& config,
- Logger& logger):
- myLogger(logger), modconfig(config)
+ /* The modulator adds this offset to the TIST to define time of
+ * frame transmission
+ */
+ double offset_s,
+
+ /* Specifies by how many stages the timestamp must be delayed.
+ * (e.g. The FIRFilter is pipelined, therefore we must increase
+ * tist_delay_stages by one if the filter is used
+ */
+ unsigned tist_delay_stages) :
+ RemoteControllable("tist")
{
+ timestamp_offset = offset_s;
+ m_tist_delay_stages = tist_delay_stages;
inhibit_second_update = 0;
time_pps = 0.0;
time_secs = 0;
@@ -125,10 +121,10 @@ class TimestampDecoder
gmtime_r(0, &temp_time);
offset_changed = false;
- myLogger.level(info) << "Setting up timestamp decoder with " <<
- (modconfig.use_offset_fixed ? "fixed" :
- (modconfig.use_offset_file ? "dynamic" : "none")) <<
- " offset";
+ RC_ADD_PARAMETER(offset, "TIST offset [s]");
+
+ etiLog.level(info) << "Setting up timestamp decoder with " <<
+ timestamp_offset << " offset";
};
@@ -140,16 +136,25 @@ class TimestampDecoder
int framephase,
uint16_t mnsc,
double pps,
- uint32_t fct);
+ int32_t fct);
- /* Update the modulator timestamp offset according to the modconf
+ /*********** REMOTE CONTROL ***************/
+ /* virtual void enrol_at(BaseRemoteController& controller)
+ * is inherited
*/
- bool updateModulatorOffset();
- protected:
- /* Main program logger */
- Logger& myLogger;
+ /* Base function to set parameters. */
+ virtual void set_parameter(const std::string& parameter,
+ const std::string& value);
+
+ /* Getting a parameter always returns a string. */
+ virtual const std::string get_parameter(
+ const std::string& parameter) const;
+ const char* name() { return "TS"; }
+
+
+ protected:
/* Push a new MNSC field into the decoder */
void pushMNSCData(int framephase, uint16_t mnsc);
@@ -167,15 +172,13 @@ class TimestampDecoder
struct tm temp_time;
uint32_t time_secs;
- uint32_t latestFCT;
+ int32_t latestFCT;
double time_pps;
double timestamp_offset;
+ unsigned m_tist_delay_stages;
int inhibit_second_update;
bool offset_changed;
- /* configuration for the offset management */
- struct modulator_offset_config& modconfig;
-
/* When the type or identifier don't match, the decoder must
* be disabled
*/
@@ -189,8 +192,9 @@ class TimestampDecoder
* synchronise two modulators if only one uses (for instance) the
* FIRFilter (1 stage pipeline)
*/
- std::queue<struct frame_timestamp*> queue_timestamps;
+ std::queue<boost::shared_ptr<struct frame_timestamp> > queue_timestamps;
};
#endif
+
diff --git a/src/Utils.cpp b/src/Utils.cpp
new file mode 100644
index 0000000..6c9b0fc
--- /dev/null
+++ b/src/Utils.cpp
@@ -0,0 +1,118 @@
+/*
+ Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012
+ Her Majesty the Queen in Right of Canada (Communications Research
+ Center Canada)
+
+ Copyright (C) 2015
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://opendigitalradio.org
+ */
+/*
+ This file is part of ODR-DabMod.
+
+ ODR-DabMod is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as
+ published by the Free Software Foundation, either version 3 of the
+ License, or (at your option) any later version.
+
+ ODR-DabMod is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with ODR-DabMod. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "Utils.h"
+#include "GainControl.h"
+
+void printUsage(char* progName)
+{
+ FILE* out = stderr;
+
+ fprintf(out, "Welcome to %s %s, compiled at %s, %s\n\n",
+ PACKAGE,
+#if defined(GITVERSION)
+ GITVERSION,
+#else
+ VERSION,
+#endif
+ __DATE__, __TIME__);
+ fprintf(out, "Usage with configuration file:\n");
+ fprintf(out, "\t%s [-C] config_file.ini\n\n", progName);
+
+ fprintf(out, "Usage with command line options:\n");
+ fprintf(out, "\t%s"
+ " input"
+ " (-f filename | -u uhddevice -F frequency) "
+ " [-G txgain]"
+ " [-o offset]"
+ " [-T filter_taps_file]"
+ " [-a gain]"
+ " [-c clockrate]"
+ " [-g gainMode]"
+ " [-h]"
+ " [-l]"
+ " [-m dabMode]"
+ " [-r samplingRate]"
+ "\n", progName);
+ fprintf(out, "Where:\n");
+ fprintf(out, "input: ETI input filename (default: stdin).\n");
+ fprintf(out, "-f name: Use file output with given filename. (use /dev/stdout for standard output)\n");
+ fprintf(out, "-u device: Use UHD output with given device string. (use "" for default device)\n");
+ fprintf(out, "-F frequency: Set the transmit frequency when using UHD output. (mandatory option when using UHD)\n");
+ fprintf(out, "-G txgain: Set the transmit gain for the UHD driver (default: 0)\n");
+ fprintf(out, "-o: (UHD only) Set the timestamp offset added to the timestamp in the ETI. The offset is a double.\n");
+ fprintf(out, " Specifying this option has two implications: It enables synchronous transmission,\n"
+ " requiring an external REFCLK and PPS signal and frames that do not contain a valid timestamp\n"
+ " get muted.\n\n");
+ fprintf(out, "-T taps_file: Enable filtering before the output, using the specified file containing the filter taps.\n");
+ fprintf(out, "-a gain: Apply digital amplitude gain.\n");
+ fprintf(out, "-c rate: Set the DAC clock rate and enable Cic Equalisation.\n");
+ fprintf(out, "-g: Set computation gain mode: "
+ "%u FIX, %u MAX, %u VAR\n", GAIN_FIX, GAIN_MAX, GAIN_VAR);
+ fprintf(out, "-h: Print this help.\n");
+ fprintf(out, "-l: Loop file when reach end of file.\n");
+ fprintf(out, "-m mode: Set DAB mode: (0: auto, 1-4: force).\n");
+ fprintf(out, "-r rate: Set output sampling rate (default: 2048000).\n\n");
+}
+
+
+void printVersion(void)
+{
+ FILE *out = stderr;
+
+ fprintf(out, "Welcome to %s %s, compiled at %s, %s\n\n",
+ PACKAGE, VERSION, __DATE__, __TIME__);
+ fprintf(out,
+ " ODR-DabMod is copyright (C) Her Majesty the Queen in Right of Canada,\n"
+ " 2009, 2010, 2011, 2012 Communications Research Centre (CRC),\n"
+ " and\n"
+ " Copyright (C) 2014, 2015 Matthias P. Braendli, matthias.braendli@mpb.li\n"
+ "\n"
+ " http://opendigitalradio.org\n"
+ "\n"
+ " ODR-DabMod is free software: you can redistribute it and/or modify it\n"
+ " under the terms of the GNU General Public License as published by the\n"
+ " Free Software Foundation, either version 3 of the License, or (at your\n"
+ " option) any later version.\n"
+ "\n"
+ " ODR-DabMod is distributed in the hope that it will be useful, but\n"
+ " WITHOUT ANY WARRANTY; without even the implied warranty of\n"
+ " MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU\n"
+ " General Public License for more details.\n"
+ "\n"
+ " You should have received a copy of the GNU General Public License along\n"
+ " with ODR-DabMod. If not, see <http://www.gnu.org/licenses/>.\n"
+ "\n"
+#if USE_KISS_FFT
+ "ODR-DabMod makes use of the following open source packages:\n"
+ " Kiss FFT v1.2.9 (Revised BSD) - http://kissfft.sourceforge.net/\n"
+#endif
+ );
+
+}
+
+
diff --git a/src/Utils.h b/src/Utils.h
new file mode 100644
index 0000000..f023646
--- /dev/null
+++ b/src/Utils.h
@@ -0,0 +1,62 @@
+/*
+ Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012
+ Her Majesty the Queen in Right of Canada (Communications Research
+ Center Canada)
+
+ Copyright (C) 2015
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://opendigitalradio.org
+ */
+/*
+ This file is part of ODR-DabMod.
+
+ ODR-DabMod is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as
+ published by the Free Software Foundation, either version 3 of the
+ License, or (at your option) any later version.
+
+ ODR-DabMod is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with ODR-DabMod. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef __UTILS_H_
+#define __UTILS_H_
+
+#ifdef HAVE_CONFIG_H
+# include "config.h"
+#endif
+
+#include <stdlib.h>
+#include <unistd.h>
+#include <stdio.h>
+#include <time.h>
+
+void printUsage(char* progName);
+
+void printVersion(void);
+
+inline long timespecdiff_us(struct timespec& oldTime, struct timespec& time)
+{
+ long tv_sec;
+ long tv_nsec;
+ if (time.tv_nsec < oldTime.tv_nsec) {
+ tv_sec = time.tv_sec - 1 - oldTime.tv_sec;
+ tv_nsec = 1000000000L + time.tv_nsec - oldTime.tv_nsec;
+ }
+ else {
+ tv_sec = time.tv_sec - oldTime.tv_sec;
+ tv_nsec = time.tv_nsec - oldTime.tv_nsec;
+ }
+
+ return tv_sec * 1000 + tv_nsec / 1000;
+}
+
+
+#endif
+