Viewing file: resource_limits.py (2.29 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
import asyncio import logging
from enum import Enum from os import fsdecode from pathlib import Path from typing import List
from defence360agent.utils import OsReleaseInfo
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Path of the run-with-intensity tool; get_current_method() calls it with
# "show" and create_subprocess() calls it with "run".
RUN_WITH_INTENSITY = "/usr/libexec/run-with-intensity"

# Path whose existence has_lvectl() uses to decide that LVE-utils is installed.
LVECTL_BIN_PATH = Path("/usr/sbin/lvectl")
# Path whose existence is_lve_active() checks (together with the OS check).
PROC_LVE_LIST_PATH = Path("/proc/lve/list")
class LimitsMethod(Enum):
    """Resource limiting methods that run-with-intensity can report.

    Each member's value is the exact string that get_current_method()
    compares against the output of the tool's ``show`` command.
    """

    # Reported as "nice".
    NICE = "nice"
    # Reported as "lve".
    LVE = "lve"
    # Reported as "cgroups".
    CGROUPS = "cgroups"
async def get_current_method() -> LimitsMethod:
    """Ask the run-with-intensity tool which limit method it uses.

    Runs ``RUN_WITH_INTENSITY show`` with both output streams captured and
    matches the stripped stdout against the known method names.

    :return: the LimitsMethod member matching the tool's stdout
    :raises LookupError: if stdout is not "nice", "lve" or "cgroups";
        the message includes the decoded stdout and stderr
    """
    show_proc = await asyncio.create_subprocess_exec(
        RUN_WITH_INTENSITY,
        "show",
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    raw_out, raw_err = await show_proc.communicate()
    reported = fsdecode(raw_out).strip()

    # Exact-match lookup table: anything outside it is a parse failure.
    known_methods = {
        "nice": LimitsMethod.NICE,
        "lve": LimitsMethod.LVE,
        "cgroups": LimitsMethod.CGROUPS,
    }
    if reported in known_methods:
        return known_methods[reported]

    # stderr is decoded only here, to enrich the error message.
    raise LookupError(
        "Parsing of used limitation method failed\nstdout: {}\nstderr: {}"
        .format(reported, fsdecode(raw_err).strip())
    )
async def create_subprocess(
    cmd: List[str],
    key: str,
    intensity_cpu: int,
    intensity_io: int,
    **subprocess_kwargs
) -> asyncio.subprocess.Process:
    """
    Launches ``cmd`` through the run-with-intensity tool so that the
    resulting asyncio.Process runs with limited resources (cpu & io).

    :param cmd: command to execute
    :param key: value passed to the tool's ``--key`` option
    :param intensity_cpu: cpu intensity limit
    :param intensity_io: io intensity limit
    :param subprocess_kwargs: keyword arguments for
        create_subprocess_exec func
    :return: executed Process
    """
    # Arguments for the wrapper tool itself; the wrapped command follows.
    wrapper_args = [
        RUN_WITH_INTENSITY,
        "run",
        "--intensity-cpu",
        str(intensity_cpu),
        "--intensity-io",
        str(intensity_io),
        "--key",
        key,
    ]
    full_argv = wrapper_args + cmd
    return await asyncio.create_subprocess_exec(
        *full_argv, **subprocess_kwargs
    )
def is_lve_active() -> bool:
    """Tell whether LVE-utils is the active resource limiter.

    Requires both that PROC_LVE_LIST_PATH exists and that
    OsReleaseInfo.is_cloudlinux() reports a CloudLinux OS.
    """
    if not PROC_LVE_LIST_PATH.exists():
        return False
    # to avoid possible errors such as DEF-11941
    # make sure that OS is CL
    return OsReleaseInfo.is_cloudlinux()
def has_lvectl() -> bool:
    """Tell whether LVE-utils is installed.

    The check is the existence of the lvectl binary at LVECTL_BIN_PATH.
    """
    lvectl_present = LVECTL_BIN_PATH.exists()
    return lvectl_present
|