diff --git a/src/container/srv_target.c b/src/container/srv_target.c
index 80f924327b8..83b1846bead 100644
--- a/src/container/srv_target.c
+++ b/src/container/srv_target.c
@@ -850,6 +850,18 @@ ds_cont_child_stop_all(struct ds_pool_child *pool_child)
 	}
 }
 
+void
+ds_cont_child_reset_ec_agg_eph_all(struct ds_pool_child *pool_child)
+{
+	struct ds_cont_child *cont_child;
+
+	D_DEBUG(DB_MD, DF_UUID"[%d]: reset all containers EC aggregate epoch.\n",
+		DP_UUID(pool_child->spc_uuid), dss_get_module_info()->dmi_tgt_id);
+
+	d_list_for_each_entry(cont_child, &pool_child->spc_cont_list, sc_link)
+		cont_child->sc_ec_agg_eph = cont_child->sc_ec_agg_eph_boundary;
+}
+
 static int
 cont_child_start(struct ds_pool_child *pool_child, const uuid_t co_uuid,
 		 bool *started, struct ds_cont_child **cont_out)
diff --git a/src/include/daos_srv/container.h b/src/include/daos_srv/container.h
index bbbb848d444..daa9184e707 100644
--- a/src/include/daos_srv/container.h
+++ b/src/include/daos_srv/container.h
@@ -190,7 +190,8 @@ void ds_cont_child_stop_all(struct ds_pool_child *pool_child);
 
 int ds_cont_child_lookup(uuid_t pool_uuid, uuid_t cont_uuid,
 			 struct ds_cont_child **ds_cont);
-
+void
+ds_cont_child_reset_ec_agg_eph_all(struct ds_pool_child *pool_child);
 /** initialize a csummer based on container properties. Will retrieve the
  * checksum related properties from IV
  */
diff --git a/src/object/srv_ec_aggregate.c b/src/object/srv_ec_aggregate.c
index 1c603297b51..71aedeca895 100644
--- a/src/object/srv_ec_aggregate.c
+++ b/src/object/srv_ec_aggregate.c
@@ -1,5 +1,5 @@
 /**
- * (C) Copyright 2020-2023 Intel Corporation.
+ * (C) Copyright 2020-2024 Intel Corporation.
  *
  * SPDX-License-Identifier: BSD-2-Clause-Patent
  */
@@ -89,6 +89,7 @@ struct ec_agg_par_extent {
 struct ec_agg_stripe {
 	daos_off_t	as_stripenum;   /* ordinal of stripe, offset/(k*len) */
 	daos_epoch_t	as_hi_epoch;    /* highest epoch in stripe */
+	daos_epoch_t	as_lo_epoch;    /* lowest epoch in stripe */
 	d_list_t	as_dextents;    /* list of stripe's data extents */
 	daos_off_t	as_stripe_fill; /* amount of stripe covered by data */
 	uint64_t	as_offset;      /* start offset in stripe */
@@ -114,6 +115,7 @@ struct ec_agg_entry {
 	struct pl_obj_layout	*ae_obj_layout;
 	struct daos_shard_loc	 ae_peer_pshards[OBJ_EC_MAX_P];
 	uint32_t		 ae_grp_idx;
+	uint32_t		 ae_is_leader:1;
 };
 
 /* Parameters used to drive iterate all.
@@ -123,13 +125,13 @@ struct ec_agg_param {
 	struct ec_agg_entry	 ap_agg_entry;	 /* entry used for each OID */
 	daos_epoch_range_t	 ap_epr;	 /* hi/lo extent threshold */
 	daos_epoch_t		 ap_filter_eph;	 /* Aggregatable filter epoch */
+	daos_epoch_t		 ap_min_unagg_eph; /* minimum unaggregate epoch */
 	daos_handle_t		 ap_cont_handle; /* VOS container handle */
 	int			(*ap_yield_func)(void *arg); /* yield function*/
 	void			*ap_yield_arg;   /* yield argument */
 	uint32_t		 ap_credits_max; /* # of tight loops to yield */
 	uint32_t		 ap_credits;	 /* # of tight loops */
-	uint32_t		 ap_initialized:1, /* initialized flag */
-				 ap_obj_skipped:1; /* skipped obj during aggregation */
+	uint32_t		 ap_initialized:1; /* initialized flag */
 };
 
 /* Struct used to drive offloaded stripe update.
@@ -324,6 +326,7 @@ agg_clear_extents(struct ec_agg_entry *entry)
 		D_ASSERT(entry->ae_cur_stripe.as_extent_cnt == 0);
 	}
 	entry->ae_cur_stripe.as_hi_epoch = 0UL;
+	entry->ae_cur_stripe.as_lo_epoch = 0UL;
 	entry->ae_cur_stripe.as_stripe_fill = 0;
 	entry->ae_cur_stripe.as_has_holes = carry_is_hole ? true : false;
 }
@@ -1858,7 +1861,13 @@ agg_process_stripe(struct ec_agg_param *agg_param, struct ec_agg_entry *entry)
 	 * and all replica extents are newer than parity.
 	 */
 	if (ec_age_stripe_full(entry, ec_age_with_parity(entry))) {
-		rc = agg_encode_local_parity(entry);
+		if (entry->ae_is_leader) {
+			rc = agg_encode_local_parity(entry);
+		} else {
+			update_vos = false;
+			agg_param->ap_min_unagg_eph = min(agg_param->ap_min_unagg_eph,
+							  entry->ae_cur_stripe.as_lo_epoch);
+		}
 		goto out;
 	}
 
@@ -1868,6 +1877,13 @@ agg_process_stripe(struct ec_agg_param *agg_param, struct ec_agg_entry *entry)
 		goto out;
 	}
 
+	if (!entry->ae_is_leader) {
+		update_vos = false;
+		agg_param->ap_min_unagg_eph = min(agg_param->ap_min_unagg_eph,
+						  entry->ae_cur_stripe.as_lo_epoch);
+		goto out;
+	}
+
 	/* With parity and some newer partial replicas, possibly holes */
 	if (ec_age_with_hole(entry))
 		process_holes = true;
@@ -1951,13 +1967,19 @@ agg_extent_add(struct ec_agg_entry *agg_entry, vos_iter_entry_t *entry,
 			agg_in_stripe(agg_entry, recx);
 	}
 
+	if (agg_entry->ae_cur_stripe.as_lo_epoch == 0 ||
+	    extent->ae_epoch < agg_entry->ae_cur_stripe.as_lo_epoch)
+		agg_entry->ae_cur_stripe.as_lo_epoch = extent->ae_epoch;
+
 	if (extent->ae_epoch > agg_entry->ae_cur_stripe.as_hi_epoch)
 		agg_entry->ae_cur_stripe.as_hi_epoch = extent->ae_epoch;
 
-	D_DEBUG(DB_TRACE, "adding extent "DF_RECX", to stripe %lu, shard: %u\n",
+	D_DEBUG(DB_TRACE, "adding extent "DF_RECX", to stripe %lu, shard: %u"
+		"max/min "DF_X64"/"DF_X64"\n",
 		DP_RECX(extent->ae_recx),
 		agg_stripenum(agg_entry, extent->ae_recx.rx_idx),
-		agg_entry->ae_oid.id_shard);
+		agg_entry->ae_oid.id_shard, agg_entry->ae_cur_stripe.as_hi_epoch,
+		agg_entry->ae_cur_stripe.as_lo_epoch);
 out:
 	return rc;
 }
@@ -1973,9 +1995,9 @@ agg_data_extent(struct ec_agg_param *agg_param, vos_iter_entry_t *entry,
 
 	D_ASSERT(!(entry->ie_recx.rx_idx & PARITY_INDICATOR));
 
-	D_DEBUG(DB_IO, DF_UOID" get recx "DF_RECX", %u\n",
+	D_DEBUG(DB_IO, DF_UOID" get recx "DF_RECX", "DF_X64"/%u leader %s\n",
 		DP_UOID(agg_entry->ae_oid), DP_RECX(entry->ie_recx),
-		entry->ie_minor_epc);
+		entry->ie_epoch, entry->ie_minor_epc, agg_entry->ae_is_leader ? "yes" : "no");
 
 	while (offset < end) {
 		daos_off_t this_stripenum;
@@ -2038,6 +2060,7 @@ agg_akey_post(daos_handle_t ih, struct ec_agg_param *agg_param,
 
 		agg_entry->ae_cur_stripe.as_stripenum = 0UL;
 		agg_entry->ae_cur_stripe.as_hi_epoch = 0UL;
+		agg_entry->ae_cur_stripe.as_lo_epoch = 0UL;
 		agg_entry->ae_cur_stripe.as_stripe_fill = 0UL;
 		agg_entry->ae_cur_stripe.as_offset = 0U;
 	}
@@ -2073,39 +2096,57 @@ agg_reset_pos(vos_iter_type_t type, struct ec_agg_entry *agg_entry)
 	}
 }
 
-static int
-agg_shard_is_leader(struct ds_pool *pool, struct ec_agg_entry *agg_entry)
+static bool
+agg_shard_is_parity(struct ds_pool *pool, struct ec_agg_entry *agg_entry)
 {
-	struct pl_obj_shard	*shard;
 	struct daos_oclass_attr	*oca;
 	uint32_t		 grp_idx;
 	uint32_t		 grp_start;
-	uint32_t		 ec_tgt_idx;
-	int			 shard_idx;
-	int			 rc;
+	uint32_t		 min_fseq = -1;
+	int			 leader_shard = -1;
+	int			 i;
 
 	oca = &agg_entry->ae_oca;
+	if (is_ec_data_shard_by_layout_ver(agg_entry->ae_oid.id_layout_ver,
+					   agg_entry->ae_dkey_hash, oca,
+					   agg_entry->ae_oid.id_shard)) {
+		agg_entry->ae_is_leader = 0;
+		return false;
+	}
+
 	grp_idx = agg_entry->ae_oid.id_shard / daos_oclass_grp_size(oca);
-	grp_start = grp_idx * daos_oclass_grp_size(oca);
-	ec_tgt_idx = obj_ec_shard_idx_by_layout_ver(agg_entry->ae_oid.id_layout_ver,
-						    agg_entry->ae_dkey_hash, oca,
-						    daos_oclass_grp_size(oca) - 1);
-	/**
-	 * FIXME: only the last parity shard can be the EC agg leader. What about
-	 * Degraded mode?
-	 */
-	if (agg_entry->ae_oid.id_shard != ec_tgt_idx + grp_start)
-		return 0;
+	grp_start = grp_idx * agg_entry->ae_obj_layout->ol_grp_size;
+	for (i = 0; i < obj_ec_parity_tgt_nr(oca); i++) {
+		uint32_t		 ec_tgt_idx;
+		uint32_t		 shard_idx;
+		struct pl_obj_shard	*shard;
+
+		ec_tgt_idx = obj_ec_shard_idx_by_layout_ver(agg_entry->ae_oid.id_layout_ver,
+							    agg_entry->ae_dkey_hash, oca,
+							    daos_oclass_grp_size(oca) - i - 1);
+
+		shard_idx = grp_start + ec_tgt_idx;
+		shard = pl_obj_get_shard(agg_entry->ae_obj_layout, shard_idx);
 
-	/* If last parity unavailable, then skip the object via returning -DER_STALE. */
-	shard_idx = grp_idx * agg_entry->ae_obj_layout->ol_grp_size + ec_tgt_idx;
-	shard = pl_obj_get_shard(agg_entry->ae_obj_layout, shard_idx);
-	if (shard->po_target != -1 && shard->po_shard != -1 && !shard->po_rebuilding)
-		rc = (agg_entry->ae_oid.id_shard == shard->po_shard) ? 1 : 0;
+		if (shard->po_target == -1 || shard->po_shard == -1 || shard->po_rebuilding)
+			continue;
+
+		if (min_fseq == -1 || min_fseq > shard->po_fseq) {
+			leader_shard = shard_idx;
+			min_fseq = shard->po_fseq;
+		}
+	}
+
+	/* No parity shard is available */
+	if (leader_shard == -1)
+		return false;
+
+	if (agg_entry->ae_oid.id_shard == leader_shard)
+		agg_entry->ae_is_leader = 1;
 	else
-		rc = -DER_STALE;
+		agg_entry->ae_is_leader = 0;
 
-	return rc;
+	return true;
 }
 
 /* Initializes the struct holding the iteration state (ec_agg_entry). */
@@ -2129,8 +2170,6 @@ agg_dkey(daos_handle_t ih, vos_iter_entry_t *entry,
 	 struct ec_agg_param *agg_param, struct ec_agg_entry *agg_entry,
 	 unsigned int *acts)
 {
-	int			rc;
-
 	if (!agg_key_compare(agg_entry->ae_dkey, entry->ie_key)) {
 		D_DEBUG(DB_EPC, "Skip dkey: "DF_KEY" ec agg on re-probe\n",
 			DP_KEY(&entry->ie_key));
@@ -2144,24 +2183,16 @@ agg_dkey(daos_handle_t ih, vos_iter_entry_t *entry,
 	agg_entry->ae_dkey_hash = obj_dkey2hash(agg_entry->ae_oid.id_pub,
 						&agg_entry->ae_dkey);
 	agg_reset_pos(VOS_ITER_AKEY, agg_entry);
-	rc = agg_shard_is_leader(agg_param->ap_pool_info.api_pool, agg_entry);
-	if (rc == 1) {
-		D_DEBUG(DB_EPC, "oid:"DF_UOID":"DF_KEY" ec agg starting\n",
-			DP_UOID(agg_entry->ae_oid), DP_KEY(&agg_entry->ae_dkey));
+	if(agg_shard_is_parity(agg_param->ap_pool_info.api_pool, agg_entry)) {
+		D_DEBUG(DB_EPC, "oid:"DF_UOID":"DF_KEY" ec agg starting leader %s\n",
+			DP_UOID(agg_entry->ae_oid), DP_KEY(&agg_entry->ae_dkey),
+			agg_entry->ae_is_leader ? "yes" : "no");
 		agg_reset_dkey_entry(&agg_param->ap_agg_entry, entry);
-		rc = 0;
 	} else {
-		if (rc < 0) {
-			D_ERROR("oid:"DF_UOID" ds_pool_check_leader failed "
-				DF_RC"\n", DP_UOID(entry->ie_oid), DP_RC(rc));
-			if (rc == -DER_STALE)
-				agg_param->ap_obj_skipped = 1;
-			rc = 0;
-		}
 		*acts |= VOS_ITER_CB_SKIP;
 	}
 
-	return rc;
+	return 0;
 }
 
 /* Handles akeys returned by the iterator. */
@@ -2625,7 +2656,7 @@ cont_ec_aggregate_cb(struct ds_cont_child *cont, daos_epoch_range_t *epr,
 
 	agg_reset_entry(&ec_agg_param->ap_agg_entry, NULL, NULL);
 
-	ec_agg_param->ap_obj_skipped = 0;
+	ec_agg_param->ap_min_unagg_eph = DAOS_EPOCH_MAX;
 	rc = vos_iterate(&iter_param, VOS_ITER_OBJ, true, &anchors,
 			 agg_iterate_pre_cb, agg_iterate_post_cb, ec_agg_param, NULL);
 
@@ -2637,8 +2668,7 @@ cont_ec_aggregate_cb(struct ds_cont_child *cont, daos_epoch_range_t *epr,
 		ec_agg_param->ap_agg_entry.ae_obj_hdl = DAOS_HDL_INVAL;
 	}
 
-	if (ec_agg_param->ap_obj_skipped && !cont->sc_stopping) {
-		D_DEBUG(DB_EPC, "with skipped obj during aggregation.\n");
+	if (cont->sc_pool->spc_pool->sp_rebuilding > 0 && !cont->sc_stopping) {
 		/* There is rebuild going on, and we can't proceed EC aggregate boundary,
 		 * Let's wait for 5 seconds for another EC aggregation.
 		 */
@@ -2649,7 +2679,7 @@ cont_ec_aggregate_cb(struct ds_cont_child *cont, daos_epoch_range_t *epr,
 	vos_aggregate_exit(cont->sc_hdl);
 
 update_hae:
-	if (rc == 0 && ec_agg_param->ap_obj_skipped == 0) {
+	if (rc == 0) {
 		cont->sc_ec_agg_eph = max(cont->sc_ec_agg_eph, epr->epr_hi);
 		if (!cont->sc_stopping && cont->sc_ec_query_agg_eph) {
 			uint64_t orig, cur;
@@ -2662,7 +2692,8 @@ cont_ec_aggregate_cb(struct ds_cont_child *cont, daos_epoch_range_t *epr,
 				DP_CONT(cont->sc_pool_uuid, cont->sc_uuid), orig,
 				cur, cur - orig);
 
-			*cont->sc_ec_query_agg_eph = cont->sc_ec_agg_eph;
+			*cont->sc_ec_query_agg_eph = min(ec_agg_param->ap_min_unagg_eph,
+							 cont->sc_ec_agg_eph);
 		}
 	}
 
diff --git a/src/object/srv_obj_migrate.c b/src/object/srv_obj_migrate.c
index 70e75b15ad5..70ce7973553 100644
--- a/src/object/srv_obj_migrate.c
+++ b/src/object/srv_obj_migrate.c
@@ -996,7 +996,12 @@ __migrate_fetch_update_parity(struct migrate_one *mrone, daos_handle_t oh,
 
 		offset = iods[i].iod_recxs[0].rx_idx;
 		size = iods[i].iod_recxs[0].rx_nr;
-		parity_eph = ephs[i][0];
+		/* Use stable epoch for partial parity update to make sure
+		 * these partial updates are not below stable epoch boundary,
+		 * otherwise both EC and VOS aggregation might operate on
+		 * the same recxs.
+		 */
+		parity_eph = encode ? ephs[i][0] : mrone->mo_epoch;
 		tmp_iod = iods[i];
 		ptr = iov[i].iov_buf;
 		for (j = 1; j < iods[i].iod_nr; j++) {
diff --git a/src/pool/srv_target.c b/src/pool/srv_target.c
index 4d459405008..8dce5ae8661 100644
--- a/src/pool/srv_target.c
+++ b/src/pool/srv_target.c
@@ -1614,6 +1614,7 @@ update_child_map(void *data)
 		return 0;
 	}
 
+	ds_cont_child_reset_ec_agg_eph_all(child);
 	child->spc_map_version = pool->sp_map_version;
 	ds_pool_child_put(child);
 	return 0;
diff --git a/src/tests/ftest/daos_test/suite.yaml b/src/tests/ftest/daos_test/suite.yaml
index b853a8c1d89..c01356ea786 100644
--- a/src/tests/ftest/daos_test/suite.yaml
+++ b/src/tests/ftest/daos_test/suite.yaml
@@ -27,7 +27,7 @@ timeouts:
   test_daos_extend_simple: 3600
   test_daos_oid_allocator: 640
   test_daos_checksum: 500
-  test_daos_rebuild_ec: 6400
+  test_daos_rebuild_ec: 7200
   test_daos_aggregate_ec: 200
   test_daos_degraded_ec: 1900
   test_daos_dedup: 220
diff --git a/src/tests/suite/daos_rebuild_ec.c b/src/tests/suite/daos_rebuild_ec.c
index 6e3ac373690..b5018e50111 100644
--- a/src/tests/suite/daos_rebuild_ec.c
+++ b/src/tests/suite/daos_rebuild_ec.c
@@ -1,5 +1,5 @@
 /**
- * (C) Copyright 2016-2023 Intel Corporation.
+ * (C) Copyright 2016-2024 Intel Corporation.
  *
  * SPDX-License-Identifier: BSD-2-Clause-Patent
 */
@@ -1304,6 +1304,9 @@ rebuild_ec_parity_overwrite_fail_parity_internal(void **state, int *kill_shards,
 
 	parity_rank = get_rank_by_oid_shard(arg, oid, shard_idx);
 	rebuild_single_pool_rank(arg, parity_rank, true);
+	print_message("sleep 60 seconds for aggregation\n");
+	sleep(60);
+
 	/* fail data shard */
 	for (i = 0; i < nr; i++) {
 		shard_idx = (dkey_hash % 6 + kill_shards[i]) % 6;
@@ -1487,7 +1490,7 @@ static const struct CMUnitTest rebuild_tests[] = {
 	{"REBUILD46: fail parity shard and data shards after overwrite",
 	 rebuild_ec_overwrite_fail_parity_data, rebuild_ec_8nodes_setup,
 	 test_teardown},
-	{"REBUILD46: fail parity shard and data shards after overwrite with aggregation",
+	{"REBUILD47: fail parity shard and data shards after overwrite with aggregation",
 	 rebuild_ec_overwrite_fail_parity_data_with_parity, rebuild_ec_8nodes_setup,
 	 test_teardown},
 };