aboutsummaryrefslogtreecommitdiffstats
path: root/mpm/python/usrp_mpm/mpmlog.py
blob: 646670d9b1ef0c05b103f139e1f59e9acd9555c3 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
#
# Copyright 2017 Ettus Research, National Instruments Company
#
# SPDX-License-Identifier: GPL-3.0
#
"""
MPM Logging
"""

from __future__ import print_function
import copy
import logging
from logging import CRITICAL, ERROR, WARNING, INFO, DEBUG
from builtins import str

# Colors
BOLD = str('\033[1m')
RED = str('\x1b[31m')
YELLOW = str('\x1b[33m')
GREEN = str('\x1b[32m')
PINK = str('\x1b[35m')
GREY = str('\x1b[90m')
RESET = str('\x1b[0m')

# Additional log level
TRACE = 1

DEFAULT_LOG_LEVEL = TRACE

class ColorStreamHandler(logging.StreamHandler):
    """
    StreamHandler that prints colored output
    """
    def emit(self, record):
        """
        Prints record with colors.
        record is not modified.
        """
        record_ = copy.copy(record)
        levelno = record_.levelno
        if levelno >= CRITICAL:
            color = RED
        elif levelno >= ERROR:
            color = RED
        elif levelno >= WARNING:
            color = YELLOW
        elif levelno >= INFO:
            color = GREEN
        elif levelno >= DEBUG:
            color = PINK
        elif levelno >= TRACE:
            color = ''
        else: # NOTSET and anything else
            color = RESET
        record_.msg = BOLD + color + str(record_.msg) + RESET
        logging.StreamHandler.emit(self, record_)

class MPMLogger(logging.getLoggerClass()):
    """
    Extends the regular Python logging with level 'trace' (like UHD)
    """
    def __init__(self, *args, **kwargs):
        logging.Logger.__init__(self, *args, **kwargs)
        self.cpp_log_buf = None
        try:
            import usrp_mpm.libpyusrp_periphs as lib
            self.cpp_log_buf = lib.types.log_buf.make_singleton()
        except ImportError:
            pass

    def trace(self, *args, **kwargs):
        """ Extends logging for super-high verbosity """
        self.log(TRACE, *args, **kwargs)


LOGGER = None # Logger singleton
def get_main_logger(
        use_console=True,
        use_journal=False,
        console_color=True,
        log_default_delta=0
    ):
    """
    Returns the top-level logger object. This is the only API call from this
    file that should be used outside.
    """
    global LOGGER
    if LOGGER is not None:
        return LOGGER
    logging.addLevelName(TRACE, 'TRACE')
    logging.setLoggerClass(MPMLogger)
    LOGGER = logging.getLogger('MPM')
    if use_console:
        console_handler = ColorStreamHandler() if console_color else logging.StreamHandler()
        console_formatter = logging.Formatter("[%(name)s] [%(levelname)s] %(message)s")
        console_handler.setFormatter(console_formatter)
        LOGGER.addHandler(console_handler)
    if use_journal:
        from systemd.journal import JournalHandler
        journal_handler = JournalHandler(SYSLOG_IDENTIFIER='usrp_hwd')
        journal_formatter = logging.Formatter('[%(levelname)s] [%(module)s] %(message)s')
        journal_handler.setFormatter(journal_formatter)
        LOGGER.addHandler(journal_handler)
    # Set default level:
    default_log_level = int(min(
        DEFAULT_LOG_LEVEL - log_default_delta * 10,
        CRITICAL
    ))
    default_log_level = max(1, default_log_level - (default_log_level % 10))
    LOGGER.setLevel(default_log_level)
    # Connect to C++ logging:
    if LOGGER.cpp_log_buf is not None:
        lib_logger = LOGGER.getChild('lib')
        def log_from_cpp():
            " Callback for logging from C++ "
            log_level, component, message = LOGGER.cpp_log_buf.pop()
            if log_level:
                lib_logger.log(log_level, "[%s] %s",
                        component, message.strip())
        LOGGER.cpp_log_buf.set_notify_callback(log_from_cpp)
    return LOGGER

def get_logger(child_name):
    """
    Returns a child logger. Prior to calling this, get_main_logger() needs to
    have been called.
    """
    assert LOGGER is not None
    return get_main_logger().getChild(child_name)


if __name__ == "__main__":
    print("Testing logger: ")
    LOG = get_main_logger().getChild('test')
    LOG.setLevel(TRACE)
    LOG.trace("trace message")
    LOG.debug("debug message")
    LOG.info("info message")
    LOG.warning("warning message")
    LOG.error("error message")
    LOG.critical("critical message")

    LOG2 = get_main_logger()
    LOG3 = get_main_logger()
    assert LOG2 is LOG3