Viewing file: _preloaded.py (5.63 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
# util/_preloaded.py
# Copyright (C) 2005-2021 the SQLAlchemy authors and contributors
# <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
"""Legacy routines to resolve circular module imports at runtime.
These routines are replaced in 1.4.
"""
from functools import update_wrapper
from . import compat
class _memoized_property(object): """vendored version of langhelpers.memoized_property.
not needed in the 1.4 version of preloaded.
"""
def __init__(self, fget, doc=None): self.fget = fget self.__doc__ = doc or fget.__doc__ self.__name__ = fget.__name__
def __get__(self, obj, cls): if obj is None: return self obj.__dict__[self.__name__] = result = self.fget(obj) return result
def _format_argspec_plus(fn, grouped=True):
    """Render an argument spec as the source strings used to build wrappers.

    Vendored version of langhelpers._format_argspec_plus; not needed in
    the 1.4 version of preloaded.

    :param fn: a callable, whose spec is obtained through
      ``compat.inspect_getfullargspec``; anything else is used as the
      spec itself.  The spec is indexed at positions 0 through 4;
      presumably it follows the ``inspect.getfullargspec`` layout
      (args, varargs, varkw, defaults, kwonlyargs) -- confirm in compat.
    :param grouped: when true, the rendered strings are returned exactly
      as produced by ``compat.inspect_formatargspec``; when false, the
      first and last character of each rendered string is stripped.
    :return: dict with the keys ``args``, ``self_arg``, ``apply_pos``
      and ``apply_kw``.

    """
    spec = compat.inspect_getfullargspec(fn) if compat.callable(fn) else fn

    args = compat.inspect_formatargspec(*spec)

    # First entry of spec[0] if there is one, else an index expression
    # into the name held in spec[1], else nothing.
    self_arg = None
    if spec[0]:
        self_arg = spec[0][0]
    elif spec[1]:
        self_arg = "%s[0]" % spec[1]

    apply_pos = compat.inspect_formatargspec(
        spec[0], spec[1], spec[2], None, spec[4]
    )

    # Every non-empty entry of spec[3] / spec[4] contributes to the
    # number of trailing names rendered in "name=name" form.
    num_defaults = sum(len(group) for group in (spec[3], spec[4]) if group)
    name_args = spec[0] + spec[4]
    defaulted_vals = name_args[-num_defaults:] if num_defaults else ()

    def _as_keyword(name):
        return "=" + name

    apply_kw = compat.inspect_formatargspec(
        name_args,
        spec[1],
        spec[2],
        defaulted_vals,
        formatvalue=_as_keyword,
    )

    formatted = {
        "args": args,
        "self_arg": self_arg,
        "apply_pos": apply_pos,
        "apply_kw": apply_kw,
    }
    if not grouped:
        # Drop the first and last character of each rendered string.
        for key in ("args", "apply_pos", "apply_kw"):
            formatted[key] = formatted[key][1:-1]
    return formatted
class dependencies(object):
    """Apply imported dependencies as arguments to a function.

    E.g.::

        @util.dependencies(
            "sqlalchemy.sql.widget",
            "sqlalchemy.engine.default"
        )
        def some_func(self, widget, default, arg1, arg2, **kw):
            # ...

    Rationale is so that the impact of a dependency cycle can be
    associated directly with the few functions that cause the cycle,
    and not pollute the module-level namespace.

    """

    def __init__(self, *deps):
        """Record one lazy import handle per dotted path in ``deps``.

        Each path is split at its last dot into the path to import from
        and the name to fetch from it.
        """
        self.import_deps = []
        for dep in deps:
            path, _, name = dep.rpartition(".")
            self.import_deps.append(dependencies._importlater(path, name))

    def __call__(self, fn):
        """Return a wrapper around ``fn`` with the dependencies filled in.

        The wrapper has ``fn``'s signature minus one parameter per
        dependency; those parameters are the ones directly after a
        leading ``self`` / ``cls``, or the leading ones otherwise.  When
        called, it passes the matching ``import_deps`` entries to ``fn``
        in those positions.

        :raises IndexError: if ``fn`` has fewer positional parameters
          than there are dependencies to inject.
        """
        import_deps = self.import_deps
        spec = compat.inspect_getfullargspec(fn)

        original_args = list(spec[0])
        # A function without positional parameters has no first name to
        # inspect; treat it as having no self/cls instead of failing.
        hasself = bool(original_args) and original_args[0] in ("self", "cls")
        offset = 1 if hasself else 0
        dep_count = len(import_deps)

        # Inner call: each dependency parameter is replaced by an
        # expression that looks the dependency up at call time.
        for index in range(dep_count):
            spec[0][offset + index] = "import_deps[%r]" % index
        inner_spec = _format_argspec_plus(spec, grouped=False)

        # Outer signature: the dependency parameters are removed.
        del original_args[offset : offset + dep_count]
        spec[0][:] = original_args
        outer_spec = _format_argspec_plus(spec, grouped=False)

        code = "lambda %(args)s: fn(%(apply_kw)s)" % {
            "args": outer_spec["args"],
            "apply_kw": inner_spec["apply_kw"],
        }

        # NOTE: the evaluated source is assembled only from fn's own
        # signature and the fixed template above.  The namespace holds
        # just the two names the generated lambda refers to.
        decorated = eval(code, {"fn": fn, "import_deps": import_deps})
        decorated.__defaults__ = getattr(fn, "im_func", fn).__defaults__
        return update_wrapper(decorated, fn)

    @classmethod
    def resolve_all(cls, path):
        """Resolve every pending import whose full path starts with ``path``."""
        # Iterate over a snapshot: _resolve() removes entries from the set.
        for pending in list(dependencies._unresolved):
            if pending._full_path.startswith(path):
                pending._resolve()

    # Import handles that have not been resolved yet.
    _unresolved = set()
    # Full dotted path -> the single import handle for that path.
    _by_key = {}

    class _importlater(object):
        """Handle for a name that is imported from a module on demand.

        There is one instance per full dotted path; attribute access on
        the handle is forwarded to the imported object once resolved.
        """

        # Not used by the code in this class, which keeps its bookkeeping
        # on ``dependencies``; retained so that existing references to
        # these attributes keep working.
        _unresolved = set()

        _by_key = {}

        def __new__(cls, path, addtl):
            key = path + "." + addtl
            try:
                return dependencies._by_key[key]
            except KeyError:
                dependencies._by_key[key] = imp = object.__new__(cls)
                return imp

        def __init__(self, path, addtl):
            self._il_path = path
            self._il_addtl = addtl
            # __new__ returns the cached instance for a known key and
            # Python then runs __init__ on it again.  Only queue the
            # handle if it has not been imported yet, so that creating
            # another decorator for the same path after resolve_all()
            # does not mark a resolved import as unresolved again.
            if "_initial_import" not in self.__dict__:
                dependencies._unresolved.add(self)

        @property
        def _full_path(self):
            """The complete dotted path this handle stands for."""
            return self._il_path + "." + self._il_addtl

        @_memoized_property
        def module(self):
            """The imported object; available only after ``_resolve()``.

            :raises ImportError: if this handle is still unresolved.
            """
            if self in dependencies._unresolved:
                raise ImportError(
                    "importlater.resolve_all() hasn't "
                    "been called (this is %s %s)"
                    % (self._il_path, self._il_addtl)
                )

            return getattr(self._initial_import, self._il_addtl)

        def _resolve(self):
            """Perform the import and mark this handle as resolved."""
            dependencies._unresolved.discard(self)
            self._initial_import = compat.import_(
                self._il_path, globals(), locals(), [self._il_addtl]
            )

        def __getattr__(self, key):
            # Only reached when normal attribute lookup failed.  For
            # "module" that means the memoized property itself raised
            # AttributeError, so report it as an import problem.
            if key == "module":
                raise ImportError(
                    "Could not resolve module %s" % self._full_path
                )
            try:
                attr = getattr(self.module, key)
            except AttributeError:
                raise AttributeError(
                    "Module %s has no attribute '%s'" % (self._full_path, key)
                )
            # Cache on the instance so later lookups bypass __getattr__.
            self.__dict__[key] = attr
            return attr
|