#!/usr/bin/env python3
#
# Notice: Some parts of this file were copied from PyBOMBS, which has a
# different copyright, and is highlighted appropriately. The following
# copyright notice pertains to all the parts written specifically for this
# script.
#
# Copyright 2016 Ettus Research
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
"""
Run Vivado builds
"""

import os
import sys
import re
import json
from datetime import datetime
import time
import argparse
import subprocess
import threading
from queue import Queue, Empty

READ_TIMEOUT = 0.1 # s
# Upper bound on how long run() waits for the stdout reader thread to finish
# after the child process has exited, before draining whatever was queued.
READER_JOIN_TIMEOUT = 1.0 # s

#############################################################################
# The following functions were copied with minor modifications from PyBOMBS:
def get_console_width():
    '''
    Returns width of console as an integer number of columns.

    Tries the TIOCGWINSZ ioctl on stdin/stdout/stderr, then on the
    controlling terminal, and finally falls back to the COLUMNS environment
    variable (default: 80).

    http://stackoverflow.com/questions/566746/how-to-get-console-window-width-in-python
    '''
    def ioctl_GWINSZ(fd):
        " Return (rows, cols) for file descriptor fd, or None on failure "
        try:
            import fcntl, termios, struct
            return struct.unpack('hh', fcntl.ioctl(fd, termios.TIOCGWINSZ, '1234'))
        except Exception:
            # Best effort only: the modules may be missing on this platform,
            # or fd may not refer to a terminal.
            return None
    size = ioctl_GWINSZ(0) or ioctl_GWINSZ(1) or ioctl_GWINSZ(2)
    if not size:
        try:
            fd = os.open(os.ctermid(), os.O_RDONLY)
            try:
                size = ioctl_GWINSZ(fd)
            finally:
                # Always release the descriptor, even if the query fails
                os.close(fd)
        except (AttributeError, OSError):
            # No os.ctermid() on this platform, or no controlling terminal
            pass
    if not size:
        # Environment values are strings; convert so callers always get an int
        try:
            return int(os.environ.get('COLUMNS', 80))
        except ValueError:
            return 80
    return size[1]

def which(program):
    """
    Equivalent to Unix' `which` command.
    Returns None if the executable `program` can't be found.

    If a full path is given (e.g. /usr/bin/foo), it will return
    the path if the executable can be found, or None otherwise.

    If no path is given, it will search PATH.
    """
    def is_exe(fpath):
        " Check fpath is an executable "
        return os.path.isfile(fpath) and os.access(fpath, os.X_OK)
    if os.path.split(program)[0] and is_exe(program):
        return program
    for path in os.environ.get("PATH", "").split(os.pathsep):
        exe_file = os.path.join(path, program)
        if is_exe(exe_file):
            return exe_file
    return None
#
# End of functions copied from PyBOMBS.
#############################################################################

def print_timer(time_delta):
    """
    Format a datetime.timedelta as the string "[HH:MM:SS]".

    The hours field is not capped at 24: whole days are folded into it, so a
    run that lasts longer than a day keeps counting up instead of wrapping.
    Negative deltas are shown as zero.
    """
    # total_seconds() includes the days component; .seconds does not.
    total_secs = max(0, int(time_delta.total_seconds()))
    hours, secs = divmod(total_secs, 3600)
    mins, secs = divmod(secs, 60)
    return "[{h:02}:{m:02}:{s:02}]".format(
        h=hours, m=mins, s=secs,
    )

def list_search(patterns, string):
    " Returns True if string matches any element of patterns (regex search) "
    return any(re.search(pattern, string) is not None for pattern in patterns)

def parse_args():
    """
    Parses args for this script, and for Vivado.

    Returns a tuple (our_args, viv_args): our_args is the argparse namespace
    with the options known to this script, viv_args is a single string with
    all remaining command line arguments joined by spaces.
    """
    parser = argparse.ArgumentParser(
        description="Run Vivado and parse output.",
    )
    parser.add_argument(
        '--no-color', action="store_true",
        help="Don't colorize output.",
    )
    parser.add_argument(
        '--vivado-command', default=None,
        help="Vivado command.",
    )
    parser.add_argument(
        '--parse-config', default=None,
        help="Additional parser configurations",
    )
    parser.add_argument(
        '-v', '--verbose', default=False,
        action='store_true',
        help="Print Vivado output")
    parser.add_argument(
        '--warnings', default=False,
        action='store_true',
        help="Print Vivado warnings")
    our_args, viv_args = parser.parse_known_args()
    return our_args, " ".join(viv_args)

class VivadoRunner(object):
    """
    Vivado Runner

    Launches the Vivado command in a subprocess, classifies every line of
    its output using the regexes in self.line_types, keeps a status line
    with the current task/phase, and counts warnings and errors.
    """
    colors = {
        'warning': '\033[0;35m',
        'critical warning': '\033[33m',
        'error': '\033[1;31m',
        'fatal': '\033[1;31m',
        'task': '\033[32m',
        'cmd': '\033[1;34m',
        'normal': '\033[0m',
    }
    # Black       0;30     Dark Gray     1;30
    # Blue        0;34     Light Blue    1;34
    # Green       0;32     Light Green   1;32
    # Cyan        0;36     Light Cyan    1;36
    # Red         0;31     Light Red     1;31
    # Purple      0;35     Light Purple  1;35
    # Brown       0;33     Yellow        1;33
    # Light Gray  0;37     White         1;37

    # Maps Vivado Tcl commands to the task name shown in the status line
    viv_tcl_cmds = {
        'synth_design' : 'Synthesis',
        'opt_design': 'Logic Optimization',
        'place_design': 'Placer',
        'route_design': 'Routing',
        'phys_opt_design': 'Physical Synthesis',
        'report_timing' : 'Timing Reporting',
        'report_power': 'Power Reporting',
        'report_drc': 'DRC',
        'write_bitstream': 'Write Bitstream',
    }

    def __init__(self, args, viv_args):
        """
        args: Namespace as returned by parse_args(). Reads vivado_command,
              parse_config and (later) no_color and verbose.
        viv_args: String that is appended to the Vivado command.
        """
        self.status = ''
        self.args = args
        self.current_task = "Initialization"
        self.current_phase = "Starting"
        self.command = args.vivado_command + " " + viv_args
        self.notif_queue = Queue()
        self.msg_counters = {}
        self.fatal_error_found = False
        # Line types are tested in this order; the first matching regex wins.
        # A named group 'id' in a regex selects the part of the line that is
        # handed to the action; without it, the whole line is passed on.
        self.line_types = {
            'cmd': {
                'regexes': [
                    r'^Command: .+',
                ],
                'action': self.show_cmd,
                'id': "Command",
            },
            'task': {
                'regexes': [
                    r'^Starting .* Task',
                    r'^.*Translating synthesized netlist.*',
                    r'^\[TEST CASE .*',
                ],
                'action': self.update_task,
                'id': "Task",
            },
            'phase': {
                'regexes': [
                    r'^Phase (?P<id>[a-zA-Z0-9/. ]*)$',
                    r'^Start (?P<id>[a-zA-Z0-9/. ]*)$',
                    r'^(?P<id>TESTBENCH STARTED: [\w_]*)$',
                ],
                'action': self.update_phase,
                'id': "Phase",
            },
            'warning': {
                'regexes': [
                    r'^WARNING'
                ],
                'action': lambda x: self.act_on_build_msg('warning', x),
                'id': "Warning",
                'fatal': [
                ]
            },
            'critical warning': {
                'regexes': [
                    r'^CRITICAL WARNING'
                ],
                'action': lambda x: self.act_on_build_msg('critical warning', x),
                'id': "Critical Warning",
                'fatal': [
                ]
            },
            'error': {
                'regexes': [
                    r'^ERROR',
                    r'no such file or directory',
                    r'^Result: FAILED'
                ],
                'action': lambda x: self.act_on_build_msg('error', x),
                'id': "Error",
                'fatal': [
                    r'.', # All errors are fatal by default
                ]
            },
            'test': {
                'regexes': [
                    # These are two separate patterns; without the comma they
                    # would be concatenated into one that can never match.
                    r'^ - T',
                    r'^Result: '
                ],
                'action': self.update_testbench,
                'id': "Test"
            }
        }
        self.parse_config = None
        if args.parse_config is not None:
            try:
                args.parse_config = os.path.normpath(args.parse_config)
                with open(args.parse_config) as config_file:
                    parse_config = json.load(config_file)
                self.add_notification(
                    "Using parser configuration from: {pc}".format(pc=args.parse_config),
                    color=self.colors.get('normal')
                )
                # Append the user-supplied lists to the built-in ones
                loadables = ('regexes', 'ignore', 'fatal')
                for line_type in self.line_types:
                    for loadable in loadables:
                        self.line_types[line_type][loadable] = \
                            self.line_types[line_type].get(loadable, []) + \
                            parse_config.get(line_type, {}).get(loadable, [])
            except (IOError, ValueError):
                self.add_notification(
                    "Could not read parser configuration from: {pc}".format(pc=args.parse_config),
                    color=self.colors.get('warning')
                )
        self.tty = sys.stdout.isatty()
        self.timer = datetime.now() # Make sure this is the last line in ctor

    def run(self):
        """
        Kick off Vivado build.

        Returns True if it all passed, i.e. the process returned 0 and no
        fatal message was found in its output.
        """
        def enqueue_output(stdout_data, stdout_queue):
            " Puts the output from the process into the queue "
            for line in iter(stdout_data.readline, b''):
                stdout_queue.put(line)
            stdout_data.close()

        def poll_queue(q):
            " Safe polling from queue. Returns '' if nothing arrived in time. "
            try:
                # Undecodable bytes are replaced rather than dropping the
                # whole line, so e.g. an ERROR line is never lost to them.
                return q.get(timeout=READ_TIMEOUT).decode('utf-8', errors='replace')
            except Empty:
                return ""

        # Start process
        self.add_notification(
            "Executing command: {cmd}".format(cmd=self.command), add_time=True, color=self.colors.get('cmd')
        )
        proc = subprocess.Popen(
            self.command,
            shell=True, # Yes we run this in a shell. Unsafe but helps with Vivado.
            stdout=subprocess.PIPE, stderr=subprocess.STDOUT # Pipe it all out via stdout
        )
        # Init thread and queue
        q_stdout = Queue()
        reader_t = threading.Thread(target=enqueue_output, args=(proc.stdout, q_stdout))
        # End the thread when the program terminates
        reader_t.daemon = True
        reader_t.start()
        status_line_t = threading.Thread(
            target=VivadoRunner.run_loop,
            args=(self.print_status_line, 0.5 if self.tty else 60*10)
        )
        status_line_t.daemon = True
        status_line_t.start()
        # Run loop
        while proc.poll() is None or not q_stdout.empty(): # Run while process is alive
            self.update_output(poll_queue(q_stdout))
        # The process may exit before the reader thread has queued its last
        # lines. Give the reader a bounded amount of time to hit EOF, then
        # drain what is left so trailing messages are still processed.
        reader_t.join(READER_JOIN_TIMEOUT)
        while not q_stdout.empty():
            self.update_output(poll_queue(q_stdout))
        success = (proc.returncode == 0) and not self.fatal_error_found
        self.cleanup_output(success)
        return success

    def update_output(self, lines):
        " Receives a line from Vivado output and acts upon it. "
        self.process_line(lines)

    @staticmethod
    def run_loop(func, delay, *args, **kwargs):
        " Calls func(*args, **kwargs) forever, sleeping delay seconds in between "
        while True:
            func(*args, **kwargs)
            time.sleep(delay)

    def print_status_line(self):
        " Prints pending notifications, then the status line, on stdout "
        old_status_line_len = len(self.status)
        self.update_status_line()
        sys.stdout.write("\x1b[2K\r") # Scroll cursor back to beginning and clear last line
        self.flush_notification_queue(old_status_line_len)
        sys.stdout.write(self.status)
        sys.stdout.flush()

    def cleanup_output(self, success):
        " Run final printery after all is said and done. "
        self.update_phase("Finished")
        self.add_notification(
            "Process terminated. Status: {status}".format(status='Success' if success else 'Failure'),
            add_time=True,
            color=self.colors.get("task" if success else "error")
        )
        sys.stdout.write("\n")
        self.flush_notification_queue(len(self.status))
        print("")
        print("========================================================")
        print("Warnings:           ", self.msg_counters.get('warning', 0))
        print("Critical Warnings:  ", self.msg_counters.get('critical warning', 0))
        print("Errors:             ", self.msg_counters.get('error', 0))
        print("")
        sys.stdout.flush()

    def process_line(self, lines):
        """
        Classify every non-blank line in `lines` and run the action of its
        line type. Unclassified lines are printed only in verbose mode.
        """
        for line in [l.rstrip() for l in lines.split("\n") if l.strip()]:
            line_info, line_data = self.classify_line(line)
            if line_info is not None:
                self.line_types[line_info]['action'](line_data)
            elif self.args.verbose:
                print(line)

    def classify_line(self, line):
        """
        Identify the current line.

        Returns a tuple (line_type, data). data is the regex group named
        'id' if the matching regex has one, or the whole line otherwise.
        Returns (None, None) if the line can't be identified.
        """
        for line_type in self.line_types:
            for regex in self.line_types[line_type]['regexes']:
                re_obj = re.search(regex, line)
                if re_obj is not None:
                    return line_type, re_obj.groupdict().get('id', line)
        return None, None

    def update_status_line(self):
        " Update self.status. Does not print anything! "
        status_line = "{timer} Current task: {task} +++ Current Phase: {phase}"
        self.status = status_line.format(
            timer=print_timer(datetime.now() - self.timer),
            task=self.current_task.strip(),
            phase=self.current_phase.strip(),
        )

    def add_notification(self, msg, add_time=False, color=None):
        """
        Format msg and add it as a notification to the queue.

        add_time: Prefix msg with the time elapsed since construction.
        color: Escape sequence to wrap msg in, unless --no-color was given.
        """
        if add_time:
            msg = print_timer(datetime.now() - self.timer) + " " + msg
        if color is not None and not self.args.no_color:
            msg = color + msg + self.colors.get('normal')
        self.notif_queue.put(msg)

    def flush_notification_queue(self, min_len):
        """
        Print all strings in the notification queue.

        min_len is accepted for compatibility with existing callers but is
        not used. Returns True if at least one message was printed.
        """
        msg_printed = False
        while not self.notif_queue.empty():
            msg = self.notif_queue.get().strip()
            print(msg)
            msg_printed = True
        return msg_printed

    def act_on_build_msg(self, msg_type, msg):
        """
        Act on a warning, error, critical warning, etc.

        Messages matching a 'fatal' pattern of their type mark the run as
        failed; messages matching an 'ignore' pattern are not shown. Every
        message is counted, whether shown or not.
        """
        if list_search(self.line_types[msg_type].get('fatal', []), msg):
            self.add_notification(msg, color=self.colors.get('fatal'))
            self.fatal_error_found = True
        elif not list_search(self.line_types[msg_type].get('ignore', []), msg):
            self.add_notification(msg, color=self.colors.get(msg_type))
        self.msg_counters[msg_type] = self.msg_counters.get(msg_type, 0) + 1

    def show_cmd(self, tcl_cmd):
        " Show the current command and make it the current task "
        self.update_phase("Finished")
        tcl_cmd = tcl_cmd.replace("Command:", "").strip()
        self.add_notification("Executing Tcl: " + tcl_cmd, add_time=True, color=self.colors.get("cmd"))
        words = tcl_cmd.split()
        if not words:
            # Nothing left after stripping the prefix, so no task to derive
            return
        cmd = words[0]
        # Use the friendly name for known commands, the raw command otherwise
        cmd = self.viv_tcl_cmds.get(cmd, cmd)
        self.update_task("Starting " + cmd + " Command", is_new=False)

    def update_task(self, task, is_new=True):
        """
        Update current task.

        is_new: If True and the task differs from the current one, the
                current phase is first reported as finished.
        """
        # Special case: Treat "translation" as a phase as well
        if "Translating synthesized netlist" in task:
            task = "Translating Synthesized Netlist"
        filtered_task = task.replace("Starting", "").replace("Task", "").replace("Command", "")
        if is_new and (filtered_task != self.current_task):
            self.update_phase("Finished")
        self.current_task = filtered_task
        self.current_phase = "Starting"
        self.add_notification(task, add_time=True, color=self.colors.get("task"))
        sys.stdout.write("\n")
        self.print_status_line()

    def update_phase(self, phase):
        " Update current phase and reprint the status line "
        self.current_phase = phase.strip()
        self.current_task = self.current_task.replace("Phase", "")
        sys.stdout.write("\n")
        self.print_status_line()

    def update_testbench(self, testbench):
        " Called for testbench result lines. Currently does nothing. "
        pass # Do nothing

def main():
    """
    Go, go, go!

    Picks 'vivado' or 'vivado_lab' from PATH unless --vivado-command was
    given, then runs it. Returns True on success, False otherwise.
    """
    args, viv_args = parse_args()
    if args.vivado_command is None:
        if which("vivado"):
            args.vivado_command = "vivado"
        elif which("vivado_lab"):
            args.vivado_command = "vivado_lab"
        else:
            print("Cannot find Vivado executable!")
            return False
    try:
        return VivadoRunner(args, viv_args).run()
    except KeyboardInterrupt:
        print("")
        print("")
        print("Caught Ctrl-C. Exiting.")
        print("")
        return False

if __name__ == "__main__":
    # Exit status 0 on success, 1 on failure
    sys.exit(0 if main() else 1)