aboutsummaryrefslogtreecommitdiffstats
path: root/mpm/python/usrp_mpm/simulator/stream_endpoint_node.py
diff options
context:
space:
mode:
Diffstat (limited to 'mpm/python/usrp_mpm/simulator/stream_endpoint_node.py')
-rw-r--r--mpm/python/usrp_mpm/simulator/stream_endpoint_node.py21
1 files changed, 19 insertions, 2 deletions
diff --git a/mpm/python/usrp_mpm/simulator/stream_endpoint_node.py b/mpm/python/usrp_mpm/simulator/stream_endpoint_node.py
index b4477ee47..06aaaa046 100644
--- a/mpm/python/usrp_mpm/simulator/stream_endpoint_node.py
+++ b/mpm/python/usrp_mpm/simulator/stream_endpoint_node.py
@@ -8,7 +8,7 @@ This is a stream endpoint node, which is used in the RFNoCGraph class.
It also houses the logic to create output and input streams.
"""
from uhd.chdr import MgmtOpCode, MgmtOpCfg, MgmtOp, PacketType, CtrlStatus, CtrlOpCode, \
- ChdrHeader, StrcOpCode, StrcPayload, ChdrPacket
+ ChdrHeader, StrcOpCode, StrcPayload, ChdrPacket, StrsStatus
from .rfnoc_common import Node, NodeType, to_iter, swap_src_dst, RETURN_TO_SENDER
from .stream_ep_regs import StreamEpRegs, STRM_STATUS_FC_ENABLED
from .chdr_stream import ChdrOutputStream, ChdrInputStream
@@ -39,6 +39,8 @@ class StreamEndpointNode(Node):
self.dst_to_addr = None
self.source_gen = source_gen
self.sink_gen = sink_gen
+ self.downstream_capacity = None
+ self.strs_handlers = {}
self.ep_regs = StreamEpRegs(self.get_epid, self.set_epid, self.set_dst_epid,
self.ctrl_status_callback_out, self.ctrl_status_callback_in,
4, 4 * 8000)
@@ -52,6 +54,12 @@ class StreamEndpointNode(Node):
self.log.info("Starting Stream EPID:{} -> EPID:{}".format(self.epid, self.dst_epid))
addr = self.dst_to_addr(self)
self.send_strc(addr)
+ def handle_initial_strs(strs_packet):
+ payload = strs_packet.get_payload_strs()
+ assert payload.status == StrsStatus.OKAY, \
+ "Received STRS Packet Err: {}".format(payload.status)
+ self.downstream_capacity = (payload.capacity_pkts, payload.capacity_bytes)
+ self.strs_handlers[self.epid] = handle_initial_strs
return STRM_STATUS_FC_ENABLED
def ctrl_status_callback_in(self, status):
@@ -141,7 +149,11 @@ class StreamEndpointNode(Node):
self.input_stream.queue_packet(packet, num_bytes, sender)
def _handle_strs_packet(self, packet, **kwargs):
- pass # TODO: Implement flow control for output streams
+ header = packet.get_header()
+ if header.dst_epid in self.strs_handlers:
+ self.strs_handlers.pop(header.dst_epid)(packet)
+ else:
+ self.output_stream.queue_packet(packet)
def _handle_strc_packet(self, packet, **kwargs):
self._handle_data_packet(packet, **kwargs)
@@ -160,6 +172,8 @@ class StreamEndpointNode(Node):
payload = StrcPayload()
payload.src_epid = self.epid
payload.op_code = StrcOpCode.INIT
+ payload.num_pkts = 10
+ payload.num_bytes = 10 * 1024 # 10 KB
packet = ChdrPacket(self.chdr_w, header, payload)
self.send_wrapper.send_packet(packet, addr)
@@ -175,6 +189,9 @@ class StreamEndpointNode(Node):
assert self.output_stream is None, \
"Output Stream already running on epid: {}".format(self.epid)
stream_spec.dst_epid = self.dst_epid
+ stream_spec.capacity_packets = self.downstream_capacity[0]
+ stream_spec.capacity_bytes = self.downstream_capacity[1]
+ self.downstream_capacity = None
self.output_stream = ChdrOutputStream(self.log, self.chdr_w, self.source_gen(),
stream_spec, self.send_wrapper)