diff --git a/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp b/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp
index f399f752c36155..444a7395f09058 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp
@@ -19,6 +19,7 @@
 #include
 #include
+#include
 // IWYU pragma: no_include
 #include // IWYU pragma: keep
 #include
@@ -31,6 +32,7 @@
 #include "olap/olap_common.h"
 #include "olap/rowset/segment_v2/inverted_index_compound_directory.h"
 #include "olap/rowset/segment_v2/inverted_index_compound_reader.h"
+#include "olap/rowset/segment_v2/inverted_index_desc.h"
 #include "runtime/exec_env.h"
 #include "runtime/thread_context.h"
 #include "util/defer_op.h"
@@ -39,19 +41,78 @@
 namespace doris {
 namespace segment_v2 {
 
-IndexSearcherPtr InvertedIndexSearcherCache::build_index_searcher(const io::FileSystemSPtr& fs,
-                                                                  const std::string& index_dir,
-                                                                  const std::string& file_name) {
-    DorisCompoundReader* directory =
+Status FulltextIndexSearcherBuilder::build(const io::FileSystemSPtr& fs,
+                                           const std::string& index_dir,
+                                           const std::string& file_name,
+                                           OptionalIndexSearcherPtr& output_searcher) {
+    auto* directory =
             new DorisCompoundReader(DorisCompoundDirectory::getDirectory(fs, index_dir.c_str()),
                                     file_name.c_str(), config::inverted_index_read_buffer_size);
     auto closeDirectory = true;
     auto index_searcher = std::make_shared(directory, closeDirectory);
+    if (!index_searcher) {
+        _CLDECDELETE(directory)
+        output_searcher = std::nullopt;
+        return Status::Error(
+                "FulltextIndexSearcherBuilder build index_searcher error.");
+    }
     // NOTE: need to cl_refcount-- here, so that directory will be deleted when
     // index_searcher is destroyed
     _CLDECDELETE(directory)
-    return index_searcher;
+    output_searcher = index_searcher;
+    return Status::OK();
+}
+
+Status BKDIndexSearcherBuilder::build(const io::FileSystemSPtr& fs, const std::string& index_dir,
+                                      const std::string& file_name,
+                                      OptionalIndexSearcherPtr& output_searcher) {
+    try {
+        auto compound_reader = std::make_unique(
+                DorisCompoundDirectory::getDirectory(fs, index_dir.c_str()), file_name.c_str(),
+                config::inverted_index_read_buffer_size);
+
+        if (!compound_reader) {
+            LOG(ERROR) << "compound reader is null when getting directory for:" << index_dir
+                       << "/" << file_name;
+            output_searcher = std::nullopt;
+            return Status::Error(
+                    "compound reader is null");
+        }
+        CLuceneError err;
+        std::unique_ptr data_in;
+        std::unique_ptr meta_in;
+        std::unique_ptr index_in;
+
+        if (!compound_reader->openInput(
+                    InvertedIndexDescriptor::get_temporary_bkd_index_data_file_name().c_str(),
+                    data_in, err) ||
+            !compound_reader->openInput(
+                    InvertedIndexDescriptor::get_temporary_bkd_index_meta_file_name().c_str(),
+                    meta_in, err) ||
+            !compound_reader->openInput(
+                    InvertedIndexDescriptor::get_temporary_bkd_index_file_name().c_str(), index_in,
+                    err)) {
+            // Consider logging the error or handling it more comprehensively
+            LOG(ERROR) << "open bkd index input error: " << err.what();
+            output_searcher = std::nullopt;
+            return Status::Error(
+                    "open bkd index input error");
+        }
+        auto bkd_reader = std::make_shared(data_in.release());
+        if (0 == bkd_reader->read_meta(meta_in.get())) {
+            VLOG_NOTICE << "bkd index file is empty:" << compound_reader->toString();
+            output_searcher = std::nullopt;
+            return Status::EndOfFile("bkd index file is empty");
+        }
+
+        bkd_reader->read_index(index_in.get());
+        output_searcher = IndexSearcherPtr {bkd_reader};
+        return Status::OK();
+    } catch (const CLuceneError& e) {
+        return Status::Error(
+                "BKDIndexSearcherBuilder build error: {}", e.what());
+    }
 }
 
 InvertedIndexSearcherCache* InvertedIndexSearcherCache::create_global_instance(
@@ -98,13 +159,18 @@ InvertedIndexSearcherCache::InvertedIndexSearcherCache(size_t capacity, uint32_t
     }
 }
 
-Status InvertedIndexSearcherCache::get_index_searcher(const io::FileSystemSPtr& fs,
-                                                       const std::string& index_dir,
-                                                       const std::string& file_name,
-                                                       InvertedIndexCacheHandle* cache_handle,
-                                                       OlapReaderStatistics* stats, bool use_cache) {
+Status InvertedIndexSearcherCache::get_index_searcher(
+        const io::FileSystemSPtr& fs, const std::string& index_dir, const std::string& file_name,
+        InvertedIndexCacheHandle* cache_handle, OlapReaderStatistics* stats,
+        InvertedIndexReaderType reader_type, bool use_cache) {
     auto file_path = index_dir + "/" + file_name;
-
+    bool exists = false;
+    RETURN_IF_ERROR(fs->exists(file_path, &exists));
+    if (!exists) {
+        LOG(WARNING) << "inverted index: " << file_path << " does not exist.";
+        return Status::Error(
+                "inverted index input file {} not found", file_path);
+    }
     using namespace std::chrono;
     auto start_time = steady_clock::now();
     Defer cost {[&]() {
@@ -119,14 +185,40 @@ Status InvertedIndexSearcherCache::get_index_searcher(const io::FileSystemSPtr&
     }
     cache_handle->owned = !use_cache;
-    IndexSearcherPtr index_searcher = nullptr;
+    IndexSearcherPtr index_searcher;
+    std::unique_ptr index_builder = nullptr;
     auto mem_tracker =
             std::unique_ptr(new MemTracker("InvertedIndexSearcherCacheWithRead"));
 #ifndef BE_TEST
     {
         SCOPED_RAW_TIMER(&stats->inverted_index_searcher_open_timer);
         SCOPED_CONSUME_MEM_TRACKER(mem_tracker.get());
-        index_searcher = build_index_searcher(fs, index_dir, file_name);
+        switch (reader_type) {
+        case InvertedIndexReaderType::STRING_TYPE:
+        case InvertedIndexReaderType::FULLTEXT: {
+            index_builder = std::make_unique();
+            break;
+        }
+        case InvertedIndexReaderType::BKD: {
+            index_builder = std::make_unique();
+            break;
+        }
+
+        default:
+            LOG(ERROR) << "InvertedIndexReaderType:" << reader_type_to_string(reader_type)
+                       << " is not supported by InvertedIndexSearcherCache";
+            return Status::Error(
+                    "InvertedIndexSearcherCache does not support this reader type.");
+        }
+        OptionalIndexSearcherPtr result;
+        RETURN_IF_ERROR(index_builder->build(fs, index_dir, file_name, result));
+        if (!result.has_value()) {
+            LOG(ERROR) << "InvertedIndexReaderType:" << reader_type_to_string(reader_type)
+                       << " failed to build index searcher for InvertedIndexSearcherCache";
+            return Status::Error(
+                    "InvertedIndexSearcherCache build error.");
+        }
+        index_searcher = *result;
     }
 #endif
@@ -144,7 +236,8 @@ Status InvertedIndexSearcherCache::get_index_searcher(const io::FileSystemSPtr&
 
 Status InvertedIndexSearcherCache::insert(const io::FileSystemSPtr& fs,
                                           const std::string& index_dir,
-                                          const std::string& file_name) {
+                                          const std::string& file_name,
+                                          InvertedIndexReaderType reader_type) {
     auto file_path = index_dir + "/" + file_name;
 
     using namespace std::chrono;
@@ -156,13 +249,39 @@ Status InvertedIndexSearcherCache::insert(const io::FileSystemSPtr& fs,
     InvertedIndexSearcherCache::CacheKey cache_key(file_path);
     IndexCacheValuePtr cache_value = std::make_unique();
-    IndexSearcherPtr index_searcher = nullptr;
+    IndexSearcherPtr index_searcher;
+    std::unique_ptr builder = nullptr;
     auto mem_tracker =
             std::unique_ptr(new MemTracker("InvertedIndexSearcherCacheWithInsert"));
 #ifndef BE_TEST
     {
         SCOPED_CONSUME_MEM_TRACKER(mem_tracker.get());
-        index_searcher = build_index_searcher(fs, index_dir, file_name);
+        switch (reader_type) {
+        case InvertedIndexReaderType::STRING_TYPE:
+        case InvertedIndexReaderType::FULLTEXT: {
+            builder = std::make_unique();
+            break;
+        }
+        case InvertedIndexReaderType::BKD: {
+            builder = std::make_unique();
+            break;
+        }
+
+        default:
+            LOG(ERROR) << "InvertedIndexReaderType:" << reader_type_to_string(reader_type)
+                       << " is not supported by InvertedIndexSearcherCache";
+            return Status::Error(
+                    "InvertedIndexSearcherCache does not support this reader type.");
+        }
+        OptionalIndexSearcherPtr result;
+        RETURN_IF_ERROR(builder->build(fs, index_dir, file_name, result));
+        if (!result.has_value()) {
+            LOG(ERROR) << "InvertedIndexReaderType:" << reader_type_to_string(reader_type)
+                       << " failed to build index searcher for InvertedIndexSearcherCache";
+            return Status::Error(
+                    "InvertedIndexSearcherCache build error.");
+        }
+        index_searcher = *result;
     }
 #endif
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_cache.h b/be/src/olap/rowset/segment_v2/inverted_index_cache.h
index 79439ac4621794..cb975021967df9 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_cache.h
+++ b/be/src/olap/rowset/segment_v2/inverted_index_cache.h
@@ -34,9 +34,11 @@
 #include
 #include
+#include
 #include
 #include
 #include
+#include
 
 #include "common/config.h"
 #include "common/status.h"
@@ -54,16 +56,44 @@ namespace lucene {
 namespace search {
 class IndexSearcher;
 } // namespace search
+namespace util {
+namespace bkd {
+class bkd_reader;
+}
+} // namespace util
 } // namespace lucene
 
 namespace doris {
 struct OlapReaderStatistics;
 
 namespace segment_v2 {
-using IndexSearcherPtr = std::shared_ptr;
+using FulltextIndexSearcherPtr = std::shared_ptr;
+using BKDIndexSearcherPtr = std::shared_ptr;
+using IndexSearcherPtr = std::variant;
+using OptionalIndexSearcherPtr = std::optional;
 
 class InvertedIndexCacheHandle;
 
+class IndexSearcherBuilder {
+public:
+    virtual Status build(const io::FileSystemSPtr& fs, const std::string& index_dir,
+                         const std::string& file_name,
+                         OptionalIndexSearcherPtr& output_searcher) = 0;
+    virtual ~IndexSearcherBuilder() = default;
+};
+
+class FulltextIndexSearcherBuilder : public IndexSearcherBuilder {
+public:
+    Status build(const io::FileSystemSPtr& fs, const std::string& index_dir,
+                 const std::string& file_name, OptionalIndexSearcherPtr& output_searcher) override;
+};
+
+class BKDIndexSearcherBuilder : public IndexSearcherBuilder {
+public:
+    Status build(const io::FileSystemSPtr& fs, const std::string& index_dir,
+                 const std::string& file_name, OptionalIndexSearcherPtr& output_searcher) override;
+};
+
 class InvertedIndexSearcherCache : public LRUCachePolicy {
 public:
     // The cache key of index_searcher lru cache
@@ -73,7 +103,7 @@ class InvertedIndexSearcherCache : public LRUCachePolicy {
     };
 
     // The cache value of index_searcher lru cache.
-    // Holding a opened index_searcher.
+    // Holding an opened index_searcher.
struct CacheValue : public LRUCacheValueBase { IndexSearcherPtr index_searcher; }; @@ -95,19 +125,16 @@ class InvertedIndexSearcherCache : public LRUCachePolicy { return ExecEnv::GetInstance()->get_inverted_index_searcher_cache(); } - static IndexSearcherPtr build_index_searcher(const io::FileSystemSPtr& fs, - const std::string& index_dir, - const std::string& file_name); - InvertedIndexSearcherCache(size_t capacity, uint32_t num_shards); Status get_index_searcher(const io::FileSystemSPtr& fs, const std::string& index_dir, const std::string& file_name, InvertedIndexCacheHandle* cache_handle, - OlapReaderStatistics* stats, bool use_cache = true); + OlapReaderStatistics* stats, InvertedIndexReaderType reader_type, + bool use_cache = true); // function `insert` called after inverted index writer close Status insert(const io::FileSystemSPtr& fs, const std::string& index_dir, - const std::string& file_name); + const std::string& file_name, InvertedIndexReaderType reader_type); // function `erase` called after compaction remove segment Status erase(const std::string& index_file_path); @@ -211,7 +238,7 @@ class InvertedIndexQueryCache : public LRUCachePolicy { key_buf.append("/"); key_buf.append(column_name); key_buf.append("/"); - auto query_type_str = InvertedIndexQueryType_toString(query_type); + auto query_type_str = query_type_to_string(query_type); if (query_type_str.empty()) { return ""; } diff --git a/be/src/olap/rowset/segment_v2/inverted_index_query_type.h b/be/src/olap/rowset/segment_v2/inverted_index_query_type.h index 1ebfe6359181e9..64171c7739d2d6 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_query_type.h +++ b/be/src/olap/rowset/segment_v2/inverted_index_query_type.h @@ -22,6 +22,50 @@ namespace doris { namespace segment_v2 { +enum class InvertedIndexReaderType { + UNKNOWN = -1, + FULLTEXT = 0, + STRING_TYPE = 1, + BKD = 2, +}; + +template +constexpr const char* InvertedIndexReaderTypeToString(); + +template <> +constexpr const char* InvertedIndexReaderTypeToString() { + return "UNKNOWN"; +} + +template <> +constexpr const char* InvertedIndexReaderTypeToString() { + return "FULLTEXT"; +} + +template <> +constexpr const char* InvertedIndexReaderTypeToString() { + return "STRING_TYPE"; +} + +template <> +constexpr const char* InvertedIndexReaderTypeToString() { + return "BKD"; +} + +inline std::string reader_type_to_string(InvertedIndexReaderType query_type) { + switch (query_type) { + case InvertedIndexReaderType::UNKNOWN: + return InvertedIndexReaderTypeToString(); + case InvertedIndexReaderType::FULLTEXT: + return InvertedIndexReaderTypeToString(); + case InvertedIndexReaderType::STRING_TYPE: + return InvertedIndexReaderTypeToString(); + case InvertedIndexReaderType::BKD: + return InvertedIndexReaderTypeToString(); + } + return ""; // Explicitly handle all cases +} + enum class InvertedIndexQueryType { UNKNOWN_QUERY = -1, EQUAL_QUERY = 0, @@ -34,7 +78,7 @@ enum class InvertedIndexQueryType { MATCH_PHRASE_QUERY = 7, }; -inline std::string InvertedIndexQueryType_toString(InvertedIndexQueryType query_type) { +inline std::string query_type_to_string(InvertedIndexQueryType query_type) { switch (query_type) { case InvertedIndexQueryType::UNKNOWN_QUERY: { return "UNKNOWN"; diff --git a/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp b/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp index e870680ef50dbb..599122630d2ad9 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp @@ -42,7 
+42,6 @@ #include #include -#include #include #include #include @@ -289,13 +288,6 @@ Status FullTextIndexReader::query(OlapReaderStatistics* stats, RuntimeState* run "inverted index path: {} not exist.", index_file_path.string()); } - auto get_index_search = [this, &index_dir, &index_file_name, &stats]() { - InvertedIndexCacheHandle inverted_index_cache_handle; - static_cast(InvertedIndexSearcherCache::instance()->get_index_searcher( - _fs, index_dir.c_str(), index_file_name, &inverted_index_cache_handle, stats)); - return inverted_index_cache_handle.get_index_searcher(); - }; - std::unique_ptr query; std::wstring field_ws = std::wstring(column_name.begin(), column_name.end()); @@ -324,34 +316,41 @@ Status FullTextIndexReader::query(OlapReaderStatistics* stats, RuntimeState* run term_match_bitmap = cache_handle.get_bitmap(); } else { stats->inverted_index_query_cache_miss++; + InvertedIndexCacheHandle inverted_index_cache_handle; + RETURN_IF_ERROR(InvertedIndexSearcherCache::instance()->get_index_searcher( + _fs, index_dir.c_str(), index_file_name, &inverted_index_cache_handle, + stats, type())); + auto searcher_variant = inverted_index_cache_handle.get_index_searcher(); + if (FulltextIndexSearcherPtr* searcher_ptr = + std::get_if(&searcher_variant)) { + term_match_bitmap = std::make_shared(); - auto index_searcher = get_index_search(); - - term_match_bitmap = std::make_shared(); - - Status res = Status::OK(); - if (query_type == InvertedIndexQueryType::MATCH_PHRASE_QUERY) { - auto* phrase_query = new lucene::search::PhraseQuery(); - for (auto& token : analyse_result) { - std::wstring wtoken = StringUtil::string_to_wstring(token); - auto* term = _CLNEW lucene::index::Term(field_ws.c_str(), wtoken.c_str()); - phrase_query->add(term); - _CLDECDELETE(term); + Status res = Status::OK(); + if (query_type == InvertedIndexQueryType::MATCH_PHRASE_QUERY) { + auto* phrase_query = new lucene::search::PhraseQuery(); + for (auto& token : analyse_result) { + std::wstring wtoken = StringUtil::string_to_wstring(token); + auto* term = + _CLNEW lucene::index::Term(field_ws.c_str(), wtoken.c_str()); + phrase_query->add(term); + _CLDECDELETE(term); + } + query.reset(phrase_query); + res = normal_index_search(stats, query_type, *searcher_ptr, + null_bitmap_already_read, query, + term_match_bitmap); + } else { + res = match_all_index_search(stats, runtime_state, field_ws, analyse_result, + *searcher_ptr, term_match_bitmap); + } + if (!res.ok()) { + return res; } - query.reset(phrase_query); - res = normal_index_search(stats, query_type, index_searcher, - null_bitmap_already_read, query, term_match_bitmap); - } else { - res = match_all_index_search(stats, runtime_state, field_ws, analyse_result, - index_searcher, term_match_bitmap); - } - if (!res.ok()) { - return res; - } - // add to cache - term_match_bitmap->runOptimize(); - cache->insert(cache_key, term_match_bitmap, &cache_handle); + // add to cache + term_match_bitmap->runOptimize(); + cache->insert(cache_key, term_match_bitmap, &cache_handle); + } } query_match_bitmap = *term_match_bitmap; } else { @@ -374,26 +373,31 @@ Status FullTextIndexReader::query(OlapReaderStatistics* stats, RuntimeState* run term_match_bitmap = cache_handle.get_bitmap(); } else { stats->inverted_index_query_cache_miss++; - - auto index_searcher = get_index_search(); - - term_match_bitmap = std::make_shared(); - // unique_ptr with custom deleter - std::unique_ptr term { - _CLNEW lucene::index::Term(field_ws.c_str(), token_ws.c_str()), - [](lucene::index::Term* term) { 
_CLDECDELETE(term); }}; - query.reset(new lucene::search::TermQuery(term.get())); - - Status res = - normal_index_search(stats, query_type, index_searcher, - null_bitmap_already_read, query, term_match_bitmap); - if (!res.ok()) { - return res; + InvertedIndexCacheHandle inverted_index_cache_handle; + RETURN_IF_ERROR(InvertedIndexSearcherCache::instance()->get_index_searcher( + _fs, index_dir.c_str(), index_file_name, &inverted_index_cache_handle, + stats, type())); + auto searcher_variant = inverted_index_cache_handle.get_index_searcher(); + if (FulltextIndexSearcherPtr* searcher_ptr = + std::get_if(&searcher_variant)) { + term_match_bitmap = std::make_shared(); + // unique_ptr with custom deleter + std::unique_ptr term { + _CLNEW lucene::index::Term(field_ws.c_str(), token_ws.c_str()), + [](lucene::index::Term* term) { _CLDECDELETE(term); }}; + query.reset(new lucene::search::TermQuery(term.get())); + + Status res = normal_index_search(stats, query_type, *searcher_ptr, + null_bitmap_already_read, query, + term_match_bitmap); + if (!res.ok()) { + return res; + } + + // add to cache + term_match_bitmap->runOptimize(); + cache->insert(cache_key, term_match_bitmap, &cache_handle); } - - // add to cache - term_match_bitmap->runOptimize(); - cache->insert(cache_key, term_match_bitmap, &cache_handle); } // add to query_match_bitmap @@ -428,7 +432,7 @@ Status FullTextIndexReader::query(OlapReaderStatistics* stats, RuntimeState* run Status FullTextIndexReader::normal_index_search( OlapReaderStatistics* stats, InvertedIndexQueryType query_type, - const IndexSearcherPtr& index_searcher, bool& null_bitmap_already_read, + const FulltextIndexSearcherPtr& index_searcher, bool& null_bitmap_already_read, const std::unique_ptr& query, const std::shared_ptr& term_match_bitmap) { check_null_bitmap(index_searcher, null_bitmap_already_read); @@ -463,7 +467,8 @@ Status FullTextIndexReader::normal_index_search( Status FullTextIndexReader::match_all_index_search( OlapReaderStatistics* stats, RuntimeState* runtime_state, const std::wstring& field_ws, - const std::vector& analyse_result, const IndexSearcherPtr& index_searcher, + const std::vector& analyse_result, + const FulltextIndexSearcherPtr& index_searcher, const std::shared_ptr& term_match_bitmap) { TQueryOptions queryOptions = runtime_state->query_options(); try { @@ -479,7 +484,7 @@ Status FullTextIndexReader::match_all_index_search( return Status::OK(); } -void FullTextIndexReader::check_null_bitmap(const IndexSearcherPtr& index_searcher, +void FullTextIndexReader::check_null_bitmap(const FulltextIndexSearcherPtr& index_searcher, bool& null_bitmap_already_read) { // try to reuse index_searcher's directory to read null_bitmap to cache // to avoid open directory additionally for null_bitmap @@ -580,56 +585,60 @@ Status StringTypeInvertedIndexReader::query(OlapReaderStatistics* stats, roaring::Roaring result; InvertedIndexCacheHandle inverted_index_cache_handle; - static_cast(InvertedIndexSearcherCache::instance()->get_index_searcher( - _fs, index_dir.c_str(), index_file_name, &inverted_index_cache_handle, stats)); - auto index_searcher = inverted_index_cache_handle.get_index_searcher(); - - // try to reuse index_searcher's directory to read null_bitmap to cache - // to avoid open directory additionally for null_bitmap - InvertedIndexQueryCacheHandle null_bitmap_cache_handle; - static_cast( - read_null_bitmap(&null_bitmap_cache_handle, index_searcher->getReader()->directory())); - - try { - if (query_type == InvertedIndexQueryType::MATCH_ANY_QUERY || - 
query_type == InvertedIndexQueryType::MATCH_ALL_QUERY ||
-            query_type == InvertedIndexQueryType::EQUAL_QUERY) {
-            SCOPED_RAW_TIMER(&stats->inverted_index_searcher_search_timer);
-            index_searcher->_search(query.get(), [&result](DocRange* doc_range) {
-                if (doc_range->type_ == DocRangeType::kMany) {
-                    result.addMany(doc_range->doc_many_size_, doc_range->doc_many->data());
-                } else {
-                    result.addRange(doc_range->doc_range.first, doc_range->doc_range.second);
-                }
-            });
-        } else {
-            SCOPED_RAW_TIMER(&stats->inverted_index_searcher_search_timer);
-            index_searcher->_search(query.get(),
-                                    [&result](const int32_t docid, const float_t /*score*/) {
-                                        // docid equal to rowid in segment
-                                        result.add(docid);
-                                    });
-        }
-    } catch (const CLuceneError& e) {
-        if (_is_range_query(query_type) && e.number() == CL_ERR_TooManyClauses) {
-            return Status::Error(
-                    "range query term exceeds limits, try to downgrade from inverted index, column "
-                    "name:{}, search_str:{}",
-                    column_name, search_str);
-        } else {
-            return Status::Error(
-                    "CLuceneError occured, error msg: {}, column name: {}, search_str: {}",
-                    e.what(), column_name, search_str);
+    RETURN_IF_ERROR(InvertedIndexSearcherCache::instance()->get_index_searcher(
+            _fs, index_dir.c_str(), index_file_name, &inverted_index_cache_handle, stats, type()));
+    auto searcher_variant = inverted_index_cache_handle.get_index_searcher();
+    if (FulltextIndexSearcherPtr* index_searcher =
+                std::get_if(&searcher_variant)) {
+        // try to reuse index_searcher's directory to read null_bitmap to cache
+        // to avoid open directory additionally for null_bitmap
+        InvertedIndexQueryCacheHandle null_bitmap_cache_handle;
+        static_cast(read_null_bitmap(&null_bitmap_cache_handle,
+                                      (*index_searcher)->getReader()->directory()));
+
+        try {
+            if (query_type == InvertedIndexQueryType::MATCH_ANY_QUERY ||
+                query_type == InvertedIndexQueryType::MATCH_ALL_QUERY ||
+                query_type == InvertedIndexQueryType::EQUAL_QUERY) {
+                SCOPED_RAW_TIMER(&stats->inverted_index_searcher_search_timer);
+                (*index_searcher)->_search(query.get(), [&result](DocRange* doc_range) {
+                    if (doc_range->type_ == DocRangeType::kMany) {
+                        result.addMany(doc_range->doc_many_size_, doc_range->doc_many->data());
+                    } else {
+                        result.addRange(doc_range->doc_range.first, doc_range->doc_range.second);
+                    }
+                });
+            } else {
+                SCOPED_RAW_TIMER(&stats->inverted_index_searcher_search_timer);
+                (*index_searcher)
+                        ->_search(query.get(),
+                                  [&result](const int32_t docid, const float_t /*score*/) {
+                                      // docid equal to rowid in segment
+                                      result.add(docid);
+                                  });
+            }
+        } catch (const CLuceneError& e) {
+            if (_is_range_query(query_type) && e.number() == CL_ERR_TooManyClauses) {
+                return Status::Error(
+                        "range query term exceeds limits, try to downgrade from inverted index, "
+                        "column "
+                        "name:{}, search_str:{}",
+                        column_name, search_str);
+            } else {
+                return Status::Error(
+                        "CLuceneError occurred, error msg: {}, column name: {}, search_str: {}",
+                        e.what(), column_name, search_str);
+            }
         }
-    }
 
-    // add to cache
-    std::shared_ptr term_match_bitmap =
-            std::make_shared(result);
-    term_match_bitmap->runOptimize();
-    cache->insert(cache_key, term_match_bitmap, &cache_handle);
+        // add to cache
+        std::shared_ptr term_match_bitmap =
+                std::make_shared(result);
+        term_match_bitmap->runOptimize();
+        cache->insert(cache_key, term_match_bitmap, &cache_handle);
 
-    bit_map->swap(result);
+        bit_map->swap(result);
+    }
     return Status::OK();
 }
 
@@ -639,22 +648,11 @@
BkdIndexReader::BkdIndexReader(io::FileSystemSPtr fs, const std::string& path, const TabletIndex* index_meta) - : InvertedIndexReader(fs, path, index_meta), _compoundReader(nullptr) { + : InvertedIndexReader(fs, path, index_meta) { io::Path io_path(_path); - auto index_dir = io_path.parent_path(); - auto index_file_name = InvertedIndexDescriptor::get_index_file_name(io_path.filename(), - index_meta->index_id()); - - // check index file existence - auto index_file = index_dir / index_file_name; - if (!indexExists(index_file)) { - LOG(WARNING) << "bkd index: " << index_file.string() << " not exist."; - return; - } - _file_full_path = index_file; - _compoundReader = std::make_unique( - DorisCompoundDirectory::getDirectory(fs, index_dir.c_str()), index_file_name.c_str(), - config::inverted_index_read_buffer_size); + _index_dir = io_path.parent_path(); + _index_file_name = InvertedIndexDescriptor::get_index_file_name(io_path.filename(), + index_meta->index_id()); } Status BkdIndexReader::new_iterator(OlapReaderStatistics* stats, RuntimeState* runtime_state, @@ -701,12 +699,21 @@ Status BkdIndexReader::try_query(OlapReaderStatistics* stats, const std::string& uint32_t* count) { auto visitor = std::make_unique(nullptr, query_type, true); std::shared_ptr r; - RETURN_IF_ERROR(get_bkd_reader(&r)); + auto st = get_bkd_reader(r, stats); + if (!st.ok()) { + // empty bkd index file, just return + if (st.code() == ErrorCode::END_OF_FILE) { + return Status::OK(); + } + LOG(WARNING) << "get bkd reader for " << _index_dir / _index_file_name + << " failed: " << st; + return st; + } std::string query_str; _value_key_coder->full_encode_ascending(query_value, &query_str); - InvertedIndexQueryCache::CacheKey cache_key {_file_full_path, column_name, query_type, - query_str}; + InvertedIndexQueryCache::CacheKey cache_key {_index_dir / _index_file_name, column_name, + query_type, query_str}; auto cache = InvertedIndexQueryCache::instance(); InvertedIndexQueryCacheHandle cache_handler; roaring::Roaring bit_map; @@ -716,14 +723,7 @@ Status BkdIndexReader::try_query(OlapReaderStatistics* stats, const std::string& return Status::OK(); } try { - auto st = bkd_query(stats, column_name, query_value, query_type, r, visitor.get()); - if (!st.ok()) { - if (st.code() == ErrorCode::END_OF_FILE) { - return Status::OK(); - } - LOG(WARNING) << "bkd_query for column " << column_name << " failed: " << st; - return st; - } + RETURN_IF_ERROR(bkd_query(stats, column_name, query_value, query_type, r, visitor.get())); *count = r->estimate_point_count(visitor.get()); } catch (const CLuceneError& e) { return Status::Error( @@ -753,16 +753,28 @@ Status BkdIndexReader::query(OlapReaderStatistics* stats, RuntimeState* runtime_ const std::string& column_name, const void* query_value, InvertedIndexQueryType query_type, roaring::Roaring* bit_map) { SCOPED_RAW_TIMER(&stats->inverted_index_query_timer); + /*if (_bkd_reader == nullptr) { + LOG(WARNING) << "bkd index input file {} not found" << _file_full_path; + return Status::EndOfFile("bkd index file is empty"); + }*/ auto visitor = std::make_unique(bit_map, query_type); std::shared_ptr r; - RETURN_IF_ERROR(get_bkd_reader(&r)); - + auto st = get_bkd_reader(r, stats); + if (!st.ok()) { + // empty bkd index file, just return + if (st.code() == ErrorCode::END_OF_FILE) { + return Status::OK(); + } + LOG(WARNING) << "get bkd reader for " << _index_dir / _index_file_name + << " failed: " << st; + return st; + } std::string query_str; _value_key_coder->full_encode_ascending(query_value, &query_str); - 
InvertedIndexQueryCache::CacheKey cache_key {_file_full_path, column_name, query_type, - query_str}; + InvertedIndexQueryCache::CacheKey cache_key {_index_dir / _index_file_name, column_name, + query_type, query_str}; auto cache = InvertedIndexQueryCache::instance(); InvertedIndexQueryCacheHandle cache_handler; auto cache_status = handle_cache(cache, cache_key, &cache_handler, stats, bit_map); @@ -771,14 +783,7 @@ Status BkdIndexReader::query(OlapReaderStatistics* stats, RuntimeState* runtime_ } try { - auto st = bkd_query(stats, column_name, query_value, query_type, r, visitor.get()); - if (!st.ok()) { - if (st.code() == ErrorCode::END_OF_FILE) { - return Status::OK(); - } - LOG(WARNING) << "bkd_query for column " << column_name << " failed: " << st; - return st; - } + RETURN_IF_ERROR(bkd_query(stats, column_name, query_value, query_type, r, visitor.get())); r->intersect(visitor.get()); } catch (const CLuceneError& e) { return Status::Error( @@ -795,45 +800,26 @@ Status BkdIndexReader::query(OlapReaderStatistics* stats, RuntimeState* runtime_ return Status::OK(); } -Status BkdIndexReader::get_bkd_reader(std::shared_ptr* bkdReader) { - // bkd file reader - if (_compoundReader == nullptr) { - return Status::Error( - "bkd index input file not found"); - } - CLuceneError err; - std::unique_ptr data_in; - std::unique_ptr meta_in; - std::unique_ptr index_in; - - if (!_compoundReader->openInput( - InvertedIndexDescriptor::get_temporary_bkd_index_data_file_name().c_str(), data_in, - err) || - !_compoundReader->openInput( - InvertedIndexDescriptor::get_temporary_bkd_index_meta_file_name().c_str(), meta_in, - err) || - !_compoundReader->openInput( - InvertedIndexDescriptor::get_temporary_bkd_index_file_name().c_str(), index_in, - err)) { - return Status::Error("bkd index input error: {}", - err.what()); - } - - *bkdReader = std::make_shared(data_in.release()); - if (0 == (*bkdReader)->read_meta(meta_in.get())) { - VLOG_NOTICE << "bkd index file is empty:" << _compoundReader->toString(); - return Status::EndOfFile("bkd index file is empty"); - } - - (*bkdReader)->read_index(index_in.get()); - - _type_info = get_scalar_type_info((FieldType)(*bkdReader)->type); - if (_type_info == nullptr) { - return Status::Error( - "unsupported typeinfo, type={}", (*bkdReader)->type); +Status BkdIndexReader::get_bkd_reader(BKDIndexSearcherPtr& bkd_reader, + OlapReaderStatistics* stats) { + InvertedIndexCacheHandle inverted_index_cache_handle; + RETURN_IF_ERROR(InvertedIndexSearcherCache::instance()->get_index_searcher( + _fs, _index_dir.c_str(), _index_file_name, &inverted_index_cache_handle, stats, + type())); + auto searcher_variant = inverted_index_cache_handle.get_index_searcher(); + auto* bkd_searcher = std::get_if(&searcher_variant); + if (bkd_searcher) { + _type_info = get_scalar_type_info((FieldType)(*bkd_searcher)->type); + if (_type_info == nullptr) { + return Status::Error( + "unsupported typeinfo, type={}", (*bkd_searcher)->type); + } + _value_key_coder = get_key_coder(_type_info->type()); + bkd_reader = *bkd_searcher; + return Status::OK(); } - _value_key_coder = get_key_coder(_type_info->type()); - return Status::OK(); + return Status::Error( + "get bkd reader from searcher cache builder error"); } InvertedIndexReaderType BkdIndexReader::type() { diff --git a/be/src/olap/rowset/segment_v2/inverted_index_reader.h b/be/src/olap/rowset/segment_v2/inverted_index_reader.h index 20c5c731f9eca8..b1801142c3fa15 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_reader.h +++ 
b/be/src/olap/rowset/segment_v2/inverted_index_reader.h @@ -59,15 +59,6 @@ namespace segment_v2 { class InvertedIndexIterator; class InvertedIndexQueryCacheHandle; -enum class InvertedIndexReaderType { - UNKNOWN = -1, - FULLTEXT = 0, - STRING_TYPE = 1, - BKD = 2, -}; - -using IndexSearcherPtr = std::shared_ptr; - class InvertedIndexReader : public std::enable_shared_from_this { public: explicit InvertedIndexReader(io::FileSystemSPtr fs, const std::string& path, @@ -141,7 +132,7 @@ class FullTextIndexReader : public InvertedIndexReader { private: Status normal_index_search(OlapReaderStatistics* stats, InvertedIndexQueryType query_type, - const IndexSearcherPtr& index_searcher, + const FulltextIndexSearcherPtr& index_searcher, bool& null_bitmap_already_read, const std::unique_ptr& query, const std::shared_ptr& term_match_bitmap); @@ -149,10 +140,11 @@ class FullTextIndexReader : public InvertedIndexReader { Status match_all_index_search(OlapReaderStatistics* stats, RuntimeState* runtime_state, const std::wstring& field_ws, const std::vector& analyse_result, - const IndexSearcherPtr& index_searcher, + const FulltextIndexSearcherPtr& index_searcher, const std::shared_ptr& term_match_bitmap); - void check_null_bitmap(const IndexSearcherPtr& index_searcher, bool& null_bitmap_already_read); + void check_null_bitmap(const FulltextIndexSearcherPtr& index_searcher, + bool& null_bitmap_already_read); }; class StringTypeInvertedIndexReader : public InvertedIndexReader { @@ -216,25 +208,13 @@ class BkdIndexReader : public InvertedIndexReader { ENABLE_FACTORY_CREATOR(BkdIndexReader); private: - std::string _file_full_path; + std::string _index_file_name; + io::Path _index_dir; public: explicit BkdIndexReader(io::FileSystemSPtr fs, const std::string& path, const TabletIndex* index_meta); - ~BkdIndexReader() override { - if (_compoundReader != nullptr) { - try { - _compoundReader->close(); - } catch (const CLuceneError& e) { - // Handle exception, e.g., log it, but don't rethrow. - LOG(ERROR) << "Exception caught in BkdIndexReader destructor: " << e.what() - << std::endl; - } catch (...) { - // Handle all other exceptions, but don't rethrow. - LOG(ERROR) << "Unknown exception caught in BkdIndexReader destructor." 
<< std::endl; - } - } - } + ~BkdIndexReader() override = default; Status new_iterator(OlapReaderStatistics* stats, RuntimeState* runtime_state, std::unique_ptr* iterator) override; @@ -256,12 +236,11 @@ class BkdIndexReader : public InvertedIndexReader { roaring::Roaring* bit_map); InvertedIndexReaderType type() override; - Status get_bkd_reader(std::shared_ptr* reader); + Status get_bkd_reader(BKDIndexSearcherPtr& reader, OlapReaderStatistics* stats); private: const TypeInfo* _type_info {}; const KeyCoder* _value_key_coder {}; - std::unique_ptr _compoundReader; }; class InvertedIndexIterator { @@ -274,7 +253,7 @@ class InvertedIndexIterator { Status read_from_inverted_index(const std::string& column_name, const void* query_value, InvertedIndexQueryType query_type, uint32_t segment_num_rows, - roaring::Roaring* bit_map, bool skip_try = false); + roaring::Roaring* bit_map, bool skip_try = true); Status try_read_from_inverted_index(const std::string& column_name, const void* query_value, InvertedIndexQueryType query_type, uint32_t* count); diff --git a/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp b/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp index 5c53a91c3d8cfc..a532708371b2e3 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp @@ -118,8 +118,11 @@ class InvertedIndexColumnWriterImpl : public InvertedIndexColumnWriter { // open index searcher into cache auto index_file_name = InvertedIndexDescriptor::get_index_file_name( _segment_file_name, _index_meta->index_id()); - static_cast(InvertedIndexSearcherCache::instance()->insert(_fs, _directory, - index_file_name)); + auto st = InvertedIndexSearcherCache::instance()->insert( + _fs, _directory, index_file_name, InvertedIndexReaderType::FULLTEXT); + if (!st.ok()) { + LOG(ERROR) << "insert inverted index searcher cache error:" << st; + } } } } @@ -141,7 +144,6 @@ class InvertedIndexColumnWriterImpl : public InvertedIndexColumnWriter { auto index_path = InvertedIndexDescriptor::get_temporary_index_path( _directory + "/" + _segment_file_name, _index_meta->index_id()); - // LOG(INFO) << "inverted index path: " << index_path; bool exists = false; auto st = _fs->exists(index_path.c_str(), &exists); if (!st.ok()) { @@ -151,12 +153,7 @@ class InvertedIndexColumnWriterImpl : public InvertedIndexColumnWriter { } if (exists) { LOG(ERROR) << "try to init a directory:" << index_path << " already exists"; - return Status::InternalError("init_fulltext_index a directory already exists"); - //st = _fs->delete_directory(index_path.c_str()); - //if (!st.ok()) { - // LOG(ERROR) << "delete directory:" << index_path << " error:" << st; - // return st; - //} + return Status::InternalError("init_fulltext_index directory already exists"); } _char_string_reader = std::make_unique>(); diff --git a/be/src/olap/schema.cpp b/be/src/olap/schema.cpp index 8f786a81bf68b5..c59f49cb3ba4ad 100644 --- a/be/src/olap/schema.cpp +++ b/be/src/olap/schema.cpp @@ -180,6 +180,7 @@ vectorized::IColumn::MutablePtr Schema::get_predicate_column_ptr(const Field& fi break; case FieldType::OLAP_FIELD_TYPE_VARCHAR: case FieldType::OLAP_FIELD_TYPE_STRING: + case FieldType::OLAP_FIELD_TYPE_JSONB: if (config::enable_low_cardinality_optimize && reader_type == ReaderType::READER_QUERY) { ptr = doris::vectorized::ColumnDictionary::create( field.type()); diff --git a/be/src/olap/task/engine_publish_version_task.cpp b/be/src/olap/task/engine_publish_version_task.cpp index 24096d1b69358c..c1325c174f64ad 
100644 --- a/be/src/olap/task/engine_publish_version_task.cpp +++ b/be/src/olap/task/engine_publish_version_task.cpp @@ -274,13 +274,15 @@ Status EnginePublishVersionTask::finish() { (*_succ_tablets)[tablet_id] = 0; } else { add_error_tablet_id(tablet_id); - LOG(WARNING) - << "publish version failed on transaction, tablet version not " - "exists. " - << "transaction_id=" << transaction_id - << ", tablet_id=" << tablet_id - << ", tablet_state=" << tablet_state_name(tablet->tablet_state()) - << ", version=" << par_ver_info.version; + if (!res.is()) { + LOG(WARNING) + << "publish version failed on transaction, tablet version not " + "exists. " + << "transaction_id=" << transaction_id + << ", tablet_id=" << tablet_id << ", tablet_state=" + << tablet_state_name(tablet->tablet_state()) + << ", version=" << par_ver_info.version; + } } } } diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index 10cc44b80bae2d..42ab71a45471d4 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -114,6 +114,7 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf _split_block_distribute_by_channel_timer = ADD_TIMER(_profile, "SplitBlockDistributeByChannelTime"); _blocks_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "BlocksProduced", TUnit::UNIT, 1); + _rows_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "RowsProduced", TUnit::UNIT, 1); _overall_throughput = _profile->add_derived_counter( "OverallThroughput", TUnit::BYTES_PER_SECOND, std::bind(&RuntimeProfile::units_per_second, _bytes_sent_counter, @@ -309,6 +310,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block SourceState source_state) { auto& local_state = get_local_state(state); COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)block->rows()); + COUNTER_UPDATE(local_state.rows_sent_counter(), (int64_t)block->rows()); SCOPED_TIMER(local_state.exec_time_counter()); local_state._peak_memory_usage_counter->set(_mem_tracker->peak_consumption()); bool all_receiver_eof = true; diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h index 1c7b6cbdaad1af..bc9c26e36ea22c 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.h +++ b/be/src/pipeline/exec/exchange_sink_operator.h @@ -146,6 +146,7 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> { RuntimeProfile::Counter* brpc_wait_timer() { return _brpc_wait_timer; } RuntimeProfile::Counter* blocks_sent_counter() { return _blocks_sent_counter; } + RuntimeProfile::Counter* rows_sent_counter() { return _rows_sent_counter; } RuntimeProfile::Counter* local_send_timer() { return _local_send_timer; } RuntimeProfile::Counter* local_bytes_send_counter() { return _local_bytes_send_counter; } RuntimeProfile::Counter* local_sent_rows() { return _local_sent_rows; } @@ -192,6 +193,7 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> { RuntimeProfile::Counter* _split_block_hash_compute_timer = nullptr; RuntimeProfile::Counter* _split_block_distribute_by_channel_timer = nullptr; RuntimeProfile::Counter* _blocks_sent_counter = nullptr; + RuntimeProfile::Counter* _rows_sent_counter = nullptr; // Throughput per total time spent in sender RuntimeProfile::Counter* _overall_throughput = nullptr; // Used to counter send bytes under local data exchange diff --git a/be/src/pipeline/exec/result_sink_operator.cpp 
b/be/src/pipeline/exec/result_sink_operator.cpp index 321642bdba0629..e9a51d56ad13fd 100644 --- a/be/src/pipeline/exec/result_sink_operator.cpp +++ b/be/src/pipeline/exec/result_sink_operator.cpp @@ -64,7 +64,8 @@ Status ResultSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) state->execution_timeout())); _result_sink_dependency = ResultSinkDependency::create_shared(_parent->operator_id(), _parent->node_id()); - + _blocks_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "BlocksProduced", TUnit::UNIT, 1); + _rows_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "RowsProduced", TUnit::UNIT, 1); ((PipBufferControlBlock*)_sender.get())->set_dependency(_result_sink_dependency); return Status::OK(); } @@ -131,6 +132,8 @@ Status ResultSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block, SourceState source_state) { auto& local_state = get_local_state(state); SCOPED_TIMER(local_state.exec_time_counter()); + COUNTER_UPDATE(local_state.rows_sent_counter(), (int64_t)block->rows()); + COUNTER_UPDATE(local_state.blocks_sent_counter(), 1); if (_fetch_option.use_two_phase_fetch && block->rows() > 0) { RETURN_IF_ERROR(_second_phase_fetch_data(state, block)); } diff --git a/be/src/pipeline/exec/result_sink_operator.h b/be/src/pipeline/exec/result_sink_operator.h index 9bda54e79b6907..93ce397b840786 100644 --- a/be/src/pipeline/exec/result_sink_operator.h +++ b/be/src/pipeline/exec/result_sink_operator.h @@ -62,6 +62,8 @@ class ResultSinkLocalState final : public PipelineXSinkLocalState<> { Status open(RuntimeState* state) override; Status close(RuntimeState* state, Status exec_status) override; WriteDependency* dependency() override { return _result_sink_dependency.get(); } + RuntimeProfile::Counter* blocks_sent_counter() { return _blocks_sent_counter; } + RuntimeProfile::Counter* rows_sent_counter() { return _rows_sent_counter; } private: friend class ResultSinkOperatorX; @@ -71,6 +73,8 @@ class ResultSinkLocalState final : public PipelineXSinkLocalState<> { std::shared_ptr _sender; std::shared_ptr _writer; std::shared_ptr _result_sink_dependency; + RuntimeProfile::Counter* _blocks_sent_counter = nullptr; + RuntimeProfile::Counter* _rows_sent_counter = nullptr; }; class ResultSinkOperatorX final : public DataSinkOperatorX { diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index 85ccf8620cabeb..fbbf2e05d7f770 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -945,7 +945,7 @@ Status ScanLocalState::_normalize_noneq_binary_predicate( DCHECK(expr->children().size() == 2); auto noneq_checker = [](const std::string& fn_name) { - return fn_name != "ne" && fn_name != "eq"; + return fn_name != "ne" && fn_name != "eq" && fn_name != "eq_for_null"; }; StringRef value; int slot_ref_child = -1; diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp index 8885a0601a1888..e989af75b20e2e 100644 --- a/be/src/pipeline/task_scheduler.cpp +++ b/be/src/pipeline/task_scheduler.cpp @@ -80,10 +80,12 @@ Status BlockedTaskScheduler::add_blocked_task(PipelineTask* task) { if (!static_cast(task)->push_blocked_task_to_queue()) { // put this task into current dependency's blocking queue and wait for event notification // instead of using a separate BlockedTaskScheduler. 
+ task->set_running(false); return Status::OK(); } _blocked_tasks.push_back(task); _task_cond.notify_one(); + task->set_running(false); return Status::OK(); } @@ -337,7 +339,6 @@ void TaskScheduler::_do_work(size_t index) { } auto pipeline_state = task->get_state(); - task->set_running(false); switch (pipeline_state) { case PipelineTaskState::BLOCKED_FOR_SOURCE: case PipelineTaskState::BLOCKED_FOR_SINK: @@ -346,6 +347,7 @@ void TaskScheduler::_do_work(size_t index) { static_cast(_blocked_task_scheduler->add_blocked_task(task)); break; case PipelineTaskState::RUNNABLE: + task->set_running(false); static_cast(_task_queue->push_back(task, index)); break; default: @@ -368,9 +370,8 @@ void TaskScheduler::_try_close_task(PipelineTask* task, PipelineTaskState state, auto try_close_failed = !status.ok() && state != PipelineTaskState::CANCELED; if (try_close_failed) { cancel(); - // Call `close` if `try_close` failed to make sure allocated resources are released - static_cast(task->close(exec_status)); - } else if (!task->is_pipelineX() && task->is_pending_finish()) { + } + if (!task->is_pipelineX() && task->is_pending_finish()) { task->set_state(PipelineTaskState::PENDING_FINISH); static_cast(_blocked_task_scheduler->add_blocked_task(task)); task->set_running(false); @@ -380,6 +381,7 @@ void TaskScheduler::_try_close_task(PipelineTask* task, PipelineTaskState state, task->set_running(false); return; } + status = task->close(exec_status); if (!status.ok() && state != PipelineTaskState::CANCELED) { cancel(); diff --git a/be/src/runtime/routine_load/data_consumer_group.cpp b/be/src/runtime/routine_load/data_consumer_group.cpp index c54bbbd99dd99a..60e7c57a6c12fa 100644 --- a/be/src/runtime/routine_load/data_consumer_group.cpp +++ b/be/src/runtime/routine_load/data_consumer_group.cpp @@ -125,10 +125,9 @@ Status KafkaDataConsumerGroup::start_all(std::shared_ptr ctx, bool eos = false; while (true) { if (eos || left_time <= 0 || left_rows <= 0 || left_bytes <= 0) { - _rows = ctx->max_batch_rows - left_rows; LOG(INFO) << "consumer group done: " << _grp_id << ". 
consume time(ms)=" << ctx->max_interval_s * 1000 - left_time - << ", received rows=" << _rows + << ", received rows=" << ctx->max_batch_rows - left_rows << ", received bytes=" << ctx->max_batch_size - left_bytes << ", eos: " << eos << ", left_time: " << left_time << ", left_rows: " << left_rows << ", left_bytes: " << left_bytes diff --git a/be/src/runtime/routine_load/data_consumer_group.h b/be/src/runtime/routine_load/data_consumer_group.h index 0cda80a9ec4dc3..e15ad7115f6e9a 100644 --- a/be/src/runtime/routine_load/data_consumer_group.h +++ b/be/src/runtime/routine_load/data_consumer_group.h @@ -60,10 +60,6 @@ class DataConsumerGroup { ++_counter; } - int64_t get_consumer_rows() const { return _rows; } - - void set_consumer_rows(int64_t rows) { _rows = rows; } - // start all consumers virtual Status start_all(std::shared_ptr ctx, std::shared_ptr kafka_pipe) { @@ -81,8 +77,6 @@ class DataConsumerGroup { // when the counter becomes zero, shutdown the queue to finish std::mutex _mutex; int _counter; - // received total rows - int64_t _rows {0}; }; // for kafka diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp b/be/src/runtime/routine_load/routine_load_task_executor.cpp index ca43ee055cb2e9..d14b2999c14395 100644 --- a/be/src/runtime/routine_load/routine_load_task_executor.cpp +++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp @@ -354,15 +354,6 @@ void RoutineLoadTaskExecutor::exec_task(std::shared_ptr ctx, // wait for all consumers finished HANDLE_ERROR(ctx->future.get(), "consume failed"); - // check received and load rows - LOG(INFO) << "routine load task received rows: " << consumer_grp.get()->get_consumer_rows() - << " load total rows: " << ctx.get()->number_total_rows - << " loaded rows: " << ctx.get()->number_loaded_rows - << " filtered rows: " << ctx.get()->number_filtered_rows - << " unselected rows: " << ctx.get()->number_unselected_rows; - DCHECK(consumer_grp.get()->get_consumer_rows() == ctx.get()->number_total_rows); - consumer_grp.get()->set_consumer_rows(0); - ctx->load_cost_millis = UnixMillis() - ctx->start_millis; // return the consumer back to pool diff --git a/be/src/util/mem_info.cpp b/be/src/util/mem_info.cpp index 6c24caaf63ab9e..eff008ddc37f59 100644 --- a/be/src/util/mem_info.cpp +++ b/be/src/util/mem_info.cpp @@ -244,10 +244,23 @@ int64_t MemInfo::tg_not_enable_overcommit_group_gc() { return 0; } + std::vector task_groups_overcommit; + for (const auto& task_group : task_groups) { + taskgroup::TaskGroupInfo tg_info; + task_group->task_group_info(&tg_info); + if (task_group->memory_used() > tg_info.memory_limit) { + task_groups_overcommit.push_back(task_group); + } + } + if (task_groups_overcommit.empty()) { + return 0; + } + LOG(INFO) << fmt::format( - "[MemoryGC] start GC work load group that not enable overcommit, number of group: {}, " + "[MemoryGC] start GC work load group that not enable overcommit, number of overcommit " + "group: {}, " "if it exceeds the limit, try free size = (group used - group limit).", - task_groups.size()); + task_groups_overcommit.size()); Defer defer {[&]() { if (total_free_memory > 0) { @@ -255,13 +268,14 @@ int64_t MemInfo::tg_not_enable_overcommit_group_gc() { tg_profile->pretty_print(&ss); LOG(INFO) << fmt::format( "[MemoryGC] end GC work load group that not enable overcommit, number of " - "group: {}, free memory {}. cost(us): {}, details: {}", - task_groups.size(), PrettyPrinter::print(total_free_memory, TUnit::BYTES), + "overcommit group: {}, free memory {}. 
cost(us): {}, details: {}", + task_groups_overcommit.size(), + PrettyPrinter::print(total_free_memory, TUnit::BYTES), watch.elapsed_time() / 1000, ss.str()); } }}; - for (const auto& task_group : task_groups) { + for (const auto& task_group : task_groups_overcommit) { taskgroup::TaskGroupInfo tg_info; task_group->task_group_info(&tg_info); auto used = task_group->memory_used(); diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp index d711aa43b4a9d1..305bafe10f793d 100644 --- a/be/src/vec/exec/scan/vscan_node.cpp +++ b/be/src/vec/exec/scan/vscan_node.cpp @@ -970,7 +970,7 @@ Status VScanNode::_normalize_noneq_binary_predicate(VExpr* expr, VExprContext* e DCHECK(expr->children().size() == 2); auto noneq_checker = [](const std::string& fn_name) { - return fn_name != "ne" && fn_name != "eq"; + return fn_name != "ne" && fn_name != "eq" && fn_name != "eq_for_null"; }; StringRef value; int slot_ref_child = -1; diff --git a/be/src/vec/functions/comparison_equal_for_null.cpp b/be/src/vec/functions/comparison_equal_for_null.cpp index 983d40424001be..36c6a86f6dafe1 100644 --- a/be/src/vec/functions/comparison_equal_for_null.cpp +++ b/be/src/vec/functions/comparison_equal_for_null.cpp @@ -66,6 +66,37 @@ class FunctionEqForNull : public IFunction { size_t result, size_t input_rows_count) const override { ColumnWithTypeAndName& col_left = block.get_by_position(arguments[0]); ColumnWithTypeAndName& col_right = block.get_by_position(arguments[1]); + bool left_only_null = col_left.column->only_null(); + bool right_only_null = col_right.column->only_null(); + if (left_only_null && right_only_null) { + auto result_column = ColumnVector::create(input_rows_count, 1); + block.get_by_position(result).column = std::move(result_column); + return Status::OK(); + } else if (left_only_null) { + auto right_type_nullable = col_right.type->is_nullable(); + if (!right_type_nullable) { + block.get_by_position(result).column = + ColumnVector::create(input_rows_count); + } else { + auto const* nullable_right_col = + assert_cast(col_right.column.get()); + block.get_by_position(result).column = + nullable_right_col->get_null_map_column().clone_resized(input_rows_count); + } + return Status::OK(); + } else if (right_only_null) { + auto left_type_nullable = col_left.type->is_nullable(); + if (!left_type_nullable) { + block.get_by_position(result).column = + ColumnVector::create(input_rows_count, (UInt8)0); + } else { + auto const* nullable_left_col = + assert_cast(col_left.column.get()); + block.get_by_position(result).column = + nullable_left_col->get_null_map_column().clone_resized(input_rows_count); + } + return Status::OK(); + } const auto& [left_col, left_const] = unpack_if_const(col_left.column); const auto& [right_col, right_const] = unpack_if_const(col_right.column); @@ -153,15 +184,15 @@ class FunctionEqForNull : public IFunction { bool right_const) { if (left_const) { for (int i = 0; i < rows; ++i) { - result[i] |= left[0] & (left[0] == right[i]); + result[i] &= (left[0] == right[i]); } } else if (right_const) { for (int i = 0; i < rows; ++i) { - result[i] |= left[i] & (left[i] == right[0]); + result[i] &= (left[i] == right[0]); } } else { for (int i = 0; i < rows; ++i) { - result[i] |= left[i] & (left[i] == right[i]); + result[i] &= (left[i] == right[i]); } } } diff --git a/docs/en/docs/advanced/partition/auto-partition.md b/docs/en/docs/advanced/partition/auto-partition.md index c718f75454d495..3e6fc6dc8e8f1d 100644 --- a/docs/en/docs/advanced/partition/auto-partition.md +++ 
b/docs/en/docs/advanced/partition/auto-partition.md @@ -26,7 +26,7 @@ under the License. # AUTO PARTITION - + diff --git a/docs/en/docs/sql-manual/sql-functions/date-time-functions/date-trunc.md b/docs/en/docs/sql-manual/sql-functions/date-time-functions/date-trunc.md index d9274b84047c96..23a1c25393e7f2 100644 --- a/docs/en/docs/sql-manual/sql-functions/date-time-functions/date-trunc.md +++ b/docs/en/docs/sql-manual/sql-functions/date-time-functions/date-trunc.md @@ -26,12 +26,8 @@ under the License. ## date_trunc - - date_trunc - - ### Description #### Syntax @@ -42,8 +38,8 @@ Truncates datetime in the specified time unit. datetime is a legal date expression. -unit is the time unit you want to truncate. The optional values are as follows: [`second`,`minute`,`hour`,`day`,`week`,`month`,`quarter`,`year`]。 -If unit does not meet the above optional values, the result will return NULL. +unit is the time unit you want to truncate. The optional values are as follows: [`second`,`minute`,`hour`,`day`,`week`,`month`,`quarter`,`year`]. + ### example ``` @@ -104,4 +100,5 @@ mysql> select date_trunc('2010-12-02 19:28:30', 'year'); +-------------------------------------------------+ ``` ### keywords - DATE_TRUNC,DATE,DATETIME + +DATE_TRUNC,DATE,DATETIME diff --git a/docs/zh-CN/docs/advanced/partition/auto-partition.md b/docs/zh-CN/docs/advanced/partition/auto-partition.md index 193a0868b3728b..42c07581c2ef6f 100644 --- a/docs/zh-CN/docs/advanced/partition/auto-partition.md +++ b/docs/zh-CN/docs/advanced/partition/auto-partition.md @@ -26,7 +26,7 @@ under the License. # 自动分区 - + diff --git a/docs/zh-CN/docs/sql-manual/sql-functions/date-time-functions/date-trunc.md b/docs/zh-CN/docs/sql-manual/sql-functions/date-time-functions/date-trunc.md index 44cd68e6e466d2..d1ce23dac7c06c 100644 --- a/docs/zh-CN/docs/sql-manual/sql-functions/date-time-functions/date-trunc.md +++ b/docs/zh-CN/docs/sql-manual/sql-functions/date-time-functions/date-trunc.md @@ -26,12 +26,8 @@ under the License. 
## date_trunc - - date_trunc - - ### description #### Syntax @@ -43,7 +39,7 @@ date_trunc datetime 参数是合法的日期表达式。 unit 参数是您希望截断的时间间隔,可选的值如下:[`second`,`minute`,`hour`,`day`,`week`,`month`,`quarter`,`year`]。 -如果unit 不符合上述可选值,结果将返回NULL。 + ### example ``` @@ -106,4 +102,4 @@ mysql> select date_trunc('2010-12-02 19:28:30', 'year'); ### keywords - DATE_TRUNC,DATE,TRUNC +DATE_TRUNC,DATE,TRUNC diff --git a/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java b/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java index fba2e486fb6574..224d82c4016c8b 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java +++ b/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java @@ -33,6 +33,7 @@ import org.apache.doris.journal.bdbje.BDBToolOptions; import org.apache.doris.persist.meta.MetaReader; import org.apache.doris.qe.QeService; +import org.apache.doris.qe.SimpleScheduler; import org.apache.doris.service.ExecuteEnv; import org.apache.doris.service.FeServer; import org.apache.doris.service.FrontendOptions; @@ -194,6 +195,8 @@ public static void start(String dorisHomeDir, String pidDir, String[] args, Star Env.getCurrentEnv().setHttpReady(true); } + SimpleScheduler.init(); + if (options.enableQeService) { QeService qeService = new QeService(Config.query_port, Config.arrow_flight_sql_port, ExecuteEnv.getInstance().getScheduler()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java index 5bd66e71374226..ef7acb484a1e86 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java @@ -641,25 +641,6 @@ private void analyzeSubquery(Analyzer analyzer, boolean skipCheck) throws UserEx checkColumnCoverage(mentionedColumns, targetTable.getBaseSchema()); realTargetColumnNames = targetColumns.stream().map(Column::getName).collect(Collectors.toList()); - Map slotToIndex = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER); - for (int i = 0; i < queryStmt.getResultExprs().size(); i++) { - Expr expr = queryStmt.getResultExprs().get(i); - if (!(expr instanceof StringLiteral && ((StringLiteral) expr).getValue() - .equals(SelectStmt.DEFAULT_VALUE))) { - slotToIndex.put(realTargetColumnNames.get(i), queryStmt.getResultExprs().get(i) - .checkTypeCompatibility(targetTable.getColumn(realTargetColumnNames.get(i)).getType())); - } - } - - for (Column column : targetTable.getBaseSchema()) { - if (!slotToIndex.containsKey(column.getName())) { - if (column.getDefaultValue() == null) { - slotToIndex.put(column.getName(), new NullLiteral()); - } else { - slotToIndex.put(column.getName(), new StringLiteral(column.getDefaultValue())); - } - } - } // handle VALUES() or SELECT constant list if (isValuesOrConstantSelect) { @@ -668,7 +649,8 @@ private void analyzeSubquery(Analyzer analyzer, boolean skipCheck) throws UserEx // INSERT INTO VALUES(...) List> rows = selectStmt.getValueList().getRows(); for (int rowIdx = 0; rowIdx < rows.size(); ++rowIdx) { - analyzeRow(analyzer, targetColumns, rows, rowIdx, origColIdxsForExtendCols, slotToIndex, skipCheck); + analyzeRow(analyzer, targetColumns, rows, rowIdx, origColIdxsForExtendCols, realTargetColumnNames, + skipCheck); } // clear these 2 structures, rebuild them using VALUES exprs @@ -686,7 +668,8 @@ private void analyzeSubquery(Analyzer analyzer, boolean skipCheck) throws UserEx // `selectStmt.getResultExprs().clear();` will clear the `rows` too, causing // error. 
rows.add(Lists.newArrayList(selectStmt.getResultExprs())); - analyzeRow(analyzer, targetColumns, rows, 0, origColIdxsForExtendCols, slotToIndex, skipCheck); + analyzeRow(analyzer, targetColumns, rows, 0, origColIdxsForExtendCols, realTargetColumnNames, + skipCheck); // rows may be changed in analyzeRow(), so rebuild the result exprs selectStmt.getResultExprs().clear(); for (Expr expr : rows.get(0)) { @@ -697,6 +680,8 @@ private void analyzeSubquery(Analyzer analyzer, boolean skipCheck) throws UserEx // INSERT INTO SELECT ... FROM tbl if (!origColIdxsForExtendCols.isEmpty()) { // extend the result expr by duplicating the related exprs + Map slotToIndex = buildSlotToIndex(queryStmt.getResultExprs(), realTargetColumnNames, + analyzer); for (Pair entry : origColIdxsForExtendCols) { if (entry.second == null) { queryStmt.getResultExprs().add(queryStmt.getResultExprs().get(entry.first)); @@ -726,6 +711,8 @@ private void analyzeSubquery(Analyzer analyzer, boolean skipCheck) throws UserEx // expand colLabels in QueryStmt if (!origColIdxsForExtendCols.isEmpty()) { if (queryStmt.getResultExprs().size() != queryStmt.getBaseTblResultExprs().size()) { + Map slotToIndex = buildSlotToIndex(queryStmt.getBaseTblResultExprs(), + realTargetColumnNames, analyzer); for (Pair entry : origColIdxsForExtendCols) { if (entry.second == null) { queryStmt.getBaseTblResultExprs().add(queryStmt.getBaseTblResultExprs().get(entry.first)); @@ -764,9 +751,35 @@ private void analyzeSubquery(Analyzer analyzer, boolean skipCheck) throws UserEx } } - private void analyzeRow(Analyzer analyzer, List targetColumns, List> rows, - int rowIdx, List> origColIdxsForExtendCols, Map slotToIndex, - boolean skipCheck) throws AnalysisException { + private Map buildSlotToIndex(ArrayList row, List realTargetColumnNames, + Analyzer analyzer) throws AnalysisException { + Map slotToIndex = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER); + for (int i = 0; i < row.size(); i++) { + Expr expr = row.get(i); + expr.analyze(analyzer); + if (expr instanceof DefaultValueExpr || expr instanceof StringLiteral + && ((StringLiteral) expr).getValue().equals(SelectStmt.DEFAULT_VALUE)) { + continue; + } + expr.analyze(analyzer); + slotToIndex.put(realTargetColumnNames.get(i), + expr.checkTypeCompatibility(targetTable.getColumn(realTargetColumnNames.get(i)).getType())); + } + for (Column column : targetTable.getBaseSchema()) { + if (!slotToIndex.containsKey(column.getName())) { + if (column.getDefaultValue() == null) { + slotToIndex.put(column.getName(), new NullLiteral()); + } else { + slotToIndex.put(column.getName(), new StringLiteral(column.getDefaultValue())); + } + } + } + return slotToIndex; + } + + private void analyzeRow(Analyzer analyzer, List targetColumns, List> rows, int rowIdx, + List> origColIdxsForExtendCols, List realTargetColumnNames, boolean skipCheck) + throws AnalysisException { // 1. check number of fields if equal with first row // targetColumns contains some shadow columns, which is added by system, // so we should minus this @@ -778,6 +791,8 @@ private void analyzeRow(Analyzer analyzer, List targetColumns, List row = rows.get(rowIdx); + Map slotToIndex = buildSlotToIndex(row, realTargetColumnNames, analyzer); + if (!origColIdxsForExtendCols.isEmpty()) { /** * we should extend the row for shadow columns. 
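The NativeInsertStmt change above extracts the slot-to-expression mapping into a `buildSlotToIndex()` helper and rebuilds it per row inside `analyzeRow()`, so every VALUES row resolves its own column expressions (skipping `DEFAULT` placeholders and falling back to the column's declared default or NULL) instead of reusing one map built once from the query's result exprs. A minimal, self-contained sketch of that mapping pattern follows; the `String` stand-in for Doris' `Expr` type and the `columnDefaults` map are illustrative assumptions, not the real FE classes:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative only: String stands in for an analyzed expression, columnDefaults for Column metadata.
public class SlotToIndexSketch {
    static final String DEFAULT_PLACEHOLDER = "DEFAULT_VALUE";

    // Build a case-insensitive map from target column name to the expression of one row.
    static Map<String, String> buildSlotToIndex(List<String> row, List<String> targetColumnNames,
                                                Map<String, String> columnDefaults) {
        Map<String, String> slotToIndex = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
        for (int i = 0; i < row.size(); i++) {
            String expr = row.get(i);
            if (DEFAULT_PLACEHOLDER.equals(expr)) {
                continue; // let the default-value pass below fill it in
            }
            slotToIndex.put(targetColumnNames.get(i), expr);
        }
        // Columns not mentioned in this row fall back to their declared default, or NULL.
        for (Map.Entry<String, String> col : columnDefaults.entrySet()) {
            slotToIndex.putIfAbsent(col.getKey(), col.getValue() == null ? "NULL" : col.getValue());
        }
        return slotToIndex;
    }

    public static void main(String[] args) {
        Map<String, String> defaults = new LinkedHashMap<>();
        defaults.put("k1", null);       // no default -> NULL literal
        defaults.put("k2", "0");        // default 0
        defaults.put("v1", "unknown");  // default 'unknown'
        List<String> row = new ArrayList<>(List.of("1", "DEFAULT_VALUE"));
        System.out.println(buildSlotToIndex(row, List.of("k1", "k2"), defaults));
        // prints {k1=1, k2=0, v1=unknown}
    }
}
```

Building the map per row is what lets different VALUES rows place `DEFAULT` in different positions and still have each row's missing columns filled independently.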
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SimpleScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SimpleScheduler.java index 519fb6f9bc6c8d..4c22ff9d8c7446 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SimpleScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SimpleScheduler.java @@ -53,7 +53,7 @@ public class SimpleScheduler { private static Map> blacklistBackends = Maps.newConcurrentMap(); private static UpdateBlacklistThread updateBlacklistThread; - static { + public static void init() { updateBlacklistThread = new UpdateBlacklistThread(); updateBlacklistThread.start(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java index f3fa143b528622..824e3f74abd84a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java @@ -57,9 +57,9 @@ public abstract class BaseAnalysisTask { + " NULL AS `part_id`, " + " COUNT(1) AS `row_count`, " + " NDV(`${colName}`) AS `ndv`, " - + " COUNT(1) - COUNT(${colName}) AS `null_count`, " - + " CAST(MIN(${colName}) AS STRING) AS `min`, " - + " CAST(MAX(${colName}) AS STRING) AS `max`, " + + " COUNT(1) - COUNT(`${colName}`) AS `null_count`, " + + " CAST(MIN(`${colName}`) AS STRING) AS `min`, " + + " CAST(MAX(`${colName}`) AS STRING) AS `max`, " + " ${dataSizeFunction} AS `data_size`, " + " NOW() AS `update_time` " + " FROM `${catalogName}`.`${dbName}`.`${tblName}`"; @@ -91,13 +91,13 @@ public abstract class BaseAnalysisTask { + "NULL AS `part_id`, " + "${rowCount} AS `row_count`, " + "${ndvFunction} as `ndv`, " - + "IFNULL(SUM(IF(`t1`.`column_key` IS NULL, `t1`.count, 0)), 0) * ${scaleFactor} as `null_count`, " + + "IFNULL(SUM(IF(`t1`.`column_key` IS NULL, `t1`.`count`, 0)), 0) * ${scaleFactor} as `null_count`, " + "'${min}' AS `min`, " + "'${max}' AS `max`, " + "${dataSizeFunction} * ${scaleFactor} AS `data_size`, " + "NOW() " + "FROM ( " - + " SELECT t0.`${colName}` as column_key, COUNT(1) as `count` " + + " SELECT t0.`${colName}` as `column_key`, COUNT(1) as `count` " + " FROM " + " (SELECT `${colName}` FROM `${catalogName}`.`${dbName}`.`${tblName}` " + " ${sampleHints} ${limit}) as `t0` " @@ -260,8 +260,8 @@ protected String getMinFunction() { } protected String getNdvFunction(String totalRows) { - String sampleRows = "SUM(t1.count)"; - String onceCount = "SUM(IF(t1.count = 1, 1, 0))"; + String sampleRows = "SUM(`t1`.`count`)"; + String onceCount = "SUM(IF(`t1`.`count` = 1, 1, 0))"; String countDistinct = "COUNT(1)"; // DUJ1 estimator: n*d / (n - f1 + f1*n/N) // f1 is the count of element that appears only once in the sample. 
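Two related changes end here. First, SimpleScheduler's blacklist-update thread is no longer started from a static initializer; it is started by an explicit `init()` called from `DorisFE.start()` (and from `SimpleSchedulerTest.setUp()` below), which makes the thread start deterministic rather than a side effect of class loading. Second, the BaseAnalysisTask SQL templates now wrap `${colName}`, `count`, and `column_key` in backticks so the generated statistics SQL still parses when a column name collides with a keyword. A rough sketch of that quoting idea follows; the `render()` substitution helper is an illustrative stand-in for whatever template mechanism the FE actually uses:

```java
import java.util.Map;

public class QuoteSketch {
    // Naive ${...} substitution, standing in for the real template engine.
    static String render(String template, Map<String, String> params) {
        String sql = template;
        for (Map.Entry<String, String> e : params.entrySet()) {
            sql = sql.replace("${" + e.getKey() + "}", e.getValue());
        }
        return sql;
    }

    public static void main(String[] args) {
        // Backticks are embedded around the placeholders, as in the patched templates.
        String template = "SELECT NDV(`${colName}`) AS `ndv`, "
                + "COUNT(1) - COUNT(`${colName}`) AS `null_count` FROM `${tblName}`";
        // A column literally named "count" only parses reliably when quoted.
        String sql = render(template, Map.of("colName", "count", "tblName", "tbl"));
        System.out.println(sql);
        // SELECT NDV(`count`) AS `ndv`, COUNT(1) - COUNT(`count`) AS `null_count` FROM `tbl`
    }
}
```

The test-expectation updates in BaseAnalysisTaskTest and OlapAnalysisTaskTest below simply track these quoted templates.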
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/OlapQueryCacheTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/OlapQueryCacheTest.java index 0179e9cf7eddeb..c5935a58e3e168 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/OlapQueryCacheTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/OlapQueryCacheTest.java @@ -17,6 +17,7 @@ package org.apache.doris.qe; +import org.apache.doris.analysis.AccessTestUtil; import org.apache.doris.analysis.Analyzer; import org.apache.doris.analysis.CreateViewStmt; import org.apache.doris.analysis.PartitionValue; @@ -26,6 +27,7 @@ import org.apache.doris.analysis.StatementBase; import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.analysis.TupleId; +import org.apache.doris.analysis.UserIdentity; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; @@ -50,13 +52,10 @@ import org.apache.doris.common.jmockit.Deencapsulation; import org.apache.doris.common.util.SqlParserUtils; import org.apache.doris.common.util.Util; -import org.apache.doris.datasource.CatalogMgr; import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.metric.MetricRepo; import org.apache.doris.mysql.MysqlChannel; import org.apache.doris.mysql.MysqlSerializer; -import org.apache.doris.mysql.privilege.AccessControllerManager; -import org.apache.doris.mysql.privilege.MockedAuth; import org.apache.doris.nereids.NereidsPlanner; import org.apache.doris.nereids.StatementContext; import org.apache.doris.nereids.glue.LogicalPlanAdapter; @@ -77,7 +76,6 @@ import org.apache.doris.qe.cache.SqlCache; import org.apache.doris.service.FrontendOptions; import org.apache.doris.system.Backend; -import org.apache.doris.system.SystemInfoService; import org.apache.doris.thrift.TStorageType; import org.apache.doris.thrift.TUniqueId; @@ -96,17 +94,15 @@ import java.io.StringReader; import java.net.UnknownHostException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; import java.util.List; -import java.util.function.Function; public class OlapQueryCacheTest { private static final Logger LOG = LogManager.getLogger(OlapQueryCacheTest.class); public static String clusterName = "testCluster"; - public static String dbName = "testDb"; public static String fullDbName = "testCluster:testDb"; - public static String tableName = "testTbl"; public static String userName = "testUser"; private static ConnectContext context; @@ -115,21 +111,12 @@ public class OlapQueryCacheTest { private Cache.HitRange hitRange; private Analyzer analyzer; private Database db; - - @Mocked - private AccessControllerManager accessManager; - @Mocked - private SystemInfoService service; - @Mocked private Env env; - @Mocked - private InternalCatalog catalog; - @Mocked private ConnectContext ctx; + private QueryState state; + private ConnectScheduler scheduler; @Mocked - MysqlChannel channel; - @Mocked - ConnectScheduler scheduler; + private MysqlChannel channel = null; @BeforeClass public static void start() { @@ -150,8 +137,15 @@ public static void start() { @Before public void setUp() throws Exception { - MockedAuth.mockedAccess(accessManager); - MockedAuth.mockedConnectContext(ctx, "root", "192.168.1.1"); + state = new QueryState(); + scheduler = new ConnectScheduler(10); + ctx = new ConnectContext(); + + SessionVariable sessionVariable = new SessionVariable(); + Deencapsulation.setField(sessionVariable, "beNumberForTest", 1); + MysqlSerializer serializer = 
MysqlSerializer.newInstance(); + env = AccessTestUtil.fetchAdminCatalog(); + new MockUp() { @Mock public boolean showHiddenColumns() { @@ -160,82 +154,28 @@ public boolean showHiddenColumns() { }; new MockUp() { @Mock - public SystemInfoService getCurrentSystemInfo() { - return service; - } - }; - db = new Database(1L, fullDbName); - - new Expectations(catalog) { - { - catalog.getDbNullable(fullDbName); - minTimes = 0; - result = db; - - catalog.getDbNullable(dbName); - minTimes = 0; - result = db; - - catalog.getDbNullable(db.getId()); - minTimes = 0; - result = db; - - catalog.getDbNames(); - minTimes = 0; - result = Lists.newArrayList(fullDbName); + Env getCurrentEnv() { + return env; } }; - CatalogMgr dsMgr = new CatalogMgr(); - new Expectations(dsMgr) { - { - dsMgr.getCatalog((String) any); - minTimes = 0; - result = catalog; - - dsMgr.getCatalogOrException((String) any, (Function) any); - minTimes = 0; - result = catalog; - - dsMgr.getCatalogOrAnalysisException((String) any); - minTimes = 0; - result = catalog; - } - }; - - new Expectations(env) { - { - env.getAccessManager(); - minTimes = 0; - result = accessManager; - - env.getCurrentCatalog(); - minTimes = 0; - result = catalog; - - env.getInternalCatalog(); - minTimes = 0; - result = catalog; - - env.getCatalogMgr(); - minTimes = 0; - result = dsMgr; - } - }; FunctionSet fs = new FunctionSet(); fs.init(); Deencapsulation.setField(env, "functionSet", fs); - QueryState state = new QueryState(); - channel.reset(); - SessionVariable sessionVariable = new SessionVariable(); - Deencapsulation.setField(sessionVariable, "beNumberForTest", 1); + channel.reset(); new Expectations(channel) { { + channel.sendOnePacket((ByteBuffer) any); + minTimes = 0; + + channel.reset(); + minTimes = 0; + channel.getSerializer(); minTimes = 0; - result = MysqlSerializer.newInstance(); + result = serializer; } }; @@ -290,7 +230,7 @@ public SystemInfoService getCurrentSystemInfo() { ctx.getDatabase(); minTimes = 0; - result = dbName; + result = fullDbName; ctx.getSessionVariable(); minTimes = 0; @@ -305,17 +245,32 @@ public SystemInfoService getCurrentSystemInfo() { ctx.getCurrentCatalog(); minTimes = 0; - result = catalog; + result = env.getCurrentCatalog(); ctx.getCatalog(anyString); minTimes = 0; - result = catalog; + result = env.getCurrentCatalog(); + + ConnectContext.get(); + minTimes = 0; + result = ctx; + + ctx.getRemoteIP(); + minTimes = 0; + result = "192.168.1.1"; + + ctx.getCurrentUserIdentity(); + minTimes = 0; + UserIdentity userIdentity = new UserIdentity(userName, "192.168.1.1"); + userIdentity.setIsAnalyzed(); + result = userIdentity; } }; analyzer = new Analyzer(env, ctx); newRangeList = Lists.newArrayList(); + db = ((InternalCatalog) env.getCurrentCatalog()).getDbNullable(fullDbName); // table and view init use analyzer, should init after analyzer build OlapTable tbl1 = createOrderTable(); db.createTable(tbl1); @@ -600,7 +555,6 @@ private StatementBase parseSql(String sql, Analyzer analyzer, boolean needToSql) @Test public void testCacheNode() throws Exception { - Env.getCurrentSystemInfo(); CacheCoordinator cp = CacheCoordinator.getInstance(); cp.debugModel = true; Backend bd1 = new Backend(1, "", 1000); @@ -626,7 +580,6 @@ public void testCacheNode() throws Exception { @Test public void testCacheModeNone() throws Exception { - Env.getCurrentSystemInfo(); StatementBase parseStmt = parseSql("select @@version_comment limit 1"); List scanNodes = Lists.newArrayList(); CacheAnalyzer ca = new CacheAnalyzer(context, parseStmt, scanNodes); @@ 
-636,7 +589,6 @@ public void testCacheModeNone() throws Exception { @Test public void testCacheModeTable() throws Exception { - Env.getCurrentSystemInfo(); StatementBase parseStmt = parseSql( "SELECT country, COUNT(userid) FROM userprofile GROUP BY country" ); @@ -650,7 +602,6 @@ public void testCacheModeTable() throws Exception { @Test public void testWithinMinTime() throws Exception { - Env.getCurrentSystemInfo(); StatementBase parseStmt = parseSql( "SELECT country, COUNT(userid) FROM userprofile GROUP BY country" ); @@ -664,7 +615,6 @@ public void testWithinMinTime() throws Exception { @Test public void testPartitionModel() throws Exception { - Env.getCurrentSystemInfo(); StatementBase parseStmt = parseSql( "SELECT eventdate, COUNT(DISTINCT userid) FROM appevent WHERE eventdate>=\"2020-01-12\" and " + "eventdate<=\"2020-01-15\" GROUP BY eventdate" @@ -680,7 +630,6 @@ public void testPartitionModel() throws Exception { @Test public void testParseByte() throws Exception { - Env.getCurrentSystemInfo(); RowBatchBuilder sb = new RowBatchBuilder(CacheMode.Partition); byte[] buffer = new byte[]{10, 50, 48, 50, 48, 45, 48, 51, 45, 49, 48, 1, 51, 2, 67, 78}; PartitionRange.PartitionKeyType key1 = sb.getKeyFromRow(buffer, 0, Type.DATE); @@ -693,7 +642,6 @@ public void testParseByte() throws Exception { @Test public void testPartitionIntTypeSql() throws Exception { - Env.getCurrentSystemInfo(); StatementBase parseStmt = parseSql( "SELECT `date`, COUNT(id) FROM `order` WHERE `date`>=20200112 and `date`<=20200115 GROUP BY date" ); @@ -737,7 +685,6 @@ public void testPartitionIntTypeSql() throws Exception { @Test public void testSimpleCacheSql() throws Exception { - Env.getCurrentSystemInfo(); StatementBase parseStmt = parseSql( "SELECT eventdate, COUNT(userid) FROM appevent WHERE eventdate>=\"2020-01-12\" and " + "eventdate<=\"2020-01-15\" GROUP BY eventdate" @@ -780,7 +727,6 @@ public void testSimpleCacheSql() throws Exception { @Test public void testHitSqlCache() throws Exception { - Env.getCurrentSystemInfo(); StatementBase parseStmt = parseSql( "SELECT eventdate, COUNT(userid) FROM appevent WHERE eventdate>=\"2020-01-12\" and " + "eventdate<=\"2020-01-14\" GROUP BY eventdate" @@ -795,7 +741,6 @@ public void testHitSqlCache() throws Exception { @Test public void testHitPartPartition() throws Exception { - Env.getCurrentSystemInfo(); StatementBase parseStmt = parseSql( "SELECT eventdate, COUNT(userid) FROM appevent WHERE eventdate>=\"2020-01-12\" and " + "eventdate<=\"2020-01-14\" GROUP BY eventdate" @@ -841,7 +786,6 @@ public void testHitPartPartition() throws Exception { @Test public void testNoUpdatePartition() throws Exception { - Env.getCurrentSystemInfo(); StatementBase parseStmt = parseSql( "SELECT eventdate, COUNT(userid) FROM appevent WHERE eventdate>=\"2020-01-12\" and " + "eventdate<=\"2020-01-14\" GROUP BY eventdate" @@ -883,7 +827,6 @@ public void testNoUpdatePartition() throws Exception { @Test public void testUpdatePartition() throws Exception { - Env.getCurrentSystemInfo(); StatementBase parseStmt = parseSql( "SELECT eventdate, COUNT(userid) FROM appevent WHERE eventdate>=\"2020-01-12\" and " + "eventdate<=\"2020-01-15\" GROUP BY eventdate" @@ -932,7 +875,6 @@ public void testUpdatePartition() throws Exception { @Test public void testRewriteMultiPredicate1() throws Exception { - Env.getCurrentSystemInfo(); StatementBase parseStmt = parseSql( "SELECT eventdate, COUNT(userid) FROM appevent WHERE eventdate>\"2020-01-11\" and " + "eventdate<\"2020-01-16\"" @@ -976,7 +918,6 @@ public 
void testRewriteMultiPredicate1() throws Exception { @Test public void testRewriteJoin() throws Exception { - Env.getCurrentSystemInfo(); StatementBase parseStmt = parseSql( "SELECT appevent.eventdate, country, COUNT(appevent.userid) FROM appevent" + " INNER JOIN userprofile ON appevent.userid = userprofile.userid" @@ -1021,7 +962,6 @@ public void testRewriteJoin() throws Exception { @Test public void testSubSelect() throws Exception { - Env.getCurrentSystemInfo(); StatementBase parseStmt = parseSql( "SELECT eventdate, sum(pv) FROM (SELECT eventdate, COUNT(userid) AS pv FROM appevent WHERE " + "eventdate>\"2020-01-11\" AND eventdate<\"2020-01-16\"" @@ -1074,7 +1014,6 @@ public void testSubSelect() throws Exception { @Test public void testNotHitPartition() throws Exception { - Env.getCurrentSystemInfo(); StatementBase parseStmt = parseSql( "SELECT eventdate, COUNT(userid) FROM appevent WHERE eventdate>=\"2020-01-12\" and " + "eventdate<=\"2020-01-14\" GROUP BY eventdate" @@ -1103,7 +1042,6 @@ public void testNotHitPartition() throws Exception { @Test public void testSqlCacheKey() { - Env.getCurrentSystemInfo(); StatementBase parseStmt = parseSql( "SELECT eventdate, COUNT(userid) FROM appevent WHERE eventdate>=\"2020-01-12\" and " + "eventdate<=\"2020-01-14\" GROUP BY eventdate" @@ -1125,7 +1063,6 @@ public void testSqlCacheKey() { @Test public void testSqlCacheKeyWithChineseChar() { - Env.getCurrentSystemInfo(); StatementBase parseStmt = parseSql( "SELECT eventdate, COUNT(userid) FROM appevent WHERE eventdate>=\"2020-01-12\" and " + "eventdate<=\"2020-01-14\" and city=\"北京\" GROUP BY eventdate" @@ -1145,7 +1082,6 @@ public void testSqlCacheKeyWithChineseChar() { @Test public void testSqlCacheKeyWithView() { - Env.getCurrentSystemInfo(); StatementBase parseStmt = parseSql("SELECT * from testDb.view1"); ArrayList selectedPartitionIds = Lists.newArrayList(20200112L, 20200113L, 20200114L); @@ -1166,7 +1102,6 @@ public void testSqlCacheKeyWithView() { @Test public void testSqlCacheKeyWithViewForNereids() { - Env.getCurrentSystemInfo(); StatementBase parseStmt = parseSqlByNereids("SELECT * from testDb.view1"); ArrayList selectedPartitionIds = Lists.newArrayList(20200112L, 20200113L, 20200114L); @@ -1185,7 +1120,6 @@ public void testSqlCacheKeyWithViewForNereids() { @Test public void testSqlCacheKeyWithSubSelectView() { - Env.getCurrentSystemInfo(); StatementBase parseStmt = parseSql( "select origin.eventdate as eventdate, origin.userid as userid\n" + "from (\n" @@ -1214,7 +1148,6 @@ public void testSqlCacheKeyWithSubSelectView() { @Test public void testSqlCacheKeyWithSubSelectViewForNereids() { - Env.getCurrentSystemInfo(); StatementBase parseStmt = parseSqlByNereids( "select origin.eventdate as eventdate, origin.userid as userid\n" + "from (\n" @@ -1243,7 +1176,6 @@ public void testSqlCacheKeyWithSubSelectViewForNereids() { @Test public void testPartitionCacheKeyWithView() { - Env.getCurrentSystemInfo(); StatementBase parseStmt = parseSql("SELECT * from testDb.view3"); ArrayList selectedPartitionIds = Lists.newArrayList(20200112L, 20200113L, 20200114L, 20200115L); @@ -1270,7 +1202,6 @@ public void testPartitionCacheKeyWithView() { @Test public void testPartitionCacheKeyWithSubSelectView() { - Env.getCurrentSystemInfo(); StatementBase parseStmt = parseSql( "select origin.eventdate as eventdate, origin.cnt as cnt\n" + "from (\n" @@ -1294,8 +1225,8 @@ public void testPartitionCacheKeyWithSubSelectView() { Assert.assertEquals(cache.getSqlWithViewStmt(), "SELECT `origin`.`eventdate` AS `eventdate`, 
`origin`.`cnt` AS `cnt` " + "FROM (SELECT `eventdate` AS `eventdate`, count(`userid`) AS `cnt` " - + "FROM `testDb`.`view2` GROUP BY `eventdate`) origin|SELECT `eventdate` AS `eventdate`, " - + "`userid` AS `userid` FROM `testCluster:testDb`.`appevent`"); + + "FROM `testCluster:testDb`.`view2` GROUP BY `eventdate`) origin|SELECT `eventdate` " + + "AS `eventdate`, `userid` AS `userid` FROM `testCluster:testDb`.`appevent`"); } catch (Exception e) { LOG.warn("ex={}", e); Assert.fail(e.getMessage()); @@ -1304,7 +1235,6 @@ public void testPartitionCacheKeyWithSubSelectView() { @Test public void testSqlCacheKeyWithNestedView() { - Env.getCurrentSystemInfo(); StatementBase parseStmt = parseSql("SELECT * from testDb.view4"); ArrayList selectedPartitionIds = Lists.newArrayList(20200112L, 20200113L, 20200114L); @@ -1317,7 +1247,7 @@ public void testSqlCacheKeyWithNestedView() { String cacheKey = sqlCache.getSqlWithViewStmt(); Assert.assertEquals(cacheKey, "SELECT `testCluster:testDb`.`view4`.`eventdate` AS `eventdate`, " + "`testCluster:testDb`.`view4`.`__count_1` AS `__count_1` FROM `testCluster:testDb`.`view4`|" - + "SELECT `eventdate` AS `eventdate`, count(`userid`) AS `__count_1` FROM `testDb`.`view2` " + + "SELECT `eventdate` AS `eventdate`, count(`userid`) AS `__count_1` FROM `testCluster:testDb`.`view2` " + "WHERE `eventdate` >= '2020-01-12' AND `eventdate` <= '2020-01-14' GROUP BY `eventdate`|" + "SELECT `eventdate` AS `eventdate`, `userid` AS `userid` FROM `testCluster:testDb`.`appevent`"); Assert.assertEquals(selectedPartitionIds.size(), sqlCache.getSumOfPartitionNum()); @@ -1325,7 +1255,6 @@ public void testSqlCacheKeyWithNestedView() { @Test public void testSqlCacheKeyWithNestedViewForNereids() { - Env.getCurrentSystemInfo(); StatementBase parseStmt = parseSqlByNereids("SELECT * from testDb.view4"); ArrayList selectedPartitionIds = Lists.newArrayList(20200112L, 20200113L, 20200114L); @@ -1337,15 +1266,14 @@ public void testSqlCacheKeyWithNestedViewForNereids() { SqlCache sqlCache = (SqlCache) ca.getCache(); String cacheKey = sqlCache.getSqlWithViewStmt(); Assert.assertEquals(cacheKey, "SELECT * from testDb.view4|SELECT `eventdate` AS `eventdate`, " - + "count(`userid`) AS `__count_1` FROM `testDb`.`view2` WHERE `eventdate` >= '2020-01-12' AND " - + "`eventdate` <= '2020-01-14' GROUP BY `eventdate`|SELECT `eventdate` AS `eventdate`, " + + "count(`userid`) AS `__count_1` FROM `testCluster:testDb`.`view2` WHERE `eventdate` >= '2020-01-12' " + + "AND `eventdate` <= '2020-01-14' GROUP BY `eventdate`|SELECT `eventdate` AS `eventdate`, " + "`userid` AS `userid` FROM `testCluster:testDb`.`appevent`"); Assert.assertEquals(selectedPartitionIds.size(), sqlCache.getSumOfPartitionNum()); } @Test public void testCacheLocalViewMultiOperand() { - Env.getCurrentSystemInfo(); StatementBase parseStmt = parseSql( "SELECT COUNT(userid)\n" + "FROM (\n" @@ -1369,7 +1297,6 @@ public void testCacheLocalViewMultiOperand() { @Test // test that some partitions do not exist in the table public void testNotExistPartitionSql() { - Env.getCurrentSystemInfo(); StatementBase parseStmt = parseSql( "SELECT `date`, COUNT(id) FROM `order` WHERE `date`>=20200110 and `date`<=20200115 GROUP BY date" ); diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/SimpleSchedulerTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/SimpleSchedulerTest.java index 63b907006dc276..6ba2d2715663dd 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/SimpleSchedulerTest.java +++ 
b/fe/fe-core/src/test/java/org/apache/doris/qe/SimpleSchedulerTest.java @@ -46,6 +46,7 @@ public class SimpleSchedulerTest { @BeforeClass public static void setUp() { + SimpleScheduler.init(); FeConstants.heartbeat_interval_second = 2; be1 = new Backend(1000L, "192.168.100.0", 9050); be2 = new Backend(1001L, "192.168.100.1", 9050); diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/BaseAnalysisTaskTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/BaseAnalysisTaskTest.java index e3d080fea0aea2..fe81c055e0dfbf 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/statistics/BaseAnalysisTaskTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/BaseAnalysisTaskTest.java @@ -55,8 +55,8 @@ public void testGetFunctions() { Assertions.assertEquals("NULL", maxFunction); String ndvFunction = olapAnalysisTask.getNdvFunction(String.valueOf(100)); - Assertions.assertEquals("SUM(t1.count) * COUNT(1) / (SUM(t1.count) - SUM(IF(t1.count = 1, 1, 0)) " - + "+ SUM(IF(t1.count = 1, 1, 0)) * SUM(t1.count) / 100)", ndvFunction); + Assertions.assertEquals("SUM(`t1`.`count`) * COUNT(1) / (SUM(`t1`.`count`) - SUM(IF(`t1`.`count` = 1, 1, 0)) " + + "+ SUM(IF(`t1`.`count` = 1, 1, 0)) * SUM(`t1`.`count`) / 100)", ndvFunction); System.out.println(ndvFunction); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/OlapAnalysisTaskTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/OlapAnalysisTaskTest.java index 8e30519e8c4fff..c174795b36bf20 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/statistics/OlapAnalysisTaskTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/OlapAnalysisTaskTest.java @@ -151,7 +151,7 @@ public ResultRow collectBasicStat(AutoCloseConnectContext context) { @Mock public void runQuery(String sql, boolean needEncode) { Assertions.assertFalse(needEncode); - Assertions.assertEquals("SELECT CONCAT('30001', '-', '-1', '-', 'null') AS `id`, 10001 AS `catalog_id`, 20001 AS `db_id`, 30001 AS `tbl_id`, -1 AS `idx_id`, 'null' AS `col_id`, NULL AS `part_id`, 500 AS `row_count`, SUM(t1.count) * COUNT(1) / (SUM(t1.count) - SUM(IF(t1.count = 1, 1, 0)) + SUM(IF(t1.count = 1, 1, 0)) * SUM(t1.count) / 500) as `ndv`, IFNULL(SUM(IF(`t1`.`column_key` IS NULL, `t1`.count, 0)), 0) * 5.0 as `null_count`, 'MQ==' AS `min`, 'Mg==' AS `max`, SUM(LENGTH(`column_key`) * count) * 5.0 AS `data_size`, NOW() FROM ( SELECT t0.`${colName}` as column_key, COUNT(1) as `count` FROM (SELECT `${colName}` FROM `catalogName`.`${dbName}`.`${tblName}` limit 100) as `t0` GROUP BY `t0`.`${colName}` ) as `t1` ", sql); + Assertions.assertEquals("SELECT CONCAT('30001', '-', '-1', '-', 'null') AS `id`, 10001 AS `catalog_id`, 20001 AS `db_id`, 30001 AS `tbl_id`, -1 AS `idx_id`, 'null' AS `col_id`, NULL AS `part_id`, 500 AS `row_count`, SUM(`t1`.`count`) * COUNT(1) / (SUM(`t1`.`count`) - SUM(IF(`t1`.`count` = 1, 1, 0)) + SUM(IF(`t1`.`count` = 1, 1, 0)) * SUM(`t1`.`count`) / 500) as `ndv`, IFNULL(SUM(IF(`t1`.`column_key` IS NULL, `t1`.`count`, 0)), 0) * 5.0 as `null_count`, 'MQ==' AS `min`, 'Mg==' AS `max`, SUM(LENGTH(`column_key`) * count) * 5.0 AS `data_size`, NOW() FROM ( SELECT t0.`${colName}` as `column_key`, COUNT(1) as `count` FROM (SELECT `${colName}` FROM `catalogName`.`${dbName}`.`${tblName}` limit 100) as `t0` GROUP BY `t0`.`${colName}` ) as `t1` ", sql); return; } }; @@ -292,7 +292,7 @@ public ResultRow collectBasicStat(AutoCloseConnectContext context) { @Mock public void runQuery(String sql, boolean needEncode) { 
Assertions.assertFalse(needEncode); - Assertions.assertEquals("SELECT CONCAT('30001', '-', '-1', '-', 'null') AS `id`, 10001 AS `catalog_id`, 20001 AS `db_id`, 30001 AS `tbl_id`, -1 AS `idx_id`, 'null' AS `col_id`, NULL AS `part_id`, 500 AS `row_count`, SUM(t1.count) * COUNT(1) / (SUM(t1.count) - SUM(IF(t1.count = 1, 1, 0)) + SUM(IF(t1.count = 1, 1, 0)) * SUM(t1.count) / 500) as `ndv`, IFNULL(SUM(IF(`t1`.`column_key` IS NULL, `t1`.count, 0)), 0) * 5.0 as `null_count`, 'MQ==' AS `min`, 'Mg==' AS `max`, SUM(LENGTH(`column_key`) * count) * 5.0 AS `data_size`, NOW() FROM ( SELECT t0.`${colName}` as column_key, COUNT(1) as `count` FROM (SELECT `${colName}` FROM `catalogName`.`${dbName}`.`${tblName}` limit 100) as `t0` GROUP BY `t0`.`${colName}` ) as `t1` ", sql); + Assertions.assertEquals("SELECT CONCAT('30001', '-', '-1', '-', 'null') AS `id`, 10001 AS `catalog_id`, 20001 AS `db_id`, 30001 AS `tbl_id`, -1 AS `idx_id`, 'null' AS `col_id`, NULL AS `part_id`, 500 AS `row_count`, SUM(`t1`.`count`) * COUNT(1) / (SUM(`t1`.`count`) - SUM(IF(`t1`.`count` = 1, 1, 0)) + SUM(IF(`t1`.`count` = 1, 1, 0)) * SUM(`t1`.`count`) / 500) as `ndv`, IFNULL(SUM(IF(`t1`.`column_key` IS NULL, `t1`.`count`, 0)), 0) * 5.0 as `null_count`, 'MQ==' AS `min`, 'Mg==' AS `max`, SUM(LENGTH(`column_key`) * count) * 5.0 AS `data_size`, NOW() FROM ( SELECT t0.`${colName}` as `column_key`, COUNT(1) as `count` FROM (SELECT `${colName}` FROM `catalogName`.`${dbName}`.`${tblName}` limit 100) as `t0` GROUP BY `t0`.`${colName}` ) as `t1` ", sql); return; } }; diff --git a/regression-test/data/correctness_p0/test_null_equal.out b/regression-test/data/correctness_p0/test_null_equal.out new file mode 100644 index 00000000000000..2c927e4744b622 --- /dev/null +++ b/regression-test/data/correctness_p0/test_null_equal.out @@ -0,0 +1,199 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !test_const1 -- +true + +-- !test_const2 -- +false + +-- !test_const3 -- +false + +-- !test1 -- + +-- !test2 -- + +-- !test3 -- +1 1 +2 2 +3 3 + +-- !test4 -- +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N + +-- !test5 -- +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N + +-- !test6 -- +\N \N +\N \N +\N \N +\N \N +\N \N +\N \N +\N \N +\N \N +\N \N +\N \N +\N \N +\N \N +\N \N +\N \N +\N \N +\N \N +\N \N +\N \N +\N \N +\N \N +\N \N +\N \N +\N \N +\N \N +\N \N +\N \N +\N \N +\N \N +\N \N +\N \N +\N \N +\N \N +\N \N +\N \N +\N \N +\N \N +\N \N +\N \N +\N \N +\N \N +\N \N +\N \N +\N \N +\N \N +\N \N +\N \N +\N \N +\N \N +\N \N +\N \N +\N \N +\N \N +\N \N +\N \N +\N \N +\N \N +1 1 +2 2 +3 3 + +-- !test7 -- +1 + diff --git a/regression-test/suites/fault_injection_p0/test_index_fault_injection.out b/regression-test/data/fault_injection_p0/test_index_fault_injection.out similarity index 100% rename from regression-test/suites/fault_injection_p0/test_index_fault_injection.out rename to regression-test/data/fault_injection_p0/test_index_fault_injection.out diff --git a/regression-test/data/inverted_index_p0/test_null_index.out b/regression-test/data/inverted_index_p0/test_null_index.out new file mode 100644 index 00000000000000..e37dc33ca03d32 --- /dev/null +++ b/regression-test/data/inverted_index_p0/test_null_index.out @@ -0,0 +1,7 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- + +-- !sql -- +1 a \N [null] [1] + +-- !sql -- diff --git a/regression-test/data/json_p0/test_json_predict_is_null.out b/regression-test/data/json_p0/test_json_predict_is_null.out new file mode 100644 index 00000000000000..e472118b03690c --- /dev/null +++ b/regression-test/data/json_p0/test_json_predict_is_null.out @@ -0,0 +1,85 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !select_extract -- +1 \N \N +2 null \N +3 true \N +4 false \N +5 100 \N +6 10000 \N +7 1000000000 \N +8 1152921504606846976 \N +9 6.18 \N +10 "abcd" \N +11 {} \N +12 {"k1":"v31","k2":300} "v31" +13 [] \N +14 [123,456] \N +15 ["abc","def"] \N +16 [null,true,false,100,6.18,"abc"] \N +17 [{"k1":"v41","k2":400},1,"a",3.14] \N +18 {"k1":"v31","k2":300,"a1":[{"k1":"v41","k2":400},1,"a",3.14]} "v31" +26 \N \N +27 {"k1":"v1","k2":200} "v1" +28 {"a.b.c":{"k1.a1":"v31","k2":300},"a":"niu"} \N +29 12524337771678448270 \N +30 -9223372036854775808 \N +31 18446744073709551615 \N + +-- !select_pred -- +1 \N +26 \N + +-- !select_delete -- +0 + +-- !select_pred -- +2 null +3 true +4 false +5 100 +6 10000 +7 1000000000 +8 1152921504606846976 +9 6.18 +10 "abcd" +11 {} +12 {"k1":"v31","k2":300} +13 [] +14 [123,456] +15 ["abc","def"] +16 [null,true,false,100,6.18,"abc"] +17 [{"k1":"v41","k2":400},1,"a",3.14] +18 {"k1":"v31","k2":300,"a1":[{"k1":"v41","k2":400},1,"a",3.14]} +27 {"k1":"v1","k2":200} +28 {"a.b.c":{"k1.a1":"v31","k2":300},"a":"niu"} +29 12524337771678448270 +30 -9223372036854775808 +31 18446744073709551615 + +-- !select_drop -- +0 + +-- !select -- +2 +3 +4 +5 +6 +7 +8 +9 +10 +11 +12 +13 +14 +15 +16 +17 +18 +27 +28 +29 +30 +31 + diff --git a/regression-test/data/jsonb_p0/test_jsonb_predict_is_null.out b/regression-test/data/jsonb_p0/test_jsonb_predict_is_null.out new file mode 100644 index 00000000000000..e472118b03690c --- /dev/null +++ b/regression-test/data/jsonb_p0/test_jsonb_predict_is_null.out @@ -0,0 +1,85 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select_extract -- +1 \N \N +2 null \N +3 true \N +4 false \N +5 100 \N +6 10000 \N +7 1000000000 \N +8 1152921504606846976 \N +9 6.18 \N +10 "abcd" \N +11 {} \N +12 {"k1":"v31","k2":300} "v31" +13 [] \N +14 [123,456] \N +15 ["abc","def"] \N +16 [null,true,false,100,6.18,"abc"] \N +17 [{"k1":"v41","k2":400},1,"a",3.14] \N +18 {"k1":"v31","k2":300,"a1":[{"k1":"v41","k2":400},1,"a",3.14]} "v31" +26 \N \N +27 {"k1":"v1","k2":200} "v1" +28 {"a.b.c":{"k1.a1":"v31","k2":300},"a":"niu"} \N +29 12524337771678448270 \N +30 -9223372036854775808 \N +31 18446744073709551615 \N + +-- !select_pred -- +1 \N +26 \N + +-- !select_delete -- +0 + +-- !select_pred -- +2 null +3 true +4 false +5 100 +6 10000 +7 1000000000 +8 1152921504606846976 +9 6.18 +10 "abcd" +11 {} +12 {"k1":"v31","k2":300} +13 [] +14 [123,456] +15 ["abc","def"] +16 [null,true,false,100,6.18,"abc"] +17 [{"k1":"v41","k2":400},1,"a",3.14] +18 {"k1":"v31","k2":300,"a1":[{"k1":"v41","k2":400},1,"a",3.14]} +27 {"k1":"v1","k2":200} +28 {"a.b.c":{"k1.a1":"v31","k2":300},"a":"niu"} +29 12524337771678448270 +30 -9223372036854775808 +31 18446744073709551615 + +-- !select_drop -- +0 + +-- !select -- +2 +3 +4 +5 +6 +7 +8 +9 +10 +11 +12 +13 +14 +15 +16 +17 +18 +27 +28 +29 +30 +31 + diff --git a/regression-test/data/mv_p0/test_insert_multi/test_insert_multi.out b/regression-test/data/mv_p0/test_insert_multi/test_insert_multi.out new file mode 100644 index 00000000000000..46af15050874a5 --- /dev/null +++ b/regression-test/data/mv_p0/test_insert_multi/test_insert_multi.out @@ -0,0 +1,9 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !select_star -- +1 1 1 2020-02-02 1 +1 2 2 2020-02-02 1 + +-- !select_mv -- +1 1 +2 1 + diff --git a/regression-test/suites/correctness_p0/test_null_equal.groovy b/regression-test/suites/correctness_p0/test_null_equal.groovy new file mode 100644 index 00000000000000..653dc59e90fef1 --- /dev/null +++ b/regression-test/suites/correctness_p0/test_null_equal.groovy @@ -0,0 +1,77 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_null_equal") { + qt_test_const1 "select null <=> null;" + qt_test_const2 "select null <=> 0;" + qt_test_const3 "select 1 <=> null;" + + sql "drop table if exists test_eq_for_null_not_nullable;" + sql """ + create table test_eq_for_null_not_nullable( + k1 int not null + ) distributed by hash(k1) properties("replication_num"="1"); + """ + sql """ + insert into test_eq_for_null_not_nullable values + (1),(2),(3); + """ + sql "sync" + qt_test1 "select * from test_eq_for_null_not_nullable where k1 <=> null;" + qt_test2 "select * from test_eq_for_null_not_nullable where null <=> k1;" + + sql "drop table if exists test_eq_for_null_nullable;" + sql """ + create table test_eq_for_null_nullable( + k1 int + ) distributed by hash(k1) properties("replication_num"="1"); + """ + sql """ + insert into test_eq_for_null_nullable values + (1),(2),(3), + (null), (null), (null),(null),(null),(null),(null),(null), + (null), (null), (null),(null),(null),(null),(null),(null), + (null), (null), (null),(null),(null),(null),(null),(null), + (null), (null), (null),(null),(null),(null),(null),(null), + (null), (null), (null),(null),(null),(null),(null),(null), + (null), (null), (null),(null),(null),(null),(null),(null), + (null), (null), (null),(null),(null),(null),(null),(null); + """ + sql "sync" + qt_test3 "select * from test_eq_for_null_not_nullable l, test_eq_for_null_nullable r where l.k1 <=> r.k1 order by 1;" + qt_test4 "select * from test_eq_for_null_nullable where k1 <=> null;" + qt_test5 "select * from test_eq_for_null_nullable where null <=> k1;" + + sql "drop table if exists test_eq_for_null_nullable2;" + sql """ + create table test_eq_for_null_nullable2( + k1 int + ) distributed by hash(k1) properties("replication_num"="1"); + """ + sql """ + insert into test_eq_for_null_nullable2 values + (null),(0),(1),(2),(3); + """ + sql "sync" + + qt_test6 "select * from test_eq_for_null_nullable a, test_eq_for_null_nullable2 b where a.k1 <=> b.k1 order by 1;" + + qt_test7 "select * from test_eq_for_null_nullable2 where k1 <=> 1 order by 1;" + + + +} \ No newline at end of file diff --git a/regression-test/suites/inverted_index_p0/test_null_index.groovy b/regression-test/suites/inverted_index_p0/test_null_index.groovy new file mode 100644 index 00000000000000..6bbab71e94091b 
--- /dev/null +++ b/regression-test/suites/inverted_index_p0/test_null_index.groovy @@ -0,0 +1,56 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +suite("test_null_index", "p0"){ + // prepare test table + + + def timeout = 60000 + def delta_time = 1000 + def alter_res = "null" + def useTime = 0 + + def indexTblName = "no_index_test" + + sql "DROP TABLE IF EXISTS ${indexTblName}" + // create 1 replica table + sql """ + CREATE TABLE IF NOT EXISTS ${indexTblName}( + `id` int(11) NOT NULL, + `str` string NOT NULL, + `str_null` string NULL, + `value` array NOT NULL, + `value_int` array NOT NULL + ) ENGINE=OLAP + DUPLICATE KEY(`id`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES( + "replication_allocation" = "tag.location.default: 1" + ); + """ + + sql "INSERT INTO $indexTblName VALUES (1, 'a', null, [null], [1]), (2, 'b', 'b', ['b'], [2]), (3, 'c', 'c', ['c'], [3]);" + qt_sql "SELECT * FROM $indexTblName WHERE str match null order by id;" + qt_sql "SELECT * FROM $indexTblName WHERE str_null match null order by id;" + try { + qt_sql "SELECT * FROM $indexTblName WHERE value_int element_eq 2;" + } catch (Exception e) { + assertTrue(e.getMessage().contains("unsupported nested array of type"), e.getMessage()) + } +} diff --git a/regression-test/suites/json_p0/test_json_predict_is_null.groovy b/regression-test/suites/json_p0/test_json_predict_is_null.groovy new file mode 100644 index 00000000000000..ed47a2934f8a8b --- /dev/null +++ b/regression-test/suites/json_p0/test_json_predict_is_null.groovy @@ -0,0 +1,89 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +import org.codehaus.groovy.runtime.IOGroovyMethods + +suite("test_json_predict_is_null", "p0") { + sql """ set experimental_enable_nereids_planner = false """ + + sql """ set experimental_enable_nereids_planner = true """ + sql """ set enable_fallback_to_original_planner = true """ + + sql "DROP TABLE IF EXISTS j_pred" + + sql """ + CREATE TABLE IF NOT EXISTS j_pred ( + id INT, + j JSON + ) + DUPLICATE KEY(id) + DISTRIBUTED BY HASH(id) BUCKETS 10 + PROPERTIES("replication_num" = "1", "disable_auto_compaction" = "true"); + """ + + // insert into valid json rows + sql """INSERT INTO j_pred VALUES(26, NULL)""" + sql """INSERT INTO j_pred VALUES(27, '{"k1":"v1", "k2": 200}')""" + sql """INSERT INTO j_pred VALUES(28, '{"a.b.c":{"k1.a1":"v31", "k2": 300},"a":"niu"}')""" + // int64 value + sql """INSERT INTO j_pred VALUES(29, '12524337771678448270')""" + // int64 min value + sql """INSERT INTO j_pred VALUES(30, '-9223372036854775808')""" + // int64 max value + sql """INSERT INTO j_pred VALUES(31, '18446744073709551615')""" + + // load the jsonb data from csv file + streamLoad { + table "j_pred" + + file "test_json.csv" // import csv file + set 'max_filter_ratio', '0.3' + time 10000 // limit inflight 10s + set 'strict_mode', 'true' + + // if declared a check callback, the default check condition will ignore. + // So you must check all condition + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + + def (code, out, err) = curl("GET", json.ErrorURL) + log.info("error result: " + out) + + assertEquals("success", json.Status.toLowerCase()) + assertEquals(25, json.NumberTotalRows) + assertEquals(18, json.NumberLoadedRows) + assertEquals(7, json.NumberFilteredRows) + assertTrue(json.LoadBytes > 0) + } + } + + qt_select_extract "SELECT id, j, jsonb_extract(j, '\$.k1') FROM j_pred ORDER BY id" + + qt_select_pred "select * from j_pred where j is null order by id" + + qt_select_delete "delete from j_pred where j is null" + + qt_select_pred "select * from j_pred order by id" + + qt_select_drop "alter table j_pred DROP COLUMN j" + + qt_select "select * from j_pred order by id" +} diff --git a/regression-test/suites/jsonb_p0/test_jsonb_predict_is_null.groovy b/regression-test/suites/jsonb_p0/test_jsonb_predict_is_null.groovy new file mode 100644 index 00000000000000..c97d0469908320 --- /dev/null +++ b/regression-test/suites/jsonb_p0/test_jsonb_predict_is_null.groovy @@ -0,0 +1,88 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +import org.codehaus.groovy.runtime.IOGroovyMethods + +suite("test_jsonb_predict_is_null", "p0") { + sql """ set experimental_enable_nereids_planner = false """ + + sql """ set experimental_enable_nereids_planner = true """ + sql """ set enable_fallback_to_original_planner = true """ + + sql "DROP TABLE IF EXISTS jb_pred" + + sql """ + CREATE TABLE IF NOT EXISTS jb_pred ( + id INT, + j JSONB + ) + DUPLICATE KEY(id) + DISTRIBUTED BY HASH(id) BUCKETS 10 + PROPERTIES("replication_num" = "1", "disable_auto_compaction" = "true"); + """ + + // insert into valid json rows + sql """INSERT INTO jb_pred VALUES(26, NULL)""" + sql """INSERT INTO jb_pred VALUES(27, '{"k1":"v1", "k2": 200}')""" + sql """INSERT INTO jb_pred VALUES(28, '{"a.b.c":{"k1.a1":"v31", "k2": 300},"a":"niu"}')""" + // int64 value + sql """INSERT INTO jb_pred VALUES(29, '12524337771678448270')""" + // int64 min value + sql """INSERT INTO jb_pred VALUES(30, '-9223372036854775808')""" + // int64 max value + sql """INSERT INTO jb_pred VALUES(31, '18446744073709551615')""" + + // load the jsonb data from csv file + streamLoad { + table "jb_pred" + + file "test_jsonb.csv" // import csv file + set 'max_filter_ratio', '0.3' + time 10000 // limit inflight 10s + set 'strict_mode', 'true' + + // if declared a check callback, the default check condition will ignore. + // So you must check all condition + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + def (code, out, err) = curl("GET", json.ErrorURL) + log.info("error result: " + out) + + assertEquals("success", json.Status.toLowerCase()) + assertEquals(25, json.NumberTotalRows) + assertEquals(18, json.NumberLoadedRows) + assertEquals(7, json.NumberFilteredRows) + assertTrue(json.LoadBytes > 0) + } + } + + qt_select_extract "SELECT id, j, jsonb_extract(j, '\$.k1') FROM jb_pred ORDER BY id" + + qt_select_pred "select * from jb_pred where j is null order by id" + + qt_select_delete "delete from jb_pred where j is null" + + qt_select_pred "select * from jb_pred order by id" + + qt_select_drop "alter table jb_pred DROP COLUMN j" + + qt_select "select * from jb_pred order by id" +} diff --git a/regression-test/suites/mv_p0/test_insert_multi/test_insert_multi.groovy b/regression-test/suites/mv_p0/test_insert_multi/test_insert_multi.groovy new file mode 100644 index 00000000000000..3f0d648f7c4e87 --- /dev/null +++ b/regression-test/suites/mv_p0/test_insert_multi/test_insert_multi.groovy @@ -0,0 +1,39 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +import org.codehaus.groovy.runtime.IOGroovyMethods + +suite ("test_insert_multi") { + + sql """ DROP TABLE IF EXISTS sales_records; """ + + sql """ + create table sales_records(record_id int, seller_id int, store_id int, sale_date date, sale_amt bigint) distributed by hash(record_id) properties("replication_num" = "1"); + """ + + createMV ("create materialized view store_amt as select store_id, sum(sale_amt) from sales_records group by store_id;") + + sql """insert into sales_records values(1,1,1,"2020-02-02",1),(1,2,2,"2020-02-02",1);""" + + qt_select_star "select * from sales_records order by 1,2;" + + explain { + sql(" SELECT store_id, sum(sale_amt) FROM sales_records GROUP BY store_id order by 1;") + contains "(store_amt)" + } + qt_select_mv " SELECT store_id, sum(sale_amt) FROM sales_records GROUP BY store_id order by 1;" +}
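The test_null_equal suite added above exercises the null-safe equal operator `<=>`: it returns true when both sides are NULL, false when exactly one side is NULL, and behaves like `=` otherwise, which is why the expected output has `null <=> null` as true while `null <=> 0` and `1 <=> null` are false. As a point of reference only (an analogy, not Doris code), Java's `Objects.equals` has the same three-way behavior:

```java
import java.util.Objects;

public class NullSafeEqSketch {
    public static void main(String[] args) {
        // Mirrors: select null <=> null;  -> true
        System.out.println(Objects.equals(null, null)); // true
        // Mirrors: select null <=> 0;     -> false
        System.out.println(Objects.equals(null, 0));    // false
        // Mirrors: select 1 <=> null;     -> false
        System.out.println(Objects.equals(1, null));    // false
        // Non-null operands fall back to ordinary equality.
        System.out.println(Objects.equals(1, 1));       // true
    }
}
```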