diff options
-rw-r--r-- | mpm/python/usrp_mpm/simulator/chdr_endpoint.py | 46 | ||||
-rw-r--r-- | mpm/python/usrp_mpm/simulator/chdr_stream.py | 44 |
2 files changed, 45 insertions, 45 deletions
diff --git a/mpm/python/usrp_mpm/simulator/chdr_endpoint.py b/mpm/python/usrp_mpm/simulator/chdr_endpoint.py index bca7f9c69..41efb37f6 100644 --- a/mpm/python/usrp_mpm/simulator/chdr_endpoint.py +++ b/mpm/python/usrp_mpm/simulator/chdr_endpoint.py @@ -14,10 +14,54 @@ import queue import select from uhd.chdr import ChdrPacket, ChdrWidth from .rfnoc_graph import XbarNode, XportNode, StreamEndpointNode, RFNoCGraph, NodeType -from .chdr_stream import SendWrapper, ChdrOutputStream, ChdrInputStream, SelectableQueue +from .chdr_stream import ChdrOutputStream, ChdrInputStream CHDR_W = ChdrWidth.W64 +class SelectableQueue: + """ A simple python Queue implementation which can be selected. + This allows waiting on a queue and a socket simultaneously. + """ + def __init__(self, max_size=0): + self._queue = queue.Queue(max_size) + self._send_signal_rx, self._send_signal_tx = socket.socketpair() + + def put(self, item, block=True, timeout=None): + """ Put an element into the queue, optionally blocking """ + self._queue.put(item, block, timeout) + self._send_signal_tx.send(b"\x00") + + def fileno(self): + """ A fileno compatible with select.select """ + return self._send_signal_rx.fileno() + + def get(self): + """ Return the first element in the queue, blocking if none + are available. + """ + self._send_signal_rx.recv(1) + return self._queue.get_nowait() + +class SendWrapper: + """This class is used as an abstraction over queueing packets to be + sent by the socket thread. + """ + def __init__(self, queue): + self.queue = queue + + def send_packet(self, packet, addr): + """Serialize packet and then queue the data to be sent to addr + returns the length of the serialized packet + """ + data = packet.serialize() + self.send_data(bytes(data), addr) + return len(data) + + def send_data(self, data, addr): + """Queue data to be sent to addr""" + self.queue.put((data, addr)) + + class ChdrEndpoint: """This class is created by the sim periph_manager It is responsible for opening sockets, dispatching all chdr packet diff --git a/mpm/python/usrp_mpm/simulator/chdr_stream.py b/mpm/python/usrp_mpm/simulator/chdr_stream.py index b8a59bb36..a23935bd5 100644 --- a/mpm/python/usrp_mpm/simulator/chdr_stream.py +++ b/mpm/python/usrp_mpm/simulator/chdr_stream.py @@ -56,50 +56,6 @@ class XferCount: def __str__(self): return "XferCount{{num_bytes:{}, num_packets:{}}}".format(self.num_bytes, self.num_packets) -class SelectableQueue: - """ A simple python Queue implementation which can be selected. - This allows waiting on a queue and a socket simultaneously. - """ - def __init__(self, max_size=0): - self._queue = queue.Queue(max_size) - self._send_signal_rx, self._send_signal_tx = socket.socketpair() - - def put(self, item, block=True, timeout=None): - """ Put an element into the queue, optionally blocking """ - self._queue.put(item, block, timeout) - self._send_signal_tx.send(b"\x00") - - def fileno(self): - """ A fileno compatible with select.select """ - return self._send_signal_rx.fileno() - - def get(self): - """ Return the first element in the queue, blocking if none - are available. - """ - self._send_signal_rx.recv(1) - return self._queue.get_nowait() - -class SendWrapper: - """This class is used as an abstraction over queueing packets to be - sent by the socket thread. - """ - def __init__(self, queue): - self.queue = queue - - def send_packet(self, packet, addr): - """Serialize packet and then queue the data to be sent to addr - returns the length of the serialized packet - """ - data = packet.serialize() - self.send_data(bytes(data), addr) - return len(data) - - def send_data(self, data, addr): - """Queue data to be sent to addr""" - self.queue.put((data, addr)) - - class ChdrInputStream: """This class encapsulates an Rx Thread. This thread blocks on a queue which receives STRC and DATA ChdrPackets. It places the data |