summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/DabMod.cpp37
1 files changed, 37 insertions, 0 deletions
diff --git a/src/DabMod.cpp b/src/DabMod.cpp
index ad8101c..7ebde12 100644
--- a/src/DabMod.cpp
+++ b/src/DabMod.cpp
@@ -492,12 +492,21 @@ int launch_modulator(int argc, char* argv[])
return ret;
}
+struct zmq_input_timeout : public std::exception
+{
+ const char* what() const throw()
+ {
+ return "InputZMQ timeout";
+ }
+};
+
static run_modulator_state_t run_modulator(modulator_data& m)
{
auto ret = run_modulator_state_t::failure;
try {
bool first_frame = true;
int last_eti_fct = -1;
+ auto last_frame_received = chrono::steady_clock::now();
while (running) {
int framesize;
@@ -510,6 +519,8 @@ static run_modulator_state_t run_modulator(modulator_data& m)
break;
}
+ last_frame_received = chrono::steady_clock::now();
+
m.framecount++;
PDEBUG("*****************************************\n");
@@ -561,7 +572,28 @@ static run_modulator_state_t run_modulator(modulator_data& m)
else if (dynamic_pointer_cast<InputZeroMQReader>(m.inputReader)) {
/* An empty frame marks a timeout. We ignore it, but we are
* now able to handle SIGINT properly.
+ *
+ * Also, we reconnect zmq every 10 seconds to avoid some
+ * issues, discussed in
+ * https://stackoverflow.com/questions/26112992/zeromq-pub-sub-on-unreliable-connection
+ *
+ * > It is possible that the PUB socket sees the error
+ * > while the SUB socket does not.
+ * >
+ * > The ZMTP RFC has a proposal for heartbeating that would
+ * > solve this problem. The current best solution is for
+ * > PUB sockets to send heartbeats (e.g. 1 per second) when
+ * > traffic is low, and for SUB sockets to disconnect /
+ * > reconnect if they stop getting these.
+ *
+ * We don't need a heartbeat, because our application is constant frame rate,
+ * the frames themselves can act as heartbeats.
*/
+
+ const auto now = chrono::steady_clock::now();
+ if (last_frame_received + chrono::seconds(10) < now) {
+ throw zmq_input_timeout();
+ }
}
#endif // defined(HAVE_ZEROMQ)
else if (dynamic_pointer_cast<InputTcpReader>(m.inputReader)) {
@@ -578,6 +610,11 @@ static run_modulator_state_t run_modulator(modulator_data& m)
}
}
}
+ catch (const zmq_input_timeout&) {
+ // The ZeroMQ input timeout
+ etiLog.level(warn) << "Timeout";
+ ret = run_modulator_state_t::again;
+ }
catch (const zmq_input_overflow& e) {
// The ZeroMQ input has overflowed its buffer
etiLog.level(warn) << e.what();