diff options
| -rw-r--r-- | lib/Socket.cpp | 22 | ||||
| -rw-r--r-- | lib/Socket.h | 10 | ||||
| -rw-r--r-- | lib/ThreadsafeQueue.h | 38 | 
3 files changed, 57 insertions, 13 deletions
| diff --git a/lib/Socket.cpp b/lib/Socket.cpp index 938b573..5c920d7 100644 --- a/lib/Socket.cpp +++ b/lib/Socket.cpp @@ -24,6 +24,7 @@  #include "Socket.h" +#include <numeric>  #include <stdexcept>  #include <cstdio>  #include <cstring> @@ -1063,6 +1064,17 @@ void TCPConnection::process()  #endif  } +TCPConnection::stats_t TCPConnection::get_stats() const +{ +    TCPConnection::stats_t s; +    const vector<size_t> buffer_sizes = queue.map<size_t>( +            [](const vector<uint8_t>& vec) { return vec.size(); } +            ); + +    s.buffer_fullness = std::accumulate(buffer_sizes.cbegin(), buffer_sizes.cend(), 0); +    s.remote_address = m_sock.get_remote_address(); +    return s; +}  TCPDataDispatcher::TCPDataDispatcher(size_t max_queue_size, size_t buffers_to_preroll) :      m_max_queue_size(max_queue_size), @@ -1136,6 +1148,16 @@ void TCPDataDispatcher::process()      }  } + +std::vector<TCPConnection::stats_t> TCPDataDispatcher::get_stats() const +{ +    std::vector<TCPConnection::stats_t> s; +    for (const auto& conn : m_connections) { +        s.push_back(conn.get_stats()); +    } +    return s; +} +  TCPReceiveServer::TCPReceiveServer(size_t blocksize) :      m_blocksize(blocksize)  { diff --git a/lib/Socket.h b/lib/Socket.h index 7709145..29b618a 100644 --- a/lib/Socket.h +++ b/lib/Socket.h @@ -213,6 +213,8 @@ class TCPSocket {          SOCKET get_sockfd() const { return m_sock; } +        InetAddress get_remote_address() const { return m_remote_address; } +      private:          explicit TCPSocket(int sockfd);          explicit TCPSocket(int sockfd, InetAddress remote_address); @@ -254,6 +256,12 @@ class TCPConnection          ThreadsafeQueue<std::vector<uint8_t> > queue; +        struct stats_t { +            size_t buffer_fullness = 0; +            InetAddress remote_address; +        }; +        stats_t get_stats() const; +      private:          std::atomic<bool> m_running;          std::thread m_sender_thread; @@ -276,6 +284,8 @@ class TCPDataDispatcher          void start(int port, const std::string& address);          void write(const std::vector<uint8_t>& data); +        std::vector<TCPConnection::stats_t> get_stats() const; +      private:          void process(); diff --git a/lib/ThreadsafeQueue.h b/lib/ThreadsafeQueue.h index 8b385d6..13bc19e 100644 --- a/lib/ThreadsafeQueue.h +++ b/lib/ThreadsafeQueue.h @@ -2,7 +2,7 @@     Copyright (C) 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in     Right of Canada (Communications Research Center Canada) -   Copyright (C) 2023 +   Copyright (C) 2025     Matthias P. Braendli, matthias.braendli@mpb.li     An implementation for a threadsafe queue, depends on C++11 @@ -28,6 +28,7 @@  #pragma once +#include <functional>  #include <mutex>  #include <condition_variable>  #include <queue> @@ -63,10 +64,10 @@ public:          std::unique_lock<std::mutex> lock(the_mutex);          size_t queue_size_before = the_queue.size();          if (max_size == 0) { -            the_queue.push(val); +            the_queue.push_back(val);          }          else if (queue_size_before < max_size) { -            the_queue.push(val); +            the_queue.push_back(val);          }          size_t queue_size = the_queue.size();          lock.unlock(); @@ -80,10 +81,10 @@ public:          std::unique_lock<std::mutex> lock(the_mutex);          size_t queue_size_before = the_queue.size();          if (max_size == 0) { -            the_queue.emplace(std::move(val)); +            the_queue.emplace_back(std::move(val));          }          else if (queue_size_before < max_size) { -            the_queue.emplace(std::move(val)); +            the_queue.emplace_back(std::move(val));          }          size_t queue_size = the_queue.size();          lock.unlock(); @@ -110,9 +111,9 @@ public:          bool overflow = false;          while (the_queue.size() >= max_size) {              overflow = true; -            the_queue.pop(); +            the_queue.pop_front();          } -        the_queue.push(val); +        the_queue.push_back(val);          const size_t queue_size = the_queue.size();          lock.unlock(); @@ -129,9 +130,9 @@ public:          bool overflow = false;          while (the_queue.size() >= max_size) {              overflow = true; -            the_queue.pop(); +            the_queue.pop_front();          } -        the_queue.emplace(std::move(val)); +        the_queue.emplace_back(std::move(val));          const size_t queue_size = the_queue.size();          lock.unlock(); @@ -152,7 +153,7 @@ public:          while (the_queue.size() >= threshold) {              the_tx_notification.wait(lock);          } -        the_queue.push(val); +        the_queue.push_back(val);          size_t queue_size = the_queue.size();          lock.unlock(); @@ -198,7 +199,7 @@ public:          }          popped_value = the_queue.front(); -        the_queue.pop(); +        the_queue.pop_front();          lock.unlock();          the_tx_notification.notify_one(); @@ -220,15 +221,26 @@ public:          }          else {              std::swap(popped_value, the_queue.front()); -            the_queue.pop(); +            the_queue.pop_front();              lock.unlock();              the_tx_notification.notify_one();          }      } +    template<typename R> +    std::vector<R> map(std::function<R(const T&)> func) const +    { +        std::vector<R> result; +        std::unique_lock<std::mutex> lock(the_mutex); +        for (const T& elem : the_queue) { +            result.push_back(func(elem)); +        } +        return result; +    } +  private: -    std::queue<T> the_queue; +    std::deque<T> the_queue;      mutable std::mutex the_mutex;      std::condition_variable the_rx_notification;      std::condition_variable the_tx_notification; | 
