# Viewing file: plesk_notifications.py (3.82 KB) -rw-r--r--
""" This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with this program. If not, see <https://www.gnu.org/licenses/>.
Copyright © 2019 Cloud Linux Software Inc.
This software is also available under ImunifyAV commercial license, see <https://www.imunify360.com/legal/eula> """ import logging from functools import lru_cache from pathlib import Path
from defence360agent.contracts.plugins import MessageSink from defence360agent.contracts.hooks import HooksConfig from defence360agent.subsys import notifier
logger = logging.getLogger(__name__)

# Both paths must exist on disk for the integration to be considered
# supported (see PleskNotificationsHooks.is_supported).
# NOTE(review): /opt/psa looks like the Plesk installation root and the PHP
# script like part of the imunify360 Plesk extension -- confirm.
SCRIPT_PATH = "/opt/psa/admin/plib/modules/imunify360/scripts/send-notifications.php"

# Script path that gets appended to (or removed from) the "scripts" list of
# the hook rules for the events below.
HOOK_PATH = "/opt/imunify360/venv/share/imunify360/scripts/send-notifications"

# Names of the hook-config rules this plugin manages; rules for any other
# event are never touched.
EVENTS = [
    "CUSTOM_SCAN_MALWARE_FOUND",
    "USER_SCAN_MALWARE_FOUND",
    "REALTIME_MALWARE_FOUND",
]
class PleskNotificationsHooks(MessageSink):
    """Register or unregister HOOK_PATH in the hook rules listed in EVENTS.

    When both SCRIPT_PATH and HOOK_PATH exist on disk, the hook script is
    added to the ``SCRIPT`` section of every managed rule; otherwise it is
    removed from them. Only rules that already exist in the hooks
    configuration are modified -- no rule is ever created here.
    """

    async def create_sink(self, loop):
        """MessageSink method

        Adds the hooks when the integration is supported and not yet
        applied; removes them when it is not supported. ``loop`` is unused.
        """
        if self.is_supported():
            if not self.is_applied():
                await self.add_hooks()
        else:
            await self.remove_hooks()

    # NOTE(review): lru_cache on an instance method keys the cache on
    # ``self`` and keeps that instance referenced by the cache (ruff B019).
    # The decorator is kept so ``is_supported.cache_clear()`` /
    # ``cache_info()`` remain available to existing callers. As a
    # consequence, files that appear or disappear after the first call are
    # not noticed while the cached entry is alive.
    @lru_cache(maxsize=1)
    def is_supported(self) -> bool:
        """Return True when both SCRIPT_PATH and HOOK_PATH exist on disk."""
        return Path(SCRIPT_PATH).exists() and Path(HOOK_PATH).exists()

    def is_applied(self) -> bool:
        """Return True when every managed rule already lists HOOK_PATH.

        Also True when the configuration holds no rule for any event in
        EVENTS (there is nothing left to apply in that case).
        """
        managed = self._plesk_event_rules()["rules"]
        return all(
            HOOK_PATH in rule["SCRIPT"]["scripts"] for rule in managed.values()
        )

    async def add_hooks(self):
        """Append HOOK_PATH to each managed rule that lacks it.

        A rule that receives the script also gets its ``SCRIPT`` section
        enabled. The configuration is written, and the notifier pinged,
        only if at least one rule actually changed.
        """
        data = self._plesk_event_rules()
        updated = False
        for rule in data["rules"].values():
            script_section = rule["SCRIPT"]
            if HOOK_PATH not in script_section["scripts"]:
                script_section["enabled"] = True
                script_section["scripts"].append(HOOK_PATH)
                updated = True
        if updated:
            await self._save_and_notify(
                data, "Hooks added and configuration updated"
            )

    async def remove_hooks(self):
        """Remove HOOK_PATH from each managed rule that contains it.

        After removal the ``SCRIPT`` section stays enabled only while other
        scripts remain in its list. The configuration is written, and the
        notifier pinged, only if at least one rule actually changed.
        """
        data = self._plesk_event_rules()
        updated = False
        for rule in data["rules"].values():
            script_section = rule["SCRIPT"]
            if HOOK_PATH in script_section["scripts"]:
                script_section["scripts"].remove(HOOK_PATH)
                script_section["enabled"] = bool(script_section["scripts"])
                updated = True
        if updated:
            await self._save_and_notify(
                data, "Hooks removed and configuration updated"
            )

    @staticmethod
    def _plesk_event_rules() -> dict:
        """Return ``{"rules": {...}}`` restricted to the events in EVENTS."""
        all_rules = HooksConfig().get().get("rules", {})
        return {
            "rules": {
                event: rule
                for event, rule in all_rules.items()
                if event in EVENTS
            }
        }

    @staticmethod
    async def _save_and_notify(data: dict, success_message: str) -> None:
        """Persist *data* via HooksConfig, then signal the notifier.

        A refused connection to the notifier is logged as a warning and not
        re-raised; *success_message* is logged only when the signal went
        through.
        """
        HooksConfig().update(data)
        try:
            await notifier.config_updated()
        except ConnectionRefusedError:
            logger.warning(
                "Notifier is not running, cannot send CONFIG_UPDATED event"
            )
        else:
            logger.info(success_message)
|