# plugin.py
""" This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with this program. If not, see <https://www.gnu.org/licenses/>.
Copyright © 2019 Cloud Linux Software Inc.
This software is also available under ImunifyAV commercial license, see <https://www.imunify360.com/legal/eula> """ import asyncio import json import logging import os import pwd import shutil import time from collections import defaultdict from pathlib import Path
import sentry_sdk from peewee import SqliteDatabase from defence360agent.api import inactivity from defence360agent.contracts.config import ( MalwareScanSchedule, MalwareScanScheduleInterval as Interval, ) from defence360agent.utils import atomic_rewrite, check_run from imav.model.wordpress import WordpressSite, WPSite from imav.wordpress import cli, PLUGIN_SLUG, telemetry from imav.wordpress.utils import ( build_command_for_user, calculate_next_scan_timestamp, clear_get_cagefs_enabled_users_cache, get_last_scan, get_malware_history, )
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# SQLite database file that the site lookup helpers in this module query
# (their docstrings call it the app-version-detector database).
COMPONENTS_DB_PATH = Path(
    "/var/lib/cloudlinux-app-version-detector/components_versions.sqlite3"
)
def get_data_dir(site: WPSite):
    """Return the plugin data directory located under the site's docroot.

    The result is ``<site.docroot>/wp-content/imunify-security`` as a
    ``Path`` object. The directory is not created or checked here.
    """
    return Path(site.docroot).joinpath("wp-content", "imunify-security")
async def _get_scan_data_for_user(sink, username: str, uid: int):
    """Collect the scan information that is written to a user's sites.

    Args:
        sink: Passed through to ``get_last_scan``.
        username: Name of the user whose scan data is collected.
        uid: Numeric id of the same user; used to look up the user's sites.

    Returns:
        A tuple ``(last_scan_time, next_scan_time, malware_by_site)``:

        * ``last_scan_time`` - the ``"scan_date"`` value of the last scan,
          or ``None`` when the key is absent.
        * ``next_scan_time`` - result of ``calculate_next_scan_timestamp()``,
          or ``None`` when ``MalwareScanSchedule.INTERVAL`` is
          ``Interval.NONE``.
        * ``malware_by_site`` - ``defaultdict(list)`` mapping a site path to
          the malware history items of type ``"file"`` found under it.
    """
    # Get the last scan data and extract the last scan date
    last_scan = await get_last_scan(sink, username)
    last_scan_time = last_scan.get("scan_date", None)

    next_scan_time = None
    if MalwareScanSchedule.INTERVAL != Interval.NONE:
        next_scan_time = calculate_next_scan_timestamp()

    # get_sites_for_user() returns the longest paths first, so nested sites
    # are tried before the main site that contains them.
    all_users_sites = get_sites_for_user(uid)

    # Pair every site path with the directory prefix used for matching.
    # Matching on "<site>/" instead of "<site>" keeps a file that lives in a
    # sibling directory such as "<site>-backup/" from being attributed to
    # "<site>".
    site_prefixes = [
        (site_path, site_path.rstrip("/") + "/")
        for site_path in all_users_sites
    ]

    # Get the malware history for the user
    malware_history = get_malware_history(username)

    # Split malware history by site. The first (i.e. longest) matching site
    # wins; without that ordering all malware could be attributed to the
    # main site.
    malware_by_site = defaultdict(list)
    for item in malware_history:
        if item["resource_type"] != "file":
            continue
        file_path = item["file"]
        for site_path, prefix in site_prefixes:
            if file_path.startswith(prefix):
                malware_by_site[site_path].append(item)
                break

    return last_scan_time, next_scan_time, malware_by_site
async def _send_telemetry_task(coro, semaphore: asyncio.Semaphore):
    """Await ``coro`` while holding ``semaphore``; log instead of raising.

    Any ``Exception`` raised by ``coro`` is logged and suppressed, so one
    failed telemetry call does not propagate to the caller.
    """
    await semaphore.acquire()
    try:
        try:
            await coro
        except Exception as e:
            logger.error(f"Telemetry task failed: {e}")
    finally:
        # Always give the slot back, whatever happened to the coroutine.
        semaphore.release()
async def process_telemetry_tasks(coroutines: list, concurrency=10):
    """Run telemetry coroutines as tasks, at most ``concurrency`` at a time.

    Each coroutine is wrapped with ``_send_telemetry_task`` and sharing one
    semaphore created with ``concurrency`` slots. The function waits for all
    of the tasks and logs (rather than raises) an ``Exception`` coming out
    of the wait.
    """
    limiter = asyncio.Semaphore(concurrency)

    pending = []
    for coro in coroutines:
        wrapped = _send_telemetry_task(coro, limiter)
        pending.append(asyncio.create_task(wrapped))

    try:
        await asyncio.gather(*pending)
    except Exception as e:
        logger.error(f"Some telemetry tasks failed: {e}")
async def install_for_users(users: set[str], sink):
    """Install the imunify-security plugin for all sites where it is not installed.

    Candidate sites are those returned by ``_get_sites_without_plugin()``
    that have no row in ``WordpressSite`` yet. For every candidate whose
    owner is listed in ``users`` the scan data file is written, the plugin
    is installed through ``cli.plugin_install`` and an
    ``installed_by_imunify`` telemetry event is queued.

    Args:
        users: Names of the users whose sites may receive the plugin; sites
            of any other user are skipped.
        sink: Passed through to the scan data lookup and to telemetry.

    Raises:
        Exception: Any unexpected error outside the per-site handling is
            logged and re-raised. Failures for a single site are only
            logged. Sites installed before the error are still recorded.
    """
    logger.info("Installing imunify-security wp plugin")

    # Keep track of the installed sites
    installed = set()
    telemetry_coros = []
    with inactivity.track.task("wp-plugin-installation"):
        try:
            clear_get_cagefs_enabled_users_cache()

            to_install = _get_sites_without_plugin() - {
                WPSite(r.docroot, r.domain, r.uid)
                for r in WordpressSite.select()
            }

            if not to_install:
                return

            # Group sites by user id
            sites_by_user = defaultdict(list)
            for site in to_install:
                sites_by_user[site.uid].append(site)

            # Now iterate over the grouped sites
            for uid, sites in sites_by_user.items():
                try:
                    username = pwd.getpwuid(uid).pw_name
                except Exception as error:
                    sentry_sdk.capture_message(
                        "Skipping installation of WordPress plugin on"
                        " {count} site(s) because they belong to user"
                        " {user} and it is not possible to retrieve"
                        " username for this user. Reason: {reason}".format(
                            count=len(sites),
                            user=uid,
                            reason=error,
                        ),
                        level="warning",
                    )
                    continue

                if username not in users:
                    # Skip the user if it's not in the list of users to
                    # install the plugin for
                    continue

                (
                    last_scan_time,
                    next_scan_time,
                    malware_by_site,
                ) = await _get_scan_data_for_user(sink, username, uid)

                for site in sites:
                    try:
                        # Check if site is correctly installed and
                        # accessible using WP CLI
                        is_wordpress_installed = (
                            await cli.is_wordpress_installed(site)
                        )
                        if not is_wordpress_installed:
                            sentry_sdk.capture_message(
                                "WordPress site is not accessible using WP"
                                " CLI. site={site}".format(site=site),
                                level="warning",
                            )
                            continue

                        # Prepare the JSON data
                        json_data = {
                            "lastScanTimestamp": last_scan_time,
                            "nextScanTimestamp": next_scan_time,
                            "malware": malware_by_site.get(site.docroot, []),
                        }

                        # Create the scan data file
                        await update_scan_data_file(site, json_data)

                        # Install the plugin
                        await cli.plugin_install(site)
                        installed.add(site)

                        # Prepare telemetry
                        telemetry_coros.append(
                            telemetry.send_event(
                                sink=sink,
                                event="installed_by_imunify",
                                site=site,
                            )
                        )
                    except Exception as error:
                        logger.error(
                            "Failed to install plugin to site=%s error=%s",
                            site,
                            error,
                        )

            logger.info(
                "Installed imunify-security wp plugin on %d sites",
                len(installed),
            )
        except asyncio.CancelledError:
            # NOTE(review): the cancellation is logged and not re-raised, as
            # in the original code - confirm this is intended.
            logger.info(
                "Installation imunify-security wp plugin was cancelled. Plugin"
                " was installed for %d sites",
                len(installed),
            )
        except Exception as error:
            logger.error(
                "Error occurred during plugin installation. error=%s", error
            )
            raise
        finally:
            # Record whatever was installed, even after a failure or a
            # cancellation. Skip the bulk insert entirely when there is
            # nothing to record (e.g. the early return above).
            if installed:
                WordpressSite.insert_many(
                    [
                        {
                            "domain": site.domain,
                            "docroot": site.docroot,
                            "uid": site.uid,
                            "manually_deleted_at": None,
                        }
                        for site in installed
                    ]
                ).execute()

            # Send telemetry
            await process_telemetry_tasks(telemetry_coros)
async def delete_plugin_files(site: WPSite):
    """Delete the plugin data directory of ``site`` when it exists.

    The recursive removal runs in a worker thread via ``asyncio.to_thread``.
    """
    plugin_data_dir = get_data_dir(site)
    if not plugin_data_dir.exists():
        return
    await asyncio.to_thread(shutil.rmtree, plugin_data_dir)
async def remove_all_installed(sink):
    """Remove the imunify-security plugin from all sites where it is installed.

    Processes every ``WordpressSite`` row whose ``manually_deleted_at`` is
    NULL: the plugin is deactivated through ``cli.plugin_deactivate``, the
    plugin data directory is deleted, the row is removed from the database
    and an ``uninstalled_by_imunify`` telemetry event is queued.

    Args:
        sink: Passed through to telemetry.

    Raises:
        Exception: Any unexpected error outside the per-site handling is
            logged and re-raised. Failures for a single site are only
            logged.
    """
    logger.info("Deleting imunify-security wp plugin")

    telemetry_coros = []
    # Number of database rows deleted so far.
    affected = 0
    with inactivity.track.task("wp-plugin-removal"):
        try:
            to_remove = WordpressSite.select().where(
                WordpressSite.manually_deleted_at.is_null(True)
            )

            for site in to_remove:
                try:
                    # Deactivate the plugin on the WordPress site.
                    await cli.plugin_deactivate(site)
                    # Delete the data files from the site.
                    await delete_plugin_files(site)
                    # Delete the site from database.
                    affected += (
                        WordpressSite.delete()
                        .where(WordpressSite.docroot == site.docroot)
                        .execute()
                    )

                    # Prepare telemetry
                    telemetry_coros.append(
                        telemetry.send_event(
                            sink=sink,
                            event="uninstalled_by_imunify",
                            site=site,
                        )
                    )
                except Exception as error:
                    logger.error(
                        "Failed to remove plugin from %s %s", site, error
                    )

        except asyncio.CancelledError:
            # Report the sites actually processed, not the number of
            # candidates that were selected for removal.
            # NOTE(review): the cancellation is logged and not re-raised, as
            # in the original code - confirm this is intended.
            logger.info(
                "Deleting imunify-security wp plugin was cancelled. Plugin was"
                " deleted from %d sites",
                affected,
            )
        except Exception as error:
            logger.error("Error occurred during plugin deleting. %s", error)
            raise
        finally:
            logger.info(
                "Removed imunify-security wp plugin from %s sites",
                affected,
            )
            # Send every queued event, so that no created coroutine is left
            # un-awaited.
            if telemetry_coros:
                await process_telemetry_tasks(telemetry_coros)
async def mark_site_as_manually_deleted(site, now):
    """Store ``now`` in ``manually_deleted_at`` for rows matching the site.

    Rows of ``WordpressSite`` are matched by ``site.docroot``.
    """
    logger.info(
        "Mark site %s as manually deleted at %s (WP-Plugin removed)", site, now
    )
    update_query = WordpressSite.update(manually_deleted_at=now)
    update_query = update_query.where(WordpressSite.docroot == site.docroot)
    update_query.execute()
async def tidy_up_manually_deleted(sink):
    """Flag tracked sites that no longer have the plugin as manually deleted.

    A site qualifies when it is returned by ``_get_sites_without_plugin()``
    and also has a ``WordpressSite`` row whose ``manually_deleted_at`` is
    NULL. Each such site gets the current time stored in that column and a
    ``removed_by_user`` telemetry event. Errors are logged, not raised.
    """
    telemetry_coros = []
    try:
        sites_without_plugin = _get_sites_without_plugin()
        tracked_query = WordpressSite.select().where(
            WordpressSite.manually_deleted_at.is_null()
        )
        tracked_sites = {
            WPSite(row.docroot, row.domain, row.uid) for row in tracked_query
        }
        to_mark_as_manually_removed = sites_without_plugin & tracked_sites

        if to_mark_as_manually_removed:
            # One timestamp is shared by all sites handled in this run.
            now = time.time()
            for site in to_mark_as_manually_removed:
                await mark_site_as_manually_deleted(site, now)

                # Prepare telemetry
                event_coro = telemetry.send_event(
                    sink=sink,
                    event="removed_by_user",
                    site=site,
                )
                telemetry_coros.append(event_coro)

    except Exception as error:
        logger.error("Error occurred during site tidy up. %s", error)
    finally:
        if telemetry_coros:
            await process_telemetry_tasks(telemetry_coros)
async def update_data_on_sites(sink, sites: list[WPSite]):
    """Rewrite the scan data file on each of the given sites.

    Sites are processed per owner: the scan data is fetched once per user
    and then written to every site of that user. A user whose name cannot
    be resolved is skipped, and a failure on one site is logged without
    stopping the remaining sites.
    """
    if not sites:
        return

    # Group sites by user id
    grouped = {}
    for site in sites:
        grouped.setdefault(site.uid, []).append(site)

    # Now iterate over the grouped sites
    for uid, user_sites in grouped.items():
        try:
            username = pwd.getpwuid(uid).pw_name
        except Exception as error:
            logger.error(
                "Failed to get username for uid=%d. error=%s",
                uid,
                error,
            )
            continue

        scan_data = await _get_scan_data_for_user(sink, username, uid)
        last_scan_time, next_scan_time, malware_by_site = scan_data

        for site in user_sites:
            try:
                # Prepare the JSON data and update the scan data file
                payload = {
                    "lastScanTimestamp": last_scan_time,
                    "nextScanTimestamp": next_scan_time,
                    "malware": malware_by_site.get(site.docroot, []),
                }
                await update_scan_data_file(site, payload)
            except Exception as error:
                logger.error(
                    "Failed to update scan data on site=%s error=%s",
                    site,
                    error,
                )
async def update_scan_data_file(site: WPSite, json_data: dict):
    """Write ``json_data`` to the ``scan_data.php`` file of the site.

    The file lives in the plugin data directory (see ``get_data_dir``). When
    that directory is missing it is created with ``mkdir -p`` through the
    command built by ``build_command_for_user`` for the site owner.

    Args:
        site: Site to update; ``site.uid`` identifies the owner.
        json_data: JSON-serializable data embedded into the PHP file.

    Raises:
        KeyError: ``site.uid`` has no passwd entry (from ``pwd.getpwuid``).
        Exception: The data directory is a symlink, or it still does not
            exist after the ``mkdir`` command ran.
    """
    # Get the gid for the given user
    user_info = pwd.getpwuid(site.uid)
    gid = user_info.pw_gid

    # Create data directory
    data_dir = get_data_dir(site)
    if os.path.islink(data_dir):
        # If the data directory is a symlink, interrupt the process.
        raise Exception(f"Data directory {data_dir} is a symlink, skipping.")

    if not data_dir.exists():
        command = build_command_for_user(
            user_info.pw_name,
            [
                "mkdir",
                "-p",
                str(data_dir),
            ],
        )

        await check_run(command)

        if not data_dir.exists():
            # Directory creation failed. Interrupt the process.
            raise Exception(
                f"Failed to create directory {data_dir}"
                f" for user {user_info.pw_name}"
            )

        # we can safely change the permissions of the directory because we
        # just created it
        data_dir.chmod(0o750)

    scan_data_path = data_dir / "scan_data.php"

    # Check if the file exists, create an empty file if it doesn't
    if not scan_data_path.exists():
        scan_data_path.touch()

    # Write the formatted PHP file with the owner's uid/gid and 0o400 mode
    atomic_rewrite(
        scan_data_path,
        _render_scan_data_php(json_data),
        backup=False,
        uid=site.uid,
        gid=gid,
        permissions=0o400,
    )


def _render_scan_data_php(json_data: dict) -> str:
    """Return PHP source that returns ``json_data`` decoded as a PHP array."""
    encoded = json.dumps(json_data)
    # The JSON text is placed inside a single-quoted PHP string, where both
    # "\\" and "\'" are escape sequences. Backslashes must be doubled first
    # (otherwise PHP collapses JSON's own "\\" into "\" and the decoded data
    # is wrong or invalid), then the single quotes are escaped.
    php_literal = encoded.replace("\\", "\\\\").replace("'", "\\'")
    return (
        "<?php\n"
        "if ( ! defined( 'WPINC' ) ) {\n"
        "\texit;\n"
        "}\n"
        "return json_decode( '" + php_literal + "', true );"
    )
def _get_sites_without_plugin() -> set[WPSite]:
    """
    Get a set of wp sites where imunify-security plugin is not installed.

    The data is pulled from the app-version-detector database. Only the
    latest report per directory (among reports with a non-empty domain) is
    considered. A WordPress core entry counts as "without plugin" when it
    has no child app entry titled ``wp_plugin_<slug>``, where ``<slug>`` is
    ``PLUGIN_SLUG`` with dashes replaced by underscores.

    Returns an empty set (and logs an error) when the database file does
    not exist.
    """
    if not COMPONENTS_DB_PATH.exists():
        logger.error(
            "App detector database '%s' couldn't be found.",
            str(COMPONENTS_DB_PATH),
        )
        return set()

    plugin_title = "wp_plugin_" + PLUGIN_SLUG.replace("-", "_")

    database = SqliteDatabase(COMPONENTS_DB_PATH)
    try:
        # The plugin title is bound as a parameter instead of being
        # formatted into the SQL text.
        rows = database.execute_sql(
            """
            WITH latest_reports AS (
                SELECT id, uid, domain
                FROM report
                WHERE id IN (
                    SELECT MAX(id)
                    FROM report
                    WHERE domain IS NOT NULL AND domain != ''
                    GROUP BY dir
                )
            )
            SELECT wp.real_path, lr.domain, lr.uid
            FROM apps AS wp
            INNER JOIN latest_reports AS lr ON wp.report_id = lr.id
            WHERE wp.title = 'wp_core'
              AND wp.parent_id IS NULL
              AND NOT EXISTS (
                  SELECT 1
                  FROM apps AS plugin
                  WHERE plugin.parent_id = wp.id
                    AND plugin.title = ?
              )
            """,
            (plugin_title,),
        ).fetchall()
    finally:
        # Release the SQLite connection instead of leaving it to the
        # garbage collector.
        database.close()

    return {
        WPSite(docroot=row[0], domain=row[1], uid=int(row[2])) for row in rows
    }
def get_sites_for_user(uid: int) -> list[str]:
    """
    Get a list of paths to WordPress sites belonging to a particular user.

    Paths are ordered from the longest to the shortest, so a site nested
    inside another site comes before the site that contains it and the main
    (shortest) site is the last one in the list.

    The data is pulled from the app-version-detector database, using the
    latest report per directory for the given ``uid``. Returns an empty
    list (and logs an error) when the database file does not exist.
    """
    if not COMPONENTS_DB_PATH.exists():
        logger.error(
            "App detector database '%s' couldn't be found.",
            str(COMPONENTS_DB_PATH),
        )
        return []

    database = SqliteDatabase(COMPONENTS_DB_PATH)
    try:
        # The uid is bound as a parameter instead of being formatted into
        # the SQL text.
        rows = database.execute_sql(
            """
            WITH latest_reports AS (
                SELECT MAX(id) as id
                FROM report
                WHERE uid = ?
                GROUP BY dir
            )
            SELECT wp.real_path
            FROM apps AS wp
            INNER JOIN latest_reports AS lr ON wp.report_id = lr.id
            WHERE wp.title = 'wp_core' AND wp.parent_id IS NULL
            GROUP BY wp.real_path
            ORDER BY length(wp.real_path) DESC
            """,
            (int(uid),),
        ).fetchall()
    finally:
        # Release the SQLite connection instead of leaving it to the
        # garbage collector.
        database.close()

    return [row[0] for row in rows]
def get_sites_by_path(path: str) -> list[WPSite]:
    """
    Get a list of wp sites located under the given path.

    ``path`` is used as an SQLite ``GLOB`` pattern matched against the
    site's real path. A pattern that already ends with ``*`` is used as is;
    otherwise ``/*`` is appended (after dropping trailing slashes) so that
    everything below the directory matches.

    The data is pulled from the app-version-detector database, using the
    latest report per directory among reports with a non-empty domain.
    Returns an empty list (and logs an error) when the database file does
    not exist.
    """
    if not COMPONENTS_DB_PATH.exists():
        logger.error(
            "App detector database '%s' couldn't be found.",
            str(COMPONENTS_DB_PATH),
        )
        return []

    # Append * to the path to get all sites that start with the given path.
    # Only if the path doesn't already end with *. Trailing slashes are
    # dropped first so "/home/user/" does not become "/home/user//*".
    if not path.endswith("*"):
        path = path.rstrip("/") + "/*"

    database = SqliteDatabase(COMPONENTS_DB_PATH)
    try:
        # The pattern is bound as a parameter: the path comes from the
        # caller and must never be formatted into the SQL text.
        rows = database.execute_sql(
            """
            WITH latest_reports AS (
                SELECT id, uid, domain
                FROM report
                WHERE id IN (
                    SELECT MAX(id)
                    FROM report
                    WHERE domain IS NOT NULL AND domain != ''
                    GROUP BY dir
                )
            )
            SELECT wp.real_path, lr.domain, lr.uid
            FROM apps AS wp
            INNER JOIN latest_reports AS lr ON wp.report_id = lr.id
            WHERE wp.title = 'wp_core'
              AND wp.parent_id IS NULL
              AND wp.real_path GLOB ?
            """,
            (path,),
        ).fetchall()
    finally:
        # Release the SQLite connection instead of leaving it to the
        # garbage collector.
        database.close()

    return [
        WPSite(docroot=row[0], domain=row[1], uid=int(row[2])) for row in rows
    ]
|