This repository was archived by the owner on Sep 12, 2024. It is now read-only.

refactor(kpi-controller): fix type errors and lint issues #979

Merged · 2 commits · Jun 7, 2022
116 changes: 60 additions & 56 deletions chaos_genius/controllers/kpi_controller.py
@@ -1,13 +1,11 @@
 """Logic and helpers for interaction with KPIs."""
 import logging
 from datetime import date, datetime, timedelta
 from typing import Iterator, List, Optional
 
 from sqlalchemy import delete
 
-from chaos_genius.controllers.task_monitor import (
-    checkpoint_failure,
-    checkpoint_success,
-)
+from chaos_genius.controllers.task_monitor import checkpoint_failure, checkpoint_success
 from chaos_genius.core.anomaly.controller import AnomalyDetectionController
 from chaos_genius.core.rca.constants import TIME_RANGES_BY_KEY
 from chaos_genius.core.rca.rca_controller import RootCauseAnalysisController
@@ -25,19 +23,16 @@
 
 
 def _is_data_present_for_end_date(
-    kpi_info: dict, end_date: date = None
+    kpi_info: dict, end_date: Optional[date] = None
 ) -> bool:
     if end_date is None:
         end_date = datetime.now().date()
-    df_count = DataLoader(
-        kpi_info, end_date=end_date, days_before=0
-    ).get_count()
+    df_count = DataLoader(kpi_info, end_date=end_date, days_before=0).get_count()
     return df_count != 0
 
 
 def get_kpi_data_from_id(n: int) -> dict:
-    """Returns the corresponding KPI data for the given KPI ID
-    from KPI_DATA.
+    """Returns the corresponding KPI data for the given KPI ID from KPI_DATA.
 
     :param n: ID of KPI
     :type n: int
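Note on the recurring type fix in this diff: a parameter that defaults to None must be annotated Optional[...]. With implicit Optional disabled (mypy's no_implicit_optional), `end_date: date = None` is a type error because None is not a `date`. A minimal illustration (both function names are made up for this note):

from datetime import date
from typing import Optional


def before(end_date: date = None) -> bool:  # mypy: default None incompatible with "date"
    ...


def after(end_date: Optional[date] = None) -> bool:  # explicit Optional, accepted
    ...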
@@ -58,45 +53,52 @@ def get_kpi_data_from_id(n: int) -> dict:
 def run_anomaly_for_kpi(
     kpi_id: int, end_date: Optional[date] = None, task_id: Optional[int] = None
 ):
+    """Runs anomaly detection for given kpi_id.
+
+    Blocking function (it does NOT spawn a celery task).
+    """
     logger.info(f"Starting Anomaly Detection for KPI ID: {kpi_id}.")
     kpi_info = get_kpi_data_from_id(kpi_id)
     logger.info(f"(KPI ID: {kpi_id}) Retrieved KPI information.")
 
-    logger.info("(KPI ID: {kpi_id}) Selecting end date.")
-
-    if (
-        end_date is None
-        and kpi_info["scheduler_params"]["scheduler_frequency"] == "D"
-    ):
-        # by default we always calculate for n-days_offset_for_analytics
-        end_date = datetime.today().date() - timedelta(
-            days=(DAYS_OFFSET_FOR_ANALTYICS)
-        )
-        # Check if data is available or not then try for n-days_offset_for_analytics-1
-        if not _is_data_present_for_end_date(kpi_info, end_date):
-            end_date = end_date - timedelta(days=1)
-            logger.info("(KPI ID: {kpi_id}) Decreasing end date by 1.")
+    if end_date is None:
+        scheduler_frequency = kpi_info["scheduler_params"]["scheduler_frequency"]
+
+        if scheduler_frequency == "D":
+            # by default we always calculate for n-days_offset_for_analytics
+            true_end_date = datetime.now().date() - timedelta(
+                days=(DAYS_OFFSET_FOR_ANALTYICS)
+            )
+            # Check if data is available or not then try for
+            # n-days_offset_for_analytics-1
+            if not _is_data_present_for_end_date(kpi_info, true_end_date):
+                true_end_date = true_end_date - timedelta(days=1)
+                logger.info("(KPI ID: {kpi_id}) Decreasing end date by 1.")
+
+        elif scheduler_frequency == "H":
+            true_end_date = datetime.now().date()
 
-    elif (
-        end_date is None
-        and kpi_info["scheduler_params"]["scheduler_frequency"] == "H"
-    ):
-        end_date = datetime.today().date()
+        else:
+            raise ValueError(
+                f"KPI ID {kpi_id} has invalid scheduler frequency: "
+                f"{scheduler_frequency}"
+            )
+    else:
+        true_end_date = end_date
 
-    logger.info(f"(KPI ID: {kpi_id}) End date is {end_date}.")
+    logger.info(f"(KPI ID: {kpi_id}) End date is {true_end_date}.")
 
-    adc = AnomalyDetectionController(kpi_info, end_date, task_id=task_id)
+    adc = AnomalyDetectionController(kpi_info, true_end_date, task_id=task_id)
     adc.detect()
     logger.info(f"Anomaly Detection has completed for KPI ID: {kpi_id}.")
 
 
-def _get_end_date_for_rca_kpi(kpi_info: dict, end_date: date = None) -> date:
+def _get_end_date_for_rca_kpi(kpi_info: dict, end_date: Optional[date] = None) -> date:
     # by default we always calculate for n-1
     if end_date is None:
-        end_date = datetime.today().date() - timedelta(
-            days=(DAYS_OFFSET_FOR_ANALTYICS)
-        )
+        end_date = datetime.today().date() - timedelta(days=(DAYS_OFFSET_FOR_ANALTYICS))
 
     count = 0
     while not _is_data_present_for_end_date(kpi_info, end_date):
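The flattened branch logic above (one `if end_date is None:` with a frequency dispatch inside, plus an explicit ValueError for unknown frequencies) is easier to follow and to test than the old compound conditions. A standalone sketch of the same policy, with `has_data` standing in for `_is_data_present_for_end_date` and an assumed offset value:

from datetime import date, datetime, timedelta
from typing import Callable, Optional

DAYS_OFFSET_FOR_ANALTYICS = 2  # assumed value; the real constant comes from project settings


def select_end_date(
    frequency: str,
    has_data: Callable[[date], bool],
    end_date: Optional[date] = None,
) -> date:
    """Mirror of the selection policy: an explicit end_date wins; daily
    KPIs use today - offset (one more day back if data is missing);
    hourly KPIs use today; anything else is rejected."""
    if end_date is not None:
        return end_date
    if frequency == "D":
        candidate = datetime.now().date() - timedelta(days=DAYS_OFFSET_FOR_ANALTYICS)
        if not has_data(candidate):
            candidate -= timedelta(days=1)
        return candidate
    if frequency == "H":
        return datetime.now().date()
    raise ValueError(f"invalid scheduler frequency: {frequency}")


# e.g. a KPI whose data lags one extra day behind the offset:
print(select_end_date("D", lambda d: d < datetime.now().date() - timedelta(days=2)))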
@@ -113,8 +115,12 @@ def _get_end_date_for_rca_kpi(kpi_info: dict, end_date: Optional[date] = None) -> date:
 
 
 def run_rca_for_kpi(
-    kpi_id: int, end_date: date = None, task_id: Optional[int] = None
+    kpi_id: int, end_date: Optional[date] = None, task_id: Optional[int] = None
 ) -> bool:
+    """Runs DeepDrills for given kpi_id.
+
+    Blocking function (it does NOT spawn a celery task).
+    """
     try:
         logger.info(f"Starting RCA for KPI ID: {kpi_id}.")
         kpi_info = get_kpi_data_from_id(kpi_id)
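Both runners now document that they block. A hypothetical caller, just to show the contract (not from this PR):

from datetime import date

from chaos_genius.controllers.kpi_controller import run_rca_for_kpi

# Runs synchronously; success is reported via the bool return value.
ok = run_rca_for_kpi(kpi_id=42, end_date=date(2022, 6, 5), task_id=None)
if not ok:
    print("DeepDrills failed for KPI 42; the failing checkpoint is in the logs.")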
@@ -128,12 +134,13 @@ def run_rca_for_kpi(
                     task_id, kpi_id, "DeepDrills", "Data Loader and Validation"
                 )
             logger.info(
-                f"(Task: {task_id}, KPI: {kpi_id}) DeepDrills - Data Loader and Validation - Success",
+                (
+                    f"(Task: {task_id}, KPI: {kpi_id}) DeepDrills - Data Loader and "
+                    "Validation - Success"
+                ),
             )
         except Exception as e:  # noqa: B902
-            logger.error(
-                f"Getting end date failed for KPI: {kpi_id}.", exc_info=e
-            )
+            logger.error(f"Getting end date failed for KPI: {kpi_id}.", exc_info=e)
             if task_id is not None:
                 checkpoint_failure(
                     task_id,
@@ -143,7 +150,10 @@ def run_rca_for_kpi(
                     e,
                 )
             logger.error(
-                f"(Task: {task_id}, KPI: {kpi_id}) DeepDrills - Data Loader and Validation - Exception occured.",
+                (
+                    f"(Task: {task_id}, KPI: {kpi_id}) DeepDrills - Data Loader and"
+                    " Validation - Exception occured."
+                ),
                 exc_info=e,
             )
             return False
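The try/except-plus-checkpoint shape repeats for every stage of run_rca_for_kpi; if it grows further, it could be factored into a helper along these lines (hypothetical, not part of this PR; the checkpoint_* signatures match the calls in this diff):

from typing import Callable, Optional

from chaos_genius.controllers.task_monitor import checkpoint_failure, checkpoint_success


def run_checkpointed_step(
    task_id: Optional[int],
    kpi_id: int,
    checkpoint: str,
    step: Callable[[], None],
) -> bool:
    """Run `step`, checkpointing the outcome the way run_rca_for_kpi does
    (checkpoints are recorded only when a task_id is present)."""
    try:
        step()
    except Exception as e:  # noqa: B902
        if task_id is not None:
            checkpoint_failure(task_id, kpi_id, "DeepDrills", checkpoint, e)
        return False
    if task_id is not None:
        checkpoint_success(task_id, kpi_id, "DeepDrills", checkpoint)
    return True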
@@ -155,48 +165,42 @@ def run_rca_for_kpi(
         logger.info(f"Completed RCA for KPI ID: {kpi_id}.")
 
     except Exception as e:  # noqa: B902
-        logger.error(
-            f"RCA encountered an error for KPI ID: {kpi_id}", exc_info=e
-        )
+        logger.error(f"RCA encountered an error for KPI ID: {kpi_id}", exc_info=e)
         if task_id is not None:
-            checkpoint_failure(
-                task_id, kpi_id, "DeepDrills", "DeepDrills complete", e
-            )
+            checkpoint_failure(task_id, kpi_id, "DeepDrills", "DeepDrills complete", e)
         return False
 
     if task_id is not None:
-        checkpoint_success(
-            task_id, kpi_id, "DeepDrills", "DeepDrills complete"
-        )
+        checkpoint_success(task_id, kpi_id, "DeepDrills", "DeepDrills complete")
 
     return True
 
 
 def get_anomaly_kpis() -> Iterator[Kpi]:
     """Returns a list of all KPIs for which anomaly needs to run."""
     kpis = Kpi.query.distinct("kpi_id").filter(
-        (Kpi.run_anomaly == True) & (Kpi.active == True)
+        (Kpi.run_anomaly == True) & (Kpi.active == True)  # noqa: E712
     )
     return kpis
 
 
 def get_active_kpis() -> Iterator[Kpi]:
     """Returns a list of all active KPIs."""
     kpis = Kpi.query.distinct("kpi_id").filter(
-        (Kpi.active == True) & (Kpi.is_static == False)
+        (Kpi.active == True) & (Kpi.is_static == False)  # noqa: E712
     )
     return kpis
 
 
 def get_anomaly_data(
     kpi_ids: List[int],
-    anomaly_types: List[str] = None,
+    anomaly_types: Optional[List[str]] = None,
     anomalies_only: bool = False,
-    start_timestamp: datetime = None,
+    start_timestamp: Optional[datetime] = None,
     include_start_timestamp: bool = True,
-    end_timestamp: datetime = None,
+    end_timestamp: Optional[datetime] = None,
     include_end_timestamp: bool = True,
-    severity_cutoff: float = None,
+    severity_cutoff: Optional[float] = None,
 ) -> List[AnomalyDataOutput]:
     """Returns list of anomaly points using paramters to filter the output."""
     filters = []
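On the `# noqa: E712` suppressions in the two query functions above: flake8's E712 normally asks for `is True`, but SQLAlchemy overloads `==` on column objects to build SQL expressions, so `is True` would evaluate to a plain Python False instead of a filter. A minimal self-contained illustration (the DemoKpi model is made up for this example):

from sqlalchemy import Boolean, Column, Integer
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class DemoKpi(Base):
    __tablename__ = "demo_kpi"
    id = Column(Integer, primary_key=True)
    active = Column(Boolean)


# Builds the intended SQL expression, e.g. "demo_kpi.active = true":
expr = DemoKpi.active == True  # noqa: E712
print(expr)

# What E712 would suggest is an identity check on the column object,
# which is a plain bool and silently breaks the query:
print(DemoKpi.active is True)  # False

# A lint-clean alternative SQLAlchemy provides:
print(DemoKpi.active.is_(True))  # demo_kpi.active IS true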
@@ -273,8 +277,8 @@ def delete_anomaly_output_for_kpi(kpi_id: int):
     db.session.commit()
 
 
-def get_anomaly_count(kpi_id, timeline):
-
+def get_anomaly_count(kpi_id: int, timeline: str):
+    """Return a count of anomalies for given kpi_id and given timeline."""
     curr_date = datetime.now().date()
     (_, _), (sd, _) = TIME_RANGES_BY_KEY[timeline]["function"](curr_date)
 
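The nested unpacking in get_anomaly_count assumes each TIME_RANGES_BY_KEY entry maps to a function returning two (start, end) pairs: the previous window and the current one. A stand-in sketch (the real ranges live in chaos_genius.core.rca.constants; this week-over-week version is invented for illustration):

from datetime import date, timedelta


def last_week_ranges(today: date):
    """Return ((prev_start, prev_end), (curr_start, curr_end))."""
    return (
        (today - timedelta(days=14), today - timedelta(days=8)),
        (today - timedelta(days=7), today - timedelta(days=1)),
    )


# get_anomaly_count only needs the current window's start date:
(_, _), (sd, _) = last_week_ranges(date(2022, 6, 7))
print(sd)  # 2022-05-31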