Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add quantile to DataFrame and Series #318

Merged
merged 7 commits into from
Jun 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/sphinx/reference/api/eland.DataFrame.quantile.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
eland.DataFrame.quantile
========================

.. currentmodule:: eland

.. automethod:: DataFrame.quantile
6 changes: 6 additions & 0 deletions docs/sphinx/reference/api/eland.Series.quantile.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
eland.Series.quantile
=====================

.. currentmodule:: eland

.. automethod:: Series.quantile
1 change: 1 addition & 0 deletions docs/sphinx/reference/dataframe.rst
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ Computations / Descriptive Stats
DataFrame.sum
DataFrame.nunique
DataFrame.mode
DataFrame.quantile

Reindexing / Selection / Label Manipulation
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Expand Down
1 change: 1 addition & 0 deletions docs/sphinx/reference/series.rst
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ Computations / Descriptive Stats
Series.nunique
Series.value_counts
Series.mode
Series.quantile

Reindexing / Selection / Label Manipulation
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Expand Down
52 changes: 52 additions & 0 deletions eland/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -1686,6 +1686,58 @@ def mode(
numeric_only=numeric_only, dropna=True, is_dataframe=True, es_size=es_size
)

def quantile(
    self,
    q: Union[int, float, List[int], List[float]] = 0.5,
    numeric_only: Optional[bool] = True,
) -> "pd.DataFrame":
    """
    Compute the quantile(s) of the DataFrame's columns.

    Parameters
    ----------
    q:
        float or array like, default 0.5
        Value between 0 <= q <= 1, the quantile(s) to compute.
    numeric_only: {True, False, None} Default is True
        Which datatype to be returned
        - True: Returns all values as float64, NaN/NaT values are removed
        - None: Returns all values as the same dtype where possible, NaN/NaT are removed
        - False: Returns all values as the same dtype where possible, NaN/NaT are preserved

    Returns
    -------
    pandas.DataFrame
        quantile value for each column

    See Also
    --------
    :pandas_api_docs:`pandas.DataFrame.quantile`

    Examples
    --------
    >>> ed_df = ed.DataFrame('localhost', 'flights')
    >>> ed_flights = ed_df.filter(["AvgTicketPrice", "FlightDelayMin", "dayOfWeek", "timestamp"])
    >>> ed_flights.quantile() # doctest: +SKIP
    AvgTicketPrice    640.387285
    FlightDelayMin      0.000000
    dayOfWeek           3.000000
    Name: 0.5, dtype: float64

    >>> ed_flights.quantile([.2, .5, .75]) # doctest: +SKIP
          AvgTicketPrice  FlightDelayMin  dayOfWeek
    0.20      361.040768             0.0        1.0
    0.50      640.387285             0.0        3.0
    0.75      842.213490            15.0        4.0

    >>> ed_flights.quantile([.2, .5, .75], numeric_only=False) # doctest: +SKIP
          AvgTicketPrice  FlightDelayMin  dayOfWeek                     timestamp
    0.20      361.040768             0.0        1.0 2018-01-09 04:43:55.296587520
    0.50      640.387285             0.0        3.0 2018-01-21 23:51:57.637076736
    0.75      842.213490            15.0        4.0 2018-02-01 04:46:16.658119680
    """
    # All of the heavy lifting (translating q into an Elasticsearch
    # 'percentiles' aggregation) is done by the query compiler.
    compiler = self._query_compiler
    return compiler.quantile(quantiles=q, numeric_only=numeric_only)

def query(self, expr) -> "DataFrame":
"""
Query the columns of a DataFrame with a boolean expression.
Expand Down
149 changes: 123 additions & 26 deletions eland/operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ def _metric_aggs(
is_dataframe_agg: bool = False,
es_mode_size: Optional[int] = None,
dropna: bool = True,
percentiles: Optional[List[float]] = None,
) -> Dict[str, Any]:
"""
Used to calculate metric aggregations
Expand All @@ -262,6 +263,8 @@ def _metric_aggs(
number of rows to return when multiple mode values are present.
dropna:
drop NaN/NaT for a dataframe
percentiles:
List of percentiles when 'quantile' agg is called. Otherwise it is None

Returns
-------
Expand All @@ -283,7 +286,7 @@ def _metric_aggs(
body = Query(query_params.query)

# Convert pandas aggs to ES equivalent
es_aggs = self._map_pd_aggs_to_es_aggs(pd_aggs)
es_aggs = self._map_pd_aggs_to_es_aggs(pd_aggs, percentiles)

for field in fields:
for es_agg in es_aggs:
Expand All @@ -293,25 +296,33 @@ def _metric_aggs(

# If we have multiple 'extended_stats' etc. here we simply NOOP on 2nd call
if isinstance(es_agg, tuple):
body.metric_aggs(
f"{es_agg[0]}_{field.es_field_name}",
es_agg[0],
field.aggregatable_es_field_name,
)
if es_agg[0] == "percentiles":
body.percentile_agg(
name=f"{es_agg[0]}_{field.es_field_name}",
field=field.es_field_name,
percents=es_agg[1],
)
else:
body.metric_aggs(
name=f"{es_agg[0]}_{field.es_field_name}",
func=es_agg[0],
field=field.aggregatable_es_field_name,
)
elif es_agg == "mode":
# TODO for dropna=False, Check If field is timestamp or boolean or numeric,
# then use missing parameter for terms aggregation.
body.terms_aggs(
f"{es_agg}_{field.es_field_name}",
"terms",
field.aggregatable_es_field_name,
es_mode_size,
name=f"{es_agg}_{field.es_field_name}",
func="terms",
field=field.aggregatable_es_field_name,
es_size=es_mode_size,
)

else:
body.metric_aggs(
f"{es_agg}_{field.es_field_name}",
es_agg,
field.aggregatable_es_field_name,
name=f"{es_agg}_{field.es_field_name}",
func=es_agg,
field=field.aggregatable_es_field_name,
)

response = query_compiler._client.search(
Expand All @@ -333,6 +344,7 @@ def _metric_aggs(
response=response,
numeric_only=numeric_only,
is_dataframe_agg=is_dataframe_agg,
percentiles=percentiles,
)

def _terms_aggs(
Expand Down Expand Up @@ -479,8 +491,9 @@ def _unpack_metric_aggs(
pd_aggs: List[str],
response: Dict[str, Any],
numeric_only: Optional[bool],
percentiles: Optional[List[float]] = None,
is_dataframe_agg: bool = False,
):
) -> Dict[str, List[Any]]:
"""
This method unpacks metric aggregations JSON response.
This can be called either directly on an aggs query
Expand All @@ -495,15 +508,22 @@ def _unpack_metric_aggs(
pd_aggs:
a list of aggs
response:
a dict containing response from Elastic Search
a dict containing response from Elasticsearch
numeric_only:
return either numeric values or NaN/NaT
is_dataframe_agg:
- True then aggregation is called from dataframe
- False then aggregation is called from series
percentiles:
List of percentiles when 'quantile' agg is called. Otherwise it is None

Returns
-------
a dictionary on which agg calculations are done.
"""
results: Dict[str, Any] = {}
percentile_values: List[float] = []
agg_value: Union[int, float]

for field in fields:
values = []
Expand All @@ -529,10 +549,19 @@ def _unpack_metric_aggs(

# Pull multiple values from 'percentiles' result.
if es_agg[0] == "percentiles":
agg_value = agg_value["values"]

agg_value = agg_value[es_agg[1]]
agg_value = agg_value["values"] # Returns dictionary
if pd_agg == "median":
agg_value = agg_value["50.0"]
# Currently Pandas does the same
# If we call quantile it returns the same result as of median.
elif pd_agg == "quantile" and is_dataframe_agg:
agg_value = agg_value["50.0"]
else:
# Maintain order of percentiles
percentile_values = [agg_value[str(i)] for i in percentiles]

if not percentile_values and pd_agg not in ("quantile", "median"):
agg_value = agg_value[es_agg[1]]
# Need to convert 'Population' stddev and variance
# from Elasticsearch into 'Sample' stddev and variance
# which is what pandas uses.
Expand Down Expand Up @@ -590,7 +619,7 @@ def _unpack_metric_aggs(
]

# Null usually means there were no results.
if not isinstance(agg_value, list) and (
if not isinstance(agg_value, (list, dict)) and (
agg_value is None or np.isnan(agg_value)
):
if is_dataframe_agg and not numeric_only:
Expand All @@ -612,12 +641,19 @@ def _unpack_metric_aggs(
)
for value in agg_value
]
elif percentile_values:
percentile_values = [
elasticsearch_date_to_pandas_date(
value, field.es_date_format
)
for value in percentile_values
]
else:
agg_value = elasticsearch_date_to_pandas_date(
agg_value, field.es_date_format
)
# If numeric_only is False | None then maintain column datatype
elif not numeric_only:
elif not numeric_only and pd_agg != "quantile":
# we're only converting to bool for lossless aggs like min, max, and median.
if pd_agg in {"max", "min", "median", "sum", "mode"}:
# 'sum' isn't representable with bool, use int64
Expand All @@ -626,14 +662,68 @@ def _unpack_metric_aggs(
else:
agg_value = field.np_dtype.type(agg_value)

values.append(agg_value)
if not percentile_values:
values.append(agg_value)

# If numeric_only is True and We only have a NaN type field then we check for empty.
if values:
results[field.column] = values if len(values) > 1 else values[0]
# This only runs when df.quantile() or series.quantile() is called
if percentile_values and not is_dataframe_agg:
results[f"{field.column}"] = percentile_values

return results

def quantile(
    self,
    query_compiler: "QueryCompiler",
    pd_aggs: List[str],
    quantiles: Union[int, float, List[int], List[float]],
    is_dataframe: bool = True,
    numeric_only: Optional[bool] = True,
) -> Union[pd.DataFrame, pd.Series]:
    """
    Compute quantile(s) via an Elasticsearch 'percentiles' aggregation.

    Parameters
    ----------
    query_compiler:
        compiler of the query to execute against Elasticsearch
    pd_aggs:
        list of pandas aggs (here typically ["quantile"])
    quantiles:
        quantile(s) to compute; each value must satisfy 0 <= q <= 1
    is_dataframe:
        True when called from DataFrame.quantile, False from Series.quantile
    numeric_only:
        True returns all values as float64, otherwise dtypes are preserved

    Returns
    -------
    pandas.DataFrame when a list of quantiles is given from a DataFrame,
    pandas.Series for a Series with a list of quantiles, and a squeezed
    result (scalar/Series) for a scalar q — mirroring pandas.

    Raises
    ------
    TypeError
        if a quantile is not an int or float
    ValueError
        if a quantile is outside the range [0, 1]
    """

    # To verify if quantile range falls between 0 to 1
    def quantile_to_percentile(quantile: Any) -> float:
        if not isinstance(quantile, (int, float)):
            raise TypeError("quantile should be of type int or float")
        quantile = float(quantile)
        if quantile > 1 or quantile < 0:
            raise ValueError(
                f"quantile should be in range of 0 and 1, given {quantile}"
            )
        # quantile * 100 = percentile
        # return float(...) because min(1.0) gives 1
        return float(min(100, max(0, quantile * 100)))

    # Normalize a scalar q to a one-element sequence so a single code
    # path validates and converts everything.
    scalar_input = not isinstance(quantiles, (list, tuple))
    quantile_seq = (quantiles,) if scalar_input else quantiles
    percentiles = [quantile_to_percentile(x) for x in quantile_seq]

    result = self._metric_aggs(
        query_compiler,
        pd_aggs=pd_aggs,
        percentiles=percentiles,
        is_dataframe_agg=False,
        numeric_only=numeric_only,
    )

    df = pd.DataFrame(
        result,
        index=[i / 100 for i in percentiles],
        columns=result.keys(),
        dtype=(np.float64 if numeric_only else None),
    )

    # Display output the same way pandas does: a scalar q collapses the
    # quantile axis.
    # BUG FIX: the original tested `isinstance(quantiles, float)`, so an
    # integer q (e.g. q=0 or q=1) returned an un-squeezed DataFrame,
    # unlike pandas. Test any scalar input instead.
    if scalar_input:
        return df.squeeze()
    return df if is_dataframe else df.transpose().iloc[0]

def aggs_groupby(
self,
query_compiler: "QueryCompiler",
Expand Down Expand Up @@ -821,10 +911,13 @@ def bucket_generator(
return composite_buckets["buckets"]

@staticmethod
def _map_pd_aggs_to_es_aggs(pd_aggs):
def _map_pd_aggs_to_es_aggs(
pd_aggs: List[str], percentiles: Optional[List[float]] = None
) -> Union[List[str], List[Tuple[str, List[float]]]]:
"""
Args:
pd_aggs - list of pandas aggs (e.g. ['mad', 'min', 'std'] etc.)
percentiles - list of percentiles for 'quantile' agg

Returns:
ed_aggs - list of corresponding es_aggs (e.g. ['median_absolute_deviation', 'min', 'std'] etc.)
Expand Down Expand Up @@ -885,7 +978,14 @@ def _map_pd_aggs_to_es_aggs(pd_aggs):
elif pd_agg == "mad":
es_aggs.append("median_absolute_deviation")
elif pd_agg == "median":
es_aggs.append(("percentiles", "50.0"))
es_aggs.append(("percentiles", (50.0,)))
elif pd_agg == "quantile":
# None when 'quantile' is called in df.agg[...]
# Behaves same as median because pandas does the same.
if percentiles is not None:
es_aggs.append(("percentiles", tuple(percentiles)))
else:
es_aggs.append(("percentiles", (50.0,)))

elif pd_agg == "mode":
if len(pd_aggs) != 1:
Expand All @@ -896,9 +996,6 @@ def _map_pd_aggs_to_es_aggs(pd_aggs):
es_aggs.append("mode")

# Not implemented
elif pd_agg == "quantile":
# TODO
raise NotImplementedError(pd_agg, " not currently implemented")
elif pd_agg == "rank":
# TODO
raise NotImplementedError(pd_agg, " not currently implemented")
Expand Down
Loading