Skip to content
This repository was archived by the owner on May 9, 2024. It is now read-only.

[L0] Asynchronous data fetching #711

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion omniscidb/BufferProvider/BufferProvider.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class BufferProvider {
const int8_t* host_ptr,
const size_t num_bytes,
const int device_id) const = 0;
virtual void synchronizeStream(const int device_id) const = 0;
virtual void synchronizeDeviceDataStream(const int device_id) const = 0;
virtual void copyFromDevice(int8_t* host_ptr,
const int8_t* device_ptr,
const size_t num_bytes,
Expand Down
13 changes: 12 additions & 1 deletion omniscidb/CudaMgr/CudaMgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,17 @@ void CudaMgr::copyHostToDevice(int8_t* device_ptr,
cuMemcpyHtoD(reinterpret_cast<CUdeviceptr>(device_ptr), host_ptr, num_bytes));
}

// Copies `num_bytes` from `host_ptr` to `device_ptr` on device `device_num`.
// The transfer routine is chosen at compile time from
// `async_data_load_available`: copyHostToDeviceAsync when the flag is true,
// copyHostToDevice otherwise.
void CudaMgr::copyHostToDeviceAsyncIfPossible(int8_t* device_ptr,
                                              const int8_t* host_ptr,
                                              const size_t num_bytes,
                                              const int device_num) {
  if constexpr (!async_data_load_available) {
    // Asynchronous loading is switched off for this manager.
    copyHostToDevice(device_ptr, host_ptr, num_bytes, device_num);
  } else {
    copyHostToDeviceAsync(device_ptr, host_ptr, num_bytes, device_num);
  }
}

void CudaMgr::copyHostToDeviceAsync(int8_t* device_ptr,
const int8_t* host_ptr,
const size_t num_bytes,
Expand All @@ -120,7 +131,7 @@ void CudaMgr::copyHostToDeviceAsync(int8_t* device_ptr,
checkError(cuMemcpyHtoDAsync(
reinterpret_cast<CUdeviceptr>(device_ptr), host_ptr, num_bytes, stream_));
}
void CudaMgr::synchronizeStream(const int device_num) {
// Calls setContext(device_num) and then cuStreamSynchronize on `stream_`,
// the same stream copyHostToDeviceAsync passes to cuMemcpyHtoDAsync.
// The driver result is passed to checkError.
void CudaMgr::synchronizeDeviceDataStream(const int device_num) {
setContext(device_num);
checkError(cuStreamSynchronize(stream_));
}
Expand Down
9 changes: 7 additions & 2 deletions omniscidb/CudaMgr/CudaMgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,17 @@ class CudaMgr : public GpuMgr {
const size_t num_bytes,
const int device_num) override;

void copyHostToDeviceAsyncIfPossible(int8_t* device_ptr,
const int8_t* host_ptr,
const size_t num_bytes,
const int device_num) override;

void copyHostToDeviceAsync(int8_t* device_ptr,
const int8_t* host_ptr,
const size_t num_bytes,
const int device_num) override;

void synchronizeStream(const int device_num) override;
void synchronizeDeviceDataStream(const int device_num) override;

void copyDeviceToHost(int8_t* host_ptr,
const int8_t* device_ptr,
Expand Down Expand Up @@ -289,7 +294,7 @@ class CudaMgr : public GpuMgr {
omnisci::DeviceGroup device_group_;
std::vector<CUcontext> device_contexts_;
mutable std::mutex device_cleanup_mutex_;
static constexpr bool async_data_load_available{true};
static constexpr bool async_data_load_available{false};
};

} // Namespace CudaMgr_Namespace
Expand Down
9 changes: 8 additions & 1 deletion omniscidb/CudaMgr/CudaMgrNoCuda.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,14 @@ void CudaMgr::copyHostToDeviceAsync(int8_t* device_ptr,
CHECK(false);
}

void CudaMgr::synchronizeStream(const int device_num) {
// Stub from CudaMgrNoCuda.cpp: performs no copy and unconditionally trips
// CHECK(false). All four parameters are unused.
void CudaMgr::copyHostToDeviceAsyncIfPossible(int8_t* device_ptr,
const int8_t* host_ptr,
const size_t num_bytes,
const int device_num) {
CHECK(false);
}

// Stub from CudaMgrNoCuda.cpp: there is no stream to wait on, so the call
// unconditionally trips CHECK(false). `device_num` is unused.
void CudaMgr::synchronizeDeviceDataStream(const int device_num) {
CHECK(false);
}

Expand Down
1 change: 1 addition & 0 deletions omniscidb/DataMgr/Allocators/DeviceAllocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,5 @@ class DeviceAllocator : public Allocator {
virtual void setDeviceMem(int8_t* device_ptr,
unsigned char uc,
const size_t num_bytes) const = 0;
// Synchronization hook that concrete allocators must implement.
// GpuAllocator implements it by calling
// BufferProvider::synchronizeDeviceDataStream for its device id.
// NOTE(review): presumably callers invoke this before relying on data copied
// through the asynchronous path - confirm against call sites.
virtual void sync() = 0;
};
4 changes: 4 additions & 0 deletions omniscidb/DataMgr/Allocators/GpuAllocator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,7 @@ void GpuAllocator::setDeviceMem(int8_t* device_ptr,
const size_t num_bytes) const {
buffer_provider_->setDeviceMem(device_ptr, uc, num_bytes, device_id_);
}

// Forwards to the buffer provider's synchronizeDeviceDataStream for the
// device this allocator was created for (`device_id_`).
void GpuAllocator::sync() {
buffer_provider_->synchronizeDeviceDataStream(device_id_);
}
1 change: 1 addition & 0 deletions omniscidb/DataMgr/Allocators/GpuAllocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ class GpuAllocator : public DeviceAllocator {
void setDeviceMem(int8_t* device_ptr,
unsigned char uc,
const size_t num_bytes) const override;
void sync() override;

private:
std::vector<Data_Namespace::AbstractBuffer*> owned_buffers_;
Expand Down
3 changes: 2 additions & 1 deletion omniscidb/DataMgr/BufferMgr/CpuBufferMgr/CpuBuffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ void CpuBuffer::readData(int8_t* const dst,
memcpy(dst, mem_ + offset, num_bytes);
} else if (dst_memory_level == GPU_LEVEL) {
CHECK_GE(dst_device_id, 0);
gpu_mgr_->copyHostToDevice(dst, mem_ + offset, num_bytes, dst_device_id);
gpu_mgr_->copyHostToDeviceAsyncIfPossible(
dst, mem_ + offset, num_bytes, dst_device_id);
} else {
LOG(FATAL) << "Unsupported buffer type";
}
Expand Down
10 changes: 3 additions & 7 deletions omniscidb/DataMgr/DataMgrBufferProvider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,18 +57,14 @@ void DataMgrBufferProvider::copyToDeviceAsyncIfPossible(int8_t* device_ptr,
CHECK(data_mgr_);
const auto gpu_mgr = data_mgr_->getGpuMgr();
CHECK(gpu_mgr);
if (gpu_mgr->canLoadAsync()) {
gpu_mgr->copyHostToDeviceAsync(device_ptr, host_ptr, num_bytes, device_id);
} else {
gpu_mgr->copyHostToDevice(device_ptr, host_ptr, num_bytes, device_id);
}
gpu_mgr->copyHostToDeviceAsyncIfPossible(device_ptr, host_ptr, num_bytes, device_id);
}

void DataMgrBufferProvider::synchronizeStream(const int device_num) const {
void DataMgrBufferProvider::synchronizeDeviceDataStream(const int device_num) const {
CHECK(data_mgr_);
const auto gpu_mgr = data_mgr_->getGpuMgr();
CHECK(gpu_mgr);
gpu_mgr->synchronizeStream(device_num);
gpu_mgr->synchronizeDeviceDataStream(device_num);
}

void DataMgrBufferProvider::copyFromDevice(int8_t* host_ptr,
Expand Down
2 changes: 1 addition & 1 deletion omniscidb/DataMgr/DataMgrBufferProvider.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class DataMgrBufferProvider : public BufferProvider {
const int8_t* host_ptr,
const size_t num_bytes,
const int device_id) const override;
void synchronizeStream(const int device_id) const override;
void synchronizeDeviceDataStream(const int device_id) const override;
void copyFromDevice(int8_t* host_ptr,
const int8_t* device_ptr,
const size_t num_bytes,
Expand Down
7 changes: 6 additions & 1 deletion omniscidb/DataMgr/GpuMgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,12 @@ struct GpuMgr {
const size_t num_bytes,
const int device_num) = 0;

virtual void synchronizeStream(const int device_num) = 0;
// Host-to-device copy of `num_bytes` from `host_ptr` to `device_ptr` on device
// `device_num`. The CudaMgr and L0Manager implementations choose between
// copyHostToDeviceAsync and copyHostToDevice at compile time, based on their
// own `async_data_load_available` flag.
// NOTE(review): when the asynchronous path is taken, the data is presumably
// only guaranteed to be on the device after synchronizeDeviceDataStream -
// confirm against callers.
virtual void copyHostToDeviceAsyncIfPossible(int8_t* device_ptr,
const int8_t* host_ptr,
const size_t num_bytes,
const int device_num) = 0;

// Synchronizes the data-transfer stream of device `device_num`.
// CudaMgr implements this with cuStreamSynchronize on its stream.
// L0Manager implements it with L0Device::syncDataTransfers.
virtual void synchronizeDeviceDataStream(const int device_num) = 0;

virtual void copyDeviceToHost(int8_t* host_ptr,
const int8_t* device_ptr,
Expand Down
133 changes: 131 additions & 2 deletions omniscidb/L0Mgr/L0Mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,96 @@ void* allocate_device_mem(const size_t num_bytes, L0Device& device) {
return mem;
}

// Builds the fetcher for `device`: creates the asynchronous command queue used
// for data transfers and the first command list that copies are appended to.
L0Device::L0DataFetcher::L0DataFetcher(L0Device& device) : my_device_(device) {
  // Positional aggregate initialisation; the field names in the trailing
  // comments follow the Level Zero 1.x ze_command_queue_desc_t layout.
  ze_command_queue_desc_t fetch_queue_desc = {
      ZE_STRUCTURE_TYPE_COMMAND_QUEUE_DESC,  // stype
      nullptr,                               // pNext
      0,                                     // ordinal
      0,                                     // index
      0,                                     // flags
      ZE_COMMAND_QUEUE_MODE_ASYNCHRONOUS,    // mode
      ZE_COMMAND_QUEUE_PRIORITY_NORMAL};     // priority

  const auto context = my_device_.driver_.ctx();
  const auto device_handle = my_device_.device_;

  L0_SAFE_CALL(
      zeCommandQueueCreate(context, device_handle, &fetch_queue_desc, &queue_handle_));

  // Start from an empty record (no handle, zero bytes) before the first
  // command list handle is filled in.
  cur_cl_bytes_ = {{}, 0};
  L0_SAFE_CALL(zeCommandListCreate(
      context, device_handle, &cl_desc_, &cur_cl_bytes_.cl_handle_));
}

// Releases every Level Zero object the fetcher holds: the transfer queue, the
// current command list, and all lists parked in `graveyard_` and `recycled_`.
// Return codes are deliberately not checked here.
L0Device::L0DataFetcher::~L0DataFetcher() {
  const auto destroy_all = [](const auto& handles) {
    for (const auto handle : handles) {
      zeCommandListDestroy(handle);
    }
  };

  zeCommandQueueDestroy(queue_handle_);
  zeCommandListDestroy(cur_cl_bytes_.cl_handle_);
  destroy_all(graveyard_);
  destroy_all(recycled_);
}

// Drains `graveyard_`. Lists are moved to `recycled_` (and reset) from the
// front until `recycled_` holds GRAVEYARD_LIMIT entries; whatever is left in
// the graveyard after that is destroyed.
void L0Device::L0DataFetcher::recycleGraveyard() {
  while (!graveyard_.empty() && recycled_.size() < GRAVEYARD_LIMIT) {
    // Register the handle in `recycled_` first, then reset it in place.
    recycled_.push_back(graveyard_.front());
    graveyard_.pop_front();
    L0_SAFE_CALL(zeCommandListReset(recycled_.back()));
  }

  // Anything that did not fit into the recycled pool is released for good.
  for (const auto leftover : graveyard_) {
    L0_SAFE_CALL(zeCommandListDestroy(leftover));
  }
  graveyard_.clear();
}

// Replaces the current command list record with an empty one (zero bytes).
// The handle comes from the front of `recycled_` when one is available;
// otherwise a new command list is created.
void L0Device::L0DataFetcher::setCLRecycledOrNew() {
  cur_cl_bytes_ = {{}, 0};

  if (recycled_.empty()) {
    // Nothing to reuse: ask the driver for a fresh list.
    L0_SAFE_CALL(zeCommandListCreate(my_device_.driver_.ctx(),
                                     my_device_.device_,
                                     &cl_desc_,
                                     &cur_cl_bytes_.cl_handle_));
    return;
  }

  cur_cl_bytes_.cl_handle_ = recycled_.front();
  recycled_.pop_front();
}

// Appends a copy of `num_bytes` from `src` to `dst` to the current command
// list while holding `cur_cl_lock_`. Once the bytes accumulated on that list
// reach CL_BYTES_LIMIT, the list is swapped for a recycled or new one, then
// closed and submitted on `queue_handle_`.
void L0Device::L0DataFetcher::appendCopyCommand(void* dst,
const void* src,
const size_t num_bytes) {
std::unique_lock<std::mutex> cl_lock(cur_cl_lock_);
L0_SAFE_CALL(zeCommandListAppendMemoryCopy(
cur_cl_bytes_.cl_handle_, dst, src, num_bytes, nullptr, 0, nullptr));
cur_cl_bytes_.bytes_ += num_bytes;
if (cur_cl_bytes_.bytes_ >= CL_BYTES_LIMIT) {
// Keep a private copy of the handle: setCLRecycledOrNew() overwrites
// cur_cl_bytes_ below.
ze_command_list_handle_t cl_h_copy = cur_cl_bytes_.cl_handle_;
// The full list is recorded in graveyard_ before it is submitted.
graveyard_.push_back(cur_cl_bytes_.cl_handle_);
setCLRecycledOrNew();
// The lock is released before close/execute, so other threads can append to
// the new current list while this one is being submitted.
// NOTE(review): between this unlock and the execute call the handle already
// sits in graveyard_; a recycleGraveyard() running in that window could
// reset or destroy it before submission - confirm callers rule this out.
cl_lock.unlock();
L0_SAFE_CALL(zeCommandListClose(cl_h_copy));
L0_SAFE_CALL(
zeCommandQueueExecuteCommandLists(queue_handle_, 1, &cl_h_copy, nullptr));
}
}

void L0Device::L0DataFetcher::sync() {
if (cur_cl_bytes_.bytes_) {
L0_SAFE_CALL(zeCommandListClose(cur_cl_bytes_.cl_handle_));
L0_SAFE_CALL(zeCommandQueueExecuteCommandLists(
queue_handle_, 1, &cur_cl_bytes_.cl_handle_, nullptr));
}
L0_SAFE_CALL(
zeCommandQueueSynchronize(queue_handle_, std::numeric_limits<uint32_t>::max()));
L0_SAFE_CALL(zeCommandListReset(cur_cl_bytes_.cl_handle_));
if (graveyard_.size() > GRAVEYARD_LIMIT) {
recycleGraveyard();
}
}

L0Device::L0Device(const L0Driver& driver, ze_device_handle_t device)
: device_(device), driver_(driver) {
: device_(device), driver_(driver), data_fetcher_(*this) {
ze_command_queue_handle_t queue_handle;
ze_command_queue_desc_t command_queue_desc = {ZE_STRUCTURE_TYPE_COMMAND_QUEUE_DESC,
nullptr,
Expand Down Expand Up @@ -192,6 +280,14 @@ unsigned L0Device::maxSharedLocalMemory() const {
return compute_props_.maxSharedLocalMemory;
}

// Queues a copy of `num_bytes` from `src` to `dst` by handing it to this
// device's data fetcher (L0DataFetcher::appendCopyCommand).
// NOTE(review): the fetcher only submits a command list once it reaches its
// byte limit or on sync, so completion presumably requires
// syncDataTransfers() - confirm against callers.
void L0Device::transferToDevice(void* dst, const void* src, const size_t num_bytes) {
data_fetcher_.appendCopyCommand(dst, src, num_bytes);
}

// Forwards to L0DataFetcher::sync(), which submits any pending copy commands
// and synchronizes the transfer command queue.
void L0Device::syncDataTransfers() {
data_fetcher_.sync();
}

L0CommandQueue::L0CommandQueue(ze_command_queue_handle_t handle) : handle_(handle) {}

ze_command_queue_handle_t L0CommandQueue::handle() const {
Expand Down Expand Up @@ -219,7 +315,6 @@ std::shared_ptr<L0Module> L0Device::create_module(uint8_t* code,
};
ze_module_handle_t handle;
ze_module_build_log_handle_t buildlog = nullptr;

auto status = zeModuleCreate(ctx(), device_, &desc, &handle, &buildlog);
if (log) {
size_t logSize = 0;
Expand Down Expand Up @@ -329,6 +424,40 @@ void L0Manager::copyHostToDevice(int8_t* device_ptr,
cl->submit(*queue);
}

// Queues a host-to-device copy of `num_bytes` on device `device_num` of the
// first driver through L0Device::transferToDevice. A zero-byte request is a
// no-op; otherwise both pointers must be non-null and `device_num` must index
// an existing device.
void L0Manager::copyHostToDeviceAsync(int8_t* device_ptr,
                                      const int8_t* host_ptr,
                                      const size_t num_bytes,
                                      const int device_num) {
  if (num_bytes == 0) {
    return;
  }

  // Argument validation.
  CHECK(host_ptr);
  CHECK(device_ptr);
  CHECK_GT(num_bytes, 0);
  CHECK_GE(device_num, 0);
  CHECK_LT(device_num, drivers_[0]->devices().size());

  drivers()[0]->devices()[device_num]->transferToDevice(
      device_ptr, host_ptr, num_bytes);
}

// Copies `num_bytes` from `host_ptr` to `device_ptr` on device `device_num`.
// The transfer routine is chosen at compile time from
// `async_data_load_available`: copyHostToDeviceAsync when the flag is true,
// copyHostToDevice otherwise.
void L0Manager::copyHostToDeviceAsyncIfPossible(int8_t* device_ptr,
                                                const int8_t* host_ptr,
                                                const size_t num_bytes,
                                                const int device_num) {
  if constexpr (!async_data_load_available) {
    // Asynchronous loading is switched off for this manager.
    copyHostToDevice(device_ptr, host_ptr, num_bytes, device_num);
  } else {
    copyHostToDeviceAsync(device_ptr, host_ptr, num_bytes, device_num);
  }
}

// Validates `device_num` against the first driver's device list, then calls
// syncDataTransfers() on that device.
void L0Manager::synchronizeDeviceDataStream(const int device_num) {
  // `device_num` must index an existing device of the first driver.
  CHECK_GE(device_num, 0);
  CHECK_LT(device_num, drivers_[0]->devices().size());

  drivers()[0]->devices()[device_num]->syncDataTransfers();
}

void L0Manager::copyDeviceToHost(int8_t* host_ptr,
const int8_t* device_ptr,
const size_t num_bytes,
Expand Down
Loading