Skip to content

Commit d878919

Browse files
committed
[Feature](Cloud) Support session variable disable_file_cache in query.
1 parent 9ce5707 commit d878919

File tree

9 files changed

+72
-19
lines changed

9 files changed

+72
-19
lines changed

be/src/exec/rowid_fetcher.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -381,6 +381,7 @@ Status RowIdStorageReader::read_by_rowids(const PMultiGetRequest& request,
381381
<< ", row_size:" << row_size;
382382
*response->add_row_locs() = row_loc;
383383
});
384+
// TODO: supoort session variable enable_page_cache and disable_file_cache if necessary.
384385
SegmentCacheHandle segment_cache;
385386
RETURN_IF_ERROR(scope_timer_run(
386387
[&]() {

be/src/olap/parallel_scanner_builder.cpp

+8-1
Original file line numberDiff line numberDiff line change
@@ -179,13 +179,20 @@ Status ParallelScannerBuilder::_load() {
179179
RETURN_IF_ERROR(tablet->capture_consistent_rowsets_unlocked({0, version}, &rowsets));
180180
}
181181

182+
bool enable_segment_cache = _state->query_options().__isset.enable_segment_cache
183+
? _state->query_options().enable_segment_cache
184+
: true;
185+
bool disable_file_cache = _state->query_options().__isset.disable_file_cache
186+
? _state->query_options().disable_file_cache
187+
: false;
182188
for (auto& rowset : rowsets) {
183189
RETURN_IF_ERROR(rowset->load());
184190
const auto rowset_id = rowset->rowset_id();
185191
auto& segment_cache_handle = _segment_cache_handles[rowset_id];
186192

187193
RETURN_IF_ERROR(SegmentLoader::instance()->load_segments(
188-
std::dynamic_pointer_cast<BetaRowset>(rowset), &segment_cache_handle, true));
194+
std::dynamic_pointer_cast<BetaRowset>(rowset), &segment_cache_handle,
195+
enable_segment_cache, false, disable_file_cache));
189196
_total_rows += rowset->num_rows();
190197
}
191198
}

be/src/olap/rowset/beta_rowset.cpp

+12-7
Original file line numberDiff line numberDiff line change
@@ -146,23 +146,26 @@ Status BetaRowset::get_segments_size(std::vector<size_t>* segments_size) {
146146
return Status::OK();
147147
}
148148

149-
Status BetaRowset::load_segments(std::vector<segment_v2::SegmentSharedPtr>* segments) {
150-
return load_segments(0, num_segments(), segments);
149+
Status BetaRowset::load_segments(std::vector<segment_v2::SegmentSharedPtr>* segments,
150+
bool disable_file_cache) {
151+
return load_segments(0, num_segments(), segments, disable_file_cache);
151152
}
152153

153154
Status BetaRowset::load_segments(int64_t seg_id_begin, int64_t seg_id_end,
154-
std::vector<segment_v2::SegmentSharedPtr>* segments) {
155+
std::vector<segment_v2::SegmentSharedPtr>* segments,
156+
bool disable_file_cache) {
155157
int64_t seg_id = seg_id_begin;
156158
while (seg_id < seg_id_end) {
157159
std::shared_ptr<segment_v2::Segment> segment;
158-
RETURN_IF_ERROR(load_segment(seg_id, &segment));
160+
RETURN_IF_ERROR(load_segment(seg_id, &segment, disable_file_cache));
159161
segments->push_back(std::move(segment));
160162
seg_id++;
161163
}
162164
return Status::OK();
163165
}
164166

165-
Status BetaRowset::load_segment(int64_t seg_id, segment_v2::SegmentSharedPtr* segment) {
167+
Status BetaRowset::load_segment(int64_t seg_id, segment_v2::SegmentSharedPtr* segment,
168+
bool disable_file_cache) {
166169
auto fs = _rowset_meta->fs();
167170
if (!fs) {
168171
return Status::Error<INIT_FAILED>("get fs failed");
@@ -171,11 +174,13 @@ Status BetaRowset::load_segment(int64_t seg_id, segment_v2::SegmentSharedPtr* se
171174
DCHECK(seg_id >= 0);
172175
auto seg_path = DORIS_TRY(segment_path(seg_id));
173176
io::FileReaderOptions reader_options {
174-
.cache_type = config::enable_file_cache ? io::FileCachePolicy::FILE_BLOCK_CACHE
175-
: io::FileCachePolicy::NO_CACHE,
177+
.cache_type = !disable_file_cache && config::enable_file_cache
178+
? io::FileCachePolicy::FILE_BLOCK_CACHE
179+
: io::FileCachePolicy::NO_CACHE,
176180
.is_doris_table = true,
177181
.file_size = _rowset_meta->segment_file_size(seg_id),
178182
};
183+
179184
auto s = segment_v2::Segment::open(fs, seg_path, seg_id, rowset_id(), _schema, reader_options,
180185
segment);
181186
if (!s.ok()) {

be/src/olap/rowset/beta_rowset.h

+6-3
Original file line numberDiff line numberDiff line change
@@ -71,12 +71,15 @@ class BetaRowset final : public Rowset {
7171

7272
Status check_file_exist() override;
7373

74-
Status load_segments(std::vector<segment_v2::SegmentSharedPtr>* segments);
74+
Status load_segments(std::vector<segment_v2::SegmentSharedPtr>* segments,
75+
bool disable_file_cache = false);
7576

7677
Status load_segments(int64_t seg_id_begin, int64_t seg_id_end,
77-
std::vector<segment_v2::SegmentSharedPtr>* segments);
78+
std::vector<segment_v2::SegmentSharedPtr>* segments,
79+
bool disable_file_cache = false);
7880

79-
Status load_segment(int64_t seg_id, segment_v2::SegmentSharedPtr* segment);
81+
Status load_segment(int64_t seg_id, segment_v2::SegmentSharedPtr* segment,
82+
bool disable_file_cache = false);
8083

8184
Status get_segments_size(std::vector<size_t>* segments_size);
8285

be/src/olap/rowset/beta_rowset_reader.cpp

+17-3
Original file line numberDiff line numberDiff line change
@@ -249,10 +249,24 @@ Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
249249
}
250250

251251
// load segments
252-
bool should_use_cache = use_cache || _read_context->reader_type == ReaderType::READER_QUERY;
252+
bool disable_file_cache = false;
253+
bool enable_segment_cache = true;
254+
auto* state = read_context->runtime_state;
255+
if (state != nullptr) {
256+
disable_file_cache = state->query_options().__isset.disable_file_cache
257+
? state->query_options().disable_file_cache
258+
: false;
259+
enable_segment_cache = state->query_options().__isset.enable_segment_cache
260+
? state->query_options().enable_segment_cache
261+
: true;
262+
}
263+
// When reader type is for query, session variable `enable_segment_cache` should be respected.
264+
bool should_use_cache = use_cache || (_read_context->reader_type == ReaderType::READER_QUERY &&
265+
enable_segment_cache);
253266
SegmentCacheHandle segment_cache_handle;
254-
RETURN_IF_ERROR(SegmentLoader::instance()->load_segments(_rowset, &segment_cache_handle,
255-
should_use_cache));
267+
RETURN_IF_ERROR(SegmentLoader::instance()->load_segments(
268+
_rowset, &segment_cache_handle, should_use_cache,
269+
/*need_load_pk_index_and_bf*/ false, disable_file_cache));
256270

257271
// create iterator for each segment
258272
auto& segments = segment_cache_handle.get_segments();

be/src/olap/segment_loader.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ void SegmentCache::erase(const SegmentCache::CacheKey& key) {
5252

5353
Status SegmentLoader::load_segments(const BetaRowsetSharedPtr& rowset,
5454
SegmentCacheHandle* cache_handle, bool use_cache,
55-
bool need_load_pk_index_and_bf) {
55+
bool need_load_pk_index_and_bf, bool disable_file_cache) {
5656
if (cache_handle->is_inited()) {
5757
return Status::OK();
5858
}
@@ -62,7 +62,7 @@ Status SegmentLoader::load_segments(const BetaRowsetSharedPtr& rowset,
6262
continue;
6363
}
6464
segment_v2::SegmentSharedPtr segment;
65-
RETURN_IF_ERROR(rowset->load_segment(i, &segment));
65+
RETURN_IF_ERROR(rowset->load_segment(i, &segment, disable_file_cache));
6666
if (need_load_pk_index_and_bf) {
6767
RETURN_IF_ERROR(segment->load_pk_index_and_bf());
6868
}

be/src/olap/segment_loader.h

+2-1
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,8 @@ class SegmentLoader {
118118
// Load segments of "rowset", return the "cache_handle" which contains segments.
119119
// If use_cache is true, it will be loaded from _cache.
120120
Status load_segments(const BetaRowsetSharedPtr& rowset, SegmentCacheHandle* cache_handle,
121-
bool use_cache = false, bool need_load_pk_index_and_bf = false);
121+
bool use_cache = false, bool need_load_pk_index_and_bf = false,
122+
bool disable_file_cache = false);
122123

123124
void erase_segment(const SegmentCache::CacheKey& key);
124125

fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java

+17
Original file line numberDiff line numberDiff line change
@@ -1882,6 +1882,8 @@ public void setIgnoreRuntimeFilterIds(String ignoreRuntimeFilterIds) {
18821882

18831883
public static final String IGNORE_SHAPE_NODE = "ignore_shape_nodes";
18841884

1885+
public static final String ENABLE_SEGMENT_CACHE = "enable_segment_cache";
1886+
18851887
public Set<String> getIgnoreShapePlanNodes() {
18861888
return Arrays.stream(ignoreShapePlanNodes.split(",[\\s]*")).collect(ImmutableSet.toImmutableSet());
18871889
}
@@ -2002,6 +2004,11 @@ public void setIgnoreShapePlanNodes(String ignoreShapePlanNodes) {
20022004
})
20032005
public boolean useMaxLengthOfVarcharInCtas = true;
20042006

2007+
// Whether enable segment cache. Segment cache only works when FE's query options sets enableSegmentCache true
2008+
// along with BE's config `disable_segment_cache` false
2009+
@VariableMgr.VarAttr(name = ENABLE_SEGMENT_CACHE, needForward = true)
2010+
public boolean enableSegmentCache = true;
2011+
20052012
public boolean isEnableJoinSpill() {
20062013
return enableJoinSpill;
20072014
}
@@ -3425,6 +3432,14 @@ public void setLoadStreamPerNode(int loadStreamPerNode) {
34253432
this.loadStreamPerNode = loadStreamPerNode;
34263433
}
34273434

3435+
public void setEnableSegmentCache(boolean value) {
3436+
this.enableSegmentCache = value;
3437+
}
3438+
3439+
public boolean isEnableSegmentCache() {
3440+
return this.enableSegmentCache;
3441+
}
3442+
34283443
/**
34293444
* Serialize to thrift object.
34303445
* Used for rest api.
@@ -3558,6 +3573,8 @@ public TQueryOptions toThrift() {
35583573
tResult.setEnableShortCircuitQueryAccessColumnStore(enableShortCircuitQueryAcessColumnStore);
35593574
tResult.setReadCsvEmptyLineAsNull(readCsvEmptyLineAsNull);
35603575
tResult.setSerdeDialect(getSerdeDialect());
3576+
3577+
tResult.setEnableSegmentCache(enableSegmentCache);
35613578
return tResult;
35623579
}
35633580

gensrc/thrift/PaloInternalService.thrift

+7-2
Original file line numberDiff line numberDiff line change
@@ -308,15 +308,20 @@ struct TQueryOptions {
308308
113: optional bool enable_local_merge_sort = false;
309309

310310
114: optional bool enable_parallel_result_sink = false;
311-
311+
312312
115: optional bool enable_short_circuit_query_access_column_store = false;
313313

314314
116: optional bool enable_no_need_read_data_opt = true;
315315

316316
117: optional bool read_csv_empty_line_as_null = false;
317317

318318
118: optional TSerdeDialect serde_dialect = TSerdeDialect.DORIS;
319-
// For cloud, to control if the content would be written into file cache
319+
320+
119: optional bool enable_segment_cache = true
321+
322+
// For cloud.
323+
// In write path, to control if the content would be written into file cache.
324+
// In read path, read from file cache or remote storage when execute query.
320325
1000: optional bool disable_file_cache = false
321326
}
322327

0 commit comments

Comments
 (0)