Skip to content

Commit 18c6290

Browse files
committed
[chore](table) Add batch method to get visible version of the olap table
Since get visible version is a heavy operation in the cloud mode, this PR add a batch method, to obtain all visible versions via only one RPC.
1 parent 191371d commit 18c6290

File tree

4 files changed

+132
-50
lines changed

4 files changed

+132
-50
lines changed

cloud/src/meta-service/meta_service.cpp

+7-4
Original file line numberDiff line numberDiff line change
@@ -315,19 +315,22 @@ void MetaServiceImpl::batch_get_version(::google::protobuf::RpcController* contr
315315
return;
316316
}
317317

318-
size_t num_acquired = request->partition_ids_size();
318+
size_t num_acquired =
319+
is_table_version ? request->table_ids_size() : request->partition_ids_size();
319320
response->mutable_versions()->Reserve(num_acquired);
320321
response->mutable_db_ids()->CopyFrom(request->db_ids());
321322
response->mutable_table_ids()->CopyFrom(request->table_ids());
322-
response->mutable_partition_ids()->CopyFrom(request->partition_ids());
323+
if (!is_table_version) {
324+
response->mutable_partition_ids()->CopyFrom(request->partition_ids());
325+
}
323326

324327
constexpr size_t BATCH_SIZE = 500;
325328
std::vector<std::string> version_keys;
326329
std::vector<std::optional<std::string>> version_values;
327330
version_keys.reserve(BATCH_SIZE);
328331
version_values.reserve(BATCH_SIZE);
329332
while ((code == MetaServiceCode::OK || code == MetaServiceCode::KV_TXN_TOO_OLD) &&
330-
response->versions_size() < response->partition_ids_size()) {
333+
response->versions_size() < num_acquired) {
331334
std::unique_ptr<Transaction> txn;
332335
TxnErrorCode err = txn_kv_->create_txn(&txn);
333336
if (err != TxnErrorCode::TXN_OK) {
@@ -343,11 +346,11 @@ void MetaServiceImpl::batch_get_version(::google::protobuf::RpcController* contr
343346
for (size_t j = i; j < limit; j++) {
344347
int64_t db_id = request->db_ids(j);
345348
int64_t table_id = request->table_ids(j);
346-
int64_t partition_id = request->partition_ids(j);
347349
std::string ver_key;
348350
if (is_table_version) {
349351
table_version_key({instance_id, db_id, table_id}, &ver_key);
350352
} else {
353+
int64_t partition_id = request->partition_ids(j);
351354
partition_version_key({instance_id, db_id, table_id, partition_id}, &ver_key);
352355
}
353356
version_keys.push_back(std::move(ver_key));

fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java

+87-29
Original file line numberDiff line numberDiff line change
@@ -47,17 +47,16 @@
4747
import org.apache.doris.common.UserException;
4848
import org.apache.doris.common.io.DeepCopy;
4949
import org.apache.doris.common.io.Text;
50-
import org.apache.doris.common.profile.SummaryProfile;
5150
import org.apache.doris.common.util.PropertyAnalyzer;
5251
import org.apache.doris.common.util.Util;
52+
import org.apache.doris.datasource.InternalCatalog;
5353
import org.apache.doris.mtmv.MTMVRelatedTableIf;
5454
import org.apache.doris.mtmv.MTMVSnapshotIf;
5555
import org.apache.doris.mtmv.MTMVVersionSnapshot;
5656
import org.apache.doris.persist.gson.GsonPostProcessable;
5757
import org.apache.doris.persist.gson.GsonUtils;
5858
import org.apache.doris.qe.ConnectContext;
5959
import org.apache.doris.qe.OriginStatement;
60-
import org.apache.doris.qe.StmtExecutor;
6160
import org.apache.doris.resource.Tag;
6261
import org.apache.doris.rpc.RpcException;
6362
import org.apache.doris.statistics.AnalysisInfo;
@@ -2225,7 +2224,6 @@ public int getBaseSchemaVersion() {
22252224
return baseIndexMeta.getSchemaVersion();
22262225
}
22272226

2228-
22292227
public void setEnableSingleReplicaCompaction(boolean enableSingleReplicaCompaction) {
22302228
if (tableProperty == null) {
22312229
tableProperty = new TableProperty(new HashMap<>());
@@ -2849,6 +2847,7 @@ public long getVisibleVersion() {
28492847
if (Config.isNotCloudMode()) {
28502848
return tableAttributes.getVisibleVersion();
28512849
}
2850+
28522851
// get version rpc
28532852
Cloud.GetVersionRequest request = Cloud.GetVersionRequest.newBuilder()
28542853
.setDbId(this.getDatabase().getId())
@@ -2858,7 +2857,7 @@ public long getVisibleVersion() {
28582857
.build();
28592858

28602859
try {
2861-
Cloud.GetVersionResponse resp = getVersionFromMeta(request);
2860+
Cloud.GetVersionResponse resp = VersionHelper.getVersionFromMeta(request);
28622861
long version = -1;
28632862
if (resp.getStatus().getCode() == Cloud.MetaServiceCode.OK) {
28642863
version = resp.getVersion();
@@ -2874,7 +2873,90 @@ public long getVisibleVersion() {
28742873
}
28752874
return version;
28762875
} catch (RpcException e) {
2877-
throw new RuntimeException("get version from meta service failed");
2876+
throw new RuntimeException("get version from meta service failed", e);
2877+
}
2878+
}
2879+
2880+
// Get the table versions in batch.
2881+
public static List<Long> getVisibleVersionByTableIds(Collection<Long> tableIds) {
2882+
List<OlapTable> tables = new ArrayList<>();
2883+
2884+
InternalCatalog catalog = Env.getCurrentEnv().getInternalCatalog();
2885+
for (long tableId : tableIds) {
2886+
Table table = catalog.getTableByTableId(tableId);
2887+
if (table == null) {
2888+
throw new RuntimeException("get table visible version failed, no such table " + tableId + " exists");
2889+
}
2890+
if (table.getType() != TableType.OLAP) {
2891+
throw new RuntimeException(
2892+
"get table visible version failed, table " + tableId + " is not a OLAP table");
2893+
}
2894+
tables.add((OlapTable) table);
2895+
}
2896+
2897+
return getVisibleVersionInBatch(tables);
2898+
}
2899+
2900+
// Get the table versions in batch.
2901+
public static List<Long> getVisibleVersionInBatch(Collection<OlapTable> tables) {
2902+
if (tables.isEmpty()) {
2903+
return new ArrayList<>();
2904+
}
2905+
2906+
if (Config.isNotCloudMode()) {
2907+
return tables.stream()
2908+
.map(table -> table.tableAttributes.getVisibleVersion())
2909+
.collect(Collectors.toList());
2910+
}
2911+
2912+
List<Long> dbIds = new ArrayList<>();
2913+
List<Long> tableIds = new ArrayList<>();
2914+
for (OlapTable table : tables) {
2915+
dbIds.add(table.getDatabase().getId());
2916+
tableIds.add(table.getId());
2917+
}
2918+
2919+
return getVisibleVersionFromMeta(dbIds, tableIds);
2920+
}
2921+
2922+
private static List<Long> getVisibleVersionFromMeta(List<Long> dbIds, List<Long> tableIds) {
2923+
// get version rpc
2924+
Cloud.GetVersionRequest request = Cloud.GetVersionRequest.newBuilder()
2925+
.setDbId(-1)
2926+
.setTableId(-1)
2927+
.setPartitionId(-1)
2928+
.addAllDbIds(dbIds)
2929+
.addAllTableIds(tableIds)
2930+
.setBatchMode(true)
2931+
.setIsTableVersion(true)
2932+
.build();
2933+
2934+
try {
2935+
Cloud.GetVersionResponse resp = VersionHelper.getVersionFromMeta(request);
2936+
if (resp.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
2937+
throw new RpcException("get table visible version", "unexpected status " + resp.getStatus());
2938+
}
2939+
2940+
List<Long> versions = resp.getVersionsList();
2941+
if (versions.size() != tableIds.size()) {
2942+
throw new RpcException("get table visible version",
2943+
"wrong number of versions, required " + tableIds.size() + ", but got " + versions.size());
2944+
}
2945+
2946+
if (LOG.isDebugEnabled()) {
2947+
LOG.debug("get table version from meta service, tables: {}, versions: {}", tableIds, versions);
2948+
}
2949+
2950+
for (int i = 0; i < versions.size(); i++) {
2951+
// Set visible version to 1 if no such table version exists.
2952+
if (versions.get(i) <= 0L) {
2953+
versions.set(i, 1L);
2954+
}
2955+
}
2956+
2957+
return versions;
2958+
} catch (RpcException e) {
2959+
throw new RuntimeException("get table version from meta service failed", e);
28782960
}
28792961
}
28802962

@@ -2921,19 +3003,6 @@ public MTMVSnapshotIf getTableSnapshot() {
29213003
return new MTMVVersionSnapshot(visibleVersion);
29223004
}
29233005

2924-
private static Cloud.GetVersionResponse getVersionFromMeta(Cloud.GetVersionRequest req)
2925-
throws RpcException {
2926-
long startAt = System.nanoTime();
2927-
try {
2928-
return VersionHelper.getVisibleVersion(req);
2929-
} finally {
2930-
SummaryProfile profile = getSummaryProfile();
2931-
if (profile != null) {
2932-
profile.addGetTableVersionTime(System.nanoTime() - startAt);
2933-
}
2934-
}
2935-
}
2936-
29373006
@Override
29383007
public boolean needAutoRefresh() {
29393008
return true;
@@ -2944,17 +3013,6 @@ public boolean isPartitionColumnAllowNull() {
29443013
return true;
29453014
}
29463015

2947-
private static SummaryProfile getSummaryProfile() {
2948-
ConnectContext ctx = ConnectContext.get();
2949-
if (ctx != null) {
2950-
StmtExecutor executor = ctx.getExecutor();
2951-
if (executor != null) {
2952-
return executor.getSummaryProfile();
2953-
}
2954-
}
2955-
return null;
2956-
}
2957-
29583016
public void setStatistics(Statistics statistics) {
29593017
this.statistics = statistics;
29603018
}

fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudPartition.java

+2-15
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ public long getVisibleVersion() {
125125
.build();
126126

127127
try {
128-
Cloud.GetVersionResponse resp = getVersionFromMeta(request);
128+
Cloud.GetVersionResponse resp = VersionHelper.getVersionFromMeta(request);
129129
long version = -1;
130130
if (resp.getStatus().getCode() == MetaServiceCode.OK) {
131131
version = resp.getVersion();
@@ -238,7 +238,7 @@ public static List<Long> getSnapshotVisibleVersion(List<Long> dbIds, List<Long>
238238
if (LOG.isDebugEnabled()) {
239239
LOG.debug("getVisibleVersion use CloudPartition {}", partitionIds.toString());
240240
}
241-
Cloud.GetVersionResponse resp = getVersionFromMeta(req);
241+
Cloud.GetVersionResponse resp = VersionHelper.getVersionFromMeta(req);
242242
if (resp.getStatus().getCode() != MetaServiceCode.OK) {
243243
throw new RpcException("get visible version", "unexpected status " + resp.getStatus());
244244
}
@@ -339,19 +339,6 @@ public boolean hasData() {
339339
return getVisibleVersion() > Partition.PARTITION_INIT_VERSION;
340340
}
341341

342-
private static Cloud.GetVersionResponse getVersionFromMeta(Cloud.GetVersionRequest req)
343-
throws RpcException {
344-
long startAt = System.nanoTime();
345-
try {
346-
return VersionHelper.getVisibleVersion(req);
347-
} finally {
348-
SummaryProfile profile = getSummaryProfile();
349-
if (profile != null) {
350-
profile.addGetPartitionVersionTime(System.nanoTime() - startAt);
351-
}
352-
}
353-
}
354-
355342
private static boolean isEmptyPartitionPruneDisabled() {
356343
ConnectContext ctx = ConnectContext.get();
357344
if (ctx != null && (ctx.getSessionVariable().getDisableNereidsRules().get(RuleType.valueOf(

fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/VersionHelper.java

+36-2
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@
1919

2020
import org.apache.doris.cloud.proto.Cloud;
2121
import org.apache.doris.common.Config;
22+
import org.apache.doris.common.profile.SummaryProfile;
23+
import org.apache.doris.qe.ConnectContext;
24+
import org.apache.doris.qe.StmtExecutor;
2225
import org.apache.doris.rpc.RpcException;
2326

2427
import org.apache.logging.log4j.LogManager;
@@ -32,6 +35,26 @@
3235
public class VersionHelper {
3336
private static final Logger LOG = LogManager.getLogger(VersionHelper.class);
3437

38+
// Call get_version() from meta service, and save the elapsed to summary profile.
39+
public static Cloud.GetVersionResponse getVersionFromMeta(Cloud.GetVersionRequest req)
40+
throws RpcException {
41+
long startAt = System.nanoTime();
42+
boolean isTableVersion = req.getIsTableVersion();
43+
try {
44+
return getVisibleVersion(req);
45+
} finally {
46+
SummaryProfile profile = getSummaryProfile();
47+
if (profile != null) {
48+
long elapsed = System.nanoTime() - startAt;
49+
if (isTableVersion) {
50+
profile.addGetTableVersionTime(elapsed);
51+
} else {
52+
profile.addGetPartitionVersionTime(elapsed);
53+
}
54+
}
55+
}
56+
}
57+
3558
public static Cloud.GetVersionResponse getVisibleVersion(Cloud.GetVersionRequest request) throws RpcException {
3659
int tryTimes = 0;
3760
while (tryTimes++ < Config.metaServiceRpcRetryTimes()) {
@@ -65,8 +88,7 @@ public static Cloud.GetVersionResponse getVisibleVersionInternal(Cloud.GetVersio
6588
long deadline = System.currentTimeMillis() + timeoutMs;
6689
Cloud.GetVersionResponse resp = null;
6790
try {
68-
Future<Cloud.GetVersionResponse> future =
69-
MetaServiceProxy.getInstance().getVisibleVersionAsync(request);
91+
Future<Cloud.GetVersionResponse> future = MetaServiceProxy.getInstance().getVisibleVersionAsync(request);
7092

7193
while (resp == null) {
7294
try {
@@ -89,4 +111,16 @@ private static void sleepSeveralMs(int lowerMs, int upperMs) {
89111
LOG.warn("get snapshot from meta service: sleep get interrupted exception");
90112
}
91113
}
114+
115+
private static SummaryProfile getSummaryProfile() {
116+
ConnectContext ctx = ConnectContext.get();
117+
if (ctx != null) {
118+
StmtExecutor executor = ctx.getExecutor();
119+
if (executor != null) {
120+
return executor.getSummaryProfile();
121+
}
122+
}
123+
return null;
124+
}
125+
92126
}

0 commit comments

Comments
 (0)