Diffstat (limited to 'MSH-PIC/flink/topology')
-rw-r--r--  MSH-PIC/flink/topology/completion/config/ETL-ACTIVE-DEFENCE-EVENT-COMPLETED  78
-rw-r--r--  MSH-PIC/flink/topology/completion/config/ETL-BGP-RECORD-COMPLETED  78
-rw-r--r--  MSH-PIC/flink/topology/completion/config/ETL-GTPC-RECORD-COMPLETED  78
-rw-r--r--  MSH-PIC/flink/topology/completion/config/ETL-INTERIM-SESSION-RECORD-COMPLETED  78
-rw-r--r--  MSH-PIC/flink/topology/completion/config/ETL-PROXY-EVENT-COMPLETED  78
-rw-r--r--  MSH-PIC/flink/topology/completion/config/ETL-RADIUS-RECORD-COMPLETED  78
-rw-r--r--  MSH-PIC/flink/topology/completion/config/ETL-SECURITY-EVENT-COMPLETED  78
-rw-r--r--  MSH-PIC/flink/topology/completion/config/ETL-SESSION-RECORD-COMPLETED  78
-rw-r--r--  MSH-PIC/flink/topology/completion/config/ETL-TRANSACTION-RECORD-COMPLETED  78
-rw-r--r--  MSH-PIC/flink/topology/completion/config/MIRROR-DOS-SKETCH-RECORD  78
-rw-r--r--  MSH-PIC/flink/topology/completion/config/MIRROR-GTPC-RECORD-COMPLETED  78
-rw-r--r--  MSH-PIC/flink/topology/completion/config/MIRROR-INTERNAL-PACKET-CAPTURE-EVENT  78
-rw-r--r--  MSH-PIC/flink/topology/completion/config/MIRROR-NETWORK-TRAFFIC-METRICS  78
-rw-r--r--  MSH-PIC/flink/topology/completion/config/MIRROR-POLICY-RULE-METRICS  78
-rw-r--r--  MSH-PIC/flink/topology/completion/config/MIRROR-PXY-EXCH-INTERMEDIA-CERT  78
-rw-r--r--  MSH-PIC/flink/topology/completion/config/MIRROR-SYS-PACKET-CAPTURE-EVENT  78
-rw-r--r--  MSH-PIC/flink/topology/completion/config/MIRROR-TRAFFIC-TOP-METRICS  78
-rw-r--r--  MSH-PIC/flink/topology/completion/config/MIRROR-VOIP-RECORD  78
-rw-r--r--  MSH-PIC/flink/topology/completion/service_flow_config.properties  78
-rw-r--r--  MSH-PIC/flink/topology/completion/start.sh  67
-rw-r--r--  MSH-PIC/flink/topology/completion/stop.sh  34
-rw-r--r--  MSH-PIC/flink/topology/data/asn_v4.mmdb  bin 0 -> 5873392 bytes
-rw-r--r--  MSH-PIC/flink/topology/data/asn_v6.mmdb  bin 0 -> 3011336 bytes
-rw-r--r--  MSH-PIC/flink/topology/data/ip_v4_built_in.mmdb  bin 0 -> 28626992 bytes
-rw-r--r--  MSH-PIC/flink/topology/data/ip_v4_user_defined.mmdb  bin 0 -> 621 bytes
-rw-r--r--  MSH-PIC/flink/topology/data/ip_v6_built_in.mmdb  bin 0 -> 7560407 bytes
-rw-r--r--  MSH-PIC/flink/topology/data/ip_v6_user_defined.mmdb  bin 0 -> 1197 bytes
-rw-r--r--  MSH-PIC/flink/topology/data/keystore.jks  bin 0 -> 787 bytes
-rw-r--r--  MSH-PIC/flink/topology/data/truststore.jks  bin 0 -> 583 bytes
-rw-r--r--  MSH-PIC/flink/topology/relationship-gtpc-user/config/RELATIONSHIP-GTPC-USER  33
-rw-r--r--  MSH-PIC/flink/topology/relationship-gtpc-user/service_flow_config.properties  33
-rw-r--r--  MSH-PIC/flink/topology/relationship-gtpc-user/start.sh  67
-rw-r--r--  MSH-PIC/flink/topology/relationship-gtpc-user/stop.sh  34
-rw-r--r--  MSH-PIC/flink/topology/relationship-radius-account/config/RELATIONSHIP-RADIUS-ACCOUNT  28
-rw-r--r--  MSH-PIC/flink/topology/relationship-radius-account/service_flow_config.properties  28
-rw-r--r--  MSH-PIC/flink/topology/relationship-radius-account/start.sh  67
-rw-r--r--  MSH-PIC/flink/topology/relationship-radius-account/stop.sh  34
37 files changed, 1907 insertions(+), 0 deletions(-)
diff --git a/MSH-PIC/flink/topology/completion/config/ETL-ACTIVE-DEFENCE-EVENT-COMPLETED b/MSH-PIC/flink/topology/completion/config/ETL-ACTIVE-DEFENCE-EVENT-COMPLETED
new file mode 100644
index 0000000..ad22a08
--- /dev/null
+++ b/MSH-PIC/flink/topology/completion/config/ETL-ACTIVE-DEFENCE-EVENT-COMPLETED
@@ -0,0 +1,78 @@
+#-------------------------------- Address configuration ------------------------------#
+# Kafka brokers to consume from (management cluster)
+source.kafka.servers=192.168.20.193:9094,192.168.20.194:9094,192.168.20.195:9094
+
+# Kafka brokers to write the output to
+sink.kafka.servers=192.168.20.193:9094,192.168.20.194:9094,192.168.20.195:9094
+
+# ZooKeeper quorum used to allocate log_id and to connect to HBase
+zookeeper.servers=192.168.20.193:2181,192.168.20.194:2181,192.168.20.195:2181
+
+# HDFS namenodes used to fetch the location knowledge base
+hdfs.servers=192.168.20.193:9000,192.168.20.194:9000
+
+#-------------------------------- HTTP / location knowledge base ------------------------------#
+# File system type holding the knowledge base: hdfs or local
+knowledgebase.file.storage.type=hdfs
+
+# Knowledge-base path; fill in the path that matches the file.system.type setting.
+knowledgebase.file.storage.path=/knowledgebase/ETL-ACTIVE-DEFENCE-EVENT-COMPLETED/
+
+# File types to fetch from the knowledge-base metadata; empty disables type filtering. Takes precedence over the name list.
+knowledgebase.type.list=ip_location,asn
+
+# File names to fetch from the knowledge-base metadata; empty disables name filtering
+knowledgebase.name.list=ip_v4_built_in,ip_v6_built_in,ip_v4_user_defined,ip_v6_user_defined,asn_v4,asn_v6
+
+# Tools library path, holding key files and the like.
+tools.library=/home/tsg/olap/topology/data/
+
+#-------------------------------- Nacos configuration ------------------------------#
+# Nacos server address
+nacos.server=192.168.20.252:8848
+
+# Schema namespace
+nacos.schema.namespace=MSH
+
+# Schema data id
+nacos.schema.data.id=active_defence_event.json
+
+# Knowledge-base namespace
+nacos.knowledgebase.namespace=
+
+# Knowledge-base data id
+nacos.knowledgebase.data.id=knowledge_base.json
+
+#-------------------------------- Kafka consumer/producer configuration ------------------------------#
+# Kafka topic to consume from
+source.kafka.topic=ACTIVE-DEFENCE-EVENT
+
+# Output topic for the enriched records
+sink.kafka.topic=ACTIVE-DEFENCE-EVENT-COMPLETED
+
+# Consumer group id; stores this source's offsets so the next read resumes without repeating data. Name it after the topology.
+group.id=active-defence-log-20220408-1
+
+#-------------------------------- Topology configuration ------------------------------#
+# Consumer parallelism
+source.parallelism=3
+
+# Transform function parallelism
+transform.parallelism=3
+
+# Kafka producer parallelism
+sink.parallelism=3
+
+# Data-center id, valid range 0-31
+data.center.id.num=1
+
+# HBase cache refresh interval in seconds; 0 disables cache refresh
+hbase.tick.tuple.freq.secs=180
+
+#-------------------------------- Default values ------------------------------#
+
+# 0 = pass logs through without enrichment; 1 = enrich
+log.need.complete=1
+
+# Producer compression mode: none or snappy
+producer.kafka.compression.type=snappy
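
[Editor's note: every completion config below follows this same template, varying only the schema, topics, consumer group, parallelism, and data-center id. A minimal smoke test for the pipeline above — a sketch assuming the stock Kafka console tools are on the path and the brokers from the config are reachable:

  # one raw record should arrive on the source topic...
  kafka-console-consumer.sh --bootstrap-server 192.168.20.193:9094 --topic ACTIVE-DEFENCE-EVENT --max-messages 1
  # ...and one enriched record should appear on the sink topic
  kafka-console-consumer.sh --bootstrap-server 192.168.20.193:9094 --topic ACTIVE-DEFENCE-EVENT-COMPLETED --max-messages 1
]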
diff --git a/MSH-PIC/flink/topology/completion/config/ETL-BGP-RECORD-COMPLETED b/MSH-PIC/flink/topology/completion/config/ETL-BGP-RECORD-COMPLETED
new file mode 100644
index 0000000..330ab1a
--- /dev/null
+++ b/MSH-PIC/flink/topology/completion/config/ETL-BGP-RECORD-COMPLETED
@@ -0,0 +1,78 @@
+#-------------------------------- Address configuration ------------------------------#
+# Kafka brokers to consume from (management cluster)
+source.kafka.servers=192.168.20.193:9094,192.168.20.194:9094,192.168.20.195:9094
+
+# Kafka brokers to write the output to
+sink.kafka.servers=192.168.20.193:9094,192.168.20.194:9094,192.168.20.195:9094
+
+# ZooKeeper quorum used to allocate log_id and to connect to HBase
+zookeeper.servers=192.168.20.193:2181,192.168.20.194:2181,192.168.20.195:2181
+
+# HDFS namenodes used to fetch the location knowledge base
+hdfs.servers=192.168.20.193:9000,192.168.20.194:9000
+
+#-------------------------------- HTTP / location knowledge base ------------------------------#
+# File system type holding the knowledge base: hdfs or local
+knowledgebase.file.storage.type=hdfs
+
+# Knowledge-base path; fill in the path that matches the file.system.type setting.
+knowledgebase.file.storage.path=/knowledgebase/ETL-BGP-RECORD-COMPLETED/
+
+# File types to fetch from the knowledge-base metadata; empty disables type filtering. Takes precedence over the name list.
+knowledgebase.type.list=ip_location,asn
+
+# File names to fetch from the knowledge-base metadata; empty disables name filtering
+knowledgebase.name.list=ip_v4_built_in,ip_v6_built_in,ip_v4_user_defined,ip_v6_user_defined,asn_v4,asn_v6
+
+# Tools library path, holding key files and the like.
+tools.library=/home/tsg/olap/topology/data/
+
+#-------------------------------- Nacos configuration ------------------------------#
+# Nacos server address
+nacos.server=192.168.20.252:8848
+
+# Schema namespace
+nacos.schema.namespace=MSH
+
+# Schema data id
+nacos.schema.data.id=bgp_record.json
+
+# Knowledge-base namespace
+nacos.knowledgebase.namespace=
+
+# Knowledge-base data id
+nacos.knowledgebase.data.id=knowledge_base.json
+
+#-------------------------------- Kafka consumer/producer configuration ------------------------------#
+# Kafka topic to consume from
+source.kafka.topic=BGP-RECORD
+
+# Output topic for the enriched records
+sink.kafka.topic=BGP-RECORD-COMPLETED
+
+# Consumer group id; stores this source's offsets so the next read resumes without repeating data. Name it after the topology.
+group.id=bgp-record-20220801-1
+
+#-------------------------------- Topology configuration ------------------------------#
+# Consumer parallelism
+source.parallelism=3
+
+# Transform function parallelism
+transform.parallelism=3
+
+# Kafka producer parallelism
+sink.parallelism=3
+
+# Data-center id, valid range 0-31
+data.center.id.num=2
+
+# HBase cache refresh interval in seconds; 0 disables cache refresh
+hbase.tick.tuple.freq.secs=180
+
+#-------------------------------- Default values ------------------------------#
+
+# 0 = pass logs through without enrichment; 1 = enrich
+log.need.complete=1
+
+# Producer compression mode: none or snappy
+producer.kafka.compression.type=snappy
diff --git a/MSH-PIC/flink/topology/completion/config/ETL-GTPC-RECORD-COMPLETED b/MSH-PIC/flink/topology/completion/config/ETL-GTPC-RECORD-COMPLETED
new file mode 100644
index 0000000..b12330c
--- /dev/null
+++ b/MSH-PIC/flink/topology/completion/config/ETL-GTPC-RECORD-COMPLETED
@@ -0,0 +1,78 @@
+#-------------------------------- Address configuration ------------------------------#
+# Kafka brokers to consume from (management cluster)
+source.kafka.servers=192.168.20.193:9094,192.168.20.194:9094,192.168.20.195:9094
+
+# Kafka brokers to write the output to
+sink.kafka.servers=192.168.20.193:9094,192.168.20.194:9094,192.168.20.195:9094
+
+# ZooKeeper quorum used to allocate log_id and to connect to HBase
+zookeeper.servers=192.168.20.193:2181,192.168.20.194:2181,192.168.20.195:2181
+
+# HDFS namenodes used to fetch the location knowledge base
+hdfs.servers=192.168.20.193:9000,192.168.20.194:9000
+
+#-------------------------------- HTTP / location knowledge base ------------------------------#
+# File system type holding the knowledge base: hdfs or local
+knowledgebase.file.storage.type=hdfs
+
+# Knowledge-base path; fill in the path that matches the file.system.type setting.
+knowledgebase.file.storage.path=/knowledgebase/ETL-GTPC-RECORD-COMPLETED/
+
+# File types to fetch from the knowledge-base metadata; empty disables type filtering. Takes precedence over the name list.
+knowledgebase.type.list=ip_location,asn
+
+# File names to fetch from the knowledge-base metadata; empty disables name filtering
+knowledgebase.name.list=ip_v4_built_in,ip_v6_built_in,ip_v4_user_defined,ip_v6_user_defined,asn_v4,asn_v6
+
+# Tools library path, holding key files and the like.
+tools.library=/home/tsg/olap/topology/data/
+
+#-------------------------------- Nacos configuration ------------------------------#
+# Nacos server address
+nacos.server=192.168.20.252:8848
+
+# Schema namespace
+nacos.schema.namespace=MSH
+
+# Schema data id
+nacos.schema.data.id=gtpc_record.json
+
+# Knowledge-base namespace
+nacos.knowledgebase.namespace=
+
+# Knowledge-base data id
+nacos.knowledgebase.data.id=knowledge_base.json
+
+#-------------------------------- Kafka consumer/producer configuration ------------------------------#
+# Kafka topic to consume from
+source.kafka.topic=GTPC-RECORD
+
+# Output topic for the enriched records
+sink.kafka.topic=GTPC-RECORD-COMPLETED
+
+# Consumer group id; stores this source's offsets so the next read resumes without repeating data. Name it after the topology.
+group.id=gtpc-record-log-20220408-1
+
+#-------------------------------- Topology configuration ------------------------------#
+# Consumer parallelism
+source.parallelism=3
+
+# Transform function parallelism
+transform.parallelism=3
+
+# Kafka producer parallelism
+sink.parallelism=3
+
+# Data-center id, valid range 0-31
+data.center.id.num=3
+
+# HBase cache refresh interval in seconds; 0 disables cache refresh
+hbase.tick.tuple.freq.secs=180
+
+#-------------------------------- Default values ------------------------------#
+
+# 0 = pass logs through without enrichment; 1 = enrich
+log.need.complete=1
+
+# Producer compression mode: none or snappy
+producer.kafka.compression.type=snappy
diff --git a/MSH-PIC/flink/topology/completion/config/ETL-INTERIM-SESSION-RECORD-COMPLETED b/MSH-PIC/flink/topology/completion/config/ETL-INTERIM-SESSION-RECORD-COMPLETED
new file mode 100644
index 0000000..7d30bd9
--- /dev/null
+++ b/MSH-PIC/flink/topology/completion/config/ETL-INTERIM-SESSION-RECORD-COMPLETED
@@ -0,0 +1,78 @@
+#-------------------------------- Address configuration ------------------------------#
+# Kafka brokers to consume from (management cluster)
+source.kafka.servers=192.168.20.193:9094,192.168.20.194:9094,192.168.20.195:9094
+
+# Kafka brokers to write the output to
+sink.kafka.servers=192.168.20.193:9094,192.168.20.194:9094,192.168.20.195:9094
+
+# ZooKeeper quorum used to allocate log_id and to connect to HBase
+zookeeper.servers=192.168.20.193:2181,192.168.20.194:2181,192.168.20.195:2181
+
+# HDFS namenodes used to fetch the location knowledge base
+hdfs.servers=192.168.20.193:9000,192.168.20.194:9000
+
+#-------------------------------- HTTP / location knowledge base ------------------------------#
+# File system type holding the knowledge base: hdfs or local
+knowledgebase.file.storage.type=hdfs
+
+# Knowledge-base path; fill in the path that matches the file.system.type setting.
+knowledgebase.file.storage.path=/knowledgebase/ETL-INTERIM-SESSION-RECORD-COMPLETED/
+
+# File types to fetch from the knowledge-base metadata; empty disables type filtering. Takes precedence over the name list.
+knowledgebase.type.list=ip_location,asn
+
+# File names to fetch from the knowledge-base metadata; empty disables name filtering
+knowledgebase.name.list=ip_v4_built_in,ip_v6_built_in,ip_v4_user_defined,ip_v6_user_defined,asn_v4,asn_v6
+
+# Tools library path, holding key files and the like.
+tools.library=/home/tsg/olap/topology/data/
+
+#-------------------------------- Nacos configuration ------------------------------#
+# Nacos server address
+nacos.server=192.168.20.252:8848
+
+# Schema namespace
+nacos.schema.namespace=MSH
+
+# Schema data id
+nacos.schema.data.id=interim_session_record.json
+
+# Knowledge-base namespace
+nacos.knowledgebase.namespace=
+
+# Knowledge-base data id
+nacos.knowledgebase.data.id=knowledge_base.json
+
+#-------------------------------- Kafka consumer/producer configuration ------------------------------#
+# Kafka topic to consume from
+source.kafka.topic=INTERIM-SESSION-RECORD
+
+# Output topic for the enriched records
+sink.kafka.topic=INTERIM-SESSION-RECORD-COMPLETED
+
+# Consumer group id; stores this source's offsets so the next read resumes without repeating data. Name it after the topology.
+group.id=linterim-session-record-20220408-1
+
+#-------------------------------- Topology configuration ------------------------------#
+# Consumer parallelism
+source.parallelism=3
+
+# Transform function parallelism
+transform.parallelism=3
+
+# Kafka producer parallelism
+sink.parallelism=3
+
+# Data-center id, valid range 0-31
+data.center.id.num=4
+
+# HBase cache refresh interval in seconds; 0 disables cache refresh
+hbase.tick.tuple.freq.secs=180
+
+#-------------------------------- Default values ------------------------------#
+
+# 0 = pass logs through without enrichment; 1 = enrich
+log.need.complete=1
+
+# Producer compression mode: none or snappy
+producer.kafka.compression.type=snappy
diff --git a/MSH-PIC/flink/topology/completion/config/ETL-PROXY-EVENT-COMPLETED b/MSH-PIC/flink/topology/completion/config/ETL-PROXY-EVENT-COMPLETED
new file mode 100644
index 0000000..74af8f9
--- /dev/null
+++ b/MSH-PIC/flink/topology/completion/config/ETL-PROXY-EVENT-COMPLETED
@@ -0,0 +1,78 @@
+#-------------------------------- Address configuration ------------------------------#
+# Kafka brokers to consume from (management cluster)
+source.kafka.servers=192.168.20.193:9094,192.168.20.194:9094,192.168.20.195:9094
+
+# Kafka brokers to write the output to
+sink.kafka.servers=192.168.20.193:9094,192.168.20.194:9094,192.168.20.195:9094
+
+# ZooKeeper quorum used to allocate log_id and to connect to HBase
+zookeeper.servers=192.168.20.193:2181,192.168.20.194:2181,192.168.20.195:2181
+
+# HDFS namenodes used to fetch the location knowledge base
+hdfs.servers=192.168.20.193:9000,192.168.20.194:9000
+
+#-------------------------------- HTTP / location knowledge base ------------------------------#
+# File system type holding the knowledge base: hdfs or local
+knowledgebase.file.storage.type=hdfs
+
+# Knowledge-base path; fill in the path that matches the file.system.type setting.
+knowledgebase.file.storage.path=/knowledgebase/ETL-PROXY-EVENT-COMPLETED/
+
+# File types to fetch from the knowledge-base metadata; empty disables type filtering. Takes precedence over the name list.
+knowledgebase.type.list=ip_location,asn
+
+# File names to fetch from the knowledge-base metadata; empty disables name filtering
+knowledgebase.name.list=ip_v4_built_in,ip_v6_built_in,ip_v4_user_defined,ip_v6_user_defined,asn_v4,asn_v6
+
+# Tools library path, holding key files and the like.
+tools.library=/home/tsg/olap/topology/data/
+
+#-------------------------------- Nacos configuration ------------------------------#
+# Nacos server address
+nacos.server=192.168.20.252:8848
+
+# Schema namespace
+nacos.schema.namespace=MSH
+
+# Schema data id
+nacos.schema.data.id=proxy_event.json
+
+# Knowledge-base namespace
+nacos.knowledgebase.namespace=
+
+# Knowledge-base data id
+nacos.knowledgebase.data.id=knowledge_base.json
+
+#-------------------------------- Kafka consumer/producer configuration ------------------------------#
+# Kafka topic to consume from
+source.kafka.topic=PROXY-EVENT
+
+# Output topic for the enriched records
+sink.kafka.topic=PROXY-EVENT-COMPLETED
+
+# Consumer group id; stores this source's offsets so the next read resumes without repeating data. Name it after the topology.
+group.id=proxy-event-20220408-1
+
+#-------------------------------- Topology configuration ------------------------------#
+# Consumer parallelism
+source.parallelism=24
+
+# Transform function parallelism
+transform.parallelism=24
+
+# Kafka producer parallelism
+sink.parallelism=24
+
+# Data-center id, valid range 0-31
+data.center.id.num=5
+
+# HBase cache refresh interval in seconds; 0 disables cache refresh
+hbase.tick.tuple.freq.secs=180
+
+#-------------------------------- Default values ------------------------------#
+
+# 0 = pass logs through without enrichment; 1 = enrich
+log.need.complete=1
+
+# Producer compression mode: none or snappy
+producer.kafka.compression.type=snappy
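
[Editor's note: the proxy-event and security-event topologies run at parallelism 24 rather than 3; Kafka consumer subtasks beyond the partition count of the source topic sit idle, so the topic should have at least 24 partitions. A quick check, assuming the stock Kafka CLI:

  kafka-topics.sh --bootstrap-server 192.168.20.193:9094 --describe --topic PROXY-EVENT
]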
diff --git a/MSH-PIC/flink/topology/completion/config/ETL-RADIUS-RECORD-COMPLETED b/MSH-PIC/flink/topology/completion/config/ETL-RADIUS-RECORD-COMPLETED
new file mode 100644
index 0000000..120a247
--- /dev/null
+++ b/MSH-PIC/flink/topology/completion/config/ETL-RADIUS-RECORD-COMPLETED
@@ -0,0 +1,78 @@
+#-------------------------------- Address configuration ------------------------------#
+# Kafka brokers to consume from (management cluster)
+source.kafka.servers=192.168.20.193:9094,192.168.20.194:9094,192.168.20.195:9094
+
+# Kafka brokers to write the output to
+sink.kafka.servers=192.168.20.193:9094,192.168.20.194:9094,192.168.20.195:9094
+
+# ZooKeeper quorum used to allocate log_id and to connect to HBase
+zookeeper.servers=192.168.20.193:2181,192.168.20.194:2181,192.168.20.195:2181
+
+# HDFS namenodes used to fetch the location knowledge base
+hdfs.servers=192.168.20.193:9000,192.168.20.194:9000
+
+#-------------------------------- HTTP / location knowledge base ------------------------------#
+# File system type holding the knowledge base: hdfs or local
+knowledgebase.file.storage.type=hdfs
+
+# Knowledge-base path; fill in the path that matches the file.system.type setting.
+knowledgebase.file.storage.path=/knowledgebase/ETL-RADIUS-RECORD-COMPLETED/
+
+# File types to fetch from the knowledge-base metadata; empty disables type filtering. Takes precedence over the name list.
+knowledgebase.type.list=ip_location,asn
+
+# File names to fetch from the knowledge-base metadata; empty disables name filtering
+knowledgebase.name.list=ip_v4_built_in,ip_v6_built_in,ip_v4_user_defined,ip_v6_user_defined,asn_v4,asn_v6
+
+# Tools library path, holding key files and the like.
+tools.library=/home/tsg/olap/topology/data/
+
+#-------------------------------- Nacos configuration ------------------------------#
+# Nacos server address
+nacos.server=192.168.20.252:8848
+
+# Schema namespace
+nacos.schema.namespace=MSH
+
+# Schema data id
+nacos.schema.data.id=radius_record.json
+
+# Knowledge-base namespace
+nacos.knowledgebase.namespace=
+
+# Knowledge-base data id
+nacos.knowledgebase.data.id=knowledge_base.json
+
+#-------------------------------- Kafka consumer/producer configuration ------------------------------#
+# Kafka topic to consume from
+source.kafka.topic=RADIUS-RECORD
+
+# Output topic for the enriched records
+sink.kafka.topic=RADIUS-RECORD-COMPLETED
+
+# Consumer group id; stores this source's offsets so the next read resumes without repeating data. Name it after the topology.
+group.id=radius-record-log-20220408-1
+
+#-------------------------------- Topology configuration ------------------------------#
+# Consumer parallelism
+source.parallelism=3
+
+# Transform function parallelism
+transform.parallelism=3
+
+# Kafka producer parallelism
+sink.parallelism=3
+
+# Data-center id, valid range 0-31
+data.center.id.num=6
+
+# HBase cache refresh interval in seconds; 0 disables cache refresh
+hbase.tick.tuple.freq.secs=180
+
+#-------------------------------- Default values ------------------------------#
+
+# 0 = pass logs through without enrichment; 1 = enrich
+log.need.complete=1
+
+# Producer compression mode: none or snappy
+producer.kafka.compression.type=snappy
diff --git a/MSH-PIC/flink/topology/completion/config/ETL-SECURITY-EVENT-COMPLETED b/MSH-PIC/flink/topology/completion/config/ETL-SECURITY-EVENT-COMPLETED
new file mode 100644
index 0000000..9e22783
--- /dev/null
+++ b/MSH-PIC/flink/topology/completion/config/ETL-SECURITY-EVENT-COMPLETED
@@ -0,0 +1,78 @@
+#-------------------------------- Address configuration ------------------------------#
+# Kafka brokers to consume from (management cluster)
+source.kafka.servers=192.168.20.193:9094,192.168.20.194:9094,192.168.20.195:9094
+
+# Kafka brokers to write the output to
+sink.kafka.servers=192.168.20.193:9094,192.168.20.194:9094,192.168.20.195:9094
+
+# ZooKeeper quorum used to allocate log_id and to connect to HBase
+zookeeper.servers=192.168.20.193:2181,192.168.20.194:2181,192.168.20.195:2181
+
+# HDFS namenodes used to fetch the location knowledge base
+hdfs.servers=192.168.20.193:9000,192.168.20.194:9000
+
+#-------------------------------- HTTP / location knowledge base ------------------------------#
+# File system type holding the knowledge base: hdfs or local
+knowledgebase.file.storage.type=hdfs
+
+# Knowledge-base path; fill in the path that matches the file.system.type setting.
+knowledgebase.file.storage.path=/knowledgebase/ETL-SECURITY-EVENT-COMPLETED/
+
+# File types to fetch from the knowledge-base metadata; empty disables type filtering. Takes precedence over the name list.
+knowledgebase.type.list=ip_location,asn
+
+# File names to fetch from the knowledge-base metadata; empty disables name filtering
+knowledgebase.name.list=ip_v4_built_in,ip_v6_built_in,ip_v4_user_defined,ip_v6_user_defined,asn_v4,asn_v6
+
+# Tools library path, holding key files and the like.
+tools.library=/home/tsg/olap/topology/data/
+
+#-------------------------------- Nacos configuration ------------------------------#
+# Nacos server address
+nacos.server=192.168.20.252:8848
+
+# Schema namespace
+nacos.schema.namespace=MSH
+
+# Schema data id
+nacos.schema.data.id=security_event.json
+
+# Knowledge-base namespace
+nacos.knowledgebase.namespace=
+
+# Knowledge-base data id
+nacos.knowledgebase.data.id=knowledge_base.json
+
+#-------------------------------- Kafka consumer/producer configuration ------------------------------#
+# Kafka topic to consume from
+source.kafka.topic=SECURITY-EVENT
+
+# Output topic for the enriched records
+sink.kafka.topic=SECURITY-EVENT-COMPLETED
+
+# Consumer group id; stores this source's offsets so the next read resumes without repeating data. Name it after the topology.
+group.id=security-event-log-20220408-1
+
+#-------------------------------- Topology configuration ------------------------------#
+# Consumer parallelism
+source.parallelism=24
+
+# Transform function parallelism
+transform.parallelism=24
+
+# Kafka producer parallelism
+sink.parallelism=24
+
+# Data-center id, valid range 0-31
+data.center.id.num=7
+
+# HBase cache refresh interval in seconds; 0 disables cache refresh
+hbase.tick.tuple.freq.secs=180
+
+#-------------------------------- Default values ------------------------------#
+
+# 0 = pass logs through without enrichment; 1 = enrich
+log.need.complete=1
+
+# Producer compression mode: none or snappy
+producer.kafka.compression.type=snappy
diff --git a/MSH-PIC/flink/topology/completion/config/ETL-SESSION-RECORD-COMPLETED b/MSH-PIC/flink/topology/completion/config/ETL-SESSION-RECORD-COMPLETED
new file mode 100644
index 0000000..1638f99
--- /dev/null
+++ b/MSH-PIC/flink/topology/completion/config/ETL-SESSION-RECORD-COMPLETED
@@ -0,0 +1,78 @@
+#-------------------------------- Address configuration ------------------------------#
+# Kafka brokers to consume from (management cluster)
+source.kafka.servers=192.168.20.193:9094,192.168.20.194:9094,192.168.20.195:9094
+
+# Kafka brokers to write the output to
+sink.kafka.servers=192.168.20.193:9094,192.168.20.194:9094,192.168.20.195:9094
+
+# ZooKeeper quorum used to allocate log_id and to connect to HBase
+zookeeper.servers=192.168.20.193:2181,192.168.20.194:2181,192.168.20.195:2181
+
+# HDFS namenodes used to fetch the location knowledge base
+hdfs.servers=192.168.20.193:9000,192.168.20.194:9000
+
+#-------------------------------- HTTP / location knowledge base ------------------------------#
+# File system type holding the knowledge base: hdfs or local
+knowledgebase.file.storage.type=hdfs
+
+# Knowledge-base path; fill in the path that matches the file.system.type setting.
+knowledgebase.file.storage.path=/knowledgebase/ETL-SESSION-RECORD-COMPLETED/
+
+# File types to fetch from the knowledge-base metadata; empty disables type filtering. Takes precedence over the name list.
+knowledgebase.type.list=ip_location,asn
+
+# File names to fetch from the knowledge-base metadata; empty disables name filtering
+knowledgebase.name.list=ip_v4_built_in,ip_v6_built_in,ip_v4_user_defined,ip_v6_user_defined,asn_v4,asn_v6
+
+# Tools library path, holding key files and the like.
+tools.library=/home/tsg/olap/topology/data/
+
+#-------------------------------- Nacos configuration ------------------------------#
+# Nacos server address
+nacos.server=192.168.20.252:8848
+
+# Schema namespace
+nacos.schema.namespace=MSH
+
+# Schema data id
+nacos.schema.data.id=session_record.json
+
+# Knowledge-base namespace
+nacos.knowledgebase.namespace=
+
+# Knowledge-base data id
+nacos.knowledgebase.data.id=knowledge_base.json
+
+#-------------------------------- Kafka consumer/producer configuration ------------------------------#
+# Kafka topic to consume from
+source.kafka.topic=SESSION-RECORD
+
+# Output topic for the enriched records
+sink.kafka.topic=SESSION-RECORD-COMPLETED
+
+# Consumer group id; stores this source's offsets so the next read resumes without repeating data. Name it after the topology.
+group.id=session-record-log-20220408-1
+
+#-------------------------------- Topology configuration ------------------------------#
+# Consumer parallelism
+source.parallelism=3
+
+# Transform function parallelism
+transform.parallelism=3
+
+# Kafka producer parallelism
+sink.parallelism=3
+
+# Data-center id, valid range 0-31
+data.center.id.num=8
+
+# HBase cache refresh interval in seconds; 0 disables cache refresh
+hbase.tick.tuple.freq.secs=180
+
+#-------------------------------- Default values ------------------------------#
+
+# 0 = pass logs through without enrichment; 1 = enrich
+log.need.complete=1
+
+# Producer compression mode: none or snappy
+producer.kafka.compression.type=snappy
diff --git a/MSH-PIC/flink/topology/completion/config/ETL-TRANSACTION-RECORD-COMPLETED b/MSH-PIC/flink/topology/completion/config/ETL-TRANSACTION-RECORD-COMPLETED
new file mode 100644
index 0000000..a862abc
--- /dev/null
+++ b/MSH-PIC/flink/topology/completion/config/ETL-TRANSACTION-RECORD-COMPLETED
@@ -0,0 +1,78 @@
+#-------------------------------- Address configuration ------------------------------#
+# Kafka brokers to consume from (management cluster)
+source.kafka.servers=192.168.20.193:9094,192.168.20.194:9094,192.168.20.195:9094
+
+# Kafka brokers to write the output to
+sink.kafka.servers=192.168.20.193:9094,192.168.20.194:9094,192.168.20.195:9094
+
+# ZooKeeper quorum used to allocate log_id and to connect to HBase
+zookeeper.servers=192.168.20.193:2181,192.168.20.194:2181,192.168.20.195:2181
+
+# HDFS namenodes used to fetch the location knowledge base
+hdfs.servers=192.168.20.193:9000,192.168.20.194:9000
+
+#-------------------------------- HTTP / location knowledge base ------------------------------#
+# File system type holding the knowledge base: hdfs or local
+knowledgebase.file.storage.type=hdfs
+
+# Knowledge-base path; fill in the path that matches the file.system.type setting.
+knowledgebase.file.storage.path=/knowledgebase/ETL-TRANSACTION-RECORD-COMPLETED/
+
+# File types to fetch from the knowledge-base metadata; empty disables type filtering. Takes precedence over the name list.
+knowledgebase.type.list=ip_location,asn
+
+# File names to fetch from the knowledge-base metadata; empty disables name filtering
+knowledgebase.name.list=ip_v4_built_in,ip_v6_built_in,ip_v4_user_defined,ip_v6_user_defined,asn_v4,asn_v6
+
+# Tools library path, holding key files and the like.
+tools.library=/home/tsg/olap/topology/data/
+
+#-------------------------------- Nacos configuration ------------------------------#
+# Nacos server address
+nacos.server=192.168.20.252:8848
+
+# Schema namespace
+nacos.schema.namespace=MSH
+
+# Schema data id
+nacos.schema.data.id=transaction_record.json
+
+# Knowledge-base namespace
+nacos.knowledgebase.namespace=
+
+# Knowledge-base data id
+nacos.knowledgebase.data.id=knowledge_base.json
+
+#-------------------------------- Kafka consumer/producer configuration ------------------------------#
+# Kafka topic to consume from
+source.kafka.topic=TRANSACTION-RECORD
+
+# Output topic for the enriched records
+sink.kafka.topic=TRANSACTION-RECORD-COMPLETED
+
+# Consumer group id; stores this source's offsets so the next read resumes without repeating data. Name it after the topology.
+group.id=transaction-record-20220408-1
+
+#-------------------------------- Topology configuration ------------------------------#
+# Consumer parallelism
+source.parallelism=3
+
+# Transform function parallelism
+transform.parallelism=3
+
+# Kafka producer parallelism
+sink.parallelism=3
+
+# Data-center id, valid range 0-31
+data.center.id.num=10
+
+# HBase cache refresh interval in seconds; 0 disables cache refresh
+hbase.tick.tuple.freq.secs=180
+
+#-------------------------------- Default values ------------------------------#
+
+# 0 = pass logs through without enrichment; 1 = enrich
+log.need.complete=1
+
+# Producer compression mode: none or snappy
+producer.kafka.compression.type=snappy
diff --git a/MSH-PIC/flink/topology/completion/config/MIRROR-DOS-SKETCH-RECORD b/MSH-PIC/flink/topology/completion/config/MIRROR-DOS-SKETCH-RECORD
new file mode 100644
index 0000000..9996c66
--- /dev/null
+++ b/MSH-PIC/flink/topology/completion/config/MIRROR-DOS-SKETCH-RECORD
@@ -0,0 +1,78 @@
+#-------------------------------- Address configuration ------------------------------#
+# Kafka brokers to consume from (management cluster)
+source.kafka.servers=192.168.20.193:9094,192.168.20.194:9094,192.168.20.195:9094
+
+# Kafka brokers to write the output to
+sink.kafka.servers=192.168.20.223:9094,192.168.20.224:9094,192.168.20.225:9094
+
+# ZooKeeper quorum used to allocate log_id and to connect to HBase
+zookeeper.servers=192.168.20.193:2181,192.168.20.194:2181,192.168.20.195:2181
+
+# HDFS namenodes used to fetch the location knowledge base
+hdfs.servers=192.168.20.193:9000,192.168.20.194:9000
+
+#-------------------------------- HTTP / location knowledge base ------------------------------#
+# File system type holding the knowledge base: hdfs or local
+knowledgebase.file.storage.type=hdfs
+
+# Knowledge-base path; fill in the path that matches the file.system.type setting.
+knowledgebase.file.storage.path=/knowledgebase/ETL-ACTIVE-DEFENCE-EVENT-COMPLETED/
+
+# File types to fetch from the knowledge-base metadata; empty disables type filtering. Takes precedence over the name list.
+knowledgebase.type.list=ip_location,asn
+
+# File names to fetch from the knowledge-base metadata; empty disables name filtering
+knowledgebase.name.list=ip_v4_built_in,ip_v6_built_in,ip_v4_user_defined,ip_v6_user_defined,asn_v4,asn_v6
+
+# Tools library path, holding key files and the like.
+tools.library=/home/tsg/olap/topology/data/
+
+#-------------------------------- Nacos configuration ------------------------------#
+# Nacos server address
+nacos.server=192.168.20.252:8848
+
+# Schema namespace
+nacos.schema.namespace=MSH
+
+# Schema data id
+nacos.schema.data.id=active_defence_event.json
+
+# Knowledge-base namespace
+nacos.knowledgebase.namespace=
+
+# Knowledge-base data id
+nacos.knowledgebase.data.id=knowledge_base.json
+
+#-------------------------------- Kafka consumer/producer configuration ------------------------------#
+# Kafka topic to consume from
+source.kafka.topic=DOS-SKETCH-RECORD
+
+# Output topic for the enriched records
+sink.kafka.topic=DOS-SKETCH-RECORD
+
+# Consumer group id; stores this source's offsets so the next read resumes without repeating data. Name it after the topology.
+group.id=dos-sketch-record-20230629-1
+
+#-------------------------------- Topology configuration ------------------------------#
+# Consumer parallelism
+source.parallelism=3
+
+# Transform function parallelism
+transform.parallelism=3
+
+# Kafka producer parallelism
+sink.parallelism=3
+
+# Data-center id, valid range 0-31
+data.center.id.num=1
+
+# HBase cache refresh interval in seconds; 0 disables cache refresh
+hbase.tick.tuple.freq.secs=180
+
+#-------------------------------- Default values ------------------------------#
+
+# 0 = pass logs through without enrichment; 1 = enrich
+log.need.complete=0
+
+# Producer compression mode: none or snappy
+producer.kafka.compression.type=snappy
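
[Editor's note: unlike the ETL-* configs, the MIRROR-* configs set log.need.complete=0 and point sink.kafka.servers at a second cluster (192.168.20.223-225), so each job forwards records verbatim between clusters under the same topic name. To confirm records are landing on the target cluster — a sketch assuming the stock Kafka console consumer:

  kafka-console-consumer.sh --bootstrap-server 192.168.20.223:9094 --topic DOS-SKETCH-RECORD --max-messages 1
]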
diff --git a/MSH-PIC/flink/topology/completion/config/MIRROR-GTPC-RECORD-COMPLETED b/MSH-PIC/flink/topology/completion/config/MIRROR-GTPC-RECORD-COMPLETED
new file mode 100644
index 0000000..7c2ab7f
--- /dev/null
+++ b/MSH-PIC/flink/topology/completion/config/MIRROR-GTPC-RECORD-COMPLETED
@@ -0,0 +1,78 @@
+#-------------------------------- Address configuration ------------------------------#
+# Kafka brokers to consume from (management cluster)
+source.kafka.servers=192.168.20.193:9094,192.168.20.194:9094,192.168.20.195:9094
+
+# Kafka brokers to write the output to
+sink.kafka.servers=192.168.20.223:9094,192.168.20.224:9094,192.168.20.225:9094
+
+# ZooKeeper quorum used to allocate log_id and to connect to HBase
+zookeeper.servers=192.168.20.193:2181,192.168.20.194:2181,192.168.20.195:2181
+
+# HDFS namenodes used to fetch the location knowledge base
+hdfs.servers=192.168.20.193:9000,192.168.20.194:9000
+
+#-------------------------------- HTTP / location knowledge base ------------------------------#
+# File system type holding the knowledge base: hdfs or local
+knowledgebase.file.storage.type=hdfs
+
+# Knowledge-base path; fill in the path that matches the file.system.type setting.
+knowledgebase.file.storage.path=/knowledgebase/ETL-ACTIVE-DEFENCE-EVENT-COMPLETED/
+
+# File types to fetch from the knowledge-base metadata; empty disables type filtering. Takes precedence over the name list.
+knowledgebase.type.list=ip_location,asn
+
+# File names to fetch from the knowledge-base metadata; empty disables name filtering
+knowledgebase.name.list=ip_v4_built_in,ip_v6_built_in,ip_v4_user_defined,ip_v6_user_defined,asn_v4,asn_v6
+
+# Tools library path, holding key files and the like.
+tools.library=/home/tsg/olap/topology/data/
+
+#-------------------------------- Nacos configuration ------------------------------#
+# Nacos server address
+nacos.server=192.168.20.252:8848
+
+# Schema namespace
+nacos.schema.namespace=MSH
+
+# Schema data id
+nacos.schema.data.id=active_defence_event.json
+
+# Knowledge-base namespace
+nacos.knowledgebase.namespace=
+
+# Knowledge-base data id
+nacos.knowledgebase.data.id=knowledge_base.json
+
+#-------------------------------- Kafka consumer/producer configuration ------------------------------#
+# Kafka topic to consume from
+source.kafka.topic=GTPC-RECORD-COMPLETED
+
+# Output topic for the enriched records
+sink.kafka.topic=GTPC-RECORD-COMPLETED
+
+# Consumer group id; stores this source's offsets so the next read resumes without repeating data. Name it after the topology.
+group.id=gtpc-record-completed-20230629-1
+
+#-------------------------------- Topology configuration ------------------------------#
+# Consumer parallelism
+source.parallelism=3
+
+# Transform function parallelism
+transform.parallelism=3
+
+# Kafka producer parallelism
+sink.parallelism=3
+
+# Data-center id, valid range 0-31
+data.center.id.num=1
+
+# HBase cache refresh interval in seconds; 0 disables cache refresh
+hbase.tick.tuple.freq.secs=180
+
+#-------------------------------- Default values ------------------------------#
+
+# 0 = pass logs through without enrichment; 1 = enrich
+log.need.complete=0
+
+# Producer compression mode: none or snappy
+producer.kafka.compression.type=snappy
diff --git a/MSH-PIC/flink/topology/completion/config/MIRROR-INTERNAL-PACKET-CAPTURE-EVENT b/MSH-PIC/flink/topology/completion/config/MIRROR-INTERNAL-PACKET-CAPTURE-EVENT
new file mode 100644
index 0000000..c4363e4
--- /dev/null
+++ b/MSH-PIC/flink/topology/completion/config/MIRROR-INTERNAL-PACKET-CAPTURE-EVENT
@@ -0,0 +1,78 @@
+#-------------------------------- Address configuration ------------------------------#
+# Kafka brokers to consume from (management cluster)
+source.kafka.servers=192.168.20.193:9094,192.168.20.194:9094,192.168.20.195:9094
+
+# Kafka brokers to write the output to
+sink.kafka.servers=192.168.20.223:9094,192.168.20.224:9094,192.168.20.225:9094
+
+# ZooKeeper quorum used to allocate log_id and to connect to HBase
+zookeeper.servers=192.168.20.193:2181,192.168.20.194:2181,192.168.20.195:2181
+
+# HDFS namenodes used to fetch the location knowledge base
+hdfs.servers=192.168.20.193:9000,192.168.20.194:9000
+
+#-------------------------------- HTTP / location knowledge base ------------------------------#
+# File system type holding the knowledge base: hdfs or local
+knowledgebase.file.storage.type=hdfs
+
+# Knowledge-base path; fill in the path that matches the file.system.type setting.
+knowledgebase.file.storage.path=/knowledgebase/ETL-ACTIVE-DEFENCE-EVENT-COMPLETED/
+
+# File types to fetch from the knowledge-base metadata; empty disables type filtering. Takes precedence over the name list.
+knowledgebase.type.list=ip_location,asn
+
+# File names to fetch from the knowledge-base metadata; empty disables name filtering
+knowledgebase.name.list=ip_v4_built_in,ip_v6_built_in,ip_v4_user_defined,ip_v6_user_defined,asn_v4,asn_v6
+
+# Tools library path, holding key files and the like.
+tools.library=/home/tsg/olap/topology/data/
+
+#-------------------------------- Nacos configuration ------------------------------#
+# Nacos server address
+nacos.server=192.168.20.252:8848
+
+# Schema namespace
+nacos.schema.namespace=MSH
+
+# Schema data id
+nacos.schema.data.id=active_defence_event.json
+
+# Knowledge-base namespace
+nacos.knowledgebase.namespace=
+
+# Knowledge-base data id
+nacos.knowledgebase.data.id=knowledge_base.json
+
+#-------------------------------- Kafka consumer/producer configuration ------------------------------#
+# Kafka topic to consume from
+source.kafka.topic=INTERNAL-PACKET-CAPTURE-EVENT
+
+# Output topic for the enriched records
+sink.kafka.topic=INTERNAL-PACKET-CAPTURE-EVENT
+
+# Consumer group id; stores this source's offsets so the next read resumes without repeating data. Name it after the topology.
+group.id=internal-packet-capture-event-20230629-1
+
+#-------------------------------- Topology configuration ------------------------------#
+# Consumer parallelism
+source.parallelism=3
+
+# Transform function parallelism
+transform.parallelism=3
+
+# Kafka producer parallelism
+sink.parallelism=3
+
+# Data-center id, valid range 0-31
+data.center.id.num=1
+
+# HBase cache refresh interval in seconds; 0 disables cache refresh
+hbase.tick.tuple.freq.secs=180
+
+#-------------------------------- Default values ------------------------------#
+
+# 0 = pass logs through without enrichment; 1 = enrich
+log.need.complete=0
+
+# Producer compression mode: none or snappy
+producer.kafka.compression.type=snappy
diff --git a/MSH-PIC/flink/topology/completion/config/MIRROR-NETWORK-TRAFFIC-METRICS b/MSH-PIC/flink/topology/completion/config/MIRROR-NETWORK-TRAFFIC-METRICS
new file mode 100644
index 0000000..45ef126
--- /dev/null
+++ b/MSH-PIC/flink/topology/completion/config/MIRROR-NETWORK-TRAFFIC-METRICS
@@ -0,0 +1,78 @@
+#-------------------------------- Address configuration ------------------------------#
+# Kafka brokers to consume from (management cluster)
+source.kafka.servers=192.168.20.193:9094,192.168.20.194:9094,192.168.20.195:9094
+
+# Kafka brokers to write the output to
+sink.kafka.servers=192.168.20.223:9094,192.168.20.224:9094,192.168.20.225:9094
+
+# ZooKeeper quorum used to allocate log_id and to connect to HBase
+zookeeper.servers=192.168.20.193:2181,192.168.20.194:2181,192.168.20.195:2181
+
+# HDFS namenodes used to fetch the location knowledge base
+hdfs.servers=192.168.20.193:9000,192.168.20.194:9000
+
+#-------------------------------- HTTP / location knowledge base ------------------------------#
+# File system type holding the knowledge base: hdfs or local
+knowledgebase.file.storage.type=hdfs
+
+# Knowledge-base path; fill in the path that matches the file.system.type setting.
+knowledgebase.file.storage.path=/knowledgebase/ETL-ACTIVE-DEFENCE-EVENT-COMPLETED/
+
+# File types to fetch from the knowledge-base metadata; empty disables type filtering. Takes precedence over the name list.
+knowledgebase.type.list=ip_location,asn
+
+# File names to fetch from the knowledge-base metadata; empty disables name filtering
+knowledgebase.name.list=ip_v4_built_in,ip_v6_built_in,ip_v4_user_defined,ip_v6_user_defined,asn_v4,asn_v6
+
+# Tools library path, holding key files and the like.
+tools.library=/home/tsg/olap/topology/data/
+
+#-------------------------------- Nacos configuration ------------------------------#
+# Nacos server address
+nacos.server=192.168.20.252:8848
+
+# Schema namespace
+nacos.schema.namespace=MSH
+
+# Schema data id
+nacos.schema.data.id=active_defence_event.json
+
+# Knowledge-base namespace
+nacos.knowledgebase.namespace=
+
+# Knowledge-base data id
+nacos.knowledgebase.data.id=knowledge_base.json
+
+#-------------------------------- Kafka consumer/producer configuration ------------------------------#
+# Kafka topic to consume from
+source.kafka.topic=NETWORK-TRAFFIC-METRICS
+
+# Output topic for the enriched records
+sink.kafka.topic=NETWORK-TRAFFIC-METRICS
+
+# Consumer group id; stores this source's offsets so the next read resumes without repeating data. Name it after the topology.
+group.id=network-traffic-metrics-20230629-1
+
+#-------------------------------- Topology configuration ------------------------------#
+# Consumer parallelism
+source.parallelism=3
+
+# Transform function parallelism
+transform.parallelism=3
+
+# Kafka producer parallelism
+sink.parallelism=3
+
+# Data-center id, valid range 0-31
+data.center.id.num=1
+
+# HBase cache refresh interval in seconds; 0 disables cache refresh
+hbase.tick.tuple.freq.secs=180
+
+#-------------------------------- Default values ------------------------------#
+
+# 0 = pass logs through without enrichment; 1 = enrich
+log.need.complete=0
+
+# Producer compression mode: none or snappy
+producer.kafka.compression.type=snappy
diff --git a/MSH-PIC/flink/topology/completion/config/MIRROR-POLICY-RULE-METRICS b/MSH-PIC/flink/topology/completion/config/MIRROR-POLICY-RULE-METRICS
new file mode 100644
index 0000000..da02fdf
--- /dev/null
+++ b/MSH-PIC/flink/topology/completion/config/MIRROR-POLICY-RULE-METRICS
@@ -0,0 +1,78 @@
+#-------------------------------- Address configuration ------------------------------#
+# Kafka brokers to consume from (management cluster)
+source.kafka.servers=192.168.20.193:9094,192.168.20.194:9094,192.168.20.195:9094
+
+# Kafka brokers to write the output to
+sink.kafka.servers=192.168.20.223:9094,192.168.20.224:9094,192.168.20.225:9094
+
+# ZooKeeper quorum used to allocate log_id and to connect to HBase
+zookeeper.servers=192.168.20.193:2181,192.168.20.194:2181,192.168.20.195:2181
+
+# HDFS namenodes used to fetch the location knowledge base
+hdfs.servers=192.168.20.193:9000,192.168.20.194:9000
+
+#-------------------------------- HTTP / location knowledge base ------------------------------#
+# File system type holding the knowledge base: hdfs or local
+knowledgebase.file.storage.type=hdfs
+
+# Knowledge-base path; fill in the path that matches the file.system.type setting.
+knowledgebase.file.storage.path=/knowledgebase/ETL-ACTIVE-DEFENCE-EVENT-COMPLETED/
+
+# File types to fetch from the knowledge-base metadata; empty disables type filtering. Takes precedence over the name list.
+knowledgebase.type.list=ip_location,asn
+
+# File names to fetch from the knowledge-base metadata; empty disables name filtering
+knowledgebase.name.list=ip_v4_built_in,ip_v6_built_in,ip_v4_user_defined,ip_v6_user_defined,asn_v4,asn_v6
+
+# Tools library path, holding key files and the like.
+tools.library=/home/tsg/olap/topology/data/
+
+#-------------------------------- Nacos configuration ------------------------------#
+# Nacos server address
+nacos.server=192.168.20.252:8848
+
+# Schema namespace
+nacos.schema.namespace=MSH
+
+# Schema data id
+nacos.schema.data.id=active_defence_event.json
+
+# Knowledge-base namespace
+nacos.knowledgebase.namespace=
+
+# Knowledge-base data id
+nacos.knowledgebase.data.id=knowledge_base.json
+
+#-------------------------------- Kafka consumer/producer configuration ------------------------------#
+# Kafka topic to consume from
+source.kafka.topic=POLICY-RULE-METRICS
+
+# Output topic for the enriched records
+sink.kafka.topic=POLICY-RULE-METRICS
+
+# Consumer group id; stores this source's offsets so the next read resumes without repeating data. Name it after the topology.
+group.id=policy-rule-metrics-20230629-1
+
+#-------------------------------- Topology configuration ------------------------------#
+# Consumer parallelism
+source.parallelism=3
+
+# Transform function parallelism
+transform.parallelism=3
+
+# Kafka producer parallelism
+sink.parallelism=3
+
+# Data-center id, valid range 0-31
+data.center.id.num=1
+
+# HBase cache refresh interval in seconds; 0 disables cache refresh
+hbase.tick.tuple.freq.secs=180
+
+#-------------------------------- Default values ------------------------------#
+
+# 0 = pass logs through without enrichment; 1 = enrich
+log.need.complete=0
+
+# Producer compression mode: none or snappy
+producer.kafka.compression.type=snappy
diff --git a/MSH-PIC/flink/topology/completion/config/MIRROR-PXY-EXCH-INTERMEDIA-CERT b/MSH-PIC/flink/topology/completion/config/MIRROR-PXY-EXCH-INTERMEDIA-CERT
new file mode 100644
index 0000000..d7fc346
--- /dev/null
+++ b/MSH-PIC/flink/topology/completion/config/MIRROR-PXY-EXCH-INTERMEDIA-CERT
@@ -0,0 +1,78 @@
+#-------------------------------- Address configuration ------------------------------#
+# Kafka brokers to consume from (management cluster)
+source.kafka.servers=192.168.20.193:9094,192.168.20.194:9094,192.168.20.195:9094
+
+# Kafka brokers to write the output to
+sink.kafka.servers=192.168.20.223:9094,192.168.20.224:9094,192.168.20.225:9094
+
+# ZooKeeper quorum used to allocate log_id and to connect to HBase
+zookeeper.servers=192.168.20.193:2181,192.168.20.194:2181,192.168.20.195:2181
+
+# HDFS namenodes used to fetch the location knowledge base
+hdfs.servers=192.168.20.193:9000,192.168.20.194:9000
+
+#-------------------------------- HTTP / location knowledge base ------------------------------#
+# File system type holding the knowledge base: hdfs or local
+knowledgebase.file.storage.type=hdfs
+
+# Knowledge-base path; fill in the path that matches the file.system.type setting.
+knowledgebase.file.storage.path=/knowledgebase/ETL-ACTIVE-DEFENCE-EVENT-COMPLETED/
+
+# File types to fetch from the knowledge-base metadata; empty disables type filtering. Takes precedence over the name list.
+knowledgebase.type.list=ip_location,asn
+
+# File names to fetch from the knowledge-base metadata; empty disables name filtering
+knowledgebase.name.list=ip_v4_built_in,ip_v6_built_in,ip_v4_user_defined,ip_v6_user_defined,asn_v4,asn_v6
+
+# Tools library path, holding key files and the like.
+tools.library=/home/tsg/olap/topology/data/
+
+#-------------------------------- Nacos configuration ------------------------------#
+# Nacos server address
+nacos.server=192.168.20.252:8848
+
+# Schema namespace
+nacos.schema.namespace=MSH
+
+# Schema data id
+nacos.schema.data.id=active_defence_event.json
+
+# Knowledge-base namespace
+nacos.knowledgebase.namespace=
+
+# Knowledge-base data id
+nacos.knowledgebase.data.id=knowledge_base.json
+
+#-------------------------------- Kafka consumer/producer configuration ------------------------------#
+# Kafka topic to consume from
+source.kafka.topic=PXY-EXCH-INTERMEDIA-CERT
+
+# Output topic for the enriched records
+sink.kafka.topic=PXY-EXCH-INTERMEDIA-CERT
+
+# Consumer group id; stores this source's offsets so the next read resumes without repeating data. Name it after the topology.
+group.id=pxy-exch-intermedia-cert-1
+
+#-------------------------------- Topology configuration ------------------------------#
+# Consumer parallelism
+source.parallelism=3
+
+# Transform function parallelism
+transform.parallelism=3
+
+# Kafka producer parallelism
+sink.parallelism=3
+
+# Data-center id, valid range 0-31
+data.center.id.num=1
+
+# HBase cache refresh interval in seconds; 0 disables cache refresh
+hbase.tick.tuple.freq.secs=180
+
+#-------------------------------- Default values ------------------------------#
+
+# 0 = pass logs through without enrichment; 1 = enrich
+log.need.complete=0
+
+# Producer compression mode: none or snappy
+producer.kafka.compression.type=snappy
diff --git a/MSH-PIC/flink/topology/completion/config/MIRROR-SYS-PACKET-CAPTURE-EVENT b/MSH-PIC/flink/topology/completion/config/MIRROR-SYS-PACKET-CAPTURE-EVENT
new file mode 100644
index 0000000..b361532
--- /dev/null
+++ b/MSH-PIC/flink/topology/completion/config/MIRROR-SYS-PACKET-CAPTURE-EVENT
@@ -0,0 +1,78 @@
+#-------------------------------- Address configuration ------------------------------#
+# Kafka brokers to consume from (management cluster)
+source.kafka.servers=192.168.20.193:9094,192.168.20.194:9094,192.168.20.195:9094
+
+# Kafka brokers to write the output to
+sink.kafka.servers=192.168.20.223:9094,192.168.20.224:9094,192.168.20.225:9094
+
+# ZooKeeper quorum used to allocate log_id and to connect to HBase
+zookeeper.servers=192.168.20.193:2181,192.168.20.194:2181,192.168.20.195:2181
+
+# HDFS namenodes used to fetch the location knowledge base
+hdfs.servers=192.168.20.193:9000,192.168.20.194:9000
+
+#-------------------------------- HTTP / location knowledge base ------------------------------#
+# File system type holding the knowledge base: hdfs or local
+knowledgebase.file.storage.type=hdfs
+
+# Knowledge-base path; fill in the path that matches the file.system.type setting.
+knowledgebase.file.storage.path=/knowledgebase/ETL-ACTIVE-DEFENCE-EVENT-COMPLETED/
+
+# File types to fetch from the knowledge-base metadata; empty disables type filtering. Takes precedence over the name list.
+knowledgebase.type.list=ip_location,asn
+
+# File names to fetch from the knowledge-base metadata; empty disables name filtering
+knowledgebase.name.list=ip_v4_built_in,ip_v6_built_in,ip_v4_user_defined,ip_v6_user_defined,asn_v4,asn_v6
+
+# Tools library path, holding key files and the like.
+tools.library=/home/tsg/olap/topology/data/
+
+#-------------------------------- Nacos configuration ------------------------------#
+# Nacos server address
+nacos.server=192.168.20.252:8848
+
+# Schema namespace
+nacos.schema.namespace=MSH
+
+# Schema data id
+nacos.schema.data.id=active_defence_event.json
+
+# Knowledge-base namespace
+nacos.knowledgebase.namespace=
+
+# Knowledge-base data id
+nacos.knowledgebase.data.id=knowledge_base.json
+
+#-------------------------------- Kafka consumer/producer configuration ------------------------------#
+# Kafka topic to consume from
+source.kafka.topic=SYS-PACKET-CAPTURE-EVENT
+
+# Output topic for the enriched records
+sink.kafka.topic=SYS-PACKET-CAPTURE-EVENT
+
+# Consumer group id; stores this source's offsets so the next read resumes without repeating data. Name it after the topology.
+group.id=sys-packet-capture-event-20230629-1
+
+#-------------------------------- Topology configuration ------------------------------#
+# Consumer parallelism
+source.parallelism=3
+
+# Transform function parallelism
+transform.parallelism=3
+
+# Kafka producer parallelism
+sink.parallelism=3
+
+# Data-center id, valid range 0-31
+data.center.id.num=1
+
+# HBase cache refresh interval in seconds; 0 disables cache refresh
+hbase.tick.tuple.freq.secs=180
+
+#-------------------------------- Default values ------------------------------#
+
+# 0 = pass logs through without enrichment; 1 = enrich
+log.need.complete=0
+
+# Producer compression mode: none or snappy
+producer.kafka.compression.type=snappy
diff --git a/MSH-PIC/flink/topology/completion/config/MIRROR-TRAFFIC-TOP-METRICS b/MSH-PIC/flink/topology/completion/config/MIRROR-TRAFFIC-TOP-METRICS
new file mode 100644
index 0000000..5ee5b31
--- /dev/null
+++ b/MSH-PIC/flink/topology/completion/config/MIRROR-TRAFFIC-TOP-METRICS
@@ -0,0 +1,78 @@
+#-------------------------------- Address configuration ------------------------------#
+# Kafka brokers to consume from (management cluster)
+source.kafka.servers=192.168.20.193:9094,192.168.20.194:9094,192.168.20.195:9094
+
+# Kafka brokers to write the output to
+sink.kafka.servers=192.168.20.223:9094,192.168.20.224:9094,192.168.20.225:9094
+
+# ZooKeeper quorum used to allocate log_id and to connect to HBase
+zookeeper.servers=192.168.20.193:2181,192.168.20.194:2181,192.168.20.195:2181
+
+# HDFS namenodes used to fetch the location knowledge base
+hdfs.servers=192.168.20.193:9000,192.168.20.194:9000
+
+#-------------------------------- HTTP / location knowledge base ------------------------------#
+# File system type holding the knowledge base: hdfs or local
+knowledgebase.file.storage.type=hdfs
+
+# Knowledge-base path; fill in the path that matches the file.system.type setting.
+knowledgebase.file.storage.path=/knowledgebase/ETL-ACTIVE-DEFENCE-EVENT-COMPLETED/
+
+# File types to fetch from the knowledge-base metadata; empty disables type filtering. Takes precedence over the name list.
+knowledgebase.type.list=ip_location,asn
+
+# File names to fetch from the knowledge-base metadata; empty disables name filtering
+knowledgebase.name.list=ip_v4_built_in,ip_v6_built_in,ip_v4_user_defined,ip_v6_user_defined,asn_v4,asn_v6
+
+# Tools library path, holding key files and the like.
+tools.library=/home/tsg/olap/topology/data/
+
+#-------------------------------- Nacos configuration ------------------------------#
+# Nacos server address
+nacos.server=192.168.20.252:8848
+
+# Schema namespace
+nacos.schema.namespace=MSH
+
+# Schema data id
+nacos.schema.data.id=active_defence_event.json
+
+# Knowledge-base namespace
+nacos.knowledgebase.namespace=
+
+# Knowledge-base data id
+nacos.knowledgebase.data.id=knowledge_base.json
+
+#-------------------------------- Kafka consumer/producer configuration ------------------------------#
+# Kafka topic to consume from
+source.kafka.topic=TRAFFIC-TOP-METRICS
+
+# Output topic for the enriched records
+sink.kafka.topic=TRAFFIC-TOP-METRICS
+
+# Consumer group id; stores this source's offsets so the next read resumes without repeating data. Name it after the topology.
+group.id=traffic-top-metrics-20230629-1
+
+#-------------------------------- Topology configuration ------------------------------#
+# Consumer parallelism
+source.parallelism=3
+
+# Transform function parallelism
+transform.parallelism=3
+
+# Kafka producer parallelism
+sink.parallelism=3
+
+# Data-center id, valid range 0-31
+data.center.id.num=1
+
+# HBase cache refresh interval in seconds; 0 disables cache refresh
+hbase.tick.tuple.freq.secs=180
+
+#-------------------------------- Default values ------------------------------#
+
+# 0 = pass logs through without enrichment; 1 = enrich
+log.need.complete=0
+
+# Producer compression mode: none or snappy
+producer.kafka.compression.type=snappy
diff --git a/MSH-PIC/flink/topology/completion/config/MIRROR-VOIP-RECORD b/MSH-PIC/flink/topology/completion/config/MIRROR-VOIP-RECORD
new file mode 100644
index 0000000..21e7267
--- /dev/null
+++ b/MSH-PIC/flink/topology/completion/config/MIRROR-VOIP-RECORD
@@ -0,0 +1,78 @@
+#-------------------------------- Address configuration ------------------------------#
+# Kafka brokers to consume from (management cluster)
+source.kafka.servers=192.168.20.193:9094,192.168.20.194:9094,192.168.20.195:9094
+
+# Kafka brokers to write the output to
+sink.kafka.servers=192.168.20.223:9094,192.168.20.224:9094,192.168.20.225:9094
+
+# ZooKeeper quorum used to allocate log_id and to connect to HBase
+zookeeper.servers=192.168.20.193:2181,192.168.20.194:2181,192.168.20.195:2181
+
+# HDFS namenodes used to fetch the location knowledge base
+hdfs.servers=192.168.20.193:9000,192.168.20.194:9000
+
+#-------------------------------- HTTP / location knowledge base ------------------------------#
+# File system type holding the knowledge base: hdfs or local
+knowledgebase.file.storage.type=hdfs
+
+# Knowledge-base path; fill in the path that matches the file.system.type setting.
+knowledgebase.file.storage.path=/knowledgebase/ETL-ACTIVE-DEFENCE-EVENT-COMPLETED/
+
+# File types to fetch from the knowledge-base metadata; empty disables type filtering. Takes precedence over the name list.
+knowledgebase.type.list=ip_location,asn
+
+# File names to fetch from the knowledge-base metadata; empty disables name filtering
+knowledgebase.name.list=ip_v4_built_in,ip_v6_built_in,ip_v4_user_defined,ip_v6_user_defined,asn_v4,asn_v6
+
+# Tools library path, holding key files and the like.
+tools.library=/home/tsg/olap/topology/data/
+
+#-------------------------------- Nacos configuration ------------------------------#
+# Nacos server address
+nacos.server=192.168.20.252:8848
+
+# Schema namespace
+nacos.schema.namespace=MSH
+
+# Schema data id
+nacos.schema.data.id=active_defence_event.json
+
+# Knowledge-base namespace
+nacos.knowledgebase.namespace=
+
+# Knowledge-base data id
+nacos.knowledgebase.data.id=knowledge_base.json
+
+#-------------------------------- Kafka consumer/producer configuration ------------------------------#
+# Kafka topic to consume from
+source.kafka.topic=VOIP-RECORD
+
+# Output topic for the enriched records
+sink.kafka.topic=VOIP-RECORD
+
+# Consumer group id; stores this source's offsets so the next read resumes without repeating data. Name it after the topology.
+group.id=voip-record-20230629-1
+
+#-------------------------------- Topology configuration ------------------------------#
+# Consumer parallelism
+source.parallelism=3
+
+# Transform function parallelism
+transform.parallelism=3
+
+# Kafka producer parallelism
+sink.parallelism=3
+
+# Data-center id, valid range 0-31
+data.center.id.num=1
+
+# HBase cache refresh interval in seconds; 0 disables cache refresh
+hbase.tick.tuple.freq.secs=180
+
+#-------------------------------- Default values ------------------------------#
+
+# 0 = pass logs through without enrichment; 1 = enrich
+log.need.complete=0
+
+# Producer compression mode: none or snappy
+producer.kafka.compression.type=snappy
diff --git a/MSH-PIC/flink/topology/completion/service_flow_config.properties b/MSH-PIC/flink/topology/completion/service_flow_config.properties
new file mode 100644
index 0000000..5527d21
--- /dev/null
+++ b/MSH-PIC/flink/topology/completion/service_flow_config.properties
@@ -0,0 +1,78 @@
+#-------------------------------- Address configuration ------------------------------#
+# Kafka brokers to consume from (management cluster)
+source.kafka.servers=192.168.20.193:9094,192.168.20.194:9094,192.168.20.195:9094
+
+# Kafka brokers to write the output to
+sink.kafka.servers=192.168.20.193:9094,192.168.20.194:9094,192.168.20.195:9094
+
+# ZooKeeper quorum used to allocate log_id and to connect to HBase
+zookeeper.servers=192.168.20.193:2181,192.168.20.194:2181,192.168.20.195:2181
+
+# HDFS namenodes used to fetch the location knowledge base
+hdfs.servers=192.168.20.193:9000,192.168.20.194:9000
+
+#-------------------------------- HTTP / location knowledge base ------------------------------#
+# File system type holding the knowledge base: hdfs or local
+knowledgebase.file.storage.type=hdfs
+
+# Knowledge-base path; fill in the path that matches the file.system.type setting.
+knowledgebase.file.storage.path=/knowledgebase/ETL-SECURITY-EVENT-COMPLETED/
+
+# File types to fetch from the knowledge-base metadata; empty disables type filtering. Takes precedence over the name list.
+knowledgebase.type.list=ip_location,asn
+
+# File names to fetch from the knowledge-base metadata; empty disables name filtering
+knowledgebase.name.list=ip_v4_built_in,ip_v6_built_in,ip_v4_user_defined,ip_v6_user_defined,asn_v4,asn_v6
+
+# Tools library path, holding key files and the like.
+tools.library=/home/tsg/olap/topology/dat/
+
+#-------------------------------- Nacos configuration ------------------------------#
+# Nacos server address
+nacos.server=192.168.20.252:8848
+
+# Schema namespace
+nacos.schema.namespace=MSH
+
+# Schema data id
+nacos.schema.data.id=security_event.json
+
+# Knowledge-base namespace
+nacos.knowledgebase.namespace=
+
+# Knowledge-base data id
+nacos.knowledgebase.data.id=knowledge_base.json
+
+#-------------------------------- Kafka consumer/producer configuration ------------------------------#
+# Kafka topic to consume from
+source.kafka.topic=SECURITY-EVENT
+
+# Output topic for the enriched records
+sink.kafka.topic=SECURITY-EVENT-COMPLETED
+
+# Consumer group id; stores this source's offsets so the next read resumes without repeating data. Name it after the topology.
+group.id=security-event-log-20220408-1
+
+#-------------------------------- Topology configuration ------------------------------#
+# Consumer parallelism
+source.parallelism=24
+
+# Transform function parallelism
+transform.parallelism=24
+
+# Kafka producer parallelism
+sink.parallelism=24
+
+# Data-center id, valid range 0-31
+data.center.id.num=7
+
+# HBase cache refresh interval in seconds; 0 disables cache refresh
+hbase.tick.tuple.freq.secs=180
+
+#-------------------------------- Default values ------------------------------#
+
+# 0 = pass logs through without enrichment; 1 = enrich
+log.need.complete=1
+
+# Producer compression mode: none or snappy
+producer.kafka.compression.type=snappy
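
[Editor's note: this service_flow_config.properties is the copy packaged inside the topology jar; start.sh below extracts it with jar -xvf, overwrites it with each per-topology config from config/, and repacks the jar with jar -uvf before submitting. A sketch of the same swap done by hand, assuming the JDK jar tool and the working jar name used in start.sh:

  jar -xvf log-completion-schema_tmp.jar service_flow_config.properties
  cat config/ETL-SESSION-RECORD-COMPLETED > service_flow_config.properties
  jar -uvf log-completion-schema_tmp.jar service_flow_config.properties
]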
diff --git a/MSH-PIC/flink/topology/completion/start.sh b/MSH-PIC/flink/topology/completion/start.sh
new file mode 100644
index 0000000..28760f5
--- /dev/null
+++ b/MSH-PIC/flink/topology/completion/start.sh
@@ -0,0 +1,67 @@
+#!/bin/bash
+# Script to start the Flink jobs on YARN
+source /etc/profile
+# Directory containing the job jar
+BASE_DIR=$(pwd)
+
+####################### Parameter configuration ####################################
+# YARN execution mode: per-job or session
+TASK_MODE="per-job"
+# Name of the source jar to copy from
+PRIMORDIAL='log-completion-schema-230607-FastJson2.jar'
+# Working jar name
+JAR_NAME='log-completion-schema_tmp.jar'
+
+MAIN_CLASS="com.zdjizhi.topology.LogFlowWriteTopology"
+SESSION_CLUSTER="Flink session cluster"
+CONFIG_NAME="service_flow_config.properties"
+JOBMANAGER_MEMORY="1024m"
+TASKMANAGER_MEMORY="4096m"
+TASK_SLOTS=3
+####################### Parameter configuration ####################################
+
+APPLICATION_ID=$(yarn application -list | grep "$SESSION_CLUSTER" | awk '{print $1}')
+
+yes | cp -r $PRIMORDIAL $JAR_NAME
+
+#cd $BASE_DIR
+jar -xvf $BASE_DIR/$JAR_NAME $CONFIG_NAME
+function read_dir() {
+    for file in $(ls $1); do
+        if [ -d $1"/"$file ]; then
+            read_dir $1"/"$file
+        else
+            if [[ -z $TASK_MODE || $TASK_MODE == "per-job" ]]; then
+                num=$(yarn application -list | grep $file | wc -l)
+                if [ $num -eq "0" ]; then
+                    cat $1$file >$BASE_DIR/$CONFIG_NAME
+                    jar -uvf $BASE_DIR/$JAR_NAME $CONFIG_NAME
+                    # the proxy and security event jobs get larger memory and slot settings
+                    if [[ $file == "ETL-PROXY-EVENT-COMPLETED" || $file == "ETL-SECURITY-EVENT-COMPLETED" ]]; then
+                        flink run -t yarn-per-job -Djobmanager.memory.process.size=2048m -Dtaskmanager.memory.process.size=5120m -Dyarn.application.name=$file -Dtaskmanager.numberOfTaskSlots=8 -d -c $MAIN_CLASS $BASE_DIR/$JAR_NAME $file
+                    else
+                        flink run -t yarn-per-job -Djobmanager.memory.process.size=$JOBMANAGER_MEMORY -Dtaskmanager.memory.process.size=$TASKMANAGER_MEMORY -Dyarn.application.name=$file -Dtaskmanager.numberOfTaskSlots=$TASK_SLOTS -d -c $MAIN_CLASS $BASE_DIR/$JAR_NAME $file
+                        sleep 10
+                    fi
+                fi
+            fi
+            if [[ -n $APPLICATION_ID && (-z $TASK_MODE || $TASK_MODE == "session") ]]; then
+                num=$(flink list | grep "$file" | grep -v flink | wc -l)
+                if [ $num -eq "0" ]; then
+                    cat $1$file >$BASE_DIR/$CONFIG_NAME
+                    jar -uvf $BASE_DIR/$JAR_NAME $CONFIG_NAME
+                    # session mode
+                    flink run -t yarn-session -Dyarn.application.id=$APPLICATION_ID -d -c $MAIN_CLASS $BASE_DIR/$JAR_NAME $file
+                    sleep 10
+                fi
+            fi
+        fi
+    done
+}
+if [ $# != 1 ]; then
+    echo "usage: ./start.sh [configuration path]"
+    exit 1
+fi
+# The first argument is the config directory
+read_dir $1
+rm -rf $JAR_NAME
+
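[Editor's note: an example invocation, assuming the script sits next to the jar and the config/ directory shown in this commit; the trailing slash matters because the script concatenates "$1$file" when reading each config:

  ./start.sh config/
]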
diff --git a/MSH-PIC/flink/topology/completion/stop.sh b/MSH-PIC/flink/topology/completion/stop.sh
new file mode 100644
index 0000000..24e1a83
--- /dev/null
+++ b/MSH-PIC/flink/topology/completion/stop.sh
@@ -0,0 +1,34 @@
+#!/bin/bash
+
+# Script to stop the Flink jobs
+source /etc/profile
+# Mode parameter: per-job or session
+TASK_MODE="per-job"
+SESSION_CLUSTER="Flink session cluster"
+
+APPLICATION_ID=$(yarn application -list | grep "$SESSION_CLUSTER" | awk '{print $1}')
+
+function read_dir() {
+    for file in $(ls $1); do
+        if [ -d $1"/"$file ]; then
+            read_dir $1"/"$file
+        else
+            if [[ $TASK_MODE == "per-job" ]]; then
+                appid=$(yarn application -list | grep "$file" | awk '{print $1}')
+                yarn application -kill $appid
+                echo -e "\033[32mcancel $file\033[0m"
+
+            elif [[ -n $APPLICATION_ID && $TASK_MODE == "session" ]]; then
+                jobid=$(flink list | grep -v flink | grep "$file" | awk '{print $4}')
+                flink cancel $jobid
+                echo -e "\033[32mcancel $file\033[0m"
+            fi
+
+        fi
+
+    done
+}
+
+# The first argument is the config directory
+read_dir $1
+
diff --git a/MSH-PIC/flink/topology/data/asn_v4.mmdb b/MSH-PIC/flink/topology/data/asn_v4.mmdb
new file mode 100644
index 0000000..63df444
--- /dev/null
+++ b/MSH-PIC/flink/topology/data/asn_v4.mmdb
Binary files differ
diff --git a/MSH-PIC/flink/topology/data/asn_v6.mmdb b/MSH-PIC/flink/topology/data/asn_v6.mmdb
new file mode 100644
index 0000000..25cff33
--- /dev/null
+++ b/MSH-PIC/flink/topology/data/asn_v6.mmdb
Binary files differ
diff --git a/MSH-PIC/flink/topology/data/ip_v4_built_in.mmdb b/MSH-PIC/flink/topology/data/ip_v4_built_in.mmdb
new file mode 100644
index 0000000..7210af4
--- /dev/null
+++ b/MSH-PIC/flink/topology/data/ip_v4_built_in.mmdb
Binary files differ
diff --git a/MSH-PIC/flink/topology/data/ip_v4_user_defined.mmdb b/MSH-PIC/flink/topology/data/ip_v4_user_defined.mmdb
new file mode 100644
index 0000000..9853019
--- /dev/null
+++ b/MSH-PIC/flink/topology/data/ip_v4_user_defined.mmdb
Binary files differ
diff --git a/MSH-PIC/flink/topology/data/ip_v6_built_in.mmdb b/MSH-PIC/flink/topology/data/ip_v6_built_in.mmdb
new file mode 100644
index 0000000..35d1d32
--- /dev/null
+++ b/MSH-PIC/flink/topology/data/ip_v6_built_in.mmdb
Binary files differ
diff --git a/MSH-PIC/flink/topology/data/ip_v6_user_defined.mmdb b/MSH-PIC/flink/topology/data/ip_v6_user_defined.mmdb
new file mode 100644
index 0000000..5047903
--- /dev/null
+++ b/MSH-PIC/flink/topology/data/ip_v6_user_defined.mmdb
Binary files differ
diff --git a/MSH-PIC/flink/topology/data/keystore.jks b/MSH-PIC/flink/topology/data/keystore.jks
new file mode 100644
index 0000000..2e2328b
--- /dev/null
+++ b/MSH-PIC/flink/topology/data/keystore.jks
Binary files differ
diff --git a/MSH-PIC/flink/topology/data/truststore.jks b/MSH-PIC/flink/topology/data/truststore.jks
new file mode 100644
index 0000000..b435e09
--- /dev/null
+++ b/MSH-PIC/flink/topology/data/truststore.jks
Binary files differ
diff --git a/MSH-PIC/flink/topology/relationship-gtpc-user/config/RELATIONSHIP-GTPC-USER b/MSH-PIC/flink/topology/relationship-gtpc-user/config/RELATIONSHIP-GTPC-USER
new file mode 100644
index 0000000..82d314d
--- /dev/null
+++ b/MSH-PIC/flink/topology/relationship-gtpc-user/config/RELATIONSHIP-GTPC-USER
@@ -0,0 +1,33 @@
+#-------------------------------- Address configuration ------------------------------#
+# Kafka brokers to consume from (management cluster)
+input.kafka.servers=192.168.20.193:9094,192.168.20.194:9094,192.168.20.195:9094
+
+# HBase ZooKeeper quorum, used to connect to HBase
+hbase.zookeeper.servers=192.168.20.193:2181,192.168.20.194:2181,192.168.20.195:2181
+
+hbase.scan.limit=100000
+
+hbase.rpc.timeout=60000
+
+cache.expire.seconds=86400
+
+cache.max.size=100000
+
+cache.update.seconds=3600
+#-------------------------------- Kafka consumer group ------------------------------#
+
+# Kafka topic to consume from
+input.kafka.topic=GTPC-RECORD-COMPLETED
+
+# Consumer group id; stores this source's offsets so the next read resumes without repeating data. Name it after the topology.
+group.id=relationship-gtpc-user-20220830-1
+
+#-------------------------------- Topology configuration ------------------------------#
+# ip-account mapping table
+relation.user.teid.table.name=tsg_galaxy:relation_user_teid
+
+# Location knowledge base path
+tools.library=/home/tsg/olap/topology/data/
+
+# account-ip mapping table
+gtpc.knowledge.base.table.name=tsg_galaxy:gtpc_knowledge_base
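
[Editor's note: the relationship job materializes user/TEID mappings into the two HBase tables named above. A spot check from the HBase shell — a sketch assuming the shell is available on a cluster node:

  echo "scan 'tsg_galaxy:relation_user_teid', {LIMIT => 5}" | hbase shell -n
]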
diff --git a/MSH-PIC/flink/topology/relationship-gtpc-user/service_flow_config.properties b/MSH-PIC/flink/topology/relationship-gtpc-user/service_flow_config.properties
new file mode 100644
index 0000000..b72fac4
--- /dev/null
+++ b/MSH-PIC/flink/topology/relationship-gtpc-user/service_flow_config.properties
@@ -0,0 +1,33 @@
+#--------------------------------Address configuration------------------------------#
+#Management Kafka broker addresses
+input.kafka.servers=192.168.20.193:9094,192.168.20.194:9094,192.168.20.195:9094
+
+#HBase ZooKeeper quorum, used to connect to HBase
+hbase.zookeeper.servers=192.168.20.193:2181,192.168.20.194:2181,192.168.20.195:2181
+
+hbase.scan.limit=100000
+
+hbase.rpc.timeout=60000
+
+cache.expire.seconds=86400
+
+cache.max.size=100000
+
+cache.update.seconds=3600
+#--------------------------------Kafka consumer group info------------------------------#
+
+#Kafka input topic
+input.kafka.topic=GTPC-RECORD-COMPLETED
+
+#Consumer group id; stores the consumer offsets for this spout id and is conventionally named after the topology. The stored offset position determines where the next read resumes, so records are not consumed twice.
+group.id=relationship-gtpc-user-20220830-1
+
+#--------------------------------Topology configuration------------------------------#
+#user-TEID relation table
+relation.user.teid.table.name=tsg_galaxy:relation_user_teid
+
+#Path to the geolocation (MMDB) data files
+tools.library=/home/tsg/olap/topology/data/
+
+#GTP-C knowledge base table
+gtpc.knowledge.base.table.name=tsg_galaxy:gtpc_knowledge_base
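+
+# Note: this packaged copy is only a default. At submit time start.sh
+# overwrites it with each file under config/ and re-packs the jar, roughly:
+#   cat config/RELATIONSHIP-GTPC-USER > service_flow_config.properties
+#   jar -uvf relationship-gtpc-user_tmp.jar service_flow_config.properties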
diff --git a/MSH-PIC/flink/topology/relationship-gtpc-user/start.sh b/MSH-PIC/flink/topology/relationship-gtpc-user/start.sh
new file mode 100644
index 0000000..9ea8fd5
--- /dev/null
+++ b/MSH-PIC/flink/topology/relationship-gtpc-user/start.sh
@@ -0,0 +1,67 @@
+#!/bin/bash
+#Flink job startup script
+source /etc/profile
+#######################Parameter configuration####################################
+#YARN run mode: per-job or session
+TASK_MODE="per-job"
+#Name of the jar to update
+#PRIMORDIAL
+PRIMORDIAL='relationship-gtpc-user-23-06-02.jar'
+#jar name
+JAR_NAME='relationship-gtpc-user_tmp.jar'
+
+SESSION_CLUSTER="Flink per-job cluster"
+MAIN_CLASS=""
+CONFIG_NAME="service_flow_config.properties"
+JOBMANAGER_MEMORY="1024m"
+TASKMANAGER_MEMORY="3072m"
+TASK_SLOTS=3
+CLASS_LOADER='child-first'
+#######################Parameter configuration####################################
+
+#Directory containing the job jar
+BASE_DIR=$(pwd)
+APPLICATION_ID=$(yarn application -list | grep "$SESSION_CLUSTER" | awk '{print $1}')
+
+yes | cp -r $PRIMORDIAL $JAR_NAME
+
+#cd $BASE_DIR
+jar -xvf $BASE_DIR/$JAR_NAME $CONFIG_NAME
+function read_dir() {
+ for file in $(ls $1); do
+ if [ -d $1"/"$file ]; then
+ read_dir $1"/"$file
+ else
+ #per-job mode
+ if [[ $TASK_MODE == "per-job" ]]; then
+ num=$(yarn application -list | grep $file | wc -l)
+ if [ $num -eq "0" ]; then
+ cat $1"/"$file >$BASE_DIR/$CONFIG_NAME
+ jar -uvf $BASE_DIR/$JAR_NAME $CONFIG_NAME
+
+ flink run -t yarn-per-job -Djobmanager.memory.process.size=$JOBMANAGER_MEMORY -Dtaskmanager.memory.process.size=$TASKMANAGER_MEMORY -Dyarn.application.name=$file -Dtaskmanager.numberOfTaskSlots=$TASK_SLOTS -p 3 -d $BASE_DIR/$JAR_NAME $file
+ sleep 10
+ fi
+ elif [[ -n $APPLICATION_ID && $TASK_MODE == "session" ]]; then
+ num=$(flink list | grep "$file" | grep -v flink | wc -l)
+ if [ $num -eq "0" ]; then
+ cat $1"/"$file >$BASE_DIR/$CONFIG_NAME
+ jar -uvf $BASE_DIR/$JAR_NAME $CONFIG_NAME
+ #submit into the existing session cluster
+ flink run -t yarn-session -Dyarn.application.id=$APPLICATION_ID -Dclassloader.resolve-order=$CLASS_LOADER -d $BASE_DIR/$JAR_NAME $file
+ sleep 10
+ fi
+ fi
+
+ fi
+ done
+}
+if [ $# != 1 ]; then
+ echo "usage: ./start.sh [Configuration path]"
+ exit 1
+fi
+#The first argument is the configuration file directory
+read_dir $1
+
+rm -rf $JAR_NAME
+
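+# Usage sketch:
+#   ./start.sh config/
+# Each file under config/ is submitted as a YARN application named after the
+# file; names already present in `yarn application -list` are skipped.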
diff --git a/MSH-PIC/flink/topology/relationship-gtpc-user/stop.sh b/MSH-PIC/flink/topology/relationship-gtpc-user/stop.sh
new file mode 100644
index 0000000..3657871
--- /dev/null
+++ b/MSH-PIC/flink/topology/relationship-gtpc-user/stop.sh
@@ -0,0 +1,34 @@
+#!/bin/bash
+
+#Flink job stop script
+source /etc/profile
+#Run mode: per-job or session
+TASK_MODE="per-job"
+SESSION_CLUSTER="Flink per-job cluster"
+
+APPLICATION_ID=$(yarn application -list | grep "$SESSION_CLUSTER" | awk '{print $1}')
+
+function read_dir() {
+ for file in $(ls $1); do
+ if [ -d $1"/"$file ]; then
+ read_dir $1"/"$file
+ else
+ if [[ $TASK_MODE == "per-job" ]]; then
+ appid=$(yarn application -list | grep "$file" | awk '{print $1}')
+ yarn application -kill $appid
+ echo -e "\033[32mcancel $file\033[0m"
+
+ elif [[ -n $APPLICATION_ID && $TASK_MODE == "session" ]]; then
+ jobid=$(flink list | grep -v flink | grep "$file" | awk '{print $4}')
+ flink cancel $jobid
+ echo -e "\033[32mcancel $file\033[0m"
+ fi
+
+ fi
+
+ done
+}
+
+#The first argument is the configuration file directory
+read_dir $1
+
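+# Note on the awk fields above: in session mode `flink list` prints lines like
+#   30.08.2022 10:32:15 : <job-id> : RELATIONSHIP-GTPC-USER (RUNNING)
+# so field 4 is the job id handed to `flink cancel`; in per-job mode field 1 of
+# `yarn application -list` is the application id handed to `yarn application -kill`.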
diff --git a/MSH-PIC/flink/topology/relationship-radius-account/config/RELATIONSHIP-RADIUS-ACCOUNT b/MSH-PIC/flink/topology/relationship-radius-account/config/RELATIONSHIP-RADIUS-ACCOUNT
new file mode 100644
index 0000000..9dccde7
--- /dev/null
+++ b/MSH-PIC/flink/topology/relationship-radius-account/config/RELATIONSHIP-RADIUS-ACCOUNT
@@ -0,0 +1,28 @@
+#Management Kafka broker addresses
+input.kafka.servers=192.168.20.193:9094,192.168.20.194:9094,192.168.20.195:9094
+
+#HBase ZooKeeper quorum, used to connect to HBase
+hbase.zookeeper.servers=192.168.20.193:2181,192.168.20.194:2181,192.168.20.195:2181
+
+#--------------------------------Kafka consumer group info------------------------------#
+
+#Kafka input topic
+input.kafka.topic=RADIUS-RECORD
+
+#Consumer group id; stores the consumer offsets for this spout id and is conventionally named after the topology. The stored offset position determines where the next read resumes, so records are not consumed twice.
+group.id=account-framedip-hbase-20211113-1
+
+#--------------------------------Topology configuration------------------------------#
+#IP-to-account mapping table
+hbase.framedip.table.name=tsg_galaxy:relation_framedip_account
+
+#Path to the geolocation (MMDB) data files
+tools.library=/home/tsg/olap/topology/data/
+
+#account-to-IP mapping table
+hbase.account.table.name=tsg_galaxy:relation_account_framedip
+
+hbase.rpc.timeout=60000
+
+hbase.scan.limit=100000
+
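+
+# Optional sanity check (a sketch; assumes the HBase shell is on PATH):
+#   echo "describe 'tsg_galaxy:relation_framedip_account'" | hbase shell -n
+# confirms the relation tables referenced above exist before starting the job.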
diff --git a/MSH-PIC/flink/topology/relationship-radius-account/service_flow_config.properties b/MSH-PIC/flink/topology/relationship-radius-account/service_flow_config.properties
new file mode 100644
index 0000000..ccc7ad7
--- /dev/null
+++ b/MSH-PIC/flink/topology/relationship-radius-account/service_flow_config.properties
@@ -0,0 +1,28 @@
+#Management Kafka broker addresses
+input.kafka.servers=192.168.20.193:9094,192.168.20.194:9094,192.168.20.195:9094
+
+#HBase ZooKeeper quorum, used to connect to HBase
+hbase.zookeeper.servers=192.168.20.193:2181,192.168.20.194:2181,192.168.20.195:2181
+
+#--------------------------------Kafka consumer group info------------------------------#
+
+#Kafka input topic
+input.kafka.topic=RADIUS-RECORD
+
+#Consumer group id; stores the consumer offsets for this spout id and is conventionally named after the topology. The stored offset position determines where the next read resumes, so records are not consumed twice.
+group.id=account-framedip-hbase-20211113-1
+
+#--------------------------------Topology configuration------------------------------#
+#IP-to-account mapping table
+hbase.framedip.table.name=tsg_galaxy:relation_framedip_account
+
+#Path to the geolocation (MMDB) data files
+tools.library=/home/tsg/olap/topology/data/
+
+#account-to-IP mapping table
+hbase.account.table.name=tsg_galaxy:relation_account_framedip
+
+hbase.rpc.timeout=60000
+
+hbase.scan.limit=100000
+
diff --git a/MSH-PIC/flink/topology/relationship-radius-account/start.sh b/MSH-PIC/flink/topology/relationship-radius-account/start.sh
new file mode 100644
index 0000000..00eee48
--- /dev/null
+++ b/MSH-PIC/flink/topology/relationship-radius-account/start.sh
@@ -0,0 +1,67 @@
+#!/bin/bash
+#Flink job startup script
+source /etc/profile
+#######################Parameter configuration####################################
+#YARN run mode: per-job or session
+TASK_MODE="per-job"
+#Name of the jar to update
+#PRIMORDIAL
+PRIMORDIAL='radius-relation-23-06-02.jar'
+#jar name
+JAR_NAME='radius-relation_tmp.jar'
+
+SESSION_CLUSTER="Flink per-job cluster"
+MAIN_CLASS=""
+CONFIG_NAME="service_flow_config.properties"
+JOBMANAGER_MEMORY="1024m"
+TASKMANAGER_MEMORY="3072m"
+TASK_SLOTS=3
+CLASS_LOADER='child-first'
+#######################Parameter configuration####################################
+
+#Directory containing the job jar
+BASE_DIR=$(pwd)
+APPLICATION_ID=$(yarn application -list | grep "$SESSION_CLUSTER" | awk '{print $1}')
+
+yes | cp -r $PRIMORDIAL $JAR_NAME
+
+#cd $BASE_DIR
+jar -xvf $BASE_DIR/$JAR_NAME $CONFIG_NAME
+function read_dir() {
+ for file in $(ls $1); do
+ if [ -d $1"/"$file ]; then
+ read_dir $1"/"$file
+ else
+ #per-job mode
+ if [[ $TASK_MODE == "per-job" ]]; then
+ num=$(yarn application -list | grep $file | wc -l)
+ if [ $num -eq "0" ]; then
+ cat $1"/"$file >$BASE_DIR/$CONFIG_NAME
+ jar -uvf $BASE_DIR/$JAR_NAME $CONFIG_NAME
+
+ flink run -t yarn-per-job -Djobmanager.memory.process.size=$JOBMANAGER_MEMORY -Dtaskmanager.memory.process.size=$TASKMANAGER_MEMORY -Dyarn.application.name=$file -Dtaskmanager.numberOfTaskSlots=$TASK_SLOTS -p 3 -d $BASE_DIR/$JAR_NAME $file
+ sleep 10
+ fi
+ elif [[ -n $APPLICATION_ID && $TASK_MODE == "session" ]]; then
+ num=$(flink list | grep "$file" | grep -v flink | wc -l)
+ if [ $num -eq "0" ]; then
+ cat $1"/"$file >$BASE_DIR/$CONFIG_NAME
+ jar -uvf $BASE_DIR/$JAR_NAME $CONFIG_NAME
+ #submit into the existing session cluster
+ flink run -t yarn-session -Dyarn.application.id=$APPLICATION_ID -Dclassloader.resolve-order=$CLASS_LOADER -d $BASE_DIR/$JAR_NAME $file
+ sleep 10
+ fi
+ fi
+
+ fi
+ done
+}
+if [ $# != 1 ]; then
+ echo "usage: ./start.sh [Configuration path]"
+ exit 1
+fi
+#The first argument is the configuration file directory
+read_dir $1
+
+rm -rf $JAR_NAME
+
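+# Verify the submission (sketch): each config file becomes a YARN application
+# named after the file, e.g.:
+#   yarn application -list | grep RELATIONSHIP-RADIUS-ACCOUNT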
diff --git a/MSH-PIC/flink/topology/relationship-radius-account/stop.sh b/MSH-PIC/flink/topology/relationship-radius-account/stop.sh
new file mode 100644
index 0000000..3657871
--- /dev/null
+++ b/MSH-PIC/flink/topology/relationship-radius-account/stop.sh
@@ -0,0 +1,34 @@
+#!/bin/bash
+
+#Flink job stop script
+source /etc/profile
+#Run mode: per-job or session
+TASK_MODE="per-job"
+SESSION_CLUSTER="Flink per-job cluster"
+
+APPLICATION_ID=$(yarn application -list | grep "$SESSION_CLUSTER" | awk '{print $1}')
+
+function read_dir() {
+ for file in $(ls $1); do
+ if [ -d $1"/"$file ]; then
+ read_dir $1"/"$file
+ else
+ if [[ $TASK_MODE == "per-job" ]]; then
+ appid=$(yarn application -list | grep "$file" | awk '{print $1}')
+ yarn application -kill $appid
+ echo -e "\033[32mcancel $file\033[0m"
+
+ elif [[ -n $APPLICATION_ID && $TASK_MODE == "session" ]]; then
+ jobid=$(flink list | grep -v flink | grep "$file" | awk '{print $4}')
+ flink cancel $jobid
+ echo -e "\033[32mcancel $file\033[0m"
+ fi
+
+ fi
+
+ done
+}
+
+#The first argument is the configuration file directory
+read_dir $1
+