summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorgujinkai <[email protected]>2024-03-08 09:47:02 +0800
committergujinkai <[email protected]>2024-03-29 16:06:22 +0800
commit4192cea1eeb8cbc5beed0bcec8b04600504fab4c (patch)
treef567ae0861288aafe1b54599a03b25e3906ffbcb
parent581a4d488cccc261cd5f6f4b235111e66b57569e (diff)
fix: change sink from kafka to clickhouse
-rw-r--r--module-CN-indicator-match/src/main/java/com/zdjizhi/schedule/indicator/IndicatorSchedule.java6
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/CnPreMetric.java46
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/dns/DnsPreMetric.java30
-rw-r--r--module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/CnRelationMetric.java10
-rw-r--r--platform-etl/src/main/java/com/zdjizhi/etl/CnRecordPersistence.java6
5 files changed, 49 insertions, 49 deletions
diff --git a/module-CN-indicator-match/src/main/java/com/zdjizhi/schedule/indicator/IndicatorSchedule.java b/module-CN-indicator-match/src/main/java/com/zdjizhi/schedule/indicator/IndicatorSchedule.java
index 4b09eba..44fa3cb 100644
--- a/module-CN-indicator-match/src/main/java/com/zdjizhi/schedule/indicator/IndicatorSchedule.java
+++ b/module-CN-indicator-match/src/main/java/com/zdjizhi/schedule/indicator/IndicatorSchedule.java
@@ -3,7 +3,7 @@ package com.zdjizhi.schedule.indicator;
import com.zdjizhi.base.common.CnRecordLog;
import com.zdjizhi.base.config.Configs;
import com.zdjizhi.base.platform.Schedule;
-import com.zdjizhi.base.utils.KafkaUtils;
+import com.zdjizhi.base.sink.clickhouse.ClickHouseTableFactory;
import com.zdjizhi.etl.CnRecordEtl;
import com.zdjizhi.schedule.indicator.common.CommonConfig;
import com.zdjizhi.schedule.indicator.common.serialization.FastjsonObjectSerializationSchema;
@@ -45,8 +45,8 @@ public class IndicatorSchedule implements Schedule {
securityStream.map(new FastjsonObjectSerializationSchema<>())
.name("FastjsonObjectSerialization").uid("fastjson-object-serialization")
- //.addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.SINK_TABLE_NAME)))
- .addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.SINK_TOPIC_NAME)))
+ .addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.SINK_TABLE_NAME)))
+ //.addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.SINK_TOPIC_NAME)))
.name("IndicatorMatchSink").uid("indicator-match-sink");
}
}
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/CnPreMetric.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/CnPreMetric.java
index 4a63216..22c8a9d 100644
--- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/CnPreMetric.java
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/CnPreMetric.java
@@ -3,7 +3,7 @@ package com.zdjizhi.pre;
import com.zdjizhi.base.common.CnRecordLog;
import com.zdjizhi.base.config.Configs;
import com.zdjizhi.base.platform.Schedule;
-import com.zdjizhi.base.utils.KafkaUtils;
+import com.zdjizhi.base.sink.clickhouse.ClickHouseTableFactory;
import com.zdjizhi.etl.CnRecordEtl;
import com.zdjizhi.pre.common.CnMetricLog;
import com.zdjizhi.pre.common.CommonConfig;
@@ -41,8 +41,8 @@ public class CnPreMetric implements Schedule {
.window(TumblingEventTimeWindows.of(Time.minutes(windowsTime)))
.reduce(new SecondAggregationReduce(), new MetricIpProcessWindowFunc("client"));
clientIpMetric
- //.addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_IP_TABLE)))
- .addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_IP_TOPIC)))
+ .addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_IP_TABLE)))
+ //.addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_IP_TOPIC)))
.setParallelism(outputParallelism);
SingleOutputStreamOperator<String> serverIpMetric = process.filter((log) -> log.getMetricKey() == MetricKeyConfig.SERVER_IP)
@@ -50,8 +50,8 @@ public class CnPreMetric implements Schedule {
.window(TumblingEventTimeWindows.of(Time.minutes(windowsTime)))
.reduce(new SecondAggregationReduce(), new MetricIpProcessWindowFunc("server"));
serverIpMetric
- //.addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_IP_TABLE)))
- .addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_IP_TOPIC)))
+ .addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_IP_TABLE)))
+ //.addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_IP_TOPIC)))
.setParallelism(outputParallelism);
//metric_region
@@ -66,8 +66,8 @@ public class CnPreMetric implements Schedule {
.window(TumblingEventTimeWindows.of(Time.minutes(windowsTime)))
.reduce(new SecondAggregationReduce(), new MetricRegionProcessWindowFunc("client"));
clientRegionMetric
- //.addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_REGION_TABLE)))
- .addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_REGION_TOPIC)))
+ .addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_REGION_TABLE)))
+ //.addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_REGION_TOPIC)))
.setParallelism(outputParallelism);
SingleOutputStreamOperator<String> serverRegionMetric = process
@@ -81,8 +81,8 @@ public class CnPreMetric implements Schedule {
.window(TumblingEventTimeWindows.of(Time.minutes(windowsTime)))
.reduce(new SecondAggregationReduce(), new MetricRegionProcessWindowFunc("server"));
serverRegionMetric
- //.addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_REGION_TABLE)))
- .addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_REGION_TOPIC)))
+ .addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_REGION_TABLE)))
+ //.addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_REGION_TOPIC)))
.setParallelism(outputParallelism);
// //metric_asn
@@ -97,8 +97,8 @@ public class CnPreMetric implements Schedule {
.window(TumblingEventTimeWindows.of(Time.minutes(windowsTime)))
.reduce(new SecondAggregationReduce(), new MetricAsnProcessFunc("client"));
clientAsnMetric
- //.addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_ASN_TABLE)))
- .addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_ASN_TOPIC)))
+ .addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_ASN_TABLE)))
+ //.addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_ASN_TOPIC)))
.setParallelism(outputParallelism);
SingleOutputStreamOperator<String> serverAsnMetric = process
@@ -112,8 +112,8 @@ public class CnPreMetric implements Schedule {
.window(TumblingEventTimeWindows.of(Time.minutes(windowsTime)))
.reduce(new SecondAggregationReduce(), new MetricAsnProcessFunc("server"));
serverAsnMetric
- //.addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_ASN_TABLE)))
- .addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_ASN_TOPIC)))
+ .addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_ASN_TABLE)))
+ //.addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_ASN_TOPIC)))
.setParallelism(outputParallelism);
//metric_application
@@ -123,8 +123,8 @@ public class CnPreMetric implements Schedule {
.window(TumblingEventTimeWindows.of(Time.minutes(windowsTime)))
.reduce(new SecondAggregationReduce(), new MetricAppProcessFunc());
appMetric
- //.addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_APPLICATION_TABLE)))
- .addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_APPLICATION_TOPIC)))
+ .addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_APPLICATION_TABLE)))
+ //.addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_APPLICATION_TOPIC)))
.setParallelism(outputParallelism);
//metric_domain
@@ -134,8 +134,8 @@ public class CnPreMetric implements Schedule {
.window(TumblingEventTimeWindows.of(Time.minutes(windowsTime)))
.reduce(new SecondAggregationReduce(), new MetricDomainProcessFunc());
domainMetric
- //.addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_DOMAIN_TABLE)))
- .addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_DOMAIN_TOPIC)))
+ .addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_DOMAIN_TABLE)))
+ //.addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_DOMAIN_TOPIC)))
.setParallelism(outputParallelism);
//metric_protocol
@@ -150,8 +150,8 @@ public class CnPreMetric implements Schedule {
.window(TumblingEventTimeWindows.of(Time.minutes(windowsTime)))
.reduce(new SecondAggregationReduce(), new MetricProtocolProcessFunc());
protocolMetric
- //.addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_PROTOCOL_TABLE)))
- .addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_PROTOCOL_TOPIC)))
+ .addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_PROTOCOL_TABLE)))
+ //.addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_PROTOCOL_TOPIC)))
.setParallelism(outputParallelism);
//metric_link
@@ -169,8 +169,8 @@ public class CnPreMetric implements Schedule {
.window(TumblingEventTimeWindows.of(Time.minutes(windowsTime)))
.reduce(new SecondAggregationReduce(), new MetricLinkProcessFunc());
linkMetric
- //.addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_LINK_TABLE)))
- .addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_LINK_TOPIC)))
+ .addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_LINK_TABLE)))
+ //.addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_LINK_TOPIC)))
.setParallelism(outputParallelism);
SingleOutputStreamOperator<String> subscriberAppMetric = process.filter((log) -> log.getMetricKey() == MetricKeyConfig.SUBSCRIBER_APP)
@@ -183,8 +183,8 @@ public class CnPreMetric implements Schedule {
.window(TumblingEventTimeWindows.of(Time.minutes(windowsTime)))
.reduce(new SecondAggregationReduce(), new MetricSubscriberAppProcessFunc());
subscriberAppMetric
- //.addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_SUBSCRIBER_APP_TABLE)))
- .addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_SUBSCRIBER_APP_TOPIC)))
+ .addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_SUBSCRIBER_APP_TABLE)))
+ //.addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_SUBSCRIBER_APP_TOPIC)))
.setParallelism(outputParallelism);
}
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/dns/DnsPreMetric.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/dns/DnsPreMetric.java
index ca39841..3b8387a 100644
--- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/dns/DnsPreMetric.java
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/dns/DnsPreMetric.java
@@ -3,7 +3,7 @@ package com.zdjizhi.pre.dns;
import com.zdjizhi.base.common.CnRecordLog;
import com.zdjizhi.base.config.Configs;
import com.zdjizhi.base.platform.Schedule;
-import com.zdjizhi.base.utils.KafkaUtils;
+import com.zdjizhi.base.sink.clickhouse.ClickHouseTableFactory;
import com.zdjizhi.etl.CnRecordEtl;
import com.zdjizhi.pre.common.MetricKeyConfig;
import com.zdjizhi.pre.dns.common.CommonConfig;
@@ -38,8 +38,8 @@ public class DnsPreMetric implements Schedule {
.window(TumblingEventTimeWindows.of(Time.minutes(windowsTime)))
.reduce(new SecondAggregationReduce(), new MetricDnsServerIpFunc());
serverIpMetric
- //.addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_DNS_SERVER_IP_TABLE)))
- .addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_DNS_SERVER_IP_TOPIC)))
+ .addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_DNS_SERVER_IP_TABLE)))
+ //.addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_DNS_SERVER_IP_TOPIC)))
.setParallelism(outputParallelism);
//metric_dns_qname
@@ -48,8 +48,8 @@ public class DnsPreMetric implements Schedule {
.window(TumblingEventTimeWindows.of(Time.minutes(windowsTime)))
.reduce(new SecondAggregationReduce(), new MetricDnsQnameFunc());
qnameMetric
- //.addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_DNS_QNAME_TABLE)))
- .addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_DNS_QNAME_TOPIC)))
+ .addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_DNS_QNAME_TABLE)))
+ //.addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_DNS_QNAME_TOPIC)))
.setParallelism(outputParallelism);
//metric_dns_qtype
@@ -58,8 +58,8 @@ public class DnsPreMetric implements Schedule {
.window(TumblingEventTimeWindows.of(Time.minutes(windowsTime)))
.reduce(new SecondAggregationReduce(), new MetricDnsQtypeFunc());
qtypeMetric
- //.addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_DNS_QTYPE_TABLE)))
- .addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_DNS_QTYPE_TOPIC)))
+ .addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_DNS_QTYPE_TABLE)))
+ //.addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_DNS_QTYPE_TOPIC)))
.setParallelism(outputParallelism);
//metric_dns_rcode
@@ -68,8 +68,8 @@ public class DnsPreMetric implements Schedule {
.window(TumblingEventTimeWindows.of(Time.minutes(windowsTime)))
.reduce(new SecondAggregationReduce(), new MetricDnsRcodeFunc());
rcodeMetric
- //.addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_DNS_RCODE_TABLE)))
- .addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_DNS_RCODE_TOPIC)))
+ .addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_DNS_RCODE_TABLE)))
+ //.addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_DNS_RCODE_TOPIC)))
.setParallelism(outputParallelism);
//metric_dns_rr_a
@@ -78,8 +78,8 @@ public class DnsPreMetric implements Schedule {
.window(TumblingEventTimeWindows.of(Time.minutes(windowsTime)))
.reduce(new SecondAggregationReduce(), new MetricDnsRraFunc());
rraMetric
- //.addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_DNS_RRA_TABLE)))
- .addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_DNS_RRA_TOPIC)))
+ .addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_DNS_RRA_TABLE)))
+ //.addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_DNS_RRA_TOPIC)))
.setParallelism(outputParallelism);
//metric_dns_rr_aaaa
@@ -88,8 +88,8 @@ public class DnsPreMetric implements Schedule {
.window(TumblingEventTimeWindows.of(Time.minutes(windowsTime)))
.reduce(new SecondAggregationReduce(), new MetricDnsRraaaaFunc());
rr4aMetric
- //.addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_DNS_RRAAAA_TABLE)))
- .addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_DNS_RRAAAA_TOPIC)))
+ .addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_DNS_RRAAAA_TABLE)))
+ //.addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_DNS_RRAAAA_TOPIC)))
.setParallelism(outputParallelism);
//metric_dns_rr_cname
@@ -98,8 +98,8 @@ public class DnsPreMetric implements Schedule {
.window(TumblingEventTimeWindows.of(Time.minutes(windowsTime)))
.reduce(new SecondAggregationReduce(), new MetricDnsRrcnameFunc());
rrcnameMetric
- //.addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_DNS_RRCNAME_TABLE)))
- .addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_DNS_RRCNAME_TOPIC)))
+ .addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_DNS_RRCNAME_TABLE)))
+ //.addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_DNS_RRCNAME_TOPIC)))
.setParallelism(outputParallelism);
}
}
diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/CnRelationMetric.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/CnRelationMetric.java
index 18da277..878d4c4 100644
--- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/CnRelationMetric.java
+++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/CnRelationMetric.java
@@ -4,7 +4,7 @@ import com.alibaba.fastjson2.JSON;
import com.zdjizhi.base.common.CnRecordLog;
import com.zdjizhi.base.config.Configs;
import com.zdjizhi.base.platform.Schedule;
-import com.zdjizhi.base.utils.KafkaUtils;
+import com.zdjizhi.base.sink.clickhouse.ClickHouseTableFactory;
import com.zdjizhi.etl.CnRecordEtl;
import com.zdjizhi.pre.common.CommonConfig;
import com.zdjizhi.pre.relation.common.RelationMetricLog;
@@ -49,8 +49,8 @@ public class CnRelationMetric implements Schedule {
}
});
relationMetric
- //.addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_RELATION_TABLE)))
- .addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_RELATION_TOPIC)))
+ .addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_RELATION_TABLE)))
+ //.addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_RELATION_TOPIC)))
.setParallelism(Configs.get(CommonConfig.ENTITY_RELATION_OUTPUT_PARALLELISM));
@@ -77,8 +77,8 @@ public class CnRelationMetric implements Schedule {
}
});
attributeMetric
- //.addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_ATTRIBUTE_TABLE)))
- .addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_ATTRIBUTE_TOPIC)))
+ .addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_ATTRIBUTE_TABLE)))
+ //.addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_ATTRIBUTE_TOPIC)))
.setParallelism(Configs.get(CommonConfig.DYNAMIC_ATTRIBUTE_OUTPUT_PARALLELISM));
}
}
diff --git a/platform-etl/src/main/java/com/zdjizhi/etl/CnRecordPersistence.java b/platform-etl/src/main/java/com/zdjizhi/etl/CnRecordPersistence.java
index 6fa2cab..505ac02 100644
--- a/platform-etl/src/main/java/com/zdjizhi/etl/CnRecordPersistence.java
+++ b/platform-etl/src/main/java/com/zdjizhi/etl/CnRecordPersistence.java
@@ -4,7 +4,7 @@ import com.alibaba.fastjson2.JSON;
import com.zdjizhi.base.common.CommonConfig;
import com.zdjizhi.base.config.Configs;
import com.zdjizhi.base.platform.Schedule;
-import com.zdjizhi.base.utils.KafkaUtils;
+import com.zdjizhi.base.sink.clickhouse.ClickHouseTableFactory;
import java.util.Objects;
@@ -15,8 +15,8 @@ public class CnRecordPersistence implements Schedule {
CnRecordEtl.singleOutputStreamOperator
.filter(Objects::nonNull)
.map(JSON::toJSONString)
- //.addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.CN_RECORD_TABLE)))
- .addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.CN_RECORD_TOPIC)))
+ .addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.CN_RECORD_TABLE)))
+ //.addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.CN_RECORD_TOPIC)))
.setParallelism(Configs.get(CommonConfig.CN_RECORD_PARALLELISM));
}
}