Viewing file: schedule_watcher.py (4.29 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
""" This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with this program. If not, see <https://www.gnu.org/licenses/>.
Copyright © 2019 Cloud Linux Software Inc.
This software is also available under ImunifyAV commercial license, see <https://www.imunify360.com/legal/eula> """ from contextlib import suppress from logging import getLogger from pathlib import Path
from defence360agent import utils from defence360agent.contracts.config import ANTIVIRUS_MODE from defence360agent.contracts.config import MalwareScanSchedule as Config from defence360agent.contracts.config import ( MalwareScanScheduleInterval as Interval, ) from defence360agent.contracts.config import SystemConfig from defence360agent.contracts.license import LicenseCLN from defence360agent.contracts.messages import MessageType from defence360agent.contracts.plugins import ( MessageSink, MessageSource, expect, ) from imav.malwarelib.utils import reset_malware_schedule
logger = getLogger(__name__)
AVAILABLE_INTERVALS = [ Interval.NONE, Interval.DAY, Interval.WEEK, Interval.MONTH, ]
AVP_INTERVALS = [ Interval.NONE, Interval.MONTH, ]
# maker created during imav-deploy.sh if AV+ Revisium license # to prevent SCANNING_SCHEDULE params being reset due to absense of CLN issued license # it is removed right after imav-deploy.sh is done REVISIUM_PREMIUM_MARKER = Path("/var/imunify360/premium_revisium_license.flag")
def allowed_schedule_interval(): valid_avp = LicenseCLN.is_valid_av_plus() revisium_license_exists = REVISIUM_PREMIUM_MARKER.exists() condition = (not ANTIVIRUS_MODE) or valid_avp or revisium_license_exists return AVAILABLE_INTERVALS if condition else AVP_INTERVALS
class ScheduleWatcher(MessageSink, MessageSource): def __init__(self): self._cron = self._read_cron() self._update_cron() self._sink = None
async def create_sink(self, loop): pass
async def create_source(self, loop, sink): self._sink = sink
@staticmethod def _read_cron(): with suppress(FileNotFoundError): return Path(Config.CRON_PATH).read_text()
@staticmethod def _write_cron(job): path = Path(Config.CRON_PATH)
with suppress(FileNotFoundError): path.unlink()
if job: path.touch(mode=0o600) path.write_text(job)
def _get_job(self): if Config.INTERVAL == Interval.NONE: return None elif Config.INTERVAL not in AVAILABLE_INTERVALS: logger.error("Unsupported interval value: %s", Config.INTERVAL) return self._cron elif Config.INTERVAL not in allowed_schedule_interval(): logger.info("Malware schedule interval is being reset to defaults") reset_malware_schedule()
schedule_schema = { Interval.DAY: (Config.HOUR, "*", "*"), Interval.WEEK: (Config.HOUR, "*", Config.DAY_OF_WEEK), Interval.MONTH: (Config.HOUR, Config.DAY_OF_MONTH, "*"), }
schedule = schedule_schema[Config.INTERVAL] job = Config.CRON_STRING.format(*schedule, cmd=Config.CMD) return job
def _update_cron(self): job = self._get_job()
if self._cron != job: logger.info("Update background scan schedule") self._write_cron(job) self._cron = job
return True
return False
async def _stop_background_scan(self): await self._sink.process_message( MessageType.MalwareScanQueueStopBackground() )
@expect(MessageType.ConfigUpdate) @utils.log_error_and_ignore() async def schedule_config_updated(self, message): if isinstance(message["conf"], SystemConfig): if self._update_cron() and not self._cron: await self._stop_background_scan()
|