Viewing file: utils.py (3.48 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
import logging from itertools import chain from typing import List, Any
from defence360agent.feature_management import hooks from defence360agent.feature_management.model import FeatureManagementPerms from defence360agent.model import instance from defence360agent.utils import execute_iterable_expression from .lookup import features
logger = logging.getLogger(__name__)
async def set_feature(user: str, feature: str, value: str) -> bool: """Sets a `feature` to `value` for a given `user`.
Calls appropriate hook and returns its (bool) result. Logs the result of setting change. If hook fails rollbacks changes to database. """ with instance.db.atomic() as trx: perm = FeatureManagementPerms.get_perm(user) perm.set_feature(feature, value) hook = hooks.get_hook(feature) ok = hook(user, value) if ok: logger.info( "Applied setting %s=%s for user %s", feature, value, user ) else: logger.error( "Failed to apply setting %s=%s for user %s", feature, value, user, ) trx.rollback() return ok
async def update_users( feature: str, users: List[str], value: Any, existing_users: List[str] ): result = {"succeeded": [], "failed": []}
for user in users: if user not in existing_users: logger.warning("No such user: %s", user) continue
if await set_feature(user, feature, value): result["succeeded"].append(user) else: result["failed"].append(user)
return result
async def update_default(feature: str, value: Any): perm = FeatureManagementPerms.get_default() perm.set_feature(feature, value) hook = hooks.get_hook(feature) return hook(None, value)
async def sync_users(users: List[str]) -> bool: """Synchronize existing permissions with panel users""" panel_users = set(users)
perm_users = FeatureManagementPerms.select(FeatureManagementPerms.user) perm_users = set(chain(*perm_users.tuples()))
perms_to_remove = perm_users - panel_users perms_to_remove.remove(FeatureManagementPerms.DEFAULT)
if perms_to_remove: logger.info("Remove permissions of users %s", perms_to_remove)
def expression(perms_to_remove): return FeatureManagementPerms.delete().where( FeatureManagementPerms.user.in_(perms_to_remove) )
execute_iterable_expression(expression, list(perms_to_remove))
perms_to_add = panel_users - perm_users
if perms_to_add: logger.info("Add permissions to users %s", perms_to_add)
for user in perms_to_add: perm = FeatureManagementPerms.get_perm(user)
for feature in features: value = perm.get_feature(feature) callback = hooks.get_hook(feature) callback(user, value) return bool(perms_to_add) or bool(perms_to_remove)
async def reset_features(**features): """Sets feature values for all existing users in feature management database to given values in `features`.""" users = list( chain( *FeatureManagementPerms.select(FeatureManagementPerms.user) .where( FeatureManagementPerms.user != FeatureManagementPerms.DEFAULT ) .tuples() ) ) for user in users: for feature, value in features.items(): await set_feature(user, feature, value)
|