aboutsummaryrefslogtreecommitdiffstats
path: root/src/DabMod.cpp
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2023-11-21 22:12:14 +0100
committerMatthias P. Braendli <matthias.braendli@mpb.li>2023-11-21 22:12:14 +0100
commit5fe36a627405b8fc65bdb212a6d505b9a6c8e489 (patch)
tree8646d654e0467be8603c5ad37fb6cd89656dfd26 /src/DabMod.cpp
parent477ac4639a7c7f74f07a6164096fc7de102528ff (diff)
parentf84065c3cc6fff0edb771f85190f7228f4d740b6 (diff)
downloaddabmod-5fe36a627405b8fc65bdb212a6d505b9a6c8e489.tar.gz
dabmod-5fe36a627405b8fc65bdb212a6d505b9a6c8e489.tar.bz2
dabmod-5fe36a627405b8fc65bdb212a6d505b9a6c8e489.zip
Merge branch 'dexter' into next
Diffstat (limited to 'src/DabMod.cpp')
-rw-r--r--src/DabMod.cpp447
1 files changed, 246 insertions, 201 deletions
diff --git a/src/DabMod.cpp b/src/DabMod.cpp
index f97c05d..e0e8a5b 100644
--- a/src/DabMod.cpp
+++ b/src/DabMod.cpp
@@ -3,7 +3,7 @@
Her Majesty the Queen in Right of Canada (Communications Research
Center Canada)
- Copyright (C) 2019
+ Copyright (C) 2023
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
@@ -25,6 +25,7 @@
along with ODR-DabMod. If not, see <http://www.gnu.org/licenses/>.
*/
+#include <fftw3.h>
#ifdef HAVE_CONFIG_H
# include "config.h"
#endif
@@ -46,6 +47,7 @@
# include <netinet/in.h>
#endif
+#include "Events.h"
#include "Utils.h"
#include "Log.h"
#include "DabModulator.h"
@@ -56,6 +58,7 @@
#include "output/SDR.h"
#include "output/UHD.h"
#include "output/Soapy.h"
+#include "output/Dexter.h"
#include "output/Lime.h"
#include "output/BladeRF.h"
#include "OutputZeroMQ.h"
@@ -93,18 +96,147 @@ void signalHandler(int signalNb)
running = 0;
}
-struct modulator_data
-{
- // For ETI
- std::shared_ptr<InputReader> inputReader;
- std::shared_ptr<EtiReader> etiReader;
+class ModulatorData : public RemoteControllable {
+ public:
+ // For ETI
+ std::shared_ptr<InputReader> inputReader;
+ std::shared_ptr<EtiReader> etiReader;
+
+ // For EDI
+ std::shared_ptr<EdiInput> ediInput;
+
+ // Common to both EDI and EDI
+ uint64_t framecount = 0;
+ Flowgraph *flowgraph = nullptr;
+
+
+ // RC-related
+ ModulatorData() : RemoteControllable("mainloop") {
+ RC_ADD_PARAMETER(num_modulator_restarts, "(Read-only) Number of mod restarts");
+ RC_ADD_PARAMETER(most_recent_edi_decoded, "(Read-only) UNIX Timestamp of most recently decoded EDI frame");
+ RC_ADD_PARAMETER(edi_source, "(Read-only) URL of the EDI/TCP source");
+ RC_ADD_PARAMETER(running_since, "(Read-only) UNIX Timestamp of most recent modulator restart");
+ RC_ADD_PARAMETER(ensemble_label, "(Read-only) Label of the ensemble");
+ RC_ADD_PARAMETER(ensemble_eid, "(Read-only) Ensemble ID");
+ RC_ADD_PARAMETER(ensemble_services, "(Read-only, only JSON) Ensemble service information");
+ RC_ADD_PARAMETER(num_services, "(Read-only) Number of services in the ensemble");
+ }
- // For EDI
- std::shared_ptr<EdiInput> ediInput;
+ virtual ~ModulatorData() {}
+
+ virtual void set_parameter(const std::string& parameter, const std::string& value) {
+ throw ParameterError("Parameter " + parameter + " is read-only");
+ }
- // Common to both EDI and EDI
- uint64_t framecount = 0;
- Flowgraph *flowgraph = nullptr;
+ virtual const std::string get_parameter(const std::string& parameter) const {
+ stringstream ss;
+ if (parameter == "num_modulator_restarts") {
+ ss << num_modulator_restarts;
+ }
+ if (parameter == "running_since") {
+ ss << running_since;
+ }
+ else if (parameter == "most_recent_edi_decoded") {
+ ss << most_recent_edi_decoded;
+ }
+ else if (parameter == "ensemble_label") {
+ if (ediInput) {
+ const auto ens = ediInput->ediReader.getEnsembleInfo();
+ if (ens) {
+ ss << FICDecoder::ConvertLabelToUTF8(ens->label, nullptr);
+ }
+ else {
+ throw ParameterError("Not available yet");
+ }
+ }
+ else {
+ throw ParameterError("Not available yet");
+ }
+ }
+ else if (parameter == "ensemble_eid") {
+ if (ediInput) {
+ const auto ens = ediInput->ediReader.getEnsembleInfo();
+ if (ens) {
+ ss << ens->eid;
+ }
+ else {
+ throw ParameterError("Not available yet");
+ }
+ }
+ else {
+ throw ParameterError("Not available yet");
+ }
+ }
+ else if (parameter == "edi_source") {
+ if (ediInput) {
+ ss << ediInput->ediTransport.getTcpUri();
+ }
+ else {
+ throw ParameterError("Not available yet");
+ }
+ }
+ else if (parameter == "num_services") {
+ if (ediInput) {
+ ss << ediInput->ediReader.getSubchannels().size();
+ }
+ else {
+ throw ParameterError("Not available yet");
+ }
+ }
+ else if (parameter == "ensemble_services") {
+ throw ParameterError("ensemble_services is only available through 'showjson'");
+ }
+ else {
+ ss << "Parameter '" << parameter <<
+ "' is not exported by controllable " << get_rc_name();
+ throw ParameterError(ss.str());
+ }
+ return ss.str();
+ }
+
+ virtual const json::map_t get_all_values() const
+ {
+ json::map_t map;
+ map["num_modulator_restarts"].v = num_modulator_restarts;
+ map["running_since"].v = running_since;
+ map["most_recent_edi_decoded"].v = most_recent_edi_decoded;
+
+ if (ediInput) {
+ map["edi_source"].v = ediInput->ediTransport.getTcpUri();
+ map["num_services"].v = ediInput->ediReader.getSubchannels().size();
+
+ const auto ens = ediInput->ediReader.getEnsembleInfo();
+ if (ens) {
+ map["ensemble_label"].v = FICDecoder::ConvertLabelToUTF8(ens->label, nullptr);
+ map["ensemble_eid"].v = ens->eid;
+ }
+ else {
+ map["ensemble_label"].v = nullopt;
+ map["ensemble_eid"].v = nullopt;
+ }
+
+ std::vector<json::value_t> services;
+
+ for (const auto& s : ediInput->ediReader.getServiceInfo()) {
+ auto service_map = make_shared<json::map_t>();
+ (*service_map)["sad"].v = s.second.subchannel.start;
+ (*service_map)["sid"].v = s.second.sid;
+ (*service_map)["label"].v = FICDecoder::ConvertLabelToUTF8(s.second.label, nullptr);
+ (*service_map)["bitrate"].v = s.second.subchannel.bitrate;
+ json::value_t v;
+ v.v = service_map;
+ services.push_back(v);
+ }
+
+ map["ensemble_services"].v = services;
+
+ }
+ return map;
+ }
+
+ size_t num_modulator_restarts = 0;
+ time_t most_recent_edi_decoded = 0;
+ time_t running_since = 0;
};
enum class run_modulator_state_t {
@@ -114,86 +246,10 @@ enum class run_modulator_state_t {
reconfigure // Some sort of change of configuration we cannot handle happened
};
-static run_modulator_state_t run_modulator(modulator_data& m);
+static run_modulator_state_t run_modulator(const mod_settings_t& mod_settings, ModulatorData& m);
-static void printModSettings(const mod_settings_t& mod_settings)
-{
- stringstream ss;
- // Print settings
- ss << "Input\n";
- ss << " Type: " << mod_settings.inputTransport << "\n";
- ss << " Source: " << mod_settings.inputName << "\n";
-
- ss << "Output\n";
- if (mod_settings.useFileOutput) {
- ss << " Name: " << mod_settings.outputName << "\n";
- }
-#if defined(HAVE_OUTPUT_UHD)
- else if (mod_settings.useUHDOutput) {
- ss << " UHD\n" <<
- " Device: " << mod_settings.sdr_device_config.device << "\n" <<
- " Subdevice: " <<
- mod_settings.sdr_device_config.subDevice << "\n" <<
- " master_clock_rate: " <<
- mod_settings.sdr_device_config.masterClockRate << "\n" <<
- " refclk: " <<
- mod_settings.sdr_device_config.refclk_src << "\n" <<
- " pps source: " <<
- mod_settings.sdr_device_config.pps_src << "\n";
- }
-#endif
-#if defined(HAVE_SOAPYSDR)
- else if (mod_settings.useSoapyOutput) {
- ss << " SoapySDR\n"
- " Device: " << mod_settings.sdr_device_config.device << "\n" <<
- " master_clock_rate: " <<
- mod_settings.sdr_device_config.masterClockRate << "\n";
- }
-#endif
-#if defined(HAVE_LIMESDR)
- else if (mod_settings.useLimeOutput) {
- ss << " LimeSDR\n"
- " Device: " << mod_settings.sdr_device_config.device << "\n" <<
- " master_clock_rate: " <<
- mod_settings.sdr_device_config.masterClockRate << "\n";
- }
-#endif
-#if defined(HAVE_BLADERF)
- else if (mod_settings.useBladeRFOutput) {
- ss << " BladeRF\n"
- " Device: " << mod_settings.sdr_device_config.device << "\n" <<
- " refclk: " << mod_settings.sdr_device_config.refclk_src << "\n";
- }
-#endif
- else if (mod_settings.useZeroMQOutput) {
- ss << " ZeroMQ\n" <<
- " Listening on: " << mod_settings.outputName << "\n" <<
- " Socket type : " << mod_settings.zmqOutputSocketType << "\n";
- }
-
- ss << " Sampling rate: ";
- if (mod_settings.outputRate > 1000) {
- if (mod_settings.outputRate > 1000000) {
- ss << std::fixed << std::setprecision(4) <<
- mod_settings.outputRate / 1000000.0 <<
- " MHz\n";
- }
- else {
- ss << std::fixed << std::setprecision(4) <<
- mod_settings.outputRate / 1000.0 <<
- " kHz\n";
- }
- }
- else {
- ss << std::fixed << std::setprecision(4) <<
- mod_settings.outputRate << " Hz\n";
- }
- fprintf(stderr, "%s", ss.str().c_str());
-}
-
-static shared_ptr<ModOutput> prepare_output(
- mod_settings_t& s)
+static shared_ptr<ModOutput> prepare_output(mod_settings_t& s)
{
shared_ptr<ModOutput> output;
@@ -249,6 +305,16 @@ static shared_ptr<ModOutput> prepare_output(
rcs.enrol((Output::SDR*)output.get());
}
#endif
+#if defined(HAVE_DEXTER)
+ else if (s.useDexterOutput) {
+ /* We normalise specifically range [-32768; 32767] */
+ s.normalise = 32767.0f / normalise_factor;
+ s.sdr_device_config.sampleRate = s.outputRate;
+ auto dexterdevice = make_shared<Output::Dexter>(s.sdr_device_config);
+ output = make_shared<Output::SDR>(s.sdr_device_config, dexterdevice);
+ rcs.enrol((Output::SDR*)output.get());
+ }
+#endif
#if defined(HAVE_LIMESDR)
else if (s.useLimeOutput) {
/* We normalise the same way as for the UHD output */
@@ -308,6 +374,8 @@ int launch_modulator(int argc, char* argv[])
mod_settings_t mod_settings;
parse_args(argc, argv, mod_settings);
+ etiLog.register_backend(make_shared<LogToEventSender>());
+
etiLog.level(info) << "Configuration parsed. Starting up version " <<
#if defined(GITVERSION)
GITVERSION;
@@ -319,6 +387,7 @@ int launch_modulator(int argc, char* argv[])
mod_settings.useUHDOutput or
mod_settings.useZeroMQOutput or
mod_settings.useSoapyOutput or
+ mod_settings.useDexterOutput or
mod_settings.useLimeOutput or
mod_settings.useBladeRFOutput)) {
throw std::runtime_error("Configuration error: Output not specified");
@@ -326,19 +395,49 @@ int launch_modulator(int argc, char* argv[])
printModSettings(mod_settings);
- shared_ptr<FormatConverter> format_converter;
+ ModulatorData m;
+ rcs.enrol(&m);
+
+ {
+ // This is mostly useful on ARM systems where FFTW planning takes some time. If we do it here
+ // it will be done before the modulator starts up
+ etiLog.level(debug) << "Running FFTW planning...";
+ constexpr size_t fft_size = 2048; // Transmission Mode I. If different, it'll recalculate on OfdmGenerator
+ // initialisation
+ auto *fft_in = (fftwf_complex*)fftwf_malloc(sizeof(fftwf_complex) * fft_size);
+ auto *fft_out = (fftwf_complex*)fftwf_malloc(sizeof(fftwf_complex) * fft_size);
+ if (fft_in == nullptr or fft_out == nullptr) {
+ throw std::runtime_error("FFTW malloc failed");
+ }
+ fftwf_set_timelimit(2);
+ fftwf_plan plan = fftwf_plan_dft_1d(fft_size, fft_in, fft_out, FFTW_FORWARD, FFTW_MEASURE);
+ fftwf_destroy_plan(plan);
+ plan = fftwf_plan_dft_1d(fft_size, fft_in, fft_out, FFTW_BACKWARD, FFTW_MEASURE);
+ fftwf_destroy_plan(plan);
+ fftwf_free(fft_in);
+ fftwf_free(fft_out);
+ etiLog.level(debug) << "FFTW planning done.";
+ }
+
+ std::string output_format;
if (mod_settings.useFileOutput and
(mod_settings.fileOutputFormat == "s8" or
mod_settings.fileOutputFormat == "u8" or
mod_settings.fileOutputFormat == "s16")) {
- format_converter = make_shared<FormatConverter>(mod_settings.fileOutputFormat);
+ output_format = mod_settings.fileOutputFormat;
+ }
+ else if (mod_settings.useBladeRFOutput or mod_settings.useDexterOutput) {
+ output_format = "s16";
}
- else if (mod_settings.useBladeRFOutput) {
- format_converter = make_shared<FormatConverter>(mod_settings.BladeRFOutputFormat);
- }
auto output = prepare_output(mod_settings);
+ if (not output_format.empty()) {
+ if (auto o = dynamic_pointer_cast<Output::SDR>(output)) {
+ o->set_sample_size(FormatConverter::get_format_size(output_format));
+ }
+ }
+
// Set thread priority to realtime
if (int r = set_realtime_prio(1)) {
etiLog.level(error) << "Could not set priority for modulator:" << r;
@@ -365,17 +464,6 @@ int launch_modulator(int argc, char* argv[])
inputReader = inputFileReader;
}
- else if (mod_settings.inputTransport == "zeromq") {
-#if !defined(HAVE_ZEROMQ)
- throw std::runtime_error("Unable to open input: "
- "ZeroMQ input transport selected, but not compiled in!");
-#else
- auto inputZeroMQReader = make_shared<InputZeroMQReader>();
- inputZeroMQReader->Open(mod_settings.inputName, mod_settings.inputMaxFramesQueued);
- rcs.enrol(inputZeroMQReader.get());
- inputReader = inputZeroMQReader;
-#endif
- }
else if (mod_settings.inputTransport == "tcp") {
auto inputTcpReader = make_shared<InputTcpReader>();
inputTcpReader->Open(mod_settings.inputName);
@@ -386,40 +474,37 @@ int launch_modulator(int argc, char* argv[])
"invalid input transport " + mod_settings.inputTransport + " selected!");
}
+ m.ediInput = ediInput;
+ m.inputReader = inputReader;
+
bool run_again = true;
while (run_again) {
+ m.running_since = get_clock_realtime_seconds();
+
Flowgraph flowgraph(mod_settings.showProcessTime);
- modulator_data m;
- m.ediInput = ediInput;
- m.inputReader = inputReader;
+ m.framecount = 0;
m.flowgraph = &flowgraph;
shared_ptr<DabModulator> modulator;
if (inputReader) {
m.etiReader = make_shared<EtiReader>(mod_settings.tist_offset_s);
- modulator = make_shared<DabModulator>(*m.etiReader, mod_settings);
+ modulator = make_shared<DabModulator>(*m.etiReader, mod_settings, output_format);
}
else if (ediInput) {
- modulator = make_shared<DabModulator>(ediInput->ediReader, mod_settings);
+ modulator = make_shared<DabModulator>(ediInput->ediReader, mod_settings, output_format);
}
rcs.enrol(modulator.get());
- if (format_converter) {
- flowgraph.connect(modulator, format_converter);
- flowgraph.connect(format_converter, output);
- }
- else {
- flowgraph.connect(modulator, output);
- }
+ flowgraph.connect(modulator, output);
if (inputReader) {
etiLog.level(info) << inputReader->GetPrintableInfo();
}
- run_modulator_state_t st = run_modulator(m);
+ run_modulator_state_t st = run_modulator(mod_settings, m);
etiLog.log(trace, "DABMOD,run_modulator() = %d", st);
switch (st) {
@@ -440,17 +525,6 @@ int launch_modulator(int argc, char* argv[])
run_again = true;
}
}
-#if defined(HAVE_ZEROMQ)
- else if (auto in_zmq = dynamic_pointer_cast<InputZeroMQReader>(inputReader)) {
- run_again = true;
- // Create a new input reader
- rcs.remove_controllable(in_zmq.get());
- auto inputZeroMQReader = make_shared<InputZeroMQReader>();
- inputZeroMQReader->Open(mod_settings.inputName, mod_settings.inputMaxFramesQueued);
- rcs.enrol(inputZeroMQReader.get());
- inputReader = inputZeroMQReader;
- }
-#endif
else if (dynamic_pointer_cast<InputTcpReader>(inputReader)) {
// Keep the same inputReader, as there is no input buffer overflow
run_again = true;
@@ -473,28 +547,21 @@ int launch_modulator(int argc, char* argv[])
break;
}
- etiLog.level(info) << m.framecount << " DAB frames encoded";
- etiLog.level(info) << ((float)m.framecount * 0.024f) << " seconds encoded";
+ etiLog.level(info) << m.framecount << " DAB frames, " << ((float)m.framecount * 0.024f) << " seconds encoded";
+ m.num_modulator_restarts++;
}
etiLog.level(info) << "Terminating";
return ret;
}
-struct zmq_input_timeout : public std::exception
-{
- const char* what() const throw()
- {
- return "InputZMQ timeout";
- }
-};
-
-static run_modulator_state_t run_modulator(modulator_data& m)
+static run_modulator_state_t run_modulator(const mod_settings_t& mod_settings, ModulatorData& m)
{
auto ret = run_modulator_state_t::failure;
try {
int last_eti_fct = -1;
auto last_frame_received = chrono::steady_clock::now();
+ frame_timestamp ts;
Buffer data;
if (m.inputReader) {
data.setLength(6144);
@@ -515,36 +582,9 @@ static run_modulator_state_t run_modulator(modulator_data& m)
ret = run_modulator_state_t::normal_end;
break;
}
-#if defined(HAVE_ZEROMQ)
- else if (dynamic_pointer_cast<InputZeroMQReader>(m.inputReader)) {
- /* An empty frame marks a timeout. We ignore it, but we are
- * now able to handle SIGINT properly.
- *
- * Also, we reconnect zmq every 10 seconds to avoid some
- * issues, discussed in
- * https://stackoverflow.com/questions/26112992/zeromq-pub-sub-on-unreliable-connection
- *
- * > It is possible that the PUB socket sees the error
- * > while the SUB socket does not.
- * >
- * > The ZMTP RFC has a proposal for heartbeating that would
- * > solve this problem. The current best solution is for
- * > PUB sockets to send heartbeats (e.g. 1 per second) when
- * > traffic is low, and for SUB sockets to disconnect /
- * > reconnect if they stop getting these.
- *
- * We don't need a heartbeat, because our application is constant frame rate,
- * the frames themselves can act as heartbeats.
- */
-
- const auto now = chrono::steady_clock::now();
- if (last_frame_received + chrono::seconds(10) < now) {
- throw zmq_input_timeout();
- }
- }
-#endif // defined(HAVE_ZEROMQ)
else if (dynamic_pointer_cast<InputTcpReader>(m.inputReader)) {
- /* Same as for ZeroMQ */
+ /* An empty frame marks a timeout. We ignore it, but we are
+ * now able to handle SIGINT properly. */
}
else {
throw logic_error("Unhandled framesize==0!");
@@ -568,6 +608,7 @@ static run_modulator_state_t run_modulator(modulator_data& m)
fct = m.etiReader->getFct();
fp = m.etiReader->getFp();
+ ts = m.etiReader->getTimestamp();
}
else if (m.ediInput) {
while (running and not m.ediInput->ediReader.isFrameReady()) {
@@ -594,39 +635,53 @@ static run_modulator_state_t run_modulator(modulator_data& m)
break;
}
+ m.most_recent_edi_decoded = get_clock_realtime_seconds();
fct = m.ediInput->ediReader.getFct();
fp = m.ediInput->ediReader.getFp();
+ ts = m.ediInput->ediReader.getTimestamp();
}
- const unsigned expected_fct = (last_eti_fct + 1) % 250;
- if (last_eti_fct == -1) {
- if (fp != 0) {
- // Do not start the flowgraph before we get to FP 0
- // to ensure all blocks are properly aligned.
- if (m.ediInput) {
- m.ediInput->ediReader.clearFrame();
+ // timestamp is good if we run unsynchronised, or if margin is sufficient
+ bool ts_good = not mod_settings.sdr_device_config.enableSync or
+ (ts.timestamp_valid and ts.offset_to_system_time() > 0.2);
+
+ if (!ts_good) {
+ etiLog.level(warn) << "Modulator skipping frame " << fct <<
+ " TS " << (ts.timestamp_valid ? "valid" : "invalid") <<
+ " offset " << (ts.timestamp_valid ? ts.offset_to_system_time() : 0);
+ }
+ else {
+ bool modulate = true;
+ if (last_eti_fct == -1) {
+ if (fp != 0) {
+ // Do not start the flowgraph before we get to FP 0
+ // to ensure all blocks are properly aligned.
+ modulate = false;
+ }
+ else {
+ last_eti_fct = fct;
}
- continue;
}
else {
- last_eti_fct = fct;
+ const unsigned expected_fct = (last_eti_fct + 1) % 250;
+ if (fct == expected_fct) {
+ last_eti_fct = fct;
+ }
+ else {
+ etiLog.level(warn) << "ETI FCT discontinuity, expected " <<
+ expected_fct << " received " << fct;
+ if (m.ediInput) {
+ m.ediInput->ediReader.clearFrame();
+ }
+ return run_modulator_state_t::again;
+ }
+ }
+
+ if (modulate) {
m.framecount++;
m.flowgraph->run();
}
}
- else if (fct == expected_fct) {
- last_eti_fct = fct;
- m.framecount++;
- m.flowgraph->run();
- }
- else {
- etiLog.level(info) << "ETI FCT discontinuity, expected " <<
- expected_fct << " received " << fct;
- if (m.ediInput) {
- m.ediInput->ediReader.clearFrame();
- }
- return run_modulator_state_t::again;
- }
if (m.ediInput) {
m.ediInput->ediReader.clearFrame();
@@ -639,16 +694,6 @@ static run_modulator_state_t run_modulator(modulator_data& m)
}
}
}
- catch (const zmq_input_timeout&) {
- // The ZeroMQ input timeout
- etiLog.level(warn) << "Timeout";
- ret = run_modulator_state_t::again;
- }
- catch (const zmq_input_overflow& e) {
- // The ZeroMQ input has overflowed its buffer
- etiLog.level(warn) << e.what();
- ret = run_modulator_state_t::again;
- }
catch (const FrameMultiplexerError& e) {
// The FrameMultiplexer saw an error or a change in the size of a
// subchannel. This can be due to a multiplex reconfiguration.