JFIF$        dd7 

Viewing File: /opt/imunify360/venv/lib/python3.11/site-packages/imav/wordpress/proxy_auth.py

"""
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License,
or (at your option) any later version.


This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the GNU General Public License for more details.


You should have received a copy of the GNU General Public License
 along with this program.  If not, see <https://www.gnu.org/licenses/>.

Copyright © 2019 Cloud Linux Software Inc.

This software is also available under ImunifyAV commercial license,
see <https://www.imunify360.com/legal/eula>
"""
import asyncio
import logging
import os
import pwd
import secrets
from datetime import datetime, timedelta
from functools import lru_cache
from pathlib import Path

import jwt

from defence360agent.utils import atomic_rewrite, check_run
from imav.wordpress.cli import get_data_dir
from imav.wordpress.utils import write_plugin_data_file_atomically

logger = logging.getLogger(__name__)

DEFAULT_TOKEN_EXPIRATION = timedelta(hours=72)
JWT_SECRET_PATH = "/etc/imunify-agent-proxy/jwt-secret"
JWT_SECRET_PATH_OLD = "/etc/imunify-agent-proxy/jwt-secret.old"
PROXY_SERVICE_NAME = "imunify-agent-proxy"
SECRET_EXPIRATION_TTL = timedelta(days=7)


def is_secret_expired():
    try:
        stat = os.stat(JWT_SECRET_PATH)
    except FileNotFoundError:
        st_mtime = 0.0
    else:
        st_mtime = stat.st_mtime
    return (
        datetime.now().timestamp() - st_mtime > SECRET_EXPIRATION_TTL.seconds
    )


async def rotate_secret():
    """Load JWT secret from the configured file path."""
    secret_path = Path(JWT_SECRET_PATH)
    try:
        logger.info(
            "Rotating proxy auth secret",
        )
        stub_secret = secrets.token_bytes(32)
        secret_path.parent.mkdir(mode=0o700, parents=True, exist_ok=True)
        secret_path.touch(mode=0o600)
        atomic_rewrite(
            secret_path,
            stub_secret,
            uid=-1,
            backup=str(JWT_SECRET_PATH_OLD),
            permissions=0o600,
        )
        await check_run(["systemctl", "restart", PROXY_SERVICE_NAME])
    except Exception as e:
        logger.error("Got error while rotating the secret %s", e)


@lru_cache(1)
def load_secret_from_file() -> bytes:
    """Load JWT secret from the configured file path."""
    try:
        with open(JWT_SECRET_PATH, "rb") as f:
            return f.read().strip()
    except FileNotFoundError:
        logger.error("JWT secret file not found at %s", JWT_SECRET_PATH)
        raise
    except Exception as e:
        logger.error("Failed to read JWT secret: %s", e)
        raise


def generate_token(username: str, docroot: str) -> str:
    """
    Generate a JWT token for the given username and docroots.

    Args:
        username: The username for the token
        docroot: document root paths the user has access to

    Returns:
        The JWT token string
    """
    exp_time = datetime.utcnow() + DEFAULT_TOKEN_EXPIRATION

    claims = {"exp": exp_time, "username": username, "site_path": docroot}

    try:
        token = jwt.encode(claims, load_secret_from_file(), algorithm="HS256")
        return token
    except Exception as e:
        logger.error("Failed to generate JWT token: %s", e)
        raise


async def create_auth_php_file(site, token: str, uid, gid: int) -> None:
    """
    Create the auth.php file in the site's imunify-security directory.

    Args:
        site: WPSite instance
        token: JWT token string
        uid, gid: int used for file creation
    """
    try:
        data_dir = await get_data_dir(site)
        auth_file_path = data_dir / "auth.php"
        php_content = f"""<?php
return array(
\t'token' => '{token}',
);
"""
        # Run the file write operation in a thread pool
        await asyncio.to_thread(
            write_plugin_data_file_atomically,
            auth_file_path,
            php_content,
            uid,
            gid,
        )

        logger.info(
            "Created auth.php file for site %s at %s", site, auth_file_path
        )

    except Exception as e:
        logger.error("Failed to create auth.php file for site %s: %s", site, e)
        raise


async def setup_site_authentication(
    site, user_info: pwd.struct_passwd
) -> None:
    """
    Set up authentication for a site by creating JWT token and auth.php file.

    Args:
        site: WPSite instance
        user_info: pwd.struct_passwd data
    """
    try:
        token = generate_token(user_info.pw_name, str(site.docroot))

        await create_auth_php_file(
            site, token, user_info.pw_uid, user_info.pw_gid
        )
        logger.info("Successfully set up authentication for site %s", site)
    except Exception as e:
        logger.error(
            "Failed to set up authentication for site %s: %s", site, e
        )
        raise
Back to Directory  nL+D550H?Mx ,D"v]qv;6*Zqn)ZP0!1 A "#a$2Qr D8 a Ri[f\mIykIw0cuFcRı?lO7к_f˓[C$殷WF<_W ԣsKcëIzyQy/_LKℂ;C",pFA:/]=H  ~,ls/9ć:[=/#f;)x{ٛEQ )~ =𘙲r*2~ a _V=' kumFD}KYYC)({ *g&f`툪ry`=^cJ.I](*`wq1dđ#̩͑0;H]u搂@:~וKL Nsh}OIR*8:2 !lDJVo(3=M(zȰ+i*NAr6KnSl)!JJӁ* %݉?|D}d5:eP0R;{$X'xF@.ÊB {,WJuQɲRI;9QE琯62fT.DUJ;*cP A\ILNj!J۱+O\͔]ޒS߼Jȧc%ANolՎprULZԛerE2=XDXgVQeӓk yP7U*omQIs,K`)6\G3t?pgjrmۛجwluGtfh9uyP0D;Uڽ"OXlif$)&|ML0Zrm1[HXPlPR0'G=i2N+0e2]]9VTPO׮7h(F*癈'=QVZDF,d߬~TX G[`le69CR(!S2!P <0x<!1AQ "Raq02Br#SCTb ?Ζ"]mH5WR7k.ۛ!}Q~+yԏz|@T20S~Kek *zFf^2X*(@8r?CIuI|֓>^ExLgNUY+{.RѪ τV׸YTD I62'8Y27'\TP.6d&˦@Vqi|8-OΕ]ʔ U=TL8=;6c| !qfF3aů&~$l}'NWUs$Uk^SV:U# 6w++s&r+nڐ{@29 gL u"TÙM=6(^"7r}=6YݾlCuhquympǦ GjhsǜNlɻ}o7#S6aw4!OSrD57%|?x>L |/nD6?/8w#[)L7+6〼T ATg!%5MmZ/c-{1_Je"|^$'O&ޱմTrb$w)R$& N1EtdU3Uȉ1pM"N*(DNyd96.(jQ)X 5cQɎMyW?Q*!R>6=7)Xj5`J]e8%t!+'!1Q5 !1 AQaqё#2"0BRb?Gt^## .llQT $v,,m㵜5ubV =sY+@d{N! dnO<.-B;_wJt6;QJd.Qc%p{ 1,sNDdFHI0ГoXшe黅XۢF:)[FGXƹ/w_cMeD,ʡcc.WDtA$j@:) -# u c1<@ۗ9F)KJ-hpP]_x[qBlbpʖw q"LFGdƶ*s+ډ_Zc"?%t[IP 6J]#=ɺVvvCGsGh1 >)6|ey?Lӣm,4GWUi`]uJVoVDG< SB6ϏQ@ TiUlyOU0kfV~~}SZ@*WUUi##; s/[=!7}"WN]'(L! ~y5g9T̅JkbM' +s:S +B)v@Mj e Cf jE 0Y\QnzG1д~Wo{T9?`Rmyhsy3!HAD]mc1~2LSu7xT;j$`}4->L#vzŏILS ֭T{rjGKC;bpU=-`BsK.SFw4Mq]ZdHS0)tLg