D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
opt
/
alt
/
python37
/
lib
/
python3.7
/
site-packages
/
clwpos
/
optimization_features
/
Filename :
features.py
back
Copy
# -*- coding: utf-8 -*-

# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2020 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
import argparse
import json
import os
import re
import subprocess
from pathlib import Path
from typing import Dict
from distutils.version import LooseVersion

from clcommon.clwpos_lib import get_wp_cache_plugin
from clwpos import gettext as _, constants
from clwpos.cl_wpos_exceptions import WposError, WpCliCommandError
from clwpos.constants import PULLZONE_DOMAIN_PROTOCOL, SMART_ADVISE_USER_UTILITY
from clwpos.utils import run_in_cagefs_if_needed
from dataclasses import dataclass, field, asdict
from enum import Enum
from clwpos.constants import (
    MINIMUM_SUPPORTED_PHP_OBJECT_CACHE,
    CL_DOC_USER_PLUGIN,
    CLSOP_ZIP_PATH
)
from clwpos.logsetup import setup_logging
from clwpos.object_cache.redis_utils import (
    get_cached_php_versions_with_redis_present,
    get_cached_php_versions_with_redis_loaded
)
from clwpos.utils import (
    is_conflict_modules_installed,
    PHP,
    clear_redis_cache_config,
    create_redis_cache_config
)
from clwpos.wp_utils import (
    wordpress,
    WordpressError,
    is_plugin_activated,
    is_plugin_installed,
    obtain_wp_cli_env,
    diagnose_redis_connection_constants,
    is_multisite,
    list_active_plugins,
    get_plugin_data
)


class PluginStatus(Enum):
    """
    Plugin states this module distinguishes; values are the raw status strings
    """
    UNINSTALLED = 'uninstalled'
    ACTIVE = 'active'
    INACTIVE = 'inactive'


@dataclass
class Issue:
    """
    Base record describing one compatibility or misconfiguration problem
    """
    # short title of the problem
    header: str
    # longer explanation; may contain %(name)s placeholders filled from context
    description: str
    # advice on how to resolve the problem; may contain placeholders as well
    fix_tip: str
    # values for the placeholders used in the texts above
    context: Dict[str, str] = field(default_factory=dict)

    @property
    def dict_repr(self):
        """Return all fields of the issue as a plain dictionary"""
        as_mapping = asdict(self)
        return as_mapping


class UniqueId:
    """
    String identifiers attached to compatibility issues
    """
    PHP_NOT_SUPPORTED = 'PHP_NOT_SUPPORTED'
    PLUGIN_CONFLICT = 'PLUGIN_CONFLICT'
    WORDPRESS_MULTISITE_ENABLED = 'WORDPRESS_MULTISITE_ENABLED'
    MISCONFIGURED_WORDPRESS = 'MISCONFIGURED_WORDPRESS'
    WEBSERVER_NOT_SUPPORTED = 'WEBSERVER_NOT_SUPPORTED'
    PHP_MISCONFIGURATION = 'PHP_MISCONFIGURATION'
    UNCOMPATIBLE_WORDPRESS_VERSION = 'UNCOMPATIBLE_WORDPRESS_VERSION'
    AWP_NOT_SUPPORTS_CDN = 'AWP_NOT_SUPPORTS_CDN'
    AWP_NOT_SUPPORTS_IMAGE_OPTIMIZATION = 'AWP_NOT_SUPPORTS_IMAGE_OPTIMIZATION'
    AWP_NOT_SUPPORTS_CPCSS = 'AWP_NOT_SUPPORTS_CPCSS'
    NS_CDN_CONFLICT = 'NS_CDN_CONFLICT'
    CLOUDLINUX_MODULE_ALREADY_ENABLED = 'CLOUDLINUX_MODULE_ALREADY_ENABLED'
@dataclass
class CompatibilityIssue(Issue):
    """
    For compatibility issues
    """
    # one of the UniqueId constants
    unique_id: str = None
    # extra data kept on the object but excluded from dict_repr
    telemetry: Dict[str, str] = field(default_factory=dict)
    type: str = 'incompatibility'

    @property
    def dict_repr(self):
        """Return the issue as a dictionary without unique_id and telemetry"""
        representation = asdict(self)
        representation.pop('unique_id')
        representation.pop('telemetry')
        return representation


@dataclass
class MisconfigurationIssue(Issue):
    """
    For misconfiguration issues
    """
    type: str = 'misconfiguration'


class BillableFeatureMixin:
    def _get_or_create_unique_identifier(self):
        """
        Wrapper for easy mocking
        """
        # imported at call time, as in the original code
        from clwpos.billing import get_unique_identifier_as_user
        return get_unique_identifier_as_user()


class Feature(str):
    """
    Helper class which hides differences of optimization features
    behind abstract methods.

    Calling ``Feature(<name>)`` returns an instance of the concrete
    feature class registered for that name (see ``__new__``).
    """
    NAME = ''
    WP_PLUGIN_NAME = ''
    HAS_LICENSE_TERMS = False
    LICENSE_TERMS_PATH = None

    # NOTE: NAME is '' while this class body executes, so the name passed
    # here is always '_feature'; subclasses inherit this same attribute.
    _logger = setup_logging(f'{NAME.lower()}_feature')

    def __new__(cls, *args, **kwargs):
        """
        Build a feature object.

        For subclasses this is plain ``str`` construction. For ``Feature``
        itself the first positional argument selects the concrete class.

        :raises argparse.ArgumentTypeError: when the name is not registered
        """
        if cls != Feature:
            return str.__new__(cls, *args)

        classes = {
            "object_cache": _ObjectCache,
            # yep, site_optimization and accelerate_wp names are same thing
            "site_optimization": _SiteOptimization,
            "accelerate_wp": _SiteOptimization,
            'cdn': _Cdn,
            'critical_css': _CriticalCSS,
            'image_optimization': _ImageOptimization
        }
        # Only the lookup is guarded: a KeyError raised while constructing
        # the concrete class must not be reported as "No such feature".
        try:
            feature_class = classes[args[0]]
        except KeyError:
            raise argparse.ArgumentTypeError(f"No such feature: {args[0]}.") from None
        return feature_class(*args)

    @classmethod
    def optimization_feature(cls):
        """Return the feature object for this class (built from lower-cased NAME)"""
        return cls(cls.NAME.lower())

    @classmethod
    def included_optimization_features(cls):
        """Return the list of features this feature consists of"""
        return [cls.optimization_feature()]

    @classmethod
    def redis_daemon_required(cls):
        raise NotImplementedError

    @classmethod
    def collect_docroot_issues(cls, wpos_user_obj, doc_root_info, visible_features=None):
        raise NotImplementedError

    @classmethod
    def is_php_supported(cls, php_version: PHP):
        raise NotImplementedError

    @classmethod
    def minimum_supported_wp_version(cls):
        raise NotImplementedError

    @classmethod
    def collect_wordpress_issues(cls, self, wordpress: Dict, docroot: str, module_is_enabled: bool):
        # Declared as a classmethod taking an explicit ``self`` object so the
        # abstract signature matches the overrides in the concrete classes;
        # callers still pass the same four arguments.
        raise NotImplementedError

    @staticmethod
    def to_interface_name():
        raise NotImplementedError

    @staticmethod
    def get_wp_plugin_status(wordpress_abs_path, plugin_name) -> PluginStatus:
        """
        Get information about WordPress plugin current status.

        :param wordpress_abs_path: absolute path to wordpress installation
        :param plugin_name: name of plugin as it listed in plugins directory
        :return: PluginStatus
        """
        response = Feature.get_plugin_data(wordpress_abs_path, plugin_name)
        # in case of missing plugin wp-cli returns empty dict
        if not response:
            return PluginStatus.UNINSTALLED

        # in any other case we get list of one element with parameters
        # NOTE(review): a status outside PluginStatus makes Enum lookup raise
        # ValueError here - confirm wp-cli cannot report other values.
        return PluginStatus(response[0]['status'])

    @staticmethod
    def get_plugin_data(wordpress_abs_path, plugin_name):
        """Thin wrapper around clwpos.wp_utils.get_plugin_data"""
        return get_plugin_data(wordpress_abs_path, plugin_name)

    @staticmethod
    def get_plugin_version(wordpress_abs_path, plugin_name) -> str:
        """
        Return the 'version' value reported for the plugin.

        :raises WposError: when no data was returned for the plugin
        """
        response = Feature.get_plugin_data(wordpress_abs_path, plugin_name)
        # in case of missing plugin wp-cli returns empty dict
        if not response:
            raise WposError(
                message=_(
                    'Malformed plugins information received from wp-cli, '
                    'unable to detect %(plugin)s version'),
                context={'plugin': plugin_name},
            )
        return response[0]['version']

    @classmethod
    def _get_wp_plugin_compatibility_issues(cls, docroot, wordpress):
        """
        Get issues that relates to currently installed WP plugin
        or None if everything is ok
        """
        try:
            plugin_status = cls.get_wp_plugin_status(
                wordpress_abs_path=os.path.join(docroot, wordpress["path"]),
                plugin_name=cls.WP_PLUGIN_NAME)
        except WposError as e:
            error_message = e.message % e.context
            return CompatibilityIssue(
                header=_('Unexpected WordPress error'),
                description=_(
                    'Unable to detect the WordPress plugins '
                    'due to unexpected error. '
                    '\n\n'
                    'Technical details:\n%(error_message)s.\n'
                    '\nMost likely WordPress installation is not working properly.'
                ),
                fix_tip=_(
                    'Check that your website is working properly – '
                    'try to run the specified command to find any obvious '
                    'errors in the WordPress configuration. '
                    'Otherwise, try to fix other issues first - '
                    'it may help to resolve this issue as well.'
                ),
                context=dict(
                    error_message=error_message
                ),
                unique_id=UniqueId.MISCONFIGURED_WORDPRESS,
                telemetry=dict(
                    error_message=error_message
                )
            )

        return cls._get_issues_from_wp_plugin_status(plugin_status)

    @classmethod
    def _get_issues_from_wp_plugin_status(cls, plugin_status):
        raise NotImplementedError

    @classmethod
    def install(cls, abs_wp_path: str):
        raise NotImplementedError

    @classmethod
    def enable(cls, abs_wp_path: str, *args, **kwargs):
        raise NotImplementedError

    @classmethod
    def disable(cls, abs_wp_path: str, **kwargs):
        raise NotImplementedError
class _ObjectCache(Feature):
    """Implementation for object caching"""
    NAME = 'OBJECT_CACHE'
    WP_PLUGIN_NAME = 'redis-cache'

    @classmethod
    def redis_daemon_required(cls):
        return True

    @staticmethod
    def to_interface_name():
        return 'object_cache'

    @classmethod
    def _get_issues_from_wp_plugin_status(cls, plugin_status):
        """
        Get issue that relates to currently installed redis-cache plugin.

        Every known status yields a MisconfigurationIssue (the caller only
        asks when the expected drop-in was not detected).

        :raises WposError: on a status outside PluginStatus
        """
        if plugin_status == PluginStatus.INACTIVE:
            return MisconfigurationIssue(
                header=_('"Redis Object Cache" plugin is deactivated'),
                description=_('Object caching is enabled, but the '
                              '"Redis Object Cache" plugin is deactivated in Wordpress admin page. Caching does not work'),
                fix_tip=_('Activate the Redis Object Cache plugin in the Wordpress admin page and '
                          'enable Object Cache Drop-in in the Redis Object Cache plugin settings. '
                          'As an alternative, rollback the feature and apply it again.')
            )
        if plugin_status == PluginStatus.ACTIVE:
            return MisconfigurationIssue(
                header=_('The Object Cache Drop-in not installed'),
                description=_('The Object Cache Drop-In is not enabled. Caching does not work'),
                fix_tip=_('Enable the Object Cache using the Redis Object Cache plugin '
                          'settings page of Wordpress Admin. '
                          'As an alternative, rollback the feature and apply it again.')
            )
        if plugin_status == PluginStatus.UNINSTALLED:
            return MisconfigurationIssue(
                header=_('"Redis Object Cache" plugin is not installed'),
                description=_('The "Redis Object Cache" WordPress plugin is not installed. '
                              'Caching does not work'),
                fix_tip=_('Rollback the feature and apply it again. '
                          'Contact your administrator if the issue persists.')
            )
        raise WposError(_('Unexpected plugin status: %(status)s'), context=dict(status=plugin_status))

    @staticmethod
    def _redis_extension_problem(php_version):
        """
        Check the redis PHP extension for the given version.

        :return: None when the extension is installed and loaded, otherwise
                 a (header, fix_tip, description, telemetry_reason) tuple
        """
        if php_version not in get_cached_php_versions_with_redis_present():
            return (
                _('Redis extension is not installed for selected php version'),
                _('Please, install or ask your system administrator to install redis extension '
                  'for current %(php_version)s version, or use one of the compatible php versions: '
                  '%(compatible_versions)s for the domain.'),
                _('Redis PHP extension is required for optimization feature, but not installed for '
                  'selected PHP version: %(php_version)s.'),
                'PHP_REDIS_NOT_INSTALLED',
            )
        # the "loaded" list is consulted only when the extension is present
        if php_version not in get_cached_php_versions_with_redis_loaded():
            return (
                _('Redis extension is not loaded for selected php version'),
                _('Please, load or ask your system administrator to load redis extension '
                  'for current %(php_version)s version, or use one of the compatible php versions: '
                  '%(compatible_versions)s for the domain.'),
                _('Redis PHP extension is required for optimization feature, but not loaded for '
                  'selected PHP version: %(php_version)s.'),
                'PHP_REDIS_NOT_LOADED',
            )
        return None

    @classmethod
    def collect_docroot_issues(cls, wpos_user_obj, doc_root_info, visible_features=None):
        """
        Collects incompatibilities related to docroot (non-supported handler, etc)
        for object caching.

        :param wpos_user_obj: object providing supported_php_versions and supported_handlers
        :param doc_root_info: mapping with at least 'php_version' and 'php_handler'
        :param visible_features: optional collection of visible feature names
        :return: list of Issue objects
        """
        issues = []
        php_version = PHP(doc_root_info['php_version'].namely())
        supported_php_versions = wpos_user_obj.supported_php_versions[OBJECT_CACHE_FEATURE]

        # None (visibility not supplied) is handled like "not visible" below
        is_modules_visible = None
        if visible_features is not None:
            is_modules_visible = 'object_cache' in visible_features

        # parts of the single PHP-related CompatibilityIssue, if any
        header = fix_tip = description = reason = None

        if not cls.is_php_supported(php_version):
            header = _('PHP version is not supported')
            fix_tip = _('Please, set or ask your system administrator to set one of the '
                        'supported PHP versions: %(compatible_versions)s')
            description = _('Non supported PHP version %(php_version)s currently is used.')
            reason = 'PHP_VERSION_TOO_LOW'
        else:
            redis_problem = cls._redis_extension_problem(php_version)
            if redis_problem is not None:
                if not is_modules_visible:
                    # in order to create advices when module is not visible yet
                    # NOTE(review): compatible_versions is passed as a list here
                    # but as a joined string in the CompatibilityIssue below -
                    # kept as is, confirm what consumers of this context expect.
                    issues.append(MisconfigurationIssue(
                        header=redis_problem[0],
                        fix_tip=redis_problem[1],
                        description=redis_problem[2],
                        context=dict(php_version=php_version,
                                     compatible_versions=supported_php_versions)))
                else:
                    header, fix_tip, description, reason = redis_problem

        if not supported_php_versions:
            fix_tip = _('Please, ask your system administrator to setup at least '
                        'one of the recommended PHP version in accordance with docs (%(docs_url)s).')

        if header is not None:
            issues.append(
                CompatibilityIssue(
                    header=header,
                    description=description,
                    fix_tip=fix_tip,
                    context=dict(php_version=php_version,
                                 compatible_versions=', '.join(supported_php_versions),
                                 docs_url=constants.CL_DOC_USER_PLUGIN),
                    unique_id=UniqueId.PHP_NOT_SUPPORTED,
                    telemetry=dict(
                        reason=reason,
                        php_version=php_version,
                        supported_php_versions=supported_php_versions
                    )
                )
            )

        if doc_root_info["php_handler"] not in wpos_user_obj.supported_handlers:
            issues.append(
                CompatibilityIssue(
                    header=_('Unsupported PHP handler'),
                    description=_('Website uses unsupported PHP handler. Currently supported '
                                  'handler(s): %(supported_handlers)s.'),
                    fix_tip=_('Please, set or ask your system administrator to set one of the '
                              'supported PHP handlers for the domain: %(supported_handlers)s. '
                              'Or keep watching our blog: %(blog_url)s for supported handlers list updates.'),
                    context={
                        'supported_handlers': ", ".join(wpos_user_obj.supported_handlers),
                        'blog_url': 'https://blog.cloudlinux.com/'
                    },
                    unique_id=UniqueId.PHP_NOT_SUPPORTED,
                    telemetry=dict(
                        reason='PHP_UNSUPPORTED_HANDLER',
                        handler=doc_root_info["php_handler"],
                        supported_handlers=wpos_user_obj.supported_handlers,
                        php_version=php_version
                    )
                )
            )

        # The original also consulted a function-local dict that was always
        # empty, so the module check below was the only effective condition.
        incompatible_module = 'snuffleupagus'
        if is_conflict_modules_installed(php_version, incompatible_module):
            issues.append(
                CompatibilityIssue(
                    header=_('Unsupported PHP module is loaded'),
                    description=_('Incompatible PHP module "%(incompatible_module)s" is currently used.'),
                    fix_tip=_('Please, disable or remove "%(incompatible_module)s" PHP extension.'),
                    context=dict(incompatible_module=incompatible_module),
                    unique_id=UniqueId.PHP_NOT_SUPPORTED,
                    telemetry=dict(
                        handler=doc_root_info["php_handler"],
                        supported_handlers=wpos_user_obj.supported_handlers,
                        php_version=php_version
                    )
                ))
        return issues

    @classmethod
    def is_php_supported(cls, php_version: PHP):
        """
        Check if passed php version >= minimum PHP version
        supported by object caching.
        """
        return php_version.digits >= MINIMUM_SUPPORTED_PHP_OBJECT_CACHE

    @classmethod
    def minimum_supported_wp_version(cls):
        return constants.MINIMUM_SUPPORTED_WP_OBJECT_CACHE

    @classmethod
    def collect_wordpress_issues(cls, self, wordpress: Dict, docroot: str, module_is_enabled: bool):
        """
        Collect issues of one WordPress installation for object caching.

        :param self: object providing is_redis_running and check_installed_roc_plugin
        :param wordpress: mapping with at least the 'path' key (relative to docroot)
        :param docroot: document root path
        :param module_is_enabled: whether object caching is enabled for the site
        :return: list of Issue objects
        """
        issues = []
        wp_dir = Path(docroot).joinpath(wordpress["path"])
        abs_wp_path = os.path.join(docroot, wordpress["path"])
        plugin_type = "object-cache"
        detected_object_cache_plugin = get_wp_cache_plugin(wp_dir, plugin_type)
        conflict_telemetry = dict(
            reason='OBJECT_CACHE_ALREADY_ENABLED',
            plugin=detected_object_cache_plugin
        )
        wp_cli_fix_tip = _('Please, try to check executed command and fix possible issues with it. '
                           'If issue persists - please, contact CloudLinux support.')

        # --- checks that only make sense for an enabled module ---
        if module_is_enabled:
            if detected_object_cache_plugin != "redis-cache":
                issue = cls._get_wp_plugin_compatibility_issues(docroot, wordpress)
                if issue:
                    issues.append(issue)

            if not self.is_redis_running:
                issues.append(
                    MisconfigurationIssue(
                        header=_('Redis is not running'),
                        description=_('Object caching is enabled, but redis process is not running.'),
                        fix_tip=_('Redis will start automatically in 5 minutes. '
                                  'If the issue persists - contact your system administrator and report this issue')
                    )
                )

            try:
                diagnose_redis_connection_constants(docroot, wordpress['path'])
            except WpCliCommandError as e:
                issues.append(
                    MisconfigurationIssue(
                        header=_('Unable to identify redis constants in wordpress config'),
                        description=_('wp-cli utility returns malformed response, reason: "%(reason)s"'),
                        fix_tip=wp_cli_fix_tip,
                        context=dict(
                            reason=e.message % e.context
                        )
                    )
                )
            except WposError as e:
                issues.append(
                    MisconfigurationIssue(
                        header=_('Missed redis constants in site config'),
                        description=_('WordPress config does not have needed constants '
                                      'for redis connection establishment.\n'
                                      'Details: %(reason)s'),
                        fix_tip=_('Please, try to disable and enable plugin again. '
                                  'If issue persists - please, contact CloudLinux support.'),
                        context=dict(
                            reason=e.message % e.context
                        )
                    )
                )

        # --- conflicting object cache drop-ins ---
        if detected_object_cache_plugin == "Unknown":
            drop_in_file = wp_dir.joinpath("wp-content", f'{plugin_type}.php')
            issues.append(
                CompatibilityIssue(
                    header=_('Conflicting object caching plugin enabled'),
                    description=_('Unknown custom object caching plugin is already enabled'),
                    # The path goes through context: a msgid built with an
                    # f-string differs per site and can never be translated.
                    fix_tip=_('Remove the drop-in (%(drop_in_file)s) file from the WordPress '
                              'instance because it conflicts with AccelerateWP object caching.'),
                    context=dict(drop_in_file=str(drop_in_file)),
                    unique_id=UniqueId.PLUGIN_CONFLICT,
                    telemetry=dict(conflict_telemetry)
                ))
        elif detected_object_cache_plugin == "w3-total-cache":
            issues.append(
                CompatibilityIssue(
                    header=_('Object Caching of W3 Total Cache plugin is incompatible'),
                    description=_('WordPress website already has Object Caching feature enabled '
                                  'with caching backend configured by the the W3 Total Cache plugin.'),
                    fix_tip=_('Deactivate Object Caching in W3 Total Cache plugin settings.'),
                    context=dict(),
                    unique_id=UniqueId.PLUGIN_CONFLICT,
                    telemetry=dict(conflict_telemetry)
                ))
        elif detected_object_cache_plugin not in (None, "redis-cache"):
            issues.append(
                CompatibilityIssue(
                    header=_('Conflicting object caching plugin enabled'),
                    description=_('The "%(detected_wp_plugin)s" plugin conflicts with AccelerateWP object caching.'),
                    # a space was missing between "uninstall" and "the";
                    # translation catalogs keyed on the old text need the new msgid
                    fix_tip=_('Deactivate object caching in the plugin settings or completely uninstall '
                              'the conflicting plugin using the WordPress administration interface.'),
                    context=dict(detected_wp_plugin=detected_object_cache_plugin),
                    unique_id=UniqueId.PLUGIN_CONFLICT,
                    telemetry=dict(conflict_telemetry)
                ))

        # --- foreign Redis Object Cache plugin ---
        try:
            if not self.check_installed_roc_plugin(abs_wp_path):
                issues.append(
                    CompatibilityIssue(
                        header=_('Another Redis Object Cache plugin is installed'),
                        description=_('Non CloudLinux Redis Object Cache is installed for the website'),
                        fix_tip=_('Uninstall Redis Object Cache plugin using WordPress administration page'),
                        unique_id=UniqueId.PLUGIN_CONFLICT,
                        telemetry=dict(conflict_telemetry)
                    ))
        except WpCliCommandError as e:
            issues.append(
                MisconfigurationIssue(
                    header=_('Unable to identify installed object cache plugin in WordPress'),
                    description=_('wp-cli utility returns malformed response, reason: "%(reason)s"'),
                    fix_tip=wp_cli_fix_tip,
                    context=dict(
                        reason=e.message % e.context
                    )
                )
            )

        # --- multisite ---
        try:
            if is_multisite(abs_wp_path):
                issues.append(
                    CompatibilityIssue(
                        header=_('WordPress Multisite mode is enabled'),
                        description=_('WordPress uses the Multisite mode which is currently not supported.'),
                        fix_tip=_('Install or configure WordPress in the single-site mode.'),
                        unique_id=UniqueId.WORDPRESS_MULTISITE_ENABLED,
                        telemetry=dict()
                    ))
        except WposError as e:
            error_message = e.message % e.context
            issues.append(
                CompatibilityIssue(
                    header=_('Unexpected WordPress error'),
                    description=_('Unable to detect if the WordPress installation has the Multisite mode enabled '
                                  'mode due to unexpected error. '
                                  '\n\n'
                                  'Technical details:\n%(error_message)s.\n'
                                  '\nMost likely WordPress installation is not working properly.'),
                    fix_tip=_('If this is only one issue, please check that your website is working properly – '
                              'try to run the specified command to find any obvious '
                              'errors in the WordPress configuration. '
                              'Otherwise, try to fix other issues first - it may help to resolve this issue as well.'),
                    context=dict(
                        error_message=error_message
                    ),
                    unique_id=UniqueId.MISCONFIGURED_WORDPRESS,
                    telemetry=dict(
                        error_message=error_message
                    )
                ))
        return issues

    @classmethod
    def install(cls, abs_wp_path: str):
        """
        Install redis-cache plugin for user.

        :param abs_wp_path: absolute path to wp site
        :raises WposError: when wp-cli reports an error
        """
        res = wordpress(abs_wp_path, "plugin", "install", cls.WP_PLUGIN_NAME)
        if isinstance(res, WordpressError):
            raise WposError(message=res.message, context=res.context)

    @classmethod
    def enable(cls, abs_wp_path: str, *args, **kwargs):
        """
        Enable redis-cache plugin for user.

        Writes the redis config first; if activation or the follow-up
        "redis enable" command fails, the config is cleared again.

        :param abs_wp_path: absolute path to wp site
        :raises WposError: when any wp-cli step reports an error
        """
        create_redis_cache_config(abs_wp_path)

        errors = []
        res = wordpress(abs_wp_path, "plugin", "activate", cls.WP_PLUGIN_NAME)
        if isinstance(res, WordpressError):
            errors.append(res)
        else:
            # the drop-in is enabled only after a successful activation
            res = wordpress(abs_wp_path, "redis", "enable")
            if isinstance(res, WordpressError):
                errors.append(res)

        if errors:
            clear_redis_cache_config(abs_wp_path)
            raise WposError(message='Errors during enabling feature: %(error)s',
                            context=dict(error=str(errors)))

    @classmethod
    def disable(cls, abs_wp_path: str, **kwargs):
        """
        Delete cloudlinux info from wp-config.php,
        deactivate and delete redis-cache plugin for user.

        :param abs_wp_path: absolute path to wp site
        :return: list of errors that occurred during command execution
        """
        errors = []
        if is_plugin_activated(abs_wp_path, cls.WP_PLUGIN_NAME):
            res = wordpress(abs_wp_path, "plugin", "deactivate", cls.WP_PLUGIN_NAME)
            if isinstance(res, WordpressError):
                errors.append(res)

        if not errors and is_plugin_installed(abs_wp_path, cls.WP_PLUGIN_NAME):
            # continue procedure further only if previous step succeeded
            res = wordpress(abs_wp_path, "plugin", "delete", cls.WP_PLUGIN_NAME)
            if isinstance(res, WordpressError):
                errors.append(res)

        if not errors:
            # cleanup constants in the end only if deactivation/deletion succeeded,
            # because it may impact on deactivating/deleting plugin
            try:
                clear_redis_cache_config(abs_wp_path)
            except WposError as err:
                cls._logger.exception(err)
                errors.append(WordpressError(err.message, err.context))
            except Exception as e:
                # best effort: report instead of failing the whole rollback
                cls._logger.exception(e)
                errors.append(
                    WordpressError(
                        message=_(
                            'Unexpected error happened while clearing cache: %(error)s'),
                        context=dict(error=str(e)))
                )

        return errors
class _SiteOptimization(Feature):
    """Implementation for site optimization feature"""
    NAME = 'SITE_OPTIMIZATION'
    WP_PLUGIN_NAME = 'clsop'
    WP_FEATURE_NAME = 'accelerate-wp'

    @classmethod
    def redis_daemon_required(cls):
        return False

    @staticmethod
    def to_interface_name():
        return 'accelerate_wp'

    @classmethod
    def collect_docroot_issues(cls, wpos_user_obj, doc_root_info, visible_features=None):
        """
        Collects incompatibilities related to docroot (non-supported handler, etc)
        for site optimization module.

        Only the PHP version is checked here.

        :return: list of Issue objects
        """
        issues = []
        php_version = PHP(doc_root_info['php_version'].namely())
        if not cls.is_php_supported(php_version):
            supported_php_versions = wpos_user_obj.supported_php_versions[SITE_OPTIMIZATION_FEATURE]
            issues.append(
                CompatibilityIssue(
                    header=_('PHP version is not supported'),
                    fix_tip=_('Please, set or ask your system administrator to set one of the '
                              'supported PHP version: %(compatible_versions)s for the domain.'),
                    description=_('Non supported PHP version %(php_version)s currently is used.'),
                    context=dict(php_version=php_version,
                                 compatible_versions=', '.join(supported_php_versions),
                                 docs_url=CL_DOC_USER_PLUGIN),
                    unique_id=UniqueId.PHP_NOT_SUPPORTED,
                    telemetry=dict(reason='PHP_VERSION_TOO_LOW')
                )
            )
        return issues

    @staticmethod
    def _requirements():
        """
        Load the requirements shipped with the site optimization module.

        The file is re-read on every call. Expected layout:
        {
            "required_php_version": "7.0",
            "required_wp_version": "5.4",
            "incompatible_plugins": {
                "w3-total-cache": "w3-total-cache/w3-total-cache.php",
                "wp-super-cache": "wp-super-cache/wp-cache.php"
            }
        }
        """
        # explicit encoding: do not depend on the process locale
        with open("/opt/cloudlinux-site-optimization-module/requirements.json", "r",
                  encoding="utf-8") as f:
            return json.load(f)

    @classmethod
    def incompatible_plugins(cls):
        """Return the set of plugin names listed as incompatible"""
        return set(cls._requirements()["incompatible_plugins"].keys())

    @classmethod
    def is_php_supported(cls, php_version: PHP):
        """
        Check if passed php version >= minimum PHP version
        supported by site optimization feature.
        """
        # "7.0" -> 70, compared against php_version.digits
        required_digits = int(cls._requirements()["required_php_version"].replace(".", ""))
        return php_version.digits >= required_digits

    @classmethod
    def minimum_supported_wp_version(cls):
        return cls._requirements()["required_wp_version"]

    @classmethod
    def collect_wordpress_issues(cls, self, wordpress_info: Dict, docroot: str, module_is_enabled: bool):
        """
        Collect issues of one WordPress installation for site optimization.

        :param self: caller object, passed through for interface parity (unused here)
        :param wordpress_info: mapping with at least the 'path' key (relative to docroot)
        :param docroot: document root path
        :param module_is_enabled: whether the feature is enabled for the site
        :return: list of Issue objects
        """
        issues = []
        abs_wp_path = Path(docroot).joinpath(wordpress_info["path"])
        plugin_type = "advanced-cache"
        detected_advanced_cache_plugin = get_wp_cache_plugin(abs_wp_path, plugin_type)

        try:
            plugins_data = list_active_plugins(str(abs_wp_path))
        except WposError as e:
            issues.append(
                MisconfigurationIssue(
                    header=_('Unable to identify module compatibility'),
                    description=_('Malformed output received from the following command: <br> $/opt/clwpos/wp-cli plugin list --status=active --format=json'
                                  '<br><br>The raw command output is:<br> \"%(wp_cli_response)s\"'),
                    fix_tip=_('Please, check the received command output and ensure it returns a valid JSON.'),
                    context=dict(
                        wp_cli_response=str(e)
                    )
                )
            )
            found_plugins = set()
        else:
            found_plugins = {item["name"] for item in plugins_data}

        conflicts = found_plugins & cls.incompatible_plugins()
        if detected_advanced_cache_plugin:
            conflicts.add(detected_advanced_cache_plugin)
        conflicts.discard("AccelerateWP")

        # if our WP Rocket module is enabled it's not conflicting plugin
        if module_is_enabled:
            conflicts.discard("WP Rocket")
            issue = cls._get_wp_plugin_compatibility_issues(docroot, wordpress_info)
            if issue:
                issues.append(issue)

        # for more beautiful output
        if len(conflicts) > 1:
            conflicts.discard("Unknown")

        # sorted: set iteration order is arbitrary, which made the message
        # text and telemetry order differ between runs
        conflicts = sorted(conflicts)
        if conflicts == ['Unknown']:
            drop_in_file = abs_wp_path.joinpath("wp-content", f'{plugin_type}.php')
            issues.append(
                CompatibilityIssue(
                    header=_("Conflicting advanced cache plugin enabled"),
                    description=_("Unknown advanced cache plugin is already enabled."),
                    # The path goes through context: a msgid built with an
                    # f-string differs per site and can never be translated.
                    fix_tip=_('Remove the drop-in (%(drop_in_file)s) file from the WordPress '
                              'instance because it conflicts with AccelerateWP.'),
                    context=dict(plugins=", ".join(conflicts),
                                 drop_in_file=str(drop_in_file)),
                    unique_id=UniqueId.PLUGIN_CONFLICT,
                    telemetry=dict(
                        reason='SOM_ALREADY_ENABLED',
                        plugin=list(conflicts)
                    )
                )
            )
        elif conflicts:
            issues.append(
                CompatibilityIssue(
                    header=_("Conflicting plugins are enabled"),
                    description=_("Found conflicting plugins: %(plugins)s."),
                    fix_tip=_("Deactivate and uninstall the conflicting plugin "
                              "using the WordPress administration interface."),
                    context=dict(plugins=", ".join(conflicts)),
                    unique_id=UniqueId.PLUGIN_CONFLICT,
                    telemetry=dict(
                        reason='SOM_ALREADY_ENABLED',
                        plugin=list(conflicts)
                    )
                )
            )

        return issues

    @classmethod
    def _get_issues_from_wp_plugin_status(cls, plugin_status):
        """
        Get issue that relates to currently installed AccelerateWP plugin
        or None if everything is ok (any status other than
        INACTIVE / UNINSTALLED)
        """
        if plugin_status == PluginStatus.INACTIVE:
            return MisconfigurationIssue(
                header=_('"AccelerateWP" plugin is deactivated'),
                description=_('AccelerateWP feature is enabled, but the '
                              '"AccelerateWP" plugin is deactivated in Wordpress admin page. Caching does not work'),
                fix_tip=_(
                    'Activate the "AccelerateWP" plugin in the Wordpress admin page. '
                    'As an alternative, rollback the feature and apply it again.')
            )
        if plugin_status == PluginStatus.UNINSTALLED:
            return MisconfigurationIssue(
                header=_('"AccelerateWP" plugin is not installed'),
                description=_(
                    'The "AccelerateWP" WordPress plugin is not installed. '
                    'Caching does not work'),
                fix_tip=_('Rollback the feature and apply it again. '
                          'Contact your administrator if the issue persists.')
            )
        return None

    @classmethod
    def install(cls, abs_wp_path: str):
        """
        Install cloudlinux-site-optimization plugin for user.
        Does nothing when the plugin is already installed.

        :param abs_wp_path: absolute path to wp site
        :raises WposError: when wp-cli reports an error
        """
        env = obtain_wp_cli_env(abs_wp_path)
        if is_plugin_installed(abs_wp_path, 'clsop', env):
            return
        res = wordpress(abs_wp_path, "plugin", "install", CLSOP_ZIP_PATH, env=env)
        if isinstance(res, WordpressError):
            raise WposError(message=res.message, context=res.context)

    @classmethod
    def enable(cls, abs_wp_path: str, *args, **kwargs):
        """
        Enable cloudlinux-site-optimization plugin for user.

        :param abs_wp_path: absolute path to wp site
        :raises WposError: when wp-cli reports an error
        """
        env = obtain_wp_cli_env(abs_wp_path)
        res = wordpress(abs_wp_path, "plugin", "activate", cls.WP_PLUGIN_NAME, env=env)
        if isinstance(res, WordpressError):
            raise WposError(message=res.message, context=res.context)

    @classmethod
    def disable(cls, abs_wp_path: str, **kwargs):
        """
        Deactivate and delete cloudlinux-site-optimization plugin for user.

        :param abs_wp_path: absolute path to wp site
        :return: list of errors that occurred during command execution
        """
        errors = []
        env = obtain_wp_cli_env(abs_wp_path)
        if is_plugin_activated(abs_wp_path, cls.WP_PLUGIN_NAME, env):
            res = wordpress(abs_wp_path, "plugin", "deactivate", cls.WP_PLUGIN_NAME, env=env)
            if isinstance(res, WordpressError):
                errors.append(res)

        if not errors and is_plugin_installed(abs_wp_path, cls.WP_PLUGIN_NAME, env):
            # continue procedure further only if previous step succeeded
            res = wordpress(abs_wp_path, "plugin", "uninstall", cls.WP_PLUGIN_NAME, env=env)
            if isinstance(res, WordpressError):
                errors.append(res)
        return errors
class AWPDependentFeature(_SiteOptimization):
    """
    Introduces basic class for features, those strictly depend
    on AccelerateWP plugin -- e.g. are its sub-features
    """
    # lowest AccelerateWP plugin version that carries the sub-feature
    MINIMUM_AWP_PLUGIN_VERSION: str
    # UniqueId constant reported when the installed plugin is too old
    NOT_SUPPORTED_ID: str

    @classmethod
    def included_optimization_features(cls):
        """
        Dependent feature needs AccelerateWP plugin
        """
        return [_SiteOptimization.optimization_feature(), cls.optimization_feature()]

    @classmethod
    def is_plugin_version_supported(cls, abs_path):
        """
        Dependent feature carries its own MINIMUM_AWP_PLUGIN_VERSION supported.

        :return: True when the installed AccelerateWP plugin version is
                 greater than or equal to MINIMUM_AWP_PLUGIN_VERSION
        """
        actual_version = cls.get_plugin_version(wordpress_abs_path=abs_path,
                                                plugin_name=_SiteOptimization.WP_PLUGIN_NAME)
        return LooseVersion(actual_version) >= LooseVersion(cls.MINIMUM_AWP_PLUGIN_VERSION)

    @classmethod
    def collect_wordpress_issues(cls, self, wordpress_info: Dict, docroot: str, module_is_enabled: bool):
        """
        Collects all incompatibilities of AccelerateWP (Site Optimization plugin) + dependent plugin specific

        :return: list of Issue objects
        """
        abs_wp_path = os.path.join(docroot, wordpress_info['path'])
        is_accelerate_wp_plugin_activated = is_plugin_activated(abs_wp_path,
                                                                _SiteOptimization.WP_PLUGIN_NAME)

        # the parent check is driven by the AccelerateWP plugin state,
        # not by the state of the dependent feature
        issues = super().collect_wordpress_issues(self, wordpress_info, docroot,
                                                  module_is_enabled=is_accelerate_wp_plugin_activated)

        if is_accelerate_wp_plugin_activated and not cls.is_plugin_version_supported(abs_wp_path):
            issues.append(
                CompatibilityIssue(
                    header=_("Incompatible version of AccelerateWP plugin"),
                    # The version check accepts the minimum version itself, so
                    # the text says "or higher"; the value goes through context
                    # because an f-string msgid can never be translated.
                    description=_(
                        "Version of AccelerateWP plugin must be %(minimum_version)s or higher"),
                    fix_tip=_('Update AccelerateWP plugin in Wordpress Admin page'),
                    context=dict(minimum_version=cls.MINIMUM_AWP_PLUGIN_VERSION),
                    unique_id=cls.NOT_SUPPORTED_ID,
                    telemetry=dict(
                        reason=cls.NOT_SUPPORTED_ID
                    )
                )
            )

        if module_is_enabled:
            issue = cls._get_wp_plugin_compatibility_issues(docroot, wordpress_info)
            if issue:
                issues.append(issue)
        return issues

    @classmethod
    def install(cls, abs_wp_path: str):
        """
        Install basic AccelerateWP plugin.
        Does nothing when the plugin is already installed.

        :raises WposError: when wp-cli reports an error
        """
        env = obtain_wp_cli_env(abs_wp_path)
        if is_plugin_installed(abs_wp_path, _SiteOptimization.WP_PLUGIN_NAME, env):
            return
        res = wordpress(abs_wp_path, "plugin", "install", CLSOP_ZIP_PATH, env=env)
        if isinstance(res, WordpressError):
            raise WposError(message=res.message, context=res.context)

    @classmethod
    def enable(cls, abs_wp_path: str, *args, **kwargs):
        """
        Activate AccelerateWP and then enable the dependent feature in it.
        Extra positional arguments are appended to the wp-cli command.

        :raises WposError: when wp-cli reports an error
        """
        # enable AccelerateWP itself first
        _SiteOptimization.enable(abs_wp_path)

        res = wordpress(abs_wp_path, _SiteOptimization.WP_FEATURE_NAME, cls.WP_FEATURE_NAME, "enable", *args)
        if isinstance(res, WordpressError):
            raise WposError(message=res.message, context=res.context)

    @classmethod
    def disable(cls, abs_wp_path: str, **kwargs):
        """
        Disables a dependent optimization feature inside accelerate-wp plugin.
        Nothing is done when the AccelerateWP plugin is not activated.

        NOTE(review): unlike _SiteOptimization.disable this raises on failure
        and returns None instead of a list of errors - confirm callers cope.

        :raises WposError: when wp-cli reports an error
        """
        env = obtain_wp_cli_env(abs_wp_path)
        if is_plugin_activated(abs_wp_path, _SiteOptimization.WP_PLUGIN_NAME, env):
            res = wordpress(abs_wp_path, _SiteOptimization.WP_FEATURE_NAME, cls.WP_FEATURE_NAME, "disable")
            if isinstance(res, WordpressError):
                raise WposError(message=res.message, context=res.context)
WordpressError): raise WposError(message=res.message, context=res.context) @classmethod def enable(cls, abs_wp_path: str, *args, **kwargs): # enable AccelerateWP itself first _SiteOptimization.enable(abs_wp_path) res = wordpress(abs_wp_path, _SiteOptimization.WP_FEATURE_NAME, cls.WP_FEATURE_NAME, "enable", *args) if isinstance(res, WordpressError): raise WposError(message=res.message, context=res.context) @classmethod def disable(cls, abs_wp_path: str, **kwargs): """ Disables a dependent optimization feature inside accelerate-wp plugin """ env = obtain_wp_cli_env(abs_wp_path) if is_plugin_activated(abs_wp_path, _SiteOptimization.WP_PLUGIN_NAME, env): res = wordpress(abs_wp_path, _SiteOptimization.WP_FEATURE_NAME, cls.WP_FEATURE_NAME, "disable") if isinstance(res, WordpressError): raise WposError(message=res.message, context=res.context) class _Cdn(AWPDependentFeature, BillableFeatureMixin): """Implementation for CDN feature""" NAME = 'CDN' WP_FEATURE_NAME = 'cdn' # TODO: replace with real MINIMUM_AWP_PLUGIN_VERSION = '3.11.2' NOT_SUPPORTED_ID = UniqueId.AWP_NOT_SUPPORTS_CDN # this feature has specific license terms which user should # apply before he can use the feature HAS_LICENSE_TERMS = True LICENSE_TERMS_PATH = '/opt/clwpos/agreements/cdn' KNOWN_CDN_NS = { 'cloudflare': 'Cloudflare', '.fastly.net.': 'Fastly', '.akam.net': 'Akamai', 'awsdns': 'Amazon CloudFront(AWS)', '.impervadns.net': 'Imperva', 'azure': 'Azure CDN', 'google': 'Google Cloud CDN', 'sucuri': 'Sucuri', 'cdn77': 'CDN77', 'netlify': 'Netlify' } CDN_NS_PATTERN = re.compile(f"{'|'.join(KNOWN_CDN_NS.keys())}") @staticmethod def to_interface_name(): return 'cdn' @classmethod def incompatible_plugins(cls) -> set: """ CDN incompatible plugins. 
- cloudflare - litespeed-cache - speed-booster-pack - w3-total-cache - wp-fastest-cache - wp-super-cache are incompatible with CDN too, but they are already listed as SOM conflicting plugins """ cdn_incompatible_plugins = { 'autoptimize', 'bunnycdn', 'cdn-enabler', 'cloudimage', 'cloudinary-image-management-and-manipulation-in-the-cloud-cdn', 'nazy-load', 'optimole-wp', 'sirv', 'image-cdn', # TODO: AWP-435 'jetpack', 'nitropack', 'shift8-cdn', 'smartvideo', 'wp-cloudflare-page-cache', 'shapepress-dsgvo', 'amazon-s3-and-cloudfront', 'wp-cdn-yes', 'aws-cdn-by-wpadmin' } return cdn_incompatible_plugins.union(super().incompatible_plugins()) @classmethod def dig_ns(cls, domain_name: str) -> str: """Dig domain's NS""" dig_util = '/usr/bin/dig' if not os.path.isfile(dig_util): # dig is not installed, assume no NS detected return str() dig_cmd = [ dig_util, 'ns', domain_name, '+short' ] try: dig_result = subprocess.run(dig_cmd, capture_output=True, text=True) if dig_result.returncode: # dig command failed with returncode, assume no NS detected cls._logger.exception("dig domain failed with exitcode %s: \n" "stdout=%s\n" "stderr=%s", dig_result.returncode, dig_result.stdout, dig_result.stderr) return str() return dig_result.stdout.strip() except (OSError, IOError, ) as e: # subprocess failed to execute command, assume no NS detected cls._logger.exception( "Failed to dig domain, command crushed with: %s", e) return str() @classmethod def collect_docroot_issues(cls, wpos_user_obj, doc_root_info, visible_features=None): """ Collects incompatibilities related to docroot for CDN module: - site optimization inherited - CDN already enabled by NS """ issues = super().collect_docroot_issues(wpos_user_obj, doc_root_info, visible_features) primary_domain = doc_root_info['domains'][0] ns_cdn_detected = cls.CDN_NS_PATTERN.search( cls.dig_ns(primary_domain)) if ns_cdn_detected is not None: issues.append(CompatibilityIssue( header=_("CDN is already enabled"), description=_("Already enabled 
CDN found: %(cdn)s."), fix_tip=_("Deactivate the enabled CDN using " "your service provider instructions."), context=dict(cdn=cls.KNOWN_CDN_NS[ns_cdn_detected.group(0)]), unique_id=UniqueId.NS_CDN_CONFLICT, telemetry=dict( reason='CDN_ALREADY_ENABLED_BY_NS', plugin=cls.KNOWN_CDN_NS[ns_cdn_detected.group(0)] ) )) return issues def enable(self, abs_wp_path: str, *args, **kwargs): domain = f'{PULLZONE_DOMAIN_PROTOCOL}{kwargs.get("domain")}' website = f'/{kwargs.get("website")}' skip_checkers = kwargs.get("skip_checkers", False) get_pullzone_command = [SMART_ADVISE_USER_UTILITY, 'awp-cdn-get-pullzone', '--domain', domain, '--website', website] try: output = run_in_cagefs_if_needed(get_pullzone_command, check=True) except subprocess.CalledProcessError as error: self._logger.exception("Error during obtaining pullzone: \n" "stdout=%s\n" "stderr=%s", error.stdout, error.stderr) raise WposError('Unable to obtain pullzone required for CDN optimization feature') pullzone_data = json.loads(output.stdout)['data'] additional_args = list() if skip_checkers: additional_args.append('--skip-check') super().enable(abs_wp_path, '--account_id=%s' % pullzone_data['account_id'], '--cdn_url=%s' % pullzone_data['cdn_url'], '--api_key=%s' % self._get_or_create_unique_identifier(), *additional_args) @classmethod def disable(cls, abs_wp_path: str, **kwargs): """ Disables cdn feature inside accelerate wp plugin """ domain = f'{PULLZONE_DOMAIN_PROTOCOL}{kwargs.get("domain")}' website = f'/{kwargs.get("website")}' remove_pullzone_command = [SMART_ADVISE_USER_UTILITY, 'awp-cdn-remove-pullzone', '--domain', domain, '--website', website] try: run_in_cagefs_if_needed(remove_pullzone_command, check=True) except subprocess.CalledProcessError as error: cls._logger.exception("Error during removing pullzone: \n" "stdout=%s\n" "stderr=%s", error.stdout, error.stderr) raise WposError('Unable to remove pullzone') super().disable(abs_wp_path) class _ImageOptimization(AWPDependentFeature, 
BillableFeatureMixin): """Implementation for image optimization feature""" NAME = 'IMAGE_OPTIMIZATION' WP_FEATURE_NAME = 'image_optimization' MINIMUM_AWP_PLUGIN_VERSION = '3.12.6.1-1.1-1' NOT_SUPPORTED_ID = UniqueId.AWP_NOT_SUPPORTS_IMAGE_OPTIMIZATION @staticmethod def to_interface_name(): return 'image_optimization' @classmethod def incompatible_plugins(cls) -> set: """ Image Optimization incompatible plugins. """ img_opt_incompatible_plugins = { 'ewww-image-optimizer', 'shortpixel-image-optimiser', 'shortpixel-adaptive-images' 'imagify', 'optimole-wp', 'wp-smushit', 'resmushit-image-optimizer', 'megaoptim-image-optimizer', 'kraken-image-optimizer', 'tiny-compress-images', 'wp-compress-image-optimizer', 'optimus', 'imsanity', # TODO: AWP-435 # jetpack skipped for now, since more smart detection required } return img_opt_incompatible_plugins.union(super().incompatible_plugins()) def enable(self, abs_wp_path: str, *args, **kwargs): super().enable(abs_wp_path, '--unique_id=%s' % self._get_or_create_unique_identifier()) class _CriticalCSS(AWPDependentFeature, BillableFeatureMixin): """Implementation for Critical Path CSS feature""" NAME = 'CPCSS' WP_FEATURE_NAME = 'cpcss' MINIMUM_AWP_PLUGIN_VERSION = '3.12.6.1-1.1-1' NOT_SUPPORTED_ID = UniqueId.AWP_NOT_SUPPORTS_CPCSS @staticmethod def to_interface_name(): return 'critical_css' @classmethod def optimization_feature(cls): return cls(cls.to_interface_name()) def enable(self, abs_wp_path: str, *args, **kwargs): super().enable(abs_wp_path, '--unique_id=%s' % self._get_or_create_unique_identifier()) OBJECT_CACHE_FEATURE = Feature("object_cache") SITE_OPTIMIZATION_FEATURE = Feature("site_optimization") CDN_FEATURE = Feature('cdn') CRITICAL_CSS_FEATURE = Feature("critical_css") IMAGE_OPTIMIZATION_FEATURE = Feature('image_optimization') ALL_OPTIMIZATION_FEATURES = [ OBJECT_CACHE_FEATURE, SITE_OPTIMIZATION_FEATURE, CDN_FEATURE, CRITICAL_CSS_FEATURE, IMAGE_OPTIMIZATION_FEATURE ]