summaryrefslogtreecommitdiffstats
path: root/lib
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2019-06-07 10:14:51 +0200
committerMatthias P. Braendli <matthias.braendli@mpb.li>2019-06-07 10:14:51 +0200
commit8b0d4b647a3ac2e21e6a8e7a902eea3ae462e213 (patch)
treec0296c305409bb7fc373625aea05c2f57054eb5c /lib
parent43f4a3a2a695c303bd4fdfbd7fec6def29284f2e (diff)
downloaddabmux-8b0d4b647a3ac2e21e6a8e7a902eea3ae462e213.tar.gz
dabmux-8b0d4b647a3ac2e21e6a8e7a902eea3ae462e213.tar.bz2
dabmux-8b0d4b647a3ac2e21e6a8e7a902eea3ae462e213.zip
Work on STI-D/EDI input
Diffstat (limited to 'lib')
-rw-r--r--lib/ReedSolomon.cpp116
-rw-r--r--lib/ReedSolomon.h56
-rw-r--r--lib/Socket.cpp165
-rw-r--r--lib/Socket.h32
-rw-r--r--lib/ThreadsafeQueue.h176
5 files changed, 504 insertions, 41 deletions
diff --git a/lib/ReedSolomon.cpp b/lib/ReedSolomon.cpp
new file mode 100644
index 0000000..38d8ea8
--- /dev/null
+++ b/lib/ReedSolomon.cpp
@@ -0,0 +1,116 @@
+/*
+ Copyright (C) 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in Right
+ of Canada (Communications Research Center Canada)
+
+ Copyright (C) 2016
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://www.opendigitalradio.org
+ */
+/*
+ This file is part of ODR-DabMux.
+
+ ODR-DabMux is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as
+ published by the Free Software Foundation, either version 3 of the
+ License, or (at your option) any later version.
+
+ ODR-DabMux is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "ReedSolomon.h"
+#include <vector>
+#include <algorithm>
+#include <stdexcept>
+#include <sstream>
+#include <stdio.h> // For galois.h ...
+#include <string.h> // For memcpy
+
+extern "C" {
+#include "fec/fec.h"
+}
+#include <assert.h>
+
+#define SYMSIZE 8
+
+
+ReedSolomon::ReedSolomon(int N, int K, bool reverse, int gfpoly, int firstRoot, int primElem)
+{
+ setReverse(reverse);
+
+ m_N = N;
+ m_K = K;
+
+ const int symsize = SYMSIZE;
+ const int nroots = N - K; // For EDI PFT, this must be 48
+ const int pad = ((1 << symsize) - 1) - N; // is 255-N
+
+ rsData = init_rs_char(symsize, gfpoly, firstRoot, primElem, nroots, pad);
+
+ if (rsData == nullptr) {
+ std::stringstream ss;
+ ss << "Invalid Reed-Solomon parameters! " <<
+ "N=" << N << " ; K=" << K << " ; pad=" << pad;
+ throw std::invalid_argument(ss.str());
+ }
+}
+
+
+ReedSolomon::~ReedSolomon()
+{
+ free_rs_char(rsData);
+}
+
+
+void ReedSolomon::setReverse(bool state)
+{
+ reverse = state;
+}
+
+
+int ReedSolomon::encode(void* data, void* fec, size_t size)
+{
+ uint8_t* input = reinterpret_cast<uint8_t*>(data);
+ uint8_t* output = reinterpret_cast<uint8_t*>(fec);
+ int ret = 0;
+
+ if (reverse) {
+ std::vector<uint8_t> buffer(m_N);
+
+ memcpy(&buffer[0], input, m_K);
+ memcpy(&buffer[m_K], output, m_N - m_K);
+
+ ret = decode_rs_char(rsData, &buffer[0], nullptr, 0);
+ if ((ret != 0) && (ret != -1)) {
+ memcpy(input, &buffer[0], m_K);
+ memcpy(output, &buffer[m_K], m_N - m_K);
+ }
+ }
+ else {
+ encode_rs_char(rsData, input, output);
+ }
+
+ return ret;
+}
+
+
+int ReedSolomon::encode(void* data, size_t size)
+{
+ uint8_t* input = reinterpret_cast<uint8_t*>(data);
+ int ret = 0;
+
+ if (reverse) {
+ ret = decode_rs_char(rsData, input, nullptr, 0);
+ }
+ else {
+ encode_rs_char(rsData, input, &input[m_K]);
+ }
+
+ return ret;
+}
diff --git a/lib/ReedSolomon.h b/lib/ReedSolomon.h
new file mode 100644
index 0000000..abcef62
--- /dev/null
+++ b/lib/ReedSolomon.h
@@ -0,0 +1,56 @@
+/*
+ Copyright (C) 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in Right
+ of Canada (Communications Research Center Canada)
+
+ Copyright (C) 2016
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://www.opendigitalradio.org
+ */
+/*
+ This file is part of ODR-DabMux.
+
+ ODR-DabMux is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as
+ published by the Free Software Foundation, either version 3 of the
+ License, or (at your option) any later version.
+
+ ODR-DabMux is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#ifdef HAVE_CONFIG_H
+# include <config.h>
+#endif
+
+#include <stdlib.h>
+
+class ReedSolomon
+{
+public:
+ ReedSolomon(int N, int K,
+ bool reverse = false,
+ int gfpoly = 0x11d, int firstRoot = 0, int primElem = 1);
+ ReedSolomon(const ReedSolomon& other) = delete;
+ ReedSolomon operator=(const ReedSolomon& other) = delete;
+ ~ReedSolomon();
+
+ void setReverse(bool state);
+ int encode(void* data, void* fec, size_t size);
+ int encode(void* data, size_t size);
+
+private:
+ int m_N;
+ int m_K;
+
+ void* rsData;
+ bool reverse;
+};
+
diff --git a/lib/Socket.cpp b/lib/Socket.cpp
index fe3df44..9b404eb 100644
--- a/lib/Socket.cpp
+++ b/lib/Socket.cpp
@@ -42,13 +42,10 @@ void InetAddress::resolveUdpDestination(const std::string& destination, int port
struct addrinfo hints;
memset(&hints, 0, sizeof(struct addrinfo));
- hints.ai_family = AF_UNSPEC; /* Allow IPv4 or IPv6 */
+ hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_DGRAM; /* Datagram socket */
hints.ai_flags = 0;
hints.ai_protocol = 0;
- hints.ai_canonname = nullptr;
- hints.ai_addr = nullptr;
- hints.ai_next = nullptr;
struct addrinfo *result, *rp;
int s = getaddrinfo(destination.c_str(), service, &hints, &result);
@@ -77,19 +74,19 @@ UDPPacket::UDPPacket(size_t initSize) :
UDPSocket::UDPSocket() :
- listenSocket(INVALID_SOCKET)
+ m_sock(INVALID_SOCKET)
{
reinit(0, "");
}
UDPSocket::UDPSocket(int port) :
- listenSocket(INVALID_SOCKET)
+ m_sock(INVALID_SOCKET)
{
reinit(port, "");
}
UDPSocket::UDPSocket(int port, const std::string& name) :
- listenSocket(INVALID_SOCKET)
+ m_sock(INVALID_SOCKET)
{
reinit(port, name);
}
@@ -97,16 +94,28 @@ UDPSocket::UDPSocket(int port, const std::string& name) :
void UDPSocket::setBlocking(bool block)
{
- int res = fcntl(listenSocket, F_SETFL, block ? 0 : O_NONBLOCK);
+ int res = fcntl(m_sock, F_SETFL, block ? 0 : O_NONBLOCK);
if (res == -1) {
throw runtime_error(string("Can't change blocking state of socket: ") + strerror(errno));
}
}
+void UDPSocket::reinit(int port)
+{
+ return reinit(port, "");
+}
+
void UDPSocket::reinit(int port, const std::string& name)
{
- if (listenSocket != INVALID_SOCKET) {
- ::close(listenSocket);
+ if (m_sock != INVALID_SOCKET) {
+ ::close(m_sock);
+ }
+
+ if (port == 0) {
+ // No need to bind to a given port, creating the
+ // socket is enough
+ m_sock = ::socket(AF_INET, SOCK_DGRAM, 0);
+ return;
}
char service[NI_MAXSERV];
@@ -114,7 +123,7 @@ void UDPSocket::reinit(int port, const std::string& name)
struct addrinfo hints;
memset(&hints, 0, sizeof(struct addrinfo));
- hints.ai_family = AF_UNSPEC; /* Allow IPv4 or IPv6 */
+ hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_DGRAM; /* Datagram socket */
hints.ai_flags = AI_PASSIVE; /* For wildcard IP address */
hints.ai_protocol = 0; /* Any protocol */
@@ -141,7 +150,7 @@ void UDPSocket::reinit(int port, const std::string& name)
}
if (::bind(sfd, rp->ai_addr, rp->ai_addrlen) == 0) {
- listenSocket = sfd;
+ m_sock = sfd;
break;
}
@@ -157,17 +166,17 @@ void UDPSocket::reinit(int port, const std::string& name)
void UDPSocket::close()
{
- if (listenSocket != INVALID_SOCKET) {
- ::close(listenSocket);
+ if (m_sock != INVALID_SOCKET) {
+ ::close(m_sock);
}
- listenSocket = INVALID_SOCKET;
+ m_sock = INVALID_SOCKET;
}
UDPSocket::~UDPSocket()
{
- if (listenSocket != INVALID_SOCKET) {
- ::close(listenSocket);
+ if (m_sock != INVALID_SOCKET) {
+ ::close(m_sock);
}
}
@@ -177,7 +186,7 @@ UDPPacket UDPSocket::receive(size_t max_size)
UDPPacket packet(max_size);
socklen_t addrSize;
addrSize = sizeof(*packet.address.as_sockaddr());
- ssize_t ret = recvfrom(listenSocket,
+ ssize_t ret = recvfrom(m_sock,
packet.buffer.data(),
packet.buffer.size(),
0,
@@ -186,7 +195,13 @@ UDPPacket UDPSocket::receive(size_t max_size)
if (ret == SOCKET_ERROR) {
packet.buffer.resize(0);
+
+ // This suppresses the -Wlogical-op warning
+#if EAGAIN == EWOULDBLOCK
if (errno == EAGAIN) {
+#else
+ if (errno == EAGAIN or errno == EWOULDBLOCK) {
+#endif
return 0;
}
throw runtime_error(string("Can't receive data: ") + strerror(errno));
@@ -198,24 +213,24 @@ UDPPacket UDPSocket::receive(size_t max_size)
void UDPSocket::send(UDPPacket& packet)
{
- int ret = sendto(listenSocket, packet.buffer.data(), packet.buffer.size(), 0,
+ const int ret = sendto(m_sock, packet.buffer.data(), packet.buffer.size(), 0,
packet.address.as_sockaddr(), sizeof(*packet.address.as_sockaddr()));
if (ret == SOCKET_ERROR && errno != ECONNREFUSED) {
- throw runtime_error(string("Can't send UDP packet") + strerror(errno));
+ throw runtime_error(string("Can't send UDP packet: ") + strerror(errno));
}
}
void UDPSocket::send(const std::vector<uint8_t>& data, InetAddress destination)
{
- int ret = sendto(listenSocket, &data[0], data.size(), 0,
+ const int ret = sendto(m_sock, data.data(), data.size(), 0,
destination.as_sockaddr(), sizeof(*destination.as_sockaddr()));
if (ret == SOCKET_ERROR && errno != ECONNREFUSED) {
- throw runtime_error(string("Can't send UDP packet") + strerror(errno));
+ throw runtime_error(string("Can't send UDP packet: ") + strerror(errno));
}
}
-void UDPSocket::joinGroup(char* groupname)
+void UDPSocket::joinGroup(const char* groupname, const char* if_addr)
{
ip_mreqn group;
if ((group.imr_multiaddr.s_addr = inet_addr(groupname)) == INADDR_NONE) {
@@ -224,9 +239,15 @@ void UDPSocket::joinGroup(char* groupname)
if (!IN_MULTICAST(ntohl(group.imr_multiaddr.s_addr))) {
throw runtime_error("Group name is not a multicast address");
}
- group.imr_address.s_addr = htons(INADDR_ANY);;
+
+ if (if_addr) {
+ group.imr_address.s_addr = inet_addr(if_addr);
+ }
+ else {
+ group.imr_address.s_addr = htons(INADDR_ANY);
+ }
group.imr_ifindex = 0;
- if (setsockopt(listenSocket, IPPROTO_IP, IP_ADD_MEMBERSHIP, &group, sizeof(group))
+ if (setsockopt(m_sock, IPPROTO_IP, IP_ADD_MEMBERSHIP, &group, sizeof(group))
== SOCKET_ERROR) {
throw runtime_error(string("Can't join multicast group") + strerror(errno));
}
@@ -239,7 +260,7 @@ void UDPSocket::setMulticastSource(const char* source_addr)
throw runtime_error(string("Can't parse source address") + strerror(errno));
}
- if (setsockopt(listenSocket, IPPROTO_IP, IP_MULTICAST_IF, &addr, sizeof(addr))
+ if (setsockopt(m_sock, IPPROTO_IP, IP_MULTICAST_IF, &addr, sizeof(addr))
== SOCKET_ERROR) {
throw runtime_error(string("Can't set source address") + strerror(errno));
}
@@ -247,26 +268,82 @@ void UDPSocket::setMulticastSource(const char* source_addr)
void UDPSocket::setMulticastTTL(int ttl)
{
- if (setsockopt(listenSocket, IPPROTO_IP, IP_MULTICAST_TTL, &ttl, sizeof(ttl))
+ if (setsockopt(m_sock, IPPROTO_IP, IP_MULTICAST_TTL, &ttl, sizeof(ttl))
== SOCKET_ERROR) {
throw runtime_error(string("Can't set multicast ttl") + strerror(errno));
}
}
+UDPReceiver::~UDPReceiver() {
+ m_stop = true;
+ m_sock.close();
+ if (m_thread.joinable()) {
+ m_thread.join();
+ }
+}
+
+void UDPReceiver::start(int port, const string& bindto, const string& mcastaddr, size_t max_packets_queued) {
+ m_port = port;
+ m_bindto = bindto;
+ m_mcastaddr = mcastaddr;
+ m_max_packets_queued = max_packets_queued;
+ m_thread = std::thread(&UDPReceiver::m_run, this);
+}
-TCPSocket::TCPSocket()
+std::vector<uint8_t> UDPReceiver::get_packet_buffer()
{
- if ((m_sock = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
- throw std::runtime_error("Can't create TCP socket");
+ if (m_stop) {
+ throw runtime_error("UDP Receiver not running");
}
-#if defined(HAVE_SO_NOSIGPIPE)
- int val = 1;
- if (setsockopt(m_sock, SOL_SOCKET, SO_NOSIGPIPE,
- &val, sizeof(val)) < 0) {
- throw std::runtime_error("Can't set SO_NOSIGPIPE");
+ UDPPacket p;
+ m_packets.wait_and_pop(p);
+
+ return p.buffer;
+}
+
+void UDPReceiver::m_run()
+{
+ // Ensure that stop is set to true in case of exception or return
+ struct SetStopOnDestruct {
+ SetStopOnDestruct(atomic<bool>& stop) : m_stop(stop) {}
+ ~SetStopOnDestruct() { m_stop = true; }
+ private: atomic<bool>& m_stop;
+ } autoSetStop(m_stop);
+
+ if (IN_MULTICAST(ntohl(inet_addr(m_mcastaddr.c_str())))) {
+ m_sock.reinit(m_port, m_mcastaddr);
+ m_sock.setMulticastSource(m_bindto.c_str());
+ m_sock.joinGroup(m_mcastaddr.c_str(), m_bindto.c_str());
}
-#endif
+ else {
+ m_sock.reinit(m_port, m_bindto);
+ }
+
+ while (not m_stop) {
+ constexpr size_t packsize = 8192;
+ try {
+ auto packet = m_sock.receive(packsize);
+ if (packet.buffer.size() == packsize) {
+ // TODO replace fprintf
+ fprintf(stderr, "Warning, possible UDP truncation\n");
+ }
+
+ // If this blocks, the UDP socket will lose incoming packets
+ m_packets.push_wait_if_full(packet, m_max_packets_queued);
+ }
+ catch (const std::runtime_error& e) {
+ // TODO replace fprintf
+ // TODO handle intr
+ fprintf(stderr, "Socket error: %s\n", e.what());
+ m_stop = true;
+ }
+ }
+}
+
+
+TCPSocket::TCPSocket()
+{
}
TCPSocket::~TCPSocket()
@@ -314,7 +391,7 @@ void TCPSocket::connect(const std::string& hostname, int port)
/* Obtain address(es) matching host/port */
struct addrinfo hints;
memset(&hints, 0, sizeof(struct addrinfo));
- hints.ai_family = AF_UNSPEC; /* Allow IPv4 or IPv6 */
+ hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_STREAM;
hints.ai_flags = 0;
hints.ai_protocol = 0;
@@ -376,7 +453,7 @@ void TCPSocket::listen(int port, const string& name)
struct addrinfo hints;
memset(&hints, 0, sizeof(struct addrinfo));
- hints.ai_family = AF_UNSPEC;
+ hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_STREAM;
hints.ai_flags = AI_PASSIVE; /* For wildcard IP address */
hints.ai_protocol = 0;
@@ -410,6 +487,14 @@ void TCPSocket::listen(int port, const string& name)
freeaddrinfo(result);
+#if defined(HAVE_SO_NOSIGPIPE)
+ int val = 1;
+ if (setsockopt(m_sock, SOL_SOCKET, SO_NOSIGPIPE,
+ &val, sizeof(val)) < 0) {
+ throw std::runtime_error("Can't set SO_NOSIGPIPE");
+ }
+#endif
+
if (rp == nullptr) {
throw runtime_error("Could not bind");
}
@@ -683,7 +768,9 @@ TCPDataDispatcher::~TCPDataDispatcher()
m_running = false;
m_connections.clear();
m_listener_socket.close();
- m_listener_thread.join();
+ if (m_listener_thread.joinable()) {
+ m_listener_thread.join();
+ }
}
void TCPDataDispatcher::start(int port, const string& address)
diff --git a/lib/Socket.h b/lib/Socket.h
index 82ff5ad..2393584 100644
--- a/lib/Socket.h
+++ b/lib/Socket.h
@@ -104,13 +104,14 @@ class UDPSocket
const UDPSocket& operator=(const UDPSocket& other) = delete;
/** Close the already open socket, and create a new one. Throws a runtime_error on error. */
+ void reinit(int port);
void reinit(int port, const std::string& name);
void close(void);
void send(UDPPacket& packet);
void send(const std::vector<uint8_t>& data, InetAddress destination);
UDPPacket receive(size_t max_size);
- void joinGroup(char* groupname);
+ void joinGroup(const char* groupname, const char* if_addr = nullptr);
void setMulticastSource(const char* source_addr);
void setMulticastTTL(int ttl);
@@ -120,9 +121,36 @@ class UDPSocket
void setBlocking(bool block);
protected:
- SOCKET listenSocket;
+ SOCKET m_sock;
};
+/* Threaded UDP receiver */
+class UDPReceiver {
+ public:
+ UDPReceiver() : m_port(0), m_thread(), m_stop(false), m_packets() {}
+ ~UDPReceiver();
+ UDPReceiver(const UDPReceiver&) = delete;
+ UDPReceiver operator=(const UDPReceiver&) = delete;
+
+ // Start the receiver in a separate thread
+ void start(int port, const std::string& bindto, const std::string& mcastaddr, size_t max_packets_queued);
+
+ // Get the data contained in a UDP packet, blocks if none available
+ // In case of error, throws a runtime_error
+ std::vector<uint8_t> get_packet_buffer(void);
+
+ private:
+ void m_run(void);
+
+ int m_port;
+ std::string m_bindto;
+ std::string m_mcastaddr;
+ size_t m_max_packets_queued;
+ std::thread m_thread;
+ std::atomic<bool> m_stop;
+ ThreadsafeQueue<UDPPacket> m_packets;
+ UDPSocket m_sock;
+};
class TCPSocket {
public:
diff --git a/lib/ThreadsafeQueue.h b/lib/ThreadsafeQueue.h
new file mode 100644
index 0000000..62f4c96
--- /dev/null
+++ b/lib/ThreadsafeQueue.h
@@ -0,0 +1,176 @@
+/*
+ Copyright (C) 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in
+ Right of Canada (Communications Research Center Canada)
+
+ Copyright (C) 2018
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ An implementation for a threadsafe queue, depends on C++11
+
+ When creating a ThreadsafeQueue, one can specify the minimal number
+ of elements it must contain before it is possible to take one
+ element out.
+ */
+/*
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#include <mutex>
+#include <condition_variable>
+#include <queue>
+#include <utility>
+
+/* This queue is meant to be used by two threads. One producer
+ * that pushes elements into the queue, and one consumer that
+ * retrieves the elements.
+ *
+ * The queue can make the consumer block until an element
+ * is available, or a wakeup requested.
+ */
+
+/* Class thrown by blocking pop to tell the consumer
+ * that there's a wakeup requested. */
+class ThreadsafeQueueWakeup {};
+
+template<typename T>
+class ThreadsafeQueue
+{
+public:
+ /* Push one element into the queue, and notify another thread that
+ * might be waiting.
+ *
+ * returns the new queue size.
+ */
+ size_t push(T const& val)
+ {
+ std::unique_lock<std::mutex> lock(the_mutex);
+ the_queue.push(val);
+ size_t queue_size = the_queue.size();
+ lock.unlock();
+
+ the_rx_notification.notify_one();
+
+ return queue_size;
+ }
+
+ size_t push(T&& val)
+ {
+ std::unique_lock<std::mutex> lock(the_mutex);
+ the_queue.emplace(std::move(val));
+ size_t queue_size = the_queue.size();
+ lock.unlock();
+
+ the_rx_notification.notify_one();
+
+ return queue_size;
+ }
+
+ /* Push one element into the queue, but wait until the
+ * queue size goes below the threshold.
+ *
+ * Notify waiting thread.
+ *
+ * returns the new queue size.
+ */
+ size_t push_wait_if_full(T const& val, size_t threshold)
+ {
+ std::unique_lock<std::mutex> lock(the_mutex);
+ while (the_queue.size() >= threshold) {
+ the_tx_notification.wait(lock);
+ }
+ the_queue.push(val);
+ size_t queue_size = the_queue.size();
+ lock.unlock();
+
+ the_rx_notification.notify_one();
+
+ return queue_size;
+ }
+
+ /* Trigger a wakeup event on a blocking consumer, which
+ * will receive a ThreadsafeQueueWakeup exception.
+ */
+ void trigger_wakeup(void)
+ {
+ std::unique_lock<std::mutex> lock(the_mutex);
+ wakeup_requested = true;
+ lock.unlock();
+ the_rx_notification.notify_one();
+ }
+
+ /* Send a notification for the receiver thread */
+ void notify(void)
+ {
+ the_rx_notification.notify_one();
+ }
+
+ bool empty() const
+ {
+ std::unique_lock<std::mutex> lock(the_mutex);
+ return the_queue.empty();
+ }
+
+ size_t size() const
+ {
+ std::unique_lock<std::mutex> lock(the_mutex);
+ return the_queue.size();
+ }
+
+ bool try_pop(T& popped_value)
+ {
+ std::unique_lock<std::mutex> lock(the_mutex);
+ if (the_queue.empty()) {
+ return false;
+ }
+
+ popped_value = the_queue.front();
+ the_queue.pop();
+
+ lock.unlock();
+ the_tx_notification.notify_one();
+
+ return true;
+ }
+
+ void wait_and_pop(T& popped_value, size_t prebuffering = 1)
+ {
+ std::unique_lock<std::mutex> lock(the_mutex);
+ while (the_queue.size() < prebuffering and
+ not wakeup_requested) {
+ the_rx_notification.wait(lock);
+ }
+
+ if (wakeup_requested) {
+ wakeup_requested = false;
+ throw ThreadsafeQueueWakeup();
+ }
+ else {
+ std::swap(popped_value, the_queue.front());
+ the_queue.pop();
+
+ lock.unlock();
+ the_tx_notification.notify_one();
+ }
+ }
+
+private:
+ std::queue<T> the_queue;
+ mutable std::mutex the_mutex;
+ std::condition_variable the_rx_notification;
+ std::condition_variable the_tx_notification;
+ bool wakeup_requested = false;
+};
+