aboutsummaryrefslogtreecommitdiffstats
path: root/mpm
diff options
context:
space:
mode:
authorSamuel O'Brien <sam.obrien@ni.com>2020-08-05 10:00:20 -0500
committerAaron Rossetto <aaron.rossetto@ni.com>2020-10-28 15:25:48 -0500
commitd42ddc804118b2e9120c84efd477f9f4b3f8472e (patch)
tree6fc552eedc9f04434b3efe223a0d0add858ebb1d /mpm
parent57ca4235b1b634d8c487fe2f0928ecc79f1bdbe7 (diff)
downloaduhd-d42ddc804118b2e9120c84efd477f9f4b3f8472e.tar.gz
uhd-d42ddc804118b2e9120c84efd477f9f4b3f8472e.tar.bz2
uhd-d42ddc804118b2e9120c84efd477f9f4b3f8472e.zip
sim: Support Streaming
This commit add support for both Tx and Rx streams to the simulator. Signed-off-by: Samuel O'Brien <sam.obrien@ni.com>
Diffstat (limited to 'mpm')
-rw-r--r--mpm/python/usrp_mpm/periph_manager/sim.py3
-rw-r--r--mpm/python/usrp_mpm/simulator/CMakeLists.txt4
-rw-r--r--mpm/python/usrp_mpm/simulator/chdr_endpoint.py71
-rw-r--r--mpm/python/usrp_mpm/simulator/chdr_stream.py224
-rw-r--r--mpm/python/usrp_mpm/simulator/rfnoc_common.py202
-rw-r--r--mpm/python/usrp_mpm/simulator/rfnoc_graph.py356
-rw-r--r--mpm/python/usrp_mpm/simulator/sample_source.py142
-rw-r--r--mpm/python/usrp_mpm/simulator/stream_endpoint_node.py206
8 files changed, 860 insertions, 348 deletions
diff --git a/mpm/python/usrp_mpm/periph_manager/sim.py b/mpm/python/usrp_mpm/periph_manager/sim.py
index 20d2486c0..b5cc45807 100644
--- a/mpm/python/usrp_mpm/periph_manager/sim.py
+++ b/mpm/python/usrp_mpm/periph_manager/sim.py
@@ -143,8 +143,6 @@ class sim(PeriphManagerBase):
self._xport_mgrs = {
'udp': SimXportMgrUDP(self.log, args, SimEthDispatcher)
}
- #TODO: Actually create transports here when RFNoC is integrated
- self.log.trace("CHDR transport creation was skipped")
# Init complete.
self.log.debug("Device info: {}".format(self.device_info))
@@ -174,6 +172,7 @@ class sim(PeriphManagerBase):
this motherboard.
"""
self.device_id = device_id
+ self.chdr_endpoint.set_device_id(device_id)
def get_device_id(self):
"""
diff --git a/mpm/python/usrp_mpm/simulator/CMakeLists.txt b/mpm/python/usrp_mpm/simulator/CMakeLists.txt
index b1f44ef12..e95709249 100644
--- a/mpm/python/usrp_mpm/simulator/CMakeLists.txt
+++ b/mpm/python/usrp_mpm/simulator/CMakeLists.txt
@@ -17,6 +17,10 @@ set(USRP_MPM_SIMULATOR_FILES
${CMAKE_CURRENT_SOURCE_DIR}/noc_block_regs.py
${CMAKE_CURRENT_SOURCE_DIR}/rfnoc_graph.py
${CMAKE_CURRENT_SOURCE_DIR}/stream_ep_regs.py
+ ${CMAKE_CURRENT_SOURCE_DIR}/sample_source.py
+ ${CMAKE_CURRENT_SOURCE_DIR}/chdr_stream.py
+ ${CMAKE_CURRENT_SOURCE_DIR}/rfnoc_common.py
+ ${CMAKE_CURRENT_SOURCE_DIR}/stream_endpoint_node.py
)
list(APPEND USRP_MPM_FILES ${USRP_MPM_SIMULATOR_FILES})
set(USRP_MPM_FILES ${USRP_MPM_FILES} PARENT_SCOPE)
diff --git a/mpm/python/usrp_mpm/simulator/chdr_endpoint.py b/mpm/python/usrp_mpm/simulator/chdr_endpoint.py
index 616cd0ce9..303f3268d 100644
--- a/mpm/python/usrp_mpm/simulator/chdr_endpoint.py
+++ b/mpm/python/usrp_mpm/simulator/chdr_endpoint.py
@@ -10,8 +10,12 @@ Graph.
from threading import Thread
import socket
-from uhd.chdr import ChdrPacket, ChdrWidth, PacketType
+import queue
+import select
+from uhd.chdr import ChdrPacket, ChdrWidth
from .rfnoc_graph import XbarNode, XportNode, StreamEndpointNode, RFNoCGraph, NodeType
+from .chdr_stream import SendWrapper, ChdrOutputStream, ChdrInputStream, SelectableQueue
+from .sample_source import NullSamples
CHDR_W = ChdrWidth.W64
@@ -27,11 +31,20 @@ class ChdrEndpoint:
"""
def __init__(self, log, extra_args):
self.log = log.getChild("ChdrEndpoint")
+ self.source_gen = NullSamples
+ self.sink_gen = NullSamples
+ self.xport_map = {}
+
+ self.send_queue = SelectableQueue()
+ self.send_wrapper = SendWrapper(self.send_queue)
+
+ self.graph = RFNoCGraph(self.get_default_nodes(), self.log, 1, self.send_wrapper, CHDR_W)
self.thread = Thread(target=self.socket_worker, daemon=True)
self.thread.start()
- self.graph = RFNoCGraph(self.get_default_nodes(), self.log, 1, self.begin_tx,
- self.end_tx, self.send_strc, self.begin_rx)
- self.xport_map = {}
+
+ def set_device_id(self, device_id):
+ """Set the device_id for this endpoint"""
+ self.graph.set_device_id(device_id)
def set_sample_rate(self, rate):
"""Set the sample_rate of the next tx_stream.
@@ -48,7 +61,7 @@ class ChdrEndpoint:
nodes = [
XportNode(0),
XbarNode(0, [2], [0]),
- StreamEndpointNode(0)
+ StreamEndpointNode(0, self.source_gen, self.sink_gen)
]
return nodes
@@ -76,24 +89,32 @@ class ChdrEndpoint:
while True:
# This allows us to block on multiple sockets at the same time
+ ready_list, _, _ = select.select([main_sock, self.send_queue], [], [])
buffer = bytearray(8000) # Max MTU
- # received Data over socket
- n_bytes, sender = main_sock.recvfrom_into(buffer)
- self.log.trace("received {} bytes of data from {}"
- .format(n_bytes, sender))
- try:
- packet = ChdrPacket.deserialize(CHDR_W, buffer[:n_bytes])
- self.log.trace("Decoded Packet: {}".format(packet.to_string_with_payload()))
- entry_xport = (1, NodeType.XPORT, 0)
- pkt_type = packet.get_header().pkt_type
- response = self.graph.handle_packet(packet, entry_xport, sender)
-
- if response is not None:
- data = response.serialize()
- self.log.trace("Returning Packet: {}"
- .format(packet.to_string_with_payload()))
- main_sock.sendto(bytes(data), sender)
- except BaseException as ex:
- self.log.warning("Unable to decode packet: {}"
- .format(ex))
- raise ex
+ for sock in ready_list:
+ if sock is main_sock:
+ # Received Data over socket
+ n_bytes, sender = main_sock.recvfrom_into(buffer)
+ self.log.trace("Received {} bytes of data from {}"
+ .format(n_bytes, sender))
+ try:
+ packet = ChdrPacket.deserialize(CHDR_W, buffer[:n_bytes])
+ self.log.trace("Decoded Packet: {}"
+ .format(packet.to_string_with_payload()))
+ entry_xport = (NodeType.XPORT, 0)
+ response = self.graph.handle_packet(packet, entry_xport, sender,
+ sender, n_bytes)
+
+ if response is not None:
+ data = response.serialize()
+ self.log.trace("Returning Packet: {}"
+ .format(packet.to_string_with_payload()))
+ main_sock.sendto(bytes(data), sender)
+ except BaseException as ex:
+ self.log.warning("Unable to decode packet: {}"
+ .format(ex))
+ raise ex
+ else:
+ data, addr = self.send_queue.get()
+ sent_len = main_sock.sendto(data, addr)
+ assert len(data) == sent_len, "Didn't send whole packet."
diff --git a/mpm/python/usrp_mpm/simulator/chdr_stream.py b/mpm/python/usrp_mpm/simulator/chdr_stream.py
new file mode 100644
index 000000000..f63fba050
--- /dev/null
+++ b/mpm/python/usrp_mpm/simulator/chdr_stream.py
@@ -0,0 +1,224 @@
+#
+# Copyright 2020 Ettus Research, a National Instruments Brand
+#
+# SPDX-License-Identifier: GPL-3.0-or-later
+#
+"""
+This module contains the streaming backend for the simulator. It
+handles managing threads, as well as interfacing with sample sources
+and sinks.
+"""
+import time
+from threading import Thread
+import queue
+import socket
+from uhd.chdr import PacketType, StrcOpCode, StrsPayload, StrsStatus, ChdrHeader, ChdrPacket
+
+class XferCount:
+ """This class keeps track of flow control transfer status which are
+ used to populate Strc and Strs packets
+ """
+ def __init__(self):
+ self.num_bytes = 0
+ self.num_packets = 0
+
+ def count_packet(self, length):
+ """Accounts for a packet of len bytes in the xfer count"""
+ self.num_bytes += length
+ self.num_packets += 1
+
+ def clear(self):
+ """Reset the xfer counts to 0"""
+ self.num_bytes = 0
+ self.num_packets = 0
+
+ def __str__(self):
+ return "XferCount{{num_bytes:{}, num_packets:{}}}".format(self.num_bytes, self.num_packets)
+
+class SelectableQueue:
+ """ A simple python Queue implementation which can be selected.
+ This allows waiting on a queue and a socket simultaneously.
+ """
+ def __init__(self, max_size=0):
+ self._queue = queue.Queue(max_size)
+ self._send_signal_rx, self._send_signal_tx = socket.socketpair()
+
+ def put(self, item, block=True, timeout=None):
+ """ Put an element into the queue, optionally blocking """
+ self._queue.put(item, block, timeout)
+ self._send_signal_tx.send(b"\x00")
+
+ def fileno(self):
+ """ A fileno compatible with select.select """
+ return self._send_signal_rx.fileno()
+
+ def get(self):
+ """ Return the first element in the queue, blocking if none
+ are available.
+ """
+ self._send_signal_rx.recv(1)
+ return self._queue.get_nowait()
+
+class SendWrapper:
+ """This class is used as an abstraction over queueing packets to be
+ sent by the socket thread.
+ """
+ def __init__(self, queue):
+ self.queue = queue
+
+ def send_packet(self, packet, addr):
+ """Serialize packet and then queue the data to be sent to addr
+ returns the length of the serialized packet
+ """
+ data = packet.serialize()
+ self.send_data(bytes(data), addr)
+ return len(data)
+
+ def send_data(self, data, addr):
+ """Queue data to be sent to addr"""
+ self.queue.put((data, addr))
+
+
+class ChdrInputStream:
+ """This class encapsulates an Rx Thread. This thread blocks on a
+ queue which receives STRC and DATA ChdrPackets. It places the data
+ packets into the sample_sink and responds to the STRC packets using
+ the send_wrapper
+ """
+ CAPACITY_BYTES = int(5e6) # 5 MB
+ QUEUE_CAP = 3
+ def __init__(self, log, chdr_w, sample_sink, send_wrapper, our_epid):
+ self.log = log
+ self.chdr_w = chdr_w
+ self.sample_sink = sample_sink
+ self.send_wrapper = send_wrapper
+ self.xfer = XferCount()
+ self.accum = XferCount()
+ self.fc_freq = None
+ self.command_target = None
+ self.command_addr = None
+ self.command_epid = None
+ self.our_epid = our_epid
+ self.rx_queue = queue.Queue(ChdrInputStream.QUEUE_CAP)
+ self.stop = False
+ self.thread = Thread(target=self._rx_worker, daemon=True)
+ self.thread.start()
+
+ def _rx_worker(self):
+ self.log.info("Stream RX Worker Starting")
+ while True:
+ packet, recv_len, addr = self.rx_queue.get()
+ # This break is here because when ChdrInputStream.stop() is called,
+ # a tuple of 3 None values is pushed into the queue to unblock the worker.
+ if self.stop:
+ break
+ header = packet.get_header()
+ pkt_type = header.pkt_type
+ if pkt_type in (PacketType.DATA_WITH_TS, PacketType.DATA_NO_TS):
+ self.xfer.count_packet(recv_len)
+ self.sample_sink.accept_packet(packet)
+ elif pkt_type == PacketType.STRC:
+ req_payload = packet.get_payload_strc()
+ resp_header = ChdrHeader()
+ resp_header.dst_epid = req_payload.src_epid
+ if req_payload.op_code == StrcOpCode.INIT:
+ self.xfer.clear()
+ elif req_payload.op_code == StrcOpCode.RESYNC:
+ self.xfer.num_bytes = req_payload.xfer_count_bytes
+ self.xfer.num_packets = req_payload.xfer_count_pkts
+ resp_payload = self._generate_strs_payload(header.dst_epid)
+ resp_packet = ChdrPacket(self.chdr_w, resp_header, resp_payload)
+ self.send_wrapper.send_packet(resp_packet, addr)
+ else:
+ raise RuntimeError("RX Worker received unsupported packet: {}".format(pkt_type))
+ self.sample_sink.close()
+ self.log.info("Stream RX Worker Done")
+
+ def finish(self):
+ """Unblocks the worker and stops the thread.
+ The worker will close its sample_sink
+ """
+ self.stop = True
+ self.rx_queue.put((None, None, None))
+
+ def _generate_strs_payload(self, src_epid):
+ """Create an strs payload from the information in self.xfer"""
+ resp_payload = StrsPayload()
+ resp_payload.src_epid = src_epid
+ resp_payload.status = StrsStatus.OKAY
+ resp_payload.capacity_bytes = ChdrInputStream.CAPACITY_BYTES
+ resp_payload.capacity_pkts = 0xFFFFFF
+ resp_payload.xfer_count_bytes = self.xfer.num_bytes
+ resp_payload.xfer_count_pkts = self.xfer.num_packets
+ return resp_payload
+
+ def queue_packet(self, packet, recv_len, addr):
+ """Queue a packet to be processed by the ChdrInputStream"""
+ self.rx_queue.put((packet, recv_len, addr))
+
+class ChdrOutputStream:
+ """This class encapsulates a Tx Thread. It takes data from its
+ sample_source and then sends it in a data packet using its
+ send_wrapper.
+
+ The tx stream is configured using the stream_spec object, which
+ sets parameters such as sample rate and destination
+ """
+ def __init__(self, log, chdr_w, sample_source, stream_spec, send_wrapper):
+ self.log = log
+ self.chdr_w = chdr_w
+ self.sample_source = sample_source
+ self.stream_spec = stream_spec
+ self.send_wrapper = send_wrapper
+ self.xfer = XferCount()
+ self.stop = False
+ self.thread = Thread(target=self._tx_worker, daemon=True)
+ self.thread.start()
+
+ def _tx_worker(self):
+ self.log.info("Stream TX Worker Starting with {} packets/sec"
+ .format(1/self.stream_spec.seconds_per_packet()))
+ header = ChdrHeader()
+ start_time = time.time()
+ next_send = start_time
+ header.dst_epid = self.stream_spec.dst_epid
+ header.pkt_type = PacketType.DATA_NO_TS
+
+ num_samps_left = None
+ if not self.stream_spec.is_continuous:
+ num_samps_left = self.stream_spec.total_samples * 4 # SC16 is 4 bytes per sample
+
+ for seq_num in self.stream_spec.seq_num_iter():
+ if self.stop:
+ self.log.info("Stream Worker Stopped")
+ break
+ if num_samps_left == 0:
+ break
+ header.seq_num = seq_num
+ packet = ChdrPacket(self.chdr_w, header, bytes(0))
+ packet_samples = self.stream_spec.packet_samples
+ if num_samps_left is not None:
+ packet_samples = min(packet_samples, num_samps_left)
+ num_samps_left -= packet_samples
+ packet = self.sample_source.fill_packet(packet, packet_samples)
+ if packet is None:
+ break
+ send_data = bytes(packet.serialize()) # Serialize before waiting
+
+ delay = next_send - time.time()
+ if delay > 0:
+ time.sleep(delay)
+ next_send = next_send + self.stream_spec.seconds_per_packet()
+
+ self.send_wrapper.send_data(send_data, self.stream_spec.addr)
+ self.xfer.count_packet(len(send_data))
+
+ self.log.info("Stream Worker Done")
+ finish_time = time.time()
+ self.log.info("Actual Packet Rate was {} packets/sec"
+ .format(self.xfer.num_packets/(finish_time - start_time)))
+ self.sample_source.close()
+
+ def finish(self):
+ """Stops the ChdrOutputStream"""
+ self.stop = True
diff --git a/mpm/python/usrp_mpm/simulator/rfnoc_common.py b/mpm/python/usrp_mpm/simulator/rfnoc_common.py
new file mode 100644
index 000000000..d7f61a4c1
--- /dev/null
+++ b/mpm/python/usrp_mpm/simulator/rfnoc_common.py
@@ -0,0 +1,202 @@
+#
+# Copyright 2020 Ettus Research, a National Instruments Brand
+#
+# SPDX-License-Identifier: GPL-3.0-or-later
+#
+"""
+This file contains common classes that are used by both rfnoc_graph.py
+and stream_endpoint_node.py
+"""
+from enum import IntEnum
+from uhd.chdr import MgmtOpCode, MgmtOpNodeInfo, MgmtOp, PacketType
+
+def to_iter(index_func, length):
+ """Allows looping over an indexed object in a for-each loop"""
+ for i in range(length):
+ yield index_func(i)
+
+def swap_src_dst(packet, payload):
+ """Swap the src_epid and the dst_epid of a packet"""
+ header = packet.get_header()
+ our_epid = header.dst_epid
+ header.dst_epid = payload.src_epid
+ payload.src_epid = our_epid
+ packet.set_header(header)
+
+class NodeType(IntEnum):
+ """The type of a node in a NoC Core
+
+ RTS is a magic value used to determine when to return a packet to
+ the sender
+ """
+ INVALID = 0
+ XBAR = 1
+ STRM_EP = 2
+ XPORT = 3
+ RTS = 0xFF
+
+RETURN_TO_SENDER = (NodeType.RTS, 0)
+
+class Node:
+ """Represents a node in a NoC Core
+
+ These objects are usually constructed by the caller of RFNoC Graph.
+ Initially, they have references to other nodes as indexes of the
+ list of nodes. The graph calls graph_init and from_index so this
+ node can initialize their references to node_ids instead.
+ """
+ def __init__(self, node_inst):
+ """node_inst should start at 0 and increase for every Node of
+ the same type that is constructed
+ """
+ self.node_inst = node_inst
+ self.log = None
+ self.get_device_id = None
+
+ def graph_init(self, log, get_device_id, **kwargs):
+ """This method is called to initialize the Node Graph"""
+ self.log = log
+ self.get_device_id = get_device_id
+
+ def get_id(self):
+ """Return the NodeID (device_id, node_type, node_inst)"""
+ return (self.get_device_id(), self.get_type(), self.node_inst)
+
+ def get_local_id(self):
+ """Return the Local NodeID (node_type, node_inst)"""
+ return (self.get_type(), self.node_inst)
+
+ def get_type(self):
+ """Returns the NodeType of this node"""
+ raise NotImplementedError
+
+ def handle_packet(self, packet, **kwargs):
+ """Processes a packet
+ The return value should be a node_id or RETURN_TO_SENDER
+ """
+ packet_type = packet.get_header().pkt_type
+ next_node = None
+ if packet_type == PacketType.MGMT:
+ next_node = self._handle_mgmt_packet(packet, **kwargs)
+ elif packet_type == PacketType.CTRL:
+ next_node = self._handle_ctrl_packet(packet, **kwargs)
+ elif packet_type in (PacketType.DATA_WITH_TS, PacketType.DATA_NO_TS):
+ next_node = self._handle_data_packet(packet, **kwargs)
+ elif packet_type == PacketType.STRS:
+ next_node = self._handle_strs_packet(packet, **kwargs)
+ elif packet_type == PacketType.STRC:
+ next_node = self._handle_strc_packet(packet, **kwargs)
+ else:
+ raise RuntimeError("Invalid Enum Value for PacketType: {}".format(packet_type))
+ # If the specific method for that packet isn't implemented, try the general method
+ if next_node == NotImplemented:
+ next_node = self._handle_default_packet(packet, **kwargs)
+ return next_node
+
+ # pylint: disable=unused-argument,no-self-use
+ def _handle_mgmt_packet(self, packet, **kwargs):
+ return NotImplemented
+
+ # pylint: disable=unused-argument,no-self-use
+ def _handle_ctrl_packet(self, packet, **kwargs):
+ return NotImplemented
+
+ # pylint: disable=unused-argument,no-self-use
+ def _handle_data_packet(self, packet, **kwargs):
+ return NotImplemented
+
+ # pylint: disable=unused-argument,no-self-use
+ def _handle_strc_packet(self, packet, **kwargs):
+ return NotImplemented
+
+ # pylint: disable=unused-argument,no-self-use
+ def _handle_strs_packet(self, packet, **kwargs):
+ return NotImplemented
+
+ def _handle_default_packet(self, packet, **kwargs):
+ raise RuntimeError("{} has no operation defined for a {} packet"
+ .format(self.__class__, packet.get_header().pkt_type))
+
+ def from_index(self, nodes):
+ """Initialize this node's indexes to block_id references"""
+ pass
+
+ def info_response(self, extended_info):
+ """Generate a node info response MgmtOp"""
+ return MgmtOp(
+ op_payload=MgmtOpNodeInfo(self.get_device_id(), self.get_type(),
+ self.node_inst, extended_info),
+ op_code=MgmtOpCode.INFO_RESP
+ )
+
+class StreamSpec:
+ """This class carries the configuration parameters of a Tx stream.
+
+ total_samples, is_continuous, and packet_samples come from the
+ radio registers (noc_block_regs.py)
+
+ sample_rate comes from an rpc to the daughterboard (through the
+ set_sample_rate method in chdr_endpoint.py)
+
+ dst_epid comes from the source stream_ep
+
+ addr comes from the xport passed through when routing to dst_epid
+ """
+ LOW_MASK = 0xFFFFFFFF
+ HIGH_MASK = (0xFFFFFFFF) << 32
+ def __init__(self):
+ self.init_timestamp = 0
+ self.is_timed = False
+ self.total_samples = 0
+ self.is_continuous = True
+ self.packet_samples = None
+ self.sample_rate = None
+ self.dst_epid = None
+ self.addr = None
+
+ def set_timestamp_lo(self, low):
+ """Set the low 32 bits of the initial timestamp"""
+ self.init_timestamp = (self.init_timestamp & (StreamSpec.HIGH_MASK)) \
+ | (low & StreamSpec.LOW_MASK)
+
+ def set_timestamp_hi(self, high):
+ """Set the high 32 bits of the initial timestamp"""
+ self.init_timestamp = (self.init_timestamp & StreamSpec.LOW_MASK) \
+ | ((high & StreamSpec.LOW_MASK) << 32)
+
+ def set_num_words_lo(self, low):
+ """Set the low 32 bits of the total_samples field"""
+ self.total_samples = (self.total_samples & (StreamSpec.HIGH_MASK)) \
+ | (low & StreamSpec.LOW_MASK)
+
+ def set_num_words_hi(self, high):
+ """Set the high 32 bits of the total_samples field"""
+ self.total_samples = (self.total_samples & StreamSpec.LOW_MASK) \
+ | ((high & StreamSpec.LOW_MASK) << 32)
+
+ def seconds_per_packet(self):
+ """Calculates how many seconds should be between each packet
+ transmit
+ """
+ assert self.packet_samples != 0
+ assert self.sample_rate != 0
+ return self.packet_samples / self.sample_rate
+
+ def seq_num_iter(self):
+ """Returns a generator which returns an incrementing integer
+ for each packet that should be sent. This is useful to set the
+ seq_num of each transmitted packet.
+ """
+ i = 0
+ while True:
+ if not self.is_continuous:
+ if i >= self.total_samples:
+ return
+ yield i
+ i += 1
+
+ def __str__(self):
+ return "StreamSpec{{total_samples: {}, is_continuous: {}, packet_samples: {}," \
+ "sample_rate: {}, dst_epid: {}, addr: {}}}" \
+ .format(self.total_samples, self.is_continuous, self.packet_samples,
+ self.sample_rate, self.dst_epid, self.addr)
diff --git a/mpm/python/usrp_mpm/simulator/rfnoc_graph.py b/mpm/python/usrp_mpm/simulator/rfnoc_graph.py
index c12592603..ada6e70b0 100644
--- a/mpm/python/usrp_mpm/simulator/rfnoc_graph.py
+++ b/mpm/python/usrp_mpm/simulator/rfnoc_graph.py
@@ -10,177 +10,10 @@ This module handles and responds to Management and Ctrl Packets. It
also instantiates the registers and acts as an interface between
the chdr packets on the network and the registers.
"""
-from enum import IntEnum
-from uhd.chdr import PacketType, MgmtOp, MgmtOpCode, MgmtOpNodeInfo, \
- CtrlStatus, CtrlOpCode, MgmtOpCfg, MgmtOpSelDest
+from uhd.chdr import MgmtOpCode, MgmtOpCfg, MgmtOpSelDest
from .noc_block_regs import NocBlockRegs, NocBlock, StreamEndpointPort, NocBlockPort
-from .stream_ep_regs import StreamEpRegs, STRM_STATUS_FC_ENABLED
-
-class StreamSpec:
- """This class carries the configuration parameters of a Tx stream.
-
- total_samples, is_continuous, and packet_samples come from the
- radio registers (noc_block_regs.py)
-
- sample_rate comes from an rpc to the daughterboard (through the
- set_sample_rate method in chdr_endpoint.py)
-
- dst_epid comes from the source stream_ep
-
- addr comes from the xport passed through when routing to dst_epid
- """
- LOW_MASK = 0xFFFFFFFF
- HIGH_MASK = (0xFFFFFFFF) << 32
- def __init__(self):
- self.total_samples = 0
- self.is_continuous = True
- self.packet_samples = None
- self.sample_rate = None
- self.dst_epid = None
- self.addr = None
-
- def set_num_words_lo(self, low):
- """Set the low 32 bits of the total_samples field"""
- self.total_samples = (self.total_samples & (StreamSpec.HIGH_MASK)) \
- | (low & StreamSpec.LOW_MASK)
-
- def set_num_words_hi(self, high):
- """Set the high 32 bits of the total_samples field"""
- self.total_samples = (self.total_samples & StreamSpec.LOW_MASK) \
- | ((high & StreamSpec.LOW_MASK) << 32)
-
- def seconds_per_packet(self):
- """Calculates how many seconds should be between each packet
- transmit
- """
- assert self.packet_samples != 0
- assert self.sample_rate != 0
- return self.packet_samples / self.sample_rate
-
- def __str__(self):
- return "StreamSpec{{total_samples: {}, is_continuous: {}, packet_samples: {}," \
- " sample_rate: {}, dst_epid: {}, addr: {}}}" \
- .format(self.total_samples, self.is_continuous, self.packet_samples,
- self.sample_rate, self.dst_epid, self.addr)
-
-def to_iter(index_func, length):
- """Allows looping over an indexed object in a for-each loop"""
- for i in range(length):
- yield index_func(i)
-
-def _swap_src_dst(packet, payload):
- """Swap the src_epid and the dst_epid of a packet"""
- header = packet.get_header()
- our_epid = header.dst_epid
- header.dst_epid = payload.src_epid
- payload.src_epid = our_epid
- packet.set_header(header)
-
-class NodeType(IntEnum):
- """The type of a node in a NoC Core
-
- RTS is a magic value used to determine when to return a packet to
- the sender
- """
- INVALID = 0
- XBAR = 1
- STRM_EP = 2
- XPORT = 3
- RTS = 0xFF
-
-RETURN_TO_SENDER = (0, NodeType.RTS, 0)
-
-class Node:
- """Represents a node in a RFNoCGraph
-
- These objects are usually constructed by the caller of RFNoC Graph.
- Initially, they have references to other nodes as indexes of the
- list of nodes. The graph calls graph_init and from_index so this
- node can initialize their references to node_ids instead.
-
- Subclasses can override _handle_<packet_type>_packet methods, and
- _handle_default_packet is provided, which will receive packets whose
- specific methods are not overridden
- """
- def __init__(self, node_inst):
- """node_inst should start at 0 and increase for every Node of
- the same type that is constructed
- """
- self.device_id = None
- self.node_inst = node_inst
- self.log = None
-
- def graph_init(self, log, device_id, **kwargs):
- """This method is called to initialize the Node Graph"""
- self.device_id = device_id
- self.log = log
-
- def get_id(self):
- """Return the NodeID (device_id, node_type, node_inst)"""
- return (self.device_id, self.get_type(), self.node_inst)
-
- def get_type(self):
- """Returns the NodeType of this node"""
- raise NotImplementedError
-
- def handle_packet(self, packet, **kwargs):
- """Processes a packet
-
- The return value should be a node_id or RETURN_TO_SENDER
- """
- packet_type = packet.get_header().pkt_type
- next_node = None
- if packet_type == PacketType.MGMT:
- next_node = self._handle_mgmt_packet(packet, **kwargs)
- elif packet_type == PacketType.CTRL:
- next_node = self._handle_ctrl_packet(packet, **kwargs)
- elif packet_type in (PacketType.DATA_W_TS, PacketType.DATA_NO_TS):
- next_node = self._handle_data_packet(packet, **kwargs)
- elif packet_type == PacketType.STRS:
- next_node = self._handle_strs_packet(packet, **kwargs)
- elif packet_type == PacketType.STRC:
- next_node = self._handle_strc_packet(packet, **kwargs)
- else:
- raise RuntimeError("Invalid Enum Value for PacketType: {}".format(packet_type))
- if next_node == NotImplemented:
- next_node = self._handle_default_packet(packet, **kwargs)
- return next_node
-
- # pylint: disable=unused-argument,no-self-use
- def _handle_mgmt_packet(self, packet, **kwargs):
- return NotImplemented
-
- # pylint: disable=unused-argument,no-self-use
- def _handle_ctrl_packet(self, packet, **kwargs):
- return NotImplemented
-
- # pylint: disable=unused-argument,no-self-use
- def _handle_data_packet(self, packet, **kwargs):
- return NotImplemented
-
- # pylint: disable=unused-argument,no-self-use
- def _handle_strc_packet(self, packet, **kwargs):
- return NotImplemented
-
- # pylint: disable=unused-argument,no-self-use
- def _handle_strs_packet(self, packet, **kwargs):
- return NotImplemented
-
- def _handle_default_packet(self, packet, **kwargs):
- raise RuntimeError("{} has no operation defined for a {} packet"
- .format(self.__class__, packet.get_header().pkt_type))
-
- def from_index(self, nodes):
- """Initialize this node's indexes to block_id references"""
- pass
-
- def info_response(self, extended_info):
- """Generate a node info response MgmtOp"""
- return MgmtOp(
- op_payload=MgmtOpNodeInfo(self.device_id, self.get_type(),
- self.node_inst, extended_info),
- op_code=MgmtOpCode.INFO_RESP
- )
+from .rfnoc_common import Node, NodeType, StreamSpec, to_iter, swap_src_dst, RETURN_TO_SENDER
+from .stream_endpoint_node import StreamEndpointNode
class XportNode(Node):
"""Represents an Xport node
@@ -206,7 +39,7 @@ class XportNode(Node):
payload.get_hop(0).add_op(self.info_response(0))
elif op.op_code == MgmtOpCode.RETURN:
send_upstream = True
- _swap_src_dst(packet, payload)
+ swap_src_dst(packet, payload)
elif op.op_code == MgmtOpCode.NOP:
pass
elif op.op_code == MgmtOpCode.ADVERTISE:
@@ -222,8 +55,7 @@ class XportNode(Node):
packet.set_payload(payload)
if send_upstream:
return RETURN_TO_SENDER
- else:
- return self.downstream
+ return self.downstream
def _handle_default_packet(self, packet, **kwargs):
return self.downstream
@@ -262,7 +94,7 @@ class XbarNode(Node):
(self.nports_xport << 8) | self.nports))
elif op.op_code == MgmtOpCode.RETURN:
send_upstream = True
- _swap_src_dst(packet, payload)
+ swap_src_dst(packet, payload)
elif op.op_code == MgmtOpCode.NOP:
pass
elif op.op_code == MgmtOpCode.ADVERTISE:
@@ -287,6 +119,8 @@ class XbarNode(Node):
return RETURN_TO_SENDER
elif destination is not None:
return destination
+ else:
+ return self._handle_default_packet(packet, **kwargs)
def _handle_default_packet(self, packet, **kwargs):
dst_epid = packet.get_header().dst_epid
@@ -301,121 +135,11 @@ class XbarNode(Node):
for i in range(len(self.ports)):
nodes_index = self.ports[i]
node = nodes[nodes_index]
- self.ports[i] = node.get_id()
+ self.ports[i] = node.get_local_id()
if node.__class__ is XportNode:
- node.downstream = self.get_id()
+ node.downstream = self.get_local_id()
elif node.__class__ is StreamEndpointNode:
- node.upstream = self.get_id()
-
-class StreamEndpointNode(Node):
- """Represents a Stream endpoint node
-
- This class contains a StreamEpRegs object. To clarify, management
- packets access these registers, while control packets access the
- registers of the noc_blocks which are held in the RFNoCGraph and
- passed into handle_packet as the regs parameter
- """
- def __init__(self, node_inst):
- super().__init__(node_inst)
- self.epid = node_inst
- self.dst_epid = None
- self.upstream = None
- self.sep_config = None
- # These 4 aren't configurable right now
- self.has_data = True
- self.has_ctrl = True
- self.input_ports = 2
- self.output_ports = 2
- self.ep_regs = StreamEpRegs(self.get_epid, self.set_epid, self.set_dst_epid,
- self.status_callback_out, self.status_callback_in, 4, 4 * 8000)
-
- def status_callback_out(self, status):
- """Called by the ep_regs when on a write to the
- REG_OSTRM_CTRL_STATUS register
- """
- if status.cfg_start:
- # This only creates a new ChdrOutputStream if the cfg_start flag is set
- self.log.info("Starting Stream EPID:{} -> EPID:{}".format(self.epid, self.dst_epid))
- self.sep_config(self, True)
- return STRM_STATUS_FC_ENABLED
-
- def status_callback_in(self, status):
- """Called by the ep_regs on a write to the
- REG_OSTRM_CTRL_STATUS register
- """
- # This always triggers the graph to create a new ChdrInputStream
- self.sep_config(self, False)
- return STRM_STATUS_FC_ENABLED
-
- def graph_init(self, log, device_id, sep_config, **kwargs):
- super().graph_init(log, device_id)
- self.ep_regs.log = log
- self.sep_config = sep_config
-
- def get_type(self):
- return NodeType.STRM_EP
-
- def get_epid(self):
- """ Get the endpoint id of this stream endpoint """
- return self.epid
-
- def set_epid(self, epid):
- """ Set the endpoint id of this stream endpoint """
- self.epid = epid
-
- def set_dst_epid(self, dst_epid):
- """ Set the destination endpoint id of this stream endpoint """
- self.dst_epid = dst_epid
-
- def _handle_mgmt_packet(self, packet, **kwargs):
- send_upstream = False
- payload = packet.get_payload_mgmt()
- our_hop = payload.pop_hop()
- for op in to_iter(our_hop.get_op, our_hop.get_num_ops()):
- if op.op_code == MgmtOpCode.INFO_REQ:
- ext_info = 0
- ext_info |= (1 if self.has_ctrl else 0) << 0
- ext_info |= (1 if self.has_data else 0) << 1
- ext_info |= (self.input_ports & 0x3F) << 2
- ext_info |= (self.output_ports & 0x3F) << 8
- payload.get_hop(0).add_op(self.info_response(ext_info))
- elif op.op_code == MgmtOpCode.RETURN:
- send_upstream = True
- _swap_src_dst(packet, payload)
- elif op.op_code == MgmtOpCode.NOP:
- pass
- elif op.op_code == MgmtOpCode.CFG_RD_REQ:
- request = MgmtOpCfg.parse(op.get_op_payload())
- value = self.ep_regs.read(request.addr)
- payload.get_hop(0).add_op(MgmtOp(MgmtOpCode.CFG_RD_RESP,
- MgmtOpCfg(request.addr, value)))
- elif op.op_code == MgmtOpCode.CFG_WR_REQ:
- request = MgmtOpCfg.parse(op.get_op_payload())
- self.ep_regs.write(request.addr, request.data)
- else:
- raise NotImplementedError("op_code {} is not implemented for "
- "StreamEndpointNode".format(op.op_code))
- self.log.trace("Stream Endpoint {} processed hop:\n{}"
- .format(self.node_inst, our_hop))
- packet.set_payload(payload)
- if send_upstream:
- return RETURN_TO_SENDER
-
- def _handle_ctrl_packet(self, packet, regs, **kwargs):
- payload = packet.get_payload_ctrl()
- _swap_src_dst(packet, payload)
- if payload.status != CtrlStatus.OKAY:
- raise RuntimeError("Control Status not OK: {}".format(payload.status))
- if payload.op_code == CtrlOpCode.READ:
- payload.is_ack = True
- payload.set_data([regs.read(payload.address)])
- elif payload.op_code == CtrlOpCode.WRITE:
- payload.is_ack = True
- regs.write(payload.address, payload.get_data()[0])
- else:
- raise NotImplementedError("Unknown Control OpCode: {}".format(payload.op_code))
- packet.set_payload(payload)
- return RETURN_TO_SENDER
+ node.upstream = self.get_local_id()
class RFNoCGraph:
"""This class holds all of the nodes of the NoC core and the Noc
@@ -424,25 +148,21 @@ class RFNoCGraph:
It serves as an interface between the ChdrEndpoint and the
individual blocks/nodes.
"""
- def __init__(self, graph_list, log, device_id, create_tx_func,
- stop_tx_func, send_strc_func, create_rx_func):
+ def __init__(self, graph_list, log, device_id, send_wrapper, chdr_w):
self.log = log.getChild("Graph")
- self.send_strc = send_strc_func
self.device_id = device_id
self.stream_spec = StreamSpec()
- self.create_tx = create_tx_func
- self.stop_tx = stop_tx_func
- self.create_rx = create_rx_func
- num_stream_ep = 0
+ self.stream_ep = []
for node in graph_list:
if node.__class__ is StreamEndpointNode:
- num_stream_ep += 1
- node.graph_init(self.log, device_id, sep_config=self.prepare_stream_ep)
- # These must be done sequentially so that device_id is initialized on all nodes
+ self.stream_ep.append(node)
+ node.graph_init(self.log, self.get_device_id, send_wrapper=send_wrapper,
+ chdr_w=chdr_w, dst_to_addr=self.dst_to_addr)
+ # These must be done sequentially so that get_device_id is initialized on all nodes
# before from_index is called on any node
for node in graph_list:
node.from_index(graph_list)
- self.graph_map = {node.get_id(): node
+ self.graph_map = {node.get_local_id(): node
for node in graph_list}
# For now, just use one radio block and hardcode it to the first stream endpoint
radio = NocBlock(1 << 16, 2, 2, 512, 1, 0x12AD1000, 16)
@@ -452,8 +172,8 @@ class RFNoCGraph:
(NocBlockPort(0, 0), StreamEndpointPort(0, 0)),
(NocBlockPort(0, 1), StreamEndpointPort(0, 1))
]
- self.regs = NocBlockRegs(self.log, 1 << 16, True, 1, [radio], num_stream_ep, 1, 0xE320,
- adj_list, 8, 1, self.get_stream_spec, self.radio_tx_cmd,
+ self.regs = NocBlockRegs(self.log, 1 << 16, True, 1, [radio], len(self.stream_ep), 1,
+ 0xE320, adj_list, 8, 1, self.get_stream_spec, self.radio_tx_cmd,
self.radio_tx_stop)
def radio_tx_cmd(self, sep_block_id):
@@ -468,14 +188,12 @@ class RFNoCGraph:
# The NoC Block index for stream endpoints is the inst + 1
# See rfnoc_graph.cpp:rfnoc_graph_impl#_init_sep_map()
sep_inst = sep_blk - 1
- sep_id = (self.get_device_id(), NodeType.STRM_EP, sep_inst)
+ sep_id = (NodeType.STRM_EP, sep_inst)
stream_ep = self.graph_map[sep_id]
- self.stream_spec.dst_epid = stream_ep.dst_epid
self.stream_spec.addr = self.dst_to_addr(stream_ep)
self.log.info("Streaming with StreamSpec:")
self.log.info(str(self.stream_spec))
- self.create_tx(stream_ep.epid, self.stream_spec)
- self.stream_spec = StreamSpec()
+ stream_ep.begin_output(self.stream_spec)
def radio_tx_stop(self, sep_block_id):
"""Triggers the destuction of a ChdrOutputStream in the ChdrEndpoint
@@ -487,24 +205,16 @@ class RFNoCGraph:
sep_inst = sep_blk - 1
# The NoC Block index for stream endpoints is the inst + 1
# See rfnoc_graph.cpp:rfnoc_graph_impl#_init_sep_map()
- sep_id = (self.get_device_id(), NodeType.STRM_EP, sep_inst)
- src_epid = self.graph_map[sep_id].epid
- self.stop_tx(src_epid)
+ sep_id = (NodeType.STRM_EP, sep_inst)
+ stream_ep = self.graph_map[sep_id]
+ stream_ep.end_output()
def get_device_id(self):
return self.device_id
- def prepare_stream_ep(self, stream_ep, is_tx):
- """This is called by a stream_ep when it receives a status update
-
- Depending on whether it was a tx or rx status, it will either
- trigger an strc packet or an rx stream
- """
- if is_tx:
- addr = self.dst_to_addr(stream_ep)
- self.send_strc(stream_ep, addr)
- else:
- self.create_rx(stream_ep.epid)
+ def set_device_id(self, device_id):
+ """Set this graph's rfnoc device id"""
+ self.device_id = device_id
def change_spp(self, spp):
"""Change the Stream Samples per Packet"""
@@ -541,7 +251,7 @@ class RFNoCGraph:
current_node = self.graph_map[upstream_id]
return current_node.addr_map[dst_epid]
- def handle_packet(self, packet, xport_input, addr):
+ def handle_packet(self, packet, xport_input, addr, sender, num_bytes):
"""Given a chdr_packet, the id of an xport node to serve as an
entry point, and a source address, send the packet through the
node graph.
@@ -549,14 +259,18 @@ class RFNoCGraph:
node_id = xport_input
response_packet = None
while node_id is not None:
- if node_id[1] == NodeType.RTS:
+ assert len(node_id) == 2, "Node returned non-local node_id of len {}: {}" \
+ .format(len(node_id), node_id)
+
+ if node_id[0] == NodeType.RTS:
response_packet = packet
break
node = self.graph_map[node_id]
# If the node returns a value, it is the node id of the
# node the packet should be passed to next
# or RETURN_TO_SENDER
- node_id = node.handle_packet(packet, regs=self.regs, addr=addr)
+ node_id = node.handle_packet(packet, regs=self.regs, addr=addr,
+ sender=sender, num_bytes=num_bytes)
return response_packet
def get_stream_spec(self):
diff --git a/mpm/python/usrp_mpm/simulator/sample_source.py b/mpm/python/usrp_mpm/simulator/sample_source.py
new file mode 100644
index 000000000..184649994
--- /dev/null
+++ b/mpm/python/usrp_mpm/simulator/sample_source.py
@@ -0,0 +1,142 @@
+#
+# Copyright 2020 Ettus Research, a National Instruments Brand
+#
+# SPDX-License-Identifier: GPL-3.0-or-later
+#
+"""
+This module contains the interface for providing data to a simulator
+stream and receiving data from a simulator stream.
+"""
+
+#TODO: This is currently unused, as the cli is largely incomplete
+sources = {}
+sinks = {}
+
+def cli_source(cls):
+ """This decorator adds a class to the global list of SampleSources"""
+ sources[cls.__name__] = cls
+ return cls
+
+def cli_sink(cls):
+ """This decorator adds a class to the global list of SampleSinks"""
+ sinks[cls.__name__] = cls
+ return cls
+
+class SampleSource:
+ """This class defines the interface of a SampleSource. It
+ provides samples to the simulator which are then sent over the
+ network to a UHD client.
+ """
+ def fill_packet(self, packet, payload_size):
+ """This method should fill the packet with enough samples to
+ make its payload payload_size bytes long.
+ Returning None signals that this source is exhausted.
+ """
+ raise NotImplementedError()
+
+ def close(self):
+ """Use this to clean up any resources held by the object"""
+ raise NotImplementedError()
+
+class SampleSink:
+ """This class provides the interface of a SampleSink. It serves
+ as a destination for smaples received over the network from a
+ UHD client.
+ """
+ def accept_packet(self, packet):
+ """Called whenever a new packet is received"""
+ raise NotImplementedError()
+
+ def close(self):
+ """Use this to clean up any resources held by the object"""
+ raise NotImplementedError()
+
+@cli_source
+@cli_sink
+class NullSamples(SampleSource, SampleSink):
+ """This combination source/sink simply provides an infinite
+ number of samples with a value of zero. You may optionally provide
+ a log object which will enable debug output.
+ """
+ def __init__(self, log=None):
+ self.log = log
+
+ def fill_packet(self, packet, payload_size):
+ if self.log is not None:
+ self.log.debug("Null Source called, providing {} bytes of zeroes".format(payload_size))
+ payload = bytes(payload_size)
+ packet.set_payload_bytes(payload)
+ return packet
+
+ def accept_packet(self, packet):
+ if self.log is not None:
+ self.log.debug("Null Source called, accepting {} bytes of payload"
+ .format(len(packet.get_payload_bytes())))
+
+ def close(self):
+ pass
+
+class IOSource(SampleSource):
+ """This adaptor class creates a sample source using a read object
+ that provides a read(# of bytes) function.
+ (e.g. the result of an open("<filename>", "rb") call)
+ """
+ def __init__(self, read):
+ self.read_obj = read
+
+ def fill_packet(self, packet, payload_size):
+ payload = self.read_obj.read(payload_size)
+ if len(payload) == 0:
+ return None
+ packet.set_payload_bytes(payload)
+ return packet
+
+ def close(self):
+ self.read_obj.close()
+
+class IOSink(SampleSink):
+ """This adaptor class creates a sample sink using a write object
+ that provides a write(bytes) function.
+ (e.g. the result of an open("<filename>", "wb") call)
+ """
+ def __init__(self, write):
+ self.write_obj = write
+
+ def accept_packet(self, packet):
+ payload = packet.get_payload_bytes()
+ written = self.write_obj.write(bytes(payload))
+ assert written == len(payload)
+
+ def close(self):
+ self.write_obj.close()
+
+@cli_source
+class FileSource(IOSource):
+ """This class creates a SampleSource using a file path"""
+ def __init__(self, read_file, repeat=False):
+ self.open = lambda: open(read_file, "rb")
+ if isinstance(repeat, bool):
+ self.repeat = repeat
+ else:
+ self.repeat = repeat == "True"
+ read = self.open()
+ super().__init__(read)
+
+ def fill_packet(self, packet, payload_size):
+ payload = self.read_obj.read(payload_size)
+ if len(payload) == 0:
+ if self.repeat:
+ self.read_obj.close()
+ self.read_obj = self.open()
+ payload = self.read_obj.read(payload_size)
+ else:
+ return None
+ packet.set_payload_bytes(payload)
+ return packet
+
+@cli_sink
+class FileSink(IOSink):
+ """This class creates a SampleSink using a file path"""
+ def __init__(self, write_file):
+ write = open(write_file, "wb")
+ super().__init__(write)
diff --git a/mpm/python/usrp_mpm/simulator/stream_endpoint_node.py b/mpm/python/usrp_mpm/simulator/stream_endpoint_node.py
new file mode 100644
index 000000000..b4477ee47
--- /dev/null
+++ b/mpm/python/usrp_mpm/simulator/stream_endpoint_node.py
@@ -0,0 +1,206 @@
+#
+# Copyright 2020 Ettus Research, a National Instruments Brand
+#
+# SPDX-License-Identifier: GPL-3.0-or-later
+#
+"""
+This is a stream endpoint node, which is used in the RFNoCGraph class.
+It also houses the logic to create output and input streams.
+"""
+from uhd.chdr import MgmtOpCode, MgmtOpCfg, MgmtOp, PacketType, CtrlStatus, CtrlOpCode, \
+ ChdrHeader, StrcOpCode, StrcPayload, ChdrPacket
+from .rfnoc_common import Node, NodeType, to_iter, swap_src_dst, RETURN_TO_SENDER
+from .stream_ep_regs import StreamEpRegs, STRM_STATUS_FC_ENABLED
+from .chdr_stream import ChdrOutputStream, ChdrInputStream
+
+class StreamEndpointNode(Node):
+ """Represents a Stream endpoint node
+
+ This class contains a StreamEpRegs object. To clarify, management
+ packets access these registers, while control packets access the
+ registers of the noc_blocks which are held in the RFNoCGraph and
+ passed into handle_packet as the regs parameter
+ """
+ def __init__(self, node_inst, source_gen, sink_gen):
+ super().__init__(node_inst)
+ self.epid = node_inst
+ self.dst_epid = None
+ self.upstream = None
+ # ---- These 4 aren't configurable right now
+ self.has_data = True
+ self.has_ctrl = True
+ self.input_ports = 1
+ self.output_ports = 1
+ # ----
+ self.input_stream = None
+ self.output_stream = None
+ self.chdr_w = None
+ self.send_wrapper = None
+ self.dst_to_addr = None
+ self.source_gen = source_gen
+ self.sink_gen = sink_gen
+ self.ep_regs = StreamEpRegs(self.get_epid, self.set_epid, self.set_dst_epid,
+ self.ctrl_status_callback_out, self.ctrl_status_callback_in,
+ 4, 4 * 8000)
+
+ def ctrl_status_callback_out(self, status):
+ """Called by the ep_regs when on a write to the
+ REG_OSTRM_CTRL_STATUS register
+ """
+ if status.cfg_start:
+ # This only creates a new ChdrOutputStream if the cfg_start flag is set
+ self.log.info("Starting Stream EPID:{} -> EPID:{}".format(self.epid, self.dst_epid))
+ addr = self.dst_to_addr(self)
+ self.send_strc(addr)
+ return STRM_STATUS_FC_ENABLED
+
+ def ctrl_status_callback_in(self, status):
+ """Called by the ep_regs on a write to the
+ REG_OSTRM_CTRL_STATUS register
+ """
+ # This always triggers the graph to create a new ChdrInputStream
+ self.begin_input()
+ return STRM_STATUS_FC_ENABLED
+
+ def graph_init(self, log, set_device_id, send_wrapper, chdr_w, dst_to_addr, **kwargs):
+ super().graph_init(log, set_device_id)
+ self.ep_regs.log = log
+ self.chdr_w = chdr_w
+ self.send_wrapper = send_wrapper
+ self.dst_to_addr = dst_to_addr
+
+ def get_type(self):
+ return NodeType.STRM_EP
+
+ def get_epid(self):
+ """Get this endpoint's endpoint id"""
+ return self.epid
+
+ def set_epid(self, epid):
+ """Set this endpoint's endpoint id"""
+ self.epid = epid
+
+ def set_dst_epid(self, dst_epid):
+ """Set this endpoint's destination endpoint id"""
+ self.dst_epid = dst_epid
+
+ def _handle_mgmt_packet(self, packet, **kwargs):
+ send_upstream = False
+ payload = packet.get_payload_mgmt()
+ our_hop = payload.pop_hop()
+ for op in to_iter(our_hop.get_op, our_hop.get_num_ops()):
+ if op.op_code == MgmtOpCode.INFO_REQ:
+ ext_info = 0
+ ext_info |= (1 if self.has_ctrl else 0) << 0
+ ext_info |= (1 if self.has_data else 0) << 1
+ ext_info |= (self.input_ports & 0x3F) << 2
+ ext_info |= (self.output_ports & 0x3F) << 8
+ payload.get_hop(0).add_op(self.info_response(ext_info))
+ elif op.op_code == MgmtOpCode.RETURN:
+ send_upstream = True
+ swap_src_dst(packet, payload)
+ elif op.op_code == MgmtOpCode.NOP:
+ pass
+ elif op.op_code == MgmtOpCode.CFG_RD_REQ:
+ request = MgmtOpCfg.parse(op.get_op_payload())
+ value = self.ep_regs.read(request.addr)
+ payload.get_hop(0).add_op(MgmtOp(MgmtOpCode.CFG_RD_RESP,
+ MgmtOpCfg(request.addr, value)))
+ elif op.op_code == MgmtOpCode.CFG_WR_REQ:
+ request = MgmtOpCfg.parse(op.get_op_payload())
+ self.ep_regs.write(request.addr, request.data)
+ else:
+ raise NotImplementedError("op_code {} is not implemented for StreamEndpointNode"
+ .format(op.op_code))
+ self.log.trace("Stream Endpoint {} processed hop:\n{}"
+ .format(self.node_inst, our_hop))
+ packet.set_payload(payload)
+ if send_upstream:
+ return RETURN_TO_SENDER
+ self.log.trace("Stream Endpoint {} received packet:\n{}"
+ .format(self.node_inst, packet))
+
+ def _handle_ctrl_packet(self, packet, regs, **kwargs):
+ payload = packet.get_payload_ctrl()
+ swap_src_dst(packet, payload)
+ if payload.status != CtrlStatus.OKAY:
+ raise RuntimeError("Control Status not OK: {}".format(payload.status))
+ if payload.op_code == CtrlOpCode.READ:
+ payload.is_ack = True
+ payload.set_data([regs.read(payload.address)])
+ elif payload.op_code == CtrlOpCode.WRITE:
+ payload.is_ack = True
+ regs.write(payload.address, payload.get_data()[0])
+ else:
+ raise NotImplementedError("Unknown Control OpCode: {}".format(payload.op_code))
+ packet.set_payload(payload)
+ return RETURN_TO_SENDER
+
+ def _handle_data_packet(self, packet, num_bytes, sender, **kwargs):
+ assert self.input_stream is not None
+ self.input_stream.queue_packet(packet, num_bytes, sender)
+
+ def _handle_strs_packet(self, packet, **kwargs):
+ pass # TODO: Implement flow control for output streams
+
+ def _handle_strc_packet(self, packet, **kwargs):
+ self._handle_data_packet(packet, **kwargs)
+
+ def send_strc(self, addr):
+ """Send a Stream Command packet from the specified stream_ep
+ to the specified address.
+
+ This is not handled in ChdrOutputStream because the STRC must be
+ dispatched before UHD configures the samples per packet,
+ which is required to complete a StreamSpec
+ """
+ header = ChdrHeader()
+ header.dst_epid = self.dst_epid
+ header.pkt_type = PacketType.STRC
+ payload = StrcPayload()
+ payload.src_epid = self.epid
+ payload.op_code = StrcOpCode.INIT
+ packet = ChdrPacket(self.chdr_w, header, payload)
+ self.send_wrapper.send_packet(packet, addr)
+
+ def begin_output(self, stream_spec):
+ """Spin up a new ChdrOutputStream thread which transmits from src_epid
+ according to stream_spec.
+
+ This is triggered from RFNoC Graph when the radio receives a
+ Stream Command
+ """
+ # As of now, only one stream endpoint port per stream endpoint
+ # is supported.
+ assert self.output_stream is None, \
+ "Output Stream already running on epid: {}".format(self.epid)
+ stream_spec.dst_epid = self.dst_epid
+ self.output_stream = ChdrOutputStream(self.log, self.chdr_w, self.source_gen(),
+ stream_spec, self.send_wrapper)
+
+ def end_output(self):
+ """Stops src_epid's current transmission. This opens up the sep
+ to new transmissions in the future.
+
+ This is triggered either by the RFNoC Graph when the radio
+ receives a Stop Stream command or when the transmission has no
+ more samples to send.
+ """
+ self.output_stream.finish()
+ self.output_stream = None
+
+ def begin_input(self):
+ """Spin up a new ChdrInputStream thread which receives all data and strc
+
+ This is triggered by RFNoC Graph when there is a write to the
+ REG_ISTRM_CTRL_STATUS register
+ """
+ # As of now, only one stream endpoint port per stream endpoint
+ # is supported.
+
+ # Rx streams aren't explicitly ended by UHD. If we try to start
+ # a new one on the same epid, just quietly close the old one.
+ if self.input_stream is not None:
+ self.input_stream.finish()
+ self.input_stream = ChdrInputStream(self.log, self.chdr_w,
+ self.sink_gen(), self.send_wrapper, self.epid)