[improvement](cloud) manage node via sql like non cloud mode #40264

Merged 18 commits on Sep 12, 2024
32 changes: 32 additions & 0 deletions be/src/agent/heartbeat_server.cpp
@@ -26,6 +26,7 @@
#include <ostream>
#include <string>

#include "cloud/config.h"
#include "common/config.h"
#include "common/status.h"
#include "olap/storage_engine.h"
@@ -244,6 +245,37 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo& master_info) {
_engine.notify_listeners();
}

if (master_info.__isset.meta_service_endpoint != config::is_cloud_mode()) {
return Status::InvalidArgument(
"fe and be do not work in same mode, fe cloud mode: {},"
" be cloud mode: {}",
master_info.__isset.meta_service_endpoint, config::is_cloud_mode());
}

if (master_info.__isset.meta_service_endpoint && config::meta_service_endpoint.empty() &&
!master_info.meta_service_endpoint.empty()) {
auto st = config::set_config("meta_service_endpoint", master_info.meta_service_endpoint,
true);
LOG(INFO) << "set config meta_service_endpoing " << master_info.meta_service_endpoint << " "
<< st;
}

if (master_info.__isset.cloud_instance_id) {
if (!config::cloud_instance_id.empty() &&
config::cloud_instance_id != master_info.cloud_instance_id) {
return Status::InvalidArgument(
"cloud_instance_id in fe.conf and be.conf are not same, fe: {}, be: {}",
master_info.cloud_instance_id, config::cloud_instance_id);
}

if (config::cloud_instance_id.empty() && !master_info.cloud_instance_id.empty()) {
auto st = config::set_config("cloud_instance_id", master_info.cloud_instance_id, true);
config::set_cloud_unique_id(master_info.cloud_instance_id);
LOG(INFO) << "set config cloud_instance_id " << master_info.cloud_instance_id << " "
<< st;
}
}

return Status::OK();
}

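The new heartbeat logic ties the BE's cloud mode to whether the FE reports a meta_service_endpoint, and lets the FE push meta_service_endpoint and cloud_instance_id to a BE that has not configured them yet. A minimal sketch of a matching FE/BE configuration pair (the address and instance id are illustrative, not taken from this PR):

# fe.conf (cloud mode)
meta_service_endpoint = 127.0.0.1:5000
cloud_instance_id = reg_cloud_instance

# be.conf (cloud mode)
deploy_mode = cloud
# meta_service_endpoint and cloud_instance_id may be left empty here;
# the heartbeat fills them in from the FE via config::set_config().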
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_meta_mgr.cpp
@@ -943,7 +943,7 @@ Status CloudMetaMgr::get_storage_vault_info(StorageVaultInfos* vault_infos, bool
j->mutable_obj_info()->set_sk(j->obj_info().sk().substr(0, 2) + "xxx");
}

LOG(INFO) << "get storage vault, enable_storage_vault=" << is_vault_mode
LOG(INFO) << "get storage vault, enable_storage_vault=" << *is_vault_mode
<< " response=" << resp.ShortDebugString();
return Status::OK();
}
24 changes: 1 addition & 23 deletions be/src/cloud/cloud_storage_engine.cpp
@@ -160,29 +160,7 @@ struct RefreshFSVaultVisitor {
};

Status CloudStorageEngine::open() {
cloud::StorageVaultInfos vault_infos;
bool enable_storage_vault = false;
do {
auto st = _meta_mgr->get_storage_vault_info(&vault_infos, &enable_storage_vault);
if (st.ok()) {
break;
}

LOG(WARNING) << "failed to get vault info, retry after 5s, err=" << st;
std::this_thread::sleep_for(5s);
} while (vault_infos.empty());

for (auto& [id, vault_info, path_format] : vault_infos) {
if (auto st = std::visit(VaultCreateFSVisitor {id, path_format}, vault_info); !st.ok())
[[unlikely]] {
return vault_process_error(id, vault_info, std::move(st));
}
}

// vault mode should not support latest_fs to get rid of unexpected storage backends choosen
if (!enable_storage_vault) {
set_latest_fs(get_filesystem(std::get<0>(vault_infos.back())));
}
sync_storage_vault();

// TODO(plat1ko): DeleteBitmapTxnManager

21 changes: 17 additions & 4 deletions be/src/cloud/cloud_storage_engine.h
@@ -74,14 +74,27 @@ class CloudStorageEngine final : public BaseStorageEngine {
}
void _check_file_cache_ttl_block_valid();

std::optional<StorageResource> get_storage_resource(const std::string& vault_id) const {
std::optional<StorageResource> get_storage_resource(const std::string& vault_id) {
LOG(INFO) << "Getting storage resource for vault_id: " << vault_id;
if (vault_id.empty()) {
if (latest_fs() == nullptr) {
LOG(INFO) << "there is not latest fs";
return std::nullopt;
}
return StorageResource {latest_fs()};
}

if (auto storage_resource = doris::get_storage_resource(vault_id); storage_resource) {
return storage_resource->first;
}
bool synced = false;
do {
if (auto storage_resource = doris::get_storage_resource(vault_id); storage_resource) {
return storage_resource->first;
}
if (synced) {
break;
}
sync_storage_vault();
synced = true;
} while (true);

return std::nullopt;
}
14 changes: 12 additions & 2 deletions be/src/cloud/config.cpp
@@ -17,10 +17,14 @@

#include "cloud/config.h"

#include "common/status.h"

namespace doris::config {

DEFINE_String(cloud_unique_id, "");
DEFINE_String(meta_service_endpoint, "");
DEFINE_String(deploy_mode, "");
DEFINE_mString(cloud_instance_id, "");
DEFINE_mString(cloud_unique_id, "");
DEFINE_mString(meta_service_endpoint, "");
DEFINE_Bool(meta_service_use_load_balancer, "false");
DEFINE_mInt32(meta_service_rpc_timeout_ms, "10000");
DEFINE_Bool(meta_service_connection_pooled, "true");
@@ -64,4 +68,10 @@ DEFINE_mBool(enable_new_tablet_do_compaction, "false");

DEFINE_Bool(enable_cloud_txn_lazy_commit, "false");

void set_cloud_unique_id(std::string instance_id) {
if (cloud_unique_id.empty() && !instance_id.empty()) {
static_cast<void>(set_config("cloud_unique_id", "1:" + instance_id + ":compute", true));
}
}

} // namespace doris::config
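With set_cloud_unique_id() above, a BE that only configures cloud_instance_id derives its cloud_unique_id at startup; the mapping below simply restates the code, with an illustrative instance id:

# be.conf
cloud_instance_id = reg_cloud_instance
# derived automatically, only when cloud_unique_id is not set explicitly:
# cloud_unique_id = 1:reg_cloud_instance:compute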
11 changes: 8 additions & 3 deletions be/src/cloud/config.h
@@ -21,12 +21,17 @@

namespace doris::config {

DECLARE_String(cloud_unique_id);
DECLARE_String(deploy_mode);
// deprecated, do not configure directly
DECLARE_mString(cloud_instance_id);
DECLARE_mString(cloud_unique_id);

static inline bool is_cloud_mode() {
return !cloud_unique_id.empty();
return deploy_mode == "cloud" || !cloud_unique_id.empty();
}

void set_cloud_unique_id(std::string instance_id);

// Set the endpoint of meta service.
//
// If meta services are deployed behind a load balancer, set this config to "host:port" of the load balancer.
@@ -40,7 +45,7 @@ static inline bool is_cloud_mode() {
// If you want to access a group of meta services directly, set the addresses of meta services to this config,
// separated by a comma, like "host:port,host:port,host:port", then BE will choose a server to connect in randomly.
// In this mode, The config meta_service_connection_pooled is still useful, but the other two configs will be ignored.
DECLARE_String(meta_service_endpoint);
DECLARE_mString(meta_service_endpoint);
// Set the underlying connection type to pooled.
DECLARE_Bool(meta_service_connection_pooled);
DECLARE_mInt64(meta_service_connection_pool_size);
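The comment above describes two ways to fill meta_service_endpoint; a sketch of both forms in be.conf, with placeholder addresses:

# behind a load balancer: a single "host:port"
meta_service_endpoint = 10.0.0.10:5000

# direct access to a group of meta services: a comma-separated "host:port" list,
# from which BE picks a server randomly for each connection
meta_service_endpoint = 10.0.0.1:5000,10.0.0.2:5000,10.0.0.3:5000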
4 changes: 3 additions & 1 deletion be/src/common/config.cpp
@@ -37,10 +37,10 @@
#include <utility>
#include <vector>

#include "cloud/config.h"
#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
#include "config.h"
#include "io/fs/file_writer.h"
#include "io/fs/local_file_system.h"
#include "util/cpu_info.h"
@@ -1663,6 +1663,8 @@ bool init(const char* conf_file, bool fill_conf_map, bool must_exist, bool set_t
SET_FIELD(it.second, std::vector<std::string>, fill_conf_map, set_to_default);
}

set_cloud_unique_id(cloud_instance_id);

return true;
}

5 changes: 2 additions & 3 deletions cloud/src/meta-service/meta_service_resource.cpp
@@ -2338,9 +2338,8 @@ void MetaServiceImpl::get_cluster(google::protobuf::RpcController* controller,
std::find_if(instance.storage_vault_names().begin(), instance.storage_vault_names().end(),
[](const std::string& name) { return name == BUILT_IN_STORAGE_VAULT_NAME; }) ==
instance.storage_vault_names().end()) {
code = MetaServiceCode::STORAGE_VAULT_NOT_FOUND;
msg = "instance has no built in storage vault";
return;
LOG_EVERY_N(INFO, 100) << "There is no builtin vault in instance "
<< instance.instance_id();
}

auto get_cluster_mysql_user = [](const ClusterPB& c, std::set<std::string>* mysql_users) {
4 changes: 3 additions & 1 deletion cloud/src/resource-manager/resource_manager.cpp
@@ -171,7 +171,9 @@ bool ResourceManager::check_cluster_params_valid(const ClusterPB& cluster, std::
n.heartbeat_port()) {
continue;
}
ss << "check cluster params failed, node : " << proto_to_json(n);
ss << "check cluster params failed, edit_log_port is required for frontends while "
"heatbeat_port is required for banckens, node : "
<< proto_to_json(n);
*err = ss.str();
no_err = false;
break;
80 changes: 62 additions & 18 deletions docker/runtime/doris-compose/cluster.py
@@ -291,15 +291,27 @@ def docker_env(self):
enable_coverage = self.cluster.coverage_dir

envs = {
"MY_IP": self.get_ip(),
"MY_ID": self.id,
"MY_TYPE": self.node_type(),
"FE_QUERY_PORT": FE_QUERY_PORT,
"FE_EDITLOG_PORT": FE_EDITLOG_PORT,
"BE_HEARTBEAT_PORT": BE_HEARTBEAT_PORT,
"DORIS_HOME": os.path.join(self.docker_home_dir()),
"STOP_GRACE": 1 if enable_coverage else 0,
"IS_CLOUD": 1 if self.cluster.is_cloud else 0,
"MY_IP":
self.get_ip(),
"MY_ID":
self.id,
"MY_TYPE":
self.node_type(),
"FE_QUERY_PORT":
FE_QUERY_PORT,
"FE_EDITLOG_PORT":
FE_EDITLOG_PORT,
"BE_HEARTBEAT_PORT":
BE_HEARTBEAT_PORT,
"DORIS_HOME":
os.path.join(self.docker_home_dir()),
"STOP_GRACE":
1 if enable_coverage else 0,
"IS_CLOUD":
1 if self.cluster.is_cloud else 0,
"SQL_MODE_NODE_MGR":
1 if hasattr(self.cluster, 'sql_mode_node_mgr')
and self.cluster.sql_mode_node_mgr else 0
}

if self.cluster.is_cloud:
@@ -390,15 +402,22 @@ def get_add_init_config(self):
cfg += self.cluster.fe_config
if self.cluster.is_cloud:
cfg += [
"cloud_unique_id = " + self.cloud_unique_id(),
"meta_service_endpoint = {}".format(
self.cluster.get_meta_server_addr()),
"",
self.cluster.get_meta_server_addr()), "",
"# For regression-test",
"ignore_unsupported_properties_in_cloud_mode = true",
"merge_on_write_forced_to_false = true",
"deploy_mode = cloud"
]

if self.cluster.sql_mode_node_mgr:
cfg += [
"cloud_instance_id = " + self.cloud_instance_id(),
]
else:
cfg += [
"cloud_unique_id = " + self.cloud_unique_id(),
]
return cfg

def init_is_follower(self):
@@ -420,6 +439,9 @@ def docker_env(self):
def cloud_unique_id(self):
return "sql_server_{}".format(self.id)

def cloud_instance_id(self):
return "reg_cloud_instance"

def entrypoint(self):
return ["bash", os.path.join(DOCKER_RESOURCE_PATH, "init_fe.sh")]

@@ -450,14 +472,26 @@ def get_add_init_config(self):
cfg += self.cluster.be_config
if self.cluster.is_cloud:
cfg += [
"cloud_unique_id = " + self.cloud_unique_id(),
"meta_service_endpoint = {}".format(
self.cluster.get_meta_server_addr()),
'tmp_file_dirs = [ {"path":"./storage/tmp","max_cache_bytes":10240000, "max_upload_bytes":10240000}]',
'enable_file_cache = true',
'file_cache_path = [ {{"path": "{}/storage/file_cache", "total_size":53687091200, "query_limit": 10737418240}}]'
.format(self.docker_home_dir()),
"deploy_mode = cloud",
]

if self.cluster.be_metaservice_endpoint:
cfg += [
"meta_service_endpoint = {}".format(
self.cluster.get_meta_server_addr()),
]
if self.cluster.be_cloud_instanceid:
cfg += [
"cloud_instance_id = " + self.cloud_instance_id(),
]
if not self.cluster.sql_mode_node_mgr:
cfg += [
"cloud_unique_id = " + self.cloud_unique_id(),
]
return cfg

def init_cluster_name(self):
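Taken together, for a cloud BE created with sql_mode_node_mgr, be_cloud_instanceid, and be_metaservice_endpoint all enabled, the generated be.conf would contain roughly the lines below (the meta service address is a placeholder; exact values come from the cluster definition above):

deploy_mode = cloud
meta_service_endpoint = <meta server address>
cloud_instance_id = reg_cloud_instance
enable_file_cache = true
# cloud_unique_id is omitted; the BE derives it from cloud_instance_id at startup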
@@ -519,6 +553,9 @@ def docker_env(self):
def cloud_unique_id(self):
return "compute_node_{}".format(self.id)

def cloud_instance_id(self):
return "reg_cloud_instance"

def docker_home_dir(self):
return os.path.join(DOCKER_DORIS_PATH, "be")

@@ -628,7 +665,8 @@ class Cluster(object):

def __init__(self, name, subnet, image, is_cloud, fe_config, be_config,
ms_config, recycle_config, fe_follower, be_disks, be_cluster,
reg_be, coverage_dir, cloud_store_config):
reg_be, coverage_dir, cloud_store_config, sql_mode_node_mgr,
be_metaservice_endpoint, be_cloud_instanceid):
self.name = name
self.subnet = subnet
self.image = image
@@ -647,11 +685,15 @@ def __init__(self, name, subnet, image, is_cloud, fe_config, be_config,
node_type: Group(node_type)
for node_type in Node.TYPE_ALL
}
self.sql_mode_node_mgr = sql_mode_node_mgr
self.be_metaservice_endpoint = be_metaservice_endpoint
self.be_cloud_instanceid = be_cloud_instanceid

@staticmethod
def new(name, image, is_cloud, fe_config, be_config, ms_config,
recycle_config, fe_follower, be_disks, be_cluster, reg_be,
coverage_dir, cloud_store_config):
coverage_dir, cloud_store_config, sql_mode_node_mgr,
be_metaservice_endpoint, be_cloud_instanceid):
if not os.path.exists(LOCAL_DORIS_PATH):
os.makedirs(LOCAL_DORIS_PATH, exist_ok=True)
os.chmod(LOCAL_DORIS_PATH, 0o777)
@@ -663,7 +705,9 @@ def new(name, image, is_cloud, fe_config, be_config, ms_config,
cluster = Cluster(name, subnet, image, is_cloud, fe_config,
be_config, ms_config, recycle_config,
fe_follower, be_disks, be_cluster, reg_be,
coverage_dir, cloud_store_config)
coverage_dir, cloud_store_config,
sql_mode_node_mgr, be_metaservice_endpoint,
be_cloud_instanceid)
os.makedirs(cluster.get_path(), exist_ok=True)
os.makedirs(get_status_path(name), exist_ok=True)
cluster._save_meta()