aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/Buffer.cpp3
-rw-r--r--src/ConfigParser.cpp3
-rw-r--r--src/ConfigParser.h1
-rw-r--r--src/DabMod.cpp481
-rw-r--r--src/DabModulator.cpp2
-rw-r--r--src/EtiReader.cpp33
-rw-r--r--src/EtiReader.h26
-rw-r--r--src/Flowgraph.cpp9
-rw-r--r--src/Flowgraph.h7
-rw-r--r--src/GainControl.cpp3
-rw-r--r--src/GuardIntervalInserter.cpp3
-rw-r--r--src/InputReader.h10
-rw-r--r--src/Log.cpp191
-rw-r--r--src/Log.h200
-rw-r--r--src/MemlessPoly.cpp3
-rw-r--r--src/MemlessPoly.h8
-rw-r--r--src/OutputFile.cpp56
-rw-r--r--src/OutputFile.h6
-rw-r--r--src/OutputMemory.cpp1
-rw-r--r--src/RemoteControl.cpp588
-rw-r--r--src/RemoteControl.h259
-rw-r--r--src/Socket.cpp275
-rw-r--r--src/Socket.h104
-rw-r--r--src/SubchannelSource.cpp4
-rw-r--r--src/SubchannelSource.h2
-rw-r--r--src/ThreadsafeQueue.h178
-rw-r--r--src/TimestampDecoder.cpp1
-rw-r--r--src/Utils.cpp3
-rw-r--r--src/output/Feedback.cpp7
-rw-r--r--src/output/SDR.cpp8
30 files changed, 358 insertions, 2117 deletions
diff --git a/src/Buffer.cpp b/src/Buffer.cpp
index 002c1eb..ab50f1a 100644
--- a/src/Buffer.cpp
+++ b/src/Buffer.cpp
@@ -97,6 +97,9 @@ Buffer& Buffer::operator=(Buffer&& other)
if (&other != this) {
m_len = other.m_len;
m_capacity = other.m_capacity;
+ if (m_data != nullptr) {
+ free(m_data);
+ }
m_data = other.m_data;
other.m_len = 0;
diff --git a/src/ConfigParser.cpp b/src/ConfigParser.cpp
index dd8a150..73e51dd 100644
--- a/src/ConfigParser.cpp
+++ b/src/ConfigParser.cpp
@@ -30,6 +30,7 @@
#endif
#include <cstdint>
+#include <algorithm>
#include "INIReader.h"
@@ -143,6 +144,8 @@ static void parse_configfile(
etiLog.register_backend(make_shared<LogTracer>(trace_filename));
}
+ mod_settings.showProcessTime = pt.GetInteger("log.show_process_time",
+ mod_settings.showProcessTime);
// modulator parameters:
const string gainMode_setting = pt.Get("modulator.gainmode", "var");
diff --git a/src/ConfigParser.h b/src/ConfigParser.h
index 23b0528..33d7824 100644
--- a/src/ConfigParser.h
+++ b/src/ConfigParser.h
@@ -88,6 +88,7 @@ struct mod_settings_t {
Output::SDRDeviceConfig sdr_device_config;
#endif
+ bool showProcessTime = true;
};
void parse_args(int argc, char **argv, mod_settings_t& mod_settings);
diff --git a/src/DabMod.cpp b/src/DabMod.cpp
index d340b30..d624a12 100644
--- a/src/DabMod.cpp
+++ b/src/DabMod.cpp
@@ -37,6 +37,7 @@
#include <cstdlib>
#include <stdexcept>
#include <cstdio>
+#include <cstring>
#include <cstddef>
#include <sys/stat.h>
#include <signal.h>
@@ -59,7 +60,6 @@
#include "OutputZeroMQ.h"
#include "InputReader.h"
#include "PcDebug.h"
-#include "TimestampDecoder.h"
#include "FIRFilter.h"
#include "RemoteControl.h"
#include "ConfigParser.h"
@@ -94,12 +94,16 @@ void signalHandler(int signalNb)
struct modulator_data
{
+ // For ETI
std::shared_ptr<InputReader> inputReader;
- Buffer data;
- uint64_t framecount = 0;
+ std::shared_ptr<EtiReader> etiReader;
+
+ // For EDI
+ std::shared_ptr<EdiInput> ediInput;
- Flowgraph* flowgraph = nullptr;
- EtiReader* etiReader = nullptr;
+ // Common to both EDI and EDI
+ uint64_t framecount = 0;
+ Flowgraph *flowgraph = nullptr;
};
enum class run_modulator_state_t {
@@ -318,205 +322,137 @@ int launch_modulator(int argc, char* argv[])
etiLog.level(error) << "Could not set priority for modulator:" << r;
}
+ shared_ptr<InputReader> inputReader;
+ shared_ptr<EdiInput> ediInput;
+
if (mod_settings.inputTransport == "edi") {
-#ifdef HAVE_EDI
- EdiReader ediReader(mod_settings.tist_offset_s);
- EdiDecoder::ETIDecoder ediInput(ediReader, false);
- if (mod_settings.edi_max_delay_ms > 0.0f) {
- // setMaxDelay wants number of AF packets, which correspond to 24ms ETI frames
- ediInput.setMaxDelay(lroundf(mod_settings.edi_max_delay_ms / 24.0f));
- }
- EdiTransport ediTransport(ediInput);
+ ediInput = make_shared<EdiInput>(mod_settings.tist_offset_s, mod_settings.edi_max_delay_ms);
- ediTransport.Open(mod_settings.inputName);
- if (not ediTransport.isEnabled()) {
+ ediInput->ediTransport.Open(mod_settings.inputName);
+ if (not ediInput->ediTransport.isEnabled()) {
throw runtime_error("inputTransport is edi, but ediTransport is not enabled");
}
- Flowgraph flowgraph;
-
- auto modulator = make_shared<DabModulator>(ediReader, mod_settings);
- rcs.enrol(modulator.get());
+ }
+ else if (mod_settings.inputTransport == "file") {
+ auto inputFileReader = make_shared<InputFileReader>();
- if (format_converter) {
- flowgraph.connect(modulator, format_converter);
- flowgraph.connect(format_converter, output);
- }
- else {
- flowgraph.connect(modulator, output);
+ // Opening ETI input file
+ if (inputFileReader->Open(mod_settings.inputName, mod_settings.loop) == -1) {
+ throw std::runtime_error("Unable to open input");
}
- size_t framecount = 0;
-
- bool first_frame = true;
-
- auto frame_received_tp = chrono::steady_clock::now();
-
- while (running) {
- while (running and not ediReader.isFrameReady()) {
- try {
- bool packet_received = ediTransport.rxPacket();
- if (packet_received) {
- frame_received_tp = chrono::steady_clock::now();
- }
- }
- catch (const std::runtime_error& e) {
- etiLog.level(warn) << "EDI input: " << e.what();
- running = 0;
- break;
- }
-
- if (frame_received_tp + chrono::seconds(10) < chrono::steady_clock::now()) {
- etiLog.level(error) << "No EDI data received in 10 seconds.";
- running = 0;
- break;
- }
- }
-
- if (not running) {
- break;
- }
-
- if (first_frame) {
- if (ediReader.getFp() != 0) {
- // Do not start the flowgraph before we get to FP 0
- // to ensure all blocks are properly aligned.
- ediReader.clearFrame();
- continue;
- }
- else {
- first_frame = false;
- }
- }
-
- framecount++;
- flowgraph.run();
- ediReader.clearFrame();
-
- /* Check every once in a while if the remote control
- * is still working */
- if ((framecount % 250) == 0) {
- rcs.check_faults();
- }
- }
-#else
+ inputReader = inputFileReader;
+ }
+ else if (mod_settings.inputTransport == "zeromq") {
+#if !defined(HAVE_ZEROMQ)
throw std::runtime_error("Unable to open input: "
- "EDI input transport selected, but not compiled in!");
-#endif // HAVE_EDI
+ "ZeroMQ input transport selected, but not compiled in!");
+#else
+ auto inputZeroMQReader = make_shared<InputZeroMQReader>();
+ inputZeroMQReader->Open(mod_settings.inputName, mod_settings.inputMaxFramesQueued);
+ rcs.enrol(inputZeroMQReader.get());
+ inputReader = inputZeroMQReader;
+#endif
+ }
+ else if (mod_settings.inputTransport == "tcp") {
+ auto inputTcpReader = make_shared<InputTcpReader>();
+ inputTcpReader->Open(mod_settings.inputName);
+ inputReader = inputTcpReader;
}
else {
- shared_ptr<InputReader> inputReader;
+ throw std::runtime_error("Unable to open input: "
+ "invalid input transport " + mod_settings.inputTransport + " selected!");
+ }
- if (mod_settings.inputTransport == "file") {
- auto inputFileReader = make_shared<InputFileReader>();
+ bool run_again = true;
- // Opening ETI input file
- if (inputFileReader->Open(mod_settings.inputName, mod_settings.loop) == -1) {
- throw std::runtime_error("Unable to open input");
- }
+ while (run_again) {
+ Flowgraph flowgraph(mod_settings.showProcessTime);
- inputReader = inputFileReader;
+ modulator_data m;
+ m.ediInput = ediInput;
+ m.inputReader = inputReader;
+ m.flowgraph = &flowgraph;
+
+ shared_ptr<DabModulator> modulator;
+ if (inputReader) {
+ m.etiReader = make_shared<EtiReader>(mod_settings.tist_offset_s);
+ modulator = make_shared<DabModulator>(*m.etiReader, mod_settings);
}
- else if (mod_settings.inputTransport == "zeromq") {
-#if !defined(HAVE_ZEROMQ)
- throw std::runtime_error("Unable to open input: "
- "ZeroMQ input transport selected, but not compiled in!");
-#else
- auto inputZeroMQReader = make_shared<InputZeroMQReader>();
- inputZeroMQReader->Open(mod_settings.inputName, mod_settings.inputMaxFramesQueued);
- rcs.enrol(inputZeroMQReader.get());
- inputReader = inputZeroMQReader;
-#endif
+ else if (ediInput) {
+ modulator = make_shared<DabModulator>(ediInput->ediReader, mod_settings);
}
- else if (mod_settings.inputTransport == "tcp") {
- auto inputTcpReader = make_shared<InputTcpReader>();
- inputTcpReader->Open(mod_settings.inputName);
- inputReader = inputTcpReader;
+
+ rcs.enrol(modulator.get());
+
+ if (format_converter) {
+ flowgraph.connect(modulator, format_converter);
+ flowgraph.connect(format_converter, output);
}
else {
- throw std::runtime_error("Unable to open input: "
- "invalid input transport " + mod_settings.inputTransport + " selected!");
+ flowgraph.connect(modulator, output);
}
- bool run_again = true;
-
- while (run_again) {
- Flowgraph flowgraph;
-
- modulator_data m;
- m.inputReader = inputReader;
- m.flowgraph = &flowgraph;
- m.data.setLength(6144);
-
- EtiReader etiReader(mod_settings.tist_offset_s);
- m.etiReader = &etiReader;
-
- auto input = make_shared<InputMemory>(&m.data);
- auto modulator = make_shared<DabModulator>(etiReader, mod_settings);
- rcs.enrol(modulator.get());
-
- if (format_converter) {
- flowgraph.connect(modulator, format_converter);
- flowgraph.connect(format_converter, output);
- }
- else {
- flowgraph.connect(modulator, output);
- }
-
+ if (inputReader) {
etiLog.level(info) << inputReader->GetPrintableInfo();
+ }
- run_modulator_state_t st = run_modulator(m);
- etiLog.log(trace, "DABMOD,run_modulator() = %d", st);
-
- switch (st) {
- case run_modulator_state_t::failure:
- etiLog.level(error) << "Modulator failure.";
- run_again = false;
- ret = 1;
- break;
- case run_modulator_state_t::again:
- etiLog.level(warn) << "Restart modulator.";
- run_again = false;
- if (auto in = dynamic_pointer_cast<InputFileReader>(inputReader)) {
- if (in->Open(mod_settings.inputName, mod_settings.loop) == -1) {
- etiLog.level(error) << "Unable to open input file!";
- ret = 1;
- }
- else {
- run_again = true;
- }
+ run_modulator_state_t st = run_modulator(m);
+ etiLog.log(trace, "DABMOD,run_modulator() = %d", st);
+
+ switch (st) {
+ case run_modulator_state_t::failure:
+ etiLog.level(error) << "Modulator failure.";
+ run_again = false;
+ ret = 1;
+ break;
+ case run_modulator_state_t::again:
+ etiLog.level(warn) << "Restart modulator.";
+ run_again = false;
+ if (auto in = dynamic_pointer_cast<InputFileReader>(inputReader)) {
+ if (in->Open(mod_settings.inputName, mod_settings.loop) == -1) {
+ etiLog.level(error) << "Unable to open input file!";
+ ret = 1;
}
-#if defined(HAVE_ZEROMQ)
- else if (auto in_zmq = dynamic_pointer_cast<InputZeroMQReader>(inputReader)) {
+ else {
run_again = true;
- // Create a new input reader
- rcs.remove_controllable(in_zmq.get());
- auto inputZeroMQReader = make_shared<InputZeroMQReader>();
- inputZeroMQReader->Open(mod_settings.inputName, mod_settings.inputMaxFramesQueued);
- rcs.enrol(inputZeroMQReader.get());
- inputReader = inputZeroMQReader;
}
+ }
+#if defined(HAVE_ZEROMQ)
+ else if (auto in_zmq = dynamic_pointer_cast<InputZeroMQReader>(inputReader)) {
+ run_again = true;
+ // Create a new input reader
+ rcs.remove_controllable(in_zmq.get());
+ auto inputZeroMQReader = make_shared<InputZeroMQReader>();
+ inputZeroMQReader->Open(mod_settings.inputName, mod_settings.inputMaxFramesQueued);
+ rcs.enrol(inputZeroMQReader.get());
+ inputReader = inputZeroMQReader;
+ }
#endif
- else if (dynamic_pointer_cast<InputTcpReader>(inputReader)) {
- // Keep the same inputReader, as there is no input buffer overflow
- run_again = true;
- }
- break;
- case run_modulator_state_t::reconfigure:
- etiLog.level(warn) << "Detected change in ensemble configuration.";
- /* We can keep the input in this care */
+ else if (dynamic_pointer_cast<InputTcpReader>(inputReader)) {
+ // Keep the same inputReader, as there is no input buffer overflow
run_again = true;
- break;
- case run_modulator_state_t::normal_end:
- default:
- etiLog.level(info) << "modulator stopped.";
- ret = 0;
- run_again = false;
- break;
- }
-
- etiLog.level(info) << m.framecount << " DAB frames encoded";
- etiLog.level(info) << ((float)m.framecount * 0.024f) << " seconds encoded";
+ }
+ else if (ediInput) {
+ // In EDI, keep the same input
+ run_again = true;
+ }
+ break;
+ case run_modulator_state_t::reconfigure:
+ etiLog.level(warn) << "Detected change in ensemble configuration.";
+ /* We can keep the input in this case */
+ run_again = true;
+ break;
+ case run_modulator_state_t::normal_end:
+ default:
+ etiLog.level(info) << "modulator stopped.";
+ ret = 0;
+ run_again = false;
+ break;
}
+
+ etiLog.level(info) << m.framecount << " DAB frames encoded";
+ etiLog.level(info) << ((float)m.framecount * 0.024f) << " seconds encoded";
}
etiLog.level(info) << "Terminating";
@@ -535,57 +471,142 @@ static run_modulator_state_t run_modulator(modulator_data& m)
{
auto ret = run_modulator_state_t::failure;
try {
- bool first_frame = true;
int last_eti_fct = -1;
auto last_frame_received = chrono::steady_clock::now();
+ Buffer data;
+ if (m.inputReader) {
+ data.setLength(6144);
+ }
while (running) {
- int framesize;
-
- PDEBUG("*****************************************\n");
- PDEBUG("* Starting main loop\n");
- PDEBUG("*****************************************\n");
- while ((framesize = m.inputReader->GetNextFrame(m.data.getData())) > 0) {
- if (!running) {
- break;
- }
+ while (true) {
+ unsigned fct = 0;
+ unsigned fp = 0;
+
+ /* Load ETI data from the source */
+ if (m.inputReader) {
+ int framesize = m.inputReader->GetNextFrame(data.getData());
+
+ if (framesize == 0) {
+ if (dynamic_pointer_cast<InputFileReader>(m.inputReader)) {
+ etiLog.level(info) << "End of file reached.";
+ running = 0;
+ ret = run_modulator_state_t::normal_end;
+ break;
+ }
+#if defined(HAVE_ZEROMQ)
+ else if (dynamic_pointer_cast<InputZeroMQReader>(m.inputReader)) {
+ /* An empty frame marks a timeout. We ignore it, but we are
+ * now able to handle SIGINT properly.
+ *
+ * Also, we reconnect zmq every 10 seconds to avoid some
+ * issues, discussed in
+ * https://stackoverflow.com/questions/26112992/zeromq-pub-sub-on-unreliable-connection
+ *
+ * > It is possible that the PUB socket sees the error
+ * > while the SUB socket does not.
+ * >
+ * > The ZMTP RFC has a proposal for heartbeating that would
+ * > solve this problem. The current best solution is for
+ * > PUB sockets to send heartbeats (e.g. 1 per second) when
+ * > traffic is low, and for SUB sockets to disconnect /
+ * > reconnect if they stop getting these.
+ *
+ * We don't need a heartbeat, because our application is constant frame rate,
+ * the frames themselves can act as heartbeats.
+ */
+
+ const auto now = chrono::steady_clock::now();
+ if (last_frame_received + chrono::seconds(10) < now) {
+ throw zmq_input_timeout();
+ }
+ }
+#endif // defined(HAVE_ZEROMQ)
+ else if (dynamic_pointer_cast<InputTcpReader>(m.inputReader)) {
+ /* Same as for ZeroMQ */
+ }
+ else {
+ throw logic_error("Unhandled framesize==0!");
+ }
+ }
+ else if (framesize < 0) {
+ etiLog.level(error) << "Input read error.";
+ running = 0;
+ ret = run_modulator_state_t::normal_end;
+ break;
+ }
+
+ const int eti_bytes_read = m.etiReader->loadEtiData(data);
+ if ((size_t)eti_bytes_read != data.getLength()) {
+ etiLog.level(error) << "ETI frame incompletely read";
+ throw std::runtime_error("ETI read error");
+ }
- last_frame_received = chrono::steady_clock::now();
+ fct = m.etiReader->getFct();
+ fp = m.etiReader->getFp();
+ }
+ else if (m.ediInput) {
+ while (running and not m.ediInput->ediReader.isFrameReady()) {
+ try {
+ bool packet_received = m.ediInput->ediTransport.rxPacket();
+ if (packet_received) {
+ last_frame_received = chrono::steady_clock::now();
+ }
+ }
+ catch (const std::runtime_error& e) {
+ etiLog.level(warn) << "EDI input: " << e.what();
+ running = 0;
+ break;
+ }
- m.framecount++;
+ if (last_frame_received + chrono::seconds(10) < chrono::steady_clock::now()) {
+ etiLog.level(error) << "No EDI data received in 10 seconds.";
+ running = 0;
+ break;
+ }
+ }
- PDEBUG("*****************************************\n");
- PDEBUG("* Read frame %lu\n", m.framecount);
- PDEBUG("*****************************************\n");
+ if (!running) {
+ break;
+ }
- const int eti_bytes_read = m.etiReader->loadEtiData(m.data);
- if ((size_t)eti_bytes_read != m.data.getLength()) {
- etiLog.level(error) << "ETI frame incompletely read";
- throw std::runtime_error("ETI read error");
+ fct = m.ediInput->ediReader.getFct();
+ fp = m.ediInput->ediReader.getFp();
}
- if (first_frame) {
- if (m.etiReader->getFp() != 0) {
+ const unsigned expected_fct = (last_eti_fct + 1) % 250;
+ if (last_eti_fct == -1) {
+ if (fp != 0) {
// Do not start the flowgraph before we get to FP 0
// to ensure all blocks are properly aligned.
+ if (m.ediInput) {
+ m.ediInput->ediReader.clearFrame();
+ }
continue;
}
else {
- first_frame = false;
+ last_eti_fct = fct;
+ m.framecount++;
+ m.flowgraph->run();
}
}
-
- // Check for ETI FCT continuity
- const unsigned expected_fct = (last_eti_fct + 1) % 250;
- const unsigned fct = m.etiReader->getFct();
- if (last_eti_fct != -1 and expected_fct != fct) {
+ else if (fct == expected_fct) {
+ last_eti_fct = fct;
+ m.framecount++;
+ m.flowgraph->run();
+ }
+ else {
etiLog.level(info) << "ETI FCT discontinuity, expected " <<
- expected_fct << " received " << m.etiReader->getFct();
+ expected_fct << " received " << fct;
+ if (m.ediInput) {
+ m.ediInput->ediReader.clearFrame();
+ }
return run_modulator_state_t::again;
}
- last_eti_fct = fct;
- m.flowgraph->run();
+ if (m.ediInput) {
+ m.ediInput->ediReader.clearFrame();
+ }
/* Check every once in a while if the remote control
* is still working */
@@ -593,52 +614,6 @@ static run_modulator_state_t run_modulator(modulator_data& m)
rcs.check_faults();
}
}
- if (framesize == 0) {
- if (dynamic_pointer_cast<InputFileReader>(m.inputReader)) {
- etiLog.level(info) << "End of file reached.";
- running = 0;
- ret = run_modulator_state_t::normal_end;
- }
-#if defined(HAVE_ZEROMQ)
- else if (dynamic_pointer_cast<InputZeroMQReader>(m.inputReader)) {
- /* An empty frame marks a timeout. We ignore it, but we are
- * now able to handle SIGINT properly.
- *
- * Also, we reconnect zmq every 10 seconds to avoid some
- * issues, discussed in
- * https://stackoverflow.com/questions/26112992/zeromq-pub-sub-on-unreliable-connection
- *
- * > It is possible that the PUB socket sees the error
- * > while the SUB socket does not.
- * >
- * > The ZMTP RFC has a proposal for heartbeating that would
- * > solve this problem. The current best solution is for
- * > PUB sockets to send heartbeats (e.g. 1 per second) when
- * > traffic is low, and for SUB sockets to disconnect /
- * > reconnect if they stop getting these.
- *
- * We don't need a heartbeat, because our application is constant frame rate,
- * the frames themselves can act as heartbeats.
- */
-
- const auto now = chrono::steady_clock::now();
- if (last_frame_received + chrono::seconds(10) < now) {
- throw zmq_input_timeout();
- }
- }
-#endif // defined(HAVE_ZEROMQ)
- else if (dynamic_pointer_cast<InputTcpReader>(m.inputReader)) {
- /* Same as for ZeroMQ */
- }
- else {
- throw logic_error("Unhandled framesize==0!");
- }
- }
- else {
- etiLog.level(error) << "Input read error.";
- running = 0;
- ret = run_modulator_state_t::normal_end;
- }
}
}
catch (const zmq_input_timeout&) {
diff --git a/src/DabModulator.cpp b/src/DabModulator.cpp
index 7e3ccf0..aa4f2a8 100644
--- a/src/DabModulator.cpp
+++ b/src/DabModulator.cpp
@@ -132,7 +132,7 @@ int DabModulator::process(Buffer* dataOut)
const unsigned mode = m_settings.dabMode;
setMode(mode);
- myFlowgraph = make_shared<Flowgraph>();
+ myFlowgraph = make_shared<Flowgraph>(m_settings.showProcessTime);
////////////////////////////////////////////////////////////////
// CIF data initialisation
////////////////////////////////////////////////////////////////
diff --git a/src/EtiReader.cpp b/src/EtiReader.cpp
index 94c362a..25c1ada 100644
--- a/src/EtiReader.cpp
+++ b/src/EtiReader.cpp
@@ -2,7 +2,7 @@
Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty
the Queen in Right of Canada (Communications Research Center Canada)
- Copyright (C) 2018
+ Copyright (C) 2019
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
@@ -238,7 +238,7 @@ int EtiReader::loadEtiData(const Buffer& dataIn)
unsigned size = mySources[i]->framesize();
PDEBUG("Writting %i bytes of subchannel data\n", size);
Buffer subch(size, in);
- mySources[i]->loadSubchannelData(subch);
+ mySources[i]->loadSubchannelData(move(subch));
input_size -= size;
framesize -= size;
in += size;
@@ -312,7 +312,6 @@ uint32_t EtiReader::getPPSOffset()
return timestamp;
}
-#ifdef HAVE_EDI
EdiReader::EdiReader(
double& tist_offset_s) :
m_timestamp_decoder(tist_offset_s)
@@ -428,12 +427,12 @@ void EdiReader::update_fc_data(const EdiDecoder::eti_fc_data& fc_data)
m_fc_valid = true;
}
-void EdiReader::update_fic(const std::vector<uint8_t>& fic)
+void EdiReader::update_fic(std::vector<uint8_t>&& fic)
{
if (not m_proto_valid) {
throw std::logic_error("Cannot update FIC before protocol");
}
- m_fic = fic;
+ m_fic = move(fic);
}
void EdiReader::update_edi_time(
@@ -469,7 +468,7 @@ void EdiReader::update_rfu(uint16_t rfu)
m_rfu = rfu;
}
-void EdiReader::add_subchannel(const EdiDecoder::eti_stc_data& stc)
+void EdiReader::add_subchannel(EdiDecoder::eti_stc_data&& stc)
{
if (not m_proto_valid) {
throw std::logic_error("Cannot add subchannel before protocol");
@@ -485,7 +484,7 @@ void EdiReader::add_subchannel(const EdiDecoder::eti_stc_data& stc)
throw std::invalid_argument(
"EDI: MST data length inconsistent with FIC");
}
- source->loadSubchannelData(stc.mst);
+ source->loadSubchannelData(move(stc.mst));
if (m_sources.size() > 64) {
throw std::invalid_argument("Too many subchannels");
@@ -628,7 +627,10 @@ bool EdiTransport::rxPacket()
}
case Proto::TCP:
{
- m_tcpbuffer.resize(4096);
+ // The buffer size must be smaller than the size of two AF Packets, because otherwise
+ // the EDI decoder decodes two in a row and discards the first. This leads to ETI FCT
+ // discontinuity.
+ m_tcpbuffer.resize(512);
const int timeout_ms = 1000;
try {
ssize_t ret = m_tcpclient.recv(m_tcpbuffer.data(), m_tcpbuffer.size(), 0, timeout_ms);
@@ -644,11 +646,22 @@ bool EdiTransport::rxPacket()
return true;
}
}
- catch (const TCPSocket::Timeout&) {
+ catch (const Socket::TCPSocket::Timeout&) {
return false;
}
}
}
throw logic_error("Incomplete rxPacket implementation!");
}
-#endif // HAVE_EDI
+
+EdiInput::EdiInput(double& tist_offset_s, float edi_max_delay_ms) :
+ ediReader(tist_offset_s),
+ decoder(ediReader, false),
+ ediTransport(decoder)
+{
+ if (edi_max_delay_ms > 0.0f) {
+ // setMaxDelay wants number of AF packets, which correspond to 24ms ETI frames
+ decoder.setMaxDelay(lroundf(edi_max_delay_ms / 24.0f));
+ }
+}
+
diff --git a/src/EtiReader.h b/src/EtiReader.h
index 38f7903..28fb2ac 100644
--- a/src/EtiReader.h
+++ b/src/EtiReader.h
@@ -2,7 +2,7 @@
Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty
the Queen in Right of Canada (Communications Research Center Canada)
- Copyright (C) 2018
+ Copyright (C) 2019
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
@@ -38,9 +38,6 @@
#include "SubchannelSource.h"
#include "TimestampDecoder.h"
#include "lib/edi/ETIDecoder.hpp"
-#ifdef HAVE_EDI
-# include "lib/UdpSocket.h"
-#endif
#include <vector>
#include <memory>
@@ -114,11 +111,10 @@ private:
std::vector<std::shared_ptr<SubchannelSource> > mySources;
};
-#ifdef HAVE_EDI
/* The EdiReader extracts the necessary data using the EDI input library in
* lib/edi
*/
-class EdiReader : public EtiSource, public EdiDecoder::DataCollector
+class EdiReader : public EtiSource, public EdiDecoder::ETIDataCollector
{
public:
EdiReader(double& tist_offset_s);
@@ -142,7 +138,7 @@ public:
// Update the data for the frame characterisation
virtual void update_fc_data(const EdiDecoder::eti_fc_data& fc_data);
- virtual void update_fic(const std::vector<uint8_t>& fic);
+ virtual void update_fic(std::vector<uint8_t>&& fic);
virtual void update_err(uint8_t err);
@@ -156,7 +152,7 @@ public:
virtual void update_rfu(uint16_t rfu);
- virtual void add_subchannel(const EdiDecoder::eti_stc_data& stc);
+ virtual void add_subchannel(EdiDecoder::eti_stc_data&& stc);
// Gets called by the EDI library to tell us that all data for a frame was given to us
virtual void assemble(void);
@@ -211,10 +207,18 @@ class EdiTransport {
enum class Proto { UDP, TCP };
Proto m_proto;
- UdpReceiver m_udp_rx;
+ Socket::UDPReceiver m_udp_rx;
std::vector<uint8_t> m_tcpbuffer;
- TCPClient m_tcpclient;
+ Socket::TCPClient m_tcpclient;
EdiDecoder::ETIDecoder& m_decoder;
};
-#endif
+
+// EdiInput wraps an EdiReader, an EdiDecoder::ETIDecoder and an EdiTransport
+class EdiInput {
+ public:
+ EdiInput(double& tist_offset_s, float edi_max_delay_ms);
+ EdiReader ediReader;
+ EdiDecoder::ETIDecoder decoder;
+ EdiTransport ediTransport;
+};
diff --git a/src/Flowgraph.cpp b/src/Flowgraph.cpp
index 4c83fe8..3d4cdcc 100644
--- a/src/Flowgraph.cpp
+++ b/src/Flowgraph.cpp
@@ -43,8 +43,7 @@ using EdgeIterator = std::vector<shared_ptr<Edge> >::iterator;
Node::Node(shared_ptr<ModPlugin> plugin) :
- myPlugin(plugin),
- myProcessTime(0)
+ myPlugin(plugin)
{
PDEBUG("Node::Node(plugin(%s): %p) @ %p\n",
plugin->name(), plugin.get(), this);
@@ -237,8 +236,8 @@ Edge::~Edge()
-Flowgraph::Flowgraph() :
- myProcessTime(0)
+Flowgraph::Flowgraph(bool showProcessTime) :
+ myShowProcessTime(showProcessTime)
{
PDEBUG("Flowgraph::Flowgraph() @ %p\n", this);
}
@@ -248,7 +247,7 @@ Flowgraph::~Flowgraph()
{
PDEBUG("Flowgraph::~Flowgraph() @ %p\n", this);
- if (myProcessTime) {
+ if (myShowProcessTime and myProcessTime) {
stringstream ss;
ss << "Process time:\n";
diff --git a/src/Flowgraph.h b/src/Flowgraph.h
index 389359b..753070b 100644
--- a/src/Flowgraph.h
+++ b/src/Flowgraph.h
@@ -71,7 +71,7 @@ protected:
#endif
std::shared_ptr<ModPlugin> myPlugin;
- time_t myProcessTime;
+ time_t myProcessTime = 0;
};
@@ -94,7 +94,7 @@ protected:
class Flowgraph
{
public:
- Flowgraph();
+ Flowgraph(bool showProcessTime);
virtual ~Flowgraph();
Flowgraph(const Flowgraph&) = delete;
Flowgraph& operator=(const Flowgraph&) = delete;
@@ -106,7 +106,8 @@ public:
protected:
std::vector<std::shared_ptr<Node> > nodes;
std::vector<std::shared_ptr<Edge> > edges;
- time_t myProcessTime;
+ time_t myProcessTime = 0;
+ bool myShowProcessTime;
};
diff --git a/src/GainControl.cpp b/src/GainControl.cpp
index 9cde8b1..b781640 100644
--- a/src/GainControl.cpp
+++ b/src/GainControl.cpp
@@ -28,8 +28,9 @@
#include "GainControl.h"
#include "PcDebug.h"
-#include <stdio.h>
+#include <cstdio>
#include <stdexcept>
+#include <algorithm>
#include <string>
#ifdef __SSE__
diff --git a/src/GuardIntervalInserter.cpp b/src/GuardIntervalInserter.cpp
index 6e1df4f..0cd5bd5 100644
--- a/src/GuardIntervalInserter.cpp
+++ b/src/GuardIntervalInserter.cpp
@@ -26,7 +26,8 @@
#include "GuardIntervalInserter.h"
#include "PcDebug.h"
-#include <string.h>
+#include <cstring>
+#include <cassert>
#include <stdexcept>
#include <complex>
#include <mutex>
diff --git a/src/InputReader.h b/src/InputReader.h
index 63451e5..ab45d4f 100644
--- a/src/InputReader.h
+++ b/src/InputReader.h
@@ -144,7 +144,7 @@ class InputTcpReader : public InputReader
virtual std::string GetPrintableInfo() const override;
private:
- TCPClient m_tcpclient;
+ Socket::TCPClient m_tcpclient;
std::string m_uri;
};
@@ -201,14 +201,6 @@ class InputZeroMQReader : public InputReader, public RemoteControllable
zmq::context_t m_zmqcontext; // is thread-safe
std::thread m_recv_thread;
-
- /* We must be careful to keep frame phase consistent. If we
- * drop a single ETI frame, we will break the transmission
- * frame vs. ETI frame phase.
- *
- * Here we keep track of how many ETI frames we must drop.
- */
- int m_to_drop = 0;
};
#endif
diff --git a/src/Log.cpp b/src/Log.cpp
deleted file mode 100644
index 4fc7ae3..0000000
--- a/src/Log.cpp
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012
- Her Majesty the Queen in Right of Canada (Communications Research
- Center Canada)
-
- Copyright (C) 2018
- Matthias P. Braendli, matthias.braendli@mpb.li
-
- http://www.opendigitalradio.org
- */
-/*
- This file is part of ODR-DabMod.
-
- ODR-DabMod is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMod is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with ODR-DabMod. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#include <list>
-#include <cstdarg>
-#include <cinttypes>
-#include <chrono>
-
-#include "Log.h"
-#include "Utils.h"
-
-using namespace std;
-
-/* This is called etiLog because it was copy-pasted from ODR-DabMux, even
- * though it doesn't make any more sense there than here.
- *
- * It is a singleton used in all parts of ODR-DabMod to output log messages.
- */
-Logger etiLog;
-
-void Logger::register_backend(std::shared_ptr<LogBackend> backend)
-{
- backends.push_back(backend);
-}
-
-
-void Logger::log(log_level_t lvl, const char* fmt, ...)
-{
- int size = 100;
- std::string str;
- va_list ap;
- while (1) {
- str.resize(size);
- va_start(ap, fmt);
- int n = vsnprintf((char *)str.c_str(), size, fmt, ap);
- va_end(ap);
- if (n > -1 && n < size) {
- str.resize(n);
- break;
- }
- if (n > -1)
- size = n + 1;
- else
- size *= 2;
- }
-
- logstr(lvl, move(str));
-}
-
-void Logger::logstr(log_level_t lvl, std::string&& message)
-{
- log_message_t m(lvl, move(message));
- m_message_queue.push(move(m));
-}
-
-void Logger::io_process()
-{
- set_thread_name("logger");
- while (1) {
- log_message_t m;
- try {
- m_message_queue.wait_and_pop(m);
- }
- catch (const ThreadsafeQueueWakeup&) {
- break;
- }
-
- auto message = m.message;
-
- /* Remove a potential trailing newline.
- * It doesn't look good in syslog
- */
- if (message[message.length()-1] == '\n') {
- message.resize(message.length()-1);
- }
-
- for (auto &backend : backends) {
- backend->log(m.level, message);
- }
-
- if (m.level != log_level_t::trace) {
- std::lock_guard<std::mutex> guard(m_cerr_mutex);
- std::cerr << levels_as_str[m.level] << " " << message << std::endl;
- }
- }
-}
-
-LogLine Logger::level(log_level_t lvl)
-{
- return LogLine(this, lvl);
-}
-
-LogToFile::LogToFile(const std::string& filename) : name("FILE")
-{
- FILE* fd = fopen(filename.c_str(), "a");
- if (fd == nullptr) {
- fprintf(stderr, "Cannot open log file !");
- throw std::runtime_error("Cannot open log file !");
- }
-
- log_file.reset(fd);
-}
-
-void LogToFile::log(log_level_t level, const std::string& message)
-{
- if (level != log_level_t::trace) {
- const char* log_level_text[] = {
- "DEBUG", "INFO", "WARN", "ERROR", "ALERT", "EMERG"};
-
- // fprintf is thread-safe
- fprintf(log_file.get(), SYSLOG_IDENT ": %s: %s\n",
- log_level_text[(size_t)level], message.c_str());
- fflush(log_file.get());
- }
-}
-
-void LogToSyslog::log(log_level_t level, const std::string& message)
-{
- if (level != log_level_t::trace) {
- int syslog_level = LOG_EMERG;
- switch (level) {
- case trace: break; // Do not handle TRACE in syslog
- case debug: syslog_level = LOG_DEBUG; break;
- case info: syslog_level = LOG_INFO; break;
- /* we don't have the notice level */
- case warn: syslog_level = LOG_WARNING; break;
- case error: syslog_level = LOG_ERR; break;
- default: syslog_level = LOG_CRIT; break;
- case alert: syslog_level = LOG_ALERT; break;
- case emerg: syslog_level = LOG_EMERG; break;
- }
-
- syslog(syslog_level, SYSLOG_IDENT " %s", message.c_str());
- }
-}
-
-LogTracer::LogTracer(const string& trace_filename) : name("TRACE")
-{
- etiLog.level(info) << "Setting up TRACE to " << trace_filename;
-
- FILE* fd = fopen(trace_filename.c_str(), "a");
- if (fd == nullptr) {
- fprintf(stderr, "Cannot open trace file !");
- throw std::runtime_error("Cannot open trace file !");
- }
- m_trace_file.reset(fd);
-
- using namespace std::chrono;
- auto now = steady_clock::now().time_since_epoch();
- m_trace_micros_startup = duration_cast<microseconds>(now).count();
-
- fprintf(m_trace_file.get(),
- "0,TRACER,startup at %" PRIu64 "\n", m_trace_micros_startup);
-}
-
-void LogTracer::log(log_level_t level, const std::string& message)
-{
- if (level == log_level_t::trace) {
- using namespace std::chrono;
- const auto now = steady_clock::now().time_since_epoch();
- const auto micros = duration_cast<microseconds>(now).count();
-
- fprintf(m_trace_file.get(), "%" PRIu64 ",%s\n",
- micros - m_trace_micros_startup,
- message.c_str());
- }
-}
diff --git a/src/Log.h b/src/Log.h
deleted file mode 100644
index 1253635..0000000
--- a/src/Log.h
+++ /dev/null
@@ -1,200 +0,0 @@
-/*
- Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012
- Her Majesty the Queen in Right of Canada (Communications Research
- Center Canada)
-
- Copyright (C) 2018
- Matthias P. Braendli, matthias.braendli@mpb.li
-
- http://www.opendigitalradio.org
- */
-/*
- This file is part of ODR-DabMod.
-
- ODR-DabMod is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMod is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with ODR-DabMod. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#pragma once
-
-#ifdef HAVE_CONFIG_H
-# include "config.h"
-#endif
-
-#include <stdarg.h>
-#include <stdio.h>
-#include <syslog.h>
-#include <fstream>
-#include <sstream>
-#include <iostream>
-#include <list>
-#include <stdexcept>
-#include <string>
-#include <map>
-#include <mutex>
-#include <memory>
-#include <thread>
-#include "ThreadsafeQueue.h"
-
-#define SYSLOG_IDENT "ODR-DabMod"
-#define SYSLOG_FACILITY LOG_LOCAL0
-
-enum log_level_t {debug = 0, info, warn, error, alert, emerg, trace};
-
-static const std::string levels_as_str[] =
- { " ", " ", "WARN ", "ERROR", "ALERT", "EMERG", "TRACE"} ;
-
-/** Abstract class all backends must inherit from */
-class LogBackend {
- public:
- virtual ~LogBackend() {};
- virtual void log(log_level_t level, const std::string& message) = 0;
- virtual std::string get_name() const = 0;
-};
-
-/** A Logging backend for Syslog */
-class LogToSyslog : public LogBackend {
- public:
- LogToSyslog() : name("SYSLOG") {
- openlog(SYSLOG_IDENT, LOG_PID, SYSLOG_FACILITY);
- }
-
- virtual ~LogToSyslog() {
- closelog();
- }
-
- void log(log_level_t level, const std::string& message);
-
- std::string get_name() const { return name; }
-
- private:
- const std::string name;
-
- LogToSyslog(const LogToSyslog& other) = delete;
- const LogToSyslog& operator=(const LogToSyslog& other) = delete;
-};
-
-class LogToFile : public LogBackend {
- public:
- LogToFile(const std::string& filename);
- void log(log_level_t level, const std::string& message);
- std::string get_name() const { return name; }
-
- private:
- const std::string name;
-
- struct FILEDeleter{ void operator()(FILE* fd){ if(fd) fclose(fd);}};
- std::unique_ptr<FILE, FILEDeleter> log_file;
-
- LogToFile(const LogToFile& other) = delete;
- const LogToFile& operator=(const LogToFile& other) = delete;
-};
-
-class LogTracer : public LogBackend {
- public:
- LogTracer(const std::string& filename);
- void log(log_level_t level, const std::string& message);
- std::string get_name() const { return name; }
- private:
- std::string name;
- uint64_t m_trace_micros_startup = 0;
-
- struct FILEDeleter{ void operator()(FILE* fd){ if(fd) fclose(fd);}};
- std::unique_ptr<FILE, FILEDeleter> m_trace_file;
-
- LogTracer(const LogTracer& other) = delete;
- const LogTracer& operator=(const LogTracer& other) = delete;
-};
-
-class LogLine;
-
-struct log_message_t {
- log_message_t(log_level_t _level, std::string&& _message) :
- level(_level),
- message(move(_message)) {}
-
- log_message_t() :
- level(debug),
- message("") {}
-
- log_level_t level;
- std::string message;
-};
-
-class Logger {
- public:
- Logger() {
- m_io_thread = std::thread(&Logger::io_process, this);
- }
-
- Logger(const Logger& other) = delete;
- const Logger& operator=(const Logger& other) = delete;
- ~Logger() {
- m_message_queue.trigger_wakeup();
- m_io_thread.join();
- }
-
- void register_backend(std::shared_ptr<LogBackend> backend);
-
- /* Log the message to all backends */
- void log(log_level_t level, const char* fmt, ...);
-
- void logstr(log_level_t level, std::string&& message);
-
- /* All logging IO is done in another thread */
- void io_process(void);
-
- /* Return a LogLine for the given level
- * so that you can write etiLog.level(info) << "stuff = " << 21 */
- LogLine level(log_level_t level);
-
- private:
- std::list<std::shared_ptr<LogBackend> > backends;
-
- ThreadsafeQueue<log_message_t> m_message_queue;
- std::thread m_io_thread;
- std::mutex m_cerr_mutex;
-};
-
-extern Logger etiLog;
-
-// Accumulate a line of logs, using same syntax as stringstream
-// The line is logged when the LogLine gets destroyed
-class LogLine {
- public:
- LogLine(const LogLine& logline);
- const LogLine& operator=(const LogLine& other) = delete;
- LogLine(Logger* logger, log_level_t level) :
- logger_(logger)
- {
- level_ = level;
- }
-
- // Push the new element into the stringstream
- template <typename T>
- LogLine& operator<<(T s) {
- os << s;
- return *this;
- }
-
- ~LogLine()
- {
- logger_->logstr(level_, os.str());
- }
-
- private:
- std::ostringstream os;
- log_level_t level_;
- Logger* logger_;
-};
-
diff --git a/src/MemlessPoly.cpp b/src/MemlessPoly.cpp
index ef77c07..905ca67 100644
--- a/src/MemlessPoly.cpp
+++ b/src/MemlessPoly.cpp
@@ -36,7 +36,8 @@
#include "PcDebug.h"
#include "Utils.h"
-#include <stdio.h>
+#include <cstdio>
+#include <cstring>
#include <stdexcept>
#include <string>
#include <future>
diff --git a/src/MemlessPoly.h b/src/MemlessPoly.h
index 43fdb2d..4642596 100644
--- a/src/MemlessPoly.h
+++ b/src/MemlessPoly.h
@@ -30,20 +30,20 @@
# include <config.h>
#endif
-
#include "RemoteControl.h"
#include "ModPlugin.h"
#include "PcDebug.h"
#include "ThreadsafeQueue.h"
#include <sys/types.h>
+#include <array>
#include <complex>
+#include <memory>
+#include <string>
#include <thread>
#include <vector>
-#include <time.h>
+#include <ctime>
#include <cstdio>
-#include <string>
-#include <memory>
#define MEMLESSPOLY_PIPELINE_DELAY 1
diff --git a/src/OutputFile.cpp b/src/OutputFile.cpp
index 46a9ec9..acaebad 100644
--- a/src/OutputFile.cpp
+++ b/src/OutputFile.cpp
@@ -27,12 +27,12 @@
#include "OutputFile.h"
#include "PcDebug.h"
#include "Log.h"
-#include "TimestampDecoder.h"
#include <string>
-#include <assert.h>
+#include <chrono>
#include <stdexcept>
-
+#include <cassert>
+#include <cmath>
using namespace std;
@@ -53,7 +53,6 @@ OutputFile::OutputFile(const std::string& filename, bool show_metadata) :
myFile.reset(fd);
}
-
int OutputFile::process(Buffer* dataIn)
{
PDEBUG("OutputFile::process(%p)\n", dataIn);
@@ -72,8 +71,18 @@ meta_vec_t OutputFile::process_metadata(const meta_vec_t& metadataIn)
if (myShowMetadata) {
stringstream ss;
+ frame_timestamp first_ts;
+
for (const auto& md : metadataIn) {
if (md.ts) {
+ // The following code assumes TM I, where we get called every 96ms.
+ // Support for other transmission modes skipped because this is mostly
+ // debugging code.
+
+ if (md.ts->fp == 0 or md.ts->fp == 4) {
+ first_ts = *md.ts;
+ }
+
ss << " FCT=" << md.ts->fct <<
" FP=" << (int)md.ts->fp;
if (md.ts->timestamp_valid) {
@@ -90,13 +99,46 @@ meta_vec_t OutputFile::process_metadata(const meta_vec_t& metadataIn)
}
}
- if (metadataIn.empty()) {
- etiLog.level(debug) << "Output File got no mdIn";
+ if (myLastTimestamp.timestamp_valid) {
+ if (first_ts.timestamp_valid) {
+ uint32_t timestamp = myLastTimestamp.timestamp_pps;
+ timestamp += 96 << 14; // Shift 96ms by 14 to Timestamp level 2
+ if (timestamp > 0xf9FFff) {
+ timestamp -= 0xfa0000; // Substract 16384000, corresponding to one second
+ myLastTimestamp.timestamp_sec += 1;
+ }
+ myLastTimestamp.timestamp_pps = timestamp;
+
+ if (myLastTimestamp.timestamp_sec != first_ts.timestamp_sec or
+ myLastTimestamp.timestamp_pps != first_ts.timestamp_pps) {
+ ss << " TS wrong interval; ";
+ }
+ myLastTimestamp = first_ts;
+ }
+ else {
+ ss << " TS of FP=0 MISSING; ";
+ myLastTimestamp.timestamp_valid = false;
+ }
}
else {
- etiLog.level(debug) << "Output File got metadata: " << ss.str();
+ // Includes invalid and valid cases
+ myLastTimestamp = first_ts;
}
+ if (metadataIn.empty()) {
+ etiLog.level(debug) << "Output File got no metadata";
+ }
+ else {
+ using namespace std::chrono;
+ const auto now = system_clock::now();
+ const int64_t ticks_now = duration_cast<milliseconds>(now.time_since_epoch()).count();
+ //const int64_t first_ts_ticks = first_ts.timestamp_sec * 1000 + first_ts.timestamp_pps / 16384;
+ const int64_t first_ts_ticks = std::llrint(first_ts.get_real_secs() * 1000);
+
+ ss << " DELTA: " << first_ts_ticks - ticks_now << "ms;";
+
+ etiLog.level(debug) << "Output File metadata: " << ss.str();
+ }
}
return {};
}
diff --git a/src/OutputFile.h b/src/OutputFile.h
index 745e672..b10d406 100644
--- a/src/OutputFile.h
+++ b/src/OutputFile.h
@@ -33,10 +33,11 @@
#include "ModPlugin.h"
#include "EtiReader.h"
+#include "TimestampDecoder.h"
#include <string>
-#include <stdio.h>
-#include <sys/types.h>
+#include <cstdio>
+#include <cstdint>
#include <memory>
class OutputFile : public ModOutput, public ModMetadata
@@ -52,6 +53,7 @@ public:
protected:
bool myShowMetadata = false;
+ frame_timestamp myLastTimestamp;
std::string myFilename;
struct FILEDeleter{ void operator()(FILE* fd){ if (fd) fclose(fd); }};
diff --git a/src/OutputMemory.cpp b/src/OutputMemory.cpp
index 5f24095..d6ef917 100644
--- a/src/OutputMemory.cpp
+++ b/src/OutputMemory.cpp
@@ -27,6 +27,7 @@
#include "OutputMemory.h"
#include "PcDebug.h"
#include "Log.h"
+#include "TimestampDecoder.h"
#include <stdexcept>
#include <string.h>
diff --git a/src/RemoteControl.cpp b/src/RemoteControl.cpp
deleted file mode 100644
index 1065456..0000000
--- a/src/RemoteControl.cpp
+++ /dev/null
@@ -1,588 +0,0 @@
-/*
- Copyright (C) 2007, 2008, 2009, 2010, 2011, 2012
- Her Majesty the Queen in Right of Canada (Communications Research
- Center Canada)
-
- Copyright (C) 2019
- Matthias P. Braendli, matthias.braendli@mpb.li
-
- http://www.opendigitalradio.org
- */
-/*
- This file is part of ODR-DabMod.
-
- ODR-DabMod is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMod is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with ODR-DabMod. If not, see <http://www.gnu.org/licenses/>.
- */
-#include <list>
-#include <string>
-#include <iostream>
-#include <string>
-#include <thread>
-#include <functional>
-#include <asio.hpp>
-#include "RemoteControl.h"
-
-using asio::ip::tcp;
-
-using namespace std;
-
-RemoteControllers rcs;
-
-RemoteControllerTelnet::~RemoteControllerTelnet()
-{
- m_active = false;
- m_io_service.stop();
-
- if (m_restarter_thread.joinable()) {
- m_restarter_thread.join();
- }
-
- if (m_child_thread.joinable()) {
- m_child_thread.join();
- }
-}
-
-void RemoteControllerTelnet::restart()
-{
- if (m_restarter_thread.joinable()) {
- m_restarter_thread.join();
- }
-
- m_restarter_thread = std::thread(
- &RemoteControllerTelnet::restart_thread,
- this, 0);
-}
-
-RemoteControllable::~RemoteControllable() {
- rcs.remove_controllable(this);
-}
-
-std::list<std::string> RemoteControllable::get_supported_parameters() const {
- std::list<std::string> parameterlist;
- for (const auto& param : m_parameters) {
- parameterlist.push_back(param[0]);
- }
- return parameterlist;
-}
-
-void RemoteControllers::add_controller(std::shared_ptr<BaseRemoteController> rc) {
- m_controllers.push_back(rc);
-}
-
-void RemoteControllers::enrol(RemoteControllable *rc) {
- controllables.push_back(rc);
-}
-
-void RemoteControllers::remove_controllable(RemoteControllable *rc) {
- controllables.remove(rc);
-}
-
-std::list< std::vector<std::string> > RemoteControllers::get_param_list_values(const std::string& name) {
- RemoteControllable* controllable = get_controllable_(name);
-
- std::list< std::vector<std::string> > allparams;
- for (auto &param : controllable->get_supported_parameters()) {
- std::vector<std::string> item;
- item.push_back(param);
- try {
- item.push_back(controllable->get_parameter(param));
- }
- catch (const ParameterError &e) {
- item.push_back(std::string("error: ") + e.what());
- }
-
- allparams.push_back(item);
- }
- return allparams;
-}
-
-std::string RemoteControllers::get_param(const std::string& name, const std::string& param) {
- RemoteControllable* controllable = get_controllable_(name);
- return controllable->get_parameter(param);
-}
-
-void RemoteControllers::check_faults() {
- for (auto &controller : m_controllers) {
- if (controller->fault_detected()) {
- etiLog.level(warn) <<
- "Detected Remote Control fault, restarting it";
- controller->restart();
- }
- }
-}
-
-RemoteControllable* RemoteControllers::get_controllable_(const std::string& name)
-{
- auto rc = std::find_if(controllables.begin(), controllables.end(),
- [&](RemoteControllable* r) { return r->get_rc_name() == name; });
-
- if (rc == controllables.end()) {
- throw ParameterError("Module name unknown");
- }
- else {
- return *rc;
- }
-}
-
-void RemoteControllers::set_param(
- const std::string& name,
- const std::string& param,
- const std::string& value)
-{
- RemoteControllable* controllable = get_controllable_(name);
- try {
- return controllable->set_parameter(param, value);
- }
- catch (const ios_base::failure& e) {
- etiLog.level(info) << "RC: Failed to set " << name << " " << param
- << " to " << value << ": " << e.what();
- throw ParameterError("Cannot understand value");
- }
-}
-
-// This runs in a separate thread, because
-// it would take too long to be done in the main loop
-// thread.
-void RemoteControllerTelnet::restart_thread(long)
-{
- m_active = false;
- m_io_service.stop();
-
- if (m_child_thread.joinable()) {
- m_child_thread.join();
- }
-
- m_child_thread = std::thread(&RemoteControllerTelnet::process, this, 0);
-}
-
-void RemoteControllerTelnet::handle_accept(
- std::shared_ptr<asio::ip::tcp::socket> socket,
- const asio::error_code& asio_error)
-{
-
- const std::string welcome = "ODR-DabMod Remote Control CLI\n"
- "Write 'help' for help.\n"
- "**********\n";
- const std::string prompt = "> ";
-
- std::string in_message;
- size_t length;
-
- if (asio_error) {
- etiLog.level(error) << "RC: Error accepting connection";
- return;
- }
-
- try {
- etiLog.level(info) << "RC: Accepted";
-
- asio::error_code ignored_error;
-
- asio::write(*socket, asio::buffer(welcome),
- asio::transfer_all(),
- ignored_error);
-
- while (m_active && in_message != "quit") {
- asio::write(*socket, asio::buffer(prompt),
- asio::transfer_all(),
- ignored_error);
-
- in_message = "";
-
- asio::streambuf buffer;
- length = asio::read_until(*socket, buffer, "\n", ignored_error);
-
- std::istream str(&buffer);
- std::getline(str, in_message);
-
- if (length == 0) {
- etiLog.level(info) << "RC: Connection terminated";
- break;
- }
-
- while (in_message.length() > 0 &&
- (in_message[in_message.length()-1] == '\r' ||
- in_message[in_message.length()-1] == '\n')) {
- in_message.erase(in_message.length()-1, 1);
- }
-
- if (in_message.length() == 0) {
- continue;
- }
-
- etiLog.level(info) << "RC: Got message '" << in_message << "'";
-
- dispatch_command(*socket, in_message);
- }
- etiLog.level(info) << "RC: Closing socket";
- socket->close();
- }
- catch (const std::exception& e)
- {
- etiLog.level(error) << "Remote control caught exception: " << e.what();
- }
-}
-
-void RemoteControllerTelnet::process(long)
-{
- m_active = true;
-
- while (m_active) {
- m_io_service.reset();
-
- tcp::acceptor acceptor(m_io_service, tcp::endpoint(
- asio::ip::address::from_string("127.0.0.1"), m_port) );
-
- // Add a job to start accepting connections.
- auto socket = make_shared<tcp::socket>(acceptor.get_io_service());
-
- // Add an accept call to the service. This will prevent io_service::run()
- // from returning.
- etiLog.level(info) << "RC: Waiting for connection on port " << m_port;
- acceptor.async_accept(*socket,
- bind(&RemoteControllerTelnet::handle_accept, this,
- socket,
- std::placeholders::_1));
-
- // Process event loop.
- m_io_service.run();
- }
-
- etiLog.level(info) << "RC: Leaving";
- m_fault = true;
-}
-
-static std::vector<std::string> tokenise(const std::string& message) {
- stringstream ss(message);
- std::vector<std::string> all_tokens;
- std::string item;
-
- while (std::getline(ss, item, ' ')) {
- all_tokens.push_back(move(item));
- }
- return all_tokens;
-}
-
-
-void RemoteControllerTelnet::dispatch_command(tcp::socket& socket, string command)
-{
- vector<string> cmd = tokenise(command);
-
- if (cmd[0] == "help") {
- reply(socket,
- "The following commands are supported:\n"
- " list\n"
- " * Lists the modules that are loaded and their parameters\n"
- " show MODULE\n"
- " * Lists all parameters and their values from module MODULE\n"
- " get MODULE PARAMETER\n"
- " * Gets the value for the specified PARAMETER from module MODULE\n"
- " set MODULE PARAMETER VALUE\n"
- " * Sets the value for the PARAMETER ofr module MODULE\n"
- " quit\n"
- " * Terminate this session\n"
- "\n");
- }
- else if (cmd[0] == "list") {
- stringstream ss;
-
- if (cmd.size() == 1) {
- for (auto &controllable : rcs.controllables) {
- ss << controllable->get_rc_name() << endl;
-
- list< vector<string> > params = controllable->get_parameter_descriptions();
- for (auto &param : params) {
- ss << "\t" << param[0] << " : " << param[1] << endl;
- }
- }
- }
- else {
- reply(socket, "Too many arguments for command 'list'");
- }
-
- reply(socket, ss.str());
- }
- else if (cmd[0] == "show") {
- if (cmd.size() == 2) {
- try {
- stringstream ss;
- list< vector<string> > r = rcs.get_param_list_values(cmd[1]);
- for (auto &param_val : r) {
- ss << param_val[0] << ": " << param_val[1] << endl;
- }
- reply(socket, ss.str());
-
- }
- catch (const ParameterError &e) {
- reply(socket, e.what());
- }
- }
- else {
- reply(socket, "Incorrect parameters for command 'show'");
- }
- }
- else if (cmd[0] == "get") {
- if (cmd.size() == 3) {
- try {
- string r = rcs.get_param(cmd[1], cmd[2]);
- reply(socket, r);
- }
- catch (const ParameterError &e) {
- reply(socket, e.what());
- }
- }
- else {
- reply(socket, "Incorrect parameters for command 'get'");
- }
- }
- else if (cmd[0] == "set") {
- if (cmd.size() >= 4) {
- try {
- stringstream new_param_value;
- for (size_t i = 3; i < cmd.size(); i++) {
- new_param_value << cmd[i];
-
- if (i+1 < cmd.size()) {
- new_param_value << " ";
- }
- }
-
- rcs.set_param(cmd[1], cmd[2], new_param_value.str());
- reply(socket, "ok");
- }
- catch (const ParameterError &e) {
- reply(socket, e.what());
- }
- catch (const exception &e) {
- reply(socket, "Error: Invalid parameter value. ");
- }
- }
- else {
- reply(socket, "Incorrect parameters for command 'set'");
- }
- }
- else if (cmd[0] == "quit") {
- reply(socket, "Goodbye");
- }
- else {
- reply(socket, "Message not understood");
- }
-}
-
-void RemoteControllerTelnet::reply(tcp::socket& socket, string message)
-{
- asio::error_code ignored_error;
- stringstream ss;
- ss << message << "\r\n";
- asio::write(socket, asio::buffer(ss.str()),
- asio::transfer_all(),
- ignored_error);
-}
-
-#if defined(HAVE_ZEROMQ)
-
-RemoteControllerZmq::~RemoteControllerZmq() {
- m_active = false;
- m_fault = false;
-
- if (m_restarter_thread.joinable()) {
- m_restarter_thread.join();
- }
-
- if (m_child_thread.joinable()) {
- m_child_thread.join();
- }
-}
-
-void RemoteControllerZmq::restart()
-{
- if (m_restarter_thread.joinable()) {
- m_restarter_thread.join();
- }
-
- m_restarter_thread = std::thread(&RemoteControllerZmq::restart_thread, this);
-}
-
-// This runs in a separate thread, because
-// it would take too long to be done in the main loop
-// thread.
-void RemoteControllerZmq::restart_thread()
-{
- m_active = false;
-
- if (m_child_thread.joinable()) {
- m_child_thread.join();
- }
-
- m_child_thread = std::thread(&RemoteControllerZmq::process, this);
-}
-
-void RemoteControllerZmq::recv_all(zmq::socket_t& pSocket, std::vector<std::string> &message)
-{
- bool more = true;
- do {
- zmq::message_t msg;
- pSocket.recv(&msg);
- std::string incoming((char*)msg.data(), msg.size());
- message.push_back(incoming);
- more = msg.more();
- } while (more);
-}
-
-void RemoteControllerZmq::send_ok_reply(zmq::socket_t &pSocket)
-{
- zmq::message_t msg(2);
- char repCode[2] = {'o', 'k'};
- memcpy ((void*) msg.data(), repCode, 2);
- pSocket.send(msg, 0);
-}
-
-void RemoteControllerZmq::send_fail_reply(zmq::socket_t &pSocket, const std::string &error)
-{
- zmq::message_t msg1(4);
- char repCode[4] = {'f', 'a', 'i', 'l'};
- memcpy ((void*) msg1.data(), repCode, 4);
- pSocket.send(msg1, ZMQ_SNDMORE);
-
- zmq::message_t msg2(error.length());
- memcpy ((void*) msg2.data(), error.c_str(), error.length());
- pSocket.send(msg2, 0);
-}
-
-void RemoteControllerZmq::process()
-{
- m_fault = false;
-
- // create zmq reply socket for receiving ctrl parameters
- try {
- zmq::socket_t repSocket(m_zmqContext, ZMQ_REP);
-
- // connect the socket
- int hwm = 100;
- int linger = 0;
- repSocket.setsockopt(ZMQ_RCVHWM, &hwm, sizeof(hwm));
- repSocket.setsockopt(ZMQ_SNDHWM, &hwm, sizeof(hwm));
- repSocket.setsockopt(ZMQ_LINGER, &linger, sizeof(linger));
- repSocket.bind(m_endpoint.c_str());
-
- // create pollitem that polls the ZMQ sockets
- zmq::pollitem_t pollItems[] = { {repSocket, 0, ZMQ_POLLIN, 0} };
- while (m_active) {
- zmq::poll(pollItems, 1, 100);
- std::vector<std::string> msg;
-
- if (pollItems[0].revents & ZMQ_POLLIN) {
- recv_all(repSocket, msg);
-
- std::string command((char*)msg[0].data(), msg[0].size());
-
- if (msg.size() == 1 && command == "ping") {
- send_ok_reply(repSocket);
- }
- else if (msg.size() == 1 && command == "list") {
- size_t cohort_size = rcs.controllables.size();
- for (auto &controllable : rcs.controllables) {
- std::stringstream ss;
- ss << "{ \"name\": \"" << controllable->get_rc_name() << "\"," <<
- " \"params\": { ";
-
- list< vector<string> > params = controllable->get_parameter_descriptions();
- size_t i = 0;
- for (auto &param : params) {
- if (i > 0) {
- ss << ", ";
- }
-
- ss << "\"" << param[0] << "\": " <<
- "\"" << param[1] << "\"";
-
- i++;
- }
-
- ss << " } }";
-
- std::string msg_s = ss.str();
-
- zmq::message_t zmsg(ss.str().size());
- memcpy ((void*) zmsg.data(), msg_s.data(), msg_s.size());
-
- int flag = (--cohort_size > 0) ? ZMQ_SNDMORE : 0;
- repSocket.send(zmsg, flag);
- }
- }
- else if (msg.size() == 2 && command == "show") {
- std::string module((char*) msg[1].data(), msg[1].size());
- try {
- list< vector<string> > r = rcs.get_param_list_values(module);
- size_t r_size = r.size();
- for (auto &param_val : r) {
- std::stringstream ss;
- ss << param_val[0] << ": " << param_val[1] << endl;
- zmq::message_t zmsg(ss.str().size());
- memcpy(zmsg.data(), ss.str().data(), ss.str().size());
-
- int flag = (--r_size > 0) ? ZMQ_SNDMORE : 0;
- repSocket.send(zmsg, flag);
- }
- }
- catch (const ParameterError &err) {
- send_fail_reply(repSocket, err.what());
- }
- }
- else if (msg.size() == 3 && command == "get") {
- std::string module((char*) msg[1].data(), msg[1].size());
- std::string parameter((char*) msg[2].data(), msg[2].size());
-
- try {
- std::string value = rcs.get_param(module, parameter);
- zmq::message_t zmsg(value.size());
- memcpy ((void*) zmsg.data(), value.data(), value.size());
- repSocket.send(zmsg, 0);
- }
- catch (const ParameterError &err) {
- send_fail_reply(repSocket, err.what());
- }
- }
- else if (msg.size() == 4 && command == "set") {
- std::string module((char*) msg[1].data(), msg[1].size());
- std::string parameter((char*) msg[2].data(), msg[2].size());
- std::string value((char*) msg[3].data(), msg[3].size());
-
- try {
- rcs.set_param(module, parameter, value);
- send_ok_reply(repSocket);
- }
- catch (const ParameterError &err) {
- send_fail_reply(repSocket, err.what());
- }
- }
- else {
- send_fail_reply(repSocket,
- "Unsupported command. commands: list, show, get, set");
- }
- }
- }
- repSocket.close();
- }
- catch (const zmq::error_t &e) {
- etiLog.level(error) << "ZMQ RC error: " << std::string(e.what());
- }
- catch (const std::exception& e) {
- etiLog.level(error) << "ZMQ RC caught exception: " << e.what();
- m_fault = true;
- }
-}
-
-#endif
-
diff --git a/src/RemoteControl.h b/src/RemoteControl.h
deleted file mode 100644
index 087b94a..0000000
--- a/src/RemoteControl.h
+++ /dev/null
@@ -1,259 +0,0 @@
-/*
- Copyright (C) 2007, 2008, 2009, 2010, 2011, 2012
- Her Majesty the Queen in Right of Canada (Communications Research
- Center Canada)
-
- Copyright (C) 2019
- Matthias P. Braendli, matthias.braendli@mpb.li
-
- http://www.opendigitalradio.org
-
- This module adds remote-control capability to some of the dabmod modules.
- */
-/*
- This file is part of ODR-DabMod.
-
- ODR-DabMod is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMod is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with ODR-DabMod. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#pragma once
-
-#ifdef HAVE_CONFIG_H
-# include "config.h"
-#endif
-
-#if defined(HAVE_ZEROMQ)
-# include "zmq.hpp"
-#endif
-
-#include <list>
-#include <map>
-#include <memory>
-#include <string>
-#include <atomic>
-#include <iostream>
-#include <thread>
-#include <asio.hpp>
-#include <stdexcept>
-
-#include "Log.h"
-
-#define RC_ADD_PARAMETER(p, desc) { \
- std::vector<std::string> p; \
- p.push_back(#p); \
- p.push_back(desc); \
- m_parameters.push_back(p); \
-}
-
-class ParameterError : public std::exception
-{
- public:
- ParameterError(std::string message) : m_message(message) {}
- ~ParameterError() throw() {}
- const char* what() const throw() { return m_message.c_str(); }
-
- private:
- std::string m_message;
-};
-
-class RemoteControllable;
-
-/* Remote controllers (that recieve orders from the user)
- * must implement BaseRemoteController
- */
-class BaseRemoteController {
- public:
- /* When this returns one, the remote controller cannot be
- * used anymore, and must be restarted by DabMod
- */
- virtual bool fault_detected() = 0;
-
- /* In case of a fault, the remote controller can be
- * restarted.
- */
- virtual void restart() = 0;
-
- virtual ~BaseRemoteController() {}
-};
-
-/* Objects that support remote control must implement the following class */
-class RemoteControllable {
- public:
- RemoteControllable(const std::string& name) :
- m_rc_name(name) {}
-
- RemoteControllable(const RemoteControllable& other) = delete;
- RemoteControllable& operator=(const RemoteControllable& other) = delete;
-
- virtual ~RemoteControllable();
-
- /* return a short name used to identify the controllable.
- * It might be used in the commands the user has to type, so keep
- * it short
- */
- virtual std::string get_rc_name() const { return m_rc_name; }
-
- /* Return a list of possible parameters that can be set */
- virtual std::list<std::string> get_supported_parameters() const;
-
- /* Return a mapping of the descriptions of all parameters */
- virtual std::list< std::vector<std::string> >
- get_parameter_descriptions() const
- {
- return m_parameters;
- }
-
- /* Base function to set parameters. */
- virtual void set_parameter(
- const std::string& parameter,
- const std::string& value) = 0;
-
- /* Getting a parameter always returns a string. */
- virtual const std::string get_parameter(const std::string& parameter) const = 0;
-
- protected:
- std::string m_rc_name;
- std::list< std::vector<std::string> > m_parameters;
-};
-
-/* Holds all our remote controllers and controlled object.
- */
-class RemoteControllers {
- public:
- void add_controller(std::shared_ptr<BaseRemoteController> rc);
- void enrol(RemoteControllable *rc);
- void remove_controllable(RemoteControllable *rc);
- void check_faults();
- std::list< std::vector<std::string> > get_param_list_values(const std::string& name);
- std::string get_param(const std::string& name, const std::string& param);
-
- void set_param(
- const std::string& name,
- const std::string& param,
- const std::string& value);
-
- std::list<RemoteControllable*> controllables;
-
- private:
- RemoteControllable* get_controllable_(const std::string& name);
-
- std::list<std::shared_ptr<BaseRemoteController> > m_controllers;
-};
-
-extern RemoteControllers rcs;
-
-/* Implements a Remote controller based on a simple telnet CLI
- * that listens on localhost
- */
-class RemoteControllerTelnet : public BaseRemoteController {
- public:
- RemoteControllerTelnet()
- : m_active(false),
- m_io_service(),
- m_fault(false),
- m_port(0) { }
-
- RemoteControllerTelnet(int port)
- : m_active(port > 0),
- m_io_service(),
- m_fault(false),
- m_port(port)
- {
- restart();
- }
-
-
- RemoteControllerTelnet& operator=(const RemoteControllerTelnet& other) = delete;
- RemoteControllerTelnet(const RemoteControllerTelnet& other) = delete;
-
- ~RemoteControllerTelnet();
-
- virtual bool fault_detected() { return m_fault; }
-
- virtual void restart();
-
- private:
- void restart_thread(long);
-
- void process(long);
-
- void dispatch_command(asio::ip::tcp::socket& socket,
- std::string command);
-
- void reply(asio::ip::tcp::socket& socket, std::string message);
-
- void handle_accept(
- std::shared_ptr<asio::ip::tcp::socket> socket,
- const asio::error_code& asio_error);
-
- std::atomic<bool> m_active;
-
- asio::io_service m_io_service;
-
- /* This is set to true if a fault occurred */
- std::atomic<bool> m_fault;
- std::thread m_restarter_thread;
-
- std::thread m_child_thread;
-
- int m_port;
-};
-
-#if defined(HAVE_ZEROMQ)
-/* Implements a Remote controller using zmq transportlayer
- * that listens on localhost
- */
-class RemoteControllerZmq : public BaseRemoteController {
- public:
- RemoteControllerZmq()
- : m_active(false), m_fault(false),
- m_zmqContext(1),
- m_endpoint("") { }
-
- RemoteControllerZmq(const std::string& endpoint)
- : m_active(not endpoint.empty()), m_fault(false),
- m_zmqContext(1),
- m_endpoint(endpoint),
- m_child_thread(&RemoteControllerZmq::process, this) { }
-
- RemoteControllerZmq& operator=(const RemoteControllerZmq& other) = delete;
- RemoteControllerZmq(const RemoteControllerZmq& other) = delete;
-
- ~RemoteControllerZmq();
-
- virtual bool fault_detected() { return m_fault; }
-
- virtual void restart();
-
- private:
- void restart_thread();
-
- void recv_all(zmq::socket_t &pSocket, std::vector<std::string> &message);
- void send_ok_reply(zmq::socket_t &pSocket);
- void send_fail_reply(zmq::socket_t &pSocket, const std::string &error);
- void process();
-
- std::atomic<bool> m_active;
-
- /* This is set to true if a fault occurred */
- std::atomic<bool> m_fault;
- std::thread m_restarter_thread;
-
- zmq::context_t m_zmqContext;
-
- std::string m_endpoint;
- std::thread m_child_thread;
-};
-#endif
-
diff --git a/src/Socket.cpp b/src/Socket.cpp
deleted file mode 100644
index 08cda68..0000000
--- a/src/Socket.cpp
+++ /dev/null
@@ -1,275 +0,0 @@
-/*
- Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Her Majesty the
- Queen in Right of Canada (Communications Research Center Canada)
-
- Copyright (C) 2018
- Matthias P. Braendli, matthias.braendli@mpb.li
-
- http://opendigitalradio.org
-*/
-
-/*
- This file is part of ODR-DabMod.
-
- ODR-DabMod is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMod is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with ODR-DabMod. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#include "Socket.h"
-#include "Log.h"
-#include <fcntl.h>
-
-TCPSocket::TCPSocket()
-{
- if ((m_sock = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
- throw std::runtime_error("Can't create TCP socket");
- }
-
-#if defined(HAVE_SO_NOSIGPIPE)
- int val = 1;
- if (setsockopt(m_sock, SOL_SOCKET, SO_NOSIGPIPE,
- &val, sizeof(val)) < 0) {
- throw std::runtime_error("Can't set SO_NOSIGPIPE");
- }
-#endif
-}
-
-TCPSocket::~TCPSocket()
-{
- if (m_sock != -1) {
- ::close(m_sock);
- }
-}
-
-TCPSocket::TCPSocket(TCPSocket&& other)
-{
- m_sock = other.m_sock;
-
- if (other.m_sock != -1) {
- other.m_sock = -1;
- }
-}
-
-TCPSocket& TCPSocket::operator=(TCPSocket&& other)
-{
- m_sock = other.m_sock;
-
- if (other.m_sock != -1) {
- other.m_sock = -1;
- }
-
- return *this;
-}
-
-bool TCPSocket::valid() const
-{
- return m_sock != -1;
-}
-
-void TCPSocket::connect(const std::string& hostname, int port)
-{
- struct sockaddr_in addr;
- addr.sin_family = PF_INET;
- addr.sin_addr.s_addr = htons(INADDR_ANY);
- addr.sin_port = htons(port);
-
- hostent *host = gethostbyname(hostname.c_str());
- if (host) {
- addr.sin_addr = *(in_addr *)(host->h_addr);
- }
- else {
- std::string errstr(strerror(errno));
- throw std::runtime_error(
- "could not resolve hostname " +
- hostname + ":" + std::to_string(port) +
- " : " + errstr);
- }
-
- int ret = ::connect(m_sock, (struct sockaddr*)&addr, sizeof(addr));
- if (ret == -1 and errno != EINPROGRESS) {
- std::string errstr(strerror(errno));
- throw std::runtime_error(
- "could not connect to " +
- hostname + ":" + std::to_string(port) +
- " : " + errstr);
- }
-}
-
-void TCPSocket::listen(int port)
-{
- struct sockaddr_in addr;
- addr.sin_family = AF_INET;
- addr.sin_port = htons(port);
- addr.sin_addr.s_addr = htonl(INADDR_ANY);
-
- const int reuse = 1;
- if (setsockopt(m_sock, SOL_SOCKET, SO_REUSEADDR,
- &reuse, sizeof(reuse)) < 0) {
- throw std::runtime_error("Can't reuse address for TCP socket");
- }
-
- if (::bind(m_sock, (struct sockaddr*)&addr, sizeof(addr)) < 0) {
- close();
- throw std::runtime_error("Can't bind TCP socket");
- }
-
- if (::listen(m_sock, 1) < 0) {
- close();
- m_sock = -1;
- throw std::runtime_error("Can't listen TCP socket");
- }
-
-}
-
-void TCPSocket::close()
-{
- ::close(m_sock);
- m_sock = -1;
-}
-
-TCPSocket TCPSocket::accept_with_timeout(int timeout_ms, struct sockaddr_in *client)
-{
- struct pollfd fds[1];
- fds[0].fd = m_sock;
- fds[0].events = POLLIN;
-
- int retval = poll(fds, 1, timeout_ms);
-
- if (retval == -1) {
- std::string errstr(strerror(errno));
- throw std::runtime_error("TCP Socket accept error: " + errstr);
- }
- else if (retval > 0) {
- socklen_t client_len = sizeof(struct sockaddr_in);
- int sockfd = accept(m_sock, (struct sockaddr*)&client, &client_len);
- TCPSocket s(sockfd);
- return s;
- }
- else {
- TCPSocket s(-1);
- return s;
- }
-}
-
-ssize_t TCPSocket::sendall(const void *buffer, size_t buflen)
-{
- uint8_t *buf = (uint8_t*)buffer;
- while (buflen > 0) {
- /* On Linux, the MSG_NOSIGNAL flag ensures that the process
- * would not receive a SIGPIPE and die.
- * Other systems have SO_NOSIGPIPE set on the socket for the
- * same effect. */
-#if defined(HAVE_MSG_NOSIGNAL)
- const int flags = MSG_NOSIGNAL;
-#else
- const int flags = 0;
-#endif
- ssize_t sent = ::send(m_sock, buf, buflen, flags);
- if (sent < 0) {
- return -1;
- }
- else {
- buf += sent;
- buflen -= sent;
- }
- }
- return buflen;
-}
-
-ssize_t TCPSocket::recv(void *buffer, size_t length, int flags)
-{
- ssize_t ret = ::recv(m_sock, buffer, length, flags);
- if (ret == -1) {
- std::string errstr(strerror(errno));
- throw std::runtime_error("TCP receive error: " + errstr);
- }
- return ret;
-}
-
-ssize_t TCPSocket::recv(void *buffer, size_t length, int flags, int timeout_ms)
-{
- struct pollfd fds[1];
- fds[0].fd = m_sock;
- fds[0].events = POLLIN;
-
- int retval = poll(fds, 1, timeout_ms);
-
- if (retval == -1 and errno == EINTR) {
- throw Interrupted();
- }
- else if (retval == -1) {
- std::string errstr(strerror(errno));
- throw std::runtime_error("TCP receive with poll() error: " + errstr);
- }
- else if (retval > 0 and (fds[0].revents | POLLIN)) {
- ssize_t ret = ::recv(m_sock, buffer, length, flags);
- if (ret == -1) {
- if (errno == ECONNREFUSED) {
- return 0;
- }
- std::string errstr(strerror(errno));
- throw std::runtime_error("TCP receive after poll() error: " + errstr);
- }
- return ret;
- }
- else {
- throw Timeout();
- }
-}
-
-TCPSocket::TCPSocket(int sockfd) {
- m_sock = sockfd;
-}
-
-void TCPClient::connect(const std::string& hostname, int port)
-{
- m_hostname = hostname;
- m_port = port;
- reconnect();
-}
-
-ssize_t TCPClient::recv(void *buffer, size_t length, int flags, int timeout_ms)
-{
- try {
- ssize_t ret = m_sock.recv(buffer, length, flags, timeout_ms);
-
- if (ret == 0) {
- m_sock.close();
-
- TCPSocket newsock;
- m_sock = std::move(newsock);
- reconnect();
- }
-
- return ret;
- }
- catch (const TCPSocket::Interrupted&) {
- return -1;
- }
- catch (const TCPSocket::Timeout&) {
- return 0;
- }
-
- return 0;
-}
-
-void TCPClient::reconnect()
-{
- int flags = fcntl(m_sock.m_sock, F_GETFL);
- if (fcntl(m_sock.m_sock, F_SETFL, flags | O_NONBLOCK) == -1) {
- std::string errstr(strerror(errno));
- throw std::runtime_error("TCP: Could not set O_NONBLOCK: " + errstr);
- }
-
- m_sock.connect(m_hostname, m_port);
-}
diff --git a/src/Socket.h b/src/Socket.h
deleted file mode 100644
index 14c5cbe..0000000
--- a/src/Socket.h
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Her Majesty the
- Queen in Right of Canada (Communications Research Center Canada)
-
- Copyright (C) 2018
- Matthias P. Braendli, matthias.braendli@mpb.li
-
- http://opendigitalradio.org
-
-DESCRIPTION:
- Abstraction for sockets.
-*/
-
-/*
- This file is part of ODR-DabMod.
-
- ODR-DabMod is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMod is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with ODR-DabMod. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#pragma once
-
-#ifdef HAVE_CONFIG_H
-# include <config.h>
-#endif
-
-#include <stdexcept>
-#include <string>
-#include <cstdint>
-#include <cstring>
-#include <unistd.h>
-#include <errno.h>
-#include <poll.h>
-#include <sys/socket.h>
-#include <netinet/in.h>
-#include <netdb.h>
-#include <arpa/inet.h>
-
-class TCPSocket {
- public:
- TCPSocket();
- ~TCPSocket();
- TCPSocket(const TCPSocket& other) = delete;
- TCPSocket& operator=(const TCPSocket& other) = delete;
- TCPSocket(TCPSocket&& other);
- TCPSocket& operator=(TCPSocket&& other);
-
- bool valid(void) const;
- void connect(const std::string& hostname, int port);
- void listen(int port);
- void close(void);
-
- /* throws a runtime_error on failure, an invalid socket on timeout */
- TCPSocket accept_with_timeout(int timeout_ms, struct sockaddr_in *client);
-
- /* returns -1 on error */
- ssize_t sendall(const void *buffer, size_t buflen);
-
- /* Returns number of bytes read, 0 on disconnect. Throws a
- * runtime_error on error */
- ssize_t recv(void *buffer, size_t length, int flags);
-
- class Timeout {};
- class Interrupted {};
- /* Returns number of bytes read, 0 on disconnect or refused connection.
- * Throws a Timeout on timeout, Interrupted on EINTR, a runtime_error
- * on error
- */
- ssize_t recv(void *buffer, size_t length, int flags, int timeout_ms);
-
- private:
- explicit TCPSocket(int sockfd);
- int m_sock = -1;
-
- friend class TCPClient;
-};
-
-/* Implement a TCP receiver that auto-reconnects on errors */
-class TCPClient {
- public:
- void connect(const std::string& hostname, int port);
-
- /* Returns numer of bytes read, 0 on auto-reconnect, -1
- * on interruption.
- * Throws a runtime_error on error */
- ssize_t recv(void *buffer, size_t length, int flags, int timeout_ms);
-
- private:
- void reconnect(void);
- TCPSocket m_sock;
- std::string m_hostname;
- int m_port;
-};
-
diff --git a/src/SubchannelSource.cpp b/src/SubchannelSource.cpp
index b4d6750..443b5d2 100644
--- a/src/SubchannelSource.cpp
+++ b/src/SubchannelSource.cpp
@@ -1046,9 +1046,9 @@ size_t SubchannelSource::protectionOption() const
return 0;
}
-void SubchannelSource::loadSubchannelData(const Buffer& data)
+void SubchannelSource::loadSubchannelData(Buffer&& data)
{
- d_buffer = data;
+ d_buffer = std::move(data);
}
int SubchannelSource::process(Buffer* outputData)
diff --git a/src/SubchannelSource.h b/src/SubchannelSource.h
index b4ca697..68e6ff2 100644
--- a/src/SubchannelSource.h
+++ b/src/SubchannelSource.h
@@ -59,7 +59,7 @@ public:
size_t protectionOption() const;
const std::vector<PuncturingRule>& get_rules() const;
- void loadSubchannelData(const Buffer& data);
+ void loadSubchannelData(Buffer&& data);
int process(Buffer* outputData);
const char* name() { return "SubchannelSource"; }
diff --git a/src/ThreadsafeQueue.h b/src/ThreadsafeQueue.h
deleted file mode 100644
index ee26ca0..0000000
--- a/src/ThreadsafeQueue.h
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- Copyright (C) 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in
- Right of Canada (Communications Research Center Canada)
-
- Copyright (C) 2018
- Matthias P. Braendli, matthias.braendli@mpb.li
-
- An implementation for a threadsafe queue, depends on C++11
-
- When creating a ThreadsafeQueue, one can specify the minimal number
- of elements it must contain before it is possible to take one
- element out.
- */
-/*
- This file is part of ODR-DabMod.
-
- ODR-DabMod is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMod is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with ODR-DabMod. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#pragma once
-
-#include <mutex>
-#include <condition_variable>
-#include <queue>
-#include <utility>
-
-/* This queue is meant to be used by two threads. One producer
- * that pushes elements into the queue, and one consumer that
- * retrieves the elements.
- *
- * The queue can make the consumer block until an element
- * is available, or a wakeup requested.
- */
-
-/* Class thrown by blocking pop to tell the consumer
- * that there's a wakeup requested. */
-class ThreadsafeQueueWakeup {};
-
-template<typename T>
-class ThreadsafeQueue
-{
-public:
- /* Push one element into the queue, and notify another thread that
- * might be waiting.
- *
- * returns the new queue size.
- */
- size_t push(T const& val)
- {
- std::unique_lock<std::mutex> lock(the_mutex);
- the_queue.push(val);
- size_t queue_size = the_queue.size();
- lock.unlock();
-
- the_rx_notification.notify_one();
-
- return queue_size;
- }
-
- size_t push(T&& val)
- {
- std::unique_lock<std::mutex> lock(the_mutex);
- the_queue.emplace(std::move(val));
- size_t queue_size = the_queue.size();
- lock.unlock();
-
- the_rx_notification.notify_one();
-
- return queue_size;
- }
-
- /* Push one element into the queue, but wait until the
- * queue size goes below the threshold.
- *
- * Notify waiting thread.
- *
- * returns the new queue size.
- */
- size_t push_wait_if_full(T const& val, size_t threshold)
- {
- std::unique_lock<std::mutex> lock(the_mutex);
- while (the_queue.size() >= threshold) {
- the_tx_notification.wait(lock);
- }
- the_queue.push(val);
- size_t queue_size = the_queue.size();
- lock.unlock();
-
- the_rx_notification.notify_one();
-
- return queue_size;
- }
-
- /* Trigger a wakeup event on a blocking consumer, which
- * will receive a ThreadsafeQueueWakeup exception.
- */
- void trigger_wakeup(void)
- {
- std::unique_lock<std::mutex> lock(the_mutex);
- wakeup_requested = true;
- lock.unlock();
- the_rx_notification.notify_one();
- }
-
- /* Send a notification for the receiver thread */
- void notify(void)
- {
- the_rx_notification.notify_one();
- }
-
- bool empty() const
- {
- std::unique_lock<std::mutex> lock(the_mutex);
- return the_queue.empty();
- }
-
- size_t size() const
- {
- std::unique_lock<std::mutex> lock(the_mutex);
- return the_queue.size();
- }
-
- bool try_pop(T& popped_value)
- {
- std::unique_lock<std::mutex> lock(the_mutex);
- if (the_queue.empty()) {
- return false;
- }
-
- popped_value = the_queue.front();
- the_queue.pop();
-
- lock.unlock();
- the_tx_notification.notify_one();
-
- return true;
- }
-
- void wait_and_pop(T& popped_value, size_t prebuffering = 1)
- {
- std::unique_lock<std::mutex> lock(the_mutex);
- while (the_queue.size() < prebuffering and
- not wakeup_requested) {
- the_rx_notification.wait(lock);
- }
-
- if (wakeup_requested) {
- wakeup_requested = false;
- throw ThreadsafeQueueWakeup();
- }
- else {
- std::swap(popped_value, the_queue.front());
- the_queue.pop();
-
- lock.unlock();
- the_tx_notification.notify_one();
- }
- }
-
-private:
- std::queue<T> the_queue;
- mutable std::mutex the_mutex;
- std::condition_variable the_rx_notification;
- std::condition_variable the_tx_notification;
- bool wakeup_requested = false;
-};
-
diff --git a/src/TimestampDecoder.cpp b/src/TimestampDecoder.cpp
index a561237..7f1fb6e 100644
--- a/src/TimestampDecoder.cpp
+++ b/src/TimestampDecoder.cpp
@@ -27,6 +27,7 @@
#include <iostream>
#include <fstream>
#include <string>
+#include <cstring>
#include "PcDebug.h"
#include "TimestampDecoder.h"
#include "Log.h"
diff --git a/src/Utils.cpp b/src/Utils.cpp
index fd4f659..f39c4c9 100644
--- a/src/Utils.cpp
+++ b/src/Utils.cpp
@@ -46,9 +46,6 @@ static void printHeader()
#if defined(HAVE_ZEROMQ)
"zeromq " <<
#endif
-#ifdef HAVE_EDI
- "EDI " <<
-#endif
#if defined(HAVE_OUTPUT_UHD)
"output_uhd " <<
#endif
diff --git a/src/output/Feedback.cpp b/src/output/Feedback.cpp
index 17e45bf..88d8319 100644
--- a/src/output/Feedback.cpp
+++ b/src/output/Feedback.cpp
@@ -200,14 +200,13 @@ void DPDFeedbackServer::ReceiveBurstThread()
void DPDFeedbackServer::ServeFeedback()
{
- TCPSocket m_server_sock;
- m_server_sock.listen(m_port);
+ Socket::TCPSocket m_server_sock;
+ m_server_sock.listen(m_port, "127.0.0.1");
etiLog.level(info) << "DPD Feedback server listening on port " << m_port;
while (m_running) {
- struct sockaddr_in client;
- TCPSocket client_sock = m_server_sock.accept_with_timeout(1000, &client);
+ auto client_sock = m_server_sock.accept(1000);
if (not m_running) {
break;
diff --git a/src/output/SDR.cpp b/src/output/SDR.cpp
index 01312ff..ad65c1c 100644
--- a/src/output/SDR.cpp
+++ b/src/output/SDR.cpp
@@ -314,7 +314,7 @@ void SDR::handle_frame(struct FrameData& frame)
}
if (expected_sec != tx_second or expected_pps != tx_pps) {
- etiLog.level(warn) << "OutputSDR: timestamp irregularity!" <<
+ etiLog.level(warn) << "OutputSDR: timestamp irregularity at FCT=" << frame.ts.fct <<
std::fixed <<
" Expected " <<
expected_sec << "+" << (double)expected_pps/16384000.0 <<
@@ -337,7 +337,7 @@ void SDR::handle_frame(struct FrameData& frame)
if (time_spec.get_real_secs() + tx_timeout < device_time) {
etiLog.level(warn) <<
- "OutputSDR: Timestamp in the past! offset: " <<
+ "OutputSDR: Timestamp in the past at FCT=" << frame.ts.fct << " offset: " <<
std::fixed <<
time_spec.get_real_secs() - device_time <<
" (" << device_time << ")"
@@ -349,7 +349,7 @@ void SDR::handle_frame(struct FrameData& frame)
if (time_spec.get_real_secs() > device_time + TIMESTAMP_ABORT_FUTURE) {
etiLog.level(error) <<
- "OutputSDR: Timestamp way too far in the future! offset: " <<
+ "OutputSDR: Timestamp way too far in the future at FCT=" << frame.ts.fct << " offset: " <<
std::fixed <<
time_spec.get_real_secs() - device_time;
throw std::runtime_error("Timestamp error. Aborted.");
@@ -358,7 +358,7 @@ void SDR::handle_frame(struct FrameData& frame)
if (m_config.muting) {
etiLog.log(info,
- "OutputSDR: Muting sample %d requested\n",
+ "OutputSDR: Muting FCT=%d requested",
frame.ts.fct);
return;
}