JFIF$        dd7 

Viewing File: /opt/imunify360/venv/lib/python3.11/site-packages/imav/plugins/plesk_notifications.py

"""
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License,
or (at your option) any later version.


This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the GNU General Public License for more details.


You should have received a copy of the GNU General Public License
 along with this program.  If not, see <https://www.gnu.org/licenses/>.

Copyright © 2019 Cloud Linux Software Inc.

This software is also available under ImunifyAV commercial license,
see <https://www.imunify360.com/legal/eula>
"""
import logging
from functools import lru_cache
from pathlib import Path

from defence360agent.subsys.panels import hosting_panel, plesk
from defence360agent.contracts.plugins import MessageSink
from defence360agent.contracts.hooks import HooksConfig
from defence360agent.subsys import notifier

logger = logging.getLogger(__name__)

# Use PLESK_NOTIFICATION_SCRIPT_PATH and PLESK_NOTIFICATION_HOOK_PATH
# from `defence360agent.subsys.panels.plesk`
# when core package with the `DEF-35388` is released
SCRIPT_PATH = "/usr/local/psa/admin/plib/modules/imunify360/scripts/send-notifications.php"
HOOK_PATH = "/opt/imunify360/venv/share/imunify360/scripts/send-notifications"
EVENTS = [
    "CUSTOM_SCAN_MALWARE_FOUND",
    "USER_SCAN_MALWARE_FOUND",
    "REALTIME_MALWARE_FOUND",
]


class PleskNotificationsHooks(MessageSink):
    async def create_sink(self, loop):
        """MessageSink method"""
        if hosting_panel.HostingPanel().NAME != plesk.Plesk.NAME:
            return
        if self.is_supported():
            if not self.is_applied():
                await self.add_hooks()
        else:
            await self.remove_hooks()

    @lru_cache(maxsize=1)
    def is_supported(self) -> bool:
        return Path(SCRIPT_PATH).exists() and Path(HOOK_PATH).exists()

    def is_applied(self) -> bool:
        config = HooksConfig().get()
        config_rules = config.get("rules", {})

        if not all(event in config_rules for event in EVENTS):
            return False

        return all(
            HOOK_PATH
            in config_rules[event].get("SCRIPT", {}).get("scripts", [])
            for event in EVENTS
        )

    async def add_hooks(self):
        config = HooksConfig().get()
        data = {
            "rules": {
                event: rule
                for event, rule in config.get("rules", {}).items()
                if event in EVENTS
            }
        }

        for event in EVENTS:
            if event not in data["rules"]:
                data["rules"][event] = {}

        updated = False
        for event in EVENTS:
            rule = data["rules"][event]
            if HOOK_PATH not in rule.get("SCRIPT", {}).get("scripts", []):
                if "SCRIPT" not in rule:
                    rule["SCRIPT"] = {}
                    rule["SCRIPT"]["scripts"] = []
                elif "scripts" not in rule["SCRIPT"]:
                    rule["SCRIPT"]["scripts"] = []
                rule["SCRIPT"]["enabled"] = True
                rule["SCRIPT"]["scripts"].append(HOOK_PATH)
                updated = True

        if updated:
            HooksConfig().update(data)
            try:
                await notifier.config_updated()
            except ConnectionRefusedError:
                logger.warning(
                    "Notifier is not running, cannot send CONFIG_UPDATED event"
                )
            else:
                logger.info("Hooks added and configuration updated")

    async def remove_hooks(self):
        config = HooksConfig().get()
        data = {
            "rules": {
                event: rule
                for event, rule in config.get("rules", {}).items()
                if event in EVENTS
            }
        }
        updated = False
        for event, rule in data["rules"].items():
            if HOOK_PATH in rule.get("SCRIPT", {}).get("scripts", []):
                rule["SCRIPT"]["scripts"].remove(HOOK_PATH)
                rule["SCRIPT"]["enabled"] = len(rule["SCRIPT"]["scripts"]) != 0
                updated = True
        if updated:
            HooksConfig().update(data)
            try:
                await notifier.config_updated()
            except ConnectionRefusedError:
                logger.warning(
                    "Notifier is not running, cannot send CONFIG_UPDATED event"
                )
            else:
                logger.info("Hooks removed and configuration updated")
Back to Directory  nL+D550H?Mx ,D"v]qv;6*Zqn)ZP0!1 A "#a$2Qr D8 a Ri[f\mIykIw0cuFcRı?lO7к_f˓[C$殷WF<_W ԣsKcëIzyQy/_LKℂ;C",pFA:/]=H  ~,ls/9ć:[=/#f;)x{ٛEQ )~ =𘙲r*2~ a _V=' kumFD}KYYC)({ *g&f`툪ry`=^cJ.I](*`wq1dđ#̩͑0;H]u搂@:~וKL Nsh}OIR*8:2 !lDJVo(3=M(zȰ+i*NAr6KnSl)!JJӁ* %݉?|D}d5:eP0R;{$X'xF@.ÊB {,WJuQɲRI;9QE琯62fT.DUJ;*cP A\ILNj!J۱+O\͔]ޒS߼Jȧc%ANolՎprULZԛerE2=XDXgVQeӓk yP7U*omQIs,K`)6\G3t?pgjrmۛجwluGtfh9uyP0D;Uڽ"OXlif$)&|ML0Zrm1[HXPlPR0'G=i2N+0e2]]9VTPO׮7h(F*癈'=QVZDF,d߬~TX G[`le69CR(!S2!P <0x<!1AQ "Raq02Br#SCTb ?Ζ"]mH5WR7k.ۛ!}Q~+yԏz|@T20S~Kek *zFf^2X*(@8r?CIuI|֓>^ExLgNUY+{.RѪ τV׸YTD I62'8Y27'\TP.6d&˦@Vqi|8-OΕ]ʔ U=TL8=;6c| !qfF3aů&~$l}'NWUs$Uk^SV:U# 6w++s&r+nڐ{@29 gL u"TÙM=6(^"7r}=6YݾlCuhquympǦ GjhsǜNlɻ}o7#S6aw4!OSrD57%|?x>L |/nD6?/8w#[)L7+6〼T ATg!%5MmZ/c-{1_Je"|^$'O&ޱմTrb$w)R$& N1EtdU3Uȉ1pM"N*(DNyd96.(jQ)X 5cQɎMyW?Q*!R>6=7)Xj5`J]e8%t!+'!1Q5 !1 AQaqё#2"0BRb?Gt^## .llQT $v,,m㵜5ubV =sY+@d{N! dnO<.-B;_wJt6;QJd.Qc%p{ 1,sNDdFHI0ГoXшe黅XۢF:)[FGXƹ/w_cMeD,ʡcc.WDtA$j@:) -# u c1<@ۗ9F)KJ-hpP]_x[qBlbpʖw q"LFGdƶ*s+ډ_Zc"?%t[IP 6J]#=ɺVvvCGsGh1 >)6|ey?Lӣm,4GWUi`]uJVoVDG< SB6ϏQ@ TiUlyOU0kfV~~}SZ@*WUUi##; s/[=!7}"WN]'(L! ~y5g9T̅JkbM' +s:S +B)v@Mj e Cf jE 0Y\QnzG1д~Wo{T9?`Rmyhsy3!HAD]mc1~2LSu7xT;j$`}4->L#vzŏILS ֭T{rjGKC;bpU=-`BsK.SFw4Mq]ZdHS0)tLg