diff options
| author | tanghao <[email protected]> | 2024-08-22 10:38:29 +0800 |
|---|---|---|
| committer | tanghao <[email protected]> | 2024-08-22 10:38:29 +0800 |
| commit | 2b4d572e4db7be52b4b68d2cae780a41daaaaf95 (patch) | |
| tree | 93b4d8f0b692420a3851a5d3a8d8c6e39766bbfd | |
| parent | f9e7583fd3044601be38fd6df3b502078eb323f1 (diff) | |
fix: 修改clickhouse请求方法 detection过期最大持续时间为阈值时间
| -rw-r--r-- | cn-admin/src/main/java/net/geedge/common/config/DetectionRunner.java | 59 | ||||
| -rw-r--r-- | cn-admin/src/main/java/net/geedge/modules/detection/job/DetectionEventExpiredJob.java | 2 |
2 files changed, 27 insertions, 34 deletions
diff --git a/cn-admin/src/main/java/net/geedge/common/config/DetectionRunner.java b/cn-admin/src/main/java/net/geedge/common/config/DetectionRunner.java index ad72054..b34f720 100644 --- a/cn-admin/src/main/java/net/geedge/common/config/DetectionRunner.java +++ b/cn-admin/src/main/java/net/geedge/common/config/DetectionRunner.java @@ -4,7 +4,6 @@ import java.time.Duration; import java.util.Date; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.CompletableFuture; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -169,7 +168,7 @@ public class DetectionRunner implements CommandLineRunner{ clickHouseClient.write(clickHouseNode) .format(ClickHouseFormat.RowBinaryWithNamesAndTypes) .query(batchInsertSql) - .execute(); + .executeAndWait(); log.info("[DetectionRunner] [Ck Save Data Finished]"); }else { log.info("[DetectionRunner] [ Data None]"); @@ -186,7 +185,7 @@ public class DetectionRunner implements CommandLineRunner{ } - private void queryEventDataToCache(Map<String, Event> eventCache) { + private void queryEventDataToCache(Map<String, Event> eventCache) throws Exception { log.info("[DetectionRunner] [Get Ck Data To Cache ] [Start]"); // 将ck中状态还在执行的数据查询存入本地缓存 String cacheSql = "SELECT " @@ -211,36 +210,30 @@ public class DetectionRunner implements CommandLineRunner{ + " HAVING" + " status1 = 0" + " ORDER BY end_time DESC"; - CompletableFuture<ClickHouseResponse> execute = clickHouseClient.write(clickHouseNode) - .format(ClickHouseFormat.RowBinaryWithNamesAndTypes) - .query(cacheSql) - .execute(); - try { - ClickHouseResponse response = execute.get(); - Iterable<ClickHouseRecord> records = response.records(); - for(ClickHouseRecord record : records) { - Event event = new Event(); - event.setKeyFields(record.getValue(0).asString()); - event.setKeyValues(record.getValue(1).asString()); - event.setRuleId(record.getValue(2).asLong()); - event.setRuleVersion(record.getValue(3).asString()); - event.setMatchIds(record.getValue(4).asString()); - event.setEndTime(record.getValue(5).asLong()); - event.setStartTime(record.getValue(6).asLong()); - event.setRuleType(record.getValue(7).asInteger()); - event.setIsBuiltin(record.getValue(8).asInteger()); - event.setEventType(record.getValue(9).asString()); - event.setEventName(record.getValue(10).asString()); - event.setDurationS(record.getValue(11).asLong()); - event.setReset(record.getValue(12).asInteger()); - event.setEventId(record.getValue(13).asLong()); - event.setStatus(0); - eventCache.put(StrUtil.join(StrUtil.DASHED,event.getKeyFields(),event.getKeyValues(),event.getRuleId(),event.getRuleVersion()), event); - } - log.info("[DetectionRunner] [Get Ck Data To Cache ] [Finished]"); - } catch (Exception e) { - e.printStackTrace(); - log.error("[DetectionRunner] [Get Ck Data To Cache ] [Error Info :{}]",e.getMessage()); + ClickHouseResponse response = clickHouseClient.write(clickHouseNode) + .format(ClickHouseFormat.RowBinaryWithNamesAndTypes) + .query(cacheSql) + .executeAndWait(); + Iterable<ClickHouseRecord> records = response.records(); + for(ClickHouseRecord record : records) { + Event event = new Event(); + event.setKeyFields(record.getValue(0).asString()); + event.setKeyValues(record.getValue(1).asString()); + event.setRuleId(record.getValue(2).asLong()); + event.setRuleVersion(record.getValue(3).asString()); + event.setMatchIds(record.getValue(4).asString()); + event.setEndTime(record.getValue(5).asLong()); + event.setStartTime(record.getValue(6).asLong()); + event.setRuleType(record.getValue(7).asInteger()); + event.setIsBuiltin(record.getValue(8).asInteger()); + event.setEventType(record.getValue(9).asString()); + event.setEventName(record.getValue(10).asString()); + event.setDurationS(record.getValue(11).asLong()); + event.setReset(record.getValue(12).asInteger()); + event.setEventId(record.getValue(13).asLong()); + event.setStatus(0); + eventCache.put(StrUtil.join(StrUtil.DASHED,event.getKeyFields(),event.getKeyValues(),event.getRuleId(),event.getRuleVersion()), event); } + log.info("[DetectionRunner] [Get Ck Data To Cache ] [Finished]"); } } diff --git a/cn-admin/src/main/java/net/geedge/modules/detection/job/DetectionEventExpiredJob.java b/cn-admin/src/main/java/net/geedge/modules/detection/job/DetectionEventExpiredJob.java index ab2de15..14541da 100644 --- a/cn-admin/src/main/java/net/geedge/modules/detection/job/DetectionEventExpiredJob.java +++ b/cn-admin/src/main/java/net/geedge/modules/detection/job/DetectionEventExpiredJob.java @@ -52,7 +52,7 @@ public class DetectionEventExpiredJob extends QuartzJobBean { if(event.getRuleType().equals(Constant.RuleType.INDICATOR_MATHCH.getIndex())) { event.setEndTime(new Date().getTime()/1000); } - event.setDurationS(event.getEndTime()-event.getStartTime()); + event.setDurationS(event.getReset().longValue() * 60); events.add(event); cache.remove(key); log.info("[DetectionEventExpiredJob] [Remove Event Data Cache] [id : {}]",event.getEventId()); |
