aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2023-08-12 15:19:33 +0200
committerMatthias P. Braendli <matthias.braendli@mpb.li>2023-08-12 15:19:33 +0200
commit382bfbb308985ef05362810624c0c1f312586ba9 (patch)
tree07e3aecc3802fdd0052dba120402a54a482305b9
parent6dc54e8642dfd48096cf52c585c14f7ebe098486 (diff)
downloaddabmux-382bfbb308985ef05362810624c0c1f312586ba9.tar.gz
dabmux-382bfbb308985ef05362810624c0c1f312586ba9.tar.bz2
dabmux-382bfbb308985ef05362810624c0c1f312586ba9.zip
Reconcile with Common 2a455ba
-rw-r--r--lib/Socket.cpp2
-rw-r--r--lib/ThreadsafeQueue.h54
2 files changed, 51 insertions, 5 deletions
diff --git a/lib/Socket.cpp b/lib/Socket.cpp
index 10ec1ca..b71c01e 100644
--- a/lib/Socket.cpp
+++ b/lib/Socket.cpp
@@ -893,7 +893,7 @@ ssize_t TCPClient::recv(void *buffer, size_t length, int flags, int timeout_ms)
return 0;
}
- return 0;
+ throw std::logic_error("unreachable");
}
void TCPClient::reconnect()
diff --git a/lib/ThreadsafeQueue.h b/lib/ThreadsafeQueue.h
index 815dfe0..8b385d6 100644
--- a/lib/ThreadsafeQueue.h
+++ b/lib/ThreadsafeQueue.h
@@ -2,7 +2,7 @@
Copyright (C) 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in
Right of Canada (Communications Research Center Canada)
- Copyright (C) 2018
+ Copyright (C) 2023
Matthias P. Braendli, matthias.braendli@mpb.li
An implementation for a threadsafe queue, depends on C++11
@@ -32,6 +32,7 @@
#include <condition_variable>
#include <queue>
#include <utility>
+#include <cassert>
/* This queue is meant to be used by two threads. One producer
* that pushes elements into the queue, and one consumer that
@@ -69,7 +70,6 @@ public:
}
size_t queue_size = the_queue.size();
lock.unlock();
-
the_rx_notification.notify_one();
return queue_size;
@@ -93,11 +93,57 @@ public:
return queue_size;
}
+ struct push_overflow_result { bool overflowed; size_t new_size; };
+
+ /* Push one element into the queue, and if queue is
+ * full remove one element from the other end.
+ *
+ * max_size == 0 is not allowed.
+ *
+ * returns the new queue size and a flag if overflow occurred.
+ */
+ push_overflow_result push_overflow(T const& val, size_t max_size)
+ {
+ assert(max_size > 0);
+ std::unique_lock<std::mutex> lock(the_mutex);
+
+ bool overflow = false;
+ while (the_queue.size() >= max_size) {
+ overflow = true;
+ the_queue.pop();
+ }
+ the_queue.push(val);
+ const size_t queue_size = the_queue.size();
+ lock.unlock();
+
+ the_rx_notification.notify_one();
+
+ return {overflow, queue_size};
+ }
+
+ push_overflow_result push_overflow(T&& val, size_t max_size)
+ {
+ assert(max_size > 0);
+ std::unique_lock<std::mutex> lock(the_mutex);
+
+ bool overflow = false;
+ while (the_queue.size() >= max_size) {
+ overflow = true;
+ the_queue.pop();
+ }
+ the_queue.emplace(std::move(val));
+ const size_t queue_size = the_queue.size();
+ lock.unlock();
+
+ the_rx_notification.notify_one();
+
+ return {overflow, queue_size};
+ }
+
+
/* Push one element into the queue, but wait until the
* queue size goes below the threshold.
*
- * Notify waiting thread.
- *
* returns the new queue size.
*/
size_t push_wait_if_full(T const& val, size_t threshold)