Skip to content

Commit 8d0cd03

Browse files
Yukang-Liandataroaring
authored andcommitted
[Fix](group commit) Fix cloud group commit be select strategy (#39986)
## Proposed changes In #35558, we optimized be select for group commit. However, we forgot to apply this strategy to cloud. This PR applys it. <!--Describe your changes.-->
1 parent 0acf277 commit 8d0cd03

File tree

3 files changed

+92
-58
lines changed

3 files changed

+92
-58
lines changed

fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java

+88-52
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ public class LoadAction extends RestBaseController {
8585

8686
@RequestMapping(path = "/api/{" + DB_KEY + "}/{" + TABLE_KEY + "}/_load", method = RequestMethod.PUT)
8787
public Object load(HttpServletRequest request, HttpServletResponse response,
88-
@PathVariable(value = DB_KEY) String db, @PathVariable(value = TABLE_KEY) String table) {
88+
@PathVariable(value = DB_KEY) String db, @PathVariable(value = TABLE_KEY) String table) {
8989
if (needRedirect(request.getScheme())) {
9090
return redirectToHttps(request);
9191
}
@@ -102,21 +102,29 @@ public Object load(HttpServletRequest request, HttpServletResponse response,
102102

103103
@RequestMapping(path = "/api/{" + DB_KEY + "}/{" + TABLE_KEY + "}/_stream_load", method = RequestMethod.PUT)
104104
public Object streamLoad(HttpServletRequest request,
105-
HttpServletResponse response,
106-
@PathVariable(value = DB_KEY) String db, @PathVariable(value = TABLE_KEY) String table) {
107-
LOG.info("streamload action, db: {}, tbl: {}, headers: {}", db, table, getAllHeaders(request));
105+
HttpServletResponse response,
106+
@PathVariable(value = DB_KEY) String db, @PathVariable(value = TABLE_KEY) String table) {
107+
LOG.info("streamload action, db: {}, tbl: {}, headers: {}", db, table, getAllHeaders(request));
108108
boolean groupCommit = false;
109109
String groupCommitStr = request.getHeader("group_commit");
110-
if (groupCommitStr != null && groupCommitStr.equalsIgnoreCase("async_mode")) {
111-
groupCommit = true;
112-
try {
113-
if (isGroupCommitBlock(db, table)) {
114-
String msg = "insert table " + table + GroupCommitPlanner.SCHEMA_CHANGE;
115-
return new RestBaseResult(msg);
110+
if (groupCommitStr != null) {
111+
if (!groupCommitStr.equalsIgnoreCase("async_mode") && !groupCommitStr.equalsIgnoreCase("sync_mode")
112+
&& !groupCommitStr.equalsIgnoreCase("off_mode")) {
113+
return new RestBaseResult("Header `group_commit` can only be `sync_mode`, `async_mode` or `off_mode`.");
114+
}
115+
if (!groupCommitStr.equalsIgnoreCase("off_mode")) {
116+
groupCommit = true;
117+
if (groupCommitStr.equalsIgnoreCase("async_mode")) {
118+
try {
119+
if (isGroupCommitBlock(db, table)) {
120+
String msg = "insert table " + table + GroupCommitPlanner.SCHEMA_CHANGE;
121+
return new RestBaseResult(msg);
122+
}
123+
} catch (Exception e) {
124+
LOG.info("exception:" + e);
125+
return new RestBaseResult(e.getMessage());
126+
}
116127
}
117-
} catch (Exception e) {
118-
LOG.info("exception:" + e);
119-
return new RestBaseResult(e.getMessage());
120128
}
121129
}
122130
if (needRedirect(request.getScheme())) {
@@ -147,21 +155,32 @@ public Object streamLoadWithSql(HttpServletRequest request, HttpServletResponse
147155
boolean groupCommit = false;
148156
long tableId = -1;
149157
String groupCommitStr = request.getHeader("group_commit");
150-
if (groupCommitStr != null && groupCommitStr.equalsIgnoreCase("async_mode")) {
151-
groupCommit = true;
152-
try {
153-
String[] pair = parseDbAndTb(sql);
154-
Database db = Env.getCurrentInternalCatalog()
155-
.getDbOrException(pair[0], s -> new TException("database is invalid for dbName: " + s));
156-
Table tbl = db.getTableOrException(pair[1], s -> new TException("table is invalid: " + s));
157-
tableId = tbl.getId();
158-
if (isGroupCommitBlock(pair[0], pair[1])) {
159-
String msg = "insert table " + pair[1] + GroupCommitPlanner.SCHEMA_CHANGE;
160-
return new RestBaseResult(msg);
158+
if (groupCommitStr != null) {
159+
if (!groupCommitStr.equalsIgnoreCase("async_mode") && !groupCommitStr.equalsIgnoreCase("sync_mode")
160+
&& !groupCommitStr.equalsIgnoreCase("off_mode")) {
161+
return new RestBaseResult("Header `group_commit` can only be `sync_mode`, `async_mode` or `off_mode`.");
162+
}
163+
if (!groupCommitStr.equalsIgnoreCase("off_mode")) {
164+
try {
165+
groupCommit = true;
166+
String[] pair = parseDbAndTb(sql);
167+
Database db = Env.getCurrentInternalCatalog()
168+
.getDbOrException(pair[0], s -> new TException("database is invalid for dbName: " + s));
169+
Table tbl = db.getTableOrException(pair[1], s -> new TException("table is invalid: " + s));
170+
tableId = tbl.getId();
171+
172+
// async mode needs to write WAL, we need to block load during waiting WAL.
173+
if (groupCommitStr.equalsIgnoreCase("async_mode")) {
174+
if (isGroupCommitBlock(pair[0], pair[1])) {
175+
String msg = "insert table " + pair[1] + GroupCommitPlanner.SCHEMA_CHANGE;
176+
return new RestBaseResult(msg);
177+
}
178+
179+
}
180+
} catch (Exception e) {
181+
LOG.info("exception:" + e);
182+
return new RestBaseResult(e.getMessage());
161183
}
162-
} catch (Exception e) {
163-
LOG.info("exception:" + e);
164-
return new RestBaseResult(e.getMessage());
165184
}
166185
}
167186
executeCheckPassword(request, response);
@@ -223,8 +242,8 @@ private String[] parseDbAndTb(String sql) throws Exception {
223242

224243
@RequestMapping(path = "/api/{" + DB_KEY + "}/_stream_load_2pc", method = RequestMethod.PUT)
225244
public Object streamLoad2PC(HttpServletRequest request,
226-
HttpServletResponse response,
227-
@PathVariable(value = DB_KEY) String db) {
245+
HttpServletResponse response,
246+
@PathVariable(value = DB_KEY) String db) {
228247
LOG.info("streamload action 2PC, db: {}, headers: {}", db, getAllHeaders(request));
229248
if (needRedirect(request.getScheme())) {
230249
return redirectToHttps(request);
@@ -236,9 +255,9 @@ public Object streamLoad2PC(HttpServletRequest request,
236255

237256
@RequestMapping(path = "/api/{" + DB_KEY + "}/{" + TABLE_KEY + "}/_stream_load_2pc", method = RequestMethod.PUT)
238257
public Object streamLoad2PC_table(HttpServletRequest request,
239-
HttpServletResponse response,
240-
@PathVariable(value = DB_KEY) String db,
241-
@PathVariable(value = TABLE_KEY) String table) {
258+
HttpServletResponse response,
259+
@PathVariable(value = DB_KEY) String db,
260+
@PathVariable(value = TABLE_KEY) String table) {
242261
LOG.info("streamload action 2PC, db: {}, tbl: {}, headers: {}", db, table, getAllHeaders(request));
243262
if (needRedirect(request.getScheme())) {
244263
return redirectToHttps(request);
@@ -382,7 +401,7 @@ private TNetworkAddress selectRedirectBackend(HttpServletRequest request, boolea
382401
if (Strings.isNullOrEmpty(cloudClusterName)) {
383402
throw new LoadException("No cloud cluster name selected.");
384403
}
385-
return selectCloudRedirectBackend(cloudClusterName, request, groupCommit);
404+
return selectCloudRedirectBackend(cloudClusterName, request, groupCommit, tableId);
386405
} else {
387406
return selectLocalRedirectBackend(groupCommit, request, tableId);
388407
}
@@ -409,21 +428,7 @@ private TNetworkAddress selectLocalRedirectBackend(boolean groupCommit, HttpServ
409428
throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy);
410429
}
411430
if (groupCommit) {
412-
ConnectContext ctx = new ConnectContext();
413-
ctx.setEnv(Env.getCurrentEnv());
414-
ctx.setThreadLocalInfo();
415-
ctx.setRemoteIP(request.getRemoteAddr());
416-
// We set this variable to fulfill required field 'user' in
417-
// TMasterOpRequest(FrontendService.thrift)
418-
ctx.setQualifiedUser(Auth.ADMIN_USER);
419-
ctx.setThreadLocalInfo();
420-
421-
try {
422-
backend = Env.getCurrentEnv().getGroupCommitManager()
423-
.selectBackendForGroupCommit(tableId, ctx, false);
424-
} catch (DdlException e) {
425-
throw new RuntimeException(e);
426-
}
431+
backend = selectBackendForGroupCommit("", request, tableId, false);
427432
} else {
428433
backend = Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
429434
}
@@ -433,17 +438,23 @@ private TNetworkAddress selectLocalRedirectBackend(boolean groupCommit, HttpServ
433438
return new TNetworkAddress(backend.getHost(), backend.getHttpPort());
434439
}
435440

436-
private TNetworkAddress selectCloudRedirectBackend(String clusterName, HttpServletRequest req, boolean groupCommit)
441+
private TNetworkAddress selectCloudRedirectBackend(String clusterName, HttpServletRequest req, boolean groupCommit,
442+
long tableId)
437443
throws LoadException {
438-
Backend backend = StreamLoadHandler.selectBackend(clusterName, groupCommit);
444+
Backend backend = null;
445+
if (groupCommit) {
446+
backend = selectBackendForGroupCommit(clusterName, req, tableId, true);
447+
} else {
448+
backend = StreamLoadHandler.selectBackend(clusterName);
449+
}
439450

440451
String redirectPolicy = req.getHeader(LoadAction.HEADER_REDIRECT_POLICY);
441452
// User specified redirect policy
442453
if (redirectPolicy != null && redirectPolicy.equalsIgnoreCase(REDIRECT_POLICY_RANDOM_BE)) {
443454
return new TNetworkAddress(backend.getHost(), backend.getHttpPort());
444455
}
445456
redirectPolicy = redirectPolicy == null || redirectPolicy.isEmpty()
446-
? Config.streamload_redirect_policy : redirectPolicy;
457+
? Config.streamload_redirect_policy : redirectPolicy;
447458

448459
Pair<String, Integer> publicHostPort = null;
449460
Pair<String, Integer> privateHostPort = null;
@@ -563,7 +574,7 @@ private boolean checkClusterToken(String token) {
563574
// temporarily addressing the users' needs for audit logs.
564575
// So this function is not widely tested under general scenario
565576
private Object executeWithClusterToken(HttpServletRequest request, String db,
566-
String table, boolean isStreamLoad) {
577+
String table, boolean isStreamLoad) {
567578
try {
568579
ConnectContext ctx = new ConnectContext();
569580
ctx.setEnv(Env.getCurrentEnv());
@@ -647,4 +658,29 @@ private String getAllHeaders(HttpServletRequest request) {
647658
}
648659
return headers.toString();
649660
}
661+
662+
private Backend selectBackendForGroupCommit(String clusterName, HttpServletRequest req, long tableId,
663+
boolean isCloud)
664+
throws LoadException {
665+
ConnectContext ctx = new ConnectContext();
666+
ctx.setEnv(Env.getCurrentEnv());
667+
ctx.setThreadLocalInfo();
668+
ctx.setRemoteIP(req.getRemoteAddr());
669+
// We set this variable to fulfill required field 'user' in
670+
// TMasterOpRequest(FrontendService.thrift)
671+
ctx.setQualifiedUser(Auth.ADMIN_USER);
672+
ctx.setThreadLocalInfo();
673+
if (isCloud) {
674+
ctx.setCloudCluster(clusterName);
675+
}
676+
677+
Backend backend = null;
678+
try {
679+
backend = Env.getCurrentEnv().getGroupCommitManager()
680+
.selectBackendForGroupCommit(tableId, ctx, isCloud);
681+
} catch (DdlException e) {
682+
throw new LoadException(e.getMessage(), e);
683+
}
684+
return backend;
685+
}
650686
}

fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadHandler.java

+3-5
Original file line numberDiff line numberDiff line change
@@ -84,13 +84,12 @@ public StreamLoadHandler(TStreamLoadPutRequest request, AtomicInteger indexId,
8484
* Select a random backend in the given cloud cluster.
8585
*
8686
* @param clusterName cloud cluster name
87-
* @param groupCommit if this selection is for group commit
8887
* @throws LoadException if there is no available backend
8988
*/
90-
public static Backend selectBackend(String clusterName, boolean groupCommit) throws LoadException {
89+
public static Backend selectBackend(String clusterName) throws LoadException {
9190
List<Backend> backends = ((CloudSystemInfoService) Env.getCurrentSystemInfo())
9291
.getBackendsByClusterName(clusterName)
93-
.stream().filter(be -> be.isAlive() && (!groupCommit || groupCommit && !be.isDecommissioned()))
92+
.stream().filter(Backend::isAlive)
9493
.collect(Collectors.toList());
9594

9695
if (backends.isEmpty()) {
@@ -101,8 +100,7 @@ public static Backend selectBackend(String clusterName, boolean groupCommit) thr
101100
// TODO: add a more sophisticated algorithm to select backend
102101
SecureRandom rand = new SecureRandom();
103102
int randomIndex = rand.nextInt(backends.size());
104-
Backend backend = backends.get(randomIndex);
105-
return backend;
103+
return backends.get(randomIndex);
106104
}
107105

108106
public void setCloudCluster() throws UserException {

fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -447,7 +447,7 @@ public HttpPut generateRequestForMySqlLoad(
447447
private String selectBackendForMySqlLoad(String database, String table) throws LoadException {
448448
Backend backend = null;
449449
if (Config.isCloudMode()) {
450-
backend = StreamLoadHandler.selectBackend(ConnectContext.get().getCloudCluster(), false);
450+
backend = StreamLoadHandler.selectBackend(ConnectContext.get().getCloudCluster());
451451
} else {
452452
BeSelectionPolicy policy = new BeSelectionPolicy.Builder().needLoadAvailable().build();
453453
List<Long> backendIds = Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1);

0 commit comments

Comments
 (0)