
Mojo struct

InputTilePipeline

struct InputTilePipeline[Payload: TilePayload, num_group_stages: Int, k_group_size: Int]

Tile pipeline with configurable payload type.

Separates synchronization from tile storage. The Payload parameter (e.g., StandardTilePayload or BlockScaledTilePayload) holds tile arrays.

Fields​

  • pipeline (InputTilePipeline[Payload, num_group_stages, k_group_size].Pipeline): Producer-consumer pipeline that handles stage synchronization.
  • payload (Payload): Tile storage; holds the tile arrays.

Implemented traits​

AnyType, Copyable, ImplicitlyCopyable, ImplicitlyDeletable, Movable, RegisterPassable, TrivialRegisterPassable

comptime members​

BarrierArray​

comptime BarrierArray = SMemArray[SharedMemBarrier, (num_group_stages * 2)]

Pipeline​

comptime Pipeline = ProducerConsumerPipeline[num_group_stages]

Methods​

__init__​

def __init__(barriers: SMemArray[SharedMemBarrier, (num_group_stages * 2)], payload: Payload) -> Self

Initialize from typed barrier array and payload.

init_barriers​

static def init_barriers(storage_ptr: UnsafePointer[SharedMemBarrier, MutUntrackedOrigin, address_space=AddressSpace.SHARED], producer_arv_count: Int32, consumer_arv_count: Int32)

Initialize pipeline barriers. Call this once, from the single thread selected by elect_one.

try_acquire_producer​

def try_acquire_producer(self) -> Bool

Non-blocking check if next producer stage is available.

Example (TMA Load warp):

    var ready = pipeline.try_acquire_producer()
    # ... do other work while potentially waiting ...
    pipeline.wait_producer_if_needed(ready)
    var stage = pipeline.producer_stage()
    # ... load tiles ...

Returns:

Bool: True if consumer has freed the stage, False otherwise.
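The try-then-wait pattern above can be illustrated with a small host-side model. This is a sketch in Python, not the Mojo API: `ToyPipeline`, `producer_release`, and `consumer_release` are hypothetical names, the model is single-threaded, and a plain per-stage flag stands in for the shared-memory barriers. It only shows when a non-blocking producer check would return True or False on a ring of stages.

```python
class ToyPipeline:
    """Single-threaded stand-in for a multi-stage producer/consumer ring.

    Hypothetical model: a boolean per stage replaces the real barriers.
    """

    def __init__(self, num_stages):
        self.num_stages = num_stages
        self.full = [False] * num_stages  # True once a stage holds data
        self.prod = 0  # next stage the producer will fill
        self.cons = 0  # next stage the consumer will read

    def try_acquire_producer(self):
        # Non-blocking: usable only if the consumer has freed the stage.
        return not self.full[self.prod]

    def try_acquire_consumer(self):
        # Non-blocking: usable only if the producer has filled the stage.
        return self.full[self.cons]

    def producer_release(self):
        # Mark the stage as filled and advance around the ring.
        self.full[self.prod] = True
        self.prod = (self.prod + 1) % self.num_stages

    def consumer_release(self):
        # Mark the stage as free and advance around the ring.
        self.full[self.cons] = False
        self.cons = (self.cons + 1) % self.num_stages


pipe = ToyPipeline(num_stages=2)
trace = []
for _ in range(3):
    ready = pipe.try_acquire_producer()
    trace.append(ready)
    if ready:
        pipe.producer_release()

# Both stages are now full, so the third check failed.
consumer_ready = pipe.try_acquire_consumer()
pipe.consumer_release()  # the consumer frees stage 0
after_free = pipe.try_acquire_producer()
```

In the real pipeline a False result is not an error: the caller can do other work and then pass the result to wait_producer_if_needed, which blocks only when the check failed.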

try_acquire_consumer​

def try_acquire_consumer(self) -> Bool

Non-blocking check if next consumer stage has data.

Example (MMA warp):

    var ready = pipeline.try_acquire_consumer()
    # ... do other work while potentially waiting ...
    pipeline.wait_consumer_if_needed(ready)
    var stage = pipeline.consumer_stage()
    # ... process tiles ...

Returns:

Bool: True if producer has filled the stage, False otherwise.

wait_producer_if_needed​

def wait_producer_if_needed(self, already_ready: Bool)

Wait for the next producer stage to become available, unless already_ready is True.

Args:

  • already_ready (Bool): Result from try_acquire_producer().

wait_consumer_if_needed​

def wait_consumer_if_needed(self, already_ready: Bool)

Wait for the next consumer stage to be filled, unless already_ready is True.

Args:

  • already_ready (Bool): Result from try_acquire_consumer().

producer_stage​

def producer_stage(self) -> UInt32

Returns:

UInt32

consumer_stage​

def consumer_stage(self) -> UInt32

Returns:

UInt32

producer_mbar​

def producer_mbar(self, stage: UInt32) -> MbarPtr

Returns:

MbarPtr

consumer_mbar​

def consumer_mbar(self, stage: UInt32) -> MbarPtr

Returns:

MbarPtr

producer​

def producer[mut_origin: MutOrigin](ref[Payload] self) -> InputProducer[mut_origin, Payload, num_group_stages, k_group_size]

Get producer view for TMA Load warp.

Returns:

InputProducer[mut_origin, Payload, num_group_stages, k_group_size]

consumer​

def consumer[mut_origin: MutOrigin](ref[Payload] self) -> InputConsumer[mut_origin, Payload, num_group_stages, k_group_size]

Get consumer view for MMA warp.

Returns:

InputConsumer[mut_origin, Payload, num_group_stages, k_group_size]

acquire_producer​

def acquire_producer[mut_origin: MutOrigin](ref[Payload] self) -> InputProducerStage[mut_origin, Payload, num_group_stages, k_group_size]

Acquire a producer stage handle (linear type).

Waits for the consumer to free the current stage, then returns a linear type handle that MUST be released (compiler-enforced).

Usage:

    var tiles = pipeline.acquire_producer()
    load_tiles(tiles.payload(), tiles.stage(), tiles.barrier())
    tiles^.release()  # Advances to next stage

Returns:

InputProducerStage[mut_origin, Payload, num_group_stages, k_group_size]: An InputProducerStage handle that must be released.
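The must-release contract can be mimicked at runtime in a host-side sketch. Python has no linear types, so this hypothetical model (`StageHandle` and `ToyProducerSide` are illustrative names, not part of the Mojo API) raises an error at the point where the Mojo compiler would reject the program. It also omits the wait for the consumer and only tracks the handle lifecycle and stage advance.

```python
class StageHandle:
    """Runtime stand-in for a handle that must be released exactly once."""

    def __init__(self, owner, stage):
        self._owner = owner
        self._stage = stage
        self._released = False

    def stage(self):
        return self._stage

    def release(self):
        if self._released:
            raise RuntimeError("stage handle released twice")
        self._released = True
        self._owner._on_release()


class ToyProducerSide:
    """Hypothetical producer view over a ring of num_stages stages."""

    def __init__(self, num_stages):
        self._num_stages = num_stages
        self._next = 0
        self._outstanding = False

    def acquire_producer(self):
        # The real API enforces this at compile time; here it is a runtime check.
        if self._outstanding:
            raise RuntimeError("previous stage handle was never released")
        self._outstanding = True
        return StageHandle(self, self._next)

    def _on_release(self):
        # Releasing the handle advances to the next stage in the ring.
        self._outstanding = False
        self._next = (self._next + 1) % self._num_stages


side = ToyProducerSide(num_stages=3)
visited = []
for _ in range(4):
    tiles = side.acquire_producer()
    visited.append(tiles.stage())
    tiles.release()

leaked = side.acquire_producer()  # deliberately never released
try:
    side.acquire_producer()
    error = None
except RuntimeError as exc:
    error = str(exc)
```

The design point is that the stage only advances on release, so a forgotten release would stall the ring; making the handle a linear type turns that mistake into a compile-time error instead of a hang.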

acquire_consumer​

def acquire_consumer[mut_origin: MutOrigin](ref[Payload] self) -> InputConsumerStage[mut_origin, Payload, num_group_stages, k_group_size]

Acquire a consumer stage handle (linear type).

Waits for the producer to fill the current stage, then returns a linear type handle that MUST be released (compiler-enforced).

Usage:

    var tiles = pipeline.acquire_consumer()
    process_tiles(tiles.payload(), tiles.stage())
    tiles^.release()  # Signals complete and advances

Returns:

InputConsumerStage[mut_origin, Payload, num_group_stages, k_group_size]: An InputConsumerStage handle that must be released.

drain_producer​

def drain_producer(mut self)

Drain pipeline to prevent CTA exit while peer is still working.

Call this after all producer iterations are complete. This is the linear type equivalent of InputProducer.drain().