aboutsummaryrefslogtreecommitdiffstats
path: root/mpm/python/usrp_mpm/rpc_server.py
diff options
context:
space:
mode:
Diffstat (limited to 'mpm/python/usrp_mpm/rpc_server.py')
-rw-r--r--mpm/python/usrp_mpm/rpc_server.py202
1 files changed, 161 insertions, 41 deletions
diff --git a/mpm/python/usrp_mpm/rpc_server.py b/mpm/python/usrp_mpm/rpc_server.py
index ddb588aa6..f712c5c87 100644
--- a/mpm/python/usrp_mpm/rpc_server.py
+++ b/mpm/python/usrp_mpm/rpc_server.py
@@ -18,84 +18,204 @@
Implemented RPC Servers
"""
from __future__ import print_function
+from logging import getLogger
from gevent.server import StreamServer
-from types import graceful_exit, MPM_RPC_PORT
+from gevent.pool import Pool
+from gevent import signal
+from gevent import spawn_later
+from gevent import Greenlet
+from gevent import monkey
+monkey.patch_all()
from mprpc import RPCServer
+from random import choice
from six import iteritems
-import time
-
+from string import ascii_letters, digits
+from threading import Timer
from multiprocessing import Process
+
+LOG = getLogger(__name__)
+
+
class MPMServer(RPCServer):
- _db_methods = {}
- def __init__(self, state, mgr):
+ """
+ Main MPM RPC class which holds the periph_manager object and translates
+ RPC calls to appropiate calls in the periph_manager and dboard_managers.
+
+ Claiming and unclaiming is implemented in python only
+ """
+ _db_methods = []
+ _mb_methods = []
+
+ def __init__(self, state, mgr, *args, **kwargs):
self._state = state
- # Instead do self.mboard = periphs.init_periph_manager(args...)
+ self._timer = Greenlet()
self.periph_manager = mgr
- for db_slot, db in iteritems(mgr.dboards):
- methods = (m for m in dir(db) if not m.startswith('_') and callable(getattr(db, m)))
- for method in methods:
- command_name = 'db_'+ db_slot + '_' + method
- self._add_command(getattr(db,method), command_name)
- db_methods = self._db_methods.get(db_slot, [])
- db_methods.append(command_name)
- self._db_methods.update({db_slot: db_methods})
-
- # When we do init we can just add dboard/periph_manager methods with setattr(self, method)
- # Maybe using partial
- # To remove methods again we also have to remove them from self._methods dict (they're cached)
- super(MPMServer, self).__init__()
+ # add public mboard methods without namespace
+ self._update_component_commands(mgr, '', '_mb_methods')
+ # add public dboard methods in `db_<slot>_` namespace
+ for db_slot, dboard in iteritems(mgr.dboards):
+ self._update_component_commands(dboard, 'db_' + db_slot + '_', '_db_methods')
+ super(MPMServer, self).__init__(*args, **kwargs)
+
+ def _update_component_commands(self, component, namespace, storage):
+ """
+ Detect available methods for an object and add them to the RPC server
+ """
+ for method in (m for m in dir(component)
+ if not m.startswith('_') and callable(getattr(component, m))):
+ if method.startswith('safe_'):
+ command_name = namespace + method.lstrip('safe_')
+ self._add_safe_command(getattr(component, method), command_name)
+ else:
+ command_name = namespace + method
+ self._add_command(getattr(component, method), command_name)
+ getattr(self, storage).append(command_name)
+
def _add_command(self, function, command):
- setattr(self, command, function)
+ """
+ Adds a method with the name command to the RPC server
+ This command will require an acquired claim on the device
+ """
+ LOG.debug("adding command %s pointing to %s", command, function)
+ def new_function(token, *args):
+ if token[:256] != self._state.claim_token.value:
+ return False
+ return function(*args)
+ new_function.__doc__ = function.__doc__
+ setattr(self, command, new_function)
+
+ def _add_safe_command(self, function, command):
+ """
+ Add a safe method which does not require a claim on the
+ device
+ """
+ LOG.debug("adding safe command %s pointing to %s", command, function)
+ setattr(self, command, function)
def list_methods(self):
"""
- Returns all public methods of this RPC server
+ Returns a tuple of public methods and
+ corresponding docs of this RPC server
"""
- methods = filter(lambda entry: not entry.startswith('_'), dir(self)) # Return public methods
- methods_with_docs = map(lambda m: (m, getattr(self, m).__doc__), methods)
- return methods_with_docs
+ return [(met, getattr(self, met).__doc__)
+ for met in dir(self)
+ if not met.startswith('_') and callable(getattr(self, met))]
def ping(self, data=None):
"""
Take in data as argument and send it back
+ This is a safe method which can be called without a claim on the device
"""
+ LOG.debug("I was pinged from: %s:%s", self.client_host, self.client_port)
return data
- def claim(self, token):
+ def claim(self, sender_id):
+ """
+ claim `token` - tries to claim MPM device and provides a human readable sender_id
+ This is a safe method which can be called without a claim on the device
+ """
+ self._state.lock.acquire()
+ if self._state.claim_status.value:
+ return ""
+ LOG.debug("claiming from: %s", self.client_host)
+ self.periph_manager.claimed = True
+ self._state.claim_token.value = ''.join(choice(ascii_letters + digits) for _ in range(256))
+ self._state.claim_status.value = True
+ self._state.lock.release()
+ self.sender_id = sender_id
+ self._reset_timer()
+ LOG.debug("giving token: %s to host: %s", self._state.claim_token.value, self.client_host)
+ return self._state.claim_token.value
+
+ def reclaim(self, token):
"""
- claim `token` - claims the MPM device with given token
+ reclaim a MPM device with a token. This operation will fail
+ if the device is claimed and the token doesn't match.
+ Or if the device is not claimed at all.
"""
+ self._state.lock.acquire()
if self._state.claim_status.value:
- if self._state.claim_token.value == token:
+ if self._state.claim_token.value == token[:256]:
+ self._state.lock.release()
+ LOG.debug("reclaimed from: %s", self.client_host)
+ self._reset_timer()
return True
+ self._state.lock.release()
+ LOG.debug("reclaim failed from: %s", self.client_host)
return False
- self._state.claim_status.value = True
- self._state.claim_token.value = token
- return True
+ LOG.debug("trying to reclaim unclaimed device from: %s", self.client_host)
+ return False
+
+
+
+
+ def _unclaim(self):
+ """
+ unconditional unclaim - for internal use
+ """
+ LOG.debug("releasing claim")
+ self._state.claim_status.value = False
+ self._state.claim_token.value = ""
+ self.sender_id = None
+ self.periph_manager.claimed = False
+ self._timer.kill()
+
+ def _reset_timer(self):
+ """
+ reset unclaim timer
+ """
+ self._timer.kill()
+ self._timer = spawn_later(2.0, self._unclaim)
def unclaim(self, token):
"""
unclaim `token` - unclaims the MPM device if it is claimed with this token
"""
if self._state.claim_status.value and self._state.claim_token.value == token:
- self._state.claim_status.value = False
- self._state.claim_token.value = ""
+ self._unclaim()
return True
return False
+ def get_device_info(self):
+ """
+ get device information
+ This is as safe method which can be called without a claim on the device
+ """
+ info = self.periph_manager._get_device_info()
+ if self.host in ["127.0.0.1", "::1"]:
+ info["connection"] = "local"
+ else:
+ info["connection"] = "remote"
+ return info
+
+ def probe_interface(self, token):
+ """
+ Forwards the call to periph_manager._probe_interface with the client ip addresss
+ as argument. Should be used to probe the data interfaces on the device
+ """
+ if token[:256] != self._state.claim_token.value:
+ return False
+ return self.periph_manager._probe_interface(self.host)
+
+
+
def _rpc_server_process(shared_state, port, mgr):
"""
Start the RPC server
"""
- server = StreamServer(('0.0.0.0', port), handle=MPMServer(shared_state, mgr))
- try:
- server.serve_forever()
- except:
- server.close()
+ connections = Pool(1000)
+ server = StreamServer(
+ ('0.0.0.0', port),
+ handle=MPMServer(shared_state, mgr),
+ spawn=connections)
+ # catch signals and stop the stream server
+ signal(signal.SIGTERM, lambda *args: server.stop())
+ signal(signal.SIGINT, lambda *args: server.stop())
+ server.serve_forever()
def spawn_rpc_process(state, udp_port, mgr):
@@ -103,7 +223,7 @@ def spawn_rpc_process(state, udp_port, mgr):
Returns a process that contains the RPC server
"""
- p_args = [udp_port, state, mgr]
- p = Process(target=_rpc_server_process, args=p_args)
- p.start()
- return p
+ proc_args = [udp_port, state, mgr]
+ proc = Process(target=_rpc_server_process, args=proc_args)
+ proc.start()
+ return proc