aboutsummaryrefslogtreecommitdiffstats
path: root/lib
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2025-03-11 16:45:11 +0100
committerMatthias P. Braendli <matthias.braendli@mpb.li>2025-03-11 16:45:11 +0100
commit31115439d68bacaecc095a075ae34293123662e9 (patch)
tree1e317481d6e1b5a18439ef18340bdbf4181dd6f8 /lib
parent567c8d98a283d36cdb08e7457dca05fb3fd71d72 (diff)
downloaddabmod-31115439d68bacaecc095a075ae34293123662e9.tar.gz
dabmod-31115439d68bacaecc095a075ae34293123662e9.tar.bz2
dabmod-31115439d68bacaecc095a075ae34293123662e9.zip
Update common 5959418next
Diffstat (limited to 'lib')
-rw-r--r--lib/Socket.cpp22
-rw-r--r--lib/Socket.h10
-rw-r--r--lib/ThreadsafeQueue.h38
3 files changed, 57 insertions, 13 deletions
diff --git a/lib/Socket.cpp b/lib/Socket.cpp
index 938b573..5c920d7 100644
--- a/lib/Socket.cpp
+++ b/lib/Socket.cpp
@@ -24,6 +24,7 @@
#include "Socket.h"
+#include <numeric>
#include <stdexcept>
#include <cstdio>
#include <cstring>
@@ -1063,6 +1064,17 @@ void TCPConnection::process()
#endif
}
+TCPConnection::stats_t TCPConnection::get_stats() const
+{
+ TCPConnection::stats_t s;
+ const vector<size_t> buffer_sizes = queue.map<size_t>(
+ [](const vector<uint8_t>& vec) { return vec.size(); }
+ );
+
+ s.buffer_fullness = std::accumulate(buffer_sizes.cbegin(), buffer_sizes.cend(), 0);
+ s.remote_address = m_sock.get_remote_address();
+ return s;
+}
TCPDataDispatcher::TCPDataDispatcher(size_t max_queue_size, size_t buffers_to_preroll) :
m_max_queue_size(max_queue_size),
@@ -1136,6 +1148,16 @@ void TCPDataDispatcher::process()
}
}
+
+std::vector<TCPConnection::stats_t> TCPDataDispatcher::get_stats() const
+{
+ std::vector<TCPConnection::stats_t> s;
+ for (const auto& conn : m_connections) {
+ s.push_back(conn.get_stats());
+ }
+ return s;
+}
+
TCPReceiveServer::TCPReceiveServer(size_t blocksize) :
m_blocksize(blocksize)
{
diff --git a/lib/Socket.h b/lib/Socket.h
index 7709145..29b618a 100644
--- a/lib/Socket.h
+++ b/lib/Socket.h
@@ -213,6 +213,8 @@ class TCPSocket {
SOCKET get_sockfd() const { return m_sock; }
+ InetAddress get_remote_address() const { return m_remote_address; }
+
private:
explicit TCPSocket(int sockfd);
explicit TCPSocket(int sockfd, InetAddress remote_address);
@@ -254,6 +256,12 @@ class TCPConnection
ThreadsafeQueue<std::vector<uint8_t> > queue;
+ struct stats_t {
+ size_t buffer_fullness = 0;
+ InetAddress remote_address;
+ };
+ stats_t get_stats() const;
+
private:
std::atomic<bool> m_running;
std::thread m_sender_thread;
@@ -276,6 +284,8 @@ class TCPDataDispatcher
void start(int port, const std::string& address);
void write(const std::vector<uint8_t>& data);
+ std::vector<TCPConnection::stats_t> get_stats() const;
+
private:
void process();
diff --git a/lib/ThreadsafeQueue.h b/lib/ThreadsafeQueue.h
index 8b385d6..13bc19e 100644
--- a/lib/ThreadsafeQueue.h
+++ b/lib/ThreadsafeQueue.h
@@ -2,7 +2,7 @@
Copyright (C) 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in
Right of Canada (Communications Research Center Canada)
- Copyright (C) 2023
+ Copyright (C) 2025
Matthias P. Braendli, matthias.braendli@mpb.li
An implementation for a threadsafe queue, depends on C++11
@@ -28,6 +28,7 @@
#pragma once
+#include <functional>
#include <mutex>
#include <condition_variable>
#include <queue>
@@ -63,10 +64,10 @@ public:
std::unique_lock<std::mutex> lock(the_mutex);
size_t queue_size_before = the_queue.size();
if (max_size == 0) {
- the_queue.push(val);
+ the_queue.push_back(val);
}
else if (queue_size_before < max_size) {
- the_queue.push(val);
+ the_queue.push_back(val);
}
size_t queue_size = the_queue.size();
lock.unlock();
@@ -80,10 +81,10 @@ public:
std::unique_lock<std::mutex> lock(the_mutex);
size_t queue_size_before = the_queue.size();
if (max_size == 0) {
- the_queue.emplace(std::move(val));
+ the_queue.emplace_back(std::move(val));
}
else if (queue_size_before < max_size) {
- the_queue.emplace(std::move(val));
+ the_queue.emplace_back(std::move(val));
}
size_t queue_size = the_queue.size();
lock.unlock();
@@ -110,9 +111,9 @@ public:
bool overflow = false;
while (the_queue.size() >= max_size) {
overflow = true;
- the_queue.pop();
+ the_queue.pop_front();
}
- the_queue.push(val);
+ the_queue.push_back(val);
const size_t queue_size = the_queue.size();
lock.unlock();
@@ -129,9 +130,9 @@ public:
bool overflow = false;
while (the_queue.size() >= max_size) {
overflow = true;
- the_queue.pop();
+ the_queue.pop_front();
}
- the_queue.emplace(std::move(val));
+ the_queue.emplace_back(std::move(val));
const size_t queue_size = the_queue.size();
lock.unlock();
@@ -152,7 +153,7 @@ public:
while (the_queue.size() >= threshold) {
the_tx_notification.wait(lock);
}
- the_queue.push(val);
+ the_queue.push_back(val);
size_t queue_size = the_queue.size();
lock.unlock();
@@ -198,7 +199,7 @@ public:
}
popped_value = the_queue.front();
- the_queue.pop();
+ the_queue.pop_front();
lock.unlock();
the_tx_notification.notify_one();
@@ -220,15 +221,26 @@ public:
}
else {
std::swap(popped_value, the_queue.front());
- the_queue.pop();
+ the_queue.pop_front();
lock.unlock();
the_tx_notification.notify_one();
}
}
+ template<typename R>
+ std::vector<R> map(std::function<R(const T&)> func) const
+ {
+ std::vector<R> result;
+ std::unique_lock<std::mutex> lock(the_mutex);
+ for (const T& elem : the_queue) {
+ result.push_back(func(elem));
+ }
+ return result;
+ }
+
private:
- std::queue<T> the_queue;
+ std::deque<T> the_queue;
mutable std::mutex the_mutex;
std::condition_variable the_rx_notification;
std::condition_variable the_tx_notification;