diff options
author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2019-02-27 10:30:57 +0100 |
---|---|---|
committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2019-02-27 10:30:57 +0100 |
commit | 6db4e5b3ad28973601b8e7426a4cffa01322b9a2 (patch) | |
tree | 20ad9a5dc6a18dd513f4a254f7a86ddd2eba52f3 /src | |
parent | 201d711a1d3dfbe46d622871731005937598e790 (diff) | |
parent | c3dbbec39aef32789aacb872c88801f0c5d15ef7 (diff) | |
download | dabmod-6db4e5b3ad28973601b8e7426a4cffa01322b9a2.tar.gz dabmod-6db4e5b3ad28973601b8e7426a4cffa01322b9a2.tar.bz2 dabmod-6db4e5b3ad28973601b8e7426a4cffa01322b9a2.zip |
Merge branch 'next' into lime
Diffstat (limited to 'src')
-rw-r--r-- | src/ConfigParser.cpp | 8 | ||||
-rw-r--r-- | src/DabMod.cpp | 6 | ||||
-rw-r--r-- | src/DabModulator.cpp | 2 | ||||
-rw-r--r-- | src/FIRFilter.cpp | 2 | ||||
-rw-r--r-- | src/InputReader.h | 10 | ||||
-rw-r--r-- | src/InputZeroMQReader.cpp | 93 | ||||
-rw-r--r-- | src/Log.cpp | 5 | ||||
-rw-r--r-- | src/MemlessPoly.cpp | 4 | ||||
-rw-r--r-- | src/output/UHD.cpp | 4 |
9 files changed, 62 insertions, 72 deletions
diff --git a/src/ConfigParser.cpp b/src/ConfigParser.cpp index 107ee45..296ecdb 100644 --- a/src/ConfigParser.cpp +++ b/src/ConfigParser.cpp @@ -85,7 +85,7 @@ static void parse_configfile( auto telnetrc = make_shared<RemoteControllerTelnet>(telnetport); rcs.add_controller(telnetrc); } - catch (std::exception &e) { + catch (const std::exception &e) { std::cerr << "Error: " << e.what() << "\n"; std::cerr << " telnet remote control enabled, but no telnetport defined.\n"; throw std::runtime_error("Configuration error"); @@ -98,7 +98,7 @@ static void parse_configfile( auto zmqrc = make_shared<RemoteControllerZmq>(zmqCtrlEndpoint); rcs.add_controller(zmqrc); } - catch (std::exception &e) { + catch (const std::exception &e) { std::cerr << "Error: " << e.what() << "\n"; std::cerr << " zmq remote control enabled, but no endpoint defined.\n"; throw std::runtime_error("Configuration error"); @@ -129,7 +129,7 @@ static void parse_configfile( try { logfilename = pt.Get("log.filename", ""); } - catch (std::exception &e) { + catch (const std::exception &e) { std::cerr << "Error: " << e.what() << "\n"; std::cerr << " Configuration enables file log, but does not specify log filename\n"; throw std::runtime_error("Configuration error"); @@ -365,7 +365,7 @@ static void parse_configfile( try { mod_settings.tist_offset_s = pt.GetReal("delaymanagement.offset", 0.0); } - catch (std::exception &e) { + catch (const std::exception &e) { std::cerr << "Error: delaymanagement: synchronous is enabled, but no offset defined!\n"; throw std::runtime_error("Configuration error"); } diff --git a/src/DabMod.cpp b/src/DabMod.cpp index 3806048..8267060 100644 --- a/src/DabMod.cpp +++ b/src/DabMod.cpp @@ -354,7 +354,7 @@ int launch_modulator(int argc, char* argv[]) try { ediUdpInput.rxPacket(); } - catch (std::runtime_error& e) { + catch (const std::runtime_error& e) { etiLog.level(warn) << "EDI input: " << e.what(); running = 0; break; @@ -626,13 +626,13 @@ int main(int argc, char* argv[]) try { return launch_modulator(argc, argv); } - catch (std::invalid_argument& e) { + catch (const std::invalid_argument& e) { std::string what(e.what()); if (not what.empty()) { std::cerr << "Modulator error: " << what << std::endl; } } - catch (std::runtime_error& e) { + catch (const std::runtime_error& e) { std::cerr << "Modulator runtime error: " << e.what() << std::endl; } diff --git a/src/DabModulator.cpp b/src/DabModulator.cpp index 666745d..7e3ccf0 100644 --- a/src/DabModulator.cpp +++ b/src/DabModulator.cpp @@ -183,7 +183,7 @@ int DabModulator::process(Buffer* dataOut) rcs.enrol(tii.get()); tiiRef = make_shared<PhaseReference>(mode); } - catch (TIIError& e) { + catch (const TIIError& e) { etiLog.level(error) << "Could not initialise TII: " << e.what(); } diff --git a/src/FIRFilter.cpp b/src/FIRFilter.cpp index 8b6fe58..89cf0da 100644 --- a/src/FIRFilter.cpp +++ b/src/FIRFilter.cpp @@ -318,7 +318,7 @@ void FIRFilter::set_parameter(const string& parameter, const string& value) load_filter_taps(value); m_taps_file = value; } - catch (std::runtime_error &e) { + catch (const std::runtime_error &e) { throw ParameterError(e.what()); } } diff --git a/src/InputReader.h b/src/InputReader.h index 98eab2b..63451e5 100644 --- a/src/InputReader.h +++ b/src/InputReader.h @@ -184,7 +184,15 @@ class InputZeroMQReader : public InputReader, public RemoteControllable std::atomic<bool> m_running = ATOMIC_VAR_INIT(false); std::string m_uri; size_t m_max_queued_frames = 0; - ThreadsafeQueue<std::vector<uint8_t> > m_in_messages; + + // Either must contain a full ETI frame, or one flag must be set + struct message_t { + std::vector<uint8_t> eti_frame; + bool overflow = false; + bool timeout = false; + bool fault = false; + }; + ThreadsafeQueue<message_t> m_in_messages; mutable std::mutex m_last_in_messages_size_mutex; size_t m_last_in_messages_size = 0; diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp index f6c3c34..3661748 100644 --- a/src/InputZeroMQReader.cpp +++ b/src/InputZeroMQReader.cpp @@ -78,7 +78,7 @@ InputZeroMQReader::~InputZeroMQReader() m_running = false; m_zmqcontext.close(); if (m_recv_thread.joinable()) { - m_recv_thread.join(); + m_recv_thread.join(); } } @@ -94,6 +94,7 @@ int InputZeroMQReader::Open(const string& uri, size_t max_queued_frames) m_max_queued_frames = max_queued_frames; + m_running = true; m_recv_thread = std::thread(&InputZeroMQReader::RecvProcess, this); return 0; @@ -102,10 +103,10 @@ int InputZeroMQReader::Open(const string& uri, size_t max_queued_frames) int InputZeroMQReader::GetNextFrame(void* buffer) { if (not m_running) { - return 0; + throw runtime_error("ZMQ input is not ready yet"); } - vector<uint8_t> incoming; + message_t incoming; /* Do some prebuffering because reads will happen in bursts * (4 ETI frames in TM1) and we should make sure that @@ -122,27 +123,29 @@ int InputZeroMQReader::GetNextFrame(void* buffer) } etiLog.log(trace, "ZMQ,pop"); - if (not m_running) { - throw zmq_input_overflow(); - } - + constexpr size_t framesize = 6144; - const size_t framesize = 6144; - if (incoming.empty()) { + if (incoming.timeout) { return 0; } - else if (incoming.size() == framesize) { + else if (incoming.fault) { + throw runtime_error("ZMQ input has terminated"); + } + else if (incoming.overflow) { + throw zmq_input_overflow(); + } + else if (incoming.eti_frame.size() == framesize) { unique_lock<mutex> lock(m_last_in_messages_size_mutex); m_last_in_messages_size--; lock.unlock(); - memcpy(buffer, &incoming.front(), framesize); + memcpy(buffer, &incoming.eti_frame.front(), framesize); + + return framesize; } else { throw logic_error("ZMQ ETI not 6144"); } - - return framesize; } std::string InputZeroMQReader::GetPrintableInfo() const @@ -153,10 +156,8 @@ std::string InputZeroMQReader::GetPrintableInfo() const void InputZeroMQReader::RecvProcess() { set_thread_name("zmqinput"); - m_running = true; size_t queue_size = 0; - bool buffer_full = false; zmq::socket_t subscriber(m_zmqcontext, ZMQ_SUB); // zmq sockets are not thread safe. That's why @@ -167,7 +168,7 @@ void InputZeroMQReader::RecvProcess() try { subscriber.connect(m_uri.c_str()); } - catch (zmq::error_t& err) { + catch (const zmq::error_t& err) { etiLog.level(error) << "Failed to connect ZeroMQ socket to '" << m_uri << "': '" << err.what() << "'"; success = false; @@ -177,7 +178,7 @@ void InputZeroMQReader::RecvProcess() // subscribe to all messages subscriber.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); } - catch (zmq::error_t& err) { + catch (const zmq::error_t& err) { etiLog.level(error) << "Failed to subscribe ZeroMQ socket to messages: '" << err.what() << "'"; success = false; @@ -192,28 +193,15 @@ void InputZeroMQReader::RecvProcess() const int zmq_timeout_ms = 100; const int num_events = zmq::poll(items, 1, zmq_timeout_ms); if (num_events == 0) { - // timeout is signalled by an empty buffer - vector<uint8_t> buf; - m_in_messages.push(buf); + message_t msg; + msg.timeout = true; + m_in_messages.push(move(msg)); continue; } subscriber.recv(&incoming); - if (m_to_drop) { - queue_size = m_in_messages.size(); - if (queue_size > 4) { - m_in_messages.notify(); - } - m_to_drop--; - } - else if (queue_size < m_max_queued_frames) { - if (buffer_full) { - etiLog.level(info) << "ZeroMQ buffer recovered: " << - queue_size << " elements"; - buffer_full = false; - } - + if (queue_size < m_max_queued_frames) { if (incoming.size() < ZMQ_DAB_MESSAGE_T_HEADERSIZE) { throw runtime_error("ZeroMQ packet too small for header"); } @@ -251,7 +239,9 @@ void InputZeroMQReader::RecvProcess() offset += framesize; - queue_size = m_in_messages.push(move(buf)); + message_t msg; + msg.eti_frame = move(buf); + queue_size = m_in_messages.push(move(msg)); etiLog.log(trace, "ZMQ,push %zu", queue_size); unique_lock<mutex> lock(m_last_in_messages_size_mutex); @@ -261,23 +251,11 @@ void InputZeroMQReader::RecvProcess() } } else { - m_in_messages.notify(); - - if (!buffer_full) { - etiLog.level(warn) << "ZeroMQ buffer overfull !"; - - buffer_full = true; - throw runtime_error("ZMQ input full"); - } - - queue_size = m_in_messages.size(); - - /* Drop three more incoming ETI frames before - * we start accepting them again, to guarantee - * that we keep transmission frame vs. ETI frame - * phase. - */ - m_to_drop = 3; + message_t msg; + msg.overflow = true; + queue_size = m_in_messages.push(move(msg)); + etiLog.level(warn) << "ZeroMQ buffer overfull !"; + throw runtime_error("ZMQ input full"); } if (queue_size < 5) { @@ -285,19 +263,22 @@ void InputZeroMQReader::RecvProcess() } } } - catch (zmq::error_t& err) { + catch (const zmq::error_t& err) { etiLog.level(error) << "ZeroMQ error during receive: '" << err.what() << "'"; } - catch (std::exception& err) { + catch (const std::exception& err) { etiLog.level(error) << "Exception during receive: '" << err.what() << "'"; } + m_running = false; + etiLog.level(info) << "ZeroMQ input worker terminated"; subscriber.close(); - m_running = false; - m_in_messages.notify(); + message_t msg; + msg.fault = true; + queue_size = m_in_messages.push(move(msg)); } // ======================================= diff --git a/src/Log.cpp b/src/Log.cpp index 3c5b181..4fc7ae3 100644 --- a/src/Log.cpp +++ b/src/Log.cpp @@ -27,6 +27,7 @@ #include <list> #include <cstdarg> +#include <cinttypes> #include <chrono> #include "Log.h" @@ -173,7 +174,7 @@ LogTracer::LogTracer(const string& trace_filename) : name("TRACE") m_trace_micros_startup = duration_cast<microseconds>(now).count(); fprintf(m_trace_file.get(), - "0,TRACER,startup at %ld\n", m_trace_micros_startup); + "0,TRACER,startup at %" PRIu64 "\n", m_trace_micros_startup); } void LogTracer::log(log_level_t level, const std::string& message) @@ -183,7 +184,7 @@ void LogTracer::log(log_level_t level, const std::string& message) const auto now = steady_clock::now().time_since_epoch(); const auto micros = duration_cast<microseconds>(now).count(); - fprintf(m_trace_file.get(), "%ld,%s\n", + fprintf(m_trace_file.get(), "%" PRIu64 ",%s\n", micros - m_trace_micros_startup, message.c_str()); } diff --git a/src/MemlessPoly.cpp b/src/MemlessPoly.cpp index 07e4c1a..ef77c07 100644 --- a/src/MemlessPoly.cpp +++ b/src/MemlessPoly.cpp @@ -420,7 +420,7 @@ void MemlessPoly::set_parameter(const string& parameter, const string& value) load_coefficients(coefs_fstream); m_coefs_file = value; } - catch (std::runtime_error &e) { + catch (const std::runtime_error &e) { throw ParameterError(e.what()); } } @@ -434,7 +434,7 @@ void MemlessPoly::set_parameter(const string& parameter, const string& value) ofstream coefs_fstream(m_coefs_file); coefs_fstream << value; } - catch (std::runtime_error &e) { + catch (const std::runtime_error &e) { throw ParameterError(e.what()); } } diff --git a/src/output/UHD.cpp b/src/output/UHD.cpp index 9358072..c6c500b 100644 --- a/src/output/UHD.cpp +++ b/src/output/UHD.cpp @@ -432,7 +432,7 @@ bool UHD::is_clk_source_ok(void) const } } } - catch (uhd::lookup_error &e) { + catch (const uhd::lookup_error &e) { suppress_refclk_loss_check = true; etiLog.log(warn, "OutputUHD: This USRP does not have mboard " "sensor for ext clock loss. Check disabled."); @@ -456,7 +456,7 @@ double UHD::get_temperature(void) const try { return std::round(m_usrp->get_tx_sensor("temp", 0).to_real()); } - catch (uhd::lookup_error &e) { + catch (const uhd::lookup_error &e) { return std::numeric_limits<double>::quiet_NaN(); } } |