aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/stream_python.hpp
blob: e07d120f1a48c801f681e61eaa64b13aa9089434 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
//
// Copyright 2017-2018 Ettus Research, a National Instruments Company
//
// SPDX-License-Identifier: GPL-3.0-or-later
//

#ifndef INCLUDED_UHD_STREAM_PYTHON_HPP
#define INCLUDED_UHD_STREAM_PYTHON_HPP

#include "utils/gil_release_python.hpp"
#include <uhd/stream.hpp>
#include <uhd/types/metadata.hpp>

#include <boost/format.hpp>

/*! Python-facing wrapper around uhd::rx_streamer::recv().
 *
 * Converts \p np_array to a NumPy array with the NPY_ARRAY_CARRAY
 * requirements, builds one buffer pointer per streamer channel (row i of the
 * array is the buffer for channel i) and calls recv() with the GIL released.
 *
 * \param rx_stream streamer to receive from
 * \param np_array  Python object to receive into; 2D when the streamer has
 *                  more than one channel, otherwise 1D or 2D
 * \param metadata  Python object expected to wrap a uhd::rx_metadata_t
 * \param timeout   forwarded unchanged to recv()
 * \returns the value returned by recv(), or 0 if \p metadata does not hold a
 *          uhd::rx_metadata_t
 * \throws uhd::runtime_error if the array is 0-dimensional or its shape does
 *         not fit the number of channels
 * \throws bp::error_already_set if NumPy cannot convert \p np_array
 */
static size_t wrap_recv(uhd::rx_streamer *rx_stream,
                        bp::object &np_array,
                        bp::object &metadata,
                        const double timeout = 0.1)
{
    // Extract the metadata
    bp::extract<uhd::rx_metadata_t&> get_metadata(metadata);
    if (not get_metadata.check())
    {
        return 0;
    }

    // Get a numpy array object from the given python object. The conversion
    // hands back a new reference, or NULL with a Python error set on failure.
    // bp::handle<> throws bp::error_already_set on NULL and otherwise drops
    // the reference on every exit path, including when recv() throws.
    // NOTE(review): if np_array does not already satisfy NPY_ARRAY_CARRAY,
    // NumPy presumably returns a converted copy, in which case the received
    // samples would land in that copy and not in the caller's array --
    // confirm against the NumPy C-API documentation.
    const bp::handle<> array_ref(
        PyArray_FROM_OF(np_array.ptr(), NPY_ARRAY_CARRAY));
    PyArrayObject* array_type_obj =
        reinterpret_cast<PyArrayObject*>(array_ref.get());

    // Get dimensions of the numpy array
    const size_t dims = PyArray_NDIM(array_type_obj);
    if (dims == 0)
    {
        // A 0-dimensional array has no shape[0] / strides[0] to look at
        throw uhd::runtime_error(
            "Cannot receive into a 0-dimensional data array");
    }
    const npy_intp* shape = PyArray_SHAPE(array_type_obj);

    // How many bytes to jump to get to the next element of this stride
    // (next row)
    const npy_intp* strides = PyArray_STRIDES(array_type_obj);
    const size_t channels = rx_stream->get_num_channels();

    // Check if numpy array sizes are okay
    if (((channels > 1) && (dims != 2))
     or (static_cast<size_t>(shape[0]) < channels))
    {
        // If we don't have a 2D NumPy array, assume we have a 1D array
        const size_t input_channels =
            (dims != 2) ? 1 : static_cast<size_t>(shape[0]);
        throw uhd::runtime_error(str(boost::format(
            "Number of RX channels (%d) does not match the dimensions of the data array (%d)")
            % channels % input_channels));
    }

    // Get a pointer to the storage of every channel
    std::vector<void*> channel_storage;
    channel_storage.reserve(channels);
    char* data = PyArray_BYTES(array_type_obj);
    for (size_t i = 0; i < channels; ++i)
    {
        channel_storage.push_back(static_cast<void*>(data + i * strides[0]));
    }

    // Samples per channel buffer: row length for 2D input, otherwise the
    // total number of elements
    const size_t nsamps_per_buff = (dims > 1)
        ? static_cast<size_t>(shape[1])
        : static_cast<size_t>(PyArray_SIZE(array_type_obj));

    // Release the GIL only for the recv() call; the guard is destroyed (and
    // the GIL taken back) before the array reference is dropped
    const size_t result = [&]() {
        scoped_gil_release gil_release;
        // Call the real recv()
        return rx_stream->recv(
            channel_storage,
            nsamps_per_buff,
            get_metadata(),
            timeout
        );
    }();

    return result;
}
BOOST_PYTHON_FUNCTION_OVERLOADS(overload_wrap_recv, wrap_recv, 3, 4);

/*! Python-facing wrapper around uhd::tx_streamer::send().
 *
 * Converts \p np_array to a NumPy array with the NPY_ARRAY_CARRAY
 * requirements, builds one buffer pointer per streamer channel (row i of the
 * array is the buffer for channel i) and calls send() with the GIL released.
 *
 * \param tx_stream streamer to send with
 * \param np_array  Python object holding the samples; 2D when the streamer
 *                  has more than one channel, otherwise 1D or 2D
 * \param metadata  Python object expected to wrap a uhd::tx_metadata_t
 * \param timeout   forwarded unchanged to send()
 * \returns the value returned by send(), or 0 if \p metadata does not hold a
 *          uhd::tx_metadata_t
 * \throws uhd::runtime_error if the array is 0-dimensional or its shape does
 *         not fit the number of channels
 * \throws bp::error_already_set if NumPy cannot convert \p np_array
 */
static size_t wrap_send(uhd::tx_streamer *tx_stream,
                        bp::object &np_array,
                        bp::object &metadata,
                        const double timeout = 0.1)
{
    // Extract the metadata
    bp::extract<uhd::tx_metadata_t&> get_metadata(metadata);
    // TODO: throw an error here?
    if (not get_metadata.check())
    {
        return 0;
    }

    // Get a numpy array object from the given python object. The conversion
    // hands back a new reference, or NULL with a Python error set on failure.
    // bp::handle<> throws bp::error_already_set on NULL and otherwise drops
    // the reference on every exit path, including when send() throws.
    const bp::handle<> array_ref(
        PyArray_FROM_OF(np_array.ptr(), NPY_ARRAY_CARRAY));
    PyArrayObject* array_type_obj =
        reinterpret_cast<PyArrayObject*>(array_ref.get());

    // Get dimensions of the numpy array
    const size_t dims = PyArray_NDIM(array_type_obj);
    if (dims == 0)
    {
        // A 0-dimensional array has no shape[0] / strides[0] to look at
        throw uhd::runtime_error(
            "Cannot send from a 0-dimensional data array");
    }
    const npy_intp* shape = PyArray_SHAPE(array_type_obj);

    // How many bytes to jump to get to the next element of the stride
    // (next row)
    const npy_intp* strides = PyArray_STRIDES(array_type_obj);
    const size_t channels = tx_stream->get_num_channels();

    // Check if numpy array sizes are ok
    if (((channels > 1) && (dims != 2))
     or (static_cast<size_t>(shape[0]) < channels))
    {
        // If we don't have a 2D NumPy array, assume we have a 1D array
        const size_t input_channels =
            (dims != 2) ? 1 : static_cast<size_t>(shape[0]);
        throw uhd::runtime_error(str(boost::format(
            "Number of TX channels (%d) does not match the dimensions of the data array (%d)")
            % channels % input_channels));
    }

    // Get a pointer to the storage of every channel
    std::vector<void*> channel_storage;
    channel_storage.reserve(channels);
    char* data = PyArray_BYTES(array_type_obj);
    for (size_t i = 0; i < channels; ++i)
    {
        channel_storage.push_back(static_cast<void*>(data + i * strides[0]));
    }

    // Samples per channel buffer: row length for 2D input, otherwise the
    // total number of elements
    const size_t nsamps_per_buff = (dims > 1)
        ? static_cast<size_t>(shape[1])
        : static_cast<size_t>(PyArray_SIZE(array_type_obj));

    // Release the GIL only for the send() call; the guard is destroyed (and
    // the GIL taken back) before the array reference is dropped
    const size_t result = [&]() {
        scoped_gil_release gil_release;
        // Call the real send()
        return tx_stream->send(
            channel_storage,
            nsamps_per_buff,
            get_metadata(),
            timeout
        );
    }();

    return result;
}
BOOST_PYTHON_FUNCTION_OVERLOADS(overload_wrap_send, wrap_send, 3, 4);

/*! Python-facing wrapper around uhd::tx_streamer::recv_async_msg().
 *
 * \param tx_stream      streamer to query
 * \param async_metadata passed by reference to recv_async_msg()
 * \param timeout        forwarded unchanged to recv_async_msg()
 * \returns whatever recv_async_msg() returns
 */
static bool wrap_recv_async_msg(uhd::tx_streamer *tx_stream,
                                uhd::async_metadata_t &async_metadata,
                                double timeout = 0.1)
{
    // The GIL stays released for the lifetime of this guard, i.e. for the
    // whole call below
    scoped_gil_release gil_release;

    const bool msg_result = tx_stream->recv_async_msg(async_metadata, timeout);
    return msg_result;
}
BOOST_PYTHON_FUNCTION_OVERLOADS(overload_wrap_recv_async_msg, wrap_recv_async_msg, 2, 3);

void export_stream()
{
    using stream_args_t = uhd::stream_args_t;
    using rx_streamer   = uhd::rx_streamer;
    using tx_streamer   = uhd::tx_streamer;

    bp::class_<stream_args_t>
        ("stream_args", bp::init<const std::string&, const std::string&>())

        // Properties
        .def_readwrite("cpu_format", &stream_args_t::cpu_format)
        .def_readwrite("otw_format", &stream_args_t::otw_format)
        .def_readwrite("args"      , &stream_args_t::args      )
        .def_readwrite("channels"  , &stream_args_t::channels  )
        ;

    bp::class_<
        rx_streamer,
        boost::shared_ptr<rx_streamer>,
        boost::noncopyable>("rx_streamer", "See: uhd::rx_streamer", bp::no_init)

        // Methods
        .def("recv"             , &wrap_recv, overload_wrap_recv()    )
        .def("get_num_channels" , &uhd::rx_streamer::get_num_channels )
        .def("get_max_num_samps", &uhd::rx_streamer::get_max_num_samps)
        .def("issue_stream_cmd" , &uhd::rx_streamer::issue_stream_cmd )
        ;

    bp::class_<
        tx_streamer,
        boost::shared_ptr<tx_streamer>,
        boost::noncopyable>("tx_streamer", "See: uhd::tx_streamer", bp::no_init)

        // Methods
        .def("send"             , &wrap_send, overload_wrap_send())
        .def("get_num_channels" , &tx_streamer::get_num_channels  )
        .def("get_max_num_samps", &tx_streamer::get_max_num_samps )
        .def("recv_async_msg"   , &wrap_recv_async_msg,
                                  overload_wrap_recv_async_msg()  )
        ;
}

#endif /* INCLUDED_UHD_STREAM_PYTHON_HPP */