diff options
| -rw-r--r-- | src/OutputUHD.cpp | 9 | ||||
| -rw-r--r-- | src/OutputUHD.h | 2 | ||||
| -rw-r--r-- | src/ThreadsafeQueue.h | 41 | 
3 files changed, 40 insertions, 12 deletions
diff --git a/src/OutputUHD.cpp b/src/OutputUHD.cpp index 1b84b7c..8f988f3 100644 --- a/src/OutputUHD.cpp +++ b/src/OutputUHD.cpp @@ -2,7 +2,7 @@     Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Her Majesty the     Queen in Right of Canada (Communications Research Center Canada) -   Copyright (C) 2014, 2015 +   Copyright (C) 2016     Matthias P. Braendli, matthias.braendli@mpb.li      http://opendigitalradio.org @@ -404,11 +404,8 @@ int OutputUHD::process(Buffer* dataIn, Buffer* dataOut)          }          else {              while (true) { -                if (uwd.frames.size() > FRAMES_MAX_SIZE) { -                    usleep(10000); // 10ms -                } - -                size_t num_frames = uwd.frames.push(frame); +                size_t num_frames = uwd.frames.push_wait_if_full(frame, +                        FRAMES_MAX_SIZE);                  etiLog.log(trace, "UHD,push %zu", num_frames);                  break;              } diff --git a/src/OutputUHD.h b/src/OutputUHD.h index a74f627..1c59136 100644 --- a/src/OutputUHD.h +++ b/src/OutputUHD.h @@ -2,7 +2,7 @@     Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Her Majesty the     Queen in Right of Canada (Communications Research Center Canada) -   Copyright (C) 2014, 2015 +   Copyright (C) 2016     Matthias P. Braendli, matthias.braendli@mpb.li      http://opendigitalradio.org diff --git a/src/ThreadsafeQueue.h b/src/ThreadsafeQueue.h index e5e83ef..e27c100 100644 --- a/src/ThreadsafeQueue.h +++ b/src/ThreadsafeQueue.h @@ -58,14 +58,37 @@ public:          size_t queue_size = the_queue.size();          lock.unlock(); -        notify(); +        the_rx_notification.notify_one();          return queue_size;      } -    void notify() +    /* Push one element into the queue, but wait until the +     * queue size goes below the threshold. +     * +     * Notify waiting thread. +     * +     * returns the new queue size. +     */ +    size_t push_wait_if_full(T const& val, size_t threshold) +    { +        boost::mutex::scoped_lock lock(the_mutex); +        while (the_queue.size() >= threshold) { +            the_tx_notification.wait(lock); +        } +        the_queue.push(val); +        size_t queue_size = the_queue.size(); +        lock.unlock(); + +        the_rx_notification.notify_one(); + +        return queue_size; +    } + +    /* Send a notification for the receiver thread */ +    void notify(void)      { -        the_condition_variable.notify_one(); +        the_rx_notification.notify_one();      }      bool empty() const @@ -89,6 +112,10 @@ public:          popped_value = the_queue.front();          the_queue.pop(); + +        lock.unlock(); +        the_tx_notification.notify_one(); +          return true;      } @@ -96,17 +123,21 @@ public:      {          boost::mutex::scoped_lock lock(the_mutex);          while (the_queue.size() < prebuffering) { -            the_condition_variable.wait(lock); +            the_rx_notification.wait(lock);          }          popped_value = the_queue.front();          the_queue.pop(); + +        lock.unlock(); +        the_tx_notification.notify_one();      }  private:      std::queue<T> the_queue;      mutable boost::mutex the_mutex; -    boost::condition_variable the_condition_variable; +    boost::condition_variable the_rx_notification; +    boost::condition_variable the_tx_notification;  };  #endif  | 
