summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authortanghao <[email protected]>2024-08-27 09:48:24 +0800
committertanghao <[email protected]>2024-08-27 09:48:24 +0800
commit0d4a37daefceba1529b4da5cecdac53338b74ac1 (patch)
tree2372937abab200f3fec95606d570a500c5e47479
parent6b95eb0036521f4ba18450f5577db0e5f5db1264 (diff)
fix: 优化Detection代码逻辑 关闭已完成client 释放资源
-rw-r--r--cn-admin/src/main/java/net/geedge/common/config/DetectionRunner.java8
-rw-r--r--cn-admin/src/main/java/net/geedge/modules/detection/job/DetectionEventExpiredJob.java6
2 files changed, 12 insertions, 2 deletions
diff --git a/cn-admin/src/main/java/net/geedge/common/config/DetectionRunner.java b/cn-admin/src/main/java/net/geedge/common/config/DetectionRunner.java
index 68e3391..6f24a4e 100644
--- a/cn-admin/src/main/java/net/geedge/common/config/DetectionRunner.java
+++ b/cn-admin/src/main/java/net/geedge/common/config/DetectionRunner.java
@@ -62,6 +62,7 @@ public class DetectionRunner implements CommandLineRunner{
try {
while(true) {
ClickHouseResponse resp = null;
+ ClickHouseClient clickHouseClient = null;
try {
Map<String,Event> eventMap = new HashMap<String,Event>();
ConsumerRecords<Object, Object> datas = kafkaConsumer.poll(Duration.ofSeconds(60));
@@ -166,11 +167,12 @@ public class DetectionRunner implements CommandLineRunner{
String batchInsertSql = sb.substring(0, sb.length()-1);
//批量存储到clickhouse
- ClickHouseClient clickHouseClient = ClickHouseClient.newInstance(clickHouseNode.getProtocol());
+ clickHouseClient = ClickHouseClient.newInstance(clickHouseNode.getProtocol());
resp = clickHouseClient.write(clickHouseNode)
.format(ClickHouseFormat.RowBinaryWithNamesAndTypes)
.query(batchInsertSql)
.executeAndWait();
+
log.info("[DetectionRunner] [Ck Save Data Finished]");
}else {
log.info("[DetectionRunner] [ Data None]");
@@ -183,6 +185,9 @@ public class DetectionRunner implements CommandLineRunner{
if(ObjectUtil.isNotEmpty(resp)){
resp.close();
}
+ if(ObjectUtil.isNotEmpty(clickHouseClient)){
+ clickHouseClient.close();
+ }
}
}
}finally {
@@ -242,6 +247,7 @@ public class DetectionRunner implements CommandLineRunner{
eventCache.put(StrUtil.join(StrUtil.DASHED,event.getKeyFields(),event.getKeyValues(),event.getRuleId(),event.getRuleVersion()), event);
}
response.close();
+ clickHouseClient.close();
log.info("[DetectionRunner] [Get Ck Data To Cache ] [Finished]");
}
}
diff --git a/cn-admin/src/main/java/net/geedge/modules/detection/job/DetectionEventExpiredJob.java b/cn-admin/src/main/java/net/geedge/modules/detection/job/DetectionEventExpiredJob.java
index 5deeb55..e1694af 100644
--- a/cn-admin/src/main/java/net/geedge/modules/detection/job/DetectionEventExpiredJob.java
+++ b/cn-admin/src/main/java/net/geedge/modules/detection/job/DetectionEventExpiredJob.java
@@ -40,6 +40,7 @@ public class DetectionEventExpiredJob extends QuartzJobBean {
protected void executeInternal(JobExecutionContext arg0){
ClickHouseResponse resp = null;
+ ClickHouseClient clickHouseClient = null;
try {
// 获取缓存中的event数据
Map<String, Event> cache = Context.DETECTTION_EVENT_CACHE;
@@ -75,7 +76,7 @@ public class DetectionEventExpiredJob extends QuartzJobBean {
String batchInsertSql = sb.substring(0, sb.length() - 1);
log.info("[DetectionEventExpiredJob] [Save Event Data] [SQL: {}]", batchInsertSql);
//批量存储到clickhouse
- ClickHouseClient clickHouseClient = ClickHouseClient.newInstance(clickHouseNode.getProtocol());
+ clickHouseClient = ClickHouseClient.newInstance(clickHouseNode.getProtocol());
resp = clickHouseClient.write(clickHouseNode)
.format(ClickHouseFormat.RowBinaryWithNamesAndTypes)
.query(batchInsertSql)
@@ -97,6 +98,9 @@ public class DetectionEventExpiredJob extends QuartzJobBean {
if(ObjectUtils.isNotEmpty(resp)){
resp.close();
}
+ if(ObjectUtils.isNotEmpty(clickHouseClient)){
+ clickHouseClient.close();
+ }
}
}
}