From 90b88a27d27c52c337941a99773dfeb70e9c5917 Mon Sep 17 00:00:00 2001
From: Martin Braun <martin.braun@ettus.com>
Date: Wed, 7 Oct 2015 15:06:42 -0700
Subject: tests: Added first batch of device tests

- Currently supported: B2xx, X3x0
- Runs some simple examples
---
 host/tests/devtest/CMakeLists.txt             |  40 +++++
 host/tests/devtest/README.md                  |  28 ++++
 host/tests/devtest/benchmark_rate_test.py     |  75 +++++++++
 host/tests/devtest/devtest_b2xx.py            |  76 +++++++++
 host/tests/devtest/devtest_x3x0.py            |  57 +++++++
 host/tests/devtest/gpio_test.py               |  47 ++++++
 host/tests/devtest/run_testsuite.py           | 132 +++++++++++++++
 host/tests/devtest/rx_samples_to_file_test.py |  67 ++++++++
 host/tests/devtest/test_messages_test.py      |  57 +++++++
 host/tests/devtest/test_pps_test.py           |  51 ++++++
 host/tests/devtest/tx_bursts_test.py          |  63 ++++++++
 host/tests/devtest/uhd_test_base.py           | 222 ++++++++++++++++++++++++++
 host/tests/devtest/usrp_probe.py              |  50 ++++++
 13 files changed, 965 insertions(+)
 create mode 100644 host/tests/devtest/CMakeLists.txt
 create mode 100644 host/tests/devtest/README.md
 create mode 100755 host/tests/devtest/benchmark_rate_test.py
 create mode 100755 host/tests/devtest/devtest_b2xx.py
 create mode 100755 host/tests/devtest/devtest_x3x0.py
 create mode 100644 host/tests/devtest/gpio_test.py
 create mode 100755 host/tests/devtest/run_testsuite.py
 create mode 100755 host/tests/devtest/rx_samples_to_file_test.py
 create mode 100644 host/tests/devtest/test_messages_test.py
 create mode 100644 host/tests/devtest/test_pps_test.py
 create mode 100755 host/tests/devtest/tx_bursts_test.py
 create mode 100755 host/tests/devtest/uhd_test_base.py
 create mode 100644 host/tests/devtest/usrp_probe.py

(limited to 'host/tests/devtest')

diff --git a/host/tests/devtest/CMakeLists.txt b/host/tests/devtest/CMakeLists.txt
new file mode 100644
index 000000000..c770446a6
--- /dev/null
+++ b/host/tests/devtest/CMakeLists.txt
@@ -0,0 +1,40 @@
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+
+# Formatting
+MESSAGE(STATUS "")
+
+#
+# Arguments:
+# - pattern: This will be used to identify which devtest_*.py is to be executed.
+# - filter: Will be used in args strings as "type=<filter>".
+# - devtype: A descriptive string. Is only used for CMake output.
+MACRO(ADD_DEVTEST pattern filter devtype)
+    MESSAGE(STATUS "Adding ${devtype} device test target")
+    ADD_CUSTOM_TARGET("test_${pattern}"
+        ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/run_testsuite.py
+        "--src-dir" "@CMAKE_CURRENT_SOURCE_DIR@"
+        "--devtest-pattern" "${pattern}"
+        "--device-filter" "${filter}"
+        "--build-type" "${CMAKE_BUILD_TYPE}"
+        "--build-dir" "${CMAKE_BINARY_DIR}"
+        COMMENT "Running device test on all connected ${devtype} devices:"
+        WORKING_DIRECTORY "${CMAKE_CURRENT_BINARY_DIR}"
+    )
+ENDMACRO(ADD_DEVTEST)
+
+IF(ENABLE_B200)
+    ADD_DEVTEST("b2xx" "b200" "B2XX")
+ENDIF(ENABLE_B200)
+IF(ENABLE_X300)
+    ADD_DEVTEST("x3x0" "x300" "X3x0")
+ENDIF(ENABLE_X300)
+
+# Formatting
+MESSAGE(STATUS "")
diff --git a/host/tests/devtest/README.md b/host/tests/devtest/README.md
new file mode 100644
index 000000000..ee1ff3c9f
--- /dev/null
+++ b/host/tests/devtest/README.md
@@ -0,0 +1,28 @@
+# Device Tests
+
+These are a set of tests to be run with one or more attached devices.
+None of these tests require special configuration; e.g., the X3x0 test
+will work regardless of attached daughterboards, FPGIO wiring etc.
+
+## Adding new tests
+
+To add new tests, add new files with classes that derive from unittest.TestCase.
+Most of the time, you'll want to derive from `uhd_test_case` or
+`uhd_example_test_case`.
+
+## Adding new devices
+
+To add new devices, follow these steps:
+
+1) Add an entry to the CMakeLists.txt file in this directory using the
+   `ADD_DEVTEST()` macro.
+2) Add a `devtest_pattern.py` file to this directory, where `pattern` is
+   the same pattern used in the `ADD_DEVTEST()` macro.
+3) Edit this devtest file to import all the tests you want to run. Some
+   may require parameterization.
+
+The devtest file is 'executed' using Python's unittest module, so it doesn't
+require any actual commands. If the device needs special initialization,
+commands inside this file will be executed *if* they are *not* in a
+`if __name__ == "__main__"` conditional.
+
diff --git a/host/tests/devtest/benchmark_rate_test.py b/host/tests/devtest/benchmark_rate_test.py
new file mode 100755
index 000000000..2602e1771
--- /dev/null
+++ b/host/tests/devtest/benchmark_rate_test.py
@@ -0,0 +1,75 @@
+#!/usr/bin/env python
+#
+# Copyright 2015 Ettus Research LLC
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+""" Test using benchmark_rate. """
+
+import re
+from uhd_test_base import shell_application, uhd_example_test_case
+
+class uhd_benchmark_rate_test(uhd_example_test_case):
+    """
+    Run benchmark_rate in various configurations.
+    """
+    tests = {}
+
+    def setup_example(self):
+        """
+        Set args.
+        """
+        self.test_params = uhd_benchmark_rate_test.tests
+
+    def run_test(self, test_name, test_args):
+        """
+        Runs benchmark_rate with the given parameters. Parses output and writes
+        results to file.
+        """
+        self.log.info('Running test {n}, Channel = {c}, Sample Rate = {r}'.format(
+            n=test_name, c=test_args.get('chan', '0'), r=test_args.get('rate', 1e6),
+        ))
+        args = [
+            self.create_addr_args_str(),
+            '--duration', str(test_args.get('duration', 1)),
+            '--channels', str(test_args.get('chan', '0')),
+        ]
+        if 'tx' in test_args['direction']:
+            args.append('--tx_rate')
+            args.append(str(test_args.get('rate', 1e6)))
+        if 'rx' in test_args['direction']:
+            args.append('--rx_rate')
+            args.append(str(test_args.get('rate')))
+        (app, run_results) = self.run_example('benchmark_rate', args)
+        match = re.search(r'(Num received samples):\s*(.*)', app.stdout)
+        run_results['num_rx_samples'] = int(match.group(2)) if match else -1
+        match = re.search(r'(Num dropped samples):\s*(.*)', app.stdout)
+        run_results['num_rx_dropped'] = int(match.group(2)) if match else -1
+        match = re.search(r'(Num overflows detected):\s*(.*)', app.stdout)
+        run_results['num_rx_overruns'] = int(match.group(2)) if match else -1
+        match = re.search(r'(Num transmitted samples):\s*(.*)', app.stdout)
+        run_results['num_tx_samples'] = int(match.group(2)) if match else -1
+        match = re.search(r'(Num sequence errors):\s*(.*)', app.stdout)
+        run_results['num_tx_seqerrs'] = int(match.group(2)) if match else -1
+        match = re.search(r'(Num underflows detected):\s*(.*)', app.stdout)
+        run_results['num_tx_underruns'] = int(match.group(2)) if match else -1
+        run_results['passed'] = all([
+            run_results['return_code'] == 0,
+            run_results['num_rx_dropped'] == 0,
+            run_results['num_tx_seqerrs'] == 0,
+            run_results['num_tx_underruns'] <= test_args.get('acceptable-underruns', 0),
+        ])
+        self.report_example_results(test_name, run_results)
+        return run_results
+
diff --git a/host/tests/devtest/devtest_b2xx.py b/host/tests/devtest/devtest_b2xx.py
new file mode 100755
index 000000000..cd777aa18
--- /dev/null
+++ b/host/tests/devtest/devtest_b2xx.py
@@ -0,0 +1,76 @@
+#
+# Copyright 2015 Ettus Research LLC
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+"""
+Run device tests for the B2xx series.
+"""
+
+from benchmark_rate_test import uhd_benchmark_rate_test
+uhd_benchmark_rate_test.tests = {
+    #'mimo': {
+        #'duration': 1,
+        #'directions': ['tx,rx',],
+        #'channels': ['0,1',],
+        #'sample-rates': [1e6, 30e6],
+        #'products': ['B210',],
+        #'acceptable-underruns': 500,
+    #},
+    'siso_chan0_slow': {
+        'duration': 1,
+        'direction': 'tx,rx',
+        'chan': '0',
+        'rate': 1e6,
+        'acceptable-underruns': 50,
+    },
+    'siso_chan0_fast': {
+        'duration': 1,
+        'direction': 'tx,rx',
+        'chan': '0',
+        'rate': 40e6,
+        'acceptable-underruns': 500,
+    },
+    'siso_chan1_slow': {
+        'duration': 1,
+        'direction': 'tx,rx',
+        'chan': '1',
+        'rate': 1e6,
+        'acceptable-underruns': 50,
+        'products': ['B210',],
+    },
+    'siso_chan1_fast': {
+        'duration': 1,
+        'direction': 'tx,rx',
+        'chan': '1',
+        'rate': 40e6,
+        'acceptable-underruns': 500,
+        'products': ['B210',],
+    },
+}
+
+from rx_samples_to_file_test import rx_samples_to_file_test
+rx_samples_to_file_test.tests = {
+    'default': {
+        'duration': 1,
+        'subdev': 'A:A',
+        'rate': 5e6,
+        'products': ['B210', 'B200',],
+    },
+}
+
+from tx_bursts_test import uhd_tx_bursts_test
+from test_pps_test import uhd_test_pps_test
+from gpio_test import gpio_test
+
diff --git a/host/tests/devtest/devtest_x3x0.py b/host/tests/devtest/devtest_x3x0.py
new file mode 100755
index 000000000..7ad6b21b6
--- /dev/null
+++ b/host/tests/devtest/devtest_x3x0.py
@@ -0,0 +1,57 @@
+#
+# Copyright 2015 Ettus Research LLC
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+"""
+Run device tests for the X3x0 series.
+"""
+
+from benchmark_rate_test import uhd_benchmark_rate_test
+uhd_benchmark_rate_test.tests = {
+    'mimo_slow': {
+        'duration': 1,
+        'direction': 'tx,rx',
+        'chan': '0,1',
+        'rate': 1e6,
+        'acceptable-underruns': 500,
+    },
+    'mimo_fast': {
+        'duration': 1,
+        'direction': 'tx,rx',
+        'chan': '0,1',
+        'rate': 12.5e6,
+        'acceptable-underruns': 500,
+    },
+    'siso_chan0_slow': {
+        'duration': 1,
+        'direction': 'tx,rx',
+        'chan': '0',
+        'rate': 1e6,
+        'acceptable-underruns': 0,
+    },
+    'siso_chan1_slow': {
+        'duration': 1,
+        'direction': 'tx,rx',
+        'chan': '1',
+        'rate': 1e6,
+        'acceptable-underruns': 0,
+    },
+}
+
+#from rx_samples_to_file_test import rx_samples_to_file_test
+from tx_bursts_test import uhd_tx_bursts_test
+from test_pps_test import uhd_test_pps_test
+from gpio_test import gpio_test
+
diff --git a/host/tests/devtest/gpio_test.py b/host/tests/devtest/gpio_test.py
new file mode 100644
index 000000000..d764a8d96
--- /dev/null
+++ b/host/tests/devtest/gpio_test.py
@@ -0,0 +1,47 @@
+#
+# Copyright 2015 Ettus Research LLC
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+""" Test for test_pps_input. """
+
+import re
+from uhd_test_base import uhd_example_test_case
+
+class gpio_test(uhd_example_test_case):
+    """ Run gpio. """
+    tests = {'default': {},}
+
+    def setup_example(self):
+        """
+        Set args.
+        """
+        self.test_params = gpio_test.tests
+
+    def run_test(self, test_name, test_args):
+        """ Run the app and scrape for the success message. """
+        self.log.info('Running test {n}'.format(n=test_name,))
+        # Run example:
+        args = [
+            self.create_addr_args_str(),
+        ]
+        (app, run_results) = self.run_example('gpio', args)
+        # Evaluate pass/fail:
+        run_results['passed'] = all([
+            app.returncode == 0,
+            re.search('All tests passed!', app.stdout) is not None,
+        ])
+        self.report_example_results(test_name, run_results)
+        return run_results
+
diff --git a/host/tests/devtest/run_testsuite.py b/host/tests/devtest/run_testsuite.py
new file mode 100755
index 000000000..587d1cc64
--- /dev/null
+++ b/host/tests/devtest/run_testsuite.py
@@ -0,0 +1,132 @@
+#!/usr/bin/env python
+#
+# Copyright 2015 Ettus Research LLC
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+"""
+Device test runner.
+"""
+
+import os
+import sys
+import subprocess
+import argparse
+import logging
+import time
+from threading  import Thread
+try:
+    from Queue import Queue, Empty
+except ImportError:
+    from queue import Queue, Empty  # Py3k
+from usrp_probe import get_usrp_list
+
+ANI = ('.', 'o', 'O', '0', 'O', 'o')
+
+def setup_parser():
+    """ Set up argparser """
+    parser = argparse.ArgumentParser(description="Test utility for UHD/USRP.")
+    parser.add_argument('--devtest-pattern', '-p', default='*', help='e.g. b2xx')
+    parser.add_argument('--device-filter', '-f', default=None, required=True, help='b200, x300, ...')
+    parser.add_argument('--log-dir', '-l', default='.')
+    parser.add_argument('--src-dir', default='.', help='Directory where the test sources are stored')
+    parser.add_argument('--build-dir', default=None, help='Build dir (where examples/ and utils/ are)')
+    parser.add_argument('--build-type', default='Release')
+    return parser
+
+def setup_env(args):
+    def setup_env_win(env, build_dir, build_type):
+        env['PATH'] = "{build_dir}/lib/{build_type};{build_dir}/examples/{build_type};{build_dir}/utils/{build_type};{path}".format(
+            build_dir=build_dir, build_type=build_type, path=env.get('PATH', '')
+        )
+        env['LIBPATH'] = "{build_dir}/lib/{build_type};{path}".format(
+            build_dir=build_dir, build_type=build_type, path=env.get('LIBPATH', '')
+        )
+        env['LIB'] = "{build_dir}/lib/{build_type};{path}".format(
+            build_dir=build_dir, build_type=build_type, path=env.get('LIB', '')
+        )
+        return env
+    def setup_env_unix(env, build_dir):
+        env['PATH'] = "{build_dir}/examples:{build_dir}/utils:{path}".format(
+            build_dir=build_dir, path=env.get('PATH', '')
+        )
+        env['LD_LIBRARY_PATH'] = "{build_dir}/lib:{path}".format(
+            build_dir=build_dir, path=env.get('LD_LIBRARY_PATH', '')
+        )
+        return env
+    def setup_env_osx(env, build_dir):
+        env['PATH'] = "{build_dir}/examples:{build_dir}/utils:{path}".format(
+                build_dir=build_dir, path=env.get('PATH', '')
+        )
+        env['DYLD_LIBRARY_PATH'] = "{build_dir}/lib:{path}".format(
+                build_dir=build_dir, path=env.get('DYLD_LIBRARY_PATH', '')
+        )
+        return env
+    ### Go
+    env = os.environ
+    if sys.platform.startswith('linux'):
+        env = setup_env_unix(env, args.build_dir)
+    elif sys.platform.startswith('win'):
+        env = setup_env_win(env, args.build_dir, args.build_type)
+    elif sys.platform.startswith('darwin'):
+        env = setup_env_osx(env, args.build_dir)
+    else:
+        print("Devtest not supported on this platform ({0}).".format(sys.platform))
+        exit(1)
+    return env
+
+def main():
+    """
+    Go, go, go!
+    """
+    args = setup_parser().parse_args()
+    devtest_pattern = "devtest_{p}.py".format(p=args.devtest_pattern)
+    uhd_args_list = get_usrp_list("type=" + args.device_filter)
+    if len(uhd_args_list) == 0:
+        print("No devices found. Exiting.")
+        exit(1)
+    for uhd_idx, uhd_info in enumerate(uhd_args_list):
+        print('--- Running all tests for device {dev} ({prod}, Serial: {ser}).'.format(
+            dev=uhd_idx,
+            prod=uhd_info.get('product', 'USRP'),
+            ser=uhd_info.get('serial')
+        ))
+        print('--- This will take some time. Better grab a cup of tea.')
+        env = setup_env(args)
+        args_str = uhd_info['args']
+        env['_UHD_TEST_ARGS_STR'] = args_str
+        logfile_name = "log{}.log".format(
+            args_str.replace('type=', '_').replace('serial=', '_').replace(',', '')
+        )
+        resultsfile_name = "results{}.log".format(
+            args_str.replace('type=', '_').replace('serial=', '_').replace(',', '')
+        )
+        env['_UHD_TEST_LOGFILE'] = os.path.join(args.log_dir, logfile_name)
+        env['_UHD_TEST_RESULTSFILE'] = os.path.join(args.log_dir, resultsfile_name)
+        env['_UHD_TEST_LOG_LEVEL'] = str(logging.INFO)
+        env['_UHD_TEST_PRINT_LEVEL'] = str(logging.WARNING)
+        p = subprocess.Popen(
+            [
+                "python", "-m", "unittest", "discover", "-v",
+                "-s", args.src_dir,
+                "-p", devtest_pattern,
+            ],
+            env=env,
+            stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
+        )
+        print p.communicate()[0]
+    print('--- Done testing all attached devices.')
+
+if __name__ == "__main__":
+    main()
diff --git a/host/tests/devtest/rx_samples_to_file_test.py b/host/tests/devtest/rx_samples_to_file_test.py
new file mode 100755
index 000000000..bac6ac256
--- /dev/null
+++ b/host/tests/devtest/rx_samples_to_file_test.py
@@ -0,0 +1,67 @@
+#!/usr/bin/env python
+#
+# Copyright 2015 Ettus Research LLC
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+""" Test the rx_samples_to_file example. """
+
+from uhd_test_base import uhd_example_test_case
+
+class rx_samples_to_file_test(uhd_example_test_case):
+    """
+    Run rx_samples_to_file and check output.
+    """
+    tests = {
+        'default': {
+            'duration': 1,
+            'rate': 5e6,
+        },
+    }
+
+    def setup_example(self):
+        """
+        Set args.
+        """
+        self.test_params = rx_samples_to_file_test.tests
+
+    def run_test(self, test_name, test_args):
+        """
+        Test launcher. Runs the example.
+        """
+        self.log.info('Running test {n}, Subdev = {subdev}, Sample Rate = {rate}'.format(
+            n=test_name, subdev=test_args.get('subdev'), rate=test_args.get('rate'),
+        ))
+        # Run example:
+        args = [
+            self.create_addr_args_str(),
+            '--null',
+            '--stats',
+            '--duration', str(test_args['duration']),
+            '--rate', str(test_args.get('rate', 1e6)),
+            '--wirefmt', test_args.get('wirefmt', 'sc16'),
+        ]
+        if test_args.has_key('subdev'):
+            args.append('--subdev')
+            args.append(test_args['subdev'])
+        (app, run_results) = self.run_example('rx_samples_to_file', args)
+        # Evaluate pass/fail:
+        run_results['passed'] = all([
+            not run_results['has_D'],
+            not run_results['has_S'],
+            run_results['return_code'] == 0,
+        ])
+        self.report_example_results(test_name, run_results)
+        return run_results
+
diff --git a/host/tests/devtest/test_messages_test.py b/host/tests/devtest/test_messages_test.py
new file mode 100644
index 000000000..496765c75
--- /dev/null
+++ b/host/tests/devtest/test_messages_test.py
@@ -0,0 +1,57 @@
+#!/usr/bin/env python
+#
+# Copyright 2015 Ettus Research LLC
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+""" Test the test_messages example. """
+
+import re
+from uhd_test_base import uhd_example_test_case
+
+class uhd_test_messages_test(uhd_example_test_case):
+    """
+    Run test_messages and check output.
+    """
+    tests = {'default': {},}
+
+    def setup_example(self):
+        """
+        Set args.
+        """
+        self.test_params = uhd_test_messages_test.tests
+
+    def run_test(self, test_name, test_args):
+        """ Run the app and scrape for the failure messages. """
+        self.log.info('Running test {n}'.format(n=test_name,))
+        # Run example:
+        args = [
+            self.create_addr_args_str(),
+        ]
+        if test_args.has_key('ntests'):
+            args.append('--ntests')
+            args.append(test_args['ntests'])
+        (app, run_results) = self.run_example('test_messages', args)
+        # Evaluate pass/fail:
+        succ_fail_re = re.compile(r'(?P<test>.*)->\s+(?P<succ>\d+) successes,\s+(?P<fail>\d+) +failures')
+        for mo in succ_fail_re.finditer(app.stdout):
+            key = mo.group("test").strip().replace(' ', '_').lower()
+            successes = int(mo.group("succ"))
+            failures = int(mo.group("fail"))
+            run_results[key] = "{}/{}".format(successes, successes+failures)
+            run_results['passed'] = bool(failures)
+
+        self.report_example_results(test_name, run_results)
+        return run_results
+
diff --git a/host/tests/devtest/test_pps_test.py b/host/tests/devtest/test_pps_test.py
new file mode 100644
index 000000000..1e5b36e2c
--- /dev/null
+++ b/host/tests/devtest/test_pps_test.py
@@ -0,0 +1,51 @@
+#!/usr/bin/env python
+#
+# Copyright 2015 Ettus Research LLC
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+""" Test for test_pps_input. """
+
+import re
+from uhd_test_base import uhd_example_test_case
+
+class uhd_test_pps_test(uhd_example_test_case):
+    """ Run test_pps_input. """
+    tests = {'default': {},}
+
+    def setup_example(self):
+        """
+        Set args.
+        """
+        self.test_params = uhd_test_pps_test.tests
+
+    def run_test(self, test_name, test_args):
+        """ Run the app and scrape for the success message. """
+        self.log.info('Running test {n}'.format(n=test_name,))
+        # Run example:
+        args = [
+            self.create_addr_args_str(),
+        ]
+        if test_args.has_key('source'):
+            args.append('--source')
+            args.append(test_args['source'])
+        (app, run_results) = self.run_example('test_pps_input', args)
+        # Evaluate pass/fail:
+        run_results['passed'] = all([
+            app.returncode == 0,
+            re.search('Success!', app.stdout) is not None,
+        ])
+        self.report_example_results(test_name, run_results)
+        return run_results
+
diff --git a/host/tests/devtest/tx_bursts_test.py b/host/tests/devtest/tx_bursts_test.py
new file mode 100755
index 000000000..863f35fe1
--- /dev/null
+++ b/host/tests/devtest/tx_bursts_test.py
@@ -0,0 +1,63 @@
+#!/usr/bin/env python
+#
+# Copyright 2015 Ettus Research LLC
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+""" Run the test for tx_burst """
+
+import re
+from uhd_test_base import uhd_example_test_case
+
+class uhd_tx_bursts_test(uhd_example_test_case):
+    """ Run test_messages. """
+    tests = {
+        'default': {
+            'nsamps': 10000,
+            'rate': 5e6,
+            'channels': '0',
+        },
+    }
+
+    def setup_example(self):
+        """
+        Set args.
+        """
+        self.test_params = uhd_tx_bursts_test.tests
+
+    def run_test(self, test_name, test_args):
+        """ Run the app and scrape for the failure messages. """
+        self.log.info('Running test {name}, Channel = {channel}, Sample Rate = {rate}'.format(
+            name=test_name, channel=test_args.get('channel'), rate=test_args.get('rate'),
+        ))
+        # Run example:
+        args = [
+            self.create_addr_args_str(),
+            '--nsamps', str(test_args['nsamps']),
+            '--channels', str(test_args['channels']),
+            '--rate', str(test_args.get('rate', 1e6)),
+        ]
+        if test_args.has_key('subdev'):
+            args.append('--subdev')
+            args.append(test_args['subdev'])
+        (app, run_results) = self.run_example('tx_bursts', args)
+        # Evaluate pass/fail:
+        run_results['passed'] = all([
+            app.returncode == 0,
+            not run_results['has_S'],
+        ])
+        run_results['async_burst_ack_found'] = re.search('success', app.stdout) is not None
+        self.report_example_results(test_name, run_results)
+        return run_results
+
diff --git a/host/tests/devtest/uhd_test_base.py b/host/tests/devtest/uhd_test_base.py
new file mode 100755
index 000000000..046e6fb47
--- /dev/null
+++ b/host/tests/devtest/uhd_test_base.py
@@ -0,0 +1,222 @@
+#!/usr/bin/env python
+
+import os
+import sys
+import yaml
+import unittest
+import re
+import time
+import logging
+from subprocess import Popen, PIPE, STDOUT
+from usrp_probe import get_usrp_list
+
+#--------------------------------------------------------------------------
+# Application
+#--------------------------------------------------------------------------
+class shell_application(object):
+    """
+    Wrapper for applications that are in $PATH.
+    Note: The CMake infrastructure makes sure all examples and utils are in $PATH.
+    """
+    def __init__(self, name):
+        self.name = name
+        self.stdout = ''
+        self.stderr = ''
+        self.returncode = None
+        self.exec_time = None
+
+    def run(self, args = []):
+        cmd_line = [self.name]
+        cmd_line.extend(args)
+        start_time = time.time()
+        p = Popen(cmd_line, stdout=PIPE, stderr=PIPE, close_fds=True)
+        self.stdout, self.stderr = p.communicate()
+        self.returncode = p.returncode
+        self.exec_time = time.time() - start_time
+
+#--------------------------------------------------------------------------
+# Test case base
+#--------------------------------------------------------------------------
+class uhd_test_case(unittest.TestCase):
+    """
+    Base class for UHD test cases.
+    """
+    test_name = '--TEST--'
+
+    def set_up(self):
+        """
+        Override this to add own setup code per test.
+        """
+        pass
+
+    def setUp(self):
+        self.name = self.__class__.__name__
+        self.test_id = self.id().split('.')[-1]
+        self.results = {}
+        self.results_file = os.getenv('_UHD_TEST_RESULTSFILE', "")
+        if self.results_file and os.path.isfile(self.results_file):
+            self.results = yaml.safe_load(open(self.results_file).read()) or {}
+        self.args_str = os.getenv('_UHD_TEST_ARGS_STR', "")
+        self.usrp_info = get_usrp_list(self.args_str)[0]
+        if not self.results.has_key(self.usrp_info['serial']):
+            self.results[self.usrp_info['serial']] = {}
+        if not self.results[self.usrp_info['serial']].has_key(self.name):
+            self.results[self.usrp_info['serial']][self.name] = {}
+        self.setup_logger()
+        self.set_up()
+
+    def setup_logger(self):
+        " Add logging infrastructure "
+        self.log = logging.getLogger("devtest.{name}".format(name=self.name))
+        self.log_file = os.getenv('_UHD_TEST_LOGFILE', "devtest.log")
+        #self.log_level = int(os.getenv('_UHD_TEST_LOG_LEVEL', logging.DEBUG))
+        #self.print_level = int(os.getenv('_UHD_TEST_PRINT_LEVEL', logging.WARNING))
+        self.log_level = logging.DEBUG
+        self.print_level = logging.WARNING
+        file_handler = logging.FileHandler(self.log_file)
+        file_handler.setLevel(self.log_level)
+        console_handler = logging.StreamHandler()
+        console_handler.setLevel(self.print_level)
+        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+        file_handler.setFormatter(formatter)
+        console_handler.setFormatter(formatter)
+        self.log.setLevel(logging.DEBUG)
+        self.log.addHandler(file_handler)
+        self.log.addHandler(console_handler)
+        self.log.info("Starting test with device: {dev}".format(dev=self.args_str))
+
+    def tear_down(self):
+        pass
+
+    def tearDown(self):
+        self.tear_down()
+        if self.results_file:
+            open(self.results_file, 'w').write(yaml.dump(self.results, default_flow_style=False))
+
+    def report_result(self, testname, key, value):
+        """ Store a result as a key/value pair.
+        After completion, all results for one test are written to the results file.
+        """
+        if not self.results[self.usrp_info['serial']][self.name].has_key(testname):
+            self.results[self.usrp_info['serial']][self.name][testname] = {}
+        self.results[self.usrp_info['serial']][self.name][testname][key] = value
+
+    def create_addr_args_str(self, argname="args"):
+        """ Returns an args string, usually '--args "type=XXX,serial=YYY" """
+        if len(self.args_str) == 0:
+            return ''
+        return '--{}={}'.format(argname, self.args_str)
+
+    def filter_warnings(self, errstr):
+        """ Searches errstr for UHD warnings, removes them, and puts them into a separate string.
+        Returns (errstr, warnstr), where errstr no longer has warning. """
+        warn_re = re.compile("UHD Warning:\n(?:    .*\n)+")
+        warnstr = "\n".join(warn_re.findall(errstr)).strip()
+        errstr = warn_re.sub('', errstr).strip()
+        return (errstr, warnstr)
+
+    def filter_stderr(self, stderr, run_results={}):
+        """ Filters the output to stderr. run_results[] is a dictionary.
+        This function will:
+        - Remove UUUUU... strings, since they are generally not a problem.
+        - Remove all DDDD and SSSS strings, and add run_results['has_S'] = True
+          and run_results['has_D'] = True.
+        - Remove warnings and put them in run_results['warnings']
+        - Put the filtered error string into run_results['errors'] and returns the dictionary
+        """
+        errstr, run_results['warnings'] = self.filter_warnings(stderr)
+        # Scan for underruns and sequence errors / dropped packets  not detected in the counter
+        errstr = re.sub('UU+', '', errstr)
+        (errstr, n_subs) = re.subn('SS+', '', errstr)
+        if n_subs:
+            run_results['has_S'] = True
+        (errstr, n_subs) = re.subn('DD+', '', errstr)
+        if n_subs:
+            run_results['has_D'] = True
+        errstr = re.sub("\n\n+", "\n", errstr)
+        run_results['errors'] = errstr.strip()
+        return run_results
+
+class uhd_example_test_case(uhd_test_case):
+    """
+    A test case that runs an example.
+    """
+
+    def setup_example(self):
+        """
+        Override this to add specific setup code.
+        """
+        pass
+
+    def set_up(self):
+        """
+        """
+        self.setup_example()
+
+    def run_test(self, test_name, test_args):
+        """
+        Override this to run the actual example.
+
+        Needs to return either a boolean or a dict with key 'passed' to determine
+        pass/fail.
+        """
+        raise NotImplementedError
+
+    def run_example(self, example, args):
+        """
+        Run `example' (which has to be a UHD example or utility) with `args'.
+        Return results and the app object.
+        """
+        self.log.info("Running example: `{example} {args}'".format(example=example, args=" ".join(args)))
+        app = shell_application(example)
+        app.run(args)
+        run_results = {
+            'return_code': app.returncode,
+            'passed': False,
+            'has_D': False,
+            'has_S': False,
+        }
+        run_results = self.filter_stderr(app.stderr, run_results)
+        self.log.info('STDERR Output:')
+        self.log.info(str(app.stderr))
+        return (app, run_results)
+
+
+    def report_example_results(self, test_name, run_results):
+        for key in sorted(run_results):
+            self.log.info('{key} = {val}'.format(key=key, val=run_results[key]))
+            self.report_result(
+                test_name,
+                key, run_results[key]
+            )
+        if run_results.has_key('passed'):
+            self.report_result(
+                test_name,
+                'status',
+                'Passed' if run_results['passed'] else 'Failed',
+            )
+        if run_results.has_key('errors'):
+            self.report_result(
+                test_name,
+                'errors',
+                'Yes' if run_results['errors'] else 'No',
+            )
+
+    def test_all(self):
+        """
+        Hook for test runner. Needs to be a class method that starts with 'test'.
+        Calls run_test().
+        """
+        for test_name, test_args in self.test_params.iteritems():
+            if not test_args.has_key('product') or (self.usrp_info['product'] in test_args.get('products', [])):
+                run_results = self.run_test(test_name, test_args)
+                passed = bool(run_results)
+                if isinstance(run_results, dict):
+                    passed = run_results['passed']
+                self.assertTrue(
+                    passed,
+                    msg="Errors occurred during test `{t}'. Check log file for details.\nRun results:\n{r}".format(
+                        t=test_name, r=yaml.dump(run_results, default_flow_style=False)
+                    )
+                )
+
diff --git a/host/tests/devtest/usrp_probe.py b/host/tests/devtest/usrp_probe.py
new file mode 100644
index 000000000..ba3c645e4
--- /dev/null
+++ b/host/tests/devtest/usrp_probe.py
@@ -0,0 +1,50 @@
+#!/usr/bin/env python
+#
+# Copyright 2015 Ettus Research LLC
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+""" Run uhd_find_devices and parse the output. """
+
+import re
+import subprocess
+
+def get_usrp_list(device_filter=None):
+    """ Returns a list of dicts that contain USRP info """
+    try:
+        if device_filter is not None:
+            output = subprocess.check_output(['uhd_find_devices', '--args', device_filter])
+        else:
+            output = subprocess.check_output('uhd_find_devices')
+    except subprocess.CalledProcessError:
+        return []
+    split_re = "\n*-+\n-- .*\n-+\n"
+    uhd_strings = re.split(split_re, output)
+    result = []
+    for uhd_string in uhd_strings:
+        if not re.match("Device Address", uhd_string):
+            continue
+        this_result = {k: v for k, v in re.findall("    ([a-z]+): (.*)", uhd_string)}
+        args_string = ""
+        try:
+            args_string = "type={},serial={}".format(this_result['type'], this_result['serial'])
+        except KeyError:
+            continue
+        this_result['args'] = args_string
+        result.append(this_result)
+    return result
+
+if __name__ == "__main__":
+    print get_usrp_list()
+    print get_usrp_list('type=x300')
-- 
cgit v1.2.3