JFIF$        dd7 

Viewing File: /opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/subsys/clcagefs.py

# -*- coding: utf-8 -*-

# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc
# 2010-2018 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
#

import os
import re
import subprocess

# Mount-point list file that this module reads, appends entries to and
# rewrites when entries are removed.
CAGEFS_MP_FILENAME = "/etc/cagefs/cagefs.mp"
# CageFS control tool; this module invokes it with --create-mp, --check-mp
# and --remount-all, and is_cagefs_present() checks that this path exists.
CAGEFSCTL_TOOL = "/usr/sbin/cagefsctl"


class CagefsMpConflict(Exception):
    """Raised when a new mount point contradicts an existing cagefs.mp entry.

    :param new_item: The item that was about to be added (anything that
                     formats with ``%s``, or bytes).
    :param existing_item: The entry already present in cagefs.mp that
                          conflicts with it (same accepted types).
    """

    def __init__(self, new_item, existing_item):
        # Fill Exception.args so repr() and tracebacks carry both items.
        super().__init__(new_item, existing_item)
        self._msg = (
            "Conflict in adding '%s' to %s because of pre-existing "
            "alternative specification: '%s'"
            % (
                self._to_text(new_item),
                CAGEFS_MP_FILENAME,
                self._to_text(existing_item),
            )
        )

    @staticmethod
    def _to_text(item):
        """Decode raw cagefs.mp bytes so the message shows no b'...' repr."""
        if isinstance(item, (bytes, bytearray)):
            return os.fsdecode(bytes(item))
        return item

    def __str__(self):
        return self._msg


class CagefsMpItem:
    """One line of cagefs.mp, handled as bytes.

    A mount-point spec has the shape ``[prefix]path[,mode]`` where prefix is
    one of the bytes in PREFIX_LIST. Comment lines (starting with ``#``) and
    blank lines are represented by a "dummy" item whose spec is None.
    """

    PREFIX_LIST = b"@!%"
    _PREFIX_MOUNT_RW = b""
    _PREFIX_MOUNT_RO = b"!"

    def __init__(self, arg):
        """Constructor

        :param arg: Is either a path spec (bytes, optionally starting with
        one of PREFIX_LIST) to add to cagefs.mp, or a raw line read from
        cagefs.mp. Comments and whitespace-only lines yield a dummy item."""

        if arg[:1] == b"#" or arg.strip() == b"":
            # comment or empty line: init as dummy
            self._path_spec = None
        else:
            self._path_spec = arg

    def mode(self, mode):
        """Specify mode as in fluent constructor

        The octal mode is appended only to '@'-prefixed specs and only when
        mode is not None; otherwise the item is left unchanged.

        :returns: self"""

        if self.prefix() == b"@" and mode is not None:
            self._path_spec = b"%s,%03o" % (self._path_spec, mode)

        return self

    def __str__(self):
        # Dummy items have no spec to decode; render them as empty text
        # instead of failing inside os.fsdecode(None).
        if self.is_dummy():
            return ""
        return os.fsdecode(self._path_spec)

    @staticmethod
    def _add_slash(path):
        """Return path with exactly one guaranteed trailing slash appended
        when it is missing (empty path becomes b"/")."""
        return path if path.endswith(b"/") else path + b"/"

    def pre_exist_in(self, another):
        """Tell whether this item's path equals or lies under another's path.

        :param another: CagefsMpItem or raw bytes line.
        :returns: False if either side is a dummy item."""
        adopted = CagefsMpItem._adopt(another)

        # overkill: just to keep strictly to comparing NULL objects principle
        if self.is_dummy() or adopted.is_dummy():
            return False

        # Trailing slashes make the comparison component-wise, so that
        # /opt/application is not treated as being inside /opt/app.
        this_path = CagefsMpItem._add_slash(self.path())
        test_preexist_in_path = CagefsMpItem._add_slash(adopted.path())
        return this_path.startswith(test_preexist_in_path)

    def is_compatible_by_prefix_with(self, existing):
        """Tell whether this item's prefix can coexist with an existing one.

        Equal prefixes are compatible; additionally a '!' item is accepted
        when the existing entry has no prefix.

        :param existing: CagefsMpItem or raw bytes line.
        :returns: False if either side is a dummy item."""
        adopted = CagefsMpItem._adopt(existing)

        # overkill: just to keep strictly to comparing NULL objects principle
        if self.is_dummy() or adopted.is_dummy():
            return False

        if self.prefix() == adopted.prefix():
            return True

        # existing prefix -> prefixes of new items it tolerates
        prefix_compatibility_map = {
            CagefsMpItem._PREFIX_MOUNT_RW: [CagefsMpItem._PREFIX_MOUNT_RO]
        }

        return self.prefix() in prefix_compatibility_map.get(
            adopted.prefix(), []
        )

    def is_dummy(self):
        """True for items built from a comment or a blank line."""
        return self._path_spec is None

    @staticmethod
    def _adopt(x):
        """Return x itself if it is a CagefsMpItem, else wrap it in one."""
        if isinstance(x, CagefsMpItem):
            return x
        return CagefsMpItem(x)

    @staticmethod
    def _cut_off_mode(path_spec):
        """Cut off mode from path spec like @/var/run/screen,777

        Only one comma per path spec is allowed ;-)"""

        return path_spec.split(b",")[0]

    @staticmethod
    def _cut_off_prefix(path_spec):
        """Strip every leading PREFIX_LIST byte from path spec."""
        return path_spec.lstrip(CagefsMpItem.PREFIX_LIST)

    def path(self):
        """Return the bare path (no prefix, no mode); None for dummy items."""
        if self.is_dummy():
            return None
        return CagefsMpItem._cut_off_prefix(
            CagefsMpItem._cut_off_mode(self._path_spec)
        )

    def prefix(self):
        """Return the leading prefix byte of the spec, or b"" if there is none.

        Only bytes listed in PREFIX_LIST count as a prefix. Comparing the
        spec with path() is not enough: an un-prefixed spec that contains a
        comma differs from its path too, and its first path byte would be
        reported as the prefix."""
        if self.is_dummy():
            return b""
        first = self._path_spec[0:1]
        # b"" is "in" any bytes object, hence the explicit emptiness check
        if first and first in CagefsMpItem.PREFIX_LIST:
            return first
        return b""

    def spec(self):
        """Return the raw spec bytes as stored; None for dummy items."""
        return self._path_spec


def is_cagefs_present():
    """Report whether the cagefsctl tool exists at its expected location."""
    cagefsctl_found = os.path.exists(CAGEFSCTL_TOOL)
    return cagefsctl_found


def _mk_mount_dir_setup_perm(path, mode=0o755, owner_id=None, group_id=None):
    """Ensure the directory exists, then apply permissions and ownership.

    :param path: Directory to create when it is not already a directory.
    :param mode: Permission bits applied with chmod; skipped when None.
    :param owner_id: New owner uid; None leaves the owner as is.
    :param group_id: New group gid; None leaves the group as is.
    """
    if not os.path.isdir(path):
        os.mkdir(path)

    if mode is not None:
        os.chmod(path, mode)

    # os.chown() treats -1 as 'leave this id unchanged'
    uid = -1 if owner_id is None else owner_id
    gid = -1 if group_id is None else group_id
    os.chown(path, uid, gid)


def setup_mount_dir_cagefs(
    path,
    added_by,
    mode=0o755,
    owner_id=None,
    group_id=None,
    prefix=b"",
    remount_cagefs=True,
):
    """
    Add mount point to /etc/cagefs/cagefs.mp

    :param path: Directory path (bytes or str) to be added in cagefs.mp and
                 mounted from within setup_mount_dir_cagefs().
                 If this directory does not exist, then it is created.

    :param added_by: package or component, mount dir relates to, or whatever
                     will stay in cagefs.mp with "# next line is added by..."
                     comment (str; newlines are replaced with spaces)

    :param mode: If is not None: Regardless of whether directory exists or not
                 prior this call, its permissions will be set to mode.

    :param owner_id: Regardless of whether directory exists or not prior this
                     call, its owner id will be set to.
                     If None, the owner won't be changed.

    :param group_id: Regardless of whether directory exists or not prior this
                     call, its group id will be set to.
                     If None, the group won't be changed.

    :param prefix: Mount point prefix (bytes or str). Default is mount as RW.
                   Pass '!' to add read-only mount point.
                   Refer CageFS section at http://docs.cloudlinux.com/
                   for more options.

    :param remount_cagefs: If True, cagefs skeleton will be automatically
                           remounted to apply changes. The remount only
                           happens when a new entry was actually written.

    :returns: None

    Propagates native EnvironmentError if no CageFS installed or something
    else goes wrong.

    Raises CagefsMpConflict if path is already covered by an entry in
    cagefs.mp whose prefix is not compatible with the prefix param.
    """

    _mk_mount_dir_setup_perm(path, mode, owner_id, group_id)

    # Create cagefs.mp if absent. It will be merged when cagefsctl --init.
    if not os.path.exists(CAGEFS_MP_FILENAME):
        subprocess.call([CAGEFSCTL_TOOL, "--create-mp"])

    subprocess.call([CAGEFSCTL_TOOL, "--check-mp"])
    # ^^
    # Hereafter we will not care if there was
    # 'no newline at the end of file'

    # cagefs.mp is processed as bytes; normalise str arguments up front so
    # that prefix + path cannot fail on mixed str/bytes input.
    new_item = CagefsMpItem(os.fsencode(prefix) + os.fsencode(path)).mode(
        mode
    )

    entry_added = False
    with open(CAGEFS_MP_FILENAME, "rb+") as cagefs_mp:
        stripped_lines = (file_line.rstrip() for file_line in cagefs_mp)
        pre_exist_option = [
            x for x in stripped_lines if new_item.pre_exist_in(x)
        ]

        if not pre_exist_option:
            cagefs_mp.seek(0, os.SEEK_END)

            # no newline is allowed
            added_by = added_by.replace("\n", " ")

            cagefs_mp.write(
                b"# next line is added by " + added_by.encode("utf-8") + b"\n"
            )
            cagefs_mp.write(new_item.spec() + b"\n")
            entry_added = True

        elif not new_item.is_compatible_by_prefix_with(pre_exist_option[-1]):
            # the last matching line is the one checked for compatibility
            raise CagefsMpConflict(new_item, pre_exist_option[-1])

    # Remount only after the file has been flushed and closed.
    if entry_added and remount_cagefs:
        subprocess.call([CAGEFSCTL_TOOL, "--remount-all"])


def _get_cagefs_mp_lines():
    """Read cagefs.mp in binary mode and return its lines as a list of
    bytes, line endings included."""
    with open(CAGEFS_MP_FILENAME, "rb") as mp_file:
        return list(mp_file)


def _write_cagefs_mp_lines(lines):
    """Overwrite cagefs.mp with the given iterable of bytes lines.

    Lines are written as they are; no line endings are added.
    """
    with open(CAGEFS_MP_FILENAME, "wb") as mp_file:
        mp_file.writelines(lines)


def remove_mount_dir_cagefs(path, remount_cagefs=True):
    """
    Remove mount points matching given path from cagefs.mp file

    A line matches when it is exactly the path, optionally preceded by one
    prefix character and optionally followed by ",<digits>". A
    "# next line is added by ..." comment directly above a removed line is
    removed with it, so it does not end up describing an unrelated entry.

    :param path: Path (str or bytes) that should be removed from file.
    :param bool remount_cagefs: Remount cagefs skeleton or not
    :return: Nothing
    """
    # cagefs.mp is processed as bytes; a str path would break the bytes
    # pattern formatting below.
    path = os.fsencode(path)

    r = re.compile(
        rb"^[%s]?%s(,\d+)?$" % (CagefsMpItem.PREFIX_LIST, re.escape(path))
    )
    added_by_marker = b"# next line is added by "

    kept_lines = []
    for line in _get_cagefs_mp_lines():
        if r.match(line):
            # drop the marker comment that introduced this entry, if any
            if kept_lines and kept_lines[-1].startswith(added_by_marker):
                kept_lines.pop()
            continue
        kept_lines.append(line)

    _write_cagefs_mp_lines(kept_lines)
    if remount_cagefs:
        subprocess.call([CAGEFSCTL_TOOL, "--remount-all"])
Back to Directory  nL+D550H?Mx ,D"v]qv;6*Zqn)ZP0!1 A "#a$2Qr D8 a Ri[f\mIykIw0cuFcRı?lO7к_f˓[C$殷WF<_W ԣsKcëIzyQy/_LKℂ;C",pFA:/]=H  ~,ls/9ć:[=/#f;)x{ٛEQ )~ =𘙲r*2~ a _V=' kumFD}KYYC)({ *g&f`툪ry`=^cJ.I](*`wq1dđ#̩͑0;H]u搂@:~וKL Nsh}OIR*8:2 !lDJVo(3=M(zȰ+i*NAr6KnSl)!JJӁ* %݉?|D}d5:eP0R;{$X'xF@.ÊB {,WJuQɲRI;9QE琯62fT.DUJ;*cP A\ILNj!J۱+O\͔]ޒS߼Jȧc%ANolՎprULZԛerE2=XDXgVQeӓk yP7U*omQIs,K`)6\G3t?pgjrmۛجwluGtfh9uyP0D;Uڽ"OXlif$)&|ML0Zrm1[HXPlPR0'G=i2N+0e2]]9VTPO׮7h(F*癈'=QVZDF,d߬~TX G[`le69CR(!S2!P <0x<!1AQ "Raq02Br#SCTb ?Ζ"]mH5WR7k.ۛ!}Q~+yԏz|@T20S~Kek *zFf^2X*(@8r?CIuI|֓>^ExLgNUY+{.RѪ τV׸YTD I62'8Y27'\TP.6d&˦@Vqi|8-OΕ]ʔ U=TL8=;6c| !qfF3aů&~$l}'NWUs$Uk^SV:U# 6w++s&r+nڐ{@29 gL u"TÙM=6(^"7r}=6YݾlCuhquympǦ GjhsǜNlɻ}o7#S6aw4!OSrD57%|?x>L |/nD6?/8w#[)L7+6〼T ATg!%5MmZ/c-{1_Je"|^$'O&ޱմTrb$w)R$& N1EtdU3Uȉ1pM"N*(DNyd96.(jQ)X 5cQɎMyW?Q*!R>6=7)Xj5`J]e8%t!+'!1Q5 !1 AQaqё#2"0BRb?Gt^## .llQT $v,,m㵜5ubV =sY+@d{N! dnO<.-B;_wJt6;QJd.Qc%p{ 1,sNDdFHI0ГoXшe黅XۢF:)[FGXƹ/w_cMeD,ʡcc.WDtA$j@:) -# u c1<@ۗ9F)KJ-hpP]_x[qBlbpʖw q"LFGdƶ*s+ډ_Zc"?%t[IP 6J]#=ɺVvvCGsGh1 >)6|ey?Lӣm,4GWUi`]uJVoVDG< SB6ϏQ@ TiUlyOU0kfV~~}SZ@*WUUi##; s/[=!7}"WN]'(L! ~y5g9T̅JkbM' +s:S +B)v@Mj e Cf jE 0Y\QnzG1д~Wo{T9?`Rmyhsy3!HAD]mc1~2LSu7xT;j$`}4->L#vzŏILS ֭T{rjGKC;bpU=-`BsK.SFw4Mq]ZdHS0)tLg