
Python module

pipeline

MAX pipeline for model inference and generation (Text Generation variant).

BatchInfo

class max.pipelines.lib.pipeline_variants.text_generation.BatchInfo(past_seq_lens, seq_lens, num_steps)

Information about a batch of requests passed to the pipeline.

Parameters:

num_steps

num_steps: int

Number of steps to run in the pipeline.

past_seq_lens

past_seq_lens: list[int]

Coordinated list of past sequence lengths (i.e., context lengths).

seq_lens

seq_lens: list[int]

Coordinated list of sequence lengths (i.e., prompt_len or 1).
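
A minimal construction sketch, assuming BatchInfo accepts its fields as keyword arguments; the lengths shown are illustrative:

```python
from max.pipelines.lib.pipeline_variants.text_generation import BatchInfo

# Two requests: the first is in prefill (7 prompt tokens, nothing cached yet),
# the second is in decode (1 new token on top of 12 cached context tokens).
info = BatchInfo(past_seq_lens=[0, 12], seq_lens=[7, 1], num_steps=4)
```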

TextGenerationPipeline

class max.pipelines.lib.pipeline_variants.text_generation.TextGenerationPipeline(pipeline_config, pipeline_model, eos_token_id, weight_adapters, tokenizer)

Generalized token generator pipeline.

Initialize a text generation pipeline instance.

This sets up devices, the inference session, tokenizer, KV-cache manager, sampling kernel, and loads model weights and adapters.

Parameters:

  • pipeline_config (PipelineConfig) – Configuration for the pipeline and runtime behavior.
  • pipeline_model (type[PipelineModel[TextGenerationContextType]]) – Concrete model implementation to use for execution.
  • eos_token_id (int) – Default EOS token ID, used to seed the EOS set or as a fallback when the Hugging Face config does not supply one.
  • weight_adapters (dict[WeightsFormat, WeightsAdapter]) – Mapping from weights format to adapter implementation.
  • tokenizer (PipelineTokenizer[TextGenerationContextType, npt.NDArray[np.integer[Any]], TextGenerationRequest]) – Tokenizer implementation used to build contexts and decode.

Raises:

ValueError – If quantization_encoding is not configured in pipeline_config.model_config or if structured output is requested without a valid tokenizer delegate.
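
A hedged construction sketch; the placeholder values below (config object, model class, adapters, tokenizer) depend on your deployment and are not defined by this API:

```python
from max.pipelines.lib.pipeline_variants.text_generation import TextGenerationPipeline

# Placeholders: in a real deployment these come from your model configuration.
pipeline_config = ...   # PipelineConfig (must set quantization_encoding)
MyPipelineModel = ...   # hypothetical concrete PipelineModel subclass
weight_adapters = ...   # dict[WeightsFormat, WeightsAdapter]
tokenizer = ...         # PipelineTokenizer implementation

pipeline = TextGenerationPipeline(
    pipeline_config=pipeline_config,
    pipeline_model=MyPipelineModel,
    eos_token_id=2,  # fallback EOS token id
    weight_adapters=weight_adapters,
    tokenizer=tokenizer,
)
```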

calculate_num_steps()

calculate_num_steps(num_steps, context)

Compute the number of generation steps allowed for a context.

The value is clamped by the remaining capacity with respect to the model’s configured max_seq_len.

Parameters:

  • num_steps (int) – Desired number of steps to attempt.
  • context (TextGenerationContextType) – The context whose sequence length constraints apply.

Returns:

The number of steps to execute for this context (>= 1).

Raises:

ValueError – If the current request length is already >= max_seq_len.

Return type:

int
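
The clamping rule can be illustrated with a small stand-alone sketch (hypothetical helper; the real method reads the current length from the context and the limit from the model config):

```python
def clamp_num_steps(num_steps: int, current_length: int, max_seq_len: int) -> int:
    """Illustrative stand-in for calculate_num_steps's clamping rule."""
    if current_length >= max_seq_len:
        raise ValueError(
            f"request length {current_length} is already >= max_seq_len {max_seq_len}"
        )
    # Never schedule more steps than the remaining capacity allows.
    return min(num_steps, max_seq_len - current_length)

assert clamp_num_steps(8, current_length=1020, max_seq_len=1024) == 4
```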

execute()

execute(inputs)

Given a batch, process the batch inputs, execute the graph for num_steps in a multi-step scenario, then decode the generated tokens and return the output for each request.

Parameters:

inputs (TextGenerationInputs[TextGenerationContextType])

Return type:

dict[RequestID, TextGenerationOutput]
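
A hedged usage sketch: `pipeline` is an initialized TextGenerationPipeline and `inputs` a prepared TextGenerationInputs for the current batch; construction of both is deployment-specific, and `handle_response` is a hypothetical downstream handler:

```python
outputs = pipeline.execute(inputs)  # dict[RequestID, TextGenerationOutput]
for request_id, result in outputs.items():
    # Each result carries the tokens generated for that request over the
    # executed steps (plus log probabilities if enabled).
    handle_response(request_id, result)
```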

initialize_bitmask()

initialize_bitmask(batch)

Allocate a per-request token bitmask for structured decoding.

Parameters:

  • batch (list[TextGenerationContextType]) – Requests in the batch; its length determines the bitmask's batch dimension.

Returns:

A bitmask array of shape [batch_size, vocab_size] if structured output is enabled; otherwise None.

Return type:

ndarray[tuple[int, …], dtype[int32]] | None
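
For illustration, the allocation has the documented shape (the vocabulary size below is a placeholder; the real value comes from the model configuration):

```python
import numpy as np

batch_size, vocab_size = 4, 32000  # illustrative values
bitmask = np.zeros((batch_size, vocab_size), dtype=np.int32)
print(bitmask.shape)  # (4, 32000)
```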

kv_managers

property kv_managers: list[Any]

Return the list of KV cache managers backing this pipeline.

pipeline_config

property pipeline_config: PipelineConfig

Return the pipeline configuration.

prepare_batch()

prepare_batch(batches, num_steps)

Prepare model inputs and ancillary state for multi-step execution.

This flattens replica batches, optionally initializes constrained decoding bitmasks, ensures KV-cache reservations, clamps num_steps per context, and builds initial model inputs.

Parameters:

  • batches (list[dict[RequestID, TextGenerationContextType]]) – Per-replica mapping of RequestID to context.
  • num_steps (int) – Desired number of steps to run.

Returns:

  • ModelInputs: Prepared inputs for the first step.
  • int: The clamped number of steps to run.
  • Optional[np.ndarray]: The structured decoding bitmask or None.
  • list[TextGenerationContextType]: The flattened context batch.

Return type:

tuple of (ModelInputs, int, np.ndarray | None, list[TextGenerationContextType])
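
A hedged sketch of consuming the return value (`pipeline` and `batches` are assumed to exist; names are illustrative):

```python
model_inputs, num_steps, bitmask, flat_batch = pipeline.prepare_batch(
    batches, num_steps=8
)
# model_inputs feeds the first execution step; num_steps is the clamped count;
# bitmask is None unless structured output is enabled.
```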

release()

release(request_id)

Mark the context as complete, releasing the cache slot from the KV manager.

Parameters:

request_id (RequestID)

Return type:

None

tokenizer

property tokenizer: PipelineTokenizer[TextGenerationContextType, ndarray[tuple[int, ...], dtype[integer[Any]]], TextGenerationRequest]

Return the tokenizer used for building contexts and decoding.

update_context_and_prepare_responses()

update_context_and_prepare_responses(generated_tokens_host, batch_log_probabilities, flat_batch, num_steps, enable_log_probs)

Update the context objects and prepare the response objects for each context in the batch after generation.

Parameters:

  • generated_tokens_host (ndarray[tuple[int, ...], dtype[int32]]) – Array of generated tokens on the host, indexed as [batch, step].
  • batch_log_probabilities (list[list[LogProbabilities | None]]) – Per-step log probability outputs (or None); each entry is a per-batch list for that step.
  • flat_batch (list[TextGenerationContextType]) – List of generation contexts, one per request, matching batch dimension.
  • num_steps (int) – Number of generation steps to process for each context.
  • enable_log_probs (bool) – Whether to include log probability data in outputs.

Returns:

A dictionary mapping request IDs to their respective generation outputs.

Return type:

dict[RequestID, TextGenerationOutput]
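
The [batch, step] indexing convention can be illustrated as follows (assumes `generated_tokens_host`, `flat_batch`, and `num_steps` as described above):

```python
for i, context in enumerate(flat_batch):
    # Row i holds the tokens generated for request i, one column per step.
    new_tokens = generated_tokens_host[i, :num_steps]
```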

update_for_structured_output()

update_for_structured_output(context, bitmask, index)

Update context and logits bitmask for structured output.

If a json_schema is present and no matcher is set, this compiles a grammar matcher and installs it on the context. It may also jump ahead in generation and fill the per-request token bitmask used to constrain the next-token distribution.

Parameters:

  • context (TextGenerationContextType) – Request context to update.
  • bitmask (ndarray[tuple[int, ...], dtype[int32]]) – Optional preallocated bitmask buffer; updated in-place.
  • index (int) – Global position into the bitmask for this request.

Raises:

ValueError – If a JSON schema is provided but structured output is not enabled via sampling configuration.

Return type:

None
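
A hedged sketch of filling each request's row of a preallocated bitmask before sampling (the loop structure is illustrative, not the pipeline's exact internal code; `flat_batch` and `bitmask` are assumed to come from prepare_batch / initialize_bitmask):

```python
if bitmask is not None:
    for i, context in enumerate(flat_batch):
        pipeline.update_for_structured_output(context, bitmask, i)
```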

StandaloneSpeculativeDecodingPipeline

final class max.pipelines.lib.speculative_decoding.StandaloneSpeculativeDecodingPipeline(pipeline_config, pipeline_model, eos_token_id, weight_adapters, tokenizer, draft_pipeline_model=None, draft_weight_adapters=None)

Bases: SpeculativeDecodingPipelineBase

Standalone speculative decoding where draft model runs independently.

In this approach, the draft model generates tokens without any information from the target model, then the target model verifies these tokens.

Parameters:

  • pipeline_config (PipelineConfig) – Configuration for the pipeline and runtime behavior.
  • pipeline_model – Concrete model implementation for the target model.
  • eos_token_id (int) – Default EOS token ID used to seed the EOS set or when the Hugging Face config does not supply one.
  • weight_adapters (dict[WeightsFormat, WeightsAdapter]) – Mapping from weights format to adapter implementation for the target model.
  • tokenizer – Tokenizer implementation used to build contexts and decode.
  • draft_pipeline_model – Optional concrete model implementation for the draft model.
  • draft_weight_adapters – Optional mapping from weights format to adapter implementation for the draft model.

execute()

execute(inputs)

Execute standalone speculative decoding.

In standalone mode:

  1. Draft model generates tokens independently
  2. Target model verifies draft tokens
  3. Apply rejection sampling to accept/reject tokens

Parameters:

inputs (TextGenerationInputs[TextContext])

Return type:

dict[RequestID, TextGenerationOutput]
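
The standard rejection-sampling acceptance rule for speculative decoding can be sketched as follows (illustrative only; the exact kernel used by MAX may differ):

```python
import numpy as np

def count_accepted_draft_tokens(
    p_draft: np.ndarray, p_target: np.ndarray, rng: np.random.Generator
) -> int:
    """Return how many leading draft tokens are accepted.

    p_draft[i] and p_target[i] are the probabilities the draft and target
    models assign to the i-th proposed token.
    """
    accept_prob = np.minimum(1.0, p_target / p_draft)
    for i, prob in enumerate(accept_prob):
        if rng.random() >= prob:
            return i  # token i rejected; keep tokens [0, i)
    return len(accept_prob)  # every draft token accepted
```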

generate_draft_tokens()

generate_draft_tokens(batch, num_steps, model_inputs)

Parameters:

Return type:

tuple[int, Tensor, Tensor, ModelInputs, Tensor]

prepare_batch()

prepare_batch(model, batch, num_steps, return_n_logits, is_draft=False, draft_inputs=None, merged_draft_tokens=None, merged_draft_offsets=None)

Parameters:

Return type:

tuple[ModelInputs, int]

verify_draft_tokens_with_target_model()

verify_draft_tokens_with_target_model(draft_inputs, context_batch, num_draft_tokens_generated, draft_tokens, draft_logits, merged_draft_tokens, merged_draft_offsets, all_draft_logits)

Parameters:

Return type:

tuple[Tensor, Tensor, Tensor]

EmbeddingsPipeline

final class max.pipelines.lib.embeddings_pipeline.EmbeddingsPipeline(pipeline_config, pipeline_model, eos_token_id, weight_adapters, tokenizer)

Bases: Pipeline[EmbeddingsGenerationInputs, EmbeddingsGenerationOutput]

Generalized embeddings generation pipeline.

Parameters:

execute()

execute(inputs)

Given a batch, process the batch inputs, execute the graph, and return the computed embeddings for each request.

Parameters:

inputs (EmbeddingsGenerationInputs)

Return type:

dict[RequestID, EmbeddingsGenerationOutput]

release()

release(request_id)

Release any resources or state associated with a specific request.

This method should be implemented by concrete pipeline classes to perform cleanup or resource deallocation for the given request ID. It is typically called when a request has completed processing and its associated resources (such as memory, cache, or temporary files) are no longer needed.

Parameters:

request_id (RequestID) – The unique identifier of the request to release resources for.

Returns:

None

Raises:

NotImplementedError – If not implemented by a concrete subclass.

Return type:

None
