JFIF$        dd7 

Viewing File: /opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/api/server/send_message.py

import base64
import json
import os
import time
import urllib.error
import urllib.request
from abc import ABC, abstractmethod
from logging import getLogger
from typing import Optional
import asyncio

from defence360agent.api.server import (
    API,
    APIError,
    APITokenError,
    FGWSendMessgeException,
)
from defence360agent.contracts.config import Core
from defence360agent.contracts.messages import Message
from defence360agent.internals.global_scope import g
from defence360agent.internals.iaid import (
    IndependentAgentIDAPI,
    IAIDTokenError,
)
from defence360agent.utils.async_utils import AsyncIterate
from defence360agent.utils.json import ServerJSONEncoder

logger = getLogger(__name__)


class BaseSendMessageAPI(API, ABC):
    """Shared logic for delivering agent messages to the send-message API.

    This class obtains the auth token, builds the request headers and
    validates the reply; subclasses provide the actual transport by
    implementing ``_send_request``.
    """

    # Endpoint template; ``{method}`` is a placeholder for the message method.
    URL = "/api/v2/send-message/{method}"

    @abstractmethod
    async def _send_request(self, message_method, headers, post_data) -> dict:
        """Deliver *post_data* and return the server reply as a dict.

        :param message_method: message method name, as given to ``send_data``
        :param headers: request headers built by ``send_data``
        :param post_data: already-encoded request body
        """
        pass  # pragma: no cover

    def check_response(self, result: dict) -> None:
        """Validate a server reply.

        :param result: reply returned by ``_send_request``
        :raises APIError: if the reply has no ``"status"`` key or the
            status is anything other than ``"ok"``
        """
        if "status" not in result:
            raise APIError("unexpected server response: {!r}".format(result))
        if result["status"] != "ok":
            raise APIError("server error: {}".format(result.get("msg")))

    async def send_data(self, method: str, post_data: bytes) -> None:
        """Send *post_data* for *method*, authenticated with the IAID token.

        :param method: message method name forwarded to ``_send_request``
        :param post_data: encoded request body
        :raises APITokenError: if the IAID token cannot be obtained
        :raises APIError: if the server reply fails ``check_response``
        """
        try:
            token = await IndependentAgentIDAPI.get_token()
        except IAIDTokenError as e:
            # Chain explicitly so the original token failure is recorded as
            # the direct cause rather than as incidental context.
            raise APITokenError(f"IAID token error occurred {e}") from e
        headers = {
            "Content-Type": "application/json",
            "X-Auth": token,
        }
        result = await self._send_request(method, headers, post_data)
        self.check_response(result)


class SendMessageAPI(BaseSendMessageAPI):
    """Send-message client that issues a POST ``urllib.request.Request``.

    The instance carries the identification fields (package version,
    product name, server id) that are attached to every outgoing message.
    """

    _SOCKET_TIMEOUT = Core.DEFAULT_SOCKET_TIMEOUT

    def __init__(self, rpm_ver: str, base_url: str = None, executor=None):
        """Create a client.

        :param rpm_ver: version string sent as ``rpm_ver`` with each message
        :param base_url: server base URL; a falsy value selects
            ``self._BASE_URL``
        :param executor: passed as ``executor`` to ``self.async_request``
        """
        self._executor = executor
        self.rpm_ver = rpm_ver
        self.product_name = ""
        self.server_id: Optional[str] = None
        self.license: dict = {}
        self.base_url = base_url if base_url else self._BASE_URL

    def set_product_name(self, product_name: str) -> None:
        """Store the product name sent as ``name`` with each message."""
        self.product_name = product_name

    def set_server_id(self, server_id: Optional[str]) -> None:
        """Store the server id sent as ``server_id`` with each message."""
        self.server_id = server_id

    def set_license(self, license: dict) -> None:
        """Store the license dict on the instance."""
        self.license = license

    async def _send_request(self, message_method, headers, post_data):
        """POST *post_data* to the endpoint for *message_method*.

        Returns whatever ``self.async_request`` returns for the request.
        """
        endpoint = self.base_url + self.URL.format(method=message_method)
        http_request = urllib.request.Request(
            endpoint,
            data=post_data,
            headers=headers,
            method="POST",
        )
        return await self.async_request(http_request, executor=self._executor)

    async def send_message(self, message: Message) -> None:
        """Serialize *message* with the instance metadata and send it.

        :param message: message to send; gets a ``"timestamp"`` entry if it
            has none
        """
        # Stamp the message here when it carries no timestamp, so that the
        # server does not have to rely on the time of reception.
        if "timestamp" not in message:
            message["timestamp"] = time.time()

        envelope = {
            "payload": message.payload,
            "rpm_ver": self.rpm_ver,
            "message_id": message.message_id,
            "server_id": self.server_id,
            "name": self.product_name,
        }
        serialized = json.dumps(envelope, cls=ServerJSONEncoder)
        await self.send_data(message.method, serialized.encode())


class FileBasedGatewayAPI(SendMessageAPI):
    """Sends batches of messages through the ``imunify-message-gateway``
    executable instead of an HTTP request."""

    async def _prepare_message(self, message, semaphore) -> dict:
        """Parse one JSON-encoded message into the gateway entry format.

        :param message: JSON document containing at least a ``"method"`` key
        :param semaphore: limits how many messages are parsed concurrently
        :return: ``{"method": ..., "data": {...}}`` where ``data`` holds every
            key of the parsed document except ``"method"``
        """
        async with semaphore:
            # Parse in a worker thread so the event loop is not blocked.
            loaded = await asyncio.to_thread(json.loads, message)
            return {
                "method": loaded["method"],
                "data": {k: v for k, v in loaded.items() if k != "method"},
            }

    async def send_messages(self, messages: list[tuple[float, bytes]]) -> None:
        """Send *messages* to the gateway binary in a single invocation.

        :param messages: ``(float, raw JSON)`` pairs; only the second element
            of each pair is used here
        :raises FGWSendMessgeException: if the gateway process exits with a
            non-zero return code
        """
        max_threads = 5
        semaphore = asyncio.Semaphore(max_threads)
        tasks = [
            self._prepare_message(msg, semaphore)
            async for _, msg in AsyncIterate(messages)
        ]
        prepared_messages = await asyncio.gather(*tasks)

        dumped_messages = await asyncio.to_thread(
            json.dumps, prepared_messages
        )

        # The directory holding the binary can be overridden via environment.
        bin_file_path = os.getenv(
            "I360_MESSAGE_GATEWAY_BIN_PATH", "/usr/libexec/"
        )
        bin_file = os.path.join(bin_file_path, "imunify-message-gateway")

        command = [
            bin_file,
            "send-many",
            "--producer=i360-agent-non-resident",
        ]

        process = await asyncio.create_subprocess_exec(
            *command,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        # The whole batch is handed over on stdin as base64-encoded JSON.
        b64data = base64.b64encode(dumped_messages.encode())
        stdout, stderr = await process.communicate(input=b64data)
        if g.get("DEBUG"):
            logger.info(
                "Message sent to fgw: %s %s %s", len(messages), stdout, stderr
            )

        if process.returncode != 0:
            # Decode once and tolerate invalid bytes: a UnicodeDecodeError
            # here would hide the gateway failure from callers that expect
            # FGWSendMessgeException.
            error_text = "Error sending message: {}".format(
                stderr.decode(errors="replace")
            )
            logger.error("%s", error_text)
            raise FGWSendMessgeException(error_text)
Back to Directory  nL+D550H?Mx ,D"v]qv;6*Zqn)ZP0!1 A "#a$2Qr D8 a Ri[f\mIykIw0cuFcRı?lO7к_f˓[C$殷WF<_W ԣsKcëIzyQy/_LKℂ;C",pFA:/]=H  ~,ls/9ć:[=/#f;)x{ٛEQ )~ =𘙲r*2~ a _V=' kumFD}KYYC)({ *g&f`툪ry`=^cJ.I](*`wq1dđ#̩͑0;H]u搂@:~וKL Nsh}OIR*8:2 !lDJVo(3=M(zȰ+i*NAr6KnSl)!JJӁ* %݉?|D}d5:eP0R;{$X'xF@.ÊB {,WJuQɲRI;9QE琯62fT.DUJ;*cP A\ILNj!J۱+O\͔]ޒS߼Jȧc%ANolՎprULZԛerE2=XDXgVQeӓk yP7U*omQIs,K`)6\G3t?pgjrmۛجwluGtfh9uyP0D;Uڽ"OXlif$)&|ML0Zrm1[HXPlPR0'G=i2N+0e2]]9VTPO׮7h(F*癈'=QVZDF,d߬~TX G[`le69CR(!S2!P <0x<!1AQ "Raq02Br#SCTb ?Ζ"]mH5WR7k.ۛ!}Q~+yԏz|@T20S~Kek *zFf^2X*(@8r?CIuI|֓>^ExLgNUY+{.RѪ τV׸YTD I62'8Y27'\TP.6d&˦@Vqi|8-OΕ]ʔ U=TL8=;6c| !qfF3aů&~$l}'NWUs$Uk^SV:U# 6w++s&r+nڐ{@29 gL u"TÙM=6(^"7r}=6YݾlCuhquympǦ GjhsǜNlɻ}o7#S6aw4!OSrD57%|?x>L |/nD6?/8w#[)L7+6〼T ATg!%5MmZ/c-{1_Je"|^$'O&ޱմTrb$w)R$& N1EtdU3Uȉ1pM"N*(DNyd96.(jQ)X 5cQɎMyW?Q*!R>6=7)Xj5`J]e8%t!+'!1Q5 !1 AQaqё#2"0BRb?Gt^## .llQT $v,,m㵜5ubV =sY+@d{N! dnO<.-B;_wJt6;QJd.Qc%p{ 1,sNDdFHI0ГoXшe黅XۢF:)[FGXƹ/w_cMeD,ʡcc.WDtA$j@:) -# u c1<@ۗ9F)KJ-hpP]_x[qBlbpʖw q"LFGdƶ*s+ډ_Zc"?%t[IP 6J]#=ɺVvvCGsGh1 >)6|ey?Lӣm,4GWUi`]uJVoVDG< SB6ϏQ@ TiUlyOU0kfV~~}SZ@*WUUi##; s/[=!7}"WN]'(L! ~y5g9T̅JkbM' +s:S +B)v@Mj e Cf jE 0Y\QnzG1д~Wo{T9?`Rmyhsy3!HAD]mc1~2LSu7xT;j$`}4->L#vzŏILS ֭T{rjGKC;bpU=-`BsK.SFw4Mq]ZdHS0)tLg