diff options
29 files changed, 442 insertions, 124 deletions
| diff --git a/host/examples/benchmark_rate.cpp b/host/examples/benchmark_rate.cpp index ed3a8580a..3a69e001b 100644 --- a/host/examples/benchmark_rate.cpp +++ b/host/examples/benchmark_rate.cpp @@ -42,37 +42,32 @@ unsigned long long num_seq_errors = 0;  /***********************************************************************   * Benchmark RX Rate   **********************************************************************/ -void benchmark_rx_rate(uhd::usrp::multi_usrp::sptr usrp, const std::string &rx_cpu, const std::string &rx_otw){ +void benchmark_rx_rate(uhd::usrp::multi_usrp::sptr usrp, const std::string &rx_cpu, uhd::rx_streamer::sptr rx_stream){      uhd::set_thread_priority_safe(); -    //create a receive streamer -    uhd::stream_args_t stream_args(rx_cpu, rx_otw); -    for (size_t ch = 0; ch < usrp->get_num_mboards(); ch++) //linear channel mapping -        stream_args.channels.push_back(ch); -    uhd::rx_streamer::sptr rx_stream = usrp->get_rx_stream(stream_args); -      //print pre-test summary      std::cout << boost::format( -        "Testing receive rate %f Msps" -    ) % (usrp->get_rx_rate()/1e6) << std::endl; +        "Testing receive rate %f Msps on %u channels" +    ) % (usrp->get_rx_rate()/1e6) % rx_stream->get_num_channels() << std::endl;      //setup variables and allocate buffer      uhd::rx_metadata_t md;      const size_t max_samps_per_packet = rx_stream->get_max_num_samps();      std::vector<char> buff(max_samps_per_packet*uhd::convert::get_bytes_per_item(rx_cpu));      std::vector<void *> buffs; -    for (size_t ch = 0; ch < stream_args.channels.size(); ch++) +    for (size_t ch = 0; ch < rx_stream->get_num_channels(); ch++)          buffs.push_back(&buff.front()); //same buffer for each channel      bool had_an_overflow = false;      uhd::time_spec_t last_time;      const double rate = usrp->get_rx_rate(); +    issue_new_stream_cmd:      uhd::stream_cmd_t cmd(uhd::stream_cmd_t::STREAM_MODE_START_CONTINUOUS);      cmd.time_spec = usrp->get_time_now() + uhd::time_spec_t(0.05);      cmd.stream_now = (buffs.size() == 1); -    usrp->issue_stream_cmd(cmd); +    rx_stream->issue_stream_cmd(cmd);      while (not boost::this_thread::interruption_requested()){ -        num_rx_samps += rx_stream->recv(buffs, max_samps_per_packet, md); +        num_rx_samps += rx_stream->recv(buffs, max_samps_per_packet, md)*rx_stream->get_num_channels();          //handle the error codes          switch(md.error_code){ @@ -87,6 +82,11 @@ void benchmark_rx_rate(uhd::usrp::multi_usrp::sptr usrp, const std::string &rx_c              had_an_overflow = true;              last_time = md.time_spec;              num_overflows++; +            if (rx_stream->get_num_channels() > 1) +            { +                rx_stream->issue_stream_cmd(uhd::stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS); +                goto issue_new_stream_cmd; +            }              break;          default: @@ -96,25 +96,19 @@ void benchmark_rx_rate(uhd::usrp::multi_usrp::sptr usrp, const std::string &rx_c          }      } -    usrp->issue_stream_cmd(uhd::stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS); +    rx_stream->issue_stream_cmd(uhd::stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS);  }  /***********************************************************************   * Benchmark TX Rate   **********************************************************************/ -void benchmark_tx_rate(uhd::usrp::multi_usrp::sptr usrp, const std::string &tx_cpu, const std::string &tx_otw){ +void benchmark_tx_rate(uhd::usrp::multi_usrp::sptr usrp, const std::string &tx_cpu, uhd::tx_streamer::sptr tx_stream){      uhd::set_thread_priority_safe(); -    //create a transmit streamer -    uhd::stream_args_t stream_args(tx_cpu, tx_otw); -    for (size_t ch = 0; ch < usrp->get_num_mboards(); ch++) //linear channel mapping -        stream_args.channels.push_back(ch); -    uhd::tx_streamer::sptr tx_stream = usrp->get_tx_stream(stream_args); -      //print pre-test summary      std::cout << boost::format( -        "Testing transmit rate %f Msps" -    ) % (usrp->get_tx_rate()/1e6) << std::endl; +        "Testing transmit rate %f Msps on %u channels" +    ) % (usrp->get_tx_rate()/1e6) % tx_stream->get_num_channels() << std::endl;      //setup variables and allocate buffer      uhd::tx_metadata_t md; @@ -122,12 +116,12 @@ void benchmark_tx_rate(uhd::usrp::multi_usrp::sptr usrp, const std::string &tx_c      const size_t max_samps_per_packet = tx_stream->get_max_num_samps();      std::vector<char> buff(max_samps_per_packet*uhd::convert::get_bytes_per_item(tx_cpu));      std::vector<const void *> buffs; -    for (size_t ch = 0; ch < stream_args.channels.size(); ch++) +    for (size_t ch = 0; ch < tx_stream->get_num_channels(); ch++)          buffs.push_back(&buff.front()); //same buffer for each channel      md.has_time_spec = (buffs.size() != 1);      while (not boost::this_thread::interruption_requested()){ -        num_tx_samps += tx_stream->send(buffs, max_samps_per_packet, md); +        num_tx_samps += tx_stream->send(buffs, max_samps_per_packet, md)*tx_stream->get_num_channels();;          md.has_time_spec = false;      } @@ -136,13 +130,13 @@ void benchmark_tx_rate(uhd::usrp::multi_usrp::sptr usrp, const std::string &tx_c      tx_stream->send(buffs, 0, md);  } -void benchmark_tx_rate_async_helper(uhd::usrp::multi_usrp::sptr usrp){ +void benchmark_tx_rate_async_helper(uhd::tx_streamer::sptr tx_stream){      //setup variables and allocate buffer      uhd::async_metadata_t async_md;      while (not boost::this_thread::interruption_requested()){ -        if (not usrp->get_device()->recv_async_msg(async_md)) continue; +        if (not tx_stream->recv_async_msg(async_md)) continue;          //handle the error codes          switch(async_md.event_code){ @@ -232,14 +226,24 @@ int UHD_SAFE_MAIN(int argc, char *argv[]){      //spawn the receive test thread      if (vm.count("rx_rate")){          usrp->set_rx_rate(rx_rate); -        thread_group.create_thread(boost::bind(&benchmark_rx_rate, usrp, rx_cpu, rx_otw)); +        //create a receive streamer +        uhd::stream_args_t stream_args(rx_cpu, rx_otw); +        for (size_t ch = 0; ch < usrp->get_rx_num_channels(); ch++) //linear channel mapping +            stream_args.channels.push_back(ch); +        uhd::rx_streamer::sptr rx_stream = usrp->get_rx_stream(stream_args); +        thread_group.create_thread(boost::bind(&benchmark_rx_rate, usrp, rx_cpu, rx_stream));      }      //spawn the transmit test thread      if (vm.count("tx_rate")){          usrp->set_tx_rate(tx_rate); -        thread_group.create_thread(boost::bind(&benchmark_tx_rate, usrp, tx_cpu, tx_otw)); -        thread_group.create_thread(boost::bind(&benchmark_tx_rate_async_helper, usrp)); +        //create a transmit streamer +        uhd::stream_args_t stream_args(tx_cpu, tx_otw); +        for (size_t ch = 0; ch < usrp->get_tx_num_channels(); ch++) //linear channel mapping +            stream_args.channels.push_back(ch); +        uhd::tx_streamer::sptr tx_stream = usrp->get_tx_stream(stream_args); +        thread_group.create_thread(boost::bind(&benchmark_tx_rate, usrp, tx_cpu, tx_stream)); +        thread_group.create_thread(boost::bind(&benchmark_tx_rate_async_helper, tx_stream));      }      //sleep for the required duration diff --git a/host/examples/latency_test.cpp b/host/examples/latency_test.cpp index 461ac9bf8..da06086f9 100644 --- a/host/examples/latency_test.cpp +++ b/host/examples/latency_test.cpp @@ -103,7 +103,7 @@ int UHD_SAFE_MAIN(int argc, char *argv[]){          stream_cmd.num_samps = buffer.size();          stream_cmd.stream_now = false;          stream_cmd.time_spec = usrp->get_time_now() + uhd::time_spec_t(0.01); -        usrp->issue_stream_cmd(stream_cmd); +        rx_stream->issue_stream_cmd(stream_cmd);          /***************************************************************           * Receive the requested packet @@ -133,7 +133,7 @@ int UHD_SAFE_MAIN(int argc, char *argv[]){           * Check the async messages for result           **************************************************************/          uhd::async_metadata_t async_md; -        if (not usrp->get_device()->recv_async_msg(async_md)){ +        if (not tx_stream->recv_async_msg(async_md)){              std::cout << boost::format("failed:\n    Async message recv timed out.\n") << std::endl;              continue;          } diff --git a/host/examples/rx_ascii_art_dft.cpp b/host/examples/rx_ascii_art_dft.cpp index 1aede51a4..df3256b09 100644 --- a/host/examples/rx_ascii_art_dft.cpp +++ b/host/examples/rx_ascii_art_dft.cpp @@ -148,7 +148,7 @@ int UHD_SAFE_MAIN(int argc, char *argv[]){      //-- Initialize      //------------------------------------------------------------------      initscr(); //curses init -    usrp->issue_stream_cmd(uhd::stream_cmd_t::STREAM_MODE_START_CONTINUOUS); +    rx_stream->issue_stream_cmd(uhd::stream_cmd_t::STREAM_MODE_START_CONTINUOUS);      boost::system_time next_refresh = boost::get_system_time();      //------------------------------------------------------------------ @@ -189,7 +189,7 @@ int UHD_SAFE_MAIN(int argc, char *argv[]){      //------------------------------------------------------------------      //-- Cleanup      //------------------------------------------------------------------ -    usrp->issue_stream_cmd(uhd::stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS); +    rx_stream->issue_stream_cmd(uhd::stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS);      endwin(); //curses done      //finished diff --git a/host/examples/rx_multi_samples.cpp b/host/examples/rx_multi_samples.cpp index 83d648eb5..4f312ad36 100644 --- a/host/examples/rx_multi_samples.cpp +++ b/host/examples/rx_multi_samples.cpp @@ -126,7 +126,7 @@ int UHD_SAFE_MAIN(int argc, char *argv[]){      stream_cmd.num_samps = total_num_samps;      stream_cmd.stream_now = false;      stream_cmd.time_spec = uhd::time_spec_t(seconds_in_future); -    usrp->issue_stream_cmd(stream_cmd); //tells all channels to stream +    rx_stream->issue_stream_cmd(stream_cmd); //tells all channels to stream      //meta-data will be filled in by recv()      uhd::rx_metadata_t md; diff --git a/host/examples/rx_samples_to_file.cpp b/host/examples/rx_samples_to_file.cpp index d0a02a6c1..1e833defa 100644 --- a/host/examples/rx_samples_to_file.cpp +++ b/host/examples/rx_samples_to_file.cpp @@ -58,7 +58,7 @@ template<typename samp_type> void recv_to_file(      stream_cmd.num_samps = num_requested_samples;      stream_cmd.stream_now = true;      stream_cmd.time_spec = uhd::time_spec_t(); -    usrp->issue_stream_cmd(stream_cmd); +    rx_stream->issue_stream_cmd(stream_cmd);      while(not stop_signal_called and (num_requested_samples != num_total_samps or num_requested_samples == 0)){          size_t num_rx_samps = rx_stream->recv(&buff.front(), buff.size(), md, 3.0); diff --git a/host/examples/rx_samples_to_udp.cpp b/host/examples/rx_samples_to_udp.cpp index f637f9313..0b3c6dce3 100644 --- a/host/examples/rx_samples_to_udp.cpp +++ b/host/examples/rx_samples_to_udp.cpp @@ -127,7 +127,7 @@ int UHD_SAFE_MAIN(int argc, char *argv[]){      uhd::stream_cmd_t stream_cmd(uhd::stream_cmd_t::STREAM_MODE_NUM_SAMPS_AND_DONE);      stream_cmd.num_samps = total_num_samps;      stream_cmd.stream_now = true; -    usrp->issue_stream_cmd(stream_cmd); +    rx_stream->issue_stream_cmd(stream_cmd);      //loop until total number of samples reached      size_t num_acc_samps = 0; //number of accumulated samples diff --git a/host/examples/rx_timed_samples.cpp b/host/examples/rx_timed_samples.cpp index f0d49b4bd..0eea2ffee 100644 --- a/host/examples/rx_timed_samples.cpp +++ b/host/examples/rx_timed_samples.cpp @@ -83,7 +83,7 @@ int UHD_SAFE_MAIN(int argc, char *argv[]){      stream_cmd.num_samps = total_num_samps;      stream_cmd.stream_now = false;      stream_cmd.time_spec = uhd::time_spec_t(seconds_in_future); -    usrp->issue_stream_cmd(stream_cmd); +    rx_stream->issue_stream_cmd(stream_cmd);      //meta-data will be filled in by recv()      uhd::rx_metadata_t md; diff --git a/host/examples/test_messages.cpp b/host/examples/test_messages.cpp index 6f2ddfe28..e39a8bd30 100644 --- a/host/examples/test_messages.cpp +++ b/host/examples/test_messages.cpp @@ -46,7 +46,7 @@ bool test_late_command_message(uhd::usrp::multi_usrp::sptr usrp, uhd::rx_streame      stream_cmd.num_samps = rx_stream->get_max_num_samps();      stream_cmd.stream_now = false;      stream_cmd.time_spec = uhd::time_spec_t(100.0); //time in the past -    usrp->issue_stream_cmd(stream_cmd); +    rx_stream->issue_stream_cmd(stream_cmd);      std::vector<std::complex<float> > buff(rx_stream->get_max_num_samps());      uhd::rx_metadata_t md; @@ -90,7 +90,7 @@ bool test_broken_chain_message(uhd::usrp::multi_usrp::sptr usrp, uhd::rx_streame      uhd::stream_cmd_t stream_cmd(uhd::stream_cmd_t::STREAM_MODE_NUM_SAMPS_AND_MORE);      stream_cmd.stream_now = true;      stream_cmd.num_samps = rx_stream->get_max_num_samps(); -    usrp->issue_stream_cmd(stream_cmd); +    rx_stream->issue_stream_cmd(stream_cmd);      std::vector<std::complex<float> > buff(rx_stream->get_max_num_samps());      uhd::rx_metadata_t md; @@ -132,7 +132,7 @@ bool test_broken_chain_message(uhd::usrp::multi_usrp::sptr usrp, uhd::rx_streame   *    Send a burst of many samples that will fragment internally.   *    We expect to get an burst ack async message.   */ -bool test_burst_ack_message(uhd::usrp::multi_usrp::sptr usrp, uhd::rx_streamer::sptr, uhd::tx_streamer::sptr tx_stream){ +bool test_burst_ack_message(uhd::usrp::multi_usrp::sptr, uhd::rx_streamer::sptr, uhd::tx_streamer::sptr tx_stream){      std::cout << "Test burst ack message... " << std::flush;      uhd::tx_metadata_t md; @@ -148,7 +148,7 @@ bool test_burst_ack_message(uhd::usrp::multi_usrp::sptr usrp, uhd::rx_streamer::      );      uhd::async_metadata_t async_md; -    if (not usrp->get_device()->recv_async_msg(async_md)){ +    if (not tx_stream->recv_async_msg(async_md)){          std::cout << boost::format(              "failed:\n"              "    Async message recv timed out.\n" @@ -178,7 +178,7 @@ bool test_burst_ack_message(uhd::usrp::multi_usrp::sptr usrp, uhd::rx_streamer::   *    Send a start of burst packet with no following end of burst.   *    We expect to get an underflow(within a burst) async message.   */ -bool test_underflow_message(uhd::usrp::multi_usrp::sptr usrp, uhd::rx_streamer::sptr, uhd::tx_streamer::sptr tx_stream){ +bool test_underflow_message(uhd::usrp::multi_usrp::sptr, uhd::rx_streamer::sptr, uhd::tx_streamer::sptr tx_stream){      std::cout << "Test underflow message... " << std::flush;      uhd::tx_metadata_t md; @@ -189,7 +189,7 @@ bool test_underflow_message(uhd::usrp::multi_usrp::sptr usrp, uhd::rx_streamer::      tx_stream->send("", 0, md);      uhd::async_metadata_t async_md; -    if (not usrp->get_device()->recv_async_msg(async_md, 1)){ +    if (not tx_stream->recv_async_msg(async_md, 1)){          std::cout << boost::format(              "failed:\n"              "    Async message recv timed out.\n" @@ -233,7 +233,7 @@ bool test_time_error_message(uhd::usrp::multi_usrp::sptr usrp, uhd::rx_streamer:      tx_stream->send("", 0, md);      uhd::async_metadata_t async_md; -    if (not usrp->get_device()->recv_async_msg(async_md)){ +    if (not tx_stream->recv_async_msg(async_md)){          std::cout << boost::format(              "failed:\n"              "    Async message recv timed out.\n" @@ -258,9 +258,9 @@ bool test_time_error_message(uhd::usrp::multi_usrp::sptr usrp, uhd::rx_streamer:      }  } -void flush_async(uhd::usrp::multi_usrp::sptr usrp){ +void flush_async(uhd::tx_streamer::sptr tx_stream){      uhd::async_metadata_t async_md; -    while (usrp->get_device()->recv_async_msg(async_md)){} +    while (tx_stream->recv_async_msg(async_md)){}  }  void flush_recv(uhd::rx_streamer::sptr rx_stream){ @@ -331,7 +331,7 @@ int UHD_SAFE_MAIN(int argc, char *argv[]){      for (size_t n = 0; n < ntests; n++){          std::string key = tests.keys()[std::rand() % tests.size()];          bool pass = tests[key](usrp, rx_stream, tx_stream); -        flush_async(usrp); +        flush_async(tx_stream);          flush_recv(rx_stream);          //store result diff --git a/host/examples/test_timed_commands.cpp b/host/examples/test_timed_commands.cpp index cecf1607c..5dee58887 100644 --- a/host/examples/test_timed_commands.cpp +++ b/host/examples/test_timed_commands.cpp @@ -97,7 +97,7 @@ int UHD_SAFE_MAIN(int argc, char *argv[]){      stream_cmd.stream_now = true;      const uhd::time_spec_t stream_time = usrp->get_time_now() + uhd::time_spec_t(0.1);      usrp->set_command_time(stream_time); -    usrp->issue_stream_cmd(stream_cmd); +    rx_stream->issue_stream_cmd(stream_cmd);      usrp->clear_command_time();      //meta-data will be filled in by recv() diff --git a/host/examples/transport_hammer.cpp b/host/examples/transport_hammer.cpp index ff35ceb21..a44e81a82 100644 --- a/host/examples/transport_hammer.cpp +++ b/host/examples/transport_hammer.cpp @@ -41,15 +41,9 @@ unsigned long long num_seq_errors = 0;  /***********************************************************************   * RX Hammer   **********************************************************************/ -void rx_hammer(uhd::usrp::multi_usrp::sptr usrp, const std::string &rx_cpu, const std::string &rx_otw){ +void rx_hammer(uhd::usrp::multi_usrp::sptr usrp, const std::string &rx_cpu, uhd::rx_streamer::sptr rx_stream){      uhd::set_thread_priority_safe(); -    //create a receive streamer -    uhd::stream_args_t stream_args(rx_cpu, rx_otw); -    for (size_t ch = 0; ch < usrp->get_num_mboards(); ch++) //linear channel mapping -        stream_args.channels.push_back(ch); -    uhd::rx_streamer::sptr rx_stream = usrp->get_rx_stream(stream_args); -      //print pre-test summary      std::cout << boost::format(          "Testing receive rate %f Msps" @@ -60,7 +54,7 @@ void rx_hammer(uhd::usrp::multi_usrp::sptr usrp, const std::string &rx_cpu, cons      const size_t max_samps_per_packet = rx_stream->get_max_num_samps();      std::vector<char> buff(max_samps_per_packet*uhd::convert::get_bytes_per_item(rx_cpu));      std::vector<void *> buffs; -    for (size_t ch = 0; ch < stream_args.channels.size(); ch++) +    for (size_t ch = 0; ch < rx_stream->get_num_channels(); ch++)          buffs.push_back(&buff.front()); //same buffer for each channel      bool had_an_overflow = false;      uhd::time_spec_t last_time; @@ -74,7 +68,7 @@ void rx_hammer(uhd::usrp::multi_usrp::sptr usrp, const std::string &rx_cpu, cons      while (not boost::this_thread::interruption_requested()){          cmd.num_samps = rand() % 100000; -        usrp->issue_stream_cmd(cmd); +        rx_stream->issue_stream_cmd(cmd);          num_rx_samps += rx_stream->recv(buffs, max_samps_per_packet, md, timeout, true);          //handle the error codes @@ -103,16 +97,15 @@ void rx_hammer(uhd::usrp::multi_usrp::sptr usrp, const std::string &rx_cpu, cons  /***********************************************************************   * TX Hammer   **********************************************************************/ -void tx_hammer(uhd::usrp::multi_usrp::sptr usrp, const std::string &tx_cpu, const std::string &tx_otw){ +void tx_hammer(uhd::usrp::multi_usrp::sptr usrp, const std::string &tx_cpu, uhd::tx_streamer::sptr tx_stream){      uhd::set_thread_priority_safe(); -    //create a transmit streamer -    uhd::stream_args_t stream_args(tx_cpu, tx_otw); -    for (size_t ch = 0; ch < usrp->get_num_mboards(); ch++) //linear channel mapping -        stream_args.channels.push_back(ch); -    uhd::tx_streamer::sptr tx_stream = usrp->get_tx_stream(stream_args);      uhd::tx_metadata_t md; -    std::vector<std::complex<float> > buff(10000); +    const size_t max_samps_per_packet = tx_stream->get_max_num_samps(); +    std::vector<char> buff(max_samps_per_packet*uhd::convert::get_bytes_per_item(tx_cpu)); +    std::vector<void *> buffs; +    for (size_t ch = 0; ch < tx_stream->get_num_channels(); ch++) +        buffs.push_back(&buff.front()); //same buffer for each channel      //print pre-test summary      std::cout << boost::format( @@ -130,7 +123,7 @@ void tx_hammer(uhd::usrp::multi_usrp::sptr usrp, const std::string &tx_cpu, cons          while(num_acc_samps < total_num_samps){              //send a single packet -            num_tx_samps += tx_stream->send(&buff, tx_stream->get_max_num_samps(), md, timeout); +            num_tx_samps += tx_stream->send(buffs, max_samps_per_packet, md, timeout);              num_acc_samps += std::min(total_num_samps-num_acc_samps, tx_stream->get_max_num_samps());          } @@ -140,13 +133,13 @@ void tx_hammer(uhd::usrp::multi_usrp::sptr usrp, const std::string &tx_cpu, cons      }  } -void tx_hammer_async_helper(uhd::usrp::multi_usrp::sptr usrp){ +void tx_hammer_async_helper(uhd::tx_streamer::sptr tx_stream){      //setup variables and allocate buffer      uhd::async_metadata_t async_md;      while (not boost::this_thread::interruption_requested()){ -        if (not usrp->get_device()->recv_async_msg(async_md)) continue; +        if (not tx_stream->recv_async_msg(async_md)) continue;          //handle the error codes          switch(async_md.event_code){ @@ -239,14 +232,24 @@ int UHD_SAFE_MAIN(int argc, char *argv[]){      //spawn the receive test thread      if (vm.count("rx_rate")){          usrp->set_rx_rate(rx_rate); -        thread_group.create_thread(boost::bind(&rx_hammer, usrp, rx_cpu, rx_otw)); +        //create a receive streamer +        uhd::stream_args_t stream_args(rx_cpu, rx_otw); +        for (size_t ch = 0; ch < usrp->get_num_mboards(); ch++) //linear channel mapping +            stream_args.channels.push_back(ch); +        uhd::rx_streamer::sptr rx_stream = usrp->get_rx_stream(stream_args); +        thread_group.create_thread(boost::bind(&rx_hammer, usrp, rx_cpu, rx_stream));      }      //spawn the transmit test thread      if (vm.count("tx_rate")){          usrp->set_tx_rate(tx_rate); -        thread_group.create_thread(boost::bind(&tx_hammer, usrp, tx_cpu, tx_otw)); -        thread_group.create_thread(boost::bind(&tx_hammer_async_helper, usrp)); +        //create a transmit streamer +        uhd::stream_args_t stream_args(tx_cpu, tx_otw); +        for (size_t ch = 0; ch < usrp->get_num_mboards(); ch++) //linear channel mapping +            stream_args.channels.push_back(ch); +        uhd::tx_streamer::sptr tx_stream = usrp->get_tx_stream(stream_args); +        thread_group.create_thread(boost::bind(&tx_hammer, usrp, tx_cpu, tx_stream)); +        thread_group.create_thread(boost::bind(&tx_hammer_async_helper, tx_stream));      }      //sleep for the required duration diff --git a/host/examples/tx_bursts.cpp b/host/examples/tx_bursts.cpp index 8dd4a002c..c58d3e178 100644 --- a/host/examples/tx_bursts.cpp +++ b/host/examples/tx_bursts.cpp @@ -147,7 +147,7 @@ int UHD_SAFE_MAIN(int argc, char *argv[]){          uhd::async_metadata_t async_md;          bool got_async_burst_ack = false;          //loop through all messages for the ACK packet (may have underflow messages in queue) -        while (not got_async_burst_ack and usrp->get_device()->recv_async_msg(async_md, seconds_in_future)){ +        while (not got_async_burst_ack and tx_stream->recv_async_msg(async_md, seconds_in_future)){              got_async_burst_ack = (async_md.event_code == uhd::async_metadata_t::EVENT_CODE_BURST_ACK);          }          std::cout << (got_async_burst_ack? "success" : "fail") << std::endl; diff --git a/host/examples/tx_timed_samples.cpp b/host/examples/tx_timed_samples.cpp index 4cc31a7c0..2eef80389 100644 --- a/host/examples/tx_timed_samples.cpp +++ b/host/examples/tx_timed_samples.cpp @@ -116,7 +116,7 @@ int UHD_SAFE_MAIN(int argc, char *argv[]){      uhd::async_metadata_t async_md;      bool got_async_burst_ack = false;      //loop through all messages for the ACK packet (may have underflow messages in queue) -    while (not got_async_burst_ack and usrp->get_device()->recv_async_msg(async_md, timeout)){ +    while (not got_async_burst_ack and tx_stream->recv_async_msg(async_md, timeout)){          got_async_burst_ack = (async_md.event_code == uhd::async_metadata_t::EVENT_CODE_BURST_ACK);      }      std::cout << (got_async_burst_ack? "success" : "fail") << std::endl; diff --git a/host/examples/txrx_loopback_to_file.cpp b/host/examples/txrx_loopback_to_file.cpp index 9dc348da8..ef119b689 100644 --- a/host/examples/txrx_loopback_to_file.cpp +++ b/host/examples/txrx_loopback_to_file.cpp @@ -151,7 +151,7 @@ template<typename samp_type> void recv_to_file(      stream_cmd.num_samps = num_requested_samples;      stream_cmd.stream_now = false;      stream_cmd.time_spec = uhd::time_spec_t(settling_time); -    usrp->issue_stream_cmd(stream_cmd); +    rx_stream->issue_stream_cmd(stream_cmd);      while(not stop_signal_called and (num_requested_samples != num_total_samps or num_requested_samples == 0)){          size_t num_rx_samps = rx_stream->recv(&buff.front(), buff.size(), md, timeout); diff --git a/host/include/uhd/device.hpp b/host/include/uhd/device.hpp index 89c6332da..de39383c9 100644 --- a/host/include/uhd/device.hpp +++ b/host/include/uhd/device.hpp @@ -82,16 +82,6 @@ public:      //! Make a new transmit streamer from the streamer arguments      virtual tx_streamer::sptr get_tx_stream(const stream_args_t &args) = 0; -    /*! -     * Receive and asynchronous message from the device. -     * \param async_metadata the metadata to be filled in -     * \param timeout the timeout in seconds to wait for a message -     * \return true when the async_metadata is valid, false for timeout -     */ -    virtual bool recv_async_msg( -        async_metadata_t &async_metadata, double timeout = 0.1 -    ) = 0; -      //! Get access to the underlying property structure      virtual boost::shared_ptr<property_tree> get_tree(void) const = 0; diff --git a/host/include/uhd/device_deprecated.ipp b/host/include/uhd/device_deprecated.ipp index 0ee1cd706..58c43a876 100644 --- a/host/include/uhd/device_deprecated.ipp +++ b/host/include/uhd/device_deprecated.ipp @@ -184,6 +184,16 @@ size_t get_max_recv_samps_per_packet(void){      return _rx_streamer->get_max_num_samps();  } +/*! + * Receive and asynchronous message from the device. + * \param async_metadata the metadata to be filled in + * \param timeout the timeout in seconds to wait for a message + * \return true when the async_metadata is valid, false for timeout + */ +virtual bool recv_async_msg( +    async_metadata_t &async_metadata, double timeout = 0.1 +) = 0; +  private:      rx_streamer::sptr _rx_streamer;      io_type_t::tid_t _recv_tid; diff --git a/host/include/uhd/exception.hpp b/host/include/uhd/exception.hpp index c05861788..69e902fd5 100644 --- a/host/include/uhd/exception.hpp +++ b/host/include/uhd/exception.hpp @@ -157,9 +157,9 @@ namespace uhd{       * If the code evaluates to false, throw an assertion error.       * \param code the code that resolved to a boolean       */ -    #define UHD_ASSERT_THROW(code) if (not (code)) \ +    #define UHD_ASSERT_THROW(code) {if (not (code)) \          throw uhd::assertion_error(UHD_THROW_SITE_INFO(#code)); \ -    else void(0) +    }  } //namespace uhd diff --git a/host/include/uhd/stream.hpp b/host/include/uhd/stream.hpp index c05e8a1e9..75b097ded 100644 --- a/host/include/uhd/stream.hpp +++ b/host/include/uhd/stream.hpp @@ -1,5 +1,5 @@  // -// Copyright 2011-2012 Ettus Research LLC +// Copyright 2011-2013 Ettus Research LLC  //  // This program is free software: you can redistribute it and/or modify  // it under the terms of the GNU General Public License as published by @@ -21,6 +21,7 @@  #include <uhd/config.hpp>  #include <uhd/types/metadata.hpp>  #include <uhd/types/device_addr.hpp> +#include <uhd/types/stream_cmd.hpp>  #include <uhd/types/ref_vector.hpp>  #include <boost/utility.hpp>  #include <boost/shared_ptr.hpp> @@ -172,6 +173,19 @@ public:          const double timeout = 0.1,          const bool one_packet = false      ) = 0; + +    /*! +     * Issue a stream command to the usrp device. +     * This tells the usrp to send samples into the host. +     * See the documentation for stream_cmd_t for more info. +     * +     * With multiple devices, the first stream command in a chain of commands +     * should have a time spec in the near future and stream_now = false; +     * to ensure that the packets can be aligned by their time specs. +     * +     * \param stream_cmd the stream command to issue +     */ +    virtual void issue_stream_cmd(const stream_cmd_t &stream_cmd) = 0;  };  /*! @@ -221,6 +235,16 @@ public:          const tx_metadata_t &metadata,          const double timeout = 0.1      ) = 0; + +    /*! +     * Receive and asynchronous message from this TX stream. +     * \param async_metadata the metadata to be filled in +     * \param timeout the timeout in seconds to wait for a message +     * \return true when the async_metadata is valid, false for timeout +     */ +    virtual bool recv_async_msg( +        async_metadata_t &async_metadata, double timeout = 0.1 +    ) = 0;  };  } //namespace uhd diff --git a/host/include/uhd/transport/bounded_buffer.ipp b/host/include/uhd/transport/bounded_buffer.ipp index 9c24005b7..35ffb293b 100644 --- a/host/include/uhd/transport/bounded_buffer.ipp +++ b/host/include/uhd/transport/bounded_buffer.ipp @@ -1,5 +1,5 @@  // -// Copyright 2010-2011 Ettus Research LLC +// Copyright 2010-2013 Ettus Research LLC  //  // This program is free software: you can redistribute it and/or modify  // it under the terms of the GNU General Public License as published by @@ -26,7 +26,7 @@  #include <boost/thread/condition.hpp>  #include <boost/thread/locks.hpp> -namespace uhd{ namespace transport{ namespace{ /*anon*/ +namespace uhd{ namespace transport{      template <typename elem_type> class bounded_buffer_detail : boost::noncopyable{      public: @@ -142,6 +142,6 @@ namespace uhd{ namespace transport{ namespace{ /*anon*/          }      }; -}}} //namespace +}} //namespace  #endif /* INCLUDED_UHD_TRANSPORT_BOUNDED_BUFFER_IPP */ diff --git a/host/include/uhd/transport/vrt_if_packet.hpp b/host/include/uhd/transport/vrt_if_packet.hpp index 1be480874..d16892281 100644 --- a/host/include/uhd/transport/vrt_if_packet.hpp +++ b/host/include/uhd/transport/vrt_if_packet.hpp @@ -1,5 +1,5 @@  // -// Copyright 2010 Ettus Research LLC +// Copyright 2010-2013 Ettus Research LLC  //  // This program is free software: you can redistribute it and/or modify  // it under the terms of the GNU General Public License as published by @@ -26,6 +26,9 @@ namespace uhd{ namespace transport{  namespace vrt{ +    //! The maximum number of 32-bit words in the vrlp link layer +    static const size_t num_vrl_words32 = 3; +      //! The maximum number of 32-bit words in a vrt if packet header      static const size_t max_if_hdr_words32 = 7; //hdr+sid+cid+tsi+tsf @@ -34,12 +37,24 @@ namespace vrt{       * The size fields are used for input and output depending upon       * the operation used (ie the pack or unpack function call).       */ -    struct UHD_API if_packet_info_t{ -        //packet type (pack only supports data) -        enum packet_type_t { +    struct UHD_API if_packet_info_t +    { +        if_packet_info_t(void); + +        //link layer type - always set for pack and unpack +        enum link_type_t +        { +            LINK_TYPE_NONE = 0x0, +            LINK_TYPE_CHDR = 0x1, +            LINK_TYPE_VRLP = 0x2, +        } link_type; + +        //packet type +        enum packet_type_t +        {              PACKET_TYPE_DATA      = 0x0, -            PACKET_TYPE_EXTENSION = 0x1, -            PACKET_TYPE_CONTEXT   = 0x2 +            PACKET_TYPE_IF_EXT    = 0x1, +            PACKET_TYPE_CONTEXT   = 0x2, //extension context: has_sid = true          } packet_type;          //size fields @@ -100,6 +115,22 @@ namespace vrt{          if_packet_info_t &if_packet_info      ); +    UHD_INLINE if_packet_info_t::if_packet_info_t(void): +        link_type(LINK_TYPE_NONE), +        packet_type(PACKET_TYPE_DATA), +        num_payload_words32(0), +        num_payload_bytes(0), +        num_header_words32(0), +        num_packet_words32(0), +        packet_count(0), +        sob(false), eob(false), +        has_sid(false), sid(0), +        has_cid(false), cid(0), +        has_tsi(false), tsi(0), +        has_tsf(false), tsf(0), +        has_tlr(false), tlr(0) +    {} +  } //namespace vrt  }} //namespace diff --git a/host/lib/transport/gen_vrt_if_packet.py b/host/lib/transport/gen_vrt_if_packet.py index e28ce3aae..56e7c61bf 100644 --- a/host/lib/transport/gen_vrt_if_packet.py +++ b/host/lib/transport/gen_vrt_if_packet.py @@ -1,6 +1,6 @@  #!/usr/bin/env python  # -# Copyright 2010-2011 Ettus Research LLC +# Copyright 2010-2013 Ettus Research LLC  #  # This program is free software: you can redistribute it and/or modify  # it under the terms of the GNU General Public License as published by @@ -48,12 +48,14 @@ TMPL_TEXT = """  using namespace uhd;  using namespace uhd::transport; +using namespace uhd::transport::vrt;  typedef size_t pred_type;  typedef std::vector<pred_type> pred_table_type;  #define pred_table_index(hdr) ((hdr >> 20) & 0x1ff) -static pred_table_type get_pred_unpack_table(void){ +static pred_table_type get_pred_unpack_table(void) +{      pred_table_type table(1 << 9, 0); //only 9 bits useful here (20-28)      for (size_t i = 0; i < table.size(); i++){          boost::uint32_t vrt_hdr_word = i << 20; @@ -74,13 +76,43 @@ static const pred_table_type pred_unpack_table(get_pred_unpack_table());  //maps num empty bytes to trailer bits  static const size_t occ_table[] = {0, 2, 1, 3}; +const boost::uint32_t VRLP = ('V' << 24) | ('R' << 16) | ('L' << 8) | ('P' << 0); +const boost::uint32_t VEND = ('V' << 24) | ('E' << 16) | ('N' << 8) | ('D' << 0); + +UHD_INLINE static boost::uint32_t chdr_to_vrt(const boost::uint32_t chdr, size_t &packet_count) +{ +    boost::uint32_t vrt = chdr & 0xffff; //words32 +    packet_count = (chdr >> 16) & 0xfff; +    vrt |= ((chdr >> 31) & 0x1) << 30; //context packet +    vrt |= ((chdr >> 30) & 0x1) << 26; //has tlr +    vrt |= ((chdr >> 29) & 0x1) << 20; //has tsf +    vrt |= ((chdr >> 28) & 0x1) << 24; //has eob +    vrt |= (0x1) << 28; //has sid (always) +    return vrt; +} + +UHD_INLINE static boost::uint32_t vrt_to_chdr(const boost::uint32_t vrt, const size_t packet_count) +{ +    boost::uint32_t chdr = vrt & 0xffff; //words32 +    chdr |= (packet_count & 0xfff) << 16; +    chdr |= ((vrt >> 30) & 0x1) << 31; //context packet +    chdr |= ((vrt >> 26) & 0x1) << 30; //has tlr +    chdr |= ((vrt >> 20) & 0x1) << 29; //has tsf +    chdr |= ((vrt >> 24) & 0x1) << 28; //has eob +    return chdr; +} +  ########################################################################  #def gen_code($XE_MACRO, $suffix)  ######################################################################## -void vrt::if_hdr_pack_$(suffix)( +/*********************************************************************** + * interal impl of packing VRT IF header only + **********************************************************************/ +UHD_INLINE void __if_hdr_pack_$(suffix)(      boost::uint32_t *packet_buff, -    if_packet_info_t &if_packet_info +    if_packet_info_t &if_packet_info, +    boost::uint32_t &vrt_hdr_word32  ){      boost::uint32_t vrt_hdr_flags = 0; @@ -154,31 +186,33 @@ void vrt::if_hdr_pack_$(suffix)(      }      //fill in complete header word -    packet_buff[0] = $(XE_MACRO)(boost::uint32_t(0 +    vrt_hdr_word32 = boost::uint32_t(0          | (if_packet_info.packet_type << 29)          | vrt_hdr_flags          | ((if_packet_info.packet_count & 0xf) << 16)          | (if_packet_info.num_packet_words32 & 0xffff) -    )); +    );  } -void vrt::if_hdr_unpack_$(suffix)( +/*********************************************************************** + * interal impl of unpacking VRT IF header only + **********************************************************************/ +UHD_INLINE void __if_hdr_unpack_$(suffix)(      const boost::uint32_t *packet_buff, -    if_packet_info_t &if_packet_info +    if_packet_info_t &if_packet_info, +    const boost::uint32_t vrt_hdr_word32  ){ -    //extract vrt header -    boost::uint32_t vrt_hdr_word = $(XE_MACRO)(packet_buff[0]); -    size_t packet_words32 = vrt_hdr_word & 0xffff; +    const size_t packet_words32 = vrt_hdr_word32 & 0xffff;      //failure case      if (if_packet_info.num_packet_words32 < packet_words32)          throw uhd::value_error("bad vrt header or packet fragment");      //extract fields from the header -    if_packet_info.packet_type = if_packet_info_t::packet_type_t(vrt_hdr_word >> 29); -    if_packet_info.packet_count = (vrt_hdr_word >> 16) & 0xf; +    if_packet_info.packet_type = if_packet_info_t::packet_type_t(vrt_hdr_word32 >> 29); +    if_packet_info.packet_count = (vrt_hdr_word32 >> 16) & 0xf; -    const pred_type pred = pred_unpack_table[pred_table_index(vrt_hdr_word)]; +    const pred_type pred = pred_unpack_table[pred_table_index(vrt_hdr_word32)];      size_t empty_bytes = 0; @@ -259,6 +293,84 @@ void vrt::if_hdr_unpack_$(suffix)(      }  } +/*********************************************************************** + * link layer + VRT IF packing + **********************************************************************/ +void vrt::if_hdr_pack_$(suffix)( +    boost::uint32_t *packet_buff, +    if_packet_info_t &if_packet_info +){ +    boost::uint32_t vrt_hdr_word32 = 0; +    switch (if_packet_info.link_type) +    { +    case if_packet_info_t::LINK_TYPE_NONE: +        __if_hdr_pack_$(suffix)(packet_buff, if_packet_info, vrt_hdr_word32); +        packet_buff[0] = $(XE_MACRO)(vrt_hdr_word32); +        break; + +    case if_packet_info_t::LINK_TYPE_CHDR: +    { +        __if_hdr_pack_$(suffix)(packet_buff, if_packet_info, vrt_hdr_word32); +        const boost::uint32_t chdr = vrt_to_chdr(vrt_hdr_word32, if_packet_info.packet_count); +        packet_buff[0] = $(XE_MACRO)(chdr); +        break; +    } + +    case if_packet_info_t::LINK_TYPE_VRLP: +        __if_hdr_pack_$(suffix)(packet_buff+2, if_packet_info, vrt_hdr_word32); +        if_packet_info.num_header_words32 += 2; +        if_packet_info.num_packet_words32 += 3; +        packet_buff[0] = $(XE_MACRO)(VRLP); +        packet_buff[1] = $(XE_MACRO)(boost::uint32_t( +            (if_packet_info.num_packet_words32 & 0xfffff) | +            ((if_packet_info.packet_count & 0xfff) << 20) +        )); +        packet_buff[2] = $(XE_MACRO)(vrt_hdr_word32); +        packet_buff[if_packet_info.num_packet_words32-1] = $(XE_MACRO)(VEND); +        break; +    } +} + +/*********************************************************************** + * link layer + VRT IF unpacking + **********************************************************************/ +void vrt::if_hdr_unpack_$(suffix)( +    const boost::uint32_t *packet_buff, +    if_packet_info_t &if_packet_info +){ +    boost::uint32_t vrt_hdr_word32 = 0; +    switch (if_packet_info.link_type) +    { +    case if_packet_info_t::LINK_TYPE_NONE: +        vrt_hdr_word32 = $(XE_MACRO)(packet_buff[0]); +        __if_hdr_unpack_$(suffix)(packet_buff, if_packet_info, vrt_hdr_word32); +        break; + +    case if_packet_info_t::LINK_TYPE_CHDR: +    { +        const boost::uint32_t chdr = $(XE_MACRO)(packet_buff[0]); +        size_t packet_count = 0; +        vrt_hdr_word32 = chdr_to_vrt(chdr, packet_count); +        __if_hdr_unpack_$(suffix)(packet_buff, if_packet_info, vrt_hdr_word32); +        if_packet_info.packet_count = packet_count; +        break; +    } + +    case if_packet_info_t::LINK_TYPE_VRLP: +    { +        if ($(XE_MACRO)(packet_buff[0]) != VRLP) throw uhd::value_error("bad vrl header VRLP"); +        const boost::uint32_t vrl_hdr = $(XE_MACRO)(packet_buff[1]); +        vrt_hdr_word32 = $(XE_MACRO)(packet_buff[2]); +        if (if_packet_info.num_packet_words32 < (vrl_hdr & 0xfffff)) throw uhd::value_error("bad vrl header or packet fragment"); +        if ($(XE_MACRO)(packet_buff[(vrl_hdr & 0xfffff)-1]) != VEND) throw uhd::value_error("bad vrl trailer VEND"); +        __if_hdr_unpack_$(suffix)(packet_buff+2, if_packet_info, vrt_hdr_word32); +        if_packet_info.num_header_words32 += 2; //add vrl header +        if_packet_info.packet_count = (vrl_hdr >> 20) & 0xfff; +        break; +    } +    } +} +  ########################################################################  #end def  ######################################################################## diff --git a/host/lib/transport/super_recv_packet_handler.hpp b/host/lib/transport/super_recv_packet_handler.hpp index 5a75d5f0d..75d1f3068 100644 --- a/host/lib/transport/super_recv_packet_handler.hpp +++ b/host/lib/transport/super_recv_packet_handler.hpp @@ -63,6 +63,8 @@ static inline void handle_overflow_nop(void){}  class recv_packet_handler{  public:      typedef boost::function<managed_recv_buffer::sptr(double)> get_buff_type; +    typedef boost::function<void(const size_t)> handle_flowctrl_type; +    typedef boost::function<void(const stream_cmd_t&)> issue_stream_cmd_type;      typedef void(*vrt_unpacker_type)(const boost::uint32_t *, vrt::if_packet_info_t &);      //typedef boost::function<void(const boost::uint32_t *, vrt::if_packet_info_t &)> vrt_unpacker_type; @@ -139,6 +141,19 @@ public:          _props.at(xport_chan).get_buff = get_buff;      } +    /*! +     * Set the function to handle flow control +     * \param xport_chan which transport channel +     * \param handle_flowctrl the callback function +     */ +    void set_xport_handle_flowctrl(const size_t xport_chan, const handle_flowctrl_type &handle_flowctrl, const size_t update_window, const bool do_init = false) +    { +        _props.at(xport_chan).handle_flowctrl = handle_flowctrl; +        //we need the window size to be within the 0xfff (max 12 bit seq) +        _props.at(xport_chan).fc_update_window = std::min<size_t>(update_window, 0xfff); +        if (do_init) handle_flowctrl(0); +    } +      //! Set the conversion routine for all channels      void set_converter(const uhd::convert::id_type &id){          _num_outputs = id.num_outputs; @@ -158,6 +173,21 @@ public:          _converter->set_scalar(scale_factor);      } +    //! Set the callback to issue stream commands +    void set_issue_stream_cmd(const size_t xport_chan, const issue_stream_cmd_type &issue_stream_cmd) +    { +        _props.at(xport_chan).issue_stream_cmd = issue_stream_cmd; +    } + +    //! Overload call to issue stream commands +    void issue_stream_cmd(const stream_cmd_t &stream_cmd) +    { +        for (size_t i = 0; i < _props.size(); i++) +        { +            if (_props[i].issue_stream_cmd) _props[i].issue_stream_cmd(stream_cmd); +        } +    } +      /*******************************************************************       * Receive:       * The entry point for the fast-path receive calls. @@ -219,8 +249,11 @@ private:              handle_overflow(&handle_overflow_nop)          {}          get_buff_type get_buff; +        issue_stream_cmd_type issue_stream_cmd;          size_t packet_count;          handle_overflow_type handle_overflow; +        handle_flowctrl_type handle_flowctrl; +        size_t fc_update_window;      };      std::vector<xport_chan_props_type> _props;      size_t _num_outputs; @@ -302,6 +335,15 @@ private:          info.time = time_spec_t::from_ticks(info.ifpi.tsf, _tick_rate); //assumes has_tsf is true          info.copy_buff = reinterpret_cast<const char *>(info.vrt_hdr + info.ifpi.num_header_words32); +        //handle flow control +        if (_props[index].handle_flowctrl) +        { +            if ((info.ifpi.packet_count % _props[index].fc_update_window/2) == 0) +            { +                _props[index].handle_flowctrl(info.ifpi.packet_count); +            } +        } +          //--------------------------------------------------------------          //-- Determine return conditions:          //-- The order of these checks is HOLY. @@ -314,8 +356,9 @@ private:          //2) check for sequence errors          #ifndef SRPH_DONT_CHECK_SEQUENCE +        const size_t seq_mask = (info.ifpi.link_type == vrt::if_packet_info_t::LINK_TYPE_NONE)? 0xf : 0xfff;          const size_t expected_packet_count = _props[index].packet_count; -        _props[index].packet_count = (info.ifpi.packet_count + 1)%16; +        _props[index].packet_count = (info.ifpi.packet_count + 1) & seq_mask;          if (expected_packet_count != info.ifpi.packet_count){              return PACKET_SEQUENCE_ERROR;          } @@ -622,6 +665,11 @@ public:          return recv_packet_handler::recv(buffs, nsamps_per_buff, metadata, timeout, one_packet);      } +    void issue_stream_cmd(const stream_cmd_t &stream_cmd) +    { +        return recv_packet_handler::issue_stream_cmd(stream_cmd); +    } +  private:      size_t _max_num_samps;  }; diff --git a/host/lib/transport/super_send_packet_handler.hpp b/host/lib/transport/super_send_packet_handler.hpp index 726742327..41f030ea6 100644 --- a/host/lib/transport/super_send_packet_handler.hpp +++ b/host/lib/transport/super_send_packet_handler.hpp @@ -47,6 +47,7 @@ namespace uhd{ namespace transport{ namespace sph{  class send_packet_handler{  public:      typedef boost::function<managed_send_buffer::sptr(double)> get_buff_type; +    typedef boost::function<bool(uhd::async_metadata_t &, const double)> async_receiver_type;      typedef void(*vrt_packer_type)(boost::uint32_t *, vrt::if_packet_info_t &);      //typedef boost::function<void(boost::uint32_t *, vrt::if_packet_info_t &)> vrt_packer_type; @@ -57,6 +58,7 @@ public:      send_packet_handler(const size_t size = 1):          _next_packet_seq(0)      { +        this->set_enable_trailer(true);          this->resize(size);      } @@ -96,6 +98,11 @@ public:          _props.at(xport_chan).sid = sid;      } +    void set_enable_trailer(const bool enable) +    { +        _has_tlr = enable; +    } +      //! Set the rate of ticks per second      void set_tick_rate(const double rate){          _tick_rate = rate; @@ -138,6 +145,21 @@ public:          _converter->set_scalar(scale_factor);      } +    //! Set the callback to get async messages +    void set_async_receiver(const async_receiver_type &async_receiver) +    { +        _async_receiver = async_receiver; +    } + +    //! Overload call to get async metadata +    bool recv_async_msg( +        uhd::async_metadata_t &async_metadata, double timeout = 0.1 +    ){ +        if (_async_receiver) return _async_receiver(async_metadata, timeout); +        boost::this_thread::sleep(boost::posix_time::microseconds(long(timeout*1e6))); +        return false; +    } +      /*******************************************************************       * Send:       * The entry point for the fast-path send calls. @@ -154,7 +176,7 @@ public:          if_packet_info.packet_type = vrt::if_packet_info_t::PACKET_TYPE_DATA;          //if_packet_info.has_sid = false; //set per channel          if_packet_info.has_cid = false; -        if_packet_info.has_tlr = true; +        if_packet_info.has_tlr = _has_tlr;          if_packet_info.has_tsi = false;          if_packet_info.has_tsf = metadata.has_time_spec;          if_packet_info.tsf     = metadata.time_spec.to_ticks(_tick_rate); @@ -165,9 +187,12 @@ public:              //TODO remove this code when sample counts of zero are supported by hardware              #ifndef SSPH_DONT_PAD_TO_ONE -            if (nsamps_per_buff == 0) return send_one_packet( -                _zero_buffs, 1, if_packet_info, timeout -            ) & 0x0; +                static const boost::uint64_t zero = 0; +                _zero_buffs.resize(buffs.size(), &zero); + +                if (nsamps_per_buff == 0) return send_one_packet( +                    _zero_buffs, 1, if_packet_info, timeout +                ) & 0x0;              #endif              return send_one_packet(buffs, nsamps_per_buff, if_packet_info, timeout); @@ -228,6 +253,8 @@ private:      size_t _max_samples_per_packet;      std::vector<const void *> _zero_buffs;      size_t _next_packet_seq; +    bool _has_tlr; +    async_receiver_type _async_receiver;      /*******************************************************************       * Send a single packet: @@ -337,6 +364,12 @@ public:          return send_packet_handler::send(buffs, nsamps_per_buff, metadata, timeout);      } +    bool recv_async_msg( +        uhd::async_metadata_t &async_metadata, double timeout = 0.1 +    ){ +        return send_packet_handler::recv_async_msg(async_metadata, timeout); +    } +  private:      size_t _max_num_samps;  }; diff --git a/host/lib/usrp/CMakeLists.txt b/host/lib/usrp/CMakeLists.txt index 8ae379f73..d251fbaeb 100644 --- a/host/lib/usrp/CMakeLists.txt +++ b/host/lib/usrp/CMakeLists.txt @@ -1,5 +1,5 @@  # -# Copyright 2010-2011 Ettus Research LLC +# Copyright 2010-2013 Ettus Research LLC  #  # This program is free software: you can redistribute it and/or modify  # it under the terms of the GNU General Public License as published by diff --git a/host/lib/usrp/b100/io_impl.cpp b/host/lib/usrp/b100/io_impl.cpp index 9b5d4d25c..c4a4e422e 100644 --- a/host/lib/usrp/b100/io_impl.cpp +++ b/host/lib/usrp/b100/io_impl.cpp @@ -164,6 +164,8 @@ rx_streamer::sptr b100_impl::get_rx_stream(const uhd::stream_args_t &args_){          my_streamer->set_overflow_handler(chan_i, boost::bind(              &rx_dsp_core_200::handle_overflow, _rx_dsps[dsp]          )); +        my_streamer->set_issue_stream_cmd(chan_i, boost::bind( +            &rx_dsp_core_200::issue_stream_command, _rx_dsps[dsp], _1));          _rx_streamers[dsp] = my_streamer; //store weak pointer      } @@ -217,6 +219,7 @@ tx_streamer::sptr b100_impl::get_tx_stream(const uhd::stream_args_t &args_){          my_streamer->set_xport_chan_get_buff(chan_i, boost::bind(              &zero_copy_if::get_send_buff, _data_transport, _1          )); +        my_streamer->set_async_receiver(boost::bind(&fifo_ctrl_excelsior::pop_async_msg, _fifo_ctrl, _1, _2));          _tx_streamers[dsp] = my_streamer; //store weak pointer      } diff --git a/host/lib/usrp/e100/io_impl.cpp b/host/lib/usrp/e100/io_impl.cpp index e34620444..bf04a5871 100644 --- a/host/lib/usrp/e100/io_impl.cpp +++ b/host/lib/usrp/e100/io_impl.cpp @@ -166,6 +166,8 @@ rx_streamer::sptr e100_impl::get_rx_stream(const uhd::stream_args_t &args_){          my_streamer->set_overflow_handler(chan_i, boost::bind(              &rx_dsp_core_200::handle_overflow, _rx_dsps[dsp]          )); +        my_streamer->set_issue_stream_cmd(chan_i, boost::bind( +            &rx_dsp_core_200::issue_stream_command, _rx_dsps[dsp], _1));          _rx_streamers[dsp] = my_streamer; //store weak pointer      } @@ -220,6 +222,7 @@ tx_streamer::sptr e100_impl::get_tx_stream(const uhd::stream_args_t &args_){          my_streamer->set_xport_chan_get_buff(chan_i, boost::bind(              &zero_copy_if::get_send_buff, _data_transport, _1          )); +        my_streamer->set_async_receiver(boost::bind(&fifo_ctrl_excelsior::pop_async_msg, _fifo_ctrl, _1, _2));          _tx_streamers[dsp] = my_streamer; //store weak pointer      } diff --git a/host/lib/usrp/usrp1/io_impl.cpp b/host/lib/usrp/usrp1/io_impl.cpp index 8940a92bb..d384eb13f 100644 --- a/host/lib/usrp/usrp1/io_impl.cpp +++ b/host/lib/usrp/usrp1/io_impl.cpp @@ -356,6 +356,11 @@ public:          return _stc->recv_post(metadata, num_samps_recvd);      } +    void issue_stream_cmd(const stream_cmd_t &stream_cmd) +    { +        _stc->issue_stream_cmd(stream_cmd); +    } +  private:      size_t _max_num_samps;      soft_time_ctrl::sptr _stc; @@ -410,6 +415,12 @@ public:          return num_samps_sent;      } +    bool recv_async_msg( +        async_metadata_t &async_metadata, double timeout = 0.1 +    ){ +        return _stc->get_async_queue().pop_with_timed_wait(async_metadata, timeout); +    } +  private:      size_t _max_num_samps;      soft_time_ctrl::sptr _stc; diff --git a/host/lib/usrp/usrp2/io_impl.cpp b/host/lib/usrp/usrp2/io_impl.cpp index e06cf8f6f..9ee6abed0 100644 --- a/host/lib/usrp/usrp2/io_impl.cpp +++ b/host/lib/usrp/usrp2/io_impl.cpp @@ -467,6 +467,8 @@ rx_streamer::sptr usrp2_impl::get_rx_stream(const uhd::stream_args_t &args_){                  my_streamer->set_xport_chan_get_buff(chan_i, boost::bind(                      &zero_copy_if::get_recv_buff, _mbc[mb].rx_dsp_xports[dsp], _1                  ), true /*flush*/); +                my_streamer->set_issue_stream_cmd(chan_i, boost::bind( +                    &rx_dsp_core_200::issue_stream_command, _mbc[mb].rx_dsps[dsp], _1));                  _mbc[mb].rx_streamers[dsp] = my_streamer; //store weak pointer                  break;              } @@ -536,6 +538,7 @@ tx_streamer::sptr usrp2_impl::get_tx_stream(const uhd::stream_args_t &args_){                  my_streamer->set_xport_chan_get_buff(chan_i, boost::bind(                      &usrp2_impl::io_impl::get_send_buff, _io_impl.get(), abs, _1                  )); +                my_streamer->set_async_receiver(boost::bind(&bounded_buffer<async_metadata_t>::pop_with_timed_wait, &(_io_impl->async_msg_fifo), _1, _2));                  _mbc[mb].tx_streamers[dsp] = my_streamer; //store weak pointer                  break;              } diff --git a/host/tests/sph_recv_test.cpp b/host/tests/sph_recv_test.cpp index 5a40029dc..9339a9739 100644 --- a/host/tests/sph_recv_test.cpp +++ b/host/tests/sph_recv_test.cpp @@ -288,7 +288,7 @@ BOOST_AUTO_TEST_CASE(test_sph_recv_one_channel_inline_message){          //simulate overflow          if (i == NUM_PKTS_TO_TEST/2){ -            ifpi.packet_type = uhd::transport::vrt::if_packet_info_t::PACKET_TYPE_EXTENSION; +            ifpi.packet_type = uhd::transport::vrt::if_packet_info_t::PACKET_TYPE_CONTEXT;              ifpi.num_payload_words32 = 1;              dummy_recv_xport.push_back_packet(ifpi, uhd::rx_metadata_t::ERROR_CODE_OVERFLOW);          } diff --git a/host/tests/vrt_test.cpp b/host/tests/vrt_test.cpp index 066f1493b..225fa4f2b 100644 --- a/host/tests/vrt_test.cpp +++ b/host/tests/vrt_test.cpp @@ -1,5 +1,5 @@  // -// Copyright 2010-2011 Ettus Research LLC +// Copyright 2010-2013 Ettus Research LLC  //  // This program is free software: you can redistribute it and/or modify  // it under the terms of the GNU General Public License as published by @@ -17,26 +17,39 @@  #include <boost/test/unit_test.hpp>  #include <uhd/transport/vrt_if_packet.hpp> +#include <uhd/utils/byteswap.hpp> +#include <boost/format.hpp>  #include <cstdlib> +#include <iostream>  using namespace uhd::transport;  static void pack_and_unpack(      vrt::if_packet_info_t &if_packet_info_in  ){ -    boost::uint32_t header_buff[vrt::max_if_hdr_words32]; +    if (if_packet_info_in.num_payload_bytes == 0) +    { +        if_packet_info_in.num_payload_bytes = if_packet_info_in.num_payload_words32 * sizeof(boost::uint32_t); +    } +    boost::uint32_t packet_buff[2048];      //pack metadata into a vrt header      vrt::if_hdr_pack_be( -        header_buff, if_packet_info_in +        packet_buff, if_packet_info_in      ); +    std::cout << std::endl; +    for (size_t i = 0; i < 5; i++) +    { +        std::cout << boost::format("packet_buff[%u] = 0x%.8x") % i % uhd::byteswap(packet_buff[i]) << std::endl; +    }      vrt::if_packet_info_t if_packet_info_out; +    if_packet_info_out.link_type = if_packet_info_in.link_type;      if_packet_info_out.num_packet_words32 = if_packet_info_in.num_packet_words32;      //unpack the vrt header back into metadata      vrt::if_hdr_unpack_be( -        header_buff, if_packet_info_out +        packet_buff, if_packet_info_out      );      //check the the unpacked metadata is the same @@ -91,7 +104,7 @@ BOOST_AUTO_TEST_CASE(test_with_sid){      if_packet_info.has_tsf = false;      if_packet_info.has_tlr = false;      if_packet_info.sid = std::rand(); -    if_packet_info.num_payload_words32 = 1111; +    if_packet_info.num_payload_words32 = 11;      pack_and_unpack(if_packet_info);  } @@ -106,7 +119,7 @@ BOOST_AUTO_TEST_CASE(test_with_cid){      if_packet_info.has_tsf = false;      if_packet_info.has_tlr = false;      if_packet_info.cid = std::rand(); -    if_packet_info.num_payload_words32 = 2222; +    if_packet_info.num_payload_words32 = 22;      pack_and_unpack(if_packet_info);  } @@ -120,7 +133,7 @@ BOOST_AUTO_TEST_CASE(test_with_time){      if_packet_info.has_tlr = false;      if_packet_info.tsi = std::rand();      if_packet_info.tsf = std::rand(); -    if_packet_info.num_payload_words32 = 33333; +    if_packet_info.num_payload_words32 = 33;      pack_and_unpack(if_packet_info);  } @@ -136,6 +149,36 @@ BOOST_AUTO_TEST_CASE(test_with_all){      if_packet_info.cid = std::rand();      if_packet_info.tsi = std::rand();      if_packet_info.tsf = std::rand(); -    if_packet_info.num_payload_words32 = 44444; +    if_packet_info.num_payload_words32 = 44; +    pack_and_unpack(if_packet_info); +} + +BOOST_AUTO_TEST_CASE(test_with_vrlp){ +    vrt::if_packet_info_t if_packet_info; +    if_packet_info.link_type = vrt::if_packet_info_t::LINK_TYPE_VRLP; +    if_packet_info.packet_count = 3; +    if_packet_info.has_sid = true; +    if_packet_info.has_cid = false; +    if_packet_info.has_tsi = false; +    if_packet_info.has_tsf = true; +    if_packet_info.has_tlr = true; +    if_packet_info.tsi = std::rand(); +    if_packet_info.tsf = std::rand(); +    if_packet_info.num_payload_words32 = 42; +    pack_and_unpack(if_packet_info); +} + +BOOST_AUTO_TEST_CASE(test_with_chdr){ +    vrt::if_packet_info_t if_packet_info; +    if_packet_info.link_type = vrt::if_packet_info_t::LINK_TYPE_CHDR; +    if_packet_info.packet_count = 7; +    if_packet_info.has_sid = true; +    if_packet_info.has_cid = false; +    if_packet_info.has_tsi = false; +    if_packet_info.has_tsf = true; +    if_packet_info.has_tlr = true; +    if_packet_info.tsi = std::rand(); +    if_packet_info.tsf = std::rand(); +    if_packet_info.num_payload_words32 = 24;      pack_and_unpack(if_packet_info);  } | 
