diff options
| author | 梁超 <[email protected]> | 2023-12-18 07:52:56 +0000 |
|---|---|---|
| committer | 梁超 <[email protected]> | 2023-12-18 07:52:56 +0000 |
| commit | 12828291a9f5d4ad18173ac800ab6a4b889bdfda (patch) | |
| tree | 834c1f01bde695259162405dce5380ad6ad473b1 | |
| parent | 2542a8bfd23958eee0da0585304c5f8318471d66 (diff) | |
| parent | f21e814763292bd8341ae59fe14fa245e84f40ad (diff) | |
Merge branch 'hotfix/app-name' into 'main'
fix: modify job name
See merge request galaxy/tsg_olap/sip-rtp-correlation!24
| -rw-r--r-- | README.md | 25 | ||||
| -rw-r--r-- | pom.xml | 2 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/flink/voip/CorrelateApp.java | 2 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/flink/voip/conf/FusionConfigs.java | 9 | ||||
| -rw-r--r-- | src/main/java/com/zdjizhi/flink/voip/error/ErrorHandler.java | 16 |
5 files changed, 38 insertions, 16 deletions
@@ -22,18 +22,19 @@ flink run -c com.zdjizhi.flink.voip.CorrelateApp path/to/sip-rtp-correlation-<ve ## 配置项说明 -| 配置项 | 类型 | 必需 | 默认值 | 描述 | -| --------------------------- | ------------------- | ---------- | ----------------------------------------------------------- |--------------------------------------------| -| source.kafka.topic | STRING | Y | | 将要读取的 Kafka Topic 名称,其包含 SIP 和 RTP 原始数据 | -| source.kafka.props.* | MAP<STRING, STRING> | Y | | 将要读取的 Kafka 的 Properties | -| sink.kafka.topic | STRING | Y | | 将合成的 VoIP 及 未关联成功的 RTP 数据写出的 Kafka Topic 名 | -| sink.kafka.props.* | MAP<STRING, STRING> | Y | | 数据输出的 Kafka 的 Properties | -| error.records.output.enable | BOOLEAN | N | False | 是否开启异常数据的输出 【IP 或 Port 为空】 | -| include.intranet.ip | BOOLEAN | N | True | 是否对 SIP 协商主叫 IP 或被叫 IP 为内网地址的数据进行关联 | -| error.sink.kafka.topic | STRING | N | | 异常数据输出到的 Kafka Topic 名 | -| error.sink.kafka.props.* | MAP<STRING, STRING> | N | | 异常数据输出的 Kafka 的 Properties | -| sip.state.clear.interval.minutes | INT | N | 1 | SIP 单向流关联的窗口大小(单位:分钟) | -| rtp.state.clear.interval.minutes | INT | N | 6 | SIP 和 RTP 关联的窗口大小(单位:分钟) | +| 配置项 | 类型 | 必需 | 默认值 | 描述 | +|----------------------------------| ------------------- | ---------- | ---------------------------------------------------------- |-------------------------------------------| +| source.kafka.topic | STRING | Y | | 将要读取的 Kafka Topic 名称,其包含 SIP 和 RTP 原始数据 | +| source.kafka.props.* | MAP<STRING, STRING> | Y | | 将要读取的 Kafka 的 Properties | +| sink.kafka.topic | STRING | Y | | 将合成的 VoIP 及 未关联成功的 RTP 数据写出的 Kafka Topic 名 | +| sink.kafka.props.* | MAP<STRING, STRING> | Y | | 数据输出的 Kafka 的 Properties | +| error.records.output.enable | BOOLEAN | N | False | 是否开启异常数据的输出 【IP 或 Port 为空】 | +| include.intranet.ip | BOOLEAN | N | True | 是否对 SIP 协商主叫 IP 或被叫 IP 为内网地址的数据进行关联 | +| error.sink.kafka.topic | STRING | N | | 异常数据输出到的 Kafka Topic 名 | +| error.sink.kafka.props.* | MAP<STRING, STRING> | N | | 异常数据输出的 Kafka 的 Properties | +| sip.state.clear.interval.minutes | INT | N | 1 | SIP 单向流关联的窗口大小(单位:分钟) | +| rtp.state.clear.interval.minutes | INT | N | 6 | SIP 和 RTP 关联的窗口大小(单位:分钟) | +| job.name | STRING | N | correlation_sip_rtp_session | Job 名 | @@ -7,7 +7,7 @@ <groupId>com.zdjizhi</groupId> <artifactId>sip-rtp-correlation</artifactId> - <version>1.2-rc3</version> + <version>1.2.1</version> <name>Flink : SIP-RTP : Correlation</name> diff --git a/src/main/java/com/zdjizhi/flink/voip/CorrelateApp.java b/src/main/java/com/zdjizhi/flink/voip/CorrelateApp.java index efb53b6..304e04d 100644 --- a/src/main/java/com/zdjizhi/flink/voip/CorrelateApp.java +++ b/src/main/java/com/zdjizhi/flink/voip/CorrelateApp.java @@ -103,6 +103,6 @@ public class CorrelateApp { .union(sipDoubleDirOperator.getSideOutput(SIPPairingFunction.SIP_OUTPUT_TAG)) .addSink(producer); - env.execute("SIP-RTP-CORRELATION"); + env.execute(config.get(JOB_NAME)); } } diff --git a/src/main/java/com/zdjizhi/flink/voip/conf/FusionConfigs.java b/src/main/java/com/zdjizhi/flink/voip/conf/FusionConfigs.java index 86f04b5..926c5a0 100644 --- a/src/main/java/com/zdjizhi/flink/voip/conf/FusionConfigs.java +++ b/src/main/java/com/zdjizhi/flink/voip/conf/FusionConfigs.java @@ -95,4 +95,13 @@ public class FusionConfigs { .intType() .defaultValue(6) .withDescription("The interval at which RTP state data should be cleared."); + + /** + * Configuration option for specifying the name of a job. + */ + public static final ConfigOption<String> JOB_NAME = + ConfigOptions.key("job.name") + .stringType() + .defaultValue("correlation_sip_rtp_session") + .withDescription("The name of current job."); }
\ No newline at end of file diff --git a/src/main/java/com/zdjizhi/flink/voip/error/ErrorHandler.java b/src/main/java/com/zdjizhi/flink/voip/error/ErrorHandler.java index 98bbbe0..be41377 100644 --- a/src/main/java/com/zdjizhi/flink/voip/error/ErrorHandler.java +++ b/src/main/java/com/zdjizhi/flink/voip/error/ErrorHandler.java @@ -21,6 +21,8 @@ import org.apache.flink.util.OutputTag; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.function.Function; + /** * The ErrorHandler class is responsible for handling and filtering error records from the input data stream. * It checks for records that have invalid or meaningless addresses and ports, and outputs them to a separate stream if enabled. @@ -112,6 +114,8 @@ class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, Obje record.getClientPort() > 0 && record.getServerPort() > 0; + boolean cond8 = null != executeSafely(Record::getStreamDir, record); + final SIPRecord sipRecord = new SIPRecord(obj); boolean cond2 = isIPAddress(sipRecord.getOriginatorSdpConnectIp()) || isIPAddress(sipRecord.getResponderSdpConnectIp()); @@ -120,7 +124,7 @@ class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, Obje boolean cond4 = !isIPAddress(sipRecord.getOriginatorSdpConnectIp()) || (includeIntranetIp || !isInternalIp(sipRecord.getOriginatorSdpConnectIp())); boolean cond5 = SchemaType.SIP.equals(sipRecord.getSchemaType()); - boolean cond6 = StreamDir.DOUBLE == sipRecord.getStreamDir() && + boolean cond6 = StreamDir.DOUBLE == executeSafely(Record::getStreamDir, sipRecord) && (includeIntranetIp || !isInternalIp(sipRecord.getResponderSdpConnectIp())) && (includeIntranetIp || !isInternalIp(sipRecord.getOriginatorSdpConnectIp())); @@ -129,7 +133,7 @@ class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, Obje sipRecord.getResponderSdpMediaPort() > 0 && sipRecord.getOriginatorSdpMediaPort() > 0; // Both client and server addresses in the data are valid. - if (cond1 && (!cond5 || cond7) && ( + if (cond1 && cond8 && (!cond5 || cond7) && ( // The address in the SIP one-way stream is valid and not an internal network address. cond2 && cond3 && cond4 && cond5 // The coordinating addresses in the SIP double directional stream are valid @@ -149,6 +153,14 @@ class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, Obje // ====================================================================================== // ----------------------------------- private helper ----------------------------------- + public static <T, R> R executeSafely(Function<T, R> function, T v) { + try { + return function.apply(v); + } catch (Exception e) { + return null; + } + } + private static boolean isIPAddress(final String ipaddr) { if (null == ipaddr) { return false; |
