From a656cee6c9c230bb921c6bb6be0f0180460a96b4 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Fri, 12 May 2017 11:23:15 +0200 Subject: DPD: handle incomplete frames --- dpd/show_spectrum.py | 31 +++++++++++++++++++++++-------- src/OutputUHD.cpp | 2 +- src/OutputUHDFeedback.cpp | 25 ++++++++++++++++++++++--- 3 files changed, 46 insertions(+), 12 deletions(-) diff --git a/dpd/show_spectrum.py b/dpd/show_spectrum.py index 6c489e0..c1d5fe5 100755 --- a/dpd/show_spectrum.py +++ b/dpd/show_spectrum.py @@ -30,6 +30,15 @@ if len(sys.argv) != 3: port = int(sys.argv[1]) num_samps_to_request = int(sys.argv[2]) +def recv_exact(sock, num_bytes): + bufs = [] + while num_bytes > 0: + b = sock.recv(num_bytes) + if len(b) == 0: + break + num_bytes -= len(b) + bufs.append(b) + return b''.join(bufs) def get_samples(port, num_samps_to_request): """Connect to ODR-DabMod, retrieve TX and RX samples, load @@ -49,20 +58,26 @@ def get_samples(port, num_samps_to_request): s.sendall(struct.pack("=I", num_samps_to_request)) print("Wait for TX metadata") - num_samps, tx_second, tx_pps = struct.unpack("=III", s.recv(12)) + num_samps, tx_second, tx_pps = struct.unpack("=III", recv_exact(s, 12)) tx_ts = tx_second + tx_pps / 16384000.0 - print("Receiving {} TX samples".format(num_samps)) - txframe_bytes = s.recv(num_samps * SIZEOF_SAMPLE) - txframe = np.fromstring(txframe_bytes, dtype=np.complex64) + if num_samps > 0: + print("Receiving {} TX samples".format(num_samps)) + txframe_bytes = recv_exact(s, num_samps * SIZEOF_SAMPLE) + txframe = np.fromstring(txframe_bytes, dtype=np.complex64) + else: + txframe = np.array([], dtype=np.complex64) print("Wait for RX metadata") - rx_second, rx_pps = struct.unpack("=II", s.recv(8)) + rx_second, rx_pps = struct.unpack("=II", recv_exact(s, 8)) rx_ts = rx_second + rx_pps / 16384000.0 - print("Receiving {} RX samples".format(num_samps)) - rxframe_bytes = s.recv(num_samps * SIZEOF_SAMPLE) - rxframe = np.fromstring(rxframe_bytes, dtype=np.complex64) + if num_samps > 0: + print("Receiving {} RX samples".format(num_samps)) + rxframe_bytes = recv_exact(s, num_samps * SIZEOF_SAMPLE) + rxframe = np.fromstring(rxframe_bytes, dtype=np.complex64) + else: + txframe = np.array([], dtype=np.complex64) print("Disconnecting") s.close() diff --git a/src/OutputUHD.cpp b/src/OutputUHD.cpp index 6edf7df..5e9e17c 100644 --- a/src/OutputUHD.cpp +++ b/src/OutputUHD.cpp @@ -50,7 +50,7 @@ using namespace std; // Maximum number of frames that can wait in uwd.frames -static const size_t FRAMES_MAX_SIZE = 2; +static const size_t FRAMES_MAX_SIZE = 8; typedef std::complex complexf; diff --git a/src/OutputUHDFeedback.cpp b/src/OutputUHDFeedback.cpp index 09b73ba..3ef5648 100644 --- a/src/OutputUHDFeedback.cpp +++ b/src/OutputUHDFeedback.cpp @@ -138,6 +138,8 @@ void OutputUHDFeedback::ReceiveBurstThread() if (not m_running) break; + etiLog.level(debug) << "Prepare RX stream command for " << burstRequest.num_samples; + uhd::stream_cmd_t cmd( uhd::stream_cmd_t::stream_mode_t::STREAM_MODE_NUM_SAMPS_AND_DONE); cmd.num_samps = burstRequest.num_samples; @@ -146,16 +148,28 @@ void OutputUHDFeedback::ReceiveBurstThread() double pps = burstRequest.rx_pps / 16384000.0; cmd.time_spec = uhd::time_spec_t(burstRequest.rx_second, pps); + const double usrp_time = m_usrp->get_time_now().get_real_secs(); + const double cmd_time = cmd.time_spec.get_real_secs(); + + etiLog.level(debug) << + "RX stream command ts=" << std::fixed << cmd_time << " Delta=" << cmd_time - usrp_time; + rxStream->issue_stream_cmd(cmd); uhd::rx_metadata_t md; burstRequest.rx_samples.resize(burstRequest.num_samples * sizeof(complexf)); - rxStream->recv(&burstRequest.rx_samples[0], burstRequest.num_samples, md); + size_t samples_read = rxStream->recv(&burstRequest.rx_samples[0], burstRequest.num_samples, md); + assert(samples_read <= burstRequest.num_samples); + burstRequest.rx_samples.resize(samples_read * sizeof(complexf)); // The recv might have happened at another time than requested burstRequest.rx_second = md.time_spec.get_full_secs(); burstRequest.rx_pps = md.time_spec.get_frac_secs() * 16384000.0; + etiLog.level(debug) << "Read " << samples_read << " RX feedback samples " + << "at time " << std::fixed << burstRequest.tx_second << "." << + burstRequest.tx_pps / 16384000.0; + burstRequest.state = BurstRequestState::Acquired; lock.unlock(); @@ -258,6 +272,11 @@ void OutputUHDFeedback::ServeFeedbackThread() burstRequest.state = BurstRequestState::None; lock.unlock(); + burstRequest.num_samples = std::min(burstRequest.num_samples, + std::min( + burstRequest.tx_samples.size() / sizeof(complexf), + burstRequest.rx_samples.size() / sizeof(complexf))); + if (send(client_sock, &burstRequest.num_samples, sizeof(burstRequest.num_samples), @@ -287,7 +306,7 @@ void OutputUHDFeedback::ServeFeedbackThread() const size_t frame_bytes = burstRequest.num_samples * sizeof(complexf); - assert(burstRequest.tx_samples.size() == frame_bytes); + assert(burstRequest.tx_samples.size() >= frame_bytes); if (send(client_sock, &burstRequest.tx_samples[0], frame_bytes, @@ -315,7 +334,7 @@ void OutputUHDFeedback::ServeFeedbackThread() break; } - assert(burstRequest.rx_samples.size() == frame_bytes); + assert(burstRequest.rx_samples.size() >= frame_bytes); if (send(client_sock, &burstRequest.rx_samples[0], frame_bytes, -- cgit v1.2.3