Skip to content
This repository was archived by the owner on Sep 12, 2024. It is now read-only.

feat(anomaly): Sub-Dimension Anomaly Filter #999

Merged
merged 13 commits into from
Jun 22, 2022
161 changes: 80 additions & 81 deletions chaos_genius/views/anomaly_data_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import csv
import io
import time
from collections import defaultdict
from datetime import date, datetime, timedelta
from typing import Any, Dict, List, Optional, Tuple, cast

Expand Down Expand Up @@ -48,6 +49,7 @@ def kpi_anomaly_detection(kpi_id):
current_app.logger.info(f"Anomaly Detection Started for KPI ID: {kpi_id}")
data = []
end_date = None
is_overall = True
try:
kpi_info = get_kpi_data_from_id(kpi_id)

Expand All @@ -67,7 +69,18 @@ def kpi_anomaly_detection(kpi_id):

end_date = get_anomaly_output_end_date(kpi_info)

anom_data = get_overall_data(kpi_id, end_date, period)
dimensions_values = _get_dimensions_values(kpi_id, end_date, period)

dimension = request.args.get("dimension", default=None)
value = request.args.get("value", default=None)

if dimension and value:
is_overall = False
subdim = f"`{dimension}` == \"{value}\""
anom_data = get_dq_and_subdim_data(kpi_id, end_date, "subdim", subdim, period)
else:
is_overall = True
anom_data = get_overall_data(kpi_id, end_date, period)

anom_data["x_axis_limits"] = get_anomaly_graph_x_lims(end_date, period, hourly)

Expand All @@ -77,7 +90,7 @@ def kpi_anomaly_detection(kpi_id):
# remove from here once updated in frontend
"base_anomaly_id": kpi_id,
}
data["chart_data"]["title"] = kpi_info["name"]

current_app.logger.info(f"Anomaly DD Retrieval Completed for KPI ID: {kpi_id}")

end_date = get_datetime_string_with_tz(end_date, hourly)
Expand All @@ -91,9 +104,11 @@ def kpi_anomaly_detection(kpi_id):
return jsonify(
{
"data": data,
"dimensions_values": dimensions_values,
"msg": "",
"anomaly_end_date": end_date,
"last_run_time_anomaly": anomaly_last_scan,
"is_overall": is_overall,
}
)

Expand Down Expand Up @@ -166,85 +181,6 @@ def kpi_anomaly_data_quality(kpi_id):
return jsonify({"data": data, "msg": ""})


@blueprint.route("/<int:kpi_id>/subdim-anomaly", methods=["GET"])
def kpi_subdim_anomaly(kpi_id):
current_app.logger.info(f"Subdimension Anomaly Started for KPI ID: {kpi_id}")
subdim_graphs = []
end_date = None
try:
kpi_info = get_kpi_data_from_id(kpi_id)
period = kpi_info["anomaly_params"]["anomaly_period"]
hourly = kpi_info["anomaly_params"]["frequency"] == "H"

end_date = get_anomaly_output_end_date(kpi_info)
graph_xlims = get_anomaly_graph_x_lims(end_date, period, hourly)
if hourly:
# Use a 24 hour window to find peak severity per subdim and rank in descending order
start_date = end_date - timedelta(hours=23)
query = (
db.session.query(
AnomalyDataOutput.series_type,
func.max(AnomalyDataOutput.severity),
)
.filter(
(AnomalyDataOutput.kpi_id == kpi_id)
& (AnomalyDataOutput.data_datetime >= start_date)
& (AnomalyDataOutput.data_datetime <= end_date)
& (AnomalyDataOutput.anomaly_type == "subdim")
& (AnomalyDataOutput.is_anomaly != 0)
)
.group_by(AnomalyDataOutput.series_type)
.order_by(func.max(AnomalyDataOutput.severity).desc())
.limit(TOP_SUBDIMENSIONS_FOR_ANOMALY)
)

else:
query = (
AnomalyDataOutput.query.filter(
(AnomalyDataOutput.kpi_id == kpi_id)
& (AnomalyDataOutput.data_datetime == end_date)
& (AnomalyDataOutput.anomaly_type == "subdim")
& (AnomalyDataOutput.is_anomaly != 0)
)
.order_by(AnomalyDataOutput.severity.desc())
.limit(TOP_SUBDIMENSIONS_FOR_ANOMALY)
)
results = pd.read_sql(query.statement, query.session.bind)

if len(results) == 0:
end_date_str = ""
current_app.logger.error("No Subdimension Anomaly Found", exc_info=1)
else:
end_date_str = get_datetime_string_with_tz(end_date, hourly)

subdims = results.series_type
for subdim in subdims:
anom_data = get_dq_and_subdim_data(
kpi_id, end_date, "subdim", subdim, period
)
anom_data["x_axis_limits"] = graph_xlims
subdim_graphs.append(anom_data)
current_app.logger.info(
f"Subdimension Anomaly Retrieval Completed for KPI ID: {kpi_id}"
)

anomaly_last_scan = get_lastscan_string_with_tz(
kpi_info["scheduler_params"]["last_scheduled_time_anomaly"]
)

except: # noqa: E722
current_app.logger.error("Error in Subdimension Anomaly Retrieval", exc_info=1)

return jsonify(
{
"data": subdim_graphs,
"msg": "",
"anomaly_end_date": end_date_str,
"last_run_time_anomaly": anomaly_last_scan,
}
)


@blueprint.route("/anomaly-params/meta-info", methods=["GET"])
def kpi_anomaly_params_meta():
# TODO: Move this dict into the corresponding data model
Expand Down Expand Up @@ -425,6 +361,68 @@ def kpi_anomaly_retraining(kpi_id):
return jsonify({"msg": f"retraining failed for KPI: {kpi_id}, KPI id is None"})


def _get_dimensions_values(
kpi_id: int, end_date: datetime, period=90
) -> Dict[str, List[str]]:
"""Creates a dictionary of KPI dimension and their values.

:param kpi_id: ID of the KPI
:type kpi_id: int
:param end_date: last data entry of the KPI
:type end_date: datetime
:param period: time window of KPI
:type period: int
:return dimension_values_dict: dictionary of {dimension:list(vals)}
:rtype: dict
"""
start_date = pd.to_datetime(end_date) - timedelta(days=period)
start_date_str = start_date.strftime("%Y-%m-%d %H:%M:%S")
end_date_str = end_date.strftime("%Y-%m-%d %H:%M:%S")

# Get unique list of subdims and values from DB
results = (
db.session.query(
func.distinct(AnomalyDataOutput.series_type)
)
.filter(
(AnomalyDataOutput.kpi_id == kpi_id)
& (AnomalyDataOutput.data_datetime >= start_date_str)
& (AnomalyDataOutput.data_datetime <= end_date_str)
& (AnomalyDataOutput.anomaly_type == "subdim")
).all()
)

if len(results) == 0:
current_app.logger.info("No Subdimension Anomaly Found")
return []

# series_type strings are in the format "`dimension` == "value""
# split string into dimension and value to get [`dimension`, "value"]
split_dimension_value = list(
map(lambda dim_val: dim_val[0].split(" == "), results)
)

dimension_values_dict = defaultdict(list)
for dim_val in split_dimension_value:
# remove extra quotation marks
dimension, values = map(lambda dim_val_string: dim_val_string[1:-1], dim_val)
dimension_values_dict[dimension].append(values)

dimension_values_list = [
{
"label": dimension,
"value": dimension,
"subdim_value_options": [
{"label": value, "value": value}
for value in dimension_values_dict[dimension]
],
}
for dimension in dimension_values_dict
]

return dimension_values_list


def fill_graph_data(row, graph_data):
"""Fills graph_data with intervals, values, and predicted_values for
a given row.
Expand Down Expand Up @@ -503,6 +501,7 @@ def get_overall_data_points(kpi_id: int, n: int = 60) -> List:

return data_points


def get_overall_data(kpi_id, end_date: datetime, n=90):
start_date = end_date - timedelta(days=n)
start_date = start_date.strftime("%Y-%m-%d %H:%M:%S")
Expand Down