summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJosh Blum <josh@joshknows.com>2011-07-01 16:54:53 -0700
committerJosh Blum <josh@joshknows.com>2011-07-01 16:54:53 -0700
commit614d4901bb9dbb6866a0c5f57b1397e4b185a11f (patch)
treed2c0308af2464ae3b15e2079a25d8f24d46f0caa
parent6aa4690af05559d28b5238fa153fd46ff57cf06f (diff)
downloaduhd-614d4901bb9dbb6866a0c5f57b1397e4b185a11f.tar.gz
uhd-614d4901bb9dbb6866a0c5f57b1397e4b185a11f.tar.bz2
uhd-614d4901bb9dbb6866a0c5f57b1397e4b185a11f.zip
usrp: created common code to demux an rx stream (b100, e100)
-rw-r--r--host/examples/rx_multi_samples.cpp2
-rw-r--r--host/lib/usrp/b100/io_impl.cpp52
-rw-r--r--host/lib/usrp/common/CMakeLists.txt9
-rw-r--r--host/lib/usrp/common/recv_packet_demuxer.cpp87
-rw-r--r--host/lib/usrp/common/recv_packet_demuxer.hpp41
-rw-r--r--host/lib/usrp/common/validate_subdev_spec.hpp1
-rw-r--r--host/lib/usrp/e100/io_impl.cpp56
7 files changed, 152 insertions, 96 deletions
diff --git a/host/examples/rx_multi_samples.cpp b/host/examples/rx_multi_samples.cpp
index e820343ca..08b2d399d 100644
--- a/host/examples/rx_multi_samples.cpp
+++ b/host/examples/rx_multi_samples.cpp
@@ -44,7 +44,7 @@ int UHD_SAFE_MAIN(int argc, char *argv[]){
("nsamps", po::value<size_t>(&total_num_samps)->default_value(10000), "total number of samples to receive")
("rate", po::value<double>(&rate)->default_value(100e6/16), "rate of incoming samples")
("sync", po::value<std::string>(&sync)->default_value("now"), "synchronization method: now, pps")
- ("subdev", po::value<std::string>(&subdev)->default_value(""), "subdev spec (homogeneous across motherboards)")
+ ("subdev", po::value<std::string>(&subdev), "subdev spec (homogeneous across motherboards)")
("dilv", "specify to disable inner-loop verbose")
;
po::variables_map vm;
diff --git a/host/lib/usrp/b100/io_impl.cpp b/host/lib/usrp/b100/io_impl.cpp
index 5b38a14ac..fbf341c7f 100644
--- a/host/lib/usrp/b100/io_impl.cpp
+++ b/host/lib/usrp/b100/io_impl.cpp
@@ -15,6 +15,7 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.
//
+#include "recv_packet_demuxer.hpp"
#include "validate_subdev_spec.hpp"
#include "../../transport/super_recv_packet_handler.hpp"
#include "../../transport/super_send_packet_handler.hpp"
@@ -25,7 +26,6 @@
#include <uhd/transport/bounded_buffer.hpp>
#include <boost/bind.hpp>
#include <boost/format.hpp>
-#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/thread.hpp>
#include <uhd/utils/msg.hpp>
@@ -34,55 +34,18 @@
using namespace uhd;
using namespace uhd::usrp;
using namespace uhd::transport;
-namespace asio = boost::asio;
/***********************************************************************
* IO Implementation Details
**********************************************************************/
struct b100_impl::io_impl{
- io_impl(zero_copy_if::sptr data_transport, const size_t recv_width):
- data_transport(data_transport), async_msg_fifo(100/*messages deep*/)
- {
- for (size_t i = 0; i < recv_width; i++){
- typedef bounded_buffer<managed_recv_buffer::sptr> buffs_queue_type;
- _buffs_queue.push_back(new buffs_queue_type(data_transport->get_num_recv_frames()));
- }
- }
-
- ~io_impl(void){
- for (size_t i = 0; i < _buffs_queue.size(); i++){
- delete _buffs_queue[i];
- }
- }
+ io_impl(void):
+ async_msg_fifo(100/*messages deep*/)
+ { /* NOP */ }
zero_copy_if::sptr data_transport;
bounded_buffer<async_metadata_t> async_msg_fifo;
- std::vector<bounded_buffer<managed_recv_buffer::sptr> *> _buffs_queue;
-
- //gets buffer, determines if its the requested index,
- //and either queues the buffer or returns the buffer
- managed_recv_buffer::sptr get_recv_buff(const size_t index, const double timeout){
- while (true){
- managed_recv_buffer::sptr buff;
-
- //attempt to pop a buffer from the queue
- if (_buffs_queue[index]->pop_with_haste(buff)) return buff;
-
- //otherwise, call into the transport
- buff = data_transport->get_recv_buff(timeout);
- if (buff.get() == NULL) return buff; //timeout
-
- //check the stream id to know which channel
- const boost::uint32_t *vrt_hdr = buff->cast<const boost::uint32_t *>();
- const size_t rx_index = uhd::wtohx(vrt_hdr[1]) - B100_RX_SID_BASE;
- if (rx_index == index) return buff; //got expected message
-
- //otherwise queue and try again
- if (rx_index < _buffs_queue.size()) _buffs_queue[rx_index]->push_with_pop_on_full(buff);
- else UHD_MSG(error) << "Got a data packet with known SID " << uhd::wtohx(vrt_hdr[1]) << std::endl;
- }
- }
-
+ recv_packet_demuxer::sptr demuxer;
sph::recv_packet_handler recv_handler;
sph::send_packet_handler send_handler;
};
@@ -109,7 +72,8 @@ void b100_impl::io_init(void){
_fpga_ctrl->poke32(B100_REG_MISC_RX_LEN, 4);
//create new io impl
- _io_impl = UHD_PIMPL_MAKE(io_impl, (_data_transport, _rx_dsps.size()));
+ _io_impl = UHD_PIMPL_MAKE(io_impl, ());
+ _io_impl->demuxer = recv_packet_demuxer::make(_data_transport, _rx_dsps.size(), B100_RX_SID_BASE);
//now its safe to register the async callback
_fpga_ctrl->set_async_cb(boost::bind(&b100_impl::handle_async_message, this, _1));
@@ -193,7 +157,7 @@ void b100_impl::update_rx_subdev_spec(const uhd::usrp::subdev_spec_t &spec){
for (size_t i = 0; i < _io_impl->recv_handler.size(); i++){
_rx_dsps[i]->set_nsamps_per_packet(get_max_recv_samps_per_packet()); //seems to be a good place to set this
_io_impl->recv_handler.set_xport_chan_get_buff(i, boost::bind(
- &b100_impl::io_impl::get_recv_buff, _io_impl.get(), i, _1
+ &recv_packet_demuxer::get_recv_buff, _io_impl->demuxer, i, _1
));
_io_impl->recv_handler.set_overflow_handler(i, boost::bind(&rx_dsp_core_200::handle_overflow, _rx_dsps[i]));
}
diff --git a/host/lib/usrp/common/CMakeLists.txt b/host/lib/usrp/common/CMakeLists.txt
index 31004d952..ec8b60fec 100644
--- a/host/lib/usrp/common/CMakeLists.txt
+++ b/host/lib/usrp/common/CMakeLists.txt
@@ -20,10 +20,15 @@
########################################################################
IF(ENABLE_USB)
INCLUDE_DIRECTORIES(${CMAKE_SOURCE_DIR}/../firmware/fx2/common)
- INCLUDE_DIRECTORIES(${CMAKE_CURRENT_SOURCE_DIR})
LIBUHD_APPEND_SOURCES(
${CMAKE_CURRENT_SOURCE_DIR}/fx2_ctrl.cpp
- ${CMAKE_CURRENT_SOURCE_DIR}/validate_subdev_spec.cpp
)
ENDIF(ENABLE_USB)
+
+INCLUDE_DIRECTORIES(${CMAKE_CURRENT_SOURCE_DIR})
+
+LIBUHD_APPEND_SOURCES(
+ ${CMAKE_CURRENT_SOURCE_DIR}/validate_subdev_spec.cpp
+ ${CMAKE_CURRENT_SOURCE_DIR}/recv_packet_demuxer.cpp
+)
diff --git a/host/lib/usrp/common/recv_packet_demuxer.cpp b/host/lib/usrp/common/recv_packet_demuxer.cpp
new file mode 100644
index 000000000..93f423aeb
--- /dev/null
+++ b/host/lib/usrp/common/recv_packet_demuxer.cpp
@@ -0,0 +1,87 @@
+//
+// Copyright 2011 Ettus Research LLC
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+//
+
+#include "recv_packet_demuxer.hpp"
+#include <uhd/utils/msg.hpp>
+#include <uhd/utils/byteswap.hpp>
+#include <boost/thread/mutex.hpp>
+#include <queue>
+#include <deque>
+#include <vector>
+
+using namespace uhd;
+using namespace uhd::usrp;
+using namespace uhd::transport;
+
+static UHD_INLINE boost::uint32_t extract_sid(managed_recv_buffer::sptr &buff){
+ //ASSUME that the data is in little endian format
+ return uhd::wtohx(buff->cast<const boost::uint32_t *>()[1]);
+}
+
+class recv_packet_demuxer_impl : public uhd::usrp::recv_packet_demuxer{
+public:
+ recv_packet_demuxer_impl(
+ transport::zero_copy_if::sptr transport,
+ const size_t size,
+ const boost::uint32_t sid_base
+ ):
+ _transport(transport), _sid_base(sid_base), _queues(size)
+ {
+ /* NOP */
+ }
+
+ managed_recv_buffer::sptr get_recv_buff(const size_t index, const double timeout){
+ boost::mutex::scoped_lock lock(_mutex);
+ managed_recv_buffer::sptr buff;
+
+ //there is already an entry in the queue, so pop that
+ if (not _queues[index].wrapper.empty()){
+ std::swap(buff, _queues[index].wrapper.front());
+ _queues[index].wrapper.pop();
+ return buff;
+ }
+
+ while (true){
+ //otherwise call into the transport
+ buff = _transport->get_recv_buff(timeout);
+ if (buff.get() == NULL) return buff; //timeout
+
+ //check the stream id to know which channel
+ const size_t rx_index = extract_sid(buff) - _sid_base;
+ if (rx_index == index) return buff; //got expected message
+
+ //otherwise queue and try again
+ if (rx_index < _queues.size()) _queues[rx_index].wrapper.push(buff);
+ else UHD_MSG(error) << "Got a data packet with known SID " << extract_sid(buff) << std::endl;
+ }
+ }
+
+private:
+ transport::zero_copy_if::sptr _transport;
+ const boost::uint32_t _sid_base;
+ boost::mutex _mutex;
+ struct channel_guts_type{
+ channel_guts_type(void): wrapper(container){}
+ std::deque<managed_recv_buffer::sptr> container;
+ std::queue<managed_recv_buffer::sptr> wrapper;
+ };
+ std::vector<channel_guts_type> _queues;
+};
+
+recv_packet_demuxer::sptr recv_packet_demuxer::make(transport::zero_copy_if::sptr transport, const size_t size, const boost::uint32_t sid_base){
+ return sptr(new recv_packet_demuxer_impl(transport, size, sid_base));
+}
diff --git a/host/lib/usrp/common/recv_packet_demuxer.hpp b/host/lib/usrp/common/recv_packet_demuxer.hpp
new file mode 100644
index 000000000..fde756d27
--- /dev/null
+++ b/host/lib/usrp/common/recv_packet_demuxer.hpp
@@ -0,0 +1,41 @@
+//
+// Copyright 2011 Ettus Research LLC
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+//
+
+#ifndef INCLUDED_LIBUHD_USRP_COMMON_RECV_PACKET_DEMUXER_HPP
+#define INCLUDED_LIBUHD_USRP_COMMON_RECV_PACKET_DEMUXER_HPP
+
+#include <uhd/config.hpp>
+#include <uhd/transport/zero_copy.hpp>
+#include <boost/shared_ptr.hpp>
+#include <boost/cstdint.hpp>
+
+namespace uhd{ namespace usrp{
+
+ class recv_packet_demuxer{
+ public:
+ typedef boost::shared_ptr<recv_packet_demuxer> sptr;
+
+ //! Make a new demuxer from a transport and parameters
+ static sptr make(transport::zero_copy_if::sptr transport, const size_t size, const boost::uint32_t sid_base);
+
+ //! Get a buffer at the given index from the transport
+ virtual transport::managed_recv_buffer::sptr get_recv_buff(const size_t index, const double timeout) = 0;
+ };
+
+}} //namespace uhd::usrp
+
+#endif /* INCLUDED_LIBUHD_USRP_COMMON_RECV_PACKET_DEMUXER_HPP */
diff --git a/host/lib/usrp/common/validate_subdev_spec.hpp b/host/lib/usrp/common/validate_subdev_spec.hpp
index e73b3e1ee..7d9e2c309 100644
--- a/host/lib/usrp/common/validate_subdev_spec.hpp
+++ b/host/lib/usrp/common/validate_subdev_spec.hpp
@@ -20,7 +20,6 @@
#include <uhd/config.hpp>
#include <uhd/usrp/subdev_spec.hpp>
-#include <uhd/utils/assert_has.hpp>
#include <uhd/property_tree.hpp>
#include <string>
diff --git a/host/lib/usrp/e100/io_impl.cpp b/host/lib/usrp/e100/io_impl.cpp
index 7af2515a9..14b348f96 100644
--- a/host/lib/usrp/e100/io_impl.cpp
+++ b/host/lib/usrp/e100/io_impl.cpp
@@ -15,6 +15,7 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.
//
+#include "recv_packet_demuxer.hpp"
#include "validate_subdev_spec.hpp"
#include "../../transport/super_recv_packet_handler.hpp"
#include "../../transport/super_send_packet_handler.hpp"
@@ -46,59 +47,17 @@ using namespace uhd::transport;
* - vrt packet handler states
**********************************************************************/
struct e100_impl::io_impl{
- io_impl(zero_copy_if::sptr data_transport, const size_t recv_width):
- false_alarm(0),
- data_transport(data_transport),
- async_msg_fifo(100/*messages deep*/)
- {
- for (size_t i = 0; i < recv_width; i++){
- typedef bounded_buffer<managed_recv_buffer::sptr> buffs_queue_type;
- _buffs_queue.push_back(new buffs_queue_type(data_transport->get_num_recv_frames()));
- }
- }
-
- ~io_impl(void){
- recv_pirate_crew.interrupt_all();
- recv_pirate_crew.join_all();
- for (size_t i = 0; i < _buffs_queue.size(); i++){
- delete _buffs_queue[i];
- }
- }
+ io_impl(void):
+ false_alarm(0), async_msg_fifo(100/*messages deep*/)
+ { /* NOP */ }
double tick_rate; //set by update tick rate method
e100_ctrl::sptr iface; //so handle irq can peek and poke
void handle_irq(void);
size_t false_alarm;
-
- std::vector<bounded_buffer<managed_recv_buffer::sptr> *> _buffs_queue;
-
- //gets buffer, determines if its the requested index,
- //and either queues the buffer or returns the buffer
- managed_recv_buffer::sptr get_recv_buff(const size_t index, const double timeout){
- while (true){
- managed_recv_buffer::sptr buff;
-
- //attempt to pop a buffer from the queue
- if (_buffs_queue[index]->pop_with_haste(buff)) return buff;
-
- //otherwise, call into the transport
- buff = data_transport->get_recv_buff(timeout);
- if (buff.get() == NULL) return buff; //timeout
-
- //check the stream id to know which channel
- const boost::uint32_t *vrt_hdr = buff->cast<const boost::uint32_t *>();
- const size_t rx_index = uhd::wtohx(vrt_hdr[1]) - E100_RX_SID_BASE;
- if (rx_index == index) return buff; //got expected message
-
- //otherwise queue and try again
- if (rx_index < _buffs_queue.size()) _buffs_queue[rx_index]->push_with_pop_on_full(buff);
- else UHD_MSG(error) << "Got a data packet with known SID " << uhd::wtohx(vrt_hdr[1]) << std::endl;
- }
- }
-
//The data transport is listed first so that it is deconstructed last,
//which is after the states and booty which may hold managed buffers.
- zero_copy_if::sptr data_transport;
+ recv_packet_demuxer::sptr demuxer;
//state management for the vrt packet handler code
sph::recv_packet_handler recv_handler;
@@ -213,7 +172,8 @@ void e100_impl::io_init(void){
_tx_otw_type.byteorder = uhd::otw_type_t::BO_LITTLE_ENDIAN;
//create new io impl
- _io_impl = UHD_PIMPL_MAKE(io_impl, (_data_transport, _rx_dsps.size()));
+ _io_impl = UHD_PIMPL_MAKE(io_impl, ());
+ _io_impl->demuxer = recv_packet_demuxer::make(_data_transport, _rx_dsps.size(), E100_RX_SID_BASE);
_io_impl->iface = _fpga_ctrl;
//clear state machines
@@ -280,7 +240,7 @@ void e100_impl::update_rx_subdev_spec(const uhd::usrp::subdev_spec_t &spec){
for (size_t i = 0; i < _io_impl->recv_handler.size(); i++){
_rx_dsps[i]->set_nsamps_per_packet(get_max_recv_samps_per_packet()); //seems to be a good place to set this
_io_impl->recv_handler.set_xport_chan_get_buff(i, boost::bind(
- &e100_impl::io_impl::get_recv_buff, _io_impl.get(), i, _1
+ &recv_packet_demuxer::get_recv_buff, _io_impl->demuxer, i, _1
));
_io_impl->recv_handler.set_overflow_handler(i, boost::bind(&rx_dsp_core_200::handle_overflow, _rx_dsps[i]));
}