summaryrefslogtreecommitdiff
path: root/evaluation/f1pa.py
diff options
context:
space:
mode:
authorZHENG Yanqin <[email protected]>2023-05-25 07:37:53 +0000
committerZHENG Yanqin <[email protected]>2023-05-25 07:37:53 +0000
commite9896bd62bb29da00ec00a121374167ad91bfe47 (patch)
treed94845574c8ef7473d0204d28b4efd4038035463 /evaluation/f1pa.py
parentfad9aa875c84b38cbb5a6010e104922b1eea7291 (diff)
parent4c5734c624705449c6b21c4b2bc5554e7259fdba (diff)
Merge branch 'master' into 'main'HEADmain
readme See merge request zyq/time_series_anomaly_detection!1
Diffstat (limited to 'evaluation/f1pa.py')
-rw-r--r--evaluation/f1pa.py66
1 files changed, 66 insertions, 0 deletions
diff --git a/evaluation/f1pa.py b/evaluation/f1pa.py
new file mode 100644
index 0000000..be11f98
--- /dev/null
+++ b/evaluation/f1pa.py
@@ -0,0 +1,66 @@
+import numpy as np
+from sklearn.metrics import f1_score, precision_score, recall_score
+
+
+def adjust_predicts(score, label,
+ threshold=None,
+ pred=None,
+ calc_latency=False):
+ """
+ 该函数是point-adjust官方源码
+ Calculate adjusted predict labels using given `score`, `threshold` (or given `pred`) and `label`.
+ Args:
+ score =: The anomaly score
+ label : The ground-truth label
+ threshold (float): The threshold of anomaly score.
+ A point is labeled as "anomaly" if its score is lower than the threshold.
+ pred : if not None, adjust `pred` and ignore `score` and `threshold`,
+ calc_latency (bool):
+ Returns:
+ np.ndarray: predict labels
+ """
+ if len(score) != len(label):
+ raise ValueError("score and label must have the same length")
+ score = np.asarray(score)
+ label = np.asarray(label)
+ latency = 0
+ if pred is None:
+ predict = score < threshold
+ else:
+ predict = pred
+ actual = label > 0.1
+ anomaly_state = False
+ anomaly_count = 0
+ for i in range(len(score)):
+ if actual[i] and predict[i] and not anomaly_state:
+ anomaly_state = True
+ anomaly_count += 1
+ for j in range(i, 0, -1):
+ if not actual[j]:
+ break
+ else:
+ if not predict[j]:
+ predict[j] = True
+ latency += 1
+ elif not actual[i]:
+ anomaly_state = False
+ if anomaly_state:
+ predict[i] = True
+ if calc_latency:
+ return predict, latency / (anomaly_count + 1e-4)
+ else:
+ return predict
+
+
+def evaluate(y_true: list, y_pred: list) -> float:
+ """
+ F1PA评估方法,经过point adjust调整标签后再用F1评分
+ :param y_true: 真实标签
+ :param y_pred: 检测标签
+ :return: 经过pa调整后的f1、recall、precision
+ """
+ y_true, y_pred = y_true.copy(), y_pred.copy()
+ adjust_y_pred = adjust_predicts(score=np.array([0] * len(y_true)), label=y_true, pred=y_pred)
+ f1 = f1_score(y_true, adjust_y_pred)
+ return f1
+