Skip to content

Commit f112af0

Browse files
authored
[pick](branch-2.1) pick #41555 #41592 #38204 (#41781)
pick #41555 #41592 #38204
1 parent ec0c008 commit f112af0

21 files changed

+174
-38
lines changed

be/src/pipeline/exec/aggregation_sink_operator.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ class AggSinkOperatorX final : public DataSinkOperatorX<AggSinkLocalState> {
164164
? DataDistribution(ExchangeType::PASSTHROUGH)
165165
: DataSinkOperatorX<AggSinkLocalState>::required_data_distribution();
166166
}
167-
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_join
167+
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator
168168
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
169169
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
170170
}

be/src/pipeline/exec/analytic_sink_operator.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ class AnalyticSinkOperatorX final : public DataSinkOperatorX<AnalyticSinkLocalSt
102102
if (_partition_by_eq_expr_ctxs.empty()) {
103103
return {ExchangeType::PASSTHROUGH};
104104
} else if (_order_by_eq_expr_ctxs.empty()) {
105-
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_join
105+
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator
106106
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
107107
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
108108
}

be/src/pipeline/exec/distinct_streaming_aggregation_operator.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ class DistinctStreamingAggOperatorX final
107107

108108
DataDistribution required_data_distribution() const override {
109109
if (_needs_finalize || (!_probe_expr_ctxs.empty() && !_is_streaming_preagg)) {
110-
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_join
110+
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator
111111
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
112112
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
113113
}

be/src/pipeline/exec/hashjoin_build_sink.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ class HashJoinBuildSinkOperatorX final
167167
bool require_shuffled_data_distribution() const override {
168168
return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && !_is_broadcast_join;
169169
}
170-
bool is_shuffled_hash_join() const override {
170+
bool is_shuffled_operator() const override {
171171
return _join_distribution == TJoinDistributionType::PARTITIONED;
172172
}
173173
bool require_data_distribution() const override {

be/src/pipeline/exec/hashjoin_probe_operator.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ class HashJoinProbeOperatorX final : public JoinProbeOperatorX<HashJoinProbeLoca
178178
bool require_shuffled_data_distribution() const override {
179179
return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && !_is_broadcast_join;
180180
}
181-
bool is_shuffled_hash_join() const override {
181+
bool is_shuffled_operator() const override {
182182
return _join_distribution == TJoinDistributionType::PARTITIONED;
183183
}
184184
bool require_data_distribution() const override {

be/src/pipeline/exec/operator.h

+7-4
Original file line numberDiff line numberDiff line change
@@ -254,10 +254,13 @@ class OperatorBase {
254254

255255
virtual Status revoke_memory(RuntimeState* state) { return Status::OK(); }
256256
[[nodiscard]] virtual bool require_data_distribution() const { return false; }
257-
[[nodiscard]] bool followed_by_shuffled_join() const { return _followed_by_shuffled_join; }
258-
void set_followed_by_shuffled_join(bool followed_by_shuffled_join) {
259-
_followed_by_shuffled_join = followed_by_shuffled_join;
257+
[[nodiscard]] bool followed_by_shuffled_operator() const {
258+
return _followed_by_shuffled_operator;
260259
}
260+
void set_followed_by_shuffled_operator(bool followed_by_shuffled_operator) {
261+
_followed_by_shuffled_operator = followed_by_shuffled_operator;
262+
}
263+
[[nodiscard]] virtual bool is_shuffled_operator() const { return false; }
261264
[[nodiscard]] virtual bool require_shuffled_data_distribution() const { return false; }
262265

263266
protected:
@@ -268,7 +271,7 @@ class OperatorBase {
268271
OperatorXPtr _child_x = nullptr;
269272

270273
bool _is_closed;
271-
bool _followed_by_shuffled_join = false;
274+
bool _followed_by_shuffled_operator = false;
272275
};
273276

274277
/**

be/src/pipeline/exec/partitioned_hash_join_probe_operator.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ class PartitionedHashJoinProbeOperatorX final
172172
bool require_shuffled_data_distribution() const override {
173173
return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN;
174174
}
175-
bool is_shuffled_hash_join() const override {
175+
bool is_shuffled_operator() const override {
176176
return _join_distribution == TJoinDistributionType::PARTITIONED;
177177
}
178178

be/src/pipeline/exec/partitioned_hash_join_sink_operator.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ class PartitionedHashJoinSinkOperatorX
122122
bool require_shuffled_data_distribution() const override {
123123
return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN;
124124
}
125-
bool is_shuffled_hash_join() const override {
125+
bool is_shuffled_operator() const override {
126126
return _join_distribution == TJoinDistributionType::PARTITIONED;
127127
}
128128

be/src/pipeline/exec/schema_scan_operator.cpp

+3
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,9 @@ Status SchemaScanOperatorX::get_block(RuntimeState* state, vectorized::Block* bl
285285
} while (block->rows() == 0 && !*eos);
286286

287287
local_state.reached_limit(block, eos);
288+
if (*eos) {
289+
local_state._finish_dependency->set_always_ready();
290+
}
288291
return Status::OK();
289292
}
290293

be/src/pipeline/exec/sort_sink_operator.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ class SortSinkOperatorX final : public DataSinkOperatorX<SortSinkLocalState> {
8787
Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override;
8888
DataDistribution required_data_distribution() const override {
8989
if (_is_analytic_sort) {
90-
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_join
90+
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator
9191
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
9292
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
9393
} else if (_merge_by_exchange) {

be/src/pipeline/exec/union_sink_operator.h

+6
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,12 @@ class UnionSinkOperatorX final : public DataSinkOperatorX<UnionSinkLocalState> {
125125
}
126126
}
127127

128+
bool require_shuffled_data_distribution() const override {
129+
return _followed_by_shuffled_operator;
130+
}
131+
132+
bool is_shuffled_operator() const override { return _followed_by_shuffled_operator; }
133+
128134
private:
129135
int _get_first_materialized_child_idx() const { return _first_materialized_child_idx; }
130136

be/src/pipeline/exec/union_source_operator.h

+5
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,11 @@ class UnionSourceOperatorX final : public OperatorX<UnionSourceLocalState> {
135135
return Status::OK();
136136
}
137137
[[nodiscard]] int get_child_count() const { return _child_size; }
138+
bool require_shuffled_data_distribution() const override {
139+
return _followed_by_shuffled_operator;
140+
}
141+
142+
bool is_shuffled_operator() const override { return _followed_by_shuffled_operator; }
138143

139144
private:
140145
bool _has_data(RuntimeState* state) const {

be/src/pipeline/pipeline_x/operator.h

-4
Original file line numberDiff line numberDiff line change
@@ -231,8 +231,6 @@ class OperatorXBase : public OperatorBase {
231231

232232
[[nodiscard]] virtual bool can_terminate_early(RuntimeState* state) { return false; }
233233

234-
[[nodiscard]] virtual bool is_shuffled_hash_join() const { return false; }
235-
236234
bool can_read() override {
237235
LOG(FATAL) << "should not reach here!";
238236
return false;
@@ -627,8 +625,6 @@ class DataSinkOperatorXBase : public OperatorBase {
627625
: DataDistribution(ExchangeType::NOOP);
628626
}
629627

630-
[[nodiscard]] virtual bool is_shuffled_hash_join() const { return false; }
631-
632628
Status close(RuntimeState* state) override {
633629
return Status::InternalError("Should not reach here!");
634630
}

be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp

+23-20
Original file line numberDiff line numberDiff line change
@@ -771,7 +771,7 @@ Status PipelineXFragmentContext::_create_tree_helper(
771771
ObjectPool* pool, const std::vector<TPlanNode>& tnodes,
772772
const doris::TPipelineFragmentParams& request, const DescriptorTbl& descs,
773773
OperatorXPtr parent, int* node_idx, OperatorXPtr* root, PipelinePtr& cur_pipe,
774-
int child_idx, const bool followed_by_shuffled_join) {
774+
int child_idx, const bool followed_by_shuffled_operator) {
775775
// propagate error case
776776
if (*node_idx >= tnodes.size()) {
777777
// TODO: print thrift msg
@@ -782,11 +782,11 @@ Status PipelineXFragmentContext::_create_tree_helper(
782782
const TPlanNode& tnode = tnodes[*node_idx];
783783

784784
int num_children = tnodes[*node_idx].num_children;
785-
bool current_followed_by_shuffled_join = followed_by_shuffled_join;
785+
bool current_followed_by_shuffled_operator = followed_by_shuffled_operator;
786786
OperatorXPtr op = nullptr;
787787
RETURN_IF_ERROR(_create_operator(pool, tnodes[*node_idx], request, descs, op, cur_pipe,
788788
parent == nullptr ? -1 : parent->node_id(), child_idx,
789-
followed_by_shuffled_join));
789+
followed_by_shuffled_operator));
790790

791791
// assert(parent != nullptr || (node_idx == 0 && root_expr != nullptr));
792792
if (parent != nullptr) {
@@ -797,7 +797,7 @@ Status PipelineXFragmentContext::_create_tree_helper(
797797
}
798798

799799
/**
800-
* `ExchangeType::HASH_SHUFFLE` should be used if an operator is followed by a shuffled hash join.
800+
* `ExchangeType::HASH_SHUFFLE` should be used if an operator is followed by a shuffled operator (shuffled hash join, union operator followed by co-located operators).
801801
*
802802
* For plan:
803803
* LocalExchange(id=0) -> Aggregation(id=1) -> ShuffledHashJoin(id=2)
@@ -811,8 +811,8 @@ Status PipelineXFragmentContext::_create_tree_helper(
811811
cur_pipe->operator_xs().empty()
812812
? cur_pipe->sink_x()->require_shuffled_data_distribution()
813813
: op->require_shuffled_data_distribution();
814-
current_followed_by_shuffled_join =
815-
(followed_by_shuffled_join || op->is_shuffled_hash_join()) &&
814+
current_followed_by_shuffled_operator =
815+
(followed_by_shuffled_operator || op->is_shuffled_operator()) &&
816816
require_shuffled_data_distribution;
817817

818818
cur_pipe->_name.push_back('-');
@@ -823,7 +823,7 @@ Status PipelineXFragmentContext::_create_tree_helper(
823823
for (int i = 0; i < num_children; i++) {
824824
++*node_idx;
825825
RETURN_IF_ERROR(_create_tree_helper(pool, tnodes, request, descs, op, node_idx, nullptr,
826-
cur_pipe, i, current_followed_by_shuffled_join));
826+
cur_pipe, i, current_followed_by_shuffled_operator));
827827

828828
// we are expecting a child, but have used all nodes
829829
// this means we have been given a bad tree and must fail
@@ -865,13 +865,13 @@ Status PipelineXFragmentContext::_add_local_exchange_impl(
865865
* `bucket_seq_to_instance_idx` is empty if no scan operator is contained in this fragment.
866866
* So co-located operators(e.g. Agg, Analytic) should use `HASH_SHUFFLE` instead of `BUCKET_HASH_SHUFFLE`.
867867
*/
868-
const bool followed_by_shuffled_join =
869-
operator_xs.size() > idx ? operator_xs[idx]->followed_by_shuffled_join()
870-
: cur_pipe->sink_x()->followed_by_shuffled_join();
868+
const bool followed_by_shuffled_operator =
869+
operator_xs.size() > idx ? operator_xs[idx]->followed_by_shuffled_operator()
870+
: cur_pipe->sink_x()->followed_by_shuffled_operator();
871871
const bool should_disable_bucket_shuffle =
872872
bucket_seq_to_instance_idx.empty() &&
873873
shuffle_idx_to_instance_idx.find(-1) == shuffle_idx_to_instance_idx.end() &&
874-
followed_by_shuffled_join;
874+
followed_by_shuffled_operator;
875875
sink.reset(new LocalExchangeSinkOperatorX(
876876
sink_id, local_exchange_id,
877877
should_disable_bucket_shuffle ? _total_instances : _num_instances,
@@ -1047,7 +1047,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
10471047
const DescriptorTbl& descs, OperatorXPtr& op,
10481048
PipelinePtr& cur_pipe, int parent_idx,
10491049
int child_idx,
1050-
const bool followed_by_shuffled_join) {
1050+
const bool followed_by_shuffled_operator) {
10511051
// We directly construct the operator from Thrift because the given array is in the order of preorder traversal.
10521052
// Therefore, here we need to use a stack-like structure.
10531053
_pipeline_parent_map.pop(cur_pipe, parent_idx, child_idx);
@@ -1121,7 +1121,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
11211121
op.reset(new DistinctStreamingAggOperatorX(pool, next_operator_id(), tnode, descs,
11221122
_require_bucket_distribution));
11231123
RETURN_IF_ERROR(cur_pipe->add_operator(op));
1124-
op->set_followed_by_shuffled_join(followed_by_shuffled_join);
1124+
op->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
11251125
_require_bucket_distribution =
11261126
_require_bucket_distribution || op->require_data_distribution();
11271127
} else if (tnode.agg_node.__isset.use_streaming_preaggregation &&
@@ -1152,7 +1152,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
11521152
sink.reset(new AggSinkOperatorX(pool, next_sink_operator_id(), tnode, descs,
11531153
_require_bucket_distribution));
11541154
}
1155-
sink->set_followed_by_shuffled_join(followed_by_shuffled_join);
1155+
sink->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
11561156
_require_bucket_distribution =
11571157
_require_bucket_distribution || sink->require_data_distribution();
11581158
sink->set_dests_id({op->operator_id()});
@@ -1203,8 +1203,8 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
12031203

12041204
_pipeline_parent_map.push(op->node_id(), cur_pipe);
12051205
_pipeline_parent_map.push(op->node_id(), build_side_pipe);
1206-
sink->set_followed_by_shuffled_join(sink->is_shuffled_hash_join());
1207-
op->set_followed_by_shuffled_join(op->is_shuffled_hash_join());
1206+
sink->set_followed_by_shuffled_operator(sink->is_shuffled_operator());
1207+
op->set_followed_by_shuffled_operator(op->is_shuffled_operator());
12081208
} else {
12091209
op.reset(new HashJoinProbeOperatorX(pool, tnode, next_operator_id(), descs));
12101210
RETURN_IF_ERROR(cur_pipe->add_operator(op));
@@ -1225,8 +1225,8 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
12251225

12261226
_pipeline_parent_map.push(op->node_id(), cur_pipe);
12271227
_pipeline_parent_map.push(op->node_id(), build_side_pipe);
1228-
sink->set_followed_by_shuffled_join(sink->is_shuffled_hash_join());
1229-
op->set_followed_by_shuffled_join(op->is_shuffled_hash_join());
1228+
sink->set_followed_by_shuffled_operator(sink->is_shuffled_operator());
1229+
op->set_followed_by_shuffled_operator(op->is_shuffled_operator());
12301230
}
12311231
_require_bucket_distribution =
12321232
_require_bucket_distribution || op->require_data_distribution();
@@ -1256,6 +1256,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
12561256
case TPlanNodeType::UNION_NODE: {
12571257
int child_count = tnode.num_children;
12581258
op.reset(new UnionSourceOperatorX(pool, tnode, next_operator_id(), descs));
1259+
op->set_followed_by_shuffled_operator(_require_bucket_distribution);
12591260
RETURN_IF_ERROR(cur_pipe->add_operator(op));
12601261

12611262
const auto downstream_pipeline_id = cur_pipe->id();
@@ -1298,7 +1299,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
12981299
sink.reset(new SortSinkOperatorX(pool, next_sink_operator_id(), tnode, descs,
12991300
_require_bucket_distribution));
13001301
}
1301-
sink->set_followed_by_shuffled_join(followed_by_shuffled_join);
1302+
sink->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
13021303
_require_bucket_distribution =
13031304
_require_bucket_distribution || sink->require_data_distribution();
13041305
sink->set_dests_id({op->operator_id()});
@@ -1338,7 +1339,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
13381339
DataSinkOperatorXPtr sink;
13391340
sink.reset(new AnalyticSinkOperatorX(pool, next_sink_operator_id(), tnode, descs,
13401341
_require_bucket_distribution));
1341-
sink->set_followed_by_shuffled_join(followed_by_shuffled_join);
1342+
sink->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
13421343
_require_bucket_distribution =
13431344
_require_bucket_distribution || sink->require_data_distribution();
13441345
sink->set_dests_id({op->operator_id()});
@@ -1349,11 +1350,13 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
13491350
case TPlanNodeType::INTERSECT_NODE: {
13501351
RETURN_IF_ERROR(_build_operators_for_set_operation_node<true>(
13511352
pool, tnode, descs, op, cur_pipe, parent_idx, child_idx));
1353+
op->set_followed_by_shuffled_operator(_require_bucket_distribution);
13521354
break;
13531355
}
13541356
case TPlanNodeType::EXCEPT_NODE: {
13551357
RETURN_IF_ERROR(_build_operators_for_set_operation_node<false>(
13561358
pool, tnode, descs, op, cur_pipe, parent_idx, child_idx));
1359+
op->set_followed_by_shuffled_operator(_require_bucket_distribution);
13571360
break;
13581361
}
13591362
case TPlanNodeType::REPEAT_NODE: {

be/src/vec/functions/function_string.cpp

+26-2
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ struct NameQuoteImpl {
8282
}
8383
};
8484

85-
struct NameStringLenght {
85+
struct NameStringLength {
8686
static constexpr auto name = "length";
8787
};
8888

@@ -104,6 +104,28 @@ struct StringLengthImpl {
104104
}
105105
};
106106

107+
struct NameCrc32 {
108+
static constexpr auto name = "crc32";
109+
};
110+
111+
struct Crc32Impl {
112+
using ReturnType = DataTypeInt64;
113+
static constexpr auto TYPE_INDEX = TypeIndex::String;
114+
using Type = String;
115+
using ReturnColumnType = ColumnVector<Int64>;
116+
117+
static Status vector(const ColumnString::Chars& data, const ColumnString::Offsets& offsets,
118+
PaddedPODArray<Int64>& res) {
119+
auto size = offsets.size();
120+
res.resize(size);
121+
for (int i = 0; i < size; ++i) {
122+
res[i] = crc32_z(0L, (const unsigned char*)data.data() + offsets[i - 1],
123+
offsets[i] - offsets[i - 1]);
124+
}
125+
return Status::OK();
126+
}
127+
};
128+
107129
struct NameStringUtf8Length {
108130
static constexpr auto name = "char_length";
109131
};
@@ -1073,7 +1095,8 @@ using StringFindInSetImpl = StringFunctionImpl<LeftDataType, RightDataType, Find
10731095

10741096
// ready for regist function
10751097
using FunctionStringASCII = FunctionUnaryToType<StringASCII, NameStringASCII>;
1076-
using FunctionStringLength = FunctionUnaryToType<StringLengthImpl, NameStringLenght>;
1098+
using FunctionStringLength = FunctionUnaryToType<StringLengthImpl, NameStringLength>;
1099+
using FunctionCrc32 = FunctionUnaryToType<Crc32Impl, NameCrc32>;
10771100
using FunctionStringUTF8Length = FunctionUnaryToType<StringUtf8LengthImpl, NameStringUtf8Length>;
10781101
using FunctionStringSpace = FunctionUnaryToType<StringSpace, NameStringSpace>;
10791102
using FunctionStringStartsWith =
@@ -1111,6 +1134,7 @@ using FunctionStringRPad = FunctionStringPad<StringRPad>;
11111134
void register_function_string(SimpleFunctionFactory& factory) {
11121135
factory.register_function<FunctionStringASCII>();
11131136
factory.register_function<FunctionStringLength>();
1137+
factory.register_function<FunctionCrc32>();
11141138
factory.register_function<FunctionStringUTF8Length>();
11151139
factory.register_function<FunctionStringSpace>();
11161140
factory.register_function<FunctionStringStartsWith>();

0 commit comments

Comments
 (0)