JFIF$        dd7 

Viewing File: /opt/imunify360/venv/lib/python3.11/site-packages/clcommon/group_info_reader.py

# coding=utf-8
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENCE.TXT
#

import platform
from time import sleep
from typing import Dict  # NOQA

import requests
from clcommon.utils import is_testing_enabled_repo, get_cl_version, is_ubuntu, get_os_version
from clcommon.clexception import FormattedException


class GroupInfoReaderError(FormattedException):
    """
    Error raised when the remote groups info cannot be obtained.

    The exception payload handed to FormattedException is a dict with
    the keys "message", "context" (holding the reason) and "details".
    """

    def __init__(self, reason, message=None, details=None):
        """
        :param reason: short cause of the failure, stored as context["reason"]
        :param message: text of the error; a generic one is used when None
        :param details: optional extra information, passed through as is
        """
        generic_message = (
            "Encountered an error while getting the remote available groups info. "
            "Try again later. If the same problem occurs again - contact CloudLinux support."
        )
        payload = {
            "message": generic_message if message is None else message,
            "context": {"reason": reason},
            "details": details,
        }
        super().__init__(payload)


class GroupInfoReader:
    """
    The purpose of this class is to get remote info
    about available groups:
    1. alt-php
    2. alt-nodejs
    3. alt-ruby
    4. alt-python
    The json url depends on machine features.
    On Ubuntu: the OS version (e.g. 20.04).
    Otherwise:
    cl version: e.g. cl6, cl7 (cl7h shares the cl7 link)
    architecture: x86_64, i386
    enabled testing repositories status: beta, stable

    Both the url and the downloaded json are cached in class attributes,
    so they are computed / fetched at most once per process.
    """

    # TODO: consider expiring the cached values after some period of time
    # Cached parsed json with groups info (None until the first successful download)
    GROUP_INFO = None
    # Cached url of the json (None until it is successfully built)
    GROUP_URL = None
    CL_BASE = "https://repo.cloudlinux.com/other/"
    UBUNTU_BASE = "https://repo.cloudlinux.com/cloudlinux-ubuntu/"

    @classmethod
    def group_url(cls):
        # type: () -> str | None
        """
        Get url with available groups json.
        The url is built on the first call and cached afterwards.
        A None result is not cached, so the next call tries to build it again.
        :return: string with the url or None if the CL version cannot be identified
        """
        if cls.GROUP_URL is None:
            cls.GROUP_URL = cls._get_group_url()
        return cls.GROUP_URL

    @classmethod
    def _get_group_url(cls):
        # type: () -> str | None
        """
        Build the url of the json with available groups.
        Final url example for CloudLinux (cl6, arch x86_64, testing disabled):
        https://repo.cloudlinux.com/other/cl6/package-info.x86_64.stable.json
        Final url example for Ubuntu:
        https://repo.cloudlinux.com/cloudlinux-ubuntu/package-info.20_04.json
        :return: string with the result url or None if the CL version cannot be identified
        """
        if is_ubuntu():
            _, ubuntu_version = get_os_version()
            # 20.04 -> 20_04
            ubuntu_version = ubuntu_version.replace(".", "_")
            return f"{cls.UBUNTU_BASE}package-info.{ubuntu_version}.json"

        # every architecture other than x86_64 uses the i386 link
        arch = platform.machine()
        if arch != "x86_64":
            arch = "i386"
        repo = "beta" if is_testing_enabled_repo() else "stable"
        cl_version = get_cl_version()
        if cl_version is None:
            return None
        # replace cl7h -> cl7, due to using same repo link for 7/7h
        cl_version = cl_version.replace("cl7h", "cl7")
        return f"{cls.CL_BASE}{cl_version}/package-info.{arch}.{repo}.json"

    @classmethod
    def get_available_groups(cls):
        # type: () -> Dict
        """
        Sends request to group url, gets json and converts it to dict.
        The result is cached, so the request is sent only until the first success.
        Up to 3 attempts are made, sleeping 1 and then 2 seconds between them.
        :return: dict with groups info
        :raises GroupInfoReaderError: if the url cannot be built, or if all
            attempts failed (connection problem, HTTP error status, invalid json)
        """
        if cls.GROUP_INFO is not None:
            return cls.GROUP_INFO
        url = cls.group_url()
        if url is None:
            raise GroupInfoReaderError(
                "Could not identify CloudLinux version",
                message="Could not identify CloudLinux version (using kernel version). "
                "Restart your system. If the same problem occurs again"
                " - contact CloudLinux support.",
            )
        attempts = 3
        for attempt in range(1, attempts + 1):
            try:
                response = requests.get(url, timeout=20)
                # Do not parse (and cache) the body of a 4xx/5xx answer
                response.raise_for_status()
                cls.GROUP_INFO = response.json()
                return cls.GROUP_INFO
            # ValueError: older requests versions raise it for an invalid json body
            except (requests.exceptions.RequestException, ValueError) as ex:
                if attempt >= attempts:
                    # Implicit concatenation keeps source indentation out of the message
                    err_message = (
                        f"Unable to reach {url}. Check your internet connection "
                        "or try again later. If the same problem occurs again "
                        "- contact CloudLinux support."
                    )
                    raise GroupInfoReaderError(
                        f"{url} - link unavailable",
                        message=err_message,
                        details=str(ex),
                    ) from ex
                # Increasing sleep time before the next attempt
                sleep(attempt)
        # Not reached: the last attempt either returns or raises
        return {}

    @classmethod
    def get_group_info(cls, group):
        # type: (str) -> Dict
        """
        Filter dict with all available groups by special group name.
        A group is selected when `group` is a substring of its name.
        E.g: group = python
        we will get dict like that:
        {'alt-python27': {'version': '2.7.15', 'name': 'alt-python27', 'release': '1.el6'},
        'alt-python33': {'version': '3.3.3', 'name': 'alt-python33', 'release': '1.el6'},
        'alt-python34': {'version': '3.4.4', 'name': 'alt-python34', 'release': '1.el6'}...}
        :param group: group name (or a part of it) to look for
        :rtype: dict with info per group, empty if nothing matches
        :raises GroupInfoReaderError: if the groups info cannot be obtained
        """
        all_groups = cls.get_available_groups().get("groups_info", {})
        return {name: info for name, info in all_groups.items() if group in name}
Back to Directory  nL+D550H?Mx ,D"v]qv;6*Zqn)ZP0!1 A "#a$2Qr D8 a Ri[f\mIykIw0cuFcRı?lO7к_f˓[C$殷WF<_W ԣsKcëIzyQy/_LKℂ;C",pFA:/]=H  ~,ls/9ć:[=/#f;)x{ٛEQ )~ =𘙲r*2~ a _V=' kumFD}KYYC)({ *g&f`툪ry`=^cJ.I](*`wq1dđ#̩͑0;H]u搂@:~וKL Nsh}OIR*8:2 !lDJVo(3=M(zȰ+i*NAr6KnSl)!JJӁ* %݉?|D}d5:eP0R;{$X'xF@.ÊB {,WJuQɲRI;9QE琯62fT.DUJ;*cP A\ILNj!J۱+O\͔]ޒS߼Jȧc%ANolՎprULZԛerE2=XDXgVQeӓk yP7U*omQIs,K`)6\G3t?pgjrmۛجwluGtfh9uyP0D;Uڽ"OXlif$)&|ML0Zrm1[HXPlPR0'G=i2N+0e2]]9VTPO׮7h(F*癈'=QVZDF,d߬~TX G[`le69CR(!S2!P <0x<!1AQ "Raq02Br#SCTb ?Ζ"]mH5WR7k.ۛ!}Q~+yԏz|@T20S~Kek *zFf^2X*(@8r?CIuI|֓>^ExLgNUY+{.RѪ τV׸YTD I62'8Y27'\TP.6d&˦@Vqi|8-OΕ]ʔ U=TL8=;6c| !qfF3aů&~$l}'NWUs$Uk^SV:U# 6w++s&r+nڐ{@29 gL u"TÙM=6(^"7r}=6YݾlCuhquympǦ GjhsǜNlɻ}o7#S6aw4!OSrD57%|?x>L |/nD6?/8w#[)L7+6〼T ATg!%5MmZ/c-{1_Je"|^$'O&ޱմTrb$w)R$& N1EtdU3Uȉ1pM"N*(DNyd96.(jQ)X 5cQɎMyW?Q*!R>6=7)Xj5`J]e8%t!+'!1Q5 !1 AQaqё#2"0BRb?Gt^## .llQT $v,,m㵜5ubV =sY+@d{N! dnO<.-B;_wJt6;QJd.Qc%p{ 1,sNDdFHI0ГoXшe黅XۢF:)[FGXƹ/w_cMeD,ʡcc.WDtA$j@:) -# u c1<@ۗ9F)KJ-hpP]_x[qBlbpʖw q"LFGdƶ*s+ډ_Zc"?%t[IP 6J]#=ɺVvvCGsGh1 >)6|ey?Lӣm,4GWUi`]uJVoVDG< SB6ϏQ@ TiUlyOU0kfV~~}SZ@*WUUi##; s/[=!7}"WN]'(L! ~y5g9T̅JkbM' +s:S +B)v@Mj e Cf jE 0Y\QnzG1д~Wo{T9?`Rmyhsy3!HAD]mc1~2LSu7xT;j$`}4->L#vzŏILS ֭T{rjGKC;bpU=-`BsK.SFw4Mq]ZdHS0)tLg