1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
|
package com.zdjizhi.common;
import com.zdjizhi.utils.system.FlowWriteConfigurations;
import org.jasypt.encryption.pbe.StandardPBEStringEncryptor;
/**
 * Central holder of the job's configuration constants.
 *
 * <p>Every value that is not a literal is read once, during class initialisation, through
 * {@link FlowWriteConfigurations}. The first argument passed to the lookup methods (0 or 1)
 * selects the property source; its exact meaning is defined by {@code FlowWriteConfigurations}
 * (presumably 0 = deployment-specific file, 1 = defaults file — TODO confirm).
 *
 * <p>NOTE(review): constants declared as primitive {@code int} are assigned directly from
 * {@code getIntProperty}. If that method returns a boxed {@code Integer} that can be
 * {@code null} for a missing key, class initialisation would fail with a
 * {@code NullPointerException} — verify against {@code FlowWriteConfigurations}.
 *
 * @author Administrator
 */
public class FlowWriteConfig {

    /**
     * Decryptor for the encrypted Kafka credentials below. It must be declared and given its
     * password before the constants that call {@code decrypt}, because static initialisers
     * run in textual order.
     */
    private static final StandardPBEStringEncryptor encryptor = new StandardPBEStringEncryptor();

    static {
        // NOTE(review): the decryption password is hard-coded in source; consider supplying it
        // from the environment or a secret store. Left unchanged to preserve behaviour.
        encryptor.setPassword("galaxy");
    }

    /**
     * Presumably the expected number of parameters of the "if" function — TODO confirm
     * against the code that uses it.
     */
    public static final int IF_PARAM_LENGTH = 3;

    /**
     * Fields carrying this marker are disabled and are not included in the final log fields.
     */
    public static final String VISIBILITY = "disabled";

    /**
     * Default split delimiter.
     */
    public static final String FORMAT_SPLITTER = ",";

    /**
     * Prefix that marks whether a field is a log field or a field specified by the schema.
     */
    public static final String IS_JSON_KEY_TAG = "$.";

    /**
     * Separator used to join the parts of an "if" function condition.
     */
    public static final String IF_CONDITION_SPLITTER = "=";

    /**
     * Default character encoding used when parsing strings.
     */
    public static final String ENCODING = "UTF8";

    /**
     * Nacos
     */
    public static final String NACOS_SERVER = FlowWriteConfigurations.getStringProperty(0, "nacos.server");
    public static final String NACOS_SCHEMA_NAMESPACE = FlowWriteConfigurations.getStringProperty(0, "nacos.schema.namespace");
    public static final String NACOS_COMMON_NAMESPACE = FlowWriteConfigurations.getStringProperty(0, "nacos.common.namespace");
    public static final String NACOS_DATA_ID = FlowWriteConfigurations.getStringProperty(0, "nacos.data.id");
    public static final String NACOS_PIN = FlowWriteConfigurations.getStringProperty(1, "nacos.pin");
    public static final String NACOS_GROUP = FlowWriteConfigurations.getStringProperty(1, "nacos.group");
    public static final String NACOS_USERNAME = FlowWriteConfigurations.getStringProperty(1, "nacos.username");

    /**
     * System config
     */
    public static final Integer SOURCE_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "source.parallelism");
    public static final Integer SINK_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "sink.parallelism");
    public static final Integer TRANSFORM_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "transform.parallelism");
    public static final Integer DATA_CENTER_ID_NUM = FlowWriteConfigurations.getIntProperty(0, "data.center.id.num");
    public static final Integer LOG_NEED_COMPLETE = FlowWriteConfigurations.getIntProperty(0, "log.need.complete");
    public static final String MAIL_DEFAULT_CHARSET = FlowWriteConfigurations.getStringProperty(1, "mail.default.charset");
    public static final Integer LOG_TRANSFORM_TYPE = FlowWriteConfigurations.getIntProperty(1, "log.transform.type");
    public static final Integer BUFFER_TIMEOUT = FlowWriteConfigurations.getIntProperty(1, "buffer.timeout");

    /**
     * HBase
     */
    public static final Integer HBASE_TICK_TUPLE_FREQ_SECS = FlowWriteConfigurations.getIntProperty(0, "hbase.tick.tuple.freq.secs");
    public static final String HBASE_TABLE_NAME = FlowWriteConfigurations.getStringProperty(1, "hbase.table.name");

    /**
     * Kafka common: SASL/JAAS credentials, stored encrypted in the properties and decrypted here.
     */
    public static final String KAFKA_SASL_JAAS_USER = encryptor.decrypt(FlowWriteConfigurations.getStringProperty(1, "kafka.user"));
    public static final String KAFKA_SASL_JAAS_PIN = encryptor.decrypt(FlowWriteConfigurations.getStringProperty(1, "kafka.pin"));

    /**
     * Kafka source config
     */
    public static final String SOURCE_KAFKA_TOPIC = FlowWriteConfigurations.getStringProperty(0, "source.kafka.topic");
    public static final String GROUP_ID = FlowWriteConfigurations.getStringProperty(0, "group.id");
    public static final String SESSION_TIMEOUT_MS = FlowWriteConfigurations.getStringProperty(1, "session.timeout.ms");
    public static final String MAX_POLL_RECORDS = FlowWriteConfigurations.getStringProperty(1, "max.poll.records");
    public static final String MAX_PARTITION_FETCH_BYTES = FlowWriteConfigurations.getStringProperty(1, "max.partition.fetch.bytes");

    /**
     * Kafka sink config
     */
    public static final String SINK_KAFKA_TOPIC = FlowWriteConfigurations.getStringProperty(0, "sink.kafka.topic");
    public static final String PRODUCER_ACK = FlowWriteConfigurations.getStringProperty(1, "producer.ack");
    public static final String PRODUCER_KAFKA_COMPRESSION_TYPE = FlowWriteConfigurations.getStringProperty(0, "producer.kafka.compression.type");

    /**
     * Kafka connection settings
     */
    public static final String RETRIES = FlowWriteConfigurations.getStringProperty(1, "retries");
    public static final String LINGER_MS = FlowWriteConfigurations.getStringProperty(1, "linger.ms");
    public static final Integer REQUEST_TIMEOUT_MS = FlowWriteConfigurations.getIntProperty(1, "request.timeout.ms");
    public static final Integer BATCH_SIZE = FlowWriteConfigurations.getIntProperty(1, "batch.size");
    public static final Integer BUFFER_MEMORY = FlowWriteConfigurations.getIntProperty(1, "buffer.memory");
    public static final Integer MAX_REQUEST_SIZE = FlowWriteConfigurations.getIntProperty(1, "max.request.size");

    /**
     * HTTP
     */
    public static final String APP_ID_HTTP = FlowWriteConfigurations.getStringProperty(1, "app.id.http");
    public static final Integer APP_TICK_TUPLE_FREQ_SECS = FlowWriteConfigurations.getIntProperty(1, "app.tick.tuple.freq.secs");

    /**
     * Common config: server addresses and shared resources
     */
    public static final String SOURCE_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0, "source.kafka.servers");
    public static final String SINK_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0, "sink.kafka.servers");
    public static final String ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0, "zookeeper.servers");
    public static final String TOOLS_LIBRARY = FlowWriteConfigurations.getStringProperty(0, "tools.library");
    public static final String HBASE_ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0, "hbase.zookeeper.servers");

    /**
     * ck (presumably ClickHouse — TODO confirm) connection settings
     */
    public static final String CK_HOSTS = FlowWriteConfigurations.getStringProperty(0, "ck.hosts");
    public static final String CK_USERNAME = FlowWriteConfigurations.getStringProperty(0, "ck.username");
    public static final String CK_PIN = FlowWriteConfigurations.getStringProperty(0, "ck.pin");
    public static final String CK_DATABASE = FlowWriteConfigurations.getStringProperty(0, "ck.database");
    public static final int CK_CONNECTION_TIMEOUT = FlowWriteConfigurations.getIntProperty(0, "ck.connection.timeout");
    public static final int CK_SOCKET_TIMEOUT = FlowWriteConfigurations.getIntProperty(0, "ck.socket.timeout");

    /**
     * Watermark and log aggregation settings
     */
    public static final int FLINK_WATERMARK_MAX_ORDERNESS = FlowWriteConfigurations.getIntProperty(0, "flink.watermark.max.orderness");
    public static final int LOG_AGGREGATE_DURATION = FlowWriteConfigurations.getIntProperty(0, "log.aggregate.duration");
    public static final int LOG_AGGREGATE_DURATION_GRAPH = FlowWriteConfigurations.getIntProperty(0, "log.aggregate.duration.graph");

    /**
     * Per-log-type Kafka source topics
     */
    public static final String SOURCE_KAFKA_TOPIC_DNS = FlowWriteConfigurations.getStringProperty(0, "source.kafka.topic.dns");
    public static final String SOURCE_KAFKA_TOPIC_CONNECTION = FlowWriteConfigurations.getStringProperty(0, "source.kafka.topic.connection");
    public static final String SOURCE_KAFKA_TOPIC_SKETCH = FlowWriteConfigurations.getStringProperty(0, "source.kafka.topic.sketch");

    /**
     * Sink table names read from the sink.ck.table.* properties
     */
    public static final String SINK_CK_TABLE_CONNECTION = FlowWriteConfigurations.getStringProperty(0, "sink.ck.table.connection");
    public static final String SINK_CK_TABLE_SKETCH = FlowWriteConfigurations.getStringProperty(0, "sink.ck.table.sketch");
    public static final String SINK_CK_TABLE_DNS = FlowWriteConfigurations.getStringProperty(0, "sink.ck.table.dns");
    public static final String SINK_CK_TABLE_RELATION_CONNECTION = FlowWriteConfigurations.getStringProperty(0, "sink.ck.table.relation.connection");
    public static final String SINK_CK_TABLE_RELATION_DNS = FlowWriteConfigurations.getStringProperty(0, "sink.ck.table.relation.dns");

    /**
     * Sink table names read from the sink.arango.table.* properties
     */
    public static final String R_VISIT_IP2IP = FlowWriteConfigurations.getStringProperty(0, "sink.arango.table.r.visit.ip2ip");
    public static final String R_CNAME_DOMAIN2DOMAIN = FlowWriteConfigurations.getStringProperty(0, "sink.arango.table.r.cname.domain2domain");
    public static final String R_MX_DOMAIN2DOMAIN = FlowWriteConfigurations.getStringProperty(0, "sink.arango.table.r.mx.domain2domain");
    public static final String R_RESOLVE_DOMAIN2IP = FlowWriteConfigurations.getStringProperty(0, "sink.arango.table.r.resolve.domain2ip");
    public static final String R_NX_DOMAIN2DOMAIN = FlowWriteConfigurations.getStringProperty(0, "sink.arango.table.r.nx.domain2domain");

    /**
     * ArangoDB connection and batching settings
     */
    public static final String ARANGODB_HOST = FlowWriteConfigurations.getStringProperty(0, "arangoDB.host");
    public static final Integer ARANGODB_PORT = FlowWriteConfigurations.getIntProperty(0, "arangoDB.port");
    public static final String ARANGODB_USER = FlowWriteConfigurations.getStringProperty(0, "arangoDB.user");
    public static final String ARANGODB_PASSWORD = FlowWriteConfigurations.getStringProperty(0, "arangoDB.password");
    public static final String ARANGODB_DB_NAME = FlowWriteConfigurations.getStringProperty(0, "arangoDB.DB.name");
    public static final Integer ARANGODB_TTL = FlowWriteConfigurations.getIntProperty(0, "arangoDB.ttl");
    public static final Integer ARANGODB_BATCH = FlowWriteConfigurations.getIntProperty(0, "arangoDB.batch");
    public static final Integer UPDATE_ARANGO_BATCH = FlowWriteConfigurations.getIntProperty(0, "update.arango.batch");
    public static final String ARANGODB_READ_LIMIT = FlowWriteConfigurations.getStringProperty(0, "arangoDB.read.limit");

    /**
     * Thread pool and sink batching settings
     */
    public static final Integer THREAD_POOL_NUMBER = FlowWriteConfigurations.getIntProperty(0, "thread.pool.number");
    public static final Integer THREAD_AWAIT_TERMINATION_TIME = FlowWriteConfigurations.getIntProperty(0, "thread.await.termination.time");
    public static final Integer SINK_BATCH_TIME_OUT = FlowWriteConfigurations.getIntProperty(0, "sink.batch.time.out");
    public static final Integer SINK_BATCH = FlowWriteConfigurations.getIntProperty(0, "sink.batch");
}
|