diff options
Diffstat (limited to 'mpm/python/usrp_mpm/simulator/chdr_endpoint.py')
-rw-r--r-- | mpm/python/usrp_mpm/simulator/chdr_endpoint.py | 71 |
1 files changed, 46 insertions, 25 deletions
diff --git a/mpm/python/usrp_mpm/simulator/chdr_endpoint.py b/mpm/python/usrp_mpm/simulator/chdr_endpoint.py index 616cd0ce9..303f3268d 100644 --- a/mpm/python/usrp_mpm/simulator/chdr_endpoint.py +++ b/mpm/python/usrp_mpm/simulator/chdr_endpoint.py @@ -10,8 +10,12 @@ Graph. from threading import Thread import socket -from uhd.chdr import ChdrPacket, ChdrWidth, PacketType +import queue +import select +from uhd.chdr import ChdrPacket, ChdrWidth from .rfnoc_graph import XbarNode, XportNode, StreamEndpointNode, RFNoCGraph, NodeType +from .chdr_stream import SendWrapper, ChdrOutputStream, ChdrInputStream, SelectableQueue +from .sample_source import NullSamples CHDR_W = ChdrWidth.W64 @@ -27,11 +31,20 @@ class ChdrEndpoint: """ def __init__(self, log, extra_args): self.log = log.getChild("ChdrEndpoint") + self.source_gen = NullSamples + self.sink_gen = NullSamples + self.xport_map = {} + + self.send_queue = SelectableQueue() + self.send_wrapper = SendWrapper(self.send_queue) + + self.graph = RFNoCGraph(self.get_default_nodes(), self.log, 1, self.send_wrapper, CHDR_W) self.thread = Thread(target=self.socket_worker, daemon=True) self.thread.start() - self.graph = RFNoCGraph(self.get_default_nodes(), self.log, 1, self.begin_tx, - self.end_tx, self.send_strc, self.begin_rx) - self.xport_map = {} + + def set_device_id(self, device_id): + """Set the device_id for this endpoint""" + self.graph.set_device_id(device_id) def set_sample_rate(self, rate): """Set the sample_rate of the next tx_stream. @@ -48,7 +61,7 @@ class ChdrEndpoint: nodes = [ XportNode(0), XbarNode(0, [2], [0]), - StreamEndpointNode(0) + StreamEndpointNode(0, self.source_gen, self.sink_gen) ] return nodes @@ -76,24 +89,32 @@ class ChdrEndpoint: while True: # This allows us to block on multiple sockets at the same time + ready_list, _, _ = select.select([main_sock, self.send_queue], [], []) buffer = bytearray(8000) # Max MTU - # received Data over socket - n_bytes, sender = main_sock.recvfrom_into(buffer) - self.log.trace("received {} bytes of data from {}" - .format(n_bytes, sender)) - try: - packet = ChdrPacket.deserialize(CHDR_W, buffer[:n_bytes]) - self.log.trace("Decoded Packet: {}".format(packet.to_string_with_payload())) - entry_xport = (1, NodeType.XPORT, 0) - pkt_type = packet.get_header().pkt_type - response = self.graph.handle_packet(packet, entry_xport, sender) - - if response is not None: - data = response.serialize() - self.log.trace("Returning Packet: {}" - .format(packet.to_string_with_payload())) - main_sock.sendto(bytes(data), sender) - except BaseException as ex: - self.log.warning("Unable to decode packet: {}" - .format(ex)) - raise ex + for sock in ready_list: + if sock is main_sock: + # Received Data over socket + n_bytes, sender = main_sock.recvfrom_into(buffer) + self.log.trace("Received {} bytes of data from {}" + .format(n_bytes, sender)) + try: + packet = ChdrPacket.deserialize(CHDR_W, buffer[:n_bytes]) + self.log.trace("Decoded Packet: {}" + .format(packet.to_string_with_payload())) + entry_xport = (NodeType.XPORT, 0) + response = self.graph.handle_packet(packet, entry_xport, sender, + sender, n_bytes) + + if response is not None: + data = response.serialize() + self.log.trace("Returning Packet: {}" + .format(packet.to_string_with_payload())) + main_sock.sendto(bytes(data), sender) + except BaseException as ex: + self.log.warning("Unable to decode packet: {}" + .format(ex)) + raise ex + else: + data, addr = self.send_queue.get() + sent_len = main_sock.sendto(data, addr) + assert len(data) == sent_len, "Didn't send whole packet." |