author     zhanghongqing <[email protected]>  2022-11-09 10:10:52 +0800
committer  zhanghongqing <[email protected]>  2022-11-09 10:10:52 +0800
commit     3d418a58de359dff5808cc5b10c9b01dbf76ed07 (patch)
tree       75373f1bed6014f51562c79c8b1fd060cb79288e /src
parent     9cdfe060cfeda37e04fa43563569efae53641eb4 (diff)

1. Optimize the code and add selection of the log ingestion method (HEAD, master)
Diffstat (limited to 'src')
-rw-r--r--  src/main/java/com/zdjizhi/etl/connection/ConnLogService.java   8
-rw-r--r--  src/main/java/com/zdjizhi/etl/dns/DnsLogService.java           10
-rw-r--r--  src/main/java/com/zdjizhi/utils/arangodb/AGSink.java            4
3 files changed, 11 insertions, 11 deletions
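
The heart of the commit is the new three-way dispatch on SINK_CK_RAW_LOG_INSERT_OPEN, applied in both ConnLogService and DnsLogService below. A minimal sketch of that selection logic, assuming the flag is an int loaded from configuration; the stream and table/topic names here are placeholders, not the project's real constants:

    switch (SINK_CK_RAW_LOG_INSERT_OPEN) {
        case 1:  // write raw logs and relation logs directly to ClickHouse
            LogService.getLogCKSink(rawStream, rawTable);
            LogService.getLogCKSink(relationStream, relationTable);
            break;
        case 2:  // route both raw and relation logs through Kafka instead
            LogService.getLogKafkaSink(rawStream, rawTopic);
            LogService.getLogKafkaSink(relationStream, relationTopic);
            break;
        case 3:  // new in this commit: ship only the relation logs, via Kafka
            LogService.getLogKafkaSink(relationStream, relationTopic);
            break;
        default: // any other value attaches no sink
            break;
    }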
diff --git a/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java b/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java
index 9c5f8e0..ab07376 100644
--- a/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java
+++ b/src/main/java/com/zdjizhi/etl/connection/ConnLogService.java
@@ -35,14 +35,15 @@ public class ConnLogService {
LogService.getLogCKSink(sketchSource, SINK_CK_TABLE_SKETCH);
//write to the CK connection relation table
LogService.getLogCKSink(connTransformStream, SINK_CK_TABLE_RELATION_CONNECTION);
- } else {
+ } else if (SINK_CK_RAW_LOG_INSERT_OPEN == 2){
LogService.getLogKafkaSink(connSource, SINK_CK_TABLE_CONNECTION);
LogService.getLogKafkaSink(sketchSource, SINK_CK_TABLE_SKETCH);
LogService.getLogKafkaSink(connTransformStream, SINK_KAFKA_TOPIC_RELATION_CONNECTION);
+ } else if (SINK_CK_RAW_LOG_INSERT_OPEN == 3){
+ LogService.getLogKafkaSink(connTransformStream, SINK_KAFKA_TOPIC_RELATION_CONNECTION);
}
if (SINK_ARANGODB_RAW_LOG_INSERT_OPEN == 1) {
-
DataStream<Map<String, Object>> sketchTransformStream = getSketchTransformStream(sketchSource);
//merge connection logs and connection sketch logs
@@ -96,7 +97,7 @@ public class ConnLogService {
}
private static DataStream<Map<String, Object>> getConnTransformStream(DataStream<Map<String, Object>> connSource) throws Exception {
- DataStream<Map<String, Object>> connTransformStream = connSource
+ return connSource
.assignTimestampsAndWatermarks(WatermarkStrategy
.<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME))
.withTimestampAssigner((event, timestamp) -> {
@@ -109,7 +110,6 @@ public class ConnLogService {
.setParallelism(TRANSFORM_PARALLELISM)
.filter(x -> Objects.nonNull(x) && TypeUtils.castToLong(x.get("sessions")) >= 0 && TypeUtils.castToLong(x.get("packets")) >= 0 && TypeUtils.castToLong(x.get("bytes")) >= 0)
.setParallelism(TRANSFORM_PARALLELISM);
- return connTransformStream;
}
private static DataStream<Map<String, Object>> getSketchTransformStream(DataStream<Map<String, Object>> sketchSource) throws Exception {
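
The getConnTransformStream refactor above only inlines the return value; the watermarking it wraps is the standard Flink bounded-out-of-orderness pattern. A self-contained sketch of that pattern, assuming Flink 1.12+ and events carrying an epoch-seconds capture_time; the helper name withWatermarks is mine, not the project's:

    import java.time.Duration;
    import java.util.Map;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.streaming.api.datastream.DataStream;

    // Assign event-time timestamps from the record itself and tolerate
    // up to maxDelaySeconds of out-of-order arrival before the watermark advances.
    static DataStream<Map<String, Object>> withWatermarks(
            DataStream<Map<String, Object>> source, long maxDelaySeconds) {
        return source.assignTimestampsAndWatermarks(
                WatermarkStrategy
                        .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(maxDelaySeconds))
                        .withTimestampAssigner((event, previousTimestamp) ->
                                // capture_time is assumed to be epoch seconds; Flink expects millis
                                ((Number) event.get("capture_time")).longValue() * 1000L));
    }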
diff --git a/src/main/java/com/zdjizhi/etl/dns/DnsLogService.java b/src/main/java/com/zdjizhi/etl/dns/DnsLogService.java
index 7c67d6e..dd5aff3 100644
--- a/src/main/java/com/zdjizhi/etl/dns/DnsLogService.java
+++ b/src/main/java/com/zdjizhi/etl/dns/DnsLogService.java
@@ -32,9 +32,11 @@ public class DnsLogService {
LogService.getLogCKSink(dnsSource, SINK_CK_TABLE_DNS);
//write the relation logs produced by DNS splitting into CK
LogService.getLogCKSink(dnsTransform, SINK_CK_TABLE_RELATION_DNS);
- } else {
+ } else if (SINK_CK_RAW_LOG_INSERT_OPEN == 2){
LogService.getLogKafkaSink(dnsSource, SINK_CK_TABLE_DNS);
LogService.getLogKafkaSink(dnsTransform, SINK_KAFKA_TOPIC_RELATION_DNS);
+ } else if (SINK_CK_RAW_LOG_INSERT_OPEN == 3){
+ LogService.getLogKafkaSink(dnsTransform, SINK_KAFKA_TOPIC_RELATION_DNS);
}
//insert into ArangoDB, grouped by record_type into different tables
@@ -57,18 +59,17 @@ public class DnsLogService {
private static DataStream<Map<String, Object>> getLogSource(StreamExecutionEnvironment env, String source) throws Exception {
- DataStream<Map<String, Object>> dnsSource = env.addSource(KafkaConsumer.myDeserializationConsumer(source))
+ return env.addSource(KafkaConsumer.myDeserializationConsumer(source))
.setParallelism(SOURCE_PARALLELISM)
.filter(x -> Objects.nonNull(x) && Convert.toLong(x.get("capture_time")) > 0)
.setParallelism(SOURCE_PARALLELISM)
.map(new DnsMapFunction())
.setParallelism(SOURCE_PARALLELISM)
.name(source);
- return dnsSource;
}
private static DataStream<Map<String, Object>> getDnsTransformStream(DataStream<Map<String, Object>> dnsSource) throws Exception {
- DataStream<Map<String, Object>> dnsTransform = dnsSource.filter(x -> Objects.nonNull(x.get("response")))
+ return dnsSource.filter(x -> Objects.nonNull(x.get("response")))
.setParallelism(SOURCE_PARALLELISM)
.assignTimestampsAndWatermarks(WatermarkStrategy
.<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(FLINK_WATERMARK_MAX_DELAY_TIME))
@@ -80,7 +81,6 @@ public class DnsLogService {
.window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
.process(new DnsRelationProcessFunction())
.setParallelism(TRANSFORM_PARALLELISM);
- return dnsTransform;
}
public static void getLogArangoSink(DataStream<BaseEdgeDocument> sourceStream, String sink) throws Exception {
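
The getDnsTransformStream refactor likewise just returns the chain directly; for context, that chain filters DNS responses and aggregates them in tumbling processing-time windows. A sketch of its shape, with an assumed key selector on "domain" since the diff does not show the real keyBy:

    dnsSource
            .filter(x -> Objects.nonNull(x.get("response")))
            .keyBy(x -> String.valueOf(x.get("domain")))           // assumed key; not shown in this diff
            .window(TumblingProcessingTimeWindows.of(Time.seconds(LOG_AGGREGATE_DURATION)))
            .process(new DnsRelationProcessFunction())             // the project's ProcessWindowFunction
            .setParallelism(TRANSFORM_PARALLELISM);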
diff --git a/src/main/java/com/zdjizhi/utils/arangodb/AGSink.java b/src/main/java/com/zdjizhi/utils/arangodb/AGSink.java
index db4b207..d4d05a5 100644
--- a/src/main/java/com/zdjizhi/utils/arangodb/AGSink.java
+++ b/src/main/java/com/zdjizhi/utils/arangodb/AGSink.java
@@ -24,7 +24,7 @@ public class AGSink extends RichSinkFunction<BaseEdgeDocument> {
private static final Log logger = LogFactory.get();
// binds the data to a ClickHouse cluster IP for storage; records the ClickHouse IP the data is written out to
- private final List<BaseEdgeDocument> ipWithDataList;
+ private final CopyOnWriteArrayList<BaseEdgeDocument> ipWithDataList;
// flush the buffered data once this time interval has elapsed
private final long insertArangoTimeInterval = SINK_ARANGODB_BATCH_DELAY_TIME;
// batch size per insert
@@ -82,7 +82,7 @@ public class AGSink extends RichSinkFunction<BaseEdgeDocument> {
if (ipWithDataList.size() >= this.insertArangoBatchSize) {
try {
flush(ipWithDataList);
- } catch (SQLException e) {
+ } catch (Exception e) {
logger.error("ck sink invoke flush failed.", e);
}
}
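
Two things change in AGSink: the buffer's declared type is narrowed to CopyOnWriteArrayList, which tolerates concurrent access from invoke() and a timer-driven flush, and the catch is widened from SQLException to Exception, since the ArangoDB Java driver throws unchecked ArangoDBException rather than SQLException, so the old catch could let flush failures propagate. A sketch of the resulting size-triggered flush, with flush() and the batch-size field standing in for the real members:

    private final CopyOnWriteArrayList<BaseEdgeDocument> ipWithDataList = new CopyOnWriteArrayList<>();

    @Override
    public void invoke(BaseEdgeDocument value, Context context) {
        ipWithDataList.add(value);
        if (ipWithDataList.size() >= insertArangoBatchSize) {
            try {
                flush(ipWithDataList);        // bulk-insert the batch into ArangoDB, then clear it
            } catch (Exception e) {           // driver exceptions are unchecked; catch broadly
                logger.error("arango sink invoke flush failed.", e);
            }
        }
    }

Note that the existing log message still says "ck sink", apparently carried over from the ClickHouse sink along with the cluster-IP comment above.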