JFIF$        dd7 

Viewing File: /opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/simple_rpc/plesk_stats.py

import datetime
import json
from contextlib import suppress

from imav.malwarelib.model import MalwareHit
from defence360agent.contracts.license import LicenseCLN
from defence360agent.rpc_tools.lookup import RootEndpoints, bind
from defence360agent.rpc_tools.utils import run_in_executor_decorator
from defence360agent.subsys.panels.hosting_panel import HostingPanel
from defence360agent.subsys.panels.plesk.api import list_docroots_domains_users
from defence360agent.utils import atomic_rewrite
from defence360agent.subsys.panels.plesk import Plesk
from defence360agent.subsys.features import kernel_care


class PleskStatsEndpoints(RootEndpoints):
    MAX_DOMAINS_COUNT = 100

    @bind("plesk-stats")
    async def plesk_stats(self):
        """Assemble the payload served by the ``plesk-stats`` RPC endpoint.

        The result is ``{"items": {...}}`` where the inner mapping holds,
        in this order: the generation time (``last_modified`` in
        milliseconds and ``last_modified_str`` as a UTC datetime string),
        whatever ``_domains_stats`` returns, whatever
        ``_get_stats_field_in_plugin_info`` returns, and ``license``
        (1 when ``LicenseCLN.is_valid()`` is truthy, otherwise 0).

        Raises ``AssertionError`` when the detected hosting panel is not
        Plesk.
        """
        hosting_panel = HostingPanel()
        assert isinstance(hosting_panel, Plesk), "only for plesk"

        # Whole seconds since the epoch; both "last_modified" fields are
        # derived from this single value so they always agree.
        now_seconds = int(round(datetime.datetime.now().timestamp()))
        now_utc = datetime.datetime.fromtimestamp(
            now_seconds,
            datetime.timezone.utc,
        )

        docroots_domains_users = await list_docroots_domains_users()

        items = {
            "last_modified": now_seconds * 1000,
            "last_modified_str": str(now_utc),
        }
        items.update(await self._domains_stats(docroots_domains_users))
        items.update(await self._get_stats_field_in_plugin_info())
        items["license"] = 1 if LicenseCLN.is_valid() else 0
        return {"items": items}

    @classmethod
    async def _get_stats_field_in_plugin_info(cls):
        """Build the KernelCare-related fields of the Plesk stats payload.

        Returns an empty dict when ``KernelCare().check_installed()`` is
        falsy.  Otherwise the plugin info is fetched, its
        ``effectiveKernel`` is compared with the value persisted in
        ``KernelCare.KC_PROPERTIES``, the current state is written back to
        that file via ``atomic_rewrite`` and a dict with the keys
        ``kernel_uptodate`` and ``outdated_since_days`` is returned.

        ``outdated_since_days`` is 0 unless ``updateCode`` equals ``"1"``;
        in that case it is the number of whole days since the currently
        reported effective kernel was first recorded.

        A missing, unparsable or incomplete properties file is treated as
        "no previous state": the first-seen time is reset to now instead
        of raising, and the file is rewritten with valid content below.
        """
        if not await kernel_care.KernelCare().check_installed():
            return {}
        plugin_info = await kernel_care.KernelCare().get_plugin_info()

        previous = {}
        with suppress(FileNotFoundError, json.JSONDecodeError):
            with open(kernel_care.KernelCare.KC_PROPERTIES) as file:
                loaded = json.load(file)
            # Anything but a JSON object cannot carry the expected keys.
            if isinstance(loaded, dict):
                previous = loaded
        previous_kernel = previous.get("effective_kernel")
        previous_timestamp = previous.get("first_time_update_available")

        now = datetime.datetime.now(tz=datetime.timezone.utc)
        effective_kernel = plugin_info["effectiveKernel"]
        # Restart the clock when the kernel changed, or when there is no
        # usable stored timestamp (fromtimestamp(None) would raise
        # TypeError, e.g. on first run with an unchanged/None kernel).
        if effective_kernel != previous_kernel or not isinstance(
            previous_timestamp, (int, float)
        ):
            first_time_update_available = now
        else:
            first_time_update_available = datetime.datetime.fromtimestamp(
                previous_timestamp, datetime.timezone.utc
            )

        update_available = plugin_info["updateCode"] == "1"
        outdated_since_days = (
            (now - first_time_update_available).days
            if update_available
            else 0
        )

        atomic_rewrite(
            kernel_care.KernelCare.KC_PROPERTIES,
            json.dumps(
                {
                    "effective_kernel": effective_kernel,
                    "first_time_update_available": first_time_update_available.timestamp(),  # noqa
                }
            ),
            backup=False,
        )
        return {
            # NOTE(review): "kernel_uptodate" is filled with the plugin's
            # "autoUpdate" value as-is -- confirm this mapping is intended.
            "kernel_uptodate": plugin_info["autoUpdate"],
            "outdated_since_days": outdated_since_days,
        }

    @run_in_executor_decorator
    def _domains_stats(self, plesk_response):
        """Report which Plesk domains contain infected files.

        :param plesk_response: iterable of ``(docroot, domain, user)``
            triples.
        :return: dict with ``infected_sites`` (at most
            ``MAX_DOMAINS_COUNT`` domain names) and ``wsites_infected``
            (the total number of infected domains found, not truncated).

        A domain counts as infected when at least one infected
        ``MalwareHit.orig_file`` is the docroot itself or lies below it.
        The comparison is done on a directory boundary, so a docroot such
        as ``/var/www/site`` does not claim files from ``/var/www/site2``.
        """
        infected_files = [
            path
            for (path,) in MalwareHit.select(MalwareHit.orig_file)
            .where(MalwareHit.is_infected())
            .tuples()
        ]
        # usually infected_users << total_users (according to ch)
        # so we can compare each hit only with docroots, whose owners are
        # marked as infected in imunify database
        infected_users = {
            row[0]
            for row in MalwareHit.select(MalwareHit.user)
            .where(MalwareHit.is_infected())
            .distinct()
            .tuples()
        }

        infected_sites = []
        for docroot, domain, user in plesk_response:
            if user not in infected_users:
                continue
            # Normalise a trailing slash so "<root>" and "<root>/" behave
            # the same; "/" is used as separator (paths are POSIX here).
            root = docroot.rstrip("/")
            inside_root = root + "/"
            if any(
                path == root or path.startswith(inside_root)
                for path in infected_files
            ):
                infected_sites.append(domain)

        return {
            "infected_sites": infected_sites[: self.MAX_DOMAINS_COUNT],
            "wsites_infected": len(infected_sites),
        }
Back to Directory  nL+D550H?Mx ,D"v]qv;6*Zqn)ZP0!1 A "#a$2Qr D8 a Ri[f\mIykIw0cuFcRı?lO7к_f˓[C$殷WF<_W ԣsKcëIzyQy/_LKℂ;C",pFA:/]=H  ~,ls/9ć:[=/#f;)x{ٛEQ )~ =𘙲r*2~ a _V=' kumFD}KYYC)({ *g&f`툪ry`=^cJ.I](*`wq1dđ#̩͑0;H]u搂@:~וKL Nsh}OIR*8:2 !lDJVo(3=M(zȰ+i*NAr6KnSl)!JJӁ* %݉?|D}d5:eP0R;{$X'xF@.ÊB {,WJuQɲRI;9QE琯62fT.DUJ;*cP A\ILNj!J۱+O\͔]ޒS߼Jȧc%ANolՎprULZԛerE2=XDXgVQeӓk yP7U*omQIs,K`)6\G3t?pgjrmۛجwluGtfh9uyP0D;Uڽ"OXlif$)&|ML0Zrm1[HXPlPR0'G=i2N+0e2]]9VTPO׮7h(F*癈'=QVZDF,d߬~TX G[`le69CR(!S2!P <0x<!1AQ "Raq02Br#SCTb ?Ζ"]mH5WR7k.ۛ!}Q~+yԏz|@T20S~Kek *zFf^2X*(@8r?CIuI|֓>^ExLgNUY+{.RѪ τV׸YTD I62'8Y27'\TP.6d&˦@Vqi|8-OΕ]ʔ U=TL8=;6c| !qfF3aů&~$l}'NWUs$Uk^SV:U# 6w++s&r+nڐ{@29 gL u"TÙM=6(^"7r}=6YݾlCuhquympǦ GjhsǜNlɻ}o7#S6aw4!OSrD57%|?x>L |/nD6?/8w#[)L7+6〼T ATg!%5MmZ/c-{1_Je"|^$'O&ޱմTrb$w)R$& N1EtdU3Uȉ1pM"N*(DNyd96.(jQ)X 5cQɎMyW?Q*!R>6=7)Xj5`J]e8%t!+'!1Q5 !1 AQaqё#2"0BRb?Gt^## .llQT $v,,m㵜5ubV =sY+@d{N! dnO<.-B;_wJt6;QJd.Qc%p{ 1,sNDdFHI0ГoXшe黅XۢF:)[FGXƹ/w_cMeD,ʡcc.WDtA$j@:) -# u c1<@ۗ9F)KJ-hpP]_x[qBlbpʖw q"LFGdƶ*s+ډ_Zc"?%t[IP 6J]#=ɺVvvCGsGh1 >)6|ey?Lӣm,4GWUi`]uJVoVDG< SB6ϏQ@ TiUlyOU0kfV~~}SZ@*WUUi##; s/[=!7}"WN]'(L! ~y5g9T̅JkbM' +s:S +B)v@Mj e Cf jE 0Y\QnzG1д~Wo{T9?`Rmyhsy3!HAD]mc1~2LSu7xT;j$`}4->L#vzŏILS ֭T{rjGKC;bpU=-`BsK.SFw4Mq]ZdHS0)tLg