
Python module

pipeline

MAX pipeline for model inference and generation (Text Generation variant).

BatchInfo

class max.pipelines.lib.pipeline_variants.text_generation.BatchInfo(past_seq_lens, seq_lens, num_steps)

Information about a batch of requests passed to the pipeline.

Parameters:

num_steps

num_steps: int

Number of steps to run in the pipeline.

past_seq_lens

past_seq_lens: list[int]

Coordinated list of past sequence lengths (i.e., context lengths).

seq_lens

seq_lens: list[int]

Coordinated list of sequence lengths for the current step (i.e., prompt_len during prefill or 1 during decode).
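
A minimal construction sketch, assuming BatchInfo accepts keyword arguments matching the signature above; the values are illustrative only.

```python
from max.pipelines.lib.pipeline_variants.text_generation import BatchInfo

# Two coordinated requests: the first has 128 cached tokens and is decoding one
# new token; the second is a fresh 16-token prompt with no cached context yet.
batch_info = BatchInfo(
    past_seq_lens=[128, 0],
    seq_lens=[1, 16],
    num_steps=4,
)
```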

TextGenerationPipeline

class max.pipelines.lib.pipeline_variants.text_generation.TextGenerationPipeline(pipeline_config, pipeline_model, eos_token_id, weight_adapters, tokenizer)

Generalized token generator pipeline.

Initialize a text generation pipeline instance.

This sets up devices, the inference session, tokenizer, KV-cache manager, sampling kernel, and loads model weights and adapters.

Parameters:

  • pipeline_config (PipelineConfig) – Configuration for the pipeline and runtime behavior.
  • pipeline_model (type[PipelineModel[TextGenerationContextType]]) – Concrete model implementation to use for execution.
  • eos_token_id (int) – Default EOS token id used when HF config does not supply one or to seed the EOS set.
  • weight_adapters (dict[WeightsFormat, WeightsAdapter]) – Mapping from weights format to adapter implementation.
  • tokenizer (PipelineTokenizer[TextGenerationContextType, npt.NDArray[np.integer[Any]], TextGenerationRequest]) – Tokenizer implementation used to build contexts and decode.

Raises:

ValueError – If quantization_encoding is not configured in pipeline_config.model or if structured output is requested without a valid tokenizer delegate.

execute()

execute(inputs)

Given a batch, process the inputs, execute the graph for num_steps in a multi-step scenario, then decode the resulting tokens and return the outputs keyed by request ID.

Parameters:

inputs (TextGenerationInputs[TextGenerationContextType])

Return type:

dict[RequestID, TextGenerationOutput]
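
A minimal usage sketch, assuming a configured pipeline and a TextGenerationInputs object built elsewhere; their construction is not covered on this page.

```python
from max.pipelines.lib.pipeline_variants.text_generation import TextGenerationPipeline

def run_batch(pipeline: TextGenerationPipeline, inputs) -> None:
    # `inputs` is a TextGenerationInputs built by the serving layer (not shown here).
    outputs = pipeline.execute(inputs)  # dict[RequestID, TextGenerationOutput]
    for request_id, output in outputs.items():
        print(request_id, output)
```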

initialize_bitmask()

initialize_bitmask(batch)

Allocate a per-request token bitmask for structured decoding.

Parameters:

  • batch (list[TextGenerationContextType]) – Requests in the batch; its length sets the bitmask's batch dimension.

Returns:

A bitmask array of shape [batch_size, vocab_size] if structured output is enabled; otherwise None.

Return type:

ndarray[tuple[Any, …], dtype[int32]] | None
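
A short sketch of the shape contract above; `pipeline` and `batch` are assumed to come from elsewhere, and the assertion only restates the docstring.

```python
import numpy as np

def allocate_bitmask(pipeline, batch) -> np.ndarray | None:
    # `pipeline` is a TextGenerationPipeline, `batch` a list of request contexts.
    bitmask = pipeline.initialize_bitmask(batch)
    if bitmask is not None:
        # One int32 row per request, one column per vocabulary entry.
        assert bitmask.shape[0] == len(batch)
    return bitmask
```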

kv_managers

property kv_managers: list[Any]

Return the list of KV cache managers backing this pipeline.

pipeline_config

property pipeline_config: PipelineConfig

Return the pipeline configuration.

prepare_batch()

prepare_batch(batches, num_steps)

Prepare model inputs and ancillary state for multi-step execution.

This flattens replica batches, optionally initializes constrained decoding bitmasks, ensures KV-cache reservations, clamps num_steps per context, and builds initial model inputs.

Parameters:

  • batches (list[list[TextGenerationContextType]]) – Per-replica list of contexts.
  • num_steps (int) – Desired number of steps to run.

Returns:

  • ModelInputs: Prepared inputs for the first step.
  • int: The clamped number of steps to run.
  • Optional[np.ndarray]: The structured decoding bitmask or None.
  • list[TextGenerationContextType]: The flattened context batch.

Return type:

tuple[ModelInputs, int, np.ndarray | None, list[TextGenerationContextType]]
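
A short unpacking sketch based on the return contract above; `pipeline` and `replica_batches` are assumed to exist, and the default of 8 steps is illustrative.

```python
def prepare(pipeline, replica_batches, requested_steps: int = 8):
    # `replica_batches` is a list[list[TextGenerationContextType]], one inner list per replica.
    model_inputs, num_steps, bitmask, flat_batch = pipeline.prepare_batch(
        replica_batches, requested_steps
    )
    # num_steps may come back smaller than requested_steps after per-context clamping.
    return model_inputs, num_steps, bitmask, flat_batch
```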

release()

release(request_id)

Mark the context as complete, releasing the cache slot from the KV manager.

Note: KV cache lifecycle is now managed by the scheduler. This method is kept for interface compatibility but is a no-op for regular pipelines.

Parameters:

request_id (RequestID)

Return type:

None

tokenizer

property tokenizer: PipelineTokenizer[TextGenerationContextType, ndarray[tuple[Any, ...], dtype[integer[Any]]], TextGenerationRequest]

Return the tokenizer used for building contexts and decoding.

update_for_structured_output()

update_for_structured_output(context, bitmask, index)

Update context and logits bitmask for structured output.

If a json_schema is present and no matcher is set, this compiles a grammar matcher and installs it on the context. It may also jump ahead in generation, and it fills the per-request token bitmask used to constrain the next-token distribution.

Parameters:

  • context (TextGenerationContextType) – Request context to update.
  • bitmask (ndarray[tuple[Any, ...], dtype[int32]]) – Optional preallocated bitmask buffer; updated in-place.
  • index (int) – Global position into the bitmask for this request.

Raises:

ValueError – If a JSON schema is provided but structured output is not enabled via sampling configuration.

Return type:

None
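
The following is a conceptual NumPy sketch of how a [batch_size, vocab_size] token bitmask constrains the next-token distribution; it is not the MAX sampling kernel, and the allowed-token sets are made up.

```python
import numpy as np

def apply_bitmask(logits: np.ndarray, bitmask: np.ndarray) -> np.ndarray:
    # Disallowed tokens get -inf so they can never be selected or sampled.
    return np.where(bitmask.astype(bool), logits, -np.inf)

batch_size, vocab_size = 2, 8
rng = np.random.default_rng(0)
logits = rng.normal(size=(batch_size, vocab_size)).astype(np.float32)
bitmask = np.zeros((batch_size, vocab_size), dtype=np.int32)
bitmask[0, [1, 3]] = 1   # request 0 may only emit tokens 1 or 3
bitmask[1, :] = 1        # request 1 is unconstrained
next_tokens = apply_bitmask(logits, bitmask).argmax(axis=-1)
```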

StandaloneSpeculativeDecodingPipeline

final class max.pipelines.lib.speculative_decoding.StandaloneSpeculativeDecodingPipeline(pipeline_config, pipeline_model, eos_token_id, weight_adapters, tokenizer, draft_pipeline_model=None, draft_weight_adapters=None)

Bases: SpeculativeDecodingPipelineBase

Standalone speculative decoding where draft model runs independently.

In this approach, the draft model generates tokens without any information from the target model, then the target model verifies these tokens.

Parameters:

execute()

execute(inputs)

Execute standalone speculative decoding.

In standalone mode:

  1. Draft model generates tokens independently
  2. Target model verifies draft tokens
  3. Apply rejection sampling to accept or reject the draft tokens (a conceptual sketch follows this entry)

Parameters:

inputs (TextGenerationInputs[TextContext])

Return type:

dict[RequestID, TextGenerationOutput]
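
A conceptual sketch of the draft-then-verify loop with rejection sampling, written in plain NumPy; it is not the MAX implementation, and the probability tensors stand in for real model outputs.

```python
import numpy as np

def speculative_step(draft_probs, target_probs, draft_tokens, rng):
    """draft_probs/target_probs: [num_draft, vocab]; draft_tokens: [num_draft]."""
    accepted = []
    for i, tok in enumerate(draft_tokens):
        # Accept the draft token with probability min(1, p_target / p_draft).
        if rng.random() < min(1.0, target_probs[i, tok] / draft_probs[i, tok]):
            accepted.append(int(tok))
            continue
        # First rejection: resample from the residual distribution and stop.
        residual = np.maximum(target_probs[i] - draft_probs[i], 0.0)
        residual = residual / residual.sum() if residual.sum() > 0 else target_probs[i]
        accepted.append(int(rng.choice(len(residual), p=residual)))
        break
    return accepted
```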

generate_draft_tokens()

generate_draft_tokens(batch, num_steps, model_inputs)

Parameters:

Return type:

tuple[int, Buffer, Buffer, ModelInputs, Buffer]

prepare_batch()

prepare_batch(model, batch, num_steps, return_n_logits, is_draft=False, draft_inputs=None, merged_draft_tokens=None, merged_draft_offsets=None)

Parameters:

Return type:

tuple[ModelInputs, int]

verify_draft_tokens_with_target_model()

verify_draft_tokens_with_target_model(draft_inputs, context_batch, num_draft_tokens_generated, draft_tokens, draft_logits, merged_draft_tokens, merged_draft_offsets, all_draft_logits)

Parameters:

Return type:

tuple[Buffer, Buffer, Buffer]

EmbeddingsPipeline

final class max.pipelines.lib.embeddings_pipeline.EmbeddingsPipeline(pipeline_config, pipeline_model, eos_token_id, weight_adapters, tokenizer)

Bases: Pipeline[EmbeddingsGenerationInputs, EmbeddingsGenerationOutput]

Generalized embeddings generation pipeline.

Parameters:

execute()

execute(inputs)

Given a batch, process the inputs, execute the graph, and return the computed embeddings keyed by request ID.

Parameters:

inputs (EmbeddingsGenerationInputs)

Return type:

dict[RequestID, EmbeddingsGenerationOutput]
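
A minimal usage sketch, assuming a configured EmbeddingsPipeline and an EmbeddingsGenerationInputs object built elsewhere.

```python
from max.pipelines.lib.embeddings_pipeline import EmbeddingsPipeline

def embed_batch(pipeline: EmbeddingsPipeline, inputs) -> None:
    # `inputs` is an EmbeddingsGenerationInputs built by the serving layer (not shown here).
    outputs = pipeline.execute(inputs)  # dict[RequestID, EmbeddingsGenerationOutput]
    for request_id in outputs:
        print(request_id)
```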

release()

release(request_id)

Release any resources or state associated with a specific request.

This method should be implemented by concrete pipeline classes to perform cleanup or resource deallocation for the given request ID. It is typically called when a request has completed processing and its associated resources (such as memory, cache, or temporary files) are no longer needed.

Parameters:

request_id (RequestID) – The unique identifier of the request to release resources for.

Returns:

None

Raises:

NotImplementedError – If not implemented by a concrete subclass.

Return type:

None
