diff options
| author | zhanghongqing <[email protected]> | 2022-07-07 14:07:27 +0800 |
|---|---|---|
| committer | zhanghongqing <[email protected]> | 2022-07-07 14:07:27 +0800 |
| commit | f552793230d0428cbc63714ee296c1ce4971a31b (patch) | |
| tree | 1bf3a26d957710b261f61a65559d393f55bf9382 /src/main/java/com/zdjizhi/common | |
Initial commit
Diffstat (limited to 'src/main/java/com/zdjizhi/common')
| -rw-r--r-- | src/main/java/com/zdjizhi/common/DnsKeysSelector.java | 19 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/common/FlowWriteConfig.java | 138 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/common/KeysSelector.java | 23 |
3 files changed, 180 insertions, 0 deletions
diff --git a/src/main/java/com/zdjizhi/common/DnsKeysSelector.java b/src/main/java/com/zdjizhi/common/DnsKeysSelector.java new file mode 100644 index 0000000..101597c --- /dev/null +++ b/src/main/java/com/zdjizhi/common/DnsKeysSelector.java @@ -0,0 +1,19 @@ +package com.zdjizhi.common; + +import org.apache.flink.api.java.functions.KeySelector; + +import java.util.Map; + +/** + * @description: + * @author: zhq + * @create: 2022-07-05 + **/ +public class DnsKeysSelector implements KeySelector<Map<String, Object>, String> { + + @Override + public String getKey(Map<String, Object> log) throws Exception { + + return String.valueOf(log.get("dns_qname")); + } +} diff --git a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java new file mode 100644 index 0000000..a84ebae --- /dev/null +++ b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java @@ -0,0 +1,138 @@ +package com.zdjizhi.common; + + +import com.zdjizhi.utils.system.FlowWriteConfigurations; +import org.jasypt.encryption.pbe.StandardPBEStringEncryptor; + +/** + * @author Administrator + */ +public class FlowWriteConfig { + + + private static StandardPBEStringEncryptor encryptor = new StandardPBEStringEncryptor(); + + static { + encryptor.setPassword("galaxy"); + } + + public static final int IF_PARAM_LENGTH = 3; + /** + * 有此标识的字段为失效字段,不计入最终日志字段 + */ + public static final String VISIBILITY = "disabled"; + /** + * 默认的切分符号 + */ + public static final String FORMAT_SPLITTER = ","; + /** + * 标识字段为日志字段还是schema指定字段 + */ + public static final String IS_JSON_KEY_TAG = "$."; + /** + * if函数连接分隔符 + */ + public static final String IF_CONDITION_SPLITTER = "="; + /** + * 默认的字符串解析编码 + */ + public static final String ENCODING = "UTF8"; + + /** + * Nacos + */ + public static final String NACOS_SERVER = FlowWriteConfigurations.getStringProperty(0, "nacos.server"); + public static final String NACOS_SCHEMA_NAMESPACE = FlowWriteConfigurations.getStringProperty(0, "nacos.schema.namespace"); + public static final String NACOS_COMMON_NAMESPACE = FlowWriteConfigurations.getStringProperty(0, "nacos.common.namespace"); + public static final String NACOS_DATA_ID = FlowWriteConfigurations.getStringProperty(0, "nacos.data.id"); + public static final String NACOS_PIN = FlowWriteConfigurations.getStringProperty(1, "nacos.pin"); + public static final String NACOS_GROUP = FlowWriteConfigurations.getStringProperty(1, "nacos.group"); + public static final String NACOS_USERNAME = FlowWriteConfigurations.getStringProperty(1, "nacos.username"); + + /** + * System config + */ + public static final Integer SOURCE_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "source.parallelism"); + public static final Integer SINK_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "sink.parallelism"); + public static final Integer TRANSFORM_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "transform.parallelism"); + public static final Integer DATA_CENTER_ID_NUM = FlowWriteConfigurations.getIntProperty(0, "data.center.id.num"); + public static final Integer LOG_NEED_COMPLETE = FlowWriteConfigurations.getIntProperty(0, "log.need.complete"); + public static final String MAIL_DEFAULT_CHARSET = FlowWriteConfigurations.getStringProperty(1, "mail.default.charset"); + public static final Integer LOG_TRANSFORM_TYPE = FlowWriteConfigurations.getIntProperty(1, "log.transform.type"); + public static final Integer BUFFER_TIMEOUT = FlowWriteConfigurations.getIntProperty(1, "buffer.timeout"); + + /** + * HBase + */ + public static final Integer HBASE_TICK_TUPLE_FREQ_SECS = FlowWriteConfigurations.getIntProperty(0, "hbase.tick.tuple.freq.secs"); + public static final String HBASE_TABLE_NAME = FlowWriteConfigurations.getStringProperty(1, "hbase.table.name"); + + /** + * kafka common + */ + public static final String KAFKA_SASL_JAAS_USER = encryptor.decrypt(FlowWriteConfigurations.getStringProperty(1, "kafka.user")); + public static final String KAFKA_SASL_JAAS_PIN = encryptor.decrypt(FlowWriteConfigurations.getStringProperty(1, "kafka.pin")); + + /** + * kafka source config + */ + public static final String SOURCE_KAFKA_TOPIC = FlowWriteConfigurations.getStringProperty(0, "source.kafka.topic"); + public static final String GROUP_ID = FlowWriteConfigurations.getStringProperty(0, "group.id"); + public static final String SESSION_TIMEOUT_MS = FlowWriteConfigurations.getStringProperty(1, "session.timeout.ms"); + public static final String MAX_POLL_RECORDS = FlowWriteConfigurations.getStringProperty(1, "max.poll.records"); + public static final String MAX_PARTITION_FETCH_BYTES = FlowWriteConfigurations.getStringProperty(1, "max.partition.fetch.bytes"); + + /** + * kafka sink config + */ + public static final String SINK_KAFKA_TOPIC = FlowWriteConfigurations.getStringProperty(0, "sink.kafka.topic"); + public static final String PRODUCER_ACK = FlowWriteConfigurations.getStringProperty(1, "producer.ack"); + public static final String PRODUCER_KAFKA_COMPRESSION_TYPE = FlowWriteConfigurations.getStringProperty(0, "producer.kafka.compression.type"); + + /** + * connection kafka + */ + public static final String RETRIES = FlowWriteConfigurations.getStringProperty(1, "retries"); + public static final String LINGER_MS = FlowWriteConfigurations.getStringProperty(1, "linger.ms"); + public static final Integer REQUEST_TIMEOUT_MS = FlowWriteConfigurations.getIntProperty(1, "request.timeout.ms"); + public static final Integer BATCH_SIZE = FlowWriteConfigurations.getIntProperty(1, "batch.size"); + public static final Integer BUFFER_MEMORY = FlowWriteConfigurations.getIntProperty(1, "buffer.memory"); + public static final Integer MAX_REQUEST_SIZE = FlowWriteConfigurations.getIntProperty(1, "max.request.size"); + + /** + * http + */ + public static final String APP_ID_HTTP = FlowWriteConfigurations.getStringProperty(1, "app.id.http"); + public static final Integer APP_TICK_TUPLE_FREQ_SECS = FlowWriteConfigurations.getIntProperty(1, "app.tick.tuple.freq.secs"); + + /** + * common config + */ + public static final String SOURCE_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0,"source.kafka.servers"); + public static final String SINK_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0,"sink.kafka.servers"); + public static final String ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0,"zookeeper.servers"); + public static final String TOOLS_LIBRARY = FlowWriteConfigurations.getStringProperty(0,"tools.library"); + public static final String HBASE_ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0,"hbase.zookeeper.servers"); + + + /* + * ck + * */ + public static final String CK_HOSTS = FlowWriteConfigurations.getStringProperty(0,"ck.hosts"); + public static final String CK_USERNAME = FlowWriteConfigurations.getStringProperty(0,"ck.username"); + public static final String CK_PIN = FlowWriteConfigurations.getStringProperty(0,"ck.pin"); + public static final String CK_DATABASE = FlowWriteConfigurations.getStringProperty(0,"ck.database"); + + public static final int FLINK_WATERMARK_MAX_ORDERNESS = FlowWriteConfigurations.getIntProperty(0,"flink.watermark.max.orderness"); + public static final int LOG_AGGREGATE_DURATION = FlowWriteConfigurations.getIntProperty(0,"log.aggregate.duration"); + public static final String SOURCE_KAFKA_TOPIC_DNS = FlowWriteConfigurations.getStringProperty(0, "source.kafka.topic.dns");; + public static final String SOURCE_KAFKA_TOPIC_CONNECTION = FlowWriteConfigurations.getStringProperty(0, "source.kafka.topic.connection"); + public static final String SOURCE_KAFKA_TOPIC_SKETCH = FlowWriteConfigurations.getStringProperty(0, "source.kafka.topic.sketch"); + + //sink.ck.table + public static final String SINK_CK_TABLE_CONNECTION = FlowWriteConfigurations.getStringProperty(0, "sink.ck.table.connection"); + public static final String SINK_CK_TABLE_SKETCH = FlowWriteConfigurations.getStringProperty(0, "sink.ck.table.sketch"); + public static final String SINK_CK_TABLE_DNS = FlowWriteConfigurations.getStringProperty(0, "sink.ck.table.dns"); + public static final String SINK_CK_TABLE_RELATION_CONNECTION = FlowWriteConfigurations.getStringProperty(0, "sink.ck.table.relation.connection"); + public static final String SINK_CK_TABLE_RELATION_DNS = FlowWriteConfigurations.getStringProperty(0, "sink.ck.table.relation.dns"); +}
\ No newline at end of file diff --git a/src/main/java/com/zdjizhi/common/KeysSelector.java b/src/main/java/com/zdjizhi/common/KeysSelector.java new file mode 100644 index 0000000..a4d616c --- /dev/null +++ b/src/main/java/com/zdjizhi/common/KeysSelector.java @@ -0,0 +1,23 @@ +package com.zdjizhi.common; + + +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; + +import java.util.Map; + +/** + * @description: + * @author: zhq + * @create: 2022-07-05 + **/ +public class KeysSelector implements KeySelector<Map<String, Object>, Tuple2<String, String>> { + + @Override + public Tuple2<String, String> getKey(Map<String,Object> log) throws Exception { + + return Tuple2.of( + String.valueOf(log.get("src_ip")), + String.valueOf(log.get("dst_ip"))); + } +} |
