Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve raft module #1802

Merged
merged 1 commit into from
Apr 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public RaftBackendStore(BackendStore store, RaftSharedContext context) {
this.store = store;
this.context = context;
this.mutationBatch = new ThreadLocal<>();
this.isSafeRead = this.context.isSafeRead();
this.isSafeRead = this.context.safeRead();
}

public BackendStore originStore() {
Expand Down Expand Up @@ -228,7 +228,7 @@ public void run(Status status, long index, byte[] reqCtx) {
}
}
};
this.node().node().readIndex(BytesUtil.EMPTY_BYTES, readIndexClosure);
this.node().readIndex(BytesUtil.EMPTY_BYTES, readIndexClosure);
try {
return future.waitFinished();
} catch (Throwable e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ private RaftResult<T> get() {
}
}

public void complete(Status status) {
this.future.complete(new RaftResult<>(status));
}

public void complete(Status status, Supplier<T> callback) {
this.future.complete(new RaftResult<>(status, callback));
}
Expand All @@ -79,7 +83,7 @@ public void failure(Status status, Throwable exception) {
@Override
public void run(Status status) {
if (status.isOk()) {
this.complete(status, () -> null);
this.complete(status);
} else {
LOG.error("Failed to apply command: {}", status);
String msg = "Failed to apply command in raft node with error: " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,11 @@ public RaftNode(RaftSharedContext context) {
this.busyCounter = new AtomicInteger();
}

public RaftSharedContext context() {
protected RaftSharedContext context() {
return this.context;
}

public Node node() {
protected Node node() {
assert this.node != null;
return this.node;
}
Expand Down Expand Up @@ -111,62 +111,55 @@ public void snapshot() {
}
}

private Node initRaftNode() throws IOException {
NodeOptions nodeOptions = this.context.nodeOptions();
nodeOptions.setFsm(this.stateMachine);
// TODO: When support sharding, groupId needs to be bound to shard Id
String groupId = this.context.group();
PeerId endpoint = this.context.endpoint();
/*
* Start raft node with shared rpc server:
* return new RaftGroupService(groupId, endpoint, nodeOptions,
* this.context.rpcServer(), true)
* .start(false)
*/
return RaftServiceFactory.createAndInitRaftNode(groupId, endpoint,
nodeOptions);
public void readIndex(byte[] reqCtx, ReadIndexClosure done) {
this.node.readIndex(reqCtx, done);
}

private void submitCommand(StoreCommand command, RaftStoreClosure closure) {
public Object submitAndWait(StoreCommand command, RaftStoreClosure future) {
// Submit command to raft node
this.submitCommand(command, future);

try {
/*
* Here wait for the command to complete:
* 1.If on the leader, wait for the logs has been committed.
* 2.If on the follower, request command will be forwarded to the
* leader, actually it has waited in forwardToLeader().
*/
return future.waitFinished();
} catch (Throwable e) {
throw new BackendException("Failed to wait store command %s",
e, command);
}
}

private void submitCommand(StoreCommand command, RaftStoreClosure future) {
// Wait leader elected
LeaderInfo leaderInfo = this.waitLeaderElected(
RaftSharedContext.NO_TIMEOUT);
// If myself is not leader, forward to the leader
if (!leaderInfo.selfIsLeader) {
this.context.rpcForwarder().forwardToLeader(leaderInfo.leaderId,
command, closure);
command, future);
return;
}

// Sleep a while when raft node is busy
this.waitIfBusy();

Task task = new Task();
task.setDone(closure);
// compress return BytesBuffer
// Compress data, note compress() will return a BytesBuffer
ByteBuffer buffer = LZ4Util.compress(command.data(),
RaftSharedContext.BLOCK_SIZE)
.forReadWritten()
.asByteBuffer();
LOG.debug("The bytes size of command(compressed) {} is {}",
command.action(), buffer.limit());
LOG.debug("Submit to raft node '{}', the compressed bytes of command " +
"{} is {}", this.node, command.action(), buffer.limit());
task.setData(buffer);
LOG.debug("submit to raft node {}", this.node);
task.setDone(future);
this.node.apply(task);
}

public Object submitAndWait(StoreCommand command, RaftStoreClosure future) {
this.submitCommand(command, future);
try {
/*
* Here will wait future complete, actually the follower has waited
* in forwardToLeader, written like this to simplify the code
*/
return future.waitFinished();
} catch (Throwable e) {
throw new BackendException("Failed to wait store command %s",
e, command);
}
}

protected LeaderInfo waitLeaderElected(int timeout) {
String group = this.context.group();
LeaderInfo leaderInfo = this.leaderInfo.get();
Expand Down Expand Up @@ -250,6 +243,22 @@ private void waitIfBusy() {
}
}

private Node initRaftNode() throws IOException {
NodeOptions nodeOptions = this.context.nodeOptions();
nodeOptions.setFsm(this.stateMachine);
// TODO: When support sharding, groupId needs to be bound to shard Id
String groupId = this.context.group();
PeerId endpoint = this.context.endpoint();
/*
* Start raft node with shared rpc server:
* return new RaftGroupService(groupId, endpoint, nodeOptions,
* this.context.rpcServer(), true)
* .start(false)
*/
return RaftServiceFactory.createAndInitRaftNode(groupId, endpoint,
nodeOptions);
}

@Override
public String toString() {
return String.format("[%s-%s]", this.context.group(), this.nodeId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ public final class RaftResult<T> {
private final Supplier<T> callback;
private final Throwable exception;

public RaftResult(Status status) {
this(status, () -> null, null);
}

public RaftResult(Status status, Supplier<T> callback) {
this(status, callback, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public RaftSharedContext(HugeGraphParams params) {

public void initRaftNode() {
this.raftNode = new RaftNode(this);
this.rpcForwarder = new RpcForwarder(this.raftNode);
this.rpcForwarder = new RpcForwarder(this.raftNode.node());
this.raftGroupManager = new RaftGroupManagerImpl(this);
}

Expand Down Expand Up @@ -337,7 +337,7 @@ public PeerId endpoint() {
return endpoint;
}

public boolean isSafeRead() {
public boolean safeRead() {
return this.config().get(CoreOptions.RAFT_SAFE_READ);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import org.slf4j.Logger;

import com.alipay.sofa.jraft.Node;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.core.NodeImpl;
import com.alipay.sofa.jraft.entity.PeerId;
Expand All @@ -34,7 +35,6 @@
import com.alipay.sofa.jraft.util.Endpoint;
import com.baidu.hugegraph.backend.BackendException;
import com.baidu.hugegraph.backend.store.raft.RaftClosure;
import com.baidu.hugegraph.backend.store.raft.RaftNode;
import com.baidu.hugegraph.backend.store.raft.RaftStoreClosure;
import com.baidu.hugegraph.backend.store.raft.StoreCommand;
import com.baidu.hugegraph.backend.store.raft.rpc.RaftRequests.CommonResponse;
Expand All @@ -53,14 +53,14 @@ public class RpcForwarder {
private final PeerId nodeId;
private final RaftClientService rpcClient;

public RpcForwarder(RaftNode node) {
this.nodeId = node.node().getNodeId().getPeerId();
this.rpcClient = ((NodeImpl) node.node()).getRpcService();
public RpcForwarder(Node node) {
this.nodeId = node.getNodeId().getPeerId();
this.rpcClient = ((NodeImpl) node).getRpcService();
E.checkNotNull(this.rpcClient, "rpc client");
}

public void forwardToLeader(PeerId leaderId, StoreCommand command,
RaftStoreClosure closure) {
RaftStoreClosure future) {
E.checkNotNull(leaderId, "leader id");
E.checkState(!leaderId.equals(this.nodeId),
"Invalid state: current node is the leader, there is " +
Expand All @@ -80,7 +80,7 @@ public void forwardToLeader(PeerId leaderId, StoreCommand command,
public void setResponse(StoreCommandResponse response) {
if (response.getStatus()) {
LOG.debug("StoreCommandResponse status ok");
closure.complete(Status.OK(), () -> null);
future.complete(Status.OK(), () -> null);
} else {
LOG.debug("StoreCommandResponse status error");
Status status = new Status(RaftError.UNKNOWN,
Expand All @@ -90,13 +90,13 @@ public void setResponse(StoreCommandResponse response) {
"is [%s], failed to forward request " +
"to leader: %s",
leaderId, response.getMessage());
closure.failure(status, e);
future.failure(status, e);
}
}

@Override
public void run(Status status) {
closure.run(status);
future.run(status);
}
};
this.waitRpc(leaderId.getEndpoint(), request, responseClosure);
Expand All @@ -112,7 +112,7 @@ public <T extends Message> RaftClosure<T> forwardToLeader(PeerId leaderId,
this.nodeId, leaderId);

RaftClosure<T> future = new RaftClosure<>();
RpcResponseClosure<T> responseClosure = new RpcResponseClosure<T>() {
RpcResponseClosure<T> responseDone = new RpcResponseClosure<T>() {
@Override
public void setResponse(T response) {
FieldDescriptor fd = response.getDescriptorForType()
Expand Down Expand Up @@ -142,7 +142,7 @@ public void run(Status status) {
future.run(status);
}
};
this.waitRpc(leaderId.getEndpoint(), request, responseClosure);
this.waitRpc(leaderId.getEndpoint(), request, responseDone);
return future;
}

Expand All @@ -151,7 +151,8 @@ private <T extends Message> void waitRpc(Endpoint endpoint, Message request,
E.checkNotNull(endpoint, "leader endpoint");
try {
this.rpcClient.invokeWithDone(endpoint, request, done,
WAIT_RPC_TIMEOUT).get();
WAIT_RPC_TIMEOUT)
.get();
} catch (InterruptedException e) {
throw new BackendException("Invoke rpc request was interrupted, " +
"please try again later", e);
Expand Down