diff options
37 files changed, 3371 insertions, 1381 deletions
| diff --git a/Makefile.am b/Makefile.am index 2f06879..6e5aa71 100644 --- a/Makefile.am +++ b/Makefile.am @@ -69,6 +69,8 @@ odr_dabmux_SOURCES  =src/DabMux.cpp \  					 src/input/File.h \  					 src/input/Udp.cpp \  					 src/input/Udp.h \ +					 src/input/Edi.cpp \ +					 src/input/Edi.h \  					 src/dabOutput/dabOutput.h \  					 src/dabOutput/dabOutputFile.cpp \  					 src/dabOutput/dabOutputFifo.cpp \ @@ -98,8 +100,6 @@ odr_dabmux_SOURCES  =src/DabMux.cpp \  					 src/ConfigParser.h \  					 src/Eti.h \  					 src/Eti.cpp \ -					 src/InetAddress.h \ -					 src/InetAddress.cpp \  					 src/Interleaver.h \  					 src/Interleaver.cpp \  					 src/Log.h \ @@ -109,15 +109,8 @@ odr_dabmux_SOURCES  =src/DabMux.cpp \  					 src/MuxElements.cpp \  					 src/MuxElements.h \  					 src/PcDebug.h \ -					 src/ReedSolomon.h \ -					 src/ReedSolomon.cpp \  					 src/RemoteControl.cpp \  					 src/RemoteControl.h \ -					 src/TcpSocket.h \ -					 src/TcpSocket.cpp \ -					 src/UdpSocket.h \ -					 src/UdpSocket.cpp \ -					 src/ThreadsafeQueue.h \  					 src/crc.h \  					 src/crc.c \  					 src/fig/FIG.h \ @@ -167,6 +160,20 @@ odr_dabmux_SOURCES  =src/DabMux.cpp \  					 src/PrbsGenerator.h \  					 src/utils.cpp \  					 src/utils.h \ +					 lib/edi/STIDecoder.cpp \ +					 lib/edi/STIDecoder.h \ +					 lib/edi/STIWriter.cpp \ +					 lib/edi/STIWriter.h \ +					 lib/edi/PFT.cpp \ +					 lib/edi/PFT.h \ +					 lib/edi/common.cpp \ +					 lib/edi/common.h \ +					 lib/edi/buffer_unpack.hpp \ +					 lib/ReedSolomon.h \ +					 lib/ReedSolomon.cpp \ +					 lib/Socket.h \ +					 lib/Socket.cpp \ +					 lib/ThreadsafeQueue.h \  					 lib/zmq.hpp \  					 $(lib_fec_sources) \  					 $(lib_charset_sources) @@ -205,18 +212,15 @@ odr_zmq2edi_SOURCES  = src/zmq2edi/zmq2edi.cpp \  					   src/dabOutput/edi/TagPacket.h \  					   src/dabOutput/edi/Transport.cpp \  					   src/dabOutput/edi/Transport.h \ -					   src/InetAddress.h \ -					   src/InetAddress.cpp \ -					   src/TcpSocket.h \ -					   src/TcpSocket.cpp \ -					   src/UdpSocket.h \ -					   src/UdpSocket.cpp \ -					   src/ReedSolomon.h \ -					   src/ReedSolomon.cpp \  					   src/Log.h \  					   src/Log.cpp \  					   src/crc.h \  					   src/crc.c \ +					   lib/ReedSolomon.h \ +					   lib/ReedSolomon.cpp \ +					   lib/Socket.h \ +					   lib/Socket.cpp \ +					   lib/ThreadsafeQueue.h \  					   lib/zmq.hpp \  					   $(lib_fec_sources) diff --git a/doc/advanced.mux b/doc/advanced.mux index fb67b82..b9cec05 100644 --- a/doc/advanced.mux +++ b/doc/advanced.mux @@ -163,7 +163,8 @@ subchannels {      sub-fu {          type audio          ; example file input -        inputfile "funk.mp2" +        inputproto zmq +        inputuri "funk.mp2"          nonblock false          bitrate 128          id 10 @@ -188,7 +189,8 @@ subchannels {          ; Receive STI-D(LI) carried in STI(PI, X) inside RTP using UDP.          ; This is intended to be compatible with AVT audio encoders.          ; EXPERIMENTAL! -        inputfile "sti-rtp://127.0.0.1:32010" +        inputproto sti +        inputuri "rtp://127.0.0.1:32010"          bitrate 96          id 3          protection 3 @@ -196,11 +198,12 @@ subchannels {      sub-ri {          type dabplus          ; example file input -        ;inputfile "rick.dabp" +        ;inputuri "rick.dabp"          ; example zmq input:          ; Accepts connections to port 9000 from any interface.          ; Use ODR-AudioEnc as encoder -        inputfile "tcp://*:9000" +        inputproto zmq +        inputuri "tcp://*:9000"          bitrate 96          id 1          protection 1 @@ -256,7 +259,8 @@ subchannels {          ; for audio types, you can use the ZeroMQ input (if compiled in)          ; with the following configuration in combination with          ; Toolame-DAB -        inputfile "tcp://*:9001" +        inputproto zmq +        inputuri "tcp://*:9001"          bitrate 96          id 1          protection 1 @@ -273,7 +277,8 @@ subchannels {         type data         ; Use the default PRBS polynomial. -       inputfile "prbs://" +       inputproto prbs +       inputuri "prbs://"         ; To use another polynomial, set it in the url as hexadecimal         ; The default polynomial is G(x) = x^20 + x^17 + 1, represented as diff --git a/doc/example.mux b/doc/example.mux index 6c2bc18..31e072d 100644 --- a/doc/example.mux +++ b/doc/example.mux @@ -171,7 +171,8 @@ subchannels {          type dabplus          ; Accepts connections to port 9000 from any interface.          ; Use ODR-AudioEnc as encoder -        inputfile "tcp://*:9000" +        inputproto zmq +        inputuri "tcp://*:9000"          bitrate 96          id 1          protection 3 diff --git a/src/ReedSolomon.cpp b/lib/ReedSolomon.cpp index 38d8ea8..38d8ea8 100644 --- a/src/ReedSolomon.cpp +++ b/lib/ReedSolomon.cpp diff --git a/src/ReedSolomon.h b/lib/ReedSolomon.h index abcef62..abcef62 100644 --- a/src/ReedSolomon.h +++ b/lib/ReedSolomon.h diff --git a/lib/Socket.cpp b/lib/Socket.cpp new file mode 100644 index 0000000..cd70a8e --- /dev/null +++ b/lib/Socket.cpp @@ -0,0 +1,898 @@ +/* +   Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the +   Queen in Right of Canada (Communications Research Center Canada) + +   Copyright (C) 2019 +   Matthias P. Braendli, matthias.braendli@mpb.li + +    http://www.opendigitalradio.org +   */ +/* +   This program is free software: you can redistribute it and/or modify +   it under the terms of the GNU General Public License as published by +   the Free Software Foundation, either version 3 of the License, or +   (at your option) any later version. + +   This program is distributed in the hope that it will be useful, +   but WITHOUT ANY WARRANTY; without even the implied warranty of +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +   GNU General Public License for more details. + +   You should have received a copy of the GNU General Public License +   along with this program.  If not, see <https://www.gnu.org/licenses/>. +*/ + +#include "Socket.h" + +#include <iostream> +#include <cstdio> +#include <cstring> +#include <cerrno> +#include <fcntl.h> +#include <poll.h> + +namespace Socket { + +using namespace std; + +void InetAddress::resolveUdpDestination(const std::string& destination, int port) +{ +    char service[NI_MAXSERV]; +    snprintf(service, NI_MAXSERV-1, "%d", port); + +    struct addrinfo hints; +    memset(&hints, 0, sizeof(struct addrinfo)); +    hints.ai_family = AF_INET; +    hints.ai_socktype = SOCK_DGRAM; /* Datagram socket */ +    hints.ai_flags = 0; +    hints.ai_protocol = 0; + +    struct addrinfo *result, *rp; +    int s = getaddrinfo(destination.c_str(), service, &hints, &result); +    if (s != 0) { +        throw runtime_error(string("getaddrinfo failed: ") + gai_strerror(s)); +    } + +    for (rp = result; rp != nullptr; rp = rp->ai_next) { +        // Take the first result +        memcpy(&addr, rp->ai_addr, rp->ai_addrlen); +        break; +    } + +    freeaddrinfo(result); + +    if (rp == nullptr) { +        throw runtime_error("Could not resolve"); +    } +} + +UDPPacket::UDPPacket() { } + +UDPPacket::UDPPacket(size_t initSize) : +    buffer(initSize) +{ } + + +UDPSocket::UDPSocket() : +    m_sock(INVALID_SOCKET) +{ +    reinit(0, ""); +} + +UDPSocket::UDPSocket(int port) : +    m_sock(INVALID_SOCKET) +{ +    reinit(port, ""); +} + +UDPSocket::UDPSocket(int port, const std::string& name) : +    m_sock(INVALID_SOCKET) +{ +    reinit(port, name); +} + + +void UDPSocket::setBlocking(bool block) +{ +    int res = fcntl(m_sock, F_SETFL, block ? 0 : O_NONBLOCK); +    if (res == -1) { +        throw runtime_error(string("Can't change blocking state of socket: ") + strerror(errno)); +    } +} + +void UDPSocket::reinit(int port) +{ +    return reinit(port, ""); +} + +void UDPSocket::reinit(int port, const std::string& name) +{ +    if (m_sock != INVALID_SOCKET) { +        ::close(m_sock); +    } + +    if (port == 0) { +        // No need to bind to a given port, creating the +        // socket is enough +        m_sock = ::socket(AF_INET, SOCK_DGRAM, 0); +        return; +    } + +    char service[NI_MAXSERV]; +    snprintf(service, NI_MAXSERV-1, "%d", port); + +    struct addrinfo hints; +    memset(&hints, 0, sizeof(struct addrinfo)); +    hints.ai_family = AF_INET; +    hints.ai_socktype = SOCK_DGRAM; /* Datagram socket */ +    hints.ai_flags = AI_PASSIVE;    /* For wildcard IP address */ +    hints.ai_protocol = 0;          /* Any protocol */ +    hints.ai_canonname = nullptr; +    hints.ai_addr = nullptr; +    hints.ai_next = nullptr; + +    struct addrinfo *result, *rp; +    int s = getaddrinfo(name.empty() ? nullptr : name.c_str(), +            port == 0 ? nullptr : service, +            &hints, &result); +    if (s != 0) { +        throw runtime_error(string("getaddrinfo failed: ") + gai_strerror(s)); +    } + +    /* getaddrinfo() returns a list of address structures. +       Try each address until we successfully bind(2). +       If socket(2) (or bind(2)) fails, we (close the socket +       and) try the next address. */ +    for (rp = result; rp != nullptr; rp = rp->ai_next) { +        int sfd = ::socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol); +        if (sfd == -1) { +            continue; +        } + +        if (::bind(sfd, rp->ai_addr, rp->ai_addrlen) == 0) { +            m_sock = sfd; +            break; +        } + +        ::close(sfd); +    } + +    freeaddrinfo(result); + +    if (rp == nullptr) { +        throw runtime_error("Could not bind"); +    } +} + +void UDPSocket::close() +{ +    if (m_sock != INVALID_SOCKET) { +        ::close(m_sock); +    } + +    m_sock = INVALID_SOCKET; +} + +UDPSocket::~UDPSocket() +{ +    if (m_sock != INVALID_SOCKET) { +        ::close(m_sock); +    } +} + + +UDPPacket UDPSocket::receive(size_t max_size) +{ +    UDPPacket packet(max_size); +    socklen_t addrSize; +    addrSize = sizeof(*packet.address.as_sockaddr()); +    ssize_t ret = recvfrom(m_sock, +            packet.buffer.data(), +            packet.buffer.size(), +            0, +            packet.address.as_sockaddr(), +            &addrSize); + +    if (ret == SOCKET_ERROR) { +        packet.buffer.resize(0); + +        // This suppresses the -Wlogical-op warning +#if EAGAIN == EWOULDBLOCK +        if (errno == EAGAIN) { +#else +        if (errno == EAGAIN or errno == EWOULDBLOCK) { +#endif +            return 0; +        } +        throw runtime_error(string("Can't receive data: ") + strerror(errno)); +    } + +    packet.buffer.resize(ret); +    return packet; +} + +void UDPSocket::send(UDPPacket& packet) +{ +    const int ret = sendto(m_sock, packet.buffer.data(), packet.buffer.size(), 0, +            packet.address.as_sockaddr(), sizeof(*packet.address.as_sockaddr())); +    if (ret == SOCKET_ERROR && errno != ECONNREFUSED) { +        throw runtime_error(string("Can't send UDP packet: ") + strerror(errno)); +    } +} + + +void UDPSocket::send(const std::vector<uint8_t>& data, InetAddress destination) +{ +    const int ret = sendto(m_sock, data.data(), data.size(), 0, +            destination.as_sockaddr(), sizeof(*destination.as_sockaddr())); +    if (ret == SOCKET_ERROR && errno != ECONNREFUSED) { +        throw runtime_error(string("Can't send UDP packet: ") + strerror(errno)); +    } +} + +void UDPSocket::joinGroup(const char* groupname, const char* if_addr) +{ +    ip_mreqn group; +    if ((group.imr_multiaddr.s_addr = inet_addr(groupname)) == INADDR_NONE) { +        throw runtime_error("Cannot convert multicast group name"); +    } +    if (!IN_MULTICAST(ntohl(group.imr_multiaddr.s_addr))) { +        throw runtime_error("Group name is not a multicast address"); +    } + +    if (if_addr) { +        group.imr_address.s_addr = inet_addr(if_addr); +    } +    else { +        group.imr_address.s_addr = htons(INADDR_ANY); +    } +    group.imr_ifindex = 0; +    if (setsockopt(m_sock, IPPROTO_IP, IP_ADD_MEMBERSHIP, &group, sizeof(group)) +            == SOCKET_ERROR) { +        throw runtime_error(string("Can't join multicast group") + strerror(errno)); +    } +} + +void UDPSocket::setMulticastSource(const char* source_addr) +{ +    struct in_addr addr; +    if (inet_aton(source_addr, &addr) == 0) { +        throw runtime_error(string("Can't parse source address") + strerror(errno)); +    } + +    if (setsockopt(m_sock, IPPROTO_IP, IP_MULTICAST_IF, &addr, sizeof(addr)) +            == SOCKET_ERROR) { +        throw runtime_error(string("Can't set source address") + strerror(errno)); +    } +} + +void UDPSocket::setMulticastTTL(int ttl) +{ +    if (setsockopt(m_sock, IPPROTO_IP, IP_MULTICAST_TTL, &ttl, sizeof(ttl)) +            == SOCKET_ERROR) { +        throw runtime_error(string("Can't set multicast ttl") + strerror(errno)); +    } +} + +UDPReceiver::UDPReceiver() { } + +UDPReceiver::~UDPReceiver() { +    m_stop = true; +    m_sock.close(); +    if (m_thread.joinable()) { +        m_thread.join(); +    } +} + +void UDPReceiver::start(int port, const string& bindto, const string& mcastaddr, size_t max_packets_queued) { +    m_port = port; +    m_bindto = bindto; +    m_mcastaddr = mcastaddr; +    m_max_packets_queued = max_packets_queued; +    m_thread = std::thread(&UDPReceiver::m_run, this); +} + +std::vector<uint8_t> UDPReceiver::get_packet_buffer() +{ +    if (m_stop) { +        throw runtime_error("UDP Receiver not running"); +    } + +    UDPPacket p; +    m_packets.wait_and_pop(p); + +    return p.buffer; +} + +void UDPReceiver::m_run() +{ +    // Ensure that stop is set to true in case of exception or return +    struct SetStopOnDestruct { +        SetStopOnDestruct(atomic<bool>& stop) : m_stop(stop) {} +        ~SetStopOnDestruct() { m_stop = true; } +        private: atomic<bool>& m_stop; +    } autoSetStop(m_stop); + +    if (IN_MULTICAST(ntohl(inet_addr(m_mcastaddr.c_str())))) { +        m_sock.reinit(m_port, m_mcastaddr); +        m_sock.setMulticastSource(m_bindto.c_str()); +        m_sock.joinGroup(m_mcastaddr.c_str(), m_bindto.c_str()); +    } +    else { +        m_sock.reinit(m_port, m_bindto); +    } + +    while (not m_stop) { +        constexpr size_t packsize = 8192; +        try { +            auto packet = m_sock.receive(packsize); +            if (packet.buffer.size() == packsize) { +                // TODO replace fprintf +                fprintf(stderr, "Warning, possible UDP truncation\n"); +            } + +            // If this blocks, the UDP socket will lose incoming packets +            m_packets.push_wait_if_full(packet, m_max_packets_queued); +        } +        catch (const std::runtime_error& e) { +            // TODO replace fprintf +            // TODO handle intr +            fprintf(stderr, "Socket error: %s\n", e.what()); +            m_stop = true; +        } +    } +} + + +TCPSocket::TCPSocket() +{ +} + +TCPSocket::~TCPSocket() +{ +    if (m_sock != -1) { +        ::close(m_sock); +    } +} + +TCPSocket::TCPSocket(TCPSocket&& other) : +    m_sock(other.m_sock), +    m_remote_address(move(other.m_remote_address)) +{ +    if (other.m_sock != -1) { +        other.m_sock = -1; +    } +} + +TCPSocket& TCPSocket::operator=(TCPSocket&& other) +{ +    swap(m_remote_address, other.m_remote_address); + +    m_sock = other.m_sock; +    if (other.m_sock != -1) { +        other.m_sock = -1; +    } + +    return *this; +} + +bool TCPSocket::valid() const +{ +    return m_sock != -1; +} + +void TCPSocket::connect(const std::string& hostname, int port) +{ +    if (m_sock != INVALID_SOCKET) { +        throw std::logic_error("You may only connect an invalid TCPSocket"); +    } + +    char service[NI_MAXSERV]; +    snprintf(service, NI_MAXSERV-1, "%d", port); + +    /* Obtain address(es) matching host/port */ +    struct addrinfo hints; +    memset(&hints, 0, sizeof(struct addrinfo)); +    hints.ai_family = AF_INET; +    hints.ai_socktype = SOCK_STREAM; +    hints.ai_flags = 0; +    hints.ai_protocol = 0; + +    struct addrinfo *result, *rp; +    int s = getaddrinfo(hostname.c_str(), service, &hints, &result); +    if (s != 0) { +        throw runtime_error(string("getaddrinfo failed: ") + gai_strerror(s)); +    } + +    /* getaddrinfo() returns a list of address structures. +       Try each address until we successfully connect(2). +       If socket(2) (or connect(2)) fails, we (close the socket +       and) try the next address. */ + +    for (rp = result; rp != nullptr; rp = rp->ai_next) { +        int sfd = ::socket(rp->ai_family, rp->ai_socktype, +                rp->ai_protocol); +        if (sfd == -1) +            continue; + +        int ret = ::connect(sfd, rp->ai_addr, rp->ai_addrlen); +        if (ret != -1 or (ret == -1 and errno == EINPROGRESS)) { +            // As the TCPClient could set the socket to nonblocking, we +            // must handle EINPROGRESS here +            m_sock = sfd; +            break; +        } + +        ::close(sfd); +    } + +    if (m_sock != INVALID_SOCKET) { +#if defined(HAVE_SO_NOSIGPIPE) +        int val = 1; +        if (setsockopt(m_sock, SOL_SOCKET, SO_NOSIGPIPE, &val, sizeof(val)) +                == SOCKET_ERROR) { +            throw std::runtime_error("Can't set SO_NOSIGPIPE"); +        } +#endif +    } + +    freeaddrinfo(result);           /* No longer needed */ + +    if (rp == nullptr) { +        throw runtime_error("Could not connect"); +    } + +} + +void TCPSocket::listen(int port, const string& name) +{ +    if (m_sock != INVALID_SOCKET) { +        throw std::logic_error("You may only listen with an invalid TCPSocket"); +    } + +    char service[NI_MAXSERV]; +    snprintf(service, NI_MAXSERV-1, "%d", port); + +    struct addrinfo hints; +    memset(&hints, 0, sizeof(struct addrinfo)); +    hints.ai_family = AF_INET; +    hints.ai_socktype = SOCK_STREAM; +    hints.ai_flags = AI_PASSIVE;    /* For wildcard IP address */ +    hints.ai_protocol = 0; +    hints.ai_canonname = nullptr; +    hints.ai_addr = nullptr; +    hints.ai_next = nullptr; + +    struct addrinfo *result, *rp; +    int s = getaddrinfo(name.empty() ? nullptr : name.c_str(), service, &hints, &result); +    if (s != 0) { +        throw runtime_error(string("getaddrinfo failed: ") + gai_strerror(s)); +    } + +    /* getaddrinfo() returns a list of address structures. +       Try each address until we successfully bind(2). +       If socket(2) (or bind(2)) fails, we (close the socket +       and) try the next address. */ +    for (rp = result; rp != nullptr; rp = rp->ai_next) { +        int sfd = ::socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol); +        if (sfd == -1) { +            continue; +        } + +        if (::bind(sfd, rp->ai_addr, rp->ai_addrlen) == 0) { +            m_sock = sfd; +            break; +        } + +        ::close(sfd); +    } + +    freeaddrinfo(result); + +    if (m_sock != INVALID_SOCKET) { +#if defined(HAVE_SO_NOSIGPIPE) +        int val = 1; +        if (setsockopt(m_sock, SOL_SOCKET, SO_NOSIGPIPE, +                    &val, sizeof(val)) < 0) { +            throw std::runtime_error("Can't set SO_NOSIGPIPE"); +        } +#endif + +        int ret = ::listen(m_sock, 0); +        if (ret == -1) { +            throw std::runtime_error(string("Could not listen: ") + strerror(errno)); +        } +    } + +    if (rp == nullptr) { +        throw runtime_error("Could not bind"); +    } +} + +void TCPSocket::close() +{ +    ::close(m_sock); +    m_sock = -1; +} + +TCPSocket TCPSocket::accept(int timeout_ms) +{ +    if (timeout_ms == 0) { +        InetAddress remote_addr; +        socklen_t client_len = sizeof(remote_addr.addr); +        int sockfd = ::accept(m_sock, remote_addr.as_sockaddr(), &client_len); +        TCPSocket s(sockfd, remote_addr); +        return s; +    } +    else { +        struct pollfd fds[1]; +        fds[0].fd = m_sock; +        fds[0].events = POLLIN; + +        int retval = poll(fds, 1, timeout_ms); + +        if (retval == -1) { +            std::string errstr(strerror(errno)); +            throw std::runtime_error("TCP Socket accept error: " + errstr); +        } +        else if (retval > 0) { +            InetAddress remote_addr; +            socklen_t client_len = sizeof(remote_addr.addr); +            int sockfd = ::accept(m_sock, remote_addr.as_sockaddr(), &client_len); +            TCPSocket s(sockfd, remote_addr); +            return s; +        } +        else { +            TCPSocket s(-1); +            return s; +        } +    } +} + +ssize_t TCPSocket::sendall(const void *buffer, size_t buflen) +{ +    uint8_t *buf = (uint8_t*)buffer; +    while (buflen > 0) { +        /* On Linux, the MSG_NOSIGNAL flag ensures that the process +         * would not receive a SIGPIPE and die. +         * Other systems have SO_NOSIGPIPE set on the socket for the +         * same effect. */ +#if defined(HAVE_MSG_NOSIGNAL) +        const int flags = MSG_NOSIGNAL; +#else +        const int flags = 0; +#endif +        ssize_t sent = ::send(m_sock, buf, buflen, flags); +        if (sent < 0) { +            return -1; +        } +        else { +            buf += sent; +            buflen -= sent; +        } +    } +    return buflen; +} + +ssize_t TCPSocket::send(const void* data, size_t size, int timeout_ms) +{ +    if (timeout_ms) { +        struct pollfd fds[1]; +        fds[0].fd = m_sock; +        fds[0].events = POLLOUT; + +        const int retval = poll(fds, 1, timeout_ms); + +        if (retval == -1) { +            throw std::runtime_error(string("TCP Socket send error on poll(): ") + strerror(errno)); +        } +        else if (retval == 0) { +            // Timed out +            return 0; +        } +    } + +    /* On Linux, the MSG_NOSIGNAL flag ensures that the process would not +     * receive a SIGPIPE and die. +     * Other systems have SO_NOSIGPIPE set on the socket for the same effect. */ +#if defined(HAVE_MSG_NOSIGNAL) +    const int flags = MSG_NOSIGNAL; +#else +    const int flags = 0; +#endif +    const ssize_t ret = ::send(m_sock, (const char*)data, size, flags); + +    if (ret == SOCKET_ERROR) { +            throw std::runtime_error(string("TCP Socket send error: ") + strerror(errno)); +    } +    return ret; +} + +ssize_t TCPSocket::recv(void *buffer, size_t length, int flags) +{ +    ssize_t ret = ::recv(m_sock, buffer, length, flags); +    if (ret == -1) { +        std::string errstr(strerror(errno)); +        throw std::runtime_error("TCP receive error: " + errstr); +    } +    return ret; +} + +ssize_t TCPSocket::recv(void *buffer, size_t length, int flags, int timeout_ms) +{ +    struct pollfd fds[1]; +    fds[0].fd = m_sock; +    fds[0].events = POLLIN; + +    int retval = poll(fds, 1, timeout_ms); + +    if (retval == -1 and errno == EINTR) { +        throw Interrupted(); +    } +    else if (retval == -1) { +        std::string errstr(strerror(errno)); +        throw std::runtime_error("TCP receive with poll() error: " + errstr); +    } +    else if (retval > 0 and (fds[0].revents | POLLIN)) { +        ssize_t ret = ::recv(m_sock, buffer, length, flags); +        if (ret == -1) { +            if (errno == ECONNREFUSED) { +                return 0; +            } +            std::string errstr(strerror(errno)); +            throw std::runtime_error("TCP receive after poll() error: " + errstr); +        } +        return ret; +    } +    else { +        throw Timeout(); +    } +} + +TCPSocket::TCPSocket(int sockfd) : +    m_sock(sockfd), +    m_remote_address() +{ } + +TCPSocket::TCPSocket(int sockfd, InetAddress remote_address) : +    m_sock(sockfd), +    m_remote_address(remote_address) +{ } + +void TCPClient::connect(const std::string& hostname, int port) +{ +    m_hostname = hostname; +    m_port = port; +    reconnect(); +} + +ssize_t TCPClient::recv(void *buffer, size_t length, int flags, int timeout_ms) +{ +    try { +        ssize_t ret = m_sock.recv(buffer, length, flags, timeout_ms); + +        if (ret == 0) { +            m_sock.close(); + +            TCPSocket newsock; +            m_sock = std::move(newsock); +            reconnect(); +        } + +        return ret; +    } +    catch (const TCPSocket::Interrupted&) { +        return -1; +    } +    catch (const TCPSocket::Timeout&) { +        return 0; +    } + +    return 0; +} + +void TCPClient::reconnect() +{ +    int flags = fcntl(m_sock.m_sock, F_GETFL); +    if (fcntl(m_sock.m_sock, F_SETFL, flags | O_NONBLOCK) == -1) { +        std::string errstr(strerror(errno)); +        throw std::runtime_error("TCP: Could not set O_NONBLOCK: " + errstr); +    } + +    m_sock.connect(m_hostname, m_port); +} + +TCPConnection::TCPConnection(TCPSocket&& sock) : +            queue(), +            m_running(true), +            m_sender_thread(), +            m_sock(move(sock)) +{ +#if MISSING_OWN_ADDR +    auto own_addr = m_sock.getOwnAddress(); +    auto addr = m_sock.getRemoteAddress(); +    etiLog.level(debug) << "New TCP Connection on port " << +        own_addr.getPort() << " from " << +        addr.getHostAddress() << ":" << addr.getPort(); +#endif +    m_sender_thread = std::thread(&TCPConnection::process, this); +} + +TCPConnection::~TCPConnection() +{ +    m_running = false; +    vector<uint8_t> termination_marker; +    queue.push(termination_marker); +    m_sender_thread.join(); +} + +void TCPConnection::process() +{ +    while (m_running) { +        vector<uint8_t> data; +        queue.wait_and_pop(data); + +        if (data.empty()) { +            // empty vector is the termination marker +            m_running = false; +            break; +        } + +        try { +            ssize_t remaining = data.size(); +            const uint8_t *buf = reinterpret_cast<const uint8_t*>(data.data()); +            const int timeout_ms = 10; // Less than one ETI frame + +            while (m_running and remaining > 0) { +                const ssize_t sent = m_sock.send(buf, remaining, timeout_ms); +                if (sent < 0 or sent > remaining) { +                    throw std::logic_error("Invalid TCPSocket::send() return value"); +                } +                remaining -= sent; +                buf += sent; +            } +        } +        catch (const std::runtime_error& e) { +            m_running = false; +        } +    } + +#if MISSING_OWN_ADDR +    auto own_addr = m_sock.getOwnAddress(); +    auto addr = m_sock.getRemoteAddress(); +    etiLog.level(debug) << "Dropping TCP Connection on port " << +        own_addr.getPort() << " from " << +        addr.getHostAddress() << ":" << addr.getPort(); +#endif +} + + +TCPDataDispatcher::TCPDataDispatcher(size_t max_queue_size) : +    m_max_queue_size(max_queue_size) +{ +} + +TCPDataDispatcher::~TCPDataDispatcher() +{ +    m_running = false; +    m_connections.clear(); +    m_listener_socket.close(); +    if (m_listener_thread.joinable()) { +        m_listener_thread.join(); +    } +} + +void TCPDataDispatcher::start(int port, const string& address) +{ +    m_listener_socket.listen(port, address); + +    m_running = true; +    m_listener_thread = std::thread(&TCPDataDispatcher::process, this); +} + +void TCPDataDispatcher::write(const vector<uint8_t>& data) +{ +    if (not m_running) { +        throw runtime_error(m_exception_data); +    } + +    for (auto& connection : m_connections) { +        connection.queue.push(data); +    } + +    m_connections.remove_if( +            [&](const TCPConnection& conn){ return conn.queue.size() > m_max_queue_size; }); +} + +void TCPDataDispatcher::process() +{ +    try { +        const int timeout_ms = 1000; + +        while (m_running) { +            // Add a new TCPConnection to the list, constructing it from the client socket +            auto sock = m_listener_socket.accept(timeout_ms); +            if (sock.valid()) { +                m_connections.emplace(m_connections.begin(), move(sock)); +            } +        } +    } +    catch (const std::runtime_error& e) { +        m_exception_data = string("TCPDataDispatcher error: ") + e.what(); +        m_running = false; +    } +} + +TCPReceiveServer::TCPReceiveServer(size_t blocksize) : +    m_blocksize(blocksize) +{ +} + +void TCPReceiveServer::start(int listen_port, const std::string& address) +{ +    m_listener_socket.listen(listen_port, address); + +    m_running = true; +    m_listener_thread = std::thread(&TCPReceiveServer::process, this); +} + +TCPReceiveServer::~TCPReceiveServer() +{ +    m_running = false; +    if (m_listener_thread.joinable()) { +        m_listener_thread.join(); +    } +} + +vector<uint8_t> TCPReceiveServer::receive() +{ +    vector<uint8_t> buffer; +    m_queue.try_pop(buffer); + +    // we can ignore try_pop()'s return value, because +    // if it is unsuccessful the buffer is not touched. +    return buffer; +} + +void TCPReceiveServer::process() +{ +    constexpr int timeout_ms = 1000; +    constexpr int disconnect_timeout_ms = 10000; +    constexpr int max_num_timeouts = disconnect_timeout_ms / timeout_ms; + +    while (m_running) { +        auto sock = m_listener_socket.accept(timeout_ms); + +        int num_timeouts = 0; + +        while (m_running and sock.valid()) { +            try { +                vector<uint8_t> buf(m_blocksize); +                ssize_t r = sock.recv(buf.data(), buf.size(), 0, timeout_ms); +                if (r < 0) { +                    throw logic_error("Invalid recv return value"); +                } +                else if (r == 0) { +                    sock.close(); +                    break; +                } +                else { +                    buf.resize(r); +                    m_queue.push(move(buf)); +                } +            } +            catch (const TCPSocket::Interrupted&) { +                break; +            } +            catch (const TCPSocket::Timeout&) { +                num_timeouts++; +            } + +            if (num_timeouts > max_num_timeouts) { +                sock.close(); +            } +        } +    } +} + +} diff --git a/lib/Socket.h b/lib/Socket.h new file mode 100644 index 0000000..8bb7fe1 --- /dev/null +++ b/lib/Socket.h @@ -0,0 +1,294 @@ +/* +   Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the +   Queen in Right of Canada (Communications Research Center Canada) + +   Copyright (C) 2019 +   Matthias P. Braendli, matthias.braendli@mpb.li + +    http://www.opendigitalradio.org +   */ +/* +   This program is free software: you can redistribute it and/or modify +   it under the terms of the GNU General Public License as published by +   the Free Software Foundation, either version 3 of the License, or +   (at your option) any later version. + +   This program is distributed in the hope that it will be useful, +   but WITHOUT ANY WARRANTY; without even the implied warranty of +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +   GNU General Public License for more details. + +   You should have received a copy of the GNU General Public License +   along with this program.  If not, see <https://www.gnu.org/licenses/>. +*/ + +#pragma once + +#ifdef HAVE_CONFIG_H +#   include "config.h" +#endif + +#include "ThreadsafeQueue.h" +#include <cstdlib> +#include <iostream> +#include <vector> +#include <atomic> +#include <thread> +#include <list> + +#include <sys/socket.h> +#include <netinet/in.h> +#include <unistd.h> +#include <netdb.h> +#include <arpa/inet.h> +#include <pthread.h> +#define SOCKET           int +#define INVALID_SOCKET   -1 +#define SOCKET_ERROR     -1 + + +namespace Socket { + +struct InetAddress { +    struct sockaddr_storage addr; + +    struct sockaddr *as_sockaddr() { return reinterpret_cast<sockaddr*>(&addr); }; + +    void resolveUdpDestination(const std::string& destination, int port); +}; + +/** This class represents a UDP packet. + * + *  A UDP packet contains a payload (sequence of bytes) and an address. For + *  outgoing packets, the address is the destination address. For incoming + *  packets, the address tells the user from what source the packet arrived from. + */ +class UDPPacket +{ +    public: +        UDPPacket(); +        UDPPacket(size_t initSize); + +        std::vector<uint8_t> buffer; +        InetAddress address; +}; + +/** + *  This class represents a socket for sending and receiving UDP packets. + * + *  A UDP socket is the sending or receiving point for a packet delivery service. + *  Each packet sent or received on a datagram socket is individually + *  addressed and routed. Multiple packets sent from one machine to another may + *  be routed differently, and may arrive in any order. + */ +class UDPSocket +{ +    public: +        /** Create a new socket that will not be bound to any port. To be used +         * for data output. +         */ +        UDPSocket(); +        /** Create a new socket. +         *  @param port The port number on which the socket will be bound +         */ +        UDPSocket(int port); +        /** Create a new socket. +         *  @param port The port number on which the socket will be bound +         *  @param name The IP address on which the socket will be bound. +         *              It is used to bind the socket on a specific interface if +         *              the computer have many NICs. +         */ +        UDPSocket(int port, const std::string& name); +        ~UDPSocket(); +        UDPSocket(const UDPSocket& other) = delete; +        const UDPSocket& operator=(const UDPSocket& other) = delete; + +        /** Close the already open socket, and create a new one. Throws a runtime_error on error.  */ +        void reinit(int port); +        void reinit(int port, const std::string& name); + +        void close(void); +        void send(UDPPacket& packet); +        void send(const std::vector<uint8_t>& data, InetAddress destination); +        UDPPacket receive(size_t max_size); +        void joinGroup(const char* groupname, const char* if_addr = nullptr); +        void setMulticastSource(const char* source_addr); +        void setMulticastTTL(int ttl); + +        /** Set blocking mode. By default, the socket is blocking. +         * throws a runtime_error on error. +         */ +        void setBlocking(bool block); + +    protected: +        SOCKET m_sock; +}; + +/* Threaded UDP receiver */ +class UDPReceiver { +    public: +        UDPReceiver(); +        ~UDPReceiver(); +        UDPReceiver(const UDPReceiver&) = delete; +        UDPReceiver operator=(const UDPReceiver&) = delete; + +        // Start the receiver in a separate thread +        void start(int port, const std::string& bindto, const std::string& mcastaddr, size_t max_packets_queued); + +        // Get the data contained in a UDP packet, blocks if none available +        // In case of error, throws a runtime_error +        std::vector<uint8_t> get_packet_buffer(void); + +    private: +        void m_run(void); + +        int m_port = 0; +        std::string m_bindto; +        std::string m_mcastaddr; +        size_t m_max_packets_queued = 1; +        std::thread m_thread; +        std::atomic<bool> m_stop = ATOMIC_VAR_INIT(false); +        ThreadsafeQueue<UDPPacket> m_packets; +        UDPSocket m_sock; +}; + +class TCPSocket { +    public: +        TCPSocket(); +        ~TCPSocket(); +        TCPSocket(const TCPSocket& other) = delete; +        TCPSocket& operator=(const TCPSocket& other) = delete; +        TCPSocket(TCPSocket&& other); +        TCPSocket& operator=(TCPSocket&& other); + +        bool valid(void) const; +        void connect(const std::string& hostname, int port); +        void listen(int port, const std::string& name); +        void close(void); + +        /* throws a runtime_error on failure, an invalid socket on timeout */ +        TCPSocket accept(int timeout_ms); + +        /* returns -1 on error, doesn't work on nonblocking sockets */ +        ssize_t sendall(const void *buffer, size_t buflen); + +        /** Send data over the TCP connection. +         *  @param data The buffer that will be sent. +         *  @param size Number of bytes to send. +         *  @param timeout_ms number of milliseconds before timeout, or 0 for infinite timeout +         *  return number of bytes sent, 0 on timeout, or throws runtime_error. +         */ +        ssize_t send(const void* data, size_t size, int timeout_ms=0); + +        /* Returns number of bytes read, 0 on disconnect. Throws a +         * runtime_error on error */ +        ssize_t recv(void *buffer, size_t length, int flags); + +        class Timeout {}; +        class Interrupted {}; +        /* Returns number of bytes read, 0 on disconnect or refused connection. +         * Throws a Timeout on timeout, Interrupted on EINTR, a runtime_error +         * on error +         */ +        ssize_t recv(void *buffer, size_t length, int flags, int timeout_ms); + +    private: +        explicit TCPSocket(int sockfd); +        explicit TCPSocket(int sockfd, InetAddress remote_address); +        SOCKET m_sock = -1; + +        InetAddress m_remote_address; + +        friend class TCPClient; +}; + +/* Implements a TCP receiver that auto-reconnects on errors */ +class TCPClient { +    public: +        void connect(const std::string& hostname, int port); + +        /* Returns numer of bytes read, 0 on auto-reconnect, -1 +         * on interruption. +         * Throws a runtime_error on error */ +        ssize_t recv(void *buffer, size_t length, int flags, int timeout_ms); + +    private: +        void reconnect(void); +        TCPSocket m_sock; +        std::string m_hostname; +        int m_port; +}; + +/* Helper class for TCPDataDispatcher, contains a queue of pending data and + * a sender thread. */ +class TCPConnection +{ +    public: +        TCPConnection(TCPSocket&& sock); +        TCPConnection(const TCPConnection&) = delete; +        TCPConnection& operator=(const TCPConnection&) = delete; +        ~TCPConnection(); + +        ThreadsafeQueue<std::vector<uint8_t> > queue; + +    private: +        std::atomic<bool> m_running; +        std::thread m_sender_thread; +        TCPSocket m_sock; + +        void process(void); +}; + +/* Send a TCP stream to several destinations, and automatically disconnect destinations + * whose buffer overflows. + */ +class TCPDataDispatcher +{ +    public: +        TCPDataDispatcher(size_t max_queue_size); +        ~TCPDataDispatcher(); +        TCPDataDispatcher(const TCPDataDispatcher&) = delete; +        TCPDataDispatcher& operator=(const TCPDataDispatcher&) = delete; + +        void start(int port, const std::string& address); +        void write(const std::vector<uint8_t>& data); + +    private: +        void process(); + +        size_t m_max_queue_size; + +        std::atomic<bool> m_running; +        std::string m_exception_data; +        std::thread m_listener_thread; +        TCPSocket m_listener_socket; +        std::list<TCPConnection> m_connections; +}; + +/* A TCP Server to receive data, which abstracts the handling of connects and disconnects. + */ +class TCPReceiveServer { +    public: +        TCPReceiveServer(size_t blocksize); +        ~TCPReceiveServer(); +        TCPReceiveServer(const TCPReceiveServer&) = delete; +        TCPReceiveServer& operator=(const TCPReceiveServer&) = delete; + +        void start(int listen_port, const std::string& address); + +        // Return a vector that contains up to blocksize bytes of data, or +        // and empty vector if no data is available. +        std::vector<uint8_t> receive(); + +    private: +        void process(); + +        size_t m_blocksize = 0; +        ThreadsafeQueue<std::vector<uint8_t> > m_queue; +        std::atomic<bool> m_running; +        std::string m_exception_data; +        std::thread m_listener_thread; +        TCPSocket m_listener_socket; +}; + +} diff --git a/src/ThreadsafeQueue.h b/lib/ThreadsafeQueue.h index ab287b2..62f4c96 100644 --- a/src/ThreadsafeQueue.h +++ b/lib/ThreadsafeQueue.h @@ -12,20 +12,18 @@     element out.   */  /* -   This file is part of ODR-DabMux. +   This program is free software: you can redistribute it and/or modify +   it under the terms of the GNU General Public License as published by +   the Free Software Foundation, either version 3 of the License, or +   (at your option) any later version. -   ODR-DabMux is free software: you can redistribute it and/or modify -   it under the terms of the GNU General Public License as -   published by the Free Software Foundation, either version 3 of the -   License, or (at your option) any later version. - -   ODR-DabMux is distributed in the hope that it will be useful, +   This program is distributed in the hope that it will be useful,     but WITHOUT ANY WARRANTY; without even the implied warranty of     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the     GNU General Public License for more details.     You should have received a copy of the GNU General Public License -   along with ODR-DabMux.  If not, see <http://www.gnu.org/licenses/>. +   along with this program.  If not, see <https://www.gnu.org/licenses/>.   */  #pragma once diff --git a/lib/edi/PFT.cpp b/lib/edi/PFT.cpp new file mode 100644 index 0000000..aff7929 --- /dev/null +++ b/lib/edi/PFT.cpp @@ -0,0 +1,574 @@ +/* ------------------------------------------------------------------ + * Copyright (C) 2017 AVT GmbH - Fabien Vercasson + * Copyright (C) 2017 Matthias P. Braendli + *                    matthias.braendli@mpb.li + * + * http://opendigitalradio.org + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *    http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. + * See the License for the specific language governing permissions + * and limitations under the License. + * ------------------------------------------------------------------- + */ + +#include <stdio.h> +#include <cassert> +#include <cstring> +#include <sstream> +#include <stdexcept> +#include <algorithm> +#include "crc.h" +#include "PFT.hpp" +#include "Log.h" +#include "buffer_unpack.hpp" +extern "C" { +#include "fec/fec.h" +} + +namespace EdiDecoder { +namespace PFT { + +using namespace std; + +const findex_t NUM_AFBUILDERS_TO_KEEP = 10; + +static bool checkCRC(const uint8_t *buf, size_t size) +{ +    const uint16_t crc_from_packet = read_16b(buf + size - 2); +    uint16_t crc_calc = 0xffff; +    crc_calc = crc16(crc_calc, buf, size - 2); +    crc_calc ^= 0xffff; + +    return crc_from_packet == crc_calc; +} + +class FECDecoder { +    public: +        FECDecoder() { +            m_rs_handler = init_rs_char( +                    symsize, gfPoly, firstRoot, primElem, nroots, pad); +        } +        FECDecoder(const FECDecoder& other) = delete; +        FECDecoder& operator=(const FECDecoder& other) = delete; +        ~FECDecoder() { +            free_rs_char(m_rs_handler); +        } + +        // return -1 in case of failure, non-negative value if errors +        // were corrected. +        // Known positions of erasures should be given in eras_pos to +        // improve decoding probability. After calling this function +        // eras_pos will contain the positions of the corrected errors. +        int decode(vector<uint8_t> &data, vector<int> &eras_pos) { +            assert(data.size() == N); +            const size_t no_eras = eras_pos.size(); + +            eras_pos.resize(nroots); +            int num_err = decode_rs_char(m_rs_handler, data.data(), +                    eras_pos.data(), no_eras); +            if (num_err > 0) { +                eras_pos.resize(num_err); +            } +            return num_err; +        } + +        // return -1 in case of failure, non-negative value if errors +        // were corrected. No known erasures. +        int decode(vector<uint8_t> &data) { +            assert(data.size() == N); +            int num_err = decode_rs_char(m_rs_handler, data.data(), nullptr, 0); +            return num_err; +        } + +    private: +        void* m_rs_handler; + +        const int firstRoot = 1; // Discovered by analysing EDI dump +        const int gfPoly = 0x11d; + +        // The encoding has to be 255, 207 always, because the chunk has to +        // be padded at the end, and not at the beginning as libfec would +        // do +        const size_t N = 255; +        const size_t K = 207; +        const int primElem = 1; +        const int symsize = 8; +        const size_t nroots = N - K; // For EDI PFT, this must be 48 +        const size_t pad = ((1 << symsize) - 1) - N; // is 255-N + +}; + +size_t Fragment::loadData(const std::vector<uint8_t> &buf) +{ +    const size_t header_len = 14; +    if (buf.size() < header_len) { +        return 0; +    } + +    size_t index = 0; + +    // Parse PFT Fragment Header (ETSI TS 102 821 V1.4.1 ch7.1) +    if (not (buf[0] == 'P' and buf[1] == 'F') ) { +        throw invalid_argument("Invalid PFT SYNC bytes"); +    } +    index += 2; // Psync + +    _Pseq = read_16b(buf.begin()+index); index += 2; +    _Findex = read_24b(buf.begin()+index); index += 3; +    _Fcount = read_24b(buf.begin()+index); index += 3; +    _FEC = unpack1bit(buf[index], 0); +    _Addr = unpack1bit(buf[index], 1); +    _Plen = read_16b(buf.begin()+index) & 0x3FFF; index += 2; + +    const size_t required_len = header_len + +        (_FEC ? 1 : 0) + +        (_Addr ? 2 : 0) + +        2; // CRC +    if (buf.size() < required_len) { +        return 0; +    } + +    // Optional RS Header +    _RSk = 0; +    _RSz = 0; +    if (_FEC) { +        _RSk = buf[index]; index += 1; +        _RSz = buf[index]; index += 1; +    } + +    // Optional transport header +    _Source = 0; +    _Dest = 0; +    if (_Addr) { +        _Source = read_16b(buf.begin()+index); index += 2; +        _Dest = read_16b(buf.begin()+index); index += 2; +    } + +    index += 2; +    const bool crc_valid = checkCRC(buf.data(), index); +    const bool buf_has_enough_data = (buf.size() >= index + _Plen); + +    if (not buf_has_enough_data) { +        return 0; +    } + +    _valid = ((not _FEC) or crc_valid) and buf_has_enough_data; + +#if 0 +    if (!_valid) { +        stringstream ss; +        ss << "Invalid PF fragment: "; +        if (_FEC) { +            ss << " RSk=" << (uint32_t)_RSk << " RSz=" << (uint32_t)_RSz; +        } + +        if (_Addr) { +            ss << " Source=" << _Source << " Dest=" << _Dest; +        } +        etiLog.log(debug, "%s\n", ss.str().c_str()); +    } +#endif + +    _payload.clear(); +    if (_valid) { +        copy( buf.begin()+index, +                buf.begin()+index+_Plen, +                back_inserter(_payload)); +        index += _Plen; +    } + +    return index; +} + + +AFBuilder::AFBuilder(pseq_t Pseq, findex_t Fcount, size_t lifetime) +{ +    _Pseq = Pseq; +    _Fcount = Fcount; +    assert(lifetime > 0); +    lifeTime = lifetime; +} + +void AFBuilder::pushPFTFrag(const Fragment &frag) +{ +    if (_Pseq != frag.Pseq() or _Fcount != frag.Fcount()) { +        throw invalid_argument("Invalid PFT fragment Pseq or Fcount"); +    } +    const auto Findex = frag.Findex(); +    const bool fragment_already_received = _fragments.count(Findex); + +    if (not fragment_already_received) +    { +        _fragments[Findex] = frag; +    } +} + +bool Fragment::checkConsistency(const Fragment& other) const +{ +    /* Consistency check, TS 102 821 Clause 7.3.2. +     * +     * Every PFT Fragment produced from a single AF or RS Packet shall have +     * the same values in all of the PFT Header fields except for the Findex, +     * Plen and HCRC fields. +     */ + +    return other._Fcount == _Fcount and +        other._FEC == _FEC and +        other._RSk == _RSk and +        other._RSz == _RSz and +        other._Addr == _Addr and +        other._Source == _Source and +        other._Dest == _Dest and + +        /* The Plen field of all fragments shall be the s for the initial f-1 +         * fragments and s - (L%f) for the final fragment. +         * Note that when Reed Solomon has been used, all fragments will be of +         * length s. +         */ +        (_FEC ? other._Plen == _Plen : true); +} + + +AFBuilder::decode_attempt_result_t AFBuilder::canAttemptToDecode() const +{ +    if (_fragments.empty()) { +        return AFBuilder::decode_attempt_result_t::no; +    } + +    if (_fragments.size() == _Fcount) { +        return AFBuilder::decode_attempt_result_t::yes; +    } + +    /* Check that all fragments are consistent */ +    const Fragment& first = _fragments.begin()->second; +    if (not std::all_of(_fragments.begin(), _fragments.end(), +            [&](const pair<int, Fragment>& pair) { +                const Fragment& frag = pair.second; +                return first.checkConsistency(frag) and _Pseq == frag.Pseq(); +            }) ) { +        throw invalid_argument("Inconsistent PFT fragments"); +    } + +    // Calculate the minimum number of fragments necessary to apply FEC. +    // This can't be done with the last fragment that may have a +    // smaller size +    // ETSI TS 102 821 V1.4.1 ch 7.4.4 +    auto frag_it = _fragments.begin(); +    if (frag_it->second.Fcount() == _Fcount - 1) { +        frag_it++; + +        if (frag_it == _fragments.end()) { +            return AFBuilder::decode_attempt_result_t::no; +        } +    } + +    const Fragment& frag = frag_it->second; + +    if ( frag.FEC() ) +    { +        const uint16_t _Plen = frag.Plen(); + +        /* max number of RS chunks that may have been sent */ +        const uint32_t _cmax = (_Fcount*_Plen) / (frag.RSk()+48); +        assert(_cmax > 0); + +        /* Receiving _rxmin fragments does not guarantee that decoding +         * will succeed! */ +        const uint32_t _rxmin = _Fcount - (_cmax*48)/_Plen; + +        if (_fragments.size() >= _rxmin) { +            return AFBuilder::decode_attempt_result_t::maybe; +        } +    } + +    return AFBuilder::decode_attempt_result_t::no; +} + +std::vector<uint8_t> AFBuilder::extractAF() const +{ +    if (not _af_packet.empty()) { +        return _af_packet; +    } + +    bool ok = false; + +    if (canAttemptToDecode() != AFBuilder::decode_attempt_result_t::no) { + +        auto frag_it = _fragments.begin(); +        if (frag_it->second.Fcount() == _Fcount - 1) { +            frag_it++; + +            if (frag_it == _fragments.end()) { +                throw std::runtime_error("Invalid attempt at extracting AF"); +            } +        } + +        const Fragment& ref_frag = frag_it->second; +        const auto RSk = ref_frag.RSk(); +        const auto RSz = ref_frag.RSz(); +        const auto Plen = ref_frag.Plen(); + +        if ( ref_frag.FEC() ) +        { +            const uint32_t cmax = (_Fcount*Plen) / (RSk+48); + +            // Keep track of erasures (missing fragments) for +            // every chunk +            map<int, vector<int> > erasures; + + +            // Assemble fragments into a RS block, immediately +            // deinterleaving it. +            vector<uint8_t> rs_block(Plen * _Fcount); +            for (size_t j = 0; j < _Fcount; j++) { +                const bool fragment_present = _fragments.count(j); +                if (fragment_present) { +                    const auto& fragment = _fragments.at(j).payload(); + +                    if (j != _Fcount - 1 and fragment.size() != Plen) { +                        throw runtime_error("Incorrect fragment length " + +                                to_string(fragment.size()) + " " + +                                to_string(Plen)); +                    } + +                    if (j == _Fcount - 1 and fragment.size() > Plen) { +                        throw runtime_error("Incorrect last fragment length " + +                                to_string(fragment.size()) + " " + +                                to_string(Plen)); +                    } + +                    size_t k = 0; +                    for (; k < fragment.size(); k++) { +                        rs_block[k * _Fcount + j] = fragment[k]; +                    } + +                    for (; k < Plen; k++) { +                        rs_block[k * _Fcount + j] = 0x00; +                    } +                } +                else { +                    // fill with zeros if fragment is missing +                    for (size_t k = 0; k < Plen; k++) { +                        rs_block[k * _Fcount + j] = 0x00; + +                        const size_t chunk_ix = (k * _Fcount + j) / (RSk + 48); +                        const size_t chunk_offset = (k * _Fcount + j) % (RSk + 48); +                        erasures[chunk_ix].push_back(chunk_offset); +                    } +                } +            } + +            // The RS block is a concatenation of chunks of RSk bytes + 48 parity +            // followed by RSz padding + +            FECDecoder fec; +            for (size_t i = 0; i < cmax; i++) { +                // We need to pad the chunk ourself +                vector<uint8_t> chunk(255); +                const auto& block_begin = rs_block.begin() + (RSk + 48) * i; +                copy(block_begin, block_begin + RSk, chunk.begin()); +                // bytes between RSk and 207 are 0x00 already +                copy(block_begin + RSk, block_begin + RSk + 48, +                        chunk.begin() + 207); + +                int errors_corrected = -1; +                if (erasures.count(i)) { +                    errors_corrected = fec.decode(chunk, erasures[i]); +                } +                else { +                    errors_corrected = fec.decode(chunk); +                } + +                if (errors_corrected == -1) { +                    _af_packet.clear(); +                    return {}; +                } + +#if 0 +                if (errors_corrected > 0) { +                    etiLog.log(debug, "Corrected %d errors at ", errors_corrected); +                    for (const auto &index : erasures[i]) { +                        etiLog.log(debug, " %d", index); +                    } +                    etiLog.log(debug, "\n"); +                } +#endif + +                _af_packet.insert(_af_packet.end(), chunk.begin(), chunk.begin() + RSk); +            } + +            _af_packet.resize(_af_packet.size() - RSz); +        } +        else { +            // No FEC: just assemble fragments + +            for (size_t j = 0; j < _Fcount; ++j) { +                const bool fragment_present = _fragments.count(j); +                if (fragment_present) +                { +                    const auto& fragment = _fragments.at(j); + +                    _af_packet.insert(_af_packet.end(), +                       fragment.payload().begin(), +                       fragment.payload().end()); +                } +                else { +                    throw logic_error("Missing fragment"); +                } +            } +        } + +        // EDI specific, must have a CRC. +        if( _af_packet.size() >= 12 ) { +            ok = checkCRC(_af_packet.data(), _af_packet.size()); + +            if (not ok) { +                etiLog.log(debug, "Too many errors to reconstruct AF from %zu/%u" +                        " PFT fragments\n", _fragments.size(), _Fcount); +            } +        } +    } + +    if (not ok) { +        _af_packet.clear(); +    } + +    return _af_packet; +} + +std::string AFBuilder::visualise() const +{ +    stringstream ss; +    ss << "|"; +    for (size_t i = 0; i < _Fcount; i++) { +        if (_fragments.count(i)) { +            ss << "."; +        } +        else { +            ss << " "; +        } +    } +    ss << "| " << AFBuilder::dar_to_string(canAttemptToDecode()) << " " << lifeTime; +    return ss.str(); +} + +void PFT::pushPFTFrag(const Fragment &fragment) +{ +    // Start decoding the first pseq we receive. In normal +    // operation without interruptions, the map should +    // never become empty +    if (m_afbuilders.empty()) { +        m_next_pseq = fragment.Pseq(); +        etiLog.log(debug,"Initialise next_pseq to %u\n", m_next_pseq); +    } + +    if (m_afbuilders.count(fragment.Pseq()) == 0) { +        // The AFBuilder wants to know the lifetime in number of fragments, +        // we know the delay in number of AF packets. Every AF packet +        // is cut into Fcount fragments. +        const size_t lifetime = fragment.Fcount() * m_max_delay; + +        // Build the afbuilder in the map in-place +        m_afbuilders.emplace(std::piecewise_construct, +                /* key */ +                std::forward_as_tuple(fragment.Pseq()), +                /* builder */ +                std::forward_as_tuple(fragment.Pseq(), fragment.Fcount(), lifetime)); +    } + +    auto& p = m_afbuilders.at(fragment.Pseq()); +    p.pushPFTFrag(fragment); + +    if (m_verbose) { +        etiLog.log(debug, "Got frag %u:%u, afbuilders: ", +                fragment.Pseq(), fragment.Findex()); +        for (const auto &k : m_afbuilders) { +            const bool isNextPseq = (m_next_pseq == k.first); +            etiLog.level(debug) << (isNextPseq ? "->" : "  ") << +                k.first << " " << k.second.visualise(); +        } +    } +} + + +std::vector<uint8_t> PFT::getNextAFPacket() +{ +    if (m_afbuilders.count(m_next_pseq) == 0) { +        if (m_afbuilders.size() > m_max_delay) { +            m_afbuilders.clear(); +            etiLog.level(debug) << " Reinit"; +        } + +        return {}; +    } + +    auto &builder = m_afbuilders.at(m_next_pseq); + +    using dar_t = AFBuilder::decode_attempt_result_t; + +    if (builder.canAttemptToDecode() == dar_t::yes) { +        auto afpacket = builder.extractAF(); +        assert(not afpacket.empty()); +        incrementNextPseq(); +        return afpacket; +    } +    else if (builder.canAttemptToDecode() == dar_t::maybe) { +        if (builder.lifeTime > 0) { +            builder.lifeTime--; +        } + +        if (builder.lifeTime == 0) { +            // Attempt Reed-Solomon decoding +            auto afpacket = builder.extractAF(); + +            if (afpacket.empty()) { +                etiLog.log(debug,"pseq %d timed out after RS", m_next_pseq); +            } +            incrementNextPseq(); +            return afpacket; +        } +    } +    else { +        if (builder.lifeTime > 0) { +            builder.lifeTime--; +        } + +        if (builder.lifeTime == 0) { +            etiLog.log(debug, "pseq %d timed out\n", m_next_pseq); +            incrementNextPseq(); +        } +    } + +    return {}; +} + +void PFT::setMaxDelay(size_t num_af_packets) +{ +    m_max_delay = num_af_packets; +} + +void PFT::setVerbose(bool enable) +{ +    m_verbose = enable; +} + +void PFT::incrementNextPseq() +{ +    if (m_afbuilders.count(m_next_pseq - NUM_AFBUILDERS_TO_KEEP) > 0) { +        m_afbuilders.erase(m_next_pseq - NUM_AFBUILDERS_TO_KEEP); +    } + +    m_next_pseq++; +} + +} +} diff --git a/lib/edi/PFT.hpp b/lib/edi/PFT.hpp new file mode 100644 index 0000000..779509b --- /dev/null +++ b/lib/edi/PFT.hpp @@ -0,0 +1,166 @@ +/* ------------------------------------------------------------------ + * Copyright (C) 2017 AVT GmbH - Fabien Vercasson + * Copyright (C) 2017 Matthias P. Braendli + *                    matthias.braendli@mpb.li + * + * http://opendigitalradio.org + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *    http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. + * See the License for the specific language governing permissions + * and limitations under the License. + * ------------------------------------------------------------------- + */ + +#pragma once +#include <stdio.h> +#include <vector> +#include <map> +#include <stdint.h> + +namespace EdiDecoder { +namespace PFT { + +using pseq_t = uint16_t; +using findex_t = uint32_t; // findex is a 24-bit value + +class Fragment +{ +    public: +        // Load the data for one fragment from buf into +        // the Fragment. +        // \returns the number of bytes of useful data found in buf +        // A non-zero return value doesn't imply a valid fragment +        // the isValid() method must be used to verify this. +        size_t loadData(const std::vector<uint8_t> &buf); + +        bool isValid() const { return _valid; } +        pseq_t Pseq() const { return _Pseq; } +        findex_t Findex() const { return _Findex; } +        findex_t Fcount() const { return _Fcount; } +        bool FEC() const { return _FEC; } +        uint16_t Plen() const { return _Plen; } +        uint8_t RSk() const { return _RSk; } +        uint8_t RSz() const { return _RSz; } +        const std::vector<uint8_t>& payload() const +            { return _payload; } + +        bool checkConsistency(const Fragment& other) const; + +    private: +        std::vector<uint8_t> _payload; + +        pseq_t _Pseq = 0; +        findex_t _Findex = 0; +        findex_t _Fcount = 0; +        bool _FEC = false; +        bool _Addr = false; +        uint16_t _Plen = 0; +        uint8_t _RSk = 0; +        uint8_t _RSz = 0; +        uint16_t _Source = 0; +        uint16_t _Dest = 0; +        bool _valid = false; +}; + +/* The AFBuilder collects Fragments and builds an Application Frame + * out of them. It does error correction if necessary + */ +class AFBuilder +{ +    public: +        enum class decode_attempt_result_t { +            yes,   // The AF packet can be build because all fragments are present +            maybe, // RS decoding may correctly decode the AF packet +            no,    // Not enough fragments present to permit RS +        }; + +        static std::string dar_to_string(decode_attempt_result_t dar) { +            switch (dar) { +                case decode_attempt_result_t::yes: return "y"; +                case decode_attempt_result_t::no: return "n"; +                case decode_attempt_result_t::maybe: return "m"; +            } +            return "?"; +        } + +        AFBuilder(pseq_t Pseq, findex_t Fcount, size_t lifetime); + +        void pushPFTFrag(const Fragment &frag); + +        /* Assess if it may be possible to decode this AF packet */ +        decode_attempt_result_t canAttemptToDecode() const; + +        /* Try to build the AF with received fragments. +         * Apply error correction if necessary (missing packets/CRC errors) +         * \return an empty vector if building the AF is not possible +         */ +        std::vector<uint8_t> extractAF(void) const; + +        std::pair<findex_t, findex_t> +            numberOfFragments(void) const { +                return {_fragments.size(), _Fcount}; +            } + +        std::string visualise(void) const; + +        /* The user of this instance can keep track of the lifetime of this +         * builder +         */ +        size_t lifeTime; + +    private: + +        // A map from fragment index to fragment +        std::map<findex_t, Fragment> _fragments; + +        // cached version of decoded AF packet +        mutable std::vector<uint8_t> _af_packet; + +        pseq_t _Pseq; +        findex_t _Fcount; +}; + +class PFT +{ +    public: +        void pushPFTFrag(const Fragment &fragment); + +        /* Try to build the AF packet for the next pseq. This might +         * skip one or more pseq according to the maximum delay setting. +         * +         * \return an empty vector if building the AF is not possible +         */ +        std::vector<uint8_t> getNextAFPacket(void); + +        /* Set the maximum delay in number of AF Packets before we +         * abandon decoding a given pseq. +         */ +        void setMaxDelay(size_t num_af_packets); + +        /* Enable verbose fprintf */ +        void setVerbose(bool enable); + +    private: +        void incrementNextPseq(void); + +        pseq_t m_next_pseq; +        size_t m_max_delay = 10; // in AF packets + +        // Keep one AFBuilder for each Pseq +        std::map<pseq_t, AFBuilder> m_afbuilders; + +        bool m_verbose = 0; +}; + +} + +} diff --git a/lib/edi/STIDecoder.cpp b/lib/edi/STIDecoder.cpp new file mode 100644 index 0000000..ca8cead --- /dev/null +++ b/lib/edi/STIDecoder.cpp @@ -0,0 +1,191 @@ +/* +   Copyright (C) 2019 +   Matthias P. Braendli, matthias.braendli@mpb.li + +   http://opendigitalradio.org + +    This program is free software; you can redistribute it and/or modify +    it under the terms of the GNU General Public License as published by +    the Free Software Foundation; either version 2 of the License, or +    (at your option) any later version. + +    This program is distributed in the hope that it will be useful, +    but WITHOUT ANY WARRANTY; without even the implied warranty of +    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +    GNU General Public License for more details. + +    You should have received a copy of the GNU General Public License along +    with this program; if not, write to the Free Software Foundation, Inc., +    51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ +#include "STIDecoder.hpp" +#include "buffer_unpack.hpp" +#include "crc.h" +#include "Log.h" +#include <stdio.h> +#include <cassert> +#include <sstream> + +namespace EdiDecoder { + +using namespace std; + +STIDecoder::STIDecoder(STIDataCollector& data_collector, bool verbose) : +    m_data_collector(data_collector), +    m_dispatcher(std::bind(&STIDecoder::packet_completed, this), verbose) +{ +    using std::placeholders::_1; +    using std::placeholders::_2; +    m_dispatcher.register_tag("*ptr", +            std::bind(&STIDecoder::decode_starptr, this, _1, _2)); +    m_dispatcher.register_tag("dsti", +            std::bind(&STIDecoder::decode_dsti, this, _1, _2)); +    m_dispatcher.register_tag("ss", +            std::bind(&STIDecoder::decode_ssn, this, _1, _2)); +    m_dispatcher.register_tag("*dmy", +            std::bind(&STIDecoder::decode_stardmy, this, _1, _2)); +} + +void STIDecoder::push_bytes(const vector<uint8_t> &buf) +{ +    m_dispatcher.push_bytes(buf); +} + +void STIDecoder::push_packet(const vector<uint8_t> &buf) +{ +    m_dispatcher.push_packet(buf); +} + +void STIDecoder::setMaxDelay(int num_af_packets) +{ +    m_dispatcher.setMaxDelay(num_af_packets); +} + +#define AFPACKET_HEADER_LEN 10 // includes SYNC + +bool STIDecoder::decode_starptr(const vector<uint8_t> &value, uint16_t) +{ +    if (value.size() != 0x40 / 8) { +        etiLog.log(warn, "Incorrect length %02lx for *PTR", value.size()); +        return false; +    } + +    char protocol_sz[5]; +    protocol_sz[4] = '\0'; +    copy(value.begin(), value.begin() + 4, protocol_sz); +    string protocol(protocol_sz); + +    uint16_t major = read_16b(value.begin() + 4); +    uint16_t minor = read_16b(value.begin() + 6); + +    m_data_collector.update_protocol(protocol, major, minor); + +    return true; +} + +bool STIDecoder::decode_dsti(const vector<uint8_t> &value, uint16_t) +{ +    size_t offset = 0; + +    const uint16_t dstiHeader = read_16b(value.begin() + offset); +    offset += 2; + +    sti_management_data md; + +    md.stihf = (dstiHeader >> 15) & 0x1; +    md.atstf = (dstiHeader >> 14) & 0x1; +    md.rfadf = (dstiHeader >> 13) & 0x1; +    uint8_t dfcth = (dstiHeader >> 8) & 0x1F; +    uint8_t dfctl = dstiHeader & 0xFF; + +    md.dflc = dfcth * 250 + dfctl; // modulo 5000 counter + +    const size_t expected_length = 2 + +        (md.stihf ? 3 : 0) + +        (md.atstf ? 1 + 4 + 3 : 0) + +        (md.rfadf ? 9 : 0); + +    if (value.size() != expected_length) { +        throw std::logic_error("EDI dsti: Assertion error:" +                "value.size() != expected_length: " + +               to_string(value.size()) + " " + +               to_string(expected_length)); +    } + +    if (md.stihf) { +        const uint8_t stat = value[offset++]; +        const uint16_t spid = read_16b(value.begin() + offset); +        m_data_collector.update_stat(stat, spid); +        offset += 2; +    } + +    if (md.atstf) { +        uint8_t utco = value[offset]; +        offset++; + +        uint32_t seconds = read_32b(value.begin() + offset); +        offset += 4; + +        m_data_collector.update_edi_time(utco, seconds); + +        md.tsta = read_24b(value.begin() + offset); +        offset += 3; +    } +    else { +        // Null timestamp, ETSI ETS 300 799, C.2.2 +        md.tsta = 0xFFFFFF; +    } + + +    if (md.rfadf) { +        std::array<uint8_t, 9> rfad; +        copy(value.cbegin() + offset, +             value.cbegin() + offset + 9, +             rfad.begin()); +        offset += 9; + +        m_data_collector.update_rfad(rfad); +    } + +    m_data_collector.update_sti_management(md); + +    return true; +} + +bool STIDecoder::decode_ssn(const vector<uint8_t> &value, uint16_t n) +{ +    sti_payload_data sti; + +    sti.stream_index = n - 1; // n is 1-indexed +    sti.rfa = value[0] >> 3; +    sti.tid = value[0] & 0x07; + +    uint16_t istc = read_24b(value.begin() + 1); +    sti.tidext = istc >> 13; +    sti.crcstf = (istc >> 12) & 0x1; +    sti.stid = istc & 0xFFF; + +    if (sti.rfa != 0) { +        etiLog.level(warn) << "EDI: rfa field in SSnn tag non-null"; +    } + +    copy(   value.cbegin() + 3, +            value.cend(), +            back_inserter(sti.istd)); + +    m_data_collector.add_payload(move(sti)); + +    return true; +} + +bool STIDecoder::decode_stardmy(const vector<uint8_t>& /*value*/, uint16_t) +{ +    return true; +} + +void STIDecoder::packet_completed() +{ +    m_data_collector.assemble(); +} + +} diff --git a/lib/edi/STIDecoder.hpp b/lib/edi/STIDecoder.hpp new file mode 100644 index 0000000..201a176 --- /dev/null +++ b/lib/edi/STIDecoder.hpp @@ -0,0 +1,122 @@ +/* +   Copyright (C) 2019 +   Matthias P. Braendli, matthias.braendli@mpb.li + +   http://opendigitalradio.org + +    This program is free software; you can redistribute it and/or modify +    it under the terms of the GNU General Public License as published by +    the Free Software Foundation; either version 2 of the License, or +    (at your option) any later version. + +    This program is distributed in the hope that it will be useful, +    but WITHOUT ANY WARRANTY; without even the implied warranty of +    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +    GNU General Public License for more details. + +    You should have received a copy of the GNU General Public License along +    with this program; if not, write to the Free Software Foundation, Inc., +    51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ +#pragma once + +#include "common.hpp" +#include <cstdint> +#include <deque> +#include <string> +#include <vector> + +namespace EdiDecoder { + +// Information for STI-D Management +struct sti_management_data { +    bool stihf; +    bool atstf; +    bool rfadf; +    uint16_t dflc; + +    uint32_t tsta; +}; + +// Information for a subchannel available in EDI +struct sti_payload_data { +    uint16_t stream_index; +    uint8_t rfa; +    uint8_t tid; +    uint8_t tidext; +    bool crcstf; +    uint16_t stid; +    std::vector<uint8_t> istd; + +    // Return the length of ISTD in bytes +    uint16_t stl(void) const { return istd.size(); } +}; + +/* A class that receives STI data must implement the interface described + * in the STIDataCollector. This can be e.g. a converter to ETI, or something that + * prepares data structures for a modulator. + */ +class STIDataCollector { +    public: +        // Tell the ETIWriter what EDI protocol we receive in *ptr. +        // This is not part of the ETI data, but is used as check +        virtual void update_protocol( +                const std::string& proto, +                uint16_t major, +                uint16_t minor) = 0; + +        // STAT error field and service provider ID +        virtual void update_stat(uint8_t stat, uint16_t spid) = 0; + +        // In addition to TSTA in ETI, EDI also transports more time +        // stamp information. +        virtual void update_edi_time(uint32_t utco, uint32_t seconds) = 0; + +        virtual void update_rfad(std::array<uint8_t, 9> rfad) = 0; +        virtual void update_sti_management(const sti_management_data& data) = 0; + +        virtual void add_payload(sti_payload_data&& payload) = 0; + +        virtual void assemble() = 0; +}; + +/* The STIDecoder takes care of decoding the EDI TAGs related to the transport + * of ETI(NI) data inside AF and PF packets. + * + * PF packets are handed over to the PFT decoder, which will in turn return + * AF packets. AF packets are directly handled (TAG extraction) here. + */ +class STIDecoder { +    public: +        STIDecoder(STIDataCollector& data_collector, bool verbose); + +        /* Push bytes into the decoder. The buf can contain more +         * than a single packet. This is useful when reading from streams +         * (files, TCP) +         */ +        void push_bytes(const std::vector<uint8_t> &buf); + +        /* Push a complete packet into the decoder. Useful for UDP and other +         * datagram-oriented protocols. +         */ +        void push_packet(const std::vector<uint8_t> &buf); + +        /* Set the maximum delay in number of AF Packets before we +         * abandon decoding a given pseq. +         */ +        void setMaxDelay(int num_af_packets); + +    private: +        bool decode_starptr(const std::vector<uint8_t> &value, uint16_t); +        bool decode_dsti(const std::vector<uint8_t> &value, uint16_t); +        bool decode_ssn(const std::vector<uint8_t> &value, uint16_t n); +        bool decode_stardmy(const std::vector<uint8_t> &value, uint16_t); + +        void packet_completed(); + +        STIDataCollector& m_data_collector; +        TagDispatcher m_dispatcher; + +}; + +} diff --git a/lib/edi/STIWriter.cpp b/lib/edi/STIWriter.cpp new file mode 100644 index 0000000..6964eb1 --- /dev/null +++ b/lib/edi/STIWriter.cpp @@ -0,0 +1,138 @@ +/* +   Copyright (C) 2019 +   Matthias P. Braendli, matthias.braendli@mpb.li + +   http://opendigitalradio.org + +    This program is free software; you can redistribute it and/or modify +    it under the terms of the GNU General Public License as published by +    the Free Software Foundation; either version 2 of the License, or +    (at your option) any later version. + +    This program is distributed in the hope that it will be useful, +    but WITHOUT ANY WARRANTY; without even the implied warranty of +    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +    GNU General Public License for more details. + +    You should have received a copy of the GNU General Public License along +    with this program; if not, write to the Free Software Foundation, Inc., +    51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ +#include "STIWriter.hpp" +#include "crc.h" +#include "Log.h" +#include <cstdio> +#include <cassert> +#include <stdexcept> +#include <sstream> +#include <ctime> +#include <iostream> +#include <iomanip> +#include <sstream> + +namespace EdiDecoder { + +using namespace std; + +void STIWriter::update_protocol( +        const std::string& proto, +        uint16_t major, +        uint16_t minor) +{ +    m_proto_valid = (proto == "DSTI" and major == 0 and minor == 0); + +    if (not m_proto_valid) { +        throw std::invalid_argument("Wrong EDI protocol"); +    } +} + +void STIWriter::reinit() +{ +    m_proto_valid = false; +    m_management_data_valid = false; +    m_stat_valid = false; +    m_time_valid = false; +    m_payload_valid = false; +    m_stiFrame.frame.clear(); +} + +void STIWriter::update_stat(uint8_t stat, uint16_t spid) +{ +    m_stat = stat; +    m_spid = spid; +    m_stat_valid = true; + +    if (m_stat != 0xFF) { +        etiLog.log(warn, "STI errorlevel %02x", m_stat); +    } +} + +void STIWriter::update_rfad(std::array<uint8_t, 9> rfad) +{ +    (void)rfad; +} + +void STIWriter::update_sti_management(const sti_management_data& data) +{ +    m_management_data = data; +    m_management_data_valid = true; +} + +void STIWriter::add_payload(sti_payload_data&& payload) +{ +    m_payload = move(payload); +    m_payload_valid = true; +} + +void STIWriter::update_edi_time( +        uint32_t utco, +        uint32_t seconds) +{ +    if (not m_proto_valid) { +        throw std::logic_error("Cannot update time before protocol"); +    } + +    m_utco = utco; +    m_seconds = seconds; + +    // TODO check validity +    m_time_valid = true; +} + + +void STIWriter::assemble() +{ +    if (not m_proto_valid) { +        throw std::logic_error("Cannot assemble STI before protocol"); +    } + +    if (not m_management_data_valid) { +        throw std::logic_error("Cannot assemble STI before management data"); +    } + +    if (not m_payload_valid) { +        throw std::logic_error("Cannot assemble STI without frame data"); +    } + +    // TODO check time validity + +    // Do copies so as to preserve existing payload data +    m_stiFrame.frame = m_payload.istd; +    m_stiFrame.timestamp.seconds = m_seconds; +    m_stiFrame.timestamp.utco = m_utco; +} + +sti_frame_t STIWriter::getFrame() +{ +    if (m_stiFrame.frame.empty()) { +        return {}; +    } + +    sti_frame_t sti; +    swap(sti, m_stiFrame); +    reinit(); +    return sti; +} + +} + diff --git a/lib/edi/STIWriter.hpp b/lib/edi/STIWriter.hpp new file mode 100644 index 0000000..a75cb69 --- /dev/null +++ b/lib/edi/STIWriter.hpp @@ -0,0 +1,84 @@ +/* +   Copyright (C) 2019 +   Matthias P. Braendli, matthias.braendli@mpb.li + +   http://opendigitalradio.org + +    This program is free software; you can redistribute it and/or modify +    it under the terms of the GNU General Public License as published by +    the Free Software Foundation; either version 2 of the License, or +    (at your option) any later version. + +    This program is distributed in the hope that it will be useful, +    but WITHOUT ANY WARRANTY; without even the implied warranty of +    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +    GNU General Public License for more details. + +    You should have received a copy of the GNU General Public License along +    with this program; if not, write to the Free Software Foundation, Inc., +    51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ +#pragma once + +#include "common.hpp" +#include "STIDecoder.hpp" +#include <cstdint> +#include <string> +#include <vector> +#include <list> + +namespace EdiDecoder { + +struct sti_frame_t { +    std::vector<uint8_t> frame; +    frame_timestamp_t timestamp; +}; + +class STIWriter : public STIDataCollector { +    public: +        // Tell the ETIWriter what EDI protocol we receive in *ptr. +        // This is not part of the ETI data, but is used as check +        virtual void update_protocol( +                const std::string& proto, +                uint16_t major, +                uint16_t minor); + +        virtual void update_stat(uint8_t stat, uint16_t spid); + +        virtual void update_edi_time( +                uint32_t utco, +                uint32_t seconds); + +        virtual void update_rfad(std::array<uint8_t, 9> rfad); +        virtual void update_sti_management(const sti_management_data& data); +        virtual void add_payload(sti_payload_data&& payload); + +        virtual void assemble(void); + +        // Return the assembled frame or an empty frame if not ready +        sti_frame_t getFrame(); + +    private: +        void reinit(void); + +        bool m_proto_valid = false; + +        bool m_management_data_valid = false; +        sti_management_data m_management_data; + +        bool m_stat_valid = false; +        uint8_t m_stat = 0; +        uint16_t m_spid = 0; + +        bool m_time_valid = false; +        uint32_t m_utco = 0; +        uint32_t m_seconds = 0; + +        bool m_payload_valid = false; +        sti_payload_data m_payload; + +        sti_frame_t m_stiFrame; +}; + +} + diff --git a/lib/edi/buffer_unpack.hpp b/lib/edi/buffer_unpack.hpp new file mode 100644 index 0000000..05a1534 --- /dev/null +++ b/lib/edi/buffer_unpack.hpp @@ -0,0 +1,62 @@ +/* +   Copyright (C) 2016 +   Matthias P. Braendli, matthias.braendli@mpb.li + +   http://opendigitalradio.org + +    This program is free software; you can redistribute it and/or modify +    it under the terms of the GNU General Public License as published by +    the Free Software Foundation; either version 2 of the License, or +    (at your option) any later version. + +    This program is distributed in the hope that it will be useful, +    but WITHOUT ANY WARRANTY; without even the implied warranty of +    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +    GNU General Public License for more details. + +    You should have received a copy of the GNU General Public License along +    with this program; if not, write to the Free Software Foundation, Inc., +    51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#pragma once +#include <stdint.h> + +namespace EdiDecoder { + +template<class T> +uint16_t read_16b(T buf) +{ +    uint16_t value = 0; +    value = (uint16_t)(buf[0]) << 8; +    value |= (uint16_t)(buf[1]); +    return value; +} + +template<class T> +uint32_t read_24b(T buf) +{ +    uint32_t value = 0; +    value = (uint32_t)(buf[0]) << 16; +    value |= (uint32_t)(buf[1]) << 8; +    value |= (uint32_t)(buf[2]); +    return value; +} + +template<class T> +uint32_t read_32b(T buf) +{ +    uint32_t value = 0; +    value = (uint32_t)(buf[0]) << 24; +    value |= (uint32_t)(buf[1]) << 16; +    value |= (uint32_t)(buf[2]) << 8; +    value |= (uint32_t)(buf[3]); +    return value; +} + +inline uint32_t unpack1bit(uint8_t byte, int bitpos) +{ +    return (byte & 1 << (7-bitpos)) > (7-bitpos); +} + +} diff --git a/lib/edi/common.cpp b/lib/edi/common.cpp new file mode 100644 index 0000000..bc0fa1b --- /dev/null +++ b/lib/edi/common.cpp @@ -0,0 +1,300 @@ +/* +   Copyright (C) 2019 +   Matthias P. Braendli, matthias.braendli@mpb.li + +   http://opendigitalradio.org + +    This program is free software; you can redistribute it and/or modify +    it under the terms of the GNU General Public License as published by +    the Free Software Foundation; either version 2 of the License, or +    (at your option) any later version. + +    This program is distributed in the hope that it will be useful, +    but WITHOUT ANY WARRANTY; without even the implied warranty of +    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +    GNU General Public License for more details. + +    You should have received a copy of the GNU General Public License along +    with this program; if not, write to the Free Software Foundation, Inc., +    51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ +#include "common.hpp" +#include "buffer_unpack.hpp" +#include "Log.h" +#include "crc.h" +#include <iomanip> +#include <sstream> +#include <cassert> +#include <cstdio> + +namespace EdiDecoder { + +using namespace std; + +string frame_timestamp_t::to_string() const +{ +    const time_t seconds_in_unix_epoch = to_unix_epoch(); + +    stringstream ss; +    ss << "Timestamp: " << std::put_time(std::gmtime(&seconds_in_unix_epoch), "%c %Z"); +    return ss.str(); +} + +time_t frame_timestamp_t::to_unix_epoch() const +{ +    // EDI epoch: 2000-01-01T00:00:00Z +    // Convert using +    // TZ=UTC python -c 'import datetime; print(datetime.datetime(2000,1,1,0,0,0,0).strftime("%s"))' +    return 946684800 + seconds - utco; +} + + +TagDispatcher::TagDispatcher( +        std::function<void()>&& af_packet_completed, bool verbose) : +    m_af_packet_completed(move(af_packet_completed)) +{ +    m_pft.setVerbose(verbose); +} + +void TagDispatcher::push_bytes(const vector<uint8_t> &buf) +{ +    copy(buf.begin(), buf.end(), back_inserter(m_input_data)); + +    while (m_input_data.size() > 2) { +        if (m_input_data[0] == 'A' and m_input_data[1] == 'F') { +            const decode_state_t st = decode_afpacket(m_input_data); + +            if (st.num_bytes_consumed == 0 and not st.complete) { +                // We need to refill our buffer +                break; +            } + +            if (st.num_bytes_consumed) { +                vector<uint8_t> remaining_data; +                copy(m_input_data.begin() + st.num_bytes_consumed, +                        m_input_data.end(), +                        back_inserter(remaining_data)); +                m_input_data = remaining_data; +            } + +            if (st.complete) { +                m_af_packet_completed(); +            } +        } +        else if (m_input_data[0] == 'P' and m_input_data[1] == 'F') { +            PFT::Fragment fragment; +            const size_t fragment_bytes = fragment.loadData(m_input_data); + +            if (fragment_bytes == 0) { +                // We need to refill our buffer +                break; +            } + +            vector<uint8_t> remaining_data; +            copy(m_input_data.begin() + fragment_bytes, +                    m_input_data.end(), +                    back_inserter(remaining_data)); +            m_input_data = remaining_data; + +            if (fragment.isValid()) { +                m_pft.pushPFTFrag(fragment); +            } + +            auto af = m_pft.getNextAFPacket(); +            if (not af.empty()) { +                decode_state_t st = decode_afpacket(af); + +                if (st.complete) { +                    m_af_packet_completed(); +                } +            } +        } +        else { +            etiLog.log(warn,"Unknown %c!", *m_input_data.data()); +            m_input_data.erase(m_input_data.begin()); +        } +    } +} + +void TagDispatcher::push_packet(const vector<uint8_t> &buf) +{ +    if (buf.size() < 2) { +        throw std::invalid_argument("Not enough bytes to read EDI packet header"); +    } + +    if (buf[0] == 'A' and buf[1] == 'F') { +        const decode_state_t st = decode_afpacket(buf); + +        if (st.complete) { +            m_af_packet_completed(); +        } + +    } +    else if (buf[0] == 'P' and buf[1] == 'F') { +        PFT::Fragment fragment; +        fragment.loadData(buf); + +        if (fragment.isValid()) { +            m_pft.pushPFTFrag(fragment); +        } + +        auto af = m_pft.getNextAFPacket(); +        if (not af.empty()) { +            const decode_state_t st = decode_afpacket(af); + +            if (st.complete) { +                m_af_packet_completed(); +            } +        } +    } +    else { +        const char packettype[3] = {(char)buf[0], (char)buf[1], '\0'}; +        std::stringstream ss; +        ss << "Unknown EDI packet "; +        ss << packettype; +        throw std::invalid_argument(ss.str()); +    } +} + +void TagDispatcher::setMaxDelay(int num_af_packets) +{ +    m_pft.setMaxDelay(num_af_packets); +} + + +#define AFPACKET_HEADER_LEN 10 // includes SYNC +decode_state_t TagDispatcher::decode_afpacket( +        const std::vector<uint8_t> &input_data) +{ +    if (input_data.size() < AFPACKET_HEADER_LEN) { +        return {false, 0}; +    } + +    // read length from packet +    uint32_t taglength = read_32b(input_data.begin() + 2); +    uint16_t seq = read_16b(input_data.begin() + 6); + +    const size_t crclength = 2; +    if (input_data.size() < AFPACKET_HEADER_LEN + taglength + crclength) { +        return {false, 0}; +    } + +    if (m_last_seq + 1 != seq) { +        etiLog.level(warn) << "EDI AF Packet sequence error, " << seq; +    } +    m_last_seq = seq; + +    bool has_crc = (input_data[8] & 0x80) ? true : false; +    uint8_t major_revision = (input_data[8] & 0x70) >> 4; +    uint8_t minor_revision = input_data[8] & 0x0F; +    if (major_revision != 1 or minor_revision != 0) { +        throw invalid_argument("EDI AF Packet has wrong revision " + +                to_string(major_revision) + "." + to_string(minor_revision)); +    } +    uint8_t pt = input_data[9]; +    if (pt != 'T') { +        // only support Tag +        return {false, 0}; +    } + + +    if (not has_crc) { +        throw invalid_argument("AF packet not supported, has no CRC"); +    } + +    uint16_t crc = 0xffff; +    for (size_t i = 0; i < AFPACKET_HEADER_LEN + taglength; i++) { +        crc = crc16(crc, &input_data[i], 1); +    } +    crc ^= 0xffff; + +    uint16_t packet_crc = read_16b(input_data.begin() + AFPACKET_HEADER_LEN + taglength); + +    if (packet_crc != crc) { +        throw invalid_argument( +                "AF Packet crc wrong"); +    } +    else { +        vector<uint8_t> payload(taglength); +        copy(input_data.begin() + AFPACKET_HEADER_LEN, +                input_data.begin() + AFPACKET_HEADER_LEN + taglength, +                payload.begin()); + +        return {decode_tagpacket(payload), +            AFPACKET_HEADER_LEN + taglength + 2}; +    } +} + +void TagDispatcher::register_tag(const std::string& tag, tag_handler&& h) +{ +    m_handlers[tag] = move(h); +} + + +bool TagDispatcher::decode_tagpacket(const vector<uint8_t> &payload) +{ +    size_t length = 0; + +    bool success = true; + +    for (size_t i = 0; i + 8 < payload.size(); i += 8 + length) { +        char tag_sz[5]; +        tag_sz[4] = '\0'; +        copy(payload.begin() + i, payload.begin() + i + 4, tag_sz); + +        string tag(tag_sz); + +        uint32_t taglength = read_32b(payload.begin() + i + 4); + +        if (taglength % 8 != 0) { +            etiLog.log(warn, "Invalid tag length!"); +            break; +        } +        taglength /= 8; + +        length = taglength; + +        vector<uint8_t> tag_value(taglength); +        copy(   payload.begin() + i+8, +                payload.begin() + i+8+taglength, +                tag_value.begin()); + +        bool tagsuccess = false; +        bool found = false; +        for (auto tag_handler : m_handlers) { +            if (tag_handler.first.size() == 4 and tag_handler.first == tag) { +                found = true; +                tagsuccess = tag_handler.second(tag_value, 0); +            } +            else if (tag_handler.first.size() == 3 and +                    tag.substr(0, 3) == tag_handler.first) { +                found = true; +                uint8_t n = tag_sz[3]; +                tagsuccess = tag_handler.second(tag_value, n); +            } +            else if (tag_handler.first.size() == 2 and +                    tag.substr(0, 2) == tag_handler.first) { +                found = true; +                uint16_t n = 0; +                n = (uint16_t)(tag_sz[2]) << 8; +                n |= (uint16_t)(tag_sz[3]); +                tagsuccess = tag_handler.second(tag_value, n); +            } +        } + +        if (not found) { +            etiLog.log(warn, "Ignoring unknown TAG %s", tag.c_str()); +            break; +        } + +        if (not tagsuccess) { +            etiLog.log(warn, "Error decoding TAG %s", tag.c_str()); +            success = tagsuccess; +            break; +        } +    } + +    return success; +} + +} diff --git a/lib/edi/common.hpp b/lib/edi/common.hpp new file mode 100644 index 0000000..1433004 --- /dev/null +++ b/lib/edi/common.hpp @@ -0,0 +1,88 @@ +/* +   Copyright (C) 2019 +   Matthias P. Braendli, matthias.braendli@mpb.li + +   http://opendigitalradio.org + +    This program is free software; you can redistribute it and/or modify +    it under the terms of the GNU General Public License as published by +    the Free Software Foundation; either version 2 of the License, or +    (at your option) any later version. + +    This program is distributed in the hope that it will be useful, +    but WITHOUT ANY WARRANTY; without even the implied warranty of +    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +    GNU General Public License for more details. + +    You should have received a copy of the GNU General Public License along +    with this program; if not, write to the Free Software Foundation, Inc., +    51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ +#pragma once + +#include "PFT.hpp" +#include <functional> +#include <map> +#include <string> +#include <vector> +#include <cstddef> +#include <ctime> + +namespace EdiDecoder { + +struct frame_timestamp_t { +    uint32_t seconds = 0; +    uint32_t utco = 0; + +    std::string to_string() const; +    time_t to_unix_epoch() const; +}; + +struct decode_state_t { +    decode_state_t(bool _complete, size_t _num_bytes_consumed) : +        complete(_complete), num_bytes_consumed(_num_bytes_consumed) {} +    bool complete; +    size_t num_bytes_consumed; +}; + +/* The TagDispatcher takes care of decoding EDI, with or without PFT, and + * will call functions when TAGs are encountered. + * + * PF packets are handed over to the PFT decoder, which will in turn return + * AF packets. AF packets are directly dispatched to the TAG functions. + */ +class TagDispatcher { +    public: +        TagDispatcher(std::function<void()>&& af_packet_completed, bool verbose); + +        /* Push bytes into the decoder. The buf can contain more +         * than a single packet. This is useful when reading from streams +         * (files, TCP) +         */ +        void push_bytes(const std::vector<uint8_t> &buf); + +        /* Push a complete packet into the decoder. Useful for UDP and other +         * datagram-oriented protocols. +         */ +        void push_packet(const std::vector<uint8_t> &buf); + +        /* Set the maximum delay in number of AF Packets before we +         * abandon decoding a given pseq. +         */ +        void setMaxDelay(int num_af_packets); + +        using tag_handler = std::function<bool(std::vector<uint8_t>, uint16_t)>; +        void register_tag(const std::string& tag, tag_handler&& h); + +    private: +        decode_state_t decode_afpacket(const std::vector<uint8_t> &input_data); +        bool decode_tagpacket(const std::vector<uint8_t> &payload); + +        PFT::PFT m_pft; +        uint16_t m_last_seq = 0; +        std::vector<uint8_t> m_input_data; +        std::map<std::string, tag_handler> m_handlers; +        std::function<void()> m_af_packet_completed; +}; + +} diff --git a/src/ConfigParser.cpp b/src/ConfigParser.cpp index fb49efc..3142bb3 100644 --- a/src/ConfigParser.cpp +++ b/src/ConfigParser.cpp @@ -40,6 +40,7 @@  #include "utils.h"  #include "DabMux.h"  #include "ManagementServer.h" +#include "input/Edi.h"  #include "input/Prbs.h"  #include "input/Zmq.h"  #include "input/File.h" @@ -876,34 +877,46 @@ static void setup_subchannel_from_ptree(shared_ptr<DabSubchannel>& subchan,          type = pt.get<string>("type");      }      catch (const ptree_error &e) { -        stringstream ss; -        ss << "Subchannel with uid " << subchanuid << " has no type defined!"; -        throw runtime_error(ss.str()); +        throw runtime_error("Subchannel with uid " + subchanuid + " has no type defined!");      } -    /* Both inputfile and inputuri are supported, and are equivalent. -     * inputuri has precedence +    /* Up to v2.3.1, both inputfile and inputuri are supported, and are +     * equivalent.  inputuri has precedence. +     * +     * After that, either inputfile or the (inputproto, inputuri) pair must be given, but not both.       */      string inputUri = pt.get<string>("inputuri", ""); +    string proto = pt.get<string>("inputproto", ""); -    if (inputUri == "") { +    if (inputUri.empty() and proto.empty()) {          try { +            /* Old approach, derives proto from scheme used in the URL. +             * This makes it impossible to distinguish between ZMQ tcp:// and +             * EDI tcp:// +             */              inputUri = pt.get<string>("inputfile"); +            size_t protopos = inputUri.find("://"); + +            if (protopos == string::npos) { +                proto = "file"; +            } +            else { +                proto = inputUri.substr(0, protopos); + +                if (proto == "tcp" or proto == "epgm" or proto == "ipc") { +                    proto = "zmq"; +                } +                else if (proto == "sti-rtp") { +                    proto = "sti"; +                } +            }          }          catch (const ptree_error &e) { -            stringstream ss; -            ss << "Subchannel with uid " << subchanuid << " has no inputUri defined!"; -            throw runtime_error(ss.str()); +            throw runtime_error("Subchannel with uid " + subchanuid + " has no input defined!");          }      } - -    string proto; -    size_t protopos = inputUri.find("://"); -    if (protopos == string::npos) { -        proto = "file"; -    } -    else { -        proto = inputUri.substr(0, protopos); +    else if (inputUri.empty() or proto.empty()) { +        throw runtime_error("Must define both inputuri and inputproto for uid " + subchanuid);      }      subchan->inputUri = inputUri; @@ -928,7 +941,7 @@ static void setup_subchannel_from_ptree(shared_ptr<DabSubchannel>& subchan,                  throw logic_error("Incomplete handling of file input");              }          } -        else if (proto == "tcp"  or proto == "epgm" or proto == "ipc") { +        else if (proto == "zmq") {              auto zmqconfig = setup_zmq_input(pt, subchanuid);              if (type == "audio") { @@ -941,15 +954,11 @@ static void setup_subchannel_from_ptree(shared_ptr<DabSubchannel>& subchan,                  rcs.enrol(inzmq.get());                  subchan->input = inzmq;              } - -            if (proto == "epgm") { -                etiLog.level(warn) << "Using untested epgm:// zeromq input"; -            } -            else if (proto == "ipc") { -                etiLog.level(warn) << "Using untested ipc:// zeromq input"; -            }          } -        else if (proto == "sti-rtp") { +        else if (proto == "edi") { +            subchan->input = make_shared<Inputs::Edi>(); +        } +        else if (proto == "stp") {              subchan->input = make_shared<Inputs::Sti_d_Rtp>();          }          else { diff --git a/src/DabMultiplexer.h b/src/DabMultiplexer.h index 386c23c..d1075a6 100644 --- a/src/DabMultiplexer.h +++ b/src/DabMultiplexer.h @@ -37,8 +37,7 @@  #include "fig/FIGCarousel.h"  #include "crc.h"  #include "utils.h" -#include "UdpSocket.h" -#include "InetAddress.h" +#include "Socket.h"  #include "PcDebug.h"  #include "MuxElements.h"  #include "RemoteControl.h" diff --git a/src/DabMux.cpp b/src/DabMux.cpp index 51f0310..578fc63 100644 --- a/src/DabMux.cpp +++ b/src/DabMux.cpp @@ -99,8 +99,7 @@ typedef DWORD32 uint32_t;  #include "dabOutput/dabOutput.h"  #include "crc.h" -#include "UdpSocket.h" -#include "InetAddress.h" +#include "Socket.h"  #include "PcDebug.h"  #include "DabMux.h"  #include "MuxElements.h" @@ -305,7 +304,7 @@ int main(int argc, char *argv[])                          edi_conf.destinations.push_back(dest);                      }                      else if (proto == "tcp") { -                        auto dest = make_shared<edi::tcp_destination_t>(); +                        auto dest = make_shared<edi::tcp_server_t>();                          dest->listen_port = pt_edi_dest.second.get<unsigned int>("listenport");                          dest->max_frames_queued = pt_edi_dest.second.get<size_t>("max_frames_queued", 500);                          edi_conf.destinations.push_back(dest); diff --git a/src/InetAddress.cpp b/src/InetAddress.cpp deleted file mode 100644 index 7660263..0000000 --- a/src/InetAddress.cpp +++ /dev/null @@ -1,155 +0,0 @@ -/* -   Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the -   Queen in Right of Canada (Communications Research Center Canada) - -   Copyright (C) 2016 -   Matthias P. Braendli, matthias.braendli@mpb.li - -    http://www.opendigitalradio.org -   */ -/* -   This file is part of ODR-DabMux. - -   ODR-DabMux is free software: you can redistribute it and/or modify -   it under the terms of the GNU General Public License as -   published by the Free Software Foundation, either version 3 of the -   License, or (at your option) any later version. - -   ODR-DabMux is distributed in the hope that it will be useful, -   but WITHOUT ANY WARRANTY; without even the implied warranty of -   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -   GNU General Public License for more details. - -   You should have received a copy of the GNU General Public License -   along with ODR-DabMux.  If not, see <http://www.gnu.org/licenses/>. -   */ - -#include "InetAddress.h" -#include <iostream> -#include <stdio.h> -#include <errno.h> -#include <string.h> - -#ifdef TRACE_ON -# ifndef TRACE_CLASS -#  define TRACE_CLASS(clas, func) cout <<"-" <<(clas) <<"\t(" <<this <<")::" <<(func) <<endl -#  define TRACE_STATIC(clas, func) cout <<"-" <<(clas) <<"\t(static)::" <<(func) <<endl -# endif -#else -# ifndef TRACE_CLASS -#  define TRACE_CLASS(clas, func) -#  define TRACE_STATIC(clas, func) -# endif -#endif - - -int inetErrNo = 0; -const char *inetErrMsg = nullptr; -const char *inetErrDesc = nullptr; - - -/** - *  Constructs an IP address. - *  @param port The port of this address - *  @param name The name of this address - */ -InetAddress::InetAddress(int port, const char* name) { -    TRACE_CLASS("InetAddress", "InetAddress(int, char)"); -    addr.sin_family = PF_INET; -    addr.sin_addr.s_addr = htons(INADDR_ANY); -    addr.sin_port = htons(port); -    if (name) -        setAddress(name); -} - - -/// Returns the raw IP address of this InetAddress object. -sockaddr *InetAddress::getAddress() { -    TRACE_CLASS("InetAddress", "getAddress()"); -    return (sockaddr *)&addr; -} - - -/// Return the port of this address. -int InetAddress::getPort() -{ -    TRACE_CLASS("InetAddress", "getPort()"); -    return ntohs(addr.sin_port); -} - - -/** - *  Returns the IP address string "%d.%d.%d.%d". - *  @return IP address - */ -const char *InetAddress::getHostAddress() { -    TRACE_CLASS("InetAddress", "getHostAddress()"); -    return inet_ntoa(addr.sin_addr); -} - - -/// Returns true if this address is multicast -bool InetAddress::isMulticastAddress() { -    TRACE_CLASS("InetAddress", "isMulticastAddress()"); -    return IN_MULTICAST(ntohl(addr.sin_addr.s_addr));		// a modifier -} - - -/** - *  Set the port number - *  @param port The new port number - */ -void InetAddress::setPort(int port) -{ -    TRACE_CLASS("InetAddress", "setPort(int)"); -    addr.sin_port = htons(port); -} - - -/** - *  Set the address - *  @param name The new address name - *  @return 0  if ok - *          -1 if error - */ -int InetAddress::setAddress(const std::string& name) -{ -    TRACE_CLASS("InetAddress", "setAddress(string)"); -    if (!name.empty()) { -        if (atoi(name.c_str())) {   // If it start with a number -            if ((addr.sin_addr.s_addr = inet_addr(name.c_str())) == INADDR_NONE) { -                addr.sin_addr.s_addr = htons(INADDR_ANY); -                inetErrNo = 0; -                inetErrMsg = "Invalid address"; -                inetErrDesc = name.c_str(); -                return -1; -            } -        } -        else {            // Assume it's a real name -            hostent *host = gethostbyname(name.c_str()); -            if (host) { -                addr.sin_addr = *(in_addr *)(host->h_addr); -            } else { -                addr.sin_addr.s_addr = htons(INADDR_ANY); -                inetErrNo = 0; -                inetErrMsg = "Could not find address"; -                inetErrDesc = name.c_str(); -                return -1; -            } -        } -    } -    else { -        addr.sin_addr.s_addr = INADDR_ANY; -    } -    return 0; -} - - -void setInetError(const char* description) -{ -    inetErrNo = 0; -    inetErrNo = errno; -    inetErrMsg = strerror(inetErrNo); -    inetErrDesc = description; -} - diff --git a/src/InetAddress.h b/src/InetAddress.h deleted file mode 100644 index e246d4c..0000000 --- a/src/InetAddress.h +++ /dev/null @@ -1,78 +0,0 @@ -/* -   Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the -   Queen in Right of Canada (Communications Research Center Canada) - -   Copyright (C) 2016 -   Matthias P. Braendli, matthias.braendli@mpb.li - -    http://www.opendigitalradio.org -   */ -/* -   This file is part of ODR-DabMux. - -   ODR-DabMux is free software: you can redistribute it and/or modify -   it under the terms of the GNU General Public License as -   published by the Free Software Foundation, either version 3 of the -   License, or (at your option) any later version. - -   ODR-DabMux is distributed in the hope that it will be useful, -   but WITHOUT ANY WARRANTY; without even the implied warranty of -   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -   GNU General Public License for more details. - -   You should have received a copy of the GNU General Public License -   along with ODR-DabMux.  If not, see <http://www.gnu.org/licenses/>. -   */ - -#ifndef _InetAddress -#define _InetAddress - -#ifdef HAVE_CONFIG_H -#   include "config.h" -#endif - -#include <stdlib.h> -#include <sys/socket.h> -#include <netinet/in.h> -#include <unistd.h> -#include <netdb.h> -#include <arpa/inet.h> -#include <pthread.h> -#include <string> - -#define SOCKET                  int -#define INVALID_SOCKET          -1 -#define INVALID_PORT            -1 - - -/// The last error number -extern int inetErrNo; -/// The last error message -extern const char *inetErrMsg; -/// The description of the last error -extern const char *inetErrDesc; -/// Set the number, message and description of the last error -void setInetError(const char* description); - - -/** - *  This class represents an Internet Protocol (IP) address. - *  @author Pascal Charest pascal.charest@crc.ca - */ -class InetAddress { - public: -  InetAddress(int port = 0, const char* name = NULL); - -  sockaddr *getAddress(); -  const char *getHostAddress(); -  int getPort(); -  int setAddress(const std::string& name); -  void setPort(int port); -  bool isMulticastAddress(); - - private: -  sockaddr_in addr; -}; - - -#endif diff --git a/src/TcpSocket.cpp b/src/TcpSocket.cpp deleted file mode 100644 index 3ebe73c..0000000 --- a/src/TcpSocket.cpp +++ /dev/null @@ -1,359 +0,0 @@ -/* -   Copyright (C) 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in -   Right of Canada (Communications Research Center Canada) - -   Copyright (C) 2019 -   Matthias P. Braendli, matthias.braendli@mpb.li - -    http://www.opendigitalradio.org -   */ -/* -   This file is part of ODR-DabMux. - -   ODR-DabMux is free software: you can redistribute it and/or modify -   it under the terms of the GNU General Public License as -   published by the Free Software Foundation, either version 3 of the -   License, or (at your option) any later version. - -   ODR-DabMux is distributed in the hope that it will be useful, -   but WITHOUT ANY WARRANTY; without even the implied warranty of -   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -   GNU General Public License for more details. - -   You should have received a copy of the GNU General Public License -   along with ODR-DabMux.  If not, see <http://www.gnu.org/licenses/>. -   */ - -#include "TcpSocket.h" -#include "Log.h" -#include <iostream> -#include <cstdio> -#include <cstring> -#include <cstdint> -#include <signal.h> -#include <errno.h> -#include <poll.h> -#include <thread> - -using namespace std; - -using vec_u8 = std::vector<uint8_t>; - - -TcpSocket::TcpSocket() : -    m_sock(INVALID_SOCKET) -{ -} - -TcpSocket::TcpSocket(int port, const string& name) : -    m_sock(INVALID_SOCKET) -{ -    if (port) { -        if ((m_sock = socket(PF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET) { -            throw std::runtime_error("Can't create socket"); -        } - -        reuseopt_t reuse = 1; -        if (setsockopt(m_sock, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) -                == SOCKET_ERROR) { -            throw std::runtime_error("Can't reuse address"); -        } - -#if defined(HAVE_SO_NOSIGPIPE) -        int val = 1; -        if (setsockopt(m_sock, SOL_SOCKET, SO_NOSIGPIPE, &val, sizeof(val)) -                == SOCKET_ERROR) { -            throw std::runtime_error("Can't set SO_NOSIGPIPE"); -        } -#endif - -        m_own_address.setAddress(name); -        m_own_address.setPort(port); - -        if (::bind(m_sock, m_own_address.getAddress(), sizeof(sockaddr_in)) == SOCKET_ERROR) { -            ::close(m_sock); -            m_sock = INVALID_SOCKET; -            throw std::runtime_error("Can't bind socket"); -        } -    } -} - -TcpSocket::TcpSocket(SOCKET sock, InetAddress own, InetAddress remote) : -    m_own_address(own), -    m_remote_address(remote), -    m_sock(sock) { } - -// The move constructors must ensure the moved-from -// TcpSocket won't destroy our socket handle -TcpSocket::TcpSocket(TcpSocket&& other) -{ -    m_sock = other.m_sock; -    other.m_sock = INVALID_SOCKET; - -    m_own_address = other.m_own_address; -    m_remote_address = other.m_remote_address; -} - -TcpSocket& TcpSocket::operator=(TcpSocket&& other) -{ -    m_sock = other.m_sock; -    other.m_sock = INVALID_SOCKET; - -    m_own_address = other.m_own_address; -    m_remote_address = other.m_remote_address; -    return *this; -} - -/** - *  Close the underlying socket. - *  @return 0  if ok - *          -1 if error - */ -int TcpSocket::close() -{ -    if (m_sock != INVALID_SOCKET) { -        int res = ::close(m_sock); -        if (res != 0) { -            setInetError("Can't close socket"); -            return -1; -        } -        m_sock = INVALID_SOCKET; -    } -    return 0; -} - -TcpSocket::~TcpSocket() -{ -    close(); -} - -bool TcpSocket::isValid() -{ -    return m_sock != INVALID_SOCKET; -} - -ssize_t TcpSocket::recv(void* data, size_t size) -{ -    ssize_t ret = ::recv(m_sock, (char*)data, size, 0); -    if (ret == SOCKET_ERROR) { -        stringstream ss; -        ss << "TCP Socket recv error: " << strerror(errno); -        throw std::runtime_error(ss.str()); -    } -    return ret; -} - - -ssize_t TcpSocket::send(const void* data, size_t size, int timeout_ms) -{ -    if (timeout_ms) { -        struct pollfd fds[1]; -        fds[0].fd = m_sock; -        fds[0].events = POLLOUT; - -        const int retval = poll(fds, 1, timeout_ms); - -        if (retval == -1) { -            stringstream ss; -            ss << "TCP Socket send error on poll(): " << strerror(errno); -            throw std::runtime_error(ss.str()); -        } -        else if (retval == 0) { -            // Timed out -            return 0; -        } -    } - -    /* On Linux, the MSG_NOSIGNAL flag ensures that the process would not -     * receive a SIGPIPE and die. -     * Other systems have SO_NOSIGPIPE set on the socket for the same effect. */ -#if defined(HAVE_MSG_NOSIGNAL) -    const int flags = MSG_NOSIGNAL; -#else -    const int flags = 0; -#endif -    const ssize_t ret = ::send(m_sock, (const char*)data, size, flags); - -    if (ret == SOCKET_ERROR) { -        stringstream ss; -        ss << "TCP Socket send error: " << strerror(errno); -        throw std::runtime_error(ss.str()); -    } -    return ret; -} - -void TcpSocket::listen() -{ -    if (::listen(m_sock, 1) == SOCKET_ERROR) { -        stringstream ss; -        ss << "TCP Socket listen error: " << strerror(errno); -        throw std::runtime_error(ss.str()); -    } -} - -TcpSocket TcpSocket::accept() -{ -    InetAddress remote_addr; -    socklen_t addrLen = sizeof(sockaddr_in); - -    SOCKET socket = ::accept(m_sock, remote_addr.getAddress(), &addrLen); -    if (socket == SOCKET_ERROR) { -        stringstream ss; -        ss << "TCP Socket accept error: " << strerror(errno); -        throw std::runtime_error(ss.str()); -    } -    else { -        TcpSocket client(socket, m_own_address, remote_addr); -        return client; -    } -} - -TcpSocket TcpSocket::accept(int timeout_ms) -{ -    struct pollfd fds[1]; -    fds[0].fd = m_sock; -    fds[0].events = POLLIN | POLLOUT; - -    int retval = poll(fds, 1, timeout_ms); - -    if (retval == -1) { -        stringstream ss; -        ss << "TCP Socket accept error: " << strerror(errno); -        throw std::runtime_error(ss.str()); -    } -    else if (retval) { -        return accept(); -    } -    else { -        TcpSocket invalidsock(0, ""); -        return invalidsock; -    } -} - - -InetAddress TcpSocket::getOwnAddress() const -{ -    return m_own_address; -} - -InetAddress TcpSocket::getRemoteAddress() const -{ -    return m_remote_address; -} - - -TCPConnection::TCPConnection(TcpSocket&& sock) : -            queue(), -            m_running(true), -            m_sender_thread(), -            m_sock(move(sock)) -{ -    auto own_addr = m_sock.getOwnAddress(); -    auto addr = m_sock.getRemoteAddress(); -    etiLog.level(debug) << "New TCP Connection on port " << -        own_addr.getPort() << " from " << -        addr.getHostAddress() << ":" << addr.getPort(); -    m_sender_thread = std::thread(&TCPConnection::process, this); -} - -TCPConnection::~TCPConnection() -{ -    m_running = false; -    vec_u8 termination_marker; -    queue.push(termination_marker); -    m_sender_thread.join(); -} - -void TCPConnection::process() -{ -    while (m_running) { -        vec_u8 data; -        queue.wait_and_pop(data); - -        if (data.empty()) { -            // empty vector is the termination marker -            m_running = false; -            break; -        } - -        try { -            ssize_t remaining = data.size(); -            const uint8_t *buf = reinterpret_cast<const uint8_t*>(data.data()); -            const int timeout_ms = 10; // Less than one ETI frame - -            while (m_running and remaining > 0) { -                const ssize_t sent = m_sock.send(buf, remaining, timeout_ms); -                if (sent < 0 or sent > remaining) { -                    throw std::logic_error("Invalid TcpSocket::send() return value"); -                } -                remaining -= sent; -                buf += sent; -            } -        } -        catch (const std::runtime_error& e) { -            m_running = false; -        } -    } - - -    auto own_addr = m_sock.getOwnAddress(); -    auto addr = m_sock.getRemoteAddress(); -    etiLog.level(debug) << "Dropping TCP Connection on port " << -        own_addr.getPort() << " from " << -        addr.getHostAddress() << ":" << addr.getPort(); -} - - -TCPDataDispatcher::TCPDataDispatcher(size_t max_queue_size) : -    m_max_queue_size(max_queue_size) -{ -} - -TCPDataDispatcher::~TCPDataDispatcher() -{ -    m_running = false; -    m_connections.clear(); -    m_listener_socket.close(); -    m_listener_thread.join(); -} - -void TCPDataDispatcher::start(int port, const string& address) -{ -    TcpSocket sock(port, address); -    m_listener_socket = move(sock); - -    m_running = true; -    m_listener_thread = std::thread(&TCPDataDispatcher::process, this); -} - -void TCPDataDispatcher::write(const vec_u8& data) -{ -    for (auto& connection : m_connections) { -        connection.queue.push(data); -    } - -    m_connections.remove_if( -            [&](const TCPConnection& conn){ return conn.queue.size() > m_max_queue_size; }); -} - -void TCPDataDispatcher::process() -{ -    try { -        m_listener_socket.listen(); - -        const int timeout_ms = 1000; - -        while (m_running) { -            // Add a new TCPConnection to the list, constructing it from the client socket -            auto sock = m_listener_socket.accept(timeout_ms); -            if (sock.isValid()) { -                m_connections.emplace(m_connections.begin(), move(sock)); -            } -        } -    } -    catch (const std::runtime_error& e) { -        etiLog.level(error) << "TCPDataDispatcher caught runtime error: " << e.what(); -        m_running = false; -    } -} - diff --git a/src/TcpSocket.h b/src/TcpSocket.h deleted file mode 100644 index ec7afd3..0000000 --- a/src/TcpSocket.h +++ /dev/null @@ -1,164 +0,0 @@ -/* -   Copyright (C) 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in -   Right of Canada (Communications Research Center Canada) - -   Copyright (C) 2019 -   Matthias P. Braendli, matthias.braendli@mpb.li - -    http://www.opendigitalradio.org -   */ -/* -   This file is part of ODR-DabMux. - -   ODR-DabMux is free software: you can redistribute it and/or modify -   it under the terms of the GNU General Public License as -   published by the Free Software Foundation, either version 3 of the -   License, or (at your option) any later version. - -   ODR-DabMux is distributed in the hope that it will be useful, -   but WITHOUT ANY WARRANTY; without even the implied warranty of -   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -   GNU General Public License for more details. - -   You should have received a copy of the GNU General Public License -   along with ODR-DabMux.  If not, see <http://www.gnu.org/licenses/>. -   */ - -#ifndef _TCPSOCKET -#define _TCPSOCKET - -#ifdef HAVE_CONFIG_H -#   include "config.h" -#endif - -#include "InetAddress.h" -#include "ThreadsafeQueue.h" -#include <sys/socket.h> -#include <netinet/in.h> -#include <unistd.h> -#include <netdb.h> -#include <arpa/inet.h> -#include <pthread.h> -#define SOCKET           int -#define INVALID_SOCKET   -1 -#define SOCKET_ERROR     -1 -#define reuseopt_t       int - -#include <iostream> -#include <string> -#include <vector> -#include <memory> -#include <atomic> -#include <thread> -#include <list> - -/** - *  This class represents a TCP socket. - */ -class TcpSocket -{ -    public: -        /** Create a new socket that does nothing */ -        TcpSocket(); - -        /** Create a new socket listening for incoming connections. -         *  @param port The port number on which the socket will listen. -         *  @param name The IP address on which the socket will be bound. -         *              It is used to bind the socket on a specific interface if -         *              the computer have many NICs. -         */ -        TcpSocket(int port, const std::string& name); -        ~TcpSocket(); -        TcpSocket(TcpSocket&& other); -        TcpSocket& operator=(TcpSocket&& other); -        TcpSocket(const TcpSocket& other) = delete; -        TcpSocket& operator=(const TcpSocket& other) = delete; - -        bool isValid(void); - -        int close(void); - -        /** Send data over the TCP connection. -         *  @param data The buffer that will be sent. -         *  @param size Number of bytes to send. -         *  @param timeout_ms number of milliseconds before timeout, or 0 for infinite timeout -         *  return number of bytes sent, 0 on timeout, or throws runtime_error. -         */ -        ssize_t send(const void* data, size_t size, int timeout_ms=0); - -        /** Receive data from the socket. -         *  @param data The buffer that will receive data. -         *  @param size The buffer size. -         *  @return number of bytes received or -1 (SOCKET_ERROR) if error -         */ -        ssize_t recv(void* data, size_t size); - -        void listen(void); -        TcpSocket accept(void); - -        /* Returns either valid socket if a connection was -         * accepted before the timeout expired, or an invalid -         * socket otherwise. -         */ -        TcpSocket accept(int timeout_ms); - -        /** Retrieve address this socket is bound to */ -        InetAddress getOwnAddress() const; -        InetAddress getRemoteAddress() const; - -    private: -        TcpSocket(SOCKET sock, InetAddress own, InetAddress remote); - -        /// The address on which the socket is bound. -        InetAddress m_own_address; -        InetAddress m_remote_address; -        /// The low-level socket used by system functions. -        SOCKET m_sock; -}; - -/* Helper class for TCPDataDispatcher, contains a queue of pending data and - * a sender thread. */ -class TCPConnection -{ -    public: -        TCPConnection(TcpSocket&& sock); -        TCPConnection(const TCPConnection&) = delete; -        TCPConnection& operator=(const TCPConnection&) = delete; -        ~TCPConnection(); - -        ThreadsafeQueue<std::vector<uint8_t> > queue; - -    private: -        std::atomic<bool> m_running; -        std::thread m_sender_thread; -        TcpSocket m_sock; - -        void process(void); -}; - -/* Send a TCP stream to several destinations, and automatically disconnect destinations - * whose buffer overflows. - */ -class TCPDataDispatcher -{ -    public: -        TCPDataDispatcher(size_t max_queue_size); -        ~TCPDataDispatcher(); -        TCPDataDispatcher(const TCPDataDispatcher&) = delete; -        TCPDataDispatcher& operator=(const TCPDataDispatcher&) = delete; - -        void start(int port, const std::string& address); -        void write(const std::vector<uint8_t>& data); - -    private: -        void process(void); - -        size_t m_max_queue_size; - -        std::atomic<bool> m_running; -        std::thread m_listener_thread; -        TcpSocket m_listener_socket; -        std::list<TCPConnection> m_connections; -}; - -#endif // _TCPSOCKET diff --git a/src/UdpSocket.cpp b/src/UdpSocket.cpp deleted file mode 100644 index 3d015ec..0000000 --- a/src/UdpSocket.cpp +++ /dev/null @@ -1,256 +0,0 @@ -/* -   Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the -   Queen in Right of Canada (Communications Research Center Canada) - -   Copyright (C) 2017 -   Matthias P. Braendli, matthias.braendli@mpb.li - -    http://www.opendigitalradio.org -   */ -/* -   This file is part of ODR-DabMux. - -   ODR-DabMux is free software: you can redistribute it and/or modify -   it under the terms of the GNU General Public License as -   published by the Free Software Foundation, either version 3 of the -   License, or (at your option) any later version. - -   ODR-DabMux is distributed in the hope that it will be useful, -   but WITHOUT ANY WARRANTY; without even the implied warranty of -   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -   GNU General Public License for more details. - -   You should have received a copy of the GNU General Public License -   along with ODR-DabMux.  If not, see <http://www.gnu.org/licenses/>. -   */ - -#include "UdpSocket.h" - -#include <iostream> -#include <stdio.h> -#include <errno.h> -#include <fcntl.h> -#include <string.h> - -using namespace std; - -UdpSocket::UdpSocket() : -    listenSocket(INVALID_SOCKET) -{ -    reinit(0, ""); -} - -UdpSocket::UdpSocket(int port) : -    listenSocket(INVALID_SOCKET) -{ -    reinit(port, ""); -} - -UdpSocket::UdpSocket(int port, const std::string& name) : -    listenSocket(INVALID_SOCKET) -{ -    reinit(port, name); -} - - -int UdpSocket::setBlocking(bool block) -{ -    int res; -    if (block) -        res = fcntl(listenSocket, F_SETFL, 0); -    else -        res = fcntl(listenSocket, F_SETFL, O_NONBLOCK); -    if (res == SOCKET_ERROR) { -        setInetError("Can't change blocking state of socket"); -        return -1; -    } -    return 0; -} - -int UdpSocket::reinit(int port, const std::string& name) -{ -    if (listenSocket != INVALID_SOCKET) { -        ::close(listenSocket); -    } - -    if ((listenSocket = socket(PF_INET, SOCK_DGRAM, 0)) == INVALID_SOCKET) { -        setInetError("Can't create socket"); -        return -1; -    } -    reuseopt_t reuse = 1; -    if (setsockopt(listenSocket, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) -            == SOCKET_ERROR) { -        setInetError("Can't reuse address"); -        return -1; -    } - -    if (port) { -        address.setAddress(name); -        address.setPort(port); - -        if (::bind(listenSocket, address.getAddress(), sizeof(sockaddr_in)) == SOCKET_ERROR) { -            setInetError("Can't bind socket"); -            ::close(listenSocket); -            listenSocket = INVALID_SOCKET; -            return -1; -        } -    } -    return 0; -} - -int UdpSocket::close() -{ -    if (listenSocket != INVALID_SOCKET) { -        ::close(listenSocket); -    } - -    listenSocket = INVALID_SOCKET; - -    return 0; -} - -UdpSocket::~UdpSocket() -{ -    if (listenSocket != INVALID_SOCKET) { -        ::close(listenSocket); -    } -} - - -int UdpSocket::receive(UdpPacket& packet) -{ -    socklen_t addrSize; -    addrSize = sizeof(*packet.getAddress().getAddress()); -    ssize_t ret = recvfrom(listenSocket, -            packet.getData(), -            packet.getSize(), -            0, -            packet.getAddress().getAddress(), -            &addrSize); - -    if (ret == SOCKET_ERROR) { -        packet.setSize(0); -        if (errno == EAGAIN) { -            return 0; -        } -        setInetError("Can't receive UDP packet"); -        return -1; -    } - -    packet.setSize(ret); -    return 0; -} - -int UdpSocket::send(UdpPacket& packet) -{ -    int ret = sendto(listenSocket, packet.getData(), packet.getSize(), 0, -            packet.getAddress().getAddress(), sizeof(*packet.getAddress().getAddress())); -    if (ret == SOCKET_ERROR && errno != ECONNREFUSED) { -        setInetError("Can't send UDP packet"); -        return -1; -    } -    return 0; -} - - -int UdpSocket::send(const std::vector<uint8_t>& data, InetAddress destination) -{ -    int ret = sendto(listenSocket, &data[0], data.size(), 0, -            destination.getAddress(), sizeof(*destination.getAddress())); -    if (ret == SOCKET_ERROR && errno != ECONNREFUSED) { -        setInetError("Can't send UDP packet"); -        return -1; -    } -    return 0; -} - - -/** - *  Must be called to receive data on a multicast address. - *  @param groupname The multica -st address to join. - *  @return 0 if ok, -1 if error - */ -int UdpSocket::joinGroup(char* groupname) -{ -    ip_mreqn group; -    if ((group.imr_multiaddr.s_addr = inet_addr(groupname)) == INADDR_NONE) { -        setInetError(groupname); -        return -1; -    } -    if (!IN_MULTICAST(ntohl(group.imr_multiaddr.s_addr))) { -        setInetError("Not a multicast address"); -        return -1; -    } -    group.imr_address.s_addr = htons(INADDR_ANY);; -    group.imr_ifindex = 0; -    if (setsockopt(listenSocket, IPPROTO_IP, IP_ADD_MEMBERSHIP, &group, sizeof(group)) -            == SOCKET_ERROR) { -        setInetError("Can't join multicast group"); -    } -    return 0; -} - -int UdpSocket::setMulticastTTL(int ttl) -{ -    if (setsockopt(listenSocket, IPPROTO_IP, IP_MULTICAST_TTL, &ttl, sizeof(ttl)) -            == SOCKET_ERROR) { -        setInetError("Can't set ttl"); -        return -1; -    } - -    return 0; -} - -int UdpSocket::setMulticastSource(const char* source_addr) -{ -    struct in_addr addr; -    if (inet_aton(source_addr, &addr) == 0) { -        setInetError("Can't parse source address"); -        return -1; -    } - -    if (setsockopt(listenSocket, IPPROTO_IP, IP_MULTICAST_IF, &addr, sizeof(addr)) -            == SOCKET_ERROR) { -        setInetError("Can't set source address"); -        return -1; -    } - -    return 0; -} - -UdpPacket::UdpPacket() { } - -UdpPacket::UdpPacket(size_t initSize) : -    m_buffer(initSize) -{ } - - -void UdpPacket::setSize(size_t newSize) -{ -    m_buffer.resize(newSize); -} - - -uint8_t* UdpPacket::getData() -{ -    return &m_buffer[0]; -} - - -void UdpPacket::addData(const void *data, size_t size) -{ -    uint8_t *d = (uint8_t*)data; -    std::copy(d, d + size, std::back_inserter(m_buffer)); -} - -size_t UdpPacket::getSize() -{ -    return m_buffer.size(); -} - -InetAddress UdpPacket::getAddress() -{ -    return address; -} - diff --git a/src/UdpSocket.h b/src/UdpSocket.h deleted file mode 100644 index f51e87c..0000000 --- a/src/UdpSocket.h +++ /dev/null @@ -1,174 +0,0 @@ -/* -   Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the -   Queen in Right of Canada (Communications Research Center Canada) - -   Copyright (C) 2017 -   Matthias P. Braendli, matthias.braendli@mpb.li - -    http://www.opendigitalradio.org -   */ -/* -   This file is part of ODR-DabMux. - -   ODR-DabMux is free software: you can redistribute it and/or modify -   it under the terms of the GNU General Public License as -   published by the Free Software Foundation, either version 3 of the -   License, or (at your option) any later version. - -   ODR-DabMux is distributed in the hope that it will be useful, -   but WITHOUT ANY WARRANTY; without even the implied warranty of -   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -   GNU General Public License for more details. - -   You should have received a copy of the GNU General Public License -   along with ODR-DabMux.  If not, see <http://www.gnu.org/licenses/>. -   */ - -#pragma once - -#ifdef HAVE_CONFIG_H -#   include "config.h" -#endif - -#include "InetAddress.h" -#include <sys/socket.h> -#include <netinet/in.h> -#include <unistd.h> -#include <netdb.h> -#include <arpa/inet.h> -#include <pthread.h> -#define SOCKET           int -#define INVALID_SOCKET   -1 -#define SOCKET_ERROR     -1 -#define reuseopt_t       int - -#include <stdlib.h> -#include <iostream> -#include <vector> - -class UdpPacket; - - -/** - *  This class represents a socket for sending and receiving UDP packets. - * - *  A UDP socket is the sending or receiving point for a packet delivery service. - *  Each packet sent or received on a datagram socket is individually - *  addressed and routed. Multiple packets sent from one machine to another may - *  be routed differently, and may arrive in any order. - */ -class UdpSocket -{ -    public: -        /** Create a new socket that will not be bound to any port. To be used -         * for data output. -         */ -        UdpSocket(); -        /** Create a new socket. -         *  @param port The port number on which the socket will be bound -         */ -        UdpSocket(int port); -        /** Create a new socket. -         *  @param port The port number on which the socket will be bound -         *  @param name The IP address on which the socket will be bound. -         *              It is used to bind the socket on a specific interface if -         *              the computer have many NICs. -         */ -        UdpSocket(int port, const std::string& name); -        ~UdpSocket(); -        UdpSocket(const UdpSocket& other) = delete; -        const UdpSocket& operator=(const UdpSocket& other) = delete; - -        /** reinitialise socket. Close the already open socket, and -         * create a new one -         */ -        int reinit(int port, const std::string& name); - -        /** Close the socket -         */ -        int close(void); - -        /** Send an UDP packet. -         *  @param packet The UDP packet to be sent. It includes the data and the -         *                destination address -         *  return 0 if ok, -1 if error -         */ -        int send(UdpPacket& packet); - -        /** Send an UDP packet -         * -         *  return 0 if ok, -1 if error -         */ -        int send(const std::vector<uint8_t>& data, InetAddress destination); - -        /** Receive an UDP packet. -         *  @param packet The packet that will receive the data. The address will be set -         *                to the source address. -         *  @return 0 if ok, -1 if error -         */ -        int receive(UdpPacket& packet); - -        int joinGroup(char* groupname); -        int setMulticastSource(const char* source_addr); -        int setMulticastTTL(int ttl); - -        /** Set blocking mode. By default, the socket is blocking. -         *  @return 0  if ok -         *          -1 if error -         */ -        int setBlocking(bool block); - -    protected: - -        /// The address on which the socket is bound. -        InetAddress address; -        /// The low-level socket used by system functions. -        SOCKET listenSocket; -}; - -/** This class represents a UDP packet. - * - *  A UDP packet contains a payload (sequence of bytes) and an address. For - *  outgoing packets, the address is the destination address. For incoming - *  packets, the address tells the user from what source the packet arrived from. - */ -class UdpPacket -{ -    public: -        /** Construct an empty UDP packet. -         */ -        UdpPacket(); -        UdpPacket(size_t initSize); - -        /** Give the pointer to data. -         *  @return The pointer -         */ -        uint8_t* getData(void); - -        /** Append some data at the end of data buffer and adjust size. -         *  @param data Pointer to the data to add -         *  @param size Size in bytes of new data -         */ -        void addData(const void *data, size_t size); - -        size_t getSize(void); - -        /** Changes size of the data buffer size. Keeps data intact unless -         *  truncated. -         */ -        void setSize(size_t newSize); - -        /** Returns the UDP address of the packet. -         */ -        InetAddress getAddress(void); - -        const std::vector<uint8_t>& getBuffer(void) const { -            return m_buffer; -        } - - -    private: -        std::vector<uint8_t> m_buffer; -        InetAddress address; -}; - diff --git a/src/dabOutput/dabOutput.h b/src/dabOutput/dabOutput.h index 9cc18d7..c7e570b 100644 --- a/src/dabOutput/dabOutput.h +++ b/src/dabOutput/dabOutput.h @@ -28,8 +28,7 @@  #pragma once -#include "UdpSocket.h" -#include "TcpSocket.h" +#include "Socket.h"  #include "Log.h"  #include "string.h"  #include <stdexcept> @@ -57,6 +56,8 @@ class DabOutput          {              return Open(name.c_str());          } + +        // Return -1 on failure          virtual int Write(void* buffer, int size) = 0;          virtual int Close() = 0; @@ -145,15 +146,7 @@ class DabOutputRaw : public DabOutput  class DabOutputUdp : public DabOutput  {      public: -        DabOutputUdp() { -            packet_ = new UdpPacket(6144); -            socket_ = new UdpSocket(); -        } - -        virtual ~DabOutputUdp() { -            delete socket_; -            delete packet_; -        } +        DabOutputUdp();          int Open(const char* name);          int Write(void* buffer, int size); @@ -171,8 +164,8 @@ class DabOutputUdp : public DabOutput          DabOutputUdp operator=(const DabOutputUdp& other) = delete;          std::string uri_; -        UdpSocket* socket_; -        UdpPacket* packet_; +        Socket::UDPSocket socket_; +        Socket::UDPPacket packet_;  };  // -------------- TCP ------------------ @@ -190,7 +183,7 @@ class DabOutputTcp : public DabOutput      private:          std::string uri_; -        std::shared_ptr<TCPDataDispatcher> dispatcher_; +        std::shared_ptr<Socket::TCPDataDispatcher> dispatcher_;  };  // -------------- Simul ------------------ diff --git a/src/dabOutput/dabOutputTcp.cpp b/src/dabOutput/dabOutputTcp.cpp index 87dbfd5..4dc3538 100644 --- a/src/dabOutput/dabOutputTcp.cpp +++ b/src/dabOutput/dabOutputTcp.cpp @@ -94,7 +94,7 @@ int DabOutputTcp::Open(const char* name)      uri_ = name;      if (success) { -        dispatcher_ = make_shared<TCPDataDispatcher>(MAX_QUEUED_ETI_FRAMES); +        dispatcher_ = make_shared<Socket::TCPDataDispatcher>(MAX_QUEUED_ETI_FRAMES);          dispatcher_->start(port, address);      }      else { diff --git a/src/dabOutput/dabOutputUdp.cpp b/src/dabOutput/dabOutputUdp.cpp index c129569..b9c22db 100644 --- a/src/dabOutput/dabOutputUdp.cpp +++ b/src/dabOutput/dabOutputUdp.cpp @@ -38,18 +38,12 @@  #include <cstdio>  #include <limits.h>  #include "dabOutput.h" -#include "UdpSocket.h" - -#ifdef _WIN32 -#   include <fscfg.h> -#   include <sdci.h> -#else -#   include <netinet/in.h> -#   include <sys/types.h> -#   include <sys/socket.h> -#   include <sys/ioctl.h> -#   include <net/if_arp.h> -#endif +#include "Socket.h" + +DabOutputUdp::DabOutputUdp() : +    socket_(), +    packet_(6144) +{ }  int DabOutputUdp::Open(const char* name)  { @@ -64,12 +58,6 @@ int DabOutputUdp::Open(const char* name)                  regex_constants::match_default)) {          string address = what[1]; -        if (this->packet_->getAddress().setAddress(address.c_str()) == -1) { -            etiLog.level(error) << "can't set address " << -               address <<  "(" << inetErrDesc << ": " << inetErrMsg << ")"; -            return -1; -        } -          string port_str = what[2];          long port = std::strtol(port_str.c_str(), nullptr, 0); @@ -79,7 +67,7 @@ int DabOutputUdp::Open(const char* name)              return -1;          } -        this->packet_->getAddress().setPort(port); +        packet_.address.resolveUdpDestination(address, port);          string query_params = what[3];          smatch query_what; @@ -87,28 +75,25 @@ int DabOutputUdp::Open(const char* name)                      regex_constants::match_default)) {              string src = query_what[1]; -            int err = socket_->setMulticastSource(src.c_str()); -            if (err) { -                etiLog.level(error) << "UDP output socket set source failed!"; -                return -1; -            } +            try { +                socket_.setMulticastSource(src.c_str()); -            string ttl_str = query_what[2]; +                string ttl_str = query_what[2]; -            if (not ttl_str.empty()) { -                long ttl = std::strtol(ttl_str.c_str(), nullptr, 0); -                if ((ttl <= 0) || (ttl >= 255)) { -                    etiLog.level(error) << "Invalid TTL setting in " << -                        uri_without_proto; -                    return -1; -                } +                if (not ttl_str.empty()) { +                    long ttl = std::strtol(ttl_str.c_str(), nullptr, 0); +                    if ((ttl <= 0) || (ttl >= 255)) { +                        etiLog.level(error) << "Invalid TTL setting in " << +                            uri_without_proto; +                        return -1; +                    } -                err = socket_->setMulticastTTL(ttl); -                if (err) { -                    etiLog.level(error) << "UDP output socket set TTL failed!"; -                    return -1; +                    socket_.setMulticastTTL(ttl);                  }              } +            catch (const std::runtime_error& e) { +                etiLog.level(error) << "Failed to set UDP output settings" << e.what(); +            }          }          else if (not query_params.empty()) {              etiLog.level(error) << "UDP output: could not parse parameters " << @@ -129,9 +114,11 @@ int DabOutputUdp::Open(const char* name)  int DabOutputUdp::Write(void* buffer, int size)  { -    this->packet_->setSize(0); -    this->packet_->addData(buffer, size); -    return this->socket_->send(*this->packet_); +    const uint8_t *buf = reinterpret_cast<uint8_t*>(buffer); +    packet_.buffer.resize(0); +    std::copy(buf, buf + size, std::back_inserter(packet_.buffer)); +    socket_.send(packet_); +    return 0;  }  #endif // defined(HAVE_OUTPUT_UDP) diff --git a/src/dabOutput/edi/Config.h b/src/dabOutput/edi/Config.h index 55d5f0f..0c7dce8 100644 --- a/src/dabOutput/edi/Config.h +++ b/src/dabOutput/edi/Config.h @@ -50,11 +50,18 @@ struct udp_destination_t : public destination_t {  };  // TCP server that can accept multiple connections -struct tcp_destination_t : public destination_t { +struct tcp_server_t : public destination_t {      unsigned int listen_port = 0;      size_t max_frames_queued = 1024;  }; +// TCP client that connects to one endpoint +struct tcp_client_t : public destination_t { +    std::string dest_addr; +    unsigned int dest_port = 0; +    size_t max_frames_queued = 1024; +}; +  struct configuration_t {      unsigned chunk_len = 207;        // RSk, data length of each chunk      unsigned fec       = 0;          // number of fragments that can be recovered diff --git a/src/dabOutput/edi/PFT.cpp b/src/dabOutput/edi/PFT.cpp index 5b93016..63dfa34 100644 --- a/src/dabOutput/edi/PFT.cpp +++ b/src/dabOutput/edi/PFT.cpp @@ -314,7 +314,7 @@ std::vector< PFTFragment > PFT::Assemble(AFPacket af_packet)  #if 0          fprintf(stderr, "* PFT pseq %d, findex %d, fcount %d, plen %d\n", -                m_pseq, findex, fcount, plen & ~0x8000); +                m_pseq, findex, fcount, plen & ~0xC000);  #endif      } diff --git a/src/dabOutput/edi/Transport.cpp b/src/dabOutput/edi/Transport.cpp index d99e987..187aabe 100644 --- a/src/dabOutput/edi/Transport.cpp +++ b/src/dabOutput/edi/Transport.cpp @@ -45,12 +45,16 @@ void configuration_t::print() const              }              etiLog.level(info) << "  source port " << udp_dest->source_port;          } -        else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_destination_t>(edi_dest)) { +        else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(edi_dest)) {              etiLog.level(info) << " TCP listening on port " << tcp_dest->listen_port;              etiLog.level(info) << "  max frames queued    " << tcp_dest->max_frames_queued;          } +        else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(edi_dest)) { +            etiLog.level(info) << " TCP client connecting to " << tcp_dest->dest_addr << ":" << tcp_dest->dest_port; +            etiLog.level(info) << "  max frames queued    " << tcp_dest->max_frames_queued; +        }          else { -            throw std::logic_error("EDI destination not implemented"); +            throw logic_error("EDI destination not implemented");          }      }      if (interleaver_enabled()) { @@ -69,28 +73,27 @@ Sender::Sender(const configuration_t& conf) :      for (const auto& edi_dest : m_conf.destinations) {          if (const auto udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(edi_dest)) { -            auto udp_socket = std::make_shared<UdpSocket>(udp_dest->source_port); +            auto udp_socket = std::make_shared<Socket::UDPSocket>(udp_dest->source_port);              if (not udp_dest->source_addr.empty()) { -                int err = udp_socket->setMulticastSource(udp_dest->source_addr.c_str()); -                if (err) { -                    throw runtime_error("EDI socket set source failed!"); -                } -                err = udp_socket->setMulticastTTL(udp_dest->ttl); -                if (err) { -                    throw runtime_error("EDI socket set TTL failed!"); -                } +                udp_socket->setMulticastSource(udp_dest->source_addr.c_str()); +                udp_socket->setMulticastTTL(udp_dest->ttl);              }              udp_sockets.emplace(udp_dest.get(), udp_socket);          } -        else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_destination_t>(edi_dest)) { -            auto dispatcher = make_shared<TCPDataDispatcher>(tcp_dest->max_frames_queued); +        else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(edi_dest)) { +            auto dispatcher = make_shared<Socket::TCPDataDispatcher>(tcp_dest->max_frames_queued);              dispatcher->start(tcp_dest->listen_port, "0.0.0.0");              tcp_dispatchers.emplace(tcp_dest.get(), dispatcher);          } +        else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(edi_dest)) { +            auto tcp_socket = make_shared<Socket::TCPSocket>(); +            tcp_socket->connect(tcp_dest->dest_addr, tcp_dest->dest_port); +            tcp_senders.emplace(tcp_dest.get(), tcp_socket); +        }          else { -            throw std::logic_error("EDI destination not implemented"); +            throw logic_error("EDI destination not implemented");          }      } @@ -117,7 +120,7 @@ void Sender::write(const TagPacket& tagpacket)          vector<edi::PFTFragment> edi_fragments = edi_pft.Assemble(af_packet);          if (m_conf.verbose) { -            fprintf(stderr, "EDI number of PFT fragment before interleaver %zu", +            fprintf(stderr, "EDI number of PFT fragment before interleaver %zu\n",                      edi_fragments.size());          } @@ -129,28 +132,30 @@ void Sender::write(const TagPacket& tagpacket)          for (const auto& edi_frag : edi_fragments) {              for (auto& dest : m_conf.destinations) {                  if (const auto& udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(dest)) { -                    InetAddress addr; -                    addr.setAddress(udp_dest->dest_addr.c_str()); -                    addr.setPort(m_conf.dest_port); +                    Socket::InetAddress addr; +                    addr.resolveUdpDestination(udp_dest->dest_addr, m_conf.dest_port);                      udp_sockets.at(udp_dest.get())->send(edi_frag, addr);                  } -                else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_destination_t>(dest)) { +                else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(dest)) {                      tcp_dispatchers.at(tcp_dest.get())->write(edi_frag);                  } +                else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(dest)) { +                    tcp_senders.at(tcp_dest.get())->sendall(edi_frag.data(), edi_frag.size()); +                }                  else { -                    throw std::logic_error("EDI destination not implemented"); +                    throw logic_error("EDI destination not implemented");                  }              }              if (m_conf.dump) { -                std::ostream_iterator<uint8_t> debug_iterator(edi_debug_file); -                std::copy(edi_frag.begin(), edi_frag.end(), debug_iterator); +                ostream_iterator<uint8_t> debug_iterator(edi_debug_file); +                copy(edi_frag.begin(), edi_frag.end(), debug_iterator);              }          }          if (m_conf.verbose) { -            fprintf(stderr, "EDI number of PFT fragments %zu", +            fprintf(stderr, "EDI number of PFT fragments %zu\n",                      edi_fragments.size());          }      } @@ -158,23 +163,25 @@ void Sender::write(const TagPacket& tagpacket)          // Send over ethernet          for (auto& dest : m_conf.destinations) {              if (const auto& udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(dest)) { -                InetAddress addr; -                addr.setAddress(udp_dest->dest_addr.c_str()); -                addr.setPort(m_conf.dest_port); +                Socket::InetAddress addr; +                addr.resolveUdpDestination(udp_dest->dest_addr, m_conf.dest_port);                  udp_sockets.at(udp_dest.get())->send(af_packet, addr);              } -            else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_destination_t>(dest)) { +            else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(dest)) {                  tcp_dispatchers.at(tcp_dest.get())->write(af_packet);              } +            else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(dest)) { +                tcp_senders.at(tcp_dest.get())->sendall(af_packet.data(), af_packet.size()); +            }              else { -                throw std::logic_error("EDI destination not implemented"); +                throw logic_error("EDI destination not implemented");              }          }          if (m_conf.dump) { -            std::ostream_iterator<uint8_t> debug_iterator(edi_debug_file); -            std::copy(af_packet.begin(), af_packet.end(), debug_iterator); +            ostream_iterator<uint8_t> debug_iterator(edi_debug_file); +            copy(af_packet.begin(), af_packet.end(), debug_iterator);          }      }  } diff --git a/src/dabOutput/edi/Transport.h b/src/dabOutput/edi/Transport.h index 7b0a0db..9633275 100644 --- a/src/dabOutput/edi/Transport.h +++ b/src/dabOutput/edi/Transport.h @@ -32,11 +32,12 @@  #include "AFPacket.h"  #include "PFT.h"  #include "Interleaver.h" +#include "Socket.h"  #include <vector>  #include <unordered_map>  #include <stdexcept> +#include <fstream>  #include <cstdint> -#include "dabOutput/dabOutput.h"  namespace edi { @@ -61,8 +62,9 @@ class Sender {          // To mitigate for burst packet loss, PFT fragments can be sent out-of-order          edi::Interleaver edi_interleaver; -        std::unordered_map<udp_destination_t*, std::shared_ptr<UdpSocket>> udp_sockets; -        std::unordered_map<tcp_destination_t*, std::shared_ptr<TCPDataDispatcher>> tcp_dispatchers; +        std::unordered_map<udp_destination_t*, std::shared_ptr<Socket::UDPSocket>> udp_sockets; +        std::unordered_map<tcp_server_t*, std::shared_ptr<Socket::TCPDataDispatcher>> tcp_dispatchers; +        std::unordered_map<tcp_client_t*, std::shared_ptr<Socket::TCPSocket>> tcp_senders;  };  } diff --git a/src/input/Edi.cpp b/src/input/Edi.cpp new file mode 100644 index 0000000..8aee296 --- /dev/null +++ b/src/input/Edi.cpp @@ -0,0 +1,189 @@ +/* +   Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications +   Research Center Canada) + +   Copyright (C) 2019 +   Matthias P. Braendli, matthias.braendli@mpb.li + +    http://www.opendigitalradio.org +   */ +/* +   This file is part of ODR-DabMux. + +   ODR-DabMux is free software: you can redistribute it and/or modify +   it under the terms of the GNU General Public License as +   published by the Free Software Foundation, either version 3 of the +   License, or (at your option) any later version. + +   ODR-DabMux is distributed in the hope that it will be useful, +   but WITHOUT ANY WARRANTY; without even the implied warranty of +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +   GNU General Public License for more details. + +   You should have received a copy of the GNU General Public License +   along with ODR-DabMux.  If not, see <http://www.gnu.org/licenses/>. +   */ + +#include "input/Edi.h" + +#include <regex> +#include <stdexcept> +#include <sstream> +#include <cstring> +#include <cstdlib> +#include <cerrno> +#include <climits> +#include "utils.h" + +using namespace std; + +namespace Inputs { + +constexpr bool VERBOSE = false; +constexpr size_t TCP_BLOCKSIZE = 2048; +constexpr size_t MAX_FRAMES_QUEUED = 10; + +Edi::Edi() : +    m_tcp_receive_server(TCP_BLOCKSIZE), +    m_sti_writer(), +    m_sti_decoder(m_sti_writer, VERBOSE) +{ } + +Edi::~Edi() { +    m_running = false; +    if (m_thread.joinable()) { +        m_thread.join(); +    } +} + +int Edi::open(const std::string& name) +{ +    const std::regex re_udp("udp://:([0-9]+)"); +    const std::regex re_tcp("tcp://(.*):([0-9]+)"); + +    lock_guard<mutex> lock(m_mutex); + +    m_running = false; +    if (m_thread.joinable()) { +        m_thread.join(); +    } + +    std::smatch m; +    if (std::regex_match(name, m, re_udp)) { +        const int udp_port = std::stoi(m[1].str()); +        m_input_used = InputUsed::UDP; +        m_udp_sock.reinit(udp_port); +        m_udp_sock.setBlocking(false); +        // TODO multicast +    } +    else if (std::regex_match(name, m, re_tcp)) { +        m_input_used = InputUsed::TCP; +        const string addr = m[1].str(); +        const int tcp_port = std::stoi(m[2].str()); +        m_tcp_receive_server.start(tcp_port, addr); +    } +    else { +        throw runtime_error("Cannot parse EDI input URI"); +    } + +    m_name = name; + +    m_running = true; +    m_thread = std::thread(&Edi::m_run, this); + +    return 0; +} + +int Edi::readFrame(uint8_t* buffer, size_t size) +{ +    vector<uint8_t> frame; +    if (m_is_prebuffering) { +        m_is_prebuffering = m_frames.size() < 10; +        if (not m_is_prebuffering) { +            etiLog.level(info) << "EDI input " << m_name << " pre-buffering complete."; +        } +    } +    else if (m_frames.try_pop(frame)) { +        if (frame.size() == 0) { +            etiLog.level(debug) << "EDI input " << m_name << " empty frame"; +            return 0; +        } +        else if (frame.size() == size) { +            std::copy(frame.cbegin(), frame.cend(), buffer); +        } +        else { +            etiLog.level(debug) << "EDI input " << m_name << " size mismatch: " << frame.size() << +                " received, " << size << " requested"; +            memset(buffer, 0, size * sizeof(*buffer)); +        } +    } +    else { +        memset(buffer, 0, size * sizeof(*buffer)); +        m_is_prebuffering = true; +        etiLog.level(info) << "EDI input " << m_name << " re-enabling pre-buffering"; +    } +    return size; +} + +void Edi::m_run() +{ +    while (m_running) { +        bool work_done = false; + +        switch (m_input_used) { +            case InputUsed::UDP: +                { +                    constexpr size_t packsize = 2048; +                    const auto packet = m_udp_sock.receive(packsize); +                    if (packet.buffer.size() == packsize) { +                        fprintf(stderr, "Warning, possible UDP truncation\n"); +                    } +                    if (not packet.buffer.empty()) { +                        m_sti_decoder.push_packet(packet.buffer); +                        work_done = true; +                    } +                } +                break; +            case InputUsed::TCP: +                { +                    auto packet = m_tcp_receive_server.receive(); +                    if (not packet.empty()) { +                        m_sti_decoder.push_bytes(packet); +                        work_done = true; +                    } +                } +                break; +            default: +                throw logic_error("unimplemented input"); +        } + +        const auto sti = m_sti_writer.getFrame(); +        if (not sti.frame.empty()) { +            m_frames.push_wait_if_full(move(sti.frame), MAX_FRAMES_QUEUED); +            work_done = true; +        } + +        if (not work_done) { +            // Avoid fast loop +            this_thread::sleep_for(chrono::milliseconds(12)); +        } +    } +} + +int Edi::setBitrate(int bitrate) +{ +    if (bitrate <= 0) { +        etiLog.level(error) << "Invalid bitrate (" << bitrate << ") for " << m_name; +        return -1; +    } + +    return bitrate; +} + +int Edi::close() +{ +    m_udp_sock.close(); +    return 0; +} + +} diff --git a/src/input/Edi.h b/src/input/Edi.h new file mode 100644 index 0000000..7b3dc04 --- /dev/null +++ b/src/input/Edi.h @@ -0,0 +1,82 @@ +/* +   Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications +   Research Center Canada) + +   Copyright (C) 2019 +   Matthias P. Braendli, matthias.braendli@mpb.li + +    http://www.opendigitalradio.org +   */ +/* +   This file is part of ODR-DabMux. + +   ODR-DabMux is free software: you can redistribute it and/or modify +   it under the terms of the GNU General Public License as +   published by the Free Software Foundation, either version 3 of the +   License, or (at your option) any later version. + +   ODR-DabMux is distributed in the hope that it will be useful, +   but WITHOUT ANY WARRANTY; without even the implied warranty of +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +   GNU General Public License for more details. + +   You should have received a copy of the GNU General Public License +   along with ODR-DabMux.  If not, see <http://www.gnu.org/licenses/>. +   */ + +#pragma once + +#include <string> +#include <vector> +#include <deque> +#include <thread> +#include <mutex> +#include "Socket.h" +#include "input/inputs.h" +#include "edi/STIDecoder.hpp" +#include "edi/STIWriter.hpp" +#include "ThreadsafeQueue.h" + +namespace Inputs { + +/* + * Receives EDI from UDP or TCP in a separate thread and pushes that data + * into the STIDecoder. Complete frames are then put into a queue for the consumer. + * + * This way, the EDI decoding happens in a separate thread. + */ +class Edi : public InputBase { +    public: +        Edi(); +        Edi(const Edi&) = delete; +        Edi& operator=(const Edi&) = delete; +        ~Edi(); + +        virtual int open(const std::string& name); +        virtual int readFrame(uint8_t* buffer, size_t size); +        virtual int setBitrate(int bitrate); +        virtual int close(); + +    protected: +        void m_run(); + +        std::mutex m_mutex; + +        enum class InputUsed { Invalid, UDP, TCP }; +        InputUsed m_input_used = InputUsed::Invalid; +        Socket::UDPSocket m_udp_sock; +        Socket::TCPReceiveServer m_tcp_receive_server; + +        EdiDecoder::STIWriter m_sti_writer; +        EdiDecoder::STIDecoder m_sti_decoder; +        std::thread m_thread; +        std::atomic<bool> m_running = ATOMIC_VAR_INIT(false); +        ThreadsafeQueue<std::vector<uint8_t> > m_frames; + +        bool m_is_prebuffering = true; + +        std::string m_name; +}; + +}; + diff --git a/src/input/Udp.cpp b/src/input/Udp.cpp index 2cb49e7..5d4f964 100644 --- a/src/input/Udp.cpp +++ b/src/input/Udp.cpp @@ -82,17 +82,8 @@ void Udp::openUdpSocket(const std::string& endpoint)          throw out_of_range("can't use port number 0 in udp address");      } -    if (m_sock.reinit(port, address) == -1) { -        stringstream ss; -        ss << "Could not init UDP socket: " << inetErrMsg; -        throw runtime_error(ss.str()); -    } - -    if (m_sock.setBlocking(false) == -1) { -        stringstream ss; -        ss << "Could not set non-blocking UDP socket: " << inetErrMsg; -        throw runtime_error(ss.str()); -    } +    m_sock.reinit(port, address); +    m_sock.setBlocking(false);      etiLog.level(info) << "Opened UDP port " << address << ":" << port;  } @@ -100,17 +91,9 @@ void Udp::openUdpSocket(const std::string& endpoint)  int Udp::readFrame(uint8_t* buffer, size_t size)  {      // Regardless of buffer contents, try receiving data. -    UdpPacket packet(32768); -    int ret = m_sock.receive(packet); - -    if (ret == -1) { -        stringstream ss; -        ss << "Could not read from UDP socket: " << inetErrMsg; -        throw runtime_error(ss.str()); -    } +    auto packet = m_sock.receive(32768); -    std::copy(packet.getData(), packet.getData() + packet.getSize(), -            back_inserter(m_buffer)); +    std::copy(packet.buffer.cbegin(), packet.buffer.cend(), back_inserter(m_buffer));      // Take data from the buffer if it contains enough data,      // in any case write the buffer @@ -136,7 +119,8 @@ int Udp::setBitrate(int bitrate)  int Udp::close()  { -    return m_sock.close(); +    m_sock.close(); +    return 0;  } @@ -167,8 +151,8 @@ static uint16_t unpack2(const uint8_t *buf)  int Sti_d_Rtp::open(const std::string& name)  { -    // Skip the sti-rtp:// part if it is present -    const string endpoint = (name.substr(0, 10) == "sti-rtp://") ? +    // Skip the rtp:// part if it is present +    const string endpoint = (name.substr(0, 10) == "rtp://") ?          name.substr(10) : name;      // The endpoint should be address:port @@ -176,8 +160,8 @@ int Sti_d_Rtp::open(const std::string& name)      if (colon_pos == string::npos) {          stringstream ss;          ss << "'" << name << -                " is an invalid format for sti-rtp address: " -                "expected [sti-rtp://]address:port"; +                " is an invalid format for rtp address: " +                "expected [rtp://]address:port";          throw invalid_argument(ss.str());      } @@ -190,29 +174,22 @@ int Sti_d_Rtp::open(const std::string& name)  void Sti_d_Rtp::receive_packet()  { -    UdpPacket packet(32768); -    int ret = m_sock.receive(packet); - -    if (ret == -1) { -        stringstream ss; -        ss << "Could not read from UDP socket: " << inetErrMsg; -        throw runtime_error(ss.str()); -    } +    auto packet = m_sock.receive(32768); -    if (packet.getSize() == 0) { +    if (packet.buffer.empty()) {          // No packet was received          return;      }      const size_t STI_FC_LEN = 8; -    if (packet.getSize() < RTP_HEADER_LEN + STI_SYNC_LEN + STI_FC_LEN) { +    if (packet.buffer.size() < RTP_HEADER_LEN + STI_SYNC_LEN + STI_FC_LEN) {          etiLog.level(info) << "Received too small RTP packet for " <<              m_name;          return;      } -    if (not rtpHeaderValid(packet.getData())) { +    if (not rtpHeaderValid(packet.buffer.data())) {          etiLog.level(info) << "Received invalid RTP header for " <<              m_name;          return; @@ -220,7 +197,7 @@ void Sti_d_Rtp::receive_packet()      //  STI(PI, X)      size_t index = RTP_HEADER_LEN; -    const uint8_t *buf = packet.getData(); +    const uint8_t *buf = packet.buffer.data();      //   SYNC      index++; // Advance over STAT @@ -242,7 +219,7 @@ void Sti_d_Rtp::receive_packet()              m_name;          return;      } -    if (packet.getSize() < index + DFS) { +    if (packet.buffer.size() < index + DFS) {          etiLog.level(info) << "Received STI too small for given DFS for " <<              m_name;          return; @@ -270,9 +247,9 @@ void Sti_d_Rtp::receive_packet()      uint16_t NST   = unpack2(buf+index) & 0x7FF; // 11 bits      index += 2; -    if (packet.getSize() < index + 4*NST) { +    if (packet.buffer.size() < index + 4*NST) {          etiLog.level(info) << "Received STI too small to contain NST for " << -            m_name << " packet: " << packet.getSize() << " need " << +            m_name << " packet: " << packet.buffer.size() << " need " <<              index + 4*NST;          return;      } diff --git a/src/input/Udp.h b/src/input/Udp.h index dc01486..dd637c6 100644 --- a/src/input/Udp.h +++ b/src/input/Udp.h @@ -31,7 +31,7 @@  #include <deque>  #include <boost/thread.hpp>  #include "input/inputs.h" -#include "UdpSocket.h" +#include "Socket.h"  namespace Inputs { @@ -46,7 +46,7 @@ class Udp : public InputBase {          virtual int close();      protected: -        UdpSocket m_sock; +        Socket::UDPSocket m_sock;          std::string m_name;          void openUdpSocket(const std::string& endpoint); | 
