Skip to content
This repository was archived by the owner on May 9, 2024. It is now read-only.

Commit

Permalink
MT GPU fetch, l0 async
Browse files Browse the repository at this point in the history
  • Loading branch information
akroviakov committed Nov 13, 2023
1 parent 7a25adf commit 3bc2d3e
Show file tree
Hide file tree
Showing 22 changed files with 385 additions and 126 deletions.
2 changes: 1 addition & 1 deletion omniscidb/BufferProvider/BufferProvider.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class BufferProvider {
const int8_t* host_ptr,
const size_t num_bytes,
const int device_id) const = 0;
virtual void synchronizeStream(const int device_id) const = 0;
virtual void synchronizeDeviceDataStream(const int device_id) const = 0;
virtual void copyFromDevice(int8_t* host_ptr,
const int8_t* device_ptr,
const size_t num_bytes,
Expand Down
13 changes: 12 additions & 1 deletion omniscidb/CudaMgr/CudaMgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,17 @@ void CudaMgr::copyHostToDevice(int8_t* device_ptr,
cuMemcpyHtoD(reinterpret_cast<CUdeviceptr>(device_ptr), host_ptr, num_bytes));
}

// Copies num_bytes from host_ptr to device_ptr on device device_num, picking
// the asynchronous path only when the compile-time flag
// async_data_load_available is set; otherwise the blocking copy is used.
void CudaMgr::copyHostToDeviceAsyncIfPossible(int8_t* device_ptr,
                                              const int8_t* host_ptr,
                                              const size_t num_bytes,
                                              const int device_num) {
  if constexpr (!async_data_load_available) {
    // Async loading is compiled out: fall back to the blocking copy.
    copyHostToDevice(device_ptr, host_ptr, num_bytes, device_num);
  } else {
    copyHostToDeviceAsync(device_ptr, host_ptr, num_bytes, device_num);
  }
}

void CudaMgr::copyHostToDeviceAsync(int8_t* device_ptr,
const int8_t* host_ptr,
const size_t num_bytes,
Expand All @@ -120,7 +131,7 @@ void CudaMgr::copyHostToDeviceAsync(int8_t* device_ptr,
checkError(cuMemcpyHtoDAsync(
reinterpret_cast<CUdeviceptr>(device_ptr), host_ptr, num_bytes, stream_));
}
void CudaMgr::synchronizeStream(const int device_num) {
// Blocks until the work queued on stream_ has completed, after making
// device_num's context current. Any driver error is routed through
// checkError().
void CudaMgr::synchronizeDeviceDataStream(const int device_num) {
  setContext(device_num);
  const auto sync_status = cuStreamSynchronize(stream_);
  checkError(sync_status);
}
Expand Down
9 changes: 7 additions & 2 deletions omniscidb/CudaMgr/CudaMgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,17 @@ class CudaMgr : public GpuMgr {
const size_t num_bytes,
const int device_num) override;

void copyHostToDeviceAsyncIfPossible(int8_t* device_ptr,
const int8_t* host_ptr,
const size_t num_bytes,
const int device_num) override;

void copyHostToDeviceAsync(int8_t* device_ptr,
const int8_t* host_ptr,
const size_t num_bytes,
const int device_num) override;

void synchronizeStream(const int device_num) override;
void synchronizeDeviceDataStream(const int device_num) override;

void copyDeviceToHost(int8_t* host_ptr,
const int8_t* device_ptr,
Expand Down Expand Up @@ -289,7 +294,7 @@ class CudaMgr : public GpuMgr {
omnisci::DeviceGroup device_group_;
std::vector<CUcontext> device_contexts_;
mutable std::mutex device_cleanup_mutex_;
static constexpr bool async_data_load_available{true};
static constexpr bool async_data_load_available{false};
};

} // Namespace CudaMgr_Namespace
Expand Down
9 changes: 8 additions & 1 deletion omniscidb/CudaMgr/CudaMgrNoCuda.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,14 @@ void CudaMgr::copyHostToDeviceAsync(int8_t* device_ptr,
CHECK(false);
}

void CudaMgr::synchronizeStream(const int device_num) {
// Stub used by this translation unit: the body is an unconditional
// CHECK(false), so calling it always trips the check.
void CudaMgr::copyHostToDeviceAsyncIfPossible(int8_t* device_ptr,
                                              const int8_t* host_ptr,
                                              const size_t num_bytes,
                                              const int device_num) {
  CHECK(false);
}

// Stub used by this translation unit: the body is an unconditional
// CHECK(false), so calling it always trips the check.
void CudaMgr::synchronizeDeviceDataStream(const int device_num) {
  CHECK(false);
}

Expand Down
1 change: 1 addition & 0 deletions omniscidb/DataMgr/Allocators/DeviceAllocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,5 @@ class DeviceAllocator : public Allocator {
virtual void setDeviceMem(int8_t* device_ptr,
unsigned char uc,
const size_t num_bytes) const = 0;
virtual void sync() = 0;
};
4 changes: 4 additions & 0 deletions omniscidb/DataMgr/Allocators/GpuAllocator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,7 @@ void GpuAllocator::setDeviceMem(int8_t* device_ptr,
const size_t num_bytes) const {
buffer_provider_->setDeviceMem(device_ptr, uc, num_bytes, device_id_);
}

// Forwards to the buffer provider to synchronize the data stream of the
// device this allocator is bound to (device_id_).
void GpuAllocator::sync() {
  buffer_provider_->synchronizeDeviceDataStream(device_id_);
}
1 change: 1 addition & 0 deletions omniscidb/DataMgr/Allocators/GpuAllocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ class GpuAllocator : public DeviceAllocator {
void setDeviceMem(int8_t* device_ptr,
unsigned char uc,
const size_t num_bytes) const override;
void sync() override;

private:
std::vector<Data_Namespace::AbstractBuffer*> owned_buffers_;
Expand Down
3 changes: 2 additions & 1 deletion omniscidb/DataMgr/BufferMgr/CpuBufferMgr/CpuBuffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ void CpuBuffer::readData(int8_t* const dst,
memcpy(dst, mem_ + offset, num_bytes);
} else if (dst_memory_level == GPU_LEVEL) {
CHECK_GE(dst_device_id, 0);
gpu_mgr_->copyHostToDevice(dst, mem_ + offset, num_bytes, dst_device_id);
gpu_mgr_->copyHostToDeviceAsyncIfPossible(
dst, mem_ + offset, num_bytes, dst_device_id);
} else {
LOG(FATAL) << "Unsupported buffer type";
}
Expand Down
10 changes: 3 additions & 7 deletions omniscidb/DataMgr/DataMgrBufferProvider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,18 +57,14 @@ void DataMgrBufferProvider::copyToDeviceAsyncIfPossible(int8_t* device_ptr,
CHECK(data_mgr_);
const auto gpu_mgr = data_mgr_->getGpuMgr();
CHECK(gpu_mgr);
if (gpu_mgr->canLoadAsync()) {
gpu_mgr->copyHostToDeviceAsync(device_ptr, host_ptr, num_bytes, device_id);
} else {
gpu_mgr->copyHostToDevice(device_ptr, host_ptr, num_bytes, device_id);
}
gpu_mgr->copyHostToDeviceAsyncIfPossible(device_ptr, host_ptr, num_bytes, device_id);
}

void DataMgrBufferProvider::synchronizeStream(const int device_num) const {
void DataMgrBufferProvider::synchronizeDeviceDataStream(const int device_num) const {
CHECK(data_mgr_);
const auto gpu_mgr = data_mgr_->getGpuMgr();
CHECK(gpu_mgr);
gpu_mgr->synchronizeStream(device_num);
gpu_mgr->synchronizeDeviceDataStream(device_num);
}

void DataMgrBufferProvider::copyFromDevice(int8_t* host_ptr,
Expand Down
2 changes: 1 addition & 1 deletion omniscidb/DataMgr/DataMgrBufferProvider.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class DataMgrBufferProvider : public BufferProvider {
const int8_t* host_ptr,
const size_t num_bytes,
const int device_id) const override;
void synchronizeStream(const int device_id) const override;
void synchronizeDeviceDataStream(const int device_id) const override;
void copyFromDevice(int8_t* host_ptr,
const int8_t* device_ptr,
const size_t num_bytes,
Expand Down
7 changes: 6 additions & 1 deletion omniscidb/DataMgr/GpuMgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,12 @@ struct GpuMgr {
const size_t num_bytes,
const int device_num) = 0;

virtual void synchronizeStream(const int device_num) = 0;
virtual void copyHostToDeviceAsyncIfPossible(int8_t* device_ptr,
const int8_t* host_ptr,
const size_t num_bytes,
const int device_num) = 0;

virtual void synchronizeDeviceDataStream(const int device_num) = 0;

virtual void copyDeviceToHost(int8_t* host_ptr,
const int8_t* device_ptr,
Expand Down
116 changes: 114 additions & 2 deletions omniscidb/L0Mgr/L0Mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,87 @@ void* allocate_device_mem(const size_t num_bytes, L0Device& device) {
return mem;
}

L0Device::L0Device(const L0Driver& driver, ze_device_handle_t device)
// Creates the asynchronous command queue used for data fetches on `device`
// and opens the first command list that copy commands will be appended to.
L0DataFetcher::L0DataFetcher(const L0Driver& driver, ze_device_handle_t device)
    : device_(device), driver_(driver) {
  ze_command_queue_desc_t fetch_queue_desc = {ZE_STRUCTURE_TYPE_COMMAND_QUEUE_DESC,
                                              nullptr,
                                              0,
                                              0,
                                              0,
                                              ZE_COMMAND_QUEUE_MODE_ASYNCHRONOUS,
                                              ZE_COMMAND_QUEUE_PRIORITY_NORMAL};
  L0_SAFE_CALL(
      zeCommandQueueCreate(driver.ctx(), device_, &fetch_queue_desc, &queue_handle_));

  // Start with an empty handle and a zero byte counter, then let the runtime
  // fill in the handle of the freshly created command list.
  current_cl_bytes.first = {};
  current_cl_bytes.second = 0;
  L0_SAFE_CALL(
      zeCommandListCreate(driver.ctx(), device_, &cl_desc, &current_cl_bytes.first));
}

// Releases the fetch queue and every command list this fetcher still holds:
// the currently open one, the graveyard entries and the recycled entries.
// Return codes are deliberately not checked here, as in the original.
L0DataFetcher::~L0DataFetcher() {
  zeCommandQueueDestroy(queue_handle_);
  zeCommandListDestroy(current_cl_bytes.first);
  for (const auto retired_list : graveyard) {
    zeCommandListDestroy(retired_list);
  }
  for (const auto spare_list : recycled) {
    zeCommandListDestroy(spare_list);
  }
}

// Moves command lists from the graveyard into the recycled pool (resetting
// each one) until the pool holds GRAVEYARD_LIMIT entries, then destroys any
// graveyard entries that did not fit and empties the graveyard.
//
// Fix: the overflow loop used to destroy `recycled.back()` on every
// iteration, which repeatedly destroyed one live recycled handle and leaked
// the graveyard handles it was iterating over. It now destroys the graveyard
// handle itself.
void L0DataFetcher::recycleGraveyard() {
  while (recycled.size() < GRAVEYARD_LIMIT && !graveyard.empty()) {
    // splice relinks the list node, so the handle is never outside a
    // container even if the reset below fails.
    recycled.splice(recycled.end(), graveyard, graveyard.begin());
    L0_SAFE_CALL(zeCommandListReset(recycled.back()));
  }
  // Whatever is left exceeds the pool capacity: release it.
  for (const auto dead_handle : graveyard) {
    L0_SAFE_CALL(zeCommandListDestroy(dead_handle));
  }
  graveyard.clear();
}

// Appends a memory copy of num_bytes from src to dst to the currently open
// command list. Once the bytes accumulated in that list reach 128 MiB, the
// list is retired to the graveyard, replaced by a recycled or newly created
// list, and then closed and submitted to the fetch queue.
void L0DataFetcher::appendCopyCommand(void* dst,
const void* src,
const size_t num_bytes) {
// Guards current_cl_bytes, graveyard and recycled while the list is modified.
std::unique_lock<std::mutex> cl_lock(current_cl_lock);
L0_SAFE_CALL(zeCommandListAppendMemoryCopy(
current_cl_bytes.first, dst, src, num_bytes, nullptr, 0, nullptr));
current_cl_bytes.second += num_bytes;
// Flush threshold: 128 MiB of queued copies per command list.
if (current_cl_bytes.second >= 128 * 1024 * 1024) {
// Keep a private copy of the full list's handle so it can be submitted
// after the lock is dropped.
ze_command_list_handle_t cl_h_copy = current_cl_bytes.first;
graveyard.push_back(current_cl_bytes.first);
current_cl_bytes = {{}, 0};
// Prefer reusing a previously reset list over creating a new one.
if (recycled.size()) {
current_cl_bytes.first = recycled.front();
recycled.pop_front();
} else {
L0_SAFE_CALL(
zeCommandListCreate(driver_.ctx(), device_, &cl_desc, &current_cl_bytes.first));
}
// The replacement list is installed; close and submit the full list
// without holding the lock.
// NOTE(review): the full list is already in the graveyard at this point and
// sync() does not take current_cl_lock, so presumably callers must not run
// sync() concurrently with this method — confirm.
cl_lock.unlock();
L0_SAFE_CALL(zeCommandListClose(cl_h_copy));
L0_SAFE_CALL(
zeCommandQueueExecuteCommandLists(queue_handle_, 1, &cl_h_copy, nullptr));
}
}

// Submits any copies still pending in the open command list, waits for the
// fetch queue to drain, and resets the open list so it can be reused.
//
// Fixes relative to the previous version:
//  * The pending-byte counter is zeroed after the list is reset. Previously
//    it kept its old value, so a later sync() re-submitted an empty list and
//    the flush threshold in appendCopyCommand() was reached too early.
//  * The wait uses the uint64_t maximum. The timeout is a 64-bit nanosecond
//    count, so the uint32_t maximum expired after roughly 4.3 seconds
//    instead of waiting for completion.
//
// NOTE(review): this method does not take current_cl_lock; presumably it is
// only called once all appendCopyCommand() callers have finished — confirm.
void L0DataFetcher::sync() {
  if (current_cl_bytes.second) {
    L0_SAFE_CALL(zeCommandListClose(current_cl_bytes.first));
    L0_SAFE_CALL(zeCommandQueueExecuteCommandLists(
        queue_handle_, 1, &current_cl_bytes.first, nullptr));
  }
  L0_SAFE_CALL(
      zeCommandQueueSynchronize(queue_handle_, std::numeric_limits<uint64_t>::max()));
  L0_SAFE_CALL(zeCommandListReset(current_cl_bytes.first));
  // The list is empty again, so nothing is pending.
  current_cl_bytes.second = 0;
  if (graveyard.size() > GRAVEYARD_LIMIT) {
    recycleGraveyard();
  }
}

L0Device::L0Device(const L0Driver& driver, ze_device_handle_t device)
: device_(device), driver_(driver), data_fetcher(driver, device) {
ze_command_queue_handle_t queue_handle;
ze_command_queue_desc_t command_queue_desc = {ZE_STRUCTURE_TYPE_COMMAND_QUEUE_DESC,
nullptr,
Expand Down Expand Up @@ -219,7 +298,6 @@ std::shared_ptr<L0Module> L0Device::create_module(uint8_t* code,
};
ze_module_handle_t handle;
ze_module_build_log_handle_t buildlog = nullptr;

auto status = zeModuleCreate(ctx(), device_, &desc, &handle, &buildlog);
if (log) {
size_t logSize = 0;
Expand Down Expand Up @@ -329,6 +407,40 @@ void L0Manager::copyHostToDevice(int8_t* device_ptr,
cl->submit(*queue);
}

// Queues an asynchronous host-to-device copy on the data fetcher of device
// device_num (first driver). The copy is only appended here; completion is
// observed through synchronizeDeviceDataStream().
void L0Manager::copyHostToDeviceAsync(int8_t* device_ptr,
                                      const int8_t* host_ptr,
                                      const size_t num_bytes,
                                      const int device_num) {
  // An empty transfer returns before any of the argument checks run.
  if (num_bytes == 0) {
    return;
  }
  CHECK(host_ptr);
  CHECK(device_ptr);
  CHECK_GT(num_bytes, 0);
  CHECK_GE(device_num, 0);
  CHECK_LT(device_num, drivers_[0]->devices().size());

  auto& target_device = drivers()[0]->devices()[device_num];
  target_device->data_fetcher.appendCopyCommand(device_ptr, host_ptr, num_bytes);
}

// Copies num_bytes from host_ptr to device_ptr on device device_num, picking
// the asynchronous path only when the compile-time flag
// async_data_load_available is set; otherwise the blocking copy is used.
void L0Manager::copyHostToDeviceAsyncIfPossible(int8_t* device_ptr,
                                                const int8_t* host_ptr,
                                                const size_t num_bytes,
                                                const int device_num) {
  if constexpr (!async_data_load_available) {
    // Async loading is compiled out: fall back to the blocking copy.
    copyHostToDevice(device_ptr, host_ptr, num_bytes, device_num);
  } else {
    copyHostToDeviceAsync(device_ptr, host_ptr, num_bytes, device_num);
  }
}

// Validates device_num against the first driver's device list and then
// synchronizes that device's data fetcher.
void L0Manager::synchronizeDeviceDataStream(const int device_num) {
  CHECK_GE(device_num, 0);
  CHECK_LT(device_num, drivers_[0]->devices().size());
  drivers()[0]->devices()[device_num]->data_fetcher.sync();
}

void L0Manager::copyDeviceToHost(int8_t* host_ptr,
const int8_t* device_ptr,
const size_t num_bytes,
Expand Down
51 changes: 42 additions & 9 deletions omniscidb/L0Mgr/L0Mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
#pragma once

#include <iostream>
#include <list>
#include <memory>
#include <mutex>
#include <vector>

#include "DataMgr/GpuMgr.h"
Expand Down Expand Up @@ -56,6 +58,35 @@ class L0Kernel;
class L0CommandList;
class L0CommandQueue;

// Batches host-to-device copy commands into Level Zero command lists and
// submits them on a dedicated command queue.
//
// Copy commands are appended to the currently open list via
// appendCopyCommand(); sync() submits what is pending and waits for the queue.
//
// Fix: the `driver_` reference used to live outside the HAVE_L0 guard. A
// reference member makes a defaulted default constructor implicitly deleted,
// so `L0DataFetcher() = default` in the no-L0 build could never be used. The
// member now sits inside the guard, at the same position in declaration
// order, so initialization order in the L0 build is unchanged.
class L0DataFetcher {
#ifdef HAVE_L0
  // Capacity of the recycled pool, and the graveyard size above which sync()
  // triggers recycling.
  static constexpr uint16_t GRAVEYARD_LIMIT{500};
  ze_device_handle_t device_;
  ze_command_queue_handle_t queue_handle_;
  // The open command list and the number of bytes queued into it so far.
  std::pair<ze_command_list_handle_t, uint64_t> current_cl_bytes;
  // Command lists that were retired from use by appendCopyCommand().
  std::list<ze_command_list_handle_t> graveyard;
  // Reset command lists that are ready to become the open list again.
  std::list<ze_command_list_handle_t> recycled;
  ze_command_list_desc_t cl_desc = {ZE_STRUCTURE_TYPE_COMMAND_LIST_DESC,
                                    nullptr,
                                    0,
                                    ZE_COMMAND_LIST_FLAG_MAXIMIZE_THROUGHPUT};
  // Taken by appendCopyCommand() while it modifies the members above.
  std::mutex current_cl_lock;
  const L0Driver& driver_;
#endif
  void recycleGraveyard();

 public:
  void appendCopyCommand(void* dst, const void* src, const size_t num_bytes);
  void sync();

#ifdef HAVE_L0
  L0DataFetcher(const L0Driver& driver, ze_device_handle_t device);
  ~L0DataFetcher();
#else
  L0DataFetcher() = default;
#endif
};

class L0Device {
private:
#ifdef HAVE_L0
Expand All @@ -68,6 +99,7 @@ class L0Device {
std::shared_ptr<L0CommandQueue> command_queue_;

public:
L0DataFetcher data_fetcher;
std::shared_ptr<L0CommandQueue> command_queue() const;
std::unique_ptr<L0CommandList> create_command_list() const;

Expand Down Expand Up @@ -169,7 +201,6 @@ class L0CommandList {

public:
void copy(void* dst, const void* src, const size_t num_bytes);

template <typename... Args>
void launch(L0Kernel& kernel, const GroupCount& gc, Args&&... args) {
#ifdef HAVE_L0
Expand Down Expand Up @@ -202,13 +233,15 @@ class L0Manager : public GpuMgr {
void copyHostToDeviceAsync(int8_t* device_ptr,
const int8_t* host_ptr,
const size_t num_bytes,
const int device_num) override {
CHECK(false);
}
void synchronizeStream(const int device_num) override {
LOG(WARNING)
<< "L0 has no async data transfer enabled, synchronizeStream() has no effect";
}
const int device_num) override;

void copyHostToDeviceAsyncIfPossible(int8_t* device_ptr,
const int8_t* host_ptr,
const size_t num_bytes,
const int device_num) override;

void synchronizeDeviceDataStream(const int device_num) override;

void copyHostToDevice(int8_t* device_ptr,
const int8_t* host_ptr,
const size_t num_bytes,
Expand Down Expand Up @@ -261,7 +294,7 @@ class L0Manager : public GpuMgr {

private:
std::vector<std::shared_ptr<L0Driver>> drivers_;
static constexpr bool async_data_load_available{false};
static constexpr bool async_data_load_available{true};
};

} // namespace l0
Loading

0 comments on commit 3bc2d3e

Please sign in to comment.