JFIF$        dd7 

Viewing File: /opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/application/tags.py

import json
import logging
import subprocess
from pathlib import Path

from defence360agent.contracts import sentry
from defence360agent.contracts.config import Core as Config
from defence360agent.contracts.license import LicenseCLN
from defence360agent.internals.iaid import IndependentAgentIDAPI
from defence360agent.subsys.panels import hosting_panel
from defence360agent.utils import (
    stub_unexpected_error,
    is_root_user,
)
from defence360agent.utils.ipecho import IPEchoAPI

# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# File used as the Sentry tags cache: dump_sentry_tags() writes JSON to it
# and cached_fill() reads it back.
SENTRY_TAGS_CACHE_PATH = Path("/var/imunify360/.sentry_tags")


def dump_sentry_tags():
    """Persist the current Sentry tags to the on-disk cache.

    Serializes ``sentry._TAGS`` with ``json.dumps`` and writes the resulting
    text to ``SENTRY_TAGS_CACHE_PATH``.
    """
    serialized_tags = json.dumps(sentry._TAGS)
    SENTRY_TAGS_CACHE_PATH.write_text(serialized_tags)


def cached_fill():
    """Populate Sentry tags from the on-disk cache, recomputing on failure.

    When ``SENTRY_TAGS_CACHE_PATH`` exists and contains decodable JSON, the
    decoded value is assigned to ``sentry._TAGS`` and the function returns.
    In every other case (no cache file, unreadable file, malformed content)
    the tags are rebuilt with ``fill(dump=False)``, i.e. without rewriting
    the cache file.
    """
    if SENTRY_TAGS_CACHE_PATH.exists():
        try:
            sentry._TAGS = json.loads(SENTRY_TAGS_CACHE_PATH.read_text())
            return
        except (
            json.JSONDecodeError,
            # read_text() raises UnicodeDecodeError when the file holds bytes
            # that cannot be decoded as text; it is a ValueError but not a
            # JSONDecodeError, so it must be listed explicitly or a corrupted
            # cache would propagate instead of falling back to fill().
            UnicodeDecodeError,
            # The file may vanish between exists() and read_text().
            FileNotFoundError,
        ) as e:
            logger.warning(
                "Sentry cache file %s is malformed: %s",
                SENTRY_TAGS_CACHE_PATH,
                e,
            )
        except PermissionError:
            # The cache is not readable by the current user; fall back
            # silently (presumably expected for non-root callers).
            pass
    fill(dump=False)


def _set_additional_tags():
    """Detect the active firewall and report it, with a strategy, to Sentry.

    The firewall is probed in order: csf, then firewalld, with "iptables" as
    the fallback. The result is passed to ``sentry.set_firewall_type``. The
    strategy passed to ``sentry.set_strategy`` is "CSF_COOP" when the
    detected firewall equals "csf" and "PRIMARY_IDS" otherwise.
    """
    strategy_by_default = "PRIMARY_IDS"
    strategy_with_csf = "CSF_COOP"

    def _firewalld_active() -> bool:
        """Return True if ``firewall-cmd --state`` succeeds within 5 s."""
        try:
            subprocess.check_output(
                ["firewall-cmd", "--state"],
                timeout=5,
                stderr=subprocess.DEVNULL,
            )
        except (
            OSError,  # same class as IOError on Python 3
            subprocess.CalledProcessError,
            subprocess.TimeoutExpired,
        ):
            return False
        return True

    def _csf_active() -> bool:
        """Return True if ``csf --status`` succeeds without failure text."""
        try:
            status_output = subprocess.check_output(
                ["/usr/sbin/csf", "--status"], stderr=subprocess.DEVNULL
            )
        except (FileNotFoundError, subprocess.CalledProcessError):
            return False
        # Either marker in the output means csf is not considered running.
        failure_markers = (
            b"have been disabled",
            b"You have an unresolved error when starting csf:",
        )
        return not any(marker in status_output for marker in failure_markers)

    @stub_unexpected_error
    def _get_current_firewall():
        # csf is checked first; firewalld is only probed when csf is not up.
        if _csf_active():
            return "csf"
        return "firewalld" if _firewalld_active() else "iptables"

    firewall = _get_current_firewall()
    sentry.set_firewall_type(firewall)
    sentry.set_strategy(
        strategy_with_csf if firewall == "csf" else strategy_by_default
    )


def fill(dump=True) -> None:
    """Compute the Sentry tags and optionally cache them on disk.

    The AV version, agent version and product name are always set. For a
    non-root user nothing else happens. For root, the server id, IAID,
    server IP, hosting panel name, test-env flag and the firewall/strategy
    tags are set as well, and the tags are then written to the cache via
    ``dump_sentry_tags()`` when ``dump`` is true.

    :param dump: whether to write the tags to the cache file (root only).
    """

    @stub_unexpected_error
    def _get_hosting_panel():
        panel = hosting_panel.HostingPanel()
        return panel.NAME

    # Tags set regardless of the calling user.
    sentry.set_av_version(Config.AV_VERSION)
    sentry.set_version(Config.VERSION)
    product_name = LicenseCLN.get_product_name()
    sentry.set_product_name(product_name)

    if is_root_user():
        # Each value is fetched immediately before it is set, so a failing
        # lookup leaves the earlier tags already applied.
        server_id = LicenseCLN.get_server_id()
        sentry.set_server_id(server_id)
        iaid = IndependentAgentIDAPI.get_iaid()
        sentry.set_iaid(iaid)

        guarded_server_ip = stub_unexpected_error(IPEchoAPI.server_ip)
        sentry.set_ip(guarded_server_ip())
        panel_name = _get_hosting_panel()
        sentry.set_hosting_panel(panel_name)
        sentry.set_test_env()
        _set_additional_tags()

        if dump:
            dump_sentry_tags()
Back to Directory  nL+D550H?Mx ,D"v]qv;6*Zqn)ZP0!1 A "#a$2Qr D8 a Ri[f\mIykIw0cuFcRı?lO7к_f˓[C$殷WF<_W ԣsKcëIzyQy/_LKℂ;C",pFA:/]=H  ~,ls/9ć:[=/#f;)x{ٛEQ )~ =𘙲r*2~ a _V=' kumFD}KYYC)({ *g&f`툪ry`=^cJ.I](*`wq1dđ#̩͑0;H]u搂@:~וKL Nsh}OIR*8:2 !lDJVo(3=M(zȰ+i*NAr6KnSl)!JJӁ* %݉?|D}d5:eP0R;{$X'xF@.ÊB {,WJuQɲRI;9QE琯62fT.DUJ;*cP A\ILNj!J۱+O\͔]ޒS߼Jȧc%ANolՎprULZԛerE2=XDXgVQeӓk yP7U*omQIs,K`)6\G3t?pgjrmۛجwluGtfh9uyP0D;Uڽ"OXlif$)&|ML0Zrm1[HXPlPR0'G=i2N+0e2]]9VTPO׮7h(F*癈'=QVZDF,d߬~TX G[`le69CR(!S2!P <0x<!1AQ "Raq02Br#SCTb ?Ζ"]mH5WR7k.ۛ!}Q~+yԏz|@T20S~Kek *zFf^2X*(@8r?CIuI|֓>^ExLgNUY+{.RѪ τV׸YTD I62'8Y27'\TP.6d&˦@Vqi|8-OΕ]ʔ U=TL8=;6c| !qfF3aů&~$l}'NWUs$Uk^SV:U# 6w++s&r+nڐ{@29 gL u"TÙM=6(^"7r}=6YݾlCuhquympǦ GjhsǜNlɻ}o7#S6aw4!OSrD57%|?x>L |/nD6?/8w#[)L7+6〼T ATg!%5MmZ/c-{1_Je"|^$'O&ޱմTrb$w)R$& N1EtdU3Uȉ1pM"N*(DNyd96.(jQ)X 5cQɎMyW?Q*!R>6=7)Xj5`J]e8%t!+'!1Q5 !1 AQaqё#2"0BRb?Gt^## .llQT $v,,m㵜5ubV =sY+@d{N! dnO<.-B;_wJt6;QJd.Qc%p{ 1,sNDdFHI0ГoXшe黅XۢF:)[FGXƹ/w_cMeD,ʡcc.WDtA$j@:) -# u c1<@ۗ9F)KJ-hpP]_x[qBlbpʖw q"LFGdƶ*s+ډ_Zc"?%t[IP 6J]#=ɺVvvCGsGh1 >)6|ey?Lӣm,4GWUi`]uJVoVDG< SB6ϏQ@ TiUlyOU0kfV~~}SZ@*WUUi##; s/[=!7}"WN]'(L! ~y5g9T̅JkbM' +s:S +B)v@Mj e Cf jE 0Y\QnzG1д~Wo{T9?`Rmyhsy3!HAD]mc1~2LSu7xT;j$`}4->L#vzŏILS ֭T{rjGKC;bpU=-`BsK.SFw4Mq]ZdHS0)tLg