IMPORTANT: To view this page as Markdown, append `.md` to the URL (e.g. /max/get-started.md). For the complete documentation index, see llms.txt.
Skip to main content
For the complete documentation index, see llms.txt. Markdown versions of all pages are available by appending .md to any URL (e.g. /max/get-started.md).

Mojo struct

RolePipeline

struct RolePipeline[number_of_stages: Int, is_producer: Bool = True, producer_sub_stages: Int = 1, consumer_sub_stages: Int = 1, cta_group: Int = 1]

Unified producer/consumer pipeline for barrier synchronization.

Producer role: Starts with phase=1, uses acquire/commit methods. Consumer role: Starts with phase=0, uses wait/release methods.

Sub-stages allow multiple barriers per stage:

  • Total producer barriers: num_stages * producer_sub_stages
  • Total consumer barriers: num_stages * consumer_sub_stages

Synchronization behavior (example with num_stages=1):

Producer: p0. consumer_mbar.wait(phase=1) # 1 != 0: falls through p1. producer_mbar.commit() # producer_mbar.phase=1 p2. step() # phase = 0 p3. consumer_mbar.wait(phase=0) # 0 == 0: blocked until c1 ...

Consumer: c0. producer_mbar.wait(phase=0) # 0 == 0: blocked until p1 c1. consumer.release() # consumer_mbar.phase=1 c2. step() # phase = 1 ...

Fields​

  • ​producer_mbar_base (MBarType):
  • ​consumer_mbar_base (MBarType):
  • ​state (PipelineState[RolePipeline[number_of_stages, is_producer, producer_sub_stages, consumer_sub_stages, cta_group].num_stages]):

Implemented traits​

AnyType, Copyable, ImplicitlyCopyable, ImplicitlyDeletable, Movable, RegisterPassable, TrivialRegisterPassable

comptime members​

num_stages​

comptime num_stages = number_of_stages

Methods​

__init__​

def __init__(producer_mbar_base: UnsafePointer[SharedMemBarrier, MutAnyOrigin, address_space=AddressSpace.SHARED], consumer_mbar_base: UnsafePointer[SharedMemBarrier, MutAnyOrigin, address_space=AddressSpace.SHARED]) -> Self

producer_mbar​

def producer_mbar[sub_stage_idx: Int = 0](self) -> MBarType

Get producer mbar for current stage and optional sub-stage.

Parameters:

  • ​sub_stage_idx (Int): Sub-stage index (0 to producer_sub_stages-1).

Returns:

MBarType

consumer_mbar​

def consumer_mbar[sub_stage_idx: Int = 0](self) -> MBarType

Get consumer mbar for current stage and optional sub-stage.

Parameters:

  • ​sub_stage_idx (Int): Sub-stage index (0 to consumer_sub_stages-1).

Returns:

MBarType

acquire​

def acquire[sub_stage_idx: Int = 0](self)

Wait until consumer has released the buffer. Producer-only.

commit​

def commit(mut self)

Commit production and step. Producer-only.

commit_mma​

def commit_mma(self)

Commit via MMA arrive using elected thread. Producer-only.

def commit_mma(self, elect: Int32)

Commit via MMA arrive with explicit elect value. Producer-only.

wait​

def wait(self)

Wait for producer to complete. Consumer-only.

release​

def release[sub_stage_idx: Int = 0](mut self)

Release buffer at sub-stage and step. Consumer-only.

release_no_step​

def release_no_step[sub_stage_idx: Int = 0](self)

Release buffer without stepping. For multi-sub-stage release.

step​

def step(mut self)