From 2b877e304d52c406720050aa55eed97b6f7869be Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Sun, 7 May 2017 14:22:21 +0200 Subject: Add WIP for OutputUHDFeedback --- src/OutputUHDFeedback.cpp | 245 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 245 insertions(+) create mode 100644 src/OutputUHDFeedback.cpp (limited to 'src/OutputUHDFeedback.cpp') diff --git a/src/OutputUHDFeedback.cpp b/src/OutputUHDFeedback.cpp new file mode 100644 index 0000000..dfe0f74 --- /dev/null +++ b/src/OutputUHDFeedback.cpp @@ -0,0 +1,245 @@ +/* + Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Her Majesty the + Queen in Right of Canada (Communications Research Center Canada) + + Copyright (C) 2017 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://opendigitalradio.org + +DESCRIPTION: + This presents a TCP socket to an external tool which calculates + a Digital Predistortion model from a short sequence of transmit + samples and corresponding receive samples. +*/ + +/* + This file is part of ODR-DabMod. + + ODR-DabMod is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + ODR-DabMod is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with ODR-DabMod. If not, see . + */ + +#include +#include +#include +#include "OutputUHDFeedback.h" +#include "Utils.h" + +using namespace std; +typedef std::complex complexf; + +OutputUHDFeedback::OutputUHDFeedback() +{ + running = false; +} + +void OutputUHDFeedback::setup(uhd::usrp::multi_usrp::sptr usrp, uint16_t port) +{ + myUsrp = usrp; + burstRequest.state = BurstRequestState::None; + + if (port) { + m_port = port; + running = true; + + rx_burst_thread = boost::thread(&OutputUHDFeedback::ReceiveBurstThread, this); + burst_tcp_thread = boost::thread(&OutputUHDFeedback::ServeFeedbackThread, this); + } +} + +OutputUHDFeedback::~OutputUHDFeedback() +{ + running = false; + rx_burst_thread.join(); + burst_tcp_thread.join(); +} + +void OutputUHDFeedback::set_tx_frame( + const std::vector &buf, + const struct frame_timestamp& ts) +{ + boost::mutex::scoped_lock lock(burstRequest.mutex); + + if (burstRequest.state == BurstRequestState::SaveTransmitFrame) { + const size_t n = std::min( + burstRequest.frame_length * sizeof(complexf), buf.size()); + + burstRequest.tx_samples.clear(); + burstRequest.tx_samples.resize(n); + copy(buf.begin(), buf.begin() + n, burstRequest.tx_samples.begin()); + + burstRequest.tx_second = ts.timestamp_sec; + burstRequest.tx_pps = ts.timestamp_pps; + + // Prepare the next state + burstRequest.rx_second = ts.timestamp_sec; + burstRequest.rx_pps = ts.timestamp_pps; + burstRequest.state = BurstRequestState::SaveReceiveFrame; + + lock.unlock(); + burstRequest.mutex_notification.notify_one(); + } + else { + lock.unlock(); + } +} + +void OutputUHDFeedback::ReceiveBurstThread() +{ + set_thread_name("uhdreceiveburst"); + + uhd::stream_args_t stream_args("fc32"); //complex floats + auto rxStream = myUsrp->get_rx_stream(stream_args); + + while (running) { + boost::mutex::scoped_lock lock(burstRequest.mutex); + while (burstRequest.state != BurstRequestState::SaveReceiveFrame) { + if (not running) break; + burstRequest.mutex_notification.wait(lock); + } + + if (not running) break; + + uhd::stream_cmd_t cmd( + uhd::stream_cmd_t::stream_mode_t::STREAM_MODE_NUM_SAMPS_AND_DONE); + cmd.num_samps = burstRequest.frame_length; + cmd.stream_now = false; + + double pps = burstRequest.rx_pps / 16384000.0; + cmd.time_spec = uhd::time_spec_t(burstRequest.rx_second, pps); + + rxStream->issue_stream_cmd(cmd); + + uhd::rx_metadata_t md; + burstRequest.rx_samples.resize(burstRequest.frame_length * sizeof(complexf)); + rxStream->recv(&burstRequest.rx_samples[0], burstRequest.frame_length, md); + + burstRequest.rx_second = md.time_spec.get_full_secs(); + burstRequest.rx_pps = md.time_spec.get_frac_secs() * 16384000.0; + + burstRequest.state = BurstRequestState::Acquired; + + lock.unlock(); + burstRequest.mutex_notification.notify_one(); + } +} + +void OutputUHDFeedback::ServeFeedbackThread() +{ + set_thread_name("uhdservefeedback"); + + int server_sock = -1; + try { + if ((server_sock = socket(PF_INET, SOCK_STREAM, 0)) < 0) { + throw std::runtime_error("Can't create TCP socket"); + } + + struct sockaddr_in addr; + addr.sin_family = AF_INET; + addr.sin_port = htons(m_port); + addr.sin_addr.s_addr = htonl(INADDR_ANY); + + if (bind(server_sock, (struct sockaddr*)&addr, sizeof(addr)) < 0) { + throw std::runtime_error("Can't bind TCP socket"); + } + + if (listen(server_sock, 1) < 0) { + throw std::runtime_error("Can't listen TCP socket"); + } + + while (running) { + struct sockaddr_in client; + socklen_t client_len = sizeof(client); + int client_sock = accept(server_sock, + (struct sockaddr*)&client, &client_len); + + if (client_sock < 0) { + throw runtime_error("Could not establish new connection"); + } + + while (running) { + uint8_t request_version = 0; + int read = recv(client_sock, &request_version, 1, 0); + if (!read) break; // done reading + if (read < 0) { + etiLog.level(info) << + "DPD Feedback Server Client read request verson failed"; + } + + if (request_version != 1) { + etiLog.level(info) << "DPD Feedback Server wrong request version"; + break; + } + + uint32_t num_samples = 0; + read = recv(client_sock, &num_samples, 4, 0); + if (!read) break; // done reading + if (read < 0) { + etiLog.level(info) << + "DPD Feedback Server Client read num samples failed"; + } + + // We are ready to issue the request now + { + boost::mutex::scoped_lock lock(burstRequest.mutex); + burstRequest.frame_length = num_samples; + burstRequest.state = BurstRequestState::SaveTransmitFrame; + + lock.unlock(); + } + + // Wait for the result to be ready + boost::mutex::scoped_lock lock(burstRequest.mutex); + while (burstRequest.state != BurstRequestState::Acquired) { + if (not running) break; + burstRequest.mutex_notification.wait(lock); + } + + burstRequest.state = BurstRequestState::None; + lock.unlock(); + + if (send(client_sock, + &burstRequest.tx_second, + sizeof(burstRequest.tx_second), + 0) < 0) { + etiLog.level(info) << + "DPD Feedback Server Client send tx_second failed"; + break; + } + + if (send(client_sock, + &burstRequest.tx_pps, + sizeof(burstRequest.tx_pps), + 0) < 0) { + etiLog.level(info) << + "DPD Feedback Server Client send tx_pps failed"; + break; + } + +#warning "Send buf" + } + + close(client_sock); + } + } + catch (runtime_error &e) { + etiLog.level(error) << "DPD Feedback Server fault: " << e.what(); + } + + running = false; + + if (server_sock != -1) { + close(server_sock); + } +} -- cgit v1.2.3