aboutsummaryrefslogtreecommitdiffstats
path: root/mpm/python/usrp_mpm/simulator/chdr_endpoint.py
blob: 616cd0ce9d90c31ffc8f172ca414b1144552e3ae (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
#
# Copyright 2020 Ettus Research, a National Instruments Brand
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
"""This module houses the ChdrEndpoint class, which handles networking,
packet dispatch, and acts as an interface between these and the RFNoC
Graph.
"""

from threading import Thread
import socket
from uhd.chdr import ChdrPacket, ChdrWidth, PacketType
from .rfnoc_graph import XbarNode, XportNode, StreamEndpointNode, RFNoCGraph, NodeType

CHDR_W = ChdrWidth.W64

class ChdrEndpoint:
    """This class is created by the sim periph_manager
    It is responsible for opening sockets, dispatching all chdr packet
    traffic to the appropriate destination, and responding to said
    traffic.

    The extra_args parameter is passed in from the periph_manager, and
    coresponds to the --default_args flag of usrp_hwd.py on the
    command line
    """
    def __init__(self, log, extra_args):
        self.log = log.getChild("ChdrEndpoint")
        self.thread = Thread(target=self.socket_worker, daemon=True)
        self.thread.start()
        self.graph = RFNoCGraph(self.get_default_nodes(), self.log, 1, self.begin_tx,
                                self.end_tx, self.send_strc, self.begin_rx)
        self.xport_map = {}

    def set_sample_rate(self, rate):
        """Set the sample_rate of the next tx_stream.

        This method is called by the daughterboard. It coresponds to
        sim_dboard.py:sim_db#set_catalina_clock_rate()
        """
        self.graph.get_stream_spec().sample_rate = rate

    def get_default_nodes(self):
        """Get a sensible NoC Core setup. This is the simplest
        functional layout. It has one of each required component.
        """
        nodes = [
            XportNode(0),
            XbarNode(0, [2], [0]),
            StreamEndpointNode(0)
        ]
        return nodes

    def send_strc(self, stream_ep, addr):
        pass # TODO: currently not implemented

    def begin_tx(self, src_epid, stream_spec):
        pass # TODO: currently not implemented

    def end_tx(self, src_epid):
        pass # TODO: currently not implemented

    def begin_rx(self, dst_epid):
        pass # TODO: currently not implemented

    def socket_worker(self):
        """This is the method that runs in a background thread. It
        blocks on the CHDR socket and processes packets as they come
        in.
        """
        self.log.info("Starting ChdrEndpoint Thread")
        main_sock = socket.socket(socket.AF_INET,
                                  socket.SOCK_DGRAM)
        main_sock.bind(("0.0.0.0", 49153))

        while True:
            # This allows us to block on multiple sockets at the same time
            buffer = bytearray(8000) # Max MTU
            # received Data over socket
            n_bytes, sender = main_sock.recvfrom_into(buffer)
            self.log.trace("received {} bytes of data from {}"
                           .format(n_bytes, sender))
            try:
                packet = ChdrPacket.deserialize(CHDR_W, buffer[:n_bytes])
                self.log.trace("Decoded Packet: {}".format(packet.to_string_with_payload()))
                entry_xport = (1, NodeType.XPORT, 0)
                pkt_type = packet.get_header().pkt_type
                response = self.graph.handle_packet(packet, entry_xport, sender)

                if response is not None:
                    data = response.serialize()
                    self.log.trace("Returning Packet: {}"
                                   .format(packet.to_string_with_payload()))
                    main_sock.sendto(bytes(data), sender)
            except BaseException as ex:
                self.log.warning("Unable to decode packet: {}"
                                 .format(ex))
                raise ex