JFIF$        dd7 

Viewing File: /opt/cloudlinux/venv/lib/python3.11/site-packages/xray/user_agent.py

# -*- coding: utf-8 -*-

# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT

"""
This module contains X-Ray User Manager service implementation
"""
import json
import logging
import re
import subprocess
from threading import Thread, current_thread
from typing import Tuple

from xray.console_utils.run_user.runners import get_runner, Runner
from xray.internal.constants import user_agent_sock, user_agent_log
from xray.internal.exceptions import XRayError
from xray import gettext as _
from xray.internal.user_plugin_utils import (
    unpack_request,
    pack_response,
    extract_creds,
    check_for_root,
    error_response
)
from xray.internal.utils import create_socket, read_sys_id, configure_logging
from clcommon.clwpos_lib import get_locale_from_envars

logger = logging.getLogger('user_agent')


def general_exec_error() -> Tuple[bytes, bytes]:
    """
    General format of message in case of errors during manager execution
    """
    _err = _('X-Ray User Plugin failed to execute your request. Please, contact your server administrator')
    return error_response(_err).encode(), b''


def log_truncate(orig_msg: bytes) -> str:
    """
    Cut data field from the original message, because it could be huge
    """
    return re.sub('(?<="data": {).+(?=}, "result")', '...', orig_msg.decode())


def duplicate_warning_cast(orig_msg: bytes) -> bytes:
    """
    Extend warning 'Task is duplicated by URL' for end user
    """

    def gen(m):
        """
        Add more text for duplicate warning, leave others unchanged
        """
        additional = b""". \
In case if you do not see running task for the same URL in your list of tasks below, \
contact your server administrator and ask him to check whether the requested URL \
is in the tasks list of X-Ray Admin Plugin or is scheduled for continuous tracing."""
        warn = m.group(0)
        if warn == b'Task is duplicated by URL':
            return warn + additional
        return warn

    return re.sub(b'(?<="warning": ").+(?="})', gen, orig_msg)


def execute_manager(command: dict, user: str, runner: Runner) -> Tuple[bytes, bytes]:
    """
    Trigger runner.target utility with requested parameters
    """
    if runner.name == 'manager':
        try:
            command['system_id'] = read_sys_id()
        except XRayError:
            return general_exec_error()

    def runner_cast_opt(opt):
        """runner may define a special cast for each option"""
        return runner.option_cast[opt](
            opt) if opt in runner.option_cast else opt

    def hide_true_val(v):
        """Hide True value from cmd representation for bool-flag options"""
        return '' if v is True else f'={v}'

    api_version_key = 'api_version'
    locale_option = 'lang'

    # options that is not needed to be passed to utility
    skip_options = [api_version_key, locale_option]

    api_version = command.get(api_version_key)
    # could be passed from WordPress Plugin
    target_locale = command.get(locale_option, get_locale_from_envars())

    if api_version:
        casted_api_version = runner_cast_opt(api_version_key)
        cmd = [f'/usr/sbin/{runner.target}', f'--{casted_api_version}', api_version, command.pop('command')]
    else:
        cmd = [f'/usr/sbin/{runner.target}', command.pop('command')]

    # bool options are added only if they are True
    cmd.extend([f'--{runner_cast_opt(k)}{hide_true_val(v)}' for k, v in
                command.items() if v and k not in skip_options])

    with_env = {'XRAYEXEC_UID': user, 'LANG': target_locale}
    logger.info('Going to execute: %s, with environment: %s', cmd, str(with_env))
    try:
        p = subprocess.run(cmd, capture_output=True,
                           env=with_env)
    except (OSError, ValueError, subprocess.SubprocessError):
        return general_exec_error()

    byte_out, byte_err = p.stdout.strip(), p.stderr.strip()
    if byte_out:
        logger.info('[%s] Proxied command stdout: %s', current_thread().name,
                    log_truncate(byte_out))
    if byte_err:
        logger.info('[%s] Proxied command stderr: %s', current_thread().name,
                    byte_err.decode())
    return duplicate_warning_cast(byte_out), byte_err


def handle(connection: 'socket object') -> None:
    """
    Handle incoming connection
    :param connection: socket object usable to
    send and receive data on the connection
    """
    root_error = json.dumps({
        'result': _('Commands from root are not accepted')
    }, ensure_ascii=False)
    with connection:
        _pid, _uid, _gid = extract_creds(connection)
        if check_for_root(_uid):
            connection.sendall(pack_response(root_error.encode()))
            return

        data = connection.recv(4096)
        if not data:
            return

        args = unpack_request(data)

        try:
            # retrieve runner (manager or adviser)
            runner = get_runner(args.pop('runner'))
        except XRayError as e:
            connection.sendall(pack_response(str(e).encode()))
            return

        try:
            runner.validator(args)
        except SystemExit as e:
            connection.sendall(pack_response(str(e).encode()))
            return

        _out, _err = execute_manager(args, str(_uid), runner)
        connection.sendall(pack_response(_out or _err))


def run() -> None:
    """
    Run listening service
    """
    configure_logging(user_agent_log)
    with create_socket(user_agent_sock) as s:
        while True:
            conn, _ = s.accept()
            t = Thread(target=handle, args=(conn,))
            t.start()
            logger.info('[%s] Started', t.name)
Back to Directory  nL+D550H?Mx ,D"v]qv;6*Zqn)ZP0!1 A "#a$2Qr D8 a Ri[f\mIykIw0cuFcRı?lO7к_f˓[C$殷WF<_W ԣsKcëIzyQy/_LKℂ;C",pFA:/]=H  ~,ls/9ć:[=/#f;)x{ٛEQ )~ =𘙲r*2~ a _V=' kumFD}KYYC)({ *g&f`툪ry`=^cJ.I](*`wq1dđ#̩͑0;H]u搂@:~וKL Nsh}OIR*8:2 !lDJVo(3=M(zȰ+i*NAr6KnSl)!JJӁ* %݉?|D}d5:eP0R;{$X'xF@.ÊB {,WJuQɲRI;9QE琯62fT.DUJ;*cP A\ILNj!J۱+O\͔]ޒS߼Jȧc%ANolՎprULZԛerE2=XDXgVQeӓk yP7U*omQIs,K`)6\G3t?pgjrmۛجwluGtfh9uyP0D;Uڽ"OXlif$)&|ML0Zrm1[HXPlPR0'G=i2N+0e2]]9VTPO׮7h(F*癈'=QVZDF,d߬~TX G[`le69CR(!S2!P <0x<!1AQ "Raq02Br#SCTb ?Ζ"]mH5WR7k.ۛ!}Q~+yԏz|@T20S~Kek *zFf^2X*(@8r?CIuI|֓>^ExLgNUY+{.RѪ τV׸YTD I62'8Y27'\TP.6d&˦@Vqi|8-OΕ]ʔ U=TL8=;6c| !qfF3aů&~$l}'NWUs$Uk^SV:U# 6w++s&r+nڐ{@29 gL u"TÙM=6(^"7r}=6YݾlCuhquympǦ GjhsǜNlɻ}o7#S6aw4!OSrD57%|?x>L |/nD6?/8w#[)L7+6〼T ATg!%5MmZ/c-{1_Je"|^$'O&ޱմTrb$w)R$& N1EtdU3Uȉ1pM"N*(DNyd96.(jQ)X 5cQɎMyW?Q*!R>6=7)Xj5`J]e8%t!+'!1Q5 !1 AQaqё#2"0BRb?Gt^## .llQT $v,,m㵜5ubV =sY+@d{N! dnO<.-B;_wJt6;QJd.Qc%p{ 1,sNDdFHI0ГoXшe黅XۢF:)[FGXƹ/w_cMeD,ʡcc.WDtA$j@:) -# u c1<@ۗ9F)KJ-hpP]_x[qBlbpʖw q"LFGdƶ*s+ډ_Zc"?%t[IP 6J]#=ɺVvvCGsGh1 >)6|ey?Lӣm,4GWUi`]uJVoVDG< SB6ϏQ@ TiUlyOU0kfV~~}SZ@*WUUi##; s/[=!7}"WN]'(L! ~y5g9T̅JkbM' +s:S +B)v@Mj e Cf jE 0Y\QnzG1д~Wo{T9?`Rmyhsy3!HAD]mc1~2LSu7xT;j$`}4->L#vzŏILS ֭T{rjGKC;bpU=-`BsK.SFw4Mq]ZdHS0)tLg