aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/OutputUHDFeedback.cpp120
-rw-r--r--src/OutputUHDFeedback.h12
2 files changed, 107 insertions, 25 deletions
diff --git a/src/OutputUHDFeedback.cpp b/src/OutputUHDFeedback.cpp
index 73497a1..8584839 100644
--- a/src/OutputUHDFeedback.cpp
+++ b/src/OutputUHDFeedback.cpp
@@ -40,6 +40,7 @@ DESCRIPTION:
#include <complex>
#include <uhd/types/stream_cmd.hpp>
#include <sys/socket.h>
+#include <poll.h>
#include "OutputUHDFeedback.h"
#include "Utils.h"
@@ -48,7 +49,7 @@ typedef std::complex<float> complexf;
OutputUHDFeedback::OutputUHDFeedback()
{
- m_running = false;
+ m_running.store(false);
}
void OutputUHDFeedback::setup(uhd::usrp::multi_usrp::sptr usrp, uint16_t port, uint32_t sampleRate)
@@ -59,7 +60,7 @@ void OutputUHDFeedback::setup(uhd::usrp::multi_usrp::sptr usrp, uint16_t port, u
if (port) {
m_port = port;
- m_running = true;
+ m_running.store(true);
rx_burst_thread = boost::thread(&OutputUHDFeedback::ReceiveBurstThread, this);
burst_tcp_thread = boost::thread(&OutputUHDFeedback::ServeFeedbackThread, this);
@@ -68,7 +69,11 @@ void OutputUHDFeedback::setup(uhd::usrp::multi_usrp::sptr usrp, uint16_t port, u
OutputUHDFeedback::~OutputUHDFeedback()
{
- m_running = false;
+ m_running.store(false);
+
+ rx_burst_thread.interrupt();
+ burst_tcp_thread.interrupt();
+
rx_burst_thread.join();
burst_tcp_thread.join();
}
@@ -79,9 +84,13 @@ void OutputUHDFeedback::set_tx_frame(
{
boost::mutex::scoped_lock lock(burstRequest.mutex);
+ assert(buf.size() % sizeof(complexf) == 0);
+
if (burstRequest.state == BurstRequestState::SaveTransmitFrame) {
const size_t n = std::min(
- burstRequest.frame_length * sizeof(complexf), buf.size());
+ burstRequest.num_samples * sizeof(complexf), buf.size());
+
+ burstRequest.num_samples = n / sizeof(complexf);
burstRequest.tx_samples.clear();
burstRequest.tx_samples.resize(n);
@@ -129,7 +138,7 @@ void OutputUHDFeedback::ReceiveBurstThread()
uhd::stream_cmd_t cmd(
uhd::stream_cmd_t::stream_mode_t::STREAM_MODE_NUM_SAMPS_AND_DONE);
- cmd.num_samps = burstRequest.frame_length;
+ cmd.num_samps = burstRequest.num_samples;
cmd.stream_now = false;
double pps = burstRequest.rx_pps / 16384000.0;
@@ -138,9 +147,10 @@ void OutputUHDFeedback::ReceiveBurstThread()
rxStream->issue_stream_cmd(cmd);
uhd::rx_metadata_t md;
- burstRequest.rx_samples.resize(burstRequest.frame_length * sizeof(complexf));
- rxStream->recv(&burstRequest.rx_samples[0], burstRequest.frame_length, md);
+ burstRequest.rx_samples.resize(burstRequest.num_samples * sizeof(complexf));
+ rxStream->recv(&burstRequest.rx_samples[0], burstRequest.num_samples, md);
+ // The recv might have happened at another time than requested
burstRequest.rx_second = md.time_spec.get_full_secs();
burstRequest.rx_pps = md.time_spec.get_frac_secs() * 16384000.0;
@@ -151,13 +161,32 @@ void OutputUHDFeedback::ReceiveBurstThread()
}
}
+static int accept_with_timeout(int server_socket, int timeout_ms, struct sockaddr_in *client)
+{
+ struct pollfd fds[1];
+ fds[0].fd = server_socket;
+ fds[0].events = POLLIN | POLLOUT;
+
+ int retval = poll(fds, 1, timeout_ms);
+
+ if (retval == -1) {
+ throw std::runtime_error("TCP Socket accept error: " + to_string(errno));
+ }
+ else if (retval) {
+ socklen_t client_len = sizeof(struct sockaddr_in);
+ return accept(server_socket, (struct sockaddr*)&client, &client_len);
+ }
+ else {
+ return -2;
+ }
+}
+
void OutputUHDFeedback::ServeFeedbackThread()
{
set_thread_name("uhdservefeedback");
- int server_sock = -1;
try {
- if ((server_sock = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
+ if ((m_server_sock = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
throw std::runtime_error("Can't create TCP socket");
}
@@ -166,31 +195,32 @@ void OutputUHDFeedback::ServeFeedbackThread()
addr.sin_port = htons(m_port);
addr.sin_addr.s_addr = htonl(INADDR_ANY);
- if (bind(server_sock, (struct sockaddr*)&addr, sizeof(addr)) < 0) {
+ if (bind(m_server_sock, (struct sockaddr*)&addr, sizeof(addr)) < 0) {
throw std::runtime_error("Can't bind TCP socket");
}
- if (listen(server_sock, 1) < 0) {
+ if (listen(m_server_sock, 1) < 0) {
throw std::runtime_error("Can't listen TCP socket");
}
while (m_running) {
struct sockaddr_in client;
- socklen_t client_len = sizeof(client);
- int client_sock = accept(server_sock,
- (struct sockaddr*)&client, &client_len);
+ int client_sock = accept_with_timeout(m_server_sock, 1000, &client);
- if (client_sock < 0) {
+ if (client_sock == -1) {
throw runtime_error("Could not establish new connection");
}
+ else if (client_sock == -2) {
+ continue;
+ }
while (m_running) {
uint8_t request_version = 0;
- int read = recv(client_sock, &request_version, 1, 0);
+ ssize_t read = recv(client_sock, &request_version, 1, 0);
if (!read) break; // done reading
if (read < 0) {
etiLog.level(info) <<
- "DPD Feedback Server Client read request verson failed";
+ "DPD Feedback Server Client read request version failed";
}
if (request_version != 1) {
@@ -209,7 +239,7 @@ void OutputUHDFeedback::ServeFeedbackThread()
// We are ready to issue the request now
{
boost::mutex::scoped_lock lock(burstRequest.mutex);
- burstRequest.frame_length = num_samples;
+ burstRequest.num_samples = num_samples;
burstRequest.state = BurstRequestState::SaveTransmitFrame;
lock.unlock();
@@ -226,6 +256,15 @@ void OutputUHDFeedback::ServeFeedbackThread()
lock.unlock();
if (send(client_sock,
+ &burstRequest.num_samples,
+ sizeof(burstRequest.num_samples),
+ 0) < 0) {
+ etiLog.level(info) <<
+ "DPD Feedback Server Client send num_samples failed";
+ break;
+ }
+
+ if (send(client_sock,
&burstRequest.tx_second,
sizeof(burstRequest.tx_second),
0) < 0) {
@@ -243,7 +282,45 @@ void OutputUHDFeedback::ServeFeedbackThread()
break;
}
-#warning "Send buf"
+ const size_t frame_bytes = burstRequest.num_samples * sizeof(complexf);
+
+ assert(burstRequest.tx_samples.size() == frame_bytes);
+ if (send(client_sock,
+ &burstRequest.tx_samples[0],
+ frame_bytes,
+ 0) < 0) {
+ etiLog.level(info) <<
+ "DPD Feedback Server Client send tx_frame failed";
+ break;
+ }
+
+ if (send(client_sock,
+ &burstRequest.rx_second,
+ sizeof(burstRequest.rx_second),
+ 0) < 0) {
+ etiLog.level(info) <<
+ "DPD Feedback Server Client send rx_second failed";
+ break;
+ }
+
+ if (send(client_sock,
+ &burstRequest.rx_pps,
+ sizeof(burstRequest.rx_pps),
+ 0) < 0) {
+ etiLog.level(info) <<
+ "DPD Feedback Server Client send rx_pps failed";
+ break;
+ }
+
+ assert(burstRequest.rx_samples.size() == frame_bytes);
+ if (send(client_sock,
+ &burstRequest.rx_samples[0],
+ frame_bytes,
+ 0) < 0) {
+ etiLog.level(info) <<
+ "DPD Feedback Server Client send rx_frame failed";
+ break;
+ }
}
close(client_sock);
@@ -255,8 +332,9 @@ void OutputUHDFeedback::ServeFeedbackThread()
m_running = false;
- if (server_sock != -1) {
- close(server_sock);
+ if (m_server_sock != -1) {
+ close(m_server_sock);
+ m_server_sock = -1;
}
}
diff --git a/src/OutputUHDFeedback.h b/src/OutputUHDFeedback.h
index afc06b0..32668b6 100644
--- a/src/OutputUHDFeedback.h
+++ b/src/OutputUHDFeedback.h
@@ -44,6 +44,7 @@ DESCRIPTION:
#include <boost/thread.hpp>
#include <memory>
#include <string>
+#include <atomic>
#include "Log.h"
#include "TimestampDecoder.h"
@@ -62,21 +63,23 @@ struct UHDReceiveBurstRequest {
BurstRequestState state;
- // In the SaveTransmit states, frame_length samples are saved into
+ // In the SaveTransmit states, num_samples complexf samples are saved into
// the vectors
- size_t frame_length;
+ size_t num_samples;
// The timestamp of the first sample of the TX buffers
uint32_t tx_second;
uint32_t tx_pps; // in units of 1/16384000s
+ // Samples contain complexf, but since our internal representation is uint8_t
+ // we keep it like that
std::vector<uint8_t> tx_samples;
// The timestamp of the first sample of the RX buffers
uint32_t rx_second;
uint32_t rx_pps;
- std::vector<uint8_t> rx_samples;
+ std::vector<uint8_t> rx_samples; // Also, actually complexf
};
// Serve TX samples and RX feedback samples over a TCP connection
@@ -104,7 +107,8 @@ class OutputUHDFeedback {
UHDReceiveBurstRequest burstRequest;
- bool m_running = false;
+ std::atomic_bool m_running;
+ int m_server_sock = -1;
uint16_t m_port = 0;
uint32_t m_sampleRate = 0;
uhd::usrp::multi_usrp::sptr m_usrp;