Intrusion Exploit
Server: LiteSpeed
System: Linux cisadane.iixcp.rumahweb.net 5.14.0-427.42.1.el9_4.x86_64 #1 SMP PREEMPT_DYNAMIC Fri Nov 1 14:58:02 EDT 2024 x86_64
User: lenf4658 (1805)
PHP: 8.4.19
Disabled: NONE
Upload Files
File: //proc/self/root/opt/cloudlinux/venv/lib64/python3.11/site-packages/websiteisolation/commands.py
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2026 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT

"""Public command API for lvdctl — per-domain (LVD) resource limit management."""

import logging
import os
import subprocess
import sys

from clcommon.cpapi import userdomains

from .config import (
    DomainEntry, LvdConfig,
    get_username, resolve_docroot,
)
from .exceptions import LvdError

log = logging.getLogger(__name__)

REGISTRY_HELPER = '/usr/share/lve-utils/lvd-registry-helper'
LIMITS_HELPER = '/usr/share/lve-utils/lvd-limits-helper'

_DEBUG = int(os.environ.get('PYLVE_DEBUG', 0))


def _ok(**kwargs):
    return {'result': 'success', **kwargs}


def _user_domains(lve_id):
    """Return set of domain names that belong to the user (via panel API)."""
    username = get_username(lve_id)
    try:
        pairs = userdomains(username) or []
    except Exception as exc:
        raise LvdError(f"failed to query domains for user '{username}': {exc}") from exc
    return {name for name, _docroot in pairs}


def _docroot_for(domain):
    """Resolve domain -> docroot."""
    docroot = resolve_docroot(domain)
    if not docroot:
        raise LvdError(f"cannot resolve document root for domain '{domain}'")
    return docroot


def _helper_env():
    """Build environment for SUID helper subprocesses."""
    env = os.environ.copy()
    if _DEBUG:
        env['LIBLVE_DEBUG_ENABLED'] = '1'
    return env


def _dbg(msg):
    if _DEBUG:
        print(f"DEBUG [lvdctl]: {msg}", file=sys.stderr)


def _get_domain_lve_id(uid, docroot):
    """Call lvd-registry-helper get and return domain_id, or None if not found."""
    argv = [REGISTRY_HELPER, 'get', str(uid), docroot]
    _dbg(f"call {REGISTRY_HELPER} get uid={uid} docroot={docroot}")
    try:
        result = subprocess.run(
            argv, capture_output=True, text=True, check=False,
            env=_helper_env(),
        )
    except OSError as e:
        raise LvdError(f"failed to run {REGISTRY_HELPER}: {e}") from e

    _dbg(f"  rc={result.returncode} stdout={result.stdout.strip()!r}"
         f" stderr={result.stderr.strip()!r}")

    if result.returncode != 0:
        stderr = result.stderr.strip()
        raise LvdError(f"lvd-registry-helper failed: {stderr}")

    out = result.stdout.strip()
    if not out:
        return None
    try:
        return int(out)
    except ValueError as exc:
        raise LvdError(f"lvd-registry-helper returned invalid output: {out!r}") from exc


def _call_limits_helper(uid, domain_id, limits):
    """Call lvd-limits-helper to apply limits to kernel.

    Unit conversions (user-facing → kernel):
      cpu  — centipercent, pass as-is
      pmem — bytes → 4 KB pages
      io   — KB/s, pass as-is
      nproc, iops — pass as-is
    """
    pmem_bytes = limits.get('pmem', 0)
    pmem_pages = pmem_bytes // 4096 if pmem_bytes else 0
    cpu = limits.get('cpu', 0)
    io = limits.get('io', 0)
    nproc = limits.get('nproc', 0)
    iops = limits.get('iops', 0)
    argv = [
        LIMITS_HELPER,
        str(uid), str(domain_id),
        str(cpu), str(pmem_pages), str(io), str(nproc), str(iops),
    ]
    _dbg(f"call {LIMITS_HELPER} uid={uid} domain_id={domain_id}"
         f" cpu={cpu} pmem={pmem_pages}pages({pmem_bytes}bytes)"
         f" io={io} nproc={nproc} iops={iops}")
    try:
        result = subprocess.run(
            argv, capture_output=True, text=True, check=False,
            env=_helper_env(),
        )
    except OSError as e:
        raise LvdError(f"failed to run {LIMITS_HELPER}: {e}") from e

    _dbg(f"  rc={result.returncode} stderr={result.stderr.strip()!r}")
    if result.stdout.strip():
        _dbg(f"  stdout={result.stdout.strip()!r}")

    if result.returncode != 0:
        stderr = result.stderr.strip()
        raise LvdError(f"lvd-limits-helper failed: {stderr}")


def cmd_set(lve_id, domain, limits):
    """Store per-domain limits in config and apply them to kernel."""
    # Verify registration before touching the config: if the domain has no
    # assigned LVE ID the limits helper will fail anyway, and we must not
    # leave a domains.json entry that can never be applied.
    docroot = _docroot_for(domain)
    domain_id = _get_domain_lve_id(lve_id, docroot)
    if domain_id is None:
        raise LvdError(
            f"domain '{domain}' has no registered domain ID; "
            "the server administrator must run "
            f"'lvectl enable-domain-limits {domain}' first"
        )

    config = LvdConfig.load(lve_id)
    entry = config.find_domain(name=domain)

    if entry is None:
        entry = DomainEntry(name=domain)
        config.domains.append(entry)

    entry.limits.update(**limits)
    config.save()

    _call_limits_helper(lve_id, domain_id, entry.limits.to_dict())

    return _ok(domain=domain, limits=entry.limits.to_dict())


def cmd_list(lve_id=None, domain=None):
    """
    List domains and their limits from config.
    Only includes domains that actually belong to the user (via panel API).
    """
    config = LvdConfig.load(lve_id)
    owned = _user_domains(lve_id)

    domains = config.domains
    if domain is not None:
        domains = [d for d in domains if d.name == domain]
    result = []
    for d in domains:
        if d.name not in owned:
            continue
        result.append({
            'name': d.name,
            'lve_id': lve_id,
            'limits': d.limits.to_dict(),
        })
    return _ok(domains=result)


def cmd_apply(lve_id, domain):
    """Push one domain's limits from config to kernel."""
    config = LvdConfig.load(lve_id)
    return _apply_domain(lve_id, domain, config)


# --- Internal helpers ---

def _apply_domain(lve_id, domain, config):
    """
    Push one domain's limits from config to kernel via SUID helpers.

    Looks up the domain ID that was assigned by the admin via
    ``lvectl enable-domain-limits``.  Domain ID assignment is a
    root-only operation; users can only read existing mappings and
    apply limits to them.
    """
    entry = config.find_domain(name=domain)
    if entry is None:
        raise LvdError(f"domain '{domain}' not found in config; use 'set' first")

    docroot = _docroot_for(domain)
    domain_id = _get_domain_lve_id(lve_id, docroot)
    if domain_id is None:
        raise LvdError(
            f"domain '{domain}' has no registered domain ID; "
            "the server administrator must run "
            f"'lvectl enable-domain-limits {domain}' first"
        )

    _call_limits_helper(lve_id, domain_id, entry.limits.to_dict())

    return _ok(domain=domain, limits=entry.limits.to_dict())