From c90e8a9f37444653ca0ae419f6d24b288e393dc6 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Tue, 16 Jan 2018 11:51:20 +0100 Subject: Let ZeroMQ input timeout, so that SIGINT works too --- src/DabMod.cpp | 42 ++++++++++++++++++++++++------------------ src/InputZeroMQReader.cpp | 26 +++++++++++++++++++++++--- 2 files changed, 47 insertions(+), 21 deletions(-) diff --git a/src/DabMod.cpp b/src/DabMod.cpp index ce2f249..8a0ee03 100644 --- a/src/DabMod.cpp +++ b/src/DabMod.cpp @@ -92,18 +92,12 @@ void signalHandler(int signalNb) struct modulator_data { - modulator_data() : - inputReader(nullptr), - framecount(0), - flowgraph(nullptr), - etiReader(nullptr) {} - - InputReader* inputReader; + std::shared_ptr inputReader; Buffer data; - uint64_t framecount; + uint64_t framecount = 0; - Flowgraph* flowgraph; - EtiReader* etiReader; + Flowgraph* flowgraph = nullptr; + EtiReader* etiReader = nullptr; }; enum class run_modulator_state_t { @@ -113,7 +107,7 @@ enum class run_modulator_state_t { reconfigure // Some sort of change of configuration we cannot handle happened }; -run_modulator_state_t run_modulator(modulator_data& m); +static run_modulator_state_t run_modulator(modulator_data& m); static void printModSettings(const mod_settings_t& mod_settings) { @@ -271,8 +265,6 @@ int launch_modulator(int argc, char* argv[]) printModSettings(mod_settings); - modulator_data m; - shared_ptr format_converter; if (mod_settings.useFileOutput and (mod_settings.fileOutputFormat == "s8" or @@ -379,7 +371,8 @@ int launch_modulator(int argc, char* argv[]) while (run_again) { Flowgraph flowgraph; - m.inputReader = inputReader.get(); + modulator_data m; + m.inputReader = inputReader; m.flowgraph = &flowgraph; m.data.setLength(6144); @@ -461,7 +454,7 @@ int launch_modulator(int argc, char* argv[]) return ret; } -run_modulator_state_t run_modulator(modulator_data& m) +static run_modulator_state_t run_modulator(modulator_data& m) { auto ret = run_modulator_state_t::failure; try { @@ -498,13 +491,26 @@ run_modulator_state_t run_modulator(modulator_data& m) } } if (framesize == 0) { - etiLog.level(info) << "End of file reached."; + if (dynamic_pointer_cast(m.inputReader)) { + etiLog.level(info) << "End of file reached."; + running = 0; + ret = run_modulator_state_t::normal_end; + } +#if defined(HAVE_ZEROMQ) + else if (dynamic_pointer_cast(m.inputReader)) { + /* An empty frame marks a timeout. We ignore it, but we are + * now able to handle SIGINT properly. + */ + } +#endif // defined(HAVE_ZEROMQ) + // No need to handle the TCP input in a special way to get SIGINT working, + // because recv() will return with EINTR. } else { etiLog.level(error) << "Input read error."; + running = 0; + ret = run_modulator_state_t::normal_end; } - running = 0; - ret = run_modulator_state_t::normal_end; } } catch (const zmq_input_overflow& e) { diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp index aa342d5..f6a816a 100644 --- a/src/InputZeroMQReader.cpp +++ b/src/InputZeroMQReader.cpp @@ -96,8 +96,6 @@ int InputZeroMQReader::Open(const string& uri, size_t max_queued_frames) int InputZeroMQReader::GetNextFrame(void* buffer) { - const size_t framesize = 6144; - if (not m_running) { return 0; } @@ -123,7 +121,17 @@ int InputZeroMQReader::GetNextFrame(void* buffer) throw zmq_input_overflow(); } - memcpy(buffer, &incoming->front(), framesize); + + const size_t framesize = 6144; + if (incoming->empty()) { + return 0; + } + else if (incoming->size() == framesize) { + memcpy(buffer, &incoming->front(), framesize); + } + else { + throw logic_error("ZMQ ETI not 6144"); + } return framesize; } @@ -170,6 +178,18 @@ void InputZeroMQReader::RecvProcess() if (success) try { while (m_running) { zmq::message_t incoming; + zmq::pollitem_t items[1]; + items[0].socket = subscriber; + items[0].events = ZMQ_POLLIN; + const int zmq_timeout_ms = 100; + const int num_events = zmq::poll(items, 1, zmq_timeout_ms); + if (num_events == 0) { + // timeout is signalled by an empty buffer + auto buf = make_shared >(); + m_in_messages.push(buf); + continue; + } + subscriber.recv(&incoming); if (m_to_drop) { -- cgit v1.2.3