diff options
-rw-r--r-- | doc/example.ini | 7 | ||||
-rw-r--r-- | src/DabMod.cpp | 21 | ||||
-rw-r--r-- | src/EtiReader.cpp | 115 | ||||
-rw-r--r-- | src/EtiReader.h | 12 |
4 files changed, 114 insertions, 41 deletions
diff --git a/doc/example.ini b/doc/example.ini index 7f4d3e5..b3e2eb3 100644 --- a/doc/example.ini +++ b/doc/example.ini @@ -45,6 +45,13 @@ loop=0 ; Listen for EDI data on a given UDP port, unicast or multicast. ;transport=edi ; +; EDI over TCP: +; +; Connect to TCP server on a given host +;source=tcp://localhost:9201 +; +; EDI over UDP: +; ; Supported syntax for the source setting: ; Bind to default interface and receive data from port 12000 ;source=udp://:12000 diff --git a/src/DabMod.cpp b/src/DabMod.cpp index 7ebde12..1f435bf 100644 --- a/src/DabMod.cpp +++ b/src/DabMod.cpp @@ -306,11 +306,11 @@ int launch_modulator(int argc, char* argv[]) // setMaxDelay wants number of AF packets, which correspond to 24ms ETI frames ediInput.setMaxDelay(lroundf(mod_settings.edi_max_delay_ms / 24.0f)); } - EdiUdpInput ediUdpInput(ediInput); + EdiTransport ediTransport(ediInput); - ediUdpInput.Open(mod_settings.inputName); - if (not ediUdpInput.isEnabled()) { - throw runtime_error("inputTransport is edi, but ediUdpInput is not enabled"); + ediTransport.Open(mod_settings.inputName); + if (not ediTransport.isEnabled()) { + throw runtime_error("inputTransport is edi, but ediTransport is not enabled"); } Flowgraph flowgraph; @@ -329,16 +329,27 @@ int launch_modulator(int argc, char* argv[]) bool first_frame = true; + auto frame_received_tp = chrono::steady_clock::now(); + while (running) { while (running and not ediReader.isFrameReady()) { try { - ediUdpInput.rxPacket(); + bool packet_received = ediTransport.rxPacket(); + if (packet_received) { + frame_received_tp = chrono::steady_clock::now(); + } } catch (const std::runtime_error& e) { etiLog.level(warn) << "EDI input: " << e.what(); running = 0; break; } + + if (frame_received_tp + chrono::seconds(10) < chrono::steady_clock::now()) { + etiLog.level(error) << "No EDI data received in 10 seconds."; + running = 0; + break; + } } if (not running) { diff --git a/src/EtiReader.cpp b/src/EtiReader.cpp index 4c5ad79..94c362a 100644 --- a/src/EtiReader.cpp +++ b/src/EtiReader.cpp @@ -547,7 +547,7 @@ void EdiReader::assemble() m_frameReady = true; } -EdiUdpInput::EdiUdpInput(EdiDecoder::ETIDecoder& decoder) : +EdiTransport::EdiTransport(EdiDecoder::ETIDecoder& decoder) : m_enabled(false), m_port(0), m_bindto("0.0.0.0"), @@ -555,49 +555,100 @@ EdiUdpInput::EdiUdpInput(EdiDecoder::ETIDecoder& decoder) : m_decoder(decoder) { } -void EdiUdpInput::Open(const std::string& uri) +void EdiTransport::Open(const std::string& uri) { etiLog.level(info) << "Opening EDI :" << uri; - size_t found_port = uri.find_first_of(":", 6); - if (found_port == string::npos) { - throw std::invalid_argument("EDI input port must be provided"); - } - m_port = std::stoi(uri.substr(found_port+1)); - std::string host_full = uri.substr(6, found_port-6);// ignore udp:// - size_t found_mcast = host_full.find_first_of("@"); //have multicast address: - if (found_mcast != string::npos) { - if (found_mcast > 0) { - m_bindto = host_full.substr(0, found_mcast); + const string proto = uri.substr(0, 3); + if (proto == "udp") { + size_t found_port = uri.find_first_of(":", 6); + if (found_port == string::npos) { + throw std::invalid_argument("EDI UDP input port must be provided"); } - m_mcastaddr = host_full.substr(found_mcast+1); - } - else if (found_port != 6) { - m_bindto=host_full; + + m_port = std::stoi(uri.substr(found_port+1)); + std::string host_full = uri.substr(6, found_port-6);// skip udp:// + size_t found_mcast = host_full.find_first_of("@"); //have multicast address: + if (found_mcast != string::npos) { + if (found_mcast > 0) { + m_bindto = host_full.substr(0, found_mcast); + } + m_mcastaddr = host_full.substr(found_mcast+1); + } + else if (found_port != 6) { + m_bindto=host_full; + } + + etiLog.level(info) << "EDI UDP input: host:" << m_bindto << + ", source:" << m_mcastaddr << ", port:" << m_port; + + // The max_fragments_queued is only a protection against a runaway + // memory usage. + // Rough calculation: + // 300 seconds, 24ms per frame, up to 20 fragments per frame + const size_t max_fragments_queued = 20 * 300 * 1000 / 24; + + m_udp_rx.start(m_port, m_bindto, m_mcastaddr, max_fragments_queued); + m_proto = Proto::UDP; + m_enabled = true; } + else if (proto == "tcp") { + size_t found_port = uri.find_first_of(":", 6); + if (found_port == string::npos) { + throw std::invalid_argument("EDI TCP input port must be provided"); + } - etiLog.level(info) << "EDI input: host:" << m_bindto << - ", source:" << m_mcastaddr << ", port:" << m_port; + m_port = std::stoi(uri.substr(found_port+1)); + const std::string hostname = uri.substr(6, found_port-6);// skip tcp:// - // The max_fragments_queued is only a protection against a runaway - // memory usage. - // Rough calculation: - // 300 seconds, 24ms per frame, up to 20 fragments per frame - const size_t max_fragments_queued = 20 * 300 * 1000 / 24; + etiLog.level(info) << "EDI TCP connect to " << hostname << ":" << m_port; - m_udp_rx.start(m_port, m_bindto, m_mcastaddr, max_fragments_queued); - m_enabled = true; + m_tcpclient.connect(hostname, m_port); + m_proto = Proto::TCP; + m_enabled = true; + } + else { + throw std::invalid_argument("ETI protocol '" + proto + "' unknown"); + } } -bool EdiUdpInput::rxPacket() +bool EdiTransport::rxPacket() { - auto udp_data = m_udp_rx.get_packet_buffer(); + switch (m_proto) { + case Proto::UDP: + { + auto udp_data = m_udp_rx.get_packet_buffer(); - if (udp_data.empty()) { - return false; - } + if (udp_data.empty()) { + return false; + } - m_decoder.push_packet(udp_data); - return true; + m_decoder.push_packet(udp_data); + return true; + } + case Proto::TCP: + { + m_tcpbuffer.resize(4096); + const int timeout_ms = 1000; + try { + ssize_t ret = m_tcpclient.recv(m_tcpbuffer.data(), m_tcpbuffer.size(), 0, timeout_ms); + if (ret == 0 or ret == -1) { + return false; + } + else if (ret > (ssize_t)m_tcpbuffer.size()) { + throw logic_error("EDI TCP: invalid recv() return value"); + } + else { + m_tcpbuffer.resize(ret); + m_decoder.push_bytes(m_tcpbuffer); + return true; + } + } + catch (const TCPSocket::Timeout&) { + return false; + } + } + } + throw logic_error("Incomplete rxPacket implementation!"); } #endif // HAVE_EDI diff --git a/src/EtiReader.h b/src/EtiReader.h index 554231e..38f7903 100644 --- a/src/EtiReader.h +++ b/src/EtiReader.h @@ -34,6 +34,7 @@ #include "Eti.h" #include "Log.h" #include "FicSource.h" +#include "Socket.h" #include "SubchannelSource.h" #include "TimestampDecoder.h" #include "lib/edi/ETIDecoder.hpp" @@ -185,13 +186,12 @@ private: }; /* The EDI input does not use the inputs defined in InputReader.h, as they were - * designed for ETI. It uses the EdiUdpInput which in turn uses a threaded + * designed for ETI. It uses the EdiTransport which in turn uses a threaded * receiver. */ - -class EdiUdpInput { +class EdiTransport { public: - EdiUdpInput(EdiDecoder::ETIDecoder& decoder); + EdiTransport(EdiDecoder::ETIDecoder& decoder); void Open(const std::string& uri); @@ -209,7 +209,11 @@ class EdiUdpInput { std::string m_bindto; std::string m_mcastaddr; + enum class Proto { UDP, TCP }; + Proto m_proto; UdpReceiver m_udp_rx; + std::vector<uint8_t> m_tcpbuffer; + TCPClient m_tcpclient; EdiDecoder::ETIDecoder& m_decoder; }; #endif |