From 5bd51162757cb8ae5b8a7fba86971ee3a176672f Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Fri, 30 Oct 2015 16:57:42 +0100 Subject: Something resembling a correlation --- OutputUHD.cpp | 48 ++++++++++++++- OutputUHD.hpp | 5 +- main.cpp | 195 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++-- 3 files changed, 240 insertions(+), 8 deletions(-) diff --git a/OutputUHD.cpp b/OutputUHD.cpp index a56112d..f1fe3bf 100644 --- a/OutputUHD.cpp +++ b/OutputUHD.cpp @@ -37,9 +37,10 @@ using namespace std; -OutputUHD::OutputUHD(double txgain) +OutputUHD::OutputUHD(double txgain, double samplerate) { m_txgain = txgain; + m_samplerate = samplerate; uhd::set_thread_priority_safe(); @@ -47,7 +48,6 @@ OutputUHD::OutputUHD(double txgain) m_usrp = uhd::usrp::multi_usrp::make(device); - m_samplerate = 2048000; m_usrp->set_tx_rate(m_samplerate); m_usrp->set_rx_rate(m_samplerate); @@ -70,6 +70,17 @@ OutputUHD::OutputUHD(double txgain) uhd::stream_args_t stream_args("fc32"); //complex floats myTxStream = m_usrp->get_tx_stream(stream_args); + + myRxStream = m_usrp->get_rx_stream(stream_args); + uhd::stream_cmd_t stream_cmd(true ? + uhd::stream_cmd_t::STREAM_MODE_START_CONTINUOUS: + uhd::stream_cmd_t::STREAM_MODE_NUM_SAMPS_AND_DONE + ); + stream_cmd.num_samps = 0; + stream_cmd.stream_now = true; + stream_cmd.time_spec = uhd::time_spec_t(); + myRxStream->issue_stream_cmd(stream_cmd); + } size_t OutputUHD::Transmit(const complexf *samples, size_t sizeIn, double *first_sample_time) @@ -97,3 +108,36 @@ size_t OutputUHD::Transmit(const complexf *samples, size_t sizeIn, double *first return num_acc_samps; } +size_t OutputUHD::Receive(complexf *samples, size_t sizeIn, double *first_sample_time) +{ + const double rx_timeout = 20.0; + + uhd::rx_metadata_t md; + size_t usrp_max_num_samps = myRxStream->get_max_num_samps(); + + *first_sample_time = md.time_spec.get_real_secs(); + + size_t num_acc_samps = 0; //number of accumulated samples + while (num_acc_samps < sizeIn) { + size_t samps_to_send = std::min(sizeIn - num_acc_samps, usrp_max_num_samps); + + //send a single packet + size_t num_rx_samps = myRxStream->recv( + &samples[num_acc_samps], + samps_to_send, md, rx_timeout); + + if (num_acc_samps == 0) { + if (md.has_time_spec) { + *first_sample_time = md.time_spec.get_real_secs(); + } + else { + MDEBUG("samp %zu no time spec!\n", num_acc_samps); + } + } + + num_acc_samps += num_rx_samps; + } + + return num_acc_samps; +} + diff --git a/OutputUHD.hpp b/OutputUHD.hpp index ad95e99..a6dc043 100644 --- a/OutputUHD.hpp +++ b/OutputUHD.hpp @@ -31,9 +31,10 @@ typedef std::complex complexf; class OutputUHD { public: - OutputUHD(double txgain); + OutputUHD(double txgain, double samplerate); size_t Transmit(const complexf *samples, size_t sizeIn, double *first_sample_time); + size_t Receive(complexf *samples, size_t sizeIn, double *first_sample_time); private: double m_samplerate; @@ -41,6 +42,8 @@ class OutputUHD { uhd::usrp::multi_usrp::sptr m_usrp; uhd::tx_metadata_t md; uhd::tx_streamer::sptr myTxStream; + + uhd::rx_streamer::sptr myRxStream; }; #endif // __OUTPUT_UHD_H_ diff --git a/main.cpp b/main.cpp index 5803262..a043e01 100644 --- a/main.cpp +++ b/main.cpp @@ -24,6 +24,23 @@ #include "OutputUHD.hpp" #include "utils.hpp" #include +#include +#include +#include +#include +#include +#include +#include +#include + +std::atomic running; + +void sig_int_handler(int) { + running = false; +} + +const size_t samps_per_buffer = 20480; +const size_t samplerate = 2048000; size_t read_samples(FILE* fd, std::vector& samples, size_t count) { @@ -41,6 +58,150 @@ size_t read_samples(FILE* fd, std::vector& samples, size_t count) return num_read; } +class AlignSample { + public: + AlignSample() { + m_rx_sample_time = 0; + m_tx_sample_time = 0; + } + + void push_tx_samples(complexf* samps, size_t len, double first_sample_time) { + std::lock_guard lock(m_mutex); + std::copy(samps, samps + len, std::back_inserter(m_txsamples)); + + if (m_tx_sample_time == 0) { + m_tx_sample_time = first_sample_time; + } + } + + void push_rx_samples(complexf* samps, size_t len, double first_sample_time) { + std::lock_guard lock(m_mutex); + std::copy(samps, samps + len, std::back_inserter(m_rxsamples)); + + if (m_rx_sample_time == 0) { + m_rx_sample_time = first_sample_time; + } + } + + bool ready() { + return aligned() and m_rxsamples.size() > 8000 and m_txsamples.size() > 8000; + } + + void debug() { + MDEBUG("Aligner\n"); + MDEBUG(" RX: %f %zu\n", m_rx_sample_time, m_rxsamples.size()); + MDEBUG(" TX: %f %zu\n", m_tx_sample_time, m_txsamples.size()); + } + + std::pair crosscorrelate(size_t offset, size_t len) { + complexf xcorr(0, 0); + + if (m_rxsamples.size() < len or + m_txsamples.size() < len + offset) { + return {false, 0}; + } + + for (size_t i = 0; i < len; i++) { + xcorr += m_rxsamples[i] * std::conj(m_txsamples[i+offset]); + } + return {true, xcorr}; + } + + private: + bool aligned() { + std::lock_guard lock(m_mutex); + debug(); + + if (std::abs(m_rx_sample_time - m_tx_sample_time) < 1e-6) { + return true; + } + else if (m_rx_sample_time < m_tx_sample_time) { + size_t rx_samples_to_skip = (m_tx_sample_time - m_rx_sample_time) * samplerate; + + if (rx_samples_to_skip > m_rxsamples.size()) { + return false; + } + + m_rxsamples.erase(m_rxsamples.begin(), m_rxsamples.begin() + rx_samples_to_skip); + m_rx_sample_time += (double)rx_samples_to_skip / samplerate; + return true; + } + else if (m_rx_sample_time > m_tx_sample_time) { + size_t tx_samples_to_skip = (m_rx_sample_time - m_tx_sample_time) * samplerate; + + if (tx_samples_to_skip > m_txsamples.size()) { + return false; + } + + m_txsamples.erase(m_txsamples.begin(), m_txsamples.begin() + tx_samples_to_skip); + m_tx_sample_time += (double)tx_samples_to_skip / samplerate; + return true; + } + return false; + } + + std::mutex m_mutex; + double m_rx_sample_time; + std::deque m_rxsamples; + + double m_tx_sample_time; + std::deque m_txsamples; +}; + +AlignSample aligner; + +size_t do_receive(OutputUHD* output_uhd) +{ + std::vector samps(samps_per_buffer); + double first_sample_time = 0; + + size_t total_received = 0; + double last_print_time = 0; + + MDEBUG("Starting do_receive\n"); + while (running) { + size_t received = output_uhd->Receive(&samps.front(), samps.size(), &first_sample_time); + aligner.push_rx_samples(&samps.front(), received, first_sample_time); + total_received += received; + + if (first_sample_time - last_print_time > 1) { + MDEBUG("Rx %zu samples at t=%f\n", received, first_sample_time); + last_print_time = first_sample_time; + } + } + MDEBUG("Leaving do_receive\n"); + + return total_received; +} + +void find_peak_correlation() +{ + if (aligner.ready()) { + const size_t max_offset = 1000; // 488us at 2048000 + std::vector correlations(max_offset); + double max_ampl = 0.0; + size_t pos_max = 0; + for (size_t offset = 0; offset < max_offset; offset++) { + auto valid_xc = aligner.crosscorrelate(offset, max_offset); + + if (not valid_xc.first) { return; } + + auto xc = valid_xc.second; + correlations[offset] = xc; + + if (std::abs(xc) >= max_ampl) { + max_ampl = std::abs(xc); + pos_max = offset; + } + } + MDEBUG("Max correlation is %f at %zu\n", max_ampl, pos_max); + } + else { + MDEBUG("Not aligned\n"); + } +} + + int main(int argc, char **argv) { double txgain = 0; @@ -62,9 +223,7 @@ int main(int argc, char **argv) std::string uri = argv[1]; - const size_t samps_per_buffer = 20480; - - OutputUHD output_uhd(txgain); + OutputUHD output_uhd(txgain, samplerate); zmq::context_t ctx; zmq::socket_t zmq_sock(ctx, ZMQ_SUB); @@ -89,12 +248,21 @@ int main(int argc, char **argv) double last_print_time = 0; size_t sent = 0; + std::signal(SIGINT, &sig_int_handler); + + running = true; + + std::thread receive_thread(do_receive, &output_uhd); + + auto fut_corr = std::async(std::launch::async, find_peak_correlation); + do { double first_sample_time = 0; if (fd) { samps_read = read_samples(fd, input_samples, samps_per_buffer); sent = output_uhd.Transmit(&input_samples.front(), samps_read, &first_sample_time); + aligner.push_tx_samples(&input_samples.front(), samps_read, first_sample_time); } else { zmq::message_t msg; @@ -110,6 +278,9 @@ int main(int argc, char **argv) samps_read = msg.size() / sizeof(complexf); sent = output_uhd.Transmit((complexf*)msg.data(), samps_read, &first_sample_time); + + aligner.push_tx_samples((complexf*)msg.data(), samps_read, first_sample_time); + } if (first_sample_time - last_print_time > 1) { @@ -118,9 +289,23 @@ int main(int argc, char **argv) } total_samps_read += samps_read; + + if (aligner.ready()) { + if (fut_corr.valid()) { + fut_corr.get(); + + fut_corr = std::async(std::launch::async, find_peak_correlation); + } + } } - while (samps_read and sent); + while (samps_read and sent and running); + MDEBUG("Leaving main loop with running=%d\n", running ? 1 : 0); + + running = false; + + receive_thread.join(); + + aligner.debug(); - MDEBUG("Read %zu samples in total\n", total_samps_read); } -- cgit v1.2.3