aboutsummaryrefslogtreecommitdiffstats
path: root/host/utils/b2xx_side_channel.py
diff options
context:
space:
mode:
authorAshish Chaudhari <ashish@ettus.com>2015-08-10 23:14:20 -0700
committerAshish Chaudhari <ashish@ettus.com>2015-08-10 23:14:20 -0700
commitb5c81677078f56b3e671ebeaca1e3b803c2f4ef9 (patch)
treea1b17b4be203331de7e146e94051f26be5a20102 /host/utils/b2xx_side_channel.py
parent16e149fe6fcc1bc18adea3eeeefad2c7ee93b2e0 (diff)
parent28327c8e8a810b19da126116d0dc4c26b643baed (diff)
downloaduhd-b5c81677078f56b3e671ebeaca1e3b803c2f4ef9.tar.gz
uhd-b5c81677078f56b3e671ebeaca1e3b803c2f4ef9.tar.bz2
uhd-b5c81677078f56b3e671ebeaca1e3b803c2f4ef9.zip
Merge branch 'master' into ashish/register_api
Diffstat (limited to 'host/utils/b2xx_side_channel.py')
-rwxr-xr-xhost/utils/b2xx_side_channel.py272
1 files changed, 223 insertions, 49 deletions
diff --git a/host/utils/b2xx_side_channel.py b/host/utils/b2xx_side_channel.py
index 7b1e980eb..070f5684c 100755
--- a/host/utils/b2xx_side_channel.py
+++ b/host/utils/b2xx_side_channel.py
@@ -16,25 +16,27 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
+"""
+Tool to read log buffers from the FX3. Use to debug USB connection issues.
+Requires PyUSB 1.0.
+"""
import sys
import time
import struct
-
from optparse import OptionParser
+import serial
try:
import usb.core
import usb.util
except Exception as e:
- print "Failed to import module 'usb'."
- print "Please make sure you have PyUSB installed and in your PYTHONPATH."
- print "PyUSB PyPI website: https://pypi.python.org/pypi/pyusb"
- print "To install, download from the website or use 'pip install pysusb'"
+ print("Failed to import module 'usb'.")
+ print("Please make sure you have PyUSB installed and in your PYTHONPATH.")
+ print("PyUSB PyPI website: https://pypi.python.org/pypi/pyusb")
+ print("To install, download from the website or use 'pip install pyusb'")
exit(1)
-import serial
-
VRT_OUT = usb.util.CTRL_TYPE_VENDOR | usb.util.CTRL_OUT
VRT_IN = usb.util.CTRL_TYPE_VENDOR | usb.util.CTRL_IN
@@ -62,7 +64,7 @@ VRQS[B200_VREQ_FLUSH_DATA_EPS] = 'B200_VREQ_FLUSH_DATA_EPS'
B200_VREQ_AD9361_LOOPBACK = 0x92
VRQS[B200_VREQ_AD9361_LOOPBACK] = 'B200_VREQ_AD9361_LOOPBACK'
-COUNTER_MAGIC = 0x10024001
+COUNTER_MAGIC = 0x10024001
"""
typedef struct Counters {
int magic;
@@ -162,6 +164,10 @@ USB_PHY_ERROR_REGISTERS = [
USB_ERROR_COUNTERS += USB_PHY_ERROR_REGISTERS
+PIB_COUNTERS = [
+ 'socket_inactive'
+]
+
COUNTERS = [
'magic',
@@ -177,7 +183,13 @@ COUNTERS = [
'heap_size',
- 'resume_count'
+ 'resume_count',
+ 'state_transition_count',
+ 'invalid_gpif_state',
+ ('thread_0', PIB_COUNTERS),
+ ('thread_1', PIB_COUNTERS),
+ ('thread_2', PIB_COUNTERS),
+ ('thread_3', PIB_COUNTERS),
]
USB_EVENTS = {}
@@ -243,6 +255,101 @@ LTSSM_STATES[0x1c] = ['111XX', "(none)"]
LTSSM_STATES[0x1f] = ['111XX', "(none)"]
LTSSM_STATES[0x2c] = ['101100', "Cypress/Intel workaround"]
+CF_NONE = 0
+CF_TX_SWING = 1 << 0
+CF_TX_DEEMPHASIS = 1 << 1
+CF_DISABLE_USB2 = 1 << 2
+CF_ENABLE_AS_SUPERSPEED = 1 << 3
+CF_PPORT_DRIVE_STRENGTH = 1 << 4
+CF_DMA_BUFFER_SIZE = 1 << 5
+CF_DMA_BUFFER_COUNT = 1 << 6
+CF_MANUAL_DMA = 1 << 7
+CF_SB_BAUD_DIV = 1 << 8
+
+CF_RE_ENUM = 1 << 31
+
+"""
+typedef struct Config {
+ int tx_swing; // [90] [65] 45
+ int tx_deemphasis; // 0x11
+ int disable_usb2; // 0
+ int enable_as_superspeed; // 1
+ int pport_drive_strength; // CY_U3P_DS_THREE_QUARTER_STRENGTH
+ int dma_buffer_size; // [USB3] (max)
+ int dma_buffer_count; // [USB3] 1
+ int manual_dma; // 0
+ int sb_baud_div; // 434*2
+} CONFIG, *PCONFIG;
+
+typedef struct ConfigMod {
+ int flags;
+ CONFIG config;
+} CONFIG_MOD, *PCONFIG_MOD;
+"""
+
+class Config():
+ def __init__(self,
+ tx_swing=None, tx_deemphasis=None, disable_usb2=None, enable_as_superspeed=None,
+ pport_drive_strength=None,
+ dma_buffer_size=None, dma_buffer_count=None, manual_dma=None,
+ sb_baud_div=None,
+ raw=None):
+ self.tx_swing = tx_swing
+ self.tx_deemphasis = tx_deemphasis
+ self.disable_usb2 = disable_usb2
+ self.enable_as_superspeed = enable_as_superspeed
+ self.pport_drive_strength = pport_drive_strength
+ self.dma_buffer_size = dma_buffer_size
+ self.dma_buffer_count = dma_buffer_count
+ self.manual_dma = manual_dma
+ self.sb_baud_div = sb_baud_div
+ self._count = 9
+
+ if raw:
+ (self.tx_swing,
+ self.tx_deemphasis,
+ self.disable_usb2,
+ self.enable_as_superspeed,
+ self.pport_drive_strength,
+ self.dma_buffer_size,
+ self.dma_buffer_count,
+ self.manual_dma,
+ self.sb_baud_div) = struct.unpack("i"*self._count, raw)
+ def pack(self):
+ return struct.pack("i"*self._count,
+ self.tx_swing,
+ self.tx_deemphasis,
+ self.disable_usb2,
+ self.enable_as_superspeed,
+ self.pport_drive_strength,
+ self.dma_buffer_size,
+ self.dma_buffer_count,
+ self.manual_dma,
+ self.sb_baud_div)
+ def __str__(self):
+ return self.to_string()
+ def to_string(self, flags=-1):
+ s = ""
+ if flags & CF_TX_SWING:
+ s += "tx_swing = %s\n" % (self.tx_swing)
+ if flags & CF_TX_DEEMPHASIS:
+ s += "tx_deemphasis = %s\n" % (self.tx_deemphasis)
+ if flags & CF_DISABLE_USB2:
+ s += "disable_usb2 = %s\n" % (self.disable_usb2)
+ if flags & CF_ENABLE_AS_SUPERSPEED:
+ s += "enable_as_superspeed = %s\n" % (self.enable_as_superspeed)
+ if flags & CF_PPORT_DRIVE_STRENGTH:
+ s += "pport_drive_strength = %s\n" % (self.pport_drive_strength)
+ if flags & CF_DMA_BUFFER_SIZE:
+ s += "dma_buffer_size = %s\n" % (self.dma_buffer_size)
+ if flags & CF_DMA_BUFFER_COUNT:
+ s += "dma_buffer_count = %s\n" % (self.dma_buffer_count)
+ if flags & CF_MANUAL_DMA:
+ s += "manual_dma = %s\n" % (self.manual_dma)
+ if flags & CF_SB_BAUD_DIV:
+ s += "sb_baud_div = %s\n" % (self.sb_baud_div)
+ return s
+
def _parse_usb_event_log(data):
l = []
for d in data:
@@ -252,11 +359,11 @@ def _parse_usb_event_log(data):
#l += [(USB_EVENTS[0x80][0] + "+%i" % (d & ~0x80), USB_EVENTS[0x80][1])]
ltssm_key = (d & ~0x80)
ltssm_val = "(unknown)"
- if LTSSM_STATES.has_key(ltssm_key):
+ if ltssm_key in LTSSM_STATES:
ltssm_val = LTSSM_STATES[ltssm_key][1]
ltssm_val = "LTSSM: " + ltssm_val
l += [(USB_EVENTS[0x80][0] + "+%i" % (d & ~0x80), ltssm_val)]
- elif USB_EVENTS.has_key(d):
+ elif d in USB_EVENTS:
l += [USB_EVENTS[d]]
#else:
# l += [("?", "?")]
@@ -313,8 +420,8 @@ class counter_set():
try:
vals = struct.unpack(self._fmt_str, data)
self._update(vals)
- except Exception, e:
- print e
+ except Exception as e:
+ print(("While updating counter set '%s':" % self._name), e)
def __str__(self):
return self.to_string()
@@ -346,7 +453,7 @@ class usb_device():
self.timeout = 2000
def open(self, idVendor, idProduct):
- print "Finding %04x:%04x..." % (idVendor, idProduct)
+ print("Finding %04x:%04x..." % (idVendor, idProduct))
self.dev = usb.core.find(idVendor=idVendor, idProduct=idProduct)
if self.dev is None:
raise ValueError('Device not found: %04x:%04x' % (idVendor, idProduct))
@@ -362,7 +469,7 @@ class usb_device():
#self.dev.set_configuration() # This will throw as device is already claimed
- print "Opened %04x:%04x" % (idVendor, idProduct)
+ print("Opened %04x:%04x" % (idVendor, idProduct))
#self.dev.ctrl_transfer(0x21, 0x09, 0, 0, [0x02,0x02,0x00,0x00,0x00,0x00,0x00,0x00] )
#self.dev.ctrl_transfer(bmRequestType, bRequest, wValue=0, wIndex=0, data_or_wLength = None, timeout = None
@@ -379,40 +486,41 @@ class usb_device():
l = []
try:
l = self.dev.ctrl_transfer(VRT_IN, B200_VREQ_GET_USB_SPEED, 0, 0, 1)
- except usb.core.USBError, e:
+ except usb.core.USBError as e:
if e.errno == 32:
- print e
+ print(e)
+ print("Is the firmware loaded?")
sys.exit(0)
if len(l) > 0:
self.usb_speed = l[0]
- print "Operating at USB", self.usb_speed
+ print("Operating at USB", self.usb_speed)
break
else:
- print "Retrying..."
+ print("Retrying...")
#if self.usb_speed == 3:
# self.max_buffer_size = 512
- print "Max buffer size:", self.max_buffer_size
- print
+ print("Max buffer size:", self.max_buffer_size)
+ print()
def _handle_error(self, e, vrt):
if e.errno == 19: # No such device
raise e
vrt_str = "0x%02x" % (vrt)
- if VRQS.has_key(vrt):
+ if vrt in VRQS:
vrt_str += " (%s)" % (VRQS[vrt])
- print "%s: %s" % (vrt_str, str(e))
+ print("%s: %s" % (vrt_str, str(e)))
def vrt_get(self, vrt):
try:
return self.dev.ctrl_transfer(VRT_IN, vrt, 0, 0, self.max_buffer_size, self.timeout)
- except usb.core.USBError, e:
+ except usb.core.USBError as e:
self._handle_error(e, vrt)
return []
def vrt_set(self, vrt, data=""):
try:
return self.dev.ctrl_transfer(VRT_OUT, vrt, 0, 0, data, self.timeout)
- except usb.core.USBError, e:
+ except usb.core.USBError as e:
self._handle_error(e, vrt)
return None
@@ -428,7 +536,11 @@ class usb_device():
last = 0
while raw[last] != 0:
try:
- idx = raw.index(0, last)
+ try:
+ idx = raw.index(0, last)
+ except ValueError as e:
+ print("No null termination in log buffer (length: %d, last null: %d)" % (len(raw), last))
+ break
self.log_index += 1
line = "".join(map(chr, raw[last:idx]))
#print "[%05i %05i] %s" % (self.log_index, self.log_read_count, line)
@@ -439,8 +551,8 @@ class usb_device():
last = idx + 1
if last >= len(raw):
break
- except Exception, e:
- print e
+ except Exception as e:
+ print("Exception while parsing log buffer:", e)
break
return lines
@@ -450,8 +562,8 @@ class usb_device():
return
for l in lines:
#print l
- print "[%05i %05i] %s" % (l[0], self.log_read_count, l[1])
- print
+ print("[%05i %05i] %s" % (l[0], self.log_read_count, l[1]))
+ print()
def get_counters(self):
data = self.vrt_get(B200_VREQ_GET_COUNTERS)
@@ -462,8 +574,8 @@ class usb_device():
def print_counters(self):
self.get_counters()
- print "[%05i]" % (self.counters_read_count)
- print self.counters
+ print("[%05i]" % (self.counters_read_count))
+ print(self.counters)
def get_usb_event_log(self):
data = self.vrt_get(B200_VREQ_GET_USB_EVENT_LOG)
@@ -480,8 +592,8 @@ class usb_device():
l = self.get_usb_event_log()
if len(l) == 0:
return
- print "\n".join(map(lambda x: ("[%05i] " % (self.usb_event_log_read_count)) + x[0] + ":\t" + x[1], l))
- print
+ print("\n".join([("[%05i] " % (self.usb_event_log_read_count)) + x[0] + ":\t" + x[1] for x in l]))
+ print()
def run_log(dev, options):
items = [
@@ -489,8 +601,8 @@ def run_log(dev, options):
(options.counters, dev.print_counters),
(options.usb_events, dev.print_usb_event_log)
]
- items = filter(lambda x: x[0] > 0, items)
- smallest_interval = min(map(lambda x: x[0], items))
+ items = [x for x in items if x[0] > 0]
+ smallest_interval = min([x[0] for x in items])
time_now = time.time()
last = [time_now]*len(items)
@@ -506,7 +618,7 @@ def run_log(dev, options):
if time_now < (time_last + items[i][0]):
continue
if options.clear_screen and not cleared:
- print chr(27) + "[2J"
+ print(chr(27) + "[2J")
cleared = True
#print items[i][1]
items[i][1]()
@@ -535,9 +647,9 @@ def recv_serial_data(ser):
data = data[0:-2]
if data == "":
#print "[Received an empty line]"
- print
+ print()
elif data[0] == ' ':
- print time_now_str, data[1:]
+ print(time_now_str, data[1:])
elif data[0] == 'U':
data = data[1:]
cur_type = 0
@@ -553,20 +665,20 @@ def recv_serial_data(ser):
i = ord(c) - ord('A')
cur_type = 2
else:
- print time_now_str, "[Unknown type: '%s' (0x%02x) in '%s']" % (c, ord(c), data)
+ print(time_now_str, "[Unknown type: '%s' (0x%02x) in '%s']" % (c, ord(c), data))
elif cur_type == 1:
i = ord(c) - ord('a')
if (i < 0) or (i >= len(USB_PHY_ERROR_REGISTERS)):
- print time_now_str, "[Unknown PHY error register index: '%s' (0x%02x) (%d) in '%s']" % (c, ord(c), i, data)
+ print(time_now_str, "[Unknown PHY error register index: '%s' (0x%02x) (%d) in '%s']" % (c, ord(c), i, data))
else:
- print time_now_str, USB_PHY_ERROR_REGISTERS[i]
+ print(time_now_str, USB_PHY_ERROR_REGISTERS[i])
cur_type = 0
elif cur_type == 2:
i2 = ord(c) - ord('A')
if (c < 'A') or (c > 'P'):
- print time_now_str, "[Unknown second USB event part: '%s' (0x%02x) (%d) in '%s']" % (c, ord(c), i2, data)
+ print(time_now_str, "[Unknown second USB event part: '%s' (0x%02x) (%d) in '%s']" % (c, ord(c), i2, data))
else:
i = (i << 4) | i2
usb_events += [i]
@@ -578,11 +690,14 @@ def recv_serial_data(ser):
if len(usb_events) > 0:
usb_event_log_read_count += 1
l = _parse_usb_event_log(usb_events)
- print "\n".join(map(lambda x: time_now_str + ("[%05i] " % (usb_event_log_read_count)) + x[0] + ":\t" + x[1], l))
+ print("\n".join([time_now_str + ("[%05i] " % (usb_event_log_read_count)) + x[0] + ":\t" + x[1] for x in l]))
+ #print
+
data = ""
def main():
parser = OptionParser(usage="%prog: [options]") #option_class=eng_option,
+
parser.add_option("-v", "--vid", type="string", default="0x2500", help="VID [default=%default]")
parser.add_option("-p", "--pid", type="string", default="0x0020", help="PID [default=%default]")
parser.add_option("-t", "--tty", type="string", default=None, help="TTY [default=%default]")
@@ -597,26 +712,34 @@ def main():
parser.add_option("-R", "--reset-counters", action="store_true", default=False, help="Reset counters [default=%default]")
parser.add_option("-f", "--flush-data-eps", action="store_true", default=False, help="Flush data endpoints [default=%default]")
parser.add_option("-L", "--fe-loopback", type="int", default=None, help="Change AD9361 digital loopback [default=%default]")
+
(options, args) = parser.parse_args()
if options.tty is not None and options.tty != "":
while True:
try:
ser = serial.Serial(port=options.tty, baudrate=115200, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS, timeout=None) # timeout: None (blocking), 0 (non-blocking)
- print "Opened", options.tty
+ print("Opened", options.tty)
+
try:
recv_serial_data(ser)
except KeyboardInterrupt:
break
except Exception as e:
- print e
+ print("Unable to open serial port:", e)
break
else:
dev = usb_device()
+
while True:
try:
dev.open(idVendor=hex_to_int(options.vid), idProduct=hex_to_int(options.pid))
+ raw_config = dev.vrt_get(B200_VREQ_GET_CONFIG)
+ current_config = Config(raw=raw_config)
+ print("Current config:")
+ print(current_config)
+
if options.flush_data_eps:
dev.vrt_set(B200_VREQ_FLUSH_DATA_EPS)
if options.sb_baud_div is not None:
@@ -627,17 +750,68 @@ def main():
dev.vrt_set(B200_VREQ_CLEAR_COUNTERS)
if options.fe_loopback is not None:
dev.vrt_set(B200_VREQ_AD9361_LOOPBACK, struct.pack('B', int(options.fe_loopback)))
+ options.cmd = options.cmd.strip()
if len(options.cmd) == 0:
run_log(dev, options)
- pass
else:
- pass
+ cmds = options.cmd.split(',')
+ flags = 0
+ for cmd in cmds:
+ cmd = cmd.strip()
+ if len(cmd) == 0:
+ continue
+ parts = cmd.split(' ')
+ action = parts[0].lower()
+ try:
+ if action == "txswing":
+ current_config.tx_swing = int(parts[1])
+ flags |= CF_TX_SWING
+ elif action == "txdeemph":
+ current_config.tx_deemphasis = int(parts[1])
+ flags |= CF_TX_DEEMPHASIS
+ elif action == "ss":
+ current_config.enable_as_superspeed = int(parts[1])
+ flags |= CF_ENABLE_AS_SUPERSPEED
+ elif action == "disableusb2":
+ current_config.disable_usb2 = int(parts[1])
+ flags |= CF_DISABLE_USB2
+ elif action == "pportdrive":
+ current_config.pport_drive_strength = int(parts[1])
+ flags |= CF_PPORT_DRIVE_STRENGTH
+ elif action == "dmasize":
+ current_config.dma_buffer_size = int(parts[1])
+ flags |= CF_DMA_BUFFER_SIZE
+ elif action == "dmacount":
+ current_config.dma_buffer_count = int(parts[1])
+ flags |= CF_DMA_BUFFER_COUNT
+ elif action == "manualdma":
+ current_config.manual_dma = int(parts[1])
+ flags |= CF_MANUAL_DMA
+ elif action == "sbbauddiv":
+ current_config.sb_baud_div = int(parts[1])
+ flags |= CF_SB_BAUD_DIV
+ elif action == "reenum":
+ flags |= CF_RE_ENUM
+ else:
+ print("'%s' not implemented" % (action))
+ except Exception as e:
+ print("Exception while handling action '%s'" % (action), e)
+ if flags != 0:
+ print("New config to be set:")
+ print(current_config.to_string(flags))
+ #print current_config
+ #print "Update flags: 0x%x" % (flags)
+ new_config = struct.pack("I", flags) + current_config.pack()
+ dev.vrt_set(B200_VREQ_SET_CONFIG, new_config)
+ else:
+ print("Not updating config")
break
except usb.core.USBError as e:
if e.errno == 19: # No such device
pass
- print e
+ print(e)
break
+
return 0
if __name__ == '__main__':