Mojo struct
ProducerConsumerPipeline
@register_passable("trivial")
struct ProducerConsumerPipeline[num_stages: Int]
A producer-consumer pipeline that uses shared memory barriers to synchronize producer and consumer warps.
This struct is commonly used with warp specialization to pipeline operations between two warps/warpgroups with data dependencies.
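The handshake described above can be illustrated with a host-side analogy. This is a minimal Python sketch, not the Mojo API: threads stand in for warps, `threading.Semaphore` objects stand in for the per-stage shared memory barriers, and `NUM_STAGES`, `NUM_ITEMS`, `slots`, and `received` are hypothetical names introduced only for this illustration.

```python
import threading

NUM_STAGES = 3   # plays the role of the num_stages parameter
NUM_ITEMS = 10   # hypothetical workload size

slots = [None] * NUM_STAGES
# Semaphores are an analogy for the barriers, not the real mechanism.
full = [threading.Semaphore(0) for _ in range(NUM_STAGES)]   # producer signals, consumer waits
empty = [threading.Semaphore(1) for _ in range(NUM_STAGES)]  # consumer signals, producer waits
received = []

def producer():
    stage = 0
    for item in range(NUM_ITEMS):
        empty[stage].acquire()             # analogous to wait_consumer: slot must be free
        slots[stage] = item
        full[stage].release()              # signal that this stage holds data
        stage = (stage + 1) % NUM_STAGES   # analogous to producer_step

def consumer():
    stage = 0
    for _ in range(NUM_ITEMS):
        full[stage].acquire()              # analogous to wait_producer: slot must be filled
        received.append(slots[stage])
        empty[stage].release()             # signal that this stage is free again
        stage = (stage + 1) % NUM_STAGES   # analogous to consumer_step

threads = [threading.Thread(target=producer), threading.Thread(target=consumer)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

With more than one stage, the producer can run ahead of the consumer by up to `NUM_STAGES` slots, which is what lets the two sides overlap their work.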
Parameters
- num_stages (Int): The number of pipeline stages.
Fields
- full (UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)])
- empty (UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)])
Implemented traits
AnyType, Copyable, ImplicitlyCopyable, Movable, UnknownDestructibility
Aliases
__copyinit__is_trivial
alias __copyinit__is_trivial = UInt32.__copyinit__is_trivial if UInt32.__copyinit__is_trivial if UInt32.__copyinit__is_trivial if UInt32.__copyinit__is_trivial if UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__copyinit__is_trivial else UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__copyinit__is_trivial else UInt32.__copyinit__is_trivial if UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__copyinit__is_trivial else UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__copyinit__is_trivial else UInt32.__copyinit__is_trivial if UInt32.__copyinit__is_trivial if UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__copyinit__is_trivial else UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__copyinit__is_trivial else UInt32.__copyinit__is_trivial if UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__copyinit__is_trivial else UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__copyinit__is_trivial else UInt32.__copyinit__is_trivial if UInt32.__copyinit__is_trivial if UInt32.__copyinit__is_trivial if UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__copyinit__is_trivial else UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__copyinit__is_trivial else UInt32.__copyinit__is_trivial if UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__copyinit__is_trivial else UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__copyinit__is_trivial else UInt32.__copyinit__is_trivial if UInt32.__copyinit__is_trivial if UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__copyinit__is_trivial else UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__copyinit__is_trivial else UInt32.__copyinit__is_trivial if UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__copyinit__is_trivial else UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__copyinit__is_trivial
__del__is_trivial
alias __del__is_trivial = UInt32.__del__is_trivial if UInt32.__del__is_trivial if UInt32.__del__is_trivial if UInt32.__del__is_trivial if UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__del__is_trivial else UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__del__is_trivial else UInt32.__del__is_trivial if UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__del__is_trivial else UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__del__is_trivial else UInt32.__del__is_trivial if UInt32.__del__is_trivial if UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__del__is_trivial else UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__del__is_trivial else UInt32.__del__is_trivial if UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__del__is_trivial else UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__del__is_trivial else UInt32.__del__is_trivial if UInt32.__del__is_trivial if UInt32.__del__is_trivial if UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__del__is_trivial else UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__del__is_trivial else UInt32.__del__is_trivial if UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__del__is_trivial else UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__del__is_trivial else UInt32.__del__is_trivial if UInt32.__del__is_trivial if UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__del__is_trivial else UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__del__is_trivial else UInt32.__del__is_trivial if UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__del__is_trivial else UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__del__is_trivial
__moveinit__is_trivial
alias __moveinit__is_trivial = UInt32.__moveinit__is_trivial if UInt32.__moveinit__is_trivial if UInt32.__moveinit__is_trivial if UInt32.__moveinit__is_trivial if UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__moveinit__is_trivial else UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__moveinit__is_trivial else UInt32.__moveinit__is_trivial if UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__moveinit__is_trivial else UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__moveinit__is_trivial else UInt32.__moveinit__is_trivial if UInt32.__moveinit__is_trivial if UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__moveinit__is_trivial else UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__moveinit__is_trivial else UInt32.__moveinit__is_trivial if UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__moveinit__is_trivial else UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__moveinit__is_trivial else UInt32.__moveinit__is_trivial if UInt32.__moveinit__is_trivial if UInt32.__moveinit__is_trivial if UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__moveinit__is_trivial else UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__moveinit__is_trivial else UInt32.__moveinit__is_trivial if UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__moveinit__is_trivial else UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__moveinit__is_trivial else UInt32.__moveinit__is_trivial if UInt32.__moveinit__is_trivial if UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__moveinit__is_trivial else UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__moveinit__is_trivial else UInt32.__moveinit__is_trivial if UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__moveinit__is_trivial else UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)].__moveinit__is_trivial
Methods
__init__
__init__(ptr: UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)]) -> Self
Initialize the producer-consumer pipeline with default phases.
Args:
- ptr (UnsafePointer): Pointer to shared memory barriers.
wait_producer
wait_producer(self)
Consumer waits for producer.
wait_consumer
wait_consumer(self)
Producer waits for consumer.
producer_mbar
producer_mbar(self, stage: UInt32) -> UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)]
Get the producer barrier for a specific stage.
Args:
- stage (UInt32): The pipeline stage.
Returns:
UnsafePointer: The shared memory barrier that the producer signals.
consumer_mbar
consumer_mbar(self, stage: UInt32) -> UnsafePointer[SharedMemBarrier, address_space=AddressSpace(3)]
Get the consumer barrier for a specific stage.
Args:
- stage (UInt32): The pipeline stage.
Returns:
UnsafePointer: The shared memory barrier that the consumer signals.
producer_stage
producer_stage(self) -> UInt32
Get the current producer stage index.
Returns:
UInt32: The current stage index for the producer (0 to num_stages-1).
consumer_stage
consumer_stage(self) -> UInt32
Get the current consumer stage index.
Returns:
UInt32: The current stage index for the consumer (0 to num_stages-1).
consumer_step
consumer_step(mut self)
Advance the consumer to the next pipeline stage.
Increments the consumer stage and wraps to 0 when it reaches num_stages, toggling the phase bit on wrap-around. The phase switches only at the end of the pipeline because all barriers are assumed to be at the same consumer/producer phase before they are checked. Once a barrier is checked, execution moves on to the next barrier.
producer_step
producer_step(mut self)
Advance the producer to the next pipeline stage.
Increments the producer stage and wraps to 0 when reaching num_stages, toggling the phase bit on wrap-around.
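The stepping rule can be modeled in a few lines. The following Python sketch is a hypothetical host-side model of the stage and phase bookkeeping for one side of the pipeline. `StageCounter` and its fields are illustrative names, not part of the Mojo struct, and the starting phase of 0 is an assumption.

```python
class StageCounter:
    """Hypothetical model of one side's (producer or consumer) stage and phase state."""

    def __init__(self, num_stages, phase=0):
        self.num_stages = num_stages
        self.stage = 0
        self.phase = phase  # assumed starting phase; the real default is not stated here

    def step(self):
        # Advance to the next stage; on wrap-around, return to 0 and flip the phase bit.
        self.stage += 1
        if self.stage == self.num_stages:
            self.stage = 0
            self.phase ^= 1

counter = StageCounter(num_stages=3)
trace = []
for _ in range(7):
    trace.append((counter.stage, counter.phase))
    counter.step()
```

Here `trace` records the (stage, phase) pairs visited: the phase stays constant across stages 0, 1, 2 and flips each time the stage wraps back to 0.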
smem_bytes
static smem_bytes() -> UInt32
Calculate the shared memory bytes required for pipeline barriers.
Returns:
UInt32: The total number of bytes needed for all pipeline barriers (2 * num_stages barriers).
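As a worked example of this arithmetic, the Python sketch below computes the byte count for a few stage counts. The per-barrier size of 8 bytes is an assumption made for illustration (a single 64-bit barrier object) and is not stated on this page; `BARRIER_BYTES` and this Python `smem_bytes` helper are hypothetical.

```python
BARRIER_BYTES = 8  # assumption: one 64-bit barrier object; not confirmed by this page

def smem_bytes(num_stages, barrier_bytes=BARRIER_BYTES):
    # One "full" and one "empty" barrier per stage gives 2 * num_stages barriers.
    num_barriers = 2 * num_stages
    return num_barriers * barrier_bytes

sizes = {n: smem_bytes(n) for n in (1, 2, 4)}
```

Under that assumption, a 4-stage pipeline needs 8 barriers, or 64 bytes of shared memory.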
init_mbars
init_mbars(self, producer_arrive_count: Int32, consumer_arrive_count: Int32)
Initialize the smem barriers for the producer and consumer.
This function must be called by a single thread, and it must be called before the pipeline object is used.
Args: