diff --git a/src/include/daos/common.h b/src/include/daos/common.h
index db968ac7449..a9d93673f73 100644
--- a/src/include/daos/common.h
+++ b/src/include/daos/common.h
@@ -1,5 +1,5 @@
 /**
- * (C) Copyright 2015-2023 Intel Corporation.
+ * (C) Copyright 2015-2024 Intel Corporation.
  *
  * SPDX-License-Identifier: BSD-2-Clause-Patent
  */
diff --git a/src/object/srv_internal.h b/src/object/srv_internal.h
index a2ae1ad2d44..902eb48caf1 100644
--- a/src/object/srv_internal.h
+++ b/src/object/srv_internal.h
@@ -40,9 +40,12 @@ struct migrate_pool_tls {
	 * should provide the pool/handle uuid
	 */
	uuid_t mpt_poh_uuid;
-	uuid_t mpt_coh_uuid;
	daos_handle_t mpt_pool_hdl;
+	/* container handle list for the migrate pool */
+	uuid_t mpt_coh_uuid;
+	d_list_t mpt_cont_hdl_list;
+
	/* Container/objects to be migrated will be attached to the tree */
	daos_handle_t mpt_root_hdl;
	struct btr_root mpt_root;
@@ -66,17 +69,15 @@ struct migrate_pool_tls {
	/* Max epoch for the migration, used for migrate fetch RPC */
	uint64_t mpt_max_eph;

-	/* The ULT number generated on the xstream */
-	uint64_t mpt_generated_ult;
-
-	/* The ULT number executed on the xstream */
-	uint64_t mpt_executed_ult;
-
-	/* The ULT number generated for object on the xstream */
-	uint64_t mpt_obj_generated_ult;
+	/* The ULT number on each target xstream, which actually refers
+	 * back to the entries within the mpt_obj/dkey_ult_cnts arrays.
+	 */
+	ATOMIC uint32_t *mpt_tgt_obj_ult_cnt;
+	ATOMIC uint32_t *mpt_tgt_dkey_ult_cnt;

-	/* The ULT number executed on the xstream */
-	uint64_t mpt_obj_executed_ult;
+	/* ULT count arrays from all targets, obj: enumeration, dkey: fetch/update */
+	ATOMIC uint32_t *mpt_obj_ult_cnts;
+	ATOMIC uint32_t *mpt_dkey_ult_cnts;

	/* reference count for the structure */
	uint64_t mpt_refcount;
@@ -88,7 +89,7 @@ struct migrate_pool_tls {
	uint64_t mpt_inflight_max_size;
	ABT_cond mpt_inflight_cond;
	ABT_mutex mpt_inflight_mutex;
-	int mpt_inflight_max_ult;
+	uint32_t mpt_inflight_max_ult;
	uint32_t mpt_opc;

	ABT_cond mpt_init_cond;
@@ -104,6 +105,12 @@ struct migrate_pool_tls {
		mpt_fini:1;
 };

+struct migrate_cont_hdl {
+	uuid_t mch_uuid;
+	daos_handle_t mch_hdl;
+	d_list_t mch_list;
+};
+
 struct obj_bulk_args {
	ABT_eventual eventual;
	uint64_t bulk_size;
diff --git a/src/object/srv_obj_migrate.c b/src/object/srv_obj_migrate.c
index 4b8587cecb7..6785dca9dec 100644
--- a/src/object/srv_obj_migrate.c
+++ b/src/object/srv_obj_migrate.c
@@ -36,8 +36,8 @@
  */
 #define MIGRATE_MAX_SIZE (1 << 28)
 /* Max migrate ULT number on the server */
-#define MIGRATE_MAX_ULT 512
-
+#define MIGRATE_DEFAULT_MAX_ULT 4096
+#define ENV_MIGRATE_ULT_CNT "D_MIGRATE_ULT_CNT"
 struct migrate_one {
	daos_key_t mo_dkey;
	uint64_t mo_dkey_hash;
@@ -324,11 +324,67 @@ obj_tree_insert(daos_handle_t toh, uuid_t co_uuid, uint64_t tgt_id, daos_unit_oi
	return rc;
 }

+static int
+migrate_cont_open(struct migrate_pool_tls *tls, uuid_t cont_uuid, unsigned int flag,
+		  daos_handle_t *coh)
+{
+	struct migrate_cont_hdl *mch;
+	int rc;
+
+	d_list_for_each_entry(mch, &tls->mpt_cont_hdl_list, mch_list) {
+		if (uuid_compare(mch->mch_uuid, cont_uuid) == 0) {
+			*coh = mch->mch_hdl;
+			return 0;
+		}
+	}
+
+	rc = dsc_cont_open(tls->mpt_pool_hdl, cont_uuid, tls->mpt_coh_uuid, flag, coh);
+	if (rc) {
+		D_ERROR("dsc_cont_open failed: "DF_RC"\n", DP_RC(rc));
+		return rc;
+	}
+
+	D_ALLOC_PTR(mch);
+	if (mch == NULL) {
+		dsc_cont_close(tls->mpt_pool_hdl, *coh);
+		*coh = DAOS_HDL_INVAL;
+		return -DER_NOMEM;
+	}
+
+	d_list_add(&mch->mch_list, &tls->mpt_cont_hdl_list);
+	uuid_copy(mch->mch_uuid, cont_uuid);
+
mch->mch_hdl = *coh; + + return 0; +} + +static void +migrate_cont_close_all(struct migrate_pool_tls *tls) +{ + struct migrate_cont_hdl *mch; + struct migrate_cont_hdl *tmp; + + d_list_for_each_entry_safe(mch, tmp, &tls->mpt_cont_hdl_list, mch_list) { + dsc_cont_close(tls->mpt_pool_hdl, mch->mch_hdl); + d_list_del(&mch->mch_list); + D_FREE(mch); + } +} + void migrate_pool_tls_destroy(struct migrate_pool_tls *tls) { if (!tls) return; + + migrate_cont_close_all(tls); + if (daos_handle_is_valid(tls->mpt_pool_hdl)) + dsc_pool_close(tls->mpt_pool_hdl); + + if (tls->mpt_obj_ult_cnts) + D_FREE(tls->mpt_obj_ult_cnts); + if (tls->mpt_dkey_ult_cnts) + D_FREE(tls->mpt_dkey_ult_cnts); d_list_del(&tls->mpt_list); D_DEBUG(DB_REBUILD, "TLS destroy for "DF_UUID" ver %d\n", DP_UUID(tls->mpt_pool_uuid), tls->mpt_version); @@ -400,13 +456,14 @@ struct migrate_pool_tls_create_arg { uuid_t pool_hdl_uuid; uuid_t co_hdl_uuid; d_rank_list_t *svc_list; + ATOMIC uint32_t *obj_ult_cnts; + ATOMIC uint32_t *dkey_ult_cnts; uint64_t max_eph; unsigned int version; unsigned int generation; uint32_t opc; uint32_t new_layout_ver; - /* for sys sstream */ - bool track_status; + uint32_t max_ult_cnt; }; int @@ -430,6 +487,8 @@ migrate_pool_tls_create_one(void *data) if (pool_tls == NULL) D_GOTO(out, rc = -DER_NOMEM); + D_INIT_LIST_HEAD(&pool_tls->mpt_cont_hdl_list); + pool_tls->mpt_pool_hdl = DAOS_HDL_INVAL; D_INIT_LIST_HEAD(&pool_tls->mpt_list); rc = ABT_eventual_create(0, &pool_tls->mpt_done_eventual); @@ -452,20 +511,35 @@ migrate_pool_tls_create_one(void *data) pool_tls->mpt_rec_count = 0; pool_tls->mpt_obj_count = 0; pool_tls->mpt_size = 0; - pool_tls->mpt_generated_ult = 0; - pool_tls->mpt_executed_ult = 0; pool_tls->mpt_root_hdl = DAOS_HDL_INVAL; pool_tls->mpt_max_eph = arg->max_eph; - /* @mpt_pool is never used on system xstream */ - if (arg->track_status == false) { + pool_tls->mpt_new_layout_ver = arg->new_layout_ver; + pool_tls->mpt_opc = arg->opc; + if (dss_get_module_info()->dmi_xs_id == 0) { + int i; + + pool_tls->mpt_inflight_max_size = MIGRATE_MAX_SIZE; + pool_tls->mpt_inflight_max_ult = arg->max_ult_cnt; + D_ALLOC_ARRAY(pool_tls->mpt_obj_ult_cnts, dss_tgt_nr); + D_ALLOC_ARRAY(pool_tls->mpt_dkey_ult_cnts, dss_tgt_nr); + if (pool_tls->mpt_obj_ult_cnts == NULL || pool_tls->mpt_dkey_ult_cnts == NULL) + D_GOTO(out, rc = -DER_NOMEM); + for (i = 0; i < dss_tgt_nr; i++) { + atomic_init(&pool_tls->mpt_obj_ult_cnts[i], 0); + atomic_init(&pool_tls->mpt_dkey_ult_cnts[i], 0); + } + } else { + int tgt_id = dss_get_module_info()->dmi_tgt_id; + pool_tls->mpt_pool = ds_pool_child_lookup(arg->pool_uuid); if (pool_tls->mpt_pool == NULL) D_GOTO(out, rc = -DER_NO_HDL); + pool_tls->mpt_inflight_max_size = MIGRATE_MAX_SIZE / dss_tgt_nr; + pool_tls->mpt_inflight_max_ult = arg->max_ult_cnt / dss_tgt_nr; + pool_tls->mpt_tgt_obj_ult_cnt = &arg->obj_ult_cnts[tgt_id]; + pool_tls->mpt_tgt_dkey_ult_cnt = &arg->dkey_ult_cnts[tgt_id]; } - pool_tls->mpt_new_layout_ver = arg->new_layout_ver; - pool_tls->mpt_opc = arg->opc; - pool_tls->mpt_inflight_max_size = MIGRATE_MAX_SIZE; - pool_tls->mpt_inflight_max_ult = MIGRATE_MAX_ULT; + pool_tls->mpt_inflight_size = 0; pool_tls->mpt_refcount = 1; if (arg->svc_list) { @@ -496,6 +570,7 @@ migrate_pool_tls_lookup_create(struct ds_pool *pool, unsigned int version, unsig daos_prop_t *prop = NULL; struct daos_prop_entry *entry; int rc = 0; + uint32_t max_migrate_ult = MIGRATE_DEFAULT_MAX_ULT; D_ASSERT(dss_get_module_info()->dmi_xs_id == 0); tls = migrate_pool_tls_lookup(pool->sp_uuid, version, generation); 
@@ -516,6 +591,7 @@ migrate_pool_tls_lookup_create(struct ds_pool *pool, unsigned int version, unsig
		return rc;
	}

+	d_getenv_uint(ENV_MIGRATE_ULT_CNT, &max_migrate_ult);
	D_ASSERT(generation != (unsigned int)(-1));
	uuid_copy(arg.pool_uuid, pool->sp_uuid);
	uuid_copy(arg.pool_hdl_uuid, pool_hdl_uuid);
@@ -525,11 +601,12 @@ migrate_pool_tls_lookup_create(struct ds_pool *pool, unsigned int version, unsig
	arg.max_eph = max_eph;
	arg.new_layout_ver = new_layout_ver;
	arg.generation = generation;
+	arg.max_ult_cnt = max_migrate_ult;
+
	/*
	 * dss_task_collective does not do collective on sys xstrem,
	 * sys xstream need some information to track rebuild status.
	 */
-	arg.track_status = true;
	rc = migrate_pool_tls_create_one(&arg);
	if (rc)
		D_GOTO(out, rc);
@@ -558,7 +635,8 @@ migrate_pool_tls_lookup_create(struct ds_pool *pool, unsigned int version, unsig
	entry = daos_prop_entry_get(prop, DAOS_PROP_PO_SVC_LIST);
	D_ASSERT(entry != NULL);
	arg.svc_list = (d_rank_list_t *)entry->dpe_val_ptr;
-	arg.track_status = false;
+	arg.obj_ult_cnts = tls->mpt_obj_ult_cnts;
+	arg.dkey_ult_cnts = tls->mpt_dkey_ult_cnts;
	rc = dss_task_collective(migrate_pool_tls_create_one, &arg, 0);
	if (rc != 0) {
		D_ERROR(DF_UUID": failed to create migrate tls: "DF_RC"\n",
@@ -647,6 +725,29 @@ mrone_recx_vos2_daos(struct migrate_one *mrone, int shard, daos_iod_t *iods, int
	mrone_recx_daos_vos_internal(mrone, false, shard, iods, iods_num);
 }

+static int
+mrone_obj_fetch_internal(struct migrate_one *mrone, daos_handle_t oh, d_sg_list_t *sgls,
+			 daos_iod_t *iods, int iod_num, daos_epoch_t eph, uint32_t flags,
+			 d_iov_t *csum_iov_fetch, struct migrate_pool_tls *tls)
+{
+	int rc;
+
+retry:
+	rc = dsc_obj_fetch(oh, eph, &mrone->mo_dkey, iod_num, iods, sgls,
+			   NULL, flags, NULL, csum_iov_fetch);
+	if (rc == -DER_TIMEDOUT &&
+	    tls->mpt_version + 1 >= tls->mpt_pool->spc_map_version) {
+		/* If the pool map has not changed, let's retry on timeout
+		 * instead of failing out.
+		 */
+		D_WARN(DF_UUID" retry "DF_UOID" "DF_RC"\n",
+		       DP_UUID(tls->mpt_pool_uuid), DP_UOID(mrone->mo_oid), DP_RC(rc));
+		D_GOTO(retry, rc);
+	}
+
+	return rc;
+}
+
 static int
 mrone_obj_fetch(struct migrate_one *mrone, daos_handle_t oh, d_sg_list_t *sgls,
		daos_iod_t *iods, int iod_num, daos_epoch_t eph, uint32_t flags,
@@ -666,9 +767,8 @@ mrone_obj_fetch(struct migrate_one *mrone, daos_handle_t oh, d_sg_list_t *sgls,
	if (daos_oclass_grp_size(&mrone->mo_oca) > 1)
		flags |= DIOF_TO_LEADER;

-	rc = dsc_obj_fetch(oh, eph, &mrone->mo_dkey,
-			   iod_num, iods, sgls, NULL,
-			   flags, NULL, csum_iov_fetch);
+	rc = mrone_obj_fetch_internal(mrone, oh, sgls, iods, iod_num, eph,
+				      flags, csum_iov_fetch, tls);
	if (rc != 0)
		D_GOTO(out, rc);

@@ -687,8 +787,8 @@ mrone_obj_fetch(struct migrate_one *mrone, daos_handle_t oh, d_sg_list_t *sgls,
		csum_iov_fetch->iov_len = 0;
		csum_iov_fetch->iov_buf = p;

-		rc = dsc_obj_fetch(oh, eph, &mrone->mo_dkey, iod_num, iods, sgls,
-				   NULL, flags, NULL, csum_iov_fetch);
+		rc = mrone_obj_fetch_internal(mrone, oh, sgls, iods, iod_num,
+					      eph, flags, csum_iov_fetch, tls);
	}

 out:
@@ -1623,7 +1723,6 @@ migrate_dkey(struct migrate_pool_tls *tls, struct migrate_one *mrone,
 {
	struct ds_cont_child *cont = NULL;
	struct cont_props props;
-	daos_handle_t poh = DAOS_HDL_INVAL;
	daos_handle_t coh = DAOS_HDL_INVAL;
	daos_handle_t oh = DAOS_HDL_INVAL;
	int rc;
@@ -1635,20 +1734,19 @@ migrate_dkey(struct migrate_pool_tls *tls, struct migrate_one *mrone,

	rc = dsc_pool_open(tls->mpt_pool_uuid, tls->mpt_poh_uuid, 0, NULL,
			   tls->mpt_pool->spc_pool->sp_map,
-			   &tls->mpt_svc_list, &poh);
+			   &tls->mpt_svc_list, &tls->mpt_pool_hdl);
	if (rc)
		D_GOTO(cont_put, rc);

	/* Open client dc handle used to read the remote object data */
-	rc = dsc_cont_open(poh, mrone->mo_cont_uuid, tls->mpt_coh_uuid, 0,
-			   &coh);
+	rc = migrate_cont_open(tls, mrone->mo_cont_uuid, 0, &coh);
	if (rc)
-		D_GOTO(pool_close, rc);
+		D_GOTO(cont_put, rc);

	/* Open the remote object */
	rc = dsc_obj_open(coh, mrone->mo_oid.id_pub, DAOS_OO_RO, &oh);
	if (rc)
-		D_GOTO(cont_close, rc);
+		D_GOTO(cont_put, rc);

	if (DAOS_FAIL_CHECK(DAOS_REBUILD_TGT_NOSPACE))
		D_GOTO(obj_close, rc = -DER_NOSPACE);
@@ -1708,10 +1806,6 @@ migrate_dkey(struct migrate_pool_tls *tls, struct migrate_one *mrone,
	tls->mpt_size += mrone->mo_size;
 obj_close:
	dsc_obj_close(oh);
-cont_close:
-	dsc_cont_close(poh, coh);
-pool_close:
-	dsc_pool_close(poh);
 cont_put:
	if (cont != NULL)
		ds_cont_child_put(cont);
@@ -1766,6 +1860,129 @@ migrate_one_destroy(struct migrate_one *mrone)
	D_FREE(mrone);
 }

+enum {
+	OBJ_ULT = 1,
+	DKEY_ULT = 2,
+};
+
+/* Check if there are enough resources for the migration to proceed. */
+static int
+migrate_system_enter(struct migrate_pool_tls *tls, int tgt_idx)
+{
+	uint32_t tgt_cnt = 0;
+	int rc = 0;
+
+	D_ASSERT(dss_get_module_info()->dmi_xs_id == 0);
+	D_ASSERTF(tgt_idx < dss_tgt_nr, "tgt idx %d tgt nr %u\n", tgt_idx, dss_tgt_nr);
+
+	tgt_cnt = atomic_load(&tls->mpt_obj_ult_cnts[tgt_idx]) +
+		  atomic_load(&tls->mpt_dkey_ult_cnts[tgt_idx]);
+
+	while ((tls->mpt_inflight_max_ult / dss_tgt_nr) <= tgt_cnt) {
+		D_DEBUG(DB_REBUILD, "tgt%d:%u max %u\n",
+			tgt_idx, tgt_cnt, tls->mpt_inflight_max_ult / dss_tgt_nr);
+		ABT_mutex_lock(tls->mpt_inflight_mutex);
+		ABT_cond_wait(tls->mpt_inflight_cond, tls->mpt_inflight_mutex);
+		ABT_mutex_unlock(tls->mpt_inflight_mutex);
+		if (tls->mpt_fini)
+			D_GOTO(out, rc = -DER_SHUTDOWN);
+
+		tgt_cnt = atomic_load(&tls->mpt_obj_ult_cnts[tgt_idx]) +
+			  atomic_load(&tls->mpt_dkey_ult_cnts[tgt_idx]);
+	}
+
+	atomic_fetch_add(&tls->mpt_obj_ult_cnts[tgt_idx], 1);
+out:
+	return rc;
+}
+
+static int
+migrate_tgt_enter(struct migrate_pool_tls *tls)
+{
+	uint32_t dkey_cnt = 0;
+	int rc = 0;
+
+	D_ASSERT(dss_get_module_info()->dmi_xs_id != 0);
+
+	dkey_cnt = atomic_load(tls->mpt_tgt_dkey_ult_cnt);
+	while (tls->mpt_inflight_max_ult / 2 <= dkey_cnt) {
+		D_DEBUG(DB_REBUILD, "dkey cnt %u max %u\n", dkey_cnt, tls->mpt_inflight_max_ult);
+
+		ABT_mutex_lock(tls->mpt_inflight_mutex);
+		ABT_cond_wait(tls->mpt_inflight_cond, tls->mpt_inflight_mutex);
+		ABT_mutex_unlock(tls->mpt_inflight_mutex);
+		if (tls->mpt_fini)
+			D_GOTO(out, rc = -DER_SHUTDOWN);
+
+		dkey_cnt = atomic_load(tls->mpt_tgt_dkey_ult_cnt);
+	}
+
+	atomic_fetch_add(tls->mpt_tgt_dkey_ult_cnt, 1);
+out:
+	return rc;
+}
+
+static void
+migrate_system_try_wakeup(struct migrate_pool_tls *tls)
+{
+	bool wakeup = false;
+	int i;
+
+	D_ASSERT(dss_get_module_info()->dmi_xs_id == 0);
+	for (i = 0; i < dss_tgt_nr; i++) {
+		uint32_t total_cnt;
+
+		total_cnt = atomic_load(&tls->mpt_obj_ult_cnts[i]) +
+			    atomic_load(&tls->mpt_dkey_ult_cnts[i]);
+		if (tls->mpt_inflight_max_ult / dss_tgt_nr > total_cnt)
+			wakeup = true;
+	}
+
+	if (wakeup) {
+		ABT_mutex_lock(tls->mpt_inflight_mutex);
+		ABT_cond_broadcast(tls->mpt_inflight_cond);
+		ABT_mutex_unlock(tls->mpt_inflight_mutex);
+	}
+}
+
+static void
+migrate_system_exit(struct migrate_pool_tls *tls, unsigned int tgt_idx)
+{
+	/* NB: this will only be called during error handling. In the normal
+	 * case the migrate ULT created by the system XS will exit on each
+	 * target XS.
+ */ + D_ASSERT(dss_get_module_info()->dmi_xs_id == 0); + atomic_fetch_sub(&tls->mpt_obj_ult_cnts[tgt_idx], 1); + migrate_system_try_wakeup(tls); +} + +static void +migrate_tgt_try_wakeup(struct migrate_pool_tls *tls) +{ + uint32_t dkey_cnt = 0; + + D_ASSERT(dss_get_module_info()->dmi_xs_id != 0); + dkey_cnt = atomic_load(tls->mpt_tgt_dkey_ult_cnt); + if (tls->mpt_inflight_max_ult / 2 > dkey_cnt) { + ABT_mutex_lock(tls->mpt_inflight_mutex); + ABT_cond_broadcast(tls->mpt_inflight_cond); + ABT_mutex_unlock(tls->mpt_inflight_mutex); + } +} + +static void +migrate_tgt_exit(struct migrate_pool_tls *tls, int ult_type) +{ + D_ASSERT(dss_get_module_info()->dmi_xs_id != 0); + if (ult_type == OBJ_ULT) { + atomic_fetch_sub(tls->mpt_tgt_obj_ult_cnt, 1); + return; + } + + atomic_fetch_sub(tls->mpt_tgt_dkey_ult_cnt, 1); + migrate_tgt_try_wakeup(tls); +} + static void migrate_one_ult(void *arg) { @@ -1813,10 +2030,6 @@ migrate_one_ult(void *arg) rc = migrate_dkey(tls, mrone, data_size); tls->mpt_inflight_size -= data_size; - ABT_mutex_lock(tls->mpt_inflight_mutex); - ABT_cond_broadcast(tls->mpt_inflight_cond); - ABT_mutex_unlock(tls->mpt_inflight_mutex); - D_DEBUG(DB_REBUILD, DF_UOID" layout %u migrate dkey "DF_KEY" inflight_size "DF_U64": " DF_RC"\n", DP_UOID(mrone->mo_oid), mrone->mo_oid.id_layout_ver, DP_KEY(&mrone->mo_dkey), tls->mpt_inflight_size, DP_RC(rc)); @@ -1834,7 +2047,7 @@ migrate_one_ult(void *arg) out: migrate_one_destroy(mrone); if (tls != NULL) { - tls->mpt_executed_ult++; + migrate_tgt_exit(tls, DKEY_ULT); migrate_pool_tls_put(tls); } } @@ -2582,14 +2795,17 @@ migrate_start_ult(struct enum_unpack_arg *unpack_arg) DP_KEY(&mrone->mo_dkey), arg->tgt_idx, mrone->mo_iod_num); + rc = migrate_tgt_enter(tls); + if (rc) + break; d_list_del_init(&mrone->mo_list); rc = dss_ult_create(migrate_one_ult, mrone, DSS_XS_VOS, arg->tgt_idx, MIGRATE_STACK_SIZE, NULL); if (rc) { + migrate_tgt_exit(tls, DKEY_ULT); migrate_one_destroy(mrone); break; } - tls->mpt_generated_ult++; } put: @@ -2622,13 +2838,11 @@ migrate_one_epoch_object(daos_epoch_range_t *epr, struct migrate_pool_tls *tls, struct enum_unpack_arg unpack_arg = { 0 }; d_iov_t iov = { 0 }; d_sg_list_t sgl = { 0 }; - daos_handle_t poh = DAOS_HDL_INVAL; daos_handle_t coh = DAOS_HDL_INVAL; daos_handle_t oh = DAOS_HDL_INVAL; uint32_t minimum_nr; uint32_t enum_flags; uint32_t num; - int rc1; int rc = 0; D_DEBUG(DB_REBUILD, "migrate obj "DF_UOID" for shard %u eph " @@ -2642,18 +2856,19 @@ migrate_one_epoch_object(daos_epoch_range_t *epr, struct migrate_pool_tls *tls, } D_ASSERT(dss_get_module_info()->dmi_xs_id != 0); + rc = dsc_pool_open(tls->mpt_pool_uuid, tls->mpt_poh_uuid, 0, NULL, tls->mpt_pool->spc_pool->sp_map, - &tls->mpt_svc_list, &poh); + &tls->mpt_svc_list, &tls->mpt_pool_hdl); if (rc) { D_ERROR("dsc_pool_open failed: "DF_RC"\n", DP_RC(rc)); D_GOTO(out, rc); } - rc = dsc_cont_open(poh, arg->cont_uuid, tls->mpt_coh_uuid, 0, &coh); + rc = migrate_cont_open(tls, arg->cont_uuid, 0, &coh); if (rc) { - D_ERROR("dsc_cont_open failed: "DF_RC"\n", DP_RC(rc)); - D_GOTO(out_pool, rc); + D_ERROR("migrate_cont_open failed: "DF_RC"\n", DP_RC(rc)); + D_GOTO(out, rc); } /* Only open with RW flag, reintegrating flag will be set, which is needed @@ -2662,7 +2877,7 @@ migrate_one_epoch_object(daos_epoch_range_t *epr, struct migrate_pool_tls *tls, rc = dsc_obj_open(coh, arg->oid.id_pub, DAOS_OO_RO, &oh); if (rc) { D_ERROR("dsc_obj_open failed: "DF_RC"\n", DP_RC(rc)); - D_GOTO(out_cont, rc); + D_GOTO(out, rc); } unpack_arg.arg = arg; @@ -2678,7 +2893,7 @@ 
migrate_one_epoch_object(daos_epoch_range_t *epr, struct migrate_pool_tls *tls,
	if (rc) {
		D_ERROR("Unknown object class: %u\n",
			daos_obj_id2class(arg->oid.id_pub));
-		D_GOTO(out_cont, rc);
+		D_GOTO(out_obj, rc);
	}

	memset(&anchor, 0, sizeof(anchor));
@@ -2795,6 +3010,18 @@ migrate_one_epoch_object(daos_epoch_range_t *epr, struct migrate_pool_tls *tls,
			rc = 0;
			continue;
		} else if (rc) {
+			/* To avoid reclaiming and retrying the whole rebuild,
+			 * let's keep retrying until the pool map is changed by
+			 * a further failure.
+			 */
+			if (rc == -DER_TIMEDOUT &&
+			    tls->mpt_version + 1 >= tls->mpt_pool->spc_map_version) {
+				D_WARN(DF_UUID" retry "DF_UOID" "DF_RC"\n",
+				       DP_UUID(tls->mpt_pool_uuid), DP_UOID(arg->oid),
+				       DP_RC(rc));
+				rc = 0;
+				continue;
+			}
+
			/* container might have been destroyed. Or there is
			 * no spare target left for this object see
			 * obj_grp_valid_shard_get()
@@ -2810,9 +3037,8 @@ migrate_one_epoch_object(daos_epoch_range_t *epr, struct migrate_pool_tls *tls,
				num = 0;
				rc = 0;
			}
-
-			D_DEBUG(DB_REBUILD, "Can not rebuild "DF_UOID" "DF_RC"\n",
-				DP_UOID(arg->oid), DP_RC(rc));
+			D_DEBUG(DB_REBUILD, "Can not rebuild "DF_UOID" "DF_RC" mpt %u spc %u\n",
+				DP_UOID(arg->oid), DP_RC(rc), tls->mpt_version,
+				tls->mpt_pool->spc_map_version);
			break;
		}
@@ -2852,15 +3078,8 @@ migrate_one_epoch_object(daos_epoch_range_t *epr, struct migrate_pool_tls *tls,

	if (csum.iov_buf != NULL && csum.iov_buf != stack_csum_buf)
		D_FREE(csum.iov_buf);
-
+out_obj:
	dsc_obj_close(oh);
-out_cont:
-	rc1 = dsc_cont_close(poh, coh);
-	if (rc1)
-		D_WARN(DF_UUID" container "DF_UUID" close failure: "DF_RC"\n",
-		       DP_UUID(tls->mpt_pool_uuid), DP_UUID(tls->mpt_coh_uuid), DP_RC(rc1));
-out_pool:
-	dsc_pool_close(poh);
 out:
	D_DEBUG(DB_REBUILD, "obj "DF_UOID" for shard %u eph "
		DF_U64"-"DF_U64": "DF_RC"\n", DP_UOID(arg->oid), arg->shard,
@@ -2976,15 +3195,15 @@ migrate_obj_ult(void *data)
 {
	struct iter_obj_arg *arg = data;
	struct migrate_pool_tls *tls = NULL;
-	daos_epoch_range_t epr;
-	int i;
-	int rc = 0;
+	daos_epoch_range_t epr;
+	int i;
+	int rc = 0;

	tls = migrate_pool_tls_lookup(arg->pool_uuid, arg->version, arg->generation);
	if (tls == NULL || tls->mpt_fini) {
		D_WARN("some one abort the rebuild "DF_UUID"\n",
		       DP_UUID(arg->pool_uuid));
-		D_GOTO(free_notls, rc = 0);
+		D_GOTO(free_notls, rc);
	}

	/* Only reintegrating targets/pool needs to discard the object,
@@ -3040,7 +3259,6 @@ migrate_obj_ult(void *data)

	if (arg->epoch == DAOS_EPOCH_MAX)
		tls->mpt_obj_count++;
-	tls->mpt_obj_executed_ult++;

	if (rc == -DER_NONEXIST) {
		struct ds_cont_child *cont_child = NULL;
@@ -3056,16 +3274,20 @@ migrate_obj_ult(void *data)
	if (DAOS_FAIL_CHECK(DAOS_REBUILD_OBJ_FAIL) &&
	    tls->mpt_obj_count >= daos_fail_value_get())
		rc = -DER_IO;
+
 out:
	if (tls->mpt_status == 0 && rc < 0)
		tls->mpt_status = rc;

	D_DEBUG(DB_REBUILD, ""DF_UUID"/%u stop migrate obj "DF_UOID
-		" for shard %u executed "DF_U64"/"DF_U64" : " DF_RC"\n",
+		" for shard %u ult %u/%u "DF_U64" : " DF_RC"\n",
		DP_UUID(tls->mpt_pool_uuid), tls->mpt_version,
-		DP_UOID(arg->oid), arg->shard, tls->mpt_obj_executed_ult, tls->mpt_obj_count,
-		DP_RC(rc));
+		DP_UOID(arg->oid), arg->shard, atomic_load(tls->mpt_tgt_obj_ult_cnt),
+		atomic_load(tls->mpt_tgt_dkey_ult_cnt), tls->mpt_obj_count, DP_RC(rc));

 free_notls:
+	if (tls != NULL)
+		migrate_tgt_exit(tls, OBJ_ULT);
+
	D_FREE(arg->snaps);
	D_FREE(arg);
	migrate_pool_tls_put(tls);
@@ -3117,14 +3339,12 @@ migrate_one_object(daos_unit_oid_t oid, daos_epoch_t eph, daos_epoch_t punched_e
			       sizeof(*obj_arg->snaps) * cont_arg->snap_cnt);
	}

+	/* Let's iterate the object on a different xstream
*/ rc = dss_ult_create(migrate_obj_ult, obj_arg, DSS_XS_VOS, - tgt_idx, MIGRATE_STACK_SIZE, - NULL); + tgt_idx, MIGRATE_STACK_SIZE, NULL); if (rc) goto free; - tls->mpt_obj_generated_ult++; - val.epoch = eph; val.shard = shard; val.tgt_idx = tgt_idx; @@ -3132,19 +3352,19 @@ migrate_one_object(daos_unit_oid_t oid, daos_epoch_t eph, daos_epoch_t punched_e d_iov_set(&val_iov, &val, sizeof(struct migrate_obj_val)); rc = obj_tree_insert(toh, cont_arg->cont_uuid, -1, oid, &val_iov); D_DEBUG(DB_REBUILD, "Insert "DF_UUID"/"DF_UUID"/"DF_UOID": ver %u " - "generated "DF_U64" "DF_RC"\n", DP_UUID(tls->mpt_pool_uuid), + "ult %u/%u "DF_RC"\n", DP_UUID(tls->mpt_pool_uuid), DP_UUID(cont_arg->cont_uuid), DP_UOID(oid), tls->mpt_version, - tls->mpt_obj_generated_ult, DP_RC(rc)); + atomic_load(&tls->mpt_obj_ult_cnts[tgt_idx]), + atomic_load(&tls->mpt_dkey_ult_cnts[tgt_idx]), DP_RC(rc)); return 0; - free: D_FREE(obj_arg->snaps); D_FREE(obj_arg); return rc; } -#define DEFAULT_YIELD_FREQ 128 +#define DEFAULT_YIELD_FREQ 16 static int migrate_obj_iter_cb(daos_handle_t ih, d_iov_t *key_iov, d_iov_t *val_iov, void *data) @@ -3165,10 +3385,17 @@ migrate_obj_iter_cb(daos_handle_t ih, d_iov_t *key_iov, d_iov_t *val_iov, void * " eph "DF_U64" start\n", DP_UUID(arg->cont_uuid), DP_UOID(*oid), ih.cookie, epoch); + rc = migrate_system_enter(arg->pool_tls, tgt_idx); + if (rc != 0) { + DL_ERROR(rc, DF_UUID" enter migrate failed.", DP_UUID(arg->cont_uuid)); + return rc; + } + rc = migrate_one_object(*oid, epoch, punched_epoch, shard, tgt_idx, arg); if (rc != 0) { D_ERROR("obj "DF_UOID" migration failed: "DF_RC"\n", DP_UOID(*oid), DP_RC(rc)); + migrate_system_exit(arg->pool_tls, tgt_idx); return rc; } @@ -3180,7 +3407,7 @@ migrate_obj_iter_cb(daos_handle_t ih, d_iov_t *key_iov, d_iov_t *val_iov, void * if (--arg->yield_freq == 0) { arg->yield_freq = DEFAULT_YIELD_FREQ; - ABT_thread_yield(); + dss_sleep(0); } /* re-probe the dbtree after deletion */ @@ -3250,34 +3477,6 @@ migrate_cont_iter_cb(daos_handle_t ih, d_iov_t *key_iov, arg.pool_tls = tls; uuid_copy(arg.cont_uuid, cont_uuid); while (!dbtree_is_empty(root->root_hdl)) { - uint64_t ult_cnt; - - D_ASSERT(tls->mpt_obj_generated_ult >= - tls->mpt_obj_executed_ult); - D_ASSERT(tls->mpt_generated_ult >= tls->mpt_executed_ult); - - ult_cnt = max(tls->mpt_obj_generated_ult - - tls->mpt_obj_executed_ult, - tls->mpt_generated_ult - - tls->mpt_executed_ult); - - while (ult_cnt >= tls->mpt_inflight_max_ult && !tls->mpt_fini) { - ABT_mutex_lock(tls->mpt_inflight_mutex); - ABT_cond_wait(tls->mpt_inflight_cond, - tls->mpt_inflight_mutex); - ABT_mutex_unlock(tls->mpt_inflight_mutex); - ult_cnt = max(tls->mpt_obj_generated_ult - - tls->mpt_obj_executed_ult, - tls->mpt_generated_ult - - tls->mpt_executed_ult); - D_DEBUG(DB_REBUILD, "obj "DF_U64"/"DF_U64", key" - DF_U64"/"DF_U64" "DF_U64"\n", - tls->mpt_obj_generated_ult, - tls->mpt_obj_executed_ult, - tls->mpt_generated_ult, - tls->mpt_executed_ult, ult_cnt); - } - if (tls->mpt_fini) break; @@ -3533,7 +3732,6 @@ ds_obj_migrate_handler(crt_rpc_t *rpc) uuid_t co_hdl_uuid; struct ds_pool *pool = NULL; uint32_t rebuild_ver; - uint32_t rebuild_gen; int rc; migrate_in = crt_req_get(rpc); @@ -3576,11 +3774,12 @@ ds_obj_migrate_handler(crt_rpc_t *rpc) D_GOTO(out, rc); } - ds_rebuild_running_query(migrate_in->om_pool_uuid, -1, &rebuild_ver, NULL, &rebuild_gen); - if (rebuild_ver == 0 || rebuild_gen != migrate_in->om_generation) { - D_ERROR(DF_UUID" rebuild service has been stopped.\n", - DP_UUID(migrate_in->om_pool_uuid)); - D_GOTO(out, rc = 
-DER_SHUTDOWN);
+	ds_rebuild_running_query(migrate_in->om_pool_uuid, -1, &rebuild_ver, NULL, NULL);
+	if (rebuild_ver == 0 || rebuild_ver != migrate_in->om_version) {
+		rc = -DER_SHUTDOWN;
+		DL_ERROR(rc, DF_UUID" rebuild ver %u om version %u",
+			 DP_UUID(migrate_in->om_pool_uuid), rebuild_ver, migrate_in->om_version);
+		D_GOTO(out, rc);
	}

	rc = ds_migrate_object(pool, po_hdl_uuid, co_hdl_uuid, co_uuid, migrate_in->om_version,
@@ -3600,11 +3799,8 @@ struct migrate_query_arg {
	uuid_t pool_uuid;
	ABT_mutex status_lock;
	struct ds_migrate_status dms;
-	uint32_t obj_generated_ult;
-	uint32_t obj_executed_ult;
-	uint32_t generated_ult;
-	uint32_t executed_ult;
	uint32_t version;
+	uint32_t total_ult_cnt;
	uint32_t generation;
 };

@@ -3622,17 +3818,15 @@ migrate_check_one(void *data)
	arg->dms.dm_rec_count += tls->mpt_rec_count;
	arg->dms.dm_obj_count += tls->mpt_obj_count;
	arg->dms.dm_total_size += tls->mpt_size;
-	arg->obj_generated_ult += tls->mpt_obj_generated_ult;
-	arg->obj_executed_ult += tls->mpt_obj_executed_ult;
-	arg->generated_ult += tls->mpt_generated_ult;
-	arg->executed_ult += tls->mpt_executed_ult;
	if (arg->dms.dm_status == 0)
		arg->dms.dm_status = tls->mpt_status;
+	arg->total_ult_cnt += atomic_load(tls->mpt_tgt_obj_ult_cnt) +
+			      atomic_load(tls->mpt_tgt_dkey_ult_cnt);
	ABT_mutex_unlock(arg->status_lock);
-
-	D_DEBUG(DB_REBUILD, "status %d/%d rec/obj/size "
+	D_DEBUG(DB_REBUILD, "status %d/%d ult %u/%u rec/obj/size "
		DF_U64"/"DF_U64"/"DF_U64"\n", tls->mpt_status,
-		arg->dms.dm_status, tls->mpt_rec_count,
+		arg->dms.dm_status, atomic_load(tls->mpt_tgt_obj_ult_cnt),
+		atomic_load(tls->mpt_tgt_dkey_ult_cnt), tls->mpt_rec_count,
		tls->mpt_obj_count, tls->mpt_size);

	migrate_pool_tls_put(tls);
@@ -3662,32 +3856,21 @@ ds_migrate_query_status(uuid_t pool_uuid, uint32_t ver, unsigned int generation,
	if (rc)
		D_GOTO(out, rc);

-	/**
-	 * The object ULT is generated by 0 xstream, and dss_collective does not
-	 * do collective on 0 xstream
-	 **/
-	arg.obj_generated_ult += tls->mpt_obj_generated_ult;
-	tls->mpt_obj_executed_ult = arg.obj_executed_ult;
-	tls->mpt_generated_ult = arg.generated_ult;
-	tls->mpt_executed_ult = arg.executed_ult;
-	*dms = arg.dms;
-	if (arg.obj_generated_ult > arg.obj_executed_ult ||
-	    arg.generated_ult > arg.executed_ult || tls->mpt_ult_running)
-		dms->dm_migrating = 1;
+	if (arg.total_ult_cnt > 0 || tls->mpt_ult_running)
+		arg.dms.dm_migrating = 1;
	else
-		dms->dm_migrating = 0;
+		arg.dms.dm_migrating = 0;

-	ABT_mutex_lock(tls->mpt_inflight_mutex);
-	ABT_cond_broadcast(tls->mpt_inflight_cond);
-	ABT_mutex_unlock(tls->mpt_inflight_mutex);
+	if (dms != NULL)
+		*dms = arg.dms;

+	migrate_system_try_wakeup(tls);
	D_DEBUG(DB_REBUILD, "pool "DF_UUID" ver %u migrating=%s,"
		" obj_count="DF_U64", rec_count="DF_U64
-		" size="DF_U64" obj %u/%u general %u/%u status %d\n",
-		DP_UUID(pool_uuid), ver, dms->dm_migrating ? "yes" : "no",
-		dms->dm_obj_count, dms->dm_rec_count, dms->dm_total_size,
-		arg.obj_generated_ult, arg.obj_executed_ult,
-		arg.generated_ult, arg.executed_ult, dms->dm_status);
+		" size="DF_U64" ult cnt %u status %d\n",
+		DP_UUID(pool_uuid), ver, arg.dms.dm_migrating ?
"yes" : "no", + arg.dms.dm_obj_count, arg.dms.dm_rec_count, arg.dms.dm_total_size, + arg.total_ult_cnt, arg.dms.dm_status); out: ABT_mutex_free(&arg.status_lock); migrate_pool_tls_put(tls); diff --git a/src/rebuild/rebuild_iv.c b/src/rebuild/rebuild_iv.c index 84859f30344..cc585e037e1 100644 --- a/src/rebuild/rebuild_iv.c +++ b/src/rebuild/rebuild_iv.c @@ -1,5 +1,5 @@ /** - * (C) Copyright 2017-2023 Intel Corporation. + * (C) Copyright 2017-2024 Intel Corporation. * * SPDX-License-Identifier: BSD-2-Clause-Patent */ @@ -98,8 +98,9 @@ rebuild_iv_ent_update(struct ds_iv_entry *entry, struct ds_iv_key *key, d_rank_t rank; int rc; - D_DEBUG(DB_REBUILD, "rank %d master rank %d\n", src_iv->riv_rank, - src_iv->riv_master_rank); + D_DEBUG(DB_REBUILD, "rank %d master rank %d term "DF_U64" gen %u dtx resync %u\n", + src_iv->riv_rank, src_iv->riv_master_rank, src_iv->riv_leader_term, + src_iv->riv_rebuild_gen, src_iv->riv_dtx_resyc_version); if (src_iv->riv_master_rank == -1) return -DER_NOTLEADER; diff --git a/src/rebuild/scan.c b/src/rebuild/scan.c index 271b3b689fa..5468aad02de 100644 --- a/src/rebuild/scan.c +++ b/src/rebuild/scan.c @@ -1117,21 +1117,31 @@ rebuild_tgt_scan_handler(crt_rpc_t *rpc) */ d_list_for_each_entry(rpt, &rebuild_gst.rg_tgt_tracker_list, rt_list) { if (uuid_compare(rpt->rt_pool_uuid, rsi->rsi_pool_uuid) == 0 && - ((rpt->rt_rebuild_ver < rsi->rsi_rebuild_ver) || - (rpt->rt_rebuild_op == rsi->rsi_rebuild_op && - rpt->rt_rebuild_ver == rsi->rsi_rebuild_ver && - rpt->rt_rebuild_gen < rsi->rsi_rebuild_gen))) { + rpt->rt_rebuild_ver < rsi->rsi_rebuild_ver && + rpt->rt_rebuild_op == rsi->rsi_rebuild_op) { D_INFO(DF_UUID" %p %s %u/"DF_U64"/%u < incoming rebuild %u/"DF_U64"/%u\n", DP_UUID(rpt->rt_pool_uuid), rpt, RB_OP_STR(rpt->rt_rebuild_op), rpt->rt_rebuild_ver, rpt->rt_leader_term, rpt->rt_rebuild_gen, rsi->rsi_rebuild_ver, rsi->rsi_leader_term, rsi->rsi_rebuild_gen); rpt->rt_abort = 1; + if (rpt->rt_leader_rank != rsi->rsi_master_rank) { + D_DEBUG(DB_REBUILD, DF_UUID" master rank" + " %d -> %d term "DF_U64" -> "DF_U64"\n", + DP_UUID(rpt->rt_pool_uuid), + rpt->rt_leader_rank, rsi->rsi_master_rank, + rpt->rt_leader_term, rsi->rsi_leader_term); + /* If this is the old leader, then also stop the rebuild + * tracking ULT. + */ + rebuild_leader_stop(rsi->rsi_pool_uuid, rsi->rsi_rebuild_ver, + -1, rpt->rt_leader_term); + } } } /* check if the rebuild with different leader is already started */ - rpt = rpt_lookup(rsi->rsi_pool_uuid, -1, rsi->rsi_rebuild_ver, rsi->rsi_rebuild_gen); - if (rpt != NULL) { + rpt = rpt_lookup(rsi->rsi_pool_uuid, -1, rsi->rsi_rebuild_ver, -1); + if (rpt != NULL && rpt->rt_rebuild_op == rsi->rsi_rebuild_op) { if (rpt->rt_global_done) { D_WARN("the previous rebuild "DF_UUID"/%d/"DF_U64"/%p is not cleanup yet\n", DP_UUID(rsi->rsi_pool_uuid), rsi->rsi_rebuild_ver, @@ -1167,12 +1177,15 @@ rebuild_tgt_scan_handler(crt_rpc_t *rpc) /* If this is the old leader, then also stop the rebuild tracking ULT. 
		 */
		rebuild_leader_stop(rsi->rsi_pool_uuid, rsi->rsi_rebuild_ver,
-				    rsi->rsi_rebuild_gen, rpt->rt_leader_term);
+				    -1, rpt->rt_leader_term);
		}

		rpt->rt_leader_term = rsi->rsi_leader_term;
		D_GOTO(out, rc = 0);
+	} else if (rpt != NULL) {
+		rpt_put(rpt);
+		rpt = NULL;
	}

	tls = rebuild_pool_tls_lookup(rsi->rsi_pool_uuid, rsi->rsi_rebuild_ver,
diff --git a/src/rebuild/srv.c b/src/rebuild/srv.c
index 6bb1101b6bd..03521484a55 100644
--- a/src/rebuild/srv.c
+++ b/src/rebuild/srv.c
@@ -1254,6 +1254,8 @@ rebuild_leader_start(struct ds_pool *pool, struct rebuild_task *task,
		     struct rebuild_global_pool_tracker **p_rgt)
 {
	uint64_t leader_term;
+	uint32_t version;
+	uint32_t generation;
	int rc;

	rc = ds_pool_svc_term_get(pool->sp_uuid, &leader_term);
@@ -1263,7 +1265,14 @@ rebuild_leader_start(struct ds_pool *pool, struct rebuild_task *task,
		return rc;
	}

-	rc = rebuild_prepare(pool, task->dst_map_ver, ++pool->sp_rebuild_gen,
+	/* If this restart happened due to a leader switch, then there is no
+	 * need to update the generation.
+	 */
+	ds_rebuild_running_query(pool->sp_uuid, -1, &version, NULL, &generation);
+	if (version < task->dst_map_ver)
+		generation = ++pool->sp_rebuild_gen;
+
+	rc = rebuild_prepare(pool, task->dst_map_ver, generation,
			     leader_term, task->dst_reclaim_eph, &task->dst_tgts,
			     task->dst_rebuild_op, p_rgt);
	if (rc <= 0)
diff --git a/src/tests/suite/daos_rebuild.c b/src/tests/suite/daos_rebuild.c
index f30fd3d6aa3..0fd8facea4b 100644
--- a/src/tests/suite/daos_rebuild.c
+++ b/src/tests/suite/daos_rebuild.c
@@ -1,5 +1,5 @@
 /**
- * (C) Copyright 2016-2023 Intel Corporation.
+ * (C) Copyright 2016-2024 Intel Corporation.
  *
  * SPDX-License-Identifier: BSD-2-Clause-Patent
  */
@@ -125,9 +125,11 @@ rebuild_retry_for_stale_pool(void **state)

		/* make one shard to return STALE for rebuild fetch */
		rank = get_rank_by_oid_shard(arg, oids[0], 1);
+		daos_debug_set_params(arg->group, rank, DMG_KEY_FAIL_NUM,
+				      5, 0, NULL);
		daos_debug_set_params(arg->group, rank, DMG_KEY_FAIL_LOC,
-				      DAOS_REBUILD_STALE_POOL | DAOS_FAIL_ONCE,
-				      0, NULL);
+				      DAOS_REBUILD_STALE_POOL | DAOS_FAIL_SOME,
+				      0, NULL);
	}

	par_barrier(PAR_COMM_WORLD);
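
Reviewer note on the container-handle cache: migrate_cont_open() now memoizes dsc_cont_open() handles on the per-pool TLS list, so migrate_dkey() no longer opens and closes the pool and container around every single dkey; everything is torn down once in migrate_cont_close_all(). Below is a minimal standalone sketch of the same memoization pattern. It uses libuuid plus a hand-rolled singly linked list in place of d_list_t, and cont_open_cached, open_fn and handle_t are illustrative names, not part of the patch.

#include <errno.h>
#include <stdlib.h>
#include <uuid/uuid.h>

typedef struct { uint64_t cookie; } handle_t;	/* stand-in for daos_handle_t */

struct cont_hdl_cache {
	struct cont_hdl_cache	*next;
	uuid_t			 uuid;
	handle_t		 hdl;
};

/* Look up a cached open handle; on miss, open once and memoize. open_fn
 * stands in for dsc_cont_open(); the real code keys the list off the
 * per-pool TLS and closes everything in migrate_cont_close_all().
 */
static int
cont_open_cached(struct cont_hdl_cache **head, const uuid_t co_uuid,
		 int (*open_fn)(const uuid_t, handle_t *), handle_t *out)
{
	struct cont_hdl_cache *c;
	int rc;

	for (c = *head; c != NULL; c = c->next) {
		if (uuid_compare(c->uuid, co_uuid) == 0) {
			*out = c->hdl;	/* cache hit: no second open */
			return 0;
		}
	}

	rc = open_fn(co_uuid, out);
	if (rc != 0)
		return rc;

	c = calloc(1, sizeof(*c));
	if (c == NULL)
		return -ENOMEM;	/* the real code also closes *out here */
	uuid_copy(c->uuid, co_uuid);
	c->hdl = *out;
	c->next = *head;
	*head = c;
	return 0;
}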
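Reviewer note on the ULT throttling: the patch replaces the global generated/executed counters with per-target atomic in-flight counts, a shared budget (mpt_inflight_max_ult, split evenly across targets), and a condition variable that parks producers until a worker exits. A minimal sketch of the same enter/exit discipline, assuming POSIX threads and C11 atomics in place of the Argobots ABT_mutex/ABT_cond primitives; ult_budget, budget_enter and budget_exit are made-up names.

#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>

/* Per-target in-flight budget, mirroring mpt_*_ult_cnts + mpt_inflight_cond */
struct ult_budget {
	atomic_uint	*cnts;		/* one counter per target */
	unsigned	 ntgts;
	unsigned	 max_total;	/* like mpt_inflight_max_ult */
	bool		 fini;		/* like mpt_fini */
	pthread_mutex_t	 lock;
	pthread_cond_t	 cond;
};

/* Admission: wait until the target is under its share of the budget, then
 * charge one in-flight unit to it. Returns false on shutdown.
 */
static bool
budget_enter(struct ult_budget *b, unsigned tgt)
{
	pthread_mutex_lock(&b->lock);
	while (atomic_load(&b->cnts[tgt]) >= b->max_total / b->ntgts && !b->fini)
		pthread_cond_wait(&b->cond, &b->lock);
	if (b->fini) {
		pthread_mutex_unlock(&b->lock);
		return false;
	}
	atomic_fetch_add(&b->cnts[tgt], 1);
	pthread_mutex_unlock(&b->lock);
	return true;
}

/* Exit: release the unit and wake any parked producer, like
 * migrate_tgt_exit() followed by migrate_*_try_wakeup().
 */
static void
budget_exit(struct ult_budget *b, unsigned tgt)
{
	atomic_fetch_sub(&b->cnts[tgt], 1);
	pthread_mutex_lock(&b->lock);
	pthread_cond_broadcast(&b->cond);
	pthread_mutex_unlock(&b->lock);
}

Like the patch, the waiter re-reads the counters after every wakeup rather than trusting the signaller to have re-established the invariant, so spurious broadcasts (for example from ds_migrate_query_status()) are harmless.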
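Reviewer note on the new tunable: the maximum migration ULT count now defaults to MIGRATE_DEFAULT_MAX_ULT (4096) and can be overridden through the D_MIGRATE_ULT_CNT environment variable, then divided by dss_tgt_nr to give each target xstream its share. The sketch below shows the getenv/strtoul behavior this relies on; env_get_uint is a stand-in for the real gurt helper d_getenv_uint, whose exact parsing and error handling may differ.

#include <errno.h>
#include <stdint.h>
#include <stdlib.h>

/* Leave the caller's default untouched unless the variable is set to a
 * valid unsigned decimal number that fits in 32 bits.
 */
static void
env_get_uint(const char *name, uint32_t *val /* in: default, out: result */)
{
	char *s = getenv(name);
	char *end;
	unsigned long v;

	if (s == NULL || *s == '\0')
		return;
	errno = 0;
	v = strtoul(s, &end, 10);
	if (errno == 0 && *end == '\0' && v <= UINT32_MAX)
		*val = (uint32_t)v;
}

Usage matching the patch's flow: initialize max_ult to 4096, call env_get_uint("D_MIGRATE_ULT_CNT", &max_ult) on the system xstream, then hand each target max_ult / dss_tgt_nr as its in-flight cap.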
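Reviewer note on the timeout retries: both mrone_obj_fetch_internal() and the enumeration loop now keep retrying -DER_TIMEDOUT for as long as tls->mpt_version + 1 >= spc_map_version, that is, while no further pool-map change has invalidated this migration. A compact sketch of that version-gated loop under the same assumption; ERR_TIMEDOUT stands in for -DER_TIMEDOUT and fetch_with_retry is a made-up wrapper, not a DAOS API.

#include <stdint.h>

#define ERR_TIMEDOUT (-1011)	/* stand-in for -DER_TIMEDOUT */

/* Retry fetch_fn() on timeout only while the pool map has not advanced
 * past the version this migration was started against; any further map
 * bump means a new failure arrived and the rebuild will restart anyway.
 */
static int
fetch_with_retry(uint32_t migrate_ver, const volatile uint32_t *map_ver,
		 int (*fetch_fn)(void *), void *arg)
{
	int rc;

	do {
		rc = fetch_fn(arg);
	} while (rc == ERR_TIMEDOUT && migrate_ver + 1 >= *map_ver);

	return rc;
}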