Mojo struct
ProducerConsumerPipeline
struct ProducerConsumerPipeline[num_stages: Int]
A producer-consumer pipeline that uses shared memory barriers to synchronize producer and consumer warps.
This struct is commonly used with warp specialization to pipeline operations between two warps/warpgroups with data dependencies.
Parameters
- num_stages (Int): The number of pipeline stages.
Fields
- full (MbarPtr)
- empty (MbarPtr)
Implemented traits
AnyType,
Copyable,
ImplicitlyCopyable,
ImplicitlyDestructible,
Movable,
RegisterPassable,
TrivialRegisterPassable
Methods
__init__
__init__(ptr: UnsafePointer[SharedMemBarrier, MutAnyOrigin, address_space=AddressSpace.SHARED]) -> Self
Initialize the producer-consumer pipeline with default phases.
Args:
- ptr (UnsafePointer[SharedMemBarrier, MutAnyOrigin, address_space=AddressSpace.SHARED]): Pointer to shared memory barriers.
wait_producer
wait_producer(self)
Blocks the consumer until the producer has filled the current stage.
wait_consumer
wait_consumer(self)
Blocks the producer until the consumer has freed the current stage.
try_wait_producer
try_wait_producer(self) -> Bool
Non-blocking check if producer data is ready.
Note: Use this with wait_producer_if_needed() for the try-acquire pattern:
    var ready = pipeline.try_wait_producer()
    # ... do other work ...
    pipeline.wait_producer_if_needed(ready)
Returns:
Bool: True if the producer has filled the current stage, False otherwise.
try_wait_consumer
try_wait_consumer(self) -> Bool
Non-blocking check if consumer has freed the stage.
Note: Use this with wait_consumer_if_needed() for the try-acquire pattern.
Returns:
Bool: True if the consumer has freed the current stage, False otherwise.
wait_producer_if_needed
wait_producer_if_needed(self, already_ready: Bool)
Conditionally wait for producer if not already ready.
Args:
- already_ready (Bool): Result from try_wait_producer(). If True, skips waiting.
wait_consumer_if_needed
wait_consumer_if_needed(self, already_ready: Bool)
Conditionally wait for consumer if not already ready.
Args:
- already_ready (Bool): Result from try_wait_consumer(). If True, skips waiting.
producer_mbar
producer_mbar(self, stage: UInt32) -> MbarPtr
Get the producer barrier for a specific stage.
Args:
- stage (UInt32): The pipeline stage.
Returns:
MbarPtr: The shared memory barrier that the producer signals.
consumer_mbar
consumer_mbar(self, stage: UInt32) -> MbarPtr
Get the consumer barrier for a specific stage.
Args:
- stage (UInt32): The pipeline stage.
Returns:
MbarPtr: The shared memory barrier that the consumer signals.
producer_stage
producer_stage(self) -> UInt32
Get the current producer stage index.
Returns:
UInt32: The current stage index for the producer (0 to num_stages-1).
consumer_stage
consumer_stage(self) -> UInt32
Get the current consumer stage index.
Returns:
UInt32: The current stage index for the consumer (0 to num_stages-1).
consumer_step
consumer_step(mut self)
Advance the consumer to the next pipeline stage.
Increments the consumer stage and wraps to 0 when reaching num_stages, toggling the phase bit on wrap-around. The phase switches only at the end of the pipeline because all barriers are assumed to be at the same consumer/producer phase before they are checked. Once a barrier has been checked, execution moves on to the next barrier.
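The wrap-around rule can be modeled off-device in a few lines of plain Mojo. The sketch below is a hypothetical stand-in: `StageCounter` is not part of this API, and it assumes that the stage and phase both start at 0, which this page does not state. It only illustrates that the phase bit flips once per full pass over `num_stages` stages.

```mojo
# Hypothetical model of the stage/phase bookkeeping described above.
# StageCounter is NOT part of the pipeline API; starting values are assumed.
struct StageCounter[num_stages: Int]:
    var stage: UInt32
    var phase: UInt32

    fn __init__(out self):
        self.stage = 0  # assumed initial stage
        self.phase = 0  # assumed initial phase

    fn step(mut self):
        # Advance one stage; on wrap-around, return to stage 0
        # and flip the phase bit.
        self.stage += 1
        if self.stage == UInt32(Self.num_stages):
            self.stage = 0
            self.phase ^= 1


def main():
    var counter = StageCounter[3]()
    for i in range(7):
        print("step", i, "-> stage", counter.stage, "phase", counter.phase)
        counter.step()
```

With three stages, the stage index in this model cycles through 0, 1, 2, and the phase flips each time the index returns to 0.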
producer_step
producer_step(mut self)
Advance the producer to the next pipeline stage.
Increments the producer stage and wraps to 0 when reaching num_stages, toggling the phase bit on wrap-around.
smem_bytes
static smem_bytes() -> UInt32
Calculate the shared memory bytes required for pipeline barriers.
Returns:
UInt32: The total number of bytes needed for all pipeline barriers (2 * num_stages barriers).
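As a quick arithmetic check, the barrier count of 2 * num_stages translates to bytes once a per-barrier size is fixed. The helper below is hypothetical, and the 8-byte barrier size used in `main` is an assumption (this page does not give the size of `SharedMemBarrier`); substitute the real size if it differs.

```mojo
# Hypothetical helper mirroring the "2 * num_stages barriers" rule.
# barrier_bytes is an assumed per-barrier size, not taken from this page.
fn pipeline_barrier_bytes(num_stages: Int, barrier_bytes: Int) -> Int:
    # Two barriers per stage (inferred from the `full` and `empty` fields).
    return 2 * num_stages * barrier_bytes


def main():
    # Assuming 8-byte barriers, a 4-stage pipeline needs 2 * 4 * 8 bytes.
    print(pipeline_barrier_bytes(4, 8))
```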
init_mbars
init_mbars(self, producer_arrive_count: Int32, consumer_arrive_count: Int32)
Initialize the smem barriers for the producer and consumer.
This function must be called by a single thread, and it must be called before the pipeline object is used.
Args:
- producer_arrive_count (Int32)
- consumer_arrive_count (Int32)
producer_signal_and_step
producer_signal_and_step(mut self)
Wait for consumer, signal production, and advance stage.
Combined operation for CLC throttling (Load warp):
- Wait for consumer to finish with current stage
- Signal that producer has new data
- Advance to next stage
consumer_signal_and_step
consumer_signal_and_step(mut self)
Wait for producer, signal consumption, and advance stage.
Combined operation for CLC throttling (Scheduler warp):
- Wait for producer to have data ready
- Signal that consumer has consumed data
- Advance to next stage
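The two combined operations follow the same wait, signal, advance sequence from opposite sides. The single-threaded Mojo sketch below models that handshake with plain counters instead of hardware barriers. `RingModel`, `try_produce`, and `try_consume` are hypothetical names, not part of this API; the sketch only illustrates that the producer can run at most `num_stages` stages ahead of the consumer.

```mojo
# Hypothetical, single-threaded model of the producer/consumer handshake.
# Real code blocks on shared memory barriers; here a "wait" that would
# block simply returns False.
struct RingModel[num_stages: Int]:
    var produced: Int  # stages filled so far
    var consumed: Int  # stages freed so far

    fn __init__(out self):
        self.produced = 0
        self.consumed = 0

    fn try_produce(mut self) -> Bool:
        # Wait for consumer: a stage is free only if fewer than
        # num_stages stages are in flight.
        if self.produced - self.consumed >= Self.num_stages:
            return False
        # Signal new data and advance.
        self.produced += 1
        return True

    fn try_consume(mut self) -> Bool:
        # Wait for producer: there must be a filled stage to consume.
        if self.consumed >= self.produced:
            return False
        # Signal consumption and advance.
        self.consumed += 1
        return True


def main():
    var ring = RingModel[2]()
    print(ring.try_consume())  # nothing produced yet
    print(ring.try_produce())  # fills stage 0
    print(ring.try_produce())  # fills stage 1
    print(ring.try_produce())  # both stages are full
    print(ring.try_consume())  # frees stage 0
    print(ring.try_produce())  # refills stage 0
```

In this model the first and fourth calls are rejected, which corresponds to the points where the real methods would wait on a barrier.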
produce
produce[origin: MutOrigin, //](ref[num_stages] self) -> ProduceContext[origin, num_stages]
Produce one pipeline stage with encapsulated barriers.
Usage:
    with pipeline.produce() as stage:
        # stage.index() gives current stage
        # stage.mbar() gives barrier for signaling
        # exit calls producer_step()
Returns:
ProduceContext[origin, num_stages]: Context that waits for consumer on enter, advances on exit.
consume
consume[origin: MutOrigin, //](ref[num_stages] self) -> ConsumeContext[origin, num_stages]
Consume one pipeline stage with encapsulated barriers.
Usage:
    with pipeline.consume() as stage:
        # stage.index() gives current stage
        # exit signals consumer done and advances
Returns:
ConsumeContext[origin, num_stages]: Context that waits for producer on enter, signals+advances on exit.
consume_explicit
consume_explicit[origin: MutOrigin, //](ref[num_stages] self) -> ExplicitConsumeContext[origin, num_stages]
Consume one pipeline stage with EXPLICIT barrier arrive.
Use this for kernels requiring lane-guarded or specialized signaling.
Usage:
    with pipeline.consume_explicit() as stage:
        # ... do work ...
        if lane_id() < CLUSTER_SIZE:
            stage.arrive()  # Lane-guarded arrive
        # exit only advances, does NOT arrive
For specialized signaling (e.g., umma_arrive_leader_cta):
    with pipeline.consume_explicit() as stage:
        if cta_group == 1:
            stage.arrive()
        else:
            umma_arrive_leader_cta(stage.mbar())
Returns:
ExplicitConsumeContext[origin, num_stages]: Context that waits for producer on enter, advances only on exit.
acquire_producer
acquire_producer[origin: MutOrigin, //](ref[num_stages] self) -> ProducerStage[origin, num_stages]
Acquire a producer stage handle using linear types.
Waits for the consumer to free the current stage, then returns a linear type handle that MUST be released (compiler-enforced).
Usage:
    var stage = pipeline.acquire_producer()
    # ... produce data, signal via stage.mbar() ...
    stage^.release()  # Advances to next stage
Returns:
ProducerStage[origin, num_stages]: A ProducerStage handle that must be released.
acquire_consumer
acquire_consumer[origin: MutOrigin, //](ref[num_stages] self) -> ConsumerStage[origin, num_stages]
Acquire a consumer stage handle using linear types.
Waits for the producer to fill the current stage, then returns a linear type handle that MUST be released (compiler-enforced).
Usage:
    var stage = pipeline.acquire_consumer()
    # ... consume data ...
    stage^.release()  # Signals complete and advances
For explicit signaling:
    var stage = pipeline.acquire_consumer()
    # ... consume data ...
    if lane_id() < CLUSTER_SIZE:
        stage.arrive()
    stage^.release_without_signal()
Returns:
ConsumerStage[origin, num_stages]: A ConsumerStage handle that must be released.