summaryrefslogtreecommitdiff
path: root/src/main/java/com/zdjizhi/common
diff options
context:
space:
mode:
authorzhanghongqing <[email protected]>2022-07-07 14:07:27 +0800
committerzhanghongqing <[email protected]>2022-07-07 14:07:27 +0800
commitf552793230d0428cbc63714ee296c1ce4971a31b (patch)
tree1bf3a26d957710b261f61a65559d393f55bf9382 /src/main/java/com/zdjizhi/common
Initial commit
Diffstat (limited to 'src/main/java/com/zdjizhi/common')
-rw-r--r--src/main/java/com/zdjizhi/common/DnsKeysSelector.java19
-rw-r--r--src/main/java/com/zdjizhi/common/FlowWriteConfig.java138
-rw-r--r--src/main/java/com/zdjizhi/common/KeysSelector.java23
3 files changed, 180 insertions, 0 deletions
diff --git a/src/main/java/com/zdjizhi/common/DnsKeysSelector.java b/src/main/java/com/zdjizhi/common/DnsKeysSelector.java
new file mode 100644
index 0000000..101597c
--- /dev/null
+++ b/src/main/java/com/zdjizhi/common/DnsKeysSelector.java
@@ -0,0 +1,19 @@
+package com.zdjizhi.common;
+
+import org.apache.flink.api.java.functions.KeySelector;
+
+import java.util.Map;
+
+/**
+ * @description:
+ * @author: zhq
+ * @create: 2022-07-05
+ **/
+public class DnsKeysSelector implements KeySelector<Map<String, Object>, String> {
+
+ @Override
+ public String getKey(Map<String, Object> log) throws Exception {
+
+ return String.valueOf(log.get("dns_qname"));
+ }
+}
diff --git a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
new file mode 100644
index 0000000..a84ebae
--- /dev/null
+++ b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
@@ -0,0 +1,138 @@
+package com.zdjizhi.common;
+
+
+import com.zdjizhi.utils.system.FlowWriteConfigurations;
+import org.jasypt.encryption.pbe.StandardPBEStringEncryptor;
+
+/**
+ * @author Administrator
+ */
+public class FlowWriteConfig {
+
+
+ private static StandardPBEStringEncryptor encryptor = new StandardPBEStringEncryptor();
+
+ static {
+ encryptor.setPassword("galaxy");
+ }
+
+ public static final int IF_PARAM_LENGTH = 3;
+ /**
+ * 有此标识的字段为失效字段,不计入最终日志字段
+ */
+ public static final String VISIBILITY = "disabled";
+ /**
+ * 默认的切分符号
+ */
+ public static final String FORMAT_SPLITTER = ",";
+ /**
+ * 标识字段为日志字段还是schema指定字段
+ */
+ public static final String IS_JSON_KEY_TAG = "$.";
+ /**
+ * if函数连接分隔符
+ */
+ public static final String IF_CONDITION_SPLITTER = "=";
+ /**
+ * 默认的字符串解析编码
+ */
+ public static final String ENCODING = "UTF8";
+
+ /**
+ * Nacos
+ */
+ public static final String NACOS_SERVER = FlowWriteConfigurations.getStringProperty(0, "nacos.server");
+ public static final String NACOS_SCHEMA_NAMESPACE = FlowWriteConfigurations.getStringProperty(0, "nacos.schema.namespace");
+ public static final String NACOS_COMMON_NAMESPACE = FlowWriteConfigurations.getStringProperty(0, "nacos.common.namespace");
+ public static final String NACOS_DATA_ID = FlowWriteConfigurations.getStringProperty(0, "nacos.data.id");
+ public static final String NACOS_PIN = FlowWriteConfigurations.getStringProperty(1, "nacos.pin");
+ public static final String NACOS_GROUP = FlowWriteConfigurations.getStringProperty(1, "nacos.group");
+ public static final String NACOS_USERNAME = FlowWriteConfigurations.getStringProperty(1, "nacos.username");
+
+ /**
+ * System config
+ */
+ public static final Integer SOURCE_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "source.parallelism");
+ public static final Integer SINK_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "sink.parallelism");
+ public static final Integer TRANSFORM_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "transform.parallelism");
+ public static final Integer DATA_CENTER_ID_NUM = FlowWriteConfigurations.getIntProperty(0, "data.center.id.num");
+ public static final Integer LOG_NEED_COMPLETE = FlowWriteConfigurations.getIntProperty(0, "log.need.complete");
+ public static final String MAIL_DEFAULT_CHARSET = FlowWriteConfigurations.getStringProperty(1, "mail.default.charset");
+ public static final Integer LOG_TRANSFORM_TYPE = FlowWriteConfigurations.getIntProperty(1, "log.transform.type");
+ public static final Integer BUFFER_TIMEOUT = FlowWriteConfigurations.getIntProperty(1, "buffer.timeout");
+
+ /**
+ * HBase
+ */
+ public static final Integer HBASE_TICK_TUPLE_FREQ_SECS = FlowWriteConfigurations.getIntProperty(0, "hbase.tick.tuple.freq.secs");
+ public static final String HBASE_TABLE_NAME = FlowWriteConfigurations.getStringProperty(1, "hbase.table.name");
+
+ /**
+ * kafka common
+ */
+ public static final String KAFKA_SASL_JAAS_USER = encryptor.decrypt(FlowWriteConfigurations.getStringProperty(1, "kafka.user"));
+ public static final String KAFKA_SASL_JAAS_PIN = encryptor.decrypt(FlowWriteConfigurations.getStringProperty(1, "kafka.pin"));
+
+ /**
+ * kafka source config
+ */
+ public static final String SOURCE_KAFKA_TOPIC = FlowWriteConfigurations.getStringProperty(0, "source.kafka.topic");
+ public static final String GROUP_ID = FlowWriteConfigurations.getStringProperty(0, "group.id");
+ public static final String SESSION_TIMEOUT_MS = FlowWriteConfigurations.getStringProperty(1, "session.timeout.ms");
+ public static final String MAX_POLL_RECORDS = FlowWriteConfigurations.getStringProperty(1, "max.poll.records");
+ public static final String MAX_PARTITION_FETCH_BYTES = FlowWriteConfigurations.getStringProperty(1, "max.partition.fetch.bytes");
+
+ /**
+ * kafka sink config
+ */
+ public static final String SINK_KAFKA_TOPIC = FlowWriteConfigurations.getStringProperty(0, "sink.kafka.topic");
+ public static final String PRODUCER_ACK = FlowWriteConfigurations.getStringProperty(1, "producer.ack");
+ public static final String PRODUCER_KAFKA_COMPRESSION_TYPE = FlowWriteConfigurations.getStringProperty(0, "producer.kafka.compression.type");
+
+ /**
+ * connection kafka
+ */
+ public static final String RETRIES = FlowWriteConfigurations.getStringProperty(1, "retries");
+ public static final String LINGER_MS = FlowWriteConfigurations.getStringProperty(1, "linger.ms");
+ public static final Integer REQUEST_TIMEOUT_MS = FlowWriteConfigurations.getIntProperty(1, "request.timeout.ms");
+ public static final Integer BATCH_SIZE = FlowWriteConfigurations.getIntProperty(1, "batch.size");
+ public static final Integer BUFFER_MEMORY = FlowWriteConfigurations.getIntProperty(1, "buffer.memory");
+ public static final Integer MAX_REQUEST_SIZE = FlowWriteConfigurations.getIntProperty(1, "max.request.size");
+
+ /**
+ * http
+ */
+ public static final String APP_ID_HTTP = FlowWriteConfigurations.getStringProperty(1, "app.id.http");
+ public static final Integer APP_TICK_TUPLE_FREQ_SECS = FlowWriteConfigurations.getIntProperty(1, "app.tick.tuple.freq.secs");
+
+ /**
+ * common config
+ */
+ public static final String SOURCE_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0,"source.kafka.servers");
+ public static final String SINK_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0,"sink.kafka.servers");
+ public static final String ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0,"zookeeper.servers");
+ public static final String TOOLS_LIBRARY = FlowWriteConfigurations.getStringProperty(0,"tools.library");
+ public static final String HBASE_ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0,"hbase.zookeeper.servers");
+
+
+ /*
+ * ck
+ * */
+ public static final String CK_HOSTS = FlowWriteConfigurations.getStringProperty(0,"ck.hosts");
+ public static final String CK_USERNAME = FlowWriteConfigurations.getStringProperty(0,"ck.username");
+ public static final String CK_PIN = FlowWriteConfigurations.getStringProperty(0,"ck.pin");
+ public static final String CK_DATABASE = FlowWriteConfigurations.getStringProperty(0,"ck.database");
+
+ public static final int FLINK_WATERMARK_MAX_ORDERNESS = FlowWriteConfigurations.getIntProperty(0,"flink.watermark.max.orderness");
+ public static final int LOG_AGGREGATE_DURATION = FlowWriteConfigurations.getIntProperty(0,"log.aggregate.duration");
+ public static final String SOURCE_KAFKA_TOPIC_DNS = FlowWriteConfigurations.getStringProperty(0, "source.kafka.topic.dns");;
+ public static final String SOURCE_KAFKA_TOPIC_CONNECTION = FlowWriteConfigurations.getStringProperty(0, "source.kafka.topic.connection");
+ public static final String SOURCE_KAFKA_TOPIC_SKETCH = FlowWriteConfigurations.getStringProperty(0, "source.kafka.topic.sketch");
+
+ //sink.ck.table
+ public static final String SINK_CK_TABLE_CONNECTION = FlowWriteConfigurations.getStringProperty(0, "sink.ck.table.connection");
+ public static final String SINK_CK_TABLE_SKETCH = FlowWriteConfigurations.getStringProperty(0, "sink.ck.table.sketch");
+ public static final String SINK_CK_TABLE_DNS = FlowWriteConfigurations.getStringProperty(0, "sink.ck.table.dns");
+ public static final String SINK_CK_TABLE_RELATION_CONNECTION = FlowWriteConfigurations.getStringProperty(0, "sink.ck.table.relation.connection");
+ public static final String SINK_CK_TABLE_RELATION_DNS = FlowWriteConfigurations.getStringProperty(0, "sink.ck.table.relation.dns");
+} \ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/common/KeysSelector.java b/src/main/java/com/zdjizhi/common/KeysSelector.java
new file mode 100644
index 0000000..a4d616c
--- /dev/null
+++ b/src/main/java/com/zdjizhi/common/KeysSelector.java
@@ -0,0 +1,23 @@
+package com.zdjizhi.common;
+
+
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import java.util.Map;
+
+/**
+ * @description:
+ * @author: zhq
+ * @create: 2022-07-05
+ **/
+public class KeysSelector implements KeySelector<Map<String, Object>, Tuple2<String, String>> {
+
+ @Override
+ public Tuple2<String, String> getKey(Map<String,Object> log) throws Exception {
+
+ return Tuple2.of(
+ String.valueOf(log.get("src_ip")),
+ String.valueOf(log.get("dst_ip")));
+ }
+}