Mojo module

allreduce

Multi-GPU allreduce implementation for efficient tensor reduction across GPUs.

This module provides an optimized implementation of allreduce operations across multiple GPUs, supporting both peer-to-peer (P2P) and non-P2P communication patterns. The implementation automatically selects between two approaches based on hardware capabilities:

  1. P2P-based implementation (when P2P access is available):

    • Uses direct GPU-to-GPU memory access for better performance
    • Implements both single-stage and two-stage algorithms:
      • Single-stage for latency-bound transfers (small tensors)
      • Two-stage (reduce-scatter + all-gather) for bandwidth-bound transfers (large tensors)
    • Optimized for NVLink bandwidth utilization
    • Uses vectorized memory access and higher precision accumulation
  2. Non-P2P fallback implementation:

    • Copies data through host memory when direct GPU access isn't possible
    • Simple but functional approach for systems without P2P support

The implementation is tuned for common GPU architectures (A100, H100) and includes parameters that can be adjusted for different hardware configurations.
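The selection described above can be pictured as a small dispatch function. The sketch below is Python, not Mojo, and is only an illustration: `Strategy`, `choose_strategy`, and `two_stage_threshold_bytes` are hypothetical names, and the 1 MiB default cutoff is a placeholder assumption, not the module's actual tuning.

```python
from enum import Enum


class Strategy(Enum):
    NAIVE = "naive (staged through host memory)"
    P2P_ONE_STAGE = "P2P single-stage"
    P2P_TWO_STAGE = "P2P two-stage (reduce-scatter + all-gather)"


def choose_strategy(num_bytes, p2p_available, two_stage_threshold_bytes=1 << 20):
    """Pick an allreduce strategy from buffer size and P2P availability.

    The threshold is a hypothetical placeholder; the real cutoff depends on
    the hardware and is not stated on this page.
    """
    if not p2p_available:
        # No direct GPU-to-GPU access: fall back to copying through the host.
        return Strategy.NAIVE
    if num_bytes < two_stage_threshold_bytes:
        # Small tensors are latency-bound: one pass over all peers.
        return Strategy.P2P_ONE_STAGE
    # Large tensors are bandwidth-bound: split the work across GPUs.
    return Strategy.P2P_TWO_STAGE
```

For example, `choose_strategy(4096, p2p_available=True)` selects the single-stage path under these assumed defaults.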

Per-Device Architecture

The allreduce operation follows a per-device execution model:

  1. Single-Device Instances: Each GPU runs its own instance of the allreduce operation.

  2. Parallel Execution: The Python/Graph API layer is responsible for:

    • Creating one allreduce op instance per participating GPU.
    • Ensuring all instances execute in parallel.
    • Ensuring correctness by staging mo.fence.
  3. Device Affinity: Each allreduce instance:

    • Executes on its assigned GPU (specified via device context).
    • Reads from all GPUs' input buffers (requires P2P access).
    • Writes only to its own output buffer.
    • Uses the same synchronization signals as other instances.
  4. Requirements:

    • Peer-to-peer access must be enabled between all participating GPUs.
    • All instances must launch before any can complete, because each instance synchronizes with its peers through the shared signals.
    • The device context determines which GPU executes each instance.
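The per-device model above can be mimicked on the CPU with one thread per "GPU". This is a minimal Python sketch, not the Graph API: `run_allreduce_instances` is a hypothetical helper, plain lists stand in for device buffers, and a `threading.Barrier` stands in for the shared synchronization signals.

```python
import threading


def run_allreduce_instances(inputs):
    """Simulate one allreduce instance per device using threads.

    inputs[r] stands in for GPU r's input buffer (hypothetical setup).
    Returns one output buffer per instance.
    """
    ngpus = len(inputs)
    outputs = [None] * ngpus
    # Shared by every instance, like the common synchronization signals.
    barrier = threading.Barrier(ngpus)

    def instance(rank):
        # Wait until all instances have launched before touching peer data.
        barrier.wait()
        # Read from every device's input buffer (needs P2P on real hardware).
        reduced = [sum(column) for column in zip(*inputs)]
        # Write only to this instance's own output buffer.
        outputs[rank] = reduced
        # Do not finish until every instance is done reading peer buffers.
        barrier.wait()

    threads = [threading.Thread(target=instance, args=(r,)) for r in range(ngpus)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return outputs
```

If one of the threads were never started, the others would block at the first barrier, which mirrors the requirement that all instances launch before any can complete.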

Limitations:

  • Number of elements must be a multiple of SIMD width.
  • Maximum of 8 GPUs supported.
  • All input/output buffers must have identical shapes.
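A caller could check these limitations before launching. The following Python sketch is illustrative only: `check_allreduce_inputs` is a hypothetical helper, and `simd_width` must be supplied by the caller because it depends on the dtype and hardware.

```python
def check_allreduce_inputs(shapes, simd_width, max_gpus=8):
    """Raise ValueError if the documented limitations are violated.

    shapes: one shape tuple per participating GPU buffer.
    simd_width: elements per vectorized access (dtype/hardware dependent).
    Returns the number of elements per buffer when everything is valid.
    """
    if not 1 <= len(shapes) <= max_gpus:
        raise ValueError(f"expected 1..{max_gpus} GPUs, got {len(shapes)}")
    if any(shape != shapes[0] for shape in shapes):
        raise ValueError("all input/output buffers must have identical shapes")
    num_elements = 1
    for dim in shapes[0]:
        num_elements *= dim
    if num_elements % simd_width != 0:
        raise ValueError(
            f"{num_elements} elements is not a multiple of SIMD width {simd_width}"
        )
    return num_elements
```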

Visual Overview

  1. 1-Stage P2P (latency-bound)

    Each GPU r reads its portion from every peer buffer directly (via P2P), accumulates, then writes to its result using the epilogue:

    GPU r (result_r)
    src_ptrs[0] ─┐
    src_ptrs[1] ─┼──► Σ (high-precision accum) ──► output_lambda ──► result_r
    ...         ─┘

    Notes:

    • Vectorized loads from global memory on each GPU.
    • Good for small/latency-bound tensors.
  2. 2-Stage P2P (bandwidth-bound)

    Stage 1 (reduce-scatter): Each GPU r reduces its assigned partition and writes into its own signal payload (the bytes after the Signal header).

    src_ptrs[*]  ──►  reduce(partition r)  ──►  rank_sigs[r].payload  (per-GPU)

    Stage 2 (all-gather): Each GPU r gathers all partitions from peers' payloads and writes them to its result using the epilogue.

    [payload_0], [payload_1], ..., [payload_{ngpus-1}]  ──►  result_r (via output_lambda)
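Both data flows can be simulated on the CPU to see that they produce the same result. The Python sketch below makes several simplifying assumptions: lists stand in for GPU buffers, `output_lambda` takes `(rank, index, value)` instead of coordinates and a SIMD vector, the element count divides evenly by the number of GPUs, and `to_half`, `one_stage`, `two_stage`, and `simulate` are hypothetical names. Inputs are treated as half-precision values and accumulated in a wider type, with a single rounding at the end.

```python
import struct


def to_half(x):
    """Round a Python float to the nearest IEEE half-precision value."""
    return struct.unpack("<e", struct.pack("<e", x))[0]


def one_stage(src_bufs, rank, output_lambda):
    """GPU `rank` reads every peer buffer, accumulates, and emits results."""
    for i in range(len(src_bufs[0])):
        acc = 0.0  # double precision stands in for the wider accumulator
        for buf in src_bufs:
            acc += buf[i]
        output_lambda(rank, i, to_half(acc))  # round once, at the end


def two_stage(src_bufs, output_lambda):
    """Reduce-scatter into per-GPU payloads, then all-gather from them."""
    ngpus, n = len(src_bufs), len(src_bufs[0])
    part = n // ngpus  # assumes n % ngpus == 0; real partitioning may differ
    payloads = []
    # Stage 1: GPU r reduces only partition r into its own payload.
    for r in range(ngpus):
        lo = r * part
        payloads.append(
            [to_half(sum(buf[i] for buf in src_bufs)) for i in range(lo, lo + part)]
        )
    # On real hardware a cross-GPU synchronization is needed at this point.
    # Stage 2: every GPU gathers all partitions from the peers' payloads.
    for r in range(ngpus):
        for p, payload in enumerate(payloads):
            for j, value in enumerate(payload):
                output_lambda(r, p * part + j, value)


def simulate(src_bufs, use_two_stage):
    """Run either flow for all ranks and return one result list per rank."""
    ngpus, n = len(src_bufs), len(src_bufs[0])
    results = [[None] * n for _ in range(ngpus)]

    def output_lambda(rank, index, value):
        results[rank][index] = value

    if use_two_stage:
        two_stage(src_bufs, output_lambda)
    else:
        for rank in range(ngpus):
            one_stage(src_bufs, rank, output_lambda)
    return results
```

With one buffer holding 2048.0 and four buffers holding 0.5, both flows yield 2050.0 per element, whereas rounding to half precision after every addition would leave the sum stuck at 2048.0. That gap is the motivation for the higher-precision accumulator.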

For the naive allreduce (no P2P) per-device flow and staging details, see the _allreduce_naive_single docstring in this file.

comptime values

allreduce_table

comptime allreduce_table = Table[TuningConfigAllreduce](
    List[TuningConfigAllreduce](
        TuningConfigAllreduce(-1, -1, "sm_90a", 216),
        TuningConfigAllreduce(4, 134217728, "sm_90a", 232),
        TuningConfigAllreduce(-1, -1, "sm_100a", 512),
        TuningConfigAllreduce(2, 8388608, "sm_100a", 512),
        TuningConfigAllreduce(2, 16777216, "sm_100a", 512),
        TuningConfigAllreduce(2, 33554432, "sm_100a", 512),
        TuningConfigAllreduce(2, 67108864, "sm_100a", 512),
        TuningConfigAllreduce(2, 134217728, "sm_100a", 512),
        TuningConfigAllreduce(4, 8388608, "sm_100a", 512),
        TuningConfigAllreduce(4, 16777216, "sm_100a", 512),
        TuningConfigAllreduce(4, 33554432, "sm_100a", 512),
        TuningConfigAllreduce(4, 67108864, "sm_100a", 512),
        TuningConfigAllreduce(4, 134217728, "sm_100a", 512),
        Tuple[]()
    ),
    "allreduce_table"
)
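This page does not say how the table is queried. As a rough illustration only, the Python sketch below assumes one plausible rule: prefer the entry for the exact GPU count whose `num_bytes` is the smallest value that still covers the request, and otherwise fall back to the `(-1, -1)` entry for that SM version. The function `lookup_num_blocks` and this matching rule are assumptions, and only a subset of the entries is reproduced.

```python
from typing import NamedTuple


class TuningConfigAllreduce(NamedTuple):
    ngpus: int
    num_bytes: int
    sm_version: str
    num_blocks: int


# A subset of the entries listed in allreduce_table.
TABLE = [
    TuningConfigAllreduce(-1, -1, "sm_90a", 216),
    TuningConfigAllreduce(4, 134217728, "sm_90a", 232),
    TuningConfigAllreduce(-1, -1, "sm_100a", 512),
    TuningConfigAllreduce(2, 8388608, "sm_100a", 512),
]


def lookup_num_blocks(ngpus, num_bytes, sm_version):
    """Return num_blocks under the ASSUMED matching rule described above."""
    candidates = [
        c
        for c in TABLE
        if c.sm_version == sm_version
        and c.ngpus == ngpus
        and c.num_bytes >= num_bytes
    ]
    if candidates:
        # Smallest config that still covers the requested size.
        return min(candidates, key=lambda c: c.num_bytes).num_blocks
    for c in TABLE:
        # Assumed meaning of (-1, -1): default entry for this SM version.
        if c.sm_version == sm_version and c.ngpus == -1:
            return c.num_blocks
    raise KeyError(sm_version)
```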

elementwise_epilogue_type

comptime elementwise_epilogue_type = fn[dtype: DType, rank: Int, width: Int, *, alignment: Int](IndexList[rank], SIMD[dtype, width]) capturing -> None
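The signature describes a capturing function that receives an index and a vector of `width` values and returns nothing, so its effect comes from the state it captures. A loose Python analogue is a closure over an output buffer. In the sketch below, `make_store_epilogue` is a hypothetical helper, the buffer is a flat row-major list, and the vector is assumed to be contiguous along the last axis.

```python
def make_store_epilogue(out, shape):
    """Return a closure that writes a vector of values into `out`.

    out: flat row-major list acting as the captured output buffer.
    shape: logical shape of that buffer.
    Python stand-in for a capturing epilogue; not the Mojo API.
    """

    def epilogue(coords, values):
        # Convert N-d coordinates to a flat row-major offset.
        offset = 0
        for c, dim in zip(coords, shape):
            offset = offset * dim + c
        # Store the whole vector starting at that offset.
        out[offset:offset + len(values)] = values

    return epilogue
```

For instance, with a `(2, 4)` buffer, calling the epilogue at coordinates `(1, 0)` with four values fills the second row.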

Structs

  • TuningConfigAllreduce: Parameters:

    • ngpus: Number of GPUs for running allreduce.
    • num_bytes: Total number of input bytes supported by the config.
    • sm_version: SM version (as string).
    • num_blocks: Number of thread blocks for running allreduce.

Functions
