/*
Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Her Majesty the
Queen in Right of Canada (Communications Research Center Canada)
Copyright (C) 2017
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
DESCRIPTION:
This presents a TCP socket to an external tool which calculates
a Digital Predistortion model from a short sequence of transmit
samples and corresponding receive samples.
*/
/*
This file is part of ODR-DabMod.
ODR-DabMod is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
ODR-DabMod is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with ODR-DabMod. If not, see .
*/
#include
#include
#include
#include "OutputUHDFeedback.h"
#include "Utils.h"
using namespace std;
typedef std::complex complexf;
OutputUHDFeedback::OutputUHDFeedback()
{
running = false;
}
void OutputUHDFeedback::setup(uhd::usrp::multi_usrp::sptr usrp, uint16_t port)
{
myUsrp = usrp;
burstRequest.state = BurstRequestState::None;
if (port) {
m_port = port;
running = true;
rx_burst_thread = boost::thread(&OutputUHDFeedback::ReceiveBurstThread, this);
burst_tcp_thread = boost::thread(&OutputUHDFeedback::ServeFeedbackThread, this);
}
}
OutputUHDFeedback::~OutputUHDFeedback()
{
running = false;
rx_burst_thread.join();
burst_tcp_thread.join();
}
void OutputUHDFeedback::set_tx_frame(
const std::vector &buf,
const struct frame_timestamp& ts)
{
boost::mutex::scoped_lock lock(burstRequest.mutex);
if (burstRequest.state == BurstRequestState::SaveTransmitFrame) {
const size_t n = std::min(
burstRequest.frame_length * sizeof(complexf), buf.size());
burstRequest.tx_samples.clear();
burstRequest.tx_samples.resize(n);
copy(buf.begin(), buf.begin() + n, burstRequest.tx_samples.begin());
burstRequest.tx_second = ts.timestamp_sec;
burstRequest.tx_pps = ts.timestamp_pps;
// Prepare the next state
burstRequest.rx_second = ts.timestamp_sec;
burstRequest.rx_pps = ts.timestamp_pps;
burstRequest.state = BurstRequestState::SaveReceiveFrame;
lock.unlock();
burstRequest.mutex_notification.notify_one();
}
else {
lock.unlock();
}
}
void OutputUHDFeedback::ReceiveBurstThread()
{
set_thread_name("uhdreceiveburst");
uhd::stream_args_t stream_args("fc32"); //complex floats
auto rxStream = myUsrp->get_rx_stream(stream_args);
while (running) {
boost::mutex::scoped_lock lock(burstRequest.mutex);
while (burstRequest.state != BurstRequestState::SaveReceiveFrame) {
if (not running) break;
burstRequest.mutex_notification.wait(lock);
}
if (not running) break;
uhd::stream_cmd_t cmd(
uhd::stream_cmd_t::stream_mode_t::STREAM_MODE_NUM_SAMPS_AND_DONE);
cmd.num_samps = burstRequest.frame_length;
cmd.stream_now = false;
double pps = burstRequest.rx_pps / 16384000.0;
cmd.time_spec = uhd::time_spec_t(burstRequest.rx_second, pps);
rxStream->issue_stream_cmd(cmd);
uhd::rx_metadata_t md;
burstRequest.rx_samples.resize(burstRequest.frame_length * sizeof(complexf));
rxStream->recv(&burstRequest.rx_samples[0], burstRequest.frame_length, md);
burstRequest.rx_second = md.time_spec.get_full_secs();
burstRequest.rx_pps = md.time_spec.get_frac_secs() * 16384000.0;
burstRequest.state = BurstRequestState::Acquired;
lock.unlock();
burstRequest.mutex_notification.notify_one();
}
}
void OutputUHDFeedback::ServeFeedbackThread()
{
set_thread_name("uhdservefeedback");
int server_sock = -1;
try {
if ((server_sock = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
throw std::runtime_error("Can't create TCP socket");
}
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(m_port);
addr.sin_addr.s_addr = htonl(INADDR_ANY);
if (bind(server_sock, (struct sockaddr*)&addr, sizeof(addr)) < 0) {
throw std::runtime_error("Can't bind TCP socket");
}
if (listen(server_sock, 1) < 0) {
throw std::runtime_error("Can't listen TCP socket");
}
while (running) {
struct sockaddr_in client;
socklen_t client_len = sizeof(client);
int client_sock = accept(server_sock,
(struct sockaddr*)&client, &client_len);
if (client_sock < 0) {
throw runtime_error("Could not establish new connection");
}
while (running) {
uint8_t request_version = 0;
int read = recv(client_sock, &request_version, 1, 0);
if (!read) break; // done reading
if (read < 0) {
etiLog.level(info) <<
"DPD Feedback Server Client read request verson failed";
}
if (request_version != 1) {
etiLog.level(info) << "DPD Feedback Server wrong request version";
break;
}
uint32_t num_samples = 0;
read = recv(client_sock, &num_samples, 4, 0);
if (!read) break; // done reading
if (read < 0) {
etiLog.level(info) <<
"DPD Feedback Server Client read num samples failed";
}
// We are ready to issue the request now
{
boost::mutex::scoped_lock lock(burstRequest.mutex);
burstRequest.frame_length = num_samples;
burstRequest.state = BurstRequestState::SaveTransmitFrame;
lock.unlock();
}
// Wait for the result to be ready
boost::mutex::scoped_lock lock(burstRequest.mutex);
while (burstRequest.state != BurstRequestState::Acquired) {
if (not running) break;
burstRequest.mutex_notification.wait(lock);
}
burstRequest.state = BurstRequestState::None;
lock.unlock();
if (send(client_sock,
&burstRequest.tx_second,
sizeof(burstRequest.tx_second),
0) < 0) {
etiLog.level(info) <<
"DPD Feedback Server Client send tx_second failed";
break;
}
if (send(client_sock,
&burstRequest.tx_pps,
sizeof(burstRequest.tx_pps),
0) < 0) {
etiLog.level(info) <<
"DPD Feedback Server Client send tx_pps failed";
break;
}
#warning "Send buf"
}
close(client_sock);
}
}
catch (runtime_error &e) {
etiLog.level(error) << "DPD Feedback Server fault: " << e.what();
}
running = false;
if (server_sock != -1) {
close(server_sock);
}
}