diff options
Diffstat (limited to 'lib/python/qmk/cli')
| -rw-r--r-- | lib/python/qmk/cli/__init__.py | 72 | ||||
| -rw-r--r-- | lib/python/qmk/cli/config.py | 116 | ||||
| -rw-r--r-- | lib/python/qmk/cli/console.py | 302 | ||||
| -rwxr-xr-x | lib/python/qmk/cli/generate/layouts.py | 4 | 
4 files changed, 348 insertions, 146 deletions
| diff --git a/lib/python/qmk/cli/__init__.py b/lib/python/qmk/cli/__init__.py index 6fe769fe7b..02b721f342 100644 --- a/lib/python/qmk/cli/__init__.py +++ b/lib/python/qmk/cli/__init__.py @@ -12,6 +12,20 @@ from subprocess import run  from milc import cli, __VERSION__  from milc.questions import yesno +import_names = { +    # A mapping of package name to importable name +    'pep8-naming': 'pep8ext_naming', +    'pyusb': 'usb.core', +} + +safe_commands = [ +    # A list of subcommands we always run, even when the module imports fail +    'clone', +    'config', +    'env', +    'setup', +] +  def _run_cmd(*command):      """Run a command in a subshell. @@ -50,8 +64,8 @@ def _find_broken_requirements(requirements):              module_import = module_name.replace('-', '_')              # Not every module is importable by its own name. -            if module_name == "pep8-naming": -                module_import = "pep8ext_naming" +            if module_name in import_names: +                module_import = import_names[module_name]              if not find_spec(module_import):                  broken_modules.append(module_name) @@ -107,32 +121,31 @@ if int(milc_version[0]) < 2 and int(milc_version[1]) < 3:  # Check to make sure we have all our dependencies  msg_install = 'Please run `python3 -m pip install -r %s` to install required python dependencies.' - -if _broken_module_imports('requirements.txt'): -    if yesno('Would you like to install the required Python modules?'): -        _run_cmd(sys.executable, '-m', 'pip', 'install', '-r', 'requirements.txt') -    else: -        print() -        print(msg_install % (str(Path('requirements.txt').resolve()),)) -        print() -        exit(1) - -if cli.config.user.developer: -    args = sys.argv[1:] -    while args and args[0][0] == '-': -        del args[0] -    if not args or args[0] != 'config': -        if _broken_module_imports('requirements-dev.txt'): -            if yesno('Would you like to install the required developer Python modules?'): -                _run_cmd(sys.executable, '-m', 'pip', 'install', '-r', 'requirements-dev.txt') -            elif yesno('Would you like to disable developer mode?'): -                _run_cmd(sys.argv[0], 'config', 'user.developer=None') -            else: -                print() -                print(msg_install % (str(Path('requirements-dev.txt').resolve()),)) -                print('You can also turn off developer mode: qmk config user.developer=None') -                print() -                exit(1) +args = sys.argv[1:] +while args and args[0][0] == '-': +    del args[0] + +if not args or args[0] not in safe_commands: +    if _broken_module_imports('requirements.txt'): +        if yesno('Would you like to install the required Python modules?'): +            _run_cmd(sys.executable, '-m', 'pip', 'install', '-r', 'requirements.txt') +        else: +            print() +            print(msg_install % (str(Path('requirements.txt').resolve()),)) +            print() +            exit(1) + +    if cli.config.user.developer and _broken_module_imports('requirements-dev.txt'): +        if yesno('Would you like to install the required developer Python modules?'): +            _run_cmd(sys.executable, '-m', 'pip', 'install', '-r', 'requirements-dev.txt') +        elif yesno('Would you like to disable developer mode?'): +            _run_cmd(sys.argv[0], 'config', 'user.developer=None') +        else: +            print() +            print(msg_install % (str(Path('requirements-dev.txt').resolve()),)) +            print('You can also turn off developer mode: qmk config user.developer=None') +            print() +            exit(1)  # Import our subcommands  from . import c2json  # noqa @@ -140,7 +153,8 @@ from . import cformat  # noqa  from . import chibios  # noqa  from . import clean  # noqa  from . import compile  # noqa -from . import config  # noqa +from milc.subcommand import config  # noqa +from . import console  # noqa  from . import docs  # noqa  from . import doctor  # noqa  from . import fileformat  # noqa diff --git a/lib/python/qmk/cli/config.py b/lib/python/qmk/cli/config.py deleted file mode 100644 index e17d8bb9ba..0000000000 --- a/lib/python/qmk/cli/config.py +++ /dev/null @@ -1,116 +0,0 @@ -"""Read and write configuration settings -""" -from milc import cli - - -def print_config(section, key): -    """Print a single config setting to stdout. -    """ -    cli.echo('%s.%s{fg_cyan}={fg_reset}%s', section, key, cli.config[section][key]) - - -def show_config(): -    """Print the current configuration to stdout. -    """ -    for section in cli.config: -        for key in cli.config[section]: -            print_config(section, key) - - -def parse_config_token(config_token): -    """Split a user-supplied configuration-token into its components. -    """ -    section = option = value = None - -    if '=' in config_token and '.' not in config_token: -        cli.log.error('Invalid configuration token, the key must be of the form <section>.<option>: %s', config_token) -        return section, option, value - -    # Separate the key (<section>.<option>) from the value -    if '=' in config_token: -        key, value = config_token.split('=') -    else: -        key = config_token - -    # Extract the section and option from the key -    if '.' in key: -        section, option = key.split('.', 1) -    else: -        section = key - -    return section, option, value - - -def set_config(section, option, value): -    """Set a config key in the running config. -    """ -    log_string = '%s.%s{fg_cyan}:{fg_reset} %s {fg_cyan}->{fg_reset} %s' -    if cli.args.read_only: -        log_string += ' {fg_red}(change not written)' - -    cli.echo(log_string, section, option, cli.config[section][option], value) - -    if not cli.args.read_only: -        if value == 'None': -            del cli.config[section][option] -        else: -            cli.config[section][option] = value - - -@cli.argument('-ro', '--read-only', arg_only=True, action='store_true', help='Operate in read-only mode.') -@cli.argument('configs', nargs='*', arg_only=True, help='Configuration options to read or write.') -@cli.subcommand("Read and write configuration settings.") -def config(cli): -    """Read and write config settings. - -    This script iterates over the config_tokens supplied as argument. Each config_token has the following form: - -        section[.key][=value] - -    If only a section (EG 'compile') is supplied all keys for that section will be displayed. - -    If section.key is supplied the value for that single key will be displayed. - -    If section.key=value is supplied the value for that single key will be set. - -    If section.key=None is supplied the key will be deleted. - -    No validation is done to ensure that the supplied section.key is actually used by qmk scripts. -    """ -    if not cli.args.configs: -        return show_config() - -    # Process config_tokens -    save_config = False - -    for argument in cli.args.configs: -        # Split on space in case they quoted multiple config tokens -        for config_token in argument.split(' '): -            section, option, value = parse_config_token(config_token) - -            # Validation -            if option and '.' in option: -                cli.log.error('Config keys may not have more than one period! "%s" is not valid.', config_token) -                return False - -            # Do what the user wants -            if section and option and value: -                # Write a configuration option -                set_config(section, option, value) -                if not cli.args.read_only: -                    save_config = True - -            elif section and option: -                # Display a single key -                print_config(section, option) - -            elif section: -                # Display an entire section -                for key in cli.config[section]: -                    print_config(section, key) - -    # Ending actions -    if save_config: -        cli.save_config() - -    return True diff --git a/lib/python/qmk/cli/console.py b/lib/python/qmk/cli/console.py new file mode 100644 index 0000000000..45ff0c8bee --- /dev/null +++ b/lib/python/qmk/cli/console.py @@ -0,0 +1,302 @@ +"""Acquire debugging information from usb hid devices + +cli implementation of https://www.pjrc.com/teensy/hid_listen.html +""" +from pathlib import Path +from threading import Thread +from time import sleep, strftime + +import hid +import usb.core + +from milc import cli + +LOG_COLOR = { +    'next': 0, +    'colors': [ +        '{fg_blue}', +        '{fg_cyan}', +        '{fg_green}', +        '{fg_magenta}', +        '{fg_red}', +        '{fg_yellow}', +    ], +} + +KNOWN_BOOTLOADERS = { +    # VID  ,  PID +    ('03EB', '2FEF'): 'atmel-dfu: ATmega16U2', +    ('03EB', '2FF0'): 'atmel-dfu: ATmega32U2', +    ('03EB', '2FF3'): 'atmel-dfu: ATmega16U4', +    ('03EB', '2FF4'): 'atmel-dfu: ATmega32U4', +    ('03EB', '2FF9'): 'atmel-dfu: AT90USB64', +    ('03EB', '2FFA'): 'atmel-dfu: AT90USB162', +    ('03EB', '2FFB'): 'atmel-dfu: AT90USB128', +    ('03EB', '6124'): 'Microchip SAM-BA', +    ('0483', 'DF11'): 'stm32-dfu: STM32 BOOTLOADER', +    ('16C0', '05DC'): 'USBasp: USBaspLoader', +    ('16C0', '05DF'): 'bootloadHID: HIDBoot', +    ('16C0', '0478'): 'halfkay: Teensy Halfkay', +    ('1B4F', '9203'): 'caterina: Pro Micro 3.3V', +    ('1B4F', '9205'): 'caterina: Pro Micro 5V', +    ('1B4F', '9207'): 'caterina: LilyPadUSB', +    ('1C11', 'B007'): 'kiibohd: Kiibohd DFU Bootloader', +    ('1EAF', '0003'): 'stm32duino: Maple 003', +    ('1FFB', '0101'): 'caterina: Polou A-Star 32U4 Bootloader', +    ('2341', '0036'): 'caterina: Arduino Leonardo', +    ('2341', '0037'): 'caterina: Arduino Micro', +    ('239A', '000C'): 'caterina: Adafruit Feather 32U4', +    ('239A', '000D'): 'caterina: Adafruit ItsyBitsy 32U4 3v', +    ('239A', '000E'): 'caterina: Adafruit ItsyBitsy 32U4 5v', +    ('239A', '000E'): 'caterina: Adafruit ItsyBitsy 32U4 5v', +    ('2A03', '0036'): 'caterina: Arduino Leonardo', +    ('2A03', '0037'): 'caterina: Arduino Micro', +    ('314B', '0106'): 'apm32-dfu: APM32 DFU ISP Mode' +} + + +class MonitorDevice(object): +    def __init__(self, hid_device, numeric): +        self.hid_device = hid_device +        self.numeric = numeric +        self.device = hid.Device(path=hid_device['path']) +        self.current_line = '' + +        cli.log.info('Console Connected: %(color)s%(manufacturer_string)s %(product_string)s{style_reset_all} (%(color)s%(vendor_id)04X:%(product_id)04X:%(index)d{style_reset_all})', hid_device) + +    def read(self, size, encoding='ascii', timeout=1): +        """Read size bytes from the device. +        """ +        return self.device.read(size, timeout).decode(encoding) + +    def read_line(self): +        """Read from the device's console until we get a \n. +        """ +        while '\n' not in self.current_line: +            self.current_line += self.read(32).replace('\x00', '') + +        lines = self.current_line.split('\n', 1) +        self.current_line = lines[1] + +        return lines[0] + +    def run_forever(self): +        while True: +            try: +                message = {**self.hid_device, 'text': self.read_line()} +                identifier = (int2hex(message['vendor_id']), int2hex(message['product_id'])) if self.numeric else (message['manufacturer_string'], message['product_string']) +                message['identifier'] = ':'.join(identifier) +                message['ts'] = '{style_dim}{fg_green}%s{style_reset_all} ' % (strftime(cli.config.general.datetime_fmt),) if cli.args.timestamp else '' + +                cli.echo('%(ts)s%(color)s%(identifier)s:%(index)d{style_reset_all}: %(text)s' % message) + +            except hid.HIDException: +                break + + +class FindDevices(object): +    def __init__(self, vid, pid, index, numeric): +        self.vid = vid +        self.pid = pid +        self.index = index +        self.numeric = numeric + +    def run_forever(self): +        """Process messages from our queue in a loop. +        """ +        live_devices = {} +        live_bootloaders = {} + +        while True: +            try: +                for device in list(live_devices): +                    if not live_devices[device]['thread'].is_alive(): +                        cli.log.info('Console Disconnected: %(color)s%(manufacturer_string)s %(product_string)s{style_reset_all} (%(color)s%(vendor_id)04X:%(product_id)04X:%(index)d{style_reset_all})', live_devices[device]) +                        del live_devices[device] + +                for device in self.find_devices(): +                    if device['path'] not in live_devices: +                        device['color'] = LOG_COLOR['colors'][LOG_COLOR['next']] +                        LOG_COLOR['next'] = (LOG_COLOR['next'] + 1) % len(LOG_COLOR['colors']) +                        live_devices[device['path']] = device + +                        try: +                            monitor = MonitorDevice(device, self.numeric) +                            device['thread'] = Thread(target=monitor.run_forever, daemon=True) + +                            device['thread'].start() +                        except Exception as e: +                            device['e'] = e +                            device['e_name'] = e.__class__.__name__ +                            cli.log.error("Could not connect to %(color)s%(manufacturer_string)s %(product_string)s{style_reset_all} (%(color)s:%(vendor_id)04X:%(product_id)04X:%(index)d): %(e_name)s: %(e)s", device) +                            if cli.config.general.verbose: +                                cli.log.exception(e) +                            del live_devices[device['path']] + +                if cli.args.bootloaders: +                    for device in self.find_bootloaders(): +                        if device.address in live_bootloaders: +                            live_bootloaders[device.address]._qmk_found = True +                        else: +                            name = KNOWN_BOOTLOADERS[(int2hex(device.idVendor), int2hex(device.idProduct))] +                            cli.log.info('Bootloader Connected: {style_bright}{fg_magenta}%s', name) +                            device._qmk_found = True +                            live_bootloaders[device.address] = device + +                    for device in list(live_bootloaders): +                        if live_bootloaders[device]._qmk_found: +                            live_bootloaders[device]._qmk_found = False +                        else: +                            name = KNOWN_BOOTLOADERS[(int2hex(live_bootloaders[device].idVendor), int2hex(live_bootloaders[device].idProduct))] +                            cli.log.info('Bootloader Disconnected: {style_bright}{fg_magenta}%s', name) +                            del live_bootloaders[device] + +                sleep(.1) + +            except KeyboardInterrupt: +                break + +    def is_bootloader(self, hid_device): +        """Returns true if the device in question matches a known bootloader vid/pid. +        """ +        return (int2hex(hid_device.idVendor), int2hex(hid_device.idProduct)) in KNOWN_BOOTLOADERS + +    def is_console_hid(self, hid_device): +        """Returns true when the usage page indicates it's a teensy-style console. +        """ +        return hid_device['usage_page'] == 0xFF31 and hid_device['usage'] == 0x0074 + +    def is_filtered_device(self, hid_device): +        """Returns True if the device should be included in the list of available consoles. +        """ +        return int2hex(hid_device['vendor_id']) == self.vid and int2hex(hid_device['product_id']) == self.pid + +    def find_devices_by_report(self, hid_devices): +        """Returns a list of available teensy-style consoles by doing a brute-force search. + +        Some versions of linux don't report usage and usage_page. In that case we fallback to reading the report (possibly inaccurately) ourselves. +        """ +        devices = [] + +        for device in hid_devices: +            path = device['path'].decode('utf-8') + +            if path.startswith('/dev/hidraw'): +                number = path[11:] +                report = Path(f'/sys/class/hidraw/hidraw{number}/device/report_descriptor') + +                if report.exists(): +                    rp = report.read_bytes() + +                    if rp[1] == 0x31 and rp[3] == 0x09: +                        devices.append(device) + +        return devices + +    def find_bootloaders(self): +        """Returns a list of available bootloader devices. +        """ +        return list(filter(self.is_bootloader, usb.core.find(find_all=True))) + +    def find_devices(self): +        """Returns a list of available teensy-style consoles. +        """ +        hid_devices = hid.enumerate() +        devices = list(filter(self.is_console_hid, hid_devices)) + +        if not devices: +            devices = self.find_devices_by_report(hid_devices) + +        if self.vid and self.pid: +            devices = list(filter(self.is_filtered_device, devices)) + +        # Add index numbers +        device_index = {} +        for device in devices: +            id = ':'.join((int2hex(device['vendor_id']), int2hex(device['product_id']))) + +            if id not in device_index: +                device_index[id] = 0 + +            device_index[id] += 1 +            device['index'] = device_index[id] + +        return devices + + +def int2hex(number): +    """Returns a string representation of the number as hex. +    """ +    return "%04X" % number + + +def list_devices(device_finder): +    """Show the user a nicely formatted list of devices. +    """ +    devices = device_finder.find_devices() + +    if devices: +        cli.log.info('Available devices:') +        for dev in devices: +            color = LOG_COLOR['colors'][LOG_COLOR['next']] +            LOG_COLOR['next'] = (LOG_COLOR['next'] + 1) % len(LOG_COLOR['colors']) +            cli.log.info("\t%s%s:%s:%d{style_reset_all}\t%s %s", color, int2hex(dev['vendor_id']), int2hex(dev['product_id']), dev['index'], dev['manufacturer_string'], dev['product_string']) + +    if cli.args.bootloaders: +        bootloaders = device_finder.find_bootloaders() + +        if bootloaders: +            cli.log.info('Available Bootloaders:') + +            for dev in bootloaders: +                cli.log.info("\t%s:%s\t%s", int2hex(dev.idVendor), int2hex(dev.idProduct), KNOWN_BOOTLOADERS[(int2hex(dev.idVendor), int2hex(dev.idProduct))]) + + +@cli.argument('--bootloaders', arg_only=True, default=True, action='store_boolean', help='displaying bootloaders.') +@cli.argument('-d', '--device', help='Device to select - uses format <pid>:<vid>[:<index>].') +@cli.argument('-l', '--list', arg_only=True, action='store_true', help='List available hid_listen devices.') +@cli.argument('-n', '--numeric', arg_only=True, action='store_true', help='Show VID/PID instead of names.') +@cli.argument('-t', '--timestamp', arg_only=True, action='store_true', help='Print the timestamp for received messages as well.') +@cli.argument('-w', '--wait', type=int, default=1, help="How many seconds to wait between checks (Default: 1)") +@cli.subcommand('Acquire debugging information from usb hid devices.', hidden=False if cli.config.user.developer else True) +def console(cli): +    """Acquire debugging information from usb hid devices +    """ +    vid = None +    pid = None +    index = 1 + +    if cli.config.console.device: +        device = cli.config.console.device.split(':') + +        if len(device) == 2: +            vid, pid = device + +        elif len(device) == 3: +            vid, pid, index = device + +            if not index.isdigit(): +                cli.log.error('Device index must be a number! Got "%s" instead.', index) +                exit(1) + +            index = int(index) + +            if index < 1: +                cli.log.error('Device index must be greater than 0! Got %s', index) +                exit(1) + +        else: +            cli.log.error('Invalid format for device, expected "<pid>:<vid>[:<index>]" but got "%s".', cli.config.console.device) +            cli.print_help() +            exit(1) + +        vid = vid.upper() +        pid = pid.upper() + +    device_finder = FindDevices(vid, pid, index, cli.args.numeric) + +    if cli.args.list: +        return list_devices(device_finder) + +    print('Looking for devices...', flush=True) +    device_finder.run_forever() diff --git a/lib/python/qmk/cli/generate/layouts.py b/lib/python/qmk/cli/generate/layouts.py index ad6946d6cf..4de982f822 100755 --- a/lib/python/qmk/cli/generate/layouts.py +++ b/lib/python/qmk/cli/generate/layouts.py @@ -85,7 +85,9 @@ def generate_layouts(cli):      for alias, target in kb_info_json.get('layout_aliases', {}).items():          layouts_h_lines.append('') -        layouts_h_lines.append('#define %s %s' % (alias, target)) +        layouts_h_lines.append(f'#ifndef {alias}') +        layouts_h_lines.append(f'#   define {alias} {target}') +        layouts_h_lines.append('#endif')      # Show the results      layouts_h = '\n'.join(layouts_h_lines) + '\n' | 
