Source code for compat_patcher_core.registry

from __future__ import absolute_import, print_function, unicode_literals

import collections
import itertools

import six

from compat_patcher_core.utilities import (
    tuplify_software_version,
    _import_attribute_from_dotted_string,
)


[docs]class PatchingRegistry(object): """ This registry is used to store and select a set of fixers related to some specific software. `family_prefix` will be used to constructe family names, along with the software reference version provided by the fixer. `populate_callable`, if provided, is a callable taking the registry as first argument, and which will be called by `populate()`. `current_software_version` may be a version tuple or a string. If it's None, then an override value will have to be provided when calling `get_relevant_fixers`. """ def __init__( self, family_prefix, populate_callable=None, current_software_version=None ): assert family_prefix and isinstance( family_prefix, six.string_types ), family_prefix assert populate_callable is None or hasattr( populate_callable, "__call__" ), populate_callable self._family_prefix = family_prefix self._is_populated = False self._populate_callable = populate_callable self._patching_registry = collections.OrderedDict() self._current_software_version = current_software_version def _get_current_software_version(self): """ Returns a tuple of integers, or a dotted string, representing the current version of the software to be patched. """ current_software_version = self._current_software_version if six.callable(current_software_version): current_software_version = current_software_version() assert current_software_version is None or isinstance( current_software_version, (six.string_types, tuple, list) ), current_software_version return current_software_version
[docs] def populate(self): """ Trigger the registration of potential lazy fixers, which might be in other submodules, or waiting in factory functions. """ res = None if not self._is_populated: if self._populate_callable: res = self._populate_callable(self) self._is_populated = True return res
@staticmethod def _extract_docstring(func): """Extract and check the docstring of a callable""" doc = func.__doc__ if not doc: raise ValueError("Fixer %r must provide a help string" % func) return doc
[docs] def register_compatibility_fixer( self, fixer_reference_version, fixer_applied_from_version=None, # INCLUDING this version fixer_applied_upto_version=None, # EXCLUDING this version feature_supported_from_version=None, feature_supported_upto_version=None, fixer_tags=None, ): """ Register a compatibility fixer, which will be activated only if current software version is >= `fixer_applied_from_version` and < `fixer_applied_upto_version` (let them be None to have no limit). The "fixer_reference_version" parameters identifies the software version where the breaking change was introduced (for backwards compatibility fixers), or where the new feature was introduced (for forwards compatibility fixers). It is not related to the appearance of corresponding DeprecationWarnings in the software. It is also used to sort fixers when applying them, and to generate the name of the family of fixers concerned. `feature_supported_from_version` (included) and `feature_supported_upto_version` (excluded) may be used to limit the range of software versions for which related unit-tests are expected to work (i.e versions for which the feature is available, either as a monkey-paching or as standard code). Version identifiers must be dotted strings, eg. "1.9.1". None means "no limit" here. `fixer_tags` is a **list** of strings, which can be used to differentiate fixers which will be applied at different moments of software startup. """ assert ( isinstance(fixer_reference_version, six.string_types) and fixer_reference_version ), fixer_reference_version # eg. "1.9" assert fixer_tags is None or isinstance(fixer_tags, list), fixer_tags fixer_family = self._family_prefix + fixer_reference_version fixer_reference_version = tuplify_software_version(fixer_reference_version) fixer_applied_from_version = tuplify_software_version( fixer_applied_from_version ) fixer_applied_upto_version = tuplify_software_version( fixer_applied_upto_version ) feature_supported_from_version = tuplify_software_version( feature_supported_from_version ) feature_supported_upto_version = tuplify_software_version( feature_supported_upto_version ) fixer_tags = fixer_tags or [] if fixer_applied_from_version and fixer_applied_upto_version: assert fixer_applied_from_version < fixer_applied_upto_version if feature_supported_from_version and feature_supported_upto_version: assert feature_supported_from_version < feature_supported_upto_version def _register_simple_fixer(func): fixer_id = func.__name__ # untouched ATM, not fully qualified new_fixer = dict( fixer_callable=func, fixer_id=fixer_id, fixer_explanation=self._extract_docstring(func), fixer_reference_version=fixer_reference_version, fixer_family=fixer_family, fixer_tags=fixer_tags, fixer_applied_from_version=fixer_applied_from_version, fixer_applied_upto_version=fixer_applied_upto_version, feature_supported_from_version=feature_supported_from_version, feature_supported_upto_version=feature_supported_upto_version, fixer_qualified_name="%s|%s" % (fixer_family, fixer_id), ) assert fixer_id not in self._patching_registry, ( "duplicate fixer id %s detected" % fixer_id ) self._patching_registry[fixer_id] = new_fixer # print("patching_registry", patching_registry) return func return _register_simple_fixer
[docs] def get_all_fixers(self): """Return the list of all fixers (as dicts) known by this registry.""" return list(self._patching_registry.values())
[docs] def get_fixer_by_id(self, fixer_id): """Return the fixer having this (unqualified) ID, or raise KeyError.""" return self._patching_registry[fixer_id]
[docs] def get_relevant_fixers( self, include_fixer_ids="*", include_fixer_families=None, exclude_fixer_ids=None, exclude_fixer_families=None, current_software_version=None, log=None, ): """ Return the list of fixers (as dicts) to be applied for the target software version, based on the metadata of fixers, as well as inclusion/exclusion lists provided as arguments. For inclusion/exclusion filters, a special "*" value means "all fixers", else a list of strings is expected. An output callable `log` may be provided, expecting a string as argument, to debug the reasons why some fixers weren't selected. This method forces a populate() on the registry. """ if not current_software_version: current_software_version = self._get_current_software_version() if not current_software_version: raise ValueError( "PatchingRegistry must be provided a valid current_software_version " "in its constructor or as `get_relevant_fixers` argument." ) self.populate() ALL = "*" log = log or (lambda x: x) current_software_version = tuplify_software_version(current_software_version) relevant_fixers = [] # Shortcut for the common case "no specific inclusion/exclusion lists" mass_include = ( include_fixer_ids == ALL or include_fixer_families == ALL ) and not any((exclude_fixer_ids, exclude_fixer_families)) for fixer_id, fixer in self._patching_registry.items(): assert fixer_id == fixer["fixer_id"], fixer fixer_qualified_name = fixer["fixer_qualified_name"] if ( fixer["fixer_applied_from_version"] is not None and current_software_version < fixer["fixer_applied_from_version"] ): # STRICTLY SMALLER log( "Skipping fixer %s, useful only in next software versions" % fixer_id ) continue if ( fixer["fixer_applied_upto_version"] is not None and current_software_version >= fixer["fixer_applied_upto_version"] ): # GREATER OR EQUAL log( "Skipping fixer %s, useful only in previous software versions" % fixer_id ) continue if not mass_include: included = False if include_fixer_ids == ALL or ( include_fixer_ids and ( fixer_id in include_fixer_ids or fixer_qualified_name in include_fixer_ids ) ): included = True if include_fixer_families == ALL or ( include_fixer_families and fixer["fixer_family"] in include_fixer_families ): included = True if not included: log( "Skipping fixer %s, having neither id nor family (%s) included by patcher settings" % (fixer_id, fixer["fixer_family"]) ) continue if exclude_fixer_ids == ALL or ( exclude_fixer_ids and ( fixer_id in exclude_fixer_ids or fixer_qualified_name in exclude_fixer_ids ) ): log("Skipping fixer %s, excluded by patcher settings" % fixer_id) continue if exclude_fixer_families == ALL or ( exclude_fixer_families and fixer["fixer_family"] in exclude_fixer_families ): log( "Skipping fixer %s, having family %s excluded by patcher settings" % (fixer_id, fixer["fixer_family"]) ) continue # cheers, this fixer has passed all filters! relevant_fixers.append(fixer) return relevant_fixers
[docs] def get_relevant_fixer_ids(self, qualified=False, **kwargs): """"Same as `get_relevant_fixers`, but only returns IDs of selected fixers. If `qualified` is True, returns a fixers IDs dot-prefixed with the family name.""" fixers = self.get_relevant_fixers(**kwargs) field_name = "fixer_qualified_name" if qualified else "fixer_id" return [f[field_name] for f in fixers]
[docs]class MultiPatchingRegistry(object): """ This patching registry wraps a list of other registries, each having its own fixers and current software version. It concatenates and returns selected fixers on demand, assuming that they are compatible with each other. """ def __init__(self, registries): self._registry_references = registries self._is_populated = False self._registries = self._load_registries(registries) def populate(self): res = [] if not self._is_populated: for registry in self._registries: res.append(registry.populate()) self._is_populated = True return res @staticmethod def _load_registries(registry_references): registries = [] for registry_reference in registry_references: original_registry_reference = registry_reference if isinstance(registry_reference, six.string_types): registry_reference = _import_attribute_from_dotted_string( registry_reference ) if six.callable(registry_reference): registry_reference = registry_reference() if not isinstance(registry_reference, PatchingRegistry): raise ValueError( "Wrong registry reference %r" % original_registry_reference ) registries.append(registry_reference) assert len(registries) == len(registry_references) return registries @staticmethod def _flatten(list_of_lists): return list(itertools.chain(*list_of_lists))
[docs] def get_relevant_fixers(self, *args, **kwargs): """Populate underlying registries, and return the concatenation of their selected fixers. Forcing a `current_software_version` as parameter of this method is still possible, but beware that underlying registries all deal with the same software stack, in this case. """ self.populate() return self._flatten( registry.get_relevant_fixers(*args, **kwargs) for registry in self._registries )
[docs] def get_all_fixers(self, *args, **kwargs): """Return the concatenation of all fixers of underlying registries.""" return self._flatten( [registry.get_all_fixers(*args, **kwargs) for registry in self._registries] )
[docs] def get_fixer_by_id(self, fixer_id, *args, **kwargs): """ In case of duplicate fixers having the same ID, just return the first one. """ for registry in self._registries: try: return registry.get_fixer_by_id(fixer_id, *args, **kwargs) except KeyError: pass raise KeyError("Fixer %r not found in any patching registries" % fixer_id)
get_relevant_fixer_ids = six.get_unbound_function( PatchingRegistry.get_relevant_fixer_ids ) # Unmodified