From a53130679944ddd179593259eb953b89ab1a7a38 Mon Sep 17 00:00:00 2001 From: Brent Stapleton Date: Fri, 18 Jan 2019 09:45:34 -0800 Subject: lib: transport: apply clang-format This is a continuation of 967be2a4. $ find host/lib/transport -iname *.hpp -o -iname *.cpp |\ xargs clang-format -i -style=file Skipping host/lib/transport/nirio/ because of build errors. $ git checkout host/lib/transport/nirio --- host/lib/transport/muxed_zero_copy_if.cpp | 226 ++++++++++++++++-------------- 1 file changed, 120 insertions(+), 106 deletions(-) (limited to 'host/lib/transport/muxed_zero_copy_if.cpp') diff --git a/host/lib/transport/muxed_zero_copy_if.cpp b/host/lib/transport/muxed_zero_copy_if.cpp index 2e7583276..b26d2097c 100644 --- a/host/lib/transport/muxed_zero_copy_if.cpp +++ b/host/lib/transport/muxed_zero_copy_if.cpp @@ -5,73 +5,75 @@ // SPDX-License-Identifier: GPL-3.0-or-later // -#include -#include #include +#include +#include #include #include #include #include #include -#include #include +#include #include using namespace uhd; using namespace uhd::transport; -class muxed_zero_copy_if_impl : public muxed_zero_copy_if, - public boost::enable_shared_from_this +class muxed_zero_copy_if_impl + : public muxed_zero_copy_if, + public boost::enable_shared_from_this { public: typedef boost::shared_ptr sptr; - muxed_zero_copy_if_impl( - zero_copy_if::sptr base_xport, + muxed_zero_copy_if_impl(zero_copy_if::sptr base_xport, stream_classifier_fn classify_fn, - size_t max_streams - ): - _base_xport(base_xport), _classify(classify_fn), - _max_num_streams(max_streams), _num_dropped_frames(0) + size_t max_streams) + : _base_xport(base_xport) + , _classify(classify_fn) + , _max_num_streams(max_streams) + , _num_dropped_frames(0) { - //Create the receive thread to poll the underlying transport - //and classify packets into queues - _recv_thread = boost::thread( - boost::bind(&muxed_zero_copy_if_impl::_update_queues, this)); + // Create the receive thread to poll the underlying transport + // and classify packets into queues + _recv_thread = + boost::thread(boost::bind(&muxed_zero_copy_if_impl::_update_queues, this)); } virtual ~muxed_zero_copy_if_impl() { UHD_SAFE_CALL( - //Interrupt buffer updater loop + // Interrupt buffer updater loop _recv_thread.interrupt(); - //Wait for loop to finish - //No timeout on join. The recv loop is guaranteed - //to terminate in a reasonable amount of time because - //there are no timed blocks on the underlying. + // Wait for loop to finish + // No timeout on join. The recv loop is guaranteed + // to terminate in a reasonable amount of time because + // there are no timed blocks on the underlying. _recv_thread.join(); - //Flush base transport + // Flush base transport while (_base_xport->get_recv_buff(0.0001)) /*NOP*/; - //Release child streams - //Note that this will not delete or flush the child streams - //until the owners of the streams have released the respective - //shared pointers. This ensures that packets are not dropped. - _streams.clear(); - ); + // Release child streams + // Note that this will not delete or flush the child streams + // until the owners of the streams have released the respective + // shared pointers. This ensures that packets are not dropped. + _streams.clear();); } virtual zero_copy_if::sptr make_stream(const uint32_t stream_num) { boost::lock_guard lock(_mutex); if (_streams.size() >= _max_num_streams) { - throw uhd::runtime_error("muxed_zero_copy_if: stream capacity exceeded. cannot create more streams."); + throw uhd::runtime_error("muxed_zero_copy_if: stream capacity exceeded. " + "cannot create more streams."); } // Only allocate a portion of the base transport's frames to each stream // to prevent all streams from attempting to use all the frames. - stream_impl::sptr stream = boost::make_shared( - this->shared_from_this(), stream_num, - _base_xport->get_num_send_frames() / _max_num_streams, - _base_xport->get_num_recv_frames() / _max_num_streams); + stream_impl::sptr stream = + boost::make_shared(this->shared_from_this(), + stream_num, + _base_xport->get_num_send_frames() / _max_num_streams, + _base_xport->get_num_recv_frames() / _max_num_streams); _streams[stream_num] = stream; return stream; } @@ -97,20 +99,21 @@ private: public: stream_mrb(size_t size) : _buff(new char[size]) {} - ~stream_mrb() { + ~stream_mrb() + { delete[] _buff; } void release() {} - UHD_INLINE sptr get_new(char *buff, size_t len) + UHD_INLINE sptr get_new(char* buff, size_t len) { memcpy(_buff, buff, len); return make(this, _buff, len); } private: - char *_buff; + char* _buff; }; class stream_impl : public zero_copy_if @@ -119,20 +122,19 @@ private: typedef boost::shared_ptr sptr; typedef boost::weak_ptr wptr; - stream_impl( - muxed_zero_copy_if_impl::sptr muxed_xport, + stream_impl(muxed_zero_copy_if_impl::sptr muxed_xport, const uint32_t stream_num, const size_t num_send_frames, - const size_t num_recv_frames - ) : - _stream_num(stream_num), _muxed_xport(muxed_xport), - _num_send_frames(num_send_frames), - _send_frame_size(_muxed_xport->base_xport()->get_send_frame_size()), - _num_recv_frames(num_recv_frames), - _recv_frame_size(_muxed_xport->base_xport()->get_recv_frame_size()), - _buff_queue(num_recv_frames), - _buffers(num_recv_frames), - _buffer_index(0) + const size_t num_recv_frames) + : _stream_num(stream_num) + , _muxed_xport(muxed_xport) + , _num_send_frames(num_send_frames) + , _send_frame_size(_muxed_xport->base_xport()->get_send_frame_size()) + , _num_recv_frames(num_recv_frames) + , _recv_frame_size(_muxed_xport->base_xport()->get_recv_frame_size()) + , _buff_queue(num_recv_frames) + , _buffers(num_recv_frames) + , _buffer_index(0) { for (size_t i = 0; i < num_recv_frames; i++) { _buffers[i] = boost::make_shared(_recv_frame_size); @@ -141,25 +143,28 @@ private: ~stream_impl(void) { - //First remove the stream from muxed transport - //so no more frames are pushed in + // First remove the stream from muxed transport + // so no more frames are pushed in _muxed_xport->remove_stream(_stream_num); - //Flush the transport + // Flush the transport managed_recv_buffer::sptr buff; while (_buff_queue.pop_with_haste(buff)) { - //NOP + // NOP } } - size_t get_num_recv_frames(void) const { + size_t get_num_recv_frames(void) const + { return _num_recv_frames; } - size_t get_recv_frame_size(void) const { + size_t get_recv_frame_size(void) const + { return _recv_frame_size; } - managed_recv_buffer::sptr get_recv_buff(double timeout) { + managed_recv_buffer::sptr get_recv_buff(double timeout) + { managed_recv_buffer::sptr buff; if (_buff_queue.pop_with_timed_wait(buff, timeout)) { return buff; @@ -168,16 +173,20 @@ private: } } - void push_recv_buff(managed_recv_buffer::sptr buff) { - _buff_queue.push_with_wait(_buffers.at(_buffer_index++)->get_new(buff->cast(), buff->size())); + void push_recv_buff(managed_recv_buffer::sptr buff) + { + _buff_queue.push_with_wait( + _buffers.at(_buffer_index++)->get_new(buff->cast(), buff->size())); _buffer_index %= _buffers.size(); } - size_t get_num_send_frames(void) const { + size_t get_num_send_frames(void) const + { return _num_send_frames; } - size_t get_send_frame_size(void) const { + size_t get_send_frame_size(void) const + { return _send_frame_size; } @@ -187,51 +196,55 @@ private: } private: - const uint32_t _stream_num; - muxed_zero_copy_if_impl::sptr _muxed_xport; - const size_t _num_send_frames; - const size_t _send_frame_size; - const size_t _num_recv_frames; - const size_t _recv_frame_size; - bounded_buffer _buff_queue; - std::vector< boost::shared_ptr > _buffers; - size_t _buffer_index; + const uint32_t _stream_num; + muxed_zero_copy_if_impl::sptr _muxed_xport; + const size_t _num_send_frames; + const size_t _send_frame_size; + const size_t _num_recv_frames; + const size_t _recv_frame_size; + bounded_buffer _buff_queue; + std::vector> _buffers; + size_t _buffer_index; }; - inline zero_copy_if::sptr& base_xport() { return _base_xport; } + inline zero_copy_if::sptr& base_xport() + { + return _base_xport; + } void _update_queues() { - //Run forever: + // Run forever: // - Pull packets from the base transport // - Classify them // - Push them to the appropriate receive queue while (true) { - { //Uninterruptable block of code + { // Uninterruptable block of code boost::this_thread::disable_interruption interrupt_disabler; if (not _process_next_buffer()) { - //Be a good citizen and yield if no packet is processed + // Be a good citizen and yield if no packet is processed static const size_t MIN_DUR = 1; std::this_thread::sleep_for(std::chrono::nanoseconds(MIN_DUR)); - //We call sleep(MIN_DUR) above instead of yield() to ensure that we - //relinquish the current scheduler time slot. - //yield() is a hint to the scheduler to end the time - //slice early and schedule in another thread that is ready to run. - //However in most situations, there will be no other thread and - //this thread will continue to run which will rail a CPU core. - //We call sleep(MIN_DUR=1) instead which will sleep for a minimum time. - //Ideally we would like to use boost::chrono::.*seconds::min() but that - //is bound to 0, which causes the sleep_for call to be a no-op and - //thus useless to actually force a sleep. + // We call sleep(MIN_DUR) above instead of yield() to ensure that we + // relinquish the current scheduler time slot. + // yield() is a hint to the scheduler to end the time + // slice early and schedule in another thread that is ready to run. + // However in most situations, there will be no other thread and + // this thread will continue to run which will rail a CPU core. + // We call sleep(MIN_DUR=1) instead which will sleep for a minimum + // time. Ideally we would like to use boost::chrono::.*seconds::min() + // but that is bound to 0, which causes the sleep_for call to be a + // no-op and thus useless to actually force a sleep. //**************************************************************** - //NOTE: This behavior makes this transport a poor choice for + // NOTE: This behavior makes this transport a poor choice for // low latency communication. //**************************************************************** } } - //Check if the master thread has requested a shutdown - if (boost::this_thread::interruption_requested()) break; + // Check if the master thread has requested a shutdown + if (boost::this_thread::interruption_requested()) + break; } } @@ -241,10 +254,11 @@ private: if (buff) { stream_impl::sptr stream; try { - const uint32_t stream_num = _classify(buff->cast(), _base_xport->get_recv_frame_size()); + const uint32_t stream_num = + _classify(buff->cast(), _base_xport->get_recv_frame_size()); { - //Hold the stream mutex long enough to pull a bounded buffer - //and lock it (increment its ref count). + // Hold the stream mutex long enough to pull a bounded buffer + // and lock it (increment its ref count). boost::lock_guard lock(_mutex); stream_map_t::iterator str_iter = _streams.find(stream_num); if (str_iter != _streams.end()) { @@ -252,41 +266,41 @@ private: } } } catch (std::exception&) { - //If _classify throws we simply drop the frame + // If _classify throws we simply drop the frame } - //Once a bounded buffer is acquired, we can rely on its - //thread safety to serialize with the consumer. + // Once a bounded buffer is acquired, we can rely on its + // thread safety to serialize with the consumer. if (stream.get()) { stream->push_recv_buff(buff); } else { boost::lock_guard lock(_mutex); _num_dropped_frames++; } - //We processed a packet, and there could be more coming - //Don't yield in the next iteration. + // We processed a packet, and there could be more coming + // Don't yield in the next iteration. return true; } else { - //The base transport is idle. Return false to let the - //thread yield. + // The base transport is idle. Return false to let the + // thread yield. return false; } } typedef std::map stream_map_t; - zero_copy_if::sptr _base_xport; - stream_classifier_fn _classify; - stream_map_t _streams; - const size_t _max_num_streams; - size_t _num_dropped_frames; - boost::thread _recv_thread; - boost::mutex _mutex; + zero_copy_if::sptr _base_xport; + stream_classifier_fn _classify; + stream_map_t _streams; + const size_t _max_num_streams; + size_t _num_dropped_frames; + boost::thread _recv_thread; + boost::mutex _mutex; }; -muxed_zero_copy_if::sptr muxed_zero_copy_if::make( - zero_copy_if::sptr base_xport, +muxed_zero_copy_if::sptr muxed_zero_copy_if::make(zero_copy_if::sptr base_xport, muxed_zero_copy_if::stream_classifier_fn classify_fn, - size_t max_streams -) { - return boost::make_shared(base_xport, classify_fn, max_streams); + size_t max_streams) +{ + return boost::make_shared( + base_xport, classify_fn, max_streams); } -- cgit v1.2.3