# vllm/vllm/distributed/kv_transfer/kv_transfer_agent.py
self.config = config

if config.kv_transfer_config is None:
    raise ValueError("KVTransferConfig is not set in the VllmConfig,"
                     " cannot initialize KVConnector.")

assert self.config.kv_transfer_config.is_kv_transfer_instance, "KV"\
    "TransferAgent should only be used when kv_connector is set."
# The factory class instantiates the configured connector implementation.
self.connector = KVConnectorFactory.create_connector_v0(
    rank, local_rank, config)

For the connector base-class definition, see vllm/vllm/distributed/kv_transfer/kv_connector/base.py. The base class centers on two functions: a send function (called by the prefill side, P) and a receive function (called by the decode side, D). Their buffer semantics are illustrated by the sketch and worked trace below.
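For intuition, here is a toy stand-in for the pop-with-ROI semantics (ToyKVBuffer is a hypothetical name, not the actual vLLM buffer class; the real buffer matches on the tokens themselves, while this toy keys purely by position):

import torch
from typing import List

class ToyKVBuffer:
    """Toy lookup buffer: one stored value per token position.
    pop(input_tokens, roi) removes and returns the entries at the
    positions where roi is True."""

    def __init__(self, values: List[int]):
        self.values = list(values)

    def pop(self, input_tokens: torch.Tensor,
            roi: torch.Tensor) -> List[int]:
        mask = roi.tolist()
        popped = [v for v, m in zip(self.values, mask) if m]
        self.values = [v for v, m in zip(self.values, mask) if not m]
        return popped

buffer = ToyKVBuffer([10, 20, 30, 40, 50])
popped = buffer.pop(torch.tensor([1, 2, 3, 4, 5]),
                    torch.tensor([False, True, False, True, False]))
# popped == [20, 40]; buffer.values == [10, 30, 50]

The trace below shows the same operation on concrete values.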
A pop operation:
original buffer: [10, 20, 30, 40, 50]
input_tokens: tensor([1, 2, 3, 4, 5])
roi: tensor([False, True, False, True, False])
buffer.pop(input_tokens, roi)
buffer afterwards: [10, 30, 50]

Pipe module
The pipe performs the actual data movement. Its external interface:
# vllm/vllm/distributed/kv_transfer/kv_pipe/base.py
class KVPipeBase(ABC):
    """
    This class provides an interface for sending and receiving tensors, or
    None, by distributed communications.
    """

    @abstractmethod
    def send_tensor(self, tensor: Optional[torch.Tensor]) -> None:
        """Send a tensor, or None, via the pipe."""
        raise NotImplementedError

    @abstractmethod
    def recv_tensor(self) -> Optional[torch.Tensor]:
        """Receive a tensor (can be None) from the pipeline."""
        raise NotImplementedError

    @abstractmethod
    def close(self) -> None:
        """Close the pipeline and release resources."""
        raise NotImplementedError

Take PyNcclPipe as an example: it implements sending and receiving tensors and their metadata in a distributed environment. Its key steps follow the pattern sketched below.
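A minimal sketch of that pattern (not the actual PyNcclPipe source, which uses a PyNcclCommunicator; send_obj/recv_obj are hypothetical transport callables): a small header travels first (a None marker, or dtype and shape), then the tensor payload, and a background thread drains the send queue so send_tensor() does not block the caller.

import queue
import threading
from typing import Any, Callable, Optional

import torch

class ToyPipe:
    """Metadata-then-payload pipe: a header (None marker, or dtype +
    shape) is sent first, then the raw tensor. A daemon thread drains
    the send queue so send_tensor() returns immediately."""

    def __init__(self, send_obj: Callable[[Any], None],
                 recv_obj: Callable[[], Any]):
        self._send_obj = send_obj    # transports one picklable object
        self._recv_obj = recv_obj
        self._queue: "queue.Queue[Optional[torch.Tensor]]" = queue.Queue()
        threading.Thread(target=self._drain, daemon=True).start()

    def send_tensor(self, tensor: Optional[torch.Tensor]) -> None:
        self._queue.put(tensor)      # non-blocking for the caller

    def _drain(self) -> None:
        while True:
            t = self._queue.get()
            if t is None:
                self._send_obj({"is_none": True})
            else:
                self._send_obj({"is_none": False, "dtype": t.dtype,
                                "shape": tuple(t.shape)})
                self._send_obj(t.contiguous())   # payload after header

    def recv_tensor(self) -> Optional[torch.Tensor]:
        header = self._recv_obj()
        if header["is_none"]:
            return None
        return self._recv_obj()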
Scheduler connector

These hooks are called from the scheduler's schedule() pass: they change how num_computed_tokens is computed and build the metadata that the workers consume. The base-class implementation does not modify scheduler_output and does not reorder the execution queues (subclasses may change the queue execution logic).
# vllm/vllm/v1/core/sched/scheduler.py
# Call sites inside the scheduler:

# KVConnector: update internal state after allocation.
# This information is used to determine if a load is
# needed for this request.
if self.connector is not None:
    self.connector.update_state_after_alloc(
        request,
        new_computed_blocks + new_blocks,
        num_external_computed_tokens,
    )
# ...
# Count how many KV tokens already exist on the remote side.
# Get externally-cached tokens if using a KVConnector.
num_external_computed_tokens, load_kv_async = (
    (0, False) if self.connector is None else
    self.connector.get_num_new_matched_tokens(
        request, num_native_computed_tokens))

# Total computed tokens (local + external).
num_computed_tokens = (num_native_computed_tokens +
                       num_external_computed_tokens +
                       num_prealloc_computed_tokens)
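# Worked example (hypothetical numbers): 100 locally cached tokens
# + 400 tokens matched on the remote prefill instance + 0 preallocated
# -> num_computed_tokens = 500, so only the remaining prompt tokens
# are recomputed on this instance.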
# ...
# NOTE(Kuntai): this function is designed for multiple purposes:
# 1. Plan the KV cache store
# 2. Wrap up all the KV cache load / save ops into an opaque object
# 3. Clear the internal states of the connector
if self.connector is not None:
    meta = self.connector.build_connector_meta(scheduler_output)
    scheduler_output.kv_connector_metadata = meta

Worker connector
The worker connector's main interfaces are a KV-store interface called on the producer side and a KV-load interface called on the consumer side. Each role also has a matching asynchronous wait operation; transfers happen layer by layer and can run asynchronously. The five hooks (a minimal skeleton follows the list):
bind_connector_metadata: bind the metadata for the current step;
start_load_kv: start loading KV; called on the consumer side;
wait_for_layer_load: block until the given layer has finished loading; called on the consumer side;
save_kv_layer: start saving a layer's KV; called on the producer side;
wait_for_save: block until all saves have completed; called on the producer side.
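A minimal no-op skeleton of these five hooks (illustrative only; the real v1 connector base class takes richer arguments such as the forward context and per-layer attention metadata):

import torch

class NoOpWorkerConnector:
    """Skeleton of the worker-side hooks (signatures simplified)."""

    def bind_connector_metadata(self, metadata) -> None:
        # Make the scheduler-built metadata visible for this step.
        self._metadata = metadata

    def start_load_kv(self) -> None:
        # Consumer (D): kick off asynchronous loads for all layers.
        pass

    def wait_for_layer_load(self, layer_name: str) -> None:
        # Consumer (D): block until this layer's KV has arrived.
        pass

    def save_kv_layer(self, layer_name: str,
                      kv_layer: torch.Tensor, attn_metadata) -> None:
        # Producer (P): kick off an asynchronous save of this layer.
        pass

    def wait_for_save(self) -> None:
        # Producer (P): block until every pending save has completed.
        pass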
# vllm/vllm/attention/layer.py
# Where the worker connector is used (attention path):
# ...
# Load path:
def wait_for_kv_layer_from_connector(layer_name: str):
    if not has_kv_transfer_group() or not is_v1_kv_transfer_group():
        return

    connector = get_kv_transfer_group()

    forward_context: ForwardContext = get_forward_context()
    attn_metadata = forward_context.attn_metadata
    if attn_metadata is None:
        return
    assert isinstance(attn_metadata, dict)
    connector.wait_for_layer_load(layer_name)
# ...
# Save path:
def maybe_save_kv_layer_to_connector(
    layer_name: str,
    kv_cache_layer: List[torch.Tensor],
):
    if not has_kv_transfer_group() or not is_v1_kv_transfer_group():
        return

    connector = get_kv_transfer_group()

    forward_context: ForwardContext = get_forward_context()
    attn_metadata = forward_context.attn_metadata
    if attn_metadata is None:
        return
    assert isinstance(attn_metadata, dict)
    connector.save_kv_layer(layer_name, kv_cache_layer,
                            attn_metadata[layer_name])
The worker connector's start/wait interfaces are driven by the runner (the ModelRunner module), while the attention layers call the per-layer KV-cache save/load interfaces. Related PR: KV Connector API V1. The rough sequence is sketched below, followed by the scheduler-side hooks that handle request completion and the arrival of remote KV:
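A pseudocode sketch of that ordering (illustrative only, with simplified signatures; the real ModelRunner passes forward contexts and per-layer attention metadata, and the per-layer calls happen inside the attention hooks shown above):

def execute_model_step(connector, kv_connector_metadata, forward):
    # Runner side: bracket the forward pass with the connector hooks.
    connector.bind_connector_metadata(kv_connector_metadata)
    connector.start_load_kv()            # D: kick off async loads
    # Inside forward(), each attention layer runs:
    #   connector.wait_for_layer_load(layer_name)    # D: block per layer
    #   ... attention over the now-present KV ...
    #   connector.save_kv_layer(layer_name, kv, am)  # P: async save
    output = forward()
    connector.wait_for_save()            # P: wait for all saves
    return output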
def _connector_finished(
        self, request: Request) -> tuple[bool, Optional[KVTransferParams]]:
    """Invoke the KV connector request_finished() method if applicable."""
    if self.connector is None:
        return False, None
    block_ids = self.kv_cache_manager.get_block_ids(request.request_id)
    return self.connector.request_finished(request, block_ids)
def _update_waiting_for_remote_kv(self, request: Request) -> bool:
    """
    P/D: check if the request_id is finished_recving.

    The finished_recving_kv_req_ids list is populated
    on the previous steps()'s update_from_output based
    on the worker side connector.

    When the kv transfer is ready, we cache the blocks
    and the request state will be moved back to WAITING from
    WAITING_FOR_REMOTE_KV.
    """
    if request.request_id not in self.finished_recving_kv_req_ids:
        return False

    # Now that the blocks are ready, actually cache them.
    block_ids = self.kv_cache_manager.get_block_ids(request.request_id)
    num_computed_tokens = len(block_ids) * self.block_size
    if num_computed_tokens == request.num_tokens:
        num_computed_tokens -= 1
    self.kv_cache_manager.single_type_manager.cache_blocks(
        request,
        self.kv_cache_manager.req_to_block_hashes[request.request_id],
        num_computed_tokens,
    )

    # Update the request state for scheduling.
    request.num_computed_tokens = num_computed_tokens

    # Return that we are ready.
    self.finished_recving_kv_req_ids.remove(request.request_id)
    return True
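    # Worked example for the arithmetic above (assuming block_size = 16):
    #   3 received blocks -> num_computed_tokens = 3 * 16 = 48.
    #   If the request is exactly 48 tokens long, it is trimmed to 47 so
    #   the scheduler still runs one forward step locally, which produces
    #   the logits needed to sample the first output token.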
def _update_from_kv_xfer_finished(self,
                                  model_runner_output: ModelRunnerOutput):
    """
    P/D: update the scheduler state based on the output.

    The Worker side connectors add finished_recving and
    finished_sending reqs to the output.
    * if finished_sending: free the blocks
    * if finished_recving: add to state so we can
      schedule the request during the next step.
    """
    # P/D: update recv and send status from last step.
    for req_id in (model_runner_output.finished_recving or ()):
        logger.debug("Finished recving KV transfer for request %s", req_id)
        self.finished_recving_kv_req_ids.add(req_id)
    for req_id in (model_runner_output.finished_sending or ()):
        logger.debug("Finished sending KV transfer for request %s", req_id)
        self._free_blocks(self.requests[req_id])

Could the connector on the scheduler side also be made lighter, with more of the work delegated to the worker? That is worth watching.

3.3 Overall reliability design