diff options
| author | tanghao <[email protected]> | 2024-08-27 09:48:24 +0800 |
|---|---|---|
| committer | tanghao <[email protected]> | 2024-08-27 09:48:24 +0800 |
| commit | 0d4a37daefceba1529b4da5cecdac53338b74ac1 (patch) | |
| tree | 2372937abab200f3fec95606d570a500c5e47479 | |
| parent | 6b95eb0036521f4ba18450f5577db0e5f5db1264 (diff) | |
fix: 优化Detection代码逻辑 关闭已完成client 释放资源
| -rw-r--r-- | cn-admin/src/main/java/net/geedge/common/config/DetectionRunner.java | 8 | ||||
| -rw-r--r-- | cn-admin/src/main/java/net/geedge/modules/detection/job/DetectionEventExpiredJob.java | 6 |
2 files changed, 12 insertions, 2 deletions
diff --git a/cn-admin/src/main/java/net/geedge/common/config/DetectionRunner.java b/cn-admin/src/main/java/net/geedge/common/config/DetectionRunner.java index 68e3391..6f24a4e 100644 --- a/cn-admin/src/main/java/net/geedge/common/config/DetectionRunner.java +++ b/cn-admin/src/main/java/net/geedge/common/config/DetectionRunner.java @@ -62,6 +62,7 @@ public class DetectionRunner implements CommandLineRunner{ try { while(true) { ClickHouseResponse resp = null; + ClickHouseClient clickHouseClient = null; try { Map<String,Event> eventMap = new HashMap<String,Event>(); ConsumerRecords<Object, Object> datas = kafkaConsumer.poll(Duration.ofSeconds(60)); @@ -166,11 +167,12 @@ public class DetectionRunner implements CommandLineRunner{ String batchInsertSql = sb.substring(0, sb.length()-1); //批量存储到clickhouse - ClickHouseClient clickHouseClient = ClickHouseClient.newInstance(clickHouseNode.getProtocol()); + clickHouseClient = ClickHouseClient.newInstance(clickHouseNode.getProtocol()); resp = clickHouseClient.write(clickHouseNode) .format(ClickHouseFormat.RowBinaryWithNamesAndTypes) .query(batchInsertSql) .executeAndWait(); + log.info("[DetectionRunner] [Ck Save Data Finished]"); }else { log.info("[DetectionRunner] [ Data None]"); @@ -183,6 +185,9 @@ public class DetectionRunner implements CommandLineRunner{ if(ObjectUtil.isNotEmpty(resp)){ resp.close(); } + if(ObjectUtil.isNotEmpty(clickHouseClient)){ + clickHouseClient.close(); + } } } }finally { @@ -242,6 +247,7 @@ public class DetectionRunner implements CommandLineRunner{ eventCache.put(StrUtil.join(StrUtil.DASHED,event.getKeyFields(),event.getKeyValues(),event.getRuleId(),event.getRuleVersion()), event); } response.close(); + clickHouseClient.close(); log.info("[DetectionRunner] [Get Ck Data To Cache ] [Finished]"); } } diff --git a/cn-admin/src/main/java/net/geedge/modules/detection/job/DetectionEventExpiredJob.java b/cn-admin/src/main/java/net/geedge/modules/detection/job/DetectionEventExpiredJob.java index 5deeb55..e1694af 100644 --- a/cn-admin/src/main/java/net/geedge/modules/detection/job/DetectionEventExpiredJob.java +++ b/cn-admin/src/main/java/net/geedge/modules/detection/job/DetectionEventExpiredJob.java @@ -40,6 +40,7 @@ public class DetectionEventExpiredJob extends QuartzJobBean { protected void executeInternal(JobExecutionContext arg0){ ClickHouseResponse resp = null; + ClickHouseClient clickHouseClient = null; try { // 获取缓存中的event数据 Map<String, Event> cache = Context.DETECTTION_EVENT_CACHE; @@ -75,7 +76,7 @@ public class DetectionEventExpiredJob extends QuartzJobBean { String batchInsertSql = sb.substring(0, sb.length() - 1); log.info("[DetectionEventExpiredJob] [Save Event Data] [SQL: {}]", batchInsertSql); //批量存储到clickhouse - ClickHouseClient clickHouseClient = ClickHouseClient.newInstance(clickHouseNode.getProtocol()); + clickHouseClient = ClickHouseClient.newInstance(clickHouseNode.getProtocol()); resp = clickHouseClient.write(clickHouseNode) .format(ClickHouseFormat.RowBinaryWithNamesAndTypes) .query(batchInsertSql) @@ -97,6 +98,9 @@ public class DetectionEventExpiredJob extends QuartzJobBean { if(ObjectUtils.isNotEmpty(resp)){ resp.close(); } + if(ObjectUtils.isNotEmpty(clickHouseClient)){ + clickHouseClient.close(); + } } } } |
