summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authortanghao <[email protected]>2024-08-23 09:58:52 +0800
committertanghao <[email protected]>2024-08-23 09:58:52 +0800
commit82e117ef759c37e26ef3dab90533139dfc658c7f (patch)
treea87738c244196788025dc9e2285ca75a0c4609f0
parentf6176542c577e32f24cb31bc6532c83c656d79e3 (diff)
fix: 修复clickhouse频繁报超时异常问题
-rw-r--r--cn-admin/src/main/java/net/geedge/common/config/ClickhouseConfig.java6
-rw-r--r--cn-admin/src/main/java/net/geedge/common/config/DetectionRunner.java18
-rw-r--r--cn-admin/src/main/java/net/geedge/modules/detection/job/DetectionEventExpiredJob.java14
3 files changed, 22 insertions, 16 deletions
diff --git a/cn-admin/src/main/java/net/geedge/common/config/ClickhouseConfig.java b/cn-admin/src/main/java/net/geedge/common/config/ClickhouseConfig.java
index fed2073..f4cbe72 100644
--- a/cn-admin/src/main/java/net/geedge/common/config/ClickhouseConfig.java
+++ b/cn-admin/src/main/java/net/geedge/common/config/ClickhouseConfig.java
@@ -32,10 +32,4 @@ public class ClickhouseConfig {
ClickHouseNode.of(url)).credentials(credentials).build();
return server;
}
-
- @Bean
- public ClickHouseClient clickHouseClient() {
- ClickHouseClient client = ClickHouseClient.newInstance(clickHouseNode().getProtocol());
- return client;
- }
}
diff --git a/cn-admin/src/main/java/net/geedge/common/config/DetectionRunner.java b/cn-admin/src/main/java/net/geedge/common/config/DetectionRunner.java
index 17a9874..68e3391 100644
--- a/cn-admin/src/main/java/net/geedge/common/config/DetectionRunner.java
+++ b/cn-admin/src/main/java/net/geedge/common/config/DetectionRunner.java
@@ -39,8 +39,6 @@ import net.geedge.modules.detection.entity.MatchDto;
public class DetectionRunner implements CommandLineRunner{
private Log log = Log.get();
- @Autowired
- private ClickHouseClient clickHouseClient;
@Autowired
private ClickHouseNode clickHouseNode;
@@ -63,6 +61,7 @@ public class DetectionRunner implements CommandLineRunner{
SnowflakeId snowflakeId = new SnowflakeId(datacenterId,workNumId);
try {
while(true) {
+ ClickHouseResponse resp = null;
try {
Map<String,Event> eventMap = new HashMap<String,Event>();
ConsumerRecords<Object, Object> datas = kafkaConsumer.poll(Duration.ofSeconds(60));
@@ -167,10 +166,11 @@ public class DetectionRunner implements CommandLineRunner{
String batchInsertSql = sb.substring(0, sb.length()-1);
//批量存储到clickhouse
- clickHouseClient.write(clickHouseNode)
- .format(ClickHouseFormat.RowBinaryWithNamesAndTypes)
- .query(batchInsertSql)
- .executeAndWait();
+ ClickHouseClient clickHouseClient = ClickHouseClient.newInstance(clickHouseNode.getProtocol());
+ resp = clickHouseClient.write(clickHouseNode)
+ .format(ClickHouseFormat.RowBinaryWithNamesAndTypes)
+ .query(batchInsertSql)
+ .executeAndWait();
log.info("[DetectionRunner] [Ck Save Data Finished]");
}else {
log.info("[DetectionRunner] [ Data None]");
@@ -179,6 +179,10 @@ public class DetectionRunner implements CommandLineRunner{
}catch(Exception e) {
e.printStackTrace();
log.error("[DetectionRunner] [Error Info :{}]",e.getMessage());
+ }finally{
+ if(ObjectUtil.isNotEmpty(resp)){
+ resp.close();
+ }
}
}
}finally {
@@ -212,6 +216,7 @@ public class DetectionRunner implements CommandLineRunner{
+ " HAVING"
+ " status1 = 0"
+ " ORDER BY end_time DESC";
+ ClickHouseClient clickHouseClient = ClickHouseClient.newInstance(clickHouseNode.getProtocol());
ClickHouseResponse response = clickHouseClient.write(clickHouseNode)
.format(ClickHouseFormat.RowBinaryWithNamesAndTypes)
.query(cacheSql)
@@ -236,6 +241,7 @@ public class DetectionRunner implements CommandLineRunner{
event.setStatus(0);
eventCache.put(StrUtil.join(StrUtil.DASHED,event.getKeyFields(),event.getKeyValues(),event.getRuleId(),event.getRuleVersion()), event);
}
+ response.close();
log.info("[DetectionRunner] [Get Ck Data To Cache ] [Finished]");
}
}
diff --git a/cn-admin/src/main/java/net/geedge/modules/detection/job/DetectionEventExpiredJob.java b/cn-admin/src/main/java/net/geedge/modules/detection/job/DetectionEventExpiredJob.java
index b0f9302..5deeb55 100644
--- a/cn-admin/src/main/java/net/geedge/modules/detection/job/DetectionEventExpiredJob.java
+++ b/cn-admin/src/main/java/net/geedge/modules/detection/job/DetectionEventExpiredJob.java
@@ -5,6 +5,8 @@ import java.util.Date;
import java.util.List;
import java.util.Map;
+import com.alibaba.fastjson.util.IOUtils;
+import com.clickhouse.client.ClickHouseResponse;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
@@ -30,15 +32,14 @@ import net.geedge.modules.detection.entity.Event;
@DisallowConcurrentExecution
public class DetectionEventExpiredJob extends QuartzJobBean {
private Log log = Log.get();
-
- @Autowired
- private ClickHouseClient clickHouseClient;
@Autowired
private ClickHouseNode clickHouseNode;
@Override
protected void executeInternal(JobExecutionContext arg0){
+
+ ClickHouseResponse resp = null;
try {
// 获取缓存中的event数据
Map<String, Event> cache = Context.DETECTTION_EVENT_CACHE;
@@ -74,7 +75,8 @@ public class DetectionEventExpiredJob extends QuartzJobBean {
String batchInsertSql = sb.substring(0, sb.length() - 1);
log.info("[DetectionEventExpiredJob] [Save Event Data] [SQL: {}]", batchInsertSql);
//批量存储到clickhouse
- clickHouseClient.write(clickHouseNode)
+ ClickHouseClient clickHouseClient = ClickHouseClient.newInstance(clickHouseNode.getProtocol());
+ resp = clickHouseClient.write(clickHouseNode)
.format(ClickHouseFormat.RowBinaryWithNamesAndTypes)
.query(batchInsertSql)
.executeAndWait();
@@ -91,6 +93,10 @@ public class DetectionEventExpiredJob extends QuartzJobBean {
}catch(Exception e){
e.printStackTrace();
log.error("[DetectionEventExpiredJob] [Error info] : {}",e.getMessage());
+ }finally{
+ if(ObjectUtils.isNotEmpty(resp)){
+ resp.close();
+ }
}
}
}