summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorgujinkai <[email protected]>2024-01-12 15:42:02 +0800
committergujinkai <[email protected]>2024-01-12 15:42:02 +0800
commit2e07e9dc30a9af905682190cb8ee61590c5a81e5 (patch)
tree44f74f703838b97b6d20562907d1c7e634ad5608
parente67b7dc6f4d865ed336f5b8081b5e0cb104f61e1 (diff)
feat: change indicator sink from kafka to clickhouse
-rw-r--r--module-CN-indicator-match/src/main/java/com/zdjizhi/schedule/indicator/IndicatorSchedule.java5
-rw-r--r--module-CN-indicator-match/src/main/java/com/zdjizhi/schedule/indicator/common/CommonConfig.java5
2 files changed, 8 insertions, 2 deletions
diff --git a/module-CN-indicator-match/src/main/java/com/zdjizhi/schedule/indicator/IndicatorSchedule.java b/module-CN-indicator-match/src/main/java/com/zdjizhi/schedule/indicator/IndicatorSchedule.java
index e67ae9e..44fa3cb 100644
--- a/module-CN-indicator-match/src/main/java/com/zdjizhi/schedule/indicator/IndicatorSchedule.java
+++ b/module-CN-indicator-match/src/main/java/com/zdjizhi/schedule/indicator/IndicatorSchedule.java
@@ -3,7 +3,7 @@ package com.zdjizhi.schedule.indicator;
import com.zdjizhi.base.common.CnRecordLog;
import com.zdjizhi.base.config.Configs;
import com.zdjizhi.base.platform.Schedule;
-import com.zdjizhi.base.utils.KafkaUtils;
+import com.zdjizhi.base.sink.clickhouse.ClickHouseTableFactory;
import com.zdjizhi.etl.CnRecordEtl;
import com.zdjizhi.schedule.indicator.common.CommonConfig;
import com.zdjizhi.schedule.indicator.common.serialization.FastjsonObjectSerializationSchema;
@@ -45,7 +45,8 @@ public class IndicatorSchedule implements Schedule {
securityStream.map(new FastjsonObjectSerializationSchema<>())
.name("FastjsonObjectSerialization").uid("fastjson-object-serialization")
- .addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.SINK_TOPIC_NAME)))
+ .addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.SINK_TABLE_NAME)))
+ //.addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.SINK_TOPIC_NAME)))
.name("IndicatorMatchSink").uid("indicator-match-sink");
}
}
diff --git a/module-CN-indicator-match/src/main/java/com/zdjizhi/schedule/indicator/common/CommonConfig.java b/module-CN-indicator-match/src/main/java/com/zdjizhi/schedule/indicator/common/CommonConfig.java
index 1eb7dc0..ae2dfb8 100644
--- a/module-CN-indicator-match/src/main/java/com/zdjizhi/schedule/indicator/common/CommonConfig.java
+++ b/module-CN-indicator-match/src/main/java/com/zdjizhi/schedule/indicator/common/CommonConfig.java
@@ -24,6 +24,11 @@ public final class CommonConfig {
.defaultValue("SECURITY-EVENT-CN")
.withDescription("The name of the Kafka sink topic.");
+ public static final ConfigOption<String> SINK_TABLE_NAME = ConfigOptions.key("sink.clickhouse.table")
+ .stringType()
+ .defaultValue("cn_security_event_local")
+ .withDescription("The name of the Kafka sink topic.");
+
/**
* Configuration option for the full URL of a rule.
*/