diff options
| author | tanghao <[email protected]> | 2024-08-23 09:58:52 +0800 |
|---|---|---|
| committer | tanghao <[email protected]> | 2024-08-23 09:58:52 +0800 |
| commit | 82e117ef759c37e26ef3dab90533139dfc658c7f (patch) | |
| tree | a87738c244196788025dc9e2285ca75a0c4609f0 | |
| parent | f6176542c577e32f24cb31bc6532c83c656d79e3 (diff) | |
fix: 修复clickhouse频繁报超时异常问题
3 files changed, 22 insertions, 16 deletions
diff --git a/cn-admin/src/main/java/net/geedge/common/config/ClickhouseConfig.java b/cn-admin/src/main/java/net/geedge/common/config/ClickhouseConfig.java index fed2073..f4cbe72 100644 --- a/cn-admin/src/main/java/net/geedge/common/config/ClickhouseConfig.java +++ b/cn-admin/src/main/java/net/geedge/common/config/ClickhouseConfig.java @@ -32,10 +32,4 @@ public class ClickhouseConfig { ClickHouseNode.of(url)).credentials(credentials).build(); return server; } - - @Bean - public ClickHouseClient clickHouseClient() { - ClickHouseClient client = ClickHouseClient.newInstance(clickHouseNode().getProtocol()); - return client; - } } diff --git a/cn-admin/src/main/java/net/geedge/common/config/DetectionRunner.java b/cn-admin/src/main/java/net/geedge/common/config/DetectionRunner.java index 17a9874..68e3391 100644 --- a/cn-admin/src/main/java/net/geedge/common/config/DetectionRunner.java +++ b/cn-admin/src/main/java/net/geedge/common/config/DetectionRunner.java @@ -39,8 +39,6 @@ import net.geedge.modules.detection.entity.MatchDto; public class DetectionRunner implements CommandLineRunner{ private Log log = Log.get(); - @Autowired - private ClickHouseClient clickHouseClient; @Autowired private ClickHouseNode clickHouseNode; @@ -63,6 +61,7 @@ public class DetectionRunner implements CommandLineRunner{ SnowflakeId snowflakeId = new SnowflakeId(datacenterId,workNumId); try { while(true) { + ClickHouseResponse resp = null; try { Map<String,Event> eventMap = new HashMap<String,Event>(); ConsumerRecords<Object, Object> datas = kafkaConsumer.poll(Duration.ofSeconds(60)); @@ -167,10 +166,11 @@ public class DetectionRunner implements CommandLineRunner{ String batchInsertSql = sb.substring(0, sb.length()-1); //批量存储到clickhouse - clickHouseClient.write(clickHouseNode) - .format(ClickHouseFormat.RowBinaryWithNamesAndTypes) - .query(batchInsertSql) - .executeAndWait(); + ClickHouseClient clickHouseClient = ClickHouseClient.newInstance(clickHouseNode.getProtocol()); + resp = clickHouseClient.write(clickHouseNode) + .format(ClickHouseFormat.RowBinaryWithNamesAndTypes) + .query(batchInsertSql) + .executeAndWait(); log.info("[DetectionRunner] [Ck Save Data Finished]"); }else { log.info("[DetectionRunner] [ Data None]"); @@ -179,6 +179,10 @@ public class DetectionRunner implements CommandLineRunner{ }catch(Exception e) { e.printStackTrace(); log.error("[DetectionRunner] [Error Info :{}]",e.getMessage()); + }finally{ + if(ObjectUtil.isNotEmpty(resp)){ + resp.close(); + } } } }finally { @@ -212,6 +216,7 @@ public class DetectionRunner implements CommandLineRunner{ + " HAVING" + " status1 = 0" + " ORDER BY end_time DESC"; + ClickHouseClient clickHouseClient = ClickHouseClient.newInstance(clickHouseNode.getProtocol()); ClickHouseResponse response = clickHouseClient.write(clickHouseNode) .format(ClickHouseFormat.RowBinaryWithNamesAndTypes) .query(cacheSql) @@ -236,6 +241,7 @@ public class DetectionRunner implements CommandLineRunner{ event.setStatus(0); eventCache.put(StrUtil.join(StrUtil.DASHED,event.getKeyFields(),event.getKeyValues(),event.getRuleId(),event.getRuleVersion()), event); } + response.close(); log.info("[DetectionRunner] [Get Ck Data To Cache ] [Finished]"); } } diff --git a/cn-admin/src/main/java/net/geedge/modules/detection/job/DetectionEventExpiredJob.java b/cn-admin/src/main/java/net/geedge/modules/detection/job/DetectionEventExpiredJob.java index b0f9302..5deeb55 100644 --- a/cn-admin/src/main/java/net/geedge/modules/detection/job/DetectionEventExpiredJob.java +++ b/cn-admin/src/main/java/net/geedge/modules/detection/job/DetectionEventExpiredJob.java @@ -5,6 +5,8 @@ import java.util.Date; import java.util.List; import java.util.Map; +import com.alibaba.fastjson.util.IOUtils; +import com.clickhouse.client.ClickHouseResponse; import org.quartz.DisallowConcurrentExecution; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; @@ -30,15 +32,14 @@ import net.geedge.modules.detection.entity.Event; @DisallowConcurrentExecution public class DetectionEventExpiredJob extends QuartzJobBean { private Log log = Log.get(); - - @Autowired - private ClickHouseClient clickHouseClient; @Autowired private ClickHouseNode clickHouseNode; @Override protected void executeInternal(JobExecutionContext arg0){ + + ClickHouseResponse resp = null; try { // 获取缓存中的event数据 Map<String, Event> cache = Context.DETECTTION_EVENT_CACHE; @@ -74,7 +75,8 @@ public class DetectionEventExpiredJob extends QuartzJobBean { String batchInsertSql = sb.substring(0, sb.length() - 1); log.info("[DetectionEventExpiredJob] [Save Event Data] [SQL: {}]", batchInsertSql); //批量存储到clickhouse - clickHouseClient.write(clickHouseNode) + ClickHouseClient clickHouseClient = ClickHouseClient.newInstance(clickHouseNode.getProtocol()); + resp = clickHouseClient.write(clickHouseNode) .format(ClickHouseFormat.RowBinaryWithNamesAndTypes) .query(batchInsertSql) .executeAndWait(); @@ -91,6 +93,10 @@ public class DetectionEventExpiredJob extends QuartzJobBean { }catch(Exception e){ e.printStackTrace(); log.error("[DetectionEventExpiredJob] [Error info] : {}",e.getMessage()); + }finally{ + if(ObjectUtils.isNotEmpty(resp)){ + resp.close(); + } } } } |
