From 267365c9458690842da549a00b7adb7946a1117d Mon Sep 17 00:00:00 2001 From: Samuel O'Brien Date: Wed, 5 Aug 2020 09:29:41 -0500 Subject: sim: Clarify Naming of Streams ChdrSniffer is renamed to ChdrEndpoint to clarify its function as the actual destination of chdr packets, rather than just an observer. TxWorker has been renamed to OutputStream and RxWorker has been renamed to InputStream to avoid ambiguities regarding Tx and Rx terminology. Signed-off-by: Samuel O'Brien --- mpm/python/usrp_mpm/periph_manager/sim.py | 4 +- mpm/python/usrp_mpm/simulator/CMakeLists.txt | 2 +- mpm/python/usrp_mpm/simulator/chdr_endpoint.py | 99 +++++++++++++++++++++++ mpm/python/usrp_mpm/simulator/chdr_sniffer.py | 107 ------------------------- mpm/python/usrp_mpm/simulator/rfnoc_graph.py | 24 ++++-- 5 files changed, 119 insertions(+), 117 deletions(-) create mode 100644 mpm/python/usrp_mpm/simulator/chdr_endpoint.py delete mode 100644 mpm/python/usrp_mpm/simulator/chdr_sniffer.py (limited to 'mpm/python') diff --git a/mpm/python/usrp_mpm/periph_manager/sim.py b/mpm/python/usrp_mpm/periph_manager/sim.py index 7f9a610a7..20d2486c0 100644 --- a/mpm/python/usrp_mpm/periph_manager/sim.py +++ b/mpm/python/usrp_mpm/periph_manager/sim.py @@ -16,7 +16,7 @@ from usrp_mpm.mpmlog import get_logger from usrp_mpm.rpc_server import no_claim from usrp_mpm.periph_manager import PeriphManagerBase from usrp_mpm.simulator.sim_dboard_catalina import SimulatedCatalinaDboard -from usrp_mpm.simulator.chdr_sniffer import ChdrSniffer +from usrp_mpm.simulator.chdr_endpoint import ChdrEndpoint CLOCK_SOURCE_INTERNAL = "internal" @@ -83,7 +83,7 @@ class sim(PeriphManagerBase): super().__init__() self.device_id = 1 - self.chdr_sniffer = ChdrSniffer(self.log, args) + self.chdr_endpoint = ChdrEndpoint(self.log, args) # Unlike the real hardware drivers, if there is an exception here, # we just crash. No use missing an error when testing. diff --git a/mpm/python/usrp_mpm/simulator/CMakeLists.txt b/mpm/python/usrp_mpm/simulator/CMakeLists.txt index 82fcbd801..b1f44ef12 100644 --- a/mpm/python/usrp_mpm/simulator/CMakeLists.txt +++ b/mpm/python/usrp_mpm/simulator/CMakeLists.txt @@ -13,7 +13,7 @@ set(USRP_MPM_SIMULATOR_FILES ${CMAKE_CURRENT_SOURCE_DIR}/__init__.py ${CMAKE_CURRENT_SOURCE_DIR}/sim_dboard.py ${CMAKE_CURRENT_SOURCE_DIR}/sim_dboard_catalina.py - ${CMAKE_CURRENT_SOURCE_DIR}/chdr_sniffer.py + ${CMAKE_CURRENT_SOURCE_DIR}/chdr_endpoint.py ${CMAKE_CURRENT_SOURCE_DIR}/noc_block_regs.py ${CMAKE_CURRENT_SOURCE_DIR}/rfnoc_graph.py ${CMAKE_CURRENT_SOURCE_DIR}/stream_ep_regs.py diff --git a/mpm/python/usrp_mpm/simulator/chdr_endpoint.py b/mpm/python/usrp_mpm/simulator/chdr_endpoint.py new file mode 100644 index 000000000..616cd0ce9 --- /dev/null +++ b/mpm/python/usrp_mpm/simulator/chdr_endpoint.py @@ -0,0 +1,99 @@ +# +# Copyright 2020 Ettus Research, a National Instruments Brand +# +# SPDX-License-Identifier: GPL-3.0-or-later +# +"""This module houses the ChdrEndpoint class, which handles networking, +packet dispatch, and acts as an interface between these and the RFNoC +Graph. +""" + +from threading import Thread +import socket +from uhd.chdr import ChdrPacket, ChdrWidth, PacketType +from .rfnoc_graph import XbarNode, XportNode, StreamEndpointNode, RFNoCGraph, NodeType + +CHDR_W = ChdrWidth.W64 + +class ChdrEndpoint: + """This class is created by the sim periph_manager + It is responsible for opening sockets, dispatching all chdr packet + traffic to the appropriate destination, and responding to said + traffic. + + The extra_args parameter is passed in from the periph_manager, and + coresponds to the --default_args flag of usrp_hwd.py on the + command line + """ + def __init__(self, log, extra_args): + self.log = log.getChild("ChdrEndpoint") + self.thread = Thread(target=self.socket_worker, daemon=True) + self.thread.start() + self.graph = RFNoCGraph(self.get_default_nodes(), self.log, 1, self.begin_tx, + self.end_tx, self.send_strc, self.begin_rx) + self.xport_map = {} + + def set_sample_rate(self, rate): + """Set the sample_rate of the next tx_stream. + + This method is called by the daughterboard. It coresponds to + sim_dboard.py:sim_db#set_catalina_clock_rate() + """ + self.graph.get_stream_spec().sample_rate = rate + + def get_default_nodes(self): + """Get a sensible NoC Core setup. This is the simplest + functional layout. It has one of each required component. + """ + nodes = [ + XportNode(0), + XbarNode(0, [2], [0]), + StreamEndpointNode(0) + ] + return nodes + + def send_strc(self, stream_ep, addr): + pass # TODO: currently not implemented + + def begin_tx(self, src_epid, stream_spec): + pass # TODO: currently not implemented + + def end_tx(self, src_epid): + pass # TODO: currently not implemented + + def begin_rx(self, dst_epid): + pass # TODO: currently not implemented + + def socket_worker(self): + """This is the method that runs in a background thread. It + blocks on the CHDR socket and processes packets as they come + in. + """ + self.log.info("Starting ChdrEndpoint Thread") + main_sock = socket.socket(socket.AF_INET, + socket.SOCK_DGRAM) + main_sock.bind(("0.0.0.0", 49153)) + + while True: + # This allows us to block on multiple sockets at the same time + buffer = bytearray(8000) # Max MTU + # received Data over socket + n_bytes, sender = main_sock.recvfrom_into(buffer) + self.log.trace("received {} bytes of data from {}" + .format(n_bytes, sender)) + try: + packet = ChdrPacket.deserialize(CHDR_W, buffer[:n_bytes]) + self.log.trace("Decoded Packet: {}".format(packet.to_string_with_payload())) + entry_xport = (1, NodeType.XPORT, 0) + pkt_type = packet.get_header().pkt_type + response = self.graph.handle_packet(packet, entry_xport, sender) + + if response is not None: + data = response.serialize() + self.log.trace("Returning Packet: {}" + .format(packet.to_string_with_payload())) + main_sock.sendto(bytes(data), sender) + except BaseException as ex: + self.log.warning("Unable to decode packet: {}" + .format(ex)) + raise ex diff --git a/mpm/python/usrp_mpm/simulator/chdr_sniffer.py b/mpm/python/usrp_mpm/simulator/chdr_sniffer.py deleted file mode 100644 index 8ff467cf2..000000000 --- a/mpm/python/usrp_mpm/simulator/chdr_sniffer.py +++ /dev/null @@ -1,107 +0,0 @@ -# -# Copyright 2020 Ettus Research, a National Instruments Brand -# -# SPDX-License-Identifier: GPL-3.0-or-later -# -"""This module houses the ChdrSniffer class, which handles networking, -packet dispatch, and the nitty gritty of spinning up rx and tx workers. - -Note: Rx and Tx are reversed when compared to their definitions in the -UHD radio_control_impl.cpp file. Rx is when samples are coming to the -simulator. Tx is when samples are being sent by the simulator. - -TODO: This class is run based on threads for development simplicity. If -more throughput is desired, it should be rewritten to use processes -instead. This allows the socket workers to truly work in parallel. -Python threads are limited by holding the GIL while they are executing. -""" - -from threading import Thread -import socket -from uhd.chdr import ChdrPacket, ChdrWidth, PacketType -from .rfnoc_graph import XbarNode, XportNode, StreamEndpointNode, RFNoCGraph, NodeType - -CHDR_W = ChdrWidth.W64 - -class ChdrSniffer: - """This class is created by the sim periph_manager - It is responsible for opening sockets, dispatching all chdr packet - traffic to the appropriate destination, and responding to said - traffic. - - The extra_args parameter is passed in from the periph_manager, and - coresponds to the --default_args flag of usrp_hwd.py on the - command line - """ - def __init__(self, log, extra_args): - self.log = log.getChild("ChdrSniffer") - self.thread = Thread(target=self.socket_worker, daemon=True) - self.thread.start() - self.graph = RFNoCGraph(self.get_default_nodes(), self.log, 1, self.begin_tx, - self.end_tx, self.send_strc, self.begin_rx) - self.xport_map = {} - - def set_sample_rate(self, rate): - """Set the sample_rate of the next tx_stream. - - This method is called by the daughterboard. It coresponds to - sim_dboard.py:sim_db#set_catalina_clock_rate() - """ - self.graph.get_stream_spec().sample_rate = rate - - def get_default_nodes(self): - """Get a sensible NoC Core setup. This is the simplest - functional layout. It has one of each required component. - """ - nodes = [ - XportNode(0), - XbarNode(0, [2], [0]), - StreamEndpointNode(0) - ] - return nodes - - def send_strc(self, stream_ep, addr): - pass # TODO: currently not implemented - - def begin_tx(self, src_epid, stream_spec): - pass # TODO: currently not implemented - - def end_tx(self, src_epid): - pass # TODO: currently not implemented - - def begin_rx(self, dst_epid): - pass # TODO: currently not implemented - - def socket_worker(self): - """This is the method that runs in a background thread. It - blocks on the CHDR socket and processes packets as they come - in. - """ - self.log.info("Starting ChdrSniffer Thread") - main_sock = socket.socket(socket.AF_INET, - socket.SOCK_DGRAM) - main_sock.bind(("0.0.0.0", 49153)) - - while True: - # This allows us to block on multiple sockets at the same time - buffer = bytearray(8192) # Max MTU - # received Data over socket - n_bytes, sender = main_sock.recvfrom_into(buffer) - self.log.trace("received {} bytes of data from {}" - .format(n_bytes, sender)) - try: - packet = ChdrPacket.deserialize(CHDR_W, buffer[:n_bytes]) - self.log.trace("Decoded Packet: {}".format(packet.to_string_with_payload())) - entry_xport = (1, NodeType.XPORT, 0) - pkt_type = packet.get_header().pkt_type - response = self.graph.handle_packet(packet, entry_xport, sender) - - if response is not None: - data = response.serialize() - self.log.trace("Returning Packet: {}" - .format(packet.to_string_with_payload())) - main_sock.sendto(bytes(data), sender) - except BaseException as ex: - self.log.warning("Unable to decode packet: {}" - .format(ex)) - raise ex diff --git a/mpm/python/usrp_mpm/simulator/rfnoc_graph.py b/mpm/python/usrp_mpm/simulator/rfnoc_graph.py index 42210ab6e..c12592603 100644 --- a/mpm/python/usrp_mpm/simulator/rfnoc_graph.py +++ b/mpm/python/usrp_mpm/simulator/rfnoc_graph.py @@ -23,7 +23,7 @@ class StreamSpec: radio registers (noc_block_regs.py) sample_rate comes from an rpc to the daughterboard (through the - set_sample_rate method in chdr_sniffer.py) + set_sample_rate method in chdr_endpoint.py) dst_epid comes from the source stream_ep @@ -146,18 +146,23 @@ class Node: next_node = self._handle_default_packet(packet, **kwargs) return next_node + # pylint: disable=unused-argument,no-self-use def _handle_mgmt_packet(self, packet, **kwargs): return NotImplemented + # pylint: disable=unused-argument,no-self-use def _handle_ctrl_packet(self, packet, **kwargs): return NotImplemented + # pylint: disable=unused-argument,no-self-use def _handle_data_packet(self, packet, **kwargs): return NotImplemented + # pylint: disable=unused-argument,no-self-use def _handle_strc_packet(self, packet, **kwargs): return NotImplemented + # pylint: disable=unused-argument,no-self-use def _handle_strs_packet(self, packet, **kwargs): return NotImplemented @@ -329,7 +334,7 @@ class StreamEndpointNode(Node): REG_OSTRM_CTRL_STATUS register """ if status.cfg_start: - # This only creates a new TxWorker if the cfg_start flag is set + # This only creates a new ChdrOutputStream if the cfg_start flag is set self.log.info("Starting Stream EPID:{} -> EPID:{}".format(self.epid, self.dst_epid)) self.sep_config(self, True) return STRM_STATUS_FC_ENABLED @@ -338,7 +343,7 @@ class StreamEndpointNode(Node): """Called by the ep_regs on a write to the REG_OSTRM_CTRL_STATUS register """ - # This always triggers the graph to create a new RxWorker + # This always triggers the graph to create a new ChdrInputStream self.sep_config(self, False) return STRM_STATUS_FC_ENABLED @@ -351,15 +356,18 @@ class StreamEndpointNode(Node): return NodeType.STRM_EP def get_epid(self): + """ Get the endpoint id of this stream endpoint """ return self.epid def set_epid(self, epid): + """ Set the endpoint id of this stream endpoint """ self.epid = epid def set_dst_epid(self, dst_epid): + """ Set the destination endpoint id of this stream endpoint """ self.dst_epid = dst_epid - def _handle_mgmt_packet(self, packet, regs, **kwargs): + def _handle_mgmt_packet(self, packet, **kwargs): send_upstream = False payload = packet.get_payload_mgmt() our_hop = payload.pop_hop() @@ -413,7 +421,7 @@ class RFNoCGraph: """This class holds all of the nodes of the NoC core and the Noc blocks. - It serves as an interface between the ChdrSniffer and the + It serves as an interface between the ChdrEndpoint and the individual blocks/nodes. """ def __init__(self, graph_list, log, device_id, create_tx_func, @@ -449,7 +457,7 @@ class RFNoCGraph: self.radio_tx_stop) def radio_tx_cmd(self, sep_block_id): - """Triggers the creation of a TxWorker in the ChdrSniffer using + """Triggers the creation of a ChdrOutputStream in the ChdrEndpoint using the current stream_spec. This method transforms the sep_block_id into an epid useable by @@ -470,7 +478,7 @@ class RFNoCGraph: self.stream_spec = StreamSpec() def radio_tx_stop(self, sep_block_id): - """Triggers the destuction of a TxWorker in the ChdrSniffer + """Triggers the destuction of a ChdrOutputStream in the ChdrEndpoint This method transforms the sep_block_id into an epid useable by the transmit code @@ -499,6 +507,7 @@ class RFNoCGraph: self.create_rx(stream_ep.epid) def change_spp(self, spp): + """Change the Stream Samples per Packet""" self.stream_spec.packet_samples = spp def find_ep_by_id(self, epid): @@ -551,4 +560,5 @@ class RFNoCGraph: return response_packet def get_stream_spec(self): + """ Get the current output stream configuration """ return self.stream_spec -- cgit v1.2.3