@@ -1,5 +1,5 @@
 /**
- * (C) Copyright 2020-2023 Intel Corporation.
+ * (C) Copyright 2020-2024 Intel Corporation.
  *
  * SPDX-License-Identifier: BSD-2-Clause-Patent
  */
@@ -89,6 +89,7 @@ struct ec_agg_par_extent {
 struct ec_agg_stripe {
         daos_off_t   as_stripenum;   /* ordinal of stripe, offset/(k*len) */
         daos_epoch_t as_hi_epoch;    /* highest epoch in stripe */
+        daos_epoch_t as_lo_epoch;    /* lowest epoch in stripe */
         d_list_t     as_dextents;    /* list of stripe's data extents */
         daos_off_t   as_stripe_fill; /* amount of stripe covered by data */
         uint64_t     as_offset;      /* start offset in stripe */
@@ -114,6 +115,7 @@ struct ec_agg_entry {
         struct pl_obj_layout  *ae_obj_layout;
         struct daos_shard_loc  ae_peer_pshards[OBJ_EC_MAX_P];
         uint32_t               ae_grp_idx;
+        uint32_t               ae_is_leader:1;
 };

 /* Parameters used to drive iterate all.
@@ -123,13 +125,13 @@ struct ec_agg_param {
         struct ec_agg_entry  ap_agg_entry;     /* entry used for each OID */
         daos_epoch_range_t   ap_epr;           /* hi/lo extent threshold */
         daos_epoch_t         ap_filter_eph;    /* Aggregatable filter epoch */
+        daos_epoch_t         ap_min_unagg_eph; /* minimum unaggregated epoch */
         daos_handle_t        ap_cont_handle;   /* VOS container handle */
         int                  (*ap_yield_func)(void *arg); /* yield function */
         void                *ap_yield_arg;     /* yield argument */
         uint32_t             ap_credits_max;   /* # of tight loops to yield */
         uint32_t             ap_credits;       /* # of tight loops */
-        uint32_t             ap_initialized:1, /* initialized flag */
-                             ap_obj_skipped:1; /* skipped obj during aggregation */
+        uint32_t             ap_initialized:1; /* initialized flag */
 };

 /* Struct used to drive offloaded stripe update.
@@ -324,6 +326,7 @@ agg_clear_extents(struct ec_agg_entry *entry)
                 D_ASSERT(entry->ae_cur_stripe.as_extent_cnt == 0);
         }
         entry->ae_cur_stripe.as_hi_epoch = 0UL;
+        entry->ae_cur_stripe.as_lo_epoch = 0UL;
         entry->ae_cur_stripe.as_stripe_fill = 0;
         entry->ae_cur_stripe.as_has_holes = carry_is_hole ? true : false;
 }
@@ -1858,7 +1861,13 @@ agg_process_stripe(struct ec_agg_param *agg_param, struct ec_agg_entry *entry)
          * and all replica extents are newer than parity.
          */
         if (ec_age_stripe_full(entry, ec_age_with_parity(entry))) {
-                rc = agg_encode_local_parity(entry);
+                if (entry->ae_is_leader) {
+                        rc = agg_encode_local_parity(entry);
+                } else {
+                        update_vos = false;
+                        agg_param->ap_min_unagg_eph = min(agg_param->ap_min_unagg_eph,
+                                                          entry->ae_cur_stripe.as_lo_epoch);
+                }
                 goto out;
         }

@@ -1868,6 +1877,13 @@ agg_process_stripe(struct ec_agg_param *agg_param, struct ec_agg_entry *entry)
                 goto out;
         }

+        if (!entry->ae_is_leader) {
+                update_vos = false;
+                agg_param->ap_min_unagg_eph = min(agg_param->ap_min_unagg_eph,
+                                                  entry->ae_cur_stripe.as_lo_epoch);
+                goto out;
+        }
+
         /* With parity and some newer partial replicas, possibly holes */
         if (ec_age_with_hole(entry))
                 process_holes = true;
@@ -1951,13 +1967,19 @@ agg_extent_add(struct ec_agg_entry *agg_entry, vos_iter_entry_t *entry,
                 agg_in_stripe(agg_entry, recx);
         }

+        if (agg_entry->ae_cur_stripe.as_lo_epoch == 0 ||
+            extent->ae_epoch < agg_entry->ae_cur_stripe.as_lo_epoch)
+                agg_entry->ae_cur_stripe.as_lo_epoch = extent->ae_epoch;
+
         if (extent->ae_epoch > agg_entry->ae_cur_stripe.as_hi_epoch)
                 agg_entry->ae_cur_stripe.as_hi_epoch = extent->ae_epoch;

-        D_DEBUG(DB_TRACE, "adding extent "DF_RECX", to stripe %lu, shard: %u\n",
+        D_DEBUG(DB_TRACE, "adding extent "DF_RECX", to stripe %lu, shard: %u "
+                "max/min "DF_X64"/"DF_X64"\n",
                 DP_RECX(extent->ae_recx),
                 agg_stripenum(agg_entry, extent->ae_recx.rx_idx),
-                agg_entry->ae_oid.id_shard);
+                agg_entry->ae_oid.id_shard, agg_entry->ae_cur_stripe.as_hi_epoch,
+                agg_entry->ae_cur_stripe.as_lo_epoch);
 out:
         return rc;
 }
@@ -1973,9 +1995,9 @@ agg_data_extent(struct ec_agg_param *agg_param, vos_iter_entry_t *entry,

         D_ASSERT(!(entry->ie_recx.rx_idx & PARITY_INDICATOR));

-        D_DEBUG(DB_IO, DF_UOID" get recx "DF_RECX", %u\n",
+        D_DEBUG(DB_IO, DF_UOID" get recx "DF_RECX", "DF_X64"/%u leader %s\n",
                 DP_UOID(agg_entry->ae_oid), DP_RECX(entry->ie_recx),
-                entry->ie_minor_epc);
+                entry->ie_epoch, entry->ie_minor_epc, agg_entry->ae_is_leader ? "yes" : "no");

         while (offset < end) {
                 daos_off_t this_stripenum;
@@ -2038,6 +2060,7 @@ agg_akey_post(daos_handle_t ih, struct ec_agg_param *agg_param,

                 agg_entry->ae_cur_stripe.as_stripenum   = 0UL;
                 agg_entry->ae_cur_stripe.as_hi_epoch    = 0UL;
+                agg_entry->ae_cur_stripe.as_lo_epoch    = 0UL;
                 agg_entry->ae_cur_stripe.as_stripe_fill = 0UL;
                 agg_entry->ae_cur_stripe.as_offset      = 0U;
         }
@@ -2073,39 +2096,57 @@ agg_reset_pos(vos_iter_type_t type, struct ec_agg_entry *agg_entry)
         }
 }

-static int
-agg_shard_is_leader(struct ds_pool *pool, struct ec_agg_entry *agg_entry)
+static bool
+agg_shard_is_parity(struct ds_pool *pool, struct ec_agg_entry *agg_entry)
 {
-        struct pl_obj_shard     *shard;
         struct daos_oclass_attr *oca;
         uint32_t                 grp_idx;
         uint32_t                 grp_start;
-        uint32_t                 ec_tgt_idx;
-        int                      shard_idx;
-        int                      rc;
+        uint32_t                 min_fseq = -1;
+        int                      leader_shard = -1;
+        int                      i;

         oca = &agg_entry->ae_oca;
+        if (is_ec_data_shard_by_layout_ver(agg_entry->ae_oid.id_layout_ver,
+                                           agg_entry->ae_dkey_hash, oca,
+                                           agg_entry->ae_oid.id_shard)) {
+                agg_entry->ae_is_leader = 0;
+                return false;
+        }
+
         grp_idx = agg_entry->ae_oid.id_shard / daos_oclass_grp_size(oca);
-        grp_start = grp_idx * daos_oclass_grp_size(oca);
-        ec_tgt_idx = obj_ec_shard_idx_by_layout_ver(agg_entry->ae_oid.id_layout_ver,
-                                                    agg_entry->ae_dkey_hash, oca,
-                                                    daos_oclass_grp_size(oca) - 1);
-        /**
-         * FIXME: only the last parity shard can be the EC agg leader. What about
-         * Degraded mode?
-         */
-        if (agg_entry->ae_oid.id_shard != ec_tgt_idx + grp_start)
-                return 0;
+        grp_start = grp_idx * agg_entry->ae_obj_layout->ol_grp_size;
+        for (i = 0; i < obj_ec_parity_tgt_nr(oca); i++) {
+                uint32_t             ec_tgt_idx;
+                uint32_t             shard_idx;
+                struct pl_obj_shard *shard;
+
+                ec_tgt_idx = obj_ec_shard_idx_by_layout_ver(agg_entry->ae_oid.id_layout_ver,
+                                                            agg_entry->ae_dkey_hash, oca,
+                                                            daos_oclass_grp_size(oca) - i - 1);
+
+                shard_idx = grp_start + ec_tgt_idx;
+                shard = pl_obj_get_shard(agg_entry->ae_obj_layout, shard_idx);

-        /* If last parity unavailable, then skip the object via returning -DER_STALE. */
-        shard_idx = grp_idx * agg_entry->ae_obj_layout->ol_grp_size + ec_tgt_idx;
-        shard = pl_obj_get_shard(agg_entry->ae_obj_layout, shard_idx);
-        if (shard->po_target != -1 && shard->po_shard != -1 && !shard->po_rebuilding)
-                rc = (agg_entry->ae_oid.id_shard == shard->po_shard) ? 1 : 0;
+                if (shard->po_target == -1 || shard->po_shard == -1 || shard->po_rebuilding)
+                        continue;
+
+                if (min_fseq == -1 || min_fseq > shard->po_fseq) {
+                        leader_shard = shard_idx;
+                        min_fseq = shard->po_fseq;
+                }
+        }
+
+        /* No parity shard is available */
+        if (leader_shard == -1)
+                return false;
+
+        if (agg_entry->ae_oid.id_shard == leader_shard)
+                agg_entry->ae_is_leader = 1;
         else
-                rc = -DER_STALE;
+                agg_entry->ae_is_leader = 0;

-        return rc;
+        return true;
 }

 /* Initializes the struct holding the iteration state (ec_agg_entry). */
@@ -2129,8 +2170,6 @@ agg_dkey(daos_handle_t ih, vos_iter_entry_t *entry,
          struct ec_agg_param *agg_param, struct ec_agg_entry *agg_entry,
          unsigned int *acts)
 {
-        int rc;
-
         if (!agg_key_compare(agg_entry->ae_dkey, entry->ie_key)) {
                 D_DEBUG(DB_EPC, "Skip dkey: "DF_KEY" ec agg on re-probe\n",
                         DP_KEY(&entry->ie_key));
@@ -2144,24 +2183,16 @@ agg_dkey(daos_handle_t ih, vos_iter_entry_t *entry,
         agg_entry->ae_dkey_hash = obj_dkey2hash(agg_entry->ae_oid.id_pub,
                                                 &agg_entry->ae_dkey);
         agg_reset_pos(VOS_ITER_AKEY, agg_entry);
-        rc = agg_shard_is_leader(agg_param->ap_pool_info.api_pool, agg_entry);
-        if (rc == 1) {
-                D_DEBUG(DB_EPC, "oid:"DF_UOID":"DF_KEY" ec agg starting\n",
-                        DP_UOID(agg_entry->ae_oid), DP_KEY(&agg_entry->ae_dkey));
+        if (agg_shard_is_parity(agg_param->ap_pool_info.api_pool, agg_entry)) {
+                D_DEBUG(DB_EPC, "oid:"DF_UOID":"DF_KEY" ec agg starting leader %s\n",
+                        DP_UOID(agg_entry->ae_oid), DP_KEY(&agg_entry->ae_dkey),
+                        agg_entry->ae_is_leader ? "yes" : "no");
                 agg_reset_dkey_entry(&agg_param->ap_agg_entry, entry);
-                rc = 0;
         } else {
-                if (rc < 0) {
-                        D_ERROR("oid:"DF_UOID" ds_pool_check_leader failed "
-                                DF_RC"\n", DP_UOID(entry->ie_oid), DP_RC(rc));
-                        if (rc == -DER_STALE)
-                                agg_param->ap_obj_skipped = 1;
-                        rc = 0;
-                }
                 *acts |= VOS_ITER_CB_SKIP;
         }

-        return rc;
+        return 0;
 }

 /* Handles akeys returned by the iterator. */
@@ -2625,7 +2656,7 @@ cont_ec_aggregate_cb(struct ds_cont_child *cont, daos_epoch_range_t *epr,

         agg_reset_entry(&ec_agg_param->ap_agg_entry, NULL, NULL);

-        ec_agg_param->ap_obj_skipped = 0;
+        ec_agg_param->ap_min_unagg_eph = DAOS_EPOCH_MAX;
         rc = vos_iterate(&iter_param, VOS_ITER_OBJ, true, &anchors,
                          agg_iterate_pre_cb, agg_iterate_post_cb, ec_agg_param, NULL);

@@ -2637,8 +2668,7 @@ cont_ec_aggregate_cb(struct ds_cont_child *cont, daos_epoch_range_t *epr,
                 ec_agg_param->ap_agg_entry.ae_obj_hdl = DAOS_HDL_INVAL;
         }

-        if (ec_agg_param->ap_obj_skipped && !cont->sc_stopping) {
-                D_DEBUG(DB_EPC, "with skipped obj during aggregation.\n");
+        if (cont->sc_pool->spc_pool->sp_rebuilding > 0 && !cont->sc_stopping) {
                 /* There is rebuild going on, and we can't proceed EC aggregate boundary,
                  * Let's wait for 5 seconds for another EC aggregation.
                  */
@@ -2649,7 +2679,7 @@ cont_ec_aggregate_cb(struct ds_cont_child *cont, daos_epoch_range_t *epr,
         vos_aggregate_exit(cont->sc_hdl);

 update_hae:
-        if (rc == 0 && ec_agg_param->ap_obj_skipped == 0) {
+        if (rc == 0) {
                 cont->sc_ec_agg_eph = max(cont->sc_ec_agg_eph, epr->epr_hi);
                 if (!cont->sc_stopping && cont->sc_ec_query_agg_eph) {
                         uint64_t orig, cur;
@@ -2662,7 +2692,8 @@ cont_ec_aggregate_cb(struct ds_cont_child *cont, daos_epoch_range_t *epr,
                                 DP_CONT(cont->sc_pool_uuid, cont->sc_uuid),
                                 orig, cur, cur - orig);

-                        *cont->sc_ec_query_agg_eph = cont->sc_ec_agg_eph;
+                        *cont->sc_ec_query_agg_eph = min(ec_agg_param->ap_min_unagg_eph,
+                                                         cont->sc_ec_agg_eph);
                 }
         }
