MMCT TEAM
Server IP : 128.199.20.84  /  Your IP : 172.69.7.229
Web Server : Apache/2.4.41 (Ubuntu)
System : Linux competent-maruti 5.4.0-128-generic #144-Ubuntu SMP Tue Sep 20 11:00:04 UTC 2022 x86_64
User : www-data ( 33)
PHP Version : 8.0.20
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : OFF
Directory (0755) :  /usr/lib/python2.7/../python3/dist-packages/uaclient/

[  Home  ][  C0mmand  ][  Upload File  ]

Current File : //usr/lib/python2.7/../python3/dist-packages/uaclient/actions.py
import logging
import sys
from typing import Optional  # noqa: F401

from uaclient import (
    clouds,
    config,
    contract,
    entitlements,
    event_logger,
    exceptions,
    messages,
)
from uaclient import status as ua_status
from uaclient.clouds import identity

LOG = logging.getLogger("ua.actions")
event = event_logger.get_event_logger()


def attach_with_token(
    cfg: config.UAConfig, token: str, allow_enable: bool
) -> None:
    """
    Common functionality to take a token and attach via contract backend
    :raise UrlError: On unexpected connectivity issues to contract
        server or inability to access identity doc from metadata service.
    :raise ContractAPIError: On unexpected errors when talking to the contract
        server.
    """
    from uaclient.jobs.update_messaging import update_apt_and_motd_messages

    try:
        contract.request_updated_contract(
            cfg, token, allow_enable=allow_enable
        )
    except exceptions.UrlError as exc:
        # Persist updated status in the event of partial attach
        ua_status.status(cfg=cfg)
        update_apt_and_motd_messages(cfg)
        raise exc
    except exceptions.UserFacingError as exc:
        event.info(exc.msg, file_type=sys.stderr)
        # Persist updated status in the event of partial attach
        ua_status.status(cfg=cfg)
        update_apt_and_motd_messages(cfg)
        raise exc

    current_iid = identity.get_instance_id()
    if current_iid:
        cfg.write_cache("instance-id", current_iid)

    update_apt_and_motd_messages(cfg)


def auto_attach(
    cfg: config.UAConfig, cloud: clouds.AutoAttachCloudInstance
) -> None:
    """
    :raise UrlError: On unexpected connectivity issues to contract
        server or inability to access identity doc from metadata service.
    :raise ContractAPIError: On unexpected errors when talking to the contract
        server.
    :raise NonAutoAttachImageError: If this cloud type does not have
        auto-attach support.
    """
    contract_client = contract.UAContractClient(cfg)
    try:
        tokenResponse = contract_client.request_auto_attach_contract_token(
            instance=cloud
        )
    except exceptions.ContractAPIError as e:
        if e.code and 400 <= e.code < 500:
            raise exceptions.NonAutoAttachImageError(
                messages.UNSUPPORTED_AUTO_ATTACH
            )
        raise e

    token = tokenResponse["contractToken"]

    attach_with_token(cfg, token=token, allow_enable=True)


def enable_entitlement_by_name(
    cfg: config.UAConfig,
    name: str,
    *,
    assume_yes: bool = False,
    allow_beta: bool = False
):
    """
    Constructs an entitlement based on the name provided. Passes kwargs onto
    the entitlement constructor.
    :raise EntitlementNotFoundError: If no entitlement with the given name is
        found, then raises this error.
    """
    ent_cls = entitlements.entitlement_factory(cfg=cfg, name=name)
    entitlement = ent_cls(
        cfg, assume_yes=assume_yes, allow_beta=allow_beta, called_name=name
    )
    return entitlement.enable()


def status(
    cfg: config.UAConfig,
    *,
    simulate_with_token: Optional[str] = None,
    show_beta: bool = False
):
    """
    Construct the current UA status dictionary.
    """
    if simulate_with_token:
        status, ret = ua_status.simulate_status(
            cfg=cfg, token=simulate_with_token, show_beta=show_beta
        )
    else:
        status = ua_status.status(cfg=cfg, show_beta=show_beta)
        ret = 0

    return status, ret

MMCT - 2023