From 5ecca54bf6ff5063d08f891992053310508d4fa0 Mon Sep 17 00:00:00 2001 From: Kaijie Chen Date: Wed, 31 Jul 2024 16:47:10 +0800 Subject: [PATCH 1/6] [fix](move-memtable) tolerate minority failures in load_stream --- be/src/runtime/load_stream.cpp | 113 +++++++++++------- be/src/runtime/load_stream.h | 14 +-- be/src/runtime/load_stream_writer.cpp | 2 - .../test_load_stream_fault_injection.groovy | 6 +- 4 files changed, 76 insertions(+), 59 deletions(-) diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp index c818c4664a0689..aa1749caace3e6 100644 --- a/be/src/runtime/load_stream.cpp +++ b/be/src/runtime/load_stream.cpp @@ -62,7 +62,7 @@ TabletStream::TabletStream(PUniqueId load_id, int64_t id, int64_t txn_id, _txn_id(txn_id), _load_stream_mgr(load_stream_mgr) { load_stream_mgr->create_tokens(_flush_tokens); - _failed_st = std::make_shared(); + _status = Status::OK(); _profile = profile->create_child(fmt::format("TabletStream {}", id), true, true); _append_data_timer = ADD_TIMER(_profile, "AppendDataTime"); _add_segment_timer = ADD_TIMER(_profile, "AddSegmentTime"); @@ -71,7 +71,7 @@ TabletStream::TabletStream(PUniqueId load_id, int64_t id, int64_t txn_id, inline std::ostream& operator<<(std::ostream& ostr, const TabletStream& tablet_stream) { ostr << "load_id=" << tablet_stream._load_id << ", txn_id=" << tablet_stream._txn_id - << ", tablet_id=" << tablet_stream._id << ", status=" << *tablet_stream._failed_st; + << ", tablet_id=" << tablet_stream._id << ", status=" << tablet_stream._status; return ostr; } @@ -89,17 +89,20 @@ Status TabletStream::init(std::shared_ptr schema, int64_t }; _load_stream_writer = std::make_shared(&req, _profile); - auto st = _load_stream_writer->init(); - if (!st.ok()) { - _failed_st = std::make_shared(st); + DBUG_EXECUTE_IF("TabletStream.init.uninited_writer", { + _status = Status::Uninitialized("fault injection"); + return _status; + }); + _status = _load_stream_writer->init(); + if (!_status.ok()) { LOG(INFO) << "failed to init rowset builder due to " << *this; } - return st; + return _status; } Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data) { - if (!_failed_st->ok()) { - return *_failed_st; + if (!_status.ok()) { + return _status; } // dispatch add_segment request @@ -156,9 +159,9 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data file_type, new_segid); } } - if (!st.ok() && _failed_st->ok()) { - _failed_st = std::make_shared(st); - LOG(INFO) << "write data failed " << *this; + if (!st.ok() && _status.ok()) { + _status = st; + LOG(WARNING) << "write data failed " << st << ", " << *this; } }; auto& flush_token = _flush_tokens[new_segid % _flush_tokens.size()]; @@ -173,10 +176,11 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data timer.start(); while (flush_token->num_tasks() >= load_stream_flush_token_max_tasks) { if (timer.elapsed_time() / 1000 / 1000 >= load_stream_max_wait_flush_token_time_ms) { - return Status::Error( + _status = Status::Error( "wait flush token back pressure time is more than " "load_stream_max_wait_flush_token_time {}", load_stream_max_wait_flush_token_time_ms); + return _status; } bthread_usleep(2 * 1000); // 2ms } @@ -184,10 +188,18 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data int64_t time_ms = timer.elapsed_time() / 1000 / 1000; g_load_stream_flush_wait_ms << time_ms; g_load_stream_flush_running_threads << 1; - return flush_token->submit_func(flush_func); + auto st = 
flush_token->submit_func(flush_func); + if (!st.ok()) { + _status = st; + } + return _status; } Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data) { + if (!_status.ok()) { + return _status; + } + SCOPED_TIMER(_add_segment_timer); DCHECK(header.has_segment_statistics()); SegmentStatistics stat(header.segment_statistics()); @@ -204,15 +216,17 @@ Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data { std::lock_guard lock_guard(_lock); if (!_segids_mapping.contains(src_id)) { - return Status::InternalError( + _status = Status::InternalError( "add segment failed, no segment written by this src be yet, src_id={}, " "segment_id={}", src_id, segid); + return _status; } if (segid >= _segids_mapping[src_id]->size()) { - return Status::InternalError( + _status = Status::InternalError( "add segment failed, segment is never written, src_id={}, segment_id={}", src_id, segid); + return _status; } new_segid = _segids_mapping[src_id]->at(segid); } @@ -221,16 +235,24 @@ Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data auto add_segment_func = [this, new_segid, stat, flush_schema]() { signal::set_signal_task_id(_load_id); auto st = _load_stream_writer->add_segment(new_segid, stat, flush_schema); - if (!st.ok() && _failed_st->ok()) { - _failed_st = std::make_shared(st); + if (!st.ok() && _status.ok()) { + _status = st; LOG(INFO) << "add segment failed " << *this; } }; auto& flush_token = _flush_tokens[new_segid % _flush_tokens.size()]; - return flush_token->submit_func(add_segment_func); + auto st = flush_token->submit_func(add_segment_func); + if (!st.ok()) { + _status = st; + } + return _status; } Status TabletStream::close() { + if (!_status.ok()) { + return _status; + } + SCOPED_TIMER(_close_wait_timer); bthread::Mutex mu; std::unique_lock lock(mu); @@ -247,23 +269,24 @@ Status TabletStream::close() { if (ret) { cv.wait(lock); } else { - return Status::Error( + _status = Status::Error( "there is not enough thread resource for close load"); + return _status; } - if (!_failed_st->ok()) { - return *_failed_st; - } if (_next_segid.load() != _num_segments) { - return Status::Corruption( + _status = Status::Corruption( "segment num mismatch in tablet {}, expected: {}, actual: {}, load_id: {}", _id, _num_segments, _next_segid.load(), print_id(_load_id)); + return _status; } - Status st = Status::OK(); - auto close_func = [this, &mu, &cv, &st]() { + auto close_func = [this, &mu, &cv]() { signal::set_signal_task_id(_load_id); - st = _load_stream_writer->close(); + auto st = _load_stream_writer->close(); + if (!st.ok() && _status.ok()) { + _status = st; + } std::lock_guard lock(mu); cv.notify_one(); }; @@ -271,10 +294,10 @@ Status TabletStream::close() { if (ret) { cv.wait(lock); } else { - return Status::Error( + _status = Status::Error( "there is not enough thread resource for close load"); } - return st; + return _status; } IndexStream::IndexStream(PUniqueId load_id, int64_t id, int64_t txn_id, @@ -298,7 +321,7 @@ Status IndexStream::append_data(const PStreamHeader& header, butil::IOBuf* data) std::lock_guard lock_guard(_lock); auto it = _tablet_streams_map.find(tablet_id); if (it == _tablet_streams_map.end()) { - RETURN_IF_ERROR(_init_tablet_stream(tablet_stream, tablet_id, header.partition_id())); + _init_tablet_stream(tablet_stream, tablet_id, header.partition_id()); } else { tablet_stream = it->second; } @@ -307,17 +330,19 @@ Status IndexStream::append_data(const PStreamHeader& header, butil::IOBuf* data) return 
tablet_stream->append_data(header, data); } -Status IndexStream::_init_tablet_stream(TabletStreamSharedPtr& tablet_stream, int64_t tablet_id, - int64_t partition_id) { +void IndexStream::_init_tablet_stream(TabletStreamSharedPtr& tablet_stream, int64_t tablet_id, + int64_t partition_id) { tablet_stream = std::make_shared(_load_id, tablet_id, _txn_id, _load_stream_mgr, _profile); _tablet_streams_map[tablet_id] = tablet_stream; - RETURN_IF_ERROR(tablet_stream->init(_schema, _id, partition_id)); - return Status::OK(); + auto st = tablet_stream->init(_schema, _id, partition_id); + if (!st.ok()) { + LOG(WARNING) << "tablet stream init failed " << *tablet_stream; + } } -Status IndexStream::close(const std::vector& tablets_to_commit, - std::vector* success_tablet_ids, FailedTablets* failed_tablets) { +void IndexStream::close(const std::vector& tablets_to_commit, + std::vector* success_tablet_ids, FailedTablets* failed_tablets) { std::lock_guard lock_guard(_lock); SCOPED_TIMER(_close_wait_timer); // open all need commit tablets @@ -328,8 +353,7 @@ Status IndexStream::close(const std::vector& tablets_to_commit, TabletStreamSharedPtr tablet_stream; auto it = _tablet_streams_map.find(tablet.tablet_id()); if (it == _tablet_streams_map.end()) { - RETURN_IF_ERROR( - _init_tablet_stream(tablet_stream, tablet.tablet_id(), tablet.partition_id())); + _init_tablet_stream(tablet_stream, tablet.tablet_id(), tablet.partition_id()); tablet_stream->add_num_segments(tablet.num_segments()); } else { it->second->add_num_segments(tablet.num_segments()); @@ -345,7 +369,6 @@ Status IndexStream::close(const std::vector& tablets_to_commit, failed_tablets->emplace_back(tablet_stream->id(), st); } } - return Status::OK(); } // TODO: Profile is temporary disabled, because: @@ -398,8 +421,8 @@ Status LoadStream::init(const POpenLoadStreamRequest* request) { return Status::OK(); } -Status LoadStream::close(int64_t src_id, const std::vector& tablets_to_commit, - std::vector* success_tablet_ids, FailedTablets* failed_tablets) { +void LoadStream::close(int64_t src_id, const std::vector& tablets_to_commit, + std::vector* success_tablet_ids, FailedTablets* failed_tablets) { std::lock_guard lock_guard(_lock); SCOPED_TIMER(_close_wait_timer); @@ -417,16 +440,14 @@ Status LoadStream::close(int64_t src_id, const std::vector& tablets_t if (_close_load_cnt < _total_streams) { // do not return commit info if there is remaining streams. 
- return Status::OK(); + return; } for (auto& [_, index_stream] : _index_streams_map) { - RETURN_IF_ERROR( - index_stream->close(_tablets_to_commit, success_tablet_ids, failed_tablets)); + index_stream->close(_tablets_to_commit, success_tablet_ids, failed_tablets); } LOG(INFO) << "close load " << *this << ", success_tablet_num=" << success_tablet_ids->size() << ", failed_tablet_num=" << failed_tablets->size(); - return Status::OK(); } void LoadStream::_report_result(StreamId stream, const Status& status, @@ -612,8 +633,8 @@ void LoadStream::_dispatch(StreamId id, const PStreamHeader& hdr, butil::IOBuf* std::vector success_tablet_ids; FailedTablets failed_tablets; std::vector tablets_to_commit(hdr.tablets().begin(), hdr.tablets().end()); - auto st = close(hdr.src_id(), tablets_to_commit, &success_tablet_ids, &failed_tablets); - _report_result(id, st, success_tablet_ids, failed_tablets, true); + close(hdr.src_id(), tablets_to_commit, &success_tablet_ids, &failed_tablets); + _report_result(id, Status::OK(), success_tablet_ids, failed_tablets, true); brpc::StreamClose(id); } break; case PStreamHeader::GET_SCHEMA: { diff --git a/be/src/runtime/load_stream.h b/be/src/runtime/load_stream.h index 80e69c784ad789..427bc2dbb62cc8 100644 --- a/be/src/runtime/load_stream.h +++ b/be/src/runtime/load_stream.h @@ -66,7 +66,7 @@ class TabletStream { std::atomic _next_segid; int64_t _num_segments = 0; bthread::Mutex _lock; - std::shared_ptr _failed_st; + Status _status; PUniqueId _load_id; int64_t _txn_id; RuntimeProfile* _profile = nullptr; @@ -86,12 +86,12 @@ class IndexStream { Status append_data(const PStreamHeader& header, butil::IOBuf* data); - Status close(const std::vector& tablets_to_commit, - std::vector* success_tablet_ids, FailedTablets* failed_tablet_ids); + void close(const std::vector& tablets_to_commit, + std::vector* success_tablet_ids, FailedTablets* failed_tablet_ids); private: - Status _init_tablet_stream(TabletStreamSharedPtr& tablet_stream, int64_t tablet_id, - int64_t partition_id); + void _init_tablet_stream(TabletStreamSharedPtr& tablet_stream, int64_t tablet_id, + int64_t partition_id); private: int64_t _id; @@ -124,8 +124,8 @@ class LoadStream : public brpc::StreamInputHandler { } } - Status close(int64_t src_id, const std::vector& tablets_to_commit, - std::vector* success_tablet_ids, FailedTablets* failed_tablet_ids); + void close(int64_t src_id, const std::vector& tablets_to_commit, + std::vector* success_tablet_ids, FailedTablets* failed_tablet_ids); // callbacks called by brpc int on_received_messages(StreamId id, butil::IOBuf* const messages[], size_t size) override; diff --git a/be/src/runtime/load_stream_writer.cpp b/be/src/runtime/load_stream_writer.cpp index 3e66787a9bd372..d501de3d53fbb4 100644 --- a/be/src/runtime/load_stream_writer.cpp +++ b/be/src/runtime/load_stream_writer.cpp @@ -140,7 +140,6 @@ Status LoadStreamWriter::close_writer(uint32_t segid, FileType file_type) { file_type == FileType::SEGMENT_FILE ? 
_segment_file_writers : _inverted_file_writers; { std::lock_guard lock_guard(_lock); - DBUG_EXECUTE_IF("LoadStreamWriter.close_writer.uninited_writer", { _is_init = false; }); if (!_is_init) { return Status::Corruption("close_writer failed, LoadStreamWriter is not inited"); } @@ -183,7 +182,6 @@ Status LoadStreamWriter::add_segment(uint32_t segid, const SegmentStatistics& st size_t inverted_file_size = 0; { std::lock_guard lock_guard(_lock); - DBUG_EXECUTE_IF("LoadStreamWriter.add_segment.uninited_writer", { _is_init = false; }); if (!_is_init) { return Status::Corruption("add_segment failed, LoadStreamWriter is not inited"); } diff --git a/regression-test/suites/fault_injection_p0/test_load_stream_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_load_stream_fault_injection.groovy index 6a6aa0efd43eae..2cace455c167ff 100644 --- a/regression-test/suites/fault_injection_p0/test_load_stream_fault_injection.groovy +++ b/regression-test/suites/fault_injection_p0/test_load_stream_fault_injection.groovy @@ -143,8 +143,6 @@ suite("load_stream_fault_injection", "nonConcurrent") { load_with_injection("LocalFileSystem.create_file_impl.open_file_failed", "") // LoadStreamWriter append_data meet null file writer error load_with_injection("LoadStreamWriter.append_data.null_file_writer", "") - // LoadStreamWriter close_writer meet not inited error - load_with_injection("LoadStreamWriter.close_writer.uninited_writer", "") // LoadStreamWriter close_writer meet not bad segid error load_with_injection("LoadStreamWriter.close_writer.bad_segid", "") // LoadStreamWriter close_writer meet null file writer error @@ -153,8 +151,8 @@ suite("load_stream_fault_injection", "nonConcurrent") { load_with_injection("LocalFileWriter.close.failed", "") // LoadStreamWriter close_writer meet bytes_appended and real file size not match error load_with_injection("FileWriter.close_writer.zero_bytes_appended", "") - // LoadStreamWriter add_segment meet not inited error - load_with_injection("LoadStreamWriter.add_segment.uninited_writer", "") + // LoadStreamWriter close_writer/add_segment meet not inited error + load_with_injection("TabletStream.init.uninited_writer", "") // LoadStreamWriter add_segment meet not bad segid error load_with_injection("LoadStreamWriter.add_segment.bad_segid", "") // LoadStreamWriter add_segment meet null file writer error From fdbb60785b2d2b119c93636f9e795ff6454da9d0 Mon Sep 17 00:00:00 2001 From: Kaijie Chen Date: Wed, 31 Jul 2024 16:48:58 +0800 Subject: [PATCH 2/6] [fix](move-memtable) tolerate minority replica failure in sinkv2 --- be/src/olap/rowset/beta_rowset_writer_v2.cpp | 14 +++- be/src/vec/sink/load_stream_map_pool.cpp | 21 ++++-- be/src/vec/sink/load_stream_stub.cpp | 45 ++++++++++--- be/src/vec/sink/load_stream_stub.h | 7 +- be/src/vec/sink/writer/vtablet_writer_v2.cpp | 70 +++++++++++++------- be/src/vec/sink/writer/vtablet_writer_v2.h | 2 +- 6 files changed, 118 insertions(+), 41 deletions(-) diff --git a/be/src/olap/rowset/beta_rowset_writer_v2.cpp b/be/src/olap/rowset/beta_rowset_writer_v2.cpp index 3ebe331cfc12f8..95adf3d6e50cee 100644 --- a/be/src/olap/rowset/beta_rowset_writer_v2.cpp +++ b/be/src/olap/rowset/beta_rowset_writer_v2.cpp @@ -83,9 +83,19 @@ Status BetaRowsetWriterV2::create_file_writer(uint32_t segment_id, io::FileWrite Status BetaRowsetWriterV2::add_segment(uint32_t segment_id, const SegmentStatistics& segstat, TabletSchemaSPtr flush_schema) { + bool ok = false; for (const auto& stream : _streams) { - 
RETURN_IF_ERROR(stream->add_segment(_context.partition_id, _context.index_id, - _context.tablet_id, segment_id, segstat, flush_schema)); + auto st = stream->add_segment(_context.partition_id, _context.index_id, _context.tablet_id, + segment_id, segstat, flush_schema); + if (!st.ok()) { + LOG(WARNING) << "failed to add segment " << segment_id << " to stream " + << stream->stream_id(); + } + ok = ok || st.ok(); + } + if (!ok) { + return Status::InternalError("failed to add segment {} of tablet {} to any replicas", + segment_id, _context.tablet_id); } return Status::OK(); } diff --git a/be/src/vec/sink/load_stream_map_pool.cpp b/be/src/vec/sink/load_stream_map_pool.cpp index 2fcb8deaeb2c85..ba69efd9fe38f0 100644 --- a/be/src/vec/sink/load_stream_map_pool.cpp +++ b/be/src/vec/sink/load_stream_map_pool.cpp @@ -77,10 +77,14 @@ Status LoadStreamMap::for_each_st(std::function std::lock_guard lock(_mutex); snapshot = _streams_for_node; } + Status status = Status::OK(); for (auto& [dst_id, streams] : snapshot) { - RETURN_IF_ERROR(fn(dst_id, *streams)); + auto st = fn(dst_id, *streams); + if (!st.ok() && status.ok()) { + status = st; + } } - return Status::OK(); + return status; } void LoadStreamMap::save_tablets_to_commit(int64_t dst_id, @@ -112,19 +116,26 @@ Status LoadStreamMap::close_load(bool incremental) { tablets_to_commit.push_back(tablet); tablets_to_commit.back().set_num_segments(_segments_for_tablet[tablet_id]); } + Status status = Status::OK(); bool first = true; for (auto& stream : streams) { if (stream->is_incremental() != incremental) { continue; } if (first) { - RETURN_IF_ERROR(stream->close_load(tablets_to_commit)); + auto st = stream->close_load(tablets_to_commit); + if (!st.ok() && status.ok()) { + status = st; + } first = false; } else { - RETURN_IF_ERROR(stream->close_load({})); + auto st = stream->close_load({}); + if (!st.ok() && status.ok()) { + status = st; + } } } - return Status::OK(); + return status; }); } diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp index f322d67ceaf9c3..416276bc662a1d 100644 --- a/be/src/vec/sink/load_stream_stub.cpp +++ b/be/src/vec/sink/load_stream_stub.cpp @@ -149,7 +149,7 @@ Status LoadStreamStub::open(BrpcClientCache* client_cache, int64_t idle_timeout_ms, bool enable_profile) { std::unique_lock lock(_open_mutex); if (_is_init.load()) { - return Status::OK(); + return _init_st; } _dst_id = node_info.id; std::string host_port = get_host_port(node_info.host, node_info.brpc_port); @@ -161,7 +161,8 @@ Status LoadStreamStub::open(BrpcClientCache* client_cache, brpc::Controller cntl; if (int ret = brpc::StreamCreate(&_stream_id, cntl, &opt)) { delete opt.handler; - return Status::Error(ret, "Failed to create stream"); + _init_st = Status::Error(ret, "Failed to create stream"); + return _init_st; } cntl.set_timeout_ms(config::open_load_stream_timeout_ms); POpenLoadStreamRequest request; @@ -174,7 +175,8 @@ Status LoadStreamStub::open(BrpcClientCache* client_cache, } else if (total_streams > 0) { request.set_total_streams(total_streams); } else { - return Status::InternalError("total_streams should be greator than 0"); + _init_st = Status::InternalError("total_streams should be greator than 0"); + return _init_st; } request.set_idle_timeout_ms(idle_timeout_ms); schema.to_protobuf(request.mutable_schema()); @@ -195,8 +197,9 @@ Status LoadStreamStub::open(BrpcClientCache* client_cache, } if (cntl.Failed()) { brpc::StreamClose(_stream_id); - return Status::InternalError("Failed to connect to backend {}: {}", 
_dst_id, - cntl.ErrorText()); + _init_st = Status::InternalError("Failed to connect to backend {}: {}", _dst_id, + cntl.ErrorText()); + return _init_st; } LOG(INFO) << "open load stream to " << host_port << ", " << *this; _is_init.store(true); @@ -207,6 +210,10 @@ Status LoadStreamStub::open(BrpcClientCache* client_cache, Status LoadStreamStub::append_data(int64_t partition_id, int64_t index_id, int64_t tablet_id, int64_t segment_id, uint64_t offset, std::span data, bool segment_eos, FileType file_type) { + if (!_is_init.load()) { + add_failed_tablet(tablet_id, _init_st); + return _init_st; + } DBUG_EXECUTE_IF("LoadStreamStub.only_send_segment_0", { if (segment_id != 0) { return Status::OK(); @@ -230,6 +237,10 @@ Status LoadStreamStub::append_data(int64_t partition_id, int64_t index_id, int64 Status LoadStreamStub::add_segment(int64_t partition_id, int64_t index_id, int64_t tablet_id, int64_t segment_id, const SegmentStatistics& segment_stat, TabletSchemaSPtr flush_schema) { + if (!_is_init.load()) { + add_failed_tablet(tablet_id, _init_st); + return _init_st; + } DBUG_EXECUTE_IF("LoadStreamStub.only_send_segment_0", { if (segment_id != 0) { return Status::OK(); @@ -252,6 +263,9 @@ Status LoadStreamStub::add_segment(int64_t partition_id, int64_t index_id, int64 // CLOSE_LOAD Status LoadStreamStub::close_load(const std::vector& tablets_to_commit) { + if (!_is_init.load()) { + return _init_st; + } PStreamHeader header; *header.mutable_load_id() = _load_id; header.set_src_id(_src_id); @@ -259,11 +273,20 @@ Status LoadStreamStub::close_load(const std::vector& tablets_to_commi for (const auto& tablet : tablets_to_commit) { *header.add_tablets() = tablet; } - return _encode_and_send(header); + _close_st = _encode_and_send(header); + if (!_close_st.ok()) { + LOG(WARNING) << "stream " << _stream_id << " close failed: " << _close_st; + return _close_st; + } + _is_closing.store(true); + return Status::OK(); } // GET_SCHEMA Status LoadStreamStub::get_schema(const std::vector& tablets) { + if (!_is_init.load()) { + return _init_st; + } PStreamHeader header; *header.mutable_load_id() = _load_id; header.set_src_id(_src_id); @@ -284,6 +307,9 @@ Status LoadStreamStub::get_schema(const std::vector& tablets) { Status LoadStreamStub::wait_for_schema(int64_t partition_id, int64_t index_id, int64_t tablet_id, int64_t timeout_ms) { + if (!_is_init.load()) { + return _init_st; + } if (_tablet_schema_for_index->contains(index_id)) { return Status::OK(); } @@ -310,7 +336,10 @@ Status LoadStreamStub::wait_for_schema(int64_t partition_id, int64_t index_id, i Status LoadStreamStub::close_wait(RuntimeState* state, int64_t timeout_ms) { DBUG_EXECUTE_IF("LoadStreamStub::close_wait.long_wait", DBUG_BLOCK); if (!_is_init.load()) { - return Status::InternalError("stream {} is not opened, {}", _stream_id, to_string()); + return _init_st; + } + if (!_is_closing.load()) { + return _close_st; } if (_is_closed.load()) { return _check_cancel(); @@ -341,7 +370,7 @@ void LoadStreamStub::cancel(Status reason) { LOG(WARNING) << *this << " is cancelled because of " << reason; { std::lock_guard lock(_cancel_mutex); - _cancel_reason = reason; + _cancel_st = reason; _is_cancelled.store(true); } { diff --git a/be/src/vec/sink/load_stream_stub.h b/be/src/vec/sink/load_stream_stub.h index b6436a4b81a4e4..b06b19e511d99d 100644 --- a/be/src/vec/sink/load_stream_stub.h +++ b/be/src/vec/sink/load_stream_stub.h @@ -223,11 +223,12 @@ class LoadStreamStub : public std::enable_shared_from_this { } std::lock_guard lock(_cancel_mutex); return 
Status::Cancelled("load_id={}, reason: {}", print_id(_load_id), - _cancel_reason.to_string_no_stack()); + _cancel_st.to_string_no_stack()); } protected: std::atomic _is_init; + std::atomic _is_closing; std::atomic _is_closed; std::atomic _is_cancelled; std::atomic _is_eos; @@ -236,7 +237,9 @@ class LoadStreamStub : public std::enable_shared_from_this { brpc::StreamId _stream_id; int64_t _src_id = -1; // source backend_id int64_t _dst_id = -1; // destination backend_id - Status _cancel_reason; + Status _init_st; + Status _close_st; + Status _cancel_st; bthread::Mutex _open_mutex; bthread::Mutex _close_mutex; diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp index 8bf0520aba09ec..a608687076c17f 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp +++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp @@ -279,18 +279,18 @@ Status VTabletWriterV2::_open_streams_to_backend(int64_t dst_id, Streams& stream return Status::InternalError("Unknown node {} in tablet location", dst_id); } auto idle_timeout_ms = _state->execution_timeout() * 1000; - // get tablet schema from each backend only in the 1st stream - for (auto& stream : streams | std::ranges::views::take(1)) { - const std::vector& tablets_for_schema = _indexes_from_node[node_info->id]; - RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_internal_client_cache(), *node_info, - _txn_id, *_schema, tablets_for_schema, _total_streams, - idle_timeout_ms, _state->enable_profile())); - } - // for the rest streams, open without getting tablet schema - for (auto& stream : streams | std::ranges::views::drop(1)) { - RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_internal_client_cache(), *node_info, - _txn_id, *_schema, {}, _total_streams, idle_timeout_ms, - _state->enable_profile())); + std::vector& tablets_for_schema = _indexes_from_node[node_info->id]; + for (auto& stream : streams) { + auto st = stream->open(_state->exec_env()->brpc_internal_client_cache(), *node_info, + _txn_id, *_schema, tablets_for_schema, _total_streams, + idle_timeout_ms, _state->enable_profile()); + if (st.ok()) { + // get tablet schema from each backend only in the 1st stream + tablets_for_schema.clear(); + } else { + LOG(WARNING) << "failed to open stream to backend " << dst_id + << ", load_id=" << print_id(_load_id); + } } return Status::OK(); } @@ -475,7 +475,7 @@ Status VTabletWriterV2::_write_memtable(std::shared_ptr block return st; } -Status VTabletWriterV2::_cancel(Status status) { +void VTabletWriterV2::_cancel(Status status) { LOG(INFO) << "canceled olap table sink. 
load_id=" << print_id(_load_id) << ", txn_id=" << _txn_id << ", sink_id=" << _sender_id << ", due to error: " << status; @@ -491,7 +491,6 @@ Status VTabletWriterV2::_cancel(Status status) { }); _load_stream_map->release(); } - return Status::OK(); } Status VTabletWriterV2::_send_new_partition_batch() { @@ -553,7 +552,8 @@ Status VTabletWriterV2::close(Status exec_status) { auto st = _delta_writer_for_tablet->close(segments_for_tablet, _profile); _delta_writer_for_tablet.reset(); if (!st.ok()) { - RETURN_IF_ERROR(_cancel(st)); + _cancel(st); + return st; } // only the last sink closing delta writers will have segment num if (!segments_for_tablet.empty()) { @@ -568,23 +568,39 @@ Status VTabletWriterV2::close(Status exec_status) { // send CLOSE_LOAD on all non-incremental streams if this is the last sink if (is_last_sink) { - RETURN_IF_ERROR(_load_stream_map->close_load(false)); + auto st = _load_stream_map->close_load(false); + if (!st.ok()) { + LOG(WARNING) << "close_load failed: " << st; + } } // close_wait on all non-incremental streams, even if this is not the last sink. // because some per-instance data structures are now shared among all sinks // due to sharing delta writers and load stream stubs. - RETURN_IF_ERROR(_close_wait(false)); + { + auto st = _close_wait(false); + if (!st.ok()) { + LOG(WARNING) << "close_wait failed: " << st; + } + } // send CLOSE_LOAD on all incremental streams if this is the last sink. // this must happen after all non-incremental streams are closed, // so we can ensure all sinks are in close phase before closing incremental streams. if (is_last_sink) { - RETURN_IF_ERROR(_load_stream_map->close_load(true)); + auto st = _load_stream_map->close_load(true); + if (!st.ok()) { + LOG(WARNING) << "close_load failed: " << st; + } } // close_wait on all incremental streams, even if this is not the last sink. - RETURN_IF_ERROR(_close_wait(true)); + { + auto st = _close_wait(true); + if (!st.ok()) { + LOG(WARNING) << "close_wait failed: " << st; + } + } // calculate and submit commit info if (is_last_sink) { @@ -628,7 +644,7 @@ Status VTabletWriterV2::close(Status exec_status) { LOG(INFO) << "finished to close olap table sink. 
load_id=" << print_id(_load_id) << ", txn_id=" << _txn_id; } else { - RETURN_IF_ERROR(_cancel(status)); + _cancel(status); } _is_closed = true; @@ -640,6 +656,7 @@ Status VTabletWriterV2::_close_wait(bool incremental) { SCOPED_TIMER(_close_load_timer); return _load_stream_map->for_each_st( [this, incremental](int64_t dst_id, const Streams& streams) -> Status { + Status status = Status::OK(); for (auto& stream : streams) { if (stream->is_incremental() != incremental) { continue; @@ -651,9 +668,12 @@ Status VTabletWriterV2::_close_wait(bool incremental) { << print_id(_load_id); return Status::TimedOut("load timed out before close waiting"); } - RETURN_IF_ERROR(stream->close_wait(_state, remain_ms)); + auto st = stream->close_wait(_state, remain_ms); + if (!st.ok() && status.ok()) { + status = st; + } } - return Status::OK(); + return status; }); } @@ -692,6 +712,9 @@ Status VTabletWriterV2::_create_commit_info(std::vector& tabl load_stream_map->for_each([&](int64_t dst_id, const Streams& streams) { std::unordered_set known_tablets; for (const auto& stream : streams) { + LOG(INFO) << "stream " << stream->stream_id() + << " success tablets: " << stream->success_tablets().size() + << ", failed tablets: " << stream->failed_tablets().size(); for (auto [tablet_id, reason] : stream->failed_tablets()) { if (known_tablets.contains(tablet_id)) { continue; @@ -717,7 +740,8 @@ Status VTabletWriterV2::_create_commit_info(std::vector& tabl if (replicas > (num_replicas - 1) / 2) { LOG(INFO) << "tablet " << tablet_id << " failed on majority backends: " << failed_reason[tablet_id]; - return failed_reason.at(tablet_id); + return Status::InternalError("tablet {} failed on majority backends: {}", tablet_id, + failed_reason[tablet_id]); } } return Status::OK(); diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.h b/be/src/vec/sink/writer/vtablet_writer_v2.h index 363dea54c3b0e9..8cbfe20c9c80ec 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.h +++ b/be/src/vec/sink/writer/vtablet_writer_v2.h @@ -149,7 +149,7 @@ class VTabletWriterV2 final : public AsyncResultWriter { Status _close_wait(bool incremental); - Status _cancel(Status status); + void _cancel(Status status); std::shared_ptr _mem_tracker; From d2c937312cc98bf5365871b21266a9bc7154a9b5 Mon Sep 17 00:00:00 2001 From: Kaijie Chen Date: Wed, 31 Jul 2024 20:15:44 +0800 Subject: [PATCH 3/6] tolerate few stream open failure --- be/src/vec/sink/load_stream_stub.h | 4 ++- be/src/vec/sink/writer/vtablet_writer_v2.cpp | 27 ++++++++++++++++--- .../test_multi_replica_fault_injection.groovy | 2 ++ 3 files changed, 28 insertions(+), 5 deletions(-) diff --git a/be/src/vec/sink/load_stream_stub.h b/be/src/vec/sink/load_stream_stub.h index b06b19e511d99d..e83cbf24c690db 100644 --- a/be/src/vec/sink/load_stream_stub.h +++ b/be/src/vec/sink/load_stream_stub.h @@ -194,6 +194,8 @@ class LoadStreamStub : public std::enable_shared_from_this { int64_t dst_id() const { return _dst_id; } + bool is_inited() const { return _is_init.load(); } + bool is_incremental() const { return _is_incremental; } friend std::ostream& operator<<(std::ostream& ostr, const LoadStreamStub& stub); @@ -237,7 +239,7 @@ class LoadStreamStub : public std::enable_shared_from_this { brpc::StreamId _stream_id; int64_t _src_id = -1; // source backend_id int64_t _dst_id = -1; // destination backend_id - Status _init_st; + Status _init_st = Status::InternalError("Stream is not open"); Status _close_st; Status _cancel_st; diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp 
b/be/src/vec/sink/writer/vtablet_writer_v2.cpp index a608687076c17f..fb3c8ed7cacf32 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp +++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp @@ -280,7 +280,14 @@ Status VTabletWriterV2::_open_streams_to_backend(int64_t dst_id, Streams& stream } auto idle_timeout_ms = _state->execution_timeout() * 1000; std::vector& tablets_for_schema = _indexes_from_node[node_info->id]; + int fault_injection_skip_cnt = 0; for (auto& stream : streams) { + DBUG_EXECUTE_IF("VTabletWriterV2._open_streams_to_backend.one_stream_open_failure", { + if (fault_injection_skip_cnt < 1) { + fault_injection_skip_cnt++; + continue; + } + }); auto st = stream->open(_state->exec_env()->brpc_internal_client_cache(), *node_info, _txn_id, *_schema, tablets_for_schema, _total_streams, idle_timeout_ms, _state->enable_profile()); @@ -368,11 +375,23 @@ Status VTabletWriterV2::_select_streams(int64_t tablet_id, int64_t partition_id, tablet.set_tablet_id(tablet_id); VLOG_DEBUG << fmt::format("_select_streams P{} I{} T{}", partition_id, index_id, tablet_id); _tablets_for_node[node_id].emplace(tablet_id, tablet); - streams.emplace_back(_load_stream_map->at(node_id)->at(_stream_index)); - RETURN_IF_ERROR(streams[0]->wait_for_schema(partition_id, index_id, tablet_id)); + auto stream = _load_stream_map->at(node_id)->at(_stream_index); + for (int i = 1; i < _stream_per_node && !stream->is_inited(); i++) { + stream = _load_stream_map->at(node_id)->at((_stream_index + i) % _stream_per_node); + } + streams.emplace_back(std::move(stream)); } _stream_index = (_stream_index + 1) % _stream_per_node; - return Status::OK(); + Status st; + for (auto& stream : streams) { + st = stream->wait_for_schema(partition_id, index_id, tablet_id); + if (st.ok()) { + break; + } else { + LOG(WARNING) << "failed to get schema from stream " << stream << ", err=" << st; + } + } + return st; } Status VTabletWriterV2::write(RuntimeState* state, Block& input_block) { @@ -430,7 +449,7 @@ Status VTabletWriterV2::_write_memtable(std::shared_ptr block Streams streams; auto st = _select_streams(tablet_id, rows.partition_id, rows.index_id, streams); if (!st.ok()) [[unlikely]] { - LOG(WARNING) << st << ", load_id=" << print_id(_load_id); + LOG(WARNING) << "select stream failed, " << st << ", load_id=" << print_id(_load_id); return std::unique_ptr(nullptr); } WriteRequest req { diff --git a/regression-test/suites/fault_injection_p0/test_multi_replica_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_multi_replica_fault_injection.groovy index d0582099eb8fe6..026ae84df070b6 100644 --- a/regression-test/suites/fault_injection_p0/test_multi_replica_fault_injection.groovy +++ b/regression-test/suites/fault_injection_p0/test_multi_replica_fault_injection.groovy @@ -96,6 +96,8 @@ suite("test_multi_replica_fault_injection", "nonConcurrent") { load_with_injection("StreamSinkFileWriter.appendv.write_segment_failed_all_replica", "failed to send segment data to any replicas") // test segment num check when LoadStreamStub missed tail segments load_with_injection("LoadStreamStub.only_send_segment_0", "segment num mismatch") + // test 1st stream to each backend failure + load_with_injection("VTabletWriterV2._open_streams_to_backend.one_stream_open_failure", "success") sql """ set enable_memtable_on_sink_node=false """ } } From 45ee1a6e696fee1833336e6a8e7395ca0d1057fe Mon Sep 17 00:00:00 2001 From: Kaijie Chen Date: Thu, 1 Aug 2024 04:36:23 +0800 Subject: [PATCH 4/6] add skip be injection --- 
be/src/vec/sink/writer/vtablet_writer_v2.cpp | 7 +++++++ .../test_multi_replica_fault_injection.groovy | 3 +++ 2 files changed, 10 insertions(+) diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp index fb3c8ed7cacf32..67339916b7f143 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp +++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp @@ -264,8 +264,15 @@ Status VTabletWriterV2::open(RuntimeState* state, RuntimeProfile* profile) { } Status VTabletWriterV2::_open_streams() { + bool fault_injection_skip_be = true; for (auto& [dst_id, _] : _tablets_for_node) { auto streams = _load_stream_map->get_or_create(dst_id); + DBUG_EXECUTE_IF("VTabletWriterV2._open_streams.skip_one_backend", { + if (fault_injection_skip_be) { + fault_injection_skip_be = false; + continue; + } + }); RETURN_IF_ERROR(_open_streams_to_backend(dst_id, *streams)); } return Status::OK(); diff --git a/regression-test/suites/fault_injection_p0/test_multi_replica_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_multi_replica_fault_injection.groovy index 026ae84df070b6..33f7e28dbc930a 100644 --- a/regression-test/suites/fault_injection_p0/test_multi_replica_fault_injection.groovy +++ b/regression-test/suites/fault_injection_p0/test_multi_replica_fault_injection.groovy @@ -77,6 +77,7 @@ suite("test_multi_replica_fault_injection", "nonConcurrent") { def load_with_injection = { injection, error_msg-> try { + sql "truncate table test" GetDebugPoint().enableDebugPointForAllBEs(injection) sql "insert into test select * from baseall where k1 <= 3" } catch(Exception e) { @@ -98,6 +99,8 @@ suite("test_multi_replica_fault_injection", "nonConcurrent") { load_with_injection("LoadStreamStub.only_send_segment_0", "segment num mismatch") // test 1st stream to each backend failure load_with_injection("VTabletWriterV2._open_streams_to_backend.one_stream_open_failure", "success") + // test one backend open failure + load_with_injection("VTabletWriterV2._open_streams.skip_one_backend", "success") sql """ set enable_memtable_on_sink_node=false """ } } From 8ca0a5da9c0e9b2724ac83ae4293cb208376b6d5 Mon Sep 17 00:00:00 2001 From: Kaijie Chen Date: Tue, 6 Aug 2024 18:00:17 +0800 Subject: [PATCH 5/6] void _close_wait --- be/src/vec/sink/writer/vtablet_writer_v2.cpp | 21 +++++++------------- be/src/vec/sink/writer/vtablet_writer_v2.h | 2 +- 2 files changed, 8 insertions(+), 15 deletions(-) diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp index 67339916b7f143..cf80459901e560 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp +++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp @@ -603,12 +603,7 @@ Status VTabletWriterV2::close(Status exec_status) { // close_wait on all non-incremental streams, even if this is not the last sink. // because some per-instance data structures are now shared among all sinks // due to sharing delta writers and load stream stubs. - { - auto st = _close_wait(false); - if (!st.ok()) { - LOG(WARNING) << "close_wait failed: " << st; - } - } + _close_wait(false); // send CLOSE_LOAD on all incremental streams if this is the last sink. // this must happen after all non-incremental streams are closed, @@ -621,12 +616,7 @@ Status VTabletWriterV2::close(Status exec_status) { } // close_wait on all incremental streams, even if this is not the last sink. 
- { - auto st = _close_wait(true); - if (!st.ok()) { - LOG(WARNING) << "close_wait failed: " << st; - } - } + _close_wait(true); // calculate and submit commit info if (is_last_sink) { @@ -678,9 +668,9 @@ Status VTabletWriterV2::close(Status exec_status) { return status; } -Status VTabletWriterV2::_close_wait(bool incremental) { +void VTabletWriterV2::_close_wait(bool incremental) { SCOPED_TIMER(_close_load_timer); - return _load_stream_map->for_each_st( + auto st = _load_stream_map->for_each_st( [this, incremental](int64_t dst_id, const Streams& streams) -> Status { Status status = Status::OK(); for (auto& stream : streams) { @@ -701,6 +691,9 @@ Status VTabletWriterV2::_close_wait(bool incremental) { } return status; }); + if (!st.ok()) { + LOG(WARNING) << "close_wait failed: " << st << ", load_id=" << print_id(_load_id); + } } void VTabletWriterV2::_calc_tablets_to_commit() { diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.h b/be/src/vec/sink/writer/vtablet_writer_v2.h index 8cbfe20c9c80ec..6d0c9ae8ff8fe8 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.h +++ b/be/src/vec/sink/writer/vtablet_writer_v2.h @@ -147,7 +147,7 @@ class VTabletWriterV2 final : public AsyncResultWriter { void _calc_tablets_to_commit(); - Status _close_wait(bool incremental); + void _close_wait(bool incremental); void _cancel(Status status); From b422bdaebaaf94a8a0cb404bad18b5f333e6a7e1 Mon Sep 17 00:00:00 2001 From: Kaijie Chen Date: Tue, 6 Aug 2024 18:04:30 +0800 Subject: [PATCH 6/6] void close_load --- be/src/vec/sink/load_stream_map_pool.cpp | 8 ++++++-- be/src/vec/sink/load_stream_map_pool.h | 2 +- be/src/vec/sink/writer/vtablet_writer_v2.cpp | 10 ++-------- 3 files changed, 9 insertions(+), 11 deletions(-) diff --git a/be/src/vec/sink/load_stream_map_pool.cpp b/be/src/vec/sink/load_stream_map_pool.cpp index ba69efd9fe38f0..e8407f4730d24f 100644 --- a/be/src/vec/sink/load_stream_map_pool.cpp +++ b/be/src/vec/sink/load_stream_map_pool.cpp @@ -107,8 +107,8 @@ bool LoadStreamMap::release() { return false; } -Status LoadStreamMap::close_load(bool incremental) { - return for_each_st([this, incremental](int64_t dst_id, const Streams& streams) -> Status { +void LoadStreamMap::close_load(bool incremental) { + auto st = for_each_st([this, incremental](int64_t dst_id, const Streams& streams) -> Status { std::vector tablets_to_commit; const auto& tablets = _tablets_to_commit[dst_id]; tablets_to_commit.reserve(tablets.size()); @@ -137,6 +137,10 @@ Status LoadStreamMap::close_load(bool incremental) { } return status; }); + if (!st.ok()) { + LOG(WARNING) << "close_load for " << (incremental ? "incremental" : "non-incremental") + << " streams failed: " << st << ", load_id=" << _load_id; + } } LoadStreamMapPool::LoadStreamMapPool() = default; diff --git a/be/src/vec/sink/load_stream_map_pool.h b/be/src/vec/sink/load_stream_map_pool.h index 8728686ce9bd62..b4d59800c78ab5 100644 --- a/be/src/vec/sink/load_stream_map_pool.h +++ b/be/src/vec/sink/load_stream_map_pool.h @@ -98,7 +98,7 @@ class LoadStreamMap { // send CLOSE_LOAD to all streams, return ERROR if any. // only call this method after release() returns true. 
- Status close_load(bool incremental); + void close_load(bool incremental); private: const UniqueId _load_id; diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp index cf80459901e560..4f1e0110c1d8ed 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp +++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp @@ -594,10 +594,7 @@ Status VTabletWriterV2::close(Status exec_status) { // send CLOSE_LOAD on all non-incremental streams if this is the last sink if (is_last_sink) { - auto st = _load_stream_map->close_load(false); - if (!st.ok()) { - LOG(WARNING) << "close_load failed: " << st; - } + _load_stream_map->close_load(false); } // close_wait on all non-incremental streams, even if this is not the last sink. @@ -609,10 +606,7 @@ Status VTabletWriterV2::close(Status exec_status) { // this must happen after all non-incremental streams are closed, // so we can ensure all sinks are in close phase before closing incremental streams. if (is_last_sink) { - auto st = _load_stream_map->close_load(true); - if (!st.ok()) { - LOG(WARNING) << "close_load failed: " << st; - } + _load_stream_map->close_load(true); } // close_wait on all incremental streams, even if this is not the last sink.
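
Reference note (illustrative, not part of the series): the behavior these six patches converge on is that a load only fails when some tablet fails on a majority of its replicas; minority failures during stream open, append_data, add_segment, and close are logged and tolerated. The quorum check itself lives in VTabletWriterV2::_create_commit_info (patch 2). The standalone C++ sketch below restates that rule under simplified assumptions -- the TabletCommitDecision struct, the decide_commit() helper, and the tablet ids/replica counts are invented for illustration and are not code from this series.

    #include <cstdint>
    #include <iostream>
    #include <string>
    #include <unordered_map>

    // Result of checking per-tablet replica failures against the majority rule.
    struct TabletCommitDecision {
        bool load_fails = false;  // true if any tablet failed on a majority of replicas
        std::string first_error;  // reason of the first majority failure, if any
    };

    // failed_replicas_by_tablet: tablet_id -> number of replicas that reported an error.
    // num_replicas: replication factor of the table (e.g. 3).
    TabletCommitDecision decide_commit(
            const std::unordered_map<int64_t, int>& failed_replicas_by_tablet,
            int num_replicas) {
        TabletCommitDecision decision;
        for (const auto& [tablet_id, failed] : failed_replicas_by_tablet) {
            // Same check as `replicas > (num_replicas - 1) / 2` in _create_commit_info:
            // a minority of failed replicas is tolerated, a majority aborts the load.
            if (failed > (num_replicas - 1) / 2) {
                decision.load_fails = true;
                decision.first_error = "tablet " + std::to_string(tablet_id) +
                                       " failed on majority backends";
                break;
            }
        }
        return decision;
    }

    int main() {
        // Hypothetical example: one replica of tablet 10001 failed (tolerated),
        // two replicas of tablet 10002 failed (majority of 3 -> load fails).
        std::unordered_map<int64_t, int> failures {{10001, 1}, {10002, 2}};
        auto decision = decide_commit(failures, /*num_replicas=*/3);
        std::cout << (decision.load_fails ? decision.first_error : "commit ok") << std::endl;
        return 0;
    }

With num_replicas = 3, (num_replicas - 1) / 2 evaluates to 1, so a single failed replica per tablet is tolerated while two or more make the load fail -- which matches the new fault-injection cases in patches 3 and 4, where skipping a single stream or a single backend is still expected to let the load succeed.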