From 8b0d4b647a3ac2e21e6a8e7a902eea3ae462e213 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Fri, 7 Jun 2019 10:14:51 +0200 Subject: Work on STI-D/EDI input --- lib/ReedSolomon.cpp | 116 +++++++++++++++++++++++++++++++++ lib/ReedSolomon.h | 56 ++++++++++++++++ lib/Socket.cpp | 165 +++++++++++++++++++++++++++++++++++----------- lib/Socket.h | 32 ++++++++- lib/ThreadsafeQueue.h | 176 ++++++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 504 insertions(+), 41 deletions(-) create mode 100644 lib/ReedSolomon.cpp create mode 100644 lib/ReedSolomon.h create mode 100644 lib/ThreadsafeQueue.h (limited to 'lib') diff --git a/lib/ReedSolomon.cpp b/lib/ReedSolomon.cpp new file mode 100644 index 0000000..38d8ea8 --- /dev/null +++ b/lib/ReedSolomon.cpp @@ -0,0 +1,116 @@ +/* + Copyright (C) 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in Right + of Canada (Communications Research Center Canada) + + Copyright (C) 2016 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://www.opendigitalradio.org + */ +/* + This file is part of ODR-DabMux. + + ODR-DabMux is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + ODR-DabMux is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with ODR-DabMux. If not, see . + */ + +#include "ReedSolomon.h" +#include +#include +#include +#include +#include // For galois.h ... +#include // For memcpy + +extern "C" { +#include "fec/fec.h" +} +#include + +#define SYMSIZE 8 + + +ReedSolomon::ReedSolomon(int N, int K, bool reverse, int gfpoly, int firstRoot, int primElem) +{ + setReverse(reverse); + + m_N = N; + m_K = K; + + const int symsize = SYMSIZE; + const int nroots = N - K; // For EDI PFT, this must be 48 + const int pad = ((1 << symsize) - 1) - N; // is 255-N + + rsData = init_rs_char(symsize, gfpoly, firstRoot, primElem, nroots, pad); + + if (rsData == nullptr) { + std::stringstream ss; + ss << "Invalid Reed-Solomon parameters! " << + "N=" << N << " ; K=" << K << " ; pad=" << pad; + throw std::invalid_argument(ss.str()); + } +} + + +ReedSolomon::~ReedSolomon() +{ + free_rs_char(rsData); +} + + +void ReedSolomon::setReverse(bool state) +{ + reverse = state; +} + + +int ReedSolomon::encode(void* data, void* fec, size_t size) +{ + uint8_t* input = reinterpret_cast(data); + uint8_t* output = reinterpret_cast(fec); + int ret = 0; + + if (reverse) { + std::vector buffer(m_N); + + memcpy(&buffer[0], input, m_K); + memcpy(&buffer[m_K], output, m_N - m_K); + + ret = decode_rs_char(rsData, &buffer[0], nullptr, 0); + if ((ret != 0) && (ret != -1)) { + memcpy(input, &buffer[0], m_K); + memcpy(output, &buffer[m_K], m_N - m_K); + } + } + else { + encode_rs_char(rsData, input, output); + } + + return ret; +} + + +int ReedSolomon::encode(void* data, size_t size) +{ + uint8_t* input = reinterpret_cast(data); + int ret = 0; + + if (reverse) { + ret = decode_rs_char(rsData, input, nullptr, 0); + } + else { + encode_rs_char(rsData, input, &input[m_K]); + } + + return ret; +} diff --git a/lib/ReedSolomon.h b/lib/ReedSolomon.h new file mode 100644 index 0000000..abcef62 --- /dev/null +++ b/lib/ReedSolomon.h @@ -0,0 +1,56 @@ +/* + Copyright (C) 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in Right + of Canada (Communications Research Center Canada) + + Copyright (C) 2016 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://www.opendigitalradio.org + */ +/* + This file is part of ODR-DabMux. + + ODR-DabMux is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + ODR-DabMux is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with ODR-DabMux. If not, see . + */ + +#pragma once + +#ifdef HAVE_CONFIG_H +# include +#endif + +#include + +class ReedSolomon +{ +public: + ReedSolomon(int N, int K, + bool reverse = false, + int gfpoly = 0x11d, int firstRoot = 0, int primElem = 1); + ReedSolomon(const ReedSolomon& other) = delete; + ReedSolomon operator=(const ReedSolomon& other) = delete; + ~ReedSolomon(); + + void setReverse(bool state); + int encode(void* data, void* fec, size_t size); + int encode(void* data, size_t size); + +private: + int m_N; + int m_K; + + void* rsData; + bool reverse; +}; + diff --git a/lib/Socket.cpp b/lib/Socket.cpp index fe3df44..9b404eb 100644 --- a/lib/Socket.cpp +++ b/lib/Socket.cpp @@ -42,13 +42,10 @@ void InetAddress::resolveUdpDestination(const std::string& destination, int port struct addrinfo hints; memset(&hints, 0, sizeof(struct addrinfo)); - hints.ai_family = AF_UNSPEC; /* Allow IPv4 or IPv6 */ + hints.ai_family = AF_INET; hints.ai_socktype = SOCK_DGRAM; /* Datagram socket */ hints.ai_flags = 0; hints.ai_protocol = 0; - hints.ai_canonname = nullptr; - hints.ai_addr = nullptr; - hints.ai_next = nullptr; struct addrinfo *result, *rp; int s = getaddrinfo(destination.c_str(), service, &hints, &result); @@ -77,19 +74,19 @@ UDPPacket::UDPPacket(size_t initSize) : UDPSocket::UDPSocket() : - listenSocket(INVALID_SOCKET) + m_sock(INVALID_SOCKET) { reinit(0, ""); } UDPSocket::UDPSocket(int port) : - listenSocket(INVALID_SOCKET) + m_sock(INVALID_SOCKET) { reinit(port, ""); } UDPSocket::UDPSocket(int port, const std::string& name) : - listenSocket(INVALID_SOCKET) + m_sock(INVALID_SOCKET) { reinit(port, name); } @@ -97,16 +94,28 @@ UDPSocket::UDPSocket(int port, const std::string& name) : void UDPSocket::setBlocking(bool block) { - int res = fcntl(listenSocket, F_SETFL, block ? 0 : O_NONBLOCK); + int res = fcntl(m_sock, F_SETFL, block ? 0 : O_NONBLOCK); if (res == -1) { throw runtime_error(string("Can't change blocking state of socket: ") + strerror(errno)); } } +void UDPSocket::reinit(int port) +{ + return reinit(port, ""); +} + void UDPSocket::reinit(int port, const std::string& name) { - if (listenSocket != INVALID_SOCKET) { - ::close(listenSocket); + if (m_sock != INVALID_SOCKET) { + ::close(m_sock); + } + + if (port == 0) { + // No need to bind to a given port, creating the + // socket is enough + m_sock = ::socket(AF_INET, SOCK_DGRAM, 0); + return; } char service[NI_MAXSERV]; @@ -114,7 +123,7 @@ void UDPSocket::reinit(int port, const std::string& name) struct addrinfo hints; memset(&hints, 0, sizeof(struct addrinfo)); - hints.ai_family = AF_UNSPEC; /* Allow IPv4 or IPv6 */ + hints.ai_family = AF_INET; hints.ai_socktype = SOCK_DGRAM; /* Datagram socket */ hints.ai_flags = AI_PASSIVE; /* For wildcard IP address */ hints.ai_protocol = 0; /* Any protocol */ @@ -141,7 +150,7 @@ void UDPSocket::reinit(int port, const std::string& name) } if (::bind(sfd, rp->ai_addr, rp->ai_addrlen) == 0) { - listenSocket = sfd; + m_sock = sfd; break; } @@ -157,17 +166,17 @@ void UDPSocket::reinit(int port, const std::string& name) void UDPSocket::close() { - if (listenSocket != INVALID_SOCKET) { - ::close(listenSocket); + if (m_sock != INVALID_SOCKET) { + ::close(m_sock); } - listenSocket = INVALID_SOCKET; + m_sock = INVALID_SOCKET; } UDPSocket::~UDPSocket() { - if (listenSocket != INVALID_SOCKET) { - ::close(listenSocket); + if (m_sock != INVALID_SOCKET) { + ::close(m_sock); } } @@ -177,7 +186,7 @@ UDPPacket UDPSocket::receive(size_t max_size) UDPPacket packet(max_size); socklen_t addrSize; addrSize = sizeof(*packet.address.as_sockaddr()); - ssize_t ret = recvfrom(listenSocket, + ssize_t ret = recvfrom(m_sock, packet.buffer.data(), packet.buffer.size(), 0, @@ -186,7 +195,13 @@ UDPPacket UDPSocket::receive(size_t max_size) if (ret == SOCKET_ERROR) { packet.buffer.resize(0); + + // This suppresses the -Wlogical-op warning +#if EAGAIN == EWOULDBLOCK if (errno == EAGAIN) { +#else + if (errno == EAGAIN or errno == EWOULDBLOCK) { +#endif return 0; } throw runtime_error(string("Can't receive data: ") + strerror(errno)); @@ -198,24 +213,24 @@ UDPPacket UDPSocket::receive(size_t max_size) void UDPSocket::send(UDPPacket& packet) { - int ret = sendto(listenSocket, packet.buffer.data(), packet.buffer.size(), 0, + const int ret = sendto(m_sock, packet.buffer.data(), packet.buffer.size(), 0, packet.address.as_sockaddr(), sizeof(*packet.address.as_sockaddr())); if (ret == SOCKET_ERROR && errno != ECONNREFUSED) { - throw runtime_error(string("Can't send UDP packet") + strerror(errno)); + throw runtime_error(string("Can't send UDP packet: ") + strerror(errno)); } } void UDPSocket::send(const std::vector& data, InetAddress destination) { - int ret = sendto(listenSocket, &data[0], data.size(), 0, + const int ret = sendto(m_sock, data.data(), data.size(), 0, destination.as_sockaddr(), sizeof(*destination.as_sockaddr())); if (ret == SOCKET_ERROR && errno != ECONNREFUSED) { - throw runtime_error(string("Can't send UDP packet") + strerror(errno)); + throw runtime_error(string("Can't send UDP packet: ") + strerror(errno)); } } -void UDPSocket::joinGroup(char* groupname) +void UDPSocket::joinGroup(const char* groupname, const char* if_addr) { ip_mreqn group; if ((group.imr_multiaddr.s_addr = inet_addr(groupname)) == INADDR_NONE) { @@ -224,9 +239,15 @@ void UDPSocket::joinGroup(char* groupname) if (!IN_MULTICAST(ntohl(group.imr_multiaddr.s_addr))) { throw runtime_error("Group name is not a multicast address"); } - group.imr_address.s_addr = htons(INADDR_ANY);; + + if (if_addr) { + group.imr_address.s_addr = inet_addr(if_addr); + } + else { + group.imr_address.s_addr = htons(INADDR_ANY); + } group.imr_ifindex = 0; - if (setsockopt(listenSocket, IPPROTO_IP, IP_ADD_MEMBERSHIP, &group, sizeof(group)) + if (setsockopt(m_sock, IPPROTO_IP, IP_ADD_MEMBERSHIP, &group, sizeof(group)) == SOCKET_ERROR) { throw runtime_error(string("Can't join multicast group") + strerror(errno)); } @@ -239,7 +260,7 @@ void UDPSocket::setMulticastSource(const char* source_addr) throw runtime_error(string("Can't parse source address") + strerror(errno)); } - if (setsockopt(listenSocket, IPPROTO_IP, IP_MULTICAST_IF, &addr, sizeof(addr)) + if (setsockopt(m_sock, IPPROTO_IP, IP_MULTICAST_IF, &addr, sizeof(addr)) == SOCKET_ERROR) { throw runtime_error(string("Can't set source address") + strerror(errno)); } @@ -247,26 +268,82 @@ void UDPSocket::setMulticastSource(const char* source_addr) void UDPSocket::setMulticastTTL(int ttl) { - if (setsockopt(listenSocket, IPPROTO_IP, IP_MULTICAST_TTL, &ttl, sizeof(ttl)) + if (setsockopt(m_sock, IPPROTO_IP, IP_MULTICAST_TTL, &ttl, sizeof(ttl)) == SOCKET_ERROR) { throw runtime_error(string("Can't set multicast ttl") + strerror(errno)); } } +UDPReceiver::~UDPReceiver() { + m_stop = true; + m_sock.close(); + if (m_thread.joinable()) { + m_thread.join(); + } +} + +void UDPReceiver::start(int port, const string& bindto, const string& mcastaddr, size_t max_packets_queued) { + m_port = port; + m_bindto = bindto; + m_mcastaddr = mcastaddr; + m_max_packets_queued = max_packets_queued; + m_thread = std::thread(&UDPReceiver::m_run, this); +} -TCPSocket::TCPSocket() +std::vector UDPReceiver::get_packet_buffer() { - if ((m_sock = socket(PF_INET, SOCK_STREAM, 0)) < 0) { - throw std::runtime_error("Can't create TCP socket"); + if (m_stop) { + throw runtime_error("UDP Receiver not running"); } -#if defined(HAVE_SO_NOSIGPIPE) - int val = 1; - if (setsockopt(m_sock, SOL_SOCKET, SO_NOSIGPIPE, - &val, sizeof(val)) < 0) { - throw std::runtime_error("Can't set SO_NOSIGPIPE"); + UDPPacket p; + m_packets.wait_and_pop(p); + + return p.buffer; +} + +void UDPReceiver::m_run() +{ + // Ensure that stop is set to true in case of exception or return + struct SetStopOnDestruct { + SetStopOnDestruct(atomic& stop) : m_stop(stop) {} + ~SetStopOnDestruct() { m_stop = true; } + private: atomic& m_stop; + } autoSetStop(m_stop); + + if (IN_MULTICAST(ntohl(inet_addr(m_mcastaddr.c_str())))) { + m_sock.reinit(m_port, m_mcastaddr); + m_sock.setMulticastSource(m_bindto.c_str()); + m_sock.joinGroup(m_mcastaddr.c_str(), m_bindto.c_str()); } -#endif + else { + m_sock.reinit(m_port, m_bindto); + } + + while (not m_stop) { + constexpr size_t packsize = 8192; + try { + auto packet = m_sock.receive(packsize); + if (packet.buffer.size() == packsize) { + // TODO replace fprintf + fprintf(stderr, "Warning, possible UDP truncation\n"); + } + + // If this blocks, the UDP socket will lose incoming packets + m_packets.push_wait_if_full(packet, m_max_packets_queued); + } + catch (const std::runtime_error& e) { + // TODO replace fprintf + // TODO handle intr + fprintf(stderr, "Socket error: %s\n", e.what()); + m_stop = true; + } + } +} + + +TCPSocket::TCPSocket() +{ } TCPSocket::~TCPSocket() @@ -314,7 +391,7 @@ void TCPSocket::connect(const std::string& hostname, int port) /* Obtain address(es) matching host/port */ struct addrinfo hints; memset(&hints, 0, sizeof(struct addrinfo)); - hints.ai_family = AF_UNSPEC; /* Allow IPv4 or IPv6 */ + hints.ai_family = AF_INET; hints.ai_socktype = SOCK_STREAM; hints.ai_flags = 0; hints.ai_protocol = 0; @@ -376,7 +453,7 @@ void TCPSocket::listen(int port, const string& name) struct addrinfo hints; memset(&hints, 0, sizeof(struct addrinfo)); - hints.ai_family = AF_UNSPEC; + hints.ai_family = AF_INET; hints.ai_socktype = SOCK_STREAM; hints.ai_flags = AI_PASSIVE; /* For wildcard IP address */ hints.ai_protocol = 0; @@ -410,6 +487,14 @@ void TCPSocket::listen(int port, const string& name) freeaddrinfo(result); +#if defined(HAVE_SO_NOSIGPIPE) + int val = 1; + if (setsockopt(m_sock, SOL_SOCKET, SO_NOSIGPIPE, + &val, sizeof(val)) < 0) { + throw std::runtime_error("Can't set SO_NOSIGPIPE"); + } +#endif + if (rp == nullptr) { throw runtime_error("Could not bind"); } @@ -683,7 +768,9 @@ TCPDataDispatcher::~TCPDataDispatcher() m_running = false; m_connections.clear(); m_listener_socket.close(); - m_listener_thread.join(); + if (m_listener_thread.joinable()) { + m_listener_thread.join(); + } } void TCPDataDispatcher::start(int port, const string& address) diff --git a/lib/Socket.h b/lib/Socket.h index 82ff5ad..2393584 100644 --- a/lib/Socket.h +++ b/lib/Socket.h @@ -104,13 +104,14 @@ class UDPSocket const UDPSocket& operator=(const UDPSocket& other) = delete; /** Close the already open socket, and create a new one. Throws a runtime_error on error. */ + void reinit(int port); void reinit(int port, const std::string& name); void close(void); void send(UDPPacket& packet); void send(const std::vector& data, InetAddress destination); UDPPacket receive(size_t max_size); - void joinGroup(char* groupname); + void joinGroup(const char* groupname, const char* if_addr = nullptr); void setMulticastSource(const char* source_addr); void setMulticastTTL(int ttl); @@ -120,9 +121,36 @@ class UDPSocket void setBlocking(bool block); protected: - SOCKET listenSocket; + SOCKET m_sock; }; +/* Threaded UDP receiver */ +class UDPReceiver { + public: + UDPReceiver() : m_port(0), m_thread(), m_stop(false), m_packets() {} + ~UDPReceiver(); + UDPReceiver(const UDPReceiver&) = delete; + UDPReceiver operator=(const UDPReceiver&) = delete; + + // Start the receiver in a separate thread + void start(int port, const std::string& bindto, const std::string& mcastaddr, size_t max_packets_queued); + + // Get the data contained in a UDP packet, blocks if none available + // In case of error, throws a runtime_error + std::vector get_packet_buffer(void); + + private: + void m_run(void); + + int m_port; + std::string m_bindto; + std::string m_mcastaddr; + size_t m_max_packets_queued; + std::thread m_thread; + std::atomic m_stop; + ThreadsafeQueue m_packets; + UDPSocket m_sock; +}; class TCPSocket { public: diff --git a/lib/ThreadsafeQueue.h b/lib/ThreadsafeQueue.h new file mode 100644 index 0000000..62f4c96 --- /dev/null +++ b/lib/ThreadsafeQueue.h @@ -0,0 +1,176 @@ +/* + Copyright (C) 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in + Right of Canada (Communications Research Center Canada) + + Copyright (C) 2018 + Matthias P. Braendli, matthias.braendli@mpb.li + + An implementation for a threadsafe queue, depends on C++11 + + When creating a ThreadsafeQueue, one can specify the minimal number + of elements it must contain before it is possible to take one + element out. + */ +/* + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see . + */ + +#pragma once + +#include +#include +#include +#include + +/* This queue is meant to be used by two threads. One producer + * that pushes elements into the queue, and one consumer that + * retrieves the elements. + * + * The queue can make the consumer block until an element + * is available, or a wakeup requested. + */ + +/* Class thrown by blocking pop to tell the consumer + * that there's a wakeup requested. */ +class ThreadsafeQueueWakeup {}; + +template +class ThreadsafeQueue +{ +public: + /* Push one element into the queue, and notify another thread that + * might be waiting. + * + * returns the new queue size. + */ + size_t push(T const& val) + { + std::unique_lock lock(the_mutex); + the_queue.push(val); + size_t queue_size = the_queue.size(); + lock.unlock(); + + the_rx_notification.notify_one(); + + return queue_size; + } + + size_t push(T&& val) + { + std::unique_lock lock(the_mutex); + the_queue.emplace(std::move(val)); + size_t queue_size = the_queue.size(); + lock.unlock(); + + the_rx_notification.notify_one(); + + return queue_size; + } + + /* Push one element into the queue, but wait until the + * queue size goes below the threshold. + * + * Notify waiting thread. + * + * returns the new queue size. + */ + size_t push_wait_if_full(T const& val, size_t threshold) + { + std::unique_lock lock(the_mutex); + while (the_queue.size() >= threshold) { + the_tx_notification.wait(lock); + } + the_queue.push(val); + size_t queue_size = the_queue.size(); + lock.unlock(); + + the_rx_notification.notify_one(); + + return queue_size; + } + + /* Trigger a wakeup event on a blocking consumer, which + * will receive a ThreadsafeQueueWakeup exception. + */ + void trigger_wakeup(void) + { + std::unique_lock lock(the_mutex); + wakeup_requested = true; + lock.unlock(); + the_rx_notification.notify_one(); + } + + /* Send a notification for the receiver thread */ + void notify(void) + { + the_rx_notification.notify_one(); + } + + bool empty() const + { + std::unique_lock lock(the_mutex); + return the_queue.empty(); + } + + size_t size() const + { + std::unique_lock lock(the_mutex); + return the_queue.size(); + } + + bool try_pop(T& popped_value) + { + std::unique_lock lock(the_mutex); + if (the_queue.empty()) { + return false; + } + + popped_value = the_queue.front(); + the_queue.pop(); + + lock.unlock(); + the_tx_notification.notify_one(); + + return true; + } + + void wait_and_pop(T& popped_value, size_t prebuffering = 1) + { + std::unique_lock lock(the_mutex); + while (the_queue.size() < prebuffering and + not wakeup_requested) { + the_rx_notification.wait(lock); + } + + if (wakeup_requested) { + wakeup_requested = false; + throw ThreadsafeQueueWakeup(); + } + else { + std::swap(popped_value, the_queue.front()); + the_queue.pop(); + + lock.unlock(); + the_tx_notification.notify_one(); + } + } + +private: + std::queue the_queue; + mutable std::mutex the_mutex; + std::condition_variable the_rx_notification; + std::condition_variable the_tx_notification; + bool wakeup_requested = false; +}; + -- cgit v1.2.3