summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/DabMod.cpp42
-rw-r--r--src/InputZeroMQReader.cpp26
2 files changed, 47 insertions, 21 deletions
diff --git a/src/DabMod.cpp b/src/DabMod.cpp
index ce2f249..8a0ee03 100644
--- a/src/DabMod.cpp
+++ b/src/DabMod.cpp
@@ -92,18 +92,12 @@ void signalHandler(int signalNb)
struct modulator_data
{
- modulator_data() :
- inputReader(nullptr),
- framecount(0),
- flowgraph(nullptr),
- etiReader(nullptr) {}
-
- InputReader* inputReader;
+ std::shared_ptr<InputReader> inputReader;
Buffer data;
- uint64_t framecount;
+ uint64_t framecount = 0;
- Flowgraph* flowgraph;
- EtiReader* etiReader;
+ Flowgraph* flowgraph = nullptr;
+ EtiReader* etiReader = nullptr;
};
enum class run_modulator_state_t {
@@ -113,7 +107,7 @@ enum class run_modulator_state_t {
reconfigure // Some sort of change of configuration we cannot handle happened
};
-run_modulator_state_t run_modulator(modulator_data& m);
+static run_modulator_state_t run_modulator(modulator_data& m);
static void printModSettings(const mod_settings_t& mod_settings)
{
@@ -271,8 +265,6 @@ int launch_modulator(int argc, char* argv[])
printModSettings(mod_settings);
- modulator_data m;
-
shared_ptr<FormatConverter> format_converter;
if (mod_settings.useFileOutput and
(mod_settings.fileOutputFormat == "s8" or
@@ -379,7 +371,8 @@ int launch_modulator(int argc, char* argv[])
while (run_again) {
Flowgraph flowgraph;
- m.inputReader = inputReader.get();
+ modulator_data m;
+ m.inputReader = inputReader;
m.flowgraph = &flowgraph;
m.data.setLength(6144);
@@ -461,7 +454,7 @@ int launch_modulator(int argc, char* argv[])
return ret;
}
-run_modulator_state_t run_modulator(modulator_data& m)
+static run_modulator_state_t run_modulator(modulator_data& m)
{
auto ret = run_modulator_state_t::failure;
try {
@@ -498,13 +491,26 @@ run_modulator_state_t run_modulator(modulator_data& m)
}
}
if (framesize == 0) {
- etiLog.level(info) << "End of file reached.";
+ if (dynamic_pointer_cast<InputFileReader>(m.inputReader)) {
+ etiLog.level(info) << "End of file reached.";
+ running = 0;
+ ret = run_modulator_state_t::normal_end;
+ }
+#if defined(HAVE_ZEROMQ)
+ else if (dynamic_pointer_cast<InputZeroMQReader>(m.inputReader)) {
+ /* An empty frame marks a timeout. We ignore it, but we are
+ * now able to handle SIGINT properly.
+ */
+ }
+#endif // defined(HAVE_ZEROMQ)
+ // No need to handle the TCP input in a special way to get SIGINT working,
+ // because recv() will return with EINTR.
}
else {
etiLog.level(error) << "Input read error.";
+ running = 0;
+ ret = run_modulator_state_t::normal_end;
}
- running = 0;
- ret = run_modulator_state_t::normal_end;
}
}
catch (const zmq_input_overflow& e) {
diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp
index aa342d5..f6a816a 100644
--- a/src/InputZeroMQReader.cpp
+++ b/src/InputZeroMQReader.cpp
@@ -96,8 +96,6 @@ int InputZeroMQReader::Open(const string& uri, size_t max_queued_frames)
int InputZeroMQReader::GetNextFrame(void* buffer)
{
- const size_t framesize = 6144;
-
if (not m_running) {
return 0;
}
@@ -123,7 +121,17 @@ int InputZeroMQReader::GetNextFrame(void* buffer)
throw zmq_input_overflow();
}
- memcpy(buffer, &incoming->front(), framesize);
+
+ const size_t framesize = 6144;
+ if (incoming->empty()) {
+ return 0;
+ }
+ else if (incoming->size() == framesize) {
+ memcpy(buffer, &incoming->front(), framesize);
+ }
+ else {
+ throw logic_error("ZMQ ETI not 6144");
+ }
return framesize;
}
@@ -170,6 +178,18 @@ void InputZeroMQReader::RecvProcess()
if (success) try {
while (m_running) {
zmq::message_t incoming;
+ zmq::pollitem_t items[1];
+ items[0].socket = subscriber;
+ items[0].events = ZMQ_POLLIN;
+ const int zmq_timeout_ms = 100;
+ const int num_events = zmq::poll(items, 1, zmq_timeout_ms);
+ if (num_events == 0) {
+ // timeout is signalled by an empty buffer
+ auto buf = make_shared<vector<uint8_t> >();
+ m_in_messages.push(buf);
+ continue;
+ }
+
subscriber.recv(&incoming);
if (m_to_drop) {