diff options
| author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2018-03-06 22:53:36 +0100 | 
|---|---|---|
| committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2018-03-06 22:53:36 +0100 | 
| commit | 56dfbcd73529ee6dc42a17a139c9ceaccf1def3d (patch) | |
| tree | 47580fcfa894fca04805f670ec6acf30a08a6f5d /src | |
| parent | 5bea9241b246cfc4b9abef3d265a96d52a377c37 (diff) | |
| download | dabmod-56dfbcd73529ee6dc42a17a139c9ceaccf1def3d.tar.gz dabmod-56dfbcd73529ee6dc42a17a139c9ceaccf1def3d.tar.bz2 dabmod-56dfbcd73529ee6dc42a17a139c9ceaccf1def3d.zip | |
Avoid copies in ThreadsafeQueue and Buffer
Diffstat (limited to 'src')
| -rw-r--r-- | src/Buffer.cpp | 18 | ||||
| -rw-r--r-- | src/Buffer.h | 6 | ||||
| -rw-r--r-- | src/InputReader.h | 4 | ||||
| -rw-r--r-- | src/InputZeroMQReader.cpp | 19 | ||||
| -rw-r--r-- | src/ModPlugin.cpp | 27 | ||||
| -rw-r--r-- | src/ModPlugin.h | 6 | ||||
| -rw-r--r-- | src/ThreadsafeQueue.h | 43 | 
7 files changed, 78 insertions, 45 deletions
| diff --git a/src/Buffer.cpp b/src/Buffer.cpp index e96b8d6..002c1eb 100644 --- a/src/Buffer.cpp +++ b/src/Buffer.cpp @@ -77,6 +77,12 @@ Buffer::~Buffer()      }  } +void Buffer::swap(Buffer& other) +{ +    std::swap(m_len, other.m_len); +    std::swap(m_capacity, other.m_capacity); +    std::swap(m_data, other.m_data); +}  Buffer& Buffer::operator=(const Buffer& other)  { @@ -142,6 +148,13 @@ void Buffer::setData(const void *data, size_t len)      appendData(data, len);  } +uint8_t Buffer::operator[](size_t i) const +{ +    if (i >= m_len) { +        throw std::out_of_range("index out of range"); +    } +    return reinterpret_cast<uint8_t*>(m_data)[i]; +}  void Buffer::appendData(const void *data, size_t len)  { @@ -153,3 +166,8 @@ void Buffer::appendData(const void *data, size_t len)      }  } +void swap(Buffer& buf1, Buffer& buf2) +{ +    buf1.swap(buf2); +} + diff --git a/src/Buffer.h b/src/Buffer.h index e486724..d181a46 100644 --- a/src/Buffer.h +++ b/src/Buffer.h @@ -50,6 +50,8 @@ class Buffer {          Buffer(const std::vector<uint8_t>& vec);          ~Buffer(); +        void swap(Buffer& other); +          /* Resize the buffer, reallocate memory if needed */          void setLength(size_t len); @@ -60,6 +62,8 @@ class Buffer {          Buffer& operator=(Buffer&& other);          Buffer& operator=(const std::vector<uint8_t>& buf); +        uint8_t operator[](size_t i) const; +          /* Concatenate the current data with the new data given.           * Reallocates memory if needed. */          void appendData(const void *data, size_t len); @@ -80,3 +84,5 @@ class Buffer {          void *m_data;  }; +void swap(Buffer& buf1, Buffer& buf2); + diff --git a/src/InputReader.h b/src/InputReader.h index 84f6835..4ffa2b8 100644 --- a/src/InputReader.h +++ b/src/InputReader.h @@ -173,12 +173,12 @@ class InputZeroMQReader : public InputReader          std::atomic<bool> m_running = ATOMIC_VAR_INIT(false);          std::string m_uri;          size_t m_max_queued_frames = 0; -        ThreadsafeQueue<std::shared_ptr<std::vector<uint8_t> > > m_in_messages; +        ThreadsafeQueue<std::vector<uint8_t> > m_in_messages;          void RecvProcess(void);          zmq::context_t m_zmqcontext; // is thread-safe -        boost::thread m_recv_thread; +        std::thread m_recv_thread;          /* We must be careful to keep frame phase consistent. If we           * drop a single ETI frame, we will break the transmission diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp index 1bb325f..e1e1c6d 100644 --- a/src/InputZeroMQReader.cpp +++ b/src/InputZeroMQReader.cpp @@ -36,7 +36,6 @@  #include <cstdio>  #include <stdint.h>  #include "zmq.hpp" -#include <boost/thread/thread.hpp>  #include "InputReader.h"  #include "PcDebug.h"  #include "Utils.h" @@ -88,7 +87,7 @@ int InputZeroMQReader::Open(const string& uri, size_t max_queued_frames)      m_max_queued_frames = max_queued_frames; -    m_recv_thread = boost::thread(&InputZeroMQReader::RecvProcess, this); +    m_recv_thread = std::thread(&InputZeroMQReader::RecvProcess, this);      return 0;  } @@ -99,7 +98,7 @@ int InputZeroMQReader::GetNextFrame(void* buffer)          return 0;      } -    shared_ptr<vector<uint8_t> > incoming; +    vector<uint8_t> incoming;      /* Do some prebuffering because reads will happen in bursts       * (4 ETI frames in TM1) and we should make sure that @@ -122,11 +121,11 @@ int InputZeroMQReader::GetNextFrame(void* buffer)      const size_t framesize = 6144; -    if (incoming->empty()) { +    if (incoming.empty()) {          return 0;      } -    else if (incoming->size() == framesize) { -        memcpy(buffer, &incoming->front(), framesize); +    else if (incoming.size() == framesize) { +        memcpy(buffer, &incoming.front(), framesize);      }      else {          throw logic_error("ZMQ ETI not 6144"); @@ -184,7 +183,7 @@ void InputZeroMQReader::RecvProcess()              const int num_events = zmq::poll(items, 1, zmq_timeout_ms);              if (num_events == 0) {                  // timeout is signalled by an empty buffer -                auto buf = make_shared<vector<uint8_t> >(); +                vector<uint8_t> buf;                  m_in_messages.push(buf);                  continue;              } @@ -228,7 +227,7 @@ void InputZeroMQReader::RecvProcess()                              throw runtime_error(ss.str());                          }                          else { -                            auto buf = make_shared<vector<uint8_t> >(6144, 0x55); +                            vector<uint8_t> buf(6144, 0x55);                              const int framesize = dab_msg->buflen[i]; @@ -236,13 +235,13 @@ void InputZeroMQReader::RecvProcess()                                  throw runtime_error("ZeroMQ packet too small");                              } -                            memcpy(&buf->front(), +                            memcpy(&buf.front(),                                      ((uint8_t*)incoming.data()) + offset,                                      framesize);                              offset += framesize; -                            queue_size = m_in_messages.push(buf); +                            queue_size = m_in_messages.push(move(buf));                              etiLog.log(trace, "ZMQ,push %zu", queue_size);                          }                      } diff --git a/src/ModPlugin.cpp b/src/ModPlugin.cpp index d567a90..f907ba8 100644 --- a/src/ModPlugin.cpp +++ b/src/ModPlugin.cpp @@ -2,7 +2,7 @@     Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Her Majesty the     Queen in Right of Canada (Communications Research Center Canada) -   Copyright (C) 2017 +   Copyright (C) 2018     Matthias P. Braendli, matthias.braendli@mpb.li      http://opendigitalradio.org @@ -29,6 +29,7 @@  #include "Utils.h"  #include <stdexcept>  #include <string> +#include <cstring>  #define MODASSERT(cond) \      if (not (cond)) { \ @@ -92,16 +93,14 @@ int PipelinedModCodec::process(Buffer* dataIn, Buffer* dataOut)          return 0;      } -    std::shared_ptr<Buffer> inbuffer = -        std::make_shared<Buffer>(dataIn->getLength(), dataIn->getData()); - -    m_input_queue.push(inbuffer); +    Buffer inbuffer; +    std::swap(inbuffer, *dataIn); +    m_input_queue.push(std::move(inbuffer));      if (m_ready_to_output_data) { -        std::shared_ptr<Buffer> outbuffer; +        Buffer outbuffer;          m_output_queue.wait_and_pop(outbuffer); - -        dataOut->setData(outbuffer->getData(), outbuffer->getLength()); +        std::swap(outbuffer, *dataOut);      }      else {          dataOut->setLength(dataIn->getLength()); @@ -132,21 +131,21 @@ void PipelinedModCodec::process_thread()      set_realtime_prio(1);      while (m_running) { -        std::shared_ptr<Buffer> dataIn; +        Buffer dataIn;          m_input_queue.wait_and_pop(dataIn); -        if (!dataIn or dataIn->getLength() == 0) { +        if (dataIn.getLength() == 0) {              break;          } -        std::shared_ptr<Buffer> dataOut = std::make_shared<Buffer>(); -        dataOut->setLength(dataIn->getLength()); +        Buffer dataOut; +        dataOut.setLength(dataIn.getLength()); -        if (internal_process(dataIn.get(), dataOut.get()) == 0) { +        if (internal_process(&dataIn, &dataOut) == 0) {              m_running = false;          } -        m_output_queue.push(dataOut); +        m_output_queue.push(std::move(dataOut));      }      m_running = false; diff --git a/src/ModPlugin.h b/src/ModPlugin.h index e9cfa21..7f03618 100644 --- a/src/ModPlugin.h +++ b/src/ModPlugin.h @@ -2,7 +2,7 @@     Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty     the Queen in Right of Canada (Communications Research Center Canada) -   Copyright (C) 2017 +   Copyright (C) 2018     Matthias P. Braendli, matthias.braendli@mpb.li      http://opendigitalradio.org @@ -114,8 +114,8 @@ protected:  private:      bool m_ready_to_output_data = false; -    ThreadsafeQueue<std::shared_ptr<Buffer> > m_input_queue; -    ThreadsafeQueue<std::shared_ptr<Buffer> > m_output_queue; +    ThreadsafeQueue<Buffer> m_input_queue; +    ThreadsafeQueue<Buffer> m_output_queue;      std::deque<meta_vec_t> m_metadata_fifo; diff --git a/src/ThreadsafeQueue.h b/src/ThreadsafeQueue.h index e27c100..6f3808c 100644 --- a/src/ThreadsafeQueue.h +++ b/src/ThreadsafeQueue.h @@ -2,7 +2,7 @@     Copyright (C) 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in     Right of Canada (Communications Research Center Canada) -   Copyright (C) 2013, 2014 +   Copyright (C) 2018     Matthias P. Braendli, matthias.braendli@mpb.li     An implementation for a threadsafe queue using boost thread library @@ -28,11 +28,12 @@     along with ODR-DabMod.  If not, see <http://www.gnu.org/licenses/>.   */ -#ifndef THREADSAFE_QUEUE_H -#define THREADSAFE_QUEUE_H +#pragma once -#include <boost/thread.hpp> +#include <mutex> +#include <condition_variable>  #include <queue> +#include <utility>  /* This queue is meant to be used by two threads. One producer   * that pushes elements into the queue, and one consumer that @@ -53,7 +54,7 @@ public:       */      size_t push(T const& val)      { -        boost::mutex::scoped_lock lock(the_mutex); +        std::unique_lock<std::mutex> lock(the_mutex);          the_queue.push(val);          size_t queue_size = the_queue.size();          lock.unlock(); @@ -63,6 +64,18 @@ public:          return queue_size;      } +    size_t push(T&& val) +    { +        std::unique_lock<std::mutex> lock(the_mutex); +        the_queue.emplace(std::move(val)); +        size_t queue_size = the_queue.size(); +        lock.unlock(); + +        the_rx_notification.notify_one(); + +        return queue_size; +    } +      /* Push one element into the queue, but wait until the       * queue size goes below the threshold.       * @@ -72,7 +85,7 @@ public:       */      size_t push_wait_if_full(T const& val, size_t threshold)      { -        boost::mutex::scoped_lock lock(the_mutex); +        std::unique_lock<std::mutex> lock(the_mutex);          while (the_queue.size() >= threshold) {              the_tx_notification.wait(lock);          } @@ -93,19 +106,19 @@ public:      bool empty() const      { -        boost::mutex::scoped_lock lock(the_mutex); +        std::unique_lock<std::mutex> lock(the_mutex);          return the_queue.empty();      }      size_t size() const      { -        boost::mutex::scoped_lock lock(the_mutex); +        std::unique_lock<std::mutex> lock(the_mutex);          return the_queue.size();      }      bool try_pop(T& popped_value)      { -        boost::mutex::scoped_lock lock(the_mutex); +        std::unique_lock<std::mutex> lock(the_mutex);          if (the_queue.empty()) {              return false;          } @@ -121,12 +134,12 @@ public:      void wait_and_pop(T& popped_value, size_t prebuffering = 1)      { -        boost::mutex::scoped_lock lock(the_mutex); +        std::unique_lock<std::mutex> lock(the_mutex);          while (the_queue.size() < prebuffering) {              the_rx_notification.wait(lock);          } -        popped_value = the_queue.front(); +        std::swap(popped_value, the_queue.front());          the_queue.pop();          lock.unlock(); @@ -135,10 +148,8 @@ public:  private:      std::queue<T> the_queue; -    mutable boost::mutex the_mutex; -    boost::condition_variable the_rx_notification; -    boost::condition_variable the_tx_notification; +    mutable std::mutex the_mutex; +    std::condition_variable the_rx_notification; +    std::condition_variable the_tx_notification;  }; -#endif - | 
