D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
opt
/
imunify360
/
venv
/
lib64
/
python3.11
/
site-packages
/
im360
/
subsys
/
Filename :
waf_rules_configurator.py
back
Copy
import json
import logging
import os
from asyncio import CancelledError
from packaging.version import Version
from pathlib import Path

from defence360agent.utils import (
    BACKUP_EXTENSION,
    CheckRunError,
)
from im360.files import MODSEC, Index
from defence360agent.subsys import web_server
from im360.subsys.panels.base import APACHE
from im360.subsys.panels.generic.panel import GenericPanel
from .modsec_app_version_detector import map_components_versions_to_tags
from .panels.hosting_panel import HostingPanel
from defence360agent.subsys.web_server import (
    safe_update_config,
    graceful_restart,
)

logger = logging.getLogger(__name__)

# Database handed to map_components_versions_to_tags() as its first argument.
COMPONENTS_VERSION_DB = (
    "/var/lib/cloudlinux-app-version-detector/components_versions.sqlite3"
)
# Wrapper that the generated rule lines are substituted into.
RULES_CONF_PATTERN = "<IfModule security2_module>\n{}\n</IfModule>"
# File name of the JSON mapping, looked up under Index.files_path(MODSEC).
MAPPING_FILE = "tags_matching.json"


class NotSupportedWebserverError(Exception):
    """Raised by update_waf_rules_config() when the web server check fails."""


async def is_webserver_supported() -> bool:
    """Tell whether the WAF rules configurator can work with the web server.

    Only Apache qualifies (so e.g. litespeed and nginx do not). Outside of
    the Generic panel, the Apache version must additionally be >= 2.4 and
    ``security2_module`` must be present in the reported module list.

    Any error raised while querying Apache (other than task cancellation,
    which is propagated) is logged and results in ``False``.
    """
    panel = HostingPanel()
    webserver = await panel.get_web_server()
    runs_apache = webserver == APACHE

    if isinstance(panel, GenericPanel):
        # On the Generic panel Apache >= 2.4 is simply assumed: neither the
        # version nor the loaded modules are queried explicitly.
        return runs_apache

    if not runs_apache:
        return False

    try:
        version = await web_server.apache_version()
        modules = await web_server.apache_modules()
    except CancelledError:
        # Cancellation must never be mistaken for "unsupported".
        raise
    except Exception as exc:
        logger.error("Error occurs while getting Apache version: %s", exc)
        return False

    new_enough = version >= Version("2.4")
    if not new_enough:
        return False
    return b"security2_module" in modules


async def update_waf_rules_config():
    """Regenerate the app-specific modsec config if its content changed.

    The freshly generated text is compared with what is currently stored
    at the path returned by ``HostingPanel().get_app_specific_waf_config()``.
    When they differ (or the file is missing) the new text is passed to
    ``safe_update_config``.

    Raises:
        NotSupportedWebserverError: if is_webserver_supported() is false.
    """
    supported = await is_webserver_supported()
    if not supported:
        raise NotSupportedWebserverError(
            "WAF rules configurator supports only apache webserver with "
            "version >= 2.4 and ModSecurity 2"
        )

    target = Path(HostingPanel().get_app_specific_waf_config())
    generated = _rules_config()

    # read_text() is only reached when the file exists (short-circuit).
    if target.exists() and target.read_text() == generated:
        logger.info("WAF Rules Set Config is already up to date")
        return

    updated = await safe_update_config(target, generated)
    if updated:
        logger.info("WAF Rules Set Config was successfully updated")


def _rules_config():
    """Build the full text of the rules config.

    Loads the JSON tag mapping, obtains the rule lines from
    ``map_components_versions_to_tags`` and wraps them, sorted and
    newline-joined, into ``RULES_CONF_PATTERN``.
    """
    mapping_file = os.path.join(Index.files_path(MODSEC), MAPPING_FILE)
    with open(mapping_file, encoding="utf-8") as mapping:
        tag_mapping = json.load(mapping)

    rules = map_components_versions_to_tags(COMPONENTS_VERSION_DB, tag_mapping)
    # Sorting makes the output independent of rule order, so the same set
    # of rules always yields the same text.
    ordered_rules = sorted(rules)
    body = "\n".join(ordered_rules)
    return RULES_CONF_PATTERN.format(body)


async def try_restore_config_from_backup():
    """Put a leftover config backup back in place and restart the web server.

    Intended for Agent start-up: if a backup of the config file is found,
    it replaces the current config file and a graceful restart is
    requested. The backed up file is assumed to be correct, so no config
    checks are performed here.

    Nothing is done when the panel does not implement
    ``get_app_specific_waf_config`` or when no backup file exists.
    """
    try:
        config_file = HostingPanel().get_app_specific_waf_config()
    except NotImplementedError:
        return

    backup_file = config_file + BACKUP_EXTENSION
    if not os.path.isfile(backup_file):
        return

    os.rename(backup_file, config_file)
    try:
        await graceful_restart()
    except CheckRunError:
        logger.exception(
            "Web server failed to start with a backed up config"
        )