Source code for flake8.plugins.manager

"""Plugin loading and management logic and classes."""
import collections
import logging

import pkg_resources

from flake8 import exceptions
from flake8 import utils
from flake8.plugins import notifier

LOG = logging.getLogger(__name__)

__all__ = (
    'Checkers',
    'Listeners',
    'Plugin',
    'PluginManager',
    'ReportFormatters',
)

NO_GROUP_FOUND = object()


[docs]class Plugin(object): """Wrap an EntryPoint from setuptools and other logic."""
[docs] def __init__(self, name, entry_point): """Initialize our Plugin. :param str name: Name of the entry-point as it was registered with setuptools. :param entry_point: EntryPoint returned by setuptools. :type entry_point: setuptools.EntryPoint """ self.name = name self.entry_point = entry_point self._plugin = None self._parameters = None self._parameter_names = None self._group = None self._plugin_name = None self._version = None
def __repr__(self): """Provide an easy to read description of the current plugin.""" return 'Plugin(name="{0}", entry_point="{1}")'.format( self.name, self.entry_point )
[docs] def to_dictionary(self): """Convert this plugin to a dictionary.""" return { 'name': self.name, 'parameters': self.parameters, 'parameter_names': self.parameter_names, 'plugin': self.plugin, 'plugin_name': self.plugin_name, }
[docs] def is_in_a_group(self): """Determine if this plugin is in a group. :returns: True if the plugin is in a group, otherwise False. :rtype: bool """ return self.group() is not None
[docs] def group(self): """Find and parse the group the plugin is in.""" if self._group is None: name = self.name.split('.', 1) if len(name) > 1: self._group = name[0] else: self._group = NO_GROUP_FOUND if self._group is NO_GROUP_FOUND: return None return self._group
@property def parameters(self): """List of arguments that need to be passed to the plugin.""" if self._parameters is None: self._parameters = utils.parameters_for(self) return self._parameters @property def parameter_names(self): """List of argument names that need to be passed to the plugin.""" if self._parameter_names is None: self._parameter_names = list(self.parameters) return self._parameter_names @property def plugin(self): """Load and return the plugin associated with the entry-point. This property implicitly loads the plugin and then caches it. """ self.load_plugin() return self._plugin @property def version(self): """Return the version of the plugin.""" if self._version is None: if self.is_in_a_group(): self._version = version_for(self) else: self._version = self.plugin.version return self._version @property def plugin_name(self): """Return the name of the plugin.""" if self._plugin_name is None: if self.is_in_a_group(): self._plugin_name = self.group() else: self._plugin_name = self.plugin.name return self._plugin_name @property def off_by_default(self): """Return whether the plugin is ignored by default.""" return getattr(self.plugin, 'off_by_default', False)
[docs] def execute(self, *args, **kwargs): r"""Call the plugin with \*args and \*\*kwargs.""" return self.plugin(*args, **kwargs) # pylint: disable=not-callable
def _load(self, verify_requirements): # Avoid relying on hasattr() here. resolve = getattr(self.entry_point, 'resolve', None) require = getattr(self.entry_point, 'require', None) if resolve and require: if verify_requirements: LOG.debug('Verifying plugin "%s"\'s requirements.', self.name) require() self._plugin = resolve() else: self._plugin = self.entry_point.load( require=verify_requirements ) if not callable(self._plugin): msg = ('Plugin %r is not a callable. It might be written for an' ' older version of flake8 and might not work with this' ' version' % self._plugin) LOG.critical(msg) raise TypeError(msg)
[docs] def load_plugin(self, verify_requirements=False): """Retrieve the plugin for this entry-point. This loads the plugin, stores it on the instance and then returns it. It does not reload it after the first time, it merely returns the cached plugin. :param bool verify_requirements: Whether or not to make setuptools verify that the requirements for the plugin are satisfied. :returns: Nothing """ if self._plugin is None: LOG.info('Loading plugin "%s" from entry-point.', self.name) try: self._load(verify_requirements) except Exception as load_exception: LOG.exception(load_exception) failed_to_load = exceptions.FailedToLoadPlugin( plugin=self, exception=load_exception, ) LOG.critical(str(failed_to_load)) raise failed_to_load
[docs] def enable(self, optmanager, options=None): """Remove plugin name from the default ignore list.""" optmanager.remove_from_default_ignore([self.name]) optmanager.extend_default_select([self.name]) if not options: return try: options.ignore.remove(self.name) except (ValueError, KeyError): LOG.debug('Attempted to remove %s from the ignore list but it was ' 'not a member of the list.', self.name)
[docs] def disable(self, optmanager): """Add the plugin name to the default ignore list.""" optmanager.extend_default_ignore([self.name])
[docs] def provide_options(self, optmanager, options, extra_args): """Pass the parsed options and extra arguments to the plugin.""" parse_options = getattr(self.plugin, 'parse_options', None) if parse_options is not None: LOG.debug('Providing options to plugin "%s".', self.name) try: parse_options(optmanager, options, extra_args) except TypeError: parse_options(options) if self.name in options.enable_extensions: self.enable(optmanager, options)
[docs] def register_options(self, optmanager): """Register the plugin's command-line options on the OptionManager. :param optmanager: Instantiated OptionManager to register options on. :type optmanager: flake8.options.manager.OptionManager :returns: Nothing """ add_options = getattr(self.plugin, 'add_options', None) if add_options is not None: LOG.debug( 'Registering options from plugin "%s" on OptionManager %r', self.name, optmanager ) add_options(optmanager) if self.off_by_default: self.disable(optmanager)
[docs]class PluginManager(object): # pylint: disable=too-few-public-methods """Find and manage plugins consistently."""
[docs] def __init__(self, namespace, verify_requirements=False): """Initialize the manager. :param str namespace: Namespace of the plugins to manage, e.g., 'flake8.extension'. :param bool verify_requirements: Whether or not to make setuptools verify that the requirements for the plugin are satisfied. """ self.namespace = namespace self.verify_requirements = verify_requirements self.plugins = {} self.names = [] self._load_all_plugins()
def _load_all_plugins(self): LOG.info('Loading entry-points for "%s".', self.namespace) for entry_point in pkg_resources.iter_entry_points(self.namespace): name = entry_point.name self.plugins[name] = Plugin(name, entry_point) self.names.append(name) LOG.debug('Loaded %r for plugin "%s".', self.plugins[name], name)
[docs] def map(self, func, *args, **kwargs): r"""Call ``func`` with the plugin and \*args and \**kwargs after. This yields the return value from ``func`` for each plugin. :param collections.Callable func: Function to call with each plugin. Signature should at least be: .. code-block:: python def myfunc(plugin): pass Any extra positional or keyword arguments specified with map will be passed along to this function after the plugin. The plugin passed is a :class:`~flake8.plugins.manager.Plugin`. :param args: Positional arguments to pass to ``func`` after each plugin. :param kwargs: Keyword arguments to pass to ``func`` after each plugin. """ for name in self.names: yield func(self.plugins[name], *args, **kwargs)
[docs] def versions(self): # () -> (str, str) """Generate the versions of plugins. :returns: Tuples of the plugin_name and version :rtype: tuple """ plugins_seen = set() for entry_point_name in self.names: plugin = self.plugins[entry_point_name] plugin_name = plugin.plugin_name if plugin.plugin_name in plugins_seen: continue plugins_seen.add(plugin_name) yield (plugin_name, plugin.version)
def version_for(plugin): # (Plugin) -> Union[str, NoneType] """Determine the version of a plugin by it's module. :param plugin: The loaded plugin :type plugin: Plugin :returns: version string for the module :rtype: str """ module_name = plugin.plugin.__module__ try: module = __import__(module_name) except ImportError: return None return getattr(module, '__version__', None)
[docs]class PluginTypeManager(object): """Parent class for most of the specific plugin types.""" namespace = None def __init__(self): """Initialize the plugin type's manager.""" self.manager = PluginManager(self.namespace) self.plugins_loaded = False def __contains__(self, name): """Check if the entry-point name is in this plugin type manager.""" LOG.debug('Checking for "%s" in plugin type manager.', name) return name in self.plugins def __getitem__(self, name): """Retrieve a plugin by its name.""" LOG.debug('Retrieving plugin for "%s".', name) return self.plugins[name]
[docs] def get(self, name, default=None): """Retrieve the plugin referred to by ``name`` or return the default. :param str name: Name of the plugin to retrieve. :param default: Default value to return. :returns: Plugin object referred to by name, if it exists. :rtype: :class:`Plugin` """ if name in self: return self[name] return default
@property def names(self): """Proxy attribute to underlying manager.""" return self.manager.names @property def plugins(self): """Proxy attribute to underlying manager.""" return self.manager.plugins @staticmethod def _generate_call_function(method_name, optmanager, *args, **kwargs): def generated_function(plugin): # noqa: D105 method = getattr(plugin, method_name, None) if (method is not None and isinstance(method, collections.Callable)): return method(optmanager, *args, **kwargs) return generated_function
[docs] def load_plugins(self): """Load all plugins of this type that are managed by this manager.""" if self.plugins_loaded: return def load_plugin(plugin): """Call each plugin's load_plugin method.""" return plugin.load_plugin() plugins = list(self.manager.map(load_plugin)) # Do not set plugins_loaded if we run into an exception self.plugins_loaded = True return plugins
[docs] def register_plugin_versions(self, optmanager): """Register the plugins and their versions with the OptionManager.""" self.load_plugins() for (plugin_name, version) in self.manager.versions(): optmanager.register_plugin(name=plugin_name, version=version)
[docs] def register_options(self, optmanager): """Register all of the checkers' options to the OptionManager.""" self.load_plugins() call_register_options = self._generate_call_function( 'register_options', optmanager, ) list(self.manager.map(call_register_options))
[docs] def provide_options(self, optmanager, options, extra_args): """Provide parsed options and extra arguments to the plugins.""" call_provide_options = self._generate_call_function( 'provide_options', optmanager, options, extra_args, ) list(self.manager.map(call_provide_options))
class NotifierBuilderMixin(object): # pylint: disable=too-few-public-methods """Mixin class that builds a Notifier from a PluginManager.""" def build_notifier(self): """Build a Notifier for our Listeners. :returns: Object to notify our listeners of certain error codes and warnings. :rtype: :class:`~flake8.notifier.Notifier` """ notifier_trie = notifier.Notifier() for name in self.names: notifier_trie.register_listener(name, self.manager[name]) return notifier_trie
[docs]class Checkers(PluginTypeManager): """All of the checkers registered through entry-ponits.""" namespace = 'flake8.extension'
[docs] def checks_expecting(self, argument_name): """Retrieve checks that expect an argument with the specified name. Find all checker plugins that are expecting a specific argument. """ for plugin in self.plugins.values(): if argument_name == plugin.parameter_names[0]: yield plugin
[docs] def to_dictionary(self): """Return a dictionary of AST and line-based plugins.""" return { 'ast_plugins': [ plugin.to_dictionary() for plugin in self.ast_plugins ], 'logical_line_plugins': [ plugin.to_dictionary() for plugin in self.logical_line_plugins ], 'physical_line_plugins': [ plugin.to_dictionary() for plugin in self.physical_line_plugins ], }
[docs] def register_options(self, optmanager): """Register all of the checkers' options to the OptionManager. This also ensures that plugins that are not part of a group and are enabled by default are enabled on the option manager. """ # NOTE(sigmavirus24) We reproduce a little of # PluginTypeManager.register_options to reduce the number of times # that we loop over the list of plugins. Instead of looping twice, # option registration and enabling the plugin, we loop once with one # function to map over the plugins. self.load_plugins() call_register_options = self._generate_call_function( 'register_options', optmanager, ) def register_and_enable(plugin): call_register_options(plugin) if plugin.group() is None and not plugin.off_by_default: plugin.enable(optmanager) list(self.manager.map(register_and_enable))
@property def ast_plugins(self): """List of plugins that expect the AST tree.""" plugins = getattr(self, '_ast_plugins', []) if not plugins: plugins = list(self.checks_expecting('tree')) self._ast_plugins = plugins return plugins @property def logical_line_plugins(self): """List of plugins that expect the logical lines.""" plugins = getattr(self, '_logical_line_plugins', []) if not plugins: plugins = list(self.checks_expecting('logical_line')) self._logical_line_plugins = plugins return plugins @property def physical_line_plugins(self): """List of plugins that expect the physical lines.""" plugins = getattr(self, '_physical_line_plugins', []) if not plugins: plugins = list(self.checks_expecting('physical_line')) self._physical_line_plugins = plugins return plugins
[docs]class Listeners(PluginTypeManager, NotifierBuilderMixin): """All of the listeners registered through entry-points.""" namespace = 'flake8.listen'
[docs]class ReportFormatters(PluginTypeManager): """All of the report formatters registered through entry-points.""" namespace = 'flake8.report'