diff options
| author | gujinkai <[email protected]> | 2024-01-12 15:42:02 +0800 |
|---|---|---|
| committer | gujinkai <[email protected]> | 2024-01-12 15:42:02 +0800 |
| commit | 2e07e9dc30a9af905682190cb8ee61590c5a81e5 (patch) | |
| tree | 44f74f703838b97b6d20562907d1c7e634ad5608 | |
| parent | e67b7dc6f4d865ed336f5b8081b5e0cb104f61e1 (diff) | |
feat: change indicator sink from kafka to clickhouse
2 files changed, 8 insertions, 2 deletions
diff --git a/module-CN-indicator-match/src/main/java/com/zdjizhi/schedule/indicator/IndicatorSchedule.java b/module-CN-indicator-match/src/main/java/com/zdjizhi/schedule/indicator/IndicatorSchedule.java index e67ae9e..44fa3cb 100644 --- a/module-CN-indicator-match/src/main/java/com/zdjizhi/schedule/indicator/IndicatorSchedule.java +++ b/module-CN-indicator-match/src/main/java/com/zdjizhi/schedule/indicator/IndicatorSchedule.java @@ -3,7 +3,7 @@ package com.zdjizhi.schedule.indicator; import com.zdjizhi.base.common.CnRecordLog; import com.zdjizhi.base.config.Configs; import com.zdjizhi.base.platform.Schedule; -import com.zdjizhi.base.utils.KafkaUtils; +import com.zdjizhi.base.sink.clickhouse.ClickHouseTableFactory; import com.zdjizhi.etl.CnRecordEtl; import com.zdjizhi.schedule.indicator.common.CommonConfig; import com.zdjizhi.schedule.indicator.common.serialization.FastjsonObjectSerializationSchema; @@ -45,7 +45,8 @@ public class IndicatorSchedule implements Schedule { securityStream.map(new FastjsonObjectSerializationSchema<>()) .name("FastjsonObjectSerialization").uid("fastjson-object-serialization") - .addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.SINK_TOPIC_NAME))) + .addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.SINK_TABLE_NAME))) + //.addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.SINK_TOPIC_NAME))) .name("IndicatorMatchSink").uid("indicator-match-sink"); } } diff --git a/module-CN-indicator-match/src/main/java/com/zdjizhi/schedule/indicator/common/CommonConfig.java b/module-CN-indicator-match/src/main/java/com/zdjizhi/schedule/indicator/common/CommonConfig.java index 1eb7dc0..ae2dfb8 100644 --- a/module-CN-indicator-match/src/main/java/com/zdjizhi/schedule/indicator/common/CommonConfig.java +++ b/module-CN-indicator-match/src/main/java/com/zdjizhi/schedule/indicator/common/CommonConfig.java @@ -24,6 +24,11 @@ public final class CommonConfig { .defaultValue("SECURITY-EVENT-CN") .withDescription("The name of the Kafka sink topic."); + public static final ConfigOption<String> SINK_TABLE_NAME = ConfigOptions.key("sink.clickhouse.table") + .stringType() + .defaultValue("cn_security_event_local") + .withDescription("The name of the Kafka sink topic."); + /** * Configuration option for the full URL of a rule. */ |
