diff options
-rw-r--r-- | src/Buffer.cpp | 18 | ||||
-rw-r--r-- | src/Buffer.h | 6 | ||||
-rw-r--r-- | src/InputReader.h | 4 | ||||
-rw-r--r-- | src/InputZeroMQReader.cpp | 19 | ||||
-rw-r--r-- | src/ModPlugin.cpp | 27 | ||||
-rw-r--r-- | src/ModPlugin.h | 6 | ||||
-rw-r--r-- | src/ThreadsafeQueue.h | 43 |
7 files changed, 78 insertions, 45 deletions
diff --git a/src/Buffer.cpp b/src/Buffer.cpp index e96b8d6..002c1eb 100644 --- a/src/Buffer.cpp +++ b/src/Buffer.cpp @@ -77,6 +77,12 @@ Buffer::~Buffer() } } +void Buffer::swap(Buffer& other) +{ + std::swap(m_len, other.m_len); + std::swap(m_capacity, other.m_capacity); + std::swap(m_data, other.m_data); +} Buffer& Buffer::operator=(const Buffer& other) { @@ -142,6 +148,13 @@ void Buffer::setData(const void *data, size_t len) appendData(data, len); } +uint8_t Buffer::operator[](size_t i) const +{ + if (i >= m_len) { + throw std::out_of_range("index out of range"); + } + return reinterpret_cast<uint8_t*>(m_data)[i]; +} void Buffer::appendData(const void *data, size_t len) { @@ -153,3 +166,8 @@ void Buffer::appendData(const void *data, size_t len) } } +void swap(Buffer& buf1, Buffer& buf2) +{ + buf1.swap(buf2); +} + diff --git a/src/Buffer.h b/src/Buffer.h index e486724..d181a46 100644 --- a/src/Buffer.h +++ b/src/Buffer.h @@ -50,6 +50,8 @@ class Buffer { Buffer(const std::vector<uint8_t>& vec); ~Buffer(); + void swap(Buffer& other); + /* Resize the buffer, reallocate memory if needed */ void setLength(size_t len); @@ -60,6 +62,8 @@ class Buffer { Buffer& operator=(Buffer&& other); Buffer& operator=(const std::vector<uint8_t>& buf); + uint8_t operator[](size_t i) const; + /* Concatenate the current data with the new data given. * Reallocates memory if needed. */ void appendData(const void *data, size_t len); @@ -80,3 +84,5 @@ class Buffer { void *m_data; }; +void swap(Buffer& buf1, Buffer& buf2); + diff --git a/src/InputReader.h b/src/InputReader.h index 84f6835..4ffa2b8 100644 --- a/src/InputReader.h +++ b/src/InputReader.h @@ -173,12 +173,12 @@ class InputZeroMQReader : public InputReader std::atomic<bool> m_running = ATOMIC_VAR_INIT(false); std::string m_uri; size_t m_max_queued_frames = 0; - ThreadsafeQueue<std::shared_ptr<std::vector<uint8_t> > > m_in_messages; + ThreadsafeQueue<std::vector<uint8_t> > m_in_messages; void RecvProcess(void); zmq::context_t m_zmqcontext; // is thread-safe - boost::thread m_recv_thread; + std::thread m_recv_thread; /* We must be careful to keep frame phase consistent. If we * drop a single ETI frame, we will break the transmission diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp index 1bb325f..e1e1c6d 100644 --- a/src/InputZeroMQReader.cpp +++ b/src/InputZeroMQReader.cpp @@ -36,7 +36,6 @@ #include <cstdio> #include <stdint.h> #include "zmq.hpp" -#include <boost/thread/thread.hpp> #include "InputReader.h" #include "PcDebug.h" #include "Utils.h" @@ -88,7 +87,7 @@ int InputZeroMQReader::Open(const string& uri, size_t max_queued_frames) m_max_queued_frames = max_queued_frames; - m_recv_thread = boost::thread(&InputZeroMQReader::RecvProcess, this); + m_recv_thread = std::thread(&InputZeroMQReader::RecvProcess, this); return 0; } @@ -99,7 +98,7 @@ int InputZeroMQReader::GetNextFrame(void* buffer) return 0; } - shared_ptr<vector<uint8_t> > incoming; + vector<uint8_t> incoming; /* Do some prebuffering because reads will happen in bursts * (4 ETI frames in TM1) and we should make sure that @@ -122,11 +121,11 @@ int InputZeroMQReader::GetNextFrame(void* buffer) const size_t framesize = 6144; - if (incoming->empty()) { + if (incoming.empty()) { return 0; } - else if (incoming->size() == framesize) { - memcpy(buffer, &incoming->front(), framesize); + else if (incoming.size() == framesize) { + memcpy(buffer, &incoming.front(), framesize); } else { throw logic_error("ZMQ ETI not 6144"); @@ -184,7 +183,7 @@ void InputZeroMQReader::RecvProcess() const int num_events = zmq::poll(items, 1, zmq_timeout_ms); if (num_events == 0) { // timeout is signalled by an empty buffer - auto buf = make_shared<vector<uint8_t> >(); + vector<uint8_t> buf; m_in_messages.push(buf); continue; } @@ -228,7 +227,7 @@ void InputZeroMQReader::RecvProcess() throw runtime_error(ss.str()); } else { - auto buf = make_shared<vector<uint8_t> >(6144, 0x55); + vector<uint8_t> buf(6144, 0x55); const int framesize = dab_msg->buflen[i]; @@ -236,13 +235,13 @@ void InputZeroMQReader::RecvProcess() throw runtime_error("ZeroMQ packet too small"); } - memcpy(&buf->front(), + memcpy(&buf.front(), ((uint8_t*)incoming.data()) + offset, framesize); offset += framesize; - queue_size = m_in_messages.push(buf); + queue_size = m_in_messages.push(move(buf)); etiLog.log(trace, "ZMQ,push %zu", queue_size); } } diff --git a/src/ModPlugin.cpp b/src/ModPlugin.cpp index d567a90..f907ba8 100644 --- a/src/ModPlugin.cpp +++ b/src/ModPlugin.cpp @@ -2,7 +2,7 @@ Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2017 + Copyright (C) 2018 Matthias P. Braendli, matthias.braendli@mpb.li http://opendigitalradio.org @@ -29,6 +29,7 @@ #include "Utils.h" #include <stdexcept> #include <string> +#include <cstring> #define MODASSERT(cond) \ if (not (cond)) { \ @@ -92,16 +93,14 @@ int PipelinedModCodec::process(Buffer* dataIn, Buffer* dataOut) return 0; } - std::shared_ptr<Buffer> inbuffer = - std::make_shared<Buffer>(dataIn->getLength(), dataIn->getData()); - - m_input_queue.push(inbuffer); + Buffer inbuffer; + std::swap(inbuffer, *dataIn); + m_input_queue.push(std::move(inbuffer)); if (m_ready_to_output_data) { - std::shared_ptr<Buffer> outbuffer; + Buffer outbuffer; m_output_queue.wait_and_pop(outbuffer); - - dataOut->setData(outbuffer->getData(), outbuffer->getLength()); + std::swap(outbuffer, *dataOut); } else { dataOut->setLength(dataIn->getLength()); @@ -132,21 +131,21 @@ void PipelinedModCodec::process_thread() set_realtime_prio(1); while (m_running) { - std::shared_ptr<Buffer> dataIn; + Buffer dataIn; m_input_queue.wait_and_pop(dataIn); - if (!dataIn or dataIn->getLength() == 0) { + if (dataIn.getLength() == 0) { break; } - std::shared_ptr<Buffer> dataOut = std::make_shared<Buffer>(); - dataOut->setLength(dataIn->getLength()); + Buffer dataOut; + dataOut.setLength(dataIn.getLength()); - if (internal_process(dataIn.get(), dataOut.get()) == 0) { + if (internal_process(&dataIn, &dataOut) == 0) { m_running = false; } - m_output_queue.push(dataOut); + m_output_queue.push(std::move(dataOut)); } m_running = false; diff --git a/src/ModPlugin.h b/src/ModPlugin.h index e9cfa21..7f03618 100644 --- a/src/ModPlugin.h +++ b/src/ModPlugin.h @@ -2,7 +2,7 @@ Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2017 + Copyright (C) 2018 Matthias P. Braendli, matthias.braendli@mpb.li http://opendigitalradio.org @@ -114,8 +114,8 @@ protected: private: bool m_ready_to_output_data = false; - ThreadsafeQueue<std::shared_ptr<Buffer> > m_input_queue; - ThreadsafeQueue<std::shared_ptr<Buffer> > m_output_queue; + ThreadsafeQueue<Buffer> m_input_queue; + ThreadsafeQueue<Buffer> m_output_queue; std::deque<meta_vec_t> m_metadata_fifo; diff --git a/src/ThreadsafeQueue.h b/src/ThreadsafeQueue.h index e27c100..6f3808c 100644 --- a/src/ThreadsafeQueue.h +++ b/src/ThreadsafeQueue.h @@ -2,7 +2,7 @@ Copyright (C) 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2013, 2014 + Copyright (C) 2018 Matthias P. Braendli, matthias.braendli@mpb.li An implementation for a threadsafe queue using boost thread library @@ -28,11 +28,12 @@ along with ODR-DabMod. If not, see <http://www.gnu.org/licenses/>. */ -#ifndef THREADSAFE_QUEUE_H -#define THREADSAFE_QUEUE_H +#pragma once -#include <boost/thread.hpp> +#include <mutex> +#include <condition_variable> #include <queue> +#include <utility> /* This queue is meant to be used by two threads. One producer * that pushes elements into the queue, and one consumer that @@ -53,7 +54,7 @@ public: */ size_t push(T const& val) { - boost::mutex::scoped_lock lock(the_mutex); + std::unique_lock<std::mutex> lock(the_mutex); the_queue.push(val); size_t queue_size = the_queue.size(); lock.unlock(); @@ -63,6 +64,18 @@ public: return queue_size; } + size_t push(T&& val) + { + std::unique_lock<std::mutex> lock(the_mutex); + the_queue.emplace(std::move(val)); + size_t queue_size = the_queue.size(); + lock.unlock(); + + the_rx_notification.notify_one(); + + return queue_size; + } + /* Push one element into the queue, but wait until the * queue size goes below the threshold. * @@ -72,7 +85,7 @@ public: */ size_t push_wait_if_full(T const& val, size_t threshold) { - boost::mutex::scoped_lock lock(the_mutex); + std::unique_lock<std::mutex> lock(the_mutex); while (the_queue.size() >= threshold) { the_tx_notification.wait(lock); } @@ -93,19 +106,19 @@ public: bool empty() const { - boost::mutex::scoped_lock lock(the_mutex); + std::unique_lock<std::mutex> lock(the_mutex); return the_queue.empty(); } size_t size() const { - boost::mutex::scoped_lock lock(the_mutex); + std::unique_lock<std::mutex> lock(the_mutex); return the_queue.size(); } bool try_pop(T& popped_value) { - boost::mutex::scoped_lock lock(the_mutex); + std::unique_lock<std::mutex> lock(the_mutex); if (the_queue.empty()) { return false; } @@ -121,12 +134,12 @@ public: void wait_and_pop(T& popped_value, size_t prebuffering = 1) { - boost::mutex::scoped_lock lock(the_mutex); + std::unique_lock<std::mutex> lock(the_mutex); while (the_queue.size() < prebuffering) { the_rx_notification.wait(lock); } - popped_value = the_queue.front(); + std::swap(popped_value, the_queue.front()); the_queue.pop(); lock.unlock(); @@ -135,10 +148,8 @@ public: private: std::queue<T> the_queue; - mutable boost::mutex the_mutex; - boost::condition_variable the_rx_notification; - boost::condition_variable the_tx_notification; + mutable std::mutex the_mutex; + std::condition_variable the_rx_notification; + std::condition_variable the_tx_notification; }; -#endif - |