summaryrefslogtreecommitdiff
path: root/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
blob: 7795d9e00341f2d01f41d38c2fbeea042f910145 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
package com.zdjizhi.common;


import com.zdjizhi.utils.system.FlowWriteConfigurations;
import org.jasypt.encryption.pbe.StandardPBEStringEncryptor;

/**
 * Central holder for the job's fixed constants and externally configured settings.
 *
 * <p>Most fields are resolved once, while this class is being initialized, through
 * {@link FlowWriteConfigurations}. The first argument passed to its getters is always
 * 0 or 1; it presumably selects which properties source is consulted - TODO confirm
 * against {@code FlowWriteConfigurations}.
 *
 * <p>Declaration order matters: static initializers run top to bottom, so the encryptor
 * and its password must stay above the fields that call {@code decrypt}.
 *
 * @author Administrator
 */
public class FlowWriteConfig {


    /**
     * Encryptor used to decrypt the Kafka credentials read from configuration.
     * Assigned once and never replaced, hence {@code final}.
     */
    private static final StandardPBEStringEncryptor ENCRYPTOR = new StandardPBEStringEncryptor();

    static {
        // NOTE(review): the encryption password is hard-coded in source; consider
        // supplying it from the environment or a secret store instead.
        ENCRYPTOR.setPassword("galaxy");
    }

    public static final int IF_PARAM_LENGTH = 3;
    /**
     * Fields carrying this marker are disabled and are not included in the final log fields.
     */
    public static final String VISIBILITY = "disabled";
    /**
     * Default split delimiter.
     */
    public static final String FORMAT_SPLITTER = ",";
    /**
     * Marks whether a field is a log field or a schema-specified field.
     */
    public static final String IS_JSON_KEY_TAG = "$.";
    /**
     * Separator used to join the parts of an "if" function condition.
     */
    public static final String IF_CONDITION_SPLITTER = "=";
    /**
     * Default encoding for string parsing.
     */
    public static final String ENCODING = "UTF8";

    /**
     * System config
     */
    public static final Integer SOURCE_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "source.parallelism");
    public static final Integer SINK_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "sink.parallelism");
    public static final Integer TRANSFORM_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "transform.parallelism");
    public static final Integer DATA_CENTER_ID_NUM = 0;
    public static final Integer LOG_TYPE = FlowWriteConfigurations.getIntProperty(0, "log.type");
    public static final String MAIL_DEFAULT_CHARSET = FlowWriteConfigurations.getStringProperty(1, "mail.default.charset");
    public static final Integer LOG_TRANSFORM_TYPE = FlowWriteConfigurations.getIntProperty(1, "log.transform.type");
    public static final Integer BUFFER_TIMEOUT = FlowWriteConfigurations.getIntProperty(1, "buffer.timeout");

    /**
     * kafka common: credentials are stored encrypted and decrypted here at class initialization
     */
    public static final String KAFKA_SASL_JAAS_USER = ENCRYPTOR.decrypt(FlowWriteConfigurations.getStringProperty(1, "kafka.user"));
    public static final String KAFKA_SASL_JAAS_PIN = ENCRYPTOR.decrypt(FlowWriteConfigurations.getStringProperty(1, "kafka.pin"));

    /**
     * kafka source config
     */
    public static final String GROUP_ID = FlowWriteConfigurations.getStringProperty(0, "group.id");
    public static final String SESSION_TIMEOUT_MS = FlowWriteConfigurations.getStringProperty(1, "session.timeout.ms");
    public static final String MAX_POLL_RECORDS = FlowWriteConfigurations.getStringProperty(1, "max.poll.records");
    public static final String MAX_PARTITION_FETCH_BYTES = FlowWriteConfigurations.getStringProperty(1, "max.partition.fetch.bytes");

    /**
     * kafka sink config
     */
    public static final String PRODUCER_ACK = FlowWriteConfigurations.getStringProperty(1, "producer.ack");
    public static final String PRODUCER_KAFKA_COMPRESSION_TYPE = FlowWriteConfigurations.getStringProperty(0, "producer.kafka.compression.type");

    /**
     * connection kafka
     */
    public static final String RETRIES = FlowWriteConfigurations.getStringProperty(1, "retries");
    public static final String LINGER_MS = FlowWriteConfigurations.getStringProperty(1, "linger.ms");
    public static final Integer REQUEST_TIMEOUT_MS = FlowWriteConfigurations.getIntProperty(1, "request.timeout.ms");
    public static final Integer BATCH_SIZE = FlowWriteConfigurations.getIntProperty(1, "batch.size");
    public static final Integer BUFFER_MEMORY = FlowWriteConfigurations.getIntProperty(1, "buffer.memory");
    public static final Integer MAX_REQUEST_SIZE = FlowWriteConfigurations.getIntProperty(1, "max.request.size");

    /**
     * common config
     */
    public static final String SOURCE_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0, "source.kafka.servers");
    public static final String SINK_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0, "sink.kafka.servers");
    // NOTE(review): unlike its neighbours this holds the literal key text, not a value
    // looked up from configuration. Left unchanged because callers may rely on the
    // literal - confirm whether a getStringProperty lookup was intended.
    public static final String ZOOKEEPER_SERVERS = "zookeeper.servers";
    public static final String TOOLS_LIBRARY = FlowWriteConfigurations.getStringProperty(0, "tools.library");


    /*
     * ck connection settings
     *
     * NOTE(review): the int-typed fields below are primitives. If getIntProperty returns
     * a boxed Integer that can be null for a missing key, unboxing would fail during
     * class initialization - confirm against FlowWriteConfigurations.
     */
    public static final String CK_HOSTS = FlowWriteConfigurations.getStringProperty(0, "ck.hosts");
    public static final String CK_USERNAME = FlowWriteConfigurations.getStringProperty(0, "ck.username");
    public static final String CK_PIN = FlowWriteConfigurations.getStringProperty(0, "ck.pin");
    public static final String CK_DATABASE = FlowWriteConfigurations.getStringProperty(0, "ck.database");
    public static final int CK_CONNECTION_TIMEOUT = FlowWriteConfigurations.getIntProperty(0, "ck.connection.timeout");
    public static final int CK_SOCKET_TIMEOUT = FlowWriteConfigurations.getIntProperty(0, "ck.socket.timeout");

    /*
     * watermark / aggregation timing and kafka topics
     */
    public static final int FLINK_WATERMARK_MAX_DELAY_TIME = FlowWriteConfigurations.getIntProperty(0, "flink.watermark.max.delay.time");
    public static final int LOG_AGGREGATE_DURATION = FlowWriteConfigurations.getIntProperty(0, "log.aggregate.duration");
    public static final int LOG_AGGREGATE_DURATION_GRAPH = FlowWriteConfigurations.getIntProperty(0, "log.aggregate.duration.graph");
    public static final String SOURCE_KAFKA_TOPIC_DNS = FlowWriteConfigurations.getStringProperty(0, "source.kafka.topic.dns");
    public static final String SOURCE_KAFKA_TOPIC_CONNECTION = FlowWriteConfigurations.getStringProperty(0, "source.kafka.topic.connection");
    public static final String SOURCE_KAFKA_TOPIC_SKETCH = FlowWriteConfigurations.getStringProperty(0, "source.kafka.topic.sketch");
    public static final String SINK_KAFKA_TOPIC_RELATION_CONNECTION = FlowWriteConfigurations.getStringProperty(0, "sink.kafka.topic.relation.connection");
    public static final String SINK_KAFKA_TOPIC_RELATION_DNS = FlowWriteConfigurations.getStringProperty(0, "sink.kafka.topic.relation.dns");

    /*
     * sink.ck.table
     */
    public static final String SINK_CK_TABLE_CONNECTION = FlowWriteConfigurations.getStringProperty(0, "sink.ck.table.connection");
    public static final String SINK_CK_TABLE_SKETCH = FlowWriteConfigurations.getStringProperty(0, "sink.ck.table.sketch");
    public static final String SINK_CK_TABLE_DNS = FlowWriteConfigurations.getStringProperty(0, "sink.ck.table.dns");
    public static final String SINK_CK_TABLE_RELATION_CONNECTION = FlowWriteConfigurations.getStringProperty(0, "sink.ck.table.relation.connection");
    public static final String SINK_CK_TABLE_RELATION_DNS = FlowWriteConfigurations.getStringProperty(0, "sink.ck.table.relation.dns");


    /*
     * sink.arangodb.table
     */
    public static final String R_VISIT_IP2IP = FlowWriteConfigurations.getStringProperty(0, "sink.arangodb.table.r.visit.ip2ip");
    public static final String R_CNAME_DOMAIN2DOMAIN = FlowWriteConfigurations.getStringProperty(0, "sink.arangodb.table.r.cname.domain2domain");
    public static final String R_MX_DOMAIN2DOMAIN = FlowWriteConfigurations.getStringProperty(0, "sink.arangodb.table.r.mx.domain2domain");
    public static final String R_RESOLVE_DOMAIN2IP = FlowWriteConfigurations.getStringProperty(0, "sink.arangodb.table.r.resolve.domain2ip");
    public static final String R_NX_DOMAIN2DOMAIN = FlowWriteConfigurations.getStringProperty(0, "sink.arangodb.table.r.nx.domain2domain");


    /*
     * arangodb connection settings
     */
    public static final String ARANGODB_HOST = FlowWriteConfigurations.getStringProperty(0, "arangodb.host");
    public static final Integer ARANGODB_PORT = FlowWriteConfigurations.getIntProperty(0, "arangodb.port");
    public static final String ARANGODB_USER = FlowWriteConfigurations.getStringProperty(0, "arangodb.user");
    public static final String ARANGODB_PASSWORD = FlowWriteConfigurations.getStringProperty(0, "arangodb.password");
    public static final String ARANGODB_DB_NAME = FlowWriteConfigurations.getStringProperty(0, "arangodb.db.name");
    public static final Integer ARANGODB_TTL = FlowWriteConfigurations.getIntProperty(0, "arangodb.ttl");
    public static final Integer ARANGODB_BATCH = FlowWriteConfigurations.getIntProperty(0, "arangodb.batch");
    public static final Integer ARANGODB_THREAD_POOL_NUMBER = FlowWriteConfigurations.getIntProperty(0, "arangodb.thread.pool.number");

    /*
     * sink batching and raw-log switches
     */
    public static final Integer SINK_CK_BATCH_DELAY_TIME = FlowWriteConfigurations.getIntProperty(0, "sink.ck.batch.delay.time");
    public static final Integer SINK_ARANGODB_BATCH_DELAY_TIME = FlowWriteConfigurations.getIntProperty(0, "sink.arangodb.batch.delay.time");
    public static final Integer CK_BATCH = FlowWriteConfigurations.getIntProperty(0, "ck.batch");
    public static final Integer SINK_CK_RAW_LOG_INSERT_OPEN = FlowWriteConfigurations.getIntProperty(0, "sink.ck.raw.log.insert.open");
    public static final Integer AGGREGATE_MAX_VALUE_LENGTH = FlowWriteConfigurations.getIntProperty(0, "aggregate.max.value.length");

    public static final Integer SINK_ARANGODB_RAW_LOG_INSERT_OPEN = FlowWriteConfigurations.getIntProperty(0, "sink.arangodb.raw.log.insert.open");


    /*
     * hikari datasource and remaining ck settings
     */
    public static final Integer HIKARI_MINIMUM_IDLE = FlowWriteConfigurations.getIntProperty(1, "spring.datasource.hikari.minimum-idle");
    public static final Integer HIKARI_MAXIMUM_POOL_SIZE = FlowWriteConfigurations.getIntProperty(1, "spring.datasource.hikari.maximum-pool-size");
    public static final Long HIKARI_IDLE_TIMEOUT = FlowWriteConfigurations.getLongProperty(1, "spring.datasource.hikari.idle-timeout");
    public static final Long HIKARI_MAX_LIFETIME = FlowWriteConfigurations.getLongProperty(1, "spring.datasource.hikari.max-lifetime");
    public static final Integer HIKARI_CONNECTION_TIMEOUT = FlowWriteConfigurations.getIntProperty(1, "spring.datasource.hikari.connection-timeout");
    public static final Integer CK_MAX_THREADS = FlowWriteConfigurations.getIntProperty(1, "ck.max.threads");
    public static final Integer CK_SCHEDULE_ACTUALIZATION = FlowWriteConfigurations.getIntProperty(1, "ck.schedule.actualization");

}