Skip to content

Commit 8fe4ba2

Browse files
wangbodataroaring
authored andcommitted
[Improvement]Support cgroup v2 for workload group (#39374)
## Proposed changes Support cgroup v2 for wokrload group.
1 parent 1ac05ad commit 8fe4ba2

File tree

8 files changed

+359
-149
lines changed

8 files changed

+359
-149
lines changed

be/src/agent/cgroup_cpu_ctl.cpp

+255-99
Large diffs are not rendered by default.

be/src/agent/cgroup_cpu_ctl.h

+76-11
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,14 @@ namespace doris {
3030

3131
// cgroup cpu.cfs_quota_us default value, it means disable cpu hard limit
3232
const static int CGROUP_CPU_HARD_LIMIT_DEFAULT_VALUE = -1;
33+
const static std::string CGROUP_V2_CPU_HARD_LIMIT_DEFAULT_VALUE = "max 100000";
3334

3435
class CgroupCpuCtl {
3536
public:
3637
virtual ~CgroupCpuCtl() = default;
37-
CgroupCpuCtl() = default;
3838
CgroupCpuCtl(uint64_t wg_id) { _wg_id = wg_id; }
3939

40-
virtual Status init();
40+
virtual Status init() = 0;
4141

4242
virtual Status add_thread_to_cgroup() = 0;
4343

@@ -48,18 +48,36 @@ class CgroupCpuCtl {
4848
// for log
4949
void get_cgroup_cpu_info(uint64_t* cpu_shares, int* cpu_hard_limit);
5050

51-
virtual Status delete_unused_cgroup_path(std::set<uint64_t>& used_wg_ids) = 0;
51+
static void init_doris_cgroup_path();
52+
53+
static Status delete_unused_cgroup_path(std::set<uint64_t>& used_wg_ids);
54+
55+
static std::unique_ptr<CgroupCpuCtl> create_cgroup_cpu_ctl(uint64_t wg_id);
56+
57+
static bool is_a_valid_cgroup_path(std::string cg_path);
58+
59+
static uint64_t cpu_soft_limit_default_value();
5260

5361
protected:
54-
Status write_cg_sys_file(std::string file_path, int value, std::string msg, bool is_append);
62+
Status write_cg_sys_file(std::string file_path, std::string value, std::string msg,
63+
bool is_append);
5564

5665
virtual Status modify_cg_cpu_hard_limit_no_lock(int cpu_hard_limit) = 0;
5766

5867
virtual Status modify_cg_cpu_soft_limit_no_lock(int cpu_shares) = 0;
5968

60-
std::string _doris_cgroup_cpu_path;
61-
uint64_t _cpu_core_num = CpuInfo::num_cores();
62-
uint64_t _cpu_cfs_period_us = 100000;
69+
Status add_thread_to_cgroup(std::string task_file);
70+
71+
protected:
72+
inline static uint64_t _cpu_core_num;
73+
const static uint64_t _cpu_cfs_period_us = 100000;
74+
inline static std::string _doris_cgroup_cpu_path = "";
75+
inline static std::string _doris_cgroup_cpu_query_path = "";
76+
inline static bool _is_enable_cgroup_v1_in_env = false;
77+
inline static bool _is_enable_cgroup_v2_in_env = false;
78+
inline static bool _is_cgroup_query_path_valid = false;
79+
80+
protected:
6381
int _cpu_hard_limit = 0;
6482
std::shared_mutex _lock_mutex;
6583
bool _init_succ = false;
@@ -96,20 +114,67 @@ class CgroupCpuCtl {
96114
class CgroupV1CpuCtl : public CgroupCpuCtl {
97115
public:
98116
CgroupV1CpuCtl(uint64_t tg_id) : CgroupCpuCtl(tg_id) {}
99-
CgroupV1CpuCtl() = default;
100117
Status init() override;
101118
Status modify_cg_cpu_hard_limit_no_lock(int cpu_hard_limit) override;
102119
Status modify_cg_cpu_soft_limit_no_lock(int cpu_shares) override;
103120
Status add_thread_to_cgroup() override;
104121

105-
Status delete_unused_cgroup_path(std::set<uint64_t>& used_wg_ids) override;
106-
107122
private:
108-
std::string _cgroup_v1_cpu_query_path;
109123
std::string _cgroup_v1_cpu_tg_path; // workload group path
110124
std::string _cgroup_v1_cpu_tg_quota_file;
111125
std::string _cgroup_v1_cpu_tg_shares_file;
112126
std::string _cgroup_v1_cpu_tg_task_file;
113127
};
114128

129+
/*
130+
NOTE: cgroup v2 directory structure
131+
1 root path:
132+
/sys/fs/cgroup
133+
134+
2 doris home path:
135+
/sys/fs/cgroup/{doris_home}/
136+
137+
3 doris home subtree_control file:
138+
/sys/fs/cgroup/{doris_home}/cgroup.subtree_control
139+
140+
4 query path:
141+
/sys/fs/cgroup/{doris_home}/query/
142+
143+
5 query path subtree_control file:
144+
/sys/fs/cgroup/{doris_home}/query/cgroup.subtree_control
145+
146+
6 workload group path:
147+
/sys/fs/cgroup/{doris_home}/query/{workload_group_id}
148+
149+
7 workload grou cpu.max file:
150+
/sys/fs/cgroup/{doris_home}/query/{workload_group_id}/cpu.max
151+
152+
8 workload grou cpu.weight file:
153+
/sys/fs/cgroup/{doris_home}/query/{workload_group_id}/cpu.weight
154+
155+
9 workload group cgroup type file:
156+
/sys/fs/cgroup/{doris_home}/query/{workload_group_id}/cgroup.type
157+
158+
*/
159+
class CgroupV2CpuCtl : public CgroupCpuCtl {
160+
public:
161+
CgroupV2CpuCtl(uint64_t tg_id) : CgroupCpuCtl(tg_id) {}
162+
Status init() override;
163+
Status modify_cg_cpu_hard_limit_no_lock(int cpu_hard_limit) override;
164+
Status modify_cg_cpu_soft_limit_no_lock(int cpu_shares) override;
165+
Status add_thread_to_cgroup() override;
166+
167+
private:
168+
Status enable_cpu_controller(std::string file);
169+
170+
private:
171+
std::string _doris_cgroup_cpu_path_subtree_ctl_file;
172+
std::string _cgroup_v2_query_path_subtree_ctl_file;
173+
std::string _cgroup_v2_query_wg_path;
174+
std::string _cgroup_v2_query_wg_cpu_max_file;
175+
std::string _cgroup_v2_query_wg_cpu_weight_file;
176+
std::string _cgroup_v2_query_wg_thread_file;
177+
std::string _cgroup_v2_query_wg_type_file;
178+
};
179+
115180
} // namespace doris

be/src/runtime/exec_env_init.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
274274
// NOTE: runtime query statistics mgr could be visited by query and daemon thread
275275
// so it should be created before all query begin and deleted after all query and daemon thread stoppped
276276
_runtime_query_statistics_mgr = new RuntimeQueryStatisticsMgr();
277+
CgroupCpuCtl::init_doris_cgroup_path();
277278
_file_cache_factory = new io::FileCacheFactory();
278279
std::vector<doris::CachePath> cache_paths;
279280
init_file_cache_factory(cache_paths);

be/src/runtime/workload_group/workload_group.cpp

+14-11
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,9 @@
4343

4444
namespace doris {
4545

46-
const static uint64_t CPU_SHARE_DEFAULT_VALUE = 1024;
4746
const static std::string MEMORY_LIMIT_DEFAULT_VALUE = "0%";
4847
const static bool ENABLE_MEMORY_OVERCOMMIT_DEFAULT_VALUE = true;
4948
const static int CPU_HARD_LIMIT_DEFAULT_VALUE = -1;
50-
const static uint64_t CPU_SOFT_LIMIT_DEFAULT_VALUE = 1024;
5149
const static int SPILL_LOW_WATERMARK_DEFAULT_VALUE = 50;
5250
const static int SPILL_HIGH_WATERMARK_DEFAULT_VALUE = 80;
5351

@@ -310,7 +308,7 @@ WorkloadGroupInfo WorkloadGroupInfo::parse_topic_info(
310308
}
311309

312310
// 4 cpu_share
313-
uint64_t cpu_share = CPU_SHARE_DEFAULT_VALUE;
311+
uint64_t cpu_share = CgroupCpuCtl::cpu_soft_limit_default_value();
314312
if (tworkload_group_info.__isset.cpu_share) {
315313
cpu_share = tworkload_group_info.cpu_share;
316314
}
@@ -415,14 +413,18 @@ void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e
415413

416414
std::lock_guard<std::shared_mutex> wlock(_task_sched_lock);
417415
if (config::doris_cgroup_cpu_path != "" && _cgroup_cpu_ctl == nullptr) {
418-
std::unique_ptr<CgroupCpuCtl> cgroup_cpu_ctl = std::make_unique<CgroupV1CpuCtl>(tg_id);
419-
Status ret = cgroup_cpu_ctl->init();
420-
if (ret.ok()) {
421-
_cgroup_cpu_ctl = std::move(cgroup_cpu_ctl);
422-
LOG(INFO) << "[upsert wg thread pool] cgroup init success, wg_id=" << tg_id;
416+
std::unique_ptr<CgroupCpuCtl> cgroup_cpu_ctl = CgroupCpuCtl::create_cgroup_cpu_ctl(tg_id);
417+
if (cgroup_cpu_ctl) {
418+
Status ret = cgroup_cpu_ctl->init();
419+
if (ret.ok()) {
420+
_cgroup_cpu_ctl = std::move(cgroup_cpu_ctl);
421+
LOG(INFO) << "[upsert wg thread pool] cgroup init success, wg_id=" << tg_id;
422+
} else {
423+
LOG(INFO) << "[upsert wg thread pool] cgroup init failed, wg_id= " << tg_id
424+
<< ", reason=" << ret.to_string();
425+
}
423426
} else {
424-
LOG(INFO) << "[upsert wg thread pool] cgroup init failed, wg_id= " << tg_id
425-
<< ", reason=" << ret.to_string();
427+
LOG(INFO) << "[upsert wg thread pool] create cgroup cpu ctl for " << tg_id << " failed";
426428
}
427429
}
428430

@@ -521,7 +523,8 @@ void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e
521523
if (enable_cpu_hard_limit) {
522524
if (cpu_hard_limit > 0) {
523525
_cgroup_cpu_ctl->update_cpu_hard_limit(cpu_hard_limit);
524-
_cgroup_cpu_ctl->update_cpu_soft_limit(CPU_SOFT_LIMIT_DEFAULT_VALUE);
526+
_cgroup_cpu_ctl->update_cpu_soft_limit(
527+
CgroupCpuCtl::cpu_soft_limit_default_value());
525528
} else {
526529
LOG(INFO) << "[upsert wg thread pool] enable cpu hard limit but value is illegal: "
527530
<< cpu_hard_limit << ", gid=" << tg_id;

be/src/runtime/workload_group/workload_group_manager.cpp

+7-20
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,8 @@ void WorkloadGroupMgr::delete_workload_group_by_ids(std::set<uint64_t> used_wg_i
9292
}
9393
// wg is shutdown and running rum = 0, its resource can be released in BE
9494
if (workload_group_ptr->can_be_dropped()) {
95-
LOG(INFO) << "[topic_publish_wg]There is no query in wg" << wg_id << ", delete it.";
95+
LOG(INFO) << "[topic_publish_wg]There is no query in wg " << wg_id
96+
<< ", delete it.";
9697
deleted_task_groups.push_back(workload_group_ptr);
9798
}
9899
}
@@ -121,30 +122,16 @@ void WorkloadGroupMgr::delete_workload_group_by_ids(std::set<uint64_t> used_wg_i
121122
// Using cgdelete has no such issue.
122123
{
123124
if (config::doris_cgroup_cpu_path != "") {
124-
std::lock_guard<std::shared_mutex> write_lock(_init_cg_ctl_lock);
125-
if (!_cg_cpu_ctl) {
126-
_cg_cpu_ctl = std::make_unique<CgroupV1CpuCtl>();
127-
}
128-
if (!_is_init_succ) {
129-
Status ret = _cg_cpu_ctl->init();
130-
if (ret.ok()) {
131-
_is_init_succ = true;
132-
} else {
133-
LOG(INFO) << "[topic_publish_wg]init workload group mgr cpu ctl failed, "
134-
<< ret.to_string();
135-
}
136-
}
137-
if (_is_init_succ) {
138-
Status ret = _cg_cpu_ctl->delete_unused_cgroup_path(used_wg_id);
139-
if (!ret.ok()) {
140-
LOG(WARNING) << "[topic_publish_wg]" << ret.to_string();
141-
}
125+
std::lock_guard<std::shared_mutex> write_lock(_clear_cgroup_lock);
126+
Status ret = CgroupCpuCtl::delete_unused_cgroup_path(used_wg_id);
127+
if (!ret.ok()) {
128+
LOG(WARNING) << "[topic_publish_wg]" << ret.to_string();
142129
}
143130
}
144131
}
145132
int64_t time_cost_ms = MonotonicMillis() - begin_time;
146133
LOG(INFO) << "[topic_publish_wg]finish clear unused workload group, time cost: " << time_cost_ms
147-
<< "ms, deleted group size:" << deleted_task_groups.size()
134+
<< " ms, deleted group size:" << deleted_task_groups.size()
148135
<< ", before wg size=" << old_wg_size << ", after wg size=" << new_wg_size;
149136
}
150137

be/src/runtime/workload_group/workload_group_manager.h

+1-3
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,7 @@ class WorkloadGroupMgr {
6666
std::shared_mutex _group_mutex;
6767
std::unordered_map<uint64_t, WorkloadGroupPtr> _workload_groups;
6868

69-
std::shared_mutex _init_cg_ctl_lock;
70-
std::unique_ptr<CgroupCpuCtl> _cg_cpu_ctl;
71-
bool _is_init_succ = false;
69+
std::shared_mutex _clear_cgroup_lock;
7270
};
7371

7472
} // namespace doris

fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -479,7 +479,7 @@ public void getProcNodeData(BaseProcResult result, QueryQueue qq) {
479479
row.add(val + "%");
480480
}
481481
} else if (CPU_SHARE.equals(key) && !properties.containsKey(key)) {
482-
row.add("1024");
482+
row.add("-1");
483483
} else if (MEMORY_LIMIT.equals(key) && !properties.containsKey(key)) {
484484
row.add("0%");
485485
} else if (ENABLE_MEMORY_OVERCOMMIT.equals(key) && !properties.containsKey(key)) {

regression-test/data/workload_manager_p0/test_curd_wlg.out

+4-4
Original file line numberDiff line numberDiff line change
@@ -54,16 +54,16 @@ normal 20 50% true 2147483647 0 0 1% 16
5454
test_group 10 11% false 100 0 0 20% -1
5555

5656
-- !show_spill_1 --
57-
spill_group_test 1024 0% true 2147483647 0 0 -1 -1 10% 10%
57+
spill_group_test -1 0% true 2147483647 0 0 -1 -1 10% 10%
5858

5959
-- !show_spill_1 --
60-
spill_group_test 1024 0% true 2147483647 0 0 -1 -1 -1 10%
60+
spill_group_test -1 0% true 2147483647 0 0 -1 -1 -1 10%
6161

6262
-- !show_spill_2 --
63-
spill_group_test 1024 0% true 2147483647 0 0 -1 -1 5% 10%
63+
spill_group_test -1 0% true 2147483647 0 0 -1 -1 5% 10%
6464

6565
-- !show_spill_3 --
66-
spill_group_test 1024 0% true 2147483647 0 0 -1 -1 5% 40%
66+
spill_group_test -1 0% true 2147483647 0 0 -1 -1 5% 40%
6767

6868
-- !show_wg_tag --
6969
tag1_mem_wg1 50% -1 mem_tag1

0 commit comments

Comments
 (0)