D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
opt
/
alt
/
python37
/
lib
/
python3.7
/
site-packages
/
clwpos
/
Filename :
utils.py
back
Copy
# -*- coding: utf-8 -*- # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENSE.TXT # wpos_lib.py - helper functions for clwpos utility from __future__ import absolute_import import contextlib import dataclasses import datetime import logging import os import re import shutil import struct import sys import time import json import pwd import fcntl import uuid import subprocess from dataclasses import dataclass, asdict from enum import Enum from gettext import gettext as _ from urllib.parse import ( urlencode, urlparse, parse_qsl, urlunparse ) import psutil from contextlib import contextmanager from functools import wraps, lru_cache from pathlib import Path from socket import socket, AF_UNIX, SOCK_STREAM from typing import List, Tuple, Optional, Set, ContextManager import platform from secureio import write_file_via_tempfile from clcommon.cpapi.cpapiexceptions import NoDomain from clcommon.clpwd import ClPwd, drop_privileges from clcommon.lib.cledition import ( is_cl_solo_edition, is_cl_shared_pro_edition, is_cl_admin_edition, CLEditionDetectionError ) from cllicenselib import check_license from clcommon.cpapi import docroot, getCPName, get_customer_login, get_server_ip from clcommon.utils import exec_utility, run_command, demote from clwpos import gettext, wp_config from clwpos.cl_wpos_exceptions import ( WposError, WPOSLicenseMissing, WpCliUnsupportedException, WpNotExists, WpConfigWriteFailed, PhpBrokenException ) from clcommon.ui_config import UIConfig from clcommon.clcagefs import in_cagefs, _is_cagefs_enabled from clcommon.const import Feature from clcommon.cpapi import is_panel_feature_supported from .logsetup import setup_logging from clwpos.constants import ( USER_WPOS_DIR, WPOS_DAEMON_SOCKET_FILE, CLCONFIG_UTILITY, RedisRequiredConstants, CAGEFS_ENTER_USER_BIN, CAGEFS_ENTER_UTIL, CLWPOS_OPT_DIR, ALT_PHP_PREFIX, EA_PHP_PREFIX, PLESK_PHP_PREFIX, USER_CLWPOS_CONFIG, PUBLIC_OPTIONS, SUPPORTED_PANELS, SUITES_MARKERS ) from .socket_utils import pack_data_for_socket, read_unpack_response_from_socket_client from .user.website_check.errors import RollbackException logger = None def catch_error(func): """ Decorator for catching errors """ def func_wrapper(self, *args, **kwargs): global logger if logger is None: logger = setup_logging(__name__) try: return func(self, *args, **kwargs) except RollbackException as e: error_and_exit(self._is_json, { 'context': e.context, 'result': e.message, 'issues': e.errors }) except WposError as e: if isinstance(e, WPOSLicenseMissing): logger.warning(e) else: logger.exception(e) response = {'context': e.context, 'result': e.message, 'warning': e.warning} if e.details: response['details'] = e.details error_and_exit(self._is_json, response) except Exception as e: logger.exception(e) error_and_exit(self._is_json, {'context': {}, 'result': str(e)}) return func_wrapper class ExtendedJSONEncoder(json.JSONEncoder): """ Makes it easier to use ENUMs and DATACLASSes in program, automatically converting them when json is printed. """ def default(self, obj): if isinstance(obj, Enum): return obj.value if isinstance(obj, (datetime.date, datetime.datetime)): return obj.isoformat() if dataclasses.is_dataclass(obj): return dataclasses.asdict(obj) return json.JSONEncoder.default(self, obj) def _print_dictionary(data_dict, is_json: bool = False, is_pretty: bool = False): """ Print specified dictionary :param data_dict: data dictionary to print :param is_json: True - print in JSON, False - in text :param is_pretty: True - pretty json print, False - none (default) :return: None """ if is_json: # Print as JSON if is_pretty: print(json.dumps(data_dict, indent=4, sort_keys=True, cls=ExtendedJSONEncoder)) else: print(json.dumps(data_dict, sort_keys=True, cls=ExtendedJSONEncoder)) else: # Print as text print(data_dict) def error_and_exit(is_json: bool, message: dict, error_code: int = 1): """ Print error and exit :param is_json: :param message: Dictionary with keys "result" as string and optional "context" as dict :param error_code: Utility return code on error """ if 'warning' in message.keys() and not message.get('warning'): message.pop('warning') if is_json: message.update({"timestamp": time.time()}) _print_dictionary(message, is_json, is_pretty=True) else: try: print(str(message["result"]) % message.get("context", {})) except KeyError as e: print("Error: %s [%s]" % (str(e), message)) sys.exit(error_code) def print_data(is_json: bool, data: dict, result="success"): """ Output data wrapper :param is_json: :param data: data for output to stdout :param result: """ if isinstance(data, dict): data.update({"result": result, "timestamp": time.time()}) _print_dictionary(data, is_json, is_pretty=True) def is_run_under_user() -> bool: """ Detects is we running under root :return: True - user, False - root """ return os.geteuid() != 0 def is_shared_pro_safely(safely: bool): """ Detecting of shared_pro edition depends on jwt token There are some cases when we do not fail if there are cases with decoding (e.g summary collection) """ try: return is_cl_shared_pro_edition() except CLEditionDetectionError: if safely: return False else: raise def is_wpos_supported(safely=False) -> bool: """ Сheck if system environment is supported by WPOS :return: True - CPanel/Plesk on Solo/ CL Shared Pro/ CL Admin False - else """ is_panel_supported = getCPName() in SUPPORTED_PANELS return (is_cl_solo_edition(skip_jwt_check=True) or is_shared_pro_safely(safely) or is_cl_admin_edition(skip_jwt_check=True)) \ and is_panel_supported def create_clwpos_dir_if_not_exists(username): """ Creates {homedir}/.clwpos directory if it's not exists """ clwpos_dir = os.path.join(home_dir(username), USER_WPOS_DIR) if not os.path.isdir(clwpos_dir): os.mkdir(clwpos_dir, mode=0o700) def get_relative_docroot(domain, homedir): dr = docroot(domain)[0] if not dr.startswith(homedir): raise WposError(f"docroot {dr} for domain {domain} should start with {homedir}") return dr[len(homedir):].lstrip("/") def home_dir(username: str = None) -> str: pw = get_pw(username=username) return pw.pw_dir def user_name() -> str: return get_pw().pw_name def user_uid(*, username: str = None) -> int: return get_pw(username=username).pw_uid def get_pw(*, username: str = None): if username: return pwd.getpwnam(username) else: return pwd.getpwuid(os.geteuid()) class WposUser: """ Helper class to construct paths to user's WPOS dir and files inside it. """ def __init__(self, username: str, homedir: str = None) -> None: self.name = username self.home_dir = home_dir(username) if homedir is None else homedir self.wpos_dir = os.path.join(self.home_dir, USER_WPOS_DIR) self.wpos_config = os.path.join(self.wpos_dir, USER_CLWPOS_CONFIG) self.redis_conf = os.path.join(self.wpos_dir, 'redis.conf') self.redis_socket = os.path.join(self.wpos_dir, 'redis.sock') self.php_info = os.path.join(self.wpos_dir, '.php_info-{file_id}') def __eq__(self, other): return self.name == other.name def __hash__(self): return hash(self.name) def daemon_communicate(cmd_dict: dict) -> Optional[dict]: """ Send command to CLWPOS daemon via socket :param cmd_dict: Command dictionary :return: Daemon response as dictionary, None - daemon data/socket error """ bytes_to_send = pack_data_for_socket(cmd_dict) with socket(AF_UNIX, SOCK_STREAM) as s: try: s.connect(WPOS_DAEMON_SOCKET_FILE) s.sendall(bytes_to_send) response_dict = read_unpack_response_from_socket_client(s) if response_dict is None or not isinstance(response_dict, dict): raise WposError( message=gettext('Unexpected response from daemon. ' 'Report this issue to your system administrator.'), details=str(response_dict), context={}) if response_dict['result'] != 'success': raise WposError(message=gettext('Daemon was unable to execute the requested command.'), details=response_dict['result'], context=response_dict.get('context')) return response_dict except FileNotFoundError: raise WposError(gettext('CloudLinux AccelerateWP daemon socket (%(filename)s) not found. ' 'Contact your system administrator.'), {'filename': WPOS_DAEMON_SOCKET_FILE}) except (ConnectionError, OSError, IOError, AttributeError, struct.error, KeyError) as e: raise WposError(gettext('Unexpected daemon communication error.'), details=str(e)) def redis_cache_config_section() -> List[str]: """ Construct list of lines (configuration settings) that should be in Wordpress config file to enable redis. Please note that deleting of the plugin would flush all keys related to the plugin (site) from redis. REDIS_PREFIX and SELECTIVE_FLUSH in wp-config.php would guarantee that plugin will not flush keys unrelated to this plugin (site) """ socket_path = os.path.join(home_dir(), USER_WPOS_DIR, 'redis.sock') prefix_uuid = uuid.uuid4() redis_prefix = RedisRequiredConstants.WP_REDIS_PREFIX redis_schema = RedisRequiredConstants.WP_REDIS_SCHEME redis_client = RedisRequiredConstants.WP_REDIS_CLIENT redis_flush = RedisRequiredConstants.WP_REDIS_SELECTIVE_FLUSH redis_graceful = RedisRequiredConstants.WP_REDIS_GRACEFUL return ["// Start of CloudLinux generated section\n", f"define('{redis_schema.name}', '{redis_schema.val}');\n", f"define('{RedisRequiredConstants.WP_REDIS_PATH.name}', '{socket_path}');\n", f"define('{redis_client.name}', '{redis_client.val}');\n", f"define('{redis_graceful.name}', '{redis_graceful.val}');\n", f"define('{redis_prefix.name}', '{redis_prefix.val}{prefix_uuid}');\n", f"define('{redis_flush.name}', {redis_flush.val});\n", "// End of CloudLinux generated section\n"] def check_wp_config_existance(wp_config_path: str) -> None: """ Check that wp-config.php exists inside Wordpress directory. :param wp_config_path: absolute path to Wordpress config file :raises: WposError """ wp_path = os.path.dirname(wp_config_path) if not os.path.exists(wp_path): raise WpNotExists(wp_path) if not os.path.isfile(wp_config_path): raise WposError(message=gettext("Wordpress config file %(file)s is missing"), context={"file": wp_config_path}) def clear_redis_cache_config(abs_wp_path: str) -> None: """ Clear cloudlinux section with redis object cach config from docroot's wp-config.php :param abs_wp_path: Absolute path to WordPress :raises: WposError """ wp_config_path = str(wp_config.path(abs_wp_path)) check_wp_config_existance(wp_config_path) lines_to_filter = redis_cache_config_section() def __config_filter(line: str) -> bool: """ Filter function that should delete CL config options from the `redis_cache_config_section()` """ return line not in lines_to_filter and 'WP_REDIS_PREFIX' not in line try: wp_config_lines = wp_config.read(abs_wp_path) cleared_wp_config = list(filter(__config_filter, wp_config_lines)) write_file_via_tempfile("".join(cleared_wp_config), wp_config_path, 0o600) except (OSError, IOError) as e: raise WpConfigWriteFailed(wp_config_path, e) def create_redis_cache_config(abs_wp_path: str) -> None: """ Create config for redis-cache. We use manual copy cause we want to preserve file metadata and permissions and also we could add some custom config editing in the future. :param abs_wp_path: absolute path to WordPress :raises: WposError """ wp_config_path = str(wp_config.path(abs_wp_path)) check_wp_config_existance(wp_config_path) try: backup_wp_config = f"{wp_config_path}.backup" if not os.path.isfile(backup_wp_config): shutil.copy(wp_config_path, backup_wp_config) absent_constants = {constant.name: constant.val for constant in RedisRequiredConstants} wp_config_lines = wp_config.read(abs_wp_path) cleaned_lines = [] for line in wp_config_lines: absent_constants = {k: v for k, v in absent_constants.items() if f"define('{k}'" not in line} # nothing to do, all constants are already in conf if not absent_constants: return # cleanup existing consts, to rewrite all if not any(f"define('{redis_constant.name}'" in line for redis_constant in RedisRequiredConstants): cleaned_lines.append(line) updated_config = [ cleaned_lines[0], *redis_cache_config_section(), *cleaned_lines[1:], ] write_file_via_tempfile("".join(updated_config), wp_config_path, 0o600) except (OSError, IOError) as e: raise WpConfigWriteFailed(wp_config_path, e) def check_license_decorator(func): """Decorator to check for license validity """ @wraps(func) def wrapper(*args, **kwargs): """License check wrapper""" if not check_license(): raise WPOSLicenseMissing() return func(*args, **kwargs) return wrapper def check_domain(domain: str) -> Tuple[str, str]: """ Validates domain, determines it's owner and docroot or exit with error :param domain: Domain name to check :return: Tuple (username, docroot) """ try: document_root, owner = docroot(domain) return owner, document_root except NoDomain: # No such domain raise WposError(message=gettext("No such domain: %(domain)s."), context={"domain": domain}) def lock_file(path: str, attempts: Optional[int]): """ Try to take lock on file with specified number of attempts. """ lock_type = fcntl.LOCK_EX if attempts is not None: # avoid blocking on lock lock_type |= fcntl.LOCK_NB try: lock_fd = open(path, "a+") for _ in range(attempts or 1): # if attempts is None do 1 attempt try: fcntl.flock(lock_fd.fileno(), lock_type) break except OSError: time.sleep(0.3) else: raise LockFailedException(gettext("Another utility instance is already running. " "Try again later or contact system administrator " "in case if issue persists.")) except IOError: raise LockFailedException(gettext("IO error happened while getting lock.")) return lock_fd class LockFailedException(Exception): """ Exception when failed to take lock """ pass @contextmanager def acquire_lock(resource_path: str, attempts: Optional[int] = 10): """ Lock a file, than do something. Make specified number of attempts to acquire the lock, if attempts is None, wait until the lock is released. Usage: with acquire_lock(path, attempts=1): ... do something with files ... """ lock_fd = lock_file(resource_path + '.lock', attempts) yield release_lock(lock_fd) def release_lock(descriptor): """ Releases lock file """ try: # lock released explicitly fcntl.flock(descriptor.fileno(), fcntl.LOCK_UN) except IOError: # we ignore this cause process will be closed soon anyway pass descriptor.close() class PHP(str): """Class helper which hides differences of PHP behind abstract methods.""" def __new__(cls, *args): if cls != PHP: return str.__new__(cls, *args) for tp in _AltPHP, _EaPHP, _PleskPHP, _PleskLSPHP, _PleskVendorPHP: if args[0].startswith(tp.prefix()): return tp(*args) raise Exception(f"Unknown PHP: {args[0]}") @staticmethod def prefix() -> str: """Return prefix of PHP.""" raise NotImplementedError def modules_dir(self): return self.dir() def postfix(self) -> str: """*-phpXY-{postfix} if any""" if len(self.split('-')) > 2: return self.split('-')[-1] return '' def namely(self) -> str: """Named version without postfix""" if self.postfix(): return self[:-len(self.postfix())].strip('-') return self def handler(self): return None def _dir_relative_path(self) -> str: """Return relative path to dir of PHP.""" raise NotImplementedError def dir(self) -> Path: """Return path to dir of PHP.""" return Path(f"""/opt/{self._dir_relative_path()}""") def _bin_relative_path(self) -> str: """Return relative path to bin of PHP.""" raise NotImplementedError def bin(self) -> Path: """Return path to bin of PHP.""" return self.dir().joinpath(self._bin_relative_path()) def _ini_relative_path(self) -> str: """Return relative path to ini of PHP.""" raise NotImplementedError def ini(self) -> Path: """Return path to ini of PHP.""" return self.dir().joinpath(self._ini_relative_path()) def version(self): """""" return self.replace(self.prefix(), "").replace("php", "").split("-")[0] @property def digits(self): return int(self.version()) class _AltPHP(PHP): """Implementation for alt-php""" @staticmethod def prefix(): return "alt-" def _dir_relative_path(self): return f"alt/{self[len(self.prefix()):].split('-')[0]}" def _bin_relative_path(self): return "usr/bin/php" def _ini_relative_path(self): return "link/conf/default.ini" class _EaPHP(PHP): """Implementation for ea-php""" @staticmethod def prefix(): return "ea-" def _dir_relative_path(self): return f"cpanel/{self}" def _bin_relative_path(self): return "root/usr/bin/php" def _ini_relative_path(self): return "root/etc/php.ini" class _PleskPHP(PHP): """Implementation for plesk-php""" @staticmethod def prefix(): return "plesk-" def _dir_relative_path(self): return f"plesk/php/{float(self.digits)/10}" def _bin_relative_path(self): return "bin/php" def _ini_relative_path(self): return "etc/php.ini" class _PleskVendorPHP(PHP): """Implementation for plesk vendor php""" @staticmethod def prefix(): return "vendor-" def bin(self) -> Path: """Return path to bin of PHP.""" return Path("/usr/bin/php") def dir(self) -> Path: """Return path to dir of PHP.""" return Path("/etc") def _dir_relative_path(self): return "" def _bin_relative_path(self): return "bin/php" def _ini_relative_path(self): return "php.ini" def modules_dir(self): return Path("/usr/lib64/php") class _PleskLSPHP(_AltPHP): """Implementation for LSAPI on Plesk""" @staticmethod def prefix(): return "x-httpd-lsphp-" def namely(self): return f'alt-php{self.digits}' def _dir_relative_path(self): return f"alt/php{self[len(self.prefix()):]}" def is_conflict_modules_installed(php_version: PHP, module): """ Checks <module> enabled """ path = str(php_version.bin()) result = run_in_cagefs_if_needed([path, '-m'], env={}) if result.stderr and not result.stdout: raise PhpBrokenException(path, result.stderr) out = result.stdout if module in out.split('\n'): return True return False @lru_cache(maxsize=None) def wp_cli_compatibility_check(php_path: str): """ Ensures wp-cli is compatible, e.g some php modules may prevent stable work """ dangerous_module = 'snuffleupagus' if 'ea-php74' in php_path and is_conflict_modules_installed(PHP("ea-php74"), dangerous_module): raise WpCliUnsupportedException(message=gettext('Seems like ea-php74 %(module)s module is ' 'enabled. It may cause instabilities while managing ' 'Object Caching. Disable it and try again'), context={'module': dangerous_module}) def supported_php_handlers() -> List[str]: """ Return list of supported handlers according to edition """ supported = ['php-fpm', 'lsapi'] return supported def set_wpos_icon_visibility(hide: bool) -> Tuple[int, str]: """ Call cloudlinux-config utility to hide/show WPOS icon in user's control panel interface. """ params = [ 'set', '--data', json.dumps({'options': {'uiSettings': {'hideWPOSApp': hide}}}), '--json', ] returncode, stdout = exec_utility(CLCONFIG_UTILITY, params) return returncode, stdout def is_ui_icon_hidden() -> bool: """ Check the current state of WPOS icon in user's control panel interface """ return UIConfig().get_param('hideWPOSApp', 'uiSettings') @dataclass class ServerWideOptions: """ Options holder representing server-wide option available for reading for any user on server. Only can be changed by root. """ show_icon: bool allowed_suites: List visible_suites: List supported_suites: List upgrade_url: Optional[str] = None upgrade_url_cdn: Optional[str] = None def get_upgrade_url_for_user(self, username, domain, feature='object_cache'): """ Append some needed arguments to upgrade url to make it specific for user. Please pay attention that we add *customer_name* instead of system user, that may be different on plesk. """ from clwpos.feature_suites import PremiumSuite, CDNSuitePro # we should keep all the features here because we have smart-advice # which displays upgrade links per-advice and those advices # may be for different features feature_to_suite = { **{feature: PremiumSuite.name for feature in PremiumSuite.primary_features}, **{feature: CDNSuitePro.name for feature in CDNSuitePro.primary_features}, } if feature not in feature_to_suite: return None if feature in PremiumSuite.primary_features and self.upgrade_url is None: return None if feature in CDNSuitePro.primary_features and self.upgrade_url_cdn is None: return None url_parts = list(urlparse(self.upgrade_url)) query = dict(parse_qsl(url_parts[4])) query.update({ 'username': get_customer_login(username), 'domain': domain, 'server_ip': get_server_ip(), 'm': 'cloudlinux_advantage', 'action': 'provisioning', 'suite': feature_to_suite[feature] }) url_parts[4] = urlencode(query) return urlunparse(url_parts) @property def allowed_features(self): # TODO: fix this circle import one day from .feature_suites import ALL_SUITES _allowed_features = set() for suite in self.allowed_suites: _allowed_features.update(ALL_SUITES[suite].feature_set) return _allowed_features @property def visible_features(self): from .feature_suites import ALL_SUITES _visible_features = set() for suite in self.visible_suites: _visible_features.update(ALL_SUITES[suite].feature_set) return _visible_features def get_default_server_wide_options() -> ServerWideOptions: """ Return default content of /opt/clwpos/public_config.json. This file is accessible by all users on server. """ # circular import :( from .feature_suites import AWPSuite, PremiumSuite, CDNSuite, CDNSuitePro, SUPPORTED_SUITES is_icon_hidden = UIConfig().get_param('hideWPOSApp', 'uiSettings') visible_suites = [] allowed_suites = [] # --allowed-for-all previously used marker files # to mark suites as enabled # we must keep that behaviour for suite in (PremiumSuite.name, AWPSuite.name, CDNSuite.name, CDNSuitePro.name): if not os.path.isfile(SUITES_MARKERS[suite]): continue visible_suites.append(suite) allowed_suites.append(suite) return ServerWideOptions( show_icon=not is_icon_hidden, allowed_suites=allowed_suites, visible_suites=visible_suites, supported_suites=list(SUPPORTED_SUITES) ) def get_server_wide_options() -> ServerWideOptions: """ Gets server wide options which apply as defaults for all users """ from .feature_suites import ALL_SUITES default_options = get_default_server_wide_options() if not os.path.isfile(PUBLIC_OPTIONS): return default_options with open(PUBLIC_OPTIONS, 'r') as f: content = f.read() try: configuration: dict = json.loads(content) # these two options have different way of merging: we # must sum them and keep only unique elements for option_to_merge in ['visible_suites', 'allowed_suites', 'supported_suites']: if option_to_merge not in configuration: continue suites_from_config = configuration.pop(option_to_merge) suites_from_defaults = getattr(default_options, option_to_merge) # to filter out unknown suites from resulting structure # actually for downgrade cases, see AWP-272 for details merged_values = list(sorted(set(suites_from_defaults + list(set( suites_from_config).intersection(set(ALL_SUITES)))))) setattr(default_options, option_to_merge, merged_values) # the rest of the options just override their defaults default_options.__dict__.update(**configuration) return default_options except json.decoder.JSONDecodeError as err: raise WposError( message=_("File is corrupted: Please, delete file %(config_file)s" " or fix the line provided in details"), details=str(err), context={'config_file': PUBLIC_OPTIONS}) @contextmanager def write_public_options() -> ContextManager[ServerWideOptions]: """Set icon visibility in clwpos public options file""" public_config_data = get_server_wide_options() yield public_config_data with acquire_lock(PUBLIC_OPTIONS),\ open(PUBLIC_OPTIONS, "w") as f: json.dump(asdict(public_config_data), f) def run_in_cagefs_if_needed(command, **kwargs): """ Wrapper for subprocess to enter cagefs do not enter cagefs if: - CloudLinux Solo - if process already started as user in cagefs """ if in_cagefs() or not is_panel_feature_supported(Feature.CAGEFS): return subprocess.run(command, text=True, capture_output=True, preexec_fn=demote(os.geteuid(), os.getegid()), **kwargs) else: if os.geteuid() == 0: raise WposError(message=gettext(f'Internal error: command {command} must not be run as root. ' 'Please contact support if you have questions: ' 'https://cloudlinux.zendesk.com')) if isinstance(command, str): with_cagefs_enter = CAGEFS_ENTER_UTIL + ' ' + command else: with_cagefs_enter = [CAGEFS_ENTER_UTIL] + command return subprocess.run(with_cagefs_enter, preexec_fn=demote(os.geteuid(), os.getegid()), text=True, capture_output=True, **kwargs) def uid_by_name(name): """ Returns uid for user """ try: return ClPwd().get_uid(name) except ClPwd.NoSuchUserException: return None def get_alt_php_versions() -> List[PHP]: """ Get list of installed alt-php versions. """ alt_dir = '/opt/alt' pattern = re.compile(r'^php\d{2}$') alt_php_versions = [ PHP(f'alt-{dirname}') for dirname in os.listdir(alt_dir) if pattern.match(dirname) ] return alt_php_versions class PhpIniConfig: """ Helper class to update extensions in php .ini files. """ def __init__(self, php_version: PHP): self.php_version = php_version self.disabled_pattern = re.compile(r'^;\s*extension\s*=\s*(?P<module_name>\w+)\.so') self.enabled_pattern = re.compile(r'^\s*extension\s*=\s*(?P<module_name>\w+)\.so') def _enabled_modules(self, path: str) -> Set[str]: """ Return enabled modules. :param path: full path to .ini file """ with open(path, 'r') as f: return {self.enabled_pattern.match(line).group('module_name') for line in f if self.enabled_pattern.match(line) is not None} def enable_modules(self, path: str, modules: List[str]) -> bool: """ Enable specified modules in .ini php file. :param path: path to .ini file related to php directory :param modules: list of modules that should be enabled """ full_path = os.path.join(self.php_version.dir(), path) if not os.path.exists(full_path): return False modules_to_enable = set(modules) - self._enabled_modules(full_path) if modules_to_enable: with open(full_path) as f: new_ini_lines = [self._enable_module(line, modules_to_enable) for line in f.readlines()] for module in sorted(modules_to_enable): new_ini_lines.append('extension={}.so\n'.format(module)) write_file_via_tempfile(''.join(new_ini_lines), full_path, 0o644) return True def disable_modules(self, path: str, modules: List[str]) -> bool: """ Disable specified modules in .ini php file. :param path: path to .ini file related to php directory :param modules: list of modules that should be disabled """ full_path = os.path.join(self.php_version.dir(), path) if not os.path.exists(full_path): return False modules_to_disable = set(modules) & self._enabled_modules(full_path) if modules_to_disable: with open(full_path) as f: new_ini_lines = [self._disable_module(line, modules_to_disable) for line in f.readlines()] write_file_via_tempfile(''.join(new_ini_lines), full_path, 0o644) return True def _enable_module(self, line: str, modules_to_enable: Set[str]) -> str: """ Search for disabled module in line, uncomment line to enable module. """ match = self.disabled_pattern.match(line) if match is not None: module_name = match.group('module_name') if module_name in modules_to_enable: modules_to_enable.remove(module_name) return line.lstrip(';').lstrip() return line def _disable_module(self, line: str, modules_to_disable: Set[str]) -> str: """ Search for enabled module in line, comment line to disable module. """ match = self.enabled_pattern.match(line) if match is not None: module_name = match.group('module_name') if module_name in modules_to_disable: return f';{line}' return line def _run_clwpos_as_user_in_cagefs(user=None): """ All user-related actions must run inside of cagefs for security reasons. If solo just return because cagefs is only for shared and shared pro If root executed, we enter into user cagefs if user is pointed If not in cagefs and cagefs is enabeled for user enter into cagefs """ if not is_panel_feature_supported(Feature.CAGEFS): return if not is_run_under_user(): if user is None: raise WposError(message=gettext( "Internal Error: root enters into CageFS without specifying username" "Please contact support if you have questions: " "https://cloudlinux.zendesk.com" ) ) cmd = [CAGEFS_ENTER_USER_BIN, user] + sys.argv[:1] + sys.argv[3:] elif not in_cagefs() and _is_cagefs_enabled(user=user_name()): cmd = [CAGEFS_ENTER_UTIL] + sys.argv else: return p = subprocess.Popen(cmd, stdout=sys.stdout, stdin=sys.stdin, env={}) p.communicate() sys.exit(p.returncode) class RedisConfigurePidFile: """ Helper class that provides methods to work with pid files of php redis configuration processes. """ def __init__(self, php_prefix: str) -> None: self._pid_file_name = f'{php_prefix}-cloudlinux.pid' self.path = Path(CLWPOS_OPT_DIR, self._pid_file_name) def create(self) -> None: with self.path.open('w') as f: f.write(str(os.getpid())) def remove(self) -> None: if self.path.is_file(): self.path.unlink() def exists(self) -> bool: return self.path.is_file() @property def pid(self) -> int: if not self.exists(): return -1 with self.path.open() as f: try: return int(f.read().strip()) except ValueError: pass return -1 @contextmanager def create_pid_file(php_prefix: str): """ Context manager for creating pid file of current process. Removes pid file on exit. """ pid_file = RedisConfigurePidFile(php_prefix) try: pid_file.create() yield finally: pid_file.remove() def is_php_redis_configuration_running(php_prefix: str) -> bool: """ Find out if PHP redis configuration process is running. Based on looking for presence of pid files. For root also checks process existence. """ pid_file = RedisConfigurePidFile(php_prefix) if os.geteuid() != 0: return pid_file.exists() try: process = psutil.Process(pid_file.pid) return 'enable_redis' in process.name() except (ValueError, psutil.NoSuchProcess): return False def is_alt_php_redis_configuration_running() -> bool: """ Find out if alt-PHP redis configuration process is running. """ return is_php_redis_configuration_running(ALT_PHP_PREFIX) def is_ea_php_redis_configuration_running() -> bool: """ Find out if ea-PHP redis configuration process is running. """ return is_php_redis_configuration_running(EA_PHP_PREFIX) def is_plesk_php_redis_configuration_running() -> bool: """ Find out if ea-PHP redis configuration process is running. """ return is_php_redis_configuration_running(PLESK_PHP_PREFIX) def is_redis_configuration_running() -> bool: """ Find out if redis configuration process is running for any PHP (ea-php or alt-php). """ return is_alt_php_redis_configuration_running() or \ is_ea_php_redis_configuration_running() or \ is_plesk_php_redis_configuration_running() def update_redis_conf(new_user: WposUser, old_user: WposUser) -> None: """ Replace user's wpos directory path in redis.conf. """ with open(new_user.redis_conf) as f: redis_conf_lines = f.readlines() updated_lines = [ line.replace(old_user.wpos_dir, new_user.wpos_dir) for line in redis_conf_lines ] write_file_via_tempfile(''.join(updated_lines), new_user.redis_conf, 0o600) def update_wp_config(abs_wp_path: str, new_user: WposUser, old_user: WposUser) -> None: """ Replace user's redis socket path in wp-config.php. """ try: wp_config_lines = wp_config.read(abs_wp_path) except OSError as e: print('Error occurred during opening wp-config.php ' f'located in path "{abs_wp_path}": {e}', file=sys.stderr) return updated_lines = [ line.replace(old_user.redis_socket, new_user.redis_socket) if old_user.redis_socket in line else line for line in wp_config_lines ] write_file_via_tempfile(''.join(updated_lines), wp_config.path(abs_wp_path), 0o600) def get_parent_pid() -> int: """ Get parent process PID. """ proc = psutil.Process(os.getpid()) return proc.ppid() def _is_monitoring_daemon_exists() -> bool: """ Detect CL WPOS daemon presence in system :return: True - daemon works / False - No """ # /sbin/service clwpos_monitoring status # retcode != 0 - clwpos_monitoring not running/not installed # == 0 - clwpos_monitoring running returncode, _, _ = run_command(['/sbin/service', 'clwpos_monitoring', 'status'], return_full_output=True) if returncode != 0: return False return True def _update_clwpos_daemon_config_systemd(systemd_unit_file) -> Tuple[int, str, str]: """ Update systemd unit file and reload systemd """ shutil.copy('/usr/share/cloudlinux/clwpos_monitoring.service', systemd_unit_file) retcode, stdout, stderr = run_command(['/usr/bin/systemctl', 'enable', 'clwpos_monitoring.service'], return_full_output=True) if not retcode: retcode, stdout, stderr = run_command(['/usr/bin/systemctl', 'daemon-reload'], return_full_output=True) return retcode, stdout, stderr def _install_daemon_internal(is_solo: bool, systemd_unit_file: str, is_module_allowed_on_server: bool) -> Tuple[int, str, str]: """ Install WPOS daemon to system and start it """ retcode, stdout, stderr = 0, None, None if 'el6' in platform.release(): retcode, stdout, stderr = run_command(['/sbin/chkconfig', '--add', 'clwpos_monitoring'], return_full_output=True) else: if not is_solo and is_module_allowed_on_server: # CL Shared Pro and module enabled # Update unit file and reload systemd - setup daemon retcode, stdout, stderr = _update_clwpos_daemon_config_systemd(systemd_unit_file) if not retcode: retcode, stdout, stderr = run_command(['/sbin/service', 'clwpos_monitoring', 'start'], return_full_output=True) return retcode, stdout, stderr def install_monitoring_daemon(is_module_allowed_on_server: bool) -> Tuple[int, str, str]: """ Install WPOS daemon to server if need: - if daemon already present - do nothing; - on CL Solo install daemon always; - on CL Shared Pro install daemon if module allowed On solo and if /etc/systemd/system/clwpos_monitoring.service present it will be updated always We do not need restart installed daemon here, it's done in rpm_posttrans.sh :param is_module_allowed_on_server: True/False """ systemd_unit_file = '/etc/systemd/system/clwpos_monitoring.service' is_solo = is_cl_solo_edition() # if from rpm_posttrans if is_solo or os.path.exists(systemd_unit_file): # Update unit file and reload systemd _update_clwpos_daemon_config_systemd(systemd_unit_file) if _is_monitoring_daemon_exists(): return 0, "", "" return _install_daemon_internal(is_solo, systemd_unit_file, is_module_allowed_on_server) def get_status_from_daemon(service): command_get_service_status_dict = {"command": f"get-{service}-status"} try: daemon_result = daemon_communicate(command_get_service_status_dict) except WposError: return False return daemon_result.get('status') def redis_is_running() -> bool: return get_status_from_daemon('redis') def litespeed_is_running() -> bool: return get_status_from_daemon('litespeed') def _get_data_from_info_json(attribute: str) -> List: """ Return attribute's value from info.json file. """ from clwpos.feature_suites import get_admin_config_directory admin_config_dir = get_admin_config_directory(user_uid()) info_json = os.path.join(admin_config_dir, "info.json") try: with open(info_json) as f: return json.load(f)[attribute] except (OSError, KeyError, json.JSONDecodeError) as e: logging.exception("Error during reading of \"info.json\" file: %s", e) return [] def drop_permissions_if_needed(username): # there is no need to drop privileges if we are already # running as user, so we should handle this case # by using empty context instead context = drop_privileges if os.geteuid(): context = contextlib.nullcontext return context(username) def get_subscription_status(allowed_features: dict, suite: str, feature: str): from clwpos.daemon import WposDaemon subscription_status = 'active' if feature in allowed_features.get(suite) else 'no' try: is_pending = daemon_communicate({ "command": WposDaemon.DAEMON_GET_UPGRADE_ATTEMPT_STATUS, "feature": feature })["pending"] except WposError: # in a rare situation when daemon is not active we # still would like to return list of modules # this is an old test-covered behavior that I would # not like to change now # it seems that in 99% of cases daemon must be active as we # start in when first module is enabled is_pending = False if is_pending: subscription_status = 'pending' return subscription_status