diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/Json.cpp | 4 | ||||
-rw-r--r-- | lib/Json.h | 4 | ||||
-rw-r--r-- | lib/Socket.cpp | 195 | ||||
-rw-r--r-- | lib/Socket.h | 44 | ||||
-rw-r--r-- | lib/ThreadsafeQueue.h | 38 | ||||
-rw-r--r-- | lib/edi/STIDecoder.cpp | 33 | ||||
-rw-r--r-- | lib/edi/STIDecoder.hpp | 12 | ||||
-rw-r--r-- | lib/edi/common.cpp | 33 | ||||
-rw-r--r-- | lib/edi/common.hpp | 12 | ||||
-rw-r--r-- | lib/edioutput/EDIConfig.h | 31 | ||||
-rw-r--r-- | lib/edioutput/PFT.cpp | 12 | ||||
-rw-r--r-- | lib/edioutput/PFT.h | 15 | ||||
-rw-r--r-- | lib/edioutput/Transport.cpp | 268 | ||||
-rw-r--r-- | lib/edioutput/Transport.h | 106 |
14 files changed, 556 insertions, 251 deletions
diff --git a/lib/Json.cpp b/lib/Json.cpp index 361a149..ee33671 100644 --- a/lib/Json.cpp +++ b/lib/Json.cpp @@ -3,7 +3,7 @@ Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2023 + Copyright (C) 2025 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org @@ -27,7 +27,7 @@ #include <sstream> #include <iomanip> #include <string> -#include <algorithm> +#include <stdexcept> #include "Json.h" @@ -3,7 +3,7 @@ Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2023 + Copyright (C) 2025 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org @@ -34,10 +34,10 @@ #include <vector> #include <memory> #include <optional> -#include <stdexcept> #include <string> #include <unordered_map> #include <variant> +#include <cstdint> namespace json { diff --git a/lib/Socket.cpp b/lib/Socket.cpp index b71c01e..5c920d7 100644 --- a/lib/Socket.cpp +++ b/lib/Socket.cpp @@ -24,7 +24,8 @@ #include "Socket.h" -#include <iostream> +#include <numeric> +#include <stdexcept> #include <cstdio> #include <cstring> #include <cerrno> @@ -106,16 +107,20 @@ UDPSocket::UDPSocket(UDPSocket&& other) { m_sock = other.m_sock; m_port = other.m_port; + m_multicast_source = other.m_multicast_source; other.m_port = 0; other.m_sock = INVALID_SOCKET; + other.m_multicast_source = ""; } const UDPSocket& UDPSocket::operator=(UDPSocket&& other) { m_sock = other.m_sock; m_port = other.m_port; + m_multicast_source = other.m_multicast_source; other.m_port = 0; other.m_sock = INVALID_SOCKET; + other.m_multicast_source = ""; return *this; } @@ -144,6 +149,7 @@ void UDPSocket::reinit(int port, const std::string& name) // No need to bind to a given port, creating the // socket is enough m_sock = ::socket(AF_INET, SOCK_DGRAM, 0); + post_init(); return; } @@ -180,6 +186,7 @@ void UDPSocket::reinit(int port, const std::string& name) if (::bind(sfd, rp->ai_addr, rp->ai_addrlen) == 0) { m_sock = sfd; + post_init(); break; } @@ -189,10 +196,47 @@ void UDPSocket::reinit(int port, const std::string& name) freeaddrinfo(result); if (rp == nullptr) { - throw runtime_error("Could not bind"); + throw runtime_error(string{"Could not bind to port "} + to_string(port)); + } +} + +void UDPSocket::post_init() { + int pktinfo = 1; + if (setsockopt(m_sock, IPPROTO_IP, IP_PKTINFO, &pktinfo, sizeof(pktinfo)) == SOCKET_ERROR) { + throw runtime_error(string("Can't request pktinfo: ") + strerror(errno)); + } + +} + +void UDPSocket::init_receive_multicast(int port, const string& local_if_addr, const string& mcastaddr) +{ + if (m_sock != INVALID_SOCKET) { + ::close(m_sock); + } + + m_port = port; + m_sock = ::socket(AF_INET, SOCK_DGRAM, 0); + post_init(); + + int reuse_setting = 1; + if (setsockopt(m_sock, SOL_SOCKET, SO_REUSEADDR, &reuse_setting, sizeof(reuse_setting)) == SOCKET_ERROR) { + throw runtime_error("Can't reuse address"); + } + + struct sockaddr_in la; + memset((char *) &la, 0, sizeof(la)); + la.sin_family = AF_INET; + la.sin_port = htons(port); + la.sin_addr.s_addr = INADDR_ANY; + if (::bind(m_sock, (struct sockaddr*)&la, sizeof(la))) { + throw runtime_error(string("Could not bind: ") + strerror(errno)); } + + m_multicast_source = mcastaddr; + join_group(mcastaddr.c_str(), local_if_addr.c_str()); } + void UDPSocket::close() { if (m_sock != INVALID_SOCKET) { @@ -212,16 +256,26 @@ UDPSocket::~UDPSocket() UDPPacket UDPSocket::receive(size_t max_size) { + struct sockaddr_in addr; + struct msghdr msg; + struct iovec iov; + constexpr size_t BUFFER_SIZE = 1024; + char control_buffer[BUFFER_SIZE]; + struct cmsghdr *cmsg; + UDPPacket packet(max_size); - socklen_t addrSize; - addrSize = sizeof(*packet.address.as_sockaddr()); - ssize_t ret = recvfrom(m_sock, - packet.buffer.data(), - packet.buffer.size(), - 0, - packet.address.as_sockaddr(), - &addrSize); + memset(&msg, 0, sizeof(msg)); + msg.msg_name = &addr; + msg.msg_namelen = sizeof(addr); + msg.msg_iov = &iov; + iov.iov_base = packet.buffer.data(); + iov.iov_len = packet.buffer.size(); + msg.msg_iovlen = 1; + msg.msg_control = control_buffer; + msg.msg_controllen = sizeof(control_buffer); + + ssize_t ret = recvmsg(m_sock, &msg, 0); if (ret == SOCKET_ERROR) { packet.buffer.resize(0); @@ -232,12 +286,42 @@ UDPPacket UDPSocket::receive(size_t max_size) if (errno == EAGAIN or errno == EWOULDBLOCK) #endif { - return 0; + return packet; } throw runtime_error(string("Can't receive data: ") + strerror(errno)); } - packet.buffer.resize(ret); + struct in_pktinfo *pktinfo = nullptr; + for (cmsg = CMSG_FIRSTHDR(&msg); cmsg != NULL; cmsg = CMSG_NXTHDR(&msg, cmsg)) { + if (cmsg->cmsg_level == IPPROTO_IP && cmsg->cmsg_type == IP_PKTINFO) { + pktinfo = (struct in_pktinfo *)CMSG_DATA(cmsg); + break; + } + } + + if (pktinfo) { + char src_addr[INET_ADDRSTRLEN]; + char dst_addr[INET_ADDRSTRLEN]; + inet_ntop(AF_INET, &(addr.sin_addr), src_addr, INET_ADDRSTRLEN); + inet_ntop(AF_INET, &(pktinfo->ipi_addr), dst_addr, INET_ADDRSTRLEN); + //fprintf(stderr, "Received packet from %s to %s: %zu\n", src_addr, dst_addr, ret); + + memcpy(&packet.address.addr, &addr, sizeof(addr)); + + if (m_multicast_source.empty() or + strcmp(dst_addr, m_multicast_source.c_str()) == 0) { + packet.buffer.resize(ret); + } + else { + // Ignore packet for different multicast group + packet.buffer.resize(0); + } + } + else { + //fprintf(stderr, "No pktinfo: %zu\n", ret); + packet.buffer.resize(ret); + } + return packet; } @@ -269,14 +353,14 @@ void UDPSocket::send(const std::string& data, InetAddress destination) } } -void UDPSocket::joinGroup(const char* groupname, const char* if_addr) +void UDPSocket::join_group(const char* groupname, const char* if_addr) { ip_mreqn group; if ((group.imr_multiaddr.s_addr = inet_addr(groupname)) == INADDR_NONE) { throw runtime_error("Cannot convert multicast group name"); } if (!IN_MULTICAST(ntohl(group.imr_multiaddr.s_addr))) { - throw runtime_error("Group name is not a multicast address"); + throw runtime_error(string("Group name '") + groupname + "' is not a multicast address"); } if (if_addr) { @@ -288,7 +372,7 @@ void UDPSocket::joinGroup(const char* groupname, const char* if_addr) group.imr_ifindex = 0; if (setsockopt(m_sock, IPPROTO_IP, IP_ADD_MEMBERSHIP, &group, sizeof(group)) == SOCKET_ERROR) { - throw runtime_error(string("Can't join multicast group") + strerror(errno)); + throw runtime_error(string("Can't join multicast group: ") + strerror(errno)); } } @@ -296,12 +380,12 @@ void UDPSocket::setMulticastSource(const char* source_addr) { struct in_addr addr; if (inet_aton(source_addr, &addr) == 0) { - throw runtime_error(string("Can't parse source address") + strerror(errno)); + throw runtime_error(string("Can't parse source address: ") + strerror(errno)); } if (setsockopt(m_sock, IPPROTO_IP, IP_MULTICAST_IF, &addr, sizeof(addr)) == SOCKET_ERROR) { - throw runtime_error(string("Can't set source address") + strerror(errno)); + throw runtime_error(string("Can't set source address: ") + strerror(errno)); } } @@ -309,7 +393,7 @@ void UDPSocket::setMulticastTTL(int ttl) { if (setsockopt(m_sock, IPPROTO_IP, IP_MULTICAST_TTL, &ttl, sizeof(ttl)) == SOCKET_ERROR) { - throw runtime_error(string("Can't set multicast ttl") + strerror(errno)); + throw runtime_error(string("Can't set multicast ttl: ") + strerror(errno)); } } @@ -327,15 +411,13 @@ void UDPReceiver::add_receive_port(int port, const string& bindto, const string& UDPSocket sock; if (IN_MULTICAST(ntohl(inet_addr(mcastaddr.c_str())))) { - sock.reinit(port, mcastaddr); - sock.setMulticastSource(bindto.c_str()); - sock.joinGroup(mcastaddr.c_str(), bindto.c_str()); + sock.init_receive_multicast(port, bindto, mcastaddr); } else { sock.reinit(port, bindto); } - m_sockets.push_back(move(sock)); + m_sockets.push_back(std::move(sock)); } vector<UDPReceiver::ReceivedPacket> UDPReceiver::receive(int timeout_ms) @@ -366,11 +448,13 @@ vector<UDPReceiver::ReceivedPacket> UDPReceiver::receive(int timeout_ms) for (size_t i = 0; i < m_sockets.size(); i++) { if (fds[i].revents & POLLIN) { auto p = m_sockets[i].receive(2048); // This is larger than the usual MTU - ReceivedPacket rp; - rp.packetdata = move(p.buffer); - rp.received_from = move(p.address); - rp.port_received_on = m_sockets[i].getPort(); - received.push_back(move(rp)); + if (not p.buffer.empty()) { + ReceivedPacket rp; + rp.packetdata = std::move(p.buffer); + rp.received_from = std::move(p.address); + rp.port_received_on = m_sockets[i].getPort(); + received.push_back(std::move(rp)); + } } } @@ -395,7 +479,7 @@ TCPSocket::~TCPSocket() TCPSocket::TCPSocket(TCPSocket&& other) : m_sock(other.m_sock), - m_remote_address(move(other.m_remote_address)) + m_remote_address(std::move(other.m_remote_address)) { if (other.m_sock != -1) { other.m_sock = -1; @@ -884,12 +968,22 @@ ssize_t TCPClient::recv(void *buffer, size_t length, int flags, int timeout_ms) reconnect(); } + m_last_received_packet_ts = chrono::steady_clock::now(); + return ret; } catch (const TCPSocket::Interrupted&) { return -1; } catch (const TCPSocket::Timeout&) { + const auto timeout = chrono::milliseconds(timeout_ms * 5); + if (m_last_received_packet_ts.has_value() and + chrono::steady_clock::now() - *m_last_received_packet_ts > timeout) + { + // This is to catch half-closed TCP connections + reconnect(); + } + return 0; } @@ -900,6 +994,7 @@ void TCPClient::reconnect() { TCPSocket newsock; m_sock = std::move(newsock); + m_last_received_packet_ts = nullopt; m_sock.connect(m_hostname, m_port, true); } @@ -907,7 +1002,7 @@ TCPConnection::TCPConnection(TCPSocket&& sock) : queue(), m_running(true), m_sender_thread(), - m_sock(move(sock)) + m_sock(std::move(sock)) { #if MISSING_OWN_ADDR auto own_addr = m_sock.getOwnAddress(); @@ -969,6 +1064,17 @@ void TCPConnection::process() #endif } +TCPConnection::stats_t TCPConnection::get_stats() const +{ + TCPConnection::stats_t s; + const vector<size_t> buffer_sizes = queue.map<size_t>( + [](const vector<uint8_t>& vec) { return vec.size(); } + ); + + s.buffer_fullness = std::accumulate(buffer_sizes.cbegin(), buffer_sizes.cend(), 0); + s.remote_address = m_sock.get_remote_address(); + return s; +} TCPDataDispatcher::TCPDataDispatcher(size_t max_queue_size, size_t buffers_to_preroll) : m_max_queue_size(max_queue_size), @@ -1026,7 +1132,7 @@ void TCPDataDispatcher::process() auto sock = m_listener_socket.accept(timeout_ms); if (sock.valid()) { auto lock = unique_lock<mutex>(m_mutex); - m_connections.emplace(m_connections.begin(), move(sock)); + m_connections.emplace(m_connections.begin(), std::move(sock)); if (m_buffers_to_preroll > 0) { for (const auto& buf : m_preroll_queue) { @@ -1042,6 +1148,16 @@ void TCPDataDispatcher::process() } } + +std::vector<TCPConnection::stats_t> TCPDataDispatcher::get_stats() const +{ + std::vector<TCPConnection::stats_t> s; + for (const auto& conn : m_connections) { + s.push_back(conn.get_stats()); + } + return s; +} + TCPReceiveServer::TCPReceiveServer(size_t blocksize) : m_blocksize(blocksize) { @@ -1098,7 +1214,7 @@ void TCPReceiveServer::process() } else { buf.resize(r); - m_queue.push(make_shared<TCPReceiveMessageData>(move(buf))); + m_queue.push(make_shared<TCPReceiveMessageData>(std::move(buf))); } } catch (const TCPSocket::Interrupted&) { @@ -1139,7 +1255,7 @@ TCPSendClient::~TCPSendClient() } } -void TCPSendClient::sendall(const std::vector<uint8_t>& buffer) +TCPSendClient::ErrorStats TCPSendClient::sendall(const std::vector<uint8_t>& buffer) { if (not m_running) { throw runtime_error(m_exception_data); @@ -1151,6 +1267,17 @@ void TCPSendClient::sendall(const std::vector<uint8_t>& buffer) vector<uint8_t> discard; m_queue.try_pop(discard); } + + TCPSendClient::ErrorStats es; + es.num_reconnects = m_num_reconnects.load(); + + es.has_seen_new_errors = es.num_reconnects != m_num_reconnects_prev; + m_num_reconnects_prev = es.num_reconnects; + + auto lock = unique_lock<mutex>(m_error_mutex); + es.last_error = m_last_error; + + return es; } void TCPSendClient::process() @@ -1172,12 +1299,16 @@ void TCPSendClient::process() } else { try { + m_num_reconnects.fetch_add(1, std::memory_order_seq_cst); m_sock.connect(m_hostname, m_port); m_is_connected = true; } catch (const runtime_error& e) { m_is_connected = false; this_thread::sleep_for(chrono::seconds(1)); + + auto lock = unique_lock<mutex>(m_error_mutex); + m_last_error = e.what(); } } } diff --git a/lib/Socket.h b/lib/Socket.h index d8242e2..29b618a 100644 --- a/lib/Socket.h +++ b/lib/Socket.h @@ -2,7 +2,7 @@ Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2022 + Copyright (C) 2024 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org @@ -31,9 +31,11 @@ #include "ThreadsafeQueue.h" #include <cstdlib> #include <atomic> -#include <iostream> +#include <chrono> #include <list> #include <memory> +#include <optional> +#include <string> #include <thread> #include <vector> @@ -111,13 +113,13 @@ class UDPSocket /** Close the already open socket, and create a new one. Throws a runtime_error on error. */ void reinit(int port); void reinit(int port, const std::string& name); + void init_receive_multicast(int port, const std::string& local_if_addr, const std::string& mcastaddr); void close(void); void send(UDPPacket& packet); void send(const std::vector<uint8_t>& data, InetAddress destination); void send(const std::string& data, InetAddress destination); UDPPacket receive(size_t max_size); - void joinGroup(const char* groupname, const char* if_addr = nullptr); void setMulticastSource(const char* source_addr); void setMulticastTTL(int ttl); @@ -129,9 +131,14 @@ class UDPSocket SOCKET getNativeSocket() const; int getPort() const; + private: + void join_group(const char* groupname, const char* if_addr = nullptr); + void post_init(); + protected: SOCKET m_sock = INVALID_SOCKET; int m_port = 0; + std::string m_multicast_source = ""; }; /* UDP packet receiver supporting receiving from several ports at once */ @@ -206,6 +213,8 @@ class TCPSocket { SOCKET get_sockfd() const { return m_sock; } + InetAddress get_remote_address() const { return m_remote_address; } + private: explicit TCPSocket(int sockfd); explicit TCPSocket(int sockfd, InetAddress remote_address); @@ -231,6 +240,8 @@ class TCPClient { TCPSocket m_sock; std::string m_hostname; int m_port; + + std::optional<std::chrono::steady_clock::time_point> m_last_received_packet_ts; }; /* Helper class for TCPDataDispatcher, contains a queue of pending data and @@ -245,6 +256,12 @@ class TCPConnection ThreadsafeQueue<std::vector<uint8_t> > queue; + struct stats_t { + size_t buffer_fullness = 0; + InetAddress remote_address; + }; + stats_t get_stats() const; + private: std::atomic<bool> m_running; std::thread m_sender_thread; @@ -267,6 +284,8 @@ class TCPDataDispatcher void start(int port, const std::string& address); void write(const std::vector<uint8_t>& data); + std::vector<TCPConnection::stats_t> get_stats() const; + private: void process(); @@ -324,10 +343,18 @@ class TCPSendClient { public: TCPSendClient(const std::string& hostname, int port); ~TCPSendClient(); + TCPSendClient(const TCPSendClient&) = delete; + TCPSendClient& operator=(const TCPSendClient&) = delete; - /* Throws a runtime_error on error - */ - void sendall(const std::vector<uint8_t>& buffer); + + struct ErrorStats { + std::string last_error = ""; + size_t num_reconnects = 0; + bool has_seen_new_errors = false; + }; + + /* Throws a runtime_error when the process thread isn't running */ + ErrorStats sendall(const std::vector<uint8_t>& buffer); private: void process(); @@ -344,6 +371,11 @@ class TCPSendClient { std::string m_exception_data; std::thread m_sender_thread; TCPSocket m_listener_socket; + + std::atomic<size_t> m_num_reconnects = ATOMIC_VAR_INIT(0); + size_t m_num_reconnects_prev = 0; + std::mutex m_error_mutex; + std::string m_last_error = ""; }; } diff --git a/lib/ThreadsafeQueue.h b/lib/ThreadsafeQueue.h index 8b385d6..13bc19e 100644 --- a/lib/ThreadsafeQueue.h +++ b/lib/ThreadsafeQueue.h @@ -2,7 +2,7 @@ Copyright (C) 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2023 + Copyright (C) 2025 Matthias P. Braendli, matthias.braendli@mpb.li An implementation for a threadsafe queue, depends on C++11 @@ -28,6 +28,7 @@ #pragma once +#include <functional> #include <mutex> #include <condition_variable> #include <queue> @@ -63,10 +64,10 @@ public: std::unique_lock<std::mutex> lock(the_mutex); size_t queue_size_before = the_queue.size(); if (max_size == 0) { - the_queue.push(val); + the_queue.push_back(val); } else if (queue_size_before < max_size) { - the_queue.push(val); + the_queue.push_back(val); } size_t queue_size = the_queue.size(); lock.unlock(); @@ -80,10 +81,10 @@ public: std::unique_lock<std::mutex> lock(the_mutex); size_t queue_size_before = the_queue.size(); if (max_size == 0) { - the_queue.emplace(std::move(val)); + the_queue.emplace_back(std::move(val)); } else if (queue_size_before < max_size) { - the_queue.emplace(std::move(val)); + the_queue.emplace_back(std::move(val)); } size_t queue_size = the_queue.size(); lock.unlock(); @@ -110,9 +111,9 @@ public: bool overflow = false; while (the_queue.size() >= max_size) { overflow = true; - the_queue.pop(); + the_queue.pop_front(); } - the_queue.push(val); + the_queue.push_back(val); const size_t queue_size = the_queue.size(); lock.unlock(); @@ -129,9 +130,9 @@ public: bool overflow = false; while (the_queue.size() >= max_size) { overflow = true; - the_queue.pop(); + the_queue.pop_front(); } - the_queue.emplace(std::move(val)); + the_queue.emplace_back(std::move(val)); const size_t queue_size = the_queue.size(); lock.unlock(); @@ -152,7 +153,7 @@ public: while (the_queue.size() >= threshold) { the_tx_notification.wait(lock); } - the_queue.push(val); + the_queue.push_back(val); size_t queue_size = the_queue.size(); lock.unlock(); @@ -198,7 +199,7 @@ public: } popped_value = the_queue.front(); - the_queue.pop(); + the_queue.pop_front(); lock.unlock(); the_tx_notification.notify_one(); @@ -220,15 +221,26 @@ public: } else { std::swap(popped_value, the_queue.front()); - the_queue.pop(); + the_queue.pop_front(); lock.unlock(); the_tx_notification.notify_one(); } } + template<typename R> + std::vector<R> map(std::function<R(const T&)> func) const + { + std::vector<R> result; + std::unique_lock<std::mutex> lock(the_mutex); + for (const T& elem : the_queue) { + result.push_back(func(elem)); + } + return result; + } + private: - std::queue<T> the_queue; + std::deque<T> the_queue; mutable std::mutex the_mutex; std::condition_variable the_rx_notification; std::condition_variable the_tx_notification; diff --git a/lib/edi/STIDecoder.cpp b/lib/edi/STIDecoder.cpp index d387f1e..0499c53 100644 --- a/lib/edi/STIDecoder.cpp +++ b/lib/edi/STIDecoder.cpp @@ -1,5 +1,5 @@ /* - Copyright (C) 2020 + Copyright (C) 2025 Matthias P. Braendli, matthias.braendli@mpb.li http://opendigitalradio.org @@ -20,7 +20,6 @@ */ #include "STIDecoder.hpp" #include "buffer_unpack.hpp" -#include "crc.h" #include "Log.h" #include <cstdio> #include <cassert> @@ -70,6 +69,12 @@ void STIDecoder::setMaxDelay(int num_af_packets) m_dispatcher.setMaxDelay(num_af_packets); } +void STIDecoder::filter_stream_index(bool enable, uint16_t index) +{ + m_filter_stream = enable; + m_filtered_stream_index = index; +} + #define AFPACKET_HEADER_LEN 10 // includes SYNC bool STIDecoder::decode_starptr(const std::vector<uint8_t>& value, const tag_name_t& /*n*/) @@ -173,6 +178,20 @@ bool STIDecoder::decode_ssn(const std::vector<uint8_t>& value, const tag_name_t& n = (uint16_t)(name[2]) << 8; n |= (uint16_t)(name[3]); + if (n == 0) { + if (not m_ssnn_zero_warning_printed) { + etiLog.level(warn) << "EDI: Stream index SSnn tag is zero"; + } + m_ssnn_zero_warning_printed = true; + } + else { + m_ssnn_zero_warning_printed = false; + } + + if (m_filter_stream and m_filtered_stream_index != n) { + return true; + } + sti.stream_index = n - 1; // n is 1-indexed sti.rfa = value[0] >> 3; sti.tid = value[0] & 0x07; @@ -183,14 +202,20 @@ bool STIDecoder::decode_ssn(const std::vector<uint8_t>& value, const tag_name_t& sti.stid = istc & 0xFFF; if (sti.rfa != 0) { - etiLog.level(warn) << "EDI: rfa field in SSnn tag non-null"; + if (not m_rfa_nonnull_warning_printed) { + etiLog.level(warn) << "EDI: rfa field in SSnn tag non-null"; + } + m_rfa_nonnull_warning_printed = true; + } + else { + m_rfa_nonnull_warning_printed = false; } copy( value.cbegin() + 3, value.cend(), back_inserter(sti.istd)); - m_data_collector.add_payload(move(sti)); + m_data_collector.add_payload(std::move(sti)); return true; } diff --git a/lib/edi/STIDecoder.hpp b/lib/edi/STIDecoder.hpp index f85f789..81fbd82 100644 --- a/lib/edi/STIDecoder.hpp +++ b/lib/edi/STIDecoder.hpp @@ -1,5 +1,5 @@ /* - Copyright (C) 2020 + Copyright (C) 2025 Matthias P. Braendli, matthias.braendli@mpb.li http://opendigitalradio.org @@ -119,6 +119,10 @@ class STIDecoder { */ void setMaxDelay(int num_af_packets); + /* Enable/disable stream-index filtering. + * index==0 is out of spec, but some encoders do it anyway. */ + void filter_stream_index(bool enable, uint16_t index); + private: bool decode_starptr(const std::vector<uint8_t>& value, const tag_name_t& n); bool decode_dsti(const std::vector<uint8_t>& value, const tag_name_t& n); @@ -132,6 +136,12 @@ class STIDecoder { STIDataCollector& m_data_collector; TagDispatcher m_dispatcher; + + bool m_filter_stream = false; + uint16_t m_filtered_stream_index = 1; + + bool m_ssnn_zero_warning_printed = false; + bool m_rfa_nonnull_warning_printed = false; }; } diff --git a/lib/edi/common.cpp b/lib/edi/common.cpp index c99997a..38eadf9 100644 --- a/lib/edi/common.cpp +++ b/lib/edi/common.cpp @@ -33,9 +33,9 @@ namespace EdiDecoder { using namespace std; -bool frame_timestamp_t::valid() const +bool frame_timestamp_t::is_valid() const { - return tsta != 0xFFFFFF; + return tsta != 0xFFFFFF and seconds != 0; } string frame_timestamp_t::to_string() const @@ -43,7 +43,7 @@ string frame_timestamp_t::to_string() const const time_t seconds_in_unix_epoch = to_unix_epoch(); stringstream ss; - if (valid()) { + if (is_valid()) { ss << "Timestamp: "; } else { @@ -129,10 +129,9 @@ std::string tag_name_to_human_readable(const tag_name_t& name) return s; } -TagDispatcher::TagDispatcher( - std::function<void()>&& af_packet_completed) : - m_af_packet_completed(move(af_packet_completed)), - m_tagpacket_handler([](const std::vector<uint8_t>& /*ignore*/){}) +TagDispatcher::TagDispatcher(std::function<void()>&& af_packet_completed) : + m_af_packet_completed(std::move(af_packet_completed)), + m_afpacket_handler([](std::vector<uint8_t>&& /*ignore*/){}) { } @@ -278,7 +277,6 @@ void TagDispatcher::setMaxDelay(int num_af_packets) } -#define AFPACKET_HEADER_LEN 10 // includes SYNC TagDispatcher::decode_result_t TagDispatcher::decode_afpacket( const std::vector<uint8_t> &input_data) { @@ -341,25 +339,30 @@ TagDispatcher::decode_result_t TagDispatcher::decode_afpacket( return {decode_state_e::Error, AFPACKET_HEADER_LEN + taglength + crclen}; } else { + vector<uint8_t> afpacket(AFPACKET_HEADER_LEN + taglength + crclen); + copy(input_data.begin(), + input_data.begin() + AFPACKET_HEADER_LEN + taglength + crclen, + afpacket.begin()); + m_afpacket_handler(std::move(afpacket)); + vector<uint8_t> payload(taglength); copy(input_data.begin() + AFPACKET_HEADER_LEN, input_data.begin() + AFPACKET_HEADER_LEN + taglength, payload.begin()); - return { - decode_tagpacket(payload) ? decode_state_e::Ok : decode_state_e::Error, - AFPACKET_HEADER_LEN + taglength + crclen}; + auto result = decode_tagpacket(payload) ? decode_state_e::Ok : decode_state_e::Error; + return {result, AFPACKET_HEADER_LEN + taglength + crclen}; } } void TagDispatcher::register_tag(const std::string& tag, tag_handler&& h) { - m_handlers[tag] = move(h); + m_handlers[tag] = std::move(h); } -void TagDispatcher::register_tagpacket_handler(tagpacket_handler&& h) +void TagDispatcher::register_afpacket_handler(afpacket_handler&& h) { - m_tagpacket_handler = move(h); + m_afpacket_handler = std::move(h); } @@ -428,8 +431,6 @@ bool TagDispatcher::decode_tagpacket(const vector<uint8_t> &payload) } } - m_tagpacket_handler(payload); - return success; } diff --git a/lib/edi/common.hpp b/lib/edi/common.hpp index c3e6c40..fdd7424 100644 --- a/lib/edi/common.hpp +++ b/lib/edi/common.hpp @@ -32,12 +32,14 @@ namespace EdiDecoder { +constexpr size_t AFPACKET_HEADER_LEN = 10; // includes SYNC + struct frame_timestamp_t { uint32_t seconds = 0; uint32_t utco = 0; uint32_t tsta = 0xFFFFFF; // According to EN 300 797 Annex B - bool valid() const; + bool is_valid() const; std::string to_string() const; std::time_t to_unix_epoch() const; std::chrono::system_clock::time_point to_system_clock() const; @@ -133,9 +135,9 @@ class TagDispatcher { */ void register_tag(const std::string& tag, tag_handler&& h); - /* The complete tagpacket can also be retrieved */ - using tagpacket_handler = std::function<void(const std::vector<uint8_t>&)>; - void register_tagpacket_handler(tagpacket_handler&& h); + /* The complete AF packet can also be retrieved */ + using afpacket_handler = std::function<void(std::vector<uint8_t>&&)>; + void register_afpacket_handler(afpacket_handler&& h); seq_info_t get_seq_info() const { return m_last_sequences; @@ -160,7 +162,7 @@ class TagDispatcher { std::vector<uint8_t> m_input_data; std::map<std::string, tag_handler> m_handlers; std::function<void()> m_af_packet_completed; - tagpacket_handler m_tagpacket_handler; + afpacket_handler m_afpacket_handler; std::vector<std::string> m_ignored_tags; }; diff --git a/lib/edioutput/EDIConfig.h b/lib/edioutput/EDIConfig.h index 1997210..7016e87 100644 --- a/lib/edioutput/EDIConfig.h +++ b/lib/edioutput/EDIConfig.h @@ -1,5 +1,5 @@ /* - Copyright (C) 2019 + Copyright (C) 2025 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org @@ -36,17 +36,31 @@ namespace edi { /** Configuration for EDI output */ +struct pft_settings_t { + // protection and fragmentation settings + bool verbose = false; + bool enable_pft = false; + unsigned chunk_len = 207; // RSk, data length of each chunk + unsigned fec = 0; // number of fragments that can be recovered + double fragment_spreading_factor = 0.95; + // Spread transmission of fragments in time. 1.0 = 100% means spreading over the whole duration of a frame (24ms) + // Above 100% means that the fragments are spread over several 24ms periods, interleaving the AF packets. +}; + struct destination_t { virtual ~destination_t() {}; + + pft_settings_t pft_settings = {}; }; + // Can represent both unicast and multicast destinations struct udp_destination_t : public destination_t { std::string dest_addr; - unsigned int dest_port = 0; + uint16_t dest_port = 0; std::string source_addr; - unsigned int source_port = 0; - unsigned int ttl = 10; + uint16_t source_port = 0; + uint8_t ttl = 10; }; // TCP server that can accept multiple connections @@ -66,16 +80,9 @@ struct tcp_client_t : public destination_t { }; struct configuration_t { - unsigned chunk_len = 207; // RSk, data length of each chunk - unsigned fec = 0; // number of fragments that can be recovered - bool dump = false; // dump a file with the EDI packets - bool verbose = false; - bool enable_pft = false; // Enable protection and fragmentation + bool verbose = false; unsigned int tagpacket_alignment = 0; std::vector<std::shared_ptr<destination_t> > destinations; - double fragment_spreading_factor = 0.95; - // Spread transmission of fragments in time. 1.0 = 100% means spreading over the whole duration of a frame (24ms) - // Above 100% means that the fragments are spread over several 24ms periods, interleaving the AF packets. bool enabled() const { return destinations.size() > 0; } diff --git a/lib/edioutput/PFT.cpp b/lib/edioutput/PFT.cpp index 7e0e8e9..f65fd67 100644 --- a/lib/edioutput/PFT.cpp +++ b/lib/edioutput/PFT.cpp @@ -1,5 +1,5 @@ /* - Copyright (C) 2021 + Copyright (C) 2025 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org @@ -31,7 +31,6 @@ */ #include <vector> -#include <list> #include <cstdio> #include <cstring> #include <cstdint> @@ -41,6 +40,7 @@ #include "PFT.h" #include "crc.h" #include "ReedSolomon.h" +#include "Log.h" namespace edi { @@ -51,11 +51,10 @@ using namespace std; PFT::PFT() { } -PFT::PFT(const configuration_t &conf) : +PFT::PFT(const pft_settings_t& conf) : + m_enabled(conf.enable_pft), m_k(conf.chunk_len), m_m(conf.fec), - m_pseq(0), - m_num_chunks(0), m_verbose(conf.verbose) { if (m_k > 207) { @@ -324,5 +323,4 @@ void PFT::OverridePSeq(uint16_t pseq) m_pseq = pseq; } -} - +} // namespace edi diff --git a/lib/edioutput/PFT.h b/lib/edioutput/PFT.h index 42569a0..52e9f46 100644 --- a/lib/edioutput/PFT.h +++ b/lib/edioutput/PFT.h @@ -1,5 +1,5 @@ /* - Copyright (C) 2021 + Copyright (C) 2025 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org @@ -33,12 +33,8 @@ #pragma once #include <vector> -#include <list> -#include <stdexcept> #include <cstdint> #include "AFPacket.h" -#include "Log.h" -#include "ReedSolomon.h" #include "EDIConfig.h" namespace edi { @@ -52,21 +48,24 @@ class PFT static constexpr int PARITYBYTES = 48; PFT(); - PFT(const configuration_t& conf); + PFT(const pft_settings_t& conf); + + bool is_enabled() const { return m_enabled and m_k > 0; } // return a list of PFT fragments with the correct // PFT headers - std::vector< PFTFragment > Assemble(AFPacket af_packet); + std::vector<PFTFragment> Assemble(AFPacket af_packet); // Apply Reed-Solomon FEC to the AF Packet RSBlock Protect(AFPacket af_packet); // Cut a RSBlock into several fragments that can be transmitted - std::vector< std::vector<uint8_t> > ProtectAndFragment(AFPacket af_packet); + std::vector<std::vector<uint8_t>> ProtectAndFragment(AFPacket af_packet); void OverridePSeq(uint16_t pseq); private: + bool m_enabled = false; unsigned int m_k = 207; // length of RS data word unsigned int m_m = 3; // number of fragments that can be recovered if lost uint16_t m_pseq = 0; diff --git a/lib/edioutput/Transport.cpp b/lib/edioutput/Transport.cpp index 8ebb9fc..e9559b5 100644 --- a/lib/edioutput/Transport.cpp +++ b/lib/edioutput/Transport.cpp @@ -1,5 +1,5 @@ /* - Copyright (C) 2022 + Copyright (C) 2025 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org @@ -25,7 +25,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ #include "Transport.h" -#include <iterator> +#include "Log.h" #include <cmath> #include <thread> @@ -57,13 +57,18 @@ void configuration_t::print() const else { throw logic_error("EDI destination not implemented"); } + etiLog.level(info) << " PFT=" << edi_dest->pft_settings.enable_pft; + if (edi_dest->pft_settings.enable_pft) { + etiLog.level(info) << " FEC=" << edi_dest->pft_settings.fec; + etiLog.level(info) << " Chunk Len=" << edi_dest->pft_settings.chunk_len; + etiLog.level(info) << " Fragment spreading factor=" << edi_dest->pft_settings.fragment_spreading_factor; + } } } Sender::Sender(const configuration_t& conf) : - m_conf(conf), - edi_pft(m_conf) + m_conf(conf) { if (m_conf.verbose) { etiLog.level(info) << "Setup EDI Output"; @@ -71,37 +76,39 @@ Sender::Sender(const configuration_t& conf) : for (const auto& edi_dest : m_conf.destinations) { if (const auto udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(edi_dest)) { - auto udp_socket = std::make_shared<Socket::UDPSocket>(udp_dest->source_port); + Socket::UDPSocket udp_socket(udp_dest->source_port); if (not udp_dest->source_addr.empty()) { - udp_socket->setMulticastSource(udp_dest->source_addr.c_str()); - udp_socket->setMulticastTTL(udp_dest->ttl); + udp_socket.setMulticastSource(udp_dest->source_addr.c_str()); + udp_socket.setMulticastTTL(udp_dest->ttl); } - udp_sockets.emplace(udp_dest.get(), udp_socket); + auto sender = make_shared<udp_sender_t>( + udp_dest->dest_addr, + udp_dest->dest_port, + std::move(udp_socket)); + m_pft_spreaders.emplace_back( + make_shared<PFTSpreader>(udp_dest->pft_settings, sender)); } else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(edi_dest)) { - auto dispatcher = make_shared<Socket::TCPDataDispatcher>( - tcp_dest->max_frames_queued, tcp_dest->tcp_server_preroll_buffers); - - dispatcher->start(tcp_dest->listen_port, "0.0.0.0"); - tcp_dispatchers.emplace(tcp_dest.get(), dispatcher); + auto sender = make_shared<tcp_dispatcher_t>( + tcp_dest->listen_port, + tcp_dest->max_frames_queued, + tcp_dest->tcp_server_preroll_buffers); + m_pft_spreaders.emplace_back( + make_shared<PFTSpreader>(tcp_dest->pft_settings, sender)); } else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(edi_dest)) { - auto tcp_send_client = make_shared<Socket::TCPSendClient>(tcp_dest->dest_addr, tcp_dest->dest_port); - tcp_senders.emplace(tcp_dest.get(), tcp_send_client); + auto sender = make_shared<tcp_send_client_t>(tcp_dest->dest_addr, tcp_dest->dest_port); + m_pft_spreaders.emplace_back( + make_shared<PFTSpreader>(tcp_dest->pft_settings, sender)); } else { throw logic_error("EDI destination not implemented"); } } - if (m_conf.dump) { - edi_debug_file.open("./edi.debug"); - } - - if (m_conf.enable_pft) { - unique_lock<mutex> lock(m_mutex); + { m_running = true; m_thread = thread(&Sender::run, this); } @@ -111,10 +118,52 @@ Sender::Sender(const configuration_t& conf) : } } +void Sender::write(const TagPacket& tagpacket) +{ + // Assemble into one AF Packet + edi::AFPacket af_packet = edi_af_packetiser.Assemble(tagpacket); + + write(af_packet); +} + +void Sender::write(const AFPacket& af_packet) +{ + for (auto& sender : m_pft_spreaders) { + sender->send_af_packet(af_packet); + } +} + +void Sender::override_af_sequence(uint16_t seq) +{ + edi_af_packetiser.OverrideSeq(seq); +} + +void Sender::override_pft_sequence(uint16_t pseq) +{ + for (auto& spreader : m_pft_spreaders) { + spreader->edi_pft.OverridePSeq(pseq); + } +} + +std::vector<Sender::stats_t> Sender::get_tcp_server_stats() const +{ + std::vector<Sender::stats_t> stats; + + for (auto& spreader : m_pft_spreaders) { + if (auto sender = std::dynamic_pointer_cast<tcp_dispatcher_t>(spreader->sender)) { + Sender::stats_t s; + s.listen_port = sender->listen_port; + s.stats = sender->sock.get_stats(); + stats.push_back(s); + } + } + + return stats; +} + Sender::~Sender() { { - unique_lock<mutex> lock(m_mutex); m_running = false; } @@ -123,36 +172,89 @@ Sender::~Sender() } } -void Sender::write(const TagPacket& tagpacket) +void Sender::run() { - // Assemble into one AF Packet - edi::AFPacket af_packet = edi_afPacketiser.Assemble(tagpacket); + while (m_running) { + const auto now = chrono::steady_clock::now(); + for (auto& spreader : m_pft_spreaders) { + spreader->tick(now); + } - write(af_packet); + this_thread::sleep_for(chrono::microseconds(500)); + } } -void Sender::write(const AFPacket& af_packet) + +void Sender::udp_sender_t::send_packet(const std::vector<uint8_t> &frame) +{ + Socket::InetAddress addr; + addr.resolveUdpDestination(dest_addr, dest_port); + sock.send(frame, addr); +} + +void Sender::tcp_dispatcher_t::send_packet(const std::vector<uint8_t> &frame) { - if (m_conf.enable_pft) { + sock.write(frame); +} + +void Sender::tcp_send_client_t::send_packet(const std::vector<uint8_t> &frame) +{ + sock.sendall(frame); +} + +Sender::udp_sender_t::udp_sender_t(std::string dest_addr, + uint16_t dest_port, + Socket::UDPSocket&& sock) : + dest_addr(dest_addr), + dest_port(dest_port), + sock(std::move(sock)) +{ +} + +Sender::tcp_dispatcher_t::tcp_dispatcher_t(uint16_t listen_port, + size_t max_frames_queued, + size_t tcp_server_preroll_buffers) : + listen_port(listen_port), + sock(max_frames_queued, tcp_server_preroll_buffers) +{ + sock.start(listen_port, "0.0.0.0"); +} + +Sender::tcp_send_client_t::tcp_send_client_t(const std::string &dest_addr, + uint16_t dest_port) : + sock(dest_addr, dest_port) +{ +} + +Sender::PFTSpreader::PFTSpreader(const pft_settings_t& conf, sender_sp sender) : + sender(sender), + edi_pft(conf) +{ +} + +void Sender::PFTSpreader::send_af_packet(const AFPacket& af_packet) +{ + using namespace std::chrono; + if (edi_pft.is_enabled()) { // Apply PFT layer to AF Packet (Reed Solomon FEC and Fragmentation) vector<edi::PFTFragment> edi_fragments = edi_pft.Assemble(af_packet); - if (m_conf.verbose and m_last_num_pft_fragments != edi_fragments.size()) { + if (settings.verbose and last_num_pft_fragments != edi_fragments.size()) { etiLog.log(debug, "EDI Output: Number of PFT fragments %zu\n", edi_fragments.size()); - m_last_num_pft_fragments = edi_fragments.size(); + last_num_pft_fragments = edi_fragments.size(); } /* Spread out the transmission of all fragments over part of the 24ms AF packet duration * to reduce the risk of losing a burst of fragments because of congestion. */ - using namespace std::chrono; auto inter_fragment_wait_time = microseconds(1); if (edi_fragments.size() > 1) { - if (m_conf.fragment_spreading_factor > 0) { + if (settings.fragment_spreading_factor > 0) { inter_fragment_wait_time = - microseconds( - llrint(m_conf.fragment_spreading_factor * 24000.0 / edi_fragments.size()) - ); + microseconds(llrint( + settings.fragment_spreading_factor * 24000.0 / + edi_fragments.size() + )); } } @@ -162,99 +264,35 @@ void Sender::write(const AFPacket& af_packet) auto tp = now; unique_lock<mutex> lock(m_mutex); for (auto& edi_frag : edi_fragments) { - m_pending_frames[tp] = move(edi_frag); + m_pending_frames[tp] = std::move(edi_frag); tp += inter_fragment_wait_time; } } - - // Transmission done in run() function } else /* PFT disabled */ { - // Send over ethernet - if (m_conf.dump) { - ostream_iterator<uint8_t> debug_iterator(edi_debug_file); - copy(af_packet.begin(), af_packet.end(), debug_iterator); - } - - for (auto& dest : m_conf.destinations) { - if (const auto& udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(dest)) { - Socket::InetAddress addr; - addr.resolveUdpDestination(udp_dest->dest_addr, udp_dest->dest_port); - - if (af_packet.size() > 1400 and not m_udp_fragmentation_warning_printed) { - fprintf(stderr, "EDI Output: AF packet larger than 1400," - " consider using PFT to avoid UP fragmentation.\n"); - m_udp_fragmentation_warning_printed = true; - } - - udp_sockets.at(udp_dest.get())->send(af_packet, addr); - } - else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(dest)) { - tcp_dispatchers.at(tcp_dest.get())->write(af_packet); - } - else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(dest)) { - tcp_senders.at(tcp_dest.get())->sendall(af_packet); - } - else { - throw logic_error("EDI destination not implemented"); - } - } + const auto now = steady_clock::now(); + unique_lock<mutex> lock(m_mutex); + m_pending_frames[now] = std::move(af_packet); } -} -void Sender::override_af_sequence(uint16_t seq) -{ - edi_afPacketiser.OverrideSeq(seq); + // Actual transmission done in tick() function } -void Sender::override_pft_sequence(uint16_t pseq) +void Sender::PFTSpreader::tick(const std::chrono::steady_clock::time_point& now) { - edi_pft.OverridePSeq(pseq); -} + unique_lock<mutex> lock(m_mutex); -void Sender::run() -{ - while (m_running) { - unique_lock<mutex> lock(m_mutex); - const auto now = chrono::steady_clock::now(); + for (auto it = m_pending_frames.begin(); it != m_pending_frames.end(); ) { + const auto& edi_frag = it->second; - // Send over ethernet - for (auto it = m_pending_frames.begin(); it != m_pending_frames.end(); ) { - const auto& edi_frag = it->second; - - if (it->first <= now) { - if (m_conf.dump) { - ostream_iterator<uint8_t> debug_iterator(edi_debug_file); - copy(edi_frag.begin(), edi_frag.end(), debug_iterator); - } - - for (auto& dest : m_conf.destinations) { - if (const auto& udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(dest)) { - Socket::InetAddress addr; - addr.resolveUdpDestination(udp_dest->dest_addr, udp_dest->dest_port); - - udp_sockets.at(udp_dest.get())->send(edi_frag, addr); - } - else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(dest)) { - tcp_dispatchers.at(tcp_dest.get())->write(edi_frag); - } - else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(dest)) { - tcp_senders.at(tcp_dest.get())->sendall(edi_frag); - } - else { - throw logic_error("EDI destination not implemented"); - } - } - it = m_pending_frames.erase(it); - } - else { - ++it; - } + if (it->first <= now) { + sender->send_packet(edi_frag); + it = m_pending_frames.erase(it); + } + else { + ++it; } - - lock.unlock(); - this_thread::sleep_for(chrono::microseconds(500)); } } -} +} // namespace edi diff --git a/lib/edioutput/Transport.h b/lib/edioutput/Transport.h index 6a3f229..b8a9008 100644 --- a/lib/edioutput/Transport.h +++ b/lib/edioutput/Transport.h @@ -1,5 +1,5 @@ /* - Copyright (C) 2022 + Copyright (C) 2025 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org @@ -31,25 +31,23 @@ #include "AFPacket.h" #include "PFT.h" #include "Socket.h" -#include <vector> #include <chrono> #include <map> -#include <unordered_map> -#include <stdexcept> -#include <fstream> #include <cstdint> #include <thread> #include <mutex> +#include <vector> namespace edi { -/** STI sender for EDI output */ - +/** ETI/STI sender for EDI output */ class Sender { public: Sender(const configuration_t& conf); Sender(const Sender&) = delete; - Sender operator=(const Sender&) = delete; + Sender& operator=(const Sender&) = delete; + Sender(Sender&&) = delete; + Sender& operator=(Sender&&) = delete; ~Sender(); // Assemble the tagpacket into an AF packet, and if needed, @@ -66,33 +64,85 @@ class Sender { void override_af_sequence(uint16_t seq); void override_pft_sequence(uint16_t pseq); - private: - void run(); - - bool m_udp_fragmentation_warning_printed = false; + struct stats_t { + uint16_t listen_port; + std::vector<Socket::TCPConnection::stats_t> stats; + }; + std::vector<stats_t> get_tcp_server_stats() const; + private: configuration_t m_conf; - std::ofstream edi_debug_file; // The TagPacket will then be placed into an AFPacket - edi::AFPacketiser edi_afPacketiser; + edi::AFPacketiser edi_af_packetiser; - // The AF Packet will be protected with reed-solomon and split in fragments - edi::PFT edi_pft; + // PFT spreading requires sending UDP packets at specific time, + // independently of time when write() gets called + bool m_running = false; + std::thread m_thread; + virtual void run(); - std::unordered_map<udp_destination_t*, std::shared_ptr<Socket::UDPSocket>> udp_sockets; - std::unordered_map<tcp_server_t*, std::shared_ptr<Socket::TCPDataDispatcher>> tcp_dispatchers; - std::unordered_map<tcp_client_t*, std::shared_ptr<Socket::TCPSendClient>> tcp_senders; - // PFT spreading requires sending UDP packets at specific time, independently of - // time when write() gets called - std::thread m_thread; - std::mutex m_mutex; - bool m_running = false; - std::map<std::chrono::steady_clock::time_point, edi::PFTFragment> m_pending_frames; - size_t m_last_num_pft_fragments = 0; -}; -} + struct i_sender { + virtual void send_packet(const std::vector<uint8_t> &frame) = 0; + virtual ~i_sender() { } + }; + + struct udp_sender_t : public i_sender { + udp_sender_t( + std::string dest_addr, + uint16_t dest_port, + Socket::UDPSocket&& sock); + + std::string dest_addr; + uint16_t dest_port; + Socket::UDPSocket sock; + + virtual void send_packet(const std::vector<uint8_t> &frame) override; + }; + + struct tcp_dispatcher_t : public i_sender { + tcp_dispatcher_t( + uint16_t listen_port, + size_t max_frames_queued, + size_t tcp_server_preroll_buffers); + + uint16_t listen_port; + Socket::TCPDataDispatcher sock; + virtual void send_packet(const std::vector<uint8_t> &frame) override; + }; + + struct tcp_send_client_t : public i_sender { + tcp_send_client_t( + const std::string& dest_addr, + uint16_t dest_port); + + Socket::TCPSendClient sock; + virtual void send_packet(const std::vector<uint8_t> &frame) override; + }; + + class PFTSpreader { + public: + using sender_sp = std::shared_ptr<i_sender>; + PFTSpreader(const pft_settings_t &conf, sender_sp sender); + sender_sp sender; + edi::PFT edi_pft; + + void send_af_packet(const AFPacket &af_packet); + void tick(const std::chrono::steady_clock::time_point& now); + + private: + // send_af_packet() and tick() are called from different threads, both + // are accessing m_pending_frames + std::mutex m_mutex; + std::map<std::chrono::steady_clock::time_point, edi::PFTFragment> m_pending_frames; + pft_settings_t settings; + size_t last_num_pft_fragments = 0; + }; + + std::vector<std::shared_ptr<PFTSpreader>> m_pft_spreaders; +}; +} |