Mojo struct

InputTilePipeline

@register_passable(trivial) struct InputTilePipeline[Payload: TilePayload, num_group_stages: Int, k_group_size: Int]

Tile pipeline with configurable payload type.

Separates synchronization from tile storage: the pipeline field handles stage synchronization, while the Payload parameter (e.g., StandardTilePayload or BlockScaledTilePayload) holds the tile arrays.

Fields

  • pipeline (InputTilePipeline[Payload, num_group_stages, k_group_size].Pipeline):
  • payload (Payload):

Implemented traits

AnyType, Copyable, ImplicitlyCopyable, ImplicitlyDestructible, Movable, RegisterPassable, TrivialRegisterPassable

comptime members

__copyinit__is_trivial

comptime __copyinit__is_trivial = Payload.__copyinit__is_trivial

__del__is_trivial

comptime __del__is_trivial = Payload.__del__is_trivial

__moveinit__is_trivial

comptime __moveinit__is_trivial = Payload.__moveinit__is_trivial

BarrierArray

comptime BarrierArray = SMemArray[SharedMemBarrier, (num_group_stages * 2)]

Pipeline

comptime Pipeline = ProducerConsumerPipeline[num_group_stages]
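The BarrierArray size of num_group_stages * 2 implies two barriers per pipeline stage. The following Python sketch only illustrates that arithmetic. It assumes, which this page does not state, that the first half of the array serves as "full" barriers (signaled when a stage has data) and the second half as "empty" barriers (signaled when a stage is free). The helper barrier_layout is hypothetical and not part of the Mojo API.

```python
def barrier_layout(num_group_stages: int) -> dict:
    """Index ranges for a barrier array of size num_group_stages * 2.

    Assumed layout (not confirmed by the reference page):
    full barriers first, then empty barriers.
    """
    total = num_group_stages * 2
    return {
        "total": total,
        "full": list(range(0, num_group_stages)),
        "empty": list(range(num_group_stages, total)),
    }


layout = barrier_layout(4)
print(layout)
```

Under this assumption, stage s would pair the full barrier at index s with the empty barrier at index num_group_stages + s.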

Methods

__init__

__init__(barriers: SMemArray[SharedMemBarrier, (num_group_stages * 2)], payload: Payload) -> Self

Initialize from typed barrier array and payload.

init_barriers

static init_barriers(storage_ptr: LegacyUnsafePointer[SharedMemBarrier, address_space=AddressSpace.SHARED], producer_arv_count: Int32, consumer_arv_count: Int32)

Initialize pipeline barriers. Call this once, from the thread selected by elect_one.

acquire_producer

acquire_producer(mut self) -> Tuple[UInt32, MbarPtr]

Wait for slot availability and return (stage, barrier).

Returns:

Tuple: The (stage, barrier) pair for the acquired producer stage.

release_producer

release_producer(mut self)

Signal completion and advance producer stage.

acquire_consumer

acquire_consumer(mut self) -> Tuple[UInt32, MbarPtr]

Wait for data availability and return (stage, barrier).

Returns:

Tuple: The (stage, barrier) pair for the acquired consumer stage.

release_consumer

release_consumer(mut self)

Signal completion and advance consumer stage.
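The four blocking methods above form an acquire/release cycle over a ring of stages. The sketch below is a host-side Python model of that cycle, not the Mojo implementation: ToyTilePipeline, its slots list, and run_demo are hypothetical names, threads stand in for the producer and consumer warps, and a condition variable stands in for the shared-memory barriers. Unlike the real methods, the toy acquire calls return only the stage index rather than a (stage, barrier) tuple.

```python
import threading


class ToyTilePipeline:
    """Toy model of a num_stages-deep producer/consumer ring (not the Mojo type)."""

    def __init__(self, num_stages: int) -> None:
        self.num_stages = num_stages
        self.slots = [None] * num_stages    # stands in for the payload's tile arrays
        self.filled = [False] * num_stages  # True while a stage holds unconsumed data
        self.cond = threading.Condition()
        self.producer_stage = 0
        self.consumer_stage = 0

    def acquire_producer(self) -> int:
        # Block until the consumer has freed the current producer stage.
        with self.cond:
            self.cond.wait_for(lambda: not self.filled[self.producer_stage])
            return self.producer_stage

    def release_producer(self) -> None:
        # Mark the stage as filled, then advance with wrap-around.
        with self.cond:
            self.filled[self.producer_stage] = True
            self.producer_stage = (self.producer_stage + 1) % self.num_stages
            self.cond.notify_all()

    def acquire_consumer(self) -> int:
        # Block until the producer has filled the current consumer stage.
        with self.cond:
            self.cond.wait_for(lambda: self.filled[self.consumer_stage])
            return self.consumer_stage

    def release_consumer(self) -> None:
        # Mark the stage as free, then advance with wrap-around.
        with self.cond:
            self.filled[self.consumer_stage] = False
            self.consumer_stage = (self.consumer_stage + 1) % self.num_stages
            self.cond.notify_all()


def run_demo(num_stages: int = 2, num_tiles: int = 6) -> list:
    pipe = ToyTilePipeline(num_stages)
    consumed = []

    def producer() -> None:
        for k in range(num_tiles):
            stage = pipe.acquire_producer()
            pipe.slots[stage] = k * 10  # "load" a tile into the acquired stage
            pipe.release_producer()

    def consumer() -> None:
        for _ in range(num_tiles):
            stage = pipe.acquire_consumer()
            consumed.append((stage, pipe.slots[stage]))  # "process" the tile
            pipe.release_consumer()

    threads = [threading.Thread(target=producer), threading.Thread(target=consumer)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return consumed


result = run_demo()
print(result)
```

With two stages, the producer can run at most two tiles ahead of the consumer, and the stage index cycles 0, 1, 0, 1, and so on.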

try_acquire_producer

try_acquire_producer(self) -> Bool

Non-blocking check if next producer stage is available.

Example (TMA Load warp):

    var ready = pipeline.try_acquire_producer()
    # ... do other work while potentially waiting ...
    pipeline.wait_producer_if_needed(ready)
    var stage = pipeline.producer_stage()
    # ... load tiles ...

Returns:

Bool: True if consumer has freed the stage, False otherwise.

try_acquire_consumer

try_acquire_consumer(self) -> Bool

Non-blocking check if next consumer stage has data.

Example (MMA warp):

    var ready = pipeline.try_acquire_consumer()
    # ... do other work while potentially waiting ...
    pipeline.wait_consumer_if_needed(ready)
    var stage = pipeline.consumer_stage()
    # ... process tiles ...

Returns:

Bool: True if producer has filled the stage, False otherwise.

wait_producer_if_needed

wait_producer_if_needed(self, already_ready: Bool)

Conditionally wait for producer stage if not already ready.

Args:

  • already_ready (Bool): Result from try_acquire_producer().

wait_consumer_if_needed

wait_consumer_if_needed(self, already_ready: Bool)

Conditionally wait for consumer stage if not already ready.

Args:

  • already_ready (Bool): Result from try_acquire_consumer().
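The try_acquire_* and wait_*_if_needed methods split one blocking acquire into a non-blocking probe plus a conditional wait, so other work can overlap with the peer's progress. The sketch below models only that control flow in host-side Python. ToyStage, signal, try_acquire, wait_if_needed, and overlapped_acquire are hypothetical names, and a threading.Event stands in for the shared-memory barrier.

```python
import threading


class ToyStage:
    """Toy stand-in for a single pipeline stage guarded by one barrier."""

    def __init__(self) -> None:
        self._ready = threading.Event()
        self.blocking_waits = 0  # how often the blocking path was taken

    def signal(self) -> None:
        # The peer marks the stage as ready.
        self._ready.set()

    def try_acquire(self) -> bool:
        # Non-blocking probe: report readiness without waiting.
        return self._ready.is_set()

    def wait_if_needed(self, already_ready: bool) -> None:
        # Take the blocking wait only when the earlier probe failed.
        if not already_ready:
            self.blocking_waits += 1
            self._ready.wait()


def overlapped_acquire(stage: ToyStage, peer_signals_early: bool) -> bool:
    if peer_signals_early:
        stage.signal()
    ready = stage.try_acquire()
    # ... other work would run here; in this demo the peer signals meanwhile ...
    peer = threading.Thread(target=stage.signal)
    peer.start()
    stage.wait_if_needed(ready)
    peer.join()
    return ready


early = ToyStage()
early_ready = overlapped_acquire(early, peer_signals_early=True)

late = ToyStage()
late_ready = overlapped_acquire(late, peer_signals_early=False)

print(early_ready, early.blocking_waits)
print(late_ready, late.blocking_waits)
```

When the probe succeeds, the conditional wait is skipped entirely; when it fails, the caller falls back to the ordinary blocking wait after its overlapped work.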

producer_stage

producer_stage(self) -> UInt32

Returns:

UInt32

consumer_stage

consumer_stage(self) -> UInt32

Returns:

UInt32

producer_mbar

producer_mbar(self, stage: UInt32) -> MbarPtr

Returns:

MbarPtr

consumer_mbar

consumer_mbar(self, stage: UInt32) -> MbarPtr

Returns:

MbarPtr

producer

producer[mut_origin: MutOrigin](ref[mut_origin] self) -> InputProducer[mut_origin, Payload, num_group_stages, k_group_size]

Get producer view for TMA Load warp.

Returns:

InputProducer

consumer

consumer[mut_origin: MutOrigin](ref[mut_origin] self) -> InputConsumer[mut_origin, Payload, num_group_stages, k_group_size]

Get consumer view for MMA warp.

Returns:

InputConsumer

acquire_producer_linear

acquire_producer_linear[mut_origin: MutOrigin](ref[mut_origin] self) -> InputProducerStage[mut_origin, Payload, num_group_stages, k_group_size]

Acquire a producer stage handle using linear types.

Waits for the consumer to free the current stage, then returns a linear type handle that MUST be released (compiler-enforced).

Usage:

    var tiles = pipeline.acquire_producer_linear()
    load_tiles(tiles.payload(), tiles.stage(), tiles.barrier())
    tiles^.release()  # Advances to next stage

Returns:

InputProducerStage: An InputProducerStage handle that must be released.

acquire_consumer_linear

acquire_consumer_linear[mut_origin: MutOrigin](ref[mut_origin] self) -> InputConsumerStage[mut_origin, Payload, num_group_stages, k_group_size]

Acquire a consumer stage handle using linear types.

Waits for the producer to fill the current stage, then returns a linear type handle that MUST be released (compiler-enforced).

Usage:

    var tiles = pipeline.acquire_consumer_linear()
    process_tiles(tiles.payload(), tiles.stage())
    tiles^.release()  # Signals complete and advances

Returns:

InputConsumerStage: An InputConsumerStage handle that must be released.

drain_producer

drain_producer(mut self)

Drain pipeline to prevent CTA exit while peer is still working.

Call this after all producer iterations are complete. This is the linear type equivalent of InputProducer.drain().
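To illustrate why draining matters, the following host-side Python model lets a producer exit only after the consumer has freed every stage. It is a sketch under a stated assumption: that draining amounts to visiting each stage once and waiting until it is free. This page does not describe the actual implementation, and ToyPipeline, produce, consume, and run_demo are hypothetical names.

```python
import threading


class ToyPipeline:
    """Toy model of a staged producer/consumer ring (not the Mojo type)."""

    def __init__(self, num_stages: int) -> None:
        self.num_stages = num_stages
        self.filled = [False] * num_stages  # True while a stage holds unconsumed data
        self.cond = threading.Condition()
        self.producer_stage = 0
        self.consumer_stage = 0
        self.consumed = 0  # tiles consumed so far

    def _wait_stage_free(self) -> None:
        # Caller holds self.cond. Block until the current producer stage is free.
        self.cond.wait_for(lambda: not self.filled[self.producer_stage])

    def produce(self) -> None:
        with self.cond:
            self._wait_stage_free()
            self.filled[self.producer_stage] = True
            self.producer_stage = (self.producer_stage + 1) % self.num_stages
            self.cond.notify_all()

    def consume(self) -> None:
        with self.cond:
            self.cond.wait_for(lambda: self.filled[self.consumer_stage])
            self.consumed += 1
            self.filled[self.consumer_stage] = False
            self.consumer_stage = (self.consumer_stage + 1) % self.num_stages
            self.cond.notify_all()

    def drain_producer(self) -> None:
        # Assumed behavior: wait on every stage once, without filling it,
        # so the producer cannot exit while the consumer still holds a tile.
        with self.cond:
            for _ in range(self.num_stages):
                self._wait_stage_free()
                self.producer_stage = (self.producer_stage + 1) % self.num_stages


def run_demo(num_stages: int = 3, num_tiles: int = 7) -> int:
    pipe = ToyPipeline(num_stages)
    consumed_at_exit = []

    def producer() -> None:
        for _ in range(num_tiles):
            pipe.produce()
        pipe.drain_producer()
        consumed_at_exit.append(pipe.consumed)  # snapshot when the producer exits

    def consumer() -> None:
        for _ in range(num_tiles):
            pipe.consume()

    threads = [threading.Thread(target=producer), threading.Thread(target=consumer)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return consumed_at_exit[0]


consumed_when_producer_exits = run_demo()
print(consumed_when_producer_exits)
```

In this model the producer's exit snapshot always equals the number of tiles produced, because the drain loop cannot finish until every filled stage has been released by the consumer.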
