summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author梁超 <[email protected]>2024-07-01 07:56:41 +0000
committer梁超 <[email protected]>2024-07-01 07:56:41 +0000
commit1bf739af738e0efb5316b1a48b84d9618c40c5ee (patch)
treec093c2a16fbbcd8f1b83427aba72b8bef60f1185
parent53c6c267e8ac29e619e91ed52bede8a88ce4ee92 (diff)
parente106c6c6d6ea2a34bcd2d905e1604242451a88ad (diff)
Merge branch 'feature/easy-refactor' into 'develop'develop
[GAL-602] refactor: refactor this module based on the Easy Stream framework. See merge request galaxy/tsg_olap/sip-rtp-correlation!26
-rw-r--r--.gitlab-ci.yml47
-rw-r--r--CHANGELOG.md8
-rw-r--r--README.md20
-rw-r--r--pom.xml621
-rw-r--r--src/it/java/com/zdjizhi/flink/voip/CorrelateTest.java95
-rw-r--r--src/it/java/com/zdjizhi/flink/voip/conf/TestConfigs.java22
-rw-r--r--src/it/java/com/zdjizhi/flink/voip/functions/DataGenSource.java46
-rw-r--r--src/it/java/com/zdjizhi/flink/voip/functions/DoNothingSink.java43
-rw-r--r--src/it/resources/application-test.properties4
-rw-r--r--src/it/resources/log4j2-test.properties25
-rw-r--r--src/main/java/com/geedgenetworks/flink/easy/application/voip/VoipUDFFactory.java33
-rw-r--r--src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/FindNotBlank.java15
-rw-r--r--src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/HasExternalIpAddress.java19
-rw-r--r--src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/HasIpAddress.java18
-rw-r--r--src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/IsExternalIpAddress.java17
-rw-r--r--src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/IsInternalIpAddress.java17
-rw-r--r--src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/IsIpAddress.java15
-rw-r--r--src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/SnowflakeID.java14
-rw-r--r--src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/SortAddress.java31
-rw-r--r--src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/StreamDir.java26
-rw-r--r--src/main/java/com/zdjizhi/flink/voip/CorrelateApp.java108
-rw-r--r--src/main/java/com/zdjizhi/flink/voip/conf/FusionConfigs.java107
-rw-r--r--src/main/java/com/zdjizhi/flink/voip/conf/FusionConfiguration.java44
-rw-r--r--src/main/java/com/zdjizhi/flink/voip/error/ErrorHandler.java177
-rw-r--r--src/main/java/com/zdjizhi/flink/voip/formats/JsonNodeSerializationSchema.java20
-rw-r--r--src/main/java/com/zdjizhi/flink/voip/functions/Address.java54
-rw-r--r--src/main/java/com/zdjizhi/flink/voip/functions/FunctionHelper.java32
-rw-r--r--src/main/java/com/zdjizhi/flink/voip/functions/ObjectNodeInfo.java28
-rw-r--r--src/main/java/com/zdjizhi/flink/voip/functions/SIPKeySelector.java31
-rw-r--r--src/main/java/com/zdjizhi/flink/voip/functions/SIPPairingFunction.java91
-rw-r--r--src/main/java/com/zdjizhi/flink/voip/functions/TypeSplitFunction.java45
-rw-r--r--src/main/java/com/zdjizhi/flink/voip/functions/VSysIDKeySelector.java40
-rw-r--r--src/main/java/com/zdjizhi/flink/voip/functions/VoIPFusionFunction.java180
-rw-r--r--src/main/java/com/zdjizhi/flink/voip/records/RTPRecord.java27
-rw-r--r--src/main/java/com/zdjizhi/flink/voip/records/Record.java224
-rw-r--r--src/main/java/com/zdjizhi/flink/voip/records/SIPRecord.java57
-rw-r--r--src/main/java/com/zdjizhi/flink/voip/records/SchemaType.java51
-rw-r--r--src/main/java/com/zdjizhi/flink/voip/records/StreamDir.java69
-rw-r--r--src/main/resources/META-INF/services/com.geedgenetworks.flink.easy.common.api.UDFFactory1
-rw-r--r--src/main/resources/application.properties7
-rw-r--r--src/main/resources/jobs/job.yml1795
-rw-r--r--src/main/resources/log4j2.properties2
-rw-r--r--src/site/markdown/changelogs.md5
-rw-r--r--src/site/markdown/deploy.md13
-rw-r--r--src/site/markdown/download.md8
-rw-r--r--src/site/markdown/index.md10
-rw-r--r--src/site/resources/css/site.css13
-rw-r--r--src/site/resources/images/logo.pngbin0 -> 4877 bytes
-rw-r--r--src/site/site.xml56
-rw-r--r--src/test/java/com/geedgenetworks/flink/easy/application/ApplicationTest.java33
-rw-r--r--src/test/java/com/zdjizhi/flink/voip/conf/FusionConfigurationTest.java67
-rw-r--r--src/test/java/com/zdjizhi/flink/voip/data/Generator.java98
-rw-r--r--src/test/java/com/zdjizhi/flink/voip/data/RTPGenerator.java111
-rw-r--r--src/test/java/com/zdjizhi/flink/voip/data/SIPGenerator.java138
-rw-r--r--src/test/java/com/zdjizhi/flink/voip/functions/AddressTest.java48
-rw-r--r--src/test/java/com/zdjizhi/flink/voip/functions/SIPPairingFunctionTest.java104
-rw-r--r--src/test/java/com/zdjizhi/flink/voip/records/RecordTest.java70
-rw-r--r--src/test/resources/data/session-records.txt4
-rw-r--r--tools/dist/target.xml44
-rw-r--r--tools/maven/checkstyle.xml394
-rw-r--r--tools/maven/spotbugs-exclude.xml18
-rw-r--r--tools/maven/suppressions.xml12
62 files changed, 3101 insertions, 2471 deletions
diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
new file mode 100644
index 0000000..2959ffc
--- /dev/null
+++ b/.gitlab-ci.yml
@@ -0,0 +1,47 @@
+image: 192.168.40.153:9080/common/maven:3.8.1-openjdk-11-slim-with-git
+
+variables:
+ MAVEN_CLI_OPTS: "--batch-mode --errors --show-version"
+
+stages:
+ - check
+ - test
+ - build
+
+snapshot-version:
+ stage: check
+ script:
+ - mvn $MAVEN_CLI_OPTS enforcer:enforce@snapshot-version-check
+ rules:
+ - if: $CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "develop" && $CI_PIPELINE_SOURCE == "merge_request_event"
+
+non-snapshot-version:
+ stage: check
+ script:
+ - mvn $MAVEN_CLI_OPTS enforcer:enforce@release-version-check
+ - |-
+ if `mvn $MAVEN_CLI_OPTS dependency:get@release-deploy-check > /dev/null 2>&1`; then
+ echo "The current version has been deployed."
+ exit 1
+ else
+ echo "The current version has not been deployed."
+ fi
+ rules:
+ - if: $CI_MERGE_REQUEST_TARGET_BRANCH_NAME =~ /(^master$|^release\/)/ && $CI_PIPELINE_SOURCE == "merge_request_event"
+
+test:
+ stage: test
+ script:
+ - mvn $MAVEN_CLI_OPTS clean test
+ only:
+ - merge_requests
+
+# Used for building snapshot versions on the develop branch.
+build:
+ stage: build
+ script:
+ - echo "$MAVEN_SETTINGS_XML" > /usr/share/maven/conf/settings.xml
+ - mvn clean site deploy -DskipTests
+ only:
+ - master
+ - /^release\// \ No newline at end of file
diff --git a/CHANGELOG.md b/CHANGELOG.md
deleted file mode 100644
index 8cd08b3..0000000
--- a/CHANGELOG.md
+++ /dev/null
@@ -1,8 +0,0 @@
-# Changelog
-
-### Hotfix
- - [#5](https://git.mesalab.cn/galaxy/tsg_olap/sip-rtp-correlation/-/issues/5) 修复了由于 IPUtil 在判断 Ipv6 地址没有判空而引起的空指针异常
-
-### Feature
- - 输出 SIP Record
- - [GAL-419](https://jira.geedge.net/browse/GAL-419) 增加配置项 `include.intranet.ip`, 是否对 SIP 协商主叫 IP 或被叫 IP 为内网地址的数据进行关联,不关联则输出到异常 Topic 中。 \ No newline at end of file
diff --git a/README.md b/README.md
index 51614e6..b125ba2 100644
--- a/README.md
+++ b/README.md
@@ -17,27 +17,9 @@ mvn clean package
使用以下命令运行Flink任务:
```shell
-flink run -c com.zdjizhi.flink.voip.CorrelateApp path/to/sip-rtp-correlation-<version>.jar application.properties
+flink run -c com.geedgenetworks.flink.easy.core.Runner path/to/sip-rtp-correlation-<version>.jar job.yml
```
-## 配置项说明
-
-| 配置项 | 类型 | 必需 | 默认值 | 描述 |
-|----------------------------------| ------------------- | ---------- | ---------------------------------------------------------- |-------------------------------------------|
-| source.kafka.topic | STRING | Y | | 将要读取的 Kafka Topic 名称,其包含 SIP 和 RTP 原始数据 |
-| source.kafka.props.* | MAP<STRING, STRING> | Y | | 将要读取的 Kafka 的 Properties |
-| sink.kafka.topic | STRING | Y | | 将合成的 VoIP 及 未关联成功的 RTP 数据写出的 Kafka Topic 名 |
-| sink.kafka.props.* | MAP<STRING, STRING> | Y | | 数据输出的 Kafka 的 Properties |
-| error.records.output.enable | BOOLEAN | N | False | 是否开启异常数据的输出 【IP 或 Port 为空】 |
-| include.intranet.ip | BOOLEAN | N | True | 是否对 SIP 协商主叫 IP 或被叫 IP 为内网地址的数据进行关联 |
-| error.sink.kafka.topic | STRING | N | | 异常数据输出到的 Kafka Topic 名 |
-| error.sink.kafka.props.* | MAP<STRING, STRING> | N | | 异常数据输出的 Kafka 的 Properties |
-| sip.state.clear.interval.minutes | INT | N | 1 | SIP 单向流关联的窗口大小(单位:分钟) |
-| rtp.state.clear.interval.minutes | INT | N | 6 | SIP 和 RTP 关联的窗口大小(单位:分钟) |
-| job.name | STRING | N | correlation_sip_rtp_session | Job 名 |
-
-
-
## 贡献
如果您发现任何问题或改进项目的想法,欢迎提交 Issue 或 Pull Request。 \ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 8c50e8c..5973f1f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -5,23 +5,24 @@
<modelVersion>4.0.0</modelVersion>
- <groupId>com.zdjizhi</groupId>
+ <groupId>com.geedgenetworks.application</groupId>
<artifactId>sip-rtp-correlation</artifactId>
- <version>1.2.2</version>
+ <version>2.0-SNAPSHOT</version>
<name>Flink : SIP-RTP : Correlation</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <java.version>1.8</java.version>
+ <java.version>11</java.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
- <flink.version>1.13.6</flink.version>
<scala.version>2.12.10</scala.version>
<scala.binary.version>2.12</scala.binary.version>
+ <flink.version>1.13.6</flink.version>
+ <easy.stream.version>1.3-SNAPSHOT</easy.stream.version>
<slf4j.version>1.7.32</slf4j.version>
<log4j.version>2.17.1</log4j.version>
- <jackson.version>2.13.2.20220328</jackson.version>
+ <junit.version>5.8.0</junit.version>
</properties>
<distributionManagement>
@@ -34,97 +35,177 @@
<id>platform-snapshots</id>
<url>http://192.168.40.153:8099/content/repositories/platform-snapshot</url>
</snapshotRepository>
+ <site>
+ <id>platform-site</id>
+ <url>
+ dav:http://192.168.40.153:8099/content/sites/platform-site/platform/application/sip-rtp-correlate-${project.version}
+ </url>
+ </site>
</distributionManagement>
+ <repositories>
+ <repository>
+ <id>central</id>
+ <url>http://192.168.40.153:8099/content/groups/public</url>
+ </repository>
+ <repository>
+ <id>snapshots</id>
+ <url>http://192.168.40.153:8099/content/groups/public</url>
+ <snapshots>
+ <enabled>true</enabled>
+ </snapshots>
+ </repository>
+ </repositories>
+
<dependencies>
<dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
- <version>${flink.version}</version>
- <scope>provided</scope>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-clients_${scala.binary.version}</artifactId>
- <version>${flink.version}</version>
- <scope>provided</scope>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j-impl</artifactId>
</dependency>
<dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
- <version>${flink.version}</version>
- <scope>provided</scope>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-core</artifactId>
</dependency>
<dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
- <version>${flink.version}</version>
+ <groupId>com.zdjizhi</groupId>
+ <artifactId>galaxy</artifactId>
</dependency>
<dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-json</artifactId>
- <version>${flink.version}</version>
+ <groupId>xyz.downgoon</groupId>
+ <artifactId>snowflake</artifactId>
+ <version>1.0.0</version>
</dependency>
+
+ <!-- Easy Stream -->
<dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-metrics-influxdb_${scala.binary.version}</artifactId>
- <version>${flink.version}</version>
+ <groupId>com.geedgenetworks.flink</groupId>
+ <artifactId>easy-stream-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.geedgenetworks.flink</groupId>
+ <artifactId>easy-stream-core</artifactId>
</dependency>
<dependency>
- <groupId>com.zdjizhi</groupId>
- <artifactId>galaxy</artifactId>
+ <groupId>com.geedgenetworks.flink</groupId>
+ <artifactId>easy-stream-grouped-exec-pipeline</artifactId>
</dependency>
<dependency>
- <groupId>org.projectlombok</groupId>
- <artifactId>lombok</artifactId>
- <version>1.18.26</version>
+ <groupId>com.geedgenetworks.flink</groupId>
+ <artifactId>easy-stream-filter-pipeline</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.geedgenetworks.flink</groupId>
+ <artifactId>easy-stream-console-pipeline</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.geedgenetworks.flink</groupId>
+ <artifactId>easy-stream-split-pipeline</artifactId>
+ <version>${easy.stream.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.geedgenetworks.flink</groupId>
+ <artifactId>easy-stream-correlate-pipeline</artifactId>
+ <version>${easy.stream.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.geedgenetworks.flink</groupId>
+ <artifactId>easy-stream-union-pipeline</artifactId>
+ <version>${easy.stream.version}</version>
</dependency>
<dependency>
- <groupId>com.github.javafaker</groupId>
- <artifactId>javafaker</artifactId>
- <scope>test</scope>
+ <groupId>com.geedgenetworks.flink</groupId>
+ <artifactId>easy-stream-kafka-connector</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.geedgenetworks.flink</groupId>
+ <artifactId>easy-stream-text-connector</artifactId>
+ <version>${easy.stream.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.geedgenetworks.flink</groupId>
+ <artifactId>easy-stream-socket-connector</artifactId>
+ <version>${easy.stream.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.geedgenetworks.flink</groupId>
+ <artifactId>easy-stream-console-connector</artifactId>
+ <version>${easy.stream.version}</version>
</dependency>
+
+ <dependency>
+ <groupId>com.geedgenetworks.flink</groupId>
+ <artifactId>easy-stream-json-format</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.geedgenetworks.flink</groupId>
+ <artifactId>easy-stream-flink-shim</artifactId>
+ </dependency>
+
+ <!-- Flink -->
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
- <version>${flink.version}</version>
- <scope>test</scope>
+ <artifactId>flink-clients_${scala.binary.version}</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
- <version>${flink.version}</version>
- <classifier>tests</classifier>
- <scope>test</scope>
</dependency>
<dependency>
- <groupId>org.junit.jupiter</groupId>
- <artifactId>junit-jupiter</artifactId>
- <scope>test</scope>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-runtime_${scala.binary.version}</artifactId>
</dependency>
-
<dependency>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-slf4j-impl</artifactId>
- <scope>runtime</scope>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
</dependency>
<dependency>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-api</artifactId>
- <scope>runtime</scope>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-common</artifactId>
</dependency>
<dependency>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-core</artifactId>
- <scope>runtime</scope>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-runtime-blink_${scala.binary.version}</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-api</artifactId>
+ <scope>test</scope>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
+ <!-- DEV -->
+ <dependency>
+ <groupId>com.github.spotbugs</groupId>
+ <artifactId>spotbugs-annotations</artifactId>
+ <version>4.4.2</version>
+ </dependency>
+ <!-- LOG -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
@@ -134,30 +215,43 @@
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
+ <scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
+ <scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
+ <scope>runtime</scope>
</dependency>
<dependency>
<!-- API bridge between log4j 1 and 2 -->
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-1.2-api</artifactId>
<version>${log4j.version}</version>
+ <scope>runtime</scope>
</dependency>
+ <!-- Test -->
<dependency>
- <groupId>com.github.javafaker</groupId>
- <artifactId>javafaker</artifactId>
- <version>1.0.2</version>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-api</artifactId>
+ <version>${junit.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter</artifactId>
+ <version>${junit.version}</version>
+ <scope>test</scope>
</dependency>
+ <!-- Common -->
<dependency>
<groupId>com.zdjizhi</groupId>
<artifactId>galaxy</artifactId>
@@ -171,170 +265,236 @@
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ </exclusion>
</exclusions>
</dependency>
+ <!-- Easy Stream-->
<dependency>
- <groupId>org.junit.jupiter</groupId>
- <artifactId>junit-jupiter</artifactId>
- <version>5.8.0</version>
- <scope>test</scope>
+ <groupId>com.geedgenetworks.flink</groupId>
+ <artifactId>easy-stream-common</artifactId>
+ <version>${easy.stream.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.geedgenetworks.flink</groupId>
+ <artifactId>easy-stream-core</artifactId>
+ <version>${easy.stream.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.geedgenetworks.flink</groupId>
+ <artifactId>easy-stream-grouped-exec-pipeline</artifactId>
+ <version>${easy.stream.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.geedgenetworks.flink</groupId>
+ <artifactId>easy-stream-filter-pipeline</artifactId>
+ <version>${easy.stream.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.geedgenetworks.flink</groupId>
+ <artifactId>easy-stream-console-pipeline</artifactId>
+ <version>${easy.stream.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.geedgenetworks.flink</groupId>
+ <artifactId>easy-stream-kafka-connector</artifactId>
+ <version>${easy.stream.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.geedgenetworks.flink</groupId>
+ <artifactId>easy-stream-json-format</artifactId>
+ <version>${easy.stream.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.geedgenetworks.flink</groupId>
+ <artifactId>easy-stream-flink-shim</artifactId>
+ <version>${easy.stream.version}</version>
+ </dependency>
+
+ <!-- Flink -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-clients_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-runtime_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>runtime</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>runtime</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-json</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-common</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-runtime-blink_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
-
<plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>build-helper-maven-plugin</artifactId>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <version>3.1.2</version>
+ <configuration>
+ <suppressionsLocation>${basedir}/tools/maven/suppressions.xml</suppressionsLocation>
+ <includeTestSourceDirectory>true</includeTestSourceDirectory>
+ <configLocation>${basedir}/tools/maven/checkstyle.xml</configLocation>
+ <logViolationsToConsole>true</logViolationsToConsole>
+ <failOnViolation>true</failOnViolation>
+ </configuration>
+ <dependencies>
+ <dependency>
+ <groupId>com.puppycrawl.tools</groupId>
+ <artifactId>checkstyle</artifactId>
+ <version>8.40</version>
+ </dependency>
+ </dependencies>
<executions>
<execution>
- <id>test-sources</id>
- <phase>generate-test-sources</phase>
+ <id>java-style-check</id>
+ <phase>compile</phase>
<goals>
- <goal>add-test-source</goal>
+ <goal>check</goal>
</goals>
<configuration>
- <sources>
- <source>src/it/java</source>
- </sources>
+ <sourceDirectories>src/main/java</sourceDirectories>
</configuration>
</execution>
<execution>
- <id>test-resources</id>
- <phase>generate-test-resources</phase>
+ <id>java-test-style-check</id>
+ <phase>test-compile</phase>
<goals>
- <goal>add-test-resource</goal>
+ <goal>check</goal>
</goals>
<configuration>
- <resources>
- <resource>
- <directory>src/it/resources</directory>
- </resource>
- </resources>
+ <testSourceDirectories>src/test/java</testSourceDirectories>
+ <includeTestSourceDirectory>true</includeTestSourceDirectory>
</configuration>
</execution>
</executions>
</plugin>
-
<plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>3.1</version>
+ <groupId>com.github.spotbugs</groupId>
+ <artifactId>spotbugs-maven-plugin</artifactId>
+ <version>4.4.2.2</version>
<configuration>
- <source>${java.version}</source>
- <target>${java.version}</target>
+ <xmlOutput>true</xmlOutput>
+ <!-- Low, Medium, High ('Low' is strictest) -->
+ <threshold>Low</threshold>
+ <effort>default</effort>
+ <spotbugsXmlOutputDirectory>${project.build.directory}/spotbugs</spotbugsXmlOutputDirectory>
+ <excludeFilterFile>${basedir}/tools/maven/spotbugs-exclude.xml</excludeFilterFile>
+ <failOnError>true</failOnError>
</configuration>
- </plugin>
-
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-dependency-plugin</artifactId>
- <executions>
- <execution>
- <id>copy-tests-dependencies</id>
- <phase>package</phase>
- <goals>
- <goal>copy-dependencies</goal>
- </goals>
- <configuration>
- <outputDirectory>${project.build.outputDirectory}/tests-lib</outputDirectory>
- <excludeScope>system</excludeScope>
- <excludeTransitive>false</excludeTransitive>
- </configuration>
- </execution>
- </executions>
- </plugin>
-
- <plugin>
- <groupId>org.jacoco</groupId>
- <artifactId>jacoco-maven-plugin</artifactId>
<executions>
<execution>
- <id>pre-unit-test</id>
+ <id>findbugs-main</id>
+ <phase>compile</phase>
<goals>
- <goal>prepare-agent</goal>
+ <goal>check</goal>
</goals>
- <configuration>
- <destFile>${project.build.directory}/jacoco.exec</destFile>
- </configuration>
</execution>
<execution>
- <id>test-report</id>
- <phase>verify</phase>
+ <id>findbugs-test</id>
+ <phase>test-compile</phase>
<goals>
- <goal>report</goal>
+ <goal>check</goal>
</goals>
<configuration>
- <dataFile>${project.build.directory}/jacoco.exec</dataFile>
- <outputDirectory>${project.reporting.outputDirectory}/jacoco</outputDirectory>
+ <includeTests>true</includeTests>
</configuration>
</execution>
</executions>
</plugin>
-
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-source-plugin</artifactId>
- </plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-javadoc-plugin</artifactId>
- </plugin>
-
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-jar-plugin</artifactId>
- <version>3.1.0</version>
- <executions>
- <execution>
- <goals>
- <goal>test-jar</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
-
- <plugin>
- <groupId>io.github.zlika</groupId>
- <artifactId>reproducible-build-maven-plugin</artifactId>
- <version>0.2</version>
- <executions>
- <execution>
- <goals>
- <goal>strip-jar</goal>
- </goals>
- <phase>package</phase>
- </execution>
- </executions>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.8.1</version>
+ <configuration>
+ <source>${maven.compiler.source}</source>
+ <target>${maven.compiler.target}</target>
+ </configuration>
</plugin>
-
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
- <version>3.1.1</version>
+ <version>3.5.1</version>
<executions>
<execution>
+ <id>default-shade</id>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<createDependencyReducedPom>false</createDependencyReducedPom>
+ <finalName>${project.artifactId}-${project.version}</finalName>
<artifactSet>
<excludes>
<exclude>org.apache.flink:force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>org.apache.logging.log4j:*</exclude>
+ <exclude>org.mockito:mockito-core</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
- <!-- Do not copy the signatures in the META-INF folder.
- Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
@@ -343,50 +503,151 @@
</excludes>
</filter>
</filters>
+ <transformers>
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>3.0.0</version>
+ <executions>
+ <execution>
+ <id>build-jobs</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ <configuration>
+ <finalName>${project.artifactId}-yml-${project.version}</finalName>
+ <appendAssemblyId>false</appendAssemblyId>
+ <descriptors>
+ <descriptor>tools/dist/target.xml</descriptor>
+ </descriptors>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-resources-plugin</artifactId>
+ <version>3.0.0</version>
+ <executions>
+ <execution>
+ <id>site-resources</id>
+ <phase>pre-site</phase>
+ <goals>
+ <goal>resources</goal>
+ </goals>
+ <configuration>
+ <resources>
+ <resource>
+ <directory>src/site</directory>
+ <filtering>true</filtering>
+ <includes>
+ <include>**</include>
+ </includes>
+ </resource>
+ </resources>
</configuration>
</execution>
</executions>
</plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-site-plugin</artifactId>
+ <configuration>
+ <skip>false</skip>
+ </configuration>
+ <executions>
+ <execution>
+ <id>default-site</id>
+ <goals>
+ <goal>site</goal>
+ </goals>
+ <phase>site</phase>
+ <configuration>
+ <siteDirectory>${project.build.outputDirectory}</siteDirectory>
+ </configuration>
+ </execution>
+ <execution>
+ <id>site-deploy</id>
+ <goals>
+ <goal>stage-deploy</goal>
+ </goals>
+ <phase>deploy</phase>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-source-plugin</artifactId>
- <version>3.2.1</version>
+ <artifactId>maven-enforcer-plugin</artifactId>
+ <version>3.0.0-M3</version>
+ <executions>
+ <execution>
+ <id>release-version-check</id>
+ <goals>
+ <goal>enforce</goal>
+ </goals>
+ <configuration>
+ <rules>
+ <requireReleaseVersion>
+ <message>SNAPSHOT versions ${project.version} are not allowed.</message>
+ </requireReleaseVersion>
+ </rules>
+ </configuration>
+ </execution>
+ <execution>
+ <id>snapshot-version-check</id>
+ <goals>
+ <goal>enforce</goal>
+ </goals>
+ <configuration>
+ <rules>
+ <requireSnapshotVersion>
+ <message>Non-SNAPSHOT versions ${project.version} are not allowed.</message>
+ </requireSnapshotVersion>
+ </rules>
+ </configuration>
+ </execution>
+ </executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- <version>3.0.0-M5</version>
- </plugin>
- <plugin>
- <groupId>org.jacoco</groupId>
- <artifactId>jacoco-maven-plugin</artifactId>
- <version>0.8.7</version>
+ <artifactId>maven-site-plugin</artifactId>
+ <version>3.9.1</version>
+ <configuration>
+ <outputDirectory>${project.build.directory}/site</outputDirectory>
+ <relativizeDecorationLinks>false</relativizeDecorationLinks>
+ </configuration>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.maven.wagon</groupId>
+ <artifactId>wagon-webdav-jackrabbit</artifactId>
+ <version>2.8</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.maven.doxia</groupId>
+ <artifactId>doxia-module-markdown</artifactId>
+ <version>1.9.1</version>
+ </dependency>
+ </dependencies>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-javadoc-plugin</artifactId>
- <version>3.3.0</version>
- </plugin>
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>build-helper-maven-plugin</artifactId>
- <version>3.1.0</version>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-jar-plugin</artifactId>
- <version>3.1.0</version>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-dependency-plugin</artifactId>
- <version>3.6.0</version>
+ <artifactId>maven-deploy-plugin</artifactId>
+ <version>3.1.1</version>
</plugin>
</plugins>
</pluginManagement>
</build>
+
</project> \ No newline at end of file
diff --git a/src/it/java/com/zdjizhi/flink/voip/CorrelateTest.java b/src/it/java/com/zdjizhi/flink/voip/CorrelateTest.java
deleted file mode 100644
index 398d61a..0000000
--- a/src/it/java/com/zdjizhi/flink/voip/CorrelateTest.java
+++ /dev/null
@@ -1,95 +0,0 @@
-package com.zdjizhi.flink.voip;
-
-import com.zdjizhi.flink.voip.functions.*;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.flink.streaming.api.CheckpointingMode;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.environment.CheckpointConfig;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-import java.util.Objects;
-
-// Integration test main class
-public class CorrelateTest {
-
- public static void main(String[] args) throws Exception {
- final Configuration envConfig = new Configuration();
- envConfig.setInteger("rest.port", 18081);
- envConfig.setBoolean("rest.flamegraph.enabled", true);
- envConfig.setString("metrics.reporter.influxdb.factory.class", "org.apache.flink.metrics.influxdb.InfluxdbReporterFactory");
- envConfig.setString("metrics.reporter.influxdb.scheme", "http");
- envConfig.setString("metrics.reporter.influxdb.host", "127.0.0.1");
- envConfig.setInteger("metrics.reporter.influxdb.port", 8086);
- envConfig.setString("metrics.reporter.influxdb.db", "flinks");
- envConfig.setString("metrics.reporter.influxdb.interval", "5 SECONDS");
-
- final StreamExecutionEnvironment env = StreamExecutionEnvironment
- .createLocalEnvironmentWithWebUI(envConfig);
-
- final ParameterTool tool = ParameterTool.fromPropertiesFile(
- args.length > 0 ? args[0] :
- Objects.requireNonNull(
- CorrelateTest.class.getResource("/application-test.properties")).getPath()
- );
-
- final CheckpointConfig checkpointConfig = env.getCheckpointConfig();
- env.enableCheckpointing(Time.minutes(1)
- .toMilliseconds(), CheckpointingMode.AT_LEAST_ONCE);
- checkpointConfig.setCheckpointTimeout(Time.minutes(2).toMilliseconds());
- checkpointConfig.setCheckpointStorage("file://" + System.getProperty("java.io.tmpdir"));
- /* checkpointConfig.setCheckpointStorage(
- Objects.requireNonNull(
- CorrelateTest.class.getResource("/")).toString());*/
-
- final Configuration config = tool.getConfiguration();
- env.getConfig().setGlobalJobParameters(config);
- env.setParallelism(8);
-
- final SingleOutputStreamOperator<ObjectNode> sourceStream = env.addSource(new DataGenSource())
- .name("DataGenSource")
- .setParallelism(4);
-
- // Process the data using the TypeSplitFunction and split it into separate DataStreams for SIP and RTP data
- final SingleOutputStreamOperator<ObjectNode> splitsRecordsOperator =
- sourceStream
- .process(new TypeSplitFunction())
- .name("SplitsRecordsBasedSchemaType")
- .uid("splits-records-based-schema-type");
-
- // Get the DataStreams for SIP and RTP data from the side outputs.
- final DataStream<ObjectNode> sipDataStream = splitsRecordsOperator
- .getSideOutput(TypeSplitFunction.SIP_OUTPUT_TAG);
-
- final DataStream<ObjectNode> rtpDataStream = splitsRecordsOperator
- .getSideOutput(TypeSplitFunction.RTP_OUTPUT_TAG);
-
- // Process SIP data to create a double directional stream using SIPPairingFunction.
- final SingleOutputStreamOperator<ObjectNode> sipDoubleDirOperator = sipDataStream
- .keyBy(new SIPKeySelector())
- .process(new SIPPairingFunction())
- .name("PairingOneWayToDoubleStream")
- .uid("pairing-one-way-to-double");
-
- final KeySelector<ObjectNode, Tuple2<Integer, Address>> vSysSelector = new VSysIDKeySelector();
-
- // Fusion SIP data and RTP data to VoIP data.
- final SingleOutputStreamOperator<ObjectNode> voIpOperator = rtpDataStream
- .keyBy(vSysSelector)
- .connect(sipDoubleDirOperator.keyBy(vSysSelector))
- .process(new VoIPFusionFunction())
- .name("VoIPFusion")
- .uid("voip-fusion");
-
- voIpOperator.addSink(new DoNothingSink())
- .name("DoNothingSink")
- .setParallelism(1);
-
- env.execute("VoIP Fusion Job");
- }
-}
diff --git a/src/it/java/com/zdjizhi/flink/voip/conf/TestConfigs.java b/src/it/java/com/zdjizhi/flink/voip/conf/TestConfigs.java
deleted file mode 100644
index f1b1b6c..0000000
--- a/src/it/java/com/zdjizhi/flink/voip/conf/TestConfigs.java
+++ /dev/null
@@ -1,22 +0,0 @@
-package com.zdjizhi.flink.voip.conf;
-
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ConfigOptions;
-
-// Test Configs.
-public class TestConfigs {
-
- // Ratio of valuable data in the generated dataset
- public static final ConfigOption<Integer> VALUABLE_DATA_PROPORTION =
- ConfigOptions.key("valuable.data.ratio")
- .intType()
- .defaultValue(40)
- .withDescription("Ratio of valuable data in the generated dataset.");
-
- // QPS of generate date record
- public static final ConfigOption<Long> DATA_GENERATE_RATE =
- ConfigOptions.key("data.generate.rate")
- .longType()
- .defaultValue(1000L)
- .withDescription("QPS of generate date record.");
-}
diff --git a/src/it/java/com/zdjizhi/flink/voip/functions/DataGenSource.java b/src/it/java/com/zdjizhi/flink/voip/functions/DataGenSource.java
deleted file mode 100644
index 43e9671..0000000
--- a/src/it/java/com/zdjizhi/flink/voip/functions/DataGenSource.java
+++ /dev/null
@@ -1,46 +0,0 @@
-package com.zdjizhi.flink.voip.functions;
-
-import com.zdjizhi.flink.voip.conf.TestConfigs;
-import com.zdjizhi.flink.voip.data.RTPGenerator;
-import com.zdjizhi.flink.voip.data.SIPGenerator;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.RateLimiter;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-
-// Date Generate Source.
-public class DataGenSource extends RichParallelSourceFunction<ObjectNode> implements FunctionHelper {
-
- private transient SIPGenerator sipGenerator;
- private transient RTPGenerator rtpGenerator;
- private transient RateLimiter rateLimiter;
- private volatile boolean running;
-
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- Integer ratio = getGlobalConfiguration()
- .get(TestConfigs.VALUABLE_DATA_PROPORTION);
- this.sipGenerator = new SIPGenerator(ratio);
- this.rtpGenerator = new RTPGenerator(ratio, sipGenerator);
- final Long rate = getGlobalConfiguration()
- .get(TestConfigs.DATA_GENERATE_RATE);
- this.rateLimiter = RateLimiter.create(
- (int) (rate / getRuntimeContext().getNumberOfParallelSubtasks()));
- this.running = true;
- }
-
- @Override
- public void run(SourceContext<ObjectNode> ctx) throws Exception {
- while (running) {
- rateLimiter.acquire();
- ctx.collect(sipGenerator.next());
- ctx.collect(rtpGenerator.next());
- }
- }
-
- @Override
- public void cancel() {
- this.running = false;
- }
-}
diff --git a/src/it/java/com/zdjizhi/flink/voip/functions/DoNothingSink.java b/src/it/java/com/zdjizhi/flink/voip/functions/DoNothingSink.java
deleted file mode 100644
index 6a6e573..0000000
--- a/src/it/java/com/zdjizhi/flink/voip/functions/DoNothingSink.java
+++ /dev/null
@@ -1,43 +0,0 @@
-package com.zdjizhi.flink.voip.functions;
-
-import com.zdjizhi.flink.voip.records.Record;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.MeterView;
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-
-
-// It dose nothing with the incoming data and simply collects metrics for the number of
-// RTP and VoIP records processed per second.
-public class DoNothingSink extends RichSinkFunction<ObjectNode> {
-
- private transient MeterView numRTPRecordsPreSecond;
- private transient MeterView numVoIPRecordsPreSecond;
-
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- RuntimeContext runtimeContext = getRuntimeContext();
- MetricGroup metricGroup = runtimeContext.getMetricGroup();
- numRTPRecordsPreSecond = metricGroup
- .meter("numRTPRecordsPreSecond", new MeterView(1));
- numVoIPRecordsPreSecond = metricGroup
- .meter("numVoIPRecordsPreSecond", new MeterView(1));
- }
-
- @Override
- public void invoke(ObjectNode obj, Context context) throws Exception {
- Record record = new Record(obj);
- switch (record.getSchemaType()) {
- case RTP:
- numRTPRecordsPreSecond.markEvent();
- break;
- case VOIP:
- numVoIPRecordsPreSecond.markEvent();
- break;
- default:
- }
- }
-}
diff --git a/src/it/resources/application-test.properties b/src/it/resources/application-test.properties
deleted file mode 100644
index 8d510b3..0000000
--- a/src/it/resources/application-test.properties
+++ /dev/null
@@ -1,4 +0,0 @@
-sip.state.clear.interval.minutes=1
-rtp.state.clear.interval.minutes=10
-valuable.data.ratio=20
-data.generate.rate=1000 \ No newline at end of file
diff --git a/src/it/resources/log4j2-test.properties b/src/it/resources/log4j2-test.properties
deleted file mode 100644
index 16f3340..0000000
--- a/src/it/resources/log4j2-test.properties
+++ /dev/null
@@ -1,25 +0,0 @@
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-rootLogger.level = INFO
-rootLogger.appenderRef.test.ref = TestLogger
-
-appender.test.name = TestLogger
-appender.test.type = CONSOLE
-appender.test.layout.type = PatternLayout
-appender.test.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
diff --git a/src/main/java/com/geedgenetworks/flink/easy/application/voip/VoipUDFFactory.java b/src/main/java/com/geedgenetworks/flink/easy/application/voip/VoipUDFFactory.java
new file mode 100644
index 0000000..20d0746
--- /dev/null
+++ b/src/main/java/com/geedgenetworks/flink/easy/application/voip/VoipUDFFactory.java
@@ -0,0 +1,33 @@
+package com.geedgenetworks.flink.easy.application.voip;
+
+import com.geedgenetworks.flink.easy.application.voip.udf.*;
+import com.geedgenetworks.flink.easy.common.api.UDFFactory;
+import org.apache.flink.table.functions.UserDefinedFunction;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class VoipUDFFactory implements UDFFactory {
+
+ private static final Map<String, UserDefinedFunction> R =
+ new HashMap<>() {{
+ put("IS_IP_ADDRESS", new IsIpAddress());
+
+ put("IS_INTERNAL_IP_ADDRESS", new IsInternalIpAddress());
+ put("IS_EXTERNAL_IP_ADDRESS", new IsExternalIpAddress());
+
+ put("HAS_IP_ADDRESS", new HasIpAddress());
+ put("HAS_EXTERNAL_IP_ADDRESS", new HasExternalIpAddress());
+
+ put("STREAM_DIR", new StreamDir());
+ put("FIND_NOT_BLANK", new FindNotBlank());
+ put("SORT_ADDRESS", new SortAddress());
+
+ put("SNOWFLAKE_ID", new SnowflakeID());
+ }};
+
+ @Override
+ public Map<String, UserDefinedFunction> register() {
+ return R;
+ }
+}
diff --git a/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/FindNotBlank.java b/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/FindNotBlank.java
new file mode 100644
index 0000000..a018ce2
--- /dev/null
+++ b/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/FindNotBlank.java
@@ -0,0 +1,15 @@
+package com.geedgenetworks.flink.easy.application.voip.udf;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.functions.ScalarFunction;
+
+public class FindNotBlank extends ScalarFunction {
+
+ public @DataTypeHint("STRING") String eval(String s1, String s2) {
+ if (StringUtils.isBlank(s1) && StringUtils.isNotBlank(s2)) {
+ return s2;
+ }
+ return s1;
+ }
+}
diff --git a/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/HasExternalIpAddress.java b/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/HasExternalIpAddress.java
new file mode 100644
index 0000000..2ce254a
--- /dev/null
+++ b/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/HasExternalIpAddress.java
@@ -0,0 +1,19 @@
+package com.geedgenetworks.flink.easy.application.voip.udf;
+
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.functions.ScalarFunction;
+
+public class HasExternalIpAddress extends ScalarFunction {
+
+ private final IsExternalIpAddress isExternalIpAddress = new IsExternalIpAddress();
+
+ public @DataTypeHint("BOOLEAN") Boolean eval(String... ipaddr) {
+ if (null == ipaddr) {
+ return false;
+ }
+ for (var ip : ipaddr) {
+ return isExternalIpAddress.eval(ip);
+ }
+ return false;
+ }
+}
diff --git a/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/HasIpAddress.java b/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/HasIpAddress.java
new file mode 100644
index 0000000..7dddbc7
--- /dev/null
+++ b/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/HasIpAddress.java
@@ -0,0 +1,18 @@
+package com.geedgenetworks.flink.easy.application.voip.udf;
+
+import com.zdjizhi.utils.IPUtil;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.functions.ScalarFunction;
+
+public class HasIpAddress extends ScalarFunction {
+
+ public @DataTypeHint("BOOLEAN") Boolean eval(String... ipaddr) {
+ if (null == ipaddr) {
+ return false;
+ }
+ for (var ip : ipaddr) {
+ return ip != null && IPUtil.isIPAddress(ip);
+ }
+ return false;
+ }
+}
diff --git a/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/IsExternalIpAddress.java b/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/IsExternalIpAddress.java
new file mode 100644
index 0000000..a2970a6
--- /dev/null
+++ b/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/IsExternalIpAddress.java
@@ -0,0 +1,17 @@
+package com.geedgenetworks.flink.easy.application.voip.udf;
+
+import com.zdjizhi.utils.IPUtil;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.functions.ScalarFunction;
+
+import static com.zdjizhi.utils.IPUtil.isIPAddress;
+
+public class IsExternalIpAddress extends ScalarFunction {
+
+ public @DataTypeHint("BOOLEAN") Boolean eval(String ipaddr) {
+ if (ipaddr == null || !isIPAddress(ipaddr)) {
+ return false;
+ }
+ return !IPUtil.internalIp(ipaddr);
+ }
+}
diff --git a/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/IsInternalIpAddress.java b/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/IsInternalIpAddress.java
new file mode 100644
index 0000000..55839ba
--- /dev/null
+++ b/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/IsInternalIpAddress.java
@@ -0,0 +1,17 @@
+package com.geedgenetworks.flink.easy.application.voip.udf;
+
+import com.zdjizhi.utils.IPUtil;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.functions.ScalarFunction;
+
+import static com.zdjizhi.utils.IPUtil.isIPAddress;
+
+public class IsInternalIpAddress extends ScalarFunction {
+
+ public @DataTypeHint("BOOLEAN") Boolean eval(String ipaddr) {
+ if (!isIPAddress(ipaddr)) {
+ return false;
+ }
+ return IPUtil.internalIp(ipaddr);
+ }
+}
diff --git a/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/IsIpAddress.java b/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/IsIpAddress.java
new file mode 100644
index 0000000..cbef834
--- /dev/null
+++ b/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/IsIpAddress.java
@@ -0,0 +1,15 @@
+package com.geedgenetworks.flink.easy.application.voip.udf;
+
+import com.zdjizhi.utils.IPUtil;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.functions.ScalarFunction;
+
+public class IsIpAddress extends ScalarFunction {
+
+ public @DataTypeHint("BOOLEAN") Boolean eval(String ipaddr) {
+ if (null == ipaddr) {
+ return false;
+ }
+ return IPUtil.isIPAddress(ipaddr);
+ }
+}
diff --git a/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/SnowflakeID.java b/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/SnowflakeID.java
new file mode 100644
index 0000000..abaea8f
--- /dev/null
+++ b/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/SnowflakeID.java
@@ -0,0 +1,14 @@
+package com.geedgenetworks.flink.easy.application.voip.udf;
+
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.functions.ScalarFunction;
+import xyz.downgoon.snowflake.Snowflake;
+
+public class SnowflakeID extends ScalarFunction {
+
+ private static final Snowflake SNOWFLAKE = new Snowflake(1, 1);
+
+ public @DataTypeHint("BIGINT") Long eval() {
+ return SNOWFLAKE.nextId();
+ }
+}
diff --git a/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/SortAddress.java b/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/SortAddress.java
new file mode 100644
index 0000000..ab422b7
--- /dev/null
+++ b/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/SortAddress.java
@@ -0,0 +1,31 @@
+package com.geedgenetworks.flink.easy.application.voip.udf;
+
+import com.google.common.collect.Lists;
+import com.zdjizhi.utils.IPUtil;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.functions.ScalarFunction;
+
+public class SortAddress extends ScalarFunction {
+
+ public @DataTypeHint("STRING")
+ String eval(
+ String ip1, Integer port1, String ip2, Integer port2) {
+ return of(Tuple2.of(ip1, port1), Tuple2.of(ip2, port2));
+ }
+
+ public static String of(
+ Tuple2<String, Integer> a1, Tuple2<String, Integer> a2) {
+ var list = Lists.newArrayList(a1, a2);
+ list.sort((a, b) -> {
+ if (a.f1.equals(b.f1)) {
+ return Long.compare(
+ IPUtil.getIpDesimal(a.f0), IPUtil.getIpDesimal(b.f0));
+ } else {
+ return a.f1.compareTo(b.f1);
+ }
+ });
+ return String.format("%s:%s,%s:%s",
+ list.get(0).f0, list.get(0).f1, list.get(1).f0, list.get(1).f1);
+ }
+} \ No newline at end of file
diff --git a/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/StreamDir.java b/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/StreamDir.java
new file mode 100644
index 0000000..3f5166f
--- /dev/null
+++ b/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/StreamDir.java
@@ -0,0 +1,26 @@
+package com.geedgenetworks.flink.easy.application.voip.udf;
+
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.functions.ScalarFunction;
+
+public class StreamDir extends ScalarFunction {
+
+ public @DataTypeHint("INT") Integer eval(Long flags) {
+ int v = 0;
+ if (flags == null) {
+ return v;
+ }
+ if ((flags & 8192) == 8192) {
+ v += 1;
+ }
+ if ((flags & 16384) == 16384) {
+ v += 2;
+ }
+ return v;
+ }
+
+ public static void main(String[] args) {
+ System.out.println(8192L + 16384L);
+ System.out.println(new StreamDir().eval(8192L + 16384L));
+ }
+} \ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/flink/voip/CorrelateApp.java b/src/main/java/com/zdjizhi/flink/voip/CorrelateApp.java
deleted file mode 100644
index 304e04d..0000000
--- a/src/main/java/com/zdjizhi/flink/voip/CorrelateApp.java
+++ /dev/null
@@ -1,108 +0,0 @@
-package com.zdjizhi.flink.voip;
-
-import com.zdjizhi.flink.voip.conf.FusionConfiguration;
-import com.zdjizhi.flink.voip.error.ErrorHandler;
-import com.zdjizhi.flink.voip.formats.JsonNodeSerializationSchema;
-import com.zdjizhi.flink.voip.functions.*;
-import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.formats.json.JsonNodeDeserializationSchema;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
-
-import java.time.Duration;
-
-import static com.zdjizhi.flink.voip.conf.FusionConfigs.*;
-
-/**
- * The main class for running the SIP RTP Correlation application.
- *
- * @author chaoc
- * @since 1.0
- */
-public class CorrelateApp {
-
- public static void main(String[] args) throws Exception {
-
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- // param check
- if (args.length < 1) {
- throw new IllegalArgumentException("Error: Not found properties path. " +
- "\nUsage: flink -c xxx xxx.jar app.properties.");
- }
-
- final ParameterTool tool = ParameterTool.fromPropertiesFile(args[0]);
-
- final Configuration config = tool.getConfiguration();
- env.getConfig().setGlobalJobParameters(config);
-
- final FusionConfiguration fusionConfiguration = new FusionConfiguration(config);
-
- final FlinkKafkaConsumer<ObjectNode> kafkaConsumer = new FlinkKafkaConsumer<>(
- config.get(SOURCE_KAFKA_TOPIC),
- new JsonNodeDeserializationSchema(),
- fusionConfiguration
- .getProperties(SOURCE_KAFKA_PROPERTIES_PREFIX));
-
- final DataStream<ObjectNode> sourceStream = env.addSource(kafkaConsumer)
- .assignTimestampsAndWatermarks(
- WatermarkStrategy
- .<ObjectNode>forBoundedOutOfOrderness(Duration.ofSeconds(5))
- .withTimestampAssigner((SerializableTimestampAssigner<ObjectNode>)
- (element, recordTimestamp) ->
- element.get("start_timestamp_ms").asLong()));
-
- final ErrorHandler errorHandler = new ErrorHandler(config);
-
- // Process the data using the TypeSplitFunction and split it into separate DataStreams for SIP and RTP data
- final SingleOutputStreamOperator<ObjectNode> splitsRecordsOperator =
- errorHandler.filterError(sourceStream)
- .process(new TypeSplitFunction())
- .name("SplitsRecordsBasedSchemaType")
- .uid("splits-records-based-schema-type");
-
- // Get the DataStreams for SIP and RTP data from the side outputs.
- final DataStream<ObjectNode> sipDataStream = splitsRecordsOperator
- .getSideOutput(TypeSplitFunction.SIP_OUTPUT_TAG);
-
- final DataStream<ObjectNode> rtpDataStream = splitsRecordsOperator
- .getSideOutput(TypeSplitFunction.RTP_OUTPUT_TAG);
-
- // Process SIP data to create a double directional stream using SIPPairingFunction.
- final SingleOutputStreamOperator<ObjectNode> sipDoubleDirOperator = sipDataStream
- .keyBy(new SIPKeySelector())
- .process(new SIPPairingFunction())
- .name("PairingOneWayToDoubleStream")
- .uid("pairing-one-way-to-double");
-
- final KeySelector<ObjectNode, Tuple2<Integer, Address>> vSysSelector = new VSysIDKeySelector();
-
- // Fusion SIP data and RTP data to VoIP data.
- final SingleOutputStreamOperator<ObjectNode> voIpOperator = rtpDataStream
- .keyBy(vSysSelector)
- .connect(sipDoubleDirOperator.keyBy(vSysSelector))
- .process(new VoIPFusionFunction())
- .name("VoIPFusion")
- .uid("voip-fusion");
-
- final FlinkKafkaProducer<ObjectNode> producer = new FlinkKafkaProducer<>(
- config.get(SINK_KAFKA_TOPIC),
- new JsonNodeSerializationSchema(),
- fusionConfiguration.getProperties(SINK_KAFKA_PROPERTIES_PREFIX));
-
- voIpOperator
- .union(sipDoubleDirOperator.getSideOutput(SIPPairingFunction.SIP_OUTPUT_TAG))
- .addSink(producer);
-
- env.execute(config.get(JOB_NAME));
- }
-}
diff --git a/src/main/java/com/zdjizhi/flink/voip/conf/FusionConfigs.java b/src/main/java/com/zdjizhi/flink/voip/conf/FusionConfigs.java
deleted file mode 100644
index 926c5a0..0000000
--- a/src/main/java/com/zdjizhi/flink/voip/conf/FusionConfigs.java
+++ /dev/null
@@ -1,107 +0,0 @@
-package com.zdjizhi.flink.voip.conf;
-
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ConfigOptions;
-
-/**
- * Containing configuration options for the Fusion application.
- *
- * @author chaoc
- * @since 1.0
- */
-public class FusionConfigs {
-
- /**
- * The prefix for Kafka properties used in the source.
- */
- public static final String SOURCE_KAFKA_PROPERTIES_PREFIX = "source.kafka.props.";
-
- /**
- * The prefix for Kafka properties used in the sink.
- */
- public static final String SINK_KAFKA_PROPERTIES_PREFIX = "sink.kafka.props.";
-
- /**
- * Configuration prefix for the properties of the Kafka sink where the error data will be output.
- */
- public static final String ERROR_SINK_KAFKA_PROPERTIES_PREFIX = "error.sink.kafka.props.";
-
- /**
- * Configuration option for the Kafka topic used in the source.
- */
- public static final ConfigOption<String> SOURCE_KAFKA_TOPIC =
- ConfigOptions.key("source.kafka.topic")
- .stringType()
- .noDefaultValue()
- .withDescription("The Kafka topic used in the source.");
-
- /**
- * Configuration option for the Kafka topic used in the sink.
- */
- public static final ConfigOption<String> SINK_KAFKA_TOPIC =
- ConfigOptions.key("sink.kafka.topic")
- .stringType()
- .noDefaultValue()
- .withDescription("The Kafka topic used in the sink.");
-
- /**
- * Configuration option to enable or disable the output of error records.
- * If set to true, the error records will be sent to the specified Kafka topic.
- * Default value is false.
- */
- public static final ConfigOption<Boolean> ERROR_RECORDS_OUTPUT_ENABLE =
- ConfigOptions.key("error.records.output.enable")
- .booleanType()
- .defaultValue(false)
- .withDescription("Enable or disable the output of error records. " +
- "If set to true, the error records will be sent to the specified Kafka topic.");
-
- /**
- * Configuration option to determine whether to perform data correlate for intranet addresses.
- */
- public static final ConfigOption<Boolean> INCLUDE_INTRANET_IP =
- ConfigOptions.key("include.intranet.ip")
- .booleanType()
- .defaultValue(true)
- .withDescription("Whether to perform data correlate for intranet addresses");
-
-
- /**
- * Configuration option for specifying the Kafka topic name where the error data will be sent.
- * This configuration option is used when the output of error records is enabled.
- */
- public static final ConfigOption<String> ERROR_SINK_KAFKA_TOPIC =
- ConfigOptions.key("error.sink.kafka.topic")
- .stringType()
- .noDefaultValue()
- .withDescription("The Kafka topic name where the error records will be sent.");
-
- /**
- * The configuration option for the interval at which SIP state data
- * should be cleared.
- */
- public static final ConfigOption<Integer> SIP_STATE_CLEAR_INTERVAL =
- ConfigOptions.key("sip.state.clear.interval.minutes")
- .intType()
- .defaultValue(1)
- .withDescription("The interval at which SIP state data should be cleared.");
-
- /**
- * The configuration option for the interval at which STP state data
- * should be cleared.
- */
- public static final ConfigOption<Integer> RTP_STATE_CLEAR_INTERVAL =
- ConfigOptions.key("rtp.state.clear.interval.minutes")
- .intType()
- .defaultValue(6)
- .withDescription("The interval at which RTP state data should be cleared.");
-
- /**
- * Configuration option for specifying the name of a job.
- */
- public static final ConfigOption<String> JOB_NAME =
- ConfigOptions.key("job.name")
- .stringType()
- .defaultValue("correlation_sip_rtp_session")
- .withDescription("The name of current job.");
-} \ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/flink/voip/conf/FusionConfiguration.java b/src/main/java/com/zdjizhi/flink/voip/conf/FusionConfiguration.java
deleted file mode 100644
index ad983fb..0000000
--- a/src/main/java/com/zdjizhi/flink/voip/conf/FusionConfiguration.java
+++ /dev/null
@@ -1,44 +0,0 @@
-package com.zdjizhi.flink.voip.conf;
-
-import org.apache.flink.configuration.Configuration;
-
-import java.util.Properties;
-
-/**
- * A wrapper class that extends the Flink `Configuration` to provide utility methods for handling
- * properties with a specific prefix. This class allows retrieving properties that start with the
- * given `prefix` and converts them into a `java.util.Properties` object.
- *
- * @author chaoc
- * @since 1.0
- */
-
-public class FusionConfiguration {
- private final Configuration config;
-
- public FusionConfiguration(final Configuration config) {
- this.config = config;
- }
-
- /**
- * Retrieves properties from the underlying `Configuration` instance that start with the specified
- * `prefix`. The properties are then converted into a `java.util.Properties` object and returned.
- *
- * @param prefix The prefix to filter properties.
- * @return A `java.util.Properties` object containing the properties with the specified prefix.
- */
- public Properties getProperties(final String prefix) {
- if (prefix == null) {
- final Properties props = new Properties();
- props.putAll(config.toMap());
- return props;
- }
- return config.toMap()
- .entrySet()
- .stream()
- .filter(entry -> entry.getKey().startsWith(prefix))
- .collect(Properties::new, (props, e) ->
- props.setProperty(e.getKey().substring(prefix.length()), e.getValue()),
- Properties::putAll);
- }
-}
diff --git a/src/main/java/com/zdjizhi/flink/voip/error/ErrorHandler.java b/src/main/java/com/zdjizhi/flink/voip/error/ErrorHandler.java
deleted file mode 100644
index be41377..0000000
--- a/src/main/java/com/zdjizhi/flink/voip/error/ErrorHandler.java
+++ /dev/null
@@ -1,177 +0,0 @@
-package com.zdjizhi.flink.voip.error;
-
-import com.zdjizhi.flink.voip.conf.FusionConfigs;
-import com.zdjizhi.flink.voip.conf.FusionConfiguration;
-import com.zdjizhi.flink.voip.formats.JsonNodeSerializationSchema;
-import com.zdjizhi.flink.voip.functions.FunctionHelper;
-import com.zdjizhi.flink.voip.records.Record;
-import com.zdjizhi.flink.voip.records.SIPRecord;
-import com.zdjizhi.flink.voip.records.SchemaType;
-import com.zdjizhi.flink.voip.records.StreamDir;
-import com.zdjizhi.utils.IPUtil;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.functions.ProcessFunction;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.OutputTag;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.function.Function;
-
-/**
- * The ErrorHandler class is responsible for handling and filtering error records from the input data stream.
- * It checks for records that have invalid or meaningless addresses and ports, and outputs them to a separate stream if enabled.
- *
- * @author chaoc
- * @since 1.0
- */
-public class ErrorHandler {
-
- private static final Logger LOG = LoggerFactory.getLogger(ErrorHandler.class);
- /**
- * The OutputTag for invalid records.
- */
- public static final OutputTag<ObjectNode> INVALID_OUTPUT_TAG =
- new OutputTag<>("invalid-records", TypeInformation.of(ObjectNode.class));
-
- private final Configuration config;
-
- /**
- * Creates a new ErrorHandler instance with the given configuration.
- *
- * @param config The configuration containing settings for error handling.
- */
- public ErrorHandler(final Configuration config) {
- this.config = config;
- }
-
- /**
- * Filters out error records from the input data stream and outputs them to a separate stream if enabled.
- *
- * @param dataStream The input data stream of ObjectNode records.
- * @return A new DataStream containing valid records without errors.
- */
- public DataStream<ObjectNode> filterError(final DataStream<ObjectNode> dataStream) {
- // Process the data stream to identify meaningless addresses and ports
- final SingleOutputStreamOperator<ObjectNode> operator = dataStream
- .process(new MeaninglessAddressProcessFunction())
- .name("MeaninglessRecords")
- .uid("meaningless-records");
-
- // If enabled, output the error records to the specified Kafka topic
- if (config.get(FusionConfigs.ERROR_RECORDS_OUTPUT_ENABLE)) {
-
- final String topic = config.get(FusionConfigs.ERROR_SINK_KAFKA_TOPIC);
-
- LOG.info("Meaningless data output is enabled, data will be sent to: Topic [{}]", topic);
-
- final DataStream<ObjectNode> errorStream = operator
- .getSideOutput(INVALID_OUTPUT_TAG);
-
- final FlinkKafkaProducer<ObjectNode> producer = new FlinkKafkaProducer<>(
- topic,
- new JsonNodeSerializationSchema(),
- new FusionConfiguration(config)
- .getProperties(FusionConfigs.ERROR_SINK_KAFKA_PROPERTIES_PREFIX)
- );
-
- errorStream.addSink(producer);
- }
- return operator;
- }
-}
-
-/**
- * The MeaninglessAddressProcessFunction is a custom ProcessFunction used to check for records
- * with invalid or meaningless addresses and ports. It separates them into the invalid output tag if necessary.
- */
-class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, ObjectNode> implements FunctionHelper {
-
- private static final Logger LOG = LoggerFactory.getLogger(MeaninglessAddressProcessFunction.class);
-
- private transient boolean includeIntranetIp;
-
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- final Configuration config = getGlobalConfiguration();
- includeIntranetIp = config.get(FusionConfigs.INCLUDE_INTRANET_IP);
- }
-
- @Override
- public void processElement(ObjectNode obj,
- ProcessFunction<ObjectNode, ObjectNode>.Context ctx,
- Collector<ObjectNode> out) throws Exception {
- final Record record = new Record(obj);
- // Check for invalid or meaningless addresses and ports
- boolean cond1 = isIPAddress(record.getClientIp()) &&
- isIPAddress(record.getServerIp()) &&
- record.getClientPort() > 0 &&
- record.getServerPort() > 0;
-
- boolean cond8 = null != executeSafely(Record::getStreamDir, record);
-
- final SIPRecord sipRecord = new SIPRecord(obj);
- boolean cond2 = isIPAddress(sipRecord.getOriginatorSdpConnectIp())
- || isIPAddress(sipRecord.getResponderSdpConnectIp());
- boolean cond3 = !isIPAddress(sipRecord.getResponderSdpConnectIp())
- || (includeIntranetIp || !isInternalIp(sipRecord.getResponderSdpConnectIp()));
- boolean cond4 = !isIPAddress(sipRecord.getOriginatorSdpConnectIp())
- || (includeIntranetIp || !isInternalIp(sipRecord.getOriginatorSdpConnectIp()));
- boolean cond5 = SchemaType.SIP.equals(sipRecord.getSchemaType());
- boolean cond6 = StreamDir.DOUBLE == executeSafely(Record::getStreamDir, sipRecord) &&
- (includeIntranetIp || !isInternalIp(sipRecord.getResponderSdpConnectIp())) &&
- (includeIntranetIp || !isInternalIp(sipRecord.getOriginatorSdpConnectIp()));
-
- boolean cond7 = isIPAddress(sipRecord.getOriginatorSdpConnectIp()) &&
- isIPAddress(sipRecord.getResponderSdpConnectIp()) &&
- sipRecord.getResponderSdpMediaPort() > 0 && sipRecord.getOriginatorSdpMediaPort() > 0;
-
- // Both client and server addresses in the data are valid.
- if (cond1 && cond8 && (!cond5 || cond7) && (
- // The address in the SIP one-way stream is valid and not an internal network address.
- cond2 && cond3 && cond4 && cond5
- // The coordinating addresses in the SIP double directional stream are valid
- // and not internal network addresses.
- || cond5 && cond6
- || !cond5)) {
- out.collect(obj);
- } else {
- // Output invalid records to the invalid output tag
- ctx.output(ErrorHandler.INVALID_OUTPUT_TAG, obj);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Meaningless record: {}", obj);
- }
- }
- }
-
- // ======================================================================================
- // ----------------------------------- private helper -----------------------------------
-
- public static <T, R> R executeSafely(Function<T, R> function, T v) {
- try {
- return function.apply(v);
- } catch (Exception e) {
- return null;
- }
- }
-
- private static boolean isIPAddress(final String ipaddr) {
- if (null == ipaddr) {
- return false;
- }
- return IPUtil.isIPAddress(ipaddr);
- }
-
- private static boolean isInternalIp(final String ipaddr) {
- if (!isIPAddress(ipaddr)) {
- return false;
- }
- return IPUtil.internalIp(ipaddr);
- }
-} \ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/flink/voip/formats/JsonNodeSerializationSchema.java b/src/main/java/com/zdjizhi/flink/voip/formats/JsonNodeSerializationSchema.java
deleted file mode 100644
index d193113..0000000
--- a/src/main/java/com/zdjizhi/flink/voip/formats/JsonNodeSerializationSchema.java
+++ /dev/null
@@ -1,20 +0,0 @@
-package com.zdjizhi.flink.voip.formats;
-
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
-
-public class JsonNodeSerializationSchema implements SerializationSchema<ObjectNode> {
-
- private final ObjectMapper mapper = new ObjectMapper();
-
- @Override
- public byte[] serialize(ObjectNode jsonNodes) {
- try {
- return mapper.writeValueAsBytes(jsonNodes);
- } catch (JsonProcessingException e) {
- throw new RuntimeException(e);
- }
- }
-}
diff --git a/src/main/java/com/zdjizhi/flink/voip/functions/Address.java b/src/main/java/com/zdjizhi/flink/voip/functions/Address.java
deleted file mode 100644
index 3642f58..0000000
--- a/src/main/java/com/zdjizhi/flink/voip/functions/Address.java
+++ /dev/null
@@ -1,54 +0,0 @@
-package com.zdjizhi.flink.voip.functions;
-
-import com.google.common.collect.Lists;
-import com.zdjizhi.utils.IPUtil;
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import org.apache.flink.api.java.tuple.Tuple2;
-
-import java.util.List;
-
-
-/**
- * A pojo class representing an address with two IP and port pairs.
- *
- * @author chaoc
- * @since 1.0
- */
-@Data
-@AllArgsConstructor
-public class Address {
-
- // The first IP address.
- private final String ip1;
-
- //The first port number.
- private final int port1;
-
- //The second IP address.
- private final String ip2;
-
- //The second port number.
- private final int port2;
-
- /**
- * Creates an Address instance based on two tuples containing (String, Int) representing address information.
- * The method sorts the addresses based on the port number, and if the ports are equal, it sorts them based on
- * the numeric value of the IP address.
- *
- * @param a1 The first address information as a tuple (IP address, port).
- * @param a2 The second address information as a tuple (IP address, port).
- * @return An Address instance with addresses sorted and reordered.
- */
- public static Address of(Tuple2<String, Integer> a1, Tuple2<String, Integer> a2) {
- List<Tuple2<String, Integer>> list = Lists.newArrayList(a1, a2);
- list.sort((a, b) -> {
- if (a.f1.equals(b.f1)) {
- return Long.compare(IPUtil.getIpDesimal(a.f0), IPUtil.getIpDesimal(b.f0));
- } else {
- return a.f1.compareTo(b.f1);
- }
- });
- return new Address(list.get(0).f0, list.get(0).f1, list.get(1).f0, list.get(1).f1);
- }
-}
diff --git a/src/main/java/com/zdjizhi/flink/voip/functions/FunctionHelper.java b/src/main/java/com/zdjizhi/flink/voip/functions/FunctionHelper.java
deleted file mode 100644
index 216e283..0000000
--- a/src/main/java/com/zdjizhi/flink/voip/functions/FunctionHelper.java
+++ /dev/null
@@ -1,32 +0,0 @@
-package com.zdjizhi.flink.voip.functions;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.RichFunction;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.TimerService;
-
-/**
- * An interface that provides utility functions for Flink functions.
- *
- * @author chaoc
- * @since 1.0
- */
-public interface FunctionHelper extends RichFunction {
-
- /**
- * Get the global configuration for the current Flink job.
- *
- * @return The global configuration as a Configuration object.
- */
-
- default Configuration getGlobalConfiguration() {
- final ExecutionConfig.GlobalJobParameters globalJobParameters = getRuntimeContext()
- .getExecutionConfig().getGlobalJobParameters();
- return (Configuration) globalJobParameters;
- }
-
- default void registerNextFireTimestamp(TimerService timerService, long interval) {
- long current = timerService.currentWatermark();
- timerService.registerEventTimeTimer(current + interval);
- }
-}
diff --git a/src/main/java/com/zdjizhi/flink/voip/functions/ObjectNodeInfo.java b/src/main/java/com/zdjizhi/flink/voip/functions/ObjectNodeInfo.java
deleted file mode 100644
index d55110b..0000000
--- a/src/main/java/com/zdjizhi/flink/voip/functions/ObjectNodeInfo.java
+++ /dev/null
@@ -1,28 +0,0 @@
-package com.zdjizhi.flink.voip.functions;
-
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.Setter;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
-
-/**
- * A case class representing an ObjectNode with an expiration time and a pair times.
- *
- * @author chaoc
- * @since 1.0
- */
-@Data
-@AllArgsConstructor
-class ObjectNodeInfo {
-
- // The ObjectNode containing data.
- private ObjectNode obj;
-
- // The pair times for the object.
- @Setter
- private int times;
-
- public void incTimes() {
- this.times = this.times + 1;
- }
-}
diff --git a/src/main/java/com/zdjizhi/flink/voip/functions/SIPKeySelector.java b/src/main/java/com/zdjizhi/flink/voip/functions/SIPKeySelector.java
deleted file mode 100644
index 678f9c1..0000000
--- a/src/main/java/com/zdjizhi/flink/voip/functions/SIPKeySelector.java
+++ /dev/null
@@ -1,31 +0,0 @@
-package com.zdjizhi.flink.voip.functions;
-
-import com.zdjizhi.flink.voip.records.SIPRecord;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
-
-/**
- * A KeySelector implementation for extracting a composite key from an ObjectNode representing a SIP record.
- *
- * @author chaoc
- * @since 1.0
- */
-public class SIPKeySelector implements KeySelector<ObjectNode, Tuple3<Integer, String, Address>> {
-
- /**
- * Extracts the composite key (VSysID, CallID, Address) from the given ObjectNode.
- *
- * @param obj The ObjectNode representing a SIP record.
- * @return A Tuple3 containing the extracted key (VSysID, CallID, Address).
- * @throws Exception Thrown if an error occurs during key extraction.
- */
- @Override
- public Tuple3<Integer, String, Address> getKey(ObjectNode obj) throws Exception {
- final SIPRecord record = new SIPRecord(obj);
- final Address address = Address.of(Tuple2.of(record.getClientIp(), record.getClientPort()),
- Tuple2.of(record.getServerIp(), record.getServerPort()));
- return Tuple3.of(record.getVSysID(), record.getCallID(), address);
- }
-} \ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/flink/voip/functions/SIPPairingFunction.java b/src/main/java/com/zdjizhi/flink/voip/functions/SIPPairingFunction.java
deleted file mode 100644
index 274da5d..0000000
--- a/src/main/java/com/zdjizhi/flink/voip/functions/SIPPairingFunction.java
+++ /dev/null
@@ -1,91 +0,0 @@
-package com.zdjizhi.flink.voip.functions;
-
-import com.zdjizhi.flink.voip.conf.FusionConfigs;
-import com.zdjizhi.flink.voip.records.Record;
-import com.zdjizhi.flink.voip.records.StreamDir;
-import org.apache.flink.api.common.state.StateTtlConfig;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.OutputTag;
-
-/**
- * A KeyedProcessFunction that pairs SIP records based on their addresses and stream direction.
- * SIP records are paired when they have the same addresses but opposite stream directions.
- *
- * @author chaoc
- * @since 1.0
- */
-public class SIPPairingFunction extends KeyedProcessFunction<Tuple3<Integer, String, Address>, ObjectNode, ObjectNode>
- implements FunctionHelper {
-
- public static final OutputTag<ObjectNode> SIP_OUTPUT_TAG =
- new OutputTag<>("unmatched-sip", TypeInformation.of(ObjectNode.class));
-
- private transient Time fireInterval;
-
- private transient ValueState<ObjectNode> valueState;
-
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- int minutes = getGlobalConfiguration().get(FusionConfigs.SIP_STATE_CLEAR_INTERVAL);
- fireInterval = Time.minutes(minutes);
- final ValueStateDescriptor<ObjectNode> descriptor =
- new ValueStateDescriptor<>("sip-state", ObjectNode.class);
-
- final StateTtlConfig ttlConfig = StateTtlConfig
- .newBuilder(fireInterval)
- .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
- .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
- .useProcessingTime()
- .cleanupFullSnapshot()
- .build();
- descriptor.enableTimeToLive(ttlConfig);
- valueState = getRuntimeContext().getState(descriptor);
- }
-
- @Override
- public void processElement(ObjectNode value,
- KeyedProcessFunction<Tuple3<Integer, String, Address>, ObjectNode, ObjectNode>.Context ctx,
- Collector<ObjectNode> out) throws Exception {
-
- final Record record = new Record(value);
- // When SIP is a one-way stream.
- if (StreamDir.DOUBLE != record.getStreamDir()) {
- // If the address is already stored in the mapState and has the opposite stream direction,
- // merge the SIP records, change the stream direction to DOUBLE, and output the merged record.
- final ObjectNode obj = valueState.value();
- if (null != obj && new Record(obj).getStreamDir() != record.getStreamDir()) {
- record.merge(obj)
- .setInt(Record.F_COMMON_STREAM_DIR, StreamDir.DOUBLE.getValue());
- out.collect(value);
- valueState.clear();
- } else {
- // If the address is not yet in the valueState.
- valueState.update(value);
- }
- } else {
- // If SIP is a double stream, pairing isn't required, directly output the record.
- out.collect(value);
- }
- registerNextFireTimestamp(ctx.timerService(), fireInterval.toMilliseconds());
- }
-
- @Override
- public void onTimer(long timestamp,
- KeyedProcessFunction<Tuple3<Integer, String, Address>, ObjectNode, ObjectNode>.OnTimerContext ctx,
- Collector<ObjectNode> out) throws Exception {
- final ObjectNode value = valueState.value();
- if (value != null) {
- ctx.output(SIP_OUTPUT_TAG, value);
- }
- valueState.clear();
- }
-}
diff --git a/src/main/java/com/zdjizhi/flink/voip/functions/TypeSplitFunction.java b/src/main/java/com/zdjizhi/flink/voip/functions/TypeSplitFunction.java
deleted file mode 100644
index e8fb99e..0000000
--- a/src/main/java/com/zdjizhi/flink/voip/functions/TypeSplitFunction.java
+++ /dev/null
@@ -1,45 +0,0 @@
-package com.zdjizhi.flink.voip.functions;
-
-import com.zdjizhi.flink.voip.records.Record;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.flink.streaming.api.functions.ProcessFunction;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.OutputTag;
-
-/**
- * A ProcessFunction that splits ObjectNode records based on their 'schemaType' field.
- * It outputs SIP records to the 'sipSchemaTypeOutputTag' and RTP records to the 'rtpSchemaTypeOutputTag'.
- *
- * @author chaoc
- * @since 1.0
- */
-public class TypeSplitFunction extends ProcessFunction<ObjectNode, ObjectNode> {
-
- /**
- * OutputTag for SIP records.
- */
- public static final OutputTag<ObjectNode> SIP_OUTPUT_TAG =
- new OutputTag<>("schema-type-sip", TypeInformation.of(ObjectNode.class));
- /**
- * OutputTag for RTP records.
- */
- public static final OutputTag<ObjectNode> RTP_OUTPUT_TAG =
- new OutputTag<>("schema-type-rtp", TypeInformation.of(ObjectNode.class));
-
- @Override
- public void processElement(ObjectNode obj,
- ProcessFunction<ObjectNode, ObjectNode>.Context ctx,
- Collector<ObjectNode> out) throws Exception {
- final Record record = new Record(obj);
- switch (record.getSchemaType()) {
- case RTP:
- ctx.output(RTP_OUTPUT_TAG, obj);
- break;
- case SIP:
- ctx.output(SIP_OUTPUT_TAG, obj);
- break;
- default:
- }
- }
-}
diff --git a/src/main/java/com/zdjizhi/flink/voip/functions/VSysIDKeySelector.java b/src/main/java/com/zdjizhi/flink/voip/functions/VSysIDKeySelector.java
deleted file mode 100644
index ab806c7..0000000
--- a/src/main/java/com/zdjizhi/flink/voip/functions/VSysIDKeySelector.java
+++ /dev/null
@@ -1,40 +0,0 @@
-package com.zdjizhi.flink.voip.functions;
-
-import com.zdjizhi.flink.voip.records.Record;
-import com.zdjizhi.flink.voip.records.SIPRecord;
-import com.zdjizhi.flink.voip.records.SchemaType;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
-
-/**
- * A KeySelector implementation that extracts the key(VSysID) from an ObjectNode.
- *
- * @author chaoc
- * @since 1.0
- */
-public class VSysIDKeySelector implements KeySelector<ObjectNode, Tuple2<Integer, Address>> {
-
- /**
- * Extracts the composite key (VSysID, Address) from the given ObjectNode.
- *
- * @param obj The ObjectNode representing a SIP record.
- * @return A Tuple2 containing the extracted key (VSysID, Address).
- * @throws Exception Thrown if an error occurs during key extraction.
- */
- @Override
- public Tuple2<Integer, Address> getKey(ObjectNode obj) throws Exception {
- final Record record = new Record(obj);
- final Address address;
- if (record.getSchemaType() == SchemaType.SIP) {
- final SIPRecord sipRecord = new SIPRecord(obj);
- address = Address.of(
- Tuple2.of(sipRecord.getOriginatorSdpConnectIp(), sipRecord.getOriginatorSdpMediaPort()),
- Tuple2.of(sipRecord.getResponderSdpConnectIp(), sipRecord.getResponderSdpMediaPort()));
- } else {
- address = Address.of(Tuple2.of(record.getServerIp(), record.getServerPort()),
- Tuple2.of(record.getClientIp(), record.getClientPort()));
- }
- return Tuple2.of(record.getVSysID(), address);
- }
-}
diff --git a/src/main/java/com/zdjizhi/flink/voip/functions/VoIPFusionFunction.java b/src/main/java/com/zdjizhi/flink/voip/functions/VoIPFusionFunction.java
deleted file mode 100644
index c8e32b7..0000000
--- a/src/main/java/com/zdjizhi/flink/voip/functions/VoIPFusionFunction.java
+++ /dev/null
@@ -1,180 +0,0 @@
-package com.zdjizhi.flink.voip.functions;
-
-import com.zdjizhi.flink.voip.conf.FusionConfigs;
-import com.zdjizhi.flink.voip.records.*;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.state.*;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
-import org.apache.flink.util.Collector;
-
-import java.util.Iterator;
-import java.util.Map;
-
-/**
- * The `VoIPFusionFunction` is a key-based co-process function that performs VoIP fusion logic
- * for SIP and RTP records. It combines SIP and RTP records belonging to the same session
- * and emits fused VoIP records. The function utilizes keyed state to store and manage SIP and
- * RTP records, and it uses timers to trigger regular clearing of the state.
- *
- * @author chaoc
- * @since 1.0
- */
-public class VoIPFusionFunction extends KeyedCoProcessFunction<Tuple2<Integer, Address>, ObjectNode, ObjectNode, ObjectNode>
- implements FunctionHelper {
-
- private static final int MAX_RTP_LINES = 2;
- private transient Time fireInterval;
- private transient ValueState<ObjectNodeInfo> sipState;
- private transient MapState<StreamDir, ObjectNode> rtpState;
-
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- final int minutes = getGlobalConfiguration().get(FusionConfigs.RTP_STATE_CLEAR_INTERVAL);
- fireInterval = Time.minutes(minutes);
- final RuntimeContext context = getRuntimeContext();
- final ValueStateDescriptor<ObjectNodeInfo> sipDescriptor =
- new ValueStateDescriptor<>("sip-state", ObjectNodeInfo.class);
-
- final StateTtlConfig ttlConfig = StateTtlConfig
- .newBuilder(fireInterval)
- .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
- .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
- .useProcessingTime()
- .cleanupFullSnapshot()
- .build();
- sipDescriptor.enableTimeToLive(ttlConfig);
- sipState = context.getState(sipDescriptor);
-
- final MapStateDescriptor<StreamDir, ObjectNode> rtpDescriptor =
- new MapStateDescriptor<>("rtp-state", StreamDir.class, ObjectNode.class);
-
- final StateTtlConfig rtpTtlConfig = StateTtlConfig
- .newBuilder(Time.minutes(minutes + 1))
- .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
- .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
- .useProcessingTime()
- .cleanupFullSnapshot()
- .build();
- rtpDescriptor.enableTimeToLive(rtpTtlConfig);
- rtpState = context.getMapState(rtpDescriptor);
- }
-
- // SIP
- @Override
- public void processElement2(ObjectNode sipObj,
- KeyedCoProcessFunction<Tuple2<Integer, Address>, ObjectNode, ObjectNode, ObjectNode>.Context ctx,
- Collector<ObjectNode> out) throws Exception {
- final Iterator<Map.Entry<StreamDir, ObjectNode>> iterator = rtpState.iterator();
- if (rtpState.isEmpty()) {
- sipState.update(new ObjectNodeInfo(sipObj, 0));
- }
- while (iterator.hasNext()) {
- final Map.Entry<StreamDir, ObjectNode> entry = iterator.next();
- final ObjectNode rtpObj = entry.getValue();
- final Record rtpRecord = new Record(rtpObj);
-
- completeOriginatorField(rtpRecord, new SIPRecord(sipObj));
-
- rtpRecord.merge(sipObj)
- .setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.VOIP.getValue());
- out.collect(rtpObj);
- iterator.remove();
-
- switch (entry.getKey()) {
- case S2C:
- case C2S:
- ObjectNodeInfo info = sipState.value();
- if (info != null) {
- info.incTimes();
- if (info.getTimes() >= MAX_RTP_LINES) {
- sipState.clear();
- } else {
- sipState.update(new ObjectNodeInfo(sipObj, info.getTimes()));
- }
- } else {
- sipState.update(new ObjectNodeInfo(sipObj, 1));
- }
- break;
- default:
- // Double directional:
- // In the context of VoIP fusion, only one RTP double directional stream
- sipState.clear();
- }
- }
-
- registerNextFireTimestamp(ctx.timerService(), fireInterval.toMilliseconds());
- }
-
- // RTP
- @Override
- public void processElement1(ObjectNode rtpObj,
- KeyedCoProcessFunction<Tuple2<Integer, Address>, ObjectNode, ObjectNode, ObjectNode>.Context ctx,
- Collector<ObjectNode> out) throws Exception {
- final Record rtpRecord = new Record(rtpObj);
- final ObjectNodeInfo info = sipState.value();
-
- final StreamDir streamDir = rtpRecord.getStreamDir();
- if (null != info) {
-
- completeOriginatorField(rtpRecord, new SIPRecord(info.getObj()));
-
- rtpRecord.merge(info.getObj())
- .setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.VOIP.getValue());
- out.collect(rtpObj);
-
- switch (streamDir) {
- case C2S:
- case S2C:
- info.incTimes();
- if (info.getTimes() >= MAX_RTP_LINES) {
- sipState.clear();
- }
- break;
- default:
- // Double
- sipState.clear();
- }
-
- } else {
- rtpState.put(streamDir, rtpObj);
- }
-
- registerNextFireTimestamp(ctx.timerService(), fireInterval.toMilliseconds());
- }
-
- @Override
- public void onTimer(long timestamp,
- KeyedCoProcessFunction<Tuple2<Integer, Address>, ObjectNode, ObjectNode, ObjectNode>.OnTimerContext ctx,
- Collector<ObjectNode> out) throws Exception {
- for (ObjectNode obj : rtpState.values()) {
- final Record rtpRecord = new Record(obj);
- rtpRecord.setInt(RTPRecord.F_ORIGINATOR_DIR, RTPRecord.OriginatorDir.UNKNOWN.getCode());
- out.collect(obj);
- }
- rtpState.clear();
- sipState.clear();
- }
-
- // ======================================================================
- // PRIVATE HELPER
- // ======================================================================
-
- private void completeOriginatorField(final Record rtpRecord, final SIPRecord sipRecord) {
- if (StringUtils.isNoneBlank(rtpRecord.getClientIp())) {
- if (StringUtils.equals(sipRecord.getOriginatorSdpConnectIp(), rtpRecord.getClientIp())) {
- rtpRecord.setInt(RTPRecord.F_ORIGINATOR_DIR, RTPRecord.OriginatorDir.C2S.getCode());
- return;
- } else if (StringUtils.equals(sipRecord.getResponderSdpConnectIp(), rtpRecord.getClientIp())) {
- rtpRecord.setInt(RTPRecord.F_ORIGINATOR_DIR, RTPRecord.OriginatorDir.S2C.getCode());
- return;
- }
- }
- rtpRecord.setInt(RTPRecord.F_ORIGINATOR_DIR, RTPRecord.OriginatorDir.UNKNOWN.getCode());
- }
-} \ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/flink/voip/records/RTPRecord.java b/src/main/java/com/zdjizhi/flink/voip/records/RTPRecord.java
deleted file mode 100644
index c8df7db..0000000
--- a/src/main/java/com/zdjizhi/flink/voip/records/RTPRecord.java
+++ /dev/null
@@ -1,27 +0,0 @@
-package com.zdjizhi.flink.voip.records;
-
-import lombok.Getter;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
-
-public class RTPRecord extends Record {
-
- public static final String F_ORIGINATOR_DIR = "rtp_originator_dir";
-
- public RTPRecord(ObjectNode obj) {
- super(obj);
- }
-
- @Getter
- public enum OriginatorDir {
-
- UNKNOWN(0),
- C2S(1),
- S2C(2);
-
- private final int code;
-
- OriginatorDir(int code) {
- this.code = code;
- }
- }
-}
diff --git a/src/main/java/com/zdjizhi/flink/voip/records/Record.java b/src/main/java/com/zdjizhi/flink/voip/records/Record.java
deleted file mode 100644
index 46052f3..0000000
--- a/src/main/java/com/zdjizhi/flink/voip/records/Record.java
+++ /dev/null
@@ -1,224 +0,0 @@
-package com.zdjizhi.flink.voip.records;
-
-import lombok.AllArgsConstructor;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.IntNode;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.TextNode;
-
-/**
- * Record class represents a data record with various fields.
- * <p>
- * It provides getter and setter methods for accessing the fields of the data record.
- *
- * @author chaoc
- * @since 1.0
- */
-@AllArgsConstructor
-public class Record {
-
- /**
- * 字段名:数据记录中的所属 vsys
- */
- public static final String F_COMMON_VSYS_ID = "vsys_id";
- /**
- * 字段名:数据记录中的字段类型
- */
- public static final String F_COMMON_SCHEMA_TYPE = "decoded_as";
- /**
- * 字段名:数据记录中的流类型
- */
- public static final String F_COMMON_STREAM_DIR = "common_stream_dir";
- /**
- * 字段名:数据记录中的流类型的 Flags
- */
- public static final String F_FLAGS = "flags";
- /**
- * 字段名:数据记录中的服务端地址
- */
- public static final String F_COMMON_SERVER_IP = "server_ip";
- /**
- * 字段名:数据记录中的服务端端口
- */
- public static final String F_COMMON_SERVER_PORT = "server_port";
- /**
- * 字段名:数据记录中的客户端地址
- */
- public static final String F_COMMON_CLIENT_IP = "client_ip";
- /**
- * 字段名:数据记录中的客户端端口
- */
- public static final String F_COMMON_CLIENT_PORT = "client_port";
-
- /**
- * ObjectNode data.
- */
- protected final ObjectNode obj;
-
- /**
- * Get the VSys ID from the data record.
- *
- * @return The VSys ID as an integer.
- */
- public int getVSysID() {
- int v = Record.getInt(obj, F_COMMON_VSYS_ID);
- return v == 0 ? 1 : v;
- }
-
- /**
- * Get the schema type from the data record.
- *
- * @return The schema type.
- */
- public final SchemaType getSchemaType() {
- return SchemaType.of(Record.getString(obj, F_COMMON_SCHEMA_TYPE));
- }
-
- /**
- * Get the stream direction from the data record.
- *
- * @return The stream direction.
- */
- public final StreamDir getStreamDir() {
- return StreamDir.ofFlags(Record.getLong(obj, F_FLAGS));
- }
-
- /**
- * Get the server IP address from the data record.
- *
- * @return The server IP address as a string.
- */
- public final String getServerIp() {
- return Record.getString(obj, F_COMMON_SERVER_IP);
- }
-
- /**
- * Get the server port from the data record.
- *
- * @return The server port as an integer.
- */
- public final int getServerPort() {
- return Record.getInt(obj, F_COMMON_SERVER_PORT);
- }
-
- /**
- * Get the client IP address from the data record.
- *
- * @return The client IP address as a string.
- */
- public final String getClientIp() {
- return Record.getString(obj, F_COMMON_CLIENT_IP);
- }
-
- /**
- * Get the client port from the data record.
- *
- * @return The client port as an integer.
- */
- public final int getClientPort() {
- return Record.getInt(obj, F_COMMON_CLIENT_PORT);
- }
-
- /**
- * Set an integer value to the specified field in the data record.
- *
- * @param name The name of the field.
- * @param value The integer value to set.
- */
- public final void setInt(final String name, final int value) {
- obj.set(name, IntNode.valueOf(value));
- }
-
- /**
- * Set a string value to the specified field in the data record.
- *
- * @param name The name of the field.
- * @param value The string value to set.
- */
- public final void setString(final String name, final String value) {
- obj.set(name, TextNode.valueOf(value));
- }
-
- /**
- * Merge the fields of another ObjectNode into the current data record.
- *
- * @param other The ObjectNode containing the fields to be merged.
- * @return This record.
- */
- public final Record merge(final ObjectNode other) {
- other.fields().forEachRemaining(entry -> obj.set(entry.getKey(), entry.getValue()));
- return this;
- }
-
- /**
- * Get an integer value from the specified field in the ObjectNode.
- *
- * @param obj The ObjectNode to get the value from.
- * @param field The name of the field.
- * @param defaultValue The default value to return if the field is not found or is not an integer.
- * @return The integer value from the field or the default value if the field is not found or is not an integer.
- */
- public static int getInt(final ObjectNode obj, final String field, final int defaultValue) {
- final JsonNode node = obj.get(field);
- return node != null && node.isInt() ? node.asInt(defaultValue) : defaultValue;
- }
-
- /**
- * Get an integer value from the specified field in the ObjectNode.
- *
- * @param obj The ObjectNode to get the value from.
- * @param field The name of the field.
- * @return The integer value from the field or 0 if the field is not found or is not an integer.
- */
- public static int getInt(final ObjectNode obj, final String field) {
- return getInt(obj, field, 0);
- }
-
- /**
- * Gets a long value from the specified field in the ObjectNode.
- *
- * @param obj The ObjectNode to get the value from.
- * @param field The name of the field.
- * @param defaultValue The default value to return if the field is not found or is not a long.
- * @return The long value from the field or the default value if the field is not found or is not a long.
- */
- public static long getLong(final ObjectNode obj, final String field, final long defaultValue) {
- final JsonNode node = obj.get(field);
- return node != null && node.isNumber() ? node.asLong() : defaultValue;
- }
-
- /**
- * Gets a long value from the specified field in the ObjectNode.
- *
- * @param obj The ObjectNode to get the value from.
- * @param field The name of the field.
- * @return The long value from the field or 0 if the field is not found or is not a long.
- */
- private static long getLong(final ObjectNode obj, final String field) {
- return getLong(obj, field, 0L);
- }
-
- /**
- * Get a string value from the specified field in the ObjectNode.
- *
- * @param obj The ObjectNode to get the value from.
- * @param field The name of the field.
- * @param defaultValue The default value to return if the field is not found or is not a string.
- * @return The string value from the field or the default value if the field is not found or is not a string.
- */
- public static String getString(final ObjectNode obj, final String field, final String defaultValue) {
- final JsonNode node = obj.get(field);
- return node != null && node.isTextual() ? node.asText(defaultValue) : defaultValue;
- }
-
- /**
- * Get a string value from the specified field in the ObjectNode.
- *
- * @param obj The ObjectNode to get the value from.
- * @param field The name of the field.
- * @return The string value from the field or null if the field is not found or is not a string.
- */
- public static String getString(final ObjectNode obj, final String field) {
- return getString(obj, field, null);
- }
-} \ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/flink/voip/records/SIPRecord.java b/src/main/java/com/zdjizhi/flink/voip/records/SIPRecord.java
deleted file mode 100644
index 1ece4a5..0000000
--- a/src/main/java/com/zdjizhi/flink/voip/records/SIPRecord.java
+++ /dev/null
@@ -1,57 +0,0 @@
-package com.zdjizhi.flink.voip.records;
-
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
-
-/**
- * SIP(Session Initiation Protocol)data record class, used to parse and access SIP data records.
- *
- * @author chaoc
- * @since 1.0
- */
-public class SIPRecord extends Record {
-
- /**
- * Field Name: SIP 通话的会话 ID
- */
- public static final String F_CALL_ID = "sip_call_id";
- /**
- * Field Name: SIP 通话的协调的主叫语音传输 IP
- */
- public static final String F_ORIGINATOR_SDP_CONNECT_IP = "sip_originator_sdp_connect_ip";
- /**
- * Field Name: SIP 通话的协调的主叫语音传输端口
- */
- public static final String F_ORIGINATOR_SDP_MEDIA_PORT = "sip_originator_sdp_media_port";
- /**
- * Field Name: SIP 通话的协调的被叫语音传输 IP
- */
- public static final String F_RESPONDER_SDP_CONNECT_IP = "sip_responder_sdp_connect_ip";
- /**
- * Field Name: SIP 通话的协调的被叫语音传输端口
- */
- public static final String F_RESPONDER_SDP_MEDIA_PORT = "sip_responder_sdp_media_port";
-
- public SIPRecord(final ObjectNode obj) {
- super(obj);
- }
-
- public String getCallID() {
- return Record.getString(obj, F_CALL_ID);
- }
-
- public String getOriginatorSdpConnectIp() {
- return Record.getString(obj, F_ORIGINATOR_SDP_CONNECT_IP);
- }
-
- public int getOriginatorSdpMediaPort() {
- return Record.getInt(obj, F_ORIGINATOR_SDP_MEDIA_PORT);
- }
-
- public String getResponderSdpConnectIp() {
- return Record.getString(obj, F_RESPONDER_SDP_CONNECT_IP);
- }
-
- public int getResponderSdpMediaPort() {
- return Record.getInt(obj, F_RESPONDER_SDP_MEDIA_PORT);
- }
-}
diff --git a/src/main/java/com/zdjizhi/flink/voip/records/SchemaType.java b/src/main/java/com/zdjizhi/flink/voip/records/SchemaType.java
deleted file mode 100644
index 40c4a91..0000000
--- a/src/main/java/com/zdjizhi/flink/voip/records/SchemaType.java
+++ /dev/null
@@ -1,51 +0,0 @@
-package com.zdjizhi.flink.voip.records;
-
-import lombok.AllArgsConstructor;
-import lombok.Getter;
-
-/**
- * The SchemaType enum represents different types of data schemas.
- *
- * @author chaoc
- * @since 1.0
- */
-@AllArgsConstructor
-@Getter
-public enum SchemaType {
-
- /**
- * Represents the SIP schema type.
- */
- SIP("SIP"),
-
- /**
- * Represents the RTP schema type.
- */
- RTP("RTP"),
-
- /**
- * Represents the VoIP schema type.
- */
- VOIP("VoIP");
-
- /**
- * The string value of the SchemaType.
- */
- private final String value;
-
- /**
- * Get the SchemaType enum based on the provided string value.
- *
- * @param value The string value of the SchemaType to retrieve.
- * @return The corresponding SchemaType enum.
- * @throws IllegalArgumentException if the provided value does not match any known SchemaType.
- */
- public static SchemaType of(final String value) {
- for (SchemaType schemaType : values()) {
- if (schemaType.value.equalsIgnoreCase(value)) {
- return schemaType;
- }
- }
- throw new IllegalArgumentException("Unknown SchemaType value '" + value + "'.");
- }
-} \ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/flink/voip/records/StreamDir.java b/src/main/java/com/zdjizhi/flink/voip/records/StreamDir.java
deleted file mode 100644
index 3732058..0000000
--- a/src/main/java/com/zdjizhi/flink/voip/records/StreamDir.java
+++ /dev/null
@@ -1,69 +0,0 @@
-package com.zdjizhi.flink.voip.records;
-
-import lombok.AllArgsConstructor;
-import lombok.Getter;
-
-/**
- * The StreamDir enum represents different types of data stream directions.
- *
- * @author chaoc
- * @since 1.0
- */
-@AllArgsConstructor
-@Getter
-public enum StreamDir {
-
- /**
- * Represents the Client-to-Server (C2S) stream direction.
- */
- C2S(1),
-
- /**
- * Represents the Server-to-Client (S2C) stream direction.
- */
- S2C(2),
-
- /**
- * Represents the bidirectional (double) stream direction.
- */
- DOUBLE(3);
-
- /**
- * The integer value of the StreamDir.
- */
- private final int value;
-
- /**
- * Get the StreamDir enum based on the provided integer value.
- *
- * @param value The integer value of the StreamDir to retrieve.
- * @return The corresponding StreamDir enum.
- * @throws IllegalArgumentException if the provided value does not match any known StreamDir.
- */
- public static StreamDir of(int value) {
- for (StreamDir streamDir : values()) {
- if (value == streamDir.value) {
- return streamDir;
- }
- }
- throw new IllegalArgumentException("Unknown StreamDir value '" + value + "'.");
- }
-
- /**
- * Get the StreamDir enum based on the provided flags value.
- *
- * @param flags The flags.
- * @return The corresponding StreamDir enum.
- * @throws IllegalArgumentException if the provided value does not match any known StreamDir.
- */
- public static StreamDir ofFlags(long flags) {
- int v = 0;
- if ((flags & 8192) == 8192) {
- v += 1;
- }
- if ((flags & 16384) == 16384) {
- v += 2;
- }
- return of(v);
- }
-}
diff --git a/src/main/resources/META-INF/services/com.geedgenetworks.flink.easy.common.api.UDFFactory b/src/main/resources/META-INF/services/com.geedgenetworks.flink.easy.common.api.UDFFactory
new file mode 100644
index 0000000..fdaf9e5
--- /dev/null
+++ b/src/main/resources/META-INF/services/com.geedgenetworks.flink.easy.common.api.UDFFactory
@@ -0,0 +1 @@
+com.geedgenetworks.flink.easy.application.voip.VoipUDFFactory \ No newline at end of file
diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties
deleted file mode 100644
index 7cc9d25..0000000
--- a/src/main/resources/application.properties
+++ /dev/null
@@ -1,7 +0,0 @@
-sink.kafka.topic=VOIP-CONVERSATION-RECORD
-sink.kafka.props.bootstrap.servers=localhost:9292
-
-
-source.kafka.topic=VOIP-RECORD
-source.kafka.props.bootstrap.servers=localhost:9292
-source.kafka.props.group.id=flink-voip-fusion \ No newline at end of file
diff --git a/src/main/resources/jobs/job.yml b/src/main/resources/jobs/job.yml
new file mode 100644
index 0000000..3f8b7d3
--- /dev/null
+++ b/src/main/resources/jobs/job.yml
@@ -0,0 +1,1795 @@
+job:
+ name: correlation_sip_rtp_session
+ parallelism: 1
+ active-pipeline:
+ - only-voip-records
+ - fusion-fail-records
+ - all-errors-records
+
+source:
+ - name: session-records
+ type: kafka
+ option:
+ topic: VOIP-RECORD
+ properties:
+ bootstrap.servers: 192.168.44.12:9094
+ group.id: sip-rtp-correlation
+ security.protocol: SASL_PLAINTEXT
+ sasl.mechanism: PLAIN
+ sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="galaxy2019";
+ format: json
+ schema:
+ ## General
+ - name: recv_time
+ data-type: BIGINT NOT NULL
+ - name: log_id
+ data-type: BIGINT NOT NULL
+ - name: decoded_as
+ data-type: STRING NOT NULL
+ - name: session_id
+ data-type: BIGINT NOT NULL
+ - name: start_timestamp_ms
+ data-type: BIGINT NOT NULL
+ # row-time:
+ - name: start_timestamp
+ for: TO_TIMESTAMP_LTZ(start_timestamp_ms, 3)
+ watermark: start_timestamp - INTERVAL '5' MINUTE
+ - name: end_timestamp_ms
+ data-type: BIGINT
+ - name: duration_ms
+ data-type: INT
+ - name: tcp_handshake_latency_ms
+ data-type: INT
+ - name: ingestion_time
+ data-type: BIGINT
+ - name: processing_time
+ data-type: BIGINT
+ - name: insert_time
+ data-type: BIGINT
+ - name: device_id
+ data-type: STRING
+ - name: out_link_id
+ data-type: INT
+ - name: in_link_id
+ data-type: INT
+ - name: device_tag
+ data-type: STRING
+ - name: data_center
+ data-type: STRING
+ - name: device_group
+ data-type: STRING
+ - name: sled_ip
+ data-type: STRING
+ - name: address_type
+ data-type: INT
+ - name: direction
+ data-type: STRING
+ - name: vsys_id
+ data-type: INT
+ - name: t_vsys_id
+ data-type: INT
+ - name: flags
+ data-type: BIGINT
+ - name: flags_identify_info
+ data-type: STRING
+ - name: c2s_ttl
+ data-type: INT
+ - name: s2c_ttl
+ data-type: INT
+ ## Treatment
+ - name: security_rule_list
+ data-type: ARRAY<BIGINT>
+ - name: security_action
+ data-type: STRING
+ - name: monitor_rule_list
+ data-type: ARRAY<BIGINT>
+ - name: shaping_rule_list
+ data-type: ARRAY<BIGINT>
+ - name: proxy_rule_list
+ data-type: ARRAY<BIGINT>
+ - name: statistics_rule_list
+ data-type: ARRAY<BIGINT>
+ - name: sc_rule_list
+ data-type: ARRAY<BIGINT>
+ - name: sc_rsp_raw
+ data-type: ARRAY<BIGINT>
+ - name: sc_rsp_decrypted
+ data-type: ARRAY<BIGINT>
+ - name: proxy_action
+ data-type: STRING
+ - name: proxy_pinning_status
+ data-type: INT
+ - name: proxy_intercept_status
+ data-type: INT
+ - name: proxy_passthrough_reason
+ data-type: STRING
+ - name: proxy_client_side_latency_ms
+ data-type: INT
+ - name: proxy_server_side_latency_ms
+ data-type: INT
+ - name: proxy_client_side_version
+ data-type: STRING
+ - name: proxy_server_side_version
+ data-type: STRING
+ - name: proxy_cert_verify
+ data-type: INT
+ - name: proxy_intercept_error
+ data-type: STRING
+ - name: monitor_mirrored_pkts
+ data-type: INT
+ - name: monitor_mirrored_bytes
+ data-type: INT
+ ## Source
+ - name: client_ip
+ data-type: STRING
+ - name: client_port
+ data-type: INT
+ - name: client_os_desc
+ data-type: STRING
+ - name: client_geolocation
+ data-type: STRING
+ - name: client_country
+ data-type: STRING
+ - name: client_super_administrative_area
+ data-type: STRING
+ - name: client_administrative_area
+ data-type: STRING
+ - name: client_sub_administrative_area
+ data-type: STRING
+ - name: client_asn
+ data-type: BIGINT
+ - name: subscriber_id
+ data-type: STRING
+ - name: imei
+ data-type: STRING
+ - name: imsi
+ data-type: STRING
+ - name: phone_number
+ data-type: STRING
+ - name: apn
+ data-type: STRING
+ ## Destination
+ - name: server_ip
+ data-type: STRING
+ - name: server_port
+ data-type: INT
+ - name: server_os_desc
+ data-type: STRING
+ - name: server_geolocation
+ data-type: STRING
+ - name: server_country
+ data-type: STRING
+ - name: server_super_administrative_area
+ data-type: STRING
+ - name: server_administrative_area
+ data-type: STRING
+ - name: server_sub_administrative_area
+ data-type: STRING
+ - name: server_asn
+ data-type: BIGINT
+ - name: server_fqdn
+ data-type: STRING
+ - name: server_domain
+ data-type: STRING
+ - name: fqdn_category_list
+ data-type: ARRAY<INT>
+ ## Application
+ - name: app_transition
+ data-type: STRING
+ - name: app
+ data-type: STRING
+ - name: app_category
+ data-type: STRING
+ - name: app_debug_info
+ data-type: STRING
+ - name: app_content
+ data-type: STRING
+ - name: app_extra_info
+ data-type: STRING
+ ## Protocol
+ - name: ip_protocol
+ data-type: STRING
+ - name: decoded_path
+ data-type: STRING
+ ## Transmission
+ - name: sent_pkts
+ data-type: BIGINT
+ - name: received_pkts
+ data-type: BIGINT
+ - name: sent_bytes
+ data-type: BIGINT
+ - name: received_bytes
+ data-type: BIGINT
+ - name: tcp_c2s_ip_fragments
+ data-type: BIGINT
+ - name: tcp_s2c_ip_fragments
+ data-type: BIGINT
+ - name: tcp_c2s_lost_bytes
+ data-type: BIGINT
+ - name: tcp_s2c_lost_bytes
+ data-type: BIGINT
+ - name: tcp_c2s_o3_pkts
+ data-type: BIGINT
+ - name: tcp_s2c_o3_pkts
+ data-type: BIGINT
+ - name: tcp_c2s_rtx_pkts
+ data-type: BIGINT
+ - name: tcp_s2c_rtx_pkts
+ data-type: BIGINT
+ - name: tcp_c2s_rtx_bytes
+ data-type: BIGINT
+ - name: tcp_s2c_rtx_bytes
+ data-type: BIGINT
+ - name: tcp_rtt_ms
+ data-type: INT
+ - name: tcp_client_isn
+ data-type: BIGINT
+ - name: tcp_server_isn
+ data-type: BIGINT
+ ## Other
+ - name: packet_capture_file
+ data-type: STRING
+ - name: in_src_mac
+ data-type: STRING
+ - name: out_src_mac
+ data-type: STRING
+ - name: in_dest_mac
+ data-type: STRING
+ - name: out_dest_mac
+ data-type: STRING
+ - name: encapsulation
+ data-type: STRING
+ - name: dup_traffic_flag
+ data-type: INT
+ - name: tunnel_id_list
+ data-type: ARRAY<BIGINT>
+ - name: tunnel_endpoint_a_desc
+ data-type: STRING
+ - name: tunnel_endpoint_b_desc
+ data-type: STRING
+ ## SIP
+ - name: sip_call_id
+ data-type: STRING
+ - name: sip_originator_description
+ data-type: STRING
+ - name: sip_responder_description
+ data-type: STRING
+ - name: sip_user_agent
+ data-type: STRING
+ - name: sip_server
+ data-type: STRING
+ - name: sip_originator_sdp_connect_ip
+ data-type: STRING
+ - name: sip_originator_sdp_media_port
+ data-type: INT
+ - name: sip_originator_sdp_media_type
+ data-type: STRING
+ - name: sip_originator_sdp_content
+ data-type: STRING
+ - name: sip_responder_sdp_connect_ip
+ data-type: STRING
+ - name: sip_responder_sdp_media_port
+ data-type: INT
+ - name: sip_responder_sdp_media_type
+ data-type: STRING
+ - name: sip_responder_sdp_content
+ data-type: STRING
+ - name: sip_duration_s
+ data-type: INT
+ - name: sip_bye
+ data-type: STRING
+ ## RTP
+ - name: rtp_payload_type_c2s
+ data-type: INT
+ - name: rtp_payload_type_s2c
+ data-type: INT
+ - name: rtp_pcap_path
+ data-type: STRING
+ - name: rtp_originator_dir
+ data-type: INT
+
+sink:
+ # 错误的 SIP 和 RTP
+ - name: all-errors-records
+ type: kafka
+ on: errors-records
+ option:
+      topic: VOIP-CONVERSATION-RECORD
+ properties:
+ bootstrap.servers: 192.168.44.12:9094
+ security.protocol: SASL_PLAINTEXT
+ sasl.mechanism: PLAIN
+ sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="galaxy2019";
+ format: json
+ # 关联成功的 VOIP
+ - name: only-voip-records
+ on: voip-fusion.ok
+ type: kafka
+ option:
+      topic: VOIP-CONVERSATION-RECORD
+ properties:
+ bootstrap.servers: 192.168.44.12:9094
+ security.protocol: SASL_PLAINTEXT
+ sasl.mechanism: PLAIN
+ sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="galaxy2019";
+ format: json
+ # 没有关联成功的 SIP 和 RTP
+ - name: fusion-fail-records
+ on: cannot-fusion-records
+ type: kafka
+ option:
+      topic: VOIP-CONVERSATION-RECORD
+ properties:
+ bootstrap.servers: 192.168.44.12:9094
+ security.protocol: SASL_PLAINTEXT
+ sasl.mechanism: PLAIN
+ sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="galaxy2019";
+ format: json
+
+pipeline:
+ - name: split-for-valid
+ category: SPLIT
+ on: session-records
+ splits:
+ # Invalid ip or port
+ - name: error1-records
+ where: NOT(IS_IP_ADDRESS(client_ip)) || NOT(IS_IP_ADDRESS(server_ip)) || client_port <= 0 || server_port <= 0
+ # Invalid stream dir
+ - name: error2-records
+          where: decoded_as == 'SIP' && STREAM_DIR(flags) != 1 && STREAM_DIR(flags) != 2 && STREAM_DIR(flags) != 3
+ # Invalid: SIP one-way stream and has invalid network address
+ - name: error3-records
+ where: decoded_as == 'SIP' && ( NOT(HAS_IP_ADDRESS(sip_originator_sdp_connect_ip, sip_responder_sdp_connect_ip)) || sip_originator_sdp_media_port <= 0 || sip_responder_sdp_media_port <= 0 )
+ - name: error4-records
+ where: decoded_as == 'SIP' && STREAM_DIR(flags) == 3 && ( NOT( IS_IP_ADDRESS(sip_originator_sdp_connect_ip) ) || NOT( IS_IP_ADDRESS(sip_responder_sdp_connect_ip) ) )
+
+ ### Notes: If internal IP address correlate is needed, please uncomment the following two items
+ # # Invalid: SIP one-way stream and internal network address
+ # - name: internal-error1-records
+ # where: decoded_as == 'SIP' && NOT(HAS_EXTERNAL_IP_ADDRESS(sip_originator_sdp_connect_ip, sip_responder_sdp_connect_ip))
+ # # Invalid: SIP double-way stream and internal network address
+ # - name: internal-error2-records
+ # where: decoded_as == 'SIP' && STREAM_DIR(flags) == 3 && ( NOT( IS_EXTERNAL_IP_ADDRESS(sip_originator_sdp_connect_ip) ) || NOT( IS_EXTERNAL_IP_ADDRESS(sip_responder_sdp_connect_ip) ) )
+ - name: split-by-protocol
+ category: SPLIT
+ on: split-for-valid
+ splits:
+ - name: rtp-records
+ where: decoded_as == 'RTP'
+ - name: sip-records
+ where: decoded_as == 'SIP'
+ - name: sip-double-way-records
+ category: CORRELATE
+ cache:
+ - name: v1
+ type: VALUE
+ ttl: 1 minute
+ schema:
+ ## General
+ - name: recv_time
+ data-type: BIGINT NOT NULL
+ - name: log_id
+ data-type: BIGINT NOT NULL
+ - name: decoded_as
+ data-type: STRING NOT NULL
+ - name: session_id
+ data-type: BIGINT NOT NULL
+ - name: start_timestamp_ms
+ data-type: BIGINT NOT NULL
+ # row-time:
+ - name: start_timestamp
+ data-type: TIMESTAMP_LTZ(3)
+ - name: end_timestamp_ms
+ data-type: BIGINT
+ - name: duration_ms
+ data-type: INT
+ - name: tcp_handshake_latency_ms
+ data-type: INT
+ - name: ingestion_time
+ data-type: BIGINT
+ - name: processing_time
+ data-type: BIGINT
+ - name: insert_time
+ data-type: BIGINT
+ - name: device_id
+ data-type: STRING
+ - name: out_link_id
+ data-type: INT
+ - name: in_link_id
+ data-type: INT
+ - name: device_tag
+ data-type: STRING
+ - name: data_center
+ data-type: STRING
+ - name: device_group
+ data-type: STRING
+ - name: sled_ip
+ data-type: STRING
+ - name: address_type
+ data-type: INT
+ - name: direction
+ data-type: STRING
+ - name: vsys_id
+ data-type: INT
+ - name: t_vsys_id
+ data-type: INT
+ - name: flags
+ data-type: BIGINT
+ - name: flags_identify_info
+ data-type: STRING
+ - name: c2s_ttl
+ data-type: INT
+ - name: s2c_ttl
+ data-type: INT
+ ## Treatment
+ - name: security_rule_list
+ data-type: ARRAY<BIGINT>
+ - name: security_action
+ data-type: STRING
+ - name: monitor_rule_list
+ data-type: ARRAY<BIGINT>
+ - name: shaping_rule_list
+ data-type: ARRAY<BIGINT>
+ - name: proxy_rule_list
+ data-type: ARRAY<BIGINT>
+ - name: statistics_rule_list
+ data-type: ARRAY<BIGINT>
+ - name: sc_rule_list
+ data-type: ARRAY<BIGINT>
+ - name: sc_rsp_raw
+ data-type: ARRAY<BIGINT>
+ - name: sc_rsp_decrypted
+ data-type: ARRAY<BIGINT>
+ - name: proxy_action
+ data-type: STRING
+ - name: proxy_pinning_status
+ data-type: INT
+ - name: proxy_intercept_status
+ data-type: INT
+ - name: proxy_passthrough_reason
+ data-type: STRING
+ - name: proxy_client_side_latency_ms
+ data-type: INT
+ - name: proxy_server_side_latency_ms
+ data-type: INT
+ - name: proxy_client_side_version
+ data-type: STRING
+ - name: proxy_server_side_version
+ data-type: STRING
+ - name: proxy_cert_verify
+ data-type: INT
+ - name: proxy_intercept_error
+ data-type: STRING
+ - name: monitor_mirrored_pkts
+ data-type: INT
+ - name: monitor_mirrored_bytes
+ data-type: INT
+ ## Source
+ - name: client_ip
+ data-type: STRING
+ - name: client_port
+ data-type: INT
+ - name: client_os_desc
+ data-type: STRING
+ - name: client_geolocation
+ data-type: STRING
+ - name: client_country
+ data-type: STRING
+ - name: client_super_administrative_area
+ data-type: STRING
+ - name: client_administrative_area
+ data-type: STRING
+ - name: client_sub_administrative_area
+ data-type: STRING
+ - name: client_asn
+ data-type: BIGINT
+ - name: subscriber_id
+ data-type: STRING
+ - name: imei
+ data-type: STRING
+ - name: imsi
+ data-type: STRING
+ - name: phone_number
+ data-type: STRING
+ - name: apn
+ data-type: STRING
+ ## Destination
+ - name: server_ip
+ data-type: STRING
+ - name: server_port
+ data-type: INT
+ - name: server_os_desc
+ data-type: STRING
+ - name: server_geolocation
+ data-type: STRING
+ - name: server_country
+ data-type: STRING
+ - name: server_super_administrative_area
+ data-type: STRING
+ - name: server_administrative_area
+ data-type: STRING
+ - name: server_sub_administrative_area
+ data-type: STRING
+ - name: server_asn
+ data-type: BIGINT
+ - name: server_fqdn
+ data-type: STRING
+ - name: server_domain
+ data-type: STRING
+ - name: fqdn_category_list
+ data-type: ARRAY<INT>
+ ## Application
+ - name: app_transition
+ data-type: STRING
+ - name: app
+ data-type: STRING
+ - name: app_category
+ data-type: STRING
+ - name: app_debug_info
+ data-type: STRING
+ - name: app_content
+ data-type: STRING
+ - name: app_extra_info
+ data-type: STRING
+ ## Protocol
+ - name: ip_protocol
+ data-type: STRING
+ - name: decoded_path
+ data-type: STRING
+ ## Transmission
+ - name: sent_pkts
+ data-type: BIGINT
+ - name: received_pkts
+ data-type: BIGINT
+ - name: sent_bytes
+ data-type: BIGINT
+ - name: received_bytes
+ data-type: BIGINT
+ - name: tcp_c2s_ip_fragments
+ data-type: BIGINT
+ - name: tcp_s2c_ip_fragments
+ data-type: BIGINT
+ - name: tcp_c2s_lost_bytes
+ data-type: BIGINT
+ - name: tcp_s2c_lost_bytes
+ data-type: BIGINT
+ - name: tcp_c2s_o3_pkts
+ data-type: BIGINT
+ - name: tcp_s2c_o3_pkts
+ data-type: BIGINT
+ - name: tcp_c2s_rtx_pkts
+ data-type: BIGINT
+ - name: tcp_s2c_rtx_pkts
+ data-type: BIGINT
+ - name: tcp_c2s_rtx_bytes
+ data-type: BIGINT
+ - name: tcp_s2c_rtx_bytes
+ data-type: BIGINT
+ - name: tcp_rtt_ms
+ data-type: INT
+ - name: tcp_client_isn
+ data-type: BIGINT
+ - name: tcp_server_isn
+ data-type: BIGINT
+ ## Other
+ - name: packet_capture_file
+ data-type: STRING
+ - name: in_src_mac
+ data-type: STRING
+ - name: out_src_mac
+ data-type: STRING
+ - name: in_dest_mac
+ data-type: STRING
+ - name: out_dest_mac
+ data-type: STRING
+ - name: encapsulation
+ data-type: STRING
+ - name: dup_traffic_flag
+ data-type: INT
+ - name: tunnel_id_list
+ data-type: ARRAY<BIGINT>
+ - name: tunnel_endpoint_a_desc
+ data-type: STRING
+ - name: tunnel_endpoint_b_desc
+ data-type: STRING
+ ## SIP
+ - name: sip_call_id
+ data-type: STRING
+ - name: sip_originator_description
+ data-type: STRING
+ - name: sip_responder_description
+ data-type: STRING
+ - name: sip_user_agent
+ data-type: STRING
+ - name: sip_server
+ data-type: STRING
+ - name: sip_originator_sdp_connect_ip
+ data-type: STRING
+ - name: sip_originator_sdp_media_port
+ data-type: INT
+ - name: sip_originator_sdp_media_type
+ data-type: STRING
+ - name: sip_originator_sdp_content
+ data-type: STRING
+ - name: sip_responder_sdp_connect_ip
+ data-type: STRING
+ - name: sip_responder_sdp_media_port
+ data-type: INT
+ - name: sip_responder_sdp_media_type
+ data-type: STRING
+ - name: sip_responder_sdp_content
+ data-type: STRING
+ - name: sip_duration_s
+ data-type: INT
+ - name: sip_bye
+ data-type: STRING
+ ## RTP
+ - name: rtp_payload_type_c2s
+ data-type: INT
+ - name: rtp_payload_type_s2c
+ data-type: INT
+ - name: rtp_pcap_path
+ data-type: STRING
+ - name: rtp_originator_dir
+ data-type: INT
+ where:
+ - on: sip-records
+ key-by: vsys_id, sip_call_id, SORT_ADDRESS( client_ip, client_port, server_ip, server_port )
+ process:
+ - if: STREAM_DIR(flags) != 3 && @v1.isNotNull && STREAM_DIR(@v1.$flags) != STREAM_DIR(flags)
+ then:
+ - |-
+ OUTPUT ok FROM withColumns(recv_time to sip_call_id),
+ FIND_NOT_BLANK(@v1.$sip_originator_description, sip_originator_description) AS sip_originator_description,
+ FIND_NOT_BLANK(@v1.$sip_responder_description, sip_responder_description) AS sip_responder_description,
+ FIND_NOT_BLANK(@v1.$sip_user_agent, sip_user_agent) AS sip_user_agent,
+ FIND_NOT_BLANK(@v1.$sip_server, sip_server) AS sip_server,
+ FIND_NOT_BLANK(@v1.$sip_originator_sdp_connect_ip, sip_originator_sdp_connect_ip) AS sip_originator_sdp_connect_ip,
+ (@v1.$sip_originator_sdp_media_port > 0).?(@v1.$sip_originator_sdp_media_port, sip_originator_sdp_media_port) AS sip_originator_sdp_media_port,
+ FIND_NOT_BLANK(@v1.$sip_originator_sdp_media_type, sip_originator_sdp_media_type) AS sip_originator_sdp_media_type,
+ FIND_NOT_BLANK(@v1.$sip_originator_sdp_content, sip_originator_sdp_content) AS sip_originator_sdp_content,
+ FIND_NOT_BLANK(@v1.$sip_responder_sdp_connect_ip, sip_responder_sdp_connect_ip) AS sip_responder_sdp_connect_ip,
+ (@v1.$sip_responder_sdp_media_port > 0).?(@v1.$sip_responder_sdp_media_port, sip_responder_sdp_media_port) AS sip_responder_sdp_media_port,
+ FIND_NOT_BLANK(@v1.$sip_responder_sdp_media_type, sip_responder_sdp_media_type) AS sip_responder_sdp_media_type,
+ FIND_NOT_BLANK(@v1.$sip_responder_sdp_content, sip_responder_sdp_content) AS sip_responder_sdp_content,
+ @v1.$sip_duration_s + sip_duration_s AS sip_duration_s,
+ FIND_NOT_BLANK(@v1.$sip_bye, sip_bye) AS sip_bye,
+ rtp_payload_type_c2s,
+ rtp_payload_type_s2c,
+ rtp_pcap_path,
+ rtp_originator_dir
+ - TRUNCATE v1
+ - if: STREAM_DIR(flags) != 3 && @v1.isNull
+ then:
+ - |-
+ SET v1 FROM withColumns(recv_time to rtp_originator_dir)
+ - if: STREAM_DIR(flags) == 3
+ then:
+ - |-
+ OUTPUT ok FROM withColumns(recv_time to rtp_originator_dir)
+ - SCHEDULING USING EVENT TIME FOR NOW + 60 * 1000
+ schedule:
+ - if: '@v1.isNotNull'
+ then:
+ - |-
+ OUTPUT fail FROM @v1.$recv_time AS recv_time,
+ @v1.$log_id AS log_id,
+ @v1.$decoded_as AS decoded_as,
+ @v1.$session_id AS session_id,
+ @v1.$start_timestamp_ms AS start_timestamp_ms,
+ @v1.$start_timestamp AS start_timestamp,
+ @v1.$end_timestamp_ms AS end_timestamp_ms,
+ @v1.$duration_ms AS duration_ms,
+ @v1.$tcp_handshake_latency_ms AS tcp_handshake_latency_ms,
+ @v1.$ingestion_time AS ingestion_time,
+ @v1.$processing_time AS processing_time,
+ @v1.$insert_time AS insert_time,
+ @v1.$device_id AS device_id,
+ @v1.$out_link_id AS out_link_id,
+ @v1.$in_link_id AS in_link_id,
+ @v1.$device_tag AS device_tag,
+ @v1.$data_center AS data_center,
+ @v1.$device_group AS device_group,
+ @v1.$sled_ip AS sled_ip,
+ @v1.$address_type AS address_type,
+ @v1.$direction AS direction,
+ @v1.$vsys_id AS vsys_id,
+ @v1.$t_vsys_id AS t_vsys_id,
+ @v1.$flags AS flags,
+ @v1.$flags_identify_info AS flags_identify_info,
+ @v1.$c2s_ttl AS c2s_ttl,
+ @v1.$s2c_ttl AS s2c_ttl,
+ @v1.$security_rule_list AS security_rule_list,
+ @v1.$security_action AS security_action,
+ @v1.$monitor_rule_list AS monitor_rule_list,
+ @v1.$shaping_rule_list AS shaping_rule_list,
+ @v1.$proxy_rule_list AS proxy_rule_list,
+ @v1.$statistics_rule_list AS statistics_rule_list,
+ @v1.$sc_rule_list AS sc_rule_list,
+ @v1.$sc_rsp_raw AS sc_rsp_raw,
+ @v1.$sc_rsp_decrypted AS sc_rsp_decrypted,
+ @v1.$proxy_action AS proxy_action,
+ @v1.$proxy_pinning_status AS proxy_pinning_status,
+ @v1.$proxy_intercept_status AS proxy_intercept_status,
+ @v1.$proxy_passthrough_reason AS proxy_passthrough_reason,
+ @v1.$proxy_client_side_latency_ms AS proxy_client_side_latency_ms,
+ @v1.$proxy_server_side_latency_ms AS proxy_server_side_latency_ms,
+ @v1.$proxy_client_side_version AS proxy_client_side_version,
+ @v1.$proxy_server_side_version AS proxy_server_side_version,
+ @v1.$proxy_cert_verify AS proxy_cert_verify,
+ @v1.$proxy_intercept_error AS proxy_intercept_error,
+ @v1.$monitor_mirrored_pkts AS monitor_mirrored_pkts,
+ @v1.$monitor_mirrored_bytes AS monitor_mirrored_bytes,
+ @v1.$client_ip AS client_ip,
+ @v1.$client_port AS client_port,
+ @v1.$client_os_desc AS client_os_desc,
+ @v1.$client_geolocation AS client_geolocation,
+ @v1.$client_country AS client_country,
+ @v1.$client_super_administrative_area AS client_super_administrative_area,
+ @v1.$client_administrative_area AS client_administrative_area,
+ @v1.$client_sub_administrative_area AS client_sub_administrative_area,
+ @v1.$client_asn AS client_asn,
+ @v1.$subscriber_id AS subscriber_id,
+ @v1.$imei AS imei,
+ @v1.$imsi AS imsi,
+ @v1.$phone_number AS phone_number,
+ @v1.$apn AS apn,
+ @v1.$server_ip AS server_ip,
+ @v1.$server_port AS server_port,
+ @v1.$server_os_desc AS server_os_desc,
+ @v1.$server_geolocation AS server_geolocation,
+ @v1.$server_country AS server_country,
+ @v1.$server_super_administrative_area AS server_super_administrative_area,
+ @v1.$server_administrative_area AS server_administrative_area,
+ @v1.$server_sub_administrative_area AS server_sub_administrative_area,
+ @v1.$server_asn AS server_asn,
+ @v1.$server_fqdn AS server_fqdn,
+ @v1.$server_domain AS server_domain,
+ @v1.$fqdn_category_list AS fqdn_category_list,
+ @v1.$app_transition AS app_transition,
+ @v1.$app AS app,
+ @v1.$app_category AS app_category,
+ @v1.$app_debug_info AS app_debug_info,
+ @v1.$app_content AS app_content,
+ @v1.$app_extra_info AS app_extra_info,
+ @v1.$ip_protocol AS ip_protocol,
+ @v1.$decoded_path AS decoded_path,
+ @v1.$sent_pkts AS sent_pkts,
+ @v1.$received_pkts AS received_pkts,
+ @v1.$sent_bytes AS sent_bytes,
+ @v1.$received_bytes AS received_bytes,
+ @v1.$tcp_c2s_ip_fragments AS tcp_c2s_ip_fragments,
+ @v1.$tcp_s2c_ip_fragments AS tcp_s2c_ip_fragments,
+ @v1.$tcp_c2s_lost_bytes AS tcp_c2s_lost_bytes,
+ @v1.$tcp_s2c_lost_bytes AS tcp_s2c_lost_bytes,
+ @v1.$tcp_c2s_o3_pkts AS tcp_c2s_o3_pkts,
+ @v1.$tcp_s2c_o3_pkts AS tcp_s2c_o3_pkts,
+ @v1.$tcp_c2s_rtx_pkts AS tcp_c2s_rtx_pkts,
+ @v1.$tcp_s2c_rtx_pkts AS tcp_s2c_rtx_pkts,
+ @v1.$tcp_c2s_rtx_bytes AS tcp_c2s_rtx_bytes,
+ @v1.$tcp_s2c_rtx_bytes AS tcp_s2c_rtx_bytes,
+ @v1.$tcp_rtt_ms AS tcp_rtt_ms,
+ @v1.$tcp_client_isn AS tcp_client_isn,
+ @v1.$tcp_server_isn AS tcp_server_isn,
+ @v1.$packet_capture_file AS packet_capture_file,
+ @v1.$in_src_mac AS in_src_mac,
+ @v1.$out_src_mac AS out_src_mac,
+ @v1.$in_dest_mac AS in_dest_mac,
+ @v1.$out_dest_mac AS out_dest_mac,
+ @v1.$encapsulation AS encapsulation,
+ @v1.$dup_traffic_flag AS dup_traffic_flag,
+ @v1.$tunnel_id_list AS tunnel_id_list,
+ @v1.$tunnel_endpoint_a_desc AS tunnel_endpoint_a_desc,
+ @v1.$tunnel_endpoint_b_desc AS tunnel_endpoint_b_desc,
+ @v1.$sip_call_id AS sip_call_id,
+ @v1.$sip_originator_description AS sip_originator_description,
+ @v1.$sip_responder_description AS sip_responder_description,
+ @v1.$sip_user_agent AS sip_user_agent,
+ @v1.$sip_server AS sip_server,
+ @v1.$sip_originator_sdp_connect_ip AS sip_originator_sdp_connect_ip,
+ @v1.$sip_originator_sdp_media_port AS sip_originator_sdp_media_port,
+ @v1.$sip_originator_sdp_media_type AS sip_originator_sdp_media_type,
+ @v1.$sip_originator_sdp_content AS sip_originator_sdp_content,
+ @v1.$sip_responder_sdp_connect_ip AS sip_responder_sdp_connect_ip,
+ @v1.$sip_responder_sdp_media_port AS sip_responder_sdp_media_port,
+ @v1.$sip_responder_sdp_media_type AS sip_responder_sdp_media_type,
+ @v1.$sip_responder_sdp_content AS sip_responder_sdp_content,
+ @v1.$sip_duration_s AS sip_duration_s,
+ @v1.$sip_bye AS sip_bye,
+ @v1.$rtp_payload_type_c2s AS rtp_payload_type_c2s,
+ @v1.$rtp_payload_type_s2c AS rtp_payload_type_s2c,
+ @v1.$rtp_pcap_path AS rtp_pcap_path,
+ @v1.$rtp_originator_dir AS rtp_originator_dir
+ - TRUNCATE v1
+ - name: voip-fusion
+ category: CORRELATE
+ cache:
+ - name: sip
+ type: VALUE
+ ttl: 6 minute
+ schema:
+ ## General
+ - name: recv_time
+ data-type: BIGINT NOT NULL
+ - name: log_id
+ data-type: BIGINT NOT NULL
+ - name: decoded_as
+ data-type: STRING NOT NULL
+ - name: session_id
+ data-type: BIGINT NOT NULL
+ - name: start_timestamp_ms
+ data-type: BIGINT NOT NULL
+ # row-time:
+ - name: start_timestamp
+ data-type: TIMESTAMP_LTZ(3)
+ - name: end_timestamp_ms
+ data-type: BIGINT
+ - name: duration_ms
+ data-type: INT
+ - name: tcp_handshake_latency_ms
+ data-type: INT
+ - name: ingestion_time
+ data-type: BIGINT
+ - name: processing_time
+ data-type: BIGINT
+ - name: insert_time
+ data-type: BIGINT
+ - name: device_id
+ data-type: STRING
+ - name: out_link_id
+ data-type: INT
+ - name: in_link_id
+ data-type: INT
+ - name: device_tag
+ data-type: STRING
+ - name: data_center
+ data-type: STRING
+ - name: device_group
+ data-type: STRING
+ - name: sled_ip
+ data-type: STRING
+ - name: address_type
+ data-type: INT
+ - name: direction
+ data-type: STRING
+ - name: vsys_id
+ data-type: INT
+ - name: t_vsys_id
+ data-type: INT
+ - name: flags
+ data-type: BIGINT
+ - name: flags_identify_info
+ data-type: STRING
+ - name: c2s_ttl
+ data-type: INT
+ - name: s2c_ttl
+ data-type: INT
+ ## Treatment
+ - name: security_rule_list
+ data-type: ARRAY<BIGINT>
+ - name: security_action
+ data-type: STRING
+ - name: monitor_rule_list
+ data-type: ARRAY<BIGINT>
+ - name: shaping_rule_list
+ data-type: ARRAY<BIGINT>
+ - name: proxy_rule_list
+ data-type: ARRAY<BIGINT>
+ - name: statistics_rule_list
+ data-type: ARRAY<BIGINT>
+ - name: sc_rule_list
+ data-type: ARRAY<BIGINT>
+ - name: sc_rsp_raw
+ data-type: ARRAY<BIGINT>
+ - name: sc_rsp_decrypted
+ data-type: ARRAY<BIGINT>
+ - name: proxy_action
+ data-type: STRING
+ - name: proxy_pinning_status
+ data-type: INT
+ - name: proxy_intercept_status
+ data-type: INT
+ - name: proxy_passthrough_reason
+ data-type: STRING
+ - name: proxy_client_side_latency_ms
+ data-type: INT
+ - name: proxy_server_side_latency_ms
+ data-type: INT
+ - name: proxy_client_side_version
+ data-type: STRING
+ - name: proxy_server_side_version
+ data-type: STRING
+ - name: proxy_cert_verify
+ data-type: INT
+ - name: proxy_intercept_error
+ data-type: STRING
+ - name: monitor_mirrored_pkts
+ data-type: INT
+ - name: monitor_mirrored_bytes
+ data-type: INT
+ ## Source
+ - name: client_ip
+ data-type: STRING
+ - name: client_port
+ data-type: INT
+ - name: client_os_desc
+ data-type: STRING
+ - name: client_geolocation
+ data-type: STRING
+ - name: client_country
+ data-type: STRING
+ - name: client_super_administrative_area
+ data-type: STRING
+ - name: client_administrative_area
+ data-type: STRING
+ - name: client_sub_administrative_area
+ data-type: STRING
+ - name: client_asn
+ data-type: BIGINT
+ - name: subscriber_id
+ data-type: STRING
+ - name: imei
+ data-type: STRING
+ - name: imsi
+ data-type: STRING
+ - name: phone_number
+ data-type: STRING
+ - name: apn
+ data-type: STRING
+ ## Destination
+ - name: server_ip
+ data-type: STRING
+ - name: server_port
+ data-type: INT
+ - name: server_os_desc
+ data-type: STRING
+ - name: server_geolocation
+ data-type: STRING
+ - name: server_country
+ data-type: STRING
+ - name: server_super_administrative_area
+ data-type: STRING
+ - name: server_administrative_area
+ data-type: STRING
+ - name: server_sub_administrative_area
+ data-type: STRING
+ - name: server_asn
+ data-type: BIGINT
+ - name: server_fqdn
+ data-type: STRING
+ - name: server_domain
+ data-type: STRING
+ - name: fqdn_category_list
+ data-type: ARRAY<INT>
+ ## Application
+ - name: app_transition
+ data-type: STRING
+ - name: app
+ data-type: STRING
+ - name: app_category
+ data-type: STRING
+ - name: app_debug_info
+ data-type: STRING
+ - name: app_content
+ data-type: STRING
+ - name: app_extra_info
+ data-type: STRING
+ ## Protocol
+ - name: ip_protocol
+ data-type: STRING
+ - name: decoded_path
+ data-type: STRING
+ ## Transmission
+ - name: sent_pkts
+ data-type: BIGINT
+ - name: received_pkts
+ data-type: BIGINT
+ - name: sent_bytes
+ data-type: BIGINT
+ - name: received_bytes
+ data-type: BIGINT
+ - name: tcp_c2s_ip_fragments
+ data-type: BIGINT
+ - name: tcp_s2c_ip_fragments
+ data-type: BIGINT
+ - name: tcp_c2s_lost_bytes
+ data-type: BIGINT
+ - name: tcp_s2c_lost_bytes
+ data-type: BIGINT
+ - name: tcp_c2s_o3_pkts
+ data-type: BIGINT
+ - name: tcp_s2c_o3_pkts
+ data-type: BIGINT
+ - name: tcp_c2s_rtx_pkts
+ data-type: BIGINT
+ - name: tcp_s2c_rtx_pkts
+ data-type: BIGINT
+ - name: tcp_c2s_rtx_bytes
+ data-type: BIGINT
+ - name: tcp_s2c_rtx_bytes
+ data-type: BIGINT
+ - name: tcp_rtt_ms
+ data-type: INT
+ - name: tcp_client_isn
+ data-type: BIGINT
+ - name: tcp_server_isn
+ data-type: BIGINT
+ ## Other
+ - name: packet_capture_file
+ data-type: STRING
+ - name: in_src_mac
+ data-type: STRING
+ - name: out_src_mac
+ data-type: STRING
+ - name: in_dest_mac
+ data-type: STRING
+ - name: out_dest_mac
+ data-type: STRING
+ - name: encapsulation
+ data-type: STRING
+ - name: dup_traffic_flag
+ data-type: INT
+ - name: tunnel_id_list
+ data-type: ARRAY<BIGINT>
+ - name: tunnel_endpoint_a_desc
+ data-type: STRING
+ - name: tunnel_endpoint_b_desc
+ data-type: STRING
+ ## SIP
+ - name: sip_call_id
+ data-type: STRING
+ - name: sip_originator_description
+ data-type: STRING
+ - name: sip_responder_description
+ data-type: STRING
+ - name: sip_user_agent
+ data-type: STRING
+ - name: sip_server
+ data-type: STRING
+ - name: sip_originator_sdp_connect_ip
+ data-type: STRING
+ - name: sip_originator_sdp_media_port
+ data-type: INT
+ - name: sip_originator_sdp_media_type
+ data-type: STRING
+ - name: sip_originator_sdp_content
+ data-type: STRING
+ - name: sip_responder_sdp_connect_ip
+ data-type: STRING
+ - name: sip_responder_sdp_media_port
+ data-type: INT
+ - name: sip_responder_sdp_media_type
+ data-type: STRING
+ - name: sip_responder_sdp_content
+ data-type: STRING
+ - name: sip_duration_s
+ data-type: INT
+ - name: sip_bye
+ data-type: STRING
+ ## RTP
+ - name: rtp_payload_type_c2s
+ data-type: INT
+ - name: rtp_payload_type_s2c
+ data-type: INT
+ - name: rtp_pcap_path
+ data-type: STRING
+ - name: rtp_originator_dir
+ data-type: INT
+ - name: rtp
+ type: LIST
+ ttl: 6 minute
+ schema:
+ ## General
+ - name: recv_time
+ data-type: BIGINT NOT NULL
+ - name: log_id
+ data-type: BIGINT NOT NULL
+ - name: decoded_as
+ data-type: STRING NOT NULL
+ - name: session_id
+ data-type: BIGINT NOT NULL
+ - name: start_timestamp_ms
+ data-type: BIGINT NOT NULL
+ # row-time:
+ - name: start_timestamp
+ data-type: TIMESTAMP_LTZ(3)
+ - name: end_timestamp_ms
+ data-type: BIGINT
+ - name: duration_ms
+ data-type: INT
+ - name: tcp_handshake_latency_ms
+ data-type: INT
+ - name: ingestion_time
+ data-type: BIGINT
+ - name: processing_time
+ data-type: BIGINT
+ - name: insert_time
+ data-type: BIGINT
+ - name: device_id
+ data-type: STRING
+ - name: out_link_id
+ data-type: INT
+ - name: in_link_id
+ data-type: INT
+ - name: device_tag
+ data-type: STRING
+ - name: data_center
+ data-type: STRING
+ - name: device_group
+ data-type: STRING
+ - name: sled_ip
+ data-type: STRING
+ - name: address_type
+ data-type: INT
+ - name: direction
+ data-type: STRING
+ - name: vsys_id
+ data-type: INT
+ - name: t_vsys_id
+ data-type: INT
+ - name: flags
+ data-type: BIGINT
+ - name: flags_identify_info
+ data-type: STRING
+ - name: c2s_ttl
+ data-type: INT
+ - name: s2c_ttl
+ data-type: INT
+ ## Treatment
+ - name: security_rule_list
+ data-type: ARRAY<BIGINT>
+ - name: security_action
+ data-type: STRING
+ - name: monitor_rule_list
+ data-type: ARRAY<BIGINT>
+ - name: shaping_rule_list
+ data-type: ARRAY<BIGINT>
+ - name: proxy_rule_list
+ data-type: ARRAY<BIGINT>
+ - name: statistics_rule_list
+ data-type: ARRAY<BIGINT>
+ - name: sc_rule_list
+ data-type: ARRAY<BIGINT>
+ - name: sc_rsp_raw
+ data-type: ARRAY<BIGINT>
+ - name: sc_rsp_decrypted
+ data-type: ARRAY<BIGINT>
+ - name: proxy_action
+ data-type: STRING
+ - name: proxy_pinning_status
+ data-type: INT
+ - name: proxy_intercept_status
+ data-type: INT
+ - name: proxy_passthrough_reason
+ data-type: STRING
+ - name: proxy_client_side_latency_ms
+ data-type: INT
+ - name: proxy_server_side_latency_ms
+ data-type: INT
+ - name: proxy_client_side_version
+ data-type: STRING
+ - name: proxy_server_side_version
+ data-type: STRING
+ - name: proxy_cert_verify
+ data-type: INT
+ - name: proxy_intercept_error
+ data-type: STRING
+ - name: monitor_mirrored_pkts
+ data-type: INT
+ - name: monitor_mirrored_bytes
+ data-type: INT
+ ## Source
+ - name: client_ip
+ data-type: STRING
+ - name: client_port
+ data-type: INT
+ - name: client_os_desc
+ data-type: STRING
+ - name: client_geolocation
+ data-type: STRING
+ - name: client_country
+ data-type: STRING
+ - name: client_super_administrative_area
+ data-type: STRING
+ - name: client_administrative_area
+ data-type: STRING
+ - name: client_sub_administrative_area
+ data-type: STRING
+ - name: client_asn
+ data-type: BIGINT
+ - name: subscriber_id
+ data-type: STRING
+ - name: imei
+ data-type: STRING
+ - name: imsi
+ data-type: STRING
+ - name: phone_number
+ data-type: STRING
+ - name: apn
+ data-type: STRING
+ ## Destination
+ - name: server_ip
+ data-type: STRING
+ - name: server_port
+ data-type: INT
+ - name: server_os_desc
+ data-type: STRING
+ - name: server_geolocation
+ data-type: STRING
+ - name: server_country
+ data-type: STRING
+ - name: server_super_administrative_area
+ data-type: STRING
+ - name: server_administrative_area
+ data-type: STRING
+ - name: server_sub_administrative_area
+ data-type: STRING
+ - name: server_asn
+ data-type: BIGINT
+ - name: server_fqdn
+ data-type: STRING
+ - name: server_domain
+ data-type: STRING
+ - name: fqdn_category_list
+ data-type: ARRAY<INT>
+ ## Application
+ - name: app_transition
+ data-type: STRING
+ - name: app
+ data-type: STRING
+ - name: app_category
+ data-type: STRING
+ - name: app_debug_info
+ data-type: STRING
+ - name: app_content
+ data-type: STRING
+ - name: app_extra_info
+ data-type: STRING
+ ## Protocol
+ - name: ip_protocol
+ data-type: STRING
+ - name: decoded_path
+ data-type: STRING
+ ## Transmission
+ - name: sent_pkts
+ data-type: BIGINT
+ - name: received_pkts
+ data-type: BIGINT
+ - name: sent_bytes
+ data-type: BIGINT
+ - name: received_bytes
+ data-type: BIGINT
+ - name: tcp_c2s_ip_fragments
+ data-type: BIGINT
+ - name: tcp_s2c_ip_fragments
+ data-type: BIGINT
+ - name: tcp_c2s_lost_bytes
+ data-type: BIGINT
+ - name: tcp_s2c_lost_bytes
+ data-type: BIGINT
+ - name: tcp_c2s_o3_pkts
+ data-type: BIGINT
+ - name: tcp_s2c_o3_pkts
+ data-type: BIGINT
+ - name: tcp_c2s_rtx_pkts
+ data-type: BIGINT
+ - name: tcp_s2c_rtx_pkts
+ data-type: BIGINT
+ - name: tcp_c2s_rtx_bytes
+ data-type: BIGINT
+ - name: tcp_s2c_rtx_bytes
+ data-type: BIGINT
+ - name: tcp_rtt_ms
+ data-type: INT
+ - name: tcp_client_isn
+ data-type: BIGINT
+ - name: tcp_server_isn
+ data-type: BIGINT
+ ## Other
+ - name: packet_capture_file
+ data-type: STRING
+ - name: in_src_mac
+ data-type: STRING
+ - name: out_src_mac
+ data-type: STRING
+ - name: in_dest_mac
+ data-type: STRING
+ - name: out_dest_mac
+ data-type: STRING
+ - name: encapsulation
+ data-type: STRING
+ - name: dup_traffic_flag
+ data-type: INT
+ - name: tunnel_id_list
+ data-type: ARRAY<BIGINT>
+ - name: tunnel_endpoint_a_desc
+ data-type: STRING
+ - name: tunnel_endpoint_b_desc
+ data-type: STRING
+ ## SIP
+ - name: sip_call_id
+ data-type: STRING
+ - name: sip_originator_description
+ data-type: STRING
+ - name: sip_responder_description
+ data-type: STRING
+ - name: sip_user_agent
+ data-type: STRING
+ - name: sip_server
+ data-type: STRING
+ - name: sip_originator_sdp_connect_ip
+ data-type: STRING
+ - name: sip_originator_sdp_media_port
+ data-type: INT
+ - name: sip_originator_sdp_media_type
+ data-type: STRING
+ - name: sip_originator_sdp_content
+ data-type: STRING
+ - name: sip_responder_sdp_connect_ip
+ data-type: STRING
+ - name: sip_responder_sdp_media_port
+ data-type: INT
+ - name: sip_responder_sdp_media_type
+ data-type: STRING
+ - name: sip_responder_sdp_content
+ data-type: STRING
+ - name: sip_duration_s
+ data-type: INT
+ - name: sip_bye
+ data-type: STRING
+ ## RTP
+ - name: rtp_payload_type_c2s
+ data-type: INT
+ - name: rtp_payload_type_s2c
+ data-type: INT
+ - name: rtp_pcap_path
+ data-type: STRING
+ - name: rtp_originator_dir
+ data-type: INT
+ where:
+ - on: sip-double-way-records.ok
+ key-by: vsys_id, SORT_ADDRESS( sip_originator_sdp_connect_ip, sip_originator_sdp_media_port, sip_responder_sdp_connect_ip, sip_responder_sdp_media_port ) AS address
+ # SIP
+ process:
+ - SET sip FROM withColumns(recv_time to rtp_originator_dir)
+ - if: '@rtp.isNotNull && @rtp.cardinality > 0'
+ then:
+ - |-
+ FLAT OUTPUT ok FOR i IN rtp FROM
+ @i.$recv_time AS recv_time,
+ @i.$log_id AS log_id,
+ 'VoIP' AS decode_as,
+ @i.$session_id AS session_id,
+ @i.$start_timestamp_ms AS start_timestamp_ms,
+ @i.$end_timestamp_ms AS end_timestamp_ms,
+ @i.$duration_ms AS duration_ms,
+ @i.$tcp_handshake_latency_ms AS tcp_handshake_latency_ms,
+ @i.$ingestion_time AS ingestion_time,
+ @i.$processing_time AS processing_time,
+ @i.$insert_time AS insert_time,
+ @i.$device_id AS device_id,
+ @i.$out_link_id AS out_link_id,
+ @i.$in_link_id AS in_link_id,
+ @i.$device_tag AS device_tag,
+ @i.$data_center AS data_center,
+ @i.$device_group AS device_group,
+ @i.$sled_ip AS sled_ip,
+ @i.$address_type AS address_type,
+ @i.$direction AS direction,
+ @i.$vsys_id AS vsys_id,
+ @i.$t_vsys_id AS t_vsys_id,
+ @i.$flags AS flags,
+ @i.$flags_identify_info AS flags_identify_info,
+
+ @i.$c2s_ttl AS c2s_ttl,
+ @i.$s2c_ttl AS s2c_ttl,
+
+ @i.$client_ip AS client_ip,
+ @i.$client_port AS client_port,
+ @i.$client_os_desc AS client_os_desc,
+ @i.$client_geolocation AS client_geolocation,
+ @i.$client_country AS client_country,
+ @i.$client_super_administrative_area AS client_super_administrative_area,
+ @i.$client_administrative_area AS client_administrative_area,
+ @i.$client_sub_administrative_area AS client_sub_administrative_area,
+ @i.$client_asn AS client_asn,
+
+ @i.$server_ip AS server_ip,
+ @i.$server_port AS server_port,
+ @i.$server_os_desc AS server_os_desc,
+ @i.$server_geolocation AS server_geolocation,
+ @i.$server_country AS server_country,
+ @i.$server_super_administrative_area AS server_super_administrative_area,
+ @i.$server_administrative_area AS server_administrative_area,
+ @i.$server_sub_administrative_area AS server_sub_administrative_area,
+ @i.$server_asn AS server_asn,
+
+ @i.$ip_protocol AS ip_protocol,
+
+ @i.$sent_pkts AS sent_pkts,
+ @i.$received_pkts AS received_pkts,
+ @i.$sent_bytes AS sent_bytes,
+ @i.$received_bytes AS received_bytes,
+
+ @i.$sip_call_id AS sip_call_id,
+ @i.$sip_originator_description AS sip_originator_description,
+ @i.$sip_responder_description AS sip_responder_description,
+ @i.$sip_user_agent AS sip_user_agent,
+ @i.$sip_server AS sip_server,
+ @i.$sip_originator_sdp_connect_ip AS sip_originator_sdp_connect_ip,
+ @i.$sip_originator_sdp_media_port AS sip_originator_sdp_media_port,
+ @i.$sip_originator_sdp_media_type AS sip_originator_sdp_media_type,
+ @i.$sip_originator_sdp_content AS sip_originator_sdp_content,
+ @i.$sip_responder_sdp_connect_ip AS sip_responder_sdp_connect_ip,
+ @i.$sip_responder_sdp_media_port AS sip_responder_sdp_media_port,
+ @i.$sip_responder_sdp_media_type AS sip_responder_sdp_media_type,
+ @i.$sip_responder_sdp_content AS sip_responder_sdp_content,
+ @i.$sip_duration_s AS sip_duration_s,
+ @i.$sip_bye AS sip_bye,
+ @i.$rtp_payload_type_c2s AS rtp_payload_type_c2s,
+ @i.$rtp_payload_type_s2c AS rtp_payload_type_s2c,
+ @i.$rtp_pcap_path AS rtp_pcap_path,
+ ( @i.$client_ip == sip_originator_sdp_connect_ip).?(1, (@i.$client_ip == sip_responder_sdp_connect_ip).?(2, 0) ) AS rtp_originator_dir
+ - TRUNCATE rtp
+ # TODO USE EVENT
+ - SCHEDULING USING PROCESS TIME FOR NOW + 6 * 60 * 1000
+ - on: rtp-records
+ key-by: vsys_id, SORT_ADDRESS( client_ip, client_port, server_ip, server_port ) AS address
+ process:
+ - APPEND rtp FROM withColumns(recv_time to rtp_originator_dir)
+ - if: '@sip.isNotNull'
+ then:
+ - |-
+ FLAT OUTPUT ok FOR i IN rtp FROM
+ @i.$recv_time AS recv_time,
+ @i.$log_id AS log_id,
+ 'VoIP' AS decode_as,
+ @i.$session_id AS session_id,
+ @i.$start_timestamp_ms AS start_timestamp_ms,
+ @i.$end_timestamp_ms AS end_timestamp_ms,
+ @i.$duration_ms AS duration_ms,
+ @i.$tcp_handshake_latency_ms AS tcp_handshake_latency_ms,
+ @i.$ingestion_time AS ingestion_time,
+ @i.$processing_time AS processing_time,
+ @i.$insert_time AS insert_time,
+ @i.$device_id AS device_id,
+ @i.$out_link_id AS out_link_id,
+ @i.$in_link_id AS in_link_id,
+ @i.$device_tag AS device_tag,
+ @i.$data_center AS data_center,
+ @i.$device_group AS device_group,
+ @i.$sled_ip AS sled_ip,
+ @i.$address_type AS address_type,
+ @i.$direction AS direction,
+ @i.$vsys_id AS vsys_id,
+ @i.$t_vsys_id AS t_vsys_id,
+ @i.$flags AS flags,
+ @i.$flags_identify_info AS flags_identify_info,
+
+ @i.$c2s_ttl AS c2s_ttl,
+ @i.$s2c_ttl AS s2c_ttl,
+
+ @i.$client_ip AS client_ip,
+ @i.$client_port AS client_port,
+ @i.$client_os_desc AS client_os_desc,
+ @i.$client_geolocation AS client_geolocation,
+ @i.$client_country AS client_country,
+ @i.$client_super_administrative_area AS client_super_administrative_area,
+ @i.$client_administrative_area AS client_administrative_area,
+ @i.$client_sub_administrative_area AS client_sub_administrative_area,
+ @i.$client_asn AS client_asn,
+
+ @i.$server_ip AS server_ip,
+ @i.$server_port AS server_port,
+ @i.$server_os_desc AS server_os_desc,
+ @i.$server_geolocation AS server_geolocation,
+ @i.$server_country AS server_country,
+ @i.$server_super_administrative_area AS server_super_administrative_area,
+ @i.$server_administrative_area AS server_administrative_area,
+ @i.$server_sub_administrative_area AS server_sub_administrative_area,
+ @i.$server_asn AS server_asn,
+
+ @i.$ip_protocol AS ip_protocol,
+
+ @i.$sent_pkts AS sent_pkts,
+ @i.$received_pkts AS received_pkts,
+ @i.$sent_bytes AS sent_bytes,
+ @i.$received_bytes AS received_bytes,
+
+ @i.$sip_call_id AS sip_call_id,
+ @i.$sip_originator_description AS sip_originator_description,
+ @i.$sip_responder_description AS sip_responder_description,
+ @i.$sip_user_agent AS sip_user_agent,
+ @i.$sip_server AS sip_server,
+ @i.$sip_originator_sdp_connect_ip AS sip_originator_sdp_connect_ip,
+ @i.$sip_originator_sdp_media_port AS sip_originator_sdp_media_port,
+ @i.$sip_originator_sdp_media_type AS sip_originator_sdp_media_type,
+ @i.$sip_originator_sdp_content AS sip_originator_sdp_content,
+ @i.$sip_responder_sdp_connect_ip AS sip_responder_sdp_connect_ip,
+ @i.$sip_responder_sdp_media_port AS sip_responder_sdp_media_port,
+ @i.$sip_responder_sdp_media_type AS sip_responder_sdp_media_type,
+ @i.$sip_responder_sdp_content AS sip_responder_sdp_content,
+ @i.$sip_duration_s AS sip_duration_s,
+ @i.$sip_bye AS sip_bye,
+ @i.$rtp_payload_type_c2s AS rtp_payload_type_c2s,
+ @i.$rtp_payload_type_s2c AS rtp_payload_type_s2c,
+ @i.$rtp_pcap_path AS rtp_pcap_path,
+ ( @i.$client_ip == sip_originator_sdp_connect_ip).?(1, (@i.$client_ip == sip_responder_sdp_connect_ip).?(2, 0) ) AS rtp_originator_dir
+ - SCHEDULING USING PROCESS TIME FOR NOW + 6 * 60 * 1000
+ schedule:
+ - if: '@rtp.isNotNull && @rtp.cardinality > 0'
+ then:
+ - |-
+ FLAT OUTPUT fail FOR i IN rtp FROM @i.$recv_time AS recv_time,
+ @i.$log_id AS log_id,
+ @i.$decoded_as AS decoded_as,
+ @i.$session_id AS session_id,
+ @i.$start_timestamp_ms AS start_timestamp_ms,
+ @i.$start_timestamp AS start_timestamp,
+ @i.$end_timestamp_ms AS end_timestamp_ms,
+ @i.$duration_ms AS duration_ms,
+ @i.$tcp_handshake_latency_ms AS tcp_handshake_latency_ms,
+ @i.$ingestion_time AS ingestion_time,
+ @i.$processing_time AS processing_time,
+ @i.$insert_time AS insert_time,
+ @i.$device_id AS device_id,
+ @i.$out_link_id AS out_link_id,
+ @i.$in_link_id AS in_link_id,
+ @i.$device_tag AS device_tag,
+ @i.$data_center AS data_center,
+ @i.$device_group AS device_group,
+ @i.$sled_ip AS sled_ip,
+ @i.$address_type AS address_type,
+ @i.$direction AS direction,
+ @i.$vsys_id AS vsys_id,
+ @i.$t_vsys_id AS t_vsys_id,
+ @i.$flags AS flags,
+ @i.$flags_identify_info AS flags_identify_info,
+ @i.$c2s_ttl AS c2s_ttl,
+ @i.$s2c_ttl AS s2c_ttl,
+ @i.$security_rule_list AS security_rule_list,
+ @i.$security_action AS security_action,
+ @i.$monitor_rule_list AS monitor_rule_list,
+ @i.$shaping_rule_list AS shaping_rule_list,
+ @i.$proxy_rule_list AS proxy_rule_list,
+ @i.$statistics_rule_list AS statistics_rule_list,
+ @i.$sc_rule_list AS sc_rule_list,
+ @i.$sc_rsp_raw AS sc_rsp_raw,
+ @i.$sc_rsp_decrypted AS sc_rsp_decrypted,
+ @i.$proxy_action AS proxy_action,
+ @i.$proxy_pinning_status AS proxy_pinning_status,
+ @i.$proxy_intercept_status AS proxy_intercept_status,
+ @i.$proxy_passthrough_reason AS proxy_passthrough_reason,
+ @i.$proxy_client_side_latency_ms AS proxy_client_side_latency_ms,
+ @i.$proxy_server_side_latency_ms AS proxy_server_side_latency_ms,
+ @i.$proxy_client_side_version AS proxy_client_side_version,
+ @i.$proxy_server_side_version AS proxy_server_side_version,
+ @i.$proxy_cert_verify AS proxy_cert_verify,
+ @i.$proxy_intercept_error AS proxy_intercept_error,
+ @i.$monitor_mirrored_pkts AS monitor_mirrored_pkts,
+ @i.$monitor_mirrored_bytes AS monitor_mirrored_bytes,
+ @i.$client_ip AS client_ip,
+ @i.$client_port AS client_port,
+ @i.$client_os_desc AS client_os_desc,
+ @i.$client_geolocation AS client_geolocation,
+ @i.$client_country AS client_country,
+ @i.$client_super_administrative_area AS client_super_administrative_area,
+ @i.$client_administrative_area AS client_administrative_area,
+ @i.$client_sub_administrative_area AS client_sub_administrative_area,
+ @i.$client_asn AS client_asn,
+ @i.$subscriber_id AS subscriber_id,
+ @i.$imei AS imei,
+ @i.$imsi AS imsi,
+ @i.$phone_number AS phone_number,
+ @i.$apn AS apn,
+ @i.$server_ip AS server_ip,
+ @i.$server_port AS server_port,
+ @i.$server_os_desc AS server_os_desc,
+ @i.$server_geolocation AS server_geolocation,
+ @i.$server_country AS server_country,
+ @i.$server_super_administrative_area AS server_super_administrative_area,
+ @i.$server_administrative_area AS server_administrative_area,
+ @i.$server_sub_administrative_area AS server_sub_administrative_area,
+ @i.$server_asn AS server_asn,
+ @i.$server_fqdn AS server_fqdn,
+ @i.$server_domain AS server_domain,
+ @i.$fqdn_category_list AS fqdn_category_list,
+ @i.$app_transition AS app_transition,
+ @i.$app AS app,
+ @i.$app_category AS app_category,
+ @i.$app_debug_info AS app_debug_info,
+ @i.$app_content AS app_content,
+ @i.$app_extra_info AS app_extra_info,
+ @i.$ip_protocol AS ip_protocol,
+ @i.$decoded_path AS decoded_path,
+ @i.$sent_pkts AS sent_pkts,
+ @i.$received_pkts AS received_pkts,
+ @i.$sent_bytes AS sent_bytes,
+ @i.$received_bytes AS received_bytes,
+ @i.$tcp_c2s_ip_fragments AS tcp_c2s_ip_fragments,
+ @i.$tcp_s2c_ip_fragments AS tcp_s2c_ip_fragments,
+ @i.$tcp_c2s_lost_bytes AS tcp_c2s_lost_bytes,
+ @i.$tcp_s2c_lost_bytes AS tcp_s2c_lost_bytes,
+ @i.$tcp_c2s_o3_pkts AS tcp_c2s_o3_pkts,
+ @i.$tcp_s2c_o3_pkts AS tcp_s2c_o3_pkts,
+ @i.$tcp_c2s_rtx_pkts AS tcp_c2s_rtx_pkts,
+ @i.$tcp_s2c_rtx_pkts AS tcp_s2c_rtx_pkts,
+ @i.$tcp_c2s_rtx_bytes AS tcp_c2s_rtx_bytes,
+ @i.$tcp_s2c_rtx_bytes AS tcp_s2c_rtx_bytes,
+ @i.$tcp_rtt_ms AS tcp_rtt_ms,
+ @i.$tcp_client_isn AS tcp_client_isn,
+ @i.$tcp_server_isn AS tcp_server_isn,
+ @i.$packet_capture_file AS packet_capture_file,
+ @i.$in_src_mac AS in_src_mac,
+ @i.$out_src_mac AS out_src_mac,
+ @i.$in_dest_mac AS in_dest_mac,
+ @i.$out_dest_mac AS out_dest_mac,
+ @i.$encapsulation AS encapsulation,
+ @i.$dup_traffic_flag AS dup_traffic_flag,
+ @i.$tunnel_id_list AS tunnel_id_list,
+ @i.$tunnel_endpoint_a_desc AS tunnel_endpoint_a_desc,
+ @i.$tunnel_endpoint_b_desc AS tunnel_endpoint_b_desc,
+ @i.$sip_call_id AS sip_call_id,
+ @i.$sip_originator_description AS sip_originator_description,
+ @i.$sip_responder_description AS sip_responder_description,
+ @i.$sip_user_agent AS sip_user_agent,
+ @i.$sip_server AS sip_server,
+ @i.$sip_originator_sdp_connect_ip AS sip_originator_sdp_connect_ip,
+ @i.$sip_originator_sdp_media_port AS sip_originator_sdp_media_port,
+ @i.$sip_originator_sdp_media_type AS sip_originator_sdp_media_type,
+ @i.$sip_originator_sdp_content AS sip_originator_sdp_content,
+ @i.$sip_responder_sdp_connect_ip AS sip_responder_sdp_connect_ip,
+ @i.$sip_responder_sdp_media_port AS sip_responder_sdp_media_port,
+ @i.$sip_responder_sdp_media_type AS sip_responder_sdp_media_type,
+ @i.$sip_responder_sdp_content AS sip_responder_sdp_content,
+ @i.$sip_duration_s AS sip_duration_s,
+ @i.$sip_bye AS sip_bye,
+ @i.$rtp_payload_type_c2s AS rtp_payload_type_c2s,
+ @i.$rtp_payload_type_s2c AS rtp_payload_type_s2c,
+ @i.$rtp_pcap_path AS rtp_pcap_path,
+ @i.$rtp_originator_dir AS rtp_originator_dir
+ - TRUNCATE rtp
+ - if: '@sip.isNotNull'
+ then:
+ - |-
+ OUTPUT fail FROM @sip.$recv_time AS recv_time,
+ @sip.$log_id AS log_id,
+ @sip.$decoded_as AS decoded_as,
+ @sip.$session_id AS session_id,
+ @sip.$start_timestamp_ms AS start_timestamp_ms,
+ @sip.$start_timestamp AS start_timestamp,
+ @sip.$end_timestamp_ms AS end_timestamp_ms,
+ @sip.$duration_ms AS duration_ms,
+ @sip.$tcp_handshake_latency_ms AS tcp_handshake_latency_ms,
+ @sip.$ingestion_time AS ingestion_time,
+ @sip.$processing_time AS processing_time,
+ @sip.$insert_time AS insert_time,
+ @sip.$device_id AS device_id,
+ @sip.$out_link_id AS out_link_id,
+ @sip.$in_link_id AS in_link_id,
+ @sip.$device_tag AS device_tag,
+ @sip.$data_center AS data_center,
+ @sip.$device_group AS device_group,
+ @sip.$sled_ip AS sled_ip,
+ @sip.$address_type AS address_type,
+ @sip.$direction AS direction,
+ @sip.$vsys_id AS vsys_id,
+ @sip.$t_vsys_id AS t_vsys_id,
+ @sip.$flags AS flags,
+ @sip.$flags_identify_info AS flags_identify_info,
+ @sip.$c2s_ttl AS c2s_ttl,
+ @sip.$s2c_ttl AS s2c_ttl,
+ @sip.$security_rule_list AS security_rule_list,
+ @sip.$security_action AS security_action,
+ @sip.$monitor_rule_list AS monitor_rule_list,
+ @sip.$shaping_rule_list AS shaping_rule_list,
+ @sip.$proxy_rule_list AS proxy_rule_list,
+ @sip.$statistics_rule_list AS statistics_rule_list,
+ @sip.$sc_rule_list AS sc_rule_list,
+ @sip.$sc_rsp_raw AS sc_rsp_raw,
+ @sip.$sc_rsp_decrypted AS sc_rsp_decrypted,
+ @sip.$proxy_action AS proxy_action,
+ @sip.$proxy_pinning_status AS proxy_pinning_status,
+ @sip.$proxy_intercept_status AS proxy_intercept_status,
+ @sip.$proxy_passthrough_reason AS proxy_passthrough_reason,
+ @sip.$proxy_client_side_latency_ms AS proxy_client_side_latency_ms,
+ @sip.$proxy_server_side_latency_ms AS proxy_server_side_latency_ms,
+ @sip.$proxy_client_side_version AS proxy_client_side_version,
+ @sip.$proxy_server_side_version AS proxy_server_side_version,
+ @sip.$proxy_cert_verify AS proxy_cert_verify,
+ @sip.$proxy_intercept_error AS proxy_intercept_error,
+ @sip.$monitor_mirrored_pkts AS monitor_mirrored_pkts,
+ @sip.$monitor_mirrored_bytes AS monitor_mirrored_bytes,
+ @sip.$client_ip AS client_ip,
+ @sip.$client_port AS client_port,
+ @sip.$client_os_desc AS client_os_desc,
+ @sip.$client_geolocation AS client_geolocation,
+ @sip.$client_country AS client_country,
+ @sip.$client_super_administrative_area AS client_super_administrative_area,
+ @sip.$client_administrative_area AS client_administrative_area,
+ @sip.$client_sub_administrative_area AS client_sub_administrative_area,
+ @sip.$client_asn AS client_asn,
+ @sip.$subscriber_id AS subscriber_id,
+ @sip.$imei AS imei,
+ @sip.$imsi AS imsi,
+ @sip.$phone_number AS phone_number,
+ @sip.$apn AS apn,
+ @sip.$server_ip AS server_ip,
+ @sip.$server_port AS server_port,
+ @sip.$server_os_desc AS server_os_desc,
+ @sip.$server_geolocation AS server_geolocation,
+ @sip.$server_country AS server_country,
+ @sip.$server_super_administrative_area AS server_super_administrative_area,
+ @sip.$server_administrative_area AS server_administrative_area,
+ @sip.$server_sub_administrative_area AS server_sub_administrative_area,
+ @sip.$server_asn AS server_asn,
+ @sip.$server_fqdn AS server_fqdn,
+ @sip.$server_domain AS server_domain,
+ @sip.$fqdn_category_list AS fqdn_category_list,
+ @sip.$app_transition AS app_transition,
+ @sip.$app AS app,
+ @sip.$app_category AS app_category,
+ @sip.$app_debug_info AS app_debug_info,
+ @sip.$app_content AS app_content,
+ @sip.$app_extra_info AS app_extra_info,
+ @sip.$ip_protocol AS ip_protocol,
+ @sip.$decoded_path AS decoded_path,
+ @sip.$sent_pkts AS sent_pkts,
+ @sip.$received_pkts AS received_pkts,
+ @sip.$sent_bytes AS sent_bytes,
+ @sip.$received_bytes AS received_bytes,
+ @sip.$tcp_c2s_ip_fragments AS tcp_c2s_ip_fragments,
+ @sip.$tcp_s2c_ip_fragments AS tcp_s2c_ip_fragments,
+ @sip.$tcp_c2s_lost_bytes AS tcp_c2s_lost_bytes,
+ @sip.$tcp_s2c_lost_bytes AS tcp_s2c_lost_bytes,
+ @sip.$tcp_c2s_o3_pkts AS tcp_c2s_o3_pkts,
+ @sip.$tcp_s2c_o3_pkts AS tcp_s2c_o3_pkts,
+ @sip.$tcp_c2s_rtx_pkts AS tcp_c2s_rtx_pkts,
+ @sip.$tcp_s2c_rtx_pkts AS tcp_s2c_rtx_pkts,
+ @sip.$tcp_c2s_rtx_bytes AS tcp_c2s_rtx_bytes,
+ @sip.$tcp_s2c_rtx_bytes AS tcp_s2c_rtx_bytes,
+ @sip.$tcp_rtt_ms AS tcp_rtt_ms,
+ @sip.$tcp_client_isn AS tcp_client_isn,
+ @sip.$tcp_server_isn AS tcp_server_isn,
+ @sip.$packet_capture_file AS packet_capture_file,
+ @sip.$in_src_mac AS in_src_mac,
+ @sip.$out_src_mac AS out_src_mac,
+ @sip.$in_dest_mac AS in_dest_mac,
+ @sip.$out_dest_mac AS out_dest_mac,
+ @sip.$encapsulation AS encapsulation,
+ @sip.$dup_traffic_flag AS dup_traffic_flag,
+ @sip.$tunnel_id_list AS tunnel_id_list,
+ @sip.$tunnel_endpoint_a_desc AS tunnel_endpoint_a_desc,
+ @sip.$tunnel_endpoint_b_desc AS tunnel_endpoint_b_desc,
+ @sip.$sip_call_id AS sip_call_id,
+ @sip.$sip_originator_description AS sip_originator_description,
+ @sip.$sip_responder_description AS sip_responder_description,
+ @sip.$sip_user_agent AS sip_user_agent,
+ @sip.$sip_server AS sip_server,
+ @sip.$sip_originator_sdp_connect_ip AS sip_originator_sdp_connect_ip,
+ @sip.$sip_originator_sdp_media_port AS sip_originator_sdp_media_port,
+ @sip.$sip_originator_sdp_media_type AS sip_originator_sdp_media_type,
+ @sip.$sip_originator_sdp_content AS sip_originator_sdp_content,
+ @sip.$sip_responder_sdp_connect_ip AS sip_responder_sdp_connect_ip,
+ @sip.$sip_responder_sdp_media_port AS sip_responder_sdp_media_port,
+ @sip.$sip_responder_sdp_media_type AS sip_responder_sdp_media_type,
+ @sip.$sip_responder_sdp_content AS sip_responder_sdp_content,
+ @sip.$sip_duration_s AS sip_duration_s,
+ @sip.$sip_bye AS sip_bye,
+ @sip.$rtp_payload_type_c2s AS rtp_payload_type_c2s,
+ @sip.$rtp_payload_type_s2c AS rtp_payload_type_s2c,
+ @sip.$rtp_pcap_path AS rtp_pcap_path,
+ @sip.$rtp_originator_dir AS rtp_originator_dir
+ - TRUNCATE sip
+ - name: cannot-fusion-records
+ category: UNION
+ on:
+ - sip-double-way-records.fail # 没有双向关联成功的 SIP 单向流日志
+ - voip-fusion.fail # 没有关联上 SIP 的 RTP 日志 & 没关联上 RTP 的 DOUBLE SIP 日志
+ - name: errors-records
+ category: UNION
+ on:
+ - error1-records
+ - error2-records
+ - error3-records
+ - error4-records \ No newline at end of file
diff --git a/src/main/resources/log4j2.properties b/src/main/resources/log4j2.properties
index e7c4ef9..32c696e 100644
--- a/src/main/resources/log4j2.properties
+++ b/src/main/resources/log4j2.properties
@@ -16,7 +16,7 @@
# limitations under the License.
################################################################################
-rootLogger.level = WARN
+rootLogger.level = INFO
rootLogger.appenderRef.console.ref = ConsoleAppender
appender.console.name = ConsoleAppender
diff --git a/src/site/markdown/changelogs.md b/src/site/markdown/changelogs.md
new file mode 100644
index 0000000..aab7b60
--- /dev/null
+++ b/src/site/markdown/changelogs.md
@@ -0,0 +1,5 @@
+## Changelog
+
+### 2.0
+
+- [GAL-602](https://jira.geedge.net/browse/GAL-602) 基于 Easy Stream 框架的配置化改造。 \ No newline at end of file
diff --git a/src/site/markdown/deploy.md b/src/site/markdown/deploy.md
new file mode 100644
index 0000000..5114e9e
--- /dev/null
+++ b/src/site/markdown/deploy.md
@@ -0,0 +1,13 @@
+## Deploy
+
+- 准备 JDK ${java.version} 的环境
+
+- 准备 Flink ${flink.version} 的环境
+
+- [下载](./download.html) 对应版本 UDF 依赖 Jar
+
+- [下载](./download.html) 对应版本 Job 配置 (一个 yml 文件)
+
+- 执行命令 `flink run -Dflink.rest.bind-port=8081 -c com.geedgenetworks.flink.easy.core.Runner path/to/sip-rtp-correlation-<version>.jar job.yml`
+
+- 您将在控制台看到启动日志,同时您可以在 `http://<your-host>:8081` 看到任务 UI。 \ No newline at end of file
diff --git a/src/site/markdown/download.md b/src/site/markdown/download.md
new file mode 100644
index 0000000..72811df
--- /dev/null
+++ b/src/site/markdown/download.md
@@ -0,0 +1,8 @@
+## Download
+
+### Easy Stream ${project.version}
+
+| UDF Jar | Job |
+|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| --------------- |
+| [JAR](${project.distributionManagement.repository.url}/com/geedgenetworks/application/sip-rtp-correlation/${project.version}/${project.artifactId}-${project.version}.jar) ( [MD5](${project.distributionManagement.repository.url}/com/geedgenetworks/application/sip-rtp-correlation/${project.version}/${project.artifactId}-${project.version}.jar.md5) [SHA1](${project.distributionManagement.repository.url}/com/geedgenetworks/application/sip-rtp-correlation/${project.version}/${project.artifactId}-${project.version}.jar.sha1) ) | [YML](${project.distributionManagement.repository.url}/com/geedgenetworks/application/sip-rtp-correlation/${project.version}/${project.artifactId}-${project.version}.tar.gz) ( [MD5](${project.distributionManagement.repository.url}/com/geedgenetworks/application/sip-rtp-correlation/${project.version}/${project.artifactId}-${project.version}.tar.gz.md5) [SHA1](${project.distributionManagement.repository.url}/com/geedgenetworks/application/sip-rtp-correlation/${project.version}/${project.artifactId}-${project.version}.tar.gz.sha1) ) |
+
diff --git a/src/site/markdown/index.md b/src/site/markdown/index.md
new file mode 100644
index 0000000..2bf785c
--- /dev/null
+++ b/src/site/markdown/index.md
@@ -0,0 +1,10 @@
+## SIP RTP Correlation
+
+SIP RTP Correlation 项目是一个使用 Apache Flink 实现的实时数据处理项目,旨在从 Kafka 中读取 SIP(Session Initiation Protocol)和 RTP(Real-time Transport Protocol)数据,将它们融合成完整的 VoIP(Voice over Internet Protocol)通话数据。
+
+SIP RTP Correlation 项目可以用于实时监控和分析 VoIP 通话数据,提取关键指标,以及进行实时报警和诊断。
+
+<br/>
+
+
+You can download the latest release from [Job Yml](./jobs/job.yml), and you can find the changelog in [CHANGELOG.md](./changelogs.html). \ No newline at end of file
diff --git a/src/site/resources/css/site.css b/src/site/resources/css/site.css
new file mode 100644
index 0000000..11a3d99
--- /dev/null
+++ b/src/site/resources/css/site.css
@@ -0,0 +1,13 @@
+#banner {
+ height: 108px;
+ background: none;
+}
+
+#bannerLeft img {
+ margin-left: 18px;
+ margin-top: 10px;
+}
+
+div.well {
+ display: none;
+} \ No newline at end of file
diff --git a/src/site/resources/images/logo.png b/src/site/resources/images/logo.png
new file mode 100644
index 0000000..da2277a
--- /dev/null
+++ b/src/site/resources/images/logo.png
Binary files differ
diff --git a/src/site/site.xml b/src/site/site.xml
new file mode 100644
index 0000000..497fe49
--- /dev/null
+++ b/src/site/site.xml
@@ -0,0 +1,56 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project name="SIP RTP Correlation">
+ <bannerLeft>
+ <name>Easy Stream</name>
+ <src>images/logo.png</src>
+ <href>#</href>
+ </bannerLeft>
+
+ <publishDate position="right"/>
+ <version position="right"/>
+
+ <skin>
+ <groupId>org.apache.maven.skins</groupId>
+ <artifactId>maven-fluido-skin</artifactId>
+ <version>1.10.0</version>
+ </skin>
+
+ <custom>
+ <fluidoSkin>
+ <sourceLineNumbersEnabled>true</sourceLineNumbersEnabled>
+ </fluidoSkin>
+ </custom>
+
+ <body>
+ <breadcrumbs position="left">
+ <item name="Galaxy" href="#"/>
+ <item name="Platform" href="#"/>
+ <item name="Easy Stream" href="#"/>
+ <item name="Application" href="#"/>
+ </breadcrumbs>
+
+ <menu name="OVERVIEW" inherit="top">
+ <item name="Introduction" href="index.html"/>
+ <item name="Deploy" href="deploy.html"/>
+ <item name="Download" href="download.html"/>
+ </menu>
+
+ <footer>
+ <![CDATA[ Copyright ©2022 <a href="#">Galaxy Platform</a>. All rights reserved.]]>
+ </footer>
+ </body>
+</project>
diff --git a/src/test/java/com/geedgenetworks/flink/easy/application/ApplicationTest.java b/src/test/java/com/geedgenetworks/flink/easy/application/ApplicationTest.java
new file mode 100644
index 0000000..0c48a4d
--- /dev/null
+++ b/src/test/java/com/geedgenetworks/flink/easy/application/ApplicationTest.java
@@ -0,0 +1,33 @@
+package com.geedgenetworks.flink.easy.application;
+
+import com.geedgenetworks.flink.easy.core.Runners;
+import org.junit.jupiter.api.Test;
+
+public class ApplicationTest {
+
+ static {
+ System.setProperty("easy.execute.mode", "validate");
+ System.setProperty("flink.rest.bind-port", "8081");
+// System.setProperty("flink.rest.flamegraph.enabled", "true");
+ System.setProperty("flink.heartbeat.timeout", "1800000");
+ }
+
+ public static String discoverConfiguration(final String name) throws Exception {
+ var path = String.format("/jobs/%s.yml", name);
+ var resource = ApplicationTest.class.getResource(path);
+ if (resource == null) {
+ // maven
+ resource = ApplicationTest.class.getResource(String.format("../classes/%s", path));
+ }
+ if (resource == null) {
+ throw new IllegalArgumentException(
+ String.format("Not found job '%s' in path [%s].", name, path));
+ }
+ return resource.getPath();
+ }
+
+ @Test
+ public void testJob() throws Exception {
+ Runners.run(discoverConfiguration("job"));
+ }
+}
diff --git a/src/test/java/com/zdjizhi/flink/voip/conf/FusionConfigurationTest.java b/src/test/java/com/zdjizhi/flink/voip/conf/FusionConfigurationTest.java
deleted file mode 100644
index b45c9a9..0000000
--- a/src/test/java/com/zdjizhi/flink/voip/conf/FusionConfigurationTest.java
+++ /dev/null
@@ -1,67 +0,0 @@
-package com.zdjizhi.flink.voip.conf;
-
-import org.apache.flink.configuration.Configuration;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import java.util.Properties;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-public class FusionConfigurationTest {
-
- private FusionConfiguration fusionConfiguration;
-
- @BeforeEach
- public void setUp() {
- final Configuration config;
- config = new Configuration();
- config.setString("prefix_key1", "value1");
- config.setString("prefix_key2", "value2");
- config.setString("other_key", "other_value");
-
- fusionConfiguration = new FusionConfiguration(config);
- }
-
- @Test
- public void testGetPropertiesWithValidPrefix() {
- String prefix = "prefix_";
- Properties properties = fusionConfiguration.getProperties(prefix);
-
- assertEquals(2, properties.size());
- assertEquals("value1", properties.getProperty("key1"));
- assertEquals("value2", properties.getProperty("key2"));
- }
-
- @Test
- public void testGetPropertiesWithInvalidPrefix() {
- String prefix = "invalid_";
- Properties properties = fusionConfiguration.getProperties(prefix);
-
- assertTrue(properties.isEmpty());
- }
-
- @Test
- public void testGetPropertiesWithEmptyPrefix() {
- String prefix = "";
- Properties properties = fusionConfiguration.getProperties(prefix);
-
- assertEquals(3, properties.size());
- assertEquals("value1", properties.getProperty("prefix_key1"));
- assertEquals("value2", properties.getProperty("prefix_key2"));
- assertEquals("other_value", properties.getProperty("other_key"));
- }
-
- @Test
- public void testGetPropertiesWithNullPrefix() {
- // Null prefix should be treated as an empty prefix
- String prefix = null;
- Properties properties = fusionConfiguration.getProperties(prefix);
-
- assertEquals(3, properties.size());
- assertEquals("value1", properties.getProperty("prefix_key1"));
- assertEquals("value2", properties.getProperty("prefix_key2"));
- assertEquals("other_value", properties.getProperty("other_key"));
- }
-}
diff --git a/src/test/java/com/zdjizhi/flink/voip/data/Generator.java b/src/test/java/com/zdjizhi/flink/voip/data/Generator.java
deleted file mode 100644
index b46c714..0000000
--- a/src/test/java/com/zdjizhi/flink/voip/data/Generator.java
+++ /dev/null
@@ -1,98 +0,0 @@
-package com.zdjizhi.flink.voip.data;
-
-import com.github.javafaker.Faker;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * The abstract class Generator<T> serves as the base class for implementing record generators.
- * It provides common functionalities for generating random data and controlling the generation ratio.
- *
- * @param <T> The type of records to be generated.
- * @author chaoc
- * @since 1.0
- */
-public abstract class Generator<T> {
-
- protected final ThreadLocalRandom random;
- /**
- * The ratio of generated records. The total number of records generated will be 100 / ratio.
- */
- protected final int ratio;
-
- private final Faker faker;
-
- protected final AtomicReference<T> state;
-
- protected final ObjectMapper mapper;
-
- /**
- * Creates a new Generator with the given ratio.
- *
- * @param ratio The ratio of generated records. The total number of records generated will be 100 / ratio.
- */
- public Generator(final int ratio) {
- this.ratio = ratio;
- this.faker = new Faker();
- this.random = ThreadLocalRandom.current();
- this.state = new AtomicReference<>();
- this.mapper = new ObjectMapper();
- }
-
- /**
- * Generates the next record based on the specified ratio and the current state.
- * It randomly selects whether to generate a new record or return the last generated record.
- *
- * @return The next generated record of type T.
- */
- public abstract T next();
-
- /**
- * Generates a new record of type T.
- *
- * @return The newly generated record of type T.
- */
- protected abstract T generate();
-
- /**
- * Performs post-processing on the generated record of type T.
- * Subclasses can override this method to modify the generated record before returning it.
- *
- * @param v The generated record of type T.
- * @return The post-processed record of type T.
- */
- protected abstract T afterState(T v);
-
- /**
- * Generates a random IP address (either IPv4 or IPv6) .
- *
- * @return A randomly generated IP address as a string.
- */
- public final String nextIp() {
- if (random.nextBoolean()) {
- return faker.internet().ipV4Address();
- }
- return faker.internet().ipV6Address();
- }
-
- /**
- * Generates a random ID number.
- *
- * @return A randomly generated ID number as a string.
- */
- public final String nextId() {
- return faker.idNumber().valid();
- }
-
- /**
- * Generates a random port number within the range of 0 to 65535 (inclusive).
- *
- * @return A randomly generated port number as an integer.
- */
- public final int nextPort() {
- return random.nextInt(65535);
- }
-}
-
diff --git a/src/test/java/com/zdjizhi/flink/voip/data/RTPGenerator.java b/src/test/java/com/zdjizhi/flink/voip/data/RTPGenerator.java
deleted file mode 100644
index d997bad..0000000
--- a/src/test/java/com/zdjizhi/flink/voip/data/RTPGenerator.java
+++ /dev/null
@@ -1,111 +0,0 @@
-package com.zdjizhi.flink.voip.data;
-
-import com.zdjizhi.flink.voip.records.Record;
-import com.zdjizhi.flink.voip.records.SIPRecord;
-import com.zdjizhi.flink.voip.records.SchemaType;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
-
-/**
- * RTPGenerator extends Generator<ObjectNode> and is responsible for generating RTP records.
- * It generates random RTP records with specific properties.
- *
- * @author chaoc
- * @since 1.0
- */
-public class RTPGenerator extends Generator<ObjectNode> {
- private final SIPGenerator sipGenerator;
-
- public RTPGenerator(int ratio, SIPGenerator sipGenerator) {
- super(ratio);
- this.sipGenerator = sipGenerator;
- }
-
- @Override
- public ObjectNode next() {
- int i = random.nextInt(100);
- if (i < ratio) {
- final ObjectNode node = sipGenerator.state.get();
- if (null != node) {
- final ObjectNode obj = generate();
- obj.set(Record.F_COMMON_CLIENT_IP, node.get(SIPRecord.F_ORIGINATOR_SDP_CONNECT_IP));
- obj.set(Record.F_COMMON_CLIENT_PORT, node.get(SIPRecord.F_ORIGINATOR_SDP_MEDIA_PORT));
- obj.set(Record.F_COMMON_SERVER_IP, node.get(SIPRecord.F_RESPONDER_SDP_CONNECT_IP));
- obj.set(Record.F_COMMON_SERVER_PORT, node.get(SIPRecord.F_RESPONDER_SDP_MEDIA_PORT));
- return obj;
- }
- }
- return generate();
- }
-
- @Override
- protected ObjectNode generate() {
- final String json = "{\n" +
- " \"common_address_list\": \"42924-36682-172.17.201.16-172.17.200.50\",\n" +
- " \"common_address_type\": 4,\n" +
- " \"common_app_full_path\": \"rtp\",\n" +
- " \"common_app_label\": \"rtp\",\n" +
- " \"common_c2s_byte_num\": 0,\n" +
- " \"common_c2s_pkt_num\": 0,\n" +
- " \"common_client_ip\": \"172.17.201.16\",\n" +
- " \"common_client_port\": 42924,\n" +
- " \"common_con_duration_ms\": 1086,\n" +
- " \"common_device_id\": \"unknown\",\n" +
- " \"common_device_tag\": \"{\\\"tags\\\":[{\\\"tag\\\":\\\"device_group\\\",\\\"value\\\":\\\"group-xxg-tsgxSfcOs\\\"}]}\",\n" +
- " \"common_direction\": 73,\n" +
- " \"common_end_time\": 1689295970,\n" +
- " \"common_flags\": 16401,\n" +
- " \"common_flags_identify_info\": \"{\\\"Asymmetric\\\":5,\\\"Server is Local\\\":1,\\\"S2C\\\":1}\",\n" +
- " \"common_l4_protocol\": \"IPv4_UDP\",\n" +
- " \"common_out_dest_mac\": \"02:fc:08:dc:92:d7\",\n" +
- " \"common_out_src_mac\": \"02:fc:08:dc:91:c3\",\n" +
- " \"common_protocol_label\": \"ETHERNET.IPv4.UDP\",\n" +
- " \"common_s2c_byte_num\": 7570,\n" +
- " \"common_s2c_pkt_num\": 5,\n" +
- " \"common_schema_type\": \"RTP\",\n" +
- " \"common_server_ip\": \"172.17.200.50\",\n" +
- " \"common_server_port\": 36682,\n" +
- " \"common_sessions\": 1,\n" +
- " \"common_sled_ip\": \"192.168.42.54\",\n" +
- " \"common_start_time\": 1689294629,\n" +
- " \"common_stream_dir\": 3,\n" +
- " \"common_stream_trace_id\": \"290484792956466709\",\n" +
- " \"common_t_vsys_id\": 24,\n" +
- " \"common_vsys_id\": 24,\n" +
- " \"raw_log_status\": \"CLOSE\",\n" +
- " \"rtp_payload_type_c2s\": 0,\n" +
- " \"rtp_payload_type_s2c\": 0,\n" +
- " \"rtp_pcap_path\": \"http://192.168.44.67:9098/hos/rtp_hos_bucket/rtp_172.17.200.50_172.17.201.16_27988_55806_1689294629.pcap\"\n" +
- "}";
-
- ObjectNode obj;
- try {
- obj = mapper.readValue(json, ObjectNode.class);
- } catch (Exception e){
- obj = mapper.createObjectNode();
- }
- final SIPRecord record = new SIPRecord(obj);
-
- record.setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.RTP.getValue());
- record.setString(SIPRecord.F_CALL_ID, nextId());
- record.setInt(SIPRecord.F_COMMON_STREAM_DIR, random.nextInt(3) + 1);
-
- record.setString(Record.F_COMMON_SERVER_IP, nextIp());
- record.setInt(Record.F_COMMON_SERVER_PORT, nextPort());
- record.setString(Record.F_COMMON_CLIENT_IP, nextIp());
- record.setInt(Record.F_COMMON_CLIENT_PORT, nextPort());
- return obj;
- }
-
- @Override
- protected ObjectNode afterState(ObjectNode v) {
- final Record record = new Record(v);
- switch (record.getStreamDir()) {
- case DOUBLE:
- record.setInt(Record.F_COMMON_STREAM_DIR, random.nextInt(2) + 1);
- break;
- case S2C:
- default:
- }
- return v;
- }
-}
diff --git a/src/test/java/com/zdjizhi/flink/voip/data/SIPGenerator.java b/src/test/java/com/zdjizhi/flink/voip/data/SIPGenerator.java
deleted file mode 100644
index b86ea99..0000000
--- a/src/test/java/com/zdjizhi/flink/voip/data/SIPGenerator.java
+++ /dev/null
@@ -1,138 +0,0 @@
-package com.zdjizhi.flink.voip.data;
-
-import com.zdjizhi.flink.voip.records.Record;
-import com.zdjizhi.flink.voip.records.SIPRecord;
-import com.zdjizhi.flink.voip.records.SchemaType;
-import com.zdjizhi.flink.voip.records.StreamDir;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
-
-/**
- * SIPGenerator extends Generator<ObjectNode> and is responsible for generating SIP (Session Initiation Protocol) records.
- * It generates random SIP records with specific properties such as call ID, stream dir, server IP, server port,
- * client IP, client port... information for both originator and responder.
- *
- * @author chaoc
- * @since 1.0
- */
-public class SIPGenerator extends Generator<ObjectNode> {
- public SIPGenerator(final int ratio) {
- super(ratio);
- }
-
- /**
- * Creates a new SIPGenerator with the default ratio of 40.
- */
- public SIPGenerator() {
- this(40);
- }
-
- @Override
- public final ObjectNode next() {
- int i = random.nextInt(100);
- if (i < ratio && state.get() != null) {
- ObjectNode t = afterState(state.get());
- state.set(null);
- return t;
- } else {
- return state.updateAndGet(t -> generate());
- }
- }
-
-
- @Override
- protected ObjectNode generate() {
- final String json = "{\n" +
- " \"common_schema_type\": \"SIP\",\n" +
- " \"common_sessions\": 1,\n" +
- " \"sip_call_id\": \"[email protected]\",\n" +
- " \"sip_originator_description\": \"<sip:[email protected]>\",\n" +
- " \"sip_responder_description\": \"sip:[email protected]\",\n" +
- " \"sip_originator_sdp_content\": \"v=0\\r\\no=SIP-UA 3395000 3397200 IN IP4 172.17.200.50\\r\\ns=SIP Call\\r\\nc=IN IP4 172.17.200.50\\r\\nt=0 0\\r\\nm=audio 36682 RTP/AVP 0\\r\\na=rtpmap:0 PCMU/8000\\r\\n\",\n" +
- " \"sip_originator_sdp_connect_ip\": \"172.17.200.50\",\n" +
- " \"sip_originator_sdp_media_port\": 36682,\n" +
- " \"sip_originator_sdp_media_type\": \"0 PCMU/8000\",\n" +
- " \"common_first_ttl\": 128,\n" +
- " \"common_c2s_ipfrag_num\": 0,\n" +
- " \"common_s2c_ipfrag_num\": 0,\n" +
- " \"common_c2s_tcp_unorder_num\": 0,\n" +
- " \"common_s2c_tcp_unorder_num\": 0,\n" +
- " \"common_c2s_tcp_lostlen\": 0,\n" +
- " \"common_s2c_tcp_lostlen\": 0,\n" +
- " \"common_c2s_pkt_retrans\": 2,\n" +
- " \"common_s2c_pkt_retrans\": 0,\n" +
- " \"common_c2s_byte_retrans\": 1100,\n" +
- " \"common_s2c_byte_retrans\": 0,\n" +
- " \"common_direction\": 69,\n" +
- " \"common_app_full_path\": \"sip\",\n" +
- " \"common_app_label\": \"sip\",\n" +
- " \"common_tcp_client_isn\": 3004427198,\n" +
- " \"common_server_ip\": \"172.17.201.16\",\n" +
- " \"common_client_ip\": \"172.17.200.49\",\n" +
- " \"common_server_port\": 5060,\n" +
- " \"common_client_port\": 6948,\n" +
- " \"common_stream_dir\": 1,\n" +
- " \"common_address_type\": 4,\n" +
- " \"common_address_list\": \"6948-5060-172.17.200.50-172.17.201.16\",\n" +
- " \"common_start_time\": 1689295655,\n" +
- " \"common_end_time\": 1689295670,\n" +
- " \"common_con_duration_ms\": 41467,\n" +
- " \"common_s2c_pkt_num\": 0,\n" +
- " \"common_s2c_byte_num\": 0,\n" +
- " \"common_c2s_pkt_num\": 6,\n" +
- " \"common_c2s_byte_num\": 1834,\n" +
- " \"common_establish_latency_ms\": 249,\n" +
- " \"common_out_src_mac\": \"02:fc:08:dc:91:c3\",\n" +
- " \"common_out_dest_mac\": \"02:fc:08:dc:92:d7\",\n" +
- " \"common_flags\": 8201,\n" +
- " \"common_flags_identify_info\": \"{\\\"Asymmetric\\\":6,\\\"Client is Local\\\":1,\\\"C2S\\\":1}\",\n" +
- " \"common_protocol_label\": \"ETHERNET.IPv4.TCP\",\n" +
- " \"common_stream_trace_id\": \"290502385134123507\",\n" +
- " \"common_l4_protocol\": \"IPv4_TCP\",\n" +
- " \"common_sled_ip\": \"192.168.42.54\",\n" +
- " \"common_device_id\": \"unknown\",\n" +
- " \"common_device_tag\": \"{\\\"tags\\\":[{\\\"tag\\\":\\\"device_group\\\",\\\"value\\\":\\\"group-xxg-tsgxSfcOs\\\"}]}\",\n" +
- " \"common_t_vsys_id\": 24,\n" +
- " \"common_vsys_id\": 24\n" +
- "}";
-
- ObjectNode obj;
- try {
- obj = mapper.readValue(json, ObjectNode.class);
- } catch (Exception e) {
- obj = mapper.createObjectNode();
- }
-
- final SIPRecord record = new SIPRecord(obj);
-
- record.setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.SIP.getValue());
-
- record.setString(SIPRecord.F_CALL_ID, nextId() + "@spirent.com");
- record.setInt(SIPRecord.F_COMMON_STREAM_DIR, (random.nextBoolean() ? StreamDir.S2C : StreamDir.C2S).getValue());
-
- record.setString(Record.F_COMMON_SERVER_IP, nextIp());
- record.setInt(Record.F_COMMON_SERVER_PORT, nextPort());
- record.setString(Record.F_COMMON_CLIENT_IP, nextIp());
- record.setInt(Record.F_COMMON_CLIENT_PORT, nextPort());
-
- record.setString(SIPRecord.F_ORIGINATOR_SDP_CONNECT_IP, nextIp());
- record.setInt(SIPRecord.F_ORIGINATOR_SDP_MEDIA_PORT, nextPort());
- record.setString(SIPRecord.F_RESPONDER_SDP_CONNECT_IP, nextIp());
- record.setInt(SIPRecord.F_RESPONDER_SDP_MEDIA_PORT, nextPort());
- return obj;
- }
-
- @Override
- protected ObjectNode afterState(ObjectNode v) {
- final Record record = new Record(v);
- switch (record.getStreamDir()) {
- case C2S:
- record.setInt(Record.F_COMMON_STREAM_DIR, StreamDir.S2C.getValue());
- break;
- case S2C:
- record.setInt(Record.F_COMMON_STREAM_DIR, StreamDir.C2S.getValue());
- break;
- default:
- }
- return v;
- }
-}
diff --git a/src/test/java/com/zdjizhi/flink/voip/functions/AddressTest.java b/src/test/java/com/zdjizhi/flink/voip/functions/AddressTest.java
deleted file mode 100644
index 7eb5544..0000000
--- a/src/test/java/com/zdjizhi/flink/voip/functions/AddressTest.java
+++ /dev/null
@@ -1,48 +0,0 @@
-package com.zdjizhi.flink.voip.functions;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.junit.jupiter.api.Test;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
-public class AddressTest {
-
- @Test
- public void testAddressOfWithDifferentPortNumbers() {
- Tuple2<String, Integer> a1 = Tuple2.of("192.168.1.10", 8080);
- Tuple2<String, Integer> a2 = Tuple2.of("192.168.1.20", 9090);
-
- Address address = Address.of(a1, a2);
-
- assertEquals("192.168.1.10", address.getIp1());
- assertEquals(8080, address.getPort1());
- assertEquals("192.168.1.20", address.getIp2());
- assertEquals(9090, address.getPort2());
- }
-
- @Test
- public void testAddressOfWithSamePortNumber() {
- Tuple2<String, Integer> a1 = Tuple2.of("192.168.1.10", 8080);
- Tuple2<String, Integer> a2 = Tuple2.of("192.168.1.20", 8080);
-
- Address address = Address.of(a1, a2);
-
- assertEquals("192.168.1.10", address.getIp1());
- assertEquals(8080, address.getPort1());
- assertEquals("192.168.1.20", address.getIp2());
- assertEquals(8080, address.getPort2());
- }
-
- @Test
- public void testAddressOfWithInvertedOrder() {
- Tuple2<String, Integer> a1 = Tuple2.of("192.168.1.20", 8080);
- Tuple2<String, Integer> a2 = Tuple2.of("192.168.1.10", 9090);
-
- Address address = Address.of(a1, a2);
-
- assertEquals("192.168.1.20", address.getIp1());
- assertEquals(8080, address.getPort1());
- assertEquals("192.168.1.10", address.getIp2());
- assertEquals(9090, address.getPort2());
- }
-}
diff --git a/src/test/java/com/zdjizhi/flink/voip/functions/SIPPairingFunctionTest.java b/src/test/java/com/zdjizhi/flink/voip/functions/SIPPairingFunctionTest.java
deleted file mode 100644
index 104c20e..0000000
--- a/src/test/java/com/zdjizhi/flink/voip/functions/SIPPairingFunctionTest.java
+++ /dev/null
@@ -1,104 +0,0 @@
-package com.zdjizhi.flink.voip.functions;
-
-import com.zdjizhi.flink.voip.conf.FusionConfigs;
-import com.zdjizhi.flink.voip.data.SIPGenerator;
-import com.zdjizhi.flink.voip.records.Record;
-import com.zdjizhi.flink.voip.records.StreamDir;
-import org.apache.flink.api.common.state.MapState;
-import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
-
-import java.util.Set;
-import java.util.stream.Collectors;
-
-import static org.junit.jupiter.api.Assertions.*;
-
-
-public class SIPPairingFunctionTest {
- private static final int INTERVAL_MINUTES = 5;
- private static KeyedOneInputStreamOperatorTestHarness<Integer, ObjectNode, ObjectNode> testHarness;
-
- @BeforeAll
- static void setUp() throws Exception {
- final SIPPairingFunction func = new SIPPairingFunction();
- final KeyedProcessOperator<Tuple3<Integer, String, Address>, ObjectNode, ObjectNode> operator =
- new KeyedProcessOperator<>(func);
- final TypeInformation<Integer> type = TypeInformation.of(Integer.class);
- final KeySelector<ObjectNode, Integer> keySelector = jsonNodes -> 0;
- testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, keySelector, type);
-
- final Configuration configuration = new Configuration();
- configuration.set(FusionConfigs.SIP_STATE_CLEAR_INTERVAL, INTERVAL_MINUTES);
- testHarness.getExecutionConfig().setGlobalJobParameters(configuration);
- testHarness.open();
- }
-
- @Test
- public void testProperlyPairSIPRecords() throws Exception {
- final SIPGenerator sipGenerator = new SIPGenerator();
- long current = System.currentTimeMillis();
-
- final ObjectNode obj = sipGenerator.next();
- final Record record = new Record(obj);
- record.setInt(Record.F_COMMON_STREAM_DIR, StreamDir.DOUBLE.getValue());
-
- final StreamRecord<ObjectNode> streamRecord = new StreamRecord<>(obj, current);
-
- testHarness.processElement(streamRecord);
- assertTrue(testHarness.getOutput().contains(streamRecord));
-
- long interval = Time.minutes(INTERVAL_MINUTES).toMilliseconds();
- long nextFireTime = (testHarness.getProcessingTime() / interval + 1) * interval;
- assertTrue(testHarness.getProcessingTimeService().getActiveTimerTimestamps().contains(nextFireTime));
-
- final ObjectNode obj1 = obj.deepCopy();
- final Record record1 = new Record(obj1);
- record1.setInt(Record.F_COMMON_STREAM_DIR, StreamDir.S2C.getValue());
-
- final ObjectNode obj2 = obj.deepCopy();
- final Record record2 = new Record(obj2);
- record2.setInt(Record.F_COMMON_STREAM_DIR, StreamDir.C2S.getValue());
-
- final MapStateDescriptor<Address, ObjectNodeInfo> descriptor =
- new MapStateDescriptor<>(
- "sip-state",
- TypeInformation.of(Address.class),
- TypeInformation.of(ObjectNodeInfo.class)
- );
-
- testHarness.processElement(obj1, current);
- final MapState<Address, ObjectNodeInfo> mapState = testHarness.getOperator()
- .getKeyedStateStore().getMapState(descriptor);
- final Set<ObjectNode> objectNodes1 = Lists.newArrayList(mapState.values()).stream()
- .map(ObjectNodeInfo::getObj)
- .collect(Collectors.toSet());
- assertTrue(objectNodes1.contains(obj1));
-
- testHarness.processElement(obj2, current);
- final Set<ObjectNode> objectNodes2 = Lists.newArrayList(mapState.values()).stream()
- .map(ObjectNodeInfo::getObj)
- .collect(Collectors.toSet());
- assertFalse(objectNodes2.contains(obj2));
- assertTrue(testHarness.getOutput().contains(new StreamRecord<>(obj2, current)));
- assertEquals(2, testHarness.getOutput().size());
- }
-
-
- @AfterAll
- static void close() throws Exception {
- testHarness.close();
- }
-
-} \ No newline at end of file
diff --git a/src/test/java/com/zdjizhi/flink/voip/records/RecordTest.java b/src/test/java/com/zdjizhi/flink/voip/records/RecordTest.java
deleted file mode 100644
index 0f22986..0000000
--- a/src/test/java/com/zdjizhi/flink/voip/records/RecordTest.java
+++ /dev/null
@@ -1,70 +0,0 @@
-package com.zdjizhi.flink.voip.records;
-
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.IntNode;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.TextNode;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
-public class RecordTest {
-
- private static ObjectMapper mapper;
-
- @BeforeAll
- static void setUp() {
- mapper = new ObjectMapper();
- }
-
- @Test
- void testGetVSysID() {
- final ObjectNode obj = mapper.createObjectNode();
- final Record record = new Record(obj);
- record.setInt(Record.F_COMMON_VSYS_ID, 42);
- assertEquals(42, record.getVSysID());
-
- obj.set(Record.F_COMMON_VSYS_ID, IntNode.valueOf(40));
- assertEquals(40, record.getVSysID());
- }
-
- @Test
- void testGetSchemaType() {
- final ObjectNode obj = mapper.createObjectNode();
- final Record record = new Record(obj);
- record.setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.RTP.getValue());
- assertEquals(SchemaType.RTP, record.getSchemaType());
-
- obj.set(Record.F_COMMON_SCHEMA_TYPE, TextNode.valueOf(SchemaType.VOIP.getValue()));
- assertEquals(SchemaType.VOIP, record.getSchemaType());
- }
-
- @Test
- void testSetInt() {
- final ObjectNode obj = mapper.createObjectNode();
- final Record record = new Record(obj);
- record.setInt("intField", 123);
- assertEquals(123, obj.get("intField").intValue());
- }
-
- @Test
- void testSetString() {
- final ObjectNode obj = mapper.createObjectNode();
- final Record record = new Record(obj);
- record.setString("stringField", "testValue");
- assertEquals("testValue", obj.get("stringField").textValue());
- }
-
- @Test
- void testMerge() {
- final ObjectNode obj = mapper.createObjectNode();
- obj.set("field1", IntNode.valueOf(1));
- ObjectNode other = mapper.createObjectNode();
- other.set("field2", TextNode.valueOf("value2"));
- Record record = new Record(obj);
- record.merge(other);
- assertEquals(1, obj.get("field1").intValue());
- assertEquals("value2", obj.get("field2").textValue());
- }
-}
diff --git a/src/test/resources/data/session-records.txt b/src/test/resources/data/session-records.txt
new file mode 100644
index 0000000..b54426f
--- /dev/null
+++ b/src/test/resources/data/session-records.txt
@@ -0,0 +1,4 @@
+{"log_id": 438322402985248769,"recv_time":946681200,"__inputid":"tsg_olap","session_id":10240001,"decoded_as":"RTP","ip_protocol":"udp","address_type":4,"client_ip":"192.168.64.8","server_ip":"192.168.39.62","client_port":25524,"server_port":4580,"t_vsys_id":0,"vsys_id":1024,"data_center":"tsg_olap","device_group":"tsg_olap","device_id":"0000000000000000","sled_ip":"127.0.0.1","app":"unknown","app_transition":"","client_geolocation":"unknown","server_geolocation":"unknown","decoded_path":"ETHERNET.IPv4.UDP.rtp","server_fqdn":"","out_src_mac":"00:1A:2B:3C:4D:5E","out_dest_mac":"5E:4D:3C:2B:1A:00","start_timestamp_ms":1715830004000,"end_timestamp_ms":946681200000,"tcp_rtt_ms":0,"tcp_client_isn":0,"tcp_server_isn":0,"tcp_handshake_latency_ms":0,"in_link_id":0,"out_link_id":0,"duration_ms":0,"sent_pkts":0,"sent_bytes":0,"received_pkts":0,"received_bytes":0,"tcp_c2s_ip_fragments":0,"tcp_s2c_ip_fragments":0,"tcp_c2s_rtx_pkts":0,"tcp_c2s_rtx_bytes":0,"tcp_s2c_rtx_pkts":0,"tcp_s2c_rtx_bytes":0,"tcp_c2s_o3_pkts":0,"tcp_s2c_o3_pkts":0,"tcp_c2s_lost_bytes":0,"tcp_s2c_lost_bytes":0,"flags":57620,"flags_identify_info":[1,1],"fqdn_category_list":[0],"monitor_rule_list":[0],"security_rule_list":[0],"sc_rule_list":[0],"shaping_rule_list":[0],"proxy_rule_list":[0],"statistics_rule_list":[0],"monitor_mirrored_pkts":0,"monitor_mirrored_bytes":0,"client_os_desc":"Windows","server_os_desc":"Linux","device_tag":"{\"tags\":[{\"tag\":\"data_center\",\"value\":\"tsg_olap\"},{\"tag\":\"device_group\",\"value\":\"tsg_olap\"}]}","dup_traffic_flag":0,"sc_rsp_raw":[0],"encapsulation":"[{\"tunnels_schema_type\":\"ETHERNET\",\"source_mac\":\"00:1A:2B:3C:4D:5E\",\"destination_mac\":\"5E:4D:3C:2B:1A:00\"}]","rtp_pcap_path":"123e4567-e89b-12d3-a456-426614174005"}
+{"log_id": 438322402985248769,"recv_time":946681200,"__inputid":"tsg_olap","session_id":10240002,"decoded_as":"SIP","ip_protocol":"udp","address_type":4,"client_ip":"10.0.0.1","server_ip":"192.0.2.1","client_port":1000,"server_port":60000,"t_vsys_id":0,"vsys_id":1025,"data_center":"tsg_olap","device_group":"tsg_olap","device_id":"0000000000000000","sled_ip":"127.0.0.1","app":"unknown","app_transition":"","client_geolocation":"unknown","server_geolocation":"unknown","decoded_path":"ETHERNET.IPv4.UDP.sip","server_fqdn":"","out_src_mac":"00:1A:2B:3C:4D:5E","out_dest_mac":"5E:4D:3C:2B:1A:00","start_timestamp_ms":1715830000100,"end_timestamp_ms":946681200000,"tcp_rtt_ms":0,"tcp_client_isn":0,"tcp_server_isn":0,"tcp_handshake_latency_ms":0,"in_link_id":0,"out_link_id":0,"duration_ms":0,"sent_pkts":0,"sent_bytes":0,"received_pkts":0,"received_bytes":0,"tcp_c2s_ip_fragments":0,"tcp_s2c_ip_fragments":0,"tcp_c2s_rtx_pkts":0,"tcp_c2s_rtx_bytes":0,"tcp_s2c_rtx_pkts":0,"tcp_s2c_rtx_bytes":0,"tcp_c2s_o3_pkts":0,"tcp_s2c_o3_pkts":0,"tcp_c2s_lost_bytes":0,"tcp_s2c_lost_bytes":0,"flags":8192,"flags_identify_info":[1,1],"fqdn_category_list":[0],"monitor_rule_list":[0],"security_rule_list":[0],"sc_rule_list":[0],"shaping_rule_list":[0],"proxy_rule_list":[0],"statistics_rule_list":[0],"monitor_mirrored_pkts":0,"monitor_mirrored_bytes":0,"client_os_desc":"Windows","server_os_desc":"Linux","device_tag":"{\"tags\":[{\"tag\":\"data_center\",\"value\":\"tsg_olap\"},{\"tag\":\"device_group\",\"value\":\"tsg_olap\"}]}","dup_traffic_flag":0,"sc_rsp_raw":[0],"encapsulation":"[{\"tunnels_schema_type\":\"ETHERNET\",\"source_mac\":\"00:1A:2B:3C:4D:5E\",\"destination_mac\":\"5E:4D:3C:2B:1A:00\"}]","sip_call_id":"NGMxZWY3Y2NmMzNlNGE3NzJhODgyZDAwM2YyMzQ4NGI.","sip_originator_description":"\"lina\"<sip:[email protected]>;tag=1837055d","sip_responder_description":"\"1075\"<sip:[email 
protected]>","sip_originator_sdp_connect_ip":"192.168.64.85","sip_originator_sdp_media_port":25524,"sip_originator_sdp_media_type":"application/sdp","sip_server":"OpenSIPS (2.4.11 (x86_64/linux))","sip_responder_sdp_connect_ip":"192.168.39.62","sip_responder_sdp_media_port":4580,"sip_responder_sdp_media_type":"application/sdp","sip_duration_s":590,"sip_bye":"responder","sip_cseq":"2 BYE","sip_via":"SIP/2.0/UDP 192.0.2.1:5060;branch=z9hG4bKbe7c.392190f1.0","sip_user_agent":"eyeBeam release 1011d stamp 40820","sip_is_request":0}
+{"log_id": 438322402985248769,"recv_time":946681200,"__inputid":"tsg_olap","session_id":10240003,"decoded_as":"RTP","ip_protocol":"udp","address_type":4,"client_ip":"192.168.164.18","server_ip":"192.168.39.162","client_port":65121,"server_port":4670,"t_vsys_id":0,"vsys_id":1024,"data_center":"tsg_olap","device_group":"tsg_olap","device_id":"0000000000000000","sled_ip":"127.0.0.1","app":"unknown","app_transition":"","client_geolocation":"unknown","server_geolocation":"unknown","decoded_path":"ETHERNET.IPv4.UDP.rtp","server_fqdn":"","out_src_mac":"00:1A:2B:3C:4D:5E","out_dest_mac":"5E:4D:3C:2B:1A:00","start_timestamp_ms":1715830000000,"end_timestamp_ms":946681200000,"tcp_rtt_ms":0,"tcp_client_isn":0,"tcp_server_isn":0,"tcp_handshake_latency_ms":0,"in_link_id":0,"out_link_id":0,"duration_ms":0,"sent_pkts":0,"sent_bytes":0,"received_pkts":0,"received_bytes":0,"tcp_c2s_ip_fragments":0,"tcp_s2c_ip_fragments":0,"tcp_c2s_rtx_pkts":0,"tcp_c2s_rtx_bytes":0,"tcp_s2c_rtx_pkts":0,"tcp_s2c_rtx_bytes":0,"tcp_c2s_o3_pkts":0,"tcp_s2c_o3_pkts":0,"tcp_c2s_lost_bytes":0,"tcp_s2c_lost_bytes":0,"flags":57620,"flags_identify_info":[1,1],"fqdn_category_list":[0],"monitor_rule_list":[0],"security_rule_list":[0],"sc_rule_list":[0],"shaping_rule_list":[0],"proxy_rule_list":[0],"statistics_rule_list":[0],"monitor_mirrored_pkts":0,"monitor_mirrored_bytes":0,"client_os_desc":"Windows","server_os_desc":"Linux","device_tag":"{\"tags\":[{\"tag\":\"data_center\",\"value\":\"tsg_olap\"},{\"tag\":\"device_group\",\"value\":\"tsg_olap\"}]}","dup_traffic_flag":0,"sc_rsp_raw":[0],"encapsulation":"[{\"tunnels_schema_type\":\"ETHERNET\",\"source_mac\":\"00:1A:2B:3C:4D:5E\",\"destination_mac\":\"5E:4D:3C:2B:1A:00\"}]","rtp_pcap_path":"123e4567-e89b-12d3-a456-426614174005"}
+{"log_id": 438322402985248769,"recv_time":946681200,"__inputid":"tsg_olap","session_id":10240004,"decoded_as":"SIP","ip_protocol":"udp","address_type":4,"client_ip":"10.0.0.2","server_ip":"192.0.2.2","client_port":1000,"server_port":60000,"t_vsys_id":0,"vsys_id":1024,"data_center":"tsg_olap","device_group":"tsg_olap","device_id":"0000000000000000","sled_ip":"127.0.0.1","app":"unknown","app_transition":"","client_geolocation":"unknown","server_geolocation":"unknown","decoded_path":"ETHERNET.IPv4.UDP.sip","server_fqdn":"","out_src_mac":"00:1A:2B:3C:4D:5E","out_dest_mac":"5E:4D:3C:2B:1A:00","start_timestamp_ms":1715830000000,"end_timestamp_ms":946681200000,"tcp_rtt_ms":0,"tcp_client_isn":0,"tcp_server_isn":0,"tcp_handshake_latency_ms":0,"in_link_id":0,"out_link_id":0,"duration_ms":0,"sent_pkts":0,"sent_bytes":0,"received_pkts":0,"received_bytes":0,"tcp_c2s_ip_fragments":0,"tcp_s2c_ip_fragments":0,"tcp_c2s_rtx_pkts":0,"tcp_c2s_rtx_bytes":0,"tcp_s2c_rtx_pkts":0,"tcp_s2c_rtx_bytes":0,"tcp_c2s_o3_pkts":0,"tcp_s2c_o3_pkts":0,"tcp_c2s_lost_bytes":0,"tcp_s2c_lost_bytes":0,"flags":24584,"flags_identify_info":[1,1],"fqdn_category_list":[0],"monitor_rule_list":[0],"security_rule_list":[0],"sc_rule_list":[0],"shaping_rule_list":[0],"proxy_rule_list":[0],"statistics_rule_list":[0],"monitor_mirrored_pkts":0,"monitor_mirrored_bytes":0,"client_os_desc":"Windows","server_os_desc":"Linux","device_tag":"{\"tags\":[{\"tag\":\"data_center\",\"value\":\"tsg_olap\"},{\"tag\":\"device_group\",\"value\":\"tsg_olap\"}]}","dup_traffic_flag":0,"sc_rsp_raw":[0],"encapsulation":"[{\"tunnels_schema_type\":\"ETHERNET\",\"source_mac\":\"00:1A:2B:3C:4D:5E\",\"destination_mac\":\"5E:4D:3C:2B:1A:00\"}]","sip_call_id":"IUMxZWY3Y2NmMzNlNGE3NzJhODgyZDAwM2YyMzQ4NGI.","sip_originator_description":"\"lina\"<sip:[email protected]>;tag=1837055d","sip_responder_description":"\"1075\"<sip:[email 
protected]>","sip_originator_sdp_connect_ip":"192.168.64.8","sip_originator_sdp_media_port":25524,"sip_originator_sdp_media_type":"application/sdp","sip_server":"OpenSIPS (2.4.11 (x86_64/linux))","sip_responder_sdp_connect_ip":"192.168.39.62","sip_responder_sdp_media_port":4580,"sip_responder_sdp_media_type":"application/sdp","sip_duration_s":590,"sip_bye":"responder","sip_cseq":"2 BYE","sip_via":"SIP/2.0/UDP 192.0.2.1:5060;branch=z9hG4bKbe7c.392190f1.0","sip_user_agent":"eyeBeam release 1011d stamp 40820","sip_is_request":0} \ No newline at end of file
diff --git a/tools/dist/target.xml b/tools/dist/target.xml
new file mode 100644
index 0000000..2ea0035
--- /dev/null
+++ b/tools/dist/target.xml
@@ -0,0 +1,44 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<assembly
+ xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
+ <id>bin</id>
+
+ <formats>
+ <format>tar.gz</format>
+ </formats>
+
+ <includeBaseDirectory>false</includeBaseDirectory>
+
+ <fileSets>
+ <fileSet>
+ <directory>src/main/resources/jobs</directory>
+ <includes>
+ <include>*.yml</include>
+ </includes>
+ <fileMode>0755</fileMode>
+ <lineEnding>lf</lineEnding>
+ <directoryMode>0644</directoryMode>
+ <outputDirectory>./</outputDirectory>
+ </fileSet>
+ </fileSets>
+
+</assembly>
diff --git a/tools/maven/checkstyle.xml b/tools/maven/checkstyle.xml
new file mode 100644
index 0000000..78f2137
--- /dev/null
+++ b/tools/maven/checkstyle.xml
@@ -0,0 +1,394 @@
+<?xml version="1.0"?>
+<!DOCTYPE module PUBLIC
+ "-//Puppy Crawl//DTD Check Configuration 1.3//EN"
+ "http://www.puppycrawl.com/dtds/configuration_1_3.dtd">
+
+<!--
+This is a checkstyle configuration file. For descriptions of
+what the following rules do, please see the checkstyle configuration
+page at http://checkstyle.sourceforge.net/config.html.
+-->
+
+<module name="Checker">
+
+ <module name="RegexpSingleline">
+ <!-- Checks that TODOs don't have stuff in parenthesis, e.g., username. -->
+ <property name="format" value="((//.*)|(\*.*))TODO\("/>
+ <property name="message" value="TODO comments must not include usernames."/>
+ <property name="severity" value="error"/>
+ </module>
+
+ <module name="RegexpSingleline">
+ <property name="format" value="\s+$"/>
+ <property name="message" value="Trailing whitespace"/>
+ <property name="severity" value="error"/>
+ </module>
+
+ <module name="RegexpSingleline">
+ <property name="format" value="Throwables.propagate\("/>
+ <property name="message" value="Throwables.propagate is deprecated"/>
+ <property name="severity" value="error"/>
+ </module>
+
+ <!-- Prevent *Tests.java as tools may not pick them up -->
+ <module name="RegexpOnFilename">
+ <property name="fileNamePattern" value=".*Tests\.java$"/>
+ </module>
+
+ <module name="SuppressionFilter">
+ <property name="file" value="${checkstyle.suppressions.file}" default="suppressions.xml"/>
+ </module>
+
+ <module name="FileLength">
+ <property name="max" value="3000"/>
+ </module>
+
+ <!-- All Java AST specific tests live under TreeWalker module. -->
+ <module name="TreeWalker">
+
+ <!-- Allow use of comment to suppress javadocstyle -->
+ <module name="SuppressionCommentFilter">
+ <property name="offCommentFormat" value="CHECKSTYLE.OFF\: ([\w\|]+)"/>
+ <property name="onCommentFormat" value="CHECKSTYLE.ON\: ([\w\|]+)"/>
+ <property name="checkFormat" value="$1"/>
+ </module>
+
+ <!-- Prohibit T.getT() methods for standard boxed types -->
+ <module name="Regexp">
+ <property name="format" value="Boolean\.getBoolean"/>
+ <property name="illegalPattern" value="true"/>
+ <property name="message" value="Use System.getProperties() to get system properties."/>
+ </module>
+
+ <module name="Regexp">
+ <property name="format" value="Integer\.getInteger"/>
+ <property name="illegalPattern" value="true"/>
+ <property name="message" value="Use System.getProperties() to get system properties."/>
+ </module>
+
+ <module name="Regexp">
+ <property name="format" value="Long\.getLong"/>
+ <property name="illegalPattern" value="true"/>
+ <property name="message" value="Use System.getProperties() to get system properties."/>
+ </module>
+
+ <!--
+
+ IllegalImport cannot blacklist classes so we have to fall back to Regexp.
+
+ -->
+
+ <!-- forbid use of commons lang validate -->
+ <module name="Regexp">
+ <property name="format" value="org\.apache\.commons\.lang3\.Validate"/>
+ <property name="illegalPattern" value="true"/>
+ <property name="message"
+ value="Use Guava Checks instead of Commons Validate. Please refer to the coding guidelines."/>
+ </module>
+ <module name="Regexp">
+ <property name="format" value="org\.apache\.commons\.lang\."/>
+ <property name="illegalPattern" value="true"/>
+ <property name="message" value="Use commons-lang3 instead of commons-lang."/>
+ </module>
+ <module name="Regexp">
+ <property name="format" value="org\.codehaus\.jettison"/>
+ <property name="illegalPattern" value="true"/>
+ <property name="message" value="Use flink-shaded-jackson instead of jettison."/>
+ </module>
+ <module name="Regexp">
+ <property name="format" value="org\.testcontainers\.shaded"/>
+ <property name="illegalPattern" value="true"/>
+ <property name="message"
+ value="Use utilities from appropriate library instead of org.testcontainers."/>
+ </module>
+
+ <!-- Enforce Java-style array declarations -->
+ <module name="ArrayTypeStyle"/>
+
+ <module name="TodoComment">
+ <!-- Checks that disallowed strings are not used in comments. -->
+ <property name="format" value="(FIXME)|(XXX)"/>
+ </module>
+
+ <!--
+
+ IMPORT CHECKS
+
+ -->
+
+ <module name="RedundantImport">
+ <!-- Checks for redundant import statements. -->
+ <property name="severity" value="error"/>
+ <message key="import.redundancy"
+ value="Redundant import {0}."/>
+ </module>
+
+ <module name="IllegalImport">
+ <property name="illegalPkgs"
+ value="autovalue.shaded, avro.shaded, com.google.api.client.repackaged, com.google.appengine.repackaged"/>
+ </module>
+ <module name="IllegalImport">
+ <property name="illegalPkgs" value="com.fasterxml.jackson"/>
+ <message key="import.illegal" value="{0}; Use flink-shaded-jackson instead."/>
+ </module>
+ <module name="IllegalImport">
+ <property name="illegalPkgs" value="org.codehaus.jackson"/>
+ <message key="import.illegal" value="{0}; Use flink-shaded-jackson instead."/>
+ </module>
+ <module name="IllegalImport">
+ <property name="illegalPkgs" value="org.objectweb.asm"/>
+ <message key="import.illegal" value="{0}; Use flink-shaded-asm instead."/>
+ </module>
+ <module name="IllegalImport">
+ <property name="illegalPkgs" value="io.netty"/>
+ <message key="import.illegal" value="{0}; Use flink-shaded-netty instead."/>
+ </module>
+
+ <module name="RedundantModifier">
+ <!-- Checks for redundant modifiers on various symbol definitions.
+ See: http://checkstyle.sourceforge.net/config_modifier.html#RedundantModifier
+
+ We exclude METHOD_DEF to allow final methods in final classes to make them more future-proof.
+ -->
+ <property name="tokens"
+ value="VARIABLE_DEF, ANNOTATION_FIELD_DEF, INTERFACE_DEF, CLASS_DEF, ENUM_DEF"/>
+ </module>
+
+ <!--
+ IllegalImport cannot blacklist classes, and c.g.api.client.util is used for some shaded
+ code and some useful code. So we need to fall back to Regexp.
+ -->
+ <module name="RegexpSinglelineJava">
+ <property name="format" value="^import com.google.common.base.Preconditions;$"/>
+ <property name="message" value="Static import functions from Guava Preconditions"/>
+ </module>
+
+ <module name="UnusedImports">
+ <property name="severity" value="error"/>
+ <property name="processJavadoc" value="true"/>
+ <message key="import.unused"
+ value="Unused import: {0}."/>
+ </module>
+
+ <!--
+
+ NAMING CHECKS
+
+ -->
+
+ <!-- Item 38 - Adhere to generally accepted naming conventions -->
+
+ <module name="PackageName">
+ <!-- Validates identifiers for package names against the
+ supplied expression. -->
+ <!-- Here the default checkstyle rule restricts package name parts to
+ seven characters, this is not in line with common practice at Google.
+ -->
+ <property name="format" value="^[a-z]+(\.[a-z][a-z0-9]{1,})*$"/>
+ <property name="severity" value="error"/>
+ </module>
+
+ <module name="TypeNameCheck">
+ <!-- Validates static, final fields against the
+ expression "^[A-Z][a-zA-Z0-9]*$". -->
+ <metadata name="altname" value="TypeName"/>
+ <property name="severity" value="error"/>
+ </module>
+
+ <module name="ConstantNameCheck">
+ <!-- Validates non-private, static, final fields against the supplied
+ public/package final fields "^[A-Z][A-Z0-9]*(_[A-Z0-9]+)*$". -->
+ <metadata name="altname" value="ConstantName"/>
+ <property name="applyToPublic" value="true"/>
+ <property name="applyToProtected" value="true"/>
+ <property name="applyToPackage" value="true"/>
+ <property name="applyToPrivate" value="true"/>
+ <property name="format" value="^([A-Z][A-Z0-9]*(_[A-Z0-9]+)*|FLAG_.*)$"/>
+ <message key="name.invalidPattern"
+ value="Variable ''{0}'' should be in ALL_CAPS (if it is a constant) or be private (otherwise)."/>
+ <property name="severity" value="error"/>
+ </module>
+
+ <module name="StaticVariableNameCheck">
+ <!-- Validates static, non-final fields against the supplied
+ expression "^[a-z][a-zA-Z0-9]*_?$". -->
+ <metadata name="altname" value="StaticVariableName"/>
+ <property name="applyToPublic" value="true"/>
+ <property name="applyToProtected" value="true"/>
+ <property name="applyToPackage" value="true"/>
+ <property name="applyToPrivate" value="true"/>
+ <property name="format" value="^[a-z][a-zA-Z0-9]*_?$"/>
+ <property name="severity" value="error"/>
+ </module>
+
+ <module name="MemberNameCheck">
+ <!-- Validates non-static members against the supplied expression. -->
+ <metadata name="altname" value="MemberName"/>
+ <property name="applyToPublic" value="true"/>
+ <property name="applyToProtected" value="true"/>
+ <property name="applyToPackage" value="true"/>
+ <property name="applyToPrivate" value="true"/>
+ <property name="format" value="^[a-z][a-zA-Z0-9]*$"/>
+ <property name="severity" value="error"/>
+ </module>
+
+ <module name="MethodNameCheck">
+ <!-- Validates identifiers for method names. -->
+ <metadata name="altname" value="MethodName"/>
+ <property name="format" value="^[a-z][a-zA-Z0-9]*(_[a-zA-Z0-9]+)*$"/>
+ <property name="severity" value="error"/>
+ </module>
+
+ <module name="ParameterName">
+ <!-- Validates identifiers for method parameters against the
+ expression "^[a-z][a-zA-Z0-9]*$". -->
+ <property name="severity" value="error"/>
+ </module>
+
+ <module name="LocalFinalVariableName">
+ <!-- Validates identifiers for local final variables against the
+ expression "^[a-z][a-zA-Z0-9]*$". -->
+ <property name="severity" value="error"/>
+ </module>
+
+ <module name="LocalVariableName">
+ <!-- Validates identifiers for local variables against the
+ expression "^[a-z][a-zA-Z0-9]*$". -->
+ <property name="severity" value="error"/>
+ </module>
+
+ <!--
+
+ LENGTH and CODING CHECKS
+
+ -->
+
+ <!-- Checks for braces around if and else blocks -->
+ <module name="NeedBraces">
+ <property name="severity" value="error"/>
+ <property name="tokens"
+ value="LITERAL_IF, LITERAL_ELSE, LITERAL_FOR, LITERAL_WHILE, LITERAL_DO"/>
+ </module>
+
+ <module name="UpperEll">
+ <!-- Checks that long constants are defined with an upper ell.-->
+ <property name="severity" value="error"/>
+ </module>
+
+ <module name="FallThrough">
+ <!-- Warn about falling through to the next case statement. Similar to
+ javac -Xlint:fallthrough, but the check is suppressed if a single-line comment
+ on the last non-blank line preceding the fallen-into case contains 'fall through' (or
+ some other variants that we don't publicized to promote consistency).
+ -->
+ <property name="reliefPattern"
+ value="fall through|Fall through|fallthru|Fallthru|falls through|Falls through|fallthrough|Fallthrough|No break|NO break|no break|continue on"/>
+ <property name="severity" value="error"/>
+ </module>
+
+ <!-- Checks for over-complicated boolean expressions. -->
+ <module name="SimplifyBooleanExpression"/>
+
+ <!-- Detects empty statements (standalone ";" semicolon). -->
+ <module name="EmptyStatement"/>
+
+ <!-- Detect multiple consecutive semicolons (e.g. ";;"). -->
+ <module name="RegexpSinglelineJava">
+ <property name="format" value=";{2,}"/>
+ <property name="message" value="Use one semicolon"/>
+ <property name="ignoreComments" value="true"/>
+ </module>
+
+ <!--
+
+ MODIFIERS CHECKS
+
+ -->
+
+ <module name="ModifierOrder">
+ <!-- Warn if modifier order is inconsistent with JLS3 8.1.1, 8.3.1, and
+ 8.4.3. The prescribed order is:
+ public, protected, private, abstract, static, final, transient, volatile,
+ synchronized, native, strictfp
+ -->
+ <property name="severity" value="error"/>
+ </module>
+
+
+ <!--
+
+ WHITESPACE CHECKS
+
+ -->
+
+ <module name="EmptyLineSeparator">
+ <!-- Checks for empty line separator between tokens. The only
+ excluded token is VARIABLE_DEF, allowing class fields to
+ be declared on consecutive lines.
+ -->
+ <property name="allowMultipleEmptyLines" value="false"/>
+ <property name="allowMultipleEmptyLinesInsideClassMembers" value="false"/>
+ <property name="tokens" value="PACKAGE_DEF, IMPORT, STATIC_IMPORT, CLASS_DEF,
+ INTERFACE_DEF, ENUM_DEF, STATIC_INIT, INSTANCE_INIT, METHOD_DEF,
+ CTOR_DEF"/>
+ </module>
+
+ <module name="SingleSpaceSeparator"/>
+
+ <module name="WhitespaceAround">
+ <!-- Checks that various tokens are surrounded by whitespace.
+ This includes most binary operators and keywords followed
+ by regular or curly braces.
+ -->
+ <property name="tokens" value="ASSIGN, BAND, BAND_ASSIGN, BOR,
+ BOR_ASSIGN, BSR, BSR_ASSIGN, BXOR, BXOR_ASSIGN, COLON, DIV, DIV_ASSIGN,
+ EQUAL, GE, GT, LAMBDA, LAND, LE, LITERAL_CATCH, LITERAL_DO, LITERAL_ELSE,
+ LITERAL_FINALLY, LITERAL_FOR, LITERAL_IF, LITERAL_RETURN,
+ LITERAL_SYNCHRONIZED, LITERAL_TRY, LITERAL_WHILE, LOR, LT, MINUS,
+ MINUS_ASSIGN, MOD, MOD_ASSIGN, NOT_EQUAL, PLUS, PLUS_ASSIGN, QUESTION,
+ SL, SL_ASSIGN, SR_ASSIGN, STAR, STAR_ASSIGN, TYPE_EXTENSION_AND"/>
+ <property name="severity" value="error"/>
+ </module>
+
+ <module name="WhitespaceAfter">
+ <!-- Checks that commas, semicolons and typecasts are followed by
+ whitespace.
+ -->
+ <property name="tokens" value="COMMA, SEMI, TYPECAST"/>
+ </module>
+
+ <module name="NoWhitespaceAfter">
+ <!-- Checks that there is no whitespace after various unary operators.
+ Linebreaks are allowed.
+ -->
+ <property name="tokens" value="BNOT, DEC, DOT, INC, LNOT, UNARY_MINUS,
+ UNARY_PLUS"/>
+ <property name="allowLineBreaks" value="true"/>
+ <property name="severity" value="error"/>
+ </module>
+
+ <module name="NoWhitespaceBefore">
+ <!-- Checks that there is no whitespace before various unary operators.
+ Linebreaks are allowed.
+ -->
+ <property name="tokens" value="SEMI, DOT, POST_DEC, POST_INC"/>
+ <property name="allowLineBreaks" value="true"/>
+ <property name="severity" value="error"/>
+ </module>
+
+ <module name="OperatorWrap">
+ <!-- Checks that assignment operators are at the end of the line. -->
+ <property name="option" value="eol"/>
+ <property name="tokens" value="ASSIGN"/>
+ </module>
+
+ <module name="ParenPad">
+ <!-- Checks that there is no whitespace before close parens or after
+ open parens.
+ -->
+ <property name="severity" value="error"/>
+ </module>
+
+ </module>
+</module>
+
diff --git a/tools/maven/spotbugs-exclude.xml b/tools/maven/spotbugs-exclude.xml
new file mode 100644
index 0000000..3647f17
--- /dev/null
+++ b/tools/maven/spotbugs-exclude.xml
@@ -0,0 +1,18 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<FindBugsFilter>
+ <Match>
+ <Bug pattern="EI_EXPOSE_REP"/>
+ </Match>
+ <Match>
+ <Bug pattern="EI_EXPOSE_REP2"/>
+ </Match>
+ <Match>
+ <Bug pattern="SE_NO_SERIALVERSIONID"/>
+ </Match>
+ <Match>
+ <Bug pattern="SE_TRANSIENT_FIELD_NOT_RESTORED"/>
+ </Match>
+ <Match>
+ <Bug pattern="SQL_PREPARED_STATEMENT_GENERATED_FROM_NONCONSTANT_STRING"/>
+ </Match>
+</FindBugsFilter>
diff --git a/tools/maven/suppressions.xml b/tools/maven/suppressions.xml
new file mode 100644
index 0000000..9c4d424
--- /dev/null
+++ b/tools/maven/suppressions.xml
@@ -0,0 +1,12 @@
+<?xml version="1.0"?>
+<!DOCTYPE suppressions PUBLIC
+ "-//Puppy Crawl//DTD Suppressions 1.1//EN"
+ "http://www.puppycrawl.com/dtds/suppressions_1_1.dtd">
+
+<suppressions>
+ <!-- target directory is not relevant for checkstyle -->
+ <suppress
+ files="[\\/]target[\\/]"
+ checks=".*"/>
+
+</suppressions>