From 20be04f39e6998d9f9346e86c4d12f0bb4aa9eb2 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Tue, 11 Mar 2025 16:47:14 +0100 Subject: Update common 5959418 --- contrib/Socket.cpp | 22 ++++++++++++++++++++++ contrib/Socket.h | 10 ++++++++++ contrib/ThreadsafeQueue.h | 38 +++++++++++++++++++++++++------------- contrib/edioutput/Transport.cpp | 16 +++++++++++++++- contrib/edioutput/Transport.h | 11 +++++++++-- 5 files changed, 81 insertions(+), 16 deletions(-) (limited to 'contrib') diff --git a/contrib/Socket.cpp b/contrib/Socket.cpp index 938b573..5c920d7 100644 --- a/contrib/Socket.cpp +++ b/contrib/Socket.cpp @@ -24,6 +24,7 @@ #include "Socket.h" +#include #include #include #include @@ -1063,6 +1064,17 @@ void TCPConnection::process() #endif } +TCPConnection::stats_t TCPConnection::get_stats() const +{ + TCPConnection::stats_t s; + const vector buffer_sizes = queue.map( + [](const vector& vec) { return vec.size(); } + ); + + s.buffer_fullness = std::accumulate(buffer_sizes.cbegin(), buffer_sizes.cend(), 0); + s.remote_address = m_sock.get_remote_address(); + return s; +} TCPDataDispatcher::TCPDataDispatcher(size_t max_queue_size, size_t buffers_to_preroll) : m_max_queue_size(max_queue_size), @@ -1136,6 +1148,16 @@ void TCPDataDispatcher::process() } } + +std::vector TCPDataDispatcher::get_stats() const +{ + std::vector s; + for (const auto& conn : m_connections) { + s.push_back(conn.get_stats()); + } + return s; +} + TCPReceiveServer::TCPReceiveServer(size_t blocksize) : m_blocksize(blocksize) { diff --git a/contrib/Socket.h b/contrib/Socket.h index 7709145..29b618a 100644 --- a/contrib/Socket.h +++ b/contrib/Socket.h @@ -213,6 +213,8 @@ class TCPSocket { SOCKET get_sockfd() const { return m_sock; } + InetAddress get_remote_address() const { return m_remote_address; } + private: explicit TCPSocket(int sockfd); explicit TCPSocket(int sockfd, InetAddress remote_address); @@ -254,6 +256,12 @@ class TCPConnection ThreadsafeQueue > queue; + struct stats_t { + size_t buffer_fullness = 0; + InetAddress remote_address; + }; + stats_t get_stats() const; + private: std::atomic m_running; std::thread m_sender_thread; @@ -276,6 +284,8 @@ class TCPDataDispatcher void start(int port, const std::string& address); void write(const std::vector& data); + std::vector get_stats() const; + private: void process(); diff --git a/contrib/ThreadsafeQueue.h b/contrib/ThreadsafeQueue.h index 8b385d6..13bc19e 100644 --- a/contrib/ThreadsafeQueue.h +++ b/contrib/ThreadsafeQueue.h @@ -2,7 +2,7 @@ Copyright (C) 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2023 + Copyright (C) 2025 Matthias P. Braendli, matthias.braendli@mpb.li An implementation for a threadsafe queue, depends on C++11 @@ -28,6 +28,7 @@ #pragma once +#include #include #include #include @@ -63,10 +64,10 @@ public: std::unique_lock lock(the_mutex); size_t queue_size_before = the_queue.size(); if (max_size == 0) { - the_queue.push(val); + the_queue.push_back(val); } else if (queue_size_before < max_size) { - the_queue.push(val); + the_queue.push_back(val); } size_t queue_size = the_queue.size(); lock.unlock(); @@ -80,10 +81,10 @@ public: std::unique_lock lock(the_mutex); size_t queue_size_before = the_queue.size(); if (max_size == 0) { - the_queue.emplace(std::move(val)); + the_queue.emplace_back(std::move(val)); } else if (queue_size_before < max_size) { - the_queue.emplace(std::move(val)); + the_queue.emplace_back(std::move(val)); } size_t queue_size = the_queue.size(); lock.unlock(); @@ -110,9 +111,9 @@ public: bool overflow = false; while (the_queue.size() >= max_size) { overflow = true; - the_queue.pop(); + the_queue.pop_front(); } - the_queue.push(val); + the_queue.push_back(val); const size_t queue_size = the_queue.size(); lock.unlock(); @@ -129,9 +130,9 @@ public: bool overflow = false; while (the_queue.size() >= max_size) { overflow = true; - the_queue.pop(); + the_queue.pop_front(); } - the_queue.emplace(std::move(val)); + the_queue.emplace_back(std::move(val)); const size_t queue_size = the_queue.size(); lock.unlock(); @@ -152,7 +153,7 @@ public: while (the_queue.size() >= threshold) { the_tx_notification.wait(lock); } - the_queue.push(val); + the_queue.push_back(val); size_t queue_size = the_queue.size(); lock.unlock(); @@ -198,7 +199,7 @@ public: } popped_value = the_queue.front(); - the_queue.pop(); + the_queue.pop_front(); lock.unlock(); the_tx_notification.notify_one(); @@ -220,15 +221,26 @@ public: } else { std::swap(popped_value, the_queue.front()); - the_queue.pop(); + the_queue.pop_front(); lock.unlock(); the_tx_notification.notify_one(); } } + template + std::vector map(std::function func) const + { + std::vector result; + std::unique_lock lock(the_mutex); + for (const T& elem : the_queue) { + result.push_back(func(elem)); + } + return result; + } + private: - std::queue the_queue; + std::deque the_queue; mutable std::mutex the_mutex; std::condition_variable the_rx_notification; std::condition_variable the_tx_notification; diff --git a/contrib/edioutput/Transport.cpp b/contrib/edioutput/Transport.cpp index 4979e93..a5e0bc3 100644 --- a/contrib/edioutput/Transport.cpp +++ b/contrib/edioutput/Transport.cpp @@ -1,5 +1,5 @@ /* - Copyright (C) 2022 + Copyright (C) 2025 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org @@ -220,6 +220,20 @@ void Sender::override_pft_sequence(uint16_t pseq) edi_pft.OverridePSeq(pseq); } +std::vector Sender::get_tcp_server_stats() const +{ + std::vector stats; + + for (auto& el : tcp_dispatchers) { + Sender::stats_t s; + s.listen_port = el.first->listen_port; + s.stats = el.second->get_stats(); + stats.push_back(s); + } + + return stats; +} + void Sender::run() { while (m_running) { diff --git a/contrib/edioutput/Transport.h b/contrib/edioutput/Transport.h index c62545c..2ca638e 100644 --- a/contrib/edioutput/Transport.h +++ b/contrib/edioutput/Transport.h @@ -1,5 +1,5 @@ /* - Copyright (C) 2022 + Copyright (C) 2025 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org @@ -38,10 +38,11 @@ #include #include #include +#include namespace edi { -/** STI sender for EDI output */ +/** ETI/STI sender for EDI output */ class Sender { public: @@ -64,6 +65,12 @@ class Sender { void override_af_sequence(uint16_t seq); void override_pft_sequence(uint16_t pseq); + struct stats_t { + uint16_t listen_port; + std::vector stats; + }; + std::vector get_tcp_server_stats() const; + private: void run(); -- cgit v1.2.3