aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Makefile.am10
-rw-r--r--lib/Socket.cpp894
-rw-r--r--lib/Socket.h294
-rw-r--r--lib/ThreadsafeQueue.h176
-rw-r--r--src/AVTEDIInput.cpp738
-rw-r--r--src/AVTEDIInput.h188
-rw-r--r--src/AVTInput.cpp261
-rw-r--r--src/AVTInput.h24
-rw-r--r--src/UdpSocket.cpp510
-rw-r--r--src/UdpSocket.h138
10 files changed, 1442 insertions, 1791 deletions
diff --git a/Makefile.am b/Makefile.am
index 9594ae2..aff4694 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -9,13 +9,11 @@ endif
odr_sourcecompanion_LDADD = -lzmq
odr_sourcecompanion_CFLAGS = $(GITVERSION_FLAGS) -ggdb -O2 -Wall
-odr_sourcecompanion_CXXFLAGS = $(GITVERSION_FLAGS) -ggdb -O2 -Wall -Isrc/fec
+odr_sourcecompanion_CXXFLAGS = $(GITVERSION_FLAGS) -ggdb -O2 -Wall -Isrc/fec -Ilib
odr_sourcecompanion_SOURCES = src/odr-sourcecompanion.cpp \
src/AACDecoder.h src/AACDecoder.cpp \
- src/AVTInput.h src/AVTInput.cpp \
- src/InetAddress.h src/InetAddress.cpp \
+ src/AVTInput.h src/AVTInput.cpp \
src/OrderedQueue.h src/OrderedQueue.cpp \
- src/UdpSocket.h src/UdpSocket.cpp \
src/crc.h src/crc.c \
src/encryption.h src/encryption.c \
src/utils.h src/utils.c \
@@ -27,7 +25,9 @@ odr_sourcecompanion_SOURCES = src/odr-sourcecompanion.cpp \
src/fec/fec.h \
src/fec/init_rs_char.c \
src/fec/init_rs.h \
- src/fec/rs-common.h
+ src/fec/rs-common.h \
+ lib/ThreadsafeQueue.h \
+ lib/Socket.h lib/Socket.cpp
bin_PROGRAMS = odr-sourcecompanion$(EXEEXT)
diff --git a/lib/Socket.cpp b/lib/Socket.cpp
new file mode 100644
index 0000000..d14902e
--- /dev/null
+++ b/lib/Socket.cpp
@@ -0,0 +1,894 @@
+/*
+ Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the
+ Queen in Right of Canada (Communications Research Center Canada)
+
+ Copyright (C) 2019
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://www.opendigitalradio.org
+ */
+/*
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <https://www.gnu.org/licenses/>.
+*/
+
+#include "Socket.h"
+
+#include <iostream>
+#include <cstdio>
+#include <cstring>
+#include <cerrno>
+#include <fcntl.h>
+#include <poll.h>
+
+namespace Socket {
+
+using namespace std;
+
+void InetAddress::resolveUdpDestination(const std::string& destination, int port)
+{
+ char service[NI_MAXSERV];
+ snprintf(service, NI_MAXSERV-1, "%d", port);
+
+ struct addrinfo hints;
+ memset(&hints, 0, sizeof(struct addrinfo));
+ hints.ai_family = AF_INET;
+ hints.ai_socktype = SOCK_DGRAM; /* Datagram socket */
+ hints.ai_flags = 0;
+ hints.ai_protocol = 0;
+
+ struct addrinfo *result, *rp;
+ int s = getaddrinfo(destination.c_str(), service, &hints, &result);
+ if (s != 0) {
+ throw runtime_error(string("getaddrinfo failed: ") + gai_strerror(s));
+ }
+
+ for (rp = result; rp != nullptr; rp = rp->ai_next) {
+ // Take the first result
+ memcpy(&addr, rp->ai_addr, rp->ai_addrlen);
+ break;
+ }
+
+ freeaddrinfo(result);
+
+ if (rp == nullptr) {
+ throw runtime_error("Could not resolve");
+ }
+}
+
+UDPPacket::UDPPacket() { }
+
+UDPPacket::UDPPacket(size_t initSize) :
+ buffer(initSize)
+{ }
+
+
+UDPSocket::UDPSocket() :
+ m_sock(INVALID_SOCKET)
+{
+ reinit(0, "");
+}
+
+UDPSocket::UDPSocket(int port) :
+ m_sock(INVALID_SOCKET)
+{
+ reinit(port, "");
+}
+
+UDPSocket::UDPSocket(int port, const std::string& name) :
+ m_sock(INVALID_SOCKET)
+{
+ reinit(port, name);
+}
+
+
+void UDPSocket::setBlocking(bool block)
+{
+ int res = fcntl(m_sock, F_SETFL, block ? 0 : O_NONBLOCK);
+ if (res == -1) {
+ throw runtime_error(string("Can't change blocking state of socket: ") + strerror(errno));
+ }
+}
+
+void UDPSocket::reinit(int port)
+{
+ return reinit(port, "");
+}
+
+void UDPSocket::reinit(int port, const std::string& name)
+{
+ if (m_sock != INVALID_SOCKET) {
+ ::close(m_sock);
+ }
+
+ if (port == 0) {
+ // No need to bind to a given port, creating the
+ // socket is enough
+ m_sock = ::socket(AF_INET, SOCK_DGRAM, 0);
+ return;
+ }
+
+ char service[NI_MAXSERV];
+ snprintf(service, NI_MAXSERV-1, "%d", port);
+
+ struct addrinfo hints;
+ memset(&hints, 0, sizeof(struct addrinfo));
+ hints.ai_family = AF_INET;
+ hints.ai_socktype = SOCK_DGRAM; /* Datagram socket */
+ hints.ai_flags = AI_PASSIVE; /* For wildcard IP address */
+ hints.ai_protocol = 0; /* Any protocol */
+ hints.ai_canonname = nullptr;
+ hints.ai_addr = nullptr;
+ hints.ai_next = nullptr;
+
+ struct addrinfo *result, *rp;
+ int s = getaddrinfo(name.empty() ? nullptr : name.c_str(),
+ port == 0 ? nullptr : service,
+ &hints, &result);
+ if (s != 0) {
+ throw runtime_error(string("getaddrinfo failed: ") + gai_strerror(s));
+ }
+
+ /* getaddrinfo() returns a list of address structures.
+ Try each address until we successfully bind(2).
+ If socket(2) (or bind(2)) fails, we (close the socket
+ and) try the next address. */
+ for (rp = result; rp != nullptr; rp = rp->ai_next) {
+ int sfd = ::socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
+ if (sfd == -1) {
+ continue;
+ }
+
+ if (::bind(sfd, rp->ai_addr, rp->ai_addrlen) == 0) {
+ m_sock = sfd;
+ break;
+ }
+
+ ::close(sfd);
+ }
+
+ freeaddrinfo(result);
+
+ if (rp == nullptr) {
+ throw runtime_error("Could not bind");
+ }
+}
+
+void UDPSocket::close()
+{
+ if (m_sock != INVALID_SOCKET) {
+ ::close(m_sock);
+ }
+
+ m_sock = INVALID_SOCKET;
+}
+
+UDPSocket::~UDPSocket()
+{
+ if (m_sock != INVALID_SOCKET) {
+ ::close(m_sock);
+ }
+}
+
+
+UDPPacket UDPSocket::receive(size_t max_size)
+{
+ UDPPacket packet(max_size);
+ socklen_t addrSize;
+ addrSize = sizeof(*packet.address.as_sockaddr());
+ ssize_t ret = recvfrom(m_sock,
+ packet.buffer.data(),
+ packet.buffer.size(),
+ 0,
+ packet.address.as_sockaddr(),
+ &addrSize);
+
+ if (ret == SOCKET_ERROR) {
+ packet.buffer.resize(0);
+
+ // This suppresses the -Wlogical-op warning
+#if EAGAIN == EWOULDBLOCK
+ if (errno == EAGAIN) {
+#else
+ if (errno == EAGAIN or errno == EWOULDBLOCK) {
+#endif
+ return 0;
+ }
+ throw runtime_error(string("Can't receive data: ") + strerror(errno));
+ }
+
+ packet.buffer.resize(ret);
+ return packet;
+}
+
+void UDPSocket::send(UDPPacket& packet)
+{
+ const int ret = sendto(m_sock, packet.buffer.data(), packet.buffer.size(), 0,
+ packet.address.as_sockaddr(), sizeof(*packet.address.as_sockaddr()));
+ if (ret == SOCKET_ERROR && errno != ECONNREFUSED) {
+ throw runtime_error(string("Can't send UDP packet: ") + strerror(errno));
+ }
+}
+
+
+void UDPSocket::send(const std::vector<uint8_t>& data, InetAddress destination)
+{
+ const int ret = sendto(m_sock, data.data(), data.size(), 0,
+ destination.as_sockaddr(), sizeof(*destination.as_sockaddr()));
+ if (ret == SOCKET_ERROR && errno != ECONNREFUSED) {
+ throw runtime_error(string("Can't send UDP packet: ") + strerror(errno));
+ }
+}
+
+void UDPSocket::joinGroup(const char* groupname, const char* if_addr)
+{
+ ip_mreqn group;
+ if ((group.imr_multiaddr.s_addr = inet_addr(groupname)) == INADDR_NONE) {
+ throw runtime_error("Cannot convert multicast group name");
+ }
+ if (!IN_MULTICAST(ntohl(group.imr_multiaddr.s_addr))) {
+ throw runtime_error("Group name is not a multicast address");
+ }
+
+ if (if_addr) {
+ group.imr_address.s_addr = inet_addr(if_addr);
+ }
+ else {
+ group.imr_address.s_addr = htons(INADDR_ANY);
+ }
+ group.imr_ifindex = 0;
+ if (setsockopt(m_sock, IPPROTO_IP, IP_ADD_MEMBERSHIP, &group, sizeof(group))
+ == SOCKET_ERROR) {
+ throw runtime_error(string("Can't join multicast group") + strerror(errno));
+ }
+}
+
+void UDPSocket::setMulticastSource(const char* source_addr)
+{
+ struct in_addr addr;
+ if (inet_aton(source_addr, &addr) == 0) {
+ throw runtime_error(string("Can't parse source address") + strerror(errno));
+ }
+
+ if (setsockopt(m_sock, IPPROTO_IP, IP_MULTICAST_IF, &addr, sizeof(addr))
+ == SOCKET_ERROR) {
+ throw runtime_error(string("Can't set source address") + strerror(errno));
+ }
+}
+
+void UDPSocket::setMulticastTTL(int ttl)
+{
+ if (setsockopt(m_sock, IPPROTO_IP, IP_MULTICAST_TTL, &ttl, sizeof(ttl))
+ == SOCKET_ERROR) {
+ throw runtime_error(string("Can't set multicast ttl") + strerror(errno));
+ }
+}
+
+UDPReceiver::UDPReceiver() { }
+
+UDPReceiver::~UDPReceiver() {
+ m_stop = true;
+ m_sock.close();
+ if (m_thread.joinable()) {
+ m_thread.join();
+ }
+}
+
+void UDPReceiver::start(int port, const string& bindto, const string& mcastaddr, size_t max_packets_queued) {
+ m_port = port;
+ m_bindto = bindto;
+ m_mcastaddr = mcastaddr;
+ m_max_packets_queued = max_packets_queued;
+ m_thread = std::thread(&UDPReceiver::m_run, this);
+}
+
+std::vector<uint8_t> UDPReceiver::get_packet_buffer()
+{
+ if (m_stop) {
+ throw runtime_error("UDP Receiver not running");
+ }
+
+ UDPPacket p;
+ m_packets.wait_and_pop(p);
+
+ return p.buffer;
+}
+
+void UDPReceiver::m_run()
+{
+ // Ensure that stop is set to true in case of exception or return
+ struct SetStopOnDestruct {
+ SetStopOnDestruct(atomic<bool>& stop) : m_stop(stop) {}
+ ~SetStopOnDestruct() { m_stop = true; }
+ private: atomic<bool>& m_stop;
+ } autoSetStop(m_stop);
+
+ if (IN_MULTICAST(ntohl(inet_addr(m_mcastaddr.c_str())))) {
+ m_sock.reinit(m_port, m_mcastaddr);
+ m_sock.setMulticastSource(m_bindto.c_str());
+ m_sock.joinGroup(m_mcastaddr.c_str(), m_bindto.c_str());
+ }
+ else {
+ m_sock.reinit(m_port, m_bindto);
+ }
+
+ while (not m_stop) {
+ constexpr size_t packsize = 8192;
+ try {
+ auto packet = m_sock.receive(packsize);
+ if (packet.buffer.size() == packsize) {
+ // TODO replace fprintf
+ fprintf(stderr, "Warning, possible UDP truncation\n");
+ }
+
+ // If this blocks, the UDP socket will lose incoming packets
+ m_packets.push_wait_if_full(packet, m_max_packets_queued);
+ }
+ catch (const std::runtime_error& e) {
+ // TODO replace fprintf
+ // TODO handle intr
+ fprintf(stderr, "Socket error: %s\n", e.what());
+ m_stop = true;
+ }
+ }
+}
+
+
+TCPSocket::TCPSocket()
+{
+}
+
+TCPSocket::~TCPSocket()
+{
+ if (m_sock != -1) {
+ ::close(m_sock);
+ }
+}
+
+TCPSocket::TCPSocket(TCPSocket&& other) :
+ m_sock(other.m_sock),
+ m_remote_address(move(other.m_remote_address))
+{
+ if (other.m_sock != -1) {
+ other.m_sock = -1;
+ }
+}
+
+TCPSocket& TCPSocket::operator=(TCPSocket&& other)
+{
+ swap(m_remote_address, other.m_remote_address);
+
+ m_sock = other.m_sock;
+ if (other.m_sock != -1) {
+ other.m_sock = -1;
+ }
+
+ return *this;
+}
+
+bool TCPSocket::valid() const
+{
+ return m_sock != -1;
+}
+
+void TCPSocket::connect(const std::string& hostname, int port)
+{
+ if (m_sock != INVALID_SOCKET) {
+ throw std::logic_error("You may only connect an invalid TCPSocket");
+ }
+
+ char service[NI_MAXSERV];
+ snprintf(service, NI_MAXSERV-1, "%d", port);
+
+ /* Obtain address(es) matching host/port */
+ struct addrinfo hints;
+ memset(&hints, 0, sizeof(struct addrinfo));
+ hints.ai_family = AF_INET;
+ hints.ai_socktype = SOCK_STREAM;
+ hints.ai_flags = 0;
+ hints.ai_protocol = 0;
+
+ struct addrinfo *result, *rp;
+ int s = getaddrinfo(hostname.c_str(), service, &hints, &result);
+ if (s != 0) {
+ throw runtime_error(string("getaddrinfo failed: ") + gai_strerror(s));
+ }
+
+ /* getaddrinfo() returns a list of address structures.
+ Try each address until we successfully connect(2).
+ If socket(2) (or connect(2)) fails, we (close the socket
+ and) try the next address. */
+
+ for (rp = result; rp != nullptr; rp = rp->ai_next) {
+ int sfd = ::socket(rp->ai_family, rp->ai_socktype,
+ rp->ai_protocol);
+ if (sfd == -1)
+ continue;
+
+ int ret = ::connect(sfd, rp->ai_addr, rp->ai_addrlen);
+ if (ret != -1 or (ret == -1 and errno == EINPROGRESS)) {
+ // As the TCPClient could set the socket to nonblocking, we
+ // must handle EINPROGRESS here
+ m_sock = sfd;
+ break;
+ }
+
+ ::close(sfd);
+ }
+
+ if (m_sock != INVALID_SOCKET) {
+#if defined(HAVE_SO_NOSIGPIPE)
+ int val = 1;
+ if (setsockopt(m_sock, SOL_SOCKET, SO_NOSIGPIPE, &val, sizeof(val))
+ == SOCKET_ERROR) {
+ throw std::runtime_error("Can't set SO_NOSIGPIPE");
+ }
+#endif
+ }
+
+ freeaddrinfo(result); /* No longer needed */
+
+ if (rp == nullptr) {
+ throw runtime_error("Could not connect");
+ }
+
+}
+
+void TCPSocket::listen(int port, const string& name)
+{
+ if (m_sock != INVALID_SOCKET) {
+ throw std::logic_error("You may only listen with an invalid TCPSocket");
+ }
+
+ char service[NI_MAXSERV];
+ snprintf(service, NI_MAXSERV-1, "%d", port);
+
+ struct addrinfo hints;
+ memset(&hints, 0, sizeof(struct addrinfo));
+ hints.ai_family = AF_INET;
+ hints.ai_socktype = SOCK_STREAM;
+ hints.ai_flags = AI_PASSIVE; /* For wildcard IP address */
+ hints.ai_protocol = 0;
+ hints.ai_canonname = nullptr;
+ hints.ai_addr = nullptr;
+ hints.ai_next = nullptr;
+
+ struct addrinfo *result, *rp;
+ int s = getaddrinfo(name.empty() ? nullptr : name.c_str(), service, &hints, &result);
+ if (s != 0) {
+ throw runtime_error(string("getaddrinfo failed: ") + gai_strerror(s));
+ }
+
+ /* getaddrinfo() returns a list of address structures.
+ Try each address until we successfully bind(2).
+ If socket(2) (or bind(2)) fails, we (close the socket
+ and) try the next address. */
+ for (rp = result; rp != nullptr; rp = rp->ai_next) {
+ int sfd = ::socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
+ if (sfd == -1) {
+ continue;
+ }
+
+ if (::bind(sfd, rp->ai_addr, rp->ai_addrlen) == 0) {
+ m_sock = sfd;
+ break;
+ }
+
+ ::close(sfd);
+ }
+
+ freeaddrinfo(result);
+
+ if (m_sock != INVALID_SOCKET) {
+#if defined(HAVE_SO_NOSIGPIPE)
+ int val = 1;
+ if (setsockopt(m_sock, SOL_SOCKET, SO_NOSIGPIPE,
+ &val, sizeof(val)) < 0) {
+ throw std::runtime_error("Can't set SO_NOSIGPIPE");
+ }
+#endif
+
+ int ret = ::listen(m_sock, 0);
+ if (ret == -1) {
+ throw std::runtime_error(string("Could not listen: ") + strerror(errno));
+ }
+ }
+
+ if (rp == nullptr) {
+ throw runtime_error("Could not bind");
+ }
+}
+
+void TCPSocket::close()
+{
+ ::close(m_sock);
+ m_sock = -1;
+}
+
+TCPSocket TCPSocket::accept(int timeout_ms)
+{
+ if (timeout_ms == 0) {
+ InetAddress remote_addr;
+ socklen_t client_len = sizeof(remote_addr.addr);
+ int sockfd = ::accept(m_sock, remote_addr.as_sockaddr(), &client_len);
+ TCPSocket s(sockfd, remote_addr);
+ return s;
+ }
+ else {
+ struct pollfd fds[1];
+ fds[0].fd = m_sock;
+ fds[0].events = POLLIN;
+
+ int retval = poll(fds, 1, timeout_ms);
+
+ if (retval == -1) {
+ std::string errstr(strerror(errno));
+ throw std::runtime_error("TCP Socket accept error: " + errstr);
+ }
+ else if (retval > 0) {
+ InetAddress remote_addr;
+ socklen_t client_len = sizeof(remote_addr.addr);
+ int sockfd = ::accept(m_sock, remote_addr.as_sockaddr(), &client_len);
+ TCPSocket s(sockfd, remote_addr);
+ return s;
+ }
+ else {
+ TCPSocket s(-1);
+ return s;
+ }
+ }
+}
+
+ssize_t TCPSocket::sendall(const void *buffer, size_t buflen)
+{
+ uint8_t *buf = (uint8_t*)buffer;
+ while (buflen > 0) {
+ /* On Linux, the MSG_NOSIGNAL flag ensures that the process
+ * would not receive a SIGPIPE and die.
+ * Other systems have SO_NOSIGPIPE set on the socket for the
+ * same effect. */
+#if defined(HAVE_MSG_NOSIGNAL)
+ const int flags = MSG_NOSIGNAL;
+#else
+ const int flags = 0;
+#endif
+ ssize_t sent = ::send(m_sock, buf, buflen, flags);
+ if (sent < 0) {
+ return -1;
+ }
+ else {
+ buf += sent;
+ buflen -= sent;
+ }
+ }
+ return buflen;
+}
+
+ssize_t TCPSocket::send(const void* data, size_t size, int timeout_ms)
+{
+ if (timeout_ms) {
+ struct pollfd fds[1];
+ fds[0].fd = m_sock;
+ fds[0].events = POLLOUT;
+
+ const int retval = poll(fds, 1, timeout_ms);
+
+ if (retval == -1) {
+ throw std::runtime_error(string("TCP Socket send error on poll(): ") + strerror(errno));
+ }
+ else if (retval == 0) {
+ // Timed out
+ return 0;
+ }
+ }
+
+ /* On Linux, the MSG_NOSIGNAL flag ensures that the process would not
+ * receive a SIGPIPE and die.
+ * Other systems have SO_NOSIGPIPE set on the socket for the same effect. */
+#if defined(HAVE_MSG_NOSIGNAL)
+ const int flags = MSG_NOSIGNAL;
+#else
+ const int flags = 0;
+#endif
+ const ssize_t ret = ::send(m_sock, (const char*)data, size, flags);
+
+ if (ret == SOCKET_ERROR) {
+ throw std::runtime_error(string("TCP Socket send error: ") + strerror(errno));
+ }
+ return ret;
+}
+
+ssize_t TCPSocket::recv(void *buffer, size_t length, int flags)
+{
+ ssize_t ret = ::recv(m_sock, buffer, length, flags);
+ if (ret == -1) {
+ std::string errstr(strerror(errno));
+ throw std::runtime_error("TCP receive error: " + errstr);
+ }
+ return ret;
+}
+
+ssize_t TCPSocket::recv(void *buffer, size_t length, int flags, int timeout_ms)
+{
+ struct pollfd fds[1];
+ fds[0].fd = m_sock;
+ fds[0].events = POLLIN;
+
+ int retval = poll(fds, 1, timeout_ms);
+
+ if (retval == -1 and errno == EINTR) {
+ throw Interrupted();
+ }
+ else if (retval == -1) {
+ std::string errstr(strerror(errno));
+ throw std::runtime_error("TCP receive with poll() error: " + errstr);
+ }
+ else if (retval > 0 and (fds[0].revents | POLLIN)) {
+ ssize_t ret = ::recv(m_sock, buffer, length, flags);
+ if (ret == -1) {
+ if (errno == ECONNREFUSED) {
+ return 0;
+ }
+ std::string errstr(strerror(errno));
+ throw std::runtime_error("TCP receive after poll() error: " + errstr);
+ }
+ return ret;
+ }
+ else {
+ throw Timeout();
+ }
+}
+
+TCPSocket::TCPSocket(int sockfd) :
+ m_sock(sockfd),
+ m_remote_address()
+{ }
+
+TCPSocket::TCPSocket(int sockfd, InetAddress remote_address) :
+ m_sock(sockfd),
+ m_remote_address(remote_address)
+{ }
+
+void TCPClient::connect(const std::string& hostname, int port)
+{
+ m_hostname = hostname;
+ m_port = port;
+ reconnect();
+}
+
+ssize_t TCPClient::recv(void *buffer, size_t length, int flags, int timeout_ms)
+{
+ try {
+ ssize_t ret = m_sock.recv(buffer, length, flags, timeout_ms);
+
+ if (ret == 0) {
+ m_sock.close();
+
+ TCPSocket newsock;
+ m_sock = std::move(newsock);
+ reconnect();
+ }
+
+ return ret;
+ }
+ catch (const TCPSocket::Interrupted&) {
+ return -1;
+ }
+ catch (const TCPSocket::Timeout&) {
+ return 0;
+ }
+
+ return 0;
+}
+
+void TCPClient::reconnect()
+{
+ int flags = fcntl(m_sock.m_sock, F_GETFL);
+ if (fcntl(m_sock.m_sock, F_SETFL, flags | O_NONBLOCK) == -1) {
+ std::string errstr(strerror(errno));
+ throw std::runtime_error("TCP: Could not set O_NONBLOCK: " + errstr);
+ }
+
+ m_sock.connect(m_hostname, m_port);
+}
+
+TCPConnection::TCPConnection(TCPSocket&& sock) :
+ queue(),
+ m_running(true),
+ m_sender_thread(),
+ m_sock(move(sock))
+{
+#if MISSING_OWN_ADDR
+ auto own_addr = m_sock.getOwnAddress();
+ auto addr = m_sock.getRemoteAddress();
+ etiLog.level(debug) << "New TCP Connection on port " <<
+ own_addr.getPort() << " from " <<
+ addr.getHostAddress() << ":" << addr.getPort();
+#endif
+ m_sender_thread = std::thread(&TCPConnection::process, this);
+}
+
+TCPConnection::~TCPConnection()
+{
+ m_running = false;
+ vector<uint8_t> termination_marker;
+ queue.push(termination_marker);
+ m_sender_thread.join();
+}
+
+void TCPConnection::process()
+{
+ while (m_running) {
+ vector<uint8_t> data;
+ queue.wait_and_pop(data);
+
+ if (data.empty()) {
+ // empty vector is the termination marker
+ m_running = false;
+ break;
+ }
+
+ try {
+ ssize_t remaining = data.size();
+ const uint8_t *buf = reinterpret_cast<const uint8_t*>(data.data());
+ const int timeout_ms = 10; // Less than one ETI frame
+
+ while (m_running and remaining > 0) {
+ const ssize_t sent = m_sock.send(buf, remaining, timeout_ms);
+ if (sent < 0 or sent > remaining) {
+ throw std::logic_error("Invalid TCPSocket::send() return value");
+ }
+ remaining -= sent;
+ buf += sent;
+ }
+ }
+ catch (const std::runtime_error& e) {
+ m_running = false;
+ }
+ }
+
+#if MISSING_OWN_ADDR
+ auto own_addr = m_sock.getOwnAddress();
+ auto addr = m_sock.getRemoteAddress();
+ etiLog.level(debug) << "Dropping TCP Connection on port " <<
+ own_addr.getPort() << " from " <<
+ addr.getHostAddress() << ":" << addr.getPort();
+#endif
+}
+
+
+TCPDataDispatcher::TCPDataDispatcher(size_t max_queue_size) :
+ m_max_queue_size(max_queue_size)
+{
+}
+
+TCPDataDispatcher::~TCPDataDispatcher()
+{
+ m_running = false;
+ m_connections.clear();
+ m_listener_socket.close();
+ if (m_listener_thread.joinable()) {
+ m_listener_thread.join();
+ }
+}
+
+void TCPDataDispatcher::start(int port, const string& address)
+{
+ m_listener_socket.listen(port, address);
+
+ m_running = true;
+ m_listener_thread = std::thread(&TCPDataDispatcher::process, this);
+}
+
+void TCPDataDispatcher::write(const vector<uint8_t>& data)
+{
+ if (not m_running) {
+ throw runtime_error(m_exception_data);
+ }
+
+ for (auto& connection : m_connections) {
+ connection.queue.push(data);
+ }
+
+ m_connections.remove_if(
+ [&](const TCPConnection& conn){ return conn.queue.size() > m_max_queue_size; });
+}
+
+void TCPDataDispatcher::process()
+{
+ try {
+ const int timeout_ms = 1000;
+
+ while (m_running) {
+ // Add a new TCPConnection to the list, constructing it from the client socket
+ auto sock = m_listener_socket.accept(timeout_ms);
+ if (sock.valid()) {
+ m_connections.emplace(m_connections.begin(), move(sock));
+ }
+ }
+ }
+ catch (const std::runtime_error& e) {
+ m_exception_data = string("TCPDataDispatcher error: ") + e.what();
+ m_running = false;
+ }
+}
+
+TCPReceiveServer::TCPReceiveServer(size_t blocksize) :
+ m_blocksize(blocksize)
+{
+}
+
+void TCPReceiveServer::start(int listen_port, const std::string& address)
+{
+ m_listener_socket.listen(listen_port, address);
+
+ m_running = true;
+ m_listener_thread = std::thread(&TCPReceiveServer::process, this);
+}
+
+TCPReceiveServer::~TCPReceiveServer()
+{
+ m_running = false;
+ if (m_listener_thread.joinable()) {
+ m_listener_thread.join();
+ }
+}
+
+vector<uint8_t> TCPReceiveServer::receive()
+{
+ vector<uint8_t> buffer;
+ m_queue.try_pop(buffer);
+
+ // we can ignore try_pop()'s return value, because
+ // if it is unsuccessful the buffer is not touched.
+ return buffer;
+}
+
+void TCPReceiveServer::process()
+{
+ constexpr int timeout_ms = 1000;
+ constexpr int disconnect_timeout_ms = 10000;
+ constexpr int max_num_timeouts = disconnect_timeout_ms / timeout_ms;
+
+ while (m_running) {
+ auto sock = m_listener_socket.accept(timeout_ms);
+
+ int num_timeouts = 0;
+
+ while (m_running and sock.valid()) {
+ try {
+ vector<uint8_t> buf(m_blocksize);
+ ssize_t r = sock.recv(buf.data(), buf.size(), 0, timeout_ms);
+ if (r < 0) {
+ throw logic_error("Invalid recv return value");
+ }
+ else {
+ buf.resize(r);
+ m_queue.push(move(buf));
+ }
+ }
+ catch (const TCPSocket::Interrupted&) {
+ break;
+ }
+ catch (const TCPSocket::Timeout&) {
+ num_timeouts++;
+ }
+
+ if (num_timeouts > max_num_timeouts) {
+ sock.close();
+ }
+ }
+ }
+}
+
+}
diff --git a/lib/Socket.h b/lib/Socket.h
new file mode 100644
index 0000000..8bb7fe1
--- /dev/null
+++ b/lib/Socket.h
@@ -0,0 +1,294 @@
+/*
+ Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the
+ Queen in Right of Canada (Communications Research Center Canada)
+
+ Copyright (C) 2019
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://www.opendigitalradio.org
+ */
+/*
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <https://www.gnu.org/licenses/>.
+*/
+
+#pragma once
+
+#ifdef HAVE_CONFIG_H
+# include "config.h"
+#endif
+
+#include "ThreadsafeQueue.h"
+#include <cstdlib>
+#include <iostream>
+#include <vector>
+#include <atomic>
+#include <thread>
+#include <list>
+
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <unistd.h>
+#include <netdb.h>
+#include <arpa/inet.h>
+#include <pthread.h>
+#define SOCKET int
+#define INVALID_SOCKET -1
+#define SOCKET_ERROR -1
+
+
+namespace Socket {
+
+struct InetAddress {
+ struct sockaddr_storage addr;
+
+ struct sockaddr *as_sockaddr() { return reinterpret_cast<sockaddr*>(&addr); };
+
+ void resolveUdpDestination(const std::string& destination, int port);
+};
+
+/** This class represents a UDP packet.
+ *
+ * A UDP packet contains a payload (sequence of bytes) and an address. For
+ * outgoing packets, the address is the destination address. For incoming
+ * packets, the address tells the user from what source the packet arrived from.
+ */
+class UDPPacket
+{
+ public:
+ UDPPacket();
+ UDPPacket(size_t initSize);
+
+ std::vector<uint8_t> buffer;
+ InetAddress address;
+};
+
+/**
+ * This class represents a socket for sending and receiving UDP packets.
+ *
+ * A UDP socket is the sending or receiving point for a packet delivery service.
+ * Each packet sent or received on a datagram socket is individually
+ * addressed and routed. Multiple packets sent from one machine to another may
+ * be routed differently, and may arrive in any order.
+ */
+class UDPSocket
+{
+ public:
+ /** Create a new socket that will not be bound to any port. To be used
+ * for data output.
+ */
+ UDPSocket();
+ /** Create a new socket.
+ * @param port The port number on which the socket will be bound
+ */
+ UDPSocket(int port);
+ /** Create a new socket.
+ * @param port The port number on which the socket will be bound
+ * @param name The IP address on which the socket will be bound.
+ * It is used to bind the socket on a specific interface if
+ * the computer have many NICs.
+ */
+ UDPSocket(int port, const std::string& name);
+ ~UDPSocket();
+ UDPSocket(const UDPSocket& other) = delete;
+ const UDPSocket& operator=(const UDPSocket& other) = delete;
+
+ /** Close the already open socket, and create a new one. Throws a runtime_error on error. */
+ void reinit(int port);
+ void reinit(int port, const std::string& name);
+
+ void close(void);
+ void send(UDPPacket& packet);
+ void send(const std::vector<uint8_t>& data, InetAddress destination);
+ UDPPacket receive(size_t max_size);
+ void joinGroup(const char* groupname, const char* if_addr = nullptr);
+ void setMulticastSource(const char* source_addr);
+ void setMulticastTTL(int ttl);
+
+ /** Set blocking mode. By default, the socket is blocking.
+ * throws a runtime_error on error.
+ */
+ void setBlocking(bool block);
+
+ protected:
+ SOCKET m_sock;
+};
+
+/* Threaded UDP receiver */
+class UDPReceiver {
+ public:
+ UDPReceiver();
+ ~UDPReceiver();
+ UDPReceiver(const UDPReceiver&) = delete;
+ UDPReceiver operator=(const UDPReceiver&) = delete;
+
+ // Start the receiver in a separate thread
+ void start(int port, const std::string& bindto, const std::string& mcastaddr, size_t max_packets_queued);
+
+ // Get the data contained in a UDP packet, blocks if none available
+ // In case of error, throws a runtime_error
+ std::vector<uint8_t> get_packet_buffer(void);
+
+ private:
+ void m_run(void);
+
+ int m_port = 0;
+ std::string m_bindto;
+ std::string m_mcastaddr;
+ size_t m_max_packets_queued = 1;
+ std::thread m_thread;
+ std::atomic<bool> m_stop = ATOMIC_VAR_INIT(false);
+ ThreadsafeQueue<UDPPacket> m_packets;
+ UDPSocket m_sock;
+};
+
+class TCPSocket {
+ public:
+ TCPSocket();
+ ~TCPSocket();
+ TCPSocket(const TCPSocket& other) = delete;
+ TCPSocket& operator=(const TCPSocket& other) = delete;
+ TCPSocket(TCPSocket&& other);
+ TCPSocket& operator=(TCPSocket&& other);
+
+ bool valid(void) const;
+ void connect(const std::string& hostname, int port);
+ void listen(int port, const std::string& name);
+ void close(void);
+
+ /* throws a runtime_error on failure, an invalid socket on timeout */
+ TCPSocket accept(int timeout_ms);
+
+ /* returns -1 on error, doesn't work on nonblocking sockets */
+ ssize_t sendall(const void *buffer, size_t buflen);
+
+ /** Send data over the TCP connection.
+ * @param data The buffer that will be sent.
+ * @param size Number of bytes to send.
+ * @param timeout_ms number of milliseconds before timeout, or 0 for infinite timeout
+ * return number of bytes sent, 0 on timeout, or throws runtime_error.
+ */
+ ssize_t send(const void* data, size_t size, int timeout_ms=0);
+
+ /* Returns number of bytes read, 0 on disconnect. Throws a
+ * runtime_error on error */
+ ssize_t recv(void *buffer, size_t length, int flags);
+
+ class Timeout {};
+ class Interrupted {};
+ /* Returns number of bytes read, 0 on disconnect or refused connection.
+ * Throws a Timeout on timeout, Interrupted on EINTR, a runtime_error
+ * on error
+ */
+ ssize_t recv(void *buffer, size_t length, int flags, int timeout_ms);
+
+ private:
+ explicit TCPSocket(int sockfd);
+ explicit TCPSocket(int sockfd, InetAddress remote_address);
+ SOCKET m_sock = -1;
+
+ InetAddress m_remote_address;
+
+ friend class TCPClient;
+};
+
+/* Implements a TCP receiver that auto-reconnects on errors */
+class TCPClient {
+ public:
+ void connect(const std::string& hostname, int port);
+
+ /* Returns numer of bytes read, 0 on auto-reconnect, -1
+ * on interruption.
+ * Throws a runtime_error on error */
+ ssize_t recv(void *buffer, size_t length, int flags, int timeout_ms);
+
+ private:
+ void reconnect(void);
+ TCPSocket m_sock;
+ std::string m_hostname;
+ int m_port;
+};
+
+/* Helper class for TCPDataDispatcher, contains a queue of pending data and
+ * a sender thread. */
+class TCPConnection
+{
+ public:
+ TCPConnection(TCPSocket&& sock);
+ TCPConnection(const TCPConnection&) = delete;
+ TCPConnection& operator=(const TCPConnection&) = delete;
+ ~TCPConnection();
+
+ ThreadsafeQueue<std::vector<uint8_t> > queue;
+
+ private:
+ std::atomic<bool> m_running;
+ std::thread m_sender_thread;
+ TCPSocket m_sock;
+
+ void process(void);
+};
+
+/* Send a TCP stream to several destinations, and automatically disconnect destinations
+ * whose buffer overflows.
+ */
+class TCPDataDispatcher
+{
+ public:
+ TCPDataDispatcher(size_t max_queue_size);
+ ~TCPDataDispatcher();
+ TCPDataDispatcher(const TCPDataDispatcher&) = delete;
+ TCPDataDispatcher& operator=(const TCPDataDispatcher&) = delete;
+
+ void start(int port, const std::string& address);
+ void write(const std::vector<uint8_t>& data);
+
+ private:
+ void process();
+
+ size_t m_max_queue_size;
+
+ std::atomic<bool> m_running;
+ std::string m_exception_data;
+ std::thread m_listener_thread;
+ TCPSocket m_listener_socket;
+ std::list<TCPConnection> m_connections;
+};
+
+/* A TCP Server to receive data, which abstracts the handling of connects and disconnects.
+ */
+class TCPReceiveServer {
+ public:
+ TCPReceiveServer(size_t blocksize);
+ ~TCPReceiveServer();
+ TCPReceiveServer(const TCPReceiveServer&) = delete;
+ TCPReceiveServer& operator=(const TCPReceiveServer&) = delete;
+
+ void start(int listen_port, const std::string& address);
+
+ // Return a vector that contains up to blocksize bytes of data, or
+ // and empty vector if no data is available.
+ std::vector<uint8_t> receive();
+
+ private:
+ void process();
+
+ size_t m_blocksize = 0;
+ ThreadsafeQueue<std::vector<uint8_t> > m_queue;
+ std::atomic<bool> m_running;
+ std::string m_exception_data;
+ std::thread m_listener_thread;
+ TCPSocket m_listener_socket;
+};
+
+}
diff --git a/lib/ThreadsafeQueue.h b/lib/ThreadsafeQueue.h
new file mode 100644
index 0000000..62f4c96
--- /dev/null
+++ b/lib/ThreadsafeQueue.h
@@ -0,0 +1,176 @@
+/*
+ Copyright (C) 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in
+ Right of Canada (Communications Research Center Canada)
+
+ Copyright (C) 2018
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ An implementation for a threadsafe queue, depends on C++11
+
+ When creating a ThreadsafeQueue, one can specify the minimal number
+ of elements it must contain before it is possible to take one
+ element out.
+ */
+/*
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#include <mutex>
+#include <condition_variable>
+#include <queue>
+#include <utility>
+
+/* This queue is meant to be used by two threads. One producer
+ * that pushes elements into the queue, and one consumer that
+ * retrieves the elements.
+ *
+ * The queue can make the consumer block until an element
+ * is available, or a wakeup requested.
+ */
+
+/* Class thrown by blocking pop to tell the consumer
+ * that there's a wakeup requested. */
+class ThreadsafeQueueWakeup {};
+
+template<typename T>
+class ThreadsafeQueue
+{
+public:
+ /* Push one element into the queue, and notify another thread that
+ * might be waiting.
+ *
+ * returns the new queue size.
+ */
+ size_t push(T const& val)
+ {
+ std::unique_lock<std::mutex> lock(the_mutex);
+ the_queue.push(val);
+ size_t queue_size = the_queue.size();
+ lock.unlock();
+
+ the_rx_notification.notify_one();
+
+ return queue_size;
+ }
+
+ size_t push(T&& val)
+ {
+ std::unique_lock<std::mutex> lock(the_mutex);
+ the_queue.emplace(std::move(val));
+ size_t queue_size = the_queue.size();
+ lock.unlock();
+
+ the_rx_notification.notify_one();
+
+ return queue_size;
+ }
+
+ /* Push one element into the queue, but wait until the
+ * queue size goes below the threshold.
+ *
+ * Notify waiting thread.
+ *
+ * returns the new queue size.
+ */
+ size_t push_wait_if_full(T const& val, size_t threshold)
+ {
+ std::unique_lock<std::mutex> lock(the_mutex);
+ while (the_queue.size() >= threshold) {
+ the_tx_notification.wait(lock);
+ }
+ the_queue.push(val);
+ size_t queue_size = the_queue.size();
+ lock.unlock();
+
+ the_rx_notification.notify_one();
+
+ return queue_size;
+ }
+
+ /* Trigger a wakeup event on a blocking consumer, which
+ * will receive a ThreadsafeQueueWakeup exception.
+ */
+ void trigger_wakeup(void)
+ {
+ std::unique_lock<std::mutex> lock(the_mutex);
+ wakeup_requested = true;
+ lock.unlock();
+ the_rx_notification.notify_one();
+ }
+
+ /* Send a notification for the receiver thread */
+ void notify(void)
+ {
+ the_rx_notification.notify_one();
+ }
+
+ bool empty() const
+ {
+ std::unique_lock<std::mutex> lock(the_mutex);
+ return the_queue.empty();
+ }
+
+ size_t size() const
+ {
+ std::unique_lock<std::mutex> lock(the_mutex);
+ return the_queue.size();
+ }
+
+ bool try_pop(T& popped_value)
+ {
+ std::unique_lock<std::mutex> lock(the_mutex);
+ if (the_queue.empty()) {
+ return false;
+ }
+
+ popped_value = the_queue.front();
+ the_queue.pop();
+
+ lock.unlock();
+ the_tx_notification.notify_one();
+
+ return true;
+ }
+
+ void wait_and_pop(T& popped_value, size_t prebuffering = 1)
+ {
+ std::unique_lock<std::mutex> lock(the_mutex);
+ while (the_queue.size() < prebuffering and
+ not wakeup_requested) {
+ the_rx_notification.wait(lock);
+ }
+
+ if (wakeup_requested) {
+ wakeup_requested = false;
+ throw ThreadsafeQueueWakeup();
+ }
+ else {
+ std::swap(popped_value, the_queue.front());
+ the_queue.pop();
+
+ lock.unlock();
+ the_tx_notification.notify_one();
+ }
+ }
+
+private:
+ std::queue<T> the_queue;
+ mutable std::mutex the_mutex;
+ std::condition_variable the_rx_notification;
+ std::condition_variable the_tx_notification;
+ bool wakeup_requested = false;
+};
+
diff --git a/src/AVTEDIInput.cpp b/src/AVTEDIInput.cpp
deleted file mode 100644
index f8a9e60..0000000
--- a/src/AVTEDIInput.cpp
+++ /dev/null
@@ -1,738 +0,0 @@
-/* ------------------------------------------------------------------
- * Copyright (C) 2017 AVT GmbH - Fabien Vercasson
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied.
- * See the License for the specific language governing permissions
- * and limitations under the License.
- * -------------------------------------------------------------------
- */
-
-#include "AVTEDIInput.h"
-#include <cstring>
-#include <cstdio>
-#include <stdint.h>
-#include <limits.h>
-
-#include "crc.h"
-#include "OrderedQueue.h"
-
-extern "C" {
-#include <fec.h>
-}
-
-#define SUBCH_QUEUE_SIZE (50) /* In 24ms frames. Intermediate buffer */
-
-#define RS_DECODE 1 /* Set to 0 to disable rs decoding */
-#define RS_TEST1 0 /* Remove one fragment on each PFT */
-#define RS_TEST2 0 /* Remove regularily fragments */
-#define RS_TEST2_NBDROP 3 /* For RS_TEST2, nb packet remove on each time */
-
-#define PRINTF(fmt, A...) fprintf(stderr, fmt, ##A)
-//#define PRINTF(x ...)
-#define INFO(fmt, A...) fprintf(stderr, "AVT EDI: " fmt, ##A)
-//#define DEBUG(fmt, A...) fprintf(stderr, "AVT EDI: " fmt, ##A)
-#define DEBUG(X...)
-#define ERROR(fmt, A...) fprintf(stderr, "AVT EDI: ERROR " fmt, ##A)
-
-static int hideFirstPFTErrors = 30; /* Hide the errors that can occurs on */
- /* the first PFT, as they are likely incomplete */
-
-#define TAG_NAME_DETI (('d'<<24)|('e'<<16)|('t'<<8)|('i'))
-#define TAG_NAME_EST (('e'<<24)|('s'<<16)|('t'<<8))
-
-/* ------------------------------------------------------------------
-static void _dump(const uint8_t* buf, int size)
-{
- for( int i = 0 ; i < size ; i ++)
- {
- PRINTF("%02X ", buf[i]);
- if( (i+1) % 16 == 0 ) PRINTF("\n");
- }
- if( size % 16 != 0 ) PRINTF("\n");
-}
-*/
-
-/* ------------------------------------------------------------------
- *
- */
-static uint32_t unpack2(const uint8_t* buf)
-{
- return( buf[0] << 8 |
- buf[1]);
-}
-
-/* ------------------------------------------------------------------
- *
- */
-static uint32_t unpack3(const uint8_t* buf)
-{
- return( buf[0] << 16 |
- buf[1] << 8 |
- buf[2]);
-}
-
-/* ------------------------------------------------------------------
- *
- */
-static uint32_t unpack4(const uint8_t* buf)
-{
- return( buf[0] << 24 |
- buf[1] << 16 |
- buf[2] << 8 |
- buf[3]);
-}
-
-/* ------------------------------------------------------------------
- * bitpos 0 : left most bit.
- *
- */
-static uint32_t unpack1bit(uint8_t byte, int bitpos)
-{
- return (byte & 1 << (7-bitpos)) > (7-bitpos);
-}
-
-
-/* ------------------------------------------------------------------
- *
- */
-static bool _checkCRC(uint8_t* buf, size_t length)
-{
- if (length <= 2) return false;
-
- uint16_t CRC = unpack2(buf+length-2);
-
- uint16_t crc = 0xffff;
- crc = crc16(crc, buf, length-2);
- crc ^= 0xffff;
-
- return (CRC == crc);
-}
-
-/* ------------------------------------------------------------------
- *
- */
-AVTEDIInput::AVTEDIInput(uint32_t fragmentTimeoutMs)
- : _fragmentTimeoutMs(fragmentTimeoutMs)
-{
- _subChannelQueue = new OrderedQueue(5000, SUBCH_QUEUE_SIZE);
-}
-
-/* ------------------------------------------------------------------
- *
- */
-AVTEDIInput::~AVTEDIInput()
-{
- PFTIterator it = _pft.begin();
- while (it != _pft.end()) {
- delete it->second;
- it++;
- }
- delete _subChannelQueue;
-}
-
-/* ------------------------------------------------------------------
- *
- */
-bool AVTEDIInput::pushData(uint8_t* buf, size_t length)
-{
- bool identified = false;
-
- if (length >= 12 && buf[0] == 'P' && buf[1] == 'F')
- {
-
-#if RS_TEST2
- static int count=0;
- if (++count%1421<RS_TEST2_NBDROP)
- identified = true;
- else
-#endif // RS_TEST2
- identified = _pushPFTFrag(buf, length);
-
- }
- else if (length >= 10 && buf[0] == 'A' && buf[1] == 'F')
- {
- identified = _pushAF(buf, length, false);
- }
- return identified;
-}
-
-/* ------------------------------------------------------------------
- *
- */
-size_t AVTEDIInput::popFrame(std::vector<uint8_t>& data, int32_t& frameNumber)
-{
- return _subChannelQueue->pop(data, &frameNumber);
-}
-
-/* ------------------------------------------------------------------
- *
- */
-bool AVTEDIInput::_pushPFTFrag(uint8_t* buf, size_t length)
-{
- PFTFrag* frag = new PFTFrag(buf, length);
- bool isValid = frag->isValid();
- if (!isValid) {
- delete frag;
- } else {
- // Find PFT
- PFT* pft = NULL;
- PFTIterator it = _pft.find(frag->Pseq());
- if (it != _pft.end()) {
- pft = it->second;
- } else {
- // create PFT is new
- pft = new PFT(frag->Pseq(), frag->Fcount());
- if (_pft.insert(std::make_pair(frag->Pseq(), pft)).second == false)
- {
- // Not inserted
- delete pft;
- pft = NULL;
- }
- it = _pft.find(frag->Pseq());
- }
-
- if (pft) {
- // Add frag to PFT
- pft->pushPFTFrag(frag);
-
- // If the PFT is complete, extract the AF
- if (pft->complete()) {
- std::vector<uint8_t> af;
- bool ok = pft->extractAF(af);
-
- if (ok) {
- _pushAF(af.data(), af.size(), ok);
- } else {
- ERROR("AF Frame Corrupted, Size=%zu\n", af.size());
- //_dump(af.data(), 10);
- }
-
- _pft.erase(it);
- delete pft;
- }
- }
- }
-
- // Check old incomplete PFT to either try to extract AF or discard it
- // TODO
- const auto now = std::chrono::steady_clock::now();
- const auto timeout_duration = std::chrono::milliseconds(_fragmentTimeoutMs);
-
- PFTIterator it = _pft.begin();
- while (it != _pft.end()) {
- PFT* pft = it->second;
- bool erased = false;
- if (pft) {
- const auto creation = pft->creation();
- const auto diff = now - creation;
- if (diff > timeout_duration) {
- //DEBUG("PFT timeout\n");
- std::vector<uint8_t> af;
- bool ok = pft->extractAF(af);
- if (ok) {
- _pushAF(af.data(), af.size(), ok);
- } else {
- //ERROR("AF Frame CorruptedSize=%zu\n", af.size());
- //_dump(af.data(), 10);
- }
-
- it = _pft.erase(it);
- delete pft;
- erased = true;
- }
- }
- if (!erased) ++it;
- }
-
- return isValid;
-}
-
-/* ------------------------------------------------------------------
- *
- */
-bool AVTEDIInput::_pushAF(uint8_t* buf, size_t length, bool checked)
-{
- bool ok = checked;
-
- // Check the AF integrity
- if (!ok) {
- // EDI specific, must have a CRC.
- if (length >= 12) {
- ok = (buf[0] == 'A' && buf[1] == 'F');
- ok &= _checkCRC(buf, length);
- }
- }
-
- int index = 0;
-
- index += 2;
- uint32_t LEN = unpack4(buf+index); index += 4;
- ok = (LEN == length-12);
- //uint32_t SEQ = unpack2(buf+index); index += 2;
-
- if (ok) {
- uint32_t CF = unpack1bit(buf[index], 0);
- uint32_t MAJ = (buf[index]&0x70) >> 4;
- uint32_t MIN = (buf[index]&0x0F);
- index += 1;
- uint32_t PT = buf[index]; index += 1;
-
- // EDI specific
- ok = (CF == 1 && PT == 'T' && MAJ == 1 && MIN == 0);
-
-// DEBUG("AF Header: LEN=%u SEQ=%u CF=%u MAJ=%u MIN=%u PT=%c ok=%d\n",
-// LEN, SEQ, CF, MAJ, MIN, PT, ok);
- }
-
- if (ok) {
- // Extract the first stream and FrameCount from AF
- int tagIndex = index;
- uint32_t frameCount = 0;
- bool frameCountFound = false;
- int est0Index = 0;
- size_t est0Length = 0;
- // Iterate through tags
- while (tagIndex < (ssize_t)length - 2/*CRC*/ - 8/*Min tag length*/ && (!frameCountFound || est0Index==0) )
- {
- uint32_t tagName = unpack4(buf+tagIndex); tagIndex += 4;
- uint32_t tagLen = unpack4(buf+tagIndex); tagIndex += 4;
- uint32_t tagLenByte = (tagLen+7)/8;
-// DEBUG("TAG %c%c%c%c size %u bits %u bytes\n",
-// tagName>>24&0xFF, tagName>>16&0xFF, tagName>>8&0xFF, tagName&0xFF,
-// tagLen, tagLenByte);
-
- if (tagName == TAG_NAME_DETI) {
- uint32_t FCTH = buf[tagIndex] & 0x1F;
- uint32_t FCT = buf[tagIndex+1];
- frameCount = FCTH * 250 + FCT;
- frameCountFound = true;
-// DEBUG("frameCount=%u\n", frameCount);
- } else if ((tagName & 0xFFFFFF00) == TAG_NAME_EST) {
- est0Index = tagIndex+3 /*3 bytes SSTC*/;
- est0Length = tagLenByte-3;
-// DEBUG("Stream found at index %u, size=%zu\n", est0Index, est0Length);
- }
-
- tagIndex += tagLenByte;
- }
- if (frameCountFound && est0Index !=0) {
- _subChannelQueue->push(frameCount, buf+est0Index, est0Length);
- } else {
- ok = false;
- }
- }
-
- return ok;
-}
-
-/* ------------------------------------------------------------------
- * ------------------------------------------------------------------
- * ------------------------------------------------------------------
- * ------------------------------------------------------------------
- */
-
-/* ------------------------------------------------------------------
- *
- */
-//static int nbPFTFrag = 0;
-PFTFrag::PFTFrag(uint8_t* buf, size_t length)
-{
- //DEBUG("+ PFTFrag %d\n", ++nbPFTFrag);
- _valid = _parse(buf, length);
-}
-
-/* ------------------------------------------------------------------
- *
- */
-PFTFrag::~PFTFrag()
-{
- //DEBUG("- PFTFrag %d\n", --nbPFTFrag);
-}
-
-/* ------------------------------------------------------------------
- *
- */
-bool PFTFrag::_parse(uint8_t* buf, size_t length)
-{
- int index = 0;
-
- // Parse PFT Fragment Header (ETSI TS 102 821 V1.4.1 ch7.1)
- index += 2; // Psync
-
- _Pseq = unpack2(buf+index); index += 2;
- _Findex = unpack3(buf+index); index += 3;
- _Fcount = unpack3(buf+index); index += 3;
- _FEC = unpack1bit(buf[index], 0);
- _Addr = unpack1bit(buf[index], 1);
- _Plen = unpack2(buf+index) & 0x3FFF; index += 2;
-
- // Optional RS Header
- _RSk = 0;
- _RSz = 0;
- if (_FEC) {
- _RSk = buf[index]; index += 1;
- _RSz = buf[index]; index += 1;
- }
-
- // Optional transport header
- _Source = 0;
- _Dest = 0;
- if (_Addr) {
- _Source = unpack2(buf+index); index += 2;
- _Dest = unpack2(buf+index); index += 2;
- }
-
- index += 2;
- bool isValid = (_FEC==0) || _checkCRC(buf, index);
- isValid &= length == index + _Plen;
-
- if (!isValid) {
-// DEBUG("PFT isValid=%d Pseq=%u Findex=%u Fcount=%u FEC=%u "
-// "Addr=%u Plen=%u",
-// isValid, _Pseq, _Findex, _Fcount, _FEC,
-// _Addr, _Plen);
- if (_FEC) PRINTF(" RSk=%u RSz=%u", _RSk, _RSz);
- if (_Addr) PRINTF(" Source=%u Dest=%u", _Source, _Dest);
- PRINTF("\n");
- }
-
- if (isValid) {
- _payload.resize(_Plen);
- memcpy(_payload.data(), buf+index, _Plen);
- }
-
- return isValid;
-}
-
-/* ------------------------------------------------------------------
- * ------------------------------------------------------------------
- * ------------------------------------------------------------------
- * ------------------------------------------------------------------
- */
-void* PFT::_rs_handler = NULL;
-
-/* ------------------------------------------------------------------
- *
- */
-//static int nbPFT = 0;
-PFT::PFT(uint32_t Pseq, uint32_t Fcount)
- : _frags(NULL)
- , _Pseq(Pseq)
- , _Fcount(Fcount)
- , _Plen(0)
- , _nbFrag(0)
- , _RSk(0)
- , _RSz(0)
- , _cmax(0)
- , _rxmin(0)
- , _creation(std::chrono::steady_clock::now())
-{
-// DEBUG("+ PFT %d\n", ++nbPFT);
- if (Fcount > 0) {
- _frags = new PFTFrag* [Fcount];
- memset(_frags, 0, Fcount*sizeof(PFTFrag*));
- }
-}
-
-/* ------------------------------------------------------------------
- *
- */
-PFT::~PFT()
-{
-// DEBUG("- PFT %d\n", --nbPFT);
- if (_frags) {
- for (size_t i = 0 ; i < _Fcount ; i++) {
- delete _frags[i];
- }
- delete [] _frags;
- }
-}
-
-/* ------------------------------------------------------------------
- * static
- */
-void PFT::_initRSDecoder()
-{
-#if RS_DECODE
- if (!_rs_handler) {
- // From ODR-DabMux: PFT.h/cpp and ReedSolomon.h/cpp
-
- // Create the RS(k+p,k) encoder
- const int firstRoot = 1; // Discovered by analysing EDI dump
- const int gfPoly = 0x11d;
-
- // The encoding has to be 255, 207 always, because the chunk has to
- // be padded at the end, and not at the beginning as libfec would
- // do
- const int N = 255;
- const int K = 207;
- const int primElem = 1;
- const int symsize = 8;
- const int nroots = N - K; // For EDI PFT, this must be 48
- const int pad = ((1 << symsize) - 1) - N; // is 255-N
-
- _rs_handler = init_rs_char(symsize, gfPoly, firstRoot, primElem, nroots, pad);
-
-
-/* TEST RS CODE */
-#if 0
-
- // Populate data
- uint8_t data[255];
- memset(data, 0x00, 255);
- for (int i=0;i<207;i++) data[i] = i%10;
-
- // Add RS Code
- encode_rs_char(_rs_handler, data, data+207);
- _dump(data, 255);
-
- // Disturb data
- for (int i=50; i<50+24; i++) data[i]+=0x50;
-
- // Correct data
- int nbErr = decode_rs_char(_rs_handler, data, NULL, 0);
- printf("nbErr=%d\n", nbErr);
- _dump(data, 255);
-
- // Check data
- for (int i=0;i<207;i++) {
- if (data[i] != i%10) {
- printf("Error position %d %hhu != %d\n", i, data[i], i%10);
- }
- }
-
- // STOP (sorry :-| )
- int* i=0;
- *i = 9;
-#endif // 0
- }
-#endif
-}
-
-/* ------------------------------------------------------------------
- *
- */
-void PFT::pushPFTFrag(PFTFrag* frag)
-{
- uint32_t Findex = frag->Findex();
-#if RS_TEST1
- if (Findex != 0 && _frags[Findex] == NULL) /* TEST */
-#else
- if (_frags[Findex] == NULL)
-#endif
- {
- _frags[Findex] = frag;
- _nbFrag++;
-
- // Calculate the minimum number of fragment necessary to apply FEC
- // This can't be done with the last fragment that does may have a smaller size
- // ETSI TS 102 821 V1.4.1 ch 7.4.4
- if (_Plen == 0 && (Findex == 0 || Findex < (_Fcount-1)))
- {
- _Plen = frag->Plen();
- }
-
- if (_cmax == 0 && frag->FEC() && (Findex == 0 || Findex < (_Fcount-1)) && _Plen>0)
- {
- _RSk = frag->RSk();
- _RSz = frag->RSz();
- _cmax = (_Fcount*_Plen) / (_RSk+48);
- _rxmin = _Fcount - (_cmax*48)/_Plen;
- }
- } else {
- // Already received, delete the fragment
- delete frag;
- }
-}
-
-/* ------------------------------------------------------------------
- *
- */
-bool PFT::complete()
-{
-#if RS_TEST1
- return _nbFrag == _Fcount-1;
-#else
- return _nbFrag == _Fcount;
-#endif
-}
-
-/* ------------------------------------------------------------------
- *
- */
-bool PFT::_canAttemptToDecode()
-{
- if (complete()) return true;
-
- if (_cmax>0 && _nbFrag >= _rxmin) return true;
-
- return false;
-}
-
-/* ------------------------------------------------------------------
- *
- */
-bool PFT::extractAF(std::vector<uint8_t>& afdata)
-{
- bool ok = false;
-// DEBUG("extractAF from PFT %u. Fcount=%u nbFrag=%u Plen=%u cmax=%u rxmin=%u RSk=%u RSz=%u\n",
-// _Pseq, _Fcount, _nbFrag, _Plen, _cmax, _rxmin, _RSk, _RSz);
-
- if (_canAttemptToDecode()) {
- int totCorrectedErr = 0;
-
- if (_cmax > 0) // FEC present.
- {
- uint8_t* p_data_w;
- uint8_t* p_data_r;
- size_t data_len = 0;
-
- // Re-assemble RS block
- uint8_t rs_block[_Plen*_Fcount];
- int eras_pos[_cmax][/*48*/255]; /* 48 theoritically but ... */
- int no_eras[_cmax];
- memset(no_eras, 0, sizeof(no_eras));
-
- p_data_w = rs_block;
- for (size_t j = 0; j < _Fcount; ++j) {
- if (!_frags[j]) // fill with zeros if fragment is missing
- {
- for (size_t k = 0; k < _Plen; k++) {
- size_t pos = k * _Fcount;
- p_data_w[pos] = 0x00;
- size_t chunk = pos / (_RSk+48);
- size_t chunkpos = (pos) % (_RSk+48);
- if (chunkpos > _RSk) {
- chunkpos += (207-_RSk);
- }
- eras_pos[chunk][no_eras[chunk]] = chunkpos;
- no_eras[chunk]++;
- }
- } else {
- uint8_t* p_data_r = _frags[j]->payload();
- for (size_t k = 0; k < _frags[j]->Plen(); k++)
- p_data_w[k * _Fcount] = *p_data_r++;
- for (size_t k = _frags[j]->Plen(); k < _Plen; k++)
- p_data_w[k * _Fcount] = 0x00;
- }
- p_data_w++;
- }
-
- // Apply RS Code
-#if RS_DECODE
- uint8_t rs_chunks[255 * _cmax];
- _initRSDecoder();
- if (_rs_handler) {
- size_t k = _RSk;
- memset(rs_chunks, 0, sizeof(rs_chunks));
- p_data_w = rs_chunks;
- p_data_r = rs_block;
- for (size_t j = 0; j < _cmax; j++) {
- memcpy(p_data_w, p_data_r, k);
- p_data_w += k;
- p_data_r += k;
- if (k < 207)
- memset(p_data_w, 0, 207 - k);
- p_data_w += 207 - k;
- memcpy(p_data_w, p_data_r, 48);
- p_data_w += 48;
- p_data_r += 48;
- }
-
- p_data_r = rs_chunks;
- for (size_t j = 0 ; j < _cmax && totCorrectedErr != -1 ; j++) {
-#if RS_TEST1 || RS_TEST2
- if (no_eras[j]>0) {
- DEBUG("RS Chuck %d: %d errors\n", j, no_eras[j]);
- }
-#endif
- int nbErr = decode_rs_char(_rs_handler, p_data_r, eras_pos[j], no_eras[j]);
-// int nbErr = decode_rs_char(_rs_handler, p_data_r, NULL, 0);
- if (nbErr >= 0) {
-#if RS_TEST1 || RS_TEST2
- if (nbErr > 0) DEBUG("RS Chuck %d: %d corrections\n", j, nbErr);
-#endif
- totCorrectedErr += nbErr;
- } else {
-#if RS_TEST1 || RS_TEST2
- DEBUG("RS Chuck %d: too many errors\n", j);
-#endif
- totCorrectedErr = -1;
- }
- p_data_r += 255;
- }
-#if RS_TEST1 || RS_TEST2
- if (totCorrectedErr>0) {
- DEBUG("RS corrected %d errors in %d chunks\n", totCorrectedErr, _cmax);
- }
-#endif
- }
-#endif // RS_DECODE
- // Assemble AF frame from rs code
- /* --- re-assemble packet from Reed-Solomon block ----------- */
- afdata.resize(_Plen*_Fcount);
- p_data_w = afdata.data();
-#if RS_DECODE
- p_data_r = rs_chunks;
- for (size_t j = 0; j < _cmax; j++) {
- memcpy(p_data_w, p_data_r, _RSk);
- p_data_w += _RSk;
- p_data_r += 255;
- data_len += _RSk;
- }
-#else
- p_data_r = rs_block;
- for (size_t j = 0; j < _cmax; j++) {
- memcpy(p_data_w, p_data_r, _RSk);
- p_data_w += _RSk;
- p_data_r += _RSk + 48;
- data_len += _RSk;
- }
-#endif // RS_DECODE
- data_len -= _RSz;
- afdata.resize(data_len);
- } else { // No Fec Just assemble packets
- afdata.resize(0);
- for (size_t j = 0; j < _Fcount; ++j) {
- if (_frags[j])
- {
- afdata.insert(afdata.end(),
- _frags[j]->payloadVector().begin(), _frags[j]->payloadVector().end());
- }
- }
- }
-
- // EDI specific, must have a CRC.
- if( afdata.size()>=12 ) {
- ok = _checkCRC(afdata.data(), afdata.size());
- if (ok && totCorrectedErr > 0) {
- if (hideFirstPFTErrors==0) {
- INFO("AF reconstructed from %u/%u PFT fragments\n", _nbFrag, _Fcount);
- }
- }
- if (!ok && totCorrectedErr == -1) {
- if (hideFirstPFTErrors==0) {
- ERROR("Too many errors to reconstruct AF from %u/%u PFT fragments\n", _nbFrag, _Fcount);
- }
- }
- }
- }
- else {
- if (hideFirstPFTErrors==0) {
- ERROR("Not enough fragments to reconstruct AF from %u/%u PFT fragments (min=%u)\n", _nbFrag, _Fcount, _rxmin);
- }
- }
-
- if( hideFirstPFTErrors > 0 ) hideFirstPFTErrors--;
-
- return ok;
-}
diff --git a/src/AVTEDIInput.h b/src/AVTEDIInput.h
deleted file mode 100644
index a882278..0000000
--- a/src/AVTEDIInput.h
+++ /dev/null
@@ -1,188 +0,0 @@
-/* ------------------------------------------------------------------
- * Copyright (C) 2017 AVT GmbH - Fabien Vercasson
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied.
- * See the License for the specific language governing permissions
- * and limitations under the License.
- * -------------------------------------------------------------------
- */
-
-
-/*! \section AVT Input
- *
- * Extract audio frame from EDI frames produced by AVT encoder.
- *
- * The EDI frames are not special, it is just assumed that the audio is transported
- * into the first stream.
- *
- * PFT with spreaded packets is supported. TODO
- * Error correction is applied TODO
- * AF without PFT supported TODO
- * Resend not supported
- *
- * ref: ETSI TS 102 821 V1.4.1
- * ETSI TS 102 693 V1.1.2
- */
-
-#ifndef _AVT_EDI_INPUT_
-#define _AVT_EDI_INPUT_
-
-#include <stdint.h>
-#include <stdio.h>
-#include <string>
-#include <map>
-#include <vector>
-#include <chrono>
-
-class OrderedQueue;
-class PFTFrag;
-class PFT;
-class EDISubCh;
-
-/* ------------------------------------------------------------------
- *
- */
-class AVTEDIInput
-{
- public:
- /*\param fragmentTimeoutMs How long to wait for all fragment before applying FEC or dropping old frames*/
- AVTEDIInput(uint32_t fragmentTimeoutMs = 120);
- AVTEDIInput(const AVTEDIInput&) = delete;
- AVTEDIInput& operator=(const AVTEDIInput&) = delete;
- ~AVTEDIInput();
-
- /*! Push new data to edi decoder
- * \return false is data is not EDI
- */
- bool pushData(uint8_t* buf, size_t length);
-
- /*! Give next available audio frame from EDI
- * \return The size of the buffer. 0 if not data available
- */
- size_t popFrame(std::vector<uint8_t>& data, int32_t& frameNumber);
-
- private:
- uint32_t _fragmentTimeoutMs;
- std::map<int, PFT*> _pft;
- typedef std::map<int, PFT*>::iterator PFTIterator;
-
- OrderedQueue *_subChannelQueue;
-
- bool _pushPFTFrag(uint8_t* buf, size_t length);
- bool _pushAF(uint8_t* buf, size_t length, bool checked);
-};
-
-/* ------------------------------------------------------------------
- *
- */
-class PFTFrag
-{
- public:
- PFTFrag(uint8_t* buf, size_t length);
- ~PFTFrag();
- PFTFrag(const PFTFrag&) = delete;
- PFTFrag& operator=(const PFTFrag&) = delete;
-
- inline bool isValid() { return _valid; }
- inline uint32_t Pseq() { return _Pseq; }
- inline uint32_t Findex() { return _Findex; }
- inline uint32_t Fcount() { return _Fcount; }
- inline uint32_t FEC() { return _FEC; }
- inline uint32_t Plen() { return _Plen; }
- inline uint32_t RSk() { return _RSk; }
- inline uint32_t RSz() { return _RSz; }
- inline uint8_t* payload() { return _payload.data(); }
- inline const std::vector<uint8_t>& payloadVector()
- { return _payload; }
-
- private:
- std::vector<uint8_t> _payload;
-
- uint32_t _Pseq;
- uint32_t _Findex;
- uint32_t _Fcount;
- uint32_t _FEC;
- uint32_t _Addr;
- uint32_t _Plen;
- uint32_t _RSk;
- uint32_t _RSz;
- uint32_t _Source;
- uint32_t _Dest;
- bool _valid;
-
- bool _parse(uint8_t* buf, size_t length);
-};
-
-/* ------------------------------------------------------------------
- *
- */
-class PFT
-{
- public:
- PFT(uint32_t Pseq, uint32_t Fcount);
- ~PFT();
- PFT(const PFT&) = delete;
- PFT& operator=(const PFT&) = delete;
-
- /*! the given frag belongs to the PFT class,
- *! it will be deleted by the class */
- void pushPFTFrag(PFTFrag* frag);
-
- /* \return true if all framgnements are received*/
- bool complete();
-
- /*! try to build the AF with received fragments.
- *! Apply error correction if necessary (missing packets/CRC errors)
- * \return true if the AF is completed
- */
- bool extractAF(std::vector<uint8_t>& afdata);
-
- inline std::chrono::steady_clock::time_point creation()
- { return _creation; }
-
- private:
- PFTFrag** _frags;
- uint32_t _Pseq;
- uint32_t _Fcount;
- uint32_t _Plen;
- uint32_t _nbFrag;
- uint32_t _RSk;
- uint32_t _RSz;
- uint32_t _cmax;
- uint32_t _rxmin;
-
- std::chrono::steady_clock::time_point _creation;
-
- bool _canAttemptToDecode();
-
- static void* _rs_handler;
- static void _initRSDecoder();
-};
-
-/* ------------------------------------------------------------------
- *
- */
-class EDISubCh {
- public:
- EDISubCh(uint8_t* buf, size_t length);
-
- inline uint32_t frameCount() { return _frameCount; }
- inline uint8_t* payload() { return _payload.data(); }
- inline const std::vector<uint8_t>& payloadVector()
- { return _payload; }
-
- private:
- uint32_t _frameCount;
- std::vector<uint8_t> _payload;
-};
-
-#endif // _AVT_EDI_INPUT_
diff --git a/src/AVTInput.cpp b/src/AVTInput.cpp
index 48b2de1..973ed7b 100644
--- a/src/AVTInput.cpp
+++ b/src/AVTInput.cpp
@@ -56,24 +56,25 @@ static uint32_t unpack2(const uint8_t* buf)
return (buf[0] << 8) | buf[1];
}
-AVTInput::AVTInput(const std::string& input_uri, const std::string& output_uri, uint32_t pad_port, size_t jitterBufferSize)
- : _input_uri(input_uri),
- _output_uri(output_uri),
- _pad_port(pad_port),
- _jitterBufferSize(jitterBufferSize),
-
- _output_packet(2048),
- _input_pad_packet(2048),
- _ordered(5000, _jitterBufferSize),
- _lastInfoFrameType(_typeCantExtract)
+AVTInput::AVTInput(const std::string& input_uri,
+ const std::string& output_uri,
+ uint32_t pad_port,
+ size_t jitterBufferSize) :
+ _input_uri(input_uri),
+ _output_uri(output_uri),
+ _pad_port(pad_port),
+ _jitterBufferSize(jitterBufferSize),
+
+ _output_packet(2048),
+ _input_pad_packet(2048),
+ _ordered(5000, _jitterBufferSize),
+ _lastInfoFrameType(_typeCantExtract)
{
}
int AVTInput::prepare(void)
{
- UdpSocket::init();
-
INFO("Open input socket\n");
int ret = _openSocketSrv(&_input_socket, _input_uri.c_str());
@@ -82,7 +83,7 @@ int AVTInput::prepare(void)
ret = _openSocketCli();
}
- if ( ret == 0 && _pad_port > 0) {
+ if (ret == 0 && _pad_port > 0) {
INFO("Open PAD Port %d\n", _pad_port);
char uri[50];
sprintf(uri, "udp://:%d", _pad_port);
@@ -105,13 +106,13 @@ int AVTInput::setDabPlusParameters(int bitrate, int channels, int sample_rate, b
return 1;
}
- if ( sample_rate != 48000 && sample_rate != 32000 ) {
+ if (sample_rate != 48000 && sample_rate != 32000) {
ERROR("Bad sample rate for DAB+ (32000,48000)");
return 1;
}
_dac = sample_rate == 48000 ? AVT_DAC_48 : AVT_DAC_32;
- if ( channels != 1 && channels != 2 ) {
+ if (channels != 1 && channels != 2) {
ERROR("Bad channel number for DAB+ (1,2)");
return 1;
}
@@ -169,7 +170,7 @@ bool AVTInput::_parseURI(const char* uri, std::string& address, long& port)
return true;
}
-int AVTInput::_openSocketSrv(UdpSocket* socket, const char* uri)
+int AVTInput::_openSocketSrv(Socket::UDPSocket* socket, const char* uri)
{
int returnCode = -1;
@@ -178,27 +179,13 @@ int AVTInput::_openSocketSrv(UdpSocket* socket, const char* uri)
if (_parseURI(uri, address, port)) {
returnCode = 0;
- if (socket->create(port) == -1) {
- fprintf(stderr, "can't set port %li on Udp input (%s: %s)\n",
- port, inetErrDesc, inetErrMsg);
- returnCode = -1;
- }
+ socket->reinit(port);
if (!address.empty()) {
- // joinGroup should accept const char*
- if (socket->joinGroup((char*)address.c_str()) == -1) {
- fprintf(stderr,
- "can't join multicast group %s (%s: %s)\n",
- address.c_str(), inetErrDesc, inetErrMsg);
- returnCode = -1;
- }
+ socket->joinGroup(address.c_str());
}
- if (socket->setBlocking(false) == -1) {
- fprintf(stderr, "can't set Udp input socket in non-blocking mode "
- "(%s: %s)\n", inetErrDesc, inetErrMsg);
- returnCode = -1;
- }
+ socket->setBlocking(false);
}
return returnCode;
@@ -216,74 +203,16 @@ int AVTInput::_openSocketCli()
return -1;
}
- if (_output_packet.getAddress().setAddress(address.c_str()) == -1) {
- fprintf(stderr, "Can't set address %s (%s: %s)\n", address.c_str(),
- inetErrDesc, inetErrMsg);
- return -1;
- }
-
- _output_packet.getAddress().setPort(port);
-
- if (_output_socket.create() == -1) {
- fprintf(stderr, "Can't create UDP socket (%s: %s)\n",
- inetErrDesc, inetErrMsg);
- return -1;
- }
-
+ _output_packet.address.resolveUdpDestination(address.c_str(), port);
return 0;
}
-/* ------------------------------------------------------------------
- * From ODR-Dabmux dabInputUdp::dabInputUdpRead
- */
-ssize_t AVTInput::_read(uint8_t* buf, size_t size, bool onlyOnePacket)
-{
- size_t nbBytes = 0;
-
- uint8_t* data = buf;
- UdpPacket _input_packet(2048);
-
- if (_input_packet.getLength() == 0) {
- _input_socket.receive(_input_packet);
- }
-
- while (nbBytes < size) {
- size_t freeSize = size - nbBytes;
- if (_input_packet.getLength() > freeSize) {
- // Not enought place in output
- memcpy(&data[nbBytes], _input_packet.getData(), freeSize);
- nbBytes = size;
- _input_packet.setOffset(_input_packet.getOffset() + freeSize);
- }
- else {
- size_t length = _input_packet.getLength();
- memcpy(&data[nbBytes], _input_packet.getData(), length);
- nbBytes += length;
- _input_packet.setOffset(0);
-
- _input_socket.receive(_input_packet);
- if (_input_packet.getLength() == 0 || onlyOnePacket) {
- break;
- }
- }
- }
- bzero(&data[nbBytes], size - nbBytes);
-
- return nbBytes;
-}
-
-/* ------------------------------------------------------------------
- *
- */
bool AVTInput::_isSTI(const uint8_t* buf)
{
return (memcmp(buf+1, STI_FSync0, sizeof(STI_FSync0)) == 0) ||
(memcmp(buf+1, STI_FSync1, sizeof(STI_FSync1)) == 0);
}
-/* ------------------------------------------------------------------
- *
- */
const uint8_t* AVTInput::_findDABFrameFromUDP(const uint8_t* buf, size_t size,
int32_t& frameNumber, size_t& dataSize)
{
@@ -362,19 +291,13 @@ const uint8_t* AVTInput::_findDABFrameFromUDP(const uint8_t* buf, size_t size,
void AVTInput::_sendCtrlMessage()
{
if (!_output_uri.empty()) {
- uint8_t data[50];
- uint32_t index = 0;
-
- data[index++] = 0xFD;
- data[index++] = 0x07;
- data[index++] = _subChannelIndex;
- data[index++] = _audioMode;
- data[index++] = _dac;
- data[index++] = _monoMode;
-
- _output_packet.setOffset(0);
- _output_packet.setLength(0);
- _output_packet.addData(data, index);
+ std::vector<uint8_t> buf({ 0xFD, 0x07,
+ static_cast<uint8_t>(_subChannelIndex),
+ static_cast<uint8_t>(_audioMode),
+ static_cast<uint8_t>(_dac),
+ static_cast<uint8_t>(_monoMode)});
+
+ _output_packet.buffer = buf;
_output_socket.send(_output_packet);
INFO("Send control packet to encoder\n");
@@ -390,28 +313,21 @@ void AVTInput::_sendCtrlMessage()
* : 1 Byte : Size of pad data
* Pad datas : X Bytes : In natural order, strating with FPAD bytes
*/
-void AVTInput::_sendPADFrame(UdpPacket* packet)
+void AVTInput::_sendPADFrame()
{
- if (packet && _padFrameQueue.size() > 0) {
+ if (_padFrameQueue.size() > 0) {
std::vector<uint8_t> frame(move(_padFrameQueue.front()));
_padFrameQueue.pop();
- uint8_t data[500];
- uint32_t index = 0;
-
- data[index++] = 0xFD;
- data[index++] = 0x18;
- data[index++] = frame.size()+2;
- data[index++] = 0xAD;
- data[index++] = frame.size();
- memcpy( data+index, frame.data(), frame.size());
- index += frame.size();
+ std::vector<uint8_t> buf({ 0xFD, 0x18,
+ static_cast<uint8_t>(frame.size()+2),
+ 0xAD,
+ static_cast<uint8_t>(frame.size())});
- packet->setOffset(0);
- packet->setLength(0);
- packet->addData(data, index);
-
- _input_pad_socket.send(*packet);
+ Socket::UDPPacket packet;
+ packet.buffer = buf;
+ copy(frame.begin(), frame.end(), back_inserter(packet.buffer));
+ _input_pad_socket.send(packet);
}
}
@@ -421,111 +337,71 @@ void AVTInput::_sendPADFrame(UdpPacket* packet)
* Command code : 1 Byte
* * 0x17 = Request for 1 PAD Frame
*/
-void AVTInput::_interpretMessage(const uint8_t* data, size_t size, UdpPacket* packet)
+void AVTInput::_interpretMessage(const uint8_t* data, size_t size)
{
if (size >= 2) {
if (data[0] == 0xFD) {
switch (data[1]) {
case 0x17:
- _sendPADFrame(packet);
+ _sendPADFrame();
break;
}
}
}
}
-/* ------------------------------------------------------------------
- *
- */
bool AVTInput::_checkMessage()
{
- bool dataRecevied = false;
-
- if (_input_pad_packet.getLength() == 0) {
- _input_pad_socket.receive(_input_pad_packet);
+ const auto packet = _input_pad_socket.receive(2048);
+ if (packet.buffer.empty()) {
+ return false;
}
- if (_input_pad_packet.getLength() > 0) {
- _interpretMessage((uint8_t*)_input_pad_packet.getData(), _input_pad_packet.getLength(),
- &_input_pad_packet);
- _input_pad_packet.setOffset(0);
- _input_pad_socket.receive(_input_pad_packet);
-
- dataRecevied = true;
- }
+ _interpretMessage(packet.buffer.data(), packet.buffer.size());
- return dataRecevied;
+ return true;
}
-/* ------------------------------------------------------------------
- *
- */
void AVTInput::_purgeMessages()
{
- bool dataRecevied;
int nb = 0;
- do {
- dataRecevied = false;
- if (_input_pad_packet.getLength() == 0) {
- _input_pad_socket.receive(_input_pad_packet);
- }
-
- if (_input_pad_packet.getLength() > 0) {
- nb++;
- _input_pad_packet.setOffset(0);
- _input_pad_socket.receive(_input_pad_packet);
-
- dataRecevied = true;
- }
- } while (dataRecevied);
+ while (not _input_pad_socket.receive(2048).buffer.empty()) {
+ nb++;
+ }
if (nb>0) DEBUG("%d messages purged\n", nb);
}
-/* ------------------------------------------------------------------
- *
- */
bool AVTInput::_readFrame()
{
- bool dataRecevied = false;
-
- uint8_t readBuf[MAX_AVT_FRAME_SIZE];
int32_t frameNumber;
const uint8_t* dataPtr = NULL;
size_t dataSize = 0;
- std::vector<uint8_t> data;
- size_t readBytes = _read(readBuf, sizeof(readBuf), true/*onlyOnePacket*/);
- if (readBytes > 0)
- {
- dataRecevied = true;
+ auto packet = _input_socket.receive(MAX_AVT_FRAME_SIZE);
+ const size_t readBytes = packet.buffer.size();
+
+ if (readBytes > 0) {
+ const uint8_t *readBuf = packet.buffer.data();
if (readBytes > _dab24msFrameSize) {
// Extract frame data and frame number from buf
dataPtr = _findDABFrameFromUDP(readBuf, readBytes, frameNumber, dataSize);
}
-// if (!data) {
-// // Assuming pure RAW data
-// data = buf;
-// dataSize = _dab24msFrameSize;
-// frameNumber = _dummyFrameNumber++;
-// }
- if (!dataPtr) {
- _info(_typeCantExtract, 0);
- }
+
if (dataPtr) {
- if (dataSize == _dab24msFrameSize ) {
- if( _frameAligned || frameNumber%5 == 0) {
+ if (dataSize == _dab24msFrameSize) {
+ if (_frameAligned or frameNumber%5 == 0) {
#if defined(DISTURB_INPUT)
// Duplicate a frame
- if(frameNumber%250==0) _ordered.push(frameNumber, dataPtr, dataSize);
+ if (frameNumber % 250 == 0) _ordered.push(frameNumber, dataPtr, dataSize);
// Invert 2 frames (content inverted, audio distrubed by this test))
- if( frameNumber % 200 == 0) frameNumber += 10;
- else if( (frameNumber-10) % 200 == 0) frameNumber -= 10;
+ if (frameNumber % 200 == 0) frameNumber += 10;
+ else if ((frameNumber-10) % 200 == 0) frameNumber -= 10;
// Remove a frame (audio distrubed, frame missing)
- if(frameNumber%300 > 5)
+ if (frameNumber % 300 > 5)
#endif
_ordered.push(frameNumber, dataPtr, dataSize);
_frameAligned = true;
@@ -533,14 +409,14 @@ bool AVTInput::_readFrame()
}
else ERROR("Wrong frame size from encoder %zu != %zu\n", dataSize, _dab24msFrameSize);
}
+ else {
+ _info(_typeCantExtract, 0);
+ }
}
- return dataRecevied;
+ return readBytes > 0;
}
-/* ------------------------------------------------------------------
- *
- */
ssize_t AVTInput::getNextFrame(std::vector<uint8_t> &buf)
{
ssize_t nbBytes = 0;
@@ -577,9 +453,6 @@ ssize_t AVTInput::getNextFrame(std::vector<uint8_t> &buf)
return nbBytes;
}
-/* ------------------------------------------------------------------
- *
- */
void AVTInput::pushPADFrame(const uint8_t* buf, size_t size)
{
if (_pad_port == 0) {
@@ -593,17 +466,11 @@ void AVTInput::pushPADFrame(const uint8_t* buf, size_t size)
}
}
-/* ------------------------------------------------------------------
- *
- */
bool AVTInput::padQueueFull()
{
return _padFrameQueue.size() >= MAX_PAD_FRAME_QUEUE_SIZE;
}
-/* ------------------------------------------------------------------
- *
- */
void AVTInput::_info(_frameType type, size_t size)
{
if (_lastInfoFrameType != type || _lastInfoSize != size) {
diff --git a/src/AVTInput.h b/src/AVTInput.h
index ffa632f..62b2248 100644
--- a/src/AVTInput.h
+++ b/src/AVTInput.h
@@ -30,7 +30,7 @@
#pragma once
-#include "UdpSocket.h"
+#include "Socket.h"
#include "OrderedQueue.h"
#include <cstdint>
#include <cstdio>
@@ -101,11 +101,11 @@ class AVTInput
uint32_t _pad_port;
size_t _jitterBufferSize;
- UdpSocket _input_socket;
- UdpSocket _output_socket;
- UdpPacket _output_packet;
- UdpSocket _input_pad_socket;
- UdpPacket _input_pad_packet;
+ Socket::UDPSocket _input_socket;
+ Socket::UDPSocket _output_socket;
+ Socket::UDPPacket _output_packet;
+ Socket::UDPSocket _input_pad_socket;
+ Socket::UDPPacket _input_pad_packet;
OrderedQueue _ordered;
std::queue<std::vector<uint8_t> > _padFrameQueue;
@@ -123,21 +123,15 @@ class AVTInput
uint8_t* _nextFrameIndex = 0;
bool _parseURI(const char* uri, std::string& address, long& port);
- int _openSocketSrv(UdpSocket* socket, const char* uri);
+ int _openSocketSrv(Socket::UDPSocket* socket, const char* uri);
int _openSocketCli();
void _sendCtrlMessage();
- void _sendPADFrame(UdpPacket* packet = NULL);
- void _interpretMessage(const uint8_t* data, size_t size, UdpPacket* packet = NULL);
+ void _sendPADFrame();
+ void _interpretMessage(const uint8_t* data, size_t size);
bool _checkMessage();
void _purgeMessages();
- /*! Read length bytes into buf.
- *
- * \return the number of bytes read.
- */
- ssize_t _read(uint8_t* buf, size_t length, bool onlyOnePacket=false);
-
/*! Test Bytes 1,2,3 for STI detection */
bool _isSTI(const uint8_t* buf);
diff --git a/src/UdpSocket.cpp b/src/UdpSocket.cpp
deleted file mode 100644
index 8ac3706..0000000
--- a/src/UdpSocket.cpp
+++ /dev/null
@@ -1,510 +0,0 @@
-/*
- Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the
- Queen in Right of Canada (Communications Research Center Canada)
-
- Copyright (C) 2015 Matthias P. Braendli
- http://www.opendigitalradio.org
- */
-/*
- This file is part of ODR-DabMux.
-
- ODR-DabMux is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMux is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#include "UdpSocket.h"
-
-#include <iostream>
-#include <stdio.h>
-#include <errno.h>
-#include <fcntl.h>
-#include <string.h>
-
-#ifdef TRACE_ON
-# ifndef TRACE_CLASS
-# define TRACE_CLASS(class, func) cout <<"-" <<(class) <<"\t(" <<this <<")::" <<(func) <<endl
-# define TRACE_STATIC(class, func) cout <<"-" <<(class) <<"\t(static)::" <<(func) <<endl
-# endif
-#else
-# ifndef TRACE_CLASS
-# define TRACE_CLASS(class, func)
-# define TRACE_STATIC(class, func)
-# endif
-#endif
-
-
-/// Must be call once before doing any operation on sockets
-int UdpSocket::init()
-{
-#ifdef _WIN32
- WSADATA wsaData;
- WORD wVersionRequested = wVersionRequested = MAKEWORD( 2, 2 );
-
- int res = WSAStartup( wVersionRequested, &wsaData );
- if (res) {
- setInetError("Can't initialize winsock");
- return -1;
- }
-#endif
- return 0;
-}
-
-
-/// Must be call once before leaving application
-int UdpSocket::clean()
-{
-#ifdef _WIN32
- int res = WSACleanup();
- if (res) {
- setInetError("Can't initialize winsock");
- return -1;
- }
-#endif
- return 0;
-}
-
-
-/**
- * Two step constructor. Create must be called prior to use this
- * socket.
- */
-UdpSocket::UdpSocket() :
- listenSocket(INVALID_SOCKET)
-{
- TRACE_CLASS("UdpSocket", "UdpSocket()");
-}
-
-
-/**
- * One step constructor.
- * @param port The port number on which the socket will be bind
- * @param name The IP address on which the socket will be bind.
- * It is used to bind the socket on a specific interface if
- * the computer have many NICs.
- */
-UdpSocket::UdpSocket(int port, char *name) :
- listenSocket(INVALID_SOCKET)
-{
- TRACE_CLASS("UdpSocket", "UdpSocket(int, char*)");
- create(port, name);
-}
-
-
-/**
- * This functin set blocking mode. The socket can be blocking or not,
- * depending of the parametre. By default, the socket is blocking.
- * @param block If true, set the socket blocking, otherwise set non-blocking
- * @return 0 if ok
- * -1 if error
- */
-int UdpSocket::setBlocking(bool block)
-{
-#ifdef _WIN32
- unsigned long res = block ? 0 : 1;
- if (ioctlsocket(listenSocket, FIONBIO, &res) != 0) {
- setInetError("Can't change blocking state of socket");
- return -1;
- }
- return 0;
-#else
- int res;
- if (block)
- res = fcntl(listenSocket, F_SETFL, 0);
- else
- res = fcntl(listenSocket, F_SETFL, O_NONBLOCK);
- if (res == SOCKET_ERROR) {
- setInetError("Can't change blocking state of socket");
- return -1;
- }
- return 0;
-#endif
-}
-
-
-/**
- * Two step initializer. This function must be called after the constructor
- * without argument as been called.
- * @param port The port number on which the socket will be bind
- * @param name The IP address on which the socket will be bind.
- * It is used to bind the socket on a specific interface if
- * the computer have many NICs.
- * @return 0 if ok
- * -1 if error
- */
-int UdpSocket::create(int port, char *name)
-{
- TRACE_CLASS("UdpSocket", "create(int, char*)");
- if (listenSocket != INVALID_SOCKET)
- closesocket(listenSocket);
- address.setAddress(name);
- address.setPort(port);
- if ((listenSocket = socket(PF_INET, SOCK_DGRAM, 0)) == INVALID_SOCKET) {
- setInetError("Can't create socket");
- return -1;
- }
- reuseopt_t reuse = 1;
- if (setsockopt(listenSocket, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse))
- == SOCKET_ERROR) {
- setInetError("Can't reuse address");
- return -1;
- }
-
- if (bind(listenSocket, address.getAddress(), sizeof(sockaddr_in)) == SOCKET_ERROR) {
- setInetError("Can't bind socket");
- closesocket(listenSocket);
- listenSocket = INVALID_SOCKET;
- return -1;
- }
- return 0;
-}
-
-
-/// Destructor
-UdpSocket::~UdpSocket() {
- TRACE_CLASS("UdpSocket", "~UdpSocket()");
- if (listenSocket != INVALID_SOCKET)
- closesocket(listenSocket);
-}
-
-
-/**
- * Receive an UDP packet.
- * @param packet The packet that will receive data. The address will be set
- * to the source address.
- * @return 0 if ok, -1 if error
- */
-int UdpSocket::receive(UdpPacket &packet)
-{
- TRACE_CLASS("UdpSocket", "receive(UdpPacket)");
- socklen_t addrSize;
- addrSize = sizeof(*packet.getAddress().getAddress());
- int ret = recvfrom(listenSocket, packet.getData(), packet.getSize() - packet.getOffset(), 0,
- packet.getAddress().getAddress(), &addrSize);
- if (ret == SOCKET_ERROR) {
- packet.setLength(0);
-#ifndef _WIN32
- if (errno == EAGAIN)
- return 0;
-#endif
- setInetError("Can't receive UDP packet");
- return -1;
- }
- packet.setLength(ret);
- if (ret == (long)packet.getSize()) {
- packet.setSize(packet.getSize() << 1);
- }
- return 0;
-}
-
-/**
- * Send an UDP packet.
- * @param packet The UDP packet to be sent. It includes the data and the
- * destination address
- * return 0 if ok, -1 if error
- */
-int UdpSocket::send(UdpPacket &packet)
-{
-#ifdef DUMP
- TRACE_CLASS("UdpSocket", "send(UdpPacket)");
-#endif
- int ret = sendto(listenSocket, packet.getData(), packet.getLength(), 0,
- packet.getAddress().getAddress(), sizeof(*packet.getAddress().getAddress()));
- if (ret == SOCKET_ERROR
-#ifndef _WIN32
- && errno != ECONNREFUSED
-#endif
- ) {
- setInetError("Can't send UDP packet");
- return -1;
- }
- return 0;
-}
-
-
-/**
- * Send an UDP packet
- *
- * return 0 if ok, -1 if error
- */
-int UdpSocket::send(std::vector<uint8_t> data, InetAddress destination)
-{
-#ifdef DUMP
- TRACE_CLASS("UdpSocket", "send(vector<uint8_t>)");
-#endif
- int ret = sendto(listenSocket, &data[0], data.size(), 0,
- destination.getAddress(), sizeof(*destination.getAddress()));
- if (ret == SOCKET_ERROR
-#ifndef _WIN32
- && errno != ECONNREFUSED
-#endif
- ) {
- setInetError("Can't send UDP packet");
- return -1;
- }
- return 0;
-}
-
-
-/**
- * Must be called to receive data on a multicast address.
- * @param groupname The multica
-st address to join.
- * @return 0 if ok, -1 if error
- */
-int UdpSocket::joinGroup(char* groupname)
-{
- TRACE_CLASS("UdpSocket", "joinGroup(char*)");
-#ifdef _WIN32
- ip_mreq group;
-#else
- ip_mreqn group;
-#endif
- if ((group.imr_multiaddr.s_addr = inet_addr(groupname)) == INADDR_NONE) {
- setInetError(groupname);
- return -1;
- }
- if (!IN_MULTICAST(ntohl(group.imr_multiaddr.s_addr))) {
- setInetError("Not a multicast address");
- return -1;
- }
-#ifdef _WIN32
- group.imr_interface.s_addr = 0;
- if (setsockopt(listenSocket, IPPROTO_IP, IP_ADD_MEMBERSHIP, (char*)&group, sizeof(group))
- == SOCKET_ERROR) {
- setInetError("Can't join multicast group");
- return -1;
- }
-#else
- group.imr_address.s_addr = htons(INADDR_ANY);;
- group.imr_ifindex = 0;
- if (setsockopt(listenSocket, IPPROTO_IP, IP_ADD_MEMBERSHIP, &group, sizeof(group))
- == SOCKET_ERROR) {
- setInetError("Can't join multicast group");
- }
-#endif
- return 0;
-}
-
-int UdpSocket::setMulticastTTL(int ttl)
-{
- if (setsockopt(listenSocket, IPPROTO_IP, IP_MULTICAST_TTL, &ttl, sizeof(ttl))
- == SOCKET_ERROR) {
- setInetError("Can't set ttl");
- return -1;
- }
-
- return 0;
-}
-
-int UdpSocket::setMulticastSource(const char* source_addr)
-{
- struct in_addr addr;
- if (inet_aton(source_addr, &addr) == 0) {
- setInetError("Can't parse source address");
- return -1;
- }
-
- if (setsockopt(listenSocket, IPPROTO_IP, IP_MULTICAST_IF, &addr, sizeof(addr))
- == SOCKET_ERROR) {
- setInetError("Can't set source address");
- return -1;
- }
-
- return 0;
-}
-
-
-/**
- * Constructs an UDP packet.
- * @param initSize The initial size of the data buffer
- */
-UdpPacket::UdpPacket(unsigned int initSize) :
- dataBuf(new char[initSize]),
- length(0),
- size(initSize),
- offset(0)
-{
- TRACE_CLASS("UdpPacket", "UdpPacket(unsigned int)");
- if (dataBuf == NULL)
- size = 0;
-}
-
-
-/// Destructor
-UdpPacket::~UdpPacket()
-{
- TRACE_CLASS("UdpPacket", "~UdpPacket()");
- if (dataBuf != NULL) {
- delete []dataBuf;
- dataBuf = NULL;
- }
-}
-
-
-/**
- * Changes size of the data buffer size. \a Length + \a offset data will be copied
- * in the new buffer.
- * @warning The pointer to data will be changed
- * @param newSize The new data buffer size
- */
-void UdpPacket::setSize(unsigned newSize)
-{
- TRACE_CLASS("UdpPacket", "setSize(unsigned)");
- char *tmp = new char[newSize];
- if (length > newSize)
- length = newSize;
- if (tmp) {
- memcpy(tmp, dataBuf, length);
- delete []dataBuf;
- dataBuf = tmp;
- size = newSize;
- }
-}
-
-
-/**
- * Give the pointer to data. It is ajusted with the \a offset.
- * @warning This pointer change. when the \a size of the buffer and the \a offset change.
- * @return The pointer
- */
-char *UdpPacket::getData()
-{
- return dataBuf + offset;
-}
-
-
-/**
- * Add some data at the end of data buffer and adjust size.
- * @param data Pointer to the data to add
- * @param size Size in bytes of new data
- */
-void UdpPacket::addData(const void *data, unsigned size)
-{
- if (length + size > this->size) {
- setSize(this->size << 1);
- }
- memcpy(dataBuf + length, data, size);
- length += size;
-}
-
-
-/**
- * Returns the length of useful data. Data before the \a offset are ignored.
- * @return The data length
- */
-unsigned long UdpPacket::getLength()
-{
- return length - offset;
-}
-
-
-/**
- * Returns the size of the data buffer.
- * @return The data buffer size
- */
-unsigned long UdpPacket::getSize()
-{
- return size;
-}
-
-
-/**
- * Returns the offset value.
- * @return The offset value
- */
-unsigned long UdpPacket::getOffset()
-{
- return offset;
-}
-
-
-/**
- * Sets the data length value. Data before the \a offset are ignored.
- * @param len The new length of data
- */
-void UdpPacket::setLength(unsigned long len)
-{
- length = len + offset;
-}
-
-
-/**
- * Sets the data offset. Data length is ajusted to ignore data before the \a offset.
- * @param val The new data offset.
- */
-void UdpPacket::setOffset(unsigned long val)
-{
- offset = val;
- if (offset > length)
- length = offset;
-}
-
-
-/**
- * Returns the UDP address of the data.
- * @return The UDP address
- */
-InetAddress &UdpPacket::getAddress()
-{
- return address;
-}
-
-/*
-WSAEINTR
-WSAEBADF
-WSAEACCES
-WSAEFAULT
-WSAEINVAL
-WSAEMFILE
-WSAEWOULDBLOCK
-WSAEINPROGRESS
-WSAEALREADY
-WSAENOTSOCK
-WSAEDESTADDRREQ
-WSAEMSGSIZE
-WSAEPROTOTYPE
-WSAENOPROTOOPT
-WSAEPROTONOSUPPORT
-WSAESOCKTNOSUPPORT
-WSAEOPNOTSUPP
-WSAEPFNOSUPPORT
-WSAEAFNOSUPPORT
-WSAEADDRINUSE
-WSAEADDRNOTAVAIL
-WSAENETDOWN
-WSAENETUNREACH
-WSAENETRESET
-WSAECONNABORTED
-WSAECONNRESET
-WSAENOBUFS
-WSAEISCONN
-WSAENOTCONN
-WSAESHUTDOWN
-WSAETOOMANYREFS
-WSAETIMEDOUT
-WSAECONNREFUSED
-WSAELOOP
-WSAENAMETOOLONG
-WSAEHOSTDOWN
-WSAEHOSTUNREACH
-WSAENOTEMPTY
-WSAEPROCLIM
-WSAEUSERS
-WSAEDQUOT
-WSAESTALE
-WSAEREMOTE
-WSAEDISCON
-WSASYSNOTREADY
-WSAVERNOTSUPPORTED
-WSANOTINITIALISED
-*/
diff --git a/src/UdpSocket.h b/src/UdpSocket.h
deleted file mode 100644
index 07e9f0e..0000000
--- a/src/UdpSocket.h
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the
- Queen in Right of Canada (Communications Research Center Canada)
-
- Copyright (C) 2015 Matthias P. Braendli
- http://www.opendigitalradio.org
- */
-/*
- This file is part of ODR-DabMux.
-
- ODR-DabMux is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMux is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#ifndef _UDPSOCKET
-#define _UDPSOCKET
-
-#ifdef HAVE_CONFIG_H
-# include "config.h"
-#endif
-
-#include "InetAddress.h"
-#ifdef _WIN32
-# include <winsock.h>
-# define socklen_t int
-# define reuseopt_t char
-#else
-# include <sys/socket.h>
-# include <netinet/in.h>
-# include <unistd.h>
-# include <netdb.h>
-# include <arpa/inet.h>
-# include <pthread.h>
-# define SOCKET int
-# define INVALID_SOCKET -1
-# define SOCKET_ERROR -1
-# define reuseopt_t int
-#endif
-//#define INVALID_PORT -1
-
-#include <stdlib.h>
-#include <iostream>
-#include <vector>
-
-class UdpPacket;
-
-
-/**
- * This class represents a socket for sending and receiving UDP packets.
- *
- * A UDP socket is the sending or receiving point for a packet delivery service.
- * Each packet sent or received on a datagram socket is individually
- * addressed and routed. Multiple packets sent from one machine to another may
- * be routed differently, and may arrive in any order.
- * @author Pascal Charest pascal.charest@crc.ca
- */
-class UdpSocket {
- public:
- UdpSocket();
- UdpSocket(int port, char *name = NULL);
- ~UdpSocket();
- UdpSocket(const UdpSocket& other) = delete;
- const UdpSocket& operator=(const UdpSocket& other) = delete;
-
- static int init();
- static int clean();
-
- int create(int port = 0, char *name = NULL);
-
- int send(UdpPacket &packet);
- int send(const std::vector<uint8_t> data);
- int send(std::vector<uint8_t> data, InetAddress destination);
- int receive(UdpPacket &packet);
- int joinGroup(char* groupname);
- int setMulticastSource(const char* source_addr);
- int setMulticastTTL(int ttl);
- /**
- * Connects the socket on a specific address. Only data from this address
- * will be received.
- * @param addr The address to connect the socket
- * @warning Not implemented yet.
- */
- void connect(InetAddress &addr);
- int setBlocking(bool block);
-
- protected:
- /// The address on which the socket is binded.
- InetAddress address;
- /// The low-level socket used by system functions.
- SOCKET listenSocket;
-};
-
-/**
- * This class represents a UDP packet.
- *
- * UDP packets are used to implement a connectionless packet delivery service.
- * Each message is routed from one machine to another based solely on
- * information contained within that packet. Multiple packets sent from one
- * machine to another might be routed differently, and might arrive in any order.
- * @author Pascal Charest pascal.charest@crc.ca
- */
-class UdpPacket {
- public:
- UdpPacket(unsigned int initSize = 1024);
- UdpPacket(const UdpPacket& packet) = delete;
- const UdpPacket& operator=(const UdpPacket&) = delete;
- UdpPacket(const UdpPacket&& packet) = delete;
- const UdpPacket& operator=(const UdpPacket&&) = delete;
- ~UdpPacket();
-
- char *getData();
- void addData(const void *data, unsigned size);
- unsigned long getLength();
- unsigned long getSize();
- unsigned long getOffset();
- void setLength(unsigned long len);
- void setOffset(unsigned long val);
- void setSize(unsigned newSize);
- InetAddress &getAddress();
-
- private:
- char *dataBuf;
- unsigned long length, size, offset;
- InetAddress address;
-};
-
-#endif // _UDPSOCKET
-