From 1ada0901a8fa687576fa4953044fd43bc6c06f8a Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Sun, 22 Feb 2015 17:56:49 +0100 Subject: Move main flowgraph to distinct function --- src/DabMod.cpp | 193 ++++++++++++++++++++++++++++++---------------- src/InputReader.h | 2 + src/InputZeroMQReader.cpp | 11 +++ 3 files changed, 141 insertions(+), 65 deletions(-) diff --git a/src/DabMod.cpp b/src/DabMod.cpp index 1f6eedf..8178a75 100644 --- a/src/DabMod.cpp +++ b/src/DabMod.cpp @@ -85,6 +85,30 @@ void signalHandler(int signalNb) running = 0; } +struct modulator_data +{ + modulator_data() : + inputReader(NULL), + framecount(0), + flowgraph(NULL), + rcs(NULL) {} + + InputReader* inputReader; + Buffer data; + uint64_t framecount; + + Flowgraph* flowgraph; + RemoteControllers* rcs; +}; + +enum run_modulator_state { + MOD_FAILURE, + MOD_NORMAL_END, + MOD_AGAIN +}; + +run_modulator_state run_modulator(modulator_data& m); + int main(int argc, char* argv[]) { int ret = 0; @@ -99,14 +123,12 @@ int main(int argc, char* argv[]) std::string fileOutputFormat = "complexf"; int useUHDOutput = 0; - uint64_t frame = 0; size_t outputRate = 2048000; size_t clockRate = 0; unsigned dabMode = 0; float digitalgain = 1.0f; float normalise = 1.0f; GainMode gainMode = GAIN_VAR; - Buffer data; /* UHD requires the input I and Q samples to be in the interval @@ -130,6 +152,8 @@ int main(int argc, char* argv[]) OutputUHDConfig outputuhd_conf; #endif + modulator_data m; + // To handle the timestamp offset of the modulator struct modulator_offset_config modconf; modconf.use_offset_file = false; @@ -141,13 +165,15 @@ int main(int argc, char* argv[]) shared_ptr output; RemoteControllers rcs; + m.rcs = &rcs; + + bool run_again = true; Logger logger; InputFileReader inputFileReader(logger); #if defined(HAVE_ZEROMQ) InputZeroMQReader inputZeroMQReader(logger); #endif - InputReader* inputReader; struct sigaction sa; memset(&sa, 0, sizeof(struct sigaction)); @@ -673,7 +699,7 @@ int main(int argc, char* argv[]) throw std::runtime_error("Unable to open input"); } - inputReader = &inputFileReader; + m.inputReader = &inputFileReader; } else if (inputTransport == "zeromq") { #if !defined(HAVE_ZEROMQ) @@ -688,7 +714,7 @@ int main(int argc, char* argv[]) else { inputZeroMQReader.Open(inputName, inputMaxFramesQueued); } - inputReader = &inputZeroMQReader; + m.inputReader = &inputZeroMQReader; #endif } else @@ -698,59 +724,103 @@ int main(int argc, char* argv[]) throw std::runtime_error("Unable to open input"); } - if (useFileOutput) { - if (fileOutputFormat == "complexf") { - output = shared_ptr(new OutputFile(outputName)); - } - else if (fileOutputFormat == "s8") { - // We must normalise the samples to the interval [-127.0; 127.0] - normalise = 127.0f / normalise_factor; + while (run_again) { + Flowgraph flowgraph; - format_converter = shared_ptr(new FormatConverter()); + if (useFileOutput) { + if (fileOutputFormat == "complexf") { + output = shared_ptr(new OutputFile(outputName)); + } + else if (fileOutputFormat == "s8") { + // We must normalise the samples to the interval [-127.0; 127.0] + normalise = 127.0f / normalise_factor; - output = shared_ptr(new OutputFile(outputName)); + format_converter = shared_ptr(new FormatConverter()); + + output = shared_ptr(new OutputFile(outputName)); + } } - } #if defined(HAVE_OUTPUT_UHD) - else if (useUHDOutput) { - - normalise = 1.0f / normalise_factor; - - outputuhd_conf.sampleRate = outputRate; - output = shared_ptr(new OutputUHD(outputuhd_conf, logger)); - ((OutputUHD*)output.get())->enrol_at(rcs); - } + else if (useUHDOutput) { + normalise = 1.0f / normalise_factor; + outputuhd_conf.sampleRate = outputRate; + output = shared_ptr(new OutputUHD(outputuhd_conf, logger)); + ((OutputUHD*)output.get())->enrol_at(rcs); + } #endif #if defined(HAVE_ZEROMQ) - else if (useZeroMQOutput) { - /* We normalise the same way as for the UHD output */ - normalise = 1.0f / normalise_factor; - - output = shared_ptr(new OutputZeroMQ(outputName)); - } + else if (useZeroMQOutput) { + /* We normalise the same way as for the UHD output */ + normalise = 1.0f / normalise_factor; + output = shared_ptr(new OutputZeroMQ(outputName)); + } #endif - data.setLength(6144); - shared_ptr input(new InputMemory(&data)); - shared_ptr modulator(new DabModulator(modconf, &rcs, logger, outputRate, clockRate, - dabMode, gainMode, digitalgain, normalise, filterTapsFilename)); - flowgraph->connect(input, modulator); - if (format_converter) { - flowgraph->connect(modulator, format_converter); - flowgraph->connect(format_converter, output); - } - else { - flowgraph->connect(modulator, output); - } + m.flowgraph = &flowgraph; + m.data.setLength(6144); + + shared_ptr input(new InputMemory(&m.data)); + shared_ptr modulator( + new DabModulator(modconf, &rcs, logger, outputRate, clockRate, + dabMode, gainMode, digitalgain, normalise, filterTapsFilename)); + + flowgraph.connect(input, modulator); + if (format_converter) { + flowgraph.connect(modulator, format_converter); + flowgraph.connect(format_converter, output); + } + else { + flowgraph.connect(modulator, output); + } #if defined(HAVE_OUTPUT_UHD) - if (useUHDOutput) { - ((OutputUHD*)output.get())->setETIReader(modulator->getEtiReader()); - } + if (useUHDOutput) { + ((OutputUHD*)output.get())->setETIReader(modulator->getEtiReader()); + } #endif - inputReader->PrintInfo(); + m.inputReader->PrintInfo(); + + run_modulator_state st = run_modulator(m); + + switch (st) { + case MOD_FAILURE: + fprintf(stderr, "\nModulator failure.\n"); + run_again = false; + ret = 1; + break; + case MOD_NORMAL_END: + fprintf(stderr, "\nModulator stopped.\n"); + ret = 0; + run_again = false; + break; + case MOD_AGAIN: + fprintf(stderr, "\nRestart modulator\n"); + run_again = true; + running = true; + break; + } + fprintf(stderr, "\n\n"); + fprintf(stderr, "%lu DAB frames encoded\n", m.framecount); + fprintf(stderr, "%f seconds encoded\n", (float)m.framecount * 0.024f); + + fprintf(stderr, "\nCleaning flowgraph...\n"); + + m.data.setLength(0); + } + + //////////////////////////////////////////////////////////////////////// + // Cleaning things + //////////////////////////////////////////////////////////////////////// + + logger.level(info) << "Terminating"; + return ret; +} + +run_modulator_state run_modulator(modulator_data& m) +{ + run_modulator_state ret = MOD_FAILURE; try { while (running) { @@ -759,26 +829,26 @@ int main(int argc, char* argv[]) PDEBUG("*****************************************\n"); PDEBUG("* Starting main loop\n"); PDEBUG("*****************************************\n"); - while ((framesize = inputReader->GetNextFrame(data.getData())) > 0) { + while ((framesize = m.inputReader->GetNextFrame(m.data.getData())) > 0) { if (!running) { break; } - frame++; + m.framecount++; PDEBUG("*****************************************\n"); - PDEBUG("* Read frame %lu\n", frame); + PDEBUG("* Read frame %lu\n", m.framecount); PDEBUG("*****************************************\n"); //////////////////////////////////////////////////////////////// - // Proccessing data + // Processing data //////////////////////////////////////////////////////////////// - flowgraph->run(); + m.flowgraph->run(); /* Check every once in a while if the remote control * is still working */ - if (rcs.get_no_controllers() > 0 && (frame % 250) == 0) { - rcs.check_faults(); + if (m.rcs->get_no_controllers() > 0 && (m.framecount % 250) == 0) { + m.rcs->check_faults(); } } if (framesize == 0) { @@ -788,24 +858,17 @@ int main(int argc, char* argv[]) fprintf(stderr, "Input read error.\n"); } running = 0; + ret = MOD_NORMAL_END; } + } catch (std::overflow_error& e) { + // The ZeroMQ input has overflowed its buffer + fprintf(stderr, "overflow error: %s\n", e.what()); + ret = MOD_AGAIN; } catch (std::exception& e) { fprintf(stderr, "EXCEPTION: %s\n", e.what()); - ret = -1; + ret = MOD_FAILURE; } - //////////////////////////////////////////////////////////////////////// - // Cleaning things - //////////////////////////////////////////////////////////////////////// - fprintf(stderr, "\n\n"); - fprintf(stderr, "%lu DAB frames encoded\n", frame); - fprintf(stderr, "%f seconds encoded\n", (float)frame * 0.024f); - - // Cif - fprintf(stderr, "\nCleaning buffers...\n"); - - logger.level(info) << "Terminating"; - return ret; } diff --git a/src/InputReader.h b/src/InputReader.h index 3e3e000..ee7d657 100644 --- a/src/InputReader.h +++ b/src/InputReader.h @@ -138,6 +138,8 @@ struct InputZeroMQThreadData ThreadsafeQueue *in_messages; std::string uri; unsigned max_queued_frames; + + bool running; }; class InputZeroMQWorker diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp index 01d8720..5fab447 100644 --- a/src/InputZeroMQReader.cpp +++ b/src/InputZeroMQReader.cpp @@ -80,6 +80,10 @@ int InputZeroMQReader::GetNextFrame(void* buffer) uint8_t* incoming; in_messages_.wait_and_pop(incoming); + if (! workerdata_.running) { + throw std::overflow_error("InputZeroMQ worker dead"); + } + memcpy(buffer, incoming, framesize); delete incoming; @@ -174,6 +178,7 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) fprintf(stderr, "ZeroMQ buffer overfull !\n"); buffer_full = true; + throw std::runtime_error("ZMQ input full"); } queue_size = workerdata->in_messages->size(); @@ -195,15 +200,21 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) catch (zmq::error_t& err) { fprintf(stderr, "ZeroMQ error in RecvProcess: '%s'\n", err.what()); } + catch (std::exception& err) { + } fprintf(stderr, "ZeroMQ input worker terminated\n"); subscriber.close(); + + workerdata->running = false; + workerdata->in_messages->notify(); } void InputZeroMQWorker::Start(struct InputZeroMQThreadData* workerdata) { running = true; + workerdata->running = true; recv_thread = boost::thread(&InputZeroMQWorker::RecvProcess, this, workerdata); } -- cgit v1.2.3