summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/DabMod.cpp153
-rw-r--r--src/DabModulator.cpp22
-rw-r--r--src/DabModulator.h8
-rw-r--r--src/EtiReader.cpp16
-rw-r--r--src/EtiReader.h10
-rw-r--r--src/FIRFilter.cpp21
-rw-r--r--src/FIRFilter.h8
-rw-r--r--src/InputFileReader.cpp49
-rw-r--r--src/InputReader.h21
-rw-r--r--src/InputZeroMQReader.cpp48
-rw-r--r--src/OutputUHD.cpp808
-rw-r--r--src/OutputUHD.h58
-rw-r--r--src/RemoteControl.cpp15
-rw-r--r--src/RemoteControl.h10
-rw-r--r--src/SignalMultiplexer.cpp14
-rw-r--r--src/TII.cpp367
-rw-r--r--src/TII.h105
-rw-r--r--src/ThreadsafeQueue.h24
-rw-r--r--src/TimestampDecoder.cpp108
-rw-r--r--src/TimestampDecoder.h73
-rw-r--r--src/Utils.cpp5
-rw-r--r--src/Utils.h18
22 files changed, 1305 insertions, 656 deletions
diff --git a/src/DabMod.cpp b/src/DabMod.cpp
index ec1a4cd..3ed5e40 100644
--- a/src/DabMod.cpp
+++ b/src/DabMod.cpp
@@ -70,7 +70,7 @@
# define memalign(a, b) malloc(b)
#endif
-#define ZMQ_INPUT_MAX_FRAME_QUEUE 50
+#define ZMQ_INPUT_MAX_FRAME_QUEUE 500
typedef std::complex<float> complexf;
@@ -108,7 +108,7 @@ enum run_modulator_state {
MOD_AGAIN
};
-run_modulator_state run_modulator(Logger& logger, modulator_data& m);
+run_modulator_state run_modulator(modulator_data& m);
int launch_modulator(int argc, char* argv[])
{
@@ -132,6 +132,7 @@ int launch_modulator(int argc, char* argv[])
float normalise = 1.0f;
GainMode gainMode = GAIN_VAR;
+ tii_config_t tiiConfig;
/* UHD requires the input I and Q samples to be in the interval
* [-1.0,1.0], otherwise they get truncated, which creates very
@@ -157,10 +158,8 @@ int launch_modulator(int argc, char* argv[])
modulator_data m;
// To handle the timestamp offset of the modulator
- struct modulator_offset_config modconf;
- modconf.use_offset_file = false;
- modconf.use_offset_fixed = false;
- modconf.delay_calculation_pipeline_stages = 0;
+ unsigned tist_delay_stages = 0;
+ double tist_offset_s = 0.0;
shared_ptr<Flowgraph> flowgraph(new Flowgraph());
shared_ptr<FormatConverter> format_converter;
@@ -171,10 +170,9 @@ int launch_modulator(int argc, char* argv[])
bool run_again = true;
- Logger logger;
- InputFileReader inputFileReader(logger);
+ InputFileReader inputFileReader;
#if defined(HAVE_ZEROMQ)
- shared_ptr<InputZeroMQReader> inputZeroMQReader(new InputZeroMQReader(logger));
+ shared_ptr<InputZeroMQReader> inputZeroMQReader(new InputZeroMQReader());
#endif
struct sigaction sa;
@@ -239,25 +237,7 @@ int launch_modulator(int argc, char* argv[])
loop = true;
break;
case 'o':
- if (modconf.use_offset_file)
- {
- fprintf(stderr, "Options -o and -O are mutually exclusive\n");
- throw std::invalid_argument("Invalid command line options");
- }
- modconf.use_offset_fixed = true;
- modconf.offset_fixed = strtod(optarg, NULL);
-#if defined(HAVE_OUTPUT_UHD)
- outputuhd_conf.enableSync = true;
-#endif
- break;
- case 'O':
- if (modconf.use_offset_fixed)
- {
- fprintf(stderr, "Options -o and -O are mutually exclusive\n");
- throw std::invalid_argument("Invalid command line options");
- }
- modconf.use_offset_file = true;
- modconf.offset_filename = std::string(optarg);
+ tist_offset_s = strtod(optarg, NULL);
#if defined(HAVE_OUTPUT_UHD)
outputuhd_conf.enableSync = true;
#endif
@@ -402,7 +382,7 @@ int launch_modulator(int argc, char* argv[])
// log parameters:
if (pt.get("log.syslog", 0) == 1) {
LogToSyslog* log_syslog = new LogToSyslog();
- logger.register_backend(log_syslog);
+ etiLog.register_backend(log_syslog);
}
if (pt.get("log.filelog", 0) == 1) {
@@ -417,7 +397,7 @@ int launch_modulator(int argc, char* argv[])
}
LogToFile* log_file = new LogToFile(logfilename);
- logger.register_backend(log_file);
+ etiLog.register_backend(log_file);
}
@@ -559,6 +539,8 @@ int launch_modulator(int argc, char* argv[])
throw std::runtime_error("Configuration error");
}
+ outputuhd_conf.maxGPSHoldoverTime = pt.get("uhdoutput.max_gps_holdover_time", 0);
+
useUHDOutput = 1;
}
#endif
@@ -577,49 +559,45 @@ int launch_modulator(int argc, char* argv[])
#if defined(HAVE_OUTPUT_UHD)
outputuhd_conf.enableSync = (pt.get("delaymanagement.synchronous", 0) == 1);
if (outputuhd_conf.enableSync) {
+ std::string delay_mgmt = pt.get<std::string>("delaymanagement.management", "");
+ std::string fixedoffset = pt.get<std::string>("delaymanagement.fixedoffset", "");
+ std::string offset_filename = pt.get<std::string>("delaymanagement.dynamicoffsetfile", "");
+
+ if (not(delay_mgmt.empty() and fixedoffset.empty() and offset_filename.empty())) {
+ std::cerr << "Warning: you are using the old config syntax for the offset management.\n";
+ std::cerr << " Please see the example.ini configuration for the new settings.\n";
+ }
+
try {
- std::string delay_mgmt = pt.get<std::string>("delaymanagement.management");
- if (delay_mgmt == "fixed") {
- modconf.offset_fixed = pt.get<double>("delaymanagement.fixedoffset");
- modconf.use_offset_fixed = true;
- }
- else if (delay_mgmt == "dynamic") {
- modconf.offset_filename = pt.get<std::string>("delaymanagement.dynamicoffsetfile");
- modconf.use_offset_file = true;
- }
- else {
- throw std::runtime_error("invalid management value");
- }
+ tist_offset_s = pt.get<double>("delaymanagement.offset");
}
catch (std::exception &e) {
- std::cerr << "Error: " << e.what() << "\n";
- std::cerr << " Synchronised transmission enabled, but delay management specification is incomplete.\n";
+ std::cerr << "Error: delaymanagement: synchronous is enabled, but no offset defined!\n";
throw std::runtime_error("Configuration error");
}
}
outputuhd_conf.muteNoTimestamps = (pt.get("delaymanagement.mutenotimestamps", 0) == 1);
#endif
+
+ /* Read TII parameters from config file */
+ tiiConfig.enable = pt.get("tii.enable", 0);
+ tiiConfig.comb = pt.get("tii.comb", 0);
+ tiiConfig.pattern = pt.get("tii.pattern", 0);
}
if (rcs.get_no_controllers() == 0) {
- logger.level(warn) << "No Remote-Control started";
+ etiLog.level(warn) << "No Remote-Control started";
rcs.add_controller(new RemoteControllerDummy());
}
- logger.level(info) << "Starting up";
-
- if (!(modconf.use_offset_file || modconf.use_offset_fixed)) {
- logger.level(debug) << "No Modulator offset defined, setting to 0";
- modconf.use_offset_fixed = true;
- modconf.offset_fixed = 0;
- }
+ etiLog.level(info) << "Starting up";
// When using the FIRFilter, increase the modulator offset pipelining delay
// by the correct amount
if (filterTapsFilename != "") {
- modconf.delay_calculation_pipeline_stages += FIRFILTER_PIPELINE_DELAY;
+ tist_delay_stages += FIRFILTER_PIPELINE_DELAY;
}
// Setting ETI input filename
@@ -647,12 +625,12 @@ int launch_modulator(int argc, char* argv[])
fprintf(stderr, "\n");
printUsage(argv[0]);
ret = -1;
- logger.level(error) << "Received invalid command line arguments";
+ etiLog.level(error) << "Received invalid command line arguments";
throw std::invalid_argument("Invalid command line options");
}
if (!useFileOutput && !useUHDOutput && !useZeroMQOutput) {
- logger.level(error) << "Output not specified";
+ etiLog.level(error) << "Output not specified";
fprintf(stderr, "Must specify output !");
throw std::runtime_error("Configuration error");
}
@@ -671,10 +649,14 @@ int launch_modulator(int argc, char* argv[])
fprintf(stderr, " UHD\n"
" Device: %s\n"
" Type: %s\n"
- " master_clock_rate: %ld\n",
+ " master_clock_rate: %ld\n"
+ " refclk: %s\n"
+ " pps source: %s\n",
outputuhd_conf.device.c_str(),
outputuhd_conf.usrpType.c_str(),
- outputuhd_conf.masterClockRate);
+ outputuhd_conf.masterClockRate,
+ outputuhd_conf.refclk_src.c_str(),
+ outputuhd_conf.pps_src.c_str());
}
#endif
else if (useZeroMQOutput) {
@@ -700,7 +682,7 @@ int launch_modulator(int argc, char* argv[])
// Opening ETI input file
if (inputFileReader.Open(inputName, loop) == -1) {
fprintf(stderr, "Unable to open input file!\n");
- logger.level(error) << "Unable to open input file!";
+ etiLog.level(error) << "Unable to open input file!";
ret = -1;
throw std::runtime_error("Unable to open input");
}
@@ -741,7 +723,7 @@ int launch_modulator(int argc, char* argv[])
else if (useUHDOutput) {
normalise = 1.0f / normalise_factor;
outputuhd_conf.sampleRate = outputRate;
- output = make_shared<OutputUHD>(outputuhd_conf, &logger);
+ output = make_shared<OutputUHD>(outputuhd_conf);
((OutputUHD*)output.get())->enrol_at(rcs);
}
#endif
@@ -772,8 +754,9 @@ int launch_modulator(int argc, char* argv[])
shared_ptr<InputMemory> input(new InputMemory(&m.data));
shared_ptr<DabModulator> modulator(
- new DabModulator(modconf, &rcs, logger, outputRate, clockRate,
- dabMode, gainMode, digitalgain, normalise, filterTapsFilename));
+ new DabModulator(tist_offset_s, tist_delay_stages, &rcs,
+ tiiConfig, outputRate, clockRate, dabMode, gainMode,
+ digitalgain, normalise, filterTapsFilename));
flowgraph.connect(input, modulator);
if (format_converter) {
@@ -792,41 +775,47 @@ int launch_modulator(int argc, char* argv[])
m.inputReader->PrintInfo();
- run_modulator_state st = run_modulator(logger, m);
+ run_modulator_state st = run_modulator(m);
switch (st) {
case MOD_FAILURE:
- fprintf(stderr, "\nModulator failure.\n");
+ etiLog.level(error) << "Modulator failure.";
run_again = false;
ret = 1;
break;
-#if defined(HAVE_ZEROMQ)
case MOD_AGAIN:
- fprintf(stderr, "\nRestart modulator\n");
- running = true;
- if (inputTransport == "zeromq") {
+ etiLog.level(warn) << "Restart modulator.";
+ run_again = false;
+ if (inputTransport == "file") {
+ if (inputFileReader.Open(inputName, loop) == -1) {
+ etiLog.level(error) << "Unable to open input file!";
+ ret = 1;
+ }
+ else {
+ run_again = true;
+ }
+ }
+ else if (inputTransport == "zeromq") {
+#if defined(HAVE_ZEROMQ)
run_again = true;
-
// Create a new input reader
- inputZeroMQReader = make_shared<InputZeroMQReader>(logger);
+ inputZeroMQReader = make_shared<InputZeroMQReader>();
inputZeroMQReader->Open(inputName, inputMaxFramesQueued);
m.inputReader = inputZeroMQReader.get();
+#endif
}
break;
-#endif
case MOD_NORMAL_END:
default:
- fprintf(stderr, "\nModulator stopped.\n");
+ etiLog.level(info) << "modulator stopped.";
ret = 0;
run_again = false;
break;
}
fprintf(stderr, "\n\n");
- fprintf(stderr, "%lu DAB frames encoded\n", m.framecount);
- fprintf(stderr, "%f seconds encoded\n", (float)m.framecount * 0.024f);
-
- fprintf(stderr, "\nCleaning flowgraph...\n");
+ etiLog.level(info) << m.framecount << " DAB frames encoded";
+ etiLog.level(info) << ((float)m.framecount * 0.024f) << " seconds encoded";
m.data.setLength(0);
}
@@ -835,11 +824,11 @@ int launch_modulator(int argc, char* argv[])
// Cleaning things
////////////////////////////////////////////////////////////////////////
- logger.level(info) << "Terminating";
+ etiLog.level(info) << "Terminating";
return ret;
}
-run_modulator_state run_modulator(Logger& logger, modulator_data& m)
+run_modulator_state run_modulator(modulator_data& m)
{
run_modulator_state ret = MOD_FAILURE;
try {
@@ -873,10 +862,10 @@ run_modulator_state run_modulator(Logger& logger, modulator_data& m)
}
}
if (framesize == 0) {
- logger.level(info) << "End of file reached.";
+ etiLog.level(info) << "End of file reached.";
}
else {
- logger.level(error) << "Input read error.";
+ etiLog.level(error) << "Input read error.";
}
running = 0;
ret = MOD_NORMAL_END;
@@ -884,15 +873,15 @@ run_modulator_state run_modulator(Logger& logger, modulator_data& m)
#if defined(HAVE_OUTPUT_UHD)
} catch (fct_discontinuity_error& e) {
// The OutputUHD saw a FCT discontinuity
- logger.level(warn) << e.what();
+ etiLog.level(warn) << e.what();
ret = MOD_AGAIN;
#endif
} catch (zmq_input_overflow& e) {
// The ZeroMQ input has overflowed its buffer
- logger.level(warn) << e.what();
+ etiLog.level(warn) << e.what();
ret = MOD_AGAIN;
} catch (std::exception& e) {
- logger.level(error) << "Exception caught: " << e.what();
+ etiLog.level(error) << "Exception caught: " << e.what();
ret = MOD_FAILURE;
}
diff --git a/src/DabModulator.cpp b/src/DabModulator.cpp
index 667d885..8a52401 100644
--- a/src/DabModulator.cpp
+++ b/src/DabModulator.cpp
@@ -26,6 +26,7 @@
*/
#include <string>
+#include <boost/make_shared.hpp>
#include "DabModulator.h"
#include "PcDebug.h"
@@ -46,6 +47,7 @@
#include "Resampler.h"
#include "ConvEncoder.h"
#include "FIRFilter.h"
+#include "TII.h"
#include "PuncturingEncoder.h"
#include "TimeInterleaver.h"
#include "TimestampDecoder.h"
@@ -55,25 +57,25 @@
using namespace boost;
DabModulator::DabModulator(
- struct modulator_offset_config& modconf,
+ double tist_offset_s, unsigned tist_delay_stages,
RemoteControllers* rcs,
- Logger& logger,
+ const tii_config_t& tiiConfig,
unsigned outputRate, unsigned clockRate,
unsigned dabMode, GainMode gainMode,
float digGain, float normalise,
std::string filterTapsFilename
) :
ModCodec(ModFormat(1), ModFormat(0)),
- myLogger(logger),
myOutputRate(outputRate),
myClockRate(clockRate),
myDabMode(dabMode),
myGainMode(gainMode),
myDigGain(digGain),
myNormalise(normalise),
- myEtiReader(EtiReader(modconf, myLogger)),
+ myEtiReader(EtiReader(tist_offset_s, tist_delay_stages, rcs)),
myFlowgraph(NULL),
myFilterTapsFilename(filterTapsFilename),
+ myTiiConfig(tiiConfig),
myRCs(rcs)
{
PDEBUG("DabModulator::DabModulator(%u, %u, %u, %u) @ %p\n",
@@ -196,6 +198,14 @@ int DabModulator::process(Buffer* const dataIn, Buffer* dataOut)
(float)mySpacing * (float)myOutputRate / 2048000.0f,
cic_ratio));
+ shared_ptr<TII> tii;
+ try {
+ tii = make_shared<TII>(myDabMode, myTiiConfig);
+ tii->enrol_at(*myRCs);
+ }
+ catch (std::runtime_error& e) {
+ etiLog.level(error) << "Could not initialise TII, skipping!";
+ }
shared_ptr<OfdmGenerator> cifOfdm(
new OfdmGenerator((1 + myNbSymbols), myNbCarriers, mySpacing));
@@ -346,6 +356,10 @@ int DabModulator::process(Buffer* const dataIn, Buffer* dataOut)
myFlowgraph->connect(cifFreq, cifDiff);
myFlowgraph->connect(cifNull, cifSig);
myFlowgraph->connect(cifDiff, cifSig);
+ if (tii) {
+ myFlowgraph->connect(tii, cifSig);
+ }
+
if (useCicEq) {
myFlowgraph->connect(cifSig, cifCicEq);
myFlowgraph->connect(cifCicEq, cifOfdm);
diff --git a/src/DabModulator.h b/src/DabModulator.h
index 89ddd7c..cee066a 100644
--- a/src/DabModulator.h
+++ b/src/DabModulator.h
@@ -43,15 +43,16 @@
#include "OutputMemory.h"
#include "RemoteControl.h"
#include "Log.h"
+#include "TII.h"
class DabModulator : public ModCodec
{
public:
DabModulator(
- struct modulator_offset_config& modconf,
+ double tist_offset_s, unsigned tist_delay_stages,
RemoteControllers* rcs,
- Logger& logger,
+ const tii_config_t& tiiConfig,
unsigned outputRate = 2048000, unsigned clockRate = 0,
unsigned dabMode = 0, GainMode gainMode = GAIN_VAR,
float digGain = 1.0, float normalise = 1.0,
@@ -66,8 +67,6 @@ public:
EtiReader* getEtiReader() { return &myEtiReader; }
protected:
- Logger& myLogger;
-
void setMode(unsigned mode);
unsigned myOutputRate;
@@ -80,6 +79,7 @@ protected:
Flowgraph* myFlowgraph;
OutputMemory* myOutput;
std::string myFilterTapsFilename;
+ tii_config_t myTiiConfig;
RemoteControllers* myRCs;
size_t myNbSymbols;
diff --git a/src/EtiReader.cpp b/src/EtiReader.cpp
index 0e4182d..f584275 100644
--- a/src/EtiReader.cpp
+++ b/src/EtiReader.cpp
@@ -51,15 +51,18 @@ enum ETI_READER_STATE {
};
-EtiReader::EtiReader(struct modulator_offset_config& modconf,
- Logger& logger) :
- myLogger(logger),
+EtiReader::EtiReader(
+ double tist_offset_s,
+ unsigned tist_delay_stages,
+ RemoteControllers* rcs) :
state(EtiReaderStateSync),
myFicSource(NULL),
- myTimestampDecoder(modconf, myLogger)
+ myTimestampDecoder(tist_offset_s, tist_delay_stages)
{
PDEBUG("EtiReader::EtiReader()\n");
+ myTimestampDecoder.enrol_at(*rcs);
+
myCurrentFrame = 0;
eti_fc_valid = false;
}
@@ -286,11 +289,6 @@ int EtiReader::process(const Buffer* dataIn)
myTimestampDecoder.updateTimestampEti(eti_fc.FP & 0x3,
eti_eoh.MNSC, getPPSOffset(), eti_fc.FCT);
- if (eti_fc.FCT % 125 == 0) //every 3 seconds is fine enough
- {
- myTimestampDecoder.updateModulatorOffset();
- }
-
return dataIn->getLength() - input_size;
}
diff --git a/src/EtiReader.h b/src/EtiReader.h
index b893f01..84ad9b4 100644
--- a/src/EtiReader.h
+++ b/src/EtiReader.h
@@ -47,7 +47,10 @@
class EtiReader
{
public:
- EtiReader(struct modulator_offset_config& modconf, Logger& logger);
+ EtiReader(
+ double tist_offset_s,
+ unsigned tist_delay_stages,
+ RemoteControllers* rcs);
virtual ~EtiReader();
EtiReader(const EtiReader&);
EtiReader& operator=(const EtiReader&);
@@ -67,9 +70,6 @@ public:
bool sourceContainsTimestamp();
protected:
- /* Main program logger */
- Logger& myLogger;
-
/* Transform the ETI TIST to a PPS offset in ms */
double getPPSOffset();
@@ -89,8 +89,6 @@ protected:
private:
size_t myCurrentFrame;
- bool time_ext_enabled;
- unsigned long timestamp_seconds;
bool eti_fc_valid;
};
diff --git a/src/FIRFilter.cpp b/src/FIRFilter.cpp
index 805c6d2..b1ce618 100644
--- a/src/FIRFilter.cpp
+++ b/src/FIRFilter.cpp
@@ -36,6 +36,8 @@
#include <iostream>
#include <fstream>
+#include <boost/make_shared.hpp>
+
#ifdef __AVX__
# include <immintrin.h>
#else
@@ -58,11 +60,10 @@ void FIRFilterWorker::process(struct FIRFilterWorkerData *fwd)
// the incoming buffer
while(running) {
- Buffer* dataIn;
+ boost::shared_ptr<Buffer> dataIn;
fwd->input_queue.wait_and_pop(dataIn);
- Buffer* dataOut;
- dataOut = new Buffer();
+ boost::shared_ptr<Buffer> dataOut = boost::make_shared<Buffer>();
dataOut->setLength(dataIn->getLength());
PDEBUG("FIRFilterWorker: dataIn->getLength() %zu\n", dataIn->getLength());
@@ -91,7 +92,7 @@ void FIRFilterWorker::process(struct FIRFilterWorkerData *fwd)
fprintf(stderr, "FIRFilterWorker: out not aligned %p ", out);
throw std::runtime_error("FIRFilterWorker: out not aligned");
}
-
+
clock_gettime(CLOCK_THREAD_CPUTIME_ID, &time_start);
__m256 AVXout;
@@ -141,7 +142,7 @@ void FIRFilterWorker::process(struct FIRFilterWorkerData *fwd)
fprintf(stderr, "FIRFilterWorker: out not aligned %p ", out);
throw std::runtime_error("FIRFilterWorker: out not aligned");
}
-
+
clock_gettime(CLOCK_THREAD_CPUTIME_ID, &time_start);
__m128 SSEout;
@@ -290,11 +291,10 @@ void FIRFilterWorker::process(struct FIRFilterWorkerData *fwd)
}
}
#endif
-
+
calculationTime += (time_end.tv_sec - time_start.tv_sec) * 1000000000L +
time_end.tv_nsec - time_start.tv_nsec;
fwd->output_queue.push(dataOut);
- delete dataIn;
}
}
@@ -393,17 +393,16 @@ int FIRFilter::process(Buffer* const dataIn, Buffer* dataOut)
// This thread creates the dataIn buffer, and deletes
// the outgoing buffer
- Buffer* inbuffer = new Buffer(dataIn->getLength(), dataIn->getData());
+ boost::shared_ptr<Buffer> inbuffer =
+ boost::make_shared<Buffer>(dataIn->getLength(), dataIn->getData());
firwd.input_queue.push(inbuffer);
if (number_of_runs > 2) {
- Buffer* outbuffer;
+ boost::shared_ptr<Buffer> outbuffer;
firwd.output_queue.wait_and_pop(outbuffer);
dataOut->setData(outbuffer->getData(), outbuffer->getLength());
-
- delete outbuffer;
}
else {
dataOut->setLength(dataIn->getLength());
diff --git a/src/FIRFilter.h b/src/FIRFilter.h
index 0ecae3e..751be91 100644
--- a/src/FIRFilter.h
+++ b/src/FIRFilter.h
@@ -30,7 +30,7 @@
#endif
#include <boost/thread.hpp>
-#include "ThreadsafeQueue.h"
+#include <boost/shared_ptr.hpp>
#include "RemoteControl.h"
#include "ModCodec.h"
@@ -52,8 +52,8 @@ struct FIRFilterWorkerData {
/* Thread-safe queues to give data to and get data from
* the worker
*/
- ThreadsafeQueue<Buffer*> input_queue;
- ThreadsafeQueue<Buffer*> output_queue;
+ ThreadsafeQueue<boost::shared_ptr<Buffer> > input_queue;
+ ThreadsafeQueue<boost::shared_ptr<Buffer> > output_queue;
/* Remote-control can change the taps while the filter
* runs. This lock makes sure nothing bad happens when
@@ -127,5 +127,5 @@ protected:
struct FIRFilterWorkerData firwd;
};
-
#endif //FIRFILTER_H
+
diff --git a/src/InputFileReader.cpp b/src/InputFileReader.cpp
index 205fbfa..84f0be4 100644
--- a/src/InputFileReader.cpp
+++ b/src/InputFileReader.cpp
@@ -49,8 +49,7 @@ int InputFileReader::Open(std::string filename, bool loop)
loop_ = loop;
inputfile_ = fopen(filename_.c_str(), "r");
if (inputfile_ == NULL) {
- fprintf(stderr, "Unable to open input file!\n");
- logger_.level(error) << "Unable to open input file!";
+ etiLog.level(error) << "Unable to open input file!";
perror(filename_.c_str());
return -1;
}
@@ -79,8 +78,7 @@ int InputFileReader::IdentifyType()
char discard_buffer[6144];
if (fread(&sync, sizeof(sync), 1, inputfile_) != 1) {
- fprintf(stderr, "Unable to read sync in input file!\n");
- logger_.level(error) << "Unable to read sync in input file!";
+ etiLog.level(error) << "Unable to read sync in input file!";
perror(filename_.c_str());
return -1;
}
@@ -96,8 +94,7 @@ int InputFileReader::IdentifyType()
// if the seek fails, consume the rest of the frame
if (fread(discard_buffer, 6144 - sizeof(sync), 1, inputfile_)
!= 1) {
- fprintf(stderr, "Unable to read from input file!\n");
- logger_.level(error) << "Unable to read from input file!";
+ etiLog.level(error) << "Unable to read from input file!";
perror(filename_.c_str());
return -1;
}
@@ -108,8 +105,7 @@ int InputFileReader::IdentifyType()
nbFrames = sync;
if (fread(&frameSize, sizeof(frameSize), 1, inputfile_) != 1) {
- fprintf(stderr, "Unable to read frame size in input file!\n");
- logger_.level(error) << "Unable to read frame size in input file!";
+ etiLog.level(error) << "Unable to read frame size in input file!";
perror(filename_.c_str());
return -1;
}
@@ -130,8 +126,7 @@ int InputFileReader::IdentifyType()
// if the seek fails, consume the rest of the frame
if (fread(discard_buffer, frameSize - 4, 1, inputfile_)
!= 1) {
- fprintf(stderr, "Unable to read from input file!\n");
- logger_.level(error) << "Unable to read from input file!";
+ etiLog.level(error) << "Unable to read from input file!";
perror(filename_.c_str());
return -1;
}
@@ -141,8 +136,7 @@ int InputFileReader::IdentifyType()
}
if (fread(&sync, sizeof(sync), 1, inputfile_) != 1) {
- fprintf(stderr, "Unable to read nb frame in input file!\n");
- logger_.level(error) << "Unable to read nb frame in input file!";
+ etiLog.level(error) << "Unable to read nb frame in input file!";
perror(filename_.c_str());
return -1;
}
@@ -152,8 +146,7 @@ int InputFileReader::IdentifyType()
// if the seek fails, consume the rest of the frame
if (fread(discard_buffer, frameSize - 4, 1, inputfile_)
!= 1) {
- fprintf(stderr, "Unable to read from input file!\n");
- logger_.level(error) << "Unable to read from input file!";
+ etiLog.level(error) << "Unable to read from input file!";
perror(filename_.c_str());
return -1;
}
@@ -168,8 +161,7 @@ int InputFileReader::IdentifyType()
sync >>= 8;
sync &= 0xffffff;
if (fread((uint8_t*)&sync + 3, 1, 1, inputfile_) != 1) {
- fprintf(stderr, "Unable to read from input file!\n");
- logger_.level(error) << "Unable to read from input file!";
+ etiLog.level(error) << "Unable to read from input file!";
perror(filename_.c_str());
return -1;
}
@@ -184,8 +176,7 @@ int InputFileReader::IdentifyType()
if (fseek(inputfile_, -sizeof(sync), SEEK_CUR) != 0) {
if (fread(discard_buffer, 6144 - sizeof(sync), 1, inputfile_)
!= 1) {
- fprintf(stderr, "Unable to read from input file!\n");
- logger_.level(error) << "Unable to read from input file!";
+ etiLog.level(error) << "Unable to read from input file!";
perror(filename_.c_str());
return -1;
}
@@ -195,8 +186,7 @@ int InputFileReader::IdentifyType()
}
}
- fprintf(stderr, "Bad input file format!\n");
- logger_.level(error) << "Bad input file format!";
+ etiLog.level(error) << "Bad input file format!";
return -1;
}
@@ -236,18 +226,18 @@ int InputFileReader::GetNextFrame(void* buffer)
}
else {
if (fread(&frameSize, sizeof(frameSize), 1, inputfile_) != 1) {
- logger_.level(error) << "Reached end of file.";
+ etiLog.level(error) << "Reached end of file.";
if (loop_) {
if (Rewind() == 0) {
if (fread(&frameSize, sizeof(frameSize), 1, inputfile_) != 1) {
PDEBUG("Error after rewinding file!\n");
- logger_.level(error) << "Error after rewinding file!";
+ etiLog.level(error) << "Error after rewinding file!";
return -1;
}
}
else {
PDEBUG("Impossible to rewind file!\n");
- logger_.level(error) << "Impossible to rewind file!";
+ etiLog.level(error) << "Impossible to rewind file!";
return -1;
}
}
@@ -257,8 +247,7 @@ int InputFileReader::GetNextFrame(void* buffer)
}
}
if (frameSize > 6144) { // there might be a better limit
- logger_.level(error) << "Wrong frame size " << frameSize << " in ETI file!";
- fprintf(stderr, "Wrong frame size %u in ETI file!\n", frameSize);
+ etiLog.level(error) << "Wrong frame size " << frameSize << " in ETI file!";
return -1;
}
@@ -275,7 +264,7 @@ int InputFileReader::GetNextFrame(void* buffer)
}
else {
PDEBUG("Impossible to rewind file!\n");
- logger_.level(error) << "Impossible to rewind file!";
+ etiLog.level(error) << "Impossible to rewind file!";
return -1;
}
}
@@ -285,12 +274,8 @@ int InputFileReader::GetNextFrame(void* buffer)
// A short read of a frame (i.e. reading an incomplete frame)
// is not tolerated. Input files must not contain incomplete frames
if (read_bytes != 0) {
- fprintf(stderr,
- "Unable to read a complete frame of %u data bytes from input file!\n",
- frameSize);
-
- perror(filename_.c_str());
- logger_.level(error) << "Unable to read from input file!";
+ etiLog.level(error) <<
+ "Unable to read a complete frame of " << frameSize << " data bytes from input file!";
return -1;
}
else {
diff --git a/src/InputReader.h b/src/InputReader.h
index 13d49b8..b262cc9 100644
--- a/src/InputReader.h
+++ b/src/InputReader.h
@@ -31,6 +31,8 @@
#endif
#include <cstdio>
+#include <vector>
+#include <boost/shared_ptr.hpp>
#if defined(HAVE_ZEROMQ)
# include "zmq.hpp"
# include "ThreadsafeQueue.h"
@@ -85,9 +87,9 @@ class InputReader
class InputFileReader : public InputReader
{
public:
- InputFileReader(Logger logger) :
+ InputFileReader() :
streamtype_(ETI_STREAM_TYPE_NONE),
- inputfile_(NULL), logger_(logger) {};
+ inputfile_(NULL) { }
~InputFileReader()
{
@@ -113,6 +115,9 @@ class InputFileReader : public InputReader
}
private:
+ InputFileReader(const InputFileReader& other);
+ InputFileReader& operator=(const InputFileReader& other);
+
int IdentifyType();
// Rewind the file, and replay anew
@@ -123,7 +128,6 @@ class InputFileReader : public InputReader
std::string filename_;
EtiStreamType streamtype_;
FILE* inputfile_;
- Logger logger_;
size_t inputfilelength_;
uint64_t nbframes_; // 64-bit because 32-bit overflow is
@@ -143,7 +147,7 @@ struct zmq_input_overflow : public std::exception
struct InputZeroMQThreadData
{
- ThreadsafeQueue<uint8_t*> *in_messages;
+ ThreadsafeQueue<boost::shared_ptr<std::vector<uint8_t> > > *in_messages;
std::string uri;
unsigned max_queued_frames;
@@ -179,8 +183,7 @@ class InputZeroMQWorker
class InputZeroMQReader : public InputReader
{
public:
- InputZeroMQReader(Logger logger) :
- logger_(logger), in_messages_(10)
+ InputZeroMQReader()
{
workerdata_.in_messages = &in_messages_;
workerdata_.running = false;
@@ -198,12 +201,12 @@ class InputZeroMQReader : public InputReader
void PrintInfo();
private:
- InputZeroMQReader(const InputZeroMQReader& other) {}
- Logger logger_;
+ InputZeroMQReader(const InputZeroMQReader& other);
+ InputZeroMQReader& operator=(const InputZeroMQReader& other);
std::string uri_;
InputZeroMQWorker worker_;
- ThreadsafeQueue<uint8_t*> in_messages_;
+ ThreadsafeQueue<boost::shared_ptr<std::vector<uint8_t> > > in_messages_;
struct InputZeroMQThreadData workerdata_;
};
diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp
index f8c15c4..36d4e4b 100644
--- a/src/InputZeroMQReader.cpp
+++ b/src/InputZeroMQReader.cpp
@@ -37,6 +37,7 @@
#include <stdint.h>
#include "zmq.hpp"
#include <boost/thread/thread.hpp>
+#include <boost/make_shared.hpp>
#include "porting.h"
#include "InputReader.h"
#include "PcDebug.h"
@@ -84,16 +85,25 @@ int InputZeroMQReader::GetNextFrame(void* buffer)
{
const size_t framesize = 6144;
- uint8_t* incoming;
- in_messages_.wait_and_pop(incoming);
+ boost::shared_ptr<std::vector<uint8_t> > incoming;
+
+ /* Do some prebuffering because reads will happen in bursts
+ * (4 ETI frames in TM1) and we should make sure that
+ * we can serve the data required for a full transmission frame.
+ */
+ if (in_messages_.size() < 4) {
+ const size_t prebuffering = 10;
+ in_messages_.wait_and_pop(incoming, prebuffering);
+ }
+ else {
+ in_messages_.wait_and_pop(incoming);
+ }
if (! workerdata_.running) {
throw zmq_input_overflow();
}
- memcpy(buffer, incoming, framesize);
-
- delete incoming;
+ memcpy(buffer, &incoming->front(), framesize);
return framesize;
}
@@ -135,16 +145,14 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata)
}
else if (queue_size < workerdata->max_queued_frames) {
if (buffer_full) {
- fprintf(stderr, "ZeroMQ buffer recovered: %zu elements\n",
- queue_size);
+ etiLog.level(info) << "ZeroMQ buffer recovered: " << queue_size << " elements";
buffer_full = false;
}
zmq_dab_message_t* dab_msg = (zmq_dab_message_t*)incoming.data();
if (dab_msg->version != 1) {
- fprintf(stderr, "ZeroMQ input: wrong packet version %d\n",
- dab_msg->version);
+ etiLog.level(error) << "ZeroMQ wrong packet version " << dab_msg->version;
}
int offset = sizeof(dab_msg->version) +
@@ -155,23 +163,20 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata)
if (dab_msg->buflen[i] <= 0 ||
dab_msg->buflen[i] > 6144)
{
- fprintf(stderr, "ZeroMQ buffer %d: invalid length %d\n",
- i, dab_msg->buflen[i]);
+ etiLog.level(error) << "ZeroMQ buffer " << i << " has invalid length " <<
+ dab_msg->buflen[i];
// TODO error handling
}
else {
- uint8_t* buf = new uint8_t[6144];
+ boost::shared_ptr<std::vector<uint8_t> > buf =
+ boost::make_shared<std::vector<uint8_t> >(6144, 0x55);
const int framesize = dab_msg->buflen[i];
- memcpy(buf,
+ memcpy(&buf->front(),
((uint8_t*)incoming.data()) + offset,
framesize);
- // pad to 6144 bytes
- memset(&((uint8_t*)buf)[framesize],
- 0x55, 6144 - framesize);
-
offset += framesize;
queue_size = workerdata->in_messages->push(buf);
@@ -182,7 +187,7 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata)
workerdata->in_messages->notify();
if (!buffer_full) {
- fprintf(stderr, "ZeroMQ buffer overfull !\n");
+ etiLog.level(warn) << "ZeroMQ buffer overfull !";
buffer_full = true;
throw std::runtime_error("ZMQ input full");
@@ -199,18 +204,17 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata)
}
if (queue_size < 5) {
- fprintf(stderr, "ZeroMQ buffer low: %zu elements !\n",
- queue_size);
+ etiLog.level(warn) << "ZeroMQ buffer low: " << queue_size << " elements !";
}
}
}
catch (zmq::error_t& err) {
- fprintf(stderr, "ZeroMQ error in RecvProcess: '%s'\n", err.what());
+ etiLog.level(error) << "ZeroMQ error in RecvProcess: '" << err.what() << "'";
}
catch (std::exception& err) {
}
- fprintf(stderr, "ZeroMQ input worker terminated\n");
+ etiLog.level(info) << "ZeroMQ input worker terminated";
subscriber.close();
diff --git a/src/OutputUHD.cpp b/src/OutputUHD.cpp
index dbf8b9d..adc7b9a 100644
--- a/src/OutputUHD.cpp
+++ b/src/OutputUHD.cpp
@@ -2,7 +2,7 @@
Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Her Majesty the
Queen in Right of Canada (Communications Research Center Canada)
- Copyright (C) 2014
+ Copyright (C) 2014, 2015
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
@@ -31,6 +31,11 @@
#include "PcDebug.h"
#include "Log.h"
#include "RemoteControl.h"
+#include "Utils.h"
+
+#include <boost/thread/future.hpp>
+
+#include <uhd/utils/msg.hpp>
#include <cmath>
#include <iostream>
@@ -46,23 +51,63 @@ using namespace std;
typedef std::complex<float> complexf;
+void uhd_msg_handler(uhd::msg::type_t type, const std::string &msg)
+{
+ if (type == uhd::msg::warning) {
+ etiLog.level(warn) << "UHD Warning: " << msg;
+ }
+ else if (type == uhd::msg::error) {
+ etiLog.level(error) << "UHD Error: " << msg;
+ }
+ else {
+ etiLog.level(debug) << "UHD Message: " << msg;
+ }
+}
+
+// Check function for GPS TIMELOCK sensor
+bool check_gps_timelock(uhd::usrp::multi_usrp::sptr usrp)
+{
+ try {
+ std::string sensor_value(
+ usrp->get_mboard_sensor("gps_timelock", 0).to_pp_string());
+
+ if (sensor_value.find("TIME LOCKED") == std::string::npos) {
+ etiLog.level(warn) << "OutputUHD: gps_timelock " << sensor_value;
+ return false;
+ }
+
+ return true;
+ }
+ catch (uhd::lookup_error &e) {
+ etiLog.level(warn) << "OutputUHD: no gps_timelock sensor";
+ return false;
+ }
+}
+
+
OutputUHD::OutputUHD(
- const OutputUHDConfig& config,
- Logger *logger) :
+ const OutputUHDConfig& config) :
ModOutput(ModFormat(1), ModFormat(0)),
RemoteControllable("uhd"),
- myLogger(logger),
myConf(config),
// Since we don't know the buffer size, we cannot initialise
// the buffers at object initialisation.
first_run(true),
+ gps_fix_verified(false),
activebuffer(1),
myDelayBuf(0)
{
- myMuting = 0; // is remote-controllable
+ myMuting = true; // is remote-controllable, and reset by the GPS fix check
myStaticDelayUs = 0; // is remote-controllable
+ // Variables needed for GPS fix check
+ num_checks_without_gps_fix = 1;
+ first_gps_fix_check.tv_sec = 0;
+ last_gps_fix_check.tv_sec = 0;
+ time_last_frame.tv_sec = 0;
+
+
#if FAKE_UHD
MDEBUG("OutputUHD:Using fake UHD output");
#else
@@ -92,6 +137,10 @@ OutputUHD::OutputUHD(
RC_ADD_PARAMETER(muting, "Mute the output by stopping the transmitter");
RC_ADD_PARAMETER(staticdelay, "Set static delay (uS) between 0 and 96000");
+ // TODO: find out how to use boost::bind to give the logger to the
+ // uhd_msg_handler
+ uhd::msg::register_handler(uhd_msg_handler);
+
uhd::set_thread_priority_safe();
//create a usrp device
@@ -151,60 +200,6 @@ OutputUHD::OutputUHD(
MDEBUG("OutputUHD:Mute on missing timestamps: %s ...\n",
myConf.muteNoTimestamps ? "enabled" : "disabled");
- if (myConf.enableSync && (myConf.pps_src == "none")) {
- myLogger->level(warn) <<
- "OutputUHD: WARNING:"
- " you are using synchronous transmission without PPS input!";
-
- struct timespec now;
- if (clock_gettime(CLOCK_REALTIME, &now)) {
- perror("OutputUHD:Error: could not get time: ");
- myLogger->level(error) << "OutputUHD: could not get time";
- }
- else {
- myUsrp->set_time_now(uhd::time_spec_t(now.tv_sec));
- myLogger->level(info) << "OutputUHD: Setting USRP time to " <<
- uhd::time_spec_t(now.tv_sec).get_real_secs();
- }
- }
-
- if (myConf.pps_src != "none") {
- /* handling time for synchronisation: wait until the next full
- * second, and set the USRP time at next PPS */
- struct timespec now;
- time_t seconds;
- if (clock_gettime(CLOCK_REALTIME, &now)) {
- myLogger->level(error) << "OutputUHD: could not get time :" <<
- strerror(errno);
- throw std::runtime_error("OutputUHD: could not get time.");
- }
- else {
- seconds = now.tv_sec;
-
- MDEBUG("OutputUHD:sec+1: %ld ; now: %ld ...\n", seconds+1, now.tv_sec);
- while (seconds + 1 > now.tv_sec) {
- usleep(1);
- if (clock_gettime(CLOCK_REALTIME, &now)) {
- myLogger->level(error) << "OutputUHD: could not get time :" <<
- strerror(errno);
- throw std::runtime_error("OutputUHD: could not get time.");
- }
- }
- MDEBUG("OutputUHD:sec+1: %ld ; now: %ld ...\n", seconds+1, now.tv_sec);
- /* We are now shortly after the second change. */
-
- usleep(200000); // 200ms, we want the PPS to be later
- myUsrp->set_time_unknown_pps(uhd::time_spec_t(seconds + 2));
- myLogger->level(info) << "OutputUHD: Setting USRP time next pps to " <<
- uhd::time_spec_t(seconds + 2).get_real_secs();
- }
-
- usleep(1e6);
- myLogger->log(info, "OutputUHD: USRP time %f\n",
- myUsrp->get_time_now().get_real_secs());
- }
-
-
// preparing output thread worker data
uwd.myUsrp = myUsrp;
#endif
@@ -214,17 +209,22 @@ OutputUHD::OutputUHD(
uwd.sampleRate = myConf.sampleRate;
uwd.sourceContainsTimestamp = false;
uwd.muteNoTimestamps = myConf.muteNoTimestamps;
- uwd.logger = myLogger;
uwd.refclk_lock_loss_behaviour = myConf.refclk_lock_loss_behaviour;
if (myConf.refclk_src == "internal") {
uwd.check_refclk_loss = false;
+ uwd.check_gpsfix = false;
+ }
+ else if (myConf.refclk_src == "gpsdo") {
+ uwd.check_refclk_loss = true;
+ uwd.check_gpsfix = (myConf.maxGPSHoldoverTime != 0);
}
else {
uwd.check_refclk_loss = true;
+ uwd.check_gpsfix = false;
}
- SetDelayBuffer(config.dabMode);
+ SetDelayBuffer(myConf.dabMode);
shared_ptr<barrier> b(new barrier(2));
mySyncBarrier = b;
@@ -244,28 +244,26 @@ OutputUHD::~OutputUHD()
}
}
-void OutputUHD::SetDelayBuffer(unsigned int dabMode)
+int transmission_frame_duration_ms(unsigned int dabMode)
{
- // find out the duration of the transmission frame (Table 2 in ETSI 300 401)
switch (dabMode) {
- case 0: // could happen when called from constructor and we take the mode from ETI
- myTFDurationMs = 0;
- break;
- case 1:
- myTFDurationMs = 96;
- break;
- case 2:
- myTFDurationMs = 24;
- break;
- case 3:
- myTFDurationMs = 24;
- break;
- case 4:
- myTFDurationMs = 48;
- break;
+ // could happen when called from constructor and we take the mode from ETI
+ case 0: return 0;
+
+ case 1: return 96;
+ case 2: return 24;
+ case 3: return 24;
+ case 4: return 48;
default:
throw std::runtime_error("OutputUHD: invalid DAB mode");
}
+}
+
+void OutputUHD::SetDelayBuffer(unsigned int dabMode)
+{
+ // find out the duration of the transmission frame (Table 2 in ETSI 300 401)
+ myTFDurationMs = transmission_frame_duration_ms(dabMode);
+
// The buffer size equals the number of samples per transmission frame so
// we calculate it by multiplying the duration of the transmission frame
// with the samplerate.
@@ -283,8 +281,23 @@ int OutputUHD::process(Buffer* dataIn, Buffer* dataOut)
// the first buffer
// We will only wait on the barrier on the subsequent calls to
// OutputUHD::process
- if (first_run) {
- myLogger->level(debug) << "OutputUHD: UHD initialising...";
+ if (not gps_fix_verified) {
+ if (uwd.check_gpsfix) {
+ initial_gps_check();
+
+ if (num_checks_without_gps_fix == 0) {
+ set_usrp_time();
+ gps_fix_verified = true;
+ myMuting = false;
+ }
+ }
+ else {
+ gps_fix_verified = true;
+ myMuting = false;
+ }
+ }
+ else if (first_run) {
+ etiLog.level(debug) << "OutputUHD: UHD initialising...";
worker.start(&uwd);
@@ -319,17 +332,28 @@ int OutputUHD::process(Buffer* dataIn, Buffer* dataOut)
lastLen = uwd.bufsize;
first_run = false;
- myLogger->level(debug) << "OutputUHD: UHD initialising complete";
+ etiLog.level(debug) << "OutputUHD: UHD initialising complete";
}
else {
if (lastLen != dataIn->getLength()) {
// I expect that this never happens.
- myLogger->level(emerg) <<
+ etiLog.level(emerg) <<
"OutputUHD: Fatal error, input length changed from " << lastLen <<
" to " << dataIn->getLength();
throw std::runtime_error("Non-constant input length!");
}
+
+ if (uwd.check_gpsfix) {
+ try {
+ check_gps();
+ }
+ catch (std::runtime_error& e) {
+ uwd.running = false;
+ etiLog.level(error) << e.what();
+ }
+ }
+
mySyncBarrier.get()->wait();
if (!uwd.running) {
@@ -339,7 +363,7 @@ int OutputUHD::process(Buffer* dataIn, Buffer* dataOut)
throw fct_discontinuity_error();
}
else {
- myLogger->level(error) <<
+ etiLog.level(error) <<
"OutputUHD: Error, UHD worker failed";
throw std::runtime_error("UHD worker failed");
}
@@ -384,71 +408,226 @@ int OutputUHD::process(Buffer* dataIn, Buffer* dataOut)
}
return uwd.bufsize;
+}
+
+
+void OutputUHD::set_usrp_time()
+{
+ if (myConf.enableSync && (myConf.pps_src == "none")) {
+ etiLog.level(warn) <<
+ "OutputUHD: WARNING:"
+ " you are using synchronous transmission without PPS input!";
+
+ struct timespec now;
+ if (clock_gettime(CLOCK_REALTIME, &now)) {
+ perror("OutputUHD:Error: could not get time: ");
+ etiLog.level(error) << "OutputUHD: could not get time";
+ }
+ else {
+ myUsrp->set_time_now(uhd::time_spec_t(now.tv_sec));
+ etiLog.level(info) << "OutputUHD: Setting USRP time to " <<
+ uhd::time_spec_t(now.tv_sec).get_real_secs();
+ }
+ }
+
+ if (myConf.pps_src != "none") {
+ /* handling time for synchronisation: wait until the next full
+ * second, and set the USRP time at next PPS */
+ struct timespec now;
+ time_t seconds;
+ if (clock_gettime(CLOCK_REALTIME, &now)) {
+ etiLog.level(error) << "OutputUHD: could not get time :" <<
+ strerror(errno);
+ throw std::runtime_error("OutputUHD: could not get time.");
+ }
+ else {
+ seconds = now.tv_sec;
+
+ MDEBUG("OutputUHD:sec+1: %ld ; now: %ld ...\n", seconds+1, now.tv_sec);
+ while (seconds + 1 > now.tv_sec) {
+ usleep(1);
+ if (clock_gettime(CLOCK_REALTIME, &now)) {
+ etiLog.level(error) << "OutputUHD: could not get time :" <<
+ strerror(errno);
+ throw std::runtime_error("OutputUHD: could not get time.");
+ }
+ }
+ MDEBUG("OutputUHD:sec+1: %ld ; now: %ld ...\n", seconds+1, now.tv_sec);
+ /* We are now shortly after the second change. */
+
+ usleep(200000); // 200ms, we want the PPS to be later
+ myUsrp->set_time_unknown_pps(uhd::time_spec_t(seconds + 2));
+ etiLog.level(info) << "OutputUHD: Setting USRP time next pps to " <<
+ uhd::time_spec_t(seconds + 2).get_real_secs();
+ }
+
+ usleep(1e6);
+ etiLog.log(info, "OutputUHD: USRP time %f\n",
+ myUsrp->get_time_now().get_real_secs());
+ }
+}
+
+void OutputUHD::initial_gps_check()
+{
+ if (first_gps_fix_check.tv_sec == 0) {
+ etiLog.level(info) << "Waiting for GPS fix";
+
+ if (clock_gettime(CLOCK_MONOTONIC, &first_gps_fix_check) != 0) {
+ stringstream ss;
+ ss << "clock_gettime failure: " << strerror(errno);
+ throw std::runtime_error(ss.str());
+ }
+ }
+
+ check_gps();
+
+ if (last_gps_fix_check.tv_sec >
+ first_gps_fix_check.tv_sec + initial_gps_fix_wait) {
+ stringstream ss;
+ ss << "GPS did not show time lock in " << initial_gps_fix_wait << " seconds";
+ throw std::runtime_error(ss.str());
+ }
+
+ if (time_last_frame.tv_sec == 0) {
+ if (clock_gettime(CLOCK_MONOTONIC, &time_last_frame) != 0) {
+ stringstream ss;
+ ss << "clock_gettime failure: " << strerror(errno);
+ throw std::runtime_error(ss.str());
+ }
+ }
+
+ struct timespec now;
+ if (clock_gettime(CLOCK_MONOTONIC, &now) != 0) {
+ stringstream ss;
+ ss << "clock_gettime failure: " << strerror(errno);
+ throw std::runtime_error(ss.str());
+ }
+
+ long delta_us = timespecdiff_us(time_last_frame, now);
+ long wait_time_us = transmission_frame_duration_ms(myConf.dabMode);
+
+ if (wait_time_us - delta_us > 0) {
+ usleep(wait_time_us - delta_us);
+ }
+
+ time_last_frame.tv_nsec += wait_time_us * 1000;
+ if (time_last_frame.tv_nsec >= 1000000000L) {
+ time_last_frame.tv_nsec -= 1000000000L;
+ time_last_frame.tv_sec++;
+ }
+}
+
+void OutputUHD::check_gps()
+{
+ struct timespec time_now;
+ if (clock_gettime(CLOCK_MONOTONIC, &time_now) != 0) {
+ stringstream ss;
+ ss << "clock_gettime failure: " << strerror(errno);
+ throw std::runtime_error(ss.str());
+ }
+
+ // Divide interval by two because we alternate between
+ // launch and check
+ if (uwd.check_gpsfix and
+ last_gps_fix_check.tv_sec + gps_fix_check_interval/2.0 <
+ time_now.tv_sec) {
+ last_gps_fix_check = time_now;
+
+ // Alternate between launching thread and checking the
+ // result.
+ if (gps_fix_task.joinable()) {
+ if (gps_fix_future.has_value()) {
+ gps_fix_future.wait();
+
+ gps_fix_task.join();
+
+ if (not gps_fix_future.get()) {
+ if (num_checks_without_gps_fix == 0) {
+ etiLog.level(alert) <<
+ "OutputUHD: GPS Time Lock lost";
+ }
+ num_checks_without_gps_fix++;
+ }
+ else {
+ if (num_checks_without_gps_fix) {
+ etiLog.level(info) <<
+ "OutputUHD: GPS Time Lock recovered";
+ }
+ num_checks_without_gps_fix = 0;
+ }
+
+ if (gps_fix_check_interval * num_checks_without_gps_fix >
+ myConf.maxGPSHoldoverTime) {
+ std::stringstream ss;
+ ss << "Lost GPS Time Lock for " << gps_fix_check_interval *
+ num_checks_without_gps_fix << " seconds";
+ throw std::runtime_error(ss.str());
+ }
+ }
+ }
+ else {
+ // Checking the sensor here takes too much
+ // time, it has to be done in a separate thread.
+ gps_fix_pt = boost::packaged_task<bool>(
+ boost::bind(check_gps_timelock, myUsrp) );
+
+ gps_fix_future = gps_fix_pt.get_future();
+
+ gps_fix_task = boost::thread(boost::move(gps_fix_pt));
+ }
+ }
}
+//============================ UHD Worker ========================
+
void UHDWorker::process_errhandler()
{
try {
process();
}
catch (fct_discontinuity_error& e) {
- uwd->logger->level(warn) << e.what();
+ etiLog.level(warn) << e.what();
uwd->failed_due_to_fct = true;
}
uwd->running = false;
uwd->sync_barrier.get()->wait();
- uwd->logger->level(warn) << "UHD worker terminated";
+ etiLog.level(warn) << "UHD worker terminated";
}
void UHDWorker::process()
{
- int workerbuffer = 0;
- time_t tx_second = 0;
- double pps_offset = 0;
- double last_pps = 2.0;
- double usrp_time;
-
- //const struct timespec hundred_nano = {0, 100};
-
- size_t sizeIn;
- struct UHDWorkerFrameData* frame;
-
- size_t num_acc_samps; //number of accumulated samples
- //int write_fail_count;
-
- // Transmit timeout
- const double timeout = 0.2;
+ int workerbuffer = 0;
+ tx_second = 0;
+ pps_offset = 0.0;
+ last_pps = 2.0;
#if FAKE_UHD == 0
uhd::stream_args_t stream_args("fc32"); //complex floats
- uhd::tx_streamer::sptr myTxStream = uwd->myUsrp->get_tx_stream(stream_args);
- size_t usrp_max_num_samps = myTxStream->get_max_num_samps();
-#else
- size_t usrp_max_num_samps = 2048; // arbitrarily chosen
+ myTxStream = uwd->myUsrp->get_tx_stream(stream_args);
#endif
- const complexf* in;
-
- uhd::tx_metadata_t md;
md.start_of_burst = false;
- md.end_of_burst = false;
+ md.end_of_burst = false;
- int expected_next_fct = -1;
+ expected_next_fct = -1;
+
+ num_underflows = 0;
+ num_late_packets = 0;
while (uwd->running) {
- bool fct_discontinuity = false;
- md.has_time_spec = false;
- md.time_spec = uhd::time_spec_t(0.0);
- num_acc_samps = 0;
- //write_fail_count = 0;
+ fct_discontinuity = false;
+ md.has_time_spec = false;
+ md.time_spec = uhd::time_spec_t(0.0);
/* Wait for barrier */
// this wait will hopefully always be the second one
// because modulation should be quicker than transmission
uwd->sync_barrier.get()->wait();
+ struct UHDWorkerFrameData* frame;
+
if (workerbuffer == 0) {
frame = &(uwd->frame0);
}
@@ -460,248 +639,240 @@ void UHDWorker::process()
"UHDWorker.process: workerbuffer is neither 0 nor 1 !");
}
- in = reinterpret_cast<const complexf*>(frame->buf);
- pps_offset = frame->ts.timestamp_pps_offset;
+ handle_frame(frame);
- // Tx second from MNSC
- tx_second = frame->ts.timestamp_sec;
+ // swap buffers
+ workerbuffer = (workerbuffer + 1) % 2;
+ }
+}
- sizeIn = uwd->bufsize / sizeof(complexf);
+void UHDWorker::handle_frame(const struct UHDWorkerFrameData *frame)
+{
+ // Transmit timeout
+ static const double tx_timeout = 20.0;
- /* Verify that the FCT value is correct. If we miss one transmission
- * frame we must interrupt UHD and resync to the timestamps
- */
- if (frame->ts.fct == -1) {
- uwd->logger->level(info) <<
- "OutputUHD: dropping one frame with invalid FCT";
- goto loopend;
- }
- if (expected_next_fct != -1) {
- if (expected_next_fct != (int)frame->ts.fct) {
- uwd->logger->level(warn) <<
- "OutputUHD: Incorrect expect fct " << frame->ts.fct <<
- ", expected " << expected_next_fct;
+ pps_offset = frame->ts.timestamp_pps_offset;
- fct_discontinuity = true;
- throw fct_discontinuity_error();
- }
- }
+ // Tx second from MNSC
+ tx_second = frame->ts.timestamp_sec;
- expected_next_fct = (frame->ts.fct + uwd->fct_increment) % 250;
+ /* Verify that the FCT value is correct. If we miss one transmission
+ * frame we must interrupt UHD and resync to the timestamps
+ */
+ if (frame->ts.fct == -1) {
+ etiLog.level(info) <<
+ "OutputUHD: dropping one frame with invalid FCT";
+ return;
+ }
+ if (expected_next_fct != -1) {
+ if (expected_next_fct != (int)frame->ts.fct) {
+ etiLog.level(warn) <<
+ "OutputUHD: Incorrect expect fct " << frame->ts.fct <<
+ ", expected " << expected_next_fct;
+
+ fct_discontinuity = true;
+ throw fct_discontinuity_error();
+ }
+ }
- // Check for ref_lock
- if (uwd->check_refclk_loss)
- {
- try {
- // TODO: Is this check specific to the B100 and USRP2 ?
- if (! uwd->myUsrp->get_mboard_sensor("ref_locked", 0).to_bool()) {
- uwd->logger->log(alert,
- "OutputUHD: External reference clock lock lost !");
- if (uwd->refclk_lock_loss_behaviour == CRASH) {
- throw std::runtime_error(
- "OutputUHD: External reference clock lock lost.");
- }
+ expected_next_fct = (frame->ts.fct + uwd->fct_increment) % 250;
+
+ // Check for ref_lock
+ if (uwd->check_refclk_loss) {
+ try {
+ // TODO: Is this check specific to the B100 and USRP2 ?
+ if (! uwd->myUsrp->get_mboard_sensor("ref_locked", 0).to_bool()) {
+ etiLog.log(alert,
+ "OutputUHD: External reference clock lock lost !");
+ if (uwd->refclk_lock_loss_behaviour == CRASH) {
+ throw std::runtime_error(
+ "OutputUHD: External reference clock lock lost.");
}
}
- catch (uhd::lookup_error &e) {
- uwd->check_refclk_loss = false;
- uwd->logger->log(warn,
- "OutputUHD: This USRP does not have mboard sensor for ext clock loss."
- " Check disabled.");
- }
}
+ catch (uhd::lookup_error &e) {
+ uwd->check_refclk_loss = false;
+ etiLog.log(warn,
+ "OutputUHD: This USRP does not have mboard sensor for ext clock loss."
+ " Check disabled.");
+ }
+ }
- usrp_time = uwd->myUsrp->get_time_now().get_real_secs();
-
- if (uwd->sourceContainsTimestamp) {
- if (!frame->ts.timestamp_valid) {
- /* We have not received a full timestamp through
- * MNSC. We sleep through the frame.
- */
- uwd->logger->level(info) <<
- "OutputUHD: Throwing sample " << frame->ts.fct <<
- " away: incomplete timestamp " << tx_second <<
- " + " << pps_offset;
- usleep(20000); //TODO should this be TM-dependant ?
- goto loopend;
- }
-
- md.has_time_spec = true;
- md.time_spec = uhd::time_spec_t(tx_second, pps_offset);
-
- // md is defined, let's do some checks
- if (md.time_spec.get_real_secs() + timeout < usrp_time) {
- uwd->logger->level(warn) <<
- "OutputUHD: Timestamp in the past! offset: " <<
- md.time_spec.get_real_secs() - usrp_time <<
- " (" << usrp_time << ")"
- " frame " << frame->ts.fct <<
- ", tx_second " << tx_second <<
- ", pps " << pps_offset;
- goto loopend; //skip the frame
- }
+ double usrp_time = uwd->myUsrp->get_time_now().get_real_secs();
-#if 0 // Let uhd handle this
- if (md.time_spec.get_real_secs() > usrp_time + TIMESTAMP_MARGIN_FUTURE) {
- uwd->logger->level(warn) <<
- "OutputUHD: Timestamp too far in the future! offset: " <<
- md.time_spec.get_real_secs() - usrp_time;
- usleep(20000); //sleep so as to fill buffers
- }
-#endif
- if (md.time_spec.get_real_secs() > usrp_time + TIMESTAMP_ABORT_FUTURE) {
- uwd->logger->level(error) <<
- "OutputUHD: Timestamp way too far in the future! offset: " <<
- md.time_spec.get_real_secs() - usrp_time;
- throw std::runtime_error("Timestamp error. Aborted.");
- }
+ if (uwd->sourceContainsTimestamp) {
+ if (!frame->ts.timestamp_valid) {
+ /* We have not received a full timestamp through
+ * MNSC. We sleep through the frame.
+ */
+ etiLog.level(info) <<
+ "OutputUHD: Throwing sample " << frame->ts.fct <<
+ " away: incomplete timestamp " << tx_second <<
+ " + " << pps_offset;
+ usleep(20000); //TODO should this be TM-dependant ?
+ return;
+ }
- if (last_pps > pps_offset) {
- uwd->logger->log(info,
- "OutputUHD (usrp time: %f): frame %d;"
- " tx_second %zu; pps %.9f\n",
- usrp_time,
- frame->ts.fct, tx_second, pps_offset);
- }
+ md.has_time_spec = true;
+ md.time_spec = uhd::time_spec_t(tx_second, pps_offset);
+
+ // md is defined, let's do some checks
+ if (md.time_spec.get_real_secs() + tx_timeout < usrp_time) {
+ etiLog.level(warn) <<
+ "OutputUHD: Timestamp in the past! offset: " <<
+ md.time_spec.get_real_secs() - usrp_time <<
+ " (" << usrp_time << ")"
+ " frame " << frame->ts.fct <<
+ ", tx_second " << tx_second <<
+ ", pps " << pps_offset;
+ return;
+ }
+ if (md.time_spec.get_real_secs() > usrp_time + TIMESTAMP_ABORT_FUTURE) {
+ etiLog.level(error) <<
+ "OutputUHD: Timestamp way too far in the future! offset: " <<
+ md.time_spec.get_real_secs() - usrp_time;
+ throw std::runtime_error("Timestamp error. Aborted.");
}
- else { // !uwd->sourceContainsTimestamp
- if (uwd->muting || uwd->muteNoTimestamps) {
- /* There was some error decoding the timestamp
- */
- if (uwd->muting) {
- uwd->logger->log(info,
- "OutputUHD: Muting sample %d requested\n",
- frame->ts.fct);
- }
- else {
- uwd->logger->log(info,
- "OutputUHD: Muting sample %d : no timestamp\n",
- frame->ts.fct);
- }
- usleep(20000);
- goto loopend;
+ }
+ else { // !uwd->sourceContainsTimestamp
+ if (uwd->muting || uwd->muteNoTimestamps) {
+ /* There was some error decoding the timestamp
+ */
+ if (uwd->muting) {
+ etiLog.log(info,
+ "OutputUHD: Muting sample %d requested\n",
+ frame->ts.fct);
}
+ else {
+ etiLog.log(info,
+ "OutputUHD: Muting sample %d : no timestamp\n",
+ frame->ts.fct);
+ }
+ usleep(20000);
+ return;
}
+ }
- PDEBUG("UHDWorker::process:max_num_samps: %zu.\n",
- usrp_max_num_samps);
+ tx_frame(frame);
- while (uwd->running && !uwd->muting && (num_acc_samps < sizeIn)) {
- size_t samps_to_send = std::min(sizeIn - num_acc_samps, usrp_max_num_samps);
+ if (last_pps > pps_offset) {
+ if (num_underflows or num_late_packets) {
+ etiLog.log(info,
+ "OutputUHD status (usrp time: %f): "
+ "%d underruns and %d late packets since last status.\n",
+ usrp_time,
+ num_underflows, num_late_packets);
+ }
+ num_underflows = 0;
+ num_late_packets = 0;
+ }
- //ensure the the last packet has EOB set if the timestamps has been
- //refreshed and need to be reconsidered.
- //Also, if we saw that the FCT did not increment as expected, which
- //could be due to a lost incoming packet.
- md.end_of_burst = (
- uwd->sourceContainsTimestamp &&
- (frame->ts.timestamp_refresh || fct_discontinuity) &&
- samps_to_send <= usrp_max_num_samps );
+ last_pps = pps_offset;
+}
+void UHDWorker::tx_frame(const struct UHDWorkerFrameData *frame)
+{
+ const double tx_timeout = 20.0;
+ const size_t sizeIn = uwd->bufsize / sizeof(complexf);
+ const complexf* in_data = reinterpret_cast<const complexf*>(frame->buf);
-#if FAKE_UHD
- // This is probably very approximate
- usleep( (1000000 / uwd->sampleRate) * samps_to_send);
- size_t num_tx_samps = samps_to_send;
+#if FAKE_UHD == 0
+ size_t usrp_max_num_samps = myTxStream->get_max_num_samps();
#else
- //send a single packet
- size_t num_tx_samps = myTxStream->send(
- &in[num_acc_samps],
- samps_to_send, md, timeout);
+ size_t usrp_max_num_samps = 2048; // arbitrarily chosen
#endif
- num_acc_samps += num_tx_samps;
+ size_t num_acc_samps = 0; //number of accumulated samples
+ while (uwd->running && !uwd->muting && (num_acc_samps < sizeIn)) {
+ size_t samps_to_send = std::min(sizeIn - num_acc_samps, usrp_max_num_samps);
- md.time_spec = uhd::time_spec_t(tx_second, pps_offset)
- + uhd::time_spec_t(0, num_acc_samps/uwd->sampleRate);
+ //ensure the the last packet has EOB set if the timestamps has been
+ //refreshed and need to be reconsidered.
+ //Also, if we saw that the FCT did not increment as expected, which
+ //could be due to a lost incoming packet.
+ md.end_of_burst = (
+ uwd->sourceContainsTimestamp &&
+ (frame->ts.timestamp_refresh || fct_discontinuity) &&
+ samps_to_send <= usrp_max_num_samps );
- /*
- fprintf(stderr, "*** pps_offset %f, md.time_spec %f, usrp->now %f\n",
- pps_offset,
- md.time_spec.get_real_secs(),
- uwd->myUsrp->get_time_now().get_real_secs());
- // */
-
- if (num_tx_samps == 0) {
-#if 1
- uwd->logger->log(warn,
- "UHDWorker::process() unable to write to device, skipping frame!\n");
- break;
+#if FAKE_UHD
+ // This is probably very approximate
+ usleep( (1000000 / uwd->sampleRate) * samps_to_send);
+ size_t num_tx_samps = samps_to_send;
#else
- // This has been disabled, because if there is a write failure,
- // we'd better not insist and try to go on transmitting future
- // frames.
- // The goal is not to try to send by all means possible. It's
- // more important to make sure the SFN is not disturbed.
-
- fprintf(stderr, "F");
- nanosleep(&hundred_nano, NULL);
- write_fail_count++;
- if (write_fail_count >= 3) {
- double ts = md.time_spec.get_real_secs();
- double t_usrp = uwd->myUsrp->get_time_now().get_real_secs();
-
- fprintf(stderr, "*** USRP write fail count %d\n", write_fail_count);
- fprintf(stderr, "*** delta %f, md.time_spec %f, usrp->now %f\n",
- ts - t_usrp,
- ts, t_usrp);
-
- fprintf(stderr, "UHDWorker::process() unable to write to device, skipping frame!\n");
- break;
- }
+ //send a single packet
+ size_t num_tx_samps = myTxStream->send(
+ &in_data[num_acc_samps],
+ samps_to_send, md, tx_timeout);
#endif
- }
-#if FAKE_UHD == 0
- uhd::async_metadata_t async_md;
- if (uwd->myUsrp->get_device()->recv_async_msg(async_md, 0)) {
- const char* uhd_async_message = "";
- bool failure = true;
- switch (async_md.event_code) {
- case uhd::async_metadata_t::EVENT_CODE_BURST_ACK:
- failure = false;
- break;
- case uhd::async_metadata_t::EVENT_CODE_UNDERFLOW:
- uhd_async_message = "Underflow";
- break;
- case uhd::async_metadata_t::EVENT_CODE_SEQ_ERROR:
- uhd_async_message = "Packet loss between host and device.";
- break;
- case uhd::async_metadata_t::EVENT_CODE_TIME_ERROR:
- uhd_async_message = "Packet had time that was late.";
- break;
- case uhd::async_metadata_t::EVENT_CODE_UNDERFLOW_IN_PACKET:
- uhd_async_message = "Underflow occurred inside a packet.";
- break;
- case uhd::async_metadata_t::EVENT_CODE_SEQ_ERROR_IN_BURST:
- uhd_async_message = "Packet loss within a burst.";
- break;
- default:
- uhd_async_message = "unknown event code";
- break;
- }
+ num_acc_samps += num_tx_samps;
- if (failure) {
- uwd->logger->level(alert) << "Near frame " <<
- frame->ts.fct << ": Received Async UHD Message '" <<
- uhd_async_message << "'";
+ md.time_spec = uhd::time_spec_t(tx_second, pps_offset)
+ + uhd::time_spec_t(0, num_acc_samps/uwd->sampleRate);
- }
- }
-#endif
+ if (num_tx_samps == 0) {
+ etiLog.log(warn,
+ "UHDWorker::process() unable to write to device, skipping frame!\n");
+ break;
}
- last_pps = pps_offset;
+ print_async_metadata(frame);
+ }
+}
-loopend:
- // swap buffers
- workerbuffer = (workerbuffer + 1) % 2;
+void UHDWorker::print_async_metadata(const struct UHDWorkerFrameData *frame)
+{
+#if FAKE_UHD == 0
+ uhd::async_metadata_t async_md;
+ if (uwd->myUsrp->get_device()->recv_async_msg(async_md, 0)) {
+ const char* uhd_async_message = "";
+ bool failure = false;
+ switch (async_md.event_code) {
+ case uhd::async_metadata_t::EVENT_CODE_BURST_ACK:
+ break;
+ case uhd::async_metadata_t::EVENT_CODE_UNDERFLOW:
+ uhd_async_message = "Underflow";
+ num_underflows++;
+ break;
+ case uhd::async_metadata_t::EVENT_CODE_SEQ_ERROR:
+ uhd_async_message = "Packet loss between host and device.";
+ failure = true;
+ break;
+ case uhd::async_metadata_t::EVENT_CODE_TIME_ERROR:
+ uhd_async_message = "Packet had time that was late.";
+ num_late_packets++;
+ break;
+ case uhd::async_metadata_t::EVENT_CODE_UNDERFLOW_IN_PACKET:
+ uhd_async_message = "Underflow occurred inside a packet.";
+ failure = true;
+ break;
+ case uhd::async_metadata_t::EVENT_CODE_SEQ_ERROR_IN_BURST:
+ uhd_async_message = "Packet loss within a burst.";
+ failure = true;
+ break;
+ default:
+ uhd_async_message = "unknown event code";
+ failure = true;
+ break;
+ }
+
+ if (failure) {
+ etiLog.level(alert) << "Near frame " <<
+ frame->ts.fct << ": Received Async UHD Message '" <<
+ uhd_async_message << "'";
+
+ }
}
+#endif
}
+// =======================================
+// Remote Control for UHD
+// =======================================
void OutputUHD::set_parameter(const string& parameter, const string& value)
{
@@ -771,3 +942,4 @@ const string OutputUHD::get_parameter(const string& parameter) const
}
#endif // HAVE_OUTPUT_UHD
+
diff --git a/src/OutputUHD.h b/src/OutputUHD.h
index aed80f6..8234340 100644
--- a/src/OutputUHD.h
+++ b/src/OutputUHD.h
@@ -2,7 +2,7 @@
Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Her Majesty the
Queen in Right of Canada (Communications Research Center Canada)
- Copyright (C) 2014
+ Copyright (C) 2014, 2015
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
@@ -120,6 +120,9 @@ struct UHDWorkerData {
// If we want to verify loss of refclk
bool check_refclk_loss;
+ // If we want to check for the gps_timelock sensor
+ bool check_gpsfix;
+
// muting set by remote control
bool muting;
@@ -129,9 +132,6 @@ struct UHDWorkerData {
// What to do when the reference clock PLL loses lock
refclk_lock_loss_behaviour_t refclk_lock_loss_behaviour;
- // The common logger
- Logger* logger;
-
// What transmission mode we're using defines by how
// much the FCT should increment for each
// transmission frame.
@@ -156,14 +156,29 @@ class UHDWorker {
}
private:
- void process();
- void process_errhandler();
+ // Asynchronous message statistics
+ int num_underflows;
+ int num_late_packets;
+ bool fct_discontinuity;
+ int expected_next_fct;
+ uhd::tx_metadata_t md;
+ time_t tx_second;
+ double pps_offset;
+ double last_pps;
+
+ void print_async_metadata(const struct UHDWorkerFrameData *frame);
+
+ void handle_frame(const struct UHDWorkerFrameData *frame);
+ void tx_frame(const struct UHDWorkerFrameData *frame);
struct UHDWorkerData *uwd;
boost::thread uhd_thread;
uhd::tx_streamer::sptr myTxStream;
+
+ void process();
+ void process_errhandler();
};
/* This structure is used as initial configuration for OutputUHD */
@@ -181,6 +196,7 @@ struct OutputUHDConfig {
bool enableSync;
bool muteNoTimestamps;
unsigned dabMode;
+ unsigned maxGPSHoldoverTime;
/* allowed values : auto, int, sma, mimo */
std::string refclk_src;
@@ -199,9 +215,7 @@ struct OutputUHDConfig {
class OutputUHD: public ModOutput, public RemoteControllable {
public:
- OutputUHD(
- const OutputUHDConfig& config,
- Logger *logger);
+ OutputUHD(const OutputUHDConfig& config);
~OutputUHD();
int process(Buffer* dataIn, Buffer* dataOut);
@@ -227,13 +241,16 @@ class OutputUHD: public ModOutput, public RemoteControllable {
protected:
- Logger *myLogger;
+ OutputUHD(const OutputUHD& other);
+ OutputUHD& operator=(const OutputUHD& other);
+
EtiReader *myEtiReader;
OutputUHDConfig myConf;
uhd::usrp::multi_usrp::sptr myUsrp;
boost::shared_ptr<boost::barrier> mySyncBarrier;
UHDWorker worker;
bool first_run;
+ bool gps_fix_verified;
struct UHDWorkerData uwd;
int activebuffer;
@@ -250,6 +267,27 @@ class OutputUHD: public ModOutput, public RemoteControllable {
int myTFDurationMs; // TF duration in milliseconds
std::vector<complexf> myDelayBuf;
size_t lastLen;
+
+ // GPS Fix check variables
+ int num_checks_without_gps_fix;
+ struct timespec first_gps_fix_check;
+ struct timespec last_gps_fix_check;
+ struct timespec time_last_frame;
+ boost::packaged_task<bool> gps_fix_pt;
+ boost::unique_future<bool> gps_fix_future;
+ boost::thread gps_fix_task;
+
+ // Wait time in seconds to get fix
+ static const int initial_gps_fix_wait = 180;
+
+ // Interval for checking the GPS at runtime
+ static const double gps_fix_check_interval = 10.0; // seconds
+
+ void check_gps();
+
+ void set_usrp_time();
+
+ void initial_gps_check();
};
#endif // HAVE_OUTPUT_UHD
diff --git a/src/RemoteControl.cpp b/src/RemoteControl.cpp
index 65da3b7..21a6c81 100644
--- a/src/RemoteControl.cpp
+++ b/src/RemoteControl.cpp
@@ -58,8 +58,10 @@ void RemoteControllerTelnet::restart_thread(long)
void RemoteControllerTelnet::process(long)
{
- m_welcome = "ODR-DabMod Remote Control CLI\nWrite 'help' for help.\n**********\n";
- m_prompt = "> ";
+ std::string m_welcome = "ODR-DabMod Remote Control CLI\n"
+ "Write 'help' for help.\n"
+ "**********\n";
+ std::string m_prompt = "> ";
std::string in_message;
size_t length;
@@ -308,7 +310,7 @@ void RemoteControllerZmq::process()
{
// create zmq reply socket for receiving ctrl parameters
zmq::socket_t repSocket(m_zmqContext, ZMQ_REP);
- std::cout << "Starting zmq remote control thread" << std::endl;
+ std::cerr << "Starting zmq remote control thread" << std::endl;
try
{
// connect the socket
@@ -342,10 +344,9 @@ void RemoteControllerZmq::process()
try
{
std::string value = get_param_(module, parameter);
- zmq::message_t *pMsg = new zmq::message_t(value.size());
- memcpy ((void*) pMsg->data(), value.data(), value.size());
- repSocket.send(*pMsg, 0);
- delete pMsg;
+ zmq::message_t msg(value.size());
+ memcpy ((void*) msg.data(), value.data(), value.size());
+ repSocket.send(&msg, 0);
}
catch (ParameterError &err)
{
diff --git a/src/RemoteControl.h b/src/RemoteControl.h
index 89a1583..1b5e447 100644
--- a/src/RemoteControl.h
+++ b/src/RemoteControl.h
@@ -34,7 +34,7 @@
#endif
#if defined(HAVE_ZEROMQ)
-#include <zmq.hpp>
+#include "zmq.hpp"
#endif
#include <list>
@@ -50,6 +50,7 @@
#include <boost/thread.hpp>
#include <stdexcept>
+#include "Log.h"
#define RC_ADD_PARAMETER(p, desc) { \
std::vector<std::string> p; \
@@ -114,8 +115,8 @@ class RemoteControllers {
it != m_controllers.end(); ++it) {
if ((*it)->fault_detected())
{
- fprintf(stderr,
- "Detected Remote Control fault, restarting it\n");
+ etiLog.level(warn) <<
+ "Detected Remote Control fault, restarting it";
(*it)->restart();
}
}
@@ -289,9 +290,6 @@ class RemoteControllerTelnet : public BaseRemoteController {
/* This controller commands the controllables in the cohort */
std::list<RemoteControllable*> m_cohort;
- std::string m_welcome;
- std::string m_prompt;
-
int m_port;
};
diff --git a/src/SignalMultiplexer.cpp b/src/SignalMultiplexer.cpp
index c5be552..8edcdc2 100644
--- a/src/SignalMultiplexer.cpp
+++ b/src/SignalMultiplexer.cpp
@@ -46,6 +46,7 @@ SignalMultiplexer::~SignalMultiplexer()
// dataIn[0] -> null symbol
// dataIn[1] -> MSC symbols
+// dataIn[2] -> (optional) TII symbol
int SignalMultiplexer::process(std::vector<Buffer*> dataIn, Buffer* dataOut)
{
#ifdef DEBUG
@@ -60,10 +61,17 @@ int SignalMultiplexer::process(std::vector<Buffer*> dataIn, Buffer* dataOut)
fprintf(stderr, ", dataOut: %p, sizeOut: %zu)\n", dataOut, dataOut->getLength());
#endif
- assert(dataIn.size() == 2);
+ assert(dataIn.size() == 2 or dataIn.size() == 3);
- *dataOut = *dataIn[0];
- *dataOut += *dataIn[1];
+ if (dataIn.size() == 2) {
+ *dataOut = *dataIn[0];
+ *dataOut += *dataIn[1];
+ }
+ else if (dataIn.size() == 3) {
+ *dataOut = *dataIn[2];
+ *dataOut += *dataIn[1];
+ }
return dataOut->getLength();
}
+
diff --git a/src/TII.cpp b/src/TII.cpp
new file mode 100644
index 0000000..6d969a5
--- /dev/null
+++ b/src/TII.cpp
@@ -0,0 +1,367 @@
+/*
+ Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty
+ the Queen in Right of Canada (Communications Research Center Canada)
+
+ Copyright (C) 2015
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://opendigitalradio.org
+ */
+/*
+ This file is part of ODR-DabMod.
+
+ ODR-DabMod is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as
+ published by the Free Software Foundation, either version 3 of the
+ License, or (at your option) any later version.
+
+ ODR-DabMod is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with ODR-DabMod. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "TII.h"
+#include "PcDebug.h"
+
+#include <stdio.h>
+#include <stdexcept>
+#include <string.h>
+
+typedef std::complex<float> complexf;
+
+/* TII pattern for TM I, II, IV */
+const int pattern_tm1_2_4[][8] = { // {{{
+ {0,0,0,0,1,1,1,1},
+ {0,0,0,1,0,1,1,1},
+ {0,0,0,1,1,0,1,1},
+ {0,0,0,1,1,1,0,1},
+ {0,0,0,1,1,1,1,0},
+ {0,0,1,0,0,1,1,1},
+ {0,0,1,0,1,0,1,1},
+ {0,0,1,0,1,1,0,1},
+ {0,0,1,0,1,1,1,0},
+ {0,0,1,1,0,0,1,1},
+ {0,0,1,1,0,1,0,1},
+ {0,0,1,1,0,1,1,0},
+ {0,0,1,1,1,0,0,1},
+ {0,0,1,1,1,0,1,0},
+ {0,0,1,1,1,1,0,0},
+ {0,1,0,0,0,1,1,1},
+ {0,1,0,0,1,0,1,1},
+ {0,1,0,0,1,1,0,1},
+ {0,1,0,0,1,1,1,0},
+ {0,1,0,1,0,0,1,1},
+ {0,1,0,1,0,1,0,1},
+ {0,1,0,1,0,1,1,0},
+ {0,1,0,1,1,0,0,1},
+ {0,1,0,1,1,0,1,0},
+ {0,1,0,1,1,1,0,0},
+ {0,1,1,0,0,0,1,1},
+ {0,1,1,0,0,1,0,1},
+ {0,1,1,0,0,1,1,0},
+ {0,1,1,0,1,0,0,1},
+ {0,1,1,0,1,0,1,0},
+ {0,1,1,0,1,1,0,0},
+ {0,1,1,1,0,0,0,1},
+ {0,1,1,1,0,0,1,0},
+ {0,1,1,1,0,1,0,0},
+ {0,1,1,1,1,0,0,0},
+ {1,0,0,0,0,1,1,1},
+ {1,0,0,0,1,0,1,1},
+ {1,0,0,0,1,1,0,1},
+ {1,0,0,0,1,1,1,0},
+ {1,0,0,1,0,0,1,1},
+ {1,0,0,1,0,1,0,1},
+ {1,0,0,1,0,1,1,0},
+ {1,0,0,1,1,0,0,1},
+ {1,0,0,1,1,0,1,0},
+ {1,0,0,1,1,1,0,0},
+ {1,0,1,0,0,0,1,1},
+ {1,0,1,0,0,1,0,1},
+ {1,0,1,0,0,1,1,0},
+ {1,0,1,0,1,0,0,1},
+ {1,0,1,0,1,0,1,0},
+ {1,0,1,0,1,1,0,0},
+ {1,0,1,1,0,0,0,1},
+ {1,0,1,1,0,0,1,0},
+ {1,0,1,1,0,1,0,0},
+ {1,0,1,1,1,0,0,0},
+ {1,1,0,0,0,0,1,1},
+ {1,1,0,0,0,1,0,1},
+ {1,1,0,0,0,1,1,0},
+ {1,1,0,0,1,0,0,1},
+ {1,1,0,0,1,0,1,0},
+ {1,1,0,0,1,1,0,0},
+ {1,1,0,1,0,0,0,1},
+ {1,1,0,1,0,0,1,0},
+ {1,1,0,1,0,1,0,0},
+ {1,1,0,1,1,0,0,0},
+ {1,1,1,0,0,0,0,1},
+ {1,1,1,0,0,0,1,0},
+ {1,1,1,0,0,1,0,0},
+ {1,1,1,0,1,0,0,0},
+ {1,1,1,1,0,0,0,0} }; // }}}
+
+TII::TII(unsigned int dabmode, const tii_config_t& tii_config) :
+ ModCodec(ModFormat(0), ModFormat(0)),
+ RemoteControllable("tii"),
+ m_dabmode(dabmode),
+ m_enable(tii_config.enable),
+ m_comb(tii_config.comb),
+ m_pattern(tii_config.pattern),
+ m_insert(true)
+{
+ PDEBUG("TII::TII(%u) @ %p\n", dabmode, this);
+
+ RC_ADD_PARAMETER(enable, "enable TII [0-1]");
+ RC_ADD_PARAMETER(comb, "TII comb number [0-23]");
+ RC_ADD_PARAMETER(pattern, "TII pattern number [0-69]");
+
+ switch (m_dabmode) {
+ case 1:
+ m_carriers = 1536;
+
+ if (not(0 <= m_pattern and m_pattern <= 69) ) {
+ throw std::runtime_error(
+ "TII::TII pattern not valid!");
+ }
+ break;
+ case 2:
+ m_carriers = 384;
+
+ if (not(0 <= m_pattern and m_pattern <= 69) ) {
+ throw std::runtime_error(
+ "TII::TII pattern not valid!");
+ }
+ break;
+ /* unsupported
+ case 3:
+ m_carriers = 192;
+ break;
+ case 4:
+ d_dabmode = 0;
+ case 0:
+ */
+ default:
+ std::stringstream ss_exception;
+ ss_exception <<
+ "TII::TII DAB mode " << m_dabmode << " not valid!";
+ throw std::runtime_error(ss_exception.str());
+ }
+
+ if (not(0 <= m_comb and m_comb <= 23) ) {
+ throw std::runtime_error(
+ "TII::TII comb not valid!");
+ }
+
+ m_dataIn.clear();
+ m_dataIn.resize(m_carriers);
+ prepare_pattern();
+
+ myOutputFormat.size(m_carriers * sizeof(complexf));
+}
+
+
+TII::~TII()
+{
+ PDEBUG("TII::~TII() @ %p\n", this);
+}
+
+const char* TII::name()
+{
+ // Calculate name on demand because comb and pattern are
+ // modifiable through RC
+ std::stringstream ss;
+ ss << "TII(comb:" << m_comb << ", pattern:" << m_pattern << ")";
+ m_name = ss.str();
+
+ return m_name.c_str();
+}
+
+
+int TII::process(Buffer* const dataIn, Buffer* dataOut)
+{
+ PDEBUG("TII::process(dataIn: %p, dataOut: %p)\n",
+ dataIn, dataOut);
+
+ if ((dataIn != NULL) && (dataIn->getLength() != 0)) {
+ throw std::runtime_error(
+ "TII::process input size not valid!");
+ }
+
+ if (m_enable and m_insert) {
+ boost::mutex::scoped_lock lock(m_dataIn_mutex);
+ dataOut->setData(&m_dataIn[0], m_carriers * sizeof(complexf));
+ }
+ else {
+ dataOut->setLength(m_carriers * sizeof(complexf));
+ bzero(dataOut->getData(), dataOut->getLength());
+ }
+
+ // TODO wrong! Must align with frames containing the right data
+ m_insert = not m_insert;
+
+ return 1;
+}
+
+void TII::enable_carrier(int k) {
+ int ix = m_carriers/2 + k;
+
+ if (ix < 0 or ix+1 >= (ssize_t)m_dataIn.size()) {
+ throw std::runtime_error(
+ "TII::enable_carrier invalid k!");
+ }
+
+ // TODO power of the carrier ?
+ m_dataIn.at(ix) = 1.0;
+ m_dataIn.at(ix+1) = 1.0; // TODO verify if +1 is really correct
+}
+
+void TII::prepare_pattern() {
+ int comb = m_comb; // Convert from unsigned to signed
+
+ boost::mutex::scoped_lock lock(m_dataIn_mutex);
+
+ // Clear previous pattern
+ for (size_t i = 0; i < m_dataIn.size(); i++) {
+ m_dataIn[i] = 0.0;
+ }
+
+ // This could be written more efficiently, but since it is
+ // not performance-critial, it makes sense to write it
+ // in the same way as the specification in
+ // ETSI EN 300 401 Clause 14.8
+ if (m_dabmode == 1) {
+ for (int k = -768; k < -384; k++) {
+ for (int b = 0; b < 8; b++) {
+ if ( k == -768 + 2 * comb + 48 * b and
+ pattern_tm1_2_4[m_pattern][b]) {
+ enable_carrier(k);
+ }
+ }
+ }
+
+ for (int k = -384; k < -0; k++) {
+ for (int b = 0; b < 8; b++) {
+ if ( k == -384 + 2 * comb + 48 * b and
+ pattern_tm1_2_4[m_pattern][b]) {
+ enable_carrier(k);
+ }
+ }
+ }
+
+ for (int k = 1; k <= 384; k++) {
+ for (int b = 0; b < 8; b++) {
+ if ( k == 1 + 2 * comb + 48 * b and
+ pattern_tm1_2_4[m_pattern][b]) {
+ enable_carrier(k);
+ }
+ }
+ }
+
+ for (int k = 384; k <= 768; k++) {
+ for (int b = 0; b < 8; b++) {
+ if ( k == 385 + 2 * comb + 48 * b and
+ pattern_tm1_2_4[m_pattern][b]) {
+ enable_carrier(k);
+ }
+ }
+ }
+ }
+ else if (m_dabmode == 2) {
+ for (int k = -192; k <= 192; k++) {
+ for (int b = 0; b < 4; b++) {
+ if ( k == -192 + 2 * comb + 48 * b and
+ pattern_tm1_2_4[m_pattern][b]) {
+ enable_carrier(k);
+ }
+ }
+
+ for (int b = 4; b < 8; b++) {
+ if ( k == -191 + 2 * comb + 48 * b and
+ pattern_tm1_2_4[m_pattern][b]) {
+ enable_carrier(k);
+ }
+ }
+ }
+ }
+ else {
+ throw std::runtime_error(
+ "TII::TII DAB mode not valid!");
+ }
+}
+
+void TII::set_parameter(const std::string& parameter, const std::string& value)
+{
+ using namespace std;
+ stringstream ss(value);
+ ss.exceptions ( stringstream::failbit | stringstream::badbit );
+
+ if (parameter == "enable") {
+ ss >> m_enable;
+ }
+ else if (parameter == "pattern") {
+ int new_pattern;
+ ss >> new_pattern;
+ if ( (m_dabmode == 1 or m_dabmode == 2) and
+ not(0 <= new_pattern and new_pattern <= 69) ) {
+ throw std::runtime_error(
+ "TII pattern not valid!");
+ }
+ m_pattern = new_pattern;
+ prepare_pattern();
+ }
+ else if (parameter == "comb") {
+ int new_comb;
+ ss >> new_comb;
+ if (not(0 <= new_comb and new_comb <= 23) ) {
+ throw std::runtime_error(
+ "TII comb not valid!");
+ }
+ m_comb = new_comb;
+ prepare_pattern();
+ }
+ else {
+ stringstream ss;
+ ss << "Parameter '" << parameter << "' is not exported by controllable " << get_rc_name();
+ throw ParameterError(ss.str());
+ }
+}
+
+const std::string TII::get_parameter(const std::string& parameter) const
+{
+ using namespace std;
+ stringstream ss;
+ if (parameter == "enable") {
+ ss << (m_enable ? 1 : 0);
+ }
+ else if (parameter == "pattern") {
+ ss << m_pattern;
+ }
+ else if (parameter == "comb") {
+ ss << m_comb;
+ }
+ else {
+ ss << "Parameter '" << parameter << "' is not exported by controllable " << get_rc_name();
+ throw ParameterError(ss.str());
+ }
+ return ss.str();
+}
+
+
+#ifdef TII_TEST
+int main(int argc, char** argv)
+{
+ const unsigned int mode = 2;
+ const unsigned int comb = 4;
+ const unsigned int pattern = 16;
+ TII tii(mode, comb, pattern);
+
+ return 0;
+}
+#endif
+
diff --git a/src/TII.h b/src/TII.h
new file mode 100644
index 0000000..b241bed
--- /dev/null
+++ b/src/TII.h
@@ -0,0 +1,105 @@
+/*
+ Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty
+ the Queen in Right of Canada (Communications Research Center Canada)
+
+ Copyright (C) 2015
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://opendigitalradio.org
+
+ TII generation according to ETSI EN 300 401 Clause 14.8
+ */
+/*
+ This file is part of ODR-DabMod.
+
+ ODR-DabMod is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as
+ published by the Free Software Foundation, either version 3 of the
+ License, or (at your option) any later version.
+
+ ODR-DabMod is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with ODR-DabMod. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef TII_H
+#define TII_H
+
+#ifdef HAVE_CONFIG_H
+# include <config.h>
+#endif
+
+#include "ModCodec.h"
+#include "RemoteControl.h"
+
+#include <boost/thread.hpp>
+#include <sys/types.h>
+#include <complex>
+#include <vector>
+#include <string>
+
+struct tii_config_t
+{
+ tii_config_t() : enable(false), comb(0), pattern(0) {}
+
+ bool enable;
+ int comb;
+ int pattern;
+};
+
+class TII : public ModCodec, public RemoteControllable
+{
+ public:
+ TII(unsigned int dabmode, const tii_config_t& tii_config);
+ virtual ~TII();
+
+ int process(Buffer* const dataIn, Buffer* dataOut);
+ const char* name();
+
+ /******* REMOTE CONTROL ********/
+ virtual void set_parameter(const std::string& parameter,
+ const std::string& value);
+
+ virtual const std::string get_parameter(
+ const std::string& parameter) const;
+
+
+ protected:
+ // Fill m_dataIn with the correct carriers for the pattern/comb
+ // combination
+ void prepare_pattern(void);
+
+ // prerequisites: calling thread must hold m_dataIn mutex
+ void enable_carrier(int k);
+
+ // Configuration settings
+ unsigned int m_dabmode;
+
+ // Remote-controllable settings
+ bool m_enable;
+ unsigned int m_comb;
+ unsigned int m_pattern;
+
+ // Internal flag when to insert TII
+ bool m_insert;
+
+ size_t m_carriers;
+
+ std::string m_name;
+
+ // m_dataIn is read by modulator thread, and written
+ // to by RC thread.
+ mutable boost::mutex m_dataIn_mutex;
+ std::vector<std::complex<float> > m_dataIn;
+
+ private:
+ TII(const TII&);
+ TII& operator=(const TII&);
+};
+
+#endif // TII_H
+
diff --git a/src/ThreadsafeQueue.h b/src/ThreadsafeQueue.h
index 78e9ef0..e5e83ef 100644
--- a/src/ThreadsafeQueue.h
+++ b/src/ThreadsafeQueue.h
@@ -38,25 +38,14 @@
* that pushes elements into the queue, and one consumer that
* retrieves the elements.
*
- * The queue can make the consumer block until enough elements
- * are available.
+ * The queue can make the consumer block until an element
+ * is available.
*/
template<typename T>
class ThreadsafeQueue
{
public:
- /* Create a new queue without any minimum required
- * fill before it is possible to pop an element
- */
- ThreadsafeQueue() : the_required_size(1) {}
-
- /* Create a queue where it has to contain at least
- * required_size elements before pop is possible
- */
- ThreadsafeQueue(size_t required_size) : the_required_size(required_size) {
- }
-
/* Push one element into the queue, and notify another thread that
* might be waiting.
*
@@ -87,14 +76,14 @@ public:
size_t size() const
{
+ boost::mutex::scoped_lock lock(the_mutex);
return the_queue.size();
}
bool try_pop(T& popped_value)
{
boost::mutex::scoped_lock lock(the_mutex);
- if(the_queue.size() < the_required_size)
- {
+ if (the_queue.empty()) {
return false;
}
@@ -103,10 +92,10 @@ public:
return true;
}
- void wait_and_pop(T& popped_value)
+ void wait_and_pop(T& popped_value, size_t prebuffering = 1)
{
boost::mutex::scoped_lock lock(the_mutex);
- while(the_queue.size() < the_required_size) {
+ while (the_queue.size() < prebuffering) {
the_condition_variable.wait(lock);
}
@@ -118,7 +107,6 @@ private:
std::queue<T> the_queue;
mutable boost::mutex the_mutex;
boost::condition_variable the_condition_variable;
- size_t the_required_size;
};
#endif
diff --git a/src/TimestampDecoder.cpp b/src/TimestampDecoder.cpp
index 6063048..5044366 100644
--- a/src/TimestampDecoder.cpp
+++ b/src/TimestampDecoder.cpp
@@ -2,7 +2,7 @@
Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Her Majesty the
Queen in Right of Canada (Communications Research Center Canada)
- Copyright (C) 2014
+ Copyright (C) 2014, 2015
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
@@ -29,6 +29,7 @@
#include <fstream>
#include <string>
#include <boost/lexical_cast.hpp>
+#include <boost/make_shared.hpp>
#include <sys/types.h>
#include "PcDebug.h"
#include "TimestampDecoder.h"
@@ -41,7 +42,8 @@
void TimestampDecoder::calculateTimestamp(struct frame_timestamp& ts)
{
- struct frame_timestamp* ts_queued = new struct frame_timestamp;
+ boost::shared_ptr<struct frame_timestamp> ts_queued =
+ boost::make_shared<struct frame_timestamp>();
/* Push new timestamp into queue */
ts_queued->timestamp_valid = full_timestamp_received_mnsc;
@@ -62,8 +64,8 @@ void TimestampDecoder::calculateTimestamp(struct frame_timestamp& ts)
*
* Therefore, use <= and not < for comparison
*/
- if (queue_timestamps.size() <= modconfig.delay_calculation_pipeline_stages) {
- //fprintf(stderr, "* %zu %u ", queue_timestamps.size(), modconfig.delay_calculation_pipeline_stages);
+ if (queue_timestamps.size() <= m_tist_delay_stages) {
+ //fprintf(stderr, "* %zu %u ", queue_timestamps.size(), m_tist_delay_stages);
/* Return invalid timestamp until the queue is full */
ts.timestamp_valid = false;
ts.timestamp_sec = 0;
@@ -87,16 +89,14 @@ void TimestampDecoder::calculateTimestamp(struct frame_timestamp& ts)
ts.timestamp_sec,
ts.timestamp_pps_offset,
ts.timestamp_refresh);*/
-
- delete ts_queued;
}
MDEBUG("Timestamp queue size %zu, delay_calc %u\n",
queue_timestamps.size(),
- modconfig.delay_calculation_pipeline_stages);
+ m_tist_delay_stages);
- if (queue_timestamps.size() > modconfig.delay_calculation_pipeline_stages) {
- myLogger.level(error) << "Error: Timestamp queue is too large : size " <<
+ if (queue_timestamps.size() > m_tist_delay_stages) {
+ etiLog.level(error) << "Error: Timestamp queue is too large : size " <<
queue_timestamps.size() << "! This should not happen !";
}
@@ -198,77 +198,41 @@ void TimestampDecoder::updateTimestampEti(
latestFCT = fct;
}
-
-bool TimestampDecoder::updateModulatorOffset()
+void TimestampDecoder::set_parameter(
+ const std::string& parameter,
+ const std::string& value)
{
using namespace std;
- using boost::lexical_cast;
- using boost::bad_lexical_cast;
- if (modconfig.use_offset_fixed)
- {
- timestamp_offset = modconfig.offset_fixed;
- return true;
- }
- else if (modconfig.use_offset_file)
- {
- bool r = false;
- double newoffset;
+ stringstream ss(value);
+ ss.exceptions ( stringstream::failbit | stringstream::badbit );
- std::string filedata;
- ifstream filestream;
-
- try
- {
- filestream.open(modconfig.offset_filename.c_str());
- if (!filestream.eof())
- {
- getline(filestream, filedata);
- try
- {
- newoffset = lexical_cast<double>(filedata);
- r = true;
- }
- catch (bad_lexical_cast& e)
- {
- myLogger.level(error) <<
- "Error parsing timestamp offset from file '" <<
- modconfig.offset_filename << "'";
- r = false;
- }
- }
- else
- {
- myLogger.level(error) <<
- "Error reading from timestamp offset file: eof reached\n";
- r = false;
- }
- filestream.close();
- }
- catch (exception& e)
- {
- myLogger.level(error) << "Error opening timestamp offset file\n";
- r = false;
- }
-
-
- if (r)
- {
- if (timestamp_offset != newoffset)
- {
- timestamp_offset = newoffset;
- myLogger.level(info) <<
- "TimestampDecoder::updateTimestampOffset: new offset is " <<
- timestamp_offset;
- offset_changed = true;
- }
+ if (parameter == "offset") {
+ ss >> timestamp_offset;
+ offset_changed = true;
+ }
+ else {
+ stringstream ss;
+ ss << "Parameter '" << parameter
+ << "' is not exported by controllable " << get_rc_name();
+ throw ParameterError(ss.str());
+ }
+}
- }
+const std::string TimestampDecoder::get_parameter(
+ const std::string& parameter) const
+{
+ using namespace std;
- return r;
+ stringstream ss;
+ if (parameter == "offset") {
+ ss << timestamp_offset;
}
else {
- return false;
+ ss << "Parameter '" << parameter <<
+ "' is not exported by controllable " << get_rc_name();
+ throw ParameterError(ss.str());
}
+ return ss.str();
}
diff --git a/src/TimestampDecoder.h b/src/TimestampDecoder.h
index 8c6b362..d8ab633 100644
--- a/src/TimestampDecoder.h
+++ b/src/TimestampDecoder.h
@@ -2,7 +2,7 @@
Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Her Majesty the
Queen in Right of Canada (Communications Research Center Canada)
- Copyright (C) 2014
+ Copyright (C) 2014, 2015
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
@@ -28,29 +28,14 @@
#define TIMESTAMP_DECODER_H
#include <queue>
+#include <boost/shared_ptr.hpp>
#include <string>
#include <time.h>
#include <math.h>
#include <stdio.h>
#include "Eti.h"
#include "Log.h"
-
-struct modulator_offset_config
-{
- bool use_offset_fixed;
- double offset_fixed;
- /* These two fields are used when the modulator is run with a fixed offset */
-
- bool use_offset_file;
- std::string offset_filename;
- /* These two fields are used when the modulator reads the offset from a file */
-
- unsigned delay_calculation_pipeline_stages;
- /* Specifies by how many stages the timestamp must be delayed.
- * (e.g. The FIRFilter is pipelined, therefore we must increase
- * delay_calculation_pipeline_stages by one if the filter is used
- */
-};
+#include "RemoteControl.h"
struct frame_timestamp
{
@@ -109,14 +94,24 @@ struct frame_timestamp
};
/* This module decodes MNSC time information */
-class TimestampDecoder
+class TimestampDecoder : public RemoteControllable
{
public:
TimestampDecoder(
- struct modulator_offset_config& config,
- Logger& logger):
- myLogger(logger), modconfig(config)
+ /* The modulator adds this offset to the TIST to define time of
+ * frame transmission
+ */
+ double offset_s,
+
+ /* Specifies by how many stages the timestamp must be delayed.
+ * (e.g. The FIRFilter is pipelined, therefore we must increase
+ * tist_delay_stages by one if the filter is used
+ */
+ unsigned tist_delay_stages) :
+ RemoteControllable("tist")
{
+ timestamp_offset = offset_s;
+ m_tist_delay_stages = tist_delay_stages;
inhibit_second_update = 0;
time_pps = 0.0;
time_secs = 0;
@@ -126,10 +121,10 @@ class TimestampDecoder
gmtime_r(0, &temp_time);
offset_changed = false;
- myLogger.level(info) << "Setting up timestamp decoder with " <<
- (modconfig.use_offset_fixed ? "fixed" :
- (modconfig.use_offset_file ? "dynamic" : "none")) <<
- " offset";
+ RC_ADD_PARAMETER(offset, "TIST offset [s]");
+
+ etiLog.level(info) << "Setting up timestamp decoder with " <<
+ timestamp_offset << " offset";
};
@@ -143,14 +138,23 @@ class TimestampDecoder
double pps,
int32_t fct);
- /* Update the modulator timestamp offset according to the modconf
+ /*********** REMOTE CONTROL ***************/
+ /* virtual void enrol_at(BaseRemoteController& controller)
+ * is inherited
*/
- bool updateModulatorOffset();
- protected:
- /* Main program logger */
- Logger& myLogger;
+ /* Base function to set parameters. */
+ virtual void set_parameter(const std::string& parameter,
+ const std::string& value);
+
+ /* Getting a parameter always returns a string. */
+ virtual const std::string get_parameter(
+ const std::string& parameter) const;
+ const char* name() { return "TS"; }
+
+
+ protected:
/* Push a new MNSC field into the decoder */
void pushMNSCData(int framephase, uint16_t mnsc);
@@ -171,12 +175,10 @@ class TimestampDecoder
int32_t latestFCT;
double time_pps;
double timestamp_offset;
+ unsigned m_tist_delay_stages;
int inhibit_second_update;
bool offset_changed;
- /* configuration for the offset management */
- struct modulator_offset_config& modconfig;
-
/* When the type or identifier don't match, the decoder must
* be disabled
*/
@@ -190,8 +192,9 @@ class TimestampDecoder
* synchronise two modulators if only one uses (for instance) the
* FIRFilter (1 stage pipeline)
*/
- std::queue<struct frame_timestamp*> queue_timestamps;
+ std::queue<boost::shared_ptr<struct frame_timestamp> > queue_timestamps;
};
#endif
+
diff --git a/src/Utils.cpp b/src/Utils.cpp
index 8b97602..6c9b0fc 100644
--- a/src/Utils.cpp
+++ b/src/Utils.cpp
@@ -49,7 +49,6 @@ void printUsage(char* progName)
" (-f filename | -u uhddevice -F frequency) "
" [-G txgain]"
" [-o offset]"
- " [-O offsetfile]"
" [-T filter_taps_file]"
" [-a gain]"
" [-c clockrate]"
@@ -66,9 +65,7 @@ void printUsage(char* progName)
fprintf(out, "-F frequency: Set the transmit frequency when using UHD output. (mandatory option when using UHD)\n");
fprintf(out, "-G txgain: Set the transmit gain for the UHD driver (default: 0)\n");
fprintf(out, "-o: (UHD only) Set the timestamp offset added to the timestamp in the ETI. The offset is a double.\n");
- fprintf(out, "-O: (UHD only) Set the file containing the timestamp offset added to the timestamp in the ETI.\n"
- "The file is read every six seconds, and must contain a double value.\n");
- fprintf(out, " Specifying either -o or -O has two implications: It enables synchronous transmission,\n"
+ fprintf(out, " Specifying this option has two implications: It enables synchronous transmission,\n"
" requiring an external REFCLK and PPS signal and frames that do not contain a valid timestamp\n"
" get muted.\n\n");
fprintf(out, "-T taps_file: Enable filtering before the output, using the specified file containing the filter taps.\n");
diff --git a/src/Utils.h b/src/Utils.h
index 7c3129c..f023646 100644
--- a/src/Utils.h
+++ b/src/Utils.h
@@ -35,10 +35,28 @@
#include <stdlib.h>
#include <unistd.h>
#include <stdio.h>
+#include <time.h>
void printUsage(char* progName);
void printVersion(void);
+inline long timespecdiff_us(struct timespec& oldTime, struct timespec& time)
+{
+ long tv_sec;
+ long tv_nsec;
+ if (time.tv_nsec < oldTime.tv_nsec) {
+ tv_sec = time.tv_sec - 1 - oldTime.tv_sec;
+ tv_nsec = 1000000000L + time.tv_nsec - oldTime.tv_nsec;
+ }
+ else {
+ tv_sec = time.tv_sec - oldTime.tv_sec;
+ tv_nsec = time.tv_nsec - oldTime.tv_nsec;
+ }
+
+ return tv_sec * 1000 + tv_nsec / 1000;
+}
+
+
#endif