diff options
Diffstat (limited to 'mpm/python/usrp_mpm/rpc_server.py')
-rw-r--r-- | mpm/python/usrp_mpm/rpc_server.py | 202 |
1 files changed, 161 insertions, 41 deletions
diff --git a/mpm/python/usrp_mpm/rpc_server.py b/mpm/python/usrp_mpm/rpc_server.py index ddb588aa6..f712c5c87 100644 --- a/mpm/python/usrp_mpm/rpc_server.py +++ b/mpm/python/usrp_mpm/rpc_server.py @@ -18,84 +18,204 @@ Implemented RPC Servers """ from __future__ import print_function +from logging import getLogger from gevent.server import StreamServer -from types import graceful_exit, MPM_RPC_PORT +from gevent.pool import Pool +from gevent import signal +from gevent import spawn_later +from gevent import Greenlet +from gevent import monkey +monkey.patch_all() from mprpc import RPCServer +from random import choice from six import iteritems -import time - +from string import ascii_letters, digits +from threading import Timer from multiprocessing import Process + +LOG = getLogger(__name__) + + class MPMServer(RPCServer): - _db_methods = {} - def __init__(self, state, mgr): + """ + Main MPM RPC class which holds the periph_manager object and translates + RPC calls to appropiate calls in the periph_manager and dboard_managers. + + Claiming and unclaiming is implemented in python only + """ + _db_methods = [] + _mb_methods = [] + + def __init__(self, state, mgr, *args, **kwargs): self._state = state - # Instead do self.mboard = periphs.init_periph_manager(args...) + self._timer = Greenlet() self.periph_manager = mgr - for db_slot, db in iteritems(mgr.dboards): - methods = (m for m in dir(db) if not m.startswith('_') and callable(getattr(db, m))) - for method in methods: - command_name = 'db_'+ db_slot + '_' + method - self._add_command(getattr(db,method), command_name) - db_methods = self._db_methods.get(db_slot, []) - db_methods.append(command_name) - self._db_methods.update({db_slot: db_methods}) - - # When we do init we can just add dboard/periph_manager methods with setattr(self, method) - # Maybe using partial - # To remove methods again we also have to remove them from self._methods dict (they're cached) - super(MPMServer, self).__init__() + # add public mboard methods without namespace + self._update_component_commands(mgr, '', '_mb_methods') + # add public dboard methods in `db_<slot>_` namespace + for db_slot, dboard in iteritems(mgr.dboards): + self._update_component_commands(dboard, 'db_' + db_slot + '_', '_db_methods') + super(MPMServer, self).__init__(*args, **kwargs) + + def _update_component_commands(self, component, namespace, storage): + """ + Detect available methods for an object and add them to the RPC server + """ + for method in (m for m in dir(component) + if not m.startswith('_') and callable(getattr(component, m))): + if method.startswith('safe_'): + command_name = namespace + method.lstrip('safe_') + self._add_safe_command(getattr(component, method), command_name) + else: + command_name = namespace + method + self._add_command(getattr(component, method), command_name) + getattr(self, storage).append(command_name) + def _add_command(self, function, command): - setattr(self, command, function) + """ + Adds a method with the name command to the RPC server + This command will require an acquired claim on the device + """ + LOG.debug("adding command %s pointing to %s", command, function) + def new_function(token, *args): + if token[:256] != self._state.claim_token.value: + return False + return function(*args) + new_function.__doc__ = function.__doc__ + setattr(self, command, new_function) + + def _add_safe_command(self, function, command): + """ + Add a safe method which does not require a claim on the + device + """ + LOG.debug("adding safe command %s pointing to %s", command, function) + setattr(self, command, function) def list_methods(self): """ - Returns all public methods of this RPC server + Returns a tuple of public methods and + corresponding docs of this RPC server """ - methods = filter(lambda entry: not entry.startswith('_'), dir(self)) # Return public methods - methods_with_docs = map(lambda m: (m, getattr(self, m).__doc__), methods) - return methods_with_docs + return [(met, getattr(self, met).__doc__) + for met in dir(self) + if not met.startswith('_') and callable(getattr(self, met))] def ping(self, data=None): """ Take in data as argument and send it back + This is a safe method which can be called without a claim on the device """ + LOG.debug("I was pinged from: %s:%s", self.client_host, self.client_port) return data - def claim(self, token): + def claim(self, sender_id): + """ + claim `token` - tries to claim MPM device and provides a human readable sender_id + This is a safe method which can be called without a claim on the device + """ + self._state.lock.acquire() + if self._state.claim_status.value: + return "" + LOG.debug("claiming from: %s", self.client_host) + self.periph_manager.claimed = True + self._state.claim_token.value = ''.join(choice(ascii_letters + digits) for _ in range(256)) + self._state.claim_status.value = True + self._state.lock.release() + self.sender_id = sender_id + self._reset_timer() + LOG.debug("giving token: %s to host: %s", self._state.claim_token.value, self.client_host) + return self._state.claim_token.value + + def reclaim(self, token): """ - claim `token` - claims the MPM device with given token + reclaim a MPM device with a token. This operation will fail + if the device is claimed and the token doesn't match. + Or if the device is not claimed at all. """ + self._state.lock.acquire() if self._state.claim_status.value: - if self._state.claim_token.value == token: + if self._state.claim_token.value == token[:256]: + self._state.lock.release() + LOG.debug("reclaimed from: %s", self.client_host) + self._reset_timer() return True + self._state.lock.release() + LOG.debug("reclaim failed from: %s", self.client_host) return False - self._state.claim_status.value = True - self._state.claim_token.value = token - return True + LOG.debug("trying to reclaim unclaimed device from: %s", self.client_host) + return False + + + + + def _unclaim(self): + """ + unconditional unclaim - for internal use + """ + LOG.debug("releasing claim") + self._state.claim_status.value = False + self._state.claim_token.value = "" + self.sender_id = None + self.periph_manager.claimed = False + self._timer.kill() + + def _reset_timer(self): + """ + reset unclaim timer + """ + self._timer.kill() + self._timer = spawn_later(2.0, self._unclaim) def unclaim(self, token): """ unclaim `token` - unclaims the MPM device if it is claimed with this token """ if self._state.claim_status.value and self._state.claim_token.value == token: - self._state.claim_status.value = False - self._state.claim_token.value = "" + self._unclaim() return True return False + def get_device_info(self): + """ + get device information + This is as safe method which can be called without a claim on the device + """ + info = self.periph_manager._get_device_info() + if self.host in ["127.0.0.1", "::1"]: + info["connection"] = "local" + else: + info["connection"] = "remote" + return info + + def probe_interface(self, token): + """ + Forwards the call to periph_manager._probe_interface with the client ip addresss + as argument. Should be used to probe the data interfaces on the device + """ + if token[:256] != self._state.claim_token.value: + return False + return self.periph_manager._probe_interface(self.host) + + + def _rpc_server_process(shared_state, port, mgr): """ Start the RPC server """ - server = StreamServer(('0.0.0.0', port), handle=MPMServer(shared_state, mgr)) - try: - server.serve_forever() - except: - server.close() + connections = Pool(1000) + server = StreamServer( + ('0.0.0.0', port), + handle=MPMServer(shared_state, mgr), + spawn=connections) + # catch signals and stop the stream server + signal(signal.SIGTERM, lambda *args: server.stop()) + signal(signal.SIGINT, lambda *args: server.stop()) + server.serve_forever() def spawn_rpc_process(state, udp_port, mgr): @@ -103,7 +223,7 @@ def spawn_rpc_process(state, udp_port, mgr): Returns a process that contains the RPC server """ - p_args = [udp_port, state, mgr] - p = Process(target=_rpc_server_process, args=p_args) - p.start() - return p + proc_args = [udp_port, state, mgr] + proc = Process(target=_rpc_server_process, args=proc_args) + proc.start() + return proc |