Viewing file: sentry.py (3.05 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
"""Helper for integrate sentry in stand-alone scripts""" import json import os import subprocess
from contextlib import suppress from pathlib import Path from typing import List
import distro import sentry_sdk
from defence360agent.application import tags from defence360agent.contracts import sentry
IMUNIFY360 = "imunify360" IMUNIFYAV = "imunify-antivirus" IMUNIFY360_PKG = "imunify360-firewall" LICENSE = "/var/imunify360/license.json" LICENSE_FREE = "/var/imunify360/license-free.json" FREE_ID = "IMUNIFYAV" UNKNOWN_ID = "UNKNOWN" SENTRY_DSN_PATH = Path("/opt/imunify360/venv/share/imunify360/sentry") SENTRY_DSN_DEFAULT = "https://6de77a2763bd40c58fc9e3a89285aaa8@im360.sentry.cloudlinux.com/3?timeout=20" # noqa: E501
def get_sentry_dsn() -> str: """Return dsn from the file or the default one.""" try: return SENTRY_DSN_PATH.read_text(encoding="ascii").strip() except (OSError, UnicodeDecodeError): return SENTRY_DSN_DEFAULT
def get_server_id() -> str: with suppress(Exception): for filename in [LICENSE, LICENSE_FREE]: with suppress(FileNotFoundError), open(filename) as file: return json.load(file)["id"] return UNKNOWN_ID
def collect_output(cmd: List[str]) -> str: try: cp = subprocess.run( cmd, stdin=subprocess.DEVNULL, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, ) except OSError: return "" if cp.returncode != 0: return "" return os.fsdecode(cp.stdout)
def get_rpm_version(pkg: str) -> str: cmd = ["rpm", "-q", "--queryformat=%{VERSION}-%{RELEASE}", pkg] return collect_output(cmd)
def get_dpkg_version(pkg: str) -> str: cmd = ["dpkg-query", "--showformat=${Version}", "--show", pkg] return collect_output(cmd)
def get_current_os(): platform_os = distro.linux_distribution()[0] return platform_os.lower()
def get_package_name(): platform_os = get_current_os() service_name = IMUNIFY360_PKG if platform_os != "ubuntu" and get_rpm_version(IMUNIFYAV): service_name = IMUNIFYAV else: service_name = IMUNIFY360 return service_name
def get_service_version(service_name) -> str: platform_os = get_current_os() if platform_os != "ubuntu": version = get_rpm_version(service_name) else: version = get_dpkg_version(service_name) return version
def configure_sentry(): # using LoggingIntegration (contained in default Integrations) # logging event with *error* level will be reported to Sentry automatically sentry_sdk.init(dsn=get_sentry_dsn()) with sentry_sdk.configure_scope() as scope: package = get_package_name() scope.user = {"id": get_server_id()} scope.set_tag("name", package) scope.set_tag("version", get_service_version(package)) tags.cached_fill() for tag, value in sentry.tags().items(): scope.set_tag(tag, value)
def flush_sentry(): client = sentry_sdk.Hub.current.client if client is not None: client.flush(timeout=2.0)
|