D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
opt
/
alt
/
python37
/
lib
/
python3.7
/
site-packages
/
clwpos
/
Filename :
papi.py
back
Copy
""" This module is the only one that is allowed to be imported from other tools. Please, keep list of "external" methods here, so we can keep them backwards-compatible. The purpose is to reduce number of places in code which randomly are dependencies of other tools (e.g. x-ray). Don't user this module in clwpos code. Maybe one day we will make proper api :) """ import json import logging import pwd import subprocess from typing import List, Optional from typing_extensions import TypedDict from clwpos.billing import get_or_create_unique_identifier from clwpos.cl_wpos_exceptions import WposError from clwpos.constants import SMART_ADVISE_USER_UTILITY from clwpos.daemon import WposDaemon from clwpos.data_collector_utils import php_info as _php_info from clwpos.feature_suites import PremiumSuite, CDNSuitePro from clwpos.feature_suites.configurations import get_visible_modules, get_allowed_modules, get_allowed_features_dict from clwpos.optimization_features import Feature, OBJECT_CACHE_FEATURE, CDN_FEATURE, convert_feature_list_to_interface from clwpos.user.config import LicenseApproveStatus, UserConfig from clwpos.utils import ( is_wpos_supported as _is_wpos_supported, PHP, daemon_communicate, drop_permissions_if_needed, get_subscription_status, run_in_cagefs_if_needed ) class HostInfo(TypedDict): vhost: str account: str version: str handler: str documentroot: str def php_get_vhost_versions(user: str) -> List[HostInfo]: """ Get information about vhosts in the following format: [ { 'vhost': 'username.zone', 'account': 'username', 'version': 'ea-php80', 'handler': 'php-fpm', 'documentroot': '/home/username/public_html' } ] """ return WposDaemon._php_get_vhost_versions(user) class HostInfoExtended(HostInfo): php_binary: str def php_get_vhost_versions_user() -> List[HostInfoExtended]: """ Get information about vhosts in the following format: [ { 'vhost': 'username.zone', 'account': 'username', 'version': 'ea-php80', 'php_binary': '/opt/php/php', 'handler': 'php-fpm', 'documentroot': '/home/username/public_html' } ] Unlike the php_get_vhost_versions, it automatically detects current user and also performs php version change inside the cagefs, providing additional information about php binary. """ result = _php_info() for elem in result: elem['php_binary'] = PHP(elem["version"]).bin() return result def is_wpos_supported() -> bool: """ Determines whether wpos is supported in current environment """ return _is_wpos_supported() def is_feature_visible(feature: str, username: str) -> Optional[bool]: """ Determines whether feature is visible for username. Return False in case if feature is unknown. Return None in case if user is missing. """ try: pw = pwd.getpwnam(username) except KeyError: return None return Feature(feature).NAME in ( item.NAME for item in get_visible_modules(uid=pw.pw_uid) ) def is_feature_allowed(feature: str, username: str) -> Optional[bool]: """ Determines whether feature is allowed for username to be activated. Return False in case if feature is unknown. Return None in case if user is missing. """ try: pw = pwd.getpwnam(username) except KeyError: return None return Feature(feature).NAME in ( item.NAME for item in get_allowed_modules(uid=pw.pw_uid) ) def is_subscription_pending(feature: str, username: str) -> Optional[bool]: """ Determines whether feature is allowed for username to be activated. """ try: pw = pwd.getpwnam(username) except KeyError: return None try: is_pending = daemon_communicate({ "command": WposDaemon.DAEMON_GET_UPGRADE_ATTEMPT_STATUS, "feature": feature, "uid": pw.pw_uid })["pending"] except WposError: return False return is_pending def get_subscription_upgrade_url(feature: str, username: str) -> Optional[str]: """ Determines whether feature is allowed for username to be activated. """ try: pw = pwd.getpwnam(username) except KeyError: return None try: upgrade_url = daemon_communicate({ "command": WposDaemon.DAEMON_GET_UPGRADE_LINK_COMMAND, "uid": pw.pw_uid, "feature": feature })["upgrade_url"] except WposError: return None return upgrade_url def _get_cdn_usage_statistics(username): """Wrapper for easy mocking""" with drop_permissions_if_needed(username): get_usage_command = [SMART_ADVISE_USER_UTILITY, 'get-cdn-usage'] output = run_in_cagefs_if_needed(get_usage_command, check=True).stdout try: return json.loads(output)['data'] except KeyError: raise subprocess.CalledProcessError( returncode=0, cmd=get_usage_command, output=output) def get_subscriptions_info(username): subscriptions = {} try: uid = pwd.getpwnam(username).pw_uid except Exception: logging.error('Cannot get uid for user: %s, subscription info will be empty', username) return subscriptions allowed_features_dict = get_allowed_features_dict(uid) converted_allowed_features = {feature_set: convert_feature_list_to_interface(features) for feature_set, features in allowed_features_dict.items()} for feature, suite_name in ( (OBJECT_CACHE_FEATURE.NAME, PremiumSuite.name), (CDN_FEATURE.NAME, CDNSuitePro.name) ): subscriptions[feature.lower()] = subscription = dict( status=get_subscription_status( converted_allowed_features, suite_name, feature.lower()) ) if feature == CDN_FEATURE.NAME: try: info = _get_cdn_usage_statistics(username) except Exception as e: logging.warning("Error during obtaining usage, error: %s", str(e)) usage = None else: usage = calculate_cdn_usage(info) subscription['usage'] = usage return subscriptions def calculate_cdn_usage(info): warning = f"Content Delivery Network: You have reached your " \ f"{info['limit_bytes'] // 1024 ** 3}GB limit. " \ f"Please upgrade your subscription." \ f"" if info['total_bytes_used'] >= info['limit_bytes'] else None bytes_used = info['total_bytes_used'] bytes_limit = info['limit_bytes'] # no usage ---> no warning if bytes_used == 0 or bytes_limit == 0: warning = None usage = { "bandwidth": { "usage": bytes_used, "limit": bytes_limit }, "warning": warning } return usage def get_user_auth_key(username: str): """ Reads configuration and gets identifier of user that he can use to auth on provisioning server """ return get_or_create_unique_identifier(username) def get_license_approve_status(feature: str, username: str) -> LicenseApproveStatus: """ Provides easy way for 3rd party tools to get information about license approve status """ with drop_permissions_if_needed(username): uc = UserConfig(username) feature = Feature(feature) return uc.get_license_approve_status(feature) def approve_license_agreement(feature: str, username: str): """ Provides easy way for 3rd party tools to approve license """ with drop_permissions_if_needed(username): uc = UserConfig(username) feature = Feature(feature) uc.approve_license_agreement(feature) def get_license_agreement_text(feature: str, username: str): """ Returns text of the license agreement for the given feature or None if feature is not required to approve agreement """ with drop_permissions_if_needed(username): feature = Feature(feature) if not feature.HAS_LICENSE_TERMS: return None return open(feature.LICENSE_TERMS_PATH).read()