D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
opt
/
alt
/
python37
/
lib
/
python3.7
/
site-packages
/
lvestats
/
plugins
/
generic
/
Filename :
dbsaver_x60.py
back
Copy
# coding=utf-8
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT

"""Plugin that aggregates stored statistics into one-hour ``history_x60`` rows.

The module can also be run from the command line with the ``init`` and/or
``aggregate`` arguments (optionally ``--verbose``); see :func:`main`.
"""

from __future__ import print_function
from __future__ import absolute_import
from __future__ import division
from builtins import range
import sys
import logging
import time

from lvestats.core.plugin import LveStatsPlugin, LveStatsPluginTerminated
from sqlalchemy import insert
from lvestats.orm.history import history, history_x60
from sqlalchemy.sql.expression import func
from sqlalchemy.sql import select
from sqlalchemy.exc import OperationalError
from lvestats.lib.commons.func import get_chunks, reboot_lock
from lvestats.lib.lveinfolib import HistoryShow, FIELD_TO_TABLE_COLUMN
from lvestats.lib.commons.progress import ProgressCallback
from lvestats.lib import dbengine

AGGREGATE_PERIOD = 60*60  # one hour

# Fragment of the TypeError message raised by ``value *= 100`` when ``value``
# is None; used to recognise incorrect records left in the database after
# a migration.
_NONE_MULTIPLY_ERROR = "*=: 'NoneType' and 'int'"


class DBSaverX60(LveStatsPlugin):
    """Aggregate statistics by hour and store the result in ``history_x60``.

    For every whole hour between the last aggregated timestamp and the newest
    ``history`` record, the plugin asks ``HistoryShow`` for the columns listed
    in ``FIELDS`` and inserts the returned rows into ``history_x60`` with
    ``created`` set to the end of that hour.
    """

    # Columns requested from HistoryShow; each name is translated to a table
    # column through FIELD_TO_TABLE_COLUMN before inserting.
    FIELDS = [
        'ID',
        'aCPU', 'lCPU', 'CPUf',
        'aEP', 'lEP', 'EPf',
        'aVMem', 'lVMem', 'VMemF',
        'aPMem', 'lPMem', 'PMemF',
        'aNproc', 'lNproc', 'NprocF',
        'aIO', 'lIO', 'IOf',
        'IOPSf', 'lIOPS', 'aIOPS'
    ]

    def __init__(self):
        self.log = logging.getLogger('plugin.DBSaverX60')
        self.now = 0  # This changes in MainLoop
        self.config = None
        self.period = AGGREGATE_PERIOD  # once an hour
        self.server_id = 'localhost'
        self.verbose_ = False
        # Time budget (seconds) for one run; a value <= 0 disables the check.
        self.execute_timeout = 20
        self.progress = ProgressCallback(time_change_step=0.1)

    def set_config(self, _config):
        """Store the plugin configuration mapping."""
        self.config = _config

    def _build_insert_values(self, history_show, rows, hour_to):
        """Convert HistoryShow rows into insert dictionaries for ``history_x60``.

        :param history_show: the HistoryShow instance that produced ``rows``;
            its ``time_count`` attribute is stored in the ``time`` column
        :param rows: iterable of rows indexed in the order of ``FIELDS``
        :param hour_to: timestamp of the end of the aggregated hour
        :return: list of dictionaries ready to be passed to ``insert()``
        :raises TypeError: when a ``cpu``/``cpu_limit`` value is None
        """
        insert_values = []
        for row in rows:
            record = {
                'server_id': self.server_id,
                'time': history_show.time_count,
                'created': hour_to}
            record.update(
                {FIELD_TO_TABLE_COLUMN[field_]: row[indx_]
                 for indx_, field_ in enumerate(self.FIELDS)})
            # correct cpu
            record['cpu'] *= 100
            record['cpu_limit'] *= 100
            insert_values.append(record)
        return insert_values

    def _repair_migrated_records(self, msg, hour_from, hour_to):
        """Log ``msg`` and ask ``fix_db`` to repair records of the given hour."""
        self.log.warning(msg)
        from lvestats.lib.dbengine import fix_db
        fix_db(self.engine, self.config,
               from_timestmp=hour_from, to_timestamp=hour_to, log_=self.log)

    def aggregate_db_data_by_hours(self, utc_from, utc_to):
        """Aggregate data hour by hour for the interval ``utc_from``..``utc_to``.

        ``utc_from`` is rounded up to a multiple of ``AGGREGATE_PERIOD``; only
        complete hours that end at or before ``utc_to`` are processed. An hour
        without any rows is marked with a placeholder record (``id`` 0).

        The method returns early when the remaining part of
        ``execute_timeout`` is estimated to be too small for one more hour, or
        after an attempt to repair incorrect records with ``fix_db``.

        :param utc_from: timestamp to start aggregation from
        :param utc_to: timestamp to aggregate up to
        :raises LveStatsPluginTerminated: when raised while writing data
        """
        if self.verbose_:
            self.progress(0)
        utc_from_ = int(utc_from) // int(AGGREGATE_PERIOD)*int(AGGREGATE_PERIOD)
        if utc_from_ != utc_from:
            utc_from_ += AGGREGATE_PERIOD
        if utc_to - utc_from_ > AGGREGATE_PERIOD:
            self.log.debug("Data aggregation from %s to %s started.", utc_from_, utc_to)
            for aggr_cycle, hour_from in enumerate(range(utc_from_, int(utc_to), AGGREGATE_PERIOD), 1):
                hour_to = hour_from + AGGREGATE_PERIOD
                if hour_to > utc_to:
                    break
                history_show = HistoryShow(
                    dbengine=self.engine,
                    period_from=hour_from + 1,  # "+1" for exclude "hour_from" timestamp
                    period_to=hour_to,
                    show_columns=self.FIELDS,
                    server_id=self.server_id,
                    log=self.log)
                rows = history_show.proceed()
                try:
                    mass_insert_values = self._build_insert_values(history_show, rows, hour_to)
                except TypeError as build_error:
                    # The "*= 100" corrections are the statements that fail on
                    # None values, so the repair has to be triggered from here.
                    msg = str(build_error)
                    if _NONE_MULTIPLY_ERROR not in msg:
                        raise
                    # try fix incorrect database records after migration
                    with reboot_lock():
                        self._repair_migrated_records(msg, hour_from, hour_to)
                    return
                with reboot_lock():
                    repair_msg = None
                    conn_ = self.engine.connect()
                    try:
                        trans = conn_.begin()
                        try:
                            if mass_insert_values:
                                for chunk in get_chunks(mass_insert_values):
                                    conn_.execute(insert(history_x60), chunk)
                            else:
                                # placeholder record for an hour without data
                                conn_.execute(insert(history_x60),
                                              {'server_id': self.server_id, 'created': hour_to, 'id': 0})
                            if self.verbose_:  # show progress
                                progress_ = 100 - (utc_to-hour_from)*100 // (utc_to-utc_from_)
                                self.progress(progress_)
                        except OperationalError as oe:
                            # best effort: report the failure and go on with the next hour
                            trans.rollback()
                            self.log.error('Can not write aggregated data to database: %s', str(oe))
                            if self.verbose_:
                                print(' Warning: {}'.format(str(oe)[:255] + '...'))
                        except LveStatsPluginTerminated:
                            trans.rollback()
                            self.log.debug("Plugin is terminated.")
                            raise LveStatsPluginTerminated()
                        except TypeError as write_error:
                            trans.rollback()
                            repair_msg = str(write_error)
                            if _NONE_MULTIPLY_ERROR not in repair_msg:
                                raise
                        else:
                            trans.commit()
                    finally:
                        # always release the connection, whatever happened above
                        conn_.close()
                    if repair_msg is not None:
                        # try fix incorrect database records after migration;
                        # the connection is already closed at this point
                        self._repair_migrated_records(repair_msg, hour_from, hour_to)
                        return
                work_time = time.time() - self.now  # calculate plugin working time
                # Stop when the remaining time budget is smaller than the
                # average time spent per hour so far (plus one second).
                if (self.execute_timeout > 0 and
                        self.execute_timeout - work_time < work_time/aggr_cycle + 1):
                    self.log.debug('Data was aggregated from %s to %s in %s seconds',
                                   utc_from_, hour_to, work_time)
                    return
            self.log.debug("Data aggregation done.")
        else:
            self.log.debug("Nothing to aggregate.")
        if self.verbose_:
            self.progress(100, force=True)
            self.progress.stop()

    def execute(self, lve_data):
        """Plugin entry point: aggregate new data, then one hour of older data.

        :param lve_data: not used by this plugin
        """
        self.log.debug("Plugin executed")
        self.server_id = self.config.get('server_id', 'localhost')
        dt_x1_max, dt_x1_min, dt_x60_max, dt_x60_min = self.get_max_min_ts()
        self.aggregate_new(dt_x1_max, dt_x1_min, dt_x60_max)
        self.aggregate_migrated(dt_x1_min, dt_x60_min)

    def aggregate_migrated(self, dt_x1_min, dt_x60_min):
        """Aggregate the hour that precedes the oldest ``history_x60`` record.

        Nothing is done unless ``history`` holds data more than one
        ``AGGREGATE_PERIOD`` older than the oldest aggregated record.
        """
        self.log.debug("Aggregating migrated")
        if dt_x1_min is not None and dt_x60_min is not None and dt_x1_min + AGGREGATE_PERIOD < dt_x60_min:
            self.aggregate_db_data_by_hours(dt_x60_min - AGGREGATE_PERIOD, dt_x60_min)

    def aggregate_new(self, dt_x1_max, dt_x1_min, dt_x60_max):
        """Aggregate data newer than the latest ``history_x60`` record.

        When nothing has been aggregated yet, aggregation starts from the
        oldest ``history`` timestamp. Also adjusts ``self.period``.
        """
        if dt_x1_max:
            if not dt_x60_max:
                dt_x60_max = dt_x1_min  # from minimum timestamp
            if self.verbose_:
                print('[lve-stats]: Start aggregating data from "{}" to "{}"; to skip press Ctrl+C ...'.format(
                    dt_x60_max, dt_x1_max))
            # correct execute period
            if dt_x1_max - dt_x60_max > AGGREGATE_PERIOD * 2:
                # run plugin often if need aggregate more than one hour
                self.period = 60  # FIXME: This looks like mistake?
            else:
                self.period = AGGREGATE_PERIOD
            self.aggregate_db_data_by_hours(dt_x60_max, dt_x1_max)

    def _get_created_bounds(self, conn_, table):
        """Return ``(max(created), min(created))`` of ``table`` for this server."""
        query = select(
            [func.max(table.created), func.min(table.created)]).where(
            self.server_id == table.server_id)
        time_start = time.time()
        created_max, created_min = conn_.execute(query).fetchone()
        if self.log.isEnabledFor(logging.DEBUG):
            # compiling the query with literal binds is done only for the log
            self.log.debug(
                '%s; query time: %s',
                str(query.compile(compile_kwargs={"literal_binds": True})).replace('\n', ' '),
                time.time() - time_start)
        return created_max, created_min

    def get_max_min_ts(self):
        """Read the newest and oldest ``created`` values of both tables.

        :return: tuple ``(dt_x1_max, dt_x1_min, dt_x60_max, dt_x60_min)`` where
            ``x1`` refers to ``history`` and ``x60`` to ``history_x60``
        :raises LveStatsPluginTerminated: when raised while querying
        """
        conn_ = self.engine.connect()
        try:
            # get last created
            dt_x1_max, dt_x1_min = self._get_created_bounds(conn_, history)
            # get last created_max
            dt_x60_max, dt_x60_min = self._get_created_bounds(conn_, history_x60)
        except LveStatsPluginTerminated:
            self.log.debug("Plugin is terminated.")
            raise LveStatsPluginTerminated()
        finally:
            # release the connection on success and on any error
            conn_.close()
        return dt_x1_max, dt_x1_min, dt_x60_max, dt_x60_min


def main(argv_, config_, db_engine=None):
    """Run the actions requested on the command line.

    :param argv_: list of arguments; ``init`` creates the ``history_x60``
        table, ``aggregate`` runs the aggregation, ``--verbose`` enables
        console output
    :param config_: plugin configuration passed to the plugin and, when
        ``db_engine`` is not given, to ``dbengine.make_db_engine``
    :param db_engine: optional already created database engine
    """
    lve_data = dict()
    verbose_ = '--verbose' in argv_
    if db_engine is None:
        db_engine = dbengine.make_db_engine(config_, debug=False)
    if 'init' in argv_:
        if verbose_:
            print('[lve-stats]: creating table "{}"...'.format(history_x60.__tablename__))
        with db_engine.begin() as conn_:
            # create the table only if it does not exist yet
            history_x60.__table__.create(bind=conn_, checkfirst=True)

    if 'aggregate' in argv_:
        # configure plugin
        plugin_instance = DBSaverX60()
        plugin_instance.config = config_  # use specific config
        plugin_instance.set_db_engine(db_engine)
        plugin_instance.execute_timeout = 2*60*60  # two hours
        plugin_instance.verbose_ = verbose_
        plugin_instance.now = time.time()
        plugin_instance.execute(lve_data)  # generate aggregate data


# for initial run from command line:
# create the table if it does not exist and generate aggregate data
if __name__ == '__main__' and len(sys.argv) >= 2:
    import lvestats.lib.config as config
    try:
        cnf = config.read_plugin_config(config.read_config(), plugin=DBSaverX60.__name__)
        main(argv_=sys.argv[1:], config_=cnf)
    except KeyboardInterrupt:
        print('Stopped')
    except dbengine.MakeDbException as e:
        print(str(e))
    except config.ConfigError as ce:
        ce.log_and_exit()