aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/OutputUHDFeedback.cpp69
1 files changed, 42 insertions, 27 deletions
diff --git a/src/OutputUHDFeedback.cpp b/src/OutputUHDFeedback.cpp
index 3ef5648..9e3aab2 100644
--- a/src/OutputUHDFeedback.cpp
+++ b/src/OutputUHDFeedback.cpp
@@ -138,8 +138,6 @@ void OutputUHDFeedback::ReceiveBurstThread()
if (not m_running) break;
- etiLog.level(debug) << "Prepare RX stream command for " << burstRequest.num_samples;
-
uhd::stream_cmd_t cmd(
uhd::stream_cmd_t::stream_mode_t::STREAM_MODE_NUM_SAMPS_AND_DONE);
cmd.num_samps = burstRequest.num_samples;
@@ -148,6 +146,10 @@ void OutputUHDFeedback::ReceiveBurstThread()
double pps = burstRequest.rx_pps / 16384000.0;
cmd.time_spec = uhd::time_spec_t(burstRequest.rx_second, pps);
+ // We need to free the mutex while we recv(), because otherwise we block the
+ // TX thread
+ lock.unlock();
+
const double usrp_time = m_usrp->get_time_now().get_real_secs();
const double cmd_time = cmd.time_spec.get_real_secs();
@@ -157,9 +159,14 @@ void OutputUHDFeedback::ReceiveBurstThread()
rxStream->issue_stream_cmd(cmd);
uhd::rx_metadata_t md;
- burstRequest.rx_samples.resize(burstRequest.num_samples * sizeof(complexf));
- size_t samples_read = rxStream->recv(&burstRequest.rx_samples[0], burstRequest.num_samples, md);
- assert(samples_read <= burstRequest.num_samples);
+
+ std::vector<uint8_t> buf(cmd.num_samps * sizeof(complexf));
+
+ const double timeout = 60;
+ size_t samples_read = rxStream->recv(&buf[0], cmd.num_samps, md, timeout);
+
+ lock.lock();
+ burstRequest.rx_samples = std::move(buf);
burstRequest.rx_samples.resize(samples_read * sizeof(complexf));
// The recv might have happened at another time than requested
@@ -197,6 +204,22 @@ static int accept_with_timeout(int server_socket, int timeout_ms, struct sockadd
}
}
+static ssize_t sendall(int socket, const void *buffer, size_t buflen)
+{
+ uint8_t *buf = (uint8_t*)buffer;
+ while (buflen > 0) {
+ ssize_t sent = send(socket, buf, buflen, 0);
+ if (sent < 0) {
+ return -1;
+ }
+ else {
+ buf += sent;
+ buflen -= sent;
+ }
+ }
+ return buflen;
+}
+
void OutputUHDFeedback::ServeFeedbackThread()
{
set_thread_name("uhdservefeedback");
@@ -277,28 +300,24 @@ void OutputUHDFeedback::ServeFeedbackThread()
burstRequest.tx_samples.size() / sizeof(complexf),
burstRequest.rx_samples.size() / sizeof(complexf)));
- if (send(client_sock,
- &burstRequest.num_samples,
- sizeof(burstRequest.num_samples),
- 0) < 0) {
+ uint32_t num_samples_32 = burstRequest.num_samples;
+ if (sendall(client_sock, &num_samples_32, sizeof(num_samples_32)) < 0) {
etiLog.level(info) <<
"DPD Feedback Server Client send num_samples failed";
break;
}
- if (send(client_sock,
+ if (sendall(client_sock,
&burstRequest.tx_second,
- sizeof(burstRequest.tx_second),
- 0) < 0) {
+ sizeof(burstRequest.tx_second)) < 0) {
etiLog.level(info) <<
"DPD Feedback Server Client send tx_second failed";
break;
}
- if (send(client_sock,
+ if (sendall(client_sock,
&burstRequest.tx_pps,
- sizeof(burstRequest.tx_pps),
- 0) < 0) {
+ sizeof(burstRequest.tx_pps)) < 0) {
etiLog.level(info) <<
"DPD Feedback Server Client send tx_pps failed";
break;
@@ -307,38 +326,34 @@ void OutputUHDFeedback::ServeFeedbackThread()
const size_t frame_bytes = burstRequest.num_samples * sizeof(complexf);
assert(burstRequest.tx_samples.size() >= frame_bytes);
- if (send(client_sock,
+ if (sendall(client_sock,
&burstRequest.tx_samples[0],
- frame_bytes,
- 0) < 0) {
+ frame_bytes) < 0) {
etiLog.level(info) <<
"DPD Feedback Server Client send tx_frame failed";
break;
}
- if (send(client_sock,
+ if (sendall(client_sock,
&burstRequest.rx_second,
- sizeof(burstRequest.rx_second),
- 0) < 0) {
+ sizeof(burstRequest.rx_second)) < 0) {
etiLog.level(info) <<
"DPD Feedback Server Client send rx_second failed";
break;
}
- if (send(client_sock,
+ if (sendall(client_sock,
&burstRequest.rx_pps,
- sizeof(burstRequest.rx_pps),
- 0) < 0) {
+ sizeof(burstRequest.rx_pps)) < 0) {
etiLog.level(info) <<
"DPD Feedback Server Client send rx_pps failed";
break;
}
assert(burstRequest.rx_samples.size() >= frame_bytes);
- if (send(client_sock,
+ if (sendall(client_sock,
&burstRequest.rx_samples[0],
- frame_bytes,
- 0) < 0) {
+ frame_bytes) < 0) {
etiLog.level(info) <<
"DPD Feedback Server Client send rx_frame failed";
break;