DAOS-14598 object: correct epoch for parity migration #13453

Merged 5 commits on Jan 10, 2024
Changes from all commits
12 changes: 12 additions & 0 deletions src/container/srv_target.c
@@ -850,6 +850,18 @@ ds_cont_child_stop_all(struct ds_pool_child *pool_child)
}
}

void
ds_cont_child_reset_ec_agg_eph_all(struct ds_pool_child *pool_child)
{
struct ds_cont_child *cont_child;

D_DEBUG(DB_MD, DF_UUID"[%d]: reset EC aggregation epoch for all containers.\n",
DP_UUID(pool_child->spc_uuid), dss_get_module_info()->dmi_tgt_id);

d_list_for_each_entry(cont_child, &pool_child->spc_cont_list, sc_link)
cont_child->sc_ec_agg_eph = cont_child->sc_ec_agg_eph_boundary;
}

static int
cont_child_start(struct ds_pool_child *pool_child, const uuid_t co_uuid,
bool *started, struct ds_cont_child **cont_out)
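
The new helper walks the pool child's container list and snaps each container's in-memory EC aggregation epoch back to its reported boundary. Below is a minimal standalone sketch of that reset pattern; cont_child and reset_ec_agg_eph_all are simplified stand-ins, not the real ds_cont_child types iterated with d_list_for_each_entry:

/* Minimal sketch (not DAOS code): reset each container's EC aggregation
 * epoch to its boundary value, as ds_cont_child_reset_ec_agg_eph_all()
 * does over the pool child's container list. */
#include <stdint.h>
#include <stdio.h>

struct cont_child {
	uint64_t ec_agg_eph;          /* current EC aggregation epoch */
	uint64_t ec_agg_eph_boundary; /* boundary to reset back to */
	struct cont_child *next;
};

static void
reset_ec_agg_eph_all(struct cont_child *head)
{
	for (struct cont_child *cc = head; cc != NULL; cc = cc->next)
		cc->ec_agg_eph = cc->ec_agg_eph_boundary;
}

int
main(void)
{
	struct cont_child c1 = { .ec_agg_eph = 90, .ec_agg_eph_boundary = 70, .next = NULL };
	struct cont_child c0 = { .ec_agg_eph = 100, .ec_agg_eph_boundary = 80, .next = &c1 };

	reset_ec_agg_eph_all(&c0);
	printf("%llu %llu\n", (unsigned long long)c0.ec_agg_eph,
	       (unsigned long long)c1.ec_agg_eph); /* prints: 80 70 */
	return 0;
}

The real helper is called from update_child_map() in src/pool/srv_target.c (hunk below) when the cached pool map version is refreshed.
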
3 changes: 2 additions & 1 deletion src/include/daos_srv/container.h
@@ -190,7 +190,8 @@ void ds_cont_child_stop_all(struct ds_pool_child *pool_child);

int ds_cont_child_lookup(uuid_t pool_uuid, uuid_t cont_uuid,
struct ds_cont_child **ds_cont);

void
ds_cont_child_reset_ec_agg_eph_all(struct ds_pool_child *pool_child);
/** initialize a csummer based on container properties. Will retrieve the
* checksum related properties from IV
*/
133 changes: 82 additions & 51 deletions src/object/srv_ec_aggregate.c
@@ -1,5 +1,5 @@
/**
* (C) Copyright 2020-2023 Intel Corporation.
* (C) Copyright 2020-2024 Intel Corporation.
*
* SPDX-License-Identifier: BSD-2-Clause-Patent
*/
@@ -89,6 +89,7 @@ struct ec_agg_par_extent {
struct ec_agg_stripe {
daos_off_t as_stripenum; /* ordinal of stripe, offset/(k*len) */
daos_epoch_t as_hi_epoch; /* highest epoch in stripe */
daos_epoch_t as_lo_epoch; /* lowest epoch in stripe */
d_list_t as_dextents; /* list of stripe's data extents */
daos_off_t as_stripe_fill; /* amount of stripe covered by data */
uint64_t as_offset; /* start offset in stripe */
@@ -114,6 +115,7 @@ struct ec_agg_entry {
struct pl_obj_layout *ae_obj_layout;
struct daos_shard_loc ae_peer_pshards[OBJ_EC_MAX_P];
uint32_t ae_grp_idx;
uint32_t ae_is_leader:1;
};

/* Parameters used to drive iterate all.
@@ -123,13 +125,13 @@ struct ec_agg_param {
struct ec_agg_entry ap_agg_entry; /* entry used for each OID */
daos_epoch_range_t ap_epr; /* hi/lo extent threshold */
daos_epoch_t ap_filter_eph; /* Aggregatable filter epoch */
daos_epoch_t ap_min_unagg_eph; /* minimum unaggregated epoch */
daos_handle_t ap_cont_handle; /* VOS container handle */
int (*ap_yield_func)(void *arg); /* yield function*/
void *ap_yield_arg; /* yield argument */
uint32_t ap_credits_max; /* # of tight loops to yield */
uint32_t ap_credits; /* # of tight loops */
uint32_t ap_initialized:1, /* initialized flag */
ap_obj_skipped:1; /* skipped obj during aggregation */
uint32_t ap_initialized:1; /* initialized flag */
};

/* Struct used to drive offloaded stripe update.
@@ -324,6 +326,7 @@ agg_clear_extents(struct ec_agg_entry *entry)
D_ASSERT(entry->ae_cur_stripe.as_extent_cnt == 0);
}
entry->ae_cur_stripe.as_hi_epoch = 0UL;
entry->ae_cur_stripe.as_lo_epoch = 0UL;
entry->ae_cur_stripe.as_stripe_fill = 0;
entry->ae_cur_stripe.as_has_holes = carry_is_hole ? true : false;
}
@@ -1858,7 +1861,13 @@ agg_process_stripe(struct ec_agg_param *agg_param, struct ec_agg_entry *entry)
* and all replica extents are newer than parity.
*/
if (ec_age_stripe_full(entry, ec_age_with_parity(entry))) {
rc = agg_encode_local_parity(entry);
if (entry->ae_is_leader) {
rc = agg_encode_local_parity(entry);
} else {
update_vos = false;
agg_param->ap_min_unagg_eph = min(agg_param->ap_min_unagg_eph,
entry->ae_cur_stripe.as_lo_epoch);
}
goto out;
}

@@ -1868,6 +1877,13 @@ agg_process_stripe(struct ec_agg_param *agg_param, struct ec_agg_entry *entry)
goto out;
}

if (!entry->ae_is_leader) {
update_vos = false;
agg_param->ap_min_unagg_eph = min(agg_param->ap_min_unagg_eph,
entry->ae_cur_stripe.as_lo_epoch);
goto out;
}

/* With parity and some newer partial replicas, possibly holes */
if (ec_age_with_hole(entry))
process_holes = true;
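
On a non-leader parity shard the stripe is not aggregated locally (update_vos stays false); instead the stripe's lowest epoch feeds ap_min_unagg_eph so the container can later cap its query boundary. A sketch of that accumulation, assuming UINT64_MAX as a stand-in for DAOS_EPOCH_MAX and illustrative variable names:

/* Sketch (illustrative): a non-leader parity shard only records the lowest
 * epoch of the stripes it skips; UINT64_MAX stands in for DAOS_EPOCH_MAX. */
#include <stdint.h>
#include <stdio.h>

static uint64_t
min_u64(uint64_t a, uint64_t b)
{
	return a < b ? a : b;
}

int
main(void)
{
	uint64_t min_unagg_eph = UINT64_MAX;             /* ap_min_unagg_eph start value */
	uint64_t skipped_stripe_lo[] = { 120, 85, 300 }; /* as_lo_epoch of skipped stripes */

	for (int i = 0; i < 3; i++)
		min_unagg_eph = min_u64(min_unagg_eph, skipped_stripe_lo[i]);
	printf("min unaggregated epoch: %llu\n",
	       (unsigned long long)min_unagg_eph); /* prints: 85 */
	return 0;
}
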
@@ -1951,13 +1967,19 @@ agg_extent_add(struct ec_agg_entry *agg_entry, vos_iter_entry_t *entry,
agg_in_stripe(agg_entry, recx);
}

if (agg_entry->ae_cur_stripe.as_lo_epoch == 0 ||
extent->ae_epoch < agg_entry->ae_cur_stripe.as_lo_epoch)
agg_entry->ae_cur_stripe.as_lo_epoch = extent->ae_epoch;

if (extent->ae_epoch > agg_entry->ae_cur_stripe.as_hi_epoch)
agg_entry->ae_cur_stripe.as_hi_epoch = extent->ae_epoch;

D_DEBUG(DB_TRACE, "adding extent "DF_RECX", to stripe %lu, shard: %u\n",
D_DEBUG(DB_TRACE, "adding extent "DF_RECX", to stripe %lu, shard: %u "
"max/min "DF_X64"/"DF_X64"\n",
DP_RECX(extent->ae_recx),
agg_stripenum(agg_entry, extent->ae_recx.rx_idx),
agg_entry->ae_oid.id_shard);
agg_entry->ae_oid.id_shard, agg_entry->ae_cur_stripe.as_hi_epoch,
agg_entry->ae_cur_stripe.as_lo_epoch);
out:
return rc;
}
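
The new as_lo_epoch field is maintained alongside as_hi_epoch as a running minimum over the epochs of the extents collected into the current stripe, with 0 meaning "not yet set". A small self-contained sketch of that bookkeeping, using hypothetical stripe_epochs/stripe_track_epoch names instead of the aggregation entry:

/* Sketch (illustrative names): per-stripe epoch watermarks, where a low
 * watermark of 0 means "no extent seen yet", matching the as_lo_epoch
 * check in agg_extent_add(). */
#include <stdint.h>
#include <stdio.h>

struct stripe_epochs {
	uint64_t lo; /* lowest extent epoch in the stripe, 0 if unset */
	uint64_t hi; /* highest extent epoch in the stripe */
};

static void
stripe_track_epoch(struct stripe_epochs *s, uint64_t eph)
{
	if (s->lo == 0 || eph < s->lo)
		s->lo = eph;
	if (eph > s->hi)
		s->hi = eph;
}

int
main(void)
{
	struct stripe_epochs s = { 0, 0 };
	uint64_t ephs[] = { 42, 17, 99 };

	for (int i = 0; i < 3; i++)
		stripe_track_epoch(&s, ephs[i]);
	printf("lo=%llu hi=%llu\n", (unsigned long long)s.lo,
	       (unsigned long long)s.hi); /* prints: lo=17 hi=99 */
	return 0;
}
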
@@ -1973,9 +1995,9 @@ agg_data_extent(struct ec_agg_param *agg_param, vos_iter_entry_t *entry,

D_ASSERT(!(entry->ie_recx.rx_idx & PARITY_INDICATOR));

D_DEBUG(DB_IO, DF_UOID" get recx "DF_RECX", %u\n",
D_DEBUG(DB_IO, DF_UOID" get recx "DF_RECX", "DF_X64"/%u leader %s\n",
DP_UOID(agg_entry->ae_oid), DP_RECX(entry->ie_recx),
entry->ie_minor_epc);
entry->ie_epoch, entry->ie_minor_epc, agg_entry->ae_is_leader ? "yes" : "no");

while (offset < end) {
daos_off_t this_stripenum;
@@ -2038,6 +2060,7 @@ agg_akey_post(daos_handle_t ih, struct ec_agg_param *agg_param,

agg_entry->ae_cur_stripe.as_stripenum = 0UL;
agg_entry->ae_cur_stripe.as_hi_epoch = 0UL;
agg_entry->ae_cur_stripe.as_lo_epoch = 0UL;
agg_entry->ae_cur_stripe.as_stripe_fill = 0UL;
agg_entry->ae_cur_stripe.as_offset = 0U;
}
@@ -2073,39 +2096,57 @@ agg_reset_pos(vos_iter_type_t type, struct ec_agg_entry *agg_entry)
}
}

static int
agg_shard_is_leader(struct ds_pool *pool, struct ec_agg_entry *agg_entry)
static bool
agg_shard_is_parity(struct ds_pool *pool, struct ec_agg_entry *agg_entry)
{
struct pl_obj_shard *shard;
struct daos_oclass_attr *oca;
uint32_t grp_idx;
uint32_t grp_start;
uint32_t ec_tgt_idx;
int shard_idx;
int rc;
uint32_t min_fseq = -1;
int leader_shard = -1;
int i;

oca = &agg_entry->ae_oca;
if (is_ec_data_shard_by_layout_ver(agg_entry->ae_oid.id_layout_ver,
agg_entry->ae_dkey_hash, oca,
agg_entry->ae_oid.id_shard)) {
agg_entry->ae_is_leader = 0;
return false;
}

grp_idx = agg_entry->ae_oid.id_shard / daos_oclass_grp_size(oca);
grp_start = grp_idx * daos_oclass_grp_size(oca);
ec_tgt_idx = obj_ec_shard_idx_by_layout_ver(agg_entry->ae_oid.id_layout_ver,
agg_entry->ae_dkey_hash, oca,
daos_oclass_grp_size(oca) - 1);
/**
* FIXME: only the last parity shard can be the EC agg leader. What about
* Degraded mode?
*/
if (agg_entry->ae_oid.id_shard != ec_tgt_idx + grp_start)
return 0;
grp_start = grp_idx * agg_entry->ae_obj_layout->ol_grp_size;
for (i = 0; i < obj_ec_parity_tgt_nr(oca); i++) {
uint32_t ec_tgt_idx;
uint32_t shard_idx;
struct pl_obj_shard *shard;

ec_tgt_idx = obj_ec_shard_idx_by_layout_ver(agg_entry->ae_oid.id_layout_ver,
agg_entry->ae_dkey_hash, oca,
daos_oclass_grp_size(oca) - i - 1);

shard_idx = grp_start + ec_tgt_idx;
shard = pl_obj_get_shard(agg_entry->ae_obj_layout, shard_idx);

/* If last parity unavailable, then skip the object via returning -DER_STALE. */
shard_idx = grp_idx * agg_entry->ae_obj_layout->ol_grp_size + ec_tgt_idx;
shard = pl_obj_get_shard(agg_entry->ae_obj_layout, shard_idx);
if (shard->po_target != -1 && shard->po_shard != -1 && !shard->po_rebuilding)
rc = (agg_entry->ae_oid.id_shard == shard->po_shard) ? 1 : 0;
if (shard->po_target == -1 || shard->po_shard == -1 || shard->po_rebuilding)
continue;

if (min_fseq == -1 || min_fseq > shard->po_fseq) {
leader_shard = shard_idx;
min_fseq = shard->po_fseq;
}
}

/* No parity shard is available */
if (leader_shard == -1)
return false;

if (agg_entry->ae_oid.id_shard == leader_shard)
agg_entry->ae_is_leader = 1;
else
rc = -DER_STALE;
agg_entry->ae_is_leader = 0;

return rc;
return true;
}
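
agg_shard_is_parity() replaces the old rule that only the last parity shard may lead EC aggregation: every healthy parity shard is now considered, and the leader is the available, non-rebuilding parity shard with the smallest failure sequence (po_fseq). A simplified model of that selection follows; parity_shard and pick_leader are illustrative stand-ins, not the pl_obj_shard placement API:

/* Sketch (illustrative types, not the placement API): pick the EC
 * aggregation leader as the available, non-rebuilding parity shard with
 * the smallest failure sequence. */
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

struct parity_shard {
	int      shard_id;  /* shard index within the group */
	bool     available; /* target/shard valid and not rebuilding */
	uint32_t fseq;      /* failure sequence of the backing target */
};

static int
pick_leader(const struct parity_shard *shards, int nr)
{
	int      leader = -1;
	uint32_t min_fseq = UINT32_MAX;

	for (int i = 0; i < nr; i++) {
		if (!shards[i].available)
			continue;
		if (shards[i].fseq < min_fseq) {
			min_fseq = shards[i].fseq;
			leader = shards[i].shard_id;
		}
	}
	return leader; /* -1: no parity shard available */
}

int
main(void)
{
	struct parity_shard shards[] = {
		{ .shard_id = 4, .available = false, .fseq = 1 },
		{ .shard_id = 5, .available = true,  .fseq = 7 },
		{ .shard_id = 6, .available = true,  .fseq = 3 },
	};

	printf("leader shard: %d\n", pick_leader(shards, 3)); /* prints: 6 */
	return 0;
}
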

/* Initializes the struct holding the iteration state (ec_agg_entry). */
@@ -2129,8 +2170,6 @@ agg_dkey(daos_handle_t ih, vos_iter_entry_t *entry,
struct ec_agg_param *agg_param, struct ec_agg_entry *agg_entry,
unsigned int *acts)
{
int rc;

if (!agg_key_compare(agg_entry->ae_dkey, entry->ie_key)) {
D_DEBUG(DB_EPC, "Skip dkey: "DF_KEY" ec agg on re-probe\n",
DP_KEY(&entry->ie_key));
Expand All @@ -2144,24 +2183,16 @@ agg_dkey(daos_handle_t ih, vos_iter_entry_t *entry,
agg_entry->ae_dkey_hash = obj_dkey2hash(agg_entry->ae_oid.id_pub,
&agg_entry->ae_dkey);
agg_reset_pos(VOS_ITER_AKEY, agg_entry);
rc = agg_shard_is_leader(agg_param->ap_pool_info.api_pool, agg_entry);
if (rc == 1) {
D_DEBUG(DB_EPC, "oid:"DF_UOID":"DF_KEY" ec agg starting\n",
DP_UOID(agg_entry->ae_oid), DP_KEY(&agg_entry->ae_dkey));
if (agg_shard_is_parity(agg_param->ap_pool_info.api_pool, agg_entry)) {
D_DEBUG(DB_EPC, "oid:"DF_UOID":"DF_KEY" ec agg starting leader %s\n",
DP_UOID(agg_entry->ae_oid), DP_KEY(&agg_entry->ae_dkey),
agg_entry->ae_is_leader ? "yes" : "no");
agg_reset_dkey_entry(&agg_param->ap_agg_entry, entry);
rc = 0;
} else {
if (rc < 0) {
D_ERROR("oid:"DF_UOID" ds_pool_check_leader failed "
DF_RC"\n", DP_UOID(entry->ie_oid), DP_RC(rc));
if (rc == -DER_STALE)
agg_param->ap_obj_skipped = 1;
rc = 0;
}
*acts |= VOS_ITER_CB_SKIP;
}

return rc;
return 0;
}

/* Handles akeys returned by the iterator. */
@@ -2625,7 +2656,7 @@ cont_ec_aggregate_cb(struct ds_cont_child *cont, daos_epoch_range_t *epr,

agg_reset_entry(&ec_agg_param->ap_agg_entry, NULL, NULL);

ec_agg_param->ap_obj_skipped = 0;
ec_agg_param->ap_min_unagg_eph = DAOS_EPOCH_MAX;
rc = vos_iterate(&iter_param, VOS_ITER_OBJ, true, &anchors,
agg_iterate_pre_cb, agg_iterate_post_cb, ec_agg_param, NULL);

@@ -2637,8 +2668,7 @@ cont_ec_aggregate_cb(struct ds_cont_child *cont, daos_epoch_range_t *epr,
ec_agg_param->ap_agg_entry.ae_obj_hdl = DAOS_HDL_INVAL;
}

if (ec_agg_param->ap_obj_skipped && !cont->sc_stopping) {
D_DEBUG(DB_EPC, "with skipped obj during aggregation.\n");
if (cont->sc_pool->spc_pool->sp_rebuilding > 0 && !cont->sc_stopping) {
/* A rebuild is in progress, so the EC aggregation boundary cannot advance.
* Wait 5 seconds before the next EC aggregation pass.
*/
@@ -2649,7 +2679,7 @@ cont_ec_aggregate_cb(struct ds_cont_child *cont, daos_epoch_range_t *epr,
vos_aggregate_exit(cont->sc_hdl);

update_hae:
if (rc == 0 && ec_agg_param->ap_obj_skipped == 0) {
if (rc == 0) {
cont->sc_ec_agg_eph = max(cont->sc_ec_agg_eph, epr->epr_hi);
if (!cont->sc_stopping && cont->sc_ec_query_agg_eph) {
uint64_t orig, cur;
@@ -2662,7 +2692,8 @@ cont_ec_aggregate_cb(struct ds_cont_child *cont, daos_epoch_range_t *epr,
DP_CONT(cont->sc_pool_uuid, cont->sc_uuid),
orig, cur, cur - orig);

*cont->sc_ec_query_agg_eph = cont->sc_ec_agg_eph;
*cont->sc_ec_query_agg_eph = min(ec_agg_param->ap_min_unagg_eph,
cont->sc_ec_agg_eph);
}
}

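At the end of a pass, the published query aggregation epoch is clamped so it never moves past an epoch some shard still has to aggregate. A sketch of that final update, with simplified local variables (epr_hi, ec_agg_eph, min_unagg_eph) standing in for the container fields:

/* Sketch (simplified locals): publish the smaller of the epoch this pass
 * reached and the lowest epoch still unaggregated on this shard. */
#include <stdint.h>
#include <stdio.h>

int
main(void)
{
	uint64_t epr_hi = 1000;       /* upper bound of this aggregation pass */
	uint64_t ec_agg_eph = 900;    /* per-target EC aggregation epoch so far */
	uint64_t min_unagg_eph = 850; /* lowest unaggregated epoch seen this pass */
	uint64_t query_agg_eph;

	if (ec_agg_eph < epr_hi)
		ec_agg_eph = epr_hi; /* max(sc_ec_agg_eph, epr->epr_hi) */
	query_agg_eph = min_unagg_eph < ec_agg_eph ? min_unagg_eph : ec_agg_eph;
	printf("agg=%llu query=%llu\n", (unsigned long long)ec_agg_eph,
	       (unsigned long long)query_agg_eph); /* prints: agg=1000 query=850 */
	return 0;
}
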
7 changes: 6 additions & 1 deletion src/object/srv_obj_migrate.c
@@ -996,7 +996,12 @@ __migrate_fetch_update_parity(struct migrate_one *mrone, daos_handle_t oh,

offset = iods[i].iod_recxs[0].rx_idx;
size = iods[i].iod_recxs[0].rx_nr;
parity_eph = ephs[i][0];
/* Use the stable epoch for partial parity updates to make sure
* these partial updates do not land below the stable epoch boundary;
* otherwise both EC and VOS aggregation might operate on
* the same recxs.
*/
parity_eph = encode ? ephs[i][0] : mrone->mo_epoch;
tmp_iod = iods[i];
ptr = iov[i].iov_buf;
for (j = 1; j < iods[i].iod_nr; j++) {
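The migration change keys off whether the parity was re-encoded locally: re-encoded full-stripe parity keeps the per-recx epochs, while partial parity fetched as replica data is written at the migration's stable epoch (mrone->mo_epoch). A tiny sketch of that choice; parity_update_epoch is a hypothetical helper, not part of the migration code:

/* Sketch: choose the VOS update epoch for migrated parity. The helper
 * name is hypothetical; the ternary mirrors the hunk above. */
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

static uint64_t
parity_update_epoch(bool encode, uint64_t recx_eph, uint64_t stable_eph)
{
	/* encode: parity re-encoded from data, keep the per-recx epoch;
	 * otherwise a partial parity replica, pin it to the stable epoch. */
	return encode ? recx_eph : stable_eph;
}

int
main(void)
{
	printf("%llu\n", (unsigned long long)parity_update_epoch(true, 500, 800));  /* 500 */
	printf("%llu\n", (unsigned long long)parity_update_epoch(false, 500, 800)); /* 800 */
	return 0;
}
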
1 change: 1 addition & 0 deletions src/pool/srv_target.c
@@ -1614,6 +1614,7 @@ update_child_map(void *data)
return 0;
}

ds_cont_child_reset_ec_agg_eph_all(child);
child->spc_map_version = pool->sp_map_version;
ds_pool_child_put(child);
return 0;
2 changes: 1 addition & 1 deletion src/tests/ftest/daos_test/suite.yaml
@@ -27,7 +27,7 @@ timeouts:
test_daos_extend_simple: 3600
test_daos_oid_allocator: 640
test_daos_checksum: 500
test_daos_rebuild_ec: 6400
test_daos_rebuild_ec: 7200
test_daos_aggregate_ec: 200
test_daos_degraded_ec: 1900
test_daos_dedup: 220
7 changes: 5 additions & 2 deletions src/tests/suite/daos_rebuild_ec.c
@@ -1,5 +1,5 @@
/**
* (C) Copyright 2016-2023 Intel Corporation.
* (C) Copyright 2016-2024 Intel Corporation.
*
* SPDX-License-Identifier: BSD-2-Clause-Patent
*/
@@ -1304,6 +1304,9 @@ rebuild_ec_parity_overwrite_fail_parity_internal(void **state, int *kill_shards,
parity_rank = get_rank_by_oid_shard(arg, oid, shard_idx);
rebuild_single_pool_rank(arg, parity_rank, true);

print_message("sleep 60 seconds for aggregation\n");
sleep(60);

/* fail data shard */
for (i = 0; i < nr; i++) {
shard_idx = (dkey_hash % 6 + kill_shards[i]) % 6;
@@ -1487,7 +1490,7 @@ static const struct CMUnitTest rebuild_tests[] = {
{"REBUILD46: fail parity shard and data shards after overwrite",
rebuild_ec_overwrite_fail_parity_data, rebuild_ec_8nodes_setup,
test_teardown},
{"REBUILD46: fail parity shard and data shards after overwrite with aggregation",
{"REBUILD47: fail parity shard and data shards after overwrite with aggregation",
rebuild_ec_overwrite_fail_parity_data_with_parity, rebuild_ec_8nodes_setup,
test_teardown},
};
Expand Down