D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
opt
/
alt
/
python37
/
lib
/
python3.7
/
site-packages
/
ssa
/
autotracing
/
Filename :
config.py
back
Copy
# -*- coding: utf-8 -*-

# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT

"""
This module contains a config parser for cloudlinux-xray-autotracing
"""

import errno
import logging
import os
import pwd
from configparser import ConfigParser
from dataclasses import dataclass
from enum import Enum
from typing import Any, Union, Tuple, Iterator, List, Optional

from clcommon.clpwd import ClPwd
from clcommon.cpapi import cpusers
from clcommon.cpapi.cpapiexceptions import CPAPIException
from clcommon.lib.cledition import (
    CLEditions, CLEditionDetectionError, SupportedEditions
)

from ..internal.constants import flag_file
from ..internal.exceptions import SSAError
from ..internal.utils import (
    umask_0,
    set_privileges,
    is_xray_user_agent_active,
    xray_version,
    is_kernel_version_supported
)

logger = logging.getLogger('autotracing.config')


def is_edition_supported() -> bool:
    """
    Currently Auto tracing feature is not supported on Shared edition.

    :return: False if the edition cannot be detected, is None,
             or equals SupportedEditions.SHARED.value; True otherwise
    """
    # using clcommon.lib.cledition.is_cl_shared_edition(skip_marker_check=True)
    # directly gives false-positive in case of missing JWT token file, e.g.:
    # >>> is_cl_shared_edition()
    # True
    # >>> is_cl_shared_edition(skip_marker_check=True)
    # False
    # >>> CLEditions.get_from_jwt()
    # Traceback (most recent call last):
    #   File "<stdin>", line 1, in <module>
    #   File "/opt/alt/python37/lib/python3.7/site-packages/clcommon/lib/cledition.py", line 69, in get_from_jwt
    #     token = read_jwt(CLN_JWT_TOKEN_PATH)
    #   File "/opt/alt/python37/lib/python3.7/site-packages/clcommon/lib/jwt_token.py", line 16, in read_jwt
    #     with open(jwt_path, mode='rb') as f:
    # FileNotFoundError: [Errno 2] No such file or directory: '/etc/sysconfig/rhn/jwt.token'
    try:
        edition = CLEditions.get_cl_edition(skip_marker_check=True)
        logger.info('Current edition: %s', edition)
    except CLEditionDetectionError as e:
        logger.error('Unable to detect edition: %s', str(e.message))
        return False
    if edition is None:
        # no jwt at all
        return False
    return edition != SupportedEditions.SHARED.value


class Status(Enum):
    """
    Autotracing statuses
    """
    ENABLED = 'enabled'
    DISABLED = 'disabled'


@dataclass
class User:
    """
    User container
    """
    uid: int
    name: str
    home: str


class AutotracingConfig(ConfigParser):
    """
    Autotracing basic config parser.

    Subclasses are expected to set `config_file` before use.
    """
    main_section = 'conf'
    # path of the backing config file, assigned in subclasses' __init__
    config_file: str

    def check_config_dir(self) -> None:
        """
        If subdirectory location for autotracing config file does not exist,
        create it
        """
        subdir_path = os.path.dirname(self.config_file)
        try:
            os.mkdir(subdir_path)
        except FileExistsError:
            # already there (possibly created concurrently): nothing to do
            pass

    def set_config_value(self, key: Any, value: Any) -> None:
        """
        Set given config item 'key' to given value 'value'
        and write the config to `config_file`
        """
        self[self.main_section][key] = value
        self.check_config_dir()
        with open(self.config_file, 'w') as configfile:
            self.write(configfile)

    def get_config_value(self, key: Any) -> Any:
        """
        Re-read `config_file` and get the value of given config item 'key'
        """
        self.read(self.config_file)
        return self[self.main_section][key]

    def set_status(self, value: Any) -> None:
        """
        Set given status
        """
        self.set_config_value('status', value)

    def get_status(self) -> Any:
        """
        Get current status
        """
        return self.get_config_value('status')


class AdminLevelConfig(AutotracingConfig):
    """Admin level autotracing config"""

    def __init__(self):
        defaults = {
            'status': 'disabled'
        }
        self.config_file = '/usr/share/clos_ssa/autotracing'
        super().__init__(defaults, default_section=self.main_section,
                         strict=False)


class UserLevelConfig(AutotracingConfig):
    """User level autotracing config"""

    def __init__(self, configpath: str):
        # the default for a user is the current admin level status
        defaults = {
            'status': AdminLevelConfig().get_status()
        }
        self.config_file = f'{configpath}/.ssa/autotracing'
        super().__init__(defaults, default_section=self.main_section,
                         strict=False)


def who_am_i() -> User:
    """
    Get current user and his details
    """
    pw_entry = pwd.getpwuid(os.getuid())
    return User(pw_entry.pw_uid, pw_entry.pw_name, pw_entry.pw_dir)


def config_instance(
        user_home: Optional[str] = None
) -> Union[AdminLevelConfig, UserLevelConfig]:
    """
    Initialize correct config file instance depending on context

    :param user_home: home directory of a particular user (Admin mode only)
    :raises SSAError: in User mode if is_xray_user_agent_active() is false
    """
    current_user = who_am_i()
    if current_user.uid == 0:
        # in Admin mode: globally or for particular user
        if user_home:
            # for a particular user
            conf_instance = UserLevelConfig(user_home)
        else:
            # globally
            conf_instance = AdminLevelConfig()
    else:
        # in User mode: user's config only
        if is_xray_user_agent_active():
            conf_instance = UserLevelConfig(current_user.home)
        else:
            # if no X-Ray App available, do not allow manipulations
            raise SSAError(
                'Auto tracing management is not available. '
                'Reason: X-Ray End-User plugin is not enabled, '
                'please contact your system administrator for help.')
    return conf_instance


def set_config_value(value: str, user: Optional[str] = None) -> None:
    """
    Set autotracing status to 'value'.

    If 'user' is given, that user's config is modified under
    set_privileges() with the user's uid/gid; otherwise the config chosen by
    config_instance() is modified under umask_0(0o022).

    :raises SSAError: if the user is not found or on PermissionError
    """
    if user:
        # try to modify user's config with dropping privileges
        try:
            pw_data = pwd.getpwnam(user)
        except KeyError as e:
            raise SSAError(f"User '{user}' not found") from e
        try:
            with set_privileges(target_uid=pw_data.pw_uid,
                                target_gid=pw_data.pw_gid):
                config_instance(pw_data.pw_dir).set_status(value)
        except PermissionError as e:
            raise SSAError(e.strerror) from e
    else:
        with umask_0(0o022):
            # remove write for group
            config_instance().set_status(value)


def enable(username: Optional[str] = None,
           mode_all: bool = False) -> Optional[str]:
    """
    Enable autotracing.
    If username is given, the user's config is changed in Admin's mode.
    Perform some misconfiguration checks before enabling; the status is
    set to enabled regardless of their outcome, and the detected issue
    (if any) is returned to the caller.

    :param mode_all: if set and no username is given, custom users
                     configs are removed first
    :return: reason of the failed misconfiguration check or None
    """
    try:
        misconfiguration_checks()
    except SSAError as e:
        issue = e.reason
    else:
        issue = None

    if mode_all and username is None:
        remove_custom_users_configs()
    set_config_value(Status.ENABLED.value, username)
    return issue


def disable(username: Optional[str] = None, mode_all: bool = False) -> None:
    """
    Disable autotracing.
    If username is given, the user's config is changed in Admin's mode

    :param mode_all: if set and no username is given, custom users
                     configs are removed first
    """
    if mode_all and username is None:
        remove_custom_users_configs()
    set_config_value(Status.DISABLED.value, username)


def status(
        username: Optional[str] = None
) -> Optional[Tuple[str, Optional[str]]]:
    """
    Get status of autotracing.
    If username is given, the status for a particular user is returned

    :return: tuple (status, issue); issue is always None for a
             particular user
    :raises SSAError: if the given user does not exist
    """
    try:
        misconfiguration_checks()
    except SSAError as e:
        issue = e.reason
    else:
        issue = None

    if username is not None:
        try:
            return UserLevelConfig(
                ClPwd().get_homedir(username)).get_status(), None
        except ClPwd.NoSuchUserException as e:
            raise SSAError(str(e)) from e
    return AdminLevelConfig().get_status(), issue


def _panel_users() -> Tuple:
    """
    Get panel users via cpapi, ignoring exceptions like NotSupported, etc.
    """
    try:
        return cpusers()
    except CPAPIException:
        return tuple()


def user_configs() -> Iterator[Tuple[str, UserLevelConfig]]:
    """
    Iterator over all users on the server along with their autotracing configs
    """
    for user in _panel_users():
        try:
            _homedir = ClPwd().get_homedir(user)
        except ClPwd.NoSuchUserException:
            continue
        yield user, UserLevelConfig(_homedir)


def disabled_users() -> List[str]:
    """Get list of disabled users"""
    return [username for username, userconf in user_configs() if
            userconf.get_status() == Status.DISABLED.value]


def _remove_dir_if_empty(path: str) -> None:
    """Remove directory 'path', silently keeping it if it is not empty"""
    try:
        os.rmdir(path)
    except OSError as e:
        # POSIX allows either errno for a non-empty directory
        if e.errno not in (errno.ENOTEMPTY, errno.EEXIST):
            raise


def remove_custom_users_configs() -> None:
    """
    Remove custom users configurations

    :raises SSAError: on PermissionError during removal
    """
    for user, user_config in user_configs():
        try:
            pw_data = pwd.getpwnam(user)
        except KeyError:
            # no passwd entry for this user: skip, like user_configs() does
            continue
        config_path = user_config.config_file
        try:
            with set_privileges(target_uid=pw_data.pw_uid,
                                target_gid=pw_data.pw_gid):
                # if config is actually exists
                if os.path.isfile(config_path):
                    os.remove(config_path)
                    # the directory may hold other files, keep it then
                    _remove_dir_if_empty(os.path.dirname(config_path))
        except PermissionError as e:
            raise SSAError(e.strerror) from e


def misconfiguration_checks() -> None:
    """
    Additional checks for known malfunctions

    :raises SSAError: with flag='warning' on the first failed check
    """
    _template = "%(reason)s. You should %(fix)s in order to get Auto Tracing work"

    # check for edition
    if not is_edition_supported():
        raise SSAError(_template % {
            'reason': 'Your current CL edition is unsupported by Auto Tracing feature',
            'fix': 'use either CL Shared Pro or CL Solo edition'},
                       flag='warning')

    # check of IO throttling availability
    if not is_kernel_version_supported():
        raise SSAError(_template % {
            'reason': 'Your kernel does not support throttling detection',
            'fix': 'update the kernel'},
                       flag='warning')

    # check of X-Ray is installed
    if xray_version() is None:
        raise SSAError(_template % {'reason': 'X-Ray is not installed',
                                    'fix': 'install X-Ray'},
                       flag='warning')

    # check of SSA is enabled
    if not os.path.isfile(flag_file):
        raise SSAError(
            _template % {'reason': 'Slow Site Analyzer is disabled',
                         'fix': 'enable it'},
            flag='warning')