Viewing file: myimunify_id.py (4.4 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
import pwd import uuid from pathlib import Path from typing import Dict, List, Optional
from defence360agent.contracts.permissions import logger from defence360agent.model import instance from defence360agent.myimunify.model import MyImunify, update_users_protection from defence360agent.subsys.panels.hosting_panel import HostingPanel from defence360agent.utils import safe_fileops
MYIMUNIFY_ID_FILE_NAME = ".myimunify_id"
class MyImunifyIdError(Exception): """Exception representing issues related to MyImunify id"""
async def add_myimunify_user( sink, user: str, protection: bool ) -> Optional[str]: """Save subscription type to the DB and generate id file"""
myimunify, _ = MyImunify.get_or_create(user=user) myimunify.save() await update_users_protection(sink, [user], protection) logger.info("Applied setting MyImunify=%s for user %s", protection, user)
try: myimunify_id = await _get_or_generate_id(user) except MyImunifyIdError: # User no longer exists return None
return myimunify_id
async def get_myimunify_users() -> List[Dict]: """ Get a list of MyImunify users, their subscription types and unique ids """
users = [] user_details = await HostingPanel().get_user_details() myimunify_user_to_id = await _myimunify_user_to_id() with instance.db.transaction(): for user, myimunify_uid in sorted(myimunify_user_to_id.items()): record, _ = MyImunify.get_or_create(user=user) users.append( { "email": user_details.get(user, {}).get("email", ""), "username": user, "myimunify_id": myimunify_uid, "protection": record.protection, "locale": user_details.get(user, {}).get("locale", ""), } ) return users
async def _myimunify_user_to_id() -> Dict[str, str]: """Get a list of users and their MyImunify ids"""
user_to_id = {} for user in await HostingPanel().get_users(): try: user_to_id[user] = await _get_or_generate_id(user) except MyImunifyIdError: # User does not exist continue except safe_fileops.UnsafeFileOperation as e: logger.error( "Unable to generate id for user=%s, error=%s", user, str(e) ) continue return user_to_id
async def _get_or_generate_id(user: str) -> str: """ Read MyImunify id if exists or generate a new one and write into the file """ id_file = await _get_myimunify_id_file(user) try: return _read_id(id_file) except (FileNotFoundError, MyImunifyIdError): myimunify_id = uuid.uuid1().hex return await _write_id(myimunify_id, id_file)
async def _write_id(myimunify_id: str, id_file: Path) -> str: """Write MyImunify id to file""" text = ( "# DO NOT EDIT\n" "# This file contains MyImunify id unique to this user\n" "\n" f"{myimunify_id}\n" ) try: await safe_fileops.write_text(str(id_file), text) except (OSError, PermissionError) as e: logger.error("Unable to write myimunify_id in user home dir: %s", e) raise MyImunifyIdError from e return myimunify_id
def _read_id(id_file: Path) -> str: """Read MyImunify id from file"""
with id_file.open("r") as f: for line in reversed(f.readlines()): if line and not line.startswith("#"): if myimunify_id := line.strip(): return myimunify_id
raise MyImunifyIdError
async def _get_myimunify_id_file(user: str) -> Path: """Get a file with MyImunify id and create it if does not exist"""
try: user_pwd = pwd.getpwnam(user) except KeyError as e: logger.error("No such user: %s", user) raise MyImunifyIdError from e else: id_file = Path(user_pwd.pw_dir) / MYIMUNIFY_ID_FILE_NAME if not id_file.exists(): if not id_file.parent.exists(): logger.error("No such user homedir: %s", user) raise MyImunifyIdError try: await safe_fileops.touch(str(id_file)) except (PermissionError, OSError) as e: logger.error( "Unable to put myimunify_id in user home dir: %s", e ) raise MyImunifyIdError from e return id_file
|