diff options
author | Martin Braun <martin.braun@ettus.com> | 2017-11-15 17:21:27 -0800 |
---|---|---|
committer | Martin Braun <martin.braun@ettus.com> | 2017-12-22 15:05:06 -0800 |
commit | b12b9465ed13bf2eb7c0fe379c22dcb3b86c4054 (patch) | |
tree | 26301f6017b45756be1c6f6df51be07debef5ce3 /mpm/python/usrp_mpm/periph_manager | |
parent | d327a93530033a3e9f08f797e1b04f4357401e8e (diff) | |
download | uhd-b12b9465ed13bf2eb7c0fe379c22dcb3b86c4054.tar.gz uhd-b12b9465ed13bf2eb7c0fe379c22dcb3b86c4054.tar.bz2 uhd-b12b9465ed13bf2eb7c0fe379c22dcb3b86c4054.zip |
mpm/mpmd: Move to request_xport()/commit_xport() architecture
This commit combines code from various branches to finally enable both
UDP and Liberio transports.
Diffstat (limited to 'mpm/python/usrp_mpm/periph_manager')
-rw-r--r-- | mpm/python/usrp_mpm/periph_manager/base.py | 88 | ||||
-rw-r--r-- | mpm/python/usrp_mpm/periph_manager/n310.py | 393 |
2 files changed, 351 insertions, 130 deletions
diff --git a/mpm/python/usrp_mpm/periph_manager/base.py b/mpm/python/usrp_mpm/periph_manager/base.py index 0a3882102..a619247ad 100644 --- a/mpm/python/usrp_mpm/periph_manager/base.py +++ b/mpm/python/usrp_mpm/periph_manager/base.py @@ -28,7 +28,6 @@ from six import iteritems, itervalues from ..mpmlog import get_logger from .udev import get_eeprom_paths from .udev import get_spidev_nodes -from usrp_mpm import net from usrp_mpm import dtoverlay from usrp_mpm import eeprom from usrp_mpm.rpc_server import no_claim, no_rpc @@ -154,8 +153,6 @@ class PeriphManagerBase(object): self._init_dboards(args.override_db_pids) self._available_endpoints = list(range(256)) self._init_args = {} - self.log.info("Identifying available network interfaces...") - self._chdr_interfaces = self._init_interfaces(self.chdr_interfaces) def _init_mboard_with_eeprom(self): """ @@ -309,23 +306,6 @@ class PeriphManagerBase(object): self.dboards.append(db_class(dboard_idx, **dboard_info)) self.log.info("Found {} daughterboard(s).".format(len(self.dboards))) - def _init_interfaces(self, possible_ifaces): - """ - Initialize the list of network interfaces - """ - self.log.trace("Testing available interfaces out of `{}'".format( - possible_ifaces - )) - valid_ifaces = net.get_valid_interfaces(possible_ifaces) - if len(valid_ifaces): - self.log.debug("Found CHDR interfaces: `{}'".format(valid_ifaces)) - else: - self.log.warning("No CHDR interfaces found!") - return { - x: net.get_iface_info(x) - for x in valid_ifaces - } - def init(self, args): """ Run the mboard initialization. This is typically done at the beginning @@ -626,3 +606,71 @@ class PeriphManagerBase(object): "is not implemented.", dboard_idx) raise NotImplementedError + ####################################################################### + # Transport API + ####################################################################### + def request_xport( + self, + dst_address, + suggested_src_address, + xport_type, + ): + """ + When setting up a CHDR connection, this is the first call to be + made. This function will return a list of dictionaries, each + describing a way to open an CHDR connection. + All transports requested are bidirectional. + + The callee must maintain a lock on the available CHDR xports. After + calling request_xport(), the caller needs to pick one of the + dictionaries, possibly amend data (e.g., if the connection is an + Ethernet connection, then we need to know the source port, but more + details on that in commit_xport()'s documentation). + One way to implement a lock is to simply lock a mutex here and + unlock it in commit_xport(), even though there are probably more + nuanced solutions. + + Arguments: + dst_sid -- The destination part of the connection, i.e., which + RFNoC block are we connecting to. Example: 0x0230 + suggested_src_sid -- The source part of the connection, i.e., + what's the source address of packets going to + the destination at dst_sid. This is a + suggestion, MPM can override this. Example: + 0x0001. + xport_type -- One of the following strings: CTRL, ASYNC_MSG, + TX_DATA, RX_DATA. See also xports_type_t in UHD. + + The return value is a list of dictionaries. Every dictionary has + the following key/value pairs: + - type: Type of transport, e.g., "UDP", "liberio". + - ipv4 (UDP only): IPv4 address to connect to. + - port (UDP only): IP port to connect to. + - rx_mtu: In bytes, the max size RX packets can have (RX means going + from device to UHD) + - tx_mtu: In bytes, the max size TX packets can have (TX means going + from UHD to device) + """ + raise NotImplementedError("request_xport() not implemented.") + + def commit_xport(self, xport_info): + """ + When setting up a CHDR connection, this is the second call to be + made. + + Arguments: + xport_info -- A dictionary (string -> string). The dictionary must + have been originally created by request_xport(), but + additional key/value pairs need to be added. + + All transports need to also provide: + - rx_mtu: In bytes, the max number of bytes going from device to UHD + - tx_mtu: In bytes, the max number of bytes going from UHD to device + + UDP transports need to also provide: + - src_ipv4: IPv4 address the connection is coming from. + - src_port: IP port the connection is coming from. + """ + raise NotImplementedError("commit_xport() not implemented.") + + diff --git a/mpm/python/usrp_mpm/periph_manager/n310.py b/mpm/python/usrp_mpm/periph_manager/n310.py index a011d3559..93772b1ad 100644 --- a/mpm/python/usrp_mpm/periph_manager/n310.py +++ b/mpm/python/usrp_mpm/periph_manager/n310.py @@ -22,7 +22,7 @@ from __future__ import print_function import os import copy import shutil -from six import iteritems +from six import iteritems, itervalues from builtins import object from .base import PeriphManagerBase from ..net import get_iface_addrs @@ -31,6 +31,7 @@ from ..net import get_mac_addr from ..mpmtypes import SID from usrp_mpm.uio import UIO from usrp_mpm.rpc_server import no_claim, no_rpc +from usrp_mpm import net from ..sysfs_gpio import SysFSGPIO from ..ethtable import EthDispatcherTable from ..liberiotable import LiberioDispatcherTable @@ -42,6 +43,9 @@ N3XX_DEFAULT_TIME_SOURCE = 'internal' N3XX_DEFAULT_ENABLE_GPS = True N3XX_DEFAULT_ENABLE_FPGPIO = True +############################################################################### +# Additional peripheral controllers specific to Magnesium +############################################################################### class TCA6424(object): """ Abstraction layer for the port/gpio expander @@ -204,6 +208,224 @@ class FP_GPIO(object): assert index in range(self._gpiosize) return self._gpios.get(self._offset+index) +############################################################################### +# Transport managers +############################################################################### +class XportMgrUDP(object): + """ + Transport manager for UDP connections + """ + # Map Eth devices to UIO labels + eth_tables = {'eth1': 'misc-enet-regs0', 'eth2': 'misc-enet-regs1'} + + def __init__(self, possible_chdr_ifaces, log): + self.log = log + self._possible_chdr_ifaces = possible_chdr_ifaces + self.log.info("Identifying available network interfaces...") + self._chdr_ifaces = \ + self._init_interfaces(self._possible_chdr_ifaces) + self._eth_dispatchers = { + x: EthDispatcherTable(self.eth_tables.get(x)) + for x in list(self._chdr_ifaces.keys()) + } + for ifname, table in iteritems(self._eth_dispatchers): + table.set_ipv4_addr(self._chdr_ifaces[ifname]['ip_addr']) + + def _init_interfaces(self, possible_ifaces): + """ + Initialize the list of network interfaces + """ + self.log.trace("Testing available interfaces out of `{}'".format( + possible_ifaces + )) + valid_ifaces = net.get_valid_interfaces(possible_ifaces) + if len(valid_ifaces): + self.log.debug("Found CHDR interfaces: `{}'".format(valid_ifaces)) + else: + self.log.warning("No CHDR interfaces found!") + return { + x: net.get_iface_info(x) + for x in valid_ifaces + } + + def init(self, args): + """ + Call this when the user calls 'init' on the periph manager + """ + # TODO re-run _init_interfaces, IP addresses could have changed since + # bootup + for _, table in iteritems(self._eth_dispatchers): + if 'forward_eth' in args or 'forward_bcast' in args: + table.set_forward_policy( + args.get('forward_eth', False), + args.get('forward_bcast', False) + ) + if 'preload_ethtables' in args: + self._preload_ethtables( + self._eth_dispatchers, + args['preload_ethtables'] + ) + + def deinit(self): + " Clean up after a session terminates " + pass + + def _preload_ethtables(self, eth_dispatchers, table_file): + """ + Populates the ethernet tables from a JSON file + """ + import json + try: + eth_table_data = json.load(open(table_file)) + except ValueError as ex: + self.log.warning( + "Bad values in preloading table file: %s", + str(ex) + ) + return + self.log.info( + "Preloading Ethernet dispatch tables from JSON file `%s'.", + table_file + ) + for eth_iface, data in iteritems(eth_table_data): + if eth_iface not in eth_dispatchers: + self.log.warning( + "Request to preload eth dispatcher table for " + "iface `{}', but no such interface is " + "registered. Known interfaces: {}".format( + str(eth_iface), + ",".join(eth_dispatchers.keys()) + ) + ) + continue + eth_dispatcher = eth_dispatchers[eth_iface] + self.log.debug("Preloading {} dispatch table".format(eth_iface)) + try: + for dst_ep, udp_data in iteritems(data): + sid = SID() + sid.set_dst_ep(int(dst_ep)) + eth_dispatcher.set_route( + sid, + udp_data['ip_addr'], + udp_data['port'], + udp_data.get('mac_addr', None) + ) + except ValueError as ex: + self.log.warning( + "Bad values in preloading table file: %s", + str(ex) + ) + + def request_xport( + self, + sid, + xport_type, + ): + """ + Return UDP xport info + """ + assert xport_type in ('CTRL', 'ASYNC_MSG', 'TX_DATA', 'RX_DATA') + xport_info = { + 'type': 'UDP', + # TODO what about eth2, huh? + 'ipv4': str(self._chdr_ifaces['eth1']['ip_addr']), + 'port': '49153', # FIXME no hardcoding + 'send_sid': str(sid) + } + return [xport_info] + + def commit_xport(self, sid, xport_info): + """ + fuu + """ + # TODO do error checking on the xport_info + self.log.trace("Committing UDP transport using xport_info `%s'", + str(xport_info)) + sender_addr = xport_info['src_ipv4'] + sender_port = int(xport_info['src_port']) + self.log.trace("Incoming connection is coming from %s:%d", + sender_addr, sender_port) + mac_addr = get_mac_addr(sender_addr) + if mac_addr is None: + raise RuntimeError( + "Could not find MAC address for IP address {}".format( + sender_addr)) + self.log.trace("Incoming connection is coming from %s", + mac_addr) + # Remove hardcodings in the following lines FIXME + my_xbar = lib.xbar.xbar.make("/dev/crossbar0") # TODO + my_xbar.set_route(sid.src_addr, 0) # TODO + eth_dispatcher = self._eth_dispatchers['eth1'] # TODO + eth_dispatcher.set_route(sid.reversed(), sender_addr, sender_port) + self.log.trace("UDP transport successfully committed!") + return True + +class XportMgrLiberio(object): + """ + Transport manager for UDP connections + """ + # udev label for the UIO device that controls the DMA engine + liberio_label = 'liberio' + + def __init__(self, log): + self.log = log + self._dma_dispatcher = LiberioDispatcherTable(self.liberio_label) + self._data_chan_ctr = 0 + self._max_chan = 10 # TODO get this number from somewhere + + def init(self, args): + """ + Call this when the user calls 'init' on the periph manager + """ + pass + + def deinit(self): + " Clean up after a session terminates " + self._data_chan_ctr = 0 + + def request_xport( + self, + sid, + xport_type, + ): + """ + Return liberio xport info + """ + assert xport_type in ('CTRL', 'ASYNC_MSG', 'TX_DATA', 'RX_DATA') + if xport_type == 'CTRL': + chan = 0 + elif xport_type == 'ASYNC_MSG': + chan = 1 + else: + chan = 2 + self._data_chan_ctr + self._data_chan_ctr += 1 + xport_info = { + 'type': 'liberio', + 'send_sid': str(sid), + 'muxed': str(xport_type in ('CTRL', 'ASYNC_MSG')), + 'dma_chan': str(chan), + 'tx_dev': "/dev/tx-dma{}".format(chan), + 'rx_dev': "/dev/rx-dma{}".format(chan), + } + self.log.trace("Liberio: Chan: {} TX Device: {} RX Device: {}".format( + chan, xport_info['tx_dev'], xport_info['rx_dev'])) + self.log.trace("Liberio channel is muxed: %s", + "Yes" if xport_info['muxed'] else "No") + return [xport_info] + + def commit_xport(self, sid, xport_info): + " Commit liberio transport " + chan = int(xport_info['dma_chan']) + my_xbar = lib.xbar.xbar.make("/dev/crossbar0") # TODO + my_xbar.set_route(sid.src_addr, 2) # TODO + self._dma_dispatcher.set_route(sid.reversed(), chan) + self.log.trace("Liberio transport successfully committed!") + return True + + +############################################################################### +# Main Class +############################################################################### class n310(PeriphManagerBase): """ Holds N310 specific attributes and methods @@ -230,7 +452,6 @@ class n310(PeriphManagerBase): dboard_spimaster_addrs = ["e0006000.spi", "e0007000.spi"] chdr_interfaces = ['eth1', 'eth2'] # N310-specific settings - eth_tables = {'eth1': 'misc-enet-regs0', 'eth2': 'misc-enet-regs1'} # Path to N310 FPGA bin file # This file will always contain the current image, regardless of SFP type, # dboard, etc. The host is responsible for providing a compatible image @@ -242,8 +463,6 @@ class n310(PeriphManagerBase): 'callback': "update_fpga", }, } - # udev label for the UIO device that controls the DMA engine - liberio_label = 'liberio' def __init__(self, args): super(n310, self).__init__(args) @@ -273,16 +492,11 @@ class n310(PeriphManagerBase): self._clock_source = None self._time_source = None self._init_ref_clock_and_time(args.default_args) - # Define some attributes so PyLint stays quiet - self._eth_dispatchers = None - self._dma_dispatcher = None - # Init Ethernet - self._eth_dispatchers = { - x: EthDispatcherTable(self.eth_tables.get(x)) - for x in list(self._chdr_interfaces.keys()) + # Init CHDR transports + self._xport_mgrs = { + 'udp': XportMgrUDP(self.chdr_interfaces, self.log), + 'liberio': XportMgrLiberio(self.log), } - for ifname, table in iteritems(self._eth_dispatchers): - table.set_ipv4_addr(self._chdr_interfaces[ifname]['ip_addr']) # Init complete. self.log.info("mboard info: {}".format(self.mboard_info)) @@ -315,112 +529,71 @@ class n310(PeriphManagerBase): dispatchers accordingly. """ result = super(n310, self).init(args) - for _, table in iteritems(self._eth_dispatchers): - if 'forward_eth' in args or 'forward_bcast' in args: - table.set_forward_policy( - args.get('forward_eth', False), - args.get('forward_bcast', False) - ) - if 'preload_ethtables' in args: - self._preload_ethtables( - self._eth_dispatchers, - args['preload_ethtables'] - ) - self._dma_dispatcher = LiberioDispatcherTable(self.liberio_label) + for xport_mgr in itervalues(self._xport_mgrs): + xport_mgr.init(args) return result - def _preload_ethtables(self, eth_dispatchers, table_file): + def deinit(self): """ - Populates the ethernet tables from a JSON file + Clean up after a UHD session terminates. """ - import json - try: - eth_table_data = json.load(open(table_file)) - except ValueError as ex: - self.log.warning( - "Bad values in preloading table file: %s", - str(ex) + super(n310, self).deinit() + for xport_mgr in itervalues(self._xport_mgrs): + xport_mgr.deinit() + + ########################################################################### + # Transport API + ########################################################################### + def request_xport( + self, + dst_address, + suggested_src_address, + xport_type + ): + """ + See PeriphManagerBase.request_xport() for docs. + """ + # For now, we always accept the suggestion if available, or fail + src_address = suggested_src_address + if src_address not in self._available_endpoints: + raise RuntimeError("no more sids yo") + sid = SID(src_address << 16 | dst_address) + self.log.debug( + "request_xport(dst=0x%04X, suggested_src_address=0x%04X, xport_type=%s): " \ + "operating on SID: %s", + dst_address, suggested_src_address, str(xport_type), str(sid)) + # FIXME token! + assert self.mboard_info['rpc_connection'] in ('remote', 'local') + if self.mboard_info['rpc_connection'] == 'remote': + return self._xport_mgrs['udp'].request_xport( + sid, + xport_type, + ) + elif self.mboard_info['rpc_connection'] == 'local': + return self._xport_mgrs['liberio'].request_xport( + sid, + xport_type, ) - return - self.log.info( - "Preloading Ethernet dispatch tables from JSON file `%s'.", - table_file - ) - for eth_iface, data in iteritems(eth_table_data): - if eth_iface not in eth_dispatchers: - self.log.warning( - "Request to preload eth dispatcher table for " - "iface `{}', but no such interface is " - "registered. Known interfaces: {}".format( - str(eth_iface), - ",".join(eth_dispatchers.keys()) - ) - ) - continue - eth_dispatcher = eth_dispatchers[eth_iface] - self.log.debug("Preloading {} dispatch table".format(eth_iface)) - try: - for dst_ep, udp_data in iteritems(data): - sid = SID() - sid.set_dst_ep(int(dst_ep)) - eth_dispatcher.set_route( - sid, - udp_data['ip_addr'], - udp_data['port'], - udp_data.get('mac_addr', None) - ) - except ValueError as ex: - self.log.warning( - "Bad values in preloading table file: %s", - str(ex) - ) - def _allocate_sid(self, sender_addr, port, sid, xbar_src_addr, xbar_src_port, new_ep): # FIXME mtu + def commit_xport(self, xport_info): """ - Get the MAC address of the sender and store it in the FPGA ARP table + See PeriphManagerBase.commit_xport() for docs. + + Reminder: All connections are incoming, i.e. "send" or "TX" means + remote device to local device, and "receive" or "RX" means this local + device to remote device. "Remote device" can be, for example, a UHD + session. """ + ## Go, go, go + assert self.mboard_info['rpc_connection'] in ('remote', 'local') + sid = SID(xport_info['send_sid']) + self._available_endpoints.remove(sid.src_ep) + self.log.debug("Committing transport for SID %s, xport info: %s", + str(sid), str(xport_info)) if self.mboard_info['rpc_connection'] == 'remote': - self.log.debug("Preparing for UDP connection") - mac_addr = get_mac_addr(sender_addr) - if new_ep not in self._available_endpoints: - raise RuntimeError("no more sids yo") - self._available_endpoints.remove(new_ep) - if mac_addr is not None: - if sender_addr not in self.sid_endpoints: - self.sid_endpoints.update({sender_addr: (new_ep,)}) - else: - current_allocation = self.sid_endpoints.get(sender_addr) - new_allocation = current_allocation + (new_ep,) - self.sid_endpoints.update({sender_addr: new_allocation}) - sid = SID(sid) - sid.set_src_addr(xbar_src_addr) - sid.set_src_ep(new_ep) - my_xbar = lib.xbar.xbar.make("/dev/crossbar0") # TODO - my_xbar.set_route(xbar_src_addr, 0) # TODO - eth_dispatcher = self._eth_dispatchers['eth1'] # TODO - eth_dispatcher.set_route(sid.reversed(), sender_addr, port) - return sid.get() - else: - self.log.debug("Preparing for liberio connection") - if new_ep not in self._available_endpoints: - raise RuntimeError("no more sids yo") - self._available_endpoints.remove(new_ep) - if sender_addr not in self.sid_endpoints: - self.sid_endpoints.update({sender_addr: (new_ep,)}) - else: - current_allocation = self.sid_endpoints.get(sender_addr) - new_allocation = current_allocation + (new_ep,) - self.sid_endpoints.update({sender_addr: new_allocation}) - sid = SID(sid) - sid.set_src_addr(xbar_src_addr) - sid.set_src_ep(new_ep) - my_xbar = lib.xbar.xbar.make("/dev/crossbar0") # TODO - my_xbar.set_route(xbar_src_addr, 2) # TODO - mtu = 0 - assert False # This path is not yet done - self._dma_dispatcher.set_route(sid.reversed(), new_ep, mtu) - - return sid.get() + return self._xport_mgrs['udp'].commit_xport(sid, xport_info) + elif self.mboard_info['rpc_connection'] == 'local': + return self._xport_mgrs['liberio'].commit_xport(sid, xport_info) def get_clock_sources(self): " Lists all available clock sources. " |