From c00fd3682835444c74930baa7f1a2f97b762c249 Mon Sep 17 00:00:00 2001 From: "P. Sai Vinay" Date: Fri, 6 Nov 2020 16:45:26 +0530 Subject: [PATCH 1/7] Add percentile to DataFrame and Series --- docs/sphinx/reference/dataframe.rst | 1 + docs/sphinx/reference/series.rst | 1 + eland/dataframe.py | 50 ++++++ eland/operations.py | 144 +++++++++++++++--- eland/query.py | 18 +++ eland/query_compiler.py | 28 ++++ eland/series.py | 39 ++++- tests/dataframe/test_metrics_pytest.py | 56 ++++++- .../test_map_pd_aggs_to_es_aggs_pytest.py | 17 ++- tests/series/test_metrics_pytest.py | 21 ++- 10 files changed, 343 insertions(+), 32 deletions(-) diff --git a/docs/sphinx/reference/dataframe.rst b/docs/sphinx/reference/dataframe.rst index c795f65b..7cab8a61 100644 --- a/docs/sphinx/reference/dataframe.rst +++ b/docs/sphinx/reference/dataframe.rst @@ -99,6 +99,7 @@ Computations / Descriptive Stats DataFrame.sum DataFrame.nunique DataFrame.mode + DataFrame.quantile Reindexing / Selection / Label Manipulation ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ diff --git a/docs/sphinx/reference/series.rst b/docs/sphinx/reference/series.rst index b4355fb4..5dd0af74 100644 --- a/docs/sphinx/reference/series.rst +++ b/docs/sphinx/reference/series.rst @@ -80,6 +80,7 @@ Computations / Descriptive Stats Series.nunique Series.value_counts Series.mode + Series.quantile Reindexing / Selection / Label Manipulation ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ diff --git a/eland/dataframe.py b/eland/dataframe.py index 20737615..08ab7492 100644 --- a/eland/dataframe.py +++ b/eland/dataframe.py @@ -1685,6 +1685,56 @@ def mode( return self._query_compiler.mode( numeric_only=numeric_only, dropna=True, is_dataframe=True, es_size=es_size ) + def quantile( + self, + q: Union[float, List[float]] = 0.5, + numeric_only: Optional[bool] = True, + ) -> "pd.DataFrame": + """ + Used to calculate quantile for a given DataFrame. + + PARAMETERS + ---------- + q: + float or array like, default 0.5 + Value between 0 <= q <= 1, the quantile(s) to compute. + numeric_only: {True, False, None} Default is None + Which datatype to be returned + - True: Returns all values as float64, NaN/NaT values are removed + - None: Returns all values as the same dtype where possible, NaN/NaT are removed + - False: Returns all values as the same dtype where possible, NaN/NaT are preserved + + Returns + ------- + pandas.DataFrame + quantile value for each column + + See Also + -------- + :pandas_api_docs:`pandas.DataFrame.quantile` + + Examples + -------- + >>> ed_flights = ed.DataFrame('localhost', 'flights') + >>> ed_flights.quantile() # doctest: +SKIP + AvgTicketPrice 640.387285 + Cancelled 0.000000 + dayOfWeek 3.000000 + Name: 0.5, dtype: float64 + + >>> ed_flights.quantile([.2, .5, .75]) # doctest: +SKIP + AvgTicketPrice Cancelled dayOfWeek + 0.20 361.040768 0.0 1.0 + 0.50 640.387285 0.0 3.0 + 0.75 842.213490 0.0 4.0 + + >>> ed_flights.quantile([.2, .5, .75], numeric_only=False) # doctest: +SKIP + AvgTicketPrice Cancelled dayOfWeek timestamp + 0.20 361.040768 0.0 1.0 2018-01-09 04:37:40.409370361 + 0.50 640.387285 0.0 3.0 2018-01-21 23:59:14.074445068 + 0.75 842.213490 0.0 4.0 2018-02-01 04:54:29.135782959 + """ + return self._query_compiler.quantile(quantiles=q, numeric_only=numeric_only) def query(self, expr) -> "DataFrame": """ diff --git a/eland/operations.py b/eland/operations.py index 98ba5837..089a0a01 100644 --- a/eland/operations.py +++ b/eland/operations.py @@ -243,6 +243,7 @@ def _metric_aggs( is_dataframe_agg: bool = False, es_mode_size: Optional[int] = None, dropna: bool = True, + percentiles: Optional[List[float]] = None, ) -> Dict[str, Any]: """ Used to calculate metric aggregations @@ -262,6 +263,8 @@ def _metric_aggs( number of rows to return when multiple mode values are present. dropna: drop NaN/NaT for a dataframe + percentiles: + List of percentiles when 'quantile' agg is called. Otherwise it is None Returns ------- @@ -279,11 +282,18 @@ def _metric_aggs( if numeric_only: # Consider if field is Int/Float/Bool fields = [field for field in fields if (field.is_numeric or field.is_bool)] + elif not numeric_only and (pd_aggs == ["quantile"]): + # quantile doesn't accept text fields + fields = [ + field + for field in fields + if (field.is_numeric or field.is_bool or field.is_timestamp) + ] body = Query(query_params.query) # Convert pandas aggs to ES equivalent - es_aggs = self._map_pd_aggs_to_es_aggs(pd_aggs) + es_aggs = self._map_pd_aggs_to_es_aggs(pd_aggs, percentiles) for field in fields: for es_agg in es_aggs: @@ -293,11 +303,18 @@ def _metric_aggs( # If we have multiple 'extended_stats' etc. here we simply NOOP on 2nd call if isinstance(es_agg, tuple): - body.metric_aggs( - f"{es_agg[0]}_{field.es_field_name}", - es_agg[0], - field.aggregatable_es_field_name, - ) + if es_agg[0] == "percentiles": + body.percentile_agg( + f"{es_agg[0]}_{field.es_field_name}", + field.es_field_name, + es_agg[1], + ) + else: + body.metric_aggs( + f"{es_agg[0]}_{field.es_field_name}", + es_agg[0], + field.aggregatable_es_field_name, + ) elif es_agg == "mode": # TODO for dropna=False, Check If field is timestamp or boolean or numeric, # then use missing parameter for terms aggregation. @@ -307,6 +324,7 @@ def _metric_aggs( field.aggregatable_es_field_name, es_mode_size, ) + else: body.metric_aggs( f"{es_agg}_{field.es_field_name}", @@ -495,15 +513,20 @@ def _unpack_metric_aggs( pd_aggs: a list of aggs response: - a dict containing response from Elastic Search + a dict containing response from ElasticSearch numeric_only: return either numeric values or NaN/NaT + is_dataframe_agg: + - True then aggregation is called from dataframe + - False then aggregation is called from series Returns ------- a dictionary on which agg caluculations are done. """ results: Dict[str, Any] = {} + percentile_values: List[float] = [] + agg_value: Union[int, float] for field in fields: values = [] @@ -529,13 +552,26 @@ def _unpack_metric_aggs( # Pull multiple values from 'percentiles' result. if es_agg[0] == "percentiles": - agg_value = agg_value["values"] - - agg_value = agg_value[es_agg[1]] + agg_value = agg_value["values"] # Returns dictionary + if pd_agg == "median": + agg_value = agg_value["50.0"] + # Currently Pandas does the same + # If we call quantile it returns the same result as of median. + elif pd_agg == "quantile" and is_dataframe_agg: + agg_value = agg_value["50.0"] + else: + # We have to filter out the `_as_string` from results + # e.g. 'Cancelled_50.0': 0.0, 'Cancelled_50.0_as_string': 'false' + percentile_values = [ + value + for key, value in agg_value.items() + if not key.endswith("as_string") + ] + if not percentile_values and pd_agg not in ("quantile", "median"): + agg_value = agg_value[es_agg[1]] # Need to convert 'Population' stddev and variance - # from Elasticsearch into 'Sample' stddev and variance - # which is what pandas uses. + # from Elasticsearch into 'Sample' stddev and variance which is what pandas uses. if es_agg[1] in ("std_deviation", "variance"): # Neither transformation works with count <=1 count = response["aggregations"][ @@ -590,7 +626,7 @@ def _unpack_metric_aggs( ] # Null usually means there were no results. - if not isinstance(agg_value, list) and ( + if not isinstance(agg_value, (list, dict)) and ( agg_value is None or np.isnan(agg_value) ): if is_dataframe_agg and not numeric_only: @@ -612,12 +648,18 @@ def _unpack_metric_aggs( ) for value in agg_value ] + elif percentile_values: + percentile_values = [ + elasticsearch_date_to_pandas_date(value, field.es_date_format) + for value in percentile_values + ] else: agg_value = elasticsearch_date_to_pandas_date( agg_value, field.es_date_format ) + func = elasticsearch_date_to_pandas_date # If numeric_only is False | None then maintain column datatype - elif not numeric_only: + elif not numeric_only and pd_agg != "quantile": # we're only converting to bool for lossless aggs like min, max, and median. if pd_agg in {"max", "min", "median", "sum", "mode"}: # 'sum' isn't representable with bool, use int64 @@ -626,14 +668,67 @@ def _unpack_metric_aggs( else: agg_value = field.np_dtype.type(agg_value) - values.append(agg_value) + if not percentile_values: + values.append(agg_value) # If numeric_only is True and We only have a NaN type field then we check for empty. if values: results[field.column] = values if len(values) > 1 else values[0] + # This only runs when df.quantile() or series.quantile() is called + if percentile_values and not is_dataframe_agg: + results[f"{field.column}"] = percentile_values return results + def quantile( + self, + query_compiler: "QueryCompiler", + pd_aggs: List[str], + quantiles: Union[float, List[float]], + is_dataframe: bool = True, + numeric_only: bool = True, + ) -> Union[pd.DataFrame, pd.Series]: + # To verify if quantile range falls between 0 to 1 + def verify_quantile_range(quantile: Any) -> float: + if isinstance(quantile, (int, float, str)): + quantile = float(quantile) + if quantile > 1 or quantile < 0: + raise ValueError( + f"quantile should be in range of 0 and 1, given {quantile}" + ) + else: + raise TypeError("quantile should be of type int or float or str") + # quantile * 100 = percentile + return quantile * 100 + + percentiles = list( + map( + verify_quantile_range, + (quantiles if isinstance(quantiles, list) else [quantiles]), + ) + ) + + result = self._metric_aggs( + query_compiler, + pd_aggs=pd_aggs, + percentiles=percentiles, + is_dataframe_agg=False, + numeric_only=numeric_only, + ) + + df = pd.DataFrame( + result, + index=[i / 100 for i in percentiles], + columns=result.keys(), + dtype=(np.float64 if numeric_only else None), + ) + + # Display Output same as pandas does + if isinstance(quantiles, float): + return df.squeeze() if is_dataframe else df.squeeze() + else: + return df if is_dataframe else df.transpose().iloc[0] + def aggs_groupby( self, query_compiler: "QueryCompiler", @@ -821,10 +916,13 @@ def bucket_generator( return composite_buckets["buckets"] @staticmethod - def _map_pd_aggs_to_es_aggs(pd_aggs): + def _map_pd_aggs_to_es_aggs( + pd_aggs: List[str], percentiles: Optional[List[float]] = None + ) -> Union[List[str], List[Tuple[str, List[float]]]]: """ Args: pd_aggs - list of pandas aggs (e.g. ['mad', 'min', 'std'] etc.) + percentiles - list of percentiles for 'quantile' agg Returns: ed_aggs - list of corresponding es_aggs (e.g. ['median_absolute_deviation', 'min', 'std'] etc.) @@ -885,7 +983,14 @@ def _map_pd_aggs_to_es_aggs(pd_aggs): elif pd_agg == "mad": es_aggs.append("median_absolute_deviation") elif pd_agg == "median": - es_aggs.append(("percentiles", "50.0")) + es_aggs.append(("percentiles", [50.0])) + elif pd_agg == "quantile": + # None when 'quantile' is called in df.agg[...] + # Behaves same as median because pandas does the same. + if percentiles is not None: + es_aggs.append(("percentiles", percentiles)) + else: + es_aggs.append(("percentiles", [50.0])) elif pd_agg == "mode": if len(pd_aggs) != 1: @@ -896,9 +1001,6 @@ def _map_pd_aggs_to_es_aggs(pd_aggs): es_aggs.append("mode") # Not implemented - elif pd_agg == "quantile": - # TODO - raise NotImplementedError(pd_agg, " not currently implemented") elif pd_agg == "rank": # TODO raise NotImplementedError(pd_agg, " not currently implemented") @@ -913,7 +1015,7 @@ def _map_pd_aggs_to_es_aggs(pd_aggs): if extended_stats_calls >= 2: es_aggs = [ ("extended_stats", es_agg) - if es_agg in extended_stats_es_aggs + if not isinstance(es_agg, tuple) and (es_agg in extended_stats_es_aggs) else es_agg for es_agg in es_aggs ] diff --git a/eland/query.py b/eland/query.py index 68b89754..ef077a06 100644 --- a/eland/query.py +++ b/eland/query.py @@ -145,6 +145,24 @@ def metric_aggs(self, name: str, func: str, field: str) -> None: agg = {func: {"field": field}} self._aggs[name] = agg + def percentile_agg(self, name: str, field: str, percents: List[float]) -> None: + """ + + Ref: https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics-percentile-aggregation.html + + "aggs": { + "percentile_": { + "percentiles": { + "field": "AvgTicketPrice", + "percents": [95, 99, 99.0] + } + } + } + + """ + agg = {"percentiles": {"field": field, "percents": percents}} + self._aggs[name] = agg + def composite_agg_bucket_terms(self, name: str, field: str) -> None: """ Add terms agg for composite aggregation diff --git a/eland/query_compiler.py b/eland/query_compiler.py index 5b41f04e..2ff42be2 100644 --- a/eland/query_compiler.py +++ b/eland/query_compiler.py @@ -636,6 +636,34 @@ def mode( is_dataframe=is_dataframe, es_size=es_size, ) + def quantile( + self, + quantiles: Union[float, List[float]], + numeric_only: bool = True, + is_dataframe: bool = True, + ) -> Union[pd.DataFrame, pd.Series, Any]: + """ + Holds quantile object for both DataFrame and Series + + PARAMETERS + ---------- + quantiles: + list of quantiles for computation + numeric_only: + Flag used to filter numeric columns + is_dataframe: + To identify if quantile is called from Series or DataFrame + True: Called from DataFrame + False: Called from Series + + """ + return self._operations.quantile( + self, + pd_aggs=["quantile"], + quantiles=quantiles, + numeric_only=numeric_only, + is_dataframe=is_dataframe, + ) def aggs_groupby( self, diff --git a/eland/series.py b/eland/series.py index 7cdbdcf8..b3c32bf8 100644 --- a/eland/series.py +++ b/eland/series.py @@ -35,7 +35,7 @@ import warnings from collections.abc import Collection from io import StringIO -from typing import TYPE_CHECKING, Any, Optional, Sequence, Tuple, Union +from typing import TYPE_CHECKING, Any, List, Optional, Sequence, Tuple, Union import numpy as np import pandas as pd @@ -565,6 +565,43 @@ def notna(self) -> BooleanFilter: notnull = notna + def quantile(self, q: Union[float, List[float]] = 0.5) -> Union[pd.Series, Any]: + """ + Used to calculate quantile for a given Series. + + PARAMETERS + ---------- + q: + float or array like, default 0.5 + Value between 0 <= q <= 1, the quantile(s) to compute. + + Returns + ------- + pandas.Series or any single dtype + + See Also + -------- + :pandas_api_docs:`pandas.Series.quantile` + + Examples + -------- + >>> ed_flights = ed.DataFrame('localhost', 'flights') + >>> ed_flights["timestamp"].quantile([.2,.5,.75]) + 0.20 2018-01-09 04:30:57.289159912 + 0.50 2018-01-21 23:39:27.031627441 + 0.75 2018-02-01 04:54:59.256136963 + Name: timestamp, dtype: datetime64[ns] + + >>> ed_flights["dayOfWeek"].quantile() + 3.0 + + >>> ed_flights["timestamp"].quantile() + Timestamp('2018-01-22 00:12:48.844534180') + """ + return self._query_compiler.quantile( + quantiles=q, numeric_only=None, is_dataframe=False + ) + @property def ndim(self) -> int: """ diff --git a/tests/dataframe/test_metrics_pytest.py b/tests/dataframe/test_metrics_pytest.py index 7a907dd6..d2664676 100644 --- a/tests/dataframe/test_metrics_pytest.py +++ b/tests/dataframe/test_metrics_pytest.py @@ -217,7 +217,8 @@ def test_flights_datetime_metrics_agg_func(self, agg): assert ed_metric.dtype == np.dtype("datetime64[ns]") assert_almost_equal(ed_metric[0], expected_values[agg]) - def test_flights_datetime_metrics_median(self): + @pytest.mark.parametrize("agg", ["median", "quantile"]) + def test_flights_datetime_metrics_median_quantile(self, agg): ed_df = self.ed_flights_small()[["timestamp"]] median = ed_df.median(numeric_only=False)[0] @@ -228,11 +229,11 @@ def test_flights_datetime_metrics_median(self): <= pd.to_datetime("2018-01-01 12:00:00.000") ) - median = ed_df.agg(["mean"])["timestamp"][0] - assert isinstance(median, pd.Timestamp) + agg_value = ed_df.agg([agg])["timestamp"][0] + assert isinstance(agg_value, pd.Timestamp) assert ( pd.to_datetime("2018-01-01 10:00:00.000") - <= median + <= agg_value <= pd.to_datetime("2018-01-01 12:00:00.000") ) @@ -446,3 +447,50 @@ def test_aggs_mode(self, es_size, numeric_only): assert_frame_equal( pd_mode, ed_mode, check_dtype=(False if es_size == 1 else True) ) + + @pytest.mark.parametrize("quantiles", [[0.2, 0.5], [0, 1]]) + @pytest.mark.parametrize("numeric_only", [False, None]) + def test_flights_quantile(self, quantiles, numeric_only): + pd_flights = self.pd_flights() + ed_flights = self.ed_flights() + + pd_quantile = pd_flights.filter(self.filter_data[0:3]).quantile( + q=quantiles, numeric_only=numeric_only + ) + ed_quantile = ed_flights.filter(self.filter_data[0:3]).quantile( + q=quantiles, numeric_only=numeric_only + ) + + assert_frame_equal(pd_quantile, ed_quantile, check_exact=False, rtol=2) + + pd_quantile = pd_flights[["timestamp"]].quantile( + q=quantiles, numeric_only=numeric_only + ) + ed_quantile = ed_flights[["timestamp"]].quantile( + q=quantiles, numeric_only=numeric_only + ) + + pd_timestamp = pd.to_numeric(pd_quantile.squeeze(), downcast="float") + ed_timestamp = pd.to_numeric(ed_quantile.squeeze(), downcast="float") + + assert_series_equal(pd_timestamp, ed_timestamp, check_exact=False, rtol=2) + + @pytest.mark.parametrize("quantiles", [5, [2, 1], -1.5, [1.2, 0.2]]) + def test_flights_quantile_error(self, quantiles): + ed_flights = self.ed_flights().filter(self.filter_data) + + match = f"quantile should be in range of 0 and 1, given {quantiles[0] if isinstance(quantiles, list) else quantiles}" + with pytest.raises(ValueError, match=match): + ed_flights[["timestamp"]].quantile(q=quantiles) + + @pytest.mark.parametrize("numeric_only", [True, False, None]) + def test_flights_agg_quantile(self, numeric_only): + pd_flights = self.pd_flights().filter(self.filter_data[0:3]) + ed_flights = self.ed_flights().filter(self.filter_data[0:3]) + + pd_quantile = pd_flights.agg(["quantile", "min"], numeric_only=numeric_only) + ed_quantile = ed_flights.agg(["quantile", "min"], numeric_only=numeric_only) + + assert_frame_equal( + pd_quantile, ed_quantile, check_exact=False, rtol=4, check_dtype=False + ) diff --git a/tests/operations/test_map_pd_aggs_to_es_aggs_pytest.py b/tests/operations/test_map_pd_aggs_to_es_aggs_pytest.py index 2ec78820..d7d78c79 100644 --- a/tests/operations/test_map_pd_aggs_to_es_aggs_pytest.py +++ b/tests/operations/test_map_pd_aggs_to_es_aggs_pytest.py @@ -20,7 +20,19 @@ def test_all_aggs(): es_aggs = Operations._map_pd_aggs_to_es_aggs( - ["min", "max", "mean", "std", "var", "mad", "count", "nunique", "median"] + [ + "min", + "max", + "mean", + "std", + "var", + "mad", + "count", + "nunique", + "median", + "quantile", + ], + percentiles=[0.2, 0.5, 0.8], ) assert es_aggs == [ @@ -32,7 +44,8 @@ def test_all_aggs(): "median_absolute_deviation", "value_count", "cardinality", - ("percentiles", "50.0"), + ("percentiles", [50.0]), + ("percentiles", [0.2, 0.5, 0.8]), ] diff --git a/tests/series/test_metrics_pytest.py b/tests/series/test_metrics_pytest.py index c471d97e..60b29ae7 100644 --- a/tests/series/test_metrics_pytest.py +++ b/tests/series/test_metrics_pytest.py @@ -105,14 +105,15 @@ def test_flights_datetime_metrics_agg(self, agg): assert_almost_equal(ed_metric, expected_values[agg]) - def test_flights_datetime_median_metric(self): + @pytest.mark.parametrize("agg", ["median", "quantile"]) + def test_flights_datetime_median_metric(self, agg): ed_series = self.ed_flights_small()["timestamp"] - median = ed_series.median() - assert isinstance(median, pd.Timestamp) + agg_value = getattr(ed_series, agg)() + assert isinstance(agg_value, pd.Timestamp) assert ( pd.to_datetime("2018-01-01 10:00:00.000") - <= median + <= agg_value <= pd.to_datetime("2018-01-01 12:00:00.000") ) @@ -137,3 +138,15 @@ def test_ecommerce_mode_es_size(self, es_size): ed_mode = ed_series["order_date"].mode(es_size) assert_series_equal(pd_mode, ed_mode) + @pytest.mark.parametrize("quantile_list", [0.2, 0.5, [0.2, 0.5]]) + @pytest.mark.parametrize("column", ["AvgTicketPrice", "Cancelled", "dayOfWeek"]) + def test_flights_quantile(self, column, quantile_list): + pd_flights = self.pd_flights()[column] + ed_flights = self.ed_flights()[column] + + pd_quantile = pd_flights.quantile(quantile_list) + ed_quantile = ed_flights.quantile(quantile_list) + if isinstance(quantile_list, list): + assert_series_equal(pd_quantile, ed_quantile, check_exact=False, rtol=2) + else: + assert pd_quantile * 0.9 <= ed_quantile <= pd_quantile * 1.1 From 96c44d10cd816f97f7f737caafea41752575fc69 Mon Sep 17 00:00:00 2001 From: "P. Sai Vinay" Date: Sat, 7 Nov 2020 21:17:17 +0530 Subject: [PATCH 2/7] Requested changes --- eland/dataframe.py | 6 +-- eland/operations.py | 50 +++++++++---------- eland/query.py | 2 +- eland/query_compiler.py | 6 +-- eland/series.py | 12 +++-- tests/dataframe/test_metrics_pytest.py | 22 ++++---- .../test_map_pd_aggs_to_es_aggs_pytest.py | 6 +++ tests/series/test_metrics_pytest.py | 12 ++++- 8 files changed, 69 insertions(+), 47 deletions(-) diff --git a/eland/dataframe.py b/eland/dataframe.py index 08ab7492..c33d271f 100644 --- a/eland/dataframe.py +++ b/eland/dataframe.py @@ -1687,18 +1687,18 @@ def mode( ) def quantile( self, - q: Union[float, List[float]] = 0.5, + q: Union[int, float, List[int], List[float]] = 0.5, numeric_only: Optional[bool] = True, ) -> "pd.DataFrame": """ Used to calculate quantile for a given DataFrame. - PARAMETERS + Parameters ---------- q: float or array like, default 0.5 Value between 0 <= q <= 1, the quantile(s) to compute. - numeric_only: {True, False, None} Default is None + numeric_only: {True, False, None} Default is True Which datatype to be returned - True: Returns all values as float64, NaN/NaT values are removed - None: Returns all values as the same dtype where possible, NaN/NaT are removed diff --git a/eland/operations.py b/eland/operations.py index 089a0a01..b505777b 100644 --- a/eland/operations.py +++ b/eland/operations.py @@ -351,6 +351,7 @@ def _metric_aggs( response=response, numeric_only=numeric_only, is_dataframe_agg=is_dataframe_agg, + percentiles=percentiles, ) def _terms_aggs( @@ -497,8 +498,9 @@ def _unpack_metric_aggs( pd_aggs: List[str], response: Dict[str, Any], numeric_only: Optional[bool], + percentiles: Optional[List[float]] = None, is_dataframe_agg: bool = False, - ): + ) -> Dict[str, List[Any]]: """ This method unpacks metric aggregations JSON response. This can be called either directly on an aggs query @@ -519,6 +521,8 @@ def _unpack_metric_aggs( is_dataframe_agg: - True then aggregation is called from dataframe - False then aggregation is called from series + percentiles: + List of percentiles when 'quantile' agg is called. Otherwise it is None Returns ------- @@ -560,18 +564,14 @@ def _unpack_metric_aggs( elif pd_agg == "quantile" and is_dataframe_agg: agg_value = agg_value["50.0"] else: - # We have to filter out the `_as_string` from results - # e.g. 'Cancelled_50.0': 0.0, 'Cancelled_50.0_as_string': 'false' - percentile_values = [ - value - for key, value in agg_value.items() - if not key.endswith("as_string") - ] + # Maintain order of percentiles + percentile_values = [agg_value[str(i)] for i in percentiles] if not percentile_values and pd_agg not in ("quantile", "median"): agg_value = agg_value[es_agg[1]] # Need to convert 'Population' stddev and variance - # from Elasticsearch into 'Sample' stddev and variance which is what pandas uses. + # from Elasticsearch into 'Sample' stddev and variance + # which is what pandas uses. if es_agg[1] in ("std_deviation", "variance"): # Neither transformation works with count <=1 count = response["aggregations"][ @@ -626,7 +626,7 @@ def _unpack_metric_aggs( ] # Null usually means there were no results. - if not isinstance(agg_value, (list, dict)) and ( + if not isinstance(agg_value, dict) and ( agg_value is None or np.isnan(agg_value) ): if is_dataframe_agg and not numeric_only: @@ -657,7 +657,6 @@ def _unpack_metric_aggs( agg_value = elasticsearch_date_to_pandas_date( agg_value, field.es_date_format ) - func = elasticsearch_date_to_pandas_date # If numeric_only is False | None then maintain column datatype elif not numeric_only and pd_agg != "quantile": # we're only converting to bool for lossless aggs like min, max, and median. @@ -684,29 +683,30 @@ def quantile( self, query_compiler: "QueryCompiler", pd_aggs: List[str], - quantiles: Union[float, List[float]], + quantiles: Union[int, float, List[int], List[float]], is_dataframe: bool = True, - numeric_only: bool = True, + numeric_only: Optional[bool] = True, ) -> Union[pd.DataFrame, pd.Series]: # To verify if quantile range falls between 0 to 1 - def verify_quantile_range(quantile: Any) -> float: - if isinstance(quantile, (int, float, str)): + def quantile_to_percentile(quantile: Any) -> float: + if isinstance(quantile, (int, float)): quantile = float(quantile) if quantile > 1 or quantile < 0: raise ValueError( f"quantile should be in range of 0 and 1, given {quantile}" ) else: - raise TypeError("quantile should be of type int or float or str") + raise TypeError("quantile should be of type int or float") # quantile * 100 = percentile - return quantile * 100 + # return float(...) because min(1.0) gives 1 + return float(min(100, max(0, quantile * 100))) - percentiles = list( - map( - verify_quantile_range, - (quantiles if isinstance(quantiles, list) else [quantiles]), + percentiles = [ + quantile_to_percentile(x) + for x in ( + (quantiles,) if not isinstance(quantiles, (list, tuple)) else quantiles ) - ) + ] result = self._metric_aggs( query_compiler, @@ -952,8 +952,8 @@ def _map_pd_aggs_to_es_aggs( """ # pd aggs that will be mapped to es aggs # that can use 'extended_stats'. - extended_stats_pd_aggs = {"mean", "min", "max", "sum", "var", "std"} - extended_stats_es_aggs = {"avg", "min", "max", "sum"} + extended_stats_pd_aggs = ["mean", "min", "max", "sum", "var", "std"] + extended_stats_es_aggs = ["avg", "min", "max", "sum"] extended_stats_calls = 0 es_aggs = [] @@ -1015,7 +1015,7 @@ def _map_pd_aggs_to_es_aggs( if extended_stats_calls >= 2: es_aggs = [ ("extended_stats", es_agg) - if not isinstance(es_agg, tuple) and (es_agg in extended_stats_es_aggs) + if es_agg in extended_stats_es_aggs else es_agg for es_agg in es_aggs ] diff --git a/eland/query.py b/eland/query.py index ef077a06..4845f915 100644 --- a/eland/query.py +++ b/eland/query.py @@ -260,7 +260,7 @@ def composite_agg_after_key(self, name: str, after_key: Dict[str, Any]) -> None: """ Add's after_key to existing query to fetch next bunch of results - PARAMETERS + Parameters ---------- name: str Name of the buckets diff --git a/eland/query_compiler.py b/eland/query_compiler.py index 2ff42be2..dfc799b3 100644 --- a/eland/query_compiler.py +++ b/eland/query_compiler.py @@ -638,14 +638,14 @@ def mode( ) def quantile( self, - quantiles: Union[float, List[float]], - numeric_only: bool = True, + quantiles: Union[int, float, List[int], List[float]], + numeric_only: Optional[bool] = True, is_dataframe: bool = True, ) -> Union[pd.DataFrame, pd.Series, Any]: """ Holds quantile object for both DataFrame and Series - PARAMETERS + Parameters ---------- quantiles: list of quantiles for computation diff --git a/eland/series.py b/eland/series.py index b3c32bf8..43125d5b 100644 --- a/eland/series.py +++ b/eland/series.py @@ -565,11 +565,13 @@ def notna(self) -> BooleanFilter: notnull = notna - def quantile(self, q: Union[float, List[float]] = 0.5) -> Union[pd.Series, Any]: + def quantile( + self, q: Union[int, float, List[int], List[float]] = 0.5 + ) -> Union[pd.Series, Any]: """ Used to calculate quantile for a given Series. - PARAMETERS + Parameters ---------- q: float or array like, default 0.5 @@ -586,16 +588,16 @@ def quantile(self, q: Union[float, List[float]] = 0.5) -> Union[pd.Series, Any]: Examples -------- >>> ed_flights = ed.DataFrame('localhost', 'flights') - >>> ed_flights["timestamp"].quantile([.2,.5,.75]) + >>> ed_flights["timestamp"].quantile([.2,.5,.75]) # doctest: +SKIP 0.20 2018-01-09 04:30:57.289159912 0.50 2018-01-21 23:39:27.031627441 0.75 2018-02-01 04:54:59.256136963 Name: timestamp, dtype: datetime64[ns] - >>> ed_flights["dayOfWeek"].quantile() + >>> ed_flights["dayOfWeek"].quantile() # doctest: +SKIP 3.0 - >>> ed_flights["timestamp"].quantile() + >>> ed_flights["timestamp"].quantile() # doctest: +SKIP Timestamp('2018-01-22 00:12:48.844534180') """ return self._query_compiler.quantile( diff --git a/tests/dataframe/test_metrics_pytest.py b/tests/dataframe/test_metrics_pytest.py index d2664676..61e6e0a0 100644 --- a/tests/dataframe/test_metrics_pytest.py +++ b/tests/dataframe/test_metrics_pytest.py @@ -448,18 +448,18 @@ def test_aggs_mode(self, es_size, numeric_only): pd_mode, ed_mode, check_dtype=(False if es_size == 1 else True) ) - @pytest.mark.parametrize("quantiles", [[0.2, 0.5], [0, 1]]) + @pytest.mark.parametrize("quantiles", [[0.2, 0.5], [0, 1], [0.75, 0.2, 0.1, 0.5]]) @pytest.mark.parametrize("numeric_only", [False, None]) def test_flights_quantile(self, quantiles, numeric_only): pd_flights = self.pd_flights() ed_flights = self.ed_flights() - pd_quantile = pd_flights.filter(self.filter_data[0:3]).quantile( - q=quantiles, numeric_only=numeric_only - ) - ed_quantile = ed_flights.filter(self.filter_data[0:3]).quantile( - q=quantiles, numeric_only=numeric_only - ) + pd_quantile = pd_flights.filter( + ["AvgTicketPrice", "Cancelled", "dayOfWeek"] + ).quantile(q=quantiles, numeric_only=numeric_only) + ed_quantile = ed_flights.filter( + ["AvgTicketPrice", "Cancelled", "dayOfWeek"] + ).quantile(q=quantiles, numeric_only=numeric_only) assert_frame_equal(pd_quantile, ed_quantile, check_exact=False, rtol=2) @@ -485,8 +485,12 @@ def test_flights_quantile_error(self, quantiles): @pytest.mark.parametrize("numeric_only", [True, False, None]) def test_flights_agg_quantile(self, numeric_only): - pd_flights = self.pd_flights().filter(self.filter_data[0:3]) - ed_flights = self.ed_flights().filter(self.filter_data[0:3]) + pd_flights = self.pd_flights().filter( + ["AvgTicketPrice", "Cancelled", "dayOfWeek"] + ) + ed_flights = self.ed_flights().filter( + ["AvgTicketPrice", "Cancelled", "dayOfWeek"] + ) pd_quantile = pd_flights.agg(["quantile", "min"], numeric_only=numeric_only) ed_quantile = ed_flights.agg(["quantile", "min"], numeric_only=numeric_only) diff --git a/tests/operations/test_map_pd_aggs_to_es_aggs_pytest.py b/tests/operations/test_map_pd_aggs_to_es_aggs_pytest.py index d7d78c79..94b16bfa 100644 --- a/tests/operations/test_map_pd_aggs_to_es_aggs_pytest.py +++ b/tests/operations/test_map_pd_aggs_to_es_aggs_pytest.py @@ -63,3 +63,9 @@ def test_extended_stats_optimization(): es_aggs = Operations._map_pd_aggs_to_es_aggs(["count", pd_agg, "nunique"]) assert es_aggs == ["value_count", extended_es_agg, "cardinality"] + + +def test_percentiles_none(): + es_aggs = Operations._map_pd_aggs_to_es_aggs(["count", "min", "quantile"]) + + assert es_aggs == ["value_count", "min", ("percentiles", [50.0])] diff --git a/tests/series/test_metrics_pytest.py b/tests/series/test_metrics_pytest.py index 60b29ae7..c066463d 100644 --- a/tests/series/test_metrics_pytest.py +++ b/tests/series/test_metrics_pytest.py @@ -138,7 +138,9 @@ def test_ecommerce_mode_es_size(self, es_size): ed_mode = ed_series["order_date"].mode(es_size) assert_series_equal(pd_mode, ed_mode) - @pytest.mark.parametrize("quantile_list", [0.2, 0.5, [0.2, 0.5]]) + @pytest.mark.parametrize( + "quantile_list", [0.2, 0.5, [0.2, 0.5], [0.75, 0.2, 0.1, 0.5]] + ) @pytest.mark.parametrize("column", ["AvgTicketPrice", "Cancelled", "dayOfWeek"]) def test_flights_quantile(self, column, quantile_list): pd_flights = self.pd_flights()[column] @@ -150,3 +152,11 @@ def test_flights_quantile(self, column, quantile_list): assert_series_equal(pd_quantile, ed_quantile, check_exact=False, rtol=2) else: assert pd_quantile * 0.9 <= ed_quantile <= pd_quantile * 1.1 + + @pytest.mark.parametrize("quantiles_list", [[np.array([1, 2])], ["1", 2]]) + def test_quantile_non_numeric_values(self, quantiles_list): + ed_flights = self.ed_flights()["dayOfWeek"] + + match = "quantile should be of type int or float" + with pytest.raises(TypeError, match=match): + ed_flights.quantile(q=quantiles_list) From 38c3d5bfd78e5542874fa1d2da2486ff66710ff2 Mon Sep 17 00:00:00 2001 From: "P. Sai Vinay" Date: Thu, 19 Nov 2020 18:45:26 +0530 Subject: [PATCH 3/7] Requested changes --- eland/operations.py | 10 +++++----- .../test_map_pd_aggs_to_es_aggs_pytest.py | 13 ++++++++++--- 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/eland/operations.py b/eland/operations.py index b505777b..8a5e7860 100644 --- a/eland/operations.py +++ b/eland/operations.py @@ -952,8 +952,8 @@ def _map_pd_aggs_to_es_aggs( """ # pd aggs that will be mapped to es aggs # that can use 'extended_stats'. - extended_stats_pd_aggs = ["mean", "min", "max", "sum", "var", "std"] - extended_stats_es_aggs = ["avg", "min", "max", "sum"] + extended_stats_pd_aggs = {"mean", "min", "max", "sum", "var", "std"} + extended_stats_es_aggs = {"avg", "min", "max", "sum"} extended_stats_calls = 0 es_aggs = [] @@ -983,14 +983,14 @@ def _map_pd_aggs_to_es_aggs( elif pd_agg == "mad": es_aggs.append("median_absolute_deviation") elif pd_agg == "median": - es_aggs.append(("percentiles", [50.0])) + es_aggs.append(("percentiles", (50.0,))) elif pd_agg == "quantile": # None when 'quantile' is called in df.agg[...] # Behaves same as median because pandas does the same. if percentiles is not None: - es_aggs.append(("percentiles", percentiles)) + es_aggs.append(("percentiles", tuple(percentiles))) else: - es_aggs.append(("percentiles", [50.0])) + es_aggs.append(("percentiles", (50.0,))) elif pd_agg == "mode": if len(pd_aggs) != 1: diff --git a/tests/operations/test_map_pd_aggs_to_es_aggs_pytest.py b/tests/operations/test_map_pd_aggs_to_es_aggs_pytest.py index 94b16bfa..88e8b404 100644 --- a/tests/operations/test_map_pd_aggs_to_es_aggs_pytest.py +++ b/tests/operations/test_map_pd_aggs_to_es_aggs_pytest.py @@ -44,8 +44,15 @@ def test_all_aggs(): "median_absolute_deviation", "value_count", "cardinality", - ("percentiles", [50.0]), - ("percentiles", [0.2, 0.5, 0.8]), + ("percentiles", (50.0,)), + ( + "percentiles", + ( + 0.2, + 0.5, + 0.8, + ), + ), ] @@ -68,4 +75,4 @@ def test_extended_stats_optimization(): def test_percentiles_none(): es_aggs = Operations._map_pd_aggs_to_es_aggs(["count", "min", "quantile"]) - assert es_aggs == ["value_count", "min", ("percentiles", [50.0])] + assert es_aggs == ["value_count", "min", ("percentiles", (50.0,))] From e3ad6016b343036f3e7c8b11400a59deacfd889b Mon Sep 17 00:00:00 2001 From: "P. Sai Vinay" Date: Fri, 8 Jan 2021 00:59:32 +0530 Subject: [PATCH 4/7] Rebase changes --- docs/sphinx/reference/api/eland.DataFrame.quantile.rst | 6 ++++++ docs/sphinx/reference/api/eland.Series.quantile.rst | 6 ++++++ eland/dataframe.py | 1 + eland/operations.py | 8 +++++--- eland/query_compiler.py | 1 + tests/series/test_metrics_pytest.py | 1 + 6 files changed, 20 insertions(+), 3 deletions(-) create mode 100644 docs/sphinx/reference/api/eland.DataFrame.quantile.rst create mode 100644 docs/sphinx/reference/api/eland.Series.quantile.rst diff --git a/docs/sphinx/reference/api/eland.DataFrame.quantile.rst b/docs/sphinx/reference/api/eland.DataFrame.quantile.rst new file mode 100644 index 00000000..eb3f8068 --- /dev/null +++ b/docs/sphinx/reference/api/eland.DataFrame.quantile.rst @@ -0,0 +1,6 @@ +eland.DataFrame.quantile +======================== + +.. currentmodule:: eland + +.. automethod:: DataFrame.quantile \ No newline at end of file diff --git a/docs/sphinx/reference/api/eland.Series.quantile.rst b/docs/sphinx/reference/api/eland.Series.quantile.rst new file mode 100644 index 00000000..958bf5c6 --- /dev/null +++ b/docs/sphinx/reference/api/eland.Series.quantile.rst @@ -0,0 +1,6 @@ +eland.Series.quantile +===================== + +.. currentmodule:: eland + +.. automethod:: Series.quantile \ No newline at end of file diff --git a/eland/dataframe.py b/eland/dataframe.py index c33d271f..5459e35a 100644 --- a/eland/dataframe.py +++ b/eland/dataframe.py @@ -1685,6 +1685,7 @@ def mode( return self._query_compiler.mode( numeric_only=numeric_only, dropna=True, is_dataframe=True, es_size=es_size ) + def quantile( self, q: Union[int, float, List[int], List[float]] = 0.5, diff --git a/eland/operations.py b/eland/operations.py index 8a5e7860..4de066a3 100644 --- a/eland/operations.py +++ b/eland/operations.py @@ -283,7 +283,7 @@ def _metric_aggs( # Consider if field is Int/Float/Bool fields = [field for field in fields if (field.is_numeric or field.is_bool)] elif not numeric_only and (pd_aggs == ["quantile"]): - # quantile doesn't accept text fields + # quantile doesn't accept timestamp fields fields = [ field for field in fields @@ -626,7 +626,7 @@ def _unpack_metric_aggs( ] # Null usually means there were no results. - if not isinstance(agg_value, dict) and ( + if not isinstance(agg_value, (list, dict)) and ( agg_value is None or np.isnan(agg_value) ): if is_dataframe_agg and not numeric_only: @@ -650,7 +650,9 @@ def _unpack_metric_aggs( ] elif percentile_values: percentile_values = [ - elasticsearch_date_to_pandas_date(value, field.es_date_format) + elasticsearch_date_to_pandas_date( + value, field.es_date_format + ) for value in percentile_values ] else: diff --git a/eland/query_compiler.py b/eland/query_compiler.py index dfc799b3..3495200e 100644 --- a/eland/query_compiler.py +++ b/eland/query_compiler.py @@ -636,6 +636,7 @@ def mode( is_dataframe=is_dataframe, es_size=es_size, ) + def quantile( self, quantiles: Union[int, float, List[int], List[float]], diff --git a/tests/series/test_metrics_pytest.py b/tests/series/test_metrics_pytest.py index c066463d..cf574361 100644 --- a/tests/series/test_metrics_pytest.py +++ b/tests/series/test_metrics_pytest.py @@ -138,6 +138,7 @@ def test_ecommerce_mode_es_size(self, es_size): ed_mode = ed_series["order_date"].mode(es_size) assert_series_equal(pd_mode, ed_mode) + @pytest.mark.parametrize( "quantile_list", [0.2, 0.5, [0.2, 0.5], [0.75, 0.2, 0.1, 0.5]] ) From 13f409a47e1f5a4a9092ea69707b6141f196fa5d Mon Sep 17 00:00:00 2001 From: "P. Sai Vinay" Date: Fri, 8 Jan 2021 01:04:15 +0530 Subject: [PATCH 5/7] Fix typo --- eland/operations.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eland/operations.py b/eland/operations.py index 4de066a3..d8d5656e 100644 --- a/eland/operations.py +++ b/eland/operations.py @@ -515,7 +515,7 @@ def _unpack_metric_aggs( pd_aggs: a list of aggs response: - a dict containing response from ElasticSearch + a dict containing response from Elastic Search numeric_only: return either numeric values or NaN/NaT is_dataframe_agg: From b1297eb0a4cb1c7406f6a4ce4c7546633d6208d3 Mon Sep 17 00:00:00 2001 From: "P. Sai Vinay" Date: Mon, 31 May 2021 20:01:18 +0530 Subject: [PATCH 6/7] Fix Pandas quantile incompatibility with boolean --- eland/dataframe.py | 21 +++++++++++---------- tests/dataframe/test_metrics_pytest.py | 8 ++++---- tests/series/test_metrics_pytest.py | 4 +++- 3 files changed, 18 insertions(+), 15 deletions(-) diff --git a/eland/dataframe.py b/eland/dataframe.py index 5459e35a..9ab39d34 100644 --- a/eland/dataframe.py +++ b/eland/dataframe.py @@ -1716,24 +1716,25 @@ def quantile( Examples -------- - >>> ed_flights = ed.DataFrame('localhost', 'flights') + >>> ed_df = ed.DataFrame('localhost', 'flights') + >>> ed_flights = ed_df.filter(["AvgTicketPrice", "FlightDelayMin", "dayOfWeek", "timestamp"]) >>> ed_flights.quantile() # doctest: +SKIP AvgTicketPrice 640.387285 - Cancelled 0.000000 + FlightDelayMin 0.000000 dayOfWeek 3.000000 Name: 0.5, dtype: float64 >>> ed_flights.quantile([.2, .5, .75]) # doctest: +SKIP - AvgTicketPrice Cancelled dayOfWeek - 0.20 361.040768 0.0 1.0 - 0.50 640.387285 0.0 3.0 - 0.75 842.213490 0.0 4.0 + AvgTicketPrice FlightDelayMin dayOfWeek + 0.20 361.040768 0.0 1.0 + 0.50 640.387285 0.0 3.0 + 0.75 842.213490 15.0 4.0 >>> ed_flights.quantile([.2, .5, .75], numeric_only=False) # doctest: +SKIP - AvgTicketPrice Cancelled dayOfWeek timestamp - 0.20 361.040768 0.0 1.0 2018-01-09 04:37:40.409370361 - 0.50 640.387285 0.0 3.0 2018-01-21 23:59:14.074445068 - 0.75 842.213490 0.0 4.0 2018-02-01 04:54:29.135782959 + AvgTicketPrice FlightDelayMin dayOfWeek timestamp + 0.20 361.040768 0.0 1.0 2018-01-09 04:43:55.296587520 + 0.50 640.387285 0.0 3.0 2018-01-21 23:51:57.637076736 + 0.75 842.213490 15.0 4.0 2018-02-01 04:46:16.658119680 """ return self._query_compiler.quantile(quantiles=q, numeric_only=numeric_only) diff --git a/tests/dataframe/test_metrics_pytest.py b/tests/dataframe/test_metrics_pytest.py index 61e6e0a0..297f17d4 100644 --- a/tests/dataframe/test_metrics_pytest.py +++ b/tests/dataframe/test_metrics_pytest.py @@ -455,10 +455,10 @@ def test_flights_quantile(self, quantiles, numeric_only): ed_flights = self.ed_flights() pd_quantile = pd_flights.filter( - ["AvgTicketPrice", "Cancelled", "dayOfWeek"] + ["AvgTicketPrice", "FlightDelayMin", "dayOfWeek"] ).quantile(q=quantiles, numeric_only=numeric_only) ed_quantile = ed_flights.filter( - ["AvgTicketPrice", "Cancelled", "dayOfWeek"] + ["AvgTicketPrice", "FlightDelayMin", "dayOfWeek"] ).quantile(q=quantiles, numeric_only=numeric_only) assert_frame_equal(pd_quantile, ed_quantile, check_exact=False, rtol=2) @@ -486,10 +486,10 @@ def test_flights_quantile_error(self, quantiles): @pytest.mark.parametrize("numeric_only", [True, False, None]) def test_flights_agg_quantile(self, numeric_only): pd_flights = self.pd_flights().filter( - ["AvgTicketPrice", "Cancelled", "dayOfWeek"] + ["AvgTicketPrice", "FlightDelayMin", "dayOfWeek"] ) ed_flights = self.ed_flights().filter( - ["AvgTicketPrice", "Cancelled", "dayOfWeek"] + ["AvgTicketPrice", "FlightDelayMin", "dayOfWeek"] ) pd_quantile = pd_flights.agg(["quantile", "min"], numeric_only=numeric_only) diff --git a/tests/series/test_metrics_pytest.py b/tests/series/test_metrics_pytest.py index cf574361..70c4aa1c 100644 --- a/tests/series/test_metrics_pytest.py +++ b/tests/series/test_metrics_pytest.py @@ -142,7 +142,9 @@ def test_ecommerce_mode_es_size(self, es_size): @pytest.mark.parametrize( "quantile_list", [0.2, 0.5, [0.2, 0.5], [0.75, 0.2, 0.1, 0.5]] ) - @pytest.mark.parametrize("column", ["AvgTicketPrice", "Cancelled", "dayOfWeek"]) + @pytest.mark.parametrize( + "column", ["AvgTicketPrice", "FlightDelayMin", "dayOfWeek"] + ) def test_flights_quantile(self, column, quantile_list): pd_flights = self.pd_flights()[column] ed_flights = self.ed_flights()[column] From e46b43873658c1c78578138705c50f16fa0c2441 Mon Sep 17 00:00:00 2001 From: "P. Sai Vinay" Date: Fri, 4 Jun 2021 00:45:35 +0530 Subject: [PATCH 7/7] Requested changes --- eland/operations.py | 37 +++++++++++++++---------------------- 1 file changed, 15 insertions(+), 22 deletions(-) diff --git a/eland/operations.py b/eland/operations.py index d8d5656e..abb4f846 100644 --- a/eland/operations.py +++ b/eland/operations.py @@ -282,13 +282,6 @@ def _metric_aggs( if numeric_only: # Consider if field is Int/Float/Bool fields = [field for field in fields if (field.is_numeric or field.is_bool)] - elif not numeric_only and (pd_aggs == ["quantile"]): - # quantile doesn't accept timestamp fields - fields = [ - field - for field in fields - if (field.is_numeric or field.is_bool or field.is_timestamp) - ] body = Query(query_params.query) @@ -305,31 +298,31 @@ def _metric_aggs( if isinstance(es_agg, tuple): if es_agg[0] == "percentiles": body.percentile_agg( - f"{es_agg[0]}_{field.es_field_name}", - field.es_field_name, - es_agg[1], + name=f"{es_agg[0]}_{field.es_field_name}", + field=field.es_field_name, + percents=es_agg[1], ) else: body.metric_aggs( - f"{es_agg[0]}_{field.es_field_name}", - es_agg[0], - field.aggregatable_es_field_name, + name=f"{es_agg[0]}_{field.es_field_name}", + func=es_agg[0], + field=field.aggregatable_es_field_name, ) elif es_agg == "mode": # TODO for dropna=False, Check If field is timestamp or boolean or numeric, # then use missing parameter for terms aggregation. body.terms_aggs( - f"{es_agg}_{field.es_field_name}", - "terms", - field.aggregatable_es_field_name, - es_mode_size, + name=f"{es_agg}_{field.es_field_name}", + func="terms", + field=field.aggregatable_es_field_name, + es_size=es_mode_size, ) else: body.metric_aggs( - f"{es_agg}_{field.es_field_name}", - es_agg, - field.aggregatable_es_field_name, + name=f"{es_agg}_{field.es_field_name}", + func=es_agg, + field=field.aggregatable_es_field_name, ) response = query_compiler._client.search( @@ -515,7 +508,7 @@ def _unpack_metric_aggs( pd_aggs: a list of aggs response: - a dict containing response from Elastic Search + a dict containing response from Elasticsearch numeric_only: return either numeric values or NaN/NaT is_dataframe_agg: @@ -727,7 +720,7 @@ def quantile_to_percentile(quantile: Any) -> float: # Display Output same as pandas does if isinstance(quantiles, float): - return df.squeeze() if is_dataframe else df.squeeze() + return df.squeeze() else: return df if is_dataframe else df.transpose().iloc[0]