# -*- coding: utf-8 -*-
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2018 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
#
import configparser
import locale
import os
import re
import syslog
from collections import namedtuple

class WebConfigParsingError(Exception):
    def __init__(self, message):
        self.message = message


class WebConfigMissing(Exception):
    def __init__(self, message):
        self.message = message

SECHEAD = 'asection'

def load(path, case_sensitive=False, ignore_bad_encoding=False):
    config = configparser.ConfigParser(allow_no_value=True, interpolation=None, strict=False)
    if case_sensitive:
        config.optionxform = str
    if ignore_bad_encoding:
        with open(path, 'rb') as f:
            raw = f.read().decode(locale.getpreferredencoding(), 'replace')
    else:
        with open(path, 'r', encoding='utf-8') as f:
            raw = f.read()
    config.read_string(f'[{SECHEAD}]\n' + raw, source=path)
    return dict(config.items(section=SECHEAD))
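
# Example usage of `load` (illustrative sketch; the path and keys below are
# hypothetical). The function prepends a fake section header so configparser
# can read section-less ini-style files:
#
#     settings = load('/etc/example.conf')
#     # a file containing "foo = 1" and "Bar = baz" yields
#     # {'foo': '1', 'bar': 'baz'}   (keys lowercased unless case_sensitive=True)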
_QUOTES = "'", '"'

def _strip_escape_quotes_of_config_value(val: str) -> str:
    """
    Strips a single or double quote char only if the quote
    is present on both sides.
    """
    if val.startswith(_QUOTES) and val.endswith(_QUOTES):
        return val[1:-1]
    return val
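
# For example (illustrative):
#     _strip_escape_quotes_of_config_value('"foo"')  # -> 'foo'
#     _strip_escape_quotes_of_config_value("'bar")   # -> "'bar" (quote on one side only, kept)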

def load_fast(path, delimiter="=", strip_quotes=False):
    data = {}
    with open(path, "r", encoding="utf-8", errors="surrogateescape") as f:
        for line in f.readlines():
            parts = line.split(delimiter, 1)
            try:
                key, value = parts
            except ValueError:
                # Skip broken lines
                continue
            value = value.strip()
            value = (
                _strip_escape_quotes_of_config_value(value)
                if strip_quotes
                else value
            )
            data[key.strip()] = value
    return data
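
# Example usage of `load_fast` (illustrative; the path and key are hypothetical):
#
#     opts = load_fast('/etc/sysconfig/example', strip_quotes=True)
#     # a line such as NAME="some value" yields {'NAME': 'some value'}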
cache = {}

def load_once(path, ignore_errors=False):
    """
    Read ini file once (cached) and return its content as dict
    """
    try:
        res = cache[path]
    except KeyError:
        try:
            res = cache[path] = load(path)
        except (IOError, configparser.Error):
            if not ignore_errors:
                raise
            res = cache[path] = {}
    return res
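
# Example usage of `load_once` (illustrative; the path is hypothetical).
# Repeated calls for the same path are served from the module-level `cache`
# dict and do not re-read the file:
#
#     first = load_once('/etc/example.conf', ignore_errors=True)
#     second = load_once('/etc/example.conf')
#     # first is second -> True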

def change_settings(settings_dict, path, tmp_path=None):
    if not tmp_path:
        tmp_path = path + ".tmp"

    used_keys = []

    with (open(path, 'r', encoding='utf-8') as fin,
          open(tmp_path, 'w', encoding='utf-8') as fout):
        for line in fin:
            stripped_line = line.strip()
            if stripped_line and not stripped_line.startswith('#'):
                key, _ = stripped_line.split('=', 1)
                key = key.strip()
                if key in settings_dict:
                    fout.write(f'{key}={settings_dict[key]}\n')
                    used_keys.append(key)
                    continue
            fout.write(line)

    with open(tmp_path, 'a', encoding='utf-8') as fout:
        for key in settings_dict:
            if key not in used_keys:
                fout.write(f'{key}={settings_dict[key]}\n')

    os.rename(tmp_path, path)
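
# Example usage of `change_settings` (illustrative; the path and keys are
# hypothetical). Existing `key=value` lines are rewritten in place, keys not
# yet present are appended, and the temporary file replaces the original via
# os.rename():
#
#     change_settings({'LIMIT': '10', 'MODE': 'strict'}, '/etc/example.conf')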

_NGINX_TOKENS_RE = re.compile(
    r"""
    (
        # Comments
        (?: \# .* $ )

        # Single-, double-quoted strings and bare strings without whitespace
        | (?: "[^"\n]*?" )
        | (?: '[^'\n]*?' )
        | (?: [^"';\s\{\}]+ )

        # Structural characters
        | ; | \{ | \} | \n
    )
    """,
    re.IGNORECASE | re.MULTILINE | re.VERBOSE,
)

def _ngx_tokenize(data):
    tokens = (
        match.group(0)
        for match in _NGINX_TOKENS_RE.finditer(data)
        if match and match.group(0)
    )
    # Explicitly ignore comments
    return (tok for tok in tokens if not tok.startswith('#'))

def _ngx_take_until(it, val):
    for tok in it:
        if tok in val:
            return
        yield tok


def _ngx_take_until_block_end(it):
    lvl = 1
    for t in it:
        if t == "{":
            lvl += 1
        elif t == "}":
            lvl -= 1
            if lvl < 1:
                return
        yield t

def _ngx_scan_block_info(block_tokens, need_fields):
    """Scan a block for required fields, skipping nested blocks"""
    info = {}
    for tok in block_tokens:
        # We need to skip until the end of the inner block if one occurs
        if tok == "{":
            for _ in _ngx_take_until_block_end(block_tokens):
                pass
        # Now gather the value; the last occurrence takes priority
        if tok in need_fields:
            value_tokens = _ngx_take_until(block_tokens, ";\n")
            info[tok] = list(value_tokens)

    return info

def nginx_conf_loose_parser(data):
    """
    Parse content of NGINX configuration in a manner tolerant to minor
    mistakes and extract relevant fields from all `server` directives.

    Relevant fields are:
    - `server_name`
    - `root` - returned as `document_root`
    - `ssl` - True if the `listen` field contains the word "ssl"

    Doesn't handle interpolated values (e.g. `${val}`) outside of quoted strings
    """
    tokens = _ngx_tokenize(data)
    for tok in tokens:
        if tok != "server":
            continue

        # Nothing seems to be allowed between the "server" directive and
        # the opening of its block, so we just discard everything
        # until the first block opening is seen
        for _ in _ngx_take_until(tokens, "{"):
            pass

        # Limit further scanning to the inside of the block
        block_tokens = _ngx_take_until_block_end(tokens)

        # By using only `block_tokens` we ensure all blocks are properly delimited
        info = _ngx_scan_block_info(block_tokens, ("server_name", "root", "listen"))
        try:
            server_name = info["server_name"]
            root = info["root"]
        except KeyError:
            continue
        if not server_name and not root:
            continue

        yield {
            "server_name": _strip_escape_quotes_of_config_value(server_name[0]),
            "document_root": _strip_escape_quotes_of_config_value(root[0]),
            "ssl": "ssl" in info.get("listen", []),
        }
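
# Example (illustrative): feeding a minimal, hypothetical server block through
# the loose parser:
#
#     conf = 'server { listen 443 ssl; server_name example.com; root /var/www; }'
#     list(nginx_conf_loose_parser(conf))
#     # -> [{'server_name': 'example.com',
#     #      'document_root': '/var/www',
#     #      'ssl': True}]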

def nginx_conf_parser(conf_file):
    """Parse NGINX config file, see `nginx_conf_loose_parser` for more details"""
    if not os.path.isfile(conf_file):
        raise WebConfigMissing(f'File does not exist {conf_file}')

    dirty_data = read_unicode_file_with_decode_fallback(conf_file)

    return list(nginx_conf_loose_parser(dirty_data))

def apache_conf_parser(conf_file):
    if not os.path.isfile(conf_file):
        raise WebConfigMissing(f'File does not exist {conf_file}')

    conf_data = []

    data_all = read_unicode_file_with_decode_fallback(conf_file).splitlines()

    data = [i for i in data_all if re.search('^((?!#).)*$', i)]

    ID = 0
    enable = False
    result = {}
    vhost = []
    while len(data) > 0:
        out = data.pop(0)
        if "<VirtualHost" in out:
            ip_port = out.split()[1]
            port = '0'
            try:
                ip, port = ip_port.split(':')
                port = port.replace('>', '')
            except ValueError:
                ip = ip_port
            vhost.append(ip)
            vhost.append(port)
            enable = True
            continue

        if "</VirtualHost>" in out:
            result[ID] = vhost
            ID += 1
            enable = False
            vhost = []
            continue

        if enable:
            vhost.append(out)
            continue

    for value in result.values():
        # result[i][0] is an IP
        # result[i][1] is a port
        data = {
            'user': None,
            'server_name': '',
            'document_root': '',
            'server_alias': None,
            'port': int(value[1]),
            'ssl': False,
        }
        for line in value:
            if "ServerName" in line:
                data['server_name'] = line.split()[1].strip().replace('www.', '')
                continue
            if "DocumentRoot" in line:
                # remove all whitespace (first strip) and also quotes (second one)
                data['document_root'] = line.split()[1].strip().strip('"')
                continue
            if "ServerAlias" in line:
                data['server_alias'] = ','.join(str(n) for n in line.split()[1:])
                continue
            if "SuexecUserGroup" in line:
                data['user'] = line.split()[1].strip()
            if "SSLEngine" in line:
                data['ssl'] = line.split()[1].strip().lower() == 'on'

        conf_data.append(data)

    return conf_data
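
# Example usage of `apache_conf_parser` (illustrative; the path and vhost
# below are hypothetical):
#
#     # given /etc/apache2/sites-enabled/example.conf containing
#     #     <VirtualHost 203.0.113.10:443>
#     #         ServerName www.example.com
#     #         DocumentRoot "/var/www/example"
#     #         SSLEngine on
#     #     </VirtualHost>
#     apache_conf_parser('/etc/apache2/sites-enabled/example.conf')
#     # -> [{'user': None, 'server_name': 'example.com',
#     #      'document_root': '/var/www/example', 'server_alias': None,
#     #      'port': 443, 'ssl': True}]
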
PamLVECfg = namedtuple('PamLVECfg', ['min_uid', 'cagefs_enabled', 'groups'])

def parse_pam_lve_config(configfile):
    """
    Parse a string like:
    "session required pam_lve.so 500 1 group1,group2"
    :param configfile: path to config file to parse
    :type configfile: str
    :return: PamLVECfg instance when pam_lve configuration is found, None otherwise
    :rtype: namedtuple
    :raises: IOError, ValueError
    """
    with open(configfile, 'r', encoding='utf-8') as f:
        for line in f:
            if line.startswith('#'):
                continue
            s = line.split()
            if len(s) >= 3 and s[2] == 'pam_lve.so':
                # parse config string taking pam_lve defaults into account
                min_uid = int(s[3]) if len(s) >= 4 else 500
                cagefs_enabled = bool(int(s[4])) if len(s) >= 5 else False
                groups = s[5].split(',') if len(s) >= 6 else ['wheel']
                return PamLVECfg(min_uid, cagefs_enabled, groups)
    # pam_lve line is not found in config file
    return None
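
# Example usage of `parse_pam_lve_config` (illustrative; the PAM file and line
# are hypothetical):
#
#     # given /etc/pam.d/su containing the line
#     #     session required pam_lve.so 500 1 wheel,admins
#     parse_pam_lve_config('/etc/pam.d/su')
#     # -> PamLVECfg(min_uid=500, cagefs_enabled=True, groups=['wheel', 'admins'])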

def read_unicode_file_with_decode_fallback(file_path: str) -> str:
    with open(file_path, 'rb') as f:
        raw_data = f.read()
    try:
        return raw_data.decode()
    except UnicodeDecodeError:
        syslog.syslog(
            syslog.LOG_WARNING,
            f'Failed to decode "{file_path}" content as utf-8 - '
            'loading with placeholders for invalid unicode sequences'
        )
        return raw_data.decode(errors='replace')