From 0d4a37daefceba1529b4da5cecdac53338b74ac1 Mon Sep 17 00:00:00 2001 From: tanghao Date: Tue, 27 Aug 2024 09:48:24 +0800 Subject: fix: 优化Detection代码逻辑 关闭已完成client 释放资源 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/main/java/net/geedge/common/config/DetectionRunner.java | 8 +++++++- .../geedge/modules/detection/job/DetectionEventExpiredJob.java | 6 +++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/cn-admin/src/main/java/net/geedge/common/config/DetectionRunner.java b/cn-admin/src/main/java/net/geedge/common/config/DetectionRunner.java index 68e3391..6f24a4e 100644 --- a/cn-admin/src/main/java/net/geedge/common/config/DetectionRunner.java +++ b/cn-admin/src/main/java/net/geedge/common/config/DetectionRunner.java @@ -62,6 +62,7 @@ public class DetectionRunner implements CommandLineRunner{ try { while(true) { ClickHouseResponse resp = null; + ClickHouseClient clickHouseClient = null; try { Map eventMap = new HashMap(); ConsumerRecords datas = kafkaConsumer.poll(Duration.ofSeconds(60)); @@ -166,11 +167,12 @@ public class DetectionRunner implements CommandLineRunner{ String batchInsertSql = sb.substring(0, sb.length()-1); //批量存储到clickhouse - ClickHouseClient clickHouseClient = ClickHouseClient.newInstance(clickHouseNode.getProtocol()); + clickHouseClient = ClickHouseClient.newInstance(clickHouseNode.getProtocol()); resp = clickHouseClient.write(clickHouseNode) .format(ClickHouseFormat.RowBinaryWithNamesAndTypes) .query(batchInsertSql) .executeAndWait(); + log.info("[DetectionRunner] [Ck Save Data Finished]"); }else { log.info("[DetectionRunner] [ Data None]"); @@ -183,6 +185,9 @@ public class DetectionRunner implements CommandLineRunner{ if(ObjectUtil.isNotEmpty(resp)){ resp.close(); } + if(ObjectUtil.isNotEmpty(clickHouseClient)){ + clickHouseClient.close(); + } } } }finally { @@ -242,6 +247,7 @@ public class DetectionRunner implements CommandLineRunner{ eventCache.put(StrUtil.join(StrUtil.DASHED,event.getKeyFields(),event.getKeyValues(),event.getRuleId(),event.getRuleVersion()), event); } response.close(); + clickHouseClient.close(); log.info("[DetectionRunner] [Get Ck Data To Cache ] [Finished]"); } } diff --git a/cn-admin/src/main/java/net/geedge/modules/detection/job/DetectionEventExpiredJob.java b/cn-admin/src/main/java/net/geedge/modules/detection/job/DetectionEventExpiredJob.java index 5deeb55..e1694af 100644 --- a/cn-admin/src/main/java/net/geedge/modules/detection/job/DetectionEventExpiredJob.java +++ b/cn-admin/src/main/java/net/geedge/modules/detection/job/DetectionEventExpiredJob.java @@ -40,6 +40,7 @@ public class DetectionEventExpiredJob extends QuartzJobBean { protected void executeInternal(JobExecutionContext arg0){ ClickHouseResponse resp = null; + ClickHouseClient clickHouseClient = null; try { // 获取缓存中的event数据 Map cache = Context.DETECTTION_EVENT_CACHE; @@ -75,7 +76,7 @@ public class DetectionEventExpiredJob extends QuartzJobBean { String batchInsertSql = sb.substring(0, sb.length() - 1); log.info("[DetectionEventExpiredJob] [Save Event Data] [SQL: {}]", batchInsertSql); //批量存储到clickhouse - ClickHouseClient clickHouseClient = ClickHouseClient.newInstance(clickHouseNode.getProtocol()); + clickHouseClient = ClickHouseClient.newInstance(clickHouseNode.getProtocol()); resp = clickHouseClient.write(clickHouseNode) .format(ClickHouseFormat.RowBinaryWithNamesAndTypes) .query(batchInsertSql) @@ -97,6 +98,9 @@ public class DetectionEventExpiredJob extends QuartzJobBean { if(ObjectUtils.isNotEmpty(resp)){ resp.close(); } + if(ObjectUtils.isNotEmpty(clickHouseClient)){ + clickHouseClient.close(); + } } } } -- cgit v1.2.3