diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/Buffer.cpp | 3 | ||||
-rw-r--r-- | src/ConfigParser.cpp | 3 | ||||
-rw-r--r-- | src/ConfigParser.h | 1 | ||||
-rw-r--r-- | src/DabMod.cpp | 481 | ||||
-rw-r--r-- | src/DabModulator.cpp | 2 | ||||
-rw-r--r-- | src/EtiReader.cpp | 33 | ||||
-rw-r--r-- | src/EtiReader.h | 26 | ||||
-rw-r--r-- | src/Flowgraph.cpp | 9 | ||||
-rw-r--r-- | src/Flowgraph.h | 7 | ||||
-rw-r--r-- | src/GainControl.cpp | 3 | ||||
-rw-r--r-- | src/GuardIntervalInserter.cpp | 3 | ||||
-rw-r--r-- | src/InputReader.h | 10 | ||||
-rw-r--r-- | src/Log.cpp | 191 | ||||
-rw-r--r-- | src/Log.h | 200 | ||||
-rw-r--r-- | src/MemlessPoly.cpp | 3 | ||||
-rw-r--r-- | src/MemlessPoly.h | 8 | ||||
-rw-r--r-- | src/OutputFile.cpp | 56 | ||||
-rw-r--r-- | src/OutputFile.h | 6 | ||||
-rw-r--r-- | src/OutputMemory.cpp | 1 | ||||
-rw-r--r-- | src/RemoteControl.cpp | 588 | ||||
-rw-r--r-- | src/RemoteControl.h | 259 | ||||
-rw-r--r-- | src/Socket.cpp | 275 | ||||
-rw-r--r-- | src/Socket.h | 104 | ||||
-rw-r--r-- | src/SubchannelSource.cpp | 4 | ||||
-rw-r--r-- | src/SubchannelSource.h | 2 | ||||
-rw-r--r-- | src/ThreadsafeQueue.h | 178 | ||||
-rw-r--r-- | src/TimestampDecoder.cpp | 1 | ||||
-rw-r--r-- | src/Utils.cpp | 3 | ||||
-rw-r--r-- | src/output/Feedback.cpp | 7 | ||||
-rw-r--r-- | src/output/SDR.cpp | 8 |
30 files changed, 358 insertions, 2117 deletions
diff --git a/src/Buffer.cpp b/src/Buffer.cpp index 002c1eb..ab50f1a 100644 --- a/src/Buffer.cpp +++ b/src/Buffer.cpp @@ -97,6 +97,9 @@ Buffer& Buffer::operator=(Buffer&& other) if (&other != this) { m_len = other.m_len; m_capacity = other.m_capacity; + if (m_data != nullptr) { + free(m_data); + } m_data = other.m_data; other.m_len = 0; diff --git a/src/ConfigParser.cpp b/src/ConfigParser.cpp index dd8a150..73e51dd 100644 --- a/src/ConfigParser.cpp +++ b/src/ConfigParser.cpp @@ -30,6 +30,7 @@ #endif #include <cstdint> +#include <algorithm> #include "INIReader.h" @@ -143,6 +144,8 @@ static void parse_configfile( etiLog.register_backend(make_shared<LogTracer>(trace_filename)); } + mod_settings.showProcessTime = pt.GetInteger("log.show_process_time", + mod_settings.showProcessTime); // modulator parameters: const string gainMode_setting = pt.Get("modulator.gainmode", "var"); diff --git a/src/ConfigParser.h b/src/ConfigParser.h index 23b0528..33d7824 100644 --- a/src/ConfigParser.h +++ b/src/ConfigParser.h @@ -88,6 +88,7 @@ struct mod_settings_t { Output::SDRDeviceConfig sdr_device_config; #endif + bool showProcessTime = true; }; void parse_args(int argc, char **argv, mod_settings_t& mod_settings); diff --git a/src/DabMod.cpp b/src/DabMod.cpp index d340b30..d624a12 100644 --- a/src/DabMod.cpp +++ b/src/DabMod.cpp @@ -37,6 +37,7 @@ #include <cstdlib> #include <stdexcept> #include <cstdio> +#include <cstring> #include <cstddef> #include <sys/stat.h> #include <signal.h> @@ -59,7 +60,6 @@ #include "OutputZeroMQ.h" #include "InputReader.h" #include "PcDebug.h" -#include "TimestampDecoder.h" #include "FIRFilter.h" #include "RemoteControl.h" #include "ConfigParser.h" @@ -94,12 +94,16 @@ void signalHandler(int signalNb) struct modulator_data { + // For ETI std::shared_ptr<InputReader> inputReader; - Buffer data; - uint64_t framecount = 0; + std::shared_ptr<EtiReader> etiReader; + + // For EDI + std::shared_ptr<EdiInput> ediInput; - Flowgraph* flowgraph = nullptr; - EtiReader* etiReader = nullptr; + // Common to both EDI and EDI + uint64_t framecount = 0; + Flowgraph *flowgraph = nullptr; }; enum class run_modulator_state_t { @@ -318,205 +322,137 @@ int launch_modulator(int argc, char* argv[]) etiLog.level(error) << "Could not set priority for modulator:" << r; } + shared_ptr<InputReader> inputReader; + shared_ptr<EdiInput> ediInput; + if (mod_settings.inputTransport == "edi") { -#ifdef HAVE_EDI - EdiReader ediReader(mod_settings.tist_offset_s); - EdiDecoder::ETIDecoder ediInput(ediReader, false); - if (mod_settings.edi_max_delay_ms > 0.0f) { - // setMaxDelay wants number of AF packets, which correspond to 24ms ETI frames - ediInput.setMaxDelay(lroundf(mod_settings.edi_max_delay_ms / 24.0f)); - } - EdiTransport ediTransport(ediInput); + ediInput = make_shared<EdiInput>(mod_settings.tist_offset_s, mod_settings.edi_max_delay_ms); - ediTransport.Open(mod_settings.inputName); - if (not ediTransport.isEnabled()) { + ediInput->ediTransport.Open(mod_settings.inputName); + if (not ediInput->ediTransport.isEnabled()) { throw runtime_error("inputTransport is edi, but ediTransport is not enabled"); } - Flowgraph flowgraph; - - auto modulator = make_shared<DabModulator>(ediReader, mod_settings); - rcs.enrol(modulator.get()); + } + else if (mod_settings.inputTransport == "file") { + auto inputFileReader = make_shared<InputFileReader>(); - if (format_converter) { - flowgraph.connect(modulator, format_converter); - flowgraph.connect(format_converter, output); - } - else { - flowgraph.connect(modulator, output); + // Opening ETI input file + if (inputFileReader->Open(mod_settings.inputName, mod_settings.loop) == -1) { + throw std::runtime_error("Unable to open input"); } - size_t framecount = 0; - - bool first_frame = true; - - auto frame_received_tp = chrono::steady_clock::now(); - - while (running) { - while (running and not ediReader.isFrameReady()) { - try { - bool packet_received = ediTransport.rxPacket(); - if (packet_received) { - frame_received_tp = chrono::steady_clock::now(); - } - } - catch (const std::runtime_error& e) { - etiLog.level(warn) << "EDI input: " << e.what(); - running = 0; - break; - } - - if (frame_received_tp + chrono::seconds(10) < chrono::steady_clock::now()) { - etiLog.level(error) << "No EDI data received in 10 seconds."; - running = 0; - break; - } - } - - if (not running) { - break; - } - - if (first_frame) { - if (ediReader.getFp() != 0) { - // Do not start the flowgraph before we get to FP 0 - // to ensure all blocks are properly aligned. - ediReader.clearFrame(); - continue; - } - else { - first_frame = false; - } - } - - framecount++; - flowgraph.run(); - ediReader.clearFrame(); - - /* Check every once in a while if the remote control - * is still working */ - if ((framecount % 250) == 0) { - rcs.check_faults(); - } - } -#else + inputReader = inputFileReader; + } + else if (mod_settings.inputTransport == "zeromq") { +#if !defined(HAVE_ZEROMQ) throw std::runtime_error("Unable to open input: " - "EDI input transport selected, but not compiled in!"); -#endif // HAVE_EDI + "ZeroMQ input transport selected, but not compiled in!"); +#else + auto inputZeroMQReader = make_shared<InputZeroMQReader>(); + inputZeroMQReader->Open(mod_settings.inputName, mod_settings.inputMaxFramesQueued); + rcs.enrol(inputZeroMQReader.get()); + inputReader = inputZeroMQReader; +#endif + } + else if (mod_settings.inputTransport == "tcp") { + auto inputTcpReader = make_shared<InputTcpReader>(); + inputTcpReader->Open(mod_settings.inputName); + inputReader = inputTcpReader; } else { - shared_ptr<InputReader> inputReader; + throw std::runtime_error("Unable to open input: " + "invalid input transport " + mod_settings.inputTransport + " selected!"); + } - if (mod_settings.inputTransport == "file") { - auto inputFileReader = make_shared<InputFileReader>(); + bool run_again = true; - // Opening ETI input file - if (inputFileReader->Open(mod_settings.inputName, mod_settings.loop) == -1) { - throw std::runtime_error("Unable to open input"); - } + while (run_again) { + Flowgraph flowgraph(mod_settings.showProcessTime); - inputReader = inputFileReader; + modulator_data m; + m.ediInput = ediInput; + m.inputReader = inputReader; + m.flowgraph = &flowgraph; + + shared_ptr<DabModulator> modulator; + if (inputReader) { + m.etiReader = make_shared<EtiReader>(mod_settings.tist_offset_s); + modulator = make_shared<DabModulator>(*m.etiReader, mod_settings); } - else if (mod_settings.inputTransport == "zeromq") { -#if !defined(HAVE_ZEROMQ) - throw std::runtime_error("Unable to open input: " - "ZeroMQ input transport selected, but not compiled in!"); -#else - auto inputZeroMQReader = make_shared<InputZeroMQReader>(); - inputZeroMQReader->Open(mod_settings.inputName, mod_settings.inputMaxFramesQueued); - rcs.enrol(inputZeroMQReader.get()); - inputReader = inputZeroMQReader; -#endif + else if (ediInput) { + modulator = make_shared<DabModulator>(ediInput->ediReader, mod_settings); } - else if (mod_settings.inputTransport == "tcp") { - auto inputTcpReader = make_shared<InputTcpReader>(); - inputTcpReader->Open(mod_settings.inputName); - inputReader = inputTcpReader; + + rcs.enrol(modulator.get()); + + if (format_converter) { + flowgraph.connect(modulator, format_converter); + flowgraph.connect(format_converter, output); } else { - throw std::runtime_error("Unable to open input: " - "invalid input transport " + mod_settings.inputTransport + " selected!"); + flowgraph.connect(modulator, output); } - bool run_again = true; - - while (run_again) { - Flowgraph flowgraph; - - modulator_data m; - m.inputReader = inputReader; - m.flowgraph = &flowgraph; - m.data.setLength(6144); - - EtiReader etiReader(mod_settings.tist_offset_s); - m.etiReader = &etiReader; - - auto input = make_shared<InputMemory>(&m.data); - auto modulator = make_shared<DabModulator>(etiReader, mod_settings); - rcs.enrol(modulator.get()); - - if (format_converter) { - flowgraph.connect(modulator, format_converter); - flowgraph.connect(format_converter, output); - } - else { - flowgraph.connect(modulator, output); - } - + if (inputReader) { etiLog.level(info) << inputReader->GetPrintableInfo(); + } - run_modulator_state_t st = run_modulator(m); - etiLog.log(trace, "DABMOD,run_modulator() = %d", st); - - switch (st) { - case run_modulator_state_t::failure: - etiLog.level(error) << "Modulator failure."; - run_again = false; - ret = 1; - break; - case run_modulator_state_t::again: - etiLog.level(warn) << "Restart modulator."; - run_again = false; - if (auto in = dynamic_pointer_cast<InputFileReader>(inputReader)) { - if (in->Open(mod_settings.inputName, mod_settings.loop) == -1) { - etiLog.level(error) << "Unable to open input file!"; - ret = 1; - } - else { - run_again = true; - } + run_modulator_state_t st = run_modulator(m); + etiLog.log(trace, "DABMOD,run_modulator() = %d", st); + + switch (st) { + case run_modulator_state_t::failure: + etiLog.level(error) << "Modulator failure."; + run_again = false; + ret = 1; + break; + case run_modulator_state_t::again: + etiLog.level(warn) << "Restart modulator."; + run_again = false; + if (auto in = dynamic_pointer_cast<InputFileReader>(inputReader)) { + if (in->Open(mod_settings.inputName, mod_settings.loop) == -1) { + etiLog.level(error) << "Unable to open input file!"; + ret = 1; } -#if defined(HAVE_ZEROMQ) - else if (auto in_zmq = dynamic_pointer_cast<InputZeroMQReader>(inputReader)) { + else { run_again = true; - // Create a new input reader - rcs.remove_controllable(in_zmq.get()); - auto inputZeroMQReader = make_shared<InputZeroMQReader>(); - inputZeroMQReader->Open(mod_settings.inputName, mod_settings.inputMaxFramesQueued); - rcs.enrol(inputZeroMQReader.get()); - inputReader = inputZeroMQReader; } + } +#if defined(HAVE_ZEROMQ) + else if (auto in_zmq = dynamic_pointer_cast<InputZeroMQReader>(inputReader)) { + run_again = true; + // Create a new input reader + rcs.remove_controllable(in_zmq.get()); + auto inputZeroMQReader = make_shared<InputZeroMQReader>(); + inputZeroMQReader->Open(mod_settings.inputName, mod_settings.inputMaxFramesQueued); + rcs.enrol(inputZeroMQReader.get()); + inputReader = inputZeroMQReader; + } #endif - else if (dynamic_pointer_cast<InputTcpReader>(inputReader)) { - // Keep the same inputReader, as there is no input buffer overflow - run_again = true; - } - break; - case run_modulator_state_t::reconfigure: - etiLog.level(warn) << "Detected change in ensemble configuration."; - /* We can keep the input in this care */ + else if (dynamic_pointer_cast<InputTcpReader>(inputReader)) { + // Keep the same inputReader, as there is no input buffer overflow run_again = true; - break; - case run_modulator_state_t::normal_end: - default: - etiLog.level(info) << "modulator stopped."; - ret = 0; - run_again = false; - break; - } - - etiLog.level(info) << m.framecount << " DAB frames encoded"; - etiLog.level(info) << ((float)m.framecount * 0.024f) << " seconds encoded"; + } + else if (ediInput) { + // In EDI, keep the same input + run_again = true; + } + break; + case run_modulator_state_t::reconfigure: + etiLog.level(warn) << "Detected change in ensemble configuration."; + /* We can keep the input in this case */ + run_again = true; + break; + case run_modulator_state_t::normal_end: + default: + etiLog.level(info) << "modulator stopped."; + ret = 0; + run_again = false; + break; } + + etiLog.level(info) << m.framecount << " DAB frames encoded"; + etiLog.level(info) << ((float)m.framecount * 0.024f) << " seconds encoded"; } etiLog.level(info) << "Terminating"; @@ -535,57 +471,142 @@ static run_modulator_state_t run_modulator(modulator_data& m) { auto ret = run_modulator_state_t::failure; try { - bool first_frame = true; int last_eti_fct = -1; auto last_frame_received = chrono::steady_clock::now(); + Buffer data; + if (m.inputReader) { + data.setLength(6144); + } while (running) { - int framesize; - - PDEBUG("*****************************************\n"); - PDEBUG("* Starting main loop\n"); - PDEBUG("*****************************************\n"); - while ((framesize = m.inputReader->GetNextFrame(m.data.getData())) > 0) { - if (!running) { - break; - } + while (true) { + unsigned fct = 0; + unsigned fp = 0; + + /* Load ETI data from the source */ + if (m.inputReader) { + int framesize = m.inputReader->GetNextFrame(data.getData()); + + if (framesize == 0) { + if (dynamic_pointer_cast<InputFileReader>(m.inputReader)) { + etiLog.level(info) << "End of file reached."; + running = 0; + ret = run_modulator_state_t::normal_end; + break; + } +#if defined(HAVE_ZEROMQ) + else if (dynamic_pointer_cast<InputZeroMQReader>(m.inputReader)) { + /* An empty frame marks a timeout. We ignore it, but we are + * now able to handle SIGINT properly. + * + * Also, we reconnect zmq every 10 seconds to avoid some + * issues, discussed in + * https://stackoverflow.com/questions/26112992/zeromq-pub-sub-on-unreliable-connection + * + * > It is possible that the PUB socket sees the error + * > while the SUB socket does not. + * > + * > The ZMTP RFC has a proposal for heartbeating that would + * > solve this problem. The current best solution is for + * > PUB sockets to send heartbeats (e.g. 1 per second) when + * > traffic is low, and for SUB sockets to disconnect / + * > reconnect if they stop getting these. + * + * We don't need a heartbeat, because our application is constant frame rate, + * the frames themselves can act as heartbeats. + */ + + const auto now = chrono::steady_clock::now(); + if (last_frame_received + chrono::seconds(10) < now) { + throw zmq_input_timeout(); + } + } +#endif // defined(HAVE_ZEROMQ) + else if (dynamic_pointer_cast<InputTcpReader>(m.inputReader)) { + /* Same as for ZeroMQ */ + } + else { + throw logic_error("Unhandled framesize==0!"); + } + } + else if (framesize < 0) { + etiLog.level(error) << "Input read error."; + running = 0; + ret = run_modulator_state_t::normal_end; + break; + } + + const int eti_bytes_read = m.etiReader->loadEtiData(data); + if ((size_t)eti_bytes_read != data.getLength()) { + etiLog.level(error) << "ETI frame incompletely read"; + throw std::runtime_error("ETI read error"); + } - last_frame_received = chrono::steady_clock::now(); + fct = m.etiReader->getFct(); + fp = m.etiReader->getFp(); + } + else if (m.ediInput) { + while (running and not m.ediInput->ediReader.isFrameReady()) { + try { + bool packet_received = m.ediInput->ediTransport.rxPacket(); + if (packet_received) { + last_frame_received = chrono::steady_clock::now(); + } + } + catch (const std::runtime_error& e) { + etiLog.level(warn) << "EDI input: " << e.what(); + running = 0; + break; + } - m.framecount++; + if (last_frame_received + chrono::seconds(10) < chrono::steady_clock::now()) { + etiLog.level(error) << "No EDI data received in 10 seconds."; + running = 0; + break; + } + } - PDEBUG("*****************************************\n"); - PDEBUG("* Read frame %lu\n", m.framecount); - PDEBUG("*****************************************\n"); + if (!running) { + break; + } - const int eti_bytes_read = m.etiReader->loadEtiData(m.data); - if ((size_t)eti_bytes_read != m.data.getLength()) { - etiLog.level(error) << "ETI frame incompletely read"; - throw std::runtime_error("ETI read error"); + fct = m.ediInput->ediReader.getFct(); + fp = m.ediInput->ediReader.getFp(); } - if (first_frame) { - if (m.etiReader->getFp() != 0) { + const unsigned expected_fct = (last_eti_fct + 1) % 250; + if (last_eti_fct == -1) { + if (fp != 0) { // Do not start the flowgraph before we get to FP 0 // to ensure all blocks are properly aligned. + if (m.ediInput) { + m.ediInput->ediReader.clearFrame(); + } continue; } else { - first_frame = false; + last_eti_fct = fct; + m.framecount++; + m.flowgraph->run(); } } - - // Check for ETI FCT continuity - const unsigned expected_fct = (last_eti_fct + 1) % 250; - const unsigned fct = m.etiReader->getFct(); - if (last_eti_fct != -1 and expected_fct != fct) { + else if (fct == expected_fct) { + last_eti_fct = fct; + m.framecount++; + m.flowgraph->run(); + } + else { etiLog.level(info) << "ETI FCT discontinuity, expected " << - expected_fct << " received " << m.etiReader->getFct(); + expected_fct << " received " << fct; + if (m.ediInput) { + m.ediInput->ediReader.clearFrame(); + } return run_modulator_state_t::again; } - last_eti_fct = fct; - m.flowgraph->run(); + if (m.ediInput) { + m.ediInput->ediReader.clearFrame(); + } /* Check every once in a while if the remote control * is still working */ @@ -593,52 +614,6 @@ static run_modulator_state_t run_modulator(modulator_data& m) rcs.check_faults(); } } - if (framesize == 0) { - if (dynamic_pointer_cast<InputFileReader>(m.inputReader)) { - etiLog.level(info) << "End of file reached."; - running = 0; - ret = run_modulator_state_t::normal_end; - } -#if defined(HAVE_ZEROMQ) - else if (dynamic_pointer_cast<InputZeroMQReader>(m.inputReader)) { - /* An empty frame marks a timeout. We ignore it, but we are - * now able to handle SIGINT properly. - * - * Also, we reconnect zmq every 10 seconds to avoid some - * issues, discussed in - * https://stackoverflow.com/questions/26112992/zeromq-pub-sub-on-unreliable-connection - * - * > It is possible that the PUB socket sees the error - * > while the SUB socket does not. - * > - * > The ZMTP RFC has a proposal for heartbeating that would - * > solve this problem. The current best solution is for - * > PUB sockets to send heartbeats (e.g. 1 per second) when - * > traffic is low, and for SUB sockets to disconnect / - * > reconnect if they stop getting these. - * - * We don't need a heartbeat, because our application is constant frame rate, - * the frames themselves can act as heartbeats. - */ - - const auto now = chrono::steady_clock::now(); - if (last_frame_received + chrono::seconds(10) < now) { - throw zmq_input_timeout(); - } - } -#endif // defined(HAVE_ZEROMQ) - else if (dynamic_pointer_cast<InputTcpReader>(m.inputReader)) { - /* Same as for ZeroMQ */ - } - else { - throw logic_error("Unhandled framesize==0!"); - } - } - else { - etiLog.level(error) << "Input read error."; - running = 0; - ret = run_modulator_state_t::normal_end; - } } } catch (const zmq_input_timeout&) { diff --git a/src/DabModulator.cpp b/src/DabModulator.cpp index 7e3ccf0..aa4f2a8 100644 --- a/src/DabModulator.cpp +++ b/src/DabModulator.cpp @@ -132,7 +132,7 @@ int DabModulator::process(Buffer* dataOut) const unsigned mode = m_settings.dabMode; setMode(mode); - myFlowgraph = make_shared<Flowgraph>(); + myFlowgraph = make_shared<Flowgraph>(m_settings.showProcessTime); //////////////////////////////////////////////////////////////// // CIF data initialisation //////////////////////////////////////////////////////////////// diff --git a/src/EtiReader.cpp b/src/EtiReader.cpp index 94c362a..25c1ada 100644 --- a/src/EtiReader.cpp +++ b/src/EtiReader.cpp @@ -2,7 +2,7 @@ Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2018 + Copyright (C) 2019 Matthias P. Braendli, matthias.braendli@mpb.li http://opendigitalradio.org @@ -238,7 +238,7 @@ int EtiReader::loadEtiData(const Buffer& dataIn) unsigned size = mySources[i]->framesize(); PDEBUG("Writting %i bytes of subchannel data\n", size); Buffer subch(size, in); - mySources[i]->loadSubchannelData(subch); + mySources[i]->loadSubchannelData(move(subch)); input_size -= size; framesize -= size; in += size; @@ -312,7 +312,6 @@ uint32_t EtiReader::getPPSOffset() return timestamp; } -#ifdef HAVE_EDI EdiReader::EdiReader( double& tist_offset_s) : m_timestamp_decoder(tist_offset_s) @@ -428,12 +427,12 @@ void EdiReader::update_fc_data(const EdiDecoder::eti_fc_data& fc_data) m_fc_valid = true; } -void EdiReader::update_fic(const std::vector<uint8_t>& fic) +void EdiReader::update_fic(std::vector<uint8_t>&& fic) { if (not m_proto_valid) { throw std::logic_error("Cannot update FIC before protocol"); } - m_fic = fic; + m_fic = move(fic); } void EdiReader::update_edi_time( @@ -469,7 +468,7 @@ void EdiReader::update_rfu(uint16_t rfu) m_rfu = rfu; } -void EdiReader::add_subchannel(const EdiDecoder::eti_stc_data& stc) +void EdiReader::add_subchannel(EdiDecoder::eti_stc_data&& stc) { if (not m_proto_valid) { throw std::logic_error("Cannot add subchannel before protocol"); @@ -485,7 +484,7 @@ void EdiReader::add_subchannel(const EdiDecoder::eti_stc_data& stc) throw std::invalid_argument( "EDI: MST data length inconsistent with FIC"); } - source->loadSubchannelData(stc.mst); + source->loadSubchannelData(move(stc.mst)); if (m_sources.size() > 64) { throw std::invalid_argument("Too many subchannels"); @@ -628,7 +627,10 @@ bool EdiTransport::rxPacket() } case Proto::TCP: { - m_tcpbuffer.resize(4096); + // The buffer size must be smaller than the size of two AF Packets, because otherwise + // the EDI decoder decodes two in a row and discards the first. This leads to ETI FCT + // discontinuity. + m_tcpbuffer.resize(512); const int timeout_ms = 1000; try { ssize_t ret = m_tcpclient.recv(m_tcpbuffer.data(), m_tcpbuffer.size(), 0, timeout_ms); @@ -644,11 +646,22 @@ bool EdiTransport::rxPacket() return true; } } - catch (const TCPSocket::Timeout&) { + catch (const Socket::TCPSocket::Timeout&) { return false; } } } throw logic_error("Incomplete rxPacket implementation!"); } -#endif // HAVE_EDI + +EdiInput::EdiInput(double& tist_offset_s, float edi_max_delay_ms) : + ediReader(tist_offset_s), + decoder(ediReader, false), + ediTransport(decoder) +{ + if (edi_max_delay_ms > 0.0f) { + // setMaxDelay wants number of AF packets, which correspond to 24ms ETI frames + decoder.setMaxDelay(lroundf(edi_max_delay_ms / 24.0f)); + } +} + diff --git a/src/EtiReader.h b/src/EtiReader.h index 38f7903..28fb2ac 100644 --- a/src/EtiReader.h +++ b/src/EtiReader.h @@ -2,7 +2,7 @@ Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2018 + Copyright (C) 2019 Matthias P. Braendli, matthias.braendli@mpb.li http://opendigitalradio.org @@ -38,9 +38,6 @@ #include "SubchannelSource.h" #include "TimestampDecoder.h" #include "lib/edi/ETIDecoder.hpp" -#ifdef HAVE_EDI -# include "lib/UdpSocket.h" -#endif #include <vector> #include <memory> @@ -114,11 +111,10 @@ private: std::vector<std::shared_ptr<SubchannelSource> > mySources; }; -#ifdef HAVE_EDI /* The EdiReader extracts the necessary data using the EDI input library in * lib/edi */ -class EdiReader : public EtiSource, public EdiDecoder::DataCollector +class EdiReader : public EtiSource, public EdiDecoder::ETIDataCollector { public: EdiReader(double& tist_offset_s); @@ -142,7 +138,7 @@ public: // Update the data for the frame characterisation virtual void update_fc_data(const EdiDecoder::eti_fc_data& fc_data); - virtual void update_fic(const std::vector<uint8_t>& fic); + virtual void update_fic(std::vector<uint8_t>&& fic); virtual void update_err(uint8_t err); @@ -156,7 +152,7 @@ public: virtual void update_rfu(uint16_t rfu); - virtual void add_subchannel(const EdiDecoder::eti_stc_data& stc); + virtual void add_subchannel(EdiDecoder::eti_stc_data&& stc); // Gets called by the EDI library to tell us that all data for a frame was given to us virtual void assemble(void); @@ -211,10 +207,18 @@ class EdiTransport { enum class Proto { UDP, TCP }; Proto m_proto; - UdpReceiver m_udp_rx; + Socket::UDPReceiver m_udp_rx; std::vector<uint8_t> m_tcpbuffer; - TCPClient m_tcpclient; + Socket::TCPClient m_tcpclient; EdiDecoder::ETIDecoder& m_decoder; }; -#endif + +// EdiInput wraps an EdiReader, an EdiDecoder::ETIDecoder and an EdiTransport +class EdiInput { + public: + EdiInput(double& tist_offset_s, float edi_max_delay_ms); + EdiReader ediReader; + EdiDecoder::ETIDecoder decoder; + EdiTransport ediTransport; +}; diff --git a/src/Flowgraph.cpp b/src/Flowgraph.cpp index 4c83fe8..3d4cdcc 100644 --- a/src/Flowgraph.cpp +++ b/src/Flowgraph.cpp @@ -43,8 +43,7 @@ using EdgeIterator = std::vector<shared_ptr<Edge> >::iterator; Node::Node(shared_ptr<ModPlugin> plugin) : - myPlugin(plugin), - myProcessTime(0) + myPlugin(plugin) { PDEBUG("Node::Node(plugin(%s): %p) @ %p\n", plugin->name(), plugin.get(), this); @@ -237,8 +236,8 @@ Edge::~Edge() -Flowgraph::Flowgraph() : - myProcessTime(0) +Flowgraph::Flowgraph(bool showProcessTime) : + myShowProcessTime(showProcessTime) { PDEBUG("Flowgraph::Flowgraph() @ %p\n", this); } @@ -248,7 +247,7 @@ Flowgraph::~Flowgraph() { PDEBUG("Flowgraph::~Flowgraph() @ %p\n", this); - if (myProcessTime) { + if (myShowProcessTime and myProcessTime) { stringstream ss; ss << "Process time:\n"; diff --git a/src/Flowgraph.h b/src/Flowgraph.h index 389359b..753070b 100644 --- a/src/Flowgraph.h +++ b/src/Flowgraph.h @@ -71,7 +71,7 @@ protected: #endif std::shared_ptr<ModPlugin> myPlugin; - time_t myProcessTime; + time_t myProcessTime = 0; }; @@ -94,7 +94,7 @@ protected: class Flowgraph { public: - Flowgraph(); + Flowgraph(bool showProcessTime); virtual ~Flowgraph(); Flowgraph(const Flowgraph&) = delete; Flowgraph& operator=(const Flowgraph&) = delete; @@ -106,7 +106,8 @@ public: protected: std::vector<std::shared_ptr<Node> > nodes; std::vector<std::shared_ptr<Edge> > edges; - time_t myProcessTime; + time_t myProcessTime = 0; + bool myShowProcessTime; }; diff --git a/src/GainControl.cpp b/src/GainControl.cpp index 9cde8b1..b781640 100644 --- a/src/GainControl.cpp +++ b/src/GainControl.cpp @@ -28,8 +28,9 @@ #include "GainControl.h" #include "PcDebug.h" -#include <stdio.h> +#include <cstdio> #include <stdexcept> +#include <algorithm> #include <string> #ifdef __SSE__ diff --git a/src/GuardIntervalInserter.cpp b/src/GuardIntervalInserter.cpp index 6e1df4f..0cd5bd5 100644 --- a/src/GuardIntervalInserter.cpp +++ b/src/GuardIntervalInserter.cpp @@ -26,7 +26,8 @@ #include "GuardIntervalInserter.h" #include "PcDebug.h" -#include <string.h> +#include <cstring> +#include <cassert> #include <stdexcept> #include <complex> #include <mutex> diff --git a/src/InputReader.h b/src/InputReader.h index 63451e5..ab45d4f 100644 --- a/src/InputReader.h +++ b/src/InputReader.h @@ -144,7 +144,7 @@ class InputTcpReader : public InputReader virtual std::string GetPrintableInfo() const override; private: - TCPClient m_tcpclient; + Socket::TCPClient m_tcpclient; std::string m_uri; }; @@ -201,14 +201,6 @@ class InputZeroMQReader : public InputReader, public RemoteControllable zmq::context_t m_zmqcontext; // is thread-safe std::thread m_recv_thread; - - /* We must be careful to keep frame phase consistent. If we - * drop a single ETI frame, we will break the transmission - * frame vs. ETI frame phase. - * - * Here we keep track of how many ETI frames we must drop. - */ - int m_to_drop = 0; }; #endif diff --git a/src/Log.cpp b/src/Log.cpp deleted file mode 100644 index 4fc7ae3..0000000 --- a/src/Log.cpp +++ /dev/null @@ -1,191 +0,0 @@ -/* - Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012 - Her Majesty the Queen in Right of Canada (Communications Research - Center Canada) - - Copyright (C) 2018 - Matthias P. Braendli, matthias.braendli@mpb.li - - http://www.opendigitalradio.org - */ -/* - This file is part of ODR-DabMod. - - ODR-DabMod is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - ODR-DabMod is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with ODR-DabMod. If not, see <http://www.gnu.org/licenses/>. - */ - -#include <list> -#include <cstdarg> -#include <cinttypes> -#include <chrono> - -#include "Log.h" -#include "Utils.h" - -using namespace std; - -/* This is called etiLog because it was copy-pasted from ODR-DabMux, even - * though it doesn't make any more sense there than here. - * - * It is a singleton used in all parts of ODR-DabMod to output log messages. - */ -Logger etiLog; - -void Logger::register_backend(std::shared_ptr<LogBackend> backend) -{ - backends.push_back(backend); -} - - -void Logger::log(log_level_t lvl, const char* fmt, ...) -{ - int size = 100; - std::string str; - va_list ap; - while (1) { - str.resize(size); - va_start(ap, fmt); - int n = vsnprintf((char *)str.c_str(), size, fmt, ap); - va_end(ap); - if (n > -1 && n < size) { - str.resize(n); - break; - } - if (n > -1) - size = n + 1; - else - size *= 2; - } - - logstr(lvl, move(str)); -} - -void Logger::logstr(log_level_t lvl, std::string&& message) -{ - log_message_t m(lvl, move(message)); - m_message_queue.push(move(m)); -} - -void Logger::io_process() -{ - set_thread_name("logger"); - while (1) { - log_message_t m; - try { - m_message_queue.wait_and_pop(m); - } - catch (const ThreadsafeQueueWakeup&) { - break; - } - - auto message = m.message; - - /* Remove a potential trailing newline. - * It doesn't look good in syslog - */ - if (message[message.length()-1] == '\n') { - message.resize(message.length()-1); - } - - for (auto &backend : backends) { - backend->log(m.level, message); - } - - if (m.level != log_level_t::trace) { - std::lock_guard<std::mutex> guard(m_cerr_mutex); - std::cerr << levels_as_str[m.level] << " " << message << std::endl; - } - } -} - -LogLine Logger::level(log_level_t lvl) -{ - return LogLine(this, lvl); -} - -LogToFile::LogToFile(const std::string& filename) : name("FILE") -{ - FILE* fd = fopen(filename.c_str(), "a"); - if (fd == nullptr) { - fprintf(stderr, "Cannot open log file !"); - throw std::runtime_error("Cannot open log file !"); - } - - log_file.reset(fd); -} - -void LogToFile::log(log_level_t level, const std::string& message) -{ - if (level != log_level_t::trace) { - const char* log_level_text[] = { - "DEBUG", "INFO", "WARN", "ERROR", "ALERT", "EMERG"}; - - // fprintf is thread-safe - fprintf(log_file.get(), SYSLOG_IDENT ": %s: %s\n", - log_level_text[(size_t)level], message.c_str()); - fflush(log_file.get()); - } -} - -void LogToSyslog::log(log_level_t level, const std::string& message) -{ - if (level != log_level_t::trace) { - int syslog_level = LOG_EMERG; - switch (level) { - case trace: break; // Do not handle TRACE in syslog - case debug: syslog_level = LOG_DEBUG; break; - case info: syslog_level = LOG_INFO; break; - /* we don't have the notice level */ - case warn: syslog_level = LOG_WARNING; break; - case error: syslog_level = LOG_ERR; break; - default: syslog_level = LOG_CRIT; break; - case alert: syslog_level = LOG_ALERT; break; - case emerg: syslog_level = LOG_EMERG; break; - } - - syslog(syslog_level, SYSLOG_IDENT " %s", message.c_str()); - } -} - -LogTracer::LogTracer(const string& trace_filename) : name("TRACE") -{ - etiLog.level(info) << "Setting up TRACE to " << trace_filename; - - FILE* fd = fopen(trace_filename.c_str(), "a"); - if (fd == nullptr) { - fprintf(stderr, "Cannot open trace file !"); - throw std::runtime_error("Cannot open trace file !"); - } - m_trace_file.reset(fd); - - using namespace std::chrono; - auto now = steady_clock::now().time_since_epoch(); - m_trace_micros_startup = duration_cast<microseconds>(now).count(); - - fprintf(m_trace_file.get(), - "0,TRACER,startup at %" PRIu64 "\n", m_trace_micros_startup); -} - -void LogTracer::log(log_level_t level, const std::string& message) -{ - if (level == log_level_t::trace) { - using namespace std::chrono; - const auto now = steady_clock::now().time_since_epoch(); - const auto micros = duration_cast<microseconds>(now).count(); - - fprintf(m_trace_file.get(), "%" PRIu64 ",%s\n", - micros - m_trace_micros_startup, - message.c_str()); - } -} diff --git a/src/Log.h b/src/Log.h deleted file mode 100644 index 1253635..0000000 --- a/src/Log.h +++ /dev/null @@ -1,200 +0,0 @@ -/* - Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012 - Her Majesty the Queen in Right of Canada (Communications Research - Center Canada) - - Copyright (C) 2018 - Matthias P. Braendli, matthias.braendli@mpb.li - - http://www.opendigitalradio.org - */ -/* - This file is part of ODR-DabMod. - - ODR-DabMod is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - ODR-DabMod is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with ODR-DabMod. If not, see <http://www.gnu.org/licenses/>. - */ - -#pragma once - -#ifdef HAVE_CONFIG_H -# include "config.h" -#endif - -#include <stdarg.h> -#include <stdio.h> -#include <syslog.h> -#include <fstream> -#include <sstream> -#include <iostream> -#include <list> -#include <stdexcept> -#include <string> -#include <map> -#include <mutex> -#include <memory> -#include <thread> -#include "ThreadsafeQueue.h" - -#define SYSLOG_IDENT "ODR-DabMod" -#define SYSLOG_FACILITY LOG_LOCAL0 - -enum log_level_t {debug = 0, info, warn, error, alert, emerg, trace}; - -static const std::string levels_as_str[] = - { " ", " ", "WARN ", "ERROR", "ALERT", "EMERG", "TRACE"} ; - -/** Abstract class all backends must inherit from */ -class LogBackend { - public: - virtual ~LogBackend() {}; - virtual void log(log_level_t level, const std::string& message) = 0; - virtual std::string get_name() const = 0; -}; - -/** A Logging backend for Syslog */ -class LogToSyslog : public LogBackend { - public: - LogToSyslog() : name("SYSLOG") { - openlog(SYSLOG_IDENT, LOG_PID, SYSLOG_FACILITY); - } - - virtual ~LogToSyslog() { - closelog(); - } - - void log(log_level_t level, const std::string& message); - - std::string get_name() const { return name; } - - private: - const std::string name; - - LogToSyslog(const LogToSyslog& other) = delete; - const LogToSyslog& operator=(const LogToSyslog& other) = delete; -}; - -class LogToFile : public LogBackend { - public: - LogToFile(const std::string& filename); - void log(log_level_t level, const std::string& message); - std::string get_name() const { return name; } - - private: - const std::string name; - - struct FILEDeleter{ void operator()(FILE* fd){ if(fd) fclose(fd);}}; - std::unique_ptr<FILE, FILEDeleter> log_file; - - LogToFile(const LogToFile& other) = delete; - const LogToFile& operator=(const LogToFile& other) = delete; -}; - -class LogTracer : public LogBackend { - public: - LogTracer(const std::string& filename); - void log(log_level_t level, const std::string& message); - std::string get_name() const { return name; } - private: - std::string name; - uint64_t m_trace_micros_startup = 0; - - struct FILEDeleter{ void operator()(FILE* fd){ if(fd) fclose(fd);}}; - std::unique_ptr<FILE, FILEDeleter> m_trace_file; - - LogTracer(const LogTracer& other) = delete; - const LogTracer& operator=(const LogTracer& other) = delete; -}; - -class LogLine; - -struct log_message_t { - log_message_t(log_level_t _level, std::string&& _message) : - level(_level), - message(move(_message)) {} - - log_message_t() : - level(debug), - message("") {} - - log_level_t level; - std::string message; -}; - -class Logger { - public: - Logger() { - m_io_thread = std::thread(&Logger::io_process, this); - } - - Logger(const Logger& other) = delete; - const Logger& operator=(const Logger& other) = delete; - ~Logger() { - m_message_queue.trigger_wakeup(); - m_io_thread.join(); - } - - void register_backend(std::shared_ptr<LogBackend> backend); - - /* Log the message to all backends */ - void log(log_level_t level, const char* fmt, ...); - - void logstr(log_level_t level, std::string&& message); - - /* All logging IO is done in another thread */ - void io_process(void); - - /* Return a LogLine for the given level - * so that you can write etiLog.level(info) << "stuff = " << 21 */ - LogLine level(log_level_t level); - - private: - std::list<std::shared_ptr<LogBackend> > backends; - - ThreadsafeQueue<log_message_t> m_message_queue; - std::thread m_io_thread; - std::mutex m_cerr_mutex; -}; - -extern Logger etiLog; - -// Accumulate a line of logs, using same syntax as stringstream -// The line is logged when the LogLine gets destroyed -class LogLine { - public: - LogLine(const LogLine& logline); - const LogLine& operator=(const LogLine& other) = delete; - LogLine(Logger* logger, log_level_t level) : - logger_(logger) - { - level_ = level; - } - - // Push the new element into the stringstream - template <typename T> - LogLine& operator<<(T s) { - os << s; - return *this; - } - - ~LogLine() - { - logger_->logstr(level_, os.str()); - } - - private: - std::ostringstream os; - log_level_t level_; - Logger* logger_; -}; - diff --git a/src/MemlessPoly.cpp b/src/MemlessPoly.cpp index ef77c07..905ca67 100644 --- a/src/MemlessPoly.cpp +++ b/src/MemlessPoly.cpp @@ -36,7 +36,8 @@ #include "PcDebug.h" #include "Utils.h" -#include <stdio.h> +#include <cstdio> +#include <cstring> #include <stdexcept> #include <string> #include <future> diff --git a/src/MemlessPoly.h b/src/MemlessPoly.h index 43fdb2d..4642596 100644 --- a/src/MemlessPoly.h +++ b/src/MemlessPoly.h @@ -30,20 +30,20 @@ # include <config.h> #endif - #include "RemoteControl.h" #include "ModPlugin.h" #include "PcDebug.h" #include "ThreadsafeQueue.h" #include <sys/types.h> +#include <array> #include <complex> +#include <memory> +#include <string> #include <thread> #include <vector> -#include <time.h> +#include <ctime> #include <cstdio> -#include <string> -#include <memory> #define MEMLESSPOLY_PIPELINE_DELAY 1 diff --git a/src/OutputFile.cpp b/src/OutputFile.cpp index 46a9ec9..acaebad 100644 --- a/src/OutputFile.cpp +++ b/src/OutputFile.cpp @@ -27,12 +27,12 @@ #include "OutputFile.h" #include "PcDebug.h" #include "Log.h" -#include "TimestampDecoder.h" #include <string> -#include <assert.h> +#include <chrono> #include <stdexcept> - +#include <cassert> +#include <cmath> using namespace std; @@ -53,7 +53,6 @@ OutputFile::OutputFile(const std::string& filename, bool show_metadata) : myFile.reset(fd); } - int OutputFile::process(Buffer* dataIn) { PDEBUG("OutputFile::process(%p)\n", dataIn); @@ -72,8 +71,18 @@ meta_vec_t OutputFile::process_metadata(const meta_vec_t& metadataIn) if (myShowMetadata) { stringstream ss; + frame_timestamp first_ts; + for (const auto& md : metadataIn) { if (md.ts) { + // The following code assumes TM I, where we get called every 96ms. + // Support for other transmission modes skipped because this is mostly + // debugging code. + + if (md.ts->fp == 0 or md.ts->fp == 4) { + first_ts = *md.ts; + } + ss << " FCT=" << md.ts->fct << " FP=" << (int)md.ts->fp; if (md.ts->timestamp_valid) { @@ -90,13 +99,46 @@ meta_vec_t OutputFile::process_metadata(const meta_vec_t& metadataIn) } } - if (metadataIn.empty()) { - etiLog.level(debug) << "Output File got no mdIn"; + if (myLastTimestamp.timestamp_valid) { + if (first_ts.timestamp_valid) { + uint32_t timestamp = myLastTimestamp.timestamp_pps; + timestamp += 96 << 14; // Shift 96ms by 14 to Timestamp level 2 + if (timestamp > 0xf9FFff) { + timestamp -= 0xfa0000; // Substract 16384000, corresponding to one second + myLastTimestamp.timestamp_sec += 1; + } + myLastTimestamp.timestamp_pps = timestamp; + + if (myLastTimestamp.timestamp_sec != first_ts.timestamp_sec or + myLastTimestamp.timestamp_pps != first_ts.timestamp_pps) { + ss << " TS wrong interval; "; + } + myLastTimestamp = first_ts; + } + else { + ss << " TS of FP=0 MISSING; "; + myLastTimestamp.timestamp_valid = false; + } } else { - etiLog.level(debug) << "Output File got metadata: " << ss.str(); + // Includes invalid and valid cases + myLastTimestamp = first_ts; } + if (metadataIn.empty()) { + etiLog.level(debug) << "Output File got no metadata"; + } + else { + using namespace std::chrono; + const auto now = system_clock::now(); + const int64_t ticks_now = duration_cast<milliseconds>(now.time_since_epoch()).count(); + //const int64_t first_ts_ticks = first_ts.timestamp_sec * 1000 + first_ts.timestamp_pps / 16384; + const int64_t first_ts_ticks = std::llrint(first_ts.get_real_secs() * 1000); + + ss << " DELTA: " << first_ts_ticks - ticks_now << "ms;"; + + etiLog.level(debug) << "Output File metadata: " << ss.str(); + } } return {}; } diff --git a/src/OutputFile.h b/src/OutputFile.h index 745e672..b10d406 100644 --- a/src/OutputFile.h +++ b/src/OutputFile.h @@ -33,10 +33,11 @@ #include "ModPlugin.h" #include "EtiReader.h" +#include "TimestampDecoder.h" #include <string> -#include <stdio.h> -#include <sys/types.h> +#include <cstdio> +#include <cstdint> #include <memory> class OutputFile : public ModOutput, public ModMetadata @@ -52,6 +53,7 @@ public: protected: bool myShowMetadata = false; + frame_timestamp myLastTimestamp; std::string myFilename; struct FILEDeleter{ void operator()(FILE* fd){ if (fd) fclose(fd); }}; diff --git a/src/OutputMemory.cpp b/src/OutputMemory.cpp index 5f24095..d6ef917 100644 --- a/src/OutputMemory.cpp +++ b/src/OutputMemory.cpp @@ -27,6 +27,7 @@ #include "OutputMemory.h" #include "PcDebug.h" #include "Log.h" +#include "TimestampDecoder.h" #include <stdexcept> #include <string.h> diff --git a/src/RemoteControl.cpp b/src/RemoteControl.cpp deleted file mode 100644 index 1065456..0000000 --- a/src/RemoteControl.cpp +++ /dev/null @@ -1,588 +0,0 @@ -/* - Copyright (C) 2007, 2008, 2009, 2010, 2011, 2012 - Her Majesty the Queen in Right of Canada (Communications Research - Center Canada) - - Copyright (C) 2019 - Matthias P. Braendli, matthias.braendli@mpb.li - - http://www.opendigitalradio.org - */ -/* - This file is part of ODR-DabMod. - - ODR-DabMod is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - ODR-DabMod is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with ODR-DabMod. If not, see <http://www.gnu.org/licenses/>. - */ -#include <list> -#include <string> -#include <iostream> -#include <string> -#include <thread> -#include <functional> -#include <asio.hpp> -#include "RemoteControl.h" - -using asio::ip::tcp; - -using namespace std; - -RemoteControllers rcs; - -RemoteControllerTelnet::~RemoteControllerTelnet() -{ - m_active = false; - m_io_service.stop(); - - if (m_restarter_thread.joinable()) { - m_restarter_thread.join(); - } - - if (m_child_thread.joinable()) { - m_child_thread.join(); - } -} - -void RemoteControllerTelnet::restart() -{ - if (m_restarter_thread.joinable()) { - m_restarter_thread.join(); - } - - m_restarter_thread = std::thread( - &RemoteControllerTelnet::restart_thread, - this, 0); -} - -RemoteControllable::~RemoteControllable() { - rcs.remove_controllable(this); -} - -std::list<std::string> RemoteControllable::get_supported_parameters() const { - std::list<std::string> parameterlist; - for (const auto& param : m_parameters) { - parameterlist.push_back(param[0]); - } - return parameterlist; -} - -void RemoteControllers::add_controller(std::shared_ptr<BaseRemoteController> rc) { - m_controllers.push_back(rc); -} - -void RemoteControllers::enrol(RemoteControllable *rc) { - controllables.push_back(rc); -} - -void RemoteControllers::remove_controllable(RemoteControllable *rc) { - controllables.remove(rc); -} - -std::list< std::vector<std::string> > RemoteControllers::get_param_list_values(const std::string& name) { - RemoteControllable* controllable = get_controllable_(name); - - std::list< std::vector<std::string> > allparams; - for (auto ¶m : controllable->get_supported_parameters()) { - std::vector<std::string> item; - item.push_back(param); - try { - item.push_back(controllable->get_parameter(param)); - } - catch (const ParameterError &e) { - item.push_back(std::string("error: ") + e.what()); - } - - allparams.push_back(item); - } - return allparams; -} - -std::string RemoteControllers::get_param(const std::string& name, const std::string& param) { - RemoteControllable* controllable = get_controllable_(name); - return controllable->get_parameter(param); -} - -void RemoteControllers::check_faults() { - for (auto &controller : m_controllers) { - if (controller->fault_detected()) { - etiLog.level(warn) << - "Detected Remote Control fault, restarting it"; - controller->restart(); - } - } -} - -RemoteControllable* RemoteControllers::get_controllable_(const std::string& name) -{ - auto rc = std::find_if(controllables.begin(), controllables.end(), - [&](RemoteControllable* r) { return r->get_rc_name() == name; }); - - if (rc == controllables.end()) { - throw ParameterError("Module name unknown"); - } - else { - return *rc; - } -} - -void RemoteControllers::set_param( - const std::string& name, - const std::string& param, - const std::string& value) -{ - RemoteControllable* controllable = get_controllable_(name); - try { - return controllable->set_parameter(param, value); - } - catch (const ios_base::failure& e) { - etiLog.level(info) << "RC: Failed to set " << name << " " << param - << " to " << value << ": " << e.what(); - throw ParameterError("Cannot understand value"); - } -} - -// This runs in a separate thread, because -// it would take too long to be done in the main loop -// thread. -void RemoteControllerTelnet::restart_thread(long) -{ - m_active = false; - m_io_service.stop(); - - if (m_child_thread.joinable()) { - m_child_thread.join(); - } - - m_child_thread = std::thread(&RemoteControllerTelnet::process, this, 0); -} - -void RemoteControllerTelnet::handle_accept( - std::shared_ptr<asio::ip::tcp::socket> socket, - const asio::error_code& asio_error) -{ - - const std::string welcome = "ODR-DabMod Remote Control CLI\n" - "Write 'help' for help.\n" - "**********\n"; - const std::string prompt = "> "; - - std::string in_message; - size_t length; - - if (asio_error) { - etiLog.level(error) << "RC: Error accepting connection"; - return; - } - - try { - etiLog.level(info) << "RC: Accepted"; - - asio::error_code ignored_error; - - asio::write(*socket, asio::buffer(welcome), - asio::transfer_all(), - ignored_error); - - while (m_active && in_message != "quit") { - asio::write(*socket, asio::buffer(prompt), - asio::transfer_all(), - ignored_error); - - in_message = ""; - - asio::streambuf buffer; - length = asio::read_until(*socket, buffer, "\n", ignored_error); - - std::istream str(&buffer); - std::getline(str, in_message); - - if (length == 0) { - etiLog.level(info) << "RC: Connection terminated"; - break; - } - - while (in_message.length() > 0 && - (in_message[in_message.length()-1] == '\r' || - in_message[in_message.length()-1] == '\n')) { - in_message.erase(in_message.length()-1, 1); - } - - if (in_message.length() == 0) { - continue; - } - - etiLog.level(info) << "RC: Got message '" << in_message << "'"; - - dispatch_command(*socket, in_message); - } - etiLog.level(info) << "RC: Closing socket"; - socket->close(); - } - catch (const std::exception& e) - { - etiLog.level(error) << "Remote control caught exception: " << e.what(); - } -} - -void RemoteControllerTelnet::process(long) -{ - m_active = true; - - while (m_active) { - m_io_service.reset(); - - tcp::acceptor acceptor(m_io_service, tcp::endpoint( - asio::ip::address::from_string("127.0.0.1"), m_port) ); - - // Add a job to start accepting connections. - auto socket = make_shared<tcp::socket>(acceptor.get_io_service()); - - // Add an accept call to the service. This will prevent io_service::run() - // from returning. - etiLog.level(info) << "RC: Waiting for connection on port " << m_port; - acceptor.async_accept(*socket, - bind(&RemoteControllerTelnet::handle_accept, this, - socket, - std::placeholders::_1)); - - // Process event loop. - m_io_service.run(); - } - - etiLog.level(info) << "RC: Leaving"; - m_fault = true; -} - -static std::vector<std::string> tokenise(const std::string& message) { - stringstream ss(message); - std::vector<std::string> all_tokens; - std::string item; - - while (std::getline(ss, item, ' ')) { - all_tokens.push_back(move(item)); - } - return all_tokens; -} - - -void RemoteControllerTelnet::dispatch_command(tcp::socket& socket, string command) -{ - vector<string> cmd = tokenise(command); - - if (cmd[0] == "help") { - reply(socket, - "The following commands are supported:\n" - " list\n" - " * Lists the modules that are loaded and their parameters\n" - " show MODULE\n" - " * Lists all parameters and their values from module MODULE\n" - " get MODULE PARAMETER\n" - " * Gets the value for the specified PARAMETER from module MODULE\n" - " set MODULE PARAMETER VALUE\n" - " * Sets the value for the PARAMETER ofr module MODULE\n" - " quit\n" - " * Terminate this session\n" - "\n"); - } - else if (cmd[0] == "list") { - stringstream ss; - - if (cmd.size() == 1) { - for (auto &controllable : rcs.controllables) { - ss << controllable->get_rc_name() << endl; - - list< vector<string> > params = controllable->get_parameter_descriptions(); - for (auto ¶m : params) { - ss << "\t" << param[0] << " : " << param[1] << endl; - } - } - } - else { - reply(socket, "Too many arguments for command 'list'"); - } - - reply(socket, ss.str()); - } - else if (cmd[0] == "show") { - if (cmd.size() == 2) { - try { - stringstream ss; - list< vector<string> > r = rcs.get_param_list_values(cmd[1]); - for (auto ¶m_val : r) { - ss << param_val[0] << ": " << param_val[1] << endl; - } - reply(socket, ss.str()); - - } - catch (const ParameterError &e) { - reply(socket, e.what()); - } - } - else { - reply(socket, "Incorrect parameters for command 'show'"); - } - } - else if (cmd[0] == "get") { - if (cmd.size() == 3) { - try { - string r = rcs.get_param(cmd[1], cmd[2]); - reply(socket, r); - } - catch (const ParameterError &e) { - reply(socket, e.what()); - } - } - else { - reply(socket, "Incorrect parameters for command 'get'"); - } - } - else if (cmd[0] == "set") { - if (cmd.size() >= 4) { - try { - stringstream new_param_value; - for (size_t i = 3; i < cmd.size(); i++) { - new_param_value << cmd[i]; - - if (i+1 < cmd.size()) { - new_param_value << " "; - } - } - - rcs.set_param(cmd[1], cmd[2], new_param_value.str()); - reply(socket, "ok"); - } - catch (const ParameterError &e) { - reply(socket, e.what()); - } - catch (const exception &e) { - reply(socket, "Error: Invalid parameter value. "); - } - } - else { - reply(socket, "Incorrect parameters for command 'set'"); - } - } - else if (cmd[0] == "quit") { - reply(socket, "Goodbye"); - } - else { - reply(socket, "Message not understood"); - } -} - -void RemoteControllerTelnet::reply(tcp::socket& socket, string message) -{ - asio::error_code ignored_error; - stringstream ss; - ss << message << "\r\n"; - asio::write(socket, asio::buffer(ss.str()), - asio::transfer_all(), - ignored_error); -} - -#if defined(HAVE_ZEROMQ) - -RemoteControllerZmq::~RemoteControllerZmq() { - m_active = false; - m_fault = false; - - if (m_restarter_thread.joinable()) { - m_restarter_thread.join(); - } - - if (m_child_thread.joinable()) { - m_child_thread.join(); - } -} - -void RemoteControllerZmq::restart() -{ - if (m_restarter_thread.joinable()) { - m_restarter_thread.join(); - } - - m_restarter_thread = std::thread(&RemoteControllerZmq::restart_thread, this); -} - -// This runs in a separate thread, because -// it would take too long to be done in the main loop -// thread. -void RemoteControllerZmq::restart_thread() -{ - m_active = false; - - if (m_child_thread.joinable()) { - m_child_thread.join(); - } - - m_child_thread = std::thread(&RemoteControllerZmq::process, this); -} - -void RemoteControllerZmq::recv_all(zmq::socket_t& pSocket, std::vector<std::string> &message) -{ - bool more = true; - do { - zmq::message_t msg; - pSocket.recv(&msg); - std::string incoming((char*)msg.data(), msg.size()); - message.push_back(incoming); - more = msg.more(); - } while (more); -} - -void RemoteControllerZmq::send_ok_reply(zmq::socket_t &pSocket) -{ - zmq::message_t msg(2); - char repCode[2] = {'o', 'k'}; - memcpy ((void*) msg.data(), repCode, 2); - pSocket.send(msg, 0); -} - -void RemoteControllerZmq::send_fail_reply(zmq::socket_t &pSocket, const std::string &error) -{ - zmq::message_t msg1(4); - char repCode[4] = {'f', 'a', 'i', 'l'}; - memcpy ((void*) msg1.data(), repCode, 4); - pSocket.send(msg1, ZMQ_SNDMORE); - - zmq::message_t msg2(error.length()); - memcpy ((void*) msg2.data(), error.c_str(), error.length()); - pSocket.send(msg2, 0); -} - -void RemoteControllerZmq::process() -{ - m_fault = false; - - // create zmq reply socket for receiving ctrl parameters - try { - zmq::socket_t repSocket(m_zmqContext, ZMQ_REP); - - // connect the socket - int hwm = 100; - int linger = 0; - repSocket.setsockopt(ZMQ_RCVHWM, &hwm, sizeof(hwm)); - repSocket.setsockopt(ZMQ_SNDHWM, &hwm, sizeof(hwm)); - repSocket.setsockopt(ZMQ_LINGER, &linger, sizeof(linger)); - repSocket.bind(m_endpoint.c_str()); - - // create pollitem that polls the ZMQ sockets - zmq::pollitem_t pollItems[] = { {repSocket, 0, ZMQ_POLLIN, 0} }; - while (m_active) { - zmq::poll(pollItems, 1, 100); - std::vector<std::string> msg; - - if (pollItems[0].revents & ZMQ_POLLIN) { - recv_all(repSocket, msg); - - std::string command((char*)msg[0].data(), msg[0].size()); - - if (msg.size() == 1 && command == "ping") { - send_ok_reply(repSocket); - } - else if (msg.size() == 1 && command == "list") { - size_t cohort_size = rcs.controllables.size(); - for (auto &controllable : rcs.controllables) { - std::stringstream ss; - ss << "{ \"name\": \"" << controllable->get_rc_name() << "\"," << - " \"params\": { "; - - list< vector<string> > params = controllable->get_parameter_descriptions(); - size_t i = 0; - for (auto ¶m : params) { - if (i > 0) { - ss << ", "; - } - - ss << "\"" << param[0] << "\": " << - "\"" << param[1] << "\""; - - i++; - } - - ss << " } }"; - - std::string msg_s = ss.str(); - - zmq::message_t zmsg(ss.str().size()); - memcpy ((void*) zmsg.data(), msg_s.data(), msg_s.size()); - - int flag = (--cohort_size > 0) ? ZMQ_SNDMORE : 0; - repSocket.send(zmsg, flag); - } - } - else if (msg.size() == 2 && command == "show") { - std::string module((char*) msg[1].data(), msg[1].size()); - try { - list< vector<string> > r = rcs.get_param_list_values(module); - size_t r_size = r.size(); - for (auto ¶m_val : r) { - std::stringstream ss; - ss << param_val[0] << ": " << param_val[1] << endl; - zmq::message_t zmsg(ss.str().size()); - memcpy(zmsg.data(), ss.str().data(), ss.str().size()); - - int flag = (--r_size > 0) ? ZMQ_SNDMORE : 0; - repSocket.send(zmsg, flag); - } - } - catch (const ParameterError &err) { - send_fail_reply(repSocket, err.what()); - } - } - else if (msg.size() == 3 && command == "get") { - std::string module((char*) msg[1].data(), msg[1].size()); - std::string parameter((char*) msg[2].data(), msg[2].size()); - - try { - std::string value = rcs.get_param(module, parameter); - zmq::message_t zmsg(value.size()); - memcpy ((void*) zmsg.data(), value.data(), value.size()); - repSocket.send(zmsg, 0); - } - catch (const ParameterError &err) { - send_fail_reply(repSocket, err.what()); - } - } - else if (msg.size() == 4 && command == "set") { - std::string module((char*) msg[1].data(), msg[1].size()); - std::string parameter((char*) msg[2].data(), msg[2].size()); - std::string value((char*) msg[3].data(), msg[3].size()); - - try { - rcs.set_param(module, parameter, value); - send_ok_reply(repSocket); - } - catch (const ParameterError &err) { - send_fail_reply(repSocket, err.what()); - } - } - else { - send_fail_reply(repSocket, - "Unsupported command. commands: list, show, get, set"); - } - } - } - repSocket.close(); - } - catch (const zmq::error_t &e) { - etiLog.level(error) << "ZMQ RC error: " << std::string(e.what()); - } - catch (const std::exception& e) { - etiLog.level(error) << "ZMQ RC caught exception: " << e.what(); - m_fault = true; - } -} - -#endif - diff --git a/src/RemoteControl.h b/src/RemoteControl.h deleted file mode 100644 index 087b94a..0000000 --- a/src/RemoteControl.h +++ /dev/null @@ -1,259 +0,0 @@ -/* - Copyright (C) 2007, 2008, 2009, 2010, 2011, 2012 - Her Majesty the Queen in Right of Canada (Communications Research - Center Canada) - - Copyright (C) 2019 - Matthias P. Braendli, matthias.braendli@mpb.li - - http://www.opendigitalradio.org - - This module adds remote-control capability to some of the dabmod modules. - */ -/* - This file is part of ODR-DabMod. - - ODR-DabMod is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - ODR-DabMod is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with ODR-DabMod. If not, see <http://www.gnu.org/licenses/>. - */ - -#pragma once - -#ifdef HAVE_CONFIG_H -# include "config.h" -#endif - -#if defined(HAVE_ZEROMQ) -# include "zmq.hpp" -#endif - -#include <list> -#include <map> -#include <memory> -#include <string> -#include <atomic> -#include <iostream> -#include <thread> -#include <asio.hpp> -#include <stdexcept> - -#include "Log.h" - -#define RC_ADD_PARAMETER(p, desc) { \ - std::vector<std::string> p; \ - p.push_back(#p); \ - p.push_back(desc); \ - m_parameters.push_back(p); \ -} - -class ParameterError : public std::exception -{ - public: - ParameterError(std::string message) : m_message(message) {} - ~ParameterError() throw() {} - const char* what() const throw() { return m_message.c_str(); } - - private: - std::string m_message; -}; - -class RemoteControllable; - -/* Remote controllers (that recieve orders from the user) - * must implement BaseRemoteController - */ -class BaseRemoteController { - public: - /* When this returns one, the remote controller cannot be - * used anymore, and must be restarted by DabMod - */ - virtual bool fault_detected() = 0; - - /* In case of a fault, the remote controller can be - * restarted. - */ - virtual void restart() = 0; - - virtual ~BaseRemoteController() {} -}; - -/* Objects that support remote control must implement the following class */ -class RemoteControllable { - public: - RemoteControllable(const std::string& name) : - m_rc_name(name) {} - - RemoteControllable(const RemoteControllable& other) = delete; - RemoteControllable& operator=(const RemoteControllable& other) = delete; - - virtual ~RemoteControllable(); - - /* return a short name used to identify the controllable. - * It might be used in the commands the user has to type, so keep - * it short - */ - virtual std::string get_rc_name() const { return m_rc_name; } - - /* Return a list of possible parameters that can be set */ - virtual std::list<std::string> get_supported_parameters() const; - - /* Return a mapping of the descriptions of all parameters */ - virtual std::list< std::vector<std::string> > - get_parameter_descriptions() const - { - return m_parameters; - } - - /* Base function to set parameters. */ - virtual void set_parameter( - const std::string& parameter, - const std::string& value) = 0; - - /* Getting a parameter always returns a string. */ - virtual const std::string get_parameter(const std::string& parameter) const = 0; - - protected: - std::string m_rc_name; - std::list< std::vector<std::string> > m_parameters; -}; - -/* Holds all our remote controllers and controlled object. - */ -class RemoteControllers { - public: - void add_controller(std::shared_ptr<BaseRemoteController> rc); - void enrol(RemoteControllable *rc); - void remove_controllable(RemoteControllable *rc); - void check_faults(); - std::list< std::vector<std::string> > get_param_list_values(const std::string& name); - std::string get_param(const std::string& name, const std::string& param); - - void set_param( - const std::string& name, - const std::string& param, - const std::string& value); - - std::list<RemoteControllable*> controllables; - - private: - RemoteControllable* get_controllable_(const std::string& name); - - std::list<std::shared_ptr<BaseRemoteController> > m_controllers; -}; - -extern RemoteControllers rcs; - -/* Implements a Remote controller based on a simple telnet CLI - * that listens on localhost - */ -class RemoteControllerTelnet : public BaseRemoteController { - public: - RemoteControllerTelnet() - : m_active(false), - m_io_service(), - m_fault(false), - m_port(0) { } - - RemoteControllerTelnet(int port) - : m_active(port > 0), - m_io_service(), - m_fault(false), - m_port(port) - { - restart(); - } - - - RemoteControllerTelnet& operator=(const RemoteControllerTelnet& other) = delete; - RemoteControllerTelnet(const RemoteControllerTelnet& other) = delete; - - ~RemoteControllerTelnet(); - - virtual bool fault_detected() { return m_fault; } - - virtual void restart(); - - private: - void restart_thread(long); - - void process(long); - - void dispatch_command(asio::ip::tcp::socket& socket, - std::string command); - - void reply(asio::ip::tcp::socket& socket, std::string message); - - void handle_accept( - std::shared_ptr<asio::ip::tcp::socket> socket, - const asio::error_code& asio_error); - - std::atomic<bool> m_active; - - asio::io_service m_io_service; - - /* This is set to true if a fault occurred */ - std::atomic<bool> m_fault; - std::thread m_restarter_thread; - - std::thread m_child_thread; - - int m_port; -}; - -#if defined(HAVE_ZEROMQ) -/* Implements a Remote controller using zmq transportlayer - * that listens on localhost - */ -class RemoteControllerZmq : public BaseRemoteController { - public: - RemoteControllerZmq() - : m_active(false), m_fault(false), - m_zmqContext(1), - m_endpoint("") { } - - RemoteControllerZmq(const std::string& endpoint) - : m_active(not endpoint.empty()), m_fault(false), - m_zmqContext(1), - m_endpoint(endpoint), - m_child_thread(&RemoteControllerZmq::process, this) { } - - RemoteControllerZmq& operator=(const RemoteControllerZmq& other) = delete; - RemoteControllerZmq(const RemoteControllerZmq& other) = delete; - - ~RemoteControllerZmq(); - - virtual bool fault_detected() { return m_fault; } - - virtual void restart(); - - private: - void restart_thread(); - - void recv_all(zmq::socket_t &pSocket, std::vector<std::string> &message); - void send_ok_reply(zmq::socket_t &pSocket); - void send_fail_reply(zmq::socket_t &pSocket, const std::string &error); - void process(); - - std::atomic<bool> m_active; - - /* This is set to true if a fault occurred */ - std::atomic<bool> m_fault; - std::thread m_restarter_thread; - - zmq::context_t m_zmqContext; - - std::string m_endpoint; - std::thread m_child_thread; -}; -#endif - diff --git a/src/Socket.cpp b/src/Socket.cpp deleted file mode 100644 index 08cda68..0000000 --- a/src/Socket.cpp +++ /dev/null @@ -1,275 +0,0 @@ -/* - Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Her Majesty the - Queen in Right of Canada (Communications Research Center Canada) - - Copyright (C) 2018 - Matthias P. Braendli, matthias.braendli@mpb.li - - http://opendigitalradio.org -*/ - -/* - This file is part of ODR-DabMod. - - ODR-DabMod is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - ODR-DabMod is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with ODR-DabMod. If not, see <http://www.gnu.org/licenses/>. - */ - -#include "Socket.h" -#include "Log.h" -#include <fcntl.h> - -TCPSocket::TCPSocket() -{ - if ((m_sock = socket(PF_INET, SOCK_STREAM, 0)) < 0) { - throw std::runtime_error("Can't create TCP socket"); - } - -#if defined(HAVE_SO_NOSIGPIPE) - int val = 1; - if (setsockopt(m_sock, SOL_SOCKET, SO_NOSIGPIPE, - &val, sizeof(val)) < 0) { - throw std::runtime_error("Can't set SO_NOSIGPIPE"); - } -#endif -} - -TCPSocket::~TCPSocket() -{ - if (m_sock != -1) { - ::close(m_sock); - } -} - -TCPSocket::TCPSocket(TCPSocket&& other) -{ - m_sock = other.m_sock; - - if (other.m_sock != -1) { - other.m_sock = -1; - } -} - -TCPSocket& TCPSocket::operator=(TCPSocket&& other) -{ - m_sock = other.m_sock; - - if (other.m_sock != -1) { - other.m_sock = -1; - } - - return *this; -} - -bool TCPSocket::valid() const -{ - return m_sock != -1; -} - -void TCPSocket::connect(const std::string& hostname, int port) -{ - struct sockaddr_in addr; - addr.sin_family = PF_INET; - addr.sin_addr.s_addr = htons(INADDR_ANY); - addr.sin_port = htons(port); - - hostent *host = gethostbyname(hostname.c_str()); - if (host) { - addr.sin_addr = *(in_addr *)(host->h_addr); - } - else { - std::string errstr(strerror(errno)); - throw std::runtime_error( - "could not resolve hostname " + - hostname + ":" + std::to_string(port) + - " : " + errstr); - } - - int ret = ::connect(m_sock, (struct sockaddr*)&addr, sizeof(addr)); - if (ret == -1 and errno != EINPROGRESS) { - std::string errstr(strerror(errno)); - throw std::runtime_error( - "could not connect to " + - hostname + ":" + std::to_string(port) + - " : " + errstr); - } -} - -void TCPSocket::listen(int port) -{ - struct sockaddr_in addr; - addr.sin_family = AF_INET; - addr.sin_port = htons(port); - addr.sin_addr.s_addr = htonl(INADDR_ANY); - - const int reuse = 1; - if (setsockopt(m_sock, SOL_SOCKET, SO_REUSEADDR, - &reuse, sizeof(reuse)) < 0) { - throw std::runtime_error("Can't reuse address for TCP socket"); - } - - if (::bind(m_sock, (struct sockaddr*)&addr, sizeof(addr)) < 0) { - close(); - throw std::runtime_error("Can't bind TCP socket"); - } - - if (::listen(m_sock, 1) < 0) { - close(); - m_sock = -1; - throw std::runtime_error("Can't listen TCP socket"); - } - -} - -void TCPSocket::close() -{ - ::close(m_sock); - m_sock = -1; -} - -TCPSocket TCPSocket::accept_with_timeout(int timeout_ms, struct sockaddr_in *client) -{ - struct pollfd fds[1]; - fds[0].fd = m_sock; - fds[0].events = POLLIN; - - int retval = poll(fds, 1, timeout_ms); - - if (retval == -1) { - std::string errstr(strerror(errno)); - throw std::runtime_error("TCP Socket accept error: " + errstr); - } - else if (retval > 0) { - socklen_t client_len = sizeof(struct sockaddr_in); - int sockfd = accept(m_sock, (struct sockaddr*)&client, &client_len); - TCPSocket s(sockfd); - return s; - } - else { - TCPSocket s(-1); - return s; - } -} - -ssize_t TCPSocket::sendall(const void *buffer, size_t buflen) -{ - uint8_t *buf = (uint8_t*)buffer; - while (buflen > 0) { - /* On Linux, the MSG_NOSIGNAL flag ensures that the process - * would not receive a SIGPIPE and die. - * Other systems have SO_NOSIGPIPE set on the socket for the - * same effect. */ -#if defined(HAVE_MSG_NOSIGNAL) - const int flags = MSG_NOSIGNAL; -#else - const int flags = 0; -#endif - ssize_t sent = ::send(m_sock, buf, buflen, flags); - if (sent < 0) { - return -1; - } - else { - buf += sent; - buflen -= sent; - } - } - return buflen; -} - -ssize_t TCPSocket::recv(void *buffer, size_t length, int flags) -{ - ssize_t ret = ::recv(m_sock, buffer, length, flags); - if (ret == -1) { - std::string errstr(strerror(errno)); - throw std::runtime_error("TCP receive error: " + errstr); - } - return ret; -} - -ssize_t TCPSocket::recv(void *buffer, size_t length, int flags, int timeout_ms) -{ - struct pollfd fds[1]; - fds[0].fd = m_sock; - fds[0].events = POLLIN; - - int retval = poll(fds, 1, timeout_ms); - - if (retval == -1 and errno == EINTR) { - throw Interrupted(); - } - else if (retval == -1) { - std::string errstr(strerror(errno)); - throw std::runtime_error("TCP receive with poll() error: " + errstr); - } - else if (retval > 0 and (fds[0].revents | POLLIN)) { - ssize_t ret = ::recv(m_sock, buffer, length, flags); - if (ret == -1) { - if (errno == ECONNREFUSED) { - return 0; - } - std::string errstr(strerror(errno)); - throw std::runtime_error("TCP receive after poll() error: " + errstr); - } - return ret; - } - else { - throw Timeout(); - } -} - -TCPSocket::TCPSocket(int sockfd) { - m_sock = sockfd; -} - -void TCPClient::connect(const std::string& hostname, int port) -{ - m_hostname = hostname; - m_port = port; - reconnect(); -} - -ssize_t TCPClient::recv(void *buffer, size_t length, int flags, int timeout_ms) -{ - try { - ssize_t ret = m_sock.recv(buffer, length, flags, timeout_ms); - - if (ret == 0) { - m_sock.close(); - - TCPSocket newsock; - m_sock = std::move(newsock); - reconnect(); - } - - return ret; - } - catch (const TCPSocket::Interrupted&) { - return -1; - } - catch (const TCPSocket::Timeout&) { - return 0; - } - - return 0; -} - -void TCPClient::reconnect() -{ - int flags = fcntl(m_sock.m_sock, F_GETFL); - if (fcntl(m_sock.m_sock, F_SETFL, flags | O_NONBLOCK) == -1) { - std::string errstr(strerror(errno)); - throw std::runtime_error("TCP: Could not set O_NONBLOCK: " + errstr); - } - - m_sock.connect(m_hostname, m_port); -} diff --git a/src/Socket.h b/src/Socket.h deleted file mode 100644 index 14c5cbe..0000000 --- a/src/Socket.h +++ /dev/null @@ -1,104 +0,0 @@ -/* - Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Her Majesty the - Queen in Right of Canada (Communications Research Center Canada) - - Copyright (C) 2018 - Matthias P. Braendli, matthias.braendli@mpb.li - - http://opendigitalradio.org - -DESCRIPTION: - Abstraction for sockets. -*/ - -/* - This file is part of ODR-DabMod. - - ODR-DabMod is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - ODR-DabMod is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with ODR-DabMod. If not, see <http://www.gnu.org/licenses/>. - */ - -#pragma once - -#ifdef HAVE_CONFIG_H -# include <config.h> -#endif - -#include <stdexcept> -#include <string> -#include <cstdint> -#include <cstring> -#include <unistd.h> -#include <errno.h> -#include <poll.h> -#include <sys/socket.h> -#include <netinet/in.h> -#include <netdb.h> -#include <arpa/inet.h> - -class TCPSocket { - public: - TCPSocket(); - ~TCPSocket(); - TCPSocket(const TCPSocket& other) = delete; - TCPSocket& operator=(const TCPSocket& other) = delete; - TCPSocket(TCPSocket&& other); - TCPSocket& operator=(TCPSocket&& other); - - bool valid(void) const; - void connect(const std::string& hostname, int port); - void listen(int port); - void close(void); - - /* throws a runtime_error on failure, an invalid socket on timeout */ - TCPSocket accept_with_timeout(int timeout_ms, struct sockaddr_in *client); - - /* returns -1 on error */ - ssize_t sendall(const void *buffer, size_t buflen); - - /* Returns number of bytes read, 0 on disconnect. Throws a - * runtime_error on error */ - ssize_t recv(void *buffer, size_t length, int flags); - - class Timeout {}; - class Interrupted {}; - /* Returns number of bytes read, 0 on disconnect or refused connection. - * Throws a Timeout on timeout, Interrupted on EINTR, a runtime_error - * on error - */ - ssize_t recv(void *buffer, size_t length, int flags, int timeout_ms); - - private: - explicit TCPSocket(int sockfd); - int m_sock = -1; - - friend class TCPClient; -}; - -/* Implement a TCP receiver that auto-reconnects on errors */ -class TCPClient { - public: - void connect(const std::string& hostname, int port); - - /* Returns numer of bytes read, 0 on auto-reconnect, -1 - * on interruption. - * Throws a runtime_error on error */ - ssize_t recv(void *buffer, size_t length, int flags, int timeout_ms); - - private: - void reconnect(void); - TCPSocket m_sock; - std::string m_hostname; - int m_port; -}; - diff --git a/src/SubchannelSource.cpp b/src/SubchannelSource.cpp index b4d6750..443b5d2 100644 --- a/src/SubchannelSource.cpp +++ b/src/SubchannelSource.cpp @@ -1046,9 +1046,9 @@ size_t SubchannelSource::protectionOption() const return 0; } -void SubchannelSource::loadSubchannelData(const Buffer& data) +void SubchannelSource::loadSubchannelData(Buffer&& data) { - d_buffer = data; + d_buffer = std::move(data); } int SubchannelSource::process(Buffer* outputData) diff --git a/src/SubchannelSource.h b/src/SubchannelSource.h index b4ca697..68e6ff2 100644 --- a/src/SubchannelSource.h +++ b/src/SubchannelSource.h @@ -59,7 +59,7 @@ public: size_t protectionOption() const; const std::vector<PuncturingRule>& get_rules() const; - void loadSubchannelData(const Buffer& data); + void loadSubchannelData(Buffer&& data); int process(Buffer* outputData); const char* name() { return "SubchannelSource"; } diff --git a/src/ThreadsafeQueue.h b/src/ThreadsafeQueue.h deleted file mode 100644 index ee26ca0..0000000 --- a/src/ThreadsafeQueue.h +++ /dev/null @@ -1,178 +0,0 @@ -/* - Copyright (C) 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in - Right of Canada (Communications Research Center Canada) - - Copyright (C) 2018 - Matthias P. Braendli, matthias.braendli@mpb.li - - An implementation for a threadsafe queue, depends on C++11 - - When creating a ThreadsafeQueue, one can specify the minimal number - of elements it must contain before it is possible to take one - element out. - */ -/* - This file is part of ODR-DabMod. - - ODR-DabMod is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - ODR-DabMod is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with ODR-DabMod. If not, see <http://www.gnu.org/licenses/>. - */ - -#pragma once - -#include <mutex> -#include <condition_variable> -#include <queue> -#include <utility> - -/* This queue is meant to be used by two threads. One producer - * that pushes elements into the queue, and one consumer that - * retrieves the elements. - * - * The queue can make the consumer block until an element - * is available, or a wakeup requested. - */ - -/* Class thrown by blocking pop to tell the consumer - * that there's a wakeup requested. */ -class ThreadsafeQueueWakeup {}; - -template<typename T> -class ThreadsafeQueue -{ -public: - /* Push one element into the queue, and notify another thread that - * might be waiting. - * - * returns the new queue size. - */ - size_t push(T const& val) - { - std::unique_lock<std::mutex> lock(the_mutex); - the_queue.push(val); - size_t queue_size = the_queue.size(); - lock.unlock(); - - the_rx_notification.notify_one(); - - return queue_size; - } - - size_t push(T&& val) - { - std::unique_lock<std::mutex> lock(the_mutex); - the_queue.emplace(std::move(val)); - size_t queue_size = the_queue.size(); - lock.unlock(); - - the_rx_notification.notify_one(); - - return queue_size; - } - - /* Push one element into the queue, but wait until the - * queue size goes below the threshold. - * - * Notify waiting thread. - * - * returns the new queue size. - */ - size_t push_wait_if_full(T const& val, size_t threshold) - { - std::unique_lock<std::mutex> lock(the_mutex); - while (the_queue.size() >= threshold) { - the_tx_notification.wait(lock); - } - the_queue.push(val); - size_t queue_size = the_queue.size(); - lock.unlock(); - - the_rx_notification.notify_one(); - - return queue_size; - } - - /* Trigger a wakeup event on a blocking consumer, which - * will receive a ThreadsafeQueueWakeup exception. - */ - void trigger_wakeup(void) - { - std::unique_lock<std::mutex> lock(the_mutex); - wakeup_requested = true; - lock.unlock(); - the_rx_notification.notify_one(); - } - - /* Send a notification for the receiver thread */ - void notify(void) - { - the_rx_notification.notify_one(); - } - - bool empty() const - { - std::unique_lock<std::mutex> lock(the_mutex); - return the_queue.empty(); - } - - size_t size() const - { - std::unique_lock<std::mutex> lock(the_mutex); - return the_queue.size(); - } - - bool try_pop(T& popped_value) - { - std::unique_lock<std::mutex> lock(the_mutex); - if (the_queue.empty()) { - return false; - } - - popped_value = the_queue.front(); - the_queue.pop(); - - lock.unlock(); - the_tx_notification.notify_one(); - - return true; - } - - void wait_and_pop(T& popped_value, size_t prebuffering = 1) - { - std::unique_lock<std::mutex> lock(the_mutex); - while (the_queue.size() < prebuffering and - not wakeup_requested) { - the_rx_notification.wait(lock); - } - - if (wakeup_requested) { - wakeup_requested = false; - throw ThreadsafeQueueWakeup(); - } - else { - std::swap(popped_value, the_queue.front()); - the_queue.pop(); - - lock.unlock(); - the_tx_notification.notify_one(); - } - } - -private: - std::queue<T> the_queue; - mutable std::mutex the_mutex; - std::condition_variable the_rx_notification; - std::condition_variable the_tx_notification; - bool wakeup_requested = false; -}; - diff --git a/src/TimestampDecoder.cpp b/src/TimestampDecoder.cpp index a561237..7f1fb6e 100644 --- a/src/TimestampDecoder.cpp +++ b/src/TimestampDecoder.cpp @@ -27,6 +27,7 @@ #include <iostream> #include <fstream> #include <string> +#include <cstring> #include "PcDebug.h" #include "TimestampDecoder.h" #include "Log.h" diff --git a/src/Utils.cpp b/src/Utils.cpp index fd4f659..f39c4c9 100644 --- a/src/Utils.cpp +++ b/src/Utils.cpp @@ -46,9 +46,6 @@ static void printHeader() #if defined(HAVE_ZEROMQ) "zeromq " << #endif -#ifdef HAVE_EDI - "EDI " << -#endif #if defined(HAVE_OUTPUT_UHD) "output_uhd " << #endif diff --git a/src/output/Feedback.cpp b/src/output/Feedback.cpp index 17e45bf..88d8319 100644 --- a/src/output/Feedback.cpp +++ b/src/output/Feedback.cpp @@ -200,14 +200,13 @@ void DPDFeedbackServer::ReceiveBurstThread() void DPDFeedbackServer::ServeFeedback() { - TCPSocket m_server_sock; - m_server_sock.listen(m_port); + Socket::TCPSocket m_server_sock; + m_server_sock.listen(m_port, "127.0.0.1"); etiLog.level(info) << "DPD Feedback server listening on port " << m_port; while (m_running) { - struct sockaddr_in client; - TCPSocket client_sock = m_server_sock.accept_with_timeout(1000, &client); + auto client_sock = m_server_sock.accept(1000); if (not m_running) { break; diff --git a/src/output/SDR.cpp b/src/output/SDR.cpp index 01312ff..ad65c1c 100644 --- a/src/output/SDR.cpp +++ b/src/output/SDR.cpp @@ -314,7 +314,7 @@ void SDR::handle_frame(struct FrameData& frame) } if (expected_sec != tx_second or expected_pps != tx_pps) { - etiLog.level(warn) << "OutputSDR: timestamp irregularity!" << + etiLog.level(warn) << "OutputSDR: timestamp irregularity at FCT=" << frame.ts.fct << std::fixed << " Expected " << expected_sec << "+" << (double)expected_pps/16384000.0 << @@ -337,7 +337,7 @@ void SDR::handle_frame(struct FrameData& frame) if (time_spec.get_real_secs() + tx_timeout < device_time) { etiLog.level(warn) << - "OutputSDR: Timestamp in the past! offset: " << + "OutputSDR: Timestamp in the past at FCT=" << frame.ts.fct << " offset: " << std::fixed << time_spec.get_real_secs() - device_time << " (" << device_time << ")" @@ -349,7 +349,7 @@ void SDR::handle_frame(struct FrameData& frame) if (time_spec.get_real_secs() > device_time + TIMESTAMP_ABORT_FUTURE) { etiLog.level(error) << - "OutputSDR: Timestamp way too far in the future! offset: " << + "OutputSDR: Timestamp way too far in the future at FCT=" << frame.ts.fct << " offset: " << std::fixed << time_spec.get_real_secs() - device_time; throw std::runtime_error("Timestamp error. Aborted."); @@ -358,7 +358,7 @@ void SDR::handle_frame(struct FrameData& frame) if (m_config.muting) { etiLog.log(info, - "OutputSDR: Muting sample %d requested\n", + "OutputSDR: Muting FCT=%d requested", frame.ts.fct); return; } |