
Mojo struct

InputTilePipeline

struct InputTilePipeline[Payload: TilePayload, num_group_stages: Int, k_group_size: Int]

Tile pipeline with configurable payload type.

Separates synchronization from tile storage. The Payload parameter (e.g., StandardTilePayload or BlockScaledTilePayload) holds tile arrays.
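To make the split concrete, here is a small Python model (not Mojo, and not this library's API). A generic pipeline object owns the synchronization state and delegates tile lookup to whatever payload it is given. The classes `TilePipelineModel`, `StandardPayloadModel`, and `BlockScaledPayloadModel` are hypothetical. The idea that a block-scaled payload adds per-stage scale-factor tiles is also an assumption made for illustration.

```python
from dataclasses import dataclass
from typing import Generic, List, Protocol, Tuple, TypeVar


class TilePayloadModel(Protocol):
    """Hypothetical stand-in for the TilePayload trait."""

    def tiles(self, stage: int) -> Tuple[str, ...]: ...


@dataclass
class StandardPayloadModel:
    # Hypothetical: one A tile and one B tile per stage.
    a_tiles: List[str]
    b_tiles: List[str]

    def tiles(self, stage: int) -> Tuple[str, ...]:
        return (self.a_tiles[stage], self.b_tiles[stage])


@dataclass
class BlockScaledPayloadModel:
    # Hypothetical: additionally carries scale-factor tiles for A and B.
    a_tiles: List[str]
    b_tiles: List[str]
    a_scales: List[str]
    b_scales: List[str]

    def tiles(self, stage: int) -> Tuple[str, ...]:
        return (self.a_tiles[stage], self.b_tiles[stage],
                self.a_scales[stage], self.b_scales[stage])


P = TypeVar("P", bound=TilePayloadModel)


@dataclass
class TilePipelineModel(Generic[P]):
    payload: P
    # Synchronization state does not depend on what the payload stores.
    producer_stage: int = 0

    def producer_tiles(self) -> Tuple[str, ...]:
        return self.payload.tiles(self.producer_stage)


std = TilePipelineModel(StandardPayloadModel(["a0", "a1"], ["b0", "b1"]))
scaled = TilePipelineModel(
    BlockScaledPayloadModel(["a0", "a1"], ["b0", "b1"], ["sa0", "sa1"], ["sb0", "sb1"])
)
print(std.producer_tiles())     # ('a0', 'b0')
print(scaled.producer_tiles())  # ('a0', 'b0', 'sa0', 'sb0')
```

Only the payload type differs between the two instances. The stage bookkeeping is written once and shared.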

Fields

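  • pipeline (InputTilePipeline[Payload, num_group_stages, k_group_size].Pipeline): The producer/consumer synchronization state (barriers and stage tracking).
  • payload (Payload): The tile storage (tile arrays) shared by the producer and consumer.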

Implemented traits

AnyType, Copyable, ImplicitlyCopyable, ImplicitlyDestructible, Movable, RegisterPassable, TrivialRegisterPassable

comptime members

BarrierArray

comptime BarrierArray = SMemArray[SharedMemBarrier, (num_group_stages * 2)]
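`BarrierArray` reserves two shared-memory barriers per stage. Producer/consumer pipelines commonly give each stage one "full" barrier (data ready) and one "empty" barrier (slot free). The Python sketch below assumes that arrangement and also assumes the full barriers come first. This reference states neither the role split nor the ordering, so treat `barrier_layout` as hypothetical.

```python
def barrier_layout(num_group_stages: int) -> dict:
    """Map each stage to (full_index, empty_index) in a flat array of
    2 * num_group_stages barriers.

    ASSUMPTION: indices [0, num_group_stages) are "full" barriers and
    [num_group_stages, 2 * num_group_stages) are "empty" barriers.
    """
    return {
        stage: (stage, num_group_stages + stage)
        for stage in range(num_group_stages)
    }


layout = barrier_layout(4)
for stage, (full_idx, empty_idx) in layout.items():
    print(f"stage {stage}: full barrier {full_idx}, empty barrier {empty_idx}")
```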

Pipeline

comptime Pipeline = ProducerConsumerPipeline[num_group_stages]

Methods

__init__

__init__(barriers: SMemArray[SharedMemBarrier, (num_group_stages * 2)], payload: Payload) -> Self

Initialize from typed barrier array and payload.

init_barriers

static init_barriers(storage_ptr: UnsafePointer[SharedMemBarrier, MutAnyOrigin, address_space=AddressSpace.SHARED], producer_arv_count: Int32, consumer_arv_count: Int32)

Initialize the pipeline barriers. Call this once, from the single thread selected by elect_one.
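The arrival counts set how many arrivals complete one phase of a barrier. The Python model below is a single-threaded stand-in for a hardware mbarrier with parity-based waits. It assumes that "full" barriers use producer_arv_count and "empty" barriers use consumer_arv_count. That mapping is an assumption, and `MBarrierModel` and `init_barriers_model` are hypothetical names.

```python
class MBarrierModel:
    """Single-threaded model of an mbarrier.

    A phase completes after `arrive_count` arrivals, and completion flips the
    phase bit. `try_wait(parity)` succeeds once the phase with that parity
    has completed, mirroring parity-based waits on real mbarriers.
    """

    def __init__(self, arrive_count: int):
        assert arrive_count > 0
        self.arrive_count = arrive_count
        self.pending = arrive_count
        self.phase = 0  # parity of the phase currently collecting arrivals

    def arrive(self) -> None:
        self.pending -= 1
        if self.pending == 0:
            self.phase ^= 1
            self.pending = self.arrive_count

    def try_wait(self, parity: int) -> bool:
        return self.phase != parity


def init_barriers_model(num_group_stages, producer_arv_count, consumer_arv_count):
    # ASSUMPTION: full barriers count producer arrivals and empty barriers
    # count consumer arrivals.
    full = [MBarrierModel(producer_arv_count) for _ in range(num_group_stages)]
    empty = [MBarrierModel(consumer_arv_count) for _ in range(num_group_stages)]
    return full, empty


full, empty = init_barriers_model(2, producer_arv_count=1, consumer_arv_count=2)
print(empty[0].try_wait(0))  # False: two consumer arrivals are needed
empty[0].arrive()
print(empty[0].try_wait(0))  # False: one arrival is still pending
empty[0].arrive()
print(empty[0].try_wait(0))  # True: phase 0 has completed
```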

try_acquire_producer

try_acquire_producer(self) -> Bool

Non-blocking check if next producer stage is available.

Example (TMA Load warp):

    var ready = pipeline.try_acquire_producer()
    # ... do other work while potentially waiting ...
    pipeline.wait_producer_if_needed(ready)
    var stage = pipeline.producer_stage()
    # ... load tiles ...

Returns:

Bool: True if consumer has freed the stage, False otherwise.
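A parity model helps explain why the producer's first probes succeed. This Python sketch adopts a convention used, for example, by CUTLASS: the producer starts by waiting on parity 1 of each empty barrier, so untouched stages read as free. This reference does not say whether InputTilePipeline initializes its phases the same way. `Barrier` (one arrival per phase) and `ProducerSide` are hypothetical names.

```python
class Barrier:
    """Minimal mbarrier model: one arrival completes the phase and flips parity."""

    def __init__(self):
        self.phase = 0

    def arrive(self) -> None:
        self.phase ^= 1

    def try_wait(self, parity: int) -> bool:
        return self.phase != parity  # the phase with this parity has completed


class ProducerSide:
    """Hypothetical producer view of the ring.

    ASSUMPTION: the producer starts on parity 1, so every stage counts as
    free before any consumer has released it.
    """

    def __init__(self, empty_barriers):
        self.empty = empty_barriers
        self.stage = 0
        self.phase = 1

    def try_acquire_producer(self) -> bool:
        # Non-blocking: only polls the empty barrier of the current stage.
        return self.empty[self.stage].try_wait(self.phase)

    def advance(self) -> None:
        self.stage += 1
        if self.stage == len(self.empty):
            self.stage = 0
            self.phase ^= 1


empty = [Barrier(), Barrier()]
p = ProducerSide(empty)
first = p.try_acquire_producer()   # True: stage 0 starts free
p.advance()
second = p.try_acquire_producer()  # True: stage 1 starts free
p.advance()                        # wraps to stage 0; phase flips to 0
third = p.try_acquire_producer()   # False: stage 0 not yet released
empty[0].arrive()                  # consumer releases stage 0
fourth = p.try_acquire_producer()  # True
```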

try_acquire_consumer

try_acquire_consumer(self) -> Bool

Non-blocking check if next consumer stage has data.

Example (MMA warp):

    var ready = pipeline.try_acquire_consumer()
    # ... do other work while potentially waiting ...
    pipeline.wait_consumer_if_needed(ready)
    var stage = pipeline.consumer_stage()
    # ... process tiles ...

Returns:

Bool: True if producer has filled the stage, False otherwise.
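The consumer-side probe can be modeled in the same way, under the assumption that the consumer starts on parity 0 of each full barrier. The sketch also shows that stages are consumed in ring order: an arrival on stage 1 does not make stage 0 ready. `Barrier` and `ConsumerSide` are hypothetical names.

```python
class Barrier:
    """Minimal mbarrier model: one arrival completes the phase and flips parity."""

    def __init__(self):
        self.phase = 0

    def arrive(self) -> None:
        self.phase ^= 1

    def try_wait(self, parity: int) -> bool:
        return self.phase != parity


class ConsumerSide:
    """Hypothetical consumer view of the ring.

    ASSUMPTION: the consumer starts on parity 0 of each full barrier, so a
    stage holds no data until a producer arrival completes that phase.
    """

    def __init__(self, full_barriers):
        self.full = full_barriers
        self.stage = 0
        self.phase = 0

    def try_acquire_consumer(self) -> bool:
        # Non-blocking: only polls the full barrier of the current stage.
        return self.full[self.stage].try_wait(self.phase)


full = [Barrier(), Barrier()]
c = ConsumerSide(full)
full[1].arrive()                   # stage 1 is filled first...
before = c.try_acquire_consumer()  # ...but the consumer is on stage 0: False
full[0].arrive()                   # producer fills stage 0
after = c.try_acquire_consumer()   # True
```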

wait_producer_if_needed

wait_producer_if_needed(self, already_ready: Bool)

Producer side: if already_ready is False, wait until the consumer has freed the current producer stage. Otherwise, return immediately.

Args:

  • already_ready (Bool): Result from try_acquire_producer().

wait_consumer_if_needed

wait_consumer_if_needed(self, already_ready: Bool)

Consumer side: if already_ready is False, wait until the producer has filled the current consumer stage. Otherwise, return immediately.

Args:

  • already_ready (Bool): Result from try_acquire_consumer().
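Both helpers use the same split-phase pattern. A cheap probe runs early, independent work happens in between, and a blocking wait follows only if the probe failed. Here is a Python sketch of that control flow. `wait_if_needed` stands in for the helper, and `barrier_ready_on_poll` stands in for a barrier that completes after a few polls. Both names are hypothetical.

```python
def wait_if_needed(already_ready: bool, try_wait, max_polls: int = 10_000) -> int:
    """Model of the wait_*_if_needed helpers.

    Skips waiting when an earlier non-blocking probe already succeeded.
    Otherwise, polls `try_wait` until it succeeds. Returns the number of
    failed polls spent waiting.
    """
    if already_ready:
        return 0
    failed = 0
    while not try_wait():
        failed += 1
        if failed > max_polls:
            raise TimeoutError("barrier phase never completed")
    return failed


def barrier_ready_on_poll(n: int):
    """Hypothetical barrier probe that first reports ready on its n-th poll."""
    polls = {"count": 0}

    def try_wait() -> bool:
        polls["count"] += 1
        return polls["count"] >= n

    return try_wait


probe = barrier_ready_on_poll(3)
ready = probe()  # early non-blocking probe (poll 1): not ready yet
# ... independent work would be overlapped here ...
failed_polls = wait_if_needed(ready, probe)
print(ready, failed_polls)  # False 1
```

If the probe had already succeeded, `wait_if_needed` would return 0 without polling again. That is the case in which the overlapped work fully hid the latency.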

producer_stage

producer_stage(self) -> UInt32

Returns:

UInt32

consumer_stage

consumer_stage(self) -> UInt32

Returns:

UInt32
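This reference does not document the two stage accessors further. In ring-buffer pipelines of this kind, each side usually tracks a stage index in [0, num_group_stages) along with a phase bit that flips on every wrap-around. The Python generator below sketches that bookkeeping under this assumption. `stage_sequence` is a hypothetical name.

```python
def stage_sequence(num_group_stages: int, iterations: int):
    """Yield (stage, phase) pairs for successive pipeline iterations.

    The stage index walks the ring, and the phase bit flips on each wrap.
    """
    stage, phase = 0, 0
    for _ in range(iterations):
        yield stage, phase
        stage += 1
        if stage == num_group_stages:
            stage = 0
            phase ^= 1


seq = list(stage_sequence(3, 7))
print(seq)  # [(0, 0), (1, 0), (2, 0), (0, 1), (1, 1), (2, 1), (0, 0)]
```

The phase bit lets a waiter tell "this stage was released in the current lap" apart from "this stage was released one lap ago," because both cases share the same stage index.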

producer_mbar

producer_mbar(self, stage: UInt32) -> MbarPtr

Returns:

MbarPtr

consumer_mbar

consumer_mbar(self, stage: UInt32) -> MbarPtr

Returns:

MbarPtr

producer

producer[mut_origin: MutOrigin](ref[Payload] self) -> InputProducer[mut_origin, Payload, num_group_stages, k_group_size]

Get producer view for TMA Load warp.

Returns:

InputProducer[mut_origin, Payload, num_group_stages, k_group_size]

consumer

consumer[mut_origin: MutOrigin](ref[Payload] self) -> InputConsumer[mut_origin, Payload, num_group_stages, k_group_size]

Get consumer view for MMA warp.

Returns:

InputConsumer[mut_origin, Payload, num_group_stages, k_group_size]

acquire_producer

acquire_producer[mut_origin: MutOrigin](ref[Payload] self) -> InputProducerStage[mut_origin, Payload, num_group_stages, k_group_size]

Acquire a producer stage handle (linear type).

Waits for the consumer to free the current stage, then returns a linear type handle that MUST be released (compiler-enforced).

Usage:

    var tiles = pipeline.acquire_producer()
    load_tiles(tiles.payload(), tiles.stage(), tiles.barrier())
    tiles^.release()  # Advances to next stage

Returns:

InputProducerStage[mut_origin, Payload, num_group_stages, k_group_size]: An InputProducerStage handle that must be released.
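Python has no linear types, so it cannot reproduce the compiler-enforced release. The closest sketch is a handle that a `with` block releases on exit, combined with run-time checks that reject a double release and reject a second acquire before the first handle is released. `RingModel` and `StageHandle` are hypothetical. Barrier waits are left out so the sketch focuses on the handle discipline.

```python
class StageHandle:
    """Run-time stand-in for a linear stage handle (Mojo checks this at compile time)."""

    def __init__(self, ring: "RingModel", stage: int):
        self._ring = ring
        self.stage = stage
        self._released = False

    def release(self) -> None:
        if self._released:
            raise RuntimeError("stage handle released twice")
        self._released = True
        self._ring._advance()

    def __enter__(self) -> "StageHandle":
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        # Release on scope exit: the closest Python analogue to "must be consumed".
        if not self._released:
            self.release()
        return False


class RingModel:
    def __init__(self, num_stages: int):
        self.num_stages = num_stages
        self.stage = 0
        self._outstanding = False

    def acquire(self) -> StageHandle:
        if self._outstanding:
            raise RuntimeError("previous stage handle was not released")
        self._outstanding = True
        return StageHandle(self, self.stage)

    def _advance(self) -> None:
        self._outstanding = False
        self.stage = (self.stage + 1) % self.num_stages


ring = RingModel(num_stages=2)
visited = []
for _ in range(3):
    with ring.acquire() as tiles:
        visited.append(tiles.stage)  # "load tiles" into this stage
print(visited)  # [0, 1, 0]

handle = ring.acquire()
try:
    ring.acquire()  # rejected: the previous handle is still live
    double_acquire_rejected = False
except RuntimeError:
    double_acquire_rejected = True
handle.release()
```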

acquire_consumer

acquire_consumer[mut_origin: MutOrigin](ref[Payload] self) -> InputConsumerStage[mut_origin, Payload, num_group_stages, k_group_size]

Acquire a consumer stage handle (linear type).

Waits for the producer to fill the current stage, then returns a linear type handle that MUST be released (compiler-enforced).

Usage:

    var tiles = pipeline.acquire_consumer()
    process_tiles(tiles.payload(), tiles.stage())
    tiles^.release()  # Signals complete and advances

Returns:

InputConsumerStage[mut_origin, Payload, num_group_stages, k_group_size]: An InputConsumerStage handle that must be released.
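The single-threaded Python simulation below combines both sides, interleaving a producer and a consumer over a two-stage ring. It rests on three assumptions: each release arrives once on the barrier the other side waits on, the producer starts on parity 1, and the consumer starts on parity 0. `Barrier`, `Side`, and the scheduling loop are hypothetical. The run shows the back-pressure that acquire/release provides: the producer never gets more than `N` items ahead of the consumer.

```python
class Barrier:
    """Minimal mbarrier model: one arrival completes the phase and flips parity."""

    def __init__(self):
        self.phase = 0

    def arrive(self) -> None:
        self.phase ^= 1

    def done(self, parity: int) -> bool:
        return self.phase != parity


class Side:
    """One side of the ring: waits on `wait_bars`, signals `signal_bars`."""

    def __init__(self, wait_bars, signal_bars, start_phase: int):
        self.wait_bars, self.signal_bars = wait_bars, signal_bars
        self.stage, self.phase = 0, start_phase

    def ready(self) -> bool:
        return self.wait_bars[self.stage].done(self.phase)

    def release(self) -> None:
        # Signal the other side, then advance (stage, phase) around the ring.
        self.signal_bars[self.stage].arrive()
        self.stage += 1
        if self.stage == len(self.wait_bars):
            self.stage, self.phase = 0, self.phase ^ 1


N, ITEMS = 2, 5
full = [Barrier() for _ in range(N)]
empty = [Barrier() for _ in range(N)]
producer = Side(wait_bars=empty, signal_bars=full, start_phase=1)
consumer = Side(wait_bars=full, signal_bars=empty, start_phase=0)

buffers = [None] * N
consumed = []
next_item = 0
max_in_flight = 0
# Round-robin loop standing in for two warps running concurrently.
while len(consumed) < ITEMS:
    # Producer fills as many free stages as it can (acquire, load, release).
    while next_item < ITEMS and producer.ready():
        buffers[producer.stage] = next_item
        next_item += 1
        producer.release()
    max_in_flight = max(max_in_flight, next_item - len(consumed))
    # Consumer processes one filled stage per turn (acquire, process, release).
    if consumer.ready():
        consumed.append(buffers[consumer.stage])
        consumer.release()

print(consumed, max_in_flight)  # [0, 1, 2, 3, 4] 2
```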

drain_producer

drain_producer(mut self)

Drain the pipeline so that this CTA does not exit while its peer is still working.

Call this after all producer iterations are complete. This is the linear type equivalent of InputProducer.drain().
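This reference does not describe how draining works. One well-known way to get the same guarantee, which CUTLASS calls a producer "tail", is for the producer to wait once on each stage's empty barrier after its last iteration. That way it cannot exit while the consumer still holds a stage. The Python sketch below models that check. `stages_blocking_drain` is hypothetical, and whether drain_producer follows this scheme is an assumption.

```python
class Barrier:
    """Minimal mbarrier model: one arrival completes the phase and flips parity."""

    def __init__(self):
        self.phase = 0

    def arrive(self) -> None:
        self.phase ^= 1

    def done(self, parity: int) -> bool:
        return self.phase != parity


def stages_blocking_drain(empty, stage: int, phase: int):
    """Walk the ring once from the producer's (stage, phase), as a drain that
    waits on every empty barrier would, and return the stages whose consumer
    release has not landed yet. The producer may exit once this list is empty.
    """
    pending = []
    for _ in range(len(empty)):
        if not empty[stage].done(phase):
            pending.append(stage)
        stage += 1
        if stage == len(empty):
            stage, phase = 0, phase ^ 1
    return pending


# The producer filled both stages of a 2-stage ring (starting on parity 1)
# and has wrapped to stage 0 with phase 0. No consumer release has happened yet.
empty = [Barrier(), Barrier()]
before = stages_blocking_drain(empty, stage=0, phase=0)  # [0, 1]
empty[0].arrive()  # consumer releases stage 0
middle = stages_blocking_drain(empty, stage=0, phase=0)  # [1]
empty[1].arrive()  # consumer releases stage 1
after = stages_blocking_drain(empty, stage=0, phase=0)   # []
```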