/*
Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012
Her Majesty the Queen in Right of Canada (Communications Research
Center Canada)
Copyright (C) 2023
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
*/
/*
This file is part of ODR-DabMod.
ODR-DabMod is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
ODR-DabMod is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with ODR-DabMod. If not, see .
*/
#include
#ifdef HAVE_CONFIG_H
# include "config.h"
#endif
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#if HAVE_NETINET_IN_H
# include
#endif
#include "Events.h"
#include "Utils.h"
#include "Log.h"
#include "DabModulator.h"
#include "InputMemory.h"
#include "OutputFile.h"
#include "FormatConverter.h"
#include "FrameMultiplexer.h"
#include "output/SDR.h"
#include "output/UHD.h"
#include "output/Soapy.h"
#include "output/Dexter.h"
#include "output/Lime.h"
#include "output/BladeRF.h"
#include "OutputZeroMQ.h"
#include "InputReader.h"
#include "PcDebug.h"
#include "FIRFilter.h"
#include "RemoteControl.h"
#include "ConfigParser.h"
/* UHD requires the input I and Q samples to be in the interval
* [-1.0,1.0], otherwise they get truncated, which creates very
* wide-spectrum spikes. Depending on the Transmission Mode, the
* Gain Mode and the sample rate (and maybe other parameters), the
* samples can have peaks up to about 48000. The value of 50000
* should guarantee that with a digital gain of 1.0, UHD never clips
* our samples.
*/
static const float normalise_factor = 50000.0f;
//Empirical normalisation factors used to normalise the samples to amplitude 1.
static const float normalise_factor_file_fix = 81000.0f;
static const float normalise_factor_file_var = 46000.0f;
static const float normalise_factor_file_max = 46000.0f;
typedef std::complex complexf;
using namespace std;
volatile sig_atomic_t running = 1;
void signalHandler(int signalNb)
{
PDEBUG("signalHandler(%i)\n", signalNb);
running = 0;
}
class ModulatorData : public RemoteControllable {
public:
// For ETI
std::shared_ptr inputReader;
std::shared_ptr etiReader;
// For EDI
std::shared_ptr ediInput;
// Common to both EDI and EDI
uint64_t framecount = 0;
Flowgraph *flowgraph = nullptr;
// RC-related
ModulatorData() : RemoteControllable("mainloop") {
RC_ADD_PARAMETER(num_modulator_restarts, "(Read-only) Number of mod restarts");
RC_ADD_PARAMETER(most_recent_edi_decoded, "(Read-only) UNIX Timestamp of most recently decoded EDI frame");
RC_ADD_PARAMETER(edi_source, "(Read-only) URL of the EDI/TCP source");
RC_ADD_PARAMETER(running_since, "(Read-only) UNIX Timestamp of most recent modulator restart");
RC_ADD_PARAMETER(ensemble_label, "(Read-only) Label of the ensemble");
RC_ADD_PARAMETER(ensemble_eid, "(Read-only) Ensemble ID");
RC_ADD_PARAMETER(ensemble_services, "(Read-only, only JSON) Ensemble service information");
RC_ADD_PARAMETER(num_services, "(Read-only) Number of services in the ensemble");
}
virtual ~ModulatorData() {}
virtual void set_parameter(const std::string& parameter, const std::string& value) {
throw ParameterError("Parameter " + parameter + " is read-only");
}
virtual const std::string get_parameter(const std::string& parameter) const {
stringstream ss;
if (parameter == "num_modulator_restarts") {
ss << num_modulator_restarts;
}
else if (parameter == "running_since") {
ss << running_since;
}
else if (parameter == "most_recent_edi_decoded") {
ss << most_recent_edi_decoded;
}
else if (parameter == "ensemble_label") {
if (ediInput) {
const auto ens = ediInput->ediReader.getEnsembleInfo();
if (ens) {
ss << FICDecoder::ConvertLabelToUTF8(ens->label, nullptr);
}
else {
throw ParameterError("Not available yet");
}
}
else {
throw ParameterError("Not available yet");
}
}
else if (parameter == "ensemble_eid") {
if (ediInput) {
const auto ens = ediInput->ediReader.getEnsembleInfo();
if (ens) {
ss << ens->eid;
}
else {
throw ParameterError("Not available yet");
}
}
else {
throw ParameterError("Not available yet");
}
}
else if (parameter == "edi_source") {
if (ediInput) {
ss << ediInput->ediTransport.getTcpUri();
}
else {
throw ParameterError("Not available yet");
}
}
else if (parameter == "num_services") {
if (ediInput) {
ss << ediInput->ediReader.getSubchannels().size();
}
else {
throw ParameterError("Not available yet");
}
}
else if (parameter == "ensemble_services") {
throw ParameterError("ensemble_services is only available through 'showjson'");
}
else {
ss << "Parameter '" << parameter <<
"' is not exported by controllable " << get_rc_name();
throw ParameterError(ss.str());
}
return ss.str();
}
virtual const json::map_t get_all_values() const
{
json::map_t map;
map["num_modulator_restarts"].v = num_modulator_restarts;
map["running_since"].v = running_since;
map["most_recent_edi_decoded"].v = most_recent_edi_decoded;
if (ediInput) {
map["edi_source"].v = ediInput->ediTransport.getTcpUri();
map["num_services"].v = ediInput->ediReader.getSubchannels().size();
const auto ens = ediInput->ediReader.getEnsembleInfo();
if (ens) {
map["ensemble_label"].v = FICDecoder::ConvertLabelToUTF8(ens->label, nullptr);
map["ensemble_eid"].v = ens->eid;
}
else {
map["ensemble_label"].v = nullopt;
map["ensemble_eid"].v = nullopt;
}
std::vector services;
for (const auto& s : ediInput->ediReader.getServiceInfo()) {
auto service_map = make_shared();
(*service_map)["sad"].v = s.second.subchannel.start;
(*service_map)["sid"].v = s.second.sid;
(*service_map)["label"].v = FICDecoder::ConvertLabelToUTF8(s.second.label, nullptr);
(*service_map)["bitrate"].v = s.second.subchannel.bitrate;
(*service_map)["protection_level"].v = s.second.subchannel.pl;
json::value_t v;
v.v = service_map;
services.push_back(v);
}
map["ensemble_services"].v = services;
}
return map;
}
size_t num_modulator_restarts = 0;
time_t most_recent_edi_decoded = 0;
time_t running_since = 0;
};
enum class run_modulator_state_t {
failure, // Corresponds to all failures
normal_end, // Number of frames to modulate was reached
again, // Restart the modulator part
reconfigure // Some sort of change of configuration we cannot handle happened
};
static run_modulator_state_t run_modulator(const mod_settings_t& mod_settings, ModulatorData& m);
static shared_ptr prepare_output(mod_settings_t& s)
{
shared_ptr output;
if (s.useFileOutput) {
if (s.fileOutputFormat == "complexf") {
output = make_shared(s.outputName, s.fileOutputShowMetadata);
}
else if (s.fileOutputFormat == "complexf_normalised") {
if (s.gainMode == GainMode::GAIN_FIX)
s.normalise = 1.0f / normalise_factor_file_fix;
else if (s.gainMode == GainMode::GAIN_MAX)
s.normalise = 1.0f / normalise_factor_file_max;
else if (s.gainMode == GainMode::GAIN_VAR)
s.normalise = 1.0f / normalise_factor_file_var;
output = make_shared(s.outputName, s.fileOutputShowMetadata);
}
else if (s.fileOutputFormat == "s16") {
// We must normalise the samples to the interval [-32767.0; 32767.0]
s.normalise = 32767.0f / normalise_factor;
output = make_shared(s.outputName, s.fileOutputShowMetadata);
}
else if (s.fileOutputFormat == "s8" or
s.fileOutputFormat == "u8") {
// We must normalise the samples to the interval [-127.0; 127.0]
// The formatconverter will add 127 for u8 so that it ends up in
// [0; 255]
s.normalise = 127.0f / normalise_factor;
output = make_shared(s.outputName, s.fileOutputShowMetadata);
}
else {
throw runtime_error("File output format " + s.fileOutputFormat +
" not known");
}
}
#if defined(HAVE_OUTPUT_UHD)
else if (s.useUHDOutput) {
s.normalise = 1.0f / normalise_factor;
s.sdr_device_config.sampleRate = s.outputRate;
auto uhddevice = make_shared(s.sdr_device_config);
output = make_shared(s.sdr_device_config, uhddevice);
rcs.enrol((Output::SDR*)output.get());
}
#endif
#if defined(HAVE_SOAPYSDR)
else if (s.useSoapyOutput) {
/* We normalise the same way as for the UHD output */
s.normalise = 1.0f / normalise_factor;
s.sdr_device_config.sampleRate = s.outputRate;
auto soapydevice = make_shared(s.sdr_device_config);
output = make_shared(s.sdr_device_config, soapydevice);
rcs.enrol((Output::SDR*)output.get());
}
#endif
#if defined(HAVE_DEXTER)
else if (s.useDexterOutput) {
/* We normalise specifically range [-32768; 32767] */
s.normalise = 32767.0f / normalise_factor;
s.sdr_device_config.sampleRate = s.outputRate;
auto dexterdevice = make_shared(s.sdr_device_config);
output = make_shared(s.sdr_device_config, dexterdevice);
rcs.enrol((Output::SDR*)output.get());
}
#endif
#if defined(HAVE_LIMESDR)
else if (s.useLimeOutput) {
/* We normalise the same way as for the UHD output */
s.normalise = 1.0f / normalise_factor;
s.sdr_device_config.sampleRate = s.outputRate;
auto limedevice = make_shared(s.sdr_device_config);
output = make_shared(s.sdr_device_config, limedevice);
rcs.enrol((Output::SDR*)output.get());
}
#endif
#if defined(HAVE_BLADERF)
else if (s.useBladeRFOutput) {
/* We normalise specifically for the BladeRF output : range [-2048; 2047] */
s.normalise = 2047.0f / normalise_factor;
s.sdr_device_config.sampleRate = s.outputRate;
auto bladerfdevice = make_shared(s.sdr_device_config);
output = make_shared(s.sdr_device_config, bladerfdevice);
rcs.enrol((Output::SDR*)output.get());
}
#endif
#if defined(HAVE_ZEROMQ)
else if (s.useZeroMQOutput) {
/* We normalise the same way as for the UHD output */
s.normalise = 1.0f / normalise_factor;
if (s.zmqOutputSocketType == "pub") {
output = make_shared(s.outputName, ZMQ_PUB);
}
else if (s.zmqOutputSocketType == "rep") {
output = make_shared(s.outputName, ZMQ_REP);
}
else {
std::stringstream ss;
ss << "ZeroMQ output socket type " << s.zmqOutputSocketType << " invalid";
throw std::invalid_argument(ss.str());
}
}
#endif
return output;
}
int launch_modulator(int argc, char* argv[])
{
int ret = 0;
struct sigaction sa;
memset(&sa, 0, sizeof(struct sigaction));
sa.sa_handler = &signalHandler;
if (sigaction(SIGINT, &sa, NULL) == -1) {
const string errstr = strerror(errno);
throw runtime_error("Could not set signal handler: " + errstr);
}
printStartupInfo();
mod_settings_t mod_settings;
parse_args(argc, argv, mod_settings);
#if defined(HAVE_ZEROMQ)
etiLog.register_backend(make_shared());
#endif // defined(HAVE_ZEROMQ)
etiLog.level(info) << "Configuration parsed. Starting up version " <<
#if defined(GITVERSION)
GITVERSION;
#else
VERSION;
#endif
if (not (mod_settings.useFileOutput or
mod_settings.useUHDOutput or
mod_settings.useZeroMQOutput or
mod_settings.useSoapyOutput or
mod_settings.useDexterOutput or
mod_settings.useLimeOutput or
mod_settings.useBladeRFOutput)) {
throw std::runtime_error("Configuration error: Output not specified");
}
if (not mod_settings.startupCheck.empty()) {
etiLog.level(info) << "Running startup check '" << mod_settings.startupCheck << "'";
int wstatus = system(mod_settings.startupCheck.c_str());
if (WIFEXITED(wstatus)) {
if (WEXITSTATUS(wstatus) == 0) {
etiLog.level(info) << "Startup check ok";
}
else {
etiLog.level(error) << "Startup check failed, returned " << WEXITSTATUS(wstatus);
return 1;
}
}
else {
etiLog.level(error) << "Startup check failed, child didn't terminate normally";
return 1;
}
}
printModSettings(mod_settings);
ModulatorData m;
rcs.enrol(&m);
{
// This is mostly useful on ARM systems where FFTW planning takes some time. If we do it here
// it will be done before the modulator starts up
etiLog.level(debug) << "Running FFTW planning...";
constexpr size_t fft_size = 2048; // Transmission Mode I. If different, it'll recalculate on OfdmGenerator
// initialisation
auto *fft_in = (fftwf_complex*)fftwf_malloc(sizeof(fftwf_complex) * fft_size);
auto *fft_out = (fftwf_complex*)fftwf_malloc(sizeof(fftwf_complex) * fft_size);
if (fft_in == nullptr or fft_out == nullptr) {
throw std::runtime_error("FFTW malloc failed");
}
fftwf_set_timelimit(2);
fftwf_plan plan = fftwf_plan_dft_1d(fft_size, fft_in, fft_out, FFTW_FORWARD, FFTW_MEASURE);
fftwf_destroy_plan(plan);
plan = fftwf_plan_dft_1d(fft_size, fft_in, fft_out, FFTW_BACKWARD, FFTW_MEASURE);
fftwf_destroy_plan(plan);
fftwf_free(fft_in);
fftwf_free(fft_out);
etiLog.level(debug) << "FFTW planning done.";
}
std::string output_format;
if (mod_settings.useFileOutput and
(mod_settings.fileOutputFormat == "s8" or
mod_settings.fileOutputFormat == "u8" or
mod_settings.fileOutputFormat == "s16")) {
output_format = mod_settings.fileOutputFormat;
}
else if (mod_settings.useBladeRFOutput or mod_settings.useDexterOutput) {
output_format = "s16";
}
auto output = prepare_output(mod_settings);
if (not output_format.empty()) {
if (auto o = dynamic_pointer_cast(output)) {
o->set_sample_size(FormatConverter::get_format_size(output_format));
}
}
// Set thread priority to realtime
if (int r = set_realtime_prio(1)) {
etiLog.level(error) << "Could not set priority for modulator:" << r;
}
shared_ptr inputReader;
shared_ptr ediInput;
if (mod_settings.inputTransport == "edi") {
ediInput = make_shared(mod_settings.tist_offset_s, mod_settings.edi_max_delay_ms);
ediInput->ediTransport.Open(mod_settings.inputName);
if (not ediInput->ediTransport.isEnabled()) {
throw runtime_error("inputTransport is edi, but ediTransport is not enabled");
}
}
else if (mod_settings.inputTransport == "file") {
auto inputFileReader = make_shared();
// Opening ETI input file
if (inputFileReader->Open(mod_settings.inputName, mod_settings.loop) == -1) {
throw std::runtime_error("Unable to open input");
}
inputReader = inputFileReader;
}
else if (mod_settings.inputTransport == "tcp") {
auto inputTcpReader = make_shared();
inputTcpReader->Open(mod_settings.inputName);
inputReader = inputTcpReader;
}
else {
throw std::runtime_error("Unable to open input: "
"invalid input transport " + mod_settings.inputTransport + " selected!");
}
m.ediInput = ediInput;
m.inputReader = inputReader;
bool run_again = true;
while (run_again) {
m.running_since = get_clock_realtime_seconds();
Flowgraph flowgraph(mod_settings.showProcessTime);
m.framecount = 0;
m.flowgraph = &flowgraph;
shared_ptr modulator;
if (inputReader) {
m.etiReader = make_shared(mod_settings.tist_offset_s);
modulator = make_shared(*m.etiReader, mod_settings, output_format);
}
else if (ediInput) {
modulator = make_shared(ediInput->ediReader, mod_settings, output_format);
}
rcs.enrol(modulator.get());
flowgraph.connect(modulator, output);
if (inputReader) {
etiLog.level(info) << inputReader->GetPrintableInfo();
}
run_modulator_state_t st = run_modulator(mod_settings, m);
etiLog.log(trace, "DABMOD,run_modulator() = %d", st);
switch (st) {
case run_modulator_state_t::failure:
etiLog.level(error) << "Modulator failure.";
run_again = false;
ret = 1;
break;
case run_modulator_state_t::again:
etiLog.level(warn) << "Restart modulator.";
run_again = false;
if (auto in = dynamic_pointer_cast(inputReader)) {
if (in->Open(mod_settings.inputName, mod_settings.loop) == -1) {
etiLog.level(error) << "Unable to open input file!";
ret = 1;
}
else {
run_again = true;
}
}
else if (dynamic_pointer_cast(inputReader)) {
// Keep the same inputReader, as there is no input buffer overflow
run_again = true;
}
else if (ediInput) {
// In EDI, keep the same input
run_again = true;
}
break;
case run_modulator_state_t::reconfigure:
etiLog.level(warn) << "Detected change in ensemble configuration.";
/* We can keep the input in this case */
run_again = true;
break;
case run_modulator_state_t::normal_end:
default:
etiLog.level(info) << "modulator stopped.";
ret = 0;
run_again = false;
break;
}
etiLog.level(info) << m.framecount << " DAB frames, " << ((float)m.framecount * 0.024f) << " seconds encoded";
m.num_modulator_restarts++;
}
etiLog.level(info) << "Terminating";
return ret;
}
static run_modulator_state_t run_modulator(const mod_settings_t& mod_settings, ModulatorData& m)
{
auto ret = run_modulator_state_t::failure;
try {
int last_eti_fct = -1;
auto last_frame_received = chrono::steady_clock::now();
frame_timestamp ts;
Buffer data;
if (m.inputReader) {
data.setLength(6144);
}
while (running) {
unsigned fct = 0;
unsigned fp = 0;
/* Load ETI data from the source */
if (m.inputReader) {
int framesize = m.inputReader->GetNextFrame(data.getData());
if (framesize == 0) {
if (dynamic_pointer_cast(m.inputReader)) {
etiLog.level(info) << "End of file reached.";
running = 0;
ret = run_modulator_state_t::normal_end;
break;
}
else if (dynamic_pointer_cast(m.inputReader)) {
/* An empty frame marks a timeout. We ignore it, but we are
* now able to handle SIGINT properly. */
}
else {
throw logic_error("Unhandled framesize==0!");
}
continue;
}
else if (framesize < 0) {
etiLog.level(error) << "Input read error.";
running = 0;
ret = run_modulator_state_t::normal_end;
break;
}
const int eti_bytes_read = m.etiReader->loadEtiData(data);
if ((size_t)eti_bytes_read != data.getLength()) {
etiLog.level(error) << "ETI frame incompletely read";
throw std::runtime_error("ETI read error");
}
last_frame_received = chrono::steady_clock::now();
fct = m.etiReader->getFct();
fp = m.etiReader->getFp();
ts = m.etiReader->getTimestamp();
}
else if (m.ediInput) {
while (running and not m.ediInput->ediReader.isFrameReady()) {
try {
bool packet_received = m.ediInput->ediTransport.rxPacket();
if (packet_received) {
last_frame_received = chrono::steady_clock::now();
}
}
catch (const std::runtime_error& e) {
etiLog.level(warn) << "EDI input: " << e.what();
running = 0;
break;
}
}
if (!running) {
break;
}
m.most_recent_edi_decoded = get_clock_realtime_seconds();
fct = m.ediInput->ediReader.getFct();
fp = m.ediInput->ediReader.getFp();
ts = m.ediInput->ediReader.getTimestamp();
}
// timestamp is good if we run unsynchronised, or if margin is sufficient
bool ts_good = not mod_settings.sdr_device_config.enableSync or
(ts.timestamp_valid and ts.offset_to_system_time() > 0.2);
if (!ts_good) {
etiLog.level(warn) << "Modulator skipping frame " << fct <<
" TS " << (ts.timestamp_valid ? "valid" : "invalid") <<
" offset " << (ts.timestamp_valid ? ts.offset_to_system_time() : 0);
}
else {
bool modulate = true;
if (last_eti_fct == -1) {
if (fp != 0) {
// Do not start the flowgraph before we get to FP 0
// to ensure all blocks are properly aligned.
modulate = false;
}
else {
last_eti_fct = fct;
}
}
else {
const unsigned expected_fct = (last_eti_fct + 1) % 250;
if (fct == expected_fct) {
last_eti_fct = fct;
}
else {
etiLog.level(warn) << "ETI FCT discontinuity, expected " <<
expected_fct << " received " << fct;
if (m.ediInput) {
m.ediInput->ediReader.clearFrame();
}
return run_modulator_state_t::again;
}
}
if (modulate) {
m.framecount++;
m.flowgraph->run();
}
}
if (m.ediInput) {
m.ediInput->ediReader.clearFrame();
}
/* Check every once in a while if the remote control
* is still working */
if ((m.framecount % 250) == 0) {
rcs.check_faults();
}
}
}
catch (const FrameMultiplexerError& e) {
// The FrameMultiplexer saw an error or a change in the size of a
// subchannel. This can be due to a multiplex reconfiguration.
etiLog.level(warn) << e.what();
ret = run_modulator_state_t::reconfigure;
}
catch (const std::exception& e) {
etiLog.level(error) << "Exception caught: " << e.what();
ret = run_modulator_state_t::failure;
}
return ret;
}
int main(int argc, char* argv[])
{
// Set timezone to UTC
setenv("TZ", "", 1);
tzset();
// Version handling is done very early to ensure nothing else but the version gets printed out
if (argc == 2 and strcmp(argv[1], "--version") == 0) {
fprintf(stdout, "%s\n",
#if defined(GITVERSION)
GITVERSION
#else
PACKAGE_VERSION
#endif
);
return 0;
}
try {
return launch_modulator(argc, argv);
}
catch (const std::invalid_argument& e) {
std::string what(e.what());
if (not what.empty()) {
std::cerr << "Modulator error: " << what << std::endl;
}
}
catch (const std::runtime_error& e) {
std::cerr << "Modulator runtime error: " << e.what() << std::endl;
}
return 1;
}