Source code for megengine.hub.hub

# -*- coding: utf-8 -*-
import functools
import hashlib
import os
import sys
import types
from typing import Any, List
from urllib.parse import urlparse

from megengine.utils.http_download import download_from_url

from ..distributed import is_distributed
from ..logger import get_logger
from ..serialization import load as _mge_load_serialized
from .const import (
    DEFAULT_CACHE_DIR,
    DEFAULT_GIT_HOST,
    DEFAULT_PROTOCOL,
    ENV_MGE_HOME,
    ENV_XDG_CACHE_HOME,
    HUBCONF,
    HUBDEPENDENCY,
)
from .exceptions import InvalidProtocol
from .fetcher import GitHTTPSFetcher, GitSSHFetcher
from .tools import cd, check_module_exists, load_module

# Module-level logger, named after this module.
logger = get_logger(__name__)


# Maps each supported protocol name to the fetcher class used to obtain a repo.
# ``_get_repo`` validates its ``protocol`` argument against these keys and then
# calls ``fetch`` on the selected class.
PROTOCOLS = {
    "HTTPS": GitHTTPSFetcher,
    "SSH": GitSSHFetcher,
}


def _get_megengine_home() -> str:
    r"""Resolves the MegEngine home directory.

    The environment variable named by ``ENV_MGE_HOME`` takes precedence. When
    it is unset, the result is ``<cache root>/megengine``, where the cache root
    is read from the variable named by ``ENV_XDG_CACHE_HOME`` and falls back to
    ``DEFAULT_CACHE_DIR``.

    Returns:
        the selected path, with a leading ``~`` expanded.
    """
    cache_root = os.getenv(ENV_XDG_CACHE_HOME, DEFAULT_CACHE_DIR)
    fallback_home = os.path.join(cache_root, "megengine")
    return os.path.expanduser(os.getenv(ENV_MGE_HOME, fallback_home))


def _get_repo(
    git_host: str,
    repo_info: str,
    use_cache: bool = False,
    commit: str = None,
    protocol: str = DEFAULT_PROTOCOL,
) -> str:
    r"""Fetches a repo into the hub cache directory and returns its location.

    Args:
        git_host: host address of git repo, forwarded to the fetcher.
        repo_info: repo description string, forwarded to the fetcher.
        use_cache: forwarded to the fetcher.
        commit: forwarded to the fetcher.
        protocol: key into ``PROTOCOLS`` selecting the fetcher class.

    Returns:
        the hub cache directory joined with the directory name returned by the
        fetcher.

    Raises:
        InvalidProtocol: if ``protocol`` is not a key of ``PROTOCOLS``.
    """
    if protocol not in PROTOCOLS:
        supported = ", ".join(PROTOCOLS.keys())
        raise InvalidProtocol(
            "Invalid protocol, the value should be one of {}.".format(supported)
        )

    fetcher = PROTOCOLS[protocol]
    hub_cache = os.path.expanduser(os.path.join(_get_megengine_home(), "hub"))
    # The fetch runs inside the ``cd`` context on the hub cache directory.
    with cd(hub_cache):
        fetched_dir = fetcher.fetch(git_host, repo_info, use_cache, commit)
        return os.path.join(hub_cache, fetched_dir)


def _check_dependencies(module: types.ModuleType) -> None:
    r"""Verifies that every dependency declared by a hub module is available.

    The declaration is read from the module attribute named by
    ``HUBDEPENDENCY``; a missing or empty declaration means there is nothing
    to check.

    Args:
        module: the loaded hubconf module.

    Raises:
        RuntimeError: if ``check_module_exists`` reports any declared
            dependency as absent; the message lists all of them.
    """
    declared = getattr(module, HUBDEPENDENCY, None)
    if not declared:
        return

    absent = [dep for dep in declared if not check_module_exists(dep)]
    if absent:
        raise RuntimeError("Missing dependencies: {}".format(", ".join(absent)))


def _init_hub(
    repo_info: str,
    git_host: str,
    use_cache: bool = True,
    commit: str = None,
    protocol: str = DEFAULT_PROTOCOL,
):
    r"""Imports hubmodule like python import.

    The repo is fetched into ``<megengine home>/hub`` (created if needed) and
    its ``HUBCONF`` file is loaded as a module. While the module is loading,
    the repo directory is placed at the front of ``sys.path``; it is removed
    again afterwards, whether or not loading succeeds.

    Args:
        repo_info: a string with format ``"repo_owner/repo_name[:tag_name/:branch_name]"`` with an optional
            tag/branch. The default branch is ``master`` if not specified. Eg: ``"brain_sdk/MegBrain[:hub]"``
        git_host: host address of git repo. Eg: github.com
        use_cache: whether to use locally cached code or completely re-fetch.
        commit: commit id on github or gitlab.
        protocol: which protocol to use to get the repo, and HTTPS protocol only supports public repo on github.
            The value should be one of HTTPS, SSH.

    Returns:
        a python module.

    Raises:
        InvalidProtocol: propagated from ``_get_repo`` when ``protocol`` is
            not supported.
    """
    cache_dir = os.path.expanduser(os.path.join(_get_megengine_home(), "hub"))
    os.makedirs(cache_dir, exist_ok=True)
    absolute_repo_dir = _get_repo(
        git_host, repo_info, use_cache=use_cache, commit=commit, protocol=protocol
    )
    # Put the repo first on sys.path so imports made by hubconf resolve
    # against the fetched repo while it is being loaded.
    sys.path.insert(0, absolute_repo_dir)
    try:
        hubmodule = load_module(HUBCONF, os.path.join(absolute_repo_dir, HUBCONF))
    finally:
        # Always undo the temporary entry: a hubconf that fails to load must
        # not leave its repo directory on sys.path for the rest of the process.
        sys.path.remove(absolute_repo_dir)

    return hubmodule


# Public alias of ``_init_hub``. ``functools.wraps`` copies the metadata of
# ``_init_hub`` (including its docstring) onto this function, so the
# documentation lives on ``_init_hub`` rather than here.
@functools.wraps(_init_hub)
def import_module(*args, **kwargs):
    hubmodule = _init_hub(*args, **kwargs)
    return hubmodule
def list(
    repo_info: str,
    git_host: str = DEFAULT_GIT_HOST,
    use_cache: bool = True,
    commit: str = None,
    protocol: str = DEFAULT_PROTOCOL,
) -> List[str]:
    r"""Lists all entrypoints available in repo hubconf.

    An entrypoint is any attribute of the loaded hubconf module that is
    callable and whose name does not start with ``"__"``.

    Args:
        repo_info: a string with format ``"repo_owner/repo_name[:tag_name/:branch_name]"`` with an optional
            tag/branch. The default branch is ``master`` if not specified. Eg: ``"brain_sdk/MegBrain[:hub]"``
        git_host: host address of git repo. Eg: github.com
        use_cache: whether to use locally cached code or completely re-fetch.
        commit: commit id on github or gitlab.
        protocol: which protocol to use to get the repo, and HTTPS protocol only supports public repo on github.
            The value should be one of HTTPS, SSH.

    Returns:
        all entrypoint names of the model.
    """
    hubmodule = _init_hub(repo_info, git_host, use_cache, commit, protocol)

    entrypoints = []
    for attr_name in dir(hubmodule):
        if attr_name.startswith("__"):
            continue
        if callable(getattr(hubmodule, attr_name)):
            entrypoints.append(attr_name)
    return entrypoints
def load(
    repo_info: str,
    entry: str,
    *args,
    git_host: str = DEFAULT_GIT_HOST,
    use_cache: bool = True,
    commit: str = None,
    protocol: str = DEFAULT_PROTOCOL,
    **kwargs
) -> Any:
    r"""Loads model from github or gitlab repo, with pretrained weights.

    The hubconf module of the repo is imported, ``entry`` is looked up on it,
    the declared dependencies of the module are checked, and finally the
    entrypoint is called with ``*args`` and ``**kwargs``.

    Args:
        repo_info: a string with format ``"repo_owner/repo_name[:tag_name/:branch_name]"`` with an optional
            tag/branch. The default branch is ``master`` if not specified. Eg: ``"brain_sdk/MegBrain[:hub]"``
        entry: an entrypoint defined in hubconf.
        git_host: host address of git repo. Eg: github.com
        use_cache: whether to use locally cached code or completely re-fetch.
        commit: commit id on github or gitlab.
        protocol: which protocol to use to get the repo, and HTTPS protocol only supports public repo on github.
            The value should be one of HTTPS, SSH.

    Returns:
        a single model with corresponding pretrained weights.

    Raises:
        RuntimeError: if ``entry`` is not a callable attribute of hubconf, or
            if a declared dependency is missing.
    """
    hubmodule = _init_hub(repo_info, git_host, use_cache, commit, protocol)

    entry_fn = getattr(hubmodule, entry, None)
    if not callable(entry_fn):
        raise RuntimeError("Cannot find callable {} in hubconf.py".format(entry))

    # Dependencies are verified only after the entrypoint is known to exist.
    _check_dependencies(hubmodule)
    return entry_fn(*args, **kwargs)
def help(
    repo_info: str,
    entry: str,
    git_host: str = DEFAULT_GIT_HOST,
    use_cache: bool = True,
    commit: str = None,
    protocol: str = DEFAULT_PROTOCOL,
) -> str:
    r"""This function returns docstring of entrypoint ``entry`` by following steps:

    1. Pull the repo code specified by git and repo_info.
    2. Load the entry defined in repo's hubconf.py
    3. Return docstring of function entry.

    Args:
        repo_info: a string with format ``"repo_owner/repo_name[:tag_name/:branch_name]"`` with an optional
            tag/branch. The default branch is ``master`` if not specified. Eg: ``"brain_sdk/MegBrain[:hub]"``
        entry: an entrypoint defined in hubconf.py
        git_host: host address of git repo. Eg: github.com
        use_cache: whether to use locally cached code or completely re-fetch.
        commit: commit id on github or gitlab.
        protocol: which protocol to use to get the repo, and HTTPS protocol only supports public repo on github.
            The value should be one of HTTPS, SSH.

    Returns:
        docstring of entrypoint ``entry``.

    Raises:
        RuntimeError: if ``entry`` is not a callable attribute of hubconf.
    """
    hubmodule = _init_hub(repo_info, git_host, use_cache, commit, protocol)

    entry_fn = getattr(hubmodule, entry, None)
    if not callable(entry_fn):
        raise RuntimeError("Cannot find callable {} in hubconf.py".format(entry))

    return entry_fn.__doc__
def load_serialized_obj_from_url(url: str, model_dir=None) -> Any:
    """Loads MegEngine serialized object from the given URL.

    The file is cached under ``model_dir`` and only downloaded when no cached
    copy exists. If no ``model_dir`` is specified, it will be
    ``MGE_HOME/serialized``. The cached file name is the first six hex digits
    of the SHA-256 of ``url``, an underscore, and the base name of the URL
    path.

    Args:
        url: url to serialized object.
        model_dir: dir to cache target serialized file.

    Returns:
        loaded object.
    """
    if model_dir is None:
        model_dir = os.path.join(_get_megengine_home(), "serialized")
    os.makedirs(model_dir, exist_ok=True)

    url_basename = os.path.basename(urlparse(url).path)
    # A short hash of the full URL is used as a prefix so that identically
    # named files from different URLs do not collide in the cache.
    url_digest = hashlib.sha256(url.encode()).hexdigest()[:6]
    cached_file = os.path.join(model_dir, url_digest + "_" + url_basename)
    logger.info(
        "load_serialized_obj_from_url: download to or using cached %s", cached_file
    )

    if not os.path.exists(cached_file):
        if is_distributed():
            logger.warning(
                "Downloading serialized object in DISTRIBUTED mode\n"
                " File may be downloaded multiple times. We recommend\n"
                " users to download in single process first."
            )
        download_from_url(url, cached_file)

    # NOTE(review): the file comes from a remote URL; confirm that the
    # deserializer is safe to run on content from untrusted sources.
    return _mge_load_serialized(cached_file)
class pretrained:
    r"""Decorator which helps to download pretrained weights from the given url.
    Including fs, s3, http(s).

    For example, we can decorate a resnet18 function as follows

    .. code-block::

        @hub.pretrained("https://url/to/pretrained_resnet18.pkl")
        def resnet18(**kwargs):
            ...

    Returns:
        When decorated function is called with ``pretrained=True``, MegEngine will automatically
        download and fill the returned model with pretrained weights.
    """

    def __init__(self, url):
        # Location of the serialized weights, used when ``pretrained=True``.
        self.url = url

    def __call__(self, func):
        @functools.wraps(func)
        def pretrained_model_func(
            pretrained=False, **kwargs
        ):  # pylint: disable=redefined-outer-name
            # ``pretrained`` is consumed here; only the remaining keyword
            # arguments reach the wrapped builder.
            model = func(**kwargs)
            if not pretrained:
                return model
            state = load_serialized_obj_from_url(self.url)
            model.load_state_dict(state)
            return model

        return pretrained_model_func
# Names exported by ``from <this module> import *``; the underscore-prefixed
# helpers defined above are not listed.
__all__ = [ "list", "load", "help", "load_serialized_obj_from_url", "pretrained", "import_module", ]