From 899dcb83ec873cb35d38583d6f48922e1312e9be Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Tue, 25 Jun 2019 17:13:27 +0200 Subject: Replace socket library --- Makefile.am | 10 +- lib/Socket.cpp | 894 ++++++++++++++++++++++++++++++++++++++++++++++++++ lib/Socket.h | 294 +++++++++++++++++ lib/ThreadsafeQueue.h | 176 ++++++++++ src/AVTEDIInput.cpp | 738 ----------------------------------------- src/AVTEDIInput.h | 188 ----------- src/AVTInput.cpp | 261 ++++----------- src/AVTInput.h | 24 +- src/UdpSocket.cpp | 510 ---------------------------- src/UdpSocket.h | 138 -------- 10 files changed, 1442 insertions(+), 1791 deletions(-) create mode 100644 lib/Socket.cpp create mode 100644 lib/Socket.h create mode 100644 lib/ThreadsafeQueue.h delete mode 100644 src/AVTEDIInput.cpp delete mode 100644 src/AVTEDIInput.h delete mode 100644 src/UdpSocket.cpp delete mode 100644 src/UdpSocket.h diff --git a/Makefile.am b/Makefile.am index 9594ae2..aff4694 100644 --- a/Makefile.am +++ b/Makefile.am @@ -9,13 +9,11 @@ endif odr_sourcecompanion_LDADD = -lzmq odr_sourcecompanion_CFLAGS = $(GITVERSION_FLAGS) -ggdb -O2 -Wall -odr_sourcecompanion_CXXFLAGS = $(GITVERSION_FLAGS) -ggdb -O2 -Wall -Isrc/fec +odr_sourcecompanion_CXXFLAGS = $(GITVERSION_FLAGS) -ggdb -O2 -Wall -Isrc/fec -Ilib odr_sourcecompanion_SOURCES = src/odr-sourcecompanion.cpp \ src/AACDecoder.h src/AACDecoder.cpp \ - src/AVTInput.h src/AVTInput.cpp \ - src/InetAddress.h src/InetAddress.cpp \ + src/AVTInput.h src/AVTInput.cpp \ src/OrderedQueue.h src/OrderedQueue.cpp \ - src/UdpSocket.h src/UdpSocket.cpp \ src/crc.h src/crc.c \ src/encryption.h src/encryption.c \ src/utils.h src/utils.c \ @@ -27,7 +25,9 @@ odr_sourcecompanion_SOURCES = src/odr-sourcecompanion.cpp \ src/fec/fec.h \ src/fec/init_rs_char.c \ src/fec/init_rs.h \ - src/fec/rs-common.h + src/fec/rs-common.h \ + lib/ThreadsafeQueue.h \ + lib/Socket.h lib/Socket.cpp bin_PROGRAMS = odr-sourcecompanion$(EXEEXT) diff --git a/lib/Socket.cpp b/lib/Socket.cpp new file mode 100644 index 0000000..d14902e --- /dev/null +++ b/lib/Socket.cpp @@ -0,0 +1,894 @@ +/* + Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the + Queen in Right of Canada (Communications Research Center Canada) + + Copyright (C) 2019 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://www.opendigitalradio.org + */ +/* + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see . +*/ + +#include "Socket.h" + +#include +#include +#include +#include +#include +#include + +namespace Socket { + +using namespace std; + +void InetAddress::resolveUdpDestination(const std::string& destination, int port) +{ + char service[NI_MAXSERV]; + snprintf(service, NI_MAXSERV-1, "%d", port); + + struct addrinfo hints; + memset(&hints, 0, sizeof(struct addrinfo)); + hints.ai_family = AF_INET; + hints.ai_socktype = SOCK_DGRAM; /* Datagram socket */ + hints.ai_flags = 0; + hints.ai_protocol = 0; + + struct addrinfo *result, *rp; + int s = getaddrinfo(destination.c_str(), service, &hints, &result); + if (s != 0) { + throw runtime_error(string("getaddrinfo failed: ") + gai_strerror(s)); + } + + for (rp = result; rp != nullptr; rp = rp->ai_next) { + // Take the first result + memcpy(&addr, rp->ai_addr, rp->ai_addrlen); + break; + } + + freeaddrinfo(result); + + if (rp == nullptr) { + throw runtime_error("Could not resolve"); + } +} + +UDPPacket::UDPPacket() { } + +UDPPacket::UDPPacket(size_t initSize) : + buffer(initSize) +{ } + + +UDPSocket::UDPSocket() : + m_sock(INVALID_SOCKET) +{ + reinit(0, ""); +} + +UDPSocket::UDPSocket(int port) : + m_sock(INVALID_SOCKET) +{ + reinit(port, ""); +} + +UDPSocket::UDPSocket(int port, const std::string& name) : + m_sock(INVALID_SOCKET) +{ + reinit(port, name); +} + + +void UDPSocket::setBlocking(bool block) +{ + int res = fcntl(m_sock, F_SETFL, block ? 0 : O_NONBLOCK); + if (res == -1) { + throw runtime_error(string("Can't change blocking state of socket: ") + strerror(errno)); + } +} + +void UDPSocket::reinit(int port) +{ + return reinit(port, ""); +} + +void UDPSocket::reinit(int port, const std::string& name) +{ + if (m_sock != INVALID_SOCKET) { + ::close(m_sock); + } + + if (port == 0) { + // No need to bind to a given port, creating the + // socket is enough + m_sock = ::socket(AF_INET, SOCK_DGRAM, 0); + return; + } + + char service[NI_MAXSERV]; + snprintf(service, NI_MAXSERV-1, "%d", port); + + struct addrinfo hints; + memset(&hints, 0, sizeof(struct addrinfo)); + hints.ai_family = AF_INET; + hints.ai_socktype = SOCK_DGRAM; /* Datagram socket */ + hints.ai_flags = AI_PASSIVE; /* For wildcard IP address */ + hints.ai_protocol = 0; /* Any protocol */ + hints.ai_canonname = nullptr; + hints.ai_addr = nullptr; + hints.ai_next = nullptr; + + struct addrinfo *result, *rp; + int s = getaddrinfo(name.empty() ? nullptr : name.c_str(), + port == 0 ? nullptr : service, + &hints, &result); + if (s != 0) { + throw runtime_error(string("getaddrinfo failed: ") + gai_strerror(s)); + } + + /* getaddrinfo() returns a list of address structures. + Try each address until we successfully bind(2). + If socket(2) (or bind(2)) fails, we (close the socket + and) try the next address. */ + for (rp = result; rp != nullptr; rp = rp->ai_next) { + int sfd = ::socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol); + if (sfd == -1) { + continue; + } + + if (::bind(sfd, rp->ai_addr, rp->ai_addrlen) == 0) { + m_sock = sfd; + break; + } + + ::close(sfd); + } + + freeaddrinfo(result); + + if (rp == nullptr) { + throw runtime_error("Could not bind"); + } +} + +void UDPSocket::close() +{ + if (m_sock != INVALID_SOCKET) { + ::close(m_sock); + } + + m_sock = INVALID_SOCKET; +} + +UDPSocket::~UDPSocket() +{ + if (m_sock != INVALID_SOCKET) { + ::close(m_sock); + } +} + + +UDPPacket UDPSocket::receive(size_t max_size) +{ + UDPPacket packet(max_size); + socklen_t addrSize; + addrSize = sizeof(*packet.address.as_sockaddr()); + ssize_t ret = recvfrom(m_sock, + packet.buffer.data(), + packet.buffer.size(), + 0, + packet.address.as_sockaddr(), + &addrSize); + + if (ret == SOCKET_ERROR) { + packet.buffer.resize(0); + + // This suppresses the -Wlogical-op warning +#if EAGAIN == EWOULDBLOCK + if (errno == EAGAIN) { +#else + if (errno == EAGAIN or errno == EWOULDBLOCK) { +#endif + return 0; + } + throw runtime_error(string("Can't receive data: ") + strerror(errno)); + } + + packet.buffer.resize(ret); + return packet; +} + +void UDPSocket::send(UDPPacket& packet) +{ + const int ret = sendto(m_sock, packet.buffer.data(), packet.buffer.size(), 0, + packet.address.as_sockaddr(), sizeof(*packet.address.as_sockaddr())); + if (ret == SOCKET_ERROR && errno != ECONNREFUSED) { + throw runtime_error(string("Can't send UDP packet: ") + strerror(errno)); + } +} + + +void UDPSocket::send(const std::vector& data, InetAddress destination) +{ + const int ret = sendto(m_sock, data.data(), data.size(), 0, + destination.as_sockaddr(), sizeof(*destination.as_sockaddr())); + if (ret == SOCKET_ERROR && errno != ECONNREFUSED) { + throw runtime_error(string("Can't send UDP packet: ") + strerror(errno)); + } +} + +void UDPSocket::joinGroup(const char* groupname, const char* if_addr) +{ + ip_mreqn group; + if ((group.imr_multiaddr.s_addr = inet_addr(groupname)) == INADDR_NONE) { + throw runtime_error("Cannot convert multicast group name"); + } + if (!IN_MULTICAST(ntohl(group.imr_multiaddr.s_addr))) { + throw runtime_error("Group name is not a multicast address"); + } + + if (if_addr) { + group.imr_address.s_addr = inet_addr(if_addr); + } + else { + group.imr_address.s_addr = htons(INADDR_ANY); + } + group.imr_ifindex = 0; + if (setsockopt(m_sock, IPPROTO_IP, IP_ADD_MEMBERSHIP, &group, sizeof(group)) + == SOCKET_ERROR) { + throw runtime_error(string("Can't join multicast group") + strerror(errno)); + } +} + +void UDPSocket::setMulticastSource(const char* source_addr) +{ + struct in_addr addr; + if (inet_aton(source_addr, &addr) == 0) { + throw runtime_error(string("Can't parse source address") + strerror(errno)); + } + + if (setsockopt(m_sock, IPPROTO_IP, IP_MULTICAST_IF, &addr, sizeof(addr)) + == SOCKET_ERROR) { + throw runtime_error(string("Can't set source address") + strerror(errno)); + } +} + +void UDPSocket::setMulticastTTL(int ttl) +{ + if (setsockopt(m_sock, IPPROTO_IP, IP_MULTICAST_TTL, &ttl, sizeof(ttl)) + == SOCKET_ERROR) { + throw runtime_error(string("Can't set multicast ttl") + strerror(errno)); + } +} + +UDPReceiver::UDPReceiver() { } + +UDPReceiver::~UDPReceiver() { + m_stop = true; + m_sock.close(); + if (m_thread.joinable()) { + m_thread.join(); + } +} + +void UDPReceiver::start(int port, const string& bindto, const string& mcastaddr, size_t max_packets_queued) { + m_port = port; + m_bindto = bindto; + m_mcastaddr = mcastaddr; + m_max_packets_queued = max_packets_queued; + m_thread = std::thread(&UDPReceiver::m_run, this); +} + +std::vector UDPReceiver::get_packet_buffer() +{ + if (m_stop) { + throw runtime_error("UDP Receiver not running"); + } + + UDPPacket p; + m_packets.wait_and_pop(p); + + return p.buffer; +} + +void UDPReceiver::m_run() +{ + // Ensure that stop is set to true in case of exception or return + struct SetStopOnDestruct { + SetStopOnDestruct(atomic& stop) : m_stop(stop) {} + ~SetStopOnDestruct() { m_stop = true; } + private: atomic& m_stop; + } autoSetStop(m_stop); + + if (IN_MULTICAST(ntohl(inet_addr(m_mcastaddr.c_str())))) { + m_sock.reinit(m_port, m_mcastaddr); + m_sock.setMulticastSource(m_bindto.c_str()); + m_sock.joinGroup(m_mcastaddr.c_str(), m_bindto.c_str()); + } + else { + m_sock.reinit(m_port, m_bindto); + } + + while (not m_stop) { + constexpr size_t packsize = 8192; + try { + auto packet = m_sock.receive(packsize); + if (packet.buffer.size() == packsize) { + // TODO replace fprintf + fprintf(stderr, "Warning, possible UDP truncation\n"); + } + + // If this blocks, the UDP socket will lose incoming packets + m_packets.push_wait_if_full(packet, m_max_packets_queued); + } + catch (const std::runtime_error& e) { + // TODO replace fprintf + // TODO handle intr + fprintf(stderr, "Socket error: %s\n", e.what()); + m_stop = true; + } + } +} + + +TCPSocket::TCPSocket() +{ +} + +TCPSocket::~TCPSocket() +{ + if (m_sock != -1) { + ::close(m_sock); + } +} + +TCPSocket::TCPSocket(TCPSocket&& other) : + m_sock(other.m_sock), + m_remote_address(move(other.m_remote_address)) +{ + if (other.m_sock != -1) { + other.m_sock = -1; + } +} + +TCPSocket& TCPSocket::operator=(TCPSocket&& other) +{ + swap(m_remote_address, other.m_remote_address); + + m_sock = other.m_sock; + if (other.m_sock != -1) { + other.m_sock = -1; + } + + return *this; +} + +bool TCPSocket::valid() const +{ + return m_sock != -1; +} + +void TCPSocket::connect(const std::string& hostname, int port) +{ + if (m_sock != INVALID_SOCKET) { + throw std::logic_error("You may only connect an invalid TCPSocket"); + } + + char service[NI_MAXSERV]; + snprintf(service, NI_MAXSERV-1, "%d", port); + + /* Obtain address(es) matching host/port */ + struct addrinfo hints; + memset(&hints, 0, sizeof(struct addrinfo)); + hints.ai_family = AF_INET; + hints.ai_socktype = SOCK_STREAM; + hints.ai_flags = 0; + hints.ai_protocol = 0; + + struct addrinfo *result, *rp; + int s = getaddrinfo(hostname.c_str(), service, &hints, &result); + if (s != 0) { + throw runtime_error(string("getaddrinfo failed: ") + gai_strerror(s)); + } + + /* getaddrinfo() returns a list of address structures. + Try each address until we successfully connect(2). + If socket(2) (or connect(2)) fails, we (close the socket + and) try the next address. */ + + for (rp = result; rp != nullptr; rp = rp->ai_next) { + int sfd = ::socket(rp->ai_family, rp->ai_socktype, + rp->ai_protocol); + if (sfd == -1) + continue; + + int ret = ::connect(sfd, rp->ai_addr, rp->ai_addrlen); + if (ret != -1 or (ret == -1 and errno == EINPROGRESS)) { + // As the TCPClient could set the socket to nonblocking, we + // must handle EINPROGRESS here + m_sock = sfd; + break; + } + + ::close(sfd); + } + + if (m_sock != INVALID_SOCKET) { +#if defined(HAVE_SO_NOSIGPIPE) + int val = 1; + if (setsockopt(m_sock, SOL_SOCKET, SO_NOSIGPIPE, &val, sizeof(val)) + == SOCKET_ERROR) { + throw std::runtime_error("Can't set SO_NOSIGPIPE"); + } +#endif + } + + freeaddrinfo(result); /* No longer needed */ + + if (rp == nullptr) { + throw runtime_error("Could not connect"); + } + +} + +void TCPSocket::listen(int port, const string& name) +{ + if (m_sock != INVALID_SOCKET) { + throw std::logic_error("You may only listen with an invalid TCPSocket"); + } + + char service[NI_MAXSERV]; + snprintf(service, NI_MAXSERV-1, "%d", port); + + struct addrinfo hints; + memset(&hints, 0, sizeof(struct addrinfo)); + hints.ai_family = AF_INET; + hints.ai_socktype = SOCK_STREAM; + hints.ai_flags = AI_PASSIVE; /* For wildcard IP address */ + hints.ai_protocol = 0; + hints.ai_canonname = nullptr; + hints.ai_addr = nullptr; + hints.ai_next = nullptr; + + struct addrinfo *result, *rp; + int s = getaddrinfo(name.empty() ? nullptr : name.c_str(), service, &hints, &result); + if (s != 0) { + throw runtime_error(string("getaddrinfo failed: ") + gai_strerror(s)); + } + + /* getaddrinfo() returns a list of address structures. + Try each address until we successfully bind(2). + If socket(2) (or bind(2)) fails, we (close the socket + and) try the next address. */ + for (rp = result; rp != nullptr; rp = rp->ai_next) { + int sfd = ::socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol); + if (sfd == -1) { + continue; + } + + if (::bind(sfd, rp->ai_addr, rp->ai_addrlen) == 0) { + m_sock = sfd; + break; + } + + ::close(sfd); + } + + freeaddrinfo(result); + + if (m_sock != INVALID_SOCKET) { +#if defined(HAVE_SO_NOSIGPIPE) + int val = 1; + if (setsockopt(m_sock, SOL_SOCKET, SO_NOSIGPIPE, + &val, sizeof(val)) < 0) { + throw std::runtime_error("Can't set SO_NOSIGPIPE"); + } +#endif + + int ret = ::listen(m_sock, 0); + if (ret == -1) { + throw std::runtime_error(string("Could not listen: ") + strerror(errno)); + } + } + + if (rp == nullptr) { + throw runtime_error("Could not bind"); + } +} + +void TCPSocket::close() +{ + ::close(m_sock); + m_sock = -1; +} + +TCPSocket TCPSocket::accept(int timeout_ms) +{ + if (timeout_ms == 0) { + InetAddress remote_addr; + socklen_t client_len = sizeof(remote_addr.addr); + int sockfd = ::accept(m_sock, remote_addr.as_sockaddr(), &client_len); + TCPSocket s(sockfd, remote_addr); + return s; + } + else { + struct pollfd fds[1]; + fds[0].fd = m_sock; + fds[0].events = POLLIN; + + int retval = poll(fds, 1, timeout_ms); + + if (retval == -1) { + std::string errstr(strerror(errno)); + throw std::runtime_error("TCP Socket accept error: " + errstr); + } + else if (retval > 0) { + InetAddress remote_addr; + socklen_t client_len = sizeof(remote_addr.addr); + int sockfd = ::accept(m_sock, remote_addr.as_sockaddr(), &client_len); + TCPSocket s(sockfd, remote_addr); + return s; + } + else { + TCPSocket s(-1); + return s; + } + } +} + +ssize_t TCPSocket::sendall(const void *buffer, size_t buflen) +{ + uint8_t *buf = (uint8_t*)buffer; + while (buflen > 0) { + /* On Linux, the MSG_NOSIGNAL flag ensures that the process + * would not receive a SIGPIPE and die. + * Other systems have SO_NOSIGPIPE set on the socket for the + * same effect. */ +#if defined(HAVE_MSG_NOSIGNAL) + const int flags = MSG_NOSIGNAL; +#else + const int flags = 0; +#endif + ssize_t sent = ::send(m_sock, buf, buflen, flags); + if (sent < 0) { + return -1; + } + else { + buf += sent; + buflen -= sent; + } + } + return buflen; +} + +ssize_t TCPSocket::send(const void* data, size_t size, int timeout_ms) +{ + if (timeout_ms) { + struct pollfd fds[1]; + fds[0].fd = m_sock; + fds[0].events = POLLOUT; + + const int retval = poll(fds, 1, timeout_ms); + + if (retval == -1) { + throw std::runtime_error(string("TCP Socket send error on poll(): ") + strerror(errno)); + } + else if (retval == 0) { + // Timed out + return 0; + } + } + + /* On Linux, the MSG_NOSIGNAL flag ensures that the process would not + * receive a SIGPIPE and die. + * Other systems have SO_NOSIGPIPE set on the socket for the same effect. */ +#if defined(HAVE_MSG_NOSIGNAL) + const int flags = MSG_NOSIGNAL; +#else + const int flags = 0; +#endif + const ssize_t ret = ::send(m_sock, (const char*)data, size, flags); + + if (ret == SOCKET_ERROR) { + throw std::runtime_error(string("TCP Socket send error: ") + strerror(errno)); + } + return ret; +} + +ssize_t TCPSocket::recv(void *buffer, size_t length, int flags) +{ + ssize_t ret = ::recv(m_sock, buffer, length, flags); + if (ret == -1) { + std::string errstr(strerror(errno)); + throw std::runtime_error("TCP receive error: " + errstr); + } + return ret; +} + +ssize_t TCPSocket::recv(void *buffer, size_t length, int flags, int timeout_ms) +{ + struct pollfd fds[1]; + fds[0].fd = m_sock; + fds[0].events = POLLIN; + + int retval = poll(fds, 1, timeout_ms); + + if (retval == -1 and errno == EINTR) { + throw Interrupted(); + } + else if (retval == -1) { + std::string errstr(strerror(errno)); + throw std::runtime_error("TCP receive with poll() error: " + errstr); + } + else if (retval > 0 and (fds[0].revents | POLLIN)) { + ssize_t ret = ::recv(m_sock, buffer, length, flags); + if (ret == -1) { + if (errno == ECONNREFUSED) { + return 0; + } + std::string errstr(strerror(errno)); + throw std::runtime_error("TCP receive after poll() error: " + errstr); + } + return ret; + } + else { + throw Timeout(); + } +} + +TCPSocket::TCPSocket(int sockfd) : + m_sock(sockfd), + m_remote_address() +{ } + +TCPSocket::TCPSocket(int sockfd, InetAddress remote_address) : + m_sock(sockfd), + m_remote_address(remote_address) +{ } + +void TCPClient::connect(const std::string& hostname, int port) +{ + m_hostname = hostname; + m_port = port; + reconnect(); +} + +ssize_t TCPClient::recv(void *buffer, size_t length, int flags, int timeout_ms) +{ + try { + ssize_t ret = m_sock.recv(buffer, length, flags, timeout_ms); + + if (ret == 0) { + m_sock.close(); + + TCPSocket newsock; + m_sock = std::move(newsock); + reconnect(); + } + + return ret; + } + catch (const TCPSocket::Interrupted&) { + return -1; + } + catch (const TCPSocket::Timeout&) { + return 0; + } + + return 0; +} + +void TCPClient::reconnect() +{ + int flags = fcntl(m_sock.m_sock, F_GETFL); + if (fcntl(m_sock.m_sock, F_SETFL, flags | O_NONBLOCK) == -1) { + std::string errstr(strerror(errno)); + throw std::runtime_error("TCP: Could not set O_NONBLOCK: " + errstr); + } + + m_sock.connect(m_hostname, m_port); +} + +TCPConnection::TCPConnection(TCPSocket&& sock) : + queue(), + m_running(true), + m_sender_thread(), + m_sock(move(sock)) +{ +#if MISSING_OWN_ADDR + auto own_addr = m_sock.getOwnAddress(); + auto addr = m_sock.getRemoteAddress(); + etiLog.level(debug) << "New TCP Connection on port " << + own_addr.getPort() << " from " << + addr.getHostAddress() << ":" << addr.getPort(); +#endif + m_sender_thread = std::thread(&TCPConnection::process, this); +} + +TCPConnection::~TCPConnection() +{ + m_running = false; + vector termination_marker; + queue.push(termination_marker); + m_sender_thread.join(); +} + +void TCPConnection::process() +{ + while (m_running) { + vector data; + queue.wait_and_pop(data); + + if (data.empty()) { + // empty vector is the termination marker + m_running = false; + break; + } + + try { + ssize_t remaining = data.size(); + const uint8_t *buf = reinterpret_cast(data.data()); + const int timeout_ms = 10; // Less than one ETI frame + + while (m_running and remaining > 0) { + const ssize_t sent = m_sock.send(buf, remaining, timeout_ms); + if (sent < 0 or sent > remaining) { + throw std::logic_error("Invalid TCPSocket::send() return value"); + } + remaining -= sent; + buf += sent; + } + } + catch (const std::runtime_error& e) { + m_running = false; + } + } + +#if MISSING_OWN_ADDR + auto own_addr = m_sock.getOwnAddress(); + auto addr = m_sock.getRemoteAddress(); + etiLog.level(debug) << "Dropping TCP Connection on port " << + own_addr.getPort() << " from " << + addr.getHostAddress() << ":" << addr.getPort(); +#endif +} + + +TCPDataDispatcher::TCPDataDispatcher(size_t max_queue_size) : + m_max_queue_size(max_queue_size) +{ +} + +TCPDataDispatcher::~TCPDataDispatcher() +{ + m_running = false; + m_connections.clear(); + m_listener_socket.close(); + if (m_listener_thread.joinable()) { + m_listener_thread.join(); + } +} + +void TCPDataDispatcher::start(int port, const string& address) +{ + m_listener_socket.listen(port, address); + + m_running = true; + m_listener_thread = std::thread(&TCPDataDispatcher::process, this); +} + +void TCPDataDispatcher::write(const vector& data) +{ + if (not m_running) { + throw runtime_error(m_exception_data); + } + + for (auto& connection : m_connections) { + connection.queue.push(data); + } + + m_connections.remove_if( + [&](const TCPConnection& conn){ return conn.queue.size() > m_max_queue_size; }); +} + +void TCPDataDispatcher::process() +{ + try { + const int timeout_ms = 1000; + + while (m_running) { + // Add a new TCPConnection to the list, constructing it from the client socket + auto sock = m_listener_socket.accept(timeout_ms); + if (sock.valid()) { + m_connections.emplace(m_connections.begin(), move(sock)); + } + } + } + catch (const std::runtime_error& e) { + m_exception_data = string("TCPDataDispatcher error: ") + e.what(); + m_running = false; + } +} + +TCPReceiveServer::TCPReceiveServer(size_t blocksize) : + m_blocksize(blocksize) +{ +} + +void TCPReceiveServer::start(int listen_port, const std::string& address) +{ + m_listener_socket.listen(listen_port, address); + + m_running = true; + m_listener_thread = std::thread(&TCPReceiveServer::process, this); +} + +TCPReceiveServer::~TCPReceiveServer() +{ + m_running = false; + if (m_listener_thread.joinable()) { + m_listener_thread.join(); + } +} + +vector TCPReceiveServer::receive() +{ + vector buffer; + m_queue.try_pop(buffer); + + // we can ignore try_pop()'s return value, because + // if it is unsuccessful the buffer is not touched. + return buffer; +} + +void TCPReceiveServer::process() +{ + constexpr int timeout_ms = 1000; + constexpr int disconnect_timeout_ms = 10000; + constexpr int max_num_timeouts = disconnect_timeout_ms / timeout_ms; + + while (m_running) { + auto sock = m_listener_socket.accept(timeout_ms); + + int num_timeouts = 0; + + while (m_running and sock.valid()) { + try { + vector buf(m_blocksize); + ssize_t r = sock.recv(buf.data(), buf.size(), 0, timeout_ms); + if (r < 0) { + throw logic_error("Invalid recv return value"); + } + else { + buf.resize(r); + m_queue.push(move(buf)); + } + } + catch (const TCPSocket::Interrupted&) { + break; + } + catch (const TCPSocket::Timeout&) { + num_timeouts++; + } + + if (num_timeouts > max_num_timeouts) { + sock.close(); + } + } + } +} + +} diff --git a/lib/Socket.h b/lib/Socket.h new file mode 100644 index 0000000..8bb7fe1 --- /dev/null +++ b/lib/Socket.h @@ -0,0 +1,294 @@ +/* + Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the + Queen in Right of Canada (Communications Research Center Canada) + + Copyright (C) 2019 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://www.opendigitalradio.org + */ +/* + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see . +*/ + +#pragma once + +#ifdef HAVE_CONFIG_H +# include "config.h" +#endif + +#include "ThreadsafeQueue.h" +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#define SOCKET int +#define INVALID_SOCKET -1 +#define SOCKET_ERROR -1 + + +namespace Socket { + +struct InetAddress { + struct sockaddr_storage addr; + + struct sockaddr *as_sockaddr() { return reinterpret_cast(&addr); }; + + void resolveUdpDestination(const std::string& destination, int port); +}; + +/** This class represents a UDP packet. + * + * A UDP packet contains a payload (sequence of bytes) and an address. For + * outgoing packets, the address is the destination address. For incoming + * packets, the address tells the user from what source the packet arrived from. + */ +class UDPPacket +{ + public: + UDPPacket(); + UDPPacket(size_t initSize); + + std::vector buffer; + InetAddress address; +}; + +/** + * This class represents a socket for sending and receiving UDP packets. + * + * A UDP socket is the sending or receiving point for a packet delivery service. + * Each packet sent or received on a datagram socket is individually + * addressed and routed. Multiple packets sent from one machine to another may + * be routed differently, and may arrive in any order. + */ +class UDPSocket +{ + public: + /** Create a new socket that will not be bound to any port. To be used + * for data output. + */ + UDPSocket(); + /** Create a new socket. + * @param port The port number on which the socket will be bound + */ + UDPSocket(int port); + /** Create a new socket. + * @param port The port number on which the socket will be bound + * @param name The IP address on which the socket will be bound. + * It is used to bind the socket on a specific interface if + * the computer have many NICs. + */ + UDPSocket(int port, const std::string& name); + ~UDPSocket(); + UDPSocket(const UDPSocket& other) = delete; + const UDPSocket& operator=(const UDPSocket& other) = delete; + + /** Close the already open socket, and create a new one. Throws a runtime_error on error. */ + void reinit(int port); + void reinit(int port, const std::string& name); + + void close(void); + void send(UDPPacket& packet); + void send(const std::vector& data, InetAddress destination); + UDPPacket receive(size_t max_size); + void joinGroup(const char* groupname, const char* if_addr = nullptr); + void setMulticastSource(const char* source_addr); + void setMulticastTTL(int ttl); + + /** Set blocking mode. By default, the socket is blocking. + * throws a runtime_error on error. + */ + void setBlocking(bool block); + + protected: + SOCKET m_sock; +}; + +/* Threaded UDP receiver */ +class UDPReceiver { + public: + UDPReceiver(); + ~UDPReceiver(); + UDPReceiver(const UDPReceiver&) = delete; + UDPReceiver operator=(const UDPReceiver&) = delete; + + // Start the receiver in a separate thread + void start(int port, const std::string& bindto, const std::string& mcastaddr, size_t max_packets_queued); + + // Get the data contained in a UDP packet, blocks if none available + // In case of error, throws a runtime_error + std::vector get_packet_buffer(void); + + private: + void m_run(void); + + int m_port = 0; + std::string m_bindto; + std::string m_mcastaddr; + size_t m_max_packets_queued = 1; + std::thread m_thread; + std::atomic m_stop = ATOMIC_VAR_INIT(false); + ThreadsafeQueue m_packets; + UDPSocket m_sock; +}; + +class TCPSocket { + public: + TCPSocket(); + ~TCPSocket(); + TCPSocket(const TCPSocket& other) = delete; + TCPSocket& operator=(const TCPSocket& other) = delete; + TCPSocket(TCPSocket&& other); + TCPSocket& operator=(TCPSocket&& other); + + bool valid(void) const; + void connect(const std::string& hostname, int port); + void listen(int port, const std::string& name); + void close(void); + + /* throws a runtime_error on failure, an invalid socket on timeout */ + TCPSocket accept(int timeout_ms); + + /* returns -1 on error, doesn't work on nonblocking sockets */ + ssize_t sendall(const void *buffer, size_t buflen); + + /** Send data over the TCP connection. + * @param data The buffer that will be sent. + * @param size Number of bytes to send. + * @param timeout_ms number of milliseconds before timeout, or 0 for infinite timeout + * return number of bytes sent, 0 on timeout, or throws runtime_error. + */ + ssize_t send(const void* data, size_t size, int timeout_ms=0); + + /* Returns number of bytes read, 0 on disconnect. Throws a + * runtime_error on error */ + ssize_t recv(void *buffer, size_t length, int flags); + + class Timeout {}; + class Interrupted {}; + /* Returns number of bytes read, 0 on disconnect or refused connection. + * Throws a Timeout on timeout, Interrupted on EINTR, a runtime_error + * on error + */ + ssize_t recv(void *buffer, size_t length, int flags, int timeout_ms); + + private: + explicit TCPSocket(int sockfd); + explicit TCPSocket(int sockfd, InetAddress remote_address); + SOCKET m_sock = -1; + + InetAddress m_remote_address; + + friend class TCPClient; +}; + +/* Implements a TCP receiver that auto-reconnects on errors */ +class TCPClient { + public: + void connect(const std::string& hostname, int port); + + /* Returns numer of bytes read, 0 on auto-reconnect, -1 + * on interruption. + * Throws a runtime_error on error */ + ssize_t recv(void *buffer, size_t length, int flags, int timeout_ms); + + private: + void reconnect(void); + TCPSocket m_sock; + std::string m_hostname; + int m_port; +}; + +/* Helper class for TCPDataDispatcher, contains a queue of pending data and + * a sender thread. */ +class TCPConnection +{ + public: + TCPConnection(TCPSocket&& sock); + TCPConnection(const TCPConnection&) = delete; + TCPConnection& operator=(const TCPConnection&) = delete; + ~TCPConnection(); + + ThreadsafeQueue > queue; + + private: + std::atomic m_running; + std::thread m_sender_thread; + TCPSocket m_sock; + + void process(void); +}; + +/* Send a TCP stream to several destinations, and automatically disconnect destinations + * whose buffer overflows. + */ +class TCPDataDispatcher +{ + public: + TCPDataDispatcher(size_t max_queue_size); + ~TCPDataDispatcher(); + TCPDataDispatcher(const TCPDataDispatcher&) = delete; + TCPDataDispatcher& operator=(const TCPDataDispatcher&) = delete; + + void start(int port, const std::string& address); + void write(const std::vector& data); + + private: + void process(); + + size_t m_max_queue_size; + + std::atomic m_running; + std::string m_exception_data; + std::thread m_listener_thread; + TCPSocket m_listener_socket; + std::list m_connections; +}; + +/* A TCP Server to receive data, which abstracts the handling of connects and disconnects. + */ +class TCPReceiveServer { + public: + TCPReceiveServer(size_t blocksize); + ~TCPReceiveServer(); + TCPReceiveServer(const TCPReceiveServer&) = delete; + TCPReceiveServer& operator=(const TCPReceiveServer&) = delete; + + void start(int listen_port, const std::string& address); + + // Return a vector that contains up to blocksize bytes of data, or + // and empty vector if no data is available. + std::vector receive(); + + private: + void process(); + + size_t m_blocksize = 0; + ThreadsafeQueue > m_queue; + std::atomic m_running; + std::string m_exception_data; + std::thread m_listener_thread; + TCPSocket m_listener_socket; +}; + +} diff --git a/lib/ThreadsafeQueue.h b/lib/ThreadsafeQueue.h new file mode 100644 index 0000000..62f4c96 --- /dev/null +++ b/lib/ThreadsafeQueue.h @@ -0,0 +1,176 @@ +/* + Copyright (C) 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in + Right of Canada (Communications Research Center Canada) + + Copyright (C) 2018 + Matthias P. Braendli, matthias.braendli@mpb.li + + An implementation for a threadsafe queue, depends on C++11 + + When creating a ThreadsafeQueue, one can specify the minimal number + of elements it must contain before it is possible to take one + element out. + */ +/* + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see . + */ + +#pragma once + +#include +#include +#include +#include + +/* This queue is meant to be used by two threads. One producer + * that pushes elements into the queue, and one consumer that + * retrieves the elements. + * + * The queue can make the consumer block until an element + * is available, or a wakeup requested. + */ + +/* Class thrown by blocking pop to tell the consumer + * that there's a wakeup requested. */ +class ThreadsafeQueueWakeup {}; + +template +class ThreadsafeQueue +{ +public: + /* Push one element into the queue, and notify another thread that + * might be waiting. + * + * returns the new queue size. + */ + size_t push(T const& val) + { + std::unique_lock lock(the_mutex); + the_queue.push(val); + size_t queue_size = the_queue.size(); + lock.unlock(); + + the_rx_notification.notify_one(); + + return queue_size; + } + + size_t push(T&& val) + { + std::unique_lock lock(the_mutex); + the_queue.emplace(std::move(val)); + size_t queue_size = the_queue.size(); + lock.unlock(); + + the_rx_notification.notify_one(); + + return queue_size; + } + + /* Push one element into the queue, but wait until the + * queue size goes below the threshold. + * + * Notify waiting thread. + * + * returns the new queue size. + */ + size_t push_wait_if_full(T const& val, size_t threshold) + { + std::unique_lock lock(the_mutex); + while (the_queue.size() >= threshold) { + the_tx_notification.wait(lock); + } + the_queue.push(val); + size_t queue_size = the_queue.size(); + lock.unlock(); + + the_rx_notification.notify_one(); + + return queue_size; + } + + /* Trigger a wakeup event on a blocking consumer, which + * will receive a ThreadsafeQueueWakeup exception. + */ + void trigger_wakeup(void) + { + std::unique_lock lock(the_mutex); + wakeup_requested = true; + lock.unlock(); + the_rx_notification.notify_one(); + } + + /* Send a notification for the receiver thread */ + void notify(void) + { + the_rx_notification.notify_one(); + } + + bool empty() const + { + std::unique_lock lock(the_mutex); + return the_queue.empty(); + } + + size_t size() const + { + std::unique_lock lock(the_mutex); + return the_queue.size(); + } + + bool try_pop(T& popped_value) + { + std::unique_lock lock(the_mutex); + if (the_queue.empty()) { + return false; + } + + popped_value = the_queue.front(); + the_queue.pop(); + + lock.unlock(); + the_tx_notification.notify_one(); + + return true; + } + + void wait_and_pop(T& popped_value, size_t prebuffering = 1) + { + std::unique_lock lock(the_mutex); + while (the_queue.size() < prebuffering and + not wakeup_requested) { + the_rx_notification.wait(lock); + } + + if (wakeup_requested) { + wakeup_requested = false; + throw ThreadsafeQueueWakeup(); + } + else { + std::swap(popped_value, the_queue.front()); + the_queue.pop(); + + lock.unlock(); + the_tx_notification.notify_one(); + } + } + +private: + std::queue the_queue; + mutable std::mutex the_mutex; + std::condition_variable the_rx_notification; + std::condition_variable the_tx_notification; + bool wakeup_requested = false; +}; + diff --git a/src/AVTEDIInput.cpp b/src/AVTEDIInput.cpp deleted file mode 100644 index f8a9e60..0000000 --- a/src/AVTEDIInput.cpp +++ /dev/null @@ -1,738 +0,0 @@ -/* ------------------------------------------------------------------ - * Copyright (C) 2017 AVT GmbH - Fabien Vercasson - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. - * See the License for the specific language governing permissions - * and limitations under the License. - * ------------------------------------------------------------------- - */ - -#include "AVTEDIInput.h" -#include -#include -#include -#include - -#include "crc.h" -#include "OrderedQueue.h" - -extern "C" { -#include -} - -#define SUBCH_QUEUE_SIZE (50) /* In 24ms frames. Intermediate buffer */ - -#define RS_DECODE 1 /* Set to 0 to disable rs decoding */ -#define RS_TEST1 0 /* Remove one fragment on each PFT */ -#define RS_TEST2 0 /* Remove regularily fragments */ -#define RS_TEST2_NBDROP 3 /* For RS_TEST2, nb packet remove on each time */ - -#define PRINTF(fmt, A...) fprintf(stderr, fmt, ##A) -//#define PRINTF(x ...) -#define INFO(fmt, A...) fprintf(stderr, "AVT EDI: " fmt, ##A) -//#define DEBUG(fmt, A...) fprintf(stderr, "AVT EDI: " fmt, ##A) -#define DEBUG(X...) -#define ERROR(fmt, A...) fprintf(stderr, "AVT EDI: ERROR " fmt, ##A) - -static int hideFirstPFTErrors = 30; /* Hide the errors that can occurs on */ - /* the first PFT, as they are likely incomplete */ - -#define TAG_NAME_DETI (('d'<<24)|('e'<<16)|('t'<<8)|('i')) -#define TAG_NAME_EST (('e'<<24)|('s'<<16)|('t'<<8)) - -/* ------------------------------------------------------------------ -static void _dump(const uint8_t* buf, int size) -{ - for( int i = 0 ; i < size ; i ++) - { - PRINTF("%02X ", buf[i]); - if( (i+1) % 16 == 0 ) PRINTF("\n"); - } - if( size % 16 != 0 ) PRINTF("\n"); -} -*/ - -/* ------------------------------------------------------------------ - * - */ -static uint32_t unpack2(const uint8_t* buf) -{ - return( buf[0] << 8 | - buf[1]); -} - -/* ------------------------------------------------------------------ - * - */ -static uint32_t unpack3(const uint8_t* buf) -{ - return( buf[0] << 16 | - buf[1] << 8 | - buf[2]); -} - -/* ------------------------------------------------------------------ - * - */ -static uint32_t unpack4(const uint8_t* buf) -{ - return( buf[0] << 24 | - buf[1] << 16 | - buf[2] << 8 | - buf[3]); -} - -/* ------------------------------------------------------------------ - * bitpos 0 : left most bit. - * - */ -static uint32_t unpack1bit(uint8_t byte, int bitpos) -{ - return (byte & 1 << (7-bitpos)) > (7-bitpos); -} - - -/* ------------------------------------------------------------------ - * - */ -static bool _checkCRC(uint8_t* buf, size_t length) -{ - if (length <= 2) return false; - - uint16_t CRC = unpack2(buf+length-2); - - uint16_t crc = 0xffff; - crc = crc16(crc, buf, length-2); - crc ^= 0xffff; - - return (CRC == crc); -} - -/* ------------------------------------------------------------------ - * - */ -AVTEDIInput::AVTEDIInput(uint32_t fragmentTimeoutMs) - : _fragmentTimeoutMs(fragmentTimeoutMs) -{ - _subChannelQueue = new OrderedQueue(5000, SUBCH_QUEUE_SIZE); -} - -/* ------------------------------------------------------------------ - * - */ -AVTEDIInput::~AVTEDIInput() -{ - PFTIterator it = _pft.begin(); - while (it != _pft.end()) { - delete it->second; - it++; - } - delete _subChannelQueue; -} - -/* ------------------------------------------------------------------ - * - */ -bool AVTEDIInput::pushData(uint8_t* buf, size_t length) -{ - bool identified = false; - - if (length >= 12 && buf[0] == 'P' && buf[1] == 'F') - { - -#if RS_TEST2 - static int count=0; - if (++count%1421= 10 && buf[0] == 'A' && buf[1] == 'F') - { - identified = _pushAF(buf, length, false); - } - return identified; -} - -/* ------------------------------------------------------------------ - * - */ -size_t AVTEDIInput::popFrame(std::vector& data, int32_t& frameNumber) -{ - return _subChannelQueue->pop(data, &frameNumber); -} - -/* ------------------------------------------------------------------ - * - */ -bool AVTEDIInput::_pushPFTFrag(uint8_t* buf, size_t length) -{ - PFTFrag* frag = new PFTFrag(buf, length); - bool isValid = frag->isValid(); - if (!isValid) { - delete frag; - } else { - // Find PFT - PFT* pft = NULL; - PFTIterator it = _pft.find(frag->Pseq()); - if (it != _pft.end()) { - pft = it->second; - } else { - // create PFT is new - pft = new PFT(frag->Pseq(), frag->Fcount()); - if (_pft.insert(std::make_pair(frag->Pseq(), pft)).second == false) - { - // Not inserted - delete pft; - pft = NULL; - } - it = _pft.find(frag->Pseq()); - } - - if (pft) { - // Add frag to PFT - pft->pushPFTFrag(frag); - - // If the PFT is complete, extract the AF - if (pft->complete()) { - std::vector af; - bool ok = pft->extractAF(af); - - if (ok) { - _pushAF(af.data(), af.size(), ok); - } else { - ERROR("AF Frame Corrupted, Size=%zu\n", af.size()); - //_dump(af.data(), 10); - } - - _pft.erase(it); - delete pft; - } - } - } - - // Check old incomplete PFT to either try to extract AF or discard it - // TODO - const auto now = std::chrono::steady_clock::now(); - const auto timeout_duration = std::chrono::milliseconds(_fragmentTimeoutMs); - - PFTIterator it = _pft.begin(); - while (it != _pft.end()) { - PFT* pft = it->second; - bool erased = false; - if (pft) { - const auto creation = pft->creation(); - const auto diff = now - creation; - if (diff > timeout_duration) { - //DEBUG("PFT timeout\n"); - std::vector af; - bool ok = pft->extractAF(af); - if (ok) { - _pushAF(af.data(), af.size(), ok); - } else { - //ERROR("AF Frame CorruptedSize=%zu\n", af.size()); - //_dump(af.data(), 10); - } - - it = _pft.erase(it); - delete pft; - erased = true; - } - } - if (!erased) ++it; - } - - return isValid; -} - -/* ------------------------------------------------------------------ - * - */ -bool AVTEDIInput::_pushAF(uint8_t* buf, size_t length, bool checked) -{ - bool ok = checked; - - // Check the AF integrity - if (!ok) { - // EDI specific, must have a CRC. - if (length >= 12) { - ok = (buf[0] == 'A' && buf[1] == 'F'); - ok &= _checkCRC(buf, length); - } - } - - int index = 0; - - index += 2; - uint32_t LEN = unpack4(buf+index); index += 4; - ok = (LEN == length-12); - //uint32_t SEQ = unpack2(buf+index); index += 2; - - if (ok) { - uint32_t CF = unpack1bit(buf[index], 0); - uint32_t MAJ = (buf[index]&0x70) >> 4; - uint32_t MIN = (buf[index]&0x0F); - index += 1; - uint32_t PT = buf[index]; index += 1; - - // EDI specific - ok = (CF == 1 && PT == 'T' && MAJ == 1 && MIN == 0); - -// DEBUG("AF Header: LEN=%u SEQ=%u CF=%u MAJ=%u MIN=%u PT=%c ok=%d\n", -// LEN, SEQ, CF, MAJ, MIN, PT, ok); - } - - if (ok) { - // Extract the first stream and FrameCount from AF - int tagIndex = index; - uint32_t frameCount = 0; - bool frameCountFound = false; - int est0Index = 0; - size_t est0Length = 0; - // Iterate through tags - while (tagIndex < (ssize_t)length - 2/*CRC*/ - 8/*Min tag length*/ && (!frameCountFound || est0Index==0) ) - { - uint32_t tagName = unpack4(buf+tagIndex); tagIndex += 4; - uint32_t tagLen = unpack4(buf+tagIndex); tagIndex += 4; - uint32_t tagLenByte = (tagLen+7)/8; -// DEBUG("TAG %c%c%c%c size %u bits %u bytes\n", -// tagName>>24&0xFF, tagName>>16&0xFF, tagName>>8&0xFF, tagName&0xFF, -// tagLen, tagLenByte); - - if (tagName == TAG_NAME_DETI) { - uint32_t FCTH = buf[tagIndex] & 0x1F; - uint32_t FCT = buf[tagIndex+1]; - frameCount = FCTH * 250 + FCT; - frameCountFound = true; -// DEBUG("frameCount=%u\n", frameCount); - } else if ((tagName & 0xFFFFFF00) == TAG_NAME_EST) { - est0Index = tagIndex+3 /*3 bytes SSTC*/; - est0Length = tagLenByte-3; -// DEBUG("Stream found at index %u, size=%zu\n", est0Index, est0Length); - } - - tagIndex += tagLenByte; - } - if (frameCountFound && est0Index !=0) { - _subChannelQueue->push(frameCount, buf+est0Index, est0Length); - } else { - ok = false; - } - } - - return ok; -} - -/* ------------------------------------------------------------------ - * ------------------------------------------------------------------ - * ------------------------------------------------------------------ - * ------------------------------------------------------------------ - */ - -/* ------------------------------------------------------------------ - * - */ -//static int nbPFTFrag = 0; -PFTFrag::PFTFrag(uint8_t* buf, size_t length) -{ - //DEBUG("+ PFTFrag %d\n", ++nbPFTFrag); - _valid = _parse(buf, length); -} - -/* ------------------------------------------------------------------ - * - */ -PFTFrag::~PFTFrag() -{ - //DEBUG("- PFTFrag %d\n", --nbPFTFrag); -} - -/* ------------------------------------------------------------------ - * - */ -bool PFTFrag::_parse(uint8_t* buf, size_t length) -{ - int index = 0; - - // Parse PFT Fragment Header (ETSI TS 102 821 V1.4.1 ch7.1) - index += 2; // Psync - - _Pseq = unpack2(buf+index); index += 2; - _Findex = unpack3(buf+index); index += 3; - _Fcount = unpack3(buf+index); index += 3; - _FEC = unpack1bit(buf[index], 0); - _Addr = unpack1bit(buf[index], 1); - _Plen = unpack2(buf+index) & 0x3FFF; index += 2; - - // Optional RS Header - _RSk = 0; - _RSz = 0; - if (_FEC) { - _RSk = buf[index]; index += 1; - _RSz = buf[index]; index += 1; - } - - // Optional transport header - _Source = 0; - _Dest = 0; - if (_Addr) { - _Source = unpack2(buf+index); index += 2; - _Dest = unpack2(buf+index); index += 2; - } - - index += 2; - bool isValid = (_FEC==0) || _checkCRC(buf, index); - isValid &= length == index + _Plen; - - if (!isValid) { -// DEBUG("PFT isValid=%d Pseq=%u Findex=%u Fcount=%u FEC=%u " -// "Addr=%u Plen=%u", -// isValid, _Pseq, _Findex, _Fcount, _FEC, -// _Addr, _Plen); - if (_FEC) PRINTF(" RSk=%u RSz=%u", _RSk, _RSz); - if (_Addr) PRINTF(" Source=%u Dest=%u", _Source, _Dest); - PRINTF("\n"); - } - - if (isValid) { - _payload.resize(_Plen); - memcpy(_payload.data(), buf+index, _Plen); - } - - return isValid; -} - -/* ------------------------------------------------------------------ - * ------------------------------------------------------------------ - * ------------------------------------------------------------------ - * ------------------------------------------------------------------ - */ -void* PFT::_rs_handler = NULL; - -/* ------------------------------------------------------------------ - * - */ -//static int nbPFT = 0; -PFT::PFT(uint32_t Pseq, uint32_t Fcount) - : _frags(NULL) - , _Pseq(Pseq) - , _Fcount(Fcount) - , _Plen(0) - , _nbFrag(0) - , _RSk(0) - , _RSz(0) - , _cmax(0) - , _rxmin(0) - , _creation(std::chrono::steady_clock::now()) -{ -// DEBUG("+ PFT %d\n", ++nbPFT); - if (Fcount > 0) { - _frags = new PFTFrag* [Fcount]; - memset(_frags, 0, Fcount*sizeof(PFTFrag*)); - } -} - -/* ------------------------------------------------------------------ - * - */ -PFT::~PFT() -{ -// DEBUG("- PFT %d\n", --nbPFT); - if (_frags) { - for (size_t i = 0 ; i < _Fcount ; i++) { - delete _frags[i]; - } - delete [] _frags; - } -} - -/* ------------------------------------------------------------------ - * static - */ -void PFT::_initRSDecoder() -{ -#if RS_DECODE - if (!_rs_handler) { - // From ODR-DabMux: PFT.h/cpp and ReedSolomon.h/cpp - - // Create the RS(k+p,k) encoder - const int firstRoot = 1; // Discovered by analysing EDI dump - const int gfPoly = 0x11d; - - // The encoding has to be 255, 207 always, because the chunk has to - // be padded at the end, and not at the beginning as libfec would - // do - const int N = 255; - const int K = 207; - const int primElem = 1; - const int symsize = 8; - const int nroots = N - K; // For EDI PFT, this must be 48 - const int pad = ((1 << symsize) - 1) - N; // is 255-N - - _rs_handler = init_rs_char(symsize, gfPoly, firstRoot, primElem, nroots, pad); - - -/* TEST RS CODE */ -#if 0 - - // Populate data - uint8_t data[255]; - memset(data, 0x00, 255); - for (int i=0;i<207;i++) data[i] = i%10; - - // Add RS Code - encode_rs_char(_rs_handler, data, data+207); - _dump(data, 255); - - // Disturb data - for (int i=50; i<50+24; i++) data[i]+=0x50; - - // Correct data - int nbErr = decode_rs_char(_rs_handler, data, NULL, 0); - printf("nbErr=%d\n", nbErr); - _dump(data, 255); - - // Check data - for (int i=0;i<207;i++) { - if (data[i] != i%10) { - printf("Error position %d %hhu != %d\n", i, data[i], i%10); - } - } - - // STOP (sorry :-| ) - int* i=0; - *i = 9; -#endif // 0 - } -#endif -} - -/* ------------------------------------------------------------------ - * - */ -void PFT::pushPFTFrag(PFTFrag* frag) -{ - uint32_t Findex = frag->Findex(); -#if RS_TEST1 - if (Findex != 0 && _frags[Findex] == NULL) /* TEST */ -#else - if (_frags[Findex] == NULL) -#endif - { - _frags[Findex] = frag; - _nbFrag++; - - // Calculate the minimum number of fragment necessary to apply FEC - // This can't be done with the last fragment that does may have a smaller size - // ETSI TS 102 821 V1.4.1 ch 7.4.4 - if (_Plen == 0 && (Findex == 0 || Findex < (_Fcount-1))) - { - _Plen = frag->Plen(); - } - - if (_cmax == 0 && frag->FEC() && (Findex == 0 || Findex < (_Fcount-1)) && _Plen>0) - { - _RSk = frag->RSk(); - _RSz = frag->RSz(); - _cmax = (_Fcount*_Plen) / (_RSk+48); - _rxmin = _Fcount - (_cmax*48)/_Plen; - } - } else { - // Already received, delete the fragment - delete frag; - } -} - -/* ------------------------------------------------------------------ - * - */ -bool PFT::complete() -{ -#if RS_TEST1 - return _nbFrag == _Fcount-1; -#else - return _nbFrag == _Fcount; -#endif -} - -/* ------------------------------------------------------------------ - * - */ -bool PFT::_canAttemptToDecode() -{ - if (complete()) return true; - - if (_cmax>0 && _nbFrag >= _rxmin) return true; - - return false; -} - -/* ------------------------------------------------------------------ - * - */ -bool PFT::extractAF(std::vector& afdata) -{ - bool ok = false; -// DEBUG("extractAF from PFT %u. Fcount=%u nbFrag=%u Plen=%u cmax=%u rxmin=%u RSk=%u RSz=%u\n", -// _Pseq, _Fcount, _nbFrag, _Plen, _cmax, _rxmin, _RSk, _RSz); - - if (_canAttemptToDecode()) { - int totCorrectedErr = 0; - - if (_cmax > 0) // FEC present. - { - uint8_t* p_data_w; - uint8_t* p_data_r; - size_t data_len = 0; - - // Re-assemble RS block - uint8_t rs_block[_Plen*_Fcount]; - int eras_pos[_cmax][/*48*/255]; /* 48 theoritically but ... */ - int no_eras[_cmax]; - memset(no_eras, 0, sizeof(no_eras)); - - p_data_w = rs_block; - for (size_t j = 0; j < _Fcount; ++j) { - if (!_frags[j]) // fill with zeros if fragment is missing - { - for (size_t k = 0; k < _Plen; k++) { - size_t pos = k * _Fcount; - p_data_w[pos] = 0x00; - size_t chunk = pos / (_RSk+48); - size_t chunkpos = (pos) % (_RSk+48); - if (chunkpos > _RSk) { - chunkpos += (207-_RSk); - } - eras_pos[chunk][no_eras[chunk]] = chunkpos; - no_eras[chunk]++; - } - } else { - uint8_t* p_data_r = _frags[j]->payload(); - for (size_t k = 0; k < _frags[j]->Plen(); k++) - p_data_w[k * _Fcount] = *p_data_r++; - for (size_t k = _frags[j]->Plen(); k < _Plen; k++) - p_data_w[k * _Fcount] = 0x00; - } - p_data_w++; - } - - // Apply RS Code -#if RS_DECODE - uint8_t rs_chunks[255 * _cmax]; - _initRSDecoder(); - if (_rs_handler) { - size_t k = _RSk; - memset(rs_chunks, 0, sizeof(rs_chunks)); - p_data_w = rs_chunks; - p_data_r = rs_block; - for (size_t j = 0; j < _cmax; j++) { - memcpy(p_data_w, p_data_r, k); - p_data_w += k; - p_data_r += k; - if (k < 207) - memset(p_data_w, 0, 207 - k); - p_data_w += 207 - k; - memcpy(p_data_w, p_data_r, 48); - p_data_w += 48; - p_data_r += 48; - } - - p_data_r = rs_chunks; - for (size_t j = 0 ; j < _cmax && totCorrectedErr != -1 ; j++) { -#if RS_TEST1 || RS_TEST2 - if (no_eras[j]>0) { - DEBUG("RS Chuck %d: %d errors\n", j, no_eras[j]); - } -#endif - int nbErr = decode_rs_char(_rs_handler, p_data_r, eras_pos[j], no_eras[j]); -// int nbErr = decode_rs_char(_rs_handler, p_data_r, NULL, 0); - if (nbErr >= 0) { -#if RS_TEST1 || RS_TEST2 - if (nbErr > 0) DEBUG("RS Chuck %d: %d corrections\n", j, nbErr); -#endif - totCorrectedErr += nbErr; - } else { -#if RS_TEST1 || RS_TEST2 - DEBUG("RS Chuck %d: too many errors\n", j); -#endif - totCorrectedErr = -1; - } - p_data_r += 255; - } -#if RS_TEST1 || RS_TEST2 - if (totCorrectedErr>0) { - DEBUG("RS corrected %d errors in %d chunks\n", totCorrectedErr, _cmax); - } -#endif - } -#endif // RS_DECODE - // Assemble AF frame from rs code - /* --- re-assemble packet from Reed-Solomon block ----------- */ - afdata.resize(_Plen*_Fcount); - p_data_w = afdata.data(); -#if RS_DECODE - p_data_r = rs_chunks; - for (size_t j = 0; j < _cmax; j++) { - memcpy(p_data_w, p_data_r, _RSk); - p_data_w += _RSk; - p_data_r += 255; - data_len += _RSk; - } -#else - p_data_r = rs_block; - for (size_t j = 0; j < _cmax; j++) { - memcpy(p_data_w, p_data_r, _RSk); - p_data_w += _RSk; - p_data_r += _RSk + 48; - data_len += _RSk; - } -#endif // RS_DECODE - data_len -= _RSz; - afdata.resize(data_len); - } else { // No Fec Just assemble packets - afdata.resize(0); - for (size_t j = 0; j < _Fcount; ++j) { - if (_frags[j]) - { - afdata.insert(afdata.end(), - _frags[j]->payloadVector().begin(), _frags[j]->payloadVector().end()); - } - } - } - - // EDI specific, must have a CRC. - if( afdata.size()>=12 ) { - ok = _checkCRC(afdata.data(), afdata.size()); - if (ok && totCorrectedErr > 0) { - if (hideFirstPFTErrors==0) { - INFO("AF reconstructed from %u/%u PFT fragments\n", _nbFrag, _Fcount); - } - } - if (!ok && totCorrectedErr == -1) { - if (hideFirstPFTErrors==0) { - ERROR("Too many errors to reconstruct AF from %u/%u PFT fragments\n", _nbFrag, _Fcount); - } - } - } - } - else { - if (hideFirstPFTErrors==0) { - ERROR("Not enough fragments to reconstruct AF from %u/%u PFT fragments (min=%u)\n", _nbFrag, _Fcount, _rxmin); - } - } - - if( hideFirstPFTErrors > 0 ) hideFirstPFTErrors--; - - return ok; -} diff --git a/src/AVTEDIInput.h b/src/AVTEDIInput.h deleted file mode 100644 index a882278..0000000 --- a/src/AVTEDIInput.h +++ /dev/null @@ -1,188 +0,0 @@ -/* ------------------------------------------------------------------ - * Copyright (C) 2017 AVT GmbH - Fabien Vercasson - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. - * See the License for the specific language governing permissions - * and limitations under the License. - * ------------------------------------------------------------------- - */ - - -/*! \section AVT Input - * - * Extract audio frame from EDI frames produced by AVT encoder. - * - * The EDI frames are not special, it is just assumed that the audio is transported - * into the first stream. - * - * PFT with spreaded packets is supported. TODO - * Error correction is applied TODO - * AF without PFT supported TODO - * Resend not supported - * - * ref: ETSI TS 102 821 V1.4.1 - * ETSI TS 102 693 V1.1.2 - */ - -#ifndef _AVT_EDI_INPUT_ -#define _AVT_EDI_INPUT_ - -#include -#include -#include -#include -#include -#include - -class OrderedQueue; -class PFTFrag; -class PFT; -class EDISubCh; - -/* ------------------------------------------------------------------ - * - */ -class AVTEDIInput -{ - public: - /*\param fragmentTimeoutMs How long to wait for all fragment before applying FEC or dropping old frames*/ - AVTEDIInput(uint32_t fragmentTimeoutMs = 120); - AVTEDIInput(const AVTEDIInput&) = delete; - AVTEDIInput& operator=(const AVTEDIInput&) = delete; - ~AVTEDIInput(); - - /*! Push new data to edi decoder - * \return false is data is not EDI - */ - bool pushData(uint8_t* buf, size_t length); - - /*! Give next available audio frame from EDI - * \return The size of the buffer. 0 if not data available - */ - size_t popFrame(std::vector& data, int32_t& frameNumber); - - private: - uint32_t _fragmentTimeoutMs; - std::map _pft; - typedef std::map::iterator PFTIterator; - - OrderedQueue *_subChannelQueue; - - bool _pushPFTFrag(uint8_t* buf, size_t length); - bool _pushAF(uint8_t* buf, size_t length, bool checked); -}; - -/* ------------------------------------------------------------------ - * - */ -class PFTFrag -{ - public: - PFTFrag(uint8_t* buf, size_t length); - ~PFTFrag(); - PFTFrag(const PFTFrag&) = delete; - PFTFrag& operator=(const PFTFrag&) = delete; - - inline bool isValid() { return _valid; } - inline uint32_t Pseq() { return _Pseq; } - inline uint32_t Findex() { return _Findex; } - inline uint32_t Fcount() { return _Fcount; } - inline uint32_t FEC() { return _FEC; } - inline uint32_t Plen() { return _Plen; } - inline uint32_t RSk() { return _RSk; } - inline uint32_t RSz() { return _RSz; } - inline uint8_t* payload() { return _payload.data(); } - inline const std::vector& payloadVector() - { return _payload; } - - private: - std::vector _payload; - - uint32_t _Pseq; - uint32_t _Findex; - uint32_t _Fcount; - uint32_t _FEC; - uint32_t _Addr; - uint32_t _Plen; - uint32_t _RSk; - uint32_t _RSz; - uint32_t _Source; - uint32_t _Dest; - bool _valid; - - bool _parse(uint8_t* buf, size_t length); -}; - -/* ------------------------------------------------------------------ - * - */ -class PFT -{ - public: - PFT(uint32_t Pseq, uint32_t Fcount); - ~PFT(); - PFT(const PFT&) = delete; - PFT& operator=(const PFT&) = delete; - - /*! the given frag belongs to the PFT class, - *! it will be deleted by the class */ - void pushPFTFrag(PFTFrag* frag); - - /* \return true if all framgnements are received*/ - bool complete(); - - /*! try to build the AF with received fragments. - *! Apply error correction if necessary (missing packets/CRC errors) - * \return true if the AF is completed - */ - bool extractAF(std::vector& afdata); - - inline std::chrono::steady_clock::time_point creation() - { return _creation; } - - private: - PFTFrag** _frags; - uint32_t _Pseq; - uint32_t _Fcount; - uint32_t _Plen; - uint32_t _nbFrag; - uint32_t _RSk; - uint32_t _RSz; - uint32_t _cmax; - uint32_t _rxmin; - - std::chrono::steady_clock::time_point _creation; - - bool _canAttemptToDecode(); - - static void* _rs_handler; - static void _initRSDecoder(); -}; - -/* ------------------------------------------------------------------ - * - */ -class EDISubCh { - public: - EDISubCh(uint8_t* buf, size_t length); - - inline uint32_t frameCount() { return _frameCount; } - inline uint8_t* payload() { return _payload.data(); } - inline const std::vector& payloadVector() - { return _payload; } - - private: - uint32_t _frameCount; - std::vector _payload; -}; - -#endif // _AVT_EDI_INPUT_ diff --git a/src/AVTInput.cpp b/src/AVTInput.cpp index 48b2de1..973ed7b 100644 --- a/src/AVTInput.cpp +++ b/src/AVTInput.cpp @@ -56,24 +56,25 @@ static uint32_t unpack2(const uint8_t* buf) return (buf[0] << 8) | buf[1]; } -AVTInput::AVTInput(const std::string& input_uri, const std::string& output_uri, uint32_t pad_port, size_t jitterBufferSize) - : _input_uri(input_uri), - _output_uri(output_uri), - _pad_port(pad_port), - _jitterBufferSize(jitterBufferSize), - - _output_packet(2048), - _input_pad_packet(2048), - _ordered(5000, _jitterBufferSize), - _lastInfoFrameType(_typeCantExtract) +AVTInput::AVTInput(const std::string& input_uri, + const std::string& output_uri, + uint32_t pad_port, + size_t jitterBufferSize) : + _input_uri(input_uri), + _output_uri(output_uri), + _pad_port(pad_port), + _jitterBufferSize(jitterBufferSize), + + _output_packet(2048), + _input_pad_packet(2048), + _ordered(5000, _jitterBufferSize), + _lastInfoFrameType(_typeCantExtract) { } int AVTInput::prepare(void) { - UdpSocket::init(); - INFO("Open input socket\n"); int ret = _openSocketSrv(&_input_socket, _input_uri.c_str()); @@ -82,7 +83,7 @@ int AVTInput::prepare(void) ret = _openSocketCli(); } - if ( ret == 0 && _pad_port > 0) { + if (ret == 0 && _pad_port > 0) { INFO("Open PAD Port %d\n", _pad_port); char uri[50]; sprintf(uri, "udp://:%d", _pad_port); @@ -105,13 +106,13 @@ int AVTInput::setDabPlusParameters(int bitrate, int channels, int sample_rate, b return 1; } - if ( sample_rate != 48000 && sample_rate != 32000 ) { + if (sample_rate != 48000 && sample_rate != 32000) { ERROR("Bad sample rate for DAB+ (32000,48000)"); return 1; } _dac = sample_rate == 48000 ? AVT_DAC_48 : AVT_DAC_32; - if ( channels != 1 && channels != 2 ) { + if (channels != 1 && channels != 2) { ERROR("Bad channel number for DAB+ (1,2)"); return 1; } @@ -169,7 +170,7 @@ bool AVTInput::_parseURI(const char* uri, std::string& address, long& port) return true; } -int AVTInput::_openSocketSrv(UdpSocket* socket, const char* uri) +int AVTInput::_openSocketSrv(Socket::UDPSocket* socket, const char* uri) { int returnCode = -1; @@ -178,27 +179,13 @@ int AVTInput::_openSocketSrv(UdpSocket* socket, const char* uri) if (_parseURI(uri, address, port)) { returnCode = 0; - if (socket->create(port) == -1) { - fprintf(stderr, "can't set port %li on Udp input (%s: %s)\n", - port, inetErrDesc, inetErrMsg); - returnCode = -1; - } + socket->reinit(port); if (!address.empty()) { - // joinGroup should accept const char* - if (socket->joinGroup((char*)address.c_str()) == -1) { - fprintf(stderr, - "can't join multicast group %s (%s: %s)\n", - address.c_str(), inetErrDesc, inetErrMsg); - returnCode = -1; - } + socket->joinGroup(address.c_str()); } - if (socket->setBlocking(false) == -1) { - fprintf(stderr, "can't set Udp input socket in non-blocking mode " - "(%s: %s)\n", inetErrDesc, inetErrMsg); - returnCode = -1; - } + socket->setBlocking(false); } return returnCode; @@ -216,74 +203,16 @@ int AVTInput::_openSocketCli() return -1; } - if (_output_packet.getAddress().setAddress(address.c_str()) == -1) { - fprintf(stderr, "Can't set address %s (%s: %s)\n", address.c_str(), - inetErrDesc, inetErrMsg); - return -1; - } - - _output_packet.getAddress().setPort(port); - - if (_output_socket.create() == -1) { - fprintf(stderr, "Can't create UDP socket (%s: %s)\n", - inetErrDesc, inetErrMsg); - return -1; - } - + _output_packet.address.resolveUdpDestination(address.c_str(), port); return 0; } -/* ------------------------------------------------------------------ - * From ODR-Dabmux dabInputUdp::dabInputUdpRead - */ -ssize_t AVTInput::_read(uint8_t* buf, size_t size, bool onlyOnePacket) -{ - size_t nbBytes = 0; - - uint8_t* data = buf; - UdpPacket _input_packet(2048); - - if (_input_packet.getLength() == 0) { - _input_socket.receive(_input_packet); - } - - while (nbBytes < size) { - size_t freeSize = size - nbBytes; - if (_input_packet.getLength() > freeSize) { - // Not enought place in output - memcpy(&data[nbBytes], _input_packet.getData(), freeSize); - nbBytes = size; - _input_packet.setOffset(_input_packet.getOffset() + freeSize); - } - else { - size_t length = _input_packet.getLength(); - memcpy(&data[nbBytes], _input_packet.getData(), length); - nbBytes += length; - _input_packet.setOffset(0); - - _input_socket.receive(_input_packet); - if (_input_packet.getLength() == 0 || onlyOnePacket) { - break; - } - } - } - bzero(&data[nbBytes], size - nbBytes); - - return nbBytes; -} - -/* ------------------------------------------------------------------ - * - */ bool AVTInput::_isSTI(const uint8_t* buf) { return (memcmp(buf+1, STI_FSync0, sizeof(STI_FSync0)) == 0) || (memcmp(buf+1, STI_FSync1, sizeof(STI_FSync1)) == 0); } -/* ------------------------------------------------------------------ - * - */ const uint8_t* AVTInput::_findDABFrameFromUDP(const uint8_t* buf, size_t size, int32_t& frameNumber, size_t& dataSize) { @@ -362,19 +291,13 @@ const uint8_t* AVTInput::_findDABFrameFromUDP(const uint8_t* buf, size_t size, void AVTInput::_sendCtrlMessage() { if (!_output_uri.empty()) { - uint8_t data[50]; - uint32_t index = 0; - - data[index++] = 0xFD; - data[index++] = 0x07; - data[index++] = _subChannelIndex; - data[index++] = _audioMode; - data[index++] = _dac; - data[index++] = _monoMode; - - _output_packet.setOffset(0); - _output_packet.setLength(0); - _output_packet.addData(data, index); + std::vector buf({ 0xFD, 0x07, + static_cast(_subChannelIndex), + static_cast(_audioMode), + static_cast(_dac), + static_cast(_monoMode)}); + + _output_packet.buffer = buf; _output_socket.send(_output_packet); INFO("Send control packet to encoder\n"); @@ -390,28 +313,21 @@ void AVTInput::_sendCtrlMessage() * : 1 Byte : Size of pad data * Pad datas : X Bytes : In natural order, strating with FPAD bytes */ -void AVTInput::_sendPADFrame(UdpPacket* packet) +void AVTInput::_sendPADFrame() { - if (packet && _padFrameQueue.size() > 0) { + if (_padFrameQueue.size() > 0) { std::vector frame(move(_padFrameQueue.front())); _padFrameQueue.pop(); - uint8_t data[500]; - uint32_t index = 0; - - data[index++] = 0xFD; - data[index++] = 0x18; - data[index++] = frame.size()+2; - data[index++] = 0xAD; - data[index++] = frame.size(); - memcpy( data+index, frame.data(), frame.size()); - index += frame.size(); + std::vector buf({ 0xFD, 0x18, + static_cast(frame.size()+2), + 0xAD, + static_cast(frame.size())}); - packet->setOffset(0); - packet->setLength(0); - packet->addData(data, index); - - _input_pad_socket.send(*packet); + Socket::UDPPacket packet; + packet.buffer = buf; + copy(frame.begin(), frame.end(), back_inserter(packet.buffer)); + _input_pad_socket.send(packet); } } @@ -421,111 +337,71 @@ void AVTInput::_sendPADFrame(UdpPacket* packet) * Command code : 1 Byte * * 0x17 = Request for 1 PAD Frame */ -void AVTInput::_interpretMessage(const uint8_t* data, size_t size, UdpPacket* packet) +void AVTInput::_interpretMessage(const uint8_t* data, size_t size) { if (size >= 2) { if (data[0] == 0xFD) { switch (data[1]) { case 0x17: - _sendPADFrame(packet); + _sendPADFrame(); break; } } } } -/* ------------------------------------------------------------------ - * - */ bool AVTInput::_checkMessage() { - bool dataRecevied = false; - - if (_input_pad_packet.getLength() == 0) { - _input_pad_socket.receive(_input_pad_packet); + const auto packet = _input_pad_socket.receive(2048); + if (packet.buffer.empty()) { + return false; } - if (_input_pad_packet.getLength() > 0) { - _interpretMessage((uint8_t*)_input_pad_packet.getData(), _input_pad_packet.getLength(), - &_input_pad_packet); - _input_pad_packet.setOffset(0); - _input_pad_socket.receive(_input_pad_packet); - - dataRecevied = true; - } + _interpretMessage(packet.buffer.data(), packet.buffer.size()); - return dataRecevied; + return true; } -/* ------------------------------------------------------------------ - * - */ void AVTInput::_purgeMessages() { - bool dataRecevied; int nb = 0; - do { - dataRecevied = false; - if (_input_pad_packet.getLength() == 0) { - _input_pad_socket.receive(_input_pad_packet); - } - - if (_input_pad_packet.getLength() > 0) { - nb++; - _input_pad_packet.setOffset(0); - _input_pad_socket.receive(_input_pad_packet); - - dataRecevied = true; - } - } while (dataRecevied); + while (not _input_pad_socket.receive(2048).buffer.empty()) { + nb++; + } if (nb>0) DEBUG("%d messages purged\n", nb); } -/* ------------------------------------------------------------------ - * - */ bool AVTInput::_readFrame() { - bool dataRecevied = false; - - uint8_t readBuf[MAX_AVT_FRAME_SIZE]; int32_t frameNumber; const uint8_t* dataPtr = NULL; size_t dataSize = 0; - std::vector data; - size_t readBytes = _read(readBuf, sizeof(readBuf), true/*onlyOnePacket*/); - if (readBytes > 0) - { - dataRecevied = true; + auto packet = _input_socket.receive(MAX_AVT_FRAME_SIZE); + const size_t readBytes = packet.buffer.size(); + + if (readBytes > 0) { + const uint8_t *readBuf = packet.buffer.data(); if (readBytes > _dab24msFrameSize) { // Extract frame data and frame number from buf dataPtr = _findDABFrameFromUDP(readBuf, readBytes, frameNumber, dataSize); } -// if (!data) { -// // Assuming pure RAW data -// data = buf; -// dataSize = _dab24msFrameSize; -// frameNumber = _dummyFrameNumber++; -// } - if (!dataPtr) { - _info(_typeCantExtract, 0); - } + if (dataPtr) { - if (dataSize == _dab24msFrameSize ) { - if( _frameAligned || frameNumber%5 == 0) { + if (dataSize == _dab24msFrameSize) { + if (_frameAligned or frameNumber%5 == 0) { #if defined(DISTURB_INPUT) // Duplicate a frame - if(frameNumber%250==0) _ordered.push(frameNumber, dataPtr, dataSize); + if (frameNumber % 250 == 0) _ordered.push(frameNumber, dataPtr, dataSize); // Invert 2 frames (content inverted, audio distrubed by this test)) - if( frameNumber % 200 == 0) frameNumber += 10; - else if( (frameNumber-10) % 200 == 0) frameNumber -= 10; + if (frameNumber % 200 == 0) frameNumber += 10; + else if ((frameNumber-10) % 200 == 0) frameNumber -= 10; // Remove a frame (audio distrubed, frame missing) - if(frameNumber%300 > 5) + if (frameNumber % 300 > 5) #endif _ordered.push(frameNumber, dataPtr, dataSize); _frameAligned = true; @@ -533,14 +409,14 @@ bool AVTInput::_readFrame() } else ERROR("Wrong frame size from encoder %zu != %zu\n", dataSize, _dab24msFrameSize); } + else { + _info(_typeCantExtract, 0); + } } - return dataRecevied; + return readBytes > 0; } -/* ------------------------------------------------------------------ - * - */ ssize_t AVTInput::getNextFrame(std::vector &buf) { ssize_t nbBytes = 0; @@ -577,9 +453,6 @@ ssize_t AVTInput::getNextFrame(std::vector &buf) return nbBytes; } -/* ------------------------------------------------------------------ - * - */ void AVTInput::pushPADFrame(const uint8_t* buf, size_t size) { if (_pad_port == 0) { @@ -593,17 +466,11 @@ void AVTInput::pushPADFrame(const uint8_t* buf, size_t size) } } -/* ------------------------------------------------------------------ - * - */ bool AVTInput::padQueueFull() { return _padFrameQueue.size() >= MAX_PAD_FRAME_QUEUE_SIZE; } -/* ------------------------------------------------------------------ - * - */ void AVTInput::_info(_frameType type, size_t size) { if (_lastInfoFrameType != type || _lastInfoSize != size) { diff --git a/src/AVTInput.h b/src/AVTInput.h index ffa632f..62b2248 100644 --- a/src/AVTInput.h +++ b/src/AVTInput.h @@ -30,7 +30,7 @@ #pragma once -#include "UdpSocket.h" +#include "Socket.h" #include "OrderedQueue.h" #include #include @@ -101,11 +101,11 @@ class AVTInput uint32_t _pad_port; size_t _jitterBufferSize; - UdpSocket _input_socket; - UdpSocket _output_socket; - UdpPacket _output_packet; - UdpSocket _input_pad_socket; - UdpPacket _input_pad_packet; + Socket::UDPSocket _input_socket; + Socket::UDPSocket _output_socket; + Socket::UDPPacket _output_packet; + Socket::UDPSocket _input_pad_socket; + Socket::UDPPacket _input_pad_packet; OrderedQueue _ordered; std::queue > _padFrameQueue; @@ -123,21 +123,15 @@ class AVTInput uint8_t* _nextFrameIndex = 0; bool _parseURI(const char* uri, std::string& address, long& port); - int _openSocketSrv(UdpSocket* socket, const char* uri); + int _openSocketSrv(Socket::UDPSocket* socket, const char* uri); int _openSocketCli(); void _sendCtrlMessage(); - void _sendPADFrame(UdpPacket* packet = NULL); - void _interpretMessage(const uint8_t* data, size_t size, UdpPacket* packet = NULL); + void _sendPADFrame(); + void _interpretMessage(const uint8_t* data, size_t size); bool _checkMessage(); void _purgeMessages(); - /*! Read length bytes into buf. - * - * \return the number of bytes read. - */ - ssize_t _read(uint8_t* buf, size_t length, bool onlyOnePacket=false); - /*! Test Bytes 1,2,3 for STI detection */ bool _isSTI(const uint8_t* buf); diff --git a/src/UdpSocket.cpp b/src/UdpSocket.cpp deleted file mode 100644 index 8ac3706..0000000 --- a/src/UdpSocket.cpp +++ /dev/null @@ -1,510 +0,0 @@ -/* - Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the - Queen in Right of Canada (Communications Research Center Canada) - - Copyright (C) 2015 Matthias P. Braendli - http://www.opendigitalradio.org - */ -/* - This file is part of ODR-DabMux. - - ODR-DabMux is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - ODR-DabMux is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with ODR-DabMux. If not, see . - */ - -#include "UdpSocket.h" - -#include -#include -#include -#include -#include - -#ifdef TRACE_ON -# ifndef TRACE_CLASS -# define TRACE_CLASS(class, func) cout <<"-" <<(class) <<"\t(" < data, InetAddress destination) -{ -#ifdef DUMP - TRACE_CLASS("UdpSocket", "send(vector)"); -#endif - int ret = sendto(listenSocket, &data[0], data.size(), 0, - destination.getAddress(), sizeof(*destination.getAddress())); - if (ret == SOCKET_ERROR -#ifndef _WIN32 - && errno != ECONNREFUSED -#endif - ) { - setInetError("Can't send UDP packet"); - return -1; - } - return 0; -} - - -/** - * Must be called to receive data on a multicast address. - * @param groupname The multica -st address to join. - * @return 0 if ok, -1 if error - */ -int UdpSocket::joinGroup(char* groupname) -{ - TRACE_CLASS("UdpSocket", "joinGroup(char*)"); -#ifdef _WIN32 - ip_mreq group; -#else - ip_mreqn group; -#endif - if ((group.imr_multiaddr.s_addr = inet_addr(groupname)) == INADDR_NONE) { - setInetError(groupname); - return -1; - } - if (!IN_MULTICAST(ntohl(group.imr_multiaddr.s_addr))) { - setInetError("Not a multicast address"); - return -1; - } -#ifdef _WIN32 - group.imr_interface.s_addr = 0; - if (setsockopt(listenSocket, IPPROTO_IP, IP_ADD_MEMBERSHIP, (char*)&group, sizeof(group)) - == SOCKET_ERROR) { - setInetError("Can't join multicast group"); - return -1; - } -#else - group.imr_address.s_addr = htons(INADDR_ANY);; - group.imr_ifindex = 0; - if (setsockopt(listenSocket, IPPROTO_IP, IP_ADD_MEMBERSHIP, &group, sizeof(group)) - == SOCKET_ERROR) { - setInetError("Can't join multicast group"); - } -#endif - return 0; -} - -int UdpSocket::setMulticastTTL(int ttl) -{ - if (setsockopt(listenSocket, IPPROTO_IP, IP_MULTICAST_TTL, &ttl, sizeof(ttl)) - == SOCKET_ERROR) { - setInetError("Can't set ttl"); - return -1; - } - - return 0; -} - -int UdpSocket::setMulticastSource(const char* source_addr) -{ - struct in_addr addr; - if (inet_aton(source_addr, &addr) == 0) { - setInetError("Can't parse source address"); - return -1; - } - - if (setsockopt(listenSocket, IPPROTO_IP, IP_MULTICAST_IF, &addr, sizeof(addr)) - == SOCKET_ERROR) { - setInetError("Can't set source address"); - return -1; - } - - return 0; -} - - -/** - * Constructs an UDP packet. - * @param initSize The initial size of the data buffer - */ -UdpPacket::UdpPacket(unsigned int initSize) : - dataBuf(new char[initSize]), - length(0), - size(initSize), - offset(0) -{ - TRACE_CLASS("UdpPacket", "UdpPacket(unsigned int)"); - if (dataBuf == NULL) - size = 0; -} - - -/// Destructor -UdpPacket::~UdpPacket() -{ - TRACE_CLASS("UdpPacket", "~UdpPacket()"); - if (dataBuf != NULL) { - delete []dataBuf; - dataBuf = NULL; - } -} - - -/** - * Changes size of the data buffer size. \a Length + \a offset data will be copied - * in the new buffer. - * @warning The pointer to data will be changed - * @param newSize The new data buffer size - */ -void UdpPacket::setSize(unsigned newSize) -{ - TRACE_CLASS("UdpPacket", "setSize(unsigned)"); - char *tmp = new char[newSize]; - if (length > newSize) - length = newSize; - if (tmp) { - memcpy(tmp, dataBuf, length); - delete []dataBuf; - dataBuf = tmp; - size = newSize; - } -} - - -/** - * Give the pointer to data. It is ajusted with the \a offset. - * @warning This pointer change. when the \a size of the buffer and the \a offset change. - * @return The pointer - */ -char *UdpPacket::getData() -{ - return dataBuf + offset; -} - - -/** - * Add some data at the end of data buffer and adjust size. - * @param data Pointer to the data to add - * @param size Size in bytes of new data - */ -void UdpPacket::addData(const void *data, unsigned size) -{ - if (length + size > this->size) { - setSize(this->size << 1); - } - memcpy(dataBuf + length, data, size); - length += size; -} - - -/** - * Returns the length of useful data. Data before the \a offset are ignored. - * @return The data length - */ -unsigned long UdpPacket::getLength() -{ - return length - offset; -} - - -/** - * Returns the size of the data buffer. - * @return The data buffer size - */ -unsigned long UdpPacket::getSize() -{ - return size; -} - - -/** - * Returns the offset value. - * @return The offset value - */ -unsigned long UdpPacket::getOffset() -{ - return offset; -} - - -/** - * Sets the data length value. Data before the \a offset are ignored. - * @param len The new length of data - */ -void UdpPacket::setLength(unsigned long len) -{ - length = len + offset; -} - - -/** - * Sets the data offset. Data length is ajusted to ignore data before the \a offset. - * @param val The new data offset. - */ -void UdpPacket::setOffset(unsigned long val) -{ - offset = val; - if (offset > length) - length = offset; -} - - -/** - * Returns the UDP address of the data. - * @return The UDP address - */ -InetAddress &UdpPacket::getAddress() -{ - return address; -} - -/* -WSAEINTR -WSAEBADF -WSAEACCES -WSAEFAULT -WSAEINVAL -WSAEMFILE -WSAEWOULDBLOCK -WSAEINPROGRESS -WSAEALREADY -WSAENOTSOCK -WSAEDESTADDRREQ -WSAEMSGSIZE -WSAEPROTOTYPE -WSAENOPROTOOPT -WSAEPROTONOSUPPORT -WSAESOCKTNOSUPPORT -WSAEOPNOTSUPP -WSAEPFNOSUPPORT -WSAEAFNOSUPPORT -WSAEADDRINUSE -WSAEADDRNOTAVAIL -WSAENETDOWN -WSAENETUNREACH -WSAENETRESET -WSAECONNABORTED -WSAECONNRESET -WSAENOBUFS -WSAEISCONN -WSAENOTCONN -WSAESHUTDOWN -WSAETOOMANYREFS -WSAETIMEDOUT -WSAECONNREFUSED -WSAELOOP -WSAENAMETOOLONG -WSAEHOSTDOWN -WSAEHOSTUNREACH -WSAENOTEMPTY -WSAEPROCLIM -WSAEUSERS -WSAEDQUOT -WSAESTALE -WSAEREMOTE -WSAEDISCON -WSASYSNOTREADY -WSAVERNOTSUPPORTED -WSANOTINITIALISED -*/ diff --git a/src/UdpSocket.h b/src/UdpSocket.h deleted file mode 100644 index 07e9f0e..0000000 --- a/src/UdpSocket.h +++ /dev/null @@ -1,138 +0,0 @@ -/* - Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the - Queen in Right of Canada (Communications Research Center Canada) - - Copyright (C) 2015 Matthias P. Braendli - http://www.opendigitalradio.org - */ -/* - This file is part of ODR-DabMux. - - ODR-DabMux is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - ODR-DabMux is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with ODR-DabMux. If not, see . - */ - -#ifndef _UDPSOCKET -#define _UDPSOCKET - -#ifdef HAVE_CONFIG_H -# include "config.h" -#endif - -#include "InetAddress.h" -#ifdef _WIN32 -# include -# define socklen_t int -# define reuseopt_t char -#else -# include -# include -# include -# include -# include -# include -# define SOCKET int -# define INVALID_SOCKET -1 -# define SOCKET_ERROR -1 -# define reuseopt_t int -#endif -//#define INVALID_PORT -1 - -#include -#include -#include - -class UdpPacket; - - -/** - * This class represents a socket for sending and receiving UDP packets. - * - * A UDP socket is the sending or receiving point for a packet delivery service. - * Each packet sent or received on a datagram socket is individually - * addressed and routed. Multiple packets sent from one machine to another may - * be routed differently, and may arrive in any order. - * @author Pascal Charest pascal.charest@crc.ca - */ -class UdpSocket { - public: - UdpSocket(); - UdpSocket(int port, char *name = NULL); - ~UdpSocket(); - UdpSocket(const UdpSocket& other) = delete; - const UdpSocket& operator=(const UdpSocket& other) = delete; - - static int init(); - static int clean(); - - int create(int port = 0, char *name = NULL); - - int send(UdpPacket &packet); - int send(const std::vector data); - int send(std::vector data, InetAddress destination); - int receive(UdpPacket &packet); - int joinGroup(char* groupname); - int setMulticastSource(const char* source_addr); - int setMulticastTTL(int ttl); - /** - * Connects the socket on a specific address. Only data from this address - * will be received. - * @param addr The address to connect the socket - * @warning Not implemented yet. - */ - void connect(InetAddress &addr); - int setBlocking(bool block); - - protected: - /// The address on which the socket is binded. - InetAddress address; - /// The low-level socket used by system functions. - SOCKET listenSocket; -}; - -/** - * This class represents a UDP packet. - * - * UDP packets are used to implement a connectionless packet delivery service. - * Each message is routed from one machine to another based solely on - * information contained within that packet. Multiple packets sent from one - * machine to another might be routed differently, and might arrive in any order. - * @author Pascal Charest pascal.charest@crc.ca - */ -class UdpPacket { - public: - UdpPacket(unsigned int initSize = 1024); - UdpPacket(const UdpPacket& packet) = delete; - const UdpPacket& operator=(const UdpPacket&) = delete; - UdpPacket(const UdpPacket&& packet) = delete; - const UdpPacket& operator=(const UdpPacket&&) = delete; - ~UdpPacket(); - - char *getData(); - void addData(const void *data, unsigned size); - unsigned long getLength(); - unsigned long getSize(); - unsigned long getOffset(); - void setLength(unsigned long len); - void setOffset(unsigned long val); - void setSize(unsigned newSize); - InetAddress &getAddress(); - - private: - char *dataBuf; - unsigned long length, size, offset; - InetAddress address; -}; - -#endif // _UDPSOCKET - -- cgit v1.2.3