@@ -61,9 +61,9 @@ Status MultiTablePipe::append_json(const char* data, size_t size) {
61
61
}
62
62
63
63
// Returns the Kafka consumer pipe that belongs to an already planned table.
//
// @param table  name of the table to look up in _planned_tables.
// @return the table context's pipe, cast to io::KafkaConsumerPipe, or nullptr when the
//         table has not been planned.
//
// The table is expected to be planned already; debug builds still trip the DCHECK on a
// miss. Release builds compile the DCHECK out, so the miss is handled explicitly instead
// of dereferencing the end() iterator.
KafkaConsumerPipePtr MultiTablePipe::get_pipe_by_table(const std::string& table) {
    auto planned_it = _planned_tables.find(table);
    DCHECK(planned_it != _planned_tables.end());
    if (planned_it == _planned_tables.end()) {
        return nullptr;
    }
    return std::static_pointer_cast<io::KafkaConsumerPipe>(planned_it->second->pipe);
}
68
68
69
69
static std::string_view get_first_part (const char * dat, char delimiter) {
@@ -78,15 +78,15 @@ static std::string_view get_first_part(const char* dat, char delimiter) {
78
78
}
79
79
80
80
// Calls finish() on the pipe of every planned table.
//
// @return Status::OK() when every pipe finished; otherwise RETURN_IF_ERROR propagates the
//         first non-OK status and the remaining pipes are not visited.
Status MultiTablePipe::finish() {
    for (auto& entry : _planned_tables) {
        auto& table_ctx = entry.second;
        RETURN_IF_ERROR(table_ctx->pipe->finish());
    }
    return Status::OK();
}
86
86
87
87
void MultiTablePipe::cancel (const std::string& reason) {
88
- for (auto & pair : _planned_pipes ) {
89
- pair.second ->cancel (reason);
88
+ for (auto & pair : _planned_tables ) {
89
+ pair.second ->pipe -> cancel (reason);
90
90
}
91
91
}
92
92
@@ -101,19 +101,29 @@ Status MultiTablePipe::dispatch(const std::string& table, const char* data, size
101
101
return Status::InternalError (" empty data" );
102
102
}
103
103
KafkaConsumerPipePtr pipe = nullptr ;
104
- auto iter = _planned_pipes .find (table);
105
- if (iter != _planned_pipes .end ()) {
106
- pipe = iter->second ;
104
+ auto iter = _planned_tables .find (table);
105
+ if (iter != _planned_tables .end ()) {
106
+ pipe = std::static_pointer_cast<io::KafkaConsumerPipe>( iter->second -> pipe ) ;
107
107
RETURN_NOT_OK_STATUS_WITH_WARN ((pipe .get ()->*cb)(data, size),
108
108
" append failed in planned kafka pipe" );
109
109
} else {
110
- iter = _unplanned_pipes.find (table);
111
- if (iter == _unplanned_pipes.end ()) {
110
+ iter = _unplanned_tables.find (table);
111
+ if (iter == _unplanned_tables.end ()) {
112
+ std::shared_ptr<StreamLoadContext> ctx =
113
+ std::make_shared<StreamLoadContext>(doris::ExecEnv::GetInstance ());
114
+ ctx->id = UniqueId::gen_uid ();
112
115
pipe = std::make_shared<io::KafkaConsumerPipe>();
113
- LOG (INFO) << " create new unplanned pipe: " << pipe .get () << " , ctx: " << _ctx->brief ();
114
- _unplanned_pipes.emplace (table, pipe );
116
+ ctx->pipe = pipe ;
117
+ #ifndef BE_TEST
118
+ RETURN_NOT_OK_STATUS_WITH_WARN (
119
+ doris::ExecEnv::GetInstance ()->new_load_stream_mgr ()->put (ctx->id , ctx),
120
+ " put stream load ctx error" );
121
+ #endif
122
+ _unplanned_tables.emplace (table, ctx);
123
+ LOG (INFO) << " create new unplanned table ctx, table: " << table
124
+ << " load id: " << ctx->id << " , txn id: " << _ctx->txn_id ;
115
125
} else {
116
- pipe = iter->second ;
126
+ pipe = std::static_pointer_cast<io::KafkaConsumerPipe>( iter->second -> pipe ) ;
117
127
}
118
128
119
129
// It is necessary to determine whether the sum of pipe_current_capacity and size is greater than pipe_max_capacity,
@@ -124,7 +134,7 @@ Status MultiTablePipe::dispatch(const std::string& table, const char* data, size
124
134
auto pipe_current_capacity = pipe ->current_capacity ();
125
135
auto pipe_max_capacity = pipe ->max_capacity ();
126
136
if (_unplanned_row_cnt >= _row_threshold ||
127
- _unplanned_pipes .size () >= _wait_tables_threshold ||
137
+ _unplanned_tables .size () >= _wait_tables_threshold ||
128
138
pipe_current_capacity + size > pipe_max_capacity) {
129
139
LOG (INFO) << fmt::format (
130
140
" unplanned row cnt={} reach row_threshold={} or "
@@ -151,112 +161,106 @@ Status MultiTablePipe::dispatch(const std::string& table, const char* data, size
151
161
152
162
#ifndef BE_TEST
153
163
// Requests a load plan from the FE master for every table that is still unplanned and
// passes each returned plan set to exec_plans().
//
// One streamLoadMultiTablePut RPC is issued per table; each request carries that table's
// own load id (the id of its per-table StreamLoadContext) together with the txn id and
// load settings of the routine load task context (_ctx).
//
// @return Status::OK() when there is nothing to plan or every table was planned and
//         submitted; otherwise the first RPC / planning / exec_plans() error. On an error
//         return _unplanned_tables is NOT cleared by this function.
Status MultiTablePipe::request_and_exec_plans() {
    if (_unplanned_tables.empty()) {
        return Status::OK();
    }

    // Log the names of all tables that are about to be planned.
    fmt::memory_buffer log_buffer;
    fmt::format_to(log_buffer, "request plans for {} tables: [ ", _unplanned_tables.size());
    for (const auto& pair : _unplanned_tables) {
        fmt::format_to(log_buffer, "{} ", pair.first);
    }
    fmt::format_to(log_buffer, "]");
    LOG(INFO) << fmt::to_string(log_buffer);

    ExecEnv* exec_env = doris::ExecEnv::GetInstance();

    // Planning time is summed over all per-table RPCs of this call, so the value stored
    // in _ctx reflects the whole batch rather than only the last table.
    int64_t total_put_cost_nanos = 0;

    Status st;
    for (auto& pair : _unplanned_tables) {
        TStreamLoadPutRequest request;
        set_request_auth(&request, _ctx->auth);
        request.db = _ctx->db;
        request.table_names = {pair.first};
        request.__isset.table_names = true;
        request.txnId = _ctx->txn_id;
        request.formatType = _ctx->format;
        request.__set_compress_type(_ctx->compress_type);
        request.__set_header_type(_ctx->header_type);
        // Each table is planned under the load id of its own StreamLoadContext.
        request.__set_loadId((pair.second->id).to_thrift());
        request.fileType = TFileType::FILE_STREAM;
        request.__set_thrift_rpc_timeout_ms(config::thrift_rpc_timeout_ms);
        request.__set_memtable_on_sink_node(_ctx->memtable_on_sink_node);
        request.__set_user(_ctx->qualified_user);
        request.__set_cloud_cluster(_ctx->cloud_cluster);
        // The per-table StreamLoadContext was already put into new_load_stream_mgr when it
        // was created in dispatch(), so nothing needs to be registered here.

        // plan this load
        // The master address is re-read for every table, as before.
        TNetworkAddress master_addr = exec_env->master_info()->network_address;
        int64_t stream_load_put_start_time = MonotonicNanos();
        RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
                master_addr.hostname, master_addr.port,
                [&request, this](FrontendServiceConnection& client) {
                    client->streamLoadMultiTablePut(_ctx->multi_table_put_result, request);
                }));
        total_put_cost_nanos += MonotonicNanos() - stream_load_put_start_time;
        _ctx->stream_load_put_cost_nanos = total_put_cost_nanos;

        Status plan_status(Status::create(_ctx->multi_table_put_result.status));
        if (!plan_status.ok()) {
            LOG(WARNING) << "plan streaming load failed. errmsg=" << plan_status << _ctx->brief();
            return plan_status;
        }

        // Exactly one of `params` / `pipeline_params` must be set in the FE reply.
        // NOTE(review): exec_plans() walks all of _unplanned_tables on every call (adding
        // each name to _ctx->table_list), so with several unplanned tables each name looks
        // to be appended once per table planned here -- confirm this is intended.
        if (_ctx->multi_table_put_result.__isset.params &&
            !_ctx->multi_table_put_result.__isset.pipeline_params) {
            st = exec_plans(exec_env, _ctx->multi_table_put_result.params);
        } else if (!_ctx->multi_table_put_result.__isset.params &&
                   _ctx->multi_table_put_result.__isset.pipeline_params) {
            st = exec_plans(exec_env, _ctx->multi_table_put_result.pipeline_params);
        } else {
            return Status::Aborted("too many or too few params are set in multi_table_put_result.");
        }
        if (!st.ok()) {
            return st;
        }
    }
    _unplanned_tables.clear();
    return st;
}
217
231
218
232
template <typename ExecParam>
219
233
Status MultiTablePipe::exec_plans (ExecEnv* exec_env, std::vector<ExecParam> params) {
220
234
// put unplanned pipes into planned pipes and clear unplanned pipes
221
- for (auto & pipe : _unplanned_pipes ) {
222
- _ctx->table_list .push_back (pipe .first );
223
- _planned_pipes .emplace (pipe .first , pipe .second );
235
+ for (auto & pair : _unplanned_tables ) {
236
+ _ctx->table_list .push_back (pair .first );
237
+ _planned_tables .emplace (pair .first , pair .second );
224
238
}
225
239
LOG (INFO) << fmt::format (" {} tables plan complete, planned table cnt={}, returned plan cnt={}" ,
226
- _unplanned_pipes .size (), _planned_pipes .size (), params.size ())
240
+ _unplanned_tables .size (), _planned_tables .size (), params.size ())
227
241
<< " , ctx: " << _ctx->brief ();
228
- _unplanned_pipes.clear ();
229
242
230
243
for (auto & plan : params) {
231
244
DBUG_EXECUTE_IF (" MultiTablePipe.exec_plans.failed" ,
232
245
{ return Status::Aborted (" MultiTablePipe.exec_plans.failed" ); });
233
246
if (!plan.__isset .table_name ||
234
- _planned_pipes .find (plan.table_name ) == _planned_pipes .end ()) {
247
+ _unplanned_tables .find (plan.table_name ) == _unplanned_tables .end ()) {
235
248
return Status::Aborted (" Missing vital param: table_name" );
236
249
}
237
250
238
- if constexpr (std::is_same_v<ExecParam, TExecPlanFragmentParams>) {
239
- RETURN_IF_ERROR (
240
- put_pipe (plan.params .fragment_instance_id , _planned_pipes[plan.table_name ]));
241
- LOG (INFO) << " fragment_instance_id=" << print_id (plan.params .fragment_instance_id )
242
- << " table=" << plan.table_name << " , ctx: " << _ctx->brief ();
243
- } else if constexpr (std::is_same_v<ExecParam, TPipelineFragmentParams>) {
244
- auto pipe_id = calculate_pipe_id (plan.query_id , plan.fragment_id );
245
- RETURN_IF_ERROR (put_pipe (pipe_id, _planned_pipes[plan.table_name ]));
246
- LOG (INFO) << " pipe_id=" << pipe_id << " , table=" << plan.table_name
247
- << " , ctx: " << _ctx->brief ();
248
- } else {
249
- LOG (WARNING) << " illegal exec param type, need `TExecPlanFragmentParams` or "
250
- " `TPipelineFragmentParams`, will crash"
251
- << " , ctx: " << _ctx->brief ();
252
- CHECK (false );
253
- }
254
-
255
251
_inflight_cnt++;
256
252
257
253
RETURN_IF_ERROR (exec_env->fragment_mgr ()->exec_plan_fragment (
258
- plan, [this ](RuntimeState* state, Status* status) {
254
+ plan, [this , plan ](RuntimeState* state, Status* status) {
259
255
DCHECK (state);
256
+ auto pair = _planned_tables.find (plan.table_name );
257
+ if (pair == _planned_tables.end ()) {
258
+ LOG (WARNING) << " failed to get ctx, table: " << plan.table_name ;
259
+ } else {
260
+ doris::ExecEnv::GetInstance ()->new_load_stream_mgr ()->remove (
261
+ pair->second ->id );
262
+ }
263
+
260
264
{
261
265
std::lock_guard<std::mutex> l (_tablet_commit_infos_lock);
262
266
_tablet_commit_infos.insert (_tablet_commit_infos.end (),
@@ -300,12 +304,12 @@ Status MultiTablePipe::exec_plans(ExecEnv* exec_env, std::vector<ExecParam> para
300
304
#else
301
305
// BE_TEST variant of request_and_exec_plans(): no plan is requested. Every unplanned
// table context is added to _planned_tables, the counts are logged, and _unplanned_tables
// is emptied.
//
// @return always Status::OK().
Status MultiTablePipe::request_and_exec_plans() {
    for (auto it = _unplanned_tables.begin(); it != _unplanned_tables.end(); ++it) {
        _planned_tables.emplace(it->first, it->second);
    }

    // Sizes are read before the clear() below so the log reports how many tables moved.
    const auto moved_cnt = _unplanned_tables.size();
    const auto planned_cnt = _planned_tables.size();
    LOG(INFO) << fmt::format("{} tables plan complete, planned table cnt={}", moved_cnt,
                             planned_cnt);

    _unplanned_tables.clear();
    return Status::OK();
}
311
315
@@ -330,35 +334,6 @@ void MultiTablePipe::_handle_consumer_finished() {
330
334
_ctx->promise .set_value (_status); // when all done, finish the routine load task
331
335
}
332
336
333
- Status MultiTablePipe::put_pipe (const TUniqueId& pipe_id,
334
- std::shared_ptr<io::StreamLoadPipe> pipe) {
335
- std::lock_guard<std::mutex> l (_pipe_map_lock);
336
- auto it = _pipe_map.find (pipe_id);
337
- if (it != std::end (_pipe_map)) {
338
- return Status::InternalError (" id already exist" );
339
- }
340
- _pipe_map.emplace (pipe_id, pipe );
341
- return Status::OK ();
342
- }
343
-
344
- std::shared_ptr<io::StreamLoadPipe> MultiTablePipe::get_pipe (const TUniqueId& pipe_id) {
345
- std::lock_guard<std::mutex> l (_pipe_map_lock);
346
- auto it = _pipe_map.find (pipe_id);
347
- if (it == std::end (_pipe_map)) {
348
- return {};
349
- }
350
- return it->second ;
351
- }
352
-
353
- void MultiTablePipe::remove_pipe (const TUniqueId& pipe_id) {
354
- std::lock_guard<std::mutex> l (_pipe_map_lock);
355
- auto it = _pipe_map.find (pipe_id);
356
- if (it != std::end (_pipe_map)) {
357
- _pipe_map.erase (it);
358
- VLOG_NOTICE << " remove stream load pipe: " << pipe_id;
359
- }
360
- }
361
-
362
337
template Status MultiTablePipe::exec_plans (ExecEnv* exec_env,
363
338
std::vector<TExecPlanFragmentParams> params);
364
339
template Status MultiTablePipe::exec_plans (ExecEnv* exec_env,
0 commit comments