Viewing file: defence360.py (3.73 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
import asyncio import logging import os import sys from pathlib import Path
import defence360agent.internals.logger from defence360agent.contracts.config import Core as Config from defence360agent.rpc_tools.exceptions import ResponseError from defence360agent.simple_rpc import SUCCESS, SocketError from defence360agent.utils import is_root_user from defence360agent.utils.cli import ( EXIT_CODES, EXITCODE_GENERAL_ERROR, print_error, print_response, print_warnings, ) from defence360agent.utils.parsers import EnvParser, create_cli_parser from defence360agent.sentry import flush_sentry
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Marker path consulted by entrypoint() when an ImportError occurs: if this
# file exists, the failure is reported as "RPM transaction is in progress"
# instead of an unknown error.
RPM_TRANSACTION_LOCK = Path(
    "/var/lib/rpm-state/imunify360-transaction-in-progress"
)
def main(rpc_handlers_init, cli_args):
    """Run one CLI invocation.

    Sets the process umask, reconfigures logging, calls
    *rpc_handlers_init*, parses *cli_args* and, when the parsed
    arguments carry both ``endpoint`` and ``generate_endpoint_params``,
    calls the endpoint and prints its outcome.  Otherwise the parser
    help text is printed.

    :param rpc_handlers_init: zero-argument callable invoked before the
        CLI parser is created.
    :param cli_args: list of command-line arguments handed to the parser.
    :raises SystemExit: with ``EXIT_CODES[result]`` when the endpoint
        result is not ``SUCCESS``, or with ``EXITCODE_GENERAL_ERROR``
        when a ``SocketError`` is raised while handling the command.
    """
    # Get ready to start: set a conservative umask.
    os.umask(Config.FILE_UMASK)

    agent_logging = defence360agent.internals.logger
    agent_logging.reconfigure()

    rpc_handlers_init()
    parser = create_cli_parser()
    args = parser.parse_args(args=cli_args)

    # The command-line option wins over the environment variable.
    log_config_path = args.log_config or os.environ.get(
        "IMUNIFY360_LOGGING_CONFIG_FILE"
    )
    if log_config_path:
        agent_logging.update_logging_config_from_file(log_config_path)
    if args.console_log_level:
        agent_logging.setConsoleLogLevel(args.console_log_level)

    has_endpoint = hasattr(args, "endpoint") and hasattr(
        args, "generate_endpoint_params"
    )
    if not has_endpoint:
        # Nothing to execute: show usage.
        print(parser.format_help())
        return

    try:
        endpoint_params = args.generate_endpoint_params(args)
        # Parameters given on the command line are excluded from the
        # environment-variable lookup.
        env_params = EnvParser.parse(
            os.environ,
            args.command,
            args.envvar_parameter_options,
            exclude=endpoint_params,
        )
        status, payload = args.endpoint(**env_params, **endpoint_params)

        print_warnings(payload)
        flush_sentry()

        if status == SUCCESS:
            print_response(args.command, payload, args.json, args.verbose)
            return

        print_error(status, payload, args.json, args.verbose)
        sys.exit(EXIT_CODES[status])
    except SocketError as err:
        error_payload = {"items": "ERROR: {}".format(err)}
        print_response(None, error_payload, args.json, args.verbose)
        sys.exit(EXITCODE_GENERAL_ERROR)
def entrypoint(rpc_handlers_init):
    """Process-level entry point of the CLI.

    Refuses to run for non-root users, then delegates to ``main`` with
    ``sys.argv[1:]`` and maps every failure to
    ``EXITCODE_GENERAL_ERROR``.

    :param rpc_handlers_init: zero-argument callable forwarded to ``main``.
    :raises SystemExit: with ``EXITCODE_GENERAL_ERROR`` when the caller
        is not root, on ``KeyboardInterrupt``, ``ResponseError``,
        ``ImportError`` or any other ``Exception``; a ``SystemExit``
        raised inside ``main`` propagates unchanged.
    """
    if not is_root_user():
        logger.info("%s could be used by the root user only!", Config.NAME)
        print(
            "Imunify360 CLI is unavailable for non-root user", file=sys.stderr
        )
        sys.exit(EXITCODE_GENERAL_ERROR)

    try:
        main(rpc_handlers_init, sys.argv[1:])
    except KeyboardInterrupt:
        logger.warning("User pressed Ctrl+C, exiting...")
        sys.exit(EXITCODE_GENERAL_ERROR)
    except ResponseError as e:
        logger.error("Response error: %s", e)
        sys.exit(EXITCODE_GENERAL_ERROR)
    except ImportError as e:
        if RPM_TRANSACTION_LOCK.exists():
            # The lock file exists, so report the import failure as an
            # RPM transaction in progress rather than an unknown error.
            logger.error("RPM transaction is in progress. %s", e)
            print(
                "RPM transaction is in progress. Please, wait until it is "
                "finished and try again.",
                file=sys.stderr,
            )
        else:
            logger.exception(
                "Unknown error happened. See logs for more information"
            )
        sys.exit(EXITCODE_GENERAL_ERROR)
    except Exception:
        logger.exception(
            "Unknown error happened. See logs for more information"
        )
        sys.exit(EXITCODE_GENERAL_ERROR)
    finally:
        # ensure loop is closed to prevent asyncio warning
        # (https://bugs.python.org/issue23548)
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            # Newer Python versions raise instead of implicitly creating
            # a loop when none is set.  There is nothing to close then,
            # and raising from ``finally`` would replace the pending
            # SystemExit / normal return with a traceback.
            loop = None
        if loop is not None:
            loop.close()
|