This repository was archived by the owner on Sep 12, 2024. It is now read-only.

Druid configurable count column #985

Merged 5 commits on Jun 9, 2022
2 changes: 1 addition & 1 deletion chaos_genius/core/anomaly/controller.py
@@ -86,7 +86,7 @@ def __init__(
kpi_info["data_source"]
).as_dict["connection_type"]
self._preaggregated = conn_type == "Druid"
-        self._preaggregated_count_col = "count"
+        self._preaggregated_count_col = self.kpi_info["count_column"]

logger.info(f"Anomaly controller initialized for KPI ID: {kpi_info['id']}")

2 changes: 1 addition & 1 deletion chaos_genius/core/rca/rca_controller.py
@@ -61,7 +61,7 @@ def __init__(
"connection_type"
]
self._preaggregated = conn_type == "Druid"
-        self._preaggregated_count_col = "count"
+        self._preaggregated_count_col = self.kpi_info["count_column"]

self.end_date = load_input_data_end_date(kpi_info, end_date)
logger.info(f"RCA Controller end date: {self.end_date}")
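Context for these two identical changes: with pre-aggregated sources such as Druid, each stored row stands in for many raw events, so the controllers must weight aggregations by a rollup count column instead of treating every row equally. Until now that column's name was hardcoded to "count"; it is now read from the KPI definition. A minimal sketch of the underlying idea (illustrative column names, not the controllers' actual code):

import pandas as pd

# Pre-aggregated (rolled-up) data: each row already summarizes many raw
# events, and the count column records how many.
df = pd.DataFrame(
    {
        "revenue_sum": [100.0, 50.0],  # per-row sums of the metric
        "count": [10, 2],              # raw events folded into each row
    }
)

count_col = "count"  # previously hardcoded; now read from kpi_info["count_column"]

# The number of underlying events is the sum of the count column, not
# len(df), so the true mean must be weighted by it.
total_events = df[count_col].sum()                     # 12, not 2
mean_revenue = df["revenue_sum"].sum() / total_events  # 12.5
print(total_events, mean_revenue)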
42 changes: 38 additions & 4 deletions chaos_genius/core/utils/kpi_validation.py
@@ -5,7 +5,7 @@
from typing import Any, Dict, List, Optional, Tuple, Union

import pandas as pd
-from pandas.api.types import is_datetime64_any_dtype as is_datetime
+from pandas.api.types import is_datetime64_any_dtype as is_datetime, is_integer_dtype as is_integer

from chaos_genius.core.rca.root_cause_analysis import SUPPORTED_AGGREGATIONS
from chaos_genius.core.utils.data_loader import DataLoader
@@ -40,14 +40,15 @@ def validate_kpi(
# TODO: Take in connection info as an argument instead of
# getting it here as it will help with mocking for tests.
connection_info = DataSource.get_by_id(kpi_info["data_source"]).as_dict
-    supports_date_string_parsing = connection_info["name"] == "Druid"
+    supports_date_string_parsing = connection_info["connection_type"] == "Druid"

status, message = _validate_kpi_from_df(
df,
kpi_info,
kpi_column_name=kpi_info["metric"],
agg_type=kpi_info["aggregation"],
date_column_name=kpi_info["datetime_column"],
+        count_column_name=kpi_info["count_column"],
supports_date_string_parsing=supports_date_string_parsing,
)

@@ -68,6 +69,7 @@ def _validate_kpi_from_df(
kpi_column_name: str,
agg_type: str,
date_column_name: str,
+    count_column_name: Optional[str],
supports_date_string_parsing: bool = False,
) -> Tuple[bool, str]:
"""Invoke each validation check and break if there's a falsy check.
@@ -82,6 +84,8 @@
:type agg_type: str
:param date_column_name: Name of the date column
:type date_column_name: str
+    :param count_column_name: Name of the count column, relevant for preaggregated data
+    :type count_column_name: Optional[str]
:param supports_date_string_parsing: Bool for allowing parsing of strings, defaults to False
:type supports_date_string_parsing: bool, optional
:return: returns a tuple with the status as a bool and a status message
@@ -134,12 +138,19 @@
),
),
},
{
"debug_str": "Check #5: Validate dimensions",
"debug_str": "Check #5: Validate count column is of number type",
"status": lambda: _validate_count_column_is_number(
df,
count_column_name=count_column_name,
),
},
{
"debug_str": "Check #6: Validate dimensions",
"status": lambda: _validate_dimensions(kpi_info),
},
{
"debug_str": (
"Check #6: Validate KPI has no more than "
"Check #7: Validate KPI has no more than "
f"{MAX_ROWS_IN_KPI} rows"
),
"status": lambda: _validate_for_maximum_kpi_size(kpi_info),
@@ -284,6 +295,29 @@ def _validate_date_column_is_parseable(

return True, "Accepted!"

+def _validate_count_column_is_number(
+    df: pd.core.frame.DataFrame,
+    count_column_name: Optional[str],
+) -> Tuple[bool, str]:
+    """Validate that the specified count column is of an integer type.
+
+    :param df: A pandas DataFrame
+    :type df: pd.core.frame.DataFrame
+    :param count_column_name: Name of the count column, relevant for preaggregated data
+    :type count_column_name: Optional[str]
+    :return: returns a tuple with the status as a bool and a status message
+    :rtype: Tuple[bool, str]
+    """
+    # The count column must be an integer type; the check is skipped when no
+    # count column is configured (i.e. for non-preaggregated data).
+    if count_column_name:
+        if not is_integer(df[count_column_name]):
+            invalid_type_err_msg = (
+                "The count column is of type"
+                f" {df[count_column_name].dtype}, use 'cast' to convert to integer."
+            )
+            return False, invalid_type_err_msg
+    return True, "Accepted!"
+

def _validate_for_maximum_kpi_size(
kpi_info: Dict[str, Any],
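A quick illustration of how the new check behaves, calling the private helper directly (a sketch; the DataFrames are made up):

import pandas as pd

from chaos_genius.core.utils.kpi_validation import (
    _validate_count_column_is_number,
)

ok_df = pd.DataFrame({"count": [1, 2, 3]})
bad_df = pd.DataFrame({"count": [1.0, 2.0, 3.0]})

# An integer count column passes.
assert _validate_count_column_is_number(ok_df, "count") == (True, "Accepted!")

# A float count column is rejected, with a hint to cast it.
status, message = _validate_count_column_is_number(bad_df, "count")
assert status is False and "cast" in message

# No count column configured (non-preaggregated KPIs): the check is skipped.
assert _validate_count_column_is_number(ok_df, None) == (True, "Accepted!")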
8 changes: 8 additions & 0 deletions chaos_genius/databases/models/kpi_model.py
@@ -22,6 +22,7 @@ class Kpi(PkModel):
metric = Column(db.Text(), nullable=False)
aggregation = Column(db.String(80), nullable=False)
datetime_column = Column(db.Text(), nullable=False)
+    count_column = Column(db.Text(), nullable=True, default=None)
filters = Column(db.JSON)
dimensions = Column(db.JSON)
timezone_aware = Column(db.Boolean(), nullable=False, default=False)
@@ -59,6 +60,7 @@ def safe_dict(self):
"metric": self.metric,
"aggregation": self.aggregation,
"datetime_column": self.datetime_column,
"count_column": self.count_column,
"dimensions": self.dimensions,
"timezone_aware": self.timezone_aware,
"run_anomaly": self.run_anomaly,
@@ -85,6 +87,7 @@ def as_dict(self):
"metric": self.metric,
"aggregation": self.aggregation,
"datetime_column": self.datetime_column,
"count_column": self.count_column,
"filters": self.filters,
"dimensions": self.dimensions,
"timezone_aware": self.timezone_aware,
@@ -170,6 +173,11 @@ def meta_info(cls):
"is_editable": True,
"is_sensitive": False,
},
+            {
+                "name": "count_column",
+                "is_editable": True,
+                "is_sensitive": False,
+            },
{
"name": "dimensions",
"is_editable": True,
1 change: 1 addition & 0 deletions chaos_genius/views/kpi_view.py
@@ -84,6 +84,7 @@ def kpi():
metric=data.get("metric"),
aggregation=data.get("aggregation"),
datetime_column=data.get("datetime_column"),
+            count_column=data.get("count_column"),
filters=data.get("filters"),
dimensions=data.get("dimensions"),
)
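With this change the KPI creation endpoint simply picks count_column out of the request JSON, alongside the existing fields. A request might look like the following sketch (field values are illustrative, and the endpoint URL is an assumption based on the view module):

import requests

# Hypothetical payload for a Druid-backed, pre-aggregated KPI.
payload = {
    "name": "daily_revenue",
    "data_source": 3,
    "kpi_type": "table",
    "table_name": "revenue_rollup",
    "metric": "revenue_sum",
    "aggregation": "sum",
    "datetime_column": "__time",
    "count_column": "count",  # new field; previously implied and hardcoded
    "filters": [],
    "dimensions": ["country", "platform"],
}

# The endpoint path is an assumption based on the view module (kpi_view.py).
resp = requests.post("http://localhost:5000/api/kpi/", json=payload)
print(resp.status_code, resp.json())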
35 changes: 35 additions & 0 deletions migrations/versions/27d55d03f753_add_count_column_upgrade_step.py
@@ -0,0 +1,35 @@
"""Add Count column & upgrade step

Revision ID: 27d55d03f753
Revises: e3625265a039
Create Date: 2022-06-08 14:29:57.525557

"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = '27d55d03f753'
down_revision = 'e3625265a039'
branch_labels = None
depends_on = None


def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.add_column("kpi", sa.Column("count_column", sa.Text(), nullable=True))
# ### end Alembic commands ###

update_query = """
UPDATE kpi
SET count_column = 'count'
WHERE data_source IN
( SELECT id FROM data_source WHERE connection_type = 'Druid' )
"""
op.execute(update_query)


def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column('kpi', 'count_column')
# ### end Alembic commands ###
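The backfill above keeps existing Druid KPIs on the previously hardcoded behavior by setting count_column to 'count'. After upgrading, a sanity check could look like this sketch (assumes a Flask app context and the Flask-SQLAlchemy query interface the models appear to use):

from chaos_genius.databases.models.kpi_model import Kpi

# Druid-backed KPIs should have been backfilled to the old default;
# every other KPI keeps count_column as NULL.
backfilled = Kpi.query.filter(Kpi.count_column.isnot(None)).all()
for kpi in backfilled:
    assert kpi.count_column == "count", f"unexpected value on KPI {kpi.id}"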
2 changes: 2 additions & 0 deletions tests/test_anomaly_controller.py
@@ -25,6 +25,7 @@ def load_input_data(file_name):
"kpi_type": "table",
"aggregation": "sum",
"datetime_column": "dt",
"count_column": None,
"table_name": "lyft_data",
"metric": "y",
"anomaly_params": {
@@ -43,6 +44,7 @@
"kpi_type": "table",
"aggregation": "sum",
"datetime_column": "dt",
"count_column": None,
"table_name": "cloud_cost",
"metric": "y",
"anomaly_params": {
2 changes: 2 additions & 0 deletions tests/test_kpi_validation.py
@@ -36,6 +36,7 @@ def new_kpi_df(): # noqa: D103
metric="metric_col",
aggregation="sum",
datetime_column="date_col",
+        count_column=None,
filters=[],
dimensions=["dim1", "dim2"],
)
@@ -140,6 +141,7 @@ def check_kpi_validation( # noqa: D103
kpi_column_name=kpi_info["metric"],
agg_type=kpi_info["aggregation"],
date_column_name=kpi_info["datetime_column"],
+        count_column_name=kpi_info["count_column"],
**extra_kpi_validation_data,
)

Expand Down