aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2015-10-30 16:57:42 +0100
committerMatthias P. Braendli <matthias.braendli@mpb.li>2015-10-30 16:57:42 +0100
commit5bd51162757cb8ae5b8a7fba86971ee3a176672f (patch)
tree7f31b1572780c03745790afa159d16ecde07f97b
parent0bd1e4e64984cb51e65d6f0e8db92f44934eff59 (diff)
downloadodr-dpd-5bd51162757cb8ae5b8a7fba86971ee3a176672f.tar.gz
odr-dpd-5bd51162757cb8ae5b8a7fba86971ee3a176672f.tar.bz2
odr-dpd-5bd51162757cb8ae5b8a7fba86971ee3a176672f.zip
Something resembling a correlation
-rw-r--r--OutputUHD.cpp48
-rw-r--r--OutputUHD.hpp5
-rw-r--r--main.cpp195
3 files changed, 240 insertions, 8 deletions
diff --git a/OutputUHD.cpp b/OutputUHD.cpp
index a56112d..f1fe3bf 100644
--- a/OutputUHD.cpp
+++ b/OutputUHD.cpp
@@ -37,9 +37,10 @@
using namespace std;
-OutputUHD::OutputUHD(double txgain)
+OutputUHD::OutputUHD(double txgain, double samplerate)
{
m_txgain = txgain;
+ m_samplerate = samplerate;
uhd::set_thread_priority_safe();
@@ -47,7 +48,6 @@ OutputUHD::OutputUHD(double txgain)
m_usrp = uhd::usrp::multi_usrp::make(device);
- m_samplerate = 2048000;
m_usrp->set_tx_rate(m_samplerate);
m_usrp->set_rx_rate(m_samplerate);
@@ -70,6 +70,17 @@ OutputUHD::OutputUHD(double txgain)
uhd::stream_args_t stream_args("fc32"); //complex floats
myTxStream = m_usrp->get_tx_stream(stream_args);
+
+ myRxStream = m_usrp->get_rx_stream(stream_args);
+ uhd::stream_cmd_t stream_cmd(true ?
+ uhd::stream_cmd_t::STREAM_MODE_START_CONTINUOUS:
+ uhd::stream_cmd_t::STREAM_MODE_NUM_SAMPS_AND_DONE
+ );
+ stream_cmd.num_samps = 0;
+ stream_cmd.stream_now = true;
+ stream_cmd.time_spec = uhd::time_spec_t();
+ myRxStream->issue_stream_cmd(stream_cmd);
+
}
size_t OutputUHD::Transmit(const complexf *samples, size_t sizeIn, double *first_sample_time)
@@ -97,3 +108,36 @@ size_t OutputUHD::Transmit(const complexf *samples, size_t sizeIn, double *first
return num_acc_samps;
}
+size_t OutputUHD::Receive(complexf *samples, size_t sizeIn, double *first_sample_time)
+{
+ const double rx_timeout = 20.0;
+
+ uhd::rx_metadata_t md;
+ size_t usrp_max_num_samps = myRxStream->get_max_num_samps();
+
+ *first_sample_time = md.time_spec.get_real_secs();
+
+ size_t num_acc_samps = 0; //number of accumulated samples
+ while (num_acc_samps < sizeIn) {
+ size_t samps_to_send = std::min(sizeIn - num_acc_samps, usrp_max_num_samps);
+
+ //send a single packet
+ size_t num_rx_samps = myRxStream->recv(
+ &samples[num_acc_samps],
+ samps_to_send, md, rx_timeout);
+
+ if (num_acc_samps == 0) {
+ if (md.has_time_spec) {
+ *first_sample_time = md.time_spec.get_real_secs();
+ }
+ else {
+ MDEBUG("samp %zu no time spec!\n", num_acc_samps);
+ }
+ }
+
+ num_acc_samps += num_rx_samps;
+ }
+
+ return num_acc_samps;
+}
+
diff --git a/OutputUHD.hpp b/OutputUHD.hpp
index ad95e99..a6dc043 100644
--- a/OutputUHD.hpp
+++ b/OutputUHD.hpp
@@ -31,9 +31,10 @@ typedef std::complex<float> complexf;
class OutputUHD {
public:
- OutputUHD(double txgain);
+ OutputUHD(double txgain, double samplerate);
size_t Transmit(const complexf *samples, size_t sizeIn, double *first_sample_time);
+ size_t Receive(complexf *samples, size_t sizeIn, double *first_sample_time);
private:
double m_samplerate;
@@ -41,6 +42,8 @@ class OutputUHD {
uhd::usrp::multi_usrp::sptr m_usrp;
uhd::tx_metadata_t md;
uhd::tx_streamer::sptr myTxStream;
+
+ uhd::rx_streamer::sptr myRxStream;
};
#endif // __OUTPUT_UHD_H_
diff --git a/main.cpp b/main.cpp
index 5803262..a043e01 100644
--- a/main.cpp
+++ b/main.cpp
@@ -24,6 +24,23 @@
#include "OutputUHD.hpp"
#include "utils.hpp"
#include <zmq.hpp>
+#include <thread>
+#include <vector>
+#include <deque>
+#include <mutex>
+#include <atomic>
+#include <csignal>
+#include <iostream>
+#include <future>
+
+std::atomic<bool> running;
+
+void sig_int_handler(int) {
+ running = false;
+}
+
+const size_t samps_per_buffer = 20480;
+const size_t samplerate = 2048000;
size_t read_samples(FILE* fd, std::vector<complexf>& samples, size_t count)
{
@@ -41,6 +58,150 @@ size_t read_samples(FILE* fd, std::vector<complexf>& samples, size_t count)
return num_read;
}
+class AlignSample {
+ public:
+ AlignSample() {
+ m_rx_sample_time = 0;
+ m_tx_sample_time = 0;
+ }
+
+ void push_tx_samples(complexf* samps, size_t len, double first_sample_time) {
+ std::lock_guard<std::mutex> lock(m_mutex);
+ std::copy(samps, samps + len, std::back_inserter(m_txsamples));
+
+ if (m_tx_sample_time == 0) {
+ m_tx_sample_time = first_sample_time;
+ }
+ }
+
+ void push_rx_samples(complexf* samps, size_t len, double first_sample_time) {
+ std::lock_guard<std::mutex> lock(m_mutex);
+ std::copy(samps, samps + len, std::back_inserter(m_rxsamples));
+
+ if (m_rx_sample_time == 0) {
+ m_rx_sample_time = first_sample_time;
+ }
+ }
+
+ bool ready() {
+ return aligned() and m_rxsamples.size() > 8000 and m_txsamples.size() > 8000;
+ }
+
+ void debug() {
+ MDEBUG("Aligner\n");
+ MDEBUG(" RX: %f %zu\n", m_rx_sample_time, m_rxsamples.size());
+ MDEBUG(" TX: %f %zu\n", m_tx_sample_time, m_txsamples.size());
+ }
+
+ std::pair<bool, complexf> crosscorrelate(size_t offset, size_t len) {
+ complexf xcorr(0, 0);
+
+ if (m_rxsamples.size() < len or
+ m_txsamples.size() < len + offset) {
+ return {false, 0};
+ }
+
+ for (size_t i = 0; i < len; i++) {
+ xcorr += m_rxsamples[i] * std::conj(m_txsamples[i+offset]);
+ }
+ return {true, xcorr};
+ }
+
+ private:
+ bool aligned() {
+ std::lock_guard<std::mutex> lock(m_mutex);
+ debug();
+
+ if (std::abs(m_rx_sample_time - m_tx_sample_time) < 1e-6) {
+ return true;
+ }
+ else if (m_rx_sample_time < m_tx_sample_time) {
+ size_t rx_samples_to_skip = (m_tx_sample_time - m_rx_sample_time) * samplerate;
+
+ if (rx_samples_to_skip > m_rxsamples.size()) {
+ return false;
+ }
+
+ m_rxsamples.erase(m_rxsamples.begin(), m_rxsamples.begin() + rx_samples_to_skip);
+ m_rx_sample_time += (double)rx_samples_to_skip / samplerate;
+ return true;
+ }
+ else if (m_rx_sample_time > m_tx_sample_time) {
+ size_t tx_samples_to_skip = (m_rx_sample_time - m_tx_sample_time) * samplerate;
+
+ if (tx_samples_to_skip > m_txsamples.size()) {
+ return false;
+ }
+
+ m_txsamples.erase(m_txsamples.begin(), m_txsamples.begin() + tx_samples_to_skip);
+ m_tx_sample_time += (double)tx_samples_to_skip / samplerate;
+ return true;
+ }
+ return false;
+ }
+
+ std::mutex m_mutex;
+ double m_rx_sample_time;
+ std::deque<complexf> m_rxsamples;
+
+ double m_tx_sample_time;
+ std::deque<complexf> m_txsamples;
+};
+
+AlignSample aligner;
+
+size_t do_receive(OutputUHD* output_uhd)
+{
+ std::vector<complexf> samps(samps_per_buffer);
+ double first_sample_time = 0;
+
+ size_t total_received = 0;
+ double last_print_time = 0;
+
+ MDEBUG("Starting do_receive\n");
+ while (running) {
+ size_t received = output_uhd->Receive(&samps.front(), samps.size(), &first_sample_time);
+ aligner.push_rx_samples(&samps.front(), received, first_sample_time);
+ total_received += received;
+
+ if (first_sample_time - last_print_time > 1) {
+ MDEBUG("Rx %zu samples at t=%f\n", received, first_sample_time);
+ last_print_time = first_sample_time;
+ }
+ }
+ MDEBUG("Leaving do_receive\n");
+
+ return total_received;
+}
+
+void find_peak_correlation()
+{
+ if (aligner.ready()) {
+ const size_t max_offset = 1000; // 488us at 2048000
+ std::vector<complexf> correlations(max_offset);
+ double max_ampl = 0.0;
+ size_t pos_max = 0;
+ for (size_t offset = 0; offset < max_offset; offset++) {
+ auto valid_xc = aligner.crosscorrelate(offset, max_offset);
+
+ if (not valid_xc.first) { return; }
+
+ auto xc = valid_xc.second;
+ correlations[offset] = xc;
+
+ if (std::abs(xc) >= max_ampl) {
+ max_ampl = std::abs(xc);
+ pos_max = offset;
+ }
+ }
+ MDEBUG("Max correlation is %f at %zu\n", max_ampl, pos_max);
+ }
+ else {
+ MDEBUG("Not aligned\n");
+ }
+}
+
+
int main(int argc, char **argv)
{
double txgain = 0;
@@ -62,9 +223,7 @@ int main(int argc, char **argv)
std::string uri = argv[1];
- const size_t samps_per_buffer = 20480;
-
- OutputUHD output_uhd(txgain);
+ OutputUHD output_uhd(txgain, samplerate);
zmq::context_t ctx;
zmq::socket_t zmq_sock(ctx, ZMQ_SUB);
@@ -89,12 +248,21 @@ int main(int argc, char **argv)
double last_print_time = 0;
size_t sent = 0;
+ std::signal(SIGINT, &sig_int_handler);
+
+ running = true;
+
+ std::thread receive_thread(do_receive, &output_uhd);
+
+ auto fut_corr = std::async(std::launch::async, find_peak_correlation);
+
do {
double first_sample_time = 0;
if (fd) {
samps_read = read_samples(fd, input_samples, samps_per_buffer);
sent = output_uhd.Transmit(&input_samples.front(), samps_read, &first_sample_time);
+ aligner.push_tx_samples(&input_samples.front(), samps_read, first_sample_time);
}
else {
zmq::message_t msg;
@@ -110,6 +278,9 @@ int main(int argc, char **argv)
samps_read = msg.size() / sizeof(complexf);
sent = output_uhd.Transmit((complexf*)msg.data(), samps_read, &first_sample_time);
+
+ aligner.push_tx_samples((complexf*)msg.data(), samps_read, first_sample_time);
+
}
if (first_sample_time - last_print_time > 1) {
@@ -118,9 +289,23 @@ int main(int argc, char **argv)
}
total_samps_read += samps_read;
+
+ if (aligner.ready()) {
+ if (fut_corr.valid()) {
+ fut_corr.get();
+
+ fut_corr = std::async(std::launch::async, find_peak_correlation);
+ }
+ }
}
- while (samps_read and sent);
+ while (samps_read and sent and running);
+ MDEBUG("Leaving main loop with running=%d\n", running ? 1 : 0);
+
+ running = false;
+
+ receive_thread.join();
+
+ aligner.debug();
- MDEBUG("Read %zu samples in total\n", total_samps_read);
}