summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authortanghao <[email protected]>2024-08-22 10:38:29 +0800
committertanghao <[email protected]>2024-08-22 10:38:29 +0800
commit2b4d572e4db7be52b4b68d2cae780a41daaaaf95 (patch)
tree93b4d8f0b692420a3851a5d3a8d8c6e39766bbfd
parentf9e7583fd3044601be38fd6df3b502078eb323f1 (diff)
fix: 修改clickhouse请求方法 detection过期最大持续时间为阈值时间
-rw-r--r--cn-admin/src/main/java/net/geedge/common/config/DetectionRunner.java59
-rw-r--r--cn-admin/src/main/java/net/geedge/modules/detection/job/DetectionEventExpiredJob.java2
2 files changed, 27 insertions, 34 deletions
diff --git a/cn-admin/src/main/java/net/geedge/common/config/DetectionRunner.java b/cn-admin/src/main/java/net/geedge/common/config/DetectionRunner.java
index ad72054..b34f720 100644
--- a/cn-admin/src/main/java/net/geedge/common/config/DetectionRunner.java
+++ b/cn-admin/src/main/java/net/geedge/common/config/DetectionRunner.java
@@ -4,7 +4,6 @@ import java.time.Duration;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.CompletableFuture;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -169,7 +168,7 @@ public class DetectionRunner implements CommandLineRunner{
clickHouseClient.write(clickHouseNode)
.format(ClickHouseFormat.RowBinaryWithNamesAndTypes)
.query(batchInsertSql)
- .execute();
+ .executeAndWait();
log.info("[DetectionRunner] [Ck Save Data Finished]");
}else {
log.info("[DetectionRunner] [ Data None]");
@@ -186,7 +185,7 @@ public class DetectionRunner implements CommandLineRunner{
}
- private void queryEventDataToCache(Map<String, Event> eventCache) {
+ private void queryEventDataToCache(Map<String, Event> eventCache) throws Exception {
log.info("[DetectionRunner] [Get Ck Data To Cache ] [Start]");
// 将ck中状态还在执行的数据查询存入本地缓存
String cacheSql = "SELECT "
@@ -211,36 +210,30 @@ public class DetectionRunner implements CommandLineRunner{
+ " HAVING"
+ " status1 = 0"
+ " ORDER BY end_time DESC";
- CompletableFuture<ClickHouseResponse> execute = clickHouseClient.write(clickHouseNode)
- .format(ClickHouseFormat.RowBinaryWithNamesAndTypes)
- .query(cacheSql)
- .execute();
- try {
- ClickHouseResponse response = execute.get();
- Iterable<ClickHouseRecord> records = response.records();
- for(ClickHouseRecord record : records) {
- Event event = new Event();
- event.setKeyFields(record.getValue(0).asString());
- event.setKeyValues(record.getValue(1).asString());
- event.setRuleId(record.getValue(2).asLong());
- event.setRuleVersion(record.getValue(3).asString());
- event.setMatchIds(record.getValue(4).asString());
- event.setEndTime(record.getValue(5).asLong());
- event.setStartTime(record.getValue(6).asLong());
- event.setRuleType(record.getValue(7).asInteger());
- event.setIsBuiltin(record.getValue(8).asInteger());
- event.setEventType(record.getValue(9).asString());
- event.setEventName(record.getValue(10).asString());
- event.setDurationS(record.getValue(11).asLong());
- event.setReset(record.getValue(12).asInteger());
- event.setEventId(record.getValue(13).asLong());
- event.setStatus(0);
- eventCache.put(StrUtil.join(StrUtil.DASHED,event.getKeyFields(),event.getKeyValues(),event.getRuleId(),event.getRuleVersion()), event);
- }
- log.info("[DetectionRunner] [Get Ck Data To Cache ] [Finished]");
- } catch (Exception e) {
- e.printStackTrace();
- log.error("[DetectionRunner] [Get Ck Data To Cache ] [Error Info :{}]",e.getMessage());
+ ClickHouseResponse response = clickHouseClient.write(clickHouseNode)
+ .format(ClickHouseFormat.RowBinaryWithNamesAndTypes)
+ .query(cacheSql)
+ .executeAndWait();
+ Iterable<ClickHouseRecord> records = response.records();
+ for(ClickHouseRecord record : records) {
+ Event event = new Event();
+ event.setKeyFields(record.getValue(0).asString());
+ event.setKeyValues(record.getValue(1).asString());
+ event.setRuleId(record.getValue(2).asLong());
+ event.setRuleVersion(record.getValue(3).asString());
+ event.setMatchIds(record.getValue(4).asString());
+ event.setEndTime(record.getValue(5).asLong());
+ event.setStartTime(record.getValue(6).asLong());
+ event.setRuleType(record.getValue(7).asInteger());
+ event.setIsBuiltin(record.getValue(8).asInteger());
+ event.setEventType(record.getValue(9).asString());
+ event.setEventName(record.getValue(10).asString());
+ event.setDurationS(record.getValue(11).asLong());
+ event.setReset(record.getValue(12).asInteger());
+ event.setEventId(record.getValue(13).asLong());
+ event.setStatus(0);
+ eventCache.put(StrUtil.join(StrUtil.DASHED,event.getKeyFields(),event.getKeyValues(),event.getRuleId(),event.getRuleVersion()), event);
}
+ log.info("[DetectionRunner] [Get Ck Data To Cache ] [Finished]");
}
}
diff --git a/cn-admin/src/main/java/net/geedge/modules/detection/job/DetectionEventExpiredJob.java b/cn-admin/src/main/java/net/geedge/modules/detection/job/DetectionEventExpiredJob.java
index ab2de15..14541da 100644
--- a/cn-admin/src/main/java/net/geedge/modules/detection/job/DetectionEventExpiredJob.java
+++ b/cn-admin/src/main/java/net/geedge/modules/detection/job/DetectionEventExpiredJob.java
@@ -52,7 +52,7 @@ public class DetectionEventExpiredJob extends QuartzJobBean {
if(event.getRuleType().equals(Constant.RuleType.INDICATOR_MATHCH.getIndex())) {
event.setEndTime(new Date().getTime()/1000);
}
- event.setDurationS(event.getEndTime()-event.getStartTime());
+ event.setDurationS(event.getReset().longValue() * 60);
events.add(event);
cache.remove(key);
log.info("[DetectionEventExpiredJob] [Remove Event Data Cache] [id : {}]",event.getEventId());