
Mojo struct

RolePipeline

@register_passable(trivial) struct RolePipeline[number_of_stages: Int, is_producer: Bool = True, producer_sub_stages: Int = 1, consumer_sub_stages: Int = 1]

Unified producer/consumer pipeline for barrier synchronization.

  • Producer role: starts with phase=1 and uses the acquire/commit methods.
  • Consumer role: starts with phase=0 and uses the wait/release methods.

Sub-stages allow multiple barriers per stage:

  • Total producer barriers: num_stages * producer_sub_stages
  • Total consumer barriers: num_stages * consumer_sub_stages
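The barrier counts above are simple products. A minimal Python sketch of that arithmetic follows; it is a host-side model, not Mojo API. The `flat_index` helper is hypothetical: it assumes barriers for one stage are stored contiguously, which this page does not state.

```python
def barrier_counts(num_stages, producer_sub_stages=1, consumer_sub_stages=1):
    """Return (total producer barriers, total consumer barriers)."""
    return (num_stages * producer_sub_stages,
            num_stages * consumer_sub_stages)


def flat_index(stage, sub_stage_idx, sub_stages):
    """Hypothetical layout: sub-stage barriers of a stage sit side by side.

    This ordering is an assumption made for illustration only.
    """
    if not 0 <= sub_stage_idx < sub_stages:
        raise ValueError("sub_stage_idx out of range")
    return stage * sub_stages + sub_stage_idx


# 4 stages, 2 producer sub-stages, 1 consumer sub-stage.
producer_total, consumer_total = barrier_counts(4, 2, 1)
```

Under these assumptions, each barrier array must be sized for the corresponding total before the base pointers are passed to `__init__`.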

Synchronization behavior (example with num_stages=1):

Producer:
    p0. consumer_mbar.wait(phase=1)  # 1 != 0: falls through
    p1. producer_mbar.commit()       # producer_mbar.phase=1
    p2. step()                       # phase = 0
    p3. consumer_mbar.wait(phase=0)  # 0 == 0: blocked until c1
    ...

Consumer:
    c0. producer_mbar.wait(phase=0)  # 0 == 0: blocked until p1
    c1. consumer.release()           # consumer_mbar.phase=1
    c2. step()                       # phase = 1
    ...
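The trace above can be replayed with a small single-threaded Python model. This is only a sketch of the phase-parity rule as the example describes it (a wait blocks while the barrier's phase equals the phase passed in, and each commit or release flips the barrier's phase). `ModelBarrier` and `trace_single_stage` are hypothetical names; they are not part of the Mojo API and do not model real GPU barriers or arrival counts.

```python
class ModelBarrier:
    """Toy barrier holding only a phase bit, initially 0."""

    def __init__(self):
        self.phase = 0

    def arrive(self):
        # Stands in for commit/release completing the barrier.
        self.phase ^= 1

    def would_block(self, phase):
        # Per the trace: the waiter blocks while the phases are equal.
        return self.phase == phase


def trace_single_stage():
    """Replay steps p0..p3 and c0..c2 for num_stages=1."""
    producer_mbar, consumer_mbar = ModelBarrier(), ModelBarrier()
    producer_phase, consumer_phase = 1, 0  # initial phases per role
    events = []

    events.append(("p0", consumer_mbar.would_block(producer_phase)))
    events.append(("c0", producer_mbar.would_block(consumer_phase)))
    producer_mbar.arrive()   # p1: producer_mbar.phase becomes 1
    producer_phase ^= 1      # p2: one stage, so every step flips the phase
    events.append(("p3", consumer_mbar.would_block(producer_phase)))
    events.append(("c0_retry", producer_mbar.would_block(consumer_phase)))
    consumer_mbar.arrive()   # c1: consumer_mbar.phase becomes 1
    consumer_phase ^= 1      # c2
    events.append(("p3_retry", consumer_mbar.would_block(producer_phase)))
    return events


events = trace_single_stage()
```

In this model the producer's first wait falls through because its starting phase (1) differs from the fresh barrier's phase (0), which is why the producer can fill the first buffer without any prior release from the consumer.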

Fields

  • producer_mbar_base (MBarType):
  • consumer_mbar_base (MBarType):
  • state (PipelineState[RolePipeline[number_of_stages, is_producer, producer_sub_stages, consumer_sub_stages].num_stages]):

Implemented traits

AnyType, Copyable, ImplicitlyCopyable, ImplicitlyDestructible, Movable, RegisterPassable, TrivialRegisterPassable

comptime members

__copyinit__is_trivial

comptime __copyinit__is_trivial = True

__del__is_trivial

comptime __del__is_trivial = True

__moveinit__is_trivial

comptime __moveinit__is_trivial = True

num_stages

comptime num_stages = number_of_stages

Methods

__init__

__init__(producer_mbar_base: UnsafePointer[SharedMemBarrier, MutAnyOrigin, address_space=AddressSpace.SHARED], consumer_mbar_base: UnsafePointer[SharedMemBarrier, MutAnyOrigin, address_space=AddressSpace.SHARED]) -> Self

producer_mbar

producer_mbar[sub_stage_idx: Int = 0](self) -> MBarType

Get producer mbar for current stage and optional sub-stage.

Parameters:

  • sub_stage_idx (Int): Sub-stage index (0 to producer_sub_stages-1).

Returns:

MBarType

consumer_mbar

consumer_mbar[sub_stage_idx: Int = 0](self) -> MBarType

Get consumer mbar for current stage and optional sub-stage.

Parameters:

  • sub_stage_idx (Int): Sub-stage index (0 to consumer_sub_stages-1).

Returns:

MBarType

acquire

acquire[sub_stage_idx: Int = 0](self)

Wait until consumer has released the buffer. Producer-only.

commit

commit(mut self)

Commit production and step. Producer-only.

commit_mma

commit_mma(self)

Commit via MMA arrive using elected thread. Producer-only.

commit_mma(self, elect: Int32)

Commit via MMA arrive with explicit elect value. Producer-only.

wait

wait(self)

Wait for producer to complete. Consumer-only.

release

release[sub_stage_idx: Int = 0](mut self)

Release buffer at sub-stage and step. Consumer-only.

release_no_step

release_no_step[sub_stage_idx: Int = 0](self)

Release buffer without stepping. For multi-sub-stage release.
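One plausible way to combine `release_no_step` and `release` when `consumer_sub_stages > 1` is to release every sub-stage but the last without stepping, then release the last one with a step. The Python model below sketches that pattern under stated assumptions: `ModelConsumer` is a hypothetical stand-in, and its `step` (advance the stage index, flip the phase on wrap-around) is an assumed reading of `PipelineState`, consistent with the `num_stages=1` example but not confirmed by this page.

```python
class ModelConsumer:
    """Toy consumer that records releases instead of touching barriers."""

    def __init__(self, num_stages, consumer_sub_stages):
        self.num_stages = num_stages
        self.sub_stages = consumer_sub_stages
        self.stage = 0
        self.phase = 0          # consumer role starts with phase=0
        self.released = []      # (stage, sub_stage_idx) in release order

    def step(self):
        # Assumed behavior: wrap the stage index and flip the phase on wrap.
        self.stage += 1
        if self.stage == self.num_stages:
            self.stage = 0
            self.phase ^= 1

    def release_no_step(self, sub_stage_idx=0):
        if not 0 <= sub_stage_idx < self.sub_stages:
            raise ValueError("sub_stage_idx out of range")
        self.released.append((self.stage, sub_stage_idx))

    def release(self, sub_stage_idx=0):
        self.release_no_step(sub_stage_idx)
        self.step()


def release_all_sub_stages(consumer):
    """Release each sub-stage of the current stage, stepping exactly once."""
    for idx in range(consumer.sub_stages - 1):
        consumer.release_no_step(idx)
    consumer.release(consumer.sub_stages - 1)


consumer = ModelConsumer(num_stages=2, consumer_sub_stages=2)
release_all_sub_stages(consumer)   # stage 0
release_all_sub_stages(consumer)   # stage 1, then wrap
```

Stepping only on the final release keeps the stage index fixed while the remaining sub-stage barriers of that stage are still being signalled.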

step

step(mut self)
