Diffstat (limited to 'src')
-rw-r--r--  src/main/java/com/zdjizhi/common/FlowWriteConfig.java | 39
-rw-r--r--  src/main/java/com/zdjizhi/etl/CKBatchWindow.java (renamed from src/main/java/com/zdjizhi/common/CKWindow.java) | 4
-rw-r--r--  src/main/java/com/zdjizhi/etl/connection/ArangodbBatchIPWindow.java (renamed from src/main/java/com/zdjizhi/common/ArangodbIPWindow.java) | 4
-rw-r--r--  src/main/java/com/zdjizhi/etl/connection/ConnProcessFunction.java (renamed from src/main/java/com/zdjizhi/etl/ConnProcessFunction.java) | 24
-rw-r--r--  src/main/java/com/zdjizhi/etl/connection/Ip2IpGraphProcessFunction.java (renamed from src/main/java/com/zdjizhi/etl/Ip2IpGraphProcessFunction.java) | 2
-rw-r--r--  src/main/java/com/zdjizhi/etl/connection/IpKeysSelector.java (renamed from src/main/java/com/zdjizhi/common/IpKeysSelector.java) | 2
-rw-r--r--  src/main/java/com/zdjizhi/etl/connection/SketchProcessFunction.java (renamed from src/main/java/com/zdjizhi/etl/SketchProcessFunction.java) | 2
-rw-r--r--  src/main/java/com/zdjizhi/etl/dns/ArangodbBatchDnsWindow.java (renamed from src/main/java/com/zdjizhi/common/ArangodbDnsWindow.java) | 4
-rw-r--r--  src/main/java/com/zdjizhi/etl/dns/DnsGraphKeysSelector.java (renamed from src/main/java/com/zdjizhi/common/DnsGraphKeysSelector.java) | 2
-rw-r--r--  src/main/java/com/zdjizhi/etl/dns/DnsGraphProcessFunction.java (renamed from src/main/java/com/zdjizhi/etl/DnsGraphProcessFunction.java) | 2
-rw-r--r--  src/main/java/com/zdjizhi/etl/dns/DnsMapFunction.java (renamed from src/main/java/com/zdjizhi/etl/DnsMapFunction.java) | 4
-rw-r--r--  src/main/java/com/zdjizhi/etl/dns/DnsRelationProcessFunction.java (renamed from src/main/java/com/zdjizhi/etl/DnsRelationProcessFunction.java) | 2
-rw-r--r--  src/main/java/com/zdjizhi/etl/dns/DnsSplitFlatMapFunction.java (renamed from src/main/java/com/zdjizhi/etl/DnsSplitFlatMapFunction.java) | 2
-rw-r--r--  src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java | 31
-rw-r--r--  src/main/java/com/zdjizhi/utils/app/AppUtils.java | 124
-rw-r--r--  src/main/java/com/zdjizhi/utils/arangodb/ArangoDBSink.java | 6
-rw-r--r--  src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java | 8
-rw-r--r--  src/main/java/com/zdjizhi/utils/general/TransFormMap.java | 5
-rw-r--r--  src/main/java/com/zdjizhi/utils/general/TransFormTypeMap.java | 5
-rw-r--r--  src/main/java/com/zdjizhi/utils/general/TransFunction.java | 18
-rw-r--r--  src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java | 16
-rw-r--r--  src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java | 4
22 files changed, 56 insertions(+), 254 deletions(-)
diff --git a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
index 57f09a0..3b68285 100644
--- a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
+++ b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
@@ -39,17 +39,6 @@ public class FlowWriteConfig {
public static final String ENCODING = "UTF8";
/**
- * Nacos
- */
- public static final String NACOS_SERVER = FlowWriteConfigurations.getStringProperty(0, "nacos.server");
- public static final String NACOS_SCHEMA_NAMESPACE = FlowWriteConfigurations.getStringProperty(0, "nacos.schema.namespace");
- public static final String NACOS_COMMON_NAMESPACE = FlowWriteConfigurations.getStringProperty(0, "nacos.common.namespace");
- public static final String NACOS_DATA_ID = FlowWriteConfigurations.getStringProperty(0, "nacos.data.id");
- public static final String NACOS_PIN = FlowWriteConfigurations.getStringProperty(1, "nacos.pin");
- public static final String NACOS_GROUP = FlowWriteConfigurations.getStringProperty(1, "nacos.group");
- public static final String NACOS_USERNAME = FlowWriteConfigurations.getStringProperty(1, "nacos.username");
-
- /**
* System config
*/
public static final Integer SOURCE_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "source.parallelism");
@@ -62,12 +51,6 @@ public class FlowWriteConfig {
public static final Integer BUFFER_TIMEOUT = FlowWriteConfigurations.getIntProperty(1, "buffer.timeout");
/**
- * HBase
- */
- public static final Integer HBASE_TICK_TUPLE_FREQ_SECS = FlowWriteConfigurations.getIntProperty(0, "hbase.tick.tuple.freq.secs");
- public static final String HBASE_TABLE_NAME = FlowWriteConfigurations.getStringProperty(1, "hbase.table.name");
-
- /**
* kafka common
*/
public static final String KAFKA_SASL_JAAS_USER = encryptor.decrypt(FlowWriteConfigurations.getStringProperty(1, "kafka.user"));
@@ -76,7 +59,6 @@ public class FlowWriteConfig {
/**
* kafka source config
*/
- public static final String SOURCE_KAFKA_TOPIC = FlowWriteConfigurations.getStringProperty(0, "source.kafka.topic");
public static final String GROUP_ID = FlowWriteConfigurations.getStringProperty(0, "group.id");
public static final String SESSION_TIMEOUT_MS = FlowWriteConfigurations.getStringProperty(1, "session.timeout.ms");
public static final String MAX_POLL_RECORDS = FlowWriteConfigurations.getStringProperty(1, "max.poll.records");
@@ -85,7 +67,6 @@ public class FlowWriteConfig {
/**
* kafka sink config
*/
- public static final String SINK_KAFKA_TOPIC = FlowWriteConfigurations.getStringProperty(0, "sink.kafka.topic");
public static final String PRODUCER_ACK = FlowWriteConfigurations.getStringProperty(1, "producer.ack");
public static final String PRODUCER_KAFKA_COMPRESSION_TYPE = FlowWriteConfigurations.getStringProperty(0, "producer.kafka.compression.type");
@@ -100,19 +81,12 @@ public class FlowWriteConfig {
public static final Integer MAX_REQUEST_SIZE = FlowWriteConfigurations.getIntProperty(1, "max.request.size");
/**
- * http
- */
- public static final String APP_ID_HTTP = FlowWriteConfigurations.getStringProperty(1, "app.id.http");
- public static final Integer APP_TICK_TUPLE_FREQ_SECS = FlowWriteConfigurations.getIntProperty(1, "app.tick.tuple.freq.secs");
-
- /**
* common config
*/
public static final String SOURCE_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0, "source.kafka.servers");
public static final String SINK_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0, "sink.kafka.servers");
public static final String ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0, "zookeeper.servers");
public static final String TOOLS_LIBRARY = FlowWriteConfigurations.getStringProperty(0, "tools.library");
- public static final String HBASE_ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0, "hbase.zookeeper.servers");
/*
@@ -125,7 +99,7 @@ public class FlowWriteConfig {
public static final int CK_CONNECTION_TIMEOUT = FlowWriteConfigurations.getIntProperty(0, "ck.connection.timeout");
public static final int CK_SOCKET_TIMEOUT = FlowWriteConfigurations.getIntProperty(0, "ck.socket.timeout");
- public static final int FLINK_WATERMARK_MAX_ORDERNESS = FlowWriteConfigurations.getIntProperty(0, "flink.watermark.max.orderness");
+ public static final int FLINK_WATERMARK_MAX_DELAY_TIME = FlowWriteConfigurations.getIntProperty(0, "flink.watermark.max.delay.time");
public static final int LOG_AGGREGATE_DURATION = FlowWriteConfigurations.getIntProperty(0, "log.aggregate.duration");
public static final int LOG_AGGREGATE_DURATION_GRAPH = FlowWriteConfigurations.getIntProperty(0, "log.aggregate.duration.graph");
public static final String SOURCE_KAFKA_TOPIC_DNS = FlowWriteConfigurations.getStringProperty(0, "source.kafka.topic.dns");
@@ -147,8 +121,6 @@ public class FlowWriteConfig {
public static final String R_NX_DOMAIN2DOMAIN = FlowWriteConfigurations.getStringProperty(0, "sink.arango.table.r.nx.domain2domain");
-
-
public static final String ARANGODB_HOST = FlowWriteConfigurations.getStringProperty(0, "arangoDB.host");
public static final Integer ARANGODB_PORT = FlowWriteConfigurations.getIntProperty(0, "arangoDB.port");
public static final String ARANGODB_USER = FlowWriteConfigurations.getStringProperty(0, "arangoDB.user");
@@ -156,11 +128,8 @@ public class FlowWriteConfig {
public static final String ARANGODB_DB_NAME = FlowWriteConfigurations.getStringProperty(0, "arangoDB.DB.name");
public static final Integer ARANGODB_TTL = FlowWriteConfigurations.getIntProperty(0, "arangoDB.ttl");
public static final Integer ARANGODB_BATCH = FlowWriteConfigurations.getIntProperty(0, "arangoDB.batch");
-
- public static final Integer UPDATE_ARANGO_BATCH = FlowWriteConfigurations.getIntProperty(0, "update.arango.batch");
- public static final String ARANGODB_READ_LIMIT = FlowWriteConfigurations.getStringProperty(0, "arangoDB.read.limit");
public static final Integer THREAD_POOL_NUMBER = FlowWriteConfigurations.getIntProperty(0, "thread.pool.number");
- public static final Integer THREAD_AWAIT_TERMINATION_TIME = FlowWriteConfigurations.getIntProperty(0, "thread.await.termination.time");
- public static final Integer SINK_BATCH_TIME_OUT = FlowWriteConfigurations.getIntProperty(0, "sink.batch.time.out");
- public static final Integer SINK_BATCH = FlowWriteConfigurations.getIntProperty(0, "sink.batch");
+
+ public static final Integer SINK_BATCH_DELAY_TIME = FlowWriteConfigurations.getIntProperty(0, "sink.batch.delay.time");
+ public static final Integer CK_BATCH = FlowWriteConfigurations.getIntProperty(0, "ck.batch");
} \ No newline at end of file
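For reference, the renamed and added keys map onto the backing properties file like this (a sketch only; the key names are taken verbatim from the getters above, the values are illustrative and not part of this diff):

    flink.watermark.max.delay.time=10
    sink.batch.delay.time=5
    ck.batch=10000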
diff --git a/src/main/java/com/zdjizhi/common/CKWindow.java b/src/main/java/com/zdjizhi/etl/CKBatchWindow.java
index b7c7b8c..f66455f 100644
--- a/src/main/java/com/zdjizhi/common/CKWindow.java
+++ b/src/main/java/com/zdjizhi/etl/CKBatchWindow.java
@@ -1,4 +1,4 @@
-package com.zdjizhi.common;
+package com.zdjizhi.etl;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
@@ -9,7 +9,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
-public class CKWindow implements AllWindowFunction<Map<String, Object>, List<Map<String, Object>>, TimeWindow> {
+public class CKBatchWindow implements AllWindowFunction<Map<String, Object>, List<Map<String, Object>>, TimeWindow> {
@Override
public void apply(TimeWindow timeWindow, Iterable<Map<String, Object>> iterable, Collector<List<Map<String, Object>>> out) throws Exception {
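The body of CKBatchWindow is untouched by this rename. For context, an AllWindowFunction of this shape typically just drains one window into a single list — a hedged sketch under that assumption, not the file's verbatim body:

    @Override
    public void apply(TimeWindow timeWindow, Iterable<Map<String, Object>> iterable,
                      Collector<List<Map<String, Object>>> out) throws Exception {
        // gather one processing-time window's worth of rows into a single batch
        List<Map<String, Object>> batch = new ArrayList<>();
        for (Map<String, Object> row : iterable) {
            batch.add(row);
        }
        if (!batch.isEmpty()) {
            out.collect(batch); // the batching sinks consume one list per window
        }
    }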
diff --git a/src/main/java/com/zdjizhi/common/ArangodbIPWindow.java b/src/main/java/com/zdjizhi/etl/connection/ArangodbBatchIPWindow.java
index f91fb08..f5848c4 100644
--- a/src/main/java/com/zdjizhi/common/ArangodbIPWindow.java
+++ b/src/main/java/com/zdjizhi/etl/connection/ArangodbBatchIPWindow.java
@@ -1,4 +1,4 @@
-package com.zdjizhi.common;
+package com.zdjizhi.etl.connection;
import cn.hutool.core.util.StrUtil;
import com.arangodb.entity.BaseEdgeDocument;
@@ -11,7 +11,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
-public class ArangodbIPWindow implements AllWindowFunction<Map<String, Object>, List<BaseEdgeDocument>, TimeWindow> {
+public class ArangodbBatchIPWindow implements AllWindowFunction<Map<String, Object>, List<BaseEdgeDocument>, TimeWindow> {
@Override
public void apply(TimeWindow timeWindow, Iterable<Map<String, Object>> iterable, Collector<List<BaseEdgeDocument>> collector) throws Exception {
diff --git a/src/main/java/com/zdjizhi/etl/ConnProcessFunction.java b/src/main/java/com/zdjizhi/etl/connection/ConnProcessFunction.java
index 49041dc..cec2425 100644
--- a/src/main/java/com/zdjizhi/etl/ConnProcessFunction.java
+++ b/src/main/java/com/zdjizhi/etl/connection/ConnProcessFunction.java
@@ -1,4 +1,4 @@
-package com.zdjizhi.etl;
+package com.zdjizhi.etl.connection;
import cn.hutool.core.convert.Convert;
import cn.hutool.log.Log;
@@ -24,21 +24,8 @@ public class ConnProcessFunction extends ProcessWindowFunction<Map<String, Objec
@Override
public void process(Tuple2<String, String> keys, Context context, Iterable<Map<String, Object>> elements, Collector<Map<String, Object>> out) {
- Map<String, Object> middleResult = getMiddleResult(keys, elements);
- try {
- if (middleResult != null) {
- out.collect(middleResult);
- logger.debug("获取中间聚合结果:{}", middleResult.toString());
- }
- } catch (Exception e) {
- logger.error("获取中间聚合结果失败,middleResult: {}\n{}", middleResult.toString(), e);
- }
- }
-
- private Map<String, Object> getMiddleResult(Tuple2<String, String> keys, Iterable<Map<String, Object>> elements) {
-
- Tuple5<Long, Long, Long, Long, Long> values = connAggregate(elements);
try {
+ Tuple5<Long, Long, Long, Long, Long> values = connAggregate(elements);
if (values != null) {
Map<String, Object> result = new LinkedHashMap<>();
result.put("start_time", values.f0);
@@ -48,13 +35,12 @@ public class ConnProcessFunction extends ProcessWindowFunction<Map<String, Objec
result.put("sessions", values.f2);
result.put("packets", values.f3);
result.put("bytes", values.f4);
- return result;
+ out.collect(result);
+ logger.debug("获取中间聚合结果:{}", result.toString());
}
-
} catch (Exception e) {
- logger.error("加载中间结果集失败,keys: {} values: {}\n{}", keys, values, e);
+ logger.error("获取中间聚合结果失败,middleResult: {}", e);
}
- return null;
}
private Tuple5<Long, Long, Long, Long, Long> connAggregate(Iterable<Map<String, Object>> elements) {
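connAggregate itself is unchanged and not shown in full here. Judging from the field names written into the result map above, a plausible shape is the following — an assumption for orientation, not the committed code:

    private Tuple5<Long, Long, Long, Long, Long> connAggregate(Iterable<Map<String, Object>> elements) {
        long start = Long.MAX_VALUE, end = 0L, sessions = 0L, packets = 0L, bytes = 0L;
        for (Map<String, Object> e : elements) {
            start = Math.min(start, Convert.toLong(e.get("start_time"), Long.MAX_VALUE));
            end = Math.max(end, Convert.toLong(e.get("end_time"), 0L));
            sessions += Convert.toLong(e.get("sessions"), 0L);
            packets += Convert.toLong(e.get("packets"), 0L);
            bytes += Convert.toLong(e.get("bytes"), 0L);
        }
        // null signals an empty window; process() only emits when values != null
        return start == Long.MAX_VALUE ? null : Tuple5.of(start, end, sessions, packets, bytes);
    }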
diff --git a/src/main/java/com/zdjizhi/etl/Ip2IpGraphProcessFunction.java b/src/main/java/com/zdjizhi/etl/connection/Ip2IpGraphProcessFunction.java
index 5bce25d..e9dd0e2 100644
--- a/src/main/java/com/zdjizhi/etl/Ip2IpGraphProcessFunction.java
+++ b/src/main/java/com/zdjizhi/etl/connection/Ip2IpGraphProcessFunction.java
@@ -1,4 +1,4 @@
-package com.zdjizhi.etl;
+package com.zdjizhi.etl.connection;
import cn.hutool.core.convert.Convert;
import cn.hutool.core.date.DateUtil;
diff --git a/src/main/java/com/zdjizhi/common/IpKeysSelector.java b/src/main/java/com/zdjizhi/etl/connection/IpKeysSelector.java
index 2dc7e79..470648e 100644
--- a/src/main/java/com/zdjizhi/common/IpKeysSelector.java
+++ b/src/main/java/com/zdjizhi/etl/connection/IpKeysSelector.java
@@ -1,4 +1,4 @@
-package com.zdjizhi.common;
+package com.zdjizhi.etl.connection;
import org.apache.flink.api.java.functions.KeySelector;
diff --git a/src/main/java/com/zdjizhi/etl/SketchProcessFunction.java b/src/main/java/com/zdjizhi/etl/connection/SketchProcessFunction.java
index 54d53b6..698ed55 100644
--- a/src/main/java/com/zdjizhi/etl/SketchProcessFunction.java
+++ b/src/main/java/com/zdjizhi/etl/connection/SketchProcessFunction.java
@@ -1,4 +1,4 @@
-package com.zdjizhi.etl;
+package com.zdjizhi.etl.connection;
import cn.hutool.core.convert.Convert;
import cn.hutool.log.Log;
diff --git a/src/main/java/com/zdjizhi/common/ArangodbDnsWindow.java b/src/main/java/com/zdjizhi/etl/dns/ArangodbBatchDnsWindow.java
index 373eef5..ada4b83 100644
--- a/src/main/java/com/zdjizhi/common/ArangodbDnsWindow.java
+++ b/src/main/java/com/zdjizhi/etl/dns/ArangodbBatchDnsWindow.java
@@ -1,4 +1,4 @@
-package com.zdjizhi.common;
+package com.zdjizhi.etl.dns;
import cn.hutool.core.util.StrUtil;
import com.arangodb.entity.BaseEdgeDocument;
@@ -11,7 +11,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
-public class ArangodbDnsWindow implements AllWindowFunction<Map<String, Object>, List<BaseEdgeDocument>, TimeWindow> {
+public class ArangodbBatchDnsWindow implements AllWindowFunction<Map<String, Object>, List<BaseEdgeDocument>, TimeWindow> {
@Override
public void apply(TimeWindow timeWindow, Iterable<Map<String, Object>> iterable, Collector<List<BaseEdgeDocument>> out) throws Exception {
diff --git a/src/main/java/com/zdjizhi/common/DnsGraphKeysSelector.java b/src/main/java/com/zdjizhi/etl/dns/DnsGraphKeysSelector.java
index 5aa08c5..f6e6a6f 100644
--- a/src/main/java/com/zdjizhi/common/DnsGraphKeysSelector.java
+++ b/src/main/java/com/zdjizhi/etl/dns/DnsGraphKeysSelector.java
@@ -1,4 +1,4 @@
-package com.zdjizhi.common;
+package com.zdjizhi.etl.dns;
import cn.hutool.core.util.StrUtil;
import org.apache.flink.api.java.functions.KeySelector;
diff --git a/src/main/java/com/zdjizhi/etl/DnsGraphProcessFunction.java b/src/main/java/com/zdjizhi/etl/dns/DnsGraphProcessFunction.java
index 18d7a71..d8b3a36 100644
--- a/src/main/java/com/zdjizhi/etl/DnsGraphProcessFunction.java
+++ b/src/main/java/com/zdjizhi/etl/dns/DnsGraphProcessFunction.java
@@ -1,4 +1,4 @@
-package com.zdjizhi.etl;
+package com.zdjizhi.etl.dns;
import cn.hutool.core.convert.Convert;
import cn.hutool.log.Log;
diff --git a/src/main/java/com/zdjizhi/etl/DnsMapFunction.java b/src/main/java/com/zdjizhi/etl/dns/DnsMapFunction.java
index 86c4616..b536152 100644
--- a/src/main/java/com/zdjizhi/etl/DnsMapFunction.java
+++ b/src/main/java/com/zdjizhi/etl/dns/DnsMapFunction.java
@@ -1,4 +1,4 @@
-package com.zdjizhi.etl;
+package com.zdjizhi.etl.dns;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONArray;
@@ -47,7 +47,7 @@ public class DnsMapFunction implements MapFunction<Map<String, Object>, Map<Stri
} else if (DnsType.CNAME.getCode().equals(type)) {
dnsCNAME = Joiner.on(",").skipNulls().join(dnsCNAME, body);
dnsCNAMENum++;
- } else if (DnsType.CNAME.getCode().equals(type)) {
+ } else if (DnsType.NS.getCode().equals(type)) {
dnsNs = Joiner.on(",").skipNulls().join(dnsNs, body);
dnsNsNum++;
} else if (DnsType.MX.getCode().equals(type)) {
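This hunk fixes a copy-paste bug: the second branch previously repeated the DnsType.CNAME check, so the NS branch was unreachable and dnsNs/dnsNsNum were never populated for NS answers.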
diff --git a/src/main/java/com/zdjizhi/etl/DnsRelationProcessFunction.java b/src/main/java/com/zdjizhi/etl/dns/DnsRelationProcessFunction.java
index 04e45f8..03a0c41 100644
--- a/src/main/java/com/zdjizhi/etl/DnsRelationProcessFunction.java
+++ b/src/main/java/com/zdjizhi/etl/dns/DnsRelationProcessFunction.java
@@ -1,4 +1,4 @@
-package com.zdjizhi.etl;
+package com.zdjizhi.etl.dns;
import cn.hutool.core.convert.Convert;
import cn.hutool.core.date.DateUtil;
diff --git a/src/main/java/com/zdjizhi/etl/DnsSplitFlatMapFunction.java b/src/main/java/com/zdjizhi/etl/dns/DnsSplitFlatMapFunction.java
index 6dbe35e..96ce7b9 100644
--- a/src/main/java/com/zdjizhi/etl/DnsSplitFlatMapFunction.java
+++ b/src/main/java/com/zdjizhi/etl/dns/DnsSplitFlatMapFunction.java
@@ -1,4 +1,4 @@
-package com.zdjizhi.etl;
+package com.zdjizhi.etl.dns;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
diff --git a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
index 0ed5052..7ca7acb 100644
--- a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
+++ b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
@@ -4,9 +4,11 @@ import cn.hutool.core.convert.Convert;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
-import com.zdjizhi.common.*;
+import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.enums.DnsType;
-import com.zdjizhi.etl.*;
+import com.zdjizhi.etl.CKBatchWindow;
+import com.zdjizhi.etl.connection.*;
+import com.zdjizhi.etl.dns.*;
import com.zdjizhi.utils.arangodb.ArangoDBSink;
import com.zdjizhi.utils.ck.ClickhouseSink;
import com.zdjizhi.utils.kafka.KafkaConsumer;
@@ -49,7 +51,7 @@ public class LogFlowWriteTopology {
//transform
DataStream<Map<String, Object>> connTransformStream = connSource
.assignTimestampsAndWatermarks(WatermarkStrategy
- .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FlowWriteConfig.FLINK_WATERMARK_MAX_ORDERNESS))
+ .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FlowWriteConfig.FLINK_WATERMARK_MAX_DELAY_TIME))
.withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("conn_start_time")) * 1000))
.keyBy(new IpKeysSelector())
.window(TumblingProcessingTimeWindows.of(Time.seconds(FlowWriteConfig.LOG_AGGREGATE_DURATION)))
@@ -58,7 +60,7 @@ public class LogFlowWriteTopology {
.setParallelism(FlowWriteConfig.TRANSFORM_PARALLELISM);
DataStream<Map<String, Object>> sketchTransformStream = sketchSource.assignTimestampsAndWatermarks(WatermarkStrategy
- .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FlowWriteConfig.FLINK_WATERMARK_MAX_ORDERNESS))
+ .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FlowWriteConfig.FLINK_WATERMARK_MAX_DELAY_TIME))
.withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("sketch_start_time")) * 1000))
.keyBy(new IpKeysSelector())
.window(TumblingProcessingTimeWindows.of(Time.seconds(FlowWriteConfig.LOG_AGGREGATE_DURATION)))
@@ -72,16 +74,16 @@ public class LogFlowWriteTopology {
.keyBy(new IpKeysSelector())
.window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH)))
.process(new Ip2IpGraphProcessFunction())
-// .filter(Objects::nonNull)
+ .filter(Objects::nonNull)
.setParallelism(TRANSFORM_PARALLELISM);
//write to CK sink in batches
- connSource.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_TIME_OUT))).apply(new CKWindow()).addSink(new ClickhouseSink(SINK_CK_TABLE_CONNECTION)).name("CKSink");
- sketchSource.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_TIME_OUT))).apply(new CKWindow()).addSink(new ClickhouseSink(SINK_CK_TABLE_SKETCH)).name("CKSink");
- sketchTransformStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_TIME_OUT))).apply(new CKWindow()).addSink(new ClickhouseSink(SINK_CK_TABLE_RELATION_CONNECTION)).name("CKSink");
+ connSource.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_DELAY_TIME))).apply(new CKBatchWindow()).addSink(new ClickhouseSink(SINK_CK_TABLE_CONNECTION)).name("CKSink");
+ sketchSource.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_DELAY_TIME))).apply(new CKBatchWindow()).addSink(new ClickhouseSink(SINK_CK_TABLE_SKETCH)).name("CKSink");
+ sketchTransformStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_DELAY_TIME))).apply(new CKBatchWindow()).addSink(new ClickhouseSink(SINK_CK_TABLE_RELATION_CONNECTION)).name("CKSink");
//write to arangodb
- ip2ipGraph.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_TIME_OUT))).apply(new ArangodbIPWindow()).addSink(new ArangoDBSink(R_VISIT_IP2IP)).name(R_VISIT_IP2IP);
+ ip2ipGraph.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_DELAY_TIME))).apply(new ArangodbBatchIPWindow()).addSink(new ArangoDBSink(R_VISIT_IP2IP)).name(R_VISIT_IP2IP);
} else if (FlowWriteConfig.LOG_NEED_COMPLETE == 2) {
@@ -92,7 +94,7 @@ public class LogFlowWriteTopology {
.name(FlowWriteConfig.SOURCE_KAFKA_TOPIC_DNS);
DataStream<Map<String, Object>> dnsTransform = dnsSource.assignTimestampsAndWatermarks(WatermarkStrategy
- .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_ORDERNESS))
+ .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME))
.withTimestampAssigner((event, timestamp) -> Convert.toLong(event.get("capture_time")) * 1000))
.flatMap(new DnsSplitFlatMapFunction())
.keyBy(new DnsGraphKeysSelector())
@@ -104,13 +106,13 @@ public class LogFlowWriteTopology {
//dns raw logs -> ClickHouse
dnsSource.filter(Objects::nonNull)
.setParallelism(FlowWriteConfig.SINK_PARALLELISM)
- .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_TIME_OUT))).apply(new CKWindow())
+ .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_DELAY_TIME))).apply(new CKBatchWindow())
.addSink(new ClickhouseSink(SINK_CK_TABLE_DNS))
.setParallelism(FlowWriteConfig.SINK_PARALLELISM)
.name("CKSink");
//dns relation logs (after split) -> ClickHouse
- dnsTransform.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_TIME_OUT))).apply(new CKWindow())
+ dnsTransform.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_DELAY_TIME))).apply(new CKBatchWindow())
.addSink(new ClickhouseSink(SINK_CK_TABLE_RELATION_DNS))
.setParallelism(SINK_PARALLELISM)
.name("CKSink");
@@ -119,12 +121,11 @@ public class LogFlowWriteTopology {
DataStream<Map<String, Object>> dnsGraph = dnsTransform.keyBy(new DnsGraphKeysSelector())
.window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION_GRAPH)))
.process(new DnsGraphProcessFunction())
- .setParallelism(SINK_PARALLELISM)
- .filter(Objects::nonNull);
+ .setParallelism(SINK_PARALLELISM);
for (DnsType dnsEnum : DnsType.values()) {
dnsGraph.filter(x -> ObjectUtil.equal(dnsEnum.getType(), x.get("record_type")))
- .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_TIME_OUT))).apply(new ArangodbDnsWindow())
+ .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(SINK_BATCH_DELAY_TIME))).apply(new ArangodbBatchDnsWindow())
.addSink(new ArangoDBSink(dnsEnum.getSink()))
.setParallelism(SINK_PARALLELISM)
.name("ArangodbSink");
diff --git a/src/main/java/com/zdjizhi/utils/app/AppUtils.java b/src/main/java/com/zdjizhi/utils/app/AppUtils.java
deleted file mode 100644
index 1425ce9..0000000
--- a/src/main/java/com/zdjizhi/utils/app/AppUtils.java
+++ /dev/null
@@ -1,124 +0,0 @@
-package com.zdjizhi.utils.app;
-
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-import com.alibaba.fastjson.JSONArray;
-import com.alibaba.fastjson.JSONObject;
-import com.zdjizhi.common.FlowWriteConfig;
-import com.zdjizhi.utils.StringUtil;
-import com.zdjizhi.utils.http.HttpClientUtil;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-/**
- * AppId utility class
- *
- * @author qidaijie
- */
-
-@Deprecated
-public class AppUtils {
- private static final Log logger = LogFactory.get();
- private static Map<Integer, String> appIdMap = new ConcurrentHashMap<>(128);
- private static AppUtils appUtils;
-
- private static void getAppInstance() {
- appUtils = new AppUtils();
- }
-
-
- /**
- * Constructor (new version)
- */
- private AppUtils() {
- //scheduled refresh
- updateAppIdCache();
- }
-
- /**
- * Refresh the cached variables
- */
- private static void change() {
- if (appUtils == null) {
- getAppInstance();
- }
- timestampsFilter();
- }
-
-
- /**
- * Fetch the changed content
- */
- private static void timestampsFilter() {
- try {
- Long begin = System.currentTimeMillis();
- String schema = HttpClientUtil.requestByGetMethod(FlowWriteConfig.APP_ID_HTTP);
- if (StringUtil.isNotBlank(schema)) {
- String data = JSONObject.parseObject(schema).getString("data");
- JSONArray objects = JSONArray.parseArray(data);
- for (Object object : objects) {
- JSONArray jsonArray = JSONArray.parseArray(object.toString());
- int key = jsonArray.getInteger(0);
- String value = jsonArray.getString(1);
- if (appIdMap.containsKey(key)) {
- if (!value.equals(appIdMap.get(key))) {
- appIdMap.put(key, value);
- }
- } else {
- appIdMap.put(key, value);
- }
- }
- logger.warn("Updating the correspondence takes time:" + (begin - System.currentTimeMillis()));
- logger.warn("Pull the length of the interface data:[" + objects.size() + "]");
- }
- } catch (RuntimeException e) {
- logger.error("Update cache app-id failed, exception:" + e);
- }
- }
-
-
- /**
- * Validation timer: runs at a fixed interval to validate and fetch a fresh Cookie
- */
- private void updateAppIdCache() {
- ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1);
- executorService.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- try {
- if (FlowWriteConfig.APP_TICK_TUPLE_FREQ_SECS != 0) {
- change();
- }
- } catch (RuntimeException e) {
- logger.error("AppUtils update AppCache is error===>{" + e + "}<===");
- }
- }
- }, 1, FlowWriteConfig.APP_TICK_TUPLE_FREQ_SECS, TimeUnit.SECONDS);
- }
-
-
- /**
- * Get the appName
- *
- * @param appId app_id
- * @return account
- */
- public static String getAppName(int appId) {
-
- if (appUtils == null) {
- getAppInstance();
- }
-
- if (appIdMap.containsKey(appId)) {
- return appIdMap.get(appId);
- } else {
- logger.warn("AppMap get appName is null, ID is :" + appId);
- return "";
- }
- }
-
-}
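AppUtils was the only consumer of the app.id.http and app.tick.tuple.freq.secs settings removed from FlowWriteConfig above, and deleting it is what drives the app_match/appMatch removals in TransFormMap, TransFormTypeMap, and TransFunction below.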
diff --git a/src/main/java/com/zdjizhi/utils/arangodb/ArangoDBSink.java b/src/main/java/com/zdjizhi/utils/arangodb/ArangoDBSink.java
index d2306b7..d94dad7 100644
--- a/src/main/java/com/zdjizhi/utils/arangodb/ArangoDBSink.java
+++ b/src/main/java/com/zdjizhi/utils/arangodb/ArangoDBSink.java
@@ -17,9 +17,8 @@ public class ArangoDBSink extends RichSinkFunction<List<BaseEdgeDocument>> {
private String collection;
@Override
- public void invoke(List<BaseEdgeDocument> BaseEdgeDocuments, Context context) throws Exception {
-
- arangoDBConnect.overwrite(BaseEdgeDocuments, getCollection());
+ public void invoke(List<BaseEdgeDocument> baseEdgeDocuments, Context context) throws Exception {
+ arangoDBConnect.overwrite(baseEdgeDocuments, getCollection());
}
@Override
@@ -47,3 +46,4 @@ public class ArangoDBSink extends RichSinkFunction<List<BaseEdgeDocument>> {
}
}
+
diff --git a/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java b/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java
index cb37500..d3e14cc 100644
--- a/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java
+++ b/src/main/java/com/zdjizhi/utils/ck/ClickhouseSink.java
@@ -29,7 +29,6 @@ public class ClickhouseSink extends RichSinkFunction<List<Map<String, Object>>>
private PreparedStatement preparedStatement;
public String sink;
-
public ClickhouseSink(String sink) {
this.sink = sink;
}
@@ -49,7 +48,7 @@ public class ClickhouseSink extends RichSinkFunction<List<Map<String, Object>>>
@Override
public void open(Configuration parameters) throws Exception {
-
+ super.open(parameters);
try {
// Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
// connection = DriverManager.getConnection("jdbc:clickhouse://" + CK_HOSTS + "/" + CK_DATABASE, CK_USERNAME, CK_PIN);
@@ -64,8 +63,7 @@ public class ClickhouseSink extends RichSinkFunction<List<Map<String, Object>>>
BalancedClickhouseDataSource dataSource = new BalancedClickhouseDataSource("jdbc:clickhouse://" + CK_HOSTS, properties);
dataSource.scheduleActualization(10, TimeUnit.SECONDS);//enable connection health checks
connection = dataSource.getConnection();
-
- log.info("get clickhouse connection success");
+ log.debug("get clickhouse connection success");
} catch (SQLException e) {
log.error("clickhouse connection error ,{}", e);
}
@@ -102,7 +100,7 @@ public class ClickhouseSink extends RichSinkFunction<List<Map<String, Object>>>
preparedStatement.addBatch();
count++;
//commit once every CK_BATCH rows
- if (count % SINK_BATCH == 0) {
+ if (count % CK_BATCH == 0) {
preparedStatement.executeBatch();
connection.commit();
preparedStatement.clearBatch();
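The modulo-based commit leaves a final partial batch behind; presumably the method flushes the remainder after the loop (not visible in this hunk), along these lines:

    // hypothetical tail flush for the last partial batch
    if (count % CK_BATCH != 0) {
        preparedStatement.executeBatch();
        connection.commit();
    }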
diff --git a/src/main/java/com/zdjizhi/utils/general/TransFormMap.java b/src/main/java/com/zdjizhi/utils/general/TransFormMap.java
index 27daa71..42b7639 100644
--- a/src/main/java/com/zdjizhi/utils/general/TransFormMap.java
+++ b/src/main/java/com/zdjizhi/utils/general/TransFormMap.java
@@ -113,11 +113,6 @@ public class TransFormMap {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.flattenSpec(logValue.toString(), param));
}
break;
- case "app_match":
- if (logValue != null && appendTo == null) {
- JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.appMatch(logValue.toString()));
- }
- break;
default:
}
}
diff --git a/src/main/java/com/zdjizhi/utils/general/TransFormTypeMap.java b/src/main/java/com/zdjizhi/utils/general/TransFormTypeMap.java
index 34cabfa..9e92576 100644
--- a/src/main/java/com/zdjizhi/utils/general/TransFormTypeMap.java
+++ b/src/main/java/com/zdjizhi/utils/general/TransFormTypeMap.java
@@ -115,11 +115,6 @@ public class TransFormTypeMap {
JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.flattenSpec(logValue.toString(), param));
}
break;
- case "app_match":
- if (logValue != null && appendToKeyValue == null) {
- JsonParseUtil.setValue(jsonMap, appendToKeyName, TransFunction.appMatch(logValue.toString()));
- }
- break;
default:
}
}
diff --git a/src/main/java/com/zdjizhi/utils/general/TransFunction.java b/src/main/java/com/zdjizhi/utils/general/TransFunction.java
index e3363f9..84fe5cc 100644
--- a/src/main/java/com/zdjizhi/utils/general/TransFunction.java
+++ b/src/main/java/com/zdjizhi/utils/general/TransFunction.java
@@ -1,7 +1,6 @@
package com.zdjizhi.utils.general;
import cn.hutool.core.codec.Base64;
-import cn.hutool.core.util.StrUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.jayway.jsonpath.InvalidPathException;
@@ -10,7 +9,6 @@ import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.utils.FormatUtils;
import com.zdjizhi.utils.IpLookupV2;
import com.zdjizhi.utils.StringUtil;
-import com.zdjizhi.utils.app.AppUtils;
import com.zdjizhi.utils.json.JsonParseUtil;
import java.math.BigInteger;
@@ -126,22 +124,6 @@ class TransFunction {
* @return account
*/
- /**
- * Fill in appName from the cached appId-to-name mapping
- *
- * @param appIds list of app ids
- * @return appName
- */
- @Deprecated
- static String appMatch(String appIds) {
- try {
- String appId = StrUtil.split(appIds, FlowWriteConfig.FORMAT_SPLITTER, true, true).get(0);
- return AppUtils.getAppName(Integer.parseInt(appId));
- } catch (NumberFormatException | ClassCastException exception) {
- logger.error("APP ID列表分割转换异常,异常APP ID列表:" + appIds);
- return "";
- }
- }
/**
* Parse the top-level domain
diff --git a/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java b/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java
index 97a53da..e15d535 100644
--- a/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java
+++ b/src/main/java/com/zdjizhi/utils/kafka/KafkaConsumer.java
@@ -14,10 +14,10 @@ import java.util.Properties;
* @date 2021/6/8 13:54
*/
public class KafkaConsumer {
- private static Properties createConsumerConfig() {
+ private static Properties createConsumerConfig(String topic) {
Properties properties = new Properties();
properties.put("bootstrap.servers", FlowWriteConfig.SOURCE_KAFKA_SERVERS);
- properties.put("group.id", FlowWriteConfig.GROUP_ID);
+ properties.put("group.id", FlowWriteConfig.GROUP_ID + "-" + topic);
properties.put("session.timeout.ms", FlowWriteConfig.SESSION_TIMEOUT_MS);
properties.put("max.poll.records", FlowWriteConfig.MAX_POLL_RECORDS);
properties.put("max.partition.fetch.bytes", FlowWriteConfig.MAX_PARTITION_FETCH_BYTES);
@@ -33,9 +33,9 @@ public class KafkaConsumer {
* @return kafka logs -> map
*/
@SuppressWarnings("unchecked")
- public static FlinkKafkaConsumer<Map<String,Object>> myDeserializationConsumer(String topic) {
- FlinkKafkaConsumer<Map<String,Object>> kafkaConsumer = new FlinkKafkaConsumer<>(topic,
- new TimestampDeserializationSchema(), createConsumerConfig());
+ public static FlinkKafkaConsumer<Map<String, Object>> myDeserializationConsumer(String topic) {
+ FlinkKafkaConsumer<Map<String, Object>> kafkaConsumer = new FlinkKafkaConsumer<>(topic,
+ new TimestampDeserializationSchema(), createConsumerConfig(topic));
//commit offsets to kafka along with checkpoints
kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
@@ -51,9 +51,9 @@ public class KafkaConsumer {
*
* @return kafka logs
*/
- public static FlinkKafkaConsumer<String> flinkConsumer() {
- FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(FlowWriteConfig.SOURCE_KAFKA_TOPIC,
- new SimpleStringSchema(), createConsumerConfig());
+ public static FlinkKafkaConsumer<String> flinkConsumer(String topic) {
+ FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(topic,
+ new SimpleStringSchema(), createConsumerConfig(topic));
//commit offsets to kafka along with checkpoints
kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
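With the topic appended, each topology instance gets its own consumer group. For example, assuming group.id=flow-write (an illustrative value) and topic dns-log, the effective group becomes flow-write-dns-log, so jobs consuming different topics no longer share committed offsets.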
diff --git a/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java b/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java
index 28ecff9..cf27cc3 100644
--- a/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java
+++ b/src/main/java/com/zdjizhi/utils/kafka/KafkaProducer.java
@@ -33,9 +33,9 @@ public class KafkaProducer {
}
- public static FlinkKafkaProducer<String> getKafkaProducer() {
+ public static FlinkKafkaProducer<String> getKafkaProducer(String topic) {
FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<String>(
- FlowWriteConfig.SINK_KAFKA_TOPIC,
+ topic,
new SimpleStringSchema(),
createProducerConfig(),
//sink connects to all partitions and writes round-robin;