aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/include/uhdlib/usrp/common/rpc.py
blob: c484991a67e92b13fff0719fdc6288e64e8c10eb (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
#
# Copyright 2021 Ettus Research, a National Instruments Company
#
# SPDX-License-Identifier: GPL-3.0-or-later
#

import re
import sys
from mako.template import Template

class Function:
    """One C++ member function that is forwarded to an RPC call.

    Instances carry the pieces of the signature as strings so that the
    header template can emit both the declaration and the forwarding body.
    """

    def __init__(self, return_type, function_name, args):
        """Record the signature of a single function.

        :param return_type: C++ return type as a string; the exact string
                            "void" marks a function that returns nothing
        :param function_name: name of the C++ method
        :param args: sequence of (type, name) string pairs, one per argument
        """
        self.return_type = return_type
        self.does_return = return_type != "void"
        self.name = function_name
        # By default the RPC name is the function name as a quoted C++
        # string literal; enable_rpcprefix() replaces it.
        self.rpcname = f"\"{function_name}\""
        self.has_rpcprefix = False
        # Element [1] of each pair is the argument name; the joined pair is
        # the full "type name" declaration.
        self.arg_names = [param[1] for param in args]
        self.args = [" ".join(param) for param in args]

    def enable_rpcprefix(self):
        """Switch the RPC name to the C++ expression `_rpc_prefix + "<name>"`."""
        self.has_rpcprefix = True
        self.rpcname = f"_rpc_prefix + \"{self.name}\""

class Interface:
    """A named group of functions that is rendered as one C++ class pair."""

    def __init__(self, basename, functions, has_rpcprefix=False):
        """Store the interface description.

        :param basename: base name used for the generated class names
        :param functions: list of function objects belonging to the interface
        :param has_rpcprefix: when true, enable_rpcprefix() is called on
                              every function in `functions`
        """
        self.has_rpcprefix = has_rpcprefix
        self.functions = functions
        self.basename = basename
        if not has_rpcprefix:
            return
        # Propagate the prefix setting to each member function.
        for function in self.functions:
            function.enable_rpcprefix()

def _split_top_level_args(arg_string):
    """Split an argument list on the commas that separate arguments.

    Commas nested inside template angle brackets (for example in
    ``std::map<std::string, std::string>``) stay part of their argument.
    """
    pieces = []
    current = []
    depth = 0
    for char in arg_string:
        if char == "," and depth == 0:
            pieces.append("".join(current))
            current = []
            continue
        if char == "<":
            depth += 1
        elif char == ">":
            depth = max(depth - 1, 0)
        current.append(char)
    pieces.append("".join(current))
    return pieces


def fn_from_string(function_string):
    """Build a Function from a C++-style prototype string.

    The expected form is ``<return type> <name>(<type> <name>, ...)``, for
    example ``"void set_time_source(const std::string& source)"``. The last
    whitespace-separated token of each argument is taken as its name and
    everything before it as its type.

    :param function_string: the prototype to parse
    :returns: a Function describing the prototype
    :raises ValueError: if the string does not look like a prototype
    """
    m = re.match(
        r"^([a-zA-Z:<>,_0-9 ]+)\s+([a-zA-Z0-9_]+)\(([a-zA-Z0-9,_:&<> ]*)\)$",
        function_string)
    if m is None:
        # Fail with a message naming the offending input instead of an
        # AttributeError on None.
        raise ValueError(
            f"Cannot parse function prototype: {function_string!r}")
    # The return-type group may capture surrounding spaces; strip them so
    # that e.g. "void  f()" is still recognised as returning void.
    return_type = m.group(1).strip()
    function_name = m.group(2)
    args = []
    for arg in _split_top_level_args(m.group(3)):
        tokens = arg.split()
        if not tokens:
            # Empty argument list, or stray whitespace between commas.
            continue
        args.append((" ".join(tokens[:-1]), tokens[-1]))
    return Function(return_type, function_name, args)

# All interfaces to generate. Entries of the inner sequence are either a
# prototype string (parsed with fn_from_string) or a ready-made Function;
# their order is the order of the generated member functions.
IFACES = [
    Interface("mpmd_rpc", [
        proto if isinstance(proto, Function) else fn_from_string(proto)
        for proto in (
            "size_t get_num_timekeepers()",
            "std::vector<std::string> get_mb_sensors()",
            "sensor_value_t::sensor_map_t get_mb_sensor(const std::string& sensor)",
            "std::vector<std::string> get_gpio_banks()",
            "std::vector<std::string> get_gpio_srcs(const std::string& bank)",
            "bool supports_feature(const std::string& feature)",
            "void set_tick_period(size_t tick_index, uint64_t period_ns)",
            "uint64_t get_timekeeper_time(size_t timekeeper_idx, bool last_pps)",
            "void set_timekeeper_time(size_t timekeeper_idx, uint64_t ticks, bool last_pps)",
            "void set_time_source(const std::string& source)",
            "std::string get_time_source()",
            "std::vector<std::string> get_time_sources()",
            "void set_clock_source(const std::string& source)",
            "std::string get_clock_source()",
            "std::vector<std::string> get_clock_sources()",
            # Built directly rather than from a prototype string: the
            # argument type itself contains a comma.
            Function("void", "set_sync_source", [("const std::map<std::string, std::string>&", "source")]),
            "std::map<std::string, std::string> get_sync_source()",
            "std::vector<std::map<std::string, std::string>> get_sync_sources()",
            "void set_clock_source_out(bool enb)",
            "void set_trigger_io(const std::string& direction)",
            "std::map<std::string, std::string> get_mb_eeprom()",
            "std::vector<std::string> get_gpio_src(const std::string& bank)",
            "void set_gpio_src(const std::string& bank, const std::vector<std::string>& src)",
        )
    ]),
]

# Mako template for the generated C++ header. It is rendered with `ifaces`
# (the list of Interface objects) and `file` (the path of this script).
# For each interface it emits an abstract `<basename>_iface` class and a
# `<basename>` implementation that forwards every call to the RPC client.
COMMON_TMPL = """<% import time %>\
/***********************************************************************
 * This file was generated by ${file} on ${time.strftime("%c")}
 **********************************************************************/

//
// Copyright 2021 Ettus Research, a National Instruments Brand
//
// SPDX-License-Identifier: GPL-3.0-or-later
//

#pragma once

#include <uhd/types/sensors.hpp>
#include <uhdlib/utils/rpc.hpp>
#include <stddef.h>
#include <stdint.h>
#include <map>
#include <memory>
#include <string>
#include <vector>

namespace uhd { namespace usrp {

%for iface in ifaces:
    class ${iface.basename}_iface {
    public:
        using sptr = std::shared_ptr<${iface.basename}_iface>;

        virtual ~${iface.basename}_iface() = default;

        %for function in iface.functions:
            virtual ${function.return_type} ${function.name}(${",".join(function.args)}) = 0;
        %endfor

        // Deprecated
        virtual uhd::rpc_client::sptr get_raw_rpc_client() = 0;
    };

    class ${iface.basename} : public ${iface.basename}_iface {
    public:
        %if iface.has_rpcprefix:
            ${iface.basename}(uhd::rpc_client::sptr rpc, const std::string& rpc_prefix) : _rpcc(rpc), _rpc_prefix(rpc_prefix) {}
        %else:
            ${iface.basename}(uhd::rpc_client::sptr rpc) : _rpcc(rpc) {}
        %endif

        %for function in iface.functions:
            ${function.return_type} ${function.name}(${",".join(function.args)}) override
            {
                %if function.does_return:
                return _rpcc->request_with_token<${function.return_type}>(${",".join([function.rpcname] + function.arg_names)});
                %else:
                _rpcc->notify_with_token(${",".join([function.rpcname] + function.arg_names)});
                %endif
            }
        %endfor

        // Deprecated
        uhd::rpc_client::sptr get_raw_rpc_client() override { return _rpcc; }

    private:
        uhd::rpc_client::sptr _rpcc;
        %if iface.has_rpcprefix:
            const std::string _rpc_prefix;
        %endif
    };
%endfor

}}
"""

def parse_tmpl(_tmpl_text, **kwargs):
    """Render Mako template text and return the result.

    :param _tmpl_text: template source as a string
    :param kwargs: values passed through to the template's render call
    :returns: the rendered output
    """
    template = Template(_tmpl_text)
    rendered = template.render(**kwargs)
    return rendered

if __name__ == '__main__':
    # Script entry point: render the header template and write it to the
    # file named by the first command-line argument.
    if len(sys.argv) < 2:
        # A missing argument would otherwise surface as a bare IndexError.
        sys.exit(f"Usage: {sys.argv[0]} <output-file>")
    out_file = sys.argv[1]

    code = parse_tmpl(COMMON_TMPL,
        ifaces=IFACES,
        file=__file__,
    )

    # Explicit encoding so the written bytes do not depend on the locale.
    with open(out_file, 'w', encoding='utf-8') as f:
        f.write(code)