aboutsummaryrefslogtreecommitdiffstats
path: root/mpm/python/usrp_mpm/simulator/chdr_endpoint.py
diff options
context:
space:
mode:
Diffstat (limited to 'mpm/python/usrp_mpm/simulator/chdr_endpoint.py')
-rw-r--r--mpm/python/usrp_mpm/simulator/chdr_endpoint.py71
1 files changed, 46 insertions, 25 deletions
diff --git a/mpm/python/usrp_mpm/simulator/chdr_endpoint.py b/mpm/python/usrp_mpm/simulator/chdr_endpoint.py
index 616cd0ce9..303f3268d 100644
--- a/mpm/python/usrp_mpm/simulator/chdr_endpoint.py
+++ b/mpm/python/usrp_mpm/simulator/chdr_endpoint.py
@@ -10,8 +10,12 @@ Graph.
from threading import Thread
import socket
-from uhd.chdr import ChdrPacket, ChdrWidth, PacketType
+import queue
+import select
+from uhd.chdr import ChdrPacket, ChdrWidth
from .rfnoc_graph import XbarNode, XportNode, StreamEndpointNode, RFNoCGraph, NodeType
+from .chdr_stream import SendWrapper, ChdrOutputStream, ChdrInputStream, SelectableQueue
+from .sample_source import NullSamples
CHDR_W = ChdrWidth.W64
@@ -27,11 +31,20 @@ class ChdrEndpoint:
"""
def __init__(self, log, extra_args):
self.log = log.getChild("ChdrEndpoint")
+ self.source_gen = NullSamples
+ self.sink_gen = NullSamples
+ self.xport_map = {}
+
+ self.send_queue = SelectableQueue()
+ self.send_wrapper = SendWrapper(self.send_queue)
+
+ self.graph = RFNoCGraph(self.get_default_nodes(), self.log, 1, self.send_wrapper, CHDR_W)
self.thread = Thread(target=self.socket_worker, daemon=True)
self.thread.start()
- self.graph = RFNoCGraph(self.get_default_nodes(), self.log, 1, self.begin_tx,
- self.end_tx, self.send_strc, self.begin_rx)
- self.xport_map = {}
+
+ def set_device_id(self, device_id):
+ """Set the device_id for this endpoint"""
+ self.graph.set_device_id(device_id)
def set_sample_rate(self, rate):
"""Set the sample_rate of the next tx_stream.
@@ -48,7 +61,7 @@ class ChdrEndpoint:
nodes = [
XportNode(0),
XbarNode(0, [2], [0]),
- StreamEndpointNode(0)
+ StreamEndpointNode(0, self.source_gen, self.sink_gen)
]
return nodes
@@ -76,24 +89,32 @@ class ChdrEndpoint:
while True:
# This allows us to block on multiple sockets at the same time
+ ready_list, _, _ = select.select([main_sock, self.send_queue], [], [])
buffer = bytearray(8000) # Max MTU
- # received Data over socket
- n_bytes, sender = main_sock.recvfrom_into(buffer)
- self.log.trace("received {} bytes of data from {}"
- .format(n_bytes, sender))
- try:
- packet = ChdrPacket.deserialize(CHDR_W, buffer[:n_bytes])
- self.log.trace("Decoded Packet: {}".format(packet.to_string_with_payload()))
- entry_xport = (1, NodeType.XPORT, 0)
- pkt_type = packet.get_header().pkt_type
- response = self.graph.handle_packet(packet, entry_xport, sender)
-
- if response is not None:
- data = response.serialize()
- self.log.trace("Returning Packet: {}"
- .format(packet.to_string_with_payload()))
- main_sock.sendto(bytes(data), sender)
- except BaseException as ex:
- self.log.warning("Unable to decode packet: {}"
- .format(ex))
- raise ex
+ for sock in ready_list:
+ if sock is main_sock:
+ # Received Data over socket
+ n_bytes, sender = main_sock.recvfrom_into(buffer)
+ self.log.trace("Received {} bytes of data from {}"
+ .format(n_bytes, sender))
+ try:
+ packet = ChdrPacket.deserialize(CHDR_W, buffer[:n_bytes])
+ self.log.trace("Decoded Packet: {}"
+ .format(packet.to_string_with_payload()))
+ entry_xport = (NodeType.XPORT, 0)
+ response = self.graph.handle_packet(packet, entry_xport, sender,
+ sender, n_bytes)
+
+ if response is not None:
+ data = response.serialize()
+ self.log.trace("Returning Packet: {}"
+ .format(packet.to_string_with_payload()))
+ main_sock.sendto(bytes(data), sender)
+ except BaseException as ex:
+ self.log.warning("Unable to decode packet: {}"
+ .format(ex))
+ raise ex
+ else:
+ data, addr = self.send_queue.get()
+ sent_len = main_sock.sendto(data, addr)
+ assert len(data) == sent_len, "Didn't send whole packet."