aboutsummaryrefslogtreecommitdiffstats
path: root/src/DabMod.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/DabMod.cpp')
-rw-r--r--src/DabMod.cpp58
1 files changed, 53 insertions, 5 deletions
diff --git a/src/DabMod.cpp b/src/DabMod.cpp
index 8267060..d340b30 100644
--- a/src/DabMod.cpp
+++ b/src/DabMod.cpp
@@ -326,11 +326,11 @@ int launch_modulator(int argc, char* argv[])
// setMaxDelay wants number of AF packets, which correspond to 24ms ETI frames
ediInput.setMaxDelay(lroundf(mod_settings.edi_max_delay_ms / 24.0f));
}
- EdiUdpInput ediUdpInput(ediInput);
+ EdiTransport ediTransport(ediInput);
- ediUdpInput.Open(mod_settings.inputName);
- if (not ediUdpInput.isEnabled()) {
- throw runtime_error("inputTransport is edi, but ediUdpInput is not enabled");
+ ediTransport.Open(mod_settings.inputName);
+ if (not ediTransport.isEnabled()) {
+ throw runtime_error("inputTransport is edi, but ediTransport is not enabled");
}
Flowgraph flowgraph;
@@ -349,16 +349,27 @@ int launch_modulator(int argc, char* argv[])
bool first_frame = true;
+ auto frame_received_tp = chrono::steady_clock::now();
+
while (running) {
while (running and not ediReader.isFrameReady()) {
try {
- ediUdpInput.rxPacket();
+ bool packet_received = ediTransport.rxPacket();
+ if (packet_received) {
+ frame_received_tp = chrono::steady_clock::now();
+ }
}
catch (const std::runtime_error& e) {
etiLog.level(warn) << "EDI input: " << e.what();
running = 0;
break;
}
+
+ if (frame_received_tp + chrono::seconds(10) < chrono::steady_clock::now()) {
+ etiLog.level(error) << "No EDI data received in 10 seconds.";
+ running = 0;
+ break;
+ }
}
if (not running) {
@@ -512,12 +523,21 @@ int launch_modulator(int argc, char* argv[])
return ret;
}
+struct zmq_input_timeout : public std::exception
+{
+ const char* what() const throw()
+ {
+ return "InputZMQ timeout";
+ }
+};
+
static run_modulator_state_t run_modulator(modulator_data& m)
{
auto ret = run_modulator_state_t::failure;
try {
bool first_frame = true;
int last_eti_fct = -1;
+ auto last_frame_received = chrono::steady_clock::now();
while (running) {
int framesize;
@@ -530,6 +550,8 @@ static run_modulator_state_t run_modulator(modulator_data& m)
break;
}
+ last_frame_received = chrono::steady_clock::now();
+
m.framecount++;
PDEBUG("*****************************************\n");
@@ -581,7 +603,28 @@ static run_modulator_state_t run_modulator(modulator_data& m)
else if (dynamic_pointer_cast<InputZeroMQReader>(m.inputReader)) {
/* An empty frame marks a timeout. We ignore it, but we are
* now able to handle SIGINT properly.
+ *
+ * Also, we reconnect zmq every 10 seconds to avoid some
+ * issues, discussed in
+ * https://stackoverflow.com/questions/26112992/zeromq-pub-sub-on-unreliable-connection
+ *
+ * > It is possible that the PUB socket sees the error
+ * > while the SUB socket does not.
+ * >
+ * > The ZMTP RFC has a proposal for heartbeating that would
+ * > solve this problem. The current best solution is for
+ * > PUB sockets to send heartbeats (e.g. 1 per second) when
+ * > traffic is low, and for SUB sockets to disconnect /
+ * > reconnect if they stop getting these.
+ *
+ * We don't need a heartbeat, because our application is constant frame rate,
+ * the frames themselves can act as heartbeats.
*/
+
+ const auto now = chrono::steady_clock::now();
+ if (last_frame_received + chrono::seconds(10) < now) {
+ throw zmq_input_timeout();
+ }
}
#endif // defined(HAVE_ZEROMQ)
else if (dynamic_pointer_cast<InputTcpReader>(m.inputReader)) {
@@ -598,6 +641,11 @@ static run_modulator_state_t run_modulator(modulator_data& m)
}
}
}
+ catch (const zmq_input_timeout&) {
+ // The ZeroMQ input timeout
+ etiLog.level(warn) << "Timeout";
+ ret = run_modulator_state_t::again;
+ }
catch (const zmq_input_overflow& e) {
// The ZeroMQ input has overflowed its buffer
etiLog.level(warn) << e.what();