diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/DabMod.cpp | 220 |
1 files changed, 109 insertions, 111 deletions
diff --git a/src/DabMod.cpp b/src/DabMod.cpp index 436ce9a..63a1e98 100644 --- a/src/DabMod.cpp +++ b/src/DabMod.cpp @@ -479,143 +479,141 @@ static run_modulator_state_t run_modulator(modulator_data& m) } while (running) { - while (true) { - unsigned fct = 0; - unsigned fp = 0; - - /* Load ETI data from the source */ - if (m.inputReader) { - int framesize = m.inputReader->GetNextFrame(data.getData()); - - if (framesize == 0) { - if (dynamic_pointer_cast<InputFileReader>(m.inputReader)) { - etiLog.level(info) << "End of file reached."; - running = 0; - ret = run_modulator_state_t::normal_end; - break; - } -#if defined(HAVE_ZEROMQ) - else if (dynamic_pointer_cast<InputZeroMQReader>(m.inputReader)) { - /* An empty frame marks a timeout. We ignore it, but we are - * now able to handle SIGINT properly. - * - * Also, we reconnect zmq every 10 seconds to avoid some - * issues, discussed in - * https://stackoverflow.com/questions/26112992/zeromq-pub-sub-on-unreliable-connection - * - * > It is possible that the PUB socket sees the error - * > while the SUB socket does not. - * > - * > The ZMTP RFC has a proposal for heartbeating that would - * > solve this problem. The current best solution is for - * > PUB sockets to send heartbeats (e.g. 1 per second) when - * > traffic is low, and for SUB sockets to disconnect / - * > reconnect if they stop getting these. - * - * We don't need a heartbeat, because our application is constant frame rate, - * the frames themselves can act as heartbeats. - */ - - const auto now = chrono::steady_clock::now(); - if (last_frame_received + chrono::seconds(10) < now) { - throw zmq_input_timeout(); - } - } -#endif // defined(HAVE_ZEROMQ) - else if (dynamic_pointer_cast<InputTcpReader>(m.inputReader)) { - /* Same as for ZeroMQ */ - } - else { - throw logic_error("Unhandled framesize==0!"); - } - continue; - } - else if (framesize < 0) { - etiLog.level(error) << "Input read error."; + unsigned fct = 0; + unsigned fp = 0; + + /* Load ETI data from the source */ + if (m.inputReader) { + int framesize = m.inputReader->GetNextFrame(data.getData()); + + if (framesize == 0) { + if (dynamic_pointer_cast<InputFileReader>(m.inputReader)) { + etiLog.level(info) << "End of file reached."; running = 0; ret = run_modulator_state_t::normal_end; break; } - - const int eti_bytes_read = m.etiReader->loadEtiData(data); - if ((size_t)eti_bytes_read != data.getLength()) { - etiLog.level(error) << "ETI frame incompletely read"; - throw std::runtime_error("ETI read error"); +#if defined(HAVE_ZEROMQ) + else if (dynamic_pointer_cast<InputZeroMQReader>(m.inputReader)) { + /* An empty frame marks a timeout. We ignore it, but we are + * now able to handle SIGINT properly. + * + * Also, we reconnect zmq every 10 seconds to avoid some + * issues, discussed in + * https://stackoverflow.com/questions/26112992/zeromq-pub-sub-on-unreliable-connection + * + * > It is possible that the PUB socket sees the error + * > while the SUB socket does not. + * > + * > The ZMTP RFC has a proposal for heartbeating that would + * > solve this problem. The current best solution is for + * > PUB sockets to send heartbeats (e.g. 1 per second) when + * > traffic is low, and for SUB sockets to disconnect / + * > reconnect if they stop getting these. + * + * We don't need a heartbeat, because our application is constant frame rate, + * the frames themselves can act as heartbeats. + */ + + const auto now = chrono::steady_clock::now(); + if (last_frame_received + chrono::seconds(10) < now) { + throw zmq_input_timeout(); + } } +#endif // defined(HAVE_ZEROMQ) + else if (dynamic_pointer_cast<InputTcpReader>(m.inputReader)) { + /* Same as for ZeroMQ */ + } + else { + throw logic_error("Unhandled framesize==0!"); + } + continue; + } + else if (framesize < 0) { + etiLog.level(error) << "Input read error."; + running = 0; + ret = run_modulator_state_t::normal_end; + break; + } - last_frame_received = chrono::steady_clock::now(); - - fct = m.etiReader->getFct(); - fp = m.etiReader->getFp(); + const int eti_bytes_read = m.etiReader->loadEtiData(data); + if ((size_t)eti_bytes_read != data.getLength()) { + etiLog.level(error) << "ETI frame incompletely read"; + throw std::runtime_error("ETI read error"); } - else if (m.ediInput) { - while (running and not m.ediInput->ediReader.isFrameReady()) { - try { - bool packet_received = m.ediInput->ediTransport.rxPacket(); - if (packet_received) { - last_frame_received = chrono::steady_clock::now(); - } - } - catch (const std::runtime_error& e) { - etiLog.level(warn) << "EDI input: " << e.what(); - running = 0; - break; - } - if (last_frame_received + chrono::seconds(10) < chrono::steady_clock::now()) { - etiLog.level(error) << "No EDI data received in 10 seconds."; - running = 0; - break; + last_frame_received = chrono::steady_clock::now(); + + fct = m.etiReader->getFct(); + fp = m.etiReader->getFp(); + } + else if (m.ediInput) { + while (running and not m.ediInput->ediReader.isFrameReady()) { + try { + bool packet_received = m.ediInput->ediTransport.rxPacket(); + if (packet_received) { + last_frame_received = chrono::steady_clock::now(); } } + catch (const std::runtime_error& e) { + etiLog.level(warn) << "EDI input: " << e.what(); + running = 0; + break; + } - if (!running) { + if (last_frame_received + chrono::seconds(10) < chrono::steady_clock::now()) { + etiLog.level(error) << "No EDI data received in 10 seconds."; + running = 0; break; } + } - fct = m.ediInput->ediReader.getFct(); - fp = m.ediInput->ediReader.getFp(); + if (!running) { + break; } - const unsigned expected_fct = (last_eti_fct + 1) % 250; - if (last_eti_fct == -1) { - if (fp != 0) { - // Do not start the flowgraph before we get to FP 0 - // to ensure all blocks are properly aligned. - if (m.ediInput) { - m.ediInput->ediReader.clearFrame(); - } - continue; - } - else { - last_eti_fct = fct; - m.framecount++; - m.flowgraph->run(); + fct = m.ediInput->ediReader.getFct(); + fp = m.ediInput->ediReader.getFp(); + } + + const unsigned expected_fct = (last_eti_fct + 1) % 250; + if (last_eti_fct == -1) { + if (fp != 0) { + // Do not start the flowgraph before we get to FP 0 + // to ensure all blocks are properly aligned. + if (m.ediInput) { + m.ediInput->ediReader.clearFrame(); } + continue; } - else if (fct == expected_fct) { + else { last_eti_fct = fct; m.framecount++; m.flowgraph->run(); } - else { - etiLog.level(info) << "ETI FCT discontinuity, expected " << - expected_fct << " received " << fct; - if (m.ediInput) { - m.ediInput->ediReader.clearFrame(); - } - return run_modulator_state_t::again; - } - + } + else if (fct == expected_fct) { + last_eti_fct = fct; + m.framecount++; + m.flowgraph->run(); + } + else { + etiLog.level(info) << "ETI FCT discontinuity, expected " << + expected_fct << " received " << fct; if (m.ediInput) { m.ediInput->ediReader.clearFrame(); } + return run_modulator_state_t::again; + } - /* Check every once in a while if the remote control - * is still working */ - if ((m.framecount % 250) == 0) { - rcs.check_faults(); - } + if (m.ediInput) { + m.ediInput->ediReader.clearFrame(); + } + + /* Check every once in a while if the remote control + * is still working */ + if ((m.framecount % 250) == 0) { + rcs.check_faults(); } } } |