summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/ConfigParser.cpp8
-rw-r--r--src/DabMod.cpp6
-rw-r--r--src/DabModulator.cpp2
-rw-r--r--src/FIRFilter.cpp2
-rw-r--r--src/InputReader.h10
-rw-r--r--src/InputZeroMQReader.cpp93
-rw-r--r--src/Log.cpp5
-rw-r--r--src/MemlessPoly.cpp4
-rw-r--r--src/output/UHD.cpp4
9 files changed, 62 insertions, 72 deletions
diff --git a/src/ConfigParser.cpp b/src/ConfigParser.cpp
index 107ee45..296ecdb 100644
--- a/src/ConfigParser.cpp
+++ b/src/ConfigParser.cpp
@@ -85,7 +85,7 @@ static void parse_configfile(
auto telnetrc = make_shared<RemoteControllerTelnet>(telnetport);
rcs.add_controller(telnetrc);
}
- catch (std::exception &e) {
+ catch (const std::exception &e) {
std::cerr << "Error: " << e.what() << "\n";
std::cerr << " telnet remote control enabled, but no telnetport defined.\n";
throw std::runtime_error("Configuration error");
@@ -98,7 +98,7 @@ static void parse_configfile(
auto zmqrc = make_shared<RemoteControllerZmq>(zmqCtrlEndpoint);
rcs.add_controller(zmqrc);
}
- catch (std::exception &e) {
+ catch (const std::exception &e) {
std::cerr << "Error: " << e.what() << "\n";
std::cerr << " zmq remote control enabled, but no endpoint defined.\n";
throw std::runtime_error("Configuration error");
@@ -129,7 +129,7 @@ static void parse_configfile(
try {
logfilename = pt.Get("log.filename", "");
}
- catch (std::exception &e) {
+ catch (const std::exception &e) {
std::cerr << "Error: " << e.what() << "\n";
std::cerr << " Configuration enables file log, but does not specify log filename\n";
throw std::runtime_error("Configuration error");
@@ -365,7 +365,7 @@ static void parse_configfile(
try {
mod_settings.tist_offset_s = pt.GetReal("delaymanagement.offset", 0.0);
}
- catch (std::exception &e) {
+ catch (const std::exception &e) {
std::cerr << "Error: delaymanagement: synchronous is enabled, but no offset defined!\n";
throw std::runtime_error("Configuration error");
}
diff --git a/src/DabMod.cpp b/src/DabMod.cpp
index 3806048..8267060 100644
--- a/src/DabMod.cpp
+++ b/src/DabMod.cpp
@@ -354,7 +354,7 @@ int launch_modulator(int argc, char* argv[])
try {
ediUdpInput.rxPacket();
}
- catch (std::runtime_error& e) {
+ catch (const std::runtime_error& e) {
etiLog.level(warn) << "EDI input: " << e.what();
running = 0;
break;
@@ -626,13 +626,13 @@ int main(int argc, char* argv[])
try {
return launch_modulator(argc, argv);
}
- catch (std::invalid_argument& e) {
+ catch (const std::invalid_argument& e) {
std::string what(e.what());
if (not what.empty()) {
std::cerr << "Modulator error: " << what << std::endl;
}
}
- catch (std::runtime_error& e) {
+ catch (const std::runtime_error& e) {
std::cerr << "Modulator runtime error: " << e.what() << std::endl;
}
diff --git a/src/DabModulator.cpp b/src/DabModulator.cpp
index 666745d..7e3ccf0 100644
--- a/src/DabModulator.cpp
+++ b/src/DabModulator.cpp
@@ -183,7 +183,7 @@ int DabModulator::process(Buffer* dataOut)
rcs.enrol(tii.get());
tiiRef = make_shared<PhaseReference>(mode);
}
- catch (TIIError& e) {
+ catch (const TIIError& e) {
etiLog.level(error) << "Could not initialise TII: " << e.what();
}
diff --git a/src/FIRFilter.cpp b/src/FIRFilter.cpp
index 8b6fe58..89cf0da 100644
--- a/src/FIRFilter.cpp
+++ b/src/FIRFilter.cpp
@@ -318,7 +318,7 @@ void FIRFilter::set_parameter(const string& parameter, const string& value)
load_filter_taps(value);
m_taps_file = value;
}
- catch (std::runtime_error &e) {
+ catch (const std::runtime_error &e) {
throw ParameterError(e.what());
}
}
diff --git a/src/InputReader.h b/src/InputReader.h
index 98eab2b..63451e5 100644
--- a/src/InputReader.h
+++ b/src/InputReader.h
@@ -184,7 +184,15 @@ class InputZeroMQReader : public InputReader, public RemoteControllable
std::atomic<bool> m_running = ATOMIC_VAR_INIT(false);
std::string m_uri;
size_t m_max_queued_frames = 0;
- ThreadsafeQueue<std::vector<uint8_t> > m_in_messages;
+
+ // Either must contain a full ETI frame, or one flag must be set
+ struct message_t {
+ std::vector<uint8_t> eti_frame;
+ bool overflow = false;
+ bool timeout = false;
+ bool fault = false;
+ };
+ ThreadsafeQueue<message_t> m_in_messages;
mutable std::mutex m_last_in_messages_size_mutex;
size_t m_last_in_messages_size = 0;
diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp
index f6c3c34..3661748 100644
--- a/src/InputZeroMQReader.cpp
+++ b/src/InputZeroMQReader.cpp
@@ -78,7 +78,7 @@ InputZeroMQReader::~InputZeroMQReader()
m_running = false;
m_zmqcontext.close();
if (m_recv_thread.joinable()) {
- m_recv_thread.join();
+ m_recv_thread.join();
}
}
@@ -94,6 +94,7 @@ int InputZeroMQReader::Open(const string& uri, size_t max_queued_frames)
m_max_queued_frames = max_queued_frames;
+ m_running = true;
m_recv_thread = std::thread(&InputZeroMQReader::RecvProcess, this);
return 0;
@@ -102,10 +103,10 @@ int InputZeroMQReader::Open(const string& uri, size_t max_queued_frames)
int InputZeroMQReader::GetNextFrame(void* buffer)
{
if (not m_running) {
- return 0;
+ throw runtime_error("ZMQ input is not ready yet");
}
- vector<uint8_t> incoming;
+ message_t incoming;
/* Do some prebuffering because reads will happen in bursts
* (4 ETI frames in TM1) and we should make sure that
@@ -122,27 +123,29 @@ int InputZeroMQReader::GetNextFrame(void* buffer)
}
etiLog.log(trace, "ZMQ,pop");
- if (not m_running) {
- throw zmq_input_overflow();
- }
-
+ constexpr size_t framesize = 6144;
- const size_t framesize = 6144;
- if (incoming.empty()) {
+ if (incoming.timeout) {
return 0;
}
- else if (incoming.size() == framesize) {
+ else if (incoming.fault) {
+ throw runtime_error("ZMQ input has terminated");
+ }
+ else if (incoming.overflow) {
+ throw zmq_input_overflow();
+ }
+ else if (incoming.eti_frame.size() == framesize) {
unique_lock<mutex> lock(m_last_in_messages_size_mutex);
m_last_in_messages_size--;
lock.unlock();
- memcpy(buffer, &incoming.front(), framesize);
+ memcpy(buffer, &incoming.eti_frame.front(), framesize);
+
+ return framesize;
}
else {
throw logic_error("ZMQ ETI not 6144");
}
-
- return framesize;
}
std::string InputZeroMQReader::GetPrintableInfo() const
@@ -153,10 +156,8 @@ std::string InputZeroMQReader::GetPrintableInfo() const
void InputZeroMQReader::RecvProcess()
{
set_thread_name("zmqinput");
- m_running = true;
size_t queue_size = 0;
- bool buffer_full = false;
zmq::socket_t subscriber(m_zmqcontext, ZMQ_SUB);
// zmq sockets are not thread safe. That's why
@@ -167,7 +168,7 @@ void InputZeroMQReader::RecvProcess()
try {
subscriber.connect(m_uri.c_str());
}
- catch (zmq::error_t& err) {
+ catch (const zmq::error_t& err) {
etiLog.level(error) << "Failed to connect ZeroMQ socket to '" <<
m_uri << "': '" << err.what() << "'";
success = false;
@@ -177,7 +178,7 @@ void InputZeroMQReader::RecvProcess()
// subscribe to all messages
subscriber.setsockopt(ZMQ_SUBSCRIBE, NULL, 0);
}
- catch (zmq::error_t& err) {
+ catch (const zmq::error_t& err) {
etiLog.level(error) << "Failed to subscribe ZeroMQ socket to messages: '" <<
err.what() << "'";
success = false;
@@ -192,28 +193,15 @@ void InputZeroMQReader::RecvProcess()
const int zmq_timeout_ms = 100;
const int num_events = zmq::poll(items, 1, zmq_timeout_ms);
if (num_events == 0) {
- // timeout is signalled by an empty buffer
- vector<uint8_t> buf;
- m_in_messages.push(buf);
+ message_t msg;
+ msg.timeout = true;
+ m_in_messages.push(move(msg));
continue;
}
subscriber.recv(&incoming);
- if (m_to_drop) {
- queue_size = m_in_messages.size();
- if (queue_size > 4) {
- m_in_messages.notify();
- }
- m_to_drop--;
- }
- else if (queue_size < m_max_queued_frames) {
- if (buffer_full) {
- etiLog.level(info) << "ZeroMQ buffer recovered: " <<
- queue_size << " elements";
- buffer_full = false;
- }
-
+ if (queue_size < m_max_queued_frames) {
if (incoming.size() < ZMQ_DAB_MESSAGE_T_HEADERSIZE) {
throw runtime_error("ZeroMQ packet too small for header");
}
@@ -251,7 +239,9 @@ void InputZeroMQReader::RecvProcess()
offset += framesize;
- queue_size = m_in_messages.push(move(buf));
+ message_t msg;
+ msg.eti_frame = move(buf);
+ queue_size = m_in_messages.push(move(msg));
etiLog.log(trace, "ZMQ,push %zu", queue_size);
unique_lock<mutex> lock(m_last_in_messages_size_mutex);
@@ -261,23 +251,11 @@ void InputZeroMQReader::RecvProcess()
}
}
else {
- m_in_messages.notify();
-
- if (!buffer_full) {
- etiLog.level(warn) << "ZeroMQ buffer overfull !";
-
- buffer_full = true;
- throw runtime_error("ZMQ input full");
- }
-
- queue_size = m_in_messages.size();
-
- /* Drop three more incoming ETI frames before
- * we start accepting them again, to guarantee
- * that we keep transmission frame vs. ETI frame
- * phase.
- */
- m_to_drop = 3;
+ message_t msg;
+ msg.overflow = true;
+ queue_size = m_in_messages.push(move(msg));
+ etiLog.level(warn) << "ZeroMQ buffer overfull !";
+ throw runtime_error("ZMQ input full");
}
if (queue_size < 5) {
@@ -285,19 +263,22 @@ void InputZeroMQReader::RecvProcess()
}
}
}
- catch (zmq::error_t& err) {
+ catch (const zmq::error_t& err) {
etiLog.level(error) << "ZeroMQ error during receive: '" << err.what() << "'";
}
- catch (std::exception& err) {
+ catch (const std::exception& err) {
etiLog.level(error) << "Exception during receive: '" << err.what() << "'";
}
+ m_running = false;
+
etiLog.level(info) << "ZeroMQ input worker terminated";
subscriber.close();
- m_running = false;
- m_in_messages.notify();
+ message_t msg;
+ msg.fault = true;
+ queue_size = m_in_messages.push(move(msg));
}
// =======================================
diff --git a/src/Log.cpp b/src/Log.cpp
index 3c5b181..4fc7ae3 100644
--- a/src/Log.cpp
+++ b/src/Log.cpp
@@ -27,6 +27,7 @@
#include <list>
#include <cstdarg>
+#include <cinttypes>
#include <chrono>
#include "Log.h"
@@ -173,7 +174,7 @@ LogTracer::LogTracer(const string& trace_filename) : name("TRACE")
m_trace_micros_startup = duration_cast<microseconds>(now).count();
fprintf(m_trace_file.get(),
- "0,TRACER,startup at %ld\n", m_trace_micros_startup);
+ "0,TRACER,startup at %" PRIu64 "\n", m_trace_micros_startup);
}
void LogTracer::log(log_level_t level, const std::string& message)
@@ -183,7 +184,7 @@ void LogTracer::log(log_level_t level, const std::string& message)
const auto now = steady_clock::now().time_since_epoch();
const auto micros = duration_cast<microseconds>(now).count();
- fprintf(m_trace_file.get(), "%ld,%s\n",
+ fprintf(m_trace_file.get(), "%" PRIu64 ",%s\n",
micros - m_trace_micros_startup,
message.c_str());
}
diff --git a/src/MemlessPoly.cpp b/src/MemlessPoly.cpp
index 07e4c1a..ef77c07 100644
--- a/src/MemlessPoly.cpp
+++ b/src/MemlessPoly.cpp
@@ -420,7 +420,7 @@ void MemlessPoly::set_parameter(const string& parameter, const string& value)
load_coefficients(coefs_fstream);
m_coefs_file = value;
}
- catch (std::runtime_error &e) {
+ catch (const std::runtime_error &e) {
throw ParameterError(e.what());
}
}
@@ -434,7 +434,7 @@ void MemlessPoly::set_parameter(const string& parameter, const string& value)
ofstream coefs_fstream(m_coefs_file);
coefs_fstream << value;
}
- catch (std::runtime_error &e) {
+ catch (const std::runtime_error &e) {
throw ParameterError(e.what());
}
}
diff --git a/src/output/UHD.cpp b/src/output/UHD.cpp
index 9358072..c6c500b 100644
--- a/src/output/UHD.cpp
+++ b/src/output/UHD.cpp
@@ -432,7 +432,7 @@ bool UHD::is_clk_source_ok(void) const
}
}
}
- catch (uhd::lookup_error &e) {
+ catch (const uhd::lookup_error &e) {
suppress_refclk_loss_check = true;
etiLog.log(warn, "OutputUHD: This USRP does not have mboard "
"sensor for ext clock loss. Check disabled.");
@@ -456,7 +456,7 @@ double UHD::get_temperature(void) const
try {
return std::round(m_usrp->get_tx_sensor("temp", 0).to_real());
}
- catch (uhd::lookup_error &e) {
+ catch (const uhd::lookup_error &e) {
return std::numeric_limits<double>::quiet_NaN();
}
}