diff options
| author | Andrej Rode <andrej.rode@ettus.com> | 2017-02-09 11:16:16 -0800 | 
|---|---|---|
| committer | Martin Braun <martin.braun@ettus.com> | 2018-06-20 19:02:32 -0500 | 
| commit | 22e24497a510c174e6de7718ad918a423d1973dd (patch) | |
| tree | 436bc3d08451dbb2cc70f4c11c611cdd4ff23f78 /host/python/pyuhd.py | |
| parent | 91a5518443f4ff938f67a2f1bd1b09b24bceecd5 (diff) | |
| download | uhd-22e24497a510c174e6de7718ad918a423d1973dd.tar.gz uhd-22e24497a510c174e6de7718ad918a423d1973dd.tar.bz2 uhd-22e24497a510c174e6de7718ad918a423d1973dd.zip  | |
python: Initial commit of Python API
Initial commit of the Python API using Boost.Python. Bind the
MultiUSRP API for use in Python. Bindings intended to provide as
complete coverage as possible.
- Wrap most multi_usrp calls
- Adding multi channel send/recv examples in examples/python
- Adding setuptools support
- Initial attempt at binding the UHD types and filters
Diffstat (limited to 'host/python/pyuhd.py')
| -rw-r--r-- | host/python/pyuhd.py | 120 | 
1 files changed, 120 insertions, 0 deletions
diff --git a/host/python/pyuhd.py b/host/python/pyuhd.py new file mode 100644 index 000000000..32279afb3 --- /dev/null +++ b/host/python/pyuhd.py @@ -0,0 +1,120 @@ +# +# Copyright 2017 Ettus Research LLC +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program.  If not, see <http://www.gnu.org/licenses/>. +# + +import libpyuhd as lib +import numpy as np + + +class multi_usrp(object): +    def __init__(self, args=""): +        self.usrp = lib.multi_usrp.multi_usrp.make(args) + +    def __del__(self): +        # Help the garbage collection +        self.usrp = None + +    def set_rx_rate(self, rate, chan=None): +        if chan is None: +            for c in xrange(self.usrp.get_rx_num_channels()): +                self.usrp.set_rx_rate(rate, c) +        elif isinstance(chan, list): +            for c in chan: +                self.usrp.set_rx_rate(rate, c) +        else: +            self.usrp.set_rx_rate(rate, chan) + +    def set_tx_rate(self, rate, chan=None): +        if chan is None: +            for chan in xrange(self.usrp.get_tx_num_channels()): +                self.usrp.set_tx_rate(rate, chan) +        elif isinstance(chan, list): +            for c in chan: +                self.usrp.set_tx_rate(rate, c) +        else: +            self.usrp.set_tx_rate(rate, chan) + + +    def recv_num_samps(self, num_samps, freq, rate=1e6, channels=[0], gain=10): +        result = np.empty((len(channels), num_samps), dtype=np.complex64) +        for chan in channels: +            self.usrp.set_rx_rate(rate, chan) +            self.usrp.set_rx_freq(lib.types.tune_request(freq), chan) +            self.usrp.set_rx_gain(gain, chan) +        st_args = lib.types.stream_args("fc32", "sc16") +        st_args.channels = channels +        metadata = lib.types.rx_metadata() +        streamer = self.usrp.get_rx_stream(st_args) +        buffer_samps = streamer.get_max_num_samps() +        recv_buffer = np.zeros( +            (len(channels), buffer_samps), dtype=np.complex64) +        recv_samps = 0 +        stream_cmd = lib.types.stream_cmd(lib.types.stream_mode.start_cont) +        stream_cmd.stream_now = True +        streamer.issue_stream_cmd(stream_cmd) +        while (recv_samps < num_samps): +            samps = streamer.recv(recv_buffer, metadata) +            if metadata.error_code != lib.types.rx_metadata_error_code.none: +                print(metadata.strerror()) +            if samps: +                real_samps = min(num_samps - recv_samps, samps) +                result[:, recv_samps:recv_samps + real_samps - +                       1] = recv_buffer[:, 0:real_samps - 1] +                recv_samps += real_samps +        stream_cmd = lib.types.stream_cmd(lib.types.stream_mode.stop_cont) +        streamer.issue_stream_cmd(stream_cmd) +        while samps: +            samps = streamer.recv(recv_buffer, metadata) +        # Help the garbage collection +        streamer = None +        return result + +    def send_waveform(self, +                      waveform_proto, +                      duration, +                      freq, +                      rate=1e6, +                      channels=[0], +                      gain=10): +        self.set_tx_rate(rate) +        for chan in channels: +            self.usrp.set_tx_rate(rate, chan) +            self.usrp.set_tx_freq(lib.types.tune_request(freq), chan) +            self.usrp.set_tx_gain(gain, chan) +        st_args = lib.types.stream_args("fc32", "sc16") +        st_args.channels = channels +        metadata = lib.types.rx_metadata() +        streamer = self.usrp.get_tx_stream(st_args) +        buffer_samps = streamer.get_max_num_samps() +        proto_len = waveform_proto.shape[-1] +        if proto_len < buffer_samps: +            waveform_proto = np.tile(waveform_proto, (1, int(np.ceil(float(buffer_samps)/proto_len)))) +            proto_len = waveform_proto.shape[-1] +        metadata = lib.types.tx_metadata() +        send_samps = 0 +        max_samps = int(np.floor(duration * rate)) +        if waveform_proto.shape[0] < len(channels): +            waveform_proto = np.tile(waveform_proto[0], (len(channels), 1)) +        while send_samps < max_samps: +            real_samps = min(proto_len, max_samps-send_samps) +            if real_samps < proto_len: +                samples = streamer.send(waveform_proto[:real_samps], metadata) +            else: +                samples = streamer.send(waveform_proto, metadata) +            send_samps += samples +        # Help the garbage collection +        streamer = None +        return send_samps  | 
