Viewing file: lookup.py (2.76 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
import inspect from typing import Any, Callable, List
from ..contracts.config import MyImunifyConfig from ..contracts.permissions import is_plesk_service_plan_enabled from ..rpc_tools.lookup import wraps from .checkers import check_feature from .exceptions import UserArgumentNotFound
# Module-level registry of feature names. Every time a decorator produced by
# `feature(name, ...)` is applied to an object, `name` is added to this set.
features = set()  # feature storage
def _wrapper(
    name: str, permissions: List[str], func: Callable[..., Any], user_key: str
) -> Callable[..., Any]:
    """
    Wrap func so that a feature check runs before every call.

    :param name: feature name
    :param permissions: permission values forwarded to ``check_feature``
    :param func: function/method to wrap
    :param user_key: parameter name which contains user name
    :return: new callable object; it is a coroutine function when *func*
        is a coroutine function, a plain function otherwise
    :raise UserArgumentNotFound: at wrap time, if *func* has no parameter
        named *user_key*; at call time, if that parameter has no default
        and the call does not pass it as a keyword argument
    """
    signature = inspect.signature(func)
    if user_key not in signature.parameters:
        raise UserArgumentNotFound(
            "Expecting argument '%s' for %s", user_key, func
        )

    user_param = signature.parameters[user_key]
    # A parameter without a default must be supplied by the caller.
    user_key_required = user_param.default is user_param.empty

    def checker(**kwargs):
        # The presence check runs first, so a missing required user argument
        # is reported even when the feature check itself is bypassed below.
        if user_key_required and user_key not in kwargs:
            raise UserArgumentNotFound(
                "Argument '%s' for '%s' must be specified explicitly",
                user_key,
                func,
            )

        # Ignore the decorator if MyImunify is enabled
        if MyImunifyConfig.ENABLED:
            return

        # Ignore the decorator if Plesk service plan is enabled
        if is_plesk_service_plan_enabled():
            return

        # NOTE(review): only keyword arguments are inspected. When the user
        # parameter has a default and the caller passes it positionally, the
        # default value is what gets checked here -- confirm that callers
        # always pass the user by keyword.
        user = kwargs.get(user_key, user_param.default)
        check_feature(name, permissions, user)

    @wraps(func)
    def wrapper(*args, **kwargs):
        checker(**kwargs)
        return func(*args, **kwargs)

    @wraps(func)
    async def async_wrapper(*args, **kwargs):
        checker(**kwargs)
        return await func(*args, **kwargs)

    # Keep the wrapped callable's sync/async nature so callers can still
    # await coroutine functions.
    if inspect.iscoroutinefunction(func):
        return async_wrapper
    return wrapper
def feature(
    name: str, permissions: List[str], user_key: str = "user"
) -> Callable[[Any], Any]:
    """
    Get decorator to manage function/method with feature management

    The returned decorator behaves as follows:

    * applied to a class, it replaces every plain function defined in the
      class's own namespace whose name does not start with an underscore
      by its ``_wrapper``-wrapped version, and returns the same class;
    * applied to a function, it returns the wrapped function;
    * any other object is returned unchanged.

    In all three cases *name* is added to the module-level ``features`` set.

    :param name: feature name
    :param permissions: list of permission values, with which user can
        access specific endpoint
    :param user_key: parameter name which contains user name
    :return: decorator
    :raise UserArgumentNotFound: propagated from ``_wrapper`` (when the
        decorator is applied) if a wrapped callable has no parameter named
        *user_key*
    """

    def decorator(obj):
        if inspect.isclass(obj):
            # Iterate over a snapshot: setattr() below writes into the very
            # namespace being walked, and mutating a mapping while iterating
            # over its live view is fragile.
            for m_name, m_obj in list(vars(obj).items()):
                if not m_name.startswith("_") and inspect.isfunction(m_obj):
                    wrapped = _wrapper(name, permissions, m_obj, user_key)
                    setattr(obj, m_name, wrapped)
        elif inspect.isfunction(obj):
            obj = _wrapper(name, permissions, obj, user_key)

        # Register the feature name regardless of what kind of object was
        # decorated.
        features.add(name)

        return obj

    return decorator
|