Skip to content

Commit 89db998

Browse files
committed
To support delte in TOSS
dummy copy from chain add edges processor reguraly update some update may compile add some simple UT adjusting UT add explain DeleteEdgesRequest add some UT
1 parent fe68b0e commit 89db998

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

44 files changed

+1874
-305
lines changed

src/clients/storage/InternalStorageClient.cpp

+41
Original file line numberDiff line numberDiff line change
@@ -131,5 +131,46 @@ cpp2::ChainAddEdgesRequest InternalStorageClient::makeChainAddReq(const cpp2::Ad
131131
return ret;
132132
}
133133

134+
void InternalStorageClient::chainDeleteEdges(cpp2::DeleteEdgesRequest& req,
135+
const std::string& txnId,
136+
TermID termId,
137+
folly::Promise<nebula::cpp2::ErrorCode>&& p,
138+
folly::EventBase* evb) {
139+
auto spaceId = req.get_space_id();
140+
auto partId = req.get_parts().begin()->first;
141+
auto optLeader = getLeader(req.get_space_id(), partId);
142+
if (!optLeader.ok()) {
143+
LOG(WARNING) << folly::sformat("failed to get leader, space {}, part {}", spaceId, partId);
144+
p.setValue(::nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND);
145+
return;
146+
}
147+
HostAddr& leader = optLeader.value();
148+
leader.port += kInternalPortOffset;
149+
VLOG(2) << "leader host: " << leader;
150+
151+
cpp2::ChainDeleteEdgesRequest chainReq;
152+
chainReq.set_space_id(req.get_space_id());
153+
chainReq.set_parts(req.get_parts());
154+
chainReq.set_txn_id(txnId);
155+
chainReq.set_term(termId);
156+
auto resp = getResponse(
157+
evb,
158+
std::make_pair(leader, chainReq),
159+
[](cpp2::InternalStorageServiceAsyncClient* client, const cpp2::ChainDeleteEdgesRequest& r) {
160+
return client->future_chainDeleteEdges(r);
161+
});
162+
163+
std::move(resp).thenTry([=, p = std::move(p)](auto&& t) mutable {
164+
auto code = getErrorCode(t);
165+
if (code == nebula::cpp2::ErrorCode::E_LEADER_CHANGED) {
166+
std::this_thread::sleep_for(std::chrono::milliseconds(500));
167+
chainDeleteEdges(req, txnId, termId, std::move(p));
168+
} else {
169+
p.setValue(code);
170+
}
171+
return;
172+
});
173+
}
174+
134175
} // namespace storage
135176
} // namespace nebula

src/clients/storage/InternalStorageClient.h

+6
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,12 @@ class InternalStorageClient : public StorageClientBase<cpp2::InternalStorageServ
4242
folly::Promise<::nebula::cpp2::ErrorCode>&& p,
4343
folly::EventBase* evb = nullptr);
4444

45+
virtual void chainDeleteEdges(cpp2::DeleteEdgesRequest& req,
46+
const std::string& txnId,
47+
TermID termId,
48+
folly::Promise<::nebula::cpp2::ErrorCode>&& p,
49+
folly::EventBase* evb = nullptr);
50+
4551
private:
4652
cpp2::ChainAddEdgesRequest makeChainAddReq(const cpp2::AddEdgesRequest& req,
4753
TermID termId,

src/interface/storage.thrift

+10-11
Original file line numberDiff line numberDiff line change
@@ -863,17 +863,6 @@ service StorageAdminService {
863863
//
864864
//////////////////////////////////////////////////////////
865865

866-
// transaction request
867-
struct InternalTxnRequest {
868-
1: i64 txn_id,
869-
2: map<common.PartitionID, i64> term_of_parts,
870-
3: optional AddEdgesRequest add_edge_req,
871-
4: optional UpdateEdgeRequest upd_edge_req,
872-
5: optional map<common.PartitionID, list<i64>>(
873-
cpp.template = "std::unordered_map") edge_ver,
874-
}
875-
876-
877866
struct ChainAddEdgesRequest {
878867
1: common.GraphSpaceID space_id,
879868
// partId => edges
@@ -900,7 +889,17 @@ struct ChainUpdateEdgeRequest {
900889
5: required list<common.PartitionID> parts,
901890
}
902891

892+
struct ChainDeleteEdgesRequest {
893+
1: common.GraphSpaceID space_id,
894+
// partId => edgeKeys
895+
2: map<common.PartitionID, list<EdgeKey>>
896+
(cpp.template = "std::unordered_map") parts,
897+
3: binary txn_id
898+
4: i64 term,
899+
}
900+
903901
service InternalStorageService {
904902
ExecResponse chainAddEdges(1: ChainAddEdgesRequest req);
905903
UpdateResponse chainUpdateEdge(1: ChainUpdateEdgeRequest req);
904+
ExecResponse chainDeleteEdges(1: ChainDeleteEdgesRequest req);
906905
}

src/kvstore/raftex/RaftPart.cpp

-1
Original file line numberDiff line numberDiff line change
@@ -1956,7 +1956,6 @@ void RaftPart::checkRemoteListeners(const std::set<HostAddr>& expected) {
19561956
}
19571957
}
19581958
}
1959-
19601959
bool RaftPart::leaseValid() {
19611960
std::lock_guard<std::mutex> g(raftLock_);
19621961
if (hosts_.empty()) {

src/mock/MockData.cpp

+8-4
Original file line numberDiff line numberDiff line change
@@ -743,7 +743,7 @@ std::vector<VertexID> MockData::mockPlayerVerticeIds() {
743743
return ret;
744744
}
745745

746-
std::vector<EdgeData> MockData::mockEdges(bool upper) {
746+
std::vector<EdgeData> MockData::mockEdges(bool upper, bool hasInEdges) {
747747
std::vector<EdgeData> ret;
748748
// Use serve data, positive edgeType is 101, reverse edgeType is -101
749749
for (auto& serve : serves_) {
@@ -787,7 +787,9 @@ std::vector<EdgeData> MockData::mockEdges(bool upper) {
787787
positiveEdge.props_ = std::move(props);
788788
auto reverseData = getReverseEdge(positiveEdge);
789789
ret.emplace_back(std::move(positiveEdge));
790-
ret.emplace_back(std::move(reverseData));
790+
if (hasInEdges) {
791+
ret.emplace_back(std::move(reverseData));
792+
}
791793
}
792794
return ret;
793795
}
@@ -946,11 +948,13 @@ nebula::storage::cpp2::DeleteVerticesRequest MockData::mockDeleteVerticesReq(int
946948
return req;
947949
}
948950

949-
nebula::storage::cpp2::AddEdgesRequest MockData::mockAddEdgesReq(bool upper, int32_t parts) {
951+
nebula::storage::cpp2::AddEdgesRequest MockData::mockAddEdgesReq(bool upper,
952+
int32_t parts,
953+
bool hasInEdges) {
950954
nebula::storage::cpp2::AddEdgesRequest req;
951955
req.set_space_id(1);
952956
req.set_if_not_exists(true);
953-
auto retRecs = mockEdges(upper);
957+
auto retRecs = mockEdges(upper, hasInEdges);
954958
for (auto& rec : retRecs) {
955959
nebula::storage::cpp2::NewEdge newEdge;
956960
nebula::storage::cpp2::EdgeKey edgeKey;

src/mock/MockData.h

+4-2
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,8 @@ class MockData {
115115
static std::vector<std::pair<PartitionID, std::string>> mockPlayerIndexKeys(bool upper = false);
116116

117117
// generate serve edge
118-
static std::vector<EdgeData> mockEdges(bool upper = false);
118+
// param: includeInEdges, if the return set has both out and in edges
119+
static std::vector<EdgeData> mockEdges(bool upper = false, bool includeInEdges = true);
119120

120121
static std::vector<std::pair<PartitionID, std::string>> mockServeIndexKeys();
121122

@@ -169,7 +170,8 @@ class MockData {
169170
int32_t parts = 6);
170171

171172
static nebula::storage::cpp2::AddEdgesRequest mockAddEdgesReq(bool upper = false,
172-
int32_t parts = 6);
173+
int32_t parts = 6,
174+
bool hasInEdges = true);
173175

174176
static nebula::storage::cpp2::DeleteVerticesRequest mockDeleteVerticesReq(int32_t parts = 6);
175177

src/storage/CMakeLists.txt

+5
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,11 @@ nebula_add_library(
8383
transaction/ResumeUpdateProcessor.cpp
8484
transaction/ResumeUpdateRemoteProcessor.cpp
8585
transaction/ChainProcessorFactory.cpp
86+
transaction/ChainDeleteEdgesGroupProcessor.cpp
87+
transaction/ChainDeleteEdgesLocalProcessor.cpp
88+
transaction/ChainDeleteEdgesRemoteProcessor.cpp
89+
transaction/ChainDeleteEdgesResumeProcessor.cpp
90+
transaction/ChainDeleteEdgesResumeRemoteProcessor.cpp
8691
)
8792

8893
nebula_add_library(

src/storage/mutate/DeleteEdgesProcessor.cpp

+20-1
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,21 @@ void DeleteEdgesProcessor::process(const cpp2::DeleteEdgesRequest& req) {
7575
handleAsync(spaceId_, partId, code);
7676
continue;
7777
}
78-
doRemove(spaceId_, partId, std::move(keys));
78+
79+
HookFuncPara para;
80+
if (tossHookFunc_) {
81+
para.keys.emplace(&keys);
82+
(*tossHookFunc_)(para);
83+
}
84+
if (para.result) {
85+
env_->kvstore_->asyncAppendBatch(
86+
spaceId_,
87+
partId,
88+
std::move(para.result.value()),
89+
[partId, this](nebula::cpp2::ErrorCode code) { handleAsync(spaceId_, partId, code); });
90+
} else {
91+
doRemove(spaceId_, partId, std::move(keys));
92+
}
7993
}
8094
} else {
8195
for (auto& part : partEdges) {
@@ -194,6 +208,11 @@ ErrorOr<nebula::cpp2::ErrorCode, std::string> DeleteEdgesProcessor::deleteEdges(
194208
}
195209
}
196210

211+
if (tossHookFunc_) {
212+
HookFuncPara para;
213+
para.batch.emplace(batchHolder.get());
214+
(*tossHookFunc_)(para);
215+
}
197216
return encodeBatchValue(batchHolder->getBatch());
198217
}
199218

src/storage/mutate/DeleteEdgesProcessor.h

+9
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#include "common/base/Base.h"
1010
#include "kvstore/LogEncoder.h"
1111
#include "storage/BaseProcessor.h"
12+
#include "storage/transaction/ConsistTypes.h"
1213

1314
namespace nebula {
1415
namespace storage {
@@ -24,6 +25,9 @@ class DeleteEdgesProcessor : public BaseProcessor<cpp2::ExecResponse> {
2425

2526
void process(const cpp2::DeleteEdgesRequest& req);
2627

28+
using HookFunction = std::function<void(HookFuncPara&)>;
29+
void setHookFunc(HookFunction func) { tossHookFunc_ = func; }
30+
2731
private:
2832
explicit DeleteEdgesProcessor(StorageEnv* env, const ProcessorCounters* counters)
2933
: BaseProcessor<cpp2::ExecResponse>(env, counters) {}
@@ -34,6 +38,11 @@ class DeleteEdgesProcessor : public BaseProcessor<cpp2::ExecResponse> {
3438
private:
3539
GraphSpaceID spaceId_;
3640
std::vector<std::shared_ptr<nebula::meta::cpp2::IndexItem>> indexes_;
41+
42+
protected:
43+
// TOSS use this hook function to append some delete operation
44+
// or may append some put operation
45+
std::optional<HookFunction> tossHookFunc_;
3746
};
3847

3948
} // namespace storage

src/storage/test/CMakeLists.txt

+15
Original file line numberDiff line numberDiff line change
@@ -751,6 +751,21 @@ nebula_add_executable(
751751
gtest
752752
)
753753

754+
nebula_add_test(
755+
NAME
756+
chain_delete_edge_test
757+
SOURCES
758+
ChainDeleteEdgesTest.cpp
759+
OBJECTS
760+
${storage_test_deps}
761+
LIBRARIES
762+
${ROCKSDB_LIBRARIES}
763+
${THRIFT_LIBRARIES}
764+
${PROXYGEN_LIBRARIES}
765+
wangle
766+
gtest
767+
)
768+
754769
nebula_add_executable(
755770
NAME
756771
storage_index_write_bm

src/storage/test/ChainAddEdgesTest.cpp

+6-6
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ TEST(ChainAddEdgesTest, TestUtilsTest) {
3434
mock::MockCluster cluster;
3535
cluster.initStorageKV(rootPath.path());
3636
auto* env = cluster.storageEnv_.get();
37-
auto mClient = MetaClientTestUpdater::makeDefaultMetaClient();
37+
auto mClient = MetaClientTestUpdater::makeDefault();
3838
env->metaClient_ = mClient.get();
3939
MetaClientTestUpdater::addPartTerm(env->metaClient_, mockSpaceId, mockPartNum, fackTerm);
4040

@@ -63,7 +63,7 @@ TEST(ChainAddEdgesTest, prepareLocalSucceedTest) {
6363
mock::MockCluster cluster;
6464
cluster.initStorageKV(rootPath.path());
6565
auto* env = cluster.storageEnv_.get();
66-
auto mClient = MetaClientTestUpdater::makeDefaultMetaClient();
66+
auto mClient = MetaClientTestUpdater::makeDefault();
6767
env->metaClient_ = mClient.get();
6868
MetaClientTestUpdater::addPartTerm(env->metaClient_, mockSpaceId, mockPartNum, fackTerm);
6969
auto* proc = new FakeChainAddEdgesProcessorLocal(env);
@@ -91,7 +91,7 @@ TEST(ChainAddEdgesTest, processRemoteSucceededTest) {
9191
mock::MockCluster cluster;
9292
cluster.initStorageKV(rootPath.path());
9393
auto* env = cluster.storageEnv_.get();
94-
auto mClient = MetaClientTestUpdater::makeDefaultMetaClient();
94+
auto mClient = MetaClientTestUpdater::makeDefault();
9595

9696
env->metaClient_ = mClient.get();
9797
auto* proc = new FakeChainAddEdgesProcessorLocal(env);
@@ -122,7 +122,7 @@ TEST(ChainAddEdgesTest, processRemoteFailedTest) {
122122
mock::MockCluster cluster;
123123
cluster.initStorageKV(rootPath.path());
124124
auto* env = cluster.storageEnv_.get();
125-
auto mClient = MetaClientTestUpdater::makeDefaultMetaClient();
125+
auto mClient = MetaClientTestUpdater::makeDefault();
126126
env->metaClient_ = mClient.get();
127127
MetaClientTestUpdater::addPartTerm(env->metaClient_, mockSpaceId, mockPartNum, fackTerm);
128128

@@ -151,7 +151,7 @@ TEST(ChainAddEdgesTest, processRemoteUnknownTest) {
151151
mock::MockCluster cluster;
152152
cluster.initStorageKV(rootPath.path());
153153
auto* env = cluster.storageEnv_.get();
154-
auto mClient = MetaClientTestUpdater::makeDefaultMetaClient();
154+
auto mClient = MetaClientTestUpdater::makeDefault();
155155
env->metaClient_ = mClient.get();
156156
MetaClientTestUpdater::addPartTerm(env->metaClient_, mockSpaceId, mockPartNum, fackTerm);
157157

@@ -182,7 +182,7 @@ TEST(ChainAddEdgesTest, processRemoteTest) {
182182
mock::MockCluster cluster;
183183
cluster.initStorageKV(rootPath.path());
184184
auto* env = cluster.storageEnv_.get();
185-
auto mClient = MetaClientTestUpdater::makeDefaultMetaClient();
185+
auto mClient = MetaClientTestUpdater::makeDefault();
186186

187187
env->metaClient_ = mClient.get();
188188
MetaClientTestUpdater::addPartTerm(env->metaClient_, mockSpaceId, mockPartNum, fackTerm);

0 commit comments

Comments
 (0)