Skip to content
This repository was archived by the owner on Jun 23, 2022. It is now read-only.

feat(disk): reject write if disk space is insufficient #833

Merged
merged 4 commits into from
May 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions include/dsn/dist/replication/replication_enums.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,4 +117,9 @@ ENUM_REG(replication::disk_migration_status::MOVING)
ENUM_REG(replication::disk_migration_status::MOVED)
ENUM_REG(replication::disk_migration_status::CLOSED)
ENUM_END2(replication::disk_migration_status::type, disk_migration_status)

ENUM_BEGIN2(replication::disk_status::type, disk_status, replication::disk_status::NORMAL)
ENUM_REG(replication::disk_status::NORMAL)
ENUM_REG(replication::disk_status::SPACE_INSUFFICIENT)
ENUM_END2(replication::disk_status::type, disk_status)
} // namespace dsn
1 change: 1 addition & 0 deletions include/dsn/utility/error_code.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,4 +129,5 @@ DEFINE_ERR_CODE(ERR_ACL_DENY)
DEFINE_ERR_CODE(ERR_SPLITTING)
DEFINE_ERR_CODE(ERR_PARENT_PARTITION_MISUSED)
DEFINE_ERR_CODE(ERR_CHILD_NOT_READY)
DEFINE_ERR_CODE(ERR_DISK_INSUFFICIENT)
} // namespace dsn
2 changes: 2 additions & 0 deletions src/client/partition_resolver_simple.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ void partition_resolver_simple::on_access_failure(int partition_index, error_cod
err == ERR_BUSY // busy (rpc busy or throttling busy)
||
err == ERR_SPLITTING // partition is splitting, reject read and write
||
err == ERR_DISK_INSUFFICIENT // replica disk space is insufficient
) {
return;
}
Expand Down
1 change: 1 addition & 0 deletions src/common/consensus.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -208,5 +208,6 @@ struct group_check_response
// Used for pause or cancel partition split
// if secondary pause or cancel split succeed, is_split_stopped = true
8:optional bool is_split_stopped;
9:optional metadata.disk_status disk_status = metadata.disk_status.NORMAL;
}

63 changes: 49 additions & 14 deletions src/common/fs_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,20 @@
namespace dsn {
namespace replication {

DSN_DEFINE_bool("replication",
enable_disk_available_space_check,
true,
"check if disk available space ratio below disk_min_available_space_ratio");
DSN_TAG_VARIABLE(enable_disk_available_space_check, FT_MUTABLE);
DSN_DEFINE_int32("replication",
disk_min_available_space_ratio,
10,
"if disk available space ratio "
"is below this value, all "
"replica on this disk will "
"reject client write");
DSN_TAG_VARIABLE(disk_min_available_space_ratio, FT_MUTABLE);

unsigned dir_node::replicas_count() const
{
unsigned sum = 0;
Expand Down Expand Up @@ -75,23 +89,40 @@ unsigned dir_node::remove(const gpid &pid)
return iter->second.erase(pid);
}

void dir_node::update_disk_stat()
void dir_node::update_disk_stat(bool &status_changed)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bool dir_node::update_disk_stat()

Copy link
Contributor Author

@hycdong hycdong May 20, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I used to define function like your suggestion. If return false, it seems like update disk stat failed, so I pass status_chaned instead.

{
FAIL_POINT_INJECT_F("update_disk_stat", [](string_view) {});
dsn::utils::filesystem::disk_space_info info;
if (dsn::utils::filesystem::get_disk_space_info(full_dir, info)) {
disk_capacity_mb = info.capacity / 1024 / 1024;
disk_available_mb = info.available / 1024 / 1024;
disk_available_ratio = static_cast<int>(
disk_capacity_mb == 0 ? 0 : std::round(disk_available_mb * 100.0 / disk_capacity_mb));
ddebug_f("update disk space succeed: dir = {}, capacity_mb = {}, available_mb = {}, "
"available_ratio = {}%",
full_dir,
disk_capacity_mb,
disk_available_mb,
disk_available_ratio);
} else {
if (!dsn::utils::filesystem::get_disk_space_info(full_dir, info)) {
derror_f("update disk space failed: dir = {}", full_dir);
return;
}
// update disk space info
disk_capacity_mb = info.capacity / 1024 / 1024;
disk_available_mb = info.available / 1024 / 1024;
disk_available_ratio = static_cast<int>(
disk_capacity_mb == 0 ? 0 : std::round(disk_available_mb * 100.0 / disk_capacity_mb));
ddebug_f("update disk space succeed: dir = {}, capacity_mb = {}, available_mb = {}, "
"available_ratio = {}%",
full_dir,
disk_capacity_mb,
disk_available_mb,
disk_available_ratio);
// disk available space check
if (FLAGS_enable_disk_available_space_check) {
// update disk status
auto old_status = status;
auto new_status = disk_available_ratio < FLAGS_disk_min_available_space_ratio
? disk_status::SPACE_INSUFFICIENT
: disk_status::NORMAL;
if (old_status != new_status) {
status = new_status;
ddebug_f("disk({}) status update from({}) to({})",
full_dir,
enum_to_string(old_status),
enum_to_string(new_status));
}
status_changed = (old_status != new_status);
}
}

Expand Down Expand Up @@ -284,7 +315,11 @@ void fs_manager::update_disk_stat()
{
reset_disk_stat();
for (auto &dir_node : _dir_nodes) {
dir_node->update_disk_stat();
bool status_changed = false;
dir_node->update_disk_stat(status_changed);
if (status_changed) {
_status_updated_dir_nodes.emplace_back(dir_node);
}
_total_capacity_mb += dir_node->disk_capacity_mb;
_total_available_mb += dir_node->disk_available_mb;
_min_available_ratio = std::min(dir_node->disk_available_ratio, _min_available_ratio);
Expand Down
24 changes: 19 additions & 5 deletions src/common/fs_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,21 @@

#pragma once

#include <memory>

#include <dsn/perf_counter/perf_counter_wrapper.h>
#include <dsn/service_api_cpp.h>
#include <dsn/tool-api/zlocks.h>
#include <dsn/perf_counter/perf_counter_wrapper.h>
#include <memory>
#include <dsn/utility/flags.h>

#include "replication_common.h"

namespace dsn {
namespace replication {

DSN_DECLARE_bool(enable_disk_available_space_check);
DSN_DECLARE_int32(disk_min_available_space_ratio);

struct dir_node
{
public:
Expand All @@ -34,6 +40,7 @@ struct dir_node
int64_t disk_capacity_mb;
int64_t disk_available_mb;
int disk_available_ratio;
disk_status::type status;
std::map<app_id, std::set<gpid>> holding_replicas;
std::map<app_id, std::set<gpid>> holding_primary_replicas;
std::map<app_id, std::set<gpid>> holding_secondary_replicas;
Expand All @@ -43,19 +50,21 @@ struct dir_node
const std::string &dir_,
int64_t disk_capacity_mb_ = 0,
int64_t disk_available_mb_ = 0,
int disk_available_ratio_ = 0)
int disk_available_ratio_ = 0,
disk_status::type status_ = disk_status::NORMAL)
: tag(tag_),
full_dir(dir_),
disk_capacity_mb(disk_capacity_mb_),
disk_available_mb(disk_available_mb_),
disk_available_ratio(disk_available_ratio_)
disk_available_ratio(disk_available_ratio_),
status(status_)
{
}
unsigned replicas_count(app_id id) const;
unsigned replicas_count() const;
bool has(const dsn::gpid &pid) const;
unsigned remove(const dsn::gpid &pid);
void update_disk_stat();
void update_disk_stat(bool &status_changed);
};

class fs_manager
Expand Down Expand Up @@ -87,6 +96,7 @@ class fs_manager
_total_available_ratio = 0;
_min_available_ratio = 100;
_max_available_ratio = 0;
_status_updated_dir_nodes.clear();
}

dir_node *get_dir_node(const std::string &subdir);
Expand All @@ -102,6 +112,10 @@ class fs_manager
int _max_available_ratio = 0;

std::vector<std::shared_ptr<dir_node>> _dir_nodes;
// Used for disk available space check
// disk status will be updated periodically, this vector record nodes whose disk_status changed
// in this round
std::vector<std::shared_ptr<dir_node>> _status_updated_dir_nodes;

perf_counter_wrapper _counter_total_capacity_mb;
perf_counter_wrapper _counter_total_available_mb;
Expand Down
6 changes: 6 additions & 0 deletions src/common/metadata.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ enum split_status
CANCELING
}

enum disk_status
{
NORMAL = 0,
SPACE_INSUFFICIENT
}

// Used for cold backup and bulk load
struct file_meta
{
Expand Down
1 change: 0 additions & 1 deletion src/common/replication_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,5 @@ class bulk_load_constant
static const std::string BULK_LOAD_LOCAL_ROOT_DIR;
static const int32_t PROGRESS_FINISHED;
};

} // namespace replication
} // namespace dsn
6 changes: 6 additions & 0 deletions src/replica/replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,10 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
// routine for get extra envs from replica
const std::map<std::string, std::string> &get_replica_extra_envs() const { return _extra_envs; }

void set_disk_status(disk_status::type status) { _disk_status = status; }
bool disk_space_insufficient() { return _disk_status == disk_status::SPACE_INSUFFICIENT; }
disk_status::type get_disk_status() { return _disk_status; }

protected:
// this method is marked protected to enable us to mock it in unit tests.
virtual decree max_gced_decree_no_lock() const;
Expand Down Expand Up @@ -566,6 +570,8 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
dsn::thread_access_checker _checker;

std::unique_ptr<security::access_controller> _access_controller;

disk_status::type _disk_status;
};
typedef dsn::ref_ptr<replica> replica_ptr;
} // namespace replication
Expand Down
5 changes: 5 additions & 0 deletions src/replica/replica_2pc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ void replica::on_client_write(dsn::message_ex *request, bool ignore_throttling)
return;
}

if (disk_space_insufficient() || _primary_states.secondary_disk_space_insufficient()) {
response_client_write(request, ERR_DISK_INSUFFICIENT);
return;
}

if (_is_bulk_load_ingestion) {
if (request->rpc_code() != dsn::apps::RPC_RRDB_RRDB_BULK_LOAD) {
// reject write requests during ingestion
Expand Down
4 changes: 4 additions & 0 deletions src/replica/replica_check.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ void replica::on_group_check(const group_check_request &request,
}
// the group check may trigger start/finish/cancel/pause a split on the secondary.
_split_mgr->trigger_secondary_parent_split(request, response);
response.__set_disk_status(_disk_status);
break;
case partition_status::PS_POTENTIAL_SECONDARY:
init_learn(request.config.learner_signature);
Expand Down Expand Up @@ -235,6 +236,9 @@ void replica::on_group_check_reply(error_code err,
handle_learning_succeeded_on_primary(req->node, resp->learner_signature);
}
_split_mgr->primary_parent_handle_stop_split(req, resp);
if (req->config.status == partition_status::PS_SECONDARY) {
_primary_states.secondary_disk_status[req->node] = resp->disk_status;
}
}
}

Expand Down
15 changes: 15 additions & 0 deletions src/replica/replica_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ void primary_context::cleanup(bool clean_pending_mutations)
cleanup_bulk_load_states();

cleanup_split_states();

secondary_disk_status.clear();
}

bool primary_context::is_cleaned()
Expand Down Expand Up @@ -176,6 +178,19 @@ void primary_context::cleanup_split_states()
split_stopped_secondary.clear();
}

bool primary_context::secondary_disk_space_insufficient() const
{
for (const auto &kv : secondary_disk_status) {
if (kv.second == disk_status::SPACE_INSUFFICIENT) {
ddebug_f("partition[{}] secondary[{}] disk space is insufficient",
membership.pid,
kv.first.to_string());
return true;
}
}
return false;
}

bool secondary_context::cleanup(bool force)
{
CLEANUP_TASK(checkpoint_task, force)
Expand Down
5 changes: 5 additions & 0 deletions src/replica/replica_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ class primary_context

void cleanup_split_states();

bool secondary_disk_space_insufficient() const;

public:
// membership mgr, including learners
partition_configuration membership;
Expand Down Expand Up @@ -171,6 +173,9 @@ class primary_context
// if primary send an empty prepare after ingestion succeed to gurantee secondary commit its
// ingestion request
bool ingestion_is_empty_prepare_sent{false};

// secondary rpc_address -> secondary disk_status
std::unordered_map<rpc_address, disk_status::type> secondary_disk_status;
};

class secondary_context
Expand Down
20 changes: 20 additions & 0 deletions src/replica/replica_stub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1863,6 +1863,7 @@ void replica_stub::on_disk_stat()
dsn::replication::disk_remove_useless_dirs(_options.data_dirs, report);
_fs_manager.update_disk_stat();
update_disk_holding_replicas();
update_disks_status();

_counter_replicas_error_replica_dir_count->set(report.error_replica_count);
_counter_replicas_garbage_replica_dir_count->set(report.garbage_replica_count);
Expand Down Expand Up @@ -2859,5 +2860,24 @@ void replica_stub::query_app_manual_compact_status(
}
}

void replica_stub::update_disks_status()
{
for (const auto &dir_node : _fs_manager._status_updated_dir_nodes) {
for (const auto &holding_replicas : dir_node->holding_replicas) {
const std::set<gpid> &pids = holding_replicas.second;
for (const auto &pid : pids) {
replica_ptr replica = get_replica(pid);
if (replica == nullptr) {
continue;
}
replica->set_disk_status(dir_node->status);
ddebug_f("{} update disk_status to {}",
replica->name(),
enum_to_string(replica->get_disk_status()));
}
}
}
}

} // namespace replication
} // namespace dsn
2 changes: 2 additions & 0 deletions src/replica/replica_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,8 @@ class replica_stub : public serverlet<replica_stub>, public ref_counter
error_code error);
void update_disk_holding_replicas();

void update_disks_status();

void register_ctrl_command();

int get_app_id_from_replicas(std::string app_name)
Expand Down
26 changes: 26 additions & 0 deletions src/replica/test/replica_disk_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,5 +198,31 @@ TEST_F(replica_disk_test, gc_disk_useless_dir)
ASSERT_EQ(report.error_replica_count, 2);
}

TEST_F(replica_disk_test, disk_status_test)
{
int32_t node_index = 0;
struct disk_status_test
{
disk_status::type old_status;
disk_status::type new_status;
} tests[]{{disk_status::NORMAL, disk_status::NORMAL},
{disk_status::NORMAL, disk_status::SPACE_INSUFFICIENT},
{disk_status::SPACE_INSUFFICIENT, disk_status::SPACE_INSUFFICIENT},
{disk_status::SPACE_INSUFFICIENT, disk_status::NORMAL}};
for (const auto &test : tests) {
auto node = get_dir_nodes()[node_index];
mock_node_status(node_index, test.old_status, test.new_status);
update_disks_status();
for (auto &kv : node->holding_replicas) {
for (auto &pid : kv.second) {
bool flag;
ASSERT_EQ(replica_disk_space_insufficient(pid, flag), ERR_OK);
ASSERT_EQ(flag, test.new_status == disk_status::SPACE_INSUFFICIENT);
}
}
}
mock_node_status(node_index, disk_status::NORMAL, disk_status::NORMAL);
}

} // namespace replication
} // namespace dsn
Loading