Mojo struct
ConsumerStage
struct ConsumerStage[pipeline_origin: MutOrigin, num_stages: Int]
Unified handle for consuming from a pipeline stage.
Works as both a linear type (direct use) and within context managers.
Lifecycle:
- Created via pipeline.acquire_consumer() or a context manager
- Use index() for consumption
- Must call release() to signal and advance (compiler-enforced)
Two exit paths:
- release(): Signal consumption complete + advance (normal path)
- release_without_signal(): Advance only (for explicit signaling)
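The lifecycle above can be sketched as the fragment below. It is a minimal illustration rather than a complete program: it assumes it runs inside a kernel where `pipeline` is an already-initialized ProducerConsumerPipeline[num_stages], and it uses only the calls named on this page.

```mojo
# Sketch only: `pipeline` is assumed to be set up elsewhere in the kernel.
var stage = pipeline.acquire_consumer()  # obtain the consumer handle
var slot = stage.index()                 # which stage slot to read from
# ... consume the data held in stage slot `slot` ...
stage^.release()                         # signal + advance; `stage` is consumed here
```

Because the handle is a linear type, the `^` transfer into release() is what ends its lifetime. Per the lifecycle notes above, leaving out that final call is expected to be rejected at compile time.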
Parameters
- pipeline_origin (MutOrigin): Origin of the pipeline reference.
- num_stages (Int): Number of pipeline stages.
Fields
- pipeline (Pointer[ProducerConsumerPipeline[num_stages], pipeline_origin])
Implemented traits
comptime members
__moveinit__is_trivial
comptime __moveinit__is_trivial = False
Methods
__init__
__init__(out self, pipeline: Pointer[ProducerConsumerPipeline[num_stages], pipeline_origin], index: UInt32, mbar: LegacyUnsafePointer[SharedMemBarrier, address_space=AddressSpace.SHARED])
__moveinit__
__moveinit__(out self, deinit other: Self)
Move constructor for Optional support.
index
mbar
mbar(self) -> MbarPtr
Get the barrier for manual signaling.
Use this for specialized signaling patterns like umma_arrive_leader_cta. For standard usage, just call release().
Returns:
MbarPtr
arrive
arrive(self)
Manually arrive on the consumer barrier.
Use for lane-guarded patterns:
    if lane_id() < CLUSTER_SIZE:
        stage.arrive()
    stage^.release_without_signal()
release
release(deinit self)
Signal consumption complete and advance to next stage.
This is the standard exit path. Equivalent to calling arrive() followed by consumer_step().
release_without_signal
release_without_signal(deinit self)
Advance to next stage WITHOUT signaling.
Use when you've already signaled via arrive() or specialized APIs.
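As a rough sketch of the explicit-signaling path, the fragment below fetches the barrier with mbar(), signals it through a specialized call, and then advances without signaling again. `signal_barrier` is a hypothetical placeholder for whatever specialized signaling API you use; it is not part of this struct. `pipeline` is again assumed to be initialized elsewhere.

```mojo
# Sketch only: `pipeline` exists elsewhere; `signal_barrier` is hypothetical.
var stage = pipeline.acquire_consumer()
var slot = stage.index()
# ... consume the data held in stage slot `slot` ...
signal_barrier(stage.mbar())      # hypothetical: signal the consumer barrier yourself
stage^.release_without_signal()   # advance only; no second signal is sent
```

Calling release() here instead would signal the barrier a second time, which is why this path pairs manual signaling with release_without_signal().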