Python module
transfer_engine
KVCache Transfer Engine
KVTransferEngine
class max.kv_cache.paged_cache.transfer_engine.KVTransferEngine(name, tensors, *, total_num_pages, listen_port=None)
KVCache Transfer Engine.
Currently, this has only been tested on CPU. It supports one or more paged tensors.
The TransferEngine communicates with other TransferEngines in other threads or processes. However, individual TransferEngines themselves are not thread-safe. It is intended to be used by MAX’s single-threaded scheduler.
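Parameters:
- name (str) – Name of the transfer engine / NIXL agent.
- tensors – The paged tensor or tensors managed by the engine.
- total_num_pages (int) – Total number of pages in each tensor.
- listen_port – Defaults to None.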
cleanup()
cleanup()
Release all resources associated with the transfer engine.
Call this before the transfer engine is garbage collected. Moving this logic into the __del__ destructor causes a UCX error for unknown reasons.
Return type:
None
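Because cleanup() has to be called explicitly rather than from __del__, one way to guarantee that it runs is to wrap the engine in a context manager. The sketch below assumes only that the object exposes a cleanup() method. managed_engine and FakeEngine are hypothetical names used for illustration. They are not part of MAX.

```python
from contextlib import contextmanager
from typing import Iterator, Protocol, TypeVar


class SupportsCleanup(Protocol):
    def cleanup(self) -> None: ...


E = TypeVar("E", bound=SupportsCleanup)


@contextmanager
def managed_engine(engine: E) -> Iterator[E]:
    """Hypothetical helper: yield the engine, then always call cleanup()."""
    try:
        yield engine
    finally:
        # Runs on normal exit and on exceptions, so resources are released
        # deterministically instead of relying on garbage collection.
        engine.cleanup()


class FakeEngine:
    """Stand-in for KVTransferEngine that only records cleanup calls."""

    def __init__(self) -> None:
        self.cleaned_up = False

    def cleanup(self) -> None:
        self.cleaned_up = True


if __name__ == "__main__":
    with managed_engine(FakeEngine()) as engine:
        pass  # issue and complete transfers here
    print(engine.cleaned_up)  # True
```

The try/finally inside the generator is what makes the release unconditional: an exception raised in the with body still triggers cleanup() before it propagates.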
cleanup_transfer()
cleanup_transfer(transfer_req)
Clean up a transfer. Call this after the transfer is complete.
Parameters:
transfer_req (TransferReqData) – The transfer request to clean up.
Return type:
None
completed_recv_transfers
Map of agent names to completed recv transfers.
connect()
connect(remote)
Connect to a remote engine.
Parameters:
remote (KVTransferEngineMetadata) – Metadata for the remote engine.
Return type:
None
inflight_send_transfers
inflight_send_transfers: dict[str, TransferReqData]
Map of transfer names to send transfer request data.
initiate_send_transfer()
initiate_send_transfer(remote_metadata, src_idxs, dst_idxs)
Initiate a transfer of all tensors from the current engine to the remote engine.
Parameters:
- remote_metadata (KVTransferEngineMetadata) – Metadata for the remote engine.
- src_idxs (list[int]) – List of indices of the source pages in the current engine.
- dst_idxs (list[int]) – List of indices of the destination pages in the remote engine.
Return type:
is_complete()
is_complete(transfer_req)
Checks whether a given send or recv transfer has completed.
WARNING
This method is prone to infinite loops. For the transfer to progress, the remote engine MUST call wait_recv_complete. As a result, the following code will hang:
    transfer_req = engine_1.write_to(...)
    while not engine_1.is_complete(transfer_req):
        pass
    while not engine_2.is_complete(transfer_req):
        pass
Instead, poll both engines on every iteration:
    transfer_req = engine_1.write_to(...)
    while True:
        # Call both checks unconditionally; `not a or not b` would skip
        # engine_2 whenever engine_1 is still incomplete.
        sender_done = engine_1.is_complete(transfer_req)
        receiver_done = engine_2.is_complete(transfer_req)
        if sender_done and receiver_done:
            break
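To make the hang concrete, here is a toy model built on the assumption the warning implies: the sender cannot finish until the receiver has been polled. ToyTransfer, ToySender, ToyReceiver, poll_sender_only, and poll_both are hypothetical names; nothing here uses NIXL, UCX, or the real engine.

```python
from dataclasses import dataclass


@dataclass
class ToyTransfer:
    """Hypothetical shared state for one transfer between two toy engines."""
    data_landed: bool = False  # set once the receiver side has been polled
    acked: bool = False        # set once the sender observes the landed data


class ToySender:
    def is_complete(self, t: ToyTransfer) -> bool:
        # Assumed: the sender can only finish after the receiver made progress.
        if t.data_landed:
            t.acked = True
        return t.acked


class ToyReceiver:
    def is_complete(self, t: ToyTransfer) -> bool:
        # Assumed: polling the receiver is what drives the transfer forward.
        t.data_landed = True
        return True


def poll_sender_only(sender: ToySender, t: ToyTransfer, max_iters: int = 1000) -> bool:
    for _ in range(max_iters):
        if sender.is_complete(t):
            return True
    return False  # without the iteration cap this loop would spin forever


def poll_both(sender: ToySender, receiver: ToyReceiver, t: ToyTransfer,
              max_iters: int = 1000) -> bool:
    for _ in range(max_iters):
        # Call both checks unconditionally so each side is polled every iteration.
        sender_done = sender.is_complete(t)
        receiver_done = receiver.is_complete(t)
        if sender_done and receiver_done:
            return True
    return False


print(poll_sender_only(ToySender(), ToyTransfer()))          # False
print(poll_both(ToySender(), ToyReceiver(), ToyTransfer()))  # True
```

In this model the combined loop finishes on its second iteration: the first receiver poll lands the data, and the next sender poll observes it.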
Parameters:
transfer_req (TransferReqData) – The transfer request.
Returns:
True if all transfers have completed; False otherwise.
Return type:
bool
memory_type
memory_type: MemoryType
Type of memory being managed (e.g. DRAM).
metadata
property metadata: KVTransferEngineMetadata
Get metadata for the current engine.
Returns:
Metadata for the current engine.
name
name: str
Name of the transfer engine / NIXL agent.
remote_agent_to_engine
Map of remote agent names to their engine names.
remote_connections
remote_connections: dict[str, KVTransferEngineMetadata]
Map of remote engine names to their metadata.
sync_and_release()
sync_and_release(transfer_req)
Wait for a transfer to complete, then release it.
Parameters:
transfer_req (TransferReqData) – The transfer request to wait on and release.
Return type:
None
tensor_agents
tensor_agents: list[TensorAgent]
List of TensorAgent objects containing all per-tensor data.
total_num_pages
total_num_pages: int
Total number of pages in each tensor.
KVTransferEngineMetadata
class max.kv_cache.paged_cache.transfer_engine.KVTransferEngineMetadata(*, name, total_num_pages, memory_type, agents_meta)
Metadata associated with a transfer engine.
This is safe to send between threads/processes.
Parameters:
- name (str)
- total_num_pages (int)
- memory_type (MemoryType)
- agents_meta (list[TensorAgentMetadata])
agents_meta
agents_meta: list[TensorAgentMetadata]
memory_type
memory_type: MemoryType
name
name: str
total_num_pages
total_num_pages: int
TensorAgent
class max.kv_cache.paged_cache.transfer_engine.TensorAgent(agent, agent_name, tensor, base_addr, ucx_backend, bytes_per_page, device_id, agent_metadata, reg_dlist)
Manages a single tensor and its associated NIXL agent for transfers.
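This class holds the runtime state (live objects) and can generate the serializable metadata used for communication between engines.
Parameters:
- agent
- agent_name (str)
- tensor
- base_addr (int)
- ucx_backend
- bytes_per_page (int)
- device_id (int)
- agent_metadata
- reg_dlist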
to_metadata()
to_metadata()
Convert to serializable metadata for communication.
Return type:
TensorAgentMetadata
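The split between a live agent and its metadata exists because runtime handles generally cannot cross a process boundary, while plain data can. The sketch below illustrates that pattern under stated assumptions: ToyTensorAgent and ToyAgentMetadata are hypothetical classes whose fields mirror the TensorAgentMetadata attributes listed in this reference, and a threading.Lock stands in for a live, non-picklable backend handle.

```python
import pickle
import threading
from dataclasses import dataclass


@dataclass(frozen=True)
class ToyAgentMetadata:
    """Plain-data view of an agent: only picklable fields."""
    agent_name: str
    base_addr: int
    bytes_per_page: int
    device_id: int
    metadata: bytes


class ToyTensorAgent:
    """Hypothetical runtime object holding a live, non-picklable handle."""

    def __init__(self, agent_name: str, base_addr: int, bytes_per_page: int,
                 device_id: int, metadata: bytes) -> None:
        self.agent_name = agent_name
        self.base_addr = base_addr
        self.bytes_per_page = bytes_per_page
        self.device_id = device_id
        self.metadata = metadata
        # Stand-in for a live backend handle; locks cannot be pickled.
        self.handle = threading.Lock()

    def to_metadata(self) -> ToyAgentMetadata:
        # Copy only the serializable fields; the live handle stays behind.
        return ToyAgentMetadata(
            agent_name=self.agent_name,
            base_addr=self.base_addr,
            bytes_per_page=self.bytes_per_page,
            device_id=self.device_id,
            metadata=self.metadata,
        )


agent = ToyTensorAgent("agent-0", 0x1000, 4096, 0, b"opaque-blob")
wire = pickle.dumps(agent.to_metadata())  # the metadata serializes cleanly
restored = pickle.loads(wire)
```

Pickling agent itself raises TypeError because of the lock, which is the same reason a receiving engine is handed metadata rather than the agent object.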
TensorAgentMetadata
class max.kv_cache.paged_cache.transfer_engine.TensorAgentMetadata(*, metadata, agent_name, bytes_per_page, base_addr, device_id)
Metadata for a single tensor/agent in the transfer engine.
This is used for serialization and communication between engines.
agent_name
agent_name: str
base_addr
base_addr: int
bytes_per_page
bytes_per_page: int
device_id
device_id: int
metadata
metadata: bytes
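initiate_send_transfer receives src_idxs and dst_idxs as page indices, while each agent advertises a base_addr and bytes_per_page. The sketch below shows one way those could combine into per-page copy descriptors. It rests on assumptions that this reference does not state: pages are laid out contiguously, so page i starts at base_addr + i * bytes_per_page; both sides use the same bytes_per_page; and src_idxs[i] pairs with dst_idxs[i]. page_descriptors and PageCopy are hypothetical names, and src_pages / dst_pages play the role of each engine's total_num_pages.

```python
from typing import NamedTuple


class PageCopy(NamedTuple):
    src_addr: int
    dst_addr: int
    nbytes: int


def page_descriptors(src_base: int, dst_base: int, bytes_per_page: int,
                     src_idxs: list[int], dst_idxs: list[int],
                     src_pages: int, dst_pages: int) -> list[PageCopy]:
    """Map paired page indices to (src_addr, dst_addr, nbytes) triples.

    Assumes contiguous pages of equal size on both sides.
    """
    if len(src_idxs) != len(dst_idxs):
        raise ValueError("src_idxs and dst_idxs must have the same length")
    copies = []
    for s, d in zip(src_idxs, dst_idxs):
        # Reject indices outside each engine's page range.
        if not 0 <= s < src_pages:
            raise IndexError(f"source page {s} out of range")
        if not 0 <= d < dst_pages:
            raise IndexError(f"destination page {d} out of range")
        copies.append(PageCopy(
            src_addr=src_base + s * bytes_per_page,
            dst_addr=dst_base + d * bytes_per_page,
            nbytes=bytes_per_page,
        ))
    return copies


# Copy local pages 0 and 2 into remote pages 5 and 1 (256-byte pages).
copies = page_descriptors(0x1000, 0x8000, 256, [0, 2], [5, 1], 4, 8)
# first copy: src 0x1000 -> dst 0x8500, 256 bytes
```

Validating both index lists up front means a bad request fails before any descriptor is built, rather than partway through a transfer.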
TransferReqData
class max.kv_cache.paged_cache.transfer_engine.TransferReqData(*, dst_name, src_name, transfer_name, transfer_ids, src_idxs, dst_idxs)
Metadata associated with a transfer request.
This is safe to send between threads/processes.
Parameters:
- dst_name (str)
- src_name (str)
- transfer_name (str)
- transfer_ids
- src_idxs
- dst_idxs
dst_idxs
dst_name
dst_name: str
src_idxs
src_name
src_name: str
transfer_ids
transfer_name
transfer_name: str
available_port()
max.kv_cache.paged_cache.transfer_engine.available_port(start_port=8000, end_port=9000, max_attempts=100)
Find an available TCP port in the given range.
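Parameters:
- start_port (int) – Lower bound of the port range to search. Defaults to 8000.
- end_port (int) – Upper bound of the port range to search. Defaults to 9000.
- max_attempts (int) – Maximum number of ports to try. Defaults to 100.
Returns:
An available port number.
Return type:
int
Raises:
RuntimeError – If no available port is found after max_attempts.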
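A common way to test whether a TCP port is free is to try binding a socket to it. The sketch below does that for up to max_attempts randomly chosen ports. It is not the library's implementation: the random search order, the inclusive end_port, and the loopback bind address are all assumptions, and find_available_port is a hypothetical name.

```python
import random
import socket


def find_available_port(start_port: int = 8000, end_port: int = 9000,
                        max_attempts: int = 100) -> int:
    """Return a TCP port in [start_port, end_port] that can currently be bound.

    Hypothetical sketch: probes random ports on the loopback interface.
    """
    for _ in range(max_attempts):
        port = random.randint(start_port, end_port)  # inclusive on both ends
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            try:
                # bind() fails with OSError if another socket holds the port.
                sock.bind(("127.0.0.1", port))
            except OSError:
                continue
        # The probe socket is closed here, so the caller can bind the port.
        return port
    raise RuntimeError(
        f"No available port in [{start_port}, {end_port}] "
        f"after {max_attempts} attempts"
    )
```

Any check-then-use approach like this is racy: another process can claim the port between the probe and the caller's own bind, so code that listens on the returned port should still handle a bind failure.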