summaryrefslogtreecommitdiff
path: root/src/main/java/com/zdjizhi/common
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/zdjizhi/common')
-rw-r--r--src/main/java/com/zdjizhi/common/ArangodbDnsWindow.java36
-rw-r--r--src/main/java/com/zdjizhi/common/ArangodbIPWindow.java36
-rw-r--r--src/main/java/com/zdjizhi/common/CKWindow.java24
-rw-r--r--src/main/java/com/zdjizhi/common/DnsGraphKeysSelector.java23
-rw-r--r--src/main/java/com/zdjizhi/common/FlowWriteConfig.java39
-rw-r--r--src/main/java/com/zdjizhi/common/IpKeysSelector.java22
6 files changed, 4 insertions, 176 deletions
diff --git a/src/main/java/com/zdjizhi/common/ArangodbDnsWindow.java b/src/main/java/com/zdjizhi/common/ArangodbDnsWindow.java
deleted file mode 100644
index 373eef5..0000000
--- a/src/main/java/com/zdjizhi/common/ArangodbDnsWindow.java
+++ /dev/null
@@ -1,36 +0,0 @@
-package com.zdjizhi.common;
-
-import cn.hutool.core.util.StrUtil;
-import com.arangodb.entity.BaseEdgeDocument;
-import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.util.Collector;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-public class ArangodbDnsWindow implements AllWindowFunction<Map<String, Object>, List<BaseEdgeDocument>, TimeWindow> {
-
- @Override
- public void apply(TimeWindow timeWindow, Iterable<Map<String, Object>> iterable, Collector<List<BaseEdgeDocument>> out) throws Exception {
- Iterator<Map<String, Object>> iterator = iterable.iterator();
- List<BaseEdgeDocument> batchLog = new ArrayList<>();
- while (iterator.hasNext()) {
- Map<String, Object> next = iterator.next();
- String qname = StrUtil.toString(next.get("qname"));
- String record = StrUtil.toString(next.get("record"));
- BaseEdgeDocument baseEdgeDocument = new BaseEdgeDocument();
- baseEdgeDocument.setKey(String.join("-", qname, record));
- baseEdgeDocument.setFrom("qname/" + qname);
- baseEdgeDocument.setTo("record/" + record);
- baseEdgeDocument.addAttribute("qname", qname);
- baseEdgeDocument.addAttribute("record", record);
- baseEdgeDocument.addAttribute("last_found_time", next.get("last_found_time"));
-
- batchLog.add(baseEdgeDocument);
- }
- out.collect(batchLog);
- }
-}
diff --git a/src/main/java/com/zdjizhi/common/ArangodbIPWindow.java b/src/main/java/com/zdjizhi/common/ArangodbIPWindow.java
deleted file mode 100644
index f91fb08..0000000
--- a/src/main/java/com/zdjizhi/common/ArangodbIPWindow.java
+++ /dev/null
@@ -1,36 +0,0 @@
-package com.zdjizhi.common;
-
-import cn.hutool.core.util.StrUtil;
-import com.arangodb.entity.BaseEdgeDocument;
-import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.util.Collector;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-public class ArangodbIPWindow implements AllWindowFunction<Map<String, Object>, List<BaseEdgeDocument>, TimeWindow> {
-
- @Override
- public void apply(TimeWindow timeWindow, Iterable<Map<String, Object>> iterable, Collector<List<BaseEdgeDocument>> collector) throws Exception {
- Iterator<Map<String, Object>> iterator = iterable.iterator();
- List<BaseEdgeDocument> batchLog = new ArrayList<>();
- while (iterator.hasNext()) {
- Map<String, Object> next = iterator.next();
- String srcIp = StrUtil.toString(next.get("src_ip"));
- String dstIp = StrUtil.toString(next.get("dst_ip"));
- BaseEdgeDocument baseEdgeDocument = new BaseEdgeDocument();
- baseEdgeDocument.setKey(String.join("-", srcIp, dstIp));
- baseEdgeDocument.setFrom("src_ip/" + srcIp);
- baseEdgeDocument.setTo("dst_ip/" + dstIp);
- baseEdgeDocument.addAttribute("src_ip", srcIp);
- baseEdgeDocument.addAttribute("dst_ip", dstIp);
- baseEdgeDocument.addAttribute("last_found_time", next.get("last_found_time"));
-
- batchLog.add(baseEdgeDocument);
- }
- collector.collect(batchLog);
- }
-}
diff --git a/src/main/java/com/zdjizhi/common/CKWindow.java b/src/main/java/com/zdjizhi/common/CKWindow.java
deleted file mode 100644
index b7c7b8c..0000000
--- a/src/main/java/com/zdjizhi/common/CKWindow.java
+++ /dev/null
@@ -1,24 +0,0 @@
-package com.zdjizhi.common;
-
-import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.util.Collector;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-public class CKWindow implements AllWindowFunction<Map<String, Object>, List<Map<String, Object>>, TimeWindow> {
-
- @Override
- public void apply(TimeWindow timeWindow, Iterable<Map<String, Object>> iterable, Collector<List<Map<String, Object>>> out) throws Exception {
- Iterator<Map<String, Object>> iterator = iterable.iterator();
- List<Map<String, Object>> batchLog = new ArrayList<>();
- while (iterator.hasNext()) {
- Map<String, Object> next = iterator.next();
- batchLog.add(next);
- }
- out.collect(batchLog);
- }
-}
diff --git a/src/main/java/com/zdjizhi/common/DnsGraphKeysSelector.java b/src/main/java/com/zdjizhi/common/DnsGraphKeysSelector.java
deleted file mode 100644
index 5aa08c5..0000000
--- a/src/main/java/com/zdjizhi/common/DnsGraphKeysSelector.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package com.zdjizhi.common;
-
-import cn.hutool.core.util.StrUtil;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple3;
-
-import java.util.Map;
-
-/**
- * @description:
- * @author: zhq
- * @create: 2022-07-05
- **/
-public class DnsGraphKeysSelector implements KeySelector<Map<String, Object>, Tuple3<String, String, String>> {
-
- @Override
- public Tuple3<String, String, String> getKey(Map<String, Object> log) throws Exception {
-
- return Tuple3.of(StrUtil.toString(log.get("record_type")),
- StrUtil.toString(log.get("qname")),
- StrUtil.toString(log.get("record")));
- }
-}
diff --git a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
index 57f09a0..3b68285 100644
--- a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
+++ b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
@@ -39,17 +39,6 @@ public class FlowWriteConfig {
public static final String ENCODING = "UTF8";
/**
- * Nacos
- */
- public static final String NACOS_SERVER = FlowWriteConfigurations.getStringProperty(0, "nacos.server");
- public static final String NACOS_SCHEMA_NAMESPACE = FlowWriteConfigurations.getStringProperty(0, "nacos.schema.namespace");
- public static final String NACOS_COMMON_NAMESPACE = FlowWriteConfigurations.getStringProperty(0, "nacos.common.namespace");
- public static final String NACOS_DATA_ID = FlowWriteConfigurations.getStringProperty(0, "nacos.data.id");
- public static final String NACOS_PIN = FlowWriteConfigurations.getStringProperty(1, "nacos.pin");
- public static final String NACOS_GROUP = FlowWriteConfigurations.getStringProperty(1, "nacos.group");
- public static final String NACOS_USERNAME = FlowWriteConfigurations.getStringProperty(1, "nacos.username");
-
- /**
* System config
*/
public static final Integer SOURCE_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "source.parallelism");
@@ -62,12 +51,6 @@ public class FlowWriteConfig {
public static final Integer BUFFER_TIMEOUT = FlowWriteConfigurations.getIntProperty(1, "buffer.timeout");
/**
- * HBase
- */
- public static final Integer HBASE_TICK_TUPLE_FREQ_SECS = FlowWriteConfigurations.getIntProperty(0, "hbase.tick.tuple.freq.secs");
- public static final String HBASE_TABLE_NAME = FlowWriteConfigurations.getStringProperty(1, "hbase.table.name");
-
- /**
* kafka common
*/
public static final String KAFKA_SASL_JAAS_USER = encryptor.decrypt(FlowWriteConfigurations.getStringProperty(1, "kafka.user"));
@@ -76,7 +59,6 @@ public class FlowWriteConfig {
/**
* kafka source config
*/
- public static final String SOURCE_KAFKA_TOPIC = FlowWriteConfigurations.getStringProperty(0, "source.kafka.topic");
public static final String GROUP_ID = FlowWriteConfigurations.getStringProperty(0, "group.id");
public static final String SESSION_TIMEOUT_MS = FlowWriteConfigurations.getStringProperty(1, "session.timeout.ms");
public static final String MAX_POLL_RECORDS = FlowWriteConfigurations.getStringProperty(1, "max.poll.records");
@@ -85,7 +67,6 @@ public class FlowWriteConfig {
/**
* kafka sink config
*/
- public static final String SINK_KAFKA_TOPIC = FlowWriteConfigurations.getStringProperty(0, "sink.kafka.topic");
public static final String PRODUCER_ACK = FlowWriteConfigurations.getStringProperty(1, "producer.ack");
public static final String PRODUCER_KAFKA_COMPRESSION_TYPE = FlowWriteConfigurations.getStringProperty(0, "producer.kafka.compression.type");
@@ -100,19 +81,12 @@ public class FlowWriteConfig {
public static final Integer MAX_REQUEST_SIZE = FlowWriteConfigurations.getIntProperty(1, "max.request.size");
/**
- * http
- */
- public static final String APP_ID_HTTP = FlowWriteConfigurations.getStringProperty(1, "app.id.http");
- public static final Integer APP_TICK_TUPLE_FREQ_SECS = FlowWriteConfigurations.getIntProperty(1, "app.tick.tuple.freq.secs");
-
- /**
* common config
*/
public static final String SOURCE_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0, "source.kafka.servers");
public static final String SINK_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0, "sink.kafka.servers");
public static final String ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0, "zookeeper.servers");
public static final String TOOLS_LIBRARY = FlowWriteConfigurations.getStringProperty(0, "tools.library");
- public static final String HBASE_ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0, "hbase.zookeeper.servers");
/*
@@ -125,7 +99,7 @@ public class FlowWriteConfig {
public static final int CK_CONNECTION_TIMEOUT = FlowWriteConfigurations.getIntProperty(0, "ck.connection.timeout");
public static final int CK_SOCKET_TIMEOUT = FlowWriteConfigurations.getIntProperty(0, "ck.socket.timeout");
- public static final int FLINK_WATERMARK_MAX_ORDERNESS = FlowWriteConfigurations.getIntProperty(0, "flink.watermark.max.orderness");
+ public static final int FLINK_WATERMARK_MAX_DELAY_TIME = FlowWriteConfigurations.getIntProperty(0, "flink.watermark.max.delay.time");
public static final int LOG_AGGREGATE_DURATION = FlowWriteConfigurations.getIntProperty(0, "log.aggregate.duration");
public static final int LOG_AGGREGATE_DURATION_GRAPH = FlowWriteConfigurations.getIntProperty(0, "log.aggregate.duration.graph");
public static final String SOURCE_KAFKA_TOPIC_DNS = FlowWriteConfigurations.getStringProperty(0, "source.kafka.topic.dns");
@@ -147,8 +121,6 @@ public class FlowWriteConfig {
public static final String R_NX_DOMAIN2DOMAIN = FlowWriteConfigurations.getStringProperty(0, "sink.arango.table.r.nx.domain2domain");
-
-
public static final String ARANGODB_HOST = FlowWriteConfigurations.getStringProperty(0, "arangoDB.host");
public static final Integer ARANGODB_PORT = FlowWriteConfigurations.getIntProperty(0, "arangoDB.port");
public static final String ARANGODB_USER = FlowWriteConfigurations.getStringProperty(0, "arangoDB.user");
@@ -156,11 +128,8 @@ public class FlowWriteConfig {
public static final String ARANGODB_DB_NAME = FlowWriteConfigurations.getStringProperty(0, "arangoDB.DB.name");
public static final Integer ARANGODB_TTL = FlowWriteConfigurations.getIntProperty(0, "arangoDB.ttl");
public static final Integer ARANGODB_BATCH = FlowWriteConfigurations.getIntProperty(0, "arangoDB.batch");
-
- public static final Integer UPDATE_ARANGO_BATCH = FlowWriteConfigurations.getIntProperty(0, "update.arango.batch");
- public static final String ARANGODB_READ_LIMIT = FlowWriteConfigurations.getStringProperty(0, "arangoDB.read.limit");
public static final Integer THREAD_POOL_NUMBER = FlowWriteConfigurations.getIntProperty(0, "thread.pool.number");
- public static final Integer THREAD_AWAIT_TERMINATION_TIME = FlowWriteConfigurations.getIntProperty(0, "thread.await.termination.time");
- public static final Integer SINK_BATCH_TIME_OUT = FlowWriteConfigurations.getIntProperty(0, "sink.batch.time.out");
- public static final Integer SINK_BATCH = FlowWriteConfigurations.getIntProperty(0, "sink.batch");
+
+ public static final Integer SINK_BATCH_DELAY_TIME = FlowWriteConfigurations.getIntProperty(0, "sink.batch.delay.time");
+ public static final Integer CK_BATCH = FlowWriteConfigurations.getIntProperty(0, "ck.batch");
} \ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/common/IpKeysSelector.java b/src/main/java/com/zdjizhi/common/IpKeysSelector.java
deleted file mode 100644
index 2dc7e79..0000000
--- a/src/main/java/com/zdjizhi/common/IpKeysSelector.java
+++ /dev/null
@@ -1,22 +0,0 @@
-package com.zdjizhi.common;
-
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
-
-import java.util.Map;
-
-/**
- * @description:
- * @author: zhq
- * @create: 2022-07-05
- **/
-public class IpKeysSelector implements KeySelector<Map<String, Object>, Tuple2<String, String>> {
-
- @Override
- public Tuple2<String, String> getKey(Map<String,Object> log) throws Exception {
- return Tuple2.of(
- String.valueOf(log.get("src_ip")),
- String.valueOf(log.get("dst_ip")));
- }
-}