Mojo struct
RingBuffer
struct RingBuffer[a_type: DType, b_type: DType, a_tile_layout: Layout, b_tile_layout: Layout, num_pipeline_stages: Int, num_consumers: Int, cluster_size: Int, tma_transfer: Bool = True]
Ring buffer for managing pipeline synchronization between producers and consumers.
This struct encapsulates the synchronization logic for a multi-stage pipeline with one producer and multiple consumers, supporting both single-block and multi-cluster configurations.
The ring buffer uses two sets of barriers:
- full_mbar: Signals when tiles are ready for consumption
- empty_mbar: Signals when slots are available for production
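This two-barrier scheme is the classic bounded-buffer pattern. A minimal CPU analogue using Python semaphores may help make the roles concrete (all names here are illustrative; the real struct blocks on shared-memory barriers, not semaphores):

```python
import threading

NUM_STAGES = 4  # analogue of num_pipeline_stages

buffer = [None] * NUM_STAGES
empty = threading.Semaphore(NUM_STAGES)  # analogue of empty_mbar: slots free for production
full = threading.Semaphore(0)            # analogue of full_mbar: tiles ready for consumption
results = []

def producer(n_tiles):
    for i in range(n_tiles):
        empty.acquire()                  # wait for an empty slot
        buffer[i % NUM_STAGES] = i       # "load the tile" into the ring slot
        full.release()                   # signal the tile is ready

def consumer(n_tiles):
    for i in range(n_tiles):
        full.acquire()                   # wait for a full slot
        results.append(buffer[i % NUM_STAGES])
        empty.release()                  # hand the slot back to the producer

p = threading.Thread(target=producer, args=(8,))
c = threading.Thread(target=consumer, args=(8,))
p.start(); c.start(); p.join(); c.join()
print(results)  # tiles consumed in order: [0, 1, 2, 3, 4, 5, 6, 7]
```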
Parameters

- a_type (DType): Data type for A matrix tiles.
- b_type (DType): Data type for B matrix tiles.
- a_tile_layout (Layout): Memory layout for A tiles.
- b_tile_layout (Layout): Memory layout for B tiles.
- num_pipeline_stages (Int): Number of stages in the circular buffer.
- num_consumers (Int): Number of consumer warp groups.
- cluster_size (Int): Number of blocks in the cluster (1 for single-block).
- tma_transfer (Bool): Whether the ring buffer is used for TMA transfers (default: True).
Fields
- full_mbar (SMemArrayType[SharedMemBarrier, num_pipeline_stages]): Barrier array signaling when tiles are ready for consumption.
- empty_mbar (SMemArrayType[SharedMemBarrier, num_pipeline_stages]): Barrier array signaling when slots are available for production.
- read_state (PipelineState[num_pipeline_stages]): Pipeline state tracking the consumer's current read slot.
- write_state (PipelineState[num_pipeline_stages]): Pipeline state tracking the producer's current write slot.
- warp_group_thread_idx (UInt): Thread index within the warp group.
- a_tiles (SMemTileArrayType[a_type, a_tile_layout, num_pipeline_stages, 128]): Iterator over A matrix tile storage.
- b_tiles (SMemTileArrayType[b_type, b_tile_layout, num_pipeline_stages, 128]): Iterator over B matrix tile storage.
Implemented traits
AnyType,
Copyable,
ImplicitlyCopyable,
UnknownDestructibility
Aliases
__copyinit__is_trivial
alias __copyinit__is_trivial = True
__del__is_trivial
alias __del__is_trivial = True
ATile
alias ATile = LayoutTensor[a_type, a_tile_layout, MutableAnyOrigin, address_space=AddressSpace(3), alignment=128]
ATileArray
alias ATileArray = SMemTileArrayType[a_type, a_tile_layout, num_pipeline_stages, 128]
BTile
alias BTile = LayoutTensor[b_type, b_tile_layout, MutableAnyOrigin, address_space=AddressSpace(3), alignment=128]
BTileArray
alias BTileArray = SMemTileArrayType[b_type, b_tile_layout, num_pipeline_stages, 128]
PipelineBarrier
alias PipelineBarrier = SMemArrayType[SharedMemBarrier, num_pipeline_stages]
SMM
alias SMM = SharedMemoryManager[NVIDIASharedMemoryBasePtr]
Methods
__init__
__init__(out self, full_mbar: SMemArrayType[SharedMemBarrier, num_pipeline_stages], empty_mbar: SMemArrayType[SharedMemBarrier, num_pipeline_stages], warp_group_thread_idx: UInt, a_tiles: SMemTileArrayType[a_type, a_tile_layout, num_pipeline_stages, 128], b_tiles: SMemTileArrayType[b_type, b_tile_layout, num_pipeline_stages, 128])
Initialize ring buffer with barrier pointers.
Args:
- full_mbar (SMemArrayType): Barrier array signaling when tiles are ready.
- empty_mbar (SMemArrayType): Barrier array signaling when slots are empty.
- warp_group_thread_idx (UInt): Thread index within the warp group.
- a_tiles (SMemTileArrayType): Iterator over A matrix tile storage.
- b_tiles (SMemTileArrayType): Iterator over B matrix tile storage.
__enter__
__enter__(mut self) -> Self
Context manager entry.
get_expected_bytes
static get_expected_bytes() -> Int
Calculate expected bytes per pipeline stage for TMA transfers.
Returns:
Int: The expected number of bytes per pipeline stage.
get_slot
get_slot(mut self) -> UInt32
Producer waits for empty buffer slot and prepares for loading.
This method blocks until the current write slot is empty (all consumers have finished with it), then prepares the barrier for new data.
Returns:
UInt32: Index of the available slot in the ring buffer.
get_producer_tiles
get_producer_tiles(mut self) -> Tuple[UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)], LayoutTensor[a_type, a_tile_layout, MutableAnyOrigin, address_space=AddressSpace(3), alignment=128], LayoutTensor[b_type, b_tile_layout, MutableAnyOrigin, address_space=AddressSpace(3), alignment=128]]
Get the next available slot for the producer to fill.
Returns:
Tuple: Tuple of (barrier, a_tile, b_tile) for the producer to use.
enqueue_tile
enqueue_tile(mut self)
Producer signals that tile loading is complete.
This handles the specific signaling pattern needed:
- For cp.async: Signal async copy arrival and barrier arrival
- For TMA: Barrier arrival is handled by hardware
After signaling, advances to the next pipeline stage.
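The producer-side ordering contract (`get_slot`, then fill, then `enqueue_tile`) can be modeled as a toy single-threaded state machine. All names below are illustrative: where this model asserts, the real methods block on the shared-memory barriers.

```python
# Toy model of the producer protocol: acquire a slot, fill it, publish it.
NUM_STAGES = 4
slot_state = ["EMPTY"] * NUM_STAGES   # EMPTY: writable, FULL: readable
write_index = 0                       # analogue of write_state

def get_slot():
    # Blocks (here: asserts) until the current write slot is empty.
    assert slot_state[write_index] == "EMPTY"
    return write_index

def enqueue_tile():
    # Producer signals the tile is loaded, then advances the pipeline.
    global write_index
    slot_state[write_index] = "FULL"
    write_index = (write_index + 1) % NUM_STAGES

for _ in range(NUM_STAGES):           # fill every stage once
    slot = get_slot()
    # ... issue the TMA / cp.async copy into this slot's tiles ...
    enqueue_tile()

assert all(s == "FULL" for s in slot_state)  # ring is now full; another
                                             # get_slot() would block
```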
get_tile
get_tile(mut self) -> UInt32
Consumer waits for full buffer slot.
This method blocks until the producer has filled the current read slot.
Returns:
UInt32: Index of the available tile to consume.
get_consumer_tiles
get_consumer_tiles(mut self) -> Tuple[UInt32, LayoutTensor[a_type, a_tile_layout, MutableAnyOrigin, address_space=AddressSpace(3), alignment=128], LayoutTensor[b_type, b_tile_layout, MutableAnyOrigin, address_space=AddressSpace(3), alignment=128]]
Consumer waits for full buffer slot and returns the tiles.
Returns:
Tuple: Tuple of (read_idx, a_tile, b_tile) for the consumer to process.
release_slot
release_slot(mut self, read_idx: UInt32)
Consumer signals that buffer slot is empty.
This allows the producer to reuse this slot in the ring buffer. Different arrival patterns are used for single-block vs multi-cluster.
Args:
- read_idx (UInt32): Index of the slot to release.
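The consumer side mirrors the producer: `get_tile` hands out a read index, and `release_slot(read_idx)` returns that exact slot to the producer. A toy model of this contract (names illustrative; real code waits on and arrives at the hardware barriers):

```python
# Toy model of the consumer protocol: take a tile, use it, release the slot.
NUM_STAGES = 4
slot_state = ["FULL"] * NUM_STAGES    # assume the producer filled every stage
read_index = 0                        # analogue of read_state
released = []

def get_tile():
    # Blocks (here: asserts) until the producer has filled the read slot.
    global read_index
    assert slot_state[read_index] == "FULL"
    idx, read_index = read_index, (read_index + 1) % NUM_STAGES
    return idx

def release_slot(read_idx):
    # Real code arrives at empty_mbar[read_idx] so the producer can reuse it.
    slot_state[read_idx] = "EMPTY"
    released.append(read_idx)

for _ in range(NUM_STAGES):
    idx = get_tile()
    # ... compute on this slot's a/b tiles ...
    release_slot(idx)

print(released)  # slots released in order: [0, 1, 2, 3]
```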
consumer
consumer(mut self) -> RingBufferConsumer[self, RingBuffer[a_type, b_type, a_tile_layout, b_tile_layout, num_pipeline_stages, num_consumers, cluster_size, tma_transfer]]
Create a consumer view of this ring buffer.
Returns:
RingBufferConsumer: A consumer view of this ring buffer.
producer
producer(mut self) -> RingBufferProducer[self, RingBuffer[a_type, b_type, a_tile_layout, b_tile_layout, num_pipeline_stages, num_consumers, cluster_size, tma_transfer]]
Create a producer view of this ring buffer.
Returns:
RingBufferProducer: A producer view of this ring buffer.
arrive_empty_barriers
arrive_empty_barriers(self)
Helper to arrive at empty barriers during consumer initialization.
This is called when consumers enter their context to signal they are ready to consume tiles. It ensures all pipeline stages start with empty slots available for the producer.
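One way to picture this startup step: each consumer arrives once at every stage's empty barrier, so every barrier reaches its expected arrival count before any tiles flow, and the producer's first wait on each stage succeeds. A toy arrival-count model (illustrative only):

```python
# Toy model of consumer startup: every consumer arrives at every stage's
# empty barrier so all slots initially look free to the producer.
NUM_STAGES, NUM_CONSUMERS = 4, 2
arrivals = [0] * NUM_STAGES           # per-stage arrival count on empty_mbar

def arrive_empty_barriers():
    for stage in range(NUM_STAGES):
        arrivals[stage] += 1

for _ in range(NUM_CONSUMERS):        # called on each consumer's context entry
    arrive_empty_barriers()

# Every stage has received all expected arrivals; the producer may proceed.
assert all(a == NUM_CONSUMERS for a in arrivals)
```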