Skip to content
This repository was archived by the owner on Jun 23, 2022. It is now read-only.

feat(disk): reject write if disk space is insufficient #833

Merged
merged 4 commits into from
May 28, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions include/dsn/dist/replication/replication_enums.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,4 +117,9 @@ ENUM_REG(replication::disk_migration_status::MOVING)
ENUM_REG(replication::disk_migration_status::MOVED)
ENUM_REG(replication::disk_migration_status::CLOSED)
ENUM_END2(replication::disk_migration_status::type, disk_migration_status)

ENUM_BEGIN2(replication::disk_status::type, disk_status, replication::disk_status::NORMAL)
ENUM_REG(replication::disk_status::NORMAL)
ENUM_REG(replication::disk_status::SPACE_INSUFFICIENT)
ENUM_END2(replication::disk_status::type, disk_status)
} // namespace dsn
1 change: 1 addition & 0 deletions include/dsn/utility/error_code.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,4 +129,5 @@ DEFINE_ERR_CODE(ERR_ACL_DENY)
DEFINE_ERR_CODE(ERR_SPLITTING)
DEFINE_ERR_CODE(ERR_PARENT_PARTITION_MISUSED)
DEFINE_ERR_CODE(ERR_CHILD_NOT_READY)
DEFINE_ERR_CODE(ERR_DISK_INSUFFICIENT)
} // namespace dsn
3 changes: 2 additions & 1 deletion src/client/partition_resolver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ void partition_resolver::call_task(const rpc_response_task_ptr &t)
dsn::error_code err, dsn::message_ex * req, dsn::message_ex * resp)
{
if (req->header->gpid.value() != 0 && err != ERR_OK && err != ERR_HANDLER_NOT_FOUND &&
err != ERR_APP_NOT_EXIST && err != ERR_OPERATION_DISABLED && err != ERR_BUSY) {
err != ERR_APP_NOT_EXIST && err != ERR_OPERATION_DISABLED && err != ERR_BUSY &&
err != ERR_DISK_INSUFFICIENT) {
on_access_failure(req->header->gpid.get_partition_index(), err);
// still got time, retry
uint64_t nms = dsn_now_ms();
Expand Down
2 changes: 2 additions & 0 deletions src/client/partition_resolver_simple.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ void partition_resolver_simple::on_access_failure(int partition_index, error_cod
err == ERR_BUSY // busy (rpc busy or throttling busy)
||
err == ERR_SPLITTING // partition is splitting, reject read and write
||
err == ERR_DISK_INSUFFICIENT // replica disk space is insufficient
) {
return;
}
Expand Down
1 change: 1 addition & 0 deletions src/common/consensus.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -208,5 +208,6 @@ struct group_check_response
// Used for pause or cancel partition split
// if secondary pause or cancel split succeed, is_split_stopped = true
8:optional bool is_split_stopped;
9:optional metadata.disk_status disk_status = metadata.disk_status.NORMAL;
}

63 changes: 49 additions & 14 deletions src/common/fs_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,20 @@
namespace dsn {
namespace replication {

DSN_DEFINE_bool("replication",
enable_disk_available_space_check,
true,
"check if disk available space ratio below disk_min_available_space_ratio");
DSN_TAG_VARIABLE(enable_disk_available_space_check, FT_MUTABLE);
DSN_DEFINE_int32("replication",
disk_min_available_space_ratio,
10,
"if disk available space ratio "
"is below this value, all "
"replica on this disk will "
"reject client write");
DSN_TAG_VARIABLE(disk_min_available_space_ratio, FT_MUTABLE);

unsigned dir_node::replicas_count() const
{
unsigned sum = 0;
Expand Down Expand Up @@ -75,23 +89,40 @@ unsigned dir_node::remove(const gpid &pid)
return iter->second.erase(pid);
}

void dir_node::update_disk_stat()
void dir_node::update_disk_stat(bool &status_changed)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bool dir_node::update_disk_stat()

Copy link
Contributor Author

@hycdong hycdong May 20, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I used to define function like your suggestion. If return false, it seems like update disk stat failed, so I pass status_chaned instead.

{
FAIL_POINT_INJECT_F("update_disk_stat", [](string_view) {});
dsn::utils::filesystem::disk_space_info info;
if (dsn::utils::filesystem::get_disk_space_info(full_dir, info)) {
disk_capacity_mb = info.capacity / 1024 / 1024;
disk_available_mb = info.available / 1024 / 1024;
disk_available_ratio = static_cast<int>(
disk_capacity_mb == 0 ? 0 : std::round(disk_available_mb * 100.0 / disk_capacity_mb));
ddebug_f("update disk space succeed: dir = {}, capacity_mb = {}, available_mb = {}, "
"available_ratio = {}%",
full_dir,
disk_capacity_mb,
disk_available_mb,
disk_available_ratio);
} else {
if (!dsn::utils::filesystem::get_disk_space_info(full_dir, info)) {
derror_f("update disk space failed: dir = {}", full_dir);
return;
}
// update disk space info
disk_capacity_mb = info.capacity / 1024 / 1024;
disk_available_mb = info.available / 1024 / 1024;
disk_available_ratio = static_cast<int>(
disk_capacity_mb == 0 ? 0 : std::round(disk_available_mb * 100.0 / disk_capacity_mb));
ddebug_f("update disk space succeed: dir = {}, capacity_mb = {}, available_mb = {}, "
"available_ratio = {}%",
full_dir,
disk_capacity_mb,
disk_available_mb,
disk_available_ratio);
// disk available space check
if (FLAGS_enable_disk_available_space_check) {
// update disk status
auto old_status = status;
auto new_status = disk_available_ratio < FLAGS_disk_min_available_space_ratio
? disk_status::SPACE_INSUFFICIENT
: disk_status::NORMAL;
if (old_status != new_status) {
status = new_status;
ddebug_f("disk({}) status update from({}) to({})",
full_dir,
enum_to_string(old_status),
enum_to_string(new_status));
}
status_changed = (old_status != new_status);
}
}

Expand Down Expand Up @@ -284,7 +315,11 @@ void fs_manager::update_disk_stat()
{
reset_disk_stat();
for (auto &dir_node : _dir_nodes) {
dir_node->update_disk_stat();
bool status_changed = false;
dir_node->update_disk_stat(status_changed);
if (status_changed) {
_status_updated_dir_nodes.emplace_back(dir_node);
}
_total_capacity_mb += dir_node->disk_capacity_mb;
_total_available_mb += dir_node->disk_available_mb;
_min_available_ratio = std::min(dir_node->disk_available_ratio, _min_available_ratio);
Expand Down
24 changes: 19 additions & 5 deletions src/common/fs_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,21 @@

#pragma once

#include <memory>

#include <dsn/perf_counter/perf_counter_wrapper.h>
#include <dsn/service_api_cpp.h>
#include <dsn/tool-api/zlocks.h>
#include <dsn/perf_counter/perf_counter_wrapper.h>
#include <memory>
#include <dsn/utility/flags.h>

#include "replication_common.h"

namespace dsn {
namespace replication {

DSN_DECLARE_bool(enable_disk_available_space_check);
DSN_DECLARE_int32(disk_min_available_space_ratio);

struct dir_node
{
public:
Expand All @@ -34,6 +40,7 @@ struct dir_node
int64_t disk_capacity_mb;
int64_t disk_available_mb;
int disk_available_ratio;
disk_status::type status;
std::map<app_id, std::set<gpid>> holding_replicas;
std::map<app_id, std::set<gpid>> holding_primary_replicas;
std::map<app_id, std::set<gpid>> holding_secondary_replicas;
Expand All @@ -43,19 +50,21 @@ struct dir_node
const std::string &dir_,
int64_t disk_capacity_mb_ = 0,
int64_t disk_available_mb_ = 0,
int disk_available_ratio_ = 0)
int disk_available_ratio_ = 0,
disk_status::type status_ = disk_status::NORMAL)
: tag(tag_),
full_dir(dir_),
disk_capacity_mb(disk_capacity_mb_),
disk_available_mb(disk_available_mb_),
disk_available_ratio(disk_available_ratio_)
disk_available_ratio(disk_available_ratio_),
status(status_)
{
}
unsigned replicas_count(app_id id) const;
unsigned replicas_count() const;
bool has(const dsn::gpid &pid) const;
unsigned remove(const dsn::gpid &pid);
void update_disk_stat();
void update_disk_stat(bool &status_changed);
};

class fs_manager
Expand Down Expand Up @@ -87,6 +96,7 @@ class fs_manager
_total_available_ratio = 0;
_min_available_ratio = 100;
_max_available_ratio = 0;
_status_updated_dir_nodes.clear();
}

dir_node *get_dir_node(const std::string &subdir);
Expand All @@ -102,6 +112,10 @@ class fs_manager
int _max_available_ratio = 0;

std::vector<std::shared_ptr<dir_node>> _dir_nodes;
// Used for disk available space check
// disk status will be updated periodically, this vector record nodes whose disk_status changed
// in this round
std::vector<std::shared_ptr<dir_node>> _status_updated_dir_nodes;

perf_counter_wrapper _counter_total_capacity_mb;
perf_counter_wrapper _counter_total_available_mb;
Expand Down
6 changes: 6 additions & 0 deletions src/common/metadata.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ enum split_status
CANCELING
}

enum disk_status
{
NORMAL = 0,
SPACE_INSUFFICIENT
}

// Used for cold backup and bulk load
struct file_meta
{
Expand Down
1 change: 0 additions & 1 deletion src/common/replication_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,5 @@ class bulk_load_constant
static const std::string BULK_LOAD_LOCAL_ROOT_DIR;
static const int32_t PROGRESS_FINISHED;
};

} // namespace replication
} // namespace dsn
6 changes: 6 additions & 0 deletions src/replica/replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,10 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
// routine for get extra envs from replica
const std::map<std::string, std::string> &get_replica_extra_envs() const { return _extra_envs; }

void set_disk_status(disk_status::type status) { _disk_status = status; }
bool disk_space_insufficient() { return _disk_status == disk_status::SPACE_INSUFFICIENT; }
disk_status::type get_disk_status() { return _disk_status; }

protected:
// this method is marked protected to enable us to mock it in unit tests.
virtual decree max_gced_decree_no_lock() const;
Expand Down Expand Up @@ -566,6 +570,8 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
dsn::thread_access_checker _checker;

std::unique_ptr<security::access_controller> _access_controller;

disk_status::type _disk_status;
};
typedef dsn::ref_ptr<replica> replica_ptr;
} // namespace replication
Expand Down
5 changes: 5 additions & 0 deletions src/replica/replica_2pc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ void replica::on_client_write(dsn::message_ex *request, bool ignore_throttling)
return;
}

if (disk_space_insufficient() || _primary_states.secondary_disk_space_insufficient()) {
response_client_write(request, ERR_DISK_INSUFFICIENT);
return;
}

if (_is_bulk_load_ingestion) {
if (request->rpc_code() != dsn::apps::RPC_RRDB_RRDB_BULK_LOAD) {
// reject write requests during ingestion
Expand Down
4 changes: 4 additions & 0 deletions src/replica/replica_check.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ void replica::on_group_check(const group_check_request &request,
}
// the group check may trigger start/finish/cancel/pause a split on the secondary.
_split_mgr->trigger_secondary_parent_split(request, response);
response.__set_disk_status(_disk_status);
break;
case partition_status::PS_POTENTIAL_SECONDARY:
init_learn(request.config.learner_signature);
Expand Down Expand Up @@ -235,6 +236,9 @@ void replica::on_group_check_reply(error_code err,
handle_learning_succeeded_on_primary(req->node, resp->learner_signature);
}
_split_mgr->primary_parent_handle_stop_split(req, resp);
if (req->config.status == partition_status::PS_SECONDARY) {
_primary_states.secondary_disk_status[req->node] = resp->disk_status;
}
}
}

Expand Down
15 changes: 15 additions & 0 deletions src/replica/replica_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ void primary_context::cleanup(bool clean_pending_mutations)
cleanup_bulk_load_states();

cleanup_split_states();

secondary_disk_status.clear();
}

bool primary_context::is_cleaned()
Expand Down Expand Up @@ -176,6 +178,19 @@ void primary_context::cleanup_split_states()
split_stopped_secondary.clear();
}

bool primary_context::secondary_disk_space_insufficient() const
{
for (const auto &kv : secondary_disk_status) {
if (kv.second == disk_status::SPACE_INSUFFICIENT) {
ddebug_f("partition[{}] secondary[{}] disk space is insufficient",
membership.pid,
kv.first.to_string());
return true;
}
}
return false;
}

bool secondary_context::cleanup(bool force)
{
CLEANUP_TASK(checkpoint_task, force)
Expand Down
5 changes: 5 additions & 0 deletions src/replica/replica_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ class primary_context

void cleanup_split_states();

bool secondary_disk_space_insufficient() const;

public:
// membership mgr, including learners
partition_configuration membership;
Expand Down Expand Up @@ -171,6 +173,9 @@ class primary_context
// if primary send an empty prepare after ingestion succeed to gurantee secondary commit its
// ingestion request
bool ingestion_is_empty_prepare_sent{false};

// secondary rpc_address -> secondary disk_status
std::unordered_map<rpc_address, disk_status::type> secondary_disk_status;
};

class secondary_context
Expand Down
20 changes: 20 additions & 0 deletions src/replica/replica_stub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1863,6 +1863,7 @@ void replica_stub::on_disk_stat()
dsn::replication::disk_remove_useless_dirs(_options.data_dirs, report);
_fs_manager.update_disk_stat();
update_disk_holding_replicas();
update_disks_status();

_counter_replicas_error_replica_dir_count->set(report.error_replica_count);
_counter_replicas_garbage_replica_dir_count->set(report.garbage_replica_count);
Expand Down Expand Up @@ -2859,5 +2860,24 @@ void replica_stub::query_app_manual_compact_status(
}
}

void replica_stub::update_disks_status()
{
for (const auto &dir_node : _fs_manager._status_updated_dir_nodes) {
for (const auto &holding_replicas : dir_node->holding_replicas) {
const std::set<gpid> &pids = holding_replicas.second;
for (const auto &pid : pids) {
replica_ptr replica = get_replica(pid);
if (replica == nullptr) {
continue;
}
replica->set_disk_status(dir_node->status);
ddebug_f("{} update disk_status to {}",
replica->name(),
enum_to_string(replica->get_disk_status()));
}
}
}
}

} // namespace replication
} // namespace dsn
2 changes: 2 additions & 0 deletions src/replica/replica_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,8 @@ class replica_stub : public serverlet<replica_stub>, public ref_counter
error_code error);
void update_disk_holding_replicas();

void update_disks_status();

void register_ctrl_command();

int get_app_id_from_replicas(std::string app_name)
Expand Down
26 changes: 26 additions & 0 deletions src/replica/test/replica_disk_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,5 +198,31 @@ TEST_F(replica_disk_test, gc_disk_useless_dir)
ASSERT_EQ(report.error_replica_count, 2);
}

TEST_F(replica_disk_test, disk_status_test)
{
int32_t node_index = 0;
struct disk_status_test
{
disk_status::type old_status;
disk_status::type new_status;
} tests[]{{disk_status::NORMAL, disk_status::NORMAL},
{disk_status::NORMAL, disk_status::SPACE_INSUFFICIENT},
{disk_status::SPACE_INSUFFICIENT, disk_status::SPACE_INSUFFICIENT},
{disk_status::SPACE_INSUFFICIENT, disk_status::NORMAL}};
for (const auto &test : tests) {
auto node = get_dir_nodes()[node_index];
mock_node_status(node_index, test.old_status, test.new_status);
update_disks_status();
for (auto &kv : node->holding_replicas) {
for (auto &pid : kv.second) {
bool flag;
ASSERT_EQ(replica_disk_space_insufficient(pid, flag), ERR_OK);
ASSERT_EQ(flag, test.new_status == disk_status::SPACE_INSUFFICIENT);
}
}
}
mock_node_status(node_index, disk_status::NORMAL, disk_status::NORMAL);
}

} // namespace replication
} // namespace dsn
Loading