diff options
Diffstat (limited to 'lib/python/qmk/cli')
| -rw-r--r-- | lib/python/qmk/cli/__init__.py | 71 | ||||
| -rw-r--r-- | lib/python/qmk/cli/console.py | 302 | 
2 files changed, 345 insertions, 28 deletions
| diff --git a/lib/python/qmk/cli/__init__.py b/lib/python/qmk/cli/__init__.py index 6fe769fe7b..3face93a53 100644 --- a/lib/python/qmk/cli/__init__.py +++ b/lib/python/qmk/cli/__init__.py @@ -13,6 +13,21 @@ from milc import cli, __VERSION__  from milc.questions import yesno +import_names = { +    # A mapping of package name to importable name +    'pep8-naming': 'pep8ext_naming', +    'pyusb': 'usb.core', +} + +safe_commands = [ +    # A list of subcommands we always run, even when the module imports fail +    'clone', +    'config', +    'env', +    'setup', +] + +  def _run_cmd(*command):      """Run a command in a subshell.      """ @@ -50,8 +65,8 @@ def _find_broken_requirements(requirements):              module_import = module_name.replace('-', '_')              # Not every module is importable by its own name. -            if module_name == "pep8-naming": -                module_import = "pep8ext_naming" +            if module_name in import_names: +                module_import = import_names[module_name]              if not find_spec(module_import):                  broken_modules.append(module_name) @@ -107,32 +122,31 @@ if int(milc_version[0]) < 2 and int(milc_version[1]) < 3:  # Check to make sure we have all our dependencies  msg_install = 'Please run `python3 -m pip install -r %s` to install required python dependencies.' - -if _broken_module_imports('requirements.txt'): -    if yesno('Would you like to install the required Python modules?'): -        _run_cmd(sys.executable, '-m', 'pip', 'install', '-r', 'requirements.txt') -    else: -        print() -        print(msg_install % (str(Path('requirements.txt').resolve()),)) -        print() -        exit(1) - -if cli.config.user.developer: -    args = sys.argv[1:] -    while args and args[0][0] == '-': -        del args[0] -    if not args or args[0] != 'config': -        if _broken_module_imports('requirements-dev.txt'): -            if yesno('Would you like to install the required developer Python modules?'): -                _run_cmd(sys.executable, '-m', 'pip', 'install', '-r', 'requirements-dev.txt') -            elif yesno('Would you like to disable developer mode?'): -                _run_cmd(sys.argv[0], 'config', 'user.developer=None') -            else: -                print() -                print(msg_install % (str(Path('requirements-dev.txt').resolve()),)) -                print('You can also turn off developer mode: qmk config user.developer=None') -                print() -                exit(1) +args = sys.argv[1:] +while args and args[0][0] == '-': +    del args[0] + +if not args or args[0] not in safe_commands: +    if _broken_module_imports('requirements.txt'): +        if yesno('Would you like to install the required Python modules?'): +            _run_cmd(sys.executable, '-m', 'pip', 'install', '-r', 'requirements.txt') +        else: +            print() +            print(msg_install % (str(Path('requirements.txt').resolve()),)) +            print() +            exit(1) + +    if cli.config.user.developer and _broken_module_imports('requirements-dev.txt'): +        if yesno('Would you like to install the required developer Python modules?'): +            _run_cmd(sys.executable, '-m', 'pip', 'install', '-r', 'requirements-dev.txt') +        elif yesno('Would you like to disable developer mode?'): +            _run_cmd(sys.argv[0], 'config', 'user.developer=None') +        else: +            print() +            print(msg_install % (str(Path('requirements-dev.txt').resolve()),)) +            print('You can also turn off developer mode: qmk config user.developer=None') +            print() +            exit(1)  # Import our subcommands  from . import c2json  # noqa @@ -141,6 +155,7 @@ from . import chibios  # noqa  from . import clean  # noqa  from . import compile  # noqa  from . import config  # noqa +from . import console  # noqa  from . import docs  # noqa  from . import doctor  # noqa  from . import fileformat  # noqa diff --git a/lib/python/qmk/cli/console.py b/lib/python/qmk/cli/console.py new file mode 100644 index 0000000000..45ff0c8bee --- /dev/null +++ b/lib/python/qmk/cli/console.py @@ -0,0 +1,302 @@ +"""Acquire debugging information from usb hid devices + +cli implementation of https://www.pjrc.com/teensy/hid_listen.html +""" +from pathlib import Path +from threading import Thread +from time import sleep, strftime + +import hid +import usb.core + +from milc import cli + +LOG_COLOR = { +    'next': 0, +    'colors': [ +        '{fg_blue}', +        '{fg_cyan}', +        '{fg_green}', +        '{fg_magenta}', +        '{fg_red}', +        '{fg_yellow}', +    ], +} + +KNOWN_BOOTLOADERS = { +    # VID  ,  PID +    ('03EB', '2FEF'): 'atmel-dfu: ATmega16U2', +    ('03EB', '2FF0'): 'atmel-dfu: ATmega32U2', +    ('03EB', '2FF3'): 'atmel-dfu: ATmega16U4', +    ('03EB', '2FF4'): 'atmel-dfu: ATmega32U4', +    ('03EB', '2FF9'): 'atmel-dfu: AT90USB64', +    ('03EB', '2FFA'): 'atmel-dfu: AT90USB162', +    ('03EB', '2FFB'): 'atmel-dfu: AT90USB128', +    ('03EB', '6124'): 'Microchip SAM-BA', +    ('0483', 'DF11'): 'stm32-dfu: STM32 BOOTLOADER', +    ('16C0', '05DC'): 'USBasp: USBaspLoader', +    ('16C0', '05DF'): 'bootloadHID: HIDBoot', +    ('16C0', '0478'): 'halfkay: Teensy Halfkay', +    ('1B4F', '9203'): 'caterina: Pro Micro 3.3V', +    ('1B4F', '9205'): 'caterina: Pro Micro 5V', +    ('1B4F', '9207'): 'caterina: LilyPadUSB', +    ('1C11', 'B007'): 'kiibohd: Kiibohd DFU Bootloader', +    ('1EAF', '0003'): 'stm32duino: Maple 003', +    ('1FFB', '0101'): 'caterina: Polou A-Star 32U4 Bootloader', +    ('2341', '0036'): 'caterina: Arduino Leonardo', +    ('2341', '0037'): 'caterina: Arduino Micro', +    ('239A', '000C'): 'caterina: Adafruit Feather 32U4', +    ('239A', '000D'): 'caterina: Adafruit ItsyBitsy 32U4 3v', +    ('239A', '000E'): 'caterina: Adafruit ItsyBitsy 32U4 5v', +    ('239A', '000E'): 'caterina: Adafruit ItsyBitsy 32U4 5v', +    ('2A03', '0036'): 'caterina: Arduino Leonardo', +    ('2A03', '0037'): 'caterina: Arduino Micro', +    ('314B', '0106'): 'apm32-dfu: APM32 DFU ISP Mode' +} + + +class MonitorDevice(object): +    def __init__(self, hid_device, numeric): +        self.hid_device = hid_device +        self.numeric = numeric +        self.device = hid.Device(path=hid_device['path']) +        self.current_line = '' + +        cli.log.info('Console Connected: %(color)s%(manufacturer_string)s %(product_string)s{style_reset_all} (%(color)s%(vendor_id)04X:%(product_id)04X:%(index)d{style_reset_all})', hid_device) + +    def read(self, size, encoding='ascii', timeout=1): +        """Read size bytes from the device. +        """ +        return self.device.read(size, timeout).decode(encoding) + +    def read_line(self): +        """Read from the device's console until we get a \n. +        """ +        while '\n' not in self.current_line: +            self.current_line += self.read(32).replace('\x00', '') + +        lines = self.current_line.split('\n', 1) +        self.current_line = lines[1] + +        return lines[0] + +    def run_forever(self): +        while True: +            try: +                message = {**self.hid_device, 'text': self.read_line()} +                identifier = (int2hex(message['vendor_id']), int2hex(message['product_id'])) if self.numeric else (message['manufacturer_string'], message['product_string']) +                message['identifier'] = ':'.join(identifier) +                message['ts'] = '{style_dim}{fg_green}%s{style_reset_all} ' % (strftime(cli.config.general.datetime_fmt),) if cli.args.timestamp else '' + +                cli.echo('%(ts)s%(color)s%(identifier)s:%(index)d{style_reset_all}: %(text)s' % message) + +            except hid.HIDException: +                break + + +class FindDevices(object): +    def __init__(self, vid, pid, index, numeric): +        self.vid = vid +        self.pid = pid +        self.index = index +        self.numeric = numeric + +    def run_forever(self): +        """Process messages from our queue in a loop. +        """ +        live_devices = {} +        live_bootloaders = {} + +        while True: +            try: +                for device in list(live_devices): +                    if not live_devices[device]['thread'].is_alive(): +                        cli.log.info('Console Disconnected: %(color)s%(manufacturer_string)s %(product_string)s{style_reset_all} (%(color)s%(vendor_id)04X:%(product_id)04X:%(index)d{style_reset_all})', live_devices[device]) +                        del live_devices[device] + +                for device in self.find_devices(): +                    if device['path'] not in live_devices: +                        device['color'] = LOG_COLOR['colors'][LOG_COLOR['next']] +                        LOG_COLOR['next'] = (LOG_COLOR['next'] + 1) % len(LOG_COLOR['colors']) +                        live_devices[device['path']] = device + +                        try: +                            monitor = MonitorDevice(device, self.numeric) +                            device['thread'] = Thread(target=monitor.run_forever, daemon=True) + +                            device['thread'].start() +                        except Exception as e: +                            device['e'] = e +                            device['e_name'] = e.__class__.__name__ +                            cli.log.error("Could not connect to %(color)s%(manufacturer_string)s %(product_string)s{style_reset_all} (%(color)s:%(vendor_id)04X:%(product_id)04X:%(index)d): %(e_name)s: %(e)s", device) +                            if cli.config.general.verbose: +                                cli.log.exception(e) +                            del live_devices[device['path']] + +                if cli.args.bootloaders: +                    for device in self.find_bootloaders(): +                        if device.address in live_bootloaders: +                            live_bootloaders[device.address]._qmk_found = True +                        else: +                            name = KNOWN_BOOTLOADERS[(int2hex(device.idVendor), int2hex(device.idProduct))] +                            cli.log.info('Bootloader Connected: {style_bright}{fg_magenta}%s', name) +                            device._qmk_found = True +                            live_bootloaders[device.address] = device + +                    for device in list(live_bootloaders): +                        if live_bootloaders[device]._qmk_found: +                            live_bootloaders[device]._qmk_found = False +                        else: +                            name = KNOWN_BOOTLOADERS[(int2hex(live_bootloaders[device].idVendor), int2hex(live_bootloaders[device].idProduct))] +                            cli.log.info('Bootloader Disconnected: {style_bright}{fg_magenta}%s', name) +                            del live_bootloaders[device] + +                sleep(.1) + +            except KeyboardInterrupt: +                break + +    def is_bootloader(self, hid_device): +        """Returns true if the device in question matches a known bootloader vid/pid. +        """ +        return (int2hex(hid_device.idVendor), int2hex(hid_device.idProduct)) in KNOWN_BOOTLOADERS + +    def is_console_hid(self, hid_device): +        """Returns true when the usage page indicates it's a teensy-style console. +        """ +        return hid_device['usage_page'] == 0xFF31 and hid_device['usage'] == 0x0074 + +    def is_filtered_device(self, hid_device): +        """Returns True if the device should be included in the list of available consoles. +        """ +        return int2hex(hid_device['vendor_id']) == self.vid and int2hex(hid_device['product_id']) == self.pid + +    def find_devices_by_report(self, hid_devices): +        """Returns a list of available teensy-style consoles by doing a brute-force search. + +        Some versions of linux don't report usage and usage_page. In that case we fallback to reading the report (possibly inaccurately) ourselves. +        """ +        devices = [] + +        for device in hid_devices: +            path = device['path'].decode('utf-8') + +            if path.startswith('/dev/hidraw'): +                number = path[11:] +                report = Path(f'/sys/class/hidraw/hidraw{number}/device/report_descriptor') + +                if report.exists(): +                    rp = report.read_bytes() + +                    if rp[1] == 0x31 and rp[3] == 0x09: +                        devices.append(device) + +        return devices + +    def find_bootloaders(self): +        """Returns a list of available bootloader devices. +        """ +        return list(filter(self.is_bootloader, usb.core.find(find_all=True))) + +    def find_devices(self): +        """Returns a list of available teensy-style consoles. +        """ +        hid_devices = hid.enumerate() +        devices = list(filter(self.is_console_hid, hid_devices)) + +        if not devices: +            devices = self.find_devices_by_report(hid_devices) + +        if self.vid and self.pid: +            devices = list(filter(self.is_filtered_device, devices)) + +        # Add index numbers +        device_index = {} +        for device in devices: +            id = ':'.join((int2hex(device['vendor_id']), int2hex(device['product_id']))) + +            if id not in device_index: +                device_index[id] = 0 + +            device_index[id] += 1 +            device['index'] = device_index[id] + +        return devices + + +def int2hex(number): +    """Returns a string representation of the number as hex. +    """ +    return "%04X" % number + + +def list_devices(device_finder): +    """Show the user a nicely formatted list of devices. +    """ +    devices = device_finder.find_devices() + +    if devices: +        cli.log.info('Available devices:') +        for dev in devices: +            color = LOG_COLOR['colors'][LOG_COLOR['next']] +            LOG_COLOR['next'] = (LOG_COLOR['next'] + 1) % len(LOG_COLOR['colors']) +            cli.log.info("\t%s%s:%s:%d{style_reset_all}\t%s %s", color, int2hex(dev['vendor_id']), int2hex(dev['product_id']), dev['index'], dev['manufacturer_string'], dev['product_string']) + +    if cli.args.bootloaders: +        bootloaders = device_finder.find_bootloaders() + +        if bootloaders: +            cli.log.info('Available Bootloaders:') + +            for dev in bootloaders: +                cli.log.info("\t%s:%s\t%s", int2hex(dev.idVendor), int2hex(dev.idProduct), KNOWN_BOOTLOADERS[(int2hex(dev.idVendor), int2hex(dev.idProduct))]) + + +@cli.argument('--bootloaders', arg_only=True, default=True, action='store_boolean', help='displaying bootloaders.') +@cli.argument('-d', '--device', help='Device to select - uses format <pid>:<vid>[:<index>].') +@cli.argument('-l', '--list', arg_only=True, action='store_true', help='List available hid_listen devices.') +@cli.argument('-n', '--numeric', arg_only=True, action='store_true', help='Show VID/PID instead of names.') +@cli.argument('-t', '--timestamp', arg_only=True, action='store_true', help='Print the timestamp for received messages as well.') +@cli.argument('-w', '--wait', type=int, default=1, help="How many seconds to wait between checks (Default: 1)") +@cli.subcommand('Acquire debugging information from usb hid devices.', hidden=False if cli.config.user.developer else True) +def console(cli): +    """Acquire debugging information from usb hid devices +    """ +    vid = None +    pid = None +    index = 1 + +    if cli.config.console.device: +        device = cli.config.console.device.split(':') + +        if len(device) == 2: +            vid, pid = device + +        elif len(device) == 3: +            vid, pid, index = device + +            if not index.isdigit(): +                cli.log.error('Device index must be a number! Got "%s" instead.', index) +                exit(1) + +            index = int(index) + +            if index < 1: +                cli.log.error('Device index must be greater than 0! Got %s', index) +                exit(1) + +        else: +            cli.log.error('Invalid format for device, expected "<pid>:<vid>[:<index>]" but got "%s".', cli.config.console.device) +            cli.print_help() +            exit(1) + +        vid = vid.upper() +        pid = pid.upper() + +    device_finder = FindDevices(vid, pid, index, cli.args.numeric) + +    if cli.args.list: +        return list_devices(device_finder) + +    print('Looking for devices...', flush=True) +    device_finder.run_forever() | 
