File: //proc/self/root/opt/cloudlinux/venv/lib64/python3.11/site-packages/websiteisolation/config.py
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2026 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
"""Per-user domain config persistence (~/.lve/domains.json)."""
import contextlib
import fcntl
import json
import logging
import os
import tempfile
from dataclasses import dataclass, field
from typing import List, Optional
from clcommon.clpwd import ClPwd, drop_privileges
from clcommon.cpapi import docroot as cpapi_docroot
from . import id_registry
from .exceptions import LvdError
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# Directory under the user's home directory that holds the config file.
LVD_CONFIG_DIR = '.lve'
# File name of the per-user domain config inside LVD_CONFIG_DIR.
LVD_CONFIG_FILE = 'domains.json'
# Default value of the 'version' key in the config.
LVD_CONFIG_VERSION = 1
# Limit names recognised by DomainLimits; other keys are ignored there.
LIMIT_FIELDS = ('cpu', 'pmem', 'io', 'nproc', 'iops')
@dataclass
class DomainLimits:
    """Optional resource limits attached to a single domain.

    Every attribute is either ``None`` (limit not set) or an ``int``.
    An instance is truthy when at least one limit is set.
    """

    cpu: Optional[int] = None
    pmem: Optional[int] = None
    io: Optional[int] = None
    nproc: Optional[int] = None
    iops: Optional[int] = None

    def to_dict(self):
        """Return only fields that are set (non-None)."""
        return {k: v for k, v in self.__dict__.items() if v is not None}

    @classmethod
    def from_dict(cls, data):
        """Build limits from a mapping.

        A falsy ``data`` (``None``, ``{}``) yields an instance with no limits
        set. Keys that are not in ``LIMIT_FIELDS`` are ignored. ``None``
        values are treated as "not set" -- the same rule ``update()`` applies
        -- so an explicit ``null`` in the JSON does not reach ``int()``.

        Raises:
            ValueError, TypeError: if a non-None value cannot be converted
                with ``int()``.
        """
        if not data:
            return cls()
        return cls(**{
            k: int(v)
            for k, v in data.items()
            if k in LIMIT_FIELDS and v is not None
        })

    def update(self, **kwargs):
        """Set the given limits in place.

        Unknown names and ``None`` values are skipped; accepted values are
        converted with ``int()``.
        """
        for k, v in kwargs.items():
            if k in LIMIT_FIELDS and v is not None:
                setattr(self, k, int(v))

    def __bool__(self):
        """Return True when at least one limit is set."""
        return any(v is not None for v in self.__dict__.values())
@dataclass
class DomainEntry:
    """A single domain record: the domain name plus its limits."""

    name: str = ''
    limits: DomainLimits = field(default_factory=DomainLimits)

    def to_dict(self):
        """Serialize to a plain dict with 'name' and 'limits' keys."""
        return dict(name=self.name, limits=self.limits.to_dict())

    @classmethod
    def from_dict(cls, data):
        """Build an entry from a mapping.

        A missing 'name' becomes the empty string; 'limits' (possibly
        absent) is handed to ``DomainLimits.from_dict``.
        """
        domain_name = data.get('name', '')
        raw_limits = data.get('limits')
        return cls(name=domain_name, limits=DomainLimits.from_dict(raw_limits))
@dataclass
class LvdConfig:
    """Parsed contents of a user's domains.json, with load/save helpers.

    ``_lve_id`` records which user the config belongs to. It is not part of
    the serialized form and is excluded from ``repr`` and equality.
    """

    version: int = LVD_CONFIG_VERSION
    domains: List[DomainEntry] = field(default_factory=list)
    _lve_id: Optional[int] = field(default=None, repr=False, compare=False)

    def find_domain(self, name):
        """Find domain entry by name. Returns None if not found."""
        for d in self.domains:
            if d.name == name:
                return d
        return None

    def remove_domain(self, name):
        """Remove domain entry by name."""
        self.domains = [d for d in self.domains if d.name != name]

    def to_dict(self):
        """Serialize to a plain dict with 'version' and 'domains' keys."""
        return {
            'version': self.version,
            'domains': [d.to_dict() for d in self.domains],
        }

    @classmethod
    def from_dict(cls, data, lve_id=None):
        """Build a config from parsed JSON data.

        Anything that is not a dict containing a 'domains' key yields an
        empty config bound to ``lve_id``.
        """
        if not isinstance(data, dict) or 'domains' not in data:
            return cls(_lve_id=lve_id)
        return cls(
            version=data.get('version', LVD_CONFIG_VERSION),
            domains=[DomainEntry.from_dict(d) for d in data['domains']],
            _lve_id=lve_id,
        )

    # --- Persistence ---

    @classmethod
    def load(cls, lve_id):
        """Load domains.json for a user (acquires flock for the read)."""
        with cls._flock(lve_id):
            return cls._read(lve_id)

    def save(self):
        """Write domains.json for this user (acquires flock for the write).

        Raises:
            LvdError: if the config has no associated lve_id or the write
                fails.
        """
        if self._lve_id is None:
            raise LvdError("cannot save: config has no associated lve_id")
        with self._flock(self._lve_id):
            self._write()

    # --- Internal ---

    @staticmethod
    @contextlib.contextmanager
    def _flock(lve_id):
        """Exclusive flock on a per-user sidecar file.

        The whole ``with`` body runs inside ``user_context(lve_id)``, so the
        guarded read/write happens in that context as well.
        """
        with user_context(lve_id):
            config_dir = ensure_config_dir(lve_id)
            lock_path = os.path.join(config_dir, '.domains.lock')
            fd = os.open(lock_path, os.O_CREAT | os.O_RDWR, 0o600)
            try:
                fcntl.flock(fd, fcntl.LOCK_EX)
                yield
            finally:
                # Close the descriptor even if the explicit unlock fails, so
                # it can never leak.
                try:
                    fcntl.flock(fd, fcntl.LOCK_UN)
                finally:
                    os.close(fd)

    @classmethod
    def _read(cls, lve_id):
        """Read and parse the user's config file.

        Returns an empty config when the file does not exist. An unreadable,
        undecodable or structurally malformed file is logged as a warning and
        also yields an empty config, so one bad file cannot crash the caller.
        """
        path = config_path(lve_id)
        try:
            with open(path, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except FileNotFoundError:
            return cls(_lve_id=lve_id)
        except (ValueError, OSError) as e:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            log.warning("failed to read config %s: %s", path, e)
            return cls(_lve_id=lve_id)
        try:
            return cls.from_dict(data, lve_id=lve_id)
        except (AttributeError, TypeError, ValueError) as e:
            # Valid JSON with the wrong shape (e.g. non-dict entries or
            # non-numeric limits) is handled like corrupt JSON above.
            log.warning("malformed config %s: %s", path, e)
            return cls(_lve_id=lve_id)

    def _write(self):
        """Serialize this config and write it to the user's config file.

        Raises:
            LvdError: if the file cannot be written.
        """
        config_dir = ensure_config_dir(self._lve_id)
        path = config_path(self._lve_id)
        content = json.dumps(self.to_dict(), indent=2) + '\n'
        try:
            _write_via_tmp(config_dir, path, content)
        except OSError as e:
            raise LvdError(f"failed to write config {path}: {e}") from e
def _clpwd():
    """Create a fresh ClPwd lookup object configured with min_uid=0."""
    passwd_db = ClPwd(min_uid=0)
    return passwd_db
def get_homedir(uid):
    """Get home directory for a uid.

    Returns the ``pw_dir`` of the first passwd entry found for ``uid``.

    Raises:
        LvdError: if no passwd entry can be found for ``uid``.
    """
    try:
        return _clpwd().get_pw_by_uid(uid)[0].pw_dir
    except (ClPwd.NoSuchUserException, IndexError) as exc:
        # An empty lookup result is reported like an unknown user, matching
        # how get_username() handles the same indexing.
        raise LvdError(f"user with uid {uid} not found") from exc
def get_uid_by_username(username):
    """Look up the uid that belongs to ``username``.

    Raises:
        LvdError: if the user is not known.
    """
    try:
        found_uid = _clpwd().get_uid(username)
    except ClPwd.NoSuchUserException as exc:
        raise LvdError(f"user '{username}' not found") from exc
    else:
        return found_uid
def get_username(lve_id):
    """Return the first user name registered for the uid ``lve_id``.

    Raises:
        LvdError: if the uid is unknown or has no names.
    """
    try:
        names = _clpwd().get_names(lve_id)
        first_name = names[0]
    except (ClPwd.NoSuchUserException, IndexError) as exc:
        raise LvdError(f"no user found for uid {lve_id}") from exc
    return first_name
def resolve_lve_id(lve_id=None, username=None):
    """
    Work out which LVE id to operate on.

    An explicit ``lve_id`` wins and is converted with ``int()``; otherwise
    ``username`` is resolved to its uid. With neither given, the effective
    uid of the process is used (end-user mode), which is refused for root.
    """
    if lve_id is None and username is None:
        euid = os.geteuid()
        if euid == 0:
            raise LvdError("root must specify --lve-id or --username")
        return euid
    if lve_id is not None:
        return int(lve_id)
    return get_uid_by_username(username)
def config_path(lve_id):
    """Build the full path of the domains.json file in the user's home."""
    return os.path.join(get_homedir(lve_id), LVD_CONFIG_DIR, LVD_CONFIG_FILE)
def ensure_config_dir(lve_id):
    """Return the user's ~/.lve/ directory, creating it if necessary.

    A missing directory is created with mode 0o700. Caller must ensure
    privileges are already dropped to the target user.
    """
    config_dir = os.path.join(get_homedir(lve_id), LVD_CONFIG_DIR)
    if os.path.isdir(config_dir):
        return config_dir
    os.makedirs(config_dir, mode=0o700, exist_ok=True)
    return config_dir
def _write_via_tmp(directory, filename, content):
"""Write content to a file atomically via a temporary file."""
temp_path = None
try:
with tempfile.NamedTemporaryFile('w', dir=directory, delete=False) as tmp:
temp_path = tmp.name
tmp.write(content)
tmp.flush()
os.fsync(tmp.fileno())
os.replace(temp_path, filename)
finally:
if temp_path and os.path.exists(temp_path):
try:
os.remove(temp_path)
except OSError:
pass
@contextlib.contextmanager
def user_context(lve_id):
    """Run the ``with`` body as the target user when the process is root.

    For a non-root process the body runs unchanged.
    """
    if os.geteuid() == 0:
        scope = drop_privileges(get_username(lve_id))
    else:
        scope = contextlib.nullcontext()
    with scope:
        yield
def find_all_lve_ids_with_config():
    """Return list of LVE IDs (UIDs) that have domain isolation configured.

    The id_registry directory is scanned and every entry whose name parses
    as an integer is reported; other entries are skipped. A missing
    directory gives an empty list.
    Users only appear here after ``lvdctl set`` or ``lvdctl apply``
    has been called at least once for one of their domains.
    """
    registry_dir = id_registry.LVD_IDS_DIR
    lve_ids = []
    if os.path.isdir(registry_dir):
        for entry in os.listdir(registry_dir):
            try:
                parsed = int(entry)
            except ValueError:
                pass
            else:
                lve_ids.append(parsed)
    return lve_ids
def load_config(lve_id):
    """Return the domain isolation config of the given user.

    Convenience wrapper around ``LvdConfig.load``.
    """
    config = LvdConfig.load(lve_id)
    return config
def resolve_docroot(domain_name):
    """Ask the panel API for the document root of ``domain_name``.

    Returns the first element of the panel API result.

    Raises:
        LvdError: if the lookup fails for any reason.
    """
    try:
        lookup = cpapi_docroot(domain_name)
        return lookup[0]
    except Exception as exc:
        raise LvdError(f"failed to resolve docroot for domain '{domain_name}'") from exc