aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2018-03-06 22:53:36 +0100
committerMatthias P. Braendli <matthias.braendli@mpb.li>2018-03-06 22:53:36 +0100
commit56dfbcd73529ee6dc42a17a139c9ceaccf1def3d (patch)
tree47580fcfa894fca04805f670ec6acf30a08a6f5d
parent5bea9241b246cfc4b9abef3d265a96d52a377c37 (diff)
downloaddabmod-56dfbcd73529ee6dc42a17a139c9ceaccf1def3d.tar.gz
dabmod-56dfbcd73529ee6dc42a17a139c9ceaccf1def3d.tar.bz2
dabmod-56dfbcd73529ee6dc42a17a139c9ceaccf1def3d.zip
Avoid copies in ThreadsafeQueue and Buffer
-rw-r--r--src/Buffer.cpp18
-rw-r--r--src/Buffer.h6
-rw-r--r--src/InputReader.h4
-rw-r--r--src/InputZeroMQReader.cpp19
-rw-r--r--src/ModPlugin.cpp27
-rw-r--r--src/ModPlugin.h6
-rw-r--r--src/ThreadsafeQueue.h43
7 files changed, 78 insertions, 45 deletions
diff --git a/src/Buffer.cpp b/src/Buffer.cpp
index e96b8d6..002c1eb 100644
--- a/src/Buffer.cpp
+++ b/src/Buffer.cpp
@@ -77,6 +77,12 @@ Buffer::~Buffer()
}
}
+void Buffer::swap(Buffer& other)
+{
+ std::swap(m_len, other.m_len);
+ std::swap(m_capacity, other.m_capacity);
+ std::swap(m_data, other.m_data);
+}
Buffer& Buffer::operator=(const Buffer& other)
{
@@ -142,6 +148,13 @@ void Buffer::setData(const void *data, size_t len)
appendData(data, len);
}
+uint8_t Buffer::operator[](size_t i) const
+{
+ if (i >= m_len) {
+ throw std::out_of_range("index out of range");
+ }
+ return reinterpret_cast<uint8_t*>(m_data)[i];
+}
void Buffer::appendData(const void *data, size_t len)
{
@@ -153,3 +166,8 @@ void Buffer::appendData(const void *data, size_t len)
}
}
+void swap(Buffer& buf1, Buffer& buf2)
+{
+ buf1.swap(buf2);
+}
+
diff --git a/src/Buffer.h b/src/Buffer.h
index e486724..d181a46 100644
--- a/src/Buffer.h
+++ b/src/Buffer.h
@@ -50,6 +50,8 @@ class Buffer {
Buffer(const std::vector<uint8_t>& vec);
~Buffer();
+ void swap(Buffer& other);
+
/* Resize the buffer, reallocate memory if needed */
void setLength(size_t len);
@@ -60,6 +62,8 @@ class Buffer {
Buffer& operator=(Buffer&& other);
Buffer& operator=(const std::vector<uint8_t>& buf);
+ uint8_t operator[](size_t i) const;
+
/* Concatenate the current data with the new data given.
* Reallocates memory if needed. */
void appendData(const void *data, size_t len);
@@ -80,3 +84,5 @@ class Buffer {
void *m_data;
};
+void swap(Buffer& buf1, Buffer& buf2);
+
diff --git a/src/InputReader.h b/src/InputReader.h
index 84f6835..4ffa2b8 100644
--- a/src/InputReader.h
+++ b/src/InputReader.h
@@ -173,12 +173,12 @@ class InputZeroMQReader : public InputReader
std::atomic<bool> m_running = ATOMIC_VAR_INIT(false);
std::string m_uri;
size_t m_max_queued_frames = 0;
- ThreadsafeQueue<std::shared_ptr<std::vector<uint8_t> > > m_in_messages;
+ ThreadsafeQueue<std::vector<uint8_t> > m_in_messages;
void RecvProcess(void);
zmq::context_t m_zmqcontext; // is thread-safe
- boost::thread m_recv_thread;
+ std::thread m_recv_thread;
/* We must be careful to keep frame phase consistent. If we
* drop a single ETI frame, we will break the transmission
diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp
index 1bb325f..e1e1c6d 100644
--- a/src/InputZeroMQReader.cpp
+++ b/src/InputZeroMQReader.cpp
@@ -36,7 +36,6 @@
#include <cstdio>
#include <stdint.h>
#include "zmq.hpp"
-#include <boost/thread/thread.hpp>
#include "InputReader.h"
#include "PcDebug.h"
#include "Utils.h"
@@ -88,7 +87,7 @@ int InputZeroMQReader::Open(const string& uri, size_t max_queued_frames)
m_max_queued_frames = max_queued_frames;
- m_recv_thread = boost::thread(&InputZeroMQReader::RecvProcess, this);
+ m_recv_thread = std::thread(&InputZeroMQReader::RecvProcess, this);
return 0;
}
@@ -99,7 +98,7 @@ int InputZeroMQReader::GetNextFrame(void* buffer)
return 0;
}
- shared_ptr<vector<uint8_t> > incoming;
+ vector<uint8_t> incoming;
/* Do some prebuffering because reads will happen in bursts
* (4 ETI frames in TM1) and we should make sure that
@@ -122,11 +121,11 @@ int InputZeroMQReader::GetNextFrame(void* buffer)
const size_t framesize = 6144;
- if (incoming->empty()) {
+ if (incoming.empty()) {
return 0;
}
- else if (incoming->size() == framesize) {
- memcpy(buffer, &incoming->front(), framesize);
+ else if (incoming.size() == framesize) {
+ memcpy(buffer, &incoming.front(), framesize);
}
else {
throw logic_error("ZMQ ETI not 6144");
@@ -184,7 +183,7 @@ void InputZeroMQReader::RecvProcess()
const int num_events = zmq::poll(items, 1, zmq_timeout_ms);
if (num_events == 0) {
// timeout is signalled by an empty buffer
- auto buf = make_shared<vector<uint8_t> >();
+ vector<uint8_t> buf;
m_in_messages.push(buf);
continue;
}
@@ -228,7 +227,7 @@ void InputZeroMQReader::RecvProcess()
throw runtime_error(ss.str());
}
else {
- auto buf = make_shared<vector<uint8_t> >(6144, 0x55);
+ vector<uint8_t> buf(6144, 0x55);
const int framesize = dab_msg->buflen[i];
@@ -236,13 +235,13 @@ void InputZeroMQReader::RecvProcess()
throw runtime_error("ZeroMQ packet too small");
}
- memcpy(&buf->front(),
+ memcpy(&buf.front(),
((uint8_t*)incoming.data()) + offset,
framesize);
offset += framesize;
- queue_size = m_in_messages.push(buf);
+ queue_size = m_in_messages.push(move(buf));
etiLog.log(trace, "ZMQ,push %zu", queue_size);
}
}
diff --git a/src/ModPlugin.cpp b/src/ModPlugin.cpp
index d567a90..f907ba8 100644
--- a/src/ModPlugin.cpp
+++ b/src/ModPlugin.cpp
@@ -2,7 +2,7 @@
Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Her Majesty the
Queen in Right of Canada (Communications Research Center Canada)
- Copyright (C) 2017
+ Copyright (C) 2018
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
@@ -29,6 +29,7 @@
#include "Utils.h"
#include <stdexcept>
#include <string>
+#include <cstring>
#define MODASSERT(cond) \
if (not (cond)) { \
@@ -92,16 +93,14 @@ int PipelinedModCodec::process(Buffer* dataIn, Buffer* dataOut)
return 0;
}
- std::shared_ptr<Buffer> inbuffer =
- std::make_shared<Buffer>(dataIn->getLength(), dataIn->getData());
-
- m_input_queue.push(inbuffer);
+ Buffer inbuffer;
+ std::swap(inbuffer, *dataIn);
+ m_input_queue.push(std::move(inbuffer));
if (m_ready_to_output_data) {
- std::shared_ptr<Buffer> outbuffer;
+ Buffer outbuffer;
m_output_queue.wait_and_pop(outbuffer);
-
- dataOut->setData(outbuffer->getData(), outbuffer->getLength());
+ std::swap(outbuffer, *dataOut);
}
else {
dataOut->setLength(dataIn->getLength());
@@ -132,21 +131,21 @@ void PipelinedModCodec::process_thread()
set_realtime_prio(1);
while (m_running) {
- std::shared_ptr<Buffer> dataIn;
+ Buffer dataIn;
m_input_queue.wait_and_pop(dataIn);
- if (!dataIn or dataIn->getLength() == 0) {
+ if (dataIn.getLength() == 0) {
break;
}
- std::shared_ptr<Buffer> dataOut = std::make_shared<Buffer>();
- dataOut->setLength(dataIn->getLength());
+ Buffer dataOut;
+ dataOut.setLength(dataIn.getLength());
- if (internal_process(dataIn.get(), dataOut.get()) == 0) {
+ if (internal_process(&dataIn, &dataOut) == 0) {
m_running = false;
}
- m_output_queue.push(dataOut);
+ m_output_queue.push(std::move(dataOut));
}
m_running = false;
diff --git a/src/ModPlugin.h b/src/ModPlugin.h
index e9cfa21..7f03618 100644
--- a/src/ModPlugin.h
+++ b/src/ModPlugin.h
@@ -2,7 +2,7 @@
Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty
the Queen in Right of Canada (Communications Research Center Canada)
- Copyright (C) 2017
+ Copyright (C) 2018
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
@@ -114,8 +114,8 @@ protected:
private:
bool m_ready_to_output_data = false;
- ThreadsafeQueue<std::shared_ptr<Buffer> > m_input_queue;
- ThreadsafeQueue<std::shared_ptr<Buffer> > m_output_queue;
+ ThreadsafeQueue<Buffer> m_input_queue;
+ ThreadsafeQueue<Buffer> m_output_queue;
std::deque<meta_vec_t> m_metadata_fifo;
diff --git a/src/ThreadsafeQueue.h b/src/ThreadsafeQueue.h
index e27c100..6f3808c 100644
--- a/src/ThreadsafeQueue.h
+++ b/src/ThreadsafeQueue.h
@@ -2,7 +2,7 @@
Copyright (C) 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in
Right of Canada (Communications Research Center Canada)
- Copyright (C) 2013, 2014
+ Copyright (C) 2018
Matthias P. Braendli, matthias.braendli@mpb.li
An implementation for a threadsafe queue using boost thread library
@@ -28,11 +28,12 @@
along with ODR-DabMod. If not, see <http://www.gnu.org/licenses/>.
*/
-#ifndef THREADSAFE_QUEUE_H
-#define THREADSAFE_QUEUE_H
+#pragma once
-#include <boost/thread.hpp>
+#include <mutex>
+#include <condition_variable>
#include <queue>
+#include <utility>
/* This queue is meant to be used by two threads. One producer
* that pushes elements into the queue, and one consumer that
@@ -53,7 +54,7 @@ public:
*/
size_t push(T const& val)
{
- boost::mutex::scoped_lock lock(the_mutex);
+ std::unique_lock<std::mutex> lock(the_mutex);
the_queue.push(val);
size_t queue_size = the_queue.size();
lock.unlock();
@@ -63,6 +64,18 @@ public:
return queue_size;
}
+ size_t push(T&& val)
+ {
+ std::unique_lock<std::mutex> lock(the_mutex);
+ the_queue.emplace(std::move(val));
+ size_t queue_size = the_queue.size();
+ lock.unlock();
+
+ the_rx_notification.notify_one();
+
+ return queue_size;
+ }
+
/* Push one element into the queue, but wait until the
* queue size goes below the threshold.
*
@@ -72,7 +85,7 @@ public:
*/
size_t push_wait_if_full(T const& val, size_t threshold)
{
- boost::mutex::scoped_lock lock(the_mutex);
+ std::unique_lock<std::mutex> lock(the_mutex);
while (the_queue.size() >= threshold) {
the_tx_notification.wait(lock);
}
@@ -93,19 +106,19 @@ public:
bool empty() const
{
- boost::mutex::scoped_lock lock(the_mutex);
+ std::unique_lock<std::mutex> lock(the_mutex);
return the_queue.empty();
}
size_t size() const
{
- boost::mutex::scoped_lock lock(the_mutex);
+ std::unique_lock<std::mutex> lock(the_mutex);
return the_queue.size();
}
bool try_pop(T& popped_value)
{
- boost::mutex::scoped_lock lock(the_mutex);
+ std::unique_lock<std::mutex> lock(the_mutex);
if (the_queue.empty()) {
return false;
}
@@ -121,12 +134,12 @@ public:
void wait_and_pop(T& popped_value, size_t prebuffering = 1)
{
- boost::mutex::scoped_lock lock(the_mutex);
+ std::unique_lock<std::mutex> lock(the_mutex);
while (the_queue.size() < prebuffering) {
the_rx_notification.wait(lock);
}
- popped_value = the_queue.front();
+ std::swap(popped_value, the_queue.front());
the_queue.pop();
lock.unlock();
@@ -135,10 +148,8 @@ public:
private:
std::queue<T> the_queue;
- mutable boost::mutex the_mutex;
- boost::condition_variable the_rx_notification;
- boost::condition_variable the_tx_notification;
+ mutable std::mutex the_mutex;
+ std::condition_variable the_rx_notification;
+ std::condition_variable the_tx_notification;
};
-#endif
-