Viewing file: common.py (2.75 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
# -*- coding: utf-8 -*-
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENSE.TXT
""" This module contains basic part of continuous tracingg implementation """
import logging import shelve from typing import List
from xray import gettext as _ from ..internal.constants import continuous_storage from ..internal.exceptions import XRayError from ..internal.types import ContinuousTask
class ContinuousCommon: """ Base class for continuous tracing """
def __init__(self): self.storage = continuous_storage self.logger = logging.getLogger('continuous') self.tracing_conf = self.load_tracing_configuration()
def load_tracing_configuration(self) -> dict: """ Load full continuous tracing configuration from the local storage """ self.logger.info('Loading continuous tracing configuration') try: with shelve.open(self.storage) as db: return {item: db[item] for item in db.keys()} except OSError as e: self.logger.error( 'Failed to load continuous tracing configuration', extra={'err': str(e)}) return dict()
def get_continuous_tasks(self) -> List[ContinuousTask]: """ Return existing configuration in the form of list """ return sorted( {ContinuousTask(**item) for item in self.tracing_conf.values()})
def dump_tracing_configuration(self) -> List[dict]: """ Dump continuous tracing configuration into local storage """ try: with shelve.open(self.storage) as db: for item in self.tracing_conf: db[item] = self.tracing_conf[item] return [db[item] for item in db.keys()] except OSError as e: raise XRayError( _('Failed to dump continuous configuration: %s') % str(e))
def remove_tracing_configuration(self, domain: str) -> None: """ Remove continuous tracing configuration for domain """ try: del self.tracing_conf[domain] with shelve.open(self.storage) as db: del db[domain] except KeyError: self.logger.error('Continuous monitoring is not enabled', extra={'domain': domain, 'tracing_entries': self.tracing_conf}) raise XRayError( _('Continuous monitoring for %s is not enabled') % domain) except OSError as e: raise XRayError( _('Failed to remove continuous configuration for {}: {}'.format(domain, str(e))))
|