#
# Copyright 2021 Ettus Research, a National Instruments Company
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
"""Generate a C++ header with an RPC client interface from a Mako template.

The RPC calls are described by the ``IFACES`` list below; ``COMMON_TMPL`` is
rendered with that list and the result is written to the file named by the
first command-line argument.
"""

import re
import sys
from mako.template import Template

# Shape accepted by fn_from_string(): "<return type> <name>(<arg list>)".
_FN_DECL_RE = re.compile(
    r"^([a-zA-Z:<>,_0-9 ]+)\s+([a-zA-Z0-9_]+)\(([a-zA-Z0-9,_:&<> ]*)\)$")


class Function:
    """Description of a single RPC call as consumed by the template."""

    def __init__(self, return_type, function_name, args):
        """Store the pieces of a function signature.

        Arguments:
        return_type -- C++ return type; "void" means the call returns nothing
        function_name -- name of the generated method and of the RPC call
        args -- iterable of (type, name) string pairs, one per parameter
        """
        self.name = function_name
        self.does_return = return_type != "void"
        self.return_type = return_type
        self.arg_names = [arg[1] for arg in args]
        # C++ expression for the RPC name: a quoted string literal by default.
        self.rpcname = f"\"{function_name}\""
        # "type name" strings, ready to be joined into a parameter list.
        self.args = [" ".join(arg) for arg in args]
        self.has_rpcprefix = False

    def enable_rpcprefix(self):
        """Make the RPC name expression prepend the `_rpc_prefix` member."""
        self.rpcname = f"_rpc_prefix + \"{self.name}\""
        self.has_rpcprefix = True


class Interface:
    """A named group of Function objects rendered as one C++ class pair."""

    def __init__(self, basename, functions, has_rpcprefix=False):
        """Store the interface description.

        Arguments:
        basename -- base name of the generated classes
        functions -- list of Function objects belonging to this interface
        has_rpcprefix -- if True, enable the RPC prefix on every function
        """
        self.basename = basename
        self.functions = functions
        self.has_rpcprefix = has_rpcprefix
        if has_rpcprefix:
            for fn in self.functions:
                fn.enable_rpcprefix()


def fn_from_string(function_string):
    """Build a Function from a declaration such as "void f(size_t idx)".

    The argument list is split on commas, so a parameter whose type itself
    contains a comma cannot be expressed here; construct Function directly
    for such cases.

    Raises ValueError if function_string is not of the supported form.
    """
    m = _FN_DECL_RE.match(function_string)
    if m is None:
        raise ValueError(
            f"Cannot parse function declaration: {function_string!r}")
    # The return-type group is greedy and may capture surrounding blanks;
    # strip them so that e.g. "void  f()" is still recognized as void.
    return_type = m.group(1).strip()
    if not return_type:
        raise ValueError(
            f"Missing return type in function declaration: {function_string!r}")
    function_name = m.group(2)
    args = [arg.strip() for arg in m.group(3).split(",")]
    args = [arg.split(" ") for arg in args if arg]
    # The last word is the parameter name, everything before it is the type.
    args = [(" ".join(arg[:-1]), arg[-1]) for arg in args]
    return Function(return_type, function_name, args)


# NOTE(review): several C++ type strings below look truncated (e.g.
# "std::vector" without an element type, "std::map" without key/value types,
# and "std::vector>" with an unbalanced bracket). Presumably the template
# arguments were lost -- confirm against the intended RPC signatures.
IFACES = [
    Interface("mpmd_rpc", [
        fn_from_string("size_t get_num_timekeepers()"),
        fn_from_string("std::vector get_mb_sensors()"),
        fn_from_string("sensor_value_t::sensor_map_t get_mb_sensor(const std::string& sensor)"),
        fn_from_string("std::vector get_gpio_banks()"),
        fn_from_string("std::vector get_gpio_srcs(const std::string& bank)"),
        fn_from_string("bool supports_feature(const std::string& feature)"),
        fn_from_string("void set_tick_period(size_t tick_index, uint64_t period_ns)"),
        fn_from_string("uint64_t get_timekeeper_time(size_t timekeeper_idx, bool last_pps)"),
        fn_from_string("void set_timekeeper_time(size_t timekeeper_idx, uint64_t ticks, bool last_pps)"),
        fn_from_string("void set_time_source(const std::string& source)"),
        fn_from_string("std::string get_time_source()"),
        fn_from_string("std::vector get_time_sources()"),
        fn_from_string("void set_clock_source(const std::string& source)"),
        fn_from_string("std::string get_clock_source()"),
        fn_from_string("std::vector get_clock_sources()"),
        Function("void", "set_sync_source", [("const std::map&", "source")]),
        fn_from_string("std::map get_sync_source()"),
        fn_from_string("std::vector> get_sync_sources()"),
        fn_from_string("void set_clock_source_out(bool enb)"),
        fn_from_string("void set_trigger_io(const std::string& direction)"),
        fn_from_string("std::map get_mb_eeprom()"),
        fn_from_string("std::vector get_gpio_src(const std::string& bank)"),
        fn_from_string("void set_gpio_src(const std::string& bank, const std::vector& src)"),
    ]),
]

# NOTE(review): the "#include" directives in this template carry no header
# names, which would not compile as C++ -- restore the intended headers.
COMMON_TMPL = """<% import time %>\
/***********************************************************************
 * This file was generated by ${file} on ${time.strftime("%c")}
 **********************************************************************/

//
// Copyright 2021 Ettus Research, a National Instruments Brand
//
// SPDX-License-Identifier: GPL-3.0-or-later
//

#pragma once

#include
#include
#include
#include
#include
#include

namespace uhd { namespace usrp {

%for iface in ifaces:
class ${iface.basename}_iface
{
public:
    using sptr = std::shared_ptr<${iface.basename}_iface>;

    %for function in iface.functions:
    virtual ${function.return_type} ${function.name}(${",".join(function.args)}) = 0;
    %endfor

    // Deprecated
    virtual uhd::rpc_client::sptr get_raw_rpc_client() = 0;
};

class ${iface.basename} : public ${iface.basename}_iface
{
public:
    %if iface.has_rpcprefix:
    ${iface.basename}(uhd::rpc_client::sptr rpc, const std::string& rpc_prefix) : _rpcc(rpc), _rpc_prefix(rpc_prefix) {}
    %else:
    ${iface.basename}(uhd::rpc_client::sptr rpc) : _rpcc(rpc) {}
    %endif

    %for function in iface.functions:
    ${function.return_type} ${function.name}(${",".join(function.args)}) override
    {
        %if function.does_return:
        return _rpcc->request_with_token<${function.return_type}>(${",".join([function.rpcname] + function.arg_names)});
        %else:
        _rpcc->notify_with_token(${",".join([function.rpcname] + function.arg_names)});
        %endif
    }

    %endfor
    // Deprecated
    uhd::rpc_client::sptr get_raw_rpc_client()
    {
        return _rpcc;
    }

private:
    uhd::rpc_client::sptr _rpcc;
    %if iface.has_rpcprefix:
    const std::string _rpc_prefix;
    %endif
};

%endfor
}}
"""


def parse_tmpl(_tmpl_text, **kwargs):
    """Render the Mako template text with kwargs as the template context."""
    return Template(_tmpl_text).render(**kwargs)


if __name__ == '__main__':
    if len(sys.argv) < 2:
        # Fail with a usage hint instead of a bare IndexError.
        sys.stderr.write(f"Usage: {sys.argv[0]} <output file>\n")
        sys.exit(2)
    out_file = sys.argv[1]
    code = parse_tmpl(
        COMMON_TMPL,
        ifaces=IFACES,
        file=__file__,
    )
    with open(out_file, 'w') as f:
        f.write(code)