author     wangchengcheng <[email protected]>   2023-04-25 17:41:15 +0800
committer  wangchengcheng <[email protected]>   2023-04-25 17:41:15 +0800
commit     2398dc61bbff63ca7611a0f1cfc3d5f0c063f37c (patch)
tree       77a350adf5139bf988b4e2bad9fa90eca5f6fa80
first commit
-rw-r--r--  pom.xml                                                                231
-rw-r--r--  properties/default_config.properties                                    45
-rw-r--r--  properties/service_flow_config.properties                               58
-rw-r--r--  src/main/java/com/zdjizhi/common/Fileds.java                           235
-rw-r--r--  src/main/java/com/zdjizhi/common/FlowWriteConfig.java                  100
-rw-r--r--  src/main/java/com/zdjizhi/common/InterimSessionRecord.java             192
-rw-r--r--  src/main/java/com/zdjizhi/common/Metrics.java                           45
-rw-r--r--  src/main/java/com/zdjizhi/common/SessionRecord.java                    316
-rw-r--r--  src/main/java/com/zdjizhi/common/Tags.java                              68
-rw-r--r--  src/main/java/com/zdjizhi/tools/function/CountWindowProcess.java       147
-rw-r--r--  src/main/java/com/zdjizhi/tools/function/KeyByFunction.java             13
-rw-r--r--  src/main/java/com/zdjizhi/tools/function/ParseInterimSessionFunction.java  50
-rw-r--r--  src/main/java/com/zdjizhi/tools/function/ParseSessionFunction.java      51
-rw-r--r--  src/main/java/com/zdjizhi/tools/kafka/CertUtils.java                    48
-rw-r--r--  src/main/java/com/zdjizhi/tools/kafka/KafkaConsumer.java                49
-rw-r--r--  src/main/java/com/zdjizhi/tools/kafka/KafkaProducer.java                50
-rw-r--r--  src/main/java/com/zdjizhi/tools/system/FlowWriteConfigurations.java     69
-rw-r--r--  src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java            75
-rw-r--r--  src/main/log4j.properties                                               25
-rw-r--r--  src/main/logback.xml                                                    42
-rw-r--r--  src/test/java/com/zdjizhi/tools/DatasketchesTest.java                  253
-rw-r--r--  src/test/java/com/zdjizhi/tools/FlagsTest.java                          48
-rw-r--r--  src/test/java/com/zdjizhi/tools/TestJsonPath.java                       89
-rw-r--r--  src/test/java/com/zdjizhi/tools/TestMap.java                            22
24 files changed, 2321 insertions, 0 deletions
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..3a48680
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,231 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>com.zdjizhi</groupId>
+ <artifactId>app-protocol-stat-traffic-agent</artifactId>
+ <version>20230425-test</version>
+
+ <name>app-protocol-stat-traffic-agent</name>
+ <url>http://www.example.com</url>
+
+ <repositories>
+ <repository>
+ <id>nexus</id>
+ <name>Team Nexus Repository</name>
+ <url>http://192.168.40.125:8099/content/groups/public</url>
+ </repository>
+
+ <repository>
+ <id>maven-ali</id>
+ <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
+ <releases>
+ <enabled>true</enabled>
+ </releases>
+ <snapshots>
+ <enabled>true</enabled>
+ <checksumPolicy>fail</checksumPolicy>
+ </snapshots>
+ </repository>
+ </repositories>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <flink.version>1.13.1</flink.version>
+ <hadoop.version>2.7.1</hadoop.version>
+ <kafka.version>1.0.0</kafka.version>
+ <hbase.version>2.2.3</hbase.version>
+ <nacos.version>1.2.0</nacos.version>
+ <zdjz.tools.version>1.0.8</zdjz.tools.version>
+ <scope.type>provided</scope.type>
+<!-- <scope.type>compile</scope.type>-->
+ </properties>
+
+ <build>
+ <plugins>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>2.4.2</version>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <transformers>
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ <mainClass>com.zdjizhi.topology.LogFlowWriteTopology</mainClass>
+ </transformer>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>io.github.zlika</groupId>
+ <artifactId>reproducible-build-maven-plugin</artifactId>
+ <version>0.2</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>strip-jar</goal>
+ </goals>
+ <phase>package</phase>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.3.2</version>
+ <configuration>
+ <source>1.8</source>
+ <target>1.8</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ <resources>
+ <resource>
+ <directory>properties</directory>
+ <includes>
+ <include>**/*.properties</include>
+ <include>**/*.xml</include>
+ </includes>
+ <filtering>false</filtering>
+ </resource>
+
+ <resource>
+ <directory>src/main/java</directory>
+ <includes>
+ <include>log4j.properties</include>
+ </includes>
+ <filtering>false</filtering>
+ </resource>
+ </resources>
+ </build>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>com.zdjizhi</groupId>
+ <artifactId>galaxy</artifactId>
+ <version>${zdjz.tools.version}</version>
+ <exclusions>
+ <exclusion>
+ <artifactId>slf4j-log4j12</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>log4j-over-slf4j</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${flink.version}</version>
+ <scope>${scope.type}</scope>
+ </dependency>
+
+
+ <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_2.12</artifactId>
+ <version>${flink.version}</version>
+ <scope>${scope.type}</scope>
+ </dependency>
+
+ <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-clients_2.12</artifactId>
+ <version>${flink.version}</version>
+ <scope>${scope.type}</scope>
+ </dependency>
+
+ <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-kafka_2.12</artifactId>
+ <version>${flink.version}</version>
+ <!--<scope>${scope.type}</scope>-->
+ </dependency>
+
+ <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-java</artifactId>
+ <version>${flink.version}</version>
+ <scope>${scope.type}</scope>
+ </dependency>
+
+
+
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-api</artifactId>
+ <version>5.3.2</version>
+ <scope>compile</scope>
+ </dependency>
+
+
+ <dependency>
+ <groupId>com.jayway.jsonpath</groupId>
+ <artifactId>json-path</artifactId>
+ <version>2.4.0</version>
+ </dependency>
+
+ <dependency>
+ <groupId>cn.hutool</groupId>
+ <artifactId>hutool-all</artifactId>
+ <version>5.7.17</version>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.12</version>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- https://mvnrepository.com/artifact/com.alibaba.nacos/nacos-client -->
+ <dependency>
+ <groupId>com.alibaba.nacos</groupId>
+ <artifactId>nacos-client</artifactId>
+ <version>${nacos.version}</version>
+ </dependency>
+
+ <!-- https://mvnrepository.com/artifact/org.jasypt/jasypt -->
+ <dependency>
+ <groupId>org.jasypt</groupId>
+ <artifactId>jasypt</artifactId>
+ <version>1.9.3</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.datasketches</groupId>
+ <artifactId>datasketches-java</artifactId>
+ <version>3.2.0</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.alibaba</groupId>
+ <artifactId>fastjson</artifactId>
+ <version>2.0.26</version>
+ </dependency>
+
+ </dependencies>
+</project>
+
diff --git a/properties/default_config.properties b/properties/default_config.properties
new file mode 100644
index 0000000..976e210
--- /dev/null
+++ b/properties/default_config.properties
@@ -0,0 +1,45 @@
+#====================Kafka KafkaConsumer====================#
+#kafka source connection timeout
+session.timeout.ms=60000
+
+#kafka source poll
+max.poll.records=5000
+
+#kafka source poll bytes
+max.partition.fetch.bytes=31457280
+#====================Kafka KafkaProducer====================#
+#number of producer retries
+retries=0
+
+#maximum time a batch may wait after creation before it must be sent, whether or not it is full
+linger.ms=10
+
+#if no response is received before the timeout, the client will resend the request when necessary
+request.timeout.ms=30000
+
+#the producer sends records in batches; batch size, default: 16384
+batch.size=262144
+
+#size of the buffer the producer uses to cache messages
+#128M
+buffer.memory=134217728
+
+#maximum size of a single request sent to the Kafka brokers
+#default: 10485760 = 10M
+max.request.size=10485760
+#====================kafka default====================#
+#kafka SASL/SSL username (encryption)
+kafka.user=nsyGpHKGFA4KW0zro9MDdw==
+
+#kafka SASL/SSL pin (encryption)
+kafka.pin=6MleDyA3Z73HSaXiKsDJ2k7Ys8YWLhEJ
+
+#producer ack
+producer.ack=1
+
+#maximum time between two output flushes (in milliseconds)
+buffer.timeout=100
+
+
+
+random.range.num=20
diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties
new file mode 100644
index 0000000..3208551
--- /dev/null
+++ b/properties/service_flow_config.properties
@@ -0,0 +1,58 @@
+#--------------------------------Address configuration------------------------------#
+
+#source Kafka broker address
+source.kafka.servers=192.168.44.12:9094
+
+#sink Kafka broker address
+sink.kafka.servers=192.168.44.12:9094
+
+tools.library=D:\\workerspace\\dat\\
+#--------------------------------Kafka consumer configuration (session)------------------------------#
+
+#closed-session topic
+session.source.kafka.topic=SESSION-RECORD
+
+#consumer group id for reading the topic; the group's committed offsets record where consumption stopped (name the group after this topology) so the next read resumes without duplicating data
+session.group.id=session-record-app-protocol-stat-traffic-agent-2
+
+#consumer parallelism
+session.source.parallelism=1
+
+
+#--------------------------------Kafka consumer configuration (interim session)------------------------------#
+
+#interim-session topic
+interim.session.source.kafka.topic=SESSION-RECORD
+
+
+#consumer group id for reading the topic; the group's committed offsets record where consumption stopped (name the group after this topology) so the next read resumes without duplicating data
+interim.session.group.id=interim-session-record-app-protocol-stat-traffic-agent-1
+
+#consumer parallelism
+interim.session.source.parallelism=1
+
+
+#--------------------------------Window configuration------------------------------#
+#aggregation window size, in seconds
+count.window.seconds=1
+
+#aggregation window parallelism
+count.window.parallelism=1
+
+#--------------------------------Kafka sink configuration------------------------------#
+#output Kafka topic name
+sink.kafka.topic=test-1
+
+#output Kafka compression type
+producer.kafka.compression.type=snappy
+
+#sink parallelism
+sink.kafka.parallelism=1
+
+#--------------------------------Metrics configuration------------------------------#
+metrics.name=traffic_application_protocol_stat
diff --git a/src/main/java/com/zdjizhi/common/Fileds.java b/src/main/java/com/zdjizhi/common/Fileds.java
new file mode 100644
index 0000000..ed290d5
--- /dev/null
+++ b/src/main/java/com/zdjizhi/common/Fileds.java
@@ -0,0 +1,235 @@
+package com.zdjizhi.common;
+
+
+
+public class Fileds {
+ private long sessions;
+ private long in_bytes;
+ private long out_bytes;
+ private long in_pkts;
+ private long out_pkts;
+ private long c2s_pkts;
+ private long s2c_pkts;
+ private long c2s_bytes;
+ private long s2c_bytes;
+ private long c2s_fragments;
+ private long s2c_fragments;
+ private long c2s_tcp_lost_bytes;
+ private long s2c_tcp_lost_bytes;
+ private long c2s_tcp_ooorder_pkts;
+ private long s2c_tcp_ooorder_pkts;
+ private long c2s_tcp_retransmitted_pkts;
+ private long s2c_tcp_retransmitted_pkts;
+ private long c2s_tcp_retransmitted_bytes;
+ private long s2c_tcp_retransmitted_bytes;
+ private String client_ip_sketch;
+
+ public Fileds(long sessions, long in_bytes, long out_bytes, long in_pkts, long out_pkts, long c2s_pkts, long s2c_pkts, long c2s_bytes, long s2c_bytes, long c2s_fragments, long s2c_fragments, long c2s_tcp_lost_bytes, long s2c_tcp_lost_bytes, long c2s_tcp_ooorder_pkts, long s2c_tcp_ooorder_pkts, long c2s_tcp_retransmitted_pkts, long s2c_tcp_retransmitted_pkts, long c2s_tcp_retransmitted_bytes, long s2c_tcp_retransmitted_bytes,String client_ip_sketch) {
+ this.sessions = sessions;
+ this.in_bytes = in_bytes;
+ this.out_bytes = out_bytes;
+ this.in_pkts = in_pkts;
+ this.out_pkts = out_pkts;
+ this.c2s_pkts = c2s_pkts;
+ this.s2c_pkts = s2c_pkts;
+ this.c2s_bytes = c2s_bytes;
+ this.s2c_bytes = s2c_bytes;
+ this.c2s_fragments = c2s_fragments;
+ this.s2c_fragments = s2c_fragments;
+ this.c2s_tcp_lost_bytes = c2s_tcp_lost_bytes;
+ this.s2c_tcp_lost_bytes = s2c_tcp_lost_bytes;
+ this.c2s_tcp_ooorder_pkts = c2s_tcp_ooorder_pkts;
+ this.s2c_tcp_ooorder_pkts = s2c_tcp_ooorder_pkts;
+ this.c2s_tcp_retransmitted_pkts = c2s_tcp_retransmitted_pkts;
+ this.s2c_tcp_retransmitted_pkts = s2c_tcp_retransmitted_pkts;
+ this.c2s_tcp_retransmitted_bytes = c2s_tcp_retransmitted_bytes;
+ this.s2c_tcp_retransmitted_bytes = s2c_tcp_retransmitted_bytes;
+ this.client_ip_sketch = client_ip_sketch;
+ }
+
+ public long getSessions() {
+ return sessions;
+ }
+
+ public void setSessions(long sessions) {
+ this.sessions = sessions;
+ }
+
+ public long getIn_bytes() {
+ return in_bytes;
+ }
+
+ public void setIn_bytes(long in_bytes) {
+ this.in_bytes = in_bytes;
+ }
+
+ public long getOut_bytes() {
+ return out_bytes;
+ }
+
+ public void setOut_bytes(long out_bytes) {
+ this.out_bytes = out_bytes;
+ }
+
+ public long getIn_pkts() {
+ return in_pkts;
+ }
+
+ public void setIn_pkts(long in_pkts) {
+ this.in_pkts = in_pkts;
+ }
+
+ public long getOut_pkts() {
+ return out_pkts;
+ }
+
+ public void setOut_pkts(long out_pkts) {
+ this.out_pkts = out_pkts;
+ }
+
+ public long getC2s_pkts() {
+ return c2s_pkts;
+ }
+
+ public void setC2s_pkts(long c2s_pkts) {
+ this.c2s_pkts = c2s_pkts;
+ }
+
+ public long getS2c_pkts() {
+ return s2c_pkts;
+ }
+
+ public void setS2c_pkts(long s2c_pkts) {
+ this.s2c_pkts = s2c_pkts;
+ }
+
+ public long getC2s_bytes() {
+ return c2s_bytes;
+ }
+
+ public void setC2s_bytes(long c2s_bytes) {
+ this.c2s_bytes = c2s_bytes;
+ }
+
+ public long getS2c_bytes() {
+ return s2c_bytes;
+ }
+
+ public void setS2c_bytes(long s2c_bytes) {
+ this.s2c_bytes = s2c_bytes;
+ }
+
+ public long getC2s_fragments() {
+ return c2s_fragments;
+ }
+
+ public void setC2s_fragments(long c2s_fragments) {
+ this.c2s_fragments = c2s_fragments;
+ }
+
+ public long getS2c_fragments() {
+ return s2c_fragments;
+ }
+
+ public void setS2c_fragments(long s2c_fragments) {
+ this.s2c_fragments = s2c_fragments;
+ }
+
+ public long getC2s_tcp_lost_bytes() {
+ return c2s_tcp_lost_bytes;
+ }
+
+ public void setC2s_tcp_lost_bytes(long c2s_tcp_lost_bytes) {
+ this.c2s_tcp_lost_bytes = c2s_tcp_lost_bytes;
+ }
+
+ public long getS2c_tcp_lost_bytes() {
+ return s2c_tcp_lost_bytes;
+ }
+
+ public void setS2c_tcp_lost_bytes(long s2c_tcp_lost_bytes) {
+ this.s2c_tcp_lost_bytes = s2c_tcp_lost_bytes;
+ }
+
+ public long getC2s_tcp_ooorder_pkts() {
+ return c2s_tcp_ooorder_pkts;
+ }
+
+ public void setC2s_tcp_ooorder_pkts(long c2s_tcp_ooorder_pkts) {
+ this.c2s_tcp_ooorder_pkts = c2s_tcp_ooorder_pkts;
+ }
+
+ public long getS2c_tcp_ooorder_pkts() {
+ return s2c_tcp_ooorder_pkts;
+ }
+
+ public void setS2c_tcp_ooorder_pkts(long s2c_tcp_ooorder_pkts) {
+ this.s2c_tcp_ooorder_pkts = s2c_tcp_ooorder_pkts;
+ }
+
+ public long getC2s_tcp_retransmitted_pkts() {
+ return c2s_tcp_retransmitted_pkts;
+ }
+
+ public void setC2s_tcp_retransmitted_pkts(long c2s_tcp_retransmitted_pkts) {
+ this.c2s_tcp_retransmitted_pkts = c2s_tcp_retransmitted_pkts;
+ }
+
+ public long getS2c_tcp_retransmitted_pkts() {
+ return s2c_tcp_retransmitted_pkts;
+ }
+
+ public void setS2c_tcp_retransmitted_pkts(long s2c_tcp_retransmitted_pkts) {
+ this.s2c_tcp_retransmitted_pkts = s2c_tcp_retransmitted_pkts;
+ }
+
+ public long getC2s_tcp_retransmitted_bytes() {
+ return c2s_tcp_retransmitted_bytes;
+ }
+
+ public void setC2s_tcp_retransmitted_bytes(long c2s_tcp_retransmitted_bytes) {
+ this.c2s_tcp_retransmitted_bytes = c2s_tcp_retransmitted_bytes;
+ }
+
+ public long getS2c_tcp_retransmitted_bytes() {
+ return s2c_tcp_retransmitted_bytes;
+ }
+
+ public void setS2c_tcp_retransmitted_bytes(long s2c_tcp_retransmitted_bytes) {
+ this.s2c_tcp_retransmitted_bytes = s2c_tcp_retransmitted_bytes;
+ }
+
+ public String getClient_ip_sketch() {
+ return client_ip_sketch;
+ }
+
+ public void setClient_ip_sketch(String client_ip_sketch) {
+ this.client_ip_sketch = client_ip_sketch;
+ }
+
+ @Override
+ public String toString() {
+ return "Fileds{" +
+ "sessions=" + sessions +
+ ", in_bytes=" + in_bytes +
+ ", out_bytes=" + out_bytes +
+ ", in_pkts=" + in_pkts +
+ ", out_pkts=" + out_pkts +
+ ", c2s_pkts=" + c2s_pkts +
+ ", s2c_pkts=" + s2c_pkts +
+ ", c2s_bytes=" + c2s_bytes +
+ ", s2c_bytes=" + s2c_bytes +
+ ", c2s_fragments=" + c2s_fragments +
+ ", s2c_fragments=" + s2c_fragments +
+ ", c2s_tcp_lost_bytes=" + c2s_tcp_lost_bytes +
+ ", s2c_tcp_lost_bytes=" + s2c_tcp_lost_bytes +
+ ", c2s_tcp_ooorder_pkts=" + c2s_tcp_ooorder_pkts +
+ ", s2c_tcp_ooorder_pkts=" + s2c_tcp_ooorder_pkts +
+ ", c2s_tcp_retransmitted_pkts=" + c2s_tcp_retransmitted_pkts +
+ ", s2c_tcp_retransmitted_pkts=" + s2c_tcp_retransmitted_pkts +
+ ", c2s_tcp_retransmitted_bytes=" + c2s_tcp_retransmitted_bytes +
+ ", s2c_tcp_retransmitted_bytes=" + s2c_tcp_retransmitted_bytes +
+ ", client_ip_sketch='" + client_ip_sketch + '\'' +
+ '}';
+ }
+}
diff --git a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
new file mode 100644
index 0000000..5889110
--- /dev/null
+++ b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java
@@ -0,0 +1,100 @@
+package com.zdjizhi.common;
+
+
+import com.zdjizhi.tools.system.FlowWriteConfigurations;
+import org.jasypt.encryption.pbe.StandardPBEStringEncryptor;
+
+/**
+ * @author Administrator
+ */
+public class FlowWriteConfig {
+
+ private static StandardPBEStringEncryptor encryptor = new StandardPBEStringEncryptor();
+
+ static {
+ encryptor.setPassword("galaxy");
+ }
+ /**
+ * kafka common
+ */
+ public static final String KAFKA_SASL_JAAS_USER = encryptor.decrypt(FlowWriteConfigurations.getStringProperty(1, "kafka.user"));
+ public static final String KAFKA_SASL_JAAS_PIN = encryptor.decrypt(FlowWriteConfigurations.getStringProperty(1, "kafka.pin"));
+
+
+ /**
+ * consumer session-record config
+ */
+ public static final String SESSION_SOURCE_KAFKA_TOPIC = FlowWriteConfigurations.getStringProperty(0, "session.source.kafka.topic");
+ public static final String SESSION_GROUP_ID = FlowWriteConfigurations.getStringProperty(0, "session.group.id");
+ public static final Integer SESSION_SOURCE_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "session.source.parallelism");
+
+
+ /**
+ * consumer interim-session-record config
+ */
+ public static final String INTERIM_SESSION_SOURCE_KAFKA_TOPIC = FlowWriteConfigurations.getStringProperty(0, "interim.session.source.kafka.topic");
+ public static final String INTERIM_SESSION_GROUP_ID = FlowWriteConfigurations.getStringProperty(0, "interim.session.group.id");
+ public static final Integer INTERIM_SESSION_SOURCE_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "interim.session.source.parallelism");
+
+
+ /**
+ * kafka source config
+ */
+ public static final String SESSION_TIMEOUT_MS = FlowWriteConfigurations.getStringProperty(1, "session.timeout.ms");
+ public static final String MAX_POLL_RECORDS = FlowWriteConfigurations.getStringProperty(1, "max.poll.records");
+ public static final String MAX_PARTITION_FETCH_BYTES = FlowWriteConfigurations.getStringProperty(1, "max.partition.fetch.bytes");
+
+ /**
+ * kafka sink config
+ */
+
+ public static final String PRODUCER_ACK = FlowWriteConfigurations.getStringProperty(1, "producer.ack");
+ public static final String PRODUCER_KAFKA_COMPRESSION_TYPE = FlowWriteConfigurations.getStringProperty(0, "producer.kafka.compression.type");
+ public static final String SINK_KAFKA_TOPIC = FlowWriteConfigurations.getStringProperty(0,"sink.kafka.topic");
+ public static final Integer SINK_KAFKA_PARALLELISM = FlowWriteConfigurations.getIntProperty(0,"sink.kafka.parallelism");
+
+
+
+ /**
+ * connection kafka
+ */
+ public static final String RETRIES = FlowWriteConfigurations.getStringProperty(1, "retries");
+ public static final String LINGER_MS = FlowWriteConfigurations.getStringProperty(1, "linger.ms");
+ public static final Integer REQUEST_TIMEOUT_MS = FlowWriteConfigurations.getIntProperty(1, "request.timeout.ms");
+ public static final Integer BATCH_SIZE = FlowWriteConfigurations.getIntProperty(1, "batch.size");
+ public static final Integer BUFFER_MEMORY = FlowWriteConfigurations.getIntProperty(1, "buffer.memory");
+ public static final Integer MAX_REQUEST_SIZE = FlowWriteConfigurations.getIntProperty(1, "max.request.size");
+
+ /**
+ * common config
+ */
+ public static final String SOURCE_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0, "source.kafka.servers");
+ public static final String SINK_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0, "sink.kafka.servers");
+ public static final String TOOLS_LIBRARY = FlowWriteConfigurations.getStringProperty(0, "tools.library");
+
+
+ /**
+ * window config
+ */
+ public static final Integer COUNT_WINDOW_SECONDS = FlowWriteConfigurations.getIntProperty(0, "count.window.seconds");
+ public static final Integer COUNT_WINDOW_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "count.window.parallelism");
+
+ /**
+ * metrics config
+ */
+
+ public static final String METRICS_NAME = FlowWriteConfigurations.getStringProperty(0, "metrics.name");
+
+ public static final Integer RANDOM_RANGE_NUM = FlowWriteConfigurations.getIntProperty(1, "random.range.num");
+
+ public static final Integer BUFFER_TIMEOUT = FlowWriteConfigurations.getIntProperty(1, "buffer.timeout");
+
+}
\ No newline at end of file
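
The kafka.user / kafka.pin values decrypted above live jasypt-encrypted in default_config.properties. A short example, assuming the same PBE password ("galaxy") set in the static block, of how such a ciphertext could be produced; the class name and plaintext are hypothetical:

    import org.jasypt.encryption.pbe.StandardPBEStringEncryptor;

    public class EncryptConfigValue {
        public static void main(String[] args) {
            StandardPBEStringEncryptor encryptor = new StandardPBEStringEncryptor();
            // Must match the decryption password set in FlowWriteConfig.
            encryptor.setPassword("galaxy");
            // Paste the printed ciphertext into default_config.properties.
            System.out.println(encryptor.encrypt("my-kafka-user"));
        }
    }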
diff --git a/src/main/java/com/zdjizhi/common/InterimSessionRecord.java b/src/main/java/com/zdjizhi/common/InterimSessionRecord.java
new file mode 100644
index 0000000..efbcea4
--- /dev/null
+++ b/src/main/java/com/zdjizhi/common/InterimSessionRecord.java
@@ -0,0 +1,192 @@
+package com.zdjizhi.common;
+
+import com.alibaba.fastjson2.JSON;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+
+public class InterimSessionRecord {
+ private long common_vsys_id;
+ private String common_device_id;
+ private String common_device_tag;
+ private String common_protocol_label;
+ private String common_app_full_path;
+
+ private String device_group;
+ private String data_center;
+
+ private long common_sessions;
+ private long common_flags;
+ private String common_l7_protocol;
+
+
+ private long common_c2s_pkt_diff;
+ private long common_s2c_pkt_diff;
+ private long common_c2s_byte_diff;
+ private long common_s2c_byte_diff;
+ private String common_client_ip;
+
+ public String getCommon_l7_protocol() {
+ return common_l7_protocol;
+ }
+
+ public void setCommon_l7_protocol(String common_l7_protocol) {
+ this.common_l7_protocol = common_l7_protocol;
+ }
+
+ public long getCommon_vsys_id() {
+ return common_vsys_id;
+ }
+
+ public void setCommon_vsys_id(long common_vsys_id) {
+ this.common_vsys_id = common_vsys_id;
+ }
+
+ public String getCommon_device_id() {
+ return common_device_id;
+ }
+
+ public void setCommon_device_id(String common_device_id) {
+ this.common_device_id = common_device_id;
+ }
+
+ public String getCommon_device_tag() {
+ return common_device_tag;
+ }
+
+ public void setCommon_device_tag(String common_device_tag) {
+ this.common_device_tag = common_device_tag;
+ }
+
+ public String getCommon_protocol_label() {
+ return common_protocol_label;
+ }
+
+ public void setCommon_protocol_label(String common_protocol_label) {
+ this.common_protocol_label = common_protocol_label;
+ }
+
+ public String getCommon_app_full_path() {
+ return common_app_full_path;
+ }
+
+ public void setCommon_app_full_path(String common_app_full_path) {
+ this.common_app_full_path = common_app_full_path;
+ }
+
+ public String getDevice_group() {
+ return device_group;
+ }
+
+ public void setDevice_group(String device_group) {
+ this.device_group = device_group;
+ }
+
+ public String getData_center() {
+ return data_center;
+ }
+
+ public void setData_center(String data_center) {
+ this.data_center = data_center;
+ }
+
+ public long getCommon_sessions() {
+ return common_sessions;
+ }
+
+ public void setCommon_sessions(long common_sessions) {
+ this.common_sessions = common_sessions;
+ }
+
+ public long getCommon_flags() {
+ return common_flags;
+ }
+
+ public void setCommon_flags(long common_flags) {
+ this.common_flags = common_flags;
+ }
+
+ public long getCommon_c2s_pkt_diff() {
+ return common_c2s_pkt_diff;
+ }
+
+ public void setCommon_c2s_pkt_diff(long common_c2s_pkt_diff) {
+ this.common_c2s_pkt_diff = common_c2s_pkt_diff;
+ }
+
+ public long getCommon_s2c_pkt_diff() {
+ return common_s2c_pkt_diff;
+ }
+
+ public void setCommon_s2c_pkt_diff(long common_s2c_pkt_diff) {
+ this.common_s2c_pkt_diff = common_s2c_pkt_diff;
+ }
+
+ public long getCommon_c2s_byte_diff() {
+ return common_c2s_byte_diff;
+ }
+
+ public void setCommon_c2s_byte_diff(long common_c2s_byte_diff) {
+ this.common_c2s_byte_diff = common_c2s_byte_diff;
+ }
+
+ public long getCommon_s2c_byte_diff() {
+ return common_s2c_byte_diff;
+ }
+
+ public void setCommon_s2c_byte_diff(long common_s2c_byte_diff) {
+ this.common_s2c_byte_diff = common_s2c_byte_diff;
+ }
+
+ public String getCommon_client_ip() {
+ return common_client_ip;
+ }
+
+ public void setCommon_client_ip(String common_client_ip) {
+ this.common_client_ip = common_client_ip;
+ }
+
+ public String getInterimSessionTags() {
+ Map<String, Object> tagsMap = new HashMap<String, Object>();
+
+ if (common_vsys_id == 0) {
+ common_vsys_id = 1;
+ }
+ tagsMap.put("vsys_id", common_vsys_id);
+ tagsMap.put("device_id", common_device_id);
+ tagsMap.put("common_device_tag", common_device_tag);
+ tagsMap.put("protocol_label", common_protocol_label);
+ tagsMap.put("app_full_path", common_protocol_label + "." + common_l7_protocol);
+ tagsMap.put("randomNum",ThreadLocalRandom.current().nextInt(FlowWriteConfig.RANDOM_RANGE_NUM));
+
+ return JSON.toJSONString(tagsMap);
+ }
+
+
+ public Fileds getInterimSessionFileds() {
+ long out_bytes;
+ long in_bytes;
+ long in_pkts;
+ long out_pkts;
+ Long clientIsLocal = 8L;
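+ // Assumption from the flag name: bit 0x8 of common_flags marks the client as local,
+ // so s2c traffic counts as inbound and c2s as outbound; otherwise the mapping is reversed.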
+ if ((common_flags & clientIsLocal) == 8L) {
+ in_bytes = common_s2c_byte_diff;
+ out_bytes = common_c2s_byte_diff;
+ in_pkts = common_s2c_pkt_diff;
+ out_pkts = common_c2s_pkt_diff;
+ } else {
+ in_bytes = common_c2s_byte_diff;
+ out_bytes = common_s2c_byte_diff;
+ in_pkts = common_c2s_pkt_diff;
+ out_pkts = common_s2c_pkt_diff;
+ }
+ return new Fileds(common_sessions, in_bytes, out_bytes, in_pkts, out_pkts, common_c2s_pkt_diff, common_s2c_pkt_diff, common_c2s_byte_diff, common_s2c_byte_diff, 0, 0, 0, 0, 0, 0,
+ 0, 0, 0, 0, "");
+ }
+
+
+}
+
diff --git a/src/main/java/com/zdjizhi/common/Metrics.java b/src/main/java/com/zdjizhi/common/Metrics.java
new file mode 100644
index 0000000..17c9a61
--- /dev/null
+++ b/src/main/java/com/zdjizhi/common/Metrics.java
@@ -0,0 +1,45 @@
+package com.zdjizhi.common;
+
+import java.util.Map;
+
+public class Metrics {
+ private final static String name = FlowWriteConfig.METRICS_NAME;
+ private Map<String, Object> tags;
+ private Fileds fields;
+ private long timestamp;
+
+ public Metrics( Map<String, Object> tags, Fileds fields, long timestamp) {
+ this.tags = tags;
+ this.fields = fields;
+ this.timestamp = timestamp;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public Map<String, Object> getTags() {
+ return tags;
+ }
+
+ public void setTags( Map<String, Object> tags) {
+ this.tags = tags;
+ }
+
+ public Fileds getFields() {
+ return fields;
+ }
+
+ public void setFields(Fileds fields) {
+ this.fields = fields;
+ }
+
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ public void setTimestamp(long timestamp) {
+ this.timestamp = timestamp;
+ }
+
+}
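
CountWindowProcess later serializes this class with fastjson2, so each emitted record looks roughly like the following (values are illustrative; field order depends on the serializer):

    {
      "fields": {"sessions": 42, "in_bytes": 1024, "out_bytes": 2048, "client_ip_sketch": "<base64 HLL sketch>", "...": "..."},
      "name": "traffic_application_protocol_stat",
      "tags": {"vsys_id": 1, "device_id": "dev-01", "device_group": "group-a", "data_center": "dc-1", "protocol_label": "web", "app_full_path": "web.http"},
      "timestamp": 1682400000
    }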
diff --git a/src/main/java/com/zdjizhi/common/SessionRecord.java b/src/main/java/com/zdjizhi/common/SessionRecord.java
new file mode 100644
index 0000000..6fa9a52
--- /dev/null
+++ b/src/main/java/com/zdjizhi/common/SessionRecord.java
@@ -0,0 +1,316 @@
+package com.zdjizhi.common;
+
+import com.alibaba.fastjson2.JSON;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+
+public class SessionRecord {
+ private long common_vsys_id;
+ private String common_device_id;
+ private String common_device_tag;
+ private String common_protocol_label;
+ private String common_app_full_path;
+
+ private String device_group;
+ private String data_center;
+
+ private long common_sessions;
+ private long common_flags;
+ private String common_l7_protocol;
+
+
+ private long common_c2s_pkt_diff;
+ private long common_s2c_pkt_diff;
+ private long common_c2s_byte_diff;
+ private long common_s2c_byte_diff;
+ private long common_c2s_pkt_num;
+ private long common_s2c_pkt_num;
+ private long common_c2s_byte_num;
+ private long common_s2c_byte_num;
+ private long common_c2s_ipfrag_num;
+ private long common_s2c_ipfrag_num;
+ private long common_c2s_tcp_lostlen;
+ private long common_s2c_tcp_lostlen;
+ private long common_c2s_tcp_unorder_num;
+ private long common_s2c_tcp_unorder_num;
+ private long common_c2s_pkt_retrans;
+ private long common_s2c_pkt_retrans;
+ private long common_c2s_byte_retrans;
+ private long common_s2c_byte_retrans;
+ private String common_client_ip;
+
+ public long getCommon_vsys_id() {
+ return common_vsys_id;
+ }
+
+ public void setCommon_vsys_id(long common_vsys_id) {
+
+ this.common_vsys_id = common_vsys_id;
+ }
+
+ public String getCommon_device_id() {
+ return common_device_id;
+ }
+
+ public void setCommon_device_id(String common_device_id) {
+ this.common_device_id = common_device_id;
+ }
+
+ public String getCommon_device_tag() {
+ return common_device_tag;
+ }
+
+ public void setCommon_device_tag(String common_device_tag) {
+ this.common_device_tag = common_device_tag;
+ }
+
+ public String getCommon_protocol_label() {
+ return common_protocol_label;
+ }
+
+ public void setCommon_protocol_label(String common_protocol_label) {
+ this.common_protocol_label = common_protocol_label;
+ }
+
+ public String getCommon_app_full_path() {
+ return common_app_full_path;
+ }
+
+ public void setCommon_app_full_path(String common_app_full_path) {
+ this.common_app_full_path = common_app_full_path;
+ }
+
+ public long getCommon_sessions() {
+ return common_sessions;
+ }
+
+ public void setCommon_sessions(long common_sessions) {
+ this.common_sessions = common_sessions;
+ }
+
+ public long getCommon_flags() {
+ return common_flags;
+ }
+
+ public void setCommon_flags(long common_flags) {
+ this.common_flags = common_flags;
+ }
+
+ public long getCommon_c2s_pkt_num() {
+ return common_c2s_pkt_num;
+ }
+
+ public void setCommon_c2s_pkt_num(long common_c2s_pkt_num) {
+ this.common_c2s_pkt_num = common_c2s_pkt_num;
+ }
+
+ public long getCommon_s2c_pkt_num() {
+ return common_s2c_pkt_num;
+ }
+
+ public void setCommon_s2c_pkt_num(long common_s2c_pkt_num) {
+ this.common_s2c_pkt_num = common_s2c_pkt_num;
+ }
+
+ public long getCommon_c2s_byte_num() {
+ return common_c2s_byte_num;
+ }
+
+ public void setCommon_c2s_byte_num(long common_c2s_byte_num) {
+ this.common_c2s_byte_num = common_c2s_byte_num;
+ }
+
+ public long getCommon_s2c_byte_num() {
+ return common_s2c_byte_num;
+ }
+
+ public void setCommon_s2c_byte_num(long common_s2c_byte_num) {
+ this.common_s2c_byte_num = common_s2c_byte_num;
+ }
+
+ public long getCommon_c2s_ipfrag_num() {
+ return common_c2s_ipfrag_num;
+ }
+
+ public void setCommon_c2s_ipfrag_num(long common_c2s_ipfrag_num) {
+ this.common_c2s_ipfrag_num = common_c2s_ipfrag_num;
+ }
+
+ public long getCommon_s2c_ipfrag_num() {
+ return common_s2c_ipfrag_num;
+ }
+
+ public void setCommon_s2c_ipfrag_num(long common_s2c_ipfrag_num) {
+ this.common_s2c_ipfrag_num = common_s2c_ipfrag_num;
+ }
+
+ public long getCommon_c2s_tcp_lostlen() {
+ return common_c2s_tcp_lostlen;
+ }
+
+ public void setCommon_c2s_tcp_lostlen(long common_c2s_tcp_lostlen) {
+ this.common_c2s_tcp_lostlen = common_c2s_tcp_lostlen;
+ }
+
+ public long getCommon_s2c_tcp_lostlen() {
+ return common_s2c_tcp_lostlen;
+ }
+
+ public void setCommon_s2c_tcp_lostlen(long common_s2c_tcp_lostlen) {
+ this.common_s2c_tcp_lostlen = common_s2c_tcp_lostlen;
+ }
+
+ public long getCommon_c2s_tcp_unorder_num() {
+ return common_c2s_tcp_unorder_num;
+ }
+
+ public void setCommon_c2s_tcp_unorder_num(long common_c2s_tcp_unorder_num) {
+ this.common_c2s_tcp_unorder_num = common_c2s_tcp_unorder_num;
+ }
+
+ public long getCommon_s2c_tcp_unorder_num() {
+ return common_s2c_tcp_unorder_num;
+ }
+
+ public void setCommon_s2c_tcp_unorder_num(long common_s2c_tcp_unorder_num) {
+ this.common_s2c_tcp_unorder_num = common_s2c_tcp_unorder_num;
+ }
+
+ public long getCommon_c2s_pkt_retrans() {
+ return common_c2s_pkt_retrans;
+ }
+
+ public void setCommon_c2s_pkt_retrans(long common_c2s_pkt_retrans) {
+ this.common_c2s_pkt_retrans = common_c2s_pkt_retrans;
+ }
+
+ public long getCommon_s2c_pkt_retrans() {
+ return common_s2c_pkt_retrans;
+ }
+
+ public void setCommon_s2c_pkt_retrans(long common_s2c_pkt_retrans) {
+ this.common_s2c_pkt_retrans = common_s2c_pkt_retrans;
+ }
+
+ public long getCommon_c2s_byte_retrans() {
+ return common_c2s_byte_retrans;
+ }
+
+ public void setCommon_c2s_byte_retrans(long common_c2s_byte_retrans) {
+ this.common_c2s_byte_retrans = common_c2s_byte_retrans;
+ }
+
+ public long getCommon_s2c_byte_retrans() {
+ return common_s2c_byte_retrans;
+ }
+
+ public void setCommon_s2c_byte_retrans(long common_s2c_byte_retrans) {
+ this.common_s2c_byte_retrans = common_s2c_byte_retrans;
+ }
+
+ public String getCommon_client_ip() {
+ return common_client_ip;
+ }
+
+ public void setCommon_client_ip(String common_client_ip) {
+ this.common_client_ip = common_client_ip;
+ }
+
+ public String getDevice_group() {
+ return device_group;
+ }
+
+ public void setDevice_group(String device_group) {
+ this.device_group = device_group;
+ }
+
+ public String getData_center() {
+ return data_center;
+ }
+
+ public void setData_center(String data_center) {
+ this.data_center = data_center;
+ }
+
+ public long getCommon_c2s_pkt_diff() {
+ return common_c2s_pkt_diff;
+ }
+
+ public void setCommon_c2s_pkt_diff(long common_c2s_pkt_diff) {
+ this.common_c2s_pkt_diff = common_c2s_pkt_diff;
+ }
+
+ public long getCommon_s2c_pkt_diff() {
+ return common_s2c_pkt_diff;
+ }
+
+ public void setCommon_s2c_pkt_diff(long common_s2c_pkt_diff) {
+ this.common_s2c_pkt_diff = common_s2c_pkt_diff;
+ }
+
+ public long getCommon_c2s_byte_diff() {
+ return common_c2s_byte_diff;
+ }
+
+ public void setCommon_c2s_byte_diff(long common_c2s_byte_diff) {
+ this.common_c2s_byte_diff = common_c2s_byte_diff;
+ }
+
+ public long getCommon_s2c_byte_diff() {
+ return common_s2c_byte_diff;
+ }
+
+ public void setCommon_s2c_byte_diff(long common_s2c_byte_diff) {
+ this.common_s2c_byte_diff = common_s2c_byte_diff;
+ }
+
+ public String getCommon_l7_protocol() {
+ return common_l7_protocol;
+ }
+
+ public void setCommon_l7_protocol(String common_l7_protocol) {
+ this.common_l7_protocol = common_l7_protocol;
+ }
+
+ public String getSessionTags() {
+ Map<String, Object> tagsMap = new HashMap<String, Object>();
+
+ if (common_vsys_id == 0) {
+ common_vsys_id = 1;
+ }
+ tagsMap.put("vsys_id", common_vsys_id);
+ tagsMap.put("device_id", common_device_id);
+ tagsMap.put("common_device_tag", common_device_tag);
+ tagsMap.put("protocol_label", common_protocol_label);
+ tagsMap.put("app_full_path", common_protocol_label + "." + common_l7_protocol);
+ tagsMap.put("randomNum",ThreadLocalRandom.current().nextInt(FlowWriteConfig.RANDOM_RANGE_NUM));
+
+ return JSON.toJSONString(tagsMap);
+ }
+
+
+ public Fileds getSessionFileds() {
+ long out_bytes;
+ long in_bytes;
+ long in_pkts;
+ long out_pkts;
+ Long clientIsLocal = 8L;
+ if ((common_flags & clientIsLocal) == 8L) {
+ in_bytes = common_s2c_byte_diff;
+ out_bytes = common_c2s_byte_diff;
+ in_pkts = common_s2c_pkt_diff;
+ out_pkts = common_c2s_pkt_diff;
+ } else {
+ in_bytes = common_c2s_byte_diff;
+ out_bytes = common_s2c_byte_diff;
+ in_pkts = common_c2s_pkt_diff;
+ out_pkts = common_s2c_pkt_diff;
+ }
+ return new Fileds(common_sessions, in_bytes, out_bytes, in_pkts, out_pkts, common_c2s_pkt_diff, common_s2c_pkt_diff, common_c2s_byte_diff, common_s2c_byte_diff, common_c2s_ipfrag_num, common_s2c_ipfrag_num, common_c2s_tcp_lostlen, common_s2c_tcp_lostlen, common_c2s_tcp_unorder_num, common_s2c_tcp_unorder_num,
+ common_c2s_pkt_retrans, common_s2c_pkt_retrans, common_c2s_byte_retrans, common_s2c_byte_retrans, "");
+ }
+
+}
+
diff --git a/src/main/java/com/zdjizhi/common/Tags.java b/src/main/java/com/zdjizhi/common/Tags.java
new file mode 100644
index 0000000..0a6d40c
--- /dev/null
+++ b/src/main/java/com/zdjizhi/common/Tags.java
@@ -0,0 +1,68 @@
+package com.zdjizhi.common;
+
+public class Tags {
+
+ private long vsys_id;
+ private String device_id;
+ private String device_group;
+ private String data_center;
+ private String protocol_label;
+ private String app_full_path;
+
+ public Tags(long vsys_id, String device_id, String device_group, String data_center, String protocol_label, String app_full_path) {
+ this.vsys_id = vsys_id;
+ this.device_id = device_id;
+ this.device_group = device_group;
+ this.data_center = data_center;
+ this.protocol_label = protocol_label;
+ this.app_full_path = app_full_path;
+ }
+
+ public long getVsys_id() {
+ return vsys_id;
+ }
+
+ public void setVsys_id(long vsys_id) {
+ this.vsys_id = vsys_id;
+ }
+
+ public String getDevice_id() {
+ return device_id;
+ }
+
+ public void setDevice_id(String device_id) {
+ this.device_id = device_id;
+ }
+
+ public String getDevice_group() {
+ return device_group;
+ }
+
+ public void setDevice_group(String device_group) {
+ this.device_group = device_group;
+ }
+
+ public String getData_center() {
+ return data_center;
+ }
+
+ public void setData_center(String data_center) {
+ this.data_center = data_center;
+ }
+
+ public String getProtocol_label() {
+ return protocol_label;
+ }
+
+ public void setProtocol_label(String protocol_label) {
+ this.protocol_label = protocol_label;
+ }
+
+ public String getApp_full_path() {
+ return app_full_path;
+ }
+
+ public void setApp_full_path(String app_full_path) {
+ this.app_full_path = app_full_path;
+ }
+}
diff --git a/src/main/java/com/zdjizhi/tools/function/CountWindowProcess.java b/src/main/java/com/zdjizhi/tools/function/CountWindowProcess.java
new file mode 100644
index 0000000..92f0b57
--- /dev/null
+++ b/src/main/java/com/zdjizhi/tools/function/CountWindowProcess.java
@@ -0,0 +1,147 @@
+package com.zdjizhi.tools.function;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.alibaba.fastjson2.JSON;
+import com.jayway.jsonpath.DocumentContext;
+import com.jayway.jsonpath.JsonPath;
+import com.zdjizhi.common.Fileds;
+import com.zdjizhi.common.Metrics;
+
+import com.zdjizhi.utils.StringUtil;
+import org.apache.datasketches.hll.HllSketch;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.Map;
+
+public class CountWindowProcess extends ProcessWindowFunction<Tuple3<String, Fileds, String>, String, String, TimeWindow> {
+ private static final Log logger = LogFactory.get();
+
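+ // Window accumulators kept as operator fields; process() runs single-threaded per
+ // subtask and resets them after each window's result is emitted.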
+ private long sessions;
+ private long in_bytes;
+ private long out_bytes;
+ private long in_pkts;
+ private long out_pkts;
+ private long c2s_pkts;
+ private long s2c_pkts;
+ private long c2s_bytes;
+ private long s2c_bytes;
+ private long c2s_fragments;
+ private long s2c_fragments;
+ private long c2s_tcp_lost_bytes;
+ private long s2c_tcp_lost_bytes;
+ private long c2s_tcp_ooorder_pkts;
+ private long s2c_tcp_ooorder_pkts;
+ private long c2s_tcp_retransmitted_pkts;
+ private long s2c_tcp_retransmitted_pkts;
+ private long c2s_tcp_retransmitted_bytes;
+ private long s2c_tcp_retransmitted_bytes;
+ private String device_group;
+ private String data_center;
+ private ArrayList<Object> read;
+ private String common_device_tag;
+
+
+ private String dataCenterExpr = "$.tags[?(@.tag=='data_center')].value";
+ private String dataGroupExpr = "$.tags[?(@.tag=='device_group')].value";
+
+ private String client_ip_sketch;
+ private Map<String, Object> cacheMap = new HashMap<>(20);
+
+ @Override
+ public void process(String key, Context context, Iterable<Tuple3<String, Fileds, String>> iterable, Collector<String> collector) throws Exception {
+ if (StringUtil.isNotBlank(key)) {
+ try {
+ HllSketch hllSketch = new HllSketch(12);
+ for (Tuple3<String, Fileds, String> record : iterable) {
+ sessions = sessions + record.f1.getSessions();
+
+ in_bytes = in_bytes + record.f1.getIn_bytes();
+ out_bytes = out_bytes + record.f1.getOut_bytes();
+
+ in_pkts = in_pkts + record.f1.getIn_pkts();
+ out_pkts = out_pkts + record.f1.getOut_pkts();
+
+ c2s_pkts = c2s_pkts + record.f1.getC2s_pkts();
+ s2c_pkts = s2c_pkts + record.f1.getS2c_pkts();
+
+ c2s_bytes = c2s_bytes + record.f1.getC2s_bytes();
+ s2c_bytes = s2c_bytes + record.f1.getS2c_bytes();
+
+ c2s_fragments = c2s_fragments + record.f1.getC2s_fragments();
+ s2c_fragments = s2c_fragments + record.f1.getS2c_fragments();
+
+ c2s_tcp_lost_bytes = c2s_tcp_lost_bytes + record.f1.getC2s_tcp_lost_bytes();
+ s2c_tcp_lost_bytes = s2c_tcp_lost_bytes + record.f1.getS2c_tcp_lost_bytes();
+
+ c2s_tcp_ooorder_pkts = c2s_tcp_ooorder_pkts + record.f1.getC2s_tcp_ooorder_pkts();
+ s2c_tcp_ooorder_pkts = s2c_tcp_ooorder_pkts + record.f1.getS2c_tcp_ooorder_pkts();
+
+ c2s_tcp_retransmitted_pkts = c2s_tcp_retransmitted_pkts + record.f1.getC2s_tcp_retransmitted_pkts();
+ s2c_tcp_retransmitted_pkts = s2c_tcp_retransmitted_pkts + record.f1.getS2c_tcp_retransmitted_pkts();
+
+ c2s_tcp_retransmitted_bytes = c2s_tcp_retransmitted_bytes + record.f1.getC2s_tcp_retransmitted_bytes();
+ s2c_tcp_retransmitted_bytes = s2c_tcp_retransmitted_bytes + record.f1.getS2c_tcp_retransmitted_bytes();
+
+ hllSketch.update(record.f2);
+ }
+
+ client_ip_sketch = Base64.getEncoder().encodeToString(hllSketch.toUpdatableByteArray());
+ Fileds fileds = new Fileds(sessions, in_bytes, out_bytes, in_pkts, out_pkts, c2s_pkts, s2c_pkts, c2s_bytes, s2c_bytes, c2s_fragments, s2c_fragments, c2s_tcp_lost_bytes, s2c_tcp_lost_bytes, c2s_tcp_ooorder_pkts, s2c_tcp_ooorder_pkts, c2s_tcp_retransmitted_pkts, s2c_tcp_retransmitted_pkts, c2s_tcp_retransmitted_bytes, s2c_tcp_retransmitted_bytes, client_ip_sketch);
+
+
+
+ //parse device_tag to extract data_center and device_group
+ Map keyMap = JSON.parseObject(key, Map.class);
+ common_device_tag = (String) keyMap.remove("common_device_tag");
+
+ if (common_device_tag != null) {
+ DocumentContext parse = JsonPath.parse(common_device_tag);
+ read = parse.read(dataCenterExpr);
+ if (read.size() >= 1) {
+ data_center = read.get(0).toString();
+ }
+ read = parse.read(dataGroupExpr);
+ if (read.size() >= 1) {
+ device_group = read.get(0).toString();
+ }
+ }
+
+ keyMap.put("device_group", device_group);
+ keyMap.put("data_center", data_center);
+ keyMap.remove("randomNum");
+
+ collector.collect(JSON.toJSONString(new Metrics(keyMap, fileds, context.window().getEnd() / 1000)));
+ sessions = 0;
+ in_bytes = 0;
+ out_bytes = 0;
+ in_pkts = 0;
+ out_pkts = 0;
+ c2s_pkts = 0;
+ s2c_pkts = 0;
+ c2s_bytes = 0;
+ s2c_bytes = 0;
+ c2s_fragments = 0;
+ s2c_fragments = 0;
+ c2s_tcp_lost_bytes = 0;
+ s2c_tcp_lost_bytes = 0;
+ c2s_tcp_ooorder_pkts = 0;
+ s2c_tcp_ooorder_pkts = 0;
+ c2s_tcp_retransmitted_pkts = 0;
+ s2c_tcp_retransmitted_pkts = 0;
+ c2s_tcp_retransmitted_bytes = 0;
+ s2c_tcp_retransmitted_bytes = 0;
+
+ } catch (Exception e) {
+ logger.error("window count error, message: " + e);
+ }
+ }
+ }
+}
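
The client_ip_sketch emitted above is a Base64-encoded HLL sketch. A short example of how a downstream consumer could recover a distinct-client-IP estimate, assuming the value is taken from the emitted Metrics JSON (class name and input placeholder are hypothetical):

    import java.util.Base64;
    import org.apache.datasketches.hll.HllSketch;

    public class SketchReader {
        public static void main(String[] args) {
            String clientIpSketch = "<base64 from fields.client_ip_sketch>";
            byte[] bytes = Base64.getDecoder().decode(clientIpSketch);
            // heapify() rebuilds the sketch from its serialized form; getEstimate()
            // approximates the number of distinct client IPs seen in the window.
            HllSketch sketch = HllSketch.heapify(bytes);
            System.out.println(sketch.getEstimate());
        }
    }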
diff --git a/src/main/java/com/zdjizhi/tools/function/KeyByFunction.java b/src/main/java/com/zdjizhi/tools/function/KeyByFunction.java
new file mode 100644
index 0000000..49e7a7c
--- /dev/null
+++ b/src/main/java/com/zdjizhi/tools/function/KeyByFunction.java
@@ -0,0 +1,13 @@
+package com.zdjizhi.tools.function;
+
+import com.zdjizhi.common.Fileds;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple3;
+
+public class KeyByFunction implements KeySelector<Tuple3<String, Fileds, String>, String> {
+ @Override
+ public String getKey(Tuple3<String, Fileds, String> value) throws Exception {
+
+ return value.f0;
+ }
+}
diff --git a/src/main/java/com/zdjizhi/tools/function/ParseInterimSessionFunction.java b/src/main/java/com/zdjizhi/tools/function/ParseInterimSessionFunction.java
new file mode 100644
index 0000000..8d273b0
--- /dev/null
+++ b/src/main/java/com/zdjizhi/tools/function/ParseInterimSessionFunction.java
@@ -0,0 +1,50 @@
+package com.zdjizhi.tools.function;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.alibaba.fastjson2.JSON;
+import com.zdjizhi.common.Fileds;
+import com.zdjizhi.common.InterimSessionRecord;
+import com.zdjizhi.utils.StringUtil;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple3;
+
+/**
+ * @author wangchengcheng
+ * @Package com.zdjizhi.utils.functions
+ * @Description:
+ * @date 2023/04/20
+ */
+public class ParseInterimSessionFunction implements MapFunction<String, Tuple3<String, Fileds, String>> {
+ private static final Log logger = LogFactory.get();
+ private InterimSessionRecord interimsessionRecord;
+ private String tags="";
+ private Fileds sessionFileds;
+
+ @Override
+ public Tuple3<String, Fileds, String> map(String message) throws Exception {
+
+ try {
+ if (StringUtil.isNotBlank(message)) {
+ interimsessionRecord = JSON.parseObject(message, InterimSessionRecord.class);
+ //only keep records whose common_protocol_label is not blank
+ if (StringUtil.isNotBlank(interimsessionRecord.getCommon_protocol_label())) {
+ tags = interimsessionRecord.getInterimSessionTags();
+ sessionFileds = interimsessionRecord.getInterimSessionFileds();
+ return new Tuple3<>(tags, sessionFileds, interimsessionRecord.getCommon_client_ip());
+ }else {
+ return new Tuple3<>("", sessionFileds, "");
+ }
+ }else {
+ return new Tuple3<>("", sessionFileds, "");
+ }
+ } catch (RuntimeException e) {
+ logger.error("An error occurred while parsing the interim-session-record, error message: " + e + ", the original log is: " + message);
+ return new Tuple3<>("", sessionFileds, "");
+ }
+ }
+}
diff --git a/src/main/java/com/zdjizhi/tools/function/ParseSessionFunction.java b/src/main/java/com/zdjizhi/tools/function/ParseSessionFunction.java
new file mode 100644
index 0000000..01b0826
--- /dev/null
+++ b/src/main/java/com/zdjizhi/tools/function/ParseSessionFunction.java
@@ -0,0 +1,51 @@
+package com.zdjizhi.tools.function;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.alibaba.fastjson2.JSON;
+import com.zdjizhi.common.Fileds;
+import com.zdjizhi.common.SessionRecord;
+import com.zdjizhi.utils.StringUtil;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple3;
+
+/**
+ * @author wangchengcheng
+ * @Package com.zdjizhi.utils.functions
+ * @Description:
+ * @date 2023/04/20
+ */
+public class ParseSessionFunction implements MapFunction<String, Tuple3<String, Fileds, String>> {
+ private static final Log logger = LogFactory.get();
+ private SessionRecord sessionRecord;
+ private String tags;
+ private Fileds sessionFileds;
+
+ @Override
+ public Tuple3<String, Fileds, String> map(String message) {
+
+ try {
+ if (StringUtil.isNotBlank(message)) {
+ sessionRecord = JSON.parseObject(message, SessionRecord.class);
+ //only keep records whose common_protocol_label is not blank
+ if (StringUtil.isNotBlank(sessionRecord.getCommon_protocol_label())) {
+ tags = sessionRecord.getSessionTags();
+ sessionFileds = sessionRecord.getSessionFileds();
+ return new Tuple3<>(tags, sessionFileds, sessionRecord.getCommon_client_ip());
+ }else {
+ return new Tuple3<>("", sessionFileds, "");
+ }
+ }else {
+ return new Tuple3<>("", sessionFileds, "");
+ }
+
+ } catch (Exception e) {
+ logger.error("An error occurred while parsing the session-record, error message: " + e + ", the original log is: " + message);
+ return new Tuple3<>("", sessionFileds, "");
+ }
+ }
+}
diff --git a/src/main/java/com/zdjizhi/tools/kafka/CertUtils.java b/src/main/java/com/zdjizhi/tools/kafka/CertUtils.java
new file mode 100644
index 0000000..ad93f29
--- /dev/null
+++ b/src/main/java/com/zdjizhi/tools/kafka/CertUtils.java
@@ -0,0 +1,48 @@
+package com.zdjizhi.tools.kafka;
+
+import com.zdjizhi.common.FlowWriteConfig;
+import org.apache.kafka.common.config.SslConfigs;
+
+import java.util.Properties;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.kafka
+ * @Description:
+ * @date 2021/9/6 10:37
+ */
+class CertUtils {
+ /**
+ * Kafka SASL authentication port
+ */
+ private static final String SASL_PORT = "9094";
+
+ /**
+ * Kafka SSL authentication port
+ */
+ private static final String SSL_PORT = "9095";
+
+ /**
+ * Choose the authentication mode based on the port in the connection string.
+ *
+ * @param servers Kafka connection string
+ * @param properties Kafka connection properties
+ */
+ static void chooseCert(String servers, Properties properties) {
+ if (servers.contains(SASL_PORT)) {
+ properties.put("security.protocol", "SASL_PLAINTEXT");
+ properties.put("sasl.mechanism", "PLAIN");
+ properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username="
+ + FlowWriteConfig.KAFKA_SASL_JAAS_USER + " password=" + FlowWriteConfig.KAFKA_SASL_JAAS_PIN + ";");
+ } else if (servers.contains(SSL_PORT)) {
+ properties.put("security.protocol", "SSL");
+ properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
+ properties.put("ssl.keystore.location", FlowWriteConfig.TOOLS_LIBRARY + "keystore.jks");
+ properties.put("ssl.keystore.password", FlowWriteConfig.KAFKA_SASL_JAAS_PIN);
+ properties.put("ssl.truststore.location", FlowWriteConfig.TOOLS_LIBRARY + "truststore.jks");
+ properties.put("ssl.truststore.password", FlowWriteConfig.KAFKA_SASL_JAAS_PIN);
+ properties.put("ssl.key.password", FlowWriteConfig.KAFKA_SASL_JAAS_PIN);
+ }
+
+ }
+}
diff --git a/src/main/java/com/zdjizhi/tools/kafka/KafkaConsumer.java b/src/main/java/com/zdjizhi/tools/kafka/KafkaConsumer.java
new file mode 100644
index 0000000..d0fa01f
--- /dev/null
+++ b/src/main/java/com/zdjizhi/tools/kafka/KafkaConsumer.java
@@ -0,0 +1,49 @@
+package com.zdjizhi.tools.kafka;
+
+import com.zdjizhi.common.FlowWriteConfig;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+
+
+
+import java.util.Properties;
+
+/**
+ * @author wangchengcheng
+ * @Package com.zdjizhi.utils.kafka
+ * @Description:
+ * @date 2023/04/19
+ */
+public class KafkaConsumer {
+
+
+ //consumer configuration for the session-record topics
+ private static Properties createConsumerConfig(String groupId) {
+ Properties properties = new Properties();
+ properties.put("bootstrap.servers", FlowWriteConfig.SOURCE_KAFKA_SERVERS);
+ properties.put("group.id", groupId);
+ properties.put("session.timeout.ms", FlowWriteConfig.SESSION_TIMEOUT_MS);
+ properties.put("max.poll.records", FlowWriteConfig.MAX_POLL_RECORDS);
+ properties.put("max.partition.fetch.bytes", FlowWriteConfig.MAX_PARTITION_FETCH_BYTES);
+ properties.put("partition.discovery.interval.ms", "10000");
+ CertUtils.chooseCert(FlowWriteConfig.SOURCE_KAFKA_SERVERS, properties);
+
+ return properties;
+ }
+
+
+ public static FlinkKafkaConsumer<String> getKafkaConsumer(String topic,String groupId) {
+ FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(topic,
+ new SimpleStringSchema(), createConsumerConfig(groupId));
+
+ //commit offsets back to Kafka as checkpoints complete
+ kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
+
+ //start consuming from the group's current committed offsets
+ kafkaConsumer.setStartFromGroupOffsets();
+
+ return kafkaConsumer;
+ }
+
+
+}
diff --git a/src/main/java/com/zdjizhi/tools/kafka/KafkaProducer.java b/src/main/java/com/zdjizhi/tools/kafka/KafkaProducer.java
new file mode 100644
index 0000000..ee31bc2
--- /dev/null
+++ b/src/main/java/com/zdjizhi/tools/kafka/KafkaProducer.java
@@ -0,0 +1,50 @@
+package com.zdjizhi.tools.kafka;
+
+import com.zdjizhi.common.FlowWriteConfig;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi.utils.kafka
+ * @Description:
+ * @date 2021/6/8 14:04
+ */
+public class KafkaProducer {
+
+ private static Properties createProducerConfig() {
+ Properties properties = new Properties();
+ properties.put("bootstrap.servers", FlowWriteConfig.SINK_KAFKA_SERVERS);
+ properties.put("acks", FlowWriteConfig.PRODUCER_ACK);
+ properties.put("retries", FlowWriteConfig.RETRIES);
+ properties.put("linger.ms", FlowWriteConfig.LINGER_MS);
+ properties.put("request.timeout.ms", FlowWriteConfig.REQUEST_TIMEOUT_MS);
+ properties.put("batch.size", FlowWriteConfig.BATCH_SIZE);
+ properties.put("buffer.memory", FlowWriteConfig.BUFFER_MEMORY);
+ properties.put("max.request.size", FlowWriteConfig.MAX_REQUEST_SIZE);
+ properties.put("compression.type", FlowWriteConfig.PRODUCER_KAFKA_COMPRESSION_TYPE);
+
+ CertUtils.chooseCert(FlowWriteConfig.SINK_KAFKA_SERVERS, properties);
+
+ return properties;
+ }
+
+
+ public static FlinkKafkaProducer<String> getKafkaProducer() {
+ FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<String>(
+ FlowWriteConfig.SINK_KAFKA_TOPIC,
+ new SimpleStringSchema(),
+ createProducerConfig(),
+ //with no custom partitioner the sink connects to all partitions and writes round-robin
+ Optional.empty());
+
+ //let the producer log failures instead of catching and rethrowing them
+ kafkaProducer.setLogFailuresOnly(true);
+
+ return kafkaProducer;
+ }
+
+}
diff --git a/src/main/java/com/zdjizhi/tools/system/FlowWriteConfigurations.java b/src/main/java/com/zdjizhi/tools/system/FlowWriteConfigurations.java
new file mode 100644
index 0000000..d429def
--- /dev/null
+++ b/src/main/java/com/zdjizhi/tools/system/FlowWriteConfigurations.java
@@ -0,0 +1,69 @@
+package com.zdjizhi.tools.system;
+
+import com.zdjizhi.utils.StringUtil;
+
+import java.io.IOException;
+import java.util.Locale;
+import java.util.Properties;
+
+
+/**
+ * @author Administrator
+ */
+
+public final class FlowWriteConfigurations {
+
+ private static Properties propDefault = new Properties();
+ private static Properties propService = new Properties();
+
+
+ public static String getStringProperty(Integer type, String key) {
+ if (type == 0) {
+ return propService.getProperty(key).trim();
+ } else if (type == 1) {
+ return propDefault.getProperty(key).trim();
+ } else {
+ return null;
+ }
+ }
+
+ public static Integer getIntProperty(Integer type, String key) {
+ if (type == 0) {
+ return Integer.parseInt(propService.getProperty(key).trim());
+ } else if (type == 1) {
+ return Integer.parseInt(propDefault.getProperty(key).trim());
+ } else {
+ return null;
+ }
+ }
+
+ public static Long getLongProperty(Integer type, String key) {
+ if (type == 0) {
+ return Long.parseLong(propService.getProperty(key).trim());
+ } else if (type == 1) {
+ return Long.parseLong(propDefault.getProperty(key).trim());
+ } else {
+ return null;
+ }
+ }
+
+ public static Boolean getBooleanProperty(Integer type, String key) {
+ if (type == 0) {
+ return StringUtil.equals(propService.getProperty(key).trim().toLowerCase(Locale.ENGLISH), "true");
+ } else if (type == 1) {
+ return StringUtil.equals(propDefault.getProperty(key).trim().toLowerCase(Locale.ENGLISH), "true");
+ } else {
+ return null;
+ }
+ }
+
+ static {
+ try {
+ propService.load(FlowWriteConfigurations.class.getClassLoader().getResourceAsStream("service_flow_config.properties"));
+ propDefault.load(FlowWriteConfigurations.class.getClassLoader().getResourceAsStream("default_config.properties"));
+ } catch (IOException | RuntimeException e) {
+ propDefault = null;
+ propService = null;
+ }
+ }
+}
diff --git a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
new file mode 100644
index 0000000..170755f
--- /dev/null
+++ b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java
@@ -0,0 +1,75 @@
+package com.zdjizhi.topology;
+
+import cn.hutool.log.Log;
+import cn.hutool.log.LogFactory;
+import com.zdjizhi.common.Fileds;
+import com.zdjizhi.common.FlowWriteConfig;
+import com.zdjizhi.tools.function.CountWindowProcess;
+import com.zdjizhi.tools.function.KeyByFunction;
+import com.zdjizhi.tools.function.ParseInterimSessionFunction;
+import com.zdjizhi.tools.function.ParseSessionFunction;
+import com.zdjizhi.tools.kafka.KafkaConsumer;
+
+import com.zdjizhi.tools.kafka.KafkaProducer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.datastream.WindowedStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+/**
+ * @author wangchengcheng
+ * @Package com.zdjizhi.topology
+ * @Description:
+ * @date 2023/04/19
+ */
+public class LogFlowWriteTopology {
+ private static final Log logger = LogFactory.get();
+
+ public static void main(String[] args) {
+ final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
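+        //Buffer timeout: the maximum time in milliseconds a record may sit in a network buffer before being flushed downstream (lower values reduce latency at some cost to throughput)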
+ environment.setBufferTimeout(FlowWriteConfig.BUFFER_TIMEOUT);
+
+        //Consume and parse session logs
+ SingleOutputStreamOperator<String> sessionSource = environment.addSource(KafkaConsumer.getKafkaConsumer(FlowWriteConfig.SESSION_SOURCE_KAFKA_TOPIC,FlowWriteConfig.SESSION_GROUP_ID))
+ .setParallelism(FlowWriteConfig.SESSION_SOURCE_PARALLELISM).name(FlowWriteConfig.SESSION_SOURCE_KAFKA_TOPIC);
+
+ SingleOutputStreamOperator<Tuple3<String, Fileds, String>> sessionParseMap = sessionSource.map(new ParseSessionFunction())
+ .setParallelism(FlowWriteConfig.SESSION_SOURCE_PARALLELISM).name("sessionParseMap");
+
+
+        //Consume and parse interim sessions
+ SingleOutputStreamOperator<String> interimSessionSource = environment.addSource(KafkaConsumer.getKafkaConsumer(FlowWriteConfig.INTERIM_SESSION_SOURCE_KAFKA_TOPIC,FlowWriteConfig.INTERIM_SESSION_GROUP_ID))
+ .setParallelism(FlowWriteConfig.INTERIM_SESSION_SOURCE_PARALLELISM).name(FlowWriteConfig.INTERIM_SESSION_SOURCE_KAFKA_TOPIC);
+
+ SingleOutputStreamOperator<Tuple3<String, Fileds, String>> interimSessionParseMap = interimSessionSource.map(new ParseInterimSessionFunction())
+ .setParallelism(FlowWriteConfig.INTERIM_SESSION_SOURCE_PARALLELISM).name("interimSessionParseMap");
+
+
+        //Union the session and interim-session streams, key by the grouping key, and apply a tumbling processing-time window
+ WindowedStream<Tuple3<String, Fileds, String>, String, TimeWindow> window = sessionParseMap.union(interimSessionParseMap).keyBy(new KeyByFunction())
+ .window(TumblingProcessingTimeWindows.of(Time.seconds(FlowWriteConfig.COUNT_WINDOW_SECONDS)));
+
+ SingleOutputStreamOperator<String> countWindow = window.process(new CountWindowProcess()).setParallelism(FlowWriteConfig.COUNT_WINDOW_PARALLELISM).name("countWindow");
+
+ countWindow.addSink(KafkaProducer.getKafkaProducer()).name("LogSinkKafka")
+ .setParallelism(FlowWriteConfig.SINK_KAFKA_PARALLELISM).name(FlowWriteConfig.SINK_KAFKA_TOPIC);
+
+ try {
+            environment.execute(args.length > 0 ? args[0] : LogFlowWriteTopology.class.getSimpleName());
+ } catch (Exception e) {
+            logger.error(e, "This Flink task failed to start!");
+ }
+
+ }
+
+}
diff --git a/src/main/log4j.properties b/src/main/log4j.properties
new file mode 100644
index 0000000..facffc7
--- /dev/null
+++ b/src/main/log4j.properties
@@ -0,0 +1,25 @@
+#Log4j
+log4j.rootLogger=info,console,file
+# Console appender
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.Threshold=info
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] [Thread\:%t] %l %x - <%m>%n
+
+# File appender
+log4j.appender.file=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.file.Threshold=info
+log4j.appender.file.encoding=UTF-8
+log4j.appender.file.Append=true
+# Use a relative path; verify in testing that output lands under the application directory
+log4j.appender.file.file=${nis.root}/log/galaxy-name.log
+log4j.appender.file.DatePattern='.'yyyy-MM-dd
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+#log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss} %X{ip} [%t] %5p %c{1} %m%n
+log4j.appender.file.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] %X{ip} [Thread\:%t] %l %x - %m%n
+# MyBatis logging; com.nis.web.dao is the package containing the MyBatis mapper interfaces
+log4j.logger.com.nis.web.dao=info
+# BoneCP data source logging
+log4j.category.com.jolbox=info,console
+
+
diff --git a/src/main/logback.xml b/src/main/logback.xml
new file mode 100644
index 0000000..59095f6
--- /dev/null
+++ b/src/main/logback.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configuration>
+
+    <!-- Output pattern: %date is the date, %thread the thread name, %-5level the level left-aligned in a 5-character field, %msg the log message, %n a newline -->
+ <property name="LOG_PATTERN" value="%date{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n" />
+    <!-- Log file path; do not configure a relative path -->
+ <property name="LOG_FILE_PATH" value="E:/logs/demo.%d{yyyy-MM-dd}.%i.log" />
+
+    <!-- Console appender -->
+ <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+            <!-- Print logs using the LOG_PATTERN configured above -->
+ <pattern>${LOG_PATTERN}</pattern>
+ </encoder>
+ </appender>
+
+    <!-- Roll to a new log file each day and keep 30 days of history; the rolling policy handles the file splitting -->
+ <appender name="FILE"
+ class="ch.qos.logback.core.rolling.RollingFileAppender">
+ <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+ <fileNamePattern>${LOG_FILE_PATH}</fileNamePattern>
+            <!-- keep 30 days' worth of history -->
+ <maxHistory>30</maxHistory>
+ <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
+                <!-- Maximum size of a single log file -->
+ <maxFileSize>20MB</maxFileSize>
+ </timeBasedFileNamingAndTriggeringPolicy>
+ </rollingPolicy>
+
+ <encoder>
+ <pattern>${LOG_PATTERN}</pattern>
+ </encoder>
+ </appender>
+    <!-- Project default log level -->
+ <logger name="com.example.demo" level="INFO" />
+
+    <!-- Root log level; common levels from highest to lowest: ERROR, WARN, INFO, DEBUG. -->
+ <root level="INFO">
+ <appender-ref ref="CONSOLE" />
+ <appender-ref ref="FILE" /><!--对应appender name="FILE"。 -->
+ </root>
+</configuration> \ No newline at end of file
diff --git a/src/test/java/com/zdjizhi/tools/DatasketchesTest.java b/src/test/java/com/zdjizhi/tools/DatasketchesTest.java
new file mode 100644
index 0000000..644c916
--- /dev/null
+++ b/src/test/java/com/zdjizhi/tools/DatasketchesTest.java
@@ -0,0 +1,253 @@
+package com.zdjizhi.tools;
+
+import cn.hutool.json.JSONUtil;
+import com.alibaba.fastjson2.*;
+import com.zdjizhi.utils.JsonMapper;
+import org.apache.datasketches.hll.HllSketch;
+import org.apache.datasketches.hll.Union;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.Test;
+
+import java.util.*;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi
+ * @Description:
+ * @date 2023/3/2 17:17
+ */
+public class DatasketchesTest {
+
+ @Test
+ public void HllSketchTest() {
+ HashSet<String> strings = new HashSet<>();
+
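+        // lgConfigK = 12 gives 2^12 = 4096 registers; HLL relative standard error is roughly 1.04 / sqrt(4096), about 1.6%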
+ HllSketch sketch = new HllSketch(12);
+
+ for (int i = 0; i < 50; i++) {
+ String ip = "192.168.1." + i;
+ sketch.update(ip);
+ strings.add(ip);
+ }
+
+ System.out.println(sketch.getEstimate() + "--" + strings.size());
+
+ HashSet<String> randomStrings = new HashSet<>();
+
+ HllSketch randomSketch = new HllSketch(12);
+
+ for (int i = 0; i < 50; i++) {
+ String ip = makeIPv4Random();
+ randomSketch.update(ip);
+ randomStrings.add(ip);
+ }
+
+ System.out.println(randomSketch.getEstimate() + "--" + randomStrings.size());
+ }
+
+ @Test
+ public void HllSketchUnionTest() {
+ HashSet<String> strings = new HashSet<>();
+
+ HllSketch sketch = new HllSketch(12);
+
+ for (int i = 0; i < 50; i++) {
+ String ip = "192.168.1." + i;
+ sketch.update(ip);
+ strings.add(ip);
+ }
+
+ HllSketch sketch2 = new HllSketch(12);
+
+ for (int i = 0; i < 10; i++) {
+ String ip = "192.168.2." + i;
+ sketch2.update(ip);
+ strings.add(ip);
+ }
+
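+        // Union(lgMaxK) merges sketches; the merged result is downsampled to min(lgMaxK, smallest input lgK)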
+ Union union = new Union(12);
+
+ union.update(sketch);
+ union.update(sketch2);
+ HllSketch sketch_result = HllSketch.heapify(union.getResult().toCompactByteArray());
+
+ System.out.println(sketch.getEstimate() + "--" + strings.size());
+ System.out.println(sketch2.getEstimate() + "--" + strings.size());
+ System.out.println(sketch_result.getEstimate() + "--" + strings.size());
+ }
+
+ @Test
+ public void HllSketchDruidTest() {
+ HashMap<String, Object> dataMap = new HashMap<>();
+
+ HashSet<String> strings = new HashSet<>();
+
+ HllSketch sketch = new HllSketch(12);
+
+ for (int i = 0; i < 50; i++) {
+ String ip = "192.168.1." + i;
+ sketch.update(ip);
+ strings.add(ip);
+ }
+
+ HllSketch sketch2 = new HllSketch(12);
+
+ for (int i = 0; i < 10; i++) {
+ String ip = "192.168.2." + i;
+ sketch2.update(ip);
+ strings.add(ip);
+ }
+
+ Union union = new Union(12);
+
+ union.update(sketch);
+ union.update(sketch2);
+ HllSketch sketch_result1 = HllSketch.heapify(union.getResult().toCompactByteArray());
+
+ HllSketch sketch3 = new HllSketch(12);
+
+ for (int i = 0; i < 10; i++) {
+ String ip = "192.168.3." + i;
+ sketch3.update(ip);
+ strings.add(ip);
+ }
+
+ Union union2 = new Union(12);
+
+ union2.update(sketch_result1);
+ union2.update(sketch3);
+ HllSketch sketch_result2 = HllSketch.heapify(union2.getResult().toCompactByteArray());
+
+ System.out.println(sketch.getEstimate() + "--" + strings.size());
+ System.out.println(sketch2.getEstimate() + "--" + strings.size());
+ System.out.println(sketch3.getEstimate() + "--" + strings.size());
+ System.out.println(sketch_result1.getEstimate() + "--" + strings.size());
+ System.out.println(sketch_result2.getEstimate() + "--" + strings.size());
+
+ Result result = new Result();
+ result.setC2s_pkt_num(10);
+ result.setS2c_pkt_num(10);
+ result.setC2s_byte_num(10);
+ result.setS2c_byte_num(10);
+ result.setStat_time(1679970031);
+ result.setSchema_type("HLLSketchMergeTest");
+
+        //Compact byte array: the smaller, read-only serialized form
+ result.setIp_object(sketch_result2.toCompactByteArray());
+// System.out.println(result.toString());
+        //sendMessage(JsonMapper.toJsonString(result));
+
+
+        //Updatable byte array: the larger serialized form that can keep receiving updates
+ result.setIp_object(sketch_result2.toUpdatableByteArray());
+// System.out.println(result.toString());
+        //sendMessage(JsonMapper.toJsonString(result));
+
+        //HashMap payload: compare JSON serialization across Jackson, FastJson2, and Hutool
+ dataMap.put("app_name", "TEST");
+ dataMap.put("protocol_stack_id", "HTTP");
+ dataMap.put("vsys_id", 1);
+ dataMap.put("stat_time", 1681370100);
+ dataMap.put("client_ip_sketch", sketch_result2.toUpdatableByteArray());
+
+ System.out.println("Jackson:" + JsonMapper.toJsonString(dataMap));
+ System.out.println("FastJson2:" + JSONObject.toJSONString(dataMap));
+ System.out.println("Hutool:" + JSONUtil.toJsonStr(dataMap) + "\n\n");
+
+ byte[] toJSONB = JSONB.toBytes(dataMap);
+// sendMessage(toJSONB);
+ JSONObject jsonObject = JSONB.parseObject(toJSONB);
+ System.out.println("FastJson2 Byte(JSONB):" + jsonObject.toJSONString() + "\n\n");
+
+
+ dataMap.put("client_ip_sketch", Base64.getEncoder().encodeToString(sketch_result2.toUpdatableByteArray()));
+ System.out.println("FastJson2 Byte(Base64):" + JSONObject.toJSONString(dataMap));
+ System.out.println("Hutool Byte(Base64):" + JSONObject.toJSONString(dataMap));
+ System.out.println(JSONUtil.toJsonStr(dataMap));
+
+
+// sendMessage(JSONObject.toJSONString(dataMap));
+ }
+
+
+    //Generate a random IPv4 address
+    private static String makeIPv4Random() {
+        Random random = new Random();
+        int v4_1 = random.nextInt(255) + 1;
+        int v4_2 = random.nextInt(255);
+        int v4_3 = random.nextInt(255);
+        int v4_4 = random.nextInt(255);
+        return v4_1 + "." + v4_2 + "." + v4_3 + "." + v4_4;
+ }
+
+ private static void sendMessage(Object message) {
+ Properties props = new Properties();
+        //Kafka broker address
+ props.put("bootstrap.servers", "192.168.44.12:9092");
+ props.put("acks", "all");
+ props.put("retries", 0);
+ props.put("linger.ms", 1);
+ props.put("buffer.memory", 67108864);
+// props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
+// props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
+ KafkaProducer<String, Object> kafkaProducer = new KafkaProducer<String, Object>(props);
+
+ kafkaProducer.send(new ProducerRecord<String, Object>("TRAFFIC-PROTOCOL-TEST", message));
+
+ kafkaProducer.close();
+ }
+}
+
+class Result {
+
+ private String schema_type;
+ private long c2s_byte_num;
+ private long c2s_pkt_num;
+ private long s2c_byte_num;
+ private long s2c_pkt_num;
+ private long stat_time;
+ private byte[] ip_object;
+
+ public void setSchema_type(String schema_type) {
+ this.schema_type = schema_type;
+ }
+
+ public void setC2s_byte_num(long c2s_byte_num) {
+ this.c2s_byte_num = c2s_byte_num;
+ }
+
+ public void setC2s_pkt_num(long c2s_pkt_num) {
+ this.c2s_pkt_num = c2s_pkt_num;
+ }
+
+ public void setS2c_byte_num(long s2c_byte_num) {
+ this.s2c_byte_num = s2c_byte_num;
+ }
+
+ public void setS2c_pkt_num(long s2c_pkt_num) {
+ this.s2c_pkt_num = s2c_pkt_num;
+ }
+
+ public void setStat_time(long stat_time) {
+ this.stat_time = stat_time;
+ }
+
+ public void setIp_object(byte[] ip_object) {
+ this.ip_object = ip_object;
+ }
+
+ @Override
+ public String toString() {
+ return "Result{" +
+ "schema_type='" + schema_type + '\'' +
+ ", c2s_byte_num=" + c2s_byte_num +
+ ", c2s_pkt_num=" + c2s_pkt_num +
+ ", s2c_byte_num=" + s2c_byte_num +
+ ", s2c_pkt_num=" + s2c_pkt_num +
+ ", stat_time=" + stat_time +
+ ", ip_object=" + Arrays.toString(ip_object) +
+ '}';
+ }
+} \ No newline at end of file
diff --git a/src/test/java/com/zdjizhi/tools/FlagsTest.java b/src/test/java/com/zdjizhi/tools/FlagsTest.java
new file mode 100644
index 0000000..a32e06b
--- /dev/null
+++ b/src/test/java/com/zdjizhi/tools/FlagsTest.java
@@ -0,0 +1,48 @@
+package com.zdjizhi.tools;
+
+import org.junit.Test;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi
+ * @Description:
+ * @date 2023/4/18 10:22
+ */
+public class FlagsTest {
+ /*
+     * Reference: https://juejin.cn/post/6879226834597691405
+     *
+     * Session flags (stored as a 64-bit unsigned integer). The 32-bit field identifies the
+     * session's network behaviors; a logged value is queried and decoded against the
+     * constants below via bitwise AND (&):
+ * 0x00000001 - (1) Asymmetric
+ * 0x00000002 - (2) Bulky
+ * 0x00000004 - (4) CBR Streaming
+ * 0x00000008 - (8) Client is Local
+ * 0x00000010 - (16) Server is Local
+ * 0x00000020 - (32) Download
+ * 0x00000040 - (64) Interactive
+ * 0x00000080 - (128) Inbound
+ * 0x00000100 - (256) Outbound
+ * 0x00000200 - (512) Pseudo Unidirectional
+ * 0x00000400 - (1024) Streaming
+ * 0x00000800 - (2048) Unidirectional
+ * 0x00001000 - (4096) Random looking
+ * 0x00002000 - (8192) C2S
+ * 0x00004000 - (16384) S2C
+ */
+
+ @Test
+ public void bitwiseAND() {
+        long common_flags = 8200L;
+        long clientIsLocal = 8L;
+        long serverIsLocal = 16L;
+
+ System.out.println("common_flags & clientIsLocal = " + (common_flags & clientIsLocal));
+// System.out.println("common_flags & serverIsLocal = " + (common_flags & serverIsLocal)+"\n\n");
+
+ common_flags = 16400L;
+
+ System.out.println("common_flags & clientIsLocal = " + (common_flags & clientIsLocal));
+ System.out.println("common_flags & serverIsLocal = " + (common_flags & serverIsLocal));
+
+ }
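+
+    /*
+     * A minimal decoding sketch (added for illustration, not part of the original tests):
+     * maps a flags value to the behavior names from the table above, where bit i
+     * corresponds to the constant 0x1 << i.
+     */
+    private static java.util.List<String> decodeFlags(long flags) {
+        String[] names = {"Asymmetric", "Bulky", "CBR Streaming", "Client is Local",
+                "Server is Local", "Download", "Interactive", "Inbound", "Outbound",
+                "Pseudo Unidirectional", "Streaming", "Unidirectional", "Random looking",
+                "C2S", "S2C"};
+        java.util.List<String> hits = new java.util.ArrayList<>();
+        for (int bit = 0; bit < names.length; bit++) {
+            if ((flags & (1L << bit)) != 0) {
+                hits.add(names[bit]);
+            }
+        }
+        return hits;   // e.g. decodeFlags(8200L) -> [Client is Local, C2S]
+    }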
+}
diff --git a/src/test/java/com/zdjizhi/tools/TestJsonPath.java b/src/test/java/com/zdjizhi/tools/TestJsonPath.java
new file mode 100644
index 0000000..05ef995
--- /dev/null
+++ b/src/test/java/com/zdjizhi/tools/TestJsonPath.java
@@ -0,0 +1,89 @@
+package com.zdjizhi.tools;
+
+import com.alibaba.fastjson2.JSON;
+import com.jayway.jsonpath.DocumentContext;
+import com.jayway.jsonpath.JsonPath;
+import com.zdjizhi.common.SessionRecord;
+
+import java.util.ArrayList;
+import java.util.Map;
+
+public class TestJsonPath {
+ public static void main(String[] args) {
+
+ String data_center;
+ String device_group;
+
+ String sessionReocrd = "{\"common_schema_type\":\"BASE\",\"common_sessions\":1,\"common_protocol_label\":\"ETHERNET.IPv4.TCP\",\"common_c2s_byte_diff\":2264,\"common_c2s_pkt_diff\":4,\"common_s2c_byte_diff\":0,\"common_s2c_pkt_diff\":0,\"common_c2s_ipfrag_num\":0,\"common_s2c_ipfrag_num\":0,\"common_c2s_tcp_unorder_num\":0,\"common_s2c_tcp_unorder_num\":0,\"common_c2s_tcp_lostlen\":0,\"common_s2c_tcp_lostlen\":0,\"common_c2s_pkt_retrans\":3,\"common_s2c_pkt_retrans\":0,\"common_c2s_byte_retrans\":1698,\"common_s2c_byte_retrans\":0,\"common_flags\":8201,\"common_flags_identify_info\":\"{\\\"Asymmetric\\\":4,\\\"Client is Local\\\":1,\\\"C2S\\\":1}\",\"common_direction\":69,\"common_app_full_path\":\"unknown\",\"common_app_label\":\"unknown\",\"common_tcp_client_isn\":3855187193,\"common_server_ip\":\"45.188.134.11\",\"common_client_ip\":\"5.32.144.55\",\"common_server_port\":8081,\"common_client_port\":5652,\"common_stream_dir\":1,\"common_address_type\":4,\"common_address_list\":\"5652-8081-5.32.144.55-45.188.134.11\",\"common_start_time\":1682403782,\"common_end_time\":1682403802,\"common_con_duration_ms\":20164,\"common_s2c_pkt_num\":0,\"common_s2c_byte_num\":0,\"common_c2s_pkt_num\":4,\"common_c2s_byte_num\":2264,\"common_client_location\":\"俄罗斯联邦.莫斯科州.沃斯克列先斯克\",\"common_server_location\":\"巴西.马托格罗索州.Unknown\",\"common_stream_trace_id\":\"869469798157990979\",\"common_l4_protocol\":\"IPv4_TCP\",\"common_sled_ip\":\"192.168.40.81\",\"common_device_id\":\"21426003\",\"common_data_center\":\"center-xxg-9140\",\"common_device_tag\":\"{\\\"tags\\\":[{\\\"tag\\\":\\\"device_group\\\",\\\"value\\\":\\\"group-xxg-9140\\\"},{\\\"tag\\\":\\\"data_center\\\",\\\"value\\\":\\\"center-xxg-9140\\\"}]}\",\"common_t_vsys_id\":1,\"common_vsys_id\":1}";
+
+ SessionRecord sessionRecord = JSON.parseObject(sessionReocrd, SessionRecord.class);
+
+ int i = 0;
+ double begin = System.currentTimeMillis();
+
+ System.out.println(sessionRecord.getCommon_device_tag());
+// //jsonpath
+// while (i < 6000000) {
+// String common_device_tag = sessionRecord.getCommon_device_tag();
+// String dataCenterExpr = "$.tags[?(@.tag=='data_center')].value";
+// String dataGroupExpr = "$.tags[?(@.tag=='device_group')].value";
+//
+//
+// if (common_device_tag != null) {
+//
+// DocumentContext parse = JsonPath.parse(common_device_tag);
+//
+//
+//
+// ArrayList<Object> read = parse.read(dataCenterExpr);
+// if (read.size() >= 1) {
+// data_center = read.get(0).toString();
+//
+// }
+//
+// read = parse.read(dataGroupExpr);
+// if (read.size() >= 1) {
+// device_group = read.get(0).toString();
+// }
+// }
+//
+// i++;
+// }
+
+        double end = System.currentTimeMillis(); // end timestamp
+
+        double time = end - begin; // elapsed time in milliseconds
+
+        System.out.println(time / 1000 + " seconds");
+
+// //fastjson2
+// while (i < 600000) {
+//
+// String dataCenterExpr = "$.tags[?(@.tag=='data_center')][0].value";
+// String dataGroupExpr = "$.tags[?(@.tag=='device_group')][0].value";
+//
+//
+// if (common_device_tag != null) {
+// ArrayList<Object> read = JsonPath.parse(common_device_tag).read(dataCenterExpr);
+// if (read.size() >= 1) {
+// data_center = read.get(0).toString();
+//
+// }
+//
+// read = JsonPath.parse(common_device_tag).read(dataGroupExpr);
+// if (read.size() >= 1) {
+// device_group = read.get(0).toString();
+// }
+// }
+// i++;
+// }
+
+ }
+}
diff --git a/src/test/java/com/zdjizhi/tools/TestMap.java b/src/test/java/com/zdjizhi/tools/TestMap.java
new file mode 100644
index 0000000..5f4a831
--- /dev/null
+++ b/src/test/java/com/zdjizhi/tools/TestMap.java
@@ -0,0 +1,22 @@
+package com.zdjizhi.tools;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class TestMap {
+
+ public static void main(String[] args) {
+ Map<String, Object> tagsMap = new HashMap<String, Object>();
+ tagsMap.put("session",1L);
+
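+        // "tags" was never inserted, so remove() returns null and tagsMap is left unchanged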
+ Object tags = tagsMap.remove("tags");
+ System.out.println(tags);
+ System.out.println(tagsMap);
+
+ }
+}
+