From f5820b347ea6920764023d6cf71f7a254bd7106d Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Tue, 16 Jan 2018 10:55:15 +0100 Subject: Do some InputReader cleanup --- src/InputZeroMQReader.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/InputZeroMQReader.cpp') diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp index 783f0f5..5d0e513 100644 --- a/src/InputZeroMQReader.cpp +++ b/src/InputZeroMQReader.cpp @@ -120,7 +120,7 @@ int InputZeroMQReader::GetNextFrame(void* buffer) return framesize; } -void InputZeroMQReader::PrintInfo() +void InputZeroMQReader::PrintInfo() const { fprintf(stderr, "Input ZeroMQ:\n"); fprintf(stderr, " Receiving from %s\n\n", uri_.c_str()); -- cgit v1.2.3 From 7db444f95310419382146d7f072670f2df855a5f Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Tue, 16 Jan 2018 11:19:26 +0100 Subject: Simplify InputZeroMQReader, remove worker class --- src/InputReader.h | 62 ++++++++++------------------------ src/InputZeroMQReader.cpp | 86 ++++++++++++++++++++++------------------------- 2 files changed, 58 insertions(+), 90 deletions(-) (limited to 'src/InputZeroMQReader.cpp') diff --git a/src/InputReader.h b/src/InputReader.h index c897c2d..07326cf 100644 --- a/src/InputReader.h +++ b/src/InputReader.h @@ -162,65 +162,37 @@ struct zmq_input_overflow : public std::exception #if defined(HAVE_ZEROMQ) /* A ZeroMQ input. See www.zeromq.org for more info */ -struct InputZeroMQThreadData -{ - ThreadsafeQueue > > *in_messages; - std::string uri; - size_t max_queued_frames; -}; - -class InputZeroMQWorker +class InputZeroMQReader : public InputReader { public: - void Start(struct InputZeroMQThreadData* workerdata); - void Stop(); - bool is_running(void) const { return running; } + InputZeroMQReader() = default; + InputZeroMQReader(const InputZeroMQReader& other) = delete; + InputZeroMQReader& operator=(const InputZeroMQReader& other) = delete; + ~InputZeroMQReader(); + + int Open(const std::string& uri, size_t max_queued_frames); + int GetNextFrame(void* buffer); + void PrintInfo() const; private: - std::atomic running = ATOMIC_VAR_INIT(false); + std::atomic m_running = ATOMIC_VAR_INIT(false); + std::string m_uri; + size_t m_max_queued_frames = 0; + ThreadsafeQueue > > m_in_messages; - void RecvProcess(struct InputZeroMQThreadData* workerdata); + void RecvProcess(void); - zmq::context_t zmqcontext; // is thread-safe - boost::thread recv_thread; + zmq::context_t m_zmqcontext; // is thread-safe + boost::thread m_recv_thread; /* We must be careful to keep frame phase consistent. If we * drop a single ETI frame, we will break the transmission * frame vs. ETI frame phase. * - * Here we keep track of how many ETI frames we must drop + * Here we keep track of how many ETI frames we must drop. */ int m_to_drop = 0; }; -class InputZeroMQReader : public InputReader -{ - public: - InputZeroMQReader() - { - workerdata_.in_messages = &in_messages_; - } - - ~InputZeroMQReader() - { - worker_.Stop(); - } - - int Open(const std::string& uri, size_t max_queued_frames); - - int GetNextFrame(void* buffer); - - void PrintInfo() const; - - private: - InputZeroMQReader(const InputZeroMQReader& other) = delete; - InputZeroMQReader& operator=(const InputZeroMQReader& other) = delete; - std::string uri_; - - InputZeroMQWorker worker_; - ThreadsafeQueue > > in_messages_; - struct InputZeroMQThreadData workerdata_; -}; - #endif diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp index 5d0e513..aa342d5 100644 --- a/src/InputZeroMQReader.cpp +++ b/src/InputZeroMQReader.cpp @@ -3,7 +3,7 @@ Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2017 + Copyright (C) 2018 Matthias P. Braendli, matthias.braendli@mpb.li http://opendigitalradio.org @@ -68,20 +68,28 @@ struct zmq_dab_message_t #define ZMQ_DAB_MESSAGE_T_HEADERSIZE \ (sizeof(uint32_t) + NUM_FRAMES_PER_ZMQ_MESSAGE*sizeof(uint16_t)) +InputZeroMQReader::~InputZeroMQReader() +{ + m_running = false; + m_zmqcontext.close(); + if (m_recv_thread.joinable()) { + m_recv_thread.join(); + } +} + int InputZeroMQReader::Open(const string& uri, size_t max_queued_frames) { // The URL might start with zmq+tcp:// if (uri.substr(0, 4) == "zmq+") { - uri_ = uri.substr(4); + m_uri = uri.substr(4); } else { - uri_ = uri; + m_uri = uri; } - workerdata_.uri = uri_; - workerdata_.max_queued_frames = max_queued_frames; - // launch receiver thread - worker_.Start(&workerdata_); + m_max_queued_frames = max_queued_frames; + + m_recv_thread = boost::thread(&InputZeroMQReader::RecvProcess, this); return 0; } @@ -90,7 +98,7 @@ int InputZeroMQReader::GetNextFrame(void* buffer) { const size_t framesize = 6144; - if (not worker_.is_running()) { + if (not m_running) { return 0; } @@ -100,18 +108,18 @@ int InputZeroMQReader::GetNextFrame(void* buffer) * (4 ETI frames in TM1) and we should make sure that * we can serve the data required for a full transmission frame. */ - if (in_messages_.size() < 4) { + if (m_in_messages.size() < 4) { const size_t prebuffering = 10; etiLog.log(trace, "ZMQ,wait1"); - in_messages_.wait_and_pop(incoming, prebuffering); + m_in_messages.wait_and_pop(incoming, prebuffering); } else { etiLog.log(trace, "ZMQ,wait2"); - in_messages_.wait_and_pop(incoming); + m_in_messages.wait_and_pop(incoming); } etiLog.log(trace, "ZMQ,pop"); - if (not worker_.is_running()) { + if (not m_running) { throw zmq_input_overflow(); } @@ -123,54 +131,55 @@ int InputZeroMQReader::GetNextFrame(void* buffer) void InputZeroMQReader::PrintInfo() const { fprintf(stderr, "Input ZeroMQ:\n"); - fprintf(stderr, " Receiving from %s\n\n", uri_.c_str()); + fprintf(stderr, " Receiving from %s\n\n", m_uri.c_str()); } -// ------------- Worker functions - -void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) +void InputZeroMQReader::RecvProcess() { set_thread_name("zmqinput"); - size_t queue_size = 0; + m_running = true; + size_t queue_size = 0; bool buffer_full = false; - zmq::socket_t subscriber(zmqcontext, ZMQ_SUB); + zmq::socket_t subscriber(m_zmqcontext, ZMQ_SUB); // zmq sockets are not thread safe. That's why // we create it here, and not at object creation. bool success = true; try { - subscriber.connect(workerdata->uri.c_str()); + subscriber.connect(m_uri.c_str()); } catch (zmq::error_t& err) { - etiLog.level(error) << "Failed to connect ZeroMQ socket to '" << workerdata->uri << "': '" << err.what() << "'"; + etiLog.level(error) << "Failed to connect ZeroMQ socket to '" << + m_uri << "': '" << err.what() << "'"; success = false; } if (success) try { - subscriber.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); // subscribe to all messages + // subscribe to all messages + subscriber.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); } catch (zmq::error_t& err) { - etiLog.level(error) << "Failed to subscribe ZeroMQ socket to messages: '" << err.what() << "'"; + etiLog.level(error) << "Failed to subscribe ZeroMQ socket to messages: '" << + err.what() << "'"; success = false; } if (success) try { - while (running) - { + while (m_running) { zmq::message_t incoming; subscriber.recv(&incoming); if (m_to_drop) { - queue_size = workerdata->in_messages->size(); + queue_size = m_in_messages.size(); if (queue_size > 4) { - workerdata->in_messages->notify(); + m_in_messages.notify(); } m_to_drop--; } - else if (queue_size < workerdata->max_queued_frames) { + else if (queue_size < m_max_queued_frames) { if (buffer_full) { etiLog.level(info) << "ZeroMQ buffer recovered: " << queue_size << " elements"; @@ -214,14 +223,14 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) offset += framesize; - queue_size = workerdata->in_messages->push(buf); + queue_size = m_in_messages.push(buf); etiLog.log(trace, "ZMQ,push %zu", queue_size); } } } } else { - workerdata->in_messages->notify(); + m_in_messages.notify(); if (!buffer_full) { etiLog.level(warn) << "ZeroMQ buffer overfull !"; @@ -230,7 +239,7 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) throw runtime_error("ZMQ input full"); } - queue_size = workerdata->in_messages->size(); + queue_size = m_in_messages.size(); /* Drop three more incoming ETI frames before * we start accepting them again, to guarantee @@ -256,21 +265,8 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) subscriber.close(); - running = false; - workerdata->in_messages->notify(); -} - -void InputZeroMQWorker::Start(struct InputZeroMQThreadData* workerdata) -{ - running = true; - recv_thread = boost::thread(&InputZeroMQWorker::RecvProcess, this, workerdata); -} - -void InputZeroMQWorker::Stop() -{ - running = false; - zmqcontext.close(); - recv_thread.join(); + m_running = false; + m_in_messages.notify(); } #endif -- cgit v1.2.3 From c90e8a9f37444653ca0ae419f6d24b288e393dc6 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Tue, 16 Jan 2018 11:51:20 +0100 Subject: Let ZeroMQ input timeout, so that SIGINT works too --- src/DabMod.cpp | 42 ++++++++++++++++++++++++------------------ src/InputZeroMQReader.cpp | 26 +++++++++++++++++++++++--- 2 files changed, 47 insertions(+), 21 deletions(-) (limited to 'src/InputZeroMQReader.cpp') diff --git a/src/DabMod.cpp b/src/DabMod.cpp index ce2f249..8a0ee03 100644 --- a/src/DabMod.cpp +++ b/src/DabMod.cpp @@ -92,18 +92,12 @@ void signalHandler(int signalNb) struct modulator_data { - modulator_data() : - inputReader(nullptr), - framecount(0), - flowgraph(nullptr), - etiReader(nullptr) {} - - InputReader* inputReader; + std::shared_ptr inputReader; Buffer data; - uint64_t framecount; + uint64_t framecount = 0; - Flowgraph* flowgraph; - EtiReader* etiReader; + Flowgraph* flowgraph = nullptr; + EtiReader* etiReader = nullptr; }; enum class run_modulator_state_t { @@ -113,7 +107,7 @@ enum class run_modulator_state_t { reconfigure // Some sort of change of configuration we cannot handle happened }; -run_modulator_state_t run_modulator(modulator_data& m); +static run_modulator_state_t run_modulator(modulator_data& m); static void printModSettings(const mod_settings_t& mod_settings) { @@ -271,8 +265,6 @@ int launch_modulator(int argc, char* argv[]) printModSettings(mod_settings); - modulator_data m; - shared_ptr format_converter; if (mod_settings.useFileOutput and (mod_settings.fileOutputFormat == "s8" or @@ -379,7 +371,8 @@ int launch_modulator(int argc, char* argv[]) while (run_again) { Flowgraph flowgraph; - m.inputReader = inputReader.get(); + modulator_data m; + m.inputReader = inputReader; m.flowgraph = &flowgraph; m.data.setLength(6144); @@ -461,7 +454,7 @@ int launch_modulator(int argc, char* argv[]) return ret; } -run_modulator_state_t run_modulator(modulator_data& m) +static run_modulator_state_t run_modulator(modulator_data& m) { auto ret = run_modulator_state_t::failure; try { @@ -498,13 +491,26 @@ run_modulator_state_t run_modulator(modulator_data& m) } } if (framesize == 0) { - etiLog.level(info) << "End of file reached."; + if (dynamic_pointer_cast(m.inputReader)) { + etiLog.level(info) << "End of file reached."; + running = 0; + ret = run_modulator_state_t::normal_end; + } +#if defined(HAVE_ZEROMQ) + else if (dynamic_pointer_cast(m.inputReader)) { + /* An empty frame marks a timeout. We ignore it, but we are + * now able to handle SIGINT properly. + */ + } +#endif // defined(HAVE_ZEROMQ) + // No need to handle the TCP input in a special way to get SIGINT working, + // because recv() will return with EINTR. } else { etiLog.level(error) << "Input read error."; + running = 0; + ret = run_modulator_state_t::normal_end; } - running = 0; - ret = run_modulator_state_t::normal_end; } } catch (const zmq_input_overflow& e) { diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp index aa342d5..f6a816a 100644 --- a/src/InputZeroMQReader.cpp +++ b/src/InputZeroMQReader.cpp @@ -96,8 +96,6 @@ int InputZeroMQReader::Open(const string& uri, size_t max_queued_frames) int InputZeroMQReader::GetNextFrame(void* buffer) { - const size_t framesize = 6144; - if (not m_running) { return 0; } @@ -123,7 +121,17 @@ int InputZeroMQReader::GetNextFrame(void* buffer) throw zmq_input_overflow(); } - memcpy(buffer, &incoming->front(), framesize); + + const size_t framesize = 6144; + if (incoming->empty()) { + return 0; + } + else if (incoming->size() == framesize) { + memcpy(buffer, &incoming->front(), framesize); + } + else { + throw logic_error("ZMQ ETI not 6144"); + } return framesize; } @@ -170,6 +178,18 @@ void InputZeroMQReader::RecvProcess() if (success) try { while (m_running) { zmq::message_t incoming; + zmq::pollitem_t items[1]; + items[0].socket = subscriber; + items[0].events = ZMQ_POLLIN; + const int zmq_timeout_ms = 100; + const int num_events = zmq::poll(items, 1, zmq_timeout_ms); + if (num_events == 0) { + // timeout is signalled by an empty buffer + auto buf = make_shared >(); + m_in_messages.push(buf); + continue; + } + subscriber.recv(&incoming); if (m_to_drop) { -- cgit v1.2.3