/*
Copyright (C) 2015
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
*/
/*
This file is part of ODR-DPD.
ODR-DPD is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
ODR-DPD is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with ODR-DPD. If not, see .
*/
#include "OutputUHD.hpp"
#include "pointcloud.hpp"
#include "AlignSample.hpp"
#include "utils.hpp"
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
std::atomic running;
void sig_int_handler(int) {
running = false;
}
size_t read_samples_from_file(FILE* fd, std::vector& samples, size_t count)
{
if (samples.size() < count) {
MDEBUG("HAD TO RESIZE BUFFER!\n");
samples.resize(count);
}
size_t num_read = fread(&samples.front(), sizeof(complexf), count, fd);
if (num_read == 0) {
rewind(fd);
num_read = fread(&samples.front(), sizeof(complexf), count, fd);
}
return num_read;
}
AlignSample aligner;
PointCloud cloud(10000);
size_t do_receive(OutputUHD* output_uhd)
{
std::vector samps(samps_per_buffer);
double first_sample_time = 0;
size_t total_received = 0;
double last_print_time = 0;
MDEBUG("Starting do_receive\n");
while (running) {
ssize_t received = output_uhd->Receive(&samps.front(), samps.size(), &first_sample_time);
if (received > 0) {
aligner.push_rx_samples(&samps.front(), received, first_sample_time);
total_received += received;
if (first_sample_time - last_print_time > 1) {
//MDEBUG("Rx %zu samples at t=%f\n", received, first_sample_time);
last_print_time = first_sample_time;
}
}
else {
// A receive error occurred that invalidates the RX timestamp
MDEBUG("Reset aligner RX\n");
aligner.reset_rx();
}
}
MDEBUG("Leaving do_receive\n");
return total_received;
}
const size_t correlation_length = 16 * 1024; // 8ms at 2048000
long user_delay = 0;
void push_to_point_cloud(size_t rx_delay)
{
auto points = aligner.get_samples(correlation_length, rx_delay + user_delay);
if (points.first.size() > 0) {
cloud.push_samples(points);
}
}
size_t find_peak_correlation(size_t correlation_length)
{
double max_norm = 0.0;
size_t pos_max = 0;
auto result = aligner.crosscorrelate(correlation_length);
auto& xcs = result.correlation;
// Find correlation peak
for (size_t offset = 0; offset < xcs.size(); offset++) {
complexf xc = xcs[offset];
if (std::norm(xc) >= max_norm) {
max_norm = std::norm(xc);
pos_max = offset;
}
}
char msg[512];
snprintf(msg, 512, "Max correlation is %f at %fms (%zu), with RX %fdB and TX %fdB, RXtime %f, TXtime %f\n",
std::sqrt(max_norm),
(double)pos_max / (double)samplerate * 1000.0,
pos_max,
10*std::log(result.rx_power),
10*std::log(result.tx_power),
result.rx_timestamp,
result.tx_timestamp);
std::cerr << msg;
std::this_thread::sleep_for(std::chrono::microseconds(1));
// Eat much more than we correlate, because correlation is slow
aligner.consume(204800);
return pos_max;
}
void analyse_correlation()
{
const size_t num_analyse = 10;
std::vector max_positions(num_analyse);
while (running) {
for (size_t i = 0; running and i < num_analyse; i++) {
if (aligner.ready(correlation_length)) {
max_positions[i] = find_peak_correlation(correlation_length);
}
else {
MDEBUG("Waiting for correlation\n");
aligner.debug();
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}
bool all_identical = true;
double mean = std::accumulate(max_positions.begin(), max_positions.end(), 0.0) / (double)max_positions.size();
for (size_t i = 0; i < num_analyse; i++) {
if (std::fabs(max_positions[i] - mean) > 1) {
all_identical = false;
break;
}
}
if (all_identical) {
size_t delay_samples = max_positions[0];
push_to_point_cloud(delay_samples);
}
else {
MDEBUG("Not all delays identical\n");
}
}
}
int main(int argc, char **argv)
{
double txgain = 0;
double rxgain = 0;
if (argc >= 3) {
txgain = strtod(argv[2], nullptr);
if (!(0 <= txgain and txgain < 80)) {
MDEBUG("txgain wrong: %f\n", txgain);
return -1;
}
}
if (argc >= 4) {
rxgain = strtod(argv[3], nullptr);
if (!(0 <= rxgain and rxgain < 80)) {
MDEBUG("rxgain wrong: %f\n", rxgain);
return -1;
}
}
if (argc < 2) {
MDEBUG("Require input file or url\n");
return -1;
}
std::string uri = argv[1];
zmq::context_t ctx;
zmq::socket_t zmq_sock(ctx, ZMQ_SUB);
FILE* fd = nullptr;
if (uri == "test") { //{{{
FILE* fd_rx = fopen("rx.test", "r");
if (!fd_rx) {
std::cerr << "fx_rx open error" << std::endl;
abort();
}
FILE* fd_tx = fopen("tx.test", "r");
if (!fd_tx) {
std::cerr << "fx_tx open error" << std::endl;
abort();
}
size_t num_rx_samples;
size_t num_tx_samples;
do {
const size_t len = 64;
std::vector rx_samples(len);
std::vector tx_samples(len);
num_rx_samples = fread(&rx_samples.front(), sizeof(complexf), len, fd_rx);
num_tx_samples = fread(&tx_samples.front(), sizeof(complexf), len, fd_tx);
aligner.push_rx_samples(&rx_samples.front(), num_rx_samples, 1);
aligner.push_tx_samples(&tx_samples.front(), num_tx_samples, 1);
std::cerr << ".";
} while (num_rx_samples and num_tx_samples);
std::cerr << std::endl;
aligner.debug();
const size_t correlation_length = 16 * 1024;
double max_norm = 0.0;
size_t pos_max = 0;
while (aligner.ready(correlation_length)) {
auto result = aligner.crosscorrelate(correlation_length);
auto& xcs = result.correlation;
for (size_t offset = 0; offset < xcs.size(); offset++) {
complexf& xc = xcs[offset];
if (std::norm(xc) >= max_norm) {
max_norm = std::norm(xc);
pos_max = offset;
}
}
MDEBUG("Max correlation is %f at %fms (%zu), with RX %fdB and TX %fdB, RXtime %f, TXtime %f\n",
std::sqrt(max_norm),
(double)pos_max / (double)samplerate * 1000.0,
pos_max,
10*std::log(result.rx_power),
10*std::log(result.tx_power),
result.rx_timestamp,
result.tx_timestamp);
aligner.consume(correlation_length / 2);
}
return 0;
} // }}}
else if (uri.find("tcp://") != 0) {
fd = fopen(uri.c_str(), "rb");
if (!fd) {
MDEBUG("Could not open file\n");
return -1;
}
}
else {
zmq_sock.connect(uri.c_str());
zmq_sock.setsockopt(ZMQ_SUBSCRIBE, NULL, 0);
}
OutputUHD output_uhd(txgain, rxgain, samplerate);
size_t samps_read = 0;
size_t total_samps_read = samps_read;
double last_print_time = 0;
size_t sent = 0;
std::signal(SIGINT, &sig_int_handler);
running = true;
std::thread receive_thread(do_receive, &output_uhd);
std::thread correlator_thread(analyse_correlation);
do {
const double first_sample_time = 4.0;
const double sample_time = first_sample_time + (double)total_samps_read / (double)samplerate;
if (fd) {
std::vector input_samples(samps_per_buffer);
samps_read = read_samples_from_file(fd, input_samples, samps_per_buffer);
sent = output_uhd.Transmit(&input_samples.front(), samps_read, sample_time);
aligner.push_tx_samples(&input_samples.front(), samps_read, sample_time);
}
else {
zmq::message_t msg;
if (not zmq_sock.recv(&msg)) {
MDEBUG("zmq recv error\n");
return -1;
}
if (msg.size() % sizeof(complexf) != 0) {
MDEBUG("Received incomplete size %zu\n", msg.size());
return -1;
}
samps_read = msg.size() / sizeof(complexf);
sent = output_uhd.Transmit((complexf*)msg.data(), samps_read, sample_time);
aligner.push_tx_samples((complexf*)msg.data(), samps_read, sample_time);
}
if (sample_time - last_print_time > 1) {
//MDEBUG("Tx %zu samples at t=%f\n", samps_read, sample_time);
last_print_time = sample_time;
}
total_samps_read += samps_read;
try {
std::string keyname = cloud.handle_event();
if (keyname == "l") {
user_delay += 1;
std::cerr << "User delay: " << user_delay << std::endl;
}
else if (keyname == "e") {
user_delay -= 1;
std::cerr << "User delay: " << user_delay << std::endl;
}
else if (keyname == "z") {
rxgain -= 1;
output_uhd.SetRxGain(rxgain);
}
else if (keyname == "a") {
rxgain += 1;
output_uhd.SetRxGain(rxgain);
}
else if (keyname == "x") {
txgain -= 1;
output_uhd.SetTxGain(txgain);
}
else if (keyname == "s") {
txgain += 1;
output_uhd.SetTxGain(txgain);
}
else if (not keyname.empty()) {
std::cerr << "Press L for later, E for earlier, Z/A to decrease/increase RX gain, X/S for TX gain" << std::endl;
}
}
catch (sdl_quit &e) {
running = false;
}
cloud.draw();
}
while (samps_read and sent and running);
MDEBUG("Leaving main loop with running=%d\n", running ? 1 : 0);
running = false;
receive_thread.join();
correlator_thread.join();
}