D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
opt
/
alt
/
python37
/
lib
/
python3.7
/
site-packages
/
lvestats
/
lib
/
info
/
Filename :
cloudlinux_top.py
back
Copy
# coding=utf-8 # # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENSE.TXT from __future__ import absolute_import import json import logging import time from distutils.util import strtobool from clcommon.lib import MySQLGovException, MySQLGovernor, GovernorStatus from clcommon import cpapi, FormattedException from clcommon.cpapi import NotSupported from lvestats.lib.commons.logsetup import setup_logging from typing import Callable # pylint: disable=unused-import import lvestats.lib.commons.decorators from lvestats.lib import dbengine from lvestats.lib.commons import func from lvestats.lib.commons import server_status from lvestats.lib.commons.users_manager import g_usersManager from lvestats.lib.commons.func import ( get_users_for_reseller, skip_user_by_maxuid ) from clcommon.cpapi.pluginlib import getuser from lvestats.lib.parsers.cloudlinux_top_argparse import cloudlinux_top_parser from lvestats.lib.uidconverter import uid_to_username from lvestats.lib.config import HIDE_MAX_UID_LVE_PARAMETER NOT_AVAILABLE = "N/A" class CloudLinuxTop(object): def __init__(self, cnf, engine=None): self._log = setup_logging( config=cnf, caller_name="CloudLinuxTop", file_level=logging.WARNING, console_level=logging.FATAL) self.fname = '/var/lve/cloudlinux_top.json' self.fname_db = '/var/lve/cloudlinux_dbtop.json' self.touch_fname = '/var/lve/governor.ts' self.server_id = cnf.get('server_id', 'localhost') self.hide_maxuid_lve = strtobool(cnf.get(HIDE_MAX_UID_LVE_PARAMETER, 'true')) self.engine_error = None if engine is not None: self.engine = engine else: try: self.engine = dbengine.make_db_engine(cnf) except dbengine.MakeDbException as e: self.engine_error = e self.mysql_governor = MySQLGovernor() self.governor_status, self.governor_error = \ self.mysql_governor.get_governor_status() self.governor_mode = func.get_governor_mode(self.mysql_governor) self._admins = self._get_admins() if getuser() == 'root' else set() @staticmethod def _get_admins(): try: return cpapi.admins() except (NotSupported, AttributeError): return set() def _get_username_by_id(self, uid): return uid_to_username( uid=uid, local_server_id=self.server_id, server_id=self.server_id, db_engine=self.engine ) def _get_order_key(self, order, cl_data): # type: (str, dict) -> Callable[dict] """returns key function for sorted""" def mysql_key(value): def key(k): username = self._get_username_by_id(k["id"]) if username not in cl_data["dbgov_data"]: return 0 else: dbgov_stats = cl_data["dbgov_data"][username] return dbgov_stats[value] return key order_key_map = { "cpu": lambda k: k["usage"]["cpu"]["all"], "io": lambda k: k["usage"]["io"]["all"], "mysql_cpu": mysql_key("cpu"), "mysql_io": mysql_key("io"), "iops": lambda k: k["usage"].get("iops", 0), "ep": lambda k: k["usage"]["ep"], "nproc": lambda k: k["usage"].get("pno", 0), "pmem": lambda k: k["usage"].get("mem", 0), } return order_key_map[order] def _load_from_json_file(self, fname, default=None): """ Try load and parse json data from file; return <default> if loading or parsing wrong :param fname: path to parsed file :param None|dict default: default data if loaded file is wrong :return None| dict: return loaded data """ try: with open(fname) as json_file: cl_data = json.load(json_file) except (IOError, OSError): cl_data = default return cl_data def _touch_dbtop(self): """ Trying touch the file to signal new data """ try: func.touch(self.touch_fname) except (IOError, OSError): self._log.error("An error occurred while touching file %s. " "Database statistics may be not available", self.touch_fname) def _load_data(self): """ Load and join data from separate files :return dict: loaded data """ cl_data = self._load_from_json_file(self.fname, {"users": [], "resellers": []}) cl_data.update(self._load_from_json_file(self.fname_db, {"dbgov_data": {}})) return cl_data def _get_data(self, username_filter=None, domain_filter=None, limit=None, show_mysql=True, order="cpu", reseller=None): """ :type order: str :type show_mysql: bool :type limit: int|None :type domain_filter: str|None :type username_filter: str|None :param None|str reseller: get information about given reseller :rtype: tuple[list, list] """ users_result = [] resellers_result = [] users_list = [] current_user_name = getuser() current_user_is_admin = current_user_name == 'root' if current_user_is_admin or reseller: self._touch_dbtop() # touch file as signal to save dbgov data to file g_usersManager.build_users_cache(reseller) if reseller: users_list = get_users_for_reseller(reseller or getuser()) # show reseller information too else: users_list = [current_user_name] cl_data = self._load_data() cl_users_data_ordered = sorted(cl_data["users"], key=self._get_order_key(order, cl_data), reverse=True) for user in cl_users_data_ordered: # skip users with maxuid lve ids due to some client`s complaints (details: LVES-929) if self.hide_maxuid_lve and skip_user_by_maxuid(user['id']): continue username = self._get_username_by_id(user["id"]) or NOT_AVAILABLE # users must not be skipped if current user is root and when --for-reseller is not passed should_skip_users = not current_user_is_admin or reseller is not None if should_skip_users and username not in users_list or username in self._admins: continue if username == NOT_AVAILABLE: user["domain"] = NOT_AVAILABLE user["reseller"] = NOT_AVAILABLE else: user["reseller"] = g_usersManager.get_reseller(username, raise_exc=False) or NOT_AVAILABLE user["domain"] = g_usersManager.get_domain(username, raise_exc=bool(domain_filter)) or NOT_AVAILABLE if (domain_filter and (user["domain"] == NOT_AVAILABLE or domain_filter not in user["domain"])) or \ (username_filter and (username == NOT_AVAILABLE or username_filter not in username)): continue user["username"] = username if show_mysql and self.governor_status == GovernorStatus.ENABLED: self._add_mysql_data(user, cl_data, username) users_result.append(user) if limit is not None and 0 < limit <= len(users_result): break cl_resellers_data_ordered = \ sorted(cl_data.get("resellers", []), key=self._get_order_key(order, cl_data), reverse=True) if current_user_is_admin: for reseller_info in cl_resellers_data_ordered: username = self._get_username_by_id(reseller_info['id']) # get only reseller that we need (if needed) if reseller and username != (reseller or getuser()): continue if username is None: username = NOT_AVAILABLE reseller_info["name"] = username resellers_result.append(reseller_info) return users_result, resellers_result def _add_mysql_data(self, user, cl_data, username): """ add mysql statistics to the user dict :type cl_data: dict :type user: dict :type username: str """ dbgov_stats = cl_data["dbgov_data"].get(username, {"cpu": 0, "io": 0, "cause_of_restrict": "-", "time_of_restrict": 0}) if self.governor_mode != "all": try: mysql_limits = self.mysql_governor.get_limits_by_user(username) except MySQLGovException as e: self.governor_status = GovernorStatus.ERROR self.governor_error = e else: user["limit"]["cpu"]["mysql"] = mysql_limits[0] user["limit"]["io"]["mysql"] = mysql_limits[1] * 1024 # in bytes user["usage"]["cpu"]["mysql"] = dbgov_stats["cpu"] user["usage"]["io"]["mysql"] = dbgov_stats["io"] else: user["limit"]["cpu"]["mysql"] = "-" user["limit"]["io"]["mysql"] = "-" user["usage"]["cpu"]["mysql"] = min(dbgov_stats["cpu"], user["usage"]["cpu"]["all"]) user["usage"]["io"]["mysql"] = min(dbgov_stats["io"], user["usage"]["io"]["all"]) if dbgov_stats["cause_of_restrict"] != "-": user["restricted"] = True user["restriction"] = { "time": dbgov_stats["time_of_restrict"], "reason": "Violated the {0} limit".format(dbgov_stats["cause_of_restrict"])} else: user["restricted"] = False @staticmethod def to_json(value): """ :type value: dict :rtype: str """ return json.dumps(value, indent=4, sort_keys=True) @lvestats.lib.commons.decorators.no_sigpipe def main(self, *args): """ :rtype: (str, int) """ parser = cloudlinux_top_parser() result = {"timestamp": time.time()} try: namespace = parser.parse_args(args) if not namespace.json: return "Use --json option, other modes currently unsupported", 1 except ValueError as e: result["result"] = e.args[0] result["context"] = {} return self.to_json(result), 0 if getuser() != 'root' and namespace.for_reseller is not None and namespace.for_reseller != getuser(): error_msg = 'Permission denied. Reseller can view ' \ 'information only about himself. ' \ 'Option --for-reseller={} is forbidden.'.format(namespace.for_reseller) result['result'] = error_msg result["context"] = {} return self.to_json(result), 1 lvestats_status = server_status.check_server_status() if lvestats_status != server_status.RUNNING: result["result"] = server_status.status_description[lvestats_status] elif self.engine_error is not None: result["result"] = self.engine_error.message # pylint: disable=exception-message-attribute result["context"] = self.engine_error.context else: try: users, resellers = self._get_data( username_filter=namespace.username, domain_filter=namespace.domain, limit=namespace.max, show_mysql=not namespace.hide_mysql, order=namespace.order_by, reseller=namespace.for_reseller) except IOError as e: result["result"] = str(e) result["context"] = {} except FormattedException as e: result["result"] = e.message result["context"] = e.context else: result.update({ "result": "success", "users": users, "resellers": resellers, "mySqlGov": self.governor_status, "mySqlGovMode": self.governor_mode}) # broken governor is a non-fatal problem # and lve statistics should works even in that case # so let's threat "error" status as "warning" # and show "warning" message in web ui # anyway, at the moment lvemanager ignores "errors" field if self.governor_status == GovernorStatus.ERROR: result["warning"] = self.governor_error.message # pylint: disable=exception-message-attribute try: result["context"] = self.governor_error.context except AttributeError: result["context"] = {} return self.to_json(result), 0