Viewing file: fpm_utils.py (2.25 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
# -*- coding: utf-8 -*-
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
from dataclasses import dataclass from typing import Optional
from xray import gettext as _ from .constants import fpm_stat_storage, fpm_reload_timeout from .exceptions import XRayError from .utils import dbm_storage, timestamp
@dataclass
class FPMReloadController:
    """
    Class to control the frequency of FPM service reloading

    The timestamp of the latest reload is kept in the storage opened via
    ``dbm_storage(fpm_stat_storage)``, one record per service, keyed by
    ``fpm_service``.
    """
    # name of the FPM service; used as the key of its record in the storage
    fpm_service: str

    @property
    def latest_reload(self) -> Optional[int]:
        """
        Get the timestamp of latest reload of FPM service

        :return: the stored timestamp as int,
                 or None if there is no record for this service
        :raises XRayError: (flag='warning') if a RuntimeError is raised
                           while accessing the storage or if the stored
                           record cannot be decoded/parsed as an integer
        """
        try:
            with dbm_storage(fpm_stat_storage) as _fpm_reload_stat:
                try:
                    return int(_fpm_reload_stat[self.fpm_service].decode())
                except KeyError:
                    # nothing saved for this service yet
                    return None
        except (RuntimeError, ValueError) as e:
            # ValueError (UnicodeDecodeError is its subclass) means the
            # record is corrupted; report it the same way as a storage
            # failure so that callers deal with a single exception type
            raise XRayError(
                _('Failed to get the timestamp of latest FPM reload: %s') % str(e),
                flag='warning') from e

    def save_latest_reload(self):
        """
        Save the timestamp of latest reload of FPM service

        :raises XRayError: (flag='warning') if a RuntimeError is raised
                           while accessing the storage
        """
        try:
            # save timestamp of latest reload of corresponding service
            with dbm_storage(fpm_stat_storage) as _fpm_reload_stat:
                _fpm_reload_stat[self.fpm_service] = str(timestamp())
        except RuntimeError as e:
            raise XRayError(
                _('Failed to save the timestamp of latest FPM reload %s, but main operation executed') % str(e),
                flag='warning') from e

    def delta(self) -> float:
        """
        Get the time delta between current time and saved timestamp

        :return: delta in minutes, or 60.0 if there is no saved timestamp
                 or it could not be obtained
        """
        # 1 hour,
        # big enough for non-blocking flow in case of unexpected errors
        fallback = 60.0
        try:
            latest = self.latest_reload
        except XRayError:
            return fallback
        if latest is None:
            # no reload has been recorded for this service
            return fallback
        # we store timestamp in seconds,
        # but timeout -- fpm_reload_timeout -- in minutes
        return (timestamp() - latest) / 60

    def restrict(self) -> bool:
        """
        Check if FPM reload should be blocked

        :return: True if the time passed since the latest reload
                 does not exceed fpm_reload_timeout
        """
        return self.delta() <= fpm_reload_timeout
|