aboutsummaryrefslogtreecommitdiffstats
path: root/mpm/python/usrp_mpm/net.py
blob: be2d3f7541fbbeae0f0e242d1adaeb60008309cf (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# Copyright 2017 Ettus Research (National Instruments)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
"""
Network helper functions: interface, IP address and MAC address lookups
"""
import itertools
import socket
from six import iteritems
from pyroute2 import IPRoute
from .mpmlog import get_logger


def get_valid_interfaces(iface_list):
    """
    Filter a list of interface names (e.g. ['eth1', 'eth2']) down to the
    usable ones and return them as a new list, in their original order.

    An interface is kept when all of the following hold:
    - a link with that name can be looked up,
    - its IFLA_OPERSTATE attribute is 'UP',
    - get_iface_addrs() returns at least one IPv4 address for its MAC address.
    """
    usable = []
    with IPRoute() as ipr:
        for iface in iface_list:
            indices = ipr.link_lookup(ifname=iface)
            if not indices:
                # No link with this name: silently drop it
                continue
            link = ipr.get_links(indices[0])[0]
            if link.get_attr('IFLA_OPERSTATE') != 'UP':
                continue
            # Addresses are only queried for links that are up
            if not get_iface_addrs(link.get_attr('IFLA_ADDRESS')):
                continue
            # Sanity check: the link we fetched carries the requested name
            assert link.get_attr('IFLA_IFNAME') == iface
            usable.append(iface)
    return usable


def get_iface_info(ifname):
    """
    Given an interface name (e.g. 'eth1'), return a dictionary with the
    following keys:
    - ip_addr: Main IPv4 address (the first entry of ip_addrs), or an empty
               string if the interface has no IPv4 address
    - ip_addrs: List of valid IPv4 addresses (may be empty)
    - mac_addr: MAC address

    All values are stored as strings.

    Raises LookupError if no interface with that name is known, or if no
    link information could be retrieved for it.
    """
    try:
        with IPRoute() as ipr:
            links = ipr.link_lookup(ifname=ifname)
            if not links:
                raise LookupError("No interfaces known with name `{}'!"
                                  .format(ifname))
            link_info = ipr.get_links(links)[0]
    except IndexError:
        # get_links() came back empty for an index we just looked up
        raise LookupError("Could not get links for interface `{}'"
                          .format(ifname))
    mac_addr = link_info.get_attr('IFLA_ADDRESS')
    ip_addrs = get_iface_addrs(mac_addr)
    return {
        'mac_addr': mac_addr,
        # An interface may exist without any IPv4 address assigned; don't
        # index into an empty list in that case.
        'ip_addr': ip_addrs[0] if ip_addrs else '',
        'ip_addrs': ip_addrs,
    }

def ip_addr_to_iface(ip_addr, iface_list):
    """
    Return the name of the Ethernet interface (e.g. 'eth1') whose main IP
    address is ip_addr.

    Arguments:
    ip_addr -- The IP address as a string
    iface_list -- A map "interface name" -> iface_info, where iface_info
                  is another map as returned by net.get_iface_info().

    Only the 'ip_addr' entry of every iface_info is considered. A KeyError
    is raised if no interface has ip_addr as its main address.
    """
    # Build the reverse map (main IP address -> interface name) first, then
    # do the lookup. If two interfaces share an address, the one iterated
    # last wins.
    name_by_addr = {}
    for iface_name, iface_info in iteritems(iface_list):
        name_by_addr[iface_info['ip_addr']] = iface_name
    return name_by_addr[ip_addr]


def get_iface_addrs(mac_addr):
    """
    Return the list of IPv4 addresses of the link that has the given
    MAC address.

    mac_addr is expected in the format "aa:bb:cc:dd:ee:ff".

    Exactly one link must match the MAC address; otherwise the unpacking
    of the lookup result raises a ValueError.
    """
    with IPRoute() as ipr:
        # link_lookup() returns a list of link indices; insist on one match
        (link_index,) = ipr.link_lookup(address=mac_addr)
        ipv4_addrs = []
        # Restrict the query to the IPv4 family, then keep only the entries
        # that belong to our link
        for entry in ipr.get_addr(family=socket.AF_INET):
            if entry.get('index', None) != link_index:
                continue
            # get_attrs() yields a sequence per entry, so extend rather
            # than append to end up with one flat list
            ipv4_addrs.extend(entry.get_attrs('IFA_ADDRESS'))
        return ipv4_addrs


def byte_to_mac(byte_str):
    """
    Convert a byte string into a colon-separated, lower-case hex
    representation, e.g. b'\\x00\\x1a\\xff' becomes '00:1a:ff'.

    byte_str may be a bytes or bytearray object or a (text) string with one
    character per byte. An empty input yields an empty string.
    """
    # Iterating over bytes/bytearray yields ints on Python 3 but length-1
    # strings on Python 2; only call ord() on the latter.
    return ':'.join(
        "%02x" % (x if isinstance(x, int) else ord(x)) for x in byte_str
    )


def get_mac_addr(remote_addr):
    """
    Look up the MAC address of a remote host that was already discovered.

    Queries IPRoute.get_neighbours() for entries with destination
    remote_addr and returns the NDA_LLADDR attribute of the first one.
    Returns None if no entry was found. If more than one entry matches, a
    warning is logged and the first entry is used.
    """
    with IPRoute() as ipr:
        neighbours = ipr.get_neighbours(dst=remote_addr)
        if not neighbours:
            return None
        if len(neighbours) > 1:
            get_logger('get_mac_addr').warning(
                "More than one device with the same IP address found. "
                "Picking entry at random")
        return neighbours[0].get_attr('NDA_LLADDR')