aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJosh Blum <josh@joshknows.com>2013-07-15 15:44:42 -0700
committerJosh Blum <josh@joshknows.com>2013-07-15 15:44:42 -0700
commitc464a63e87e32bea2b4c430ff29b0b5e0829893a (patch)
tree9e6967a7e9f1e4d2b17c43d3bb497709f2fdeb8a
parent1f97c6c1e1ed05dbef94f74f52f1ed3370ebfd26 (diff)
downloaduhd-c464a63e87e32bea2b4c430ff29b0b5e0829893a.tar.gz
uhd-c464a63e87e32bea2b4c430ff29b0b5e0829893a.tar.bz2
uhd-c464a63e87e32bea2b4c430ff29b0b5e0829893a.zip
uhd: added new calls to streamer object + support work
* The transmit streamer gives access to the async msg queue. * The receive streamer gives access to the issue stream cmd. * Supporting usrp implementation files updated. * Example applications updated to use this API.
-rw-r--r--host/examples/benchmark_rate.cpp62
-rw-r--r--host/examples/latency_test.cpp4
-rw-r--r--host/examples/rx_ascii_art_dft.cpp4
-rw-r--r--host/examples/rx_multi_samples.cpp2
-rw-r--r--host/examples/rx_samples_to_file.cpp2
-rw-r--r--host/examples/rx_samples_to_udp.cpp2
-rw-r--r--host/examples/rx_timed_samples.cpp2
-rw-r--r--host/examples/test_messages.cpp20
-rw-r--r--host/examples/test_timed_commands.cpp2
-rw-r--r--host/examples/transport_hammer.cpp47
-rw-r--r--host/examples/tx_bursts.cpp2
-rw-r--r--host/examples/tx_timed_samples.cpp2
-rw-r--r--host/examples/txrx_loopback_to_file.cpp2
-rw-r--r--host/include/uhd/device.hpp10
-rw-r--r--host/include/uhd/device_deprecated.ipp10
-rw-r--r--host/include/uhd/exception.hpp4
-rw-r--r--host/include/uhd/stream.hpp26
-rw-r--r--host/include/uhd/transport/bounded_buffer.ipp6
-rw-r--r--host/include/uhd/transport/vrt_if_packet.hpp43
-rw-r--r--host/lib/transport/gen_vrt_if_packet.py140
-rw-r--r--host/lib/transport/super_recv_packet_handler.hpp50
-rw-r--r--host/lib/transport/super_send_packet_handler.hpp41
-rw-r--r--host/lib/usrp/CMakeLists.txt2
-rw-r--r--host/lib/usrp/b100/io_impl.cpp3
-rw-r--r--host/lib/usrp/e100/io_impl.cpp3
-rw-r--r--host/lib/usrp/usrp1/io_impl.cpp11
-rw-r--r--host/lib/usrp/usrp2/io_impl.cpp3
-rw-r--r--host/tests/sph_recv_test.cpp2
-rw-r--r--host/tests/vrt_test.cpp59
29 files changed, 442 insertions, 124 deletions
diff --git a/host/examples/benchmark_rate.cpp b/host/examples/benchmark_rate.cpp
index ed3a8580a..3a69e001b 100644
--- a/host/examples/benchmark_rate.cpp
+++ b/host/examples/benchmark_rate.cpp
@@ -42,37 +42,32 @@ unsigned long long num_seq_errors = 0;
/***********************************************************************
* Benchmark RX Rate
**********************************************************************/
-void benchmark_rx_rate(uhd::usrp::multi_usrp::sptr usrp, const std::string &rx_cpu, const std::string &rx_otw){
+void benchmark_rx_rate(uhd::usrp::multi_usrp::sptr usrp, const std::string &rx_cpu, uhd::rx_streamer::sptr rx_stream){
uhd::set_thread_priority_safe();
- //create a receive streamer
- uhd::stream_args_t stream_args(rx_cpu, rx_otw);
- for (size_t ch = 0; ch < usrp->get_num_mboards(); ch++) //linear channel mapping
- stream_args.channels.push_back(ch);
- uhd::rx_streamer::sptr rx_stream = usrp->get_rx_stream(stream_args);
-
//print pre-test summary
std::cout << boost::format(
- "Testing receive rate %f Msps"
- ) % (usrp->get_rx_rate()/1e6) << std::endl;
+ "Testing receive rate %f Msps on %u channels"
+ ) % (usrp->get_rx_rate()/1e6) % rx_stream->get_num_channels() << std::endl;
//setup variables and allocate buffer
uhd::rx_metadata_t md;
const size_t max_samps_per_packet = rx_stream->get_max_num_samps();
std::vector<char> buff(max_samps_per_packet*uhd::convert::get_bytes_per_item(rx_cpu));
std::vector<void *> buffs;
- for (size_t ch = 0; ch < stream_args.channels.size(); ch++)
+ for (size_t ch = 0; ch < rx_stream->get_num_channels(); ch++)
buffs.push_back(&buff.front()); //same buffer for each channel
bool had_an_overflow = false;
uhd::time_spec_t last_time;
const double rate = usrp->get_rx_rate();
+ issue_new_stream_cmd:
uhd::stream_cmd_t cmd(uhd::stream_cmd_t::STREAM_MODE_START_CONTINUOUS);
cmd.time_spec = usrp->get_time_now() + uhd::time_spec_t(0.05);
cmd.stream_now = (buffs.size() == 1);
- usrp->issue_stream_cmd(cmd);
+ rx_stream->issue_stream_cmd(cmd);
while (not boost::this_thread::interruption_requested()){
- num_rx_samps += rx_stream->recv(buffs, max_samps_per_packet, md);
+ num_rx_samps += rx_stream->recv(buffs, max_samps_per_packet, md)*rx_stream->get_num_channels();
//handle the error codes
switch(md.error_code){
@@ -87,6 +82,11 @@ void benchmark_rx_rate(uhd::usrp::multi_usrp::sptr usrp, const std::string &rx_c
had_an_overflow = true;
last_time = md.time_spec;
num_overflows++;
+ if (rx_stream->get_num_channels() > 1)
+ {
+ rx_stream->issue_stream_cmd(uhd::stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS);
+ goto issue_new_stream_cmd;
+ }
break;
default:
@@ -96,25 +96,19 @@ void benchmark_rx_rate(uhd::usrp::multi_usrp::sptr usrp, const std::string &rx_c
}
}
- usrp->issue_stream_cmd(uhd::stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS);
+ rx_stream->issue_stream_cmd(uhd::stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS);
}
/***********************************************************************
* Benchmark TX Rate
**********************************************************************/
-void benchmark_tx_rate(uhd::usrp::multi_usrp::sptr usrp, const std::string &tx_cpu, const std::string &tx_otw){
+void benchmark_tx_rate(uhd::usrp::multi_usrp::sptr usrp, const std::string &tx_cpu, uhd::tx_streamer::sptr tx_stream){
uhd::set_thread_priority_safe();
- //create a transmit streamer
- uhd::stream_args_t stream_args(tx_cpu, tx_otw);
- for (size_t ch = 0; ch < usrp->get_num_mboards(); ch++) //linear channel mapping
- stream_args.channels.push_back(ch);
- uhd::tx_streamer::sptr tx_stream = usrp->get_tx_stream(stream_args);
-
//print pre-test summary
std::cout << boost::format(
- "Testing transmit rate %f Msps"
- ) % (usrp->get_tx_rate()/1e6) << std::endl;
+ "Testing transmit rate %f Msps on %u channels"
+ ) % (usrp->get_tx_rate()/1e6) % tx_stream->get_num_channels() << std::endl;
//setup variables and allocate buffer
uhd::tx_metadata_t md;
@@ -122,12 +116,12 @@ void benchmark_tx_rate(uhd::usrp::multi_usrp::sptr usrp, const std::string &tx_c
const size_t max_samps_per_packet = tx_stream->get_max_num_samps();
std::vector<char> buff(max_samps_per_packet*uhd::convert::get_bytes_per_item(tx_cpu));
std::vector<const void *> buffs;
- for (size_t ch = 0; ch < stream_args.channels.size(); ch++)
+ for (size_t ch = 0; ch < tx_stream->get_num_channels(); ch++)
buffs.push_back(&buff.front()); //same buffer for each channel
md.has_time_spec = (buffs.size() != 1);
while (not boost::this_thread::interruption_requested()){
- num_tx_samps += tx_stream->send(buffs, max_samps_per_packet, md);
+ num_tx_samps += tx_stream->send(buffs, max_samps_per_packet, md)*tx_stream->get_num_channels();;
md.has_time_spec = false;
}
@@ -136,13 +130,13 @@ void benchmark_tx_rate(uhd::usrp::multi_usrp::sptr usrp, const std::string &tx_c
tx_stream->send(buffs, 0, md);
}
-void benchmark_tx_rate_async_helper(uhd::usrp::multi_usrp::sptr usrp){
+void benchmark_tx_rate_async_helper(uhd::tx_streamer::sptr tx_stream){
//setup variables and allocate buffer
uhd::async_metadata_t async_md;
while (not boost::this_thread::interruption_requested()){
- if (not usrp->get_device()->recv_async_msg(async_md)) continue;
+ if (not tx_stream->recv_async_msg(async_md)) continue;
//handle the error codes
switch(async_md.event_code){
@@ -232,14 +226,24 @@ int UHD_SAFE_MAIN(int argc, char *argv[]){
//spawn the receive test thread
if (vm.count("rx_rate")){
usrp->set_rx_rate(rx_rate);
- thread_group.create_thread(boost::bind(&benchmark_rx_rate, usrp, rx_cpu, rx_otw));
+ //create a receive streamer
+ uhd::stream_args_t stream_args(rx_cpu, rx_otw);
+ for (size_t ch = 0; ch < usrp->get_rx_num_channels(); ch++) //linear channel mapping
+ stream_args.channels.push_back(ch);
+ uhd::rx_streamer::sptr rx_stream = usrp->get_rx_stream(stream_args);
+ thread_group.create_thread(boost::bind(&benchmark_rx_rate, usrp, rx_cpu, rx_stream));
}
//spawn the transmit test thread
if (vm.count("tx_rate")){
usrp->set_tx_rate(tx_rate);
- thread_group.create_thread(boost::bind(&benchmark_tx_rate, usrp, tx_cpu, tx_otw));
- thread_group.create_thread(boost::bind(&benchmark_tx_rate_async_helper, usrp));
+ //create a transmit streamer
+ uhd::stream_args_t stream_args(tx_cpu, tx_otw);
+ for (size_t ch = 0; ch < usrp->get_tx_num_channels(); ch++) //linear channel mapping
+ stream_args.channels.push_back(ch);
+ uhd::tx_streamer::sptr tx_stream = usrp->get_tx_stream(stream_args);
+ thread_group.create_thread(boost::bind(&benchmark_tx_rate, usrp, tx_cpu, tx_stream));
+ thread_group.create_thread(boost::bind(&benchmark_tx_rate_async_helper, tx_stream));
}
//sleep for the required duration
diff --git a/host/examples/latency_test.cpp b/host/examples/latency_test.cpp
index 461ac9bf8..da06086f9 100644
--- a/host/examples/latency_test.cpp
+++ b/host/examples/latency_test.cpp
@@ -103,7 +103,7 @@ int UHD_SAFE_MAIN(int argc, char *argv[]){
stream_cmd.num_samps = buffer.size();
stream_cmd.stream_now = false;
stream_cmd.time_spec = usrp->get_time_now() + uhd::time_spec_t(0.01);
- usrp->issue_stream_cmd(stream_cmd);
+ rx_stream->issue_stream_cmd(stream_cmd);
/***************************************************************
* Receive the requested packet
@@ -133,7 +133,7 @@ int UHD_SAFE_MAIN(int argc, char *argv[]){
* Check the async messages for result
**************************************************************/
uhd::async_metadata_t async_md;
- if (not usrp->get_device()->recv_async_msg(async_md)){
+ if (not tx_stream->recv_async_msg(async_md)){
std::cout << boost::format("failed:\n Async message recv timed out.\n") << std::endl;
continue;
}
diff --git a/host/examples/rx_ascii_art_dft.cpp b/host/examples/rx_ascii_art_dft.cpp
index 1aede51a4..df3256b09 100644
--- a/host/examples/rx_ascii_art_dft.cpp
+++ b/host/examples/rx_ascii_art_dft.cpp
@@ -148,7 +148,7 @@ int UHD_SAFE_MAIN(int argc, char *argv[]){
//-- Initialize
//------------------------------------------------------------------
initscr(); //curses init
- usrp->issue_stream_cmd(uhd::stream_cmd_t::STREAM_MODE_START_CONTINUOUS);
+ rx_stream->issue_stream_cmd(uhd::stream_cmd_t::STREAM_MODE_START_CONTINUOUS);
boost::system_time next_refresh = boost::get_system_time();
//------------------------------------------------------------------
@@ -189,7 +189,7 @@ int UHD_SAFE_MAIN(int argc, char *argv[]){
//------------------------------------------------------------------
//-- Cleanup
//------------------------------------------------------------------
- usrp->issue_stream_cmd(uhd::stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS);
+ rx_stream->issue_stream_cmd(uhd::stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS);
endwin(); //curses done
//finished
diff --git a/host/examples/rx_multi_samples.cpp b/host/examples/rx_multi_samples.cpp
index 83d648eb5..4f312ad36 100644
--- a/host/examples/rx_multi_samples.cpp
+++ b/host/examples/rx_multi_samples.cpp
@@ -126,7 +126,7 @@ int UHD_SAFE_MAIN(int argc, char *argv[]){
stream_cmd.num_samps = total_num_samps;
stream_cmd.stream_now = false;
stream_cmd.time_spec = uhd::time_spec_t(seconds_in_future);
- usrp->issue_stream_cmd(stream_cmd); //tells all channels to stream
+ rx_stream->issue_stream_cmd(stream_cmd); //tells all channels to stream
//meta-data will be filled in by recv()
uhd::rx_metadata_t md;
diff --git a/host/examples/rx_samples_to_file.cpp b/host/examples/rx_samples_to_file.cpp
index d0a02a6c1..1e833defa 100644
--- a/host/examples/rx_samples_to_file.cpp
+++ b/host/examples/rx_samples_to_file.cpp
@@ -58,7 +58,7 @@ template<typename samp_type> void recv_to_file(
stream_cmd.num_samps = num_requested_samples;
stream_cmd.stream_now = true;
stream_cmd.time_spec = uhd::time_spec_t();
- usrp->issue_stream_cmd(stream_cmd);
+ rx_stream->issue_stream_cmd(stream_cmd);
while(not stop_signal_called and (num_requested_samples != num_total_samps or num_requested_samples == 0)){
size_t num_rx_samps = rx_stream->recv(&buff.front(), buff.size(), md, 3.0);
diff --git a/host/examples/rx_samples_to_udp.cpp b/host/examples/rx_samples_to_udp.cpp
index f637f9313..0b3c6dce3 100644
--- a/host/examples/rx_samples_to_udp.cpp
+++ b/host/examples/rx_samples_to_udp.cpp
@@ -127,7 +127,7 @@ int UHD_SAFE_MAIN(int argc, char *argv[]){
uhd::stream_cmd_t stream_cmd(uhd::stream_cmd_t::STREAM_MODE_NUM_SAMPS_AND_DONE);
stream_cmd.num_samps = total_num_samps;
stream_cmd.stream_now = true;
- usrp->issue_stream_cmd(stream_cmd);
+ rx_stream->issue_stream_cmd(stream_cmd);
//loop until total number of samples reached
size_t num_acc_samps = 0; //number of accumulated samples
diff --git a/host/examples/rx_timed_samples.cpp b/host/examples/rx_timed_samples.cpp
index f0d49b4bd..0eea2ffee 100644
--- a/host/examples/rx_timed_samples.cpp
+++ b/host/examples/rx_timed_samples.cpp
@@ -83,7 +83,7 @@ int UHD_SAFE_MAIN(int argc, char *argv[]){
stream_cmd.num_samps = total_num_samps;
stream_cmd.stream_now = false;
stream_cmd.time_spec = uhd::time_spec_t(seconds_in_future);
- usrp->issue_stream_cmd(stream_cmd);
+ rx_stream->issue_stream_cmd(stream_cmd);
//meta-data will be filled in by recv()
uhd::rx_metadata_t md;
diff --git a/host/examples/test_messages.cpp b/host/examples/test_messages.cpp
index 6f2ddfe28..e39a8bd30 100644
--- a/host/examples/test_messages.cpp
+++ b/host/examples/test_messages.cpp
@@ -46,7 +46,7 @@ bool test_late_command_message(uhd::usrp::multi_usrp::sptr usrp, uhd::rx_streame
stream_cmd.num_samps = rx_stream->get_max_num_samps();
stream_cmd.stream_now = false;
stream_cmd.time_spec = uhd::time_spec_t(100.0); //time in the past
- usrp->issue_stream_cmd(stream_cmd);
+ rx_stream->issue_stream_cmd(stream_cmd);
std::vector<std::complex<float> > buff(rx_stream->get_max_num_samps());
uhd::rx_metadata_t md;
@@ -90,7 +90,7 @@ bool test_broken_chain_message(uhd::usrp::multi_usrp::sptr usrp, uhd::rx_streame
uhd::stream_cmd_t stream_cmd(uhd::stream_cmd_t::STREAM_MODE_NUM_SAMPS_AND_MORE);
stream_cmd.stream_now = true;
stream_cmd.num_samps = rx_stream->get_max_num_samps();
- usrp->issue_stream_cmd(stream_cmd);
+ rx_stream->issue_stream_cmd(stream_cmd);
std::vector<std::complex<float> > buff(rx_stream->get_max_num_samps());
uhd::rx_metadata_t md;
@@ -132,7 +132,7 @@ bool test_broken_chain_message(uhd::usrp::multi_usrp::sptr usrp, uhd::rx_streame
* Send a burst of many samples that will fragment internally.
* We expect to get an burst ack async message.
*/
-bool test_burst_ack_message(uhd::usrp::multi_usrp::sptr usrp, uhd::rx_streamer::sptr, uhd::tx_streamer::sptr tx_stream){
+bool test_burst_ack_message(uhd::usrp::multi_usrp::sptr, uhd::rx_streamer::sptr, uhd::tx_streamer::sptr tx_stream){
std::cout << "Test burst ack message... " << std::flush;
uhd::tx_metadata_t md;
@@ -148,7 +148,7 @@ bool test_burst_ack_message(uhd::usrp::multi_usrp::sptr usrp, uhd::rx_streamer::
);
uhd::async_metadata_t async_md;
- if (not usrp->get_device()->recv_async_msg(async_md)){
+ if (not tx_stream->recv_async_msg(async_md)){
std::cout << boost::format(
"failed:\n"
" Async message recv timed out.\n"
@@ -178,7 +178,7 @@ bool test_burst_ack_message(uhd::usrp::multi_usrp::sptr usrp, uhd::rx_streamer::
* Send a start of burst packet with no following end of burst.
* We expect to get an underflow(within a burst) async message.
*/
-bool test_underflow_message(uhd::usrp::multi_usrp::sptr usrp, uhd::rx_streamer::sptr, uhd::tx_streamer::sptr tx_stream){
+bool test_underflow_message(uhd::usrp::multi_usrp::sptr, uhd::rx_streamer::sptr, uhd::tx_streamer::sptr tx_stream){
std::cout << "Test underflow message... " << std::flush;
uhd::tx_metadata_t md;
@@ -189,7 +189,7 @@ bool test_underflow_message(uhd::usrp::multi_usrp::sptr usrp, uhd::rx_streamer::
tx_stream->send("", 0, md);
uhd::async_metadata_t async_md;
- if (not usrp->get_device()->recv_async_msg(async_md, 1)){
+ if (not tx_stream->recv_async_msg(async_md, 1)){
std::cout << boost::format(
"failed:\n"
" Async message recv timed out.\n"
@@ -233,7 +233,7 @@ bool test_time_error_message(uhd::usrp::multi_usrp::sptr usrp, uhd::rx_streamer:
tx_stream->send("", 0, md);
uhd::async_metadata_t async_md;
- if (not usrp->get_device()->recv_async_msg(async_md)){
+ if (not tx_stream->recv_async_msg(async_md)){
std::cout << boost::format(
"failed:\n"
" Async message recv timed out.\n"
@@ -258,9 +258,9 @@ bool test_time_error_message(uhd::usrp::multi_usrp::sptr usrp, uhd::rx_streamer:
}
}
-void flush_async(uhd::usrp::multi_usrp::sptr usrp){
+void flush_async(uhd::tx_streamer::sptr tx_stream){
uhd::async_metadata_t async_md;
- while (usrp->get_device()->recv_async_msg(async_md)){}
+ while (tx_stream->recv_async_msg(async_md)){}
}
void flush_recv(uhd::rx_streamer::sptr rx_stream){
@@ -331,7 +331,7 @@ int UHD_SAFE_MAIN(int argc, char *argv[]){
for (size_t n = 0; n < ntests; n++){
std::string key = tests.keys()[std::rand() % tests.size()];
bool pass = tests[key](usrp, rx_stream, tx_stream);
- flush_async(usrp);
+ flush_async(tx_stream);
flush_recv(rx_stream);
//store result
diff --git a/host/examples/test_timed_commands.cpp b/host/examples/test_timed_commands.cpp
index cecf1607c..5dee58887 100644
--- a/host/examples/test_timed_commands.cpp
+++ b/host/examples/test_timed_commands.cpp
@@ -97,7 +97,7 @@ int UHD_SAFE_MAIN(int argc, char *argv[]){
stream_cmd.stream_now = true;
const uhd::time_spec_t stream_time = usrp->get_time_now() + uhd::time_spec_t(0.1);
usrp->set_command_time(stream_time);
- usrp->issue_stream_cmd(stream_cmd);
+ rx_stream->issue_stream_cmd(stream_cmd);
usrp->clear_command_time();
//meta-data will be filled in by recv()
diff --git a/host/examples/transport_hammer.cpp b/host/examples/transport_hammer.cpp
index ff35ceb21..a44e81a82 100644
--- a/host/examples/transport_hammer.cpp
+++ b/host/examples/transport_hammer.cpp
@@ -41,15 +41,9 @@ unsigned long long num_seq_errors = 0;
/***********************************************************************
* RX Hammer
**********************************************************************/
-void rx_hammer(uhd::usrp::multi_usrp::sptr usrp, const std::string &rx_cpu, const std::string &rx_otw){
+void rx_hammer(uhd::usrp::multi_usrp::sptr usrp, const std::string &rx_cpu, uhd::rx_streamer::sptr rx_stream){
uhd::set_thread_priority_safe();
- //create a receive streamer
- uhd::stream_args_t stream_args(rx_cpu, rx_otw);
- for (size_t ch = 0; ch < usrp->get_num_mboards(); ch++) //linear channel mapping
- stream_args.channels.push_back(ch);
- uhd::rx_streamer::sptr rx_stream = usrp->get_rx_stream(stream_args);
-
//print pre-test summary
std::cout << boost::format(
"Testing receive rate %f Msps"
@@ -60,7 +54,7 @@ void rx_hammer(uhd::usrp::multi_usrp::sptr usrp, const std::string &rx_cpu, cons
const size_t max_samps_per_packet = rx_stream->get_max_num_samps();
std::vector<char> buff(max_samps_per_packet*uhd::convert::get_bytes_per_item(rx_cpu));
std::vector<void *> buffs;
- for (size_t ch = 0; ch < stream_args.channels.size(); ch++)
+ for (size_t ch = 0; ch < rx_stream->get_num_channels(); ch++)
buffs.push_back(&buff.front()); //same buffer for each channel
bool had_an_overflow = false;
uhd::time_spec_t last_time;
@@ -74,7 +68,7 @@ void rx_hammer(uhd::usrp::multi_usrp::sptr usrp, const std::string &rx_cpu, cons
while (not boost::this_thread::interruption_requested()){
cmd.num_samps = rand() % 100000;
- usrp->issue_stream_cmd(cmd);
+ rx_stream->issue_stream_cmd(cmd);
num_rx_samps += rx_stream->recv(buffs, max_samps_per_packet, md, timeout, true);
//handle the error codes
@@ -103,16 +97,15 @@ void rx_hammer(uhd::usrp::multi_usrp::sptr usrp, const std::string &rx_cpu, cons
/***********************************************************************
* TX Hammer
**********************************************************************/
-void tx_hammer(uhd::usrp::multi_usrp::sptr usrp, const std::string &tx_cpu, const std::string &tx_otw){
+void tx_hammer(uhd::usrp::multi_usrp::sptr usrp, const std::string &tx_cpu, uhd::tx_streamer::sptr tx_stream){
uhd::set_thread_priority_safe();
- //create a transmit streamer
- uhd::stream_args_t stream_args(tx_cpu, tx_otw);
- for (size_t ch = 0; ch < usrp->get_num_mboards(); ch++) //linear channel mapping
- stream_args.channels.push_back(ch);
- uhd::tx_streamer::sptr tx_stream = usrp->get_tx_stream(stream_args);
uhd::tx_metadata_t md;
- std::vector<std::complex<float> > buff(10000);
+ const size_t max_samps_per_packet = tx_stream->get_max_num_samps();
+ std::vector<char> buff(max_samps_per_packet*uhd::convert::get_bytes_per_item(tx_cpu));
+ std::vector<void *> buffs;
+ for (size_t ch = 0; ch < tx_stream->get_num_channels(); ch++)
+ buffs.push_back(&buff.front()); //same buffer for each channel
//print pre-test summary
std::cout << boost::format(
@@ -130,7 +123,7 @@ void tx_hammer(uhd::usrp::multi_usrp::sptr usrp, const std::string &tx_cpu, cons
while(num_acc_samps < total_num_samps){
//send a single packet
- num_tx_samps += tx_stream->send(&buff, tx_stream->get_max_num_samps(), md, timeout);
+ num_tx_samps += tx_stream->send(buffs, max_samps_per_packet, md, timeout);
num_acc_samps += std::min(total_num_samps-num_acc_samps, tx_stream->get_max_num_samps());
}
@@ -140,13 +133,13 @@ void tx_hammer(uhd::usrp::multi_usrp::sptr usrp, const std::string &tx_cpu, cons
}
}
-void tx_hammer_async_helper(uhd::usrp::multi_usrp::sptr usrp){
+void tx_hammer_async_helper(uhd::tx_streamer::sptr tx_stream){
//setup variables and allocate buffer
uhd::async_metadata_t async_md;
while (not boost::this_thread::interruption_requested()){
- if (not usrp->get_device()->recv_async_msg(async_md)) continue;
+ if (not tx_stream->recv_async_msg(async_md)) continue;
//handle the error codes
switch(async_md.event_code){
@@ -239,14 +232,24 @@ int UHD_SAFE_MAIN(int argc, char *argv[]){
//spawn the receive test thread
if (vm.count("rx_rate")){
usrp->set_rx_rate(rx_rate);
- thread_group.create_thread(boost::bind(&rx_hammer, usrp, rx_cpu, rx_otw));
+ //create a receive streamer
+ uhd::stream_args_t stream_args(rx_cpu, rx_otw);
+ for (size_t ch = 0; ch < usrp->get_num_mboards(); ch++) //linear channel mapping
+ stream_args.channels.push_back(ch);
+ uhd::rx_streamer::sptr rx_stream = usrp->get_rx_stream(stream_args);
+ thread_group.create_thread(boost::bind(&rx_hammer, usrp, rx_cpu, rx_stream));
}
//spawn the transmit test thread
if (vm.count("tx_rate")){
usrp->set_tx_rate(tx_rate);
- thread_group.create_thread(boost::bind(&tx_hammer, usrp, tx_cpu, tx_otw));
- thread_group.create_thread(boost::bind(&tx_hammer_async_helper, usrp));
+ //create a transmit streamer
+ uhd::stream_args_t stream_args(tx_cpu, tx_otw);
+ for (size_t ch = 0; ch < usrp->get_num_mboards(); ch++) //linear channel mapping
+ stream_args.channels.push_back(ch);
+ uhd::tx_streamer::sptr tx_stream = usrp->get_tx_stream(stream_args);
+ thread_group.create_thread(boost::bind(&tx_hammer, usrp, tx_cpu, tx_stream));
+ thread_group.create_thread(boost::bind(&tx_hammer_async_helper, tx_stream));
}
//sleep for the required duration
diff --git a/host/examples/tx_bursts.cpp b/host/examples/tx_bursts.cpp
index 8dd4a002c..c58d3e178 100644
--- a/host/examples/tx_bursts.cpp
+++ b/host/examples/tx_bursts.cpp
@@ -147,7 +147,7 @@ int UHD_SAFE_MAIN(int argc, char *argv[]){
uhd::async_metadata_t async_md;
bool got_async_burst_ack = false;
//loop through all messages for the ACK packet (may have underflow messages in queue)
- while (not got_async_burst_ack and usrp->get_device()->recv_async_msg(async_md, seconds_in_future)){
+ while (not got_async_burst_ack and tx_stream->recv_async_msg(async_md, seconds_in_future)){
got_async_burst_ack = (async_md.event_code == uhd::async_metadata_t::EVENT_CODE_BURST_ACK);
}
std::cout << (got_async_burst_ack? "success" : "fail") << std::endl;
diff --git a/host/examples/tx_timed_samples.cpp b/host/examples/tx_timed_samples.cpp
index 4cc31a7c0..2eef80389 100644
--- a/host/examples/tx_timed_samples.cpp
+++ b/host/examples/tx_timed_samples.cpp
@@ -116,7 +116,7 @@ int UHD_SAFE_MAIN(int argc, char *argv[]){
uhd::async_metadata_t async_md;
bool got_async_burst_ack = false;
//loop through all messages for the ACK packet (may have underflow messages in queue)
- while (not got_async_burst_ack and usrp->get_device()->recv_async_msg(async_md, timeout)){
+ while (not got_async_burst_ack and tx_stream->recv_async_msg(async_md, timeout)){
got_async_burst_ack = (async_md.event_code == uhd::async_metadata_t::EVENT_CODE_BURST_ACK);
}
std::cout << (got_async_burst_ack? "success" : "fail") << std::endl;
diff --git a/host/examples/txrx_loopback_to_file.cpp b/host/examples/txrx_loopback_to_file.cpp
index 9dc348da8..ef119b689 100644
--- a/host/examples/txrx_loopback_to_file.cpp
+++ b/host/examples/txrx_loopback_to_file.cpp
@@ -151,7 +151,7 @@ template<typename samp_type> void recv_to_file(
stream_cmd.num_samps = num_requested_samples;
stream_cmd.stream_now = false;
stream_cmd.time_spec = uhd::time_spec_t(settling_time);
- usrp->issue_stream_cmd(stream_cmd);
+ rx_stream->issue_stream_cmd(stream_cmd);
while(not stop_signal_called and (num_requested_samples != num_total_samps or num_requested_samples == 0)){
size_t num_rx_samps = rx_stream->recv(&buff.front(), buff.size(), md, timeout);
diff --git a/host/include/uhd/device.hpp b/host/include/uhd/device.hpp
index 89c6332da..de39383c9 100644
--- a/host/include/uhd/device.hpp
+++ b/host/include/uhd/device.hpp
@@ -82,16 +82,6 @@ public:
//! Make a new transmit streamer from the streamer arguments
virtual tx_streamer::sptr get_tx_stream(const stream_args_t &args) = 0;
- /*!
- * Receive and asynchronous message from the device.
- * \param async_metadata the metadata to be filled in
- * \param timeout the timeout in seconds to wait for a message
- * \return true when the async_metadata is valid, false for timeout
- */
- virtual bool recv_async_msg(
- async_metadata_t &async_metadata, double timeout = 0.1
- ) = 0;
-
//! Get access to the underlying property structure
virtual boost::shared_ptr<property_tree> get_tree(void) const = 0;
diff --git a/host/include/uhd/device_deprecated.ipp b/host/include/uhd/device_deprecated.ipp
index 0ee1cd706..58c43a876 100644
--- a/host/include/uhd/device_deprecated.ipp
+++ b/host/include/uhd/device_deprecated.ipp
@@ -184,6 +184,16 @@ size_t get_max_recv_samps_per_packet(void){
return _rx_streamer->get_max_num_samps();
}
+/*!
+ * Receive and asynchronous message from the device.
+ * \param async_metadata the metadata to be filled in
+ * \param timeout the timeout in seconds to wait for a message
+ * \return true when the async_metadata is valid, false for timeout
+ */
+virtual bool recv_async_msg(
+ async_metadata_t &async_metadata, double timeout = 0.1
+) = 0;
+
private:
rx_streamer::sptr _rx_streamer;
io_type_t::tid_t _recv_tid;
diff --git a/host/include/uhd/exception.hpp b/host/include/uhd/exception.hpp
index c05861788..69e902fd5 100644
--- a/host/include/uhd/exception.hpp
+++ b/host/include/uhd/exception.hpp
@@ -157,9 +157,9 @@ namespace uhd{
* If the code evaluates to false, throw an assertion error.
* \param code the code that resolved to a boolean
*/
- #define UHD_ASSERT_THROW(code) if (not (code)) \
+ #define UHD_ASSERT_THROW(code) {if (not (code)) \
throw uhd::assertion_error(UHD_THROW_SITE_INFO(#code)); \
- else void(0)
+ }
} //namespace uhd
diff --git a/host/include/uhd/stream.hpp b/host/include/uhd/stream.hpp
index c05e8a1e9..75b097ded 100644
--- a/host/include/uhd/stream.hpp
+++ b/host/include/uhd/stream.hpp
@@ -1,5 +1,5 @@
//
-// Copyright 2011-2012 Ettus Research LLC
+// Copyright 2011-2013 Ettus Research LLC
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
@@ -21,6 +21,7 @@
#include <uhd/config.hpp>
#include <uhd/types/metadata.hpp>
#include <uhd/types/device_addr.hpp>
+#include <uhd/types/stream_cmd.hpp>
#include <uhd/types/ref_vector.hpp>
#include <boost/utility.hpp>
#include <boost/shared_ptr.hpp>
@@ -172,6 +173,19 @@ public:
const double timeout = 0.1,
const bool one_packet = false
) = 0;
+
+ /*!
+ * Issue a stream command to the usrp device.
+ * This tells the usrp to send samples into the host.
+ * See the documentation for stream_cmd_t for more info.
+ *
+ * With multiple devices, the first stream command in a chain of commands
+ * should have a time spec in the near future and stream_now = false;
+ * to ensure that the packets can be aligned by their time specs.
+ *
+ * \param stream_cmd the stream command to issue
+ */
+ virtual void issue_stream_cmd(const stream_cmd_t &stream_cmd) = 0;
};
/*!
@@ -221,6 +235,16 @@ public:
const tx_metadata_t &metadata,
const double timeout = 0.1
) = 0;
+
+ /*!
+ * Receive and asynchronous message from this TX stream.
+ * \param async_metadata the metadata to be filled in
+ * \param timeout the timeout in seconds to wait for a message
+ * \return true when the async_metadata is valid, false for timeout
+ */
+ virtual bool recv_async_msg(
+ async_metadata_t &async_metadata, double timeout = 0.1
+ ) = 0;
};
} //namespace uhd
diff --git a/host/include/uhd/transport/bounded_buffer.ipp b/host/include/uhd/transport/bounded_buffer.ipp
index 9c24005b7..35ffb293b 100644
--- a/host/include/uhd/transport/bounded_buffer.ipp
+++ b/host/include/uhd/transport/bounded_buffer.ipp
@@ -1,5 +1,5 @@
//
-// Copyright 2010-2011 Ettus Research LLC
+// Copyright 2010-2013 Ettus Research LLC
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
@@ -26,7 +26,7 @@
#include <boost/thread/condition.hpp>
#include <boost/thread/locks.hpp>
-namespace uhd{ namespace transport{ namespace{ /*anon*/
+namespace uhd{ namespace transport{
template <typename elem_type> class bounded_buffer_detail : boost::noncopyable{
public:
@@ -142,6 +142,6 @@ namespace uhd{ namespace transport{ namespace{ /*anon*/
}
};
-}}} //namespace
+}} //namespace
#endif /* INCLUDED_UHD_TRANSPORT_BOUNDED_BUFFER_IPP */
diff --git a/host/include/uhd/transport/vrt_if_packet.hpp b/host/include/uhd/transport/vrt_if_packet.hpp
index 1be480874..d16892281 100644
--- a/host/include/uhd/transport/vrt_if_packet.hpp
+++ b/host/include/uhd/transport/vrt_if_packet.hpp
@@ -1,5 +1,5 @@
//
-// Copyright 2010 Ettus Research LLC
+// Copyright 2010-2013 Ettus Research LLC
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
@@ -26,6 +26,9 @@ namespace uhd{ namespace transport{
namespace vrt{
+ //! The maximum number of 32-bit words in the vrlp link layer
+ static const size_t num_vrl_words32 = 3;
+
//! The maximum number of 32-bit words in a vrt if packet header
static const size_t max_if_hdr_words32 = 7; //hdr+sid+cid+tsi+tsf
@@ -34,12 +37,24 @@ namespace vrt{
* The size fields are used for input and output depending upon
* the operation used (ie the pack or unpack function call).
*/
- struct UHD_API if_packet_info_t{
- //packet type (pack only supports data)
- enum packet_type_t {
+ struct UHD_API if_packet_info_t
+ {
+ if_packet_info_t(void);
+
+ //link layer type - always set for pack and unpack
+ enum link_type_t
+ {
+ LINK_TYPE_NONE = 0x0,
+ LINK_TYPE_CHDR = 0x1,
+ LINK_TYPE_VRLP = 0x2,
+ } link_type;
+
+ //packet type
+ enum packet_type_t
+ {
PACKET_TYPE_DATA = 0x0,
- PACKET_TYPE_EXTENSION = 0x1,
- PACKET_TYPE_CONTEXT = 0x2
+ PACKET_TYPE_IF_EXT = 0x1,
+ PACKET_TYPE_CONTEXT = 0x2, //extension context: has_sid = true
} packet_type;
//size fields
@@ -100,6 +115,22 @@ namespace vrt{
if_packet_info_t &if_packet_info
);
+ UHD_INLINE if_packet_info_t::if_packet_info_t(void):
+ link_type(LINK_TYPE_NONE),
+ packet_type(PACKET_TYPE_DATA),
+ num_payload_words32(0),
+ num_payload_bytes(0),
+ num_header_words32(0),
+ num_packet_words32(0),
+ packet_count(0),
+ sob(false), eob(false),
+ has_sid(false), sid(0),
+ has_cid(false), cid(0),
+ has_tsi(false), tsi(0),
+ has_tsf(false), tsf(0),
+ has_tlr(false), tlr(0)
+ {}
+
} //namespace vrt
}} //namespace
diff --git a/host/lib/transport/gen_vrt_if_packet.py b/host/lib/transport/gen_vrt_if_packet.py
index e28ce3aae..56e7c61bf 100644
--- a/host/lib/transport/gen_vrt_if_packet.py
+++ b/host/lib/transport/gen_vrt_if_packet.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python
#
-# Copyright 2010-2011 Ettus Research LLC
+# Copyright 2010-2013 Ettus Research LLC
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -48,12 +48,14 @@ TMPL_TEXT = """
using namespace uhd;
using namespace uhd::transport;
+using namespace uhd::transport::vrt;
typedef size_t pred_type;
typedef std::vector<pred_type> pred_table_type;
#define pred_table_index(hdr) ((hdr >> 20) & 0x1ff)
-static pred_table_type get_pred_unpack_table(void){
+static pred_table_type get_pred_unpack_table(void)
+{
pred_table_type table(1 << 9, 0); //only 9 bits useful here (20-28)
for (size_t i = 0; i < table.size(); i++){
boost::uint32_t vrt_hdr_word = i << 20;
@@ -74,13 +76,43 @@ static const pred_table_type pred_unpack_table(get_pred_unpack_table());
//maps num empty bytes to trailer bits
static const size_t occ_table[] = {0, 2, 1, 3};
+const boost::uint32_t VRLP = ('V' << 24) | ('R' << 16) | ('L' << 8) | ('P' << 0);
+const boost::uint32_t VEND = ('V' << 24) | ('E' << 16) | ('N' << 8) | ('D' << 0);
+
+UHD_INLINE static boost::uint32_t chdr_to_vrt(const boost::uint32_t chdr, size_t &packet_count)
+{
+ boost::uint32_t vrt = chdr & 0xffff; //words32
+ packet_count = (chdr >> 16) & 0xfff;
+ vrt |= ((chdr >> 31) & 0x1) << 30; //context packet
+ vrt |= ((chdr >> 30) & 0x1) << 26; //has tlr
+ vrt |= ((chdr >> 29) & 0x1) << 20; //has tsf
+ vrt |= ((chdr >> 28) & 0x1) << 24; //has eob
+ vrt |= (0x1) << 28; //has sid (always)
+ return vrt;
+}
+
+UHD_INLINE static boost::uint32_t vrt_to_chdr(const boost::uint32_t vrt, const size_t packet_count)
+{
+ boost::uint32_t chdr = vrt & 0xffff; //words32
+ chdr |= (packet_count & 0xfff) << 16;
+ chdr |= ((vrt >> 30) & 0x1) << 31; //context packet
+ chdr |= ((vrt >> 26) & 0x1) << 30; //has tlr
+ chdr |= ((vrt >> 20) & 0x1) << 29; //has tsf
+ chdr |= ((vrt >> 24) & 0x1) << 28; //has eob
+ return chdr;
+}
+
########################################################################
#def gen_code($XE_MACRO, $suffix)
########################################################################
-void vrt::if_hdr_pack_$(suffix)(
+/***********************************************************************
+ * interal impl of packing VRT IF header only
+ **********************************************************************/
+UHD_INLINE void __if_hdr_pack_$(suffix)(
boost::uint32_t *packet_buff,
- if_packet_info_t &if_packet_info
+ if_packet_info_t &if_packet_info,
+ boost::uint32_t &vrt_hdr_word32
){
boost::uint32_t vrt_hdr_flags = 0;
@@ -154,31 +186,33 @@ void vrt::if_hdr_pack_$(suffix)(
}
//fill in complete header word
- packet_buff[0] = $(XE_MACRO)(boost::uint32_t(0
+ vrt_hdr_word32 = boost::uint32_t(0
| (if_packet_info.packet_type << 29)
| vrt_hdr_flags
| ((if_packet_info.packet_count & 0xf) << 16)
| (if_packet_info.num_packet_words32 & 0xffff)
- ));
+ );
}
-void vrt::if_hdr_unpack_$(suffix)(
+/***********************************************************************
+ * interal impl of unpacking VRT IF header only
+ **********************************************************************/
+UHD_INLINE void __if_hdr_unpack_$(suffix)(
const boost::uint32_t *packet_buff,
- if_packet_info_t &if_packet_info
+ if_packet_info_t &if_packet_info,
+ const boost::uint32_t vrt_hdr_word32
){
- //extract vrt header
- boost::uint32_t vrt_hdr_word = $(XE_MACRO)(packet_buff[0]);
- size_t packet_words32 = vrt_hdr_word & 0xffff;
+ const size_t packet_words32 = vrt_hdr_word32 & 0xffff;
//failure case
if (if_packet_info.num_packet_words32 < packet_words32)
throw uhd::value_error("bad vrt header or packet fragment");
//extract fields from the header
- if_packet_info.packet_type = if_packet_info_t::packet_type_t(vrt_hdr_word >> 29);
- if_packet_info.packet_count = (vrt_hdr_word >> 16) & 0xf;
+ if_packet_info.packet_type = if_packet_info_t::packet_type_t(vrt_hdr_word32 >> 29);
+ if_packet_info.packet_count = (vrt_hdr_word32 >> 16) & 0xf;
- const pred_type pred = pred_unpack_table[pred_table_index(vrt_hdr_word)];
+ const pred_type pred = pred_unpack_table[pred_table_index(vrt_hdr_word32)];
size_t empty_bytes = 0;
@@ -259,6 +293,84 @@ void vrt::if_hdr_unpack_$(suffix)(
}
}
+/***********************************************************************
+ * link layer + VRT IF packing
+ **********************************************************************/
+void vrt::if_hdr_pack_$(suffix)(
+ boost::uint32_t *packet_buff,
+ if_packet_info_t &if_packet_info
+){
+ boost::uint32_t vrt_hdr_word32 = 0;
+ switch (if_packet_info.link_type)
+ {
+ case if_packet_info_t::LINK_TYPE_NONE:
+ __if_hdr_pack_$(suffix)(packet_buff, if_packet_info, vrt_hdr_word32);
+ packet_buff[0] = $(XE_MACRO)(vrt_hdr_word32);
+ break;
+
+ case if_packet_info_t::LINK_TYPE_CHDR:
+ {
+ __if_hdr_pack_$(suffix)(packet_buff, if_packet_info, vrt_hdr_word32);
+ const boost::uint32_t chdr = vrt_to_chdr(vrt_hdr_word32, if_packet_info.packet_count);
+ packet_buff[0] = $(XE_MACRO)(chdr);
+ break;
+ }
+
+ case if_packet_info_t::LINK_TYPE_VRLP:
+ __if_hdr_pack_$(suffix)(packet_buff+2, if_packet_info, vrt_hdr_word32);
+ if_packet_info.num_header_words32 += 2;
+ if_packet_info.num_packet_words32 += 3;
+ packet_buff[0] = $(XE_MACRO)(VRLP);
+ packet_buff[1] = $(XE_MACRO)(boost::uint32_t(
+ (if_packet_info.num_packet_words32 & 0xfffff) |
+ ((if_packet_info.packet_count & 0xfff) << 20)
+ ));
+ packet_buff[2] = $(XE_MACRO)(vrt_hdr_word32);
+ packet_buff[if_packet_info.num_packet_words32-1] = $(XE_MACRO)(VEND);
+ break;
+ }
+}
+
+/***********************************************************************
+ * link layer + VRT IF unpacking
+ **********************************************************************/
+void vrt::if_hdr_unpack_$(suffix)(
+ const boost::uint32_t *packet_buff,
+ if_packet_info_t &if_packet_info
+){
+ boost::uint32_t vrt_hdr_word32 = 0;
+ switch (if_packet_info.link_type)
+ {
+ case if_packet_info_t::LINK_TYPE_NONE:
+ vrt_hdr_word32 = $(XE_MACRO)(packet_buff[0]);
+ __if_hdr_unpack_$(suffix)(packet_buff, if_packet_info, vrt_hdr_word32);
+ break;
+
+ case if_packet_info_t::LINK_TYPE_CHDR:
+ {
+ const boost::uint32_t chdr = $(XE_MACRO)(packet_buff[0]);
+ size_t packet_count = 0;
+ vrt_hdr_word32 = chdr_to_vrt(chdr, packet_count);
+ __if_hdr_unpack_$(suffix)(packet_buff, if_packet_info, vrt_hdr_word32);
+ if_packet_info.packet_count = packet_count;
+ break;
+ }
+
+ case if_packet_info_t::LINK_TYPE_VRLP:
+ {
+ if ($(XE_MACRO)(packet_buff[0]) != VRLP) throw uhd::value_error("bad vrl header VRLP");
+ const boost::uint32_t vrl_hdr = $(XE_MACRO)(packet_buff[1]);
+ vrt_hdr_word32 = $(XE_MACRO)(packet_buff[2]);
+ if (if_packet_info.num_packet_words32 < (vrl_hdr & 0xfffff)) throw uhd::value_error("bad vrl header or packet fragment");
+ if ($(XE_MACRO)(packet_buff[(vrl_hdr & 0xfffff)-1]) != VEND) throw uhd::value_error("bad vrl trailer VEND");
+ __if_hdr_unpack_$(suffix)(packet_buff+2, if_packet_info, vrt_hdr_word32);
+ if_packet_info.num_header_words32 += 2; //add vrl header
+ if_packet_info.packet_count = (vrl_hdr >> 20) & 0xfff;
+ break;
+ }
+ }
+}
+
########################################################################
#end def
########################################################################
diff --git a/host/lib/transport/super_recv_packet_handler.hpp b/host/lib/transport/super_recv_packet_handler.hpp
index 5a75d5f0d..75d1f3068 100644
--- a/host/lib/transport/super_recv_packet_handler.hpp
+++ b/host/lib/transport/super_recv_packet_handler.hpp
@@ -63,6 +63,8 @@ static inline void handle_overflow_nop(void){}
class recv_packet_handler{
public:
typedef boost::function<managed_recv_buffer::sptr(double)> get_buff_type;
+ typedef boost::function<void(const size_t)> handle_flowctrl_type;
+ typedef boost::function<void(const stream_cmd_t&)> issue_stream_cmd_type;
typedef void(*vrt_unpacker_type)(const boost::uint32_t *, vrt::if_packet_info_t &);
//typedef boost::function<void(const boost::uint32_t *, vrt::if_packet_info_t &)> vrt_unpacker_type;
@@ -139,6 +141,19 @@ public:
_props.at(xport_chan).get_buff = get_buff;
}
+ /*!
+ * Set the function to handle flow control
+ * \param xport_chan which transport channel
+ * \param handle_flowctrl the callback function
+ */
+ void set_xport_handle_flowctrl(const size_t xport_chan, const handle_flowctrl_type &handle_flowctrl, const size_t update_window, const bool do_init = false)
+ {
+ _props.at(xport_chan).handle_flowctrl = handle_flowctrl;
+ //we need the window size to be within the 0xfff (max 12 bit seq)
+ _props.at(xport_chan).fc_update_window = std::min<size_t>(update_window, 0xfff);
+ if (do_init) handle_flowctrl(0);
+ }
+
//! Set the conversion routine for all channels
void set_converter(const uhd::convert::id_type &id){
_num_outputs = id.num_outputs;
@@ -158,6 +173,21 @@ public:
_converter->set_scalar(scale_factor);
}
+ //! Set the callback to issue stream commands
+ void set_issue_stream_cmd(const size_t xport_chan, const issue_stream_cmd_type &issue_stream_cmd)
+ {
+ _props.at(xport_chan).issue_stream_cmd = issue_stream_cmd;
+ }
+
+ //! Overload call to issue stream commands
+ void issue_stream_cmd(const stream_cmd_t &stream_cmd)
+ {
+ for (size_t i = 0; i < _props.size(); i++)
+ {
+ if (_props[i].issue_stream_cmd) _props[i].issue_stream_cmd(stream_cmd);
+ }
+ }
+
/*******************************************************************
* Receive:
* The entry point for the fast-path receive calls.
@@ -219,8 +249,11 @@ private:
handle_overflow(&handle_overflow_nop)
{}
get_buff_type get_buff;
+ issue_stream_cmd_type issue_stream_cmd;
size_t packet_count;
handle_overflow_type handle_overflow;
+ handle_flowctrl_type handle_flowctrl;
+ size_t fc_update_window;
};
std::vector<xport_chan_props_type> _props;
size_t _num_outputs;
@@ -302,6 +335,15 @@ private:
info.time = time_spec_t::from_ticks(info.ifpi.tsf, _tick_rate); //assumes has_tsf is true
info.copy_buff = reinterpret_cast<const char *>(info.vrt_hdr + info.ifpi.num_header_words32);
+ //handle flow control
+ if (_props[index].handle_flowctrl)
+ {
+ if ((info.ifpi.packet_count % _props[index].fc_update_window/2) == 0)
+ {
+ _props[index].handle_flowctrl(info.ifpi.packet_count);
+ }
+ }
+
//--------------------------------------------------------------
//-- Determine return conditions:
//-- The order of these checks is HOLY.
@@ -314,8 +356,9 @@ private:
//2) check for sequence errors
#ifndef SRPH_DONT_CHECK_SEQUENCE
+ const size_t seq_mask = (info.ifpi.link_type == vrt::if_packet_info_t::LINK_TYPE_NONE)? 0xf : 0xfff;
const size_t expected_packet_count = _props[index].packet_count;
- _props[index].packet_count = (info.ifpi.packet_count + 1)%16;
+ _props[index].packet_count = (info.ifpi.packet_count + 1) & seq_mask;
if (expected_packet_count != info.ifpi.packet_count){
return PACKET_SEQUENCE_ERROR;
}
@@ -622,6 +665,11 @@ public:
return recv_packet_handler::recv(buffs, nsamps_per_buff, metadata, timeout, one_packet);
}
+ void issue_stream_cmd(const stream_cmd_t &stream_cmd)
+ {
+ return recv_packet_handler::issue_stream_cmd(stream_cmd);
+ }
+
private:
size_t _max_num_samps;
};
diff --git a/host/lib/transport/super_send_packet_handler.hpp b/host/lib/transport/super_send_packet_handler.hpp
index 726742327..41f030ea6 100644
--- a/host/lib/transport/super_send_packet_handler.hpp
+++ b/host/lib/transport/super_send_packet_handler.hpp
@@ -47,6 +47,7 @@ namespace uhd{ namespace transport{ namespace sph{
class send_packet_handler{
public:
typedef boost::function<managed_send_buffer::sptr(double)> get_buff_type;
+ typedef boost::function<bool(uhd::async_metadata_t &, const double)> async_receiver_type;
typedef void(*vrt_packer_type)(boost::uint32_t *, vrt::if_packet_info_t &);
//typedef boost::function<void(boost::uint32_t *, vrt::if_packet_info_t &)> vrt_packer_type;
@@ -57,6 +58,7 @@ public:
send_packet_handler(const size_t size = 1):
_next_packet_seq(0)
{
+ this->set_enable_trailer(true);
this->resize(size);
}
@@ -96,6 +98,11 @@ public:
_props.at(xport_chan).sid = sid;
}
+ void set_enable_trailer(const bool enable)
+ {
+ _has_tlr = enable;
+ }
+
//! Set the rate of ticks per second
void set_tick_rate(const double rate){
_tick_rate = rate;
@@ -138,6 +145,21 @@ public:
_converter->set_scalar(scale_factor);
}
+ //! Set the callback to get async messages
+ void set_async_receiver(const async_receiver_type &async_receiver)
+ {
+ _async_receiver = async_receiver;
+ }
+
+ //! Overload call to get async metadata
+ bool recv_async_msg(
+ uhd::async_metadata_t &async_metadata, double timeout = 0.1
+ ){
+ if (_async_receiver) return _async_receiver(async_metadata, timeout);
+ boost::this_thread::sleep(boost::posix_time::microseconds(long(timeout*1e6)));
+ return false;
+ }
+
/*******************************************************************
* Send:
* The entry point for the fast-path send calls.
@@ -154,7 +176,7 @@ public:
if_packet_info.packet_type = vrt::if_packet_info_t::PACKET_TYPE_DATA;
//if_packet_info.has_sid = false; //set per channel
if_packet_info.has_cid = false;
- if_packet_info.has_tlr = true;
+ if_packet_info.has_tlr = _has_tlr;
if_packet_info.has_tsi = false;
if_packet_info.has_tsf = metadata.has_time_spec;
if_packet_info.tsf = metadata.time_spec.to_ticks(_tick_rate);
@@ -165,9 +187,12 @@ public:
//TODO remove this code when sample counts of zero are supported by hardware
#ifndef SSPH_DONT_PAD_TO_ONE
- if (nsamps_per_buff == 0) return send_one_packet(
- _zero_buffs, 1, if_packet_info, timeout
- ) & 0x0;
+ static const boost::uint64_t zero = 0;
+ _zero_buffs.resize(buffs.size(), &zero);
+
+ if (nsamps_per_buff == 0) return send_one_packet(
+ _zero_buffs, 1, if_packet_info, timeout
+ ) & 0x0;
#endif
return send_one_packet(buffs, nsamps_per_buff, if_packet_info, timeout);
@@ -228,6 +253,8 @@ private:
size_t _max_samples_per_packet;
std::vector<const void *> _zero_buffs;
size_t _next_packet_seq;
+ bool _has_tlr;
+ async_receiver_type _async_receiver;
/*******************************************************************
* Send a single packet:
@@ -337,6 +364,12 @@ public:
return send_packet_handler::send(buffs, nsamps_per_buff, metadata, timeout);
}
+ bool recv_async_msg(
+ uhd::async_metadata_t &async_metadata, double timeout = 0.1
+ ){
+ return send_packet_handler::recv_async_msg(async_metadata, timeout);
+ }
+
private:
size_t _max_num_samps;
};
diff --git a/host/lib/usrp/CMakeLists.txt b/host/lib/usrp/CMakeLists.txt
index 8ae379f73..d251fbaeb 100644
--- a/host/lib/usrp/CMakeLists.txt
+++ b/host/lib/usrp/CMakeLists.txt
@@ -1,5 +1,5 @@
#
-# Copyright 2010-2011 Ettus Research LLC
+# Copyright 2010-2013 Ettus Research LLC
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
diff --git a/host/lib/usrp/b100/io_impl.cpp b/host/lib/usrp/b100/io_impl.cpp
index 9b5d4d25c..c4a4e422e 100644
--- a/host/lib/usrp/b100/io_impl.cpp
+++ b/host/lib/usrp/b100/io_impl.cpp
@@ -164,6 +164,8 @@ rx_streamer::sptr b100_impl::get_rx_stream(const uhd::stream_args_t &args_){
my_streamer->set_overflow_handler(chan_i, boost::bind(
&rx_dsp_core_200::handle_overflow, _rx_dsps[dsp]
));
+ my_streamer->set_issue_stream_cmd(chan_i, boost::bind(
+ &rx_dsp_core_200::issue_stream_command, _rx_dsps[dsp], _1));
_rx_streamers[dsp] = my_streamer; //store weak pointer
}
@@ -217,6 +219,7 @@ tx_streamer::sptr b100_impl::get_tx_stream(const uhd::stream_args_t &args_){
my_streamer->set_xport_chan_get_buff(chan_i, boost::bind(
&zero_copy_if::get_send_buff, _data_transport, _1
));
+ my_streamer->set_async_receiver(boost::bind(&fifo_ctrl_excelsior::pop_async_msg, _fifo_ctrl, _1, _2));
_tx_streamers[dsp] = my_streamer; //store weak pointer
}
diff --git a/host/lib/usrp/e100/io_impl.cpp b/host/lib/usrp/e100/io_impl.cpp
index e34620444..bf04a5871 100644
--- a/host/lib/usrp/e100/io_impl.cpp
+++ b/host/lib/usrp/e100/io_impl.cpp
@@ -166,6 +166,8 @@ rx_streamer::sptr e100_impl::get_rx_stream(const uhd::stream_args_t &args_){
my_streamer->set_overflow_handler(chan_i, boost::bind(
&rx_dsp_core_200::handle_overflow, _rx_dsps[dsp]
));
+ my_streamer->set_issue_stream_cmd(chan_i, boost::bind(
+ &rx_dsp_core_200::issue_stream_command, _rx_dsps[dsp], _1));
_rx_streamers[dsp] = my_streamer; //store weak pointer
}
@@ -220,6 +222,7 @@ tx_streamer::sptr e100_impl::get_tx_stream(const uhd::stream_args_t &args_){
my_streamer->set_xport_chan_get_buff(chan_i, boost::bind(
&zero_copy_if::get_send_buff, _data_transport, _1
));
+ my_streamer->set_async_receiver(boost::bind(&fifo_ctrl_excelsior::pop_async_msg, _fifo_ctrl, _1, _2));
_tx_streamers[dsp] = my_streamer; //store weak pointer
}
diff --git a/host/lib/usrp/usrp1/io_impl.cpp b/host/lib/usrp/usrp1/io_impl.cpp
index 8940a92bb..d384eb13f 100644
--- a/host/lib/usrp/usrp1/io_impl.cpp
+++ b/host/lib/usrp/usrp1/io_impl.cpp
@@ -356,6 +356,11 @@ public:
return _stc->recv_post(metadata, num_samps_recvd);
}
+ void issue_stream_cmd(const stream_cmd_t &stream_cmd)
+ {
+ _stc->issue_stream_cmd(stream_cmd);
+ }
+
private:
size_t _max_num_samps;
soft_time_ctrl::sptr _stc;
@@ -410,6 +415,12 @@ public:
return num_samps_sent;
}
+ bool recv_async_msg(
+ async_metadata_t &async_metadata, double timeout = 0.1
+ ){
+ return _stc->get_async_queue().pop_with_timed_wait(async_metadata, timeout);
+ }
+
private:
size_t _max_num_samps;
soft_time_ctrl::sptr _stc;
diff --git a/host/lib/usrp/usrp2/io_impl.cpp b/host/lib/usrp/usrp2/io_impl.cpp
index e06cf8f6f..9ee6abed0 100644
--- a/host/lib/usrp/usrp2/io_impl.cpp
+++ b/host/lib/usrp/usrp2/io_impl.cpp
@@ -467,6 +467,8 @@ rx_streamer::sptr usrp2_impl::get_rx_stream(const uhd::stream_args_t &args_){
my_streamer->set_xport_chan_get_buff(chan_i, boost::bind(
&zero_copy_if::get_recv_buff, _mbc[mb].rx_dsp_xports[dsp], _1
), true /*flush*/);
+ my_streamer->set_issue_stream_cmd(chan_i, boost::bind(
+ &rx_dsp_core_200::issue_stream_command, _mbc[mb].rx_dsps[dsp], _1));
_mbc[mb].rx_streamers[dsp] = my_streamer; //store weak pointer
break;
}
@@ -536,6 +538,7 @@ tx_streamer::sptr usrp2_impl::get_tx_stream(const uhd::stream_args_t &args_){
my_streamer->set_xport_chan_get_buff(chan_i, boost::bind(
&usrp2_impl::io_impl::get_send_buff, _io_impl.get(), abs, _1
));
+ my_streamer->set_async_receiver(boost::bind(&bounded_buffer<async_metadata_t>::pop_with_timed_wait, &(_io_impl->async_msg_fifo), _1, _2));
_mbc[mb].tx_streamers[dsp] = my_streamer; //store weak pointer
break;
}
diff --git a/host/tests/sph_recv_test.cpp b/host/tests/sph_recv_test.cpp
index 5a40029dc..9339a9739 100644
--- a/host/tests/sph_recv_test.cpp
+++ b/host/tests/sph_recv_test.cpp
@@ -288,7 +288,7 @@ BOOST_AUTO_TEST_CASE(test_sph_recv_one_channel_inline_message){
//simulate overflow
if (i == NUM_PKTS_TO_TEST/2){
- ifpi.packet_type = uhd::transport::vrt::if_packet_info_t::PACKET_TYPE_EXTENSION;
+ ifpi.packet_type = uhd::transport::vrt::if_packet_info_t::PACKET_TYPE_CONTEXT;
ifpi.num_payload_words32 = 1;
dummy_recv_xport.push_back_packet(ifpi, uhd::rx_metadata_t::ERROR_CODE_OVERFLOW);
}
diff --git a/host/tests/vrt_test.cpp b/host/tests/vrt_test.cpp
index 066f1493b..225fa4f2b 100644
--- a/host/tests/vrt_test.cpp
+++ b/host/tests/vrt_test.cpp
@@ -1,5 +1,5 @@
//
-// Copyright 2010-2011 Ettus Research LLC
+// Copyright 2010-2013 Ettus Research LLC
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
@@ -17,26 +17,39 @@
#include <boost/test/unit_test.hpp>
#include <uhd/transport/vrt_if_packet.hpp>
+#include <uhd/utils/byteswap.hpp>
+#include <boost/format.hpp>
#include <cstdlib>
+#include <iostream>
using namespace uhd::transport;
static void pack_and_unpack(
vrt::if_packet_info_t &if_packet_info_in
){
- boost::uint32_t header_buff[vrt::max_if_hdr_words32];
+ if (if_packet_info_in.num_payload_bytes == 0)
+ {
+ if_packet_info_in.num_payload_bytes = if_packet_info_in.num_payload_words32 * sizeof(boost::uint32_t);
+ }
+ boost::uint32_t packet_buff[2048];
//pack metadata into a vrt header
vrt::if_hdr_pack_be(
- header_buff, if_packet_info_in
+ packet_buff, if_packet_info_in
);
+ std::cout << std::endl;
+ for (size_t i = 0; i < 5; i++)
+ {
+ std::cout << boost::format("packet_buff[%u] = 0x%.8x") % i % uhd::byteswap(packet_buff[i]) << std::endl;
+ }
vrt::if_packet_info_t if_packet_info_out;
+ if_packet_info_out.link_type = if_packet_info_in.link_type;
if_packet_info_out.num_packet_words32 = if_packet_info_in.num_packet_words32;
//unpack the vrt header back into metadata
vrt::if_hdr_unpack_be(
- header_buff, if_packet_info_out
+ packet_buff, if_packet_info_out
);
//check the the unpacked metadata is the same
@@ -91,7 +104,7 @@ BOOST_AUTO_TEST_CASE(test_with_sid){
if_packet_info.has_tsf = false;
if_packet_info.has_tlr = false;
if_packet_info.sid = std::rand();
- if_packet_info.num_payload_words32 = 1111;
+ if_packet_info.num_payload_words32 = 11;
pack_and_unpack(if_packet_info);
}
@@ -106,7 +119,7 @@ BOOST_AUTO_TEST_CASE(test_with_cid){
if_packet_info.has_tsf = false;
if_packet_info.has_tlr = false;
if_packet_info.cid = std::rand();
- if_packet_info.num_payload_words32 = 2222;
+ if_packet_info.num_payload_words32 = 22;
pack_and_unpack(if_packet_info);
}
@@ -120,7 +133,7 @@ BOOST_AUTO_TEST_CASE(test_with_time){
if_packet_info.has_tlr = false;
if_packet_info.tsi = std::rand();
if_packet_info.tsf = std::rand();
- if_packet_info.num_payload_words32 = 33333;
+ if_packet_info.num_payload_words32 = 33;
pack_and_unpack(if_packet_info);
}
@@ -136,6 +149,36 @@ BOOST_AUTO_TEST_CASE(test_with_all){
if_packet_info.cid = std::rand();
if_packet_info.tsi = std::rand();
if_packet_info.tsf = std::rand();
- if_packet_info.num_payload_words32 = 44444;
+ if_packet_info.num_payload_words32 = 44;
+ pack_and_unpack(if_packet_info);
+}
+
+BOOST_AUTO_TEST_CASE(test_with_vrlp){
+ vrt::if_packet_info_t if_packet_info;
+ if_packet_info.link_type = vrt::if_packet_info_t::LINK_TYPE_VRLP;
+ if_packet_info.packet_count = 3;
+ if_packet_info.has_sid = true;
+ if_packet_info.has_cid = false;
+ if_packet_info.has_tsi = false;
+ if_packet_info.has_tsf = true;
+ if_packet_info.has_tlr = true;
+ if_packet_info.tsi = std::rand();
+ if_packet_info.tsf = std::rand();
+ if_packet_info.num_payload_words32 = 42;
+ pack_and_unpack(if_packet_info);
+}
+
+BOOST_AUTO_TEST_CASE(test_with_chdr){
+ vrt::if_packet_info_t if_packet_info;
+ if_packet_info.link_type = vrt::if_packet_info_t::LINK_TYPE_CHDR;
+ if_packet_info.packet_count = 7;
+ if_packet_info.has_sid = true;
+ if_packet_info.has_cid = false;
+ if_packet_info.has_tsi = false;
+ if_packet_info.has_tsf = true;
+ if_packet_info.has_tlr = true;
+ if_packet_info.tsi = std::rand();
+ if_packet_info.tsf = std::rand();
+ if_packet_info.num_payload_words32 = 24;
pack_and_unpack(if_packet_info);
}