forked from otzslayer/lgcns-mlops-template
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdrift_detection.py
173 lines (138 loc) · 4.69 KB
/
drift_detection.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
import json
import os
import sys
import warnings
from datetime import datetime
import joblib
import numpy as np
import pandas as pd
from deepchecks import SuiteResult
from deepchecks.core.suite import SuiteResult
from deepchecks.tabular import Dataset
from deepchecks.tabular.suites import model_evaluation, train_test_validation
from src.common.constants import (
ARTIFACT_PATH,
DATA_PATH,
DRIFT_DETECTION_PATH,
LOG_FILEPATH,
)
from src.common.logger import handle_exception, set_logger
from src.preprocess import CAT_FEATURES, preprocess_pipeline
# Module-level setup: shared file logger, global exception hook, and
# silenced warnings (deepchecks/pandas are noisy during suite runs).
logger = set_logger(os.path.join(LOG_FILEPATH, "logs.log"))
sys.excepthook = handle_exception
warnings.filterwarnings(action="ignore")
# Date stamp (YYYYMMDD) used to name the generated drift report files.
DATE = datetime.now().strftime("%Y%m%d")
# Target column of the house-rent dataset.
LABEL_NAME = "rent"
# Previously trained model artifact; indexed with "gradient_booster"
# below, so it is expected to be a mapping/pipeline containing that step.
model = joblib.load(os.path.join(ARTIFACT_PATH, "model.pkl"))
def load_data(filename: str) -> pd.DataFrame:
    """Read a CSV file from DATA_PATH, skipping columns that are not
    used for drift detection ("area_locality", "posted_on", "id")."""
    skipped_columns = {"area_locality", "posted_on", "id"}
    filepath = os.path.join(DATA_PATH, filename)
    return pd.read_csv(filepath, usecols=lambda col: col not in skipped_columns)
def log_failed_check_info(suite_result: SuiteResult):
    """Log the header and first condition detail of every check in the
    suite result that did not pass."""
    failed_checks = suite_result.get_not_passed_checks()
    for check in failed_checks:
        details = check.conditions_results[0].details
        logger.info(
            "The following test failed!\n"
            f"{check.header}: {details}"
        )
def get_drift_test(suite_result: SuiteResult, test_name: str) -> dict:
    """Pull the first condition result of the check named `test_name`
    from the suite's JSON report, adding the check's drift score under
    the "value" key of the returned dict."""
    report = json.loads(suite_result.to_json())
    matches = [
        entry
        for entry in report.get("results")
        if entry.get("check").get("name") == test_name
    ]
    condition = matches[0].get("conditions_results")[0]
    drift_score = matches[0].get("value").get("Drift score").get("value")
    condition["value"] = drift_score
    return condition
def data_drift_detection(
    train_df: pd.DataFrame,
    new_df: pd.DataFrame,
    label: str,
    cat_features: list,
    save_as_html: bool = False,
) -> None:
    """Run deepchecks' train/test validation suite to detect data drift
    between the training data and newly collected data.

    Args:
        train_df: Reference (training) data.
        new_df: Newly collected data to compare against the reference.
        label: Name of the label column.
        cat_features: Names of the categorical feature columns
            (fixed annotation: callers pass the CAT_FEATURES list,
            not a single string).
        save_as_html: If True, save the full suite report as an HTML
            file under DRIFT_DETECTION_PATH, prefixed with today's date.
    """
    train_set = Dataset(train_df, label=label, cat_features=cat_features)
    new_set = Dataset(new_df, label=label, cat_features=cat_features)
    validation_suite = train_test_validation()
    suite_result = validation_suite.run(train_set, new_set)
    # Surface failed checks in the log even when no report is saved.
    log_failed_check_info(suite_result=suite_result)
    if save_as_html:
        suite_result.save_as_html(
            os.path.join(DRIFT_DETECTION_PATH, f"{DATE}_data_drift.html")
        )
def model_drift_detection(
    train_df: pd.DataFrame,
    new_df: pd.DataFrame,
    label: str,
    cat_features: list,
    save_as_json: bool = True,
    save_as_html: bool = False,
) -> None:
    """Run deepchecks' model evaluation suite to detect model drift
    between the training data and newly collected data.

    Args:
        train_df: Reference (training) data.
        new_df: Newly collected data to evaluate the model on.
        label: Name of the label column; its values are log1p-transformed
            to match the target transform used at training time.
        cat_features: Names of the categorical feature columns
            (fixed annotation: callers pass the CAT_FEATURES list).
        save_as_json: If True, write only the "Prediction Drift" check
            result to ./prediction_drift.json.
        save_as_html: If True, save the full suite report as an HTML
            file under DRIFT_DETECTION_PATH, prefixed with today's date.
    """

    def get_xy(df: pd.DataFrame, fit: bool):
        # One-line purpose: split df into (features, log1p-target),
        # fitting the preprocessing pipeline only on the training data.
        y = np.log1p(df[label])
        features = df.drop([label], axis=1)
        if fit:
            x = preprocess_pipeline.fit_transform(X=features, y=y)
        else:
            # Bug fix: reuse the pipeline fitted on the training data.
            # The original re-ran fit_transform on the new data, so the
            # two datasets were encoded with different fitted pipelines,
            # making the drift comparison inconsistent.
            x = preprocess_pipeline.transform(features)
        return x, y

    x_train, y_train = get_xy(train_df, fit=True)
    x_new, y_new = get_xy(new_df, fit=False)
    train_set = Dataset(
        x_train,
        label=y_train,
        cat_features=cat_features,
    )
    new_set = Dataset(
        x_new,
        label=y_new,
        cat_features=cat_features,
    )
    evaluation_suite = model_evaluation()
    suite_result = evaluation_suite.run(
        train_set, new_set, model["gradient_booster"]
    )
    log_failed_check_info(suite_result=suite_result)
    # Persist only the "Prediction Drift" check result.
    if save_as_json:
        prediction_drift = get_drift_test(
            suite_result=suite_result, test_name="Prediction Drift"
        )
        json_obj = json.dumps(prediction_drift, indent=4)
        # NOTE(review): written to the CWD, unlike the HTML report which
        # goes to DRIFT_DETECTION_PATH — confirm downstream consumers
        # before unifying the paths.
        with open("./prediction_drift.json", "w") as file:
            file.write(json_obj)
    if save_as_html:
        suite_result.save_as_html(
            os.path.join(DRIFT_DETECTION_PATH, f"{DATE}_model_drift.html")
        )
def main():
    """Load the reference and new datasets, then run both the data-drift
    and model-drift detection suites, saving their reports."""
    train_df = load_data(filename="house_rent_train.csv")
    new_df = load_data(filename="house_rent_new.csv")
    for frame in (train_df, new_df):
        logger.debug(f"{frame.info()}")

    logger.info("Detect data drift")
    data_drift_detection(
        train_df=train_df,
        new_df=new_df,
        label=LABEL_NAME,
        cat_features=CAT_FEATURES,
        save_as_html=True,
    )

    logger.info("Detect model drift")
    model_drift_detection(
        train_df=train_df,
        new_df=new_df,
        label=LABEL_NAME,
        cat_features=CAT_FEATURES,
        save_as_json=True,
        save_as_html=True,
    )

    logger.info(
        "Detection results can be found in the following path:\n"
        f"{DRIFT_DETECTION_PATH}"
    )
# Entry point when the module is executed as a standalone script.
if __name__ == "__main__":
    main()