aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMartin Braun <martin.braun@ettus.com>2018-01-09 17:49:34 -0800
committerMartin Braun <martin.braun@ettus.com>2018-01-10 17:29:49 -0800
commit0691c5ea752139e44605004a3c2d8a5fe493934d (patch)
treeb801b2684dac58496ef5f39565ee3983a0700b28
parentc5acc08e671f1797f3130657eec48f48ffa51bf7 (diff)
downloaduhd-0691c5ea752139e44605004a3c2d8a5fe493934d.tar.gz
uhd-0691c5ea752139e44605004a3c2d8a5fe493934d.tar.bz2
uhd-0691c5ea752139e44605004a3c2d8a5fe493934d.zip
mpm: Spawn periph manager inside the RPC process
Reviewed-by: Brent Stapleton <brent.stapleton@ettus.com>
-rwxr-xr-xmpm/python/usrp_hwd.py87
-rw-r--r--mpm/python/usrp_mpm/discovery.py27
-rw-r--r--mpm/python/usrp_mpm/mpmtypes.py20
-rw-r--r--mpm/python/usrp_mpm/rpc_server.py38
4 files changed, 96 insertions, 76 deletions
diff --git a/mpm/python/usrp_hwd.py b/mpm/python/usrp_hwd.py
index 82b1ecd93..4036a1084 100755
--- a/mpm/python/usrp_hwd.py
+++ b/mpm/python/usrp_hwd.py
@@ -15,11 +15,9 @@ from gevent import signal
from gevent.hub import BlockingSwitchOutError
import usrp_mpm as mpm
from usrp_mpm.mpmtypes import SharedState
-from usrp_mpm.periph_manager import periph_manager
_PROCESSES = []
-
def setup_arg_parser():
"""
Create an arg parser
@@ -107,58 +105,69 @@ def kill_time(sig, frame):
log.info("System exiting")
sys.exit(0)
-
-def main():
+def init_only(log, args):
"""
- Go, go, go!
-
- Main process loop.
+ Run the full initialization immediately and return
"""
- args = parse_args()
- log = mpm.get_main_logger(
- log_default_delta=args.verbose-args.quiet
- ).getChild('main')
- shared = SharedState()
# Create the periph_manager for this device
# This call will be forwarded to the device specific implementation
# e.g. in periph_manager/n310.py
- # Which implementation is called will be determined during configuration
- # with cmake (-DMPM_DEVICE).
- # mgr is thus derived from PeriphManagerBase (see periph_manager/base.py)
+ # Which implementation is called will be determined during
+ # configuration with cmake (-DMPM_DEVICE).
+ # mgr is thus derived from PeriphManagerBase
+ # (see periph_manager/base.py)
+ from usrp_mpm.periph_manager import periph_manager
log.info("Spawning periph manager...")
- mgr_generator = lambda: periph_manager(args)
- mgr = mgr_generator()
- discovery_info = {
- "type": mgr.get_device_info().get("type", "n/a"),
- "serial": mgr.get_device_info().get("serial", "n/a"),
- "product": mgr.get_device_info().get("product", "n/a")
- }
- if args.init_only:
- init_time_start = time.time()
- init_result = mgr.init(args.default_args)
- init_duration = time.time() - init_time_start
- if init_result:
- log.info("Initialization successful! Duration: {:.02f} s"
- .format(init_duration))
- else:
- log.warning("Initialization failed! Duration: {:.02f} s"
- .format(init_duration))
- log.info("Terminating on user request before launching RPC server.")
- mgr.deinit()
- return init_result
+ ctor_time_start = time.time()
+ mgr = periph_manager(args)
+ ctor_duration = time.time() - ctor_time_start
+ log.info("Ctor Duration: {:.02f} s".format(ctor_duration))
+ init_time_start = time.time()
+ init_result = mgr.init(args.default_args)
+ init_duration = time.time() - init_time_start
+ if init_result:
+ log.info("Initialization successful! Duration: {:.02f} s"
+ .format(init_duration))
+ else:
+ log.warning("Initialization failed! Duration: {:.02f} s"
+ .format(init_duration))
+ log.info("Terminating on user request before launching RPC server.")
+ mgr.deinit()
+ return init_result
+
+def spawn_processes(log, args):
+ """
+ Launch the subprocesses and hang until completion.
+ """
+ shared = SharedState()
+ log.info("Spawning RPC process...")
+ _PROCESSES.append(
+ mpm.spawn_rpc_process(mpm.mpmtypes.MPM_RPC_PORT, shared, args))
log.info("Spawning discovery process...")
_PROCESSES.append(
- mpm.spawn_discovery_process(discovery_info, shared, args.discovery_addr)
+ mpm.spawn_discovery_process(shared, args.discovery_addr)
)
- log.info("Spawning RPC process...")
- _PROCESSES.append(
- mpm.spawn_rpc_process(mpm.mpmtypes.MPM_RPC_PORT, shared, mgr, mgr_generator))
log.info("Processes launched. Registering signal handlers.")
signal.signal(signal.SIGTERM, kill_time)
signal.signal(signal.SIGINT, kill_time)
signal.pause()
return True
+
+def main():
+ """
+ Go, go, go!
+
+ Main process loop.
+ """
+ args = parse_args()
+ log = mpm.get_main_logger(
+ log_default_delta=args.verbose-args.quiet
+ ).getChild('main')
+ if args.init_only:
+ return init_only(log, args)
+ return spawn_processes(log, args)
+
if __name__ == '__main__':
exit(not main())
diff --git a/mpm/python/usrp_mpm/discovery.py b/mpm/python/usrp_mpm/discovery.py
index 7a133af15..8c58fa682 100644
--- a/mpm/python/usrp_mpm/discovery.py
+++ b/mpm/python/usrp_mpm/discovery.py
@@ -11,13 +11,13 @@ from __future__ import print_function
from multiprocessing import Process
import socket
from builtins import bytes
-from six import iteritems
from usrp_mpm.mpmtypes import MPM_DISCOVERY_PORT
from usrp_mpm.mpmlog import get_main_logger
+from usrp_mpm.mpmutils import to_binary_str
-RESPONSE_PREAMBLE = "USRP-MPM"
-RESPONSE_SEP = ";"
-RESPONSE_CLAIMED_KEY = "claimed"
+RESPONSE_PREAMBLE = b"USRP-MPM"
+RESPONSE_SEP = b";"
+RESPONSE_CLAIMED_KEY = b"claimed"
# "Max MTU" is not a redundant name. We don't know the total path MTU, but we
# can say for sure that it won't exceed a certain value, and that's the max MTU
MAX_MTU = 8000
@@ -25,7 +25,7 @@ MAX_MTU = 8000
IP_MTU_DISCOVER = 10
IP_PMTUDISC_DO = 2
-def spawn_discovery_process(device_info, shared_state, discovery_addr):
+def spawn_discovery_process(shared_state, discovery_addr):
"""
Returns a process that contains the device discovery.
@@ -38,25 +38,27 @@ def spawn_discovery_process(device_info, shared_state, discovery_addr):
"""
proc = Process(
target=_discovery_process,
- args=(device_info, shared_state, discovery_addr)
+ args=(shared_state, discovery_addr)
)
proc.start()
return proc
-def _discovery_process(device_info, state, discovery_addr):
+def _discovery_process(state, discovery_addr):
"""
The actual process for device discovery. Is spawned by
spawn_discovery_process().
"""
- def create_response_string():
+ log = get_main_logger().getChild('discovery')
+ def create_response_string(state):
" Generate the string that gets sent back to the requester. "
return RESPONSE_SEP.join(
[RESPONSE_PREAMBLE] + \
- ["{k}={v}".format(k=k, v=v) for k, v in iteritems(device_info)] + \
- ["{k}={v}".format(k=RESPONSE_CLAIMED_KEY, v=state.claim_status.value)]
+ [b"type="+state.dev_type.value] + \
+ [b"product="+state.dev_product.value] + \
+ [b"serial="+state.dev_serial.value] + \
+ [RESPONSE_CLAIMED_KEY+to_binary_str("={}".format(state.claim_status.value))]
)
- log = get_main_logger().getChild('discovery')
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# FIXME really, we should only bind to the subnet but I haven't gotten that
@@ -80,7 +82,8 @@ def _discovery_process(device_info, state, discovery_addr):
if data.strip(b"\0") == b"MPM-DISC":
log.info("Sending discovery response to %s port: %d",
sender[0], sender[1])
- send_data = bytes(create_response_string(), 'ascii')
+ resp_str = create_response_string(state)
+ send_data = resp_str
log.info(send_data)
send_sock.sendto(send_data, sender)
elif data.strip(b"\0").startswith(b"MPM-ECHO"):
diff --git a/mpm/python/usrp_mpm/mpmtypes.py b/mpm/python/usrp_mpm/mpmtypes.py
index 878149faf..07e058a62 100644
--- a/mpm/python/usrp_mpm/mpmtypes.py
+++ b/mpm/python/usrp_mpm/mpmtypes.py
@@ -15,25 +15,20 @@ from builtins import object
MPM_RPC_PORT = 49601
MPM_DISCOVERY_PORT = 49600
-
MPM_DISCOVERY_MESSAGE = "MPM-DISC"
-
class SharedState(object):
"""
- Holds information which should be shared between processes
- Usage should be kept to a minimum
+ Holds information which should be shared between processes.
"""
-
def __init__(self):
self.lock = RLock()
- self.claim_status = Value(
- ctypes.c_bool,
- False, lock=self.lock) # lock
- self.claim_token = Array(
- ctypes.c_char, 256,
- lock=self.lock) # String with max length of 256
-
+ self.claim_status = Value(ctypes.c_bool, False, lock=self.lock)
+ # String with max length of 256
+ self.claim_token = Array(ctypes.c_char, 256, lock=self.lock)
+ self.dev_type = Array(ctypes.c_char, 16, lock=self.lock)
+ self.dev_serial = Array(ctypes.c_char, 8, lock=self.lock)
+ self.dev_product = Array(ctypes.c_char, 16, lock=self.lock)
class SID(object):
"""
@@ -56,7 +51,6 @@ class SID(object):
self.dst_addr, self.dst_ep = \
[int(x, 10) for x in dst.split('.', 2)]
else:
- print(sid)
self.src_addr = sid >> 24
self.src_ep = (sid >> 16) & 0xFF
self.dst_addr = (sid >> 8) & 0xFF
diff --git a/mpm/python/usrp_mpm/rpc_server.py b/mpm/python/usrp_mpm/rpc_server.py
index 076103aa9..2310dd814 100644
--- a/mpm/python/usrp_mpm/rpc_server.py
+++ b/mpm/python/usrp_mpm/rpc_server.py
@@ -49,24 +49,37 @@ class MPMServer(RPCServer):
# Compatibility number for MPM
MPM_COMPAT_NUM = (1, 1)
- def __init__(self, state, mgr, mgr_generator=None, *args, **kwargs):
+ def __init__(self, state, default_args):
self.log = get_main_logger().getChild('RPCServer')
self._state = state
self._timer = Greenlet()
self.session_id = None
- self.periph_manager = mgr
- self._mgr_generator = mgr_generator
+ # Create the periph_manager for this device
+ # This call will be forwarded to the device specific implementation
+ # e.g. in periph_manager/n310.py
+ # Which implementation is called will be determined during
+ # configuration with cmake (-DMPM_DEVICE).
+ # mgr is thus derived from PeriphManagerBase
+ # (see periph_manager/base.py)
+ from usrp_mpm.periph_manager import periph_manager
+ self._mgr_generator = lambda: periph_manager(default_args)
+ self.periph_manager = self._mgr_generator()
+ device_info = self.periph_manager.get_device_info()
+ self._state.dev_type.value = \
+ to_binary_str(device_info.get("type", "n/a"))
+ self._state.dev_product.value = \
+ to_binary_str(device_info.get("product", "n/a"))
+ self._state.dev_serial.value = \
+ to_binary_str(device_info.get("serial", "n/a"))
self._db_methods = []
self._mb_methods = []
self.claimed_methods = copy.copy(self.default_claimed_methods)
self._last_error = ""
- self._init_rpc_calls(mgr)
+ self._init_rpc_calls(self.periph_manager)
# We call the server __init__ function here, and not earlier, because
# first the commands need to be registered
super(MPMServer, self).__init__(
- *args,
pack_params={'use_bin_type': True},
- **kwargs
)
def _init_rpc_calls(self, mgr):
@@ -434,14 +447,14 @@ class MPMServer(RPCServer):
]
-def _rpc_server_process(shared_state, port, mgr, mgr_generator):
+def _rpc_server_process(shared_state, port, default_args):
"""
This is the actual process that's running the RPC server.
"""
connections = Pool(1000)
server = StreamServer(
('0.0.0.0', port),
- handle=MPMServer(shared_state, mgr, mgr_generator),
+ handle=MPMServer(shared_state, default_args),
spawn=connections)
# catch signals and stop the stream server
signal(signal.SIGTERM, lambda *args: server.stop())
@@ -449,12 +462,13 @@ def _rpc_server_process(shared_state, port, mgr, mgr_generator):
server.serve_forever()
-def spawn_rpc_process(state, udp_port, mgr, mgr_generator):
+def spawn_rpc_process(state, udp_port, default_args):
"""
Returns a process that contains the RPC server
"""
-
- proc_args = [udp_port, state, mgr, mgr_generator]
- proc = Process(target=_rpc_server_process, args=proc_args)
+ proc = Process(
+ target=_rpc_server_process,
+ args=[udp_port, state, default_args],
+ )
proc.start()
return proc