diff options
| author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2019-05-06 17:18:25 +0200 | 
|---|---|---|
| committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2019-05-06 17:18:25 +0200 | 
| commit | 107dca66a069c5e317d040360b8aafd62c8282db (patch) | |
| tree | 534e9e4860327b8b49170f103b9f7419832c1928 | |
| parent | 473232ab177a4811115ff5713bb25893448dafd7 (diff) | |
| download | dabmod-107dca66a069c5e317d040360b8aafd62c8282db.tar.gz dabmod-107dca66a069c5e317d040360b8aafd62c8282db.tar.bz2 dabmod-107dca66a069c5e317d040360b8aafd62c8282db.zip | |
Implement EDI over TCP
| -rw-r--r-- | doc/example.ini | 7 | ||||
| -rw-r--r-- | src/DabMod.cpp | 21 | ||||
| -rw-r--r-- | src/EtiReader.cpp | 115 | ||||
| -rw-r--r-- | src/EtiReader.h | 12 | 
4 files changed, 114 insertions, 41 deletions
| diff --git a/doc/example.ini b/doc/example.ini index 7f4d3e5..b3e2eb3 100644 --- a/doc/example.ini +++ b/doc/example.ini @@ -45,6 +45,13 @@ loop=0  ; Listen for EDI data on a given UDP port, unicast or multicast.  ;transport=edi  ; +; EDI over TCP: +; +; Connect to TCP server on a given host +;source=tcp://localhost:9201 +; +; EDI over UDP: +;  ; Supported syntax for the source setting:  ;  Bind to default interface and receive data from port 12000  ;source=udp://:12000 diff --git a/src/DabMod.cpp b/src/DabMod.cpp index 7ebde12..1f435bf 100644 --- a/src/DabMod.cpp +++ b/src/DabMod.cpp @@ -306,11 +306,11 @@ int launch_modulator(int argc, char* argv[])              // setMaxDelay wants number of AF packets, which correspond to 24ms ETI frames              ediInput.setMaxDelay(lroundf(mod_settings.edi_max_delay_ms / 24.0f));          } -        EdiUdpInput ediUdpInput(ediInput); +        EdiTransport ediTransport(ediInput); -        ediUdpInput.Open(mod_settings.inputName); -        if (not ediUdpInput.isEnabled()) { -            throw runtime_error("inputTransport is edi, but ediUdpInput is not enabled"); +        ediTransport.Open(mod_settings.inputName); +        if (not ediTransport.isEnabled()) { +            throw runtime_error("inputTransport is edi, but ediTransport is not enabled");          }          Flowgraph flowgraph; @@ -329,16 +329,27 @@ int launch_modulator(int argc, char* argv[])          bool first_frame = true; +        auto frame_received_tp = chrono::steady_clock::now(); +          while (running) {              while (running and not ediReader.isFrameReady()) {                  try { -                    ediUdpInput.rxPacket(); +                    bool packet_received = ediTransport.rxPacket(); +                    if (packet_received) { +                        frame_received_tp = chrono::steady_clock::now(); +                    }                  }                  catch (const std::runtime_error& e) {                      etiLog.level(warn) << "EDI input: " << e.what();                      running = 0;                      break;                  } + +                if (frame_received_tp + chrono::seconds(10) < chrono::steady_clock::now()) { +                    etiLog.level(error) << "No EDI data received in 10 seconds."; +                    running = 0; +                    break; +                }              }              if (not running) { diff --git a/src/EtiReader.cpp b/src/EtiReader.cpp index 4c5ad79..94c362a 100644 --- a/src/EtiReader.cpp +++ b/src/EtiReader.cpp @@ -547,7 +547,7 @@ void EdiReader::assemble()      m_frameReady = true;  } -EdiUdpInput::EdiUdpInput(EdiDecoder::ETIDecoder& decoder) : +EdiTransport::EdiTransport(EdiDecoder::ETIDecoder& decoder) :      m_enabled(false),      m_port(0),      m_bindto("0.0.0.0"), @@ -555,49 +555,100 @@ EdiUdpInput::EdiUdpInput(EdiDecoder::ETIDecoder& decoder) :      m_decoder(decoder) { } -void EdiUdpInput::Open(const std::string& uri) +void EdiTransport::Open(const std::string& uri)  {      etiLog.level(info) << "Opening EDI :" << uri; -    size_t found_port = uri.find_first_of(":", 6); -    if (found_port == string::npos) { -        throw std::invalid_argument("EDI input port must be provided"); -    } -    m_port = std::stoi(uri.substr(found_port+1)); -    std::string host_full = uri.substr(6, found_port-6);// ignore udp:// -    size_t found_mcast = host_full.find_first_of("@"); //have multicast address: -    if (found_mcast != string::npos) { -        if (found_mcast > 0) { -            m_bindto = host_full.substr(0, found_mcast); +    const string proto = uri.substr(0, 3); +    if (proto == "udp") { +        size_t found_port = uri.find_first_of(":", 6); +        if (found_port == string::npos) { +            throw std::invalid_argument("EDI UDP input port must be provided");          } -        m_mcastaddr = host_full.substr(found_mcast+1); -    } -    else if (found_port != 6) { -        m_bindto=host_full; + +        m_port = std::stoi(uri.substr(found_port+1)); +        std::string host_full = uri.substr(6, found_port-6);// skip udp:// +        size_t found_mcast = host_full.find_first_of("@"); //have multicast address: +        if (found_mcast != string::npos) { +            if (found_mcast > 0) { +                m_bindto = host_full.substr(0, found_mcast); +            } +            m_mcastaddr = host_full.substr(found_mcast+1); +        } +        else if (found_port != 6) { +            m_bindto=host_full; +        } + +        etiLog.level(info) << "EDI UDP input: host:" << m_bindto << +            ", source:" << m_mcastaddr << ", port:" << m_port; + +        // The max_fragments_queued is only a protection against a runaway +        // memory usage. +        // Rough calculation: +        // 300 seconds, 24ms per frame, up to 20 fragments per frame +        const size_t max_fragments_queued = 20 * 300 * 1000 / 24; + +        m_udp_rx.start(m_port, m_bindto, m_mcastaddr, max_fragments_queued); +        m_proto = Proto::UDP; +        m_enabled = true;      } +    else if (proto == "tcp") { +        size_t found_port = uri.find_first_of(":", 6); +        if (found_port == string::npos) { +            throw std::invalid_argument("EDI TCP input port must be provided"); +        } -    etiLog.level(info) << "EDI input: host:" << m_bindto << -        ", source:" << m_mcastaddr << ", port:" << m_port; +        m_port = std::stoi(uri.substr(found_port+1)); +        const std::string hostname = uri.substr(6, found_port-6);// skip tcp:// -    // The max_fragments_queued is only a protection against a runaway -    // memory usage. -    // Rough calculation: -    // 300 seconds, 24ms per frame, up to 20 fragments per frame -    const size_t max_fragments_queued = 20 * 300 * 1000 / 24; +        etiLog.level(info) << "EDI TCP connect to " << hostname << ":" << m_port; -    m_udp_rx.start(m_port, m_bindto, m_mcastaddr, max_fragments_queued); -    m_enabled = true; +        m_tcpclient.connect(hostname, m_port); +        m_proto = Proto::TCP; +        m_enabled = true; +    } +    else { +        throw std::invalid_argument("ETI protocol '" + proto + "' unknown"); +    }  } -bool EdiUdpInput::rxPacket() +bool EdiTransport::rxPacket()  { -    auto udp_data = m_udp_rx.get_packet_buffer(); +    switch (m_proto) { +        case Proto::UDP: +            { +                auto udp_data = m_udp_rx.get_packet_buffer(); -    if (udp_data.empty()) { -        return false; -    } +                if (udp_data.empty()) { +                    return false; +                } -    m_decoder.push_packet(udp_data); -    return true; +                m_decoder.push_packet(udp_data); +                return true; +            } +        case Proto::TCP: +            { +                m_tcpbuffer.resize(4096); +                const int timeout_ms = 1000; +                try { +                    ssize_t ret = m_tcpclient.recv(m_tcpbuffer.data(), m_tcpbuffer.size(), 0, timeout_ms); +                    if (ret == 0 or ret == -1) { +                        return false; +                    } +                    else if (ret > (ssize_t)m_tcpbuffer.size()) { +                        throw logic_error("EDI TCP: invalid recv() return value"); +                    } +                    else { +                        m_tcpbuffer.resize(ret); +                        m_decoder.push_bytes(m_tcpbuffer); +                        return true; +                    } +                } +                catch (const TCPSocket::Timeout&) { +                    return false; +                } +            } +    } +    throw logic_error("Incomplete rxPacket implementation!");  }  #endif // HAVE_EDI diff --git a/src/EtiReader.h b/src/EtiReader.h index 554231e..38f7903 100644 --- a/src/EtiReader.h +++ b/src/EtiReader.h @@ -34,6 +34,7 @@  #include "Eti.h"  #include "Log.h"  #include "FicSource.h" +#include "Socket.h"  #include "SubchannelSource.h"  #include "TimestampDecoder.h"  #include "lib/edi/ETIDecoder.hpp" @@ -185,13 +186,12 @@ private:  };  /* The EDI input does not use the inputs defined in InputReader.h, as they were - * designed for ETI. It uses the EdiUdpInput which in turn uses a threaded + * designed for ETI. It uses the EdiTransport which in turn uses a threaded   * receiver.   */ - -class EdiUdpInput { +class EdiTransport {      public: -        EdiUdpInput(EdiDecoder::ETIDecoder& decoder); +        EdiTransport(EdiDecoder::ETIDecoder& decoder);          void Open(const std::string& uri); @@ -209,7 +209,11 @@ class EdiUdpInput {          std::string m_bindto;          std::string m_mcastaddr; +        enum class Proto { UDP, TCP }; +        Proto m_proto;          UdpReceiver m_udp_rx; +        std::vector<uint8_t> m_tcpbuffer; +        TCPClient m_tcpclient;          EdiDecoder::ETIDecoder& m_decoder;  };  #endif | 
