From 0b66a37f56eebad41a73a8cfa9a4ef4c18ceabba Mon Sep 17 00:00:00 2001
From: "lionel.liu@vesoft.com" <52276794+liuyu85cn@users.noreply.github.com>
Date: Fri, 22 Apr 2022 17:56:09 +0800
Subject: [PATCH] Revert "Memory lock in raft (#3926)"

This reverts commit 4112c7d760554a3979b379564697fb8bdc176328.
---
 src/kvstore/CMakeLists.txt                  |   1 +
 src/kvstore/Common.h                        |  10 -
 src/kvstore/KVStore.h                       |   2 +-
 src/kvstore/NebulaStore.cpp                 |   2 +-
 src/kvstore/NebulaStore.h                   |   2 +-
 src/kvstore/Part.cpp                        |   2 +-
 src/kvstore/Part.h                          |   2 +-
 src/kvstore/raftex/CMakeLists.txt           |   1 -
 src/kvstore/raftex/RaftPart.cpp             | 459 ++++++++------------
 src/kvstore/raftex/RaftPart.h               | 131 +++++-
 src/kvstore/raftex/test/LogCASTest.cpp      | 104 +----
 src/kvstore/raftex/test/LogCommandTest.cpp  |  62 +--
 src/kvstore/raftex/test/RaftexTestBase.cpp  |   6 -
 src/kvstore/raftex/test/TestShard.cpp       |   4 -
 src/kvstore/test/NebulaStoreTest.cpp        |  29 +-
 src/storage/mutate/AddEdgesProcessor.cpp    | 379 ++++++++++------
 src/storage/mutate/AddEdgesProcessor.h      |   5 +-
 src/storage/mutate/AddVerticesProcessor.cpp | 248 +++++------
 src/storage/mutate/AddVerticesProcessor.h   |   5 -
 src/storage/test/IndexTestUtil.h            |  13 +-
 src/storage/test/RebuildIndexTest.cpp       |  61 +--
 src/storage/test/StatsTaskTest.cpp          |   3 +-
 22 files changed, 737 insertions(+), 794 deletions(-)

diff --git a/src/kvstore/CMakeLists.txt b/src/kvstore/CMakeLists.txt
index 748eb8c42f4..549be980087 100644
--- a/src/kvstore/CMakeLists.txt
+++ b/src/kvstore/CMakeLists.txt
@@ -6,6 +6,7 @@ nebula_add_library(
     PartManager.cpp
     NebulaStore.cpp
     RocksEngineConfig.cpp
+    LogEncoder.cpp
    NebulaSnapshotManager.cpp
    RateLimiter.cpp
    plugins/elasticsearch/ESListener.cpp

diff --git a/src/kvstore/Common.h b/src/kvstore/Common.h
index 0ebeefb5e2c..c97e6c878a7 100644
--- a/src/kvstore/Common.h
+++ b/src/kvstore/Common.h
@@ -270,16 +270,6 @@ inline rocksdb::Slice toSlice(const folly::StringPiece& str) {
 using KVMap = std::unordered_map<std::string, std::string>;
 using KVArrayIterator = std::vector<KV>::const_iterator;

-class MergeableAtomicOpResult {
- public:
-  nebula::cpp2::ErrorCode code;
-  std::string batch;  // batched result, like before.
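-  // Keys the op read and keys it intends to write. The raft layer compares
-  // these across queued atomic ops to spot read-after-write conflicts when
-  // deciding whether several ops may share one batch. A successful CAS-style
-  // op would fill this result in roughly as follows (a sketch only; the key
-  // "k1" and the batch holder are illustrative):
-  //   ret.readSet.emplace_back("k1");   // consulted k1's current value
-  //   ret.writeSet.emplace_back("k1");  // the batch overwrites k1
-  //   ret.batch = encodeBatchValue(batchHolder->getBatch());
-  //   ret.code = nebula::cpp2::ErrorCode::SUCCEEDED;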
- std::list readSet; - std::list writeSet; -}; - -using MergeableAtomicOp = folly::Function; - } // namespace kvstore } // namespace nebula diff --git a/src/kvstore/KVStore.h b/src/kvstore/KVStore.h index c1d42074009..edf9c1fbe4c 100644 --- a/src/kvstore/KVStore.h +++ b/src/kvstore/KVStore.h @@ -312,7 +312,7 @@ class KVStore { */ virtual void asyncAtomicOp(GraphSpaceID spaceId, PartitionID partId, - MergeableAtomicOp op, + raftex::AtomicOp op, KVCallback cb) = 0; /** diff --git a/src/kvstore/NebulaStore.cpp b/src/kvstore/NebulaStore.cpp index 1e7e7180478..04e6a911428 100644 --- a/src/kvstore/NebulaStore.cpp +++ b/src/kvstore/NebulaStore.cpp @@ -918,7 +918,7 @@ void NebulaStore::asyncRemoveRange(GraphSpaceID spaceId, void NebulaStore::asyncAtomicOp(GraphSpaceID spaceId, PartitionID partId, - MergeableAtomicOp op, + raftex::AtomicOp op, KVCallback cb) { auto ret = part(spaceId, partId); if (!ok(ret)) { diff --git a/src/kvstore/NebulaStore.h b/src/kvstore/NebulaStore.h index 7a5bff80f1e..89c12d424cb 100644 --- a/src/kvstore/NebulaStore.h +++ b/src/kvstore/NebulaStore.h @@ -415,7 +415,7 @@ class NebulaStore : public KVStore, public Handler { */ void asyncAtomicOp(GraphSpaceID spaceId, PartitionID partId, - MergeableAtomicOp op, + raftex::AtomicOp op, KVCallback cb) override; /** diff --git a/src/kvstore/Part.cpp b/src/kvstore/Part.cpp index 04d9145297f..dbf765aa8cd 100644 --- a/src/kvstore/Part.cpp +++ b/src/kvstore/Part.cpp @@ -115,7 +115,7 @@ void Part::sync(KVCallback cb) { [callback = std::move(cb)](nebula::cpp2::ErrorCode code) mutable { callback(code); }); } -void Part::asyncAtomicOp(MergeableAtomicOp op, KVCallback cb) { +void Part::asyncAtomicOp(raftex::AtomicOp op, KVCallback cb) { atomicOpAsync(std::move(op)) .thenValue( [callback = std::move(cb)](nebula::cpp2::ErrorCode code) mutable { callback(code); }); diff --git a/src/kvstore/Part.h b/src/kvstore/Part.h index 7db8586dfea..d19138ce189 100644 --- a/src/kvstore/Part.h +++ b/src/kvstore/Part.h @@ -124,7 +124,7 @@ class Part : public raftex::RaftPart { * @param op Atomic operation * @param cb Callback when has a result */ - void asyncAtomicOp(MergeableAtomicOp op, KVCallback cb); + void asyncAtomicOp(raftex::AtomicOp op, KVCallback cb); /** * @brief Add a raft learner asynchronously by adding raft log diff --git a/src/kvstore/raftex/CMakeLists.txt b/src/kvstore/raftex/CMakeLists.txt index 07e1a3afc54..6e2910c869c 100644 --- a/src/kvstore/raftex/CMakeLists.txt +++ b/src/kvstore/raftex/CMakeLists.txt @@ -5,7 +5,6 @@ nebula_add_library( RaftexService.cpp Host.cpp SnapshotManager.cpp - ../LogEncoder.cpp ) nebula_add_subdirectory(test) diff --git a/src/kvstore/raftex/RaftPart.cpp b/src/kvstore/raftex/RaftPart.cpp index ecc0bc60265..e2605971c0e 100644 --- a/src/kvstore/raftex/RaftPart.cpp +++ b/src/kvstore/raftex/RaftPart.cpp @@ -50,81 +50,92 @@ using nebula::wal::FileBasedWalPolicy; using OpProcessor = folly::Function(AtomicOp op)>; -/** - * @brief code to describle if a log can be merged with others - * NO_MERGE: can't merge with any other - * MERGE_NEXT: can't previous logs, can merge with next. (has to be head) - * MERGE_PREV: can merge with previous, can't merge any more. (has to be tail) - * MERGE_BOTH: can merge with any other - * - * Normal / heartbeat will always be MERGE_BOTH - * Command will alwayse be MERGE_PREV - * ATOMIC_OP can be either MERGE_NEXT or MERGE_BOTH - * depends on if it read a key in write set. 
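- * A worked example of how a queue of logs is cut into batches (keys are
- * illustrative only):
- *   log1 NORMAL    put(k1)         -> MERGE_BOTH, k1 joins the lock set
- *   log2 ATOMIC_OP reads/writes k2 -> MERGE_BOTH, no overlap with k1
- *   log3 ATOMIC_OP reads k1        -> MERGE_NEXT, read-after-write on k1,
- *                                     so it must head the next batch
- *   log4 COMMAND                   -> MERGE_PREV, closes whichever batch
- *                                     it joins
- *   log5 failed ATOMIC_OP          -> NO_MERGE, dropped from the queue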
- * no log type will judge as NO_MERGE - */ - -enum class MergeAbleCode { - NO_MERGE = 0, - MERGE_NEXT = 1, - MERGE_PREV = 2, - MERGE_BOTH = 3, -}; - -/** - * @brief this is an Iterator deal with memory lock. - */ class AppendLogsIterator final : public LogIterator { public: - AppendLogsIterator(LogID firstLogId, TermID termId, RaftPart::LogCache logs) - : firstLogId_(firstLogId), termId_(termId), logId_(firstLogId), logs_(std::move(logs)) {} + AppendLogsIterator(LogID firstLogId, TermID termId, RaftPart::LogCache logs, OpProcessor opCB) + : firstLogId_(firstLogId), + termId_(termId), + logId_(firstLogId), + logs_(std::move(logs)), + opCB_(std::move(opCB)) { + leadByAtomicOp_ = processAtomicOp(); + valid_ = idx_ < logs_.size(); + hasNonAtomicOpLogs_ = !leadByAtomicOp_ && valid_; + if (valid_) { + currLogType_ = lastLogType_ = logType(); + } + } + AppendLogsIterator(const AppendLogsIterator&) = delete; AppendLogsIterator(AppendLogsIterator&&) = default; AppendLogsIterator& operator=(const AppendLogsIterator&) = delete; AppendLogsIterator& operator=(AppendLogsIterator&&) = default; - ~AppendLogsIterator() { - if (!logs_.empty()) { - size_t notFulfilledPromise = 0; - for (auto& log : logs_) { - auto& promiseRef = std::get<4>(log); - if (!promiseRef.isFulfilled()) { - ++notFulfilledPromise; - } - } - if (notFulfilledPromise > 0) { - LOG(FATAL) << "notFulfilledPromise == " << notFulfilledPromise; - } - } + bool leadByAtomicOp() const { + return leadByAtomicOp_; + } + + bool hasNonAtomicOpLogs() const { + return hasNonAtomicOpLogs_; } - void commit(nebula::cpp2::ErrorCode code = nebula::cpp2::ErrorCode::SUCCEEDED) { - for (auto it = logs_.begin(); it != logs_.end(); ++it) { - auto& promiseRef = std::get<4>(*it); - if (!promiseRef.isFulfilled()) { - DCHECK(!promiseRef.isFulfilled()); - promiseRef.setValue(code); + LogID firstLogId() const { + return firstLogId_; + } + + LogID lastLogId() const { + return firstLogId_ + logs_.size() - 1; + } + + // Return true if the current log is a AtomicOp, otherwise return false + bool processAtomicOp() { + while (idx_ < logs_.size()) { + auto& tup = logs_.at(idx_); + auto logType = std::get<1>(tup); + if (logType != LogType::ATOMIC_OP) { + // Not a AtomicOp + return false; + } + + // Process AtomicOp log + CHECK(!!opCB_); + opResult_ = opCB_(std::move(std::get<3>(tup))); + if (opResult_.has_value()) { + // AtomicOp Succeeded + return true; + } else { + // AtomicOp failed, move to the next log, but do not increment the + // logId_ + ++idx_; } } + + // Reached the end + return false; } LogIterator& operator++() override { ++idx_; ++logId_; + if (idx_ < logs_.size()) { + currLogType_ = logType(); + valid_ = currLogType_ != LogType::ATOMIC_OP; + if (valid_) { + hasNonAtomicOpLogs_ = true; + } + valid_ = valid_ && lastLogType_ != LogType::COMMAND; + lastLogType_ = currLogType_; + } else { + valid_ = false; + } return *this; } + // The iterator becomes invalid when exhausting the logs + // **OR** running into a AtomicOp log bool valid() const override { - return idx_ < logs_.size(); - } - - bool empty() const { - return logs_.empty(); - } - - LogID firstLogId() const { - return firstLogId_; + return valid_; } LogID logId() const override { @@ -142,7 +153,31 @@ class AppendLogsIterator final : public LogIterator { } folly::StringPiece logMsg() const override { - return std::get<2>(logs_.at(idx_)); + DCHECK(valid()); + if (currLogType_ == LogType::ATOMIC_OP) { + CHECK(opResult_.has_value()); + return opResult_.value(); + } else { + return std::get<2>(logs_.at(idx_)); 
+ } + } + + // Return true when there is no more log left for processing + bool empty() const { + return idx_ >= logs_.size(); + } + + // Resume the iterator so that we can continue to process the remaining logs + void resume() { + CHECK(!valid_); + if (!empty()) { + leadByAtomicOp_ = processAtomicOp(); + valid_ = idx_ < logs_.size(); + hasNonAtomicOpLogs_ = !leadByAtomicOp_ && valid_; + if (valid_) { + currLogType_ = lastLogType_ = logType(); + } + } } LogType logType() const { @@ -151,176 +186,17 @@ class AppendLogsIterator final : public LogIterator { private: size_t idx_{0}; + bool leadByAtomicOp_{false}; + bool hasNonAtomicOpLogs_{false}; + bool valid_{true}; + LogType lastLogType_{LogType::NORMAL}; + LogType currLogType_{LogType::NORMAL}; + std::optional opResult_; LogID firstLogId_; TermID termId_; LogID logId_; RaftPart::LogCache logs_; -}; - -class AppendLogsIteratorFactory { - public: - AppendLogsIteratorFactory() = default; - static void make(RaftPart::LogCache& cacheLogs, RaftPart::LogCache& sendLogs) { - DCHECK(sendLogs.empty()); - std::unordered_set memLock; - std::list> ranges; - for (auto& log : cacheLogs) { - auto code = mergeAble(log, memLock, ranges); - if (code == MergeAbleCode::MERGE_BOTH) { - sendLogs.emplace_back(); - std::swap(cacheLogs.front(), sendLogs.back()); - cacheLogs.pop_front(); - continue; - } else if (code == MergeAbleCode::MERGE_PREV) { - sendLogs.emplace_back(); - std::swap(cacheLogs.front(), sendLogs.back()); - cacheLogs.pop_front(); - break; - } else if (code == MergeAbleCode::NO_MERGE) { - // if we meet some failed atomicOp, we can just skip it. - cacheLogs.pop_front(); - continue; - } else { // MERGE_NEXT - break; - } - } - } - - /** - * @brief check if a incoming log can be merged with previous logs - * - * @param logWrapper - */ - static MergeAbleCode mergeAble(RaftPart::LogCacheItem& logWrapper, - std::unordered_set& memLock, - std::list>& ranges) { - // log type: - switch (std::get<1>(logWrapper)) { - case LogType::NORMAL: { - std::vector updateSet; - auto& log = std::get<2>(logWrapper); - if (log.empty()) { - return MergeAbleCode::MERGE_BOTH; - } - decode(log, updateSet, ranges); - for (auto& key : updateSet) { - memLock.insert(key.str()); - } - return MergeAbleCode::MERGE_BOTH; - } - case LogType::COMMAND: { - return MergeAbleCode::MERGE_PREV; - } - case LogType::ATOMIC_OP: { - auto& atomOp = std::get<3>(logWrapper); - auto [code, result, read, write] = atomOp(); - if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { - DLOG(INFO) << "===> OOPs, atomOp failed!!!, code = " - << apache::thrift::util::enumNameSafe(code); - auto& promiseRef = std::get<4>(logWrapper); - if (!promiseRef.isFulfilled()) { - DCHECK(!promiseRef.isFulfilled()); - promiseRef.setValue(code); - } - return MergeAbleCode::NO_MERGE; - } - std::get<2>(logWrapper) = std::move(result); - /** - * @brief We accept same read/write key in a one log, - * but reject if same in different logs. - */ - for (auto& key : read) { - auto cit = memLock.find(key); - // read after write is not acceptable. - if (cit != memLock.end()) { - return MergeAbleCode::MERGE_NEXT; - } - - // if we try to read a key, in any range - for (auto& it : ranges) { - auto* begin = it.first.c_str(); - auto* end = it.second.c_str(); - auto* pKey = key.c_str(); - if ((std::strcmp(begin, pKey) <= 0) && (std::strcmp(pKey, end) <= 0)) { - return MergeAbleCode::MERGE_NEXT; - } - } - } - - for (auto& key : write) { - // it doesn't matter if insert failed. 
(if write conflict, last write win) - memLock.insert(key); - } - return MergeAbleCode::MERGE_BOTH; - } - default: - LOG(ERROR) << "should not get here"; - } - return MergeAbleCode::NO_MERGE; - } - - static void decode(const std::string& log, - std::vector& updateSet, - std::list>& ranges) { - switch (log[sizeof(int64_t)]) { - case nebula::kvstore::OP_PUT: { - auto pieces = nebula::kvstore::decodeMultiValues(log); - updateSet.push_back(pieces[0]); - break; - } - case nebula::kvstore::OP_MULTI_PUT: { - auto kvs = nebula::kvstore::decodeMultiValues(log); - // Make the number of values are an even number - DCHECK_EQ((kvs.size() + 1) / 2, kvs.size() / 2); - for (size_t i = 0; i < kvs.size(); i += 2) { - updateSet.push_back(kvs[i]); - } - break; - } - case nebula::kvstore::OP_REMOVE: { - auto key = nebula::kvstore::decodeSingleValue(log); - updateSet.push_back(key); - break; - } - case nebula::kvstore::OP_MULTI_REMOVE: { - auto keys = nebula::kvstore::decodeMultiValues(log); - for (auto k : keys) { - updateSet.push_back(k); - } - break; - } - case nebula::kvstore::OP_REMOVE_RANGE: { - auto range = nebula::kvstore::decodeMultiValues(log); - auto item = std::make_pair(range[0].str(), range[1].str()); - ranges.emplace_back(std::move(item)); - break; - } - case nebula::kvstore::OP_BATCH_WRITE: { - auto data = nebula::kvstore::decodeBatchValue(log); - for (auto& op : data) { - if (op.first == nebula::kvstore::BatchLogType::OP_BATCH_PUT) { - updateSet.push_back(op.second.first); - } else if (op.first == nebula::kvstore::BatchLogType::OP_BATCH_REMOVE) { - updateSet.push_back(op.second.first); - } else if (op.first == nebula::kvstore::BatchLogType::OP_BATCH_REMOVE_RANGE) { - auto begin = op.second.first; - auto end = op.second.second; - ranges.emplace_back(std::make_pair(begin, end)); - } - } - break; - } - case nebula::kvstore::OP_ADD_PEER: - case nebula::kvstore::OP_ADD_LEARNER: - case nebula::kvstore::OP_TRANS_LEADER: - case nebula::kvstore::OP_REMOVE_PEER: { - break; - } - default: { - VLOG(3) << "Unknown operation: " << static_cast(log[0]); - } - } - } + OpProcessor opCB_; }; /******************************************************** @@ -371,6 +247,7 @@ RaftPart::RaftPart( return this->preProcessLog(logId, logTermId, logClusterId, log); }, diskMan); + logs_.reserve(FLAGS_max_batch_size); CHECK(!!executor_) << idStr_ << "Should not be nullptr"; } @@ -746,7 +623,7 @@ folly::Future RaftPart::appendAsync(ClusterID source, s return appendLogAsync(source, LogType::NORMAL, std::move(log)); } -folly::Future RaftPart::atomicOpAsync(kvstore::MergeableAtomicOp op) { +folly::Future RaftPart::atomicOpAsync(AtomicOp op) { return appendLogAsync(clusterId_, LogType::ATOMIC_OP, "", std::move(op)); } @@ -757,7 +634,7 @@ folly::Future RaftPart::sendCommandAsync(std::string lo folly::Future RaftPart::appendLogAsync(ClusterID source, LogType logType, std::string log, - kvstore::MergeableAtomicOp op) { + AtomicOp op) { if (blocking_) { // No need to block heartbeats and empty log. if ((logType == LogType::NORMAL && !log.empty()) || logType == LogType::ATOMIC_OP) { @@ -771,7 +648,7 @@ folly::Future RaftPart::appendLogAsync(ClusterID source if (bufferOverFlow_) { VLOG_EVERY_N(2, 1000) << idStr_ << "The appendLog buffer is full. Please slow down the log appending rate." 
- << "replicatingLogs_ :" << std::boolalpha << replicatingLogs_; + << "replicatingLogs_ :" << replicatingLogs_; return nebula::cpp2::ErrorCode::E_RAFT_BUFFER_OVERFLOW; } { @@ -782,7 +659,7 @@ folly::Future RaftPart::appendLogAsync(ClusterID source if (logs_.size() >= FLAGS_max_batch_size) { // Buffer is full VLOG(2) << idStr_ << "The appendLog buffer is full. Please slow down the log appending rate." - << "replicatingLogs_ :" << std::boolalpha << replicatingLogs_; + << "replicatingLogs_ :" << replicatingLogs_; bufferOverFlow_ = true; return nebula::cpp2::ErrorCode::E_RAFT_BUFFER_OVERFLOW; } @@ -791,14 +668,26 @@ folly::Future RaftPart::appendLogAsync(ClusterID source // Append new logs to the buffer DCHECK_GE(source, 0); - folly::Promise promise; - retFuture = promise.getFuture(); - logs_.emplace_back(source, logType, std::move(log), std::move(op), std::move(promise)); + logs_.emplace_back(source, logType, std::move(log), std::move(op)); + switch (logType) { + case LogType::ATOMIC_OP: + retFuture = cachingPromise_.getSingleFuture(); + break; + case LogType::COMMAND: + retFuture = cachingPromise_.getAndRollSharedFuture(); + break; + case LogType::NORMAL: + retFuture = cachingPromise_.getSharedFuture(); + break; + } bool expected = false; if (replicatingLogs_.compare_exchange_strong(expected, true)) { // We need to send logs to all followers VLOG(4) << idStr_ << "Preparing to send AppendLog request"; + sendingPromise_ = std::move(cachingPromise_); + cachingPromise_.reset(); + std::swap(swappedOutLogs, logs_); bufferOverFlow_ = false; } else { VLOG(4) << idStr_ << "Another AppendLogs request is ongoing, just return"; @@ -821,22 +710,26 @@ folly::Future RaftPart::appendLogAsync(ClusterID source if (!checkAppendLogResult(res)) { // Mosy likely failed because the partition is not leader VLOG_EVERY_N(2, 1000) << idStr_ << "Cannot append logs, clean the buffer"; - return nebula::cpp2::ErrorCode::E_LEADER_CHANGED; + return res; } // Replicate buffered logs to all followers // Replication will happen on a separate thread and will block // until majority accept the logs, the leadership changes, or // the partition stops - { - std::lock_guard lck(logsLock_); - AppendLogsIteratorFactory::make(logs_, sendingLogs_); - bufferOverFlow_ = false; - if (sendingLogs_.empty()) { - replicatingLogs_ = false; - return retFuture; - } - } - AppendLogsIterator it(firstId, termId, std::move(sendingLogs_)); + VLOG(4) << idStr_ << "Calling appendLogsInternal()"; + AppendLogsIterator it( + firstId, + termId, + std::move(swappedOutLogs), + [this](AtomicOp opCB) -> std::optional { + CHECK(opCB != nullptr); + auto opRet = opCB(); + if (!opRet.has_value()) { + // Failed + sendingPromise_.setOneSingleValue(nebula::cpp2::ErrorCode::E_RAFT_ATOMIC_OP_FAILED); + } + return opRet; + }); appendLogsInternal(std::move(it), termId); return retFuture; @@ -848,6 +741,14 @@ void RaftPart::appendLogsInternal(AppendLogsIterator iter, TermID termId) { TermID prevLogTerm = 0; LogID committed = 0; LogID lastId = 0; + if (iter.valid()) { + VLOG(4) << idStr_ << "Ready to append logs from id " << iter.logId() << " (Current term is " + << currTerm << ")"; + } else { + VLOG(4) << idStr_ << "Only happened when Atomic op failed"; + replicatingLogs_ = false; + return; + } nebula::cpp2::ErrorCode res = nebula::cpp2::ErrorCode::SUCCEEDED; do { std::lock_guard g(raftLock_); @@ -877,7 +778,6 @@ void RaftPart::appendLogsInternal(AppendLogsIterator iter, TermID termId) { } while (false); if (!checkAppendLogResult(res)) { - 
iter.commit(nebula::cpp2::ErrorCode::E_LEADER_CHANGED); return; } // Step 2: Replicate to followers @@ -910,7 +810,6 @@ void RaftPart::replicateLogs(folly::EventBase* eb, if (!checkAppendLogResult(res)) { VLOG(3) << idStr_ << "replicateLogs failed because of not leader or term changed"; - iter.commit(nebula::cpp2::ErrorCode::E_LEADER_CHANGED); return; } @@ -1008,7 +907,6 @@ void RaftPart::processAppendLogResponses(const AppendLogResponses& resps, } } if (!checkAppendLogResult(res)) { - iter.commit(nebula::cpp2::ErrorCode::E_LEADER_CHANGED); return; } @@ -1016,6 +914,7 @@ void RaftPart::processAppendLogResponses(const AppendLogResponses& resps, // Majority have succeeded VLOG(4) << idStr_ << numSucceeded << " hosts have accepted the logs"; + LogID firstLogId = 0; do { std::lock_guard g(raftLock_); res = canAppendLogs(currTerm); @@ -1032,7 +931,6 @@ void RaftPart::processAppendLogResponses(const AppendLogResponses& resps, VLOG(3) << idStr_ << "processAppendLogResponses failed because of not leader " "or term changed"; - iter.commit(nebula::cpp2::ErrorCode::E_LEADER_CHANGED); return; } @@ -1053,6 +951,7 @@ void RaftPart::processAppendLogResponses(const AppendLogResponses& resps, CHECK_EQ(lastLogId, lastCommitId); committedLogId_ = lastCommitId; committedLogTerm_ = lastCommitTerm; + firstLogId = lastLogId_ + 1; lastMsgAcceptedCostMs_ = lastMsgSentDur_.elapsedInMSec(); lastMsgAcceptedTime_ = time::WallClock::fastNowInMilliSec(); if (!commitInThisTerm_) { @@ -1067,32 +966,56 @@ void RaftPart::processAppendLogResponses(const AppendLogResponses& resps, << lastLogId; } - // at this monment, we have confidence logs should be succeeded replicated - LogID firstId = 0; + // Step 4: Fulfill the promise + if (iter.hasNonAtomicOpLogs()) { + sendingPromise_.setOneSharedValue(nebula::cpp2::ErrorCode::SUCCEEDED); + } + if (iter.leadByAtomicOp()) { + sendingPromise_.setOneSingleValue(nebula::cpp2::ErrorCode::SUCCEEDED); + } + // Step 5: Check whether need to continue + // the log replication { std::lock_guard lck(logsLock_); CHECK(replicatingLogs_); - iter.commit(); - if (logs_.empty()) { - // no incoming during log replication - replicatingLogs_ = false; - VLOG(4) << idStr_ << "No more log to be replicated"; - return; - } else { - // we have some new coming logs during replication - // need to send them also - AppendLogsIteratorFactory::make(logs_, sendingLogs_); - bufferOverFlow_ = false; - if (sendingLogs_.empty()) { + // Continue to process the original AppendLogsIterator if necessary + iter.resume(); + // If no more valid logs to be replicated in iter, create a new one if we + // have new log + if (iter.empty()) { + VLOG(4) << idStr_ << "logs size " << logs_.size(); + if (logs_.size() > 0) { + // continue to replicate the logs + sendingPromise_ = std::move(cachingPromise_); + cachingPromise_.reset(); + iter = AppendLogsIterator(firstLogId, + currTerm, + std::move(logs_), + [this](AtomicOp op) -> std::optional { + auto opRet = op(); + if (!opRet.has_value()) { + // Failed + sendingPromise_.setOneSingleValue( + nebula::cpp2::ErrorCode::E_RAFT_ATOMIC_OP_FAILED); + } + return opRet; + }); + logs_.clear(); + bufferOverFlow_ = false; + } + // Reset replicatingLogs_ one of the following is true: + // 1. old iter is empty && logs_.size() == 0 + // 2. 
old iter is empty && logs_.size() > 0, but all logs in new iter is + // atomic op, + // and all of them failed, which would make iter is empty again + if (iter.empty()) { replicatingLogs_ = false; + VLOG(4) << idStr_ << "No more log to be replicated"; return; } - firstId = lastLogId_ + 1; } } - AppendLogsIterator it(firstId, currTerm, std::move(sendingLogs_)); - this->appendLogsInternal(std::move(it), currTerm); - return; + this->appendLogsInternal(std::move(iter), currTerm); } else { // Not enough hosts accepted the log, re-try VLOG_EVERY_N(2, 1000) << idStr_ << "Only " << numSucceeded @@ -2106,19 +2029,11 @@ bool RaftPart::checkAppendLogResult(nebula::cpp2::ErrorCode res) { if (res != nebula::cpp2::ErrorCode::SUCCEEDED) { { std::lock_guard lck(logsLock_); - auto setPromiseForLogs = [&](LogCache& logs) { - for (auto& log : logs) { - auto& promiseRef = std::get<4>(log); - if (!promiseRef.isFulfilled()) { - promiseRef.setValue(res); - } - } - }; - setPromiseForLogs(logs_); - setPromiseForLogs(sendingLogs_); logs_.clear(); - sendingLogs_.clear(); + cachingPromise_.setValue(res); + cachingPromise_.reset(); bufferOverFlow_ = false; + sendingPromise_.setValue(res); replicatingLogs_ = false; } return false; diff --git a/src/kvstore/raftex/RaftPart.h b/src/kvstore/raftex/RaftPart.h index 83b097e454b..96be8fb58a6 100644 --- a/src/kvstore/raftex/RaftPart.h +++ b/src/kvstore/raftex/RaftPart.h @@ -65,10 +65,10 @@ class AppendLogsIterator; * should be applied atomically. You could implement CAS, READ-MODIFY-WRITE * operations though it. * */ -using AtomicOp = folly::Function(void)>; +using AtomicOp = folly::Function(void)>; + class RaftPart : public std::enable_shared_from_this { friend class AppendLogsIterator; - friend class AppendLogsIteratorFactory; friend class Host; friend class SnapshotManager; FRIEND_TEST(MemberChangeTest, AddRemovePeerTest); @@ -273,7 +273,7 @@ class RaftPart : public std::enable_shared_from_this { * @param op Atomic operation, will output a log if succeed * @return folly::Future */ - folly::Future atomicOpAsync(kvstore::MergeableAtomicOp op); + folly::Future atomicOpAsync(AtomicOp op); /** * @brief Send a log of COMMAND type @@ -565,13 +565,8 @@ class RaftPart : public std::enable_shared_from_this { using AppendLogResponses = std::vector>; using HeartbeatResponses = std::vector>; - // - using LogCacheItem = std::tuple>; - using LogCache = std::deque; + // + using LogCache = std::vector>; /**************************************************** * @@ -719,7 +714,7 @@ class RaftPart : public std::enable_shared_from_this { folly::Future appendLogAsync(ClusterID source, LogType logType, std::string log, - kvstore::MergeableAtomicOp cb = nullptr); + AtomicOp cb = nullptr); /** * @brief Append the logs in iterator @@ -792,6 +787,116 @@ class RaftPart : public std::enable_shared_from_this { void updateQuorum(); protected: + template + class PromiseSet final { + public: + PromiseSet() = default; + PromiseSet(const PromiseSet&) = delete; + PromiseSet(PromiseSet&&) = default; + + ~PromiseSet() = default; + + PromiseSet& operator=(const PromiseSet&) = delete; + PromiseSet& operator=(PromiseSet&& right) = default; + + /** + * @brief Clean all promises + */ + void reset() { + sharedPromises_.clear(); + singlePromises_.clear(); + rollSharedPromise_ = true; + } + + /** + * @brief Used for NORMAL raft log + * + * @return folly::Future + */ + folly::Future getSharedFuture() { + if (rollSharedPromise_) { + sharedPromises_.emplace_back(); + rollSharedPromise_ = false; + } + + return 
sharedPromises_.back().getFuture(); + } + + /** + * @brief Used for ATOMIC_OP raft log + * + * @return folly::Future + */ + folly::Future getSingleFuture() { + singlePromises_.emplace_back(); + rollSharedPromise_ = true; + + return singlePromises_.back().getFuture(); + } + + /** + * @brief Used for COMMAND raft log + * + * @return folly::Future + */ + folly::Future getAndRollSharedFuture() { + if (rollSharedPromise_) { + sharedPromises_.emplace_back(); + } + rollSharedPromise_ = true; + return sharedPromises_.back().getFuture(); + } + + /** + * @brief Set shared promise + * + * @tparam VT + * @param val + */ + template + void setOneSharedValue(VT&& val) { + CHECK(!sharedPromises_.empty()); + sharedPromises_.front().setValue(std::forward(val)); + sharedPromises_.pop_front(); + } + + /** + * @brief Set single promise + * + * @tparam VT + * @param val + */ + template + void setOneSingleValue(VT&& val) { + CHECK(!singlePromises_.empty()); + singlePromises_.front().setValue(std::forward(val)); + singlePromises_.pop_front(); + } + + /** + * @brief Set all promises to result, usually a failed result + * + * @param val + */ + void setValue(ValueType val) { + for (auto& p : sharedPromises_) { + p.setValue(val); + } + for (auto& p : singlePromises_) { + p.setValue(val); + } + } + + private: + // Whether the last future was returned from a shared promise + bool rollSharedPromise_{true}; + + // Promises shared by continuous non atomic op logs + std::list> sharedPromises_; + // A list of promises for atomic op logs + std::list> singlePromises_; + }; + const std::string idStr_; const ClusterID clusterId_; @@ -809,12 +914,14 @@ class RaftPart : public std::enable_shared_from_this { mutable std::mutex logsLock_; std::atomic_bool replicatingLogs_{false}; std::atomic_bool bufferOverFlow_{false}; + PromiseSet cachingPromise_; LogCache logs_; - LogCache sendingLogs_; // Partition level lock to synchronize the access of the partition mutable std::mutex raftLock_; + PromiseSet sendingPromise_; + Status status_; Role role_; diff --git a/src/kvstore/raftex/test/LogCASTest.cpp b/src/kvstore/raftex/test/LogCASTest.cpp index 23e65db2391..78c706912e8 100644 --- a/src/kvstore/raftex/test/LogCASTest.cpp +++ b/src/kvstore/raftex/test/LogCASTest.cpp @@ -29,12 +29,7 @@ TEST_F(LogCASTest, StartWithValidCAS) { // Append logs LOG(INFO) << "=====> Start appending logs"; std::vector msgs; - leader_->atomicOpAsync([] { - kvstore::MergeableAtomicOpResult ret; - ret.code = ::nebula::cpp2::ErrorCode::SUCCEEDED; - ret.batch = "CAS Log Message"; - return ret; - }); + leader_->atomicOpAsync([]() { return test::compareAndSet("TCAS Log Message"); }); msgs.emplace_back("CAS Log Message"); appendLogs(1, 9, leader_, msgs); LOG(INFO) << "<===== Finish appending logs"; @@ -46,11 +41,7 @@ TEST_F(LogCASTest, StartWithInvalidCAS) { // Append logs LOG(INFO) << "=====> Start appending logs"; std::vector msgs; - leader_->atomicOpAsync([] { - kvstore::MergeableAtomicOpResult ret; - ret.code = ::nebula::cpp2::ErrorCode::E_RPC_FAILURE; - return ret; - }); + leader_->atomicOpAsync([]() { return test::compareAndSet("FCAS Log Message"); }); appendLogs(0, 9, leader_, msgs); LOG(INFO) << "<===== Finish appending logs"; @@ -63,12 +54,7 @@ TEST_F(LogCASTest, ValidCASInMiddle) { std::vector msgs; appendLogs(0, 4, leader_, msgs); - leader_->atomicOpAsync([] { - kvstore::MergeableAtomicOpResult ret; - ret.code = ::nebula::cpp2::ErrorCode::SUCCEEDED; - ret.batch = "CAS Log Message"; - return ret; - }); + leader_->atomicOpAsync([]() { return 
test::compareAndSet("TCAS Log Message"); }); msgs.emplace_back("CAS Log Message"); appendLogs(6, 9, leader_, msgs); @@ -83,11 +69,7 @@ TEST_F(LogCASTest, InvalidCASInMiddle) { std::vector msgs; appendLogs(0, 4, leader_, msgs); - leader_->atomicOpAsync([] { - kvstore::MergeableAtomicOpResult ret; - ret.code = ::nebula::cpp2::ErrorCode::E_RPC_FAILURE; - return ret; - }); + leader_->atomicOpAsync([]() { return test::compareAndSet("FCAS Log Message"); }); appendLogs(5, 9, leader_, msgs); LOG(INFO) << "<===== Finish appending logs"; @@ -101,21 +83,10 @@ TEST_F(LogCASTest, EndWithValidCAS) { std::vector msgs; appendLogs(0, 7, leader_, msgs); - // leader_->atomicOpAsync([]() { return test::compareAndSet("TCAS Log Message"); }); - leader_->atomicOpAsync([] { - kvstore::MergeableAtomicOpResult ret; - ret.code = ::nebula::cpp2::ErrorCode::SUCCEEDED; - ret.batch = "CAS Log Message"; - return ret; - }); + leader_->atomicOpAsync([]() { return test::compareAndSet("TCAS Log Message"); }); msgs.emplace_back("CAS Log Message"); - auto fut = leader_->atomicOpAsync([] { - kvstore::MergeableAtomicOpResult ret; - ret.code = ::nebula::cpp2::ErrorCode::SUCCEEDED; - ret.batch = "CAS Log Message"; - return ret; - }); + auto fut = leader_->atomicOpAsync([]() { return test::compareAndSet("TCAS Log Message"); }); msgs.emplace_back("CAS Log Message"); fut.wait(); LOG(INFO) << "<===== Finish appending logs"; @@ -129,12 +100,8 @@ TEST_F(LogCASTest, EndWithInvalidCAS) { std::vector msgs; appendLogs(0, 7, leader_, msgs); - auto fut = leader_->atomicOpAsync([] { - kvstore::MergeableAtomicOpResult ret; - ret.code = ::nebula::cpp2::ErrorCode::E_RPC_FAILURE; - return ret; - }); - + leader_->atomicOpAsync([]() { return test::compareAndSet("FCAS Log Message"); }); + auto fut = leader_->atomicOpAsync([]() { return test::compareAndSet("FCAS Log Message"); }); fut.wait(); LOG(INFO) << "<===== Finish appending logs"; @@ -146,12 +113,7 @@ TEST_F(LogCASTest, AllValidCAS) { LOG(INFO) << "=====> Start appending logs"; std::vector msgs; for (int i = 1; i <= 10; ++i) { - auto fut = leader_->atomicOpAsync([] { - kvstore::MergeableAtomicOpResult ret; - ret.code = ::nebula::cpp2::ErrorCode::SUCCEEDED; - ret.batch = "Test CAS Log"; - return ret; - }); + auto fut = leader_->atomicOpAsync([]() { return test::compareAndSet("TTest CAS Log"); }); msgs.emplace_back("Test CAS Log"); if (i == 10) { fut.wait(); @@ -167,14 +129,7 @@ TEST_F(LogCASTest, AllInvalidCAS) { LOG(INFO) << "=====> Start appending logs"; std::vector msgs; for (int i = 1; i <= 10; ++i) { - // auto fut = leader_->atomicOpAsync([]() { return test::compareAndSet("FCAS Log"); }); - - auto fut = leader_->atomicOpAsync([] { - kvstore::MergeableAtomicOpResult ret; - ret.code = ::nebula::cpp2::ErrorCode::E_RPC_FAILURE; - return ret; - }); - + auto fut = leader_->atomicOpAsync([]() { return test::compareAndSet("FCAS Log"); }); if (i == 10) { fut.wait(); } @@ -202,15 +157,8 @@ TEST_F(LogCASTest, OnlyOneCasSucceed) { } else { log = "FCAS Log " + std::to_string(i); } - auto fut = leader_->atomicOpAsync([=] { - kvstore::MergeableAtomicOpResult ret; - ret.code = ::nebula::cpp2::ErrorCode::E_RPC_FAILURE; - if (i == 1) { - ret.code = ::nebula::cpp2::ErrorCode::SUCCEEDED; - ret.batch = msgs.back(); - } - return ret; - }); + auto fut = leader_->atomicOpAsync( + [log = std::move(log)]() mutable { return test::compareAndSet(log); }); if (i == 10) { fut.wait(); } @@ -239,15 +187,8 @@ TEST_F(LogCASTest, ZipCasTest) { } else { log = "FCAS Log " + std::to_string(i); } - auto fut = 
leader_->atomicOpAsync([=] { - kvstore::MergeableAtomicOpResult ret; - ret.code = ::nebula::cpp2::ErrorCode::E_RPC_FAILURE; - if (i % 2) { - ret.code = ::nebula::cpp2::ErrorCode::SUCCEEDED; - ret.batch = msgs.back(); - } - return ret; - }); + auto fut = leader_->atomicOpAsync( + [log = std::move(log)]() mutable { return test::compareAndSet(log); }); if (i == 10) { fut.wait(); } @@ -268,13 +209,7 @@ TEST_F(LogCASTest, EmptyTest) { { LOG(INFO) << "return empty string for atomic operation!"; folly::Baton<> baton; - leader_ - ->atomicOpAsync([]() mutable { - kvstore::MergeableAtomicOpResult ret; - ret.code = ::nebula::cpp2::ErrorCode::SUCCEEDED; - // ret.batch = log; - return ret; - }) + leader_->atomicOpAsync([log = std::move(log)]() mutable { return std::string(""); }) .thenValue([&baton](nebula::cpp2::ErrorCode res) { ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, res); baton.post(); @@ -284,14 +219,7 @@ TEST_F(LogCASTest, EmptyTest) { { LOG(INFO) << "return none string for atomic operation!"; folly::Baton<> baton; - // leader_->atomicOpAsync([log = std::move(log)]() mutable { return folly::none; }) - leader_ - ->atomicOpAsync([]() mutable { - kvstore::MergeableAtomicOpResult ret; - ret.code = ::nebula::cpp2::ErrorCode::E_RAFT_ATOMIC_OP_FAILED; - // ret.batch = log; - return ret; - }) + leader_->atomicOpAsync([log = std::move(log)]() mutable { return std::nullopt; }) .thenValue([&baton](nebula::cpp2::ErrorCode res) { ASSERT_EQ(nebula::cpp2::ErrorCode::E_RAFT_ATOMIC_OP_FAILED, res); baton.post(); diff --git a/src/kvstore/raftex/test/LogCommandTest.cpp b/src/kvstore/raftex/test/LogCommandTest.cpp index 7023cc523bf..c1d3a7c6982 100644 --- a/src/kvstore/raftex/test/LogCommandTest.cpp +++ b/src/kvstore/raftex/test/LogCommandTest.cpp @@ -48,8 +48,7 @@ TEST_F(LogCommandTest, CommandInMiddle) { leader_->sendCommandAsync("Command Log Message"); msgs.emplace_back("Command Log Message"); - bool waitLastLog = true; - appendLogs(6, 9, leader_, msgs, waitLastLog); + appendLogs(6, 9, leader_, msgs, true); LOG(INFO) << "<===== Finish appending logs"; ASSERT_EQ(3, leader_->commitTimes_); @@ -98,43 +97,16 @@ TEST_F(LogCommandTest, MixedLogs) { leader_->sendCommandAsync("Command log 1"); msgs.emplace_back("Command log 1"); - kvstore::MergeableAtomicOp op1 = []() { - kvstore::MergeableAtomicOpResult ret; - ret.code = ::nebula::cpp2::ErrorCode::SUCCEEDED; - auto optStr = test::compareAndSet("TCAS Log Message 2"); - if (optStr) { - ret.batch = *optStr; - } - return ret; - }; - leader_->atomicOpAsync(std::move(op1)); + leader_->atomicOpAsync([]() { return test::compareAndSet("TCAS Log Message 2"); }); msgs.emplace_back("CAS Log Message 2"); leader_->appendAsync(0, "Normal log Message 3"); msgs.emplace_back("Normal log Message 3"); - kvstore::MergeableAtomicOp op2 = []() { - kvstore::MergeableAtomicOpResult ret; - ret.code = ::nebula::cpp2::ErrorCode::SUCCEEDED; - auto optStr = test::compareAndSet("TCAS Log Message 4"); - if (optStr) { - ret.batch = *optStr; - } - return ret; - }; - leader_->atomicOpAsync(std::move(op2)); + leader_->atomicOpAsync([]() { return test::compareAndSet("TCAS Log Message 4"); }); msgs.emplace_back("CAS Log Message 4"); - kvstore::MergeableAtomicOp op3 = []() { - kvstore::MergeableAtomicOpResult ret; - ret.code = ::nebula::cpp2::ErrorCode::SUCCEEDED; - auto optStr = test::compareAndSet("TCAS Log Message 5"); - if (optStr) { - ret.batch = *optStr; - } - return ret; - }; - leader_->atomicOpAsync(std::move(op3)); + leader_->atomicOpAsync([]() { return test::compareAndSet("TCAS Log Message 
5"); }); msgs.emplace_back("CAS Log Message 5"); leader_->sendCommandAsync("Command log 6"); @@ -146,16 +118,7 @@ TEST_F(LogCommandTest, MixedLogs) { leader_->appendAsync(0, "Normal log Message 8"); msgs.emplace_back("Normal log Message 8"); - kvstore::MergeableAtomicOp op4 = []() { - kvstore::MergeableAtomicOpResult ret; - ret.code = ::nebula::cpp2::ErrorCode::SUCCEEDED; - auto optStr = test::compareAndSet("FCAS Log Message"); - if (optStr) { - ret.batch = *optStr; - } - return ret; - }; - leader_->atomicOpAsync(std::move(op4)); + leader_->atomicOpAsync([]() { return test::compareAndSet("FCAS Log Message"); }); leader_->sendCommandAsync("Command log 9"); msgs.emplace_back("Command log 9"); @@ -163,23 +126,12 @@ TEST_F(LogCommandTest, MixedLogs) { auto f = leader_->appendAsync(0, "Normal log Message 10"); msgs.emplace_back("Normal log Message 10"); - kvstore::MergeableAtomicOp op5 = []() { - kvstore::MergeableAtomicOpResult ret; - ret.code = ::nebula::cpp2::ErrorCode::SUCCEEDED; - auto optStr = test::compareAndSet("FCAS Log Message"); - if (optStr) { - ret.batch = *optStr; - } - return ret; - }; - leader_->atomicOpAsync(std::move(op5)); - // leader_->atomicOpAsync([]() { return test::compareAndSet("FCAS Log Message"); }); + leader_->atomicOpAsync([]() { return test::compareAndSet("FCAS Log Message"); }); f.wait(); LOG(INFO) << "<===== Finish appending logs"; - // previous is 8 - ASSERT_EQ(5, leader_->commitTimes_); + ASSERT_EQ(8, leader_->commitTimes_); checkConsensus(copies_, 0, 9, msgs); } diff --git a/src/kvstore/raftex/test/RaftexTestBase.cpp b/src/kvstore/raftex/test/RaftexTestBase.cpp index 1c426d2bd4a..ca59ccaeefb 100644 --- a/src/kvstore/raftex/test/RaftexTestBase.cpp +++ b/src/kvstore/raftex/test/RaftexTestBase.cpp @@ -265,7 +265,6 @@ void appendLogs(int start, // Append 100 logs LOG(INFO) << "=====> Start appending logs from index " << start << " to " << end; for (int i = start; i <= end; ++i) { - LOG(INFO) << "=====> i = " << i; msgs.emplace_back(folly::stringPrintf("Test Log Message %03d", i)); auto fut = leader->appendAsync(0, msgs.back()); if (i == end && waitLastLog) { @@ -279,7 +278,6 @@ bool checkConsensus(std::vector>& copies, size_t start, size_t end, std::vector& msgs) { - LOG(INFO) << "checkConsensus()"; int32_t count = 0; for (; count < 3; count++) { bool consensus = true; @@ -289,14 +287,10 @@ bool checkConsensus(std::vector>& copies, // Check every copy for (auto& c : copies) { if (c != nullptr && c->isRunning()) { - LOG(INFO) << "====> checkConsensus(), msgs.size() " << msgs.size() << ", c->getNumLogs() " - << c->getNumLogs(); if (msgs.size() != c->getNumLogs() || !checkLog(c, start, end, msgs)) { consensus = false; break; } - } else { - LOG(INFO) << "====> checkConsensus(), c == nullptr || !c->isRunning()"; } } if (consensus == true) { diff --git a/src/kvstore/raftex/test/TestShard.cpp b/src/kvstore/raftex/test/TestShard.cpp index c6c6327e603..b7009423271 100644 --- a/src/kvstore/raftex/test/TestShard.cpp +++ b/src/kvstore/raftex/test/TestShard.cpp @@ -169,7 +169,6 @@ std::tuple TestShard::commitLogs( std::unique_ptr iter, bool wait, bool needLock) { UNUSED(wait); LogID lastId = kNoCommitLogId; - // LOG(INFO) << "TestShard::commitLogs() lastId = " << lastId; TermID lastTerm = kNoCommitLogTerm; int32_t commitLogsNum = 0; while (iter->valid()) { @@ -207,13 +206,10 @@ std::tuple TestShard::commitLogs( } VLOG(2) << "TestShard: " << idStr_ << "Committed log " << " up to " << lastId; - LOG(INFO) << "TestShard: " << idStr_ << "Committed log " - << " up to " << lastId; if 
(lastId > -1) { lastCommittedLogId_ = lastId; } if (commitLogsNum > 0) { - // LOG(INFO) << "====>> commitTimes_++"; commitTimes_++; } return {nebula::cpp2::ErrorCode::SUCCEEDED, lastId, lastTerm}; diff --git a/src/kvstore/test/NebulaStoreTest.cpp b/src/kvstore/test/NebulaStoreTest.cpp index 9d195f6e422..08fbddd9243 100644 --- a/src/kvstore/test/NebulaStoreTest.cpp +++ b/src/kvstore/test/NebulaStoreTest.cpp @@ -632,7 +632,7 @@ TEST(NebulaStoreTest, TransLeaderTest) { part->asyncTransferLeader(targetAddr, [&](nebula::cpp2::ErrorCode) { baton.post(); }); baton.wait(); } - sleep(FLAGS_raft_heartbeat_interval_secs * 2); + sleep(FLAGS_raft_heartbeat_interval_secs); { nebula::meta::ActiveHostsMan::AllLeaders leaderIds; ASSERT_EQ(3, stores[0]->allLeader(leaderIds)); @@ -652,7 +652,7 @@ TEST(NebulaStoreTest, TransLeaderTest) { part->asyncTransferLeader(targetAddr, [&](nebula::cpp2::ErrorCode) { baton.post(); }); baton.wait(); } - sleep(FLAGS_raft_heartbeat_interval_secs * 2); + sleep(FLAGS_raft_heartbeat_interval_secs); for (int i = 0; i < replicas; i++) { nebula::meta::ActiveHostsMan::AllLeaders leaderIds; ASSERT_EQ(1UL, stores[i]->allLeader(leaderIds)); @@ -962,9 +962,7 @@ TEST(NebulaStoreTest, ReadSnapshotTest) { // put kv { std::vector> expected, result; - - auto atomic = [&] { - kvstore::MergeableAtomicOpResult ret; + auto atomic = [&]() -> std::string { std::unique_ptr batchHolder = std::make_unique(); for (auto i = 0; i < 20; i++) { auto key = folly::stringPrintf("key_%d", i); @@ -972,9 +970,7 @@ TEST(NebulaStoreTest, ReadSnapshotTest) { batchHolder->put(key.data(), val.data()); expected.emplace_back(std::move(key), std::move(val)); } - ret.code = nebula::cpp2::ErrorCode::SUCCEEDED; - ret.batch = encodeBatchValue(batchHolder->getBatch()); - return ret; + return encodeBatchValue(batchHolder->getBatch()); }; folly::Baton baton; @@ -1038,9 +1034,7 @@ TEST(NebulaStoreTest, AtomicOpBatchTest) { // put kv { std::vector> expected, result; - - auto atomic = [&] { - kvstore::MergeableAtomicOpResult ret; + auto atomic = [&]() -> std::string { std::unique_ptr batchHolder = std::make_unique(); for (auto i = 0; i < 20; i++) { auto key = folly::stringPrintf("key_%d", i); @@ -1048,9 +1042,7 @@ TEST(NebulaStoreTest, AtomicOpBatchTest) { batchHolder->put(key.data(), val.data()); expected.emplace_back(std::move(key), std::move(val)); } - ret.code = nebula::cpp2::ErrorCode::SUCCEEDED; - ret.batch = encodeBatchValue(batchHolder->getBatch()); - return ret; + return encodeBatchValue(batchHolder->getBatch()); }; folly::Baton baton; @@ -1074,9 +1066,7 @@ TEST(NebulaStoreTest, AtomicOpBatchTest) { // put and remove { std::vector> expected, result; - - auto atomic = [&] { - kvstore::MergeableAtomicOpResult ret; + auto atomic = [&]() -> std::string { std::unique_ptr batchHolder = std::make_unique(); for (auto i = 0; i < 20; i++) { auto key = folly::stringPrintf("key_%d", i); @@ -1089,10 +1079,9 @@ TEST(NebulaStoreTest, AtomicOpBatchTest) { for (auto i = 0; i < 20; i = i + 5) { batchHolder->remove(folly::stringPrintf("key_%d", i)); } - ret.code = nebula::cpp2::ErrorCode::SUCCEEDED; - ret.batch = encodeBatchValue(batchHolder->getBatch()); - return ret; + return encodeBatchValue(batchHolder->getBatch()); }; + folly::Baton baton; auto callback = [&](nebula::cpp2::ErrorCode code) { EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code); diff --git a/src/storage/mutate/AddEdgesProcessor.cpp b/src/storage/mutate/AddEdgesProcessor.cpp index ca739fdc5af..8a68b25ac18 100644 --- a/src/storage/mutate/AddEdgesProcessor.cpp +++ 
b/src/storage/mutate/AddEdgesProcessor.cpp @@ -152,18 +152,36 @@ void AddEdgesProcessor::doProcess(const cpp2::AddEdgesRequest& req) { void AddEdgesProcessor::doProcessWithIndex(const cpp2::AddEdgesRequest& req) { const auto& partEdges = req.get_parts(); const auto& propNames = req.get_prop_names(); - for (const auto& part : partEdges) { + for (auto& part : partEdges) { + IndexCountWrapper wrapper(env_); + std::unique_ptr batchHolder = std::make_unique(); auto partId = part.first; - const auto& edges = part.second; - // cache edgeKey - std::unordered_set visited; - visited.reserve(edges.size()); - std::vector kvs; - kvs.reserve(edges.size()); + const auto& newEdges = part.second; + std::vector dummyLock; + dummyLock.reserve(newEdges.size()); auto code = nebula::cpp2::ErrorCode::SUCCEEDED; - deleteDupEdge(const_cast&>(edges)); - for (const auto& edge : edges) { - auto edgeKey = *edge.key_ref(); + + deleteDupEdge(const_cast&>(newEdges)); + for (auto& newEdge : newEdges) { + auto edgeKey = *newEdge.key_ref(); + auto l = std::make_tuple(spaceId_, + partId, + edgeKey.src_ref()->getStr(), + *edgeKey.edge_type_ref(), + *edgeKey.ranking_ref(), + edgeKey.dst_ref()->getStr()); + if (std::find(dummyLock.begin(), dummyLock.end(), l) == dummyLock.end()) { + if (!env_->edgesML_->try_lock(l)) { + LOG(ERROR) << folly::sformat("edge locked : src {}, type {}, rank {}, dst {}", + edgeKey.src_ref()->getStr(), + *edgeKey.edge_type_ref(), + *edgeKey.ranking_ref(), + edgeKey.dst_ref()->getStr()); + code = nebula::cpp2::ErrorCode::E_DATA_CONFLICT_ERROR; + break; + } + dummyLock.emplace_back(std::move(l)); + } VLOG(3) << "PartitionID: " << partId << ", VertexID: " << *edgeKey.src_ref() << ", EdgeType: " << *edgeKey.edge_type_ref() << ", EdgeRanking: " << *edgeKey.ranking_ref() @@ -173,12 +191,17 @@ void AddEdgesProcessor::doProcessWithIndex(const cpp2::AddEdgesRequest& req) { spaceVidLen_, edgeKey.src_ref()->getStr(), edgeKey.dst_ref()->getStr())) { LOG(ERROR) << "Space " << spaceId_ << " vertex length invalid, " << "space vid len: " << spaceVidLen_ << ", edge srcVid: " << *edgeKey.src_ref() - << ", dstVid: " << *edgeKey.dst_ref() << ", ifNotExists_: " << std::boolalpha - << ifNotExists_; + << ", dstVid: " << *edgeKey.dst_ref(); code = nebula::cpp2::ErrorCode::E_INVALID_VID; break; } + auto key = NebulaKeyUtils::edgeKey(spaceVidLen_, + partId, + edgeKey.src_ref()->getStr(), + *edgeKey.edge_type_ref(), + *edgeKey.ranking_ref(), + edgeKey.dst_ref()->getStr()); auto schema = env_->schemaMan_->getEdgeSchema(spaceId_, std::abs(*edgeKey.edge_type_ref())); if (!schema) { LOG(ERROR) << "Space " << spaceId_ << ", Edge " << *edgeKey.edge_type_ref() << " invalid"; @@ -186,159 +209,239 @@ void AddEdgesProcessor::doProcessWithIndex(const cpp2::AddEdgesRequest& req) { break; } - auto key = NebulaKeyUtils::edgeKey(spaceVidLen_, - partId, - edgeKey.src_ref()->getStr(), - *edgeKey.edge_type_ref(), - *edgeKey.ranking_ref(), - (*edgeKey.dst_ref()).getStr()); - if (ifNotExists_ && !visited.emplace(key).second) { - LOG(INFO) << "skip " << edgeKey.src_ref()->getStr(); - continue; + auto props = newEdge.get_props(); + WriteResult wRet; + auto retEnc = encodeRowVal(schema.get(), propNames, props, wRet); + if (!retEnc.ok()) { + LOG(ERROR) << retEnc.status(); + code = writeResultTo(wRet, true); + break; } - - // collect values - WriteResult writeResult; - const auto& props = edge.get_props(); - auto encode = encodeRowVal(schema.get(), propNames, props, writeResult); - if (!encode.ok()) { - LOG(ERROR) << encode.status(); - code = 
writeResultTo(writeResult, true); + if (*edgeKey.edge_type_ref() > 0) { + std::string oldVal; + RowReaderWrapper nReader; + RowReaderWrapper oReader; + if (!ignoreExistedIndex_) { + auto obsIdx = findOldValue(partId, key); + if (nebula::ok(obsIdx)) { + // already exists in kvstore + if (ifNotExists_ && !nebula::value(obsIdx).empty()) { + continue; + } + if (!nebula::value(obsIdx).empty()) { + oldVal = std::move(value(obsIdx)); + oReader = RowReaderWrapper::getEdgePropReader( + env_->schemaMan_, spaceId_, *edgeKey.edge_type_ref(), oldVal); + } + } else { + code = nebula::error(obsIdx); + break; + } + } + if (!retEnc.value().empty()) { + nReader = RowReaderWrapper::getEdgePropReader( + env_->schemaMan_, spaceId_, *edgeKey.edge_type_ref(), retEnc.value()); + } + for (auto& index : indexes_) { + if (*edgeKey.edge_type_ref() == index->get_schema_id().get_edge_type()) { + /* + * step 1 , Delete old version index if exists. + */ + if (oReader != nullptr) { + auto ois = indexKeys(partId, oReader.get(), key, index, schema.get()); + if (!ois.empty()) { + // Check the index is building for the specified partition or not. + auto indexState = env_->getIndexState(spaceId_, partId); + if (env_->checkRebuilding(indexState)) { + auto delOpKey = OperationKeyUtils::deleteOperationKey(partId); + for (auto& oi : ois) { + batchHolder->put(std::string(delOpKey), std::move(oi)); + } + } else if (env_->checkIndexLocked(indexState)) { + LOG(ERROR) << "The index has been locked: " << index->get_index_name(); + code = nebula::cpp2::ErrorCode::E_DATA_CONFLICT_ERROR; + break; + } else { + for (auto& oi : ois) { + batchHolder->remove(std::move(oi)); + } + } + } + } + /* + * step 2 , Insert new edge index + */ + if (nReader != nullptr) { + auto niks = indexKeys(partId, nReader.get(), key, index, schema.get()); + if (!niks.empty()) { + auto v = CommonUtils::ttlValue(schema.get(), nReader.get()); + auto niv = v.ok() ? IndexKeyUtils::indexVal(std::move(v).value()) : ""; + // Check the index is building for the specified partition or not. 
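+              // While a rebuild is in flight, new index entries are staged as
+              // "modify" operation keys for the rebuild task to replay later;
+              // writes against a locked index are rejected outright.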
+ auto indexState = env_->getIndexState(spaceId_, partId); + if (env_->checkRebuilding(indexState)) { + for (auto& nik : niks) { + auto opKey = OperationKeyUtils::modifyOperationKey(partId, std::move(nik)); + batchHolder->put(std::move(opKey), std::string(niv)); + } + } else if (env_->checkIndexLocked(indexState)) { + LOG(ERROR) << "The index has been locked: " << index->get_index_name(); + code = nebula::cpp2::ErrorCode::E_DATA_CONFLICT_ERROR; + break; + } else { + for (auto& nik : niks) { + batchHolder->put(std::move(nik), std::string(niv)); + } + } + } + } + } + } + } + if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { break; } - kvs.emplace_back(std::move(key), std::move(encode.value())); + batchHolder->put(std::move(key), std::move(retEnc.value())); + stats::StatsManager::addValue(kNumEdgesInserted); } - - auto atomicOp = - [partId, data = std::move(kvs), this]() mutable -> kvstore::MergeableAtomicOpResult { - return addEdgesWithIndex(partId, std::move(data)); - }; - - auto cb = [partId, this](nebula::cpp2::ErrorCode ec) { handleAsync(spaceId_, partId, ec); }; - if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { + env_->edgesML_->unlockBatch(dummyLock); handleAsync(spaceId_, partId, code); - } else { - env_->kvstore_->asyncAtomicOp(spaceId_, partId, std::move(atomicOp), std::move(cb)); + continue; + } + if (consistOp_) { + (*consistOp_)(*batchHolder, nullptr); } + auto batch = encodeBatchValue(batchHolder->getBatch()); + DCHECK(!batch.empty()); + nebula::MemoryLockGuard lg(env_->edgesML_.get(), std::move(dummyLock), false, false); + env_->kvstore_->asyncAppendBatch(spaceId_, + partId, + std::move(batch), + [l = std::move(lg), icw = std::move(wrapper), partId, this]( + nebula::cpp2::ErrorCode retCode) { + UNUSED(l); + UNUSED(icw); + handleAsync(spaceId_, partId, retCode); + }); } } -kvstore::MergeableAtomicOpResult AddEdgesProcessor::addEdgesWithIndex( - PartitionID partId, std::vector&& data) { - kvstore::MergeableAtomicOpResult ret; - ret.code = nebula::cpp2::ErrorCode::E_RAFT_ATOMIC_OP_FAILED; +ErrorOr AddEdgesProcessor::addEdges( + PartitionID partId, const std::vector& edges) { IndexCountWrapper wrapper(env_); std::unique_ptr batchHolder = std::make_unique(); - for (auto& [key, value] : data) { - auto edgeType = NebulaKeyUtils::getEdgeType(spaceVidLen_, key); - RowReaderWrapper oldReader; - RowReaderWrapper newReader = - RowReaderWrapper::getEdgePropReader(env_->schemaMan_, spaceId_, std::abs(edgeType), value); + + /* + * Define the map newEdges to avoid inserting duplicate edge. + * This map means : + * map , + * -- edge_unique_key is only used as the unique key , for example: + * insert below edges in the same request: + * kv(part1_src1_edgeType1_rank1_dst1 , v1) + * kv(part1_src1_edgeType1_rank1_dst1 , v2) + * kv(part1_src1_edgeType1_rank1_dst1 , v3) + * kv(part1_src1_edgeType1_rank1_dst1 , v4) + * + * Ultimately, kv(part1_src1_edgeType1_rank1_dst1 , v4) . It's just what I need. 
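+   * In short: within one request, a later value for the same edge key simply
+   * overwrites an earlier one, i.e. last write wins.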
+ */ + std::unordered_map newEdges; + std::for_each( + edges.begin(), edges.end(), [&newEdges](const auto& e) { newEdges[e.first] = e.second; }); + + for (auto& e : newEdges) { + std::string val; + RowReaderWrapper oReader; + RowReaderWrapper nReader; + auto edgeType = NebulaKeyUtils::getEdgeType(spaceVidLen_, e.first); auto schema = env_->schemaMan_->getEdgeSchema(spaceId_, std::abs(edgeType)); if (!schema) { - return ret; + LOG(ERROR) << "Space " << spaceId_ << ", Edge " << edgeType << " invalid"; + return nebula::cpp2::ErrorCode::E_EDGE_NOT_FOUND; } - - // only out-edge need to handle index - if (edgeType > 0) { - std::string oldVal; - if (!ignoreExistedIndex_) { - // read the old key value and initialize row reader if exists - auto result = findOldValue(partId, key); - if (nebula::ok(result)) { - if (ifNotExists_ && !nebula::value(result).empty()) { - continue; - } else if (!nebula::value(result).empty()) { - oldVal = std::move(nebula::value(result)); - oldReader = - RowReaderWrapper::getEdgePropReader(env_->schemaMan_, spaceId_, edgeType, oldVal); - ret.readSet.emplace_back(key); + for (auto& index : indexes_) { + if (edgeType == index->get_schema_id().get_edge_type()) { + /* + * step 1 , Delete old version index if exists. + */ + if (!ignoreExistedIndex_ && val.empty()) { + auto obsIdx = findOldValue(partId, e.first); + if (!nebula::ok(obsIdx)) { + return nebula::error(obsIdx); + } + val = std::move(nebula::value(obsIdx)); + if (!val.empty()) { + oReader = + RowReaderWrapper::getEdgePropReader(env_->schemaMan_, spaceId_, edgeType, val); + if (oReader == nullptr) { + LOG(ERROR) << "Bad format row"; + return nebula::cpp2::ErrorCode::E_INVALID_DATA; + } } - } else { - // read old value failed - return ret; } - } - for (const auto& index : indexes_) { - if (edgeType == index->get_schema_id().get_edge_type()) { - // step 1, Delete old version index if exists. - if (oldReader != nullptr) { - auto oldIndexKeys = indexKeys(partId, oldReader.get(), key, index, nullptr); - if (!oldIndexKeys.empty()) { - ret.writeSet.insert(ret.writeSet.end(), oldIndexKeys.begin(), oldIndexKeys.end()); - // Check the index is building for the specified partition or - // not. - auto indexState = env_->getIndexState(spaceId_, partId); - if (env_->checkRebuilding(indexState)) { - auto delOpKey = OperationKeyUtils::deleteOperationKey(partId); - for (auto& idxKey : oldIndexKeys) { - ret.writeSet.push_back(idxKey); - batchHolder->put(std::string(delOpKey), std::move(idxKey)); - } - } else if (env_->checkIndexLocked(indexState)) { - return ret; - } else { - for (auto& idxKey : oldIndexKeys) { - ret.writeSet.push_back(idxKey); - batchHolder->remove(std::move(idxKey)); - } + + if (!val.empty()) { + auto ois = indexKeys(partId, oReader.get(), e.first, index, schema.get()); + if (!ois.empty()) { + // Check the index is building for the specified partition or not. 
+ auto indexState = env_->getIndexState(spaceId_, partId); + if (env_->checkRebuilding(indexState)) { + auto deleteOpKey = OperationKeyUtils::deleteOperationKey(partId); + for (auto& oi : ois) { + batchHolder->put(std::string(deleteOpKey), std::move(oi)); + } + } else if (env_->checkIndexLocked(indexState)) { + LOG(ERROR) << "The index has been locked: " << index->get_index_name(); + return nebula::cpp2::ErrorCode::E_DATA_CONFLICT_ERROR; + } else { + for (auto& oi : ois) { + batchHolder->remove(std::move(oi)); } } } + } - // step 2, Insert new edge index - if (newReader != nullptr) { - auto newIndexKeys = indexKeys(partId, newReader.get(), key, index, nullptr); - if (!newIndexKeys.empty()) { - // check if index has ttl field, write it to index value if exists - auto field = CommonUtils::ttlValue(schema.get(), newReader.get()); - auto indexVal = field.ok() ? IndexKeyUtils::indexVal(std::move(field).value()) : ""; - auto indexState = env_->getIndexState(spaceId_, partId); - if (env_->checkRebuilding(indexState)) { - for (auto& idxKey : newIndexKeys) { - auto opKey = OperationKeyUtils::modifyOperationKey(partId, idxKey); - ret.writeSet.push_back(opKey); - batchHolder->put(std::move(opKey), std::string(indexVal)); - } - } else if (env_->checkIndexLocked(indexState)) { - // return folly::Optional(); - return ret; - } else { - for (auto& idxKey : newIndexKeys) { - ret.writeSet.push_back(idxKey); - batchHolder->put(std::move(idxKey), std::string(indexVal)); - } - } + /* + * step 2 , Insert new edge index + */ + if (nReader == nullptr) { + nReader = + RowReaderWrapper::getEdgePropReader(env_->schemaMan_, spaceId_, edgeType, e.second); + if (nReader == nullptr) { + LOG(ERROR) << "Bad format row"; + return nebula::cpp2::ErrorCode::E_INVALID_DATA; + } + } + + auto niks = indexKeys(partId, nReader.get(), e.first, index, schema.get()); + if (!niks.empty()) { + auto v = CommonUtils::ttlValue(schema.get(), nReader.get()); + auto niv = v.ok() ? IndexKeyUtils::indexVal(std::move(v).value()) : ""; + // Check the index is building for the specified partition or not. + auto indexState = env_->getIndexState(spaceId_, partId); + if (env_->checkRebuilding(indexState)) { + for (auto& nik : niks) { + auto modifyOpKey = OperationKeyUtils::modifyOperationKey(partId, std::move(nik)); + batchHolder->put(std::move(modifyOpKey), std::string(niv)); + } + } else if (env_->checkIndexLocked(indexState)) { + LOG(ERROR) << "The index has been locked: " << index->get_index_name(); + return nebula::cpp2::ErrorCode::E_DATA_CONFLICT_ERROR; + } else { + for (auto& nik : niks) { + batchHolder->put(std::move(nik), std::string(niv)); } } } } } - // step 3, Insert new edge data - ret.writeSet.push_back(key); - // for why use a copy not move here: - // previously, we use atomicOp(a kind of raft log, raft send this log in sync) - // this import an implicit constraint - // that all atomicOp will execute only once - // (because all atomicOp may fail or succeed, won't retry) - // but in mergeable mode of atomic: - // an atomicOp may fail because of conflict - // then it will retry after the prev batch commit - // this mean now atomicOp may execute twice - // (won't be more than twice) - // but if we move the key out, - // then the second run will core. 
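-    // Hence the explicit copies on the next line: a conflicting op may run a
-    // second time, so key and value must still be intact for the re-run.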
-    batchHolder->put(std::string(key), std::string(value));
-  }
-
-  if (consistOp_) {
-    (*consistOp_)(*batchHolder, nullptr);
+    /*
+     * step 3 , Insert new edge data
+     */
+    auto key = e.first;
+    auto prop = e.second;
+    batchHolder->put(std::move(key), std::move(prop));
   }
-
-  ret.code = nebula::cpp2::ErrorCode::SUCCEEDED;
-  ret.batch = encodeBatchValue(batchHolder->getBatch());
-  return ret;
+  return encodeBatchValue(batchHolder->getBatch());
 }
 
 ErrorOr AddEdgesProcessor::findOldValue(
diff --git a/src/storage/mutate/AddEdgesProcessor.h b/src/storage/mutate/AddEdgesProcessor.h
index 9a8f8ef649e..0afa646a5c0 100644
--- a/src/storage/mutate/AddEdgesProcessor.h
+++ b/src/storage/mutate/AddEdgesProcessor.h
@@ -9,7 +9,6 @@
 #include "common/base/Base.h"
 #include "common/stats/StatsManager.h"
 #include "kvstore/LogEncoder.h"
-#include "kvstore/raftex/RaftPart.h"
 #include "storage/BaseProcessor.h"
 #include "storage/StorageFlags.h"
 
@@ -38,8 +37,8 @@ class AddEdgesProcessor : public BaseProcessor {
   AddEdgesProcessor(StorageEnv* env, const ProcessorCounters* counters)
       : BaseProcessor(env, counters) {}
 
-  kvstore::MergeableAtomicOpResult addEdgesWithIndex(PartitionID partId,
-                                                     std::vector&& data);
+  ErrorOr addEdges(PartitionID partId,
+                   const std::vector& edges);
 
   ErrorOr findOldValue(PartitionID partId, const folly::StringPiece& rawKey);
 
diff --git a/src/storage/mutate/AddVerticesProcessor.cpp b/src/storage/mutate/AddVerticesProcessor.cpp
index dc0f3a97e31..83fcab649a3 100644
--- a/src/storage/mutate/AddVerticesProcessor.cpp
+++ b/src/storage/mutate/AddVerticesProcessor.cpp
@@ -137,17 +137,18 @@ void AddVerticesProcessor::doProcess(const cpp2::AddVerticesRequest& req) {
 void AddVerticesProcessor::doProcessWithIndex(const cpp2::AddVerticesRequest& req) {
   const auto& partVertices = req.get_parts();
   const auto& propNamesMap = req.get_prop_names();
-  auto batchHolder = std::make_unique();
-
-  for (const auto& part : partVertices) {
+  for (auto& part : partVertices) {
+    IndexCountWrapper wrapper(env_);
+    std::unique_ptr batchHolder = std::make_unique();
     auto partId = part.first;
     const auto& vertices = part.second;
-    std::vector kvs;
-    kvs.reserve(vertices.size());
-
+    std::vector dummyLock;
+    dummyLock.reserve(vertices.size());
     auto code = nebula::cpp2::ErrorCode::SUCCEEDED;
+
+    // cache tagKey
     deleteDupVid(const_cast&>(vertices));
-    for (const auto& vertex : vertices) {
+    for (auto& vertex : vertices) {
       auto vid = vertex.get_id().getStr();
       const auto& newTags = vertex.get_tags();
 
@@ -157,10 +158,18 @@ void AddVerticesProcessor::doProcessWithIndex(const cpp2::AddVerticesRequest& re
         code = nebula::cpp2::ErrorCode::E_INVALID_VID;
         break;
       }
-      batchHolder->put(NebulaKeyUtils::vertexKey(spaceVidLen_, partId, vid), "");
 
-      for (const auto& newTag : newTags) {
+      for (auto& newTag : newTags) {
         auto tagId = newTag.get_tag_id();
+        auto l = std::make_tuple(spaceId_, partId, tagId, vid);
+        if (std::find(dummyLock.begin(), dummyLock.end(), l) == dummyLock.end()) {
+          if (!env_->verticesML_->try_lock(l)) {
+            LOG(ERROR) << folly::sformat("The vertex locked : tag {}, vid {}", tagId, vid);
+            code = nebula::cpp2::ErrorCode::E_DATA_CONFLICT_ERROR;
+            break;
+          }
+          dummyLock.emplace_back(std::move(l));
+        }
         VLOG(3) << "PartitionID: " << partId << ", VertexID: " << vid << ", TagID: " << tagId;
 
         auto schema = env_->schemaMan_->getTagSchema(spaceId_, tagId);
@@ -171,139 +180,134 @@ void AddVerticesProcessor::doProcessWithIndex(const cpp2::AddVerticesRequest& re
         }
 
         auto key = NebulaKeyUtils::tagKey(spaceVidLen_, partId, vid, tagId);
-        // collect values
-        const auto& props = newTag.get_props();
+        auto props = newTag.get_props();
         auto iter = propNamesMap.find(tagId);
         std::vector propNames;
         if (iter != propNamesMap.end()) {
           propNames = iter->second;
         }
-        WriteResult writeResult;
-        auto encode = encodeRowVal(schema.get(), propNames, props, writeResult);
-        if (!encode.ok()) {
-          LOG(ERROR) << encode.status();
-          code = writeResultTo(writeResult, false);
-          break;
+        RowReaderWrapper nReader;
+        RowReaderWrapper oReader;
+        std::string oldVal;
+        if (!ignoreExistedIndex_) {
+          auto obsIdx = findOldValue(partId, vid, tagId);
+          if (nebula::ok(obsIdx)) {
+            if (ifNotExists_ && !nebula::value(obsIdx).empty()) {
+              continue;
+            }
+            if (!nebula::value(obsIdx).empty()) {
+              oldVal = std::move(value(obsIdx));
+              oReader =
+                  RowReaderWrapper::getTagPropReader(env_->schemaMan_, spaceId_, tagId, oldVal);
+            }
+          } else {
+            code = nebula::error(obsIdx);
+            break;
+          }
         }
-        kvs.emplace_back(std::string(key), std::string(encode.value()));
-      }
-    }
-
-    if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
-      handleAsync(spaceId_, partId, code);
-    } else {
-      auto atomicOp = [&, partId, data = std::move(kvs)]() mutable {
-        return addVerticesWithIndex(partId, std::move(data), std::move(batchHolder));
-      };
-      auto cb = [partId, this](nebula::cpp2::ErrorCode ec) { handleAsync(spaceId_, partId, ec); };
-      env_->kvstore_->asyncAtomicOp(spaceId_, partId, std::move(atomicOp), std::move(cb));
-    }
-  }
-}
+        WriteResult wRet;
+        auto retEnc = encodeRowVal(schema.get(), propNames, props, wRet);
+        if (!retEnc.ok()) {
+          LOG(ERROR) << retEnc.status();
+          code = writeResultTo(wRet, false);
+          break;
+        }
 
-kvstore::MergeableAtomicOpResult AddVerticesProcessor::addVerticesWithIndex(
-    PartitionID partId,
-    std::vector&& data,
-    std::unique_ptr&& batchHolder) {
-  kvstore::MergeableAtomicOpResult ret;
-  ret.code = nebula::cpp2::ErrorCode::E_RAFT_ATOMIC_OP_FAILED;
-  IndexCountWrapper wrapper(env_);
-  for (auto& [key, value] : data) {
-    auto vId = NebulaKeyUtils::getVertexId(spaceVidLen_, key);
-    auto tagId = NebulaKeyUtils::getTagId(spaceVidLen_, key);
-    RowReaderWrapper oldReader;
-    RowReaderWrapper newReader =
-        RowReaderWrapper::getTagPropReader(env_->schemaMan_, spaceId_, tagId, value);
-    auto schema = env_->schemaMan_->getTagSchema(spaceId_, tagId);
-    if (!schema) {
-      ret.code = nebula::cpp2::ErrorCode::E_TAG_NOT_FOUND;
-      DLOG(INFO) << "===>>> failed";
-      return ret;
-    }
-    std::string oldVal;
-    if (!ignoreExistedIndex_) {
-      // read the old key value and initialize row reader if exists
-      auto result = findOldValue(partId, vId.str(), tagId);
-      if (nebula::ok(result)) {
-        if (ifNotExists_ && !nebula::value(result).empty()) {
-          continue;
-        } else if (!nebula::value(result).empty()) {
-          oldVal = std::move(nebula::value(result));
-          oldReader = RowReaderWrapper::getTagPropReader(env_->schemaMan_, spaceId_, tagId, oldVal);
-          ret.readSet.emplace_back(key);
+        if (!retEnc.value().empty()) {
+          nReader =
+              RowReaderWrapper::getTagPropReader(env_->schemaMan_, spaceId_, tagId, retEnc.value());
         }
-      } else {
-        // read old value failed
-        DLOG(INFO) << "===>>> failed";
-        return ret;
-      }
-    }
-    for (const auto& index : indexes_) {
-      if (tagId == index->get_schema_id().get_tag_id()) {
-        // step 1, Delete old version index if exists.
-        if (oldReader != nullptr) {
-          auto oldIndexKeys = indexKeys(partId, vId.str(), oldReader.get(), index, schema.get());
-          if (!oldIndexKeys.empty()) {
-            // Check the index is building for the specified partition or
-            // not.
-            auto indexState = env_->getIndexState(spaceId_, partId);
-            if (env_->checkRebuilding(indexState)) {
-              auto delOpKey = OperationKeyUtils::deleteOperationKey(partId);
-              for (auto& idxKey : oldIndexKeys) {
-                ret.writeSet.emplace_back(std::string(delOpKey));
-                batchHolder->put(std::string(delOpKey), std::move(idxKey));
-              }
-            } else if (env_->checkIndexLocked(indexState)) {
-              LOG(ERROR) << "The index has been locked: " << index->get_index_name();
-              ret.code = nebula::cpp2::ErrorCode::E_DATA_CONFLICT_ERROR;
-              return ret;
-            } else {
-              for (auto& idxKey : oldIndexKeys) {
-                ret.writeSet.emplace_back(std::string(idxKey));
-                batchHolder->remove(std::move(idxKey));
+        for (auto& index : indexes_) {
+          if (tagId == index->get_schema_id().get_tag_id()) {
+            auto indexFields = index->get_fields();
+            /*
+             * step 1 , Delete old version index if exists.
+             */
+            if (oReader != nullptr) {
+              auto ois = indexKeys(partId, vid, oReader.get(), index, schema.get());
+              if (!ois.empty()) {
+                // Check the index is building for the specified partition or not
+                auto indexState = env_->getIndexState(spaceId_, partId);
+                if (env_->checkRebuilding(indexState)) {
+                  auto delOpKey = OperationKeyUtils::deleteOperationKey(partId);
+                  for (auto& oi : ois) {
+                    batchHolder->put(std::string(delOpKey), std::move(oi));
+                  }
+                } else if (env_->checkIndexLocked(indexState)) {
+                  LOG(ERROR) << "The index has been locked: " << index->get_index_name();
+                  code = nebula::cpp2::ErrorCode::E_DATA_CONFLICT_ERROR;
+                  break;
+                } else {
+                  for (auto& oi : ois) {
+                    batchHolder->remove(std::move(oi));
+                  }
+                }
               }
             }
-          }
-        }
-        // step 2, Insert new vertex index
-        if (newReader != nullptr) {
-          auto newIndexKeys = indexKeys(partId, vId.str(), newReader.get(), index, schema.get());
-          if (!newIndexKeys.empty()) {
-            // check if index has ttl field, write it to index value if exists
-            auto field = CommonUtils::ttlValue(schema.get(), newReader.get());
-            auto indexVal = field.ok() ? IndexKeyUtils::indexVal(std::move(field).value()) : "";
-            auto indexState = env_->getIndexState(spaceId_, partId);
-            if (env_->checkRebuilding(indexState)) {
-              for (auto& idxKey : newIndexKeys) {
-                auto opKey = OperationKeyUtils::modifyOperationKey(partId, idxKey);
-                ret.writeSet.emplace_back(std::string(opKey));
-                batchHolder->put(std::move(opKey), std::string(indexVal));
-              }
-            } else if (env_->checkIndexLocked(indexState)) {
-              LOG(ERROR) << "The index has been locked: " << index->get_index_name();
-              ret.code = nebula::cpp2::ErrorCode::E_DATA_CONFLICT_ERROR;
-              return ret;
-            } else {
-              for (auto& idxKey : newIndexKeys) {
-                ret.writeSet.emplace_back(std::string(idxKey));
-                batchHolder->put(std::move(idxKey), std::string(indexVal));
+            /*
+             * step 2 , Insert new vertex index
+             */
+            if (nReader != nullptr) {
+              auto niks = indexKeys(partId, vid, nReader.get(), index, schema.get());
+              if (!niks.empty()) {
+                auto v = CommonUtils::ttlValue(schema.get(), nReader.get());
+                auto niv = v.ok() ? IndexKeyUtils::indexVal(std::move(v).value()) : "";
+                // Check the index is building for the specified partition or
+                // not.
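Both processors repeat the three-way dispatch that follows this comment. Reduced to its skeleton (names taken from the surrounding hunks; idxKey and idxVal are placeholders), the pattern is:

  auto indexState = env_->getIndexState(spaceId_, partId);
  if (env_->checkRebuilding(indexState)) {
    // Rebuild in flight: stage the write as an operation-log entry that the
    // rebuild task replays once its full scan is done.
    batchHolder->put(OperationKeyUtils::modifyOperationKey(partId, idxKey),
                     std::string(idxVal));
  } else if (env_->checkIndexLocked(indexState)) {
    // Rebuild is swapping the index in: fail fast and let the client retry.
    return nebula::cpp2::ErrorCode::E_DATA_CONFLICT_ERROR;
  } else {
    // No rebuild in progress: write the index entry directly.
    batchHolder->put(std::move(idxKey), std::string(idxVal));
  }

Deletions follow the same shape, using OperationKeyUtils::deleteOperationKey(partId) and batchHolder->remove().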
+                auto indexState = env_->getIndexState(spaceId_, partId);
+                if (env_->checkRebuilding(indexState)) {
+                  for (auto& nik : niks) {
+                    auto opKey = OperationKeyUtils::modifyOperationKey(partId, nik);
+                    batchHolder->put(std::move(opKey), std::string(niv));
+                  }
+                } else if (env_->checkIndexLocked(indexState)) {
+                  LOG(ERROR) << "The index has been locked: " << index->get_index_name();
+                  code = nebula::cpp2::ErrorCode::E_DATA_CONFLICT_ERROR;
+                  break;
+                } else {
+                  for (auto& nik : niks) {
+                    batchHolder->put(std::move(nik), std::string(niv));
+                  }
+                }
              }
            }
+          }  // for index data
+          if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
+            break;
          }
+          /*
+           * step 3 , Insert new vertex data
+           */
+          batchHolder->put(std::move(key), std::move(retEnc.value()));
+          stats::StatsManager::addValue(kNumVerticesInserted);
        }
+        if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
+          break;
+        }
      }
+      if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
+        env_->verticesML_->unlockBatch(dummyLock);
+        handleAsync(spaceId_, partId, code);
+        continue;
      }
-    // step 3, Insert new vertex data
-    ret.writeSet.emplace_back(key);
-    batchHolder->put(std::string(key), std::string(value));
+      auto batch = encodeBatchValue(batchHolder->getBatch());
+      DCHECK(!batch.empty());
+      nebula::MemoryLockGuard lg(env_->verticesML_.get(), std::move(dummyLock), false, false);
+      env_->kvstore_->asyncAppendBatch(spaceId_,
+                                       partId,
+                                       std::move(batch),
+                                       [l = std::move(lg), icw = std::move(wrapper), partId, this](
+                                           nebula::cpp2::ErrorCode retCode) {
+                                         UNUSED(l);
+                                         UNUSED(icw);
+                                         handleAsync(spaceId_, partId, retCode);
+                                       });
   }
-  ret.batch = encodeBatchValue(batchHolder->getBatch());
-  ret.code = nebula::cpp2::ErrorCode::SUCCEEDED;
-  return ret;
-}
+}  // namespace storage
 
 ErrorOr AddVerticesProcessor::findOldValue(
     PartitionID partId, const VertexID& vId, TagID tagId) {
diff --git a/src/storage/mutate/AddVerticesProcessor.h b/src/storage/mutate/AddVerticesProcessor.h
index 851039fb396..71725c9b5be 100644
--- a/src/storage/mutate/AddVerticesProcessor.h
+++ b/src/storage/mutate/AddVerticesProcessor.h
@@ -46,11 +46,6 @@ class AddVerticesProcessor : public BaseProcessor {
 
   void deleteDupVid(std::vector& vertices);
 
-  kvstore::MergeableAtomicOpResult addVerticesWithIndex(
-      PartitionID partId,
-      std::vector&& data,
-      std::unique_ptr&& batchHolder);
-
 private:
   GraphSpaceID spaceId_;
   std::vector> indexes_;
diff --git a/src/storage/test/IndexTestUtil.h b/src/storage/test/IndexTestUtil.h
index 9730fc3d514..87d2dbec3ef 100644
--- a/src/storage/test/IndexTestUtil.h
+++ b/src/storage/test/IndexTestUtil.h
@@ -236,14 +236,10 @@ class MockKVStore : public ::nebula::kvstore::KVStore {
     }
   }
 
-  void asyncAtomicOp(GraphSpaceID spaceId,
-                     PartitionID partId,
-                     ::nebula::kvstore::MergeableAtomicOp op,
-                     ::nebula::kvstore::KVCallback cb) override {
-    UNUSED(spaceId);
-    UNUSED(partId);
-    UNUSED(cb);
-    UNUSED(op);
+  void asyncAtomicOp(GraphSpaceID,
+                     PartitionID,
+                     raftex::AtomicOp,
+                     ::nebula::kvstore::KVCallback) override {
     LOG(FATAL) << "Unexpect";
   }
   void asyncAppendBatch(GraphSpaceID,
@@ -340,7 +336,6 @@ class MockIndexNode : public IndexNode {
   }
   std::unique_ptr copy() override {
     LOG(FATAL) << "Unexpect";
-    return nullptr;
   }
   std::function nextFunc;
   std::function<::nebula::cpp2::ErrorCode(PartitionID)> executeFunc;
diff --git a/src/storage/test/RebuildIndexTest.cpp b/src/storage/test/RebuildIndexTest.cpp
index 1f2d507194b..e2b9737545e 100644
--- a/src/storage/test/RebuildIndexTest.cpp
+++ b/src/storage/test/RebuildIndexTest.cpp
@@ -27,6 +27,10 @@ class RebuildIndexTest : public ::testing::Test {
 protected:
   static void SetUpTestCase() {
     LOG(INFO) << "SetUp RebuildIndexTest TestCase";
+    rootPath_ = std::make_unique("/tmp/RebuildIndexTest.XXXXXX");
+    cluster_ = std::make_unique();
+    cluster_->initStorageKV(rootPath_->path());
+    env_ = cluster_->storageEnv_.get();
     manager_ = AdminTaskManager::instance();
     manager_->init();
   }
@@ -34,20 +38,14 @@ class RebuildIndexTest : public ::testing::Test {
   static void TearDownTestCase() {
     LOG(INFO) << "TearDown RebuildIndexTest TestCase";
     manager_->shutdown();
-  }
-
-  void SetUp() override {
-    rootPath_ = std::make_unique("/tmp/RebuildIndexTest.XXXXXX");
-    cluster_ = std::make_unique();
-    cluster_->initStorageKV(rootPath_->path());
-    env_ = cluster_->storageEnv_.get();
-  }
-
-  void TearDown() override {
     cluster_.reset();
     rootPath_.reset();
   }
 
+  void SetUp() override {}
+
+  void TearDown() override {}
+
   static StorageEnv* env_;
   static AdminTaskManager* manager_;
 
@@ -93,7 +91,7 @@ TEST_F(RebuildIndexTest, RebuildTagIndexCheckALLData) {
 
   // Wait for the task finished
   do {
-    std::this_thread::sleep_for(std::chrono::milliseconds(50));
+    usleep(50);
   } while (!manager_->isFinished(context.jobId_, context.taskId_));
 
   // Check the data count
@@ -179,7 +177,7 @@ TEST_F(RebuildIndexTest, RebuildEdgeIndexCheckALLData) {
 
   // Wait for the task finished
   do {
-    std::this_thread::sleep_for(std::chrono::milliseconds(50));
+    usleep(50);
   } while (!manager_->isFinished(context.jobId_, context.taskId_));
 
   // Check the data count
@@ -279,7 +277,7 @@ TEST_F(RebuildIndexTest, RebuildTagIndexWithDelete) {
 
   // Wait for the task finished
   do {
-    std::this_thread::sleep_for(std::chrono::milliseconds(50));
+    usleep(50);
   } while (!manager_->isFinished(context.jobId_, context.taskId_));
 
   LOG(INFO) << "Check rebuild tag index...";
@@ -339,7 +337,7 @@ TEST_F(RebuildIndexTest, RebuildTagIndexWithAppend) {
 
   // Wait for the task finished
   do {
-    std::this_thread::sleep_for(std::chrono::milliseconds(50));
+    usleep(50);
   } while (!manager_->isFinished(context.jobId_, context.taskId_));
 
   LOG(INFO) << "Check rebuild tag index...";
@@ -351,11 +349,7 @@ TEST_F(RebuildIndexTest, RebuildTagIndexWithAppend) {
 
   RebuildIndexTest::env_->rebuildIndexGuard_->clear();
   writer->stop();
-
-  for (int i = 1; i <= 5; ++i) {
-    LOG(INFO) << "sleep for " << i << "s";
-    sleep(1);
-  }
+  sleep(1);
 }
 
 TEST_F(RebuildIndexTest, RebuildTagIndex) {
@@ -387,8 +381,7 @@ TEST_F(RebuildIndexTest, RebuildTagIndex) {
 
   // Wait for the task finished
   do {
-    // usleep(50);
-    std::this_thread::sleep_for(std::chrono::milliseconds(50));
+    usleep(50);
   } while (!manager_->isFinished(context.jobId_, context.taskId_));
 
   // Check the result
@@ -400,10 +393,7 @@ TEST_F(RebuildIndexTest, RebuildTagIndex) {
   }
 
   RebuildIndexTest::env_->rebuildIndexGuard_->clear();
-  for (int i = 1; i <= 5; ++i) {
-    LOG(INFO) << "sleep for " << i << "s";
-    sleep(1);
-  }
+  sleep(1);
 }
 
 TEST_F(RebuildIndexTest, RebuildEdgeIndexWithDelete) {
@@ -448,8 +438,7 @@ TEST_F(RebuildIndexTest, RebuildEdgeIndexWithDelete) {
 
   // Wait for the task finished
   do {
-    // usleep(50);
-    std::this_thread::sleep_for(std::chrono::milliseconds(50));
+    usleep(50);
   } while (!manager_->isFinished(context.jobId_, context.taskId_));
 
   // Check the result
@@ -465,11 +454,7 @@ TEST_F(RebuildIndexTest, RebuildEdgeIndexWithDelete) {
 
   RebuildIndexTest::env_->rebuildIndexGuard_->clear();
   writer->stop();
-  // sleep(1);
-  for (int i = 1; i <= 5; ++i) {
-    LOG(INFO) << "sleep for " << i << "s";
-    sleep(1);
-  }
+  sleep(1);
 }
 
 TEST_F(RebuildIndexTest, RebuildEdgeIndexWithAppend) {
@@ -512,7 +497,7 @@ TEST_F(RebuildIndexTest, RebuildEdgeIndexWithAppend) {
 
   // Wait for the task finished
   do {
-    std::this_thread::sleep_for(std::chrono::milliseconds(50));
+    usleep(50);
   } while (!manager_->isFinished(context.jobId_, context.taskId_));
 
   // Check the result
@@ -526,10 +511,6 @@ TEST_F(RebuildIndexTest, RebuildEdgeIndexWithAppend) {
   RebuildIndexTest::env_->rebuildIndexGuard_->clear();
   writer->stop();
   sleep(1);
-  for (int i = 1; i <= 5; ++i) {
-    LOG(INFO) << "sleep for " << i << "s";
-    sleep(1);
-  }
 }
 
 TEST_F(RebuildIndexTest, RebuildEdgeIndex) {
@@ -560,7 +541,7 @@ TEST_F(RebuildIndexTest, RebuildEdgeIndex) {
 
   // Wait for the task finished
   do {
-    std::this_thread::sleep_for(std::chrono::milliseconds(50));
+    usleep(50);
   } while (!manager_->isFinished(context.jobId_, context.taskId_));
 
   // Check the result
@@ -570,10 +551,6 @@ TEST_F(RebuildIndexTest, RebuildEdgeIndex) {
     auto code = RebuildIndexTest::env_->kvstore_->get(1, key.first, key.second, &value);
     EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code);
   }
-  for (int i = 1; i <= 5; ++i) {
-    LOG(INFO) << "sleep for " << i << "s";
-    sleep(1);
-  }
 }
 
 }  // namespace storage
diff --git a/src/storage/test/StatsTaskTest.cpp b/src/storage/test/StatsTaskTest.cpp
index 34fe20f1257..803a04b4d08 100644
--- a/src/storage/test/StatsTaskTest.cpp
+++ b/src/storage/test/StatsTaskTest.cpp
@@ -264,8 +264,7 @@ TEST_F(StatsTaskTest, StatsTagAndEdgeData) {
       ASSERT_EQ(0, edge.second);
     }
   }
-  // ASSERT_EQ(81, *statsItem.space_vertices_ref());
-  EXPECT_EQ(81, *statsItem.space_vertices_ref());
+  ASSERT_EQ(81, *statsItem.space_vertices_ref());
   ASSERT_EQ(167, *statsItem.space_edges_ref());
 }
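One timing detail worth flagging in the test hunks above: the revert swaps std::this_thread::sleep_for(std::chrono::milliseconds(50)) back to usleep(50), but usleep() takes microseconds, so the two waits are not equivalent:

  usleep(50);                                                  // 50 microseconds
  usleep(50 * 1000);                                           // 50 milliseconds
  std::this_thread::sleep_for(std::chrono::milliseconds(50));  // 50 milliseconds

Since these loops only poll manager_->isFinished(...), the much shorter wait makes the tests spin harder rather than fail.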