Viewing file: model.py (3.42 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
import asyncio import itertools import logging from typing import List, Optional
from peewee import BooleanField, CharField
import defence360agent.subsys.panels.hosting_panel as hp from defence360agent.contracts.messages import MessageType from defence360agent.model import Model, instance from defence360agent.model.simplification import run_in_executor from defence360agent.utils import execute_iterable_expression from defence360agent.utils.config import update_config from imav.malwarelib.model import MalwareHit
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class MyImunify(Model):
    """Per-user MyImunify protection settings.

    Each row maps a username to a boolean flag telling whether MyImunify
    protection is enabled for that user.
    """

    class Meta:
        database = instance.db
        db_table = "myimunify"

    #: The username of the end-user, or an empty string for the default value
    #: for all new users.
    user = CharField(unique=True)
    #: Is MyImunify protection enabled/disabled to the end-user.
    protection = BooleanField(null=False, default=False)

    @classmethod
    def get_protection(cls, user: Optional[str]) -> bool:
        """Return the protection flag stored for *user*.

        ``None`` and ``"root"`` are always reported as protected and return
        before any query is made.  For any other user a row with
        ``protection=False`` is created when none exists yet, so calling
        this method may write to the table.
        """
        if user is None or user == "root":
            return True

        perm, _ = cls.get_or_create(user=user, defaults={"protection": False})
        return perm.protection

    @classmethod
    def update_users_protection(cls, users: List[str], status: bool):
        """Set the protection flag to *status* for every user in *users*.

        All users are written with a single bulk upsert: missing rows are
        inserted, and rows whose ``user`` already exists get their
        ``protection`` overwritten with *status*.

        An empty *users* is a no-op.
        """
        # peewee cannot build an INSERT from an empty row list and raises
        # on execute(), so there is nothing to do without users.
        if not users:
            return

        cls.insert_many(
            [{"user": user, "protection": status} for user in users]
        ).on_conflict(
            conflict_target=[cls.user],
            preserve=[],
            update={cls.protection: status},
        ).execute()
async def malware_cleanup(sink, user):
    """Send a cleanup task for the malicious hits selected for *user*.

    Nothing is sent when the selection is empty.
    """
    found = MalwareHit.malicious_select(user=user, cleanup=True)
    if not found:
        return

    cleanup_task = MessageType.MalwareCleanupTask(hits=found)
    await sink.process_message(cleanup_task)
async def update_users_protection(sink, users: List[str], status: bool):
    """Store the protection *status* for *users* and apply its follow-ups.

    The flag is first written through ``MyImunify.update_users_protection``
    (dispatched via ``run_in_executor``).  Then, for every user, the
    ``PROACTIVE_DEFENCE`` mode is updated -- ``"LOG"`` when protection is
    being turned off, ``None`` when it is being turned on -- and, only when
    turning protection on, ``malware_cleanup`` is run as well.  All of these
    follow-up coroutines are awaited together.
    """

    def store_status():
        return MyImunify.update_users_protection(users, status)

    await run_in_executor(None, store_status)

    mode = None if status else "LOG"

    pending = []
    for user in users:
        # A fresh config dict per user, so no two calls share one object.
        pending.append(
            update_config(sink, {"PROACTIVE_DEFENCE": {"mode": mode}}, user)
        )
    if status:
        pending.extend(malware_cleanup(sink, user) for user in users)

    await asyncio.gather(*pending)
async def set_protection_status_for_all_users(sink, status: bool):
    """Apply *status* to every user reported by the hosting panel."""
    panel = hp.HostingPanel()
    all_users = await panel.get_users()
    await update_users_protection(sink, all_users, status)
async def sync_users(sink, users: List[str]) -> bool:
    """Synchronize the myimunify table with the given panel *users*.

    Users stored in the table but absent from *users* are deleted; users
    present in *users* but missing from the table are added with protection
    disabled.  Users already known are left untouched.

    Returns ``True`` when at least one user was added or removed,
    ``False`` otherwise.
    """
    panel_users = set(users)

    # Single-column select: flatten the 1-tuples into a set of usernames.
    stored = MyImunify.select(MyImunify.user)
    myimunify_users = set(itertools.chain.from_iterable(stored.tuples()))

    users_to_remove = myimunify_users - panel_users
    if users_to_remove:
        logger.info("Remove myimunify users %s", users_to_remove)

        def expression(users_to_remove):
            return MyImunify.delete().where(
                MyImunify.user.in_(users_to_remove)
            )

        execute_iterable_expression(expression, list(users_to_remove))

    users_to_add = panel_users - myimunify_users
    if users_to_add:
        logger.info("Add permissions to users %s", users_to_add)
        # Only the new users are written.  The update is an upsert that
        # overwrites on conflict, so passing the full panel list would
        # reset the protection of every existing user to False.
        await update_users_protection(sink, list(users_to_add), False)

    return bool(users_to_add or users_to_remove)
|