From 8f8fb0e54295d44f2d928d40728e7e5f3657edec Mon Sep 17 00:00:00 2001 From: BiteTheDDDDt Date: Thu, 1 Aug 2024 22:13:24 +0800 Subject: [PATCH] support group concat with distinct and order by --- .../aggregate_functions/aggregate_function.h | 11 +- .../aggregate_function_distinct.cpp | 25 ++- .../aggregate_function_distinct.h | 206 +++++++++++++----- .../aggregate_function_foreach.h | 2 - .../aggregate_function_null.h | 2 - .../aggregate_function_simple_factory.cpp | 1 - .../aggregate_function_sort.h | 13 +- 7 files changed, 173 insertions(+), 87 deletions(-) diff --git a/be/src/vec/aggregate_functions/aggregate_function.h b/be/src/vec/aggregate_functions/aggregate_function.h index 082f27e7318345..e9d7ff37dbc6e8 100644 --- a/be/src/vec/aggregate_functions/aggregate_function.h +++ b/be/src/vec/aggregate_functions/aggregate_function.h @@ -43,6 +43,8 @@ class AggregateFunctionBitmapCount; template class AggregateFunctionBitmapOp; struct AggregateFunctionBitmapUnionOp; +class IAggregateFunction; +using AggregateFunctionPtr = std::shared_ptr; using DataTypePtr = std::shared_ptr; using DataTypes = std::vector; @@ -178,11 +180,6 @@ class IAggregateFunction { const size_t offset, IColumn& to, const size_t num_rows) const = 0; - /** Returns true for aggregate functions of type -State. - * They are executed as other aggregate functions, but not finalized (return an aggregation state that can be combined with another). - */ - virtual bool is_state() const { return false; } - /** Contains a loop with calls to "add" function. You can collect arguments into array "places" * and do a single call to "add_batch" for devirtualization and inlining. */ @@ -223,6 +220,8 @@ class IAggregateFunction { virtual void set_version(const int version_) { version = version_; } + virtual AggregateFunctionPtr transmit_to_stable() { return nullptr; } + protected: DataTypes argument_types; int version {}; @@ -519,8 +518,6 @@ class IAggregateFunctionDataHelper : public IAggregateFunctionHelper { } }; -using AggregateFunctionPtr = std::shared_ptr; - class AggregateFunctionGuard { public: using AggregateData = std::remove_pointer_t; diff --git a/be/src/vec/aggregate_functions/aggregate_function_distinct.cpp b/be/src/vec/aggregate_functions/aggregate_function_distinct.cpp index 3155aa24be2a08..f86d44b7d6828b 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_distinct.cpp +++ b/be/src/vec/aggregate_functions/aggregate_function_distinct.cpp @@ -29,6 +29,16 @@ namespace doris::vectorized { +template +struct Reducer { + template + using Output = AggregateFunctionDistinctSingleNumericData; + using AggregateFunctionDistinctNormal = AggregateFunctionDistinct; +}; + +template +using AggregateFunctionDistinctNumeric = Reducer::AggregateFunctionDistinctNormal; + class AggregateFunctionCombinatorDistinct final : public IAggregateFunctionCombinator { public: String get_name() const override { return "Distinct"; } @@ -52,22 +62,15 @@ class AggregateFunctionCombinatorDistinct final : public IAggregateFunctionCombi if (arguments.size() == 1) { AggregateFunctionPtr res( - creator_with_numeric_type::create( + creator_with_numeric_type::create( arguments, result_is_nullable, nested_function)); if (res) { return res; } - if (arguments[0]->is_value_unambiguously_represented_in_contiguous_memory_region()) { - res = creator_without_type::create>>( - arguments, result_is_nullable, nested_function); - } else { - res = creator_without_type::create>>( - arguments, result_is_nullable, nested_function); - } + res = creator_without_type::create< + AggregateFunctionDistinct>( + arguments, result_is_nullable, nested_function); return res; } return creator_without_type::create< diff --git a/be/src/vec/aggregate_functions/aggregate_function_distinct.h b/be/src/vec/aggregate_functions/aggregate_function_distinct.h index c0c7a5b66dd58f..5eea0222d9a9f9 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_distinct.h +++ b/be/src/vec/aggregate_functions/aggregate_function_distinct.h @@ -28,6 +28,8 @@ #include #include #include +#include +#include #include #include "vec/aggregate_functions/aggregate_function.h" @@ -54,105 +56,170 @@ struct DefaultHash; namespace doris::vectorized { -template +template struct AggregateFunctionDistinctSingleNumericData { /// When creating, the hash table must be small. - using Set = HashSetWithStackMemory, 4>; - using Self = AggregateFunctionDistinctSingleNumericData; - Set set; + using Container = std::conditional_t, + HashSetWithStackMemory, 4>>; + using Self = AggregateFunctionDistinctSingleNumericData; + Container data; void add(const IColumn** columns, size_t /* columns_num */, size_t row_num, Arena*) { const auto& vec = assert_cast&>(*columns[0]).get_data(); - set.insert(vec[row_num]); + if constexpr (stable) { + data.insert_or_assign(vec[row_num], data.size()); + } else { + data.insert(vec[row_num]); + } } - void merge(const Self& rhs, Arena*) { set.merge(rhs.set); } + void merge(const Self& rhs, Arena*) { + DCHECK(!stable); + if constexpr (!stable) { + data.merge(rhs.data); + } + } - void serialize(BufferWritable& buf) const { set.write(buf); } + void serialize(BufferWritable& buf) const { + DCHECK(!stable); + if constexpr (!stable) { + data.write(buf); + } + } - void deserialize(BufferReadable& buf, Arena*) { set.read(buf); } + void deserialize(BufferReadable& buf, Arena*) { + DCHECK(!stable); + if constexpr (!stable) { + data.read(buf); + } + } MutableColumns get_arguments(const DataTypes& argument_types) const { MutableColumns argument_columns; argument_columns.emplace_back(argument_types[0]->create_column()); - for (const auto& elem : set) { - argument_columns[0]->insert(elem.get_value()); + + if constexpr (stable) { + argument_columns[0]->resize(data.size()); + auto ptr = (T*)const_cast(argument_columns[0]->get_raw_data().data); + for (auto it : data) { + ptr[it.second] = it.first; + } + } else { + for (const auto& elem : data) { + argument_columns[0]->insert(elem.get_value()); + } } return argument_columns; } }; +template struct AggregateFunctionDistinctGenericData { /// When creating, the hash table must be small. - using Set = HashSetWithStackMemory; + using Container = std::conditional_t, + HashSetWithStackMemory>; using Self = AggregateFunctionDistinctGenericData; - Set set; + Container data; void merge(const Self& rhs, Arena* arena) { - Set::LookupResult it; - bool inserted; - for (const auto& elem : rhs.set) { - StringRef key = elem.get_value(); - key.data = arena->insert(key.data, key.size); - set.emplace(key, it, inserted); + DCHECK(!stable); + if constexpr (!stable) { + typename Container::LookupResult it; + bool inserted; + for (const auto& elem : rhs.data) { + StringRef key = elem.get_value(); + key.data = arena->insert(key.data, key.size); + data.emplace(key, it, inserted); + } } } void serialize(BufferWritable& buf) const { - write_var_uint(set.size(), buf); - for (const auto& elem : set) { - write_string_binary(elem.get_value(), buf); + DCHECK(!stable); + if constexpr (!stable) { + write_var_uint(data.size(), buf); + for (const auto& elem : data) { + write_string_binary(elem.get_value(), buf); + } } } void deserialize(BufferReadable& buf, Arena* arena) { - UInt64 size; - read_var_uint(size, buf); - - StringRef ref; - for (size_t i = 0; i < size; ++i) { - read_string_binary(ref, buf); - set.insert(ref); + DCHECK(!stable); + if constexpr (!stable) { + UInt64 size; + read_var_uint(size, buf); + + StringRef ref; + for (size_t i = 0; i < size; ++i) { + read_string_binary(ref, buf); + data.insert(ref); + } } } }; -template -struct AggregateFunctionDistinctSingleGenericData : public AggregateFunctionDistinctGenericData { +template +struct AggregateFunctionDistinctSingleGenericData + : public AggregateFunctionDistinctGenericData { + using Base = AggregateFunctionDistinctGenericData; + using Base::data; void add(const IColumn** columns, size_t /* columns_num */, size_t row_num, Arena* arena) { - Set::LookupResult it; - bool inserted; auto key = columns[0]->get_data_at(row_num); key.data = arena->insert(key.data, key.size); - set.emplace(key, it, inserted); + + if constexpr (stable) { + data.insert_or_assign(key, data.size()); + } else { + typename Base::Container::LookupResult it; + bool inserted; + data.emplace(key, it, inserted); + } } MutableColumns get_arguments(const DataTypes& argument_types) const { MutableColumns argument_columns; argument_columns.emplace_back(argument_types[0]->create_column()); - for (const auto& elem : set) { - argument_columns[0]->insert_data(elem.get_value().data, elem.get_value().size); + if constexpr (stable) { + std::vector tmp(data.size()); + for (auto it : data) { + tmp[it.second] = it.first; + } + for (int i = 0; i < data.size(); i++) { + argument_columns[0]->insert_data(tmp[i].data, tmp[i].size); + } + } else { + for (const auto& elem : data) { + argument_columns[0]->insert_data(elem.get_value().data, elem.get_value().size); + } } return argument_columns; } }; -struct AggregateFunctionDistinctMultipleGenericData : public AggregateFunctionDistinctGenericData { +template +struct AggregateFunctionDistinctMultipleGenericData + : public AggregateFunctionDistinctGenericData { + using Base = AggregateFunctionDistinctGenericData; + using Base::data; void add(const IColumn** columns, size_t columns_num, size_t row_num, Arena* arena) { const char* begin = nullptr; - StringRef value(begin, 0); + StringRef key(begin, 0); for (size_t i = 0; i < columns_num; ++i) { auto cur_ref = columns[i]->serialize_value_into_arena(row_num, *arena, begin); - value.data = cur_ref.data - value.size; - value.size += cur_ref.size; + key.data = cur_ref.data - key.size; + key.size += cur_ref.size; } - Set::LookupResult it; - bool inserted; - value.data = arena->insert(value.data, value.size); - set.emplace(value, it, inserted); + if constexpr (stable) { + data.insert_or_assign(key, data.size()); + } else { + typename Base::Container::LookupResult it; + bool inserted; + data.emplace(key, it, inserted); + } } MutableColumns get_arguments(const DataTypes& argument_types) const { @@ -161,10 +228,23 @@ struct AggregateFunctionDistinctMultipleGenericData : public AggregateFunctionDi argument_columns[i] = argument_types[i]->create_column(); } - for (const auto& elem : set) { - const char* begin = elem.get_value().data; - for (auto& column : argument_columns) { - begin = column->deserialize_and_insert_from_arena(begin); + if constexpr (stable) { + std::vector tmp(data.size()); + for (auto it : data) { + tmp[it.second] = it.first; + } + for (int i = 0; i < data.size(); i++) { + const char* begin = tmp[i].data; + for (auto& column : argument_columns) { + begin = column->deserialize_and_insert_from_arena(begin); + } + } + } else { + for (const auto& elem : data) { + const char* begin = elem.get_value().data; + for (auto& column : argument_columns) { + begin = column->deserialize_and_insert_from_arena(begin); + } } } @@ -175,9 +255,10 @@ struct AggregateFunctionDistinctMultipleGenericData : public AggregateFunctionDi /** Adaptor for aggregate functions. * Adding -Distinct suffix to aggregate function **/ -template +template