aboutsummaryrefslogtreecommitdiffstats
path: root/mpm/python/usrp_mpm
diff options
context:
space:
mode:
Diffstat (limited to 'mpm/python/usrp_mpm')
-rw-r--r--mpm/python/usrp_mpm/simulator/chdr_stream.py83
-rw-r--r--mpm/python/usrp_mpm/simulator/rfnoc_common.py15
-rw-r--r--mpm/python/usrp_mpm/simulator/stream_endpoint_node.py21
3 files changed, 85 insertions, 34 deletions
diff --git a/mpm/python/usrp_mpm/simulator/chdr_stream.py b/mpm/python/usrp_mpm/simulator/chdr_stream.py
index df7f613a4..b8a59bb36 100644
--- a/mpm/python/usrp_mpm/simulator/chdr_stream.py
+++ b/mpm/python/usrp_mpm/simulator/chdr_stream.py
@@ -12,7 +12,7 @@ import time
from threading import Thread
import queue
import socket
-from uhd.chdr import PacketType, StrcOpCode, StrsPayload, StrsStatus, ChdrHeader, ChdrPacket
+from uhd.chdr import PacketType, StrcOpCode, StrcPayload, StrsPayload, StrsStatus, ChdrHeader, ChdrPacket
class XferCount:
"""This class keeps track of flow control transfer status which are
@@ -30,6 +30,13 @@ class XferCount:
xfer.num_bytes = payload.num_bytes
return xfer
+ @classmethod
+ def from_strs(cls, payload):
+ xfer = cls()
+ xfer.num_packets = payload.xfer_count_pkts
+ xfer.num_bytes = payload.xfer_count_bytes
+ return xfer
+
def has_exceeded(self, limit):
"""returns true if this XferCount >= the limit
in either packets or bytes
@@ -134,15 +141,14 @@ class ChdrInputStream:
self.sample_sink.accept_packet(packet)
elif pkt_type == PacketType.STRC:
req_payload = packet.get_payload_strc()
- # Ping doesn't set a new target, just checks status
- if req_payload.op_code in (StrcOpCode.RESYNC, StrcOpCode.INIT):
- if req_payload.op_code == StrcOpCode.INIT:
- self.xfer.clear()
- self.fc_freq = XferCount.from_strc(req_payload)
- self.command_addr = addr
- self.command_epid = req_payload.src_epid
- else:
- self.xfer = XferCount.from_strc(req_payload)
+ # Ping doesn't change anything, just requests a stream status packet
+ if req_payload.op_code == StrcOpCode.INIT:
+ self.xfer.clear()
+ self.fc_freq = XferCount.from_strc(req_payload)
+ self.command_addr = addr
+ self.command_epid = req_payload.src_epid
+ elif req_payload.op_code == StrcOpCode.RESYNC:
+ self.xfer = XferCount.from_strc(req_payload)
resp_packet = self._generate_strs_packet(req_payload.src_epid, self.our_epid)
self.send_wrapper.send_packet(resp_packet, addr)
else:
@@ -200,34 +206,44 @@ class ChdrOutputStream:
self.stream_spec = stream_spec
self.send_wrapper = send_wrapper
self.xfer = XferCount()
+ self.recv = XferCount()
self.stop = False
+ self.strs_queue = queue.Queue(100)
+ self.strc_seq_num = 0
+ self.data_seq_num = 0
+
self.thread = Thread(target=self._tx_worker, daemon=True)
self.thread.start()
def _tx_worker(self):
self.log.info("Stream TX Worker Starting with {} packets/sec"
.format(1/self.stream_spec.seconds_per_packet()))
+ self.log.info("Downstream Buffer Capacity: {} packets or {} bytes"
+ .format(self.stream_spec.capacity_packets, self.stream_spec.capacity_bytes))
header = ChdrHeader()
start_time = time.time()
next_send = start_time
header.dst_epid = self.stream_spec.dst_epid
header.pkt_type = PacketType.DATA_NO_TS
+ is_continuous = self.stream_spec.is_continuous
num_samps_left = None
- if not self.stream_spec.is_continuous:
+ if not is_continuous:
+ # TODO: Put sample format/width in the stream spec
num_samps_left = self.stream_spec.total_samples * 4 # SC16 is 4 bytes per sample
- for seq_num in self.stream_spec.seq_num_iter():
+ timestamp = self.stream_spec.init_timestamp \
+ if self.stream_spec.is_timed else None
+
+ while is_continuous or num_samps_left > 0:
if self.stop:
self.log.info("Stream Worker Stopped")
break
- if num_samps_left == 0:
- break
- header.seq_num = seq_num
- timestamp = self.stream_spec.init_timestamp \
- if seq_num == 0 and self.stream_spec.is_timed \
- else None
- packet = ChdrPacket(self.chdr_w, header, bytes(0))
+ header.seq_num = self.data_seq_num
+ # When seq_num gets to 65535 (Max Unsigned 16 bit integer)
+ # It wraps back around to 0
+ self.data_seq_num = int(self.data_seq_num + 1) & 0xFFFF
+ packet = ChdrPacket(self.chdr_w, header, bytes(0), timestamp)
packet_samples = self.stream_spec.packet_samples
if num_samps_left is not None:
packet_samples = min(packet_samples, num_samps_left)
@@ -242,6 +258,14 @@ class ChdrOutputStream:
time.sleep(delay)
next_send = next_send + self.stream_spec.seconds_per_packet()
+ timestamp = None
+
+ # Check Flow Control to assert there is space downstream
+ while not self._can_fit_packet(len(send_data)):
+ strs_update = self.strs_queue.get()
+ strs_payload = strs_update.get_payload_strs()
+ self._update_recv(strs_payload)
+
self.send_wrapper.send_data(send_data, self.stream_spec.addr)
self.xfer.count_packet(len(send_data))
@@ -254,3 +278,24 @@ class ChdrOutputStream:
def finish(self):
"""Stops the ChdrOutputStream"""
self.stop = True
+
+ def queue_packet(self, packet):
+ """ Place an incoming STRS packet in the Queue """
+ self.strs_queue.put_nowait(packet)
+
+ def _can_fit_packet(self, length):
+ """ Can the downstream buffer fit a packet of length right now """
+ packets_in_transit = self.xfer.num_packets - self.recv.num_packets
+ space_packets = self.stream_spec.capacity_packets - packets_in_transit
+ bytes_in_transit = self.xfer.num_bytes - self.recv.num_bytes
+ space_bytes = self.stream_spec.capacity_bytes - bytes_in_transit
+ return space_packets > 0 and space_bytes >= length
+
+ def _update_recv(self, strs_payload):
+ """ Update the Xfer counts for downstream receive using
+ an incoming strs payload
+ """
+ assert strs_payload.status == StrsStatus.OKAY, \
+ "Flow Control Error: STRS Status is {}".format(strs_payload.status)
+ self.recv.num_packets = strs_payload.xfer_count_pkts
+ self.recv.num_bytes = strs_payload.xfer_count_bytes
diff --git a/mpm/python/usrp_mpm/simulator/rfnoc_common.py b/mpm/python/usrp_mpm/simulator/rfnoc_common.py
index d7f61a4c1..b1dab559f 100644
--- a/mpm/python/usrp_mpm/simulator/rfnoc_common.py
+++ b/mpm/python/usrp_mpm/simulator/rfnoc_common.py
@@ -153,6 +153,8 @@ class StreamSpec:
self.sample_rate = None
self.dst_epid = None
self.addr = None
+ self.capacity_packets = 0
+ self.capacity_bytes = 0
def set_timestamp_lo(self, low):
"""Set the low 32 bits of the initial timestamp"""
@@ -182,19 +184,6 @@ class StreamSpec:
assert self.sample_rate != 0
return self.packet_samples / self.sample_rate
- def seq_num_iter(self):
- """Returns a generator which returns an incrementing integer
- for each packet that should be sent. This is useful to set the
- seq_num of each transmitted packet.
- """
- i = 0
- while True:
- if not self.is_continuous:
- if i >= self.total_samples:
- return
- yield i
- i += 1
-
def __str__(self):
return "StreamSpec{{total_samples: {}, is_continuous: {}, packet_samples: {}," \
"sample_rate: {}, dst_epid: {}, addr: {}}}" \
diff --git a/mpm/python/usrp_mpm/simulator/stream_endpoint_node.py b/mpm/python/usrp_mpm/simulator/stream_endpoint_node.py
index b4477ee47..06aaaa046 100644
--- a/mpm/python/usrp_mpm/simulator/stream_endpoint_node.py
+++ b/mpm/python/usrp_mpm/simulator/stream_endpoint_node.py
@@ -8,7 +8,7 @@ This is a stream endpoint node, which is used in the RFNoCGraph class.
It also houses the logic to create output and input streams.
"""
from uhd.chdr import MgmtOpCode, MgmtOpCfg, MgmtOp, PacketType, CtrlStatus, CtrlOpCode, \
- ChdrHeader, StrcOpCode, StrcPayload, ChdrPacket
+ ChdrHeader, StrcOpCode, StrcPayload, ChdrPacket, StrsStatus
from .rfnoc_common import Node, NodeType, to_iter, swap_src_dst, RETURN_TO_SENDER
from .stream_ep_regs import StreamEpRegs, STRM_STATUS_FC_ENABLED
from .chdr_stream import ChdrOutputStream, ChdrInputStream
@@ -39,6 +39,8 @@ class StreamEndpointNode(Node):
self.dst_to_addr = None
self.source_gen = source_gen
self.sink_gen = sink_gen
+ self.downstream_capacity = None
+ self.strs_handlers = {}
self.ep_regs = StreamEpRegs(self.get_epid, self.set_epid, self.set_dst_epid,
self.ctrl_status_callback_out, self.ctrl_status_callback_in,
4, 4 * 8000)
@@ -52,6 +54,12 @@ class StreamEndpointNode(Node):
self.log.info("Starting Stream EPID:{} -> EPID:{}".format(self.epid, self.dst_epid))
addr = self.dst_to_addr(self)
self.send_strc(addr)
+ def handle_initial_strs(strs_packet):
+ payload = strs_packet.get_payload_strs()
+ assert payload.status == StrsStatus.OKAY, \
+ "Received STRS Packet Err: {}".format(payload.status)
+ self.downstream_capacity = (payload.capacity_pkts, payload.capacity_bytes)
+ self.strs_handlers[self.epid] = handle_initial_strs
return STRM_STATUS_FC_ENABLED
def ctrl_status_callback_in(self, status):
@@ -141,7 +149,11 @@ class StreamEndpointNode(Node):
self.input_stream.queue_packet(packet, num_bytes, sender)
def _handle_strs_packet(self, packet, **kwargs):
- pass # TODO: Implement flow control for output streams
+ header = packet.get_header()
+ if header.dst_epid in self.strs_handlers:
+ self.strs_handlers.pop(header.dst_epid)(packet)
+ else:
+ self.output_stream.queue_packet(packet)
def _handle_strc_packet(self, packet, **kwargs):
self._handle_data_packet(packet, **kwargs)
@@ -160,6 +172,8 @@ class StreamEndpointNode(Node):
payload = StrcPayload()
payload.src_epid = self.epid
payload.op_code = StrcOpCode.INIT
+ payload.num_pkts = 10
+ payload.num_bytes = 10 * 1024 # 10 KB
packet = ChdrPacket(self.chdr_w, header, payload)
self.send_wrapper.send_packet(packet, addr)
@@ -175,6 +189,9 @@ class StreamEndpointNode(Node):
assert self.output_stream is None, \
"Output Stream already running on epid: {}".format(self.epid)
stream_spec.dst_epid = self.dst_epid
+ stream_spec.capacity_packets = self.downstream_capacity[0]
+ stream_spec.capacity_bytes = self.downstream_capacity[1]
+ self.downstream_capacity = None
self.output_stream = ChdrOutputStream(self.log, self.chdr_w, self.source_gen(),
stream_spec, self.send_wrapper)