D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
opt
/
alt
/
python37
/
lib
/
python3.7
/
site-packages
/
lvestats
/
plugins
/
other
/
Filename :
v1_db_migrator.py
back
Copy
# coding=utf-8
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
from __future__ import absolute_import
from builtins import range
import logging
import time
import os
import sqlalchemy
from sqlalchemy import insert, func
from sqlalchemy import Column, Integer, String, Float
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy.exc import SQLAlchemyError, DatabaseError
from datetime import datetime, timedelta

from lvestats.lib.commons.func import get_chunks
from lvestats.core.plugin import LveStatsPlugin, LveStatsPluginTerminated
from lvestats.orm.history_gov import history_gov
from lvestats.orm.history import history, history_x60
from lvestats.lib.commons.dateutil import gm_datetime_to_unixtimestamp
from lvestats.lib import uidconverter
from lvestats.lib.dbengine import fix_lost_keep_alive, validate_database

# File that stores the migration cursor: a timestamp on the first line and,
# optionally, the last converted uid on the second line.
STATE_FILE = '/var/lve/v1_migration_last.ts'

# v1 history columns that are copied into a v2 row without any conversion.
V2_KEYS = ['id', 'mem', 'mem_limit', 'mem_fault', 'memphy', 'lmemphy', 'memphy_fault',
           'mep', 'mep_limit', 'mep_fault', 'nproc', 'lnproc', 'nproc_fault', 'iops', 'liops']

# v1 governor columns copied into a v2 governor row ('username' is later
# replaced by 'uid' in V1DBMigrator.convert_gov_row).
V2_GOV_KEYS = ['username', 'sum_cpu', 'sum_write', 'sum_read', 'limit_cpu_on_period_end',
               'limit_read_on_period_end', 'limit_write_on_period_end', 'cause_of_restrict']

V1Base = declarative_base()


class V1HistoryGov(V1Base):
    """
    Mapping of the v1 governor history table
    """
    __tablename__ = 'history_gov'

    ts = Column('ts', Integer, primary_key=True)
    username = Column('username', String(64), primary_key=True)
    sum_cpu = Column('sum_cpu', Float)
    sum_write = Column('sum_write', Float)
    sum_read = Column('sum_read', Float)
    limit_cpu_on_period_end = Column('limit_cpu_on_period_end', Integer)
    limit_read_on_period_end = Column('limit_read_on_period_end', Integer)
    limit_write_on_period_end = Column('limit_write_on_period_end', Integer)
    cause_of_restrict = Column('cause_of_restrict', Integer)
    server_id = Column('server_id', String(10), primary_key=True)
    weight = Column('weight', Integer)


class V1History(V1Base):
    """
    Mapping of the v1 history table
    """
    __tablename__ = 'history'

    id = Column('id', Integer, primary_key=True)
    cpu = Column('cpu', Integer)
    cpu_limit = Column('cpu_limit', Integer)
    cpu_max = Column('cpu_max', Integer)
    ncpu = Column('ncpu', Integer)
    mep = Column('mep', Integer)
    mep_limit = Column('mep_limit', Integer)
    mep_max = Column('mep_max', Integer)
    io = Column('io', Integer)
    io_max = Column('io_max', Integer)
    io_limit = Column('io_limit', Integer)
    mem = Column('mem', Integer)
    mem_limit = Column('mem_limit', Integer)
    mem_max = Column('mem_max', Integer)
    mem_fault = Column('mem_fault', Integer)
    mep_fault = Column('mep_fault', Integer)
    created = Column('created', sqlalchemy.types.DateTime, primary_key=True)
    weight = Column('weight', Integer)
    server_id = Column('server_id', String(10))
    lmemphy = Column('lmemphy', Integer)
    memphy = Column('memphy', Integer)
    memphy_max = Column('memphy_max', Integer)
    memphy_fault = Column('memphy_fault', Integer)
    lnproc = Column('lnproc', Integer)
    nproc = Column('nproc', Integer)
    nproc_max = Column('nproc_max', Integer)
    nproc_fault = Column('nproc_fault', Integer)
    iops = Column('iops', Integer)
    iops_max = Column('iops_max', Integer)
    liops = Column('liops', Integer)


class V1TimeInterval(object):
    """
    Persistent cursor over the v1 history, walked backwards one hour at a time.

    On the first run the state file (/var/lve/v1_migration_last.ts by default)
    does not exist, so the latest 'created' timestamp found in the v1 history
    table is used as the starting point.

    Every call of get_period() returns the one-hour window that ends at the
    current starting point. is_too_old() reports when that starting point is
    more than 30 days in the past, i.e. the remaining data is too old to be
    worth migrating.

    V1DBMigrator converts the data of a window and then calls
    save_timestamp(from_ts), which becomes the starting point of the next
    window. The value is kept in the last_ts attribute and written to the
    state file, so progress is not lost when the software restarts. While a
    window is being processed, the last converted uid is stored next to the
    timestamp (save_uid) so an interrupted window can be resumed.
    """

    def __init__(self, v1session, ts_file=STATE_FILE, server_id='localhost'):
        """
        :param v1session: SQLAlchemy session bound to the v1 database
        :param ts_file: path of the state file
        :param server_id: server id used to filter v1 rows
        """
        self.ts_file = ts_file
        self.server_id = server_id
        self.last_ts = None
        self.last_uid = -1
        self.v1session = v1session
        self.read_state()

    def save_ts_to_file(self, ts, uid=None):
        """
        Write the cursor to the state file and mirror it in last_ts/last_uid.

        :param ts: datetime to store
        :param uid: last converted uid, or None to store the timestamp only
        :raises IOError: when the state file can not be written
        """
        # The context manager closes the file; no explicit close() is needed.
        with open(self.ts_file, 'w') as f:
            f.write(ts.strftime(self.get_ts_format()))
            self.last_ts = ts
            if uid is not None:
                f.write('\n' + str(uid))
            # a missing uid (or uid 0) resets the in-memory uid cursor to -1
            self.last_uid = uid or -1

    @staticmethod
    def get_ts_format():
        """Return the strftime/strptime format used in the state file."""
        return "%Y-%m-%d %H:%M:%S.%f"

    def save_timestamp(self, ts):
        """Store a new starting point; the uid cursor is reset."""
        self._save_state(ts)

    def save_uid(self, uid=None):
        """Store the last converted uid together with the current timestamp."""
        self._save_state(self.last_ts, uid)

    def _save_state(self, ts, uid=None):
        """Save the cursor; an IOError is logged instead of being raised."""
        try:
            self.save_ts_to_file(ts, uid)
        except IOError as e:
            logging.getLogger('plugin.V1DBMigrator.TimeInterval').error(
                "Unable to save v1 migration TS %s", str(e))

    def _read_state(self):
        """
        Read the cursor from the state file.

        :return: tuple (timestamp or None, uid); uid is -1 when the file is
            missing, unreadable, malformed or holds no uid line
        """
        ts = None
        try:
            with open(self.ts_file, 'r') as f:
                ts = datetime.strptime(f.readline().rstrip(), self.get_ts_format())
                uid = int(f.readline().rstrip() or -1)
                return ts, uid
        except IOError:
            return ts, -1
        except ValueError as e:
            logging.getLogger('plugin.V1DBMigrator.TimeInterval').warning(
                "Unable to read %s (%s)" % (self.ts_file, str(e)))
            return ts, -1

    def read_state(self):
        """
        Refresh last_ts/last_uid from the state file.

        When no timestamp can be read, fall back to the newest 'created'
        value of the v1 history table for this server_id.
        """
        self.last_ts, self.last_uid = self._read_state()
        if self.last_ts is None:
            res = self.v1session.query(func.max(V1History.created)).filter(
                V1History.server_id == self.server_id).first()
            # set very old datetime if no rows in database
            last_ts_from_db = res[0] or datetime(1, 1, 1)
            # _to_ts() subtracts one microsecond, so add it here to keep the
            # newest row inside the first window
            self.last_ts = last_ts_from_db + timedelta(microseconds=1)

    def _to_ts(self):
        """Re-read the state and return the inclusive upper bound of the window."""
        self.read_state()
        return self.last_ts - timedelta(microseconds=1)

    def is_too_old(self):
        """Return True when the current window ends more than 30 days ago."""
        return datetime.now() - timedelta(days=30) > self._to_ts()

    def get_uid(self):
        """Re-read the state and return the last converted uid (-1 if none)."""
        self.read_state()
        return self.last_uid

    def convert_username_to_uid(self, username):
        """
        Placeholder; V1DBMigrator.init_v1_db replaces it on the instance
        with its own convert_username_to_uid.
        """
        pass

    def _get_history_gov_users(self):
        """Return the user names that have v1 governor rows in the current window."""
        from_ts, to_ts = self.get_period()
        from_ts_ = gm_datetime_to_unixtimestamp(from_ts)
        to_ts_ = gm_datetime_to_unixtimestamp(to_ts)
        usernames_ = self.v1session.query(V1HistoryGov).filter(
            V1HistoryGov.ts.between(from_ts_, to_ts_),
            V1HistoryGov.server_id == self.server_id
        ).distinct(V1HistoryGov.username).group_by(V1HistoryGov.username)
        return [item.username for item in usernames_]

    def _get_history_uids(self):
        """
        Return the uids above the uid cursor that have v1 history rows in
        the current window.
        """
        from_ts, to_ts = self.get_period()
        uids_ = self.v1session.query(V1History).filter(
            V1History.created.between(from_ts, to_ts),
            V1History.server_id == self.server_id,
            V1History.id > self.last_uid
        ).distinct(V1History.id).group_by(V1History.id)
        return [item.id for item in uids_]

    def get_uids(self):
        """
        Return the sorted list of uids that still have to be converted in
        the current window (history uids plus resolvable governor users).
        """
        uids_list = self._get_history_uids()
        known_uids = set(uids_list)  # O(1) duplicate check
        for username in self._get_history_gov_users():
            uid = self.convert_username_to_uid(username)
            if uid is not None and uid > self.last_uid and uid not in known_uids:
                known_uids.add(uid)
                uids_list.append(uid)
        return sorted(uids_list)

    def get_period(self):
        """
        We want to go 1 hour at a time, up to 1 month back, starting from now

        :return: tuple (from_ts, to_ts) of datetimes
        """
        # _to_ts() refreshes last_ts, so it has to be evaluated first
        to_ts = self._to_ts()
        from_ts = self.last_ts - timedelta(hours=1)
        return from_ts, to_ts


class Break(Exception):
    """Raised to leave all nested conversion loops at once."""
    pass


class V1DBMigrator(LveStatsPlugin):
    """
    Plugin that copies v1 history and governor history into the v2 tables,
    one hour of data at a time, and removes itself when the work is done.

    NOTE(review): self.engine is not assigned anywhere in this module;
    presumably the plugin framework provides it - confirm.
    """
    PLUGIN_LOCATION = '/usr/share/lve-stats/plugins/v1_db_migrator.py'
    timeout = 18  # change default timeout
    is_done = False
    period = 60  # every minute
    order = 9500  # We pretty much want to be last one standing
    v1_connect_string = None
    V1Session = None  # We will need it to create session on each execution
    timeInterval = None
    debug = True
    skip_on_error = True  # What if we cannot save data for some reason, if True, skip it
    v2_server_id = 'localhost'
    v1_server_id = 'localhost'

    def __init__(self):
        self.log = logging.getLogger('plugin.V1DBMigrator')
        self._username_to_uid_cache = {}
        self._no_such_uid_cache = []
        self._procs = 1
        self.now = 0  # This changes in MainLoop
        self.log.info("V1 Migration Started")
        self._time_commit = self.timeout * 0.5  # time limit for stopping plugin
        self.control_time = True
        self._conn = None
        self._database_does_not_exist = False

    def set_config(self, config):
        """
        Read the plugin settings and open the v1 database.

        :param config: mapping with optional keys 'v1_server_id',
            'server_id', 'v1_connect_string' and 'debug'
        """
        self.v1_server_id = config.get('v1_server_id', 'localhost')
        self.v2_server_id = config.get('server_id', 'localhost')
        self.v1_connect_string = config.get('v1_connect_string')
        self.debug = config.get('debug', 'F').lower() in ('t', 'y', 'true', 'yes', 1)
        self.init_v1_db()

    def init_v1_db(self, ts=STATE_FILE):
        """
        Validate the v1 database and prepare the session factory and the
        time interval cursor.

        If the database is not configured, missing, unreachable or
        malformed, _database_does_not_exist is set and nothing else happens.

        :param ts: path of the state file handed to V1TimeInterval
        """
        if self.v1_connect_string is None:
            self._database_does_not_exist = True
            return

        # check present sqlite database
        sqlite = 'sqlite:///'
        if self.v1_connect_string.startswith(sqlite) \
                and not os.path.exists(self.v1_connect_string[len(sqlite):]):
            self.log.warning('Database "%s" does not exist' % self.v1_connect_string)
            self._database_does_not_exist = True
            return

        # create database engine
        try:
            v1_db_engine = sqlalchemy.engine.create_engine(self.v1_connect_string, echo=self.debug)
        except SQLAlchemyError as e:
            self.log.warning(str(e))
            self._database_does_not_exist = True
            return

        # check present history table
        if not v1_db_engine.dialect.has_table(v1_db_engine, V1History.__tablename__):
            self.log.warning('Table "%s" in database "%s" does not exist'
                             % (V1History.__tablename__, self.v1_connect_string))
            self._database_does_not_exist = True
            return

        result = validate_database(v1_db_engine, hide_logging=True, base=V1Base)
        if result['column_error'] or result['table_error']:
            self.log.warning('V1 database malformed, migration skipped.')
            self._database_does_not_exist = True
            return

        self.V1Session = sessionmaker(bind=v1_db_engine)
        self.timeInterval = V1TimeInterval(self.get_v1_session(), ts, self.v1_server_id)
        # let the interval resolve governor user names through our cache
        self.timeInterval.convert_username_to_uid = self.convert_username_to_uid

    def get_v1_session(self):
        """Return a new session bound to the v1 database."""
        return self.V1Session()

    def execute(self, lve_data):
        """
        Plugin entry point: migrate as much data as fits into the time
        budget, or finish the migration when nothing is left to do.

        :param lve_data: mapping; the optional 'procs' value (default 1) is
            used for the CPU conversion
        """
        self._procs = lve_data.get('procs', 1)
        if self.is_done:  # all data had been migrated
            return
        if self._database_does_not_exist or self.timeInterval.is_too_old():
            self.log.warning("V1 Migration Done")
            self.cleanup()
            self.fix_lost_keep_alive_records()
        else:
            self.convert_all()

    def fix_lost_keep_alive_records(self):
        """Run fix_lost_keep_alive for the v2 server id on a short-lived session."""
        session = sessionmaker(bind=self.engine)()
        try:
            fix_lost_keep_alive(session, server_id=self.v2_server_id, log_=self.log)
        finally:
            # release the connection even when the fix-up fails
            session.close()

    def cleanup(self):
        """
        There is not much to do on clean up. Let's just set the flag
        is_done = True and remove the plugin file (and its compiled
        counterpart), so that on next restart it wouldn't be running any
        more. The history_x60 rows of the v2 server id are deleted as well.
        """
        self.is_done = True
        try:
            os.remove(V1DBMigrator.PLUGIN_LOCATION)
            # remove compiled python code
            os.remove(V1DBMigrator.PLUGIN_LOCATION + 'c')
        except (IOError, OSError) as e:
            self.log.error("Unable to remove %s: %s", V1DBMigrator.PLUGIN_LOCATION, str(e))

        session = sessionmaker(bind=self.engine)()
        try:
            session.query(history_x60).filter(history_x60.server_id == self.v2_server_id).delete()
            session.commit()
        except SQLAlchemyError:
            session.rollback()
        finally:
            session.close()

    def get_v1_gov_data(self, from_ts, to_ts, username):
        """
        Load the v1 governor rows of one user for the given window.

        :param from_ts: window start (datetime)
        :param to_ts: window end (datetime)
        :param username: user name to select
        :return: list of V1HistoryGov rows (detached from their session)
        """
        from_ts_ = gm_datetime_to_unixtimestamp(from_ts)
        to_ts_ = gm_datetime_to_unixtimestamp(to_ts)
        session = self.get_v1_session()
        try:
            return session.query(V1HistoryGov).filter(
                V1HistoryGov.ts.between(from_ts_, to_ts_),
                V1HistoryGov.username == username,
                V1HistoryGov.server_id == self.v1_server_id).all()
        finally:
            # rows are fully loaded by all(); close the per-call session
            session.close()

    def get_v1_data(self, from_ts, to_ts, uid):
        """
        Load the v1 history rows of one uid for the given window.

        :param from_ts: window start (datetime)
        :param to_ts: window end (datetime)
        :param uid: uid to select
        :return: list of V1History rows (detached from their session)
        """
        session = self.get_v1_session()
        try:
            return session.query(V1History).filter(
                V1History.created.between(from_ts, to_ts),
                V1History.server_id == self.v1_server_id,
                V1History.id == uid).order_by(V1History.id).all()
        finally:
            # rows are fully loaded by all(); close the per-call session
            session.close()

    def _convert_data(self, from_ts, to_ts, uid, trans):
        """
        Convert and insert the history and governor rows of one uid.

        :param from_ts: window start (datetime)
        :param to_ts: window end (datetime)
        :param uid: uid to convert
        :param trans: transaction opened on self._conn; rolled back on a
            database error, committed when the plugin is terminated
        :raises Break: when LveStatsPluginTerminated is caught
        :raises SQLAlchemyError: on a database error if skip_on_error is False
        """
        username = self.convert_uid_to_username(uid)
        try:
            v2_rows_insert_list = []
            for row in self.get_v1_data(from_ts, to_ts, uid):
                v2_rows_insert_list.extend(self.convert_row(row, self._procs))
            if v2_rows_insert_list:
                for chunk in get_chunks(v2_rows_insert_list):
                    self._conn.execute(insert(history), chunk)

            v2_gov_rows_insert_list = []
            if username and username != 'root':  # ignore uid 0 (root)
                for row in self.get_v1_gov_data(from_ts, to_ts, username):
                    v2_gov_rows_insert_list.extend(self.convert_gov_row(row))
            if v2_gov_rows_insert_list:
                for chunk in get_chunks(v2_gov_rows_insert_list):
                    self._conn.execute(insert(history_gov), chunk)
        except (SQLAlchemyError, DatabaseError) as e:
            trans.rollback()
            self.log.warning('Can not save data to database: %s', str(e))
            if not self.skip_on_error:
                raise
        except LveStatsPluginTerminated:
            trans.commit()
            self.log.debug("Plugin is terminated.")
            raise Break()

    def _work_time(self):
        """Return the number of seconds elapsed since self.now."""
        return time.time() - self.now  # calculate plugin working time

    def _need_break(self):
        """
        Return True when the time left before the timeout is smaller than
        1.2 times the longest commit time seen so far.
        """
        return self.timeout - self._work_time() < self._time_commit * 1.2

    def convert_data(self, from_ts, to_ts):
        """
        Convert every pending uid of one window inside one transaction.

        :param from_ts: window start (datetime)
        :param to_ts: window end (datetime)
        :raises Break: when the time budget is used up or the plugin is
            terminated
        """
        self.log.debug('Start converting from %s to %s' % (from_ts, to_ts))
        uids = self.timeInterval.get_uids()  # obtain uids need convert
        if not uids:
            return
        trans = self._conn.begin()
        for uid in uids:
            self._convert_data(from_ts, to_ts, uid, trans)
            # remember the progress inside the window
            self.timeInterval.save_uid(uid)
            self.log.debug(
                'Converted from %s to %s uid: %s; plugin work time %s' % (
                    from_ts, to_ts, uid, self._work_time()))
            # control plugin work time
            if self.control_time and self._need_break():
                self.log.debug(
                    'Stop converting; plugin work time %s' % self._work_time())
                raise Break()

        # the transaction is no longer active after a rollback in _convert_data
        if trans.is_active:
            trans.commit()

    def convert_all(self):
        """
        Convert window after window until the time budget is used up or
        the remaining data is too old, then record how long the commit took.
        """
        with self.engine.begin() as self._conn:
            try:
                while not self._need_break() and not self.timeInterval.is_too_old():
                    from_ts, to_ts = self.timeInterval.get_period()
                    self.convert_data(from_ts, to_ts)
                    # save timestamp only if the cycle was not broken
                    self.timeInterval.save_timestamp(from_ts)
            except Break:  # for break all cycles
                pass
            # leaving the 'with' block commits, so start measuring here
            time_start = time.time()
        commit_time = time.time() - time_start
        self._time_commit = max(self._time_commit, commit_time)
        self.log.debug('Commit time %s' % commit_time)

    @staticmethod
    def fault_count(limit, _max):
        """Return 1 when the maximum equals the limit (treated as a fault), else 0."""
        return 1 if limit == _max else 0

    @staticmethod
    def convert_iops_faults(v1_row, v2_row):
        """Fill v2_row['iops_fault'] from the v1 IOPS limit and maximum."""
        # v1 & v2 store IOPS the same way, but faults are not tracked in v1
        v2_row['iops_fault'] = V1DBMigrator.fault_count(v1_row.liops, v1_row.iops_max)

    @staticmethod
    def convert_io(v1_row, v2_row):
        """Fill the 'io', 'io_limit' and 'io_fault' keys of v2_row."""
        # v1 stores IO in KB/s, v2 in B/s
        v2_row['io'] = v1_row.io * 1024
        v2_row['io_limit'] = v1_row.io_limit * 1024
        v2_row['io_fault'] = V1DBMigrator.fault_count(v1_row.io_limit, v1_row.io_max)

    @staticmethod
    def convert_cpu_(procs, cpu, cpu_limit, cpu_max, ncpu):
        """
        v1 holds CPU relative to total cores, where on 4 core system 1 core is 25%
        it also limits by ncpu (whatever is less), so on 4 cores system 2 ncpu and 30% is 30% of all cores
        (as 2ncpu = 50%, and we take smaller), and 2 ncpu and 70% is 50%, as 2ncpu = 50% / we take smaller
        To switch to new limit, we need to take old limit and multiply it by 100
        So 25% on 4 core system in v1 (1 core), is 25 * 4 * 100 = 10,000

        :return: tuple (v2 cpu usage, v2 cpu limit, v2 cpu fault count)
        """
        v2_cpu_limit = min(100 * cpu_limit * procs, ncpu * 100 * 100)
        # no matter what mistake we make, lets not ever set CPU usage > CPU limit
        v2_cpu = min(v2_cpu_limit, cpu * procs * 100)
        # if cpu_limit == cpu_max, lets consider it to be a fault, note we lose precision
        # anyway, so if weight was 60, we will add 60 faults... oh well.
        v2_cpu_faults = V1DBMigrator.fault_count(v2_cpu_limit, 100 * cpu_max * procs)
        return v2_cpu, v2_cpu_limit, v2_cpu_faults

    def convert_cpu(self, row, v2_row, procs):
        """Fill the 'cpu', 'cpu_limit' and 'cpu_fault' keys of v2_row from a v1 row."""
        v2_row['cpu'], v2_row['cpu_limit'], v2_row['cpu_fault'] = self.convert_cpu_(
            procs, row.cpu, row.cpu_limit, row.cpu_max, row.ncpu)

    def convert_username_to_uid(self, username):
        """
        Resolve a user name to a uid; results (including None) are cached.

        :return: uid, or None when the user can not be found
        """
        if username in self._username_to_uid_cache:
            return self._username_to_uid_cache[username]
        uid = uidconverter._username_to_uid_local(username)
        self._username_to_uid_cache[username] = uid
        if uid is None:
            self.log.warning('Can not find uid for user %s' % username)
        return uid

    def convert_uid_to_username(self, uid):
        """
        Resolve a uid to a user name, using the caches where possible.

        :return: user name, or None when the uid can not be resolved
        """
        if uid in self._no_such_uid_cache:
            return
        for username_, uid_ in self._username_to_uid_cache.items():
            if uid == uid_:
                return username_
        username_ = uidconverter._uid_to_username_local(uid)
        if username_ is None:
            self._no_such_uid_cache.append(uid)
            self.log.warning('Can not find user name for uid %s' % uid)
        else:
            self._username_to_uid_cache[username_] = uid
        return username_

    def convert_gov_row(self, row):
        """
        Expand one v1 governor row into row.weight v2 rows, 60 seconds
        apart, going backwards from row.ts.

        Rows are only produced when the user name resolves to a non-zero uid.

        :return: list of dicts ready for insert into history_gov
        """
        to_ts = row.ts
        result = []
        for i in range(0, row.weight):
            v2_gov_row = {'server_id': self.v2_server_id, 'ts': to_ts - 60 * i}
            for key in V2_GOV_KEYS:
                v2_gov_row[key] = getattr(row, key)
            # v2 identifies the user by uid instead of user name
            uid = self.convert_username_to_uid(v2_gov_row.pop('username'))
            if uid:
                v2_gov_row['uid'] = uid
                result.append(v2_gov_row)
        return result

    def convert_row(self, row, procs):
        """
        Expand one v1 history row into row.weight v2 rows, 60 seconds
        apart, going backwards from row.created.

        :param row: V1History row
        :param procs: value passed to the CPU conversion as 'procs'
        :return: list of dicts ready for insert into history
        """
        to_ts = gm_datetime_to_unixtimestamp(row.created)
        result = []
        for i in range(0, row.weight):
            v2_row = {'server_id': self.v2_server_id, 'created': to_ts - 60 * i}
            for key in V2_KEYS:
                v2_row[key] = getattr(row, key)
            self.convert_cpu(row, v2_row, procs)
            self.convert_io(row, v2_row)
            self.convert_iops_faults(row, v2_row)
            result.append(v2_row)
        return result