diff options
Diffstat (limited to 'src/DabMod.cpp')
-rw-r--r-- | src/DabMod.cpp | 58 |
1 files changed, 53 insertions, 5 deletions
diff --git a/src/DabMod.cpp b/src/DabMod.cpp index 8267060..d340b30 100644 --- a/src/DabMod.cpp +++ b/src/DabMod.cpp @@ -326,11 +326,11 @@ int launch_modulator(int argc, char* argv[]) // setMaxDelay wants number of AF packets, which correspond to 24ms ETI frames ediInput.setMaxDelay(lroundf(mod_settings.edi_max_delay_ms / 24.0f)); } - EdiUdpInput ediUdpInput(ediInput); + EdiTransport ediTransport(ediInput); - ediUdpInput.Open(mod_settings.inputName); - if (not ediUdpInput.isEnabled()) { - throw runtime_error("inputTransport is edi, but ediUdpInput is not enabled"); + ediTransport.Open(mod_settings.inputName); + if (not ediTransport.isEnabled()) { + throw runtime_error("inputTransport is edi, but ediTransport is not enabled"); } Flowgraph flowgraph; @@ -349,16 +349,27 @@ int launch_modulator(int argc, char* argv[]) bool first_frame = true; + auto frame_received_tp = chrono::steady_clock::now(); + while (running) { while (running and not ediReader.isFrameReady()) { try { - ediUdpInput.rxPacket(); + bool packet_received = ediTransport.rxPacket(); + if (packet_received) { + frame_received_tp = chrono::steady_clock::now(); + } } catch (const std::runtime_error& e) { etiLog.level(warn) << "EDI input: " << e.what(); running = 0; break; } + + if (frame_received_tp + chrono::seconds(10) < chrono::steady_clock::now()) { + etiLog.level(error) << "No EDI data received in 10 seconds."; + running = 0; + break; + } } if (not running) { @@ -512,12 +523,21 @@ int launch_modulator(int argc, char* argv[]) return ret; } +struct zmq_input_timeout : public std::exception +{ + const char* what() const throw() + { + return "InputZMQ timeout"; + } +}; + static run_modulator_state_t run_modulator(modulator_data& m) { auto ret = run_modulator_state_t::failure; try { bool first_frame = true; int last_eti_fct = -1; + auto last_frame_received = chrono::steady_clock::now(); while (running) { int framesize; @@ -530,6 +550,8 @@ static run_modulator_state_t run_modulator(modulator_data& m) break; } + last_frame_received = chrono::steady_clock::now(); + m.framecount++; PDEBUG("*****************************************\n"); @@ -581,7 +603,28 @@ static run_modulator_state_t run_modulator(modulator_data& m) else if (dynamic_pointer_cast<InputZeroMQReader>(m.inputReader)) { /* An empty frame marks a timeout. We ignore it, but we are * now able to handle SIGINT properly. + * + * Also, we reconnect zmq every 10 seconds to avoid some + * issues, discussed in + * https://stackoverflow.com/questions/26112992/zeromq-pub-sub-on-unreliable-connection + * + * > It is possible that the PUB socket sees the error + * > while the SUB socket does not. + * > + * > The ZMTP RFC has a proposal for heartbeating that would + * > solve this problem. The current best solution is for + * > PUB sockets to send heartbeats (e.g. 1 per second) when + * > traffic is low, and for SUB sockets to disconnect / + * > reconnect if they stop getting these. + * + * We don't need a heartbeat, because our application is constant frame rate, + * the frames themselves can act as heartbeats. */ + + const auto now = chrono::steady_clock::now(); + if (last_frame_received + chrono::seconds(10) < now) { + throw zmq_input_timeout(); + } } #endif // defined(HAVE_ZEROMQ) else if (dynamic_pointer_cast<InputTcpReader>(m.inputReader)) { @@ -598,6 +641,11 @@ static run_modulator_state_t run_modulator(modulator_data& m) } } } + catch (const zmq_input_timeout&) { + // The ZeroMQ input timeout + etiLog.level(warn) << "Timeout"; + ret = run_modulator_state_t::again; + } catch (const zmq_input_overflow& e) { // The ZeroMQ input has overflowed its buffer etiLog.level(warn) << e.what(); |