summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2015-02-22 21:39:56 +0100
committerMatthias P. Braendli <matthias.braendli@mpb.li>2015-02-22 21:39:56 +0100
commit87b708657a78cac4690ba81967e5ca03d7faab09 (patch)
tree029f6bdc0cc65d9a1ad2849624e947a9404b94b8 /src
parent1ada0901a8fa687576fa4953044fd43bc6c06f8a (diff)
downloaddabmod-87b708657a78cac4690ba81967e5ca03d7faab09.tar.gz
dabmod-87b708657a78cac4690ba81967e5ca03d7faab09.tar.bz2
dabmod-87b708657a78cac4690ba81967e5ca03d7faab09.zip
Restart full modulator on ZeroMQ input overrun
Diffstat (limited to 'src')
-rw-r--r--src/DabMod.cpp82
-rw-r--r--src/InputReader.h1
-rw-r--r--src/InputZeroMQReader.cpp9
-rw-r--r--src/OutputUHD.cpp4
4 files changed, 56 insertions, 40 deletions
diff --git a/src/DabMod.cpp b/src/DabMod.cpp
index 8178a75..3548f9d 100644
--- a/src/DabMod.cpp
+++ b/src/DabMod.cpp
@@ -172,7 +172,7 @@ int main(int argc, char* argv[])
Logger logger;
InputFileReader inputFileReader(logger);
#if defined(HAVE_ZEROMQ)
- InputZeroMQReader inputZeroMQReader(logger);
+ shared_ptr<InputZeroMQReader> inputZeroMQReader(new InputZeroMQReader(logger));
#endif
struct sigaction sa;
@@ -707,14 +707,8 @@ int main(int argc, char* argv[])
ret = -1;
throw std::runtime_error("Unable to open input");
#else
- // The URL might start with zmq+tcp://
- if (inputName.substr(0, 4) == "zmq+") {
- inputZeroMQReader.Open(inputName.substr(4), inputMaxFramesQueued);
- }
- else {
- inputZeroMQReader.Open(inputName, inputMaxFramesQueued);
- }
- m.inputReader = &inputZeroMQReader;
+ inputZeroMQReader->Open(inputName, inputMaxFramesQueued);
+ m.inputReader = inputZeroMQReader.get();
#endif
}
else
@@ -724,38 +718,39 @@ int main(int argc, char* argv[])
throw std::runtime_error("Unable to open input");
}
- while (run_again) {
- Flowgraph flowgraph;
-
- if (useFileOutput) {
- if (fileOutputFormat == "complexf") {
- output = shared_ptr<OutputFile>(new OutputFile(outputName));
- }
- else if (fileOutputFormat == "s8") {
- // We must normalise the samples to the interval [-127.0; 127.0]
- normalise = 127.0f / normalise_factor;
+ if (useFileOutput) {
+ if (fileOutputFormat == "complexf") {
+ output = shared_ptr<OutputFile>(new OutputFile(outputName));
+ }
+ else if (fileOutputFormat == "s8") {
+ // We must normalise the samples to the interval [-127.0; 127.0]
+ normalise = 127.0f / normalise_factor;
- format_converter = shared_ptr<FormatConverter>(new FormatConverter());
+ format_converter = shared_ptr<FormatConverter>(new FormatConverter());
- output = shared_ptr<OutputFile>(new OutputFile(outputName));
- }
+ output = shared_ptr<OutputFile>(new OutputFile(outputName));
}
+ }
#if defined(HAVE_OUTPUT_UHD)
- else if (useUHDOutput) {
- normalise = 1.0f / normalise_factor;
- outputuhd_conf.sampleRate = outputRate;
- output = shared_ptr<OutputUHD>(new OutputUHD(outputuhd_conf, logger));
- ((OutputUHD*)output.get())->enrol_at(rcs);
- }
+ else if (useUHDOutput) {
+ normalise = 1.0f / normalise_factor;
+ outputuhd_conf.sampleRate = outputRate;
+ output = shared_ptr<OutputUHD>(new OutputUHD(outputuhd_conf, logger));
+ ((OutputUHD*)output.get())->enrol_at(rcs);
+ }
#endif
#if defined(HAVE_ZEROMQ)
- else if (useZeroMQOutput) {
- /* We normalise the same way as for the UHD output */
- normalise = 1.0f / normalise_factor;
- output = shared_ptr<OutputZeroMQ>(new OutputZeroMQ(outputName));
- }
+ else if (useZeroMQOutput) {
+ /* We normalise the same way as for the UHD output */
+ normalise = 1.0f / normalise_factor;
+ output = shared_ptr<OutputZeroMQ>(new OutputZeroMQ(outputName));
+ }
#endif
+
+ while (run_again) {
+ Flowgraph flowgraph;
+
m.flowgraph = &flowgraph;
m.data.setLength(6144);
@@ -789,16 +784,27 @@ int main(int argc, char* argv[])
run_again = false;
ret = 1;
break;
+#if defined(HAVE_ZEROMQ)
+ case MOD_AGAIN:
+ fprintf(stderr, "\nRestart modulator\n");
+ running = true;
+ if (inputTransport == "zeromq") {
+ run_again = true;
+
+ // Create a new input reader
+ inputZeroMQReader = shared_ptr<InputZeroMQReader>(
+ new InputZeroMQReader(logger));
+ inputZeroMQReader->Open(inputName, inputMaxFramesQueued);
+ m.inputReader = inputZeroMQReader.get();
+ }
+ break;
+#endif
case MOD_NORMAL_END:
+ default:
fprintf(stderr, "\nModulator stopped.\n");
ret = 0;
run_again = false;
break;
- case MOD_AGAIN:
- fprintf(stderr, "\nRestart modulator\n");
- run_again = true;
- running = true;
- break;
}
fprintf(stderr, "\n\n");
diff --git a/src/InputReader.h b/src/InputReader.h
index ee7d657..e45e36d 100644
--- a/src/InputReader.h
+++ b/src/InputReader.h
@@ -175,6 +175,7 @@ class InputZeroMQReader : public InputReader
logger_(logger), in_messages_(10)
{
workerdata_.in_messages = &in_messages_;
+ workerdata_.running = false;
}
~InputZeroMQReader()
diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp
index 5fab447..7ac7d41 100644
--- a/src/InputZeroMQReader.cpp
+++ b/src/InputZeroMQReader.cpp
@@ -64,7 +64,14 @@ struct zmq_dab_message_t
int InputZeroMQReader::Open(const std::string& uri, unsigned max_queued_frames)
{
- uri_ = uri;
+ // The URL might start with zmq+tcp://
+ if (uri.substr(0, 4) == "zmq+") {
+ uri_ = uri.substr(4);
+ }
+ else {
+ uri_ = uri;
+ }
+
workerdata_.uri = uri;
workerdata_.max_queued_frames = max_queued_frames;
// launch receiver thread
diff --git a/src/OutputUHD.cpp b/src/OutputUHD.cpp
index d033700..efdf6df 100644
--- a/src/OutputUHD.cpp
+++ b/src/OutputUHD.cpp
@@ -496,7 +496,7 @@ void UHDWorker::process()
md.time_spec = uhd::time_spec_t(tx_second, pps_offset);
// md is defined, let's do some checks
- if (md.time_spec.get_real_secs() + 0.2 < usrp_time) {
+ if (md.time_spec.get_real_secs() + timeout < usrp_time) {
uwd->logger->level(warn) <<
"OutputUHD: Timestamp in the past! offset: " <<
md.time_spec.get_real_secs() - usrp_time <<
@@ -507,12 +507,14 @@ void UHDWorker::process()
goto loopend; //skip the frame
}
+#if 0 // Let uhd handle this
if (md.time_spec.get_real_secs() > usrp_time + TIMESTAMP_MARGIN_FUTURE) {
uwd->logger->level(warn) <<
"OutputUHD: Timestamp too far in the future! offset: " <<
md.time_spec.get_real_secs() - usrp_time;
usleep(20000); //sleep so as to fill buffers
}
+#endif
if (md.time_spec.get_real_secs() > usrp_time + TIMESTAMP_ABORT_FUTURE) {
uwd->logger->level(error) <<