diff options
authorSamuel O'Brien <sam.obrien@ni.com>2020-08-05 10:00:20 -0500
committerAaron Rossetto <aaron.rossetto@ni.com>2020-10-28 15:25:48 -0500
commitd42ddc804118b2e9120c84efd477f9f4b3f8472e (patch)
parent57ca4235b1b634d8c487fe2f0928ecc79f1bdbe7 (diff)
sim: Support Streaming
This commit add support for both Tx and Rx streams to the simulator. Signed-off-by: Samuel O'Brien <sam.obrien@ni.com>
8 files changed, 860 insertions, 348 deletions
diff --git a/mpm/python/usrp_mpm/periph_manager/sim.py b/mpm/python/usrp_mpm/periph_manager/sim.py
index 20d2486c0..b5cc45807 100644
--- a/mpm/python/usrp_mpm/periph_manager/sim.py
+++ b/mpm/python/usrp_mpm/periph_manager/sim.py
@@ -143,8 +143,6 @@ class sim(PeriphManagerBase):
self._xport_mgrs = {
'udp': SimXportMgrUDP(self.log, args, SimEthDispatcher)
- #TODO: Actually create transports here when RFNoC is integrated
- self.log.trace("CHDR transport creation was skipped")
# Init complete.
self.log.debug("Device info: {}".format(self.device_info))
@@ -174,6 +172,7 @@ class sim(PeriphManagerBase):
this motherboard.
self.device_id = device_id
+ self.chdr_endpoint.set_device_id(device_id)
def get_device_id(self):
diff --git a/mpm/python/usrp_mpm/simulator/CMakeLists.txt b/mpm/python/usrp_mpm/simulator/CMakeLists.txt
index b1f44ef12..e95709249 100644
--- a/mpm/python/usrp_mpm/simulator/CMakeLists.txt
+++ b/mpm/python/usrp_mpm/simulator/CMakeLists.txt
@@ -17,6 +17,10 @@ set(USRP_MPM_SIMULATOR_FILES
+ ${CMAKE_CURRENT_SOURCE_DIR}/sample_source.py
+ ${CMAKE_CURRENT_SOURCE_DIR}/chdr_stream.py
+ ${CMAKE_CURRENT_SOURCE_DIR}/rfnoc_common.py
+ ${CMAKE_CURRENT_SOURCE_DIR}/stream_endpoint_node.py
diff --git a/mpm/python/usrp_mpm/simulator/chdr_endpoint.py b/mpm/python/usrp_mpm/simulator/chdr_endpoint.py
index 616cd0ce9..303f3268d 100644
--- a/mpm/python/usrp_mpm/simulator/chdr_endpoint.py
+++ b/mpm/python/usrp_mpm/simulator/chdr_endpoint.py
@@ -10,8 +10,12 @@ Graph.
from threading import Thread
import socket
-from uhd.chdr import ChdrPacket, ChdrWidth, PacketType
+import queue
+import select
+from uhd.chdr import ChdrPacket, ChdrWidth
from .rfnoc_graph import XbarNode, XportNode, StreamEndpointNode, RFNoCGraph, NodeType
+from .chdr_stream import SendWrapper, ChdrOutputStream, ChdrInputStream, SelectableQueue
+from .sample_source import NullSamples
CHDR_W = ChdrWidth.W64
@@ -27,11 +31,20 @@ class ChdrEndpoint:
def __init__(self, log, extra_args):
self.log = log.getChild("ChdrEndpoint")
+ self.source_gen = NullSamples
+ self.sink_gen = NullSamples
+ self.xport_map = {}
+ self.send_queue = SelectableQueue()
+ self.send_wrapper = SendWrapper(self.send_queue)
+ self.graph = RFNoCGraph(self.get_default_nodes(), self.log, 1, self.send_wrapper, CHDR_W)
self.thread = Thread(target=self.socket_worker, daemon=True)
- self.graph = RFNoCGraph(self.get_default_nodes(), self.log, 1, self.begin_tx,
- self.end_tx, self.send_strc, self.begin_rx)
- self.xport_map = {}
+ def set_device_id(self, device_id):
+ """Set the device_id for this endpoint"""
+ self.graph.set_device_id(device_id)
def set_sample_rate(self, rate):
"""Set the sample_rate of the next tx_stream.
@@ -48,7 +61,7 @@ class ChdrEndpoint:
nodes = [
XbarNode(0, [2], [0]),
- StreamEndpointNode(0)
+ StreamEndpointNode(0, self.source_gen, self.sink_gen)
return nodes
@@ -76,24 +89,32 @@ class ChdrEndpoint:
while True:
# This allows us to block on multiple sockets at the same time
+ ready_list, _, _ = select.select([main_sock, self.send_queue], [], [])
buffer = bytearray(8000) # Max MTU
- # received Data over socket
- n_bytes, sender = main_sock.recvfrom_into(buffer)
- self.log.trace("received {} bytes of data from {}"
- .format(n_bytes, sender))
- try:
- packet = ChdrPacket.deserialize(CHDR_W, buffer[:n_bytes])
- self.log.trace("Decoded Packet: {}".format(packet.to_string_with_payload()))
- entry_xport = (1, NodeType.XPORT, 0)
- pkt_type = packet.get_header().pkt_type
- response = self.graph.handle_packet(packet, entry_xport, sender)
- if response is not None:
- data = response.serialize()
- self.log.trace("Returning Packet: {}"
- .format(packet.to_string_with_payload()))
- main_sock.sendto(bytes(data), sender)
- except BaseException as ex:
- self.log.warning("Unable to decode packet: {}"
- .format(ex))
- raise ex
+ for sock in ready_list:
+ if sock is main_sock:
+ # Received Data over socket
+ n_bytes, sender = main_sock.recvfrom_into(buffer)
+ self.log.trace("Received {} bytes of data from {}"
+ .format(n_bytes, sender))
+ try:
+ packet = ChdrPacket.deserialize(CHDR_W, buffer[:n_bytes])
+ self.log.trace("Decoded Packet: {}"
+ .format(packet.to_string_with_payload()))
+ entry_xport = (NodeType.XPORT, 0)
+ response = self.graph.handle_packet(packet, entry_xport, sender,
+ sender, n_bytes)
+ if response is not None:
+ data = response.serialize()
+ self.log.trace("Returning Packet: {}"
+ .format(packet.to_string_with_payload()))
+ main_sock.sendto(bytes(data), sender)
+ except BaseException as ex:
+ self.log.warning("Unable to decode packet: {}"
+ .format(ex))
+ raise ex
+ else:
+ data, addr = self.send_queue.get()
+ sent_len = main_sock.sendto(data, addr)
+ assert len(data) == sent_len, "Didn't send whole packet."
diff --git a/mpm/python/usrp_mpm/simulator/chdr_stream.py b/mpm/python/usrp_mpm/simulator/chdr_stream.py
new file mode 100644
index 000000000..f63fba050
--- /dev/null
+++ b/mpm/python/usrp_mpm/simulator/chdr_stream.py
@@ -0,0 +1,224 @@
+# Copyright 2020 Ettus Research, a National Instruments Brand
+# SPDX-License-Identifier: GPL-3.0-or-later
+This module contains the streaming backend for the simulator. It
+handles managing threads, as well as interfacing with sample sources
+and sinks.
+import time
+from threading import Thread
+import queue
+import socket
+from uhd.chdr import PacketType, StrcOpCode, StrsPayload, StrsStatus, ChdrHeader, ChdrPacket
+class XferCount:
+ """This class keeps track of flow control transfer status which are
+ used to populate Strc and Strs packets
+ """
+ def __init__(self):
+ self.num_bytes = 0
+ self.num_packets = 0
+ def count_packet(self, length):
+ """Accounts for a packet of len bytes in the xfer count"""
+ self.num_bytes += length
+ self.num_packets += 1
+ def clear(self):
+ """Reset the xfer counts to 0"""
+ self.num_bytes = 0
+ self.num_packets = 0
+ def __str__(self):
+ return "XferCount{{num_bytes:{}, num_packets:{}}}".format(self.num_bytes, self.num_packets)
+class SelectableQueue:
+ """ A simple python Queue implementation which can be selected.
+ This allows waiting on a queue and a socket simultaneously.
+ """
+ def __init__(self, max_size=0):
+ self._queue = queue.Queue(max_size)
+ self._send_signal_rx, self._send_signal_tx = socket.socketpair()
+ def put(self, item, block=True, timeout=None):
+ """ Put an element into the queue, optionally blocking """
+ self._queue.put(item, block, timeout)
+ self._send_signal_tx.send(b"\x00")
+ def fileno(self):
+ """ A fileno compatible with select.select """
+ return self._send_signal_rx.fileno()
+ def get(self):
+ """ Return the first element in the queue, blocking if none
+ are available.
+ """
+ self._send_signal_rx.recv(1)
+ return self._queue.get_nowait()
+class SendWrapper:
+ """This class is used as an abstraction over queueing packets to be
+ sent by the socket thread.
+ """
+ def __init__(self, queue):
+ self.queue = queue
+ def send_packet(self, packet, addr):
+ """Serialize packet and then queue the data to be sent to addr
+ returns the length of the serialized packet
+ """
+ data = packet.serialize()
+ self.send_data(bytes(data), addr)
+ return len(data)
+ def send_data(self, data, addr):
+ """Queue data to be sent to addr"""
+ self.queue.put((data, addr))
+class ChdrInputStream:
+ """This class encapsulates an Rx Thread. This thread blocks on a
+ queue which receives STRC and DATA ChdrPackets. It places the data
+ packets into the sample_sink and responds to the STRC packets using
+ the send_wrapper
+ """
+ CAPACITY_BYTES = int(5e6) # 5 MB
+ def __init__(self, log, chdr_w, sample_sink, send_wrapper, our_epid):
+ self.log = log
+ self.chdr_w = chdr_w
+ self.sample_sink = sample_sink
+ self.send_wrapper = send_wrapper
+ self.xfer = XferCount()
+ self.accum = XferCount()
+ self.fc_freq = None
+ self.command_target = None
+ self.command_addr = None
+ self.command_epid = None
+ self.our_epid = our_epid
+ self.rx_queue = queue.Queue(ChdrInputStream.QUEUE_CAP)
+ self.stop = False
+ self.thread = Thread(target=self._rx_worker, daemon=True)
+ self.thread.start()
+ def _rx_worker(self):
+ self.log.info("Stream RX Worker Starting")
+ while True:
+ packet, recv_len, addr = self.rx_queue.get()
+ # This break is here because when ChdrInputStream.stop() is called,
+ # a tuple of 3 None values is pushed into the queue to unblock the worker.
+ if self.stop:
+ break
+ header = packet.get_header()
+ pkt_type = header.pkt_type
+ if pkt_type in (PacketType.DATA_WITH_TS, PacketType.DATA_NO_TS):
+ self.xfer.count_packet(recv_len)
+ self.sample_sink.accept_packet(packet)
+ elif pkt_type == PacketType.STRC:
+ req_payload = packet.get_payload_strc()
+ resp_header = ChdrHeader()
+ resp_header.dst_epid = req_payload.src_epid
+ if req_payload.op_code == StrcOpCode.INIT:
+ self.xfer.clear()
+ elif req_payload.op_code == StrcOpCode.RESYNC:
+ self.xfer.num_bytes = req_payload.xfer_count_bytes
+ self.xfer.num_packets = req_payload.xfer_count_pkts
+ resp_payload = self._generate_strs_payload(header.dst_epid)
+ resp_packet = ChdrPacket(self.chdr_w, resp_header, resp_payload)
+ self.send_wrapper.send_packet(resp_packet, addr)
+ else:
+ raise RuntimeError("RX Worker received unsupported packet: {}".format(pkt_type))
+ self.sample_sink.close()
+ self.log.info("Stream RX Worker Done")
+ def finish(self):
+ """Unblocks the worker and stops the thread.
+ The worker will close its sample_sink
+ """
+ self.stop = True
+ self.rx_queue.put((None, None, None))
+ def _generate_strs_payload(self, src_epid):
+ """Create an strs payload from the information in self.xfer"""
+ resp_payload = StrsPayload()
+ resp_payload.src_epid = src_epid
+ resp_payload.status = StrsStatus.OKAY
+ resp_payload.capacity_bytes = ChdrInputStream.CAPACITY_BYTES
+ resp_payload.capacity_pkts = 0xFFFFFF
+ resp_payload.xfer_count_bytes = self.xfer.num_bytes
+ resp_payload.xfer_count_pkts = self.xfer.num_packets
+ return resp_payload
+ def queue_packet(self, packet, recv_len, addr):
+ """Queue a packet to be processed by the ChdrInputStream"""
+ self.rx_queue.put((packet, recv_len, addr))
+class ChdrOutputStream:
+ """This class encapsulates a Tx Thread. It takes data from its
+ sample_source and then sends it in a data packet using its
+ send_wrapper.
+ The tx stream is configured using the stream_spec object, which
+ sets parameters such as sample rate and destination
+ """
+ def __init__(self, log, chdr_w, sample_source, stream_spec, send_wrapper):
+ self.log = log
+ self.chdr_w = chdr_w
+ self.sample_source = sample_source
+ self.stream_spec = stream_spec
+ self.send_wrapper = send_wrapper
+ self.xfer = XferCount()
+ self.stop = False
+ self.thread = Thread(target=self._tx_worker, daemon=True)
+ self.thread.start()
+ def _tx_worker(self):
+ self.log.info("Stream TX Worker Starting with {} packets/sec"
+ .format(1/self.stream_spec.seconds_per_packet()))
+ header = ChdrHeader()
+ start_time = time.time()
+ next_send = start_time
+ header.dst_epid = self.stream_spec.dst_epid
+ header.pkt_type = PacketType.DATA_NO_TS
+ num_samps_left = None
+ if not self.stream_spec.is_continuous:
+ num_samps_left = self.stream_spec.total_samples * 4 # SC16 is 4 bytes per sample
+ for seq_num in self.stream_spec.seq_num_iter():
+ if self.stop:
+ self.log.info("Stream Worker Stopped")
+ break
+ if num_samps_left == 0:
+ break
+ header.seq_num = seq_num
+ packet = ChdrPacket(self.chdr_w, header, bytes(0))
+ packet_samples = self.stream_spec.packet_samples
+ if num_samps_left is not None:
+ packet_samples = min(packet_samples, num_samps_left)
+ num_samps_left -= packet_samples
+ packet = self.sample_source.fill_packet(packet, packet_samples)
+ if packet is None:
+ break
+ send_data = bytes(packet.serialize()) # Serialize before waiting
+ delay = next_send - time.time()
+ if delay > 0:
+ time.sleep(delay)
+ next_send = next_send + self.stream_spec.seconds_per_packet()
+ self.send_wrapper.send_data(send_data, self.stream_spec.addr)
+ self.xfer.count_packet(len(send_data))
+ self.log.info("Stream Worker Done")
+ finish_time = time.time()
+ self.log.info("Actual Packet Rate was {} packets/sec"
+ .format(self.xfer.num_packets/(finish_time - start_time)))
+ self.sample_source.close()
+ def finish(self):
+ """Stops the ChdrOutputStream"""
+ self.stop = True
diff --git a/mpm/python/usrp_mpm/simulator/rfnoc_common.py b/mpm/python/usrp_mpm/simulator/rfnoc_common.py
new file mode 100644
index 000000000..d7f61a4c1
--- /dev/null
+++ b/mpm/python/usrp_mpm/simulator/rfnoc_common.py
@@ -0,0 +1,202 @@
+# Copyright 2020 Ettus Research, a National Instruments Brand
+# SPDX-License-Identifier: GPL-3.0-or-later
+This file contains common classes that are used by both rfnoc_graph.py
+and stream_endpoint_node.py
+from enum import IntEnum
+from uhd.chdr import MgmtOpCode, MgmtOpNodeInfo, MgmtOp, PacketType
+def to_iter(index_func, length):
+ """Allows looping over an indexed object in a for-each loop"""
+ for i in range(length):
+ yield index_func(i)
+def swap_src_dst(packet, payload):
+ """Swap the src_epid and the dst_epid of a packet"""
+ header = packet.get_header()
+ our_epid = header.dst_epid
+ header.dst_epid = payload.src_epid
+ payload.src_epid = our_epid
+ packet.set_header(header)
+class NodeType(IntEnum):
+ """The type of a node in a NoC Core
+ RTS is a magic value used to determine when to return a packet to
+ the sender
+ """
+ XBAR = 1
+ STRM_EP = 2
+ XPORT = 3
+ RTS = 0xFF
+class Node:
+ """Represents a node in a NoC Core
+ These objects are usually constructed by the caller of RFNoC Graph.
+ Initially, they have references to other nodes as indexes of the
+ list of nodes. The graph calls graph_init and from_index so this
+ node can initialize their references to node_ids instead.
+ """
+ def __init__(self, node_inst):
+ """node_inst should start at 0 and increase for every Node of
+ the same type that is constructed
+ """
+ self.node_inst = node_inst
+ self.log = None
+ self.get_device_id = None
+ def graph_init(self, log, get_device_id, **kwargs):
+ """This method is called to initialize the Node Graph"""
+ self.log = log
+ self.get_device_id = get_device_id
+ def get_id(self):
+ """Return the NodeID (device_id, node_type, node_inst)"""
+ return (self.get_device_id(), self.get_type(), self.node_inst)
+ def get_local_id(self):
+ """Return the Local NodeID (node_type, node_inst)"""
+ return (self.get_type(), self.node_inst)
+ def get_type(self):
+ """Returns the NodeType of this node"""
+ raise NotImplementedError
+ def handle_packet(self, packet, **kwargs):
+ """Processes a packet
+ The return value should be a node_id or RETURN_TO_SENDER
+ """
+ packet_type = packet.get_header().pkt_type
+ next_node = None
+ if packet_type == PacketType.MGMT:
+ next_node = self._handle_mgmt_packet(packet, **kwargs)
+ elif packet_type == PacketType.CTRL:
+ next_node = self._handle_ctrl_packet(packet, **kwargs)
+ elif packet_type in (PacketType.DATA_WITH_TS, PacketType.DATA_NO_TS):
+ next_node = self._handle_data_packet(packet, **kwargs)
+ elif packet_type == PacketType.STRS:
+ next_node = self._handle_strs_packet(packet, **kwargs)
+ elif packet_type == PacketType.STRC:
+ next_node = self._handle_strc_packet(packet, **kwargs)
+ else:
+ raise RuntimeError("Invalid Enum Value for PacketType: {}".format(packet_type))
+ # If the specific method for that packet isn't implemented, try the general method
+ if next_node == NotImplemented:
+ next_node = self._handle_default_packet(packet, **kwargs)
+ return next_node
+ # pylint: disable=unused-argument,no-self-use
+ def _handle_mgmt_packet(self, packet, **kwargs):
+ return NotImplemented
+ # pylint: disable=unused-argument,no-self-use
+ def _handle_ctrl_packet(self, packet, **kwargs):
+ return NotImplemented
+ # pylint: disable=unused-argument,no-self-use
+ def _handle_data_packet(self, packet, **kwargs):
+ return NotImplemented
+ # pylint: disable=unused-argument,no-self-use
+ def _handle_strc_packet(self, packet, **kwargs):
+ return NotImplemented
+ # pylint: disable=unused-argument,no-self-use
+ def _handle_strs_packet(self, packet, **kwargs):
+ return NotImplemented
+ def _handle_default_packet(self, packet, **kwargs):
+ raise RuntimeError("{} has no operation defined for a {} packet"
+ .format(self.__class__, packet.get_header().pkt_type))
+ def from_index(self, nodes):
+ """Initialize this node's indexes to block_id references"""
+ pass
+ def info_response(self, extended_info):
+ """Generate a node info response MgmtOp"""
+ return MgmtOp(
+ op_payload=MgmtOpNodeInfo(self.get_device_id(), self.get_type(),
+ self.node_inst, extended_info),
+ op_code=MgmtOpCode.INFO_RESP
+ )
+class StreamSpec:
+ """This class carries the configuration parameters of a Tx stream.
+ total_samples, is_continuous, and packet_samples come from the
+ radio registers (noc_block_regs.py)
+ sample_rate comes from an rpc to the daughterboard (through the
+ set_sample_rate method in chdr_endpoint.py)
+ dst_epid comes from the source stream_ep
+ addr comes from the xport passed through when routing to dst_epid
+ """
+ HIGH_MASK = (0xFFFFFFFF) << 32
+ def __init__(self):
+ self.init_timestamp = 0
+ self.is_timed = False
+ self.total_samples = 0
+ self.is_continuous = True
+ self.packet_samples = None
+ self.sample_rate = None
+ self.dst_epid = None
+ self.addr = None
+ def set_timestamp_lo(self, low):
+ """Set the low 32 bits of the initial timestamp"""
+ self.init_timestamp = (self.init_timestamp & (StreamSpec.HIGH_MASK)) \
+ | (low & StreamSpec.LOW_MASK)
+ def set_timestamp_hi(self, high):
+ """Set the high 32 bits of the initial timestamp"""
+ self.init_timestamp = (self.init_timestamp & StreamSpec.LOW_MASK) \
+ | ((high & StreamSpec.LOW_MASK) << 32)
+ def set_num_words_lo(self, low):
+ """Set the low 32 bits of the total_samples field"""
+ self.total_samples = (self.total_samples & (StreamSpec.HIGH_MASK)) \
+ | (low & StreamSpec.LOW_MASK)
+ def set_num_words_hi(self, high):
+ """Set the high 32 bits of the total_samples field"""
+ self.total_samples = (self.total_samples & StreamSpec.LOW_MASK) \
+ | ((high & StreamSpec.LOW_MASK) << 32)
+ def seconds_per_packet(self):
+ """Calculates how many seconds should be between each packet
+ transmit
+ """
+ assert self.packet_samples != 0
+ assert self.sample_rate != 0
+ return self.packet_samples / self.sample_rate
+ def seq_num_iter(self):
+ """Returns a generator which returns an incrementing integer
+ for each packet that should be sent. This is useful to set the
+ seq_num of each transmitted packet.
+ """
+ i = 0
+ while True:
+ if not self.is_continuous:
+ if i >= self.total_samples:
+ return
+ yield i
+ i += 1
+ def __str__(self):
+ return "StreamSpec{{total_samples: {}, is_continuous: {}, packet_samples: {}," \
+ "sample_rate: {}, dst_epid: {}, addr: {}}}" \
+ .format(self.total_samples, self.is_continuous, self.packet_samples,
+ self.sample_rate, self.dst_epid, self.addr)
diff --git a/mpm/python/usrp_mpm/simulator/rfnoc_graph.py b/mpm/python/usrp_mpm/simulator/rfnoc_graph.py
index c12592603..ada6e70b0 100644
--- a/mpm/python/usrp_mpm/simulator/rfnoc_graph.py
+++ b/mpm/python/usrp_mpm/simulator/rfnoc_graph.py
@@ -10,177 +10,10 @@ This module handles and responds to Management and Ctrl Packets. It
also instantiates the registers and acts as an interface between
the chdr packets on the network and the registers.
-from enum import IntEnum
-from uhd.chdr import PacketType, MgmtOp, MgmtOpCode, MgmtOpNodeInfo, \
- CtrlStatus, CtrlOpCode, MgmtOpCfg, MgmtOpSelDest
+from uhd.chdr import MgmtOpCode, MgmtOpCfg, MgmtOpSelDest
from .noc_block_regs import NocBlockRegs, NocBlock, StreamEndpointPort, NocBlockPort
-from .stream_ep_regs import StreamEpRegs, STRM_STATUS_FC_ENABLED
-class StreamSpec:
- """This class carries the configuration parameters of a Tx stream.
- total_samples, is_continuous, and packet_samples come from the
- radio registers (noc_block_regs.py)
- sample_rate comes from an rpc to the daughterboard (through the
- set_sample_rate method in chdr_endpoint.py)
- dst_epid comes from the source stream_ep
- addr comes from the xport passed through when routing to dst_epid
- """
- HIGH_MASK = (0xFFFFFFFF) << 32
- def __init__(self):
- self.total_samples = 0
- self.is_continuous = True
- self.packet_samples = None
- self.sample_rate = None
- self.dst_epid = None
- self.addr = None
- def set_num_words_lo(self, low):
- """Set the low 32 bits of the total_samples field"""
- self.total_samples = (self.total_samples & (StreamSpec.HIGH_MASK)) \
- | (low & StreamSpec.LOW_MASK)
- def set_num_words_hi(self, high):
- """Set the high 32 bits of the total_samples field"""
- self.total_samples = (self.total_samples & StreamSpec.LOW_MASK) \
- | ((high & StreamSpec.LOW_MASK) << 32)
- def seconds_per_packet(self):
- """Calculates how many seconds should be between each packet
- transmit
- """
- assert self.packet_samples != 0
- assert self.sample_rate != 0
- return self.packet_samples / self.sample_rate
- def __str__(self):
- return "StreamSpec{{total_samples: {}, is_continuous: {}, packet_samples: {}," \
- " sample_rate: {}, dst_epid: {}, addr: {}}}" \
- .format(self.total_samples, self.is_continuous, self.packet_samples,
- self.sample_rate, self.dst_epid, self.addr)
-def to_iter(index_func, length):
- """Allows looping over an indexed object in a for-each loop"""
- for i in range(length):
- yield index_func(i)
-def _swap_src_dst(packet, payload):
- """Swap the src_epid and the dst_epid of a packet"""
- header = packet.get_header()
- our_epid = header.dst_epid
- header.dst_epid = payload.src_epid
- payload.src_epid = our_epid
- packet.set_header(header)
-class NodeType(IntEnum):
- """The type of a node in a NoC Core
- RTS is a magic value used to determine when to return a packet to
- the sender
- """
- XBAR = 1
- STRM_EP = 2
- XPORT = 3
- RTS = 0xFF
-RETURN_TO_SENDER = (0, NodeType.RTS, 0)
-class Node:
- """Represents a node in a RFNoCGraph
- These objects are usually constructed by the caller of RFNoC Graph.
- Initially, they have references to other nodes as indexes of the
- list of nodes. The graph calls graph_init and from_index so this
- node can initialize their references to node_ids instead.
- Subclasses can override _handle_<packet_type>_packet methods, and
- _handle_default_packet is provided, which will receive packets whose
- specific methods are not overridden
- """
- def __init__(self, node_inst):
- """node_inst should start at 0 and increase for every Node of
- the same type that is constructed
- """
- self.device_id = None
- self.node_inst = node_inst
- self.log = None
- def graph_init(self, log, device_id, **kwargs):
- """This method is called to initialize the Node Graph"""
- self.device_id = device_id
- self.log = log
- def get_id(self):
- """Return the NodeID (device_id, node_type, node_inst)"""
- return (self.device_id, self.get_type(), self.node_inst)
- def get_type(self):
- """Returns the NodeType of this node"""
- raise NotImplementedError
- def handle_packet(self, packet, **kwargs):
- """Processes a packet
- The return value should be a node_id or RETURN_TO_SENDER
- """
- packet_type = packet.get_header().pkt_type
- next_node = None
- if packet_type == PacketType.MGMT:
- next_node = self._handle_mgmt_packet(packet, **kwargs)
- elif packet_type == PacketType.CTRL:
- next_node = self._handle_ctrl_packet(packet, **kwargs)
- elif packet_type in (PacketType.DATA_W_TS, PacketType.DATA_NO_TS):
- next_node = self._handle_data_packet(packet, **kwargs)
- elif packet_type == PacketType.STRS:
- next_node = self._handle_strs_packet(packet, **kwargs)
- elif packet_type == PacketType.STRC:
- next_node = self._handle_strc_packet(packet, **kwargs)
- else:
- raise RuntimeError("Invalid Enum Value for PacketType: {}".format(packet_type))
- if next_node == NotImplemented:
- next_node = self._handle_default_packet(packet, **kwargs)
- return next_node
- # pylint: disable=unused-argument,no-self-use
- def _handle_mgmt_packet(self, packet, **kwargs):
- return NotImplemented
- # pylint: disable=unused-argument,no-self-use
- def _handle_ctrl_packet(self, packet, **kwargs):
- return NotImplemented
- # pylint: disable=unused-argument,no-self-use
- def _handle_data_packet(self, packet, **kwargs):
- return NotImplemented
- # pylint: disable=unused-argument,no-self-use
- def _handle_strc_packet(self, packet, **kwargs):
- return NotImplemented
- # pylint: disable=unused-argument,no-self-use
- def _handle_strs_packet(self, packet, **kwargs):
- return NotImplemented
- def _handle_default_packet(self, packet, **kwargs):
- raise RuntimeError("{} has no operation defined for a {} packet"
- .format(self.__class__, packet.get_header().pkt_type))
- def from_index(self, nodes):
- """Initialize this node's indexes to block_id references"""
- pass
- def info_response(self, extended_info):
- """Generate a node info response MgmtOp"""
- return MgmtOp(
- op_payload=MgmtOpNodeInfo(self.device_id, self.get_type(),
- self.node_inst, extended_info),
- op_code=MgmtOpCode.INFO_RESP
- )
+from .rfnoc_common import Node, NodeType, StreamSpec, to_iter, swap_src_dst, RETURN_TO_SENDER
+from .stream_endpoint_node import StreamEndpointNode
class XportNode(Node):
"""Represents an Xport node
@@ -206,7 +39,7 @@ class XportNode(Node):
elif op.op_code == MgmtOpCode.RETURN:
send_upstream = True
- _swap_src_dst(packet, payload)
+ swap_src_dst(packet, payload)
elif op.op_code == MgmtOpCode.NOP:
elif op.op_code == MgmtOpCode.ADVERTISE:
@@ -222,8 +55,7 @@ class XportNode(Node):
if send_upstream:
- else:
- return self.downstream
+ return self.downstream
def _handle_default_packet(self, packet, **kwargs):
return self.downstream
@@ -262,7 +94,7 @@ class XbarNode(Node):
(self.nports_xport << 8) | self.nports))
elif op.op_code == MgmtOpCode.RETURN:
send_upstream = True
- _swap_src_dst(packet, payload)
+ swap_src_dst(packet, payload)
elif op.op_code == MgmtOpCode.NOP:
elif op.op_code == MgmtOpCode.ADVERTISE:
@@ -287,6 +119,8 @@ class XbarNode(Node):
elif destination is not None:
return destination
+ else:
+ return self._handle_default_packet(packet, **kwargs)
def _handle_default_packet(self, packet, **kwargs):
dst_epid = packet.get_header().dst_epid
@@ -301,121 +135,11 @@ class XbarNode(Node):
for i in range(len(self.ports)):
nodes_index = self.ports[i]
node = nodes[nodes_index]
- self.ports[i] = node.get_id()
+ self.ports[i] = node.get_local_id()
if node.__class__ is XportNode:
- node.downstream = self.get_id()
+ node.downstream = self.get_local_id()
elif node.__class__ is StreamEndpointNode:
- node.upstream = self.get_id()
-class StreamEndpointNode(Node):
- """Represents a Stream endpoint node
- This class contains a StreamEpRegs object. To clarify, management
- packets access these registers, while control packets access the
- registers of the noc_blocks which are held in the RFNoCGraph and
- passed into handle_packet as the regs parameter
- """
- def __init__(self, node_inst):
- super().__init__(node_inst)
- self.epid = node_inst
- self.dst_epid = None
- self.upstream = None
- self.sep_config = None
- # These 4 aren't configurable right now
- self.has_data = True
- self.has_ctrl = True
- self.input_ports = 2
- self.output_ports = 2
- self.ep_regs = StreamEpRegs(self.get_epid, self.set_epid, self.set_dst_epid,
- self.status_callback_out, self.status_callback_in, 4, 4 * 8000)
- def status_callback_out(self, status):
- """Called by the ep_regs when on a write to the
- """
- if status.cfg_start:
- # This only creates a new ChdrOutputStream if the cfg_start flag is set
- self.log.info("Starting Stream EPID:{} -> EPID:{}".format(self.epid, self.dst_epid))
- self.sep_config(self, True)
- def status_callback_in(self, status):
- """Called by the ep_regs on a write to the
- """
- # This always triggers the graph to create a new ChdrInputStream
- self.sep_config(self, False)
- def graph_init(self, log, device_id, sep_config, **kwargs):
- super().graph_init(log, device_id)
- self.ep_regs.log = log
- self.sep_config = sep_config
- def get_type(self):
- return NodeType.STRM_EP
- def get_epid(self):
- """ Get the endpoint id of this stream endpoint """
- return self.epid
- def set_epid(self, epid):
- """ Set the endpoint id of this stream endpoint """
- self.epid = epid
- def set_dst_epid(self, dst_epid):
- """ Set the destination endpoint id of this stream endpoint """
- self.dst_epid = dst_epid
- def _handle_mgmt_packet(self, packet, **kwargs):
- send_upstream = False
- payload = packet.get_payload_mgmt()
- our_hop = payload.pop_hop()
- for op in to_iter(our_hop.get_op, our_hop.get_num_ops()):
- if op.op_code == MgmtOpCode.INFO_REQ:
- ext_info = 0
- ext_info |= (1 if self.has_ctrl else 0) << 0
- ext_info |= (1 if self.has_data else 0) << 1
- ext_info |= (self.input_ports & 0x3F) << 2
- ext_info |= (self.output_ports & 0x3F) << 8
- payload.get_hop(0).add_op(self.info_response(ext_info))
- elif op.op_code == MgmtOpCode.RETURN:
- send_upstream = True
- _swap_src_dst(packet, payload)
- elif op.op_code == MgmtOpCode.NOP:
- pass
- elif op.op_code == MgmtOpCode.CFG_RD_REQ:
- request = MgmtOpCfg.parse(op.get_op_payload())
- value = self.ep_regs.read(request.addr)
- payload.get_hop(0).add_op(MgmtOp(MgmtOpCode.CFG_RD_RESP,
- MgmtOpCfg(request.addr, value)))
- elif op.op_code == MgmtOpCode.CFG_WR_REQ:
- request = MgmtOpCfg.parse(op.get_op_payload())
- self.ep_regs.write(request.addr, request.data)
- else:
- raise NotImplementedError("op_code {} is not implemented for "
- "StreamEndpointNode".format(op.op_code))
- self.log.trace("Stream Endpoint {} processed hop:\n{}"
- .format(self.node_inst, our_hop))
- packet.set_payload(payload)
- if send_upstream:
- def _handle_ctrl_packet(self, packet, regs, **kwargs):
- payload = packet.get_payload_ctrl()
- _swap_src_dst(packet, payload)
- if payload.status != CtrlStatus.OKAY:
- raise RuntimeError("Control Status not OK: {}".format(payload.status))
- if payload.op_code == CtrlOpCode.READ:
- payload.is_ack = True
- payload.set_data([regs.read(payload.address)])
- elif payload.op_code == CtrlOpCode.WRITE:
- payload.is_ack = True
- regs.write(payload.address, payload.get_data()[0])
- else:
- raise NotImplementedError("Unknown Control OpCode: {}".format(payload.op_code))
- packet.set_payload(payload)
+ node.upstream = self.get_local_id()
class RFNoCGraph:
"""This class holds all of the nodes of the NoC core and the Noc
@@ -424,25 +148,21 @@ class RFNoCGraph:
It serves as an interface between the ChdrEndpoint and the
individual blocks/nodes.
- def __init__(self, graph_list, log, device_id, create_tx_func,
- stop_tx_func, send_strc_func, create_rx_func):
+ def __init__(self, graph_list, log, device_id, send_wrapper, chdr_w):
self.log = log.getChild("Graph")
- self.send_strc = send_strc_func
self.device_id = device_id
self.stream_spec = StreamSpec()
- self.create_tx = create_tx_func
- self.stop_tx = stop_tx_func
- self.create_rx = create_rx_func
- num_stream_ep = 0
+ self.stream_ep = []
for node in graph_list:
if node.__class__ is StreamEndpointNode:
- num_stream_ep += 1
- node.graph_init(self.log, device_id, sep_config=self.prepare_stream_ep)
- # These must be done sequentially so that device_id is initialized on all nodes
+ self.stream_ep.append(node)
+ node.graph_init(self.log, self.get_device_id, send_wrapper=send_wrapper,
+ chdr_w=chdr_w, dst_to_addr=self.dst_to_addr)
+ # These must be done sequentially so that get_device_id is initialized on all nodes
# before from_index is called on any node
for node in graph_list:
- self.graph_map = {node.get_id(): node
+ self.graph_map = {node.get_local_id(): node
for node in graph_list}
# For now, just use one radio block and hardcode it to the first stream endpoint
radio = NocBlock(1 << 16, 2, 2, 512, 1, 0x12AD1000, 16)
@@ -452,8 +172,8 @@ class RFNoCGraph:
(NocBlockPort(0, 0), StreamEndpointPort(0, 0)),
(NocBlockPort(0, 1), StreamEndpointPort(0, 1))
- self.regs = NocBlockRegs(self.log, 1 << 16, True, 1, [radio], num_stream_ep, 1, 0xE320,
- adj_list, 8, 1, self.get_stream_spec, self.radio_tx_cmd,
+ self.regs = NocBlockRegs(self.log, 1 << 16, True, 1, [radio], len(self.stream_ep), 1,
+ 0xE320, adj_list, 8, 1, self.get_stream_spec, self.radio_tx_cmd,
def radio_tx_cmd(self, sep_block_id):
@@ -468,14 +188,12 @@ class RFNoCGraph:
# The NoC Block index for stream endpoints is the inst + 1
# See rfnoc_graph.cpp:rfnoc_graph_impl#_init_sep_map()
sep_inst = sep_blk - 1
- sep_id = (self.get_device_id(), NodeType.STRM_EP, sep_inst)
+ sep_id = (NodeType.STRM_EP, sep_inst)
stream_ep = self.graph_map[sep_id]
- self.stream_spec.dst_epid = stream_ep.dst_epid
self.stream_spec.addr = self.dst_to_addr(stream_ep)
self.log.info("Streaming with StreamSpec:")
- self.create_tx(stream_ep.epid, self.stream_spec)
- self.stream_spec = StreamSpec()
+ stream_ep.begin_output(self.stream_spec)
def radio_tx_stop(self, sep_block_id):
"""Triggers the destuction of a ChdrOutputStream in the ChdrEndpoint
@@ -487,24 +205,16 @@ class RFNoCGraph:
sep_inst = sep_blk - 1
# The NoC Block index for stream endpoints is the inst + 1
# See rfnoc_graph.cpp:rfnoc_graph_impl#_init_sep_map()
- sep_id = (self.get_device_id(), NodeType.STRM_EP, sep_inst)
- src_epid = self.graph_map[sep_id].epid
- self.stop_tx(src_epid)
+ sep_id = (NodeType.STRM_EP, sep_inst)
+ stream_ep = self.graph_map[sep_id]
+ stream_ep.end_output()
def get_device_id(self):
return self.device_id
- def prepare_stream_ep(self, stream_ep, is_tx):
- """This is called by a stream_ep when it receives a status update
- Depending on whether it was a tx or rx status, it will either
- trigger an strc packet or an rx stream
- """
- if is_tx:
- addr = self.dst_to_addr(stream_ep)
- self.send_strc(stream_ep, addr)
- else:
- self.create_rx(stream_ep.epid)
+ def set_device_id(self, device_id):
+ """Set this graph's rfnoc device id"""
+ self.device_id = device_id
def change_spp(self, spp):
"""Change the Stream Samples per Packet"""
@@ -541,7 +251,7 @@ class RFNoCGraph:
current_node = self.graph_map[upstream_id]
return current_node.addr_map[dst_epid]
- def handle_packet(self, packet, xport_input, addr):
+ def handle_packet(self, packet, xport_input, addr, sender, num_bytes):
"""Given a chdr_packet, the id of an xport node to serve as an
entry point, and a source address, send the packet through the
node graph.
@@ -549,14 +259,18 @@ class RFNoCGraph:
node_id = xport_input
response_packet = None
while node_id is not None:
- if node_id[1] == NodeType.RTS:
+ assert len(node_id) == 2, "Node returned non-local node_id of len {}: {}" \
+ .format(len(node_id), node_id)
+ if node_id[0] == NodeType.RTS:
response_packet = packet
node = self.graph_map[node_id]
# If the node returns a value, it is the node id of the
# node the packet should be passed to next
- node_id = node.handle_packet(packet, regs=self.regs, addr=addr)
+ node_id = node.handle_packet(packet, regs=self.regs, addr=addr,
+ sender=sender, num_bytes=num_bytes)
return response_packet
def get_stream_spec(self):
diff --git a/mpm/python/usrp_mpm/simulator/sample_source.py b/mpm/python/usrp_mpm/simulator/sample_source.py
new file mode 100644
index 000000000..184649994
--- /dev/null
+++ b/mpm/python/usrp_mpm/simulator/sample_source.py
@@ -0,0 +1,142 @@
+# Copyright 2020 Ettus Research, a National Instruments Brand
+# SPDX-License-Identifier: GPL-3.0-or-later
+This module contains the interface for providing data to a simulator
+stream and receiving data from a simulator stream.
+#TODO: This is currently unused, as the cli is largely incomplete
+sources = {}
+sinks = {}
+def cli_source(cls):
+ """This decorator adds a class to the global list of SampleSources"""
+ sources[cls.__name__] = cls
+ return cls
+def cli_sink(cls):
+ """This decorator adds a class to the global list of SampleSinks"""
+ sinks[cls.__name__] = cls
+ return cls
+class SampleSource:
+ """This class defines the interface of a SampleSource. It
+ provides samples to the simulator which are then sent over the
+ network to a UHD client.
+ """
+ def fill_packet(self, packet, payload_size):
+ """This method should fill the packet with enough samples to
+ make its payload payload_size bytes long.
+ Returning None signals that this source is exhausted.
+ """
+ raise NotImplementedError()
+ def close(self):
+ """Use this to clean up any resources held by the object"""
+ raise NotImplementedError()
+class SampleSink:
+ """This class provides the interface of a SampleSink. It serves
+ as a destination for smaples received over the network from a
+ UHD client.
+ """
+ def accept_packet(self, packet):
+ """Called whenever a new packet is received"""
+ raise NotImplementedError()
+ def close(self):
+ """Use this to clean up any resources held by the object"""
+ raise NotImplementedError()
+class NullSamples(SampleSource, SampleSink):
+ """This combination source/sink simply provides an infinite
+ number of samples with a value of zero. You may optionally provide
+ a log object which will enable debug output.
+ """
+ def __init__(self, log=None):
+ self.log = log
+ def fill_packet(self, packet, payload_size):
+ if self.log is not None:
+ self.log.debug("Null Source called, providing {} bytes of zeroes".format(payload_size))
+ payload = bytes(payload_size)
+ packet.set_payload_bytes(payload)
+ return packet
+ def accept_packet(self, packet):
+ if self.log is not None:
+ self.log.debug("Null Source called, accepting {} bytes of payload"
+ .format(len(packet.get_payload_bytes())))
+ def close(self):
+ pass
+class IOSource(SampleSource):
+ """This adaptor class creates a sample source using a read object
+ that provides a read(# of bytes) function.
+ (e.g. the result of an open("<filename>", "rb") call)
+ """
+ def __init__(self, read):
+ self.read_obj = read
+ def fill_packet(self, packet, payload_size):
+ payload = self.read_obj.read(payload_size)
+ if len(payload) == 0:
+ return None
+ packet.set_payload_bytes(payload)
+ return packet
+ def close(self):
+ self.read_obj.close()
+class IOSink(SampleSink):
+ """This adaptor class creates a sample sink using a write object
+ that provides a write(bytes) function.
+ (e.g. the result of an open("<filename>", "wb") call)
+ """
+ def __init__(self, write):
+ self.write_obj = write
+ def accept_packet(self, packet):
+ payload = packet.get_payload_bytes()
+ written = self.write_obj.write(bytes(payload))
+ assert written == len(payload)
+ def close(self):
+ self.write_obj.close()
+class FileSource(IOSource):
+ """This class creates a SampleSource using a file path"""
+ def __init__(self, read_file, repeat=False):
+ self.open = lambda: open(read_file, "rb")
+ if isinstance(repeat, bool):
+ self.repeat = repeat
+ else:
+ self.repeat = repeat == "True"
+ read = self.open()
+ super().__init__(read)
+ def fill_packet(self, packet, payload_size):
+ payload = self.read_obj.read(payload_size)
+ if len(payload) == 0:
+ if self.repeat:
+ self.read_obj.close()
+ self.read_obj = self.open()
+ payload = self.read_obj.read(payload_size)
+ else:
+ return None
+ packet.set_payload_bytes(payload)
+ return packet
+class FileSink(IOSink):
+ """This class creates a SampleSink using a file path"""
+ def __init__(self, write_file):
+ write = open(write_file, "wb")
+ super().__init__(write)
diff --git a/mpm/python/usrp_mpm/simulator/stream_endpoint_node.py b/mpm/python/usrp_mpm/simulator/stream_endpoint_node.py
new file mode 100644
index 000000000..b4477ee47
--- /dev/null
+++ b/mpm/python/usrp_mpm/simulator/stream_endpoint_node.py
@@ -0,0 +1,206 @@
+# Copyright 2020 Ettus Research, a National Instruments Brand
+# SPDX-License-Identifier: GPL-3.0-or-later
+This is a stream endpoint node, which is used in the RFNoCGraph class.
+It also houses the logic to create output and input streams.
+from uhd.chdr import MgmtOpCode, MgmtOpCfg, MgmtOp, PacketType, CtrlStatus, CtrlOpCode, \
+ ChdrHeader, StrcOpCode, StrcPayload, ChdrPacket
+from .rfnoc_common import Node, NodeType, to_iter, swap_src_dst, RETURN_TO_SENDER
+from .stream_ep_regs import StreamEpRegs, STRM_STATUS_FC_ENABLED
+from .chdr_stream import ChdrOutputStream, ChdrInputStream
+class StreamEndpointNode(Node):
+ """Represents a Stream endpoint node
+ This class contains a StreamEpRegs object. To clarify, management
+ packets access these registers, while control packets access the
+ registers of the noc_blocks which are held in the RFNoCGraph and
+ passed into handle_packet as the regs parameter
+ """
+ def __init__(self, node_inst, source_gen, sink_gen):
+ super().__init__(node_inst)
+ self.epid = node_inst
+ self.dst_epid = None
+ self.upstream = None
+ # ---- These 4 aren't configurable right now
+ self.has_data = True
+ self.has_ctrl = True
+ self.input_ports = 1
+ self.output_ports = 1
+ # ----
+ self.input_stream = None
+ self.output_stream = None
+ self.chdr_w = None
+ self.send_wrapper = None
+ self.dst_to_addr = None
+ self.source_gen = source_gen
+ self.sink_gen = sink_gen
+ self.ep_regs = StreamEpRegs(self.get_epid, self.set_epid, self.set_dst_epid,
+ self.ctrl_status_callback_out, self.ctrl_status_callback_in,
+ 4, 4 * 8000)
+ def ctrl_status_callback_out(self, status):
+ """Called by the ep_regs when on a write to the
+ """
+ if status.cfg_start:
+ # This only creates a new ChdrOutputStream if the cfg_start flag is set
+ self.log.info("Starting Stream EPID:{} -> EPID:{}".format(self.epid, self.dst_epid))
+ addr = self.dst_to_addr(self)
+ self.send_strc(addr)
+ def ctrl_status_callback_in(self, status):
+ """Called by the ep_regs on a write to the
+ """
+ # This always triggers the graph to create a new ChdrInputStream
+ self.begin_input()
+ def graph_init(self, log, set_device_id, send_wrapper, chdr_w, dst_to_addr, **kwargs):
+ super().graph_init(log, set_device_id)
+ self.ep_regs.log = log
+ self.chdr_w = chdr_w
+ self.send_wrapper = send_wrapper
+ self.dst_to_addr = dst_to_addr
+ def get_type(self):
+ return NodeType.STRM_EP
+ def get_epid(self):
+ """Get this endpoint's endpoint id"""
+ return self.epid
+ def set_epid(self, epid):
+ """Set this endpoint's endpoint id"""
+ self.epid = epid
+ def set_dst_epid(self, dst_epid):
+ """Set this endpoint's destination endpoint id"""
+ self.dst_epid = dst_epid
+ def _handle_mgmt_packet(self, packet, **kwargs):
+ send_upstream = False
+ payload = packet.get_payload_mgmt()
+ our_hop = payload.pop_hop()
+ for op in to_iter(our_hop.get_op, our_hop.get_num_ops()):
+ if op.op_code == MgmtOpCode.INFO_REQ:
+ ext_info = 0
+ ext_info |= (1 if self.has_ctrl else 0) << 0
+ ext_info |= (1 if self.has_data else 0) << 1
+ ext_info |= (self.input_ports & 0x3F) << 2
+ ext_info |= (self.output_ports & 0x3F) << 8
+ payload.get_hop(0).add_op(self.info_response(ext_info))
+ elif op.op_code == MgmtOpCode.RETURN:
+ send_upstream = True
+ swap_src_dst(packet, payload)
+ elif op.op_code == MgmtOpCode.NOP:
+ pass
+ elif op.op_code == MgmtOpCode.CFG_RD_REQ:
+ request = MgmtOpCfg.parse(op.get_op_payload())
+ value = self.ep_regs.read(request.addr)
+ payload.get_hop(0).add_op(MgmtOp(MgmtOpCode.CFG_RD_RESP,
+ MgmtOpCfg(request.addr, value)))
+ elif op.op_code == MgmtOpCode.CFG_WR_REQ:
+ request = MgmtOpCfg.parse(op.get_op_payload())
+ self.ep_regs.write(request.addr, request.data)
+ else:
+ raise NotImplementedError("op_code {} is not implemented for StreamEndpointNode"
+ .format(op.op_code))
+ self.log.trace("Stream Endpoint {} processed hop:\n{}"
+ .format(self.node_inst, our_hop))
+ packet.set_payload(payload)
+ if send_upstream:
+ self.log.trace("Stream Endpoint {} received packet:\n{}"
+ .format(self.node_inst, packet))
+ def _handle_ctrl_packet(self, packet, regs, **kwargs):
+ payload = packet.get_payload_ctrl()
+ swap_src_dst(packet, payload)
+ if payload.status != CtrlStatus.OKAY:
+ raise RuntimeError("Control Status not OK: {}".format(payload.status))
+ if payload.op_code == CtrlOpCode.READ:
+ payload.is_ack = True
+ payload.set_data([regs.read(payload.address)])
+ elif payload.op_code == CtrlOpCode.WRITE:
+ payload.is_ack = True
+ regs.write(payload.address, payload.get_data()[0])
+ else:
+ raise NotImplementedError("Unknown Control OpCode: {}".format(payload.op_code))
+ packet.set_payload(payload)
+ def _handle_data_packet(self, packet, num_bytes, sender, **kwargs):
+ assert self.input_stream is not None
+ self.input_stream.queue_packet(packet, num_bytes, sender)
+ def _handle_strs_packet(self, packet, **kwargs):
+ pass # TODO: Implement flow control for output streams
+ def _handle_strc_packet(self, packet, **kwargs):
+ self._handle_data_packet(packet, **kwargs)
+ def send_strc(self, addr):
+ """Send a Stream Command packet from the specified stream_ep
+ to the specified address.
+ This is not handled in ChdrOutputStream because the STRC must be
+ dispatched before UHD configures the samples per packet,
+ which is required to complete a StreamSpec
+ """
+ header = ChdrHeader()
+ header.dst_epid = self.dst_epid
+ header.pkt_type = PacketType.STRC
+ payload = StrcPayload()
+ payload.src_epid = self.epid
+ payload.op_code = StrcOpCode.INIT
+ packet = ChdrPacket(self.chdr_w, header, payload)
+ self.send_wrapper.send_packet(packet, addr)
+ def begin_output(self, stream_spec):
+ """Spin up a new ChdrOutputStream thread which transmits from src_epid
+ according to stream_spec.
+ This is triggered from RFNoC Graph when the radio receives a
+ Stream Command
+ """
+ # As of now, only one stream endpoint port per stream endpoint
+ # is supported.
+ assert self.output_stream is None, \
+ "Output Stream already running on epid: {}".format(self.epid)
+ stream_spec.dst_epid = self.dst_epid
+ self.output_stream = ChdrOutputStream(self.log, self.chdr_w, self.source_gen(),
+ stream_spec, self.send_wrapper)
+ def end_output(self):
+ """Stops src_epid's current transmission. This opens up the sep
+ to new transmissions in the future.
+ This is triggered either by the RFNoC Graph when the radio
+ receives a Stop Stream command or when the transmission has no
+ more samples to send.
+ """
+ self.output_stream.finish()
+ self.output_stream = None
+ def begin_input(self):
+ """Spin up a new ChdrInputStream thread which receives all data and strc
+ This is triggered by RFNoC Graph when there is a write to the
+ """
+ # As of now, only one stream endpoint port per stream endpoint
+ # is supported.
+ # Rx streams aren't explicitly ended by UHD. If we try to start
+ # a new one on the same epid, just quietly close the old one.
+ if self.input_stream is not None:
+ self.input_stream.finish()
+ self.input_stream = ChdrInputStream(self.log, self.chdr_w,
+ self.sink_gen(), self.send_wrapper, self.epid)