diff options
62 files changed, 3101 insertions, 2471 deletions
diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml new file mode 100644 index 0000000..2959ffc --- /dev/null +++ b/.gitlab-ci.yml @@ -0,0 +1,47 @@ +image: 192.168.40.153:9080/common/maven:3.8.1-openjdk-11-slim-with-git + +variables: + MAVEN_CLI_OPTS: "--batch-mode --errors --show-version" + +stages: + - check + - test + - build + +snapshot-version: + stage: check + script: + - mvn $MAVEN_CLI_OPTS enforcer:enforce@snapshot-version-check + rules: + - if: $CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "develop" && $CI_PIPELINE_SOURCE == "merge_request_event" + +non-snapshot-version: + stage: check + script: + - mvn $MAVEN_CLI_OPTS enforcer:enforce@release-version-check + - |- + if `mvn $MAVEN_CLI_OPTS dependency:get@release-deploy-check > /dev/null 2>&1`; then + echo "The current version has been deployed." + exit 1 + else + echo "The current version has not been deployed." + fi + rules: + - if: $CI_MERGE_REQUEST_TARGET_BRANCH_NAME =~ /(^master$|^release\/)/ && $CI_PIPELINE_SOURCE == "merge_request_event" + +test: + stage: test + script: + - mvn $MAVEN_CLI_OPTS clean test + only: + - merge_requests + +# Used for building snapshot versions on the develop branch. +build: + stage: build + script: + - echo "$MAVEN_SETTINGS_XML" > /usr/share/maven/conf/settings.xml + - mvn clean site deploy -DskipTests + only: + - master + - /^release\//
\ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md deleted file mode 100644 index 8cd08b3..0000000 --- a/CHANGELOG.md +++ /dev/null @@ -1,8 +0,0 @@ -# Changelog - -### Hotfix - - [#5](https://git.mesalab.cn/galaxy/tsg_olap/sip-rtp-correlation/-/issues/5) 修复了由于 IPUtil 在判断 Ipv6 地址没有判空而引起的空指针异常 - -### Feature - - 输出 SIP Record - - [GAL-419](https://jira.geedge.net/browse/GAL-419) 增加配置项 `include.intranet.ip`, 是否对 SIP 协商主叫 IP 或被叫 IP 为内网地址的数据进行关联,不关联则输出到异常 Topic 中。
\ No newline at end of file @@ -17,27 +17,9 @@ mvn clean package 使用以下命令运行Flink任务: ```shell -flink run -c com.zdjizhi.flink.voip.CorrelateApp path/to/sip-rtp-correlation-<version>.jar application.properties +flink run -c com.geedgenetworks.flink.easy.core.Runner path/to/sip-rtp-correlation-<version>.jar job.yml ``` -## 配置项说明 - -| 配置项 | 类型 | 必需 | 默认值 | 描述 | -|----------------------------------| ------------------- | ---------- | ---------------------------------------------------------- |-------------------------------------------| -| source.kafka.topic | STRING | Y | | 将要读取的 Kafka Topic 名称,其包含 SIP 和 RTP 原始数据 | -| source.kafka.props.* | MAP<STRING, STRING> | Y | | 将要读取的 Kafka 的 Properties | -| sink.kafka.topic | STRING | Y | | 将合成的 VoIP 及 未关联成功的 RTP 数据写出的 Kafka Topic 名 | -| sink.kafka.props.* | MAP<STRING, STRING> | Y | | 数据输出的 Kafka 的 Properties | -| error.records.output.enable | BOOLEAN | N | False | 是否开启异常数据的输出 【IP 或 Port 为空】 | -| include.intranet.ip | BOOLEAN | N | True | 是否对 SIP 协商主叫 IP 或被叫 IP 为内网地址的数据进行关联 | -| error.sink.kafka.topic | STRING | N | | 异常数据输出到的 Kafka Topic 名 | -| error.sink.kafka.props.* | MAP<STRING, STRING> | N | | 异常数据输出的 Kafka 的 Properties | -| sip.state.clear.interval.minutes | INT | N | 1 | SIP 单向流关联的窗口大小(单位:分钟) | -| rtp.state.clear.interval.minutes | INT | N | 6 | SIP 和 RTP 关联的窗口大小(单位:分钟) | -| job.name | STRING | N | correlation_sip_rtp_session | Job 名 | - - - ## 贡献 如果您发现任何问题或改进项目的想法,欢迎提交 Issue 或 Pull Request。
\ No newline at end of file @@ -5,23 +5,24 @@ <modelVersion>4.0.0</modelVersion> - <groupId>com.zdjizhi</groupId> + <groupId>com.geedgenetworks.application</groupId> <artifactId>sip-rtp-correlation</artifactId> - <version>1.2.2</version> + <version>2.0-SNAPSHOT</version> <name>Flink : SIP-RTP : Correlation</name> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> - <java.version>1.8</java.version> + <java.version>11</java.version> <maven.compiler.source>${java.version}</maven.compiler.source> <maven.compiler.target>${java.version}</maven.compiler.target> - <flink.version>1.13.6</flink.version> <scala.version>2.12.10</scala.version> <scala.binary.version>2.12</scala.binary.version> + <flink.version>1.13.6</flink.version> + <easy.stream.version>1.3-SNAPSHOT</easy.stream.version> <slf4j.version>1.7.32</slf4j.version> <log4j.version>2.17.1</log4j.version> - <jackson.version>2.13.2.20220328</jackson.version> + <junit.version>5.8.0</junit.version> </properties> <distributionManagement> @@ -34,97 +35,177 @@ <id>platform-snapshots</id> <url>http://192.168.40.153:8099/content/repositories/platform-snapshot</url> </snapshotRepository> + <site> + <id>platform-site</id> + <url> + dav:http://192.168.40.153:8099/content/sites/platform-site/platform/application/sip-rtp-correlate-${project.version} + </url> + </site> </distributionManagement> + <repositories> + <repository> + <id>central</id> + <url>http://192.168.40.153:8099/content/groups/public</url> + </repository> + <repository> + <id>snapshots</id> + <url>http://192.168.40.153:8099/content/groups/public</url> + <snapshots> + <enabled>true</enabled> + </snapshots> + </repository> + </repositories> + <dependencies> <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> - <version>${flink.version}</version> - <scope>provided</scope> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> </dependency> <dependency> - 
<groupId>org.apache.flink</groupId> - <artifactId>flink-clients_${scala.binary.version}</artifactId> - <version>${flink.version}</version> - <scope>provided</scope> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-slf4j-impl</artifactId> </dependency> <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-runtime-web_${scala.binary.version}</artifactId> - <version>${flink.version}</version> - <scope>provided</scope> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-core</artifactId> </dependency> <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId> - <version>${flink.version}</version> + <groupId>com.zdjizhi</groupId> + <artifactId>galaxy</artifactId> </dependency> <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-json</artifactId> - <version>${flink.version}</version> + <groupId>xyz.downgoon</groupId> + <artifactId>snowflake</artifactId> + <version>1.0.0</version> </dependency> + + <!-- Easy Stream --> <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-metrics-influxdb_${scala.binary.version}</artifactId> - <version>${flink.version}</version> + <groupId>com.geedgenetworks.flink</groupId> + <artifactId>easy-stream-common</artifactId> + </dependency> + <dependency> + <groupId>com.geedgenetworks.flink</groupId> + <artifactId>easy-stream-core</artifactId> </dependency> <dependency> - <groupId>com.zdjizhi</groupId> - <artifactId>galaxy</artifactId> + <groupId>com.geedgenetworks.flink</groupId> + <artifactId>easy-stream-grouped-exec-pipeline</artifactId> </dependency> <dependency> - <groupId>org.projectlombok</groupId> - <artifactId>lombok</artifactId> - <version>1.18.26</version> + <groupId>com.geedgenetworks.flink</groupId> + <artifactId>easy-stream-filter-pipeline</artifactId> + </dependency> + 
<dependency> + <groupId>com.geedgenetworks.flink</groupId> + <artifactId>easy-stream-console-pipeline</artifactId> + </dependency> + <dependency> + <groupId>com.geedgenetworks.flink</groupId> + <artifactId>easy-stream-split-pipeline</artifactId> + <version>${easy.stream.version}</version> + </dependency> + <dependency> + <groupId>com.geedgenetworks.flink</groupId> + <artifactId>easy-stream-correlate-pipeline</artifactId> + <version>${easy.stream.version}</version> + </dependency> + <dependency> + <groupId>com.geedgenetworks.flink</groupId> + <artifactId>easy-stream-union-pipeline</artifactId> + <version>${easy.stream.version}</version> </dependency> <dependency> - <groupId>com.github.javafaker</groupId> - <artifactId>javafaker</artifactId> - <scope>test</scope> + <groupId>com.geedgenetworks.flink</groupId> + <artifactId>easy-stream-kafka-connector</artifactId> + </dependency> + <dependency> + <groupId>com.geedgenetworks.flink</groupId> + <artifactId>easy-stream-text-connector</artifactId> + <version>${easy.stream.version}</version> + </dependency> + <dependency> + <groupId>com.geedgenetworks.flink</groupId> + <artifactId>easy-stream-socket-connector</artifactId> + <version>${easy.stream.version}</version> + </dependency> + <dependency> + <groupId>com.geedgenetworks.flink</groupId> + <artifactId>easy-stream-console-connector</artifactId> + <version>${easy.stream.version}</version> </dependency> + + <dependency> + <groupId>com.geedgenetworks.flink</groupId> + <artifactId>easy-stream-json-format</artifactId> + </dependency> + <dependency> + <groupId>com.geedgenetworks.flink</groupId> + <artifactId>easy-stream-flink-shim</artifactId> + </dependency> + + <!-- Flink --> <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-test-utils_${scala.binary.version}</artifactId> - <version>${flink.version}</version> - <scope>test</scope> + <artifactId>flink-clients_${scala.binary.version}</artifactId> + </dependency> + <dependency> + 
<groupId>org.apache.flink</groupId> + <artifactId>flink-core</artifactId> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> - <version>${flink.version}</version> - <classifier>tests</classifier> - <scope>test</scope> </dependency> <dependency> - <groupId>org.junit.jupiter</groupId> - <artifactId>junit-jupiter</artifactId> - <scope>test</scope> + <groupId>org.apache.flink</groupId> + <artifactId>flink-runtime_${scala.binary.version}</artifactId> </dependency> - <dependency> - <groupId>org.apache.logging.log4j</groupId> - <artifactId>log4j-slf4j-impl</artifactId> - <scope>runtime</scope> + <groupId>org.apache.flink</groupId> + <artifactId>flink-runtime-web_${scala.binary.version}</artifactId> </dependency> <dependency> - <groupId>org.apache.logging.log4j</groupId> - <artifactId>log4j-api</artifactId> - <scope>runtime</scope> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-common</artifactId> </dependency> <dependency> - <groupId>org.apache.logging.log4j</groupId> - <artifactId>log4j-core</artifactId> - <scope>runtime</scope> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-runtime-blink_${scala.binary.version}</artifactId> + </dependency> + + <dependency> + <groupId>org.junit.jupiter</groupId> + <artifactId>junit-jupiter-api</artifactId> + <scope>test</scope> </dependency> </dependencies> <dependencyManagement> <dependencies> + <!-- DEV --> + <dependency> + <groupId>com.github.spotbugs</groupId> + <artifactId>spotbugs-annotations</artifactId> + <version>4.4.2</version> + </dependency> + <!-- LOG --> <dependency> <groupId>org.slf4j</groupId> 
<artifactId>slf4j-api</artifactId> @@ -134,30 +215,43 @@ <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-slf4j-impl</artifactId> <version>${log4j.version}</version> + <scope>runtime</scope> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-api</artifactId> <version>${log4j.version}</version> + <scope>runtime</scope> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-core</artifactId> <version>${log4j.version}</version> + <scope>runtime</scope> </dependency> <dependency> <!-- API bridge between log4j 1 and 2 --> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-1.2-api</artifactId> <version>${log4j.version}</version> + <scope>runtime</scope> </dependency> + <!-- Test --> <dependency> - <groupId>com.github.javafaker</groupId> - <artifactId>javafaker</artifactId> - <version>1.0.2</version> + <groupId>org.junit.jupiter</groupId> + <artifactId>junit-jupiter-api</artifactId> + <version>${junit.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.junit.jupiter</groupId> + <artifactId>junit-jupiter</artifactId> + <version>${junit.version}</version> + <scope>test</scope> </dependency> + <!-- Common --> <dependency> <groupId>com.zdjizhi</groupId> <artifactId>galaxy</artifactId> @@ -171,170 +265,236 @@ <groupId>log4j</groupId> <artifactId>log4j</artifactId> </exclusion> + <exclusion> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + </exclusion> </exclusions> </dependency> + <!-- Easy Stream--> <dependency> - <groupId>org.junit.jupiter</groupId> - <artifactId>junit-jupiter</artifactId> - <version>5.8.0</version> - <scope>test</scope> + <groupId>com.geedgenetworks.flink</groupId> + <artifactId>easy-stream-common</artifactId> + <version>${easy.stream.version}</version> + </dependency> + <dependency> + <groupId>com.geedgenetworks.flink</groupId> + <artifactId>easy-stream-core</artifactId> + 
<version>${easy.stream.version}</version> + </dependency> + <dependency> + <groupId>com.geedgenetworks.flink</groupId> + <artifactId>easy-stream-grouped-exec-pipeline</artifactId> + <version>${easy.stream.version}</version> + </dependency> + <dependency> + <groupId>com.geedgenetworks.flink</groupId> + <artifactId>easy-stream-filter-pipeline</artifactId> + <version>${easy.stream.version}</version> + </dependency> + <dependency> + <groupId>com.geedgenetworks.flink</groupId> + <artifactId>easy-stream-console-pipeline</artifactId> + <version>${easy.stream.version}</version> + </dependency> + <dependency> + <groupId>com.geedgenetworks.flink</groupId> + <artifactId>easy-stream-kafka-connector</artifactId> + <version>${easy.stream.version}</version> + </dependency> + <dependency> + <groupId>com.geedgenetworks.flink</groupId> + <artifactId>easy-stream-json-format</artifactId> + <version>${easy.stream.version}</version> + </dependency> + <dependency> + <groupId>com.geedgenetworks.flink</groupId> + <artifactId>easy-stream-flink-shim</artifactId> + <version>${easy.stream.version}</version> + </dependency> + + <!-- Flink --> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-core</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-clients_${scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-runtime_${scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>runtime</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + 
<artifactId>flink-runtime-web_${scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>runtime</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId> + <version>${flink.version}</version> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-json</artifactId> + <version>${flink.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-common</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-runtime-blink_${scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> </dependency> </dependencies> </dependencyManagement> <build> <plugins> - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>build-helper-maven-plugin</artifactId> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-checkstyle-plugin</artifactId> + <version>3.1.2</version> + <configuration> + <suppressionsLocation>${basedir}/tools/maven/suppressions.xml</suppressionsLocation> + <includeTestSourceDirectory>true</includeTestSourceDirectory> + <configLocation>${basedir}/tools/maven/checkstyle.xml</configLocation> + <logViolationsToConsole>true</logViolationsToConsole> + <failOnViolation>true</failOnViolation> + </configuration> + <dependencies> + <dependency> + 
<groupId>com.puppycrawl.tools</groupId> + <artifactId>checkstyle</artifactId> + <version>8.40</version> + </dependency> + </dependencies> <executions> <execution> - <id>test-sources</id> - <phase>generate-test-sources</phase> + <id>java-style-check</id> + <phase>compile</phase> <goals> - <goal>add-test-source</goal> + <goal>check</goal> </goals> <configuration> - <sources> - <source>src/it/java</source> - </sources> + <sourceDirectories>src/main/java</sourceDirectories> </configuration> </execution> <execution> - <id>test-resources</id> - <phase>generate-test-resources</phase> + <id>java-test-style-check</id> + <phase>test-compile</phase> <goals> - <goal>add-test-resource</goal> + <goal>check</goal> </goals> <configuration> - <resources> - <resource> - <directory>src/it/resources</directory> - </resource> - </resources> + <testSourceDirectories>src/test/java</testSourceDirectories> + <includeTestSourceDirectory>true</includeTestSourceDirectory> </configuration> </execution> </executions> </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-compiler-plugin</artifactId> - <version>3.1</version> + <groupId>com.github.spotbugs</groupId> + <artifactId>spotbugs-maven-plugin</artifactId> + <version>4.4.2.2</version> <configuration> - <source>${java.version}</source> - <target>${java.version}</target> + <xmlOutput>true</xmlOutput> + <!-- Low, Medium, High ('Low' is strictest) --> + <threshold>Low</threshold> + <effort>default</effort> + <spotbugsXmlOutputDirectory>${project.build.directory}/spotbugs</spotbugsXmlOutputDirectory> + <excludeFilterFile>${basedir}/tools/maven/spotbugs-exclude.xml</excludeFilterFile> + <failOnError>true</failOnError> </configuration> - </plugin> - - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-dependency-plugin</artifactId> - <executions> - <execution> - <id>copy-tests-dependencies</id> - <phase>package</phase> - <goals> - <goal>copy-dependencies</goal> - </goals> - <configuration> 
- <outputDirectory>${project.build.outputDirectory}/tests-lib</outputDirectory> - <excludeScope>system</excludeScope> - <excludeTransitive>false</excludeTransitive> - </configuration> - </execution> - </executions> - </plugin> - - <plugin> - <groupId>org.jacoco</groupId> - <artifactId>jacoco-maven-plugin</artifactId> <executions> <execution> - <id>pre-unit-test</id> + <id>findbugs-main</id> + <phase>compile</phase> <goals> - <goal>prepare-agent</goal> + <goal>check</goal> </goals> - <configuration> - <destFile>${project.build.directory}/jacoco.exec</destFile> - </configuration> </execution> <execution> - <id>test-report</id> - <phase>verify</phase> + <id>findbugs-test</id> + <phase>test-compile</phase> <goals> - <goal>report</goal> + <goal>check</goal> </goals> <configuration> - <dataFile>${project.build.directory}/jacoco.exec</dataFile> - <outputDirectory>${project.reporting.outputDirectory}/jacoco</outputDirectory> + <includeTests>true</includeTests> </configuration> </execution> </executions> </plugin> - - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-source-plugin</artifactId> - </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-javadoc-plugin</artifactId> - </plugin> - - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-jar-plugin</artifactId> - <version>3.1.0</version> - <executions> - <execution> - <goals> - <goal>test-jar</goal> - </goals> - </execution> - </executions> - </plugin> - - <plugin> - <groupId>io.github.zlika</groupId> - <artifactId>reproducible-build-maven-plugin</artifactId> - <version>0.2</version> - <executions> - <execution> - <goals> - <goal>strip-jar</goal> - </goals> - <phase>package</phase> - </execution> - </executions> + <artifactId>maven-compiler-plugin</artifactId> + <version>3.8.1</version> + <configuration> + <source>${maven.compiler.source}</source> + <target>${maven.compiler.target}</target> + </configuration> </plugin> - <plugin> 
<groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> - <version>3.1.1</version> + <version>3.5.1</version> <executions> <execution> + <id>default-shade</id> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <createDependencyReducedPom>false</createDependencyReducedPom> + <finalName>${project.artifactId}-${project.version}</finalName> <artifactSet> <excludes> <exclude>org.apache.flink:force-shading</exclude> <exclude>com.google.code.findbugs:jsr305</exclude> <exclude>org.slf4j:*</exclude> <exclude>org.apache.logging.log4j:*</exclude> + <exclude>org.mockito:mockito-core</exclude> </excludes> </artifactSet> <filters> <filter> - <!-- Do not copy the signatures in the META-INF folder. - Otherwise, this might cause SecurityExceptions when using the JAR. --> <artifact>*:*</artifact> <excludes> <exclude>META-INF/*.SF</exclude> @@ -343,50 +503,151 @@ </excludes> </filter> </filters> + <transformers> + <transformer + implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> + </transformers> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-assembly-plugin</artifactId> + <version>3.0.0</version> + <executions> + <execution> + <id>build-jobs</id> + <phase>package</phase> + <goals> + <goal>single</goal> + </goals> + <configuration> + <finalName>${project.artifactId}-yml-${project.version}</finalName> + <appendAssemblyId>false</appendAssemblyId> + <descriptors> + <descriptor>tools/dist/target.xml</descriptor> + </descriptors> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-resources-plugin</artifactId> + <version>3.0.0</version> + <executions> + <execution> + <id>site-resources</id> + <phase>pre-site</phase> + <goals> + <goal>resources</goal> + </goals> + <configuration> + <resources> + <resource> + 
<directory>src/site</directory> + <filtering>true</filtering> + <includes> + <include>**</include> + </includes> + </resource> + </resources> </configuration> </execution> </executions> </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-site-plugin</artifactId> + <configuration> + <skip>false</skip> + </configuration> + <executions> + <execution> + <id>default-site</id> + <goals> + <goal>site</goal> + </goals> + <phase>site</phase> + <configuration> + <siteDirectory>${project.build.outputDirectory}</siteDirectory> + </configuration> + </execution> + <execution> + <id>site-deploy</id> + <goals> + <goal>stage-deploy</goal> + </goals> + <phase>deploy</phase> + </execution> + </executions> + </plugin> </plugins> <pluginManagement> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-source-plugin</artifactId> - <version>3.2.1</version> + <artifactId>maven-enforcer-plugin</artifactId> + <version>3.0.0-M3</version> + <executions> + <execution> + <id>release-version-check</id> + <goals> + <goal>enforce</goal> + </goals> + <configuration> + <rules> + <requireReleaseVersion> + <message>SNAPSHOT versions ${project.version} are not allowed.</message> + </requireReleaseVersion> + </rules> + </configuration> + </execution> + <execution> + <id>snapshot-version-check</id> + <goals> + <goal>enforce</goal> + </goals> + <configuration> + <rules> + <requireSnapshotVersion> + <message>Non-SNAPSHOT versions ${project.version} are not allowed.</message> + </requireSnapshotVersion> + </rules> + </configuration> + </execution> + </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-surefire-plugin</artifactId> - <version>3.0.0-M5</version> - </plugin> - <plugin> - <groupId>org.jacoco</groupId> - <artifactId>jacoco-maven-plugin</artifactId> - <version>0.8.7</version> + <artifactId>maven-site-plugin</artifactId> + <version>3.9.1</version> + <configuration> + 
<outputDirectory>${project.build.directory}/site</outputDirectory> + <relativizeDecorationLinks>false</relativizeDecorationLinks> + </configuration> + <dependencies> + <dependency> + <groupId>org.apache.maven.wagon</groupId> + <artifactId>wagon-webdav-jackrabbit</artifactId> + <version>2.8</version> + </dependency> + <dependency> + <groupId>org.apache.maven.doxia</groupId> + <artifactId>doxia-module-markdown</artifactId> + <version>1.9.1</version> + </dependency> + </dependencies> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-javadoc-plugin</artifactId> - <version>3.3.0</version> - </plugin> - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>build-helper-maven-plugin</artifactId> - <version>3.1.0</version> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-jar-plugin</artifactId> - <version>3.1.0</version> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-dependency-plugin</artifactId> - <version>3.6.0</version> + <artifactId>maven-deploy-plugin</artifactId> + <version>3.1.1</version> </plugin> </plugins> </pluginManagement> </build> + </project>
\ No newline at end of file diff --git a/src/it/java/com/zdjizhi/flink/voip/CorrelateTest.java b/src/it/java/com/zdjizhi/flink/voip/CorrelateTest.java deleted file mode 100644 index 398d61a..0000000 --- a/src/it/java/com/zdjizhi/flink/voip/CorrelateTest.java +++ /dev/null @@ -1,95 +0,0 @@ -package com.zdjizhi.flink.voip; - -import com.zdjizhi.flink.voip.functions.*; -import org.apache.flink.api.common.time.Time; -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.utils.ParameterTool; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; -import org.apache.flink.streaming.api.CheckpointingMode; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; -import org.apache.flink.streaming.api.environment.CheckpointConfig; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; - -import java.util.Objects; - -// Integration test main class -public class CorrelateTest { - - public static void main(String[] args) throws Exception { - final Configuration envConfig = new Configuration(); - envConfig.setInteger("rest.port", 18081); - envConfig.setBoolean("rest.flamegraph.enabled", true); - envConfig.setString("metrics.reporter.influxdb.factory.class", "org.apache.flink.metrics.influxdb.InfluxdbReporterFactory"); - envConfig.setString("metrics.reporter.influxdb.scheme", "http"); - envConfig.setString("metrics.reporter.influxdb.host", "127.0.0.1"); - envConfig.setInteger("metrics.reporter.influxdb.port", 8086); - envConfig.setString("metrics.reporter.influxdb.db", "flinks"); - envConfig.setString("metrics.reporter.influxdb.interval", "5 SECONDS"); - - final StreamExecutionEnvironment env = StreamExecutionEnvironment - .createLocalEnvironmentWithWebUI(envConfig); - - final ParameterTool tool = 
ParameterTool.fromPropertiesFile( - args.length > 0 ? args[0] : - Objects.requireNonNull( - CorrelateTest.class.getResource("/application-test.properties")).getPath() - ); - - final CheckpointConfig checkpointConfig = env.getCheckpointConfig(); - env.enableCheckpointing(Time.minutes(1) - .toMilliseconds(), CheckpointingMode.AT_LEAST_ONCE); - checkpointConfig.setCheckpointTimeout(Time.minutes(2).toMilliseconds()); - checkpointConfig.setCheckpointStorage("file://" + System.getProperty("java.io.tmpdir")); - /* checkpointConfig.setCheckpointStorage( - Objects.requireNonNull( - CorrelateTest.class.getResource("/")).toString());*/ - - final Configuration config = tool.getConfiguration(); - env.getConfig().setGlobalJobParameters(config); - env.setParallelism(8); - - final SingleOutputStreamOperator<ObjectNode> sourceStream = env.addSource(new DataGenSource()) - .name("DataGenSource") - .setParallelism(4); - - // Process the data using the TypeSplitFunction and split it into separate DataStreams for SIP and RTP data - final SingleOutputStreamOperator<ObjectNode> splitsRecordsOperator = - sourceStream - .process(new TypeSplitFunction()) - .name("SplitsRecordsBasedSchemaType") - .uid("splits-records-based-schema-type"); - - // Get the DataStreams for SIP and RTP data from the side outputs. - final DataStream<ObjectNode> sipDataStream = splitsRecordsOperator - .getSideOutput(TypeSplitFunction.SIP_OUTPUT_TAG); - - final DataStream<ObjectNode> rtpDataStream = splitsRecordsOperator - .getSideOutput(TypeSplitFunction.RTP_OUTPUT_TAG); - - // Process SIP data to create a double directional stream using SIPPairingFunction. 
- final SingleOutputStreamOperator<ObjectNode> sipDoubleDirOperator = sipDataStream - .keyBy(new SIPKeySelector()) - .process(new SIPPairingFunction()) - .name("PairingOneWayToDoubleStream") - .uid("pairing-one-way-to-double"); - - final KeySelector<ObjectNode, Tuple2<Integer, Address>> vSysSelector = new VSysIDKeySelector(); - - // Fusion SIP data and RTP data to VoIP data. - final SingleOutputStreamOperator<ObjectNode> voIpOperator = rtpDataStream - .keyBy(vSysSelector) - .connect(sipDoubleDirOperator.keyBy(vSysSelector)) - .process(new VoIPFusionFunction()) - .name("VoIPFusion") - .uid("voip-fusion"); - - voIpOperator.addSink(new DoNothingSink()) - .name("DoNothingSink") - .setParallelism(1); - - env.execute("VoIP Fusion Job"); - } -} diff --git a/src/it/java/com/zdjizhi/flink/voip/conf/TestConfigs.java b/src/it/java/com/zdjizhi/flink/voip/conf/TestConfigs.java deleted file mode 100644 index f1b1b6c..0000000 --- a/src/it/java/com/zdjizhi/flink/voip/conf/TestConfigs.java +++ /dev/null @@ -1,22 +0,0 @@ -package com.zdjizhi.flink.voip.conf; - -import org.apache.flink.configuration.ConfigOption; -import org.apache.flink.configuration.ConfigOptions; - -// Test Configs. 
-public class TestConfigs { - - // Ratio of valuable data in the generated dataset - public static final ConfigOption<Integer> VALUABLE_DATA_PROPORTION = - ConfigOptions.key("valuable.data.ratio") - .intType() - .defaultValue(40) - .withDescription("Ratio of valuable data in the generated dataset."); - - // QPS of generate date record - public static final ConfigOption<Long> DATA_GENERATE_RATE = - ConfigOptions.key("data.generate.rate") - .longType() - .defaultValue(1000L) - .withDescription("QPS of generate date record."); -} diff --git a/src/it/java/com/zdjizhi/flink/voip/functions/DataGenSource.java b/src/it/java/com/zdjizhi/flink/voip/functions/DataGenSource.java deleted file mode 100644 index 43e9671..0000000 --- a/src/it/java/com/zdjizhi/flink/voip/functions/DataGenSource.java +++ /dev/null @@ -1,46 +0,0 @@ -package com.zdjizhi.flink.voip.functions; - -import com.zdjizhi.flink.voip.conf.TestConfigs; -import com.zdjizhi.flink.voip.data.RTPGenerator; -import com.zdjizhi.flink.voip.data.SIPGenerator; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.RateLimiter; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; -import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; - -// Date Generate Source. 
-public class DataGenSource extends RichParallelSourceFunction<ObjectNode> implements FunctionHelper { - - private transient SIPGenerator sipGenerator; - private transient RTPGenerator rtpGenerator; - private transient RateLimiter rateLimiter; - private volatile boolean running; - - @Override - public void open(Configuration parameters) throws Exception { - super.open(parameters); - Integer ratio = getGlobalConfiguration() - .get(TestConfigs.VALUABLE_DATA_PROPORTION); - this.sipGenerator = new SIPGenerator(ratio); - this.rtpGenerator = new RTPGenerator(ratio, sipGenerator); - final Long rate = getGlobalConfiguration() - .get(TestConfigs.DATA_GENERATE_RATE); - this.rateLimiter = RateLimiter.create( - (int) (rate / getRuntimeContext().getNumberOfParallelSubtasks())); - this.running = true; - } - - @Override - public void run(SourceContext<ObjectNode> ctx) throws Exception { - while (running) { - rateLimiter.acquire(); - ctx.collect(sipGenerator.next()); - ctx.collect(rtpGenerator.next()); - } - } - - @Override - public void cancel() { - this.running = false; - } -} diff --git a/src/it/java/com/zdjizhi/flink/voip/functions/DoNothingSink.java b/src/it/java/com/zdjizhi/flink/voip/functions/DoNothingSink.java deleted file mode 100644 index 6a6e573..0000000 --- a/src/it/java/com/zdjizhi/flink/voip/functions/DoNothingSink.java +++ /dev/null @@ -1,43 +0,0 @@ -package com.zdjizhi.flink.voip.functions; - -import com.zdjizhi.flink.voip.records.Record; -import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.metrics.MeterView; -import org.apache.flink.metrics.MetricGroup; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; -import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; - - -// It dose nothing with the incoming data and simply collects metrics for the number of -// RTP and VoIP records processed per second. 
-public class DoNothingSink extends RichSinkFunction<ObjectNode> { - - private transient MeterView numRTPRecordsPreSecond; - private transient MeterView numVoIPRecordsPreSecond; - - @Override - public void open(Configuration parameters) throws Exception { - super.open(parameters); - RuntimeContext runtimeContext = getRuntimeContext(); - MetricGroup metricGroup = runtimeContext.getMetricGroup(); - numRTPRecordsPreSecond = metricGroup - .meter("numRTPRecordsPreSecond", new MeterView(1)); - numVoIPRecordsPreSecond = metricGroup - .meter("numVoIPRecordsPreSecond", new MeterView(1)); - } - - @Override - public void invoke(ObjectNode obj, Context context) throws Exception { - Record record = new Record(obj); - switch (record.getSchemaType()) { - case RTP: - numRTPRecordsPreSecond.markEvent(); - break; - case VOIP: - numVoIPRecordsPreSecond.markEvent(); - break; - default: - } - } -} diff --git a/src/it/resources/application-test.properties b/src/it/resources/application-test.properties deleted file mode 100644 index 8d510b3..0000000 --- a/src/it/resources/application-test.properties +++ /dev/null @@ -1,4 +0,0 @@ -sip.state.clear.interval.minutes=1 -rtp.state.clear.interval.minutes=10 -valuable.data.ratio=20 -data.generate.rate=1000
\ No newline at end of file diff --git a/src/it/resources/log4j2-test.properties b/src/it/resources/log4j2-test.properties deleted file mode 100644 index 16f3340..0000000 --- a/src/it/resources/log4j2-test.properties +++ /dev/null @@ -1,25 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-################################################################################ - -rootLogger.level = INFO -rootLogger.appenderRef.test.ref = TestLogger - -appender.test.name = TestLogger -appender.test.type = CONSOLE -appender.test.layout.type = PatternLayout -appender.test.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n diff --git a/src/main/java/com/geedgenetworks/flink/easy/application/voip/VoipUDFFactory.java b/src/main/java/com/geedgenetworks/flink/easy/application/voip/VoipUDFFactory.java new file mode 100644 index 0000000..20d0746 --- /dev/null +++ b/src/main/java/com/geedgenetworks/flink/easy/application/voip/VoipUDFFactory.java @@ -0,0 +1,33 @@ +package com.geedgenetworks.flink.easy.application.voip; + +import com.geedgenetworks.flink.easy.application.voip.udf.*; +import com.geedgenetworks.flink.easy.common.api.UDFFactory; +import org.apache.flink.table.functions.UserDefinedFunction; + +import java.util.HashMap; +import java.util.Map; + +public class VoipUDFFactory implements UDFFactory { + + private static final Map<String, UserDefinedFunction> R = new HashMap<>(); /* UDF name -> instance, returned as-is by register() */ + static { /* filled in a static initializer: no anonymous HashMap subclass as double-brace init would create */ + R.put("IS_IP_ADDRESS", new IsIpAddress()); + + R.put("IS_INTERNAL_IP_ADDRESS", new IsInternalIpAddress()); + R.put("IS_EXTERNAL_IP_ADDRESS", new IsExternalIpAddress()); + + R.put("HAS_IP_ADDRESS", new HasIpAddress()); + R.put("HAS_EXTERNAL_IP_ADDRESS", new HasExternalIpAddress()); + + R.put("STREAM_DIR", new StreamDir()); + R.put("FIND_NOT_BLANK", new FindNotBlank()); + R.put("SORT_ADDRESS", new SortAddress()); + + R.put("SNOWFLAKE_ID", new SnowflakeID()); + } + + @Override + public Map<String, UserDefinedFunction> register() { + return R; + } +} diff --git a/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/FindNotBlank.java b/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/FindNotBlank.java new file mode 100644 index 0000000..a018ce2 --- /dev/null +++ b/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/FindNotBlank.java @@ -0,0 +1,15 @@ +package 
com.geedgenetworks.flink.easy.application.voip.udf; + +import org.apache.commons.lang3.StringUtils; +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.functions.ScalarFunction; + +public class FindNotBlank extends ScalarFunction { + + public @DataTypeHint("STRING") String eval(String s1, String s2) { + if (StringUtils.isBlank(s1) && StringUtils.isNotBlank(s2)) { + return s2; + } + return s1; + } +} diff --git a/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/HasExternalIpAddress.java b/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/HasExternalIpAddress.java new file mode 100644 index 0000000..2ce254a --- /dev/null +++ b/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/HasExternalIpAddress.java @@ -0,0 +1,19 @@ +package com.geedgenetworks.flink.easy.application.voip.udf; + +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.functions.ScalarFunction; + +public class HasExternalIpAddress extends ScalarFunction { + + private final IsExternalIpAddress isExternalIpAddress = new IsExternalIpAddress(); + + public @DataTypeHint("BOOLEAN") Boolean eval(String... 
ipaddr) { + if (null == ipaddr) { + return false; + } + for (var ip : ipaddr) { + if (isExternalIpAddress.eval(ip)) { return true; } /* one external address is enough; otherwise keep scanning the remaining arguments */ + } + return false; + } +} diff --git a/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/HasIpAddress.java b/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/HasIpAddress.java new file mode 100644 index 0000000..7dddbc7 --- /dev/null +++ b/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/HasIpAddress.java @@ -0,0 +1,18 @@ +package com.geedgenetworks.flink.easy.application.voip.udf; + +import com.zdjizhi.utils.IPUtil; +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.functions.ScalarFunction; + +public class HasIpAddress extends ScalarFunction { + + public @DataTypeHint("BOOLEAN") Boolean eval(String... ipaddr) { + if (null == ipaddr) { + return false; + } + for (var ip : ipaddr) { + if (ip != null && IPUtil.isIPAddress(ip)) { return true; } /* one valid address is enough; null entries are skipped, not passed to IPUtil */ + } + return false; + } +} diff --git a/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/IsExternalIpAddress.java b/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/IsExternalIpAddress.java new file mode 100644 index 0000000..a2970a6 --- /dev/null +++ b/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/IsExternalIpAddress.java @@ -0,0 +1,17 @@ +package com.geedgenetworks.flink.easy.application.voip.udf; + +import com.zdjizhi.utils.IPUtil; +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.functions.ScalarFunction; + +import static com.zdjizhi.utils.IPUtil.isIPAddress; + +public class IsExternalIpAddress extends ScalarFunction { + + public @DataTypeHint("BOOLEAN") Boolean eval(String ipaddr) { + if (ipaddr == null || !isIPAddress(ipaddr)) { + return false; + } + return !IPUtil.internalIp(ipaddr); + } +} diff --git a/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/IsInternalIpAddress.java 
b/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/IsInternalIpAddress.java new file mode 100644 index 0000000..55839ba --- /dev/null +++ b/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/IsInternalIpAddress.java @@ -0,0 +1,17 @@ +package com.geedgenetworks.flink.easy.application.voip.udf; + +import com.zdjizhi.utils.IPUtil; +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.functions.ScalarFunction; + +import static com.zdjizhi.utils.IPUtil.isIPAddress; + +public class IsInternalIpAddress extends ScalarFunction { + + public @DataTypeHint("BOOLEAN") Boolean eval(String ipaddr) { + if (ipaddr == null || !isIPAddress(ipaddr)) { /* null-safe: a NULL column value yields false instead of being handed to IPUtil */ + return false; + } + return IPUtil.internalIp(ipaddr); + } +} diff --git a/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/IsIpAddress.java b/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/IsIpAddress.java new file mode 100644 index 0000000..cbef834 --- /dev/null +++ b/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/IsIpAddress.java @@ -0,0 +1,15 @@ +package com.geedgenetworks.flink.easy.application.voip.udf; + +import com.zdjizhi.utils.IPUtil; +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.functions.ScalarFunction; + +public class IsIpAddress extends ScalarFunction { + + public @DataTypeHint("BOOLEAN") Boolean eval(String ipaddr) { + if (null == ipaddr) { + return false; + } + return IPUtil.isIPAddress(ipaddr); + } +} diff --git a/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/SnowflakeID.java b/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/SnowflakeID.java new file mode 100644 index 0000000..abaea8f --- /dev/null +++ b/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/SnowflakeID.java @@ -0,0 +1,14 @@ +package com.geedgenetworks.flink.easy.application.voip.udf; + +import org.apache.flink.table.annotation.DataTypeHint; +import 
org.apache.flink.table.functions.ScalarFunction; +import xyz.downgoon.snowflake.Snowflake; + +public class SnowflakeID extends ScalarFunction { + + private static final Snowflake SNOWFLAKE = new Snowflake(1, 1); /* NOTE(review): both constructor ids are fixed at 1; presumably instances running in different JVMs could then produce colliding ids - confirm */ + + public @DataTypeHint("BIGINT") Long eval() { + return SNOWFLAKE.nextId(); + } + + /* Every call yields a new id, so the function must be declared non-deterministic; otherwise the planner may pre-evaluate this zero-argument call once and reuse the result as a constant. */ + @Override + public boolean isDeterministic() { + return false; + } +} diff --git a/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/SortAddress.java b/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/SortAddress.java new file mode 100644 index 0000000..ab422b7 --- /dev/null +++ b/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/SortAddress.java @@ -0,0 +1,31 @@ +package com.geedgenetworks.flink.easy.application.voip.udf; + +import com.google.common.collect.Lists; +import com.zdjizhi.utils.IPUtil; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.functions.ScalarFunction; + +public class SortAddress extends ScalarFunction { + + public @DataTypeHint("STRING") + String eval( + String ip1, Integer port1, String ip2, Integer port2) { + return of(Tuple2.of(ip1, port1), Tuple2.of(ip2, port2)); + } + + public static String of( + Tuple2<String, Integer> a1, Tuple2<String, Integer> a2) { + var list = Lists.newArrayList(a1, a2); + list.sort((a, b) -> { + if (a.f1.equals(b.f1)) { + return Long.compare( + IPUtil.getIpDesimal(a.f0), IPUtil.getIpDesimal(b.f0)); + } else { + return a.f1.compareTo(b.f1); + } + }); + return String.format("%s:%s,%s:%s", + list.get(0).f0, list.get(0).f1, list.get(1).f0, list.get(1).f1); + } +}
\ No newline at end of file diff --git a/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/StreamDir.java b/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/StreamDir.java new file mode 100644 index 0000000..3f5166f --- /dev/null +++ b/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/StreamDir.java @@ -0,0 +1,26 @@ +package com.geedgenetworks.flink.easy.application.voip.udf; + +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.functions.ScalarFunction; + +public class StreamDir extends ScalarFunction { + + public @DataTypeHint("INT") Integer eval(Long flags) { + int v = 0; + if (flags == null) { + return v; + } + if ((flags & 8192) == 8192) { + v += 1; + } + if ((flags & 16384) == 16384) { + v += 2; + } + return v; + } + + public static void main(String[] args) { + System.out.println(8192L + 16384L); + System.out.println(new StreamDir().eval(8192L + 16384L)); + } +}
\ No newline at end of file diff --git a/src/main/java/com/zdjizhi/flink/voip/CorrelateApp.java b/src/main/java/com/zdjizhi/flink/voip/CorrelateApp.java deleted file mode 100644 index 304e04d..0000000 --- a/src/main/java/com/zdjizhi/flink/voip/CorrelateApp.java +++ /dev/null @@ -1,108 +0,0 @@ -package com.zdjizhi.flink.voip; - -import com.zdjizhi.flink.voip.conf.FusionConfiguration; -import com.zdjizhi.flink.voip.error.ErrorHandler; -import com.zdjizhi.flink.voip.formats.JsonNodeSerializationSchema; -import com.zdjizhi.flink.voip.functions.*; -import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; -import org.apache.flink.api.common.eventtime.WatermarkStrategy; -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.utils.ParameterTool; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.formats.json.JsonNodeDeserializationSchema; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; - -import java.time.Duration; - -import static com.zdjizhi.flink.voip.conf.FusionConfigs.*; - -/** - * The main class for running the SIP RTP Correlation application. - * - * @author chaoc - * @since 1.0 - */ -public class CorrelateApp { - - public static void main(String[] args) throws Exception { - - final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - // param check - if (args.length < 1) { - throw new IllegalArgumentException("Error: Not found properties path. 
" + - "\nUsage: flink -c xxx xxx.jar app.properties."); - } - - final ParameterTool tool = ParameterTool.fromPropertiesFile(args[0]); - - final Configuration config = tool.getConfiguration(); - env.getConfig().setGlobalJobParameters(config); - - final FusionConfiguration fusionConfiguration = new FusionConfiguration(config); - - final FlinkKafkaConsumer<ObjectNode> kafkaConsumer = new FlinkKafkaConsumer<>( - config.get(SOURCE_KAFKA_TOPIC), - new JsonNodeDeserializationSchema(), - fusionConfiguration - .getProperties(SOURCE_KAFKA_PROPERTIES_PREFIX)); - - final DataStream<ObjectNode> sourceStream = env.addSource(kafkaConsumer) - .assignTimestampsAndWatermarks( - WatermarkStrategy - .<ObjectNode>forBoundedOutOfOrderness(Duration.ofSeconds(5)) - .withTimestampAssigner((SerializableTimestampAssigner<ObjectNode>) - (element, recordTimestamp) -> - element.get("start_timestamp_ms").asLong())); - - final ErrorHandler errorHandler = new ErrorHandler(config); - - // Process the data using the TypeSplitFunction and split it into separate DataStreams for SIP and RTP data - final SingleOutputStreamOperator<ObjectNode> splitsRecordsOperator = - errorHandler.filterError(sourceStream) - .process(new TypeSplitFunction()) - .name("SplitsRecordsBasedSchemaType") - .uid("splits-records-based-schema-type"); - - // Get the DataStreams for SIP and RTP data from the side outputs. - final DataStream<ObjectNode> sipDataStream = splitsRecordsOperator - .getSideOutput(TypeSplitFunction.SIP_OUTPUT_TAG); - - final DataStream<ObjectNode> rtpDataStream = splitsRecordsOperator - .getSideOutput(TypeSplitFunction.RTP_OUTPUT_TAG); - - // Process SIP data to create a double directional stream using SIPPairingFunction. 
- final SingleOutputStreamOperator<ObjectNode> sipDoubleDirOperator = sipDataStream - .keyBy(new SIPKeySelector()) - .process(new SIPPairingFunction()) - .name("PairingOneWayToDoubleStream") - .uid("pairing-one-way-to-double"); - - final KeySelector<ObjectNode, Tuple2<Integer, Address>> vSysSelector = new VSysIDKeySelector(); - - // Fusion SIP data and RTP data to VoIP data. - final SingleOutputStreamOperator<ObjectNode> voIpOperator = rtpDataStream - .keyBy(vSysSelector) - .connect(sipDoubleDirOperator.keyBy(vSysSelector)) - .process(new VoIPFusionFunction()) - .name("VoIPFusion") - .uid("voip-fusion"); - - final FlinkKafkaProducer<ObjectNode> producer = new FlinkKafkaProducer<>( - config.get(SINK_KAFKA_TOPIC), - new JsonNodeSerializationSchema(), - fusionConfiguration.getProperties(SINK_KAFKA_PROPERTIES_PREFIX)); - - voIpOperator - .union(sipDoubleDirOperator.getSideOutput(SIPPairingFunction.SIP_OUTPUT_TAG)) - .addSink(producer); - - env.execute(config.get(JOB_NAME)); - } -} diff --git a/src/main/java/com/zdjizhi/flink/voip/conf/FusionConfigs.java b/src/main/java/com/zdjizhi/flink/voip/conf/FusionConfigs.java deleted file mode 100644 index 926c5a0..0000000 --- a/src/main/java/com/zdjizhi/flink/voip/conf/FusionConfigs.java +++ /dev/null @@ -1,107 +0,0 @@ -package com.zdjizhi.flink.voip.conf; - -import org.apache.flink.configuration.ConfigOption; -import org.apache.flink.configuration.ConfigOptions; - -/** - * Containing configuration options for the Fusion application. - * - * @author chaoc - * @since 1.0 - */ -public class FusionConfigs { - - /** - * The prefix for Kafka properties used in the source. - */ - public static final String SOURCE_KAFKA_PROPERTIES_PREFIX = "source.kafka.props."; - - /** - * The prefix for Kafka properties used in the sink. - */ - public static final String SINK_KAFKA_PROPERTIES_PREFIX = "sink.kafka.props."; - - /** - * Configuration prefix for the properties of the Kafka sink where the error data will be output. 
- */ - public static final String ERROR_SINK_KAFKA_PROPERTIES_PREFIX = "error.sink.kafka.props."; - - /** - * Configuration option for the Kafka topic used in the source. - */ - public static final ConfigOption<String> SOURCE_KAFKA_TOPIC = - ConfigOptions.key("source.kafka.topic") - .stringType() - .noDefaultValue() - .withDescription("The Kafka topic used in the source."); - - /** - * Configuration option for the Kafka topic used in the sink. - */ - public static final ConfigOption<String> SINK_KAFKA_TOPIC = - ConfigOptions.key("sink.kafka.topic") - .stringType() - .noDefaultValue() - .withDescription("The Kafka topic used in the sink."); - - /** - * Configuration option to enable or disable the output of error records. - * If set to true, the error records will be sent to the specified Kafka topic. - * Default value is false. - */ - public static final ConfigOption<Boolean> ERROR_RECORDS_OUTPUT_ENABLE = - ConfigOptions.key("error.records.output.enable") - .booleanType() - .defaultValue(false) - .withDescription("Enable or disable the output of error records. " + - "If set to true, the error records will be sent to the specified Kafka topic."); - - /** - * Configuration option to determine whether to perform data correlate for intranet addresses. - */ - public static final ConfigOption<Boolean> INCLUDE_INTRANET_IP = - ConfigOptions.key("include.intranet.ip") - .booleanType() - .defaultValue(true) - .withDescription("Whether to perform data correlate for intranet addresses"); - - - /** - * Configuration option for specifying the Kafka topic name where the error data will be sent. - * This configuration option is used when the output of error records is enabled. 
- */ - public static final ConfigOption<String> ERROR_SINK_KAFKA_TOPIC = - ConfigOptions.key("error.sink.kafka.topic") - .stringType() - .noDefaultValue() - .withDescription("The Kafka topic name where the error records will be sent."); - - /** - * The configuration option for the interval at which SIP state data - * should be cleared. - */ - public static final ConfigOption<Integer> SIP_STATE_CLEAR_INTERVAL = - ConfigOptions.key("sip.state.clear.interval.minutes") - .intType() - .defaultValue(1) - .withDescription("The interval at which SIP state data should be cleared."); - - /** - * The configuration option for the interval at which STP state data - * should be cleared. - */ - public static final ConfigOption<Integer> RTP_STATE_CLEAR_INTERVAL = - ConfigOptions.key("rtp.state.clear.interval.minutes") - .intType() - .defaultValue(6) - .withDescription("The interval at which RTP state data should be cleared."); - - /** - * Configuration option for specifying the name of a job. - */ - public static final ConfigOption<String> JOB_NAME = - ConfigOptions.key("job.name") - .stringType() - .defaultValue("correlation_sip_rtp_session") - .withDescription("The name of current job."); -}
\ No newline at end of file diff --git a/src/main/java/com/zdjizhi/flink/voip/conf/FusionConfiguration.java b/src/main/java/com/zdjizhi/flink/voip/conf/FusionConfiguration.java deleted file mode 100644 index ad983fb..0000000 --- a/src/main/java/com/zdjizhi/flink/voip/conf/FusionConfiguration.java +++ /dev/null @@ -1,44 +0,0 @@ -package com.zdjizhi.flink.voip.conf; - -import org.apache.flink.configuration.Configuration; - -import java.util.Properties; - -/** - * A wrapper class that extends the Flink `Configuration` to provide utility methods for handling - * properties with a specific prefix. This class allows retrieving properties that start with the - * given `prefix` and converts them into a `java.util.Properties` object. - * - * @author chaoc - * @since 1.0 - */ - -public class FusionConfiguration { - private final Configuration config; - - public FusionConfiguration(final Configuration config) { - this.config = config; - } - - /** - * Retrieves properties from the underlying `Configuration` instance that start with the specified - * `prefix`. The properties are then converted into a `java.util.Properties` object and returned. - * - * @param prefix The prefix to filter properties. - * @return A `java.util.Properties` object containing the properties with the specified prefix. 
- */ - public Properties getProperties(final String prefix) { - if (prefix == null) { - final Properties props = new Properties(); - props.putAll(config.toMap()); - return props; - } - return config.toMap() - .entrySet() - .stream() - .filter(entry -> entry.getKey().startsWith(prefix)) - .collect(Properties::new, (props, e) -> - props.setProperty(e.getKey().substring(prefix.length()), e.getValue()), - Properties::putAll); - } -} diff --git a/src/main/java/com/zdjizhi/flink/voip/error/ErrorHandler.java b/src/main/java/com/zdjizhi/flink/voip/error/ErrorHandler.java deleted file mode 100644 index be41377..0000000 --- a/src/main/java/com/zdjizhi/flink/voip/error/ErrorHandler.java +++ /dev/null @@ -1,177 +0,0 @@ -package com.zdjizhi.flink.voip.error; - -import com.zdjizhi.flink.voip.conf.FusionConfigs; -import com.zdjizhi.flink.voip.conf.FusionConfiguration; -import com.zdjizhi.flink.voip.formats.JsonNodeSerializationSchema; -import com.zdjizhi.flink.voip.functions.FunctionHelper; -import com.zdjizhi.flink.voip.records.Record; -import com.zdjizhi.flink.voip.records.SIPRecord; -import com.zdjizhi.flink.voip.records.SchemaType; -import com.zdjizhi.flink.voip.records.StreamDir; -import com.zdjizhi.utils.IPUtil; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; -import org.apache.flink.streaming.api.functions.ProcessFunction; -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; -import org.apache.flink.util.Collector; -import org.apache.flink.util.OutputTag; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.function.Function; - -/** - * The ErrorHandler class is responsible for handling and filtering error records from the input data 
stream. - * It checks for records that have invalid or meaningless addresses and ports, and outputs them to a separate stream if enabled. - * - * @author chaoc - * @since 1.0 - */ -public class ErrorHandler { - - private static final Logger LOG = LoggerFactory.getLogger(ErrorHandler.class); - /** - * The OutputTag for invalid records. - */ - public static final OutputTag<ObjectNode> INVALID_OUTPUT_TAG = - new OutputTag<>("invalid-records", TypeInformation.of(ObjectNode.class)); - - private final Configuration config; - - /** - * Creates a new ErrorHandler instance with the given configuration. - * - * @param config The configuration containing settings for error handling. - */ - public ErrorHandler(final Configuration config) { - this.config = config; - } - - /** - * Filters out error records from the input data stream and outputs them to a separate stream if enabled. - * - * @param dataStream The input data stream of ObjectNode records. - * @return A new DataStream containing valid records without errors. 
- */ - public DataStream<ObjectNode> filterError(final DataStream<ObjectNode> dataStream) { - // Process the data stream to identify meaningless addresses and ports - final SingleOutputStreamOperator<ObjectNode> operator = dataStream - .process(new MeaninglessAddressProcessFunction()) - .name("MeaninglessRecords") - .uid("meaningless-records"); - - // If enabled, output the error records to the specified Kafka topic - if (config.get(FusionConfigs.ERROR_RECORDS_OUTPUT_ENABLE)) { - - final String topic = config.get(FusionConfigs.ERROR_SINK_KAFKA_TOPIC); - - LOG.info("Meaningless data output is enabled, data will be sent to: Topic [{}]", topic); - - final DataStream<ObjectNode> errorStream = operator - .getSideOutput(INVALID_OUTPUT_TAG); - - final FlinkKafkaProducer<ObjectNode> producer = new FlinkKafkaProducer<>( - topic, - new JsonNodeSerializationSchema(), - new FusionConfiguration(config) - .getProperties(FusionConfigs.ERROR_SINK_KAFKA_PROPERTIES_PREFIX) - ); - - errorStream.addSink(producer); - } - return operator; - } -} - -/** - * The MeaninglessAddressProcessFunction is a custom ProcessFunction used to check for records - * with invalid or meaningless addresses and ports. It separates them into the invalid output tag if necessary. 
- */ -class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, ObjectNode> implements FunctionHelper { - - private static final Logger LOG = LoggerFactory.getLogger(MeaninglessAddressProcessFunction.class); - - private transient boolean includeIntranetIp; - - @Override - public void open(Configuration parameters) throws Exception { - super.open(parameters); - final Configuration config = getGlobalConfiguration(); - includeIntranetIp = config.get(FusionConfigs.INCLUDE_INTRANET_IP); - } - - @Override - public void processElement(ObjectNode obj, - ProcessFunction<ObjectNode, ObjectNode>.Context ctx, - Collector<ObjectNode> out) throws Exception { - final Record record = new Record(obj); - // Check for invalid or meaningless addresses and ports - boolean cond1 = isIPAddress(record.getClientIp()) && - isIPAddress(record.getServerIp()) && - record.getClientPort() > 0 && - record.getServerPort() > 0; - - boolean cond8 = null != executeSafely(Record::getStreamDir, record); - - final SIPRecord sipRecord = new SIPRecord(obj); - boolean cond2 = isIPAddress(sipRecord.getOriginatorSdpConnectIp()) - || isIPAddress(sipRecord.getResponderSdpConnectIp()); - boolean cond3 = !isIPAddress(sipRecord.getResponderSdpConnectIp()) - || (includeIntranetIp || !isInternalIp(sipRecord.getResponderSdpConnectIp())); - boolean cond4 = !isIPAddress(sipRecord.getOriginatorSdpConnectIp()) - || (includeIntranetIp || !isInternalIp(sipRecord.getOriginatorSdpConnectIp())); - boolean cond5 = SchemaType.SIP.equals(sipRecord.getSchemaType()); - boolean cond6 = StreamDir.DOUBLE == executeSafely(Record::getStreamDir, sipRecord) && - (includeIntranetIp || !isInternalIp(sipRecord.getResponderSdpConnectIp())) && - (includeIntranetIp || !isInternalIp(sipRecord.getOriginatorSdpConnectIp())); - - boolean cond7 = isIPAddress(sipRecord.getOriginatorSdpConnectIp()) && - isIPAddress(sipRecord.getResponderSdpConnectIp()) && - sipRecord.getResponderSdpMediaPort() > 0 && 
sipRecord.getOriginatorSdpMediaPort() > 0; - - // Both client and server addresses in the data are valid. - if (cond1 && cond8 && (!cond5 || cond7) && ( - // The address in the SIP one-way stream is valid and not an internal network address. - cond2 && cond3 && cond4 && cond5 - // The coordinating addresses in the SIP double directional stream are valid - // and not internal network addresses. - || cond5 && cond6 - || !cond5)) { - out.collect(obj); - } else { - // Output invalid records to the invalid output tag - ctx.output(ErrorHandler.INVALID_OUTPUT_TAG, obj); - if (LOG.isDebugEnabled()) { - LOG.debug("Meaningless record: {}", obj); - } - } - } - - // ====================================================================================== - // ----------------------------------- private helper ----------------------------------- - - public static <T, R> R executeSafely(Function<T, R> function, T v) { - try { - return function.apply(v); - } catch (Exception e) { - return null; - } - } - - private static boolean isIPAddress(final String ipaddr) { - if (null == ipaddr) { - return false; - } - return IPUtil.isIPAddress(ipaddr); - } - - private static boolean isInternalIp(final String ipaddr) { - if (!isIPAddress(ipaddr)) { - return false; - } - return IPUtil.internalIp(ipaddr); - } -}
\ No newline at end of file diff --git a/src/main/java/com/zdjizhi/flink/voip/formats/JsonNodeSerializationSchema.java b/src/main/java/com/zdjizhi/flink/voip/formats/JsonNodeSerializationSchema.java deleted file mode 100644 index d193113..0000000 --- a/src/main/java/com/zdjizhi/flink/voip/formats/JsonNodeSerializationSchema.java +++ /dev/null @@ -1,20 +0,0 @@ -package com.zdjizhi.flink.voip.formats; - -import org.apache.flink.api.common.serialization.SerializationSchema; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; - -public class JsonNodeSerializationSchema implements SerializationSchema<ObjectNode> { - - private final ObjectMapper mapper = new ObjectMapper(); - - @Override - public byte[] serialize(ObjectNode jsonNodes) { - try { - return mapper.writeValueAsBytes(jsonNodes); - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } - } -} diff --git a/src/main/java/com/zdjizhi/flink/voip/functions/Address.java b/src/main/java/com/zdjizhi/flink/voip/functions/Address.java deleted file mode 100644 index 3642f58..0000000 --- a/src/main/java/com/zdjizhi/flink/voip/functions/Address.java +++ /dev/null @@ -1,54 +0,0 @@ -package com.zdjizhi.flink.voip.functions; - -import com.google.common.collect.Lists; -import com.zdjizhi.utils.IPUtil; -import lombok.AllArgsConstructor; -import lombok.Data; -import org.apache.flink.api.java.tuple.Tuple2; - -import java.util.List; - - -/** - * A pojo class representing an address with two IP and port pairs. - * - * @author chaoc - * @since 1.0 - */ -@Data -@AllArgsConstructor -public class Address { - - // The first IP address. - private final String ip1; - - //The first port number. - private final int port1; - - //The second IP address. - private final String ip2; - - //The second port number. 
- private final int port2; - - /** - * Creates an Address instance based on two tuples containing (String, Int) representing address information. - * The method sorts the addresses based on the port number, and if the ports are equal, it sorts them based on - * the numeric value of the IP address. - * - * @param a1 The first address information as a tuple (IP address, port). - * @param a2 The second address information as a tuple (IP address, port). - * @return An Address instance with addresses sorted and reordered. - */ - public static Address of(Tuple2<String, Integer> a1, Tuple2<String, Integer> a2) { - List<Tuple2<String, Integer>> list = Lists.newArrayList(a1, a2); - list.sort((a, b) -> { - if (a.f1.equals(b.f1)) { - return Long.compare(IPUtil.getIpDesimal(a.f0), IPUtil.getIpDesimal(b.f0)); - } else { - return a.f1.compareTo(b.f1); - } - }); - return new Address(list.get(0).f0, list.get(0).f1, list.get(1).f0, list.get(1).f1); - } -} diff --git a/src/main/java/com/zdjizhi/flink/voip/functions/FunctionHelper.java b/src/main/java/com/zdjizhi/flink/voip/functions/FunctionHelper.java deleted file mode 100644 index 216e283..0000000 --- a/src/main/java/com/zdjizhi/flink/voip/functions/FunctionHelper.java +++ /dev/null @@ -1,32 +0,0 @@ -package com.zdjizhi.flink.voip.functions; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.functions.RichFunction; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.TimerService; - -/** - * An interface that provides utility functions for Flink functions. - * - * @author chaoc - * @since 1.0 - */ -public interface FunctionHelper extends RichFunction { - - /** - * Get the global configuration for the current Flink job. - * - * @return The global configuration as a Configuration object. 
- */ - - default Configuration getGlobalConfiguration() { - final ExecutionConfig.GlobalJobParameters globalJobParameters = getRuntimeContext() - .getExecutionConfig().getGlobalJobParameters(); - return (Configuration) globalJobParameters; - } - - default void registerNextFireTimestamp(TimerService timerService, long interval) { - long current = timerService.currentWatermark(); - timerService.registerEventTimeTimer(current + interval); - } -} diff --git a/src/main/java/com/zdjizhi/flink/voip/functions/ObjectNodeInfo.java b/src/main/java/com/zdjizhi/flink/voip/functions/ObjectNodeInfo.java deleted file mode 100644 index d55110b..0000000 --- a/src/main/java/com/zdjizhi/flink/voip/functions/ObjectNodeInfo.java +++ /dev/null @@ -1,28 +0,0 @@ -package com.zdjizhi.flink.voip.functions; - -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.Setter; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; - -/** - * A case class representing an ObjectNode with an expiration time and a pair times. - * - * @author chaoc - * @since 1.0 - */ -@Data -@AllArgsConstructor -class ObjectNodeInfo { - - // The ObjectNode containing data. - private ObjectNode obj; - - // The pair times for the object. 
- @Setter - private int times; - - public void incTimes() { - this.times = this.times + 1; - } -} diff --git a/src/main/java/com/zdjizhi/flink/voip/functions/SIPKeySelector.java b/src/main/java/com/zdjizhi/flink/voip/functions/SIPKeySelector.java deleted file mode 100644 index 678f9c1..0000000 --- a/src/main/java/com/zdjizhi/flink/voip/functions/SIPKeySelector.java +++ /dev/null @@ -1,31 +0,0 @@ -package com.zdjizhi.flink.voip.functions; - -import com.zdjizhi.flink.voip.records.SIPRecord; -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; - -/** - * A KeySelector implementation for extracting a composite key from an ObjectNode representing a SIP record. - * - * @author chaoc - * @since 1.0 - */ -public class SIPKeySelector implements KeySelector<ObjectNode, Tuple3<Integer, String, Address>> { - - /** - * Extracts the composite key (VSysID, CallID, Address) from the given ObjectNode. - * - * @param obj The ObjectNode representing a SIP record. - * @return A Tuple3 containing the extracted key (VSysID, CallID, Address). - * @throws Exception Thrown if an error occurs during key extraction. - */ - @Override - public Tuple3<Integer, String, Address> getKey(ObjectNode obj) throws Exception { - final SIPRecord record = new SIPRecord(obj); - final Address address = Address.of(Tuple2.of(record.getClientIp(), record.getClientPort()), - Tuple2.of(record.getServerIp(), record.getServerPort())); - return Tuple3.of(record.getVSysID(), record.getCallID(), address); - } -}
\ No newline at end of file diff --git a/src/main/java/com/zdjizhi/flink/voip/functions/SIPPairingFunction.java b/src/main/java/com/zdjizhi/flink/voip/functions/SIPPairingFunction.java deleted file mode 100644 index 274da5d..0000000 --- a/src/main/java/com/zdjizhi/flink/voip/functions/SIPPairingFunction.java +++ /dev/null @@ -1,91 +0,0 @@ -package com.zdjizhi.flink.voip.functions; - -import com.zdjizhi.flink.voip.conf.FusionConfigs; -import com.zdjizhi.flink.voip.records.Record; -import com.zdjizhi.flink.voip.records.StreamDir; -import org.apache.flink.api.common.state.StateTtlConfig; -import org.apache.flink.api.common.state.ValueState; -import org.apache.flink.api.common.state.ValueStateDescriptor; -import org.apache.flink.api.common.time.Time; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; -import org.apache.flink.streaming.api.functions.KeyedProcessFunction; -import org.apache.flink.util.Collector; -import org.apache.flink.util.OutputTag; - -/** - * A KeyedProcessFunction that pairs SIP records based on their addresses and stream direction. - * SIP records are paired when they have the same addresses but opposite stream directions. 
- * - * @author chaoc - * @since 1.0 - */ -public class SIPPairingFunction extends KeyedProcessFunction<Tuple3<Integer, String, Address>, ObjectNode, ObjectNode> - implements FunctionHelper { - - public static final OutputTag<ObjectNode> SIP_OUTPUT_TAG = - new OutputTag<>("unmatched-sip", TypeInformation.of(ObjectNode.class)); - - private transient Time fireInterval; - - private transient ValueState<ObjectNode> valueState; - - @Override - public void open(Configuration parameters) throws Exception { - super.open(parameters); - int minutes = getGlobalConfiguration().get(FusionConfigs.SIP_STATE_CLEAR_INTERVAL); - fireInterval = Time.minutes(minutes); - final ValueStateDescriptor<ObjectNode> descriptor = - new ValueStateDescriptor<>("sip-state", ObjectNode.class); - - final StateTtlConfig ttlConfig = StateTtlConfig - .newBuilder(fireInterval) - .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) - .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) - .useProcessingTime() - .cleanupFullSnapshot() - .build(); - descriptor.enableTimeToLive(ttlConfig); - valueState = getRuntimeContext().getState(descriptor); - } - - @Override - public void processElement(ObjectNode value, - KeyedProcessFunction<Tuple3<Integer, String, Address>, ObjectNode, ObjectNode>.Context ctx, - Collector<ObjectNode> out) throws Exception { - - final Record record = new Record(value); - // When SIP is a one-way stream. - if (StreamDir.DOUBLE != record.getStreamDir()) { - // If the address is already stored in the mapState and has the opposite stream direction, - // merge the SIP records, change the stream direction to DOUBLE, and output the merged record. - final ObjectNode obj = valueState.value(); - if (null != obj && new Record(obj).getStreamDir() != record.getStreamDir()) { - record.merge(obj) - .setInt(Record.F_COMMON_STREAM_DIR, StreamDir.DOUBLE.getValue()); - out.collect(value); - valueState.clear(); - } else { - // If the address is not yet in the valueState. 
- valueState.update(value); - } - } else { - // If SIP is a double stream, pairing isn't required, directly output the record. - out.collect(value); - } - registerNextFireTimestamp(ctx.timerService(), fireInterval.toMilliseconds()); - } - - @Override - public void onTimer(long timestamp, - KeyedProcessFunction<Tuple3<Integer, String, Address>, ObjectNode, ObjectNode>.OnTimerContext ctx, - Collector<ObjectNode> out) throws Exception { - final ObjectNode value = valueState.value(); - if (value != null) { - ctx.output(SIP_OUTPUT_TAG, value); - } - valueState.clear(); - } -} diff --git a/src/main/java/com/zdjizhi/flink/voip/functions/TypeSplitFunction.java b/src/main/java/com/zdjizhi/flink/voip/functions/TypeSplitFunction.java deleted file mode 100644 index e8fb99e..0000000 --- a/src/main/java/com/zdjizhi/flink/voip/functions/TypeSplitFunction.java +++ /dev/null @@ -1,45 +0,0 @@ -package com.zdjizhi.flink.voip.functions; - -import com.zdjizhi.flink.voip.records.Record; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; -import org.apache.flink.streaming.api.functions.ProcessFunction; -import org.apache.flink.util.Collector; -import org.apache.flink.util.OutputTag; - -/** - * A ProcessFunction that splits ObjectNode records based on their 'schemaType' field. - * It outputs SIP records to the 'sipSchemaTypeOutputTag' and RTP records to the 'rtpSchemaTypeOutputTag'. - * - * @author chaoc - * @since 1.0 - */ -public class TypeSplitFunction extends ProcessFunction<ObjectNode, ObjectNode> { - - /** - * OutputTag for SIP records. - */ - public static final OutputTag<ObjectNode> SIP_OUTPUT_TAG = - new OutputTag<>("schema-type-sip", TypeInformation.of(ObjectNode.class)); - /** - * OutputTag for RTP records. 
- */ - public static final OutputTag<ObjectNode> RTP_OUTPUT_TAG = - new OutputTag<>("schema-type-rtp", TypeInformation.of(ObjectNode.class)); - - @Override - public void processElement(ObjectNode obj, - ProcessFunction<ObjectNode, ObjectNode>.Context ctx, - Collector<ObjectNode> out) throws Exception { - final Record record = new Record(obj); - switch (record.getSchemaType()) { - case RTP: - ctx.output(RTP_OUTPUT_TAG, obj); - break; - case SIP: - ctx.output(SIP_OUTPUT_TAG, obj); - break; - default: - } - } -} diff --git a/src/main/java/com/zdjizhi/flink/voip/functions/VSysIDKeySelector.java b/src/main/java/com/zdjizhi/flink/voip/functions/VSysIDKeySelector.java deleted file mode 100644 index ab806c7..0000000 --- a/src/main/java/com/zdjizhi/flink/voip/functions/VSysIDKeySelector.java +++ /dev/null @@ -1,40 +0,0 @@ -package com.zdjizhi.flink.voip.functions; - -import com.zdjizhi.flink.voip.records.Record; -import com.zdjizhi.flink.voip.records.SIPRecord; -import com.zdjizhi.flink.voip.records.SchemaType; -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; - -/** - * A KeySelector implementation that extracts the key(VSysID) from an ObjectNode. - * - * @author chaoc - * @since 1.0 - */ -public class VSysIDKeySelector implements KeySelector<ObjectNode, Tuple2<Integer, Address>> { - - /** - * Extracts the composite key (VSysID, Address) from the given ObjectNode. - * - * @param obj The ObjectNode representing a SIP record. - * @return A Tuple2 containing the extracted key (VSysID, Address). - * @throws Exception Thrown if an error occurs during key extraction. 
- */ - @Override - public Tuple2<Integer, Address> getKey(ObjectNode obj) throws Exception { - final Record record = new Record(obj); - final Address address; - if (record.getSchemaType() == SchemaType.SIP) { - final SIPRecord sipRecord = new SIPRecord(obj); - address = Address.of( - Tuple2.of(sipRecord.getOriginatorSdpConnectIp(), sipRecord.getOriginatorSdpMediaPort()), - Tuple2.of(sipRecord.getResponderSdpConnectIp(), sipRecord.getResponderSdpMediaPort())); - } else { - address = Address.of(Tuple2.of(record.getServerIp(), record.getServerPort()), - Tuple2.of(record.getClientIp(), record.getClientPort())); - } - return Tuple2.of(record.getVSysID(), address); - } -} diff --git a/src/main/java/com/zdjizhi/flink/voip/functions/VoIPFusionFunction.java b/src/main/java/com/zdjizhi/flink/voip/functions/VoIPFusionFunction.java deleted file mode 100644 index c8e32b7..0000000 --- a/src/main/java/com/zdjizhi/flink/voip/functions/VoIPFusionFunction.java +++ /dev/null @@ -1,180 +0,0 @@ -package com.zdjizhi.flink.voip.functions; - -import com.zdjizhi.flink.voip.conf.FusionConfigs; -import com.zdjizhi.flink.voip.records.*; -import org.apache.commons.lang3.StringUtils; -import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.api.common.state.*; -import org.apache.flink.api.common.time.Time; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; -import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction; -import org.apache.flink.util.Collector; - -import java.util.Iterator; -import java.util.Map; - -/** - * The `VoIPFusionFunction` is a key-based co-process function that performs VoIP fusion logic - * for SIP and RTP records. It combines SIP and RTP records belonging to the same session - * and emits fused VoIP records. 
The function utilizes keyed state to store and manage SIP and - * RTP records, and it uses timers to trigger regular clearing of the state. - * - * @author chaoc - * @since 1.0 - */ -public class VoIPFusionFunction extends KeyedCoProcessFunction<Tuple2<Integer, Address>, ObjectNode, ObjectNode, ObjectNode> - implements FunctionHelper { - - private static final int MAX_RTP_LINES = 2; - private transient Time fireInterval; - private transient ValueState<ObjectNodeInfo> sipState; - private transient MapState<StreamDir, ObjectNode> rtpState; - - @Override - public void open(Configuration parameters) throws Exception { - super.open(parameters); - final int minutes = getGlobalConfiguration().get(FusionConfigs.RTP_STATE_CLEAR_INTERVAL); - fireInterval = Time.minutes(minutes); - final RuntimeContext context = getRuntimeContext(); - final ValueStateDescriptor<ObjectNodeInfo> sipDescriptor = - new ValueStateDescriptor<>("sip-state", ObjectNodeInfo.class); - - final StateTtlConfig ttlConfig = StateTtlConfig - .newBuilder(fireInterval) - .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) - .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) - .useProcessingTime() - .cleanupFullSnapshot() - .build(); - sipDescriptor.enableTimeToLive(ttlConfig); - sipState = context.getState(sipDescriptor); - - final MapStateDescriptor<StreamDir, ObjectNode> rtpDescriptor = - new MapStateDescriptor<>("rtp-state", StreamDir.class, ObjectNode.class); - - final StateTtlConfig rtpTtlConfig = StateTtlConfig - .newBuilder(Time.minutes(minutes + 1)) - .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) - .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) - .useProcessingTime() - .cleanupFullSnapshot() - .build(); - rtpDescriptor.enableTimeToLive(rtpTtlConfig); - rtpState = context.getMapState(rtpDescriptor); - } - - // SIP - @Override - public void processElement2(ObjectNode sipObj, - KeyedCoProcessFunction<Tuple2<Integer, Address>, ObjectNode, 
ObjectNode, ObjectNode>.Context ctx, - Collector<ObjectNode> out) throws Exception { - final Iterator<Map.Entry<StreamDir, ObjectNode>> iterator = rtpState.iterator(); - if (rtpState.isEmpty()) { - sipState.update(new ObjectNodeInfo(sipObj, 0)); - } - while (iterator.hasNext()) { - final Map.Entry<StreamDir, ObjectNode> entry = iterator.next(); - final ObjectNode rtpObj = entry.getValue(); - final Record rtpRecord = new Record(rtpObj); - - completeOriginatorField(rtpRecord, new SIPRecord(sipObj)); - - rtpRecord.merge(sipObj) - .setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.VOIP.getValue()); - out.collect(rtpObj); - iterator.remove(); - - switch (entry.getKey()) { - case S2C: - case C2S: - ObjectNodeInfo info = sipState.value(); - if (info != null) { - info.incTimes(); - if (info.getTimes() >= MAX_RTP_LINES) { - sipState.clear(); - } else { - sipState.update(new ObjectNodeInfo(sipObj, info.getTimes())); - } - } else { - sipState.update(new ObjectNodeInfo(sipObj, 1)); - } - break; - default: - // Double directional: - // In the context of VoIP fusion, only one RTP double directional stream - sipState.clear(); - } - } - - registerNextFireTimestamp(ctx.timerService(), fireInterval.toMilliseconds()); - } - - // RTP - @Override - public void processElement1(ObjectNode rtpObj, - KeyedCoProcessFunction<Tuple2<Integer, Address>, ObjectNode, ObjectNode, ObjectNode>.Context ctx, - Collector<ObjectNode> out) throws Exception { - final Record rtpRecord = new Record(rtpObj); - final ObjectNodeInfo info = sipState.value(); - - final StreamDir streamDir = rtpRecord.getStreamDir(); - if (null != info) { - - completeOriginatorField(rtpRecord, new SIPRecord(info.getObj())); - - rtpRecord.merge(info.getObj()) - .setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.VOIP.getValue()); - out.collect(rtpObj); - - switch (streamDir) { - case C2S: - case S2C: - info.incTimes(); - if (info.getTimes() >= MAX_RTP_LINES) { - sipState.clear(); - } - break; - default: - // Double - 
sipState.clear(); - } - - } else { - rtpState.put(streamDir, rtpObj); - } - - registerNextFireTimestamp(ctx.timerService(), fireInterval.toMilliseconds()); - } - - @Override - public void onTimer(long timestamp, - KeyedCoProcessFunction<Tuple2<Integer, Address>, ObjectNode, ObjectNode, ObjectNode>.OnTimerContext ctx, - Collector<ObjectNode> out) throws Exception { - for (ObjectNode obj : rtpState.values()) { - final Record rtpRecord = new Record(obj); - rtpRecord.setInt(RTPRecord.F_ORIGINATOR_DIR, RTPRecord.OriginatorDir.UNKNOWN.getCode()); - out.collect(obj); - } - rtpState.clear(); - sipState.clear(); - } - - // ====================================================================== - // PRIVATE HELPER - // ====================================================================== - - private void completeOriginatorField(final Record rtpRecord, final SIPRecord sipRecord) { - if (StringUtils.isNoneBlank(rtpRecord.getClientIp())) { - if (StringUtils.equals(sipRecord.getOriginatorSdpConnectIp(), rtpRecord.getClientIp())) { - rtpRecord.setInt(RTPRecord.F_ORIGINATOR_DIR, RTPRecord.OriginatorDir.C2S.getCode()); - return; - } else if (StringUtils.equals(sipRecord.getResponderSdpConnectIp(), rtpRecord.getClientIp())) { - rtpRecord.setInt(RTPRecord.F_ORIGINATOR_DIR, RTPRecord.OriginatorDir.S2C.getCode()); - return; - } - } - rtpRecord.setInt(RTPRecord.F_ORIGINATOR_DIR, RTPRecord.OriginatorDir.UNKNOWN.getCode()); - } -}
\ No newline at end of file diff --git a/src/main/java/com/zdjizhi/flink/voip/records/RTPRecord.java b/src/main/java/com/zdjizhi/flink/voip/records/RTPRecord.java deleted file mode 100644 index c8df7db..0000000 --- a/src/main/java/com/zdjizhi/flink/voip/records/RTPRecord.java +++ /dev/null @@ -1,27 +0,0 @@ -package com.zdjizhi.flink.voip.records; - -import lombok.Getter; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; - -public class RTPRecord extends Record { - - public static final String F_ORIGINATOR_DIR = "rtp_originator_dir"; - - public RTPRecord(ObjectNode obj) { - super(obj); - } - - @Getter - public enum OriginatorDir { - - UNKNOWN(0), - C2S(1), - S2C(2); - - private final int code; - - OriginatorDir(int code) { - this.code = code; - } - } -} diff --git a/src/main/java/com/zdjizhi/flink/voip/records/Record.java b/src/main/java/com/zdjizhi/flink/voip/records/Record.java deleted file mode 100644 index 46052f3..0000000 --- a/src/main/java/com/zdjizhi/flink/voip/records/Record.java +++ /dev/null @@ -1,224 +0,0 @@ -package com.zdjizhi.flink.voip.records; - -import lombok.AllArgsConstructor; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.IntNode; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.TextNode; - -/** - * Record class represents a data record with various fields. - * <p> - * It provides getter and setter methods for accessing the fields of the data record. 
- * - * @author chaoc - * @since 1.0 - */ -@AllArgsConstructor -public class Record { - - /** - * 字段名:数据记录中的所属 vsys - */ - public static final String F_COMMON_VSYS_ID = "vsys_id"; - /** - * 字段名:数据记录中的字段类型 - */ - public static final String F_COMMON_SCHEMA_TYPE = "decoded_as"; - /** - * 字段名:数据记录中的流类型 - */ - public static final String F_COMMON_STREAM_DIR = "common_stream_dir"; - /** - * 字段名:数据记录中的流类型的 Flags - */ - public static final String F_FLAGS = "flags"; - /** - * 字段名:数据记录中的服务端地址 - */ - public static final String F_COMMON_SERVER_IP = "server_ip"; - /** - * 字段名:数据记录中的服务端端口 - */ - public static final String F_COMMON_SERVER_PORT = "server_port"; - /** - * 字段名:数据记录中的客户端地址 - */ - public static final String F_COMMON_CLIENT_IP = "client_ip"; - /** - * 字段名:数据记录中的客户端端口 - */ - public static final String F_COMMON_CLIENT_PORT = "client_port"; - - /** - * ObjectNode data. - */ - protected final ObjectNode obj; - - /** - * Get the VSys ID from the data record. - * - * @return The VSys ID as an integer. - */ - public int getVSysID() { - int v = Record.getInt(obj, F_COMMON_VSYS_ID); - return v == 0 ? 1 : v; - } - - /** - * Get the schema type from the data record. - * - * @return The schema type. - */ - public final SchemaType getSchemaType() { - return SchemaType.of(Record.getString(obj, F_COMMON_SCHEMA_TYPE)); - } - - /** - * Get the stream direction from the data record. - * - * @return The stream direction. - */ - public final StreamDir getStreamDir() { - return StreamDir.ofFlags(Record.getLong(obj, F_FLAGS)); - } - - /** - * Get the server IP address from the data record. - * - * @return The server IP address as a string. - */ - public final String getServerIp() { - return Record.getString(obj, F_COMMON_SERVER_IP); - } - - /** - * Get the server port from the data record. - * - * @return The server port as an integer. - */ - public final int getServerPort() { - return Record.getInt(obj, F_COMMON_SERVER_PORT); - } - - /** - * Get the client IP address from the data record. 
- * - * @return The client IP address as a string. - */ - public final String getClientIp() { - return Record.getString(obj, F_COMMON_CLIENT_IP); - } - - /** - * Get the client port from the data record. - * - * @return The client port as an integer. - */ - public final int getClientPort() { - return Record.getInt(obj, F_COMMON_CLIENT_PORT); - } - - /** - * Set an integer value to the specified field in the data record. - * - * @param name The name of the field. - * @param value The integer value to set. - */ - public final void setInt(final String name, final int value) { - obj.set(name, IntNode.valueOf(value)); - } - - /** - * Set a string value to the specified field in the data record. - * - * @param name The name of the field. - * @param value The string value to set. - */ - public final void setString(final String name, final String value) { - obj.set(name, TextNode.valueOf(value)); - } - - /** - * Merge the fields of another ObjectNode into the current data record. - * - * @param other The ObjectNode containing the fields to be merged. - * @return This record. - */ - public final Record merge(final ObjectNode other) { - other.fields().forEachRemaining(entry -> obj.set(entry.getKey(), entry.getValue())); - return this; - } - - /** - * Get an integer value from the specified field in the ObjectNode. - * - * @param obj The ObjectNode to get the value from. - * @param field The name of the field. - * @param defaultValue The default value to return if the field is not found or is not an integer. - * @return The integer value from the field or the default value if the field is not found or is not an integer. - */ - public static int getInt(final ObjectNode obj, final String field, final int defaultValue) { - final JsonNode node = obj.get(field); - return node != null && node.isInt() ? node.asInt(defaultValue) : defaultValue; - } - - /** - * Get an integer value from the specified field in the ObjectNode. - * - * @param obj The ObjectNode to get the value from. 
- * @param field The name of the field. - * @return The integer value from the field or 0 if the field is not found or is not an integer. - */ - public static int getInt(final ObjectNode obj, final String field) { - return getInt(obj, field, 0); - } - - /** - * Gets a long value from the specified field in the ObjectNode. - * - * @param obj The ObjectNode to get the value from. - * @param field The name of the field. - * @param defaultValue The default value to return if the field is not found or is not a long. - * @return The long value from the field or the default value if the field is not found or is not a long. - */ - public static long getLong(final ObjectNode obj, final String field, final long defaultValue) { - final JsonNode node = obj.get(field); - return node != null && node.isNumber() ? node.asLong() : defaultValue; - } - - /** - * Gets a long value from the specified field in the ObjectNode. - * - * @param obj The ObjectNode to get the value from. - * @param field The name of the field. - * @return The long value from the field or 0 if the field is not found or is not a long. - */ - private static long getLong(final ObjectNode obj, final String field) { - return getLong(obj, field, 0L); - } - - /** - * Get a string value from the specified field in the ObjectNode. - * - * @param obj The ObjectNode to get the value from. - * @param field The name of the field. - * @param defaultValue The default value to return if the field is not found or is not a string. - * @return The string value from the field or the default value if the field is not found or is not a string. - */ - public static String getString(final ObjectNode obj, final String field, final String defaultValue) { - final JsonNode node = obj.get(field); - return node != null && node.isTextual() ? node.asText(defaultValue) : defaultValue; - } - - /** - * Get a string value from the specified field in the ObjectNode. - * - * @param obj The ObjectNode to get the value from. 
- * @param field The name of the field. - * @return The string value from the field or null if the field is not found or is not a string. - */ - public static String getString(final ObjectNode obj, final String field) { - return getString(obj, field, null); - } -}
\ No newline at end of file diff --git a/src/main/java/com/zdjizhi/flink/voip/records/SIPRecord.java b/src/main/java/com/zdjizhi/flink/voip/records/SIPRecord.java deleted file mode 100644 index 1ece4a5..0000000 --- a/src/main/java/com/zdjizhi/flink/voip/records/SIPRecord.java +++ /dev/null @@ -1,57 +0,0 @@ -package com.zdjizhi.flink.voip.records; - -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; - -/** - * SIP(Session Initiation Protocol)data record class, used to parse and access SIP data records. - * - * @author chaoc - * @since 1.0 - */ -public class SIPRecord extends Record { - - /** - * Field Name: SIP 通话的会话 ID - */ - public static final String F_CALL_ID = "sip_call_id"; - /** - * Field Name: SIP 通话的协调的主叫语音传输 IP - */ - public static final String F_ORIGINATOR_SDP_CONNECT_IP = "sip_originator_sdp_connect_ip"; - /** - * Field Name: SIP 通话的协调的主叫语音传输端口 - */ - public static final String F_ORIGINATOR_SDP_MEDIA_PORT = "sip_originator_sdp_media_port"; - /** - * Field Name: SIP 通话的协调的被叫语音传输 IP - */ - public static final String F_RESPONDER_SDP_CONNECT_IP = "sip_responder_sdp_connect_ip"; - /** - * Field Name: SIP 通话的协调的被叫语音传输端口 - */ - public static final String F_RESPONDER_SDP_MEDIA_PORT = "sip_responder_sdp_media_port"; - - public SIPRecord(final ObjectNode obj) { - super(obj); - } - - public String getCallID() { - return Record.getString(obj, F_CALL_ID); - } - - public String getOriginatorSdpConnectIp() { - return Record.getString(obj, F_ORIGINATOR_SDP_CONNECT_IP); - } - - public int getOriginatorSdpMediaPort() { - return Record.getInt(obj, F_ORIGINATOR_SDP_MEDIA_PORT); - } - - public String getResponderSdpConnectIp() { - return Record.getString(obj, F_RESPONDER_SDP_CONNECT_IP); - } - - public int getResponderSdpMediaPort() { - return Record.getInt(obj, F_RESPONDER_SDP_MEDIA_PORT); - } -} diff --git a/src/main/java/com/zdjizhi/flink/voip/records/SchemaType.java b/src/main/java/com/zdjizhi/flink/voip/records/SchemaType.java 
deleted file mode 100644 index 40c4a91..0000000 --- a/src/main/java/com/zdjizhi/flink/voip/records/SchemaType.java +++ /dev/null @@ -1,51 +0,0 @@ -package com.zdjizhi.flink.voip.records; - -import lombok.AllArgsConstructor; -import lombok.Getter; - -/** - * The SchemaType enum represents different types of data schemas. - * - * @author chaoc - * @since 1.0 - */ -@AllArgsConstructor -@Getter -public enum SchemaType { - - /** - * Represents the SIP schema type. - */ - SIP("SIP"), - - /** - * Represents the RTP schema type. - */ - RTP("RTP"), - - /** - * Represents the VoIP schema type. - */ - VOIP("VoIP"); - - /** - * The string value of the SchemaType. - */ - private final String value; - - /** - * Get the SchemaType enum based on the provided string value. - * - * @param value The string value of the SchemaType to retrieve. - * @return The corresponding SchemaType enum. - * @throws IllegalArgumentException if the provided value does not match any known SchemaType. - */ - public static SchemaType of(final String value) { - for (SchemaType schemaType : values()) { - if (schemaType.value.equalsIgnoreCase(value)) { - return schemaType; - } - } - throw new IllegalArgumentException("Unknown SchemaType value '" + value + "'."); - } -}
\ No newline at end of file diff --git a/src/main/java/com/zdjizhi/flink/voip/records/StreamDir.java b/src/main/java/com/zdjizhi/flink/voip/records/StreamDir.java deleted file mode 100644 index 3732058..0000000 --- a/src/main/java/com/zdjizhi/flink/voip/records/StreamDir.java +++ /dev/null @@ -1,69 +0,0 @@ -package com.zdjizhi.flink.voip.records; - -import lombok.AllArgsConstructor; -import lombok.Getter; - -/** - * The StreamDir enum represents different types of data stream directions. - * - * @author chaoc - * @since 1.0 - */ -@AllArgsConstructor -@Getter -public enum StreamDir { - - /** - * Represents the Client-to-Server (C2S) stream direction. - */ - C2S(1), - - /** - * Represents the Server-to-Client (S2C) stream direction. - */ - S2C(2), - - /** - * Represents the bidirectional (double) stream direction. - */ - DOUBLE(3); - - /** - * The integer value of the StreamDir. - */ - private final int value; - - /** - * Get the StreamDir enum based on the provided integer value. - * - * @param value The integer value of the StreamDir to retrieve. - * @return The corresponding StreamDir enum. - * @throws IllegalArgumentException if the provided value does not match any known StreamDir. - */ - public static StreamDir of(int value) { - for (StreamDir streamDir : values()) { - if (value == streamDir.value) { - return streamDir; - } - } - throw new IllegalArgumentException("Unknown StreamDir value '" + value + "'."); - } - - /** - * Get the StreamDir enum based on the provided flags value. - * - * @param flags The flags. - * @return The corresponding StreamDir enum. - * @throws IllegalArgumentException if the provided value does not match any known StreamDir. 
- */ - public static StreamDir ofFlags(long flags) { - int v = 0; - if ((flags & 8192) == 8192) { - v += 1; - } - if ((flags & 16384) == 16384) { - v += 2; - } - return of(v); - } -} diff --git a/src/main/resources/META-INF/services/com.geedgenetworks.flink.easy.common.api.UDFFactory b/src/main/resources/META-INF/services/com.geedgenetworks.flink.easy.common.api.UDFFactory new file mode 100644 index 0000000..fdaf9e5 --- /dev/null +++ b/src/main/resources/META-INF/services/com.geedgenetworks.flink.easy.common.api.UDFFactory @@ -0,0 +1 @@ +com.geedgenetworks.flink.easy.application.voip.VoipUDFFactory
\ No newline at end of file diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties deleted file mode 100644 index 7cc9d25..0000000 --- a/src/main/resources/application.properties +++ /dev/null @@ -1,7 +0,0 @@ -sink.kafka.topic=VOIP-CONVERSATION-RECORD -sink.kafka.props.bootstrap.servers=localhost:9292 - - -source.kafka.topic=VOIP-RECORD -source.kafka.props.bootstrap.servers=localhost:9292 -source.kafka.props.group.id=flink-voip-fusion
\ No newline at end of file diff --git a/src/main/resources/jobs/job.yml b/src/main/resources/jobs/job.yml new file mode 100644 index 0000000..3f8b7d3 --- /dev/null +++ b/src/main/resources/jobs/job.yml @@ -0,0 +1,1795 @@ +job: + name: correlation_sip_rtp_session + parallelism: 1 + active-pipeline: + - only-voip-records + - fusion-fail-records + - all-errors-records + +source: + - name: session-records + type: kafka + option: + topic: VOIP-RECORD + properties: + bootstrap.servers: 192.168.44.12:9094 + group.id: sip-rtp-correlation + security.protocol: SASL_PLAINTEXT + sasl.mechanism: PLAIN + sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="galaxy2019"; + format: json + schema: + ## General + - name: recv_time + data-type: BIGINT NOT NULL + - name: log_id + data-type: BIGINT NOT NULL + - name: decoded_as + data-type: STRING NOT NULL + - name: session_id + data-type: BIGINT NOT NULL + - name: start_timestamp_ms + data-type: BIGINT NOT NULL + # row-time: + - name: start_timestamp + for: TO_TIMESTAMP_LTZ(start_timestamp_ms, 3) + watermark: start_timestamp - INTERVAL '5' MINUTE + - name: end_timestamp_ms + data-type: BIGINT + - name: duration_ms + data-type: INT + - name: tcp_handshake_latency_ms + data-type: INT + - name: ingestion_time + data-type: BIGINT + - name: processing_time + data-type: BIGINT + - name: insert_time + data-type: BIGINT + - name: device_id + data-type: STRING + - name: out_link_id + data-type: INT + - name: in_link_id + data-type: INT + - name: device_tag + data-type: STRING + - name: data_center + data-type: STRING + - name: device_group + data-type: STRING + - name: sled_ip + data-type: STRING + - name: address_type + data-type: INT + - name: direction + data-type: STRING + - name: vsys_id + data-type: INT + - name: t_vsys_id + data-type: INT + - name: flags + data-type: BIGINT + - name: flags_identify_info + data-type: STRING + - name: c2s_ttl + data-type: INT + - name: s2c_ttl + 
data-type: INT + ## Treatment + - name: security_rule_list + data-type: ARRAY<BIGINT> + - name: security_action + data-type: STRING + - name: monitor_rule_list + data-type: ARRAY<BIGINT> + - name: shaping_rule_list + data-type: ARRAY<BIGINT> + - name: proxy_rule_list + data-type: ARRAY<BIGINT> + - name: statistics_rule_list + data-type: ARRAY<BIGINT> + - name: sc_rule_list + data-type: ARRAY<BIGINT> + - name: sc_rsp_raw + data-type: ARRAY<BIGINT> + - name: sc_rsp_decrypted + data-type: ARRAY<BIGINT> + - name: proxy_action + data-type: STRING + - name: proxy_pinning_status + data-type: INT + - name: proxy_intercept_status + data-type: INT + - name: proxy_passthrough_reason + data-type: STRING + - name: proxy_client_side_latency_ms + data-type: INT + - name: proxy_server_side_latency_ms + data-type: INT + - name: proxy_client_side_version + data-type: STRING + - name: proxy_server_side_version + data-type: STRING + - name: proxy_cert_verify + data-type: INT + - name: proxy_intercept_error + data-type: STRING + - name: monitor_mirrored_pkts + data-type: INT + - name: monitor_mirrored_bytes + data-type: INT + ## Source + - name: client_ip + data-type: STRING + - name: client_port + data-type: INT + - name: client_os_desc + data-type: STRING + - name: client_geolocation + data-type: STRING + - name: client_country + data-type: STRING + - name: client_super_administrative_area + data-type: STRING + - name: client_administrative_area + data-type: STRING + - name: client_sub_administrative_area + data-type: STRING + - name: client_asn + data-type: BIGINT + - name: subscriber_id + data-type: STRING + - name: imei + data-type: STRING + - name: imsi + data-type: STRING + - name: phone_number + data-type: STRING + - name: apn + data-type: STRING + ## Destination + - name: server_ip + data-type: STRING + - name: server_port + data-type: INT + - name: server_os_desc + data-type: STRING + - name: server_geolocation + data-type: STRING + - name: server_country + data-type: STRING 
+ - name: server_super_administrative_area + data-type: STRING + - name: server_administrative_area + data-type: STRING + - name: server_sub_administrative_area + data-type: STRING + - name: server_asn + data-type: BIGINT + - name: server_fqdn + data-type: STRING + - name: server_domain + data-type: STRING + - name: fqdn_category_list + data-type: ARRAY<INT> + ## Application + - name: app_transition + data-type: STRING + - name: app + data-type: STRING + - name: app_category + data-type: STRING + - name: app_debug_info + data-type: STRING + - name: app_content + data-type: STRING + - name: app_extra_info + data-type: STRING + ## Protocol + - name: ip_protocol + data-type: STRING + - name: decoded_path + data-type: STRING + ## Transmission + - name: sent_pkts + data-type: BIGINT + - name: received_pkts + data-type: BIGINT + - name: sent_bytes + data-type: BIGINT + - name: received_bytes + data-type: BIGINT + - name: tcp_c2s_ip_fragments + data-type: BIGINT + - name: tcp_s2c_ip_fragments + data-type: BIGINT + - name: tcp_c2s_lost_bytes + data-type: BIGINT + - name: tcp_s2c_lost_bytes + data-type: BIGINT + - name: tcp_c2s_o3_pkts + data-type: BIGINT + - name: tcp_s2c_o3_pkts + data-type: BIGINT + - name: tcp_c2s_rtx_pkts + data-type: BIGINT + - name: tcp_s2c_rtx_pkts + data-type: BIGINT + - name: tcp_c2s_rtx_bytes + data-type: BIGINT + - name: tcp_s2c_rtx_bytes + data-type: BIGINT + - name: tcp_rtt_ms + data-type: INT + - name: tcp_client_isn + data-type: BIGINT + - name: tcp_server_isn + data-type: BIGINT + ## Other + - name: packet_capture_file + data-type: STRING + - name: in_src_mac + data-type: STRING + - name: out_src_mac + data-type: STRING + - name: in_dest_mac + data-type: STRING + - name: out_dest_mac + data-type: STRING + - name: encapsulation + data-type: STRING + - name: dup_traffic_flag + data-type: INT + - name: tunnel_id_list + data-type: ARRAY<BIGINT> + - name: tunnel_endpoint_a_desc + data-type: STRING + - name: tunnel_endpoint_b_desc + data-type: 
STRING + ## SIP + - name: sip_call_id + data-type: STRING + - name: sip_originator_description + data-type: STRING + - name: sip_responder_description + data-type: STRING + - name: sip_user_agent + data-type: STRING + - name: sip_server + data-type: STRING + - name: sip_originator_sdp_connect_ip + data-type: STRING + - name: sip_originator_sdp_media_port + data-type: INT + - name: sip_originator_sdp_media_type + data-type: STRING + - name: sip_originator_sdp_content + data-type: STRING + - name: sip_responder_sdp_connect_ip + data-type: STRING + - name: sip_responder_sdp_media_port + data-type: INT + - name: sip_responder_sdp_media_type + data-type: STRING + - name: sip_responder_sdp_content + data-type: STRING + - name: sip_duration_s + data-type: INT + - name: sip_bye + data-type: STRING + ## RTP + - name: rtp_payload_type_c2s + data-type: INT + - name: rtp_payload_type_s2c + data-type: INT + - name: rtp_pcap_path + data-type: STRING + - name: rtp_originator_dir + data-type: INT + +sink: + # 错误的 SIP 和 RTP + - name: all-errors-records + type: kafka + on: errors-records + option: + topic: VOIP-CONVERSATTON-RECORD + properties: + bootstrap.servers: 192.168.44.12:9094 + security.protocol: SASL_PLAINTEXT + sasl.mechanism: PLAIN + sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="galaxy2019"; + format: json + # 关联成功的 VOIP + - name: only-voip-records + on: voip-fusion.ok + type: kafka + option: + topic: VOIP-CONVERSATTON-RECORD + properties: + bootstrap.servers: 192.168.44.12:9094 + security.protocol: SASL_PLAINTEXT + sasl.mechanism: PLAIN + sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="galaxy2019"; + format: json + # 没有关联成功的 SIP 和 RTP + - name: fusion-fail-records + on: cannot-fusion-records + type: kafka + option: + topic: VOIP-CONVERSATTON-RECORD + properties: + bootstrap.servers: 192.168.44.12:9094 + security.protocol: SASL_PLAINTEXT + 
sasl.mechanism: PLAIN + sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="galaxy2019"; + format: json + +pipeline: + - name: split-for-valid + category: SPLIT + on: session-records + splits: + # Invalid ip or port + - name: error1-records + where: NOT(IS_IP_ADDRESS(client_ip)) || NOT(IS_IP_ADDRESS(server_ip)) || client_port <= 0 || server_port <= 0 + # Invalid stream dir + - name: error2-records + where: decoded_as == 'SIP' &&STREAM_DIR(flags) != 1 && STREAM_DIR(flags) != 2 && STREAM_DIR(flags) != 3 + # Invalid: SIP one-way stream and has invalid network address + - name: error3-records + where: decoded_as == 'SIP' && ( NOT(HAS_IP_ADDRESS(sip_originator_sdp_connect_ip, sip_responder_sdp_connect_ip)) || sip_originator_sdp_media_port <= 0 || sip_responder_sdp_media_port <= 0 ) + - name: error4-records + where: decoded_as == 'SIP' && STREAM_DIR(flags) == 3 && ( NOT( IS_IP_ADDRESS(sip_originator_sdp_connect_ip) ) || NOT( IS_IP_ADDRESS(sip_responder_sdp_connect_ip) ) ) + + ### Notes: If internal IP address correlate is needed, please uncomment the following two items + # # Invalid: SIP one-way stream and internal network address + # - name: internal-error1-records + # where: decoded_as == 'SIP' && NOT(HAS_EXTERNAL_IP_ADDRESS(sip_originator_sdp_connect_ip, sip_responder_sdp_connect_ip)) + # # Invalid: SIP double-way stream and internal network address + # - name: internal-error2-records + # where: decoded_as == 'SIP' && STREAM_DIR(flags) == 3 && ( NOT( IS_EXTERNAL_IP_ADDRESS(sip_originator_sdp_connect_ip) ) || NOT( IS_EXTERNAL_IP_ADDRESS(sip_responder_sdp_connect_ip) ) ) + - name: split-by-protocol + category: SPLIT + on: split-for-valid + splits: + - name: rtp-records + where: decoded_as == 'RTP' + - name: sip-records + where: decoded_as == 'SIP' + - name: sip-double-way-records + category: CORRELATE + cache: + - name: v1 + type: VALUE + ttl: 1 minute + schema: + ## General + - name: recv_time + data-type: 
BIGINT NOT NULL + - name: log_id + data-type: BIGINT NOT NULL + - name: decoded_as + data-type: STRING NOT NULL + - name: session_id + data-type: BIGINT NOT NULL + - name: start_timestamp_ms + data-type: BIGINT NOT NULL + # row-time: + - name: start_timestamp + data-type: TIMESTAMP_LTZ(3) + - name: end_timestamp_ms + data-type: BIGINT + - name: duration_ms + data-type: INT + - name: tcp_handshake_latency_ms + data-type: INT + - name: ingestion_time + data-type: BIGINT + - name: processing_time + data-type: BIGINT + - name: insert_time + data-type: BIGINT + - name: device_id + data-type: STRING + - name: out_link_id + data-type: INT + - name: in_link_id + data-type: INT + - name: device_tag + data-type: STRING + - name: data_center + data-type: STRING + - name: device_group + data-type: STRING + - name: sled_ip + data-type: STRING + - name: address_type + data-type: INT + - name: direction + data-type: STRING + - name: vsys_id + data-type: INT + - name: t_vsys_id + data-type: INT + - name: flags + data-type: BIGINT + - name: flags_identify_info + data-type: STRING + - name: c2s_ttl + data-type: INT + - name: s2c_ttl + data-type: INT + ## Treatment + - name: security_rule_list + data-type: ARRAY<BIGINT> + - name: security_action + data-type: STRING + - name: monitor_rule_list + data-type: ARRAY<BIGINT> + - name: shaping_rule_list + data-type: ARRAY<BIGINT> + - name: proxy_rule_list + data-type: ARRAY<BIGINT> + - name: statistics_rule_list + data-type: ARRAY<BIGINT> + - name: sc_rule_list + data-type: ARRAY<BIGINT> + - name: sc_rsp_raw + data-type: ARRAY<BIGINT> + - name: sc_rsp_decrypted + data-type: ARRAY<BIGINT> + - name: proxy_action + data-type: STRING + - name: proxy_pinning_status + data-type: INT + - name: proxy_intercept_status + data-type: INT + - name: proxy_passthrough_reason + data-type: STRING + - name: proxy_client_side_latency_ms + data-type: INT + - name: proxy_server_side_latency_ms + data-type: INT + - name: proxy_client_side_version + data-type: 
STRING + - name: proxy_server_side_version + data-type: STRING + - name: proxy_cert_verify + data-type: INT + - name: proxy_intercept_error + data-type: STRING + - name: monitor_mirrored_pkts + data-type: INT + - name: monitor_mirrored_bytes + data-type: INT + ## Source + - name: client_ip + data-type: STRING + - name: client_port + data-type: INT + - name: client_os_desc + data-type: STRING + - name: client_geolocation + data-type: STRING + - name: client_country + data-type: STRING + - name: client_super_administrative_area + data-type: STRING + - name: client_administrative_area + data-type: STRING + - name: client_sub_administrative_area + data-type: STRING + - name: client_asn + data-type: BIGINT + - name: subscriber_id + data-type: STRING + - name: imei + data-type: STRING + - name: imsi + data-type: STRING + - name: phone_number + data-type: STRING + - name: apn + data-type: STRING + ## Destination + - name: server_ip + data-type: STRING + - name: server_port + data-type: INT + - name: server_os_desc + data-type: STRING + - name: server_geolocation + data-type: STRING + - name: server_country + data-type: STRING + - name: server_super_administrative_area + data-type: STRING + - name: server_administrative_area + data-type: STRING + - name: server_sub_administrative_area + data-type: STRING + - name: server_asn + data-type: BIGINT + - name: server_fqdn + data-type: STRING + - name: server_domain + data-type: STRING + - name: fqdn_category_list + data-type: ARRAY<INT> + ## Application + - name: app_transition + data-type: STRING + - name: app + data-type: STRING + - name: app_category + data-type: STRING + - name: app_debug_info + data-type: STRING + - name: app_content + data-type: STRING + - name: app_extra_info + data-type: STRING + ## Protocol + - name: ip_protocol + data-type: STRING + - name: decoded_path + data-type: STRING + ## Transmission + - name: sent_pkts + data-type: BIGINT + - name: received_pkts + data-type: BIGINT + - name: sent_bytes + 
data-type: BIGINT + - name: received_bytes + data-type: BIGINT + - name: tcp_c2s_ip_fragments + data-type: BIGINT + - name: tcp_s2c_ip_fragments + data-type: BIGINT + - name: tcp_c2s_lost_bytes + data-type: BIGINT + - name: tcp_s2c_lost_bytes + data-type: BIGINT + - name: tcp_c2s_o3_pkts + data-type: BIGINT + - name: tcp_s2c_o3_pkts + data-type: BIGINT + - name: tcp_c2s_rtx_pkts + data-type: BIGINT + - name: tcp_s2c_rtx_pkts + data-type: BIGINT + - name: tcp_c2s_rtx_bytes + data-type: BIGINT + - name: tcp_s2c_rtx_bytes + data-type: BIGINT + - name: tcp_rtt_ms + data-type: INT + - name: tcp_client_isn + data-type: BIGINT + - name: tcp_server_isn + data-type: BIGINT + ## Other + - name: packet_capture_file + data-type: STRING + - name: in_src_mac + data-type: STRING + - name: out_src_mac + data-type: STRING + - name: in_dest_mac + data-type: STRING + - name: out_dest_mac + data-type: STRING + - name: encapsulation + data-type: STRING + - name: dup_traffic_flag + data-type: INT + - name: tunnel_id_list + data-type: ARRAY<BIGINT> + - name: tunnel_endpoint_a_desc + data-type: STRING + - name: tunnel_endpoint_b_desc + data-type: STRING + ## SIP + - name: sip_call_id + data-type: STRING + - name: sip_originator_description + data-type: STRING + - name: sip_responder_description + data-type: STRING + - name: sip_user_agent + data-type: STRING + - name: sip_server + data-type: STRING + - name: sip_originator_sdp_connect_ip + data-type: STRING + - name: sip_originator_sdp_media_port + data-type: INT + - name: sip_originator_sdp_media_type + data-type: STRING + - name: sip_originator_sdp_content + data-type: STRING + - name: sip_responder_sdp_connect_ip + data-type: STRING + - name: sip_responder_sdp_media_port + data-type: INT + - name: sip_responder_sdp_media_type + data-type: STRING + - name: sip_responder_sdp_content + data-type: STRING + - name: sip_duration_s + data-type: INT + - name: sip_bye + data-type: STRING + ## RTP + - name: rtp_payload_type_c2s + data-type: INT 
+ - name: rtp_payload_type_s2c + data-type: INT + - name: rtp_pcap_path + data-type: STRING + - name: rtp_originator_dir + data-type: INT + where: + - on: sip-records + key-by: vsys_id, sip_call_id, SORT_ADDRESS( client_ip, client_port, server_ip, server_port ) + process: + - if: STREAM_DIR(flags) != 3 && @v1.isNotNull && STREAM_DIR(@v1.$flags) != STREAM_DIR(flags) + then: + - |- + OUTPUT ok FROM withColumns(recv_time to sip_call_id), + FIND_NOT_BLANK(@v1.$sip_originator_description, sip_originator_description) AS sip_originator_description, + FIND_NOT_BLANK(@v1.$sip_responder_description, sip_responder_description) AS sip_responder_description, + FIND_NOT_BLANK(@v1.$sip_user_agent, sip_user_agent) AS sip_user_agent, + FIND_NOT_BLANK(@v1.$sip_server, sip_server) AS sip_server, + FIND_NOT_BLANK(@v1.$sip_originator_sdp_connect_ip, sip_originator_sdp_connect_ip) AS sip_originator_sdp_connect_ip, + (@v1.$sip_originator_sdp_media_port > 0).?(@v1.$sip_originator_sdp_media_port, sip_originator_sdp_media_port) AS sip_originator_sdp_media_port, + FIND_NOT_BLANK(@v1.$sip_originator_sdp_media_type, sip_originator_sdp_media_type) AS sip_originator_sdp_media_type, + FIND_NOT_BLANK(@v1.$sip_originator_sdp_content, sip_originator_sdp_content) AS sip_originator_sdp_content, + FIND_NOT_BLANK(@v1.$sip_responder_sdp_connect_ip, sip_responder_sdp_connect_ip) AS sip_responder_sdp_connect_ip, + (@v1.$sip_responder_sdp_media_port > 0).?(@v1.$sip_responder_sdp_media_port, sip_responder_sdp_media_port) AS sip_responder_sdp_media_port, + FIND_NOT_BLANK(@v1.$sip_responder_sdp_media_type, sip_responder_sdp_media_type) AS sip_responder_sdp_media_type, + FIND_NOT_BLANK(@v1.$sip_responder_sdp_content, sip_responder_sdp_content) AS sip_responder_sdp_content, + @v1.$sip_duration_s + sip_duration_s AS sip_duration_s, + FIND_NOT_BLANK(@v1.$sip_bye, sip_bye) AS sip_bye, + rtp_payload_type_c2s, + rtp_payload_type_s2c, + rtp_pcap_path, + rtp_originator_dir + - TRUNCATE v1 + - if: STREAM_DIR(flags) != 3 
&& @v1.isNull + then: + - |- + SET v1 FROM withColumns(recv_time to rtp_originator_dir) + - if: STREAM_DIR(flags) == 3 + then: + - |- + OUTPUT ok FROM withColumns(recv_time to rtp_originator_dir) + - SCHEDULING USING EVENT TIME FOR NOW + 60 * 1000 + schedule: + - if: '@v1.isNotNull' + then: + - |- + OUTPUT fail FROM @v1.$recv_time AS recv_time, + @v1.$log_id AS log_id, + @v1.$decoded_as AS decoded_as, + @v1.$session_id AS session_id, + @v1.$start_timestamp_ms AS start_timestamp_ms, + @v1.$start_timestamp AS start_timestamp, + @v1.$end_timestamp_ms AS end_timestamp_ms, + @v1.$duration_ms AS duration_ms, + @v1.$tcp_handshake_latency_ms AS tcp_handshake_latency_ms, + @v1.$ingestion_time AS ingestion_time, + @v1.$processing_time AS processing_time, + @v1.$insert_time AS insert_time, + @v1.$device_id AS device_id, + @v1.$out_link_id AS out_link_id, + @v1.$in_link_id AS in_link_id, + @v1.$device_tag AS device_tag, + @v1.$data_center AS data_center, + @v1.$device_group AS device_group, + @v1.$sled_ip AS sled_ip, + @v1.$address_type AS address_type, + @v1.$direction AS direction, + @v1.$vsys_id AS vsys_id, + @v1.$t_vsys_id AS t_vsys_id, + @v1.$flags AS flags, + @v1.$flags_identify_info AS flags_identify_info, + @v1.$c2s_ttl AS c2s_ttl, + @v1.$s2c_ttl AS s2c_ttl, + @v1.$security_rule_list AS security_rule_list, + @v1.$security_action AS security_action, + @v1.$monitor_rule_list AS monitor_rule_list, + @v1.$shaping_rule_list AS shaping_rule_list, + @v1.$proxy_rule_list AS proxy_rule_list, + @v1.$statistics_rule_list AS statistics_rule_list, + @v1.$sc_rule_list AS sc_rule_list, + @v1.$sc_rsp_raw AS sc_rsp_raw, + @v1.$sc_rsp_decrypted AS sc_rsp_decrypted, + @v1.$proxy_action AS proxy_action, + @v1.$proxy_pinning_status AS proxy_pinning_status, + @v1.$proxy_intercept_status AS proxy_intercept_status, + @v1.$proxy_passthrough_reason AS proxy_passthrough_reason, + @v1.$proxy_client_side_latency_ms AS proxy_client_side_latency_ms, + @v1.$proxy_server_side_latency_ms AS 
proxy_server_side_latency_ms, + @v1.$proxy_client_side_version AS proxy_client_side_version, + @v1.$proxy_server_side_version AS proxy_server_side_version, + @v1.$proxy_cert_verify AS proxy_cert_verify, + @v1.$proxy_intercept_error AS proxy_intercept_error, + @v1.$monitor_mirrored_pkts AS monitor_mirrored_pkts, + @v1.$monitor_mirrored_bytes AS monitor_mirrored_bytes, + @v1.$client_ip AS client_ip, + @v1.$client_port AS client_port, + @v1.$client_os_desc AS client_os_desc, + @v1.$client_geolocation AS client_geolocation, + @v1.$client_country AS client_country, + @v1.$client_super_administrative_area AS client_super_administrative_area, + @v1.$client_administrative_area AS client_administrative_area, + @v1.$client_sub_administrative_area AS client_sub_administrative_area, + @v1.$client_asn AS client_asn, + @v1.$subscriber_id AS subscriber_id, + @v1.$imei AS imei, + @v1.$imsi AS imsi, + @v1.$phone_number AS phone_number, + @v1.$apn AS apn, + @v1.$server_ip AS server_ip, + @v1.$server_port AS server_port, + @v1.$server_os_desc AS server_os_desc, + @v1.$server_geolocation AS server_geolocation, + @v1.$server_country AS server_country, + @v1.$server_super_administrative_area AS server_super_administrative_area, + @v1.$server_administrative_area AS server_administrative_area, + @v1.$server_sub_administrative_area AS server_sub_administrative_area, + @v1.$server_asn AS server_asn, + @v1.$server_fqdn AS server_fqdn, + @v1.$server_domain AS server_domain, + @v1.$fqdn_category_list AS fqdn_category_list, + @v1.$app_transition AS app_transition, + @v1.$app AS app, + @v1.$app_category AS app_category, + @v1.$app_debug_info AS app_debug_info, + @v1.$app_content AS app_content, + @v1.$app_extra_info AS app_extra_info, + @v1.$ip_protocol AS ip_protocol, + @v1.$decoded_path AS decoded_path, + @v1.$sent_pkts AS sent_pkts, + @v1.$received_pkts AS received_pkts, + @v1.$sent_bytes AS sent_bytes, + @v1.$received_bytes AS received_bytes, + @v1.$tcp_c2s_ip_fragments AS 
tcp_c2s_ip_fragments, + @v1.$tcp_s2c_ip_fragments AS tcp_s2c_ip_fragments, + @v1.$tcp_c2s_lost_bytes AS tcp_c2s_lost_bytes, + @v1.$tcp_s2c_lost_bytes AS tcp_s2c_lost_bytes, + @v1.$tcp_c2s_o3_pkts AS tcp_c2s_o3_pkts, + @v1.$tcp_s2c_o3_pkts AS tcp_s2c_o3_pkts, + @v1.$tcp_c2s_rtx_pkts AS tcp_c2s_rtx_pkts, + @v1.$tcp_s2c_rtx_pkts AS tcp_s2c_rtx_pkts, + @v1.$tcp_c2s_rtx_bytes AS tcp_c2s_rtx_bytes, + @v1.$tcp_s2c_rtx_bytes AS tcp_s2c_rtx_bytes, + @v1.$tcp_rtt_ms AS tcp_rtt_ms, + @v1.$tcp_client_isn AS tcp_client_isn, + @v1.$tcp_server_isn AS tcp_server_isn, + @v1.$packet_capture_file AS packet_capture_file, + @v1.$in_src_mac AS in_src_mac, + @v1.$out_src_mac AS out_src_mac, + @v1.$in_dest_mac AS in_dest_mac, + @v1.$out_dest_mac AS out_dest_mac, + @v1.$encapsulation AS encapsulation, + @v1.$dup_traffic_flag AS dup_traffic_flag, + @v1.$tunnel_id_list AS tunnel_id_list, + @v1.$tunnel_endpoint_a_desc AS tunnel_endpoint_a_desc, + @v1.$tunnel_endpoint_b_desc AS tunnel_endpoint_b_desc, + @v1.$sip_call_id AS sip_call_id, + @v1.$sip_originator_description AS sip_originator_description, + @v1.$sip_responder_description AS sip_responder_description, + @v1.$sip_user_agent AS sip_user_agent, + @v1.$sip_server AS sip_server, + @v1.$sip_originator_sdp_connect_ip AS sip_originator_sdp_connect_ip, + @v1.$sip_originator_sdp_media_port AS sip_originator_sdp_media_port, + @v1.$sip_originator_sdp_media_type AS sip_originator_sdp_media_type, + @v1.$sip_originator_sdp_content AS sip_originator_sdp_content, + @v1.$sip_responder_sdp_connect_ip AS sip_responder_sdp_connect_ip, + @v1.$sip_responder_sdp_media_port AS sip_responder_sdp_media_port, + @v1.$sip_responder_sdp_media_type AS sip_responder_sdp_media_type, + @v1.$sip_responder_sdp_content AS sip_responder_sdp_content, + @v1.$sip_duration_s AS sip_duration_s, + @v1.$sip_bye AS sip_bye, + @v1.$rtp_payload_type_c2s AS rtp_payload_type_c2s, + @v1.$rtp_payload_type_s2c AS rtp_payload_type_s2c, + @v1.$rtp_pcap_path AS rtp_pcap_path, + 
@v1.$rtp_originator_dir AS rtp_originator_dir + - TRUNCATE v1 + - name: voip-fusion + category: CORRELATE + cache: + - name: sip + type: VALUE + ttl: 6 minute + schema: + ## General + - name: recv_time + data-type: BIGINT NOT NULL + - name: log_id + data-type: BIGINT NOT NULL + - name: decoded_as + data-type: STRING NOT NULL + - name: session_id + data-type: BIGINT NOT NULL + - name: start_timestamp_ms + data-type: BIGINT NOT NULL + # row-time: + - name: start_timestamp + data-type: TIMESTAMP_LTZ(3) + - name: end_timestamp_ms + data-type: BIGINT + - name: duration_ms + data-type: INT + - name: tcp_handshake_latency_ms + data-type: INT + - name: ingestion_time + data-type: BIGINT + - name: processing_time + data-type: BIGINT + - name: insert_time + data-type: BIGINT + - name: device_id + data-type: STRING + - name: out_link_id + data-type: INT + - name: in_link_id + data-type: INT + - name: device_tag + data-type: STRING + - name: data_center + data-type: STRING + - name: device_group + data-type: STRING + - name: sled_ip + data-type: STRING + - name: address_type + data-type: INT + - name: direction + data-type: STRING + - name: vsys_id + data-type: INT + - name: t_vsys_id + data-type: INT + - name: flags + data-type: BIGINT + - name: flags_identify_info + data-type: STRING + - name: c2s_ttl + data-type: INT + - name: s2c_ttl + data-type: INT + ## Treatment + - name: security_rule_list + data-type: ARRAY<BIGINT> + - name: security_action + data-type: STRING + - name: monitor_rule_list + data-type: ARRAY<BIGINT> + - name: shaping_rule_list + data-type: ARRAY<BIGINT> + - name: proxy_rule_list + data-type: ARRAY<BIGINT> + - name: statistics_rule_list + data-type: ARRAY<BIGINT> + - name: sc_rule_list + data-type: ARRAY<BIGINT> + - name: sc_rsp_raw + data-type: ARRAY<BIGINT> + - name: sc_rsp_decrypted + data-type: ARRAY<BIGINT> + - name: proxy_action + data-type: STRING + - name: proxy_pinning_status + data-type: INT + - name: proxy_intercept_status + data-type: INT + - 
name: proxy_passthrough_reason + data-type: STRING + - name: proxy_client_side_latency_ms + data-type: INT + - name: proxy_server_side_latency_ms + data-type: INT + - name: proxy_client_side_version + data-type: STRING + - name: proxy_server_side_version + data-type: STRING + - name: proxy_cert_verify + data-type: INT + - name: proxy_intercept_error + data-type: STRING + - name: monitor_mirrored_pkts + data-type: INT + - name: monitor_mirrored_bytes + data-type: INT + ## Source + - name: client_ip + data-type: STRING + - name: client_port + data-type: INT + - name: client_os_desc + data-type: STRING + - name: client_geolocation + data-type: STRING + - name: client_country + data-type: STRING + - name: client_super_administrative_area + data-type: STRING + - name: client_administrative_area + data-type: STRING + - name: client_sub_administrative_area + data-type: STRING + - name: client_asn + data-type: BIGINT + - name: subscriber_id + data-type: STRING + - name: imei + data-type: STRING + - name: imsi + data-type: STRING + - name: phone_number + data-type: STRING + - name: apn + data-type: STRING + ## Destination + - name: server_ip + data-type: STRING + - name: server_port + data-type: INT + - name: server_os_desc + data-type: STRING + - name: server_geolocation + data-type: STRING + - name: server_country + data-type: STRING + - name: server_super_administrative_area + data-type: STRING + - name: server_administrative_area + data-type: STRING + - name: server_sub_administrative_area + data-type: STRING + - name: server_asn + data-type: BIGINT + - name: server_fqdn + data-type: STRING + - name: server_domain + data-type: STRING + - name: fqdn_category_list + data-type: ARRAY<INT> + ## Application + - name: app_transition + data-type: STRING + - name: app + data-type: STRING + - name: app_category + data-type: STRING + - name: app_debug_info + data-type: STRING + - name: app_content + data-type: STRING + - name: app_extra_info + data-type: STRING + ## Protocol + - 
name: ip_protocol + data-type: STRING + - name: decoded_path + data-type: STRING + ## Transmission + - name: sent_pkts + data-type: BIGINT + - name: received_pkts + data-type: BIGINT + - name: sent_bytes + data-type: BIGINT + - name: received_bytes + data-type: BIGINT + - name: tcp_c2s_ip_fragments + data-type: BIGINT + - name: tcp_s2c_ip_fragments + data-type: BIGINT + - name: tcp_c2s_lost_bytes + data-type: BIGINT + - name: tcp_s2c_lost_bytes + data-type: BIGINT + - name: tcp_c2s_o3_pkts + data-type: BIGINT + - name: tcp_s2c_o3_pkts + data-type: BIGINT + - name: tcp_c2s_rtx_pkts + data-type: BIGINT + - name: tcp_s2c_rtx_pkts + data-type: BIGINT + - name: tcp_c2s_rtx_bytes + data-type: BIGINT + - name: tcp_s2c_rtx_bytes + data-type: BIGINT + - name: tcp_rtt_ms + data-type: INT + - name: tcp_client_isn + data-type: BIGINT + - name: tcp_server_isn + data-type: BIGINT + ## Other + - name: packet_capture_file + data-type: STRING + - name: in_src_mac + data-type: STRING + - name: out_src_mac + data-type: STRING + - name: in_dest_mac + data-type: STRING + - name: out_dest_mac + data-type: STRING + - name: encapsulation + data-type: STRING + - name: dup_traffic_flag + data-type: INT + - name: tunnel_id_list + data-type: ARRAY<BIGINT> + - name: tunnel_endpoint_a_desc + data-type: STRING + - name: tunnel_endpoint_b_desc + data-type: STRING + ## SIP + - name: sip_call_id + data-type: STRING + - name: sip_originator_description + data-type: STRING + - name: sip_responder_description + data-type: STRING + - name: sip_user_agent + data-type: STRING + - name: sip_server + data-type: STRING + - name: sip_originator_sdp_connect_ip + data-type: STRING + - name: sip_originator_sdp_media_port + data-type: INT + - name: sip_originator_sdp_media_type + data-type: STRING + - name: sip_originator_sdp_content + data-type: STRING + - name: sip_responder_sdp_connect_ip + data-type: STRING + - name: sip_responder_sdp_media_port + data-type: INT + - name: sip_responder_sdp_media_type + 
data-type: STRING + - name: sip_responder_sdp_content + data-type: STRING + - name: sip_duration_s + data-type: INT + - name: sip_bye + data-type: STRING + ## RTP + - name: rtp_payload_type_c2s + data-type: INT + - name: rtp_payload_type_s2c + data-type: INT + - name: rtp_pcap_path + data-type: STRING + - name: rtp_originator_dir + data-type: INT + - name: rtp + type: LIST + ttl: 6 minute + schema: + ## General + - name: recv_time + data-type: BIGINT NOT NULL + - name: log_id + data-type: BIGINT NOT NULL + - name: decoded_as + data-type: STRING NOT NULL + - name: session_id + data-type: BIGINT NOT NULL + - name: start_timestamp_ms + data-type: BIGINT NOT NULL + # row-time: + - name: start_timestamp + data-type: TIMESTAMP_LTZ(3) + - name: end_timestamp_ms + data-type: BIGINT + - name: duration_ms + data-type: INT + - name: tcp_handshake_latency_ms + data-type: INT + - name: ingestion_time + data-type: BIGINT + - name: processing_time + data-type: BIGINT + - name: insert_time + data-type: BIGINT + - name: device_id + data-type: STRING + - name: out_link_id + data-type: INT + - name: in_link_id + data-type: INT + - name: device_tag + data-type: STRING + - name: data_center + data-type: STRING + - name: device_group + data-type: STRING + - name: sled_ip + data-type: STRING + - name: address_type + data-type: INT + - name: direction + data-type: STRING + - name: vsys_id + data-type: INT + - name: t_vsys_id + data-type: INT + - name: flags + data-type: BIGINT + - name: flags_identify_info + data-type: STRING + - name: c2s_ttl + data-type: INT + - name: s2c_ttl + data-type: INT + ## Treatment + - name: security_rule_list + data-type: ARRAY<BIGINT> + - name: security_action + data-type: STRING + - name: monitor_rule_list + data-type: ARRAY<BIGINT> + - name: shaping_rule_list + data-type: ARRAY<BIGINT> + - name: proxy_rule_list + data-type: ARRAY<BIGINT> + - name: statistics_rule_list + data-type: ARRAY<BIGINT> + - name: sc_rule_list + data-type: ARRAY<BIGINT> + - name: 
sc_rsp_raw + data-type: ARRAY<BIGINT> + - name: sc_rsp_decrypted + data-type: ARRAY<BIGINT> + - name: proxy_action + data-type: STRING + - name: proxy_pinning_status + data-type: INT + - name: proxy_intercept_status + data-type: INT + - name: proxy_passthrough_reason + data-type: STRING + - name: proxy_client_side_latency_ms + data-type: INT + - name: proxy_server_side_latency_ms + data-type: INT + - name: proxy_client_side_version + data-type: STRING + - name: proxy_server_side_version + data-type: STRING + - name: proxy_cert_verify + data-type: INT + - name: proxy_intercept_error + data-type: STRING + - name: monitor_mirrored_pkts + data-type: INT + - name: monitor_mirrored_bytes + data-type: INT + ## Source + - name: client_ip + data-type: STRING + - name: client_port + data-type: INT + - name: client_os_desc + data-type: STRING + - name: client_geolocation + data-type: STRING + - name: client_country + data-type: STRING + - name: client_super_administrative_area + data-type: STRING + - name: client_administrative_area + data-type: STRING + - name: client_sub_administrative_area + data-type: STRING + - name: client_asn + data-type: BIGINT + - name: subscriber_id + data-type: STRING + - name: imei + data-type: STRING + - name: imsi + data-type: STRING + - name: phone_number + data-type: STRING + - name: apn + data-type: STRING + ## Destination + - name: server_ip + data-type: STRING + - name: server_port + data-type: INT + - name: server_os_desc + data-type: STRING + - name: server_geolocation + data-type: STRING + - name: server_country + data-type: STRING + - name: server_super_administrative_area + data-type: STRING + - name: server_administrative_area + data-type: STRING + - name: server_sub_administrative_area + data-type: STRING + - name: server_asn + data-type: BIGINT + - name: server_fqdn + data-type: STRING + - name: server_domain + data-type: STRING + - name: fqdn_category_list + data-type: ARRAY<INT> + ## Application + - name: app_transition + 
data-type: STRING + - name: app + data-type: STRING + - name: app_category + data-type: STRING + - name: app_debug_info + data-type: STRING + - name: app_content + data-type: STRING + - name: app_extra_info + data-type: STRING + ## Protocol + - name: ip_protocol + data-type: STRING + - name: decoded_path + data-type: STRING + ## Transmission + - name: sent_pkts + data-type: BIGINT + - name: received_pkts + data-type: BIGINT + - name: sent_bytes + data-type: BIGINT + - name: received_bytes + data-type: BIGINT + - name: tcp_c2s_ip_fragments + data-type: BIGINT + - name: tcp_s2c_ip_fragments + data-type: BIGINT + - name: tcp_c2s_lost_bytes + data-type: BIGINT + - name: tcp_s2c_lost_bytes + data-type: BIGINT + - name: tcp_c2s_o3_pkts + data-type: BIGINT + - name: tcp_s2c_o3_pkts + data-type: BIGINT + - name: tcp_c2s_rtx_pkts + data-type: BIGINT + - name: tcp_s2c_rtx_pkts + data-type: BIGINT + - name: tcp_c2s_rtx_bytes + data-type: BIGINT + - name: tcp_s2c_rtx_bytes + data-type: BIGINT + - name: tcp_rtt_ms + data-type: INT + - name: tcp_client_isn + data-type: BIGINT + - name: tcp_server_isn + data-type: BIGINT + ## Other + - name: packet_capture_file + data-type: STRING + - name: in_src_mac + data-type: STRING + - name: out_src_mac + data-type: STRING + - name: in_dest_mac + data-type: STRING + - name: out_dest_mac + data-type: STRING + - name: encapsulation + data-type: STRING + - name: dup_traffic_flag + data-type: INT + - name: tunnel_id_list + data-type: ARRAY<BIGINT> + - name: tunnel_endpoint_a_desc + data-type: STRING + - name: tunnel_endpoint_b_desc + data-type: STRING + ## SIP + - name: sip_call_id + data-type: STRING + - name: sip_originator_description + data-type: STRING + - name: sip_responder_description + data-type: STRING + - name: sip_user_agent + data-type: STRING + - name: sip_server + data-type: STRING + - name: sip_originator_sdp_connect_ip + data-type: STRING + - name: sip_originator_sdp_media_port + data-type: INT + - name: 
sip_originator_sdp_media_type + data-type: STRING + - name: sip_originator_sdp_content + data-type: STRING + - name: sip_responder_sdp_connect_ip + data-type: STRING + - name: sip_responder_sdp_media_port + data-type: INT + - name: sip_responder_sdp_media_type + data-type: STRING + - name: sip_responder_sdp_content + data-type: STRING + - name: sip_duration_s + data-type: INT + - name: sip_bye + data-type: STRING + ## RTP + - name: rtp_payload_type_c2s + data-type: INT + - name: rtp_payload_type_s2c + data-type: INT + - name: rtp_pcap_path + data-type: STRING + - name: rtp_originator_dir + data-type: INT + where: + - on: sip-double-way-records.ok + key-by: vsys_id, SORT_ADDRESS( sip_originator_sdp_connect_ip, sip_originator_sdp_media_port, sip_responder_sdp_connect_ip, sip_responder_sdp_media_port ) AS address + # SIP + process: + - SET sip FROM withColumns(recv_time to rtp_originator_dir) + - if: '@rtp.isNotNull && @rtp.cardinality > 0' + then: + - |- + FLAT OUTPUT ok FOR i IN rtp FROM + @i.$recv_time AS recv_time, + @i.$log_id AS log_id, + 'VoIP' AS decode_as, + @i.$session_id AS session_id, + @i.$start_timestamp_ms AS start_timestamp_ms, + @i.$end_timestamp_ms AS end_timestamp_ms, + @i.$duration_ms AS duration_ms, + @i.$tcp_handshake_latency_ms AS tcp_handshake_latency_ms, + @i.$ingestion_time AS ingestion_time, + @i.$processing_time AS processing_time, + @i.$insert_time AS insert_time, + @i.$device_id AS device_id, + @i.$out_link_id AS out_link_id, + @i.$in_link_id AS in_link_id, + @i.$device_tag AS device_tag, + @i.$data_center AS data_center, + @i.$device_group AS device_group, + @i.$sled_ip AS sled_ip, + @i.$address_type AS address_type, + @i.$direction AS direction, + @i.$vsys_id AS vsys_id, + @i.$t_vsys_id AS t_vsys_id, + @i.$flags AS flags, + @i.$flags_identify_info AS flags_identify_info, + + @i.$c2s_ttl AS c2s_ttl, + @i.$s2c_ttl AS s2c_ttl, + + @i.$client_ip AS client_ip, + @i.$client_port AS client_port, + @i.$client_os_desc AS client_os_desc, + 
@i.$client_geolocation AS client_geolocation, + @i.$client_country AS client_country, + @i.$client_super_administrative_area AS client_super_administrative_area, + @i.$client_administrative_area AS client_administrative_area, + @i.$client_sub_administrative_area AS client_sub_administrative_area, + @i.$client_asn AS client_asn, + + @i.$server_ip AS server_ip, + @i.$server_port AS server_port, + @i.$server_os_desc AS server_os_desc, + @i.$server_geolocation AS server_geolocation, + @i.$server_country AS server_country, + @i.$server_super_administrative_area AS server_super_administrative_area, + @i.$server_administrative_area AS server_administrative_area, + @i.$server_sub_administrative_area AS server_sub_administrative_area, + @i.$server_asn AS server_asn, + + @i.$ip_protocol AS ip_protocol, + + @i.$sent_pkts AS sent_pkts, + @i.$received_pkts AS received_pkts, + @i.$sent_bytes AS sent_bytes, + @i.$received_bytes AS received_bytes, + + @i.$sip_call_id AS sip_call_id, + @i.$sip_originator_description AS sip_originator_description, + @i.$sip_responder_description AS sip_responder_description, + @i.$sip_user_agent AS sip_user_agent, + @i.$sip_server AS sip_server, + @i.$sip_originator_sdp_connect_ip AS sip_originator_sdp_connect_ip, + @i.$sip_originator_sdp_media_port AS sip_originator_sdp_media_port, + @i.$sip_originator_sdp_media_type AS sip_originator_sdp_media_type, + @i.$sip_originator_sdp_content AS sip_originator_sdp_content, + @i.$sip_responder_sdp_connect_ip AS sip_responder_sdp_connect_ip, + @i.$sip_responder_sdp_media_port AS sip_responder_sdp_media_port, + @i.$sip_responder_sdp_media_type AS sip_responder_sdp_media_type, + @i.$sip_responder_sdp_content AS sip_responder_sdp_content, + @i.$sip_duration_s AS sip_duration_s, + @i.$sip_bye AS sip_bye, + @i.$rtp_payload_type_c2s AS rtp_payload_type_c2s, + @i.$rtp_payload_type_s2c AS rtp_payload_type_s2c, + @i.$rtp_pcap_path AS rtp_pcap_path, + ( @i.$client_ip == sip_originator_sdp_connect_ip).?(1, (@i.$client_ip 
== sip_responder_sdp_connect_ip).?(2, 0) ) AS rtp_originator_dir + - TRUNCATE rtp + # TODO USE EVENT + - SCHEDULING USING PROCESS TIME FOR NOW + 6 * 60 * 1000 + - on: rtp-records + key-by: vsys_id, SORT_ADDRESS( client_ip, client_port, server_ip, server_port ) AS address + process: + - APPEND rtp FROM withColumns(recv_time to rtp_originator_dir) + - if: '@sip.isNotNull' + then: + - |- + FLAT OUTPUT ok FOR i IN rtp FROM + @i.$recv_time AS recv_time, + @i.$log_id AS log_id, + 'VoIP' AS decode_as, + @i.$session_id AS session_id, + @i.$start_timestamp_ms AS start_timestamp_ms, + @i.$end_timestamp_ms AS end_timestamp_ms, + @i.$duration_ms AS duration_ms, + @i.$tcp_handshake_latency_ms AS tcp_handshake_latency_ms, + @i.$ingestion_time AS ingestion_time, + @i.$processing_time AS processing_time, + @i.$insert_time AS insert_time, + @i.$device_id AS device_id, + @i.$out_link_id AS out_link_id, + @i.$in_link_id AS in_link_id, + @i.$device_tag AS device_tag, + @i.$data_center AS data_center, + @i.$device_group AS device_group, + @i.$sled_ip AS sled_ip, + @i.$address_type AS address_type, + @i.$direction AS direction, + @i.$vsys_id AS vsys_id, + @i.$t_vsys_id AS t_vsys_id, + @i.$flags AS flags, + @i.$flags_identify_info AS flags_identify_info, + + @i.$c2s_ttl AS c2s_ttl, + @i.$s2c_ttl AS s2c_ttl, + + @i.$client_ip AS client_ip, + @i.$client_port AS client_port, + @i.$client_os_desc AS client_os_desc, + @i.$client_geolocation AS client_geolocation, + @i.$client_country AS client_country, + @i.$client_super_administrative_area AS client_super_administrative_area, + @i.$client_administrative_area AS client_administrative_area, + @i.$client_sub_administrative_area AS client_sub_administrative_area, + @i.$client_asn AS client_asn, + + @i.$server_ip AS server_ip, + @i.$server_port AS server_port, + @i.$server_os_desc AS server_os_desc, + @i.$server_geolocation AS server_geolocation, + @i.$server_country AS server_country, + @i.$server_super_administrative_area AS 
server_super_administrative_area, + @i.$server_administrative_area AS server_administrative_area, + @i.$server_sub_administrative_area AS server_sub_administrative_area, + @i.$server_asn AS server_asn, + + @i.$ip_protocol AS ip_protocol, + + @i.$sent_pkts AS sent_pkts, + @i.$received_pkts AS received_pkts, + @i.$sent_bytes AS sent_bytes, + @i.$received_bytes AS received_bytes, + + @i.$sip_call_id AS sip_call_id, + @i.$sip_originator_description AS sip_originator_description, + @i.$sip_responder_description AS sip_responder_description, + @i.$sip_user_agent AS sip_user_agent, + @i.$sip_server AS sip_server, + @i.$sip_originator_sdp_connect_ip AS sip_originator_sdp_connect_ip, + @i.$sip_originator_sdp_media_port AS sip_originator_sdp_media_port, + @i.$sip_originator_sdp_media_type AS sip_originator_sdp_media_type, + @i.$sip_originator_sdp_content AS sip_originator_sdp_content, + @i.$sip_responder_sdp_connect_ip AS sip_responder_sdp_connect_ip, + @i.$sip_responder_sdp_media_port AS sip_responder_sdp_media_port, + @i.$sip_responder_sdp_media_type AS sip_responder_sdp_media_type, + @i.$sip_responder_sdp_content AS sip_responder_sdp_content, + @i.$sip_duration_s AS sip_duration_s, + @i.$sip_bye AS sip_bye, + @i.$rtp_payload_type_c2s AS rtp_payload_type_c2s, + @i.$rtp_payload_type_s2c AS rtp_payload_type_s2c, + @i.$rtp_pcap_path AS rtp_pcap_path, + ( @i.$client_ip == sip_originator_sdp_connect_ip).?(1, (@i.$client_ip == sip_responder_sdp_connect_ip).?(2, 0) ) AS rtp_originator_dir + - SCHEDULING USING PROCESS TIME FOR NOW + 6 * 60 * 1000 + schedule: + - if: '@rtp.isNotNull && @rtp.cardinality > 0' + then: + - |- + FLAT OUTPUT fail FOR i IN rtp FROM @i.$recv_time AS recv_time, + @i.$log_id AS log_id, + @i.$decoded_as AS decoded_as, + @i.$session_id AS session_id, + @i.$start_timestamp_ms AS start_timestamp_ms, + @i.$start_timestamp AS start_timestamp, + @i.$end_timestamp_ms AS end_timestamp_ms, + @i.$duration_ms AS duration_ms, + @i.$tcp_handshake_latency_ms AS 
tcp_handshake_latency_ms, + @i.$ingestion_time AS ingestion_time, + @i.$processing_time AS processing_time, + @i.$insert_time AS insert_time, + @i.$device_id AS device_id, + @i.$out_link_id AS out_link_id, + @i.$in_link_id AS in_link_id, + @i.$device_tag AS device_tag, + @i.$data_center AS data_center, + @i.$device_group AS device_group, + @i.$sled_ip AS sled_ip, + @i.$address_type AS address_type, + @i.$direction AS direction, + @i.$vsys_id AS vsys_id, + @i.$t_vsys_id AS t_vsys_id, + @i.$flags AS flags, + @i.$flags_identify_info AS flags_identify_info, + @i.$c2s_ttl AS c2s_ttl, + @i.$s2c_ttl AS s2c_ttl, + @i.$security_rule_list AS security_rule_list, + @i.$security_action AS security_action, + @i.$monitor_rule_list AS monitor_rule_list, + @i.$shaping_rule_list AS shaping_rule_list, + @i.$proxy_rule_list AS proxy_rule_list, + @i.$statistics_rule_list AS statistics_rule_list, + @i.$sc_rule_list AS sc_rule_list, + @i.$sc_rsp_raw AS sc_rsp_raw, + @i.$sc_rsp_decrypted AS sc_rsp_decrypted, + @i.$proxy_action AS proxy_action, + @i.$proxy_pinning_status AS proxy_pinning_status, + @i.$proxy_intercept_status AS proxy_intercept_status, + @i.$proxy_passthrough_reason AS proxy_passthrough_reason, + @i.$proxy_client_side_latency_ms AS proxy_client_side_latency_ms, + @i.$proxy_server_side_latency_ms AS proxy_server_side_latency_ms, + @i.$proxy_client_side_version AS proxy_client_side_version, + @i.$proxy_server_side_version AS proxy_server_side_version, + @i.$proxy_cert_verify AS proxy_cert_verify, + @i.$proxy_intercept_error AS proxy_intercept_error, + @i.$monitor_mirrored_pkts AS monitor_mirrored_pkts, + @i.$monitor_mirrored_bytes AS monitor_mirrored_bytes, + @i.$client_ip AS client_ip, + @i.$client_port AS client_port, + @i.$client_os_desc AS client_os_desc, + @i.$client_geolocation AS client_geolocation, + @i.$client_country AS client_country, + @i.$client_super_administrative_area AS client_super_administrative_area, + @i.$client_administrative_area AS 
client_administrative_area, + @i.$client_sub_administrative_area AS client_sub_administrative_area, + @i.$client_asn AS client_asn, + @i.$subscriber_id AS subscriber_id, + @i.$imei AS imei, + @i.$imsi AS imsi, + @i.$phone_number AS phone_number, + @i.$apn AS apn, + @i.$server_ip AS server_ip, + @i.$server_port AS server_port, + @i.$server_os_desc AS server_os_desc, + @i.$server_geolocation AS server_geolocation, + @i.$server_country AS server_country, + @i.$server_super_administrative_area AS server_super_administrative_area, + @i.$server_administrative_area AS server_administrative_area, + @i.$server_sub_administrative_area AS server_sub_administrative_area, + @i.$server_asn AS server_asn, + @i.$server_fqdn AS server_fqdn, + @i.$server_domain AS server_domain, + @i.$fqdn_category_list AS fqdn_category_list, + @i.$app_transition AS app_transition, + @i.$app AS app, + @i.$app_category AS app_category, + @i.$app_debug_info AS app_debug_info, + @i.$app_content AS app_content, + @i.$app_extra_info AS app_extra_info, + @i.$ip_protocol AS ip_protocol, + @i.$decoded_path AS decoded_path, + @i.$sent_pkts AS sent_pkts, + @i.$received_pkts AS received_pkts, + @i.$sent_bytes AS sent_bytes, + @i.$received_bytes AS received_bytes, + @i.$tcp_c2s_ip_fragments AS tcp_c2s_ip_fragments, + @i.$tcp_s2c_ip_fragments AS tcp_s2c_ip_fragments, + @i.$tcp_c2s_lost_bytes AS tcp_c2s_lost_bytes, + @i.$tcp_s2c_lost_bytes AS tcp_s2c_lost_bytes, + @i.$tcp_c2s_o3_pkts AS tcp_c2s_o3_pkts, + @i.$tcp_s2c_o3_pkts AS tcp_s2c_o3_pkts, + @i.$tcp_c2s_rtx_pkts AS tcp_c2s_rtx_pkts, + @i.$tcp_s2c_rtx_pkts AS tcp_s2c_rtx_pkts, + @i.$tcp_c2s_rtx_bytes AS tcp_c2s_rtx_bytes, + @i.$tcp_s2c_rtx_bytes AS tcp_s2c_rtx_bytes, + @i.$tcp_rtt_ms AS tcp_rtt_ms, + @i.$tcp_client_isn AS tcp_client_isn, + @i.$tcp_server_isn AS tcp_server_isn, + @i.$packet_capture_file AS packet_capture_file, + @i.$in_src_mac AS in_src_mac, + @i.$out_src_mac AS out_src_mac, + @i.$in_dest_mac AS in_dest_mac, + @i.$out_dest_mac AS out_dest_mac, 
+ @i.$encapsulation AS encapsulation, + @i.$dup_traffic_flag AS dup_traffic_flag, + @i.$tunnel_id_list AS tunnel_id_list, + @i.$tunnel_endpoint_a_desc AS tunnel_endpoint_a_desc, + @i.$tunnel_endpoint_b_desc AS tunnel_endpoint_b_desc, + @i.$sip_call_id AS sip_call_id, + @i.$sip_originator_description AS sip_originator_description, + @i.$sip_responder_description AS sip_responder_description, + @i.$sip_user_agent AS sip_user_agent, + @i.$sip_server AS sip_server, + @i.$sip_originator_sdp_connect_ip AS sip_originator_sdp_connect_ip, + @i.$sip_originator_sdp_media_port AS sip_originator_sdp_media_port, + @i.$sip_originator_sdp_media_type AS sip_originator_sdp_media_type, + @i.$sip_originator_sdp_content AS sip_originator_sdp_content, + @i.$sip_responder_sdp_connect_ip AS sip_responder_sdp_connect_ip, + @i.$sip_responder_sdp_media_port AS sip_responder_sdp_media_port, + @i.$sip_responder_sdp_media_type AS sip_responder_sdp_media_type, + @i.$sip_responder_sdp_content AS sip_responder_sdp_content, + @i.$sip_duration_s AS sip_duration_s, + @i.$sip_bye AS sip_bye, + @i.$rtp_payload_type_c2s AS rtp_payload_type_c2s, + @i.$rtp_payload_type_s2c AS rtp_payload_type_s2c, + @i.$rtp_pcap_path AS rtp_pcap_path, + @i.$rtp_originator_dir AS rtp_originator_dir + - TRUNCATE rtp + - if: '@sip.isNotNull' + then: + - |- + OUTPUT fail FROM @sip.$recv_time AS recv_time, + @sip.$log_id AS log_id, + @sip.$decoded_as AS decoded_as, + @sip.$session_id AS session_id, + @sip.$start_timestamp_ms AS start_timestamp_ms, + @sip.$start_timestamp AS start_timestamp, + @sip.$end_timestamp_ms AS end_timestamp_ms, + @sip.$duration_ms AS duration_ms, + @sip.$tcp_handshake_latency_ms AS tcp_handshake_latency_ms, + @sip.$ingestion_time AS ingestion_time, + @sip.$processing_time AS processing_time, + @sip.$insert_time AS insert_time, + @sip.$device_id AS device_id, + @sip.$out_link_id AS out_link_id, + @sip.$in_link_id AS in_link_id, + @sip.$device_tag AS device_tag, + @sip.$data_center AS data_center, + 
@sip.$device_group AS device_group, + @sip.$sled_ip AS sled_ip, + @sip.$address_type AS address_type, + @sip.$direction AS direction, + @sip.$vsys_id AS vsys_id, + @sip.$t_vsys_id AS t_vsys_id, + @sip.$flags AS flags, + @sip.$flags_identify_info AS flags_identify_info, + @sip.$c2s_ttl AS c2s_ttl, + @sip.$s2c_ttl AS s2c_ttl, + @sip.$security_rule_list AS security_rule_list, + @sip.$security_action AS security_action, + @sip.$monitor_rule_list AS monitor_rule_list, + @sip.$shaping_rule_list AS shaping_rule_list, + @sip.$proxy_rule_list AS proxy_rule_list, + @sip.$statistics_rule_list AS statistics_rule_list, + @sip.$sc_rule_list AS sc_rule_list, + @sip.$sc_rsp_raw AS sc_rsp_raw, + @sip.$sc_rsp_decrypted AS sc_rsp_decrypted, + @sip.$proxy_action AS proxy_action, + @sip.$proxy_pinning_status AS proxy_pinning_status, + @sip.$proxy_intercept_status AS proxy_intercept_status, + @sip.$proxy_passthrough_reason AS proxy_passthrough_reason, + @sip.$proxy_client_side_latency_ms AS proxy_client_side_latency_ms, + @sip.$proxy_server_side_latency_ms AS proxy_server_side_latency_ms, + @sip.$proxy_client_side_version AS proxy_client_side_version, + @sip.$proxy_server_side_version AS proxy_server_side_version, + @sip.$proxy_cert_verify AS proxy_cert_verify, + @sip.$proxy_intercept_error AS proxy_intercept_error, + @sip.$monitor_mirrored_pkts AS monitor_mirrored_pkts, + @sip.$monitor_mirrored_bytes AS monitor_mirrored_bytes, + @sip.$client_ip AS client_ip, + @sip.$client_port AS client_port, + @sip.$client_os_desc AS client_os_desc, + @sip.$client_geolocation AS client_geolocation, + @sip.$client_country AS client_country, + @sip.$client_super_administrative_area AS client_super_administrative_area, + @sip.$client_administrative_area AS client_administrative_area, + @sip.$client_sub_administrative_area AS client_sub_administrative_area, + @sip.$client_asn AS client_asn, + @sip.$subscriber_id AS subscriber_id, + @sip.$imei AS imei, + @sip.$imsi AS imsi, + @sip.$phone_number AS 
phone_number, + @sip.$apn AS apn, + @sip.$server_ip AS server_ip, + @sip.$server_port AS server_port, + @sip.$server_os_desc AS server_os_desc, + @sip.$server_geolocation AS server_geolocation, + @sip.$server_country AS server_country, + @sip.$server_super_administrative_area AS server_super_administrative_area, + @sip.$server_administrative_area AS server_administrative_area, + @sip.$server_sub_administrative_area AS server_sub_administrative_area, + @sip.$server_asn AS server_asn, + @sip.$server_fqdn AS server_fqdn, + @sip.$server_domain AS server_domain, + @sip.$fqdn_category_list AS fqdn_category_list, + @sip.$app_transition AS app_transition, + @sip.$app AS app, + @sip.$app_category AS app_category, + @sip.$app_debug_info AS app_debug_info, + @sip.$app_content AS app_content, + @sip.$app_extra_info AS app_extra_info, + @sip.$ip_protocol AS ip_protocol, + @sip.$decoded_path AS decoded_path, + @sip.$sent_pkts AS sent_pkts, + @sip.$received_pkts AS received_pkts, + @sip.$sent_bytes AS sent_bytes, + @sip.$received_bytes AS received_bytes, + @sip.$tcp_c2s_ip_fragments AS tcp_c2s_ip_fragments, + @sip.$tcp_s2c_ip_fragments AS tcp_s2c_ip_fragments, + @sip.$tcp_c2s_lost_bytes AS tcp_c2s_lost_bytes, + @sip.$tcp_s2c_lost_bytes AS tcp_s2c_lost_bytes, + @sip.$tcp_c2s_o3_pkts AS tcp_c2s_o3_pkts, + @sip.$tcp_s2c_o3_pkts AS tcp_s2c_o3_pkts, + @sip.$tcp_c2s_rtx_pkts AS tcp_c2s_rtx_pkts, + @sip.$tcp_s2c_rtx_pkts AS tcp_s2c_rtx_pkts, + @sip.$tcp_c2s_rtx_bytes AS tcp_c2s_rtx_bytes, + @sip.$tcp_s2c_rtx_bytes AS tcp_s2c_rtx_bytes, + @sip.$tcp_rtt_ms AS tcp_rtt_ms, + @sip.$tcp_client_isn AS tcp_client_isn, + @sip.$tcp_server_isn AS tcp_server_isn, + @sip.$packet_capture_file AS packet_capture_file, + @sip.$in_src_mac AS in_src_mac, + @sip.$out_src_mac AS out_src_mac, + @sip.$in_dest_mac AS in_dest_mac, + @sip.$out_dest_mac AS out_dest_mac, + @sip.$encapsulation AS encapsulation, + @sip.$dup_traffic_flag AS dup_traffic_flag, + @sip.$tunnel_id_list AS tunnel_id_list, + 
@sip.$tunnel_endpoint_a_desc AS tunnel_endpoint_a_desc, + @sip.$tunnel_endpoint_b_desc AS tunnel_endpoint_b_desc, + @sip.$sip_call_id AS sip_call_id, + @sip.$sip_originator_description AS sip_originator_description, + @sip.$sip_responder_description AS sip_responder_description, + @sip.$sip_user_agent AS sip_user_agent, + @sip.$sip_server AS sip_server, + @sip.$sip_originator_sdp_connect_ip AS sip_originator_sdp_connect_ip, + @sip.$sip_originator_sdp_media_port AS sip_originator_sdp_media_port, + @sip.$sip_originator_sdp_media_type AS sip_originator_sdp_media_type, + @sip.$sip_originator_sdp_content AS sip_originator_sdp_content, + @sip.$sip_responder_sdp_connect_ip AS sip_responder_sdp_connect_ip, + @sip.$sip_responder_sdp_media_port AS sip_responder_sdp_media_port, + @sip.$sip_responder_sdp_media_type AS sip_responder_sdp_media_type, + @sip.$sip_responder_sdp_content AS sip_responder_sdp_content, + @sip.$sip_duration_s AS sip_duration_s, + @sip.$sip_bye AS sip_bye, + @sip.$rtp_payload_type_c2s AS rtp_payload_type_c2s, + @sip.$rtp_payload_type_s2c AS rtp_payload_type_s2c, + @sip.$rtp_pcap_path AS rtp_pcap_path, + @sip.$rtp_originator_dir AS rtp_originator_dir + - TRUNCATE sip + - name: cannot-fusion-records + category: UNION + on: + - sip-double-way-records.fail # 没有双向关联成功的 SIP 单向流日志 + - voip-fusion.fail # 没有关联上 SIP 的 RTP 日志 & 没关联上 RTP 的 DOUBLE SIP 日志 + - name: errors-records + category: UNION + on: + - error1-records + - error2-records + - error3-records + - error4-records
\ No newline at end of file diff --git a/src/main/resources/log4j2.properties b/src/main/resources/log4j2.properties index e7c4ef9..32c696e 100644 --- a/src/main/resources/log4j2.properties +++ b/src/main/resources/log4j2.properties @@ -16,7 +16,7 @@ # limitations under the License. ################################################################################ -rootLogger.level = WARN +rootLogger.level = INFO rootLogger.appenderRef.console.ref = ConsoleAppender appender.console.name = ConsoleAppender diff --git a/src/site/markdown/changelogs.md b/src/site/markdown/changelogs.md new file mode 100644 index 0000000..aab7b60 --- /dev/null +++ b/src/site/markdown/changelogs.md @@ -0,0 +1,5 @@ +## Changelog + +### 2.0 + +- [GAL-602](https://jira.geedge.net/browse/GAL-602) 基于 Easy Stream 框架的配置化改造。
\ No newline at end of file diff --git a/src/site/markdown/deploy.md b/src/site/markdown/deploy.md new file mode 100644 index 0000000..5114e9e --- /dev/null +++ b/src/site/markdown/deploy.md @@ -0,0 +1,13 @@ +## Deploy + +- 准备 JDK ${java.version} 的环境 + +- 准备 Flink ${flink.version} 的环境 + +- [下载](./download.html) 对应版本 UDF 依赖 Jar + +- [下载](./download.html) 对应版本 Job 配置 (一个 yml 文件) + +- 执行命令 `flink run -Dflink.rest.bind-port=8081 -c com.geedgenetworks.flink.easy.core.Runner path/to/sip-rtp-correlation-<version>.jar job.yml` + +- 您将在控制台看到启动日志,同时您可以在 `http://<your-host>:8081` 看到任务 UI。
\ No newline at end of file diff --git a/src/site/markdown/download.md b/src/site/markdown/download.md new file mode 100644 index 0000000..72811df --- /dev/null +++ b/src/site/markdown/download.md @@ -0,0 +1,8 @@ +## Download + +### Easy Stream ${project.version} + +| UDF Jar | Job | +|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| --------------- | +| [JAR](${project.distributionManagement.repository.url}/com/geedgenetworks/application/sip-rtp-correlation/${project.version}/${project.artifactId}-${project.version}.jar) ( [MD5](${project.distributionManagement.repository.url}/com/geedgenetworks/application/sip-rtp-correlation/${project.version}/${project.artifactId}-${project.version}.jar.md5) [SHA1](${project.distributionManagement.repository.url}/com/geedgenetworks/application/sip-rtp-correlation/${project.version}/${project.artifactId}-${project.version}.jar.sha1) ) | [YML](${project.distributionManagement.repository.url}/com/geedgenetworks/application/sip-rtp-correlation/${project.version}/${project.artifactId}-${project.version}.tar.gz) ( [MD5](${project.distributionManagement.repository.url}/com/geedgenetworks/application/sip-rtp-correlation/${project.version}/${project.artifactId}-${project.version}.tar.gz.md5) [SHA1](${project.distributionManagement.repository.url}/com/geedgenetworks/application/sip-rtp-correlation/${project.version}/${project.artifactId}-${project.version}.tar.gz.sha1) ) | + diff --git a/src/site/markdown/index.md b/src/site/markdown/index.md new file mode 100644 index 0000000..2bf785c --- /dev/null +++ b/src/site/markdown/index.md @@ -0,0 +1,10 @@ +## SIP RTP Correlation + +SIP RTP Correlation 项目是一个使用 Apache Flink 实现的实时数据处理项目,旨在从 Kafka 中读取 SIP(Session Initiation Protocol)和 RTP(Real-time Transport Protocol)数据,将它们融合成完整的 VoIP(Voice over Internet Protocol)通话数据。 + +SIP RTP Correlation 
项目可以用于实时监控和分析 VoIP 通话数据,提取关键指标,以及进行实时报警和诊断。 + +<br/> + + +You can download the latest release from [Job Yml](./jobs/job.yml), and you can read the changelog in [CHANGELOG.md](./changelogs.html).
\ No newline at end of file diff --git a/src/site/resources/css/site.css b/src/site/resources/css/site.css new file mode 100644 index 0000000..11a3d99 --- /dev/null +++ b/src/site/resources/css/site.css @@ -0,0 +1,13 @@ +#banner { + height: 108px; + background: none; +} + +#bannerLeft img { + margin-left: 18px; + margin-top: 10px; +} + +div.well { + display: none; +}
\ No newline at end of file diff --git a/src/site/resources/images/logo.png b/src/site/resources/images/logo.png Binary files differnew file mode 100644 index 0000000..da2277a --- /dev/null +++ b/src/site/resources/images/logo.png diff --git a/src/site/site.xml b/src/site/site.xml new file mode 100644 index 0000000..497fe49 --- /dev/null +++ b/src/site/site.xml @@ -0,0 +1,56 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<project name="SIP RTP Correlate"> + <bannerLeft> + <name>Easy Stream</name> + <src>images/logo.png</src> + <href>#</href> + </bannerLeft> + + <publishDate position="right"/> + <version position="right"/> + + <skin> + <groupId>org.apache.maven.skins</groupId> + <artifactId>maven-fluido-skin</artifactId> + <version>1.10.0</version> + </skin> + + <custom> + <fluidoSkin> + <sourceLineNumbersEnabled>true</sourceLineNumbersEnabled> + </fluidoSkin> + </custom> + + <body> + <breadcrumbs position="left"> + <item name="Galaxy" href="#"/> + <item name="Platform" href="#"/> + <item name="Easy Stream" href="#"/> + <item name="Application" href="#"/> + </breadcrumbs> + + <menu name="OVERVIEW" inherit="top"> + <item name="Introduction" href="index.html"/> + <item name="Deploy" href="deploy.html"/> + <item name="Download" href="download.html"/> + </menu> + + <footer> + <![CDATA[ Copyright ©2022 <a href="#">Galaxy Platform</a>. All rights reserved.]]> + </footer> + </body> +</project> diff --git a/src/test/java/com/geedgenetworks/flink/easy/application/ApplicationTest.java b/src/test/java/com/geedgenetworks/flink/easy/application/ApplicationTest.java new file mode 100644 index 0000000..0c48a4d --- /dev/null +++ b/src/test/java/com/geedgenetworks/flink/easy/application/ApplicationTest.java @@ -0,0 +1,33 @@ +package com.geedgenetworks.flink.easy.application; + +import com.geedgenetworks.flink.easy.core.Runners; +import org.junit.jupiter.api.Test; + +public class ApplicationTest { + + static { + System.setProperty("easy.execute.mode", "validate"); + System.setProperty("flink.rest.bind-port", "8081"); +// System.setProperty("flink.rest.flamegraph.enabled", "true"); + System.setProperty("flink.heartbeat.timeout", "1800000"); + } + + public static String discoverConfiguration(final String name) throws Exception { + var path = String.format("/jobs/%s.yml", name); + var resource = ApplicationTest.class.getResource(path); + if (resource == null) { + // maven + resource = 
ApplicationTest.class.getResource(String.format("../classes/%s", path)); + } + if (resource == null) { + throw new IllegalArgumentException( + String.format("Not found job '%s' in path [%s].", name, path)); + } + return resource.getPath(); + } + + @Test + public void testJob() throws Exception { + Runners.run(discoverConfiguration("job")); + } +} diff --git a/src/test/java/com/zdjizhi/flink/voip/conf/FusionConfigurationTest.java b/src/test/java/com/zdjizhi/flink/voip/conf/FusionConfigurationTest.java deleted file mode 100644 index b45c9a9..0000000 --- a/src/test/java/com/zdjizhi/flink/voip/conf/FusionConfigurationTest.java +++ /dev/null @@ -1,67 +0,0 @@ -package com.zdjizhi.flink.voip.conf; - -import org.apache.flink.configuration.Configuration; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import java.util.Properties; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; - -public class FusionConfigurationTest { - - private FusionConfiguration fusionConfiguration; - - @BeforeEach - public void setUp() { - final Configuration config; - config = new Configuration(); - config.setString("prefix_key1", "value1"); - config.setString("prefix_key2", "value2"); - config.setString("other_key", "other_value"); - - fusionConfiguration = new FusionConfiguration(config); - } - - @Test - public void testGetPropertiesWithValidPrefix() { - String prefix = "prefix_"; - Properties properties = fusionConfiguration.getProperties(prefix); - - assertEquals(2, properties.size()); - assertEquals("value1", properties.getProperty("key1")); - assertEquals("value2", properties.getProperty("key2")); - } - - @Test - public void testGetPropertiesWithInvalidPrefix() { - String prefix = "invalid_"; - Properties properties = fusionConfiguration.getProperties(prefix); - - assertTrue(properties.isEmpty()); - } - - @Test - public void testGetPropertiesWithEmptyPrefix() { - String prefix = ""; - 
Properties properties = fusionConfiguration.getProperties(prefix); - - assertEquals(3, properties.size()); - assertEquals("value1", properties.getProperty("prefix_key1")); - assertEquals("value2", properties.getProperty("prefix_key2")); - assertEquals("other_value", properties.getProperty("other_key")); - } - - @Test - public void testGetPropertiesWithNullPrefix() { - // Null prefix should be treated as an empty prefix - String prefix = null; - Properties properties = fusionConfiguration.getProperties(prefix); - - assertEquals(3, properties.size()); - assertEquals("value1", properties.getProperty("prefix_key1")); - assertEquals("value2", properties.getProperty("prefix_key2")); - assertEquals("other_value", properties.getProperty("other_key")); - } -} diff --git a/src/test/java/com/zdjizhi/flink/voip/data/Generator.java b/src/test/java/com/zdjizhi/flink/voip/data/Generator.java deleted file mode 100644 index b46c714..0000000 --- a/src/test/java/com/zdjizhi/flink/voip/data/Generator.java +++ /dev/null @@ -1,98 +0,0 @@ -package com.zdjizhi.flink.voip.data; - -import com.github.javafaker.Faker; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; - -import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.atomic.AtomicReference; - -/** - * The abstract class Generator<T> serves as the base class for implementing record generators. - * It provides common functionalities for generating random data and controlling the generation ratio. - * - * @param <T> The type of records to be generated. - * @author chaoc - * @since 1.0 - */ -public abstract class Generator<T> { - - protected final ThreadLocalRandom random; - /** - * The ratio of generated records. The total number of records generated will be 100 / ratio. - */ - protected final int ratio; - - private final Faker faker; - - protected final AtomicReference<T> state; - - protected final ObjectMapper mapper; - - /** - * Creates a new Generator with the given ratio. 
- * - * @param ratio The ratio of generated records. The total number of records generated will be 100 / ratio. - */ - public Generator(final int ratio) { - this.ratio = ratio; - this.faker = new Faker(); - this.random = ThreadLocalRandom.current(); - this.state = new AtomicReference<>(); - this.mapper = new ObjectMapper(); - } - - /** - * Generates the next record based on the specified ratio and the current state. - * It randomly selects whether to generate a new record or return the last generated record. - * - * @return The next generated record of type T. - */ - public abstract T next(); - - /** - * Generates a new record of type T. - * - * @return The newly generated record of type T. - */ - protected abstract T generate(); - - /** - * Performs post-processing on the generated record of type T. - * Subclasses can override this method to modify the generated record before returning it. - * - * @param v The generated record of type T. - * @return The post-processed record of type T. - */ - protected abstract T afterState(T v); - - /** - * Generates a random IP address (either IPv4 or IPv6) . - * - * @return A randomly generated IP address as a string. - */ - public final String nextIp() { - if (random.nextBoolean()) { - return faker.internet().ipV4Address(); - } - return faker.internet().ipV6Address(); - } - - /** - * Generates a random ID number. - * - * @return A randomly generated ID number as a string. - */ - public final String nextId() { - return faker.idNumber().valid(); - } - - /** - * Generates a random port number within the range of 0 to 65535 (inclusive). - * - * @return A randomly generated port number as an integer. 
- */ - public final int nextPort() { - return random.nextInt(65535); - } -} - diff --git a/src/test/java/com/zdjizhi/flink/voip/data/RTPGenerator.java b/src/test/java/com/zdjizhi/flink/voip/data/RTPGenerator.java deleted file mode 100644 index d997bad..0000000 --- a/src/test/java/com/zdjizhi/flink/voip/data/RTPGenerator.java +++ /dev/null @@ -1,111 +0,0 @@ -package com.zdjizhi.flink.voip.data; - -import com.zdjizhi.flink.voip.records.Record; -import com.zdjizhi.flink.voip.records.SIPRecord; -import com.zdjizhi.flink.voip.records.SchemaType; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; - -/** - * RTPGenerator extends Generator<ObjectNode> and is responsible for generating RTP records. - * It generates random RTP records with specific properties. - * - * @author chaoc - * @since 1.0 - */ -public class RTPGenerator extends Generator<ObjectNode> { - private final SIPGenerator sipGenerator; - - public RTPGenerator(int ratio, SIPGenerator sipGenerator) { - super(ratio); - this.sipGenerator = sipGenerator; - } - - @Override - public ObjectNode next() { - int i = random.nextInt(100); - if (i < ratio) { - final ObjectNode node = sipGenerator.state.get(); - if (null != node) { - final ObjectNode obj = generate(); - obj.set(Record.F_COMMON_CLIENT_IP, node.get(SIPRecord.F_ORIGINATOR_SDP_CONNECT_IP)); - obj.set(Record.F_COMMON_CLIENT_PORT, node.get(SIPRecord.F_ORIGINATOR_SDP_MEDIA_PORT)); - obj.set(Record.F_COMMON_SERVER_IP, node.get(SIPRecord.F_RESPONDER_SDP_CONNECT_IP)); - obj.set(Record.F_COMMON_SERVER_PORT, node.get(SIPRecord.F_RESPONDER_SDP_MEDIA_PORT)); - return obj; - } - } - return generate(); - } - - @Override - protected ObjectNode generate() { - final String json = "{\n" + - " \"common_address_list\": \"42924-36682-172.17.201.16-172.17.200.50\",\n" + - " \"common_address_type\": 4,\n" + - " \"common_app_full_path\": \"rtp\",\n" + - " \"common_app_label\": \"rtp\",\n" + - " \"common_c2s_byte_num\": 0,\n" + - " 
\"common_c2s_pkt_num\": 0,\n" + - " \"common_client_ip\": \"172.17.201.16\",\n" + - " \"common_client_port\": 42924,\n" + - " \"common_con_duration_ms\": 1086,\n" + - " \"common_device_id\": \"unknown\",\n" + - " \"common_device_tag\": \"{\\\"tags\\\":[{\\\"tag\\\":\\\"device_group\\\",\\\"value\\\":\\\"group-xxg-tsgxSfcOs\\\"}]}\",\n" + - " \"common_direction\": 73,\n" + - " \"common_end_time\": 1689295970,\n" + - " \"common_flags\": 16401,\n" + - " \"common_flags_identify_info\": \"{\\\"Asymmetric\\\":5,\\\"Server is Local\\\":1,\\\"S2C\\\":1}\",\n" + - " \"common_l4_protocol\": \"IPv4_UDP\",\n" + - " \"common_out_dest_mac\": \"02:fc:08:dc:92:d7\",\n" + - " \"common_out_src_mac\": \"02:fc:08:dc:91:c3\",\n" + - " \"common_protocol_label\": \"ETHERNET.IPv4.UDP\",\n" + - " \"common_s2c_byte_num\": 7570,\n" + - " \"common_s2c_pkt_num\": 5,\n" + - " \"common_schema_type\": \"RTP\",\n" + - " \"common_server_ip\": \"172.17.200.50\",\n" + - " \"common_server_port\": 36682,\n" + - " \"common_sessions\": 1,\n" + - " \"common_sled_ip\": \"192.168.42.54\",\n" + - " \"common_start_time\": 1689294629,\n" + - " \"common_stream_dir\": 3,\n" + - " \"common_stream_trace_id\": \"290484792956466709\",\n" + - " \"common_t_vsys_id\": 24,\n" + - " \"common_vsys_id\": 24,\n" + - " \"raw_log_status\": \"CLOSE\",\n" + - " \"rtp_payload_type_c2s\": 0,\n" + - " \"rtp_payload_type_s2c\": 0,\n" + - " \"rtp_pcap_path\": \"http://192.168.44.67:9098/hos/rtp_hos_bucket/rtp_172.17.200.50_172.17.201.16_27988_55806_1689294629.pcap\"\n" + - "}"; - - ObjectNode obj; - try { - obj = mapper.readValue(json, ObjectNode.class); - } catch (Exception e){ - obj = mapper.createObjectNode(); - } - final SIPRecord record = new SIPRecord(obj); - - record.setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.RTP.getValue()); - record.setString(SIPRecord.F_CALL_ID, nextId()); - record.setInt(SIPRecord.F_COMMON_STREAM_DIR, random.nextInt(3) + 1); - - record.setString(Record.F_COMMON_SERVER_IP, nextIp()); - 
record.setInt(Record.F_COMMON_SERVER_PORT, nextPort()); - record.setString(Record.F_COMMON_CLIENT_IP, nextIp()); - record.setInt(Record.F_COMMON_CLIENT_PORT, nextPort()); - return obj; - } - - @Override - protected ObjectNode afterState(ObjectNode v) { - final Record record = new Record(v); - switch (record.getStreamDir()) { - case DOUBLE: - record.setInt(Record.F_COMMON_STREAM_DIR, random.nextInt(2) + 1); - break; - case S2C: - default: - } - return v; - } -} diff --git a/src/test/java/com/zdjizhi/flink/voip/data/SIPGenerator.java b/src/test/java/com/zdjizhi/flink/voip/data/SIPGenerator.java deleted file mode 100644 index b86ea99..0000000 --- a/src/test/java/com/zdjizhi/flink/voip/data/SIPGenerator.java +++ /dev/null @@ -1,138 +0,0 @@ -package com.zdjizhi.flink.voip.data; - -import com.zdjizhi.flink.voip.records.Record; -import com.zdjizhi.flink.voip.records.SIPRecord; -import com.zdjizhi.flink.voip.records.SchemaType; -import com.zdjizhi.flink.voip.records.StreamDir; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; - -/** - * SIPGenerator extends Generator<ObjectNode> and is responsible for generating SIP (Session Initiation Protocol) records. - * It generates random SIP records with specific properties such as call ID, stream dir, server IP, server port, - * client IP, client port... information for both originator and responder. - * - * @author chaoc - * @since 1.0 - */ -public class SIPGenerator extends Generator<ObjectNode> { - public SIPGenerator(final int ratio) { - super(ratio); - } - - /** - * Creates a new SIPGenerator with the default ratio of 40. 
- */ - public SIPGenerator() { - this(40); - } - - @Override - public final ObjectNode next() { - int i = random.nextInt(100); - if (i < ratio && state.get() != null) { - ObjectNode t = afterState(state.get()); - state.set(null); - return t; - } else { - return state.updateAndGet(t -> generate()); - } - } - - - @Override - protected ObjectNode generate() { - final String json = "{\n" + - " \"common_schema_type\": \"SIP\",\n" + - " \"common_sessions\": 1,\n" + - " \"sip_call_id\": \"[email protected]\",\n" + - " \"sip_originator_description\": \"<sip:[email protected]>\",\n" + - " \"sip_responder_description\": \"sip:[email protected]\",\n" + - " \"sip_originator_sdp_content\": \"v=0\\r\\no=SIP-UA 3395000 3397200 IN IP4 172.17.200.50\\r\\ns=SIP Call\\r\\nc=IN IP4 172.17.200.50\\r\\nt=0 0\\r\\nm=audio 36682 RTP/AVP 0\\r\\na=rtpmap:0 PCMU/8000\\r\\n\",\n" + - " \"sip_originator_sdp_connect_ip\": \"172.17.200.50\",\n" + - " \"sip_originator_sdp_media_port\": 36682,\n" + - " \"sip_originator_sdp_media_type\": \"0 PCMU/8000\",\n" + - " \"common_first_ttl\": 128,\n" + - " \"common_c2s_ipfrag_num\": 0,\n" + - " \"common_s2c_ipfrag_num\": 0,\n" + - " \"common_c2s_tcp_unorder_num\": 0,\n" + - " \"common_s2c_tcp_unorder_num\": 0,\n" + - " \"common_c2s_tcp_lostlen\": 0,\n" + - " \"common_s2c_tcp_lostlen\": 0,\n" + - " \"common_c2s_pkt_retrans\": 2,\n" + - " \"common_s2c_pkt_retrans\": 0,\n" + - " \"common_c2s_byte_retrans\": 1100,\n" + - " \"common_s2c_byte_retrans\": 0,\n" + - " \"common_direction\": 69,\n" + - " \"common_app_full_path\": \"sip\",\n" + - " \"common_app_label\": \"sip\",\n" + - " \"common_tcp_client_isn\": 3004427198,\n" + - " \"common_server_ip\": \"172.17.201.16\",\n" + - " \"common_client_ip\": \"172.17.200.49\",\n" + - " \"common_server_port\": 5060,\n" + - " \"common_client_port\": 6948,\n" + - " \"common_stream_dir\": 1,\n" + - " \"common_address_type\": 4,\n" + - " \"common_address_list\": \"6948-5060-172.17.200.50-172.17.201.16\",\n" + - " 
\"common_start_time\": 1689295655,\n" + - " \"common_end_time\": 1689295670,\n" + - " \"common_con_duration_ms\": 41467,\n" + - " \"common_s2c_pkt_num\": 0,\n" + - " \"common_s2c_byte_num\": 0,\n" + - " \"common_c2s_pkt_num\": 6,\n" + - " \"common_c2s_byte_num\": 1834,\n" + - " \"common_establish_latency_ms\": 249,\n" + - " \"common_out_src_mac\": \"02:fc:08:dc:91:c3\",\n" + - " \"common_out_dest_mac\": \"02:fc:08:dc:92:d7\",\n" + - " \"common_flags\": 8201,\n" + - " \"common_flags_identify_info\": \"{\\\"Asymmetric\\\":6,\\\"Client is Local\\\":1,\\\"C2S\\\":1}\",\n" + - " \"common_protocol_label\": \"ETHERNET.IPv4.TCP\",\n" + - " \"common_stream_trace_id\": \"290502385134123507\",\n" + - " \"common_l4_protocol\": \"IPv4_TCP\",\n" + - " \"common_sled_ip\": \"192.168.42.54\",\n" + - " \"common_device_id\": \"unknown\",\n" + - " \"common_device_tag\": \"{\\\"tags\\\":[{\\\"tag\\\":\\\"device_group\\\",\\\"value\\\":\\\"group-xxg-tsgxSfcOs\\\"}]}\",\n" + - " \"common_t_vsys_id\": 24,\n" + - " \"common_vsys_id\": 24\n" + - "}"; - - ObjectNode obj; - try { - obj = mapper.readValue(json, ObjectNode.class); - } catch (Exception e) { - obj = mapper.createObjectNode(); - } - - final SIPRecord record = new SIPRecord(obj); - - record.setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.SIP.getValue()); - - record.setString(SIPRecord.F_CALL_ID, nextId() + "@spirent.com"); - record.setInt(SIPRecord.F_COMMON_STREAM_DIR, (random.nextBoolean() ? 
StreamDir.S2C : StreamDir.C2S).getValue()); - - record.setString(Record.F_COMMON_SERVER_IP, nextIp()); - record.setInt(Record.F_COMMON_SERVER_PORT, nextPort()); - record.setString(Record.F_COMMON_CLIENT_IP, nextIp()); - record.setInt(Record.F_COMMON_CLIENT_PORT, nextPort()); - - record.setString(SIPRecord.F_ORIGINATOR_SDP_CONNECT_IP, nextIp()); - record.setInt(SIPRecord.F_ORIGINATOR_SDP_MEDIA_PORT, nextPort()); - record.setString(SIPRecord.F_RESPONDER_SDP_CONNECT_IP, nextIp()); - record.setInt(SIPRecord.F_RESPONDER_SDP_MEDIA_PORT, nextPort()); - return obj; - } - - @Override - protected ObjectNode afterState(ObjectNode v) { - final Record record = new Record(v); - switch (record.getStreamDir()) { - case C2S: - record.setInt(Record.F_COMMON_STREAM_DIR, StreamDir.S2C.getValue()); - break; - case S2C: - record.setInt(Record.F_COMMON_STREAM_DIR, StreamDir.C2S.getValue()); - break; - default: - } - return v; - } -} diff --git a/src/test/java/com/zdjizhi/flink/voip/functions/AddressTest.java b/src/test/java/com/zdjizhi/flink/voip/functions/AddressTest.java deleted file mode 100644 index 7eb5544..0000000 --- a/src/test/java/com/zdjizhi/flink/voip/functions/AddressTest.java +++ /dev/null @@ -1,48 +0,0 @@ -package com.zdjizhi.flink.voip.functions; - -import org.apache.flink.api.java.tuple.Tuple2; -import org.junit.jupiter.api.Test; - -import static org.junit.jupiter.api.Assertions.assertEquals; - -public class AddressTest { - - @Test - public void testAddressOfWithDifferentPortNumbers() { - Tuple2<String, Integer> a1 = Tuple2.of("192.168.1.10", 8080); - Tuple2<String, Integer> a2 = Tuple2.of("192.168.1.20", 9090); - - Address address = Address.of(a1, a2); - - assertEquals("192.168.1.10", address.getIp1()); - assertEquals(8080, address.getPort1()); - assertEquals("192.168.1.20", address.getIp2()); - assertEquals(9090, address.getPort2()); - } - - @Test - public void testAddressOfWithSamePortNumber() { - Tuple2<String, Integer> a1 = Tuple2.of("192.168.1.10", 8080); - 
Tuple2<String, Integer> a2 = Tuple2.of("192.168.1.20", 8080); - - Address address = Address.of(a1, a2); - - assertEquals("192.168.1.10", address.getIp1()); - assertEquals(8080, address.getPort1()); - assertEquals("192.168.1.20", address.getIp2()); - assertEquals(8080, address.getPort2()); - } - - @Test - public void testAddressOfWithInvertedOrder() { - Tuple2<String, Integer> a1 = Tuple2.of("192.168.1.20", 8080); - Tuple2<String, Integer> a2 = Tuple2.of("192.168.1.10", 9090); - - Address address = Address.of(a1, a2); - - assertEquals("192.168.1.20", address.getIp1()); - assertEquals(8080, address.getPort1()); - assertEquals("192.168.1.10", address.getIp2()); - assertEquals(9090, address.getPort2()); - } -} diff --git a/src/test/java/com/zdjizhi/flink/voip/functions/SIPPairingFunctionTest.java b/src/test/java/com/zdjizhi/flink/voip/functions/SIPPairingFunctionTest.java deleted file mode 100644 index 104c20e..0000000 --- a/src/test/java/com/zdjizhi/flink/voip/functions/SIPPairingFunctionTest.java +++ /dev/null @@ -1,104 +0,0 @@ -package com.zdjizhi.flink.voip.functions; - -import com.zdjizhi.flink.voip.conf.FusionConfigs; -import com.zdjizhi.flink.voip.data.SIPGenerator; -import com.zdjizhi.flink.voip.records.Record; -import com.zdjizhi.flink.voip.records.StreamDir; -import org.apache.flink.api.common.state.MapState; -import org.apache.flink.api.common.state.MapStateDescriptor; -import org.apache.flink.api.common.time.Time; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.shaded.guava18.com.google.common.collect.Lists; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; -import org.apache.flink.streaming.api.operators.KeyedProcessOperator; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import 
org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Test; - -import java.util.Set; -import java.util.stream.Collectors; - -import static org.junit.jupiter.api.Assertions.*; - - -public class SIPPairingFunctionTest { - private static final int INTERVAL_MINUTES = 5; - private static KeyedOneInputStreamOperatorTestHarness<Integer, ObjectNode, ObjectNode> testHarness; - - @BeforeAll - static void setUp() throws Exception { - final SIPPairingFunction func = new SIPPairingFunction(); - final KeyedProcessOperator<Tuple3<Integer, String, Address>, ObjectNode, ObjectNode> operator = - new KeyedProcessOperator<>(func); - final TypeInformation<Integer> type = TypeInformation.of(Integer.class); - final KeySelector<ObjectNode, Integer> keySelector = jsonNodes -> 0; - testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, keySelector, type); - - final Configuration configuration = new Configuration(); - configuration.set(FusionConfigs.SIP_STATE_CLEAR_INTERVAL, INTERVAL_MINUTES); - testHarness.getExecutionConfig().setGlobalJobParameters(configuration); - testHarness.open(); - } - - @Test - public void testProperlyPairSIPRecords() throws Exception { - final SIPGenerator sipGenerator = new SIPGenerator(); - long current = System.currentTimeMillis(); - - final ObjectNode obj = sipGenerator.next(); - final Record record = new Record(obj); - record.setInt(Record.F_COMMON_STREAM_DIR, StreamDir.DOUBLE.getValue()); - - final StreamRecord<ObjectNode> streamRecord = new StreamRecord<>(obj, current); - - testHarness.processElement(streamRecord); - assertTrue(testHarness.getOutput().contains(streamRecord)); - - long interval = Time.minutes(INTERVAL_MINUTES).toMilliseconds(); - long nextFireTime = (testHarness.getProcessingTime() / interval + 1) * interval; - 
assertTrue(testHarness.getProcessingTimeService().getActiveTimerTimestamps().contains(nextFireTime)); - - final ObjectNode obj1 = obj.deepCopy(); - final Record record1 = new Record(obj1); - record1.setInt(Record.F_COMMON_STREAM_DIR, StreamDir.S2C.getValue()); - - final ObjectNode obj2 = obj.deepCopy(); - final Record record2 = new Record(obj2); - record2.setInt(Record.F_COMMON_STREAM_DIR, StreamDir.C2S.getValue()); - - final MapStateDescriptor<Address, ObjectNodeInfo> descriptor = - new MapStateDescriptor<>( - "sip-state", - TypeInformation.of(Address.class), - TypeInformation.of(ObjectNodeInfo.class) - ); - - testHarness.processElement(obj1, current); - final MapState<Address, ObjectNodeInfo> mapState = testHarness.getOperator() - .getKeyedStateStore().getMapState(descriptor); - final Set<ObjectNode> objectNodes1 = Lists.newArrayList(mapState.values()).stream() - .map(ObjectNodeInfo::getObj) - .collect(Collectors.toSet()); - assertTrue(objectNodes1.contains(obj1)); - - testHarness.processElement(obj2, current); - final Set<ObjectNode> objectNodes2 = Lists.newArrayList(mapState.values()).stream() - .map(ObjectNodeInfo::getObj) - .collect(Collectors.toSet()); - assertFalse(objectNodes2.contains(obj2)); - assertTrue(testHarness.getOutput().contains(new StreamRecord<>(obj2, current))); - assertEquals(2, testHarness.getOutput().size()); - } - - - @AfterAll - static void close() throws Exception { - testHarness.close(); - } - -}
\ No newline at end of file diff --git a/src/test/java/com/zdjizhi/flink/voip/records/RecordTest.java b/src/test/java/com/zdjizhi/flink/voip/records/RecordTest.java deleted file mode 100644 index 0f22986..0000000 --- a/src/test/java/com/zdjizhi/flink/voip/records/RecordTest.java +++ /dev/null @@ -1,70 +0,0 @@ -package com.zdjizhi.flink.voip.records; - -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.IntNode; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.TextNode; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Test; - -import static org.junit.jupiter.api.Assertions.assertEquals; - -public class RecordTest { - - private static ObjectMapper mapper; - - @BeforeAll - static void setUp() { - mapper = new ObjectMapper(); - } - - @Test - void testGetVSysID() { - final ObjectNode obj = mapper.createObjectNode(); - final Record record = new Record(obj); - record.setInt(Record.F_COMMON_VSYS_ID, 42); - assertEquals(42, record.getVSysID()); - - obj.set(Record.F_COMMON_VSYS_ID, IntNode.valueOf(40)); - assertEquals(40, record.getVSysID()); - } - - @Test - void testGetSchemaType() { - final ObjectNode obj = mapper.createObjectNode(); - final Record record = new Record(obj); - record.setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.RTP.getValue()); - assertEquals(SchemaType.RTP, record.getSchemaType()); - - obj.set(Record.F_COMMON_SCHEMA_TYPE, TextNode.valueOf(SchemaType.VOIP.getValue())); - assertEquals(SchemaType.VOIP, record.getSchemaType()); - } - - @Test - void testSetInt() { - final ObjectNode obj = mapper.createObjectNode(); - final Record record = new Record(obj); - record.setInt("intField", 123); - assertEquals(123, obj.get("intField").intValue()); - } - - @Test - void testSetString() { - final ObjectNode obj = 
mapper.createObjectNode(); - final Record record = new Record(obj); - record.setString("stringField", "testValue"); - assertEquals("testValue", obj.get("stringField").textValue()); - } - - @Test - void testMerge() { - final ObjectNode obj = mapper.createObjectNode(); - obj.set("field1", IntNode.valueOf(1)); - ObjectNode other = mapper.createObjectNode(); - other.set("field2", TextNode.valueOf("value2")); - Record record = new Record(obj); - record.merge(other); - assertEquals(1, obj.get("field1").intValue()); - assertEquals("value2", obj.get("field2").textValue()); - } -} diff --git a/src/test/resources/data/session-records.txt b/src/test/resources/data/session-records.txt new file mode 100644 index 0000000..b54426f --- /dev/null +++ b/src/test/resources/data/session-records.txt @@ -0,0 +1,4 @@ +{"log_id": 438322402985248769,"recv_time":946681200,"__inputid":"tsg_olap","session_id":10240001,"decoded_as":"RTP","ip_protocol":"udp","address_type":4,"client_ip":"192.168.64.8","server_ip":"192.168.39.62","client_port":25524,"server_port":4580,"t_vsys_id":0,"vsys_id":1024,"data_center":"tsg_olap","device_group":"tsg_olap","device_id":"0000000000000000","sled_ip":"127.0.0.1","app":"unknown","app_transition":"","client_geolocation":"unknown","server_geolocation":"unknown","decoded_path":"ETHERNET.IPv4.UDP.rtp","server_fqdn":"","out_src_mac":"00:1A:2B:3C:4D:5E","out_dest_mac":"5E:4D:3C:2B:1A:00","start_timestamp_ms":1715830004000,"end_timestamp_ms":946681200000,"tcp_rtt_ms":0,"tcp_client_isn":0,"tcp_server_isn":0,"tcp_handshake_latency_ms":0,"in_link_id":0,"out_link_id":0,"duration_ms":0,"sent_pkts":0,"sent_bytes":0,"received_pkts":0,"received_bytes":0,"tcp_c2s_ip_fragments":0,"tcp_s2c_ip_fragments":0,"tcp_c2s_rtx_pkts":0,"tcp_c2s_rtx_bytes":0,"tcp_s2c_rtx_pkts":0,"tcp_s2c_rtx_bytes":0,"tcp_c2s_o3_pkts":0,"tcp_s2c_o3_pkts":0,"tcp_c2s_lost_bytes":0,"tcp_s2c_lost_bytes":0,"flags":57620,"flags_identify_info":[1,1],"fqdn_category_list":[0],"monitor_rule_list":[0],"security_rule_
list":[0],"sc_rule_list":[0],"shaping_rule_list":[0],"proxy_rule_list":[0],"statistics_rule_list":[0],"monitor_mirrored_pkts":0,"monitor_mirrored_bytes":0,"client_os_desc":"Windows","server_os_desc":"Linux","device_tag":"{\"tags\":[{\"tag\":\"data_center\",\"value\":\"tsg_olap\"},{\"tag\":\"device_group\",\"value\":\"tsg_olap\"}]}","dup_traffic_flag":0,"sc_rsp_raw":[0],"encapsulation":"[{\"tunnels_schema_type\":\"ETHERNET\",\"source_mac\":\"00:1A:2B:3C:4D:5E\",\"destination_mac\":\"5E:4D:3C:2B:1A:00\"}]","rtp_pcap_path":"123e4567-e89b-12d3-a456-426614174005"} +{"log_id": 438322402985248769,"recv_time":946681200,"__inputid":"tsg_olap","session_id":10240002,"decoded_as":"SIP","ip_protocol":"udp","address_type":4,"client_ip":"10.0.0.1","server_ip":"192.0.2.1","client_port":1000,"server_port":60000,"t_vsys_id":0,"vsys_id":1025,"data_center":"tsg_olap","device_group":"tsg_olap","device_id":"0000000000000000","sled_ip":"127.0.0.1","app":"unknown","app_transition":"","client_geolocation":"unknown","server_geolocation":"unknown","decoded_path":"ETHERNET.IPv4.UDP.sip","server_fqdn":"","out_src_mac":"00:1A:2B:3C:4D:5E","out_dest_mac":"5E:4D:3C:2B:1A:00","start_timestamp_ms":1715830000100,"end_timestamp_ms":946681200000,"tcp_rtt_ms":0,"tcp_client_isn":0,"tcp_server_isn":0,"tcp_handshake_latency_ms":0,"in_link_id":0,"out_link_id":0,"duration_ms":0,"sent_pkts":0,"sent_bytes":0,"received_pkts":0,"received_bytes":0,"tcp_c2s_ip_fragments":0,"tcp_s2c_ip_fragments":0,"tcp_c2s_rtx_pkts":0,"tcp_c2s_rtx_bytes":0,"tcp_s2c_rtx_pkts":0,"tcp_s2c_rtx_bytes":0,"tcp_c2s_o3_pkts":0,"tcp_s2c_o3_pkts":0,"tcp_c2s_lost_bytes":0,"tcp_s2c_lost_bytes":0,"flags":8192,"flags_identify_info":[1,1],"fqdn_category_list":[0],"monitor_rule_list":[0],"security_rule_list":[0],"sc_rule_list":[0],"shaping_rule_list":[0],"proxy_rule_list":[0],"statistics_rule_list":[0],"monitor_mirrored_pkts":0,"monitor_mirrored_bytes":0,"client_os_desc":"Windows","server_os_desc":"Linux","device_tag":"{\"tags\":[{\"tag\":\"data_c
enter\",\"value\":\"tsg_olap\"},{\"tag\":\"device_group\",\"value\":\"tsg_olap\"}]}","dup_traffic_flag":0,"sc_rsp_raw":[0],"encapsulation":"[{\"tunnels_schema_type\":\"ETHERNET\",\"source_mac\":\"00:1A:2B:3C:4D:5E\",\"destination_mac\":\"5E:4D:3C:2B:1A:00\"}]","sip_call_id":"NGMxZWY3Y2NmMzNlNGE3NzJhODgyZDAwM2YyMzQ4NGI.","sip_originator_description":"\"lina\"<sip:[email protected]>;tag=1837055d","sip_responder_description":"\"1075\"<sip:[email protected]>","sip_originator_sdp_connect_ip":"192.168.64.85","sip_originator_sdp_media_port":25524,"sip_originator_sdp_media_type":"application/sdp","sip_server":"OpenSIPS (2.4.11 (x86_64/linux))","sip_responder_sdp_connect_ip":"192.168.39.62","sip_responder_sdp_media_port":4580,"sip_responder_sdp_media_type":"application/sdp","sip_duration_s":590,"sip_bye":"responder","sip_cseq":"2 BYE","sip_via":"SIP/2.0/UDP 192.0.2.1:5060;branch=z9hG4bKbe7c.392190f1.0","sip_user_agent":"eyeBeam release 1011d stamp 40820","sip_is_request":0} +{"log_id": 438322402985248769,"recv_time":946681200,"__inputid":"tsg_olap","session_id":10240003,"decoded_as":"RTP","ip_protocol":"udp","address_type":4,"client_ip":"192.168.164.18","server_ip":"192.168.39.162","client_port":65121,"server_port":4670,"t_vsys_id":0,"vsys_id":1024,"data_center":"tsg_olap","device_group":"tsg_olap","device_id":"0000000000000000","sled_ip":"127.0.0.1","app":"unknown","app_transition":"","client_geolocation":"unknown","server_geolocation":"unknown","decoded_path":"ETHERNET.IPv4.UDP.rtp","server_fqdn":"","out_src_mac":"00:1A:2B:3C:4D:5E","out_dest_mac":"5E:4D:3C:2B:1A:00","start_timestamp_ms":1715830000000,"end_timestamp_ms":946681200000,"tcp_rtt_ms":0,"tcp_client_isn":0,"tcp_server_isn":0,"tcp_handshake_latency_ms":0,"in_link_id":0,"out_link_id":0,"duration_ms":0,"sent_pkts":0,"sent_bytes":0,"received_pkts":0,"received_bytes":0,"tcp_c2s_ip_fragments":0,"tcp_s2c_ip_fragments":0,"tcp_c2s_rtx_pkts":0,"tcp_c2s_rtx_bytes":0,"tcp_s2c_rtx_pkts":0,"tcp_s2c_rtx_bytes":0,"tcp_c2s_o3_pkt
s":0,"tcp_s2c_o3_pkts":0,"tcp_c2s_lost_bytes":0,"tcp_s2c_lost_bytes":0,"flags":57620,"flags_identify_info":[1,1],"fqdn_category_list":[0],"monitor_rule_list":[0],"security_rule_list":[0],"sc_rule_list":[0],"shaping_rule_list":[0],"proxy_rule_list":[0],"statistics_rule_list":[0],"monitor_mirrored_pkts":0,"monitor_mirrored_bytes":0,"client_os_desc":"Windows","server_os_desc":"Linux","device_tag":"{\"tags\":[{\"tag\":\"data_center\",\"value\":\"tsg_olap\"},{\"tag\":\"device_group\",\"value\":\"tsg_olap\"}]}","dup_traffic_flag":0,"sc_rsp_raw":[0],"encapsulation":"[{\"tunnels_schema_type\":\"ETHERNET\",\"source_mac\":\"00:1A:2B:3C:4D:5E\",\"destination_mac\":\"5E:4D:3C:2B:1A:00\"}]","rtp_pcap_path":"123e4567-e89b-12d3-a456-426614174005"} +{"log_id": 438322402985248769,"recv_time":946681200,"__inputid":"tsg_olap","session_id":10240004,"decoded_as":"SIP","ip_protocol":"udp","address_type":4,"client_ip":"10.0.0.2","server_ip":"192.0.2.2","client_port":1000,"server_port":60000,"t_vsys_id":0,"vsys_id":1024,"data_center":"tsg_olap","device_group":"tsg_olap","device_id":"0000000000000000","sled_ip":"127.0.0.1","app":"unknown","app_transition":"","client_geolocation":"unknown","server_geolocation":"unknown","decoded_path":"ETHERNET.IPv4.UDP.sip","server_fqdn":"","out_src_mac":"00:1A:2B:3C:4D:5E","out_dest_mac":"5E:4D:3C:2B:1A:00","start_timestamp_ms":1715830000000,"end_timestamp_ms":946681200000,"tcp_rtt_ms":0,"tcp_client_isn":0,"tcp_server_isn":0,"tcp_handshake_latency_ms":0,"in_link_id":0,"out_link_id":0,"duration_ms":0,"sent_pkts":0,"sent_bytes":0,"received_pkts":0,"received_bytes":0,"tcp_c2s_ip_fragments":0,"tcp_s2c_ip_fragments":0,"tcp_c2s_rtx_pkts":0,"tcp_c2s_rtx_bytes":0,"tcp_s2c_rtx_pkts":0,"tcp_s2c_rtx_bytes":0,"tcp_c2s_o3_pkts":0,"tcp_s2c_o3_pkts":0,"tcp_c2s_lost_bytes":0,"tcp_s2c_lost_bytes":0,"flags":24584,"flags_identify_info":[1,1],"fqdn_category_list":[0],"monitor_rule_list":[0],"security_rule_list":[0],"sc_rule_list":[0],"shaping_rule_list":[0],"proxy_rule_list":
[0],"statistics_rule_list":[0],"monitor_mirrored_pkts":0,"monitor_mirrored_bytes":0,"client_os_desc":"Windows","server_os_desc":"Linux","device_tag":"{\"tags\":[{\"tag\":\"data_center\",\"value\":\"tsg_olap\"},{\"tag\":\"device_group\",\"value\":\"tsg_olap\"}]}","dup_traffic_flag":0,"sc_rsp_raw":[0],"encapsulation":"[{\"tunnels_schema_type\":\"ETHERNET\",\"source_mac\":\"00:1A:2B:3C:4D:5E\",\"destination_mac\":\"5E:4D:3C:2B:1A:00\"}]","sip_call_id":"IUMxZWY3Y2NmMzNlNGE3NzJhODgyZDAwM2YyMzQ4NGI.","sip_originator_description":"\"lina\"<sip:[email protected]>;tag=1837055d","sip_responder_description":"\"1075\"<sip:[email protected]>","sip_originator_sdp_connect_ip":"192.168.64.8","sip_originator_sdp_media_port":25524,"sip_originator_sdp_media_type":"application/sdp","sip_server":"OpenSIPS (2.4.11 (x86_64/linux))","sip_responder_sdp_connect_ip":"192.168.39.62","sip_responder_sdp_media_port":4580,"sip_responder_sdp_media_type":"application/sdp","sip_duration_s":590,"sip_bye":"responder","sip_cseq":"2 BYE","sip_via":"SIP/2.0/UDP 192.0.2.1:5060;branch=z9hG4bKbe7c.392190f1.0","sip_user_agent":"eyeBeam release 1011d stamp 40820","sip_is_request":0}
\ No newline at end of file diff --git a/tools/dist/target.xml b/tools/dist/target.xml new file mode 100644 index 0000000..2ea0035 --- /dev/null +++ b/tools/dist/target.xml @@ -0,0 +1,44 @@ +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<assembly + xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd"> + <id>bin</id> + + <formats> + <format>tar.gz</format> + </formats> + + <includeBaseDirectory>false</includeBaseDirectory> + + <fileSets> + <fileSet> + <directory>src/main/resources/jobs</directory> + <includes> + <include>*.yml</include> + </includes> + <fileMode>0644</fileMode> <!-- job descriptors are plain data files: rw-r-r, not executable --> + <lineEnding>lf</lineEnding> + <directoryMode>0755</directoryMode> <!-- directories need the execute bit to be traversable after unpacking --> + <outputDirectory>./</outputDirectory> + </fileSet> + </fileSets> + +</assembly> diff --git a/tools/maven/checkstyle.xml b/tools/maven/checkstyle.xml new file mode 100644 index 0000000..78f2137 --- /dev/null +++ b/tools/maven/checkstyle.xml @@ -0,0 +1,394 @@ +<?xml version="1.0"?> +<!DOCTYPE module PUBLIC + "-//Puppy Crawl//DTD Check Configuration 1.3//EN" + 
"http://www.puppycrawl.com/dtds/configuration_1_3.dtd"> + +<!-- +This is a checkstyle configuration file. For descriptions of +what the following rules do, please see the checkstyle configuration +page at http://checkstyle.sourceforge.net/config.html. +--> + +<module name="Checker"> + + <module name="RegexpSingleline"> + <!-- Checks that TODOs don't have stuff in parenthesis, e.g., username. --> + <property name="format" value="((//.*)|(\*.*))TODO\("/> + <property name="message" value="TODO comments must not include usernames."/> + <property name="severity" value="error"/> + </module> + + <module name="RegexpSingleline"> + <property name="format" value="\s+$"/> + <property name="message" value="Trailing whitespace"/> + <property name="severity" value="error"/> + </module> + + <module name="RegexpSingleline"> + <property name="format" value="Throwables.propagate\("/> + <property name="message" value="Throwables.propagate is deprecated"/> + <property name="severity" value="error"/> + </module> + + <!-- Prevent *Tests.java as tools may not pick them up --> + <module name="RegexpOnFilename"> + <property name="fileNamePattern" value=".*Tests\.java$"/> + </module> + + <module name="SuppressionFilter"> + <property name="file" value="${checkstyle.suppressions.file}" default="suppressions.xml"/> + </module> + + <module name="FileLength"> + <property name="max" value="3000"/> + </module> + + <!-- All Java AST specific tests live under TreeWalker module. 
--> + <module name="TreeWalker"> + + <!-- Allow use of comment to suppress javadocstyle --> + <module name="SuppressionCommentFilter"> + <property name="offCommentFormat" value="CHECKSTYLE.OFF\: ([\w\|]+)"/> + <property name="onCommentFormat" value="CHECKSTYLE.ON\: ([\w\|]+)"/> + <property name="checkFormat" value="$1"/> + </module> + + <!-- Prohibit T.getT() methods for standard boxed types --> + <module name="Regexp"> + <property name="format" value="Boolean\.getBoolean"/> + <property name="illegalPattern" value="true"/> + <property name="message" value="Use System.getProperties() to get system properties."/> + </module> + + <module name="Regexp"> + <property name="format" value="Integer\.getInteger"/> + <property name="illegalPattern" value="true"/> + <property name="message" value="Use System.getProperties() to get system properties."/> + </module> + + <module name="Regexp"> + <property name="format" value="Long\.getLong"/> + <property name="illegalPattern" value="true"/> + <property name="message" value="Use System.getProperties() to get system properties."/> + </module> + + <!-- + + IllegalImport cannot blacklist classes so we have to fall back to Regexp. + + --> + + <!-- forbid use of commons lang validate --> + <module name="Regexp"> + <property name="format" value="org\.apache\.commons\.lang3\.Validate"/> + <property name="illegalPattern" value="true"/> + <property name="message" + value="Use Guava Checks instead of Commons Validate. 
Please refer to the coding guidelines."/> + </module> + <module name="Regexp"> + <property name="format" value="org\.apache\.commons\.lang\."/> + <property name="illegalPattern" value="true"/> + <property name="message" value="Use commons-lang3 instead of commons-lang."/> + </module> + <module name="Regexp"> + <property name="format" value="org\.codehaus\.jettison"/> + <property name="illegalPattern" value="true"/> + <property name="message" value="Use flink-shaded-jackson instead of jettison."/> + </module> + <module name="Regexp"> + <property name="format" value="org\.testcontainers\.shaded"/> + <property name="illegalPattern" value="true"/> + <property name="message" + value="Use utilities from appropriate library instead of org.testcontainers."/> + </module> + + <!-- Enforce Java-style array declarations --> + <module name="ArrayTypeStyle"/> + + <module name="TodoComment"> + <!-- Checks that disallowed strings are not used in comments. --> + <property name="format" value="(FIXME)|(XXX)"/> + </module> + + <!-- + + IMPORT CHECKS + + --> + + <module name="RedundantImport"> + <!-- Checks for redundant import statements. 
--> + <property name="severity" value="error"/> + <message key="import.redundancy" + value="Redundant import {0}."/> + </module> + + <module name="IllegalImport"> + <property name="illegalPkgs" + value="autovalue.shaded, avro.shaded, com.google.api.client.repackaged, com.google.appengine.repackaged"/> + </module> + <module name="IllegalImport"> + <property name="illegalPkgs" value="com.fasterxml.jackson"/> + <message key="import.illegal" value="{0}; Use flink-shaded-jackson instead."/> + </module> + <module name="IllegalImport"> + <property name="illegalPkgs" value="org.codehaus.jackson"/> + <message key="import.illegal" value="{0}; Use flink-shaded-jackson instead."/> + </module> + <module name="IllegalImport"> + <property name="illegalPkgs" value="org.objectweb.asm"/> + <message key="import.illegal" value="{0}; Use flink-shaded-asm instead."/> + </module> + <module name="IllegalImport"> + <property name="illegalPkgs" value="io.netty"/> + <message key="import.illegal" value="{0}; Use flink-shaded-netty instead."/> + </module> + + <module name="RedundantModifier"> + <!-- Checks for redundant modifiers on various symbol definitions. + See: http://checkstyle.sourceforge.net/config_modifier.html#RedundantModifier + + We exclude METHOD_DEF to allow final methods in final classes to make them more future-proof. + --> + <property name="tokens" + value="VARIABLE_DEF, ANNOTATION_FIELD_DEF, INTERFACE_DEF, CLASS_DEF, ENUM_DEF"/> + </module> + + <!-- + IllegalImport cannot blacklist classes, and c.g.api.client.util is used for some shaded + code and some useful code. So we need to fall back to Regexp. 
+ --> + <module name="RegexpSinglelineJava"> + <property name="format" value="^import com.google.common.base.Preconditions;$"/> + <property name="message" value="Static import functions from Guava Preconditions"/> + </module> + + <module name="UnusedImports"> + <property name="severity" value="error"/> + <property name="processJavadoc" value="true"/> + <message key="import.unused" + value="Unused import: {0}."/> + </module> + + <!-- + + NAMING CHECKS + + --> + + <!-- Item 38 - Adhere to generally accepted naming conventions --> + + <module name="PackageName"> + <!-- Validates identifiers for package names against the + supplied expression. --> + <!-- Here the default checkstyle rule restricts package name parts to + seven characters, this is not in line with common practice at Google. + --> + <property name="format" value="^[a-z]+(\.[a-z][a-z0-9]{1,})*$"/> + <property name="severity" value="error"/> + </module> + + <module name="TypeNameCheck"> + <!-- Validates static, final fields against the + expression "^[A-Z][a-zA-Z0-9]*$". --> + <metadata name="altname" value="TypeName"/> + <property name="severity" value="error"/> + </module> + + <module name="ConstantNameCheck"> + <!-- Validates non-private, static, final fields against the supplied + public/package final fields "^[A-Z][A-Z0-9]*(_[A-Z0-9]+)*$". --> + <metadata name="altname" value="ConstantName"/> + <property name="applyToPublic" value="true"/> + <property name="applyToProtected" value="true"/> + <property name="applyToPackage" value="true"/> + <property name="applyToPrivate" value="true"/> + <property name="format" value="^([A-Z][A-Z0-9]*(_[A-Z0-9]+)*|FLAG_.*)$"/> + <message key="name.invalidPattern" + value="Variable ''{0}'' should be in ALL_CAPS (if it is a constant) or be private (otherwise)."/> + <property name="severity" value="error"/> + </module> + + <module name="StaticVariableNameCheck"> + <!-- Validates static, non-final fields against the supplied + expression "^[a-z][a-zA-Z0-9]*_?$". 
--> + <metadata name="altname" value="StaticVariableName"/> + <property name="applyToPublic" value="true"/> + <property name="applyToProtected" value="true"/> + <property name="applyToPackage" value="true"/> + <property name="applyToPrivate" value="true"/> + <property name="format" value="^[a-z][a-zA-Z0-9]*_?$"/> + <property name="severity" value="error"/> + </module> + + <module name="MemberNameCheck"> + <!-- Validates non-static members against the supplied expression. --> + <metadata name="altname" value="MemberName"/> + <property name="applyToPublic" value="true"/> + <property name="applyToProtected" value="true"/> + <property name="applyToPackage" value="true"/> + <property name="applyToPrivate" value="true"/> + <property name="format" value="^[a-z][a-zA-Z0-9]*$"/> + <property name="severity" value="error"/> + </module> + + <module name="MethodNameCheck"> + <!-- Validates identifiers for method names. --> + <metadata name="altname" value="MethodName"/> + <property name="format" value="^[a-z][a-zA-Z0-9]*(_[a-zA-Z0-9]+)*$"/> + <property name="severity" value="error"/> + </module> + + <module name="ParameterName"> + <!-- Validates identifiers for method parameters against the + expression "^[a-z][a-zA-Z0-9]*$". --> + <property name="severity" value="error"/> + </module> + + <module name="LocalFinalVariableName"> + <!-- Validates identifiers for local final variables against the + expression "^[a-z][a-zA-Z0-9]*$". --> + <property name="severity" value="error"/> + </module> + + <module name="LocalVariableName"> + <!-- Validates identifiers for local variables against the + expression "^[a-z][a-zA-Z0-9]*$". 
--> + <property name="severity" value="error"/> + </module> + + <!-- + + LENGTH and CODING CHECKS + + --> + + <!-- Checks for braces around if, else, for, while and do blocks --> + <module name="NeedBraces"> + <property name="severity" value="error"/> + <property name="tokens" + value="LITERAL_IF, LITERAL_ELSE, LITERAL_FOR, LITERAL_WHILE, LITERAL_DO"/> + </module> + + <module name="UpperEll"> + <!-- Checks that long constants are defined with an upper ell.--> + <property name="severity" value="error"/> + </module> + + <module name="FallThrough"> + <!-- Warn about falling through to the next case statement. Similar to + javac -Xlint:fallthrough, but the check is suppressed if a single-line comment + on the last non-blank line preceding the fallen-into case contains 'fall through' (or + one of the other variants in reliefPattern, which we do not publicize, to promote consistency). + --> + <property name="reliefPattern" + value="fall through|Fall through|fallthru|Fallthru|falls through|Falls through|fallthrough|Fallthrough|No break|NO break|no break|continue on"/> + <property name="severity" value="error"/> + </module> + + <!-- Checks for over-complicated boolean expressions. --> + <module name="SimplifyBooleanExpression"/> + + <!-- Detects empty statements (standalone ";" semicolon). --> + <module name="EmptyStatement"/> + + <!-- Detect multiple consecutive semicolons (e.g. ";;"). --> + <module name="RegexpSinglelineJava"> + <property name="format" value=";{2,}"/> + <property name="message" value="Use one semicolon"/> + <property name="ignoreComments" value="true"/> + </module> + + <!-- + + MODIFIERS CHECKS + + --> + + <module name="ModifierOrder"> + <!-- Warn if modifier order is inconsistent with JLS3 8.1.1, 8.3.1, and + 8.4.3. 
The prescribed order is: + public, protected, private, abstract, static, final, transient, volatile, + synchronized, native, strictfp + --> + <property name="severity" value="error"/> + </module> + + + <!-- + + WHITESPACE CHECKS + + --> + + <module name="EmptyLineSeparator"> + <!-- Checks for empty line separator between tokens. The only + excluded token is VARIABLE_DEF, allowing class fields to + be declared on consecutive lines. + --> + <property name="allowMultipleEmptyLines" value="false"/> + <property name="allowMultipleEmptyLinesInsideClassMembers" value="false"/> + <property name="tokens" value="PACKAGE_DEF, IMPORT, STATIC_IMPORT, CLASS_DEF, + INTERFACE_DEF, ENUM_DEF, STATIC_INIT, INSTANCE_INIT, METHOD_DEF, + CTOR_DEF"/> + </module> + + <module name="SingleSpaceSeparator"/> + + <module name="WhitespaceAround"> + <!-- Checks that various tokens are surrounded by whitespace. + This includes most binary operators and keywords followed + by regular or curly braces. + --> + <property name="tokens" value="ASSIGN, BAND, BAND_ASSIGN, BOR, + BOR_ASSIGN, BSR, BSR_ASSIGN, BXOR, BXOR_ASSIGN, COLON, DIV, DIV_ASSIGN, + EQUAL, GE, GT, LAMBDA, LAND, LE, LITERAL_CATCH, LITERAL_DO, LITERAL_ELSE, + LITERAL_FINALLY, LITERAL_FOR, LITERAL_IF, LITERAL_RETURN, + LITERAL_SYNCHRONIZED, LITERAL_TRY, LITERAL_WHILE, LOR, LT, MINUS, + MINUS_ASSIGN, MOD, MOD_ASSIGN, NOT_EQUAL, PLUS, PLUS_ASSIGN, QUESTION, + SL, SL_ASSIGN, SR_ASSIGN, STAR, STAR_ASSIGN, TYPE_EXTENSION_AND"/> + <property name="severity" value="error"/> + </module> + + <module name="WhitespaceAfter"> + <!-- Checks that commas, semicolons and typecasts are followed by + whitespace. + --> + <property name="tokens" value="COMMA, SEMI, TYPECAST"/> + </module> + + <module name="NoWhitespaceAfter"> + <!-- Checks that there is no whitespace after various unary operators. + Linebreaks are allowed. 
+ --> + <property name="tokens" value="BNOT, DEC, DOT, INC, LNOT, UNARY_MINUS, + UNARY_PLUS"/> + <property name="allowLineBreaks" value="true"/> + <property name="severity" value="error"/> + </module> + + <module name="NoWhitespaceBefore"> + <!-- Checks that there is no whitespace before various unary operators. + Linebreaks are allowed. + --> + <property name="tokens" value="SEMI, DOT, POST_DEC, POST_INC"/> + <property name="allowLineBreaks" value="true"/> + <property name="severity" value="error"/> + </module> + + <module name="OperatorWrap"> + <!-- Checks that assignment operators are at the end of the line. --> + <property name="option" value="eol"/> + <property name="tokens" value="ASSIGN"/> + </module> + + <module name="ParenPad"> + <!-- Checks that there is no whitespace before close parens or after + open parens. + --> + <property name="severity" value="error"/> + </module> + + </module> +</module> + diff --git a/tools/maven/spotbugs-exclude.xml b/tools/maven/spotbugs-exclude.xml new file mode 100644 index 0000000..3647f17 --- /dev/null +++ b/tools/maven/spotbugs-exclude.xml @@ -0,0 +1,18 @@ +<?xml version="1.0" encoding="UTF-8"?> +<FindBugsFilter> + <Match> + <Bug pattern="EI_EXPOSE_REP"/> + </Match> + <Match> + <Bug pattern="EI_EXPOSE_REP2"/> + </Match> + <Match> + <Bug pattern="SE_NO_SERIALVERSIONID"/> + </Match> + <Match> + <Bug pattern="SE_TRANSIENT_FIELD_NOT_RESTORED"/> + </Match> + <Match> + <Bug pattern="SQL_PREPARED_STATEMENT_GENERATED_FROM_NONCONSTANT_STRING"/> + </Match> +</FindBugsFilter> diff --git a/tools/maven/suppressions.xml b/tools/maven/suppressions.xml new file mode 100644 index 0000000..9c4d424 --- /dev/null +++ b/tools/maven/suppressions.xml @@ -0,0 +1,12 @@ +<?xml version="1.0"?> +<!DOCTYPE suppressions PUBLIC + "-//Puppy Crawl//DTD Suppressions 1.1//EN" + "http://www.puppycrawl.com/dtds/suppressions_1_1.dtd"> + +<suppressions> + <!-- target directory is not relevant for checkstyle --> + <suppress + files="[\\/]target[\\/]" + 
checks=".*"/> + +</suppressions> |
