| | | |
|---|---|---|
| author | wangchengcheng <[email protected]> | 2023-04-25 17:41:15 +0800 |
| committer | wangchengcheng <[email protected]> | 2023-04-25 17:41:15 +0800 |
| commit | 2398dc61bbff63ca7611a0f1cfc3d5f0c063f37c (patch) | |
| tree | 77a350adf5139bf988b4e2bad9fa90eca5f6fa80 | |
first commit
24 files changed, 2321 insertions, 0 deletions
@@ -0,0 +1,231 @@ +<?xml version="1.0" encoding="UTF-8"?> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <groupId>com.zdjizhi</groupId> + <artifactId>app-protocol-stat-traffic-agent</artifactId> + <version>20230425-test</version> + + <name>app-protocol-stat-traffic-agent</name> + <url>http://www.example.com</url> + + <repositories> + <repository> + <id>nexus</id> + <name>Team Nexus Repository</name> + <url>http://192.168.40.125:8099/content/groups/public</url> + </repository> + + <repository> + <id>maven-ali</id> + <url>http://maven.aliyun.com/nexus/content/groups/public/</url> + <releases> + <enabled>true</enabled> + </releases> + <snapshots> + <enabled>true</enabled> + <checksumPolicy>fail</checksumPolicy> + </snapshots> + </repository> + </repositories> + + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <flink.version>1.13.1</flink.version> + <hadoop.version>2.7.1</hadoop.version> + <kafka.version>1.0.0</kafka.version> + <hbase.version>2.2.3</hbase.version> + <nacos.version>1.2.0</nacos.version> + <zdjz.tools.version>1.0.8</zdjz.tools.version> + <scope.type>provided</scope.type> +<!-- <scope.type>compile</scope.type>--> + </properties> + + <build> + <plugins> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <version>2.4.2</version> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <transformers> + <transformer + implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> + <mainClass>com.zdjizhi.topology.LogFlowWriteTopology</mainClass> + </transformer> + </transformers> + </configuration> + </execution> + </executions> + </plugin> + + <plugin> + <groupId>io.github.zlika</groupId> + <artifactId>reproducible-build-maven-plugin</artifactId> + <version>0.2</version> + <executions> + <execution> + <goals> + <goal>strip-jar</goal> + </goals> + <phase>package</phase> + </execution> + </executions> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <version>2.3.2</version> + <configuration> + <source>1.8</source> + <target>1.8</target> + </configuration> + </plugin> + </plugins> + <resources> + <resource> + <directory>properties</directory> + <includes> + <include>**/*.properties</include> + <include>**/*.xml</include> + </includes> + <filtering>false</filtering> + </resource> + + <resource> + <directory>src\main\java</directory> + <includes> + <include>log4j.properties</include> + </includes> + <filtering>false</filtering> + </resource> + </resources> + </build> + + <dependencies> + + <dependency> + <groupId>com.zdjizhi</groupId> + <artifactId>galaxy</artifactId> + <version>${zdjz.tools.version}</version> + <exclusions> + <exclusion> + <artifactId>slf4j-log4j12</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + <exclusion> + <artifactId>log4j-over-slf4j</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + </exclusions> + </dependency> + + <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core --> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-core</artifactId> + <version>${flink.version}</version> + <scope>${scope.type}</scope> + </dependency> + + + <!-- 
https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java --> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_2.12</artifactId> + <version>${flink.version}</version> + <scope>${scope.type}</scope> + </dependency> + + <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients --> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-clients_2.12</artifactId> + <version>${flink.version}</version> + <scope>${scope.type}</scope> + </dependency> + + <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka --> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-kafka_2.12</artifactId> + <version>${flink.version}</version> + <!--<scope>${scope.type}</scope>--> + </dependency> + + <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java --> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-java</artifactId> + <version>${flink.version}</version> + <scope>${scope.type}</scope> + </dependency> + + + + <dependency> + <groupId>org.junit.jupiter</groupId> + <artifactId>junit-jupiter-api</artifactId> + <version>5.3.2</version> + <scope>compile</scope> + </dependency> + + + <dependency> + <groupId>com.jayway.jsonpath</groupId> + <artifactId>json-path</artifactId> + <version>2.4.0</version> + </dependency> + + <dependency> + <groupId>cn.hutool</groupId> + <artifactId>hutool-all</artifactId> + <version>5.7.17</version> + </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.12</version> + <scope>test</scope> + </dependency> + + <!-- https://mvnrepository.com/artifact/com.alibaba.nacos/nacos-client --> + <dependency> + <groupId>com.alibaba.nacos</groupId> + <artifactId>nacos-client</artifactId> + <version>${nacos.version}</version> + </dependency> + + <!-- https://mvnrepository.com/artifact/org.jasypt/jasypt --> + <dependency> + <groupId>org.jasypt</groupId> + <artifactId>jasypt</artifactId> + <version>1.9.3</version> + </dependency> + + <dependency> + <groupId>org.apache.datasketches</groupId> + <artifactId>datasketches-java</artifactId> + <version>3.2.0</version> + </dependency> + + <dependency> + <groupId>com.alibaba</groupId> + <artifactId>fastjson</artifactId> + <version>2.0.26</version> + </dependency> + + </dependencies> +</project> + diff --git a/properties/default_config.properties b/properties/default_config.properties new file mode 100644 index 0000000..976e210 --- /dev/null +++ b/properties/default_config.properties @@ -0,0 +1,45 @@ +#====================Kafka KafkaConsumer====================# +#kafka source connection timeout +session.timeout.ms=60000 + +#kafka source poll +max.poll.records=5000 + +#kafka source poll bytes +max.partition.fetch.bytes=31457280 +#====================Kafka KafkaProducer====================# +#number of producer retries +retries=0 + +#maximum time a batch may wait after creation before it is sent, whether or not it is full +linger.ms=10 + +#if no response is received before this timeout, the client resends the request when necessary +request.timeout.ms=30000 + +#the producer sends records in batches; batch size, default: 16384 +batch.size=262144 + +#size of the buffer the producer uses to cache messages +#128M +buffer.memory=134217728 + +#maximum size of a single request sent to the Kafka server +#default: 10485760 = 10M +max.request.size=10485760 +#====================kafka default====================# +#kafka SASL/SSL username (encryption) +kafka.user=nsyGpHKGFA4KW0zro9MDdw== + +#kafka SASL/SSL pin (encryption) +kafka.pin=6MleDyA3Z73HSaXiKsDJ2k7Ys8YWLhEJ + +#producer ack +producer.ack=1 + +#maximum time between two outputs (in milliseconds) +buffer.timeout=100 + + + +random.range.num=20
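The `kafka.user` and `kafka.pin` values above are stored jasypt-encrypted and are decrypted at startup by FlowWriteConfig (further down in this commit) with a `StandardPBEStringEncryptor` keyed on "galaxy". As a minimal sketch of how such values could be produced with the same encryptor (the plaintext credential below is a placeholder, not the real one):

```java
import org.jasypt.encryption.pbe.StandardPBEStringEncryptor;

public class EncryptConfigValue {
    public static void main(String[] args) {
        // Same PBE setup that FlowWriteConfig uses on the decrypt side.
        StandardPBEStringEncryptor encryptor = new StandardPBEStringEncryptor();
        encryptor.setPassword("galaxy");

        // "demo-user" is a placeholder credential; PBE output varies per run
        // because of the random salt, but decrypts with the same password.
        String cipherText = encryptor.encrypt("demo-user");
        System.out.println(cipherText);

        // Round trip, mirroring what FlowWriteConfig does at startup.
        System.out.println(encryptor.decrypt(cipherText));
    }
}
```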
diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties new file mode 100644 index 0000000..3208551 --- /dev/null +++ b/properties/service_flow_config.properties @@ -0,0 +1,58 @@ +#--------------------------------Address configuration------------------------------# + +#management kafka address +source.kafka.servers=192.168.44.12:9094 + +#management output kafka address +sink.kafka.servers=192.168.44.12:9094 + +tools.library=D:\\workerspace\\dat\\ +#--------------------------------Kafka consumer configuration (session)------------------------------# + +#closed sessions topic +session.source.kafka.topic=SESSION-RECORD + +#group id that stores this consumer's offsets for the topic; it can be named after the topology, and the stored offset position determines where the next read resumes without re-consuming data +session.group.id=session-record-app-protocol-stat-traffic-agent-2 + +#consumer parallelism +session.source.parallelism=1 + + +#--------------------------------Kafka consumer configuration (interim session)------------------------------# + +#interim sessions topic +interim.session.source.kafka.topic=SESSION-RECORD + + +#group id that stores this consumer's offsets for the topic; it can be named after the topology, and the stored offset position determines where the next read resumes without re-consuming data +interim.session.group.id=interim-session-record-app-protocol-stat-traffic-agent-1 + +#consumer parallelism +interim.session.source.parallelism=1 + + +#--------------------------------Window configuration------------------------------# +#aggregation window size, in seconds +count.window.seconds=1 + +#aggregation window parallelism +count.window.parallelism=1 + +#--------------------------------KafkaSink configuration------------------------------# +#output kafka topic name +sink.kafka.topic=test-1 + + + + + + +#output kafka compression type +producer.kafka.compression.type=snappy + +#output kafka sink parallelism +sink.kafka.parallelism=1 + +#--------------------------------Metrics configuration------------------------------# +metrics.name=traffic_application_protocol_stat diff --git a/src/main/java/com/zdjizhi/common/Fileds.java b/src/main/java/com/zdjizhi/common/Fileds.java new file mode 100644 index 0000000..ed290d5 --- /dev/null +++ b/src/main/java/com/zdjizhi/common/Fileds.java @@ -0,0 +1,235 @@ +package com.zdjizhi.common; + + + +public class Fileds { + private long sessions; + private long in_bytes; + private long out_bytes; + private long in_pkts; + private long out_pkts; + private long c2s_pkts; + private long s2c_pkts; + private long c2s_bytes; + private long s2c_bytes; + private long c2s_fragments; + private long s2c_fragments; + private long c2s_tcp_lost_bytes; + private long s2c_tcp_lost_bytes; + private long c2s_tcp_ooorder_pkts; + private long s2c_tcp_ooorder_pkts; + private long c2s_tcp_retransmitted_pkts; + private long s2c_tcp_retransmitted_pkts; + private long c2s_tcp_retransmitted_bytes; + private long s2c_tcp_retransmitted_bytes; + private String client_ip_sketch; + + public Fileds(long sessions, long in_bytes, long out_bytes, long in_pkts, long out_pkts, long c2s_pkts, long s2c_pkts, long c2s_bytes, long s2c_bytes, long c2s_fragments, long s2c_fragments, long c2s_tcp_lost_bytes, long s2c_tcp_lost_bytes, long c2s_tcp_ooorder_pkts, long s2c_tcp_ooorder_pkts, long c2s_tcp_retransmitted_pkts, long s2c_tcp_retransmitted_pkts, long c2s_tcp_retransmitted_bytes, long s2c_tcp_retransmitted_bytes, String client_ip_sketch) { + this.sessions = sessions; + this.in_bytes = in_bytes; + this.out_bytes = out_bytes; + this.in_pkts = in_pkts; + this.out_pkts = out_pkts; + this.c2s_pkts = c2s_pkts; + this.s2c_pkts = s2c_pkts; + this.c2s_bytes = c2s_bytes; + this.s2c_bytes = s2c_bytes; + this.c2s_fragments = c2s_fragments; + this.s2c_fragments = s2c_fragments; + this.c2s_tcp_lost_bytes = c2s_tcp_lost_bytes; + this.s2c_tcp_lost_bytes = s2c_tcp_lost_bytes; + this.c2s_tcp_ooorder_pkts = c2s_tcp_ooorder_pkts; + 
this.s2c_tcp_ooorder_pkts = s2c_tcp_ooorder_pkts; + this.c2s_tcp_retransmitted_pkts = c2s_tcp_retransmitted_pkts; + this.s2c_tcp_retransmitted_pkts = s2c_tcp_retransmitted_pkts; + this.c2s_tcp_retransmitted_bytes = c2s_tcp_retransmitted_bytes; + this.s2c_tcp_retransmitted_bytes = s2c_tcp_retransmitted_bytes; + this.client_ip_sketch = client_ip_sketch; + } + + public long getSessions() { + return sessions; + } + + public void setSessions(long sessions) { + this.sessions = sessions; + } + + public long getIn_bytes() { + return in_bytes; + } + + public void setIn_bytes(long in_bytes) { + this.in_bytes = in_bytes; + } + + public long getOut_bytes() { + return out_bytes; + } + + public void setOut_bytes(long out_bytes) { + this.out_bytes = out_bytes; + } + + public long getIn_pkts() { + return in_pkts; + } + + public void setIn_pkts(long in_pkts) { + this.in_pkts = in_pkts; + } + + public long getOut_pkts() { + return out_pkts; + } + + public void setOut_pkts(long out_pkts) { + this.out_pkts = out_pkts; + } + + public long getC2s_pkts() { + return c2s_pkts; + } + + public void setC2s_pkts(long c2s_pkts) { + this.c2s_pkts = c2s_pkts; + } + + public long getS2c_pkts() { + return s2c_pkts; + } + + public void setS2c_pkts(long s2c_pkts) { + this.s2c_pkts = s2c_pkts; + } + + public long getC2s_bytes() { + return c2s_bytes; + } + + public void setC2s_bytes(long c2s_bytes) { + this.c2s_bytes = c2s_bytes; + } + + public long getS2c_bytes() { + return s2c_bytes; + } + + public void setS2c_bytes(long s2c_bytes) { + this.s2c_bytes = s2c_bytes; + } + + public long getC2s_fragments() { + return c2s_fragments; + } + + public void setC2s_fragments(long c2s_fragments) { + this.c2s_fragments = c2s_fragments; + } + + public long getS2c_fragments() { + return s2c_fragments; + } + + public void setS2c_fragments(long s2c_fragments) { + this.s2c_fragments = s2c_fragments; + } + + public long getC2s_tcp_lost_bytes() { + return c2s_tcp_lost_bytes; + } + + public void setC2s_tcp_lost_bytes(long c2s_tcp_lost_bytes) { + this.c2s_tcp_lost_bytes = c2s_tcp_lost_bytes; + } + + public long getS2c_tcp_lost_bytes() { + return s2c_tcp_lost_bytes; + } + + public void setS2c_tcp_lost_bytes(long s2c_tcp_lost_bytes) { + this.s2c_tcp_lost_bytes = s2c_tcp_lost_bytes; + } + + public long getC2s_tcp_ooorder_pkts() { + return c2s_tcp_ooorder_pkts; + } + + public void setC2s_tcp_ooorder_pkts(long c2s_tcp_ooorder_pkts) { + this.c2s_tcp_ooorder_pkts = c2s_tcp_ooorder_pkts; + } + + public long getS2c_tcp_ooorder_pkts() { + return s2c_tcp_ooorder_pkts; + } + + public void setS2c_tcp_ooorder_pkts(long s2c_tcp_ooorder_pkts) { + this.s2c_tcp_ooorder_pkts = s2c_tcp_ooorder_pkts; + } + + public long getC2s_tcp_retransmitted_pkts() { + return c2s_tcp_retransmitted_pkts; + } + + public void setC2s_tcp_retransmitted_pkts(long c2s_tcp_retransmitted_pkts) { + this.c2s_tcp_retransmitted_pkts = c2s_tcp_retransmitted_pkts; + } + + public long getS2c_tcp_retransmitted_pkts() { + return s2c_tcp_retransmitted_pkts; + } + + public void setS2c_tcp_retransmitted_pkts(long s2c_tcp_retransmitted_pkts) { + this.s2c_tcp_retransmitted_pkts = s2c_tcp_retransmitted_pkts; + } + + public long getC2s_tcp_retransmitted_bytes() { + return c2s_tcp_retransmitted_bytes; + } + + public void setC2s_tcp_retransmitted_bytes(long c2s_tcp_retransmitted_bytes) { + this.c2s_tcp_retransmitted_bytes = c2s_tcp_retransmitted_bytes; + } + + public long getS2c_tcp_retransmitted_bytes() { + return s2c_tcp_retransmitted_bytes; + } + + public void setS2c_tcp_retransmitted_bytes(long 
s2c_tcp_retransmitted_bytes) { + this.s2c_tcp_retransmitted_bytes = s2c_tcp_retransmitted_bytes; + } + + public String getClient_ip_sketch() { + return client_ip_sketch; + } + + public void setClient_ip_sketch(String client_ip_sketch) { + this.client_ip_sketch = client_ip_sketch; + } + + @Override + public String toString() { + return "Fileds{" + + "sessions=" + sessions + + ", in_bytes=" + in_bytes + + ", out_bytes=" + out_bytes + + ", in_pkts=" + in_pkts + + ", out_pkts=" + out_pkts + + ", c2s_pkts=" + c2s_pkts + + ", s2c_pkts=" + s2c_pkts + + ", c2s_bytes=" + c2s_bytes + + ", s2c_bytes=" + s2c_bytes + + ", c2s_fragments=" + c2s_fragments + + ", s2c_fragments=" + s2c_fragments + + ", c2s_tcp_lost_bytes=" + c2s_tcp_lost_bytes + + ", s2c_tcp_lost_bytes=" + s2c_tcp_lost_bytes + + ", c2s_tcp_ooorder_pkts=" + c2s_tcp_ooorder_pkts + + ", s2c_tcp_ooorder_pkts=" + s2c_tcp_ooorder_pkts + + ", c2s_tcp_retransmitted_pkts=" + c2s_tcp_retransmitted_pkts + + ", s2c_tcp_retransmitted_pkts=" + s2c_tcp_retransmitted_pkts + + ", c2s_tcp_retransmitted_bytes=" + c2s_tcp_retransmitted_bytes + + ", s2c_tcp_retransmitted_bytes=" + s2c_tcp_retransmitted_bytes + + ", client_ip_sketch='" + client_ip_sketch + '\'' + + '}'; + } +} diff --git a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java new file mode 100644 index 0000000..5889110 --- /dev/null +++ b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java @@ -0,0 +1,100 @@ +package com.zdjizhi.common; + + +import com.zdjizhi.tools.system.FlowWriteConfigurations; +import org.jasypt.encryption.pbe.StandardPBEStringEncryptor; + +/** + * @author Administrator + */ +public class FlowWriteConfig { + + private static StandardPBEStringEncryptor encryptor = new StandardPBEStringEncryptor(); + + static { + encryptor.setPassword("galaxy"); + } + /** + * kafka common + */ + public static final String KAFKA_SASL_JAAS_USER = encryptor.decrypt(FlowWriteConfigurations.getStringProperty(1, "kafka.user")); + public static final String KAFKA_SASL_JAAS_PIN = encryptor.decrypt(FlowWriteConfigurations.getStringProperty(1, "kafka.pin")); + + + /** + * consumer session-record config + */ + public static final String SESSION_SOURCE_KAFKA_TOPIC = FlowWriteConfigurations.getStringProperty(0, "session.source.kafka.topic"); + public static final String SESSION_GROUP_ID = FlowWriteConfigurations.getStringProperty(0, "session.group.id"); + public static final Integer SESSION_SOURCE_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "session.source.parallelism"); + + + /** + * consumer interim-session-record config + */ + public static final String INTERIM_SESSION_SOURCE_KAFKA_TOPIC = FlowWriteConfigurations.getStringProperty(0, "interim.session.source.kafka.topic"); + public static final String INTERIM_SESSION_GROUP_ID = FlowWriteConfigurations.getStringProperty(0, "interim.session.group.id"); + public static final Integer INTERIM_SESSION_SOURCE_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "interim.session.source.parallelism"); + + + + + + + + /** + * kafka source config + */ + public static final String SESSION_TIMEOUT_MS = FlowWriteConfigurations.getStringProperty(1, "session.timeout.ms"); + public static final String MAX_POLL_RECORDS = FlowWriteConfigurations.getStringProperty(1, "max.poll.records"); + public static final String MAX_PARTITION_FETCH_BYTES = FlowWriteConfigurations.getStringProperty(1, "max.partition.fetch.bytes"); + + /** + * kafka sink config + */ + + public static final String PRODUCER_ACK = 
FlowWriteConfigurations.getStringProperty(1, "producer.ack"); + public static final String PRODUCER_KAFKA_COMPRESSION_TYPE = FlowWriteConfigurations.getStringProperty(0, "producer.kafka.compression.type"); + public static final String SINK_KAFKA_TOPIC = FlowWriteConfigurations.getStringProperty(0,"sink.kafka.topic"); + public static final Integer SINK_KAFKA_PARALLELISM = FlowWriteConfigurations.getIntProperty(0,"sink.kafka.parallelism"); + + + + /** + * connection kafka + */ + public static final String RETRIES = FlowWriteConfigurations.getStringProperty(1, "retries"); + public static final String LINGER_MS = FlowWriteConfigurations.getStringProperty(1, "linger.ms"); + public static final Integer REQUEST_TIMEOUT_MS = FlowWriteConfigurations.getIntProperty(1, "request.timeout.ms"); + public static final Integer BATCH_SIZE = FlowWriteConfigurations.getIntProperty(1, "batch.size"); + public static final Integer BUFFER_MEMORY = FlowWriteConfigurations.getIntProperty(1, "buffer.memory"); + public static final Integer MAX_REQUEST_SIZE = FlowWriteConfigurations.getIntProperty(1, "max.request.size"); + + /** + * common config + */ + public static final String SOURCE_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0, "source.kafka.servers"); + public static final String SINK_KAFKA_SERVERS = FlowWriteConfigurations.getStringProperty(0, "sink.kafka.servers"); + public static final String TOOLS_LIBRARY = FlowWriteConfigurations.getStringProperty(0, "tools.library"); + + + /** + * window config + */ + public static final Integer COUNT_WINDOW_SECONDS = FlowWriteConfigurations.getIntProperty(0, "count.window.seconds"); + public static final Integer COUNT_WINDOW_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "count.window.parallelism"); + + /** + * metrics config + */ + + public static final String METRICS_NAME = FlowWriteConfigurations.getStringProperty(0, "metrics.name"); + + public static final Integer RANDOM_RANGE_NUM = FlowWriteConfigurations.getIntProperty(1, "random.range.num"); + + + + + public static final Integer BUFFER_TIMEOUT = FlowWriteConfigurations.getIntProperty(1, "buffer.timeout"); + +}
\ No newline at end of file diff --git a/src/main/java/com/zdjizhi/common/InterimSessionRecord.java b/src/main/java/com/zdjizhi/common/InterimSessionRecord.java new file mode 100644 index 0000000..efbcea4 --- /dev/null +++ b/src/main/java/com/zdjizhi/common/InterimSessionRecord.java @@ -0,0 +1,192 @@ +package com.zdjizhi.common; + +import com.alibaba.fastjson2.JSON; +import com.jayway.jsonpath.JsonPath; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ThreadLocalRandom; + +public class InterimSessionRecord { + private long common_vsys_id; + private String common_device_id; + private String common_device_tag; + private String common_protocol_label; + private String common_app_full_path; + + private String device_group; + private String data_center; + + private long common_sessions; + private long common_flags; + private String common_l7_protocol; + + + private long common_c2s_pkt_diff; + private long common_s2c_pkt_diff; + private long common_c2s_byte_diff; + private long common_s2c_byte_diff; + private String common_client_ip; + + public String getCommon_l7_protocol() { + return common_l7_protocol; + } + + public void setCommon_l7_protocol(String common_l7_protocol) { + this.common_l7_protocol = common_l7_protocol; + } + + public long getCommon_vsys_id() { + return common_vsys_id; + } + + public void setCommon_vsys_id(long common_vsys_id) { + this.common_vsys_id = common_vsys_id; + } + + public String getCommon_device_id() { + return common_device_id; + } + + public void setCommon_device_id(String common_device_id) { + this.common_device_id = common_device_id; + } + + public String getCommon_device_tag() { + return common_device_tag; + } + + public void setCommon_device_tag(String common_device_tag) { + this.common_device_tag = common_device_tag; + } + + public String getCommon_protocol_label() { + return common_protocol_label; + } + + public void setCommon_protocol_label(String common_protocol_label) { + this.common_protocol_label = common_protocol_label; + } + + public String getCommon_app_full_path() { + return common_app_full_path; + } + + public void setCommon_app_full_path(String common_app_full_path) { + this.common_app_full_path = common_app_full_path; + } + + public String getDevice_group() { + return device_group; + } + + public void setDevice_group(String device_group) { + this.device_group = device_group; + } + + public String getData_center() { + return data_center; + } + + public void setData_center(String data_center) { + this.data_center = data_center; + } + + public long getCommon_sessions() { + return common_sessions; + } + + public void setCommon_sessions(long common_sessions) { + this.common_sessions = common_sessions; + } + + public long getCommon_flags() { + return common_flags; + } + + public void setCommon_flags(long common_flags) { + this.common_flags = common_flags; + } + + public long getCommon_c2s_pkt_diff() { + return common_c2s_pkt_diff; + } + + public void setCommon_c2s_pkt_diff(long common_c2s_pkt_diff) { + this.common_c2s_pkt_diff = common_c2s_pkt_diff; + } + + public long getCommon_s2c_pkt_diff() { + return common_s2c_pkt_diff; + } + + public void setCommon_s2c_pkt_diff(long common_s2c_pkt_diff) { + this.common_s2c_pkt_diff = common_s2c_pkt_diff; + } + + public long getCommon_c2s_byte_diff() { + return common_c2s_byte_diff; + } + + public void setCommon_c2s_byte_diff(long common_c2s_byte_diff) { + this.common_c2s_byte_diff = common_c2s_byte_diff; + } + + public long getCommon_s2c_byte_diff() { + 
return common_s2c_byte_diff; + } + + public void setCommon_s2c_byte_diff(long common_s2c_byte_diff) { + this.common_s2c_byte_diff = common_s2c_byte_diff; + } + + public String getCommon_client_ip() { + return common_client_ip; + } + + public void setCommon_client_ip(String common_client_ip) { + this.common_client_ip = common_client_ip; + } + + public String getInterimSessionTags() { + Map<String, Object> tagsMap = new HashMap<String, Object>(); + + if (common_vsys_id == 0) { + common_vsys_id = 1; + } + tagsMap.put("vsys_id", common_vsys_id); + tagsMap.put("device_id", common_device_id); + tagsMap.put("common_device_tag", common_device_tag); + tagsMap.put("protocol_label", common_protocol_label); + tagsMap.put("app_full_path", common_protocol_label + "." + common_l7_protocol); + tagsMap.put("randomNum",ThreadLocalRandom.current().nextInt(FlowWriteConfig.RANDOM_RANGE_NUM)); + + return JSON.toJSONString(tagsMap); + } + + + public Fileds getInterimSessionFileds() { + long out_bytes; + long in_bytes; + long in_pkts; + long out_pkts; + Long clientIsLocal = 8L; + if ((common_flags & clientIsLocal) == 8L) { + in_bytes = common_s2c_byte_diff; + out_bytes = common_c2s_byte_diff; + in_pkts = common_s2c_pkt_diff; + out_pkts = common_c2s_pkt_diff; + } else { + in_bytes = common_c2s_byte_diff; + out_bytes = common_s2c_byte_diff; + in_pkts = common_c2s_pkt_diff; + out_pkts = common_s2c_pkt_diff; + } + return new Fileds(common_sessions, in_bytes, out_bytes, in_pkts, out_pkts, common_c2s_pkt_diff, common_s2c_pkt_diff, common_c2s_byte_diff, common_s2c_byte_diff, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, ""); + } + + +} + diff --git a/src/main/java/com/zdjizhi/common/Metrics.java b/src/main/java/com/zdjizhi/common/Metrics.java new file mode 100644 index 0000000..17c9a61 --- /dev/null +++ b/src/main/java/com/zdjizhi/common/Metrics.java @@ -0,0 +1,45 @@ +package com.zdjizhi.common; + +import java.util.Map; + +public class Metrics { + private final static String name = FlowWriteConfig.METRICS_NAME; + private Map<String, Object> tags; + private Fileds fields; + private long timestamp; + + public Metrics( Map<String, Object> tags, Fileds fields, long timestamp) { + this.tags = tags; + this.fields = fields; + this.timestamp = timestamp; + } + + public String getName() { + return name; + } + + public Map<String, Object> getTags() { + return tags; + } + + public void setTags( Map<String, Object> tags) { + this.tags = tags; + } + + public Fileds getFields() { + return fields; + } + + public void setFields(Fileds fields) { + this.fields = fields; + } + + public long getTimestamp() { + return timestamp; + } + + public void setTimestamp(long timestamp) { + this.timestamp = timestamp; + } + +} diff --git a/src/main/java/com/zdjizhi/common/SessionRecord.java b/src/main/java/com/zdjizhi/common/SessionRecord.java new file mode 100644 index 0000000..6fa9a52 --- /dev/null +++ b/src/main/java/com/zdjizhi/common/SessionRecord.java @@ -0,0 +1,316 @@ +package com.zdjizhi.common; + +import com.alibaba.fastjson2.JSON; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ThreadLocalRandom; + +public class SessionRecord { + private long common_vsys_id; + private String common_device_id; + private String common_device_tag; + private String common_protocol_label; + private String common_app_full_path; + + private String device_group; + private String data_center; + + private long common_sessions; + private long common_flags; + private String common_l7_protocol; + + + private long common_c2s_pkt_diff; + private long 
common_s2c_pkt_diff; + private long common_c2s_byte_diff; + private long common_s2c_byte_diff; + private long common_c2s_pkt_num; + private long common_s2c_pkt_num; + private long common_c2s_byte_num; + private long common_s2c_byte_num; + private long common_c2s_ipfrag_num; + private long common_s2c_ipfrag_num; + private long common_c2s_tcp_lostlen; + private long common_s2c_tcp_lostlen; + private long common_c2s_tcp_unorder_num; + private long common_s2c_tcp_unorder_num; + private long common_c2s_pkt_retrans; + private long common_s2c_pkt_retrans; + private long common_c2s_byte_retrans; + private long common_s2c_byte_retrans; + private String common_client_ip; + + public long getCommon_vsys_id() { + return common_vsys_id; + } + + public void setCommon_vsys_id(long common_vsys_id) { + + this.common_vsys_id = common_vsys_id; + } + + public String getCommon_device_id() { + return common_device_id; + } + + public void setCommon_device_id(String common_device_id) { + this.common_device_id = common_device_id; + } + + public String getCommon_device_tag() { + return common_device_tag; + } + + public void setCommon_device_tag(String common_device_tag) { + this.common_device_tag = common_device_tag; + } + + public String getCommon_protocol_label() { + return common_protocol_label; + } + + public void setCommon_protocol_label(String common_protocol_label) { + this.common_protocol_label = common_protocol_label; + } + + public String getCommon_app_full_path() { + return common_app_full_path; + } + + public void setCommon_app_full_path(String common_app_full_path) { + this.common_app_full_path = common_app_full_path; + } + + public long getCommon_sessions() { + return common_sessions; + } + + public void setCommon_sessions(long common_sessions) { + this.common_sessions = common_sessions; + } + + public long getCommon_flags() { + return common_flags; + } + + public void setCommon_flags(long common_flags) { + this.common_flags = common_flags; + } + + public long getCommon_c2s_pkt_num() { + return common_c2s_pkt_num; + } + + public void setCommon_c2s_pkt_num(long common_c2s_pkt_num) { + this.common_c2s_pkt_num = common_c2s_pkt_num; + } + + public long getCommon_s2c_pkt_num() { + return common_s2c_pkt_num; + } + + public void setCommon_s2c_pkt_num(long common_s2c_pkt_num) { + this.common_s2c_pkt_num = common_s2c_pkt_num; + } + + public long getCommon_c2s_byte_num() { + return common_c2s_byte_num; + } + + public void setCommon_c2s_byte_num(long common_c2s_byte_num) { + this.common_c2s_byte_num = common_c2s_byte_num; + } + + public long getCommon_s2c_byte_num() { + return common_s2c_byte_num; + } + + public void setCommon_s2c_byte_num(long common_s2c_byte_num) { + this.common_s2c_byte_num = common_s2c_byte_num; + } + + public long getCommon_c2s_ipfrag_num() { + return common_c2s_ipfrag_num; + } + + public void setCommon_c2s_ipfrag_num(long common_c2s_ipfrag_num) { + this.common_c2s_ipfrag_num = common_c2s_ipfrag_num; + } + + public long getCommon_s2c_ipfrag_num() { + return common_s2c_ipfrag_num; + } + + public void setCommon_s2c_ipfrag_num(long common_s2c_ipfrag_num) { + this.common_s2c_ipfrag_num = common_s2c_ipfrag_num; + } + + public long getCommon_c2s_tcp_lostlen() { + return common_c2s_tcp_lostlen; + } + + public void setCommon_c2s_tcp_lostlen(long common_c2s_tcp_lostlen) { + this.common_c2s_tcp_lostlen = common_c2s_tcp_lostlen; + } + + public long getCommon_s2c_tcp_lostlen() { + return common_s2c_tcp_lostlen; + } + + public void setCommon_s2c_tcp_lostlen(long common_s2c_tcp_lostlen) { + 
this.common_s2c_tcp_lostlen = common_s2c_tcp_lostlen; + } + + public long getCommon_c2s_tcp_unorder_num() { + return common_c2s_tcp_unorder_num; + } + + public void setCommon_c2s_tcp_unorder_num(long common_c2s_tcp_unorder_num) { + this.common_c2s_tcp_unorder_num = common_c2s_tcp_unorder_num; + } + + public long getCommon_s2c_tcp_unorder_num() { + return common_s2c_tcp_unorder_num; + } + + public void setCommon_s2c_tcp_unorder_num(long common_s2c_tcp_unorder_num) { + this.common_s2c_tcp_unorder_num = common_s2c_tcp_unorder_num; + } + + public long getCommon_c2s_pkt_retrans() { + return common_c2s_pkt_retrans; + } + + public void setCommon_c2s_pkt_retrans(long common_c2s_pkt_retrans) { + this.common_c2s_pkt_retrans = common_c2s_pkt_retrans; + } + + public long getCommon_s2c_pkt_retrans() { + return common_s2c_pkt_retrans; + } + + public void setCommon_s2c_pkt_retrans(long common_s2c_pkt_retrans) { + this.common_s2c_pkt_retrans = common_s2c_pkt_retrans; + } + + public long getCommon_c2s_byte_retrans() { + return common_c2s_byte_retrans; + } + + public void setCommon_c2s_byte_retrans(long common_c2s_byte_retrans) { + this.common_c2s_byte_retrans = common_c2s_byte_retrans; + } + + public long getCommon_s2c_byte_retrans() { + return common_s2c_byte_retrans; + } + + public void setCommon_s2c_byte_retrans(long common_s2c_byte_retrans) { + this.common_s2c_byte_retrans = common_s2c_byte_retrans; + } + + public String getCommon_client_ip() { + return common_client_ip; + } + + public void setCommon_client_ip(String common_client_ip) { + this.common_client_ip = common_client_ip; + } + + public String getDevice_group() { + return device_group; + } + + public void setDevice_group(String device_group) { + this.device_group = device_group; + } + + public String getData_center() { + return data_center; + } + + public void setData_center(String data_center) { + this.data_center = data_center; + } + + public long getCommon_c2s_pkt_diff() { + return common_c2s_pkt_diff; + } + + public void setCommon_c2s_pkt_diff(long common_c2s_pkt_diff) { + this.common_c2s_pkt_diff = common_c2s_pkt_diff; + } + + public long getCommon_s2c_pkt_diff() { + return common_s2c_pkt_diff; + } + + public void setCommon_s2c_pkt_diff(long common_s2c_pkt_diff) { + this.common_s2c_pkt_diff = common_s2c_pkt_diff; + } + + public long getCommon_c2s_byte_diff() { + return common_c2s_byte_diff; + } + + public void setCommon_c2s_byte_diff(long common_c2s_byte_diff) { + this.common_c2s_byte_diff = common_c2s_byte_diff; + } + + public long getCommon_s2c_byte_diff() { + return common_s2c_byte_diff; + } + + public void setCommon_s2c_byte_diff(long common_s2c_byte_diff) { + this.common_s2c_byte_diff = common_s2c_byte_diff; + } + + public String getCommon_l7_protocol() { + return common_l7_protocol; + } + + public void setCommon_l7_protocol(String common_l7_protocol) { + this.common_l7_protocol = common_l7_protocol; + } + + public String getSessionTags() { + Map<String, Object> tagsMap = new HashMap<String, Object>(); + + if (common_vsys_id == 0) { + common_vsys_id = 1; + } + tagsMap.put("vsys_id", common_vsys_id); + tagsMap.put("device_id", common_device_id); + tagsMap.put("common_device_tag", common_device_tag); + tagsMap.put("protocol_label", common_protocol_label); + tagsMap.put("app_full_path", common_protocol_label + "." 
+ common_l7_protocol); + tagsMap.put("randomNum",ThreadLocalRandom.current().nextInt(FlowWriteConfig.RANDOM_RANGE_NUM)); + + return JSON.toJSONString(tagsMap); + } + + + public Fileds getSessionFileds() { + long out_bytes; + long in_bytes; + long in_pkts; + long out_pkts; + Long clientIsLocal = 8L; + if ((common_flags & clientIsLocal) == 8L) { + in_bytes = common_s2c_byte_diff; + out_bytes = common_c2s_byte_diff; + in_pkts = common_s2c_pkt_diff; + out_pkts = common_c2s_pkt_diff; + } else { + in_bytes = common_c2s_byte_diff; + out_bytes = common_s2c_byte_diff; + in_pkts = common_c2s_pkt_diff; + out_pkts = common_s2c_pkt_diff; + } + return new Fileds(common_sessions, in_bytes, out_bytes, in_pkts, out_pkts, common_c2s_pkt_diff, common_s2c_pkt_diff, common_c2s_byte_diff, common_s2c_byte_diff, common_c2s_ipfrag_num, common_s2c_ipfrag_num, common_c2s_tcp_lostlen, common_s2c_tcp_lostlen, common_c2s_tcp_unorder_num, common_s2c_tcp_unorder_num, + common_c2s_pkt_retrans, common_s2c_pkt_retrans, common_c2s_byte_retrans, common_s2c_byte_retrans, ""); + } + +} + diff --git a/src/main/java/com/zdjizhi/common/Tags.java b/src/main/java/com/zdjizhi/common/Tags.java new file mode 100644 index 0000000..0a6d40c --- /dev/null +++ b/src/main/java/com/zdjizhi/common/Tags.java @@ -0,0 +1,68 @@ +package com.zdjizhi.common; + +public class Tags { + + private long vsys_id; + private String device_id; + private String device_group; + private String data_center; + private String protocol_label; + private String app_full_path; + + public Tags(long vsys_id, String device_id, String device_group, String data_center, String protocol_label, String app_full_path) { + this.vsys_id = vsys_id; + this.device_id = device_id; + this.device_group = device_group; + this.data_center = data_center; + this.protocol_label = protocol_label; + this.app_full_path = app_full_path; + } + + public long getVsys_id() { + return vsys_id; + } + + public void setVsys_id(long vsys_id) { + this.vsys_id = vsys_id; + } + + public String getDevice_id() { + return device_id; + } + + public void setDevice_id(String device_id) { + this.device_id = device_id; + } + + public String getDevice_group() { + return device_group; + } + + public void setDevice_group(String device_group) { + this.device_group = device_group; + } + + public String getData_center() { + return data_center; + } + + public void setData_center(String data_center) { + this.data_center = data_center; + } + + public String getProtocol_label() { + return protocol_label; + } + + public void setProtocol_label(String protocol_label) { + this.protocol_label = protocol_label; + } + + public String getApp_full_path() { + return app_full_path; + } + + public void setApp_full_path(String app_full_path) { + this.app_full_path = app_full_path; + } +} diff --git a/src/main/java/com/zdjizhi/tools/function/CountWindowProcess.java b/src/main/java/com/zdjizhi/tools/function/CountWindowProcess.java new file mode 100644 index 0000000..92f0b57 --- /dev/null +++ b/src/main/java/com/zdjizhi/tools/function/CountWindowProcess.java @@ -0,0 +1,147 @@ +package com.zdjizhi.tools.function; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.alibaba.fastjson2.JSON; +import com.jayway.jsonpath.DocumentContext; +import com.jayway.jsonpath.JsonPath; +import com.zdjizhi.common.Fileds; +import com.zdjizhi.common.Metrics; + +import com.zdjizhi.utils.StringUtil; +import org.apache.datasketches.hll.HllSketch; +import org.apache.flink.api.java.tuple.Tuple3; +import 
org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.util.Collector; + + +import java.util.ArrayList; +import java.util.Base64; +import java.util.HashMap; +import java.util.Map; + +public class CountWindowProcess extends ProcessWindowFunction<Tuple3<String, Fileds, String>, String, String, TimeWindow> { + private static final Log logger = LogFactory.get(); + + private long sessions; + private long in_bytes; + private long out_bytes; + private long in_pkts; + private long out_pkts; + private long c2s_pkts; + private long s2c_pkts; + private long c2s_bytes; + private long s2c_bytes; + private long c2s_fragments; + private long s2c_fragments; + private long c2s_tcp_lost_bytes; + private long s2c_tcp_lost_bytes; + private long c2s_tcp_ooorder_pkts; + private long s2c_tcp_ooorder_pkts; + private long c2s_tcp_retransmitted_pkts; + private long s2c_tcp_retransmitted_pkts; + private long c2s_tcp_retransmitted_bytes; + private long s2c_tcp_retransmitted_bytes; + private String device_group; + private String data_center; + private ArrayList<Object> read; + private String common_device_tag; + + + private String dataCenterExpr = "$.tags[?(@.tag=='data_center')].value"; + private String dataGroupExpr = "$.tags[?(@.tag=='device_group')].value"; + + private String client_ip_sketch; + private Map<String, Object> cacheMap = new HashMap<>(20); + + @Override + public void process(String key, Context context, Iterable<Tuple3<String, Fileds, String>> iterable, Collector<String> collector) throws Exception { + if (StringUtil.isNotBlank(key)) { + try { + HllSketch hllSketch = new HllSketch(12); + //accumulate every metric field across the window and feed each client IP into the HLL sketch + for (Tuple3<String, Fileds, String> record : iterable) { + sessions = sessions + record.f1.getSessions(); + + in_bytes = in_bytes + record.f1.getIn_bytes(); + out_bytes = out_bytes + record.f1.getOut_bytes(); + + in_pkts = in_pkts + record.f1.getIn_pkts(); + out_pkts = out_pkts + record.f1.getOut_pkts(); + + c2s_pkts = c2s_pkts + record.f1.getC2s_pkts(); + s2c_pkts = s2c_pkts + record.f1.getS2c_pkts(); + + c2s_bytes = c2s_bytes + record.f1.getC2s_bytes(); + s2c_bytes = s2c_bytes + record.f1.getS2c_bytes(); + + c2s_fragments = c2s_fragments + record.f1.getC2s_fragments(); + s2c_fragments = s2c_fragments + record.f1.getS2c_fragments(); + + c2s_tcp_lost_bytes = c2s_tcp_lost_bytes + record.f1.getC2s_tcp_lost_bytes(); + s2c_tcp_lost_bytes = s2c_tcp_lost_bytes + record.f1.getS2c_tcp_lost_bytes(); + + c2s_tcp_ooorder_pkts = c2s_tcp_ooorder_pkts + record.f1.getC2s_tcp_ooorder_pkts(); + s2c_tcp_ooorder_pkts = s2c_tcp_ooorder_pkts + record.f1.getS2c_tcp_ooorder_pkts(); + + c2s_tcp_retransmitted_pkts = c2s_tcp_retransmitted_pkts + record.f1.getC2s_tcp_retransmitted_pkts(); + s2c_tcp_retransmitted_pkts = s2c_tcp_retransmitted_pkts + record.f1.getS2c_tcp_retransmitted_pkts(); + + c2s_tcp_retransmitted_bytes = c2s_tcp_retransmitted_bytes + record.f1.getC2s_tcp_retransmitted_bytes(); + s2c_tcp_retransmitted_bytes = s2c_tcp_retransmitted_bytes + record.f1.getS2c_tcp_retransmitted_bytes(); + + hllSketch.update(record.f2); + } + + //serialize the per-window HLL sketch so downstream consumers can merge it + client_ip_sketch = Base64.getEncoder().encodeToString(hllSketch.toUpdatableByteArray()); + Fileds fileds = new Fileds(sessions, in_bytes, out_bytes, in_pkts, out_pkts, c2s_pkts, s2c_pkts, c2s_bytes, s2c_bytes, c2s_fragments, s2c_fragments, c2s_tcp_lost_bytes, s2c_tcp_lost_bytes, c2s_tcp_ooorder_pkts, s2c_tcp_ooorder_pkts, c2s_tcp_retransmitted_pkts, s2c_tcp_retransmitted_pkts, 
c2s_tcp_retransmitted_bytes, s2c_tcp_retransmitted_bytes, client_ip_sketch); + + + + //parse device_tag to obtain data_center and device_group + Map keyMap = JSON.parseObject(key, Map.class); + common_device_tag = (String) keyMap.remove("common_device_tag"); + + if (common_device_tag != null) { + DocumentContext parse = JsonPath.parse(common_device_tag); + read = parse.read(dataCenterExpr); + if (read.size() >= 1) { + data_center = read.get(0).toString(); + } + read = parse.read(dataGroupExpr); + if (read.size() >= 1) { + device_group = read.get(0).toString(); + } + } + + keyMap.put("device_group", device_group); + keyMap.put("data_center", data_center); + keyMap.remove("randomNum"); + + collector.collect(JSON.toJSONString(new Metrics(keyMap, fileds, context.window().getEnd() / 1000))); + //reset the accumulators for the next window + sessions = 0; + in_bytes = 0; + out_bytes = 0; + in_pkts = 0; + out_pkts = 0; + c2s_pkts = 0; + s2c_pkts = 0; + c2s_bytes = 0; + s2c_bytes = 0; + c2s_fragments = 0; + s2c_fragments = 0; + c2s_tcp_lost_bytes = 0; + s2c_tcp_lost_bytes = 0; + c2s_tcp_ooorder_pkts = 0; + s2c_tcp_ooorder_pkts = 0; + c2s_tcp_retransmitted_pkts = 0; + s2c_tcp_retransmitted_pkts = 0; + c2s_tcp_retransmitted_bytes = 0; + s2c_tcp_retransmitted_bytes = 0; + + } catch (Exception e) { + logger.error("window count error, message: " + e); + } + } + } +} diff --git a/src/main/java/com/zdjizhi/tools/function/KeyByFunction.java b/src/main/java/com/zdjizhi/tools/function/KeyByFunction.java new file mode 100644 index 0000000..49e7a7c --- /dev/null +++ b/src/main/java/com/zdjizhi/tools/function/KeyByFunction.java @@ -0,0 +1,13 @@ +package com.zdjizhi.tools.function; + +import com.zdjizhi.common.Fileds; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple3; + +public class KeyByFunction implements KeySelector<Tuple3<String, Fileds, String>, String> { + @Override + public String getKey(Tuple3<String, Fileds, String> value) throws Exception { + + return value.f0; + } +} diff --git a/src/main/java/com/zdjizhi/tools/function/ParseInterimSessionFunction.java b/src/main/java/com/zdjizhi/tools/function/ParseInterimSessionFunction.java new file mode 100644 index 0000000..8d273b0 --- /dev/null +++ b/src/main/java/com/zdjizhi/tools/function/ParseInterimSessionFunction.java @@ -0,0 +1,50 @@ +package com.zdjizhi.tools.function; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.alibaba.fastjson2.JSON; +import com.zdjizhi.common.Fileds; +import com.zdjizhi.common.InterimSessionRecord; +import com.zdjizhi.common.SessionRecord; +import com.zdjizhi.common.Tags; +import com.zdjizhi.utils.StringUtil; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.tuple.Tuple3; + +import java.util.Map; + +/** + * @author wangchengcheng + * @Package com.zdjizhi.utils.functions + * @Description: + * @date 2023/04/20 + */ +public class ParseInterimSessionFunction implements MapFunction<String, Tuple3<String, Fileds, String>> { + private static final Log logger = LogFactory.get(); + private InterimSessionRecord interimsessionRecord; + private String tags=""; + private Fileds sessionFileds; + + @Override + public Tuple3<String, Fileds, String> map(String message) throws Exception { + + try { + if (StringUtil.isNotBlank(message)) { + interimsessionRecord = JSON.parseObject(message, InterimSessionRecord.class); + //keep only records whose common_protocol_label is not blank + if (StringUtil.isNotBlank(interimsessionRecord.getCommon_protocol_label())) { + tags = interimsessionRecord.getInterimSessionTags(); + 
sessionFileds = interimsessionRecord.getInterimSessionFileds(); + return new Tuple3<>(tags, sessionFileds, interimsessionRecord.getCommon_client_ip()); + }else { + return new Tuple3<>("", sessionFileds, ""); + } + }else { + return new Tuple3<>("", sessionFileds, ""); + } + } catch (RuntimeException e) { + logger.error("An error occurred in interim-session-record parsing, error message is: " + e + ", the original log is: " + message); + return new Tuple3<>("", sessionFileds, ""); + } + } +} diff --git a/src/main/java/com/zdjizhi/tools/function/ParseSessionFunction.java b/src/main/java/com/zdjizhi/tools/function/ParseSessionFunction.java new file mode 100644 index 0000000..01b0826 --- /dev/null +++ b/src/main/java/com/zdjizhi/tools/function/ParseSessionFunction.java @@ -0,0 +1,51 @@ +package com.zdjizhi.tools.function; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.alibaba.fastjson2.JSON; +import com.zdjizhi.common.Fileds; +import com.zdjizhi.common.SessionRecord; +import com.zdjizhi.common.Tags; +import com.zdjizhi.utils.StringUtil; +import org.apache.flink.api.common.functions.MapFunction; + +import org.apache.flink.api.java.tuple.Tuple3; + +import java.util.Map; + +/** + * @author wangchengcheng + * @Package com.zdjizhi.utils.functions + * @Description: + * @date 2023/04/20 + */ +public class ParseSessionFunction implements MapFunction<String, Tuple3<String, Fileds, String>> { + private static final Log logger = LogFactory.get(); + private SessionRecord sessionRecord; + private String tags; + private Fileds sessionFileds; + + @Override + public Tuple3<String, Fileds, String> map(String message) { + + try { + if (StringUtil.isNotBlank(message)) { + sessionRecord = JSON.parseObject(message, SessionRecord.class); + //keep only records whose common_protocol_label is not blank + if (StringUtil.isNotBlank(sessionRecord.getCommon_protocol_label())) { + tags = sessionRecord.getSessionTags(); + sessionFileds = sessionRecord.getSessionFileds(); + return new Tuple3<>(tags, sessionFileds, sessionRecord.getCommon_client_ip()); + }else { + return new Tuple3<>("", sessionFileds, ""); + } + }else { + return new Tuple3<>("", sessionFileds, ""); + } + + } catch (Exception e) { + logger.error("An error occurred in session-record parsing, error message is: " + e + ", the original log is: " + message); + return new Tuple3<>("", sessionFileds, ""); + } + } +} diff --git a/src/main/java/com/zdjizhi/tools/kafka/CertUtils.java b/src/main/java/com/zdjizhi/tools/kafka/CertUtils.java new file mode 100644 index 0000000..ad93f29 --- /dev/null +++ b/src/main/java/com/zdjizhi/tools/kafka/CertUtils.java @@ -0,0 +1,48 @@ +package com.zdjizhi.tools.kafka; + +import com.zdjizhi.common.FlowWriteConfig; +import org.apache.kafka.common.config.SslConfigs; + +import java.util.Properties; + +/** + * @author qidaijie + * @Package com.zdjizhi.utils.kafka + * @Description: + * @date 2021/9/6 10:37 + */ +class CertUtils { + /** + * Kafka SASL authentication port + */ + private static final String SASL_PORT = "9094"; + + /** + * Kafka SSL authentication port + */ + private static final String SSL_PORT = "9095"; + + /** + * Choose the authentication mode based on the port in the connection string. + * + * @param servers kafka connection string + * @param properties kafka connection properties + */ + static void chooseCert(String servers, Properties properties) { + if (servers.contains(SASL_PORT)) { + properties.put("security.protocol", "SASL_PLAINTEXT"); + properties.put("sasl.mechanism", "PLAIN"); + properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=" + + FlowWriteConfig.KAFKA_SASL_JAAS_USER + " 
password=" + FlowWriteConfig.KAFKA_SASL_JAAS_PIN + ";"); + } else if (servers.contains(SSL_PORT)) { + properties.put("security.protocol", "SSL"); + properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ""); + properties.put("ssl.keystore.location", FlowWriteConfig.TOOLS_LIBRARY + "keystore.jks"); + properties.put("ssl.keystore.password", FlowWriteConfig.KAFKA_SASL_JAAS_PIN); + properties.put("ssl.truststore.location", FlowWriteConfig.TOOLS_LIBRARY + "truststore.jks"); + properties.put("ssl.truststore.password", FlowWriteConfig.KAFKA_SASL_JAAS_PIN); + properties.put("ssl.key.password", FlowWriteConfig.KAFKA_SASL_JAAS_PIN); + } + + } +} diff --git a/src/main/java/com/zdjizhi/tools/kafka/KafkaConsumer.java b/src/main/java/com/zdjizhi/tools/kafka/KafkaConsumer.java new file mode 100644 index 0000000..d0fa01f --- /dev/null +++ b/src/main/java/com/zdjizhi/tools/kafka/KafkaConsumer.java @@ -0,0 +1,49 @@ +package com.zdjizhi.tools.kafka; + +import com.zdjizhi.common.FlowWriteConfig; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; + + + +import java.util.Properties; + +/** + * @author wangchengcheng + * @Package com.zdjizhi.utils.kafka + * @Description: + * @date 2023/04/19 + */ +public class KafkaConsumer { + + + //消费session-record配置 + private static Properties createConsumerConfig(String groupId) { + Properties properties = new Properties(); + properties.put("bootstrap.servers", FlowWriteConfig.SOURCE_KAFKA_SERVERS); + properties.put("group.id", groupId); + properties.put("session.timeout.ms", FlowWriteConfig.SESSION_TIMEOUT_MS); + properties.put("max.poll.records", FlowWriteConfig.MAX_POLL_RECORDS); + properties.put("max.partition.fetch.bytes", FlowWriteConfig.MAX_PARTITION_FETCH_BYTES); + properties.put("partition.discovery.interval.ms", "10000"); + CertUtils.chooseCert(FlowWriteConfig.SOURCE_KAFKA_SERVERS, properties); + + return properties; + } + + + public static FlinkKafkaConsumer<String> getKafkaConsumer(String topic,String groupId) { + FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(topic, + new SimpleStringSchema(), createConsumerConfig(groupId)); + + //随着checkpoint提交,将offset提交到kafka + kafkaConsumer.setCommitOffsetsOnCheckpoints(true); + + //从消费组当前的offset开始消费 + kafkaConsumer.setStartFromGroupOffsets(); + + return kafkaConsumer; + } + + +} diff --git a/src/main/java/com/zdjizhi/tools/kafka/KafkaProducer.java b/src/main/java/com/zdjizhi/tools/kafka/KafkaProducer.java new file mode 100644 index 0000000..ee31bc2 --- /dev/null +++ b/src/main/java/com/zdjizhi/tools/kafka/KafkaProducer.java @@ -0,0 +1,50 @@ +package com.zdjizhi.tools.kafka; + +import com.zdjizhi.common.FlowWriteConfig; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; + +import java.util.Optional; +import java.util.Properties; + +/** + * @author qidaijie + * @Package com.zdjizhi.utils.kafka + * @Description: + * @date 2021/6/814:04 + */ +public class KafkaProducer { + + private static Properties createProducerConfig() { + Properties properties = new Properties(); + properties.put("bootstrap.servers", FlowWriteConfig.SINK_KAFKA_SERVERS); + properties.put("acks", FlowWriteConfig.PRODUCER_ACK); + properties.put("retries", FlowWriteConfig.RETRIES); + properties.put("linger.ms", FlowWriteConfig.LINGER_MS); + properties.put("request.timeout.ms", FlowWriteConfig.REQUEST_TIMEOUT_MS); + properties.put("batch.size", 
diff --git a/src/main/java/com/zdjizhi/tools/kafka/KafkaProducer.java b/src/main/java/com/zdjizhi/tools/kafka/KafkaProducer.java new file mode 100644 index 0000000..ee31bc2 --- /dev/null +++ b/src/main/java/com/zdjizhi/tools/kafka/KafkaProducer.java @@ -0,0 +1,50 @@ +package com.zdjizhi.tools.kafka; + +import com.zdjizhi.common.FlowWriteConfig; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; + +import java.util.Optional; +import java.util.Properties; + +/** + * @author qidaijie + * @Package com.zdjizhi.utils.kafka + * @Description: + * @date 2021/6/8 14:04 + */ +public class KafkaProducer { + + private static Properties createProducerConfig() { + Properties properties = new Properties(); + properties.put("bootstrap.servers", FlowWriteConfig.SINK_KAFKA_SERVERS); + properties.put("acks", FlowWriteConfig.PRODUCER_ACK); + properties.put("retries", FlowWriteConfig.RETRIES); + properties.put("linger.ms", FlowWriteConfig.LINGER_MS); + properties.put("request.timeout.ms", FlowWriteConfig.REQUEST_TIMEOUT_MS); + properties.put("batch.size", FlowWriteConfig.BATCH_SIZE); + properties.put("buffer.memory", FlowWriteConfig.BUFFER_MEMORY); + properties.put("max.request.size", FlowWriteConfig.MAX_REQUEST_SIZE); + properties.put("compression.type", FlowWriteConfig.PRODUCER_KAFKA_COMPRESSION_TYPE); + + CertUtils.chooseCert(FlowWriteConfig.SINK_KAFKA_SERVERS, properties); + + return properties; + } + + + public static FlinkKafkaProducer<String> getKafkaProducer() { + FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<String>( + FlowWriteConfig.SINK_KAFKA_TOPIC, + new SimpleStringSchema(), + createProducerConfig(), + //the sink connects to all partitions and writes to them round-robin; + Optional.empty()); + + //let the producer log failures instead of catching and throwing them + kafkaProducer.setLogFailuresOnly(true); + + return kafkaProducer; + } + +} diff --git a/src/main/java/com/zdjizhi/tools/system/FlowWriteConfigurations.java b/src/main/java/com/zdjizhi/tools/system/FlowWriteConfigurations.java new file mode 100644 index 0000000..d429def --- /dev/null +++ b/src/main/java/com/zdjizhi/tools/system/FlowWriteConfigurations.java @@ -0,0 +1,69 @@ +package com.zdjizhi.tools.system; + +import com.zdjizhi.utils.StringUtil; + +import java.io.IOException; +import java.util.Locale; +import java.util.Properties; + + +/** + * @author Administrator + */ + +public final class FlowWriteConfigurations { + + private static Properties propDefault = new Properties(); + private static Properties propService = new Properties(); + + + public static String getStringProperty(Integer type, String key) { + if (type == 0) { + return propService.getProperty(key).trim(); + } else if (type == 1) { + return propDefault.getProperty(key).trim(); + } else { + return null; + } + } + + public static Integer getIntProperty(Integer type, String key) { + if (type == 0) { + return Integer.parseInt(propService.getProperty(key).trim()); + } else if (type == 1) { + return Integer.parseInt(propDefault.getProperty(key).trim()); + } else { + return null; + } + } + + public static Long getLongProperty(Integer type, String key) { + if (type == 0) { + return Long.parseLong(propService.getProperty(key).trim()); + } else if (type == 1) { + return Long.parseLong(propDefault.getProperty(key).trim()); + } else { + return null; + } + } + + public static Boolean getBooleanProperty(Integer type, String key) { + if (type == 0) { + return StringUtil.equals(propService.getProperty(key).trim().toLowerCase(Locale.ENGLISH), "true"); + } else if (type == 1) { + return StringUtil.equals(propDefault.getProperty(key).trim().toLowerCase(Locale.ENGLISH), "true"); + } else { + return null; + } + } + + static { + try { + propService.load(FlowWriteConfigurations.class.getClassLoader().getResourceAsStream("service_flow_config.properties")); + propDefault.load(FlowWriteConfigurations.class.getClassLoader().getResourceAsStream("default_config.properties")); + } catch (IOException | RuntimeException e) { + //if either file fails to load, both references are nulled and any later lookup will throw + propDefault = null; + propService = null; + } + } +}
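FlowWriteConfigurations resolves keys against two property files by the integer `type` flag: 0 reads service_flow_config.properties, 1 reads default_config.properties. A small usage sketch (hypothetical, mirroring how FlowWriteConfig's constants are built):

```java
import com.zdjizhi.tools.system.FlowWriteConfigurations;

public class ConfigLookupDemo {
    public static void main(String[] args) {
        // type 0 -> service_flow_config.properties
        String sinkTopic = FlowWriteConfigurations.getStringProperty(0, "sink.kafka.topic");
        // type 1 -> default_config.properties
        Integer lingerMs = FlowWriteConfigurations.getIntProperty(1, "linger.ms");
        System.out.println(sinkTopic + " / " + lingerMs);
    }
}
```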
diff --git a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java new file mode 100644 index 0000000..170755f --- /dev/null +++ b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java @@ -0,0 +1,75 @@ +package com.zdjizhi.topology; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.zdjizhi.common.Fileds; +import com.zdjizhi.common.FlowWriteConfig; +import com.zdjizhi.tools.function.CountWindowProcess; +import com.zdjizhi.tools.function.KeyByFunction; +import com.zdjizhi.tools.function.ParseInterimSessionFunction; +import com.zdjizhi.tools.function.ParseSessionFunction; +import com.zdjizhi.tools.kafka.KafkaConsumer; + +import com.zdjizhi.tools.kafka.KafkaProducer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.datastream.WindowedStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; + + + + +/** + * @author wangchengcheng + * @Package com.zdjizhi.topology + * @Description: + * @date 2023/04/19 + */ +public class LogFlowWriteTopology { + private static final Log logger = LogFactory.get(); + + public static void main(String[] args) { + final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); + environment.setBufferTimeout(FlowWriteConfig.BUFFER_TIMEOUT); + + //consume and parse the session logs + SingleOutputStreamOperator<String> sessionSource = environment.addSource(KafkaConsumer.getKafkaConsumer(FlowWriteConfig.SESSION_SOURCE_KAFKA_TOPIC,FlowWriteConfig.SESSION_GROUP_ID)) + .setParallelism(FlowWriteConfig.SESSION_SOURCE_PARALLELISM).name(FlowWriteConfig.SESSION_SOURCE_KAFKA_TOPIC); + + SingleOutputStreamOperator<Tuple3<String, Fileds, String>> sessionParseMap = sessionSource.map(new ParseSessionFunction()) + .setParallelism(FlowWriteConfig.SESSION_SOURCE_PARALLELISM).name("sessionParseMap"); + + + //consume and parse the interim sessions + SingleOutputStreamOperator<String> interimSessionSource = environment.addSource(KafkaConsumer.getKafkaConsumer(FlowWriteConfig.INTERIM_SESSION_SOURCE_KAFKA_TOPIC,FlowWriteConfig.INTERIM_SESSION_GROUP_ID)) + .setParallelism(FlowWriteConfig.INTERIM_SESSION_SOURCE_PARALLELISM).name(FlowWriteConfig.INTERIM_SESSION_SOURCE_KAFKA_TOPIC); + + SingleOutputStreamOperator<Tuple3<String, Fileds, String>> interimSessionParseMap = interimSessionSource.map(new ParseInterimSessionFunction()) + .setParallelism(FlowWriteConfig.INTERIM_SESSION_SOURCE_PARALLELISM).name("interimSessionParseMap"); + + + + //union the session and interim-session streams, key them by tag, and feed them into a tumbling window + WindowedStream<Tuple3<String, Fileds, String>, String, TimeWindow> window = sessionParseMap.union(interimSessionParseMap).keyBy(new KeyByFunction()) + .window(TumblingProcessingTimeWindows.of(Time.seconds(FlowWriteConfig.COUNT_WINDOW_SECONDS))); + + SingleOutputStreamOperator<String> countWindow = window.process(new CountWindowProcess()).setParallelism(FlowWriteConfig.COUNT_WINDOW_PARALLELISM).name("countWindow"); + + countWindow.addSink(KafkaProducer.getKafkaProducer()).name("LogSinkKafka") + .setParallelism(FlowWriteConfig.SINK_KAFKA_PARALLELISM).name(FlowWriteConfig.SINK_KAFKA_TOPIC); + + try { + environment.execute(args[0]); + } catch (Exception e) { + logger.error("This Flink task failed to start! Exception information is: " + e); + e.printStackTrace(); + } + + } + + +}
diff --git a/src/main/log4j.properties b/src/main/log4j.properties
new file mode 100644
index 0000000..facffc7
--- /dev/null
+++ b/src/main/log4j.properties
@@ -0,0 +1,25 @@
+#Log4j
+log4j.rootLogger=info,console,file
+# Console appender settings
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.Threshold=info
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] [Thread\:%t] %l %x - <%m>%n
+
+# File appender settings
+log4j.appender.file=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.file.Threshold=info
+log4j.appender.file.encoding=UTF-8
+log4j.appender.file.Append=true
+# Use a relative path, and verify that output lands under the application directory
+log4j.appender.file.file=${nis.root}/log/galaxy-name.log
+log4j.appender.file.DatePattern='.'yyyy-MM-dd
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+#log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss} %X{ip} [%t] %5p %c{1} %m%n
+log4j.appender.file.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] %X{ip} [Thread\:%t] %l %x - %m%n
+# MyBatis: com.nis.web.dao is the package holding the MyBatis mapper interfaces
+log4j.logger.com.nis.web.dao=info
+# BoneCP data source logging
+log4j.category.com.jolbox=info,console
+
+
diff --git a/src/main/logback.xml b/src/main/logback.xml
new file mode 100644
index 0000000..59095f6
--- /dev/null
+++ b/src/main/logback.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configuration>
+
+    <!-- Output pattern: %date is the timestamp, %thread the thread name, %-5level the level padded to 5 characters, %msg the message, %n a newline -->
+    <property name="LOG_PATTERN" value="%date{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n" />
+    <!-- Log file path; do not use a relative path here -->
+    <property name="LOG_FILE_PATH" value="E:/logs/demo.%d{yyyy-MM-dd}.%i.log" />
+
+    <!-- Console appender -->
+    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <!-- Print using the LOG_PATTERN defined above -->
+            <pattern>${LOG_PATTERN}</pattern>
+        </encoder>
+    </appender>
+
+    <!-- Roll to a new file each day; keep 30 days of history -->
+    <appender name="FILE"
+              class="ch.qos.logback.core.rolling.RollingFileAppender">
+        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+            <fileNamePattern>${LOG_FILE_PATH}</fileNamePattern>
+            <!-- keep 30 days' worth of history -->
+            <maxHistory>30</maxHistory>
+            <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
+                <!-- Maximum size per log file -->
+                <maxFileSize>20MB</maxFileSize>
+            </timeBasedFileNamingAndTriggeringPolicy>
+        </rollingPolicy>
+
+        <encoder>
+            <pattern>${LOG_PATTERN}</pattern>
+        </encoder>
+    </appender>
+    <!-- Default level for the project's own loggers -->
+    <logger name="com.example.demo" level="INFO" />
+
+    <!-- Root level; standard levels from high to low: ERROR, WARN, INFO, DEBUG -->
+    <root level="INFO">
+        <appender-ref ref="CONSOLE" />
+        <appender-ref ref="FILE" /><!-- refers to appender name="FILE" -->
+    </root>
+</configuration>
\ No newline at end of file
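The log4j.properties file pattern above interpolates %X{ip} from the MDC, which renders empty unless a caller sets that key. A sketch of how a caller would fill it (log4j 1.x API; the IP value is a placeholder):

    import org.apache.log4j.Logger;
    import org.apache.log4j.MDC;

    public class MdcSketch {
        private static final Logger LOG = Logger.getLogger(MdcSketch.class);

        public static void main(String[] args) {
            MDC.put("ip", "192.168.40.81");  // value rendered by %X{ip} in the layout
            LOG.info("flow written");
            MDC.remove("ip");                // avoid leaking the value into unrelated log lines
        }
    }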
diff --git a/src/test/java/com/zdjizhi/tools/DatasketchesTest.java b/src/test/java/com/zdjizhi/tools/DatasketchesTest.java
new file mode 100644
index 0000000..644c916
--- /dev/null
+++ b/src/test/java/com/zdjizhi/tools/DatasketchesTest.java
@@ -0,0 +1,253 @@
+package com.zdjizhi.tools;
+
+import cn.hutool.json.JSONUtil;
+import com.alibaba.fastjson2.*;
+import com.zdjizhi.utils.JsonMapper;
+import org.apache.datasketches.hll.HllSketch;
+import org.apache.datasketches.hll.Union;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.Test;
+
+import java.util.*;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi
+ * @Description:
+ * @date 2023/3/21 7:17
+ */
+public class DatasketchesTest {
+
+    @Test
+    public void HllSketchTest() {
+        HashSet<String> strings = new HashSet<>();
+
+        HllSketch sketch = new HllSketch(12);
+
+        for (int i = 0; i < 50; i++) {
+            String ip = "192.168.1." + i;
+            sketch.update(ip);
+            strings.add(ip);
+        }
+
+        System.out.println(sketch.getEstimate() + "--" + strings.size());
+
+        HashSet<String> randomStrings = new HashSet<>();
+
+        HllSketch randomSketch = new HllSketch(12);
+
+        for (int i = 0; i < 50; i++) {
+            String ip = makeIPv4Random();
+            randomSketch.update(ip);
+            randomStrings.add(ip);
+        }
+
+        System.out.println(randomSketch.getEstimate() + "--" + randomStrings.size());
+    }
+
+    @Test
+    public void HllSketchUnionTest() {
+        HashSet<String> strings = new HashSet<>();
+
+        HllSketch sketch = new HllSketch(12);
+
+        for (int i = 0; i < 50; i++) {
+            String ip = "192.168.1." + i;
+            sketch.update(ip);
+            strings.add(ip);
+        }
+
+        HllSketch sketch2 = new HllSketch(12);
+
+        for (int i = 0; i < 10; i++) {
+            String ip = "192.168.2." + i;
+            sketch2.update(ip);
+            strings.add(ip);
+        }
+
+        Union union = new Union(12);
+
+        union.update(sketch);
+        union.update(sketch2);
+        HllSketch sketch_result = HllSketch.heapify(union.getResult().toCompactByteArray());
+
+        System.out.println(sketch.getEstimate() + "--" + strings.size());
+        System.out.println(sketch2.getEstimate() + "--" + strings.size());
+        System.out.println(sketch_result.getEstimate() + "--" + strings.size());
+    }
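For reading the estimates in these tests: an HLL sketch with 2^lgK buckets has a relative standard error of roughly 1.04 / sqrt(2^lgK) (the standard HyperLogLog bound, not a library call), so the lgK = 12 used here gives about ±1.6%:

    int lgK = 12;
    double rse = 1.04 / Math.sqrt(1 << lgK);  // 1.04 / 64 ≈ 0.01625, i.e. ~1.6%
    System.out.printf("expected relative standard error at lgK=%d: %.4f%n", lgK, rse);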
+    @Test
+    public void HllSketchDruidTest() {
+        HashMap<String, Object> dataMap = new HashMap<>();
+
+        HashSet<String> strings = new HashSet<>();
+
+        HllSketch sketch = new HllSketch(12);
+
+        for (int i = 0; i < 50; i++) {
+            String ip = "192.168.1." + i;
+            sketch.update(ip);
+            strings.add(ip);
+        }
+
+        HllSketch sketch2 = new HllSketch(12);
+
+        for (int i = 0; i < 10; i++) {
+            String ip = "192.168.2." + i;
+            sketch2.update(ip);
+            strings.add(ip);
+        }
+
+        Union union = new Union(12);
+
+        union.update(sketch);
+        union.update(sketch2);
+        HllSketch sketch_result1 = HllSketch.heapify(union.getResult().toCompactByteArray());
+
+        HllSketch sketch3 = new HllSketch(12);
+
+        for (int i = 0; i < 10; i++) {
+            String ip = "192.168.3." + i;
+            sketch3.update(ip);
+            strings.add(ip);
+        }
+
+        Union union2 = new Union(12);
+
+        union2.update(sketch_result1);
+        union2.update(sketch3);
+        HllSketch sketch_result2 = HllSketch.heapify(union2.getResult().toCompactByteArray());
+
+        System.out.println(sketch.getEstimate() + "--" + strings.size());
+        System.out.println(sketch2.getEstimate() + "--" + strings.size());
+        System.out.println(sketch3.getEstimate() + "--" + strings.size());
+        System.out.println(sketch_result1.getEstimate() + "--" + strings.size());
+        System.out.println(sketch_result2.getEstimate() + "--" + strings.size());
+
+        Result result = new Result();
+        result.setC2s_pkt_num(10);
+        result.setS2c_pkt_num(10);
+        result.setC2s_byte_num(10);
+        result.setS2c_byte_num(10);
+        result.setStat_time(1679970031);
+        result.setSchema_type("HLLSketchMergeTest");
+
+        // Compact byte array
+        result.setIp_object(sketch_result2.toCompactByteArray());
+//        System.out.println(result.toString());
+        //sendMessage(JsonMapper.toJsonString(result));
+
+        // Updatable byte array
+        result.setIp_object(sketch_result2.toUpdatableByteArray());
+//        System.out.println(result.toString());
+        //sendMessage(JsonMapper.toJsonString(result));
+
+        // HashMap payload
+        dataMap.put("app_name", "TEST");
+        dataMap.put("protocol_stack_id", "HTTP");
+        dataMap.put("vsys_id", 1);
+        dataMap.put("stat_time", 1681370100);
+        dataMap.put("client_ip_sketch", sketch_result2.toUpdatableByteArray());
+
+        System.out.println("Jackson:" + JsonMapper.toJsonString(dataMap));
+        System.out.println("FastJson2:" + JSONObject.toJSONString(dataMap));
+        System.out.println("Hutool:" + JSONUtil.toJsonStr(dataMap) + "\n\n");
+
+        byte[] toJSONB = JSONB.toBytes(dataMap);
+//        sendMessage(toJSONB);
+        JSONObject jsonObject = JSONB.parseObject(toJSONB);
+        System.out.println("FastJson2 Byte(JSONB):" + jsonObject.toJSONString() + "\n\n");
+
+        dataMap.put("client_ip_sketch", Base64.getEncoder().encodeToString(sketch_result2.toUpdatableByteArray()));
+        System.out.println("FastJson2 Byte(Base64):" + JSONObject.toJSONString(dataMap));
+        System.out.println("Hutool Byte(Base64):" + JSONUtil.toJsonStr(dataMap));
+
+//        sendMessage(JSONObject.toJSONString(dataMap));
+    }
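HllSketchDruidTest serializes the merged sketch as a Base64 string so the bytes survive JSON transport. The consumer-side inverse is not part of this commit, but would look roughly like this (base64SketchString stands in for the field read from the message):

    // Hypothetical consumer side: recover the sketch from the Base64 payload.
    byte[] raw = Base64.getDecoder().decode(base64SketchString);
    HllSketch restored = HllSketch.heapify(raw);
    System.out.println("restored estimate: " + restored.getEstimate());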
+    // Generate a random IPv4 address
+    private static String makeIPv4Random() {
+        Random random = new Random();
+        int v4_1 = random.nextInt(255) + 1;
+        int v4_2 = random.nextInt(255);
+        int v4_3 = random.nextInt(255);
+        int v4_4 = random.nextInt(255);
+        return v4_1 + "." + v4_2 + "." + v4_3 + "." + v4_4;
+    }
+
+    private static void sendMessage(Object message) {
+        Properties props = new Properties();
+        // Kafka broker address
+        props.put("bootstrap.servers", "192.168.44.12:9092");
+        props.put("acks", "all");
+        props.put("retries", 0);
+        props.put("linger.ms", 1);
+        props.put("buffer.memory", 67108864);
+        // NOTE: with ByteArraySerializer the payload must be a byte[]; use the commented
+        // StringSerializer lines instead when sending String messages
+//        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
+//        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
+        KafkaProducer<String, Object> kafkaProducer = new KafkaProducer<String, Object>(props);
+
+        kafkaProducer.send(new ProducerRecord<String, Object>("TRAFFIC-PROTOCOL-TEST", message));
+
+        kafkaProducer.close();
+    }
+}
+
+class Result {
+
+    private String schema_type;
+    private long c2s_byte_num;
+    private long c2s_pkt_num;
+    private long s2c_byte_num;
+    private long s2c_pkt_num;
+    private long stat_time;
+    private byte[] ip_object;
+
+    public void setSchema_type(String schema_type) {
+        this.schema_type = schema_type;
+    }
+
+    public void setC2s_byte_num(long c2s_byte_num) {
+        this.c2s_byte_num = c2s_byte_num;
+    }
+
+    public void setC2s_pkt_num(long c2s_pkt_num) {
+        this.c2s_pkt_num = c2s_pkt_num;
+    }
+
+    public void setS2c_byte_num(long s2c_byte_num) {
+        this.s2c_byte_num = s2c_byte_num;
+    }
+
+    public void setS2c_pkt_num(long s2c_pkt_num) {
+        this.s2c_pkt_num = s2c_pkt_num;
+    }
+
+    public void setStat_time(long stat_time) {
+        this.stat_time = stat_time;
+    }
+
+    public void setIp_object(byte[] ip_object) {
+        this.ip_object = ip_object;
+    }
+
+    @Override
+    public String toString() {
+        return "Result{" +
+                "schema_type='" + schema_type + '\'' +
+                ", c2s_byte_num=" + c2s_byte_num +
+                ", c2s_pkt_num=" + c2s_pkt_num +
+                ", s2c_byte_num=" + s2c_byte_num +
+                ", s2c_pkt_num=" + s2c_pkt_num +
+                ", stat_time=" + stat_time +
+                ", ip_object=" + Arrays.toString(ip_object) +
+                '}';
+    }
+}
\ No newline at end of file
diff --git a/src/test/java/com/zdjizhi/tools/FlagsTest.java b/src/test/java/com/zdjizhi/tools/FlagsTest.java
new file mode 100644
index 0000000..a32e06b
--- /dev/null
+++ b/src/test/java/com/zdjizhi/tools/FlagsTest.java
@@ -0,0 +1,48 @@
+package com.zdjizhi.tools;
+
+import org.junit.Test;
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi
+ * @Description:
+ * @date 2023/4/18 10:22
+ */
+public class FlagsTest {
+    /*
+     * Reference: https://juejin.cn/post/6879226834597691405
+     *
+     * Session flags (stored as a 64-bit unsigned integer); the 32-bit field marks the
+     * session's network behavior. A logged value is queried and decoded by bitwise
+     * AND (&) against the following masks:
+     * 0x00000001 - (1) Asymmetric
+     * 0x00000002 - (2) Bulky
+     * 0x00000004 - (4) CBR Streaming
+     * 0x00000008 - (8) Client is Local
+     * 0x00000010 - (16) Server is Local
+     * 0x00000020 - (32) Download
+     * 0x00000040 - (64) Interactive
+     * 0x00000080 - (128) Inbound
+     * 0x00000100 - (256) Outbound
+     * 0x00000200 - (512) Pseudo Unidirectional
+     * 0x00000400 - (1024) Streaming
+     * 0x00000800 - (2048) Unidirectional
+     * 0x00001000 - (4096) Random looking
+     * 0x00002000 - (8192) C2S
+     * 0x00004000 - (16384) S2C
+     */
+
+    @Test
+    public void bitwiseAND() {
+        Long common_flags = 8200L;   // 8192 (C2S) + 8 (Client is Local)
+        Long clientIsLocal = 8L;
+        Long serverIsLocal = 16L;
+
+        System.out.println("common_flags & clientIsLocal = " + (common_flags & clientIsLocal));
+//        System.out.println("common_flags & serverIsLocal = " + (common_flags & serverIsLocal) + "\n\n");
+
+        common_flags = 16400L;       // 16384 (S2C) + 16 (Server is Local)
+
+        System.out.println("common_flags & clientIsLocal = " + (common_flags & clientIsLocal));
+        System.out.println("common_flags & serverIsLocal = " + (common_flags & serverIsLocal));
+    }
+}
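Generalizing the two AND probes in bitwiseAND(): a decoder that lists every flag set on a value, using the mask table from the comment above (the class itself is an illustration, not project code):

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class FlagDecoder {
        private static final Map<Long, String> FLAGS = new LinkedHashMap<>();

        static {
            FLAGS.put(0x00000001L, "Asymmetric");
            FLAGS.put(0x00000002L, "Bulky");
            FLAGS.put(0x00000004L, "CBR Streaming");
            FLAGS.put(0x00000008L, "Client is Local");
            FLAGS.put(0x00000010L, "Server is Local");
            FLAGS.put(0x00000020L, "Download");
            FLAGS.put(0x00000040L, "Interactive");
            FLAGS.put(0x00000080L, "Inbound");
            FLAGS.put(0x00000100L, "Outbound");
            FLAGS.put(0x00000200L, "Pseudo Unidirectional");
            FLAGS.put(0x00000400L, "Streaming");
            FLAGS.put(0x00000800L, "Unidirectional");
            FLAGS.put(0x00001000L, "Random looking");
            FLAGS.put(0x00002000L, "C2S");
            FLAGS.put(0x00004000L, "S2C");
        }

        public static void main(String[] args) {
            long commonFlags = 8200L;  // 8192 (C2S) + 8 (Client is Local)
            FLAGS.forEach((bit, name) -> {
                if ((commonFlags & bit) != 0) {
                    System.out.println(name);  // prints "Client is Local", then "C2S"
                }
            });
        }
    }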
Local\\\":1,\\\"C2S\\\":1}\",\"common_direction\":69,\"common_app_full_path\":\"unknown\",\"common_app_label\":\"unknown\",\"common_tcp_client_isn\":3855187193,\"common_server_ip\":\"45.188.134.11\",\"common_client_ip\":\"5.32.144.55\",\"common_server_port\":8081,\"common_client_port\":5652,\"common_stream_dir\":1,\"common_address_type\":4,\"common_address_list\":\"5652-8081-5.32.144.55-45.188.134.11\",\"common_start_time\":1682403782,\"common_end_time\":1682403802,\"common_con_duration_ms\":20164,\"common_s2c_pkt_num\":0,\"common_s2c_byte_num\":0,\"common_c2s_pkt_num\":4,\"common_c2s_byte_num\":2264,\"common_client_location\":\"俄罗斯联邦.莫斯科州.沃斯克列先斯克\",\"common_server_location\":\"巴西.马托格罗索州.Unknown\",\"common_stream_trace_id\":\"869469798157990979\",\"common_l4_protocol\":\"IPv4_TCP\",\"common_sled_ip\":\"192.168.40.81\",\"common_device_id\":\"21426003\",\"common_data_center\":\"center-xxg-9140\",\"common_device_tag\":\"{\\\"tags\\\":[{\\\"tag\\\":\\\"device_group\\\",\\\"value\\\":\\\"group-xxg-9140\\\"},{\\\"tag\\\":\\\"data_center\\\",\\\"value\\\":\\\"center-xxg-9140\\\"}]}\",\"common_t_vsys_id\":1,\"common_vsys_id\":1}"; + + SessionRecord sessionRecord = JSON.parseObject(sessionReocrd, SessionRecord.class); + + int i = 0; + double begin = System.currentTimeMillis(); + + System.out.println(sessionRecord.getCommon_device_tag()); +// //jsonpath +// while (i < 6000000) { +// String common_device_tag = sessionRecord.getCommon_device_tag(); +// String dataCenterExpr = "$.tags[?(@.tag=='data_center')].value"; +// String dataGroupExpr = "$.tags[?(@.tag=='device_group')].value"; +// +// +// if (common_device_tag != null) { +// +// DocumentContext parse = JsonPath.parse(common_device_tag); +// +// +// +// ArrayList<Object> read = parse.read(dataCenterExpr); +// if (read.size() >= 1) { +// data_center = read.get(0).toString(); +// +// } +// +// read = parse.read(dataGroupExpr); +// if (read.size() >= 1) { +// device_group = read.get(0).toString(); +// } +// } +// +// i++; +// } + + double end = System.currentTimeMillis(); // 程序结束时间,调用系统当前时间 + + double time = end - begin;// 程序的运行时间 + + System.out.println(time/1000 +"秒"); + +// //fastjson2 +// while (i < 600000) { +// +// String dataCenterExpr = "$.tags[?(@.tag=='data_center')][0].value"; +// String dataGroupExpr = "$.tags[?(@.tag=='device_group')][0].value"; +// +// +// if (common_device_tag != null) { +// ArrayList<Object> read = JsonPath.parse(common_device_tag).read(dataCenterExpr); +// if (read.size() >= 1) { +// data_center = read.get(0).toString(); +// +// } +// +// read = JsonPath.parse(common_device_tag).read(dataGroupExpr); +// if (read.size() >= 1) { +// device_group = read.get(0).toString(); +// } +// } +// i++; +// } + + + + + + + + + } +} diff --git a/src/test/java/com/zdjizhi/tools/TestMap.java b/src/test/java/com/zdjizhi/tools/TestMap.java new file mode 100644 index 0000000..5f4a831 --- /dev/null +++ b/src/test/java/com/zdjizhi/tools/TestMap.java @@ -0,0 +1,22 @@ +package com.zdjizhi.tools; + +import com.jayway.jsonpath.JsonPath; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; + +public class TestMap { + + public static void main(String[] args) { + Map<String, Object> tagsMap = new HashMap<String, Object>(); + tagsMap.put("session",1L); + + + Object tags = tagsMap.remove("tags"); + System.out.println(tags); + System.out.println(tagsMap); + + } + } + |
