Mojo struct
InputTilePipeline
struct InputTilePipeline[Payload: TilePayload, num_group_stages: Int, k_group_size: Int]
Tile pipeline with configurable payload type.
Separates synchronization from tile storage. The Payload parameter (e.g., StandardTilePayload or BlockScaledTilePayload) holds tile arrays.
Fields
- pipeline (InputTilePipeline[Payload, num_group_stages, k_group_size].Pipeline): Barrier-based producer/consumer synchronization for the stages.
- payload (Payload): Tile storage (the tile arrays).
Implemented traits
AnyType,
Copyable,
ImplicitlyCopyable,
ImplicitlyDestructible,
Movable,
RegisterPassable,
TrivialRegisterPassable
comptime members
BarrierArray
comptime BarrierArray = SMemArray[SharedMemBarrier, (num_group_stages * 2)]
Pipeline
comptime Pipeline = ProducerConsumerPipeline[num_group_stages]
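ProducerConsumerPipeline is not documented on this page. Pipelines of this kind usually treat the num_group_stages stages as a ring buffer and track a stage index together with a phase bit that flips each time the index wraps around, which is how a barrier tells one pass over a stage apart from the next. The sketch below is a hypothetical CPU-only model of that bookkeeping (StageCursor is not part of the library). It also assumes, without confirmation from this page, that the doubled size of BarrierArray corresponds to one "full" and one "empty" barrier per stage.

```mojo
@fieldwise_init
struct StageCursor(Copyable, Movable):
    # Hypothetical ring-buffer position: which stage, and which pass over the ring.
    var index: Int
    var phase: Int

    fn advance(mut self, num_stages: Int):
        # Step to the next stage; on wrap-around, return to stage 0
        # and flip the phase bit so the next pass is distinguishable.
        self.index += 1
        if self.index == num_stages:
            self.index = 0
            self.phase = 1 - self.phase
```

With three stages, three calls to advance(3) bring the cursor back to stage 0 with the phase bit flipped, so a wait on stage 0 during the second pass does not mistake the first pass's completion for the current one.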
Methods
__init__
__init__(barriers: SMemArray[SharedMemBarrier, (num_group_stages * 2)], payload: Payload) -> Self
Initialize from typed barrier array and payload.
init_barriers
static init_barriers(storage_ptr: UnsafePointer[SharedMemBarrier, MutAnyOrigin, address_space=AddressSpace.SHARED], producer_arv_count: Int32, consumer_arv_count: Int32)
Initializes the pipeline barriers. Call this once, from a single elected thread (elect_one).
try_acquire_producer
try_acquire_producer(self) -> Bool
Non-blocking check if next producer stage is available.
Example (TMA Load warp):
```mojo
var ready = pipeline.try_acquire_producer()
# ... do other work while potentially waiting ...
pipeline.wait_producer_if_needed(ready)
var stage = pipeline.producer_stage()
# ... load tiles ...
```
Returns:
Bool: True if consumer has freed the stage, False otherwise.
try_acquire_consumer
try_acquire_consumer(self) -> Bool
Non-blocking check if next consumer stage has data.
Example (MMA warp):
```mojo
var ready = pipeline.try_acquire_consumer()
# ... do other work while potentially waiting ...
pipeline.wait_consumer_if_needed(ready)
var stage = pipeline.consumer_stage()
# ... process tiles ...
```
Returns:
Bool: True if producer has filled the stage, False otherwise.
wait_producer_if_needed
wait_producer_if_needed(self, already_ready: Bool)
Conditionally wait for the consumer to free the next producer stage, unless it is already known to be free.
Args:
- already_ready (Bool): Result from try_acquire_producer().
wait_consumer_if_needed
wait_consumer_if_needed(self, already_ready: Bool)
Conditionally wait for the producer to fill the next consumer stage, unless it is already known to hold data.
Args:
- already_ready (Bool): Result from try_acquire_consumer().
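The try_acquire_* / wait_*_if_needed pairs split a barrier wait into a non-blocking probe and a conditional blocking wait: a warp can probe, overlap other work, and then skip the blocking call entirely if the probe already succeeded. The sketch below models only that control flow on the CPU. MockBarrier and wait_if_needed are hypothetical stand-ins, not library APIs, and MockBarrier.wait() records the call instead of actually blocking.

```mojo
@fieldwise_init
struct MockBarrier(Copyable, Movable):
    # Hypothetical stand-in for one shared-memory barrier phase.
    var completed: Bool      # Has the awaited phase completed?
    var blocking_waits: Int  # Number of blocking waits issued so far.

    fn try_wait(self) -> Bool:
        # Non-blocking probe, analogous to try_acquire_producer/try_acquire_consumer.
        return self.completed

    fn wait(mut self):
        # A real barrier wait would block until the phase completes;
        # this mock only counts the call and marks the phase complete.
        self.blocking_waits += 1
        self.completed = True


fn wait_if_needed(mut barrier: MockBarrier, already_ready: Bool):
    # Mirrors wait_*_if_needed: issue the blocking wait only if the probe failed.
    if not already_ready:
        barrier.wait()
```

When the probe returns True, no blocking wait is issued at all; when it returns False, exactly one is. That is the saving the try/wait split is meant to provide.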
producer_stage
consumer_stage
producer_mbar
producer_mbar(self, stage: UInt32) -> MbarPtr
Returns:
MbarPtr
consumer_mbar
consumer_mbar(self, stage: UInt32) -> MbarPtr
Returns:
MbarPtr
producer
producer[mut_origin: MutOrigin](ref[Payload] self) -> InputProducer[mut_origin, Payload, num_group_stages, k_group_size]
Get producer view for TMA Load warp.
Returns:
InputProducer[mut_origin, Payload, num_group_stages, k_group_size]
consumer
consumer[mut_origin: MutOrigin](ref[Payload] self) -> InputConsumer[mut_origin, Payload, num_group_stages, k_group_size]
Get consumer view for MMA warp.
Returns:
InputConsumer[mut_origin, Payload, num_group_stages, k_group_size]
acquire_producer
acquire_producer[mut_origin: MutOrigin](ref[Payload] self) -> InputProducerStage[mut_origin, Payload, num_group_stages, k_group_size]
Acquire a producer stage handle (linear type).
Waits for the consumer to free the current stage, then returns a linear type handle that MUST be released (compiler-enforced).
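Usage:
```mojo
var tiles = pipeline.acquire_producer()  # Blocks until the consumer frees the stage
# load_tiles is a caller-provided placeholder for the TMA load routine.
load_tiles(tiles.payload(), tiles.stage(), tiles.barrier())
tiles^.release()  # Advances to next stage
```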
Returns:
InputProducerStage[mut_origin, Payload, num_group_stages, k_group_size]: An InputProducerStage handle that must be released.
acquire_consumer
acquire_consumer[mut_origin: MutOrigin](ref[Payload] self) -> InputConsumerStage[mut_origin, Payload, num_group_stages, k_group_size]
Acquire a consumer stage handle (linear type).
Waits for the producer to fill the current stage, then returns a linear type handle that MUST be released (compiler-enforced).
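Usage:
```mojo
var tiles = pipeline.acquire_consumer()  # Blocks until the producer fills the stage
# process_tiles is a caller-provided placeholder for the MMA work.
process_tiles(tiles.payload(), tiles.stage())
tiles^.release()  # Signals complete and advances
```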
Returns:
InputConsumerStage[mut_origin, Payload, num_group_stages, k_group_size]: An InputConsumerStage handle that must be released.
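The acquire_producer/acquire_consumer handles follow an acquire, use, release sequence in which releasing consumes the handle and advances the pipeline to the next stage. The sketch below is a hypothetical CPU-only model of that sequence (StageHandle and RingModel are illustrative names, not library types). It assumes the simple "release advances by one stage" behavior described above, omits the barrier waits and signals, and does not reproduce the compiler-enforced linear-type check: here nothing stops you from forgetting release().

```mojo
@fieldwise_init
struct StageHandle(Movable):
    # Hypothetical handle for one acquired stage (not linear in this model).
    var stage: Int


@fieldwise_init
struct RingModel(Movable):
    # Hypothetical model of a pipeline's current stage and release count.
    var num_stages: Int
    var current: Int
    var released: Int

    fn acquire(self) -> StageHandle:
        # A real acquire would first wait on the stage's barrier.
        return StageHandle(self.current)

    fn release(mut self, var handle: StageHandle):
        # Taking the handle by value consumes it; releasing advances the ring.
        self.released += 1
        self.current = (handle.stage + 1) % self.num_stages
```

In a loop, each iteration would call acquire(), use handle.stage, then pass the handle with handle^ to release(). The transfer operator ends the handle's lifetime in the caller, which mirrors the tiles^.release() calls in the usage examples above.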
drain_producer
drain_producer(mut self)
Drains the pipeline so the CTA does not exit while a peer is still working.
Call this after all producer iterations are complete. This is the linear-type equivalent of InputProducer.drain().
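This page does not describe how drain_producer is implemented. A common approach, used for example by the producer tail in CUTLASS's TMA pipelines, is for the producer to wait once more on every stage after its last load, walking the ring from its current position, so it cannot exit while any stage is still in use. The function below is a hypothetical sketch of that idea. It only computes the order in which such a drain would visit the stages and is not the library's implementation.

```mojo
fn drain_wait_order(num_stages: Int, current_stage: Int) -> List[Int]:
    # Hypothetical: a drain waits once on every stage, starting from the
    # producer's current stage and walking the ring in order.
    var order = List[Int]()
    var stage = current_stage
    for _ in range(num_stages):
        order.append(stage)
        stage = (stage + 1) % num_stages
    return order^
```

For example, with three stages and the producer sitting at stage 2, the drain would wait on stages 2, 0, and 1 in that order.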