aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorandreas128 <Andreas>2016-11-22 15:36:08 +0100
committerandreas128 <Andreas>2016-11-22 15:36:08 +0100
commitffae3a0ce36edb00fe1e9b99b63fb155933f9b52 (patch)
tree44c44e205b33eff1776a35c74ea8b98beaefe121
parentf06932dea5ca0ba62f7c995fe92ad320eb365985 (diff)
downloadODR-StaticPrecorrection-ffae3a0ce36edb00fe1e9b99b63fb155933f9b52.tar.gz
ODR-StaticPrecorrection-ffae3a0ce36edb00fe1e9b99b63fb155933f9b52.tar.bz2
ODR-StaticPrecorrection-ffae3a0ce36edb00fe1e9b99b63fb155933f9b52.zip
Add error check for under runs
Add asynchronous message passing for uhd in order to read error messages.
-rw-r--r--__init__.py0
-rwxr-xr-xamplitude_ramp.py64
-rw-r--r--dual_tone.grc153
-rw-r--r--tcp_async.py74
4 files changed, 268 insertions, 23 deletions
diff --git a/__init__.py b/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/__init__.py
diff --git a/amplitude_ramp.py b/amplitude_ramp.py
index 6d41d29..c757ef8 100755
--- a/amplitude_ramp.py
+++ b/amplitude_ramp.py
@@ -32,6 +32,7 @@ import struct
import threading
from Queue import Queue
from dual_tone import dual_tone # our flowgraph!
+import tcp_async
# TCP ports used to communicate between the flowgraph and the python script
# The flowgraph interleaves 3 float streams :
@@ -47,7 +48,9 @@ def xrange(start, stop, step):
x += step
class RampGenerator(threading.Thread):
- def __init__(self, options):
+ tcpa = None
+
+ def __init__(self, options, tcpa):
threading.Thread.__init__(self)
self.event_queue_ = Queue()
self.in_queue_ = Queue()
@@ -58,6 +61,8 @@ class RampGenerator(threading.Thread):
self.ampl_step = float(options.ampl_step)
self.ampl_stop = float(options.ampl_stop)
+ self.tcpa = tcpa
+
def set_source_ampl(self, ampl):
self.event_queue_.put(ampl)
self.in_queue_.get()
@@ -89,36 +94,48 @@ class RampGenerator(threading.Thread):
measurements = []
for ampl in amplitudes:
- self.set_source_ampl(ampl)
+ measurement_correct = False
+ max_iter = 10
+ while measurement_correct == False and max_iter > 0:
+ max_iter -= 1
+
+ self.set_source_ampl(ampl)
- mag_gen_sum = 0
- phase_diff_sum = 0
- mag_feedback_sum = 0
+ mag_gen_sum = 0
+ phase_diff_sum = 0
+ mag_feedback_sum = 0
- for measurement_ignore in range(self.num_meas_to_skip):
- # Receive and ignore three floats on the socket
- sock.recv(12)
+ for measurement_ignore in range(self.num_meas_to_skip):
+ # Receive and ignore three floats on the socket
+ sock.recv(12)
- for measurement_ix in range(self.num_meas):
- # Receive three floats on the socket
- mag_gen, phase_diff, mag_feedback = struct.unpack(
- "fff",
- sock.recv(12))
+ for measurement_ix in range(self.num_meas):
+ # Receive three floats on the socket
+ mag_gen, phase_diff, mag_feedback = struct.unpack(
+ "fff",
+ sock.recv(12))
- mag_gen_sum += mag_gen
- phase_diff_sum += phase_diff
- mag_feedback_sum += mag_feedback
+ mag_gen_sum += mag_gen
+ phase_diff_sum += phase_diff
+ mag_feedback_sum += mag_feedback
- measurements.append((ampl, mag_gen, mag_feedback, phase_diff))
+ measurements.append((ampl, mag_gen, mag_feedback, phase_diff))
- mag_gen_avg = mag_gen_sum / self.num_meas
- mag_feedback_avg = mag_feedback_sum / self.num_meas
- phase_diff_avg = phase_diff_sum / self.num_meas
+ mag_gen_avg = mag_gen_sum / self.num_meas
+ mag_feedback_avg = mag_feedback_sum / self.num_meas
+ phase_diff_avg = phase_diff_sum / self.num_meas
- print("Ampl: {} Out: {:10} In: {:10} phase_diff: {:10}".format(
- ampl, mag_gen_avg, mag_feedback_avg, phase_diff_avg))
+ #Check asynchronous uhd messages for error
+ has_msg = self.tcpa.has_msg()
+ if not has_msg:
+ measurement_correct = True
+ print("Ampl: {} Out: {:10} In: {:10} phase_diff: {:10}".format(
+ ampl, mag_gen_avg, mag_feedback_avg, phase_diff_avg))
+ else:
+ print("Retry measurements")
+ self.tcpa.stop()
self.event_queue_.put("done")
self.event_queue_.put(measurements)
@@ -161,8 +178,9 @@ parser.add_argument('--decim',
required=False)
cli_args = parser.parse_args()
+tcpa = tcp_async.UhdAsyncMsg()
-rampgen = RampGenerator(cli_args)
+rampgen = RampGenerator(cli_args, tcpa)
rampgen.start()
# this blocks until the flowgraph is up and running, i.e. all sockets
diff --git a/dual_tone.grc b/dual_tone.grc
index 08ad85e..8025939 100644
--- a/dual_tone.grc
+++ b/dual_tone.grc
@@ -421,6 +421,57 @@
</param>
</block>
<block>
+ <key>blks2_tcp_sink</key>
+ <param>
+ <key>addr</key>
+ <value>127.0.0.1</value>
+ </param>
+ <param>
+ <key>alias</key>
+ <value></value>
+ </param>
+ <param>
+ <key>comment</key>
+ <value></value>
+ </param>
+ <param>
+ <key>affinity</key>
+ <value></value>
+ </param>
+ <param>
+ <key>_enabled</key>
+ <value>True</value>
+ </param>
+ <param>
+ <key>_coordinate</key>
+ <value>(1072, 550)</value>
+ </param>
+ <param>
+ <key>_rotation</key>
+ <value>0</value>
+ </param>
+ <param>
+ <key>id</key>
+ <value>blks2_tcp_sink_1</value>
+ </param>
+ <param>
+ <key>type</key>
+ <value>byte</value>
+ </param>
+ <param>
+ <key>server</key>
+ <value>True</value>
+ </param>
+ <param>
+ <key>port</key>
+ <value>47010</value>
+ </param>
+ <param>
+ <key>vlen</key>
+ <value>1</value>
+ </param>
+ </block>
+ <block>
<key>blocks_add_xx</key>
<param>
<key>alias</key>
@@ -656,6 +707,53 @@
</param>
</block>
<block>
+ <key>blocks_message_burst_source</key>
+ <param>
+ <key>alias</key>
+ <value></value>
+ </param>
+ <param>
+ <key>comment</key>
+ <value></value>
+ </param>
+ <param>
+ <key>affinity</key>
+ <value></value>
+ </param>
+ <param>
+ <key>_enabled</key>
+ <value>True</value>
+ </param>
+ <param>
+ <key>_coordinate</key>
+ <value>(720, 569)</value>
+ </param>
+ <param>
+ <key>_rotation</key>
+ <value>0</value>
+ </param>
+ <param>
+ <key>id</key>
+ <value>blocks_message_burst_source_0</value>
+ </param>
+ <param>
+ <key>maxoutbuf</key>
+ <value>0</value>
+ </param>
+ <param>
+ <key>minoutbuf</key>
+ <value>0</value>
+ </param>
+ <param>
+ <key>type</key>
+ <value>byte</value>
+ </param>
+ <param>
+ <key>vlen</key>
+ <value>1</value>
+ </param>
+ </block>
+ <block>
<key>blocks_moving_average_xx</key>
<param>
<key>alias</key>
@@ -1029,6 +1127,49 @@
</param>
</block>
<block>
+ <key>uhd_amsg_source</key>
+ <param>
+ <key>alias</key>
+ <value></value>
+ </param>
+ <param>
+ <key>comment</key>
+ <value></value>
+ </param>
+ <param>
+ <key>affinity</key>
+ <value></value>
+ </param>
+ <param>
+ <key>dev_addr</key>
+ <value></value>
+ </param>
+ <param>
+ <key>_enabled</key>
+ <value>True</value>
+ </param>
+ <param>
+ <key>_coordinate</key>
+ <value>(328, 569)</value>
+ </param>
+ <param>
+ <key>_rotation</key>
+ <value>0</value>
+ </param>
+ <param>
+ <key>id</key>
+ <value>uhd_amsg_source_0</value>
+ </param>
+ <param>
+ <key>maxoutbuf</key>
+ <value>0</value>
+ </param>
+ <param>
+ <key>minoutbuf</key>
+ <value>0</value>
+ </param>
+ </block>
+ <block>
<key>uhd_usrp_sink</key>
<param>
<key>alias</key>
@@ -2981,6 +3122,12 @@
<sink_key>0</sink_key>
</connection>
<connection>
+ <source_block_id>blocks_message_burst_source_0</source_block_id>
+ <sink_block_id>blks2_tcp_sink_1</sink_block_id>
+ <source_key>0</source_key>
+ <sink_key>0</sink_key>
+ </connection>
+ <connection>
<source_block_id>blocks_moving_average_xx_0</source_block_id>
<sink_block_id>fir_filter_xxx_0</sink_block_id>
<source_key>0</source_key>
@@ -3023,6 +3170,12 @@
<sink_key>2</sink_key>
</connection>
<connection>
+ <source_block_id>uhd_amsg_source_0</source_block_id>
+ <sink_block_id>blocks_message_burst_source_0</sink_block_id>
+ <source_key>msg</source_key>
+ <sink_key>msg</sink_key>
+ </connection>
+ <connection>
<source_block_id>uhd_usrp_source_0</source_block_id>
<sink_block_id>blocks_complex_to_mag_squared_0_0</sink_block_id>
<source_key>0</source_key>
diff --git a/tcp_async.py b/tcp_async.py
new file mode 100644
index 0000000..4697437
--- /dev/null
+++ b/tcp_async.py
@@ -0,0 +1,74 @@
+"""Tcp client for asynchronous uhd message tcp port"""
+
+import threading
+import Queue
+import time
+import socket
+
+class TcpAsyncClient(threading.Thread):
+ """Thead for message polling"""
+ queue = Queue.Queue()
+ q_quit = Queue.Queue()
+
+ ip_address = None
+ port = None
+ BUFFER_SIZE = 1
+
+ def run(self, ip_address = "127.0.0.1", port = 47010):
+ """connect and poll messages to queue"""
+
+ self.ip_address = ip_address
+ self.port = port
+
+
+ #Establish connection
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ while 1:
+ try:
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock.connect((self.ip_address, self.port))
+ break
+ except socket.error:
+ print("connecting to asynchronous uhd message tcp port " + str(self.port))
+ #traceback.print_exc()
+ sock.close()
+ time.sleep(0.5)
+
+ #Read messages
+ sock.settimeout(1)
+ while self.q_quit.empty():
+ try:
+ data = sock.recv(self.BUFFER_SIZE)
+ self.queue.put(data)
+ except socket.timeout:
+ pass
+
+ sock.close()
+
+ def stop(self):
+ """stop thread"""
+ print("stop tcp_async uhd message tcp thread")
+ self.q_quit.put("end")
+
+
+class UhdAsyncMsg(object):
+ """Creates a thread to connect to the asynchronous uhd messages tcp port"""
+ tcpa = TcpAsyncClient()
+
+ def __init__(self):
+ self.tcpa.start()
+
+ def stop(self):
+ """stop tcp thread"""
+ self.tcpa.stop()
+
+ def get_res(self):
+ """get received messages as string of integer"""
+ out = ""
+ while not self.tcpa.queue.empty():
+ out += str(ord(self.tcpa.queue.get()))
+ return out
+
+ def has_msg(self):
+ """Checks if one or more messages were received and empties the message queue"""
+ return self.get_res() != ""