Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[pull] master from apache:master #2

Merged
merged 17 commits into from
Jul 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
ec5471f
[feature-wip](unique-key-merge-on-write) Implement tablet lookup inte…
zhannngchen Jul 20, 2022
a1c1cfc
Add some comments for the feature mow (#11028)
zhannngchen Jul 20, 2022
6233b52
[refactor] (Nereids) rename GroupExpression.getParent() to getOwnerGr…
englefly Jul 20, 2022
658a9f7
[fix](planner)unnecessary cast will be added on children in InPredica…
morrySnow Jul 20, 2022
7bdce8f
[refactor](policy) refactor some policy create and check logic (#11007)
morningman Jul 20, 2022
a71822a
[refactor]remove col_unique_id (#11025)
Lchangliang Jul 20, 2022
e5663f9
[Bug](array-type) Fix the core dump caused by unaligned __int128 (#11…
adonis0147 Jul 20, 2022
1ca00e0
[tools] add clickbench tools (#11009)
hello-stephen Jul 20, 2022
0a8ae6a
Refractor COLLECT_LIST and COLLECT_SET register logic (#10956)
cambyzju Jul 20, 2022
b62e3e7
[regression test]Add ssb sf1 test under unique table with zstd (#11004)
smallhibiscus Jul 20, 2022
a607c30
[docs] Fe build idea doc (#10996)
ByteYue Jul 20, 2022
6aadee9
[data lake]Support hdfs ha for Iceberg table. (#11002)
Jibing-Li Jul 20, 2022
2df1822
[bugfix]fix DCHECK failure in remove_all_remote_rowsets (#10994)
platoneko Jul 20, 2022
c037066
[fix](cache) fix that ShardedLRUCache may coredump when destructor wa…
liaoxin01 Jul 20, 2022
d9b6e07
[Vectorized] Support ODBC sink for vec exec engine (#11045)
HappenLee Jul 20, 2022
b95dedd
[doc]Gis function style (#11015)
hf200012 Jul 20, 2022
b115b36
[Bug] fix bug for function `unix_timestamp` (#11041)
Gabriel39 Jul 20, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 10 additions & 9 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -437,15 +437,16 @@ void TaskWorkerPool::_drop_tablet_worker_thread_callback() {
LOG(WARNING) << "drop table failed! signature: " << agent_task_req.signature;
error_msgs.push_back("drop table failed!");
status_code = TStatusCode::RUNTIME_ERROR;
}
// if tablet is dropped by fe, then the related txn should also be removed
StorageEngine::instance()->txn_manager()->force_rollback_tablet_related_txns(
dropped_tablet->data_dir()->get_meta(), drop_tablet_req.tablet_id,
drop_tablet_req.schema_hash, dropped_tablet->tablet_uid());
// We remove remote rowset directly.
// TODO(cyx): do remove in background
if (drop_tablet_req.is_drop_table_or_partition) {
dropped_tablet->remove_all_remote_rowsets();
} else {
// if tablet is dropped by fe, then the related txn should also be removed
StorageEngine::instance()->txn_manager()->force_rollback_tablet_related_txns(
dropped_tablet->data_dir()->get_meta(), drop_tablet_req.tablet_id,
drop_tablet_req.schema_hash, dropped_tablet->tablet_uid());
// We remove remote rowset directly.
// TODO(cyx): do remove in background
if (drop_tablet_req.is_drop_table_or_partition) {
dropped_tablet->remove_all_remote_rowsets();
}
}
} else {
status_code = TStatusCode::NOT_FOUND;
Expand Down
8 changes: 6 additions & 2 deletions be/src/exec/data_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "runtime/runtime_state.h"
#include "vec/sink/vdata_stream_sender.h"
#include "vec/sink/vmysql_table_sink.h"
#include "vec/sink/vodbc_table_sink.h"
#include "vec/sink/vresult_file_sink.h"
#include "vec/sink/vresult_sink.h"
#include "vec/sink/vtablet_sink.h"
Expand Down Expand Up @@ -156,8 +157,11 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
if (!thrift_sink.__isset.odbc_table_sink) {
return Status::InternalError("Missing data odbc sink.");
}
OdbcTableSink* odbc_tbl_sink = new OdbcTableSink(pool, row_desc, output_exprs);
sink->reset(odbc_tbl_sink);
if (is_vec) {
sink->reset(new vectorized::VOdbcTableSink(pool, row_desc, output_exprs));
} else {
sink->reset(new OdbcTableSink(pool, row_desc, output_exprs));
}
break;
}

Expand Down
112 changes: 112 additions & 0 deletions be/src/exec/odbc_connector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
#include "runtime/primitive_type.h"
#include "util/mysql_global.h"
#include "util/types.h"
#include "vec/core/block.h"
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vexpr_context.h"

#define ODBC_DISPOSE(h, ht, x, op) \
{ \
Expand Down Expand Up @@ -395,4 +398,113 @@ std::string ODBCConnector::handle_diagnostic_record(SQLHANDLE hHandle, SQLSMALLI
return diagnostic_msg;
}

Status ODBCConnector::append(const std::string& table_name, vectorized::Block* block,
const std::vector<vectorized::VExprContext*>& _output_vexpr_ctxs,
uint32_t start_send_row, uint32_t* num_rows_sent) {
_insert_stmt_buffer.clear();
std::u16string insert_stmt;
{
SCOPED_TIMER(_convert_tuple_timer);
fmt::format_to(_insert_stmt_buffer, "INSERT INTO {} VALUES (", table_name);

int num_rows = block->rows();
int num_columns = block->columns();
for (int i = start_send_row; i < num_rows; ++i) {
(*num_rows_sent)++;

// Construct insert statement of odbc table
for (int j = 0; j < num_columns; ++j) {
if (j != 0) {
fmt::format_to(_insert_stmt_buffer, "{}", ", ");
}
auto [item, size] = block->get_by_position(j).column->get_data_at(i);
if (item == nullptr) {
fmt::format_to(_insert_stmt_buffer, "{}", "NULL");
continue;
}
switch (_output_vexpr_ctxs[j]->root()->type().type) {
case TYPE_BOOLEAN:
case TYPE_TINYINT:
fmt::format_to(_insert_stmt_buffer, "{}",
*reinterpret_cast<const int8_t*>(item));
break;
case TYPE_SMALLINT:
fmt::format_to(_insert_stmt_buffer, "{}",
*reinterpret_cast<const int16_t*>(item));
break;
case TYPE_INT:
fmt::format_to(_insert_stmt_buffer, "{}",
*reinterpret_cast<const int32_t*>(item));
break;
case TYPE_BIGINT:
fmt::format_to(_insert_stmt_buffer, "{}",
*reinterpret_cast<const int64_t*>(item));
break;
case TYPE_FLOAT:
fmt::format_to(_insert_stmt_buffer, "{}",
*reinterpret_cast<const float*>(item));
break;
case TYPE_DOUBLE:
fmt::format_to(_insert_stmt_buffer, "{}",
*reinterpret_cast<const double*>(item));
break;
case TYPE_DATE:
case TYPE_DATETIME: {
vectorized::VecDateTimeValue value =
binary_cast<int64_t, doris::vectorized::VecDateTimeValue>(
*(int64_t*)item);

char buf[64];
char* pos = value.to_string(buf);
std::string_view str(buf, pos - buf - 1);
fmt::format_to(_insert_stmt_buffer, "'{}'", str);
break;
}
case TYPE_VARCHAR:
case TYPE_CHAR:
case TYPE_STRING: {
fmt::format_to(_insert_stmt_buffer, "'{}'", fmt::basic_string_view(item, size));
break;
}
case TYPE_DECIMALV2: {
DecimalV2Value value = *(DecimalV2Value*)(item);
fmt::format_to(_insert_stmt_buffer, "{}", value.to_string());
break;
}
case TYPE_LARGEINT: {
fmt::format_to(_insert_stmt_buffer, "{}",
*reinterpret_cast<const __int128*>(item));
break;
}
default: {
return Status::InternalError("can't convert this type to mysql type. type = {}",
_output_expr_ctxs[j]->root()->type().type);
}
}
}

if (i < num_rows - 1 && _insert_stmt_buffer.size() < INSERT_BUFFER_SIZE) {
fmt::format_to(_insert_stmt_buffer, "{}", "),(");
} else {
// batch exhausted or _insert_stmt_buffer is full, need to do real insert stmt
fmt::format_to(_insert_stmt_buffer, "{}", ")");
break;
}
}
// Translate utf8 string to utf16 to use unicode encodeing
insert_stmt = utf8_to_wstring(
std::string(_insert_stmt_buffer.data(),
_insert_stmt_buffer.data() + _insert_stmt_buffer.size()));
}

{
SCOPED_TIMER(_result_send_timer);
ODBC_DISPOSE(_stmt, SQL_HANDLE_STMT,
SQLExecDirectW(_stmt, (SQLWCHAR*)(insert_stmt.c_str()), SQL_NTS),
_insert_stmt_buffer.data());
}
COUNTER_UPDATE(_sent_rows_counter, *num_rows_sent);
return Status::OK();
}

} // namespace doris
10 changes: 9 additions & 1 deletion be/src/exec/odbc_connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@

namespace doris {

namespace vectorized {
class VExprContext;
}

struct ODBCConnectorParam {
std::string connect_string;

Expand Down Expand Up @@ -72,7 +76,11 @@ class ODBCConnector {
// write for ODBC table
Status init_to_write(RuntimeProfile* profile);
Status append(const std::string& table_name, RowBatch* batch, uint32_t start_send_row,
uint32_t* num_row_sent);
uint32_t* num_rows_sent);

Status append(const std::string& table_name, vectorized::Block* block,
const std::vector<vectorized::VExprContext*>& _output_vexpr_ctxs,
uint32_t start_send_row, uint32_t* num_rows_sent);

// use in ODBC transaction
Status begin_trans(); // should be call after connect and before query or init_to_write
Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/lru_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -462,14 +462,14 @@ ShardedLRUCache::ShardedLRUCache(const std::string& name, size_t total_capacity,
}

ShardedLRUCache::~ShardedLRUCache() {
_entity->deregister_hook(_name);
DorisMetrics::instance()->metric_registry()->deregister_entity(_entity);
if (_shards) {
for (int s = 0; s < _num_shards; s++) {
delete _shards[s];
}
delete[] _shards;
}
_entity->deregister_hook(_name);
DorisMetrics::instance()->metric_registry()->deregister_entity(_entity);
}

Cache::Handle* ShardedLRUCache::insert(const CacheKey& key, void* value, size_t charge,
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/primary_key_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ namespace doris {
// The primary key index is designed in a similar way like RocksDB
// Partitioned Index, which is created in the segment file when MemTable flushes.
// Index is stored in multiple pages to leverage the IndexedColumnWriter.
//
// NOTE: for now, it's only used when unique key merge-on-write property enabled.
class PrimaryKeyIndexBuilder {
public:
PrimaryKeyIndexBuilder(io::FileWriter* file_writer)
Expand Down
76 changes: 73 additions & 3 deletions be/src/olap/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
#include "olap/storage_policy_mgr.h"
#include "olap/tablet_meta.h"
#include "olap/tablet_meta_manager.h"
#include "segment_loader.h"
#include "util/path_util.h"
#include "util/pretty_printer.h"
#include "util/scoped_cleanup.h"
Expand Down Expand Up @@ -111,6 +112,7 @@ Status Tablet::_init_once_action() {
_cumulative_compaction_type);
#endif

RowsetVector rowset_vec;
for (const auto& rs_meta : _tablet_meta->all_rs_metas()) {
Version version = rs_meta->version();
RowsetSharedPtr rowset;
Expand All @@ -121,6 +123,7 @@ Status Tablet::_init_once_action() {
<< ", res=" << res;
return res;
}
rowset_vec.push_back(rowset);
_rs_version_map[version] = std::move(rowset);
}

Expand All @@ -138,6 +141,10 @@ Status Tablet::_init_once_action() {
_stale_rs_version_map[version] = std::move(rowset);
}

if (_schema.keys_type() == UNIQUE_KEYS && enable_unique_key_merge_on_write()) {
_rowset_tree = std::make_unique<RowsetTree>();
res = _rowset_tree->Init(rowset_vec);
}
return res;
}

Expand Down Expand Up @@ -235,6 +242,13 @@ Status Tablet::add_rowset(RowsetSharedPtr rowset) {
_rs_version_map[rowset->version()] = rowset;
_timestamped_version_tracker.add_version(rowset->version());

// Update rowset tree
if (keys_type() == UNIQUE_KEYS && enable_unique_key_merge_on_write()) {
auto new_rowset_tree = std::make_unique<RowsetTree>();
ModifyRowSetTree(*_rowset_tree, {}, {rowset}, new_rowset_tree.get());
_rowset_tree = std::move(new_rowset_tree);
}

std::vector<RowsetSharedPtr> rowsets_to_delete;
// yiguolei: temp code, should remove the rowset contains by this rowset
// but it should be removed in multi path version
Expand Down Expand Up @@ -267,6 +281,10 @@ Status Tablet::modify_rowsets(std::vector<RowsetSharedPtr>& to_add,
// In this case, we no longer need to add the rowset in "to_delete" to
// _stale_rs_version_map, but can delete it directly.

if (to_add.empty() && to_delete.empty()) {
return Status::OK();
}

bool same_version = true;
std::sort(to_add.begin(), to_add.end(), Rowset::comparator);
std::sort(to_delete.begin(), to_delete.end(), Rowset::comparator);
Expand Down Expand Up @@ -323,6 +341,13 @@ Status Tablet::modify_rowsets(std::vector<RowsetSharedPtr>& to_add,

_tablet_meta->modify_rs_metas(rs_metas_to_add, rs_metas_to_delete, same_version);

// Update rowset tree
if (keys_type() == UNIQUE_KEYS && enable_unique_key_merge_on_write()) {
auto new_rowset_tree = std::make_unique<RowsetTree>();
ModifyRowSetTree(*_rowset_tree, to_delete, to_add, new_rowset_tree.get());
_rowset_tree = std::move(new_rowset_tree);
}

if (!same_version) {
// add rs_metas_to_delete to tracker
_timestamped_version_tracker.add_stale_path_version(rs_metas_to_delete);
Expand Down Expand Up @@ -410,6 +435,13 @@ Status Tablet::add_inc_rowset(const RowsetSharedPtr& rowset) {
RETURN_NOT_OK(_tablet_meta->add_rs_meta(rowset->rowset_meta()));
_rs_version_map[rowset->version()] = rowset;

// Update rowset tree
if (keys_type() == UNIQUE_KEYS && enable_unique_key_merge_on_write()) {
auto new_rowset_tree = std::make_unique<RowsetTree>();
ModifyRowSetTree(*_rowset_tree, {}, {rowset}, new_rowset_tree.get());
_rowset_tree = std::move(new_rowset_tree);
}

_timestamped_version_tracker.add_version(rowset->version());

++_newly_created_rowset_num;
Expand Down Expand Up @@ -1007,6 +1039,11 @@ void Tablet::delete_all_files() {
it.second->remove();
}
_stale_rs_version_map.clear();

if (keys_type() == UNIQUE_KEYS && enable_unique_key_merge_on_write()) {
// clear rowset_tree
_rowset_tree = std::make_unique<RowsetTree>();
}
}

bool Tablet::check_path(const std::string& path_to_check) const {
Expand Down Expand Up @@ -1817,9 +1854,42 @@ const TabletSchema& Tablet::tablet_schema() const {
return *rowset_meta->tablet_schema();
}

Status Tablet::lookup_row_key(const Slice& encoded_key, RowLocation* row_location) {
// TODO(zhannngchen): to be implemented in next patch, align with rowset-tree usage and
// update.
Status Tablet::lookup_row_key(const Slice& encoded_key, RowLocation* row_location,
uint32_t version) {
std::vector<std::pair<RowsetSharedPtr, int32_t>> selected_rs;
_rowset_tree->FindRowsetsWithKeyInRange(encoded_key, &selected_rs);
if (selected_rs.empty()) {
return Status::NotFound("No rowsets contains the key in key range");
}
// Usually newly written data has a higher probability of being modified, so prefer
// to search the key in the rowset with larger version.
std::sort(selected_rs.begin(), selected_rs.end(),
[](std::pair<RowsetSharedPtr, int32_t>& a, std::pair<RowsetSharedPtr, int32_t>& b) {
return a.first->end_version() > b.first->end_version();
});
RowLocation loc;
for (auto& rs : selected_rs) {
if (rs.first->end_version() > version) {
continue;
}
SegmentCacheHandle segment_cache_handle;
RETURN_NOT_OK(SegmentLoader::instance()->load_segments(
std::static_pointer_cast<BetaRowset>(rs.first), &segment_cache_handle, true));
auto& segments = segment_cache_handle.get_segments();
DCHECK_GT(segments.size(), rs.second);
Status s = segments[rs.second]->lookup_row_key(encoded_key, &loc);
if (s.is_not_found()) {
continue;
}
if (!s.ok()) {
return s;
}
loc.rowset_id = rs.first->rowset_id();
// Check delete bitmap, if the row
*row_location = loc;
// find it and return
return s;
}
return Status::NotFound("can't find key in all rowsets");
}

Expand Down
6 changes: 5 additions & 1 deletion be/src/olap/tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "olap/olap_define.h"
#include "olap/rowset/rowset.h"
#include "olap/rowset/rowset_reader.h"
#include "olap/rowset/rowset_tree.h"
#include "olap/tablet_meta.h"
#include "olap/tuple.h"
#include "olap/utils.h"
Expand Down Expand Up @@ -307,7 +308,7 @@ class Tablet : public BaseTablet {
// Lookup the row location of `encoded_key`, the function sets `row_location` on success.
// NOTE: the method only works in unique key model with primary key index, you will got a
// not supported error in other data model.
Status lookup_row_key(const Slice& encoded_key, RowLocation* row_location);
Status lookup_row_key(const Slice& encoded_key, RowLocation* row_location, uint32_t version);

private:
Status _init_once_action();
Expand Down Expand Up @@ -362,6 +363,9 @@ class Tablet : public BaseTablet {
// These _stale rowsets are been removed when rowsets' pathVersion is expired,
// this policy is judged and computed by TimestampedVersionTracker.
std::unordered_map<Version, RowsetSharedPtr, HashOfVersion> _stale_rs_version_map;
// RowsetTree is used to locate rowsets containing a key or a key range quickly.
// It's only used in UNIQUE_KEYS data model.
std::unique_ptr<RowsetTree> _rowset_tree;
// if this tablet is broken, set to true. default is false
std::atomic<bool> _is_bad;
// timestamp of last cumu compaction failure
Expand Down
Loading