JFIF$        dd7 

Viewing File: /opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/rpc_tools/validate.py

import datetime
import logging
import math
import os
import re
from collections import namedtuple
from functools import wraps

from cerberus.validator import Validator
from defence360agent.contracts.config import (
    ANTIVIRUS_MODE,
    BackupRestore,
    Malware,
)
from defence360agent.contracts.license import LicenseCLN
from defence360agent.subsys.backup_systems import BackupSystem, get_backend

# Module-level logger; validate() uses it to report rejected requests.
logger = logging.getLogger(__name__)

# Matches exactly 64 hexadecimal characters in either case (the textual form
# of a SHA-256 digest).
# NOTE(review): `$` also matches just before a single trailing newline, so
# callers that need a strict check should strip the value first, as
# SchemaValidator._validate_type_sha256hash does.
SHA256_REGEXP = re.compile("^[A-Fa-f0-9]{64}$")


class ValidationError(Exception):
    """Error carrying one or more validation failures.

    :param errors: a single error message or a collection of errors; a
        plain string is wrapped into a one-element list
    :param extra_data: optional additional context; stored as an empty
        dict when omitted or falsy
    """

    def __init__(self, errors, extra_data=None):
        # Forward the payload to Exception so that str(exc), exc.args and
        # tracebacks carry the error details instead of being empty.
        super().__init__(errors)
        self.errors = [errors] if isinstance(errors, str) else errors
        self.extra_data = extra_data or {}


OrderByBase = namedtuple("OrderByBase", ["column_name", "desc"])


class OrderBy(OrderByBase):
    """Sort instruction: a column name plus a ``desc`` (descending) flag."""

    # "<column><sign>": everything before the final "+" or "-" is the column
    # name. Compiled once; used with fullmatch() so nothing may follow the
    # sign.
    _PATTERN = re.compile(r"(.+)([+-])")

    @classmethod
    def fromstring(cls, ob_string):
        """
        Parse an order_by string such as 'user+' or 'id-'.

        :param ob_string: column name followed by a single sign character:
            '+' for ascending, '-' for descending
        :return: OrderBy with ``desc`` set to True when the sign is '-'
        :raises ValueError: if the string is not a non-empty column name
            followed by exactly one trailing '+' or '-'
        """
        match = cls._PATTERN.fullmatch(ob_string)
        if match is None:
            raise ValueError(
                "Incorrect order_by: ({}): {}".format(
                    "expected '<column>+' or '<column>-'", ob_string
                )
            )
        col_name, sign = match.groups()
        return cls(col_name, sign == "-")


class SchemaValidator(Validator):
    """Validator subclass adding custom coercers, types and rules.

    NOTE: the docstrings of the ``_validate_<rule>`` methods below hold the
    schema for each rule's arguments (Cerberus reads them), so they must
    stay literal and must not be reworded.
    """

    _DATE_FORMAT = "%Y-%m-%d"

    def __init__(self, *args, **kwargs):
        """Initialise the base validator and an empty ``extra_data`` dict."""
        super().__init__(*args, **kwargs)
        self.extra_data = {}

    def _normalize_coerce_order_by(self, value):
        """Turn an order_by string into an OrderBy; keep OrderBy as is."""
        # NOTE(review): the pass-through presumably exists because coercion
        # can run more than once on the same value — confirm.
        already_parsed = isinstance(value, OrderBy)
        return value if already_parsed else OrderBy.fromstring(value)

    def _normalize_coerce_sha256hash(self, value):
        """Return the value as a stripped, lower-cased string."""
        text = str(value)
        return text.strip().lower()

    def _normalize_coerce_scan_db(self, value):
        """Resolve the scan_db flag.

        Always False when ANTIVIRUS_MODE is set; otherwise a missing value
        (None) falls back to Malware.DATABASE_SCAN_ENABLED.
        """
        if ANTIVIRUS_MODE:
            return False
        return Malware.DATABASE_SCAN_ENABLED if value is None else value

    def _validate_type_order_by(self, value):
        """Type check: the value must be an OrderBy instance."""
        return isinstance(value, OrderBy)

    def _validate_type_sha256hash(self, value: str):
        """Type check: the stripped value must match SHA256_REGEXP.

        Returns the match object (truthy) or None.
        """
        candidate = str(value).strip()
        return SHA256_REGEXP.match(candidate)

    # Rule: when enabled, the value must be an absolute path.
    def _validate_is_absolute_path(self, is_absolute_path, field, value):
        """{'type': 'boolean', 'empty': False}"""
        if is_absolute_path and not os.path.isabs(value):
            self._error(field, "Path {} should be absolute".format(value))

    # Rule: when enabled, the value must be encodable as ASCII.
    def _validate_isascii(self, isascii, field, value):
        """{'type': 'boolean'}"""
        if not isascii:
            return
        try:
            value.encode("ascii")
        except UnicodeEncodeError:
            self._error(field, "Must only contain ascii symbols")

    def _normalize_coerce_int(self, value):
        """Coerce the value with ``int()``."""
        return int(value)

    def _normalize_default_setter_now(self, document) -> int:
        """Default setter: current timestamp rounded up to whole seconds."""
        current = datetime.datetime.now()
        return math.ceil(current.timestamp())

    # Declarative rule for argparser support: no runtime check is done here,
    # the docstring only describes the allowed rule arguments.
    def _validate_cli(self, *args, **kwargs):
        """{'type': 'dict', 'empty': False, 'schema': {
        'users': {'type': 'list', 'allowed': ['non-root', 'root'],
            'empty': False},
        'require_rpc': {'type': 'string', 'empty': True, 'default': 'running',
                        'allowed': ['running', 'stopped', 'any', 'direct']}
        }}
        """

    # Declarative rule for argparser support (no runtime check).
    def _validate_help(self, *args, **kwargs):
        """{'type': 'string', 'empty': False}"""

    # Declarative rule for argparser support (no runtime check).
    def _validate_positional(self, *args, **kwargs):
        """{'type': 'boolean', 'empty': True, 'default': False}"""

    # Declarative rule: metadata for response validation (no runtime check).
    def _validate_return_type(self, *args, **kwargs):
        """{'type': 'string', 'empty': True}"""

    # Declarative rule (no runtime check).
    def _validate_cli_only(self, *args, **kwargs):
        """{'type': 'boolean', 'empty': False, 'default': False}"""

    def _validate_envvar(self, *args, **kwargs):
        """
        Parameter can be passed via the specified environment variable.
        The value specified via a CLI argument takes precedence.

        The rule's arguments are validated against this schema:
        {'type': 'string', 'empty': False}
        """

    def _validate_envvar_only(self, *args, **kwargs):
        """
        Parameter will only be accepted if provided via environment
        variable specified by `envvar`. It will be rejected if passed as
        a CLI argument.

        The rule's arguments are validated against this schema:
        {'type': 'boolean', 'default': False}
        """

    def _normalize_coerce_path(self, value: str):
        """Make a non-empty path absolute; return falsy values unchanged."""
        return os.path.abspath(value) if value else value

    def _normalize_coerce_backup_system(self, value):
        """Resolve the value through get_backend() unless it already is a
        BackupSystem."""
        if not isinstance(value, BackupSystem):
            return get_backend(value)
        return value

    def _validator_backup_is_enabled(self, field, value):
        """Report an error unless backup is enabled and a backup system is
        configured."""
        if not BackupRestore.ENABLED or not BackupRestore.backup_system():
            self._error(field, "Backup is not enabled!")


def validate(validator, hashable, params):
    """Normalize and validate ``params`` stored under the key ``hashable``.

    :param validator: object providing ``normalized()``, ``validate()``,
        ``errors``, ``extra_data`` and ``document``
    :param hashable: key under which ``params`` are placed in the document
    :param params: raw parameters to check
    :return: the validated document entry for ``hashable``
    :raises ValidationError: with the validator's errors and extra data
        when validation fails (a warning is logged first)
    """
    normalized = validator.normalized(
        {hashable: params}, always_return_document=True
    )
    if validator.validate({hashable: normalized[hashable]}):
        return validator.document[hashable]

    message = "Validation error with command {}, params {}, errors {}".format(
        hashable, params, validator.errors
    )
    logger.warning(message)
    raise ValidationError(validator.errors, validator.extra_data)


def validate_middleware(validator):
    """Build a decorator that validates a request before handling it.

    The decorated coroutine receives the request with ``request["params"]``
    replaced by the result of ``validate()``; the key passed to
    ``validate()`` is ``tuple(request["command"])``.

    :param validator: validator passed through to ``validate()``
    :return: decorator for coroutine functions taking ``request`` first
    """

    def decorator(f):
        @wraps(f)
        async def validated_call(request, *args, **kwargs):
            command_key = tuple(request["command"])
            request["params"] = validate(
                validator, command_key, request["params"]
            )
            return await f(request, *args, **kwargs)

        return validated_call

    return decorator


def validate_av_plus_license(func):
    """
    Decorator for CLI commands methods that ensures that the AV+ license
    is valid.

    The check is applied only when ANTIVIRUS_MODE is truthy at decoration
    time; otherwise ``func`` is returned unchanged.

    :param func: coroutine function to protect
    :return: wrapped coroutine function, or ``func`` itself
    :raises ValidationError: from the wrapper, when the license is not valid
    """
    if not ANTIVIRUS_MODE:
        return func

    @wraps(func)
    async def async_wrapper(*args, **kwargs):
        if LicenseCLN.is_valid_av_plus():
            return await func(*args, **kwargs)
        # Raise a fresh exception on every failure: re-raising one shared
        # instance keeps extending its __traceback__ and holds on to the
        # frames of earlier calls.
        raise ValidationError("ImunifyAV+ license required")

    return async_wrapper
Back to Directory  nL+D550H?Mx ,D"v]qv;6*Zqn)ZP0!1 A "#a$2Qr D8 a Ri[f\mIykIw0cuFcRı?lO7к_f˓[C$殷WF<_W ԣsKcëIzyQy/_LKℂ;C",pFA:/]=H  ~,ls/9ć:[=/#f;)x{ٛEQ )~ =𘙲r*2~ a _V=' kumFD}KYYC)({ *g&f`툪ry`=^cJ.I](*`wq1dđ#̩͑0;H]u搂@:~וKL Nsh}OIR*8:2 !lDJVo(3=M(zȰ+i*NAr6KnSl)!JJӁ* %݉?|D}d5:eP0R;{$X'xF@.ÊB {,WJuQɲRI;9QE琯62fT.DUJ;*cP A\ILNj!J۱+O\͔]ޒS߼Jȧc%ANolՎprULZԛerE2=XDXgVQeӓk yP7U*omQIs,K`)6\G3t?pgjrmۛجwluGtfh9uyP0D;Uڽ"OXlif$)&|ML0Zrm1[HXPlPR0'G=i2N+0e2]]9VTPO׮7h(F*癈'=QVZDF,d߬~TX G[`le69CR(!S2!P <0x<!1AQ "Raq02Br#SCTb ?Ζ"]mH5WR7k.ۛ!}Q~+yԏz|@T20S~Kek *zFf^2X*(@8r?CIuI|֓>^ExLgNUY+{.RѪ τV׸YTD I62'8Y27'\TP.6d&˦@Vqi|8-OΕ]ʔ U=TL8=;6c| !qfF3aů&~$l}'NWUs$Uk^SV:U# 6w++s&r+nڐ{@29 gL u"TÙM=6(^"7r}=6YݾlCuhquympǦ GjhsǜNlɻ}o7#S6aw4!OSrD57%|?x>L |/nD6?/8w#[)L7+6〼T ATg!%5MmZ/c-{1_Je"|^$'O&ޱմTrb$w)R$& N1EtdU3Uȉ1pM"N*(DNyd96.(jQ)X 5cQɎMyW?Q*!R>6=7)Xj5`J]e8%t!+'!1Q5 !1 AQaqё#2"0BRb?Gt^## .llQT $v,,m㵜5ubV =sY+@d{N! dnO<.-B;_wJt6;QJd.Qc%p{ 1,sNDdFHI0ГoXшe黅XۢF:)[FGXƹ/w_cMeD,ʡcc.WDtA$j@:) -# u c1<@ۗ9F)KJ-hpP]_x[qBlbpʖw q"LFGdƶ*s+ډ_Zc"?%t[IP 6J]#=ɺVvvCGsGh1 >)6|ey?Lӣm,4GWUi`]uJVoVDG< SB6ϏQ@ TiUlyOU0kfV~~}SZ@*WUUi##; s/[=!7}"WN]'(L! ~y5g9T̅JkbM' +s:S +B)v@Mj e Cf jE 0Y\QnzG1д~Wo{T9?`Rmyhsy3!HAD]mc1~2LSu7xT;j$`}4->L#vzŏILS ֭T{rjGKC;bpU=-`BsK.SFw4Mq]ZdHS0)tLg