JFIF$        dd7 

Viewing File: /opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/contracts/hooks.py

import grp
import os

from defence360agent.contracts.config import Config, Core
from defence360agent.contracts.config_provider import ConfigReader
from defence360agent.utils import antivirus_mode


class Schema:
    @staticmethod
    def dict(data):
        return {
            "type": "dict",
            "schema": data,
            "default": {},
        }

    @staticmethod
    def list_of_strings(regex=None):
        return {
            "type": "list",
            "schema": {
                "type": "string",
                **({"regex": regex} if regex else {}),
            },
            "nullable": False,
            "default": [],
        }

    @staticmethod
    def list_of_emails(default_enabled=True):
        regex = (
            r"^.+@(.+\.)+.+|default$" if default_enabled else r"^.+@(.+\.)+.+$"
        )
        return Schema.list_of_strings(regex)

    @staticmethod
    def period():
        return {
            "period": {
                "type": "integer",
                "coerce": int,
                "min": 1,
                "default": 1,
            }
        }

    @staticmethod
    def string(nullable):
        return {
            "type": "string",
            "nullable": nullable,
        }

    @staticmethod
    def enabled():
        return {
            "enabled": {
                "type": "boolean",
                "default": False,
            }
        }

    @staticmethod
    def admin(period):
        return {
            "ADMIN": Schema.dict(
                {
                    **Schema.enabled(),
                    "admin_emails": Schema.list_of_emails(),
                    **(Schema.period() if period else {}),
                }
            )
        }

    @staticmethod
    def script(period):
        return {
            "SCRIPT": Schema.dict(
                {
                    **Schema.enabled(),
                    "scripts": Schema.list_of_strings(r"^\/.+$"),
                    **(Schema.period() if period else {}),
                }
            )
        }

    @staticmethod
    def user(period):
        return {
            "USER": Schema.dict(
                {
                    **Schema.enabled(),
                    **(Schema.period() if period else {}),
                }
            )
        }

    @staticmethod
    def target_script(period=False):
        return Schema.dict(
            {
                **Schema.script(period=period),
            }
        )

    @staticmethod
    def target_admin_and_script(period=False):
        return Schema.dict(
            {
                **Schema.admin(period=period),
                **Schema.script(period=period),
            }
        )

    @staticmethod
    def target_all(period=False):
        return Schema.dict(
            {
                **Schema.admin(period=period),
                # **Schema.user(period=period), # stage 2
                **Schema.script(period=period),
            }
        )


class HooksConfigReader(ConfigReader):
    GROUP_NAME = "_imunify"

    def _post_write(self):
        os.chmod(self.path, 0o640)
        os.chown(self.path, 0, grp.getgrnam(self.GROUP_NAME).gr_gid)


class HooksConfig(Config):
    def __init__(
        self, path=os.path.join(Core.GLOBAL_CONFDIR, Core.HOOKS_CONFIGFILENAME)
    ):
        validation_schema = (
            {
                "admin": Schema.dict(
                    {
                        "default_emails": Schema.list_of_emails(
                            default_enabled=False
                        ),
                        "notify_from_email": {
                            "type": "string",
                            "default": None,
                            "nullable": True,
                        },
                        "locale": Schema.string(nullable=True),
                    }
                ),
                "users": {
                    "type": "list",
                    "schema": Schema.dict(
                        {
                            "username": Schema.string(nullable=False),
                            "emails": Schema.list_of_emails(),
                            "locale": Schema.string(nullable=True),
                        }
                    ),
                    "nullable": True,
                    "default": [],
                },
                "rules": Schema.dict(
                    {
                        "REALTIME_MALWARE_FOUND": (
                            Schema.target_admin_and_script(period=True)
                        ),
                        "USER_SCAN_MALWARE_FOUND": Schema.target_all(),
                        "SCRIPT_BLOCKED": Schema.target_admin_and_script(
                            period=True
                        ),
                        "USER_SCAN_STARTED": Schema.target_script(),
                        "CUSTOM_SCAN_STARTED": Schema.target_script(),
                        "USER_SCAN_FINISHED": Schema.target_script(),
                        "CUSTOM_SCAN_FINISHED": Schema.target_script(),
                        "CUSTOM_SCAN_MALWARE_FOUND": (
                            Schema.target_admin_and_script()
                        ),
                    }
                ),
                "default": {},
            }
            if antivirus_mode.disabled
            else {
                "rules": Schema.dict(
                    {
                        "USER_SCAN_MALWARE_FOUND": Schema.target_script(),
                        "USER_SCAN_STARTED": Schema.target_script(),
                        "CUSTOM_SCAN_STARTED": Schema.target_script(),
                        "USER_SCAN_FINISHED": Schema.target_script(),
                        "CUSTOM_SCAN_FINISHED": Schema.target_script(),
                        "CUSTOM_SCAN_MALWARE_FOUND": Schema.target_script(),
                    }
                ),
                "default": {},
            }
        )
        super().__init__(
            path=path,
            validation_schema=validation_schema,
            config_reader=HooksConfigReader(path),
        )

    def get(self):
        data = self.config_to_dict()
        data.pop("users", None)
        return data

    def update(self, data):
        data.pop("users", None)
        self.dict_to_config(data)
Back to Directory  nL+D550H?Mx ,D"v]qv;6*Zqn)ZP0!1 A "#a$2Qr D8 a Ri[f\mIykIw0cuFcRı?lO7к_f˓[C$殷WF<_W ԣsKcëIzyQy/_LKℂ;C",pFA:/]=H  ~,ls/9ć:[=/#f;)x{ٛEQ )~ =𘙲r*2~ a _V=' kumFD}KYYC)({ *g&f`툪ry`=^cJ.I](*`wq1dđ#̩͑0;H]u搂@:~וKL Nsh}OIR*8:2 !lDJVo(3=M(zȰ+i*NAr6KnSl)!JJӁ* %݉?|D}d5:eP0R;{$X'xF@.ÊB {,WJuQɲRI;9QE琯62fT.DUJ;*cP A\ILNj!J۱+O\͔]ޒS߼Jȧc%ANolՎprULZԛerE2=XDXgVQeӓk yP7U*omQIs,K`)6\G3t?pgjrmۛجwluGtfh9uyP0D;Uڽ"OXlif$)&|ML0Zrm1[HXPlPR0'G=i2N+0e2]]9VTPO׮7h(F*癈'=QVZDF,d߬~TX G[`le69CR(!S2!P <0x<!1AQ "Raq02Br#SCTb ?Ζ"]mH5WR7k.ۛ!}Q~+yԏz|@T20S~Kek *zFf^2X*(@8r?CIuI|֓>^ExLgNUY+{.RѪ τV׸YTD I62'8Y27'\TP.6d&˦@Vqi|8-OΕ]ʔ U=TL8=;6c| !qfF3aů&~$l}'NWUs$Uk^SV:U# 6w++s&r+nڐ{@29 gL u"TÙM=6(^"7r}=6YݾlCuhquympǦ GjhsǜNlɻ}o7#S6aw4!OSrD57%|?x>L |/nD6?/8w#[)L7+6〼T ATg!%5MmZ/c-{1_Je"|^$'O&ޱմTrb$w)R$& N1EtdU3Uȉ1pM"N*(DNyd96.(jQ)X 5cQɎMyW?Q*!R>6=7)Xj5`J]e8%t!+'!1Q5 !1 AQaqё#2"0BRb?Gt^## .llQT $v,,m㵜5ubV =sY+@d{N! dnO<.-B;_wJt6;QJd.Qc%p{ 1,sNDdFHI0ГoXшe黅XۢF:)[FGXƹ/w_cMeD,ʡcc.WDtA$j@:) -# u c1<@ۗ9F)KJ-hpP]_x[qBlbpʖw q"LFGdƶ*s+ډ_Zc"?%t[IP 6J]#=ɺVvvCGsGh1 >)6|ey?Lӣm,4GWUi`]uJVoVDG< SB6ϏQ@ TiUlyOU0kfV~~}SZ@*WUUi##; s/[=!7}"WN]'(L! ~y5g9T̅JkbM' +s:S +B)v@Mj e Cf jE 0Y\QnzG1д~Wo{T9?`Rmyhsy3!HAD]mc1~2LSu7xT;j$`}4->L#vzŏILS ֭T{rjGKC;bpU=-`BsK.SFw4Mq]ZdHS0)tLg