From 2e646a26c9db66dd5776667d9c9b73d798f5ffda Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Fri, 23 Jan 2015 10:46:26 +0100 Subject: Merge input-zeromq and output-zeromq configure options --- src/InputZeroMQReader.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/InputZeroMQReader.cpp') diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp index cfb56b2..f7f5702 100644 --- a/src/InputZeroMQReader.cpp +++ b/src/InputZeroMQReader.cpp @@ -29,7 +29,7 @@ # include "config.h" #endif -#if defined(HAVE_INPUT_ZEROMQ) +#if defined(HAVE_ZEROMQ) #include #include -- cgit v1.2.3 From c5c21c73c310c29675bff1a1f2da4ddd298c0f92 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Fri, 20 Feb 2015 10:12:06 +0100 Subject: Add max_frames_queued option for zmq input --- doc/example.ini | 3 +++ src/DabMod.cpp | 10 ++++++++-- src/InputReader.h | 6 ++++-- src/InputZeroMQReader.cpp | 9 ++++----- 4 files changed, 19 insertions(+), 9 deletions(-) (limited to 'src/InputZeroMQReader.cpp') diff --git a/doc/example.ini b/doc/example.ini index cec0f23..3c51142 100644 --- a/doc/example.ini +++ b/doc/example.ini @@ -49,6 +49,9 @@ loop=0 ; When recieving data using ZeroMQ, the source is the URI to be used ;transport=zeromq ;source=tcp://localhost:8080 +; The option max_frames_queued defines the maximum number of ETI frames +; that can be in the input queue +;max_frames_queued=100 [modulator] ; Gain mode: 0=FIX, 1=MAX, 2=VAR diff --git a/src/DabMod.cpp b/src/DabMod.cpp index 1bbfc99..f546e45 100644 --- a/src/DabMod.cpp +++ b/src/DabMod.cpp @@ -68,6 +68,8 @@ # define memalign(a, b) malloc(b) #endif +#define ZMQ_INPUT_MAX_FRAME_QUEUE 50 + typedef std::complex complexf; @@ -87,6 +89,7 @@ int main(int argc, char* argv[]) bool loop = false; std::string inputName = ""; std::string inputTransport = "file"; + unsigned inputMaxFramesQueued = ZMQ_INPUT_MAX_FRAME_QUEUE; std::string outputName; int useZeroMQOutput = 0; @@ -362,6 +365,9 @@ int main(int argc, char* argv[]) } inputTransport = pt.get("input.transport", "file"); + inputMaxFramesQueued = pt.get("input.max_frames_queued", + ZMQ_INPUT_MAX_FRAME_QUEUE); + inputName = pt.get("input.source", "/dev/stdin"); // log parameters: @@ -677,10 +683,10 @@ int main(int argc, char* argv[]) #else // The URL might start with zmq+tcp:// if (inputName.substr(0, 4) == "zmq+") { - inputZeroMQReader.Open(inputName.substr(4)); + inputZeroMQReader.Open(inputName.substr(4), inputMaxFramesQueued); } else { - inputZeroMQReader.Open(inputName); + inputZeroMQReader.Open(inputName, inputMaxFramesQueued); } inputReader = &inputZeroMQReader; #endif diff --git a/src/InputReader.h b/src/InputReader.h index 3e0dcab..3e3e000 100644 --- a/src/InputReader.h +++ b/src/InputReader.h @@ -3,7 +3,7 @@ Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyrigth (C) 2013 + Copyrigth (C) 2013, 2015 Matthias P. Braendli, matthias.braendli@mpb.li */ /* @@ -137,6 +137,7 @@ struct InputZeroMQThreadData { ThreadsafeQueue *in_messages; std::string uri; + unsigned max_queued_frames; }; class InputZeroMQWorker @@ -179,7 +180,7 @@ class InputZeroMQReader : public InputReader worker_.Stop(); } - int Open(std::string uri); + int Open(const std::string& uri, unsigned max_queued_frames); int GetNextFrame(void* buffer); @@ -197,3 +198,4 @@ class InputZeroMQReader : public InputReader #endif #endif + diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp index f7f5702..01d8720 100644 --- a/src/InputZeroMQReader.cpp +++ b/src/InputZeroMQReader.cpp @@ -3,7 +3,7 @@ Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2013, 2014 + Copyright (C) 2013, 2014, 2015 Matthias P. Braendli, matthias.braendli@mpb.li http://opendigitalradio.org @@ -41,8 +41,6 @@ #include "InputReader.h" #include "PcDebug.h" -#define MAX_QUEUE_SIZE 50 - #define NUM_FRAMES_PER_ZMQ_MESSAGE 4 /* A concatenation of four ETI frames, * whose maximal size is 6144. @@ -64,10 +62,11 @@ struct zmq_dab_message_t uint8_t buf[NUM_FRAMES_PER_ZMQ_MESSAGE*6144]; }; -int InputZeroMQReader::Open(std::string uri) +int InputZeroMQReader::Open(const std::string& uri, unsigned max_queued_frames) { uri_ = uri; workerdata_.uri = uri; + workerdata_.max_queued_frames = max_queued_frames; // launch receiver thread worker_.Start(&workerdata_); @@ -123,7 +122,7 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) } m_to_drop--; } - else if (queue_size < MAX_QUEUE_SIZE) { + else if (queue_size < workerdata->max_queued_frames) { if (buffer_full) { fprintf(stderr, "ZeroMQ buffer recovered: %zu elements\n", queue_size); -- cgit v1.2.3 From 1ada0901a8fa687576fa4953044fd43bc6c06f8a Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Sun, 22 Feb 2015 17:56:49 +0100 Subject: Move main flowgraph to distinct function --- src/DabMod.cpp | 193 ++++++++++++++++++++++++++++++---------------- src/InputReader.h | 2 + src/InputZeroMQReader.cpp | 11 +++ 3 files changed, 141 insertions(+), 65 deletions(-) (limited to 'src/InputZeroMQReader.cpp') diff --git a/src/DabMod.cpp b/src/DabMod.cpp index 1f6eedf..8178a75 100644 --- a/src/DabMod.cpp +++ b/src/DabMod.cpp @@ -85,6 +85,30 @@ void signalHandler(int signalNb) running = 0; } +struct modulator_data +{ + modulator_data() : + inputReader(NULL), + framecount(0), + flowgraph(NULL), + rcs(NULL) {} + + InputReader* inputReader; + Buffer data; + uint64_t framecount; + + Flowgraph* flowgraph; + RemoteControllers* rcs; +}; + +enum run_modulator_state { + MOD_FAILURE, + MOD_NORMAL_END, + MOD_AGAIN +}; + +run_modulator_state run_modulator(modulator_data& m); + int main(int argc, char* argv[]) { int ret = 0; @@ -99,14 +123,12 @@ int main(int argc, char* argv[]) std::string fileOutputFormat = "complexf"; int useUHDOutput = 0; - uint64_t frame = 0; size_t outputRate = 2048000; size_t clockRate = 0; unsigned dabMode = 0; float digitalgain = 1.0f; float normalise = 1.0f; GainMode gainMode = GAIN_VAR; - Buffer data; /* UHD requires the input I and Q samples to be in the interval @@ -130,6 +152,8 @@ int main(int argc, char* argv[]) OutputUHDConfig outputuhd_conf; #endif + modulator_data m; + // To handle the timestamp offset of the modulator struct modulator_offset_config modconf; modconf.use_offset_file = false; @@ -141,13 +165,15 @@ int main(int argc, char* argv[]) shared_ptr output; RemoteControllers rcs; + m.rcs = &rcs; + + bool run_again = true; Logger logger; InputFileReader inputFileReader(logger); #if defined(HAVE_ZEROMQ) InputZeroMQReader inputZeroMQReader(logger); #endif - InputReader* inputReader; struct sigaction sa; memset(&sa, 0, sizeof(struct sigaction)); @@ -673,7 +699,7 @@ int main(int argc, char* argv[]) throw std::runtime_error("Unable to open input"); } - inputReader = &inputFileReader; + m.inputReader = &inputFileReader; } else if (inputTransport == "zeromq") { #if !defined(HAVE_ZEROMQ) @@ -688,7 +714,7 @@ int main(int argc, char* argv[]) else { inputZeroMQReader.Open(inputName, inputMaxFramesQueued); } - inputReader = &inputZeroMQReader; + m.inputReader = &inputZeroMQReader; #endif } else @@ -698,59 +724,103 @@ int main(int argc, char* argv[]) throw std::runtime_error("Unable to open input"); } - if (useFileOutput) { - if (fileOutputFormat == "complexf") { - output = shared_ptr(new OutputFile(outputName)); - } - else if (fileOutputFormat == "s8") { - // We must normalise the samples to the interval [-127.0; 127.0] - normalise = 127.0f / normalise_factor; + while (run_again) { + Flowgraph flowgraph; - format_converter = shared_ptr(new FormatConverter()); + if (useFileOutput) { + if (fileOutputFormat == "complexf") { + output = shared_ptr(new OutputFile(outputName)); + } + else if (fileOutputFormat == "s8") { + // We must normalise the samples to the interval [-127.0; 127.0] + normalise = 127.0f / normalise_factor; - output = shared_ptr(new OutputFile(outputName)); + format_converter = shared_ptr(new FormatConverter()); + + output = shared_ptr(new OutputFile(outputName)); + } } - } #if defined(HAVE_OUTPUT_UHD) - else if (useUHDOutput) { - - normalise = 1.0f / normalise_factor; - - outputuhd_conf.sampleRate = outputRate; - output = shared_ptr(new OutputUHD(outputuhd_conf, logger)); - ((OutputUHD*)output.get())->enrol_at(rcs); - } + else if (useUHDOutput) { + normalise = 1.0f / normalise_factor; + outputuhd_conf.sampleRate = outputRate; + output = shared_ptr(new OutputUHD(outputuhd_conf, logger)); + ((OutputUHD*)output.get())->enrol_at(rcs); + } #endif #if defined(HAVE_ZEROMQ) - else if (useZeroMQOutput) { - /* We normalise the same way as for the UHD output */ - normalise = 1.0f / normalise_factor; - - output = shared_ptr(new OutputZeroMQ(outputName)); - } + else if (useZeroMQOutput) { + /* We normalise the same way as for the UHD output */ + normalise = 1.0f / normalise_factor; + output = shared_ptr(new OutputZeroMQ(outputName)); + } #endif - data.setLength(6144); - shared_ptr input(new InputMemory(&data)); - shared_ptr modulator(new DabModulator(modconf, &rcs, logger, outputRate, clockRate, - dabMode, gainMode, digitalgain, normalise, filterTapsFilename)); - flowgraph->connect(input, modulator); - if (format_converter) { - flowgraph->connect(modulator, format_converter); - flowgraph->connect(format_converter, output); - } - else { - flowgraph->connect(modulator, output); - } + m.flowgraph = &flowgraph; + m.data.setLength(6144); + + shared_ptr input(new InputMemory(&m.data)); + shared_ptr modulator( + new DabModulator(modconf, &rcs, logger, outputRate, clockRate, + dabMode, gainMode, digitalgain, normalise, filterTapsFilename)); + + flowgraph.connect(input, modulator); + if (format_converter) { + flowgraph.connect(modulator, format_converter); + flowgraph.connect(format_converter, output); + } + else { + flowgraph.connect(modulator, output); + } #if defined(HAVE_OUTPUT_UHD) - if (useUHDOutput) { - ((OutputUHD*)output.get())->setETIReader(modulator->getEtiReader()); - } + if (useUHDOutput) { + ((OutputUHD*)output.get())->setETIReader(modulator->getEtiReader()); + } #endif - inputReader->PrintInfo(); + m.inputReader->PrintInfo(); + + run_modulator_state st = run_modulator(m); + + switch (st) { + case MOD_FAILURE: + fprintf(stderr, "\nModulator failure.\n"); + run_again = false; + ret = 1; + break; + case MOD_NORMAL_END: + fprintf(stderr, "\nModulator stopped.\n"); + ret = 0; + run_again = false; + break; + case MOD_AGAIN: + fprintf(stderr, "\nRestart modulator\n"); + run_again = true; + running = true; + break; + } + fprintf(stderr, "\n\n"); + fprintf(stderr, "%lu DAB frames encoded\n", m.framecount); + fprintf(stderr, "%f seconds encoded\n", (float)m.framecount * 0.024f); + + fprintf(stderr, "\nCleaning flowgraph...\n"); + + m.data.setLength(0); + } + + //////////////////////////////////////////////////////////////////////// + // Cleaning things + //////////////////////////////////////////////////////////////////////// + + logger.level(info) << "Terminating"; + return ret; +} + +run_modulator_state run_modulator(modulator_data& m) +{ + run_modulator_state ret = MOD_FAILURE; try { while (running) { @@ -759,26 +829,26 @@ int main(int argc, char* argv[]) PDEBUG("*****************************************\n"); PDEBUG("* Starting main loop\n"); PDEBUG("*****************************************\n"); - while ((framesize = inputReader->GetNextFrame(data.getData())) > 0) { + while ((framesize = m.inputReader->GetNextFrame(m.data.getData())) > 0) { if (!running) { break; } - frame++; + m.framecount++; PDEBUG("*****************************************\n"); - PDEBUG("* Read frame %lu\n", frame); + PDEBUG("* Read frame %lu\n", m.framecount); PDEBUG("*****************************************\n"); //////////////////////////////////////////////////////////////// - // Proccessing data + // Processing data //////////////////////////////////////////////////////////////// - flowgraph->run(); + m.flowgraph->run(); /* Check every once in a while if the remote control * is still working */ - if (rcs.get_no_controllers() > 0 && (frame % 250) == 0) { - rcs.check_faults(); + if (m.rcs->get_no_controllers() > 0 && (m.framecount % 250) == 0) { + m.rcs->check_faults(); } } if (framesize == 0) { @@ -788,24 +858,17 @@ int main(int argc, char* argv[]) fprintf(stderr, "Input read error.\n"); } running = 0; + ret = MOD_NORMAL_END; } + } catch (std::overflow_error& e) { + // The ZeroMQ input has overflowed its buffer + fprintf(stderr, "overflow error: %s\n", e.what()); + ret = MOD_AGAIN; } catch (std::exception& e) { fprintf(stderr, "EXCEPTION: %s\n", e.what()); - ret = -1; + ret = MOD_FAILURE; } - //////////////////////////////////////////////////////////////////////// - // Cleaning things - //////////////////////////////////////////////////////////////////////// - fprintf(stderr, "\n\n"); - fprintf(stderr, "%lu DAB frames encoded\n", frame); - fprintf(stderr, "%f seconds encoded\n", (float)frame * 0.024f); - - // Cif - fprintf(stderr, "\nCleaning buffers...\n"); - - logger.level(info) << "Terminating"; - return ret; } diff --git a/src/InputReader.h b/src/InputReader.h index 3e3e000..ee7d657 100644 --- a/src/InputReader.h +++ b/src/InputReader.h @@ -138,6 +138,8 @@ struct InputZeroMQThreadData ThreadsafeQueue *in_messages; std::string uri; unsigned max_queued_frames; + + bool running; }; class InputZeroMQWorker diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp index 01d8720..5fab447 100644 --- a/src/InputZeroMQReader.cpp +++ b/src/InputZeroMQReader.cpp @@ -80,6 +80,10 @@ int InputZeroMQReader::GetNextFrame(void* buffer) uint8_t* incoming; in_messages_.wait_and_pop(incoming); + if (! workerdata_.running) { + throw std::overflow_error("InputZeroMQ worker dead"); + } + memcpy(buffer, incoming, framesize); delete incoming; @@ -174,6 +178,7 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) fprintf(stderr, "ZeroMQ buffer overfull !\n"); buffer_full = true; + throw std::runtime_error("ZMQ input full"); } queue_size = workerdata->in_messages->size(); @@ -195,15 +200,21 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) catch (zmq::error_t& err) { fprintf(stderr, "ZeroMQ error in RecvProcess: '%s'\n", err.what()); } + catch (std::exception& err) { + } fprintf(stderr, "ZeroMQ input worker terminated\n"); subscriber.close(); + + workerdata->running = false; + workerdata->in_messages->notify(); } void InputZeroMQWorker::Start(struct InputZeroMQThreadData* workerdata) { running = true; + workerdata->running = true; recv_thread = boost::thread(&InputZeroMQWorker::RecvProcess, this, workerdata); } -- cgit v1.2.3 From 87b708657a78cac4690ba81967e5ca03d7faab09 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Sun, 22 Feb 2015 21:39:56 +0100 Subject: Restart full modulator on ZeroMQ input overrun --- src/DabMod.cpp | 82 +++++++++++++++++++++++++---------------------- src/InputReader.h | 1 + src/InputZeroMQReader.cpp | 9 +++++- src/OutputUHD.cpp | 4 ++- 4 files changed, 56 insertions(+), 40 deletions(-) (limited to 'src/InputZeroMQReader.cpp') diff --git a/src/DabMod.cpp b/src/DabMod.cpp index 8178a75..3548f9d 100644 --- a/src/DabMod.cpp +++ b/src/DabMod.cpp @@ -172,7 +172,7 @@ int main(int argc, char* argv[]) Logger logger; InputFileReader inputFileReader(logger); #if defined(HAVE_ZEROMQ) - InputZeroMQReader inputZeroMQReader(logger); + shared_ptr inputZeroMQReader(new InputZeroMQReader(logger)); #endif struct sigaction sa; @@ -707,14 +707,8 @@ int main(int argc, char* argv[]) ret = -1; throw std::runtime_error("Unable to open input"); #else - // The URL might start with zmq+tcp:// - if (inputName.substr(0, 4) == "zmq+") { - inputZeroMQReader.Open(inputName.substr(4), inputMaxFramesQueued); - } - else { - inputZeroMQReader.Open(inputName, inputMaxFramesQueued); - } - m.inputReader = &inputZeroMQReader; + inputZeroMQReader->Open(inputName, inputMaxFramesQueued); + m.inputReader = inputZeroMQReader.get(); #endif } else @@ -724,38 +718,39 @@ int main(int argc, char* argv[]) throw std::runtime_error("Unable to open input"); } - while (run_again) { - Flowgraph flowgraph; - - if (useFileOutput) { - if (fileOutputFormat == "complexf") { - output = shared_ptr(new OutputFile(outputName)); - } - else if (fileOutputFormat == "s8") { - // We must normalise the samples to the interval [-127.0; 127.0] - normalise = 127.0f / normalise_factor; + if (useFileOutput) { + if (fileOutputFormat == "complexf") { + output = shared_ptr(new OutputFile(outputName)); + } + else if (fileOutputFormat == "s8") { + // We must normalise the samples to the interval [-127.0; 127.0] + normalise = 127.0f / normalise_factor; - format_converter = shared_ptr(new FormatConverter()); + format_converter = shared_ptr(new FormatConverter()); - output = shared_ptr(new OutputFile(outputName)); - } + output = shared_ptr(new OutputFile(outputName)); } + } #if defined(HAVE_OUTPUT_UHD) - else if (useUHDOutput) { - normalise = 1.0f / normalise_factor; - outputuhd_conf.sampleRate = outputRate; - output = shared_ptr(new OutputUHD(outputuhd_conf, logger)); - ((OutputUHD*)output.get())->enrol_at(rcs); - } + else if (useUHDOutput) { + normalise = 1.0f / normalise_factor; + outputuhd_conf.sampleRate = outputRate; + output = shared_ptr(new OutputUHD(outputuhd_conf, logger)); + ((OutputUHD*)output.get())->enrol_at(rcs); + } #endif #if defined(HAVE_ZEROMQ) - else if (useZeroMQOutput) { - /* We normalise the same way as for the UHD output */ - normalise = 1.0f / normalise_factor; - output = shared_ptr(new OutputZeroMQ(outputName)); - } + else if (useZeroMQOutput) { + /* We normalise the same way as for the UHD output */ + normalise = 1.0f / normalise_factor; + output = shared_ptr(new OutputZeroMQ(outputName)); + } #endif + + while (run_again) { + Flowgraph flowgraph; + m.flowgraph = &flowgraph; m.data.setLength(6144); @@ -789,16 +784,27 @@ int main(int argc, char* argv[]) run_again = false; ret = 1; break; +#if defined(HAVE_ZEROMQ) + case MOD_AGAIN: + fprintf(stderr, "\nRestart modulator\n"); + running = true; + if (inputTransport == "zeromq") { + run_again = true; + + // Create a new input reader + inputZeroMQReader = shared_ptr( + new InputZeroMQReader(logger)); + inputZeroMQReader->Open(inputName, inputMaxFramesQueued); + m.inputReader = inputZeroMQReader.get(); + } + break; +#endif case MOD_NORMAL_END: + default: fprintf(stderr, "\nModulator stopped.\n"); ret = 0; run_again = false; break; - case MOD_AGAIN: - fprintf(stderr, "\nRestart modulator\n"); - run_again = true; - running = true; - break; } fprintf(stderr, "\n\n"); diff --git a/src/InputReader.h b/src/InputReader.h index ee7d657..e45e36d 100644 --- a/src/InputReader.h +++ b/src/InputReader.h @@ -175,6 +175,7 @@ class InputZeroMQReader : public InputReader logger_(logger), in_messages_(10) { workerdata_.in_messages = &in_messages_; + workerdata_.running = false; } ~InputZeroMQReader() diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp index 5fab447..7ac7d41 100644 --- a/src/InputZeroMQReader.cpp +++ b/src/InputZeroMQReader.cpp @@ -64,7 +64,14 @@ struct zmq_dab_message_t int InputZeroMQReader::Open(const std::string& uri, unsigned max_queued_frames) { - uri_ = uri; + // The URL might start with zmq+tcp:// + if (uri.substr(0, 4) == "zmq+") { + uri_ = uri.substr(4); + } + else { + uri_ = uri; + } + workerdata_.uri = uri; workerdata_.max_queued_frames = max_queued_frames; // launch receiver thread diff --git a/src/OutputUHD.cpp b/src/OutputUHD.cpp index d033700..efdf6df 100644 --- a/src/OutputUHD.cpp +++ b/src/OutputUHD.cpp @@ -496,7 +496,7 @@ void UHDWorker::process() md.time_spec = uhd::time_spec_t(tx_second, pps_offset); // md is defined, let's do some checks - if (md.time_spec.get_real_secs() + 0.2 < usrp_time) { + if (md.time_spec.get_real_secs() + timeout < usrp_time) { uwd->logger->level(warn) << "OutputUHD: Timestamp in the past! offset: " << md.time_spec.get_real_secs() - usrp_time << @@ -507,12 +507,14 @@ void UHDWorker::process() goto loopend; //skip the frame } +#if 0 // Let uhd handle this if (md.time_spec.get_real_secs() > usrp_time + TIMESTAMP_MARGIN_FUTURE) { uwd->logger->level(warn) << "OutputUHD: Timestamp too far in the future! offset: " << md.time_spec.get_real_secs() - usrp_time; usleep(20000); //sleep so as to fill buffers } +#endif if (md.time_spec.get_real_secs() > usrp_time + TIMESTAMP_ABORT_FUTURE) { uwd->logger->level(error) << -- cgit v1.2.3 From 1e2a6c6e367849335fd82efb4afc827fd4fe82a6 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Fri, 27 Feb 2015 14:16:58 +0100 Subject: Create custom exception for ZMQ overflow --- src/DabMod.cpp | 18 +++++++++--------- src/InputReader.h | 8 ++++++++ src/InputZeroMQReader.cpp | 2 +- 3 files changed, 18 insertions(+), 10 deletions(-) (limited to 'src/InputZeroMQReader.cpp') diff --git a/src/DabMod.cpp b/src/DabMod.cpp index 2489797..6cf1b96 100644 --- a/src/DabMod.cpp +++ b/src/DabMod.cpp @@ -107,7 +107,7 @@ enum run_modulator_state { MOD_AGAIN }; -run_modulator_state run_modulator(modulator_data& m); +run_modulator_state run_modulator(Logger& logger, modulator_data& m); int main(int argc, char* argv[]) { @@ -776,7 +776,7 @@ int main(int argc, char* argv[]) m.inputReader->PrintInfo(); - run_modulator_state st = run_modulator(m); + run_modulator_state st = run_modulator(logger, m); switch (st) { case MOD_FAILURE: @@ -824,7 +824,7 @@ int main(int argc, char* argv[]) return ret; } -run_modulator_state run_modulator(modulator_data& m) +run_modulator_state run_modulator(Logger& logger, modulator_data& m) { run_modulator_state ret = MOD_FAILURE; try { @@ -858,24 +858,24 @@ run_modulator_state run_modulator(modulator_data& m) } } if (framesize == 0) { - fprintf(stderr, "End of file reached.\n"); + logger.level(info) << "End of file reached."; } else { - fprintf(stderr, "Input read error.\n"); + logger.level(error) << "Input read error."; } running = 0; ret = MOD_NORMAL_END; } } catch (fct_discontinuity_error& e) { // The OutputUHD saw a FCT discontinuity - fprintf(stderr, "Stream discontinuity\n"); + logger.level(warn) << e.what(); ret = MOD_AGAIN; - } catch (std::overflow_error& e) { + } catch (zmq_input_overflow& e) { // The ZeroMQ input has overflowed its buffer - fprintf(stderr, "overflow error: %s\n", e.what()); + logger.level(warn) << e.what(); ret = MOD_AGAIN; } catch (std::exception& e) { - fprintf(stderr, "EXCEPTION: %s\n", e.what()); + logger.level(error) << "Exception caught: " << e.what(); ret = MOD_FAILURE; } diff --git a/src/InputReader.h b/src/InputReader.h index e45e36d..dcf88cc 100644 --- a/src/InputReader.h +++ b/src/InputReader.h @@ -130,6 +130,14 @@ class InputFileReader : public InputReader // after 2**32 * 24ms ~= 3.3 years }; +struct zmq_input_overflow : public std::exception +{ + const char* what () const throw () + { + return "InputZMQ buffer overflow"; + } +}; + #if defined(HAVE_ZEROMQ) /* A ZeroMQ input. See www.zeromq.org for more info */ diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp index 7ac7d41..51909c2 100644 --- a/src/InputZeroMQReader.cpp +++ b/src/InputZeroMQReader.cpp @@ -88,7 +88,7 @@ int InputZeroMQReader::GetNextFrame(void* buffer) in_messages_.wait_and_pop(incoming); if (! workerdata_.running) { - throw std::overflow_error("InputZeroMQ worker dead"); + throw zmq_input_overflow(); } memcpy(buffer, incoming, framesize); -- cgit v1.2.3 From c126ec3bfc44ab62017e7a75a1a1f49855f46f9a Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Fri, 10 Apr 2015 11:19:54 +0200 Subject: 'underfull' sounds wrong --- src/InputZeroMQReader.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/InputZeroMQReader.cpp') diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp index 51909c2..f8c15c4 100644 --- a/src/InputZeroMQReader.cpp +++ b/src/InputZeroMQReader.cpp @@ -199,7 +199,7 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) } if (queue_size < 5) { - fprintf(stderr, "ZeroMQ buffer underfull: %zu elements !\n", + fprintf(stderr, "ZeroMQ buffer low: %zu elements !\n", queue_size); } } -- cgit v1.2.3