megengine.distributed package

megengine.distributed.functional

megengine.distributed.functional.all_gather(inp, group=<megengine.distributed.group.Group object>, device='')[源代码]

创建用于聚合通信的 all_gather 算子。

参数
  • inp (Tensor) – 输入张量。

  • group (Optional[Group]) – 通信组。

  • device (Optional[str]) – 执行设备。

返回类型

Tensor

megengine.distributed.functional.all_reduce_max(inp, group=<megengine.distributed.group.Group object>, device='')[源代码]

创建用于聚合通信的 all_reduce_max 算子。

参数
  • inp (Tensor) – 输入张量。

  • group (Optional[Group]) – 通信组。

  • device (Optional[str]) – 执行设备。

返回类型

Tensor

megengine.distributed.functional.all_reduce_min(inp, group=<megengine.distributed.group.Group object>, device='')[源代码]

创建用于聚合通信的 all_reduce_min 算子。

参数
  • inp (Tensor) – 输入张量。

  • group (Optional[Group]) – 通信组。

  • device (Optional[str]) – 执行设备。

返回类型

Tensor

megengine.distributed.functional.all_reduce_sum(inp, group=<megengine.distributed.group.Group object>, device='')[源代码]

创建用于聚合通信的 all_reduce_sum 算子。

参数
  • inp (Tensor) – 输入张量。

  • group (Optional[Group]) – 通信组。

  • device (Optional[str]) – 执行设备。

返回类型

Tensor

megengine.distributed.functional.all_to_all(inp, group=<megengine.distributed.group.Group object>, device='')[源代码]

创建用于聚合通信 all_to_all 算子。

参数
  • inp (Tensor) – 输入张量。

  • group (Optional[Group]) – 通信组。

  • device (Optional[str]) – 执行设备。

返回类型

Tensor

megengine.distributed.functional.broadcast(inp, group=<megengine.distributed.group.Group object>, device='')[源代码]

创建用于聚合通信的广播算子。

参数
  • inp (Tensor) – 输入张量。

  • group (Optional[Group]) – 通信组。

  • device (Optional[str]) – 执行设备。

返回类型

Tensor

megengine.distributed.functional.gather(inp, group=<megengine.distributed.group.Group object>, device='')[源代码]

创建用于聚合通信的 gather 算子。

参数
  • inp (Tensor) – 输入张量。

  • group (Optional[Group]) – 通信组。

  • device (Optional[str]) – 执行设备。

返回类型

Tensor

megengine.distributed.functional.reduce_scatter_sum(inp, group=<megengine.distributed.group.Group object>, device='')[源代码]

创建用于聚合通信的 reduce_scatter_sum 算子

参数
  • inp (Tensor) – 输入张量。

  • group (Optional[Group]) – 通信组。

  • device (Optional[str]) – 执行设备。

返回类型

Tensor

megengine.distributed.functional.reduce_sum(inp, group=<megengine.distributed.group.Group object>, device='')[源代码]

创建用于聚合通信的 reduce_sum 算子

参数
  • inp (Tensor) – 输入张量。

  • group (Optional[Group]) – 通信组。

  • device (Optional[str]) – 执行设备。

返回类型

Tensor

megengine.distributed.functional.remote_recv(src_rank, shape, dtype, device=None)[源代码]

从远端进程接收一个张量。

参数
  • src_rank (int) – 源进程的序号。

  • shape (Tuple[int]) – 被接收的张量的形状。

  • dtype (type) – 被接收的张量的数据类型。

  • device (Optional[str]) – 被接收的张量将要放置的设备。

返回类型

Tensor

megengine.distributed.functional.remote_send(inp, dest_rank)[源代码]

发送一个张量到远端进程。

参数
  • inp (Tensor) – 被发送的张量。

  • dest_rank (int) – 目标进程序号。

返回类型

Tensor

megengine.distributed.functional.scatter(inp, group=<megengine.distributed.group.Group object>, device='')[源代码]

创建用于聚合通信的 scatter 算子。

参数
  • inp (Tensor) – 输入张量。

  • group (Optional[Group]) – 通信组。

  • device (Optional[str]) – 执行设备。

返回类型

Tensor

megengine.distributed.group

class megengine.distributed.group.Group(proc_ranks)[源代码]

基类:object

check(proc_ranks)[源代码]
property comp_node
property key
property rank
reset(proc_ranks)[源代码]
property size
class megengine.distributed.group.StaticData[源代码]

基类:object

backend = None
client = None
device = None
master_ip = None
mm_server_port = None
next_stream = None
proc_rank = None
py_server_port = None
server = None
world_size = None
megengine.distributed.group.get_backend()[源代码]

获取字符串形式表示的后端。

返回类型

str

megengine.distributed.group.get_client()[源代码]

获取 python XML RPC 服务器的客户端。

返回类型

Client

megengine.distributed.group.get_mm_server_addr()[源代码]

获取 C++ mm_server 的主机IP和端口。

返回类型

Tuple[str, int]

megengine.distributed.group.get_py_server_addr()[源代码]

获取 python XML RPC 服务器的主机IP和端口。

返回类型

Tuple[str, int]

megengine.distributed.group.get_rank()[源代码]

获取当前进程的进程号。

返回类型

int

megengine.distributed.group.get_world_size()[源代码]

获取的参与任务的进程总数。

返回类型

int

megengine.distributed.group.group_barrier(group=<megengine.distributed.group.Group object>)[源代码]

阻止调用,直到组中的所有进程达到这个障碍点。

返回类型

None

megengine.distributed.group.init_process_group(master_ip, port, world_size, rank, device, backend='nccl')[源代码]

初始化分布式进程组,并且指定在当前进程中使用的设备。

参数
  • master_ip (str) – 主节点的IP地址。

  • port (int) – 所有进程之间进行通信的可用端口。

  • world_size (int) – 参与任务的进程总数。

  • rank (int) – 当前进程的进程号。

  • device (int) – 待与该进程绑定的GPU设备号。

  • backend (Optional[str]) – Communicator的后端,目前支持’NCCL’和’UCX’。

返回类型

None

megengine.distributed.group.is_distributed()[源代码]

如果分布式进程组已完成初始化则返回True。

返回类型

bool

megengine.distributed.group.new_group(proc_ranks)[源代码]

构造一个包含特定序号的子通信组。

返回类型

Group

megengine.distributed.helper

class megengine.distributed.helper.AllreduceCallback(reduce_method, group=<megengine.distributed.group.Group object>)[源代码]

基类:object

具有张量融合优化的 Allreduce 回调函数。

参数
  • reduce_method (str) – 归并梯度的方法。

  • group (Group) – 通信组。

class megengine.distributed.helper.TensorFuture(ack=True)[源代码]

基类:megengine.utils.future.Future

device()[源代码]
dtype()[源代码]
numpy()[源代码]
shape()[源代码]
megengine.distributed.helper.bcast_list_(inps, group=<megengine.distributed.group.Group object>)[源代码]

在指定通信组间广播张量。

参数
  • inps (list) – 输入张量。

  • group (Group) – 通信组。

megengine.distributed.helper.get_device_count_by_fork(device_type)[源代码]

在fork出来的线程中获取设备计数。详细信息可参考 https://stackoverflow.com/questions/22950047/cuda-initialization-error-after-fork

megengine.distributed.helper.get_offsets(shapes)[源代码]
megengine.distributed.helper.make_allreduce_cb

megengine.distributed.helper.AllreduceCallback 的别名

megengine.distributed.helper.pack_allreduce_split(pack_list, shapes, group, reduce_method)[源代码]
megengine.distributed.helper.param_pack_concat(inps, offsets, offsets_val)[源代码]

返回拼接的张量,只用在 parampack 处。

参数
  • inps (list) – 输入张量。

  • offsets (Tensor) – 设备偏移量。

  • offsets_val (list) – 输入的偏移量,长度为 2 * n,格式为 [begin0, end0, begin1, end1]

返回

拼接的张量。

示例:

import numpy as np
from megengine import tensor
from megengine.distributed.helper import param_pack_concat

a = tensor(np.ones((1,), np.int32))
b = tensor(np.ones((3, 3), np.int32))
offsets_val = [0, 1, 1, 10]
offsets = tensor(offsets_val, np.int32)
c = param_pack_concat([a, b], offsets, offsets_val)
print(c.numpy())

输出:

[1 1 1 1 1 1 1 1 1 1]
megengine.distributed.helper.param_pack_split(inp, offsets, shapes)[源代码]
根据描述的偏移量和形状,把分隔的张量以张量列表的形式返回,

只用在 parampack 处。

参数
  • inp (Tensor) – 输入张量。

  • offsets (list) – 输出的偏移量,长度为 2 * n,其中 n 是想要被切割成多少部分,格式为 [begin0, end0, begin1, end1]

  • shapes (list) – 输出的张量形状。

返回

被切分的张量。

示例:

import numpy as np
from megengine import tensor
from megengine.distributed.helper import param_pack_split

a = tensor(np.ones((10,), np.int32))
b, c = param_pack_split(a, [0, 1, 1, 10], [(1,), (3, 3)])
print(b.numpy())
print(c.numpy())

输出:

[1]
[[1 1 1]
 [1 1 1]
 [1 1 1]]
megengine.distributed.helper.synchronized(func)[源代码]

装饰器。结束后,装饰后的函数会被同步。实际应用上,我们用它来防止hub.load期间的数据竞争

megengine.distributed.launcher

megengine.distributed.launcher.launcher(func)[源代码]

在单机多卡环境下启动多个进程进行训练的装饰器。

megengine.distributed.server

class megengine.distributed.server.Client(master_ip, port)[源代码]

基类:object

分布式训练的分布式客户端。

参数
  • master_ip – 主节点的IP地址。

  • port – 获取主节点上RPC服务器的端口

check_is_grad(key)[源代码]

检查 send/recv 是否需要梯度。

参数

key – 用来匹配 send/recv 的key。

check_remote_tracer(key)[源代码]

获取 send/recv 的tracer dict。

参数

key – 用来匹配 send/recv 的key。

connect()[源代码]

检查连接是否成功。

get_mm_server_port()[源代码]

获取多个服务器的端口。

group_barrier(key, size)[源代码]

等待通信组内所有成员的障碍点。

参数
  • key – 通信组内相互匹配的key。

  • size – 通信组的大小。

set_is_grad(key, is_grad)[源代码]

用 key 来标记 send/recv 是需要梯度的。

参数
  • key – 用来匹配 send/recv 的key。

  • is_grad – 该节点是否需要梯度。

set_remote_tracer(key, tracer_set)[源代码]

为 send/recv 算子设置 tracer dict。

参数
  • key – 用来匹配 send/recv 的key。

  • tracer_set – 有效的 tracer 集合。

class megengine.distributed.server.Methods(mm_server_port)[源代码]

基类:object

分布式服务器方法。被用来在不同的节点间交换信息。

参数

mm_server_port – 多机的 rpc 服务器端口。

check_is_grad(key)[源代码]

检查 send/recv 是否需要梯度。

参数

key – 用来匹配 send/recv 的key。

check_remote_tracer(key)[源代码]

获取 send/recv 的tracer dict。

参数

key – 用来匹配 send/recv 的key。

connect()[源代码]

检查连接成功的方法。

get_mm_server_port()[源代码]

获取多机 rpc 服务器端口。

group_barrier(key, size)[源代码]

等待通信组内所有成员的障碍点。

参数
  • key – 通信组内相互匹配的key。

  • size – 通信组的大小。

set_is_grad(key, is_grad)[源代码]

用 key 来标记 send/recv 是需要梯度的。

参数
  • key – 用来匹配 send/recv 的key。

  • is_grad – 该节点是否需要梯度。

set_remote_tracer(key, tracer_set)[源代码]

为 send/recv 算子设置 tracer dict。

参数
  • key – 用来匹配 send/recv 的key。

  • tracer_set – 有效的 tracer 集合。

class megengine.distributed.server.Server(port)[源代码]

基类:object

分布式训练的分布式服务器。需要在主节点上运行。

参数

port – python 服务器端口。

class megengine.distributed.server.ThreadXMLRPCServer(addr, requestHandler=<class 'xmlrpc.server.SimpleXMLRPCRequestHandler'>, logRequests=True, allow_none=False, encoding=None, bind_and_activate=True, use_builtin_types=False)[源代码]

基类:socketserver.ThreadingMixIn, xmlrpc.server.SimpleXMLRPCServer

megengine.distributed.server.start_server(py_server_port, mm_server_port)[源代码]

启动 python 分布式服务器和多机服务器。

参数
  • py_server_port – python 服务器端口。

  • mm_server_port – 多机服务器端口。

megengine.distributed.util

megengine.distributed.util.get_free_ports(num)[源代码]

获得一个或多个空闲端口。

返回类型

List[int]