Skip to content

Commit 5028bad

Browse files
authored
[fix](inverted index) warm up inverted index (#38986)
1 parent dac1555 commit 5028bad

File tree

4 files changed

+107
-0
lines changed

4 files changed

+107
-0
lines changed

be/src/cloud/cloud_tablet.cpp

+32
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,38 @@ void CloudTablet::add_rowsets(std::vector<RowsetSharedPtr> to_add, bool version_
246246
},
247247
.download_done {},
248248
});
249+
250+
auto download_idx_file = [&](const io::Path& idx_path) {
251+
io::DownloadFileMeta meta {
252+
.path = idx_path,
253+
.file_size = -1,
254+
.file_system = storage_resource.value()->fs,
255+
.ctx =
256+
{
257+
.expiration_time = expiration_time,
258+
},
259+
.download_done {},
260+
};
261+
_engine.file_cache_block_downloader().submit_download_task(std::move(meta));
262+
};
263+
auto schema_ptr = rowset_meta->tablet_schema();
264+
auto idx_version = schema_ptr->get_inverted_index_storage_format();
265+
if (idx_version == InvertedIndexStorageFormatPB::V1) {
266+
for (const auto& index : schema_ptr->indexes()) {
267+
if (index.index_type() == IndexType::INVERTED) {
268+
auto idx_path = storage_resource.value()->remote_idx_v1_path(
269+
*rowset_meta, seg_id, index.index_id(),
270+
index.get_index_suffix());
271+
download_idx_file(idx_path);
272+
}
273+
}
274+
} else if (idx_version == InvertedIndexStorageFormatPB::V2) {
275+
if (schema_ptr->has_inverted_index()) {
276+
auto idx_path = storage_resource.value()->remote_idx_v2_path(
277+
*rowset_meta, seg_id);
278+
download_idx_file(idx_path);
279+
}
280+
}
249281
}
250282
#endif
251283
}

be/src/cloud/cloud_warm_up_manager.cpp

+39
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,45 @@ void CloudWarmUpManager::handle_jobs() {
114114
wait->signal();
115115
},
116116
});
117+
118+
auto download_idx_file = [&](const io::Path& idx_path) {
119+
io::DownloadFileMeta meta {
120+
.path = idx_path,
121+
.file_size = -1,
122+
.file_system = storage_resource.value()->fs,
123+
.ctx =
124+
{
125+
.expiration_time = expiration_time,
126+
},
127+
.download_done =
128+
[wait](Status st) {
129+
if (!st) {
130+
LOG_WARNING("Warm up error ").error(st);
131+
}
132+
wait->signal();
133+
},
134+
};
135+
_engine.file_cache_block_downloader().submit_download_task(std::move(meta));
136+
};
137+
auto schema_ptr = rs->tablet_schema();
138+
auto idx_version = schema_ptr->get_inverted_index_storage_format();
139+
if (idx_version == InvertedIndexStorageFormatPB::V1) {
140+
for (const auto& index : schema_ptr->indexes()) {
141+
if (index.index_type() == IndexType::INVERTED) {
142+
wait->add_count();
143+
auto idx_path = storage_resource.value()->remote_idx_v1_path(
144+
*rs, seg_id, index.index_id(), index.get_index_suffix());
145+
download_idx_file(idx_path);
146+
}
147+
}
148+
} else if (idx_version == InvertedIndexStorageFormatPB::V2) {
149+
if (schema_ptr->has_inverted_index()) {
150+
wait->add_count();
151+
auto idx_path =
152+
storage_resource.value()->remote_idx_v2_path(*rs, seg_id);
153+
download_idx_file(idx_path);
154+
}
155+
}
117156
}
118157
}
119158
timespec time;

be/src/olap/storage_policy.cpp

+31
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,37 @@ std::string StorageResource::remote_segment_path(const RowsetMeta& rowset, int64
191191
}
192192
}
193193

194+
std::string StorageResource::remote_idx_v1_path(const RowsetMeta& rowset, int64_t seg_id,
195+
int64_t index_id,
196+
std::string_view index_path_suffix) const {
197+
std::string suffix =
198+
index_path_suffix.empty() ? "" : std::string {"@"} + index_path_suffix.data();
199+
switch (path_version) {
200+
case 0:
201+
return fmt::format("{}/{}/{}_{}_{}{}.idx", DATA_PREFIX, rowset.tablet_id(),
202+
rowset.rowset_id().to_string(), seg_id, index_id, suffix);
203+
case 1:
204+
return fmt::format("{}/{}/{}/{}/{}_{}{}.idx", DATA_PREFIX, shard_fn(rowset.tablet_id()),
205+
rowset.tablet_id(), rowset.rowset_id().to_string(), seg_id, index_id,
206+
suffix);
207+
default:
208+
exit_at_unknown_path_version(fs->id(), path_version);
209+
}
210+
}
211+
212+
std::string StorageResource::remote_idx_v2_path(const RowsetMeta& rowset, int64_t seg_id) const {
213+
switch (path_version) {
214+
case 0:
215+
return fmt::format("{}/{}/{}_{}.idx", DATA_PREFIX, rowset.tablet_id(),
216+
rowset.rowset_id().to_string(), seg_id);
217+
case 1:
218+
return fmt::format("{}/{}/{}/{}/{}.idx", DATA_PREFIX, shard_fn(rowset.tablet_id()),
219+
rowset.tablet_id(), rowset.rowset_id().to_string(), seg_id);
220+
default:
221+
exit_at_unknown_path_version(fs->id(), path_version);
222+
}
223+
}
224+
194225
std::string StorageResource::remote_tablet_path(int64_t tablet_id) const {
195226
switch (path_version) {
196227
case 0:

be/src/olap/storage_policy.h

+5
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,11 @@ struct StorageResource {
7777
int64_t seg_id) const;
7878
std::string remote_segment_path(const RowsetMeta& rowset, int64_t seg_id) const;
7979
std::string remote_tablet_path(int64_t tablet_id) const;
80+
81+
std::string remote_idx_v1_path(const RowsetMeta& rowset, int64_t seg_id, int64_t index_id,
82+
std::string_view index_suffix) const;
83+
std::string remote_idx_v2_path(const RowsetMeta& rowset, int64_t seg_id) const;
84+
8085
std::string cooldown_tablet_meta_path(int64_t tablet_id, int64_t replica_id,
8186
int64_t cooldown_term) const;
8287
};

0 commit comments

Comments
 (0)