diff options
author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2015-02-22 21:39:56 +0100 |
---|---|---|
committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2015-02-22 21:39:56 +0100 |
commit | 87b708657a78cac4690ba81967e5ca03d7faab09 (patch) | |
tree | 029f6bdc0cc65d9a1ad2849624e947a9404b94b8 | |
parent | 1ada0901a8fa687576fa4953044fd43bc6c06f8a (diff) | |
download | dabmod-87b708657a78cac4690ba81967e5ca03d7faab09.tar.gz dabmod-87b708657a78cac4690ba81967e5ca03d7faab09.tar.bz2 dabmod-87b708657a78cac4690ba81967e5ca03d7faab09.zip |
Restart full modulator on ZeroMQ input overrun
-rw-r--r-- | src/DabMod.cpp | 82 | ||||
-rw-r--r-- | src/InputReader.h | 1 | ||||
-rw-r--r-- | src/InputZeroMQReader.cpp | 9 | ||||
-rw-r--r-- | src/OutputUHD.cpp | 4 |
4 files changed, 56 insertions, 40 deletions
diff --git a/src/DabMod.cpp b/src/DabMod.cpp index 8178a75..3548f9d 100644 --- a/src/DabMod.cpp +++ b/src/DabMod.cpp @@ -172,7 +172,7 @@ int main(int argc, char* argv[]) Logger logger; InputFileReader inputFileReader(logger); #if defined(HAVE_ZEROMQ) - InputZeroMQReader inputZeroMQReader(logger); + shared_ptr<InputZeroMQReader> inputZeroMQReader(new InputZeroMQReader(logger)); #endif struct sigaction sa; @@ -707,14 +707,8 @@ int main(int argc, char* argv[]) ret = -1; throw std::runtime_error("Unable to open input"); #else - // The URL might start with zmq+tcp:// - if (inputName.substr(0, 4) == "zmq+") { - inputZeroMQReader.Open(inputName.substr(4), inputMaxFramesQueued); - } - else { - inputZeroMQReader.Open(inputName, inputMaxFramesQueued); - } - m.inputReader = &inputZeroMQReader; + inputZeroMQReader->Open(inputName, inputMaxFramesQueued); + m.inputReader = inputZeroMQReader.get(); #endif } else @@ -724,38 +718,39 @@ int main(int argc, char* argv[]) throw std::runtime_error("Unable to open input"); } - while (run_again) { - Flowgraph flowgraph; - - if (useFileOutput) { - if (fileOutputFormat == "complexf") { - output = shared_ptr<OutputFile>(new OutputFile(outputName)); - } - else if (fileOutputFormat == "s8") { - // We must normalise the samples to the interval [-127.0; 127.0] - normalise = 127.0f / normalise_factor; + if (useFileOutput) { + if (fileOutputFormat == "complexf") { + output = shared_ptr<OutputFile>(new OutputFile(outputName)); + } + else if (fileOutputFormat == "s8") { + // We must normalise the samples to the interval [-127.0; 127.0] + normalise = 127.0f / normalise_factor; - format_converter = shared_ptr<FormatConverter>(new FormatConverter()); + format_converter = shared_ptr<FormatConverter>(new FormatConverter()); - output = shared_ptr<OutputFile>(new OutputFile(outputName)); - } + output = shared_ptr<OutputFile>(new OutputFile(outputName)); } + } #if defined(HAVE_OUTPUT_UHD) - else if (useUHDOutput) { - normalise = 1.0f / normalise_factor; - outputuhd_conf.sampleRate = outputRate; - output = shared_ptr<OutputUHD>(new OutputUHD(outputuhd_conf, logger)); - ((OutputUHD*)output.get())->enrol_at(rcs); - } + else if (useUHDOutput) { + normalise = 1.0f / normalise_factor; + outputuhd_conf.sampleRate = outputRate; + output = shared_ptr<OutputUHD>(new OutputUHD(outputuhd_conf, logger)); + ((OutputUHD*)output.get())->enrol_at(rcs); + } #endif #if defined(HAVE_ZEROMQ) - else if (useZeroMQOutput) { - /* We normalise the same way as for the UHD output */ - normalise = 1.0f / normalise_factor; - output = shared_ptr<OutputZeroMQ>(new OutputZeroMQ(outputName)); - } + else if (useZeroMQOutput) { + /* We normalise the same way as for the UHD output */ + normalise = 1.0f / normalise_factor; + output = shared_ptr<OutputZeroMQ>(new OutputZeroMQ(outputName)); + } #endif + + while (run_again) { + Flowgraph flowgraph; + m.flowgraph = &flowgraph; m.data.setLength(6144); @@ -789,16 +784,27 @@ int main(int argc, char* argv[]) run_again = false; ret = 1; break; +#if defined(HAVE_ZEROMQ) + case MOD_AGAIN: + fprintf(stderr, "\nRestart modulator\n"); + running = true; + if (inputTransport == "zeromq") { + run_again = true; + + // Create a new input reader + inputZeroMQReader = shared_ptr<InputZeroMQReader>( + new InputZeroMQReader(logger)); + inputZeroMQReader->Open(inputName, inputMaxFramesQueued); + m.inputReader = inputZeroMQReader.get(); + } + break; +#endif case MOD_NORMAL_END: + default: fprintf(stderr, "\nModulator stopped.\n"); ret = 0; run_again = false; break; - case MOD_AGAIN: - fprintf(stderr, "\nRestart modulator\n"); - run_again = true; - running = true; - break; } fprintf(stderr, "\n\n"); diff --git a/src/InputReader.h b/src/InputReader.h index ee7d657..e45e36d 100644 --- a/src/InputReader.h +++ b/src/InputReader.h @@ -175,6 +175,7 @@ class InputZeroMQReader : public InputReader logger_(logger), in_messages_(10) { workerdata_.in_messages = &in_messages_; + workerdata_.running = false; } ~InputZeroMQReader() diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp index 5fab447..7ac7d41 100644 --- a/src/InputZeroMQReader.cpp +++ b/src/InputZeroMQReader.cpp @@ -64,7 +64,14 @@ struct zmq_dab_message_t int InputZeroMQReader::Open(const std::string& uri, unsigned max_queued_frames) { - uri_ = uri; + // The URL might start with zmq+tcp:// + if (uri.substr(0, 4) == "zmq+") { + uri_ = uri.substr(4); + } + else { + uri_ = uri; + } + workerdata_.uri = uri; workerdata_.max_queued_frames = max_queued_frames; // launch receiver thread diff --git a/src/OutputUHD.cpp b/src/OutputUHD.cpp index d033700..efdf6df 100644 --- a/src/OutputUHD.cpp +++ b/src/OutputUHD.cpp @@ -496,7 +496,7 @@ void UHDWorker::process() md.time_spec = uhd::time_spec_t(tx_second, pps_offset); // md is defined, let's do some checks - if (md.time_spec.get_real_secs() + 0.2 < usrp_time) { + if (md.time_spec.get_real_secs() + timeout < usrp_time) { uwd->logger->level(warn) << "OutputUHD: Timestamp in the past! offset: " << md.time_spec.get_real_secs() - usrp_time << @@ -507,12 +507,14 @@ void UHDWorker::process() goto loopend; //skip the frame } +#if 0 // Let uhd handle this if (md.time_spec.get_real_secs() > usrp_time + TIMESTAMP_MARGIN_FUTURE) { uwd->logger->level(warn) << "OutputUHD: Timestamp too far in the future! offset: " << md.time_spec.get_real_secs() - usrp_time; usleep(20000); //sleep so as to fill buffers } +#endif if (md.time_spec.get_real_secs() > usrp_time + TIMESTAMP_ABORT_FUTURE) { uwd->logger->level(error) << |