diff options
| author | gujinkai <[email protected]> | 2024-03-08 09:47:02 +0800 |
|---|---|---|
| committer | gujinkai <[email protected]> | 2024-03-29 16:06:22 +0800 |
| commit | 4192cea1eeb8cbc5beed0bcec8b04600504fab4c (patch) | |
| tree | f567ae0861288aafe1b54599a03b25e3906ffbcb | |
| parent | 581a4d488cccc261cd5f6f4b235111e66b57569e (diff) | |
fix: change sink from kafka to clickhouse
5 files changed, 49 insertions, 49 deletions
diff --git a/module-CN-indicator-match/src/main/java/com/zdjizhi/schedule/indicator/IndicatorSchedule.java b/module-CN-indicator-match/src/main/java/com/zdjizhi/schedule/indicator/IndicatorSchedule.java index 4b09eba..44fa3cb 100644 --- a/module-CN-indicator-match/src/main/java/com/zdjizhi/schedule/indicator/IndicatorSchedule.java +++ b/module-CN-indicator-match/src/main/java/com/zdjizhi/schedule/indicator/IndicatorSchedule.java @@ -3,7 +3,7 @@ package com.zdjizhi.schedule.indicator; import com.zdjizhi.base.common.CnRecordLog; import com.zdjizhi.base.config.Configs; import com.zdjizhi.base.platform.Schedule; -import com.zdjizhi.base.utils.KafkaUtils; +import com.zdjizhi.base.sink.clickhouse.ClickHouseTableFactory; import com.zdjizhi.etl.CnRecordEtl; import com.zdjizhi.schedule.indicator.common.CommonConfig; import com.zdjizhi.schedule.indicator.common.serialization.FastjsonObjectSerializationSchema; @@ -45,8 +45,8 @@ public class IndicatorSchedule implements Schedule { securityStream.map(new FastjsonObjectSerializationSchema<>()) .name("FastjsonObjectSerialization").uid("fastjson-object-serialization") - //.addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.SINK_TABLE_NAME))) - .addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.SINK_TOPIC_NAME))) + .addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.SINK_TABLE_NAME))) + //.addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.SINK_TOPIC_NAME))) .name("IndicatorMatchSink").uid("indicator-match-sink"); } } diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/CnPreMetric.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/CnPreMetric.java index 4a63216..22c8a9d 100644 --- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/CnPreMetric.java +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/CnPreMetric.java @@ -3,7 +3,7 @@ package com.zdjizhi.pre; import com.zdjizhi.base.common.CnRecordLog; import com.zdjizhi.base.config.Configs; import com.zdjizhi.base.platform.Schedule; -import com.zdjizhi.base.utils.KafkaUtils; +import com.zdjizhi.base.sink.clickhouse.ClickHouseTableFactory; import com.zdjizhi.etl.CnRecordEtl; import com.zdjizhi.pre.common.CnMetricLog; import com.zdjizhi.pre.common.CommonConfig; @@ -41,8 +41,8 @@ public class CnPreMetric implements Schedule { .window(TumblingEventTimeWindows.of(Time.minutes(windowsTime))) .reduce(new SecondAggregationReduce(), new MetricIpProcessWindowFunc("client")); clientIpMetric - //.addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_IP_TABLE))) - .addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_IP_TOPIC))) + .addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_IP_TABLE))) + //.addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_IP_TOPIC))) .setParallelism(outputParallelism); SingleOutputStreamOperator<String> serverIpMetric = process.filter((log) -> log.getMetricKey() == MetricKeyConfig.SERVER_IP) @@ -50,8 +50,8 @@ public class CnPreMetric implements Schedule { .window(TumblingEventTimeWindows.of(Time.minutes(windowsTime))) .reduce(new SecondAggregationReduce(), new MetricIpProcessWindowFunc("server")); serverIpMetric - //.addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_IP_TABLE))) - .addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_IP_TOPIC))) + .addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_IP_TABLE))) + //.addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_IP_TOPIC))) .setParallelism(outputParallelism); //metric_region @@ -66,8 +66,8 @@ public class CnPreMetric implements Schedule { .window(TumblingEventTimeWindows.of(Time.minutes(windowsTime))) .reduce(new SecondAggregationReduce(), new MetricRegionProcessWindowFunc("client")); clientRegionMetric - //.addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_REGION_TABLE))) - .addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_REGION_TOPIC))) + .addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_REGION_TABLE))) + //.addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_REGION_TOPIC))) .setParallelism(outputParallelism); SingleOutputStreamOperator<String> serverRegionMetric = process @@ -81,8 +81,8 @@ public class CnPreMetric implements Schedule { .window(TumblingEventTimeWindows.of(Time.minutes(windowsTime))) .reduce(new SecondAggregationReduce(), new MetricRegionProcessWindowFunc("server")); serverRegionMetric - //.addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_REGION_TABLE))) - .addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_REGION_TOPIC))) + .addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_REGION_TABLE))) + //.addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_REGION_TOPIC))) .setParallelism(outputParallelism); // //metric_asn @@ -97,8 +97,8 @@ public class CnPreMetric implements Schedule { .window(TumblingEventTimeWindows.of(Time.minutes(windowsTime))) .reduce(new SecondAggregationReduce(), new MetricAsnProcessFunc("client")); clientAsnMetric - //.addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_ASN_TABLE))) - .addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_ASN_TOPIC))) + .addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_ASN_TABLE))) + //.addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_ASN_TOPIC))) .setParallelism(outputParallelism); SingleOutputStreamOperator<String> serverAsnMetric = process @@ -112,8 +112,8 @@ public class CnPreMetric implements Schedule { .window(TumblingEventTimeWindows.of(Time.minutes(windowsTime))) .reduce(new SecondAggregationReduce(), new MetricAsnProcessFunc("server")); serverAsnMetric - //.addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_ASN_TABLE))) - .addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_ASN_TOPIC))) + .addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_ASN_TABLE))) + //.addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_ASN_TOPIC))) .setParallelism(outputParallelism); //metric_application @@ -123,8 +123,8 @@ public class CnPreMetric implements Schedule { .window(TumblingEventTimeWindows.of(Time.minutes(windowsTime))) .reduce(new SecondAggregationReduce(), new MetricAppProcessFunc()); appMetric - //.addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_APPLICATION_TABLE))) - .addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_APPLICATION_TOPIC))) + .addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_APPLICATION_TABLE))) + //.addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_APPLICATION_TOPIC))) .setParallelism(outputParallelism); //metric_domain @@ -134,8 +134,8 @@ public class CnPreMetric implements Schedule { .window(TumblingEventTimeWindows.of(Time.minutes(windowsTime))) .reduce(new SecondAggregationReduce(), new MetricDomainProcessFunc()); domainMetric - //.addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_DOMAIN_TABLE))) - .addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_DOMAIN_TOPIC))) + .addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_DOMAIN_TABLE))) + //.addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_DOMAIN_TOPIC))) .setParallelism(outputParallelism); //metric_protocol @@ -150,8 +150,8 @@ public class CnPreMetric implements Schedule { .window(TumblingEventTimeWindows.of(Time.minutes(windowsTime))) .reduce(new SecondAggregationReduce(), new MetricProtocolProcessFunc()); protocolMetric - //.addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_PROTOCOL_TABLE))) - .addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_PROTOCOL_TOPIC))) + .addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_PROTOCOL_TABLE))) + //.addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_PROTOCOL_TOPIC))) .setParallelism(outputParallelism); //metric_link @@ -169,8 +169,8 @@ public class CnPreMetric implements Schedule { .window(TumblingEventTimeWindows.of(Time.minutes(windowsTime))) .reduce(new SecondAggregationReduce(), new MetricLinkProcessFunc()); linkMetric - //.addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_LINK_TABLE))) - .addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_LINK_TOPIC))) + .addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_LINK_TABLE))) + //.addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_LINK_TOPIC))) .setParallelism(outputParallelism); SingleOutputStreamOperator<String> subscriberAppMetric = process.filter((log) -> log.getMetricKey() == MetricKeyConfig.SUBSCRIBER_APP) @@ -183,8 +183,8 @@ public class CnPreMetric implements Schedule { .window(TumblingEventTimeWindows.of(Time.minutes(windowsTime))) .reduce(new SecondAggregationReduce(), new MetricSubscriberAppProcessFunc()); subscriberAppMetric - //.addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_SUBSCRIBER_APP_TABLE))) - .addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_SUBSCRIBER_APP_TOPIC))) + .addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_SUBSCRIBER_APP_TABLE))) + //.addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_SUBSCRIBER_APP_TOPIC))) .setParallelism(outputParallelism); } diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/dns/DnsPreMetric.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/dns/DnsPreMetric.java index ca39841..3b8387a 100644 --- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/dns/DnsPreMetric.java +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/dns/DnsPreMetric.java @@ -3,7 +3,7 @@ package com.zdjizhi.pre.dns; import com.zdjizhi.base.common.CnRecordLog; import com.zdjizhi.base.config.Configs; import com.zdjizhi.base.platform.Schedule; -import com.zdjizhi.base.utils.KafkaUtils; +import com.zdjizhi.base.sink.clickhouse.ClickHouseTableFactory; import com.zdjizhi.etl.CnRecordEtl; import com.zdjizhi.pre.common.MetricKeyConfig; import com.zdjizhi.pre.dns.common.CommonConfig; @@ -38,8 +38,8 @@ public class DnsPreMetric implements Schedule { .window(TumblingEventTimeWindows.of(Time.minutes(windowsTime))) .reduce(new SecondAggregationReduce(), new MetricDnsServerIpFunc()); serverIpMetric - //.addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_DNS_SERVER_IP_TABLE))) - .addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_DNS_SERVER_IP_TOPIC))) + .addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_DNS_SERVER_IP_TABLE))) + //.addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_DNS_SERVER_IP_TOPIC))) .setParallelism(outputParallelism); //metric_dns_qname @@ -48,8 +48,8 @@ public class DnsPreMetric implements Schedule { .window(TumblingEventTimeWindows.of(Time.minutes(windowsTime))) .reduce(new SecondAggregationReduce(), new MetricDnsQnameFunc()); qnameMetric - //.addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_DNS_QNAME_TABLE))) - .addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_DNS_QNAME_TOPIC))) + .addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_DNS_QNAME_TABLE))) + //.addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_DNS_QNAME_TOPIC))) .setParallelism(outputParallelism); //metric_dns_qtype @@ -58,8 +58,8 @@ public class DnsPreMetric implements Schedule { .window(TumblingEventTimeWindows.of(Time.minutes(windowsTime))) .reduce(new SecondAggregationReduce(), new MetricDnsQtypeFunc()); qtypeMetric - //.addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_DNS_QTYPE_TABLE))) - .addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_DNS_QTYPE_TOPIC))) + .addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_DNS_QTYPE_TABLE))) + //.addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_DNS_QTYPE_TOPIC))) .setParallelism(outputParallelism); //metric_dns_rcode @@ -68,8 +68,8 @@ public class DnsPreMetric implements Schedule { .window(TumblingEventTimeWindows.of(Time.minutes(windowsTime))) .reduce(new SecondAggregationReduce(), new MetricDnsRcodeFunc()); rcodeMetric - //.addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_DNS_RCODE_TABLE))) - .addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_DNS_RCODE_TOPIC))) + .addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_DNS_RCODE_TABLE))) + //.addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_DNS_RCODE_TOPIC))) .setParallelism(outputParallelism); //metric_dns_rr_a @@ -78,8 +78,8 @@ public class DnsPreMetric implements Schedule { .window(TumblingEventTimeWindows.of(Time.minutes(windowsTime))) .reduce(new SecondAggregationReduce(), new MetricDnsRraFunc()); rraMetric - //.addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_DNS_RRA_TABLE))) - .addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_DNS_RRA_TOPIC))) + .addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_DNS_RRA_TABLE))) + //.addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_DNS_RRA_TOPIC))) .setParallelism(outputParallelism); //metric_dns_rr_aaaa @@ -88,8 +88,8 @@ public class DnsPreMetric implements Schedule { .window(TumblingEventTimeWindows.of(Time.minutes(windowsTime))) .reduce(new SecondAggregationReduce(), new MetricDnsRraaaaFunc()); rr4aMetric - //.addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_DNS_RRAAAA_TABLE))) - .addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_DNS_RRAAAA_TOPIC))) + .addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_DNS_RRAAAA_TABLE))) + //.addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_DNS_RRAAAA_TOPIC))) .setParallelism(outputParallelism); //metric_dns_rr_cname @@ -98,8 +98,8 @@ public class DnsPreMetric implements Schedule { .window(TumblingEventTimeWindows.of(Time.minutes(windowsTime))) .reduce(new SecondAggregationReduce(), new MetricDnsRrcnameFunc()); rrcnameMetric - //.addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_DNS_RRCNAME_TABLE))) - .addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_DNS_RRCNAME_TOPIC))) + .addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_DNS_RRCNAME_TABLE))) + //.addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_DNS_RRCNAME_TOPIC))) .setParallelism(outputParallelism); } } diff --git a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/CnRelationMetric.java b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/CnRelationMetric.java index 18da277..878d4c4 100644 --- a/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/CnRelationMetric.java +++ b/module-CN-pre-metrics/src/main/java/com/zdjizhi/pre/relation/CnRelationMetric.java @@ -4,7 +4,7 @@ import com.alibaba.fastjson2.JSON; import com.zdjizhi.base.common.CnRecordLog; import com.zdjizhi.base.config.Configs; import com.zdjizhi.base.platform.Schedule; -import com.zdjizhi.base.utils.KafkaUtils; +import com.zdjizhi.base.sink.clickhouse.ClickHouseTableFactory; import com.zdjizhi.etl.CnRecordEtl; import com.zdjizhi.pre.common.CommonConfig; import com.zdjizhi.pre.relation.common.RelationMetricLog; @@ -49,8 +49,8 @@ public class CnRelationMetric implements Schedule { } }); relationMetric - //.addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_RELATION_TABLE))) - .addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_RELATION_TOPIC))) + .addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_RELATION_TABLE))) + //.addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_RELATION_TOPIC))) .setParallelism(Configs.get(CommonConfig.ENTITY_RELATION_OUTPUT_PARALLELISM)); @@ -77,8 +77,8 @@ public class CnRelationMetric implements Schedule { } }); attributeMetric - //.addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_ATTRIBUTE_TABLE))) - .addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_ATTRIBUTE_TOPIC))) + .addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.METRIC_ATTRIBUTE_TABLE))) + //.addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.METRIC_ATTRIBUTE_TOPIC))) .setParallelism(Configs.get(CommonConfig.DYNAMIC_ATTRIBUTE_OUTPUT_PARALLELISM)); } } diff --git a/platform-etl/src/main/java/com/zdjizhi/etl/CnRecordPersistence.java b/platform-etl/src/main/java/com/zdjizhi/etl/CnRecordPersistence.java index 6fa2cab..505ac02 100644 --- a/platform-etl/src/main/java/com/zdjizhi/etl/CnRecordPersistence.java +++ b/platform-etl/src/main/java/com/zdjizhi/etl/CnRecordPersistence.java @@ -4,7 +4,7 @@ import com.alibaba.fastjson2.JSON; import com.zdjizhi.base.common.CommonConfig; import com.zdjizhi.base.config.Configs; import com.zdjizhi.base.platform.Schedule; -import com.zdjizhi.base.utils.KafkaUtils; +import com.zdjizhi.base.sink.clickhouse.ClickHouseTableFactory; import java.util.Objects; @@ -15,8 +15,8 @@ public class CnRecordPersistence implements Schedule { CnRecordEtl.singleOutputStreamOperator .filter(Objects::nonNull) .map(JSON::toJSONString) - //.addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.CN_RECORD_TABLE))) - .addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.CN_RECORD_TOPIC))) + .addSink(ClickHouseTableFactory.getSinkFunction(Configs.get(CommonConfig.CN_RECORD_TABLE))) + //.addSink(KafkaUtils.getKafkaSink(Configs.get(CommonConfig.CN_RECORD_TOPIC))) .setParallelism(Configs.get(CommonConfig.CN_RECORD_PARALLELISM)); } } |
