summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--firmware/microblaze/apps/txrx_uhd.c24
-rw-r--r--firmware/microblaze/usrp2/memory_map.h12
-rw-r--r--firmware/microblaze/usrp2p/memory_map.h12
-rw-r--r--host/docs/transport.rst2
-rw-r--r--host/include/uhd/transport/CMakeLists.txt2
-rw-r--r--host/include/uhd/transport/alignment_buffer.hpp69
-rw-r--r--host/include/uhd/transport/alignment_buffer.ipp144
-rw-r--r--host/include/uhd/transport/bounded_buffer.ipp13
-rw-r--r--host/lib/transport/udp_zero_copy_asio.cpp7
-rw-r--r--host/lib/transport/vrt_packet_handler.hpp1
-rw-r--r--host/lib/usrp/usrp2/fw_common.h4
-rw-r--r--host/lib/usrp/usrp2/io_impl.cpp203
-rw-r--r--host/lib/usrp/usrp2/mboard_impl.cpp7
-rw-r--r--host/lib/usrp/usrp2/usrp2_impl.cpp28
-rw-r--r--host/lib/usrp/usrp2/usrp2_impl.hpp6
-rw-r--r--host/test/buffer_test.cpp51
16 files changed, 247 insertions, 338 deletions
diff --git a/firmware/microblaze/apps/txrx_uhd.c b/firmware/microblaze/apps/txrx_uhd.c
index 517b50e91..7dc44f09f 100644
--- a/firmware/microblaze/apps/txrx_uhd.c
+++ b/firmware/microblaze/apps/txrx_uhd.c
@@ -55,15 +55,26 @@ static void setup_network(void);
static eth_mac_addr_t fp_mac_addr_src, fp_mac_addr_dst;
extern struct socket_address fp_socket_src, fp_socket_dst;
-void handle_udp_data_packet(
+static void handle_udp_err0_packet(
+ struct socket_address src, struct socket_address dst,
+ unsigned char *payload, int payload_len
+){
+ sr_udp_sm->err0_port = (((uint32_t)dst.port) << 16) | src.port;
+ printf("Storing for async error path:\n");
+ printf(" source udp port: %d\n", dst.port);
+ printf(" destination udp port: %d\n", src.port);
+ newline();
+}
+
+static void handle_udp_data_packet(
struct socket_address src, struct socket_address dst,
unsigned char *payload, int payload_len
){
- //its a tiny payload, load the fast-path variables
fp_mac_addr_src = *ethernet_mac_addr();
arp_cache_lookup_mac(&src.addr, &fp_mac_addr_dst);
fp_socket_src = dst;
fp_socket_dst = src;
+ sr_udp_sm->dsp0_port = (((uint32_t)dst.port) << 16) | src.port;
printf("Storing for fast path:\n");
printf(" source mac addr: ");
print_mac_addr(fp_mac_addr_src.addr); newline();
@@ -77,7 +88,7 @@ void handle_udp_data_packet(
printf(" destination udp port: %d\n", fp_socket_dst.port);
newline();
- //setup network and vrt
+ //setup network
setup_network();
}
@@ -85,7 +96,7 @@ void handle_udp_data_packet(
#define OTW_GPIO_BANK_TO_NUM(bank) \
(((bank) == USRP2_DIR_RX)? (GPIO_RX_BANK) : (GPIO_TX_BANK))
-void handle_udp_ctrl_packet(
+static void handle_udp_ctrl_packet(
struct socket_address src, struct socket_address dst,
unsigned char *payload, int payload_len
){
@@ -319,8 +330,8 @@ static void setup_network(void){
sr_udp_sm->ip_hdr.checksum = UDP_SM_INS_IP_HDR_CHKSUM | (chksum & 0xffff);
//setup the udp header machine
- sr_udp_sm->udp_hdr.src_port = fp_socket_src.port;
- sr_udp_sm->udp_hdr.dst_port = fp_socket_dst.port;
+ sr_udp_sm->udp_hdr.src_port = UDP_SM_INS_UDP_SRC_PORT;
+ sr_udp_sm->udp_hdr.dst_port = UDP_SM_INS_UDP_DST_PORT;
sr_udp_sm->udp_hdr.length = UDP_SM_INS_UDP_LEN;
sr_udp_sm->udp_hdr.checksum = UDP_SM_LAST_WORD; // zero UDP checksum
}
@@ -354,6 +365,7 @@ main(void)
//2) register callbacks for udp ports we service
register_udp_listener(USRP2_UDP_CTRL_PORT, handle_udp_ctrl_packet);
register_udp_listener(USRP2_UDP_DATA_PORT, handle_udp_data_packet);
+ register_udp_listener(USRP2_UDP_ERR0_PORT, handle_udp_err0_packet);
register_udp_listener(USRP2_UDP_UPDATE_PORT, handle_udp_fw_update_packet);
//3) set the routing mode to slave and send a garp
diff --git a/firmware/microblaze/usrp2/memory_map.h b/firmware/microblaze/usrp2/memory_map.h
index 23d96389f..a2de29cdb 100644
--- a/firmware/microblaze/usrp2/memory_map.h
+++ b/firmware/microblaze/usrp2/memory_map.h
@@ -330,11 +330,21 @@ typedef struct {
uint32_t length;
uint32_t checksum; //word 22
} udp_hdr;
- volatile uint32_t _pad[32-23];
+ volatile uint32_t _pad[1];
+ volatile uint32_t dsp0_port;
+ volatile uint32_t err0_port;
+ volatile uint32_t dsp1_port;
+ volatile uint32_t err1_port;
} sr_udp_sm_t;
// control bits (all expect UDP_SM_LAST_WORD are mutually exclusive)
+// Insert a UDP source port from the table
+#define UDP_SM_INS_UDP_SRC_PORT (1 << 21)
+
+// Insert a UDP dest port from the table
+#define UDP_SM_INS_UDP_DST_PORT (1 << 20)
+
// This is the last word of the header
#define UDP_SM_LAST_WORD (1 << 19)
diff --git a/firmware/microblaze/usrp2p/memory_map.h b/firmware/microblaze/usrp2p/memory_map.h
index 2b5ae57be..6f5c577e6 100644
--- a/firmware/microblaze/usrp2p/memory_map.h
+++ b/firmware/microblaze/usrp2p/memory_map.h
@@ -323,11 +323,21 @@ typedef struct {
uint32_t length;
uint32_t checksum; //word 22
} udp_hdr;
- volatile uint32_t _pad[32-23];
+ volatile uint32_t _pad[1];
+ volatile uint32_t dsp0_port;
+ volatile uint32_t err0_port;
+ volatile uint32_t dsp1_port;
+ volatile uint32_t err1_port;
} sr_udp_sm_t;
// control bits (all expect UDP_SM_LAST_WORD are mutually exclusive)
+// Insert a UDP source port from the table
+#define UDP_SM_INS_UDP_SRC_PORT (1 << 21)
+
+// Insert a UDP dest port from the table
+#define UDP_SM_INS_UDP_DST_PORT (1 << 20)
+
// This is the last word of the header
#define UDP_SM_LAST_WORD (1 << 19)
diff --git a/host/docs/transport.rst b/host/docs/transport.rst
index d9abd4923..018f909c1 100644
--- a/host/docs/transport.rst
+++ b/host/docs/transport.rst
@@ -36,7 +36,7 @@ The following parameters can be used to alter the transport's default behavior:
* **num_send_frames:** The number of send buffers to allocate
* **concurrency_hint:** The number of threads to run the IO service
-**Note:** num_send_frames and concurrency_hint will not have an effect
+**Note:** num_send_frames will not have an effect
as the asynchronous send implementation is currently disabled.
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
diff --git a/host/include/uhd/transport/CMakeLists.txt b/host/include/uhd/transport/CMakeLists.txt
index 2c84c0724..ec3b7b113 100644
--- a/host/include/uhd/transport/CMakeLists.txt
+++ b/host/include/uhd/transport/CMakeLists.txt
@@ -17,8 +17,6 @@
INSTALL(FILES
- alignment_buffer.hpp
- alignment_buffer.ipp
bounded_buffer.hpp
bounded_buffer.ipp
convert_types.hpp
diff --git a/host/include/uhd/transport/alignment_buffer.hpp b/host/include/uhd/transport/alignment_buffer.hpp
deleted file mode 100644
index f44a037f8..000000000
--- a/host/include/uhd/transport/alignment_buffer.hpp
+++ /dev/null
@@ -1,69 +0,0 @@
-//
-// Copyright 2010 Ettus Research LLC
-//
-// This program is free software: you can redistribute it and/or modify
-// it under the terms of the GNU General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// This program is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU General Public License for more details.
-//
-// You should have received a copy of the GNU General Public License
-// along with this program. If not, see <http://www.gnu.org/licenses/>.
-//
-
-#ifndef INCLUDED_UHD_TRANSPORT_ALIGNMENT_BUFFER_HPP
-#define INCLUDED_UHD_TRANSPORT_ALIGNMENT_BUFFER_HPP
-
-#include <uhd/config.hpp>
-#include <uhd/transport/bounded_buffer.hpp> //time_duration_t
-#include <boost/shared_ptr.hpp>
-#include <vector>
-
-namespace uhd{ namespace transport{
-
- /*!
- * Implement a templated alignment buffer:
- * Used for aligning asynchronously pushed elements with matching ids.
- */
- template <typename elem_type, typename seq_type> class alignment_buffer{
- public:
- typedef boost::shared_ptr<alignment_buffer<elem_type, seq_type> > sptr;
-
- /*!
- * Make a new alignment buffer object.
- * \param capacity the maximum elements per index
- * \param width the number of elements to align
- */
- static sptr make(size_t capacity, size_t width);
-
- /*!
- * Push an element with sequence id into the buffer at index.
- * \param elem the element to push
- * \param seq the sequence identifier
- * \param index the buffer index
- * \return true if the element fit without popping for space
- */
- virtual bool push_with_pop_on_full(
- const elem_type &elem, const seq_type &seq, size_t index
- ) = 0;
-
- /*!
- * Pop an aligned set of elements from this alignment buffer.
- * \param elems a collection to store the aligned elements
- * \param timeout the timeout in seconds
- * \return false when the operation times out
- */
- virtual bool pop_elems_with_timed_wait(
- std::vector<elem_type> &elems, double timeout
- ) = 0;
- };
-
-}} //namespace
-
-#include <uhd/transport/alignment_buffer.ipp>
-
-#endif /* INCLUDED_UHD_TRANSPORT_ALIGNMENT_BUFFER_HPP */
diff --git a/host/include/uhd/transport/alignment_buffer.ipp b/host/include/uhd/transport/alignment_buffer.ipp
deleted file mode 100644
index 833b5d399..000000000
--- a/host/include/uhd/transport/alignment_buffer.ipp
+++ /dev/null
@@ -1,144 +0,0 @@
-//
-// Copyright 2010 Ettus Research LLC
-//
-// This program is free software: you can redistribute it and/or modify
-// it under the terms of the GNU General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// This program is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU General Public License for more details.
-//
-// You should have received a copy of the GNU General Public License
-// along with this program. If not, see <http://www.gnu.org/licenses/>.
-//
-
-#ifndef INCLUDED_UHD_TRANSPORT_ALIGNMENT_BUFFER_IPP
-#define INCLUDED_UHD_TRANSPORT_ALIGNMENT_BUFFER_IPP
-
-#include <uhd/transport/bounded_buffer.hpp>
-#include <boost/thread/condition_variable.hpp>
-#include <utility>
-
-namespace uhd{ namespace transport{ namespace{ /*anon*/
-
- /*!
- * Imlement a templated alignment buffer:
- * Used for aligning asynchronously pushed elements with matching ids.
- */
- template <typename elem_type, typename seq_type>
- class alignment_buffer_impl : public alignment_buffer<elem_type, seq_type>{
- public:
-
- alignment_buffer_impl(size_t capacity, size_t width) : _last_seqs(width){
- for (size_t i = 0; i < width; i++){
- _buffs.push_back(bounded_buffer<buff_contents_type>::make(capacity));
- _all_indexes.push_back(i);
- }
- _there_was_a_clear = false;
- }
-
- UHD_INLINE bool push_with_pop_on_full(
- const elem_type &elem, const seq_type &seq, size_t index
- ){
- //clear the buffer for this index if the seqs are mis-ordered
- if (seq < _last_seqs[index]){
- _buffs[index]->clear();
- _there_was_a_clear = true;
- } _last_seqs[index] = seq;
- return _buffs[index]->push_with_pop_on_full(buff_contents_type(elem, seq));
- }
-
- UHD_INLINE bool pop_elems_with_timed_wait(
- std::vector<elem_type> &elems, double timeout
- ){
- boost::system_time exit_time = boost::get_system_time() + to_time_dur(timeout);
- buff_contents_type buff_contents_tmp;
- std::list<size_t> indexes_to_do(_all_indexes);
-
- //do an initial pop to load an initial sequence id
- size_t index = indexes_to_do.front();
- if (not _buffs[index]->pop_with_timed_wait(
- buff_contents_tmp, from_time_dur(exit_time - boost::get_system_time())
- )) return false;
- elems[index] = buff_contents_tmp.first;
- seq_type expected_seq_id = buff_contents_tmp.second;
- indexes_to_do.pop_front();
-
- //get an aligned set of elements from the buffers:
- while(indexes_to_do.size() != 0){
-
- //respond to a clear by starting from scratch
- if(_there_was_a_clear){
- _there_was_a_clear = false;
- indexes_to_do = _all_indexes;
- index = indexes_to_do.front();
- if (not _buffs[index]->pop_with_timed_wait(
- buff_contents_tmp, from_time_dur(exit_time - boost::get_system_time())
- )) return false;
- elems[index] = buff_contents_tmp.first;
- expected_seq_id = buff_contents_tmp.second;
- indexes_to_do.pop_front();
- }
-
- //pop an element off for this index
- index = indexes_to_do.front();
- if (not _buffs[index]->pop_with_timed_wait(
- buff_contents_tmp, from_time_dur(exit_time - boost::get_system_time())
- )) return false;
-
- //if the sequence id matches:
- // store the popped element into the output,
- // remove this index from the list and continue
- if (buff_contents_tmp.second == expected_seq_id){
- elems[index] = buff_contents_tmp.first;
- indexes_to_do.pop_front();
- continue;
- }
-
- //if the sequence id is older:
- // continue with the same index to try again
- if (buff_contents_tmp.second < expected_seq_id){
- continue;
- }
-
- //if the sequence id is newer:
- // store the popped element into the output,
- // add all other indexes back into the list
- if (buff_contents_tmp.second > expected_seq_id){
- elems[index] = buff_contents_tmp.first;
- expected_seq_id = buff_contents_tmp.second;
- indexes_to_do = _all_indexes;
- indexes_to_do.remove(index);
- continue;
- }
- }
- return true;
- }
-
- private:
- //a vector of bounded buffers for each index
- typedef std::pair<elem_type, seq_type> buff_contents_type;
- std::vector<typename bounded_buffer<buff_contents_type>::sptr> _buffs;
- std::vector<seq_type> _last_seqs;
- std::list<size_t> _all_indexes;
- bool _there_was_a_clear;
- };
-
-}}} //namespace
-
-namespace uhd{ namespace transport{
-
- template <typename elem_type, typename seq_type>
- typename alignment_buffer<elem_type, seq_type>::sptr
- alignment_buffer<elem_type, seq_type>::make(size_t capacity, size_t width){
- return typename alignment_buffer<elem_type, seq_type>::sptr(
- new alignment_buffer_impl<elem_type, seq_type>(capacity, width)
- );
- }
-
-}} //namespace
-
-#endif /* INCLUDED_UHD_TRANSPORT_ALIGNMENT_BUFFER_IPP */
diff --git a/host/include/uhd/transport/bounded_buffer.ipp b/host/include/uhd/transport/bounded_buffer.ipp
index edc7faa06..f7915d866 100644
--- a/host/include/uhd/transport/bounded_buffer.ipp
+++ b/host/include/uhd/transport/bounded_buffer.ipp
@@ -26,14 +26,6 @@
namespace uhd{ namespace transport{ namespace{ /*anon*/
- static UHD_INLINE boost::posix_time::time_duration to_time_dur(double timeout){
- return boost::posix_time::microseconds(long(timeout*1e6));
- }
-
- static UHD_INLINE double from_time_dur(const boost::posix_time::time_duration &time_dur){
- return 1e-6*time_dur.total_microseconds();
- }
-
template <typename elem_type>
class bounded_buffer_impl : public bounded_buffer<elem_type>{
public:
@@ -127,6 +119,11 @@ namespace uhd{ namespace transport{ namespace{ /*anon*/
_buffer.pop_back();
return elem;
}
+
+ static UHD_INLINE boost::posix_time::time_duration to_time_dur(double timeout){
+ return boost::posix_time::microseconds(long(timeout*1e6));
+ }
+
};
}}} //namespace
diff --git a/host/lib/transport/udp_zero_copy_asio.cpp b/host/lib/transport/udp_zero_copy_asio.cpp
index c758fa894..bbd63836c 100644
--- a/host/lib/transport/udp_zero_copy_asio.cpp
+++ b/host/lib/transport/udp_zero_copy_asio.cpp
@@ -37,14 +37,15 @@ namespace asio = boost::asio;
**********************************************************************/
//Define this to the the boost async io calls to perform receive.
//Otherwise, get_recv_buff uses a blocking receive with timeout.
-//#define USE_ASIO_ASYNC_RECV
+#define USE_ASIO_ASYNC_RECV
//Define this to the the boost async io calls to perform send.
//Otherwise, the commit callback uses a blocking send.
//#define USE_ASIO_ASYNC_SEND
-//enough buffering for half a second of samples at full rate on usrp2
-static const size_t MIN_RECV_SOCK_BUFF_SIZE = size_t(4 * 25e6 * 0.5);
+//By default, this buffer is sized insufficiently small.
+//For peformance, this buffer should be 10s of megabytes.
+static const size_t MIN_RECV_SOCK_BUFF_SIZE = size_t(10e3);
//Large buffers cause more underflow at high rates.
//Perhaps this is due to the kernel scheduling,
diff --git a/host/lib/transport/vrt_packet_handler.hpp b/host/lib/transport/vrt_packet_handler.hpp
index 278bcfeaa..7f8d84308 100644
--- a/host/lib/transport/vrt_packet_handler.hpp
+++ b/host/lib/transport/vrt_packet_handler.hpp
@@ -91,6 +91,7 @@ template <typename T> UHD_INLINE T get_context_code(
//vrt unpack each managed buffer
uhd::transport::vrt::if_packet_info_t if_packet_info;
for (size_t i = 0; i < state.width; i++){
+ if (state.managed_buffs[i].get() == NULL) continue; //better have a message packet coming up...
//extract packet words and check thats its enough to move on
size_t num_packet_words32 = state.managed_buffs[i]->size()/sizeof(boost::uint32_t);
diff --git a/host/lib/usrp/usrp2/fw_common.h b/host/lib/usrp/usrp2/fw_common.h
index efbb4b954..ee7fc3882 100644
--- a/host/lib/usrp/usrp2/fw_common.h
+++ b/host/lib/usrp/usrp2/fw_common.h
@@ -42,7 +42,9 @@ extern "C" {
// udp ports for the usrp2 communication
// Dynamic and/or private ports: 49152-65535
#define USRP2_UDP_CTRL_PORT 49152
-#define USRP2_UDP_DATA_PORT 49153
+//#define USRP2_UDP_UPDATE_PORT 49154
+#define USRP2_UDP_DATA_PORT 49156
+#define USRP2_UDP_ERR0_PORT 49157
////////////////////////////////////////////////////////////////////////
// I2C addresses
diff --git a/host/lib/usrp/usrp2/io_impl.cpp b/host/lib/usrp/usrp2/io_impl.cpp
index cbc0a0817..5a6c0983c 100644
--- a/host/lib/usrp/usrp2/io_impl.cpp
+++ b/host/lib/usrp/usrp2/io_impl.cpp
@@ -21,11 +21,13 @@
#include <uhd/utils/byteswap.hpp>
#include <uhd/utils/thread_priority.hpp>
#include <uhd/transport/convert_types.hpp>
-#include <uhd/transport/alignment_buffer.hpp>
+#include <uhd/transport/bounded_buffer.hpp>
#include <boost/format.hpp>
#include <boost/bind.hpp>
#include <boost/thread.hpp>
+#include <boost/date_time/posix_time/posix_time_types.hpp>
#include <iostream>
+#include <list>
using namespace uhd;
using namespace uhd::usrp;
@@ -108,16 +110,24 @@ private:
* - vrt packet handler states
**********************************************************************/
struct usrp2_impl::io_impl{
- typedef alignment_buffer<managed_recv_buffer::sptr, time_spec_t> alignment_buffer_type;
- io_impl(size_t num_recv_frames, size_t send_frame_size, size_t width):
+ io_impl(size_t send_frame_size, size_t width):
packet_handler_recv_state(width),
- recv_pirate_booty(alignment_buffer_type::make(num_recv_frames-3, width)),
async_msg_fifo(bounded_buffer<async_metadata_t>::make(100/*messages deep*/))
{
- for (size_t i = 0; i < width; i++) fc_mons.push_back(
- flow_control_monitor::sptr(new flow_control_monitor(usrp2_impl::sram_bytes/send_frame_size))
- );
+ for (size_t i = 0; i < width; i++){
+ fc_mons.push_back(flow_control_monitor::sptr(
+ new flow_control_monitor(usrp2_impl::sram_bytes/send_frame_size)
+ ));
+ //init empty packet infos
+ vrt::if_packet_info_t packet_info;
+ packet_info.packet_count = 0xf;
+ packet_info.has_tsi = true;
+ packet_info.tsi = 0;
+ packet_info.has_tsf = true;
+ packet_info.tsf = 0;
+ prev_infos.push_back(packet_info);
+ }
}
~io_impl(void){
@@ -126,11 +136,6 @@ struct usrp2_impl::io_impl{
recv_pirate_crew.join_all();
}
- bool get_recv_buffs(vrt_packet_handler::managed_recv_buffs_t &buffs, double timeout){
- boost::this_thread::disable_interruption di; //disable because the wait can throw
- return recv_pirate_booty->pop_elems_with_timed_wait(buffs, timeout);
- }
-
bool get_send_buffs(
const std::vector<zero_copy_if::sptr> &trans,
vrt_packet_handler::managed_send_buffs_t &buffs,
@@ -151,6 +156,15 @@ struct usrp2_impl::io_impl{
return true;
}
+ bool get_recv_buffs(
+ const std::vector<zero_copy_if::sptr> &xports,
+ vrt_packet_handler::managed_recv_buffs_t &buffs,
+ double timeout
+ );
+
+ //previous state for each buffer
+ std::vector<vrt::if_packet_info_t> prev_infos;
+
//flow control monitors
std::vector<flow_control_monitor::sptr> fc_mons;
@@ -162,29 +176,28 @@ struct usrp2_impl::io_impl{
void recv_pirate_loop(zero_copy_if::sptr, usrp2_mboard_impl::sptr, size_t);
boost::thread_group recv_pirate_crew;
bool recv_pirate_crew_raiding;
- alignment_buffer_type::sptr recv_pirate_booty;
bounded_buffer<async_metadata_t>::sptr async_msg_fifo;
boost::mutex spawn_mutex;
};
/***********************************************************************
* Receive Pirate Loop
- * - while raiding, loot for recv buffers
- * - put booty into the alignment buffer
+ * - while raiding, loot for message packet
+ * - update flow control condition count
+ * - put async message packets into queue
**********************************************************************/
void usrp2_impl::io_impl::recv_pirate_loop(
- zero_copy_if::sptr zc_if,
+ zero_copy_if::sptr zc_if_err0,
usrp2_mboard_impl::sptr mboard,
size_t index
){
set_thread_priority_safe();
recv_pirate_crew_raiding = true;
- size_t next_packet_seq = 0;
spawn_mutex.unlock();
while(recv_pirate_crew_raiding){
- managed_recv_buffer::sptr buff = zc_if->get_recv_buff();
+ managed_recv_buffer::sptr buff = zc_if_err0->get_recv_buff();
if (not buff.get()) continue; //ignore timeout/error buffers
try{
@@ -194,26 +207,6 @@ void usrp2_impl::io_impl::recv_pirate_loop(
const boost::uint32_t *vrt_hdr = buff->cast<const boost::uint32_t *>();
vrt::if_hdr_unpack_be(vrt_hdr, if_packet_info);
- //handle the rx data stream
- if (if_packet_info.sid == usrp2_impl::RECV_SID){
- //handle the packet count / sequence number
- if (if_packet_info.packet_count != next_packet_seq){
- //std::cerr << "S" << (if_packet_info.packet_count - next_packet_seq)%16;
- std::cerr << "O" << std::flush; //report overflow (drops in the kernel)
- }
- next_packet_seq = (if_packet_info.packet_count+1)%16;
-
- //extract the timespec and round to the nearest packet
- UHD_ASSERT_THROW(if_packet_info.has_tsi and if_packet_info.has_tsf);
- time_spec_t time(
- time_t(if_packet_info.tsi), size_t(if_packet_info.tsf), mboard->get_master_clock_freq()
- );
-
- //push the packet into the buffer with the new time
- recv_pirate_booty->push_with_pop_on_full(buff, time, index);
- continue;
- }
-
//handle a tx async report message
if (if_packet_info.sid == usrp2_impl::ASYNC_SID and if_packet_info.packet_type != vrt::if_packet_info_t::PACKET_TYPE_DATA){
@@ -253,11 +246,10 @@ void usrp2_impl::io_impl::recv_pirate_loop(
void usrp2_impl::io_init(void){
//the assumption is that all data transports should be identical
- const size_t num_recv_frames = _data_transports.front()->get_num_recv_frames();
const size_t send_frame_size = _data_transports.front()->get_send_frame_size();
//create new io impl
- _io_impl = UHD_PIMPL_MAKE(io_impl, (num_recv_frames, send_frame_size, _data_transports.size()));
+ _io_impl = UHD_PIMPL_MAKE(io_impl, (send_frame_size, _data_transports.size()));
//TODO temporary fix for weird power up state, remove when FPGA fixed
{
@@ -276,7 +268,7 @@ void usrp2_impl::io_init(void){
//spawn a new pirate to plunder the recv booty
_io_impl->recv_pirate_crew.create_thread(boost::bind(
&usrp2_impl::io_impl::recv_pirate_loop,
- _io_impl.get(), _data_transports.at(i),
+ _io_impl.get(), _err0_transports.at(i),
_mboards.at(i), i
));
//block here until the spawned thread unlocks
@@ -328,6 +320,133 @@ size_t usrp2_impl::send(
}
/***********************************************************************
+ * Alignment logic on receive
+ **********************************************************************/
+static UHD_INLINE boost::posix_time::time_duration to_time_dur(double timeout){
+ return boost::posix_time::microseconds(long(timeout*1e6));
+}
+
+static UHD_INLINE double from_time_dur(const boost::posix_time::time_duration &time_dur){
+ return 1e-6*time_dur.total_microseconds();
+}
+
+static UHD_INLINE time_spec_t extract_time_spec(
+ const vrt::if_packet_info_t &packet_info
+){
+ return time_spec_t( //assumes has_tsi and has_tsf are true
+ time_t(packet_info.tsi), size_t(packet_info.tsf),
+ 100e6 //tick rate does not have to be correct for comparison purposes
+ );
+}
+
+static UHD_INLINE void extract_packet_info(
+ managed_recv_buffer::sptr &buff,
+ vrt::if_packet_info_t &prev_info,
+ time_spec_t &time, bool &clear, bool &msg
+){
+ //extract packet info
+ vrt::if_packet_info_t next_info;
+ next_info.num_packet_words32 = buff->size()/sizeof(boost::uint32_t);
+ vrt::if_hdr_unpack_be(buff->cast<const boost::uint32_t *>(), next_info);
+
+ //handle the packet count / sequence number
+ if ((prev_info.packet_count+1)%16 != next_info.packet_count){
+ std::cerr << "O" << std::flush; //report overflow (drops in the kernel)
+ }
+
+ time = extract_time_spec(next_info);
+ clear = extract_time_spec(prev_info) > time;
+ msg = next_info.packet_type != vrt::if_packet_info_t::PACKET_TYPE_DATA;
+ prev_info = next_info;
+}
+
+static UHD_INLINE bool handle_msg_packet(
+ vrt_packet_handler::managed_recv_buffs_t &buffs, size_t index
+){
+ for (size_t i = 0; i < buffs.size(); i++){
+ if (i == index) continue;
+ buffs[i].reset(); //set NULL
+ }
+ return true;
+}
+
+UHD_INLINE bool usrp2_impl::io_impl::get_recv_buffs(
+ const std::vector<zero_copy_if::sptr> &xports,
+ vrt_packet_handler::managed_recv_buffs_t &buffs,
+ double timeout
+){
+ if (buffs.size() == 1){
+ buffs[0] = xports[0]->get_recv_buff(timeout);
+ if (buffs[0].get() == NULL) return false;
+ bool clear, msg; time_spec_t time; //unused variables
+ //call extract_packet_info to handle printing the overflows
+ extract_packet_info(buffs[0], this->prev_infos[0], time, clear, msg);
+ return true;
+ }
+ //-------------------- begin alignment logic ---------------------//
+ boost::system_time exit_time = boost::get_system_time() + to_time_dur(timeout);
+ managed_recv_buffer::sptr buff_tmp;
+ std::list<size_t> _all_indexes, indexes_to_do;
+ for (size_t i = 0; i < buffs.size(); i++) _all_indexes.push_back(i);
+ bool clear, msg;
+ time_spec_t expected_time;
+
+ //respond to a clear by starting from scratch
+ got_clear:
+ indexes_to_do = _all_indexes;
+ clear = false;
+
+ //do an initial pop to load an initial sequence id
+ size_t index = indexes_to_do.front();
+ buff_tmp = xports[index]->get_recv_buff(from_time_dur(exit_time - boost::get_system_time()));
+ if (buff_tmp.get() == NULL) return false;
+ extract_packet_info(buff_tmp, this->prev_infos[index], expected_time, clear, msg);
+ if (clear) goto got_clear;
+ buffs[index] = buff_tmp;
+ if (msg) return handle_msg_packet(buffs, index);
+ indexes_to_do.pop_front();
+
+ //get an aligned set of elements from the buffers:
+ while(indexes_to_do.size() != 0){
+
+ //pop an element off for this index
+ index = indexes_to_do.front();
+ buff_tmp = xports[index]->get_recv_buff(from_time_dur(exit_time - boost::get_system_time()));
+ if (buff_tmp.get() == NULL) return false;
+ time_spec_t this_time;
+ extract_packet_info(buff_tmp, this->prev_infos[index], this_time, clear, msg);
+ if (clear) goto got_clear;
+ buffs[index] = buff_tmp;
+ if (msg) return handle_msg_packet(buffs, index);
+
+ //if the sequence id matches:
+ // remove this index from the list and continue
+ if (this_time == expected_time){
+ indexes_to_do.pop_front();
+ continue;
+ }
+
+ //if the sequence id is older:
+ // continue with the same index to try again
+ else if (this_time < expected_time){
+ continue;
+ }
+
+ //if the sequence id is newer:
+ // use the new expected time for comparison
+ // add all other indexes back into the list
+ else{
+ expected_time = this_time;
+ indexes_to_do = _all_indexes;
+ indexes_to_do.remove(index);
+ continue;
+ }
+ }
+ return true;
+ //-------------------- end alignment logic -----------------------//
+}
+
+/***********************************************************************
* Receive Data
**********************************************************************/
size_t usrp2_impl::get_max_recv_samps_per_packet(void) const{
@@ -357,7 +476,7 @@ size_t usrp2_impl::recv(
io_type, _rx_otw_type, //input and output types to convert
_mboards.front()->get_master_clock_freq(), //master clock tick rate
uhd::transport::vrt::if_hdr_unpack_be,
- boost::bind(&usrp2_impl::io_impl::get_recv_buffs, _io_impl.get(), _1, timeout),
+ boost::bind(&usrp2_impl::io_impl::get_recv_buffs, _io_impl.get(), _data_transports, _1, timeout),
boost::bind(&handle_overflow, _mboards, _1)
);
}
diff --git a/host/lib/usrp/usrp2/mboard_impl.cpp b/host/lib/usrp/usrp2/mboard_impl.cpp
index b8ebd6030..72d1c9d03 100644
--- a/host/lib/usrp/usrp2/mboard_impl.cpp
+++ b/host/lib/usrp/usrp2/mboard_impl.cpp
@@ -42,6 +42,7 @@ usrp2_mboard_impl::usrp2_mboard_impl(
size_t index,
transport::udp_simple::sptr ctrl_transport,
transport::zero_copy_if::sptr data_transport,
+ transport::zero_copy_if::sptr err0_transport,
const device_addr_t &device_args,
size_t recv_samps_per_packet
):
@@ -51,11 +52,15 @@ usrp2_mboard_impl::usrp2_mboard_impl(
//Send a small data packet so the usrp2 knows the udp source port.
//This setup must happen before further initialization occurs
//or the async update packets will cause ICMP destination unreachable.
- transport::managed_send_buffer::sptr send_buff = data_transport->get_send_buff();
+ transport::managed_send_buffer::sptr send_buff;
static const boost::uint32_t data[2] = {
uhd::htonx(boost::uint32_t(0 /* don't care seq num */)),
uhd::htonx(boost::uint32_t(USRP2_INVALID_VRT_HEADER))
};
+ send_buff = data_transport->get_send_buff();
+ std::memcpy(send_buff->cast<void*>(), &data, sizeof(data));
+ send_buff->commit(sizeof(data));
+ send_buff = err0_transport->get_send_buff();
std::memcpy(send_buff->cast<void*>(), &data, sizeof(data));
send_buff->commit(sizeof(data));
diff --git a/host/lib/usrp/usrp2/usrp2_impl.cpp b/host/lib/usrp/usrp2/usrp2_impl.cpp
index 133c39a35..f910999d4 100644
--- a/host/lib/usrp/usrp2/usrp2_impl.cpp
+++ b/host/lib/usrp/usrp2/usrp2_impl.cpp
@@ -197,10 +197,18 @@ static device_addrs_t usrp2_find(const device_addr_t &hint_){
* Make
**********************************************************************/
static device::sptr usrp2_make(const device_addr_t &device_addr){
-sep_indexed_dev_addrs(device_addr);
+
+ //setup the dsp transport hints (default to a large recv buff)
+ device_addr_t dsp_xport_hints = device_addr;
+ if (not dsp_xport_hints.has_key("recv_buff_size")){
+ //set to half-a-second of buffering at max rate
+ dsp_xport_hints["recv_buff_size"] = "50e6";
+ }
+
//create a ctrl and data transport for each address
std::vector<udp_simple::sptr> ctrl_transports;
std::vector<zero_copy_if::sptr> data_transports;
+ std::vector<zero_copy_if::sptr> err0_transports;
const device_addrs_t device_addrs = sep_indexed_dev_addrs(device_addr);
BOOST_FOREACH(const device_addr_t &dev_addr_i, device_addrs){
@@ -208,14 +216,17 @@ sep_indexed_dev_addrs(device_addr);
dev_addr_i["addr"], num2str(USRP2_UDP_CTRL_PORT)
));
data_transports.push_back(udp_zero_copy::make(
- dev_addr_i["addr"], num2str(USRP2_UDP_DATA_PORT), device_addr
+ dev_addr_i["addr"], num2str(USRP2_UDP_DATA_PORT), dsp_xport_hints
+ ));
+ err0_transports.push_back(udp_zero_copy::make(
+ dev_addr_i["addr"], num2str(USRP2_UDP_ERR0_PORT), device_addr_t()
));
}
//create the usrp2 implementation guts
- return device::sptr(
- new usrp2_impl(ctrl_transports, data_transports, device_addrs)
- );
+ return device::sptr(new usrp2_impl(
+ ctrl_transports, data_transports, err0_transports, device_addrs
+ ));
}
UHD_STATIC_BLOCK(register_usrp2_device){
@@ -228,9 +239,11 @@ UHD_STATIC_BLOCK(register_usrp2_device){
usrp2_impl::usrp2_impl(
std::vector<udp_simple::sptr> ctrl_transports,
std::vector<zero_copy_if::sptr> data_transports,
+ std::vector<zero_copy_if::sptr> err0_transports,
const device_addrs_t &device_args
):
- _data_transports(data_transports)
+ _data_transports(data_transports),
+ _err0_transports(err0_transports)
{
//setup rx otw type
_rx_otw_type.width = 16;
@@ -247,7 +260,8 @@ usrp2_impl::usrp2_impl(
//create a new mboard handler for each control transport
for(size_t i = 0; i < device_args.size(); i++){
_mboards.push_back(usrp2_mboard_impl::sptr(new usrp2_mboard_impl(
- i, ctrl_transports[i], data_transports[i], device_args[i],
+ i, ctrl_transports[i], data_transports[i],
+ err0_transports[i], device_args[i],
this->get_max_recv_samps_per_packet()
)));
//use an empty name when there is only one mboard
diff --git a/host/lib/usrp/usrp2/usrp2_impl.hpp b/host/lib/usrp/usrp2/usrp2_impl.hpp
index 85c00b079..9cd27ee41 100644
--- a/host/lib/usrp/usrp2/usrp2_impl.hpp
+++ b/host/lib/usrp/usrp2/usrp2_impl.hpp
@@ -85,6 +85,7 @@ public:
size_t index,
uhd::transport::udp_simple::sptr,
uhd::transport::zero_copy_if::sptr,
+ uhd::transport::zero_copy_if::sptr,
const uhd::device_addr_t &device_args,
size_t recv_samps_per_packet
);
@@ -186,11 +187,13 @@ public:
* Create a new usrp2 impl base.
* \param ctrl_transports the udp transports for control
* \param data_transports the udp transports for data
- * \param flow_control_hints optional flow control params
+ * \param err0_transports the udp transports for error
+ * \param device_args optional misc device parameters
*/
usrp2_impl(
std::vector<uhd::transport::udp_simple::sptr> ctrl_transports,
std::vector<uhd::transport::zero_copy_if::sptr> data_transports,
+ std::vector<uhd::transport::zero_copy_if::sptr> err0_transports,
const uhd::device_addrs_t &device_args
);
@@ -222,6 +225,7 @@ private:
//io impl methods and members
std::vector<uhd::transport::zero_copy_if::sptr> _data_transports;
+ std::vector<uhd::transport::zero_copy_if::sptr> _err0_transports;
uhd::otw_type_t _rx_otw_type, _tx_otw_type;
UHD_PIMPL_DECL(io_impl) _io_impl;
void io_init(void);
diff --git a/host/test/buffer_test.cpp b/host/test/buffer_test.cpp
index 8445412e7..e7bc88699 100644
--- a/host/test/buffer_test.cpp
+++ b/host/test/buffer_test.cpp
@@ -17,7 +17,6 @@
#include <boost/test/unit_test.hpp>
#include <uhd/transport/bounded_buffer.hpp>
-#include <uhd/transport/alignment_buffer.hpp>
#include <boost/assign/list_of.hpp>
using namespace boost::assign;
@@ -63,53 +62,3 @@ BOOST_AUTO_TEST_CASE(test_bounded_buffer_with_pop_on_full){
BOOST_CHECK(bb->pop_with_timed_wait(val, timeout));
BOOST_CHECK_EQUAL(val, 3);
}
-
-BOOST_AUTO_TEST_CASE(test_alignment_buffer){
- alignment_buffer<int, size_t>::sptr ab(alignment_buffer<int, size_t>::make(7, 3));
- //load index 0 with all good seq numbers
- BOOST_CHECK(ab->push_with_pop_on_full(0, 0, 0));
- BOOST_CHECK(ab->push_with_pop_on_full(1, 1, 0));
- BOOST_CHECK(ab->push_with_pop_on_full(2, 2, 0));
- BOOST_CHECK(ab->push_with_pop_on_full(3, 3, 0));
- BOOST_CHECK(ab->push_with_pop_on_full(4, 4, 0));
-
- //load index 1 with some skipped seq numbers
- BOOST_CHECK(ab->push_with_pop_on_full(10, 0, 1));
- BOOST_CHECK(ab->push_with_pop_on_full(11, 1, 1));
- BOOST_CHECK(ab->push_with_pop_on_full(14, 4, 1));
- BOOST_CHECK(ab->push_with_pop_on_full(15, 5, 1));
- BOOST_CHECK(ab->push_with_pop_on_full(16, 6, 1));
-
- //load index 2 with all good seq numbers
- BOOST_CHECK(ab->push_with_pop_on_full(20, 0, 2));
- BOOST_CHECK(ab->push_with_pop_on_full(21, 1, 2));
- BOOST_CHECK(ab->push_with_pop_on_full(22, 2, 2));
- BOOST_CHECK(ab->push_with_pop_on_full(23, 3, 2));
- BOOST_CHECK(ab->push_with_pop_on_full(24, 4, 2));
-
- //readback aligned values
- std::vector<int> aligned_elems(3);
-
- static const std::vector<int> expected_elems0 = list_of(0)(10)(20);
- BOOST_CHECK(ab->pop_elems_with_timed_wait(aligned_elems, timeout));
- BOOST_CHECK_EQUAL_COLLECTIONS(
- aligned_elems.begin(), aligned_elems.end(),
- expected_elems0.begin(), expected_elems0.end()
- );
-
- static const std::vector<int> expected_elems1 = list_of(1)(11)(21);
- BOOST_CHECK(ab->pop_elems_with_timed_wait(aligned_elems, timeout));
- BOOST_CHECK_EQUAL_COLLECTIONS(
- aligned_elems.begin(), aligned_elems.end(),
- expected_elems1.begin(), expected_elems1.end()
- );
-
- //there was a skip now find 4
-
- static const std::vector<int> expected_elems4 = list_of(4)(14)(24);
- BOOST_CHECK(ab->pop_elems_with_timed_wait(aligned_elems, timeout));
- BOOST_CHECK_EQUAL_COLLECTIONS(
- aligned_elems.begin(), aligned_elems.end(),
- expected_elems4.begin(), expected_elems4.end()
- );
-}