summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2017-05-12 11:23:15 +0200
committerMatthias P. Braendli <matthias.braendli@mpb.li>2017-05-12 11:23:15 +0200
commita656cee6c9c230bb921c6bb6be0f0180460a96b4 (patch)
treed4e0ad440feb27de00a22f8ccccb23d84be014b1
parent201d2cd2e0431a5ea79fb69561c27555f3a03dc1 (diff)
downloaddabmod-a656cee6c9c230bb921c6bb6be0f0180460a96b4.tar.gz
dabmod-a656cee6c9c230bb921c6bb6be0f0180460a96b4.tar.bz2
dabmod-a656cee6c9c230bb921c6bb6be0f0180460a96b4.zip
DPD: handle incomplete frames
-rwxr-xr-xdpd/show_spectrum.py31
-rw-r--r--src/OutputUHD.cpp2
-rw-r--r--src/OutputUHDFeedback.cpp25
3 files changed, 46 insertions, 12 deletions
diff --git a/dpd/show_spectrum.py b/dpd/show_spectrum.py
index 6c489e0..c1d5fe5 100755
--- a/dpd/show_spectrum.py
+++ b/dpd/show_spectrum.py
@@ -30,6 +30,15 @@ if len(sys.argv) != 3:
port = int(sys.argv[1])
num_samps_to_request = int(sys.argv[2])
+def recv_exact(sock, num_bytes):
+ bufs = []
+ while num_bytes > 0:
+ b = sock.recv(num_bytes)
+ if len(b) == 0:
+ break
+ num_bytes -= len(b)
+ bufs.append(b)
+ return b''.join(bufs)
def get_samples(port, num_samps_to_request):
"""Connect to ODR-DabMod, retrieve TX and RX samples, load
@@ -49,20 +58,26 @@ def get_samples(port, num_samps_to_request):
s.sendall(struct.pack("=I", num_samps_to_request))
print("Wait for TX metadata")
- num_samps, tx_second, tx_pps = struct.unpack("=III", s.recv(12))
+ num_samps, tx_second, tx_pps = struct.unpack("=III", recv_exact(s, 12))
tx_ts = tx_second + tx_pps / 16384000.0
- print("Receiving {} TX samples".format(num_samps))
- txframe_bytes = s.recv(num_samps * SIZEOF_SAMPLE)
- txframe = np.fromstring(txframe_bytes, dtype=np.complex64)
+ if num_samps > 0:
+ print("Receiving {} TX samples".format(num_samps))
+ txframe_bytes = recv_exact(s, num_samps * SIZEOF_SAMPLE)
+ txframe = np.fromstring(txframe_bytes, dtype=np.complex64)
+ else:
+ txframe = np.array([], dtype=np.complex64)
print("Wait for RX metadata")
- rx_second, rx_pps = struct.unpack("=II", s.recv(8))
+ rx_second, rx_pps = struct.unpack("=II", recv_exact(s, 8))
rx_ts = rx_second + rx_pps / 16384000.0
- print("Receiving {} RX samples".format(num_samps))
- rxframe_bytes = s.recv(num_samps * SIZEOF_SAMPLE)
- rxframe = np.fromstring(rxframe_bytes, dtype=np.complex64)
+ if num_samps > 0:
+ print("Receiving {} RX samples".format(num_samps))
+ rxframe_bytes = recv_exact(s, num_samps * SIZEOF_SAMPLE)
+ rxframe = np.fromstring(rxframe_bytes, dtype=np.complex64)
+ else:
+ txframe = np.array([], dtype=np.complex64)
print("Disconnecting")
s.close()
diff --git a/src/OutputUHD.cpp b/src/OutputUHD.cpp
index 6edf7df..5e9e17c 100644
--- a/src/OutputUHD.cpp
+++ b/src/OutputUHD.cpp
@@ -50,7 +50,7 @@
using namespace std;
// Maximum number of frames that can wait in uwd.frames
-static const size_t FRAMES_MAX_SIZE = 2;
+static const size_t FRAMES_MAX_SIZE = 8;
typedef std::complex<float> complexf;
diff --git a/src/OutputUHDFeedback.cpp b/src/OutputUHDFeedback.cpp
index 09b73ba..3ef5648 100644
--- a/src/OutputUHDFeedback.cpp
+++ b/src/OutputUHDFeedback.cpp
@@ -138,6 +138,8 @@ void OutputUHDFeedback::ReceiveBurstThread()
if (not m_running) break;
+ etiLog.level(debug) << "Prepare RX stream command for " << burstRequest.num_samples;
+
uhd::stream_cmd_t cmd(
uhd::stream_cmd_t::stream_mode_t::STREAM_MODE_NUM_SAMPS_AND_DONE);
cmd.num_samps = burstRequest.num_samples;
@@ -146,16 +148,28 @@ void OutputUHDFeedback::ReceiveBurstThread()
double pps = burstRequest.rx_pps / 16384000.0;
cmd.time_spec = uhd::time_spec_t(burstRequest.rx_second, pps);
+ const double usrp_time = m_usrp->get_time_now().get_real_secs();
+ const double cmd_time = cmd.time_spec.get_real_secs();
+
+ etiLog.level(debug) <<
+ "RX stream command ts=" << std::fixed << cmd_time << " Delta=" << cmd_time - usrp_time;
+
rxStream->issue_stream_cmd(cmd);
uhd::rx_metadata_t md;
burstRequest.rx_samples.resize(burstRequest.num_samples * sizeof(complexf));
- rxStream->recv(&burstRequest.rx_samples[0], burstRequest.num_samples, md);
+ size_t samples_read = rxStream->recv(&burstRequest.rx_samples[0], burstRequest.num_samples, md);
+ assert(samples_read <= burstRequest.num_samples);
+ burstRequest.rx_samples.resize(samples_read * sizeof(complexf));
// The recv might have happened at another time than requested
burstRequest.rx_second = md.time_spec.get_full_secs();
burstRequest.rx_pps = md.time_spec.get_frac_secs() * 16384000.0;
+ etiLog.level(debug) << "Read " << samples_read << " RX feedback samples "
+ << "at time " << std::fixed << burstRequest.tx_second << "." <<
+ burstRequest.tx_pps / 16384000.0;
+
burstRequest.state = BurstRequestState::Acquired;
lock.unlock();
@@ -258,6 +272,11 @@ void OutputUHDFeedback::ServeFeedbackThread()
burstRequest.state = BurstRequestState::None;
lock.unlock();
+ burstRequest.num_samples = std::min(burstRequest.num_samples,
+ std::min(
+ burstRequest.tx_samples.size() / sizeof(complexf),
+ burstRequest.rx_samples.size() / sizeof(complexf)));
+
if (send(client_sock,
&burstRequest.num_samples,
sizeof(burstRequest.num_samples),
@@ -287,7 +306,7 @@ void OutputUHDFeedback::ServeFeedbackThread()
const size_t frame_bytes = burstRequest.num_samples * sizeof(complexf);
- assert(burstRequest.tx_samples.size() == frame_bytes);
+ assert(burstRequest.tx_samples.size() >= frame_bytes);
if (send(client_sock,
&burstRequest.tx_samples[0],
frame_bytes,
@@ -315,7 +334,7 @@ void OutputUHDFeedback::ServeFeedbackThread()
break;
}
- assert(burstRequest.rx_samples.size() == frame_bytes);
+ assert(burstRequest.rx_samples.size() >= frame_bytes);
if (send(client_sock,
&burstRequest.rx_samples[0],
frame_bytes,