Intrusion Exploit
Server: LiteSpeed
System: Linux cisadane.iixcp.rumahweb.net 5.14.0-427.42.1.el9_4.x86_64 #1 SMP PREEMPT_DYNAMIC Fri Nov 1 14:58:02 EDT 2024 x86_64
User: lenf4658 (1805)
PHP: 8.4.19
Disabled: NONE
Upload Files
File: //proc/thread-self/root/opt/cloudlinux/venv/lib64/python3.11/site-packages/ssa/agent.py
# -*- coding: utf-8 -*-

# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT

"""
This module contains classes implementing SSA Agent behaviour
"""
import atexit
import json
import logging
import socket as socket_module
import struct
from concurrent.futures import ThreadPoolExecutor
from threading import current_thread
from .internal.constants import agent_sock
from .internal.exceptions import SSAError
from .internal.utils import create_socket
from .modules.processor import RequestProcessor

# Upper bound on the number of worker threads in the agent's
# ThreadPoolExecutor (passed as ``max_workers``), i.e. the maximum number of
# connections handled concurrently.
# Limits memory usage on high-traffic servers.
MAX_WORKERS = 50


class SimpleAgent:
    """
    SSA Simple Agent class.

    Accepts connections on the agent socket and hands each one to a bounded
    thread pool. A worker reads one JSON payload from its connection, checks
    that the payload has exactly the expected set of keys and forwards it to
    the RequestProcessor.

    NOTE: the constructor calls listen(), which loops forever, so creating
    an instance blocks the calling thread.
    """

    # Fields that the PHP extension sends (see dump.c: ssa_agent_dump)
    _REQUIRED_FIELDS = frozenset({
        'timestamp', 'url', 'duration',
        'hitting_limits', 'throttled_time', 'io_throttled_time', 'wordpress'
    })

    def __init__(self):
        self.logger = logging.getLogger('agent')
        self.request_processor = RequestProcessor()
        self.executor = ThreadPoolExecutor(max_workers=MAX_WORKERS)
        atexit.register(self._shutdown)
        # start serving incoming connections (blocks, see class docstring)
        self.listen()

    def _shutdown(self):
        """Shut down the thread pool executor without waiting for workers."""
        self.executor.shutdown(wait=False)

    def listen(self) -> None:
        """
        Start listening socket and dispatch every accepted connection
        to the thread pool. Runs until accept() or submit() raises.
        """
        listener = create_socket(agent_sock)
        try:
            while True:
                # the peer address is not used
                connection, _ = listener.accept()
                self.executor.submit(self.handle, connection)
                self.logger.debug('[ThreadPool] Submitted task')
        finally:
            # do not leave the listening socket open if the loop dies
            listener.close()

    @staticmethod
    def _get_peer_uid(connection: 'socket object') -> int:
        """
        Get the UID of the peer process using SO_PEERCRED.
        :param connection: socket object
        :return: UID of the connecting process
        :raises OSError: if the socket option cannot be read
        :raises struct.error: if the returned buffer has an unexpected size
        """
        # struct ucred is {pid_t pid; uid_t uid; gid_t gid}: pid is a signed
        # int, while uid and gid are unsigned. Unpacking them as signed
        # would turn UIDs >= 2**31 into negative numbers.
        ucred_format = 'iII'
        cred = connection.getsockopt(
            socket_module.SOL_SOCKET,
            socket_module.SO_PEERCRED,
            struct.calcsize(ucred_format)
        )
        _pid, uid, _gid = struct.unpack(ucred_format, cred)
        return uid

    @classmethod
    def _validate_input(cls, data: dict) -> bool:
        """
        Validate that input data contains exactly the expected metric fields.
        The PHP extension always sends all 7 fields (see dump.c), so we
        require an exact match to reject both unknown and partial payloads.
        :param data: decoded payload; anything that is not a dict is rejected
        :return: True only if the key set equals _REQUIRED_FIELDS
        """
        if not isinstance(data, dict):
            return False
        # an empty dict fails this comparison as well
        return set(data) == cls._REQUIRED_FIELDS

    def handle(self, connection: 'socket object') -> None:
        """
        Handle incoming connection: read one payload, validate it and pass
        it to the request processor. Exceptions are logged here and do not
        propagate; the connection is always closed before returning.
        :param connection: socket object usable to
        send and receive data on the connection
        """
        fileobj = None
        try:
            try:
                peer_uid = self._get_peer_uid(connection)
            except (OSError, struct.error) as e:
                self.logger.error(
                    '[%s] Failed to get peer credentials: %s',
                    current_thread().name, str(e))
                return

            # created inside the try block so that a failure here is logged
            # and the connection is still closed
            fileobj = connection.makefile(errors='ignore')
            input_data = self.read_input(fileobj)
            if not self._validate_input(input_data):
                if isinstance(input_data, dict):
                    rejected = sorted(input_data.keys())
                else:
                    rejected = type(input_data).__name__
                self.logger.warning(
                    '[%s] Rejected invalid payload from UID=%d: keys=%s',
                    current_thread().name, peer_uid, rejected)
                return
            self.request_processor.handle(input_data)
        except (SSAError, json.JSONDecodeError, ValueError) as e:
            self.logger.error('Handled exception in [%s]: %s',
                              current_thread().name, str(e))
        except Exception as e:
            self.logger.exception('Unexpected exception in [%s]: %s',
                                  current_thread().name, str(e))
        finally:
            # close the connection even if closing the file object fails
            try:
                if fileobj is not None:
                    fileobj.close()
            finally:
                connection.close()

    def read_input(self, fileio: 'file object') -> dict:
        """
        Read input data and return decoded json
        :param fileio: a file-like object providing read method
        :return: decoded JSON value, or an empty dict if nothing was read.
        The value is whatever the JSON document contains, so callers must
        not assume it is a dict.
        :raises json.JSONDecodeError: if the data is not valid JSON
        """
        data = fileio.read()
        # skip encoding the whole payload when INFO records are discarded
        if self.logger.isEnabledFor(logging.INFO):
            self.logger.info('[%s] I received %i bytes: %s',
                             current_thread().name, len(data.encode()),
                             data)
        if not data:
            return {}
        # strict=False accepts control characters inside JSON strings
        return json.loads(data.strip(), strict=False)