Skip to main content

Mojo struct

ConsumerStage

struct ConsumerStage[pipeline_origin: MutOrigin, num_stages: Int]

Unified handle for consuming from a pipeline stage.

Works as both a linear type (direct use) and within context managers.

Lifecycle:

  1. Created via pipeline.acquire_consumer() or context manager
  2. Use index() for consumption
  3. Must call release() to signal and advance (compiler-enforced)

Two exit paths:

  • release(): Signal consumption complete + advance (normal path)
  • release_without_signal(): Advance only (for explicit signaling)

Parameters

  • pipeline_origin (MutOrigin): Origin of the pipeline reference.
  • num_stages (Int): Number of pipeline stages.

Fields

  • pipeline (Pointer[ProducerConsumerPipeline[num_stages], pipeline_origin]):

Implemented traits

AnyType, Movable

comptime members

__moveinit__is_trivial

comptime __moveinit__is_trivial = False

Methods

__init__

__init__(out self, pipeline: Pointer[ProducerConsumerPipeline[num_stages], pipeline_origin], index: UInt32, mbar: LegacyUnsafePointer[SharedMemBarrier, address_space=AddressSpace.SHARED])

__moveinit__

__moveinit__(out self, deinit other: Self)

Move constructor for Optional support.

index

index(self) -> UInt32

Get the current stage index.

Returns:

UInt32

mbar

mbar(self) -> MbarPtr

Get the barrier for manual signaling.

Use this for specialized signaling patterns like umma_arrive_leader_cta. For standard usage, just call release().

Returns:

MbarPtr

arrive

arrive(self)

Manually arrive on the consumer barrier.

Use for lane-guarded patterns: if lane_id() < CLUSTER_SIZE: stage.arrive() stage^.release_without_signal()

release

release(deinit self)

Signal consumption complete and advance to next stage.

This is the standard exit path. Equivalent to: arrive() consumer_step()

release_without_signal

release_without_signal(deinit self)

Advance to next stage WITHOUT signaling.

Use when you've already signaled via arrive() or specialized APIs.

Was this page helpful?