D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
opt
/
alt
/
python37
/
lib
/
python3.7
/
site-packages
/
ssa
/
modules
/
Filename :
processor.py
back
Copy
# -*- coding: utf-8 -*- # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENSE.TXT """ This module contains RequestProcessor class """ import logging import sys import time import traceback from datetime import datetime, timedelta from threading import Thread, RLock, current_thread from typing import Callable, Any from .autotracer import AutoTracer from .common import Common from .decision_maker import DecisionMaker from .stat_sender import StatisticsSender from ..db import session_scope, setup_database, RequestResult, cleanup_old_data from ..internal.exceptions import SSAError from ..internal.utils import ( singleton, url_split, switch_schedstats ) @singleton class RequestProcessor(Common): """ SSA Request processor implementation. Only one instance is allowed to be created """ BUFFER_SIZE = 100 def __init__(self, engine=None): super().__init__() self.logger = logging.getLogger('req_processor') self.logger.info('Processor enabled: %s', __package__) # enable throttling detection kernel mechanism on service start switch_schedstats(enabled=True) self.engine = engine if engine else setup_database() self._lock = RLock() self.decision_maker = DecisionMaker(engine=engine) self.sender = StatisticsSender() self.auto_tracer = AutoTracer(engine=engine) self.start_background_routine() # sqlite is not thread-safe and saving results one-by-one # into database slowes ssa 10x times # so let's make a buffer that will contain some elements # and flush it periodically self._buffer = [] @property def configured_duration(self): """ Return config file value multiplied by 1000000, as we receive duration in microseconds """ return self.requests_duration * 1000000 def send_stats(self, report: dict): """ Call Statistics Sender """ try: self.sender.send(report) except SSAError as e: self.logger.error('StatisticsSender failed: %s', str(e)) def start_background_routine(self) -> None: """ Start dumper|DecisionMaker thread in background """ t = Thread(target=self.background_routine, daemon=True) t.start() self.logger.info('[%s] Routine started', t.name) def background_routine(self) -> None: """ Dumps collected stats to file once an hour. Runs DecisionMaker once a day Cleanup storage after DecisionMaker run """ while True: tick = datetime.now() if tick.minute == 0: if tick.hour == 0: self.logger.info( '[%s] Routine thread launching buffer flushing (%s)', current_thread().name, tick) self._safe_exec(self.flush_with_lock) self.logger.info( '[%s] Routine thread launching AutoTracer (%s)', current_thread().name, tick) self._safe_exec(self.auto_tracer) self.logger.info( '[%s] Routine thread launching DecisionMaker (%s)', current_thread().name, tick) report = self._safe_exec(self.decision_maker) self.logger.info( '[%s] Routine thread launching cleanup (%s)', current_thread().name, tick) cleanup_old_data(self.engine) self._safe_exec(self.send_stats, report) # attempt to enable throttling detection kernel mechanism # in case it was accidentally switched off switch_schedstats(enabled=True) self._simple_sleep(60) else: self._sleep_till_next_hour(tick.minute) def _safe_exec(self, action: Callable, *args) -> Any: """Call requested Callable with given args and capture any exception""" try: return action(*args) except Exception: et, ev, _ = sys.exc_info() self.logger.exception('%s failed with exception %s, %s', str(action), et, ev, extra={'orig_traceback': traceback.format_exc()}) def _simple_sleep(self, to_sleep: int = 15 * 60): """ Log and sleep given number of seconds or 15 minutes by default """ self.logger.info('[%s] Routine thread sleeping for (%s)', current_thread().name, to_sleep) time.sleep(to_sleep) def _sleep_till_next_hour(self, start_minute): """ Sleep the number of minutes remaining till next hour """ sleep_for = (timedelta(hours=1) - timedelta( minutes=start_minute)).total_seconds() self._simple_sleep(int(sleep_for)) @staticmethod def get_interval_for(timestamp: int) -> int: """ Takes an hour of a day, to which the given timestamp belongs """ return datetime.fromtimestamp(timestamp).hour def flush_with_lock(self): with self._lock: objects = self._buffer[:] self._buffer = [] self.flush_buffer(objects) def flush_buffer(self, objects=None): """ Save in-memory buffer into database. """ if objects is None: objects = self._buffer if not objects: return with session_scope(self.engine) as db: db.bulk_save_objects(objects) def handle(self, data: dict) -> None: """ Process given request data """ url = data.get('url') if self.is_ignored(url): self.logger.debug('%s ignored', url) return domain, uri = url_split(url) self.logger.debug('[%s] Acquires lock to handle request counters', current_thread().name) objects_per_thread = [] with self._lock: self._buffer.append( RequestResult( domain=domain, path=url, timestamp=data['timestamp'], duration=data['duration'], is_slow_request=data['duration'] > self.configured_duration, hitting_limits=data['hitting_limits'], throttled_time=data['throttled_time'], io_throttled_time=data['io_throttled_time'], wordpress=data['wordpress'], ) ) # check buffer in same lock to prevent race conditions # between threads modifying buffer if len(self._buffer) >= self.BUFFER_SIZE: objects_per_thread = self._buffer[:] self._buffer = [] self.flush_buffer(objects_per_thread) self.logger.debug('[%s] Released lock to handle request counters', current_thread().name)