From 0003d990bb4141908a1f347727b662e8db0d5a63 Mon Sep 17 00:00:00 2001 From: Samuel O'Brien Date: Fri, 7 Aug 2020 15:09:08 -0500 Subject: sim: Move SelectableQueue and SendWrapper This commit moves these two classes from chdr_stream.py to chdr_endpoint.py. ChdrEndpoint needs to be aware of the specific implementation of these classes, while ChdrInputStream and ChdrOutputStream treats them like black boxes. Therefore, it makes more sense to have these classes together with ChdrEndpoint Signed-off-by: Samuel O'Brien --- mpm/python/usrp_mpm/simulator/chdr_endpoint.py | 46 +++++++++++++++++++++++++- 1 file changed, 45 insertions(+), 1 deletion(-) (limited to 'mpm/python/usrp_mpm/simulator/chdr_endpoint.py') diff --git a/mpm/python/usrp_mpm/simulator/chdr_endpoint.py b/mpm/python/usrp_mpm/simulator/chdr_endpoint.py index bca7f9c69..41efb37f6 100644 --- a/mpm/python/usrp_mpm/simulator/chdr_endpoint.py +++ b/mpm/python/usrp_mpm/simulator/chdr_endpoint.py @@ -14,10 +14,54 @@ import queue import select from uhd.chdr import ChdrPacket, ChdrWidth from .rfnoc_graph import XbarNode, XportNode, StreamEndpointNode, RFNoCGraph, NodeType -from .chdr_stream import SendWrapper, ChdrOutputStream, ChdrInputStream, SelectableQueue +from .chdr_stream import ChdrOutputStream, ChdrInputStream CHDR_W = ChdrWidth.W64 +class SelectableQueue: + """ A simple python Queue implementation which can be selected. + This allows waiting on a queue and a socket simultaneously. + """ + def __init__(self, max_size=0): + self._queue = queue.Queue(max_size) + self._send_signal_rx, self._send_signal_tx = socket.socketpair() + + def put(self, item, block=True, timeout=None): + """ Put an element into the queue, optionally blocking """ + self._queue.put(item, block, timeout) + self._send_signal_tx.send(b"\x00") + + def fileno(self): + """ A fileno compatible with select.select """ + return self._send_signal_rx.fileno() + + def get(self): + """ Return the first element in the queue, blocking if none + are available. + """ + self._send_signal_rx.recv(1) + return self._queue.get_nowait() + +class SendWrapper: + """This class is used as an abstraction over queueing packets to be + sent by the socket thread. + """ + def __init__(self, queue): + self.queue = queue + + def send_packet(self, packet, addr): + """Serialize packet and then queue the data to be sent to addr + returns the length of the serialized packet + """ + data = packet.serialize() + self.send_data(bytes(data), addr) + return len(data) + + def send_data(self, data, addr): + """Queue data to be sent to addr""" + self.queue.put((data, addr)) + + class ChdrEndpoint: """This class is created by the sim periph_manager It is responsible for opening sockets, dispatching all chdr packet -- cgit v1.2.3