@@ -1,5 +1,5 @@
 /**
- * (C) Copyright 2020-2023 Intel Corporation.
+ * (C) Copyright 2020-2024 Intel Corporation.
  *
  * SPDX-License-Identifier: BSD-2-Clause-Patent
  */
@@ -89,6 +89,7 @@ struct ec_agg_par_extent {
 struct ec_agg_stripe {
 	daos_off_t	as_stripenum;    /* ordinal of stripe, offset/(k*len) */
 	daos_epoch_t	as_hi_epoch;     /* highest epoch in stripe */
+	daos_epoch_t	as_lo_epoch;     /* lowest epoch in stripe */
 	d_list_t	as_dextents;     /* list of stripe's data extents */
 	daos_off_t	as_stripe_fill;  /* amount of stripe covered by data */
 	uint64_t	as_offset;       /* start offset in stripe */
@@ -114,6 +115,7 @@ struct ec_agg_entry {
 	struct pl_obj_layout	*ae_obj_layout;
 	struct daos_shard_loc	 ae_peer_pshards[OBJ_EC_MAX_P];
 	uint32_t		 ae_grp_idx;
+	uint32_t		 ae_is_leader:1;
 };

 /* Parameters used to drive iterate all.
@@ -123,13 +125,13 @@ struct ec_agg_param {
 	struct ec_agg_entry	 ap_agg_entry;	 /* entry used for each OID */
 	daos_epoch_range_t	 ap_epr;	 /* hi/lo extent threshold */
 	daos_epoch_t		 ap_filter_eph;	 /* Aggregatable filter epoch */
+	daos_epoch_t		 ap_min_unagg_eph; /* minimum unaggregated epoch */
 	daos_handle_t		 ap_cont_handle; /* VOS container handle */
 	int			(*ap_yield_func)(void *arg); /* yield function */
 	void			*ap_yield_arg;	 /* yield argument */
 	uint32_t		 ap_credits_max; /* # of tight loops to yield */
 	uint32_t		 ap_credits;	 /* # of tight loops */
-	uint32_t		 ap_initialized:1, /* initialized flag */
-				 ap_obj_skipped:1; /* skipped obj during aggregation */
+	uint32_t		 ap_initialized:1; /* initialized flag */
 };

 /* Struct used to drive offloaded stripe update.
@@ -324,6 +326,7 @@ agg_clear_extents(struct ec_agg_entry *entry)
 		D_ASSERT(entry->ae_cur_stripe.as_extent_cnt == 0);
 	}
 	entry->ae_cur_stripe.as_hi_epoch = 0UL;
+	entry->ae_cur_stripe.as_lo_epoch = 0UL;
 	entry->ae_cur_stripe.as_stripe_fill = 0;
 	entry->ae_cur_stripe.as_has_holes = carry_is_hole ? true : false;
 }
@@ -1835,7 +1838,13 @@ agg_process_stripe(struct ec_agg_param *agg_param, struct ec_agg_entry *entry)
 	 * and all replica extents are newer than parity.
 	 */
 	if (ec_age_stripe_full(entry, ec_age_with_parity(entry))) {
-		rc = agg_encode_local_parity(entry);
+		if (entry->ae_is_leader) {
+			rc = agg_encode_local_parity(entry);
+		} else {
+			update_vos = false;
+			agg_param->ap_min_unagg_eph = min(agg_param->ap_min_unagg_eph,
+							  entry->ae_cur_stripe.as_lo_epoch);
+		}
 		goto out;
 	}
@@ -1845,6 +1854,13 @@ agg_process_stripe(struct ec_agg_param *agg_param, struct ec_agg_entry *entry)
 		goto out;
 	}

+	if (!entry->ae_is_leader) {
+		update_vos = false;
+		agg_param->ap_min_unagg_eph = min(agg_param->ap_min_unagg_eph,
+						  entry->ae_cur_stripe.as_lo_epoch);
+		goto out;
+	}
+
 	/* With parity and some newer partial replicas, possibly holes */
 	if (ec_age_with_hole(entry))
 		process_holes = true;
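The two non-leader branches above only defer work: they skip the VOS update and fold the stripe's as_lo_epoch into ap_min_unagg_eph. A minimal standalone sketch of that bookkeeping (illustrative names and values, not DAOS code) and of how it later caps the container's EC query epoch:

/*
 * Sketch only: every stripe a non-leader shard skips lowers the "minimum
 * unaggregated epoch"; the reported query epoch must never advance past it.
 */
#include <stdint.h>
#include <stdio.h>

#define EPOCH_MAX UINT64_MAX

static uint64_t min_u64(uint64_t a, uint64_t b) { return a < b ? a : b; }

int main(void)
{
	uint64_t min_unagg_eph = EPOCH_MAX;            /* reset before each pass */
	uint64_t skipped_stripe_lo[] = {120, 95, 210}; /* as_lo_epoch of skipped stripes */
	uint64_t agg_eph = 300;                        /* boundary reached by aggregation */

	for (int i = 0; i < 3; i++)
		min_unagg_eph = min_u64(min_unagg_eph, skipped_stripe_lo[i]);

	/* query epoch is capped by the oldest stripe still waiting on a leader */
	printf("query epoch = %lu\n", (unsigned long)min_u64(min_unagg_eph, agg_eph)); /* 95 */
	return 0;
}

In other words, the oldest stripe a non-leader could not aggregate becomes the ceiling for the query epoch until a leader catches up.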
@@ -1928,13 +1944,19 @@ agg_extent_add(struct ec_agg_entry *agg_entry, vos_iter_entry_t *entry,
 			agg_in_stripe(agg_entry, recx);
 	}

+	if (agg_entry->ae_cur_stripe.as_lo_epoch == 0 ||
+	    extent->ae_epoch < agg_entry->ae_cur_stripe.as_lo_epoch)
+		agg_entry->ae_cur_stripe.as_lo_epoch = extent->ae_epoch;
+
 	if (extent->ae_epoch > agg_entry->ae_cur_stripe.as_hi_epoch)
 		agg_entry->ae_cur_stripe.as_hi_epoch = extent->ae_epoch;

-	D_DEBUG(DB_TRACE, "adding extent "DF_RECX", to stripe %lu, shard: %u\n",
+	D_DEBUG(DB_TRACE, "adding extent "DF_RECX", to stripe %lu, shard: %u "
+		"max/min "DF_X64"/"DF_X64"\n",
 		DP_RECX(extent->ae_recx),
 		agg_stripenum(agg_entry, extent->ae_recx.rx_idx),
-		agg_entry->ae_oid.id_shard);
+		agg_entry->ae_oid.id_shard, agg_entry->ae_cur_stripe.as_hi_epoch,
+		agg_entry->ae_cur_stripe.as_lo_epoch);
 out:
 	return rc;
 }
@@ -1950,9 +1972,9 @@ agg_data_extent(struct ec_agg_param *agg_param, vos_iter_entry_t *entry,

 	D_ASSERT(!(entry->ie_recx.rx_idx & PARITY_INDICATOR));

-	D_DEBUG(DB_IO, DF_UOID" get recx "DF_RECX", %u\n",
+	D_DEBUG(DB_IO, DF_UOID" get recx "DF_RECX", "DF_X64"/%u leader %s\n",
 		DP_UOID(agg_entry->ae_oid), DP_RECX(entry->ie_recx),
-		entry->ie_minor_epc);
+		entry->ie_epoch, entry->ie_minor_epc, agg_entry->ae_is_leader ? "yes" : "no");

 	while (offset < end) {
 		daos_off_t	this_stripenum;
@@ -2015,6 +2037,7 @@ agg_akey_post(daos_handle_t ih, struct ec_agg_param *agg_param,

 		agg_entry->ae_cur_stripe.as_stripenum = 0UL;
 		agg_entry->ae_cur_stripe.as_hi_epoch = 0UL;
+		agg_entry->ae_cur_stripe.as_lo_epoch = 0UL;
 		agg_entry->ae_cur_stripe.as_stripe_fill = 0UL;
 		agg_entry->ae_cur_stripe.as_offset = 0U;
 	}
@@ -2050,39 +2073,57 @@ agg_reset_pos(vos_iter_type_t type, struct ec_agg_entry *agg_entry)
 	}
 }

-static int
-agg_shard_is_leader(struct ds_pool *pool, struct ec_agg_entry *agg_entry)
+static bool
+agg_shard_is_parity(struct ds_pool *pool, struct ec_agg_entry *agg_entry)
 {
-	struct pl_obj_shard	*shard;
 	struct daos_oclass_attr	*oca;
 	uint32_t		 grp_idx;
 	uint32_t		 grp_start;
-	uint32_t		 ec_tgt_idx;
-	int			 shard_idx;
-	int			 rc;
+	uint32_t		 min_fseq = -1;
+	int			 leader_shard = -1;
+	int			 i;

 	oca = &agg_entry->ae_oca;
+	if (is_ec_data_shard_by_layout_ver(agg_entry->ae_oid.id_layout_ver,
+					   agg_entry->ae_dkey_hash, oca,
+					   agg_entry->ae_oid.id_shard)) {
+		agg_entry->ae_is_leader = 0;
+		return false;
+	}
+
 	grp_idx = agg_entry->ae_oid.id_shard / daos_oclass_grp_size(oca);
-	grp_start = grp_idx * daos_oclass_grp_size(oca);
-	ec_tgt_idx = obj_ec_shard_idx_by_layout_ver(agg_entry->ae_oid.id_layout_ver,
-						    agg_entry->ae_dkey_hash, oca,
-						    daos_oclass_grp_size(oca) - 1);
-	/**
-	 * FIXME: only the last parity shard can be the EC agg leader. What about
-	 * Degraded mode?
-	 */
-	if (agg_entry->ae_oid.id_shard != ec_tgt_idx + grp_start)
-		return 0;
+	grp_start = grp_idx * agg_entry->ae_obj_layout->ol_grp_size;
+	for (i = 0; i < obj_ec_parity_tgt_nr(oca); i++) {
+		uint32_t		 ec_tgt_idx;
+		uint32_t		 shard_idx;
+		struct pl_obj_shard	*shard;
+
+		ec_tgt_idx = obj_ec_shard_idx_by_layout_ver(agg_entry->ae_oid.id_layout_ver,
+							    agg_entry->ae_dkey_hash, oca,
+							    daos_oclass_grp_size(oca) - i - 1);
+
+		shard_idx = grp_start + ec_tgt_idx;
+		shard = pl_obj_get_shard(agg_entry->ae_obj_layout, shard_idx);

-	/* If last parity unavailable, then skip the object via returning -DER_STALE. */
-	shard_idx = grp_idx * agg_entry->ae_obj_layout->ol_grp_size + ec_tgt_idx;
-	shard = pl_obj_get_shard(agg_entry->ae_obj_layout, shard_idx);
-	if (shard->po_target != -1 && shard->po_shard != -1 && !shard->po_rebuilding)
-		rc = (agg_entry->ae_oid.id_shard == shard->po_shard) ? 1 : 0;
+		if (shard->po_target == -1 || shard->po_shard == -1 || shard->po_rebuilding)
+			continue;
+
+		if (min_fseq == -1 || min_fseq > shard->po_fseq) {
+			leader_shard = shard_idx;
+			min_fseq = shard->po_fseq;
+		}
+	}
+
+	/* No parity shard is available */
+	if (leader_shard == -1)
+		return false;
+
+	if (agg_entry->ae_oid.id_shard == leader_shard)
+		agg_entry->ae_is_leader = 1;
 	else
-		rc = -DER_STALE;
+		agg_entry->ae_is_leader = 0;

-	return rc;
+	return true;
 }

 /* Initializes the struct holding the iteration state (ec_agg_entry). */
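The rewritten check above walks every parity target of the group and, as far as the patch reads, elects the available parity shard with the lowest failure sequence (po_fseq) as the aggregation leader; data shards and unavailable parity shards opt out. A simplified, self-contained sketch of that selection rule (hypothetical types, not the DAOS structs):

/*
 * Illustrative sketch only: pick the available parity shard with the lowest
 * failure sequence, so every parity shard evaluates the same deterministic rule.
 */
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

struct parity_shard {
	int      shard_idx;
	bool     available;   /* target mapped and not rebuilding */
	uint32_t fseq;        /* failure sequence of the target */
};

static int pick_leader(const struct parity_shard *p, int nr)
{
	uint32_t min_fseq = UINT32_MAX;
	int      leader = -1;

	for (int i = 0; i < nr; i++) {
		if (!p[i].available)
			continue;
		if (p[i].fseq < min_fseq) {
			min_fseq = p[i].fseq;
			leader = p[i].shard_idx;
		}
	}
	return leader; /* -1 when no parity shard is available */
}

int main(void)
{
	struct parity_shard shards[] = {
		{ .shard_idx = 4, .available = false, .fseq = 7 },
		{ .shard_idx = 5, .available = true,  .fseq = 3 },
	};

	printf("leader shard = %d\n", pick_leader(shards, 2)); /* 5 */
	return 0;
}

Because every parity shard evaluates the same rule over the same layout, they agree on a single leader without extra coordination, and aggregation no longer has to skip the object with -DER_STALE when only the last parity shard is unavailable.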
@@ -2106,8 +2147,6 @@ agg_dkey(daos_handle_t ih, vos_iter_entry_t *entry,
 	 struct ec_agg_param *agg_param, struct ec_agg_entry *agg_entry,
 	 unsigned int *acts)
 {
-	int	rc;
-
 	if (!agg_key_compare(agg_entry->ae_dkey, entry->ie_key)) {
 		D_DEBUG(DB_EPC, "Skip dkey: "DF_KEY" ec agg on re-probe\n",
 			DP_KEY(&entry->ie_key));
@@ -2121,24 +2160,16 @@ agg_dkey(daos_handle_t ih, vos_iter_entry_t *entry,
 	agg_entry->ae_dkey_hash = obj_dkey2hash(agg_entry->ae_oid.id_pub,
						&agg_entry->ae_dkey);
 	agg_reset_pos(VOS_ITER_AKEY, agg_entry);
-	rc = agg_shard_is_leader(agg_param->ap_pool_info.api_pool, agg_entry);
-	if (rc == 1) {
-		D_DEBUG(DB_EPC, "oid:"DF_UOID":"DF_KEY" ec agg starting\n",
-			DP_UOID(agg_entry->ae_oid), DP_KEY(&agg_entry->ae_dkey));
+	if (agg_shard_is_parity(agg_param->ap_pool_info.api_pool, agg_entry)) {
+		D_DEBUG(DB_EPC, "oid:"DF_UOID":"DF_KEY" ec agg starting leader %s\n",
+			DP_UOID(agg_entry->ae_oid), DP_KEY(&agg_entry->ae_dkey),
+			agg_entry->ae_is_leader ? "yes" : "no");
 		agg_reset_dkey_entry(&agg_param->ap_agg_entry, entry);
-		rc = 0;
 	} else {
-		if (rc < 0) {
-			D_ERROR("oid:"DF_UOID" ds_pool_check_leader failed "
-				DF_RC"\n", DP_UOID(entry->ie_oid), DP_RC(rc));
-			if (rc == -DER_STALE)
-				agg_param->ap_obj_skipped = 1;
-			rc = 0;
-		}
 		*acts |= VOS_ITER_CB_SKIP;
 	}

-	return rc;
+	return 0;
 }

 /* Handles akeys returned by the iterator. */
@@ -2599,7 +2630,7 @@ cont_ec_aggregate_cb(struct ds_cont_child *cont, daos_epoch_range_t *epr,

 	agg_reset_entry(&ec_agg_param->ap_agg_entry, NULL, NULL);

-	ec_agg_param->ap_obj_skipped = 0;
+	ec_agg_param->ap_min_unagg_eph = DAOS_EPOCH_MAX;
 	rc = vos_iterate(&iter_param, VOS_ITER_OBJ, true, &anchors,
			 agg_iterate_pre_cb, agg_iterate_post_cb, ec_agg_param, NULL);
@@ -2611,8 +2642,7 @@ cont_ec_aggregate_cb(struct ds_cont_child *cont, daos_epoch_range_t *epr,
 		ec_agg_param->ap_agg_entry.ae_obj_hdl = DAOS_HDL_INVAL;
 	}

-	if (ec_agg_param->ap_obj_skipped && !cont->sc_stopping) {
-		D_DEBUG(DB_EPC, "with skipped obj during aggregation.\n");
+	if (cont->sc_pool->spc_pool->sp_rebuilding > 0 && !cont->sc_stopping) {
 		/* There is rebuild going on, and we can't proceed EC aggregate boundary,
		 * Let's wait for 5 seconds for another EC aggregation.
		 */
@@ -2623,7 +2653,7 @@ cont_ec_aggregate_cb(struct ds_cont_child *cont, daos_epoch_range_t *epr,
 	vos_aggregate_exit(cont->sc_hdl);

 update_hae:
-	if (rc == 0 && ec_agg_param->ap_obj_skipped == 0) {
+	if (rc == 0) {
 		cont->sc_ec_agg_eph = max(cont->sc_ec_agg_eph, epr->epr_hi);
 		if (!cont->sc_stopping && cont->sc_ec_query_agg_eph) {
			uint64_t	orig, cur;
@@ -2636,7 +2666,8 @@ cont_ec_aggregate_cb(struct ds_cont_child *cont, daos_epoch_range_t *epr,
 				DP_CONT(cont->sc_pool_uuid, cont->sc_uuid),
				orig, cur, cur - orig);

-			*cont->sc_ec_query_agg_eph = cont->sc_ec_agg_eph;
+			*cont->sc_ec_query_agg_eph = min(ec_agg_param->ap_min_unagg_eph,
+							 cont->sc_ec_agg_eph);
		}
	}
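ap_min_unagg_eph is reset to DAOS_EPOCH_MAX before each pass (see the earlier hunk), so the min() above is a no-op when nothing was deferred and otherwise holds the reported query epoch back to the oldest unaggregated stripe. A tiny sketch of that sentinel behaviour (illustrative only, not DAOS code):

#include <stdint.h>
#include <assert.h>

#define EPOCH_MAX UINT64_MAX

static uint64_t min_u64(uint64_t a, uint64_t b) { return a < b ? a : b; }

int main(void)
{
	uint64_t min_unagg_eph = EPOCH_MAX; /* nothing deferred this pass */
	uint64_t agg_eph = 300;             /* boundary reached by aggregation */

	assert(min_u64(min_unagg_eph, agg_eph) == agg_eph);

	min_unagg_eph = 95;                 /* a non-leader deferred a stripe */
	assert(min_u64(min_unagg_eph, agg_eph) == 95);
	return 0;
}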