aboutsummaryrefslogtreecommitdiffstats
path: root/src/DabMod.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/DabMod.cpp')
-rw-r--r--src/DabMod.cpp480
1 files changed, 227 insertions, 253 deletions
diff --git a/src/DabMod.cpp b/src/DabMod.cpp
index 81882e4..922f9e4 100644
--- a/src/DabMod.cpp
+++ b/src/DabMod.cpp
@@ -59,7 +59,6 @@
#include "OutputZeroMQ.h"
#include "InputReader.h"
#include "PcDebug.h"
-#include "TimestampDecoder.h"
#include "FIRFilter.h"
#include "RemoteControl.h"
#include "ConfigParser.h"
@@ -94,12 +93,16 @@ void signalHandler(int signalNb)
struct modulator_data
{
+ // For ETI
std::shared_ptr<InputReader> inputReader;
- Buffer data;
- uint64_t framecount = 0;
+ std::shared_ptr<EtiReader> etiReader;
+
+ // For EDI
+ std::shared_ptr<EdiInput> ediInput;
- Flowgraph* flowgraph = nullptr;
- EtiReader* etiReader = nullptr;
+ // Common to both EDI and EDI
+ uint64_t framecount = 0;
+ Flowgraph *flowgraph = nullptr;
};
enum class run_modulator_state_t {
@@ -299,205 +302,137 @@ int launch_modulator(int argc, char* argv[])
etiLog.level(error) << "Could not set priority for modulator:" << r;
}
+ shared_ptr<InputReader> inputReader;
+ shared_ptr<EdiInput> ediInput;
+
if (mod_settings.inputTransport == "edi") {
-#ifdef HAVE_EDI
- EdiReader ediReader(mod_settings.tist_offset_s);
- EdiDecoder::ETIDecoder ediInput(ediReader, false);
- if (mod_settings.edi_max_delay_ms > 0.0f) {
- // setMaxDelay wants number of AF packets, which correspond to 24ms ETI frames
- ediInput.setMaxDelay(lroundf(mod_settings.edi_max_delay_ms / 24.0f));
- }
- EdiTransport ediTransport(ediInput);
+ ediInput = make_shared<EdiInput>(mod_settings.tist_offset_s, mod_settings.edi_max_delay_ms);
- ediTransport.Open(mod_settings.inputName);
- if (not ediTransport.isEnabled()) {
+ ediInput->ediTransport.Open(mod_settings.inputName);
+ if (not ediInput->ediTransport.isEnabled()) {
throw runtime_error("inputTransport is edi, but ediTransport is not enabled");
}
- Flowgraph flowgraph;
-
- auto modulator = make_shared<DabModulator>(ediReader, mod_settings);
- rcs.enrol(modulator.get());
+ }
+ else if (mod_settings.inputTransport == "file") {
+ auto inputFileReader = make_shared<InputFileReader>();
- if (format_converter) {
- flowgraph.connect(modulator, format_converter);
- flowgraph.connect(format_converter, output);
- }
- else {
- flowgraph.connect(modulator, output);
+ // Opening ETI input file
+ if (inputFileReader->Open(mod_settings.inputName, mod_settings.loop) == -1) {
+ throw std::runtime_error("Unable to open input");
}
- size_t framecount = 0;
-
- bool first_frame = true;
-
- auto frame_received_tp = chrono::steady_clock::now();
-
- while (running) {
- while (running and not ediReader.isFrameReady()) {
- try {
- bool packet_received = ediTransport.rxPacket();
- if (packet_received) {
- frame_received_tp = chrono::steady_clock::now();
- }
- }
- catch (const std::runtime_error& e) {
- etiLog.level(warn) << "EDI input: " << e.what();
- running = 0;
- break;
- }
-
- if (frame_received_tp + chrono::seconds(10) < chrono::steady_clock::now()) {
- etiLog.level(error) << "No EDI data received in 10 seconds.";
- running = 0;
- break;
- }
- }
-
- if (not running) {
- break;
- }
-
- if (first_frame) {
- if (ediReader.getFp() != 0) {
- // Do not start the flowgraph before we get to FP 0
- // to ensure all blocks are properly aligned.
- ediReader.clearFrame();
- continue;
- }
- else {
- first_frame = false;
- }
- }
-
- framecount++;
- flowgraph.run();
- ediReader.clearFrame();
-
- /* Check every once in a while if the remote control
- * is still working */
- if ((framecount % 250) == 0) {
- rcs.check_faults();
- }
- }
-#else
+ inputReader = inputFileReader;
+ }
+ else if (mod_settings.inputTransport == "zeromq") {
+#if !defined(HAVE_ZEROMQ)
throw std::runtime_error("Unable to open input: "
- "EDI input transport selected, but not compiled in!");
-#endif // HAVE_EDI
+ "ZeroMQ input transport selected, but not compiled in!");
+#else
+ auto inputZeroMQReader = make_shared<InputZeroMQReader>();
+ inputZeroMQReader->Open(mod_settings.inputName, mod_settings.inputMaxFramesQueued);
+ rcs.enrol(inputZeroMQReader.get());
+ inputReader = inputZeroMQReader;
+#endif
+ }
+ else if (mod_settings.inputTransport == "tcp") {
+ auto inputTcpReader = make_shared<InputTcpReader>();
+ inputTcpReader->Open(mod_settings.inputName);
+ inputReader = inputTcpReader;
}
else {
- shared_ptr<InputReader> inputReader;
+ throw std::runtime_error("Unable to open input: "
+ "invalid input transport " + mod_settings.inputTransport + " selected!");
+ }
- if (mod_settings.inputTransport == "file") {
- auto inputFileReader = make_shared<InputFileReader>();
+ bool run_again = true;
- // Opening ETI input file
- if (inputFileReader->Open(mod_settings.inputName, mod_settings.loop) == -1) {
- throw std::runtime_error("Unable to open input");
- }
+ while (run_again) {
+ Flowgraph flowgraph(mod_settings.showProcessTime);
- inputReader = inputFileReader;
+ modulator_data m;
+ m.ediInput = ediInput;
+ m.inputReader = inputReader;
+ m.flowgraph = &flowgraph;
+
+ shared_ptr<DabModulator> modulator;
+ if (inputReader) {
+ m.etiReader = make_shared<EtiReader>(mod_settings.tist_offset_s);
+ modulator = make_shared<DabModulator>(*m.etiReader, mod_settings);
}
- else if (mod_settings.inputTransport == "zeromq") {
-#if !defined(HAVE_ZEROMQ)
- throw std::runtime_error("Unable to open input: "
- "ZeroMQ input transport selected, but not compiled in!");
-#else
- auto inputZeroMQReader = make_shared<InputZeroMQReader>();
- inputZeroMQReader->Open(mod_settings.inputName, mod_settings.inputMaxFramesQueued);
- rcs.enrol(inputZeroMQReader.get());
- inputReader = inputZeroMQReader;
-#endif
+ else if (ediInput) {
+ modulator = make_shared<DabModulator>(ediInput->ediReader, mod_settings);
}
- else if (mod_settings.inputTransport == "tcp") {
- auto inputTcpReader = make_shared<InputTcpReader>();
- inputTcpReader->Open(mod_settings.inputName);
- inputReader = inputTcpReader;
+
+ rcs.enrol(modulator.get());
+
+ if (format_converter) {
+ flowgraph.connect(modulator, format_converter);
+ flowgraph.connect(format_converter, output);
}
else {
- throw std::runtime_error("Unable to open input: "
- "invalid input transport " + mod_settings.inputTransport + " selected!");
+ flowgraph.connect(modulator, output);
}
- bool run_again = true;
-
- while (run_again) {
- Flowgraph flowgraph;
-
- modulator_data m;
- m.inputReader = inputReader;
- m.flowgraph = &flowgraph;
- m.data.setLength(6144);
-
- EtiReader etiReader(mod_settings.tist_offset_s);
- m.etiReader = &etiReader;
-
- auto input = make_shared<InputMemory>(&m.data);
- auto modulator = make_shared<DabModulator>(etiReader, mod_settings);
- rcs.enrol(modulator.get());
-
- if (format_converter) {
- flowgraph.connect(modulator, format_converter);
- flowgraph.connect(format_converter, output);
- }
- else {
- flowgraph.connect(modulator, output);
- }
-
+ if (inputReader) {
etiLog.level(info) << inputReader->GetPrintableInfo();
+ }
- run_modulator_state_t st = run_modulator(m);
- etiLog.log(trace, "DABMOD,run_modulator() = %d", st);
-
- switch (st) {
- case run_modulator_state_t::failure:
- etiLog.level(error) << "Modulator failure.";
- run_again = false;
- ret = 1;
- break;
- case run_modulator_state_t::again:
- etiLog.level(warn) << "Restart modulator.";
- run_again = false;
- if (auto in = dynamic_pointer_cast<InputFileReader>(inputReader)) {
- if (in->Open(mod_settings.inputName, mod_settings.loop) == -1) {
- etiLog.level(error) << "Unable to open input file!";
- ret = 1;
- }
- else {
- run_again = true;
- }
+ run_modulator_state_t st = run_modulator(m);
+ etiLog.log(trace, "DABMOD,run_modulator() = %d", st);
+
+ switch (st) {
+ case run_modulator_state_t::failure:
+ etiLog.level(error) << "Modulator failure.";
+ run_again = false;
+ ret = 1;
+ break;
+ case run_modulator_state_t::again:
+ etiLog.level(warn) << "Restart modulator.";
+ run_again = false;
+ if (auto in = dynamic_pointer_cast<InputFileReader>(inputReader)) {
+ if (in->Open(mod_settings.inputName, mod_settings.loop) == -1) {
+ etiLog.level(error) << "Unable to open input file!";
+ ret = 1;
}
-#if defined(HAVE_ZEROMQ)
- else if (auto in_zmq = dynamic_pointer_cast<InputZeroMQReader>(inputReader)) {
+ else {
run_again = true;
- // Create a new input reader
- rcs.remove_controllable(in_zmq.get());
- auto inputZeroMQReader = make_shared<InputZeroMQReader>();
- inputZeroMQReader->Open(mod_settings.inputName, mod_settings.inputMaxFramesQueued);
- rcs.enrol(inputZeroMQReader.get());
- inputReader = inputZeroMQReader;
}
+ }
+#if defined(HAVE_ZEROMQ)
+ else if (auto in_zmq = dynamic_pointer_cast<InputZeroMQReader>(inputReader)) {
+ run_again = true;
+ // Create a new input reader
+ rcs.remove_controllable(in_zmq.get());
+ auto inputZeroMQReader = make_shared<InputZeroMQReader>();
+ inputZeroMQReader->Open(mod_settings.inputName, mod_settings.inputMaxFramesQueued);
+ rcs.enrol(inputZeroMQReader.get());
+ inputReader = inputZeroMQReader;
+ }
#endif
- else if (dynamic_pointer_cast<InputTcpReader>(inputReader)) {
- // Keep the same inputReader, as there is no input buffer overflow
- run_again = true;
- }
- break;
- case run_modulator_state_t::reconfigure:
- etiLog.level(warn) << "Detected change in ensemble configuration.";
- /* We can keep the input in this care */
+ else if (dynamic_pointer_cast<InputTcpReader>(inputReader)) {
+ // Keep the same inputReader, as there is no input buffer overflow
run_again = true;
- break;
- case run_modulator_state_t::normal_end:
- default:
- etiLog.level(info) << "modulator stopped.";
- ret = 0;
- run_again = false;
- break;
- }
-
- etiLog.level(info) << m.framecount << " DAB frames encoded";
- etiLog.level(info) << ((float)m.framecount * 0.024f) << " seconds encoded";
+ }
+ else if (ediInput) {
+ // In EDI, keep the same input
+ run_again = true;
+ }
+ break;
+ case run_modulator_state_t::reconfigure:
+ etiLog.level(warn) << "Detected change in ensemble configuration.";
+ /* We can keep the input in this case */
+ run_again = true;
+ break;
+ case run_modulator_state_t::normal_end:
+ default:
+ etiLog.level(info) << "modulator stopped.";
+ ret = 0;
+ run_again = false;
+ break;
}
+
+ etiLog.level(info) << m.framecount << " DAB frames encoded";
+ etiLog.level(info) << ((float)m.framecount * 0.024f) << " seconds encoded";
}
etiLog.level(info) << "Terminating";
@@ -516,57 +451,142 @@ static run_modulator_state_t run_modulator(modulator_data& m)
{
auto ret = run_modulator_state_t::failure;
try {
- bool first_frame = true;
int last_eti_fct = -1;
auto last_frame_received = chrono::steady_clock::now();
+ Buffer data;
+ if (m.inputReader) {
+ data.setLength(6144);
+ }
while (running) {
- int framesize;
-
- PDEBUG("*****************************************\n");
- PDEBUG("* Starting main loop\n");
- PDEBUG("*****************************************\n");
- while ((framesize = m.inputReader->GetNextFrame(m.data.getData())) > 0) {
- if (!running) {
- break;
- }
+ while (true) {
+ unsigned fct = 0;
+ unsigned fp = 0;
+
+ /* Load ETI data from the source */
+ if (m.inputReader) {
+ int framesize = m.inputReader->GetNextFrame(data.getData());
+
+ if (framesize == 0) {
+ if (dynamic_pointer_cast<InputFileReader>(m.inputReader)) {
+ etiLog.level(info) << "End of file reached.";
+ running = 0;
+ ret = run_modulator_state_t::normal_end;
+ break;
+ }
+#if defined(HAVE_ZEROMQ)
+ else if (dynamic_pointer_cast<InputZeroMQReader>(m.inputReader)) {
+ /* An empty frame marks a timeout. We ignore it, but we are
+ * now able to handle SIGINT properly.
+ *
+ * Also, we reconnect zmq every 10 seconds to avoid some
+ * issues, discussed in
+ * https://stackoverflow.com/questions/26112992/zeromq-pub-sub-on-unreliable-connection
+ *
+ * > It is possible that the PUB socket sees the error
+ * > while the SUB socket does not.
+ * >
+ * > The ZMTP RFC has a proposal for heartbeating that would
+ * > solve this problem. The current best solution is for
+ * > PUB sockets to send heartbeats (e.g. 1 per second) when
+ * > traffic is low, and for SUB sockets to disconnect /
+ * > reconnect if they stop getting these.
+ *
+ * We don't need a heartbeat, because our application is constant frame rate,
+ * the frames themselves can act as heartbeats.
+ */
+
+ const auto now = chrono::steady_clock::now();
+ if (last_frame_received + chrono::seconds(10) < now) {
+ throw zmq_input_timeout();
+ }
+ }
+#endif // defined(HAVE_ZEROMQ)
+ else if (dynamic_pointer_cast<InputTcpReader>(m.inputReader)) {
+ /* Same as for ZeroMQ */
+ }
+ else {
+ throw logic_error("Unhandled framesize==0!");
+ }
+ }
+ else if (framesize < 0) {
+ etiLog.level(error) << "Input read error.";
+ running = 0;
+ ret = run_modulator_state_t::normal_end;
+ break;
+ }
+
+ const int eti_bytes_read = m.etiReader->loadEtiData(data);
+ if ((size_t)eti_bytes_read != data.getLength()) {
+ etiLog.level(error) << "ETI frame incompletely read";
+ throw std::runtime_error("ETI read error");
+ }
- last_frame_received = chrono::steady_clock::now();
+ fct = m.etiReader->getFct();
+ fp = m.etiReader->getFp();
+ }
+ else if (m.ediInput) {
+ while (running and not m.ediInput->ediReader.isFrameReady()) {
+ try {
+ bool packet_received = m.ediInput->ediTransport.rxPacket();
+ if (packet_received) {
+ last_frame_received = chrono::steady_clock::now();
+ }
+ }
+ catch (const std::runtime_error& e) {
+ etiLog.level(warn) << "EDI input: " << e.what();
+ running = 0;
+ break;
+ }
- m.framecount++;
+ if (last_frame_received + chrono::seconds(10) < chrono::steady_clock::now()) {
+ etiLog.level(error) << "No EDI data received in 10 seconds.";
+ running = 0;
+ break;
+ }
+ }
- PDEBUG("*****************************************\n");
- PDEBUG("* Read frame %lu\n", m.framecount);
- PDEBUG("*****************************************\n");
+ if (!running) {
+ break;
+ }
- const int eti_bytes_read = m.etiReader->loadEtiData(m.data);
- if ((size_t)eti_bytes_read != m.data.getLength()) {
- etiLog.level(error) << "ETI frame incompletely read";
- throw std::runtime_error("ETI read error");
+ fct = m.ediInput->ediReader.getFct();
+ fp = m.ediInput->ediReader.getFp();
}
- if (first_frame) {
- if (m.etiReader->getFp() != 0) {
+ const unsigned expected_fct = (last_eti_fct + 1) % 250;
+ if (last_eti_fct == -1) {
+ if (fp != 0) {
// Do not start the flowgraph before we get to FP 0
// to ensure all blocks are properly aligned.
+ if (m.ediInput) {
+ m.ediInput->ediReader.clearFrame();
+ }
continue;
}
else {
- first_frame = false;
+ last_eti_fct = fct;
+ m.framecount++;
+ m.flowgraph->run();
}
}
-
- // Check for ETI FCT continuity
- const unsigned expected_fct = (last_eti_fct + 1) % 250;
- const unsigned fct = m.etiReader->getFct();
- if (last_eti_fct != -1 and expected_fct != fct) {
+ else if (fct == expected_fct) {
+ last_eti_fct = fct;
+ m.framecount++;
+ m.flowgraph->run();
+ }
+ else {
etiLog.level(info) << "ETI FCT discontinuity, expected " <<
- expected_fct << " received " << m.etiReader->getFct();
+ expected_fct << " received " << fct;
+ if (m.ediInput) {
+ m.ediInput->ediReader.clearFrame();
+ }
return run_modulator_state_t::again;
}
- last_eti_fct = fct;
- m.flowgraph->run();
+ if (m.ediInput) {
+ m.ediInput->ediReader.clearFrame();
+ }
/* Check every once in a while if the remote control
* is still working */
@@ -574,52 +594,6 @@ static run_modulator_state_t run_modulator(modulator_data& m)
rcs.check_faults();
}
}
- if (framesize == 0) {
- if (dynamic_pointer_cast<InputFileReader>(m.inputReader)) {
- etiLog.level(info) << "End of file reached.";
- running = 0;
- ret = run_modulator_state_t::normal_end;
- }
-#if defined(HAVE_ZEROMQ)
- else if (dynamic_pointer_cast<InputZeroMQReader>(m.inputReader)) {
- /* An empty frame marks a timeout. We ignore it, but we are
- * now able to handle SIGINT properly.
- *
- * Also, we reconnect zmq every 10 seconds to avoid some
- * issues, discussed in
- * https://stackoverflow.com/questions/26112992/zeromq-pub-sub-on-unreliable-connection
- *
- * > It is possible that the PUB socket sees the error
- * > while the SUB socket does not.
- * >
- * > The ZMTP RFC has a proposal for heartbeating that would
- * > solve this problem. The current best solution is for
- * > PUB sockets to send heartbeats (e.g. 1 per second) when
- * > traffic is low, and for SUB sockets to disconnect /
- * > reconnect if they stop getting these.
- *
- * We don't need a heartbeat, because our application is constant frame rate,
- * the frames themselves can act as heartbeats.
- */
-
- const auto now = chrono::steady_clock::now();
- if (last_frame_received + chrono::seconds(10) < now) {
- throw zmq_input_timeout();
- }
- }
-#endif // defined(HAVE_ZEROMQ)
- else if (dynamic_pointer_cast<InputTcpReader>(m.inputReader)) {
- /* Same as for ZeroMQ */
- }
- else {
- throw logic_error("Unhandled framesize==0!");
- }
- }
- else {
- etiLog.level(error) << "Input read error.";
- running = 0;
- ret = run_modulator_state_t::normal_end;
- }
}
}
catch (const zmq_input_timeout&) {