Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[cherry-pick](branch-2.1) Fix some group commit fault #40319

Merged
merged 3 commits into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public class LoadAction extends RestBaseController {

@RequestMapping(path = "/api/{" + DB_KEY + "}/{" + TABLE_KEY + "}/_load", method = RequestMethod.PUT)
public Object load(HttpServletRequest request, HttpServletResponse response,
@PathVariable(value = DB_KEY) String db, @PathVariable(value = TABLE_KEY) String table) {
@PathVariable(value = DB_KEY) String db, @PathVariable(value = TABLE_KEY) String table) {
if (needRedirect(request.getScheme())) {
return redirectToHttps(request);
}
Expand All @@ -87,20 +87,28 @@ public Object load(HttpServletRequest request, HttpServletResponse response,

@RequestMapping(path = "/api/{" + DB_KEY + "}/{" + TABLE_KEY + "}/_stream_load", method = RequestMethod.PUT)
public Object streamLoad(HttpServletRequest request,
HttpServletResponse response,
@PathVariable(value = DB_KEY) String db, @PathVariable(value = TABLE_KEY) String table) {
HttpServletResponse response,
@PathVariable(value = DB_KEY) String db, @PathVariable(value = TABLE_KEY) String table) {
boolean groupCommit = false;
String groupCommitStr = request.getHeader("group_commit");
if (groupCommitStr != null && groupCommitStr.equalsIgnoreCase("async_mode")) {
groupCommit = true;
try {
if (isGroupCommitBlock(db, table)) {
String msg = "insert table " + table + " is blocked on schema change";
return new RestBaseResult(msg);
if (groupCommitStr != null) {
if (!groupCommitStr.equalsIgnoreCase("async_mode") && !groupCommitStr.equalsIgnoreCase("sync_mode")
&& !groupCommitStr.equalsIgnoreCase("off_mode")) {
return new RestBaseResult("Header `group_commit` can only be `sync_mode`, `async_mode` or `off_mode`.");
}
if (!groupCommitStr.equalsIgnoreCase("off_mode")) {
groupCommit = true;
if (groupCommitStr.equalsIgnoreCase("async_mode")) {
try {
if (isGroupCommitBlock(db, table)) {
String msg = "insert table " + table + " is blocked on schema change";
return new RestBaseResult(msg);
}
} catch (Exception e) {
LOG.info("exception:" + e);
return new RestBaseResult(e.getMessage());
}
}
} catch (Exception e) {
LOG.info("exception:" + e);
return new RestBaseResult(e.getMessage());
}
}
if (needRedirect(request.getScheme())) {
Expand Down Expand Up @@ -131,21 +139,32 @@ public Object streamLoadWithSql(HttpServletRequest request, HttpServletResponse
boolean groupCommit = false;
long tableId = -1;
String groupCommitStr = request.getHeader("group_commit");
if (groupCommitStr != null && groupCommitStr.equalsIgnoreCase("async_mode")) {
groupCommit = true;
try {
String[] pair = parseDbAndTb(sql);
Database db = Env.getCurrentInternalCatalog()
.getDbOrException(pair[0], s -> new TException("database is invalid for dbName: " + s));
Table tbl = db.getTableOrException(pair[1], s -> new TException("table is invalid: " + s));
tableId = tbl.getId();
if (isGroupCommitBlock(pair[0], pair[1])) {
String msg = "insert table " + pair[1] + " is blocked on schema change";
return new RestBaseResult(msg);
if (groupCommitStr != null) {
if (!groupCommitStr.equalsIgnoreCase("async_mode") && !groupCommitStr.equalsIgnoreCase("sync_mode")
&& !groupCommitStr.equalsIgnoreCase("off_mode")) {
return new RestBaseResult("Header `group_commit` can only be `sync_mode`, `async_mode` or `off_mode`.");
}
if (!groupCommitStr.equalsIgnoreCase("off_mode")) {
try {
groupCommit = true;
String[] pair = parseDbAndTb(sql);
Database db = Env.getCurrentInternalCatalog()
.getDbOrException(pair[0], s -> new TException("database is invalid for dbName: " + s));
Table tbl = db.getTableOrException(pair[1], s -> new TException("table is invalid: " + s));
tableId = tbl.getId();

// async mode needs to write WAL, we need to block load during waiting WAL.
if (groupCommitStr.equalsIgnoreCase("async_mode")) {
if (isGroupCommitBlock(pair[0], pair[1])) {
String msg = "insert table " + pair[1] + " is blocked on schema change";
return new RestBaseResult(msg);
}

}
} catch (Exception e) {
LOG.info("exception:" + e);
return new RestBaseResult(e.getMessage());
}
} catch (Exception e) {
LOG.info("exception:" + e);
return new RestBaseResult(e.getMessage());
}
}
executeCheckPassword(request, response);
Expand Down Expand Up @@ -207,8 +226,8 @@ private String[] parseDbAndTb(String sql) throws Exception {

@RequestMapping(path = "/api/{" + DB_KEY + "}/_stream_load_2pc", method = RequestMethod.PUT)
public Object streamLoad2PC(HttpServletRequest request,
HttpServletResponse response,
@PathVariable(value = DB_KEY) String db) {
HttpServletResponse response,
@PathVariable(value = DB_KEY) String db) {
if (needRedirect(request.getScheme())) {
return redirectToHttps(request);
}
Expand All @@ -219,9 +238,9 @@ public Object streamLoad2PC(HttpServletRequest request,

@RequestMapping(path = "/api/{" + DB_KEY + "}/{" + TABLE_KEY + "}/_stream_load_2pc", method = RequestMethod.PUT)
public Object streamLoad2PC_table(HttpServletRequest request,
HttpServletResponse response,
@PathVariable(value = DB_KEY) String db,
@PathVariable(value = TABLE_KEY) String table) {
HttpServletResponse response,
@PathVariable(value = DB_KEY) String db,
@PathVariable(value = TABLE_KEY) String table) {
if (needRedirect(request.getScheme())) {
return redirectToHttps(request);
}
Expand Down Expand Up @@ -361,21 +380,7 @@ private TNetworkAddress selectLocalRedirectBackend(boolean groupCommit, HttpServ
throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy);
}
if (groupCommit) {
ConnectContext ctx = new ConnectContext();
ctx.setEnv(Env.getCurrentEnv());
ctx.setThreadLocalInfo();
ctx.setRemoteIP(request.getRemoteAddr());
// We set this variable to fulfill required field 'user' in
// TMasterOpRequest(FrontendService.thrift)
ctx.setQualifiedUser(Auth.ADMIN_USER);
ctx.setThreadLocalInfo();

try {
backend = Env.getCurrentEnv().getGroupCommitManager()
.selectBackendForGroupCommit(tableId, ctx, false);
} catch (DdlException e) {
throw new RuntimeException(e);
}
backend = selectBackendForGroupCommit(request, tableId);
} else {
backend = Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
}
Expand All @@ -402,7 +407,7 @@ private boolean checkClusterToken(String token) {
// temporarily addressing the users' needs for audit logs.
// So this function is not widely tested under general scenario
private Object executeWithClusterToken(HttpServletRequest request, String db,
String table, boolean isStreamLoad) {
String table, boolean isStreamLoad) {
try {
ConnectContext ctx = new ConnectContext();
ctx.setEnv(Env.getCurrentEnv());
Expand Down Expand Up @@ -473,4 +478,25 @@ private Object executeWithClusterToken(HttpServletRequest request, String db,
ConnectContext.remove();
}
}

private Backend selectBackendForGroupCommit(HttpServletRequest req, long tableId)
throws LoadException {
ConnectContext ctx = new ConnectContext();
ctx.setEnv(Env.getCurrentEnv());
ctx.setThreadLocalInfo();
ctx.setRemoteIP(req.getRemoteAddr());
// We set this variable to fulfill required field 'user' in
// TMasterOpRequest(FrontendService.thrift)
ctx.setQualifiedUser(Auth.ADMIN_USER);
ctx.setThreadLocalInfo();

Backend backend = null;
try {
backend = Env.getCurrentEnv().getGroupCommitManager()
.selectBackendForGroupCommit(tableId, ctx);
} catch (DdlException e) {
throw new LoadException(e.getMessage(), e);
}
return backend;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ private long getWalQueueSize(Backend backend, PGetWalQueueSizeRequest request) {
return size;
}

public Backend selectBackendForGroupCommit(long tableId, ConnectContext context, boolean isCloud)
public Backend selectBackendForGroupCommit(long tableId, ConnectContext context)
throws LoadException, DdlException {
// If a group commit request is sent to the follower FE, we will send this request to the master FE. master FE
// can select a BE and return this BE id to follower FE.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public GroupCommitPlanner(Database db, OlapTable table, List<String> targetColum
public PGroupCommitInsertResponse executeGroupCommitInsert(ConnectContext ctx,
List<InternalService.PDataRow> rows)
throws DdlException, RpcException, ExecutionException, InterruptedException {
backend = ctx.getInsertGroupCommit(this.table.getId());
selectBackends(ctx);
if (backend == null || !backend.isAlive() || backend.isDecommissioned()) {
List<Long> allBackendIds = Env.getCurrentSystemInfo().getAllBackendIds(true);
if (allBackendIds.isEmpty()) {
Expand Down Expand Up @@ -162,7 +162,7 @@ public PGroupCommitInsertResponse executeGroupCommitInsert(ConnectContext ctx,
.setRequest(execPlanFragmentParamsBytes)
.setCompact(false).setVersion(InternalService.PFragmentRequestVersion.VERSION_2).build())
.setLoadId(Types.PUniqueId.newBuilder().setHi(loadId.hi).setLo(loadId.lo)
.build()).addAllData(rows)
.build()).addAllData(rows)
.build();
Future<PGroupCommitInsertResponse> future = BackendServiceProxy.getInstance()
.groupCommitInsert(new TNetworkAddress(backend.getHost(), backend.getBrpcPort()), request);
Expand Down Expand Up @@ -206,7 +206,7 @@ private static void processExprVal(Expr expr, InternalService.PDataRow.Builder r
protected void selectBackends(ConnectContext ctx) throws DdlException {
try {
backend = Env.getCurrentEnv().getGroupCommitManager()
.selectBackendForGroupCommit(this.table.getId(), ctx, false);
.selectBackendForGroupCommit(this.table.getId(), ctx);
} catch (LoadException e) {
throw new DdlException("No suitable backend");
}
Expand Down
Loading