Viewing file: endpoints.py (8.9 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
""" This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with this program. If not, see <https://www.gnu.org/licenses/>.
Copyright © 2019 Cloud Linux Software Inc.
This software is also available under ImunifyAV commercial license, see <https://www.imunify360.com/legal/eula> """ import logging import pathlib
from imav.patchman.config import PatchmanConfig from imav.patchman.constants import PATCHMAN_PACKAGE, PATCHMAN_SERVICE_NAME from imav.patchman.custom_integration import PatchmanCustomIntegration from imav.patchman.exceptions import PatchmanError from imav.patchman.feature import ( FeatureStatus, PatchmanFeature, RealtimeFeature, ) from imav.patchman.model import Domain, Path, User, patchman_db from imav.patchman.license import License from imav.patchman.utils import get_current_ip, is_ipv4, is_private_ip from imav.subsys.realtime_av import REALTIME_SERVICE_NAME
from defence360agent.contracts.config import Core from defence360agent.contracts.messages import MessageType from defence360agent.rpc_tools.lookup import RootEndpoints, bind from defence360agent.subsys.panels import hosting_panel from defence360agent.utils import ( check_run, finally_happened, OsReleaseInfo, os_version, )
logger = logging.getLogger(__name__)
OLD_PATCHMAN_PACKAGE = "patchman-client" OLD_PATCHMAN_BIN = "/usr/local/patchman/patchmand"
def is_debian() -> bool: return OsReleaseInfo.id_like() & OsReleaseInfo.DEBIAN
class PatchmanEndpoints(RootEndpoints): KEYFILE = "/etc/patchman/license/key"
@bind("patchman", "users") async def users( self, custom_integration=False, integration_type=None, metadata_path=None, ): """Writes the users and their domains to database.
Patchman go agent executes this endpoint to fill the database. In case of custom integration it retrieves the user-defined parameters from Patchman Portal and passes it here.
:param custom_integration: true if custom integration, false by default :param integration_type: only for custom integration, file or script :param metadata_path: only for custom integration, path to file or script that represent metadata (File type 1 or Script 1 from https://docs.imunify360.com/Patchman_custom_integration.pdf) """ if custom_integration: hp = PatchmanCustomIntegration(integration_type, metadata_path) else: hp = hosting_panel.HostingPanel()
patchman_users = await hp.patchman_users()
with patchman_db( [User, Domain, Path], target_dir=Core.TMPDIR, prefix="users" ) as db: for user_data in patchman_users: user_name = user_data["username"] email = user_data["email"] or "" language = user_data["language"] parent = user_data["parent"] or "" level = user_data["level"] suspended = user_data["suspended"]
user = User.create( name=user_name, email=email, language=language, parent=parent, level=level, suspended=suspended, )
domains = user_data["domains"] for domain_data in domains: domain_name = domain_data["domain"] domain = Domain.create(name=domain_name, user=user)
paths = domain_data["paths"] for path_name in paths: Path.create(name=path_name, domain=domain)
return {"path": db.database}
@bind("patchman", "register") async def register(self, regkey=None): """ Writes a key to Patchman's key file. Patchman automatically checks for registration every 5 seconds. """ keyfile_path = pathlib.Path(self.KEYFILE) keyfile_path.parent.mkdir(parents=True, exist_ok=True) with keyfile_path.open("w") as f: f.write(regkey)
@property def patchman_feature(self): return PatchmanFeature()
@property def realtime_feature(self): return RealtimeFeature()
async def is_installed(self): status = (await self.patchman_feature.status())["items"]["status"] return status in [FeatureStatus.INSTALLING, FeatureStatus.INSTALLED]
async def _migrate_patchman(self) -> None: if await self.is_installed(): logger.info("%s package already installed", PATCHMAN_PACKAGE) return package_manager = "apt-get" if is_debian() else "yum" cmd = [package_manager, "-y", "install", PATCHMAN_PACKAGE] if not is_debian(): # use swap to replace old patchman package(s) cmd = [ package_manager, "-y", "swap", OLD_PATCHMAN_PACKAGE, PATCHMAN_PACKAGE, ] if not os_version().startswith("7"): # CentOS/CL > 7 # it was added only here # https://github.com/rpm-software-management/dnf/pull/61 cmd += ["--allowerasing"] await check_run(cmd) await self._sink.process_message( MessageType.EnsureServiceState(service=PATCHMAN_SERVICE_NAME) )
@bind("patchman", "install") async def install(self, ip_address=None): """Install imunify-patchman package""" if ip_address and not is_ipv4(ip_address): raise ValueError("Only ipv4 address is supported") if not ip_address: current_ip = get_current_ip() if is_private_ip(current_ip): raise ValueError( "Can't automatically detect the outbound IP " "on this machine, since you're using a NAT. " "Please manually provide the outbound IPv4 address " "that we should use." ) if ip_address: # custom IP PatchmanConfig.set("network", "ip", ip_address) await self.patchman_feature.install() if await finally_happened( self.patchman_feature.has_status, FeatureStatus.INSTALLED, max_tries=10, delay=6, ): await self._sink.process_message( MessageType.EnsureServiceState(service=PATCHMAN_SERVICE_NAME) )
@bind("patchman", "migrate") async def migrate(self): """Migrate old patchman agent to imunify-patchman""" if not pathlib.Path(OLD_PATCHMAN_BIN).exists(): logger.info("There is nothing to migrate") return if not License.has_clean_mode(): logger.warning("Unsupported license type to migrate") return await self._migrate_patchman()
@bind("patchman", "uninstall") async def uninstall(self): """Uninstall imunify-patchman package""" await self.patchman_feature.remove()
@bind("patchman", "status") async def get_status(self): """Get patchman status""" return { "status": (await self.patchman_feature.status())["items"][ "status" ], "realtime": (await self.realtime_feature.status())["items"][ "status" ], }
@bind("patchman", "install", "realtime") async def install_realtime(self): """Install imunify-realtime-av package""" status = (await self.patchman_feature.status())["items"]["status"] if status != FeatureStatus.INSTALLED: # don't install realtime package without patchman package raise PatchmanError( "Can't install realtime package, install patchman first" ) if not License.has_realtime_enabled(): raise PatchmanError( "Realtime is not enabled for the current Patchman license" ) await self.realtime_feature.install() if await finally_happened( self.realtime_feature.has_status, FeatureStatus.INSTALLED, max_tries=10, delay=6, ): await self._sink.process_message( MessageType.EnsureServiceState(service=REALTIME_SERVICE_NAME) )
@bind("patchman", "uninstall", "realtime") async def uninstall_realtime(self): """Uninstall imunify-realtime-av package""" await self.realtime_feature.remove()
|