Mojo module
pipeline
Producer-consumer pipeline utilities for SM100 structured kernels.
This module provides pipeline synchronization primitives for warp-specialized GPU kernels, enabling efficient producer-consumer patterns between warps.
Key abstractions:
- ProducerConsumerPipeline: Low-level barrier management for N-stage pipelines
- ProducerStage / ConsumerStage: Unified stage handles (linear types)
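As a conceptual illustration of the N-stage producer-consumer pattern these abstractions manage, the following Python sketch models a ring of stages guarded by "empty" and "full" counters. It is not the Mojo API: host threads and semaphores stand in for warps and shared memory barriers, and the names `NUM_STAGES`, `NUM_ITEMS`, `buffers`, `empty`, and `full` are hypothetical.

```python
import threading

NUM_STAGES = 3   # hypothetical pipeline depth
NUM_ITEMS = 10   # hypothetical number of items pushed through the pipeline

buffers = [None] * NUM_STAGES
# "empty" counts stages the producer may fill; "full" counts stages ready to consume.
empty = threading.Semaphore(NUM_STAGES)
full = threading.Semaphore(0)
consumed = []

def producer():
    for i in range(NUM_ITEMS):
        empty.acquire()                    # wait until a stage has been freed
        buffers[i % NUM_STAGES] = i * i    # fill the stage
        full.release()                     # signal that the stage is ready

def consumer():
    for i in range(NUM_ITEMS):
        full.acquire()                     # wait until a stage has been filled
        consumed.append(buffers[i % NUM_STAGES])
        empty.release()                    # signal that the stage may be reused

threads = [threading.Thread(target=producer), threading.Thread(target=consumer)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

With more than one stage, the producer can run ahead of the consumer by up to `NUM_STAGES` items, which is the overlap a multi-stage pipeline is meant to provide.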
Unified Stage Types
ProducerStage and ConsumerStage are linear types (@explicit_destroy) that work in both contexts:
- Linear Type API (flat, explicit):
    var stage = pipeline.acquire_producer()
    # ... use stage.index(), stage.mbar() ...
    stage^.release()  # Compiler enforces this call
- Context Manager API (scoped, automatic):
    with pipeline.produce() as stage:
        # ... use stage.index(), stage.mbar() ...
    # release() called automatically
The context managers store the stage internally and return a ref to it,
allowing access to the full stage API while managing lifetime automatically.
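The relationship between the two styles can be sketched in Python, with the caveat that Python cannot enforce linear types at compile time, so this model only detects a double release at run time. `StageHandle` and `ToyPipeline` are hypothetical stand-ins, not the Mojo types, and the round-robin stage assignment is an assumption made for illustration.

```python
from contextlib import contextmanager

class StageHandle:
    """Hypothetical stand-in for a stage handle that must be released once."""

    def __init__(self, index, log):
        self._index = index
        self._log = log
        self._released = False

    def index(self):
        return self._index

    def release(self):
        if self._released:
            raise RuntimeError("stage released twice")
        self._released = True
        self._log.append(("release", self._index))

class ToyPipeline:
    """Hypothetical pipeline that hands out stages in round-robin order."""

    def __init__(self, num_stages):
        self.num_stages = num_stages
        self.next_stage = 0
        self.log = []

    def acquire_producer(self):
        stage = StageHandle(self.next_stage, self.log)
        self.next_stage = (self.next_stage + 1) % self.num_stages
        self.log.append(("acquire", stage.index()))
        return stage

    @contextmanager
    def produce(self):
        # Scoped style: the context manager owns the handle and releases it on exit.
        stage = self.acquire_producer()
        try:
            yield stage
        finally:
            stage.release()

pipeline = ToyPipeline(num_stages=2)

# Flat style: the caller is responsible for calling release().
stage = pipeline.acquire_producer()
used_flat = stage.index()
stage.release()

# Scoped style: release() runs automatically when the block exits.
with pipeline.produce() as stage:
    used_scoped = stage.index()
```

Both styles produce the same acquire/release sequence; the scoped form simply moves the release into the context manager's exit path.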
API Examples
Producer side (e.g., MMA warp producing to epilogue):
    # Context manager:
    with pipeline.produce() as stage:
        mma_op.mma(a, b, tmem_offset)
        mma_op.commit(stage.mbar())
    # __exit__ calls stage^.release() -> producer_step()

    # Linear type:
    var stage = pipeline.acquire_producer()
    mma_op.mma(a, b, tmem_offset)
    mma_op.commit(stage.mbar())
    stage^.release()
Consumer side (e.g., epilogue consuming from MMA):
    # Context manager:
    with pipeline.consume() as stage:
        process(stage.index())
    # __exit__ calls stage^.release() -> arrive + consumer_step()

    # Linear type:
    var stage = pipeline.acquire_consumer()
    process(stage.index())
    stage^.release()  # Signal + advance

    # Explicit signaling:
    var stage = pipeline.acquire_consumer()
    if lane_id() < CLUSTER_SIZE:
        stage.arrive()
    stage^.release_without_signal()  # Advance only
Direct API (for special cases):
    pipeline.wait_producer() / wait_consumer()
    pipeline.producer_step() / consumer_step()
    pipeline.producer_mbar(stage) / consumer_mbar(stage)
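This page does not describe what the step functions do internally. A common scheme in barrier-based N-stage pipelines, assumed here and not confirmed by this module's documentation, is to track a stage index together with a phase bit that flips each time the index wraps around. The Python sketch below models only that bookkeeping; `PipelineState` and its fields are hypothetical names.

```python
class PipelineState:
    """Hypothetical model of per-role pipeline state: stage index plus phase bit."""

    def __init__(self, num_stages):
        self.num_stages = num_stages
        self.index = 0   # which stage is used next
        self.phase = 0   # flips on every wrap-around of the stage index

    def step(self):
        # Advance to the next stage; flip the phase when the ring wraps.
        self.index += 1
        if self.index == self.num_stages:
            self.index = 0
            self.phase ^= 1

state = PipelineState(num_stages=3)
trace = []
for _ in range(7):
    trace.append((state.index, state.phase))
    state.step()
```

In this model the producer and consumer would each hold their own state object, so the pair `(index, phase)` identifies which barrier to wait on and which completion of that barrier is expected.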
comptime values
MbarPtr
comptime MbarPtr = UnsafePointer[SharedMemBarrier, MutAnyOrigin, address_space=AddressSpace.SHARED]
Structs
- ConsumeContext: Context manager for consuming one pipeline stage.
- ConsumerStage: Unified handle for consuming from a pipeline stage.
- ExplicitConsumeContext: Context manager for consuming with EXPLICIT barrier arrive.
- ProduceContext: Context manager for producing one pipeline stage.
- ProducerConsumerPipeline: A producer-consumer pipeline using shared memory barriers to enforce synchronization (between producer and consumer warps).
- ProducerStage: Unified handle for producing to a pipeline stage.