# -*- coding: utf-8 -*-# MegEngine is Licensed under the Apache License, Version 2.0 (the "License")## Copyright (c) 2014-2021 Megvii Inc. All rights reserved.## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT ARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.importcollectionsimportmathfromfunctoolsimportlru_cachefromtypingimportIterable,Optional,Sequence,Tuple,Unionfrom..coreimport_configfrom..core._imperative_rt.core2importapply,dtype_promotionfrom..core._imperative_rt.opsimportSubgraphBuilderas_SubgraphBuilderfrom..core._trace_optionimportuse_symbolic_shapefrom..core.opsimportbuiltinfrom..core.ops.builtinimportBatchNorm,Elemwise,GetVarShape,Reduce,TypeCvtfrom..core.ops.specialimportConstfrom..core.tensorimportampfrom..core.tensor.utilsimport_normalize_axis,cast_tensors,subgraphfrom..jitimportexclude_from_tracefrom..tensorimportTensorfrom..utils.deprecationimportdeprecated_kwargs_defaultfrom.debug_paramimportget_execution_strategyfrom.elemwiseimportclip,minimumfrom.tensorimportbroadcast_to,concat,expand_dims,squeeze__all__=["argmax","argmin","argsort","dot","isinf","isnan","matinv","matmul","max","mean","min","norm","normalize","prod","sign","sort","std","sum","svd","topk","var",]
def isnan(inp: Tensor) -> Tensor:
    r"""Tests every element of the input for ``NaN``.

    Args:
        inp: input tensor.

    Returns:
        the element-wise result of comparing ``inp`` with itself for
        inequality; an element is flagged exactly when it is ``NaN``.

    Examples:

        .. testcode::

            from megengine import tensor
            import megengine.functional as F

            x = tensor([1, float("nan"), 0])
            print(F.isnan(x).numpy())

        Outputs:

        .. testoutput::

            [False  True False]
    """
    # NaN is the only value that compares unequal to itself.
    nan_mask = inp != inp
    return nan_mask
def isinf(inp: Tensor) -> Tensor:
    r"""Tests every element of the input for positive or negative ``Inf``.

    Args:
        inp: input tensor.

    Returns:
        the element-wise result of comparing the absolute value of ``inp``
        (cast to ``float32``) against infinity.

    Examples:

        .. testcode::

            from megengine import tensor
            import megengine.functional as F

            x = tensor([1, float("inf"), 0])
            print(F.isinf(x).numpy())

        Outputs:

        .. testoutput::

            [False  True False]
    """
    # Taking the magnitude first lets one comparison cover both +inf and -inf.
    magnitude = abs(inp)
    as_float32 = magnitude.astype("float32")
    return as_float32 == float("inf")
def sign(inp: Tensor):
    r"""Computes the sign of each element of the input tensor.

    Args:
        inp: input tensor.

    Returns:
        a tensor with the dtype of ``inp`` holding ``1`` where the element is
        positive, ``-1`` where it is negative and ``0`` otherwise.

    Examples:

        .. testcode::

            from megengine import tensor
            import megengine.functional as F

            x = tensor([1, -1, 0])
            print(F.sign(x).numpy())

        Outputs:

        .. testoutput::

            [ 1 -1  0]
    """
    out_dtype = inp.dtype
    # Each comparison yields a mask; casting back to the input dtype turns
    # the masks into 0/1 values that can be subtracted.
    positive = (inp > 0).astype(out_dtype)
    negative = (inp < 0).astype(out_dtype)
    return positive - negative
def sum(
    inp: Tensor,
    axis: Optional[Union[int, Sequence[int]]] = None,
    keepdims: bool = False,
) -> Tensor:
    r"""Sums the elements of the input tensor along the given axis.

    This is a thin functional wrapper that delegates to ``inp.sum``.
    If ``axis`` is a list of dimensions, all of them are reduced.

    Args:
        inp: input tensor.
        axis: dimension to reduce. If None, all dimensions will be reduced.
            Default: None
        keepdims: whether the output tensor has axis retained or not.
            Default: False

    Returns:
        output tensor.

    Examples:

        .. testcode::

            import numpy as np
            from megengine import tensor
            import megengine.functional as F

            x = tensor(np.arange(1, 7, dtype=np.int32).reshape(2, 3))
            out = F.sum(x)
            print(out.numpy())

        Outputs:

        .. testoutput::

            21
    """
    total = inp.sum(axis=axis, keepdims=keepdims)
    return total
def prod(
    inp: Tensor, axis: Optional[Union[int, Sequence[int]]] = None, keepdims=False
) -> Tensor:
    r"""Multiplies the elements of the input tensor along the given axis.

    This is a thin functional wrapper that delegates to ``inp.prod``.
    If ``axis`` is a list of dimensions, all of them are reduced.

    Args:
        inp: input tensor.
        axis: dimension to reduce. If None, all dimensions will be reduced.
            Default: None
        keepdims: whether the output tensor has axis retained or not.
            Default: False

    Returns:
        output tensor.

    Examples:

        .. testcode::

            import numpy as np
            from megengine import tensor
            import megengine.functional as F

            x = tensor(np.arange(1, 7, dtype=np.int32).reshape(2, 3))
            out = F.prod(x)
            print(out.numpy())

        Outputs:

        .. testoutput::

            720
    """
    product = inp.prod(axis=axis, keepdims=keepdims)
    return product
def mean(
    inp: Tensor,
    axis: Optional[Union[int, Sequence[int]]] = None,
    keepdims: bool = False,
) -> Tensor:
    r"""Averages the elements of the input tensor along the given axis.

    This is a thin functional wrapper that delegates to ``inp.mean``.
    If ``axis`` is a list of dimensions, all of them are reduced.

    Args:
        inp: input tensor.
        axis: dimension to reduce. If None, all dimensions will be reduced.
            Default: None
        keepdims: whether the output tensor has axis retained or not.
            Default: False

    Returns:
        output tensor.

    Examples:

        .. testcode::

            import numpy as np
            from megengine import tensor
            import megengine.functional as F

            x = tensor(np.arange(1, 7, dtype=np.int32).reshape(2, 3))
            out = F.mean(x)
            print(out.numpy())

        Outputs:

        .. testoutput::

            3.5
    """
    average = inp.mean(axis=axis, keepdims=keepdims)
    return average
def var(
    inp: Tensor,
    axis: Optional[Union[int, Sequence[int]]] = None,
    keepdims: bool = False,
) -> Tensor:
    r"""Computes the variance of the input tensor along the given axis.

    The result is the mean of the squared deviations from the mean (no
    bias correction is applied). If ``axis`` is a list of dimensions, all of
    them are reduced.

    Args:
        inp: input tensor.
        axis: dimension to reduce. If None, all dimensions will be reduced.
            Default: None
        keepdims: whether the output tensor has axis retained or not.
            Default: False

    Returns:
        output tensor.

    Examples:

        .. testcode::

            import numpy as np
            from megengine import tensor
            import megengine.functional as F

            data = tensor(np.arange(1, 7, dtype=np.float32).reshape(2, 3))
            out = F.var(data)
            print(out.numpy().round(decimals=4))

        Outputs:

        .. testoutput::

            2.9167
    """
    # When reducing along specific axes the mean keeps those axes so that it
    # lines up with ``inp`` in the subtraction; a full reduction does not.
    center = mean(inp, axis=axis, keepdims=axis is not None)
    deviation = inp - center
    return mean(deviation ** 2, axis=axis, keepdims=keepdims)
def std(
    inp: Tensor,
    axis: Optional[Union[int, Sequence[int]]] = None,
    keepdims: bool = False,
) -> Tensor:
    r"""Computes the standard deviation of the input tensor along the given axis.

    The result is the square root of :func:`var` evaluated with the same
    arguments. If ``axis`` is a list of dimensions, all of them are reduced.

    Args:
        inp: input tensor.
        axis: dimension to reduce. If None, all dimensions will be reduced.
            Default: None
        keepdims: whether the output tensor has axis retained or not.
            Default: False

    Returns:
        output tensor.

    Examples:

        .. testcode::

            import numpy as np
            from megengine import tensor
            import megengine.functional as F

            data = tensor(np.arange(1, 7, dtype=np.float32).reshape(2, 3))
            out = F.std(data, axis=1)
            print(out.numpy().round(decimals=4))

        Outputs:

        .. testoutput::

            [0.8165 0.8165]
    """
    variance = var(inp, axis=axis, keepdims=keepdims)
    return variance ** 0.5
def min(
    inp: Tensor,
    axis: Optional[Union[int, Sequence[int]]] = None,
    keepdims: bool = False,
) -> Tensor:
    r"""Finds the minimum of the input tensor along the given axis.

    This is a thin functional wrapper that delegates to ``inp.min``.
    If ``axis`` is a list of dimensions, all of them are reduced.

    Args:
        inp: input tensor.
        axis: dimension to reduce. If None, all dimensions will be reduced.
            Default: None
        keepdims: whether the output tensor has axis retained or not.
            Default: False

    Returns:
        output tensor.

    Examples:

        .. testcode::

            import numpy as np
            from megengine import tensor
            import megengine.functional as F

            x = tensor(np.arange(1, 7, dtype=np.int32).reshape(2,3))
            out = F.min(x)
            print(out.numpy())

        Outputs:

        .. testoutput::

            1
    """
    smallest = inp.min(axis=axis, keepdims=keepdims)
    return smallest
def max(
    inp: Tensor,
    axis: Optional[Union[int, Sequence[int]]] = None,
    keepdims: bool = False,
) -> Tensor:
    r"""Finds the maximum of the input tensor along the given axis.

    This is a thin functional wrapper that delegates to ``inp.max``.
    If ``axis`` is a list of dimensions, all of them are reduced.

    Args:
        inp: input tensor.
        axis: dimension to reduce. If None, all dimensions will be reduced.
            Default: None
        keepdims: whether the output tensor has axis retained or not.
            Default: False

    Returns:
        output tensor.

    Examples:

        .. testcode::

            import numpy as np
            from megengine import tensor
            import megengine.functional as F

            x = tensor(np.arange(1, 7, dtype=np.int32).reshape(2,3))
            out = F.max(x)
            print(out.numpy())

        Outputs:

        .. testoutput::

            6
    """
    largest = inp.max(axis=axis, keepdims=keepdims)
    return largest
def norm(inp: Tensor, ord: float = None, axis: int = None, keepdims=False,):
    r"""Calculates ``p``-norm of input tensor along given axis.

    The value computed depends on ``ord``:

    * ``0``: the number of non-zero elements.
    * ``math.inf``: the largest absolute value.
    * ``-math.inf``: the smallest absolute value.
    * any other value ``p``: ``sum(abs(inp) ** p) ** (1 / p)``.

    In every case the reduction is performed along ``axis`` and honours
    ``keepdims``.

    Args:
        inp: input tensor.
        ord: power of value applied to inp. Default: 2
        axis: dimension to reduce. If None, input must be a vector. Default: None
        keepdims: whether the output tensor has axis retained or not. Default: False

    Returns:
        output tensor.

    Raises:
        TypeError: if ``axis`` is None and ``inp`` is not 1-dimensional.

    Examples:

        .. testcode::

            import numpy as np
            from megengine import tensor
            import megengine.functional as F

            x = tensor(np.arange(-3, 3, dtype=np.float32))
            out = F.norm(x)
            print(out.numpy().round(decimals=4))

        Outputs:

        .. testoutput::

            4.3589
    """
    if axis is None and inp.ndim != 1:
        raise TypeError("axis is required unless input is a vector")
    if ord is None:
        ord = 2
    if ord == 0:
        return sum(inp != 0, axis=axis, keepdims=keepdims)

    magnitude = abs(inp)
    # The infinity norms must reduce along the requested axis exactly like
    # the other branches; reducing the whole tensor would hand callers such
    # as ``normalize`` a single global value instead of one value per slice.
    if ord == math.inf:
        return max(magnitude, axis=axis, keepdims=keepdims)
    if ord == -math.inf:
        return min(magnitude, axis=axis, keepdims=keepdims)
    return sum(magnitude ** ord, axis=axis, keepdims=keepdims) ** (1.0 / ord)
def argmin(
    inp: Tensor,
    axis: Optional[Union[int, Sequence[int]]] = None,
    keepdims: bool = False,
) -> Tensor:
    r"""Returns the indices of the minimum values along given axis.

    If ``axis`` is None the input is flattened first and the reduction runs
    over the single remaining dimension. If ``axis`` is a list of dimensions,
    the ``Argmin`` op is applied once per axis, in the order produced by
    ``_normalize_axis(..., reverse=True)``, each time on the previous result.

    Args:
        inp: input tensor.
        axis: dimension to reduce. If None, all dimensions will be reduced.
            Default: None
        keepdims: whether the output tensor has axis retained or not.
            Must be False when ``axis`` is None. Default: False

    Returns:
        output tensor.

    Examples:

        .. testcode::

            import numpy as np
            from megengine import tensor
            import megengine.functional as F

            x = tensor(np.arange(1, 7, dtype=np.int32).reshape(2,3))
            out = F.argmin(x)
            print(out.numpy())

        Outputs:

        .. testoutput::

            0
    """
    if axis is None:
        assert not keepdims, "can not set axis=None and keepdims=True"
        inp = inp.flatten()
        axis = 0
    axis = _normalize_axis(inp.ndim, axis, reverse=True)

    # Treat a scalar axis as a one-element sequence so both cases share the
    # same reduction loop.
    axes = axis if isinstance(axis, collections.abc.Iterable) else (axis,)
    result = inp
    for current_axis in axes:
        (result,) = apply(builtin.Argmin(axis=current_axis), result)
        if not keepdims:
            result = squeeze(result, current_axis)
    return result
def argmax(
    inp: Tensor,
    axis: Optional[Union[int, Sequence[int]]] = None,
    keepdims: bool = False,
) -> Tensor:
    r"""Returns the indices of the maximum values along given axis.

    If ``axis`` is None the input is flattened first and the reduction runs
    over the single remaining dimension. If ``axis`` is a list of dimensions,
    the ``Argmax`` op is applied once per axis, in the order produced by
    ``_normalize_axis(..., reverse=True)``, each time on the previous result.

    Args:
        inp: input tensor.
        axis: dimension to reduce. If None, all dimensions will be reduced.
            Default: None
        keepdims: whether the output tensor has axis retained or not.
            Must be False when ``axis`` is None. Default: False

    Returns:
        output tensor.

    Examples:

        .. testcode::

            import numpy as np
            from megengine import tensor
            import megengine.functional as F

            x = tensor(np.arange(1, 7, dtype=np.int32).reshape(2,3))
            out = F.argmax(x)
            print(out.numpy())

        Outputs:

        .. testoutput::

            5
    """
    if axis is None:
        assert not keepdims, "can not set axis=None and keepdims=True"
        inp = inp.flatten()
        axis = 0
    axis = _normalize_axis(inp.ndim, axis, reverse=True)

    # Treat a scalar axis as a one-element sequence so both cases share the
    # same reduction loop.
    axes = axis if isinstance(axis, collections.abc.Iterable) else (axis,)
    result = inp
    for current_axis in axes:
        (result,) = apply(builtin.Argmax(axis=current_axis), result)
        if not keepdims:
            result = squeeze(result, current_axis)
    return result
def normalize(
    inp: Tensor, ord: float = None, axis: int = None, eps: float = 1e-12,
) -> Tensor:
    r"""Performs :math:`L_p` normalization of input tensor along given axis.

    For a tensor of shape :math:`(n_0, ..., n_{dim}, ..., n_k)`, each
    :math:`n_{dim}` -element vector :math:`v` along dimension :attr:`axis` is
    transformed as:

    .. math::
        v = \frac{v}{\max(\lVert v \rVert_p, \epsilon)}.

    Args:
        inp: input tensor.
        ord: power of value applied to input tensor. Default: 2
        axis: dimension to reduce. If None, input must be a vector. Default: None
        eps: a small value to avoid division by zero. Default: 1e-12

    Returns:
        normalized output tensor.
    """
    # With an explicit axis the norm keeps the reduced dimension so that it
    # broadcasts against ``inp`` in the division below.
    length = norm(inp, ord, axis, keepdims=axis is not None)
    # Clamp from below so a zero norm never becomes a zero divisor.
    divisor = clip(length, lower=eps)
    return inp / divisor
def argsort(inp: Tensor, descending: bool = False) -> Tensor:
    r"""Returns the indices that would sort the input tensor.

    Args:
        inp: input tensor. If it's 2d, the result would be array of indices
            show how to sort each row in the input tensor.
        descending: sort in descending order, where the largest comes first.
            Default: False

    Returns:
        indices of int32 indicates how to sort the input.

    Examples:

        .. testcode::

            import numpy as np
            from megengine import tensor
            import megengine.functional as F

            x = tensor(np.array([1,2], dtype=np.float32))
            indices = F.argsort(x)
            print(indices.numpy())

        Outputs:

        .. testoutput::

            [0 1]
    """
    assert len(inp.shape) <= 2, "Input should be 1d or 2d"
    sort_op = builtin.Argsort(order="descending" if descending else "ascending")

    # A vector is sorted as a single-row matrix and the row is unwrapped
    # again afterwards.
    is_vector = len(inp.shape) == 1
    if is_vector:
        inp = inp.reshape(1, -1)
    _, indices = apply(sort_op, inp)
    return indices[0] if is_vector else indices
def sort(inp: Tensor, descending: bool = False) -> Tuple[Tensor, Tensor]:
    r"""Returns sorted tensor and the indices would sort the input tensor.

    Args:
        inp: input tensor. If it's 2d, the result would be sorted by row.
        descending: sort in descending order, where the largest comes first.
            Default: False

    Returns:
        tuple of two tensors `(sorted_tensor, indices_of_int32)`.

    Examples:

        .. testcode::

            import numpy as np
            from megengine import tensor
            import megengine.functional as F

            x = tensor(np.array([1,2], dtype=np.float32))
            out, indices = F.sort(x)
            print(out.numpy())

        Outputs:

        .. testoutput::

            [1. 2.]
    """
    assert len(inp.shape) <= 2, "Input should be 1d or 2d"
    sort_op = builtin.Argsort(order="descending" if descending else "ascending")

    # A vector is sorted as a single-row matrix and the row is unwrapped
    # again afterwards.
    is_vector = len(inp.shape) == 1
    if is_vector:
        inp = inp.reshape(1, -1)
    values, indices = apply(sort_op, inp)
    if is_vector:
        return values[0], indices[0]
    return values, indices
@deprecated_kwargs_default("1.12", "descending", 3)
def topk(
    inp: Tensor,
    k: int,
    descending: bool = False,
    kth_only: bool = False,
    no_sort: bool = False,
) -> Tuple[Tensor, Tensor]:
    r"""Selects the ``Top-K`` (by default) smallest elements of 2d matrix by row.

    Args:
        inp: input tensor. If input tensor is 2d, each row will be sorted.
        k: number of elements needed.
        descending: if True, return the largest elements instead. Default: False
        kth_only: if True, only the k-th element will be returned. Default: False
        no_sort: if True, the returned elements can be unordered. Default: False

    Returns:
        tuple of two tensors ``(topk_tensor, indices_of_int32)``

    Examples:

        .. testcode::

            import numpy as np
            from megengine import tensor
            import megengine.functional as F

            x = tensor(np.array([2, 4, 6, 8, 7, 5, 3, 1], dtype=np.float32))
            top, indices = F.topk(x, 5, descending=False)
            print(top.numpy(), indices.numpy())

        Outputs:

        .. testoutput::

            [1. 2. 3. 4. 5.] [7 0 6 1 5]
    """
    # The op is handed a negated k when the largest elements are requested.
    if descending:
        k = -k

    if kth_only:
        mode = "kth_only"
    else:
        mode = "value_idx_nosort" if no_sort else "value_idx_sorted"
    topk_op = builtin.TopK(mode=mode)

    if not isinstance(k, Tensor):
        (k,) = Const(k, dtype="int32", device=inp.device)()

    # A vector is processed as a single-row matrix and unwrapped afterwards.
    is_vector = len(inp.shape) == 1
    rows = expand_dims(inp, 0) if is_vector else inp

    if kth_only:
        # In this mode the op yields values only, so the index is recovered
        # by locating the k-th value in the input.
        (values,) = apply(topk_op, rows, k)
        # FIXME:
        # could use a dedicated kernel
        # gradient may be routed to other indices if k-th value is not unique
        if is_vector:
            indices = argmax((values == inp).astype("int8"))
            values = squeeze(values, 0)
        else:
            indices = argmax((expand_dims(values, 1) == inp).astype("int8"), 1)
        return values, indices

    values, indices = apply(topk_op, rows, k)
    if is_vector:
        values = squeeze(values, 0)
        indices = squeeze(indices, 0)
    return values, indices
def matinv(inp: Tensor) -> Tensor:
    r"""Computes the inverse of a batch of matrices; input must has shape [..., n, n].

    Args:
        inp: input tensor.

    Returns:
        output tensor.

    Examples:

        .. testcode::

            import numpy as np
            from megengine import tensor
            import megengine.functional as F

            data = tensor([[1.0, 0.0], [1.0, 1.0]])
            out = F.matinv(data)
            print(out.numpy())

        Outputs:

        .. testoutput::

            [[ 1.  0.]
             [-1.  1.]]
    """
    inverse_op = builtin.MatrixInverse()
    # The op produces exactly one output; unpacking enforces that.
    (inverse,) = apply(inverse_op, inp)
    return inverse
def matmul(
    inp1: Tensor,
    inp2: Tensor,
    transpose_a=False,
    transpose_b=False,
    compute_mode="default",
    format="default",
) -> Tensor:
    r"""Performs a matrix multiplication of the matrices ``inp1`` and ``inp2``.

    With different inputs dim, this function behaves differently:

    * Both 1-D tensor, simply forward to ``dot``.
    * Both 2-D tensor, normal matrix multiplication.
    * If one input tensor is 1-D, matrix vector multiplication.
    * If at least one tensor are 3-dimensional or >3-dimensional, the other
      tensor should have dim >= 2, the batched matrix-matrix is returned, and
      the tensor with smaller dimension will be broadcasted. For example:

      * inp1: `(n, k, m)`, inp2: `(n, m, p)`, return: `(n, k, p)`
      * inp1: `(n, k, m)`, inp2: `(m, p)`, return: `(n, k, p)`
      * inp1: `(n, j, k, m)`, inp2: `(n, j, m, p)`, return: `(n, j, k, p)`

    Before dispatching, both inputs are brought to a common dtype: through
    ``cast_tensors`` when AMP is enabled, otherwise through
    ``dtype_promotion``.

    Args:
        inp1: first matrix to be multiplied.
        inp2: second matrix to be multiplied.
        transpose_a: forwarded to the underlying matrix-multiplication op;
            presumably whether ``inp1`` is transposed first (confirm against
            the op definition). Not used when both inputs are 1-D.
            Default: False
        transpose_b: same as ``transpose_a`` but for ``inp2``. Default: False
        compute_mode: forced to ``"float32"`` when AMP is enabled, then
            resolved through ``_config._get_actual_op_param`` and forwarded
            to the op. Default: "default"
        format: forwarded unchanged to the op. Default: "default"

    Returns:
        output tensor.

    Examples:

        .. testcode::

            import numpy as np
            from megengine import tensor
            import megengine.functional as F

            data1 = tensor(np.arange(0, 6, dtype=np.float32).reshape(2, 3))
            data2 = tensor(np.arange(0, 6, dtype=np.float32).reshape(3, 2))
            out = F.matmul(data1, data2)
            print(out.numpy())

        Outputs:

        .. testoutput::

            [[10. 13.]
             [28. 40.]]
    """
    if amp._enabled:
        compute_mode = "float32"
        inp1, inp2 = cast_tensors(inp1, inp2)
    else:
        promoted = dtype_promotion(inp1, inp2)
        if inp1.dtype != promoted:
            inp1 = inp1.astype(promoted)
        if inp2.dtype != promoted:
            inp2 = inp2.astype(promoted)

    dim1, dim2 = inp1.ndim, inp2.ndim
    assert dim1 > 0 and dim2 > 0
    # A conditional is used rather than ``max(...)`` because ``max`` in this
    # module is the tensor reduction, not the Python builtin.
    maxdim = dim1 if dim1 > dim2 else dim2
    compute_mode = _config._get_actual_op_param(compute_mode, _config.__compute_mode)

    if dim1 == 1 and dim2 == 1:
        # Vector times vector: dispatch to Dot.
        return dot(inp1, inp2)

    if maxdim <= 2 or dim2 <= 2:
        # Dispatch to MatrixMul.
        get_op_factory = _get_extentedMatrixMulOp
    else:
        # Dispatch to BatchedMatrixMul.
        get_op_factory = _get_extentedBatchedMatrixMulOp

    op_factory = get_op_factory(
        inp1.device,
        inp1.dtype,
        dim1,
        dim2,
        transpose_a,
        transpose_b,
        compute_mode,
        format,
        strategy=_Hashable(get_execution_strategy()),
    )
    (result,) = apply(op_factory(), inp1, inp2)
    return result
def dot(inp1: Tensor, inp2: Tensor) -> Tensor:
    r"""Computes dot-product of two vectors ``inp1`` and ``inp2``.

    inputs must be 1-dimensional or scalar. A scalar input is automatically
    broadcasted. Refer to :func:`~.matmul` for more general usage.

    Args:
        inp1: first vector.
        inp2: second vector.

    Returns:
        output value.

    Examples:

        .. testcode::

            import numpy as np
            from megengine import tensor
            import megengine.functional as F

            data1 = tensor(np.arange(0, 6, dtype=np.float32))
            data2 = tensor(np.arange(0, 6, dtype=np.float32))
            out = F.dot(data1, data2)
            print(out.numpy())

        Outputs:

        .. testoutput::

            55.
    """
    dot_op = builtin.Dot()
    inputs_are_vectors = inp1.ndim <= 1 and inp2.ndim <= 1
    assert inputs_are_vectors, "Input tensors for dot must be 1-dimensional or scalar"
    (product,) = apply(dot_op, inp1, inp2)
    return product
def svd(inp: Tensor, full_matrices=False, compute_uv=True) -> Tensor:
    r"""Computes the singular value decompositions of input matrix.

    Args:
        inp: input matrix, must has shape `[..., M, N]`.
        full_matrices: forwarded to the underlying ``SVD`` op. Default: False
        compute_uv: forwarded to the underlying ``SVD`` op. Default: True

    Returns:
        output matrices, `(U, sigma, V)`, as a tuple of three tensors.

    Examples:

        .. testcode::

            import numpy as np
            from megengine import tensor
            import megengine.functional as F

            x = tensor(np.arange(0, 6, dtype=np.float32).reshape(2,3))
            _, y, _ = F.svd(x)
            print(y.numpy().round(decimals=3))

        Outputs:

        .. testoutput::

            [7.348 1.   ]
    """
    svd_op = builtin.SVD(full_matrices=full_matrices, compute_uv=compute_uv)
    # The op is expected to yield exactly three outputs; unpacking enforces it.
    left, singular_values, right = apply(svd_op, inp)
    return left, singular_values, right
def _check_non_finite(inps: Iterable[Tensor], scale=1.0) -> Tensor:
    r"""Check whether input contains infinite or nan value.

    The ``CheckNonFinite`` op is applied to all inputs at once. Its last
    output is returned as the result; every other output is written back
    into the corresponding input tensor through ``Tensor._reset``, so the
    inputs are modified in place.

    Args:
        inps: tensors to be checked. Any iterable is accepted; it is
            materialised once so that one-shot iterators work as well.
        scale: forwarded to the ``CheckNonFinite`` op (presumably a factor
            applied to the tensors that are written back; confirm against
            the op definition). Default: 1.0

    Returns:
        a int32 scalar tensor, 0 for False and 1 for True.
    """
    # The inputs are needed twice (as op arguments and as write-back
    # targets), so a generator has to be turned into a list up front.
    inps = list(inps)
    op = builtin.CheckNonFinite(scale=scale)
    oups = apply(op, *inps)
    # ``zip`` stops after the last input, which leaves the trailing flag
    # output out of the write-back.
    for inp, oup in zip(inps, oups):
        inp._reset(oup)
    return oups[-1]