diff options
| -rw-r--r-- | Makefile.am | 37 | ||||
| -rw-r--r-- | doc/advanced.mux | 17 | ||||
| -rw-r--r-- | doc/example.mux | 3 | ||||
| -rw-r--r-- | lib/ReedSolomon.cpp (renamed from src/ReedSolomon.cpp) | 0 | ||||
| -rw-r--r-- | lib/ReedSolomon.h (renamed from src/ReedSolomon.h) | 0 | ||||
| -rw-r--r-- | lib/Socket.cpp | 817 | ||||
| -rw-r--r-- | lib/Socket.h | 268 | ||||
| -rw-r--r-- | lib/ThreadsafeQueue.h (renamed from src/ThreadsafeQueue.h) | 14 | ||||
| -rw-r--r-- | src/ConfigParser.cpp | 61 | ||||
| -rw-r--r-- | src/DabMultiplexer.h | 3 | ||||
| -rw-r--r-- | src/DabMux.cpp | 5 | ||||
| -rw-r--r-- | src/InetAddress.cpp | 155 | ||||
| -rw-r--r-- | src/InetAddress.h | 78 | ||||
| -rw-r--r-- | src/TcpSocket.cpp | 359 | ||||
| -rw-r--r-- | src/TcpSocket.h | 164 | ||||
| -rw-r--r-- | src/UdpSocket.cpp | 256 | ||||
| -rw-r--r-- | src/UdpSocket.h | 174 | ||||
| -rw-r--r-- | src/dabOutput/dabOutput.h | 21 | ||||
| -rw-r--r-- | src/dabOutput/dabOutputTcp.cpp | 2 | ||||
| -rw-r--r-- | src/dabOutput/dabOutputUdp.cpp | 65 | ||||
| -rw-r--r-- | src/dabOutput/edi/Config.h | 9 | ||||
| -rw-r--r-- | src/dabOutput/edi/PFT.cpp | 2 | ||||
| -rw-r--r-- | src/dabOutput/edi/Transport.cpp | 67 | ||||
| -rw-r--r-- | src/dabOutput/edi/Transport.h | 8 | ||||
| -rw-r--r-- | src/input/Udp.cpp | 59 | ||||
| -rw-r--r-- | src/input/Udp.h | 4 | 
26 files changed, 1267 insertions, 1381 deletions
diff --git a/Makefile.am b/Makefile.am index 2f06879..80d24e0 100644 --- a/Makefile.am +++ b/Makefile.am @@ -69,6 +69,8 @@ odr_dabmux_SOURCES  =src/DabMux.cpp \  					 src/input/File.h \  					 src/input/Udp.cpp \  					 src/input/Udp.h \ +					 src/input/Edi.cpp \ +					 src/input/Edi.h \  					 src/dabOutput/dabOutput.h \  					 src/dabOutput/dabOutputFile.cpp \  					 src/dabOutput/dabOutputFifo.cpp \ @@ -98,8 +100,6 @@ odr_dabmux_SOURCES  =src/DabMux.cpp \  					 src/ConfigParser.h \  					 src/Eti.h \  					 src/Eti.cpp \ -					 src/InetAddress.h \ -					 src/InetAddress.cpp \  					 src/Interleaver.h \  					 src/Interleaver.cpp \  					 src/Log.h \ @@ -109,15 +109,8 @@ odr_dabmux_SOURCES  =src/DabMux.cpp \  					 src/MuxElements.cpp \  					 src/MuxElements.h \  					 src/PcDebug.h \ -					 src/ReedSolomon.h \ -					 src/ReedSolomon.cpp \  					 src/RemoteControl.cpp \  					 src/RemoteControl.h \ -					 src/TcpSocket.h \ -					 src/TcpSocket.cpp \ -					 src/UdpSocket.h \ -					 src/UdpSocket.cpp \ -					 src/ThreadsafeQueue.h \  					 src/crc.h \  					 src/crc.c \  					 src/fig/FIG.h \ @@ -167,6 +160,19 @@ odr_dabmux_SOURCES  =src/DabMux.cpp \  					 src/PrbsGenerator.h \  					 src/utils.cpp \  					 src/utils.h \ +					 lib/edi/STIDecoder.cpp \ +					 lib/edi/STIDecoder.h \ +					 lib/edi/STIWriter.cpp \ +					 lib/edi/STIWriter.h \ +					 lib/edi/PFT.cpp \ +					 lib/edi/PFT.h \ +					 lib/edi/common.cpp \ +					 lib/edi/common.h \ +					 lib/ReedSolomon.h \ +					 lib/ReedSolomon.cpp \ +					 lib/Socket.h \ +					 lib/Socket.cpp \ +					 lib/ThreadsafeQueue.h \  					 lib/zmq.hpp \  					 $(lib_fec_sources) \  					 $(lib_charset_sources) @@ -205,18 +211,15 @@ odr_zmq2edi_SOURCES  = src/zmq2edi/zmq2edi.cpp \  					   src/dabOutput/edi/TagPacket.h \  					   src/dabOutput/edi/Transport.cpp \  					   src/dabOutput/edi/Transport.h \ -					   src/InetAddress.h \ -					   src/InetAddress.cpp \ -					   src/TcpSocket.h \ -					   src/TcpSocket.cpp \ -					   src/UdpSocket.h \ -					   src/UdpSocket.cpp \ -					   src/ReedSolomon.h \ -					   src/ReedSolomon.cpp \  					   src/Log.h \  					   src/Log.cpp \  					   src/crc.h \  					   src/crc.c \ +					   lib/ReedSolomon.h \ +					   lib/ReedSolomon.cpp \ +					   lib/Socket.h \ +					   lib/Socket.cpp \ +					   lib/ThreadsafeQueue.h \  					   lib/zmq.hpp \  					   $(lib_fec_sources) diff --git a/doc/advanced.mux b/doc/advanced.mux index fb67b82..b9cec05 100644 --- a/doc/advanced.mux +++ b/doc/advanced.mux @@ -163,7 +163,8 @@ subchannels {      sub-fu {          type audio          ; example file input -        inputfile "funk.mp2" +        inputproto zmq +        inputuri "funk.mp2"          nonblock false          bitrate 128          id 10 @@ -188,7 +189,8 @@ subchannels {          ; Receive STI-D(LI) carried in STI(PI, X) inside RTP using UDP.          ; This is intended to be compatible with AVT audio encoders.          ; EXPERIMENTAL! -        inputfile "sti-rtp://127.0.0.1:32010" +        inputproto sti +        inputuri "rtp://127.0.0.1:32010"          bitrate 96          id 3          protection 3 @@ -196,11 +198,12 @@ subchannels {      sub-ri {          type dabplus          ; example file input -        ;inputfile "rick.dabp" +        ;inputuri "rick.dabp"          ; example zmq input:          ; Accepts connections to port 9000 from any interface.          ; Use ODR-AudioEnc as encoder -        inputfile "tcp://*:9000" +        inputproto zmq +        inputuri "tcp://*:9000"          bitrate 96          id 1          protection 1 @@ -256,7 +259,8 @@ subchannels {          ; for audio types, you can use the ZeroMQ input (if compiled in)          ; with the following configuration in combination with          ; Toolame-DAB -        inputfile "tcp://*:9001" +        inputproto zmq +        inputuri "tcp://*:9001"          bitrate 96          id 1          protection 1 @@ -273,7 +277,8 @@ subchannels {         type data         ; Use the default PRBS polynomial. -       inputfile "prbs://" +       inputproto prbs +       inputuri "prbs://"         ; To use another polynomial, set it in the url as hexadecimal         ; The default polynomial is G(x) = x^20 + x^17 + 1, represented as diff --git a/doc/example.mux b/doc/example.mux index 6c2bc18..31e072d 100644 --- a/doc/example.mux +++ b/doc/example.mux @@ -171,7 +171,8 @@ subchannels {          type dabplus          ; Accepts connections to port 9000 from any interface.          ; Use ODR-AudioEnc as encoder -        inputfile "tcp://*:9000" +        inputproto zmq +        inputuri "tcp://*:9000"          bitrate 96          id 1          protection 3 diff --git a/src/ReedSolomon.cpp b/lib/ReedSolomon.cpp index 38d8ea8..38d8ea8 100644 --- a/src/ReedSolomon.cpp +++ b/lib/ReedSolomon.cpp diff --git a/src/ReedSolomon.h b/lib/ReedSolomon.h index abcef62..abcef62 100644 --- a/src/ReedSolomon.h +++ b/lib/ReedSolomon.h diff --git a/lib/Socket.cpp b/lib/Socket.cpp new file mode 100644 index 0000000..9b404eb --- /dev/null +++ b/lib/Socket.cpp @@ -0,0 +1,817 @@ +/* +   Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the +   Queen in Right of Canada (Communications Research Center Canada) + +   Copyright (C) 2019 +   Matthias P. Braendli, matthias.braendli@mpb.li + +    http://www.opendigitalradio.org +   */ +/* +   This program is free software: you can redistribute it and/or modify +   it under the terms of the GNU General Public License as published by +   the Free Software Foundation, either version 3 of the License, or +   (at your option) any later version. + +   This program is distributed in the hope that it will be useful, +   but WITHOUT ANY WARRANTY; without even the implied warranty of +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +   GNU General Public License for more details. + +   You should have received a copy of the GNU General Public License +   along with this program.  If not, see <https://www.gnu.org/licenses/>. +*/ + +#include "Socket.h" + +#include <iostream> +#include <cstdio> +#include <cstring> +#include <cerrno> +#include <fcntl.h> +#include <poll.h> + +namespace Socket { + +using namespace std; + +void InetAddress::resolveUdpDestination(const std::string& destination, int port) +{ +    char service[NI_MAXSERV]; +    snprintf(service, NI_MAXSERV-1, "%d", port); + +    struct addrinfo hints; +    memset(&hints, 0, sizeof(struct addrinfo)); +    hints.ai_family = AF_INET; +    hints.ai_socktype = SOCK_DGRAM; /* Datagram socket */ +    hints.ai_flags = 0; +    hints.ai_protocol = 0; + +    struct addrinfo *result, *rp; +    int s = getaddrinfo(destination.c_str(), service, &hints, &result); +    if (s != 0) { +        throw runtime_error(string("getaddrinfo failed: ") + gai_strerror(s)); +    } + +    for (rp = result; rp != nullptr; rp = rp->ai_next) { +        // Take the first result +        memcpy(&addr, rp->ai_addr, rp->ai_addrlen); +        break; +    } + +    freeaddrinfo(result); + +    if (rp == nullptr) { +        throw runtime_error("Could not resolve"); +    } +} + +UDPPacket::UDPPacket() { } + +UDPPacket::UDPPacket(size_t initSize) : +    buffer(initSize) +{ } + + +UDPSocket::UDPSocket() : +    m_sock(INVALID_SOCKET) +{ +    reinit(0, ""); +} + +UDPSocket::UDPSocket(int port) : +    m_sock(INVALID_SOCKET) +{ +    reinit(port, ""); +} + +UDPSocket::UDPSocket(int port, const std::string& name) : +    m_sock(INVALID_SOCKET) +{ +    reinit(port, name); +} + + +void UDPSocket::setBlocking(bool block) +{ +    int res = fcntl(m_sock, F_SETFL, block ? 0 : O_NONBLOCK); +    if (res == -1) { +        throw runtime_error(string("Can't change blocking state of socket: ") + strerror(errno)); +    } +} + +void UDPSocket::reinit(int port) +{ +    return reinit(port, ""); +} + +void UDPSocket::reinit(int port, const std::string& name) +{ +    if (m_sock != INVALID_SOCKET) { +        ::close(m_sock); +    } + +    if (port == 0) { +        // No need to bind to a given port, creating the +        // socket is enough +        m_sock = ::socket(AF_INET, SOCK_DGRAM, 0); +        return; +    } + +    char service[NI_MAXSERV]; +    snprintf(service, NI_MAXSERV-1, "%d", port); + +    struct addrinfo hints; +    memset(&hints, 0, sizeof(struct addrinfo)); +    hints.ai_family = AF_INET; +    hints.ai_socktype = SOCK_DGRAM; /* Datagram socket */ +    hints.ai_flags = AI_PASSIVE;    /* For wildcard IP address */ +    hints.ai_protocol = 0;          /* Any protocol */ +    hints.ai_canonname = nullptr; +    hints.ai_addr = nullptr; +    hints.ai_next = nullptr; + +    struct addrinfo *result, *rp; +    int s = getaddrinfo(name.empty() ? nullptr : name.c_str(), +            port == 0 ? nullptr : service, +            &hints, &result); +    if (s != 0) { +        throw runtime_error(string("getaddrinfo failed: ") + gai_strerror(s)); +    } + +    /* getaddrinfo() returns a list of address structures. +       Try each address until we successfully bind(2). +       If socket(2) (or bind(2)) fails, we (close the socket +       and) try the next address. */ +    for (rp = result; rp != nullptr; rp = rp->ai_next) { +        int sfd = ::socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol); +        if (sfd == -1) { +            continue; +        } + +        if (::bind(sfd, rp->ai_addr, rp->ai_addrlen) == 0) { +            m_sock = sfd; +            break; +        } + +        ::close(sfd); +    } + +    freeaddrinfo(result); + +    if (rp == nullptr) { +        throw runtime_error("Could not bind"); +    } +} + +void UDPSocket::close() +{ +    if (m_sock != INVALID_SOCKET) { +        ::close(m_sock); +    } + +    m_sock = INVALID_SOCKET; +} + +UDPSocket::~UDPSocket() +{ +    if (m_sock != INVALID_SOCKET) { +        ::close(m_sock); +    } +} + + +UDPPacket UDPSocket::receive(size_t max_size) +{ +    UDPPacket packet(max_size); +    socklen_t addrSize; +    addrSize = sizeof(*packet.address.as_sockaddr()); +    ssize_t ret = recvfrom(m_sock, +            packet.buffer.data(), +            packet.buffer.size(), +            0, +            packet.address.as_sockaddr(), +            &addrSize); + +    if (ret == SOCKET_ERROR) { +        packet.buffer.resize(0); + +        // This suppresses the -Wlogical-op warning +#if EAGAIN == EWOULDBLOCK +        if (errno == EAGAIN) { +#else +        if (errno == EAGAIN or errno == EWOULDBLOCK) { +#endif +            return 0; +        } +        throw runtime_error(string("Can't receive data: ") + strerror(errno)); +    } + +    packet.buffer.resize(ret); +    return packet; +} + +void UDPSocket::send(UDPPacket& packet) +{ +    const int ret = sendto(m_sock, packet.buffer.data(), packet.buffer.size(), 0, +            packet.address.as_sockaddr(), sizeof(*packet.address.as_sockaddr())); +    if (ret == SOCKET_ERROR && errno != ECONNREFUSED) { +        throw runtime_error(string("Can't send UDP packet: ") + strerror(errno)); +    } +} + + +void UDPSocket::send(const std::vector<uint8_t>& data, InetAddress destination) +{ +    const int ret = sendto(m_sock, data.data(), data.size(), 0, +            destination.as_sockaddr(), sizeof(*destination.as_sockaddr())); +    if (ret == SOCKET_ERROR && errno != ECONNREFUSED) { +        throw runtime_error(string("Can't send UDP packet: ") + strerror(errno)); +    } +} + +void UDPSocket::joinGroup(const char* groupname, const char* if_addr) +{ +    ip_mreqn group; +    if ((group.imr_multiaddr.s_addr = inet_addr(groupname)) == INADDR_NONE) { +        throw runtime_error("Cannot convert multicast group name"); +    } +    if (!IN_MULTICAST(ntohl(group.imr_multiaddr.s_addr))) { +        throw runtime_error("Group name is not a multicast address"); +    } + +    if (if_addr) { +        group.imr_address.s_addr = inet_addr(if_addr); +    } +    else { +        group.imr_address.s_addr = htons(INADDR_ANY); +    } +    group.imr_ifindex = 0; +    if (setsockopt(m_sock, IPPROTO_IP, IP_ADD_MEMBERSHIP, &group, sizeof(group)) +            == SOCKET_ERROR) { +        throw runtime_error(string("Can't join multicast group") + strerror(errno)); +    } +} + +void UDPSocket::setMulticastSource(const char* source_addr) +{ +    struct in_addr addr; +    if (inet_aton(source_addr, &addr) == 0) { +        throw runtime_error(string("Can't parse source address") + strerror(errno)); +    } + +    if (setsockopt(m_sock, IPPROTO_IP, IP_MULTICAST_IF, &addr, sizeof(addr)) +            == SOCKET_ERROR) { +        throw runtime_error(string("Can't set source address") + strerror(errno)); +    } +} + +void UDPSocket::setMulticastTTL(int ttl) +{ +    if (setsockopt(m_sock, IPPROTO_IP, IP_MULTICAST_TTL, &ttl, sizeof(ttl)) +            == SOCKET_ERROR) { +        throw runtime_error(string("Can't set multicast ttl") + strerror(errno)); +    } +} + +UDPReceiver::~UDPReceiver() { +    m_stop = true; +    m_sock.close(); +    if (m_thread.joinable()) { +        m_thread.join(); +    } +} + +void UDPReceiver::start(int port, const string& bindto, const string& mcastaddr, size_t max_packets_queued) { +    m_port = port; +    m_bindto = bindto; +    m_mcastaddr = mcastaddr; +    m_max_packets_queued = max_packets_queued; +    m_thread = std::thread(&UDPReceiver::m_run, this); +} + +std::vector<uint8_t> UDPReceiver::get_packet_buffer() +{ +    if (m_stop) { +        throw runtime_error("UDP Receiver not running"); +    } + +    UDPPacket p; +    m_packets.wait_and_pop(p); + +    return p.buffer; +} + +void UDPReceiver::m_run() +{ +    // Ensure that stop is set to true in case of exception or return +    struct SetStopOnDestruct { +        SetStopOnDestruct(atomic<bool>& stop) : m_stop(stop) {} +        ~SetStopOnDestruct() { m_stop = true; } +        private: atomic<bool>& m_stop; +    } autoSetStop(m_stop); + +    if (IN_MULTICAST(ntohl(inet_addr(m_mcastaddr.c_str())))) { +        m_sock.reinit(m_port, m_mcastaddr); +        m_sock.setMulticastSource(m_bindto.c_str()); +        m_sock.joinGroup(m_mcastaddr.c_str(), m_bindto.c_str()); +    } +    else { +        m_sock.reinit(m_port, m_bindto); +    } + +    while (not m_stop) { +        constexpr size_t packsize = 8192; +        try { +            auto packet = m_sock.receive(packsize); +            if (packet.buffer.size() == packsize) { +                // TODO replace fprintf +                fprintf(stderr, "Warning, possible UDP truncation\n"); +            } + +            // If this blocks, the UDP socket will lose incoming packets +            m_packets.push_wait_if_full(packet, m_max_packets_queued); +        } +        catch (const std::runtime_error& e) { +            // TODO replace fprintf +            // TODO handle intr +            fprintf(stderr, "Socket error: %s\n", e.what()); +            m_stop = true; +        } +    } +} + + +TCPSocket::TCPSocket() +{ +} + +TCPSocket::~TCPSocket() +{ +    if (m_sock != -1) { +        ::close(m_sock); +    } +} + +TCPSocket::TCPSocket(TCPSocket&& other) : +    m_sock(other.m_sock), +    m_remote_address(other.m_remote_address) +{ +    if (other.m_sock != -1) { +        other.m_sock = -1; +    } +} + +TCPSocket& TCPSocket::operator=(TCPSocket&& other) +{ +    m_sock = other.m_sock; +    m_remote_address = other.m_remote_address; + +    if (other.m_sock != -1) { +        other.m_sock = -1; +    } + +    return *this; +} + +bool TCPSocket::valid() const +{ +    return m_sock != -1; +} + +void TCPSocket::connect(const std::string& hostname, int port) +{ +    if (m_sock != INVALID_SOCKET) { +        throw std::logic_error("You may only connect an invalid TCPSocket"); +    } + +    char service[NI_MAXSERV]; +    snprintf(service, NI_MAXSERV-1, "%d", port); + +    /* Obtain address(es) matching host/port */ +    struct addrinfo hints; +    memset(&hints, 0, sizeof(struct addrinfo)); +    hints.ai_family = AF_INET; +    hints.ai_socktype = SOCK_STREAM; +    hints.ai_flags = 0; +    hints.ai_protocol = 0; + +    struct addrinfo *result, *rp; +    int s = getaddrinfo(hostname.c_str(), service, &hints, &result); +    if (s != 0) { +        throw runtime_error(string("getaddrinfo failed: ") + gai_strerror(s)); +    } + +    /* getaddrinfo() returns a list of address structures. +       Try each address until we successfully connect(2). +       If socket(2) (or connect(2)) fails, we (close the socket +       and) try the next address. */ + +    for (rp = result; rp != nullptr; rp = rp->ai_next) { +        int sfd = ::socket(rp->ai_family, rp->ai_socktype, +                rp->ai_protocol); +        if (sfd == -1) +            continue; + +        int ret = ::connect(sfd, rp->ai_addr, rp->ai_addrlen); +        if (ret != -1 or (ret == -1 and errno == EINPROGRESS)) { +            // As the TCPClient could set the socket to nonblocking, we +            // must handle EINPROGRESS here +            m_sock = sfd; +            break; +        } + +        ::close(sfd); +    } + +    if (m_sock != INVALID_SOCKET) { +#if defined(HAVE_SO_NOSIGPIPE) +        int val = 1; +        if (setsockopt(m_sock, SOL_SOCKET, SO_NOSIGPIPE, &val, sizeof(val)) +                == SOCKET_ERROR) { +            throw std::runtime_error("Can't set SO_NOSIGPIPE"); +        } +#endif +    } + +    freeaddrinfo(result);           /* No longer needed */ + +    if (rp == nullptr) { +        throw runtime_error("Could not connect"); +    } + +} + +void TCPSocket::listen(int port, const string& name) +{ +    if (m_sock != INVALID_SOCKET) { +        throw std::logic_error("You may only listen with an invalid TCPSocket"); +    } + +    char service[NI_MAXSERV]; +    snprintf(service, NI_MAXSERV-1, "%d", port); + +    struct addrinfo hints; +    memset(&hints, 0, sizeof(struct addrinfo)); +    hints.ai_family = AF_INET; +    hints.ai_socktype = SOCK_STREAM; +    hints.ai_flags = AI_PASSIVE;    /* For wildcard IP address */ +    hints.ai_protocol = 0; +    hints.ai_canonname = nullptr; +    hints.ai_addr = nullptr; +    hints.ai_next = nullptr; + +    struct addrinfo *result, *rp; +    int s = getaddrinfo(name.empty() ? nullptr : name.c_str(), service, &hints, &result); +    if (s != 0) { +        throw runtime_error(string("getaddrinfo failed: ") + gai_strerror(s)); +    } + +    /* getaddrinfo() returns a list of address structures. +       Try each address until we successfully bind(2). +       If socket(2) (or bind(2)) fails, we (close the socket +       and) try the next address. */ +    for (rp = result; rp != nullptr; rp = rp->ai_next) { +        int sfd = ::socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol); +        if (sfd == -1) { +            continue; +        } + +        if (::bind(sfd, rp->ai_addr, rp->ai_addrlen) == 0) { +            m_sock = sfd; +            break; +        } + +        ::close(sfd); +    } + +    freeaddrinfo(result); + +#if defined(HAVE_SO_NOSIGPIPE) +    int val = 1; +    if (setsockopt(m_sock, SOL_SOCKET, SO_NOSIGPIPE, +                &val, sizeof(val)) < 0) { +        throw std::runtime_error("Can't set SO_NOSIGPIPE"); +    } +#endif + +    if (rp == nullptr) { +        throw runtime_error("Could not bind"); +    } +} + +void TCPSocket::close() +{ +    ::close(m_sock); +    m_sock = -1; +} + +TCPSocket TCPSocket::accept(int timeout_ms) +{ +    if (timeout_ms == 0) { +        InetAddress remote_addr; +        socklen_t client_len = sizeof(remote_addr.addr); +        int sockfd = ::accept(m_sock, remote_addr.as_sockaddr(), &client_len); +        TCPSocket s(sockfd, remote_addr); +        return s; +    } +    else { +        struct pollfd fds[1]; +        fds[0].fd = m_sock; +        fds[0].events = POLLIN; + +        int retval = poll(fds, 1, timeout_ms); + +        if (retval == -1) { +            std::string errstr(strerror(errno)); +            throw std::runtime_error("TCP Socket accept error: " + errstr); +        } +        else if (retval > 0) { +            InetAddress remote_addr; +            socklen_t client_len = sizeof(remote_addr.addr); +            int sockfd = ::accept(m_sock, remote_addr.as_sockaddr(), &client_len); +            TCPSocket s(sockfd, remote_addr); +            return s; +        } +        else { +            TCPSocket s(-1); +            return s; +        } +    } +} + +ssize_t TCPSocket::sendall(const void *buffer, size_t buflen) +{ +    uint8_t *buf = (uint8_t*)buffer; +    while (buflen > 0) { +        /* On Linux, the MSG_NOSIGNAL flag ensures that the process +         * would not receive a SIGPIPE and die. +         * Other systems have SO_NOSIGPIPE set on the socket for the +         * same effect. */ +#if defined(HAVE_MSG_NOSIGNAL) +        const int flags = MSG_NOSIGNAL; +#else +        const int flags = 0; +#endif +        ssize_t sent = ::send(m_sock, buf, buflen, flags); +        if (sent < 0) { +            return -1; +        } +        else { +            buf += sent; +            buflen -= sent; +        } +    } +    return buflen; +} + +ssize_t TCPSocket::send(const void* data, size_t size, int timeout_ms) +{ +    if (timeout_ms) { +        struct pollfd fds[1]; +        fds[0].fd = m_sock; +        fds[0].events = POLLOUT; + +        const int retval = poll(fds, 1, timeout_ms); + +        if (retval == -1) { +            throw std::runtime_error(string("TCP Socket send error on poll(): ") + strerror(errno)); +        } +        else if (retval == 0) { +            // Timed out +            return 0; +        } +    } + +    /* On Linux, the MSG_NOSIGNAL flag ensures that the process would not +     * receive a SIGPIPE and die. +     * Other systems have SO_NOSIGPIPE set on the socket for the same effect. */ +#if defined(HAVE_MSG_NOSIGNAL) +    const int flags = MSG_NOSIGNAL; +#else +    const int flags = 0; +#endif +    const ssize_t ret = ::send(m_sock, (const char*)data, size, flags); + +    if (ret == SOCKET_ERROR) { +            throw std::runtime_error(string("TCP Socket send error: ") + strerror(errno)); +    } +    return ret; +} + +ssize_t TCPSocket::recv(void *buffer, size_t length, int flags) +{ +    ssize_t ret = ::recv(m_sock, buffer, length, flags); +    if (ret == -1) { +        std::string errstr(strerror(errno)); +        throw std::runtime_error("TCP receive error: " + errstr); +    } +    return ret; +} + +ssize_t TCPSocket::recv(void *buffer, size_t length, int flags, int timeout_ms) +{ +    struct pollfd fds[1]; +    fds[0].fd = m_sock; +    fds[0].events = POLLIN; + +    int retval = poll(fds, 1, timeout_ms); + +    if (retval == -1 and errno == EINTR) { +        throw Interrupted(); +    } +    else if (retval == -1) { +        std::string errstr(strerror(errno)); +        throw std::runtime_error("TCP receive with poll() error: " + errstr); +    } +    else if (retval > 0 and (fds[0].revents | POLLIN)) { +        ssize_t ret = ::recv(m_sock, buffer, length, flags); +        if (ret == -1) { +            if (errno == ECONNREFUSED) { +                return 0; +            } +            std::string errstr(strerror(errno)); +            throw std::runtime_error("TCP receive after poll() error: " + errstr); +        } +        return ret; +    } +    else { +        throw Timeout(); +    } +} + +TCPSocket::TCPSocket(int sockfd) : +    m_sock(sockfd), +    m_remote_address() +{ } + +TCPSocket::TCPSocket(int sockfd, InetAddress remote_address) : +    m_sock(sockfd), +    m_remote_address(remote_address) +{ } + +void TCPClient::connect(const std::string& hostname, int port) +{ +    m_hostname = hostname; +    m_port = port; +    reconnect(); +} + +ssize_t TCPClient::recv(void *buffer, size_t length, int flags, int timeout_ms) +{ +    try { +        ssize_t ret = m_sock.recv(buffer, length, flags, timeout_ms); + +        if (ret == 0) { +            m_sock.close(); + +            TCPSocket newsock; +            m_sock = std::move(newsock); +            reconnect(); +        } + +        return ret; +    } +    catch (const TCPSocket::Interrupted&) { +        return -1; +    } +    catch (const TCPSocket::Timeout&) { +        return 0; +    } + +    return 0; +} + +void TCPClient::reconnect() +{ +    int flags = fcntl(m_sock.m_sock, F_GETFL); +    if (fcntl(m_sock.m_sock, F_SETFL, flags | O_NONBLOCK) == -1) { +        std::string errstr(strerror(errno)); +        throw std::runtime_error("TCP: Could not set O_NONBLOCK: " + errstr); +    } + +    m_sock.connect(m_hostname, m_port); +} + +TCPConnection::TCPConnection(TCPSocket&& sock) : +            queue(), +            m_running(true), +            m_sender_thread(), +            m_sock(move(sock)) +{ +#if MISSING_OWN_ADDR +    auto own_addr = m_sock.getOwnAddress(); +    auto addr = m_sock.getRemoteAddress(); +    etiLog.level(debug) << "New TCP Connection on port " << +        own_addr.getPort() << " from " << +        addr.getHostAddress() << ":" << addr.getPort(); +#endif +    m_sender_thread = std::thread(&TCPConnection::process, this); +} + +TCPConnection::~TCPConnection() +{ +    m_running = false; +    vector<uint8_t> termination_marker; +    queue.push(termination_marker); +    m_sender_thread.join(); +} + +void TCPConnection::process() +{ +    while (m_running) { +        vector<uint8_t> data; +        queue.wait_and_pop(data); + +        if (data.empty()) { +            // empty vector is the termination marker +            m_running = false; +            break; +        } + +        try { +            ssize_t remaining = data.size(); +            const uint8_t *buf = reinterpret_cast<const uint8_t*>(data.data()); +            const int timeout_ms = 10; // Less than one ETI frame + +            while (m_running and remaining > 0) { +                const ssize_t sent = m_sock.send(buf, remaining, timeout_ms); +                if (sent < 0 or sent > remaining) { +                    throw std::logic_error("Invalid TCPSocket::send() return value"); +                } +                remaining -= sent; +                buf += sent; +            } +        } +        catch (const std::runtime_error& e) { +            m_running = false; +        } +    } + +#if MISSING_OWN_ADDR +    auto own_addr = m_sock.getOwnAddress(); +    auto addr = m_sock.getRemoteAddress(); +    etiLog.level(debug) << "Dropping TCP Connection on port " << +        own_addr.getPort() << " from " << +        addr.getHostAddress() << ":" << addr.getPort(); +#endif +} + + +TCPDataDispatcher::TCPDataDispatcher(size_t max_queue_size) : +    m_max_queue_size(max_queue_size) +{ +} + +TCPDataDispatcher::~TCPDataDispatcher() +{ +    m_running = false; +    m_connections.clear(); +    m_listener_socket.close(); +    if (m_listener_thread.joinable()) { +        m_listener_thread.join(); +    } +} + +void TCPDataDispatcher::start(int port, const string& address) +{ +    m_listener_socket.listen(port, address); + +    m_running = true; +    m_listener_thread = std::thread(&TCPDataDispatcher::process, this); +} + +void TCPDataDispatcher::write(const vector<uint8_t>& data) +{ +    if (not m_running) { +        throw runtime_error(m_exception_data); +    } + +    for (auto& connection : m_connections) { +        connection.queue.push(data); +    } + +    m_connections.remove_if( +            [&](const TCPConnection& conn){ return conn.queue.size() > m_max_queue_size; }); +} + +void TCPDataDispatcher::process() +{ +    try { +        const int timeout_ms = 1000; + +        while (m_running) { +            // Add a new TCPConnection to the list, constructing it from the client socket +            auto sock = m_listener_socket.accept(timeout_ms); +            if (sock.valid()) { +                m_connections.emplace(m_connections.begin(), move(sock)); +            } +        } +    } +    catch (const std::runtime_error& e) { +        m_exception_data = string("TCPDataDispatcher error: ") + e.what(); +        m_running = false; +    } +} + +} diff --git a/lib/Socket.h b/lib/Socket.h new file mode 100644 index 0000000..2393584 --- /dev/null +++ b/lib/Socket.h @@ -0,0 +1,268 @@ +/* +   Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the +   Queen in Right of Canada (Communications Research Center Canada) + +   Copyright (C) 2019 +   Matthias P. Braendli, matthias.braendli@mpb.li + +    http://www.opendigitalradio.org +   */ +/* +   This program is free software: you can redistribute it and/or modify +   it under the terms of the GNU General Public License as published by +   the Free Software Foundation, either version 3 of the License, or +   (at your option) any later version. + +   This program is distributed in the hope that it will be useful, +   but WITHOUT ANY WARRANTY; without even the implied warranty of +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +   GNU General Public License for more details. + +   You should have received a copy of the GNU General Public License +   along with this program.  If not, see <https://www.gnu.org/licenses/>. +*/ + +#pragma once + +#ifdef HAVE_CONFIG_H +#   include "config.h" +#endif + +#include "ThreadsafeQueue.h" +#include <cstdlib> +#include <iostream> +#include <vector> +#include <atomic> +#include <thread> +#include <list> + +#include <sys/socket.h> +#include <netinet/in.h> +#include <unistd.h> +#include <netdb.h> +#include <arpa/inet.h> +#include <pthread.h> +#define SOCKET           int +#define INVALID_SOCKET   -1 +#define SOCKET_ERROR     -1 + + +namespace Socket { + +struct InetAddress { +    struct sockaddr_storage addr; + +    struct sockaddr *as_sockaddr() { return reinterpret_cast<sockaddr*>(&addr); }; + +    void resolveUdpDestination(const std::string& destination, int port); +}; + +/** This class represents a UDP packet. + * + *  A UDP packet contains a payload (sequence of bytes) and an address. For + *  outgoing packets, the address is the destination address. For incoming + *  packets, the address tells the user from what source the packet arrived from. + */ +class UDPPacket +{ +    public: +        UDPPacket(); +        UDPPacket(size_t initSize); + +        std::vector<uint8_t> buffer; +        InetAddress address; +}; + +/** + *  This class represents a socket for sending and receiving UDP packets. + * + *  A UDP socket is the sending or receiving point for a packet delivery service. + *  Each packet sent or received on a datagram socket is individually + *  addressed and routed. Multiple packets sent from one machine to another may + *  be routed differently, and may arrive in any order. + */ +class UDPSocket +{ +    public: +        /** Create a new socket that will not be bound to any port. To be used +         * for data output. +         */ +        UDPSocket(); +        /** Create a new socket. +         *  @param port The port number on which the socket will be bound +         */ +        UDPSocket(int port); +        /** Create a new socket. +         *  @param port The port number on which the socket will be bound +         *  @param name The IP address on which the socket will be bound. +         *              It is used to bind the socket on a specific interface if +         *              the computer have many NICs. +         */ +        UDPSocket(int port, const std::string& name); +        ~UDPSocket(); +        UDPSocket(const UDPSocket& other) = delete; +        const UDPSocket& operator=(const UDPSocket& other) = delete; + +        /** Close the already open socket, and create a new one. Throws a runtime_error on error.  */ +        void reinit(int port); +        void reinit(int port, const std::string& name); + +        void close(void); +        void send(UDPPacket& packet); +        void send(const std::vector<uint8_t>& data, InetAddress destination); +        UDPPacket receive(size_t max_size); +        void joinGroup(const char* groupname, const char* if_addr = nullptr); +        void setMulticastSource(const char* source_addr); +        void setMulticastTTL(int ttl); + +        /** Set blocking mode. By default, the socket is blocking. +         * throws a runtime_error on error. +         */ +        void setBlocking(bool block); + +    protected: +        SOCKET m_sock; +}; + +/* Threaded UDP receiver */ +class UDPReceiver { +    public: +        UDPReceiver() : m_port(0), m_thread(), m_stop(false), m_packets() {} +        ~UDPReceiver(); +        UDPReceiver(const UDPReceiver&) = delete; +        UDPReceiver operator=(const UDPReceiver&) = delete; + +        // Start the receiver in a separate thread +        void start(int port, const std::string& bindto, const std::string& mcastaddr, size_t max_packets_queued); + +        // Get the data contained in a UDP packet, blocks if none available +        // In case of error, throws a runtime_error +        std::vector<uint8_t> get_packet_buffer(void); + +    private: +        void m_run(void); + +        int m_port; +        std::string m_bindto; +        std::string m_mcastaddr; +        size_t m_max_packets_queued; +        std::thread m_thread; +        std::atomic<bool> m_stop; +        ThreadsafeQueue<UDPPacket> m_packets; +        UDPSocket m_sock; +}; + +class TCPSocket { +    public: +        TCPSocket(); +        ~TCPSocket(); +        TCPSocket(const TCPSocket& other) = delete; +        TCPSocket& operator=(const TCPSocket& other) = delete; +        TCPSocket(TCPSocket&& other); +        TCPSocket& operator=(TCPSocket&& other); + +        bool valid(void) const; +        void connect(const std::string& hostname, int port); +        void listen(int port, const std::string& name); +        void close(void); + +        /* throws a runtime_error on failure, an invalid socket on timeout */ +        TCPSocket accept(int timeout_ms); + +        /* returns -1 on error, doesn't work on nonblocking sockets */ +        ssize_t sendall(const void *buffer, size_t buflen); + +        /** Send data over the TCP connection. +         *  @param data The buffer that will be sent. +         *  @param size Number of bytes to send. +         *  @param timeout_ms number of milliseconds before timeout, or 0 for infinite timeout +         *  return number of bytes sent, 0 on timeout, or throws runtime_error. +         */ +        ssize_t send(const void* data, size_t size, int timeout_ms=0); + +        /* Returns number of bytes read, 0 on disconnect. Throws a +         * runtime_error on error */ +        ssize_t recv(void *buffer, size_t length, int flags); + +        class Timeout {}; +        class Interrupted {}; +        /* Returns number of bytes read, 0 on disconnect or refused connection. +         * Throws a Timeout on timeout, Interrupted on EINTR, a runtime_error +         * on error +         */ +        ssize_t recv(void *buffer, size_t length, int flags, int timeout_ms); + +    private: +        explicit TCPSocket(int sockfd); +        explicit TCPSocket(int sockfd, InetAddress remote_address); +        SOCKET m_sock = -1; + +        InetAddress m_remote_address; + +        friend class TCPClient; +}; + +/* Implements a TCP receiver that auto-reconnects on errors */ +class TCPClient { +    public: +        void connect(const std::string& hostname, int port); + +        /* Returns numer of bytes read, 0 on auto-reconnect, -1 +         * on interruption. +         * Throws a runtime_error on error */ +        ssize_t recv(void *buffer, size_t length, int flags, int timeout_ms); + +    private: +        void reconnect(void); +        TCPSocket m_sock; +        std::string m_hostname; +        int m_port; +}; + +/* Helper class for TCPDataDispatcher, contains a queue of pending data and + * a sender thread. */ +class TCPConnection +{ +    public: +        TCPConnection(TCPSocket&& sock); +        TCPConnection(const TCPConnection&) = delete; +        TCPConnection& operator=(const TCPConnection&) = delete; +        ~TCPConnection(); + +        ThreadsafeQueue<std::vector<uint8_t> > queue; + +    private: +        std::atomic<bool> m_running; +        std::thread m_sender_thread; +        TCPSocket m_sock; + +        void process(void); +}; + +/* Send a TCP stream to several destinations, and automatically disconnect destinations + * whose buffer overflows. + */ +class TCPDataDispatcher +{ +    public: +        TCPDataDispatcher(size_t max_queue_size); +        ~TCPDataDispatcher(); +        TCPDataDispatcher(const TCPDataDispatcher&) = delete; +        TCPDataDispatcher& operator=(const TCPDataDispatcher&) = delete; + +        void start(int port, const std::string& address); +        void write(const std::vector<uint8_t>& data); + +    private: +        void process(void); + +        size_t m_max_queue_size; + +        std::atomic<bool> m_running; +        std::string m_exception_data; +        std::thread m_listener_thread; +        TCPSocket m_listener_socket; +        std::list<TCPConnection> m_connections; +}; + +} diff --git a/src/ThreadsafeQueue.h b/lib/ThreadsafeQueue.h index ab287b2..62f4c96 100644 --- a/src/ThreadsafeQueue.h +++ b/lib/ThreadsafeQueue.h @@ -12,20 +12,18 @@     element out.   */  /* -   This file is part of ODR-DabMux. +   This program is free software: you can redistribute it and/or modify +   it under the terms of the GNU General Public License as published by +   the Free Software Foundation, either version 3 of the License, or +   (at your option) any later version. -   ODR-DabMux is free software: you can redistribute it and/or modify -   it under the terms of the GNU General Public License as -   published by the Free Software Foundation, either version 3 of the -   License, or (at your option) any later version. - -   ODR-DabMux is distributed in the hope that it will be useful, +   This program is distributed in the hope that it will be useful,     but WITHOUT ANY WARRANTY; without even the implied warranty of     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the     GNU General Public License for more details.     You should have received a copy of the GNU General Public License -   along with ODR-DabMux.  If not, see <http://www.gnu.org/licenses/>. +   along with this program.  If not, see <https://www.gnu.org/licenses/>.   */  #pragma once diff --git a/src/ConfigParser.cpp b/src/ConfigParser.cpp index fb49efc..3142bb3 100644 --- a/src/ConfigParser.cpp +++ b/src/ConfigParser.cpp @@ -40,6 +40,7 @@  #include "utils.h"  #include "DabMux.h"  #include "ManagementServer.h" +#include "input/Edi.h"  #include "input/Prbs.h"  #include "input/Zmq.h"  #include "input/File.h" @@ -876,34 +877,46 @@ static void setup_subchannel_from_ptree(shared_ptr<DabSubchannel>& subchan,          type = pt.get<string>("type");      }      catch (const ptree_error &e) { -        stringstream ss; -        ss << "Subchannel with uid " << subchanuid << " has no type defined!"; -        throw runtime_error(ss.str()); +        throw runtime_error("Subchannel with uid " + subchanuid + " has no type defined!");      } -    /* Both inputfile and inputuri are supported, and are equivalent. -     * inputuri has precedence +    /* Up to v2.3.1, both inputfile and inputuri are supported, and are +     * equivalent.  inputuri has precedence. +     * +     * After that, either inputfile or the (inputproto, inputuri) pair must be given, but not both.       */      string inputUri = pt.get<string>("inputuri", ""); +    string proto = pt.get<string>("inputproto", ""); -    if (inputUri == "") { +    if (inputUri.empty() and proto.empty()) {          try { +            /* Old approach, derives proto from scheme used in the URL. +             * This makes it impossible to distinguish between ZMQ tcp:// and +             * EDI tcp:// +             */              inputUri = pt.get<string>("inputfile"); +            size_t protopos = inputUri.find("://"); + +            if (protopos == string::npos) { +                proto = "file"; +            } +            else { +                proto = inputUri.substr(0, protopos); + +                if (proto == "tcp" or proto == "epgm" or proto == "ipc") { +                    proto = "zmq"; +                } +                else if (proto == "sti-rtp") { +                    proto = "sti"; +                } +            }          }          catch (const ptree_error &e) { -            stringstream ss; -            ss << "Subchannel with uid " << subchanuid << " has no inputUri defined!"; -            throw runtime_error(ss.str()); +            throw runtime_error("Subchannel with uid " + subchanuid + " has no input defined!");          }      } - -    string proto; -    size_t protopos = inputUri.find("://"); -    if (protopos == string::npos) { -        proto = "file"; -    } -    else { -        proto = inputUri.substr(0, protopos); +    else if (inputUri.empty() or proto.empty()) { +        throw runtime_error("Must define both inputuri and inputproto for uid " + subchanuid);      }      subchan->inputUri = inputUri; @@ -928,7 +941,7 @@ static void setup_subchannel_from_ptree(shared_ptr<DabSubchannel>& subchan,                  throw logic_error("Incomplete handling of file input");              }          } -        else if (proto == "tcp"  or proto == "epgm" or proto == "ipc") { +        else if (proto == "zmq") {              auto zmqconfig = setup_zmq_input(pt, subchanuid);              if (type == "audio") { @@ -941,15 +954,11 @@ static void setup_subchannel_from_ptree(shared_ptr<DabSubchannel>& subchan,                  rcs.enrol(inzmq.get());                  subchan->input = inzmq;              } - -            if (proto == "epgm") { -                etiLog.level(warn) << "Using untested epgm:// zeromq input"; -            } -            else if (proto == "ipc") { -                etiLog.level(warn) << "Using untested ipc:// zeromq input"; -            }          } -        else if (proto == "sti-rtp") { +        else if (proto == "edi") { +            subchan->input = make_shared<Inputs::Edi>(); +        } +        else if (proto == "stp") {              subchan->input = make_shared<Inputs::Sti_d_Rtp>();          }          else { diff --git a/src/DabMultiplexer.h b/src/DabMultiplexer.h index 386c23c..d1075a6 100644 --- a/src/DabMultiplexer.h +++ b/src/DabMultiplexer.h @@ -37,8 +37,7 @@  #include "fig/FIGCarousel.h"  #include "crc.h"  #include "utils.h" -#include "UdpSocket.h" -#include "InetAddress.h" +#include "Socket.h"  #include "PcDebug.h"  #include "MuxElements.h"  #include "RemoteControl.h" diff --git a/src/DabMux.cpp b/src/DabMux.cpp index 51f0310..578fc63 100644 --- a/src/DabMux.cpp +++ b/src/DabMux.cpp @@ -99,8 +99,7 @@ typedef DWORD32 uint32_t;  #include "dabOutput/dabOutput.h"  #include "crc.h" -#include "UdpSocket.h" -#include "InetAddress.h" +#include "Socket.h"  #include "PcDebug.h"  #include "DabMux.h"  #include "MuxElements.h" @@ -305,7 +304,7 @@ int main(int argc, char *argv[])                          edi_conf.destinations.push_back(dest);                      }                      else if (proto == "tcp") { -                        auto dest = make_shared<edi::tcp_destination_t>(); +                        auto dest = make_shared<edi::tcp_server_t>();                          dest->listen_port = pt_edi_dest.second.get<unsigned int>("listenport");                          dest->max_frames_queued = pt_edi_dest.second.get<size_t>("max_frames_queued", 500);                          edi_conf.destinations.push_back(dest); diff --git a/src/InetAddress.cpp b/src/InetAddress.cpp deleted file mode 100644 index 7660263..0000000 --- a/src/InetAddress.cpp +++ /dev/null @@ -1,155 +0,0 @@ -/* -   Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the -   Queen in Right of Canada (Communications Research Center Canada) - -   Copyright (C) 2016 -   Matthias P. Braendli, matthias.braendli@mpb.li - -    http://www.opendigitalradio.org -   */ -/* -   This file is part of ODR-DabMux. - -   ODR-DabMux is free software: you can redistribute it and/or modify -   it under the terms of the GNU General Public License as -   published by the Free Software Foundation, either version 3 of the -   License, or (at your option) any later version. - -   ODR-DabMux is distributed in the hope that it will be useful, -   but WITHOUT ANY WARRANTY; without even the implied warranty of -   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -   GNU General Public License for more details. - -   You should have received a copy of the GNU General Public License -   along with ODR-DabMux.  If not, see <http://www.gnu.org/licenses/>. -   */ - -#include "InetAddress.h" -#include <iostream> -#include <stdio.h> -#include <errno.h> -#include <string.h> - -#ifdef TRACE_ON -# ifndef TRACE_CLASS -#  define TRACE_CLASS(clas, func) cout <<"-" <<(clas) <<"\t(" <<this <<")::" <<(func) <<endl -#  define TRACE_STATIC(clas, func) cout <<"-" <<(clas) <<"\t(static)::" <<(func) <<endl -# endif -#else -# ifndef TRACE_CLASS -#  define TRACE_CLASS(clas, func) -#  define TRACE_STATIC(clas, func) -# endif -#endif - - -int inetErrNo = 0; -const char *inetErrMsg = nullptr; -const char *inetErrDesc = nullptr; - - -/** - *  Constructs an IP address. - *  @param port The port of this address - *  @param name The name of this address - */ -InetAddress::InetAddress(int port, const char* name) { -    TRACE_CLASS("InetAddress", "InetAddress(int, char)"); -    addr.sin_family = PF_INET; -    addr.sin_addr.s_addr = htons(INADDR_ANY); -    addr.sin_port = htons(port); -    if (name) -        setAddress(name); -} - - -/// Returns the raw IP address of this InetAddress object. -sockaddr *InetAddress::getAddress() { -    TRACE_CLASS("InetAddress", "getAddress()"); -    return (sockaddr *)&addr; -} - - -/// Return the port of this address. -int InetAddress::getPort() -{ -    TRACE_CLASS("InetAddress", "getPort()"); -    return ntohs(addr.sin_port); -} - - -/** - *  Returns the IP address string "%d.%d.%d.%d". - *  @return IP address - */ -const char *InetAddress::getHostAddress() { -    TRACE_CLASS("InetAddress", "getHostAddress()"); -    return inet_ntoa(addr.sin_addr); -} - - -/// Returns true if this address is multicast -bool InetAddress::isMulticastAddress() { -    TRACE_CLASS("InetAddress", "isMulticastAddress()"); -    return IN_MULTICAST(ntohl(addr.sin_addr.s_addr));		// a modifier -} - - -/** - *  Set the port number - *  @param port The new port number - */ -void InetAddress::setPort(int port) -{ -    TRACE_CLASS("InetAddress", "setPort(int)"); -    addr.sin_port = htons(port); -} - - -/** - *  Set the address - *  @param name The new address name - *  @return 0  if ok - *          -1 if error - */ -int InetAddress::setAddress(const std::string& name) -{ -    TRACE_CLASS("InetAddress", "setAddress(string)"); -    if (!name.empty()) { -        if (atoi(name.c_str())) {   // If it start with a number -            if ((addr.sin_addr.s_addr = inet_addr(name.c_str())) == INADDR_NONE) { -                addr.sin_addr.s_addr = htons(INADDR_ANY); -                inetErrNo = 0; -                inetErrMsg = "Invalid address"; -                inetErrDesc = name.c_str(); -                return -1; -            } -        } -        else {            // Assume it's a real name -            hostent *host = gethostbyname(name.c_str()); -            if (host) { -                addr.sin_addr = *(in_addr *)(host->h_addr); -            } else { -                addr.sin_addr.s_addr = htons(INADDR_ANY); -                inetErrNo = 0; -                inetErrMsg = "Could not find address"; -                inetErrDesc = name.c_str(); -                return -1; -            } -        } -    } -    else { -        addr.sin_addr.s_addr = INADDR_ANY; -    } -    return 0; -} - - -void setInetError(const char* description) -{ -    inetErrNo = 0; -    inetErrNo = errno; -    inetErrMsg = strerror(inetErrNo); -    inetErrDesc = description; -} - diff --git a/src/InetAddress.h b/src/InetAddress.h deleted file mode 100644 index e246d4c..0000000 --- a/src/InetAddress.h +++ /dev/null @@ -1,78 +0,0 @@ -/* -   Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the -   Queen in Right of Canada (Communications Research Center Canada) - -   Copyright (C) 2016 -   Matthias P. Braendli, matthias.braendli@mpb.li - -    http://www.opendigitalradio.org -   */ -/* -   This file is part of ODR-DabMux. - -   ODR-DabMux is free software: you can redistribute it and/or modify -   it under the terms of the GNU General Public License as -   published by the Free Software Foundation, either version 3 of the -   License, or (at your option) any later version. - -   ODR-DabMux is distributed in the hope that it will be useful, -   but WITHOUT ANY WARRANTY; without even the implied warranty of -   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -   GNU General Public License for more details. - -   You should have received a copy of the GNU General Public License -   along with ODR-DabMux.  If not, see <http://www.gnu.org/licenses/>. -   */ - -#ifndef _InetAddress -#define _InetAddress - -#ifdef HAVE_CONFIG_H -#   include "config.h" -#endif - -#include <stdlib.h> -#include <sys/socket.h> -#include <netinet/in.h> -#include <unistd.h> -#include <netdb.h> -#include <arpa/inet.h> -#include <pthread.h> -#include <string> - -#define SOCKET                  int -#define INVALID_SOCKET          -1 -#define INVALID_PORT            -1 - - -/// The last error number -extern int inetErrNo; -/// The last error message -extern const char *inetErrMsg; -/// The description of the last error -extern const char *inetErrDesc; -/// Set the number, message and description of the last error -void setInetError(const char* description); - - -/** - *  This class represents an Internet Protocol (IP) address. - *  @author Pascal Charest pascal.charest@crc.ca - */ -class InetAddress { - public: -  InetAddress(int port = 0, const char* name = NULL); - -  sockaddr *getAddress(); -  const char *getHostAddress(); -  int getPort(); -  int setAddress(const std::string& name); -  void setPort(int port); -  bool isMulticastAddress(); - - private: -  sockaddr_in addr; -}; - - -#endif diff --git a/src/TcpSocket.cpp b/src/TcpSocket.cpp deleted file mode 100644 index 3ebe73c..0000000 --- a/src/TcpSocket.cpp +++ /dev/null @@ -1,359 +0,0 @@ -/* -   Copyright (C) 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in -   Right of Canada (Communications Research Center Canada) - -   Copyright (C) 2019 -   Matthias P. Braendli, matthias.braendli@mpb.li - -    http://www.opendigitalradio.org -   */ -/* -   This file is part of ODR-DabMux. - -   ODR-DabMux is free software: you can redistribute it and/or modify -   it under the terms of the GNU General Public License as -   published by the Free Software Foundation, either version 3 of the -   License, or (at your option) any later version. - -   ODR-DabMux is distributed in the hope that it will be useful, -   but WITHOUT ANY WARRANTY; without even the implied warranty of -   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -   GNU General Public License for more details. - -   You should have received a copy of the GNU General Public License -   along with ODR-DabMux.  If not, see <http://www.gnu.org/licenses/>. -   */ - -#include "TcpSocket.h" -#include "Log.h" -#include <iostream> -#include <cstdio> -#include <cstring> -#include <cstdint> -#include <signal.h> -#include <errno.h> -#include <poll.h> -#include <thread> - -using namespace std; - -using vec_u8 = std::vector<uint8_t>; - - -TcpSocket::TcpSocket() : -    m_sock(INVALID_SOCKET) -{ -} - -TcpSocket::TcpSocket(int port, const string& name) : -    m_sock(INVALID_SOCKET) -{ -    if (port) { -        if ((m_sock = socket(PF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET) { -            throw std::runtime_error("Can't create socket"); -        } - -        reuseopt_t reuse = 1; -        if (setsockopt(m_sock, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) -                == SOCKET_ERROR) { -            throw std::runtime_error("Can't reuse address"); -        } - -#if defined(HAVE_SO_NOSIGPIPE) -        int val = 1; -        if (setsockopt(m_sock, SOL_SOCKET, SO_NOSIGPIPE, &val, sizeof(val)) -                == SOCKET_ERROR) { -            throw std::runtime_error("Can't set SO_NOSIGPIPE"); -        } -#endif - -        m_own_address.setAddress(name); -        m_own_address.setPort(port); - -        if (::bind(m_sock, m_own_address.getAddress(), sizeof(sockaddr_in)) == SOCKET_ERROR) { -            ::close(m_sock); -            m_sock = INVALID_SOCKET; -            throw std::runtime_error("Can't bind socket"); -        } -    } -} - -TcpSocket::TcpSocket(SOCKET sock, InetAddress own, InetAddress remote) : -    m_own_address(own), -    m_remote_address(remote), -    m_sock(sock) { } - -// The move constructors must ensure the moved-from -// TcpSocket won't destroy our socket handle -TcpSocket::TcpSocket(TcpSocket&& other) -{ -    m_sock = other.m_sock; -    other.m_sock = INVALID_SOCKET; - -    m_own_address = other.m_own_address; -    m_remote_address = other.m_remote_address; -} - -TcpSocket& TcpSocket::operator=(TcpSocket&& other) -{ -    m_sock = other.m_sock; -    other.m_sock = INVALID_SOCKET; - -    m_own_address = other.m_own_address; -    m_remote_address = other.m_remote_address; -    return *this; -} - -/** - *  Close the underlying socket. - *  @return 0  if ok - *          -1 if error - */ -int TcpSocket::close() -{ -    if (m_sock != INVALID_SOCKET) { -        int res = ::close(m_sock); -        if (res != 0) { -            setInetError("Can't close socket"); -            return -1; -        } -        m_sock = INVALID_SOCKET; -    } -    return 0; -} - -TcpSocket::~TcpSocket() -{ -    close(); -} - -bool TcpSocket::isValid() -{ -    return m_sock != INVALID_SOCKET; -} - -ssize_t TcpSocket::recv(void* data, size_t size) -{ -    ssize_t ret = ::recv(m_sock, (char*)data, size, 0); -    if (ret == SOCKET_ERROR) { -        stringstream ss; -        ss << "TCP Socket recv error: " << strerror(errno); -        throw std::runtime_error(ss.str()); -    } -    return ret; -} - - -ssize_t TcpSocket::send(const void* data, size_t size, int timeout_ms) -{ -    if (timeout_ms) { -        struct pollfd fds[1]; -        fds[0].fd = m_sock; -        fds[0].events = POLLOUT; - -        const int retval = poll(fds, 1, timeout_ms); - -        if (retval == -1) { -            stringstream ss; -            ss << "TCP Socket send error on poll(): " << strerror(errno); -            throw std::runtime_error(ss.str()); -        } -        else if (retval == 0) { -            // Timed out -            return 0; -        } -    } - -    /* On Linux, the MSG_NOSIGNAL flag ensures that the process would not -     * receive a SIGPIPE and die. -     * Other systems have SO_NOSIGPIPE set on the socket for the same effect. */ -#if defined(HAVE_MSG_NOSIGNAL) -    const int flags = MSG_NOSIGNAL; -#else -    const int flags = 0; -#endif -    const ssize_t ret = ::send(m_sock, (const char*)data, size, flags); - -    if (ret == SOCKET_ERROR) { -        stringstream ss; -        ss << "TCP Socket send error: " << strerror(errno); -        throw std::runtime_error(ss.str()); -    } -    return ret; -} - -void TcpSocket::listen() -{ -    if (::listen(m_sock, 1) == SOCKET_ERROR) { -        stringstream ss; -        ss << "TCP Socket listen error: " << strerror(errno); -        throw std::runtime_error(ss.str()); -    } -} - -TcpSocket TcpSocket::accept() -{ -    InetAddress remote_addr; -    socklen_t addrLen = sizeof(sockaddr_in); - -    SOCKET socket = ::accept(m_sock, remote_addr.getAddress(), &addrLen); -    if (socket == SOCKET_ERROR) { -        stringstream ss; -        ss << "TCP Socket accept error: " << strerror(errno); -        throw std::runtime_error(ss.str()); -    } -    else { -        TcpSocket client(socket, m_own_address, remote_addr); -        return client; -    } -} - -TcpSocket TcpSocket::accept(int timeout_ms) -{ -    struct pollfd fds[1]; -    fds[0].fd = m_sock; -    fds[0].events = POLLIN | POLLOUT; - -    int retval = poll(fds, 1, timeout_ms); - -    if (retval == -1) { -        stringstream ss; -        ss << "TCP Socket accept error: " << strerror(errno); -        throw std::runtime_error(ss.str()); -    } -    else if (retval) { -        return accept(); -    } -    else { -        TcpSocket invalidsock(0, ""); -        return invalidsock; -    } -} - - -InetAddress TcpSocket::getOwnAddress() const -{ -    return m_own_address; -} - -InetAddress TcpSocket::getRemoteAddress() const -{ -    return m_remote_address; -} - - -TCPConnection::TCPConnection(TcpSocket&& sock) : -            queue(), -            m_running(true), -            m_sender_thread(), -            m_sock(move(sock)) -{ -    auto own_addr = m_sock.getOwnAddress(); -    auto addr = m_sock.getRemoteAddress(); -    etiLog.level(debug) << "New TCP Connection on port " << -        own_addr.getPort() << " from " << -        addr.getHostAddress() << ":" << addr.getPort(); -    m_sender_thread = std::thread(&TCPConnection::process, this); -} - -TCPConnection::~TCPConnection() -{ -    m_running = false; -    vec_u8 termination_marker; -    queue.push(termination_marker); -    m_sender_thread.join(); -} - -void TCPConnection::process() -{ -    while (m_running) { -        vec_u8 data; -        queue.wait_and_pop(data); - -        if (data.empty()) { -            // empty vector is the termination marker -            m_running = false; -            break; -        } - -        try { -            ssize_t remaining = data.size(); -            const uint8_t *buf = reinterpret_cast<const uint8_t*>(data.data()); -            const int timeout_ms = 10; // Less than one ETI frame - -            while (m_running and remaining > 0) { -                const ssize_t sent = m_sock.send(buf, remaining, timeout_ms); -                if (sent < 0 or sent > remaining) { -                    throw std::logic_error("Invalid TcpSocket::send() return value"); -                } -                remaining -= sent; -                buf += sent; -            } -        } -        catch (const std::runtime_error& e) { -            m_running = false; -        } -    } - - -    auto own_addr = m_sock.getOwnAddress(); -    auto addr = m_sock.getRemoteAddress(); -    etiLog.level(debug) << "Dropping TCP Connection on port " << -        own_addr.getPort() << " from " << -        addr.getHostAddress() << ":" << addr.getPort(); -} - - -TCPDataDispatcher::TCPDataDispatcher(size_t max_queue_size) : -    m_max_queue_size(max_queue_size) -{ -} - -TCPDataDispatcher::~TCPDataDispatcher() -{ -    m_running = false; -    m_connections.clear(); -    m_listener_socket.close(); -    m_listener_thread.join(); -} - -void TCPDataDispatcher::start(int port, const string& address) -{ -    TcpSocket sock(port, address); -    m_listener_socket = move(sock); - -    m_running = true; -    m_listener_thread = std::thread(&TCPDataDispatcher::process, this); -} - -void TCPDataDispatcher::write(const vec_u8& data) -{ -    for (auto& connection : m_connections) { -        connection.queue.push(data); -    } - -    m_connections.remove_if( -            [&](const TCPConnection& conn){ return conn.queue.size() > m_max_queue_size; }); -} - -void TCPDataDispatcher::process() -{ -    try { -        m_listener_socket.listen(); - -        const int timeout_ms = 1000; - -        while (m_running) { -            // Add a new TCPConnection to the list, constructing it from the client socket -            auto sock = m_listener_socket.accept(timeout_ms); -            if (sock.isValid()) { -                m_connections.emplace(m_connections.begin(), move(sock)); -            } -        } -    } -    catch (const std::runtime_error& e) { -        etiLog.level(error) << "TCPDataDispatcher caught runtime error: " << e.what(); -        m_running = false; -    } -} - diff --git a/src/TcpSocket.h b/src/TcpSocket.h deleted file mode 100644 index ec7afd3..0000000 --- a/src/TcpSocket.h +++ /dev/null @@ -1,164 +0,0 @@ -/* -   Copyright (C) 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in -   Right of Canada (Communications Research Center Canada) - -   Copyright (C) 2019 -   Matthias P. Braendli, matthias.braendli@mpb.li - -    http://www.opendigitalradio.org -   */ -/* -   This file is part of ODR-DabMux. - -   ODR-DabMux is free software: you can redistribute it and/or modify -   it under the terms of the GNU General Public License as -   published by the Free Software Foundation, either version 3 of the -   License, or (at your option) any later version. - -   ODR-DabMux is distributed in the hope that it will be useful, -   but WITHOUT ANY WARRANTY; without even the implied warranty of -   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -   GNU General Public License for more details. - -   You should have received a copy of the GNU General Public License -   along with ODR-DabMux.  If not, see <http://www.gnu.org/licenses/>. -   */ - -#ifndef _TCPSOCKET -#define _TCPSOCKET - -#ifdef HAVE_CONFIG_H -#   include "config.h" -#endif - -#include "InetAddress.h" -#include "ThreadsafeQueue.h" -#include <sys/socket.h> -#include <netinet/in.h> -#include <unistd.h> -#include <netdb.h> -#include <arpa/inet.h> -#include <pthread.h> -#define SOCKET           int -#define INVALID_SOCKET   -1 -#define SOCKET_ERROR     -1 -#define reuseopt_t       int - -#include <iostream> -#include <string> -#include <vector> -#include <memory> -#include <atomic> -#include <thread> -#include <list> - -/** - *  This class represents a TCP socket. - */ -class TcpSocket -{ -    public: -        /** Create a new socket that does nothing */ -        TcpSocket(); - -        /** Create a new socket listening for incoming connections. -         *  @param port The port number on which the socket will listen. -         *  @param name The IP address on which the socket will be bound. -         *              It is used to bind the socket on a specific interface if -         *              the computer have many NICs. -         */ -        TcpSocket(int port, const std::string& name); -        ~TcpSocket(); -        TcpSocket(TcpSocket&& other); -        TcpSocket& operator=(TcpSocket&& other); -        TcpSocket(const TcpSocket& other) = delete; -        TcpSocket& operator=(const TcpSocket& other) = delete; - -        bool isValid(void); - -        int close(void); - -        /** Send data over the TCP connection. -         *  @param data The buffer that will be sent. -         *  @param size Number of bytes to send. -         *  @param timeout_ms number of milliseconds before timeout, or 0 for infinite timeout -         *  return number of bytes sent, 0 on timeout, or throws runtime_error. -         */ -        ssize_t send(const void* data, size_t size, int timeout_ms=0); - -        /** Receive data from the socket. -         *  @param data The buffer that will receive data. -         *  @param size The buffer size. -         *  @return number of bytes received or -1 (SOCKET_ERROR) if error -         */ -        ssize_t recv(void* data, size_t size); - -        void listen(void); -        TcpSocket accept(void); - -        /* Returns either valid socket if a connection was -         * accepted before the timeout expired, or an invalid -         * socket otherwise. -         */ -        TcpSocket accept(int timeout_ms); - -        /** Retrieve address this socket is bound to */ -        InetAddress getOwnAddress() const; -        InetAddress getRemoteAddress() const; - -    private: -        TcpSocket(SOCKET sock, InetAddress own, InetAddress remote); - -        /// The address on which the socket is bound. -        InetAddress m_own_address; -        InetAddress m_remote_address; -        /// The low-level socket used by system functions. -        SOCKET m_sock; -}; - -/* Helper class for TCPDataDispatcher, contains a queue of pending data and - * a sender thread. */ -class TCPConnection -{ -    public: -        TCPConnection(TcpSocket&& sock); -        TCPConnection(const TCPConnection&) = delete; -        TCPConnection& operator=(const TCPConnection&) = delete; -        ~TCPConnection(); - -        ThreadsafeQueue<std::vector<uint8_t> > queue; - -    private: -        std::atomic<bool> m_running; -        std::thread m_sender_thread; -        TcpSocket m_sock; - -        void process(void); -}; - -/* Send a TCP stream to several destinations, and automatically disconnect destinations - * whose buffer overflows. - */ -class TCPDataDispatcher -{ -    public: -        TCPDataDispatcher(size_t max_queue_size); -        ~TCPDataDispatcher(); -        TCPDataDispatcher(const TCPDataDispatcher&) = delete; -        TCPDataDispatcher& operator=(const TCPDataDispatcher&) = delete; - -        void start(int port, const std::string& address); -        void write(const std::vector<uint8_t>& data); - -    private: -        void process(void); - -        size_t m_max_queue_size; - -        std::atomic<bool> m_running; -        std::thread m_listener_thread; -        TcpSocket m_listener_socket; -        std::list<TCPConnection> m_connections; -}; - -#endif // _TCPSOCKET diff --git a/src/UdpSocket.cpp b/src/UdpSocket.cpp deleted file mode 100644 index 3d015ec..0000000 --- a/src/UdpSocket.cpp +++ /dev/null @@ -1,256 +0,0 @@ -/* -   Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the -   Queen in Right of Canada (Communications Research Center Canada) - -   Copyright (C) 2017 -   Matthias P. Braendli, matthias.braendli@mpb.li - -    http://www.opendigitalradio.org -   */ -/* -   This file is part of ODR-DabMux. - -   ODR-DabMux is free software: you can redistribute it and/or modify -   it under the terms of the GNU General Public License as -   published by the Free Software Foundation, either version 3 of the -   License, or (at your option) any later version. - -   ODR-DabMux is distributed in the hope that it will be useful, -   but WITHOUT ANY WARRANTY; without even the implied warranty of -   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -   GNU General Public License for more details. - -   You should have received a copy of the GNU General Public License -   along with ODR-DabMux.  If not, see <http://www.gnu.org/licenses/>. -   */ - -#include "UdpSocket.h" - -#include <iostream> -#include <stdio.h> -#include <errno.h> -#include <fcntl.h> -#include <string.h> - -using namespace std; - -UdpSocket::UdpSocket() : -    listenSocket(INVALID_SOCKET) -{ -    reinit(0, ""); -} - -UdpSocket::UdpSocket(int port) : -    listenSocket(INVALID_SOCKET) -{ -    reinit(port, ""); -} - -UdpSocket::UdpSocket(int port, const std::string& name) : -    listenSocket(INVALID_SOCKET) -{ -    reinit(port, name); -} - - -int UdpSocket::setBlocking(bool block) -{ -    int res; -    if (block) -        res = fcntl(listenSocket, F_SETFL, 0); -    else -        res = fcntl(listenSocket, F_SETFL, O_NONBLOCK); -    if (res == SOCKET_ERROR) { -        setInetError("Can't change blocking state of socket"); -        return -1; -    } -    return 0; -} - -int UdpSocket::reinit(int port, const std::string& name) -{ -    if (listenSocket != INVALID_SOCKET) { -        ::close(listenSocket); -    } - -    if ((listenSocket = socket(PF_INET, SOCK_DGRAM, 0)) == INVALID_SOCKET) { -        setInetError("Can't create socket"); -        return -1; -    } -    reuseopt_t reuse = 1; -    if (setsockopt(listenSocket, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) -            == SOCKET_ERROR) { -        setInetError("Can't reuse address"); -        return -1; -    } - -    if (port) { -        address.setAddress(name); -        address.setPort(port); - -        if (::bind(listenSocket, address.getAddress(), sizeof(sockaddr_in)) == SOCKET_ERROR) { -            setInetError("Can't bind socket"); -            ::close(listenSocket); -            listenSocket = INVALID_SOCKET; -            return -1; -        } -    } -    return 0; -} - -int UdpSocket::close() -{ -    if (listenSocket != INVALID_SOCKET) { -        ::close(listenSocket); -    } - -    listenSocket = INVALID_SOCKET; - -    return 0; -} - -UdpSocket::~UdpSocket() -{ -    if (listenSocket != INVALID_SOCKET) { -        ::close(listenSocket); -    } -} - - -int UdpSocket::receive(UdpPacket& packet) -{ -    socklen_t addrSize; -    addrSize = sizeof(*packet.getAddress().getAddress()); -    ssize_t ret = recvfrom(listenSocket, -            packet.getData(), -            packet.getSize(), -            0, -            packet.getAddress().getAddress(), -            &addrSize); - -    if (ret == SOCKET_ERROR) { -        packet.setSize(0); -        if (errno == EAGAIN) { -            return 0; -        } -        setInetError("Can't receive UDP packet"); -        return -1; -    } - -    packet.setSize(ret); -    return 0; -} - -int UdpSocket::send(UdpPacket& packet) -{ -    int ret = sendto(listenSocket, packet.getData(), packet.getSize(), 0, -            packet.getAddress().getAddress(), sizeof(*packet.getAddress().getAddress())); -    if (ret == SOCKET_ERROR && errno != ECONNREFUSED) { -        setInetError("Can't send UDP packet"); -        return -1; -    } -    return 0; -} - - -int UdpSocket::send(const std::vector<uint8_t>& data, InetAddress destination) -{ -    int ret = sendto(listenSocket, &data[0], data.size(), 0, -            destination.getAddress(), sizeof(*destination.getAddress())); -    if (ret == SOCKET_ERROR && errno != ECONNREFUSED) { -        setInetError("Can't send UDP packet"); -        return -1; -    } -    return 0; -} - - -/** - *  Must be called to receive data on a multicast address. - *  @param groupname The multica -st address to join. - *  @return 0 if ok, -1 if error - */ -int UdpSocket::joinGroup(char* groupname) -{ -    ip_mreqn group; -    if ((group.imr_multiaddr.s_addr = inet_addr(groupname)) == INADDR_NONE) { -        setInetError(groupname); -        return -1; -    } -    if (!IN_MULTICAST(ntohl(group.imr_multiaddr.s_addr))) { -        setInetError("Not a multicast address"); -        return -1; -    } -    group.imr_address.s_addr = htons(INADDR_ANY);; -    group.imr_ifindex = 0; -    if (setsockopt(listenSocket, IPPROTO_IP, IP_ADD_MEMBERSHIP, &group, sizeof(group)) -            == SOCKET_ERROR) { -        setInetError("Can't join multicast group"); -    } -    return 0; -} - -int UdpSocket::setMulticastTTL(int ttl) -{ -    if (setsockopt(listenSocket, IPPROTO_IP, IP_MULTICAST_TTL, &ttl, sizeof(ttl)) -            == SOCKET_ERROR) { -        setInetError("Can't set ttl"); -        return -1; -    } - -    return 0; -} - -int UdpSocket::setMulticastSource(const char* source_addr) -{ -    struct in_addr addr; -    if (inet_aton(source_addr, &addr) == 0) { -        setInetError("Can't parse source address"); -        return -1; -    } - -    if (setsockopt(listenSocket, IPPROTO_IP, IP_MULTICAST_IF, &addr, sizeof(addr)) -            == SOCKET_ERROR) { -        setInetError("Can't set source address"); -        return -1; -    } - -    return 0; -} - -UdpPacket::UdpPacket() { } - -UdpPacket::UdpPacket(size_t initSize) : -    m_buffer(initSize) -{ } - - -void UdpPacket::setSize(size_t newSize) -{ -    m_buffer.resize(newSize); -} - - -uint8_t* UdpPacket::getData() -{ -    return &m_buffer[0]; -} - - -void UdpPacket::addData(const void *data, size_t size) -{ -    uint8_t *d = (uint8_t*)data; -    std::copy(d, d + size, std::back_inserter(m_buffer)); -} - -size_t UdpPacket::getSize() -{ -    return m_buffer.size(); -} - -InetAddress UdpPacket::getAddress() -{ -    return address; -} - diff --git a/src/UdpSocket.h b/src/UdpSocket.h deleted file mode 100644 index f51e87c..0000000 --- a/src/UdpSocket.h +++ /dev/null @@ -1,174 +0,0 @@ -/* -   Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the -   Queen in Right of Canada (Communications Research Center Canada) - -   Copyright (C) 2017 -   Matthias P. Braendli, matthias.braendli@mpb.li - -    http://www.opendigitalradio.org -   */ -/* -   This file is part of ODR-DabMux. - -   ODR-DabMux is free software: you can redistribute it and/or modify -   it under the terms of the GNU General Public License as -   published by the Free Software Foundation, either version 3 of the -   License, or (at your option) any later version. - -   ODR-DabMux is distributed in the hope that it will be useful, -   but WITHOUT ANY WARRANTY; without even the implied warranty of -   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -   GNU General Public License for more details. - -   You should have received a copy of the GNU General Public License -   along with ODR-DabMux.  If not, see <http://www.gnu.org/licenses/>. -   */ - -#pragma once - -#ifdef HAVE_CONFIG_H -#   include "config.h" -#endif - -#include "InetAddress.h" -#include <sys/socket.h> -#include <netinet/in.h> -#include <unistd.h> -#include <netdb.h> -#include <arpa/inet.h> -#include <pthread.h> -#define SOCKET           int -#define INVALID_SOCKET   -1 -#define SOCKET_ERROR     -1 -#define reuseopt_t       int - -#include <stdlib.h> -#include <iostream> -#include <vector> - -class UdpPacket; - - -/** - *  This class represents a socket for sending and receiving UDP packets. - * - *  A UDP socket is the sending or receiving point for a packet delivery service. - *  Each packet sent or received on a datagram socket is individually - *  addressed and routed. Multiple packets sent from one machine to another may - *  be routed differently, and may arrive in any order. - */ -class UdpSocket -{ -    public: -        /** Create a new socket that will not be bound to any port. To be used -         * for data output. -         */ -        UdpSocket(); -        /** Create a new socket. -         *  @param port The port number on which the socket will be bound -         */ -        UdpSocket(int port); -        /** Create a new socket. -         *  @param port The port number on which the socket will be bound -         *  @param name The IP address on which the socket will be bound. -         *              It is used to bind the socket on a specific interface if -         *              the computer have many NICs. -         */ -        UdpSocket(int port, const std::string& name); -        ~UdpSocket(); -        UdpSocket(const UdpSocket& other) = delete; -        const UdpSocket& operator=(const UdpSocket& other) = delete; - -        /** reinitialise socket. Close the already open socket, and -         * create a new one -         */ -        int reinit(int port, const std::string& name); - -        /** Close the socket -         */ -        int close(void); - -        /** Send an UDP packet. -         *  @param packet The UDP packet to be sent. It includes the data and the -         *                destination address -         *  return 0 if ok, -1 if error -         */ -        int send(UdpPacket& packet); - -        /** Send an UDP packet -         * -         *  return 0 if ok, -1 if error -         */ -        int send(const std::vector<uint8_t>& data, InetAddress destination); - -        /** Receive an UDP packet. -         *  @param packet The packet that will receive the data. The address will be set -         *                to the source address. -         *  @return 0 if ok, -1 if error -         */ -        int receive(UdpPacket& packet); - -        int joinGroup(char* groupname); -        int setMulticastSource(const char* source_addr); -        int setMulticastTTL(int ttl); - -        /** Set blocking mode. By default, the socket is blocking. -         *  @return 0  if ok -         *          -1 if error -         */ -        int setBlocking(bool block); - -    protected: - -        /// The address on which the socket is bound. -        InetAddress address; -        /// The low-level socket used by system functions. -        SOCKET listenSocket; -}; - -/** This class represents a UDP packet. - * - *  A UDP packet contains a payload (sequence of bytes) and an address. For - *  outgoing packets, the address is the destination address. For incoming - *  packets, the address tells the user from what source the packet arrived from. - */ -class UdpPacket -{ -    public: -        /** Construct an empty UDP packet. -         */ -        UdpPacket(); -        UdpPacket(size_t initSize); - -        /** Give the pointer to data. -         *  @return The pointer -         */ -        uint8_t* getData(void); - -        /** Append some data at the end of data buffer and adjust size. -         *  @param data Pointer to the data to add -         *  @param size Size in bytes of new data -         */ -        void addData(const void *data, size_t size); - -        size_t getSize(void); - -        /** Changes size of the data buffer size. Keeps data intact unless -         *  truncated. -         */ -        void setSize(size_t newSize); - -        /** Returns the UDP address of the packet. -         */ -        InetAddress getAddress(void); - -        const std::vector<uint8_t>& getBuffer(void) const { -            return m_buffer; -        } - - -    private: -        std::vector<uint8_t> m_buffer; -        InetAddress address; -}; - diff --git a/src/dabOutput/dabOutput.h b/src/dabOutput/dabOutput.h index 9cc18d7..c7e570b 100644 --- a/src/dabOutput/dabOutput.h +++ b/src/dabOutput/dabOutput.h @@ -28,8 +28,7 @@  #pragma once -#include "UdpSocket.h" -#include "TcpSocket.h" +#include "Socket.h"  #include "Log.h"  #include "string.h"  #include <stdexcept> @@ -57,6 +56,8 @@ class DabOutput          {              return Open(name.c_str());          } + +        // Return -1 on failure          virtual int Write(void* buffer, int size) = 0;          virtual int Close() = 0; @@ -145,15 +146,7 @@ class DabOutputRaw : public DabOutput  class DabOutputUdp : public DabOutput  {      public: -        DabOutputUdp() { -            packet_ = new UdpPacket(6144); -            socket_ = new UdpSocket(); -        } - -        virtual ~DabOutputUdp() { -            delete socket_; -            delete packet_; -        } +        DabOutputUdp();          int Open(const char* name);          int Write(void* buffer, int size); @@ -171,8 +164,8 @@ class DabOutputUdp : public DabOutput          DabOutputUdp operator=(const DabOutputUdp& other) = delete;          std::string uri_; -        UdpSocket* socket_; -        UdpPacket* packet_; +        Socket::UDPSocket socket_; +        Socket::UDPPacket packet_;  };  // -------------- TCP ------------------ @@ -190,7 +183,7 @@ class DabOutputTcp : public DabOutput      private:          std::string uri_; -        std::shared_ptr<TCPDataDispatcher> dispatcher_; +        std::shared_ptr<Socket::TCPDataDispatcher> dispatcher_;  };  // -------------- Simul ------------------ diff --git a/src/dabOutput/dabOutputTcp.cpp b/src/dabOutput/dabOutputTcp.cpp index 87dbfd5..4dc3538 100644 --- a/src/dabOutput/dabOutputTcp.cpp +++ b/src/dabOutput/dabOutputTcp.cpp @@ -94,7 +94,7 @@ int DabOutputTcp::Open(const char* name)      uri_ = name;      if (success) { -        dispatcher_ = make_shared<TCPDataDispatcher>(MAX_QUEUED_ETI_FRAMES); +        dispatcher_ = make_shared<Socket::TCPDataDispatcher>(MAX_QUEUED_ETI_FRAMES);          dispatcher_->start(port, address);      }      else { diff --git a/src/dabOutput/dabOutputUdp.cpp b/src/dabOutput/dabOutputUdp.cpp index c129569..b9c22db 100644 --- a/src/dabOutput/dabOutputUdp.cpp +++ b/src/dabOutput/dabOutputUdp.cpp @@ -38,18 +38,12 @@  #include <cstdio>  #include <limits.h>  #include "dabOutput.h" -#include "UdpSocket.h" - -#ifdef _WIN32 -#   include <fscfg.h> -#   include <sdci.h> -#else -#   include <netinet/in.h> -#   include <sys/types.h> -#   include <sys/socket.h> -#   include <sys/ioctl.h> -#   include <net/if_arp.h> -#endif +#include "Socket.h" + +DabOutputUdp::DabOutputUdp() : +    socket_(), +    packet_(6144) +{ }  int DabOutputUdp::Open(const char* name)  { @@ -64,12 +58,6 @@ int DabOutputUdp::Open(const char* name)                  regex_constants::match_default)) {          string address = what[1]; -        if (this->packet_->getAddress().setAddress(address.c_str()) == -1) { -            etiLog.level(error) << "can't set address " << -               address <<  "(" << inetErrDesc << ": " << inetErrMsg << ")"; -            return -1; -        } -          string port_str = what[2];          long port = std::strtol(port_str.c_str(), nullptr, 0); @@ -79,7 +67,7 @@ int DabOutputUdp::Open(const char* name)              return -1;          } -        this->packet_->getAddress().setPort(port); +        packet_.address.resolveUdpDestination(address, port);          string query_params = what[3];          smatch query_what; @@ -87,28 +75,25 @@ int DabOutputUdp::Open(const char* name)                      regex_constants::match_default)) {              string src = query_what[1]; -            int err = socket_->setMulticastSource(src.c_str()); -            if (err) { -                etiLog.level(error) << "UDP output socket set source failed!"; -                return -1; -            } +            try { +                socket_.setMulticastSource(src.c_str()); -            string ttl_str = query_what[2]; +                string ttl_str = query_what[2]; -            if (not ttl_str.empty()) { -                long ttl = std::strtol(ttl_str.c_str(), nullptr, 0); -                if ((ttl <= 0) || (ttl >= 255)) { -                    etiLog.level(error) << "Invalid TTL setting in " << -                        uri_without_proto; -                    return -1; -                } +                if (not ttl_str.empty()) { +                    long ttl = std::strtol(ttl_str.c_str(), nullptr, 0); +                    if ((ttl <= 0) || (ttl >= 255)) { +                        etiLog.level(error) << "Invalid TTL setting in " << +                            uri_without_proto; +                        return -1; +                    } -                err = socket_->setMulticastTTL(ttl); -                if (err) { -                    etiLog.level(error) << "UDP output socket set TTL failed!"; -                    return -1; +                    socket_.setMulticastTTL(ttl);                  }              } +            catch (const std::runtime_error& e) { +                etiLog.level(error) << "Failed to set UDP output settings" << e.what(); +            }          }          else if (not query_params.empty()) {              etiLog.level(error) << "UDP output: could not parse parameters " << @@ -129,9 +114,11 @@ int DabOutputUdp::Open(const char* name)  int DabOutputUdp::Write(void* buffer, int size)  { -    this->packet_->setSize(0); -    this->packet_->addData(buffer, size); -    return this->socket_->send(*this->packet_); +    const uint8_t *buf = reinterpret_cast<uint8_t*>(buffer); +    packet_.buffer.resize(0); +    std::copy(buf, buf + size, std::back_inserter(packet_.buffer)); +    socket_.send(packet_); +    return 0;  }  #endif // defined(HAVE_OUTPUT_UDP) diff --git a/src/dabOutput/edi/Config.h b/src/dabOutput/edi/Config.h index 55d5f0f..0c7dce8 100644 --- a/src/dabOutput/edi/Config.h +++ b/src/dabOutput/edi/Config.h @@ -50,11 +50,18 @@ struct udp_destination_t : public destination_t {  };  // TCP server that can accept multiple connections -struct tcp_destination_t : public destination_t { +struct tcp_server_t : public destination_t {      unsigned int listen_port = 0;      size_t max_frames_queued = 1024;  }; +// TCP client that connects to one endpoint +struct tcp_client_t : public destination_t { +    std::string dest_addr; +    unsigned int dest_port = 0; +    size_t max_frames_queued = 1024; +}; +  struct configuration_t {      unsigned chunk_len = 207;        // RSk, data length of each chunk      unsigned fec       = 0;          // number of fragments that can be recovered diff --git a/src/dabOutput/edi/PFT.cpp b/src/dabOutput/edi/PFT.cpp index 5b93016..63dfa34 100644 --- a/src/dabOutput/edi/PFT.cpp +++ b/src/dabOutput/edi/PFT.cpp @@ -314,7 +314,7 @@ std::vector< PFTFragment > PFT::Assemble(AFPacket af_packet)  #if 0          fprintf(stderr, "* PFT pseq %d, findex %d, fcount %d, plen %d\n", -                m_pseq, findex, fcount, plen & ~0x8000); +                m_pseq, findex, fcount, plen & ~0xC000);  #endif      } diff --git a/src/dabOutput/edi/Transport.cpp b/src/dabOutput/edi/Transport.cpp index d99e987..187aabe 100644 --- a/src/dabOutput/edi/Transport.cpp +++ b/src/dabOutput/edi/Transport.cpp @@ -45,12 +45,16 @@ void configuration_t::print() const              }              etiLog.level(info) << "  source port " << udp_dest->source_port;          } -        else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_destination_t>(edi_dest)) { +        else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(edi_dest)) {              etiLog.level(info) << " TCP listening on port " << tcp_dest->listen_port;              etiLog.level(info) << "  max frames queued    " << tcp_dest->max_frames_queued;          } +        else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(edi_dest)) { +            etiLog.level(info) << " TCP client connecting to " << tcp_dest->dest_addr << ":" << tcp_dest->dest_port; +            etiLog.level(info) << "  max frames queued    " << tcp_dest->max_frames_queued; +        }          else { -            throw std::logic_error("EDI destination not implemented"); +            throw logic_error("EDI destination not implemented");          }      }      if (interleaver_enabled()) { @@ -69,28 +73,27 @@ Sender::Sender(const configuration_t& conf) :      for (const auto& edi_dest : m_conf.destinations) {          if (const auto udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(edi_dest)) { -            auto udp_socket = std::make_shared<UdpSocket>(udp_dest->source_port); +            auto udp_socket = std::make_shared<Socket::UDPSocket>(udp_dest->source_port);              if (not udp_dest->source_addr.empty()) { -                int err = udp_socket->setMulticastSource(udp_dest->source_addr.c_str()); -                if (err) { -                    throw runtime_error("EDI socket set source failed!"); -                } -                err = udp_socket->setMulticastTTL(udp_dest->ttl); -                if (err) { -                    throw runtime_error("EDI socket set TTL failed!"); -                } +                udp_socket->setMulticastSource(udp_dest->source_addr.c_str()); +                udp_socket->setMulticastTTL(udp_dest->ttl);              }              udp_sockets.emplace(udp_dest.get(), udp_socket);          } -        else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_destination_t>(edi_dest)) { -            auto dispatcher = make_shared<TCPDataDispatcher>(tcp_dest->max_frames_queued); +        else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(edi_dest)) { +            auto dispatcher = make_shared<Socket::TCPDataDispatcher>(tcp_dest->max_frames_queued);              dispatcher->start(tcp_dest->listen_port, "0.0.0.0");              tcp_dispatchers.emplace(tcp_dest.get(), dispatcher);          } +        else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(edi_dest)) { +            auto tcp_socket = make_shared<Socket::TCPSocket>(); +            tcp_socket->connect(tcp_dest->dest_addr, tcp_dest->dest_port); +            tcp_senders.emplace(tcp_dest.get(), tcp_socket); +        }          else { -            throw std::logic_error("EDI destination not implemented"); +            throw logic_error("EDI destination not implemented");          }      } @@ -117,7 +120,7 @@ void Sender::write(const TagPacket& tagpacket)          vector<edi::PFTFragment> edi_fragments = edi_pft.Assemble(af_packet);          if (m_conf.verbose) { -            fprintf(stderr, "EDI number of PFT fragment before interleaver %zu", +            fprintf(stderr, "EDI number of PFT fragment before interleaver %zu\n",                      edi_fragments.size());          } @@ -129,28 +132,30 @@ void Sender::write(const TagPacket& tagpacket)          for (const auto& edi_frag : edi_fragments) {              for (auto& dest : m_conf.destinations) {                  if (const auto& udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(dest)) { -                    InetAddress addr; -                    addr.setAddress(udp_dest->dest_addr.c_str()); -                    addr.setPort(m_conf.dest_port); +                    Socket::InetAddress addr; +                    addr.resolveUdpDestination(udp_dest->dest_addr, m_conf.dest_port);                      udp_sockets.at(udp_dest.get())->send(edi_frag, addr);                  } -                else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_destination_t>(dest)) { +                else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(dest)) {                      tcp_dispatchers.at(tcp_dest.get())->write(edi_frag);                  } +                else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(dest)) { +                    tcp_senders.at(tcp_dest.get())->sendall(edi_frag.data(), edi_frag.size()); +                }                  else { -                    throw std::logic_error("EDI destination not implemented"); +                    throw logic_error("EDI destination not implemented");                  }              }              if (m_conf.dump) { -                std::ostream_iterator<uint8_t> debug_iterator(edi_debug_file); -                std::copy(edi_frag.begin(), edi_frag.end(), debug_iterator); +                ostream_iterator<uint8_t> debug_iterator(edi_debug_file); +                copy(edi_frag.begin(), edi_frag.end(), debug_iterator);              }          }          if (m_conf.verbose) { -            fprintf(stderr, "EDI number of PFT fragments %zu", +            fprintf(stderr, "EDI number of PFT fragments %zu\n",                      edi_fragments.size());          }      } @@ -158,23 +163,25 @@ void Sender::write(const TagPacket& tagpacket)          // Send over ethernet          for (auto& dest : m_conf.destinations) {              if (const auto& udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(dest)) { -                InetAddress addr; -                addr.setAddress(udp_dest->dest_addr.c_str()); -                addr.setPort(m_conf.dest_port); +                Socket::InetAddress addr; +                addr.resolveUdpDestination(udp_dest->dest_addr, m_conf.dest_port);                  udp_sockets.at(udp_dest.get())->send(af_packet, addr);              } -            else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_destination_t>(dest)) { +            else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(dest)) {                  tcp_dispatchers.at(tcp_dest.get())->write(af_packet);              } +            else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(dest)) { +                tcp_senders.at(tcp_dest.get())->sendall(af_packet.data(), af_packet.size()); +            }              else { -                throw std::logic_error("EDI destination not implemented"); +                throw logic_error("EDI destination not implemented");              }          }          if (m_conf.dump) { -            std::ostream_iterator<uint8_t> debug_iterator(edi_debug_file); -            std::copy(af_packet.begin(), af_packet.end(), debug_iterator); +            ostream_iterator<uint8_t> debug_iterator(edi_debug_file); +            copy(af_packet.begin(), af_packet.end(), debug_iterator);          }      }  } diff --git a/src/dabOutput/edi/Transport.h b/src/dabOutput/edi/Transport.h index 7b0a0db..9633275 100644 --- a/src/dabOutput/edi/Transport.h +++ b/src/dabOutput/edi/Transport.h @@ -32,11 +32,12 @@  #include "AFPacket.h"  #include "PFT.h"  #include "Interleaver.h" +#include "Socket.h"  #include <vector>  #include <unordered_map>  #include <stdexcept> +#include <fstream>  #include <cstdint> -#include "dabOutput/dabOutput.h"  namespace edi { @@ -61,8 +62,9 @@ class Sender {          // To mitigate for burst packet loss, PFT fragments can be sent out-of-order          edi::Interleaver edi_interleaver; -        std::unordered_map<udp_destination_t*, std::shared_ptr<UdpSocket>> udp_sockets; -        std::unordered_map<tcp_destination_t*, std::shared_ptr<TCPDataDispatcher>> tcp_dispatchers; +        std::unordered_map<udp_destination_t*, std::shared_ptr<Socket::UDPSocket>> udp_sockets; +        std::unordered_map<tcp_server_t*, std::shared_ptr<Socket::TCPDataDispatcher>> tcp_dispatchers; +        std::unordered_map<tcp_client_t*, std::shared_ptr<Socket::TCPSocket>> tcp_senders;  };  } diff --git a/src/input/Udp.cpp b/src/input/Udp.cpp index 2cb49e7..5d4f964 100644 --- a/src/input/Udp.cpp +++ b/src/input/Udp.cpp @@ -82,17 +82,8 @@ void Udp::openUdpSocket(const std::string& endpoint)          throw out_of_range("can't use port number 0 in udp address");      } -    if (m_sock.reinit(port, address) == -1) { -        stringstream ss; -        ss << "Could not init UDP socket: " << inetErrMsg; -        throw runtime_error(ss.str()); -    } - -    if (m_sock.setBlocking(false) == -1) { -        stringstream ss; -        ss << "Could not set non-blocking UDP socket: " << inetErrMsg; -        throw runtime_error(ss.str()); -    } +    m_sock.reinit(port, address); +    m_sock.setBlocking(false);      etiLog.level(info) << "Opened UDP port " << address << ":" << port;  } @@ -100,17 +91,9 @@ void Udp::openUdpSocket(const std::string& endpoint)  int Udp::readFrame(uint8_t* buffer, size_t size)  {      // Regardless of buffer contents, try receiving data. -    UdpPacket packet(32768); -    int ret = m_sock.receive(packet); - -    if (ret == -1) { -        stringstream ss; -        ss << "Could not read from UDP socket: " << inetErrMsg; -        throw runtime_error(ss.str()); -    } +    auto packet = m_sock.receive(32768); -    std::copy(packet.getData(), packet.getData() + packet.getSize(), -            back_inserter(m_buffer)); +    std::copy(packet.buffer.cbegin(), packet.buffer.cend(), back_inserter(m_buffer));      // Take data from the buffer if it contains enough data,      // in any case write the buffer @@ -136,7 +119,8 @@ int Udp::setBitrate(int bitrate)  int Udp::close()  { -    return m_sock.close(); +    m_sock.close(); +    return 0;  } @@ -167,8 +151,8 @@ static uint16_t unpack2(const uint8_t *buf)  int Sti_d_Rtp::open(const std::string& name)  { -    // Skip the sti-rtp:// part if it is present -    const string endpoint = (name.substr(0, 10) == "sti-rtp://") ? +    // Skip the rtp:// part if it is present +    const string endpoint = (name.substr(0, 10) == "rtp://") ?          name.substr(10) : name;      // The endpoint should be address:port @@ -176,8 +160,8 @@ int Sti_d_Rtp::open(const std::string& name)      if (colon_pos == string::npos) {          stringstream ss;          ss << "'" << name << -                " is an invalid format for sti-rtp address: " -                "expected [sti-rtp://]address:port"; +                " is an invalid format for rtp address: " +                "expected [rtp://]address:port";          throw invalid_argument(ss.str());      } @@ -190,29 +174,22 @@ int Sti_d_Rtp::open(const std::string& name)  void Sti_d_Rtp::receive_packet()  { -    UdpPacket packet(32768); -    int ret = m_sock.receive(packet); - -    if (ret == -1) { -        stringstream ss; -        ss << "Could not read from UDP socket: " << inetErrMsg; -        throw runtime_error(ss.str()); -    } +    auto packet = m_sock.receive(32768); -    if (packet.getSize() == 0) { +    if (packet.buffer.empty()) {          // No packet was received          return;      }      const size_t STI_FC_LEN = 8; -    if (packet.getSize() < RTP_HEADER_LEN + STI_SYNC_LEN + STI_FC_LEN) { +    if (packet.buffer.size() < RTP_HEADER_LEN + STI_SYNC_LEN + STI_FC_LEN) {          etiLog.level(info) << "Received too small RTP packet for " <<              m_name;          return;      } -    if (not rtpHeaderValid(packet.getData())) { +    if (not rtpHeaderValid(packet.buffer.data())) {          etiLog.level(info) << "Received invalid RTP header for " <<              m_name;          return; @@ -220,7 +197,7 @@ void Sti_d_Rtp::receive_packet()      //  STI(PI, X)      size_t index = RTP_HEADER_LEN; -    const uint8_t *buf = packet.getData(); +    const uint8_t *buf = packet.buffer.data();      //   SYNC      index++; // Advance over STAT @@ -242,7 +219,7 @@ void Sti_d_Rtp::receive_packet()              m_name;          return;      } -    if (packet.getSize() < index + DFS) { +    if (packet.buffer.size() < index + DFS) {          etiLog.level(info) << "Received STI too small for given DFS for " <<              m_name;          return; @@ -270,9 +247,9 @@ void Sti_d_Rtp::receive_packet()      uint16_t NST   = unpack2(buf+index) & 0x7FF; // 11 bits      index += 2; -    if (packet.getSize() < index + 4*NST) { +    if (packet.buffer.size() < index + 4*NST) {          etiLog.level(info) << "Received STI too small to contain NST for " << -            m_name << " packet: " << packet.getSize() << " need " << +            m_name << " packet: " << packet.buffer.size() << " need " <<              index + 4*NST;          return;      } diff --git a/src/input/Udp.h b/src/input/Udp.h index dc01486..dd637c6 100644 --- a/src/input/Udp.h +++ b/src/input/Udp.h @@ -31,7 +31,7 @@  #include <deque>  #include <boost/thread.hpp>  #include "input/inputs.h" -#include "UdpSocket.h" +#include "Socket.h"  namespace Inputs { @@ -46,7 +46,7 @@ class Udp : public InputBase {          virtual int close();      protected: -        UdpSocket m_sock; +        Socket::UDPSocket m_sock;          std::string m_name;          void openUdpSocket(const std::string& endpoint);  | 
