
Mojo struct

ProducerConsumerPipeline

@register_passable(trivial) struct ProducerConsumerPipeline[num_stages: Int]

A producer-consumer pipeline that uses shared memory barriers to synchronize producer and consumer warps.

This struct is commonly used with warp specialization to pipeline operations between two warps/warpgroups with data dependencies.

Parameters

  • num_stages (Int): The number of pipeline stages.

Fields

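  • full (UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)]): Pointer to the per-stage barriers that mark a stage as full (data produced).
  • empty (UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)]): Pointer to the per-stage barriers that mark a stage as empty (data consumed).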

Implemented traits

AnyType, Copyable, ImplicitlyCopyable, Movable, UnknownDestructibility

Aliases

__copyinit__is_trivial

alias __copyinit__is_trivial = UInt32.__copyinit__is_trivial if UInt32.__copyinit__is_trivial if UInt32.__copyinit__is_trivial if UInt32.__copyinit__is_trivial if UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__copyinit__is_trivial else UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__copyinit__is_trivial else UInt32.__copyinit__is_trivial if UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__copyinit__is_trivial else UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__copyinit__is_trivial else UInt32.__copyinit__is_trivial if UInt32.__copyinit__is_trivial if UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__copyinit__is_trivial else UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__copyinit__is_trivial else UInt32.__copyinit__is_trivial if UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__copyinit__is_trivial else UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__copyinit__is_trivial else UInt32.__copyinit__is_trivial if UInt32.__copyinit__is_trivial if UInt32.__copyinit__is_trivial if UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__copyinit__is_trivial else UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__copyinit__is_trivial else UInt32.__copyinit__is_trivial if UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__copyinit__is_trivial else UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__copyinit__is_trivial else UInt32.__copyinit__is_trivial if UInt32.__copyinit__is_trivial if UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__copyinit__is_trivial else UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__copyinit__is_trivial else UInt32.__copyinit__is_trivial if UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__copyinit__is_trivial else UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__copyinit__is_trivial

__del__is_trivial

alias __del__is_trivial = UInt32.__del__is_trivial if UInt32.__del__is_trivial if UInt32.__del__is_trivial if UInt32.__del__is_trivial if UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__del__is_trivial else UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__del__is_trivial else UInt32.__del__is_trivial if UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__del__is_trivial else UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__del__is_trivial else UInt32.__del__is_trivial if UInt32.__del__is_trivial if UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__del__is_trivial else UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__del__is_trivial else UInt32.__del__is_trivial if UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__del__is_trivial else UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__del__is_trivial else UInt32.__del__is_trivial if UInt32.__del__is_trivial if UInt32.__del__is_trivial if UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__del__is_trivial else UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__del__is_trivial else UInt32.__del__is_trivial if UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__del__is_trivial else UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__del__is_trivial else UInt32.__del__is_trivial if UInt32.__del__is_trivial if UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__del__is_trivial else UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__del__is_trivial else UInt32.__del__is_trivial if UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__del__is_trivial else UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__del__is_trivial

__moveinit__is_trivial

alias __moveinit__is_trivial = UInt32.__moveinit__is_trivial if UInt32.__moveinit__is_trivial if UInt32.__moveinit__is_trivial if UInt32.__moveinit__is_trivial if UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__moveinit__is_trivial else UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__moveinit__is_trivial else UInt32.__moveinit__is_trivial if UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__moveinit__is_trivial else UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__moveinit__is_trivial else UInt32.__moveinit__is_trivial if UInt32.__moveinit__is_trivial if UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__moveinit__is_trivial else UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__moveinit__is_trivial else UInt32.__moveinit__is_trivial if UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__moveinit__is_trivial else UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__moveinit__is_trivial else UInt32.__moveinit__is_trivial if UInt32.__moveinit__is_trivial if UInt32.__moveinit__is_trivial if UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__moveinit__is_trivial else UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__moveinit__is_trivial else UInt32.__moveinit__is_trivial if UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__moveinit__is_trivial else UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__moveinit__is_trivial else UInt32.__moveinit__is_trivial if UInt32.__moveinit__is_trivial if UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__moveinit__is_trivial else UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__moveinit__is_trivial else UInt32.__moveinit__is_trivial if UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__moveinit__is_trivial else UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__moveinit__is_trivial

Methods

__init__

__init__(ptr: UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)]) -> Self

Initialize the producer-consumer pipeline with default phases.

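Args:

  • ptr (UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)]): Pointer to the shared memory barriers that back the pipeline.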

wait_producer

wait_producer(self)

Consumer waits for producer.

wait_consumer

wait_consumer(self)

Producer waits for consumer.
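The two waits can be pictured with a small host-side model. The Python sketch below is not the Mojo implementation and runs no GPU code. `ModelBarrier`, `ModelPipeline`, `run_ahead`, and the starting phases (producer 1, consumer 0) are assumptions chosen to mirror the usual parity-wait behaviour of hardware barriers, where a wait on parity p succeeds once the barrier's current phase bit differs from p. Waits are modelled as non-blocking checks that return True when the real wait would return.

```python
class ModelBarrier:
    """Hypothetical stand-in for a shared memory barrier with arrive count 1."""

    def __init__(self):
        self.phase = 0  # parity of the phase currently in progress

    def arrive(self):
        # With an arrive count of 1, a single arrival completes the phase.
        self.phase ^= 1

    def ready(self, parity):
        # A parity wait returns once the phase with that parity has completed,
        # i.e. once the barrier has moved on to the other parity.
        return self.phase != parity


class ModelPipeline:
    """Hypothetical model of the full/empty handshake; not the Mojo struct."""

    def __init__(self, num_stages):
        self.num_stages = num_stages
        self.full = [ModelBarrier() for _ in range(num_stages)]
        self.empty = [ModelBarrier() for _ in range(num_stages)]
        # Assumed starting phases: the producer's first waits pass at once.
        self.p_stage, self.p_phase = 0, 1
        self.c_stage, self.c_phase = 0, 0

    def can_produce(self):  # models wait_consumer: producer waits for consumer
        return self.empty[self.p_stage].ready(self.p_phase)

    def can_consume(self):  # models wait_producer: consumer waits for producer
        return self.full[self.c_stage].ready(self.c_phase)

    def produce(self):
        self.full[self.p_stage].arrive()  # mark the stage as filled
        self.p_stage += 1
        if self.p_stage == self.num_stages:
            self.p_stage, self.p_phase = 0, self.p_phase ^ 1

    def consume(self):
        self.empty[self.c_stage].arrive()  # mark the stage as drained
        self.c_stage += 1
        if self.c_stage == self.num_stages:
            self.c_stage, self.c_phase = 0, self.c_phase ^ 1


def run_ahead(num_stages):
    """Count how many stages the producer fills before it must wait."""
    pipe = ModelPipeline(num_stages)
    filled = 0
    while pipe.can_produce():
        pipe.produce()
        filled += 1
    return pipe, filled
```

In this model the producer fills exactly num_stages stages before its wait stops passing, and a single consume step unblocks it again. That is the back-pressure a multi-stage pipeline is meant to provide.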

producer_mbar

producer_mbar(self, stage: UInt32) -> UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)]

Get the producer barrier for a specific stage.

Args:

  • stage (UInt32): The pipeline stage.

Returns:

UnsafePointer: The shared memory barrier that the producer signals.

consumer_mbar

consumer_mbar(self, stage: UInt32) -> UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)]

Get the consumer barrier for a specific stage.

Args:

  • stage (UInt32): The pipeline stage.

Returns:

UnsafePointer: The shared memory barrier that the consumer signals.

producer_stage

producer_stage(self) -> UInt32

Get the current producer stage index.

Returns:

UInt32: The current stage index for the producer (0 to num_stages-1).

consumer_stage

consumer_stage(self) -> UInt32

Get the current consumer stage index.

Returns:

UInt32: The current stage index for the consumer (0 to num_stages-1).

consumer_step

consumer_step(mut self)

Advance the consumer to the next pipeline stage.

Increments the consumer stage and wraps to 0 when it reaches num_stages, toggling the phase bit on wrap-around. The phase switches only at the end of the pipeline because all barriers are assumed to be in the same consumer/producer phase before they are checked. Once a barrier has been checked, execution moves on to the next barrier.

producer_step

producer_step(mut self)

Advance the producer to the next pipeline stage.

Increments the producer stage and wraps to 0 when reaching num_stages, toggling the phase bit on wrap-around.
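The stepping rule shared by consumer_step and producer_step can be traced on the host. This Python sketch only models the stage counter and phase bit described above. The helper names `step` and `trace` are hypothetical, and the starting phase of 0 is an assumption made for illustration.

```python
def step(stage, phase, num_stages):
    """Return the (stage, phase) pair that follows the given one."""
    stage += 1
    if stage == num_stages:
        stage = 0   # wrap back to the first stage
        phase ^= 1  # toggle the phase bit only on wrap-around
    return stage, phase


def trace(num_stages, steps, phase=0):
    """List the (stage, phase) pairs visited over a number of steps."""
    stage = 0
    seen = [(stage, phase)]
    for _ in range(steps):
        stage, phase = step(stage, phase, num_stages)
        seen.append((stage, phase))
    return seen


print(trace(3, 6))
# [(0, 0), (1, 0), (2, 0), (0, 1), (1, 1), (2, 1), (0, 0)]
```

Each stage is visited once per phase value, so the (stage, phase) pair repeats only after 2 * num_stages steps.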

smem_bytes

static smem_bytes() -> UInt32

Calculate the shared memory bytes required for pipeline barriers.

Returns:

UInt32: The total number of bytes needed for all pipeline barriers (2 * num_stages barriers).
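The arithmetic behind this count is short enough to check by hand. The Python sketch below assumes each barrier occupies 8 bytes. That size is an assumption made for illustration, not a value stated on this page, and `model_smem_bytes` is a hypothetical helper rather than part of the API.

```python
BARRIER_BYTES = 8  # assumption: size of one shared memory barrier in bytes


def model_smem_bytes(num_stages, barrier_bytes=BARRIER_BYTES):
    """Bytes for one full and one empty barrier per pipeline stage."""
    num_barriers = 2 * num_stages
    return num_barriers * barrier_bytes


print(model_smem_bytes(4))  # 64
```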

init_mbars

init_mbars(self, producer_arrive_count: Int32, consumer_arrive_count: Int32)

Initialize the smem barriers for the producer and consumer.

This function must be called by a single thread, and it must be called before the pipeline object is used.

Args:

  • producer_arrive_count (Int32): The number of threads that will arrive at the barrier marking data as produced.
  • consumer_arrive_count (Int32): The number of threads that will arrive at the barrier marking data as consumed.
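The role of the two arrive counts can be illustrated with a host-side model. In the Python sketch below, `CountedBarrier` and `init_model_barriers` are hypothetical names. It assumes a barrier completes its current phase only after the configured number of arrivals, which is the usual behaviour of arrival-counting barriers, and it is not the Mojo implementation.

```python
class CountedBarrier:
    """Hypothetical barrier that flips phase after arrive_count arrivals."""

    def __init__(self, arrive_count):
        self.arrive_count = arrive_count
        self.pending = arrive_count  # arrivals still needed in this phase
        self.phase = 0

    def arrive(self):
        self.pending -= 1
        if self.pending == 0:
            self.phase ^= 1                   # the phase completes
            self.pending = self.arrive_count  # re-arm for the next phase


def init_model_barriers(num_stages, producer_arrive_count, consumer_arrive_count):
    """Build per-stage full and empty barriers with their arrive counts."""
    full = [CountedBarrier(producer_arrive_count) for _ in range(num_stages)]
    empty = [CountedBarrier(consumer_arrive_count) for _ in range(num_stages)]
    return full, empty


# Example: one producer thread signals data, 128 consumer threads release it.
full, empty = init_model_barriers(2, 1, 128)
for _ in range(127):
    empty[0].arrive()
print(empty[0].phase)  # 0: still waiting for the last consumer thread
empty[0].arrive()
print(empty[0].phase)  # 1: the stage is marked as consumed
```

If an arrive count is set higher than the number of threads that actually arrive, the phase never completes in this model, which would correspond to a hang on the real barrier.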
