From ffae3a0ce36edb00fe1e9b99b63fb155933f9b52 Mon Sep 17 00:00:00 2001 From: andreas128 Date: Tue, 22 Nov 2016 15:36:08 +0100 Subject: Add error check for under runs Add asynchronous message passing for uhd in order to read error messages. --- __init__.py | 0 amplitude_ramp.py | 64 +++++++++++++++-------- dual_tone.grc | 153 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ tcp_async.py | 74 ++++++++++++++++++++++++++ 4 files changed, 268 insertions(+), 23 deletions(-) create mode 100644 __init__.py create mode 100644 tcp_async.py diff --git a/__init__.py b/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/amplitude_ramp.py b/amplitude_ramp.py index 6d41d29..c757ef8 100755 --- a/amplitude_ramp.py +++ b/amplitude_ramp.py @@ -32,6 +32,7 @@ import struct import threading from Queue import Queue from dual_tone import dual_tone # our flowgraph! +import tcp_async # TCP ports used to communicate between the flowgraph and the python script # The flowgraph interleaves 3 float streams : @@ -47,7 +48,9 @@ def xrange(start, stop, step): x += step class RampGenerator(threading.Thread): - def __init__(self, options): + tcpa = None + + def __init__(self, options, tcpa): threading.Thread.__init__(self) self.event_queue_ = Queue() self.in_queue_ = Queue() @@ -58,6 +61,8 @@ class RampGenerator(threading.Thread): self.ampl_step = float(options.ampl_step) self.ampl_stop = float(options.ampl_stop) + self.tcpa = tcpa + def set_source_ampl(self, ampl): self.event_queue_.put(ampl) self.in_queue_.get() @@ -89,36 +94,48 @@ class RampGenerator(threading.Thread): measurements = [] for ampl in amplitudes: - self.set_source_ampl(ampl) + measurement_correct = False + max_iter = 10 + while measurement_correct == False and max_iter > 0: + max_iter -= 1 + + self.set_source_ampl(ampl) - mag_gen_sum = 0 - phase_diff_sum = 0 - mag_feedback_sum = 0 + mag_gen_sum = 0 + phase_diff_sum = 0 + mag_feedback_sum = 0 - for measurement_ignore in range(self.num_meas_to_skip): - # Receive and ignore three floats on the socket - sock.recv(12) + for measurement_ignore in range(self.num_meas_to_skip): + # Receive and ignore three floats on the socket + sock.recv(12) - for measurement_ix in range(self.num_meas): - # Receive three floats on the socket - mag_gen, phase_diff, mag_feedback = struct.unpack( - "fff", - sock.recv(12)) + for measurement_ix in range(self.num_meas): + # Receive three floats on the socket + mag_gen, phase_diff, mag_feedback = struct.unpack( + "fff", + sock.recv(12)) - mag_gen_sum += mag_gen - phase_diff_sum += phase_diff - mag_feedback_sum += mag_feedback + mag_gen_sum += mag_gen + phase_diff_sum += phase_diff + mag_feedback_sum += mag_feedback - measurements.append((ampl, mag_gen, mag_feedback, phase_diff)) + measurements.append((ampl, mag_gen, mag_feedback, phase_diff)) - mag_gen_avg = mag_gen_sum / self.num_meas - mag_feedback_avg = mag_feedback_sum / self.num_meas - phase_diff_avg = phase_diff_sum / self.num_meas + mag_gen_avg = mag_gen_sum / self.num_meas + mag_feedback_avg = mag_feedback_sum / self.num_meas + phase_diff_avg = phase_diff_sum / self.num_meas - print("Ampl: {} Out: {:10} In: {:10} phase_diff: {:10}".format( - ampl, mag_gen_avg, mag_feedback_avg, phase_diff_avg)) + #Check asynchronous uhd messages for error + has_msg = self.tcpa.has_msg() + if not has_msg: + measurement_correct = True + print("Ampl: {} Out: {:10} In: {:10} phase_diff: {:10}".format( + ampl, mag_gen_avg, mag_feedback_avg, phase_diff_avg)) + else: + print("Retry measurements") + self.tcpa.stop() self.event_queue_.put("done") self.event_queue_.put(measurements) @@ -161,8 +178,9 @@ parser.add_argument('--decim', required=False) cli_args = parser.parse_args() +tcpa = tcp_async.UhdAsyncMsg() -rampgen = RampGenerator(cli_args) +rampgen = RampGenerator(cli_args, tcpa) rampgen.start() # this blocks until the flowgraph is up and running, i.e. all sockets diff --git a/dual_tone.grc b/dual_tone.grc index 08ad85e..8025939 100644 --- a/dual_tone.grc +++ b/dual_tone.grc @@ -420,6 +420,57 @@ 1 + + blks2_tcp_sink + + addr + 127.0.0.1 + + + alias + + + + comment + + + + affinity + + + + _enabled + True + + + _coordinate + (1072, 550) + + + _rotation + 0 + + + id + blks2_tcp_sink_1 + + + type + byte + + + server + True + + + port + 47010 + + + vlen + 1 + + blocks_add_xx @@ -655,6 +706,53 @@ 1 + + blocks_message_burst_source + + alias + + + + comment + + + + affinity + + + + _enabled + True + + + _coordinate + (720, 569) + + + _rotation + 0 + + + id + blocks_message_burst_source_0 + + + maxoutbuf + 0 + + + minoutbuf + 0 + + + type + byte + + + vlen + 1 + + blocks_moving_average_xx @@ -1028,6 +1126,49 @@ fff + + uhd_amsg_source + + alias + + + + comment + + + + affinity + + + + dev_addr + + + + _enabled + True + + + _coordinate + (328, 569) + + + _rotation + 0 + + + id + uhd_amsg_source_0 + + + maxoutbuf + 0 + + + minoutbuf + 0 + + uhd_usrp_sink @@ -2980,6 +3121,12 @@ 0 0 + + blocks_message_burst_source_0 + blks2_tcp_sink_1 + 0 + 0 + blocks_moving_average_xx_0 fir_filter_xxx_0 @@ -3022,6 +3169,12 @@ 0 2 + + uhd_amsg_source_0 + blocks_message_burst_source_0 + msg + msg + uhd_usrp_source_0 blocks_complex_to_mag_squared_0_0 diff --git a/tcp_async.py b/tcp_async.py new file mode 100644 index 0000000..4697437 --- /dev/null +++ b/tcp_async.py @@ -0,0 +1,74 @@ +"""Tcp client for asynchronous uhd message tcp port""" + +import threading +import Queue +import time +import socket + +class TcpAsyncClient(threading.Thread): + """Thead for message polling""" + queue = Queue.Queue() + q_quit = Queue.Queue() + + ip_address = None + port = None + BUFFER_SIZE = 1 + + def run(self, ip_address = "127.0.0.1", port = 47010): + """connect and poll messages to queue""" + + self.ip_address = ip_address + self.port = port + + + #Establish connection + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + while 1: + try: + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.connect((self.ip_address, self.port)) + break + except socket.error: + print("connecting to asynchronous uhd message tcp port " + str(self.port)) + #traceback.print_exc() + sock.close() + time.sleep(0.5) + + #Read messages + sock.settimeout(1) + while self.q_quit.empty(): + try: + data = sock.recv(self.BUFFER_SIZE) + self.queue.put(data) + except socket.timeout: + pass + + sock.close() + + def stop(self): + """stop thread""" + print("stop tcp_async uhd message tcp thread") + self.q_quit.put("end") + + +class UhdAsyncMsg(object): + """Creates a thread to connect to the asynchronous uhd messages tcp port""" + tcpa = TcpAsyncClient() + + def __init__(self): + self.tcpa.start() + + def stop(self): + """stop tcp thread""" + self.tcpa.stop() + + def get_res(self): + """get received messages as string of integer""" + out = "" + while not self.tcpa.queue.empty(): + out += str(ord(self.tcpa.queue.get())) + return out + + def has_msg(self): + """Checks if one or more messages were received and empties the message queue""" + return self.get_res() != "" -- cgit v1.2.3