Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix]Fix cgroup v2 init #39991

Merged
merged 1 commit into from
Sep 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 45 additions & 22 deletions be/src/agent/cgroup_cpu_ctl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ bool CgroupCpuCtl::is_a_valid_cgroup_path(std::string cg_path) {
void CgroupCpuCtl::init_doris_cgroup_path() {
std::string conf_path = config::doris_cgroup_cpu_path;
if (conf_path.empty()) {
LOG(INFO) << "[cgroup_init_path]doris cgroup home path is not specify";
LOG(INFO) << "[cgroup_init_path]doris cgroup home path is not specify, if you not use "
"workload group, you can ignore this log.";
return;
}

Expand Down Expand Up @@ -107,9 +108,50 @@ void CgroupCpuCtl::init_doris_cgroup_path() {
: "cgroup query path is not valid";
_cpu_core_num = CpuInfo::num_cores();

std::string init_cg_v2_msg = "";
if (_is_enable_cgroup_v2_in_env && _is_cgroup_query_path_valid) {
Status ret = init_cgroup_v2_query_path_public_file(_doris_cgroup_cpu_path,
_doris_cgroup_cpu_query_path);
if (!ret.ok()) {
init_cg_v2_msg = " write cgroup v2 file failed, err=" + ret.to_string_no_stack() + ". ";
} else {
init_cg_v2_msg = "write cgroup v2 public file succ.";
}
}

LOG(INFO) << "[cgroup_init_path]init cgroup home path finish, home path="
<< _doris_cgroup_cpu_path << ", query path=" << _doris_cgroup_cpu_query_path << ", "
<< cg_msg << ", " << query_path_msg << ", core_num=" << _cpu_core_num;
<< cg_msg << ", " << query_path_msg << ", core_num=" << _cpu_core_num << ". "
<< init_cg_v2_msg;
}

// Prepares the cgroup v2 files that live directly under the doris home path and
// the query path:
//   1. writes "+cpu" to {home_path}/cgroup.subtree_control
//   2. writes "+cpu" to {query_path}/cgroup.subtree_control
//   3. writes the current process id to {query_path}/cgroup.procs
//
// The resolved file paths are stored in the static members
// _doris_cgroup_cpu_path_subtree_ctl_file, _cgroup_v2_query_path_subtree_ctl_file
// and _doris_cg_v2_procs_file.
//
// @param home_path  doris cgroup home directory, with or without a trailing '/'.
// @param query_path query directory; joined to file names with an explicit '/'.
// @return Status::OK() when all three writes succeed; an InternalError when one of
//         the files does not exist; otherwise whatever error write_cg_sys_file
//         reports (returned early through RETURN_IF_ERROR).
Status CgroupCpuCtl::init_cgroup_v2_query_path_public_file(std::string home_path,
                                                           std::string query_path) {
    // The file name is appended to home_path without a separator, so make sure the
    // directory ends with exactly one '/'. An empty home_path is left untouched to
    // keep the previous behavior for that input.
    if (!home_path.empty() && home_path.back() != '/') {
        home_path.push_back('/');
    }

    // 1 enable cpu controller for home path's child
    _doris_cgroup_cpu_path_subtree_ctl_file = home_path + "cgroup.subtree_control";
    if (access(_doris_cgroup_cpu_path_subtree_ctl_file.c_str(), F_OK) != 0) {
        return Status::InternalError<false>("not find cgroup v2 doris home's subtree control file");
    }
    // The trailing `false` is the is_append flag of write_cg_sys_file.
    RETURN_IF_ERROR(CgroupCpuCtl::write_cg_sys_file(_doris_cgroup_cpu_path_subtree_ctl_file, "+cpu",
                                                    "set cpu controller", false));

    // 2 enable cpu controller for query path's child
    _cgroup_v2_query_path_subtree_ctl_file = query_path + "/cgroup.subtree_control";
    if (access(_cgroup_v2_query_path_subtree_ctl_file.c_str(), F_OK) != 0) {
        return Status::InternalError<false>("not find cgroup v2 query path's subtree control file");
    }
    RETURN_IF_ERROR(CgroupCpuCtl::write_cg_sys_file(_cgroup_v2_query_path_subtree_ctl_file, "+cpu",
                                                    "set cpu controller", false));

    // 3 write the current process id into the query path's cgroup.procs
    _doris_cg_v2_procs_file = query_path + "/cgroup.procs";
    if (access(_doris_cg_v2_procs_file.c_str(), F_OK) != 0) {
        return Status::InternalError<false>("not find cgroup v2 cgroup.procs file");
    }
    RETURN_IF_ERROR(CgroupCpuCtl::write_cg_sys_file(_doris_cg_v2_procs_file,
                                                    std::to_string(getpid()),
                                                    "set pid to cg v2 procs file", false));
    return Status::OK();
}

uint64_t CgroupCpuCtl::cpu_soft_limit_default_value() {
Expand Down Expand Up @@ -258,7 +300,7 @@ Status CgroupV1CpuCtl::init() {
int ret = mkdir(_cgroup_v1_cpu_tg_path.c_str(), S_IRWXU);
if (ret != 0) {
LOG(ERROR) << "cgroup v1 mkdir workload group failed, path=" << _cgroup_v1_cpu_tg_path;
return Status::InternalError<false>("cgroup v1 mkdir workload group failed, path=",
return Status::InternalError<false>("cgroup v1 mkdir workload group failed, path={}",
_cgroup_v1_cpu_tg_path);
}
}
Expand Down Expand Up @@ -313,21 +355,6 @@ Status CgroupV2CpuCtl::init() {
return Status::InternalError<false>("find an invalid wg_id {}", _wg_id);
}

// enable cpu controller for home path's child
_doris_cgroup_cpu_path_subtree_ctl_file = _doris_cgroup_cpu_path + "cgroup.subtree_control";
if (access(_doris_cgroup_cpu_path_subtree_ctl_file.c_str(), F_OK) != 0) {
return Status::InternalError<false>("not find cgroup v2 doris home's subtree control file");
}
RETURN_IF_ERROR(enable_cpu_controller(_doris_cgroup_cpu_path_subtree_ctl_file));

// enable cpu controller for query path's child
_cgroup_v2_query_path_subtree_ctl_file =
_doris_cgroup_cpu_query_path + "/cgroup.subtree_control";
if (access(_cgroup_v2_query_path_subtree_ctl_file.c_str(), F_OK) != 0) {
return Status::InternalError<false>("not find cgroup v2 query path's subtree control file");
}
RETURN_IF_ERROR(enable_cpu_controller(_cgroup_v2_query_path_subtree_ctl_file));

// wg path
_cgroup_v2_query_wg_path = _doris_cgroup_cpu_query_path + "/" + std::to_string(_wg_id);
if (access(_cgroup_v2_query_wg_path.c_str(), F_OK) != 0) {
Expand Down Expand Up @@ -392,8 +419,4 @@ Status CgroupV2CpuCtl::add_thread_to_cgroup() {
return CgroupCpuCtl::add_thread_to_cgroup(_cgroup_v2_query_wg_thread_file);
}

// Writes "+cpu" to the given cgroup file via CgroupCpuCtl::write_cg_sys_file
// (non-append mode) and returns the resulting Status unchanged.
Status CgroupV2CpuCtl::enable_cpu_controller(std::string file) {
    const bool append_mode = false;
    Status write_result =
            CgroupCpuCtl::write_cg_sys_file(file, "+cpu", "set cpu controller", append_mode);
    return write_result;
}

} // namespace doris
30 changes: 18 additions & 12 deletions be/src/agent/cgroup_cpu_ctl.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,18 @@ class CgroupCpuCtl {
static uint64_t cpu_soft_limit_default_value();

protected:
Status write_cg_sys_file(std::string file_path, std::string value, std::string msg,
bool is_append);

virtual Status modify_cg_cpu_hard_limit_no_lock(int cpu_hard_limit) = 0;

virtual Status modify_cg_cpu_soft_limit_no_lock(int cpu_shares) = 0;

Status add_thread_to_cgroup(std::string task_file);

static Status write_cg_sys_file(std::string file_path, std::string value, std::string msg,
bool is_append);

static Status init_cgroup_v2_query_path_public_file(std::string home_path,
std::string query_path);

protected:
inline static uint64_t _cpu_core_num;
const static uint64_t _cpu_cfs_period_us = 100000;
Expand All @@ -77,6 +80,11 @@ class CgroupCpuCtl {
inline static bool _is_enable_cgroup_v2_in_env = false;
inline static bool _is_cgroup_query_path_valid = false;

// cgroup v2 public file
inline static std::string _doris_cgroup_cpu_path_subtree_ctl_file = "";
inline static std::string _cgroup_v2_query_path_subtree_ctl_file = "";
inline static std::string _doris_cg_v2_procs_file = "";

protected:
int _cpu_hard_limit = 0;
std::shared_mutex _lock_mutex;
Expand Down Expand Up @@ -143,16 +151,19 @@ class CgroupV1CpuCtl : public CgroupCpuCtl {
5 query path subtree_control file:
/sys/fs/cgroup/{doris_home}/query/cgroup.subtree_control

6 workload group path:
6 query path procs file:
/sys/fs/cgroup/{doris_home}/query/cgroup.procs

7 workload group path:
/sys/fs/cgroup/{doris_home}/query/{workload_group_id}

7 workload group cpu.max file:
8 workload group cpu.max file:
/sys/fs/cgroup/{doris_home}/query/{workload_group_id}/cpu.max

8 workload group cpu.weight file:
9 workload group cpu.weight file:
/sys/fs/cgroup/{doris_home}/query/{workload_group_id}/cpu.weight

9 workload group cgroup type file:
10 workload group cgroup type file:
/sys/fs/cgroup/{doris_home}/query/{workload_group_id}/cgroup.type

*/
Expand All @@ -165,11 +176,6 @@ class CgroupV2CpuCtl : public CgroupCpuCtl {
Status add_thread_to_cgroup() override;

private:
Status enable_cpu_controller(std::string file);

private:
std::string _doris_cgroup_cpu_path_subtree_ctl_file;
std::string _cgroup_v2_query_path_subtree_ctl_file;
std::string _cgroup_v2_query_wg_path;
std::string _cgroup_v2_query_wg_cpu_max_file;
std::string _cgroup_v2_query_wg_cpu_weight_file;
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/workload_group/workload_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e
_cgroup_cpu_ctl = std::move(cgroup_cpu_ctl);
LOG(INFO) << "[upsert wg thread pool] cgroup init success, wg_id=" << tg_id;
} else {
LOG(INFO) << "[upsert wg thread pool] cgroup init failed, wg_id= " << tg_id
LOG(INFO) << "[upsert wg thread pool] cgroup init failed, wg_id=" << tg_id
<< ", reason=" << ret.to_string();
}
} else {
Expand Down
Loading