
Python module

transfer_engine

KVCache Transfer Engine

KVTransferEngine

class max.kv_cache.paged_cache.transfer_engine.KVTransferEngine(name, tensors, *, total_num_pages, listen_port=None)

KVCache Transfer Engine.

Currently, this has only been tested on CPU. It supports one or more paged tensors.

The TransferEngine communicates with other TransferEngines in other threads or processes. However, individual TransferEngines themselves are not thread-safe. It is intended to be used by MAX’s single-threaded scheduler.

Parameters:

cleanup()

cleanup()

Release all resources associated with the transfer engine.

Should be called before the transfer engine is garbage collected. Moving this logic into the __del__ destructor causes a UCX error for unknown reasons.

Return type:

None
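Because cleanup() has to be called explicitly rather than from __del__, it can help to tie it to a scope. The sketch below is a hypothetical helper, not part of MAX: `managed_engine` and the `SupportsCleanup` protocol are illustrative names, and the only engine method assumed is the documented cleanup().

```python
from collections.abc import Iterator
from contextlib import contextmanager
from typing import Protocol, TypeVar


class SupportsCleanup(Protocol):
    """Anything with a cleanup() method, such as a KVTransferEngine."""

    def cleanup(self) -> None: ...


E = TypeVar("E", bound=SupportsCleanup)


@contextmanager
def managed_engine(engine: E) -> Iterator[E]:
    """Hypothetical helper: yield the engine, then always call cleanup()."""
    try:
        yield engine
    finally:
        # Release resources explicitly instead of relying on __del__,
        # which the docs say triggers a UCX error.
        engine.cleanup()
```

With this helper, code written as `with managed_engine(engine) as e:` releases the engine's resources on normal exit and when an exception propagates out of the block.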

cleanup_transfer()

cleanup_transfer(transfer_req)

Clean up a transfer. This should be called after the transfer is complete.

Parameters:

transfer_req (TransferReqData) – The transfer request to clean up.

Return type:

None

completed_recv_transfers

completed_recv_transfers: dict[str, dict[str, int]]

Map of agent names to completed recv transfers.

connect()

connect(remote)

Connect to a remote engine.

Parameters:

remote (KVTransferEngineMetadata) – Metadata for the remote engine.

Return type:

None

inflight_send_transfers

inflight_send_transfers: dict[str, TransferReqData]

Map of transfer names to send transfer request data.

initiate_send_transfer()

initiate_send_transfer(remote_metadata, src_idxs, dst_idxs)

Initiate a transfer of all tensors from the current engine to the remote engine.

Parameters:

  • remote_metadata (KVTransferEngineMetadata) – Metadata for the remote engine.
  • src_idxs (list[int]) – List of indices of the source pages in the current engine.
  • dst_idxs (list[int]) – List of indices of the destination pages in the remote engine.

Return type:

TransferReqData
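The docs do not spell out how src_idxs and dst_idxs relate. A natural reading is that they are paired element by element (page src_idxs[i] is copied into page dst_idxs[i]) and that each index must lie below the corresponding engine's total_num_pages. The hypothetical pre-flight check below encodes those assumptions, plus the further assumption that two source pages should not target the same destination page; it is not a MAX function.

```python
def validate_page_indices(
    src_idxs: list[int],
    dst_idxs: list[int],
    src_total_pages: int,
    dst_total_pages: int,
) -> list[tuple[int, int]]:
    """Hypothetical check: pair src/dst page indices and verify their bounds.

    Assumes src_idxs[i] is copied into dst_idxs[i].
    """
    if len(src_idxs) != len(dst_idxs):
        raise ValueError(
            f"got {len(src_idxs)} source pages but {len(dst_idxs)} destination pages"
        )
    for src, dst in zip(src_idxs, dst_idxs):
        if not 0 <= src < src_total_pages:
            raise ValueError(f"source page {src} out of range [0, {src_total_pages})")
        if not 0 <= dst < dst_total_pages:
            raise ValueError(f"destination page {dst} out of range [0, {dst_total_pages})")
    # Assumption: writing two source pages into one destination page is an error.
    if len(set(dst_idxs)) != len(dst_idxs):
        raise ValueError("duplicate destination page indices")
    return list(zip(src_idxs, dst_idxs))
```

The two bounds would come from the local engine's total_num_pages and the remote KVTransferEngineMetadata.total_num_pages.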

is_complete()

is_complete(transfer_req)

Check whether a given send or recv transfer has completed.

Parameters:

transfer_req (TransferReqData) – The transfer request.

Returns:

True if all transfers have completed; False otherwise.

Return type:

bool
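is_complete() and cleanup_transfer() can be combined into a bounded wait. The sketch below is hypothetical: `wait_and_cleanup`, the `SupportsTransferPolling` protocol, the timeout, and the polling interval are illustrative choices, and it relies only on the two documented methods. For comparison, sync_and_release() is documented to wait and release in a single call, without a timeout.

```python
import time
from typing import Any, Protocol


class SupportsTransferPolling(Protocol):
    """The subset of KVTransferEngine used by this sketch."""

    def is_complete(self, transfer_req: Any) -> bool: ...

    def cleanup_transfer(self, transfer_req: Any) -> None: ...


def wait_and_cleanup(
    engine: SupportsTransferPolling,
    transfer_req: Any,
    timeout_s: float = 10.0,
    poll_interval_s: float = 0.001,
) -> None:
    """Poll is_complete() until the transfer finishes, then clean it up.

    Raises TimeoutError (without cleaning up) if the transfer is still
    in flight after timeout_s seconds.
    """
    deadline = time.monotonic() + timeout_s
    while not engine.is_complete(transfer_req):
        if time.monotonic() >= deadline:
            raise TimeoutError(f"transfer did not complete within {timeout_s}s")
        time.sleep(poll_interval_s)
    # Only release the transfer once it is known to be complete.
    engine.cleanup_transfer(transfer_req)
```

A single-threaded scheduler that must not block could instead call is_complete() once per scheduling iteration and call cleanup_transfer() when it first returns True.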

memory_type

memory_type: MemoryType

Type of memory being managed (e.g. DRAM).

metadata

property metadata: KVTransferEngineMetadata

Get metadata for the current engine.

Returns:

Metadata for the current engine.

name

name: str

Name of the transfer engine / NIXL agent.

remote_agent_to_engine

remote_agent_to_engine: dict[str, str]

Map of remote agent names to their engine names.

remote_connections

remote_connections: dict[str, KVTransferEngineMetadata]

Map of remote engine names to their metadata.

sync_and_release()

sync_and_release(transfer_req)

Wait for a transfer to complete and release the transfer after it completes.

Parameters:

transfer_req (TransferReqData) – The transfer request to wait on and release.

Return type:

None

tensor_agents

tensor_agents: list[TensorAgent]

List of TensorAgent objects containing all per-tensor data.

total_num_pages

total_num_pages: int

Total number of pages in each tensor.

KVTransferEngineMetadata

class max.kv_cache.paged_cache.transfer_engine.KVTransferEngineMetadata(*, name, total_num_pages, memory_type, agents_meta)

Metadata associated with a transfer engine.

This is safe to send between threads/processes.

Parameters:

agents_meta

agents_meta: list[TensorAgentMetadata]

memory_type

memory_type: MemoryType

name

name: str

total_num_pages

total_num_pages: int

TensorAgent

class max.kv_cache.paged_cache.transfer_engine.TensorAgent(agent, agent_name, tensor, base_addr, ucx_backend, bytes_per_page, device_id, agent_metadata, reg_dlist)

Manages a single tensor and its associated NIXL agent for transfers.

This class holds both the runtime state (live objects) and can generate the serializable metadata for communication between engines.

Parameters:

  • agent (nixl.Agent)
  • agent_name (str)
  • tensor (Tensor)
  • base_addr (int)
  • ucx_backend (int)
  • bytes_per_page (int)
  • device_id (int)
  • agent_metadata (bytes)
  • reg_dlist (nixl.RegistrationDescriptorList)

to_metadata()

to_metadata()

Convert to serializable metadata for communication.

Return type:

TensorAgentMetadata

TensorAgentMetadata

class max.kv_cache.paged_cache.transfer_engine.TensorAgentMetadata(*, metadata, agent_name, bytes_per_page, base_addr, device_id)

Metadata for a single tensor/agent in the transfer engine.

This is used for serialization and communication between engines.

Parameters:

  • metadata (bytes)
  • agent_name (str)
  • bytes_per_page (int)
  • base_addr (int)
  • device_id (int)

agent_name

agent_name: str

base_addr

base_addr: int

bytes_per_page

bytes_per_page: int

device_id

device_id: int

metadata

metadata: bytes
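base_addr and bytes_per_page suggest how page indices map to memory. Assuming that a tensor's pages are laid out contiguously starting at base_addr, each exactly bytes_per_page bytes long (an assumption, not something these docs state), page idx occupies the byte range starting at base_addr + idx * bytes_per_page. The sketch below computes those regions; `PageRegion` and `page_regions` are hypothetical names and do not correspond to NIXL's descriptor-list API.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class PageRegion:
    """Hypothetical (address, length, device) triple describing one page."""

    addr: int
    length: int
    device_id: int


def page_regions(
    base_addr: int, bytes_per_page: int, device_id: int, page_idxs: list[int]
) -> list[PageRegion]:
    """Map page indices to byte regions, assuming contiguous pages from base_addr."""
    return [
        PageRegion(base_addr + idx * bytes_per_page, bytes_per_page, device_id)
        for idx in page_idxs
    ]
```

For example, with base_addr=0x1000 and bytes_per_page=256, page 2 would start at 0x1000 + 2 * 256 = 0x1200.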

TransferReqData

class max.kv_cache.paged_cache.transfer_engine.TransferReqData(*, dst_name, src_name, transfer_name, transfer_ids, src_idxs, dst_idxs)

Metadata associated with a transfer request.

This is safe to send between threads/processes.

Parameters:

dst_idxs

dst_idxs: list[int]

dst_name

dst_name: str

src_idxs

src_idxs: list[int]

src_name

src_name: str

transfer_ids

transfer_ids: list[int]

transfer_name

transfer_name: str

available_port()

max.kv_cache.paged_cache.transfer_engine.available_port(start_port=8000, end_port=9000, max_attempts=100)

Find an available TCP port in the given range.

Parameters:

  • start_port (int) – The lower bound of the port range (inclusive).
  • end_port (int) – The upper bound of the port range (inclusive).
  • max_attempts (int) – Maximum number of attempts to find a free port.

Returns:

An available port number.

Return type:

int

Raises:

RuntimeError – If no available port is found after max_attempts.
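A helper with this contract can be written with the standard socket module by trying to bind candidate ports. The sketch below is one possible approach, not MAX's implementation: `find_available_port` is a hypothetical name, and picking ports at random within the inclusive range is a design choice.

```python
import random
import socket


def find_available_port(
    start_port: int = 8000, end_port: int = 9000, max_attempts: int = 100
) -> int:
    """Return a TCP port in [start_port, end_port] that could be bound just now."""
    for _ in range(max_attempts):
        port = random.randint(start_port, end_port)  # inclusive on both ends
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            try:
                sock.bind(("", port))
            except OSError:
                continue  # port in use or not permitted; try another
            # The socket is closed on leaving the with-block, freeing the port.
            return port
    raise RuntimeError(
        f"no available port in [{start_port}, {end_port}] after {max_attempts} attempts"
    )
```

Because the probe socket is closed before the port is returned, another process can still claim the port before it is used, so the result is a best-effort hint. A returned port could, for example, be passed as listen_port.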
