let kv_cache_event = KvCacheEvent {
    event_id: 1,
    data: KvCacheEventData::Stored(KvCacheStoreData {
        parent_hash: None,
        blocks: vec![KvCacheStoredBlockData {
            block_hash: ExternalSequenceBlockHash(0),
            tokens_hash: LocalBlockHash(13226331709069118873),
        }],
    }),
};
7. The KvRouter subscribes to KV_EVENT_SUBJECT to receive cache events; each message is deserialized into a RouterEvent and forwarded to the KvIndexer, which updates the RadixTree (a sketch of that indexer side follows the snippet below).
    let event: RouterEvent = match serde_json::from_slice(&event.payload) {
        Ok(event) => {
            tracing::debug!("received kv event: {:?}", event);
            event
        }
        Err(e) => {
            tracing::warn!("Failed to deserialize RouterEvent: {:?}", e);
            // Choosing warn-and-continue so events from other workers keep flowing:
            // a bad event likely signals a problem with one worker, but the
            // others are potentially still healthy.
            continue;
        }
    };
    if let Err(e) = kv_events_tx.send(event).await {
        tracing::trace!("failed to send kv event to indexer; shutting down: {:?}", e);
    }
}
});
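To illustrate the indexer side referenced in step 7, here is a minimal sketch with simplified stand-in types (not the actual KvIndexer or RadixTree implementation): an indexer consumes events and tracks which workers currently hold which block hashes.

use std::collections::{HashMap, HashSet};

/// Simplified stand-in for the KvIndexer's index: block hash -> workers
/// currently holding that block.
#[derive(Default)]
struct BlockIndex {
    blocks: HashMap<u64, HashSet<u64>>,
}

/// Simplified stand-in for RouterEvent.
enum Event {
    Stored { worker_id: u64, block_hashes: Vec<u64> },
    Removed { worker_id: u64, block_hashes: Vec<u64> },
}

impl BlockIndex {
    /// Apply one event, mirroring how stored/removed cache events keep the
    /// router's view of worker caches up to date.
    fn apply(&mut self, event: Event) {
        match event {
            Event::Stored { worker_id, block_hashes } => {
                for h in block_hashes {
                    self.blocks.entry(h).or_default().insert(worker_id);
                }
            }
            Event::Removed { worker_id, block_hashes } => {
                for h in block_hashes {
                    // Drop the worker from the entry; remove the entry once empty.
                    let now_empty = match self.blocks.get_mut(&h) {
                        Some(workers) => {
                            workers.remove(&worker_id);
                            workers.is_empty()
                        }
                        None => false,
                    };
                    if now_empty {
                        self.blocks.remove(&h);
                    }
                }
            }
        }
    }
}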
8. Subsequent requests follow the same flow: compute the block hashes, look up the matching worker ID, and repeat the cycle above. [Overall architecture diagram]
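The lookup step 8 performs is not shown in this excerpt. As a hedged sketch of the idea (all names here are hypothetical, and the real KvRouter's scoring also weighs factors such as worker load), selection amounts to counting, per worker, how many of the request's block hashes are already cached:

use std::cmp::Reverse;
use std::collections::{HashMap, HashSet};

/// Hypothetical sketch: pick the worker holding the most of this request's
/// blocks, breaking ties toward the least-loaded worker.
fn select_worker(
    request_hashes: &[u64],
    cached: &HashMap<u64, HashSet<u64>>, // worker_id -> block hashes it holds
    load: &HashMap<u64, usize>,          // worker_id -> active requests
) -> Option<u64> {
    cached
        .iter()
        .map(|(worker, blocks)| {
            let overlap = request_hashes
                .iter()
                .filter(|&h| blocks.contains(h))
                .count();
            (*worker, overlap)
        })
        .max_by_key(|&(worker, overlap)| {
            (overlap, Reverse(load.get(&worker).copied().unwrap_or(0)))
        })
        .map(|(worker, _)| worker)
}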
1. Build the model and block descriptors, then allocate the KV block storage itself: one set of blocks in pinned host memory and one on the device.
let model_details = KvModelDetailsBuilder::default()
    .number_of_layers(number_of_layers)
    .number_of_heads(number_of_heads)
    .head_size(head_size)
    .dtype(DType::F32) // Use F32 for easier validation
    .build()?;
let block_details = KvBlockDetailsBuilder::default()
    .layout(layout.clone())
    .block_size(block_size)
    .tp_size(1)
    .tp_rank(0)
    .model_details(model_details)
    .build()?;

// Create the storage blocks
let h_blocks = KvBlockStorage::allocate(
    number_of_cpu_blocks,
    block_details.clone(),
    StorageType::Pinned,
)?;
let d_blocks = KvBlockStorage::allocate(
    number_of_gpu_blocks,
    block_details.clone(),
    StorageType::Device(device.clone()),
)?;
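As a rough worked example of the allocation size these descriptors imply, assuming the conventional layout of separate K and V tensors per layer (the actual layout here is governed by the `layout` value above):

/// Hypothetical sizing helper: bytes occupied by one KV block under a
/// conventional layout: layers x (K and V) x tokens-per-block x heads x head_size.
fn bytes_per_block(
    number_of_layers: usize,
    block_size: usize, // tokens per block
    number_of_heads: usize,
    head_size: usize,
    dtype_bytes: usize, // 4 for F32
) -> usize {
    number_of_layers * 2 * block_size * number_of_heads * head_size * dtype_bytes
}

// e.g. 32 layers, 16-token blocks, 32 heads, head_size 128, F32:
// 32 * 2 * 16 * 32 * 128 * 4 = 16 MiB per block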
2. This makes it possible to offload a chosen subset of KV cache blocks to the CPU, enabling flexible data-movement policies. copy_blocks_to is a synchronous, blocking operation suited to data movement in simple scenarios (a conceptual sketch follows the CopyStream snippet below), while CopyStream covers batched, scattered copies.
// Copy specific blocks from the GPU to the CPU
let gpu_layer = gpu_blocks.layer(0)?; // layer 0 of the GPU storage (read-only)
let mut cpu_layer = cpu_blocks.layer_mut(0)?; // layer 0 of the CPU storage (read/write)

let mut copy_stream = CopyStream::new(number_of_layers, number_of_gpu_blocks).unwrap();

// block list 0..64 as i32
let mut block_list: Vec<i32> = (0..number_of_gpu_blocks).map(|x| x as i32).collect();
block_list.shuffle(&mut rng);
let src_block_ids = block_list.clone();
block_list.shuffle(&mut rng);
let dst_block_ids = block_list.clone();

// Select the appropriate block map based on direction
if is_h2d {
    copy_stream.prepare_block_map(h2d_block_map).unwrap();
} else {
    copy_stream.prepare_block_map(d2h_block_map).unwrap();
}
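For contrast with the asynchronous CopyStream path above, the blocking copy_blocks_to mentioned in step 2 can be pictured as a per-block memcpy that returns only once every block has been copied. This is a simplified stand-in over plain byte buffers, not the library's actual signature:

/// Simplified stand-in: copy selected fixed-size blocks from `src` to `dst`.
/// Blocking: returns only after every listed block has been copied.
fn copy_blocks_to_sketch(
    src: &[u8],
    dst: &mut [u8],
    block_bytes: usize,
    src_block_ids: &[usize],
    dst_block_ids: &[usize],
) {
    assert_eq!(src_block_ids.len(), dst_block_ids.len());
    for (&s, &d) in src_block_ids.iter().zip(dst_block_ids) {
        let src_range = s * block_bytes..(s + 1) * block_bytes;
        let dst_range = d * block_bytes..(d + 1) * block_bytes;
        dst[dst_range].copy_from_slice(&src[src_range]);
    }
}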
1. Convert the request's tokens into fixed-size token blocks, then first try to match those blocks against blocks that are already inflight.
let seq = tokens.into_sequence(self.block_size);
let (blocks, tail_block) = seq.into_parts();
log::debug!(
    "request translates to {} blocks; remaining tokens: {}",
    blocks.len(),
    tail_block.tokens().len()
);

// first match blocks to inflight blocks
let mut inflight_blocks = self.inflight_blocks.match_token_blocks(&blocks)?;
log::debug!("matched {} inflight blocks", inflight_blocks.len());
2. For blocks that found no inflight match, try to match them against available_blocks; any hits must then be registered as inflight_blocks (a simplified sketch of this matching follows the snippet below).
let unmatched_blocks = &blocks[inflight_blocks.len()..];
let unmatched_hashes = unmatched_blocks
    .iter()
    .map(|b| b.sequence_hash())
    .collect::<Vec<_>>();

// match the remaining blocks to freed gpu blocks (available_blocks)
let unregistered_blocks = self.available_blocks.match_blocks(unmatched_hashes).await?;
log::debug!("matched {} freed blocks", unregistered_blocks.len());

// the blocks from the freed blocks pool must be registered as inflight blocks
// todo - we might have to register the list of unregistered blocks as a single transaction
for block in unregistered_blocks {
    inflight_blocks.push(self.inflight_blocks.register(block)?);
}
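As a simplified illustration of what a match_blocks-style lookup does (hypothetical pool type; the real available_blocks pool is asynchronous and also manages eviction ordering): look each sequence hash up in a map of freed blocks and pull out the hits in order, stopping at the first miss, since only a contiguous prefix of the sequence can be reused.

use std::collections::HashMap;

/// Hypothetical stand-in for a pool of freed-but-still-valid blocks,
/// keyed by the sequence hash of the tokens they hold.
struct AvailablePool<B> {
    by_hash: HashMap<u64, B>,
}

impl<B> AvailablePool<B> {
    /// Pull out blocks matching the given sequence hashes, in order,
    /// stopping at the first miss: only a contiguous prefix is reusable.
    fn match_blocks(&mut self, hashes: &[u64]) -> Vec<B> {
        let mut matched = Vec::new();
        for h in hashes {
            match self.by_hash.remove(h) {
                Some(block) => matched.push(block),
                None => break,
            }
        }
        matched
    }
}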
3. Blocks still unmatched after the steps above need fresh storage: take enough blocks from the available pool and bind the unmatched token blocks to them (one extra block is taken to serve as the tail block).
let mut blocks_to_reuse = self
    .available_blocks
    .take_blocks(remaining_blocks.len() as u32 + 1)
    .await?;

// update the blocks_to_reuse with the token block from remaining_blocks
let complete_prefill_blocks: Vec<UniqueBlock> = remaining_blocks
    .into_iter()
    .map(|b| {
        let mut block = blocks_to_reuse.pop().unwrap();
        block.update_token_block(b);
        block
    })
    .collect();

assert_eq!(blocks_to_reuse.len(), 1);
let tail_kv_block = blocks_to_reuse.pop().unwrap();

KV Storage
1. Initialize the CUDA context and stream, allocate a pinned host buffer for six f32 values (6 * 4 bytes), and create a 2x3 tensor view over it.
// Initialize CUDA
let context = CudaContext::new(0).unwrap();
let stream = context.default_stream();

// Create a host tensor with f32 elements (6 elements)
let pinned_storage = OwnedStorage::create_pinned_array(6 * 4).unwrap();

// Create a host tensor view
let shape = [2, 3];
let mut host_view = TensorView::<_, 2>::new(&pinned_storage, shape, 4).unwrap();
2. Define an array of six f32 values and write them into the tensor view in row-major order. Then create a device buffer and a matching tensor view on the GPU, and transfer the data from CPU to GPU (a note on the row-major indexing follows the snippet below).
// Set some values
let values = [1.0f32, 2.0, 3.0, 4.0, 5.0, 6.0];
for i in 0..2 {
    for j in 0..3 {
        host_view
            .set_element::<f32>(&[i, j], values[i * 3 + j])
            .unwrap();
    }
}

// Create a device tensor
let device_storage = OwnedStorage::create_device_array(6 * 4, context.clone()).unwrap();
let mut device_view = TensorView::<_, 2>::new(&device_storage, shape, 4).unwrap();

// Copy from host to device using h2d method
host_view.h2d(&mut device_view, &stream).unwrap();
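The `values[i * 3 + j]` indexing above is just the row-major offset rule; assuming the `4` passed to TensorView::new is the element size in bytes, the byte offset of an element works out as follows:

/// Row-major flattening for a 2-D shape [rows, cols]:
/// element [i, j] lives at linear index i * cols + j,
/// i.e. byte offset (i * cols + j) * elem_bytes.
fn byte_offset(i: usize, j: usize, cols: usize, elem_bytes: usize) -> usize {
    (i * cols + j) * elem_bytes
}

// For shape [2, 3] and f32 (4 bytes): element [1, 2] sits at (1 * 3 + 2) * 4 = 20.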
3. Create another pinned buffer and tensor view to receive the data coming back from the GPU. Keeping the input and output buffers separate avoids contention between the two transfers.
// Create another host tensor for receiving data back
let pinned_storage2 = OwnedStorage::create_pinned_array(6 * 4).unwrap();
let mut host_view2 = TensorView::<_, 2>::new(&pinned_storage2, shape, 4).unwrap();

// Copy from device to host using d2h method
device_view.d2h(&mut host_view2, &stream).unwrap();
stream.synchronize().unwrap();
4. Verify that data integrity was preserved across the entire round trip.
// Verify the data was correctly transferred
for i in 0..2 {
    for j in 0..3 {
        assert_eq!(
            host_view2.get_element::<f32>(&[i, j]).unwrap(),
            values[i * 3 + j]
        );
    }
}
Original article: https://zhuanlan.zhihu.com/p/1911201145034110047