diff options
Diffstat (limited to 'src/DabMod.cpp')
-rw-r--r-- | src/DabMod.cpp | 37 |
1 files changed, 37 insertions, 0 deletions
diff --git a/src/DabMod.cpp b/src/DabMod.cpp index ad8101c..7ebde12 100644 --- a/src/DabMod.cpp +++ b/src/DabMod.cpp @@ -492,12 +492,21 @@ int launch_modulator(int argc, char* argv[]) return ret; } +struct zmq_input_timeout : public std::exception +{ + const char* what() const throw() + { + return "InputZMQ timeout"; + } +}; + static run_modulator_state_t run_modulator(modulator_data& m) { auto ret = run_modulator_state_t::failure; try { bool first_frame = true; int last_eti_fct = -1; + auto last_frame_received = chrono::steady_clock::now(); while (running) { int framesize; @@ -510,6 +519,8 @@ static run_modulator_state_t run_modulator(modulator_data& m) break; } + last_frame_received = chrono::steady_clock::now(); + m.framecount++; PDEBUG("*****************************************\n"); @@ -561,7 +572,28 @@ static run_modulator_state_t run_modulator(modulator_data& m) else if (dynamic_pointer_cast<InputZeroMQReader>(m.inputReader)) { /* An empty frame marks a timeout. We ignore it, but we are * now able to handle SIGINT properly. + * + * Also, we reconnect zmq every 10 seconds to avoid some + * issues, discussed in + * https://stackoverflow.com/questions/26112992/zeromq-pub-sub-on-unreliable-connection + * + * > It is possible that the PUB socket sees the error + * > while the SUB socket does not. + * > + * > The ZMTP RFC has a proposal for heartbeating that would + * > solve this problem. The current best solution is for + * > PUB sockets to send heartbeats (e.g. 1 per second) when + * > traffic is low, and for SUB sockets to disconnect / + * > reconnect if they stop getting these. + * + * We don't need a heartbeat, because our application is constant frame rate, + * the frames themselves can act as heartbeats. */ + + const auto now = chrono::steady_clock::now(); + if (last_frame_received + chrono::seconds(10) < now) { + throw zmq_input_timeout(); + } } #endif // defined(HAVE_ZEROMQ) else if (dynamic_pointer_cast<InputTcpReader>(m.inputReader)) { @@ -578,6 +610,11 @@ static run_modulator_state_t run_modulator(modulator_data& m) } } } + catch (const zmq_input_timeout&) { + // The ZeroMQ input timeout + etiLog.level(warn) << "Timeout"; + ret = run_modulator_state_t::again; + } catch (const zmq_input_overflow& e) { // The ZeroMQ input has overflowed its buffer etiLog.level(warn) << e.what(); |