Intrusion Exploit
Server: LiteSpeed
System: Linux cisadane.iixcp.rumahweb.net 5.14.0-427.42.1.el9_4.x86_64 #1 SMP PREEMPT_DYNAMIC Fri Nov 1 14:58:02 EDT 2024 x86_64
User: lenf4658 (1805)
PHP: 8.4.19
Disabled: NONE
Upload Files
File: //sbin/lvdctl
#!/opt/cloudlinux/venv/bin/python3 -bb
# coding=utf-8
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2026 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
#
# lvdctl — end-user utility for managing per-domain (LVD) resource limits.
# JSON-only output.

import argparse
import json
import logging
import os
import sys

from clcommon.clpwd import ClPwd

from websiteisolation.commands import cmd_apply, cmd_list, cmd_set
from websiteisolation.config import resolve_lve_id
from websiteisolation.exceptions import LvdError


VERSION = '1.0.0'


def _setup_logging():
    log_dir = os.path.join(ClPwd(min_uid=0).get_pw_by_uid(os.getuid())[0].pw_dir, '.lve')
    log_path = os.path.join(log_dir, 'lvdctl.log')
    logger = logging.getLogger('websiteisolation')
    logger.setLevel(logging.DEBUG)
    logger.propagate = False

    fmt = logging.Formatter('[%(levelname)s | %(asctime)s]: %(message)s')

    os.makedirs(log_dir, mode=0o700, exist_ok=True)
    fh = logging.FileHandler(log_path)
    fh.setLevel(logging.DEBUG)
    fh.setFormatter(fmt)
    logger.addHandler(fh)


def _output(data):
    """Print JSON response to stdout."""
    print(json.dumps(data, indent=2))


def _error(message, exit_code=1):
    """Print JSON error to stdout and exit."""
    _output({'result': 'error', 'message': str(message)})
    sys.exit(exit_code)


def _add_user_args(parser):
    """Add mutually exclusive --lve-id / --username to a subparser."""
    group = parser.add_mutually_exclusive_group()
    group.add_argument('--lve-id', type=int, metavar='ID',
                       help='LVE (user) ID')
    group.add_argument('--username', metavar='NAME',
                       help='username (alternative to --lve-id)')


def _add_domain_arg(parser, required=True):
    parser.add_argument('--domain', required=required, metavar='NAME',
                        help='domain name')


def _add_limit_args(parser):
    parser.add_argument('--cpu', type=int, metavar='VAL',
                        help='CPU limit (hundredths of percent, 2500 = 25%%)')
    parser.add_argument('--pmem', type=int, metavar='VAL',
                        help='physical memory limit (bytes)')
    parser.add_argument('--io', type=int, metavar='VAL',
                        help='I/O limit (KB/s)')
    parser.add_argument('--nproc', type=int, metavar='VAL',
                        help='max processes')
    parser.add_argument('--iops', type=int, metavar='VAL',
                        help='I/O operations per second')


def _collect_limits(args):
    limits = {}
    for field in ('cpu', 'pmem', 'io', 'nproc', 'iops'):
        val = getattr(args, field, None)
        if val is not None:
            limits[field] = val
    return limits


def handle_list(args):
    lve_id = resolve_lve_id(args.lve_id, args.username)
    return cmd_list(lve_id=lve_id, domain=args.domain)


def handle_set(args):
    lve_id = resolve_lve_id(args.lve_id, args.username)
    limits = _collect_limits(args)
    if not limits:
        _error("at least one limit (--cpu, --pmem, --io, --nproc, --iops) required")
    return cmd_set(lve_id, args.domain, limits)


def handle_apply(args):
    lve_id = resolve_lve_id(args.lve_id, args.username)
    return cmd_apply(lve_id, args.domain)


def build_parser():
    parser = argparse.ArgumentParser(
        prog='lvdctl',
        description='Manage per-domain (LVD) resource limits. JSON-only output.')
    parser.add_argument('--version', action='version',
                        version=json.dumps({'version': VERSION}))

    sub = parser.add_subparsers(dest='command', metavar='command')

    # list
    p = sub.add_parser('list', help='show limits for all domains or a specific domain')
    _add_user_args(p)
    _add_domain_arg(p, required=False)
    p.set_defaults(func=handle_list)

    # set
    p = sub.add_parser('set', help='store per-domain limits and apply them to kernel',
                       epilog='Example:\n'
                              '  lvdctl set --domain example.com --cpu 5000 --pmem 268435456 --io 2048 --nproc 30 --iops 500\n'
                              '\n'
                              'This sets: CPU 50%, 256 MB RAM, 2048 KB/s IO, 30 procs, 500 IOPS.\n'
                              'Use PYLVE_DEBUG=1 for verbose helper output.',
                       formatter_class=argparse.RawDescriptionHelpFormatter)
    _add_user_args(p)
    _add_domain_arg(p)
    _add_limit_args(p)
    p.set_defaults(func=handle_set)

    # apply
    p = sub.add_parser('apply', help='push one domain\'s limits from config to kernel')
    _add_user_args(p)
    _add_domain_arg(p)
    p.set_defaults(func=handle_apply)

    return parser


def main():
    if os.getuid() == 0:
        _error("lvdctl must not be run as root; run as a regular user")

    _setup_logging()
    log = logging.getLogger(__name__)
    log.debug('Executing "%s"', ' '.join(sys.argv))

    is_debug = int(os.environ.get('PYLVE_DEBUG', 0))
    if is_debug:
        print(f"DEBUG [lvdctl]: uid={os.getuid()} argv={sys.argv}",
              file=sys.stderr)

    parser = build_parser()
    args = parser.parse_args()

    if not args.command:
        parser.print_help()
        sys.exit(1)

    try:
        result = args.func(args)
        _output(result)
    except LvdError as e:
        log.error('%s', e)
        _error(str(e))
    except Exception as e:
        log.exception('unexpected error')
        _error(f"unexpected error: {e}")


if __name__ == '__main__':
    main()