megengine.distributed package

megengine.distributed.functional

megengine.distributed.functional.all_gather(inp, group=<megengine.distributed.group.Group object>, device='')[source]

Create an all_gather operator for collective communication.

Parameters
  • inp (Tensor) – input tensor.

  • group (Optional[Group]) – communication group.

  • device (Optional[str]) – execution device.

Return type

Tensor
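Example: a minimal sketch of the call inside a worker spawned by megengine.distributed.launcher, assuming a single machine with two GPUs; each rank contributes a (2, 4) tensor and is expected to receive the tensors of all ranks concatenated along the first axis.

import numpy as np
from megengine import tensor
from megengine.distributed.functional import all_gather
from megengine.distributed.group import get_rank
from megengine.distributed.launcher import launcher

@launcher(n_gpus=2)          # assumes two GPUs are visible on this machine
def worker():
    # every rank contributes a (2, 4) tensor filled with its own rank id
    x = tensor(np.full((2, 4), get_rank(), dtype="float32"))
    y = all_gather(x)        # expected shape (4, 4): rank 0's rows, then rank 1's
    print(get_rank(), y.numpy().shape)

worker()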

megengine.distributed.functional.all_reduce_max(inp, group=<megengine.distributed.group.Group object>, device='')[source]

Create an all_reduce_max operator for collective communication.

Parameters
  • inp (Tensor) – input tensor.

  • group (Optional[Group]) – communication group.

  • device (Optional[str]) – execution device.

Return type

Tensor

megengine.distributed.functional.all_reduce_min(inp, group=<megengine.distributed.group.Group object>, device='')[source]

Create an all_reduce_min operator for collective communication.

Parameters
  • inp (Tensor) – input tensor.

  • group (Optional[Group]) – communication group.

  • device (Optional[str]) – execution device.

Return type

Tensor

megengine.distributed.functional.all_reduce_sum(inp, group=<megengine.distributed.group.Group object>, device='')[source]

Create an all_reduce_sum operator for collective communication.

Parameters
  • inp (Tensor) – input tensor.

  • group (Optional[Group]) – communication group.

  • device (Optional[str]) – execution device.

Return type

Tensor
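Example: a sketch of the expected semantics, assuming two worker processes set up as in the all_gather example above; all_reduce_max and all_reduce_min behave the same way with a different reduction.

import numpy as np
from megengine import tensor
from megengine.distributed.functional import all_reduce_sum
from megengine.distributed.group import get_rank

def worker():                # run under launcher or after init_process_group
    x = tensor(np.array([get_rank() + 1.0], dtype="float32"))
    y = all_reduce_sum(x)
    # with two ranks, rank 0 holds [1.] and rank 1 holds [2.];
    # both ranks receive the elementwise sum [3.]
    print(y.numpy())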

megengine.distributed.functional.all_to_all(inp, group=<megengine.distributed.group.Group object>, device='')[source]

Create an all_to_all operator for collective communication.

Parameters
  • inp (Tensor) – input tensor.

  • group (Optional[Group]) – communication group.

  • device (Optional[str]) – execution device.

Return type

Tensor

megengine.distributed.functional.broadcast(inp, group=<megengine.distributed.group.Group object>, device='')[source]

Create a broadcast operator for collective communication.

Parameters
  • inp (Tensor) – input tensor.

  • group (Optional[Group]) – communication group.

  • device (Optional[str]) – execution device.

Return type

Tensor
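Example: a sketch assuming two initialized worker processes; the tensor on rank 0 is copied to every rank.

import numpy as np
from megengine import tensor
from megengine.distributed.functional import broadcast
from megengine.distributed.group import get_rank

def worker():
    x = tensor(np.full((2, 2), get_rank(), dtype="float32"))
    y = broadcast(x)
    # every rank now holds rank 0's tensor, i.e. all zeros in this sketch
    print(y.numpy())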

megengine.distributed.functional.gather(inp, group=<megengine.distributed.group.Group object>, device='')[source]

Create a gather operator for collective communication.

Parameters
  • inp (Tensor) – input tensor.

  • group (Optional[Group]) – communication group.

  • device (Optional[str]) – execution device.

Return type

Tensor
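Example: a sketch assuming two initialized worker processes; the per-rank inputs are expected to be concatenated along the first axis and delivered to rank 0.

import numpy as np
from megengine import tensor
from megengine.distributed.functional import gather
from megengine.distributed.group import get_rank

def worker():
    x = tensor(np.full((1, 3), get_rank(), dtype="float32"))
    y = gather(x)
    if get_rank() == 0:
        # rank 0 is expected to receive a (2, 3) tensor stacking both inputs
        print(y.numpy())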

megengine.distributed.functional.reduce_scatter_sum(inp, group=<megengine.distributed.group.Group object>, device='')[source]

Create a reduce_scatter_sum operator for collective communication.

Parameters
  • inp (Tensor) – input tensor.

  • group (Optional[Group]) – communication group.

  • device (Optional[str]) – execution device.

Return type

Tensor
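Example: a sketch assuming two initialized worker processes; the inputs are summed elementwise and the result is split along the first axis with each rank keeping its own slice, so the first dimension should be divisible by the world size.

import numpy as np
from megengine import tensor
from megengine.distributed.functional import reduce_scatter_sum
from megengine.distributed.group import get_rank

def worker():
    x = tensor(np.arange(4, dtype="float32"))   # [0. 1. 2. 3.] on both ranks
    y = reduce_scatter_sum(x)
    # the elementwise sum [0. 2. 4. 6.] is split between the ranks:
    # rank 0 is expected to hold [0. 2.] and rank 1 to hold [4. 6.]
    print(get_rank(), y.numpy())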

megengine.distributed.functional.reduce_sum(inp, group=<megengine.distributed.group.Group object>, device='')[source]

Create a reduce_sum operator for collective communication.

Parameters
  • inp (Tensor) – input tensor.

  • group (Optional[Group]) – communication group.

  • device (Optional[str]) – execution device.

Return type

Tensor

megengine.distributed.functional.remote_recv(src_rank, shape, dtype, device=None, inp=None)[source]

Receive a Tensor from a remote process.

Parameters
  • src_rank (int) – source process rank.

  • shape (Tuple[int]) – the shape of the tensor to receive.

  • dtype (type) – the data type of the tensor to receive.

  • device (Optional[str]) – the device to place the received tensor.

  • inp – dummy input used to determine the received tensor's type.

Return type

Tensor

megengine.distributed.functional.remote_send(inp, dest_rank)[source]

Send a Tensor to a remote process.

Parameters
  • inp (Tensor) – tensor to send.

  • dest_rank (int) – destination process rank.

Return type

Tensor
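Example: remote_send and remote_recv are normally used as a pair; a sketch assuming two initialized worker processes, where rank 0 sends a tensor to rank 1.

import numpy as np
from megengine import tensor
from megengine.distributed.functional import remote_recv, remote_send
from megengine.distributed.group import get_rank

def worker():
    if get_rank() == 0:
        x = tensor(np.ones((2, 3), dtype="float32"))
        remote_send(x, dest_rank=1)
    else:
        y = remote_recv(src_rank=0, shape=(2, 3), dtype=np.float32)
        print(y.numpy())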

megengine.distributed.functional.scatter(inp, group=<megengine.distributed.group.Group object>, device='')[source]

Create a scatter operator for collective communication.

Parameters
  • inp (Tensor) – input tensor.

  • group (Optional[Group]) – communication group.

  • device (Optional[str]) – execution device.

Return type

Tensor

megengine.distributed.group

class megengine.distributed.group.Group(proc_ranks)[source]

Bases: object

check(proc_ranks)[source]
property comp_node
property key
property rank
reset(proc_ranks)[source]
property size
class megengine.distributed.group.StaticData[source]

Bases: object

backend = None
client = None
device = None
master_ip = None
mm_server_port = None
next_stream = None
proc_rank = None
py_server_port = None
server = None
world_size = None
megengine.distributed.group.get_backend()[source]

Get the backend str.

Return type

str

megengine.distributed.group.get_client()[source]

Get the client of the Python XML-RPC server.

Return type

Client

megengine.distributed.group.get_mm_server_addr()[source]

Get master_ip and port of C++ mm_server.

Return type

Tuple[str, int]

megengine.distributed.group.get_py_server_addr()[source]

Get the master_ip and port of the Python XML-RPC server.

Return type

Tuple[str, int]

megengine.distributed.group.get_rank()[source]

Get the rank of the current process.

Return type

int

megengine.distributed.group.get_world_size()[source]

Get the total number of processes participating in the job.

Return type

int

megengine.distributed.group.group_barrier(group=<megengine.distributed.group.Group object>)[source]

Block until all ranks in the group reach this barrier.

Return type

None
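Example: a sketch assuming an initialized process group; every rank waits at the barrier until all other ranks have reached it.

from megengine.distributed.group import get_rank, group_barrier

def worker():
    print("rank", get_rank(), "reached the barrier")
    group_barrier()      # returns only after every rank has arrived
    print("rank", get_rank(), "passed the barrier")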

megengine.distributed.group.init_process_group(master_ip, port, world_size, rank, device, backend='nccl')[source]

Initialize the distributed process group and specify the device to be used in the current process.

Parameters
  • master_ip (str) – ip address of the master node.

  • port (int) – port available for all processes to communicate.

  • world_size (int) – total number of processes participating in the job.

  • rank (int) – rank of the current process.

  • device (int) – the GPU device id to bind this process to.

  • backend (Optional[str]) – communicator backend; currently supports ‘nccl’ and ‘ucx’.

Return type

None
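Example: a sketch of manual initialization when worker processes are started by hand rather than through launcher; the address, port, and ranks below are placeholders for a two-process job.

from megengine.distributed.group import init_process_group, is_distributed

# on the process with rank 0, bound to GPU 0; the second process would
# pass rank=1 and device=1 with the same master_ip, port and world_size
init_process_group(
    master_ip="127.0.0.1",   # placeholder master address
    port=23456,              # placeholder free port on the master node
    world_size=2,
    rank=0,
    device=0,
    backend="nccl",
)
assert is_distributed()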

megengine.distributed.group.is_distributed()[source]

Return True if the distributed process group has been initialized.

Return type

bool

megengine.distributed.group.new_group(proc_ranks)[source]

Build a subgroup containing certain ranks.

Return type

Group
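Example: a sketch assuming a job with at least two processes; ranks 0 and 1 form a subgroup and run a collective restricted to it (the assumption here is that every process builds the group object while only member ranks use it).

from megengine.distributed.functional import all_reduce_sum
from megengine.distributed.group import get_rank, new_group

def worker(x):
    sub = new_group([0, 1])                  # subgroup of ranks 0 and 1
    if get_rank() in (0, 1):
        return all_reduce_sum(x, group=sub)  # reduce only within the subgroup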

megengine.distributed.helper

class megengine.distributed.helper.AllreduceCallback(reduce_method, group=<megengine.distributed.group.Group object>)[source]

Bases: object

Allreduce Callback with tensor fusion optimization.

Parameters
  • reduce_method (str) – the method used to reduce gradients.

  • group (Group) – communication group.
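Example: a sketch of the typical data-parallel use through the make_allreduce_cb alias documented below, assuming megengine.autodiff.GradManager with its callbacks argument; the placeholder network and the "mean" reduce-method string are assumptions.

import megengine.module as M
from megengine.autodiff import GradManager
from megengine.distributed.helper import make_allreduce_cb

net = M.Linear(4, 2)     # any module; a placeholder network
gm = GradManager()
# gradients recorded by gm are fused and all-reduced across ranks
# ("mean" averages them) before the optimizer step
gm.attach(net.parameters(), callbacks=[make_allreduce_cb("mean")])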

class megengine.distributed.helper.TensorFuture(ack=True)[source]

Bases: megengine.utils.future.Future

device()[source]
dtype()[source]
numpy()[source]
shape()[source]
megengine.distributed.helper.apply()
megengine.distributed.helper.bcast_list_(inps, group=<megengine.distributed.group.Group object>)[source]

Broadcast tensors among the given group.

Parameters
  • inps (list) – input tensors.

  • group (Group) – communication group.
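Example: a sketch assuming two initialized worker processes; a common use is synchronizing initial parameter values in place so that all ranks start from rank 0's values.

import numpy as np
from megengine import tensor
from megengine.distributed.group import get_rank
from megengine.distributed.helper import bcast_list_

def worker():
    params = [tensor(np.full((3,), get_rank(), dtype="float32"))]
    bcast_list_(params)      # in place: every rank now holds rank 0's values
    print(params[0].numpy())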

megengine.distributed.helper.get_device_count_by_fork(device_type)[source]

Get the device count in a forked subprocess, to avoid initializing CUDA in the current process. See https://stackoverflow.com/questions/22950047/cuda-initialization-error-after-fork for more information.

megengine.distributed.helper.get_offsets(shapes)[source]
megengine.distributed.helper.make_allreduce_cb

alias of megengine.distributed.helper.AllreduceCallback

megengine.distributed.helper.pack_allreduce_split(pack_list, shapes, group, reduce_method)[source]
megengine.distributed.helper.param_pack_concat(inps, offsets, offsets_val)[source]

Returns the concatenated tensor; only used for parampack.

Parameters
  • inps (list) – input tensors.

  • offsets (Tensor) – offsets stored as a tensor on the device.

  • offsets_val (list) – offsets of the inputs, with length 2 * n, in the format [begin0, end0, begin1, end1].

Returns

The concatenated tensor.

Examples:

import numpy as np
from megengine import tensor
from megengine.distributed.helper import param_pack_concat

a = tensor(np.ones((1,), np.int32))
b = tensor(np.ones((3, 3), np.int32))
offsets_val = [0, 1, 1, 10]
offsets = tensor(offsets_val, np.int32)
c = param_pack_concat([a, b], offsets, offsets_val)
print(c.numpy())

Outputs:

[1 1 1 1 1 1 1 1 1 1]

megengine.distributed.helper.param_pack_split(inp, offsets, shapes)[source]

Splits the input tensor into a list of tensors as described by offsets and shapes; only used for parampack.

Parameters
  • inp (Tensor) – input tensor.

  • offsets (list) – offsets of the outputs, with length 2 * n, where n is the number of tensors to split into, in the format [begin0, end0, begin1, end1].

  • shapes (list) – tensor shapes of outputs.

Returns

The split tensors.

Examples:

import numpy as np
from megengine import tensor
from megengine.distributed.helper import param_pack_split

a = tensor(np.ones((10,), np.int32))
b, c = param_pack_split(a, [0, 1, 1, 10], [(1,), (3, 3)])
print(b.numpy())
print(c.numpy())

Outputs:

[1]
[[1 1 1]
 [1 1 1]
 [1 1 1]]

megengine.distributed.helper.synchronized(func)[source]

Decorator. The decorated function will synchronize when it finishes. Specifically, this is used to prevent data races during hub.load.

megengine.distributed.launcher

class megengine.distributed.launcher.launcher(*args, **kwargs)[source]

Bases: object

Decorator for launching multiple processes in single-machine multi-GPU training.

Parameters
  • func – the function you want to launch in distributed mode.

  • n_gpus – number of devices on each node.

  • world_size – total number of devices.

  • rank_start – starting rank number for this node.

  • master_ip – IP address of the master node (where rank 0 runs).

  • port – server port of the distributed server.
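Example: a sketch of the decorator in use, assuming a single machine where launcher spawns one worker process per visible GPU when n_gpus is not given.

from megengine.distributed.group import get_rank, get_world_size
from megengine.distributed.launcher import launcher

@launcher                 # spawn one worker process per GPU on this machine
def main():
    print("rank", get_rank(), "of", get_world_size())

main()                    # blocks until every worker process has finished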

megengine.distributed.server

class megengine.distributed.server.Client(master_ip, port)[source]

Bases: object

Distributed Client for distributed training.

Parameters
  • master_ip – IP address of the master node.

  • port – port of the server on the master node.

check_is_grad(key)[source]

Check whether send/recv needs gradients.

Parameters

key – key to match send/recv op.

check_remote_tracer(key)[source]

Get tracer dict for send/recv op.

Parameters

key – key to match send/recv op.

connect()[source]

Check connection success.

get_mm_server_port()[source]

Get the multi-machine server port.

group_barrier(key, size)[source]

A barrier that waits for all group members.

Parameters
  • key – group key to match each other.

  • size – group size.

set_is_grad(key, is_grad)[source]

Mark, by key, whether send/recv needs gradients.

Parameters
  • key – key to match send/recv op.

  • is_grad – whether this op needs grad.

set_remote_tracer(key, tracer_set)[source]

Set tracer dict for tracing send/recv op.

Parameters
  • key – key to match send/recv op.

  • tracer_set – valid tracer set.

user_get(key)[source]

Get user-defined key-value pairs across processes.

user_set(key, val)[source]

Set user-defined key-value pairs across processes.
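Example: a sketch of light-weight cross-process coordination assuming an initialized process group; the client is obtained through megengine.distributed.group.get_client, and the key name is arbitrary.

from megengine.distributed.group import get_client, get_rank, group_barrier

def worker():
    if get_rank() == 0:
        get_client().user_set("base_lr", 0.05)   # publish a value from rank 0
    group_barrier()                              # ensure the value has been set
    value = get_client().user_get("base_lr")     # every rank can now read it
    print("rank", get_rank(), "sees base_lr =", value)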

class megengine.distributed.server.Methods(mm_server_port)[source]

Bases: object

Distributed server methods, used to exchange information between distributed nodes.

Parameters

mm_server_port – port of the multi-machine RPC server.

check_is_grad(key)[source]

Check whether send/recv needs gradients.

Parameters

key – key to match send/recv op.

check_remote_tracer(key)[source]

Get tracer dict for send/recv op.

Parameters

key – key to match send/recv op.

connect()[source]

Method for checking connection success.

get_mm_server_port()[source]

Get the multi-machine RPC server port.

group_barrier(key, size)[source]

A barrier that waits for all group members.

Parameters
  • key – group key to match each other.

  • size – group size.

set_is_grad(key, is_grad)[source]

Mark, by key, whether send/recv needs gradients.

Parameters
  • key – key to match send/recv op.

  • is_grad – whether this op needs grad.

set_remote_tracer(key, tracer_set)[source]

Set tracer dict for tracing send/recv op.

Parameters
  • key – key to match send/recv op.

  • tracer_set – valid tracer set.

user_get(key)[source]

Get user-defined key-value pairs across processes.

user_set(key, val)[source]

Set user-defined key-value pairs across processes.

class megengine.distributed.server.Server(port=0)[source]

Bases: object

Distributed server for distributed training. Should be running on the master node.

Parameters

port – python server port.

class megengine.distributed.server.ThreadXMLRPCServer(addr, requestHandler=<class 'xmlrpc.server.SimpleXMLRPCRequestHandler'>, logRequests=True, allow_none=False, encoding=None, bind_and_activate=True, use_builtin_types=False)[source]

Bases: socketserver.ThreadingMixIn, xmlrpc.server.SimpleXMLRPCServer

megengine.distributed.server.main(port=0, verbose=True)[source]

megengine.distributed.util

megengine.distributed.util.get_free_ports(num)[source]

Get one or more free ports.

Return type

List[int]
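Example: a sketch that combines this helper with the Server class above; binding the master-node server to a freshly obtained free port is one typical use.

from megengine.distributed.server import Server
from megengine.distributed.util import get_free_ports

port = get_free_ports(1)[0]    # ask the OS for one currently unused port
server = Server(port)          # master-node server listening on that port
print("distributed server created on port", port)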