aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ChangeLog3
-rw-r--r--configure.ac4
-rw-r--r--lib/Socket.cpp22
-rw-r--r--lib/Socket.h10
-rw-r--r--lib/ThreadsafeQueue.h38
5 files changed, 61 insertions, 16 deletions
diff --git a/ChangeLog b/ChangeLog
index 075e5bb..6266e8c 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -2,7 +2,7 @@ This file contains information about the changes done to
ODR-DabMod in this repository
2025-02-26: Matthias P. Braendli <matthias@mpb.li>
- (v2.7.0):
+ (v3.0.0):
Add support for the PrecisionWave DEXTER Modulator.
Improve timestamp decoding and handling.
Fix TII carrier levels.
@@ -11,6 +11,7 @@ ODR-DabMod in this repository
Remove EasyDABv3 support.
Remove ZeroMQ input.
Require C++17.
+ v2.7.0 retracted.
2022-04-20: Matthias P. Braendli <matthias@mpb.li>
(v2.6.0):
diff --git a/configure.ac b/configure.ac
index b8f83ec..46bbaf1 100644
--- a/configure.ac
+++ b/configure.ac
@@ -1,7 +1,7 @@
# Copyright (C) 2007, 2008, 2009, 2010, 2011, 2012 Her Majesty the
# Queen in Right of Canada (Communications Research Center Canada)
-# Copyright (C) 2023 Matthias P. Braendli, http://opendigitalradio.org
+# Copyright (C) 2025 Matthias P. Braendli, http://opendigitalradio.org
# This file is part of ODR-DabMod.
#
@@ -19,7 +19,7 @@
# along with ODR-DabMod. If not, see <http://www.gnu.org/licenses/>.
AC_PREREQ([2.69])
-AC_INIT([ODR-DabMod],[2.7.0],[matthias.braendli@mpb.li])
+AC_INIT([ODR-DabMod],[3.0.0],[matthias.braendli@mpb.li])
AC_CONFIG_AUX_DIR([build-aux])
AC_CONFIG_MACRO_DIR([m4])
AC_CANONICAL_TARGET
diff --git a/lib/Socket.cpp b/lib/Socket.cpp
index 938b573..5c920d7 100644
--- a/lib/Socket.cpp
+++ b/lib/Socket.cpp
@@ -24,6 +24,7 @@
#include "Socket.h"
+#include <numeric>
#include <stdexcept>
#include <cstdio>
#include <cstring>
@@ -1063,6 +1064,17 @@ void TCPConnection::process()
#endif
}
+TCPConnection::stats_t TCPConnection::get_stats() const
+{
+ TCPConnection::stats_t s;
+ const vector<size_t> buffer_sizes = queue.map<size_t>(
+ [](const vector<uint8_t>& vec) { return vec.size(); }
+ );
+
+ s.buffer_fullness = std::accumulate(buffer_sizes.cbegin(), buffer_sizes.cend(), 0);
+ s.remote_address = m_sock.get_remote_address();
+ return s;
+}
TCPDataDispatcher::TCPDataDispatcher(size_t max_queue_size, size_t buffers_to_preroll) :
m_max_queue_size(max_queue_size),
@@ -1136,6 +1148,16 @@ void TCPDataDispatcher::process()
}
}
+
+std::vector<TCPConnection::stats_t> TCPDataDispatcher::get_stats() const
+{
+ std::vector<TCPConnection::stats_t> s;
+ for (const auto& conn : m_connections) {
+ s.push_back(conn.get_stats());
+ }
+ return s;
+}
+
TCPReceiveServer::TCPReceiveServer(size_t blocksize) :
m_blocksize(blocksize)
{
diff --git a/lib/Socket.h b/lib/Socket.h
index 7709145..29b618a 100644
--- a/lib/Socket.h
+++ b/lib/Socket.h
@@ -213,6 +213,8 @@ class TCPSocket {
SOCKET get_sockfd() const { return m_sock; }
+ InetAddress get_remote_address() const { return m_remote_address; }
+
private:
explicit TCPSocket(int sockfd);
explicit TCPSocket(int sockfd, InetAddress remote_address);
@@ -254,6 +256,12 @@ class TCPConnection
ThreadsafeQueue<std::vector<uint8_t> > queue;
+ struct stats_t {
+ size_t buffer_fullness = 0;
+ InetAddress remote_address;
+ };
+ stats_t get_stats() const;
+
private:
std::atomic<bool> m_running;
std::thread m_sender_thread;
@@ -276,6 +284,8 @@ class TCPDataDispatcher
void start(int port, const std::string& address);
void write(const std::vector<uint8_t>& data);
+ std::vector<TCPConnection::stats_t> get_stats() const;
+
private:
void process();
diff --git a/lib/ThreadsafeQueue.h b/lib/ThreadsafeQueue.h
index 8b385d6..13bc19e 100644
--- a/lib/ThreadsafeQueue.h
+++ b/lib/ThreadsafeQueue.h
@@ -2,7 +2,7 @@
Copyright (C) 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in
Right of Canada (Communications Research Center Canada)
- Copyright (C) 2023
+ Copyright (C) 2025
Matthias P. Braendli, matthias.braendli@mpb.li
An implementation for a threadsafe queue, depends on C++11
@@ -28,6 +28,7 @@
#pragma once
+#include <functional>
#include <mutex>
#include <condition_variable>
#include <queue>
@@ -63,10 +64,10 @@ public:
std::unique_lock<std::mutex> lock(the_mutex);
size_t queue_size_before = the_queue.size();
if (max_size == 0) {
- the_queue.push(val);
+ the_queue.push_back(val);
}
else if (queue_size_before < max_size) {
- the_queue.push(val);
+ the_queue.push_back(val);
}
size_t queue_size = the_queue.size();
lock.unlock();
@@ -80,10 +81,10 @@ public:
std::unique_lock<std::mutex> lock(the_mutex);
size_t queue_size_before = the_queue.size();
if (max_size == 0) {
- the_queue.emplace(std::move(val));
+ the_queue.emplace_back(std::move(val));
}
else if (queue_size_before < max_size) {
- the_queue.emplace(std::move(val));
+ the_queue.emplace_back(std::move(val));
}
size_t queue_size = the_queue.size();
lock.unlock();
@@ -110,9 +111,9 @@ public:
bool overflow = false;
while (the_queue.size() >= max_size) {
overflow = true;
- the_queue.pop();
+ the_queue.pop_front();
}
- the_queue.push(val);
+ the_queue.push_back(val);
const size_t queue_size = the_queue.size();
lock.unlock();
@@ -129,9 +130,9 @@ public:
bool overflow = false;
while (the_queue.size() >= max_size) {
overflow = true;
- the_queue.pop();
+ the_queue.pop_front();
}
- the_queue.emplace(std::move(val));
+ the_queue.emplace_back(std::move(val));
const size_t queue_size = the_queue.size();
lock.unlock();
@@ -152,7 +153,7 @@ public:
while (the_queue.size() >= threshold) {
the_tx_notification.wait(lock);
}
- the_queue.push(val);
+ the_queue.push_back(val);
size_t queue_size = the_queue.size();
lock.unlock();
@@ -198,7 +199,7 @@ public:
}
popped_value = the_queue.front();
- the_queue.pop();
+ the_queue.pop_front();
lock.unlock();
the_tx_notification.notify_one();
@@ -220,15 +221,26 @@ public:
}
else {
std::swap(popped_value, the_queue.front());
- the_queue.pop();
+ the_queue.pop_front();
lock.unlock();
the_tx_notification.notify_one();
}
}
+ template<typename R>
+ std::vector<R> map(std::function<R(const T&)> func) const
+ {
+ std::vector<R> result;
+ std::unique_lock<std::mutex> lock(the_mutex);
+ for (const T& elem : the_queue) {
+ result.push_back(func(elem));
+ }
+ return result;
+ }
+
private:
- std::queue<T> the_queue;
+ std::deque<T> the_queue;
mutable std::mutex the_mutex;
std::condition_variable the_rx_notification;
std::condition_variable the_tx_notification;