"""Utility methods for flake8."""
import collections
import fnmatch as _fnmatch
import inspect
import io
import os
import platform
import re
import sys
import tokenize
# Matches a unified-diff hunk header such as ``@@ -1,2 +3,4 @@``. Group 1
# captures the first number after the ``+`` and group 2 the optional number
# that follows the comma after it (``None`` when the header omits it).
DIFF_HUNK_REGEXP = re.compile(r'^@@ -\d+(?:,\d+)? \+(\d+)(?:,(\d+))? @@.*$')
# Splits on a comma or on any single whitespace character (spaces included).
COMMA_SEPARATED_LIST_RE = re.compile(r'[,\s]')
# Splits on a comma, tab, newline, carriage return, form feed or vertical
# tab. Unlike COMMA_SEPARATED_LIST_RE, a plain space is not a separator here.
LOCAL_PLUGIN_LIST_RE = re.compile(r'[,\t\n\r\f\v]')
def parse_comma_separated_list(value, regexp=COMMA_SEPARATED_LIST_RE):
    # type: (Union[Sequence[str], str], Pattern[str]) -> List[str]
    """Split a delimited value into a list of clean, non-empty items.

    :param value:
        Either a single string to split, or an already separated list or
        tuple of strings that only needs to be normalized.
    :param regexp:
        Compiled regular expression used to split ``value`` when it is not
        a list or tuple.
    :type regexp:
        _sre.SRE_Pattern
    :returns:
        The items with surrounding whitespace stripped; items that are
        empty after stripping are dropped. A falsy ``value`` yields ``[]``.
    :rtype:
        list
    """
    if not value:
        return []

    # Lists and tuples are taken as-is; anything else is split by ``regexp``.
    if isinstance(value, (list, tuple)):
        pieces = value
    else:
        pieces = regexp.split(value)

    cleaned = []
    for piece in pieces:
        trimmed = piece.strip()
        if trimmed:
            cleaned.append(trimmed)
    return cleaned
def normalize_paths(paths, parent=os.curdir):
    # type: (Union[Sequence[str], str], str) -> List[str]
    """Normalize every path found in a comma-separated list of paths.

    :param paths:
        Value handed to :func:`parse_comma_separated_list` to obtain the
        individual paths.
    :param str parent:
        Directory passed on to :func:`normalize_path` for each path.
    :returns:
        The normalized paths, in the order they were parsed.
    :rtype:
        [str]
    """
    normalized = []
    for raw_path in parse_comma_separated_list(paths):
        normalized.append(normalize_path(raw_path, parent))
    return normalized
def normalize_path(path, parent=os.curdir):
    # type: (str, str) -> str
    """Normalize a single path.

    A value that contains a path separator is joined onto ``parent`` and
    made absolute. A value without any separator (a bare name or pattern)
    is left as it is. Trailing separators are removed from the result.

    :param str path:
        The path (or bare name) to normalize.
    :param str parent:
        Directory that ``path`` is joined onto before it is made absolute.
    :returns:
        The normalized path.
    :rtype:
        str
    """
    # NOTE(sigmavirus24): Using os.path.sep and os.path.altsep allows for
    # Windows compatibility with both Windows-style paths (c:\foo\bar) and
    # Unix style paths (/foo/bar).
    separator = os.path.sep
    # NOTE(sigmavirus24): os.path.altsep may be None
    alternate_separator = os.path.altsep or ''
    if separator in path or (alternate_separator and
                             alternate_separator in path):
        path = os.path.abspath(os.path.join(parent, path))
    stripped = path.rstrip(separator + alternate_separator)
    # A path consisting only of separators (the filesystem root, "/") would
    # be stripped down to an empty string, which no longer names the root.
    # Keep the unstripped path in that case.
    return stripped or path
def _stdin_get_value_py3():
    """Read all of stdin as bytes and return it decoded in a ``StringIO``.

    The encoding is detected with :func:`tokenize.detect_encoding`. When
    detection or decoding with the detected encoding fails, the bytes are
    decoded as UTF-8 instead.
    """
    raw_bytes = sys.stdin.buffer.read()
    try:
        encoding, _ = tokenize.detect_encoding(io.BytesIO(raw_bytes).readline)
        text = raw_bytes.decode(encoding)
    except (LookupError, SyntaxError, UnicodeError):
        text = raw_bytes.decode('utf-8')
    return io.StringIO(text)


def stdin_get_value():
    # type: () -> str
    """Return the contents of stdin, reading it only on the first call.

    The buffer holding what was read is stored on this function as the
    ``cached_stdin`` attribute so later callers (e.g. plugins) get the same
    contents without reading stdin again.
    """
    buffered = getattr(stdin_get_value, 'cached_stdin', None)
    if buffered is not None:
        return buffered.getvalue()

    if sys.version_info < (3, 0):
        buffered = io.BytesIO(sys.stdin.read())
    else:
        buffered = _stdin_get_value_py3()
    stdin_get_value.cached_stdin = buffered
    return buffered.getvalue()
def parse_unified_diff(diff=None):
    # type: (Optional[str]) -> Dict[str, Set[int]]
    """Parse the unified diff passed on stdin.

    :param str diff:
        Text of the unified diff. When ``None`` the diff is obtained from
        :func:`stdin_get_value`.
    :returns:
        dictionary mapping file names to sets of line numbers
    :rtype:
        dict
    """
    # Allow us to not have to patch out stdin_get_value
    if diff is None:
        diff = stdin_get_value()

    number_of_rows = None
    current_path = None
    parsed_paths = collections.defaultdict(set)
    for line in diff.splitlines():
        if number_of_rows:
            # We're in the part of the diff that has lines starting with +, -,
            # and ' ' to show context and the changes made. We skip these
            # because the information we care about is the filename and the
            # range within it.
            # Only lines that exist in the new file count towards the hunk
            # length: removed lines ('-') do not, and neither does the
            # "\ No newline at end of file" marker, which annotates the
            # previous line rather than being a row of either file.
            # NOTE(sigmavirus24): Below we use a slice because stdin may be
            # bytes instead of text on Python 3.
            if line[:1] not in ('-', '\\'):
                number_of_rows -= 1
            # When number_of_rows reaches 0, we will once again start
            # searching for filenames and ranges.
            continue

        # NOTE(sigmavirus24): Diffs that we support look roughly like:
        #    diff a/file.py b/file.py
        #    ...
        #    --- a/file.py
        #    +++ b/file.py
        # Below we're looking for that last line. Every diff tool that
        # gives us this output may have additional information after
        # ``b/file.py`` which it will separate with a \t, e.g.,
        #    +++ b/file.py\t100644
        # Which is an example that has the new file permissions/mode.
        # In this case we only care about the file name.
        if line[:3] == '+++':
            current_path = line[4:].split('\t', 1)[0]
            # NOTE(sigmavirus24): This check is for diff output from git.
            if current_path[:2] == 'b/':
                current_path = current_path[2:]
            # We don't need to do anything else. We have set up our local
            # ``current_path`` variable. We can skip the rest of this loop.
            # The next line we will see will give us the hunk information
            # which is in the next section of logic.
            continue

        hunk_match = DIFF_HUNK_REGEXP.match(line)
        # NOTE(sigmavirus24): pep8/pycodestyle check for:
        #    line[:3] == '@@ '
        # But the DIFF_HUNK_REGEXP enforces that the line start with that
        # So we can more simply check for a match instead of slicing and
        # comparing.
        if hunk_match:
            # A group that did not take part in the match (or is empty)
            # defaults to 1.
            (row, number_of_rows) = [
                1 if not group else int(group)
                for group in hunk_match.groups()
            ]
            parsed_paths[current_path].update(
                range(row, row + number_of_rows)
            )

    # We have now parsed our diff into a dictionary that looks like:
    #    {'file.py': set(range(10, 16), range(18, 20)), ...}
    return parsed_paths
def is_windows():
    # type: () -> bool
    """Report whether the current operating system is Windows.

    :returns:
        True when ``os.name`` is ``'nt'``, otherwise False
    :rtype:
        bool
    """
    windows_os_name = 'nt'
    return os.name == windows_os_name
# NOTE(sigmavirus24): If and when https://bugs.python.org/issue27649 is fixed,
# re-enable multiprocessing support on Windows.
def can_run_multiprocessing_on_windows():
    # type: () -> bool
    """Tell whether multiprocessing may be used on Windows.

    For now the answer is **always** False because of a `bug`_ in the
    :mod:`multiprocessing` module on Windows. The version checks below are
    kept so that support can be *conditionally* re-enabled once a fixed
    version of Python exists.

    .. _bug:
        https://bugs.python.org/issue27649

    :returns:
        True if the version of Python is modern enough, otherwise False
    :rtype:
        bool
    """
    running_version = sys.version_info
    is_new_enough_python27 = (2, 7, 11) <= running_version < (3, 0)
    is_new_enough_python3 = running_version > (3, 2)
    # Hard-wired off: short-circuits the version checks until the bug is
    # fixed upstream.
    bug_is_fixed = False
    return bug_is_fixed and (is_new_enough_python27 or is_new_enough_python3)
def is_using_stdin(paths):
    # type: (List[str]) -> bool
    """Tell whether stdin is among the paths to be checked.

    :param list paths:
        The paths that we're going to check.
    :returns:
        True if the stdin marker (``-``) is in ``paths``, otherwise False
    :rtype:
        bool
    """
    stdin_marker = '-'
    return stdin_marker in paths
def _default_predicate(*args):
    """Exclude nothing: return False whatever the arguments are."""
    return False


def filenames_from(arg, predicate=None):
    # type: (str, callable) -> Generator
    """Generate filenames from an argument.

    :param str arg:
        Parameter from the command-line.
    :param callable predicate:
        Predicate to use to filter out filenames. If the predicate
        returns ``True`` we will exclude the filename, otherwise we
        will yield it. By default, we include every filename
        generated.
    :returns:
        Generator of paths
    """
    if predicate is None:
        predicate = _default_predicate

    if predicate(arg):
        return

    if not os.path.isdir(arg):
        # Anything that is not a directory is yielded untouched.
        yield arg
        return

    for root, sub_directories, files in os.walk(arg):
        if predicate(root):
            sub_directories[:] = []
            continue

        # NOTE(sigmavirus24): os.walk() will skip a directory if you
        # remove it from the list of sub-directories.
        # The list is rebuilt in one in-place assignment rather than by
        # calling remove() while looping over it: removing during iteration
        # skips the entry that follows each removed one, so some excluded
        # directories were still walked before being rejected.
        sub_directories[:] = [
            directory for directory in sub_directories
            if not predicate(os.path.join(root, directory))
        ]

        for filename in files:
            joined = os.path.join(root, filename)
            # A file is excluded when either its joined path or its bare
            # name satisfies the predicate.
            if predicate(joined) or predicate(filename):
                continue
            yield joined
def fnmatch(filename, patterns, default=True):
    # type: (str, List[str], bool) -> bool
    """Match a filename against several :func:`fnmatch.fnmatch` patterns.

    :param str filename:
        Name of the file we're trying to match.
    :param list patterns:
        Patterns we're using to try to match the filename.
    :param bool default:
        The value to return when ``patterns`` is empty.
    :returns:
        True as soon as one pattern matches the filename, False when none
        does. ``default`` if patterns is empty.
    """
    if not patterns:
        return default

    for candidate in patterns:
        if _fnmatch.fnmatch(filename, candidate):
            return True
    return False
def parameters_for(plugin):
    # type: (flake8.plugins.manager.Plugin) -> Dict[str, bool]
    """Return the parameters for the plugin.

    The object stored in ``plugin.plugin`` is inspected. When it is a
    function its own parameters are reported; otherwise it is treated as a
    class and the parameters of its ``__init__`` are reported with ``self``
    removed.

    :param plugin:
        The internal plugin object.
    :type plugin:
        flake8.plugins.manager.Plugin
    :returns:
        An ordered dictionary mapping the parameter name to whether or not
        it is required (i.e., it does not have a default).
    :rtype:
        dict([(str, bool)])
    """
    target = plugin.plugin
    plugin_is_class = not inspect.isfunction(target)
    if plugin_is_class:
        target = plugin.plugin.__init__

    parameters = collections.OrderedDict()
    if sys.version_info < (3, 3):
        argspec = inspect.getargspec(target)
        names = argspec[0]
        defaults = argspec[-1] or []
        # Defaults belong to the trailing arguments, so every argument
        # before this index is required.
        first_optional = len(names) - len(defaults)
        for index, name in enumerate(names):
            parameters[name] = index < first_optional
    else:
        for parameter in inspect.signature(target).parameters.values():
            # Only plain positional-or-keyword parameters are reported;
            # *args, **kwargs and keyword-only parameters are left out.
            if parameter.kind != parameter.POSITIONAL_OR_KEYWORD:
                continue
            parameters[parameter.name] = parameter.default is parameter.empty

    if plugin_is_class:
        parameters.pop('self', None)

    return parameters
def get_python_version():
    """Describe the running Python as a single string.

    :returns:
        Implementation name (when available), version, and platform,
        formatted as ``"<implementation> <version> on <system>"``.
    :rtype:
        str
    """
    # The implementation isn't all that important, so it is simply left
    # out when the platform module cannot provide it.
    try:
        implementation = platform.python_implementation()
    except AttributeError:  # Python 2.5
        prefix = ''
    else:
        prefix = implementation + " "

    version = platform.python_version()
    system = platform.system()
    return '%s%s on %s' % (prefix, version, system)