From 382bfbb308985ef05362810624c0c1f312586ba9 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Sat, 12 Aug 2023 15:19:33 +0200 Subject: Reconcile with Common 2a455ba --- lib/Socket.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'lib/Socket.cpp') diff --git a/lib/Socket.cpp b/lib/Socket.cpp index 10ec1ca..b71c01e 100644 --- a/lib/Socket.cpp +++ b/lib/Socket.cpp @@ -893,7 +893,7 @@ ssize_t TCPClient::recv(void *buffer, size_t length, int flags, int timeout_ms) return 0; } - return 0; + throw std::logic_error("unreachable"); } void TCPClient::reconnect() -- cgit v1.2.3 From b006fa5dc614aa9745584a6fb73244b11da0b8af Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Mon, 1 Jul 2024 13:19:14 +0200 Subject: Common: 710f5e6, Socket multicast reception --- lib/Socket.cpp | 38 ++++++++++++++++++++++++++++++++------ lib/Socket.h | 1 + 2 files changed, 33 insertions(+), 6 deletions(-) (limited to 'lib/Socket.cpp') diff --git a/lib/Socket.cpp b/lib/Socket.cpp index b71c01e..1281066 100644 --- a/lib/Socket.cpp +++ b/lib/Socket.cpp @@ -193,6 +193,34 @@ void UDPSocket::reinit(int port, const std::string& name) } } +void UDPSocket::init_receive_multicast(int port, const string& local_if_addr, const string& mcastaddr) +{ + if (m_sock != INVALID_SOCKET) { + ::close(m_sock); + } + + m_port = port; + m_sock = ::socket(AF_INET, SOCK_DGRAM, 0); + + int reuse_setting = 1; + if (setsockopt(m_sock, SOL_SOCKET, SO_REUSEADDR, &reuse_setting, sizeof(reuse_setting)) == -1) { + throw runtime_error("Can't reuse address"); + } + + struct sockaddr_in la; + memset((char *) &la, 0, sizeof(la)); + la.sin_family = AF_INET; + la.sin_port = htons(port); + la.sin_addr.s_addr = INADDR_ANY; + if (::bind(m_sock, (struct sockaddr*)&la, sizeof(la))) { + throw runtime_error(string("Could not bind: ") + strerror(errno)); + } + + joinGroup(mcastaddr.c_str(), local_if_addr.c_str()); + +} + + void UDPSocket::close() { if (m_sock != INVALID_SOCKET) { @@ -276,7 +304,7 @@ void UDPSocket::joinGroup(const char* groupname, const char* if_addr) throw runtime_error("Cannot convert multicast group name"); } if (!IN_MULTICAST(ntohl(group.imr_multiaddr.s_addr))) { - throw runtime_error("Group name is not a multicast address"); + throw runtime_error(string("Group name '") + groupname + "' is not a multicast address"); } if (if_addr) { @@ -309,7 +337,7 @@ void UDPSocket::setMulticastTTL(int ttl) { if (setsockopt(m_sock, IPPROTO_IP, IP_MULTICAST_TTL, &ttl, sizeof(ttl)) == SOCKET_ERROR) { - throw runtime_error(string("Can't set multicast ttl") + strerror(errno)); + throw runtime_error(string("Can't set multicast ttl: ") + strerror(errno)); } } @@ -327,15 +355,13 @@ void UDPReceiver::add_receive_port(int port, const string& bindto, const string& UDPSocket sock; if (IN_MULTICAST(ntohl(inet_addr(mcastaddr.c_str())))) { - sock.reinit(port, mcastaddr); - sock.setMulticastSource(bindto.c_str()); - sock.joinGroup(mcastaddr.c_str(), bindto.c_str()); + sock.init_receive_multicast(port, bindto, mcastaddr); } else { sock.reinit(port, bindto); } - m_sockets.push_back(move(sock)); + m_sockets.push_back(std::move(sock)); } vector UDPReceiver::receive(int timeout_ms) diff --git a/lib/Socket.h b/lib/Socket.h index d8242e2..44f93d0 100644 --- a/lib/Socket.h +++ b/lib/Socket.h @@ -111,6 +111,7 @@ class UDPSocket /** Close the already open socket, and create a new one. Throws a runtime_error on error. */ void reinit(int port); void reinit(int port, const std::string& name); + void init_receive_multicast(int port, const std::string& local_if_addr, const std::string& mcastaddr); void close(void); void send(UDPPacket& packet); -- cgit v1.2.3 From 9b18142f07d9d87bd9eced148a245b1d37bda29a Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Tue, 20 Aug 2024 15:01:25 +0200 Subject: Update common: improve multicast input and incomplete timestamps --- ChangeLog | 3 ++ doc/advanced.mux | 16 +++++++++ lib/Socket.cpp | 103 +++++++++++++++++++++++++++++++++++++++++------------ lib/Socket.h | 10 ++++-- lib/edi/common.cpp | 6 ++-- lib/edi/common.hpp | 2 +- src/input/Edi.cpp | 4 +-- 7 files changed, 112 insertions(+), 32 deletions(-) (limited to 'lib/Socket.cpp') diff --git a/ChangeLog b/ChangeLog index 6210fa3..5ede82f 100644 --- a/ChangeLog +++ b/ChangeLog @@ -4,6 +4,9 @@ ODR-DabMux in this repository upcoming: Make compatible with easydab again Remove odr-zmq2edi + Fix timestamp issue with EDI streams that have seconds=0 + Fix receiving multicast streams, when several multicast groups are + on the same port. 2024-05-05: Matthias P. Braendli (v4.5.0): diff --git a/doc/advanced.mux b/doc/advanced.mux index 35757cd..246f981 100644 --- a/doc/advanced.mux +++ b/doc/advanced.mux @@ -201,6 +201,22 @@ subchannels { inputproto sti inputuri "rtp://127.0.0.1:32010" } + sub-udp { + type dabplus + bitrate 96 + id 1 + protection 3 + inputproto edi + + ; Receive EDI/UDP unicast on port 32010 + inputuri "udp://:32010" + + ; Receive EDI/UDP multicast stream on group 239.10.11.12 port 32010 + ;inputuri "udp://@239.10.11.12:32010" + + ; Same, but specify local interface address 192.168.0.1, to select which local interface to use + ;inputuri "udp://192.168.0.10@239.10.11.12:32010" + } sub-ri { type dabplus bitrate 96 diff --git a/lib/Socket.cpp b/lib/Socket.cpp index 1281066..bcffb07 100644 --- a/lib/Socket.cpp +++ b/lib/Socket.cpp @@ -24,7 +24,7 @@ #include "Socket.h" -#include +#include #include #include #include @@ -106,16 +106,20 @@ UDPSocket::UDPSocket(UDPSocket&& other) { m_sock = other.m_sock; m_port = other.m_port; + m_multicast_source = other.m_multicast_source; other.m_port = 0; other.m_sock = INVALID_SOCKET; + other.m_multicast_source = ""; } const UDPSocket& UDPSocket::operator=(UDPSocket&& other) { m_sock = other.m_sock; m_port = other.m_port; + m_multicast_source = other.m_multicast_source; other.m_port = 0; other.m_sock = INVALID_SOCKET; + other.m_multicast_source = ""; return *this; } @@ -144,6 +148,7 @@ void UDPSocket::reinit(int port, const std::string& name) // No need to bind to a given port, creating the // socket is enough m_sock = ::socket(AF_INET, SOCK_DGRAM, 0); + post_init(); return; } @@ -180,6 +185,7 @@ void UDPSocket::reinit(int port, const std::string& name) if (::bind(sfd, rp->ai_addr, rp->ai_addrlen) == 0) { m_sock = sfd; + post_init(); break; } @@ -193,6 +199,14 @@ void UDPSocket::reinit(int port, const std::string& name) } } +void UDPSocket::post_init() { + int pktinfo = 1; + if (setsockopt(m_sock, IPPROTO_IP, IP_PKTINFO, &pktinfo, sizeof(pktinfo)) == SOCKET_ERROR) { + throw runtime_error(string("Can't request pktinfo: ") + strerror(errno)); + } + +} + void UDPSocket::init_receive_multicast(int port, const string& local_if_addr, const string& mcastaddr) { if (m_sock != INVALID_SOCKET) { @@ -201,9 +215,10 @@ void UDPSocket::init_receive_multicast(int port, const string& local_if_addr, co m_port = port; m_sock = ::socket(AF_INET, SOCK_DGRAM, 0); + post_init(); int reuse_setting = 1; - if (setsockopt(m_sock, SOL_SOCKET, SO_REUSEADDR, &reuse_setting, sizeof(reuse_setting)) == -1) { + if (setsockopt(m_sock, SOL_SOCKET, SO_REUSEADDR, &reuse_setting, sizeof(reuse_setting)) == SOCKET_ERROR) { throw runtime_error("Can't reuse address"); } @@ -216,8 +231,8 @@ void UDPSocket::init_receive_multicast(int port, const string& local_if_addr, co throw runtime_error(string("Could not bind: ") + strerror(errno)); } - joinGroup(mcastaddr.c_str(), local_if_addr.c_str()); - + m_multicast_source = mcastaddr; + join_group(mcastaddr.c_str(), local_if_addr.c_str()); } @@ -240,16 +255,26 @@ UDPSocket::~UDPSocket() UDPPacket UDPSocket::receive(size_t max_size) { + struct sockaddr_in addr; + struct msghdr msg; + struct iovec iov; + constexpr size_t BUFFER_SIZE = 1024; + char control_buffer[BUFFER_SIZE]; + struct cmsghdr *cmsg; + UDPPacket packet(max_size); - socklen_t addrSize; - addrSize = sizeof(*packet.address.as_sockaddr()); - ssize_t ret = recvfrom(m_sock, - packet.buffer.data(), - packet.buffer.size(), - 0, - packet.address.as_sockaddr(), - &addrSize); + memset(&msg, 0, sizeof(msg)); + msg.msg_name = &addr; + msg.msg_namelen = sizeof(addr); + msg.msg_iov = &iov; + iov.iov_base = packet.buffer.data(); + iov.iov_len = packet.buffer.size(); + msg.msg_iovlen = 1; + msg.msg_control = control_buffer; + msg.msg_controllen = sizeof(control_buffer); + + ssize_t ret = recvmsg(m_sock, &msg, 0); if (ret == SOCKET_ERROR) { packet.buffer.resize(0); @@ -260,12 +285,42 @@ UDPPacket UDPSocket::receive(size_t max_size) if (errno == EAGAIN or errno == EWOULDBLOCK) #endif { - return 0; + return packet; } throw runtime_error(string("Can't receive data: ") + strerror(errno)); } - packet.buffer.resize(ret); + struct in_pktinfo *pktinfo = nullptr; + for (cmsg = CMSG_FIRSTHDR(&msg); cmsg != NULL; cmsg = CMSG_NXTHDR(&msg, cmsg)) { + if (cmsg->cmsg_level == IPPROTO_IP && cmsg->cmsg_type == IP_PKTINFO) { + pktinfo = (struct in_pktinfo *)CMSG_DATA(cmsg); + break; + } + } + + if (pktinfo) { + char src_addr[INET_ADDRSTRLEN]; + char dst_addr[INET_ADDRSTRLEN]; + inet_ntop(AF_INET, &(addr.sin_addr), src_addr, INET_ADDRSTRLEN); + inet_ntop(AF_INET, &(pktinfo->ipi_addr), dst_addr, INET_ADDRSTRLEN); + //fprintf(stderr, "Received packet from %s to %s: %zu\n", src_addr, dst_addr, ret); + + memcpy(&packet.address.addr, &addr, sizeof(addr)); + + if (m_multicast_source.empty() or + strcmp(dst_addr, m_multicast_source.c_str()) == 0) { + packet.buffer.resize(ret); + } + else { + // Ignore packet for different multicast group + packet.buffer.resize(0); + } + } + else { + //fprintf(stderr, "No pktinfo: %zu\n", ret); + packet.buffer.resize(ret); + } + return packet; } @@ -297,7 +352,7 @@ void UDPSocket::send(const std::string& data, InetAddress destination) } } -void UDPSocket::joinGroup(const char* groupname, const char* if_addr) +void UDPSocket::join_group(const char* groupname, const char* if_addr) { ip_mreqn group; if ((group.imr_multiaddr.s_addr = inet_addr(groupname)) == INADDR_NONE) { @@ -316,7 +371,7 @@ void UDPSocket::joinGroup(const char* groupname, const char* if_addr) group.imr_ifindex = 0; if (setsockopt(m_sock, IPPROTO_IP, IP_ADD_MEMBERSHIP, &group, sizeof(group)) == SOCKET_ERROR) { - throw runtime_error(string("Can't join multicast group") + strerror(errno)); + throw runtime_error(string("Can't join multicast group: ") + strerror(errno)); } } @@ -324,12 +379,12 @@ void UDPSocket::setMulticastSource(const char* source_addr) { struct in_addr addr; if (inet_aton(source_addr, &addr) == 0) { - throw runtime_error(string("Can't parse source address") + strerror(errno)); + throw runtime_error(string("Can't parse source address: ") + strerror(errno)); } if (setsockopt(m_sock, IPPROTO_IP, IP_MULTICAST_IF, &addr, sizeof(addr)) == SOCKET_ERROR) { - throw runtime_error(string("Can't set source address") + strerror(errno)); + throw runtime_error(string("Can't set source address: ") + strerror(errno)); } } @@ -392,11 +447,13 @@ vector UDPReceiver::receive(int timeout_ms) for (size_t i = 0; i < m_sockets.size(); i++) { if (fds[i].revents & POLLIN) { auto p = m_sockets[i].receive(2048); // This is larger than the usual MTU - ReceivedPacket rp; - rp.packetdata = move(p.buffer); - rp.received_from = move(p.address); - rp.port_received_on = m_sockets[i].getPort(); - received.push_back(move(rp)); + if (not p.buffer.empty()) { + ReceivedPacket rp; + rp.packetdata = std::move(p.buffer); + rp.received_from = std::move(p.address); + rp.port_received_on = m_sockets[i].getPort(); + received.push_back(std::move(rp)); + } } } diff --git a/lib/Socket.h b/lib/Socket.h index 44f93d0..1320a64 100644 --- a/lib/Socket.h +++ b/lib/Socket.h @@ -2,7 +2,7 @@ Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2022 + Copyright (C) 2024 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org @@ -31,7 +31,7 @@ #include "ThreadsafeQueue.h" #include #include -#include +#include #include #include #include @@ -118,7 +118,6 @@ class UDPSocket void send(const std::vector& data, InetAddress destination); void send(const std::string& data, InetAddress destination); UDPPacket receive(size_t max_size); - void joinGroup(const char* groupname, const char* if_addr = nullptr); void setMulticastSource(const char* source_addr); void setMulticastTTL(int ttl); @@ -130,9 +129,14 @@ class UDPSocket SOCKET getNativeSocket() const; int getPort() const; + private: + void join_group(const char* groupname, const char* if_addr = nullptr); + void post_init(); + protected: SOCKET m_sock = INVALID_SOCKET; int m_port = 0; + std::string m_multicast_source = ""; }; /* UDP packet receiver supporting receiving from several ports at once */ diff --git a/lib/edi/common.cpp b/lib/edi/common.cpp index b314737..38eadf9 100644 --- a/lib/edi/common.cpp +++ b/lib/edi/common.cpp @@ -33,9 +33,9 @@ namespace EdiDecoder { using namespace std; -bool frame_timestamp_t::valid() const +bool frame_timestamp_t::is_valid() const { - return tsta != 0xFFFFFF; + return tsta != 0xFFFFFF and seconds != 0; } string frame_timestamp_t::to_string() const @@ -43,7 +43,7 @@ string frame_timestamp_t::to_string() const const time_t seconds_in_unix_epoch = to_unix_epoch(); stringstream ss; - if (valid()) { + if (is_valid()) { ss << "Timestamp: "; } else { diff --git a/lib/edi/common.hpp b/lib/edi/common.hpp index f273ecf..fdd7424 100644 --- a/lib/edi/common.hpp +++ b/lib/edi/common.hpp @@ -39,7 +39,7 @@ struct frame_timestamp_t { uint32_t utco = 0; uint32_t tsta = 0xFFFFFF; // According to EN 300 797 Annex B - bool valid() const; + bool is_valid() const; std::string to_string() const; std::time_t to_unix_epoch() const; std::chrono::system_clock::time_point to_system_clock() const; diff --git a/src/input/Edi.cpp b/src/input/Edi.cpp index fc380d8..141641f 100644 --- a/src/input/Edi.cpp +++ b/src/input/Edi.cpp @@ -254,7 +254,7 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utc while (not m_pending_sti_frame.frame.empty()) { if (m_pending_sti_frame.frame.size() == size) { - if (m_pending_sti_frame.timestamp.valid()) { + if (m_pending_sti_frame.timestamp.is_valid()) { auto ts_req = EdiDecoder::frame_timestamp_t::from_unix_epoch(seconds, utco, tsta); ts_req += m_tist_delay; const double offset = ts_req.diff_s(m_pending_sti_frame.timestamp); @@ -324,7 +324,7 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utc m_is_prebuffering = true; return 0; } - else if (not m_pending_sti_frame.timestamp.valid()) { + else if (not m_pending_sti_frame.timestamp.is_valid()) { etiLog.level(warn) << "EDI input " << m_name << " invalid timestamp, ignoring"; memset(buffer, 0, size); -- cgit v1.2.3 From cfbb956903a3f424ced24bd20cbcc2b9727fb9a2 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Mon, 30 Sep 2024 08:56:06 +0200 Subject: Improve could not bind UDP error --- lib/Socket.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'lib/Socket.cpp') diff --git a/lib/Socket.cpp b/lib/Socket.cpp index bcffb07..2df1559 100644 --- a/lib/Socket.cpp +++ b/lib/Socket.cpp @@ -195,7 +195,7 @@ void UDPSocket::reinit(int port, const std::string& name) freeaddrinfo(result); if (rp == nullptr) { - throw runtime_error("Could not bind"); + throw runtime_error(string{"Could not bind to port "} + to_string(port)); } } -- cgit v1.2.3 From 30a100d4fbc17972d71f75695441bbedeee076ef Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Wed, 4 Dec 2024 08:41:03 +0100 Subject: common: Update Socket --- lib/Socket.cpp | 36 +++++++++++++++++++++++++++++++----- lib/Socket.h | 25 +++++++++++++++++++++---- lib/edioutput/Transport.cpp | 10 +++++++++- lib/edioutput/Transport.h | 2 -- 4 files changed, 61 insertions(+), 12 deletions(-) (limited to 'lib/Socket.cpp') diff --git a/lib/Socket.cpp b/lib/Socket.cpp index 2df1559..938b573 100644 --- a/lib/Socket.cpp +++ b/lib/Socket.cpp @@ -478,7 +478,7 @@ TCPSocket::~TCPSocket() TCPSocket::TCPSocket(TCPSocket&& other) : m_sock(other.m_sock), - m_remote_address(move(other.m_remote_address)) + m_remote_address(std::move(other.m_remote_address)) { if (other.m_sock != -1) { other.m_sock = -1; @@ -967,12 +967,22 @@ ssize_t TCPClient::recv(void *buffer, size_t length, int flags, int timeout_ms) reconnect(); } + m_last_received_packet_ts = chrono::steady_clock::now(); + return ret; } catch (const TCPSocket::Interrupted&) { return -1; } catch (const TCPSocket::Timeout&) { + const auto timeout = chrono::milliseconds(timeout_ms * 5); + if (m_last_received_packet_ts.has_value() and + chrono::steady_clock::now() - *m_last_received_packet_ts > timeout) + { + // This is to catch half-closed TCP connections + reconnect(); + } + return 0; } @@ -983,6 +993,7 @@ void TCPClient::reconnect() { TCPSocket newsock; m_sock = std::move(newsock); + m_last_received_packet_ts = nullopt; m_sock.connect(m_hostname, m_port, true); } @@ -990,7 +1001,7 @@ TCPConnection::TCPConnection(TCPSocket&& sock) : queue(), m_running(true), m_sender_thread(), - m_sock(move(sock)) + m_sock(std::move(sock)) { #if MISSING_OWN_ADDR auto own_addr = m_sock.getOwnAddress(); @@ -1109,7 +1120,7 @@ void TCPDataDispatcher::process() auto sock = m_listener_socket.accept(timeout_ms); if (sock.valid()) { auto lock = unique_lock(m_mutex); - m_connections.emplace(m_connections.begin(), move(sock)); + m_connections.emplace(m_connections.begin(), std::move(sock)); if (m_buffers_to_preroll > 0) { for (const auto& buf : m_preroll_queue) { @@ -1181,7 +1192,7 @@ void TCPReceiveServer::process() } else { buf.resize(r); - m_queue.push(make_shared(move(buf))); + m_queue.push(make_shared(std::move(buf))); } } catch (const TCPSocket::Interrupted&) { @@ -1222,7 +1233,7 @@ TCPSendClient::~TCPSendClient() } } -void TCPSendClient::sendall(const std::vector& buffer) +TCPSendClient::ErrorStats TCPSendClient::sendall(const std::vector& buffer) { if (not m_running) { throw runtime_error(m_exception_data); @@ -1234,6 +1245,17 @@ void TCPSendClient::sendall(const std::vector& buffer) vector discard; m_queue.try_pop(discard); } + + TCPSendClient::ErrorStats es; + es.num_reconnects = m_num_reconnects.load(); + + es.has_seen_new_errors = es.num_reconnects != m_num_reconnects_prev; + m_num_reconnects_prev = es.num_reconnects; + + auto lock = unique_lock(m_error_mutex); + es.last_error = m_last_error; + + return es; } void TCPSendClient::process() @@ -1255,12 +1277,16 @@ void TCPSendClient::process() } else { try { + m_num_reconnects.fetch_add(1, std::memory_order_seq_cst); m_sock.connect(m_hostname, m_port); m_is_connected = true; } catch (const runtime_error& e) { m_is_connected = false; this_thread::sleep_for(chrono::seconds(1)); + + auto lock = unique_lock(m_error_mutex); + m_last_error = e.what(); } } } diff --git a/lib/Socket.h b/lib/Socket.h index 1320a64..7709145 100644 --- a/lib/Socket.h +++ b/lib/Socket.h @@ -31,9 +31,11 @@ #include "ThreadsafeQueue.h" #include #include -#include +#include #include #include +#include +#include #include #include @@ -236,6 +238,8 @@ class TCPClient { TCPSocket m_sock; std::string m_hostname; int m_port; + + std::optional m_last_received_packet_ts; }; /* Helper class for TCPDataDispatcher, contains a queue of pending data and @@ -329,10 +333,18 @@ class TCPSendClient { public: TCPSendClient(const std::string& hostname, int port); ~TCPSendClient(); + TCPSendClient(const TCPSendClient&) = delete; + TCPSendClient& operator=(const TCPSendClient&) = delete; - /* Throws a runtime_error on error - */ - void sendall(const std::vector& buffer); + + struct ErrorStats { + std::string last_error = ""; + size_t num_reconnects = 0; + bool has_seen_new_errors = false; + }; + + /* Throws a runtime_error when the process thread isn't running */ + ErrorStats sendall(const std::vector& buffer); private: void process(); @@ -349,6 +361,11 @@ class TCPSendClient { std::string m_exception_data; std::thread m_sender_thread; TCPSocket m_listener_socket; + + std::atomic m_num_reconnects = ATOMIC_VAR_INIT(0); + size_t m_num_reconnects_prev = 0; + std::mutex m_error_mutex; + std::string m_last_error = ""; }; } diff --git a/lib/edioutput/Transport.cpp b/lib/edioutput/Transport.cpp index 8ebb9fc..4979e93 100644 --- a/lib/edioutput/Transport.cpp +++ b/lib/edioutput/Transport.cpp @@ -193,7 +193,15 @@ void Sender::write(const AFPacket& af_packet) tcp_dispatchers.at(tcp_dest.get())->write(af_packet); } else if (auto tcp_dest = dynamic_pointer_cast(dest)) { - tcp_senders.at(tcp_dest.get())->sendall(af_packet); + const auto error_stats = tcp_senders.at(tcp_dest.get())->sendall(af_packet); + + if (m_conf.verbose and error_stats.has_seen_new_errors) { + fprintf(stderr, "TCP output %s:%d has %zu reconnects: most recent error: %s\n", + tcp_dest->dest_addr.c_str(), + tcp_dest->dest_port, + error_stats.num_reconnects, + error_stats.last_error.c_str()); + } } else { throw logic_error("EDI destination not implemented"); diff --git a/lib/edioutput/Transport.h b/lib/edioutput/Transport.h index 6a3f229..c62545c 100644 --- a/lib/edioutput/Transport.h +++ b/lib/edioutput/Transport.h @@ -31,11 +31,9 @@ #include "AFPacket.h" #include "PFT.h" #include "Socket.h" -#include #include #include #include -#include #include #include #include -- cgit v1.2.3 From 6517cc3078eba96ea96e085d033a4b8a96eb7151 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Tue, 11 Mar 2025 16:35:08 +0100 Subject: Add EDI/TCP number of active connections statistics --- doc/STATS.md | 7 +++-- doc/show_dabmux_stats.py | 22 +++++++++++++ lib/Socket.cpp | 22 +++++++++++++ lib/Socket.h | 10 ++++++ lib/ThreadsafeQueue.h | 38 +++++++++++++++-------- lib/edioutput/Transport.cpp | 16 +++++++++- lib/edioutput/Transport.h | 11 +++++-- src/DabMultiplexer.cpp | 7 +++++ src/ManagementServer.cpp | 75 ++++++++++++++++++++++++++++++++------------- src/ManagementServer.h | 20 ++++++++---- 10 files changed, 181 insertions(+), 47 deletions(-) (limited to 'lib/Socket.cpp') diff --git a/doc/STATS.md b/doc/STATS.md index 385d41e..435a92e 100644 --- a/doc/STATS.md +++ b/doc/STATS.md @@ -4,12 +4,13 @@ Stats available through Management Server Interface --------- -The management server makes statistics about the inputs available through a ZMQ request/reply socket. +The management server makes statistics about the inputs and EDI/TCP outputs +available through a ZMQ request/reply socket. The `show_dabmux_stats.py` illustrates how to access this information. -Meaning of values ------------------ +Meaning of values for inputs +---------------------------- `max` and `min` indicate input buffer fullness in bytes. diff --git a/doc/show_dabmux_stats.py b/doc/show_dabmux_stats.py index 7ea60f7..3b6d869 100755 --- a/doc/show_dabmux_stats.py +++ b/doc/show_dabmux_stats.py @@ -46,6 +46,7 @@ if len(sys.argv) == 1: data = sock.recv().decode("utf-8") values = json.loads(data)['values'] + print("## INPUT STATS") tmpl = "{ident:20}{maxfill:>8}{minfill:>8}{under:>8}{over:>8}{audioleft:>8}{audioright:>8}{peakleft:>8}{peakright:>8}{state:>16}{version:>48}{uptime:>8}{offset:>8}" print(tmpl.format( ident="id", @@ -89,6 +90,27 @@ if len(sys.argv) == 1: uptime=v['uptime'], offset=v['last_tist_offset'])) + sock.send(b"output_values") + + poller = zmq.Poller() + poller.register(sock, zmq.POLLIN) + + socks = dict(poller.poll(1000)) + if socks: + if socks.get(sock) == zmq.POLLIN: + print() + print("## OUTPUT STATS") + data = sock.recv().decode("utf-8") + values = json.loads(data)['output_values'] + for identifier in values: + if identifier.startswith("edi_tcp_"): + listen_port = identifier.rsplit("_", 1)[-1] + num_connections = values[identifier]["num_connections"] + print(f"EDI TCP on port {listen_port}: {num_connections} connections") + else: + print(f"Unknown output type: {identifier}") + + elif len(sys.argv) == 2 and sys.argv[1] == "config": sock = connect() diff --git a/lib/Socket.cpp b/lib/Socket.cpp index 938b573..5c920d7 100644 --- a/lib/Socket.cpp +++ b/lib/Socket.cpp @@ -24,6 +24,7 @@ #include "Socket.h" +#include #include #include #include @@ -1063,6 +1064,17 @@ void TCPConnection::process() #endif } +TCPConnection::stats_t TCPConnection::get_stats() const +{ + TCPConnection::stats_t s; + const vector buffer_sizes = queue.map( + [](const vector& vec) { return vec.size(); } + ); + + s.buffer_fullness = std::accumulate(buffer_sizes.cbegin(), buffer_sizes.cend(), 0); + s.remote_address = m_sock.get_remote_address(); + return s; +} TCPDataDispatcher::TCPDataDispatcher(size_t max_queue_size, size_t buffers_to_preroll) : m_max_queue_size(max_queue_size), @@ -1136,6 +1148,16 @@ void TCPDataDispatcher::process() } } + +std::vector TCPDataDispatcher::get_stats() const +{ + std::vector s; + for (const auto& conn : m_connections) { + s.push_back(conn.get_stats()); + } + return s; +} + TCPReceiveServer::TCPReceiveServer(size_t blocksize) : m_blocksize(blocksize) { diff --git a/lib/Socket.h b/lib/Socket.h index 7709145..29b618a 100644 --- a/lib/Socket.h +++ b/lib/Socket.h @@ -213,6 +213,8 @@ class TCPSocket { SOCKET get_sockfd() const { return m_sock; } + InetAddress get_remote_address() const { return m_remote_address; } + private: explicit TCPSocket(int sockfd); explicit TCPSocket(int sockfd, InetAddress remote_address); @@ -254,6 +256,12 @@ class TCPConnection ThreadsafeQueue > queue; + struct stats_t { + size_t buffer_fullness = 0; + InetAddress remote_address; + }; + stats_t get_stats() const; + private: std::atomic m_running; std::thread m_sender_thread; @@ -276,6 +284,8 @@ class TCPDataDispatcher void start(int port, const std::string& address); void write(const std::vector& data); + std::vector get_stats() const; + private: void process(); diff --git a/lib/ThreadsafeQueue.h b/lib/ThreadsafeQueue.h index 8b385d6..13bc19e 100644 --- a/lib/ThreadsafeQueue.h +++ b/lib/ThreadsafeQueue.h @@ -2,7 +2,7 @@ Copyright (C) 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2023 + Copyright (C) 2025 Matthias P. Braendli, matthias.braendli@mpb.li An implementation for a threadsafe queue, depends on C++11 @@ -28,6 +28,7 @@ #pragma once +#include #include #include #include @@ -63,10 +64,10 @@ public: std::unique_lock lock(the_mutex); size_t queue_size_before = the_queue.size(); if (max_size == 0) { - the_queue.push(val); + the_queue.push_back(val); } else if (queue_size_before < max_size) { - the_queue.push(val); + the_queue.push_back(val); } size_t queue_size = the_queue.size(); lock.unlock(); @@ -80,10 +81,10 @@ public: std::unique_lock lock(the_mutex); size_t queue_size_before = the_queue.size(); if (max_size == 0) { - the_queue.emplace(std::move(val)); + the_queue.emplace_back(std::move(val)); } else if (queue_size_before < max_size) { - the_queue.emplace(std::move(val)); + the_queue.emplace_back(std::move(val)); } size_t queue_size = the_queue.size(); lock.unlock(); @@ -110,9 +111,9 @@ public: bool overflow = false; while (the_queue.size() >= max_size) { overflow = true; - the_queue.pop(); + the_queue.pop_front(); } - the_queue.push(val); + the_queue.push_back(val); const size_t queue_size = the_queue.size(); lock.unlock(); @@ -129,9 +130,9 @@ public: bool overflow = false; while (the_queue.size() >= max_size) { overflow = true; - the_queue.pop(); + the_queue.pop_front(); } - the_queue.emplace(std::move(val)); + the_queue.emplace_back(std::move(val)); const size_t queue_size = the_queue.size(); lock.unlock(); @@ -152,7 +153,7 @@ public: while (the_queue.size() >= threshold) { the_tx_notification.wait(lock); } - the_queue.push(val); + the_queue.push_back(val); size_t queue_size = the_queue.size(); lock.unlock(); @@ -198,7 +199,7 @@ public: } popped_value = the_queue.front(); - the_queue.pop(); + the_queue.pop_front(); lock.unlock(); the_tx_notification.notify_one(); @@ -220,15 +221,26 @@ public: } else { std::swap(popped_value, the_queue.front()); - the_queue.pop(); + the_queue.pop_front(); lock.unlock(); the_tx_notification.notify_one(); } } + template + std::vector map(std::function func) const + { + std::vector result; + std::unique_lock lock(the_mutex); + for (const T& elem : the_queue) { + result.push_back(func(elem)); + } + return result; + } + private: - std::queue the_queue; + std::deque the_queue; mutable std::mutex the_mutex; std::condition_variable the_rx_notification; std::condition_variable the_tx_notification; diff --git a/lib/edioutput/Transport.cpp b/lib/edioutput/Transport.cpp index 4979e93..a5e0bc3 100644 --- a/lib/edioutput/Transport.cpp +++ b/lib/edioutput/Transport.cpp @@ -1,5 +1,5 @@ /* - Copyright (C) 2022 + Copyright (C) 2025 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org @@ -220,6 +220,20 @@ void Sender::override_pft_sequence(uint16_t pseq) edi_pft.OverridePSeq(pseq); } +std::vector Sender::get_tcp_server_stats() const +{ + std::vector stats; + + for (auto& el : tcp_dispatchers) { + Sender::stats_t s; + s.listen_port = el.first->listen_port; + s.stats = el.second->get_stats(); + stats.push_back(s); + } + + return stats; +} + void Sender::run() { while (m_running) { diff --git a/lib/edioutput/Transport.h b/lib/edioutput/Transport.h index c62545c..2ca638e 100644 --- a/lib/edioutput/Transport.h +++ b/lib/edioutput/Transport.h @@ -1,5 +1,5 @@ /* - Copyright (C) 2022 + Copyright (C) 2025 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org @@ -38,10 +38,11 @@ #include #include #include +#include namespace edi { -/** STI sender for EDI output */ +/** ETI/STI sender for EDI output */ class Sender { public: @@ -64,6 +65,12 @@ class Sender { void override_af_sequence(uint16_t seq); void override_pft_sequence(uint16_t pseq); + struct stats_t { + uint16_t listen_port; + std::vector stats; + }; + std::vector get_tcp_server_stats() const; + private: void run(); diff --git a/src/DabMultiplexer.cpp b/src/DabMultiplexer.cpp index a68f09a..b9575fc 100644 --- a/src/DabMultiplexer.cpp +++ b/src/DabMultiplexer.cpp @@ -28,6 +28,7 @@ #include #include "DabMultiplexer.h" #include "ConfigParser.h" +#include "ManagementServer.h" #include "crc.h" #include "utils.h" @@ -795,6 +796,12 @@ void DabMultiplexer::mux_frame(std::vector >& outputs } edi_sender->write(edi_tagpacket); + + for (const auto& stat : edi_sender->get_tcp_server_stats()) { + get_mgmt_server().update_edi_tcp_output_stat( + stat.listen_port, + stat.stats.size()); + } } #if _DEBUG diff --git a/src/ManagementServer.cpp b/src/ManagementServer.cpp index 568e80e..dff093a 100644 --- a/src/ManagementServer.cpp +++ b/src/ManagementServer.cpp @@ -2,7 +2,7 @@ Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2018 + Copyright (C) 2025 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org @@ -28,13 +28,12 @@ along with ODR-DabMux. If not, see . */ -#include -#include -#include -#include -#include #include #include +#include +#include +#include +#include #include #include "ManagementServer.h" #include "Log.h" @@ -127,37 +126,42 @@ ManagementServer& get_mgmt_server() */ } -void ManagementServer::registerInput(InputStat* is) +void ManagementServer::register_input(InputStat* is) { unique_lock lock(m_statsmutex); std::string id(is->get_name()); - if (m_inputStats.count(id) == 1) { + if (m_input_stats.count(id) == 1) { etiLog.level(error) << "Double registration in MGMT Server with id '" << id << "'"; return; } - m_inputStats[id] = is; + m_input_stats[id] = is; } -void ManagementServer::unregisterInput(std::string id) +void ManagementServer::unregister_input(std::string id) { unique_lock lock(m_statsmutex); - if (m_inputStats.count(id) == 1) { - m_inputStats.erase(id); + if (m_input_stats.count(id) == 1) { + m_input_stats.erase(id); } } +// outputs will never disappear, no need to have a "remove" logic +void ManagementServer::update_edi_tcp_output_stat(uint16_t listen_port, size_t num_connections) +{ + m_output_stats[listen_port] = num_connections; +} bool ManagementServer::isInputRegistered(std::string& id) { unique_lock lock(m_statsmutex); - if (m_inputStats.count(id) == 0) { + if (m_input_stats.count(id) == 0) { etiLog.level(error) << "Management Server: id '" << id << "' does was not registered"; @@ -166,7 +170,7 @@ bool ManagementServer::isInputRegistered(std::string& id) return true; } -std::string ManagementServer::getStatConfigJSON() +std::string ManagementServer::get_input_config_json() { unique_lock lock(m_statsmutex); @@ -175,7 +179,7 @@ std::string ManagementServer::getStatConfigJSON() std::map::iterator iter; int i = 0; - for(iter = m_inputStats.begin(); iter != m_inputStats.end(); + for (iter = m_input_stats.begin(); iter != m_input_stats.end(); ++iter, i++) { std::string id = iter->first; @@ -192,16 +196,15 @@ std::string ManagementServer::getStatConfigJSON() return ss.str(); } -std::string ManagementServer::getValuesJSON() +std::string ManagementServer::get_input_values_json() { unique_lock lock(m_statsmutex); std::ostringstream ss; ss << "{ \"values\" : {\n"; - std::map::iterator iter; int i = 0; - for(iter = m_inputStats.begin(); iter != m_inputStats.end(); + for (auto iter = m_input_stats.begin(); iter != m_input_stats.end(); ++iter, i++) { const std::string& id = iter->first; @@ -220,6 +223,31 @@ std::string ManagementServer::getValuesJSON() return ss.str(); } +std::string ManagementServer::get_output_values_json() +{ + unique_lock lock(m_statsmutex); + + std::ostringstream ss; + ss << "{ \"output_values\" : {\n"; + + int i = 0; + for (auto iter = m_output_stats.begin(); iter != m_output_stats.end(); + ++iter, i++) + { + auto listen_port = iter->first; + auto num_connections = iter->second; + if (i > 0) { + ss << " ,\n"; + } + ss << " \"edi_tcp_" << listen_port << "\" : { \"num_connections\": " << + num_connections << "} "; + } + + ss << "}\n}\n"; + + return ss.str(); +} + ManagementServer::ManagementServer() : m_zmq_context(), m_zmq_sock(m_zmq_context, ZMQ_REP), @@ -323,10 +351,13 @@ void ManagementServer::handle_message(zmq::message_t& zmq_message) << "}\n"; } else if (data == "config") { - answer << getStatConfigJSON(); + answer << get_input_config_json(); } else if (data == "values") { - answer << getValuesJSON(); + answer << get_input_values_json(); + } + else if (data == "output_values") { + answer << get_output_values_json(); } else if (data == "getptree") { unique_lock lock(m_configmutex); @@ -366,12 +397,12 @@ InputStat::InputStat(const std::string& name) : InputStat::~InputStat() { - get_mgmt_server().unregisterInput(m_name); + get_mgmt_server().unregister_input(m_name); } void InputStat::registerAtServer() { - get_mgmt_server().registerInput(this); + get_mgmt_server().register_input(this); } void InputStat::notifyBuffer(long bufsize) diff --git a/src/ManagementServer.h b/src/ManagementServer.h index 6e39922..c7a4222 100644 --- a/src/ManagementServer.h +++ b/src/ManagementServer.h @@ -2,7 +2,7 @@ Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2018 + Copyright (C) 2025 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org @@ -50,6 +50,7 @@ # include "config.h" #endif +#include "Socket.h" #include "zmq.hpp" #include #include @@ -167,8 +168,10 @@ class ManagementServer void open(int listenport); /* Un-/Register a statistics data source */ - void registerInput(InputStat* is); - void unregisterInput(std::string id); + void register_input(InputStat* is); + void unregister_input(std::string id); + + void update_edi_tcp_output_stat(uint16_t listen_port, size_t num_connections); /* Load a ptree given by the management server. * @@ -205,20 +208,25 @@ class ManagementServer std::thread m_restarter_thread; /******* Statistics Data ********/ - std::map m_inputStats; + std::map m_input_stats; + + // Holds information about EDI/TCP outputs + std::map m_output_stats; /* Return a description of the configuration that will * allow to define what graphs to be created * * returns: a JSON encoded configuration */ - std::string getStatConfigJSON(); + std::string get_input_config_json(); /* Return the values for the statistics as defined in the configuration * * returns: JSON encoded statistics */ - std::string getValuesJSON(); + std::string get_input_values_json(); + + std::string get_output_values_json(); // mutex for accessing the map std::mutex m_statsmutex; -- cgit v1.2.3 From d8fab73aeb2c4401bbd88024556a7726ff2129ea Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Mon, 29 Sep 2025 14:45:31 +0200 Subject: common 53fdfd2: Fix race condition in TCPDataDispatcher get_stats --- lib/Socket.cpp | 1 + lib/Socket.h | 2 +- lib/ThreadsafeQueue.h | 2 +- 3 files changed, 3 insertions(+), 2 deletions(-) (limited to 'lib/Socket.cpp') diff --git a/lib/Socket.cpp b/lib/Socket.cpp index 5c920d7..33c9c73 100644 --- a/lib/Socket.cpp +++ b/lib/Socket.cpp @@ -1152,6 +1152,7 @@ void TCPDataDispatcher::process() std::vector TCPDataDispatcher::get_stats() const { std::vector s; + auto lock = unique_lock(m_mutex); for (const auto& conn : m_connections) { s.push_back(conn.get_stats()); } diff --git a/lib/Socket.h b/lib/Socket.h index 29b618a..b9a40ee 100644 --- a/lib/Socket.h +++ b/lib/Socket.h @@ -298,7 +298,7 @@ class TCPDataDispatcher std::thread m_listener_thread; TCPSocket m_listener_socket; - std::mutex m_mutex; + mutable std::mutex m_mutex; std::deque > m_preroll_queue; std::list m_connections; }; diff --git a/lib/ThreadsafeQueue.h b/lib/ThreadsafeQueue.h index 13bc19e..a8d2e85 100644 --- a/lib/ThreadsafeQueue.h +++ b/lib/ThreadsafeQueue.h @@ -31,7 +31,7 @@ #include #include #include -#include +#include #include #include -- cgit v1.2.3