diff options
author | Martin Braun <martin.braun@ettus.com> | 2018-01-09 17:49:34 -0800 |
---|---|---|
committer | Martin Braun <martin.braun@ettus.com> | 2018-01-10 17:29:49 -0800 |
commit | 0691c5ea752139e44605004a3c2d8a5fe493934d (patch) | |
tree | b801b2684dac58496ef5f39565ee3983a0700b28 | |
parent | c5acc08e671f1797f3130657eec48f48ffa51bf7 (diff) | |
download | uhd-0691c5ea752139e44605004a3c2d8a5fe493934d.tar.gz uhd-0691c5ea752139e44605004a3c2d8a5fe493934d.tar.bz2 uhd-0691c5ea752139e44605004a3c2d8a5fe493934d.zip |
mpm: Spawn periph manager inside the RPC process
Reviewed-by: Brent Stapleton <brent.stapleton@ettus.com>
-rwxr-xr-x | mpm/python/usrp_hwd.py | 87 | ||||
-rw-r--r-- | mpm/python/usrp_mpm/discovery.py | 27 | ||||
-rw-r--r-- | mpm/python/usrp_mpm/mpmtypes.py | 20 | ||||
-rw-r--r-- | mpm/python/usrp_mpm/rpc_server.py | 38 |
4 files changed, 96 insertions, 76 deletions
diff --git a/mpm/python/usrp_hwd.py b/mpm/python/usrp_hwd.py index 82b1ecd93..4036a1084 100755 --- a/mpm/python/usrp_hwd.py +++ b/mpm/python/usrp_hwd.py @@ -15,11 +15,9 @@ from gevent import signal from gevent.hub import BlockingSwitchOutError import usrp_mpm as mpm from usrp_mpm.mpmtypes import SharedState -from usrp_mpm.periph_manager import periph_manager _PROCESSES = [] - def setup_arg_parser(): """ Create an arg parser @@ -107,58 +105,69 @@ def kill_time(sig, frame): log.info("System exiting") sys.exit(0) - -def main(): +def init_only(log, args): """ - Go, go, go! - - Main process loop. + Run the full initialization immediately and return """ - args = parse_args() - log = mpm.get_main_logger( - log_default_delta=args.verbose-args.quiet - ).getChild('main') - shared = SharedState() # Create the periph_manager for this device # This call will be forwarded to the device specific implementation # e.g. in periph_manager/n310.py - # Which implementation is called will be determined during configuration - # with cmake (-DMPM_DEVICE). - # mgr is thus derived from PeriphManagerBase (see periph_manager/base.py) + # Which implementation is called will be determined during + # configuration with cmake (-DMPM_DEVICE). + # mgr is thus derived from PeriphManagerBase + # (see periph_manager/base.py) + from usrp_mpm.periph_manager import periph_manager log.info("Spawning periph manager...") - mgr_generator = lambda: periph_manager(args) - mgr = mgr_generator() - discovery_info = { - "type": mgr.get_device_info().get("type", "n/a"), - "serial": mgr.get_device_info().get("serial", "n/a"), - "product": mgr.get_device_info().get("product", "n/a") - } - if args.init_only: - init_time_start = time.time() - init_result = mgr.init(args.default_args) - init_duration = time.time() - init_time_start - if init_result: - log.info("Initialization successful! Duration: {:.02f} s" - .format(init_duration)) - else: - log.warning("Initialization failed! Duration: {:.02f} s" - .format(init_duration)) - log.info("Terminating on user request before launching RPC server.") - mgr.deinit() - return init_result + ctor_time_start = time.time() + mgr = periph_manager(args) + ctor_duration = time.time() - ctor_time_start + log.info("Ctor Duration: {:.02f} s".format(ctor_duration)) + init_time_start = time.time() + init_result = mgr.init(args.default_args) + init_duration = time.time() - init_time_start + if init_result: + log.info("Initialization successful! Duration: {:.02f} s" + .format(init_duration)) + else: + log.warning("Initialization failed! Duration: {:.02f} s" + .format(init_duration)) + log.info("Terminating on user request before launching RPC server.") + mgr.deinit() + return init_result + +def spawn_processes(log, args): + """ + Launch the subprocesses and hang until completion. + """ + shared = SharedState() + log.info("Spawning RPC process...") + _PROCESSES.append( + mpm.spawn_rpc_process(mpm.mpmtypes.MPM_RPC_PORT, shared, args)) log.info("Spawning discovery process...") _PROCESSES.append( - mpm.spawn_discovery_process(discovery_info, shared, args.discovery_addr) + mpm.spawn_discovery_process(shared, args.discovery_addr) ) - log.info("Spawning RPC process...") - _PROCESSES.append( - mpm.spawn_rpc_process(mpm.mpmtypes.MPM_RPC_PORT, shared, mgr, mgr_generator)) log.info("Processes launched. Registering signal handlers.") signal.signal(signal.SIGTERM, kill_time) signal.signal(signal.SIGINT, kill_time) signal.pause() return True + +def main(): + """ + Go, go, go! + + Main process loop. + """ + args = parse_args() + log = mpm.get_main_logger( + log_default_delta=args.verbose-args.quiet + ).getChild('main') + if args.init_only: + return init_only(log, args) + return spawn_processes(log, args) + if __name__ == '__main__': exit(not main()) diff --git a/mpm/python/usrp_mpm/discovery.py b/mpm/python/usrp_mpm/discovery.py index 7a133af15..8c58fa682 100644 --- a/mpm/python/usrp_mpm/discovery.py +++ b/mpm/python/usrp_mpm/discovery.py @@ -11,13 +11,13 @@ from __future__ import print_function from multiprocessing import Process import socket from builtins import bytes -from six import iteritems from usrp_mpm.mpmtypes import MPM_DISCOVERY_PORT from usrp_mpm.mpmlog import get_main_logger +from usrp_mpm.mpmutils import to_binary_str -RESPONSE_PREAMBLE = "USRP-MPM" -RESPONSE_SEP = ";" -RESPONSE_CLAIMED_KEY = "claimed" +RESPONSE_PREAMBLE = b"USRP-MPM" +RESPONSE_SEP = b";" +RESPONSE_CLAIMED_KEY = b"claimed" # "Max MTU" is not a redundant name. We don't know the total path MTU, but we # can say for sure that it won't exceed a certain value, and that's the max MTU MAX_MTU = 8000 @@ -25,7 +25,7 @@ MAX_MTU = 8000 IP_MTU_DISCOVER = 10 IP_PMTUDISC_DO = 2 -def spawn_discovery_process(device_info, shared_state, discovery_addr): +def spawn_discovery_process(shared_state, discovery_addr): """ Returns a process that contains the device discovery. @@ -38,25 +38,27 @@ def spawn_discovery_process(device_info, shared_state, discovery_addr): """ proc = Process( target=_discovery_process, - args=(device_info, shared_state, discovery_addr) + args=(shared_state, discovery_addr) ) proc.start() return proc -def _discovery_process(device_info, state, discovery_addr): +def _discovery_process(state, discovery_addr): """ The actual process for device discovery. Is spawned by spawn_discovery_process(). """ - def create_response_string(): + log = get_main_logger().getChild('discovery') + def create_response_string(state): " Generate the string that gets sent back to the requester. " return RESPONSE_SEP.join( [RESPONSE_PREAMBLE] + \ - ["{k}={v}".format(k=k, v=v) for k, v in iteritems(device_info)] + \ - ["{k}={v}".format(k=RESPONSE_CLAIMED_KEY, v=state.claim_status.value)] + [b"type="+state.dev_type.value] + \ + [b"product="+state.dev_product.value] + \ + [b"serial="+state.dev_serial.value] + \ + [RESPONSE_CLAIMED_KEY+to_binary_str("={}".format(state.claim_status.value))] ) - log = get_main_logger().getChild('discovery') sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # FIXME really, we should only bind to the subnet but I haven't gotten that @@ -80,7 +82,8 @@ def _discovery_process(device_info, state, discovery_addr): if data.strip(b"\0") == b"MPM-DISC": log.info("Sending discovery response to %s port: %d", sender[0], sender[1]) - send_data = bytes(create_response_string(), 'ascii') + resp_str = create_response_string(state) + send_data = resp_str log.info(send_data) send_sock.sendto(send_data, sender) elif data.strip(b"\0").startswith(b"MPM-ECHO"): diff --git a/mpm/python/usrp_mpm/mpmtypes.py b/mpm/python/usrp_mpm/mpmtypes.py index 878149faf..07e058a62 100644 --- a/mpm/python/usrp_mpm/mpmtypes.py +++ b/mpm/python/usrp_mpm/mpmtypes.py @@ -15,25 +15,20 @@ from builtins import object MPM_RPC_PORT = 49601 MPM_DISCOVERY_PORT = 49600 - MPM_DISCOVERY_MESSAGE = "MPM-DISC" - class SharedState(object): """ - Holds information which should be shared between processes - Usage should be kept to a minimum + Holds information which should be shared between processes. """ - def __init__(self): self.lock = RLock() - self.claim_status = Value( - ctypes.c_bool, - False, lock=self.lock) # lock - self.claim_token = Array( - ctypes.c_char, 256, - lock=self.lock) # String with max length of 256 - + self.claim_status = Value(ctypes.c_bool, False, lock=self.lock) + # String with max length of 256 + self.claim_token = Array(ctypes.c_char, 256, lock=self.lock) + self.dev_type = Array(ctypes.c_char, 16, lock=self.lock) + self.dev_serial = Array(ctypes.c_char, 8, lock=self.lock) + self.dev_product = Array(ctypes.c_char, 16, lock=self.lock) class SID(object): """ @@ -56,7 +51,6 @@ class SID(object): self.dst_addr, self.dst_ep = \ [int(x, 10) for x in dst.split('.', 2)] else: - print(sid) self.src_addr = sid >> 24 self.src_ep = (sid >> 16) & 0xFF self.dst_addr = (sid >> 8) & 0xFF diff --git a/mpm/python/usrp_mpm/rpc_server.py b/mpm/python/usrp_mpm/rpc_server.py index 076103aa9..2310dd814 100644 --- a/mpm/python/usrp_mpm/rpc_server.py +++ b/mpm/python/usrp_mpm/rpc_server.py @@ -49,24 +49,37 @@ class MPMServer(RPCServer): # Compatibility number for MPM MPM_COMPAT_NUM = (1, 1) - def __init__(self, state, mgr, mgr_generator=None, *args, **kwargs): + def __init__(self, state, default_args): self.log = get_main_logger().getChild('RPCServer') self._state = state self._timer = Greenlet() self.session_id = None - self.periph_manager = mgr - self._mgr_generator = mgr_generator + # Create the periph_manager for this device + # This call will be forwarded to the device specific implementation + # e.g. in periph_manager/n310.py + # Which implementation is called will be determined during + # configuration with cmake (-DMPM_DEVICE). + # mgr is thus derived from PeriphManagerBase + # (see periph_manager/base.py) + from usrp_mpm.periph_manager import periph_manager + self._mgr_generator = lambda: periph_manager(default_args) + self.periph_manager = self._mgr_generator() + device_info = self.periph_manager.get_device_info() + self._state.dev_type.value = \ + to_binary_str(device_info.get("type", "n/a")) + self._state.dev_product.value = \ + to_binary_str(device_info.get("product", "n/a")) + self._state.dev_serial.value = \ + to_binary_str(device_info.get("serial", "n/a")) self._db_methods = [] self._mb_methods = [] self.claimed_methods = copy.copy(self.default_claimed_methods) self._last_error = "" - self._init_rpc_calls(mgr) + self._init_rpc_calls(self.periph_manager) # We call the server __init__ function here, and not earlier, because # first the commands need to be registered super(MPMServer, self).__init__( - *args, pack_params={'use_bin_type': True}, - **kwargs ) def _init_rpc_calls(self, mgr): @@ -434,14 +447,14 @@ class MPMServer(RPCServer): ] -def _rpc_server_process(shared_state, port, mgr, mgr_generator): +def _rpc_server_process(shared_state, port, default_args): """ This is the actual process that's running the RPC server. """ connections = Pool(1000) server = StreamServer( ('0.0.0.0', port), - handle=MPMServer(shared_state, mgr, mgr_generator), + handle=MPMServer(shared_state, default_args), spawn=connections) # catch signals and stop the stream server signal(signal.SIGTERM, lambda *args: server.stop()) @@ -449,12 +462,13 @@ def _rpc_server_process(shared_state, port, mgr, mgr_generator): server.serve_forever() -def spawn_rpc_process(state, udp_port, mgr, mgr_generator): +def spawn_rpc_process(state, udp_port, default_args): """ Returns a process that contains the RPC server """ - - proc_args = [udp_port, state, mgr, mgr_generator] - proc = Process(target=_rpc_server_process, args=proc_args) + proc = Process( + target=_rpc_server_process, + args=[udp_port, state, default_args], + ) proc.start() return proc |