aboutsummaryrefslogtreecommitdiffstats
path: root/mpm/tools
diff options
context:
space:
mode:
Diffstat (limited to 'mpm/tools')
-rwxr-xr-xmpm/tools/rpc_shell.py96
1 files changed, 96 insertions, 0 deletions
diff --git a/mpm/tools/rpc_shell.py b/mpm/tools/rpc_shell.py
new file mode 100755
index 000000000..ef3c73614
--- /dev/null
+++ b/mpm/tools/rpc_shell.py
@@ -0,0 +1,96 @@
+#!/usr/bin/env python
+#
+# Copyright 2017 Ettus Research (National Instruments)
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+"""
+RPC shell to debug USRP MPM capable devices
+"""
+
+import sys
+import cmd
+import types
+from functools import partial
+from mprpc import RPCClient
+from mprpc.exceptions import RPCError
+from usrp_mpm import types
+
+def rpc_template(obj, command, args):
+ try:
+ if args:
+ response = obj.client.call(command, args)
+ else:
+ response = obj.client.call(command)
+ except RPCError as e:
+ print("RPC Command failed!")
+ print("Error: {}".format(e))
+ return
+ if isinstance(response, bool):
+ if response:
+ print("Commend executed successfully!")
+ return
+ print("Command failed!")
+ return
+ print(response)
+
+class RPCShell(cmd.Cmd):
+ prompt="MPM> "
+ client = None
+ remote_methods = []
+
+ def do_connect(self, host, port=types.MPM_RPC_PORT):
+ try:
+ self.client = RPCClient(host, port)
+ except:
+ print("Connection refused")
+ return
+ methods = self.client.call('list_methods')
+ for method in methods:
+ self.add_command(*method)
+
+ def do_disconnect(self, args):
+ if self.client:
+ try:
+ self.client.close()
+ except RPCError as e:
+ print("Error while closing the connection")
+ print("Error: {}".format(e))
+ for method in self.remote_methods:
+ delattr(self, "do_"+method)
+ self.remote_methods = []
+ self.client = None
+
+
+ def add_command(self, command, docs):
+ new_command = partial(rpc_template, self, str(command))
+ new_command.__doc__ = docs
+ setattr(self, "do_"+command, new_command)
+ self.remote_methods.append(command)
+
+ def run(self):
+ try:
+ self.cmdloop()
+ except KeyboardInterrupt:
+ self.do_disconnect(None)
+ exit(0)
+
+ def get_names(self):
+ return dir(self)
+
+
+if __name__ == "__main__":
+ my_shell = RPCShell()
+ exit(not(my_shell.run()))
+