Intrusion Exploit
Server: LiteSpeed
System: Linux cisadane.iixcp.rumahweb.net 5.14.0-427.42.1.el9_4.x86_64 #1 SMP PREEMPT_DYNAMIC Fri Nov 1 14:58:02 EDT 2024 x86_64
User: lenf4658 (1805)
PHP: 8.4.19
Disabled: NONE
Upload Files
File: //proc/self/root/opt/cloudlinux/venv/lib64/python3.11/site-packages/websiteisolation/config.py
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2026 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT

"""Per-user domain config persistence (~/.lve/domains.json)."""

import contextlib
import fcntl
import json
import logging
import os
import tempfile
from dataclasses import dataclass, field
from typing import List, Optional

from clcommon.clpwd import ClPwd, drop_privileges
from clcommon.cpapi import docroot as cpapi_docroot

from . import id_registry
from .exceptions import LvdError

# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Directory name joined onto the user's home directory (see config_path()).
LVD_CONFIG_DIR = '.lve'
# File name of the per-user domain config inside LVD_CONFIG_DIR.
LVD_CONFIG_FILE = 'domains.json'
# Default for the "version" key of the config (see LvdConfig).
LVD_CONFIG_VERSION = 1

# Limit names accepted by DomainLimits.from_dict() and DomainLimits.update().
LIMIT_FIELDS = ('cpu', 'pmem', 'io', 'nproc', 'iops')


@dataclass
class DomainLimits:
    """Optional per-domain limits; a field left at ``None`` means "not set".

    The field names correspond to the entries of ``LIMIT_FIELDS``.
    """

    cpu: Optional[int] = None
    pmem: Optional[int] = None
    io: Optional[int] = None
    nproc: Optional[int] = None
    iops: Optional[int] = None

    def to_dict(self):
        """Return only fields that are set (non-None)."""
        return {k: v for k, v in self.__dict__.items() if v is not None}

    @classmethod
    def from_dict(cls, data):
        """Build limits from a mapping such as the ``limits`` object of a domain.

        Keys not listed in ``LIMIT_FIELDS`` are ignored.  Values that are
        ``None`` (e.g. JSON ``null``) are treated as "not set", matching
        ``update()`` and ``to_dict()``, instead of being passed to ``int()``.
        A falsy *data* (``None``, ``{}``) yields an instance with no limits.

        Raises:
            ValueError, TypeError: if a present value cannot be converted
                with ``int()``.
        """
        if not data:
            return cls()
        return cls(**{
            k: int(v)
            for k, v in data.items()
            if k in LIMIT_FIELDS and v is not None
        })

    def update(self, **kwargs):
        """Set the given limits in place.

        Keyword names outside ``LIMIT_FIELDS`` and ``None`` values are
        skipped; every other value is stored as ``int(value)``.
        """
        for k, v in kwargs.items():
            if k in LIMIT_FIELDS and v is not None:
                setattr(self, k, int(v))

    def __bool__(self):
        """Return True when at least one limit is set."""
        return any(v is not None for v in self.__dict__.values())


@dataclass
class DomainEntry:
    """A single domain record: its name and the limits attached to it."""

    name: str = ''
    limits: DomainLimits = field(default_factory=DomainLimits)

    def to_dict(self):
        """Serialize to a plain dict with ``name`` and ``limits`` keys."""
        serialized = {'name': self.name}
        serialized['limits'] = self.limits.to_dict()
        return serialized

    @classmethod
    def from_dict(cls, data):
        """Build an entry from a mapping; a missing ``name`` becomes ``''``."""
        domain_name = data.get('name', '')
        raw_limits = data.get('limits')
        return cls(name=domain_name, limits=DomainLimits.from_dict(raw_limits))


@dataclass
class LvdConfig:
    """In-memory form of a user's ``domains.json`` plus load/save helpers.

    ``_lve_id`` records which user the config belongs to.  It is excluded
    from ``repr`` and comparisons and is not part of ``to_dict()``.
    """

    version: int = LVD_CONFIG_VERSION
    domains: List[DomainEntry] = field(default_factory=list)
    _lve_id: Optional[int] = field(default=None, repr=False, compare=False)

    def find_domain(self, name):
        """Find domain entry by name. Returns None if not found."""
        return next((d for d in self.domains if d.name == name), None)

    def remove_domain(self, name):
        """Remove domain entry by name."""
        self.domains = [d for d in self.domains if d.name != name]

    def to_dict(self):
        """Return the serializable form: ``version`` and the list of domains."""
        return {
            'version': self.version,
            'domains': [d.to_dict() for d in self.domains],
        }

    @classmethod
    def from_dict(cls, data, lve_id=None):
        """Build a config from parsed JSON data.

        Anything that is not a dict containing a ``domains`` key yields an
        empty config.  A missing ``version`` defaults to
        ``LVD_CONFIG_VERSION``.  Malformed domain entries propagate whatever
        exception ``DomainEntry.from_dict`` raises.
        """
        if not isinstance(data, dict) or 'domains' not in data:
            return cls(_lve_id=lve_id)
        return cls(
            version=data.get('version', LVD_CONFIG_VERSION),
            domains=[DomainEntry.from_dict(d) for d in data['domains']],
            _lve_id=lve_id,
        )

    # --- Persistence ---

    @classmethod
    def load(cls, lve_id):
        """Load domains.json for a user (acquires flock for the read)."""
        with cls._flock(lve_id):
            return cls._read(lve_id)

    def save(self):
        """Write domains.json for this user (acquires flock for the write).

        Raises:
            LvdError: if the config has no associated ``lve_id`` or the
                write fails.
        """
        if self._lve_id is None:
            raise LvdError("cannot save: config has no associated lve_id")
        with self._flock(self._lve_id):
            self._write()

    # --- Internal ---

    @staticmethod
    @contextlib.contextmanager
    def _flock(lve_id):
        """Exclusive flock on a per-user sidecar file.

        The whole locked section, including the caller's ``with`` body, runs
        inside ``user_context(lve_id)``.
        """
        with user_context(lve_id):
            config_dir = ensure_config_dir(lve_id)
            lock_path = os.path.join(config_dir, '.domains.lock')
            fd = os.open(lock_path, os.O_CREAT | os.O_RDWR, 0o600)
            try:
                fcntl.flock(fd, fcntl.LOCK_EX)
                yield
            finally:
                # Always close the descriptor, even if the explicit unlock
                # raises, so the fd is never leaked.
                try:
                    fcntl.flock(fd, fcntl.LOCK_UN)
                finally:
                    os.close(fd)

    @classmethod
    def _read(cls, lve_id):
        """Read and parse the config file, falling back to an empty config.

        A missing file yields an empty config silently.  An unreadable file,
        invalid UTF-8, invalid JSON, or JSON whose structure cannot be turned
        into a config is logged as a warning and also yields an empty config,
        so that a broken user-writable file never crashes the caller.
        """
        path = config_path(lve_id)
        if not os.path.exists(path):
            return cls(_lve_id=lve_id)
        try:
            with open(path, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except (ValueError, OSError) as e:
            # ValueError covers both json.JSONDecodeError and the
            # UnicodeDecodeError raised for bytes that are not valid UTF-8.
            log.warning("failed to read config %s: %s", path, e)
            return cls(_lve_id=lve_id)
        try:
            return cls.from_dict(data, lve_id=lve_id)
        except (ValueError, TypeError, AttributeError) as e:
            # Valid JSON with an unexpected shape (e.g. a non-dict domain
            # entry or a non-numeric limit) is handled like broken JSON.
            log.warning("malformed config %s: %s", path, e)
            return cls(_lve_id=lve_id)

    def _write(self):
        """Serialize the config as indented JSON and write it via a temp file.

        Raises:
            LvdError: if writing the file fails with an OS-level error.
        """
        config_dir = ensure_config_dir(self._lve_id)
        path = config_path(self._lve_id)
        content = json.dumps(self.to_dict(), indent=2) + '\n'
        try:
            _write_via_tmp(config_dir, path, content)
        except OSError as e:
            raise LvdError(f"failed to write config {path}: {e}") from e


def _clpwd():
    """Return a fresh ``ClPwd`` helper constructed with ``min_uid=0``."""
    pwd_helper = ClPwd(min_uid=0)
    return pwd_helper


def get_homedir(uid):
    """Return ``pw_dir`` of the first passwd entry found for *uid*.

    Raises:
        LvdError: if the lookup reports that no such user exists.
    """
    try:
        pw_entries = _clpwd().get_pw_by_uid(uid)
        first_entry = pw_entries[0]
        return first_entry.pw_dir
    except ClPwd.NoSuchUserException as err:
        raise LvdError(f"user with uid {uid} not found") from err


def get_uid_by_username(username):
    """Look up the uid that belongs to *username*.

    Raises:
        LvdError: if the lookup reports that no such user exists.
    """
    try:
        uid = _clpwd().get_uid(username)
    except ClPwd.NoSuchUserException as err:
        raise LvdError(f"user '{username}' not found") from err
    return uid


def get_username(lve_id):
    """Return the first user name registered for the uid *lve_id*.

    Raises:
        LvdError: if the user does not exist or the lookup yields no names.
    """
    try:
        names = _clpwd().get_names(lve_id)
        return names[0]
    except (ClPwd.NoSuchUserException, IndexError) as err:
        raise LvdError(f"no user found for uid {lve_id}") from err


def resolve_lve_id(lve_id=None, username=None):
    """
    Work out which LVE id (uid) to operate on.

    An explicit *lve_id* wins and is returned as ``int``; otherwise
    *username* is resolved to its uid.  With neither given, the effective
    uid of the current process is used (end-user mode).

    Raises:
        LvdError: if neither argument is given and the process runs as root.
    """
    if lve_id is None and username is None:
        current_uid = os.geteuid()
        if current_uid != 0:
            return current_uid
        raise LvdError("root must specify --lve-id or --username")
    if lve_id is None:
        return get_uid_by_username(username)
    return int(lve_id)


def config_path(lve_id):
    """Build the full path of domains.json under the user's home directory."""
    return os.path.join(get_homedir(lve_id), LVD_CONFIG_DIR, LVD_CONFIG_FILE)


def ensure_config_dir(lve_id):
    """Return the user's ~/.lve/ directory, creating it (mode 0o700) if absent.

    Caller must ensure privileges are already dropped to the target user.
    """
    config_dir = os.path.join(get_homedir(lve_id), LVD_CONFIG_DIR)
    if os.path.isdir(config_dir):
        return config_dir
    os.makedirs(config_dir, mode=0o700, exist_ok=True)
    return config_dir


def _write_via_tmp(directory, filename, content):
    """Write content to a file atomically via a temporary file."""
    temp_path = None
    try:
        with tempfile.NamedTemporaryFile('w', dir=directory, delete=False) as tmp:
            temp_path = tmp.name
            tmp.write(content)
            tmp.flush()
            os.fsync(tmp.fileno())
        os.replace(temp_path, filename)
    finally:
        if temp_path and os.path.exists(temp_path):
            try:
                os.remove(temp_path)
            except OSError:
                pass


@contextlib.contextmanager
def user_context(lve_id):
    """Run the ``with`` body as the target user when the process is root.

    For a non-root process the body runs unchanged, with no lookup of the
    user name.
    """
    if os.geteuid() != 0:
        yield
        return
    with drop_privileges(get_username(lve_id)):
        yield


def find_all_lve_ids_with_config():
    """Collect the LVE IDs (UIDs) that have domain isolation configured.

    Lists ``id_registry.LVD_IDS_DIR`` and converts each entry name to an
    int; names that are not integers are skipped.  Returns an empty list
    when the directory does not exist.

    Per the original note, users only show up here after ``lvdctl set`` or
    ``lvdctl apply`` has run at least once for one of their domains.
    """
    registry_dir = id_registry.LVD_IDS_DIR
    lve_ids = []
    if os.path.isdir(registry_dir):
        for entry in os.listdir(registry_dir):
            try:
                lve_id = int(entry)
            except ValueError:
                continue
            lve_ids.append(lve_id)
    return lve_ids


def load_config(lve_id):
    """Return the ``LvdConfig`` loaded for the user identified by *lve_id*."""
    config = LvdConfig.load(lve_id)
    return config


def resolve_docroot(domain_name):
    """Return the first element of the panel API's docroot result for a domain.

    Raises:
        LvdError: if the panel API call (or indexing its result) fails for
            any reason.
    """
    try:
        docroot_info = cpapi_docroot(domain_name)
        return docroot_info[0]
    except Exception as err:
        raise LvdError(f"failed to resolve docroot for domain '{domain_name}'") from err